tidb ddl 源码

  • 2022-09-19
  • 浏览 (553)

tidb ddl 代码

文件路径:/ddl/ddl.go

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2013 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSES/QL-LICENSE file.

package ddl

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/ngaut/pools"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/ddl/ingest"
	"github.com/pingcap/tidb/ddl/syncer"
	"github.com/pingcap/tidb/ddl/util"
	"github.com/pingcap/tidb/domain/infosync"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
	"github.com/pingcap/tidb/metrics"
	"github.com/pingcap/tidb/owner"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/parser/terror"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/binloginfo"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/statistics/handle"
	"github.com/pingcap/tidb/table"
	pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client"
	tidbutil "github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/dbterror"
	"github.com/pingcap/tidb/util/gcutil"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/mathutil"
	"github.com/pingcap/tidb/util/sqlexec"
	"github.com/tikv/client-go/v2/tikvrpc"
	clientv3 "go.etcd.io/etcd/client/v3"
	atomicutil "go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/exp/slices"
)

const (
	// currentVersion is for all new DDL jobs.
	currentVersion = 1
	// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
	DDLOwnerKey = "/tidb/ddl/fg/owner"
	// addingDDLJobPrefix is the path prefix used to record the newly added DDL job, and it's saved to etcd.
	addingDDLJobPrefix = "/tidb/ddl/add_ddl_job_"
	ddlPrompt          = "ddl"

	shardRowIDBitsMax = 15

	batchAddingJobs = 10

	reorgWorkerCnt   = 10
	generalWorkerCnt = 1
)

// OnExist specifies what to do when a new object has a name collision.
type OnExist uint8

const (
	// OnExistError throws an error on name collision.
	OnExistError OnExist = iota
	// OnExistIgnore skips creating the new object.
	OnExistIgnore
	// OnExistReplace replaces the old object by the new object. This is only
	// supported by VIEWs at the moment. For other object types, this is
	// equivalent to OnExistError.
	OnExistReplace

	jobRecordCapacity = 16
)

var (
	// EnableSplitTableRegion is a flag to decide whether to split a new region for
	// a newly created table. It takes effect only if the Storage supports split
	// region.
	EnableSplitTableRegion = uint32(0)
)

// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
	CreateSchema(ctx sessionctx.Context, stmt *ast.CreateDatabaseStmt) error
	AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
	DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error
	CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
	CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
	DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
	RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
	DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
	CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error
	DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error
	AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) error
	TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
	RenameTable(ctx sessionctx.Context, stmt *ast.RenameTableStmt) error
	LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
	UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
	CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error
	UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error
	RepairTable(ctx sessionctx.Context, table *ast.TableName, createStmt *ast.CreateTableStmt) error
	CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
	DropSequence(ctx sessionctx.Context, stmt *ast.DropSequenceStmt) (err error)
	AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error
	CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
	DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
	AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
	FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error

	// CreateSchemaWithInfo creates a database (schema) given its database info.
	//
	// WARNING: the DDL owns the `info` after calling this function, and will modify its fields
	// in-place. If you want to keep using `info`, please call Clone() first.
	CreateSchemaWithInfo(
		ctx sessionctx.Context,
		info *model.DBInfo,
		onExist OnExist) error

	// CreateTableWithInfo creates a table, view or sequence given its table info.
	//
	// WARNING: the DDL owns the `info` after calling this function, and will modify its fields
	// in-place. If you want to keep using `info`, please call Clone() first.
	CreateTableWithInfo(
		ctx sessionctx.Context,
		schema model.CIStr,
		info *model.TableInfo,
		onExist OnExist) error

	// BatchCreateTableWithInfo is like CreateTableWithInfo, but can handle multiple tables.
	BatchCreateTableWithInfo(ctx sessionctx.Context,
		schema model.CIStr,
		info []*model.TableInfo,
		onExist OnExist) error

	// CreatePlacementPolicyWithInfo creates a placement policy
	//
	// WARNING: the DDL owns the `policy` after calling this function, and will modify its fields
	// in-place. If you want to keep using `policy`, please call Clone() first.
	CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy *model.PolicyInfo, onExist OnExist) error

	// Start campaigns the owner and starts workers.
	// ctxPool is used for the worker's delRangeManager and creates sessions.
	Start(ctxPool *pools.ResourcePool) error
	// GetLease returns current schema lease time.
	GetLease() time.Duration
	// Stats returns the DDL statistics.
	Stats(vars *variable.SessionVars) (map[string]interface{}, error)
	// GetScope gets the status variables scope.
	GetScope(status string) variable.ScopeFlag
	// Stop stops DDL worker.
	Stop() error
	// RegisterStatsHandle registers statistics handle and its corresponding event channel for ddl.
	RegisterStatsHandle(*handle.Handle)
	// SchemaSyncer gets the schema syncer.
	SchemaSyncer() syncer.SchemaSyncer
	// OwnerManager gets the owner manager.
	OwnerManager() owner.Manager
	// GetID gets the ddl ID.
	GetID() string
	// GetTableMaxHandle gets the max row ID of a normal table or a partition.
	GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
	// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
	SetBinlogClient(*pumpcli.PumpsClient)
	// GetHook gets the hook. It's exported for testing.
	GetHook() Callback
	// SetHook sets the hook.
	SetHook(h Callback)
	// GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing.
	GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema
	// DoDDLJob does the DDL job, it's exported for test.
	DoDDLJob(ctx sessionctx.Context, job *model.Job) error
	// MoveJobFromQueue2Table move existing DDLs from queue to table.
	MoveJobFromQueue2Table(bool) error
	// MoveJobFromTable2Queue move existing DDLs from table to queue.
	MoveJobFromTable2Queue() error
}

type limitJobTask struct {
	job *model.Job
	err chan error
}

// ddl is used to handle the statements that define the structure or schema of the database.
type ddl struct {
	m          sync.RWMutex
	wg         tidbutil.WaitGroupWrapper // It's only used to deal with data race in restart_test.
	limitJobCh chan *limitJobTask

	*ddlCtx
	workers           map[workerType]*worker
	sessPool          *sessionPool
	delRangeMgr       delRangeManager
	enableTiFlashPoll *atomicutil.Bool
	// used in the concurrency ddl.
	reorgWorkerPool      *workerPool
	generalDDLWorkerPool *workerPool
	// get notification if any DDL coming.
	ddlJobCh chan struct{}
}

// waitSchemaSyncedController is to control whether to waitSchemaSynced or not.
type waitSchemaSyncedController struct {
	mu  sync.RWMutex
	job map[int64]struct{}

	// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
	once *atomicutil.Bool
}

