// Source: br/pkg/lightning/backend/backend.go (TiDB Lightning backend).
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
const (
	// importMaxRetryTimes caps the retries in ClosedEngine.Import.
	// tikv-importer has done retry internally, so we don't retry many times here.
	importMaxRetryTimes = 3
)
/*
Usual workflow:
1. Create a `Backend` for the whole process.
2. For each table,
i. Split into multiple "batches" consisting of data files with roughly equal total size.
ii. For each batch,
a. Create an `OpenedEngine` via `backend.OpenEngine()`
b. For each chunk, deliver data into the engine via `engine.WriteRows()`
c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`
d. Import data via `engine.Import()`
e. Cleanup via `engine.Cleanup()`
3. Close the connection via `backend.Close()`
*/
// makeTag builds the human-readable engine tag "<tableName>:<engineID>"
// used for logging and for deriving the engine UUID.
func makeTag(tableName string, engineID int32) string {
	return fmt.Sprint(tableName, ":", engineID)
}
// makeLogger returns a child logger annotated with the engine's tag and UUID,
// so every message about this engine can be correlated.
func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger {
	fields := []zap.Field{
		zap.String("engineTag", tag),
		zap.Stringer("engineUUID", engineUUID),
	}
	return logger.With(fields...)
}
// MakeUUID derives the engine tag and a deterministic (SHA1, namespaced)
// UUID from the table name and engine ID. The same inputs always produce
// the same UUID, which is what allows resuming from a checkpoint.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
	tag := makeTag(tableName, engineID)
	return tag, uuid.NewSHA1(engineNamespace, []byte(tag))
}
// engineNamespace is the fixed UUID namespace used by MakeUUID to derive
// deterministic engine UUIDs via uuid.NewSHA1.
var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")
// EngineFileSize describes the disk/memory footprint of a single engine,
// as reported by AbstractBackend.EngineFileSizes for disk-quota accounting.
type EngineFileSize struct {
	// UUID is the engine's UUID.
	UUID uuid.UUID
	// DiskSize is the estimated total file size on disk right now.
	DiskSize int64
	// MemSize is the total memory size used by the engine. This is the
	// estimated additional size saved onto disk after calling Flush().
	MemSize int64
	// IsImporting indicates whether the engine is currently performing Import().
	IsImporting bool
}
// LocalWriterConfig defines the configuration to open a LocalWriter.
type LocalWriterConfig struct {
	// IsKVSorted reports whether the chunk KV written to this LocalWriter
	// is sent in sorted order.
	IsKVSorted bool
}
// EngineConfig defines the configuration used for opening an engine.
type EngineConfig struct {
	// TableInfo is the corresponding tidb table info.
	TableInfo *checkpoints.TidbTableInfo
	// Local holds local-backend-specific configuration; nil for other backends.
	Local *LocalEngineConfig
}
// LocalEngineConfig is the configuration used for local backend in OpenEngine.
type LocalEngineConfig struct {
	// Compact enables compacting small SSTs before ingesting into pebble.
	Compact bool
	// CompactThreshold is the raw KV size threshold that triggers a compaction.
	CompactThreshold int64
	// CompactConcurrency is the number of concurrent compaction routines.
	CompactConcurrency int
}
// CheckCtx contains all parameters used in CheckRequirements.
type CheckCtx struct {
	// DBMetas describes the databases/tables to be imported.
	DBMetas []*mydump.MDDatabaseMeta
}
// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
	// FetchRemoteTableModels obtains the models of all tables given the schema
	// name. The returned table info does not need to be precise if the encoder
	// does not require it, but must at least fill in the following fields for
	// TablesFromMeta to succeed:
	//  - Name
	//  - State (must be model.StatePublic)
	//  - ID
	//  - Columns
	//     * Name
	//     * State (must be model.StatePublic)
	//     * Offset (must be 0, 1, 2, ...)
	//  - PKIsHandle (true = do not generate _tidb_rowid)
	FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
	// CheckRequirements performs the check whether the backend satisfies the
	// version requirements.
	CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}
