tidb tidb 源码

  • 2022-09-19
  • 浏览 (458)

tidb tidb 代码

文件路径:/session/tidb.go

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2013 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSES/QL-LICENSE file.

package session

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/ddl/schematracker"
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/errno"
	"github.com/pingcap/tidb/executor"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/dbterror"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/sqlexec"
	"go.uber.org/zap"
)

// domainMap holds the domain.Domain instances created through Get, keyed by
// the UUID of the kv.Storage each one was built for.
type domainMap struct {
	// mu guards domains; Get and Delete hold it while touching the map.
	mu      sync.Mutex
	// domains maps store.UUID() to the domain created for that store.
	domains map[string]*domain.Domain
}

// Get returns the domain bound to store, creating and initializing one on
// first use.
//
// When store is nil, any already-registered domain is returned, or an error if
// the map is empty. Otherwise the map is consulted by store.UUID(); on a miss a
// new domain is built with domain.NewDomain and initialized with d.Init inside
// util.RunWithRetry, and it is registered only after that call succeeds.
// dm.mu is held for the whole call.
//
// NOTE(review): the onClose callback handed to the domain calls dm.Delete,
// which also locks dm.mu. If d.Close() invokes that callback synchronously,
// the cleanup on the Init failure path would re-enter the lock held here —
// confirm against domain.Close.
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
	dm.mu.Lock()
	defer dm.mu.Unlock()

	if store == nil {
		// Map iteration order is unspecified, so this hands back any one domain.
		for _, existing := range dm.domains {
			return existing, nil
		}
		return nil, errors.New("can not find available domain for a nil store")
	}

	key := store.UUID()
	if cached := dm.domains[key]; cached != nil {
		return cached, nil
	}

	// Snapshot the lease settings once so every retry uses the same values.
	ddlTimeout := time.Duration(atomic.LoadInt64(&schemaLease))
	statsTimeout := time.Duration(atomic.LoadInt64(&statsLease))
	usageSyncTimeout := GetIndexUsageSyncLease()
	replayerGCTimeout := GetPlanReplayerGCLease()

	// createAndInit builds a fresh domain and initializes it. It always asks
	// for a retry; the error it returns tells the caller whether this attempt
	// succeeded.
	createAndInit := func() (bool, error) {
		logutil.BgLogger().Info("new domain",
			zap.String("store", store.UUID()),
			zap.Stringer("ddl lease", ddlTimeout),
			zap.Stringer("stats lease", statsTimeout),
			zap.Stringer("index usage sync lease", usageSyncTimeout))

		sessionFactory := createSessionFunc(store)
		sysSessionFactory := createSessionWithDomainFunc(store)
		removeFromMap := func() {
			dm.Delete(store)
		}
		d = domain.NewDomain(store, ddlTimeout, statsTimeout, usageSyncTimeout, replayerGCTimeout, sessionFactory, removeFromMap)

		// Stores that implement StorageDDLInjector supply a DDL injector;
		// for all others the injector stays nil.
		var injectDDL func(ddl.DDL) *schematracker.Checker
		if hook, ok := store.(schematracker.StorageDDLInjector); ok {
			injectDDL = hook.Injector
		}

		initErr := d.Init(ddlTimeout, sysSessionFactory, injectDDL)
		if initErr == nil {
			return true, nil
		}
		// If we don't clean it, there are some dirty data when retrying the function of Init.
		d.Close()
		logutil.BgLogger().Error("[ddl] init domain failed",
			zap.Error(initErr))
		return true, initErr
	}

	if err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, createAndInit); err != nil {
		return nil, err
	}

	dm.domains[key] = d
	return d, nil
}

// Delete removes the domain registered for store (keyed by store.UUID()) from
// the map. Deleting a store that has no entry is a no-op.
//
// The unlock is deferred, matching Get, so dm.mu is released even if
// store.UUID() panics.
func (dm *domainMap) Delete(store kv.Storage) {
	dm.mu.Lock()
	defer dm.mu.Unlock()
	delete(dm.domains, store.UUID())
}

