tidb ddl 源码

  • 2022-09-19
  • 浏览 (382)

tidb ddl 代码

文件路径:/parser/model/ddl.go

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
	"encoding/json"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/parser/terror"
)

// ActionType is the type for DDL action.
type ActionType byte

// List DDL actions.
const (
	ActionNone                          ActionType = 0
	ActionCreateSchema                  ActionType = 1
	ActionDropSchema                    ActionType = 2
	ActionCreateTable                   ActionType = 3
	ActionDropTable                     ActionType = 4
	ActionAddColumn                     ActionType = 5
	ActionDropColumn                    ActionType = 6
	ActionAddIndex                      ActionType = 7
	ActionDropIndex                     ActionType = 8
	ActionAddForeignKey                 ActionType = 9
	ActionDropForeignKey                ActionType = 10
	ActionTruncateTable                 ActionType = 11
	ActionModifyColumn                  ActionType = 12
	ActionRebaseAutoID                  ActionType = 13
	ActionRenameTable                   ActionType = 14
	ActionSetDefaultValue               ActionType = 15
	ActionShardRowID                    ActionType = 16
	ActionModifyTableComment            ActionType = 17
	ActionRenameIndex                   ActionType = 18
	ActionAddTablePartition             ActionType = 19
	ActionDropTablePartition            ActionType = 20
	ActionCreateView                    ActionType = 21
	ActionModifyTableCharsetAndCollate  ActionType = 22
	ActionTruncateTablePartition        ActionType = 23
	ActionDropView                      ActionType = 24
	ActionRecoverTable                  ActionType = 25
	ActionModifySchemaCharsetAndCollate ActionType = 26
	ActionLockTable                     ActionType = 27
	ActionUnlockTable                   ActionType = 28
	ActionRepairTable                   ActionType = 29
	ActionSetTiFlashReplica             ActionType = 30
	ActionUpdateTiFlashReplicaStatus    ActionType = 31
	ActionAddPrimaryKey                 ActionType = 32
	ActionDropPrimaryKey                ActionType = 33
	ActionCreateSequence                ActionType = 34
	ActionAlterSequence                 ActionType = 35
	ActionDropSequence                  ActionType = 36
	ActionAddColumns                    ActionType = 37 // Deprecated, we use ActionMultiSchemaChange instead.
	ActionDropColumns                   ActionType = 38 // Deprecated, we use ActionMultiSchemaChange instead.
	ActionModifyTableAutoIdCache        ActionType = 39 //nolint:revive
	ActionRebaseAutoRandomBase          ActionType = 40
	ActionAlterIndexVisibility          ActionType = 41
	ActionExchangeTablePartition        ActionType = 42
	ActionAddCheckConstraint            ActionType = 43
	ActionDropCheckConstraint           ActionType = 44
	ActionAlterCheckConstraint          ActionType = 45

	// `ActionAlterTableAlterPartition` is removed and will never be used.
	// Just left a tombstone here for compatibility.
	__DEPRECATED_ActionAlterTableAlterPartition ActionType = 46 //nolint:revive

	ActionRenameTables                  ActionType = 47
	ActionDropIndexes                   ActionType = 48 // Deprecated, we use ActionMultiSchemaChange instead.
	ActionAlterTableAttributes          ActionType = 49
	ActionAlterTablePartitionAttributes ActionType = 50
	ActionCreatePlacementPolicy         ActionType = 51
	ActionAlterPlacementPolicy          ActionType = 52
	ActionDropPlacementPolicy           ActionType = 53
	ActionAlterTablePartitionPlacement  ActionType = 54
	ActionModifySchemaDefaultPlacement  ActionType = 55
	ActionAlterTablePlacement           ActionType = 56
	ActionAlterCacheTable               ActionType = 57
	ActionAlterTableStatsOptions        ActionType = 58
	ActionAlterNoCacheTable             ActionType = 59
	ActionCreateTables                  ActionType = 60
	ActionMultiSchemaChange             ActionType = 61
	ActionFlashbackCluster              ActionType = 62
)

