diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 4a91574..edead40 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -8,7 +8,7 @@ * except according to those terms. */ -use std::collections::{btree_map::Entry, BTreeMap}; +use std::collections::BTreeMap; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; @@ -139,15 +139,10 @@ impl PubsubConnectionInner { messages.pop(), messages.pop(), ) { - (Some(msg), Some(topic), Some(message_type), None) => { - match (msg, String::from_resp(topic), message_type) { - (msg, Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg), - _ => return Err(error::unexpected("Incorrect format of a PUBSUB message")), - } - } - (Some(msg), Some(_), Some(topic), Some(message_type)) => { - match (msg, String::from_resp(topic), message_type) { - (msg, Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg), + (Some(msg), Some(topic), Some(message_type), None) + | (Some(msg), Some(_), Some(topic), Some(message_type)) => { + match (String::from_resp(topic), message_type) { + (Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg), _ => return Err(error::unexpected("Incorrect format of a PUBSUB message")), } } @@ -165,96 +160,16 @@ impl PubsubConnectionInner { }; match message_type.as_slice() { - b"subscribe" => match self.pending_subs.remove(&topic) { - Some((sender, signal)) => { - self.subscriptions.insert(topic, sender); - signal - .send(()) - .map_err(|()| error::internal("Error confirming subscription"))? - } - None => { - return Err(error::internal(format!( - "Received unexpected subscribe notification for topic: {}", - topic - ))); - } - }, - b"psubscribe" => match self.pending_psubs.remove(&topic) { - Some((sender, signal)) => { - self.psubscriptions.insert(topic, sender); - signal - .send(()) - .map_err(|()| error::internal("Error confirming subscription"))? - } - None => { - return Err(error::internal(format!( - "Received unexpected subscribe notification for topic: {}", - topic - ))); - } - }, - b"unsubscribe" => { - match self.subscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } - } - if self.subscriptions.is_empty() { - return Ok(false); - } + b"subscribe" => { + process_subscribe(&mut self.pending_subs, &mut self.subscriptions, topic) } - b"punsubscribe" => { - match self.psubscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } - } - if self.psubscriptions.is_empty() { - return Ok(false); - } + b"psubscribe" => { + process_subscribe(&mut self.pending_psubs, &mut self.psubscriptions, topic) } - b"message" => match self.subscriptions.get(&topic) { - Some(sender) => { - if let Err(error) = sender.unbounded_send(Ok(msg)) { - if !error.is_disconnected() { - return Err(error::internal(format!("Cannot send message: {}", error))); - } - } - } - None => { - return Err(error::internal(format!( - "Unexpected message on topic: {}", - topic - ))); - } - }, - b"pmessage" => match self.psubscriptions.get(&topic) { - Some(sender) => { - if let Err(error) = sender.unbounded_send(Ok(msg)) { - if !error.is_disconnected() { - return Err(error::internal(format!("Cannot send message: {}", error))); - } - } - } - None => { - return Err(error::internal(format!( - "Unexpected message on topic: {}", - topic - ))); - } - }, + b"unsubscribe" => process_unsubscribe(&mut self.subscriptions, &topic), + b"punsubscribe" => process_unsubscribe(&mut self.psubscriptions, &topic), + b"message" => process_message(&self.subscriptions, &topic, msg), + b"pmessage" => process_message(&self.psubscriptions, &topic, msg), t => { return Err(error::internal(format!( "Unexpected data on Pub/Sub connection: {}", @@ -262,8 +177,6 @@ impl PubsubConnectionInner { ))); } } - - Ok(true) } /// Returns true, if there are still valid subscriptions at the end, or false if not, i.e. the whole thing can be dropped. @@ -276,18 +189,14 @@ impl PubsubConnectionInner { return Ok(false); } else { // This can only happen if the connection is closed server-side - for sub in self.subscriptions.values() { + let subs = self.subscriptions.values(); + let psubs = self.psubscriptions.values(); + for sub in subs.chain(psubs) { sub.unbounded_send(Err(error::Error::Connection( ConnectionReason::NotConnected, ))) .unwrap(); } - for psub in self.psubscriptions.values() { - psub.unbounded_send(Err(error::Error::Connection( - ConnectionReason::NotConnected, - ))) - .unwrap(); - } return Err(error::Error::Connection(ConnectionReason::NotConnected)); } } @@ -298,20 +207,15 @@ impl PubsubConnectionInner { } } Poll::Ready(Some(Err(e))) => { - for sub in self.subscriptions.values() { + let subs = self.subscriptions.values(); + let psubs = self.psubscriptions.values(); + for sub in subs.chain(psubs) { sub.unbounded_send(Err(error::unexpected(format!( "Connection is in the process of failing due to: {}", e )))) .unwrap(); } - for psub in self.psubscriptions.values() { - psub.unbounded_send(Err(error::unexpected(format!( - "Connection is in the process of failing due to: {}", - e - )))) - .unwrap(); - } return Err(e); } } @@ -319,6 +223,50 @@ impl PubsubConnectionInner { } } +fn process_subscribe( + pending_subs: &mut BTreeMap)>, + subscriptions: &mut BTreeMap, + topic: String, +) -> Result { + let (sender, signal) = pending_subs.remove(&topic).ok_or(error::internal(format!( + "Received unexpected subscribe notification for topic: {}", + topic + )))?; + subscriptions.insert(topic, sender); + signal + .send(()) + .map_err(|()| error::internal("Error confirming subscription"))?; + Ok(true) +} + +fn process_unsubscribe( + subscriptions: &mut BTreeMap, + topic: &str, +) -> Result { + subscriptions.remove(topic).ok_or(error::internal(format!( + "Unexpected unsubscribe message: {}", + topic + )))?; + Ok(!subscriptions.is_empty()) +} + +fn process_message( + subscriptions: &BTreeMap, + topic: &str, + msg: resp::RespValue, +) -> Result { + let sender = subscriptions.get(topic).ok_or(error::internal(format!( + "Unexpected message on topic: {}", + topic + )))?; + match sender.unbounded_send(Ok(msg)) { + Err(error) if !error.is_disconnected() => { + Err(error::internal(format!("Cannot send message: {}", error))) + } + _ => Ok(true), + } +} + impl Future for PubsubConnectionInner { type Output = Result<(), error::Error>; diff --git a/src/resp.rs b/src/resp.rs index e646fcc..e70fb83 100644 --- a/src/resp.rs +++ b/src/resp.rs @@ -85,7 +85,7 @@ impl RespValue { } } -/// A trait to be implemented for every time which can be read from a RESP value. +/// A trait to be implemented for every type which can be read from a RESP value. /// /// Implementing this trait on a type means that type becomes a valid return type for calls such as `send` on /// `client::PairedConnection` @@ -207,22 +207,23 @@ impl FromResp fo fn from_resp_int(resp: RespValue) -> Result, Error> { match resp { RespValue::Array(ary) => { - let mut map = HashMap::with_capacity_and_hasher(ary.len(), S::default()); - let mut items = ary.into_iter(); - - while let Some(k) = items.next() { - let key = K::from_resp(k)?; - let value = T::from_resp(items.next().ok_or_else(|| { - error::resp( - "Cannot convert an odd number of elements into a hashmap", - "".into(), - ) - })?)?; - - map.insert(key, value); + let len = ary.len(); + if len % 2 != 0 { + return Err(error::resp( + "Cannot convert an odd number of elements into a hashmap", + RespValue::Array(ary), + )); } - Ok(map) + let mut items = ary.into_iter(); + (0..(len / 2)) + .map(|_| { + // It's safe to unwrap, because we checked the length before + let key = K::from_resp(items.next().unwrap())?; + let value = T::from_resp(items.next().unwrap())?; + Ok((key, value)) + }) + .collect() } _ => Err(error::resp("Cannot be converted into a hashmap", resp)), } @@ -365,52 +366,23 @@ pub trait IntoRespString { } macro_rules! string_into_resp { - ($t:ty) => { + ($(<$l:lifetime>)?|$i:ident : $t:ty| $e:expr) => { + impl $(<$l>)? IntoRespString for $t { + fn into_resp_string(self) -> RespValue { + let $i = self; + RespValue::BulkString($e) + } + } into_resp!($t, into_resp_string); - }; -} - -impl IntoRespString for String { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.into_bytes()) } } -string_into_resp!(String); -impl<'a> IntoRespString for &'a String { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.as_bytes().into()) - } -} -string_into_resp!(&'a String); - -impl<'a> IntoRespString for &'a str { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.as_bytes().into()) - } -} -string_into_resp!(&'a str); - -impl<'a> IntoRespString for &'a [u8] { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.to_vec()) - } -} -string_into_resp!(&'a [u8]); - -impl IntoRespString for Vec { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self) - } -} -string_into_resp!(Vec); - -impl IntoRespString for Arc { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.as_bytes().into()) - } -} -string_into_resp!(Arc); +string_into_resp!(|it: String| it.into_bytes()); +string_into_resp!(<'a>|it: &'a String| it.as_bytes().into()); +string_into_resp!(<'a>|it: &'a str| it.as_bytes().into()); +string_into_resp!(<'a>|it: &'a [u8]| it.to_vec()); +string_into_resp!(|it: Vec| it); +string_into_resp!(|it: Arc| it.as_bytes().into()); pub trait IntoRespInteger { fn into_resp_integer(self) -> RespValue;