tidb base 源码
tidb base 代码
文件路径:/sessiontxn/isolation/base.go
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package isolation
import (
"context"
"time"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/internal"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table/temptable"
"github.com/tikv/client-go/v2/oracle"
)
// baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation.
// It provides some common functions below:
// - Provides a default `OnInitialize` method to initialize its inner state.
// - Provides some methods like `activateTxn` and `prepareTxn` to manage the inner transaction.
// - Provides default methods `GetTxnInfoSchema`, `GetStmtReadTS` and `GetStmtForUpdateTS` and return the snapshot information schema or ts when `tidb_snapshot` is set.
// - Provides other default methods like `Advise`, `OnStmtStart`, `OnStmtRetry` and `OnStmtErrorForNextAction`
//
// The subclass can set some inner property of `baseTxnContextProvider` when it is constructed.
// For example, `getStmtReadTSFunc` and `getStmtForUpdateTSFunc` should be set, and they will be called when `GetStmtReadTS`
// or `GetStmtForUpdate` to get the timestamp that should be used by the corresponding isolation level.
type baseTxnContextProvider struct {
// States that should be initialized when baseTxnContextProvider is created and should not be changed after that
sctx sessionctx.Context
causalConsistencyOnly bool
onInitializeTxnCtx func(*variable.TransactionContext)
onTxnActiveFunc func(kv.Transaction, sessiontxn.EnterNewTxnType)
getStmtReadTSFunc func() (uint64, error)
getStmtForUpdateTSFunc func() (uint64, error)
// Runtime states
ctx context.Context
infoSchema infoschema.InfoSchema
txn kv.Transaction
isTxnPrepared bool
enterNewTxnType sessiontxn.EnterNewTxnType
// constStartTS is only used by point get max ts optimization currently.
// When constStartTS != 0, we use constStartTS directly without fetching it from tso.
// To save the cpu cycles `PrepareTSFuture` will also not be called when warmup (postpone to activate txn).
constStartTS uint64
}
// OnInitialize is the hook that should be called when enter a new txn with this provider
func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) (err error) {
if p.getStmtReadTSFunc == nil || p.getStmtForUpdateTSFunc == nil {
return errors.New("ts functions should not be nil")
}
p.ctx = ctx
sessVars := p.sctx.GetSessionVars()
activeNow := true
switch tp {
case sessiontxn.EnterNewTxnDefault:
// As we will enter a new txn, we need to commit the old txn if it's still valid.
// There are two main steps here to enter a new txn:
// 1. prepareTxnWithOracleTS
// 2. ActivateTxn
if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil {
return err
}
if err := p.prepareTxnWithOracleTS(); err != nil {
return err
}
case sessiontxn.EnterNewTxnWithBeginStmt:
if !canReuseTxnWhenExplicitBegin(p.sctx) {
// As we will enter a new txn, we need to commit the old txn if it's still valid.
// There are two main steps here to enter a new txn:
// 1. prepareTxnWithOracleTS
// 2. ActivateTxn
if err := internal.CommitBeforeEnterNewTxn(p.ctx, p.sctx); err != nil {
return err
}
if err := p.prepareTxnWithOracleTS(); err != nil {
return err
}
}
sessVars.SetInTxn(true)
case sessiontxn.EnterNewTxnBeforeStmt:
activeNow = false
default:
return errors.Errorf("Unsupported type: %v", tp)
}
p.enterNewTxnType = tp
p.infoSchema = p.sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
txnCtx := &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
CreateTime: time.Now(),
InfoSchema: p.infoSchema,
ShardStep: int(sessVars.ShardAllocateStep),
TxnScope: sessVars.CheckAndGetTxnScope(),
},
}
if p.onInitializeTxnCtx != nil {
p.onInitializeTxnCtx(txnCtx)
}
sessVars.TxnCtx = txnCtx
if variable.EnableMDL.Load() {
sessVars.TxnCtx.EnableMDL = true
}
txn, err := p.sctx.Txn(false)
if err != nil {
return err
}
p.isTxnPrepared = txn.Valid() || p.sctx.GetPreparedTxnFuture() != nil
if activeNow {
_, err = p.ActivateTxn()
}
return err
}
// GetTxnInfoSchema returns the information schema used by txn
func (p *baseTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema {
if is := p.sctx.GetSessionVars().SnapshotInfoschema; is != nil {
return is.(infoschema.InfoSchema)
}
return p.infoSchema
}
func (p *baseTxnContextProvider) SetTxnInfoSchema(is infoschema.InfoSchema) {
p.infoSchema = is
}
// GetTxnScope returns the current txn scope
func (p *baseTxnContextProvider) GetTxnScope() string {
return p.sctx.GetSessionVars().TxnCtx.TxnScope
}
// GetReadReplicaScope returns the read replica scope
func (p *baseTxnContextProvider) GetReadReplicaScope() string {
if txnScope := p.GetTxnScope(); txnScope != kv.GlobalTxnScope && txnScope != "" {
// In local txn, we should use txnScope as the readReplicaScope
return txnScope
}
if p.sctx.GetSessionVars().GetReplicaRead().IsClosestRead() {
// If closest read is set, we should use the scope where instance located.
return config.GetTxnScopeFromConfig()
}
// When it is not local txn or closet read, we should use global scope
return kv.GlobalReplicaScope
}
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
func (p *baseTxnContextProvider) GetStmtReadTS() (uint64, error) {
if _, err := p.ActivateTxn(); err != nil {
return 0, err
}
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
return snapshotTS, nil
}
return p.getStmtReadTSFunc()
}
// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
if _, err := p.ActivateTxn(); err != nil {
return 0, err
}
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
return snapshotTS, nil
}
return p.getStmtForUpdateTSFunc()
}
// OnStmtStart is the hook that should be called when a new statement started
func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context, _ ast.StmtNode) error {
p.ctx = ctx
return nil
}
// OnStmtRetry is the hook that should be called when a statement is retried internally.
func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error {
p.ctx = ctx
return nil
}
// OnLocalTemporaryTableCreated is the hook that should be called when a local temporary table created.
func (p *baseTxnContextProvider) OnLocalTemporaryTableCreated() {
p.infoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, p.infoSchema)
p.sctx.GetSessionVars().TxnCtx.InfoSchema = p.infoSchema
if p.txn != nil && p.txn.Valid() {
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
p.txn.SetOption(kv.SnapInterceptor, interceptor)
}
}
}
// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
func (p *baseTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
switch point {
case sessiontxn.StmtErrAfterPessimisticLock:
// for pessimistic lock error, return the error by default
return sessiontxn.ErrorAction(err)
default:
return sessiontxn.NoIdea()
}
}
func (p *baseTxnContextProvider) getTxnStartTS() (uint64, error) {
txn, err := p.ActivateTxn()
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}
// ActivateTxn activates the transaction and set the relevant context variables.
func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
if p.txn != nil {
return p.txn, nil
}
if err := p.prepareTxn(); err != nil {
return nil, err
}
if p.constStartTS != 0 {
if err := p.replaceTxnTsFuture(sessiontxn.ConstantFuture(p.constStartTS)); err != nil {
return nil, err
}
}
txnFuture := p.sctx.GetPreparedTxnFuture()
txn, err := txnFuture.Wait(p.ctx, p.sctx)
if err != nil {
return nil, err
}
sessVars := p.sctx.GetSessionVars()
sessVars.TxnCtx.StartTS = txn.StartTS()
if p.enterNewTxnType == sessiontxn.EnterNewTxnBeforeStmt && !sessVars.IsAutocommit() && sessVars.SnapshotTS == 0 {
sessVars.SetInTxn(true)
}
txn.SetVars(sessVars.KVVars)
readReplicaType := sessVars.GetReplicaRead()
if readReplicaType.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, readReplicaType)
}
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
txn.SetOption(kv.SnapInterceptor, interceptor)
}
if sessVars.StmtCtx.WeakConsistency {
txn.SetOption(kv.IsolationLevel, kv.RC)
}
internal.SetTxnAssertionLevel(txn, sessVars.AssertionLevel)
if p.causalConsistencyOnly {
txn.SetOption(kv.GuaranteeLinearizability, false)
}
if p.onTxnActiveFunc != nil {
p.onTxnActiveFunc(txn, p.enterNewTxnType)
}
if p.sctx.GetSessionVars().InRestrictedSQL {
txn.SetOption(kv.RequestSourceInternal, true)
}
if tp := p.sctx.GetSessionVars().RequestSourceType; tp != "" {
txn.SetOption(kv.RequestSourceType, tp)
}
p.txn = txn
return txn, nil
}
// prepareTxn prepares txn with an oracle ts future. If the snapshotTS is set,
// the txn is prepared with it.
func (p *baseTxnContextProvider) prepareTxn() error {
if p.isTxnPrepared {
return nil
}
if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
return p.replaceTxnTsFuture(sessiontxn.ConstantFuture(snapshotTS))
}
future := newOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope)
return p.replaceTxnTsFuture(future)
}
// prepareTxnWithOracleTS
// The difference between prepareTxnWithOracleTS and prepareTxn is that prepareTxnWithOracleTS
// does not consider snapshotTS
func (p *baseTxnContextProvider) prepareTxnWithOracleTS() error {
if p.isTxnPrepared {
return nil
}
future := newOracleFuture(p.ctx, p.sctx, p.sctx.GetSessionVars().TxnCtx.TxnScope)
return p.replaceTxnTsFuture(future)
}
func (p *baseTxnContextProvider) forcePrepareConstStartTS(ts uint64) error {
if p.txn != nil {
return errors.New("cannot force prepare const start ts because txn is active")
}
p.constStartTS = ts
p.isTxnPrepared = true
return nil
}
func (p *baseTxnContextProvider) replaceTxnTsFuture(future oracle.Future) error {
txn, err := p.sctx.Txn(false)
if err != nil {
return err
}
if txn.Valid() {
return nil
}
txnScope := p.sctx.GetSessionVars().TxnCtx.TxnScope
if err = p.sctx.PrepareTSFuture(p.ctx, future, txnScope); err != nil {
return err
}
p.isTxnPrepared = true
return nil
}
func (p *baseTxnContextProvider) isTidbSnapshotEnabled() bool {
return p.sctx.GetSessionVars().SnapshotTS != 0
}
// isBeginStmtWithStaleRead indicates whether the current statement is `BeginStmt` type with stale read
// Because stale read will use `staleread.StalenessTxnContextProvider` for query, so if `staleread.IsStmtStaleness()`
// returns true in other providers, it means the current statement is `BeginStmt` with stale read
func (p *baseTxnContextProvider) isBeginStmtWithStaleRead() bool {
return staleread.IsStmtStaleness(p.sctx)
}
// AdviseWarmup provides warmup for inner state
func (p *baseTxnContextProvider) AdviseWarmup() error {
if p.isTxnPrepared || p.isBeginStmtWithStaleRead() {
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
return nil
}
return p.prepareTxn()
}
// AdviseOptimizeWithPlan providers optimization according to the plan
func (p *baseTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}
// GetSnapshotWithStmtReadTS gets snapshot with read ts
func (p *baseTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot, error) {
ts, err := p.GetStmtReadTS()
if err != nil {
return nil, err
}
return p.getSnapshotByTS(ts)
}
// GetSnapshotWithStmtForUpdateTS gets snapshot with for update ts
func (p *baseTxnContextProvider) GetSnapshotWithStmtForUpdateTS() (kv.Snapshot, error) {
ts, err := p.GetStmtForUpdateTS()
if err != nil {
return nil, err
}
return p.getSnapshotByTS(ts)
}
// getSnapshotByTS get snapshot from store according to the snapshotTS and set the transaction related
// options before return
func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot, error) {
txn, err := p.sctx.Txn(false)
if err != nil {
return nil, err
}
txnCtx := p.sctx.GetSessionVars().TxnCtx
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == snapshotTS {
return txn.GetSnapshot(), nil
}
sessVars := p.sctx.GetSessionVars()
snapshot := internal.GetSnapshotWithTS(
p.sctx,
snapshotTS,
temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema),
)
replicaReadType := sessVars.GetReplicaRead()
if replicaReadType.IsFollowerRead() &&
!sessVars.StmtCtx.RCCheckTS &&
!sessVars.RcWriteCheckTS {
snapshot.SetOption(kv.ReplicaRead, replicaReadType)
}
return snapshot, nil
}
// canReuseTxnWhenExplicitBegin returns whether we should reuse the txn when starting a transaction explicitly
func canReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool {
sessVars := sctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
// always create a new transaction.
// If the variable `tidb_snapshot` is set, we should always create a new transaction because the current txn may be
// initialized with snapshot ts.
return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0
}
// newOracleFuture creates new future according to the scope and the session context
func newOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("isolation.newOracleFuture", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
failpoint.Inject("requestTsoFromPD", func() {
sessiontxn.TsoRequestCountInc(sctx)
})
oracleStore := sctx.GetStore().GetOracle()
option := &oracle.Option{TxnScope: scope}
if sctx.GetSessionVars().LowResolutionTSO {
return oracleStore.GetLowResolutionTimestampAsync(ctx, option)
}
return oracleStore.GetTimestampAsync(ctx, option)
}
// funcFuture implements oracle.Future
type funcFuture func() (uint64, error)
// Wait returns a ts got from the func
func (f funcFuture) Wait() (uint64, error) {
return f()
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