3939pQuery = ql2_pb2 .Query .QueryType
4040
4141
42- @asyncio .coroutine
43- def _read_until (streamreader , delimiter ):
42+ # @asyncio.coroutine
43+ async def _read_until (streamreader , delimiter ):
4444 """Naive implementation of reading until a delimiter"""
4545 buffer = bytearray ()
4646
4747 while True :
48- c = yield from streamreader .read (1 )
48+ # c = yield from streamreader.read(1)
49+ c = await streamreader .read (1 )
4950 if c == b"" :
5051 break # EOF
5152 buffer .append (c [0 ])
@@ -69,13 +70,14 @@ def reusable_waiter(loop, timeout):
6970 else :
7071 deadline = None
7172
72- @asyncio .coroutine
73- def wait (future ):
73+ # @asyncio.coroutine
74+ async def wait (future ):
7475 if deadline is not None :
7576 new_timeout = max (deadline - loop .time (), 0 )
7677 else :
7778 new_timeout = None
78- return (yield from asyncio .wait_for (future , new_timeout ))
79+ # return (yield from asyncio.wait_for(future, new_timeout))
80+ return (await asyncio .wait_for (future , new_timeout ))
7981
8082 return wait
8183
@@ -101,20 +103,22 @@ def __init__(self, *args, **kwargs):
101103 def __aiter__ (self ):
102104 return self
103105
104- @asyncio .coroutine
105- def __anext__ (self ):
106+ # @asyncio.coroutine
107+ async def __anext__ (self ):
106108 try :
107- return (yield from self ._get_next (None ))
109+ # return (yield from self._get_next(None))
110+ return (await self ._get_next (None ))
108111 except ReqlCursorEmpty :
109112 raise StopAsyncIteration
110113
111- @asyncio .coroutine
112- def close (self ):
114+ # @asyncio.coroutine
115+ async def close (self ):
113116 if self .error is None :
114117 self .error = self ._empty_error ()
115118 if self .conn .is_open ():
116119 self .outstanding_requests += 1
117- yield from self .conn ._parent ._stop (self )
120+ # yield from self.conn._parent._stop(self)
121+ await self .conn ._parent ._stop (self )
118122
119123 def _extend (self , res_buf ):
120124 Cursor ._extend (self , res_buf )
@@ -123,16 +127,17 @@ def _extend(self, res_buf):
123127
124128 # Convenience function so users know when they've hit the end of the cursor
125129 # without having to catch an exception
126- @asyncio .coroutine
127- def fetch_next (self , wait = True ):
130+ # @asyncio.coroutine
131+ async def fetch_next (self , wait = True ):
128132 timeout = Cursor ._wait_to_timeout (wait )
129133 waiter = reusable_waiter (self .conn ._io_loop , timeout )
130134 while len (self .items ) == 0 and self .error is None :
131135 self ._maybe_fetch_batch ()
132136 if self .error is not None :
133137 raise self .error
134138 with translate_timeout_errors ():
135- yield from waiter (asyncio .shield (self .new_response ))
139+ # yield from waiter(asyncio.shield(self.new_response))
140+ await waiter (asyncio .shield (self .new_response ))
136141 # If there is a (non-empty) error to be received, we return True, so the
137142 # user will receive it on the next `next` call.
138143 return len (self .items ) != 0 or not isinstance (self .error , RqlCursorEmpty )
@@ -142,15 +147,16 @@ def _empty_error(self):
142147 # with mechanisms to return from a coroutine.
143148 return RqlCursorEmpty ()
144149
145- @asyncio .coroutine
146- def _get_next (self , timeout ):
150+ # @asyncio.coroutine
151+ async def _get_next (self , timeout ):
147152 waiter = reusable_waiter (self .conn ._io_loop , timeout )
148153 while len (self .items ) == 0 :
149154 self ._maybe_fetch_batch ()
150155 if self .error is not None :
151156 raise self .error
152157 with translate_timeout_errors ():
153- yield from waiter (asyncio .shield (self .new_response ))
158+ # yield from waiter(asyncio.shield(self.new_response))
159+ await waiter (asyncio .shield (self .new_response ))
154160 return self .items .popleft ()
155161
156162 def _maybe_fetch_batch (self ):
@@ -186,8 +192,8 @@ def client_address(self):
186192 if self .is_open ():
187193 return self ._streamwriter .get_extra_info ("sockname" )[0 ]
188194
189- @asyncio .coroutine
190- def connect (self , timeout ):
195+ # @asyncio.coroutine
196+ async def connect (self , timeout ):
191197 try :
192198 ssl_context = None
193199 if len (self ._parent .ssl ) > 0 :
@@ -199,7 +205,8 @@ def connect(self, timeout):
199205 ssl_context .check_hostname = True # redundant with match_hostname
200206 ssl_context .load_verify_locations (self ._parent .ssl ["ca_certs" ])
201207
202- self ._streamreader , self ._streamwriter = yield from asyncio .open_connection (
208+ # self._streamreader, self._streamwriter = yield from asyncio.open_connection(
209+ self ._streamreader , self ._streamwriter = await asyncio .open_connection (
203210 self ._parent .host ,
204211 self ._parent .port ,
205212 ssl = ssl_context ,
@@ -229,22 +236,26 @@ def connect(self, timeout):
229236 if request != "" :
230237 self ._streamwriter .write (request )
231238
232- response = yield from asyncio .wait_for (
239+ # response = yield from asyncio.wait_for(
240+ response = await asyncio .wait_for (
233241 _read_until (self ._streamreader , b"\0 " ),
234242 timeout ,
235243 )
236244 response = response [:- 1 ]
237245 except ReqlAuthError :
238- yield from self .close ()
246+ # yield from self.close()
247+ await self .close ()
239248 raise
240249 except ReqlTimeoutError as err :
241- yield from self .close ()
250+ # yield from self.close()
251+ await self .close ()
242252 raise ReqlDriverError (
243253 "Connection interrupted during handshake with %s:%s. Error: %s"
244254 % (self ._parent .host , self ._parent .port , str (err ))
245255 )
246256 except Exception as err :
247- yield from self .close ()
257+ # yield from self.close()
258+ await self .close ()
248259 raise ReqlDriverError (
249260 "Could not connect to %s:%s. Error: %s"
250261 % (self ._parent .host , self ._parent .port , str (err ))
@@ -258,8 +269,8 @@ def connect(self, timeout):
258269 def is_open (self ):
259270 return not (self ._closing or self ._streamreader .at_eof ())
260271
261- @asyncio .coroutine
262- def close (self , noreply_wait = False , token = None , exception = None ):
272+ # @asyncio.coroutine
273+ async def close (self , noreply_wait = False , token = None , exception = None ):
263274 self ._closing = True
264275 if exception is not None :
265276 err_message = "Connection is closed (%s)." % str (exception )
@@ -279,38 +290,43 @@ def close(self, noreply_wait=False, token=None, exception=None):
279290
280291 if noreply_wait :
281292 noreply = Query (pQuery .NOREPLY_WAIT , token , None , None )
282- yield from self .run_query (noreply , False )
293+ # yield from self.run_query(noreply, False)
294+ await self .run_query (noreply , False )
283295
284296 self ._streamwriter .close ()
285297 # We must not wait for the _reader_task if we got an exception, because that
286298 # means that we were called from it. Waiting would lead to a deadlock.
287299 if self ._reader_task and exception is None :
288- yield from self ._reader_task
300+ # yield from self._reader_task
301+ await self ._reader_task
289302
290303 return None
291304
292- @asyncio .coroutine
293- def run_query (self , query , noreply ):
305+ # @asyncio.coroutine
306+ async def run_query (self , query , noreply ):
294307 self ._streamwriter .write (query .serialize (self ._parent ._get_json_encoder (query )))
295308 if noreply :
296309 return None
297310
298311 response_future = asyncio .Future ()
299312 self ._user_queries [query .token ] = (query , response_future )
300- return (yield from response_future )
313+ # return (yield from response_future)
314+ return (await response_future )
301315
302316 # The _reader coroutine runs in parallel, reading responses
303317 # off of the socket and forwarding them to the appropriate Future or Cursor.
304318 # This is shut down as a consequence of closing the stream, or an error in the
305319 # socket/protocol from the server. Unexpected errors in this coroutine will
306320 # close the ConnectionInstance and be passed to any open Futures or Cursors.
307- @asyncio .coroutine
308- def _reader (self ):
321+ # @asyncio.coroutine
322+ async def _reader (self ):
309323 try :
310324 while True :
311- buf = yield from self ._streamreader .readexactly (12 )
325+ # buf = yield from self._streamreader.readexactly(12)
326+ buf = await self ._streamreader .readexactly (12 )
312327 (token , length ,) = struct .unpack ("<qL" , buf )
313- buf = yield from self ._streamreader .readexactly (length )
328+ # buf = yield from self._streamreader.readexactly(length)
329+ buf = await self ._streamreader .readexactly (length )
314330
315331 cursor = self ._cursor_cache .get (token )
316332 if cursor is not None :
@@ -339,7 +355,8 @@ def _reader(self):
339355 raise ReqlDriverError ("Unexpected response received." )
340356 except Exception as ex :
341357 if not self ._closing :
342- yield from self .close (exception = ex )
358+ # yield from self.close(exception=ex)
359+ await self .close (exception = ex )
343360
344361
345362class Connection (ConnectionBase ):
@@ -352,30 +369,35 @@ def __init__(self, *args, **kwargs):
352369 "Could not convert port %s to an integer." % self .port
353370 )
354371
355- @asyncio .coroutine
356- def __aenter__ (self ):
372+ # @asyncio.coroutine
373+ async def __aenter__ (self ):
357374 return self
358375
359- @asyncio .coroutine
360- def __aexit__ (self , exception_type , exception_val , traceback ):
361- yield from self .close (False )
376+ # @asyncio.coroutine
377+ async def __aexit__ (self , exception_type , exception_val , traceback ):
378+ # yield from self.close(False)
379+ await self .close (False )
362380
363- @asyncio .coroutine
364- def _stop (self , cursor ):
381+ # @asyncio.coroutine
382+ async def _stop (self , cursor ):
365383 self .check_open ()
366384 q = Query (pQuery .STOP , cursor .query .token , None , None )
367- return (yield from self ._instance .run_query (q , True ))
385+ # return (yield from self._instance.run_query(q, True))
386+ return (await self ._instance .run_query (q , True ))
368387
369- @asyncio .coroutine
370- def reconnect (self , noreply_wait = True , timeout = None ):
388+ # @asyncio.coroutine
389+ async def reconnect (self , noreply_wait = True , timeout = None ):
371390 # We close before reconnect so reconnect doesn't try to close us
372391 # and then fail to return the Future (this is a little awkward).
373- yield from self .close (noreply_wait )
392+ # yield from self.close(noreply_wait)
393+ await self .close (noreply_wait )
374394 self ._instance = self ._conn_type (self , ** self ._child_kwargs )
375- return (yield from self ._instance .connect (timeout ))
395+ # return (yield from self._instance.connect(timeout))
396+ return (await self ._instance .connect (timeout ))
376397
377- @asyncio .coroutine
378- def close (self , noreply_wait = True ):
398+ # @asyncio.coroutine
399+ async def close (self , noreply_wait = True ):
379400 if self ._instance is None :
380401 return None
381- return (yield from ConnectionBase .close (self , noreply_wait = noreply_wait ))
402+ # return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
403+ return (await ConnectionBase .close (self , noreply_wait = noreply_wait ))
0 commit comments