func newWaitSchemaSyncedController() *waitSchemaSyncedController {
	return &waitSchemaSyncedController{
		job:  make(map[int64]struct{}, jobRecordCapacity),
		once: atomicutil.NewBool(true),
	}
}

func (w *waitSchemaSyncedController) registerSync(job *model.Job) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.job[job.ID] = struct{}{}
}

func (w *waitSchemaSyncedController) isSynced(job *model.Job) bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	_, ok := w.job[job.ID]
	return !ok
}

func (w *waitSchemaSyncedController) synced(job *model.Job) {
	w.mu.Lock()
	defer w.mu.Unlock()
	delete(w.job, job.ID)
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
	ctx          context.Context
	cancel       context.CancelFunc
	uuid         string
	store        kv.Storage
	ownerManager owner.Manager
	schemaSyncer syncer.SchemaSyncer
	ddlJobDoneCh chan struct{}
	ddlEventCh   chan<- *util.Event
	lease        time.Duration        // lease is schema lease.
	binlogCli    *pumpcli.PumpsClient // binlogCli is used for Binlog.
	infoCache    *infoschema.InfoCache
	statsHandle  *handle.Handle
	tableLockCkr util.DeadTableLockChecker
	etcdCli      *clientv3.Client

	*waitSchemaSyncedController
	*schemaVersionManager
	// recording the running jobs.
	runningJobs struct {
		sync.RWMutex
		ids map[int64]struct{}
	}
	// It holds the running DDL jobs ID.
	runningJobIDs []string
	// reorgCtx is used for reorganization.
	reorgCtx struct {
		sync.RWMutex
		// reorgCtxMap maps job ID to reorg context.
		reorgCtxMap map[int64]*reorgCtx
	}

	jobCtx struct {
		sync.RWMutex
		// jobCtxMap maps job ID to job's ctx.
		jobCtxMap map[int64]*JobContext
	}

	// hook may be modified.
	mu struct {
		sync.RWMutex
		hook        Callback
		interceptor Interceptor
	}

	ddlSeqNumMu struct {
		sync.Mutex
		seqNum uint64
	}

	waiting *atomicutil.Bool
}

// schemaVersionManager is used to manage the schema version. To prevent the conflicts on this key between different DDL job,
// we use another transaction to update the schema version, so that we need to lock the schema version and unlock it until the job is committed.
type schemaVersionManager struct {
	schemaVersionMu sync.Mutex
	// lockOwner stores the job ID that is holding the lock.
	lockOwner atomicutil.Int64
}

func newSchemaVersionManager() *schemaVersionManager {
	return &schemaVersionManager{}
}

func (sv *schemaVersionManager) setSchemaVersion(job *model.Job, store kv.Storage) (schemaVersion int64, err error) {
	sv.lockSchemaVersion(job.ID)
	err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
		var err error
		m := meta.NewMeta(txn)
		schemaVersion, err = m.GenSchemaVersion()
		return err
	})
	return schemaVersion, err
}

// lockSchemaVersion gets the lock to prevent the schema version from being updated.
func (sv *schemaVersionManager) lockSchemaVersion(jobID int64) {
	ownerID := sv.lockOwner.Load()
	// There may exist one job update schema version many times in multiple-schema-change, so we do not lock here again
	// if they are the same job.
	if ownerID != jobID {
		sv.schemaVersionMu.Lock()
		sv.lockOwner.Store(jobID)
	}
}

// unlockSchemaVersion releases the lock.
func (sv *schemaVersionManager) unlockSchemaVersion(jobID int64) {
	ownerID := sv.lockOwner.Load()
	if ownerID == jobID {
		sv.lockOwner.Store(0)
		sv.schemaVersionMu.Unlock()
	}
}

func (dc *ddlCtx) isOwner() bool {
	isOwner := dc.ownerManager.IsOwner()
	logutil.BgLogger().Debug("[ddl] check whether is the DDL owner", zap.Bool("isOwner", isOwner), zap.String("selfID", dc.uuid))
	if isOwner {
		metrics.DDLCounter.WithLabelValues(metrics.DDLOwner + "_" + mysql.TiDBReleaseVersion).Inc()
	}
	return isOwner
}

func (dc *ddlCtx) setDDLLabelForTopSQL(job *model.Job) {
	dc.jobCtx.Lock()
	defer dc.jobCtx.Unlock()
	ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
	if !exists {
		ctx = NewJobContext()
		dc.jobCtx.jobCtxMap[job.ID] = ctx
	}
	ctx.setDDLLabelForTopSQL(job)
}

func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
	dc.jobCtx.Lock()
	defer dc.jobCtx.Unlock()
	ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
	if !exists {
		ctx = NewJobContext()
		ctx.setDDLLabelForDiagnosis(job)
		dc.jobCtx.jobCtxMap[job.ID] = ctx
	}
}

func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger {
	dc.jobCtx.Lock()
	defer dc.jobCtx.Unlock()
	ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
	if !exists {
		return nil
	}
	return ctx.getResourceGroupTaggerForTopSQL()
}

func (dc *ddlCtx) removeJobCtx(job *model.Job) {
	dc.jobCtx.Lock()
	defer dc.jobCtx.Unlock()
	delete(dc.jobCtx.jobCtxMap, job.ID)
}

func (dc *ddlCtx) jobContext(job *model.Job) *JobContext {
	dc.jobCtx.RLock()
	defer dc.jobCtx.RUnlock()
	if jobContext, exists := dc.jobCtx.jobCtxMap[job.ID]; exists {
		return jobContext
	}
	return NewJobContext()
}

func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
	dc.reorgCtx.RLock()
	defer dc.reorgCtx.RUnlock()
	return dc.reorgCtx.reorgCtxMap[job.ID]
}

func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
	rc := &reorgCtx{}
	rc.doneCh = make(chan error, 1)
	// initial reorgCtx
	rc.setRowCount(r.Job.GetRowCount())
	rc.setNextKey(r.StartKey)
	rc.setCurrentElement(r.currElement)
	rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
	rc.mu.warningsCount = make(map[errors.ErrorID]int64)
	dc.reorgCtx.Lock()
	defer dc.reorgCtx.Unlock()
	dc.reorgCtx.reorgCtxMap[r.Job.ID] = rc
	return rc
}

func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
	dc.reorgCtx.Lock()
	defer dc.reorgCtx.Unlock()
	delete(dc.reorgCtx.reorgCtxMap, job.ID)
}

func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
	rc := dc.getReorgCtx(job)
	if rc == nil {
		return
	}
	rc.notifyReorgCancel()
}

// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
func EnableTiFlashPoll(d interface{}) {
	if dd, ok := d.(*ddl); ok {
		dd.enableTiFlashPoll.Store(true)
	}
}

