tidb readcommitted 源码
tidb readcommitted 代码
// Copyright 2022 PingCAP, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package isolation
import (
plannercore "github.com/pingcap/tidb/planner/core"
tikverr "github.com/tikv/client-go/v2/error"
// stmtState holds the timestamp state that belongs to a single statement execution.
type stmtState struct {
// stmtTS is the resolved statement timestamp; getStmtTS in this file treats 0 as "not resolved yet".
stmtTS uint64
// stmtTSFuture is the future that getStmtTS waits on to obtain the statement timestamp.
stmtTSFuture oracle.Future
// stmtUseStartTS is set from prepareStmt's useStartTS argument; when it is true,
// prepareStmtTS builds the statement future from the transaction start ts.
stmtUseStartTS bool
// prepareStmt resets the statement state for a new execution, keeping only the
// useStartTS choice. The visible code always returns nil.
func (s *stmtState) prepareStmt(useStartTS bool) error {
// Overwrite the whole struct so stmtTS and stmtTSFuture return to their zero values.
*s = stmtState{
stmtUseStartTS: useStartTS,
return nil
// PessimisticRCTxnContextProvider provides txn context for isolation level read-committed.
// NOTE(review): the methods in this file use fields such as sctx, ctx and stmtTS that are
// not declared in this listing; presumably they come from embedded types — confirm upstream.
type PessimisticRCTxnContextProvider struct {
// latestOracleTS is the most recent timestamp recorded by this provider: it is set from the
// txn start ts when the txn becomes active, and from the result of the oracle future.
latestOracleTS uint64
// latestOracleTSValid shows whether we have already fetched a ts from pd and whether the ts we fetched is still valid.
latestOracleTSValid bool
// checkTSInWriteStmt is used to set RCCheckTS isolation for getting value when doing point-write
checkTSInWriteStmt bool
// NewPessimisticRCTxnContextProvider returns a new PessimisticRCTxnContextProvider bound to sctx.
// The provider marks the transaction context as pessimistic with read-committed isolation and
// uses getStmtTS as the source of both the statement read ts and the statement for-update ts.
func NewPessimisticRCTxnContextProvider(sctx sessionctx.Context, causalConsistencyOnly bool) *PessimisticRCTxnContextProvider {
provider := &PessimisticRCTxnContextProvider{
baseTxnContextProvider: baseTxnContextProvider{
sctx: sctx,
causalConsistencyOnly: causalConsistencyOnly,
// Stamp every initialized transaction context as pessimistic read-committed.
onInitializeTxnCtx: func(txnCtx *variable.TransactionContext) {
txnCtx.IsPessimistic = true
txnCtx.Isolation = ast.ReadCommitted
// NOTE(review): this callback appears to be overwritten by the assignment to
// provider.onTxnActiveFunc below, which makes this literal redundant — confirm.
onTxnActiveFunc: func(txn kv.Transaction, _ sessiontxn.EnterNewTxnType) {
txn.SetOption(kv.Pessimistic, true)
// Assigned after construction so that the closure can refer to provider itself.
provider.onTxnActiveFunc = func(txn kv.Transaction, _ sessiontxn.EnterNewTxnType) {
txn.SetOption(kv.Pessimistic, true)
// Record the txn start ts as the latest known ts and mark it valid.
provider.latestOracleTS = txn.StartTS()
provider.latestOracleTSValid = true
// Plain reads and for-update reads share the same statement ts source.
provider.getStmtReadTSFunc = provider.getStmtTS
provider.getStmtForUpdateTSFunc = provider.getStmtTS
return provider
// OnStmtStart is the hook that should be called when a new statement is started.
// It runs the base provider's hook first, marks RCCheckTS on the statement context when
// NeedSetRCCheckTSFlag allows it, clears checkTSInWriteStmt and resets the statement state.
func (p *PessimisticRCTxnContextProvider) OnStmtStart(ctx context.Context, node ast.StmtNode) error {
if err := p.baseTxnContextProvider.OnStmtStart(ctx, node); err != nil {
return err
// Try to mark the `RCCheckTS` flag for the first time execution of in-transaction read requests
// using read-consistency isolation level.
if node != nil && NeedSetRCCheckTSFlag(p.sctx, node) {
p.sctx.GetSessionVars().StmtCtx.RCCheckTS = true
p.checkTSInWriteStmt = false
// The statement is told to use the txn start ts while p.isTxnPrepared is false.
return p.prepareStmt(!p.isTxnPrepared)
// NeedSetRCCheckTSFlag checks whether it's needed to set `RCCheckTS` flag in current stmtctx.
// It reports true only when all of the following hold: the session has a positive
// ConnectionID, variable.EnableRCReadCheckTS is on, the session is in a transaction,
// the session is not retrying, and plannercore.IsReadOnly reports node as read-only.
func NeedSetRCCheckTSFlag(ctx sessionctx.Context, node ast.Node) bool {
sessionVars := ctx.GetSessionVars()
if sessionVars.ConnectionID > 0 && variable.EnableRCReadCheckTS.Load() && sessionVars.InTxn() &&
!sessionVars.RetryInfo.Retrying && plannercore.IsReadOnly(node, sessionVars) {
return true
return false
// OnStmtErrorForNextAction is the hook that should be called when a statement gets an error.
// Errors raised after the query and after pessimistic locking are handled by this provider;
// every other handle point is delegated to the base provider.
func (p *PessimisticRCTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
switch point {
case sessiontxn.StmtErrAfterQuery:
return p.handleAfterQueryError(err)
case sessiontxn.StmtErrAfterPessimisticLock:
return p.handleAfterPessimisticLockError(err)
return p.baseTxnContextProvider.OnStmtErrorForNextAction(point, err)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
// After the base provider's hook succeeds, it clears checkTSInWriteStmt and resets the
// statement state.
func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error {
if err := p.baseTxnContextProvider.OnStmtRetry(ctx); err != nil {
return err
p.checkTSInWriteStmt = false
// A retried statement never reuses the txn start ts as its statement ts.
return p.prepareStmt(false)
// prepareStmtTS chooses the future that will supply the statement ts and stores it in
// p.stmtTSFuture.
// NOTE(review): this listing seems to have lost lines here (likely an early return inside
// the nil check and a `default:` label before the getOracleFuture call) — confirm upstream.
func (p *PessimisticRCTxnContextProvider) prepareStmtTS() {
if p.stmtTSFuture != nil {
sessVars := p.sctx.GetSessionVars()
var stmtTSFuture oracle.Future
switch {
// The statement was prepared to read at the transaction start ts.
case p.stmtUseStartTS:
stmtTSFuture = funcFuture(p.getTxnStartTS)
// A valid cached ts exists and the statement runs with RCCheckTS: reuse the cached ts
// through a constant future.
case p.latestOracleTSValid && sessVars.StmtCtx.RCCheckTS:
stmtTSFuture = sessiontxn.ConstantFuture(p.latestOracleTS)
stmtTSFuture = p.getOracleFuture()
p.stmtTSFuture = stmtTSFuture
// getOracleFuture starts an oracle ts request via newOracleFuture and returns a funcFuture
// that waits for it. On the visible success path the closure records the resulting ts in
// p.latestOracleTS and marks it valid.
// NOTE(review): lines appear to be missing from this listing (error return, failpoint body,
// final return) — confirm against the upstream file before relying on the details.
func (p *PessimisticRCTxnContextProvider) getOracleFuture() funcFuture {
txnCtx := p.sctx.GetSessionVars().TxnCtx
future := newOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope)
return func() (ts uint64, err error) {
if ts, err = future.Wait(); err != nil {
// Test-only injection point.
failpoint.Inject("waitTsoOfOracleFuture", func() {
// The ts handed back is read from the transaction context's for-update ts.
ts = txnCtx.GetForUpdateTS()
p.latestOracleTS = ts
p.latestOracleTSValid = true
// getStmtTS returns the timestamp of the current statement. A non-zero p.stmtTS is returned
// as is; otherwise the transaction is activated, p.stmtTSFuture is awaited, and the result
// is set as the transaction's kv.SnapshotTS option and stored in p.stmtTS.
// Any error from activation or from the future is returned with a zero ts.
func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) {
// Fast path: the ts was already resolved for this statement.
if p.stmtTS != 0 {
return p.stmtTS, nil
var txn kv.Transaction
if txn, err = p.ActivateTxn(); err != nil {
return 0, err
if ts, err = p.stmtTSFuture.Wait(); err != nil {
return 0, err
txn.SetOption(kv.SnapshotTS, ts)
p.stmtTS = ts
// handleAfterQueryError will be called when the handle point is `StmtErrAfterQuery`.
// At this point the query will be retried from the beginning.
// Only a write conflict on a statement that runs with RCCheckTS is handled here; any other
// error yields sessiontxn.NoIdea().
func (p *PessimisticRCTxnContextProvider) handleAfterQueryError(queryErr error) (sessiontxn.StmtErrorAction, error) {
sessVars := p.sctx.GetSessionVars()
if !errors.ErrorEqual(queryErr, kv.ErrWriteConflict) || !sessVars.StmtCtx.RCCheckTS {
return sessiontxn.NoIdea()
// Invalidate the cached ts so that it is not picked again by prepareStmtTS.
p.latestOracleTSValid = false
logutil.Logger(p.ctx).Info("RC read with ts checking has failed, retry RC read",
zap.String("sql", sessVars.StmtCtx.OriginalSQL), zap.Error(queryErr))
return sessiontxn.RetryReady()
// handleAfterPessimisticLockError decides the next action after a pessimistic lock error.
// The cached ts is always invalidated. A deadlock flagged as retryable or a write conflict
// results in sessiontxn.RetryReady(); any other error is returned through
// sessiontxn.ErrorAction.
func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) {
p.latestOracleTSValid = false
txnCtx := p.sctx.GetSessionVars().TxnCtx
retryable := false
// Deadlock errors are retried only when the error itself says it is retryable.
if deadlock, ok := errors.Cause(lockErr).(*tikverr.ErrDeadlock); ok && deadlock.IsRetryable {
logutil.Logger(p.ctx).Info("single statement deadlock, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("lockTS", deadlock.LockTs),
zap.Stringer("lockKey", kv.Key(deadlock.LockKey)),
zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash))
retryable = true
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
// Write conflicts are logged at debug level and retried.
logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),
zap.String("err", lockErr.Error()))
retryable = true
if retryable {
return sessiontxn.RetryReady()
return sessiontxn.ErrorAction(lockErr)
// AdviseWarmup provides warmup for inner state. It prepares the transaction first and
// returns any error from that step.
// NOTE(review): the body of the isTidbSnapshotEnabled check seems to be missing from this
// listing; presumably the statement ts is prepared there — confirm against upstream.
func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
if err := p.prepareTxn(); err != nil {
return err
if !p.isTidbSnapshotEnabled() {
return nil
// planSkipGetTsoFromPD identifies the plans which don't need to get the newest ts from PD.
// inLockOrWriteStmt tells whether plan is nested under a lock plan or a write statement;
// it is passed down while walking the plan tree.
func planSkipGetTsoFromPD(sctx sessionctx.Context, plan plannercore.Plan, inLockOrWriteStmt bool) bool {
switch v := plan.(type) {
// A point get qualifies only when RcWriteCheckTS is enabled in the session and the point
// get either locks by itself or sits under a lock/write statement.
case *plannercore.PointGetPlan:
return sctx.GetSessionVars().RcWriteCheckTS && (v.Lock || inLockOrWriteStmt)
case plannercore.PhysicalPlan:
// A physical plan without children never qualifies.
if len(v.Children()) == 0 {
return false
// Every child must qualify; a PhysicalLock node turns the flag on for its subtree.
_, isPhysicalLock := v.(*plannercore.PhysicalLock)
for _, p := range v.Children() {
if !planSkipGetTsoFromPD(sctx, p, isPhysicalLock || inLockOrWriteStmt) {
return false
return true
// UPDATE and DELETE are judged by their select plan, evaluated as a write statement.
case *plannercore.Update:
return planSkipGetTsoFromPD(sctx, v.SelectPlan, true)
case *plannercore.Delete:
return planSkipGetTsoFromPD(sctx, v.SelectPlan, true)
// INSERT qualifies only without a select plan, without ON DUPLICATE and when it is not REPLACE.
case *plannercore.Insert:
return v.SelectPlan == nil && len(v.OnDuplicate) == 0 && !v.IsReplace
return false
// AdviseOptimizeWithPlan in read-committed covers as many cases as repeatable-read.
// We do not fetch latest ts immediately for such scenes.
// 1. A query like the form of "SELECT ... FOR UPDATE" whose execution plan is "PointGet".
// 2. An INSERT statement without "SELECT" subquery.
// 3. A UPDATE statement whose sub execution plan is "PointGet".
// 4. A DELETE statement whose sub execution plan is "PointGet".
// When the plan qualifies, the statement future is replaced by a constant future holding
// p.latestOracleTS and checkTSInWriteStmt is turned on. val is expected to be a
// plannercore.Plan; any other value is ignored.
// NOTE(review): some lines (closing braces, the failpoint body) seem to be missing from
// this listing — confirm against upstream.
func (p *PessimisticRCTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) (err error) {
// Nothing to optimize when tidb_snapshot is enabled or the txn began with stale read.
if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
return nil
// Also skip when the statement already uses the start ts or there is no valid cached ts.
if p.stmtUseStartTS || !p.latestOracleTSValid {
return nil
plan, ok := val.(plannercore.Plan)
if !ok {
return nil
// For an Execute plan, inspect the plan it wraps.
if execute, ok := plan.(*plannercore.Execute); ok {
plan = execute.Plan
useLastOracleTS := false
// The cached ts is never reused while the session is retrying.
if !p.sctx.GetSessionVars().RetryInfo.Retrying {
useLastOracleTS = planSkipGetTsoFromPD(p.sctx, plan, false)
if useLastOracleTS {
// Test-only injection point.
failpoint.Inject("tsoUseConstantFuture", func() {
p.checkTSInWriteStmt = true
p.stmtTSFuture = sessiontxn.ConstantFuture(p.latestOracleTS)
return nil
// GetSnapshotWithStmtForUpdateTS gets snapshot with for update ts.
// The snapshot comes from the base provider; when checkTSInWriteStmt is set, its isolation
// level option is set to kv.RCCheckTS.
func (p *PessimisticRCTxnContextProvider) GetSnapshotWithStmtForUpdateTS() (kv.Snapshot, error) {
snapshot, err := p.baseTxnContextProvider.GetSnapshotWithStmtForUpdateTS()
if err != nil {
return nil, err
if p.checkTSInWriteStmt {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
return snapshot, err
// GetSnapshotWithStmtReadTS gets snapshot with read ts.
// When the statement context has RCCheckTS set, the snapshot's isolation level option is
// set to kv.RCCheckTS.
// NOTE(review): this delegates to the base provider's GetSnapshotWithStmtForUpdateTS rather
// than a read-ts variant. The constructor wires both ts getters to getStmtTS, so this looks
// intentional — confirm.
func (p *PessimisticRCTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot, error) {
snapshot, err := p.baseTxnContextProvider.GetSnapshotWithStmtForUpdateTS()
if err != nil {
return nil, err
if p.sctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
return snapshot, nil
// IsCheckTSInWriteStmtMode is only used for test.
// It reports the current value of checkTSInWriteStmt.
func (p *PessimisticRCTxnContextProvider) IsCheckTSInWriteStmtMode() bool {
return p.checkTSInWriteStmt
2、 - 优质文章
3、 gate.io
8、 golang
9、 openharmony
10、 Vue中input框自动聚焦