var (
	// domap is the package-level registry of domains, keyed by store UUID.
	domap = &domainMap{
		domains: map[string]*domain.Domain{},
	}
	// storeBootstrapped maps store.UUID() to whether that store has been
	// bootstrapped. It is guarded by storeBootstrappedLock.
	storeBootstrapped     = make(map[string]bool)
	storeBootstrappedLock sync.Mutex

	// schemaLease is the time for re-updating remote schema, stored as an
	// int64 so it can be read and written atomically.
	// In online DDL, we must wait 2 * SchemaLease time to guarantee
	// all servers get the newest schema.
	// Default schema lease time is 1 second, you can change it to a proper time,
	// but you must know that too small a value may cause severe performance degradation.
	// For production, you should set a big schema lease, like 300s+.
	schemaLease = int64(1 * time.Second)

	// statsLease is the time for reloading the stats table.
	statsLease = int64(3 * time.Second)

	// indexUsageSyncLease is the time for index usage synchronization.
	// Because we have not completed GC and other functions, we set it to 0.
	// TODO: Set indexUsageSyncLease to 60s.
	indexUsageSyncLease = int64(0 * time.Second)

	// planReplayerGCLease is the time for plan replayer gc.
	planReplayerGCLease = int64(10 * time.Minute)
)

// ResetStoreForWithTiKVTest removes the domain registered for store from domap
// and clears the store's entry in storeBootstrapped. It is only used in the
// test code.
// TODO: Remove domap and storeBootstrapped. Use store.SetOption() to do it.
func ResetStoreForWithTiKVTest(store kv.Storage) {
	domap.Delete(store)

	storeUUID := store.UUID()
	unsetStoreBootstrapped(storeUUID)
}

// setStoreBootstrapped records storeUUID as bootstrapped in the
// storeBootstrapped map while holding storeBootstrappedLock.
func setStoreBootstrapped(storeUUID string) {
	storeBootstrappedLock.Lock()
	defer storeBootstrappedLock.Unlock()
	storeBootstrapped[storeUUID] = true
}

// unsetStoreBootstrapped deletes the store UUID from the storeBootstrapped map
// while holding storeBootstrappedLock.
// Currently this function is only used for tests.
func unsetStoreBootstrapped(storeUUID string) {
	storeBootstrappedLock.Lock()
	defer storeBootstrappedLock.Unlock()
	delete(storeBootstrapped, storeUUID)
}

// SetSchemaLease atomically overrides the default schema lease time for DDL.
// This function is very dangerous; don't use it unless you really know what
// you are doing.
// SetSchemaLease only affects non-local storage after bootstrapped.
func SetSchemaLease(lease time.Duration) {
	nanos := int64(lease)
	atomic.StoreInt64(&schemaLease, nanos)
}

// SetStatsLease atomically overrides the default stats lease time used for
// loading stats info.
func SetStatsLease(lease time.Duration) {
	nanos := int64(lease)
	atomic.StoreInt64(&statsLease, nanos)
}

// SetIndexUsageSyncLease atomically overrides the default index usage sync
// lease time.
func SetIndexUsageSyncLease(lease time.Duration) {
	nanos := int64(lease)
	atomic.StoreInt64(&indexUsageSyncLease, nanos)
}

// GetIndexUsageSyncLease atomically reads the index usage sync lease and
// returns it as a time.Duration.
func GetIndexUsageSyncLease() time.Duration {
	nanos := atomic.LoadInt64(&indexUsageSyncLease)
	return time.Duration(nanos)
}

// SetPlanReplayerGCLease atomically overrides the default plan replayer gc
// lease time.
func SetPlanReplayerGCLease(lease time.Duration) {
	nanos := int64(lease)
	atomic.StoreInt64(&planReplayerGCLease, nanos)
}

// GetPlanReplayerGCLease atomically reads the plan replayer gc lease and
// returns it as a time.Duration.
func GetPlanReplayerGCLease() time.Duration {
	nanos := atomic.LoadInt64(&planReplayerGCLease)
	return time.Duration(nanos)
}

// DisableStats4Test disables the stats for tests by setting the stats lease
// to -1 through SetStatsLease.
func DisableStats4Test() {
	SetStatsLease(-1)
}

// Parse parses the query string src into raw ast.StmtNode values.
//
// The parser is configured from the session variables of ctx: its parser
// config, SQL mode and parse params. Every warning reported by the parser is
// appended to the session's statement context, whether or not parsing
// succeeds. On failure the error is logged at warn level and returned together
// with a nil statement slice.
func Parse(ctx sessionctx.Context, src string) ([]ast.StmtNode, error) {
	logutil.BgLogger().Debug("compiling", zap.String("source", src))

	vars := ctx.GetSessionVars()
	sqlParser := parser.New()
	sqlParser.SetParserConfig(vars.BuildParserConfig())
	sqlParser.SetSQLMode(vars.SQLMode)

	nodes, warnings, parseErr := sqlParser.ParseSQL(src, vars.GetParseParams()...)
	for i := range warnings {
		vars.StmtCtx.AppendWarning(warnings[i])
	}
	if parseErr == nil {
		return nodes, nil
	}

	logutil.BgLogger().Warn("compiling",
		zap.String("source", src),
		zap.Error(parseErr))
	return nil, parseErr
}

