tidb conn 源码

  • 2022-09-19
  • 浏览 (556)

tidb conn 代码

文件路径:/dumpling/export/conn.go

// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
	"database/sql"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/utils"
	tcontext "github.com/pingcap/tidb/dumpling/context"
	"go.uber.org/zap"
)

// BaseConn wraps connection instance.
type BaseConn struct {
	DBConn *sql.Conn

	backOffer     backOfferResettable
	rebuildConnFn func(*sql.Conn, bool) (*sql.Conn, error)
}

func newBaseConn(conn *sql.Conn, shouldRetry bool, rebuildConnFn func(*sql.Conn, bool) (*sql.Conn, error)) *BaseConn {
	baseConn := &BaseConn{DBConn: conn}
	baseConn.backOffer = newRebuildConnBackOffer(shouldRetry)
	if shouldRetry {
		baseConn.rebuildConnFn = rebuildConnFn
	}
	return baseConn
}

// QuerySQL defines query statement, and connect to real DB.
func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, handleOneRow func(*sql.Rows) error, reset func(), query string, args ...interface{}) error {
	retryTime := 0
	err := utils.WithRetry(tctx, func() (err error) {
		retryTime++
		if retryTime > 1 && conn.rebuildConnFn != nil {
			conn.DBConn, err = conn.rebuildConnFn(conn.DBConn, false)
			if err != nil {
				return
			}
		}
		err = simpleQueryWithArgs(tctx, conn.DBConn, handleOneRow, query, args...)
		if err != nil {
			tctx.L().Info("cannot execute query", zap.Int("retryTime", retryTime), zap.String("sql", query),
				zap.Any("args", args), zap.Error(err))
			reset()
			return err
		}
		return nil
	}, conn.backOffer)
	conn.backOffer.Reset()
	return err
}

// QuerySQLWithColumns defines query statement, and connect to real DB and get results for special column names
func (conn *BaseConn) QuerySQLWithColumns(tctx *tcontext.Context, columns []string, query string, args ...interface{}) ([][]string, error) {
	retryTime := 0
	var results [][]string
	err := utils.WithRetry(tctx, func() (err error) {
		retryTime++
		if retryTime > 1 && conn.rebuildConnFn != nil {
			conn.DBConn, err = conn.rebuildConnFn(conn.DBConn, false)
			if err != nil {
				return
			}
		}
		rows, err := conn.DBConn.QueryContext(tctx, query, args...)
		if err != nil {
			tctx.L().Info("cannot execute query", zap.Int("retryTime", retryTime), zap.String("sql", query),
				zap.Any("args", args), zap.Error(err))
			return errors.Annotatef(err, "sql: %s", query)
		}
		results, err = GetSpecifiedColumnValuesAndClose(rows, columns...)
		if err != nil {
			tctx.L().Info("cannot execute query", zap.Int("retryTime", retryTime), zap.String("sql", query),
				zap.Any("args", args), zap.Error(err))
			results = nil
			return errors.Annotatef(err, "sql: %s", query)
		}
		return err
	}, conn.backOffer)
	conn.backOffer.Reset()
	return results, err
}

// ExecSQL defines exec statement, and connect to real DB.
func (conn *BaseConn) ExecSQL(tctx *tcontext.Context, canRetryFunc func(sql.Result, error) error, query string, args ...interface{}) error {
	retryTime := 0
	err := utils.WithRetry(tctx, func() (err error) {
		retryTime++
		if retryTime > 1 && conn.rebuildConnFn != nil {
			conn.DBConn, err = conn.rebuildConnFn(conn.DBConn, false)
			if err != nil {
				return
			}
		}
		res, err := conn.DBConn.ExecContext(tctx, query, args...)
		if err = canRetryFunc(res, err); err != nil {
			tctx.L().Info("cannot execute query", zap.Int("retryTime", retryTime), zap.String("sql", query),
				zap.Any("args", args), zap.Error(err))
			return err
		}
		return nil
	}, conn.backOffer)
	conn.backOffer.Reset()
	return err
}

相关信息

tidb 源码目录

相关文章

tidb block_allow_list 源码

tidb config 源码

tidb consistency 源码

tidb dump 源码

tidb http_handler 源码

tidb ir 源码

tidb ir_impl 源码

tidb metadata 源码

tidb metrics 源码

tidb prepare 源码

0  赞