tidb tables 源码

  • 2022-09-19
  • 浏览 (393)

tidb tables 代码

文件路径:/table/tables/tables.go

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2013 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSES/QL-LICENSE file.

package tables

import (
	"context"
	"math"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
	"github.com/pingcap/tidb/meta/autoid"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/binloginfo"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/collate"
	"github.com/pingcap/tidb/util/generatedexpr"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/stringutil"
	"github.com/pingcap/tidb/util/tableutil"
	"github.com/pingcap/tipb/go-binlog"
	"github.com/pingcap/tipb/go-tipb"
	"go.uber.org/zap"
)

// TableCommon is shared by both Table and partition.
type TableCommon struct {
	tableID int64
	// physicalTableID is a unique int64 to identify a physical table.
	physicalTableID                 int64
	Columns                         []*table.Column
	PublicColumns                   []*table.Column
	VisibleColumns                  []*table.Column
	HiddenColumns                   []*table.Column
	WritableColumns                 []*table.Column
	FullHiddenColsAndVisibleColumns []*table.Column
	indices                         []table.Index
	meta                            *model.TableInfo
	allocs                          autoid.Allocators
	sequence                        *sequenceCommon

	// recordPrefix and indexPrefix are generated using physicalTableID.
	recordPrefix kv.Key
	indexPrefix  kv.Key
}

// MockTableFromMeta only serves for test.
func MockTableFromMeta(tblInfo *model.TableInfo) table.Table {
	columns := make([]*table.Column, 0, len(tblInfo.Columns))
	for _, colInfo := range tblInfo.Columns {
		col := table.ToColumn(colInfo)
		columns = append(columns, col)
	}

	var t TableCommon
	initTableCommon(&t, tblInfo, tblInfo.ID, columns, nil)
	if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
		ret, err := newCachedTable(&t)
		if err != nil {
			return nil
		}
		return ret
	}
	if tblInfo.GetPartitionInfo() == nil {
		if err := initTableIndices(&t); err != nil {
			return nil
		}
		return &t
	}

	ret, err := newPartitionedTable(&t, tblInfo)
	if err != nil {
		return nil
	}
	return ret
}

// TableFromMeta creates a Table instance from model.TableInfo.
func TableFromMeta(allocs autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) {
	if tblInfo.State == model.StateNone {
		return nil, table.ErrTableStateCantNone.GenWithStackByArgs(tblInfo.Name)
	}

	colsLen := len(tblInfo.Columns)
	columns := make([]*table.Column, 0, colsLen)
	for i, colInfo := range tblInfo.Columns {
		if colInfo.State == model.StateNone {
			return nil, table.ErrColumnStateCantNone.GenWithStackByArgs(colInfo.Name)
		}

		// Print some information when the column's offset isn't equal to i.
		if colInfo.Offset != i {
			logutil.BgLogger().Error("wrong table schema", zap.Any("table", tblInfo), zap.Any("column", colInfo), zap.Int("index", i), zap.Int("offset", colInfo.Offset), zap.Int("columnNumber", colsLen))
		}

		col := table.ToColumn(colInfo)
		if col.IsGenerated() {
			expr, err := generatedexpr.ParseExpression(colInfo.GeneratedExprString)
			if err != nil {
				return nil, err
			}
			expr, err = generatedexpr.SimpleResolveName(expr, tblInfo)
			if err != nil {
				return nil, err
			}
			col.GeneratedExpr = expr
		}
		// default value is expr.
		if col.DefaultIsExpr {
			expr, err := generatedexpr.ParseExpression(colInfo.DefaultValue.(string))
			if err != nil {
				return nil, err
			}
			col.DefaultExpr = expr
		}
		columns = append(columns, col)
	}

	var t TableCommon
	initTableCommon(&t, tblInfo, tblInfo.ID, columns, allocs)
	if tblInfo.GetPartitionInfo() == nil {
		if err := initTableIndices(&t); err != nil {
			return nil, err
		}
		if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
			return newCachedTable(&t)
		}
		return &t, nil
	}
	return newPartitionedTable(&t, tblInfo)
}

// initTableCommon initializes a TableCommon struct.
func initTableCommon(t *TableCommon, tblInfo *model.TableInfo, physicalTableID int64, cols []*table.Column, allocs autoid.Allocators) {
	t.tableID = tblInfo.ID
	t.physicalTableID = physicalTableID
	t.allocs = allocs
	t.meta = tblInfo
	t.Columns = cols
	t.PublicColumns = t.Cols()
	t.VisibleColumns = t.VisibleCols()
	t.HiddenColumns = t.HiddenCols()
	t.WritableColumns = t.WritableCols()
	t.FullHiddenColsAndVisibleColumns = t.FullHiddenColsAndVisibleCols()
	t.recordPrefix = tablecodec.GenTableRecordPrefix(physicalTableID)
	t.indexPrefix = tablecodec.GenTableIndexPrefix(physicalTableID)
	if tblInfo.IsSequence() {
		t.sequence = &sequenceCommon{meta: tblInfo.Sequence}
	}
}

// initTableIndices initializes the indices of the TableCommon.
func initTableIndices(t *TableCommon) error {
	tblInfo := t.meta
	for _, idxInfo := range tblInfo.Indices {
		if idxInfo.State == model.StateNone {
			return table.ErrIndexStateCantNone.GenWithStackByArgs(idxInfo.Name)
		}

		// Use partition ID for index, because TableCommon may be table or partition.
		idx := NewIndex(t.physicalTableID, tblInfo, idxInfo)
		t.indices = append(t.indices, idx)
	}
	return nil
}

func initTableCommonWithIndices(t *TableCommon, tblInfo *model.TableInfo, physicalTableID int64, cols []*table.Column, allocs autoid.Allocators) error {
	initTableCommon(t, tblInfo, physicalTableID, cols, allocs)
	return initTableIndices(t)
}

// Indices implements table.Table Indices interface.
func (t *TableCommon) Indices() []table.Index {
	return t.indices
}

// GetWritableIndexByName gets the index meta from the table by the index name.
func GetWritableIndexByName(idxName string, t table.Table) table.Index {
	for _, idx := range t.Indices() {
		if !IsIndexWritable(idx) {
			continue
		}
		if idxName == idx.Meta().Name.L {
			return idx
		}
	}
	return nil
}

// deletableIndices implements table.Table deletableIndices interface.
func (t *TableCommon) deletableIndices() []table.Index {
	// All indices are deletable because we don't need to check StateNone.
	return t.indices
}

// Meta implements table.Table Meta interface.
func (t *TableCommon) Meta() *model.TableInfo {
	return t.meta
}

// GetPhysicalID implements table.Table GetPhysicalID interface.
func (t *TableCommon) GetPhysicalID() int64 {
	return t.physicalTableID
}

type getColsMode int64

const (
	_ getColsMode = iota
	visible
	hidden
	full
)

func (t *TableCommon) getCols(mode getColsMode) []*table.Column {
	columns := make([]*table.Column, 0, len(t.Columns))
	for _, col := range t.Columns {
		if col.State != model.StatePublic {
			continue
		}
		if (mode == visible && col.Hidden) || (mode == hidden && !col.Hidden) {
			continue
		}
		columns = append(columns, col)
	}
	return columns
}

// Cols implements table.Table Cols interface.
func (t *TableCommon) Cols() []*table.Column {
	if len(t.PublicColumns) > 0 {
		return t.PublicColumns
	}
	return t.getCols(full)
}

// VisibleCols implements table.Table VisibleCols interface.
func (t *TableCommon) VisibleCols() []*table.Column {
	if len(t.VisibleColumns) > 0 {
		return t.VisibleColumns
	}
	return t.getCols(visible)
}

// HiddenCols implements table.Table HiddenCols interface.
func (t *TableCommon) HiddenCols() []*table.Column {
	if len(t.HiddenColumns) > 0 {
		return t.HiddenColumns
	}
	return t.getCols(hidden)
}

// WritableCols implements table WritableCols interface.
func (t *TableCommon) WritableCols() []*table.Column {
	if len(t.WritableColumns) > 0 {
		return t.WritableColumns
	}
	writableColumns := make([]*table.Column, 0, len(t.Columns))
	for _, col := range t.Columns {
		if col.State == model.StateDeleteOnly || col.State == model.StateDeleteReorganization {
			continue
		}
		writableColumns = append(writableColumns, col)
	}
	return writableColumns
}

// DeletableCols implements table DeletableCols interface.
func (t *TableCommon) DeletableCols() []*table.Column {
	return t.Columns
}