// recordAbortTxnDuration observes how long the current transaction has lived,
// in seconds since sessVars.TxnCtx.CreateTime, on the abort-duration metric
// that matches the transaction mode (pessimistic or optimistic).
func recordAbortTxnDuration(sessVars *variable.SessionVars) {
	elapsedSeconds := time.Since(sessVars.TxnCtx.CreateTime).Seconds()
	if !sessVars.TxnCtx.IsPessimistic {
		transactionDurationOptimisticAbort.Observe(elapsedSeconds)
		return
	}
	transactionDurationPessimisticAbort.Observe(elapsedSeconds)
}

// finishStmt performs the end-of-statement bookkeeping for sql on session se.
//
// meetsErr is the error, if any, produced while executing the statement.
// For a statement that is not read-only, the statement is added to the
// transaction's history when it succeeded and the transaction could retry, and
// then, if the transaction is valid, se.StmtRollback is called on error or
// se.StmtCommit otherwise. Afterwards autoCommitAfterStmt runs, a transaction
// that is still pending is reset to invalid, and the statement count limit is
// checked.
//
// It returns the error from autoCommitAfterStmt if there is one, otherwise the
// result of checkStmtLimit.
func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error {
	sessVars := se.sessionVars
	if !sql.IsReadOnly(sessVars) {
		// All the history should be added here.
		if meetsErr == nil && sessVars.TxnCtx.CouldRetry {
			GetHistory(se).Add(sql, sessVars.StmtCtx)
		}

		// Handle the stmt commit/rollback.
		if se.txn.Valid() {
			if meetsErr != nil {
				se.StmtRollback()
			} else {
				se.StmtCommit()
			}
		}
	}
	// The autocommit error is checked only after the pending-txn cleanup below,
	// so that cleanup runs whether or not autocommit failed.
	err := autoCommitAfterStmt(ctx, se, meetsErr, sql)
	if se.txn.pending() {
		// After run statement finish, txn state is still pending means the
		// statement never need a Txn(), such as:
		//
		// set @@tidb_general_log = 1
		// set @@autocommit = 0
		// select 1
		//
		// Reset txn state to invalid to dispose the pending start ts.
		se.txn.changeToInvalid()
	}
	if err != nil {
		return err
	}
	return checkStmtLimit(ctx, se)
}

// autoCommitAfterStmt finishes the transaction after a statement when the
// session is not inside an explicit transaction.
//
// If the statement failed (meetsErr != nil), the transaction is rolled back
// when the session is not in a transaction, or when a valid pessimistic
// transaction hit a deadlock error; in both cases the abort duration is
// recorded. meetsErr is always returned unchanged in this case.
//
// If the statement succeeded and the session is not in a transaction, the
// transaction is committed. A commit error is annotated with the previous
// statement when the statement being finished is a COMMIT. The type assertion
// on sql is checked, so a sqlexec.Statement that is not an *executor.ExecStmt
// gets the plain commit error back instead of causing a panic.
func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error {
	sessVars := se.sessionVars
	if meetsErr != nil {
		if !sessVars.InTxn() {
			logutil.BgLogger().Info("rollbackTxn called due to ddl/autocommit failure")
			se.RollbackTxn(ctx)
			recordAbortTxnDuration(sessVars)
		} else if se.txn.Valid() && se.txn.IsPessimistic() && executor.ErrDeadlock.Equal(meetsErr) {
			logutil.BgLogger().Info("rollbackTxn for deadlock", zap.Uint64("txn", se.txn.StartTS()))
			se.RollbackTxn(ctx)
			recordAbortTxnDuration(sessVars)
		}
		return meetsErr
	}

	// Inside an explicit transaction nothing is committed automatically.
	if sessVars.InTxn() {
		return nil
	}

	err := se.CommitTxn(ctx)
	if err == nil {
		return nil
	}
	if stmt, ok := sql.(*executor.ExecStmt); ok {
		if _, isCommit := stmt.StmtNode.(*ast.CommitStmt); isCommit {
			err = errors.Annotatef(err, "previous statement: %s", se.GetSessionVars().PrevStmt)
		}
	}
	return err
}

