tidb restore source code
File path: /br/pkg/lightning/restore/restore.go
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package restore
import (
"context"
"database/sql"
"fmt"
"io"
"math"
"os"
"strings"
"sync"
"time"
"github.com/coreos/go-semver/semver"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/lightning/web"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/br/pkg/version/build"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/mathutil"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
)
const (
FullLevelCompact = -1
Level1Compact = 1
)
const (
defaultGCLifeTime = 100 * time.Hour
)
const (
indexEngineID = -1
)
const (
compactStateIdle int32 = iota
compactStateDoing
)
const (
TaskMetaTableName = "task_meta"
TableMetaTableName = "table_meta"
// CreateTableMetadataTable stores the per-table sub-job information used by TiDB Lightning
CreateTableMetadataTable = `CREATE TABLE IF NOT EXISTS %s (
task_id BIGINT(20) UNSIGNED,
table_id BIGINT(64) NOT NULL,
table_name VARCHAR(64) NOT NULL,
row_id_base BIGINT(20) NOT NULL DEFAULT 0,
row_id_max BIGINT(20) NOT NULL DEFAULT 0,
total_kvs_base BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
total_bytes_base BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
checksum_base BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
total_kvs BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
total_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
checksum BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
status VARCHAR(32) NOT NULL,
has_duplicates BOOL NOT NULL DEFAULT 0,
PRIMARY KEY (table_id, task_id)
);`
// CreateTaskMetaTable stores the pre-lightning metadata used by TiDB Lightning
CreateTaskMetaTable = `CREATE TABLE IF NOT EXISTS %s (
task_id BIGINT(20) UNSIGNED NOT NULL,
pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '',
status VARCHAR(32) NOT NULL,
state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish',
source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
cluster_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
PRIMARY KEY (task_id)
);`
compactionLowerThreshold = 512 * units.MiB
compactionUpperThreshold = 32 * units.GiB
)
var (
minTiKVVersionForDuplicateResolution = *semver.New("5.2.0")
maxTiKVVersionForDuplicateResolution = version.NextMajorVersion()
)
// DeliverPauser is a shared pauser used to pause the progress of (*chunkRestore).encodeLoop
var DeliverPauser = common.NewPauser()
// nolint:gochecknoinits // TODO: refactor
func init() {
failpoint.Inject("SetMinDeliverBytes", func(v failpoint.Value) {
minDeliverBytes = uint64(v.(int))
})
}
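// saveCp is a checkpoint-update request sent to the checkpoint worker; waitCh, when
// non-nil, receives the result of persisting the merged checkpoint.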
type saveCp struct {
tableName string
merger checkpoints.TableCheckpointMerger
waitCh chan<- error
}
type errorSummary struct {
status checkpoints.CheckpointStatus
err error
}
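// errorSummaries collects the final error and checkpoint status of each failed table
// so they can be logged together when the task ends.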
type errorSummaries struct {
sync.Mutex
logger log.Logger
summary map[string]errorSummary
}
// makeErrorSummaries returns an initialized errorSummaries instance
func makeErrorSummaries(logger log.Logger) errorSummaries {
return errorSummaries{
logger: logger,
summary: make(map[string]errorSummary),
}
}
func (es *errorSummaries) emitLog() {
es.Lock()
defer es.Unlock()
if errorCount := len(es.summary); errorCount > 0 {
logger := es.logger
logger.Error("tables failed to be imported", zap.Int("count", errorCount))
for tableName, errorSummary := range es.summary {
logger.Error("-",
zap.String("table", tableName),
zap.String("status", errorSummary.status.MetricName()),
log.ShortError(errorSummary.err),
)
}
}
}
func (es *errorSummaries) record(tableName string, err error, status checkpoints.CheckpointStatus) {
es.Lock()
defer es.Unlock()
es.summary[tableName] = errorSummary{status: status, err: err}
}
const (
diskQuotaStateIdle int32 = iota
diskQuotaStateChecking
diskQuotaStateImporting
)
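// Controller drives the whole import: it owns the worker pools, the backend, the
// checkpoint database and the metadata managers, and coordinates schema restore,
// data restore, compaction and checkpoint cleanup.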
type Controller struct {
taskCtx context.Context
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
dbInfos map[string]*checkpoints.TidbDBInfo
tableWorkers *worker.Pool
indexWorkers *worker.Pool
regionWorkers *worker.Pool
ioWorkers *worker.Pool
checksumWorks *worker.Pool
pauser *common.Pauser
backend backend.Backend
tidbGlue glue.Glue
alterTableLock sync.Mutex
sysVars map[string]string
tls *common.TLS
checkTemplate Template
errorSummaries errorSummaries
checkpointsDB checkpoints.DB
saveCpCh chan saveCp
checkpointsWg sync.WaitGroup
closedEngineLimit *worker.Pool
store storage.ExternalStorage
ownStore bool
metaMgrBuilder metaMgrBuilder
errorMgr *errormanager.ErrorManager
taskMgr taskMetaMgr
diskQuotaLock sync.RWMutex
diskQuotaState atomic.Int32
compactState atomic.Int32
status *LightningStatus
preInfoGetter PreRestoreInfoGetter
precheckItemBuilder *PrecheckItemBuilder
}
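// LightningStatus tracks the total and already finished source file sizes so that
// callers can observe import progress.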
type LightningStatus struct {
FinishedFileSize atomic.Int64
TotalFileSize atomic.Int64
}
// ControllerParam contains many parameters for creating a Controller.
type ControllerParam struct {
// databases that dumper created
DBMetas []*mydump.MDDatabaseMeta
// a pointer to status to report it to caller
Status *LightningStatus
// storage interface to read the dump data
DumpFileStorage storage.ExternalStorage
// true if DumpFileStorage is created by lightning. In some cases where lightning is used as a library, the framework may pass in a DumpFileStorage
OwnExtStorage bool
// used by lightning server mode to pause tasks
Pauser *common.Pauser
// lightning via SQL will implement its own glue, to let lightning use the host TiDB's environment
Glue glue.Glue
// storage interface to write file checkpoints
CheckpointStorage storage.ExternalStorage
// when CheckpointStorage is not nil, save file checkpoint to it with this name
CheckpointName string
}
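// NewRestoreController creates a Controller that uses the shared DeliverPauser.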
func NewRestoreController(
ctx context.Context,
cfg *config.Config,
param *ControllerParam,
) (*Controller, error) {
param.Pauser = DeliverPauser
return NewRestoreControllerWithPauser(ctx, cfg, param)
}
func NewRestoreControllerWithPauser(
ctx context.Context,
cfg *config.Config,
p *ControllerParam,
) (*Controller, error) {
tls, err := cfg.ToTLS()
if err != nil {
return nil, err
}
var cpdb checkpoints.DB
// if CheckpointStorage is set, we should use given ExternalStorage to create checkpoints.
if p.CheckpointStorage != nil {
cpdb, err = checkpoints.NewFileCheckpointsDBWithExstorageFileName(ctx, p.CheckpointStorage.URI(), p.CheckpointStorage, p.CheckpointName)
if err != nil {
return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs()
}
} else {
cpdb, err = p.Glue.OpenCheckpointsDB(ctx, cfg)
if err != nil {
if berrors.Is(err, common.ErrUnknownCheckpointDriver) {
return nil, err
}
return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs()
}
}
taskCp, err := cpdb.TaskCheckpoint(ctx)
if err != nil {
return nil, common.ErrReadCheckpoint.Wrap(err).GenWithStack("get task checkpoint failed")
}
if err := verifyCheckpoint(cfg, taskCp); err != nil {
return nil, errors.Trace(err)
}
// reuse task id to reuse task meta correctly.
if taskCp != nil {
cfg.TaskID = taskCp.TaskID
}
// TODO: support Lightning via SQL
db, err := p.Glue.GetDB()
if err != nil {
return nil, errors.Trace(err)
}
errorMgr := errormanager.New(db, cfg, log.FromContext(ctx))
if err := errorMgr.Init(ctx); err != nil {
return nil, common.ErrInitErrManager.Wrap(err).GenWithStackByArgs()
}
var backend backend.Backend
switch cfg.TikvImporter.Backend {
case config.BackendTiDB:
backend = tidb.NewTiDBBackend(ctx, db, cfg.TikvImporter.OnDuplicate, errorMgr)
case config.BackendLocal:
var rLimit local.Rlim_t
rLimit, err = local.GetSystemRLimit()
if err != nil {
return nil, err
}
maxOpenFiles := int(rLimit / local.Rlim_t(cfg.App.TableConcurrency))
// check overflow
if maxOpenFiles < 0 {
maxOpenFiles = math.MaxInt32
}
if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if berrors.Is(err, berrors.ErrVersionMismatch) {
log.FromContext(ctx).Warn("TiKV version doesn't support duplicate resolution. The resolution algorithm will fall back to 'none'", zap.Error(err))
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
} else {
return nil, common.ErrCheckKVVersion.Wrap(err).GenWithStackByArgs()
}
}
}
backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
}
err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir)
if err != nil {
return nil, err
}
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
var metaBuilder metaMgrBuilder
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal
switch {
case isSSTImport && cfg.TikvImporter.IncrementalImport:
metaBuilder = &dbMetaMgrBuilder{
db: db,
taskID: cfg.TaskID,
schema: cfg.App.MetaSchemaName,
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
metaBuilder = singleMgrBuilder{
taskID: cfg.TaskID,
}
default:
metaBuilder = noopMetaMgrBuilder{}
}
ioWorkers := worker.NewPool(ctx, cfg.App.IOConcurrency, "io")
targetInfoGetter := &TargetInfoGetterImpl{
cfg: cfg,
targetDBGlue: p.Glue,
tls: tls,
backend: backend,
}
preInfoGetter, err := NewPreRestoreInfoGetter(
cfg,
p.DBMetas,
p.DumpFileStorage,
targetInfoGetter,
ioWorkers,
backend,
)
if err != nil {
return nil, errors.Trace(err)
}
preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb,
)
rc := &Controller{
taskCtx: ctx,
cfg: cfg,
dbMetas: p.DBMetas,
tableWorkers: nil,
indexWorkers: nil,
regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"),
ioWorkers: ioWorkers,
checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"),
pauser: p.Pauser,
backend: backend,
tidbGlue: p.Glue,
sysVars: defaultImportantVariables,
tls: tls,
checkTemplate: NewSimpleTemplate(),
errorSummaries: makeErrorSummaries(log.FromContext(ctx)),
checkpointsDB: cpdb,
saveCpCh: make(chan saveCp),
closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"),
store: p.DumpFileStorage,
ownStore: p.OwnExtStorage,
metaMgrBuilder: metaBuilder,
errorMgr: errorMgr,
status: p.Status,
taskMgr: nil,
preInfoGetter: preInfoGetter,
precheckItemBuilder: preCheckBuilder,
}
return rc, nil
}
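// Close releases the backend and the glue's SQL executor.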
func (rc *Controller) Close() {
rc.backend.Close()
rc.tidbGlue.GetSQLExecutor().Close()
}
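// Run executes the whole restore procedure step by step: set global variables, restore
// schemas, run pre-checks, initialize checkpoints, restore table data, run the full
// compaction and finally clean up checkpoints. It stops at the first failed step.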
func (rc *Controller) Run(ctx context.Context) error {
opts := []func(context.Context) error{
rc.setGlobalVariables,
rc.restoreSchema,
rc.preCheckRequirements,
rc.initCheckpoint,
rc.restoreTables,
rc.fullCompact,
rc.cleanCheckpoints,
}
task := log.FromContext(ctx).Begin(zap.InfoLevel, "the whole procedure")
var err error
finished := false
outside:
for i, process := range opts {
err = process(ctx)
if i == len(opts)-1 {
finished = true
}
logger := task.With(zap.Int("step", i), log.ShortError(err))
switch {
case err == nil:
case log.IsContextCanceledError(err):
logger.Info("task canceled")
break outside
default:
logger.Error("run failed")
break outside // ps : not continue
}
}
// if the process is cancelled, we should make sure checkpoints are written to the db.
if !finished {
rc.waitCheckpointFinish()
}
task.End(zap.ErrorLevel, err)
rc.errorMgr.LogErrorDetails()
rc.errorSummaries.emitLog()
return errors.Trace(err)
}
type schemaStmtType int
func (stmtType schemaStmtType) String() string {
switch stmtType {
case schemaCreateDatabase:
return "restore database schema"
case schemaCreateTable:
return "restore table schema"
case schemaCreateView:
return "restore view schema"
}
return "unknown statement of schema"
}
const (
schemaCreateDatabase schemaStmtType = iota
schemaCreateTable
schemaCreateView
)
type schemaJob struct {
dbName string
tblName string // empty for create db jobs
stmtType schemaStmtType
stmts []string
}
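// restoreSchemaWorker runs CREATE DATABASE / TABLE / VIEW statements concurrently:
// makeJobs feeds schemaJobs into jobCh and the doJob goroutines execute them; the first
// error is reported through errCh and cancels the shared context.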
type restoreSchemaWorker struct {
ctx context.Context
quit context.CancelFunc
logger log.Logger
jobCh chan *schemaJob
errCh chan error
wg sync.WaitGroup
glue glue.Glue
store storage.ExternalStorage
}
func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error {
stmts, err := createIfNotExistsStmt(worker.glue.GetParser(), sqlStr, job.dbName, job.tblName)
if err != nil {
return err
}
job.stmts = stmts
return worker.appendJob(job)
}
func (worker *restoreSchemaWorker) makeJobs(
dbMetas []*mydump.MDDatabaseMeta,
getTables func(context.Context, string) ([]*model.TableInfo, error),
) error {
defer func() {
close(worker.jobCh)
worker.quit()
}()
var err error
// 1. restore databases, executing statements concurrently
for _, dbMeta := range dbMetas {
sql := dbMeta.GetSchema(worker.ctx, worker.store)
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
tblName: "",
stmtType: schemaCreateDatabase,
})
if err != nil {
return err
}
}
err = worker.wait()
if err != nil {
return err
}
// 2. restore tables, executing statements concurrently
for _, dbMeta := range dbMetas {
// we can ignore the error here, and let the later check fail if the schema does not match
tables, _ := getTables(worker.ctx, dbMeta.Name)
tableMap := make(map[string]struct{})
for _, t := range tables {
tableMap[t.Name.L] = struct{}{}
}
for _, tblMeta := range dbMeta.Tables {
if _, ok := tableMap[strings.ToLower(tblMeta.Name)]; ok {
// we already have this table in TiDB.
// we should skip the ddl job and let SchemaValid check it.
continue
} else if tblMeta.SchemaFile.FileMeta.Path == "" {
return common.ErrSchemaNotExists.GenWithStackByArgs(dbMeta.Name, tblMeta.Name)
}
sql, err := tblMeta.GetSchema(worker.ctx, worker.store)
if err != nil {
return err
}
if sql != "" {
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
tblName: tblMeta.Name,
stmtType: schemaCreateTable,
})
if err != nil {
return err
}
}
}
}
err = worker.wait()
if err != nil {
return err
}
// 3. restore views. Since views can cross database we must restore views after all table schemas are restored.
for _, dbMeta := range dbMetas {
for _, viewMeta := range dbMeta.Views {
sql, err := viewMeta.GetSchema(worker.ctx, worker.store)
if sql != "" {
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
tblName: viewMeta.Name,
stmtType: schemaCreateView,
})
if err != nil {
return err
}
// we don't restore views concurrently, because it may raise an error
err = worker.wait()
if err != nil {
return err
}
}
if err != nil {
return err
}
}
}
return nil
}
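// doJob is the consumer loop of a schema-restore goroutine: it lazily opens a SQL
// connection, executes the statements of each schemaJob with retry, and drains the
// remaining jobs when the context is cancelled so that wg.Wait never blocks.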
func (worker *restoreSchemaWorker) doJob() {
var session *sql.Conn
defer func() {
if session != nil {
_ = session.Close()
}
}()
loop:
for {
select {
case <-worker.ctx.Done():
// don't `return` or throw `worker.ctx.Err()` here:
// if we `return`, we can't mark cancelled jobs as done,
// if we `throw(worker.ctx.Err())`, it will block forever
break loop
case job := <-worker.jobCh:
if job == nil {
// successful exit
return
}
var err error
if session == nil {
session, err = func() (*sql.Conn, error) {
// TODO: support lightning in SQL
db, err := worker.glue.GetDB()
if err != nil {
return nil, errors.Trace(err)
}
return db.Conn(worker.ctx)
}()
if err != nil {
worker.wg.Done()
worker.throw(err)
// don't return
break loop
}
}
logger := worker.logger.With(zap.String("db", job.dbName), zap.String("table", job.tblName))
sqlWithRetry := common.SQLWithRetry{
Logger: worker.logger,
DB: session,
}
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt))
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt)
task.End(zap.ErrorLevel, err)
if err != nil {
err = common.ErrCreateSchema.Wrap(err).GenWithStackByArgs(common.UniqueTable(job.dbName, job.tblName), job.stmtType.String())
worker.wg.Done()
worker.throw(err)
// don't return
break loop
}
}
worker.wg.Done()
}
}
// mark the cancelled jobs as `Done`, a little tricky,
// because we need to make sure `worker.wg.Wait()` won't block forever
for range worker.jobCh {
worker.wg.Done()
}
}
func (worker *restoreSchemaWorker) wait() error {
// avoid `worker.wg.Wait()` blocking forever when all `doJob` goroutines have exited.
// don't worry about the goroutine below, it never becomes a zombie,
// because we have a mechanism to drain cancelled jobs from `worker.jobCh`,
// which means every job sent to `worker.jobCh` will eventually be marked done.
waitCh := make(chan struct{})
go func() {
worker.wg.Wait()
close(waitCh)
}()
select {
case err := <-worker.errCh:
return err
case <-worker.ctx.Done():
return worker.ctx.Err()
case <-waitCh:
return nil
}
}
func (worker *restoreSchemaWorker) throw(err error) {
select {
case <-worker.ctx.Done():
// don't throw `worker.ctx.Err()` again, it would block forever.
return
case worker.errCh <- err:
worker.quit()
}
}
func (worker *restoreSchemaWorker) appendJob(job *schemaJob) error {
worker.wg.Add(1)
select {
case err := <-worker.errCh:
// cancel the job
worker.wg.Done()
return err
case <-worker.ctx.Done():
// cancel the job
worker.wg.Done()
return errors.Trace(worker.ctx.Err())
case worker.jobCh <- job:
return nil
}
}
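// restoreSchema creates the databases, tables and views described by the schema files
// using a pool of restoreSchemaWorker goroutines, then loads the resulting table
// structures and the target system variables into the Controller.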
func (rc *Controller) restoreSchema(ctx context.Context) error {
// create table with schema file
// we can handle the duplicated created with createIfNotExist statement
// and we will check the schema in TiDB is valid with the datafile in DataCheck later.
logTask := log.FromContext(ctx).Begin(zap.InfoLevel, "restore all schema")
concurrency := mathutil.Min(rc.cfg.App.RegionConcurrency, 8)
childCtx, cancel := context.WithCancel(ctx)
worker := restoreSchemaWorker{
ctx: childCtx,
quit: cancel,
logger: log.FromContext(ctx),
jobCh: make(chan *schemaJob, concurrency),
errCh: make(chan error),
glue: rc.tidbGlue,
store: rc.store,
}
for i := 0; i < concurrency; i++ {
go worker.doJob()
}
err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteTableModels)
logTask.End(zap.ErrorLevel, err)
if err != nil {
return err
}
dbInfos, err := rc.preInfoGetter.GetAllTableStructures(ctx)
if err != nil {
return errors.Trace(err)
}
// For local backend, we need DBInfo.ID to operate the global autoid allocator.
if isLocalBackend(rc.cfg) {
dbs, err := tikv.FetchRemoteDBModelsFromTLS(ctx, rc.tls)
if err != nil {
return errors.Trace(err)
}
dbIDs := make(map[string]int64)
for _, db := range dbs {
dbIDs[db.Name.L] = db.ID
}
for _, dbInfo := range dbInfos {
dbInfo.ID = dbIDs[strings.ToLower(dbInfo.Name)]
}
}
rc.dbInfos = dbInfos
rc.sysVars = rc.preInfoGetter.GetTargetSysVariablesForImport(ctx)
return nil
}
// initCheckpoint initializes all tables' checkpoint data
func (rc *Controller) initCheckpoint(ctx context.Context) error {
// Load new checkpoints
err := rc.checkpointsDB.Initialize(ctx, rc.cfg, rc.dbInfos)
if err != nil {
return common.ErrInitCheckpoint.Wrap(err).GenWithStackByArgs()
}
failpoint.Inject("InitializeCheckpointExit", func() {
log.FromContext(ctx).Warn("exit triggered", zap.String("failpoint", "InitializeCheckpointExit"))
os.Exit(0)
})
rc.checkpointsWg.Add(1) // checkpointsWg will be done in `rc.listenCheckpointUpdates`
go rc.listenCheckpointUpdates(log.FromContext(ctx))
// Estimate the number of chunks for progress reporting
return rc.estimateChunkCountIntoMetrics(ctx)
}
// verifyCheckpoint checks whether the previous task checkpoint is compatible with the current task config
func verifyCheckpoint(cfg *config.Config, taskCp *checkpoints.TaskCheckpoint) error {
if taskCp == nil {
return nil
}
// always check the backend value even with 'check-requirements = false'
retryUsage := "destroy all checkpoints"
if cfg.Checkpoint.Driver == config.CheckpointDriverFile {
retryUsage = fmt.Sprintf("delete the file '%s'", cfg.Checkpoint.DSN)
}
retryUsage += " and remove all restored tables and try again"
if cfg.TikvImporter.Backend != taskCp.Backend {
return common.ErrInvalidCheckpoint.GenWithStack("config 'tikv-importer.backend' value '%s' different from checkpoint value '%s', please %s", cfg.TikvImporter.Backend, taskCp.Backend, retryUsage)
}
if cfg.App.CheckRequirements {
if build.ReleaseVersion != taskCp.LightningVer {
var displayVer string
if len(taskCp.LightningVer) != 0 {
displayVer = fmt.Sprintf("at '%s'", taskCp.LightningVer)
} else {
displayVer = "before v4.0.6/v3.0.19"
}
return common.ErrInvalidCheckpoint.GenWithStack("lightning version is '%s', but checkpoint was created %s, please %s", build.ReleaseVersion, displayVer, retryUsage)
}
errorFmt := "config '%s' value '%s' different from checkpoint value '%s'. You may set 'check-requirements = false' to skip this check or " + retryUsage
if cfg.Mydumper.SourceDir != taskCp.SourceDir {
return common.ErrInvalidCheckpoint.GenWithStack(errorFmt, "mydumper.data-source-dir", cfg.Mydumper.SourceDir, taskCp.SourceDir)
}
if cfg.TikvImporter.Backend == config.BackendLocal && cfg.TikvImporter.SortedKVDir != taskCp.SortedKVDir {
return common.ErrInvalidCheckpoint.GenWithStack(errorFmt, "mydumper.sorted-kv-dir", cfg.TikvImporter.SortedKVDir, taskCp.SortedKVDir)
}
if cfg.TiDB.Host != taskCp.TiDBHost {
return common.ErrInvalidCheckpoint.GenWithStack(errorFmt, "tidb.host", cfg.TiDB.Host, taskCp.TiDBHost)
}
if cfg.TiDB.Port != taskCp.TiDBPort {
return common.ErrInvalidCheckpoint.GenWithStack(errorFmt, "tidb.port", cfg.TiDB.Port, taskCp.TiDBPort)
}
if cfg.TiDB.PdAddr != taskCp.PdAddr {
return common.ErrInvalidCheckpoint.GenWithStack(errorFmt, "tidb.pd-addr", cfg.TiDB.PdAddr, taskCp.PdAddr)
}
}
return nil
}
// for the local backend, we should check whether the local SST files still exist on disk, otherwise we'll lose data
func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error {
targetTables, err := cpdb.GetLocalStoringTables(ctx)
if err != nil {
return errors.Trace(err)
}
for tableName, engineIDs := range targetTables {
for _, engineID := range engineIDs {
_, eID := backend.MakeUUID(tableName, engineID)
file := local.Engine{UUID: eID}
err := file.Exist(dir)
if err != nil {
log.FromContext(ctx).Error("can't find local file",
zap.String("table name", tableName),
zap.Int32("engine ID", engineID))
if os.IsNotExist(err) {
err = common.ErrCheckLocalFile.GenWithStackByArgs(tableName, dir)
} else {
err = common.ErrCheckLocalFile.Wrap(err).GenWithStackByArgs(tableName, dir)
}
return err
}
}
}
return nil
}
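// estimateChunkCountIntoMetrics estimates how many chunks and engines still need to be
// processed, based on checkpoints and source file sizes, and records the numbers in the
// metrics and the glue for progress reporting.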
func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
estimatedChunkCount := 0.0
estimatedEngineCnt := int64(0)
batchSize := rc.cfg.Mydumper.BatchSize
if batchSize <= 0 {
// if rows in the source files are not sorted by primary key (when the primary key is numeric or the clustered index is enabled),
// the key ranges in each data engine may overlap, thus a bigger engine size can somewhat alleviate it.
batchSize = config.DefaultBatchSize
}
for _, dbMeta := range rc.dbMetas {
for _, tableMeta := range dbMeta.Tables {
tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name)
dbCp, err := rc.checkpointsDB.Get(ctx, tableName)
if err != nil {
return errors.Trace(err)
}
fileChunks := make(map[string]float64)
for engineID, eCp := range dbCp.Engines {
if eCp.Status < checkpoints.CheckpointStatusImported {
estimatedEngineCnt++
}
if engineID == indexEngineID {
continue
}
for _, c := range eCp.Chunks {
if _, ok := fileChunks[c.Key.Path]; !ok {
fileChunks[c.Key.Path] = 0.0
}
remainChunkCnt := float64(c.Chunk.EndOffset-c.Chunk.Offset) / float64(c.Chunk.EndOffset-c.Key.Offset)
fileChunks[c.Key.Path] += remainChunkCnt
}
}
// estimate the engine count if the engine checkpoints are empty
if len(dbCp.Engines) == 0 {
estimatedEngineCnt += ((tableMeta.TotalSize + int64(batchSize) - 1) / int64(batchSize)) + 1
}
for _, fileMeta := range tableMeta.DataFiles {
if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok {
estimatedChunkCount += cnt
continue
}
if fileMeta.FileMeta.Type == mydump.SourceTypeCSV {
cfg := rc.cfg.Mydumper
if fileMeta.FileMeta.FileSize > int64(cfg.MaxRegionSize) && cfg.StrictFormat && !cfg.CSV.Header {
estimatedChunkCount += math.Round(float64(fileMeta.FileMeta.FileSize) / float64(cfg.MaxRegionSize))
} else {
estimatedChunkCount++
}
} else {
estimatedChunkCount++
}
}
}
}
if m, ok := metric.FromContext(ctx); ok {
m.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(estimatedChunkCount)
m.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess).
Add(float64(estimatedEngineCnt))
}
rc.tidbGlue.Record(glue.RecordEstimatedChunk, uint64(estimatedChunkCount))
return nil
}
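// firstErr returns the first non-nil error in the list, or nil if there is none.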
func firstErr(errors ...error) error {
for _, err := range errors {
if err != nil {
return err
}
}
return nil
}
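// saveStatusCheckpoint queues a status update for the given table/engine to the checkpoint
// worker and waits for it to be persisted. Retryable storage errors and context cancellation
// are not recorded, so lightning can recover automatically later.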
func (rc *Controller) saveStatusCheckpoint(ctx context.Context, tableName string, engineID int32, err error, statusIfSucceed checkpoints.CheckpointStatus) error {
merger := &checkpoints.StatusCheckpointMerger{Status: statusIfSucceed, EngineID: engineID}
logger := log.FromContext(ctx).With(zap.String("table", tableName), zap.Int32("engine_id", engineID),
zap.String("new_status", statusIfSucceed.MetricName()), zap.Error(err))
logger.Debug("update checkpoint")
switch {
case err == nil:
case utils.MessageIsRetryableStorageError(err.Error()), common.IsContextCanceledError(err):
// recoverable error, should not be recorded in checkpoint
// which will prevent lightning from automatically recovering
return nil
default:
// unrecoverable error
merger.SetInvalid()
rc.errorSummaries.record(tableName, err, statusIfSucceed)
}
if m, ok := metric.FromContext(ctx); ok {
if engineID == checkpoints.WholeTableEngineID {
m.RecordTableCount(statusIfSucceed.MetricName(), err)
} else {
m.RecordEngineCount(statusIfSucceed.MetricName(), err)
}
}
waitCh := make(chan error, 1)
rc.saveCpCh <- saveCp{tableName: tableName, merger: merger, waitCh: waitCh}
select {
case saveCpErr := <-waitCh:
if saveCpErr != nil {
logger.Error("failed to save status checkpoint", log.ShortError(saveCpErr))
}
return saveCpErr
case <-ctx.Done():
return ctx.Err()
}
}
// listenCheckpointUpdates will combine several checkpoints together to reduce database load.
func (rc *Controller) listenCheckpointUpdates(logger log.Logger) {
var lock sync.Mutex
coalesed := make(map[string]*checkpoints.TableCheckpointDiff)
var waiters []chan<- error
hasCheckpoint := make(chan struct{}, 1)
defer close(hasCheckpoint)
go func() {
for range hasCheckpoint {
lock.Lock()
cpd := coalesed
coalesed = make(map[string]*checkpoints.TableCheckpointDiff)
ws := waiters
waiters = nil
lock.Unlock()
//nolint:scopelint // This would be either INLINED or ERASED, at compile time.
failpoint.Inject("SlowDownCheckpointUpdate", func() {})
if len(cpd) > 0 {
err := rc.checkpointsDB.Update(rc.taskCtx, cpd)
for _, w := range ws {
w <- common.NormalizeOrWrapErr(common.ErrUpdateCheckpoint, err)
}
web.BroadcastCheckpointDiff(cpd)
}
rc.checkpointsWg.Done()
}
}()
for scp := range rc.saveCpCh {
lock.Lock()
cpd, ok := coalesed[scp.tableName]
if !ok {
cpd = checkpoints.NewTableCheckpointDiff()
coalesed[scp.tableName] = cpd
}
scp.merger.MergeInto(cpd)
if scp.waitCh != nil {
waiters = append(waiters, scp.waitCh)
}
if len(hasCheckpoint) == 0 {
rc.checkpointsWg.Add(1)
hasCheckpoint <- struct{}{}
}
lock.Unlock()
//nolint:scopelint // This would be either INLINED or ERASED, at compile time.
failpoint.Inject("FailIfImportedChunk", func() {
if merger, ok := scp.merger.(*checkpoints.ChunkCheckpointMerger); ok && merger.Pos >= merger.EndOffset {
rc.checkpointsWg.Done()
rc.checkpointsWg.Wait()
panic("forcing failure due to FailIfImportedChunk")
}
})
//nolint:scopelint // This would be either INLINED or ERASED, at compile time.
failpoint.Inject("FailIfStatusBecomes", func(val failpoint.Value) {
if merger, ok := scp.merger.(*checkpoints.StatusCheckpointMerger); ok && merger.EngineID >= 0 && int(merger.Status) == val.(int) {
rc.checkpointsWg.Done()
rc.checkpointsWg.Wait()
panic("forcing failure due to FailIfStatusBecomes")
}
})
//nolint:scopelint // This would be either INLINED or ERASED, at compile time.
failpoint.Inject("FailIfIndexEngineImported", func(val failpoint.Value) {
if merger, ok := scp.merger.(*checkpoints.StatusCheckpointMerger); ok &&
merger.EngineID == checkpoints.WholeTableEngineID &&
merger.Status == checkpoints.CheckpointStatusIndexImported && val.(int) > 0 {
rc.checkpointsWg.Done()
rc.checkpointsWg.Wait()
panic("forcing failure due to FailIfIndexEngineImported")
}
})
//nolint:scopelint // This would be either INLINED or ERASED, at compile time.
failpoint.Inject("KillIfImportedChunk", func() {
if merger, ok := scp.merger.(*checkpoints.ChunkCheckpointMerger); ok && merger.Pos >= merger.EndOffset {
rc.checkpointsWg.Done()
rc.checkpointsWg.Wait()
if err := common.KillMySelf(); err != nil {
logger.Warn("KillMySelf() failed to kill itself", log.ShortError(err))
}
for scp := range rc.saveCpCh {
if scp.waitCh != nil {
scp.waitCh <- context.Canceled
}
}
failpoint.Return()
}
})
}
// Don't put this statement in defer function at the beginning. failpoint function may call it manually.
rc.checkpointsWg.Done()
}
// buildRunPeriodicActionAndCancelFunc builds the runPeriodicAction func and a cancel func
func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, stop <-chan struct{}) (func(), func(bool)) {
cancelFuncs := make([]func(bool), 0)
closeFuncs := make([]func(), 0)
// a nil channel blocks forever.
// if the cron duration is zero we use the nil channel to skip the action.
var logProgressChan <-chan time.Time
if rc.cfg.Cron.LogProgress.Duration > 0 {
logProgressTicker := time.NewTicker(rc.cfg.Cron.LogProgress.Duration)
closeFuncs = append(closeFuncs, func() {
logProgressTicker.Stop()
})
logProgressChan = logProgressTicker.C
}
glueProgressTicker := time.NewTicker(3 * time.Second)
closeFuncs = append(closeFuncs, func() {
glueProgressTicker.Stop()
})
var switchModeChan <-chan time.Time
// the tidb backend doesn't need to switch tikv to import mode
if rc.cfg.TikvImporter.Backend != config.BackendTiDB && rc.cfg.Cron.SwitchMode.Duration > 0 {
switchModeTicker := time.NewTicker(rc.cfg.Cron.SwitchMode.Duration)
cancelFuncs = append(cancelFuncs, func(bool) { switchModeTicker.Stop() })
cancelFuncs = append(cancelFuncs, func(do bool) {
if do {
rc.switchToNormalMode(ctx)
}
})
switchModeChan = switchModeTicker.C
}
var checkQuotaChan <-chan time.Time
// only local storage has disk quota concern.
if rc.cfg.TikvImporter.Backend == config.BackendLocal && rc.cfg.Cron.CheckDiskQuota.Duration > 0 {
checkQuotaTicker := time.NewTicker(rc.cfg.Cron.CheckDiskQuota.Duration)
cancelFuncs = append(cancelFuncs, func(bool) { checkQuotaTicker.Stop() })
checkQuotaChan = checkQuotaTicker.C
}
return func() {
defer func() {
for _, f := range closeFuncs {
f()
}
}()
if rc.cfg.Cron.SwitchMode.Duration > 0 {
rc.switchToImportMode(ctx)
}
start := time.Now()
for {
select {
case <-ctx.Done():
log.FromContext(ctx).Warn("stopping periodic actions", log.ShortError(ctx.Err()))
return
case <-stop:
log.FromContext(ctx).Info("everything imported, stopping periodic actions")
return
case <-switchModeChan:
// periodically switch to import mode, as requested by TiKV 3.0
rc.switchToImportMode(ctx)
case <-logProgressChan:
metrics, ok := metric.FromContext(ctx)
if !ok {
log.FromContext(ctx).Warn("couldn't find metrics from context, skip log progress")
continue
}
// log the current progress periodically, so OPS will know that we're still working
nanoseconds := float64(time.Since(start).Nanoseconds())
totalRestoreBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore))
restoredBytes := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateRestored))
// the estimated chunk count is not accurate (likely underestimated), and the actual count is not accurate
// before the last table starts, so using the bigger of the two is a reasonable workaround
estimated := metric.ReadCounter(metrics.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
pending := metric.ReadCounter(metrics.ChunkCounter.WithLabelValues(metric.ChunkStatePending))
if estimated < pending {
estimated = pending
}
finished := metric.ReadCounter(metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished))
totalTables := metric.ReadCounter(metrics.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess))
completedTables := metric.ReadCounter(metrics.TableCounter.WithLabelValues(metric.TableStateCompleted, metric.TableResultSuccess))
bytesRead := metric.ReadHistogramSum(metrics.RowReadBytesHistogram)
engineEstimated := metric.ReadCounter(metrics.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess))
enginePending := metric.ReadCounter(metrics.ProcessedEngineCounter.WithLabelValues(metric.ChunkStatePending, metric.TableResultSuccess))
if engineEstimated < enginePending {
engineEstimated = enginePending
}
engineFinished := metric.ReadCounter(metrics.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
bytesWritten := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten))
bytesImported := metric.ReadCounter(metrics.BytesCounter.WithLabelValues(metric.BytesStateImported))
var state string
var remaining zap.Field
switch {
case finished >= estimated:
if engineFinished < engineEstimated {
state = "importing"
} else {
state = "post-processing"
}
case finished > 0:
state = "writing"
default:
state = "preparing"
}
// lightning restore is separated into the restore-engine and import-engine phases; they are both parallelized
// and pipelined between engines, so we can only weight the progress of those two phases to get the
// total progress.
//
// for the local & importer backends:
// in most cases the import-engine phase is faster since there's little computation, but inside one engine
// restore and import are serialized, so the progress of the two phases will not differ too much, and
// the import-engine phase determines the end time of the whole restore, so we average them for now.
// the resulting progress may fall behind the real progress if import is faster.
//
// for the tidb backend, we do nothing during the import-engine phase, so we use the restore-engine progress as the
// total progress.
restoreBytesField := zap.Skip()
importBytesField := zap.Skip()
remaining = zap.Skip()
totalPercent := 0.0
if restoredBytes > 0 {
restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
metrics.ProgressGauge.WithLabelValues(metric.ProgressPhaseRestore).Set(restorePercent)
if rc.cfg.TikvImporter.Backend != config.BackendTiDB {
var importPercent float64
if bytesWritten > 0 {
// estimate total import bytes from written bytes
// when importPercent = 1, totalImportBytes = bytesWritten, but in some cases
// bytesImported may be bigger or smaller than bytesWritten, such as when deduplicating,
// so we also calculate progress using engines and take the bigger one in case bytesImported
// is smaller.
totalImportBytes := bytesWritten / restorePercent
biggerPercent := math.Max(bytesImported/totalImportBytes, engineFinished/engineEstimated)
importPercent = math.Min(biggerPercent, 1.0)
importBytesField = zap.String("import-bytes", fmt.Sprintf("%s/%s(estimated)",
units.BytesSize(bytesImported), units.BytesSize(totalImportBytes)))
}
metrics.ProgressGauge.WithLabelValues(metric.ProgressPhaseImport).Set(importPercent)
totalPercent = (restorePercent + importPercent) / 2
} else {
totalPercent = restorePercent
}
if totalPercent < 1.0 {
remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds
remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
}
restoreBytesField = zap.String("restore-bytes", fmt.Sprintf("%s/%s",
units.BytesSize(restoredBytes), units.BytesSize(totalRestoreBytes)))
}
metrics.ProgressGauge.WithLabelValues(metric.ProgressPhaseTotal).Set(totalPercent)
formatPercent := func(num, denom float64) string {
if denom > 0 {
return fmt.Sprintf(" (%.1f%%)", num/denom*100)
}
return ""
}
// avoid output bytes speed if there are no unfinished chunks
encodeSpeedField := zap.Skip()
if bytesRead > 0 {
encodeSpeedField = zap.Float64("encode speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
}
// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
log.FromContext(ctx).Info("progress",
zap.String("total", fmt.Sprintf("%.1f%%", totalPercent*100)),
// zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))),
restoreBytesField, importBytesField,
encodeSpeedField,
zap.String("state", state),
remaining,
)
case <-checkQuotaChan:
// verify the total space occupied by sorted-kv-dir is below the quota,
// otherwise we perform an emergency import.
rc.enforceDiskQuota(ctx)
case <-glueProgressTicker.C:
if m, ok := metric.FromContext(ctx); ok {
finished := metric.ReadCounter(m.ChunkCounter.WithLabelValues(metric.ChunkStateFinished))
rc.tidbGlue.Record(glue.RecordFinishedChunk, uint64(finished))
}
}
}
}, func(do bool) {
log.FromContext(ctx).Info("cancel periodic actions", zap.Bool("do", do))
for _, f := range cancelFuncs {
f(do)
}
}
}
type checksumManagerKeyType struct{}
var checksumManagerKey checksumManagerKeyType
const (
pauseGCTTLForDupeRes = time.Hour
pauseGCIntervalForDupeRes = time.Minute
)
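// keepPauseGCForDupeRes registers a service GC safe point in PD and keeps refreshing it in a
// background goroutine, so that duplicate records are not garbage-collected while duplicate
// resolution is running. The returned channel is closed once the goroutine exits.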
func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
serviceID := "lightning-duplicate-resolution-" + uuid.New().String()
ttl := int64(pauseGCTTLForDupeRes / time.Second)
var (
safePoint uint64
paused bool
)
// Try to get the minimum safe point across all services as our GC safe point.
for i := 0; i < 10; i++ {
if i > 0 {
time.Sleep(time.Second * 3)
}
minSafePoint, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, 1)
if err != nil {
pdCli.Close()
return nil, errors.Trace(err)
}
newMinSafePoint, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, minSafePoint)
if err != nil {
pdCli.Close()
return nil, errors.Trace(err)
}
if newMinSafePoint <= minSafePoint {
safePoint = minSafePoint
paused = true
break
}
log.FromContext(ctx).Warn(
"Failed to register GC safe point because the current minimum safe point is newer"+
" than what we assume, will retry newMinSafePoint next time",
zap.Uint64("minSafePoint", minSafePoint),
zap.Uint64("newMinSafePoint", newMinSafePoint),
)
}
if !paused {
pdCli.Close()
return nil, common.ErrPauseGC.GenWithStack("failed to pause GC for duplicate resolution after all retries")
}
exitCh := make(chan struct{})
go func(safePoint uint64) {
defer pdCli.Close()
defer close(exitCh)
ticker := time.NewTicker(pauseGCIntervalForDupeRes)
defer ticker.Stop()
for {
select {
case <-ticker.C:
minSafePoint, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, safePoint)
if err != nil {
log.FromContext(ctx).Warn("Failed to register GC safe point", zap.Error(err))
continue
}
if minSafePoint > safePoint {
log.FromContext(ctx).Warn("The current minimum safe point is newer than what we hold, duplicate records are at"+
"risk of being GC and not detectable",
zap.Uint64("safePoint", safePoint),
zap.Uint64("minSafePoint", minSafePoint),
)
safePoint = minSafePoint
}
case <-ctx.Done():
stopCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second*5)
if _, err := pdCli.UpdateServiceGCSafePoint(stopCtx, serviceID, 0, safePoint); err != nil {
log.FromContext(ctx).Warn("Failed to reset safe point ttl to zero", zap.Error(err))
}
// just make compiler happy
cancelFunc()
return
}
}
}(safePoint)
return exitCh, nil
}
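// restoreTables restores the data of all tables: it optionally pauses GC and PD schedulers,
// builds a TableRestore task per table, runs the tasks with bounded concurrency, and finally
// runs the post-process phase for the tables that still need it.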
func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
// output error summary
defer rc.outpuErrorSummary()
if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
subCtx, cancel := context.WithCancel(ctx)
exitCh, err := rc.keepPauseGCForDupeRes(subCtx)
if err != nil {
cancel()
return errors.Trace(err)
}
defer func() {
cancel()
<-exitCh
}()
}
logTask := log.FromContext(ctx).Begin(zap.InfoLevel, "restore all tables data")
if rc.tableWorkers == nil {
rc.tableWorkers = worker.NewPool(ctx, rc.cfg.App.TableConcurrency, "table")
}
if rc.indexWorkers == nil {
rc.indexWorkers = worker.NewPool(ctx, rc.cfg.App.IndexConcurrency, "index")
}
// for the local backend, we should disable some PD schedulers and change some settings to
// make region split and SST ingest more stable.
// because the importer backend is mostly used for v3.x clusters which don't support these APIs,
// we don't do this for the importer backend either.
finishSchedulers := func() {
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
// if one lightning instance failed abnormally, and we can't determine whether it needs to switch back,
// we do not switch back automatically
switchBack := false
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
if isLocalBackend(rc.cfg) {
var (
restoreFn pdutil.UndoFunc
err error
)
if !rc.taskMgr.CanPauseSchedulerByKeyRange() {
logTask.Info("removing PD leader®ion schedulers")
restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx)
if err != nil {
return errors.Trace(err)
}
}
finishSchedulers = func() {
taskFinished := finalErr == nil
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
}
switchBack = needSwitchBack
cleanup = needCleanup
if needSwitchBack && restoreFn != nil {
logTask.Info("add back PD leader®ion schedulers")
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
}
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
// Disable GC because TiDB enables GC already.
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
fmt.Sprintf("tikv://%s?disableGC=true", rc.cfg.TiDB.PdAddr),
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
)
if err != nil {
return errors.Trace(err)
}
manager, err := newChecksumManager(ctx, rc, kvStore)
if err != nil {
return errors.Trace(err)
}
ctx = context.WithValue(ctx, &checksumManagerKey, manager)
}
type task struct {
tr *TableRestore
cp *checkpoints.TableCheckpoint
}
totalTables := 0
for _, dbMeta := range rc.dbMetas {
totalTables += len(dbMeta.Tables)
}
postProcessTaskChan := make(chan task, totalTables)
var wg sync.WaitGroup
var restoreErr common.OnceError
stopPeriodicActions := make(chan struct{})
periodicActions, cancelFunc := rc.buildRunPeriodicActionAndCancelFunc(ctx, stopPeriodicActions)
go periodicActions()
defer close(stopPeriodicActions)
defer func() {
finishSchedulers()
cancelFunc(switchBack)
if err := postProgress(); err != nil {
logTask.End(zap.ErrorLevel, err)
finalErr = err
return
}
logTask.End(zap.ErrorLevel, nil)
// clean up task metas
if cleanup {
logTask.Info("cleanup task metas")
if cleanupErr := rc.taskMgr.Cleanup(context.Background()); cleanupErr != nil {
logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr))
}
// cleanup table meta and schema db if needed.
if err := rc.taskMgr.CleanupAllMetas(context.Background()); err != nil {
logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(err))
}
}
if kvStore != nil {
if err := kvStore.Close(); err != nil {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
}()
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
tableLogTask := task.tr.logger.Begin(zap.InfoLevel, "restore table")
web.BroadcastTableCheckpoint(task.tr.tableName, task.cp)
needPostProcess, err := task.tr.restoreTable(ctx, rc, task.cp)
err = common.NormalizeOrWrapErr(common.ErrRestoreTable, err, task.tr.tableName)
tableLogTask.End(zap.ErrorLevel, err)
web.BroadcastError(task.tr.tableName, err)
if m, ok := metric.FromContext(ctx); ok {
m.RecordTableCount(metric.TableStateCompleted, err)
}
restoreErr.Set(err)
if needPostProcess {
postProcessTaskChan <- task
}
wg.Done()
}
}()
}
var allTasks []task
var totalDataSizeToRestore int64
for _, dbMeta := range rc.dbMetas {
dbInfo := rc.dbInfos[dbMeta.Name]
for _, tableMeta := range dbMeta.Tables {
tableInfo := dbInfo.Tables[tableMeta.Name]
tableName := common.UniqueTable(dbInfo.Name, tableInfo.Name)
cp, err := rc.checkpointsDB.Get(ctx, tableName)
if err != nil {
return errors.Trace(err)
}
if cp.Status < checkpoints.CheckpointStatusAllWritten && len(tableMeta.DataFiles) == 0 {
continue
}
igCols, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(dbInfo.Name, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
allTasks = append(allTasks, task{tr: tr, cp: cp})
if len(cp.Engines) == 0 {
for _, fi := range tableMeta.DataFiles {
totalDataSizeToRestore += fi.FileMeta.FileSize
}
} else {
for _, eng := range cp.Engines {
for _, chunk := range eng.Chunks {
totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset
}
}
}
}
}
if m, ok := metric.FromContext(ctx); ok {
m.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore).Add(float64(totalDataSizeToRestore))
}
for i := range allTasks {
wg.Add(1)
select {
case taskCh <- allTasks[i]:
case <-ctx.Done():
return ctx.Err()
}
}
wg.Wait()
// if context is done, should return directly
select {
case <-ctx.Done():
err := restoreErr.Get()
if err == nil {
err = ctx.Err()
}
logTask.End(zap.ErrorLevel, err)
return err
default:
}
postProgress = func() error {
close(postProcessTaskChan)
// otherwise, we should run all tasks in the post-process task chan
for i := 0; i < rc.cfg.App.TableConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range postProcessTaskChan {
metaMgr := rc.metaMgrBuilder.TableMetaMgr(task.tr)
// force all the remaining post-process tasks to be executed
_, err2 := task.tr.postProcess(ctx, rc, task.cp, true, metaMgr)
restoreErr.Set(err2)
}
}()
}
wg.Wait()
return restoreErr.Get()
}
return nil
}
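// restoreTable populates chunk checkpoints and rebases the row-id/auto-id allocators if
// needed, restores all engines of the table, and returns whether post-processing is still required.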
func (tr *TableRestore) restoreTable(
ctx context.Context,
rc *Controller,
cp *checkpoints.TableCheckpoint,
) (bool, error) {
// 1. Load the table info.
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
metaMgr := rc.metaMgrBuilder.TableMetaMgr(tr)
// no need to do anything if the chunks are already populated
if len(cp.Engines) > 0 {
tr.logger.Info("reusing engines and files info from checkpoint",
zap.Int("enginesCnt", len(cp.Engines)),
zap.Int("filesCnt", cp.CountChunks()),
)
} else if cp.Status < checkpoints.CheckpointStatusAllWritten {
if err := tr.populateChunks(ctx, rc, cp); err != nil {
return false, errors.Trace(err)
}
// fetch the max row_id among all chunks as the global max row_id
rowIDMax := int64(0)
for _, engine := range cp.Engines {
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > rowIDMax {
rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
}
}
db, _ := rc.tidbGlue.GetDB()
versionStr, err := version.FetchVersion(ctx, db)
if err != nil {
return false, errors.Trace(err)
}
versionInfo := version.ParseServerInfo(versionStr)
// "show table next_row_id" is only available after tidb v4.0.0
if versionInfo.ServerVersion.Major >= 4 && isLocalBackend(rc.cfg) {
// first, insert a new row into the meta table
if err = metaMgr.InitTableMeta(ctx); err != nil {
return false, err
}
checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, rowIDMax)
if err != nil {
return false, err
}
tr.RebaseChunkRowIDs(cp, rowIDBase)
if checksum != nil {
if cp.Checksum != *checksum {
cp.Checksum = *checksum
rc.saveCpCh <- saveCp{
tableName: tr.tableName,
merger: &checkpoints.TableChecksumMerger{
Checksum: cp.Checksum,
},
}
}
tr.logger.Info("checksum before restore table", zap.Object("checksum", &cp.Checksum))
}
}
if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, tr.tableName, cp.Engines); err != nil {
return false, errors.Trace(err)
}
web.BroadcastTableCheckpoint(tr.tableName, cp)
// rebase the allocator so it exceeds the number of rows.
if tr.tableInfo.Core.PKIsHandle && tr.tableInfo.Core.ContainsAutoRandomBits() {
cp.AllocBase = mathutil.Max(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
return false, err
}
} else {
cp.AllocBase = mathutil.Max(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
return false, err
}
}
rc.saveCpCh <- saveCp{
tableName: tr.tableName,
merger: &checkpoints.RebaseCheckpointMerger{
AllocBase: cp.AllocBase,
},
}
}
// 2. Restore engines (if still needed)
err := tr.restoreEngines(ctx, rc, cp)
if err != nil {
return false, errors.Trace(err)
}
err = metaMgr.UpdateTableStatus(ctx, metaStatusRestoreFinished)
if err != nil {
return false, errors.Trace(err)
}
// 3. Post-process. With the last parameter set to false, we allow ANALYZE to be delayed and executed later
return tr.postProcess(ctx, rc, cp, false /* force-analyze */, metaMgr)
}
func (rc *Controller) outpuErrorSummary() {
if rc.errorMgr.HasError() {
fmt.Println(rc.errorMgr.Output())
}
}
// do full compaction for the whole data.
func (rc *Controller) fullCompact(ctx context.Context) error {
if !rc.cfg.PostRestore.Compact {
log.FromContext(ctx).Info("skip full compaction")
return nil
}
// wait until any existing level-1 compact to complete first.
task := log.FromContext(ctx).Begin(zap.InfoLevel, "wait for completion of existing level 1 compaction")
for !rc.compactState.CAS(compactStateIdle, compactStateDoing) {
time.Sleep(100 * time.Millisecond)
}
task.End(zap.ErrorLevel, nil)
return errors.Trace(rc.doCompact(ctx, FullLevelCompact))
}
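// doCompact asks every TiKV store to run a manual compaction at the given level.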
func (rc *Controller) doCompact(ctx context.Context, level int32) error {
tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr)
return tikv.ForAllStores(
ctx,
tls,
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
return tikv.Compact(c, tls, store.Address, level)
},
)
}
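// switchToImportMode and switchToNormalMode ask the TiKV stores to switch into import or
// normal mode respectively; the switch is skipped entirely for the tidb backend.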
func (rc *Controller) switchToImportMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
}
func (rc *Controller) switchToNormalMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
}
func (rc *Controller) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
// the tidb backend doesn't need to switch tikv to import mode
if isTiDBBackend(rc.cfg) {
return
}
log.FromContext(ctx).Info("switch import mode", zap.Stringer("mode", mode))
// It is fine if we miss some stores which did not switch to Import mode,
// since we're running it periodically, so we exclude disconnected stores.
// But it is essential all stores be switched back to Normal mode to allow
// normal operation.
var minState tikv.StoreState
if mode == sstpb.SwitchMode_Import {
minState = tikv.StoreStateOffline
} else {
minState = tikv.StoreStateDisconnected
}
tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr)
// we ignore switch mode failure since it is not fatal.
// no need log the error, it is done in kv.SwitchMode already.
_ = tikv.ForAllStores(
ctx,
tls,
minState,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, mode)
},
)
}
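// enforceDiskQuota checks, in a background goroutine, whether the local sorted-kv data
// exceeds the configured disk quota; if so it blocks writers, flushes all engines and
// imports the largest engines to release disk space.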
func (rc *Controller) enforceDiskQuota(ctx context.Context) {
if !rc.diskQuotaState.CAS(diskQuotaStateIdle, diskQuotaStateChecking) {
// do not run multiple disk quota checks / imports simultaneously.
// (we execute the lock check in background to avoid blocking the cron thread)
return
}
go func() {
// locker is assigned when we detect the disk quota is exceeded.
// before the disk quota is confirmed exceeded, we keep the diskQuotaLock
// unlocked to avoid periodically interrupting the writer threads.
var locker sync.Locker
defer func() {
rc.diskQuotaState.Store(diskQuotaStateIdle)
if locker != nil {
locker.Unlock()
}
}()
isRetrying := false
for {
// sleep for a cycle if we are retrying because there is nothing new to import.
if isRetrying {
select {
case <-ctx.Done():
return
case <-time.After(rc.cfg.Cron.CheckDiskQuota.Duration):
}
} else {
isRetrying = true
}
quota := int64(rc.cfg.TikvImporter.DiskQuota)
largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := rc.backend.CheckDiskQuota(quota)
if m, ok := metric.FromContext(ctx); ok {
m.LocalStorageUsageBytesGauge.WithLabelValues("disk").Set(float64(totalDiskSize))
m.LocalStorageUsageBytesGauge.WithLabelValues("mem").Set(float64(totalMemSize))
}
logger := log.FromContext(ctx).With(
zap.Int64("diskSize", totalDiskSize),
zap.Int64("memSize", totalMemSize),
zap.Int64("quota", quota),
zap.Int("largeEnginesCount", len(largeEngines)),
zap.Int("inProgressLargeEnginesCount", inProgressLargeEngines))
if len(largeEngines) == 0 && inProgressLargeEngines == 0 {
logger.Debug("disk quota respected")
return
}
if locker == nil {
// block all writers once we detect that the disk quota has been exceeded.
rc.diskQuotaLock.Lock()
locker = &rc.diskQuotaLock
}
logger.Warn("disk quota exceeded")
if len(largeEngines) == 0 {
logger.Warn("all large engines are already importing, keep blocking all writes")
continue
}
// flush all engines so that checkpoints can be updated.
if err := rc.backend.FlushAll(ctx); err != nil {
logger.Error("flush engine for disk quota failed, check again later", log.ShortError(err))
return
}
// at this point, all engines are synchronized on disk.
// we then import every large engine one by one and complete.
// if any engine fails to import, we just try again next time, since the data is still intact.
rc.diskQuotaState.Store(diskQuotaStateImporting)
task := logger.Begin(zap.WarnLevel, "importing large engines for disk quota")
var importErr error
for _, engine := range largeEngines {
// Use a larger split region size to avoid splitting the same region too many times.
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
importErr = multierr.Append(importErr, err)
}
}
task.End(zap.ErrorLevel, importErr)
return
}
}()
}
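// setGlobalVariables aligns lightning's new-collation setting with the target cluster;
// it is skipped for the tidb backend.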
func (rc *Controller) setGlobalVariables(ctx context.Context) error {
// skip for tidb backend to be compatible with MySQL
if isTiDBBackend(rc.cfg) {
return nil
}
// set the new collation flag based on the tidb config
enabled, err := ObtainNewCollationEnabled(ctx, rc.tidbGlue.GetSQLExecutor())
if err != nil {
return err
}
// we should enable/disable new collation here since in server mode, tidb config
// may be different in different tasks
collate.SetNewCollationEnabledForTest(enabled)
log.FromContext(ctx).Info("new_collation_enabled", zap.Bool("enabled", enabled))
return nil
}
func (rc *Controller) waitCheckpointFinish() {
// wait for the checkpoint process to finish so that we can do cleanup safely
close(rc.saveCpCh)
rc.checkpointsWg.Wait()
}
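// cleanCheckpoints waits for pending checkpoint updates and then renames or removes this
// task's checkpoints according to the keep-after-success setting.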
func (rc *Controller) cleanCheckpoints(ctx context.Context) error {
rc.waitCheckpointFinish()
if !rc.cfg.Checkpoint.Enable {
return nil
}
logger := log.FromContext(ctx).With(
zap.Stringer("keepAfterSuccess", rc.cfg.Checkpoint.KeepAfterSuccess),
zap.Int64("taskID", rc.cfg.TaskID),
)
task := logger.Begin(zap.InfoLevel, "clean checkpoints")
var err error
switch rc.cfg.Checkpoint.KeepAfterSuccess {
case config.CheckpointRename:
err = rc.checkpointsDB.MoveCheckpoints(ctx, rc.cfg.TaskID)
case config.CheckpointRemove:
err = rc.checkpointsDB.RemoveCheckpoint(ctx, "all")
}
task.End(zap.ErrorLevel, err)
if err != nil {
return common.ErrCleanCheckpoint.Wrap(err).GenWithStackByArgs()
}
return nil
}
func isLocalBackend(cfg *config.Config) bool {
return cfg.TikvImporter.Backend == config.BackendLocal
}
func isTiDBBackend(cfg *config.Config) bool {
return cfg.TikvImporter.Backend == config.BackendTiDB
}
// preCheckRequirements checks
// 1. Cluster resource
// 2. Local node resource
// 3. Cluster region
// 4. Lightning configuration
// before the table restore starts.
func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err := rc.DataCheck(ctx); err != nil {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
if err := rc.ClusterIsAvailable(ctx); err != nil {
return errors.Trace(err)
}
if rc.ownStore {
if err := rc.StoragePermission(ctx); err != nil {
return errors.Trace(err)
}
}
}
if err := rc.metaMgrBuilder.Init(ctx); err != nil {
return common.ErrInitMetaManager.Wrap(err).GenWithStackByArgs()
}
taskExist := false
// We still need to sample the source data even if this task already exists, because we need to judge whether
// the source is ordered by row key to decide how to sort the local data.
estimatedSizeResult, err := rc.preInfoGetter.EstimateSourceDataSize(ctx)
if err != nil {
return common.ErrCheckDataSource.Wrap(err).GenWithStackByArgs()
}
estimatedDataSizeWithIndex := estimatedSizeResult.SizeWithIndex
// Do not import with too high a concurrency because the data may be completely unsorted.
if estimatedSizeResult.HasUnsortedBigTables {
if rc.cfg.App.TableConcurrency > rc.cfg.App.IndexConcurrency {
rc.cfg.App.TableConcurrency = rc.cfg.App.IndexConcurrency
}
}
if rc.status != nil {
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
}
if isLocalBackend(rc.cfg) {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}
// PdController will be closed when `taskMetaMgr` closes.
rc.taskMgr = rc.metaMgrBuilder.TaskMetaMgr(pdController)
taskExist, err = rc.taskMgr.CheckTaskExist(ctx)
if err != nil {
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
if !taskExist {
if err = rc.taskMgr.InitTask(ctx, estimatedDataSizeWithIndex); err != nil {
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
}
if rc.cfg.App.CheckRequirements {
needCheck := true
if rc.cfg.Checkpoint.Enable {
taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx)
if err != nil {
return common.ErrReadCheckpoint.Wrap(err).GenWithStack("get task checkpoint failed")
}
// If task checkpoint is initialized, it means check has been performed before.
// We don't need and shouldn't check again, because lightning may have already imported some data.
needCheck = taskCheckpoints == nil
}
if needCheck {
err = rc.localResource(ctx)
if err != nil {
return common.ErrCheckLocalResource.Wrap(err).GenWithStackByArgs()
}
if err := rc.clusterResource(ctx); err != nil {
if err1 := rc.taskMgr.CleanupTask(ctx); err1 != nil {
log.FromContext(ctx).Warn("cleanup task failed", zap.Error(err1))
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
}
if err := rc.checkClusterRegion(ctx); err != nil {
return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs()
}
}
}
}
if rc.tidbGlue.OwnsSQLExecutor() && rc.cfg.App.CheckRequirements {
fmt.Println(rc.checkTemplate.Output())
}
if !rc.checkTemplate.Success() {
if !taskExist && rc.taskMgr != nil {
err := rc.taskMgr.CleanupTask(ctx)
if err != nil {
log.FromContext(ctx).Warn("cleanup task failed", zap.Error(err))
}
}
return common.ErrPreCheckFailed.GenWithStackByArgs(rc.checkTemplate.FailedMsg())
}
return nil
}
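preCheckRequirements runs its checks in a fixed order: a few hard failures (data source, PD client, meta manager) abort immediately, while the remaining results are accumulated in the check template and only reported at the end, where a failed template also triggers task-meta cleanup. A simplified, hypothetical sketch of that "collect then fail" pattern; the names are invented, not Lightning's.
// sketch only: preCheck and runPreChecks are illustrative names.
package sketch

import (
	"context"
	"fmt"
	"strings"
)

type preCheck struct {
	name     string
	critical bool // critical checks abort the task immediately
	run      func(ctx context.Context) error
}

// runPreChecks executes every check in order, aborting early only for critical
// ones, and reports all accumulated failures at the end.
func runPreChecks(ctx context.Context, checks []preCheck) error {
	var failed []string
	for _, c := range checks {
		if err := c.run(ctx); err != nil {
			msg := fmt.Sprintf("%s: %v", c.name, err)
			if c.critical {
				return fmt.Errorf("pre-check failed: %s", msg)
			}
			failed = append(failed, msg)
		}
	}
	if len(failed) > 0 {
		return fmt.Errorf("pre-check failed:\n%s", strings.Join(failed, "\n"))
	}
	return nil
}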
// DataCheck checks the data schema; it requires #rc.restoreSchema to have finished.
func (rc *Controller) DataCheck(ctx context.Context) error {
if rc.cfg.App.CheckRequirements {
if err := rc.HasLargeCSV(ctx); err != nil {
return errors.Trace(err)
}
}
if err := rc.checkCheckpoints(ctx); err != nil {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
if err := rc.checkSourceSchema(ctx); err != nil {
return errors.Trace(err)
}
}
if err := rc.checkTableEmpty(ctx); err != nil {
return common.ErrCheckTableEmpty.Wrap(err).GenWithStackByArgs()
}
if err := rc.checkCSVHeader(ctx); err != nil {
return common.ErrCheckCSVHeader.Wrap(err).GenWithStackByArgs()
}
return nil
}
type chunkRestore struct {
parser mydump.Parser
index int
chunk *checkpoints.ChunkCheckpoint
}
func newChunkRestore(
ctx context.Context,
index int,
cfg *config.Config,
chunk *checkpoints.ChunkCheckpoint,
ioWorkers *worker.Pool,
store storage.ExternalStorage,
tableInfo *checkpoints.TidbTableInfo,
) (*chunkRestore, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
var reader storage.ReadSeekCloser
var err error
if chunk.FileMeta.Type == mydump.SourceTypeParquet {
reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize)
} else {
reader, err = store.Open(ctx, chunk.FileMeta.Path)
}
if err != nil {
return nil, errors.Trace(err)
}
var parser mydump.Parser
switch chunk.FileMeta.Type {
case mydump.SourceTypeCSV:
hasHeader := cfg.Mydumper.CSV.Header && chunk.Chunk.Offset == 0
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
if err != nil {
return nil, err
}
parser, err = mydump.NewCSVParser(ctx, &cfg.Mydumper.CSV, reader, blockBufSize, ioWorkers, hasHeader, charsetConvertor)
if err != nil {
return nil, errors.Trace(err)
}
case mydump.SourceTypeSQL:
parser = mydump.NewChunkParser(ctx, cfg.TiDB.SQLMode, reader, blockBufSize, ioWorkers)
case mydump.SourceTypeParquet:
parser, err = mydump.NewParquetParser(ctx, store, reader, chunk.FileMeta.Path)
if err != nil {
return nil, errors.Trace(err)
}
default:
panic(fmt.Sprintf("file '%s' with unknown source type '%s'", chunk.Key.Path, chunk.FileMeta.Type.String()))
}
if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
return nil, errors.Trace(err)
}
if len(chunk.ColumnPermutation) > 0 {
parser.SetColumns(getColumnNames(tableInfo.Core, chunk.ColumnPermutation))
}
return &chunkRestore{
parser: parser,
index: index,
chunk: chunk,
}, nil
}
func (cr *chunkRestore) close() {
_ = cr.parser.Close()
}
func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
colIndexes := make([]int, 0, len(permutation))
for i := 0; i < len(permutation); i++ {
colIndexes = append(colIndexes, -1)
}
colCnt := 0
for i, p := range permutation {
if p >= 0 {
colIndexes[p] = i
colCnt++
}
}
names := make([]string, 0, colCnt)
for _, idx := range colIndexes {
// skip columns with index -1
if idx >= 0 {
// the original fields contain the _tidb_rowid field
if idx == len(tableInfo.Columns) {
names = append(names, model.ExtraHandleName.O)
} else {
names = append(names, tableInfo.Columns[idx].Name.O)
}
}
}
return names
}
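The permutation convention can be easier to see with a concrete example: permutation[i] holds the file position of the table's i-th column, with one extra trailing slot reserved for _tidb_rowid, and -1 means the column does not appear in the file. getColumnNames inverts that mapping to recover the column list in file order. Below is a self-contained sketch using plain string slices instead of *model.TableInfo; columnNamesFromPermutation is a hypothetical helper, not part of Lightning.
// sketch only: mirrors the inversion done by getColumnNames.
package sketch

func columnNamesFromPermutation(tableColumns []string, permutation []int) []string {
	// filePos[p] = index of the table column found at file position p.
	filePos := make([]int, len(permutation))
	for i := range filePos {
		filePos[i] = -1
	}
	for tableIdx, p := range permutation {
		if p >= 0 {
			filePos[p] = tableIdx
		}
	}
	names := make([]string, 0, len(filePos))
	for _, tableIdx := range filePos {
		switch {
		case tableIdx < 0: // column not present in the file
		case tableIdx == len(tableColumns): // the extra _tidb_rowid slot
			names = append(names, "_tidb_rowid")
		default:
			names = append(names, tableColumns[tableIdx])
		}
	}
	return names
}

// For a table (a, b, c) and permutation [1, -1, 0, -1] (a at file position 1,
// b missing, c at file position 0, no _tidb_rowid), the result is ["c", "a"].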
var (
maxKVQueueSize = 32 // Cache at most this number of rows before blocking the encode loop
minDeliverBytes uint64 = 96 * units.KiB // 96 KB (data + index). Batch at least this many bytes to reduce the number of messages
)
type deliveredKVs struct {
kvs kv.Row // if kvs is nil, this indicates we've got the last message.
columns []string
offset int64
rowID int64
}
type deliverResult struct {
totalDur time.Duration
err error
}
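These two types define the protocol between encodeLoop (producer) and deliverLoop (consumer): packets of deliveredKVs flow over a buffered channel, the producer closes the channel when it is done, and the very last packet carries a nil kvs together with the chunk's end offset so the consumer knows how far the chunk was consumed. A minimal consumer-side sketch of that convention, reusing the deliveredKVs type above; drainPackets is hypothetical and ignores the actual write/checksum work.
// sketch only: shows the termination convention on kvsCh.
func drainPackets(kvsCh <-chan []deliveredKVs) (lastOffset int64) {
	for packet := range kvsCh { // loop ends when the producer closes the channel
		for _, p := range packet {
			if p.kvs == nil {
				return p.offset // final marker: no rows, only the end offset
			}
			lastOffset = p.offset // a real row batch; deliver it here
		}
	}
	return lastOffset // channel closed without an explicit marker
}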
//nolint:nakedret // TODO: refactor
func (cr *chunkRestore) deliverLoop(
ctx context.Context,
kvsCh <-chan []deliveredKVs,
t *TableRestore,
engineID int32,
dataEngine, indexEngine *backend.LocalEngineWriter,
rc *Controller,
) (deliverTotalDur time.Duration, err error) {
deliverLogger := t.logger.With(
zap.Int32("engineNumber", engineID),
zap.Int("fileIndex", cr.index),
zap.Stringer("path", &cr.chunk.Key),
zap.String("task", "deliver"),
)
// Fetch enough KV pairs from the source.
dataKVs := rc.backend.MakeEmptyRows()
indexKVs := rc.backend.MakeEmptyRows()
dataSynced := true
hasMoreKVs := true
for hasMoreKVs {
var dataChecksum, indexChecksum verify.KVChecksum
var columns []string
var kvPacket []deliveredKVs
// initialize these two fields to the current checkpoint values, so that even if no kv pairs are delivered,
// the chunk checkpoint stays the same
startOffset := cr.chunk.Chunk.Offset
currOffset := startOffset
rowID := cr.chunk.Chunk.PrevRowIDMax
populate:
for dataChecksum.SumSize()+indexChecksum.SumSize() < minDeliverBytes {
select {
case kvPacket = <-kvsCh:
if len(kvPacket) == 0 {
hasMoreKVs = false
break populate
}
for _, p := range kvPacket {
if p.kvs == nil {
// This is the last message.
currOffset = p.offset
hasMoreKVs = false
break populate
}
p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
columns = p.columns
currOffset = p.offset
rowID = p.rowID
}
case <-ctx.Done():
err = ctx.Err()
return
}
}
err = func() error {
// We use `TryRLock` with a sleep here to avoid blocking the current goroutine during import when the disk quota is
// triggered, so that we can save the chunk checkpoint as soon as possible after `FlushEngine` is called.
// This implementation may not be very elegant or even completely correct, but it is currently a relatively
// simple and effective solution.
for !rc.diskQuotaLock.TryRLock() {
// try to update the chunk checkpoint; this helps save the checkpoint after importing when the disk quota is triggered
if !dataSynced {
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
}
time.Sleep(time.Millisecond)
}
defer rc.diskQuotaLock.RUnlock()
// Write KVs into the engine
start := time.Now()
if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil {
if !common.IsContextCanceledError(err) {
deliverLogger.Error("write to data engine failed", log.ShortError(err))
}
return errors.Trace(err)
}
if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil {
if !common.IsContextCanceledError(err) {
deliverLogger.Error("write to index engine failed", log.ShortError(err))
}
return errors.Trace(err)
}
if m, ok := metric.FromContext(ctx); ok {
deliverDur := time.Since(start)
deliverTotalDur += deliverDur
m.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds())
m.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindData).Observe(float64(dataChecksum.SumSize()))
m.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindIndex).Observe(float64(indexChecksum.SumSize()))
m.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindData).Observe(float64(dataChecksum.SumKVS()))
m.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindIndex).Observe(float64(indexChecksum.SumKVS()))
}
return nil
}()
if err != nil {
return
}
dataSynced = false
dataKVs = dataKVs.Clear()
indexKVs = indexKVs.Clear()
// Update the table, and save a checkpoint.
// (the write to the importer is effective immediately, thus update these here)
// No need to apply a lock since this is the only thread updating `cr.chunk.**`.
// In local mode, we should write these checkpoints after engine flushed.
lastOffset := cr.chunk.Chunk.Offset
cr.chunk.Checksum.Add(&dataChecksum)
cr.chunk.Checksum.Add(&indexChecksum)
cr.chunk.Chunk.Offset = currOffset
cr.chunk.Chunk.PrevRowIDMax = rowID
if m, ok := metric.FromContext(ctx); ok {
// The value of currOffset comes from parser.pos, which increases monotonically, and the initial value of
// parser.pos comes from chunk.Chunk.Offset, so currOffset - startOffset < 0 should never happen.
// We have seen it once but cannot reproduce it now, so this check is added to make the code more robust.
// TODO: reproduce it, find the root cause, and fix it completely
if currOffset >= startOffset {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
zap.Int64("start", startOffset))
}
}
if currOffset > lastOffset || dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 {
// No need to save checkpoint if nothing was delivered.
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
}
failpoint.Inject("SlowDownWriteRows", func() {
deliverLogger.Warn("Slowed down write rows")
})
failpoint.Inject("FailAfterWriteRows", nil)
// TODO: for the local backend, we could save checkpoints more frequently, e.g. after writing
// 10GB of kv pairs to the data engine, we can flush both the data and index engines and then
// safely update the current checkpoint.
failpoint.Inject("LocalBackendSaveCheckpoint", func() {
if !isLocalBackend(rc.cfg) && (dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0) {
// No need to save checkpoint if nothing was delivered.
saveCheckpoint(rc, t, engineID, cr.chunk)
}
})
}
return
}
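deliverLoop's write path polls the disk-quota lock with TryRLock instead of blocking on RLock, so that while the quota goroutine holds the write lock for an import, the chunk checkpoint can still be saved. A standalone illustration of that polling pattern, assuming a sync.RWMutex-style lock (Go 1.18+ provides TryRLock); the names are hypothetical and the housekeeping callback stands in for maybeSaveCheckpoint.
// sketch only: the real lock type and callback are not shown in this excerpt.
package sketch

import (
	"sync"
	"time"
)

// acquireReadOrDoHousekeeping spins until the read lock is available, running a
// cheap housekeeping callback (e.g. saving a checkpoint) on every failed attempt.
// The caller must eventually call mu.RUnlock().
func acquireReadOrDoHousekeeping(mu *sync.RWMutex, housekeeping func()) {
	for !mu.TryRLock() {
		housekeeping()
		time.Sleep(time.Millisecond)
	}
}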
func (cr *chunkRestore) maybeSaveCheckpoint(
rc *Controller,
t *TableRestore,
engineID int32,
chunk *checkpoints.ChunkCheckpoint,
data, index *backend.LocalEngineWriter,
) bool {
if data.IsSynced() && index.IsSynced() {
saveCheckpoint(rc, t, engineID, chunk)
return true
}
return false
}
func saveCheckpoint(rc *Controller, t *TableRestore, engineID int32, chunk *checkpoints.ChunkCheckpoint) {
// We need to update the AllocBase every time we've finished a file.
// The AllocBase is determined by the maximum of the "handle" (_tidb_rowid
// or integer primary key), which can only be obtained by reading all data.
var base int64
if t.tableInfo.Core.PKIsHandle && t.tableInfo.Core.ContainsAutoRandomBits() {
base = t.alloc.Get(autoid.AutoRandomType).Base() + 1
} else {
base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1
}
rc.saveCpCh <- saveCp{
tableName: t.tableName,
merger: &checkpoints.RebaseCheckpointMerger{
AllocBase: base,
},
}
rc.saveCpCh <- saveCp{
tableName: t.tableName,
merger: &checkpoints.ChunkCheckpointMerger{
EngineID: engineID,
Key: chunk.Key,
Checksum: chunk.Checksum,
Pos: chunk.Chunk.Offset,
RowID: chunk.Chunk.PrevRowIDMax,
ColumnPermutation: chunk.ColumnPermutation,
EndOffset: chunk.Chunk.EndOffset,
},
}
}
//nolint:nakedret // TODO: refactor
func (cr *chunkRestore) encodeLoop(
ctx context.Context,
kvsCh chan<- []deliveredKVs,
t *TableRestore,
logger log.Logger,
kvEncoder kv.Encoder,
deliverCompleteCh <-chan deliverResult,
rc *Controller,
) (readTotalDur time.Duration, encodeTotalDur time.Duration, err error) {
defer close(kvsCh)
send := func(kvs []deliveredKVs) error {
select {
case kvsCh <- kvs:
return nil
case <-ctx.Done():
return ctx.Err()
case deliverResult, ok := <-deliverCompleteCh:
if deliverResult.err == nil && !ok {
deliverResult.err = ctx.Err()
}
if deliverResult.err == nil {
deliverResult.err = errors.New("unexpected premature fulfillment")
logger.DPanic("unexpected: deliverCompleteCh prematurely fulfilled with no error", zap.Bool("chIsOpen", ok))
}
return errors.Trace(deliverResult.err)
}
}
pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
// filteredColumns holds the column names with the ignored columns excluded.
// WARN: this might not be correct when different SQL statements contain different fields,
// but since ColumnPermutation also relies on the assumption that all statements in one source file use the
// same columns, this should be OK.
var filteredColumns []string
ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive)
if err1 != nil {
err = err1
return
}
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
}
offset, _ := cr.parser.Pos()
if offset >= cr.chunk.Chunk.EndOffset {
break
}
var readDur, encodeDur time.Duration
canDeliver := false
kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
curOffset := offset
var newOffset, rowID int64
var kvSize uint64
outLoop:
for !canDeliver {
readDurStart := time.Now()
err = cr.parser.ReadRow()
columnNames := cr.parser.Columns()
newOffset, rowID = cr.parser.Pos()
switch errors.Cause(err) {
case nil:
if !initializedColumns {
if len(cr.chunk.ColumnPermutation) == 0 {
if err = t.initializeColumns(columnNames, cr.chunk); err != nil {
return
}
}
filteredColumns = columnNames
if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 {
filteredColumns = make([]string, 0, len(columnNames))
ignoreColsMap := ignoreColumns.ColumnsMap()
if len(columnNames) > 0 {
for _, c := range columnNames {
if _, ok := ignoreColsMap[c]; !ok {
filteredColumns = append(filteredColumns, c)
}
}
} else {
// initialize column names from the table schema;
// after filtering out some columns, we must explicitly set the columns for the TiDB backend
for _, col := range t.tableInfo.Core.Columns {
if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok {
filteredColumns = append(filteredColumns, col.Name.O)
}
}
}
}
initializedColumns = true
}
case io.EOF:
reachEOF = true
break outLoop
default:
err = common.ErrEncodeKV.Wrap(err).GenWithStackByArgs(&cr.chunk.Key, newOffset)
return
}
readDur += time.Since(readDurStart)
encodeDurStart := time.Now()
lastRow := cr.parser.LastRow()
// sql -> kv
kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, cr.chunk.Key.Path, curOffset)
encodeDur += time.Since(encodeDurStart)
hasIgnoredEncodeErr := false
if encodeErr != nil {
rowText := tidb.EncodeRowForRecord(ctx, t.encTable, rc.cfg.TiDB.SQLMode, lastRow.Row, cr.chunk.ColumnPermutation)
encodeErr = rc.errorMgr.RecordTypeError(ctx, logger, t.tableName, cr.chunk.Key.Path, newOffset, rowText, encodeErr)
if encodeErr != nil {
err = common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(&cr.chunk.Key, newOffset)
}
hasIgnoredEncodeErr = true
}
cr.parser.RecycleRow(lastRow)
curOffset = newOffset
if err != nil {
return
}
if hasIgnoredEncodeErr {
continue
}
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
kvSize += kvs.Size()
failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
kvSize += uint64(val.(int))
})
// pebble cannot accept a batch larger than 4.0 GB of kv pairs.
// We would hit a pebble panic when importing a SQL file whose kv pairs are each larger than 4G / maxKvPairsCnt,
// hence this check.
if kvSize >= minDeliverBytes || len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset {
canDeliver = true
kvSize = 0
}
}
encodeTotalDur += encodeDur
readTotalDur += readDur
if m, ok := metric.FromContext(ctx); ok {
m.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds())
m.RowReadSecondsHistogram.Observe(readDur.Seconds())
m.RowReadBytesHistogram.Observe(float64(newOffset - offset))
}
if len(kvPacket) != 0 {
deliverKvStart := time.Now()
if err = send(kvPacket); err != nil {
return
}
if m, ok := metric.FromContext(ctx); ok {
m.RowKVDeliverSecondsHistogram.Observe(time.Since(deliverKvStart).Seconds())
}
}
}
err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset}})
return
}
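The inner loop above flushes a packet to the deliver goroutine when any of three conditions holds: the accumulated KV size reaches minDeliverBytes, the packet reaches the configured max-kv-pairs row count, or the parser has reached the chunk's end offset (which also keeps each pebble batch well under its size limit). Restated as a single predicate with hypothetical names:
// sketch only: minBytes corresponds to minDeliverBytes, maxPairs to TikvImporter.MaxKVPairs.
func shouldDeliver(kvSize uint64, pairCount int, offset, endOffset int64, minBytes uint64, maxPairs int) bool {
	return kvSize >= minBytes || pairCount >= maxPairs || offset == endOffset
}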
func (cr *chunkRestore) restore(
ctx context.Context,
t *TableRestore,
engineID int32,
dataEngine, indexEngine *backend.LocalEngineWriter,
rc *Controller,
) error {
// Create the encoder.
kvEncoder, err := rc.backend.NewEncoder(ctx, t.encTable, &kv.SessionOptions{
SQLMode: rc.cfg.TiDB.SQLMode,
Timestamp: cr.chunk.Timestamp,
SysVars: rc.sysVars,
// use chunk.PrevRowIDMax as the auto-random seed, so it stays the same after recovering from a checkpoint.
AutoRandomSeed: cr.chunk.Chunk.PrevRowIDMax,
})
if err != nil {
return err
}
defer kvEncoder.Close()
kvsCh := make(chan []deliveredKVs, maxKVQueueSize)
deliverCompleteCh := make(chan deliverResult)
go func() {
defer close(deliverCompleteCh)
dur, err := cr.deliverLoop(ctx, kvsCh, t, engineID, dataEngine, indexEngine, rc)
select {
case <-ctx.Done():
case deliverCompleteCh <- deliverResult{dur, err}:
}
}()
logTask := t.logger.With(
zap.Int32("engineNumber", engineID),
zap.Int("fileIndex", cr.index),
zap.Stringer("path", &cr.chunk.Key),
).Begin(zap.InfoLevel, "restore file")
readTotalDur, encodeTotalDur, encodeErr := cr.encodeLoop(ctx, kvsCh, t, logTask.Logger, kvEncoder, deliverCompleteCh, rc)
var deliverErr error
select {
case deliverResult, ok := <-deliverCompleteCh:
if ok {
logTask.End(zap.ErrorLevel, deliverResult.err,
zap.Duration("readDur", readTotalDur),
zap.Duration("encodeDur", encodeTotalDur),
zap.Duration("deliverDur", deliverResult.totalDur),
zap.Object("checksum", &cr.chunk.Checksum),
)
deliverErr = deliverResult.err
} else {
// otherwise, this must be caused by ctx cancellation
deliverErr = ctx.Err()
}
case <-ctx.Done():
deliverErr = ctx.Err()
}
return errors.Trace(firstErr(encodeErr, deliverErr))
}
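restore wires the two loops together: deliverLoop runs in a goroutine that publishes exactly one deliverResult (or exits on cancellation) and closes its result channel, encodeLoop runs in the calling goroutine, and the function finally waits on deliverCompleteCh or ctx.Done, returning the first of the two errors. A generic sketch of that coordination shape, with hypothetical names and int standing in for the KV packets:
// sketch only: runPipeline mirrors the shape of chunkRestore.restore.
package sketch

import "context"

type result struct{ err error }

func runPipeline(ctx context.Context, produce func(chan<- int) error, consume func(<-chan int) error) error {
	items := make(chan int, 32)
	done := make(chan result)
	go func() {
		defer close(done)
		err := consume(items)
		select {
		case done <- result{err: err}:
		case <-ctx.Done():
		}
	}()
	produceErr := produce(items) // the producer is expected to close(items) when finished
	var consumeErr error
	select {
	case r, ok := <-done:
		if ok {
			consumeErr = r.err
		} else {
			consumeErr = ctx.Err() // consumer gave up because the context was cancelled
		}
	case <-ctx.Done():
		consumeErr = ctx.Err()
	}
	if produceErr != nil { // producer error takes precedence, as with firstErr(encodeErr, deliverErr)
		return produceErr
	}
	return consumeErr
}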