// FullHiddenColsAndVisibleCols implements table FullHiddenColsAndVisibleCols interface.
func (t *TableCommon) FullHiddenColsAndVisibleCols() []*table.Column {
	if len(t.FullHiddenColsAndVisibleColumns) > 0 {
		return t.FullHiddenColsAndVisibleColumns
	}

	cols := make([]*table.Column, 0, len(t.Columns))
	for _, col := range t.Columns {
		if col.Hidden || col.State == model.StatePublic {
			cols = append(cols, col)
		}
	}
	return cols
}

// RecordPrefix implements table.Table interface.
func (t *TableCommon) RecordPrefix() kv.Key {
	return t.recordPrefix
}

// IndexPrefix implements table.Table interface.
func (t *TableCommon) IndexPrefix() kv.Key {
	return t.indexPrefix
}

// RecordKey implements table.Table interface.
func (t *TableCommon) RecordKey(h kv.Handle) kv.Key {
	return tablecodec.EncodeRecordKey(t.recordPrefix, h)
}

// UpdateRecord implements table.Table UpdateRecord interface.
// `touched` means which columns are really modified, used for secondary indices.
// Length of `oldData` and `newData` equals to length of `t.WritableCols()`.
func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
	txn, err := sctx.Txn(true)
	if err != nil {
		return err
	}

	memBuffer := txn.GetMemBuffer()
	sh := memBuffer.Staging()
	defer memBuffer.Cleanup(sh)

	if m := t.Meta(); m.TempTableType != model.TempTableNone {
		if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil {
			if err := checkTempTableSize(sctx, tmpTable, m); err != nil {
				return err
			}
			defer handleTempTableSize(tmpTable, txn.Size(), txn)
		}
	}

	var colIDs, binlogColIDs []int64
	var row, binlogOldRow, binlogNewRow []types.Datum
	numColsCap := len(newData) + 1 // +1 for the extra handle column that we may need to append.
	colIDs = make([]int64, 0, numColsCap)
	row = make([]types.Datum, 0, numColsCap)
	if shouldWriteBinlog(sctx, t.meta) {
		binlogColIDs = make([]int64, 0, numColsCap)
		binlogOldRow = make([]types.Datum, 0, numColsCap)
		binlogNewRow = make([]types.Datum, 0, numColsCap)
	}

	for _, col := range t.Columns {
		var value types.Datum
		if col.State == model.StateDeleteOnly || col.State == model.StateDeleteReorganization {
			if col.ChangeStateInfo != nil {
				// TODO: Check overflow or ignoreTruncate.
				value, err = table.CastValue(sctx, oldData[col.DependencyColumnOffset], col.ColumnInfo, false, false)
				if err != nil {
					logutil.BgLogger().Info("update record cast value failed", zap.Any("col", col), zap.Uint64("txnStartTS", txn.StartTS()),
						zap.String("handle", h.String()), zap.Any("val", oldData[col.DependencyColumnOffset]), zap.Error(err))
					return err
				}
				oldData = append(oldData, value)
				touched = append(touched, touched[col.DependencyColumnOffset])
			}
			continue
		}
		if col.State != model.StatePublic {
			// If col is in write only or write reorganization state we should keep the oldData.
			// Because the oldData must be the original data(it's changed by other TiDBs.) or the original default value.
			// TODO: Use newData directly.
			value = oldData[col.Offset]
			if col.ChangeStateInfo != nil {
				// TODO: Check overflow or ignoreTruncate.
				value, err = table.CastValue(sctx, newData[col.DependencyColumnOffset], col.ColumnInfo, false, false)
				if err != nil {
					return err
				}
				newData[col.Offset] = value
				touched[col.Offset] = touched[col.DependencyColumnOffset]
			}
		} else {
			value = newData[col.Offset]
		}
		if !t.canSkip(col, &value) {
			colIDs = append(colIDs, col.ID)
			row = append(row, value)
		}
		if shouldWriteBinlog(sctx, t.meta) && !t.canSkipUpdateBinlog(col, value) {
			binlogColIDs = append(binlogColIDs, col.ID)
			binlogOldRow = append(binlogOldRow, oldData[col.Offset])
			binlogNewRow = append(binlogNewRow, value)
		}
	}
	sessVars := sctx.GetSessionVars()
	// rebuild index
	if !sessVars.InTxn() {
		savePresumeKeyNotExist := sessVars.PresumeKeyNotExists
		if !sessVars.ConstraintCheckInPlace && sessVars.TxnCtx.IsPessimistic {
			sessVars.PresumeKeyNotExists = true
		}
		err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, table.WithCtx(ctx))
		sessVars.PresumeKeyNotExists = savePresumeKeyNotExist
		if err != nil {
			return err
		}
	} else {
		err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, table.WithCtx(ctx))
		if err != nil {
			return err
		}
	}

	key := t.RecordKey(h)
	sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
	value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, rd)
	if err != nil {
		return err
	}
	if err = memBuffer.Set(key, value); err != nil {
		return err
	}

	failpoint.Inject("updateRecordForceAssertNotExist", func() {
		// Assert the key doesn't exist while it actually exists. This is helpful to test if assertion takes effect.
		// Since only the first assertion takes effect, set the injected assertion before setting the correct one to
		// override it.
		if sctx.GetSessionVars().ConnectionID != 0 {
			logutil.BgLogger().Info("[failpoint] force asserting not exist on UpdateRecord", zap.Uint64("startTS", txn.StartTS()))
			if err = txn.SetAssertion(key, kv.SetAssertNotExist); err != nil {
				failpoint.Return(err)
			}
		}
	})
	if err = txn.SetAssertion(key, kv.SetAssertExist); err != nil {
		return err
	}

	if err = injectMutationError(t, txn, sh); err != nil {
		return err
	}
	if sessVars.EnableMutationChecker {
		if err = CheckDataConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil {
			return errors.Trace(err)
		}
	}

	memBuffer.Release(sh)
	if shouldWriteBinlog(sctx, t.meta) {
		if !t.meta.PKIsHandle && !t.meta.IsCommonHandle {
			binlogColIDs = append(binlogColIDs, model.ExtraHandleID)
			binlogOldRow = append(binlogOldRow, types.NewIntDatum(h.IntValue()))
			binlogNewRow = append(binlogNewRow, types.NewIntDatum(h.IntValue()))
		}
		err = t.addUpdateBinlog(sctx, binlogOldRow, binlogNewRow, binlogColIDs)
		if err != nil {
			return err
		}
	}
	colSize := make(map[int64]int64, len(t.Cols()))
	for id, col := range t.Cols() {
		size, err := codec.EstimateValueSize(sc, newData[id])
		if err != nil {
			continue
		}
		newLen := size - 1
		size, err = codec.EstimateValueSize(sc, oldData[id])
		if err != nil {
			continue
		}
		oldLen := size - 1
		colSize[col.ID] = int64(newLen - oldLen)
	}
	sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 0, 1, colSize)
	return nil
}

func (t *TableCommon) rebuildIndices(ctx sessionctx.Context, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opts ...table.CreateIdxOptFunc) error {
	for _, idx := range t.deletableIndices() {
		if t.meta.IsCommonHandle && idx.Meta().Primary {
			continue
		}
		for _, ic := range idx.Meta().Columns {
			if !touched[ic.Offset] {
				continue
			}
			oldVs, err := idx.FetchValues(oldData, nil)
			if err != nil {
				return err
			}
			if err = t.removeRowIndex(ctx.GetSessionVars().StmtCtx, h, oldVs, idx, txn); err != nil {
				return err
			}
			break
		}
	}
	for _, idx := range t.Indices() {
		if !IsIndexWritable(idx) {
			continue
		}
		if t.meta.IsCommonHandle && idx.Meta().Primary {
			continue
		}
		untouched := true
		for _, ic := range idx.Meta().Columns {
			if !touched[ic.Offset] {
				continue
			}
			untouched = false
			break
		}
		// If txn is auto commit and index is untouched, no need to write index value.
		if untouched && !ctx.GetSessionVars().InTxn() {
			continue
		}
		newVs, err := idx.FetchValues(newData, nil)
		if err != nil {
			return err
		}
		if err := t.buildIndexForRow(ctx, h, newVs, newData, idx, txn, untouched, opts...); err != nil {
			return err
		}
	}
	return nil
}

// adjustRowValuesBuf adjust writeBufs.AddRowValues length, AddRowValues stores the inserting values that is used
// by tablecodec.EncodeRow, the encoded row format is `id1, colval, id2, colval`, so the correct length is rowLen * 2. If
// the inserting row has null value, AddRecord will skip it, so the rowLen will be different, so we need to adjust it.
func adjustRowValuesBuf(writeBufs *variable.WriteStmtBufs, rowLen int) {
	adjustLen := rowLen * 2
	if writeBufs.AddRowValues == nil || cap(writeBufs.AddRowValues) < adjustLen {
		writeBufs.AddRowValues = make([]types.Datum, adjustLen)
	}
	writeBufs.AddRowValues = writeBufs.AddRowValues[:adjustLen]
}