// EncodingBuilder consists of operations to handle encoding backend row data
// formats from source.
type EncodingBuilder interface {
	// NewEncoder creates an encoder of a TiDB table.
	NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
	// MakeEmptyRows creates an empty collection of encoded rows.
	MakeEmptyRows() kv.Rows
}
// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
	EncodingBuilder
	TargetInfoGetter
	// Close the connection to the backend.
	Close()
	// RetryImportDelay returns the duration to sleep when retrying an import.
	RetryImportDelay() time.Duration
	// ShouldPostProcess returns whether KV-specific post-processing should be
	// performed for this backend. Post-processing includes checksum and analyze.
	ShouldPostProcess() bool
	// OpenEngine creates (or reopens) the engine identified by engineUUID.
	OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
	// CloseEngine seals the engine so it becomes ready for import.
	CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
	// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
	// it means duplicates were detected. In that situation, all data in the engine
	// has already been imported and it is safe to reset or clean up this engine.
	ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
	// CleanupEngine deletes the intermediate data of an engine.
	CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
	// FlushEngine ensures all KV pairs written to an open engine have been
	// synchronized, such that kill-9'ing Lightning afterwards and resuming from
	// checkpoint can recover the exact same content.
	//
	// This method is only relevant for local backend, and is no-op for all
	// other backends.
	FlushEngine(ctx context.Context, engineUUID uuid.UUID) error
	// FlushAllEngines performs FlushEngine on all opened engines. This is a
	// very expensive operation and should only be used in some rare situation
	// (e.g. preparing to resolve a disk quota violation).
	FlushAllEngines(ctx context.Context) error
	// EngineFileSizes obtains the size occupied locally of all engines managed
	// by this backend. This method is used to compute disk quota.
	// It can return nil if the content is all stored remotely.
	EngineFileSizes() []EngineFileSize
	// ResetEngine clears all written KV pairs in this opened engine.
	ResetEngine(ctx context.Context, engineUUID uuid.UUID) error
	// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
	LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)
	// CollectLocalDuplicateRows collects duplicate keys from the local db, i.e.
	// keys that may be repeated within the local data source itself.
	CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)
	// CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage.
	// These keys may conflict with data imported by other Lightning instances.
	CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)
	// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
	// according to the required algorithm.
	ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error
	// TotalMemoryConsume counts total memory usage. This is only used for local backend.
	TotalMemoryConsume() int64
}
// Backend is the delivery target for Lightning. It is a thin value wrapper
// around an AbstractBackend implementation.
type Backend struct {
	abstract AbstractBackend
}
// engine bundles the state shared by OpenedEngine and ClosedEngine: the
// owning backend, a logger pre-tagged with the engine identity, and the
// engine UUID.
type engine struct {
	backend AbstractBackend
	logger  log.Logger
	uuid    uuid.UUID
}
// OpenedEngine is an opened engine, allowing data to be written via WriteRows.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type OpenedEngine struct {
	engine
	tableName string
}
// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
	engine
}
// LocalEngineWriter wraps an EngineWriter together with the target table
// name, so callers can append rows without re-supplying the table.
type LocalEngineWriter struct {
	writer    EngineWriter
	tableName string
}
// MakeBackend wraps an AbstractBackend implementation into a Backend.
func MakeBackend(ab AbstractBackend) Backend {
	return Backend{abstract: ab}
}
// Close closes the connection to the underlying backend.
func (be Backend) Close() {
	be.abstract.Close()
}
// MakeEmptyRows creates an empty collection of encoded rows.
func (be Backend) MakeEmptyRows() kv.Rows {
	return be.abstract.MakeEmptyRows()
}
// NewEncoder creates an encoder of a TiDB table.
func (be Backend) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
	return be.abstract.NewEncoder(ctx, tbl, options)
}
// ShouldPostProcess returns whether KV-specific post-processing (checksum,
// analyze) should be performed for this backend.
func (be Backend) ShouldPostProcess() bool {
	return be.abstract.ShouldPostProcess()
}
// CheckRequirements performs the check whether the backend satisfies the
// version requirements.
func (be Backend) CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error {
	return be.abstract.CheckRequirements(ctx, checkCtx)
}
// FetchRemoteTableModels obtains the models of all tables given the schema
// name. See TargetInfoGetter.FetchRemoteTableModels for the contract.
func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
	return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}
