tidb backend 源码

  • 2022-09-19
  • 浏览 (504)

tidb backend 代码

文件路径:/br/pkg/lightning/backend/backend.go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"context"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/lightning/metric"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/table"
	"go.uber.org/zap"
	"golang.org/x/exp/slices"
)

const (
	// importMaxRetryTimes is the maximum number of ImportEngine attempts made
	// by ClosedEngine.Import before it gives up.
	importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

/*

Usual workflow:

1. Create a `Backend` for the whole process.

2. For each table,

	i. Split into multiple "batches" consisting of data files with roughly equal total size.

	ii. For each batch,

		a. Create an `OpenedEngine` via `backend.OpenEngine()`

		b. For each chunk, obtain a writer via `engine.LocalWriter()` and deliver data into the engine via `writer.WriteRows()`

		c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`

		d. Import data via `engine.Import()`

		e. Cleanup via `engine.Cleanup()`

3. Close the connection via `backend.Close()`

*/

// makeTag builds the engine tag, formatted as "<tableName>:<engineID>".
func makeTag(tableName string, engineID int32) string {
	const layout = "%s:%d"
	tag := fmt.Sprintf(layout, tableName, engineID)
	return tag
}

// makeLogger derives a child logger from logger that carries the engine tag
// and the engine UUID as the fields "engineTag" and "engineUUID".
func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger {
	fields := []zap.Field{
		zap.String("engineTag", tag),
		zap.Stringer("engineUUID", engineUUID),
	}
	return logger.With(fields...)
}

// MakeUUID returns the tag of the engine identified by tableName and engineID,
// together with the SHA-1 name-based UUID computed from that tag inside
// engineNamespace.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
	tag := makeTag(tableName, engineID)
	return tag, uuid.NewSHA1(engineNamespace, []byte(tag))
}

// engineNamespace is the namespace UUID passed to uuid.NewSHA1 when MakeUUID
// turns an engine tag into an engine UUID.
var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")

// EngineFileSize describes the local size of a single engine. It is the
// element type returned by AbstractBackend.EngineFileSizes and consumed by
// Backend.CheckDiskQuota.
type EngineFileSize struct {
	// UUID is the engine's UUID.
	UUID uuid.UUID
	// DiskSize is the estimated total file size on disk right now.
	DiskSize int64
	// MemSize is the total memory size used by the engine. This is the
	// estimated additional size saved onto disk after calling Flush().
	MemSize int64
	// IsImporting indicates whether the engine is currently performing Import().
	IsImporting bool
}

// LocalWriterConfig defines the configuration used to open a LocalWriter.
type LocalWriterConfig struct {
	// IsKVSorted tells whether the chunk KV pairs written to this LocalWriter
	// are sent in order.
	IsKVSorted bool
}

// EngineConfig defines the configuration used when opening or closing an
// engine.
type EngineConfig struct {
	// TableInfo is the corresponding TiDB table info.
	TableInfo *checkpoints.TidbTableInfo
	// Local holds the configuration specific to the local backend.
	Local *LocalEngineConfig
}

// LocalEngineConfig is the configuration used for local backend in OpenEngine.
type LocalEngineConfig struct {
	// Compact enables compacting small SSTs before they are ingested into pebble.
	Compact bool
	// CompactThreshold is the raw KV size threshold that triggers a compaction.
	CompactThreshold int64
	// CompactConcurrency is the concurrency of the compaction routine.
	CompactConcurrency int
}

// CheckCtx contains all parameters used in CheckRequirements.
type CheckCtx struct {
	// DBMetas is the list of mydump database metadata handed to the check.
	// NOTE(review): presumably the databases discovered in the data source —
	// confirm against the callers.
	DBMetas []*mydump.MDDatabaseMeta
}

// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
	// FetchRemoteTableModels obtains the models of all tables given the schema
	// name. The returned table info does not need to be precise if the encoder
	// does not require it, but must at least fill in the following fields for
	// TablesFromMeta to succeed:
	//  - Name
	//  - State (must be model.StatePublic)
	//  - ID
	//  - Columns
	//     * Name
	//     * State (must be model.StatePublic)
	//     * Offset (must be 0, 1, 2, ...)
	//  - PKIsHandle (true = do not generate _tidb_rowid)
	FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

	// CheckRequirements checks whether the backend satisfies the version
	// requirements.
	CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}

