3939pQuery = ql2_pb2 .Query .QueryType
4040
4141
42- # @asyncio.coroutine
4342async def _read_until (streamreader , delimiter ):
4443 """Naive implementation of reading until a delimiter"""
4544 buffer = bytearray ()
4645
4746 while True :
48- # c = yield from streamreader.read(1)
4947 c = await streamreader .read (1 )
5048 if c == b"" :
5149 break # EOF
@@ -70,13 +68,11 @@ def reusable_waiter(loop, timeout):
7068 else :
7169 deadline = None
7270
73- # @asyncio.coroutine
7471 async def wait (future ):
7572 if deadline is not None :
7673 new_timeout = max (deadline - loop .time (), 0 )
7774 else :
7875 new_timeout = None
79- # return (yield from asyncio.wait_for(future, new_timeout))
8076 return (await asyncio .wait_for (future , new_timeout ))
8177
8278 return wait
@@ -103,21 +99,17 @@ def __init__(self, *args, **kwargs):
10399 def __aiter__ (self ):
104100 return self
105101
106- # @asyncio.coroutine
107102 async def __anext__ (self ):
108103 try :
109- # return (yield from self._get_next(None))
110104 return (await self ._get_next (None ))
111105 except ReqlCursorEmpty :
112106 raise StopAsyncIteration
113107
114- # @asyncio.coroutine
115108 async def close (self ):
116109 if self .error is None :
117110 self .error = self ._empty_error ()
118111 if self .conn .is_open ():
119112 self .outstanding_requests += 1
120- # yield from self.conn._parent._stop(self)
121113 await self .conn ._parent ._stop (self )
122114
123115 def _extend (self , res_buf ):
@@ -127,7 +119,6 @@ def _extend(self, res_buf):
127119
128120 # Convenience function so users know when they've hit the end of the cursor
129121 # without having to catch an exception
130- # @asyncio.coroutine
131122 async def fetch_next (self , wait = True ):
132123 timeout = Cursor ._wait_to_timeout (wait )
133124 waiter = reusable_waiter (self .conn ._io_loop , timeout )
@@ -136,7 +127,6 @@ async def fetch_next(self, wait=True):
136127 if self .error is not None :
137128 raise self .error
138129 with translate_timeout_errors ():
139- # yield from waiter(asyncio.shield(self.new_response))
140130 await waiter (asyncio .shield (self .new_response ))
141131 # If there is a (non-empty) error to be received, we return True, so the
142132 # user will receive it on the next `next` call.
@@ -147,15 +137,13 @@ def _empty_error(self):
147137 # with mechanisms to return from a coroutine.
148138 return RqlCursorEmpty ()
149139
150- # @asyncio.coroutine
151140 async def _get_next (self , timeout ):
152141 waiter = reusable_waiter (self .conn ._io_loop , timeout )
153142 while len (self .items ) == 0 :
154143 self ._maybe_fetch_batch ()
155144 if self .error is not None :
156145 raise self .error
157146 with translate_timeout_errors ():
158- # yield from waiter(asyncio.shield(self.new_response))
159147 await waiter (asyncio .shield (self .new_response ))
160148 return self .items .popleft ()
161149
@@ -192,7 +180,6 @@ def client_address(self):
192180 if self .is_open ():
193181 return self ._streamwriter .get_extra_info ("sockname" )[0 ]
194182
195- # @asyncio.coroutine
196183 async def connect (self , timeout ):
197184 try :
198185 ssl_context = None
@@ -205,7 +192,6 @@ async def connect(self, timeout):
205192 ssl_context .check_hostname = True # redundant with match_hostname
206193 ssl_context .load_verify_locations (self ._parent .ssl ["ca_certs" ])
207194
208- # self._streamreader, self._streamwriter = yield from asyncio.open_connection(
209195 self ._streamreader , self ._streamwriter = await asyncio .open_connection (
210196 self ._parent .host ,
211197 self ._parent .port ,
@@ -236,25 +222,21 @@ async def connect(self, timeout):
236222 if request != "" :
237223 self ._streamwriter .write (request )
238224
239- # response = yield from asyncio.wait_for(
240225 response = await asyncio .wait_for (
241226 _read_until (self ._streamreader , b"\0 " ),
242227 timeout ,
243228 )
244229 response = response [:- 1 ]
245230 except ReqlAuthError :
246- # yield from self.close()
247231 await self .close ()
248232 raise
249233 except ReqlTimeoutError as err :
250- # yield from self.close()
251234 await self .close ()
252235 raise ReqlDriverError (
253236 "Connection interrupted during handshake with %s:%s. Error: %s"
254237 % (self ._parent .host , self ._parent .port , str (err ))
255238 )
256239 except Exception as err :
257- # yield from self.close()
258240 await self .close ()
259241 raise ReqlDriverError (
260242 "Could not connect to %s:%s. Error: %s"
@@ -269,7 +251,6 @@ async def connect(self, timeout):
269251 def is_open (self ):
270252 return not (self ._closing or self ._streamreader .at_eof ())
271253
272- # @asyncio.coroutine
273254 async def close (self , noreply_wait = False , token = None , exception = None ):
274255 self ._closing = True
275256 if exception is not None :
@@ -290,42 +271,35 @@ async def close(self, noreply_wait=False, token=None, exception=None):
290271
291272 if noreply_wait :
292273 noreply = Query (pQuery .NOREPLY_WAIT , token , None , None )
293- # yield from self.run_query(noreply, False)
294274 await self .run_query (noreply , False )
295275
296276 self ._streamwriter .close ()
297277 # We must not wait for the _reader_task if we got an exception, because that
298278 # means that we were called from it. Waiting would lead to a deadlock.
299279 if self ._reader_task and exception is None :
300- # yield from self._reader_task
301280 await self ._reader_task
302281
303282 return None
304283
305- # @asyncio.coroutine
306284 async def run_query (self , query , noreply ):
307285 self ._streamwriter .write (query .serialize (self ._parent ._get_json_encoder (query )))
308286 if noreply :
309287 return None
310288
311289 response_future = asyncio .Future ()
312290 self ._user_queries [query .token ] = (query , response_future )
313- # return (yield from response_future)
314291 return (await response_future )
315292
316293 # The _reader coroutine runs in parallel, reading responses
317294 # off of the socket and forwarding them to the appropriate Future or Cursor.
318295 # This is shut down as a consequence of closing the stream, or an error in the
319296 # socket/protocol from the server. Unexpected errors in this coroutine will
320297 # close the ConnectionInstance and be passed to any open Futures or Cursors.
321- # @asyncio.coroutine
322298 async def _reader (self ):
323299 try :
324300 while True :
325- # buf = yield from self._streamreader.readexactly(12)
326301 buf = await self ._streamreader .readexactly (12 )
327302 (token , length ,) = struct .unpack ("<qL" , buf )
328- # buf = yield from self._streamreader.readexactly(length)
329303 buf = await self ._streamreader .readexactly (length )
330304
331305 cursor = self ._cursor_cache .get (token )
@@ -355,7 +329,6 @@ async def _reader(self):
355329 raise ReqlDriverError ("Unexpected response received." )
356330 except Exception as ex :
357331 if not self ._closing :
358- # yield from self.close(exception=ex)
359332 await self .close (exception = ex )
360333
361334
@@ -369,35 +342,25 @@ def __init__(self, *args, **kwargs):
369342 "Could not convert port %s to an integer." % self .port
370343 )
371344
372- # @asyncio.coroutine
373345 async def __aenter__ (self ):
374346 return self
375347
376- # @asyncio.coroutine
377348 async def __aexit__ (self , exception_type , exception_val , traceback ):
378- # yield from self.close(False)
379349 await self .close (False )
380350
381- # @asyncio.coroutine
382351 async def _stop (self , cursor ):
383352 self .check_open ()
384353 q = Query (pQuery .STOP , cursor .query .token , None , None )
385- # return (yield from self._instance.run_query(q, True))
386354 return (await self ._instance .run_query (q , True ))
387355
388- # @asyncio.coroutine
389356 async def reconnect (self , noreply_wait = True , timeout = None ):
390357 # We close before reconnect so reconnect doesn't try to close us
391358 # and then fail to return the Future (this is a little awkward).
392- # yield from self.close(noreply_wait)
393359 await self .close (noreply_wait )
394360 self ._instance = self ._conn_type (self , ** self ._child_kwargs )
395- # return (yield from self._instance.connect(timeout))
396361 return (await self ._instance .connect (timeout ))
397362
398- # @asyncio.coroutine
399363 async def close (self , noreply_wait = True ):
400364 if self ._instance is None :
401365 return None
402- # return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
403366 return (await ConnectionBase .close (self , noreply_wait = noreply_wait ))
0 commit comments