// FlushAll flushes all opened engines. This is a very expensive operation;
// see AbstractBackend.FlushAllEngines.
func (be Backend) FlushAll(ctx context.Context) error {
	return be.abstract.FlushAllEngines(ctx)
}
// TotalMemoryConsume counts total memory usage. This is only used for the
// local backend.
func (be Backend) TotalMemoryConsume() int64 {
	return be.abstract.TotalMemoryConsume()
}
// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
func (be Backend) CheckDiskQuota(quota int64) (
	largeEngines []uuid.UUID,
	inProgressLargeEngines int,
	totalDiskSize int64,
	totalMemSize int64,
) {
	engineSizes := be.abstract.EngineFileSizes()
	// Order engines so that the ones already importing come first, and within
	// each group the smaller footprints come first. Walking this order means
	// the engines reported over-quota are the largest non-importing ones.
	slices.SortFunc(engineSizes, func(a, b EngineFileSize) bool {
		if a.IsImporting != b.IsImporting {
			return a.IsImporting
		}
		return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize
	})
	for _, es := range engineSizes {
		totalDiskSize += es.DiskSize
		totalMemSize += es.MemSize
		if totalDiskSize+totalMemSize <= quota {
			continue
		}
		// This engine pushes the running total past the quota.
		if es.IsImporting {
			inProgressLargeEngines++
			continue
		}
		largeEngines = append(largeEngines, es.UUID)
	}
	return
}
// UnsafeImportAndReset forces the backend to import the content of an engine
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
	// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
	// calling UnsafeImportAndReset().
	// Build a ClosedEngine by hand instead, so Import() can run without the
	// engine ever being sealed.
	closedEngine := ClosedEngine{
		engine: engine{
			backend: be.abstract,
			logger:  makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID),
			uuid:    engineUUID,
		},
	}
	if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
		return err
	}
	// Clear the imported KV pairs so the engine is empty but still open.
	return be.abstract.ResetEngine(ctx, engineUUID)
}
// OpenEngine opens an engine with the given table name and engine ID.
// The engine UUID is derived deterministically from (tableName, engineID),
// so reopening with the same arguments targets the same engine.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
	tag, engineUUID := MakeUUID(tableName, engineID)
	logger := makeLogger(log.FromContext(ctx), tag, engineUUID)
	if err := be.abstract.OpenEngine(ctx, config, engineUUID); err != nil {
		return nil, err
	}
	// Count opened engines for metrics (paired with the "closed" label
	// incremented in OpenedEngine.Close).
	if m, ok := metric.FromContext(ctx); ok {
		openCounter := m.ImporterEngineCounter.WithLabelValues("open")
		openCounter.Inc()
	}
	logger.Info("open engine")
	// Test-only failpoint: panic when the number of simultaneously open
	// engines (open - closed) exceeds the injected threshold.
	failpoint.Inject("FailIfEngineCountExceeds", func(val failpoint.Value) {
		if m, ok := metric.FromContext(ctx); ok {
			closedCounter := m.ImporterEngineCounter.WithLabelValues("closed")
			openCounter := m.ImporterEngineCounter.WithLabelValues("open")
			openCount := metric.ReadCounter(openCounter)
			closedCount := metric.ReadCounter(closedCounter)
			if injectValue := val.(int); openCount-closedCount > float64(injectValue) {
				panic(fmt.Sprintf("forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d", openCount, closedCount, injectValue))
			}
		}
	})
	return &OpenedEngine{
		engine: engine{
			backend: be.abstract,
			logger:  logger,
			uuid:    engineUUID,
		},
		tableName: tableName,
	}, nil
}
// CollectLocalDuplicateRows collects duplicate keys from the local db.
// See AbstractBackend.CollectLocalDuplicateRows.
func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
	return be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
}
// CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage.
// See AbstractBackend.CollectRemoteDuplicateRows.
func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
	return be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts)
}
// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error {
	return be.abstract.ResolveDuplicateRows(ctx, tbl, tableName, algorithm)
}
// Close seals the opened engine to prepare it for importing, and bumps the
// "closed" engine metric on success.
func (en *OpenedEngine) Close(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
	closed, err := en.unsafeClose(ctx, cfg)
	if err != nil {
		return closed, err
	}
	if m, ok := metric.FromContext(ctx); ok {
		m.ImporterEngineCounter.WithLabelValues("closed").Inc()
	}
	return closed, nil
}
// Flush flushes the currently-written data to disk. Only meaningful for the
// local backend; no-op for others (see AbstractBackend.FlushEngine).
func (engine *OpenedEngine) Flush(ctx context.Context) error {
	return engine.backend.FlushEngine(ctx, engine.uuid)
}
// LocalWriter obtains a thread-local writer for appending rows into this
// engine, bound to the engine's table name.
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) {
	w, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid)
	if err != nil {
		return nil, err
	}
	return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}
