Skip to content

Commit 7f098a1

Browse files
craig[bot]aerfreistevendannaZhouXing19
committed
152448: changefeedccl: protect system tables with their own pts record r=andyyang890 a=aerfrei Previously, we were relying on each pts record that protects a user table to also protect system tables. This would mean that we'd protect the system tables once for each table. Now, we have a dedicated pts record for system tables that protects back to the highwater and do not include the system table protections in the table specific records. We only protect the system tables once. Fixes: #152446 Epic: CRDB-1421 Release note: None 155701: jobsprotectedtsccl: move to jobsprotectedts r=msbutler a=stevendanna These tests are now allowed to live in the jobs package so the jobsccl package is no longer require. Epic: none Release note: None 156190: changefeedccl: update timestamp on event descriptor cache hits r=andyyang890 a=aerfrei Before, in some cases we could replan a CDC query with an old timestamp from the event descriptor cache, which would try to read the database descriptor at an old potentially garbage collected timestamp. Now, we make sure the event descriptor cache always gives events descriptors with an up to date timestamp. Epic: none Fixes: #156091 Release note (bug fix): A bug where changefeeds using CDC queries could sometimes unexpectedly fail after a schema change with a descriptor retrieval error has been fixed. 156416: sql/jsonpath: support index acceleration with AnyKey (`*`) at the chain end r=ZhouXing19 a=ZhouXing19 fixes: #156340 We now support index accelerating `jsonb_path_exists` filters with json path expression that ends with an AnyKey (`*`). Note that the AnyKey is allowed only at the end of the expression. I.e. the following are not allowed: ``` $.a.*.b $.a.b.*.* ``` Release note (sql change): We now support index accelerating `jsonb_path_exists` filters with json path expression that ends with an AnyKey (`*`). Co-authored-by: Aerin Freilich <aerin.freilich@cockroachlabs.com> Co-authored-by: Steven Danna <danna@cockroachlabs.com> Co-authored-by: ZhouXing19 <zhouxing@uchicago.edu>
5 parents 39159bd + 2fd5cac + 39e594b + d3cbbf7 + c22a945 commit 7f098a1

File tree

23 files changed

+769
-146
lines changed

23 files changed

+769
-146
lines changed