// DisableTiFlashPoll disables TiFlash poll loop aka PollTiFlashReplicaStatus.
func DisableTiFlashPoll(d interface{}) {
	if dd, ok := d.(*ddl); ok {
		dd.enableTiFlashPoll.Store(false)
	}
}

// IsTiFlashPollEnabled reveals enableTiFlashPoll
func (d *ddl) IsTiFlashPollEnabled() bool {
	return d.enableTiFlashPoll.Load()
}

// RegisterStatsHandle registers statistics handle and its corresponding even channel for ddl.
func (d *ddl) RegisterStatsHandle(h *handle.Handle) {
	d.ddlCtx.statsHandle = h
	d.ddlEventCh = h.DDLEventCh()
}

// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
func asyncNotifyEvent(d *ddlCtx, e *util.Event) {
	if d.ddlEventCh != nil {
		if d.lease == 0 {
			// If lease is 0, it's always used in test.
			select {
			case d.ddlEventCh <- e:
			default:
			}
			return
		}
		for i := 0; i < 10; i++ {
			select {
			case d.ddlEventCh <- e:
				return
			default:
				time.Sleep(time.Microsecond * 10)
			}
		}
		logutil.BgLogger().Warn("[ddl] fail to notify DDL event", zap.String("event", e.String()))
	}
}

// NewDDL creates a new DDL.
func NewDDL(ctx context.Context, options ...Option) DDL {
	return newDDL(ctx, options...)
}

func newDDL(ctx context.Context, options ...Option) *ddl {
	opt := &Options{
		Hook: &BaseCallback{},
	}
	for _, o := range options {
		o(opt)
	}

	id := uuid.New().String()
	var manager owner.Manager
	var schemaSyncer syncer.SchemaSyncer
	var deadLockCkr util.DeadTableLockChecker
	if etcdCli := opt.EtcdCli; etcdCli == nil {
		// The etcdCli is nil if the store is localstore which is only used for testing.
		// So we use mockOwnerManager and MockSchemaSyncer.
		manager = owner.NewMockManager(ctx, id)
		schemaSyncer = NewMockSchemaSyncer()
	} else {
		manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey)
		schemaSyncer = syncer.NewSchemaSyncer(etcdCli, id)
		deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
	}

	// TODO: make store and infoCache explicit arguments
	// these two should be ensured to exist
	if opt.Store == nil {
		panic("store should not be nil")
	}
	if opt.InfoCache == nil {
		panic("infoCache should not be nil")
	}

	ddlCtx := &ddlCtx{
		uuid:                       id,
		store:                      opt.Store,
		lease:                      opt.Lease,
		ddlJobDoneCh:               make(chan struct{}, 1),
		ownerManager:               manager,
		schemaSyncer:               schemaSyncer,
		binlogCli:                  binloginfo.GetPumpsClient(),
		infoCache:                  opt.InfoCache,
		tableLockCkr:               deadLockCkr,
		etcdCli:                    opt.EtcdCli,
		schemaVersionManager:       newSchemaVersionManager(),
		waitSchemaSyncedController: newWaitSchemaSyncedController(),
		runningJobIDs:              make([]string, 0, jobRecordCapacity),
	}
	ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
	ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext)
	ddlCtx.mu.hook = opt.Hook
	ddlCtx.mu.interceptor = &BaseInterceptor{}
	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
	ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
	ddlCtx.runningJobs.ids = make(map[int64]struct{})
	ddlCtx.waiting = atomicutil.NewBool(false)

	d := &ddl{
		ddlCtx:            ddlCtx,
		limitJobCh:        make(chan *limitJobTask, batchAddingJobs),
		enableTiFlashPoll: atomicutil.NewBool(true),
		ddlJobCh:          make(chan struct{}, 100),
	}

	// Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
	variable.EnableDDL = d.EnableDDL
	variable.DisableDDL = d.DisableDDL
	variable.SwitchConcurrentDDL = d.SwitchConcurrentDDL
	variable.SwitchMDL = d.SwitchMDL

	return d
}

// Stop implements DDL.Stop interface.
func (d *ddl) Stop() error {
	d.m.Lock()
	defer d.m.Unlock()

	d.close()
	logutil.BgLogger().Info("[ddl] stop DDL", zap.String("ID", d.uuid))
	return nil
}

func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
	var delRangeMgr delRangeManager
	if !mock {
		delRangeMgr = newDelRangeManager(d.store, d.sessPool)
		logutil.BgLogger().Info("[ddl] start delRangeManager OK", zap.Bool("is a emulator", !d.store.SupportDeleteRange()))
	} else {
		delRangeMgr = newMockDelRangeManager()
	}

	delRangeMgr.start()
	return delRangeMgr
}

func (d *ddl) prepareWorkers4ConcurrencyDDL() {
	workerFactory := func(tp workerType) func() (pools.Resource, error) {
		return func() (pools.Resource, error) {
			wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx, true)
			sessForJob, err := d.sessPool.get()
			if err != nil {
				return nil, err
			}
			sessForJob.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
			wk.sess = newSession(sessForJob)
			metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
			return wk, nil
		}
	}
	// reorg worker count at least 1 at most 10.
	reorgCnt := mathutil.Min(mathutil.Max(runtime.GOMAXPROCS(0)/4, 1), reorgWorkerCnt)
	d.reorgWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(addIdxWorker), reorgCnt, reorgCnt, 0), reorg)
	d.generalDDLWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(generalWorker), generalWorkerCnt, generalWorkerCnt, 0), general)
	failpoint.Inject("NoDDLDispatchLoop", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return()
		}
	})
	d.wg.Run(d.startDispatchLoop)
}

func (d *ddl) prepareWorkers4legacyDDL() {
	d.workers = make(map[workerType]*worker, 2)
	d.workers[generalWorker] = newWorker(d.ctx, generalWorker, d.sessPool, d.delRangeMgr, d.ddlCtx, false)
	d.workers[addIdxWorker] = newWorker(d.ctx, addIdxWorker, d.sessPool, d.delRangeMgr, d.ddlCtx, false)
	for _, worker := range d.workers {
		worker.wg.Add(1)
		w := worker
		go w.start(d.ddlCtx)

		metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, worker.String())).Inc()

		// When the start function is called, we will send a fake job to let worker
		// checks owner firstly and try to find whether a job exists and run.
		asyncNotify(worker.ddlJobCh)
	}
}

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
	logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))

	d.wg.Run(d.limitDDLJobs)
	d.sessPool = newSessionPool(ctxPool, d.store)
	d.ownerManager.SetBeOwnerHook(func() {
		var err error
		d.ddlSeqNumMu.seqNum, err = d.GetNextDDLSeqNum()
		if err != nil {
			logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
		}
	})

	d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)

	d.prepareWorkers4ConcurrencyDDL()
	d.prepareWorkers4legacyDDL()

	if config.TableLockEnabled() {
		d.wg.Add(1)
		go d.startCleanDeadTableLock()
	}

	// If tidb_enable_ddl is true, we need campaign owner and do DDL job.
	// Otherwise, we needn't do that.
	if config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
		if err := d.EnableDDL(); err != nil {
			return err
		}
	}

	variable.RegisterStatistics(d)

	metrics.DDLCounter.WithLabelValues(metrics.CreateDDLInstance).Inc()

	// Start some background routine to manage TiFlash replica.
	d.wg.Run(d.PollTiFlashRoutine)

	ingest.InitGlobalLightningEnv()

	return nil
}