// FindPrimaryIndex uses to find primary index in tableInfo.
func FindPrimaryIndex(tblInfo *model.TableInfo) *model.IndexInfo {
	var pkIdx *model.IndexInfo
	for _, idx := range tblInfo.Indices {
		if idx.Primary {
			pkIdx = idx
			break
		}
	}
	return pkIdx
}

// CommonAddRecordCtx is used in `AddRecord` to avoid memory malloc for some temp slices.
// This is useful in lightning parse row data to key-values pairs. This can gain upto 5%  performance
// improvement in lightning's local mode.
type CommonAddRecordCtx struct {
	colIDs []int64
	row    []types.Datum
}

// commonAddRecordKey is used as key in `sessionctx.Context.Value(key)`
type commonAddRecordKey struct{}

// String implement `stringer.String` for CommonAddRecordKey
func (c commonAddRecordKey) String() string {
	return "_common_add_record_context_key"
}

// addRecordCtxKey is key in `sessionctx.Context` for CommonAddRecordCtx
var addRecordCtxKey = commonAddRecordKey{}

// SetAddRecordCtx set a CommonAddRecordCtx to session context
func SetAddRecordCtx(ctx sessionctx.Context, r *CommonAddRecordCtx) {
	ctx.SetValue(addRecordCtxKey, r)
}

// ClearAddRecordCtx remove `CommonAddRecordCtx` from session context
func ClearAddRecordCtx(ctx sessionctx.Context) {
	ctx.ClearValue(addRecordCtxKey)
}

// NewCommonAddRecordCtx create a context used for `AddRecord`
func NewCommonAddRecordCtx(size int) *CommonAddRecordCtx {
	return &CommonAddRecordCtx{
		colIDs: make([]int64, 0, size),
		row:    make([]types.Datum, 0, size),
	}
}

// TryGetCommonPkColumnIds get the IDs of primary key column if the table has common handle.
func TryGetCommonPkColumnIds(tbl *model.TableInfo) []int64 {
	if !tbl.IsCommonHandle {
		return nil
	}
	pkIdx := FindPrimaryIndex(tbl)
	pkColIds := make([]int64, 0, len(pkIdx.Columns))
	for _, idxCol := range pkIdx.Columns {
		pkColIds = append(pkColIds, tbl.Columns[idxCol.Offset].ID)
	}
	return pkColIds
}

// PrimaryPrefixColumnIDs get prefix column ids in primary key.
func PrimaryPrefixColumnIDs(tbl *model.TableInfo) (prefixCols []int64) {
	for _, idx := range tbl.Indices {
		if !idx.Primary {
			continue
		}
		for _, col := range idx.Columns {
			if col.Length > 0 && tbl.Columns[col.Offset].GetFlen() > col.Length {
				prefixCols = append(prefixCols, tbl.Columns[col.Offset].ID)
			}
		}
	}
	return
}

// TryGetCommonPkColumns get the primary key columns if the table has common handle.
func TryGetCommonPkColumns(tbl table.Table) []*table.Column {
	if !tbl.Meta().IsCommonHandle {
		return nil
	}
	pkIdx := FindPrimaryIndex(tbl.Meta())
	cols := tbl.Cols()
	pkCols := make([]*table.Column, 0, len(pkIdx.Columns))
	for _, idxCol := range pkIdx.Columns {
		pkCols = append(pkCols, cols[idxCol.Offset])
	}
	return pkCols
}

func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) tableutil.TempTable {
	tempTable := sctx.GetSessionVars().GetTemporaryTable(tblInfo)
	tempTable.SetModified(true)
	return tempTable
}

// The size of a temporary table is calculated by accumulating the transaction size delta.
func handleTempTableSize(t tableutil.TempTable, txnSizeBefore int, txn kv.Transaction) {
	txnSizeNow := txn.Size()
	delta := txnSizeNow - txnSizeBefore

	oldSize := t.GetSize()
	newSize := oldSize + int64(delta)
	t.SetSize(newSize)
}

func checkTempTableSize(ctx sessionctx.Context, tmpTable tableutil.TempTable, tblInfo *model.TableInfo) error {
	tmpTableSize := tmpTable.GetSize()
	if tempTableData := ctx.GetSessionVars().TemporaryTableData; tempTableData != nil {
		tmpTableSize += tempTableData.GetTableSize(tblInfo.ID)
	}

	if tmpTableSize > ctx.GetSessionVars().TMPTableSize {
		return table.ErrTempTableFull.GenWithStackByArgs(tblInfo.Name.O)
	}

	return nil
}