// EncodingBuilder consists of the operations needed to encode source rows into
// the row data format of a backend.
type EncodingBuilder interface {
	// NewEncoder creates an encoder of a TiDB table.
	NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
	// MakeEmptyRows creates an empty collection of encoded rows.
	MakeEmptyRows() kv.Rows
}

// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
	EncodingBuilder
	TargetInfoGetter
	// Close the connection to the backend.
	Close()

	// RetryImportDelay returns the duration to sleep when retrying an import.
	RetryImportDelay() time.Duration

	// ShouldPostProcess returns whether KV-specific post-processing should be
	// performed for this backend. Post-processing includes checksum and analyze.
	ShouldPostProcess() bool

	// OpenEngine opens the engine identified by engineUUID with the given
	// configuration.
	OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

	// CloseEngine closes the engine identified by engineUUID to prepare it for
	// importing.
	CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

	// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
	// it means a duplicate has been detected. In this situation, all data in the engine must
	// still be imported. It's safe to reset or cleanup this engine.
	ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error

	// CleanupEngine deletes the intermediate data of the engine identified by
	// engineUUID.
	CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

	// FlushEngine ensures all KV pairs written to an open engine have been
	// synchronized, such that kill-9'ing Lightning afterwards and resuming from
	// checkpoint can recover the exact same content.
	//
	// This method is only relevant for local backend, and is no-op for all
	// other backends.
	FlushEngine(ctx context.Context, engineUUID uuid.UUID) error

	// FlushAllEngines performs FlushEngine on all opened engines. This is a
	// very expensive operation and should only be used in some rare situation
	// (e.g. preparing to resolve a disk quota violation).
	FlushAllEngines(ctx context.Context) error

	// EngineFileSizes obtains the size occupied locally of all engines managed
	// by this backend. This method is used to compute disk quota.
	// It can return nil if the contents are all stored remotely.
	EngineFileSizes() []EngineFileSize

	// ResetEngine clears all written KV pairs in this opened engine.
	ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

	// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
	LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)

	// CollectLocalDuplicateRows collects duplicate keys from the local db. The stored duplicate
	// keys are those which may be repeated with other keys in the local data source.
	CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)

	// CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage. These keys may
	// duplicate the data imported by other Lightning instances.
	CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)

	// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
	// according to the required algorithm.
	ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error

	// TotalMemoryConsume counts total memory usage. This is only used for local backend.
	TotalMemoryConsume() int64
}

// Backend is the delivery target for Lightning. It is a thin wrapper whose
// methods forward to the AbstractBackend it holds.
type Backend struct {
	// abstract is the concrete backend implementation all calls are forwarded to.
	abstract AbstractBackend
}

// engine is the state shared by OpenedEngine and ClosedEngine: the backend
// that owns the engine, a logger annotated with the engine's identity, and
// the engine's UUID.
type engine struct {
	// backend is the implementation every engine operation is issued against.
	backend AbstractBackend
	// logger is the logger used for messages about this engine.
	logger log.Logger
	// uuid identifies this engine inside the backend.
	uuid uuid.UUID
}

// OpenedEngine is an opened engine, allowing data to be written through the
// writers returned by LocalWriter.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type OpenedEngine struct {
	engine
	// tableName is the table name given to OpenEngine; it is passed on to
	// every LocalEngineWriter created from this engine.
	tableName string
}

// // import_ the data written to the engine into the target.
// import_(ctx context.Context) error

// // cleanup deletes the imported data.
// cleanup(ctx context.Context) error

// ClosedEngine represents a closed engine, allowing ingestion into the target
// via Import and removal of its intermediate data via Cleanup.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
	engine
}

