Skip to content

Commit 7f174af

Browse files
committed
distsql: harden infrastructure in some edge cases
As part of looking into a nil pointer crash when rows produced by the EXPORT were used as an input to the mutation (which is fixed in the following commit), I noticed some problems with the distsql infrastructure. In particular, in f397730 we didn't properly implement `Close` method for some processors (two EXPORT ones and the column backfiller) which would mean possibly incomplete cleanup when the processor never runs. Additionally, the root cause of the panic was `nil` txn that previously silently could've been set at the very end of the flow setup if only at that point we decided that we need to create a leaf txn, yet LeafTxnInputState was nil. That condition is expected in some cases (when the flow is not running under a txn, like some bulk flows), yet for EXPORT which runs under a txn, this was unexpected. This commit adds an assertion for this. Furthermore, if we hit an error at this point, we need to perform cleanup differently from earlier error paths since we've already fully set up the flow. This commit also adds that. Release note: None
1 parent 1345a57 commit 7f174af

File tree

5 files changed

+41
-20
lines changed

5 files changed

+41
-20
lines changed

pkg/sql/backfill/backfill.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,12 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
280280

281281
// Close frees the resources used by the ColumnBackfiller.
282282
func (cb *ColumnBackfiller) Close(ctx context.Context) {
283-
cb.fetcher.Close(ctx)
284283
if cb.mon != nil {
284+
// fetcher is only initialized when mon has been set. If mon is nil,
285+
// then Close has already been called.
286+
cb.fetcher.Close(ctx)
285287
cb.mon.Stop(ctx)
288+
cb.mon = nil
286289
}
287290
}
288291

pkg/sql/distsql/server.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -200,26 +200,29 @@ func (ds *ServerImpl) setupFlow(
200200
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
201201
var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
202202
var onFlowCleanupEnd func(context.Context) // will be called at the very end of Flow.Cleanup()
203+
var cleanupPerformed bool
203204
// Make sure that we clean up all resources (which in the happy case are
204205
// cleaned up in Flow.Cleanup()) if an error is encountered.
205206
defer func() {
206207
if retErr != nil {
207-
if monitor != nil {
208-
monitor.Stop(ctx)
209-
}
210-
if diskMonitor != nil {
211-
diskMonitor.Stop(ctx)
212-
}
213-
if onFlowCleanupEnd != nil {
214-
onFlowCleanupEnd(ctx)
215-
} else {
216-
reserved.Close(ctx)
217-
onFlowCleanup.Do()
218-
}
219-
// We finish the span after performing other cleanup in case that
220-
// cleanup accesses the context with the span.
221-
if sp != nil {
222-
sp.Finish()
208+
if !cleanupPerformed {
209+
if monitor != nil {
210+
monitor.Stop(ctx)
211+
}
212+
if diskMonitor != nil {
213+
diskMonitor.Stop(ctx)
214+
}
215+
if onFlowCleanupEnd != nil {
216+
onFlowCleanupEnd(ctx)
217+
} else {
218+
reserved.Close(ctx)
219+
onFlowCleanup.Do()
220+
}
221+
// We finish the span after performing other cleanup in case that
222+
// cleanup accesses the context with the span.
223+
if sp != nil {
224+
sp.Finish()
225+
}
223226
}
224227
retCtx = tracing.ContextWithSpan(ctx, nil)
225228
}
@@ -267,6 +270,9 @@ func (ds *ServerImpl) setupFlow(
267270
makeLeaf := func(ctx context.Context) (*kv.Txn, error) {
268271
tis := req.LeafTxnInputState
269272
if tis == nil {
273+
if localState.Txn != nil {
274+
return nil, errors.AssertionFailedf("nil LeafTxnInputState when trying to create the LeafTxn")
275+
}
270276
// This must be a flow running for some bulk-io operation that doesn't use
271277
// a txn.
272278
return nil, nil
@@ -447,6 +453,12 @@ func (ds *ServerImpl) setupFlow(
447453
if leafTxn == nil {
448454
leafTxn, err = makeLeaf(ctx)
449455
if err != nil {
456+
// Given that we've already fully set up the flow, we must do
457+
// the full cleanup. This supersedes the cleanup done in the
458+
// defer at the beginning of the method, so we mark
459+
// cleanupPerformed accordingly.
460+
f.Cleanup(ctx)
461+
cleanupPerformed = true
450462
return nil, nil, nil, err
451463
}
452464
}

pkg/sql/export/exportcsv.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,4 +306,6 @@ func (sp *csvWriter) Resume(output execinfra.RowReceiver) {
306306
}
307307

308308
// Close is part of the execinfra.Processor interface.
309-
func (*csvWriter) Close(context.Context) {}
309+
func (sp *csvWriter) Close(context.Context) {
310+
sp.input.ConsumerClosed()
311+
}

pkg/sql/export/exportparquet.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,9 @@ func (sp *parquetWriterProcessor) Resume(output execinfra.RowReceiver) {
247247
}
248248

249249
// Close is part of the execinfra.Processor interface.
250-
func (*parquetWriterProcessor) Close(context.Context) {}
250+
func (sp *parquetWriterProcessor) Close(context.Context) {
251+
sp.input.ConsumerClosed()
252+
}
251253

252254
// Resume is part of the execinfra.Processor interface.
253255
func (sp *parquetWriterProcessor) testingKnobsOrNil() *TestingKnobs {

pkg/sql/rowexec/columnbackfiller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ func (*columnBackfiller) Resume(output execinfra.RowReceiver) {
107107
}
108108

109109
// Close is part of the execinfra.Processor interface.
110-
func (*columnBackfiller) Close(context.Context) {}
110+
func (cb *columnBackfiller) Close(ctx context.Context) {
111+
cb.ColumnBackfiller.Close(ctx)
112+
}
111113

112114
func (cb *columnBackfiller) doRun(ctx context.Context) *execinfrapb.ProducerMetadata {
113115
finishedSpans, err := cb.mainLoop(ctx)

0 commit comments

Comments
 (0)