// AddRecord implements table.Table AddRecord interface.
func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
	txn, err := sctx.Txn(true)
	if err != nil {
		return nil, err
	}

	var opt table.AddRecordOpt
	for _, fn := range opts {
		fn.ApplyOn(&opt)
	}

	if m := t.Meta(); m.TempTableType != model.TempTableNone {
		if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil {
			if err := checkTempTableSize(sctx, tmpTable, m); err != nil {
				return nil, err
			}
			defer handleTempTableSize(tmpTable, txn.Size(), txn)
		}
	}

	var ctx context.Context
	if opt.Ctx != nil {
		ctx = opt.Ctx
		if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
			span1 := span.Tracer().StartSpan("table.AddRecord", opentracing.ChildOf(span.Context()))
			defer span1.Finish()
			ctx = opentracing.ContextWithSpan(ctx, span1)
		}
	} else {
		ctx = context.Background()
	}
	var hasRecordID bool
	cols := t.Cols()
	// opt.IsUpdate is a flag for update.
	// If handle ID is changed when update, update will remove the old record first, and then call `AddRecord` to add a new record.
	// Currently, only insert can set _tidb_rowid, update can not update _tidb_rowid.
	if len(r) > len(cols) && !opt.IsUpdate {
		// The last value is _tidb_rowid.
		recordID = kv.IntHandle(r[len(r)-1].GetInt64())
		hasRecordID = true
	} else {
		tblInfo := t.Meta()
		txn.CacheTableInfo(t.physicalTableID, tblInfo)
		if tblInfo.PKIsHandle {
			recordID = kv.IntHandle(r[tblInfo.GetPkColInfo().Offset].GetInt64())
			hasRecordID = true
		} else if tblInfo.IsCommonHandle {
			pkIdx := FindPrimaryIndex(tblInfo)
			pkDts := make([]types.Datum, 0, len(pkIdx.Columns))
			for _, idxCol := range pkIdx.Columns {
				pkDts = append(pkDts, r[idxCol.Offset])
			}
			tablecodec.TruncateIndexValues(tblInfo, pkIdx, pkDts)
			var handleBytes []byte
			handleBytes, err = codec.EncodeKey(sctx.GetSessionVars().StmtCtx, nil, pkDts...)
			if err != nil {
				return
			}
			recordID, err = kv.NewCommonHandle(handleBytes)
			if err != nil {
				return
			}
			hasRecordID = true
		}
	}
	if !hasRecordID {
		if opt.ReserveAutoID > 0 {
			// Reserve a batch of auto ID in the statement context.
			// The reserved ID could be used in the future within this statement, by the
			// following AddRecord() operation.
			// Make the IDs continuous benefit for the performance of TiKV.
			stmtCtx := sctx.GetSessionVars().StmtCtx
			stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = allocHandleIDs(ctx, sctx, t, uint64(opt.ReserveAutoID))
			if err != nil {
				return nil, err
			}
		}

		recordID, err = AllocHandle(ctx, sctx, t)
		if err != nil {
			return nil, err
		}
	}

	var colIDs, binlogColIDs []int64
	var row, binlogRow []types.Datum
	if recordCtx, ok := sctx.Value(addRecordCtxKey).(*CommonAddRecordCtx); ok {
		colIDs = recordCtx.colIDs[:0]
		row = recordCtx.row[:0]
	} else {
		colIDs = make([]int64, 0, len(r))
		row = make([]types.Datum, 0, len(r))
	}
	memBuffer := txn.GetMemBuffer()
	sh := memBuffer.Staging()
	defer memBuffer.Cleanup(sh)

	sessVars := sctx.GetSessionVars()

	for _, col := range t.WritableCols() {
		var value types.Datum
		// In column type change, since we have set the origin default value for changing col, but
		// for the new insert statement, we should use the casted value of relative column to insert.
		if col.ChangeStateInfo != nil && col.State != model.StatePublic {
			// TODO: Check overflow or ignoreTruncate.
			value, err = table.CastValue(sctx, r[col.DependencyColumnOffset], col.ColumnInfo, false, false)
			if err != nil {
				return nil, err
			}
			if len(r) < len(t.WritableCols()) {
				r = append(r, value)
			} else {
				r[col.Offset] = value
			}
			row = append(row, value)
			colIDs = append(colIDs, col.ID)
			continue
		}
		if col.State != model.StatePublic &&
			// Update call `AddRecord` will already handle the write only column default value.
			// Only insert should add default value for write only column.
			!opt.IsUpdate {
			// If col is in write only or write reorganization state, we must add it with its default value.
			value, err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
			if err != nil {
				return nil, err
			}
			// add value to `r` for dirty db in transaction.
			// Otherwise when update will panic cause by get value of column in write only state from dirty db.
			if col.Offset < len(r) {
				r[col.Offset] = value
			} else {
				r = append(r, value)
			}
		} else {
			value = r[col.Offset]
		}
		if !t.canSkip(col, &value) {
			colIDs = append(colIDs, col.ID)
			row = append(row, value)
		}
	}

	writeBufs := sessVars.GetWriteStmtBufs()
	adjustRowValuesBuf(writeBufs, len(row))
	key := t.RecordKey(recordID)
	logutil.BgLogger().Debug("addRecord",
		zap.Stringer("key", key))
	sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
	writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd)
	if err != nil {
		return nil, err
	}
	value := writeBufs.RowValBuf

	var setPresume bool
	if !sctx.GetSessionVars().StmtCtx.BatchCheck {
		if t.meta.TempTableType != model.TempTableNone {
			// Always check key for temporary table because it does not write to TiKV
			_, err = txn.Get(ctx, key)
		} else if sctx.GetSessionVars().LazyCheckKeyNotExists() {
			var v []byte
			v, err = txn.GetMemBuffer().Get(ctx, key)
			if err != nil {
				setPresume = true
			}
			if err == nil && len(v) == 0 {
				err = kv.ErrNotExist
			}
		} else {
			_, err = txn.Get(ctx, key)
		}
		if err == nil {
			handleStr := getDuplicateErrorHandleString(t, recordID, r)
			return recordID, kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
		} else if !kv.ErrNotExist.Equal(err) {
			return recordID, err
		}
	}

	if setPresume {
		flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists}
		if !sessVars.ConstraintCheckInPlacePessimistic && sessVars.TxnCtx.IsPessimistic && sessVars.InTxn() {
			flags = append(flags, kv.SetNeedConstraintCheckInPrewrite)
		}
		err = memBuffer.SetWithFlags(key, value, flags...)
	} else {
		err = memBuffer.Set(key, value)
	}
	if err != nil {
		return nil, err
	}

	failpoint.Inject("addRecordForceAssertExist", func() {
		// Assert the key exists while it actually doesn't. This is helpful to test if assertion takes effect.
		// Since only the first assertion takes effect, set the injected assertion before setting the correct one to
		// override it.
		if sctx.GetSessionVars().ConnectionID != 0 {
			logutil.BgLogger().Info("[failpoint] force asserting exist on AddRecord", zap.Uint64("startTS", txn.StartTS()))
			if err = txn.SetAssertion(key, kv.SetAssertExist); err != nil {
				failpoint.Return(nil, err)
			}
		}
	})
	if setPresume && !txn.IsPessimistic() {
		err = txn.SetAssertion(key, kv.SetAssertUnknown)
	} else {
		err = txn.SetAssertion(key, kv.SetAssertNotExist)
	}
	if err != nil {
		return nil, err
	}

	var createIdxOpts []table.CreateIdxOptFunc
	if len(opts) > 0 {
		createIdxOpts = make([]table.CreateIdxOptFunc, 0, len(opts))
		for _, fn := range opts {
			if raw, ok := fn.(table.CreateIdxOptFunc); ok {
				createIdxOpts = append(createIdxOpts, raw)
			}
		}
	}
	// Insert new entries into indices.
	h, err := t.addIndices(sctx, recordID, r, txn, createIdxOpts)
	if err != nil {
		return h, err
	}

	if err = injectMutationError(t, txn, sh); err != nil {
		return nil, err
	}
	if sessVars.EnableMutationChecker {
		if err = CheckDataConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil {
			return nil, errors.Trace(err)
		}
	}

	memBuffer.Release(sh)

	if shouldWriteBinlog(sctx, t.meta) {
		// For insert, TiDB and Binlog can use same row and schema.
		binlogRow = row
		binlogColIDs = colIDs
		err = t.addInsertBinlog(sctx, recordID, binlogRow, binlogColIDs)
		if err != nil {
			return nil, err
		}
	}
	if sessVars.TxnCtx == nil {
		return recordID, nil
	}
	colSize := make(map[int64]int64, len(r))
	for id, col := range t.Cols() {
		size, err := codec.EstimateValueSize(sc, r[id])
		if err != nil {
			continue
		}
		colSize[col.ID] = int64(size) - 1
	}
	sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 1, 1, colSize)
	return recordID, nil
}

// genIndexKeyStr generates index content string representation.
func genIndexKeyStr(colVals []types.Datum) (string, error) {
	// Pass pre-composed error to txn.
	strVals := make([]string, 0, len(colVals))
	for _, cv := range colVals {
		cvs := "NULL"
		var err error
		if !cv.IsNull() {
			cvs, err = types.ToString(cv.GetValue())
			if err != nil {
				return "", err
			}
		}
		strVals = append(strVals, cvs)
	}
	return strings.Join(strVals, "-"), nil
}

// addIndices adds data into indices. If any key is duplicated, returns the original handle.
func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r []types.Datum, txn kv.Transaction, opts []table.CreateIdxOptFunc) (kv.Handle, error) {
	writeBufs := sctx.GetSessionVars().GetWriteStmtBufs()
	indexVals := writeBufs.IndexValsBuf
	skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck
	for _, v := range t.Indices() {
		if !IsIndexWritable(v) {
			continue
		}
		if t.meta.IsCommonHandle && v.Meta().Primary {
			continue
		}
		indexVals, err := v.FetchValues(r, indexVals)
		if err != nil {
			return nil, err
		}
		var dupErr error
		if !skipCheck && v.Meta().Unique {
			entryKey, err := genIndexKeyStr(indexVals)
			if err != nil {
				return nil, err
			}
			idxMeta := v.Meta()
			dupErr = kv.ErrKeyExists.FastGenByArgs(entryKey, idxMeta.Name.String())
		}
		rsData := TryGetHandleRestoredDataWrapper(t, r, nil, v.Meta())
		if dupHandle, err := v.Create(sctx, txn, indexVals, recordID, rsData, opts...); err != nil {
			if kv.ErrKeyExists.Equal(err) {
				return dupHandle, dupErr
			}
			return nil, err
		}
	}
	// save the buffer, multi rows insert can use it.
	writeBufs.IndexValsBuf = indexVals
	return nil, nil
}

// RowWithCols is used to get the corresponding column datum values with the given handle.
func RowWithCols(t table.Table, ctx sessionctx.Context, h kv.Handle, cols []*table.Column) ([]types.Datum, error) {
	// Get raw row data from kv.
	key := tablecodec.EncodeRecordKey(t.RecordPrefix(), h)
	txn, err := ctx.Txn(true)
	if err != nil {
		return nil, err
	}
	value, err := txn.Get(context.TODO(), key)
	if err != nil {
		return nil, err
	}
	v, _, err := DecodeRawRowData(ctx, t.Meta(), h, cols, value)
	if err != nil {
		return nil, err
	}
	return v, nil
}

func containFullColInHandle(meta *model.TableInfo, col *table.Column) (containFullCol bool, idxInHandle int) {
	pkIdx := FindPrimaryIndex(meta)
	for i, idxCol := range pkIdx.Columns {
		if meta.Columns[idxCol.Offset].ID == col.ID {
			idxInHandle = i
			containFullCol = idxCol.Length == types.UnspecifiedLength
			return
		}
	}
	return
}