var actionMap = map[ActionType]string{
	ActionCreateSchema:                  "create schema",
	ActionDropSchema:                    "drop schema",
	ActionCreateTable:                   "create table",
	ActionCreateTables:                  "create tables",
	ActionDropTable:                     "drop table",
	ActionAddColumn:                     "add column",
	ActionDropColumn:                    "drop column",
	ActionAddIndex:                      "add index",
	ActionDropIndex:                     "drop index",
	ActionAddForeignKey:                 "add foreign key",
	ActionDropForeignKey:                "drop foreign key",
	ActionTruncateTable:                 "truncate table",
	ActionModifyColumn:                  "modify column",
	ActionRebaseAutoID:                  "rebase auto_increment ID",
	ActionRenameTable:                   "rename table",
	ActionRenameTables:                  "rename tables",
	ActionSetDefaultValue:               "set default value",
	ActionShardRowID:                    "shard row ID",
	ActionModifyTableComment:            "modify table comment",
	ActionRenameIndex:                   "rename index",
	ActionAddTablePartition:             "add partition",
	ActionDropTablePartition:            "drop partition",
	ActionCreateView:                    "create view",
	ActionModifyTableCharsetAndCollate:  "modify table charset and collate",
	ActionTruncateTablePartition:        "truncate partition",
	ActionDropView:                      "drop view",
	ActionRecoverTable:                  "recover table",
	ActionModifySchemaCharsetAndCollate: "modify schema charset and collate",
	ActionLockTable:                     "lock table",
	ActionUnlockTable:                   "unlock table",
	ActionRepairTable:                   "repair table",
	ActionSetTiFlashReplica:             "set tiflash replica",
	ActionUpdateTiFlashReplicaStatus:    "update tiflash replica status",
	ActionAddPrimaryKey:                 "add primary key",
	ActionDropPrimaryKey:                "drop primary key",
	ActionCreateSequence:                "create sequence",
	ActionAlterSequence:                 "alter sequence",
	ActionDropSequence:                  "drop sequence",
	ActionModifyTableAutoIdCache:        "modify auto id cache",
	ActionRebaseAutoRandomBase:          "rebase auto_random ID",
	ActionAlterIndexVisibility:          "alter index visibility",
	ActionExchangeTablePartition:        "exchange partition",
	ActionAddCheckConstraint:            "add check constraint",
	ActionDropCheckConstraint:           "drop check constraint",
	ActionAlterCheckConstraint:          "alter check constraint",
	ActionAlterTableAttributes:          "alter table attributes",
	ActionAlterTablePartitionPlacement:  "alter table partition placement",
	ActionAlterTablePartitionAttributes: "alter table partition attributes",
	ActionCreatePlacementPolicy:         "create placement policy",
	ActionAlterPlacementPolicy:          "alter placement policy",
	ActionDropPlacementPolicy:           "drop placement policy",
	ActionModifySchemaDefaultPlacement:  "modify schema default placement",
	ActionAlterTablePlacement:           "alter table placement",
	ActionAlterCacheTable:               "alter table cache",
	ActionAlterNoCacheTable:             "alter table nocache",
	ActionAlterTableStatsOptions:        "alter table statistics options",
	ActionMultiSchemaChange:             "alter table multi-schema change",
	ActionFlashbackCluster:              "flashback cluster",

	// `ActionAlterTableAlterPartition` is removed and will never be used.
	// Just left a tombstone here for compatibility.
	__DEPRECATED_ActionAlterTableAlterPartition: "alter partition",
}

// String return current ddl action in string
func (action ActionType) String() string {
	if v, ok := actionMap[action]; ok {
		return v
	}
	return "none"
}

// HistoryInfo is used for binlog.
type HistoryInfo struct {
	SchemaVersion int64
	DBInfo        *DBInfo
	TableInfo     *TableInfo
	FinishedTS    uint64

	// MultipleTableInfos is like TableInfo but only for operations updating multiple tables.
	MultipleTableInfos []*TableInfo
}

// AddDBInfo adds schema version and schema information that are used for binlog.
// dbInfo is added in the following operations: create database, drop database.
func (h *HistoryInfo) AddDBInfo(schemaVer int64, dbInfo *DBInfo) {
	h.SchemaVersion = schemaVer
	h.DBInfo = dbInfo
}