// EnableDDL enable this node to execute ddl.
// Since ownerManager.CampaignOwner will start a new goroutine to run ownerManager.campaignLoop,
// we should make sure that before invoking EnableDDL(), ddl is DISABLE.
func (d *ddl) EnableDDL() error {
	err := d.ownerManager.CampaignOwner()
	return errors.Trace(err)
}

// DisableDDL disable this node to execute ddl.
// We should make sure that before invoking DisableDDL(), ddl is ENABLE.
func (d *ddl) DisableDDL() error {
	if d.ownerManager.IsOwner() {
		// If there is only one node, we should NOT disable ddl.
		serverInfo, err := infosync.GetAllServerInfo(d.ctx)
		if err != nil {
			logutil.BgLogger().Error("[ddl] error when GetAllServerInfo", zap.Error(err))
			return err
		}
		if len(serverInfo) <= 1 {
			return dbterror.ErrDDLSetting.GenWithStackByArgs("can not disable ddl when there is only one instance")
		}
		// FIXME: if possible, when this node is the only node with DDL, ths setting of DisableDDL should fail.
	}

	// disable campaign by interrupting campaignLoop
	d.ownerManager.CampaignCancel()
	return nil
}

// GetNextDDLSeqNum return the next DDL seq num.
func (d *ddl) GetNextDDLSeqNum() (uint64, error) {
	var count uint64
	ctx := kv.WithInternalSourceType(d.ctx, kv.InternalTxnDDL)
	err := kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
		t := meta.NewMeta(txn)
		var err error
		count, err = t.GetHistoryDDLCount()
		return err
	})
	return count, err
}

func (d *ddl) close() {
	if isChanClosed(d.ctx.Done()) {
		return
	}

	startTime := time.Now()
	d.cancel()
	d.wg.Wait()
	d.ownerManager.Cancel()
	d.schemaSyncer.Close()
	if d.reorgWorkerPool != nil {
		d.reorgWorkerPool.close()
	}
	if d.generalDDLWorkerPool != nil {
		d.generalDDLWorkerPool.close()
	}

	for _, worker := range d.workers {
		worker.Close()
	}
	// d.delRangeMgr using sessions from d.sessPool.
	// Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
	if d.delRangeMgr != nil {
		d.delRangeMgr.clear()
	}
	if d.sessPool != nil {
		d.sessPool.close()
	}
	variable.UnregisterStatistics(d)

	logutil.BgLogger().Info("[ddl] DDL closed", zap.String("ID", d.uuid), zap.Duration("take time", time.Since(startTime)))
}

// GetLease implements DDL.GetLease interface.
func (d *ddl) GetLease() time.Duration {
	lease := d.lease
	return lease
}

// GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing.
// Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead.
// Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever.
func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema {
	is := d.infoCache.GetLatest()

	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.mu.interceptor.OnGetInfoSchema(ctx, is)
}

func (d *ddl) genGlobalIDs(count int) ([]int64, error) {
	var ret []int64
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
	err := kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
		failpoint.Inject("mockGenGlobalIDFail", func(val failpoint.Value) {
			if val.(bool) {
				failpoint.Return(errors.New("gofail genGlobalIDs error"))
			}
		})

		m := meta.NewMeta(txn)
		var err error
		ret, err = m.GenGlobalIDs(count)
		return err
	})

	return ret, err
}

func (d *ddl) genPlacementPolicyID() (int64, error) {
	var ret int64
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
	err := kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
		m := meta.NewMeta(txn)
		var err error
		ret, err = m.GenPlacementPolicyID()
		return err
	})

	return ret, err
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() syncer.SchemaSyncer {
	return d.schemaSyncer
}

// OwnerManager implements DDL.OwnerManager interface.
func (d *ddl) OwnerManager() owner.Manager {
	return d.ownerManager
}

// GetID implements DDL.GetID interface.
func (d *ddl) GetID() string {
	return d.uuid
}

var (
	fastDDLIntervalPolicy = []time.Duration{
		500 * time.Millisecond,
	}
	normalDDLIntervalPolicy = []time.Duration{
		500 * time.Millisecond,
		500 * time.Millisecond,
		1 * time.Second,
	}
	slowDDLIntervalPolicy = []time.Duration{
		500 * time.Millisecond,
		500 * time.Millisecond,
		1 * time.Second,
		1 * time.Second,
		3 * time.Second,
	}
)

func getIntervalFromPolicy(policy []time.Duration, i int) (time.Duration, bool) {
	plen := len(policy)
	if i < plen {
		return policy[i], true
	}
	return policy[plen-1], false
}

func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
	switch job.Type {
	case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn:
		return getIntervalFromPolicy(slowDDLIntervalPolicy, i)
	case model.ActionCreateTable, model.ActionCreateSchema:
		return getIntervalFromPolicy(fastDDLIntervalPolicy, i)
	default:
		return getIntervalFromPolicy(normalDDLIntervalPolicy, i)
	}
}

func (d *ddl) asyncNotifyWorker(job *model.Job) {
	// If the workers don't run, we needn't notify workers.
	if !config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
		return
	}
	if variable.EnableConcurrentDDL.Load() {
		if d.isOwner() {
			asyncNotify(d.ddlJobCh)
		} else {
			d.asyncNotifyByEtcd(addingDDLJobConcurrent, job)
		}
	} else {
		var worker *worker
		if job.MayNeedReorg() {
			worker = d.workers[addIdxWorker]
		} else {
			worker = d.workers[generalWorker]
		}
		if d.ownerManager.IsOwner() {
			asyncNotify(worker.ddlJobCh)
		} else {
			d.asyncNotifyByEtcd(worker.addingDDLJobKey, job)
		}
	}
}

func updateTickerInterval(ticker *time.Ticker, lease time.Duration, job *model.Job, i int) *time.Ticker {
	interval, changed := getJobCheckInterval(job, i)
	if !changed {
		return ticker
	}
	// For now we should stop old ticker and create a new ticker
	ticker.Stop()
	return time.NewTicker(chooseLeaseTime(lease, interval))
}

