tidb glue_checkpoint 源码
tidb glue_checkpoint 代码
文件路径:/br/pkg/lightning/checkpoints/glue_checkpoint.go
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkpoints
import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
type Session interface {
Close()
Execute(context.Context, string) ([]sqlexec.RecordSet, error)
CommitTxn(context.Context) error
RollbackTxn(context.Context)
PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error)
ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error)
DropPreparedStmt(stmtID uint32) error
}
// GlueCheckpointsDB is almost same with MySQLCheckpointsDB, but it uses TiDB's internal data structure which requires a
// lot to keep same with database/sql.
// TODO: Encapsulate Begin/Commit/Rollback txn, form SQL with args and query/iter/scan TiDB's RecordSet into a interface
// to reuse MySQLCheckpointsDB.
type GlueCheckpointsDB struct {
// getSessionFunc will get a new session from TiDB
getSessionFunc func() (Session, error)
schema string
}
var _ DB = (*GlueCheckpointsDB)(nil)
// dropPreparedStmt drops the statement and when meet an error,
// print an error message.
func dropPreparedStmt(ctx context.Context, session Session, stmtID uint32) {
if err := session.DropPreparedStmt(stmtID); err != nil {
log.FromContext(ctx).Error("failed to drop prepared statement", log.ShortError(err))
}
}
func NewGlueCheckpointsDB(ctx context.Context, se Session, f func() (Session, error), schemaName string) (*GlueCheckpointsDB, error) {
var escapedSchemaName strings.Builder
common.WriteMySQLIdentifier(&escapedSchemaName, schemaName)
schema := escapedSchemaName.String()
logger := log.FromContext(ctx).With(zap.String("schema", schemaName))
sql := fmt.Sprintf(CreateDBTemplate, schema)
err := common.Retry("create checkpoints database", logger, func() error {
_, err := se.Execute(ctx, sql)
return err
})
if err != nil {
return nil, errors.Trace(err)
}
sql = fmt.Sprintf(CreateTaskTableTemplate, schema, CheckpointTableNameTask)
err = common.Retry("create task checkpoints table", logger, func() error {
_, err := se.Execute(ctx, sql)
return err
})
if err != nil {
return nil, errors.Trace(err)
}
sql = fmt.Sprintf(CreateTableTableTemplate, schema, CheckpointTableNameTable)
err = common.Retry("create table checkpoints table", logger, func() error {
_, err := se.Execute(ctx, sql)
return err
})
if err != nil {
return nil, errors.Trace(err)
}
sql = fmt.Sprintf(CreateEngineTableTemplate, schema, CheckpointTableNameEngine)
err = common.Retry("create engine checkpoints table", logger, func() error {
_, err := se.Execute(ctx, sql)
return err
})
if err != nil {
return nil, errors.Trace(err)
}
sql = fmt.Sprintf(CreateChunkTableTemplate, schema, CheckpointTableNameChunk)
err = common.Retry("create chunks checkpoints table", logger, func() error {
_, err := se.Execute(ctx, sql)
return err
})
if err != nil {
return nil, errors.Trace(err)
}
return &GlueCheckpointsDB{
getSessionFunc: f,
schema: schema,
}, nil
}
func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error {
logger := log.FromContext(ctx)
se, err := g.getSessionFunc()
if err != nil {
return errors.Trace(err)
}
defer se.Close()
err = Transact(ctx, "insert checkpoints", se, logger, func(c context.Context, s Session) error {
stmtID, _, _, err := s.PrepareStmt(fmt.Sprintf(InitTaskTemplate, g.schema, CheckpointTableNameTask))
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, stmtID)
_, err = s.ExecutePreparedStmt(c, stmtID, []types.Datum{
types.NewIntDatum(cfg.TaskID),
types.NewStringDatum(cfg.Mydumper.SourceDir),
types.NewStringDatum(cfg.TikvImporter.Backend),
types.NewStringDatum(cfg.TikvImporter.Addr),
types.NewStringDatum(cfg.TiDB.Host),
types.NewIntDatum(int64(cfg.TiDB.Port)),
types.NewStringDatum(cfg.TiDB.PdAddr),
types.NewStringDatum(cfg.TikvImporter.SortedKVDir),
types.NewStringDatum(build.ReleaseVersion),
})
if err != nil {
return errors.Trace(err)
}
stmtID2, _, _, err := s.PrepareStmt(fmt.Sprintf(InitTableTemplate, g.schema, CheckpointTableNameTable))
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, stmtID2)
for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
_, err = s.ExecutePreparedStmt(c, stmtID2, []types.Datum{
types.NewIntDatum(cfg.TaskID),
types.NewStringDatum(tableName),
types.NewIntDatum(0),
types.NewIntDatum(table.ID),
})
if err != nil {
return errors.Trace(err)
}
}
}
return nil
})
return errors.Trace(err)
}
func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error) {
logger := log.FromContext(ctx)
sql := fmt.Sprintf(ReadTaskTemplate, g.schema, CheckpointTableNameTask)
se, err := g.getSessionFunc()
if err != nil {
return nil, errors.Trace(err)
}
defer se.Close()
var taskCp *TaskCheckpoint
err = common.Retry("fetch task checkpoint", logger, func() error {
rs, err := se.Execute(ctx, sql)
if err != nil {
return errors.Trace(err)
}
r := rs[0]
//nolint: errcheck
defer r.Close()
req := r.NewChunk(nil)
err = r.Next(ctx, req)
if err != nil {
return err
}
if req.NumRows() == 0 {
return nil
}
row := req.GetRow(0)
taskCp = &TaskCheckpoint{}
taskCp.TaskID = row.GetInt64(0)
taskCp.SourceDir = row.GetString(1)
taskCp.Backend = row.GetString(2)
taskCp.ImporterAddr = row.GetString(3)
taskCp.TiDBHost = row.GetString(4)
taskCp.TiDBPort = int(row.GetInt64(5))
taskCp.PdAddr = row.GetString(6)
taskCp.SortedKVDir = row.GetString(7)
taskCp.LightningVer = row.GetString(8)
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return taskCp, nil
}
func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error) {
cp := &TableCheckpoint{
Engines: map[int32]*EngineCheckpoint{},
}
logger := log.FromContext(ctx).With(zap.String("table", tableName))
se, err := g.getSessionFunc()
if err != nil {
return nil, errors.Trace(err)
}
defer se.Close()
tableName = common.InterpolateMySQLString(tableName)
err = Transact(ctx, "read checkpoint", se, logger, func(c context.Context, s Session) error {
// 1. Populate the engines.
sql := fmt.Sprintf(ReadEngineTemplate, g.schema, CheckpointTableNameEngine)
sql = strings.ReplaceAll(sql, "?", tableName)
rs, err := s.Execute(ctx, sql)
if err != nil {
return errors.Trace(err)
}
r := rs[0]
req := r.NewChunk(nil)
it := chunk.NewIterator4Chunk(req)
for {
err = r.Next(ctx, req)
if err != nil {
_ = r.Close()
return err
}
if req.NumRows() == 0 {
break
}
for row := it.Begin(); row != it.End(); row = it.Next() {
engineID := int32(row.GetInt64(0))
status := uint8(row.GetUint64(1))
cp.Engines[engineID] = &EngineCheckpoint{
Status: CheckpointStatus(status),
}
}
}
_ = r.Close()
// 2. Populate the chunks.
sql = fmt.Sprintf(ReadChunkTemplate, g.schema, CheckpointTableNameChunk)
sql = strings.ReplaceAll(sql, "?", tableName)
rs, err = s.Execute(ctx, sql)
if err != nil {
return errors.Trace(err)
}
r = rs[0]
req = r.NewChunk(nil)
it = chunk.NewIterator4Chunk(req)
for {
err = r.Next(ctx, req)
if err != nil {
_ = r.Close()
return err
}
if req.NumRows() == 0 {
break
}
for row := it.Begin(); row != it.End(); row = it.Next() {
value := &ChunkCheckpoint{}
engineID := int32(row.GetInt64(0))
value.Key.Path = row.GetString(1)
value.Key.Offset = row.GetInt64(2)
value.FileMeta.Type = mydump.SourceType(row.GetInt64(3))
value.FileMeta.Compression = mydump.Compression(row.GetInt64(4))
value.FileMeta.SortKey = row.GetString(5)
value.FileMeta.FileSize = row.GetInt64(6)
colPerm := row.GetBytes(7)
value.Chunk.Offset = row.GetInt64(8)
value.Chunk.EndOffset = row.GetInt64(9)
value.Chunk.PrevRowIDMax = row.GetInt64(10)
value.Chunk.RowIDMax = row.GetInt64(11)
kvcBytes := row.GetUint64(12)
kvcKVs := row.GetUint64(13)
kvcChecksum := row.GetUint64(14)
value.Timestamp = row.GetInt64(15)
value.FileMeta.Path = value.Key.Path
value.Checksum = verify.MakeKVChecksum(kvcBytes, kvcKVs, kvcChecksum)
if err := json.Unmarshal(colPerm, &value.ColumnPermutation); err != nil {
_ = r.Close()
return errors.Trace(err)
}
cp.Engines[engineID].Chunks = append(cp.Engines[engineID].Chunks, value)
}
}
_ = r.Close()
// 3. Fill in the remaining table info
sql = fmt.Sprintf(ReadTableRemainTemplate, g.schema, CheckpointTableNameTable)
sql = strings.ReplaceAll(sql, "?", tableName)
rs, err = s.Execute(ctx, sql)
if err != nil {
return errors.Trace(err)
}
r = rs[0]
//nolint: errcheck
defer r.Close()
req = r.NewChunk(nil)
err = r.Next(ctx, req)
if err != nil {
return err
}
if req.NumRows() == 0 {
return nil
}
row := req.GetRow(0)
cp.Status = CheckpointStatus(row.GetUint64(0))
cp.AllocBase = row.GetInt64(1)
cp.TableID = row.GetInt64(2)
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return cp, nil
}
func (g GlueCheckpointsDB) Close() error {
return nil
}
func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpointMap map[int32]*EngineCheckpoint) error {
logger := log.FromContext(ctx).With(zap.String("table", tableName))
se, err := g.getSessionFunc()
if err != nil {
return errors.Trace(err)
}
defer se.Close()
err = Transact(ctx, "update engine checkpoints", se, logger, func(c context.Context, s Session) error {
engineStmt, _, _, err := s.PrepareStmt(fmt.Sprintf(ReplaceEngineTemplate, g.schema, CheckpointTableNameEngine))
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, engineStmt)
chunkStmt, _, _, err := s.PrepareStmt(fmt.Sprintf(ReplaceChunkTemplate, g.schema, CheckpointTableNameChunk))
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, chunkStmt)
for engineID, engine := range checkpointMap {
_, err := s.ExecutePreparedStmt(c, engineStmt, []types.Datum{
types.NewStringDatum(tableName),
types.NewIntDatum(int64(engineID)),
types.NewUintDatum(uint64(engine.Status)),
})
if err != nil {
return errors.Trace(err)
}
for _, value := range engine.Chunks {
columnPerm, err := json.Marshal(value.ColumnPermutation)
if err != nil {
return errors.Trace(err)
}
_, err = s.ExecutePreparedStmt(c, chunkStmt, []types.Datum{
types.NewStringDatum(tableName),
types.NewIntDatum(int64(engineID)),
types.NewStringDatum(value.Key.Path),
types.NewIntDatum(value.Key.Offset),
types.NewIntDatum(int64(value.FileMeta.Type)),
types.NewIntDatum(int64(value.FileMeta.Compression)),
types.NewStringDatum(value.FileMeta.SortKey),
types.NewIntDatum(value.FileMeta.FileSize),
types.NewBytesDatum(columnPerm),
types.NewIntDatum(value.Chunk.Offset),
types.NewIntDatum(value.Chunk.EndOffset),
types.NewIntDatum(value.Chunk.PrevRowIDMax),
types.NewIntDatum(value.Chunk.RowIDMax),
types.NewIntDatum(value.Timestamp),
})
if err != nil {
return errors.Trace(err)
}
}
}
return nil
})
return errors.Trace(err)
}
func (g GlueCheckpointsDB) Update(ctx context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error {
logger := log.FromContext(ctx)
se, err := g.getSessionFunc()
if err != nil {
log.FromContext(ctx).Error("can't get a session to update GlueCheckpointsDB", zap.Error(errors.Trace(err)))
return err
}
defer se.Close()
chunkQuery := fmt.Sprintf(UpdateChunkTemplate, g.schema, CheckpointTableNameChunk)
rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, g.schema, CheckpointTableNameTable)
tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, g.schema, CheckpointTableNameTable)
engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, g.schema, CheckpointTableNameEngine)
return Transact(context.Background(), "update checkpoints", se, logger, func(c context.Context, s Session) error {
chunkStmt, _, _, err := s.PrepareStmt(chunkQuery)
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, chunkStmt)
rebaseStmt, _, _, err := s.PrepareStmt(rebaseQuery)
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, rebaseStmt)
tableStatusStmt, _, _, err := s.PrepareStmt(tableStatusQuery)
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, tableStatusStmt)
engineStatusStmt, _, _, err := s.PrepareStmt(engineStatusQuery)
if err != nil {
return errors.Trace(err)
}
defer dropPreparedStmt(ctx, s, engineStatusStmt)
for tableName, cpd := range checkpointDiffs {
if cpd.hasStatus {
_, err := s.ExecutePreparedStmt(c, tableStatusStmt, []types.Datum{
types.NewUintDatum(uint64(cpd.status)),
types.NewStringDatum(tableName),
})
if err != nil {
return errors.Trace(err)
}
}
if cpd.hasRebase {
_, err := s.ExecutePreparedStmt(c, rebaseStmt, []types.Datum{
types.NewIntDatum(cpd.allocBase),
types.NewStringDatum(tableName),
})
if err != nil {
return errors.Trace(err)
}
}
for engineID, engineDiff := range cpd.engines {
if engineDiff.hasStatus {
_, err := s.ExecutePreparedStmt(c, engineStatusStmt, []types.Datum{
types.NewUintDatum(uint64(engineDiff.status)),
types.NewStringDatum(tableName),
types.NewIntDatum(int64(engineID)),
})
if err != nil {
return errors.Trace(err)
}
}
for key, diff := range engineDiff.chunks {
columnPerm, err := json.Marshal(diff.columnPermutation)
if err != nil {
return errors.Trace(err)
}
_, err = s.ExecutePreparedStmt(c, chunkStmt, []types.Datum{
types.NewIntDatum(diff.pos),
types.NewIntDatum(diff.rowID),
types.NewUintDatum(diff.checksum.SumSize()),
types.NewUintDatum(diff.checksum.SumKVS()),
types.NewUintDatum(diff.checksum.Sum()),
types.NewBytesDatum(columnPerm),
types.NewStringDatum(tableName),
types.NewIntDatum(int64(engineID)),
types.NewStringDatum(key.Path),
types.NewIntDatum(key.Offset),
})
if err != nil {
return errors.Trace(err)
}
}
}
}
return nil
})
}
func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error {
logger := log.FromContext(ctx).With(zap.String("table", tableName))
se, err := g.getSessionFunc()
if err != nil {
return errors.Trace(err)
}
defer se.Close()
if tableName == allTables {
return common.Retry("remove all checkpoints", logger, func() error {
_, err := se.Execute(ctx, "DROP SCHEMA "+g.schema)
return err
})
}
tableName = common.InterpolateMySQLString(tableName)
deleteChunkQuery := fmt.Sprintf(DeleteCheckpointRecordTemplate, g.schema, CheckpointTableNameChunk)
deleteChunkQuery = strings.ReplaceAll(deleteChunkQuery, "?", tableName)
deleteEngineQuery := fmt.Sprintf(DeleteCheckpointRecordTemplate, g.schema, CheckpointTableNameEngine)
deleteEngineQuery = strings.ReplaceAll(deleteEngineQuery, "?", tableName)
deleteTableQuery := fmt.Sprintf(DeleteCheckpointRecordTemplate, g.schema, CheckpointTableNameTable)
deleteTableQuery = strings.ReplaceAll(deleteTableQuery, "?", tableName)
return errors.Trace(Transact(ctx, "remove checkpoints", se, logger, func(c context.Context, s Session) error {
if _, e := s.Execute(c, deleteChunkQuery); e != nil {
return e
}
if _, e := s.Execute(c, deleteEngineQuery); e != nil {
return e
}
if _, e := s.Execute(c, deleteTableQuery); e != nil {
return e
}
return nil
}))
}
func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error {
newSchema := fmt.Sprintf("`%s.%d.bak`", g.schema[1:len(g.schema)-1], taskID)
logger := log.FromContext(ctx).With(zap.Int64("taskID", taskID))
se, err := g.getSessionFunc()
if err != nil {
return errors.Trace(err)
}
defer se.Close()
err = common.Retry("create backup checkpoints schema", logger, func() error {
_, err := se.Execute(ctx, "CREATE SCHEMA IF NOT EXISTS "+newSchema)
return err
})
if err != nil {
return errors.Trace(err)
}
for _, tbl := range []string{
CheckpointTableNameChunk, CheckpointTableNameEngine,
CheckpointTableNameTable, CheckpointTableNameTask,
} {
query := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", g.schema, newSchema, tbl)
err := common.Retry(fmt.Sprintf("move %s checkpoints table", tbl), logger, func() error {
_, err := se.Execute(ctx, query)
return err
})
if err != nil {
return errors.Trace(err)
}
}
return nil
}
func (g GlueCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error) {
se, err := g.getSessionFunc()
if err != nil {
return nil, errors.Trace(err)
}
defer se.Close()
var targetTables map[string][]int32
// lightning didn't check CheckpointStatusMaxInvalid before this function is called, so we skip invalid ones
// engines should exist if
// 1. table status is earlier than CheckpointStatusIndexImported, and
// 2. engine status is earlier than CheckpointStatusImported, and
// 3. chunk has been read
query := fmt.Sprintf(`
SELECT DISTINCT t.table_name, c.engine_id
FROM %s.%s t, %s.%s c, %s.%s e
WHERE t.table_name = c.table_name AND t.table_name = e.table_name AND c.engine_id = e.engine_id
AND %d < t.status AND t.status < %d
AND %d < e.status AND e.status < %d
AND c.pos > c.offset;`,
g.schema, CheckpointTableNameTable, g.schema, CheckpointTableNameChunk, g.schema, CheckpointTableNameEngine,
CheckpointStatusMaxInvalid, CheckpointStatusIndexImported,
CheckpointStatusMaxInvalid, CheckpointStatusImported)
err = common.Retry("get local storing tables", log.FromContext(ctx), func() error {
targetTables = make(map[string][]int32)
rs, err := se.Execute(ctx, query)
if err != nil {
return errors.Trace(err)
}
rows, err := drainFirstRecordSet(ctx, rs)
if err != nil {
return errors.Trace(err)
}
for _, row := range rows {
tableName := row.GetString(0)
engineID := int32(row.GetInt64(1))
targetTables[tableName] = append(targetTables[tableName], engineID)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return targetTables, err
}
func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error {
logger := log.FromContext(ctx).With(zap.String("table", tableName))
se, err := g.getSessionFunc()
if err != nil {
return errors.Trace(err)
}
defer se.Close()
var colName string
if tableName == allTables {
// This will expand to `WHERE 'all' = 'all'` and effectively allowing
// all tables to be included.
colName = stringLitAll
} else {
colName = columnTableName
}
tableName = common.InterpolateMySQLString(tableName)
engineQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = %s AND status <= %d;
`, g.schema, CheckpointTableNameEngine, CheckpointStatusLoaded, colName, tableName, CheckpointStatusMaxInvalid)
tableQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = %s AND status <= %d;
`, g.schema, CheckpointTableNameTable, CheckpointStatusLoaded, colName, tableName, CheckpointStatusMaxInvalid)
return errors.Trace(Transact(ctx, "ignore error checkpoints", se, logger, func(c context.Context, s Session) error {
if _, e := s.Execute(c, engineQuery); e != nil {
return e
}
if _, e := s.Execute(c, tableQuery); e != nil {
return e
}
return nil
}))
}
func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) {
logger := log.FromContext(ctx).With(zap.String("table", tableName))
se, err := g.getSessionFunc()
if err != nil {
return nil, errors.Trace(err)
}
defer se.Close()
var colName, aliasedColName string
if tableName == allTables {
// These will expand to `WHERE 'all' = 'all'` and effectively allowing
// all tables to be included.
colName = stringLitAll
aliasedColName = stringLitAll
} else {
colName = columnTableName
aliasedColName = "t.table_name"
}
tableName = common.InterpolateMySQLString(tableName)
selectQuery := fmt.Sprintf(`
SELECT
t.table_name,
COALESCE(MIN(e.engine_id), 0),
COALESCE(MAX(e.engine_id), -1)
FROM %[1]s.%[4]s t
LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name
WHERE %[2]s = %[6]s AND t.status <= %[3]d
GROUP BY t.table_name;
`, g.schema, aliasedColName, CheckpointStatusMaxInvalid, CheckpointTableNameTable, CheckpointTableNameEngine, tableName)
deleteChunkQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = %[6]s AND status <= %[3]d)
`, g.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameChunk, CheckpointTableNameTable, tableName)
deleteEngineQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = %[6]s AND status <= %[3]d)
`, g.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameEngine, CheckpointTableNameTable, tableName)
deleteTableQuery := fmt.Sprintf(`
DELETE FROM %s.%s WHERE %s = %s AND status <= %d
`, g.schema, CheckpointTableNameTable, colName, tableName, CheckpointStatusMaxInvalid)
var targetTables []DestroyedTableCheckpoint
err = Transact(ctx, "destroy error checkpoints", se, logger, func(c context.Context, s Session) error {
// clean because it's in a retry
targetTables = nil
rs, err := s.Execute(c, selectQuery)
if err != nil {
return errors.Trace(err)
}
r := rs[0]
req := r.NewChunk(nil)
it := chunk.NewIterator4Chunk(req)
for {
err = r.Next(ctx, req)
if err != nil {
_ = r.Close()
return err
}
if req.NumRows() == 0 {
break
}
for row := it.Begin(); row != it.End(); row = it.Next() {
var dtc DestroyedTableCheckpoint
dtc.TableName = row.GetString(0)
dtc.MinEngineID = int32(row.GetInt64(1))
dtc.MaxEngineID = int32(row.GetInt64(2))
targetTables = append(targetTables, dtc)
}
}
_ = r.Close()
if _, e := s.Execute(c, deleteChunkQuery); e != nil {
return errors.Trace(e)
}
if _, e := s.Execute(c, deleteEngineQuery); e != nil {
return errors.Trace(e)
}
if _, e := s.Execute(c, deleteTableQuery); e != nil {
return errors.Trace(e)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return targetTables, nil
}
func (g GlueCheckpointsDB) DumpTables(ctx context.Context, csv io.Writer) error {
return errors.Errorf("dumping glue checkpoint into CSV not unsupported")
}
func (g GlueCheckpointsDB) DumpEngines(ctx context.Context, csv io.Writer) error {
return errors.Errorf("dumping glue checkpoint into CSV not unsupported")
}
func (g GlueCheckpointsDB) DumpChunks(ctx context.Context, csv io.Writer) error {
return errors.Errorf("dumping glue checkpoint into CSV not unsupported")
}
func Transact(ctx context.Context, purpose string, s Session, logger log.Logger, action func(context.Context, Session) error) error {
return common.Retry(purpose, logger, func() error {
_, err := s.Execute(ctx, "BEGIN")
if err != nil {
return errors.Annotate(err, "begin transaction failed")
}
err = action(ctx, s)
if err != nil {
s.RollbackTxn(ctx)
return err
}
err = s.CommitTxn(ctx)
if err != nil {
return errors.Annotate(err, "commit transaction failed")
}
return nil
})
}
// TODO: will use drainFirstRecordSet to reduce repeat in GlueCheckpointsDB later
func drainFirstRecordSet(ctx context.Context, rss []sqlexec.RecordSet) ([]chunk.Row, error) {
if len(rss) != 1 {
return nil, errors.New("given result set doesn't have length 1")
}
rs := rss[0]
var rows []chunk.Row
req := rs.NewChunk(nil)
for {
err := rs.Next(ctx, req)
if err != nil || req.NumRows() == 0 {
_ = rs.Close()
return rows, err
}
iter := chunk.NewIterator4Chunk(req)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
req = chunk.Renew(req, 1024)
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