tidb readcommitted 源码
tidb readcommitted 代码
文件路径:/sessiontxn/isolation/readcommitted.go
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package isolation
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/util/logutil"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
type stmtState struct {
stmtTS uint64
stmtTSFuture oracle.Future
stmtUseStartTS bool
}
func (s *stmtState) prepareStmt(useStartTS bool) error {
*s = stmtState{
stmtUseStartTS: useStartTS,
}
return nil
}
// PessimisticRCTxnContextProvider provides txn context for isolation level read-committed
type PessimisticRCTxnContextProvider struct {
baseTxnContextProvider
stmtState
latestOracleTS uint64
// latestOracleTSValid shows whether we have already fetched a ts from pd and whether the ts we fetched is still valid.
latestOracleTSValid bool
// checkTSInWriteStmt is used to set RCCheckTS isolation for getting value when doing point-write
checkTSInWriteStmt bool
}
// NewPessimisticRCTxnContextProvider returns a new PessimisticRCTxnContextProvider
func NewPessimisticRCTxnContextProvider(sctx sessionctx.Context, causalConsistencyOnly bool) *PessimisticRCTxnContextProvider {
provider := &PessimisticRCTxnContextProvider{
baseTxnContextProvider: baseTxnContextProvider{
sctx: sctx,
causalConsistencyOnly: causalConsistencyOnly,
onInitializeTxnCtx: func(txnCtx *variable.TransactionContext) {
txnCtx.IsPessimistic = true
txnCtx.Isolation = ast.ReadCommitted
},
onTxnActiveFunc: func(txn kv.Transaction, _ sessiontxn.EnterNewTxnType) {
txn.SetOption(kv.Pessimistic, true)
},
},
}
provider.onTxnActiveFunc = func(txn kv.Transaction, _ sessiontxn.EnterNewTxnType) {
txn.SetOption(kv.Pessimistic, true)
provider.latestOracleTS = txn.StartTS()
provider.latestOracleTSValid = true
}
provider.getStmtReadTSFunc = provider.getStmtTS
provider.getStmtForUpdateTSFunc = provider.getStmtTS
return provider
}
// OnStmtStart is the hook that should be called when a new statement started
func (p *PessimisticRCTxnContextProvider) OnStmtStart(ctx context.Context, node ast.StmtNode) error {
if err := p.baseTxnContextProvider.OnStmtStart(ctx, node); err != nil {
return err
}
// Try to mark the `RCCheckTS` flag for the first time execution of in-transaction read requests
// using read-consistency isolation level.
if node != nil && NeedSetRCCheckTSFlag(p.sctx, node) {
p.sctx.GetSessionVars().StmtCtx.RCCheckTS = true
}
p.checkTSInWriteStmt = false
return p.prepareStmt(!p.isTxnPrepared)
}
// NeedSetRCCheckTSFlag checks whether it's needed to set `RCCheckTS` flag in current stmtctx.
func NeedSetRCCheckTSFlag(ctx sessionctx.Context, node ast.Node) bool {
sessionVars := ctx.GetSessionVars()
if sessionVars.ConnectionID > 0 && variable.EnableRCReadCheckTS.Load() && sessionVars.InTxn() &&
!sessionVars.RetryInfo.Retrying && plannercore.IsReadOnly(node, sessionVars) {
return true
}
return false
}
// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
func (p *PessimisticRCTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
switch point {
case sessiontxn.StmtErrAfterQuery:
return p.handleAfterQueryError(err)
case sessiontxn.StmtErrAfterPessimisticLock:
return p.handleAfterPessimisticLockError(err)
default:
return p.baseTxnContextProvider.OnStmtErrorForNextAction(point, err)
}
}
// OnStmtRetry is the hook that should be called when a statement is retried internally.
func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error {
if err := p.baseTxnContextProvider.OnStmtRetry(ctx); err != nil {
return err
}
p.checkTSInWriteStmt = false
return p.prepareStmt(false)
}
func (p *PessimisticRCTxnContextProvider) prepareStmtTS() {
if p.stmtTSFuture != nil {
return
}
sessVars := p.sctx.GetSessionVars()
var stmtTSFuture oracle.Future
switch {
case p.stmtUseStartTS:
stmtTSFuture = funcFuture(p.getTxnStartTS)
case p.latestOracleTSValid && sessVars.StmtCtx.RCCheckTS:
stmtTSFuture = sessiontxn.ConstantFuture(p.latestOracleTS)
default:
stmtTSFuture = p.getOracleFuture()
}
p.stmtTSFuture = stmtTSFuture
}
func (p *PessimisticRCTxnContextProvider) getOracleFuture() funcFuture {
txnCtx := p.sctx.GetSessionVars().TxnCtx
future := newOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope)
return func() (ts uint64, err error) {
if ts, err = future.Wait(); err != nil {
return
}
failpoint.Inject("waitTsoOfOracleFuture", func() {
sessiontxn.TsoWaitCountInc(p.sctx)
})
txnCtx.SetForUpdateTS(ts)
ts = txnCtx.GetForUpdateTS()
p.latestOracleTS = ts
p.latestOracleTSValid = true
return
}
}
func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) {
if p.stmtTS != 0 {
return p.stmtTS, nil
}
var txn kv.Transaction
if txn, err = p.ActivateTxn(); err != nil {
return 0, err
}
p.prepareStmtTS()
if ts, err = p.stmtTSFuture.Wait(); err != nil {
return 0, err
}
txn.SetOption(kv.SnapshotTS, ts)
p.stmtTS = ts
return
}
// handleAfterQueryError will be called when the handle point is `StmtErrAfterQuery`.
// At this point the query will be retried from the beginning.
func (p *PessimisticRCTxnContextProvider) handleAfterQueryError(queryErr error) (sessiontxn.StmtErrorAction, error) {
sessVars := p.sctx.GetSessionVars()
if !errors.ErrorEqual(queryErr, kv.ErrWriteConflict) || !sessVars.StmtCtx.RCCheckTS {
return sessiontxn.NoIdea()
}
p.latestOracleTSValid = false
logutil.Logger(p.ctx).Info("RC read with ts checking has failed, retry RC read",
zap.String("sql", sessVars.StmtCtx.OriginalSQL), zap.Error(queryErr))
return sessiontxn.RetryReady()
}
func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) {
p.latestOracleTSValid = false
txnCtx := p.sctx.GetSessionVars().TxnCtx
retryable := false
if deadlock, ok := errors.Cause(lockErr).(*tikverr.ErrDeadlock); ok && deadlock.IsRetryable {
logutil.Logger(p.ctx).Info("single statement deadlock, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("lockTS", deadlock.LockTs),
zap.Stringer("lockKey", kv.Key(deadlock.LockKey)),
zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash))
retryable = true
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),
zap.String("err", lockErr.Error()))
retryable = true
}
if retryable {
return sessiontxn.RetryReady()
}
return sessiontxn.ErrorAction(lockErr)
}
// AdviseWarmup provides warmup for inner state
func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
if err := p.prepareTxn(); err != nil {
return err
}
if !p.isTidbSnapshotEnabled() {
p.prepareStmtTS()
}
return nil
}
// planSkipGetTsoFromPD identifies the plans which don't need get newest ts from PD.
func planSkipGetTsoFromPD(sctx sessionctx.Context, plan plannercore.Plan, inLockOrWriteStmt bool) bool {
switch v := plan.(type) {
case *plannercore.PointGetPlan:
return sctx.GetSessionVars().RcWriteCheckTS && (v.Lock || inLockOrWriteStmt)
case plannercore.PhysicalPlan:
if len(v.Children()) == 0 {
return false
}
_, isPhysicalLock := v.(*plannercore.PhysicalLock)
for _, p := range v.Children() {
if !planSkipGetTsoFromPD(sctx, p, isPhysicalLock || inLockOrWriteStmt) {
return false
}
}
return true
case *plannercore.Update:
return planSkipGetTsoFromPD(sctx, v.SelectPlan, true)
case *plannercore.Delete:
return planSkipGetTsoFromPD(sctx, v.SelectPlan, true)
case *plannercore.Insert:
return v.SelectPlan == nil && len(v.OnDuplicate) == 0 && !v.IsReplace
}
return false
}
// AdviseOptimizeWithPlan in read-committed covers as many cases as repeatable-read.
// We do not fetch latest ts immediately for such scenes.
// 1. A query like the form of "SELECT ... FOR UPDATE" whose execution plan is "PointGet".
// 2. An INSERT statement without "SELECT" subquery.
// 3. A UPDATE statement whose sub execution plan is "PointGet".
// 4. A DELETE statement whose sub execution plan is "PointGet".
func (p *PessimisticRCTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) (err error) {
if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
return nil
}
if p.stmtUseStartTS || !p.latestOracleTSValid {
return nil
}
plan, ok := val.(plannercore.Plan)
if !ok {
return nil
}
if execute, ok := plan.(*plannercore.Execute); ok {
plan = execute.Plan
}
useLastOracleTS := false
if !p.sctx.GetSessionVars().RetryInfo.Retrying {
useLastOracleTS = planSkipGetTsoFromPD(p.sctx, plan, false)
}
if useLastOracleTS {
failpoint.Inject("tsoUseConstantFuture", func() {
sessiontxn.TsoUseConstantCountInc(p.sctx)
})
p.checkTSInWriteStmt = true
p.stmtTSFuture = sessiontxn.ConstantFuture(p.latestOracleTS)
}
return nil
}
// GetSnapshotWithStmtForUpdateTS gets snapshot with for update ts
func (p *PessimisticRCTxnContextProvider) GetSnapshotWithStmtForUpdateTS() (kv.Snapshot, error) {
snapshot, err := p.baseTxnContextProvider.GetSnapshotWithStmtForUpdateTS()
if err != nil {
return nil, err
}
if p.checkTSInWriteStmt {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
return snapshot, err
}
// GetSnapshotWithStmtReadTS gets snapshot with read ts
func (p *PessimisticRCTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot, error) {
snapshot, err := p.baseTxnContextProvider.GetSnapshotWithStmtForUpdateTS()
if err != nil {
return nil, err
}
if p.sctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
return snapshot, nil
}
// IsCheckTSInWriteStmtMode is only used for test
func (p *PessimisticRCTxnContextProvider) IsCheckTSInWriteStmtMode() bool {
return p.checkTSInWriteStmt
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