func recordLastDDLInfo(ctx sessionctx.Context, job *model.Job) {
	if job == nil {
		return
	}
	ctx.GetSessionVars().LastDDLInfo.Query = job.Query
	ctx.GetSessionVars().LastDDLInfo.SeqNum = job.SeqNum
}

func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
	switch job.Type {
	case model.ActionUpdateTiFlashReplicaStatus, model.ActionUnlockTable:
		job.Query = ""
	default:
		job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
	}
}

// DoDDLJob will return
// - nil: found in history DDL job and no job error
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
	if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
		// In multiple schema change, we don't run the job.
		// Instead, we merge all the jobs into one pending job.
		return appendToSubJobs(mci, job)
	}

	// Get a global job ID and put the DDL job in the queue.
	setDDLJobQuery(ctx, job)
	task := &limitJobTask{job, make(chan error)}
	d.limitJobCh <- task

	failpoint.Inject("mockParallelSameDDLJobTwice", func(val failpoint.Value) {
		if val.(bool) {
			// The same job will be put to the DDL queue twice.
			job = job.Clone()
			task1 := &limitJobTask{job, make(chan error)}
			d.limitJobCh <- task1
			<-task.err
			// The second job result is used for test.
			task = task1
		}
	})

	// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
	err := <-task.err
	if err != nil {
		// The transaction of enqueuing job is failed.
		return errors.Trace(err)
	}

	sessVars := ctx.GetSessionVars()
	sessVars.StmtCtx.IsDDLJobInQueue = true

	// Notice worker that we push a new job and wait the job done.
	d.asyncNotifyWorker(job)
	logutil.BgLogger().Info("[ddl] start DDL job", zap.String("job", job.String()), zap.String("query", job.Query))

	var historyJob *model.Job
	jobID := job.ID

	// Attach the context of the jobId to the calling session so that
	// KILL can cancel this DDL job.
	ctx.GetSessionVars().StmtCtx.DDLJobID = jobID

	// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
	// For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
	// But we use etcd to speed up, normally it takes less than 0.5s now, so we use 0.5s or 1s or 3s as the max value.
	initInterval, _ := getJobCheckInterval(job, 0)
	ticker := time.NewTicker(chooseLeaseTime(10*d.lease, initInterval))
	startTime := time.Now()
	metrics.JobsGauge.WithLabelValues(job.Type.String()).Inc()
	defer func() {
		ticker.Stop()
		metrics.JobsGauge.WithLabelValues(job.Type.String()).Dec()
		metrics.HandleJobHistogram.WithLabelValues(job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
		recordLastDDLInfo(ctx, historyJob)
	}()
	i := 0
	for {
		failpoint.Inject("storeCloseInLoop", func(_ failpoint.Value) {
			_ = d.Stop()
		})

		select {
		case <-d.ddlJobDoneCh:
		case <-ticker.C:
			i++
			ticker = updateTickerInterval(ticker, 10*d.lease, job, i)
		case <-d.ctx.Done():
			logutil.BgLogger().Info("[ddl] DoDDLJob will quit because context done")
			return context.Canceled
		}

		// If the connection being killed, we need to CANCEL the DDL job.
		if atomic.LoadUint32(&sessVars.Killed) == 1 {
			if sessVars.StmtCtx.DDLJobID != 0 {
				se, err := d.sessPool.get()
				if err != nil {
					logutil.BgLogger().Error("[ddl] get session failed, check again", zap.Error(err))
					continue
				}
				sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat.
				errs, err := CancelJobs(se, d.store, []int64{jobID})
				d.sessPool.put(se)
				if len(errs) > 0 {
					logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
				}
				if err != nil {
					logutil.BgLogger().Warn("Kill command could not cancel DDL job", zap.Error(err))
					continue
				}
			}
		}

		se, err := d.sessPool.get()
		if err != nil {
			logutil.BgLogger().Error("[ddl] get session failed, check again", zap.Error(err))
			continue
		}
		historyJob, err = GetHistoryJobByID(se, jobID)
		d.sessPool.put(se)
		if err != nil {
			logutil.BgLogger().Error("[ddl] get history DDL job failed, check again", zap.Error(err))
			continue
		}
		if historyJob == nil {
			logutil.BgLogger().Debug("[ddl] DDL job is not in history, maybe not run", zap.Int64("jobID", jobID))
			continue
		}

		d.checkHistoryJobInTest(ctx, historyJob)

		// If a job is a history job, the state must be JobStateSynced or JobStateRollbackDone or JobStateCancelled.
		if historyJob.IsSynced() {
			// Judge whether there are some warnings when executing DDL under the certain SQL mode.
			if historyJob.ReorgMeta != nil && len(historyJob.ReorgMeta.Warnings) != 0 {
				if len(historyJob.ReorgMeta.Warnings) != len(historyJob.ReorgMeta.WarningsCount) {
					logutil.BgLogger().Info("[ddl] DDL warnings doesn't match the warnings count", zap.Int64("jobID", jobID))
				} else {
					for key, warning := range historyJob.ReorgMeta.Warnings {
						for j := int64(0); j < historyJob.ReorgMeta.WarningsCount[key]; j++ {
							ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
						}
					}
				}
			}
			appendMultiChangeWarningsToOwnerCtx(ctx, historyJob)

			logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID))
			return nil
		}

		if historyJob.Error != nil {
			logutil.BgLogger().Info("[ddl] DDL job is failed", zap.Int64("jobID", jobID))
			return errors.Trace(historyJob.Error)
		}
		panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
	}
}

func (d *ddl) callHookOnChanged(job *model.Job, err error) error {
	if job.State == model.JobStateNone {
		// We don't call the hook if the job haven't run yet.
		return err
	}
	d.mu.RLock()
	defer d.mu.RUnlock()

	err = d.mu.hook.OnChanged(err)
	return errors.Trace(err)
}

// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) {
	d.binlogCli = binlogCli
}

// GetHook implements DDL.GetHook interface.
func (d *ddl) GetHook() Callback {
	d.mu.Lock()
	defer d.mu.Unlock()

	return d.mu.hook
}

// SetHook set the customized hook.
func (d *ddl) SetHook(h Callback) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.mu.hook = h
}

func (d *ddl) startCleanDeadTableLock() {
	defer func() {
		tidbutil.Recover(metrics.LabelDDL, "startCleanDeadTableLock", nil, false)
		d.wg.Done()
	}()

	ticker := time.NewTicker(time.Second * 10)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !d.ownerManager.IsOwner() {
				continue
			}
			deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.GetLatest().AllSchemas())
			if err != nil {
				logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err))
				continue
			}
			for se, tables := range deadLockTables {
				err := d.CleanDeadTableLock(tables, se)
				if err != nil {
					logutil.BgLogger().Info("[ddl] clean dead table lock failed.", zap.Error(err))
				}
			}
		case <-d.ctx.Done():
			return
		}
	}
}