// AddTableInfo adds schema version and table information that are used for binlog.
// tblInfo is added except for the following operations: create database, drop database.
func (h *HistoryInfo) AddTableInfo(schemaVer int64, tblInfo *TableInfo) {
	h.SchemaVersion = schemaVer
	h.TableInfo = tblInfo
}

// SetTableInfos is like AddTableInfo, but will add multiple table infos to the binlog.
func (h *HistoryInfo) SetTableInfos(schemaVer int64, tblInfos []*TableInfo) {
	h.SchemaVersion = schemaVer
	h.MultipleTableInfos = make([]*TableInfo, len(tblInfos))
	copy(h.MultipleTableInfos, tblInfos)
}

// Clean cleans history information.
func (h *HistoryInfo) Clean() {
	h.SchemaVersion = 0
	h.DBInfo = nil
	h.TableInfo = nil
	h.MultipleTableInfos = nil
}

// DDLReorgMeta is meta info of DDL reorganization.
type DDLReorgMeta struct {
	// EndHandle is the last handle of the adding indices table.
	// We should only backfill indices in the range [startHandle, EndHandle].
	EndHandle int64 `json:"end_handle"`

	SQLMode       mysql.SQLMode                    `json:"sql_mode"`
	Warnings      map[errors.ErrorID]*terror.Error `json:"warnings"`
	WarningsCount map[errors.ErrorID]int64         `json:"warnings_count"`
	Location      *TimeZoneLocation                `json:"location"`
	ReorgTp       ReorgType                        `json:"reorg_tp"`
}

// ReorgType indicates which process is used for the data reorganization.
type ReorgType int8

const (
	// ReorgTypeNone means the backfill task is not started yet.
	ReorgTypeNone ReorgType = iota
	// ReorgTypeTxn means the index records are backfill with transactions.
	// All the index KVs are written through the transaction interface.
	// This is the original backfill implementation.
	ReorgTypeTxn
	// ReorgTypeLitMerge means the index records are backfill with lightning.
	// The index KVs are encoded to SST files and imported to the storage directly.
	// The incremental index KVs written by DML are redirected to a temporary index.
	// After the backfill is finished, the temporary index records are merged back to the original index.
	ReorgTypeLitMerge
	// ReorgTypeTxnMerge means backfill with transactions and merge incremental changes.
	// The backfill index KVs are written through the transaction interface.
	// The incremental index KVs written by DML are redirected to a temporary index.
	// After the backfill is finished, the temporary index records are merged back to the original index.
	ReorgTypeTxnMerge
)

// NeedMergeProcess means the incremental changes need to be merged.
func (tp ReorgType) NeedMergeProcess() bool {
	return tp == ReorgTypeLitMerge || tp == ReorgTypeTxnMerge
}

// TimeZoneLocation represents a single time zone.
type TimeZoneLocation struct {
	Name     string `json:"name"`
	Offset   int    `json:"offset"` // seconds east of UTC
	location *time.Location
}

// GetLocation gets the timezone location.
func (tz *TimeZoneLocation) GetLocation() (*time.Location, error) {
	if tz.location != nil {
		return tz.location, nil
	}

	var err error
	if tz.Offset == 0 {
		tz.location, err = time.LoadLocation(tz.Name)
	} else {
		tz.location = time.FixedZone(tz.Name, tz.Offset)
	}
	return tz.location, err
}

// NewDDLReorgMeta new a DDLReorgMeta.
func NewDDLReorgMeta() *DDLReorgMeta {
	return &DDLReorgMeta{
		EndHandle: math.MaxInt64,
	}
}

