3939pQuery = ql2_pb2 .Query .QueryType
4040
4141
42- @asyncio .coroutine
43- def _read_until (streamreader , delimiter ):
42+ async def _read_until (streamreader , delimiter ):
4443 """Naive implementation of reading until a delimiter"""
4544 buffer = bytearray ()
4645
4746 while True :
48- c = yield from streamreader .read (1 )
47+ c = await streamreader .read (1 )
4948 if c == b"" :
5049 break # EOF
5150 buffer .append (c [0 ])
@@ -69,12 +68,12 @@ def reusable_waiter(loop, timeout):
6968 else :
7069 deadline = None
7170
72- def wait (future ):
71+ async def wait (future ):
7372 if deadline is not None :
7473 new_timeout = max (deadline - loop .time (), 0 )
7574 else :
7675 new_timeout = None
77- return (yield from asyncio .wait_for (future , new_timeout , loop = loop ))
76+ return (await asyncio .wait_for (future , new_timeout ))
7877
7978 return wait
8079
@@ -100,20 +99,18 @@ def __init__(self, *args, **kwargs):
10099 def __aiter__ (self ):
101100 return self
102101
103- @asyncio .coroutine
104- def __anext__ (self ):
102+ async def __anext__ (self ):
105103 try :
106- return (yield from self ._get_next (None ))
104+ return (await self ._get_next (None ))
107105 except ReqlCursorEmpty :
108106 raise StopAsyncIteration
109107
110- @asyncio .coroutine
111- def close (self ):
108+ async def close (self ):
112109 if self .error is None :
113110 self .error = self ._empty_error ()
114111 if self .conn .is_open ():
115112 self .outstanding_requests += 1
116- yield from self .conn ._parent ._stop (self )
113+ await self .conn ._parent ._stop (self )
117114
118115 def _extend (self , res_buf ):
119116 Cursor ._extend (self , res_buf )
@@ -122,16 +119,15 @@ def _extend(self, res_buf):
122119
123120 # Convenience function so users know when they've hit the end of the cursor
124121 # without having to catch an exception
125- @asyncio .coroutine
126- def fetch_next (self , wait = True ):
122+ async def fetch_next (self , wait = True ):
127123 timeout = Cursor ._wait_to_timeout (wait )
128124 waiter = reusable_waiter (self .conn ._io_loop , timeout )
129125 while len (self .items ) == 0 and self .error is None :
130126 self ._maybe_fetch_batch ()
131127 if self .error is not None :
132128 raise self .error
133129 with translate_timeout_errors ():
134- yield from waiter (asyncio .shield (self .new_response ))
130+ await waiter (asyncio .shield (self .new_response ))
135131 # If there is a (non-empty) error to be received, we return True, so the
136132 # user will receive it on the next `next` call.
137133 return len (self .items ) != 0 or not isinstance (self .error , RqlCursorEmpty )
@@ -141,15 +137,14 @@ def _empty_error(self):
141137 # with mechanisms to return from a coroutine.
142138 return RqlCursorEmpty ()
143139
144- @asyncio .coroutine
145- def _get_next (self , timeout ):
140+ async def _get_next (self , timeout ):
146141 waiter = reusable_waiter (self .conn ._io_loop , timeout )
147142 while len (self .items ) == 0 :
148143 self ._maybe_fetch_batch ()
149144 if self .error is not None :
150145 raise self .error
151146 with translate_timeout_errors ():
152- yield from waiter (asyncio .shield (self .new_response ))
147+ await waiter (asyncio .shield (self .new_response ))
153148 return self .items .popleft ()
154149
155150 def _maybe_fetch_batch (self ):
@@ -185,8 +180,7 @@ def client_address(self):
185180 if self .is_open ():
186181 return self ._streamwriter .get_extra_info ("sockname" )[0 ]
187182
188- @asyncio .coroutine
189- def connect (self , timeout ):
183+ async def connect (self , timeout ):
190184 try :
191185 ssl_context = None
192186 if len (self ._parent .ssl ) > 0 :
@@ -198,10 +192,9 @@ def connect(self, timeout):
198192 ssl_context .check_hostname = True # redundant with match_hostname
199193 ssl_context .load_verify_locations (self ._parent .ssl ["ca_certs" ])
200194
201- self ._streamreader , self ._streamwriter = yield from asyncio .open_connection (
195+ self ._streamreader , self ._streamwriter = await asyncio .open_connection (
202196 self ._parent .host ,
203197 self ._parent .port ,
204- loop = self ._io_loop ,
205198 ssl = ssl_context ,
206199 )
207200 self ._streamwriter .get_extra_info ("socket" ).setsockopt (
@@ -226,26 +219,25 @@ def connect(self, timeout):
226219 break
227220 # This may happen in the `V1_0` protocol where we send two requests as
228221 # an optimization, then need to read each separately
229- if request is not "" :
222+ if request != "" :
230223 self ._streamwriter .write (request )
231224
232- response = yield from asyncio .wait_for (
225+ response = await asyncio .wait_for (
233226 _read_until (self ._streamreader , b"\0 " ),
234227 timeout ,
235- loop = self ._io_loop ,
236228 )
237229 response = response [:- 1 ]
238230 except ReqlAuthError :
239- yield from self .close ()
231+ await self .close ()
240232 raise
241233 except ReqlTimeoutError as err :
242- yield from self .close ()
234+ await self .close ()
243235 raise ReqlDriverError (
244236 "Connection interrupted during handshake with %s:%s. Error: %s"
245237 % (self ._parent .host , self ._parent .port , str (err ))
246238 )
247239 except Exception as err :
248- yield from self .close ()
240+ await self .close ()
249241 raise ReqlDriverError (
250242 "Could not connect to %s:%s. Error: %s"
251243 % (self ._parent .host , self ._parent .port , str (err ))
@@ -259,8 +251,7 @@ def connect(self, timeout):
259251 def is_open (self ):
260252 return not (self ._closing or self ._streamreader .at_eof ())
261253
262- @asyncio .coroutine
263- def close (self , noreply_wait = False , token = None , exception = None ):
254+ async def close (self , noreply_wait = False , token = None , exception = None ):
264255 self ._closing = True
265256 if exception is not None :
266257 err_message = "Connection is closed (%s)." % str (exception )
@@ -280,39 +271,37 @@ def close(self, noreply_wait=False, token=None, exception=None):
280271
281272 if noreply_wait :
282273 noreply = Query (pQuery .NOREPLY_WAIT , token , None , None )
283- yield from self .run_query (noreply , False )
274+ await self .run_query (noreply , False )
284275
285276 self ._streamwriter .close ()
286277 await self ._streamwriter .wait_closed ()
287278 # We must not wait for the _reader_task if we got an exception, because that
288279 # means that we were called from it. Waiting would lead to a deadlock.
289280 if self ._reader_task and exception is None :
290- yield from self ._reader_task
281+ await self ._reader_task
291282
292283 return None
293284
294- @asyncio .coroutine
295- def run_query (self , query , noreply ):
285+ async def run_query (self , query , noreply ):
296286 self ._streamwriter .write (query .serialize (self ._parent ._get_json_encoder (query )))
297287 if noreply :
298288 return None
299289
300290 response_future = asyncio .Future ()
301291 self ._user_queries [query .token ] = (query , response_future )
302- return (yield from response_future )
292+ return (await response_future )
303293
304294 # The _reader coroutine runs in parallel, reading responses
305295 # off of the socket and forwarding them to the appropriate Future or Cursor.
306296 # This is shut down as a consequence of closing the stream, or an error in the
307297 # socket/protocol from the server. Unexpected errors in this coroutine will
308298 # close the ConnectionInstance and be passed to any open Futures or Cursors.
309- @asyncio .coroutine
310- def _reader (self ):
299+ async def _reader (self ):
311300 try :
312301 while True :
313- buf = yield from self ._streamreader .readexactly (12 )
302+ buf = await self ._streamreader .readexactly (12 )
314303 (token , length ,) = struct .unpack ("<qL" , buf )
315- buf = yield from self ._streamreader .readexactly (length )
304+ buf = await self ._streamreader .readexactly (length )
316305
317306 cursor = self ._cursor_cache .get (token )
318307 if cursor is not None :
@@ -341,7 +330,7 @@ def _reader(self):
341330 raise ReqlDriverError ("Unexpected response received." )
342331 except Exception as ex :
343332 if not self ._closing :
344- yield from self .close (exception = ex )
333+ await self .close (exception = ex )
345334
346335
347336class Connection (ConnectionBase ):
@@ -354,30 +343,25 @@ def __init__(self, *args, **kwargs):
354343 "Could not convert port %s to an integer." % self .port
355344 )
356345
357- @asyncio .coroutine
358- def __aenter__ (self ):
346+ async def __aenter__ (self ):
359347 return self
360348
361- @asyncio .coroutine
362- def __aexit__ (self , exception_type , exception_val , traceback ):
363- yield from self .close (False )
349+ async def __aexit__ (self , exception_type , exception_val , traceback ):
350+ await self .close (False )
364351
365- @asyncio .coroutine
366- def _stop (self , cursor ):
352+ async def _stop (self , cursor ):
367353 self .check_open ()
368354 q = Query (pQuery .STOP , cursor .query .token , None , None )
369- return (yield from self ._instance .run_query (q , True ))
355+ return (await self ._instance .run_query (q , True ))
370356
371- @asyncio .coroutine
372- def reconnect (self , noreply_wait = True , timeout = None ):
357+ async def reconnect (self , noreply_wait = True , timeout = None ):
373358 # We close before reconnect so reconnect doesn't try to close us
374359 # and then fail to return the Future (this is a little awkward).
375- yield from self .close (noreply_wait )
360+ await self .close (noreply_wait )
376361 self ._instance = self ._conn_type (self , ** self ._child_kwargs )
377- return (yield from self ._instance .connect (timeout ))
362+ return (await self ._instance .connect (timeout ))
378363
379- @asyncio .coroutine
380- def close (self , noreply_wait = True ):
364+ async def close (self , noreply_wait = True ):
381365 if self ._instance is None :
382366 return None
383- return (yield from ConnectionBase .close (self , noreply_wait = noreply_wait ))
367+ return (await ConnectionBase .close (self , noreply_wait = noreply_wait ))
0 commit comments