pkg/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ ALL_TESTS = [
4848
"//pkg/ccl/cloudccl/externalconn:externalconn_test",
4949
"//pkg/ccl/cloudccl/gcp:gcp_test",
5050
"//pkg/ccl/importerccl:importerccl_test",
51-
"//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test",
5251
"//pkg/ccl/jwtauthccl:jwtauthccl_test",
5352
"//pkg/ccl/kvccl/kvfollowerreadsccl:kvfollowerreadsccl_test",
5453
"//pkg/ccl/kvccl/kvtenantccl/upgradeccl:upgradeccl_test",
@@ -212,6 +211,7 @@ ALL_TESTS = [
212211
"//pkg/jobs/jobsauth:jobsauth_test",
213212
"//pkg/jobs/jobspb:jobspb_test",
214213
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
214+
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
215215
"//pkg/jobs:jobs_test",
216216
"//pkg/keys:keys_test",
217217
"//pkg/keyvisualizer/spanstatsconsumer:spanstatsconsumer_test",
@@ -976,7 +976,6 @@ GO_TARGETS = [
976976
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry_lib",
977977
"//pkg/ccl/gssapiccl:gssapiccl",
978978
"//pkg/ccl/importerccl:importerccl_test",
979-
"//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test",
980979
"//pkg/ccl/jwtauthccl:jwtauthccl",
981980
"//pkg/ccl/jwtauthccl:jwtauthccl_test",
982981
"//pkg/ccl/kvccl/kvfollowerreadsccl:kvfollowerreadsccl",
@@ -1432,6 +1431,7 @@ GO_TARGETS = [
14321431
"//pkg/jobs/jobsprofiler:jobsprofiler",
14331432
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
14341433
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
1434+
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
14351435
"//pkg/jobs/jobstest:jobstest",
14361436
"//pkg/jobs/metricspoller:metricspoller",
14371437
"//pkg/jobs:jobs",

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ go_test(
282282
"//pkg/kv/kvserver/kvserverbase",
283283
"//pkg/kv/kvserver/protectedts",
284284
"//pkg/kv/kvserver/protectedts/ptpb",
285+
"//pkg/kv/kvserver/protectedts/ptutil",
285286
"//pkg/multitenant/tenantcapabilitiespb",
286287
"//pkg/roachpb",
287288
"//pkg/scheduledjobs",

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,14 @@ func getEventDescriptorCached(
521521
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}
522522

523523
if v, ok := cache.Get(idVer); ok {
524-
ed := v.(*EventDescriptor)
525-
if catalog.UserDefinedTypeColsHaveSameVersion(ed.td, desc) {
526-
return ed, nil
524+
cached := v.(*EventDescriptor)
525+
if catalog.UserDefinedTypeColsHaveSameVersion(cached.td, desc) {
526+
// Make a shallow copy to avoid modifying the cached value. The cached
527+
// EventDescriptor is shared across changefeed operations and may be
528+
// referenced concurrently.
529+
ed := *cached
530+
ed.SchemaTS = schemaTS
531+
return &ed, nil
527532
}
528533
}
529534

pkg/ccl/changefeedccl/cdcprogresspb/progress.proto

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,20 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcp
99

1010
import "gogoproto/gogo.proto";
1111

12-
// ProtectedTimestampRecords is a map from table descriptor IDs to protected timestamp record IDs.
12+
// ProtectedTimestampRecords contains the IDs of protected timestamps records
13+
// for a changefeed.
1314
message ProtectedTimestampRecords {
14-
map<uint32, bytes> protected_timestamp_records = 1 [
15+
// UserTables is a map from the ID of a user table to the protected timestamp
16+
// record that protects it.
17+
map<uint32, bytes> user_tables = 1 [
1518
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID",
1619
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
1720
(gogoproto.nullable) = false
1821
];
22+
// SystemTables is the ID of PTS record that protects the system tables
23+
// back to the changefeed's highwater.
24+
bytes system_tables = 2 [
25+
(gogoproto.nullable) = false,
26+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
27+
];
1928
}

pkg/ccl/changefeedccl/changefeed_job_info_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestChangefeedJobInfoRoundTrip(t *testing.T) {
3535
uuid1 := uuid.MakeV4()
3636
uuid2 := uuid.MakeV4()
3737
ptsRecords := cdcprogresspb.ProtectedTimestampRecords{
38-
ProtectedTimestampRecords: map[descpb.ID]uuid.UUID{
38+
UserTables: map[descpb.ID]uuid.UUID{
3939
descpb.ID(100): uuid1,
4040
descpb.ID(200): uuid2,
4141
},

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1999,7 +1999,19 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19991999
}()
20002000

20012001
if cf.spec.ProgressConfig != nil && cf.spec.ProgressConfig.PerTableProtectedTimestamps {
2002-
return cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
2002+
updatedPerTablePTS, err :=
2003+
cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater, pts)
2004+
if err != nil {
2005+
return false, err
2006+
}
2007+
2008+
updatedSystemTablesPTS, err :=
2009+
cf.advanceSystemTablesProtectedTimestamp(ctx, txn, &ptsEntries, highwater, pts)
2010+
if err != nil {
2011+
return false, err
2012+
}
2013+
2014+
return updatedPerTablePTS || updatedSystemTablesPTS, nil
20032015
}
20042016

20052017
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
@@ -2010,8 +2022,8 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20102022
txn isql.Txn,
20112023
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20122024
highwater hlc.Timestamp,
2025+
pts protectedts.Storage,
20132026
) (updatedPerTablePTS bool, err error) {
2014-
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
20152027
tableIDsToCreate := make(map[descpb.ID]hlc.Timestamp)
20162028
for tableID, frontier := range cf.frontier.Frontiers() {
20172029
tableHighWater := func() hlc.Timestamp {
@@ -2023,14 +2035,13 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20232035
return frontier.Frontier()
20242036
}()
20252037

2026-
if ptsEntries.ProtectedTimestampRecords[tableID] != uuid.Nil {
2038+
if ptsEntries.UserTables[tableID] != uuid.Nil {
20272039
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
20282040
return false, err
20292041
} else if updated {
20302042
updatedPerTablePTS = true
20312043
}
20322044
} else {
2033-
// TODO(#152448): Do not include system table protections in these records.
20342045
// TODO(#153894): Newly added/dropped tables should be caught and
20352046
// protected when starting the frontier, not here.
20362047
tableIDsToCreate[tableID] = tableHighWater
@@ -2061,7 +2072,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20612072
tableHighWater hlc.Timestamp,
20622073
pts protectedts.Storage,
20632074
) (updated bool, err error) {
2064-
rec, err := pts.GetRecord(ctx, ptsEntries.ProtectedTimestampRecords[tableID])
2075+
rec, err := pts.GetRecord(ctx, ptsEntries.UserTables[tableID])
20652076
if err != nil {
20662077
return false, err
20672078
}
@@ -2071,7 +2082,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20712082
return false, nil
20722083
}
20732084

2074-
if err := pts.UpdateTimestamp(ctx, ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
2085+
if err := pts.UpdateTimestamp(ctx, ptsEntries.UserTables[tableID], tableHighWater); err != nil {
20752086
return false, err
20762087
}
20772088
return true, nil
@@ -2083,19 +2094,19 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20832094
tableIDsToCreate map[descpb.ID]hlc.Timestamp,
20842095
pts protectedts.Storage,
20852096
) error {
2086-
if ptsEntries.ProtectedTimestampRecords == nil {
2087-
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]uuid.UUID)
2097+
if ptsEntries.UserTables == nil {
2098+
ptsEntries.UserTables = make(map[descpb.ID]uuid.UUID)
20882099
}
20892100
for tableID, tableHighWater := range tableIDsToCreate {
20902101
targets, err := cf.createPerTablePTSTargets(tableID)
20912102
if err != nil {
20922103
return err
20932104
}
2094-
ptr := createProtectedTimestampRecord(
2105+
ptr := createUserTablesProtectedTimestampRecord(
20952106
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
20962107
)
20972108
uuid := ptr.ID.GetUUID()
2098-
ptsEntries.ProtectedTimestampRecords[tableID] = uuid
2109+
ptsEntries.UserTables[tableID] = uuid
20992110
if err := pts.Protect(ctx, ptr); err != nil {
21002111
return err
21012112
}
@@ -2124,14 +2135,63 @@ func (cf *changeFrontier) createPerTablePTSTargets(
21242135
return targets, nil
21252136
}
21262137

2138+
func (cf *changeFrontier) advanceSystemTablesProtectedTimestamp(
2139+
ctx context.Context,
2140+
txn isql.Txn,
2141+
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2142+
timestamp hlc.Timestamp,
2143+
pts protectedts.Storage,
2144+
) (updated bool, err error) {
2145+
if ptsEntries.SystemTables == uuid.Nil {
2146+
// All changefeeds using per-table PTS records should have a system tables
2147+
// PTS record. If they are missing one, it should be made when starting the
2148+
// changefeed.
2149+
return false, errors.AssertionFailedf("expected system tables PTS record to be present")
2150+
}
2151+
2152+
rec, err := pts.GetRecord(ctx, ptsEntries.SystemTables)
2153+
if err != nil {
2154+
return false, err
2155+
}
2156+
2157+
if !makeSystemTablesTargetToProtect().Equal(rec.Target) {
2158+
if cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets() {
2159+
return false, nil
2160+
}
2161+
if err := cf.remakeSystemTablesPTSRecord(ctx, txn, pts, ptsEntries, timestamp); err != nil {
2162+
return false, err
2163+
}
2164+
log.VEventf(
2165+
ctx, 2, "remade system tables PTS record %v to include all targets",
2166+
ptsEntries.SystemTables,
2167+
)
2168+
return true, nil
2169+
}
2170+
2171+
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
2172+
if rec.Timestamp.AddDuration(ptsUpdateLag).After(timestamp) {
2173+
return false, nil
2174+
}
2175+
2176+
if err := pts.UpdateTimestamp(ctx, ptsEntries.SystemTables, timestamp); err != nil {
2177+
return false, err
2178+
}
2179+
return true, nil
2180+
}
2181+
2182+
// advanceProtectedTimestamp advances the single PTS record for changefeeds that
2183+
// are not using per-table protected timestamps.
21272184
func (cf *changeFrontier) advanceProtectedTimestamp(
21282185
ctx context.Context,
21292186
progress *jobspb.ChangefeedProgress,
21302187
pts protectedts.Storage,
21312188
timestamp hlc.Timestamp,
21322189
) (updated bool, err error) {
21332190
if progress.ProtectedTimestampRecord == uuid.Nil {
2134-
ptr := createProtectedTimestampRecord(
2191+
// For changefeeds not using per-table PTS, system tables are protected
2192+
// in the single PTS record for the changefeed with all other targets
2193+
// in a combined record.
2194+
ptr := createCombinedProtectedTimestampRecord(
21352195
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, timestamp,
21362196
)
21372197
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
@@ -2159,7 +2219,7 @@ func (cf *changeFrontier) advanceProtectedTimestamp(
21592219
// If we've identified more tables that need to be protected since this
21602220
// changefeed was created, it will be missing here. If so, we "migrate" it
21612221
// to include all the appropriate targets.
2162-
if !makeTargetToProtect(cf.targets).Equal(rec.Target) {
2222+
if !makeCombinedTargetToProtect(cf.targets).Equal(rec.Target) {
21632223
if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets {
21642224
return false, nil
21652225
}
@@ -2190,7 +2250,7 @@ func (cf *changeFrontier) remakePTSRecord(
21902250
resolved hlc.Timestamp,
21912251
) error {
21922252
prevRecordId := progress.ProtectedTimestampRecord
2193-
ptr := createProtectedTimestampRecord(
2253+
ptr := createCombinedProtectedTimestampRecord(
21942254
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, resolved,
21952255
)
21962256
if err := pts.Protect(ctx, ptr); err != nil {
@@ -2207,6 +2267,29 @@ func (cf *changeFrontier) remakePTSRecord(
22072267
return nil
22082268
}
22092269

2270+
func (cf *changeFrontier) remakeSystemTablesPTSRecord(
2271+
ctx context.Context,
2272+
txn isql.Txn,
2273+
pts protectedts.Storage,
2274+
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2275+
resolved hlc.Timestamp,
2276+
) error {
2277+
ptr := createSystemTablesProtectedTimestampRecord(
2278+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, resolved,
2279+
)
2280+
if err := pts.Protect(ctx, ptr); err != nil {
2281+
return err
2282+
}
2283+
prevRecordId := ptsEntries.SystemTables
2284+
if err := pts.Release(ctx, prevRecordId); err != nil {
2285+
return err
2286+
}
2287+
ptsEntries.SystemTables = ptr.ID.GetUUID()
2288+
log.Eventf(ctx, "created new system tables pts record %v to replace old pts record %v at %v",
2289+
ptsEntries.SystemTables, prevRecordId, resolved)
2290+
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2291+
}
2292+
22102293
func (cf *changeFrontier) maybeEmitResolved(ctx context.Context, newResolved hlc.Timestamp) error {
22112294
if cf.freqEmitResolved == emitNoResolved || newResolved.IsEmpty() {
22122295
return nil

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,9 @@ func changefeedPlanHook(
327327
// perTablePTSRecords is the per-table protected timestamp records which exist when
328328
// per-table protected timestamps are enabled.
329329
var perTablePTSRecords []*ptpb.Record
330+
// systemTablesPTSRecord is the system tables protected timestamp record which exists when
331+
// per-table protected timestamps are enabled.
332+
var systemTablesPTSRecord *ptpb.Record
330333
// ptsRecords is the protected timestamp records object containing all per-table protected
331334
// timestamp records. Its format matches what will be persisted to the job info table.
332335
var ptsRecords *cdcprogresspb.ProtectedTimestampRecords
@@ -339,9 +342,11 @@ func changefeedPlanHook(
339342
if usingPerTablePTS {
340343
protectedTimestampRecords := make(map[descpb.ID]uuid.UUID)
341344
if err := targets.EachTarget(func(target changefeedbase.Target) error {
345+
// TODO(#155957): We are likely leaking PTS records here in
346+
// the column families case.
342347
ptsTargets := changefeedbase.Targets{}
343348
ptsTargets.Add(target)
344-
ptsRecord := createProtectedTimestampRecord(
349+
ptsRecord := createUserTablesProtectedTimestampRecord(
345350
ctx,
346351
codec,
347352
jobID,
@@ -355,11 +360,18 @@ func changefeedPlanHook(
355360
}); err != nil {
356361
return err
357362
}
363+
systemTablesPTSRecord = createSystemTablesProtectedTimestampRecord(
364+
ctx,
365+
codec,
366+
jobID,
367+
details.StatementTime,
368+
)
358369
ptsRecords = &cdcprogresspb.ProtectedTimestampRecords{
359-
ProtectedTimestampRecords: protectedTimestampRecords,
370+
UserTables: protectedTimestampRecords,
371+
SystemTables: systemTablesPTSRecord.ID.GetUUID(),
360372
}
361373
} else {
362-
ptr = createProtectedTimestampRecord(
374+
ptr = createCombinedProtectedTimestampRecord(
363375
ctx,
364376
codec,
365377
jobID,
@@ -380,12 +392,15 @@ func changefeedPlanHook(
380392
}
381393
}
382394
if usingPerTablePTS {
395+
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(p.InternalSQLTxn())
383396
for _, perTableRecord := range perTablePTSRecords {
384-
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(p.InternalSQLTxn())
385397
if err := pts.Protect(ctx, perTableRecord); err != nil {
386398
return err
387399
}
388400
}
401+
if err := pts.Protect(ctx, systemTablesPTSRecord); err != nil {
402+
return err
403+
}
389404
if err := writeChangefeedJobInfo(
390405
ctx, perTableProtectedTimestampsFilename, ptsRecords, p.InternalSQLTxn(), jobID,
391406
); err != nil {
@@ -428,6 +443,9 @@ func changefeedPlanHook(
428443
return err
429444
}
430445
}
446+
if err := pts.Protect(ctx, systemTablesPTSRecord); err != nil {
447+
return err
448+
}
431449
if err := writeChangefeedJobInfo(
432450
ctx, perTableProtectedTimestampsFilename, ptsRecords, txn, jobID,
433451
); err != nil {
@@ -1966,20 +1984,27 @@ func (b *changefeedResumer) OnFailOrCancel(
19661984
}
19671985

19681986
maybeCleanUpProtectedTimestamp(progress.GetChangefeed().ProtectedTimestampRecord)
1987+
// We clean up the per-table protected timestamps (and their accompanying
1988+
// system tables protected timestamp record) in a transaction since we need
1989+
// to read from the job info.
19691990
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
19701991
ptsEntries := cdcprogresspb.ProtectedTimestampRecords{}
19711992
if err := readChangefeedJobInfo(
19721993
ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, b.job.ID(),
19731994
); err != nil {
19741995
return err
19751996
}
1976-
1977-
if len(ptsEntries.ProtectedTimestampRecords) == 0 {
1997+
// In the event that the changefeed is not using per-table protected
1998+
// timestamps, the ptsEntries populated from the job info table
1999+
// (in the file perTableProtectedTimestampsFilename) will be empty.
2000+
// There is nothing to clean up, so we can safely return here.
2001+
if len(ptsEntries.UserTables) == 0 && ptsEntries.SystemTables == uuid.Nil {
19782002
return nil
19792003
}
1980-
for _, record := range ptsEntries.ProtectedTimestampRecords {
2004+
for _, record := range ptsEntries.UserTables {
19812005
maybeCleanUpProtectedTimestamp(record)
19822006
}
2007+
maybeCleanUpProtectedTimestamp(ptsEntries.SystemTables)
19832008
return deleteChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, txn, b.job.ID())
19842009
}); err != nil {
19852010
return err

0 commit comments

Comments
 (0)