// MultiSchemaInfo keeps some information for multi schema change.
type MultiSchemaInfo struct {
	SubJobs    []*SubJob `json:"sub_jobs"`
	Revertible bool      `json:"revertible"`

	// SkipVersion is used to control whether generating a new schema version for a sub-job.
	SkipVersion bool `json:"-"`

	AddColumns    []CIStr `json:"-"`
	DropColumns   []CIStr `json:"-"`
	ModifyColumns []CIStr `json:"-"`
	AddIndexes    []CIStr `json:"-"`
	DropIndexes   []CIStr `json:"-"`
	AlterIndexes  []CIStr `json:"-"`

	RelativeColumns []CIStr `json:"-"`
	PositionColumns []CIStr `json:"-"`
}

// NewMultiSchemaInfo new a MultiSchemaInfo.
func NewMultiSchemaInfo() *MultiSchemaInfo {
	return &MultiSchemaInfo{
		SubJobs:    nil,
		Revertible: true,
	}
}

// SubJob is a representation of one DDL schema change. A Job may contain zero(when multi-schema change is not applicable) or more SubJobs.
type SubJob struct {
	Type        ActionType      `json:"type"`
	Args        []interface{}   `json:"-"`
	RawArgs     json.RawMessage `json:"raw_args"`
	SchemaState SchemaState     `json:"schema_state"`
	SnapshotVer uint64          `json:"snapshot_ver"`
	Revertible  bool            `json:"revertible"`
	State       JobState        `json:"state"`
	RowCount    int64           `json:"row_count"`
	Warning     *terror.Error   `json:"warning"`
	CtxVars     []interface{}   `json:"-"`
	SchemaVer   int64           `json:"schema_version"`
}

// IsNormal returns true if the sub-job is normally running.
func (sub *SubJob) IsNormal() bool {
	switch sub.State {
	case JobStateCancelling, JobStateCancelled,
		JobStateRollingback, JobStateRollbackDone:
		return false
	default:
		return true
	}
}

// IsFinished returns true if the job is done.
func (sub *SubJob) IsFinished() bool {
	return sub.State == JobStateDone ||
		sub.State == JobStateRollbackDone ||
		sub.State == JobStateCancelled
}

// ToProxyJob converts a sub-job to a proxy job.
func (sub *SubJob) ToProxyJob(parentJob *Job) Job {
	return Job{
		ID:              parentJob.ID,
		Type:            sub.Type,
		SchemaID:        parentJob.SchemaID,
		TableID:         parentJob.TableID,
		SchemaName:      parentJob.SchemaName,
		State:           sub.State,
		Warning:         sub.Warning,
		Error:           nil,
		ErrorCount:      0,
		RowCount:        sub.RowCount,
		Mu:              sync.Mutex{},
		CtxVars:         sub.CtxVars,
		Args:            sub.Args,
		RawArgs:         sub.RawArgs,
		SchemaState:     sub.SchemaState,
		SnapshotVer:     sub.SnapshotVer,
		RealStartTS:     parentJob.RealStartTS,
		StartTS:         parentJob.StartTS,
		DependencyID:    parentJob.DependencyID,
		Query:           parentJob.Query,
		BinlogInfo:      parentJob.BinlogInfo,
		Version:         parentJob.Version,
		ReorgMeta:       parentJob.ReorgMeta,
		MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible},
		Priority:        parentJob.Priority,
		SeqNum:          parentJob.SeqNum,
	}
}

// FromProxyJob converts a proxy job to a sub-job.
func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
	sub.Revertible = proxyJob.MultiSchemaInfo.Revertible
	sub.SchemaState = proxyJob.SchemaState
	sub.SnapshotVer = proxyJob.SnapshotVer
	sub.Args = proxyJob.Args
	sub.State = proxyJob.State
	sub.Warning = proxyJob.Warning
	sub.RowCount = proxyJob.RowCount
	sub.SchemaVer = ver
}

