@@ -721,6 +721,7 @@ cdef class AsyncThinConnImpl(BaseThinConnImpl):
721721 PipelineOpImpl op_impl = result_impl.operation
722722 uint8_t op_type = op_impl.op_type
723723 AsyncThinCursorImpl cursor_impl
724+ BindVar bind_var
724725
725726 # all operations other than commit make use of a cursor
726727 if op_type == PIPELINE_OP_TYPE_COMMIT:
@@ -732,23 +733,27 @@ cdef class AsyncThinConnImpl(BaseThinConnImpl):
732733
733734 # resend the message if that is required (for operations that fetch
734735 # LOBS, for example)
735- cursor_impl = message_with_data.cursor_impl
736+ cursor_impl = < AsyncThinCursorImpl > message_with_data.cursor_impl
736737 if message.resend:
737738 await protocol._process_message(message)
738- await message.postprocess_async()
739- if op_type in (
740- PIPELINE_OP_TYPE_FETCH_ONE,
741- PIPELINE_OP_TYPE_FETCH_MANY,
742- PIPELINE_OP_TYPE_FETCH_ALL,
743- ):
744- result_impl.rows = []
745- while cursor_impl._buffer_rowcount > 0 :
746- result_impl.rows.append(cursor_impl._create_row())
739+ await message.postprocess_async()
740+ if op_impl.op_type == PIPELINE_OP_TYPE_CALL_FUNC:
741+ bind_var = < BindVar> cursor_impl.bind_vars[0 ]
742+ result_impl.return_value = bind_var.var_impl.get_value(0 )
743+ elif op_type in (
744+ PIPELINE_OP_TYPE_FETCH_ONE,
745+ PIPELINE_OP_TYPE_FETCH_MANY,
746+ PIPELINE_OP_TYPE_FETCH_ALL,
747+ ):
748+ result_impl.rows = []
749+ while cursor_impl._buffer_rowcount > 0 :
750+ result_impl.rows.append(cursor_impl._create_row())
747751 result_impl.fetch_metadata = cursor_impl.fetch_metadata
748752
749753 # for fetchall(), perform as many round trips as are required to
750754 # complete the fetch
751- if op_type == PIPELINE_OP_TYPE_FETCH_ALL:
755+ if op_type == PIPELINE_OP_TYPE_FETCH_ALL \
756+ and cursor_impl._more_rows_to_fetch:
752757 fetch_message = cursor_impl._create_message(
753758 FetchMessage, message_with_data.cursor
754759 )
@@ -980,55 +985,6 @@ cdef class AsyncThinConnImpl(BaseThinConnImpl):
980985 messages.append(message)
981986 return messages
982987
983- cdef int _populate_pipeline_op_result(self , Message message) except - 1 :
984- """
985- Populates the pipeline operation result object.
986- """
987- cdef:
988- MessageWithData message_with_data
989- AsyncThinCursorImpl cursor_impl
990- PipelineOpResultImpl result_impl
991- PipelineOpImpl op_impl
992- BindVar bind_var
993- result_impl = message.pipeline_result_impl
994- op_impl = result_impl.operation
995- if op_impl.op_type == PIPELINE_OP_TYPE_COMMIT:
996- return 0
997- message_with_data = < MessageWithData> message
998- cursor_impl = < AsyncThinCursorImpl> message_with_data.cursor_impl
999- if op_impl.op_type == PIPELINE_OP_TYPE_CALL_FUNC:
1000- bind_var = < BindVar> cursor_impl.bind_vars[0 ]
1001- result_impl.return_value = bind_var.var_impl.get_value(0 )
1002- elif op_impl.op_type in (
1003- PIPELINE_OP_TYPE_FETCH_ONE,
1004- PIPELINE_OP_TYPE_FETCH_MANY,
1005- PIPELINE_OP_TYPE_FETCH_ALL,
1006- ):
1007- result_impl.rows = []
1008- while cursor_impl._buffer_rowcount > 0 :
1009- result_impl.rows.append(cursor_impl._create_row())
1010-
1011- cdef int _populate_pipeline_op_results(
1012- self , list messages, bint continue_on_error
1013- ) except - 1 :
1014- """
1015- Populates the pipeline operation result objects associated with the
1016- messages that were processed on the database.
1017- """
1018- cdef:
1019- PipelineOpResultImpl result_impl
1020- Message message
1021- for message in messages:
1022- result_impl = message.pipeline_result_impl
1023- if result_impl.error is not None or message.resend:
1024- continue
1025- try :
1026- self ._populate_pipeline_op_result(message)
1027- except Exception as e:
1028- if not continue_on_error:
1029- raise
1030- result_impl._capture_err(e)
1031-
1032988 async def _run_pipeline_op_without_pipelining(
1033989 self , object conn, PipelineOpResultImpl result_impl
1034990 ):
@@ -1262,7 +1218,6 @@ cdef class AsyncThinConnImpl(BaseThinConnImpl):
12621218 self .pipeline_mode = TNS_PIPELINE_MODE_ABORT_ON_ERROR
12631219 self ._send_messages_for_pipeline(messages, continue_on_error)
12641220 await protocol.end_pipeline(self , messages, continue_on_error)
1265- self ._populate_pipeline_op_results(messages, continue_on_error)
12661221 await self ._complete_pipeline_ops(messages, continue_on_error)
12671222
12681223 async def run_pipeline_without_pipelining(
0 commit comments