@@ -36,12 +36,12 @@ var _ mutationPlanNode = &deleteNode{}
3636
3737// deleteRun contains the run-time state of deleteNode during local execution.
3838type deleteRun struct {
39- td tableDeleter
40- rowsNeeded bool
39+ mutationOutputHelper
40+ td tableDeleter
4141
42- // done informs a new call to BatchedNext() that the previous call
43- // to BatchedNext() has completed the work already .
44- done bool
42+ // rowsNeeded is set to true if the mutation operator needs to return the rows
43+ // that were affected by the mutation .
44+ rowsNeeded bool
4545
4646 // resultRowBuffer is used to prepare a result row for accumulation
4747 // into the row container above, when rowsNeeded is set.
@@ -76,7 +76,7 @@ func (r *deleteRun) init(params runParams, columns colinfo.ResultColumns) {
7676 return
7777 }
7878
79- r .td . rows = rowcontainer .NewRowContainer (
79+ r .rows = rowcontainer .NewRowContainer (
8080 params .p .Mon ().MakeBoundAccount (),
8181 colinfo .ColTypeInfoFromResCols (columns ),
8282 )
@@ -92,31 +92,34 @@ func (d *deleteNode) startExec(params runParams) error {
9292
9393 d .run .init (params , d .columns )
9494
95- return d .run .td .init (params .ctx , params .p .txn , params .EvalContext ())
96- }
95+ if err := d .run .td .init (params .ctx , params .p .txn , params .EvalContext ()); err != nil {
96+ return err
97+ }
9798
98- // Next is required because batchedPlanNode inherits from planNode, but
99- // batchedPlanNode doesn't really provide it. See the explanatory comments
100- // in plan_batch.go.
101- func (d * deleteNode ) Next (params runParams ) (bool , error ) { panic ("not valid" ) }
99+ // Run the mutation to completion.
100+ for {
101+ lastBatch , err := d .processBatch (params )
102+ if err != nil || lastBatch {
103+ return err
104+ }
105+ }
106+ }
102107
103- // Values is required because batchedPlanNode inherits from planNode, but
104- // batchedPlanNode doesn't really provide it. See the explanatory comments
105- // in plan_batch.go.
106- func ( d * deleteNode ) Values () tree. Datums { panic ( "not valid" ) }
108+ // Next implements the planNode interface.
109+ func ( d * deleteNode ) Next ( _ runParams ) ( bool , error ) {
110+ return d . run . next (), nil
111+ }
107112
108- // BatchedNext implements the batchedPlanNode interface.
109- func (d * deleteNode ) BatchedNext (params runParams ) (bool , error ) {
110- if d .run .done {
111- return false , nil
112- }
113+ // Values implements the planNode interface.
114+ func (d * deleteNode ) Values () tree.Datums {
115+ return d .run .values ()
116+ }
113117
114- // Advance one batch. First, clear the last batch.
115- d .run .td .clearLastBatch (params .ctx )
116- // Now consume/accumulate the rows for this batch.
117- lastBatch := false
118+ func (d * deleteNode ) processBatch (params runParams ) (lastBatch bool , err error ) {
119+ // Consume/accumulate the rows for this batch.
120+ lastBatch = false
118121 for {
119- if err : = params .p .cancelChecker .Check (); err != nil {
122+ if err = params .p .cancelChecker .Check (); err != nil {
120123 return false , err
121124 }
122125
@@ -131,7 +134,7 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
131134
132135 // Process the deletion of the current input row,
133136 // potentially accumulating the result row for later.
134- if err : = d .run .processSourceRow (params , d .input .Values ()); err != nil {
137+ if err = d .run .processSourceRow (params , d .input .Values ()); err != nil {
135138 return false , err
136139 }
137140
@@ -146,24 +149,21 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
146149 if ! lastBatch {
147150 // We only run/commit the batch if there were some rows processed
148151 // in this batch.
149- if err : = d .run .td .flushAndStartNewBatch (params .ctx ); err != nil {
152+ if err = d .run .td .flushAndStartNewBatch (params .ctx ); err != nil {
150153 return false , err
151154 }
152155 }
153156 }
154157
155158 if lastBatch {
156159 d .run .td .setRowsWrittenLimit (params .extendedEvalCtx .SessionData ())
157- if err : = d .run .td .finalize (params .ctx ); err != nil {
160+ if err = d .run .td .finalize (params .ctx ); err != nil {
158161 return false , err
159162 }
160- // Remember we're done for the next call to BatchedNext().
161- d .run .done = true
162163 // Possibly initiate a run of CREATE STATISTICS.
163- params .ExecCfg ().StatsRefresher .NotifyMutation (d .run .td .tableDesc (), int (d .run .td . rowsWritten ))
164+ params .ExecCfg ().StatsRefresher .NotifyMutation (d .run .td .tableDesc (), int (d .run .rowsAffected () ))
164165 }
165-
166- return d .run .td .lastBatchSize > 0 , nil
166+ return lastBatch , nil
167167}
168168
169169// processSourceRow processes one row from the source for deletion and, if
@@ -199,64 +199,60 @@ func (r *deleteRun) processSourceRow(params runParams, sourceVals tree.Datums) e
199199 ); err != nil {
200200 return err
201201 }
202+ r .onModifiedRow ()
203+ if ! r .rowsNeeded {
204+ return nil
205+ }
202206
203- // If result rows need to be accumulated, do it.
204- if r .td .rows != nil {
205- // The new values can include all columns, so the values may contain
206- // additional columns for every newly dropped column not visible. We do not
207- // want them to be available for RETURNING.
208- //
209- // r.rows.NumCols() is guaranteed to only contain the requested
210- // public columns.
211- largestRetIdx := - 1
212- for i := range r .rowIdxToRetIdx {
213- retIdx := r .rowIdxToRetIdx [i ]
214- if retIdx >= 0 {
215- if retIdx >= largestRetIdx {
216- largestRetIdx = retIdx
217- }
218- r .resultRowBuffer [retIdx ] = deleteVals [i ]
207+ // Result rows must be accumulated.
208+ //
209+ // The new values can include all columns, so the values may contain
210+ // additional columns for every newly dropped column not visible. We do not
211+ // want them to be available for RETURNING.
212+ //
213+ // r.rows.NumCols() is guaranteed to only contain the requested
214+ // public columns.
215+ largestRetIdx := - 1
216+ for i := range r .rowIdxToRetIdx {
217+ retIdx := r .rowIdxToRetIdx [i ]
218+ if retIdx >= 0 {
219+ if retIdx >= largestRetIdx {
220+ largestRetIdx = retIdx
219221 }
222+ r .resultRowBuffer [retIdx ] = deleteVals [i ]
220223 }
224+ }
221225
222- // At this point we've extracted all the RETURNING values that are part
223- // of the target table. We must now extract the columns in the RETURNING
224- // clause that refer to other tables (from the USING clause of the delete).
225- if r .numPassthrough > 0 {
226- passthroughBegin := len (r .td .rd .FetchCols )
227- passthroughEnd := passthroughBegin + r .numPassthrough
228- passthroughValues := deleteVals [passthroughBegin :passthroughEnd ]
229-
230- for i := 0 ; i < r .numPassthrough ; i ++ {
231- largestRetIdx ++
232- r .resultRowBuffer [largestRetIdx ] = passthroughValues [i ]
233- }
234-
226+ // At this point we've extracted all the RETURNING values that are part
227+ // of the target table. We must now extract the columns in the RETURNING
228+ // clause that refer to other tables (from the USING clause of the delete).
229+ if r .numPassthrough > 0 {
230+ passthroughBegin := len (r .td .rd .FetchCols )
231+ passthroughEnd := passthroughBegin + r .numPassthrough
232+ passthroughValues := deleteVals [passthroughBegin :passthroughEnd ]
233+
234+ for i := 0 ; i < r .numPassthrough ; i ++ {
235+ largestRetIdx ++
236+ r .resultRowBuffer [largestRetIdx ] = passthroughValues [i ]
235237 }
236238
237- if _ , err := r .td .rows .AddRow (params .ctx , r .resultRowBuffer ); err != nil {
238- return err
239- }
240239 }
241-
242- return nil
240+ return r .addRow (params .ctx , r .resultRowBuffer )
243241}
244242
245- // BatchedCount implements the batchedPlanNode interface.
246- func (d * deleteNode ) BatchedCount () int { return d .run .td .lastBatchSize }
247-
248- // BatchedValues implements the batchedPlanNode interface.
249- func (d * deleteNode ) BatchedValues (rowIdx int ) tree.Datums { return d .run .td .rows .At (rowIdx ) }
250-
251243func (d * deleteNode ) Close (ctx context.Context ) {
252244 d .input .Close (ctx )
253- d .run .td . close (ctx )
245+ d .run .close (ctx )
254246 * d = deleteNode {}
255247 deleteNodePool .Put (d )
256248}
257249
258250func (d * deleteNode ) rowsWritten () int64 {
259- return d .run .td .rowsWritten
251+ return d .run .rowsAffected ()
252+ }
253+
254+ func (d * deleteNode ) returnsRowsAffected () bool {
255+ return ! d .run .rowsNeeded
260256}
261257
262258func (d * deleteNode ) enableAutoCommit () {
0 commit comments