// SwitchConcurrentDDL changes the DDL to concurrent DDL if toConcurrentDDL is true, otherwise, queue based DDL.
func (d *ddl) SwitchConcurrentDDL(toConcurrentDDL bool) error {
	if !d.isOwner() {
		return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), d.store, true, func(ctx context.Context, txn kv.Transaction) error {
			isConcurrentDDL, err := meta.NewMeta(txn).IsConcurrentDDL()
			if err != nil {
				return err
			}
			if isConcurrentDDL != toConcurrentDDL {
				return errors.New("please set it on the DDL owner node")
			}
			return nil
		})
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()
	d.waiting.Store(true)
	defer d.waiting.Store(false)
	if err := d.wait4Switch(ctx); err != nil {
		return err
	}

	var err error
	if toConcurrentDDL {
		err = d.MoveJobFromQueue2Table(false)
	} else {
		err = d.MoveJobFromTable2Queue()
	}
	if err == nil {
		variable.EnableConcurrentDDL.Store(toConcurrentDDL)
	}
	logutil.BgLogger().Info("[ddl] SwitchConcurrentDDL", zap.Bool("toConcurrentDDL", toConcurrentDDL), zap.Error(err))
	return err
}

// SwitchMDL enables MDL or disable DDL.
func (d *ddl) SwitchMDL(enable bool) error {
	isEnableBefore := variable.EnableMDL.Load()
	if isEnableBefore == enable {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	// Check if there is any DDL running.
	// This check can not cover every corner cases, so users need to guarantee that there is no DDL running by themselves.
	sess, err := d.sessPool.get()
	if err != nil {
		return err
	}
	defer d.sessPool.put(sess)
	se := newSession(sess)
	rows, err := se.execute(ctx, "select 1 from mysql.tidb_ddl_job", "check job")
	if err != nil {
		return err
	}
	if len(rows) != 0 {
		return errors.New("please wait for all jobs done")
	}

	variable.EnableMDL.Store(enable)
	logutil.BgLogger().Info("[ddl] switch metadata lock feature", zap.Bool("enable", enable), zap.Error(err))
	return nil
}

func (d *ddl) wait4Switch(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		d.runningJobs.RLock()
		if len(d.runningJobs.ids) == 0 {
			d.runningJobs.RUnlock()
			return nil
		}
		d.runningJobs.RUnlock()
		time.Sleep(time.Second * 1)
	}
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
	SchemaID      int64
	TableInfo     *model.TableInfo
	DropJobID     int64
	SnapshotTS    uint64
	AutoIDs       meta.AutoIDGroup
	OldSchemaName string
	OldTableName  string
}

// delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes.
// It should be called before any DDL that could break data consistency.
// This provides a safe window for async commit and 1PC to commit with an old schema.
func delayForAsyncCommit() {
	cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
	duration := cfg.SafeWindow + cfg.AllowedClockDrift
	logutil.BgLogger().Info("sleep before DDL finishes to make async commit and 1PC safe",
		zap.Duration("duration", duration))
	time.Sleep(duration)
}

var (
	// RunInGoTest is used to identify whether ddl in running in the test.
	RunInGoTest bool
)

func init() {
	if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
		RunInGoTest = true
	}
}

// GetDropOrTruncateTableInfoFromJobsByStore implements GetDropOrTruncateTableInfoFromJobs
func GetDropOrTruncateTableInfoFromJobsByStore(jobs []*model.Job, gcSafePoint uint64, getTable func(uint64, int64, int64) (*model.TableInfo, error), fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
	for _, job := range jobs {
		// Check GC safe point for getting snapshot infoSchema.
		err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
		if err != nil {
			return false, err
		}
		if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
			continue
		}

		tbl, err := getTable(job.StartTS, job.SchemaID, job.TableID)
		if err != nil {
			if meta.ErrDBNotExists.Equal(err) {
				// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
				// then can't find the table from the snapshot info-schema. Should just ignore error here,
				// see more in TestParallelDropSchemaAndDropTable.
				continue
			}
			return false, err
		}
		if tbl == nil {
			// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
			// then can't find the table from the snapshot info-schema. Should just ignore error here,
			// see more in TestParallelDropSchemaAndDropTable.
			continue
		}
		finish, err := fn(job, tbl)
		if err != nil || finish {
			return finish, err
		}
	}
	return false, nil
}

// Info is for DDL information.
type Info struct {
	SchemaVer   int64
	ReorgHandle kv.Key       // It's only used for DDL information.
	Jobs        []*model.Job // It's the currently running jobs.
}

// GetDDLInfoWithNewTxn returns DDL information using a new txn.
func GetDDLInfoWithNewTxn(s sessionctx.Context) (*Info, error) {
	sess := newSession(s)
	err := sess.begin()
	if err != nil {
		return nil, err
	}
	info, err := GetDDLInfo(s)
	sess.rollback()
	return info, err
}

// GetDDLInfo returns DDL information.
func GetDDLInfo(s sessionctx.Context) (*Info, error) {
	var err error
	info := &Info{}
	sess := newSession(s)
	txn, err := sess.txn()
	if err != nil {
		return nil, errors.Trace(err)
	}
	t := meta.NewMeta(txn)
	info.Jobs = make([]*model.Job, 0, 2)
	enable := variable.EnableConcurrentDDL.Load()
	var generalJob, reorgJob *model.Job
	if enable {
		generalJob, reorgJob, err = get2JobsFromTable(sess)
	} else {
		generalJob, reorgJob, err = get2JobsFromQueue(t)
	}
	if err != nil {
		return nil, errors.Trace(err)
	}

	if generalJob != nil {
		info.Jobs = append(info.Jobs, generalJob)
	}

	if reorgJob != nil {
		info.Jobs = append(info.Jobs, reorgJob)
	}

	info.SchemaVer, err = t.GetSchemaVersionWithNonEmptyDiff()
	if err != nil {
		return nil, errors.Trace(err)
	}
	if reorgJob == nil {
		return info, nil
	}

	_, info.ReorgHandle, _, _, err = newReorgHandler(t, sess, enable).GetDDLReorgHandle(reorgJob)
	if err != nil {
		if meta.ErrDDLReorgElementNotExist.Equal(err) {
			return info, nil
		}
		return nil, errors.Trace(err)
	}

	return info, nil
}

func get2JobsFromQueue(t *meta.Meta) (*model.Job, *model.Job, error) {
	generalJob, err := t.GetDDLJobByIdx(0)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	reorgJob, err := t.GetDDLJobByIdx(0, meta.AddIndexJobListKey)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	return generalJob, reorgJob, nil
}

