Skip to content

Commit 4863e2d

Browse files
craig[bot]jeffswenson
andcommitted
Merge #155878
155878: logical: decode into local types r=jeffswenson a=jeffswenson Previously, the event decoder used by the crud writer copied the behavior of the legacy sql writer and did not decode all UDTs into local types. Once we start using prepared statements, we need datums to have the correct type. This reworks the decoder so the types are mapped into their local representation. Release note: none Epic: CRDB-48647 Co-authored-by: Jeff Swenson <jeffswenson@betterthannull.com>
2 parents 769e7c7 + fb31ef2 commit 4863e2d

File tree

2 files changed

+117
-29
lines changed

2 files changed

+117
-29
lines changed

pkg/crosscluster/logical/event_decoder.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
// that are appropriate for the destination table.
2727
type eventDecoder struct {
2828
decoder cdcevent.Decoder
29-
srcToDest map[descpb.ID]destinationTable
29+
srcToDest map[descpb.ID]*destinationTable
3030

3131
// TODO(jeffswenson): clean this interface up. There's a problem with
3232
// layering that requires the event decoder to know about the most recent
@@ -70,7 +70,7 @@ func newEventDecoder(
7070
settings *cluster.Settings,
7171
procConfigByDestID map[descpb.ID]sqlProcessorTableConfig,
7272
) (*eventDecoder, error) {
73-
srcToDest := make(map[descpb.ID]destinationTable, len(procConfigByDestID))
73+
srcToDest := make(map[descpb.ID]*destinationTable, len(procConfigByDestID))
7474
err := descriptors.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
7575
for dstID, s := range procConfigByDestID {
7676
descriptor, err := txn.Descriptors().GetLeasedImmutableTableByID(ctx, txn.KV(), dstID)
@@ -80,13 +80,16 @@ func newEventDecoder(
8080

8181
columns := getColumnSchema(descriptor)
8282
columnNames := make([]string, 0, len(columns))
83+
columnTypes := make([]*types.T, 0, len(columns))
8384
for _, column := range columns {
8485
columnNames = append(columnNames, column.column.GetName())
86+
columnTypes = append(columnTypes, column.column.GetType().Canonical())
8587
}
8688

87-
srcToDest[s.srcDesc.GetID()] = destinationTable{
88-
id: dstID,
89-
columns: columnNames,
89+
srcToDest[s.srcDesc.GetID()] = &destinationTable{
90+
id: dstID,
91+
columns: columnNames,
92+
columnTypes: columnTypes,
9093
}
9194
}
9295
return nil
@@ -185,8 +188,7 @@ func (d *eventDecoder) decodeEvent(
185188
return decodedEvent{}, errors.AssertionFailedf("table %d not found", decodedRow.TableID)
186189
}
187190

188-
row := make(tree.Datums, 0, len(dstTable.columns))
189-
row, err = appendDatums(row, decodedRow, dstTable.columns)
191+
row, err := dstTable.toLocalDatums(decodedRow)
190192
if err != nil {
191193
return decodedEvent{}, err
192194
}
@@ -201,8 +203,7 @@ func (d *eventDecoder) decodeEvent(
201203
return decodedEvent{}, err
202204
}
203205

204-
prevRow := make(tree.Datums, 0, len(dstTable.columns))
205-
prevRow, err = appendDatums(prevRow, decodedPrevRow, dstTable.columns)
206+
prevRow, err := dstTable.toLocalDatums(decodedPrevRow)
206207
if err != nil {
207208
return decodedEvent{}, err
208209
}
@@ -216,37 +217,43 @@ func (d *eventDecoder) decodeEvent(
216217
}, nil
217218
}
218219

219-
// appendDatums appends datums for the specified column names from the cdcevent.Row
220-
// to the datums slice and returns the updated slice.
221-
func appendDatums(datums tree.Datums, row cdcevent.Row, columnNames []string) (tree.Datums, error) {
222-
it, err := row.DatumsNamed(columnNames)
220+
// toLocalDatums creates a row with the types of the datums converted to to the
221+
// types required by the remote descriptor. toLocalDatums takes ownerhsip of
222+
// the input row and may modify datums from the decoded input row.
223+
func (d *destinationTable) toLocalDatums(row cdcevent.Row) (tree.Datums, error) {
224+
localRow := make(tree.Datums, len(d.columns))
225+
226+
it, err := row.DatumsNamed(d.columns)
223227
if err != nil {
224228
return nil, err
225229
}
226230

227-
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
228-
if dEnum, ok := d.(*tree.DEnum); ok {
229-
// Override the type to Unknown to avoid a mismatched type OID error
230-
// during execution. Note that Unknown is the type used by default
231-
// when a SQL statement is executed without type hints.
232-
//
233-
// TODO(jeffswenson): this feels like the wrong place to do this,
234-
// but its inspired by the implementation in queryBuilder.AddRow.
235-
//
236-
// Really we should be mapping from the source datum type to the
237-
// destination datum type.
238-
dEnum.EnumTyp = types.Unknown
231+
columnIndex := 0
232+
if err := it.Datum(func(datum tree.Datum, col cdcevent.ResultColumn) error {
233+
typ := d.columnTypes[columnIndex]
234+
235+
switch d := datum.(type) {
236+
case *tree.DEnum:
237+
d.EnumTyp = typ
238+
case *tree.DArray:
239+
d.ParamTyp = typ
240+
case *tree.DTuple:
241+
datum = tree.NewDTuple(typ, d.D...)
239242
}
240-
datums = append(datums, d)
243+
244+
localRow[columnIndex] = datum
245+
246+
columnIndex += 1
241247
return nil
242248
}); err != nil {
243249
return nil, err
244250
}
245251

246-
return datums, nil
252+
return localRow, nil
247253
}
248254

249255
type destinationTable struct {
250-
id descpb.ID
251-
columns []string
256+
id descpb.ID
257+
columns []string
258+
columnTypes []*types.T
252259
}

pkg/crosscluster/logical/event_decoder_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1616
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1718
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1819
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
20+
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
1921
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2022
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2123
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2224
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2325
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2426
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
2528
"github.com/stretchr/testify/require"
2629
)
2730

@@ -197,3 +200,81 @@ func TestEventDecoder_DeduplicationWithDiscardDelete(t *testing.T) {
197200
require.Equal(t, tree.NewDString("inserted"), events[1].row[1])
198201
require.Equal(t, tree.DNull, events[1].prevRow[1])
199202
}
203+
204+
func TestEventDecoder_UserDefinedTypes(t *testing.T) {
205+
defer leaktest.AfterTest(t)()
206+
defer log.Scope(t).Close(t)
207+
208+
// TODO(jeffswenson): it would be nice if we could implement this test using
209+
// the randgen package, but randgen appears to be missing a utililty that
210+
// lets us create a random table random dependent UDTs.
211+
212+
ctx := context.Background()
213+
rng, _ := randutil.NewTestRand()
214+
215+
srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
216+
defer srv.Stopper().Stop(ctx)
217+
server := srv.ApplicationLayer()
218+
runner := sqlutils.MakeSQLRunner(sqlDB)
219+
220+
for _, db := range []string{"srcdb", "dstdb"} {
221+
runner.Exec(t, "CREATE DATABASE "+db)
222+
runner.Exec(t, "USE "+db)
223+
runner.Exec(t, "CREATE TYPE status_enum AS ENUM ('active', 'inactive')")
224+
runner.Exec(t, "CREATE TYPE metadata_type AS (key TEXT, value INT)")
225+
runner.Exec(t, `CREATE TABLE user_types (
226+
id INT PRIMARY KEY,
227+
status status_enum,
228+
tags TEXT[],
229+
metadata metadata_type
230+
)`)
231+
}
232+
233+
srcDesc := cdctest.GetHydratedTableDescriptor(t, server.ExecutorConfig(), "srcdb", "public", "user_types")
234+
dstDesc := cdctest.GetHydratedTableDescriptor(t, server.ExecutorConfig(), "dstdb", "public", "user_types")
235+
236+
decoder, err := newEventDecoder(ctx, server.InternalDB().(descs.DB), server.ClusterSettings(), map[descpb.ID]sqlProcessorTableConfig{
237+
dstDesc.GetID(): {srcDesc: srcDesc},
238+
})
239+
require.NoError(t, err)
240+
241+
eb := newKvEventBuilder(t, srcDesc.TableDesc())
242+
243+
// Build test row with user-defined types
244+
enumType := catalog.FindColumnByName(srcDesc, "status").GetType()
245+
arrayType := catalog.FindColumnByName(srcDesc, "tags").GetType()
246+
tupleType := catalog.FindColumnByName(srcDesc, "metadata").GetType()
247+
testRow := tree.Datums{
248+
tree.NewDInt(1),
249+
&tree.DEnum{
250+
EnumTyp: enumType,
251+
PhysicalRep: enumType.TypeMeta.EnumData.PhysicalRepresentations[0], // Use correct physical rep
252+
LogicalRep: enumType.TypeMeta.EnumData.LogicalRepresentations[0], // Use correct logical rep
253+
},
254+
randgen.RandDatum(rng, arrayType, false).(*tree.DArray),
255+
randgen.RandDatum(rng, tupleType, false).(*tree.DTuple),
256+
}
257+
258+
insertEvent := eb.insertEvent(server.Clock().Now(), testRow)
259+
events, err := decoder.decodeAndCoalesceEvents(ctx, []streampb.StreamEvent_KV{insertEvent}, jobspb.LogicalReplicationDetails_DiscardNothing)
260+
require.NoError(t, err)
261+
require.Len(t, events, 1)
262+
263+
row := events[0].row
264+
require.Len(t, row, 4)
265+
266+
// Verify enum type mapping
267+
enum := row[1].(*tree.DEnum)
268+
dstEnumType := catalog.FindColumnByName(dstDesc, "status").GetType()
269+
require.Equal(t, dstEnumType, enum.EnumTyp)
270+
271+
// Verify array type mapping
272+
array := row[2].(*tree.DArray)
273+
dstArrayType := catalog.FindColumnByName(dstDesc, "tags").GetType()
274+
require.Equal(t, dstArrayType, array.ParamTyp)
275+
276+
// Verify tuple type mapping
277+
tuple := row[3].(*tree.DTuple)
278+
dstTupleType := catalog.FindColumnByName(dstDesc, "metadata").GetType()
279+
require.Equal(t, dstTupleType, tuple.ResolvedType())
280+
}

0 commit comments

Comments
 (0)