// DecodeRawRowData decodes raw row data into a datum slice and a (columnID:columnValue) map.
func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h kv.Handle, cols []*table.Column,
	value []byte) ([]types.Datum, map[int64]types.Datum, error) {
	v := make([]types.Datum, len(cols))
	colTps := make(map[int64]*types.FieldType, len(cols))
	prefixCols := make(map[int64]struct{})
	for i, col := range cols {
		if col == nil {
			continue
		}
		if col.IsPKHandleColumn(meta) {
			if mysql.HasUnsignedFlag(col.GetFlag()) {
				v[i].SetUint64(uint64(h.IntValue()))
			} else {
				v[i].SetInt64(h.IntValue())
			}
			continue
		}
		if col.IsCommonHandleColumn(meta) && !types.NeedRestoredData(&col.FieldType) {
			if containFullCol, idxInHandle := containFullColInHandle(meta, col); containFullCol {
				dtBytes := h.EncodedCol(idxInHandle)
				_, dt, err := codec.DecodeOne(dtBytes)
				if err != nil {
					return nil, nil, err
				}
				dt, err = tablecodec.Unflatten(dt, &col.FieldType, ctx.GetSessionVars().Location())
				if err != nil {
					return nil, nil, err
				}
				v[i] = dt
				continue
			}
			prefixCols[col.ID] = struct{}{}
		}
		colTps[col.ID] = &col.FieldType
	}
	rowMap, err := tablecodec.DecodeRowToDatumMap(value, colTps, ctx.GetSessionVars().Location())
	if err != nil {
		return nil, rowMap, err
	}
	defaultVals := make([]types.Datum, len(cols))
	for i, col := range cols {
		if col == nil {
			continue
		}
		if col.IsPKHandleColumn(meta) || (col.IsCommonHandleColumn(meta) && !types.NeedRestoredData(&col.FieldType)) {
			if _, isPrefix := prefixCols[col.ID]; !isPrefix {
				continue
			}
		}
		ri, ok := rowMap[col.ID]
		if ok {
			v[i] = ri
			continue
		}
		if col.IsGenerated() && !col.GeneratedStored {
			continue
		}
		if col.ChangeStateInfo != nil {
			v[i], _, err = GetChangingColVal(ctx, cols, col, rowMap, defaultVals)
		} else {
			v[i], err = GetColDefaultValue(ctx, col, defaultVals)
		}
		if err != nil {
			return nil, rowMap, err
		}
	}
	return v, rowMap, nil
}

// GetChangingColVal gets the changing column value when executing "modify/change column" statement.
// For statement like update-where, it will fetch the old row out and insert it into kv again.
// Since update statement can see the writable columns, it is responsible for the casting relative column / get the fault value here.
// old row : a-b-[nil]
// new row : a-b-[a'/default]
// Thus the writable new row is corresponding to Write-Only constraints.
func GetChangingColVal(ctx sessionctx.Context, cols []*table.Column, col *table.Column, rowMap map[int64]types.Datum, defaultVals []types.Datum) (_ types.Datum, isDefaultVal bool, err error) {
	relativeCol := cols[col.ChangeStateInfo.DependencyColumnOffset]
	idxColumnVal, ok := rowMap[relativeCol.ID]
	if ok {
		idxColumnVal, err = table.CastValue(ctx, idxColumnVal, col.ColumnInfo, false, false)
		// TODO: Consider sql_mode and the error msg(encounter this error check whether to rollback).
		if err != nil {
			return idxColumnVal, false, errors.Trace(err)
		}
		return idxColumnVal, false, nil
	}

	idxColumnVal, err = GetColDefaultValue(ctx, col, defaultVals)
	if err != nil {
		return idxColumnVal, false, errors.Trace(err)
	}

	return idxColumnVal, true, nil
}

// RemoveRecord implements table.Table RemoveRecord interface.
func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error {
	txn, err := ctx.Txn(true)
	if err != nil {
		return err
	}

	memBuffer := txn.GetMemBuffer()
	sh := memBuffer.Staging()
	defer memBuffer.Cleanup(sh)

	err = t.removeRowData(ctx, h)
	if err != nil {
		return err
	}

	if m := t.Meta(); m.TempTableType != model.TempTableNone {
		if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil {
			if err := checkTempTableSize(ctx, tmpTable, m); err != nil {
				return err
			}
			defer handleTempTableSize(tmpTable, txn.Size(), txn)
		}
	}

	// The table has non-public column and this column is doing the operation of "modify/change column".
	if len(t.Columns) > len(r) && t.Columns[len(r)].ChangeStateInfo != nil {
		// The changing column datum derived from related column should be casted here.
		// Otherwise, the existed changing indexes will not be deleted.
		relatedColDatum := r[t.Columns[len(r)].ChangeStateInfo.DependencyColumnOffset]
		value, err := table.CastValue(ctx, relatedColDatum, t.Columns[len(r)].ColumnInfo, false, false)
		if err != nil {
			logutil.BgLogger().Info("remove record cast value failed", zap.Any("col", t.Columns[len(r)]),
				zap.String("handle", h.String()), zap.Any("val", relatedColDatum), zap.Error(err))
			return err
		}
		r = append(r, value)
	}
	err = t.removeRowIndices(ctx, h, r)
	if err != nil {
		return err
	}

	sessVars := ctx.GetSessionVars()
	sc := sessVars.StmtCtx
	if err = injectMutationError(t, txn, sh); err != nil {
		return err
	}
	if sessVars.EnableMutationChecker {
		if err = CheckDataConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil {
			return errors.Trace(err)
		}
	}
	memBuffer.Release(sh)

	if shouldWriteBinlog(ctx, t.meta) {
		cols := t.Cols()
		colIDs := make([]int64, 0, len(cols)+1)
		for _, col := range cols {
			colIDs = append(colIDs, col.ID)
		}
		var binlogRow []types.Datum
		if !t.meta.PKIsHandle && !t.meta.IsCommonHandle {
			colIDs = append(colIDs, model.ExtraHandleID)
			binlogRow = make([]types.Datum, 0, len(r)+1)
			binlogRow = append(binlogRow, r...)
			handleData, err := h.Data()
			if err != nil {
				return err
			}
			binlogRow = append(binlogRow, handleData...)
		} else {
			binlogRow = r
		}
		err = t.addDeleteBinlog(ctx, binlogRow, colIDs)
	}
	if ctx.GetSessionVars().TxnCtx == nil {
		return nil
	}
	colSize := make(map[int64]int64, len(t.Cols()))
	for id, col := range t.Cols() {
		size, err := codec.EstimateValueSize(sc, r[id])
		if err != nil {
			continue
		}
		colSize[col.ID] = -int64(size - 1)
	}
	ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(t.physicalTableID, -1, 1, colSize)
	return err
}

func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row []types.Datum, colIDs []int64) error {
	mutation := t.getMutation(ctx)
	handleData, err := h.Data()
	if err != nil {
		return err
	}
	pk, err := codec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, handleData...)
	if err != nil {
		return err
	}
	value, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
	if err != nil {
		return err
	}
	bin := append(pk, value...)
	mutation.InsertedRows = append(mutation.InsertedRows, bin)
	mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert)
	return nil
}

func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []types.Datum, colIDs []int64) error {
	old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs, nil, nil)
	if err != nil {
		return err
	}
	newVal, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs, nil, nil)
	if err != nil {
		return err
	}
	bin := append(old, newVal...)
	mutation := t.getMutation(ctx)
	mutation.UpdatedRows = append(mutation.UpdatedRows, bin)
	mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Update)
	return nil
}

func (t *TableCommon) addDeleteBinlog(ctx sessionctx.Context, r []types.Datum, colIDs []int64) error {
	data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, r, colIDs, nil, nil)
	if err != nil {
		return err
	}
	mutation := t.getMutation(ctx)
	mutation.DeletedRows = append(mutation.DeletedRows, data)
	mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeleteRow)
	return nil
}

func writeSequenceUpdateValueBinlog(sctx sessionctx.Context, db, sequence string, end int64) error {
	// 1: when sequenceCommon update the local cache passively.
	// 2: When sequenceCommon setval to the allocator actively.
	// Both of this two case means the upper bound the sequence has changed in meta, which need to write the binlog
	// to the downstream.
	// Sequence sends `select setval(seq, num)` sql string to downstream via `setDDLBinlog`, which is mocked as a DDL binlog.
	binlogCli := sctx.GetSessionVars().BinlogClient
	sqlMode := sctx.GetSessionVars().SQLMode
	sequenceFullName := stringutil.Escape(db, sqlMode) + "." + stringutil.Escape(sequence, sqlMode)
	sql := "select setval(" + sequenceFullName + ", " + strconv.FormatInt(end, 10) + ")"

	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta)
	err := kv.RunInNewTxn(ctx, sctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
		m := meta.NewMeta(txn)
		mockJobID, err := m.GenGlobalID()
		if err != nil {
			return err
		}
		binloginfo.SetDDLBinlog(binlogCli, txn, mockJobID, int32(model.StatePublic), sql)
		return nil
	})
	return err
}