func get2JobsFromTable(sess *session) (*model.Job, *model.Job, error) {
	var generalJob, reorgJob *model.Job
	jobs, err := getJobsBySQL(sess, JobTable, "not reorg order by job_id limit 1")
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	if len(jobs) != 0 {
		generalJob = jobs[0]
	}
	jobs, err = getJobsBySQL(sess, JobTable, "reorg order by job_id limit 1")
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	if len(jobs) != 0 {
		reorgJob = jobs[0]
	}
	return generalJob, reorgJob, nil
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(se sessionctx.Context, store kv.Storage, ids []int64) (errs []error, err error) {
	if variable.EnableConcurrentDDL.Load() {
		return cancelConcurrencyJobs(se, ids)
	}

	err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
		errs, err = cancelLegacyJobs(txn, ids)
		return err
	})
	return
}

func cancelLegacyJobs(txn kv.Transaction, ids []int64) ([]error, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	errs := make([]error, len(ids))
	t := meta.NewMeta(txn)
	generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	addIdxJobs, err := getDDLJobsInQueue(t, meta.AddIndexJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	jobs := append(generalJobs, addIdxJobs...)
	jobsMap := make(map[int64]int)
	for i, id := range ids {
		jobsMap[id] = i
	}
	for j, job := range jobs {
		i, ok := jobsMap[job.ID]
		if !ok {
			logutil.BgLogger().Debug("the job that needs to be canceled isn't equal to current job",
				zap.Int64("need to canceled job ID", job.ID),
				zap.Int64("current job ID", job.ID))
			continue
		}
		delete(jobsMap, job.ID)
		// These states can't be cancelled.
		if job.IsDone() || job.IsSynced() {
			errs[i] = dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID)
			continue
		}
		// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
		if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
			continue
		}
		if !job.IsRollbackable() {
			errs[i] = dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
			continue
		}

		job.State = model.JobStateCancelling
		// Make sure RawArgs isn't overwritten.
		err := json.Unmarshal(job.RawArgs, &job.Args)
		if err != nil {
			errs[i] = errors.Trace(err)
			continue
		}
		if j >= len(generalJobs) {
			offset := int64(j - len(generalJobs))
			err = t.UpdateDDLJob(offset, job, true, meta.AddIndexJobListKey)
		} else {
			err = t.UpdateDDLJob(int64(j), job, true)
		}
		if err != nil {
			errs[i] = errors.Trace(err)
		}
	}
	for id, i := range jobsMap {
		errs[i] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id)
	}
	return errs, nil
}

// cancelConcurrencyJobs cancels the DDL jobs that are in the concurrent state.
func cancelConcurrencyJobs(se sessionctx.Context, ids []int64) ([]error, error) {
	failpoint.Inject("mockCancelConcurencyDDL", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return(nil, errors.New("mock commit error"))
		}
	})
	if len(ids) == 0 {
		return nil, nil
	}
	var jobMap = make(map[int64]int) // jobID -> error index

	sess := newSession(se)
	err := sess.begin()
	if err != nil {
		return nil, err
	}

	idsStr := make([]string, 0, len(ids))
	for idx, id := range ids {
		jobMap[id] = idx
		idsStr = append(idsStr, strconv.FormatInt(id, 10))
	}

	jobs, err := getJobsBySQL(sess, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", ")))
	if err != nil {
		sess.rollback()
		return nil, err
	}

	errs := make([]error, len(ids))

	for _, job := range jobs {
		i, ok := jobMap[job.ID]
		if !ok {
			logutil.BgLogger().Debug("the job that needs to be canceled isn't equal to current job",
				zap.Int64("need to canceled job ID", job.ID),
				zap.Int64("current job ID", job.ID))
			continue
		}
		delete(jobMap, job.ID)
		// These states can't be cancelled.
		if job.IsDone() || job.IsSynced() {
			errs[i] = dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID)
			continue
		}
		// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
		if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
			continue
		}
		if !job.IsRollbackable() {
			errs[i] = dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
			continue
		}
		job.State = model.JobStateCancelling
		// Make sure RawArgs isn't overwritten.
		err := json.Unmarshal(job.RawArgs, &job.Args)
		if err != nil {
			errs[i] = errors.Trace(err)
			continue
		}
		err = updateDDLJob2Table(sess, job, true)
		if err != nil {
			errs[i] = errors.Trace(err)
		}
	}
	err = sess.commit()
	if err != nil {
		return nil, err
	}
	for id, idx := range jobMap {
		errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id)
	}
	return errs, nil
}