// Job is for a DDL operation.
type Job struct {
	ID         int64         `json:"id"`
	Type       ActionType    `json:"type"`
	SchemaID   int64         `json:"schema_id"`
	TableID    int64         `json:"table_id"`
	SchemaName string        `json:"schema_name"`
	TableName  string        `json:"table_name"`
	State      JobState      `json:"state"`
	Warning    *terror.Error `json:"warning"`
	Error      *terror.Error `json:"err"`
	// ErrorCount will be increased, every time we meet an error when running job.
	ErrorCount int64 `json:"err_count"`
	// RowCount means the number of rows that are processed.
	RowCount int64      `json:"row_count"`
	Mu       sync.Mutex `json:"-"`
	// CtxVars are variables attached to the job. It is for internal usage.
	// E.g. passing arguments between functions by one single *Job pointer.
	CtxVars []interface{} `json:"-"`
	Args    []interface{} `json:"-"`
	// RawArgs : We must use json raw message to delay parsing special args.
	RawArgs     json.RawMessage `json:"raw_args"`
	SchemaState SchemaState     `json:"schema_state"`
	// SnapshotVer means snapshot version for this job.
	SnapshotVer uint64 `json:"snapshot_ver"`
	// RealStartTS uses timestamp allocated by TSO.
	// Now it's the TS when we actually start the job.
	RealStartTS uint64 `json:"real_start_ts"`
	// StartTS uses timestamp allocated by TSO.
	// Now it's the TS when we put the job to TiKV queue.
	StartTS uint64 `json:"start_ts"`
	// DependencyID is the job's ID that the current job depends on.
	DependencyID int64 `json:"dependency_id"`
	// Query string of the ddl job.
	Query      string       `json:"query"`
	BinlogInfo *HistoryInfo `json:"binlog"`

	// Version indicates the DDL job version. For old jobs, it will be 0.
	Version int64 `json:"version"`

	// ReorgMeta is meta info of ddl reorganization.
	ReorgMeta *DDLReorgMeta `json:"reorg_meta"`

	// MultiSchemaInfo keeps some warning now for multi schema change.
	MultiSchemaInfo *MultiSchemaInfo `json:"multi_schema_info"`

	// Priority is only used to set the operation priority of adding indices.
	Priority int `json:"priority"`

	// SeqNum is the total order in all DDLs, it's used to identify the order of DDL.
	SeqNum uint64 `json:"seq_num"`
}

// FinishTableJob is called when a job is finished.
// It updates the job's state information and adds tblInfo to the binlog.
func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfo *TableInfo) {
	job.State = jobState
	job.SchemaState = schemaState
	job.BinlogInfo.AddTableInfo(ver, tblInfo)
}

// FinishMultipleTableJob is called when a job is finished.
// It updates the job's state information and adds tblInfos to the binlog.
func (job *Job) FinishMultipleTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfos []*TableInfo) {
	job.State = jobState
	job.SchemaState = schemaState
	job.BinlogInfo.SchemaVersion = ver
	job.BinlogInfo.MultipleTableInfos = tblInfos
	job.BinlogInfo.TableInfo = tblInfos[len(tblInfos)-1]
}

// FinishDBJob is called when a job is finished.
// It updates the job's state information and adds dbInfo the binlog.
func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfo *DBInfo) {
	job.State = jobState
	job.SchemaState = schemaState
	job.BinlogInfo.AddDBInfo(ver, dbInfo)
}

// MarkNonRevertible mark the current job to be non-revertible.
// It means the job cannot be cancelled or rollbacked.
func (job *Job) MarkNonRevertible() {
	if job.MultiSchemaInfo != nil {
		job.MultiSchemaInfo.Revertible = false
	}
}

// Clone returns a copy of the job.
func (job *Job) Clone() *Job {
	encode, err := job.Encode(true)
	if err != nil {
		return nil
	}
	var clone Job
	err = clone.Decode(encode)
	if err != nil {
		return nil
	}
	if len(job.Args) > 0 {
		clone.Args = make([]interface{}, len(job.Args))
		copy(clone.Args, job.Args)
	}
	if job.MultiSchemaInfo != nil {
		for i, sub := range job.MultiSchemaInfo.SubJobs {
			clone.MultiSchemaInfo.SubJobs[i].Args = make([]interface{}, len(sub.Args))
			copy(clone.MultiSchemaInfo.SubJobs[i].Args, sub.Args)
		}
	}
	return &clone
}

// TSConvert2Time converts timestamp to time.
func TSConvert2Time(ts uint64) time.Time {
	t := int64(ts >> 18) // 18 is for the logical time.
	return time.UnixMilli(t)
}

