Skip to content

Commit cc61b3c

Browse files
craig[bot]jeffswenson
andcommitted
Merge #156570
156570: logical: feature flag off UDF support r=jeffswenson a=jeffswenson This adds a logical_replication.deprecated_udf_writer.enabled feature flag that turns off LDR UDF support. The UDF writer will be permanently deleted after a full upgrade cycle. Fixes: #148312 Release note: none Co-authored-by: Jeff Swenson <jeffswenson@betterthannull.com>
2 parents de29e7d + d8b02f9 commit cc61b3c

File tree

6 files changed

+78
-0
lines changed

6 files changed

+78
-0
lines changed

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ func createLogicalReplicationStreamPlanHook(
117117

118118
hasUDF := len(options.userFunctions) > 0 || options.defaultFunction != nil && options.defaultFunction.FunctionId != 0
119119

120+
if hasUDF && !crosscluster.LogicalReplicationUDFWriterEnabled.Get(&p.ExecCfg().Settings.SV) {
121+
return pgerror.Newf(pgcode.FeatureNotSupported,
122+
"UDF-based logical replication is disabled and will be deleted in a future CockroachDB release. "+
123+
"Enable with SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
124+
}
125+
120126
mode := jobspb.LogicalReplicationDetails_Immediate
121127
if m, ok := options.GetMode(); ok {
122128
switch m {

pkg/crosscluster/logical/create_logical_replication_stmt_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"testing"
1212

1313
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
1415
"github.com/cockroachdb/cockroach/pkg/security/username"
1516
"github.com/cockroachdb/cockroach/pkg/sql"
1617
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -201,3 +202,27 @@ func TestResolveDestinationObjects(t *testing.T) {
201202
}
202203

203204
}
205+
206+
func TestUDFLogicalReplicationDisabled(t *testing.T) {
207+
defer leaktest.AfterTest(t)()
208+
defer log.Scope(t).Close(t)
209+
210+
ctx := context.Background()
211+
tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
212+
defer tc.Stopper().Stop(ctx)
213+
214+
runnerA.Exec(t, "CREATE TABLE test_tab(pk INT PRIMARY KEY, v INT)")
215+
runnerB.Exec(t, "CREATE TABLE test_tab(pk INT PRIMARY KEY, v INT)")
216+
217+
runnerB.Exec(t, `
218+
CREATE FUNCTION repl_apply(action STRING, proposed test_tab, existing test_tab, prev test_tab, existing_mvcc_timestamp DECIMAL, existing_origin_timestamp DECIMAL, proposed_mvcc_timestamp DECIMAL)
219+
RETURNS string AS $$ BEGIN RETURN 'accept_proposed'; END $$ LANGUAGE plpgsql
220+
`)
221+
222+
dbAURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("a"))
223+
224+
// Should fail with UDF writer disabled (default setting)
225+
runnerB.ExpectErr(t, "UDF-based logical replication is disabled",
226+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE test_tab ON $1 INTO TABLE test_tab WITH FUNCTION repl_apply FOR TABLE test_tab",
227+
dbAURL.String())
228+
}

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,31 @@ type logicalReplicationResumer struct {
8787

8888
var _ jobs.Resumer = (*logicalReplicationResumer)(nil)
8989

90+
func (r *logicalReplicationResumer) jobUsesUDF() bool {
91+
payload := r.job.Details().(jobspb.LogicalReplicationDetails)
92+
93+
if payload.DefaultConflictResolution.FunctionId != 0 {
94+
return true
95+
}
96+
97+
for _, pair := range payload.ReplicationPairs {
98+
if pair.DstFunctionID != 0 {
99+
return true
100+
}
101+
}
102+
103+
return false
104+
}
105+
90106
// Resume is part of the jobs.Resumer interface.
91107
func (r *logicalReplicationResumer) Resume(ctx context.Context, execCtx interface{}) error {
92108
jobExecCtx := execCtx.(sql.JobExecContext)
109+
110+
if r.jobUsesUDF() && !crosscluster.LogicalReplicationUDFWriterEnabled.Get(&jobExecCtx.ExecCfg().Settings.SV) {
111+
r.updateStatusMessage(ctx, "job paused because UDF-based logical replication writer is disabled")
112+
return jobs.MarkPauseRequestError(errors.Newf("UDF-based logical replication writer is disabled and will be deleted in a future CockroachDB release"))
113+
}
114+
93115
return r.handleResumeError(ctx, jobExecCtx, r.ingestWithRetries(ctx, jobExecCtx))
94116
}
95117

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,9 @@ func TestPreviouslyInterestingTables(t *testing.T) {
10271027
tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
10281028
defer tc.Stopper().Stop(ctx)
10291029

1030+
runnerA.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
1031+
runnerB.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
1032+
10301033
sqlA := s.SQLConn(t, serverutils.DBName("a"))
10311034

10321035
type testCase struct {
@@ -1841,6 +1844,9 @@ func TestLogicalStreamIngestionJobWithFallbackUDF(t *testing.T) {
18411844
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
18421845
defer server.Stopper().Stop(ctx)
18431846

1847+
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
1848+
dbB.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
1849+
18441850
lwwFunc := `CREATE OR REPLACE FUNCTION repl_apply(action STRING, proposed tab, existing tab, prev tab, existing_mvcc_timestamp DECIMAL, existing_origin_timestamp DECIMAL, proposed_mvcc_timestamp DECIMAL)
18451851
RETURNS string
18461852
AS $$

pkg/crosscluster/logical/udf_row_processor_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ func TestUDFWithRandomTables(t *testing.T) {
5353
tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
5454
defer tc.Stopper().Stop(ctx)
5555

56+
runnerA.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
57+
runnerB.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
58+
5659
tableName := "rand_table"
5760
rng, _ := randutil.NewPseudoRand()
5861
createStmt := randgen.RandCreateTableWithName(
@@ -108,6 +111,9 @@ func TestUDFInsertOnly(t *testing.T) {
108111
tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
109112
defer tc.Stopper().Stop(ctx)
110113

114+
runnerA.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
115+
runnerB.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
116+
111117
tableName := "tallies"
112118
stmt := "CREATE TABLE tallies(pk INT PRIMARY KEY, v INT)"
113119
runnerA.Exec(t, stmt)
@@ -157,6 +163,9 @@ func TestUDFPreviousValue(t *testing.T) {
157163
tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
158164
defer tc.Stopper().Stop(ctx)
159165

166+
runnerA.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
167+
runnerB.Exec(t, "SET CLUSTER SETTING logical_replication.deprecated_udf_writer.enabled = true")
168+
160169
tableName := "tallies"
161170
stmt := "CREATE TABLE tallies(pk INT PRIMARY KEY, v INT)"
162171
runnerA.Exec(t, stmt)

pkg/crosscluster/settings.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,13 @@ var LogicalReplanFrequency = settings.RegisterDurationSetting(
126126
10*time.Minute,
127127
settings.PositiveDuration,
128128
)
129+
130+
// LogicalReplicationUDFWriterEnabled controls whether the UDF-based logical
131+
// data replication writer is enabled. When disabled, existing UDF writer jobs
132+
// will be paused and new UDF LDR jobs cannot be created.
133+
var LogicalReplicationUDFWriterEnabled = settings.RegisterBoolSetting(
134+
settings.ApplicationLevel,
135+
"logical_replication.deprecated_udf_writer.enabled",
136+
"enables the UDF-based logical data replication writer (deprecated, will be removed)",
137+
false,
138+
)

0 commit comments

Comments
 (0)