// LocalEngineWriter wraps an EngineWriter together with the table name that
// is supplied on every AppendRows call made through WriteRows.
type LocalEngineWriter struct {
	// writer is the underlying writer obtained from the backend.
	writer EngineWriter
	// tableName is forwarded to writer.AppendRows by WriteRows.
	tableName string
}

// MakeBackend wraps the given AbstractBackend into a Backend.
func MakeBackend(ab AbstractBackend) Backend {
	var be Backend
	be.abstract = ab
	return be
}

// Close closes the wrapped backend.
func (be Backend) Close() {
	impl := be.abstract
	impl.Close()
}

// MakeEmptyRows asks the wrapped backend for an empty collection of encoded
// rows.
func (be Backend) MakeEmptyRows() kv.Rows {
	emptyRows := be.abstract.MakeEmptyRows()
	return emptyRows
}

// NewEncoder asks the wrapped backend to create an encoder for tbl using the
// given session options.
func (be Backend) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
	encoder, err := be.abstract.NewEncoder(ctx, tbl, options)
	return encoder, err
}

// ShouldPostProcess reports whether the wrapped backend wants KV-specific
// post-processing to be performed.
func (be Backend) ShouldPostProcess() bool {
	needed := be.abstract.ShouldPostProcess()
	return needed
}

// CheckRequirements forwards the requirement check to the wrapped backend and
// returns its error.
func (be Backend) CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error {
	err := be.abstract.CheckRequirements(ctx, checkCtx)
	return err
}

// FetchRemoteTableModels asks the wrapped backend for the table models of the
// schema named schemaName.
func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
	tables, err := be.abstract.FetchRemoteTableModels(ctx, schemaName)
	return tables, err
}

// FlushAll calls FlushAllEngines on the wrapped backend.
func (be Backend) FlushAll(ctx context.Context) error {
	err := be.abstract.FlushAllEngines(ctx)
	return err
}

// TotalMemoryConsume returns the total memory usage reported by the wrapped
// backend.
func (be Backend) TotalMemoryConsume() int64 {
	consumed := be.abstract.TotalMemoryConsume()
	return consumed
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
//
// Engines are visited with the importing ones first and, within each group,
// in ascending order of DiskSize+MemSize, while a running total of disk and
// memory size is kept. Every engine visited while that running total is above
// quota is reported: its UUID is appended to largeEngines if it is not
// importing, otherwise it is counted in inProgressLargeEngines.
// totalDiskSize and totalMemSize are the sums over all engines.
func (be Backend) CheckDiskQuota(quota int64) (
	largeEngines []uuid.UUID,
	inProgressLargeEngines int,
	totalDiskSize int64,
	totalMemSize int64,
) {
	engineSizes := be.abstract.EngineFileSizes()
	slices.SortFunc(engineSizes, func(a, b EngineFileSize) bool {
		if a.IsImporting == b.IsImporting {
			return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize
		}
		// exactly one of the two is importing; it sorts first
		return a.IsImporting
	})
	for idx := range engineSizes {
		cur := engineSizes[idx]
		totalDiskSize += cur.DiskSize
		totalMemSize += cur.MemSize
		if totalDiskSize+totalMemSize <= quota {
			continue
		}
		if cur.IsImporting {
			inProgressLargeEngines++
			continue
		}
		largeEngines = append(largeEngines, cur.UUID)
	}
	return largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize
}

// UnsafeImportAndReset forces the backend to import the content of an engine
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
//
// If the import fails its error is returned and the engine is not reset.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
	// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
	// calling UnsafeImportAndReset().
	base := engine{
		backend: be.abstract,
		logger:  makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID),
		uuid:    engineUUID,
	}
	importer := ClosedEngine{engine: base}
	err := importer.Import(ctx, regionSplitSize, regionSplitKeys)
	if err != nil {
		return err
	}
	return be.abstract.ResetEngine(ctx, engineUUID)
}