// SetRowCount sets the number of rows. Make sure it can pass `make race`.
func (job *Job) SetRowCount(count int64) {
	job.Mu.Lock()
	defer job.Mu.Unlock()

	job.RowCount = count
}

// GetRowCount gets the number of rows. Make sure it can pass `make race`.
func (job *Job) GetRowCount() int64 {
	job.Mu.Lock()
	defer job.Mu.Unlock()

	return job.RowCount
}

// SetWarnings sets the warnings of rows handled.
func (job *Job) SetWarnings(warnings map[errors.ErrorID]*terror.Error, warningsCount map[errors.ErrorID]int64) {
	job.ReorgMeta.Warnings = warnings
	job.ReorgMeta.WarningsCount = warningsCount
}

// GetWarnings gets the warnings of the rows handled.
func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
	return job.ReorgMeta.Warnings, job.ReorgMeta.WarningsCount
}

// Encode encodes job with json format.
// updateRawArgs is used to determine whether to update the raw args.
func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {
	var err error
	if updateRawArgs {
		job.RawArgs, err = json.Marshal(job.Args)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if job.MultiSchemaInfo != nil {
			for _, sub := range job.MultiSchemaInfo.SubJobs {
				// Only update the args of executing sub-jobs.
				if sub.Args == nil {
					continue
				}
				sub.RawArgs, err = json.Marshal(sub.Args)
				if err != nil {
					return nil, errors.Trace(err)
				}
			}
		}
	}

	var b []byte
	job.Mu.Lock()
	defer job.Mu.Unlock()
	b, err = json.Marshal(job)

	return b, errors.Trace(err)
}

// Decode decodes job from the json buffer, we must use DecodeArgs later to
// decode special args for this job.
func (job *Job) Decode(b []byte) error {
	err := json.Unmarshal(b, job)
	return errors.Trace(err)
}

// DecodeArgs decodes job args.
func (job *Job) DecodeArgs(args ...interface{}) error {
	var rawArgs []json.RawMessage
	if err := json.Unmarshal(job.RawArgs, &rawArgs); err != nil {
		return errors.Trace(err)
	}

	sz := len(rawArgs)
	if sz > len(args) {
		sz = len(args)
	}

	for i := 0; i < sz; i++ {
		if err := json.Unmarshal(rawArgs[i], args[i]); err != nil {
			return errors.Trace(err)
		}
	}
	job.Args = args[:sz]
	return nil
}

// String implements fmt.Stringer interface.
func (job *Job) String() string {
	rowCount := job.GetRowCount()
	ret := fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v",
		job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
	if job.Type != ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
		ret += fmt.Sprintf(", Multi-Schema Change:true, Revertible:%v", job.MultiSchemaInfo.Revertible)
	}
	return ret
}

func (job *Job) hasDependentSchema(other *Job) (bool, error) {
	if other.Type == ActionDropSchema || other.Type == ActionCreateSchema {
		if other.SchemaID == job.SchemaID {
			return true, nil
		}
		if job.Type == ActionRenameTable {
			var oldSchemaID int64
			if err := job.DecodeArgs(&oldSchemaID); err != nil {
				return false, errors.Trace(err)
			}
			if other.SchemaID == oldSchemaID {
				return true, nil
			}
		}
		if job.Type == ActionExchangeTablePartition {
			var (
				defID          int64
				ptSchemaID     int64
				ptID           int64
				partName       string
				withValidation bool
			)
			if err := job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
				return false, errors.Trace(err)
			}
			if other.SchemaID == ptSchemaID {
				return true, nil
			}
		}
	}
	return false, nil
}

