1- use std:: time:: Duration ;
1+ use std:: { marker :: PhantomData , time:: Duration } ;
22
33use bson:: { Bson , Document , Timestamp } ;
4+ use serde:: de:: DeserializeOwned ;
45
56use super :: { action_impl, deeplink, option_setters, ExplicitSession , ImplicitSession } ;
67use crate :: {
@@ -96,11 +97,11 @@ where
9697 /// Change streams require either a "majority" read concern or no read concern. Anything else
9798 /// will cause a server error.
9899 ///
99- /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document >>>`] or
100- /// d[`Result<SessionChangeStream<ChangeStreamEvent<Document >>>`] if a
100+ /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<T >>>`] or
101+ /// d[`Result<SessionChangeStream<ChangeStreamEvent<T >>>`] if a
101102 /// [`ClientSession`] has been provided.
102103 #[ deeplink]
103- pub fn watch ( & self ) -> Watch {
104+ pub fn watch ( & self ) -> Watch < T > {
104105 Watch :: new ( self . client ( ) , self . namespace ( ) . into ( ) )
105106 }
106107}
@@ -153,24 +154,25 @@ where
153154 ///
154155 /// Change streams require either a "majority" read concern or no read concern. Anything else
155156 /// will cause a server error.
156- pub fn watch ( & self ) -> Watch {
157+ pub fn watch ( & self ) -> Watch < T > {
157158 self . async_collection . watch ( )
158159 }
159160}
160161
161162/// Starts a new [`ChangeStream`] that receives events for all changes in a given scope. Create by
162163/// calling [`Client::watch`], [`Database::watch`], or [`Collection::watch`].
163164#[ must_use]
164- pub struct Watch < ' a , S = ImplicitSession > {
165+ pub struct Watch < ' a , T = Document , S = ImplicitSession > {
165166 client : & ' a Client ,
166167 target : AggregateTarget ,
167168 pipeline : Vec < Document > ,
168169 options : Option < ChangeStreamOptions > ,
169170 session : S ,
170171 cluster : bool ,
172+ phantom : PhantomData < fn ( ) -> T > ,
171173}
172174
173- impl < ' a > Watch < ' a , ImplicitSession > {
175+ impl < ' a , T > Watch < ' a , T , ImplicitSession > {
174176 fn new ( client : & ' a Client , target : AggregateTarget ) -> Self {
175177 Self {
176178 client,
@@ -179,6 +181,7 @@ impl<'a> Watch<'a, ImplicitSession> {
179181 options : None ,
180182 session : ImplicitSession ,
181183 cluster : false ,
184+ phantom : PhantomData ,
182185 }
183186 }
184187
@@ -190,6 +193,7 @@ impl<'a> Watch<'a, ImplicitSession> {
190193 options : None ,
191194 session : ImplicitSession ,
192195 cluster : true ,
196+ phantom : PhantomData ,
193197 }
194198 }
195199}
@@ -235,28 +239,29 @@ impl<'a, S> Watch<'a, S> {
235239 ) ;
236240}
237241
238- impl < ' a > Watch < ' a , ImplicitSession > {
242+ impl < ' a , T > Watch < ' a , T , ImplicitSession > {
239243 /// Use the provided ['ClientSession'].
240244 pub fn session < ' s > (
241245 self ,
242246 session : impl Into < & ' s mut ClientSession > ,
243- ) -> Watch < ' a , ExplicitSession < ' s > > {
247+ ) -> Watch < ' a , T , ExplicitSession < ' s > > {
244248 Watch {
245249 client : self . client ,
246250 target : self . target ,
247251 pipeline : self . pipeline ,
248252 options : self . options ,
249253 session : ExplicitSession ( session. into ( ) ) ,
250254 cluster : self . cluster ,
255+ phantom : PhantomData ,
251256 }
252257 }
253258}
254259
255- #[ action_impl( sync = crate :: sync:: ChangeStream <ChangeStreamEvent <Document >>) ]
256- impl < ' a > Action for Watch < ' a , ImplicitSession > {
260+ #[ action_impl( sync = crate :: sync:: ChangeStream <ChangeStreamEvent <T >>) ]
261+ impl < ' a , T : DeserializeOwned + Unpin + Send + Sync > Action for Watch < ' a , T , ImplicitSession > {
257262 type Future = WatchFuture ;
258263
259- async fn execute ( mut self ) -> Result < ChangeStream < ChangeStreamEvent < Document > > > {
264+ async fn execute ( mut self ) -> Result < ChangeStream < ChangeStreamEvent < T > > > {
260265 resolve_options ! (
261266 self . client,
262267 self . options,
@@ -273,11 +278,11 @@ impl<'a> Action for Watch<'a, ImplicitSession> {
273278 }
274279}
275280
276- #[ action_impl( sync = crate :: sync:: SessionChangeStream <ChangeStreamEvent <Document >>) ]
277- impl < ' a > Action for Watch < ' a , ExplicitSession < ' a > > {
281+ #[ action_impl( sync = crate :: sync:: SessionChangeStream <ChangeStreamEvent <T >>) ]
282+ impl < ' a , T : DeserializeOwned + Unpin + Send + Sync > Action for Watch < ' a , T , ExplicitSession < ' a > > {
278283 type Future = WatchSessionFuture ;
279284
280- async fn execute ( mut self ) -> Result < SessionChangeStream < ChangeStreamEvent < Document > > > {
285+ async fn execute ( mut self ) -> Result < SessionChangeStream < ChangeStreamEvent < T > > > {
281286 resolve_read_concern_with_session ! ( self . client, self . options, Some ( & mut * self . session. 0 ) ) ?;
282287 resolve_selection_criteria_with_session ! (
283288 self . client,
0 commit comments