func (t *TableCommon) removeRowData(ctx sessionctx.Context, h kv.Handle) error {
	// Remove row data.
	txn, err := ctx.Txn(true)
	if err != nil {
		return err
	}

	key := t.RecordKey(h)
	failpoint.Inject("removeRecordForceAssertNotExist", func() {
		// Assert the key doesn't exist while it actually exists. This is helpful to test if assertion takes effect.
		// Since only the first assertion takes effect, set the injected assertion before setting the correct one to
		// override it.
		if ctx.GetSessionVars().ConnectionID != 0 {
			logutil.BgLogger().Info("[failpoint] force asserting not exist on RemoveRecord", zap.Uint64("startTS", txn.StartTS()))
			if err = txn.SetAssertion(key, kv.SetAssertNotExist); err != nil {
				failpoint.Return(err)
			}
		}
	})
	err = txn.SetAssertion(key, kv.SetAssertExist)
	if err != nil {
		return err
	}
	return txn.Delete(key)
}

// removeRowIndices removes all the indices of a row.
func (t *TableCommon) removeRowIndices(ctx sessionctx.Context, h kv.Handle, rec []types.Datum) error {
	txn, err := ctx.Txn(true)
	if err != nil {
		return err
	}
	for _, v := range t.deletableIndices() {
		if v.Meta().Primary && (t.Meta().IsCommonHandle || t.Meta().PKIsHandle) {
			continue
		}
		vals, err := v.FetchValues(rec, nil)
		if err != nil {
			logutil.BgLogger().Info("remove row index failed", zap.Any("index", v.Meta()), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("handle", h.String()), zap.Any("record", rec), zap.Error(err))
			return err
		}
		if err = v.Delete(ctx.GetSessionVars().StmtCtx, txn, vals, h); err != nil {
			if v.Meta().State != model.StatePublic && kv.ErrNotExist.Equal(err) {
				// If the index is not in public state, we may have not created the index,
				// or already deleted the index, so skip ErrNotExist error.
				logutil.BgLogger().Debug("row index not exists", zap.Any("index", v.Meta()), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("handle", h.String()))
				continue
			}
			return err
		}
	}
	return nil
}

// removeRowIndex implements table.Table RemoveRowIndex interface.
func (t *TableCommon) removeRowIndex(sc *stmtctx.StatementContext, h kv.Handle, vals []types.Datum, idx table.Index, txn kv.Transaction) error {
	return idx.Delete(sc, txn, vals, h)
}

// buildIndexForRow implements table.Table BuildIndexForRow interface.
func (t *TableCommon) buildIndexForRow(ctx sessionctx.Context, h kv.Handle, vals []types.Datum, newData []types.Datum, idx table.Index, txn kv.Transaction, untouched bool, popts ...table.CreateIdxOptFunc) error {
	var opts []table.CreateIdxOptFunc
	opts = append(opts, popts...)
	if untouched {
		opts = append(opts, table.IndexIsUntouched)
	}
	rsData := TryGetHandleRestoredDataWrapper(t, newData, nil, idx.Meta())
	if _, err := idx.Create(ctx, txn, vals, h, rsData, opts...); err != nil {
		if kv.ErrKeyExists.Equal(err) {
			// Make error message consistent with MySQL.
			entryKey, err1 := genIndexKeyStr(vals)
			if err1 != nil {
				// if genIndexKeyStr failed, return the original error.
				return err
			}

			return kv.ErrKeyExists.FastGenByArgs(entryKey, idx.Meta().Name)
		}
		return err
	}
	return nil
}

// IterRecords iterates records in the table and calls fn.
func IterRecords(t table.Table, ctx sessionctx.Context, cols []*table.Column,
	fn table.RecordIterFunc) error {
	prefix := t.RecordPrefix()
	txn, err := ctx.Txn(true)
	if err != nil {
		return err
	}

	startKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), kv.IntHandle(math.MinInt64))
	it, err := txn.Iter(startKey, prefix.PrefixNext())
	if err != nil {
		return err
	}
	defer it.Close()

	if !it.Valid() {
		return nil
	}

	logutil.BgLogger().Debug("iterate records", zap.ByteString("startKey", startKey), zap.ByteString("key", it.Key()), zap.ByteString("value", it.Value()))

	colMap := make(map[int64]*types.FieldType, len(cols))
	for _, col := range cols {
		colMap[col.ID] = &col.FieldType
	}
	defaultVals := make([]types.Datum, len(cols))
	for it.Valid() && it.Key().HasPrefix(prefix) {
		// first kv pair is row lock information.
		// TODO: check valid lock
		// get row handle
		handle, err := tablecodec.DecodeRowKey(it.Key())
		if err != nil {
			return err
		}
		rowMap, err := tablecodec.DecodeRowToDatumMap(it.Value(), colMap, ctx.GetSessionVars().Location())
		if err != nil {
			return err
		}
		pkIds, decodeLoc := TryGetCommonPkColumnIds(t.Meta()), ctx.GetSessionVars().Location()
		data := make([]types.Datum, len(cols))
		for _, col := range cols {
			if col.IsPKHandleColumn(t.Meta()) {
				if mysql.HasUnsignedFlag(col.GetFlag()) {
					data[col.Offset].SetUint64(uint64(handle.IntValue()))
				} else {
					data[col.Offset].SetInt64(handle.IntValue())
				}
				continue
			} else if mysql.HasPriKeyFlag(col.GetFlag()) {
				data[col.Offset], err = tryDecodeColumnFromCommonHandle(col, handle, pkIds, decodeLoc)
				if err != nil {
					return err
				}
				continue
			}
			if _, ok := rowMap[col.ID]; ok {
				data[col.Offset] = rowMap[col.ID]
				continue
			}
			data[col.Offset], err = GetColDefaultValue(ctx, col, defaultVals)
			if err != nil {
				return err
			}
		}
		more, err := fn(handle, data, cols)
		if !more || err != nil {
			return err
		}

		rk := tablecodec.EncodeRecordKey(t.RecordPrefix(), handle)
		err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
		if err != nil {
			return err
		}
	}

	return nil
}

func tryDecodeColumnFromCommonHandle(col *table.Column, handle kv.Handle, pkIds []int64, decodeLoc *time.Location) (types.Datum, error) {
	for i, hid := range pkIds {
		if hid != col.ID {
			continue
		}
		_, d, err := codec.DecodeOne(handle.EncodedCol(i))
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		if d, err = tablecodec.Unflatten(d, &col.FieldType, decodeLoc); err != nil {
			return types.Datum{}, err
		}
		return d, nil
	}
	return types.Datum{}, nil
}

// GetColDefaultValue gets a column default value.
// The defaultVals is used to avoid calculating the default value multiple times.
func GetColDefaultValue(ctx sessionctx.Context, col *table.Column, defaultVals []types.Datum) (
	colVal types.Datum, err error) {
	if col.GetOriginDefaultValue() == nil && mysql.HasNotNullFlag(col.GetFlag()) {
		return colVal, errors.New("Miss column")
	}
	if defaultVals[col.Offset].IsNull() {
		colVal, err = table.GetColOriginDefaultValue(ctx, col.ToInfo())
		if err != nil {
			return colVal, err
		}
		defaultVals[col.Offset] = colVal
	} else {
		colVal = defaultVals[col.Offset]
	}

	return colVal, nil
}

// AllocHandle allocate a new handle.
// A statement could reserve some ID in the statement context, try those ones first.
func AllocHandle(ctx context.Context, sctx sessionctx.Context, t table.Table) (kv.Handle, error) {
	if sctx != nil {
		if stmtCtx := sctx.GetSessionVars().StmtCtx; stmtCtx != nil {
			// First try to alloc if the statement has reserved auto ID.
			if stmtCtx.BaseRowID < stmtCtx.MaxRowID {
				stmtCtx.BaseRowID++
				return kv.IntHandle(stmtCtx.BaseRowID), nil
			}
		}
	}

	_, rowID, err := allocHandleIDs(ctx, sctx, t, 1)
	return kv.IntHandle(rowID), err
}

func allocHandleIDs(ctx context.Context, sctx sessionctx.Context, t table.Table, n uint64) (int64, int64, error) {
	meta := t.Meta()
	base, maxID, err := t.Allocators(sctx).Get(autoid.RowIDAllocType).Alloc(ctx, n, 1, 1)
	if err != nil {
		return 0, 0, err
	}
	if meta.ShardRowIDBits > 0 {
		shardFmt := autoid.NewShardIDFormat(types.NewFieldType(mysql.TypeLonglong), meta.ShardRowIDBits, autoid.RowIDBitLength)
		// Use max record ShardRowIDBits to check overflow.
		if OverflowShardBits(maxID, meta.MaxShardRowIDBits, autoid.RowIDBitLength, true) {
			// If overflow, the rowID may be duplicated. For examples,
			// t.meta.ShardRowIDBits = 4
			// rowID = 0010111111111111111111111111111111111111111111111111111111111111
			// shard = 0100000000000000000000000000000000000000000000000000000000000000
			// will be duplicated with:
			// rowID = 0100111111111111111111111111111111111111111111111111111111111111
			// shard = 0010000000000000000000000000000000000000000000000000000000000000
			return 0, 0, autoid.ErrAutoincReadFailed
		}
		txnCtx := sctx.GetSessionVars().TxnCtx
		shard := txnCtx.GetCurrentShard(int(n))
		base = shardFmt.Compose(shard, base)
		maxID = shardFmt.Compose(shard, maxID)
	}
	return base, maxID, nil
}