// checkStmtLimit enforces Performance.StmtCountLimit on the statement history
// of the current transaction.
//
// If the user insert, insert, insert ... but never commit, TiDB would OOM.
// So we limit the statement count in a transaction here.
//
// While the history count is within the limit it returns nil. Once the limit
// is exceeded and BatchCommit is off, the transaction is rolled back and an
// error is returned. With BatchCommit on, a new transaction is started through
// sessiontxn.NewTxn, the session is marked as in a transaction, and the error
// from NewTxn (if any) is returned.
func checkStmtLimit(ctx context.Context, se *session) error {
	sessVars := se.GetSessionVars()
	history := GetHistory(se)

	limit := int(config.GetGlobalConfig().Performance.StmtCountLimit)
	if history.Count() <= limit {
		return nil
	}

	if !sessVars.BatchCommit {
		se.RollbackTxn(ctx)
		return errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
			history.Count(), sessVars.IsAutocommit())
	}

	newTxnErr := sessiontxn.NewTxn(ctx, se)
	// The transaction does not committed yet, we need to keep it in transaction.
	// The last history could not be "commit"/"rollback" statement.
	// It means it is impossible to start a new transaction at the end of the transaction.
	// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
	sessVars.SetInTxn(true)
	return newTxnErr
}

// GetHistory returns the *StmtHistory stored in the current transaction
// context of ctx. If TxnCtx.History does not hold a *StmtHistory, a new empty
// one is created, stored there, and returned. Exported only for test.
func GetHistory(ctx sessionctx.Context) *StmtHistory {
	if existing, ok := ctx.GetSessionVars().TxnCtx.History.(*StmtHistory); ok {
		return existing
	}

	fresh := new(StmtHistory)
	ctx.GetSessionVars().TxnCtx.History = fresh
	return fresh
}

// GetRows4Test drains rs and returns every row it produced; only used for
// test.
//
// A nil rs yields (nil, nil). Rows are fetched chunk by chunk with rs.Next
// until an empty chunk comes back; any error from rs.Next aborts the read and
// is returned with a nil slice. sctx is not used by this function.
func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
	if rs == nil {
		return nil, nil
	}

	var collected []chunk.Row
	// Must reuse `req` for imitating server.(*clientConn).writeChunks
	req := rs.NewChunk(nil)
	for {
		if err := rs.Next(ctx, req); err != nil {
			return nil, err
		}
		if req.NumRows() == 0 {
			return collected, nil
		}

		// Rows are taken from a copy of req, presumably so they stay valid
		// once req is refilled by the next rs.Next call.
		it := chunk.NewIterator4Chunk(req.CopyConstruct())
		for r := it.Begin(); r != it.End(); r = it.Next() {
			collected = append(collected, r)
		}
	}
}

// ResultSetToStringSlice reads all rows from rs, closes it, and converts the
// rows to [][]string.
//
// A nil rs yields (nil, nil), mirroring GetRows4Test; without this guard the
// rs.Close() call below would panic on a nil interface. NULL values are
// rendered as "<nil>"; every other value is converted with Datum.ToString
// using the field type of its column. An error from reading rows, closing rs,
// or converting a value is returned with a nil result.
//
// NOTE(review): when GetRows4Test fails, rs is not closed here — confirm that
// callers close it themselves on that path.
func ResultSetToStringSlice(ctx context.Context, s Session, rs sqlexec.RecordSet) ([][]string, error) {
	if rs == nil {
		return nil, nil
	}
	rows, err := GetRows4Test(ctx, s, rs)
	if err != nil {
		return nil, err
	}
	err = rs.Close()
	if err != nil {
		return nil, err
	}
	sRows := make([][]string, len(rows))
	for i := range rows {
		row := rows[i]
		iRow := make([]string, row.Len())
		for j := 0; j < row.Len(); j++ {
			if row.IsNull(j) {
				iRow[j] = "<nil>"
				continue
			}
			d := row.GetDatum(j, &rs.Fields()[j].Column.FieldType)
			iRow[j], err = d.ToString()
			if err != nil {
				return nil, err
			}
		}
		sRows[i] = iRow
	}
	return sRows, nil
}

// Session errors.
var (
	// ErrForUpdateCantRetry is the session-class error built from
	// errno.ErrForUpdateCantRetry.
	ErrForUpdateCantRetry = dbterror.ClassSession.NewStd(errno.ErrForUpdateCantRetry)
)

相关信息

tidb 源码目录

相关文章

tidb advisory_locks 源码

tidb bootstrap 源码

tidb nontransactional 源码

tidb schema_amender 源码

tidb session 源码

tidb txn 源码

tidb txnmanager 源码

0  赞