// TotalMemoryConsume counts total memory usage of the backing backend.
// This is only used for the local backend.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
	return engine.engine.backend.TotalMemoryConsume()
}
// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
	return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}
// Close closes the underlying writer and returns its flush status.
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
	return w.writer.Close(ctx)
}
// IsSynced reports whether all appended rows have been synchronized,
// delegating to the underlying EngineWriter.
func (w *LocalEngineWriter) IsSynced() bool {
	return w.writer.IsSynced()
}
// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
	tag, engineUUID := MakeUUID(tableName, engineID)
	return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID)
}
// UnsafeCloseEngineWithUUID closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID) (*ClosedEngine, error) {
	return engine{
		backend: be.abstract,
		logger:  makeLogger(log.FromContext(ctx), tag, engineUUID),
		uuid:    engineUUID,
	}.unsafeClose(ctx, cfg)
}
// unsafeClose seals the engine on the backend and wraps it into a
// ClosedEngine, logging the close as a task.
func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
	closeTask := en.logger.Begin(zap.InfoLevel, "engine close")
	closeErr := en.backend.CloseEngine(ctx, cfg, en.uuid)
	closeTask.End(zap.ErrorLevel, closeErr)
	if closeErr == nil {
		return &ClosedEngine{engine: en}, nil
	}
	return nil, closeErr
}
// Import the data written to the engine into the target.
//
// The call is retried up to importMaxRetryTimes times while
// common.IsRetryableError(err) holds; any non-retryable outcome (including
// success, err == nil) returns immediately. When all attempts are exhausted,
// the last error is returned annotated with the engine UUID.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
	var err error
	for i := 0; i < importMaxRetryTimes; i++ {
		task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
		err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
		if !common.IsRetryableError(err) {
			// Success or a fatal error: either way, stop retrying.
			task.End(zap.ErrorLevel, err)
			return err
		}
		task.Warn("import spuriously failed, going to retry again", log.ShortError(err))
		// Skip the back-off sleep after the final attempt: the loop is about
		// to give up, so the delay would only waste wall-clock time.
		if i+1 < importMaxRetryTimes {
			time.Sleep(engine.backend.RetryImportDelay())
		}
	}
	return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, importMaxRetryTimes)
}
// Cleanup deletes the intermediate data from the target, logging the
// operation as a task (failures are logged at Warn level only).
func (en *ClosedEngine) Cleanup(ctx context.Context) error {
	cleanupTask := en.logger.Begin(zap.InfoLevel, "cleanup")
	cleanupErr := en.backend.CleanupEngine(ctx, en.uuid)
	cleanupTask.End(zap.WarnLevel, cleanupErr)
	return cleanupErr
}
// Logger returns the logger pre-tagged with this engine's identity.
func (engine *ClosedEngine) Logger() log.Logger {
	return engine.logger
}
// ChunkFlushStatus reports whether a chunk written through an EngineWriter
// has been flushed (returned by LocalEngineWriter.Close).
type ChunkFlushStatus interface {
	Flushed() bool
}
// EngineWriter is the backend-specific writer used to append encoded rows
// into an engine; obtained via AbstractBackend.LocalWriter.
type EngineWriter interface {
	// AppendRows appends the encoded rows of the named table to the engine.
	AppendRows(
		ctx context.Context,
		tableName string,
		columnNames []string,
		rows kv.Rows,
	) error
	// IsSynced reports whether all appended rows have been synchronized.
	IsSynced() bool
	// Close finishes the writer and returns the chunk's flush status.
	Close(ctx context.Context) (ChunkFlushStatus, error)
}
// GetEngineUUID returns the UUID identifying this engine.
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID {
	return engine.uuid
}
// (removed: unrelated web-page navigation text captured during extraction)