tidb checkpoints 源码
tidb checkpoints 代码
文件路径:/br/pkg/lightning/checkpoints/checkpoints.go
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkpoints
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"math"
"path"
"sort"
"strings"
"sync"
"github.com/joho/sqltocsv"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints/checkpointspb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
type CheckpointStatus uint8
const (
CheckpointStatusMissing CheckpointStatus = 0
CheckpointStatusMaxInvalid CheckpointStatus = 25
CheckpointStatusLoaded CheckpointStatus = 30
CheckpointStatusAllWritten CheckpointStatus = 60
CheckpointStatusClosed CheckpointStatus = 90
CheckpointStatusImported CheckpointStatus = 120
CheckpointStatusIndexImported CheckpointStatus = 140
CheckpointStatusAlteredAutoInc CheckpointStatus = 150
CheckpointStatusChecksumSkipped CheckpointStatus = 170
CheckpointStatusChecksummed CheckpointStatus = 180
CheckpointStatusAnalyzeSkipped CheckpointStatus = 200
CheckpointStatusAnalyzed CheckpointStatus = 210
)
const WholeTableEngineID = math.MaxInt32
const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
CheckpointTableNameTask = "task_v2"
CheckpointTableNameTable = "table_v7"
CheckpointTableNameEngine = "engine_v5"
CheckpointTableNameChunk = "chunk_v5"
// Some frequently used table name or constants.
allTables = "all"
stringLitAll = "'all'"
columnTableName = "table_name"
)
const (
// shared by MySQLCheckpointsDB and GlueCheckpointsDB
CreateDBTemplate = "CREATE DATABASE IF NOT EXISTS %s;"
CreateTaskTableTemplate = `
CREATE TABLE IF NOT EXISTS %s.%s (
id tinyint(1) PRIMARY KEY,
task_id bigint NOT NULL,
source_dir varchar(256) NOT NULL,
backend varchar(16) NOT NULL,
importer_addr varchar(256),
tidb_host varchar(128) NOT NULL,
tidb_port int NOT NULL,
pd_addr varchar(128) NOT NULL,
sorted_kv_dir varchar(256) NOT NULL,
lightning_ver varchar(48) NOT NULL
);`
CreateTableTableTemplate = `
CREATE TABLE IF NOT EXISTS %s.%s (
task_id bigint NOT NULL,
table_name varchar(261) NOT NULL PRIMARY KEY,
hash binary(32) NOT NULL,
status tinyint unsigned DEFAULT 30,
alloc_base bigint NOT NULL DEFAULT 0,
table_id bigint NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
kv_bytes bigint unsigned NOT NULL DEFAULT 0,
kv_kvs bigint unsigned NOT NULL DEFAULT 0,
kv_checksum bigint unsigned NOT NULL DEFAULT 0,
INDEX(task_id)
);`
CreateEngineTableTemplate = `
CREATE TABLE IF NOT EXISTS %s.%s (
table_name varchar(261) NOT NULL,
engine_id int NOT NULL,
status tinyint unsigned DEFAULT 30,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(table_name, engine_id DESC)
);`
CreateChunkTableTemplate = `
CREATE TABLE IF NOT EXISTS %s.%s (
table_name varchar(261) NOT NULL,
engine_id int unsigned NOT NULL,
path varchar(2048) NOT NULL,
offset bigint NOT NULL,
type int NOT NULL,
compression int NOT NULL,
sort_key varchar(256) NOT NULL,
file_size bigint NOT NULL,
columns text NULL,
should_include_row_id BOOL NOT NULL,
end_offset bigint NOT NULL,
pos bigint NOT NULL,
prev_rowid_max bigint NOT NULL,
rowid_max bigint NOT NULL,
kvc_bytes bigint unsigned NOT NULL DEFAULT 0,
kvc_kvs bigint unsigned NOT NULL DEFAULT 0,
kvc_checksum bigint unsigned NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(table_name, engine_id, path(500), offset)
);`
InitTaskTemplate = `
REPLACE INTO %s.%s (id, task_id, source_dir, backend, importer_addr, tidb_host, tidb_port, pd_addr, sorted_kv_dir, lightning_ver)
VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?, ?);`
InitTableTemplate = `
INSERT INTO %s.%s (task_id, table_name, hash, table_id) VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE task_id = CASE
WHEN hash = VALUES(hash)
THEN VALUES(task_id)
END;`
ReadTaskTemplate = `
SELECT task_id, source_dir, backend, importer_addr, tidb_host, tidb_port, pd_addr, sorted_kv_dir, lightning_ver FROM %s.%s WHERE id = 1;`
ReadEngineTemplate = `
SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;`
ReadChunkTemplate = `
SELECT
engine_id, path, offset, type, compression, sort_key, file_size, columns,
pos, end_offset, prev_rowid_max, rowid_max,
kvc_bytes, kvc_kvs, kvc_checksum, unix_timestamp(create_time)
FROM %s.%s WHERE table_name = ?
ORDER BY engine_id, path, offset;`
ReadTableRemainTemplate = `
SELECT status, alloc_base, table_id, kv_bytes, kv_kvs, kv_checksum FROM %s.%s WHERE table_name = ?;`
ReplaceEngineTemplate = `
REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);`
ReplaceChunkTemplate = `
REPLACE INTO %s.%s (
table_name, engine_id,
path, offset, type, compression, sort_key, file_size, columns, should_include_row_id,
pos, end_offset, prev_rowid_max, rowid_max,
kvc_bytes, kvc_kvs, kvc_checksum, create_time
) VALUES (
?, ?,
?, ?, ?, ?, ?, ?, ?, FALSE,
?, ?, ?, ?,
0, 0, 0, from_unixtime(?)
);`
UpdateChunkTemplate = `
UPDATE %s.%s SET pos = ?, prev_rowid_max = ?, kvc_bytes = ?, kvc_kvs = ?, kvc_checksum = ?, columns = ?
WHERE (table_name, engine_id, path, offset) = (?, ?, ?, ?);`
UpdateTableRebaseTemplate = `
UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;`
UpdateTableStatusTemplate = `
UPDATE %s.%s SET status = ? WHERE table_name = ?;`
UpdateTableChecksumTemplate = `UPDATE %s.%s SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;`
UpdateEngineTemplate = `
UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);`
DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;"
)
func IsCheckpointTable(name string) bool {
switch name {
case CheckpointTableNameTask, CheckpointTableNameTable, CheckpointTableNameEngine, CheckpointTableNameChunk:
return true
default:
return false
}
}
func (status CheckpointStatus) MetricName() string {
switch status {
case CheckpointStatusLoaded:
return "pending"
case CheckpointStatusAllWritten:
return "written"
case CheckpointStatusClosed:
return "closed"
case CheckpointStatusImported:
return "imported"
case CheckpointStatusIndexImported:
return "index_imported"
case CheckpointStatusAlteredAutoInc:
return "altered_auto_inc"
case CheckpointStatusChecksummed, CheckpointStatusChecksumSkipped:
return "checksum"
case CheckpointStatusAnalyzed, CheckpointStatusAnalyzeSkipped:
return "analyzed"
case CheckpointStatusMissing:
return "missing"
default:
return "invalid"
}
}
type ChunkCheckpointKey struct {
Path string
Offset int64
}
func (key *ChunkCheckpointKey) String() string {
return fmt.Sprintf("%s:%d", key.Path, key.Offset)
}
func (key *ChunkCheckpointKey) less(other *ChunkCheckpointKey) bool {
switch {
case key.Path < other.Path:
return true
case key.Path > other.Path:
return false
default:
return key.Offset < other.Offset
}
}
type ChunkCheckpoint struct {
Key ChunkCheckpointKey
FileMeta mydump.SourceFileMeta
ColumnPermutation []int
Chunk mydump.Chunk
Checksum verify.KVChecksum
Timestamp int64
}
func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint {
colPerm := make([]int, 0, len(ccp.ColumnPermutation))
colPerm = append(colPerm, ccp.ColumnPermutation...)
return &ChunkCheckpoint{
Key: ccp.Key,
FileMeta: ccp.FileMeta,
ColumnPermutation: colPerm,
Chunk: ccp.Chunk,
Checksum: ccp.Checksum,
Timestamp: ccp.Timestamp,
}
}
type EngineCheckpoint struct {
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
}
func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint {
chunks := make([]*ChunkCheckpoint, 0, len(engine.Chunks))
for _, chunk := range engine.Chunks {
chunks = append(chunks, chunk.DeepCopy())
}
return &EngineCheckpoint{
Status: engine.Status,
Chunks: chunks,
}
}
type TableCheckpoint struct {
Status CheckpointStatus
AllocBase int64
Engines map[int32]*EngineCheckpoint
TableID int64
// remote checksum before restore
Checksum verify.KVChecksum
}
func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint {
engines := make(map[int32]*EngineCheckpoint, len(cp.Engines))
for engineID, engine := range cp.Engines {
engines[engineID] = engine.DeepCopy()
}
return &TableCheckpoint{
Status: cp.Status,
AllocBase: cp.AllocBase,
Engines: engines,
TableID: cp.TableID,
Checksum: cp.Checksum,
}
}
func (cp *TableCheckpoint) CountChunks() int {
result := 0
for _, engine := range cp.Engines {
result += len(engine.Chunks)
}
return result
}
type chunkCheckpointDiff struct {
pos int64
rowID int64
checksum verify.KVChecksum
columnPermutation []int
}
type engineCheckpointDiff struct {
hasStatus bool
status CheckpointStatus
chunks map[ChunkCheckpointKey]chunkCheckpointDiff
}
type TableCheckpointDiff struct {
hasStatus bool
hasRebase bool
hasChecksum bool
status CheckpointStatus
allocBase int64
engines map[int32]engineCheckpointDiff
checksum verify.KVChecksum
}
func NewTableCheckpointDiff() *TableCheckpointDiff {
return &TableCheckpointDiff{
engines: make(map[int32]engineCheckpointDiff),
}
}
func (cpd *TableCheckpointDiff) insertEngineCheckpointDiff(engineID int32, newDiff engineCheckpointDiff) {
if oldDiff, ok := cpd.engines[engineID]; ok {
if newDiff.hasStatus {
oldDiff.hasStatus = true
oldDiff.status = newDiff.status
}
for key, chunkDiff := range newDiff.chunks {
oldDiff.chunks[key] = chunkDiff
}
newDiff = oldDiff
}
cpd.engines[engineID] = newDiff
}
func (cpd *TableCheckpointDiff) String() string {
return fmt.Sprintf(
"{hasStatus:%v, hasRebase:%v, status:%d, allocBase:%d, engines:[%d]}",
cpd.hasStatus, cpd.hasRebase, cpd.status, cpd.allocBase, len(cpd.engines),
)
}
// Apply the diff to the existing chunk and engine checkpoints in `cp`.
func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff) {
if cpd.hasStatus {
cp.Status = cpd.status
}
if cpd.hasRebase {
cp.AllocBase = cpd.allocBase
}
for engineID, engineDiff := range cpd.engines {
engine := cp.Engines[engineID]
if engine == nil {
continue
}
if engineDiff.hasStatus {
engine.Status = engineDiff.status
}
for key, diff := range engineDiff.chunks {
checkpointKey := key
index := sort.Search(len(engine.Chunks), func(i int) bool {
return !engine.Chunks[i].Key.less(&checkpointKey)
})
if index >= len(engine.Chunks) {
continue
}
chunk := engine.Chunks[index]
if chunk.Key != checkpointKey {
continue
}
chunk.Chunk.Offset = diff.pos
chunk.Chunk.PrevRowIDMax = diff.rowID
chunk.Checksum = diff.checksum
}
}
}
type TableCheckpointMerger interface {
// MergeInto the table checkpoint diff from a status update or chunk update.
// If there are multiple updates to the same table, only the last one will
// take effect. Therefore, the caller must ensure events for the same table
// are properly ordered by the global time (an old event must be merged
// before the new one).
MergeInto(cpd *TableCheckpointDiff)
}
type StatusCheckpointMerger struct {
EngineID int32 // WholeTableEngineID == apply to whole table.
Status CheckpointStatus
}
func (merger *StatusCheckpointMerger) SetInvalid() {
merger.Status /= 10
}
func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) {
if merger.EngineID == WholeTableEngineID || merger.Status <= CheckpointStatusMaxInvalid {
cpd.status = merger.Status
cpd.hasStatus = true
}
if merger.EngineID != WholeTableEngineID {
cpd.insertEngineCheckpointDiff(merger.EngineID, engineCheckpointDiff{
hasStatus: true,
status: merger.Status,
chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
})
}
}
type ChunkCheckpointMerger struct {
EngineID int32
Key ChunkCheckpointKey
Checksum verify.KVChecksum
Pos int64
RowID int64
ColumnPermutation []int
EndOffset int64 // For test only.
}
func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) {
cpd.insertEngineCheckpointDiff(merger.EngineID, engineCheckpointDiff{
chunks: map[ChunkCheckpointKey]chunkCheckpointDiff{
merger.Key: {
pos: merger.Pos,
rowID: merger.RowID,
checksum: merger.Checksum,
columnPermutation: merger.ColumnPermutation,
},
},
})
}
type TableChecksumMerger struct {
Checksum verify.KVChecksum
}
func (m *TableChecksumMerger) MergeInto(cpd *TableCheckpointDiff) {
cpd.hasChecksum = true
cpd.checksum = m.Checksum
}
type RebaseCheckpointMerger struct {
AllocBase int64
}
func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) {
cpd.hasRebase = true
cpd.allocBase = mathutil.Max(cpd.allocBase, merger.AllocBase)
}
type DestroyedTableCheckpoint struct {
TableName string
MinEngineID int32
MaxEngineID int32
}
type TaskCheckpoint struct {
TaskID int64
SourceDir string
Backend string
ImporterAddr string
TiDBHost string
TiDBPort int
PdAddr string
SortedKVDir string
LightningVer string
}
type DB interface {
Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
Close() error
// InsertEngineCheckpoints initializes the checkpoints related to a table.
// It assumes the entire table has not been imported before and will fill in
// default values for the column permutations and checksums.
InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
Update(taskCtx context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error
RemoveCheckpoint(ctx context.Context, tableName string) error
// MoveCheckpoints renames the checkpoint schema to include a suffix
// including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`).
MoveCheckpoints(ctx context.Context, taskID int64) error
// GetLocalStoringTables returns a map containing tables have engine files stored on local disk.
// currently only meaningful for local backend
GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
DumpTables(ctx context.Context, csv io.Writer) error
DumpEngines(ctx context.Context, csv io.Writer) error
DumpChunks(ctx context.Context, csv io.Writer) error
}
func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (DB, error) {
if !cfg.Checkpoint.Enable {
return NewNullCheckpointsDB(), nil
}
switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
db, err := common.ConnectMySQL(cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema)
if err != nil {
_ = db.Close()
return nil, errors.Trace(err)
}
return cpdb, nil
case config.CheckpointDriverFile:
cpdb, err := NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
return cpdb, nil
default:
return nil, common.ErrUnknownCheckpointDriver.GenWithStackByArgs(cfg.Checkpoint.Driver)
}
}
func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error) {
if !cfg.Checkpoint.Enable {
return false, nil
}
switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
db, err := sql.Open("mysql", cfg.Checkpoint.DSN)
if err != nil {
return false, errors.Trace(err)
}
//nolint: errcheck
defer db.Close()
checkSQL := "SHOW DATABASES WHERE `DATABASE` = ?"
rows, err := db.QueryContext(ctx, checkSQL, cfg.Checkpoint.Schema)
if err != nil {
return false, errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
result := rows.Next()
if err := rows.Err(); err != nil {
return false, errors.Trace(err)
}
return result, nil
case config.CheckpointDriverFile:
s, fileName, err := createExstorageByCompletePath(ctx, cfg.Checkpoint.DSN)
if err != nil {
return false, errors.Trace(err)
}
result, err := s.FileExists(ctx, fileName)
if err != nil {
return false, errors.Trace(err)
}
return result, nil
default:
return false, common.ErrUnknownCheckpointDriver.GenWithStackByArgs(cfg.Checkpoint.Driver)
}
}
// NullCheckpointsDB is a checkpoints database with no checkpoints.
type NullCheckpointsDB struct{}
func NewNullCheckpointsDB() *NullCheckpointsDB {
return &NullCheckpointsDB{}
}
func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error {
return nil
}
func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error) {
return nil, nil
}
func (*NullCheckpointsDB) Close() error {
return nil
}
func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error) {
return &TableCheckpoint{
Status: CheckpointStatusLoaded,
Engines: map[int32]*EngineCheckpoint{},
}, nil
}
func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error {
return nil
}
func (*NullCheckpointsDB) Update(context.Context, map[string]*TableCheckpointDiff) error {
return nil
}
type MySQLCheckpointsDB struct {
db *sql.DB
schema string
}
func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error) {
schema := common.EscapeIdentifier(schemaName)
sql := common.SQLWithRetry{
DB: db,
Logger: log.FromContext(ctx).With(zap.String("schema", schemaName)),
HideQueryLog: true,
}
err := sql.Exec(ctx, "create checkpoints database", fmt.Sprintf(CreateDBTemplate, schema))
if err != nil {
return nil, errors.Trace(err)
}
err = sql.Exec(ctx, "create task checkpoints table", fmt.Sprintf(CreateTaskTableTemplate, schema, CheckpointTableNameTask))
if err != nil {
return nil, errors.Trace(err)
}
err = sql.Exec(ctx, "create table checkpoints table", fmt.Sprintf(CreateTableTableTemplate, schema, CheckpointTableNameTable))
if err != nil {
return nil, errors.Trace(err)
}
err = sql.Exec(ctx, "create engine checkpoints table", fmt.Sprintf(CreateEngineTableTemplate, schema, CheckpointTableNameEngine))
if err != nil {
return nil, errors.Trace(err)
}
err = sql.Exec(ctx, "create chunks checkpoints table", fmt.Sprintf(CreateChunkTableTemplate, schema, CheckpointTableNameChunk))
if err != nil {
return nil, errors.Trace(err)
}
return &MySQLCheckpointsDB{
db: db,
schema: schema,
}, nil
}
func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error {
// We can have at most 65535 placeholders https://stackoverflow.com/q/4922345/
// Since this step is not performance critical, we just insert the rows one-by-one.
s := common.SQLWithRetry{DB: cpdb.db, Logger: log.FromContext(ctx)}
err := s.Transact(ctx, "insert checkpoints", func(c context.Context, tx *sql.Tx) error {
taskStmt, err := tx.PrepareContext(c, fmt.Sprintf(InitTaskTemplate, cpdb.schema, CheckpointTableNameTask))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer taskStmt.Close()
_, err = taskStmt.ExecContext(ctx, cfg.TaskID, cfg.Mydumper.SourceDir, cfg.TikvImporter.Backend,
cfg.TikvImporter.Addr, cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.PdAddr, cfg.TikvImporter.SortedKVDir,
build.ReleaseVersion)
if err != nil {
return errors.Trace(err)
}
// If `hash` is not the same but the `table_name` duplicates,
// the CASE expression will return NULL, which can be used to violate
// the NOT NULL requirement of `task_id` column, and caused this INSERT
// statement to fail with an irrecoverable error.
// We do need to capture the error is display a user friendly message
// (multiple nodes cannot import the same table) though.
stmt, err := tx.PrepareContext(c, fmt.Sprintf(InitTableTemplate, cpdb.schema, CheckpointTableNameTable))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer stmt.Close()
for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
_, err = stmt.ExecContext(c, cfg.TaskID, tableName, CheckpointStatusLoaded, table.ID)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
})
if err != nil {
return errors.Trace(err)
}
return nil
}
func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error) {
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx),
}
taskQuery := fmt.Sprintf(ReadTaskTemplate, cpdb.schema, CheckpointTableNameTask)
taskCp := &TaskCheckpoint{}
err := s.QueryRow(ctx, "fetch task checkpoint", taskQuery, &taskCp.TaskID, &taskCp.SourceDir, &taskCp.Backend,
&taskCp.ImporterAddr, &taskCp.TiDBHost, &taskCp.TiDBPort, &taskCp.PdAddr, &taskCp.SortedKVDir, &taskCp.LightningVer)
if err != nil {
// if task checkpoint is empty, return nil
if errors.Cause(err) == sql.ErrNoRows {
return nil, nil
}
return nil, errors.Trace(err)
}
return taskCp, nil
}
func (cpdb *MySQLCheckpointsDB) Close() error {
return errors.Trace(cpdb.db.Close())
}
func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error) {
cp := &TableCheckpoint{
Engines: map[int32]*EngineCheckpoint{},
}
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx).With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "read checkpoint", func(c context.Context, tx *sql.Tx) error {
// 1. Populate the engines.
engineQuery := fmt.Sprintf(ReadEngineTemplate, cpdb.schema, CheckpointTableNameEngine)
engineRows, err := tx.QueryContext(c, engineQuery, tableName)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer engineRows.Close()
for engineRows.Next() {
var (
engineID int32
status uint8
)
if err := engineRows.Scan(&engineID, &status); err != nil {
return errors.Trace(err)
}
cp.Engines[engineID] = &EngineCheckpoint{
Status: CheckpointStatus(status),
}
}
if err := engineRows.Err(); err != nil {
return errors.Trace(err)
}
// 2. Populate the chunks.
chunkQuery := fmt.Sprintf(ReadChunkTemplate, cpdb.schema, CheckpointTableNameChunk)
chunkRows, err := tx.QueryContext(c, chunkQuery, tableName)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer chunkRows.Close()
for chunkRows.Next() {
var (
value = &ChunkCheckpoint{}
colPerm []byte
engineID int32
kvcBytes uint64
kvcKVs uint64
kvcChecksum uint64
)
if err := chunkRows.Scan(
&engineID, &value.Key.Path, &value.Key.Offset, &value.FileMeta.Type, &value.FileMeta.Compression,
&value.FileMeta.SortKey, &value.FileMeta.FileSize, &colPerm, &value.Chunk.Offset, &value.Chunk.EndOffset,
&value.Chunk.PrevRowIDMax, &value.Chunk.RowIDMax, &kvcBytes, &kvcKVs, &kvcChecksum,
&value.Timestamp,
); err != nil {
return errors.Trace(err)
}
value.FileMeta.Path = value.Key.Path
value.Checksum = verify.MakeKVChecksum(kvcBytes, kvcKVs, kvcChecksum)
if err := json.Unmarshal(colPerm, &value.ColumnPermutation); err != nil {
return errors.Trace(err)
}
cp.Engines[engineID].Chunks = append(cp.Engines[engineID].Chunks, value)
}
if err := chunkRows.Err(); err != nil {
return errors.Trace(err)
}
// 3. Fill in the remaining table info
tableQuery := fmt.Sprintf(ReadTableRemainTemplate, cpdb.schema, CheckpointTableNameTable)
tableRow := tx.QueryRowContext(c, tableQuery, tableName)
var status uint8
var kvs, bytes, checksum uint64
if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID, &bytes, &kvs, &checksum); err != nil {
if err == sql.ErrNoRows {
return errors.NotFoundf("checkpoint for table %s", tableName)
}
}
cp.Checksum = verify.MakeKVChecksum(bytes, kvs, checksum)
cp.Status = CheckpointStatus(status)
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return cp, nil
}
func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error {
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx).With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "update engine checkpoints", func(c context.Context, tx *sql.Tx) error {
engineStmt, err := tx.PrepareContext(c, fmt.Sprintf(ReplaceEngineTemplate, cpdb.schema, CheckpointTableNameEngine))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer engineStmt.Close()
chunkStmt, err := tx.PrepareContext(c, fmt.Sprintf(ReplaceChunkTemplate, cpdb.schema, CheckpointTableNameChunk))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer chunkStmt.Close()
for engineID, engine := range checkpoints {
_, err = engineStmt.ExecContext(c, tableName, engineID, engine.Status)
if err != nil {
return errors.Trace(err)
}
for _, value := range engine.Chunks {
columnPerm, err := json.Marshal(value.ColumnPermutation)
if err != nil {
return errors.Trace(err)
}
_, err = chunkStmt.ExecContext(
c, tableName, engineID,
value.Key.Path, value.Key.Offset, value.FileMeta.Type, value.FileMeta.Compression,
value.FileMeta.SortKey, value.FileMeta.FileSize, columnPerm, value.Chunk.Offset, value.Chunk.EndOffset,
value.Chunk.PrevRowIDMax, value.Chunk.RowIDMax, value.Timestamp,
)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
})
if err != nil {
return errors.Trace(err)
}
return nil
}
func (cpdb *MySQLCheckpointsDB) Update(taskCtx context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error {
chunkQuery := fmt.Sprintf(UpdateChunkTemplate, cpdb.schema, CheckpointTableNameChunk)
rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, cpdb.schema, CheckpointTableNameTable)
tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, cpdb.schema, CheckpointTableNameTable)
tableChecksumQuery := fmt.Sprintf(UpdateTableChecksumTemplate, cpdb.schema, CheckpointTableNameTable)
engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, cpdb.schema, CheckpointTableNameEngine)
s := common.SQLWithRetry{DB: cpdb.db, Logger: log.FromContext(taskCtx)}
return s.Transact(taskCtx, "update checkpoints", func(c context.Context, tx *sql.Tx) error {
chunkStmt, e := tx.PrepareContext(c, chunkQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer chunkStmt.Close()
rebaseStmt, e := tx.PrepareContext(c, rebaseQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer rebaseStmt.Close()
tableStatusStmt, e := tx.PrepareContext(c, tableStatusQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer tableStatusStmt.Close()
tableChecksumStmt, e := tx.PrepareContext(c, tableChecksumQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer tableChecksumStmt.Close()
engineStatusStmt, e := tx.PrepareContext(c, engineStatusQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer engineStatusStmt.Close()
for tableName, cpd := range checkpointDiffs {
if cpd.hasStatus {
if _, e := tableStatusStmt.ExecContext(c, cpd.status, tableName); e != nil {
return errors.Trace(e)
}
}
if cpd.hasRebase {
if _, e := rebaseStmt.ExecContext(c, cpd.allocBase, tableName); e != nil {
return errors.Trace(e)
}
}
if cpd.hasChecksum {
if _, e := tableChecksumStmt.ExecContext(c, cpd.checksum.SumSize(), cpd.checksum.SumKVS(), cpd.checksum.Sum(), tableName); e != nil {
return errors.Trace(e)
}
}
for engineID, engineDiff := range cpd.engines {
if engineDiff.hasStatus {
if _, e := engineStatusStmt.ExecContext(c, engineDiff.status, tableName, engineID); e != nil {
return errors.Trace(e)
}
}
for key, diff := range engineDiff.chunks {
columnPerm, err := json.Marshal(diff.columnPermutation)
if err != nil {
return errors.Trace(err)
}
if _, e := chunkStmt.ExecContext(
c,
diff.pos, diff.rowID, diff.checksum.SumSize(), diff.checksum.SumKVS(), diff.checksum.Sum(),
columnPerm, tableName, engineID, key.Path, key.Offset,
); e != nil {
return errors.Trace(e)
}
}
}
}
return nil
})
}
type FileCheckpointsDB struct {
lock sync.Mutex // we need to ensure only a thread can access to `checkpoints` at a time
checkpoints checkpointspb.CheckpointsModel
ctx context.Context
path string
fileName string
exStorage storage.ExternalStorage
}
func newFileCheckpointsDB(
ctx context.Context,
path string,
exStorage storage.ExternalStorage,
fileName string,
) (*FileCheckpointsDB, error) {
cpdb := &FileCheckpointsDB{
checkpoints: checkpointspb.CheckpointsModel{
TaskCheckpoint: &checkpointspb.TaskCheckpointModel{},
Checkpoints: map[string]*checkpointspb.TableCheckpointModel{},
},
ctx: ctx,
path: path,
fileName: fileName,
exStorage: exStorage,
}
if cpdb.fileName == "" {
return nil, errors.Errorf("the checkpoint DSN '%s' must not be a directory", path)
}
exist, err := cpdb.exStorage.FileExists(ctx, cpdb.fileName)
if err != nil {
return nil, errors.Trace(err)
}
if !exist {
log.FromContext(ctx).Info("open checkpoint file failed, going to create a new one",
zap.String("path", path),
log.ShortError(err),
)
return cpdb, nil
}
content, err := cpdb.exStorage.ReadFile(ctx, cpdb.fileName)
if err != nil {
return nil, errors.Trace(err)
}
err = cpdb.checkpoints.Unmarshal(content)
if err != nil {
log.FromContext(ctx).Error("checkpoint file is broken", zap.String("path", path), zap.Error(err))
}
// FIXME: patch for empty map may need initialize manually, because currently
// FIXME: a map of zero size -> marshall -> unmarshall -> become nil, see checkpoint_test.go
if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = map[string]*checkpointspb.TableCheckpointModel{}
}
for _, table := range cpdb.checkpoints.Checkpoints {
if table.Engines == nil {
table.Engines = map[int32]*checkpointspb.EngineCheckpointModel{}
}
for _, engine := range table.Engines {
if engine.Chunks == nil {
engine.Chunks = map[string]*checkpointspb.ChunkCheckpointModel{}
}
}
}
return cpdb, nil
}
func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) {
// init ExternalStorage
s, fileName, err := createExstorageByCompletePath(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
return newFileCheckpointsDB(ctx, path, s, fileName)
}
func NewFileCheckpointsDBWithExstorageFileName(
ctx context.Context,
path string,
s storage.ExternalStorage,
fileName string,
) (*FileCheckpointsDB, error) {
return newFileCheckpointsDB(ctx, path, s, fileName)
}
// createExstorageByCompletePath create ExternalStorage by completePath and return fileName.
func createExstorageByCompletePath(ctx context.Context, completePath string) (storage.ExternalStorage, string, error) {
if completePath == "" {
return nil, "", nil
}
fileName, newPath, err := separateCompletePath(completePath)
if err != nil {
return nil, "", errors.Trace(err)
}
u, err := storage.ParseBackend(newPath, nil)
if err != nil {
return nil, "", errors.Trace(err)
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return nil, "", errors.Trace(err)
}
return s, fileName, nil
}
// separateCompletePath separates fileName from completePath, returns fileName and newPath.
func separateCompletePath(completePath string) (string, string, error) {
if completePath == "" {
return "", "", nil
}
var fileName, newPath string
purl, err := storage.ParseRawURL(completePath)
if err != nil {
return "", "", errors.Trace(err)
}
// not url format, we don't use url library to avoid being escaped or unescaped
if purl.Scheme == "" {
// no fileName, just path
if strings.HasSuffix(completePath, "/") {
return "", completePath, nil
}
fileName = path.Base(completePath)
newPath = path.Dir(completePath)
} else {
if strings.HasSuffix(purl.Path, "/") {
return "", completePath, nil
}
fileName = path.Base(purl.Path)
purl.Path = path.Dir(purl.Path)
newPath = purl.String()
}
return fileName, newPath, nil
}
func (cpdb *FileCheckpointsDB) save() error {
serialized, err := cpdb.checkpoints.Marshal()
if err != nil {
return errors.Trace(err)
}
return cpdb.exStorage.WriteFile(cpdb.ctx, cpdb.fileName, serialized)
}
func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
cpdb.checkpoints.TaskCheckpoint = &checkpointspb.TaskCheckpointModel{
TaskId: cfg.TaskID,
SourceDir: cfg.Mydumper.SourceDir,
Backend: cfg.TikvImporter.Backend,
ImporterAddr: cfg.TikvImporter.Addr,
TidbHost: cfg.TiDB.Host,
TidbPort: int32(cfg.TiDB.Port),
PdAddr: cfg.TiDB.PdAddr,
SortedKvDir: cfg.TikvImporter.SortedKVDir,
LightningVer: build.ReleaseVersion,
}
if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = make(map[string]*checkpointspb.TableCheckpointModel)
}
for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok {
cpdb.checkpoints.Checkpoints[tableName] = &checkpointspb.TableCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Engines: map[int32]*checkpointspb.EngineCheckpointModel{},
TableID: table.ID,
}
}
// TODO check if hash matches
}
}
return errors.Trace(cpdb.save())
}
func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error) {
// this method is always called in lock
cp := cpdb.checkpoints.TaskCheckpoint
if cp == nil || cp.TaskId == 0 {
return nil, nil
}
return &TaskCheckpoint{
TaskID: cp.TaskId,
SourceDir: cp.SourceDir,
Backend: cp.Backend,
ImporterAddr: cp.ImporterAddr,
TiDBHost: cp.TidbHost,
TiDBPort: int(cp.TidbPort),
PdAddr: cp.PdAddr,
SortedKVDir: cp.SortedKvDir,
LightningVer: cp.LightningVer,
}, nil
}
func (cpdb *FileCheckpointsDB) Close() error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
return errors.Trace(cpdb.save())
}
func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error) {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
tableModel, ok := cpdb.checkpoints.Checkpoints[tableName]
if !ok {
return nil, errors.NotFoundf("checkpoint for table %s", tableName)
}
cp := &TableCheckpoint{
Status: CheckpointStatus(tableModel.Status),
AllocBase: tableModel.AllocBase,
Engines: make(map[int32]*EngineCheckpoint, len(tableModel.Engines)),
TableID: tableModel.TableID,
Checksum: verify.MakeKVChecksum(tableModel.KvBytes, tableModel.KvKvs, tableModel.KvChecksum),
}
for engineID, engineModel := range tableModel.Engines {
engine := &EngineCheckpoint{
Status: CheckpointStatus(engineModel.Status),
Chunks: make([]*ChunkCheckpoint, 0, len(engineModel.Chunks)),
}
for _, chunkModel := range engineModel.Chunks {
colPerm := make([]int, 0, len(chunkModel.ColumnPermutation))
for _, c := range chunkModel.ColumnPermutation {
colPerm = append(colPerm, int(c))
}
engine.Chunks = append(engine.Chunks, &ChunkCheckpoint{
Key: ChunkCheckpointKey{
Path: chunkModel.Path,
Offset: chunkModel.Offset,
},
FileMeta: mydump.SourceFileMeta{
Path: chunkModel.Path,
Type: mydump.SourceType(chunkModel.Type),
Compression: mydump.Compression(chunkModel.Compression),
SortKey: chunkModel.SortKey,
FileSize: chunkModel.FileSize,
},
ColumnPermutation: colPerm,
Chunk: mydump.Chunk{
Offset: chunkModel.Pos,
EndOffset: chunkModel.EndOffset,
PrevRowIDMax: chunkModel.PrevRowidMax,
RowIDMax: chunkModel.RowidMax,
},
Checksum: verify.MakeKVChecksum(chunkModel.KvcBytes, chunkModel.KvcKvs, chunkModel.KvcChecksum),
Timestamp: chunkModel.Timestamp,
})
}
slices.SortFunc(engine.Chunks, func(i, j *ChunkCheckpoint) bool {
return i.Key.less(&j.Key)
})
cp.Engines[engineID] = engine
}
return cp, nil
}
func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
tableModel := cpdb.checkpoints.Checkpoints[tableName]
for engineID, engine := range checkpoints {
engineModel := &checkpointspb.EngineCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Chunks: make(map[string]*checkpointspb.ChunkCheckpointModel),
}
for _, value := range engine.Chunks {
key := value.Key.String()
chunk, ok := engineModel.Chunks[key]
if !ok {
chunk = &checkpointspb.ChunkCheckpointModel{
Path: value.Key.Path,
Offset: value.Key.Offset,
}
engineModel.Chunks[key] = chunk
}
chunk.Type = int32(value.FileMeta.Type)
chunk.Compression = int32(value.FileMeta.Compression)
chunk.SortKey = value.FileMeta.SortKey
chunk.FileSize = value.FileMeta.FileSize
chunk.Pos = value.Chunk.Offset
chunk.EndOffset = value.Chunk.EndOffset
chunk.PrevRowidMax = value.Chunk.PrevRowIDMax
chunk.RowidMax = value.Chunk.RowIDMax
chunk.Timestamp = value.Timestamp
if len(value.ColumnPermutation) > 0 {
chunk.ColumnPermutation = intSlice2Int32Slice(value.ColumnPermutation)
}
}
tableModel.Engines[engineID] = engineModel
}
return errors.Trace(cpdb.save())
}
func (cpdb *FileCheckpointsDB) Update(_ context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
for tableName, cpd := range checkpointDiffs {
tableModel := cpdb.checkpoints.Checkpoints[tableName]
if cpd.hasStatus {
tableModel.Status = uint32(cpd.status)
}
if cpd.hasRebase {
tableModel.AllocBase = cpd.allocBase
}
if cpd.hasChecksum {
tableModel.KvBytes = cpd.checksum.SumSize()
tableModel.KvKvs = cpd.checksum.SumKVS()
tableModel.KvChecksum = cpd.checksum.Sum()
}
for engineID, engineDiff := range cpd.engines {
engineModel := tableModel.Engines[engineID]
if engineDiff.hasStatus {
engineModel.Status = uint32(engineDiff.status)
}
for key, diff := range engineDiff.chunks {
chunkModel := engineModel.Chunks[key.String()]
chunkModel.Pos = diff.pos
chunkModel.PrevRowidMax = diff.rowID
chunkModel.KvcBytes = diff.checksum.SumSize()
chunkModel.KvcKvs = diff.checksum.SumKVS()
chunkModel.KvcChecksum = diff.checksum.Sum()
chunkModel.ColumnPermutation = intSlice2Int32Slice(diff.columnPermutation)
}
}
}
return cpdb.save()
}
// Management functions ----------------------------------------------------------------------------
var errCannotManageNullDB = errors.New("cannot perform this function while checkpoints is disabled")
func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error {
return errors.Trace(errCannotManageNullDB)
}
func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error {
return errors.Trace(errCannotManageNullDB)
}
func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error) {
return nil, nil
}
func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error {
return errors.Trace(errCannotManageNullDB)
}
func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error) {
return nil, errors.Trace(errCannotManageNullDB)
}
func (*NullCheckpointsDB) DumpTables(context.Context, io.Writer) error {
return errors.Trace(errCannotManageNullDB)
}
func (*NullCheckpointsDB) DumpEngines(context.Context, io.Writer) error {
return errors.Trace(errCannotManageNullDB)
}
func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
return errors.Trace(errCannotManageNullDB)
}
func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error {
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx).With(zap.String("table", tableName)),
}
if tableName == allTables {
return s.Exec(ctx, "remove all checkpoints", "DROP SCHEMA "+cpdb.schema)
}
deleteChunkQuery := fmt.Sprintf(DeleteCheckpointRecordTemplate, cpdb.schema, CheckpointTableNameChunk)
deleteEngineQuery := fmt.Sprintf(DeleteCheckpointRecordTemplate, cpdb.schema, CheckpointTableNameEngine)
deleteTableQuery := fmt.Sprintf(DeleteCheckpointRecordTemplate, cpdb.schema, CheckpointTableNameTable)
return s.Transact(ctx, "remove checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil {
return errors.Trace(e)
}
return nil
})
}
func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error {
// The "cpdb.schema" is an escaped schema name of the form "`foo`".
// We use "x[1:len(x)-1]" instead of unescaping it to keep the
// double-backquotes (if any) intact.
newSchema := fmt.Sprintf("`%s.%d.bak`", cpdb.schema[1:len(cpdb.schema)-1], taskID)
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx).With(zap.Int64("taskID", taskID)),
}
createSchemaQuery := "CREATE SCHEMA IF NOT EXISTS " + newSchema
if e := s.Exec(ctx, "create backup checkpoints schema", createSchemaQuery); e != nil {
return e
}
for _, tbl := range []string{
CheckpointTableNameChunk, CheckpointTableNameEngine,
CheckpointTableNameTable, CheckpointTableNameTask,
} {
query := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, tbl)
if e := s.Exec(ctx, fmt.Sprintf("move %s checkpoints table", tbl), query); e != nil {
return e
}
}
return nil
}
func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error) {
var targetTables map[string][]int32
// lightning didn't check CheckpointStatusMaxInvalid before this function is called, so we skip invalid ones
// engines should exist if
// 1. table status is earlier than CheckpointStatusIndexImported, and
// 2. engine status is earlier than CheckpointStatusImported, and
// 3. chunk has been read
query := fmt.Sprintf(`
SELECT DISTINCT t.table_name, c.engine_id
FROM %s.%s t, %s.%s c, %s.%s e
WHERE t.table_name = c.table_name AND t.table_name = e.table_name AND c.engine_id = e.engine_id
AND %d < t.status AND t.status < %d
AND %d < e.status AND e.status < %d
AND c.pos > c.offset;`,
cpdb.schema, CheckpointTableNameTable, cpdb.schema, CheckpointTableNameChunk, cpdb.schema, CheckpointTableNameEngine,
CheckpointStatusMaxInvalid, CheckpointStatusIndexImported,
CheckpointStatusMaxInvalid, CheckpointStatusImported)
err := common.Retry("get local storing tables", log.FromContext(ctx), func() error {
targetTables = make(map[string][]int32)
rows, err := cpdb.db.QueryContext(ctx, query) // #nosec G201
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
for rows.Next() {
var (
tableName string
engineID int32
)
if err := rows.Scan(&tableName, &engineID); err != nil {
return errors.Trace(err)
}
targetTables[tableName] = append(targetTables[tableName], engineID)
}
if err := rows.Err(); err != nil {
return errors.Trace(err)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return targetTables, err
}
func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error {
var colName string
if tableName == allTables {
// This will expand to `WHERE 'all' = 'all'` and effectively allowing
// all tables to be included.
colName = stringLitAll
} else {
colName = columnTableName
}
// nolint:gosec
engineQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = ? AND status <= %d;
`, cpdb.schema, CheckpointTableNameEngine, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)
// nolint:gosec
tableQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = ? AND status <= %d;
`, cpdb.schema, CheckpointTableNameTable, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx).With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "ignore error checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, engineQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, tableQuery, tableName); e != nil {
return errors.Trace(e)
}
return nil
})
return errors.Trace(err)
}
func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) {
var colName, aliasedColName string
if tableName == allTables {
// These will expand to `WHERE 'all' = 'all'` and effectively allowing
// all tables to be included.
colName = stringLitAll
aliasedColName = stringLitAll
} else {
colName = columnTableName
aliasedColName = "t.table_name"
}
selectQuery := fmt.Sprintf(`
SELECT
t.table_name,
COALESCE(MIN(e.engine_id), 0),
COALESCE(MAX(e.engine_id), -1)
FROM %[1]s.%[4]s t
LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name
WHERE %[2]s = ? AND t.status <= %[3]d
GROUP BY t.table_name;
`, cpdb.schema, aliasedColName, CheckpointStatusMaxInvalid, CheckpointTableNameTable, CheckpointTableNameEngine)
// nolint:gosec
deleteChunkQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameChunk, CheckpointTableNameTable)
// nolint:gosec
deleteEngineQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameEngine, CheckpointTableNameTable)
// nolint:gosec
deleteTableQuery := fmt.Sprintf(`
DELETE FROM %s.%s WHERE %s = ? AND status <= %d
`, cpdb.schema, CheckpointTableNameTable, colName, CheckpointStatusMaxInvalid)
var targetTables []DestroyedTableCheckpoint
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.FromContext(ctx).With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "destroy error checkpoints", func(c context.Context, tx *sql.Tx) error {
// Obtain the list of tables
targetTables = nil
rows, e := tx.QueryContext(c, selectQuery, tableName) // #nosec G201
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer rows.Close()
for rows.Next() {
var dtc DestroyedTableCheckpoint
if e := rows.Scan(&dtc.TableName, &dtc.MinEngineID, &dtc.MaxEngineID); e != nil {
return errors.Trace(e)
}
targetTables = append(targetTables, dtc)
}
if e := rows.Err(); e != nil {
return errors.Trace(e)
}
// Delete the checkpoints
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil {
return errors.Trace(e)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return targetTables, nil
}
//nolint:rowserrcheck // sqltocsv.Write will check this.
func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error {
//nolint: rowserrcheck
rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(`
SELECT
task_id,
table_name,
hex(hash) AS hash,
status,
alloc_base,
create_time,
update_time
FROM %s.%s;
`, cpdb.schema, CheckpointTableNameTable))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
return errors.Trace(sqltocsv.Write(writer, rows))
}
//nolint:rowserrcheck // sqltocsv.Write will check this.
func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error {
//nolint: rowserrcheck
rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(`
SELECT
table_name,
engine_id,
status,
create_time,
update_time
FROM %s.%s;
`, cpdb.schema, CheckpointTableNameEngine))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
return errors.Trace(sqltocsv.Write(writer, rows))
}
//nolint:rowserrcheck // sqltocsv.Write will check this.
func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error {
//nolint: rowserrcheck
rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(`
SELECT
table_name,
path,
offset,
type,
compression,
sort_key,
file_size,
columns,
pos,
end_offset,
prev_rowid_max,
rowid_max,
kvc_bytes,
kvc_kvs,
kvc_checksum,
create_time,
update_time
FROM %s.%s;
`, cpdb.schema, CheckpointTableNameChunk))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
return errors.Trace(sqltocsv.Write(writer, rows))
}
func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
if tableName == allTables {
cpdb.checkpoints.Reset()
return errors.Trace(cpdb.exStorage.DeleteFile(cpdb.ctx, cpdb.fileName))
}
delete(cpdb.checkpoints.Checkpoints, tableName)
return errors.Trace(cpdb.save())
}
func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
newFileName := fmt.Sprintf("%s.%d.bak", cpdb.fileName, taskID)
return cpdb.exStorage.Rename(cpdb.ctx, cpdb.fileName, newFileName)
}
func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error) {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
targetTables := make(map[string][]int32)
for tableName, tableModel := range cpdb.checkpoints.Checkpoints {
if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) ||
tableModel.Status >= uint32(CheckpointStatusIndexImported) {
continue
}
for engineID, engineModel := range tableModel.Engines {
if engineModel.Status <= uint32(CheckpointStatusMaxInvalid) ||
engineModel.Status >= uint32(CheckpointStatusImported) {
continue
}
for _, chunkModel := range engineModel.Chunks {
if chunkModel.Pos > chunkModel.Offset {
targetTables[tableName] = append(targetTables[tableName], engineID)
break
}
}
}
}
return targetTables, nil
}
func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
for tableName, tableModel := range cpdb.checkpoints.Checkpoints {
if !(targetTableName == allTables || targetTableName == tableName) {
continue
}
if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) {
tableModel.Status = uint32(CheckpointStatusLoaded)
}
for _, engineModel := range tableModel.Engines {
if engineModel.Status <= uint32(CheckpointStatusMaxInvalid) {
engineModel.Status = uint32(CheckpointStatusLoaded)
}
}
}
return errors.Trace(cpdb.save())
}
func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error) {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
var targetTables []DestroyedTableCheckpoint
for tableName, tableModel := range cpdb.checkpoints.Checkpoints {
// Obtain the list of tables
if !(targetTableName == allTables || targetTableName == tableName) {
continue
}
if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) {
var minEngineID, maxEngineID int32 = math.MaxInt32, math.MinInt32
for engineID := range tableModel.Engines {
if engineID < minEngineID {
minEngineID = engineID
}
if engineID > maxEngineID {
maxEngineID = engineID
}
}
targetTables = append(targetTables, DestroyedTableCheckpoint{
TableName: tableName,
MinEngineID: minEngineID,
MaxEngineID: maxEngineID,
})
}
}
// Delete the checkpoints
for _, dtcp := range targetTables {
delete(cpdb.checkpoints.Checkpoints, dtcp.TableName)
}
if err := cpdb.save(); err != nil {
return nil, errors.Trace(err)
}
return targetTables, nil
}
func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error {
return errors.Errorf("dumping file checkpoint into CSV not unsupported, you may copy %s instead", cpdb.path)
}
func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error {
return errors.Errorf("dumping file checkpoint into CSV not unsupported, you may copy %s instead", cpdb.path)
}
func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
return errors.Errorf("dumping file checkpoint into CSV not unsupported, you may copy %s instead", cpdb.path)
}
func intSlice2Int32Slice(s []int) []int32 {
res := make([]int32, 0, len(s))
for _, i := range s {
res = append(res, int32(i))
}
return res
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