tidb point_get 源码

  • 2022-09-19
  • 浏览 (611)

tidb point_get 代码


// Copyright 2018 PingCAP, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (

	plannercore "github.com/pingcap/tidb/planner/core"

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
	var err error
	if err = b.validCanReadTemporaryOrCacheTable(p.TblInfo); err != nil {
		b.err = err
		return nil

	if p.Lock && !b.inSelectLockStmt {
		b.inSelectLockStmt = true
		defer func() {
			b.inSelectLockStmt = false

	e := &PointGetExecutor{
		baseExecutor:     newBaseExecutor(b.ctx, p.Schema(), p.ID()),
		txnScope:         b.txnScope,
		readReplicaScope: b.readReplicaScope,
		isStaleness:      b.isStaleness,

	e.base().initCap = 1
	e.base().maxChunkSize = 1

	e.snapshot, err = b.getSnapshot()
	if err != nil {
		b.err = err
		return nil
	if b.ctx.GetSessionVars().IsReplicaReadClosestAdaptive() {
		e.snapshot.SetOption(kv.ReplicaReadAdjuster, newReplicaReadAdjuster(e.ctx, p.GetAvgRowSize()))
	if e.runtimeStats != nil {
		snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
		e.stats = &runtimeStatsWithSnapshot{
			SnapshotRuntimeStats: snapshotStats,
		e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
		b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)

	if p.IndexInfo != nil {
		sctx := b.ctx.GetSessionVars().StmtCtx
		sctx.IndexNames = append(sctx.IndexNames, p.TblInfo.Name.O+":"+p.IndexInfo.Name.O)

	failpoint.Inject("assertPointReplicaOption", func(val failpoint.Value) {
		assertScope := val.(string)
		if e.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && assertScope != e.readReplicaScope {
			panic("point get replica option fail")

	snapshotTS, err := b.getSnapshotTS()
	if err != nil {
		b.err = err
		return nil
	if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
		if cacheTable := b.getCacheTable(p.TblInfo, snapshotTS); cacheTable != nil {
			e.snapshot = cacheTableSnapshot{e.snapshot, cacheTable}

	if e.lock {
		b.hasLock = true

	return e

// PointGetExecutor executes point select query.
type PointGetExecutor struct {

	tblInfo          *model.TableInfo
	handle           kv.Handle
	idxInfo          *model.IndexInfo
	partInfo         *model.PartitionDefinition
	idxKey           kv.Key
	handleVal        []byte
	idxVals          []types.Datum
	txnScope         string
	readReplicaScope string
	isStaleness      bool
	txn              kv.Transaction
	snapshot         kv.Snapshot
	done             bool
	lock             bool
	lockWaitTime     int64
	rowDecoder       *rowcodec.ChunkDecoder

	columns []*model.ColumnInfo
	// virtualColumnIndex records all the indices of virtual columns and sort them in definition
	// to make sure we can compute the virtual column in right order.
	virtualColumnIndex []int

	// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
	virtualColumnRetFieldTypes []*types.FieldType

	stats *runtimeStatsWithSnapshot

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan) {
	decoder := NewRowDecoder(e.ctx, p.Schema(), p.TblInfo)
	e.tblInfo = p.TblInfo
	e.handle = p.Handle
	e.idxInfo = p.IndexInfo
	e.idxVals = p.IndexValues
	e.done = false
	if e.tblInfo.TempTableType == model.TempTableNone {
		e.lock = p.Lock
		e.lockWaitTime = p.LockWaitTime
	} else {
		// Temporary table should not do any lock operations
		e.lock = false
		e.lockWaitTime = 0
	e.rowDecoder = decoder
	e.partInfo = p.PartitionInfo
	e.columns = p.Columns

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
func (e *PointGetExecutor) buildVirtualColumnInfo() {
	e.virtualColumnIndex = buildVirtualColumnIndex(e.Schema(), e.columns)
	if len(e.virtualColumnIndex) > 0 {
		e.virtualColumnRetFieldTypes = make([]*types.FieldType, len(e.virtualColumnIndex))
		for i, idx := range e.virtualColumnIndex {
			e.virtualColumnRetFieldTypes[i] = e.schema.Columns[idx].RetType

// Open implements the Executor interface.
func (e *PointGetExecutor) Open(context.Context) error {
	var err error
	e.txn, err = e.ctx.Txn(false)
	if err != nil {
		return err
	if err := e.verifyTxnScope(); err != nil {
		return err
	setOptionForTopSQL(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
	return nil

// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
	if e.runtimeStats != nil && e.snapshot != nil {
		e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
	if e.idxInfo != nil && e.tblInfo != nil {
		actRows := int64(0)
		if e.runtimeStats != nil {
			actRows = e.runtimeStats.GetActRows()
		e.ctx.StoreIndexUsage(e.tblInfo.ID, e.idxInfo.ID, actRows)
	e.done = false
	return nil

// Next implements the Executor interface.
func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
	if e.done {
		return nil
	e.done = true

	var tblID int64
	var err error
	if e.partInfo != nil {
		tblID = e.partInfo.ID
	} else {
		tblID = e.tblInfo.ID
	if e.lock {
	if e.idxInfo != nil {
		if isCommonHandleRead(e.tblInfo, e.idxInfo) {
			handleBytes, err := EncodeUniqueIndexValuesForKey(e.ctx, e.tblInfo, e.idxInfo, e.idxVals)
			if err != nil {
				if kv.ErrNotExist.Equal(err) {
					return nil
				return err
			e.handle, err = kv.NewCommonHandle(handleBytes)
			if err != nil {
				return err
		} else {
			e.idxKey, err = EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, e.idxVals, tblID)
			if err != nil && !kv.ErrNotExist.Equal(err) {
				return err

			// lockNonExistIdxKey indicates the key will be locked regardless of its existence.
			lockNonExistIdxKey := !e.ctx.GetSessionVars().IsPessimisticReadConsistency()
			// Non-exist keys are also locked if the isolation level is not read consistency,
			// lock it before read here, then it's able to read from pessimistic lock cache.
			if lockNonExistIdxKey {
				err = e.lockKeyIfNeeded(ctx, e.idxKey)
				if err != nil {
					return err
				e.handleVal, err = e.get(ctx, e.idxKey)
				if err != nil {
					if !kv.ErrNotExist.Equal(err) {
						return err
			} else {
				if e.lock {
					e.handleVal, err = e.lockKeyIfExists(ctx, e.idxKey)
					if err != nil {
						return err
				} else {
					e.handleVal, err = e.get(ctx, e.idxKey)
					if err != nil {
						if !kv.ErrNotExist.Equal(err) {
							return err

			if len(e.handleVal) == 0 {
				return nil

			// Change the unique index LOCK into PUT record.
			if e.lock {
				if !e.txn.Valid() {
					return kv.ErrInvalidTxn
				memBuffer := e.txn.GetMemBuffer()
				err = memBuffer.Set(e.idxKey, e.handleVal)
				if err != nil {
					return err

			var iv kv.Handle
			iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle)
			if err != nil {
				return err
			e.handle = iv

			// The injection is used to simulate following scenario:
			// 1. Session A create a point get query but pause before second time `GET` kv from backend
			// 2. Session B create an UPDATE query to update the record that will be obtained in step 1
			// 3. Then point get retrieve data from backend after step 2 finished
			// 4. Check the result
			failpoint.InjectContext(ctx, "pointGetRepeatableReadTest-step1", func() {
				if ch, ok := ctx.Value("pointGetRepeatableReadTest").(chan struct{}); ok {
					// Make `UPDATE` continue
				// Wait `UPDATE` finished
				failpoint.InjectContext(ctx, "pointGetRepeatableReadTest-step2", nil)

	key := tablecodec.EncodeRowKeyWithHandle(tblID, e.handle)
	val, err := e.getAndLock(ctx, key)
	if err != nil {
		return err
	if len(val) == 0 {
		if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) &&
			!e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
			return (&consistency.Reporter{
				HandleEncode: func(handle kv.Handle) kv.Key {
					return key
				IndexEncode: func(idxRow *consistency.RecordData) kv.Key {
					return e.idxKey
				Tbl:  e.tblInfo,
				Idx:  e.idxInfo,
				Sctx: e.ctx,
				1, 0,
		return nil
	err = DecodeRowValToChunk(e.base().ctx, e.schema, e.tblInfo, e.handle, val, req, e.rowDecoder)
	if err != nil {
		return err

	err = FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex,
		e.schema, e.columns, e.ctx, req)
	if err != nil {
		return err
	return nil

func (e *PointGetExecutor) getAndLock(ctx context.Context, key kv.Key) (val []byte, err error) {
	if e.ctx.GetSessionVars().IsPessimisticReadConsistency() {
		// Only Lock the existing keys in RC isolation.
		if e.lock {
			val, err = e.lockKeyIfExists(ctx, key)
			if err != nil {
				return nil, err
		} else {
			val, err = e.get(ctx, key)
			if err != nil {
				if !kv.ErrNotExist.Equal(err) {
					return nil, err
				return nil, nil
		return val, nil
	// Lock the key before get in RR isolation, then get will get the value from the cache.
	err = e.lockKeyIfNeeded(ctx, key)
	if err != nil {
		return nil, err
	val, err = e.get(ctx, key)
	if err != nil {
		if !kv.ErrNotExist.Equal(err) {
			return nil, err
		return nil, nil
	return val, nil

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
	_, err := e.lockKeyBase(ctx, key, false)
	return err

// lockKeyIfExists locks the key if needed, but won't lock the key if it doesn't exis.
// Returns the value of the key if the key exist.
func (e *PointGetExecutor) lockKeyIfExists(ctx context.Context, key []byte) ([]byte, error) {
	return e.lockKeyBase(ctx, key, true)

func (e *PointGetExecutor) lockKeyBase(ctx context.Context,
	key []byte,
	LockOnlyIfExists bool) ([]byte, error) {
	if len(key) == 0 {
		return nil, nil

	if e.lock {
		seVars := e.ctx.GetSessionVars()
		lockCtx, err := newLockCtx(e.ctx, e.lockWaitTime, 1)
		if err != nil {
			return nil, err
		lockCtx.LockOnlyIfExists = LockOnlyIfExists
		err = doLockKeys(ctx, e.ctx, lockCtx, key)
		if err != nil {
			return nil, err
		lockCtx.IterateValuesNotLocked(func(k, v []byte) {
			seVars.TxnCtx.SetPessimisticLockCache(k, v)
		if len(e.handleVal) > 0 {
			seVars.TxnCtx.SetPessimisticLockCache(e.idxKey, e.handleVal)
		if LockOnlyIfExists {
			return e.getValueFromLockCtx(ctx, lockCtx, key)

	return nil, nil

func (e *PointGetExecutor) getValueFromLockCtx(ctx context.Context,
	lockCtx *kv.LockCtx,
	key []byte) ([]byte, error) {
	if val, ok := lockCtx.Values[string(key)]; ok {
		if val.Exists {
			return val.Value, nil
		} else if val.AlreadyLocked {
			val, err := e.get(ctx, key)
			if err != nil {
				if !kv.ErrNotExist.Equal(err) {
					return nil, err
				return nil, nil
			return val, nil

	return nil, nil

// get will first try to get from txn buffer, then check the pessimistic lock cache,
// then the store. Kv.ErrNotExist will be returned if key is not found
func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) {
	if len(key) == 0 {
		return nil, kv.ErrNotExist

	var (
		val []byte
		err error

	if e.txn.Valid() && !e.txn.IsReadOnly() {
		// We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be
		// different for pessimistic transaction.
		val, err = e.txn.GetMemBuffer().Get(ctx, key)
		if err == nil {
			return val, err
		if !kv.IsErrNotFound(err) {
			return nil, err
		// key does not exist in mem buffer, check the lock cache
		if e.lock {
			var ok bool
			val, ok = e.ctx.GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key)
			if ok {
				return val, nil
		// fallthrough to snapshot get.

	lock := e.tblInfo.Lock
	if lock != nil && (lock.Tp == model.TableLockRead || lock.Tp == model.TableLockReadOnly) {
		if e.ctx.GetSessionVars().EnablePointGetCache {
			cacheDB := e.ctx.GetStore().GetMemCache()
			val, err = cacheDB.UnionGet(ctx, e.tblInfo.ID, e.snapshot, key)
			if err != nil {
				return nil, err
			return val, nil
	// if not read lock or table was unlock then snapshot get
	return e.snapshot.Get(ctx, key)

func (e *PointGetExecutor) verifyTxnScope() error {
	if e.txnScope == "" || e.txnScope == kv.GlobalTxnScope {
		return nil

	var tblID int64
	var tblName string
	var partName string
	is := e.ctx.GetInfoSchema().(infoschema.InfoSchema)
	if e.partInfo != nil {
		tblID = e.partInfo.ID
		tblInfo, _, partInfo := is.FindTableByPartitionID(tblID)
		tblName = tblInfo.Meta().Name.String()
		partName = partInfo.Name.String()
	} else {
		tblID = e.tblInfo.ID
		tblInfo, _ := is.TableByID(tblID)
		tblName = tblInfo.Meta().Name.String()
	valid := distsql.VerifyTxnScope(e.txnScope, tblID, is)
	if valid {
		return nil
	if len(partName) > 0 {
		return dbterror.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
			fmt.Sprintf("table %v's partition %v can not be read by %v txn_scope", tblName, partName, e.txnScope))
	return dbterror.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
		fmt.Sprintf("table %v can not be read by %v txn_scope", tblName, e.txnScope))

// EncodeUniqueIndexKey encodes a unique index key.
func EncodeUniqueIndexKey(ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
	encodedIdxVals, err := EncodeUniqueIndexValuesForKey(ctx, tblInfo, idxInfo, idxVals)
	if err != nil {
		return nil, err
	return tablecodec.EncodeIndexSeekKey(tID, idxInfo.ID, encodedIdxVals), nil

// EncodeUniqueIndexValuesForKey encodes unique index values for a key.
func EncodeUniqueIndexValuesForKey(ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum) (_ []byte, err error) {
	sc := ctx.GetSessionVars().StmtCtx
	for i := range idxVals {
		colInfo := tblInfo.Columns[idxInfo.Columns[i].Offset]
		// table.CastValue will append 0x0 if the string value's length is smaller than the BINARY column's length.
		// So we don't use CastValue for string value for now.
		// TODO: merge two if branch.
		if colInfo.GetType() == mysql.TypeString || colInfo.GetType() == mysql.TypeVarString || colInfo.GetType() == mysql.TypeVarchar {
			var str string
			str, err = idxVals[i].ToString()
			idxVals[i].SetString(str, colInfo.FieldType.GetCollate())
		} else if colInfo.GetType() == mysql.TypeEnum && (idxVals[i].Kind() == types.KindString || idxVals[i].Kind() == types.KindBytes || idxVals[i].Kind() == types.KindBinaryLiteral) {
			var str string
			var e types.Enum
			str, err = idxVals[i].ToString()
			if err != nil {
				return nil, kv.ErrNotExist
			e, err = types.ParseEnumName(colInfo.FieldType.GetElems(), str, colInfo.FieldType.GetCollate())
			if err != nil {
				return nil, kv.ErrNotExist
			idxVals[i].SetMysqlEnum(e, colInfo.FieldType.GetCollate())
		} else {
			// If a truncated error or an overflow error is thrown when converting the type of `idxVal[i]` to
			// the type of `colInfo`, the `idxVal` does not exist in the `idxInfo` for sure.
			idxVals[i], err = table.CastValue(ctx, idxVals[i], colInfo, true, false)
			if types.ErrOverflow.Equal(err) || types.ErrDataTooLong.Equal(err) ||
				types.ErrTruncated.Equal(err) || types.ErrTruncatedWrongVal.Equal(err) {
				return nil, kv.ErrNotExist
		if err != nil {
			return nil, err

	encodedIdxVals, err := codec.EncodeKey(sc, nil, idxVals...)
	if err != nil {
		return nil, err
	return encodedIdxVals, nil

// DecodeRowValToChunk decodes row value into chunk checking row format used.
func DecodeRowValToChunk(sctx sessionctx.Context, schema *expression.Schema, tblInfo *model.TableInfo,
	handle kv.Handle, rowVal []byte, chk *chunk.Chunk, rd *rowcodec.ChunkDecoder) error {
	if rowcodec.IsNewFormat(rowVal) {
		return rd.DecodeToChunk(rowVal, handle, chk)
	return decodeOldRowValToChunk(sctx, schema, tblInfo, handle, rowVal, chk)

func decodeOldRowValToChunk(sctx sessionctx.Context, schema *expression.Schema, tblInfo *model.TableInfo, handle kv.Handle,
	rowVal []byte, chk *chunk.Chunk) error {
	pkCols := tables.TryGetCommonPkColumnIds(tblInfo)
	prefixColIDs := tables.PrimaryPrefixColumnIDs(tblInfo)
	colID2CutPos := make(map[int64]int, schema.Len())
	for _, col := range schema.Columns {
		if _, ok := colID2CutPos[col.ID]; !ok {
			colID2CutPos[col.ID] = len(colID2CutPos)
	cutVals, err := tablecodec.CutRowNew(rowVal, colID2CutPos)
	if err != nil {
		return err
	if cutVals == nil {
		cutVals = make([][]byte, len(colID2CutPos))
	decoder := codec.NewDecoder(chk, sctx.GetSessionVars().Location())
	for i, col := range schema.Columns {
		// fill the virtual column value after row calculation
		if col.VirtualExpr != nil {
		ok, err := tryDecodeFromHandle(tblInfo, i, col, handle, chk, decoder, pkCols, prefixColIDs)
		if err != nil {
			return err
		if ok {
		cutPos := colID2CutPos[col.ID]
		if len(cutVals[cutPos]) == 0 {
			colInfo := getColInfoByID(tblInfo, col.ID)
			d, err1 := table.GetColOriginDefaultValue(sctx, colInfo)
			if err1 != nil {
				return err1
			chk.AppendDatum(i, &d)
		_, err = decoder.DecodeOne(cutVals[cutPos], i, col.RetType)
		if err != nil {
			return err
	return nil

func tryDecodeFromHandle(tblInfo *model.TableInfo, schemaColIdx int, col *expression.Column, handle kv.Handle, chk *chunk.Chunk,
	decoder *codec.Decoder, pkCols []int64, prefixColIDs []int64) (bool, error) {
	if tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.RetType.GetFlag()) {
		chk.AppendInt64(schemaColIdx, handle.IntValue())
		return true, nil
	if col.ID == model.ExtraHandleID {
		chk.AppendInt64(schemaColIdx, handle.IntValue())
		return true, nil
	if types.NeedRestoredData(col.RetType) {
		return false, nil
	// Try to decode common handle.
	if mysql.HasPriKeyFlag(col.RetType.GetFlag()) {
		for i, hid := range pkCols {
			if col.ID == hid && notPKPrefixCol(hid, prefixColIDs) {
				_, err := decoder.DecodeOne(handle.EncodedCol(i), schemaColIdx, col.RetType)
				if err != nil {
					return false, errors.Trace(err)
				return true, nil
	return false, nil

func notPKPrefixCol(colID int64, prefixColIDs []int64) bool {
	for _, pCol := range prefixColIDs {
		if pCol == colID {
			return false
	return true

func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
	for _, col := range tbl.Columns {
		if col.ID == colID {
			return col
	return nil

type runtimeStatsWithSnapshot struct {

func (e *runtimeStatsWithSnapshot) String() string {
	if e.SnapshotRuntimeStats != nil {
		return e.SnapshotRuntimeStats.String()
	return ""

// Clone implements the RuntimeStats interface.
func (e *runtimeStatsWithSnapshot) Clone() execdetails.RuntimeStats {
	newRs := &runtimeStatsWithSnapshot{}
	if e.SnapshotRuntimeStats != nil {
		snapshotStats := e.SnapshotRuntimeStats.Clone()
		newRs.SnapshotRuntimeStats = snapshotStats
	return newRs

// Merge implements the RuntimeStats interface.
func (e *runtimeStatsWithSnapshot) Merge(other execdetails.RuntimeStats) {
	tmp, ok := other.(*runtimeStatsWithSnapshot)
	if !ok {
	if tmp.SnapshotRuntimeStats != nil {
		if e.SnapshotRuntimeStats == nil {
			snapshotStats := tmp.SnapshotRuntimeStats.Clone()
			e.SnapshotRuntimeStats = snapshotStats

// Tp implements the RuntimeStats interface.
func (e *runtimeStatsWithSnapshot) Tp() int {
	return execdetails.TpRuntimeStatsWithSnapshot


tidb 源码目录


tidb adapter 源码

tidb admin 源码

tidb admin_plugins 源码

tidb admin_telemetry 源码

tidb aggregate 源码

tidb analyze 源码

tidb analyze_col 源码

tidb analyze_col_v2 源码

tidb analyze_fast 源码

tidb analyze_global_stats 源码

0  赞