// OpenEngine opens an engine with the given table name and engine ID.
//
// The engine UUID and tag are derived via MakeUUID. On success the "open"
// engine counter found in ctx (if any) is incremented and an OpenedEngine
// bound to the wrapped backend is returned; on failure the backend's error is
// returned unchanged.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
	engineTag, id := MakeUUID(tableName, engineID)
	engineLogger := makeLogger(log.FromContext(ctx), engineTag, id)

	err := be.abstract.OpenEngine(ctx, config, id)
	if err != nil {
		return nil, err
	}

	if m, ok := metric.FromContext(ctx); ok {
		m.ImporterEngineCounter.WithLabelValues("open").Inc()
	}

	engineLogger.Info("open engine")

	// Test-only hook: panic when the number of engines opened but not yet
	// closed is greater than the injected limit.
	failpoint.Inject("FailIfEngineCountExceeds", func(val failpoint.Value) {
		if m, ok := metric.FromContext(ctx); ok {
			closedCounter := m.ImporterEngineCounter.WithLabelValues("closed")
			openCounter := m.ImporterEngineCounter.WithLabelValues("open")
			openCount := metric.ReadCounter(openCounter)
			closedCount := metric.ReadCounter(closedCounter)

			limit := val.(int)
			if openCount-closedCount > float64(limit) {
				panic(fmt.Sprintf("forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d", openCount, closedCount, limit))
			}
		}
	})

	opened := &OpenedEngine{
		engine: engine{
			backend: be.abstract,
			logger:  engineLogger,
			uuid:    id,
		},
		tableName: tableName,
	}
	return opened, nil
}

// CollectLocalDuplicateRows forwards to the wrapped backend's
// CollectLocalDuplicateRows and returns its results unchanged.
func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
	hasDupe, err := be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
	return hasDupe, err
}

// CollectRemoteDuplicateRows forwards to the wrapped backend's
// CollectRemoteDuplicateRows and returns its results unchanged.
func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
	hasDupe, err := be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts)
	return hasDupe, err
}

// ResolveDuplicateRows forwards to the wrapped backend's ResolveDuplicateRows
// with the given table and resolution algorithm.
func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error {
	err := be.abstract.ResolveDuplicateRows(ctx, tbl, tableName, algorithm)
	return err
}

// Close the opened engine to prepare it for importing.
// When closing succeeds, the "closed" engine counter found in ctx (if any) is
// incremented.
func (engine *OpenedEngine) Close(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
	closed, err := engine.unsafeClose(ctx, cfg)
	if err != nil {
		return closed, err
	}
	if m, ok := metric.FromContext(ctx); ok {
		closedCounter := m.ImporterEngineCounter.WithLabelValues("closed")
		closedCounter.Inc()
	}
	return closed, nil
}

// Flush calls FlushEngine on the backend for this engine, flushing the data
// written so far (relevant for the local backend).
func (engine *OpenedEngine) Flush(ctx context.Context) error {
	id := engine.uuid
	return engine.backend.FlushEngine(ctx, id)
}

// LocalWriter obtains an EngineWriter for this engine from the backend and
// wraps it, together with the engine's table name, into a LocalEngineWriter.
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) {
	inner, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid)
	if err != nil {
		return nil, err
	}
	wrapped := &LocalEngineWriter{
		tableName: engine.tableName,
		writer:    inner,
	}
	return wrapped, nil
}

// TotalMemoryConsume returns the total memory usage reported by the backend
// this engine belongs to.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
	owner := engine.backend
	return owner.TotalMemoryConsume()
}

// WriteRows writes a collection of encoded rows into the engine by appending
// them to the underlying writer under this writer's table name.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
	name := w.tableName
	err := w.writer.AppendRows(ctx, name, columnNames, rows)
	return err
}

// Close closes the underlying writer and returns the flush status it reports.
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
	status, err := w.writer.Close(ctx)
	return status, err
}

// IsSynced returns the IsSynced result of the underlying writer.
func (w *LocalEngineWriter) IsSynced() bool {
	synced := w.writer.IsSynced()
	return synced
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
//
// The engine is identified by the tag and UUID that MakeUUID derives from
// tableName and engineID.
func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
	engineTag, id := MakeUUID(tableName, engineID)
	closed, err := be.UnsafeCloseEngineWithUUID(ctx, cfg, engineTag, id)
	return closed, err
}

