@@ -215,29 +215,21 @@ impl InputPort {
215215 pub fn pull_partial_data ( & self , limit : & DataBlockLimit ) -> Option < Result < DataBlock > > {
216216 unsafe {
217217 UpdateTrigger :: update_input ( & self . update_trigger ) ;
218+
218219 // retrieve data without changing flags to prevent concurrent modifications
219- match self . shared . swap ( std:: ptr:: null_mut ( ) , 0 , 0 ) {
220- address if address. is_null ( ) => None ,
221- address => {
222- let data_block = ( * Box :: from_raw ( address) ) . 0 ;
223- if let Ok ( data_block) = & data_block {
224- let total_rows = data_block. num_rows ( ) ;
225- let total_bytes = data_block. memory_size ( ) ;
226- let rows_to_take = limit. rows_to_take ( total_rows, total_bytes) ;
227- // dbg!(rows_to_take, total_rows);
228- // check if we need to split based on either rows or bytes limit
229- if rows_to_take < total_rows {
230- let ( need, remain) = data_block. split_at ( rows_to_take) ;
231- let remain_data = Box :: into_raw ( Box :: new ( SharedData ( Ok ( remain) ) ) ) ;
232- // put back the remainder without changing flags
233- // this keeps HAS_DATA set so we will have next pull to retrieve it
234- self . shared . swap ( remain_data, 0 , 0 ) ;
235- return Some ( Ok ( need) ) ;
236- }
237- }
238- // take entire datablock and unset the flags to signal ready for next loop
220+ let address = self . shared . swap ( std:: ptr:: null_mut ( ) , 0 , 0 ) ;
221+ if address. is_null ( ) {
222+ self . shared . set_flags ( 0 , HAS_DATA | NEED_DATA ) ;
223+ return None ;
224+ }
225+
226+ let shared_data = Box :: from_raw ( address) ;
227+ match shared_data. 0 {
228+ Ok ( data_block) => self . handle_data_block_with_limit ( data_block, limit) ,
229+ Err ( e) => {
230+ // for error cases, unset flags and return the error
239231 self . shared . set_flags ( 0 , HAS_DATA | NEED_DATA ) ;
240- Some ( data_block )
232+ Some ( Err ( e ) )
241233 }
242234 }
243235 }
@@ -256,6 +248,40 @@ impl InputPort {
256248 pub unsafe fn set_trigger ( & self , update_trigger : * mut UpdateTrigger ) {
257249 self . update_trigger . set_value ( update_trigger)
258250 }
251+
252+ /// Process datablock with size limit, potentially splitting it
253+ fn handle_data_block_with_limit (
254+ & self ,
255+ data_block : DataBlock ,
256+ limit : & DataBlockLimit ,
257+ ) -> Option < Result < DataBlock > > {
258+ let total_rows = data_block. num_rows ( ) ;
259+
260+ // empty datablock - consume entirely
261+ if total_rows == 0 {
262+ self . shared . set_flags ( 0 , HAS_DATA | NEED_DATA ) ;
263+ return Some ( Ok ( data_block) ) ;
264+ }
265+
266+ let total_bytes = data_block. memory_size ( ) ;
267+ let rows_to_take = limit. rows_to_take ( total_rows, total_bytes) ;
268+
269+ // no splitting needed - consume entire datablock
270+ if rows_to_take >= total_rows {
271+ self . shared . set_flags ( 0 , HAS_DATA | NEED_DATA ) ;
272+ return Some ( Ok ( data_block) ) ;
273+ }
274+
275+ // split datablock and store remainder
276+ let ( taken, remainder) = data_block. split_at ( rows_to_take) ;
277+ let remainder_ptr = Box :: into_raw ( Box :: new ( SharedData ( Ok ( remainder) ) ) ) ;
278+
279+ // put back the remainder without changing flags
280+ // this keeps HAS_DATA set so we will have next pull to retrieve it
281+ self . shared . swap ( remainder_ptr, 0 , 0 ) ;
282+
283+ Some ( Ok ( taken) )
284+ }
259285}
260286
261287pub struct OutputPort {
0 commit comments