Skip to content

Commit d169b57

Browse files
committed
wip
1 parent 056c336 commit d169b57

File tree

10 files changed

+57
-46
lines changed

10 files changed

+57
-46
lines changed

canal/expr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (ve *valueExpr) GetString() string { return ""
4343
func (ve *valueExpr) GetProjectionOffset() int { return 0 }
4444
func (ve *valueExpr) SetProjectionOffset(offset int) {}
4545
func (ve *valueExpr) Restore(ctx *format.RestoreCtx) error { return nil }
46-
func (ve *valueExpr) Accept(v ast.Visitor) (node ast.Node, ok bool) { return }
46+
func (ve *valueExpr) Accept(v ast.Visitor) (node ast.Node, ok bool) { return node, ok }
4747
func (ve *valueExpr) Text() string { return "" }
4848
func (ve *valueExpr) SetText(enc charset.Encoding, text string) {}
4949
func (ve *valueExpr) Format(w io.Writer) {}

canal/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (
249249
if err = c.eventHandler.OnTableChanged(header, db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
250250
return errors.Trace(err)
251251
}
252-
return
252+
return err
253253
}
254254

255255
func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {

client/auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func (c *Conn) writeAuthHandshake() error {
220220
c.ccaps&mysql.CLIENT_MULTI_STATEMENTS | c.ccaps&mysql.CLIENT_MULTI_RESULTS |
221221
c.ccaps&mysql.CLIENT_PS_MULTI_RESULTS | c.ccaps&mysql.CLIENT_CONNECT_ATTRS |
222222
c.ccaps&mysql.CLIENT_COMPRESS | c.ccaps&mysql.CLIENT_ZSTD_COMPRESSION_ALGORITHM |
223-
c.ccaps&mysql.CLIENT_LOCAL_FILES
223+
c.ccaps&mysql.CLIENT_LOCAL_FILES | c.ccaps&mysql.CLIENT_DEPRECATE_EOF
224224

225225
capability &^= c.clientExplicitOffCaps
226226

client/resp.go

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ func (c *Conn) readUntilEOF() (err error) {
2020
for {
2121
data, err = c.ReadPacket()
2222
if err != nil {
23-
return
23+
return err
2424
}
2525

2626
// EOF Packet
2727
if c.isEOFPacket(data) {
28-
return
28+
return err
2929
}
3030
}
3131
}
@@ -336,33 +336,16 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *mysql.Re
336336
}
337337

338338
func (c *Conn) readResultColumns(result *mysql.Result) (err error) {
339-
i := 0
340339
var data []byte
341340

342-
for {
341+
for i := range len(result.Fields) {
343342
rawPkgLen := len(result.RawPkg)
344343
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
345344
if err != nil {
346345
return err
347346
}
348347
data = result.RawPkg[rawPkgLen:]
349348

350-
// EOF Packet
351-
if c.isEOFPacket(data) {
352-
if c.capability&mysql.CLIENT_PROTOCOL_41 > 0 {
353-
result.Warnings = binary.LittleEndian.Uint16(data[1:])
354-
// todo add strict_mode, warning will be treat as error
355-
result.Status = binary.LittleEndian.Uint16(data[3:])
356-
c.status = result.Status
357-
}
358-
359-
if i != len(result.Fields) {
360-
err = mysql.ErrMalformPacket
361-
}
362-
363-
return err
364-
}
365-
366349
if result.Fields[i] == nil {
367350
result.Fields[i] = &mysql.Field{}
368351
}
@@ -372,8 +355,30 @@ func (c *Conn) readResultColumns(result *mysql.Result) (err error) {
372355
}
373356

374357
result.FieldNames[utils.ByteSliceToString(result.Fields[i].Name)] = i
358+
}
359+
360+
if !c.HasCapability(mysql.CLIENT_DEPRECATE_EOF) {
361+
// EOF Packet
362+
rawPkgLen := len(result.RawPkg)
363+
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
364+
if err != nil {
365+
return err
366+
}
367+
data = result.RawPkg[rawPkgLen:]
375368

376-
i++
369+
if c.isEOFPacket(data) {
370+
if c.capability&mysql.CLIENT_PROTOCOL_41 > 0 {
371+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
372+
// todo add strict_mode, warning will be treat as error
373+
result.Status = binary.LittleEndian.Uint16(data[3:])
374+
c.status = result.Status
375+
}
376+
return nil
377+
} else {
378+
return mysql.ErrMalformPacket
379+
}
380+
} else {
381+
return nil
377382
}
378383
}
379384

@@ -388,15 +393,21 @@ func (c *Conn) readResultRows(result *mysql.Result, isBinary bool) (err error) {
388393
}
389394
data = result.RawPkg[rawPkgLen:]
390395

391-
// EOF Packet
392-
if c.isEOFPacket(data) {
393-
if c.capability&mysql.CLIENT_PROTOCOL_41 > 0 {
396+
if data[0] == mysql.EOF_HEADER && len(data) <= 0xffffff {
397+
if c.HasCapability(mysql.CLIENT_DEPRECATE_EOF) {
398+
// Treat like OK
399+
affectedRows, _, n := mysql.LengthEncodedInt(data[1:])
400+
insertId, _, m := mysql.LengthEncodedInt(data[1+n:])
401+
result.Status = binary.LittleEndian.Uint16(data[1+n+m:])
402+
result.AffectedRows = affectedRows
403+
result.InsertId = insertId
404+
c.status = result.Status
405+
} else if c.capability&mysql.CLIENT_PROTOCOL_41 > 0 {
394406
result.Warnings = binary.LittleEndian.Uint16(data[1:])
395407
// todo add strict_mode, warning will be treat as error
396408
result.Status = binary.LittleEndian.Uint16(data[3:])
397409
c.status = result.Status
398410
}
399-
400411
break
401412
}
402413

mysql/error.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,5 @@ func ErrorCode(errMsg string) (code int) {
6262
var tmpStr string
6363
// golang scanf doesn't support %*,so I used a temporary variable
6464
_, _ = fmt.Sscanf(errMsg, "%s%d", &tmpStr, &code)
65-
return
65+
return code
6666
}

mysql/mysql_gtid.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ func parseInterval(str string) (i Interval, err error) {
4242
}
4343

4444
if err != nil {
45-
return
45+
return i, err
4646
}
4747

4848
if i.Stop <= i.Start {
4949
err = errors.Errorf("invalid interval format, must n[-n] and the end must >= start")
5050
}
5151

52-
return
52+
return i, err
5353
}
5454

5555
func (i Interval) String() string {

mysql/resultset_helper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func fieldType(value interface{}) (typ uint8, err error) {
143143
default:
144144
err = errors.Errorf("unsupport type %T for resultset", value)
145145
}
146-
return
146+
return typ, err
147147
}
148148

149149
func formatField(field *Field, value interface{}) error {

replication/event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,10 @@ func decodeSid(data []byte) (format GtidFormat, sidnr uint64) {
254254
masked := make([]byte, 8)
255255
copy(masked, data[1:7])
256256
sidnr = binary.LittleEndian.Uint64(masked)
257-
return
257+
return format, sidnr
258258
}
259259
sidnr = binary.LittleEndian.Uint64(data[:8])
260-
return
260+
return format, sidnr
261261
}
262262

263263
func (e *PreviousGTIDsEvent) Decode(data []byte) error {

replication/row_event.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -344,18 +344,18 @@ func (e *TableMapEvent) decodeIntSeq(v []byte) (ret []uint64, err error) {
344344
p += n
345345
ret = append(ret, i)
346346
}
347-
return
347+
return ret, err
348348
}
349349

350350
func (e *TableMapEvent) decodeDefaultCharset(v []byte) (ret []uint64, err error) {
351351
ret, err = e.decodeIntSeq(v)
352352
if err != nil {
353-
return
353+
return ret, err
354354
}
355355
if len(ret)%2 != 1 {
356356
return nil, errors.Errorf("Expect odd item in DefaultCharset but got %d", len(ret))
357357
}
358-
return
358+
return ret, err
359359
}
360360

361361
func (e *TableMapEvent) decodeColumnNames(v []byte) error {
@@ -390,7 +390,7 @@ func (e *TableMapEvent) decodeStrValue(v []byte) (ret [][][]byte, err error) {
390390
}
391391
ret = append(ret, vals)
392392
}
393-
return
393+
return ret, err
394394
}
395395

396396
func (e *TableMapEvent) decodeSimplePrimaryKey(v []byte) error {
@@ -561,7 +561,7 @@ func (e *TableMapEvent) Dump(w io.Writer) {
561561
// i must be in range [0, ColumnCount).
562562
func (e *TableMapEvent) Nullable(i int) (available, nullable bool) {
563563
if len(e.NullBitmap) == 0 {
564-
return
564+
return available, nullable
565565
}
566566
return true, e.NullBitmap[i/8]&(1<<uint(i%8)) != 0
567567
}
@@ -1083,7 +1083,7 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
10831083
data, err2 = mysql.DecompressMariadbData(data[pos:])
10841084
if err2 != nil {
10851085
//nolint:nakedret
1086-
return
1086+
return err2
10871087
}
10881088
pos = 0
10891089
}
@@ -1481,7 +1481,7 @@ func decodeString(data []byte, length int) (v string, n int) {
14811481
v = utils.ByteSliceToString(data[2:n])
14821482
}
14831483

1484-
return
1484+
return v, n
14851485
}
14861486

14871487
// ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137
@@ -1502,7 +1502,7 @@ func decodeDecimalDecompressValue(compIndx int, data []byte, mask uint8) (size i
15021502
case 4:
15031503
value = uint32(data[3]^mask) | uint32(data[2]^mask)<<8 | uint32(data[1]^mask)<<16 | uint32(data[0]^mask)<<24
15041504
}
1505-
return
1505+
return size, value
15061506
}
15071507

15081508
var zeros = [digitsPerInteger]byte{48, 48, 48, 48, 48, 48, 48, 48, 48}
@@ -1625,7 +1625,7 @@ func decodeBit(data []byte, nbits int, length int) (value int64, err error) {
16251625
value = int64(data[0])
16261626
}
16271627
}
1628-
return
1628+
return value, err
16291629
}
16301630

16311631
func littleDecodeBit(data []byte, nbits int, length int) (value int64, err error) {
@@ -1657,7 +1657,7 @@ func littleDecodeBit(data []byte, nbits int, length int) (value int64, err error
16571657
value = int64(data[0])
16581658
}
16591659
}
1660-
return
1660+
return value, err
16611661
}
16621662

16631663
func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Location) (interface{}, int, error) {
@@ -1859,7 +1859,7 @@ func decodeBlob(data []byte, meta uint16) (v []byte, n int, err error) {
18591859
err = fmt.Errorf("invalid blob packlen = %d", meta)
18601860
}
18611861

1862-
return
1862+
return v, n, err
18631863
}
18641864

18651865
func (e *RowsEvent) Dump(w io.Writer) {

serialization/serialization.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (f *Format) stringParts() (parts []string) {
5858
parts = append(parts, fmt.Sprintf(" Value: %s", f.Type.String()))
5959
}
6060
}
61-
return
61+
return parts
6262
}
6363

6464
// Field represents a `message_field`

0 commit comments

Comments
 (0)