// UnsafeCloseEngineWithUUID closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
//
// tag is only used to annotate the logger of the resulting engine.
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID) (*ClosedEngine, error) {
	en := engine{
		backend: be.abstract,
		logger:  makeLogger(log.FromContext(ctx), tag, engineUUID),
		uuid:    engineUUID,
	}
	return en.unsafeClose(ctx, cfg)
}

// unsafeClose asks the backend to close this engine, logging the operation as
// an "engine close" task. It returns a ClosedEngine sharing this engine's
// state on success, or nil and the backend's error on failure.
func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
	closeTask := en.logger.Begin(zap.InfoLevel, "engine close")
	closeErr := en.backend.CloseEngine(ctx, cfg, en.uuid)
	closeTask.End(zap.ErrorLevel, closeErr)
	if closeErr == nil {
		return &ClosedEngine{engine: en}, nil
	}
	return nil, closeErr
}

// Import the data written to the engine into the target.
//
// ImportEngine is attempted up to importMaxRetryTimes times. A nil or
// non-retryable result is returned immediately. Between two attempts the
// method waits for backend.RetryImportDelay(); the wait is aborted, and
// ctx.Err() is returned, as soon as ctx is done. No delay is spent after the
// final attempt. When every attempt fails with a retryable error, the last
// error is returned annotated with the engine UUID and the retry limit.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
	var err error

	for i := 0; i < importMaxRetryTimes; i++ {
		task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
		err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
		if !common.IsRetryableError(err) {
			task.End(zap.ErrorLevel, err)
			return err
		}
		if i == importMaxRetryTimes-1 {
			// Out of attempts: there is nothing left to wait for, so finish
			// the task with the error instead of announcing another retry.
			task.End(zap.ErrorLevel, err)
			break
		}
		task.Warn("import spuriously failed, going to retry again", log.ShortError(err))

		// Wait for the retry delay, but give up early when the caller cancels.
		timer := time.NewTimer(engine.backend.RetryImportDelay())
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}

	return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, importMaxRetryTimes)
}

// Cleanup deletes the intermediate data from target by calling CleanupEngine
// on the backend, logging the operation as a "cleanup" task.
func (engine *ClosedEngine) Cleanup(ctx context.Context) error {
	cleanupTask := engine.logger.Begin(zap.InfoLevel, "cleanup")
	cleanupErr := engine.backend.CleanupEngine(ctx, engine.uuid)
	cleanupTask.End(zap.WarnLevel, cleanupErr)
	return cleanupErr
}

// Logger returns the logger attached to this engine.
func (engine *ClosedEngine) Logger() log.Logger {
	engineLogger := engine.logger
	return engineLogger
}

// ChunkFlushStatus is the status value returned by EngineWriter.Close.
type ChunkFlushStatus interface {
	// Flushed presumably reports whether the data written through the closed
	// writer has been flushed — NOTE(review): confirm against the
	// implementations.
	Flushed() bool
}

// EngineWriter is the writer obtained from AbstractBackend.LocalWriter and
// wrapped by LocalEngineWriter.
type EngineWriter interface {
	// AppendRows appends the encoded rows of table tableName, whose columns
	// are named by columnNames, to the engine.
	AppendRows(
		ctx context.Context,
		tableName string,
		columnNames []string,
		rows kv.Rows,
	) error
	// IsSynced presumably reports whether all appended rows have been
	// synchronized — NOTE(review): confirm against the implementations.
	IsSynced() bool
	// Close closes the writer and returns its flush status.
	Close(ctx context.Context) (ChunkFlushStatus, error)
}

// GetEngineUUID returns the UUID of this engine.
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID {
	id := engine.uuid
	return id
}

相关信息

tidb 源码目录

相关文章

tidb bind_cache 源码

tidb bind_record 源码

tidb handle 源码

tidb session_handle 源码

tidb stat 源码

tidb backup 源码

tidb cmd 源码

tidb debug 源码

tidb main 源码

tidb restore 源码

0  赞