func (job *Job) hasDependentTableForExchangePartition(other *Job) (bool, error) {
	if job.Type == ActionExchangeTablePartition {
		var (
			defID          int64
			ptSchemaID     int64
			ptID           int64
			partName       string
			withValidation bool
		)

		if err := job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
			return false, errors.Trace(err)
		}
		if ptID == other.TableID || defID == other.TableID {
			return true, nil
		}

		if other.Type == ActionExchangeTablePartition {
			var (
				otherDefID          int64
				otherPtSchemaID     int64
				otherPtID           int64
				otherPartName       string
				otherWithValidation bool
			)
			if err := other.DecodeArgs(&otherDefID, &otherPtSchemaID, &otherPtID, &otherPartName, &otherWithValidation); err != nil {
				return false, errors.Trace(err)
			}
			if job.TableID == other.TableID || job.TableID == otherPtID || job.TableID == otherDefID {
				return true, nil
			}
			if ptID == other.TableID || ptID == otherPtID || ptID == otherDefID {
				return true, nil
			}
			if defID == other.TableID || defID == otherPtID || defID == otherDefID {
				return true, nil
			}
		}
	}
	return false, nil
}

// IsDependentOn returns whether the job depends on "other".
// How to check the job depends on "other"?
// 1. The two jobs handle the same database when one of the two jobs is an ActionDropSchema or ActionCreateSchema type.
// 2. Or the two jobs handle the same table.
func (job *Job) IsDependentOn(other *Job) (bool, error) {
	isDependent, err := job.hasDependentSchema(other)
	if err != nil || isDependent {
		return isDependent, errors.Trace(err)
	}
	isDependent, err = other.hasDependentSchema(job)
	if err != nil || isDependent {
		return isDependent, errors.Trace(err)
	}

	// TODO: If a job is ActionRenameTable, we need to check table name.
	if other.TableID == job.TableID {
		return true, nil
	}
	isDependent, err = job.hasDependentTableForExchangePartition(other)
	if err != nil || isDependent {
		return isDependent, errors.Trace(err)
	}
	isDependent, err = other.hasDependentTableForExchangePartition(job)
	if err != nil || isDependent {
		return isDependent, errors.Trace(err)
	}
	return false, nil
}

// IsFinished returns whether job is finished or not.
// If the job state is Done or Cancelled, it is finished.
func (job *Job) IsFinished() bool {
	return job.State == JobStateDone || job.State == JobStateRollbackDone || job.State == JobStateCancelled
}

// IsCancelled returns whether the job is cancelled or not.
func (job *Job) IsCancelled() bool {
	return job.State == JobStateCancelled
}

// IsRollbackDone returns whether the job is rolled back or not.
func (job *Job) IsRollbackDone() bool {
	return job.State == JobStateRollbackDone
}

// IsRollingback returns whether the job is rolling back or not.
func (job *Job) IsRollingback() bool {
	return job.State == JobStateRollingback
}

// IsCancelling returns whether the job is cancelling or not.
func (job *Job) IsCancelling() bool {
	return job.State == JobStateCancelling
}

// IsSynced returns whether the DDL modification is synced among all TiDB servers.
func (job *Job) IsSynced() bool {
	return job.State == JobStateSynced
}

// IsDone returns whether job is done.
func (job *Job) IsDone() bool {
	return job.State == JobStateDone
}

// IsRunning returns whether job is still running or not.
func (job *Job) IsRunning() bool {
	return job.State == JobStateRunning
}

// IsQueueing returns whether job is queuing or not.
func (job *Job) IsQueueing() bool {
	return job.State == JobStateQueueing
}

// NotStarted returns true if the job is never run by a worker.
func (job *Job) NotStarted() bool {
	return job.State == JobStateNone || job.State == JobStateQueueing
}

// MayNeedReorg indicates that this job may need to reorganize the data.
func (job *Job) MayNeedReorg() bool {
	switch job.Type {
	case ActionAddIndex, ActionAddPrimaryKey:
		return true
	case ActionModifyColumn:
		if len(job.CtxVars) > 0 {
			needReorg, ok := job.CtxVars[0].(bool)
			return ok && needReorg
		}
		return false
	case ActionMultiSchemaChange:
		for _, sub := range job.MultiSchemaInfo.SubJobs {
			proxyJob := Job{Type: sub.Type, CtxVars: sub.CtxVars}
			if proxyJob.MayNeedReorg() {
				return true
			}
		}
		return false
	default:
		return false
	}
}