// OverflowShardBits checks whether the recordID overflow `1<<(typeBitsLength-shardRowIDBits-1) -1`.
func OverflowShardBits(recordID int64, shardRowIDBits uint64, typeBitsLength uint64, reservedSignBit bool) bool {
	var signBit uint64
	if reservedSignBit {
		signBit = 1
	}
	mask := (1<<shardRowIDBits - 1) << (typeBitsLength - shardRowIDBits - signBit)
	return recordID&int64(mask) > 0
}

// Allocators implements table.Table Allocators interface.
func (t *TableCommon) Allocators(ctx sessionctx.Context) autoid.Allocators {
	if ctx == nil {
		return t.allocs
	} else if ctx.GetSessionVars().IDAllocator == nil {
		// Use an independent allocator for global temporary tables.
		if t.meta.TempTableType == model.TempTableGlobal {
			if alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator(); alloc != nil {
				return autoid.Allocators{alloc}
			}
			// If the session is not in a txn, for example, in "show create table", use the original allocator.
			// Otherwise the would be a nil pointer dereference.
		}
		return t.allocs
	}

	// Replace the row id allocator with the one in session variables.
	sessAlloc := ctx.GetSessionVars().IDAllocator
	retAllocs := make([]autoid.Allocator, 0, len(t.allocs))
	copy(retAllocs, t.allocs)

	overwritten := false
	for i, a := range retAllocs {
		if a.GetType() == autoid.RowIDAllocType {
			retAllocs[i] = sessAlloc
			overwritten = true
			break
		}
	}
	if !overwritten {
		retAllocs = append(retAllocs, sessAlloc)
	}
	return retAllocs
}

// Type implements table.Table Type interface.
func (t *TableCommon) Type() table.Type {
	return table.NormalTable
}

func shouldWriteBinlog(ctx sessionctx.Context, tblInfo *model.TableInfo) bool {
	failpoint.Inject("forceWriteBinlog", func() {
		// Just to cover binlog related code in this package, since the `BinlogClient` is
		// still nil, mutations won't be written to pump on commit.
		failpoint.Return(true)
	})
	if ctx.GetSessionVars().BinlogClient == nil {
		return false
	}
	if tblInfo.TempTableType != model.TempTableNone {
		return false
	}
	return !ctx.GetSessionVars().InRestrictedSQL
}

func (t *TableCommon) getMutation(ctx sessionctx.Context) *binlog.TableMutation {
	return ctx.StmtGetMutation(t.tableID)
}

func (t *TableCommon) canSkip(col *table.Column, value *types.Datum) bool {
	return CanSkip(t.Meta(), col, value)
}

// CanSkip is for these cases, we can skip the columns in encoded row:
// 1. the column is included in primary key;
// 2. the column's default value is null, and the value equals to that but has no origin default;
// 3. the column is virtual generated.
func CanSkip(info *model.TableInfo, col *table.Column, value *types.Datum) bool {
	if col.IsPKHandleColumn(info) {
		return true
	}
	if col.IsCommonHandleColumn(info) {
		pkIdx := FindPrimaryIndex(info)
		for _, idxCol := range pkIdx.Columns {
			if info.Columns[idxCol.Offset].ID != col.ID {
				continue
			}
			canSkip := idxCol.Length == types.UnspecifiedLength
			canSkip = canSkip && !types.NeedRestoredData(&col.FieldType)
			return canSkip
		}
	}
	if col.GetDefaultValue() == nil && value.IsNull() && col.GetOriginDefaultValue() == nil {
		return true
	}
	if col.IsGenerated() && !col.GeneratedStored {
		return true
	}
	return false
}

// canSkipUpdateBinlog checks whether the column can be skipped or not.
func (t *TableCommon) canSkipUpdateBinlog(col *table.Column, value types.Datum) bool {
	if col.IsGenerated() && !col.GeneratedStored {
		return true
	}
	return false
}

// FindIndexByColName returns a public table index containing only one column named `name`.
func FindIndexByColName(t table.Table, name string) table.Index {
	for _, idx := range t.Indices() {
		// only public index can be read.
		if idx.Meta().State != model.StatePublic {
			continue
		}

		if len(idx.Meta().Columns) == 1 && strings.EqualFold(idx.Meta().Columns[0].Name.L, name) {
			return idx
		}
	}
	return nil
}

func getDuplicateErrorHandleString(t table.Table, handle kv.Handle, row []types.Datum) string {
	if handle.IsInt() {
		return kv.GetDuplicateErrorHandleString(handle)
	}
	var pk table.Index
	for _, idx := range t.Indices() {
		if idx.Meta().Primary {
			pk = idx
			break
		}
	}
	if pk == nil {
		return kv.GetDuplicateErrorHandleString(handle)
	}
	var err error
	str := make([]string, len(pk.Meta().Columns))
	for i, col := range pk.Meta().Columns {
		str[i], err = row[col.Offset].ToString()
		if err != nil {
			return kv.GetDuplicateErrorHandleString(handle)
		}
	}
	return strings.Join(str, "-")
}

func init() {
	table.TableFromMeta = TableFromMeta
	table.MockTableFromMeta = MockTableFromMeta
	tableutil.TempTableFromMeta = TempTableFromMeta
}

// sequenceCommon cache the sequence value.
// `alter sequence` will invalidate the cached range.
// `setval` will recompute the start position of cached value.
type sequenceCommon struct {
	meta *model.SequenceInfo
	// base < end when increment > 0.
	// base > end when increment < 0.
	end  int64
	base int64
	// round is used to count the cycle times.
	round int64
	mu    sync.RWMutex
}

// GetSequenceBaseEndRound is used in test.
func (s *sequenceCommon) GetSequenceBaseEndRound() (int64, int64, int64) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.base, s.end, s.round
}

// GetSequenceNextVal implements util.SequenceTable GetSequenceNextVal interface.
// Caching the sequence value in table, we can easily be notified with the cache empty,
// and write the binlogInfo in table level rather than in allocator.
func (t *TableCommon) GetSequenceNextVal(ctx interface{}, dbName, seqName string) (nextVal int64, err error) {
	seq := t.sequence
	if seq == nil {
		// TODO: refine the error.
		return 0, errors.New("sequenceCommon is nil")
	}
	seq.mu.Lock()
	defer seq.mu.Unlock()

	err = func() error {
		// Check if need to update the cache batch from storage.
		// Because seq.base is not always the last allocated value (may be set by setval()).
		// So we should try to seek the next value in cache (not just add increment to seq.base).
		var (
			updateCache bool
			offset      int64
			ok          bool
		)
		if seq.base == seq.end {
			// There is no cache yet.
			updateCache = true
		} else {
			// Seek the first valid value in cache.
			offset = seq.getOffset()
			if seq.meta.Increment > 0 {
				nextVal, ok = autoid.SeekToFirstSequenceValue(seq.base, seq.meta.Increment, offset, seq.base, seq.end)
			} else {
				nextVal, ok = autoid.SeekToFirstSequenceValue(seq.base, seq.meta.Increment, offset, seq.end, seq.base)
			}
			if !ok {
				updateCache = true
			}
		}
		if !updateCache {
			return nil
		}
		// Update batch alloc from kv storage.
		sequenceAlloc, err1 := getSequenceAllocator(t.allocs)
		if err1 != nil {
			return err1
		}
		var base, end, round int64
		base, end, round, err1 = sequenceAlloc.AllocSeqCache()
		if err1 != nil {
			return err1
		}
		// Only update local cache when alloc succeed.
		seq.base = base
		seq.end = end
		seq.round = round
		// write sequence binlog to the pumpClient.
		if ctx.(sessionctx.Context).GetSessionVars().BinlogClient != nil {
			err = writeSequenceUpdateValueBinlog(ctx.(sessionctx.Context), dbName, seqName, seq.end)
			if err != nil {
				return err
			}
		}
		// Seek the first valid value in new cache.
		// Offset may have changed cause the round is updated.
		offset = seq.getOffset()
		if seq.meta.Increment > 0 {
			nextVal, ok = autoid.SeekToFirstSequenceValue(seq.base, seq.meta.Increment, offset, seq.base, seq.end)
		} else {
			nextVal, ok = autoid.SeekToFirstSequenceValue(seq.base, seq.meta.Increment, offset, seq.end, seq.base)
		}
		if !ok {
			return errors.New("can't find the first value in sequence cache")
		}
		return nil
	}()
	// Sequence alloc in kv store error.
	if err != nil {
		if err == autoid.ErrAutoincReadFailed {
			return 0, table.ErrSequenceHasRunOut.GenWithStackByArgs(dbName, seqName)
		}
		return 0, err
	}
	seq.base = nextVal
	return nextVal, nil
}