func getDDLJobsInQueue(t *meta.Meta, jobListKey meta.JobListKeyType) ([]*model.Job, error) {
	cnt, err := t.DDLJobQueueLen(jobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	jobs := make([]*model.Job, cnt)
	for i := range jobs {
		jobs[i], err = t.GetDDLJobByIdx(int64(i), jobListKey)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	return jobs, nil
}

// GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID.
func GetAllDDLJobs(sess sessionctx.Context, t *meta.Meta) ([]*model.Job, error) {
	if variable.EnableConcurrentDDL.Load() {
		return getJobsBySQL(newSession(sess), JobTable, "1 order by job_id")
	}

	return getDDLJobs(t)
}

// getDDLJobs get all DDL jobs and sorts jobs by job.ID.
func getDDLJobs(t *meta.Meta) ([]*model.Job, error) {
	generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	addIdxJobs, err := getDDLJobsInQueue(t, meta.AddIndexJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	jobs := append(generalJobs, addIdxJobs...)
	slices.SortFunc(jobs, func(i, j *model.Job) bool {
		return i.ID < j.ID
	})
	return jobs, nil
}

// MaxHistoryJobs is exported for testing.
const MaxHistoryJobs = 10

// DefNumHistoryJobs is default value of the default number of history job
const DefNumHistoryJobs = 10

const batchNumHistoryJobs = 128

// GetLastNHistoryDDLJobs returns the DDL history jobs and an error.
// The maximum count of history jobs is num.
func GetLastNHistoryDDLJobs(t *meta.Meta, maxNumJobs int) ([]*model.Job, error) {
	iterator, err := GetLastHistoryDDLJobsIterator(t)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return iterator.GetLastJobs(maxNumJobs, nil)
}

// IterHistoryDDLJobs iterates history DDL jobs until the `finishFn` return true or error.
func IterHistoryDDLJobs(txn kv.Transaction, finishFn func([]*model.Job) (bool, error)) error {
	txnMeta := meta.NewMeta(txn)
	iter, err := GetLastHistoryDDLJobsIterator(txnMeta)
	if err != nil {
		return err
	}
	cacheJobs := make([]*model.Job, 0, DefNumHistoryJobs)
	for {
		cacheJobs, err = iter.GetLastJobs(DefNumHistoryJobs, cacheJobs)
		if err != nil || len(cacheJobs) == 0 {
			return err
		}
		finish, err := finishFn(cacheJobs)
		if err != nil || finish {
			return err
		}
	}
}

// IterAllDDLJobs will iterates running DDL jobs first, return directly if `finishFn` return true or error,
// then iterates history DDL jobs until the `finishFn` return true or error.
func IterAllDDLJobs(ctx sessionctx.Context, txn kv.Transaction, finishFn func([]*model.Job) (bool, error)) error {
	jobs, err := GetAllDDLJobs(ctx, meta.NewMeta(txn))
	if err != nil {
		return err
	}

	finish, err := finishFn(jobs)
	if err != nil || finish {
		return err
	}
	return IterHistoryDDLJobs(txn, finishFn)
}

// GetLastHistoryDDLJobsIterator gets latest N history DDL jobs iterator.
func GetLastHistoryDDLJobsIterator(m *meta.Meta) (meta.LastJobIterator, error) {
	return m.GetLastHistoryDDLJobsIterator()
}

// session wraps sessionctx.Context for transaction usage.
type session struct {
	sessionctx.Context
}

func newSession(s sessionctx.Context) *session {
	return &session{s}
}

func (s *session) begin() error {
	err := sessiontxn.NewTxn(context.Background(), s)
	if err != nil {
		return err
	}
	s.GetSessionVars().SetInTxn(true)
	return nil
}

func (s *session) commit() error {
	s.StmtCommit()
	return s.CommitTxn(context.Background())
}

func (s *session) txn() (kv.Transaction, error) {
	return s.Txn(true)
}

func (s *session) rollback() {
	s.StmtRollback()
	s.RollbackTxn(context.Background())
}

func (s *session) reset() {
	s.StmtRollback()
}

func (s *session) execute(ctx context.Context, query string, label string) ([]chunk.Row, error) {
	startTime := time.Now()
	var err error
	defer func() {
		metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
	}()
	rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(kv.WithInternalSourceType(ctx, kv.InternalTxnDDL), query)
	if err != nil {
		return nil, errors.Trace(err)
	}

	if rs == nil {
		return nil, nil
	}
	var rows []chunk.Row
	defer terror.Call(rs.Close)
	if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
		return nil, errors.Trace(err)
	}
	return rows, nil
}

func (s *session) session() sessionctx.Context {
	return s.Context
}

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
	iterator, err := GetLastHistoryDDLJobsIterator(m)
	if err != nil {
		return nil, errors.Trace(err)
	}
	allJobs := make([]*model.Job, 0, batchNumHistoryJobs)
	for {
		jobs, err := iterator.GetLastJobs(batchNumHistoryJobs, nil)
		if err != nil {
			return nil, errors.Trace(err)
		}
		allJobs = append(allJobs, jobs...)
		if len(jobs) < batchNumHistoryJobs {
			break
		}
	}
	// sort job.
	slices.SortFunc(allJobs, func(i, j *model.Job) bool {
		return i.ID < j.ID
	})
	return allJobs, nil
}

// ScanHistoryDDLJobs get some of the done DDL jobs.
// When the DDL history is quite large, GetAllHistoryDDLJobs() API can't work well, because it makes the server OOM.
// The result is in descending order by job ID.
func ScanHistoryDDLJobs(m *meta.Meta, startJobID int64, limit int) ([]*model.Job, error) {
	var iter meta.LastJobIterator
	var err error
	if startJobID == 0 {
		iter, err = m.GetLastHistoryDDLJobsIterator()
	} else {
		if limit == 0 {
			return nil, errors.New("when 'start_job_id' is specified, it must work with a 'limit'")
		}
		iter, err = m.GetHistoryDDLJobsIterator(startJobID)
	}
	if err != nil {
		return nil, errors.Trace(err)
	}
	return iter.GetLastJobs(limit, nil)
}

// GetHistoryJobByID return history DDL job by ID.
func GetHistoryJobByID(sess sessionctx.Context, id int64) (*model.Job, error) {
	err := sessiontxn.NewTxn(context.Background(), sess)
	if err != nil {
		return nil, err
	}
	defer func() {
		// we can ignore the commit error because this txn is readonly.
		_ = sess.CommitTxn(context.Background())
	}()
	txn, err := sess.Txn(true)
	if err != nil {
		return nil, err
	}
	t := meta.NewMeta(txn)
	job, err := t.GetHistoryDDLJob(id)
	return job, errors.Trace(err)
}

// AddHistoryDDLJobForTest used for test.
func AddHistoryDDLJobForTest(sess sessionctx.Context, t *meta.Meta, job *model.Job, updateRawArgs bool) error {
	return AddHistoryDDLJob(newSession(sess), t, job, updateRawArgs, variable.EnableConcurrentDDL.Load())
}

// AddHistoryDDLJob record the history job.
func AddHistoryDDLJob(sess *session, t *meta.Meta, job *model.Job, updateRawArgs bool, concurrentDDL bool) error {
	if concurrentDDL {
		// only add history job into table if it is concurrent DDL.
		err := addHistoryDDLJob2Table(sess, job, updateRawArgs)
		if err != nil {
			logutil.BgLogger().Info("[ddl] failed to add DDL job to history table", zap.Error(err))
		}
	}
	// we always add history DDL job to job list at this moment.
	return t.AddHistoryDDLJob(job, updateRawArgs)
}

// addHistoryDDLJob2Table adds DDL job to history table.
func addHistoryDDLJob2Table(sess *session, job *model.Job, updateRawArgs bool) error {
	b, err := job.Encode(updateRawArgs)
	if err != nil {
		return err
	}
	_, err = sess.execute(context.Background(),
		fmt.Sprintf("insert ignore into mysql.tidb_ddl_history(job_id, job_meta, db_name, table_name, schema_ids, table_ids, create_time) values (%d, %s, %s, %s, %s, %s, %v)",
			job.ID, wrapKey2String(b), strconv.Quote(job.SchemaName), strconv.Quote(job.TableName),
			strconv.Quote(strconv.FormatInt(job.SchemaID, 10)),
			strconv.Quote(strconv.FormatInt(job.TableID, 10)),
			strconv.Quote(model.TSConvert2Time(job.StartTS).String())),
		"insert_history")
	return errors.Trace(err)
}

相关信息

tidb 源码目录

相关文章

tidb backfilling 源码

tidb callback 源码

tidb cluster 源码

tidb column 源码

tidb constant 源码

tidb ddl_algorithm 源码

tidb ddl_api 源码

tidb ddl_tiflash_api 源码

tidb ddl_worker 源码

tidb ddl_workerpool 源码

0  赞