// IsRollbackable checks whether the job can be rollback.
func (job *Job) IsRollbackable() bool {
	switch job.Type {
	case ActionDropIndex, ActionDropPrimaryKey:
		// We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization or StateWriteOnly, otherwise there will be an inconsistent issue between record and index.
		// In WriteOnly state, we can rollback for normal index but can't rollback for expression index(need to drop hidden column). Since we can't
		// know the type of index here, we consider all indices except primary index as non-rollbackable.
		// TODO: distinguish normal index and expression index so that we can rollback `DropIndex` for normal index in WriteOnly state.
		// TODO: make DropPrimaryKey rollbackable in WriteOnly, it need to deal with some tests.
		if job.SchemaState == StateDeleteOnly ||
			job.SchemaState == StateDeleteReorganization ||
			job.SchemaState == StateWriteOnly {
			return false
		}
	case ActionAddTablePartition:
		return job.SchemaState == StateNone || job.SchemaState == StateReplicaOnly
	case ActionDropColumn, ActionDropSchema, ActionDropTable, ActionDropSequence,
		ActionDropForeignKey, ActionDropTablePartition:
		return job.SchemaState == StatePublic
	case ActionRebaseAutoID, ActionShardRowID,
		ActionTruncateTable, ActionAddForeignKey, ActionRenameTable,
		ActionModifyTableCharsetAndCollate, ActionTruncateTablePartition,
		ActionModifySchemaCharsetAndCollate, ActionRepairTable,
		ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement:
		return job.SchemaState == StateNone
	case ActionMultiSchemaChange:
		return job.MultiSchemaInfo.Revertible
	case ActionFlashbackCluster:
		if job.SchemaState == StateWriteReorganization {
			return false
		}
	}
	return true
}

// JobState is for job state.
type JobState byte

// List job states.
const (
	JobStateNone    JobState = 0
	JobStateRunning JobState = 1
	// When DDL encountered an unrecoverable error at reorganization state,
	// some keys has been added already, we need to remove them.
	// JobStateRollingback is the state to do the rolling back job.
	JobStateRollingback  JobState = 2
	JobStateRollbackDone JobState = 3
	JobStateDone         JobState = 4
	JobStateCancelled    JobState = 5
	// JobStateSynced is used to mark the information about the completion of this job
	// has been synchronized to all servers.
	JobStateSynced JobState = 6
	// JobStateCancelling is used to mark the DDL job is cancelled by the client, but the DDL work hasn't handle it.
	JobStateCancelling JobState = 7
	// JobStateQueueing means the job has not yet been started.
	JobStateQueueing JobState = 8
)

// String implements fmt.Stringer interface.
func (s JobState) String() string {
	switch s {
	case JobStateRunning:
		return "running"
	case JobStateRollingback:
		return "rollingback"
	case JobStateRollbackDone:
		return "rollback done"
	case JobStateDone:
		return "done"
	case JobStateCancelled:
		return "cancelled"
	case JobStateCancelling:
		return "cancelling"
	case JobStateSynced:
		return "synced"
	case JobStateQueueing:
		return "queueing"
	default:
		return "none"
	}
}

// SchemaDiff contains the schema modification at a particular schema version.
// It is used to reduce schema reload cost.
type SchemaDiff struct {
	Version  int64      `json:"version"`
	Type     ActionType `json:"type"`
	SchemaID int64      `json:"schema_id"`
	TableID  int64      `json:"table_id"`

	// OldTableID is the table ID before truncate, only used by truncate table DDL.
	OldTableID int64 `json:"old_table_id"`
	// OldSchemaID is the schema ID before rename table, only used by rename table DDL.
	OldSchemaID int64 `json:"old_schema_id"`

	AffectedOpts []*AffectedOption `json:"affected_options"`
}

// AffectedOption is used when a ddl affects multi tables.
type AffectedOption struct {
	SchemaID    int64 `json:"schema_id"`
	TableID     int64 `json:"table_id"`
	OldTableID  int64 `json:"old_table_id"`
	OldSchemaID int64 `json:"old_schema_id"`
}

相关信息

tidb 源码目录

相关文章

tidb flags 源码

tidb model 源码

0  赞