// SetSequenceVal implements util.SequenceTable SetSequenceVal interface.
// The returned bool indicates the newVal is already under the base.
func (t *TableCommon) SetSequenceVal(ctx interface{}, newVal int64, dbName, seqName string) (int64, bool, error) {
	seq := t.sequence
	if seq == nil {
		// TODO: refine the error.
		return 0, false, errors.New("sequenceCommon is nil")
	}
	seq.mu.Lock()
	defer seq.mu.Unlock()

	if seq.meta.Increment > 0 {
		if newVal <= t.sequence.base {
			return 0, true, nil
		}
		if newVal <= t.sequence.end {
			t.sequence.base = newVal
			return newVal, false, nil
		}
	} else {
		if newVal >= t.sequence.base {
			return 0, true, nil
		}
		if newVal >= t.sequence.end {
			t.sequence.base = newVal
			return newVal, false, nil
		}
	}

	// Invalid the current cache.
	t.sequence.base = t.sequence.end

	// Rebase from kv storage.
	sequenceAlloc, err := getSequenceAllocator(t.allocs)
	if err != nil {
		return 0, false, err
	}
	res, alreadySatisfied, err := sequenceAlloc.RebaseSeq(newVal)
	if err != nil {
		return 0, false, err
	}
	if !alreadySatisfied {
		// Write sequence binlog to the pumpClient.
		if ctx.(sessionctx.Context).GetSessionVars().BinlogClient != nil {
			err = writeSequenceUpdateValueBinlog(ctx.(sessionctx.Context), dbName, seqName, seq.end)
			if err != nil {
				return 0, false, err
			}
		}
	}
	// Record the current end after setval succeed.
	// Consider the following case.
	// create sequence seq
	// setval(seq, 100) setval(seq, 50)
	// Because no cache (base, end keep 0), so the second setval won't return NULL.
	t.sequence.base, t.sequence.end = newVal, newVal
	return res, alreadySatisfied, nil
}

// getOffset is used in under GetSequenceNextVal & SetSequenceVal, which mu is locked.
func (s *sequenceCommon) getOffset() int64 {
	offset := s.meta.Start
	if s.meta.Cycle && s.round > 0 {
		if s.meta.Increment > 0 {
			offset = s.meta.MinValue
		} else {
			offset = s.meta.MaxValue
		}
	}
	return offset
}

// GetSequenceID implements util.SequenceTable GetSequenceID interface.
func (t *TableCommon) GetSequenceID() int64 {
	return t.tableID
}

// GetSequenceCommon is used in test to get sequenceCommon.
func (t *TableCommon) GetSequenceCommon() *sequenceCommon {
	return t.sequence
}

// TryGetHandleRestoredDataWrapper tries to get the restored data for handle if needed. The argument can be a slice or a map.
func TryGetHandleRestoredDataWrapper(t table.Table, row []types.Datum, rowMap map[int64]types.Datum, idx *model.IndexInfo) []types.Datum {
	if !collate.NewCollationEnabled() || !t.Meta().IsCommonHandle || t.Meta().CommonHandleVersion == 0 {
		return nil
	}
	rsData := make([]types.Datum, 0, 4)
	pkIdx := FindPrimaryIndex(t.Meta())
	for _, pkIdxCol := range pkIdx.Columns {
		pkCol := t.Meta().Columns[pkIdxCol.Offset]
		if !types.NeedRestoredData(&pkCol.FieldType) {
			continue
		}
		var datum types.Datum
		if len(rowMap) > 0 {
			datum = rowMap[pkCol.ID]
		} else {
			datum = row[pkCol.Offset]
		}
		// Try to truncate index values.
		// Says that primary key(a (8)),
		// For index t(a), don't truncate the value.
		// For index t(a(9)), truncate to a(9).
		// For index t(a(7)), truncate to a(8).
		truncateTargetCol := pkIdxCol
		for _, idxCol := range idx.Columns {
			if idxCol.Offset == pkCol.Offset {
				truncateTargetCol = maxIndexLen(pkIdxCol, idxCol)
				break
			}
		}
		tablecodec.TruncateIndexValue(&datum, truncateTargetCol, pkCol)
		if collate.IsBinCollation(pkCol.GetCollate()) {
			rsData = append(rsData, types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString())))
		} else {
			rsData = append(rsData, datum)
		}
	}
	return rsData
}

func maxIndexLen(idxA, idxB *model.IndexColumn) *model.IndexColumn {
	if idxA.Length == types.UnspecifiedLength {
		return idxA
	}
	if idxB.Length == types.UnspecifiedLength {
		return idxB
	}
	if idxA.Length > idxB.Length {
		return idxA
	}
	return idxB
}

func getSequenceAllocator(allocs autoid.Allocators) (autoid.Allocator, error) {
	for _, alloc := range allocs {
		if alloc.GetType() == autoid.SequenceType {
			return alloc, nil
		}
	}
	// TODO: refine the error.
	return nil, errors.New("sequence allocator is nil")
}

// BuildTableScanFromInfos build tipb.TableScan with *model.TableInfo and *model.ColumnInfo.
func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo) *tipb.TableScan {
	pkColIds := TryGetCommonPkColumnIds(tableInfo)
	tsExec := &tipb.TableScan{
		TableId:          tableInfo.ID,
		Columns:          util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle),
		PrimaryColumnIds: pkColIds,
	}
	if tableInfo.IsCommonHandle {
		tsExec.PrimaryPrefixColumnIds = PrimaryPrefixColumnIDs(tableInfo)
	}
	return tsExec
}

// BuildPartitionTableScanFromInfos build tipb.PartitonTableScan with *model.TableInfo and *model.ColumnInfo.
func BuildPartitionTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo, fastScan bool) *tipb.PartitionTableScan {
	pkColIds := TryGetCommonPkColumnIds(tableInfo)
	tsExec := &tipb.PartitionTableScan{
		TableId:          tableInfo.ID,
		Columns:          util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle),
		PrimaryColumnIds: pkColIds,
		IsFastScan:       &fastScan,
	}
	if tableInfo.IsCommonHandle {
		tsExec.PrimaryPrefixColumnIds = PrimaryPrefixColumnIDs(tableInfo)
	}
	return tsExec
}

// TemporaryTable is used to store transaction-specific or session-specific information for global / local temporary tables.
// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions.
type TemporaryTable struct {
	// Whether it's modified in this transaction.
	modified bool
	// The stats of this table. So far it's always pseudo stats.
	stats *statistics.Table
	// The autoID allocator of this table.
	autoIDAllocator autoid.Allocator
	// Table size.
	size int64

	meta *model.TableInfo
}

// TempTableFromMeta builds a TempTable from model.TableInfo.
func TempTableFromMeta(tblInfo *model.TableInfo) tableutil.TempTable {
	return &TemporaryTable{
		modified:        false,
		stats:           statistics.PseudoTable(tblInfo),
		autoIDAllocator: autoid.NewAllocatorFromTempTblInfo(tblInfo),
		meta:            tblInfo,
	}
}

// GetAutoIDAllocator is implemented from TempTable.GetAutoIDAllocator.
func (t *TemporaryTable) GetAutoIDAllocator() autoid.Allocator {
	return t.autoIDAllocator
}

// SetModified is implemented from TempTable.SetModified.
func (t *TemporaryTable) SetModified(modified bool) {
	t.modified = modified
}

// GetModified is implemented from TempTable.GetModified.
func (t *TemporaryTable) GetModified() bool {
	return t.modified
}

// GetStats is implemented from TempTable.GetStats.
func (t *TemporaryTable) GetStats() interface{} {
	return t.stats
}

// GetSize gets the table size.
func (t *TemporaryTable) GetSize() int64 {
	return t.size
}

// SetSize sets the table size.
func (t *TemporaryTable) SetSize(v int64) {
	t.size = v
}

// GetMeta gets the table meta.
func (t *TemporaryTable) GetMeta() *model.TableInfo {
	return t.meta
}

相关信息

tidb 源码目录

相关文章

tidb cache 源码

tidb index 源码

tidb mutation_checker 源码

tidb partition 源码

tidb state_remote 源码

0  赞