tidb local source code

  • 2022-09-19

tidb local code

File path: /br/pkg/lightning/backend/local/local.go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/coreos/go-semver/semver"
	"github.com/docker/go-units"
	"github.com/google/uuid"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/errorpb"
	sst "github.com/pingcap/kvproto/pkg/import_sstpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
	"github.com/pingcap/tidb/br/pkg/lightning/glue"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/lightning/manual"
	"github.com/pingcap/tidb/br/pkg/lightning/metric"
	"github.com/pingcap/tidb/br/pkg/lightning/tikv"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/br/pkg/membuf"
	"github.com/pingcap/tidb/br/pkg/pdutil"
	"github.com/pingcap/tidb/br/pkg/restore/split"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/br/pkg/version"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/engine"
	"github.com/pingcap/tidb/util/mathutil"
	tikverror "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/oracle"
	tikvclient "github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
	"go.uber.org/atomic"
	"go.uber.org/multierr"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/status"
)

const (
	dialTimeout             = 5 * time.Minute
	maxRetryTimes           = 5
	defaultRetryBackoffTime = 3 * time.Second
	// maxWriteAndIngestRetryTimes is the max retry times for write and ingest.
	// A large retry count is used to tolerate tikv cluster failures.
	maxWriteAndIngestRetryTimes = 30
	maxRetryBackoffTime         = 30 * time.Second

	gRPCKeepAliveTime    = 10 * time.Minute
	gRPCKeepAliveTimeout = 5 * time.Minute
	gRPCBackOffMaxDelay  = 10 * time.Minute

	// The max ranges count in a batch to split and scatter.
	maxBatchSplitRanges = 4096

	propRangeIndex = "tikv.range_index"

	defaultPropSizeIndexDistance = 4 * units.MiB
	defaultPropKeysIndexDistance = 40 * 1024

	// the lower threshold of max open files for pebble db.
	openFilesLowerThreshold = 128

	duplicateDBName = "duplicates"
	scanRegionLimit = 128
)

var (
	// Local backend is compatible with TiDB [4.0.0, NextMajorVersion).
	localMinTiDBVersion = *semver.New("4.0.0")
	localMinTiKVVersion = *semver.New("4.0.0")
	localMinPDVersion   = *semver.New("4.0.0")
	localMaxTiDBVersion = version.NextMajorVersion()
	localMaxTiKVVersion = version.NextMajorVersion()
	localMaxPDVersion   = version.NextMajorVersion()
	tiFlashMinVersion   = *semver.New("4.0.5")

	errorEngineClosed = errors.New("engine is closed")
)

// ImportClientFactory is a factory that creates new import clients for a specific store.
type ImportClientFactory interface {
	Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error)
	Close()
}

type importClientFactoryImpl struct {
	conns          *common.GRPCConns
	splitCli       split.SplitClient
	tls            *common.TLS
	tcpConcurrency int
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
	return &importClientFactoryImpl{
		conns:          common.NewGRPCConns(),
		splitCli:       splitCli,
		tls:            tls,
		tcpConcurrency: tcpConcurrency,
	}
}

func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
	store, err := f.splitCli.GetStore(ctx, storeID)
	if err != nil {
		return nil, errors.Trace(err)
	}
	opt := grpc.WithInsecure()
	if f.tls.TLSConfig() != nil {
		opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
	}
	ctx, cancel := context.WithTimeout(ctx, dialTimeout)

	bfConf := backoff.DefaultConfig
	bfConf.MaxDelay = gRPCBackOffMaxDelay
	// we should use the peer address for tiflash. for tikv, the peer address is empty
	addr := store.GetPeerAddress()
	if addr == "" {
		addr = store.GetAddress()
	}
	conn, err := grpc.DialContext(
		ctx,
		addr,
		opt,
		grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                gRPCKeepAliveTime,
			Timeout:             gRPCKeepAliveTimeout,
			PermitWithoutStream: true,
		}),
	)
	cancel()
	if err != nil {
		return nil, errors.Trace(err)
	}
	return conn, nil
}

func (f *importClientFactoryImpl) getGrpcConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
	return f.conns.GetGrpcConn(ctx, storeID, f.tcpConcurrency,
		func(ctx context.Context) (*grpc.ClientConn, error) {
			return f.makeConn(ctx, storeID)
		})
}

func (f *importClientFactoryImpl) Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
	conn, err := f.getGrpcConn(ctx, storeID)
	if err != nil {
		return nil, err
	}
	return sst.NewImportSSTClient(conn), nil
}

func (f *importClientFactoryImpl) Close() {
	f.conns.Close()
}

// Range records the start and end key for localStoreDir.DB
// so we can write it to tikv in a streaming manner
type Range struct {
	start []byte
	end   []byte
}

type encodingBuilder struct {
	metrics *metric.Metrics
}

// NewEncodingBuilder creates a KVEncodingBuilder with the local backend implementation.
func NewEncodingBuilder(ctx context.Context) backend.EncodingBuilder {
	result := new(encodingBuilder)
	if m, ok := metric.FromContext(ctx); ok {
		result.metrics = m
	}
	return result
}

// NewEncoder creates a KV encoder.
// It implements the `backend.EncodingBuilder` interface.
func (b *encodingBuilder) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
	return kv.NewTableKVEncoder(tbl, options, b.metrics, log.FromContext(ctx))
}

// MakeEmptyRows creates empty KV rows.
// It implements the `backend.EncodingBuilder` interface.
func (b *encodingBuilder) MakeEmptyRows() kv.Rows {
	return kv.MakeRowsFromKvPairs(nil)
}

type targetInfoGetter struct {
	tls          *common.TLS
	targetDBGlue glue.Glue
	pdAddr       string
}

// NewTargetInfoGetter creates a TargetInfoGetter with the local backend implementation.
func NewTargetInfoGetter(tls *common.TLS, g glue.Glue, pdAddr string) backend.TargetInfoGetter {
	return &targetInfoGetter{
		tls:          tls,
		targetDBGlue: g,
		pdAddr:       pdAddr,
	}
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
	return tikv.FetchRemoteTableModelsFromTLS(ctx, g.tls, schemaName)
}

// CheckRequirements checks whether the backend satisfies the version requirements.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *backend.CheckCtx) error {
	// TODO: support lightning via SQL
	db, _ := g.targetDBGlue.GetDB()
	versionStr, err := version.FetchVersion(ctx, db)
	if err != nil {
		return errors.Trace(err)
	}
	if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
		return err
	}
	if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
		return err
	}
	if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
		return err
	}

	serverInfo := version.ParseServerInfo(versionStr)
	return checkTiFlashVersion(ctx, g.targetDBGlue, checkCtx, *serverInfo.ServerVersion)
}

func checkTiDBVersion(_ context.Context, versionStr string, requiredMinVersion, requiredMaxVersion semver.Version) error {
	return version.CheckTiDBVersion(versionStr, requiredMinVersion, requiredMaxVersion)
}

var tiFlashReplicaQuery = "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TIFLASH_REPLICA WHERE REPLICA_COUNT > 0;"

// TiFlashReplicaQueryForTest is only used for tests.
var TiFlashReplicaQueryForTest = tiFlashReplicaQuery

type tblName struct {
	schema string
	name   string
}

type tblNames []tblName

func (t tblNames) String() string {
	var b strings.Builder
	b.WriteByte('[')
	for i, n := range t {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(common.UniqueTable(n.schema, n.name))
	}
	b.WriteByte(']')
	return b.String()
}

// CheckTiFlashVersionForTest is only used for tests.
var CheckTiFlashVersionForTest = checkTiFlashVersion

// checkTiFlashVersion checks TiFlash replicas.
// The local backend doesn't support TiFlash before tidb v4.0.5.
func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.CheckCtx, tidbVersion semver.Version) error {
	if tidbVersion.Compare(tiFlashMinVersion) >= 0 {
		return nil
	}

	res, err := g.GetSQLExecutor().QueryStringsWithLog(ctx, tiFlashReplicaQuery, "fetch tiflash replica info", log.FromContext(ctx))
	if err != nil {
		return errors.Annotate(err, "fetch tiflash replica info failed")
	}

	tiFlashTablesMap := make(map[tblName]struct{}, len(res))
	for _, tblInfo := range res {
		name := tblName{schema: tblInfo[0], name: tblInfo[1]}
		tiFlashTablesMap[name] = struct{}{}
	}

	tiFlashTables := make(tblNames, 0)
	for _, dbMeta := range checkCtx.DBMetas {
		for _, tblMeta := range dbMeta.Tables {
			if len(tblMeta.DataFiles) == 0 {
				continue
			}
			name := tblName{schema: tblMeta.DB, name: tblMeta.Name}
			if _, ok := tiFlashTablesMap[name]; ok {
				tiFlashTables = append(tiFlashTables, name)
			}
		}
	}

	if len(tiFlashTables) > 0 {
		helpInfo := "Please either upgrade TiDB to version >= 4.0.5 or add TiFlash replica after load data."
		return errors.Errorf("lightning local backend doesn't support TiFlash in this TiDB version. conflict tables: %s. "+helpInfo, tiFlashTables)
	}
	return nil
}

// local implements the lightning "local" backend: encoded KV pairs are first
// sorted into per-engine pebble DBs under localStoreDir, then split into ranges
// and written/ingested into TiKV as SST files.
type local struct {
	engines sync.Map // sync version of map[uuid.UUID]*Engine

	pdCtl    *pdutil.PdController
	splitCli split.SplitClient
	tikvCli  *tikvclient.KVStore
	tls      *common.TLS
	pdAddr   string
	g        glue.Glue

	localStoreDir string

	rangeConcurrency  *worker.Pool
	ingestConcurrency *worker.Pool
	batchWriteKVPairs int
	checkpointEnabled bool

	dupeConcurrency int
	maxOpenFiles    int

	engineMemCacheSize      int
	localWriterMemCacheSize int64
	supportMultiIngest      bool

	checkTiKVAvaliable  bool
	duplicateDetection  bool
	duplicateDB         *pebble.DB
	keyAdapter          KeyAdapter
	errorMgr            *errormanager.ErrorManager
	importClientFactory ImportClientFactory

	bufferPool   *membuf.Pool
	metrics      *metric.Metrics
	writeLimiter StoreWriteLimiter
	logger       log.Logger

	encBuilder       backend.EncodingBuilder
	targetInfoGetter backend.TargetInfoGetter
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
	dbPath := filepath.Join(storeDir, duplicateDBName)
	// TODO: Optimize the opts for better write.
	opts := &pebble.Options{
		TablePropertyCollectors: []func() pebble.TablePropertyCollector{
			newRangePropertiesCollector,
		},
	}
	return pebble.Open(dbPath, opts)
}

// NewLocalBackend creates new connections to tikv.
func NewLocalBackend(
	ctx context.Context,
	tls *common.TLS,
	cfg *config.Config,
	g glue.Glue,
	maxOpenFiles int,
	errorMgr *errormanager.ErrorManager,
) (backend.Backend, error) {
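	// Construction steps: create the PD controller and region split client,
	// prepare the sorted-kv directory (reusing it only when checkpoint is enabled),
	// open the duplicate-detection pebble DB if duplicate resolution is configured,
	// build a tikv KVStore that shares the same PD client, and finally probe the
	// cluster for multi-ingest support.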
	localFile := cfg.TikvImporter.SortedKVDir
	rangeConcurrency := cfg.TikvImporter.RangeConcurrency

	pdCtl, err := pdutil.NewPdController(ctx, cfg.TiDB.PdAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
	if err != nil {
		return backend.MakeBackend(nil), common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
	}
	splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

	shouldCreate := true
	if cfg.Checkpoint.Enable {
		if info, err := os.Stat(localFile); err != nil {
			if !os.IsNotExist(err) {
				return backend.MakeBackend(nil), err
			}
		} else if info.IsDir() {
			shouldCreate = false
		}
	}

	if shouldCreate {
		err = os.Mkdir(localFile, 0o700)
		if err != nil {
			return backend.MakeBackend(nil), common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(localFile)
		}
	}

	var duplicateDB *pebble.DB
	if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
		duplicateDB, err = openDuplicateDB(localFile)
		if err != nil {
			return backend.MakeBackend(nil), common.ErrOpenDuplicateDB.Wrap(err).GenWithStackByArgs()
		}
	}

	// The following copies tikv.NewTxnClient without creating yet another pdClient.
	spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(cfg.TiDB.PdAddr, ","), tls.TLSConfig())
	if err != nil {
		return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
	}
	rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()))
	pdCliForTiKV := &tikvclient.CodecPDClient{Client: pdCtl.GetPDClient()}
	tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
	if err != nil {
		return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
	}
	importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency)
	duplicateDetection := cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone
	keyAdapter := KeyAdapter(noopKeyAdapter{})
	if duplicateDetection {
		keyAdapter = dupDetectKeyAdapter{}
	}
	var writeLimiter StoreWriteLimiter
	if cfg.TikvImporter.StoreWriteBWLimit > 0 {
		writeLimiter = newStoreWriteLimiter(int(cfg.TikvImporter.StoreWriteBWLimit))
	} else {
		writeLimiter = noopStoreWriteLimiter{}
	}
	local := &local{
		engines:  sync.Map{},
		pdCtl:    pdCtl,
		splitCli: splitCli,
		tikvCli:  tikvCli,
		tls:      tls,
		pdAddr:   cfg.TiDB.PdAddr,
		g:        g,

		localStoreDir:     localFile,
		rangeConcurrency:  worker.NewPool(ctx, rangeConcurrency, "range"),
		ingestConcurrency: worker.NewPool(ctx, rangeConcurrency*2, "ingest"),
		dupeConcurrency:   rangeConcurrency * 2,
		batchWriteKVPairs: cfg.TikvImporter.SendKVPairs,
		checkpointEnabled: cfg.Checkpoint.Enable,
		maxOpenFiles:      mathutil.Max(maxOpenFiles, openFilesLowerThreshold),

		engineMemCacheSize:      int(cfg.TikvImporter.EngineMemCacheSize),
		localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
		duplicateDetection:      duplicateDetection,
		checkTiKVAvaliable:      cfg.App.CheckRequirements,
		duplicateDB:             duplicateDB,
		keyAdapter:              keyAdapter,
		errorMgr:                errorMgr,
		importClientFactory:     importClientFactory,
		bufferPool:              membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
		writeLimiter:            writeLimiter,
		logger:                  log.FromContext(ctx),
		encBuilder:              NewEncodingBuilder(ctx),
		targetInfoGetter:        NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
	}
	if m, ok := metric.FromContext(ctx); ok {
		local.metrics = m
	}
	if err = local.checkMultiIngestSupport(ctx); err != nil {
		return backend.MakeBackend(nil), common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
	}

	return backend.MakeBackend(local), nil
}

func (local *local) TotalMemoryConsume() int64 {
	var memConsume int64 = 0
	local.engines.Range(func(k, v interface{}) bool {
		e := v.(*Engine)
		if e != nil {
			memConsume += e.TotalMemorySize()
		}
		return true
	})
	return memConsume + local.bufferPool.TotalSize()
}

func (local *local) checkMultiIngestSupport(ctx context.Context) error {
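	// Probe every online TiKV store with an empty MultiIngest request. If any store
	// reports codes.Unimplemented, multi-ingest is disabled. If probing keeps failing
	// and the cluster has no TiFlash store, we also fall back to single ingest
	// instead of returning an error.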
	stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
	if err != nil {
		return errors.Trace(err)
	}

	hasTiFlash := false
	for _, s := range stores {
		if s.State == metapb.StoreState_Up && engine.IsTiFlash(s) {
			hasTiFlash = true
			break
		}
	}

	for _, s := range stores {
		// skip stores that are not online
		if s.State != metapb.StoreState_Up || engine.IsTiFlash(s) {
			continue
		}
		var err error
		for i := 0; i < maxRetryTimes; i++ {
			if i > 0 {
				select {
				case <-time.After(100 * time.Millisecond):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			client, err1 := local.getImportClient(ctx, s.Id)
			if err1 != nil {
				err = err1
				log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
				continue
			}
			_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
			if err == nil {
				break
			}
			if st, ok := status.FromError(err); ok {
				if st.Code() == codes.Unimplemented {
					log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s))
					local.supportMultiIngest = false
					return nil
				}
			}
			log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
				zap.Int("retry", i))
		}
		if err != nil {
			// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
			// so in this condition, downgrade the logic instead of return an error.
			if hasTiFlash {
				return errors.Trace(err)
			}
			log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err))
			local.supportMultiIngest = false
			return nil
		}
	}

	local.supportMultiIngest = true
	log.FromContext(ctx).Info("multi ingest support")
	return nil
}

// rLockEngine read-locks an engine and returns the Engine instance if it exists.
func (local *local) rLockEngine(engineId uuid.UUID) *Engine {
	if e, ok := local.engines.Load(engineId); ok {
		engine := e.(*Engine)
		engine.rLock()
		return engine
	}
	return nil
}

// lockEngine locks an engine and returns the Engine instance if it exists.
func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *Engine {
	if e, ok := local.engines.Load(engineID); ok {
		engine := e.(*Engine)
		engine.lock(state)
		return engine
	}
	return nil
}

// tryRLockAllEngines tries to read lock all engines, return all `Engine`s that are successfully locked.
func (local *local) tryRLockAllEngines() []*Engine {
	var allEngines []*Engine
	local.engines.Range(func(k, v interface{}) bool {
		engine := v.(*Engine)
		// skip closed engine
		if engine.tryRLock() {
			if !engine.closed.Load() {
				allEngines = append(allEngines, engine)
			} else {
				engine.rUnlock()
			}
		}
		return true
	})
	return allEngines
}

// lockAllEnginesUnless tries to lock all engines, except those that are already locked
// in a state covered by ignoreStateMask. Returns the list of locked engines.
func (local *local) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*Engine {
	var allEngines []*Engine
	local.engines.Range(func(k, v interface{}) bool {
		engine := v.(*Engine)
		if engine.lockUnless(newState, ignoreStateMask) {
			allEngines = append(allEngines, engine)
		}
		return true
	})
	return allEngines
}

// Close the local backend.
func (local *local) Close() {
	allEngines := local.lockAllEnginesUnless(importMutexStateClose, 0)
	local.engines = sync.Map{}

	for _, engine := range allEngines {
		_ = engine.Close()
		engine.unlock()
	}

	local.importClientFactory.Close()
	local.bufferPool.Destroy()

	if local.duplicateDB != nil {
		// Check if there are duplicates that are not collected.
		iter := local.duplicateDB.NewIter(&pebble.IterOptions{})
		hasDuplicates := iter.First()
		allIsWell := true
		if err := iter.Error(); err != nil {
			local.logger.Warn("iterate duplicate db failed", zap.Error(err))
			allIsWell = false
		}
		if err := iter.Close(); err != nil {
			local.logger.Warn("close duplicate db iter failed", zap.Error(err))
			allIsWell = false
		}
		if err := local.duplicateDB.Close(); err != nil {
			local.logger.Warn("close duplicate db failed", zap.Error(err))
			allIsWell = false
		}
		// If checkpoint is disabled, or we don't detect any duplicate, then this duplicate
		// db dir will be useless, so we clean up this dir.
		if allIsWell && (!local.checkpointEnabled || !hasDuplicates) {
			if err := os.RemoveAll(filepath.Join(local.localStoreDir, duplicateDBName)); err != nil {
				local.logger.Warn("remove duplicate db file failed", zap.Error(err))
			}
		}
		local.duplicateDB = nil
	}

	// if checkpoint is disabled or we have finished loading all data successfully, then files in this
	// dir will be useless, so we clean up this dir and all files in it.
	if !local.checkpointEnabled || common.IsEmptyDir(local.localStoreDir) {
		err := os.RemoveAll(local.localStoreDir)
		if err != nil {
			local.logger.Warn("remove local db file failed", zap.Error(err))
		}
	}
	_ = local.tikvCli.Close()
	local.pdCtl.Close()
}

// FlushEngine ensures the written data is saved successfully, so that no data is lost after a restart.
func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error {
	engine := local.rLockEngine(engineID)

	// the engine cannot be deleted while we've acquired the lock identified by UUID.
	if engine == nil {
		return errors.Errorf("engine '%s' not found", engineID)
	}
	defer engine.rUnlock()
	if engine.closed.Load() {
		return nil
	}
	return engine.flushEngineWithoutLock(ctx)
}

func (local *local) FlushAllEngines(parentCtx context.Context) (err error) {
	allEngines := local.tryRLockAllEngines()
	defer func() {
		for _, engine := range allEngines {
			engine.rUnlock()
		}
	}()

	eg, ctx := errgroup.WithContext(parentCtx)
	for _, engine := range allEngines {
		e := engine
		eg.Go(func() error {
			return e.flushEngineWithoutLock(ctx)
		})
	}
	return eg.Wait()
}

func (local *local) RetryImportDelay() time.Duration {
	return defaultRetryBackoffTime
}

func (local *local) ShouldPostProcess() bool {
	return true
}

func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.DB, error) {
	opt := &pebble.Options{
		MemTableSize: local.engineMemCacheSize,
		// the default threshold value may cause write stall.
		MemTableStopWritesThreshold: 8,
		MaxConcurrentCompactions:    16,
		// set threshold to half of the max open files to avoid trigger compaction
		L0CompactionThreshold: math.MaxInt32,
		L0StopWritesThreshold: math.MaxInt32,
		LBaseMaxBytes:         16 * units.TiB,
		MaxOpenFiles:          local.maxOpenFiles,
		DisableWAL:            true,
		ReadOnly:              readOnly,
		TablePropertyCollectors: []func() pebble.TablePropertyCollector{
			newRangePropertiesCollector,
		},
	}
	// set the level target file size to avoid pebble automatically triggering compaction that splits ingest SST files into small SSTs.
	opt.Levels = []pebble.LevelOptions{
		{
			TargetFileSize: 16 * units.GiB,
		},
	}

	dbPath := filepath.Join(local.localStoreDir, engineUUID.String())
	db, err := pebble.Open(dbPath, opt)
	return db, errors.Trace(err)
}

// OpenEngine must be called while holding the mutex of the Engine.
func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error {
	engineCfg := backend.LocalEngineConfig{}
	if cfg.Local != nil {
		engineCfg = *cfg.Local
	}
	db, err := local.openEngineDB(engineUUID, false)
	if err != nil {
		return err
	}

	sstDir := engineSSTDir(local.localStoreDir, engineUUID)
	if err := os.RemoveAll(sstDir); err != nil {
		return errors.Trace(err)
	}
	if !common.IsDirExists(sstDir) {
		if err := os.Mkdir(sstDir, 0o750); err != nil {
			return errors.Trace(err)
		}
	}
	engineCtx, cancel := context.WithCancel(ctx)

	e, _ := local.engines.LoadOrStore(engineUUID, &Engine{
		UUID:               engineUUID,
		sstDir:             sstDir,
		sstMetasChan:       make(chan metaOrFlush, 64),
		ctx:                engineCtx,
		cancel:             cancel,
		config:             engineCfg,
		tableInfo:          cfg.TableInfo,
		duplicateDetection: local.duplicateDetection,
		duplicateDB:        local.duplicateDB,
		errorMgr:           local.errorMgr,
		keyAdapter:         local.keyAdapter,
		logger:             log.FromContext(ctx),
	})
	engine := e.(*Engine)
	engine.db = db
	engine.sstIngester = dbSSTIngester{e: engine}
	if err = engine.loadEngineMeta(); err != nil {
		return errors.Trace(err)
	}
	if err = local.allocateTSIfNotExists(ctx, engine); err != nil {
		return errors.Trace(err)
	}
	engine.wg.Add(1)
	go engine.ingestSSTLoop()
	return nil
}

func (local *local) allocateTSIfNotExists(ctx context.Context, engine *Engine) error {
	if engine.TS > 0 {
		return nil
	}
	physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
	if err != nil {
		return err
	}
	ts := oracle.ComposeTS(physical, logical)
	engine.TS = ts
	return engine.saveEngineMeta()
}

// CloseEngine closes backend engine by uuid.
func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error {
	// flush the mem table to storage to free memory;
	// it may look unnecessary, but with this we can control memory precisely.
	engineI, ok := local.engines.Load(engineUUID)
	if !ok {
		// recovery mode, we should reopen this engine file
		db, err := local.openEngineDB(engineUUID, true)
		if err != nil {
			return err
		}
		engine := &Engine{
			UUID:               engineUUID,
			db:                 db,
			sstMetasChan:       make(chan metaOrFlush),
			tableInfo:          cfg.TableInfo,
			keyAdapter:         local.keyAdapter,
			duplicateDetection: local.duplicateDetection,
			duplicateDB:        local.duplicateDB,
			errorMgr:           local.errorMgr,
			logger:             log.FromContext(ctx),
		}
		engine.sstIngester = dbSSTIngester{e: engine}
		if err = engine.loadEngineMeta(); err != nil {
			return err
		}
		local.engines.Store(engineUUID, engine)
		return nil
	}

	engine := engineI.(*Engine)
	engine.rLock()
	if engine.closed.Load() {
		engine.rUnlock()
		return nil
	}

	err := engine.flushEngineWithoutLock(ctx)
	engine.rUnlock()

	// use mutex to make sure we won't close sstMetasChan while other routines
	// are trying to do flush.
	engine.lock(importMutexStateClose)
	engine.closed.Store(true)
	close(engine.sstMetasChan)
	engine.unlock()
	if err != nil {
		return errors.Trace(err)
	}
	engine.wg.Wait()
	return engine.ingestErr.Get()
}

func (local *local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
	return local.importClientFactory.Create(ctx, storeID)
}

type rangeStats struct {
	count      int64
	totalBytes int64
}

// WriteToTiKV writes the engine's key-value pairs to tikv and returns the sst metas generated by tikv.
// we don't need to do cleanup for the pairs written to tikv if an error is encountered,
// since tikv will take the responsibility to do so.
func (local *local) WriteToTiKV(
	ctx context.Context,
	engine *Engine,
	region *split.RegionInfo,
	start, end []byte,
	regionSplitSize int64,
	regionSplitKeys int64,
) ([]*sst.SSTMeta, Range, rangeStats, error) {
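	// Overall flow: optionally check that every peer's store has enough free disk,
	// locate the first/last keys of this region's slice of the engine, open one write
	// stream per peer and send the SST meta, then stream KV pairs in batches bounded
	// by batchWriteKVPairs and the store write limiter. The loop stops early once
	// regionMaxSize or regionSplitKeys is reached, so the returned finishedRange may
	// cover only part of the requested range.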
	failpoint.Inject("WriteToTiKVNotEnoughDiskSpace", func(_ failpoint.Value) {
		failpoint.Return(nil, Range{}, rangeStats{},
			errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", "", 0, 0))
	})
	if local.checkTiKVAvaliable {
		for _, peer := range region.Region.GetPeers() {
			var e error
			for i := 0; i < maxRetryTimes; i++ {
				store, err := local.pdCtl.GetStoreInfo(ctx, peer.StoreId)
				if err != nil {
					e = err
					continue
				}
				if store.Status.Capacity > 0 {
					// The available disk percent of TiKV
					ratio := store.Status.Available * 100 / store.Status.Capacity
					if ratio < 10 {
						return nil, Range{}, rangeStats{}, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d",
							store.Store.Address, store.Status.Available, store.Status.Capacity)
					}
				}
				break
			}
			if e != nil {
				log.FromContext(ctx).Error("failed to get StoreInfo from pd http api", zap.Error(e))
			}
		}
	}
	begin := time.Now()
	regionRange := intersectRange(region.Region, Range{start: start, end: end})
	opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
	iter := engine.newKVIter(ctx, opt)
	//nolint: errcheck
	defer iter.Close()

	stats := rangeStats{}

	iter.First()
	if iter.Error() != nil {
		return nil, Range{}, stats, errors.Annotate(iter.Error(), "failed to read the first key")
	}
	if !iter.Valid() {
		log.FromContext(ctx).Info("keys within region is empty, skip ingest", logutil.Key("start", start),
			logutil.Key("regionStart", region.Region.StartKey), logutil.Key("end", end),
			logutil.Key("regionEnd", region.Region.EndKey))
		return nil, regionRange, stats, nil
	}
	firstKey := codec.EncodeBytes([]byte{}, iter.Key())
	iter.Last()
	if iter.Error() != nil {
		return nil, Range{}, stats, errors.Annotate(iter.Error(), "failed to seek to the last key")
	}
	lastKey := codec.EncodeBytes([]byte{}, iter.Key())

	u := uuid.New()
	meta := &sst.SSTMeta{
		Uuid:        u[:],
		RegionId:    region.Region.GetId(),
		RegionEpoch: region.Region.GetRegionEpoch(),
		Range: &sst.Range{
			Start: firstKey,
			End:   lastKey,
		},
	}

	leaderID := region.Leader.GetId()
	clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
	storeIDs := make([]uint64, 0, len(region.Region.GetPeers()))
	requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
	for _, peer := range region.Region.GetPeers() {
		cli, err := local.getImportClient(ctx, peer.StoreId)
		if err != nil {
			return nil, Range{}, stats, err
		}

		wstream, err := cli.Write(ctx)
		if err != nil {
			return nil, Range{}, stats, errors.Trace(err)
		}

		// Bind uuid for this write request
		req := &sst.WriteRequest{
			Chunk: &sst.WriteRequest_Meta{
				Meta: meta,
			},
		}
		if err = wstream.Send(req); err != nil {
			return nil, Range{}, stats, errors.Trace(err)
		}
		req.Chunk = &sst.WriteRequest_Batch{
			Batch: &sst.WriteBatch{
				CommitTs: engine.TS,
			},
		}
		clients = append(clients, wstream)
		requests = append(requests, req)
		storeIDs = append(storeIDs, peer.StoreId)
	}

	bytesBuf := local.bufferPool.NewBuffer()
	defer bytesBuf.Destroy()
	pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
	count := 0
	size := int64(0)
	totalSize := int64(0)
	totalCount := int64(0)
	// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retried splits,
	// because the range-properties are not 100% accurate
	regionMaxSize := regionSplitSize
	if regionSplitSize <= int64(config.SplitRegionSize) {
		regionMaxSize = regionSplitSize * 4 / 3
	}
	// Set a lower flush limit to make the write speed smoother.
	flushLimit := int64(local.writeLimiter.Limit() / 10)

	flushKVs := func() error {
		for i := range clients {
			if err := local.writeLimiter.WaitN(ctx, storeIDs[i], int(size)); err != nil {
				return errors.Trace(err)
			}
			requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
			if err := clients[i].Send(requests[i]); err != nil {
				return errors.Trace(err)
			}
		}
		return nil
	}

	for iter.First(); iter.Valid(); iter.Next() {
		kvSize := int64(len(iter.Key()) + len(iter.Value()))
		// here we reuse the `*sst.Pair`s to optimize object allocation
		if count < len(pairs) {
			pairs[count].Key = bytesBuf.AddBytes(iter.Key())
			pairs[count].Value = bytesBuf.AddBytes(iter.Value())
		} else {
			pair := &sst.Pair{
				Key:   bytesBuf.AddBytes(iter.Key()),
				Value: bytesBuf.AddBytes(iter.Value()),
			}
			pairs = append(pairs, pair)
		}
		count++
		totalCount++
		size += kvSize
		totalSize += kvSize

		if count >= local.batchWriteKVPairs || size >= flushLimit {
			if err := flushKVs(); err != nil {
				return nil, Range{}, stats, err
			}
			count = 0
			size = 0
			bytesBuf.Reset()
		}
		if totalSize >= regionMaxSize || totalCount >= regionSplitKeys {
			break
		}
	}

	if iter.Error() != nil {
		return nil, Range{}, stats, errors.Trace(iter.Error())
	}

	if count > 0 {
		if err := flushKVs(); err != nil {
			return nil, Range{}, stats, err
		}
		count = 0
		size = 0
		bytesBuf.Reset()
	}

	var leaderPeerMetas []*sst.SSTMeta
	for i, wStream := range clients {
		resp, closeErr := wStream.CloseAndRecv()
		if closeErr != nil {
			return nil, Range{}, stats, errors.Trace(closeErr)
		}
		if resp.Error != nil {
			return nil, Range{}, stats, errors.New(resp.Error.Message)
		}
		if leaderID == region.Region.Peers[i].GetId() {
			leaderPeerMetas = resp.Metas
			log.FromContext(ctx).Debug("get metas after write kv stream to tikv", zap.Reflect("metas", leaderPeerMetas))
		}
	}

	// if there is no leader currently, we should directly return an error
	if len(leaderPeerMetas) == 0 {
		log.FromContext(ctx).Warn("write to tikv no leader", logutil.Region(region.Region), logutil.Leader(region.Leader),
			zap.Uint64("leader_id", leaderID), logutil.SSTMeta(meta),
			zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
		return nil, Range{}, stats, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
			region.Region.Id, leaderID)
	}

	log.FromContext(ctx).Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID),
		zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas),
		zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size),
		zap.Int64("buf_size", bytesBuf.TotalSize()),
		zap.Stringer("takeTime", time.Since(begin)))

	finishedRange := regionRange
	if iter.Valid() && iter.Next() {
		firstKey := append([]byte{}, iter.Key()...)
		finishedRange = Range{start: regionRange.start, end: firstKey}
		log.FromContext(ctx).Info("write to tikv partial finish", zap.Int64("count", totalCount),
			zap.Int64("size", size), logutil.Key("startKey", regionRange.start), logutil.Key("endKey", regionRange.end),
			logutil.Key("remainStart", firstKey), logutil.Key("remainEnd", regionRange.end),
			logutil.Region(region.Region), logutil.Leader(region.Leader))
	}
	stats.count = totalCount
	stats.totalBytes = totalSize

	return leaderPeerMetas, finishedRange, stats, nil
}

func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
	leader := region.Leader
	if leader == nil {
		leader = region.Region.GetPeers()[0]
	}

	cli, err := local.getImportClient(ctx, leader.StoreId)
	if err != nil {
		return nil, err
	}
	reqCtx := &kvrpcpb.Context{
		RegionId:    region.Region.GetId(),
		RegionEpoch: region.Region.GetRegionEpoch(),
		Peer:        leader,
	}

	if !local.supportMultiIngest {
		if len(metas) != 1 {
			return nil, errors.New("batch ingest is not support")
		}
		req := &sst.IngestRequest{
			Context: reqCtx,
			Sst:     metas[0],
		}
		resp, err := cli.Ingest(ctx, req)
		return resp, errors.Trace(err)
	}

	req := &sst.MultiIngestRequest{
		Context: reqCtx,
		Ssts:    metas,
	}
	resp, err := cli.MultiIngest(ctx, req)
	return resp, errors.Trace(err)
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
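	// Walk the collected range properties in key order and cut a new range whenever
	// the accumulated size or key count reaches the limit. A trailing remainder that
	// contains no keys is merged into the previous range instead of producing a tiny
	// extra range.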
	ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
	curSize := uint64(0)
	curKeys := uint64(0)
	curKey := fullRange.start

	sizeProps.iter(func(p *rangeProperty) bool {
		if bytes.Compare(p.Key, curKey) <= 0 {
			return true
		}
		if bytes.Compare(p.Key, fullRange.end) > 0 {
			return false
		}
		curSize += p.Size
		curKeys += p.Keys
		if int64(curSize) >= sizeLimit || int64(curKeys) >= keysLimit {
			ranges = append(ranges, Range{start: curKey, end: p.Key})
			curKey = p.Key
			curSize = 0
			curKeys = 0
		}
		return true
	})

	if bytes.Compare(curKey, fullRange.end) < 0 {
		// If the remaining range is too small, append it to last range.
		if len(ranges) > 0 && curKeys == 0 {
			ranges[len(ranges)-1].end = fullRange.end
		} else {
			ranges = append(ranges, Range{start: curKey, end: fullRange.end})
		}
	}
	return ranges
}

func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, regionSplitSize int64, regionSplitKeys int64) ([]Range, error) {
	iter := engine.newKVIter(ctx, &pebble.IterOptions{})
	//nolint: errcheck
	defer iter.Close()

	iterError := func(e string) error {
		err := iter.Error()
		if err != nil {
			return errors.Annotate(err, e)
		}
		return errors.New(e)
	}

	var firstKey, lastKey []byte
	if iter.First() {
		firstKey = append([]byte{}, iter.Key()...)
	} else {
		return nil, iterError("could not find first pair")
	}
	if iter.Last() {
		lastKey = append([]byte{}, iter.Key()...)
	} else {
		return nil, iterError("could not find last pair")
	}
	endKey := nextKey(lastKey)

	engineFileTotalSize := engine.TotalSize.Load()
	engineFileLength := engine.Length.Load()

	// <= 96MB no need to split into range
	if engineFileTotalSize <= regionSplitSize && engineFileLength <= regionSplitKeys {
		ranges := []Range{{start: firstKey, end: endKey}}
		return ranges, nil
	}

	logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID))
	sizeProps, err := getSizeProperties(logger, engine.db, local.keyAdapter)
	if err != nil {
		return nil, errors.Trace(err)
	}

	ranges := splitRangeBySizeProps(Range{start: firstKey, end: endKey}, sizeProps,
		regionSplitSize, regionSplitKeys)

	logger.Info("split engine key ranges",
		zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength),
		logutil.Key("firstKey", firstKey), logutil.Key("lastKey", lastKey),
		zap.Int("ranges", len(ranges)))

	return ranges, nil
}

func (local *local) writeAndIngestByRange(
	ctxt context.Context,
	engine *Engine,
	start, end []byte,
	regionSplitSize int64,
	regionSplitKeys int64,
) error {
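	// Find the actual first/last keys of this range inside the engine, then repeatedly
	// scan the covering regions via PaginateScanRegion and call writeAndIngestPairs for
	// each region. On a retryable error the scan and write are redone, without increasing
	// the retry count if at least one region has already succeeded.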
	ito := &pebble.IterOptions{
		LowerBound: start,
		UpperBound: end,
	}

	iter := engine.newKVIter(ctxt, ito)
	//nolint: errcheck
	defer iter.Close()
	// Needs seek to first because NewIter returns an iterator that is unpositioned
	hasKey := iter.First()
	if iter.Error() != nil {
		return errors.Annotate(iter.Error(), "failed to read the first key")
	}
	if !hasKey {
		log.FromContext(ctxt).Info("There is no pairs in iterator",
			logutil.Key("start", start),
			logutil.Key("end", end))
		engine.finishedRanges.add(Range{start: start, end: end})
		return nil
	}
	pairStart := append([]byte{}, iter.Key()...)
	iter.Last()
	if iter.Error() != nil {
		return errors.Annotate(iter.Error(), "failed to seek to the last key")
	}
	pairEnd := append([]byte{}, iter.Key()...)

	var regions []*split.RegionInfo
	var err error
	ctx, cancel := context.WithCancel(ctxt)
	defer cancel()

WriteAndIngest:
	for retry := 0; retry < maxRetryTimes; {
		if retry != 0 {
			select {
			case <-time.After(time.Second):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		startKey := codec.EncodeBytes([]byte{}, pairStart)
		endKey := codec.EncodeBytes([]byte{}, nextKey(pairEnd))
		regions, err = split.PaginateScanRegion(ctx, local.splitCli, startKey, endKey, scanRegionLimit)
		if err != nil || len(regions) == 0 {
			log.FromContext(ctx).Warn("scan region failed", log.ShortError(err), zap.Int("region_len", len(regions)),
				logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.Int("retry", retry))
			retry++
			continue WriteAndIngest
		}

		for _, region := range regions {
			log.FromContext(ctx).Debug("get region", zap.Int("retry", retry), zap.Binary("startKey", startKey),
				zap.Binary("endKey", endKey), zap.Uint64("id", region.Region.GetId()),
				zap.Stringer("epoch", region.Region.GetRegionEpoch()), zap.Binary("start", region.Region.GetStartKey()),
				zap.Binary("end", region.Region.GetEndKey()), zap.Reflect("peers", region.Region.GetPeers()))

			w := local.ingestConcurrency.Apply()
			err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys)
			local.ingestConcurrency.Recycle(w)
			if err != nil {
				if !local.isRetryableImportTiKVError(err) {
					return err
				}
				_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
				// if we have succeeded in at least one region, retry without increasing the retry count
				if bytes.Compare(regionStart, pairStart) > 0 {
					pairStart = regionStart
				} else {
					retry++
				}
				log.FromContext(ctx).Info("retry write and ingest kv pairs", logutil.Key("startKey", pairStart),
					logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
				continue WriteAndIngest
			}
		}

		return err
	}

	return err
}

type retryType int

const (
	retryNone retryType = iota
	retryWrite
	retryIngest
)

func (local *local) isRetryableImportTiKVError(err error) bool {
	err = errors.Cause(err)
	// io.EOF is not retryable in the normal case,
	// but on TiKV restart, if we're writing to TiKV (through gRPC),
	// it might return io.EOF (it's gRPC Unavailable in most cases),
	// so we need to retry on this error.
	// see SendMsg in https://pkg.go.dev/google.golang.org/grpc#ClientStream
	if err == io.EOF {
		return true
	}
	return common.IsRetryableError(err)
}

func (local *local) writeAndIngestPairs(
	ctx context.Context,
	engine *Engine,
	region *split.RegionInfo,
	start, end []byte,
	regionSplitSize int64,
	regionSplitKeys int64,
) error {
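	// Write the range into the region's peers via WriteToTiKV, then ingest the SST metas
	// returned by the leader. With multi-ingest support all metas go in one request,
	// otherwise they are ingested one by one. Ingest errors are classified by
	// isIngestRetryable into retryWrite (redo the whole write), retryIngest (retry with a
	// refreshed region) or retryNone (give up on this attempt).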
	var err error

loopWrite:
	for i := 0; i < maxRetryTimes; i++ {
		var metas []*sst.SSTMeta
		var finishedRange Range
		var rangeStats rangeStats
		metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
		if err != nil {
			if !local.isRetryableImportTiKVError(err) {
				return err
			}

			log.FromContext(ctx).Warn("write to tikv failed", log.ShortError(err), zap.Int("retry", i))
			continue loopWrite
		}

		if len(metas) == 0 {
			return nil
		}

		batch := 1
		if local.supportMultiIngest {
			batch = len(metas)
		}

		for i := 0; i < len(metas); i += batch {
			start := i * batch
			end := mathutil.Min((i+1)*batch, len(metas))
			ingestMetas := metas[start:end]
			errCnt := 0
			for errCnt < maxRetryTimes {
				log.FromContext(ctx).Debug("ingest meta", zap.Reflect("meta", ingestMetas))
				var resp *sst.IngestResponse
				failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
					// only inject the error once
					switch val.(string) {
					case "notleader":
						resp = &sst.IngestResponse{
							Error: &errorpb.Error{
								NotLeader: &errorpb.NotLeader{
									RegionId: region.Region.Id,
									Leader:   region.Leader,
								},
							},
						}
					case "epochnotmatch":
						resp = &sst.IngestResponse{
							Error: &errorpb.Error{
								EpochNotMatch: &errorpb.EpochNotMatch{
									CurrentRegions: []*metapb.Region{region.Region},
								},
							},
						}
					}
					if resp != nil {
						err = nil
					}
				})
				if resp == nil {
					resp, err = local.Ingest(ctx, ingestMetas, region)
				}
				if err != nil {
					if common.IsContextCanceledError(err) {
						return err
					}
					log.FromContext(ctx).Warn("ingest failed", log.ShortError(err), logutil.SSTMetas(ingestMetas),
						logutil.Region(region.Region), logutil.Leader(region.Leader))
					errCnt++
					continue
				}

				var retryTy retryType
				var newRegion *split.RegionInfo
				retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, ingestMetas)
				if common.IsContextCanceledError(err) {
					return err
				}
				if err == nil {
					// ingest next meta
					break
				}
				switch retryTy {
				case retryNone:
					log.FromContext(ctx).Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
						logutil.Region(region.Region), logutil.Leader(region.Leader))
					// met an error that ingest cannot retry; return it so the caller can retry the whole write procedure
					return err
				case retryWrite:
					region = newRegion
					continue loopWrite
				case retryIngest:
					region = newRegion
					continue
				}
			}
		}

		if err != nil {
			log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err),
				logutil.Region(region.Region), logutil.Key("start", start),
				logutil.Key("end", end))
		} else {
			engine.importedKVSize.Add(rangeStats.totalBytes)
			engine.importedKVCount.Add(rangeStats.count)
			engine.finishedRanges.add(finishedRange)
			if local.metrics != nil {
				local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
			}
		}
		return errors.Trace(err)
	}

	return errors.Trace(err)
}

func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine, ranges []Range, regionSplitSize int64, regionSplitKeys int64) error {
	if engine.Length.Load() == 0 {
		// the engine is empty; this is likely because it's an index engine but the table contains no index
		log.FromContext(ctx).Info("engine contains no data", zap.Stringer("uuid", engine.UUID))
		return nil
	}
	log.FromContext(ctx).Debug("the ranges Length write to tikv", zap.Int("Length", len(ranges)))

	var allErrLock sync.Mutex
	var allErr error
	var wg sync.WaitGroup
	metErr := atomic.NewBool(false)

	for _, r := range ranges {
		startKey := r.start
		endKey := r.end
		w := local.rangeConcurrency.Apply()
		// if an error was met here, skip the remaining ranges to allow failing fast.
		if metErr.Load() {
			local.rangeConcurrency.Recycle(w)
			break
		}
		wg.Add(1)
		go func(w *worker.Worker) {
			defer func() {
				local.rangeConcurrency.Recycle(w)
				wg.Done()
			}()
			var err error
			// max retry backoff time: 2+4+8+16+30*26=810s
			backOffTime := time.Second
			for i := 0; i < maxWriteAndIngestRetryTimes; i++ {
				err = local.writeAndIngestByRange(ctx, engine, startKey, endKey, regionSplitSize, regionSplitKeys)
				if err == nil || common.IsContextCanceledError(err) {
					return
				}
				if !local.isRetryableImportTiKVError(err) {
					break
				}
				log.FromContext(ctx).Warn("write and ingest by range failed",
					zap.Int("retry time", i+1), log.ShortError(err))
				backOffTime *= 2
				if backOffTime > maxRetryBackoffTime {
					backOffTime = maxRetryBackoffTime
				}
				select {
				case <-time.After(backOffTime):
				case <-ctx.Done():
					return
				}
			}

			allErrLock.Lock()
			allErr = multierr.Append(allErr, err)
			allErrLock.Unlock()
			if err != nil {
				metErr.Store(true)
			}
		}(w)
	}

	// wait for all sub tasks to finish to avoid panic. if we returned on the first error,
	// the outer task may close the pebble db while some sub tasks are still reading from the db
	wg.Wait()
	if allErr == nil {
		return ctx.Err()
	}
	return allErr
}

func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
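	// Import flow: lock the engine, raise regionSplitSize/regionSplitKeys to the cluster's
	// own region split settings if those are larger, split the sorted data into ranges,
	// optionally pause PD scheduling on the affected key range, then loop: split & scatter
	// regions for the unfinished ranges and write & ingest them, until no unfinished range
	// remains.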
	lf := local.lockEngine(engineUUID, importMutexStateImport)
	if lf == nil {
		// skip if engine not exist. See the comment of `CloseEngine` for more detail.
		return nil
	}
	defer lf.unlock()

	lfTotalSize := lf.TotalSize.Load()
	lfLength := lf.Length.Load()
	if lfTotalSize == 0 {
		log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
		return nil
	}
	kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
	if err == nil {
		if kvRegionSplitSize > regionSplitSize {
			regionSplitSize = kvRegionSplitSize
		}
		if kvRegionSplitKeys > regionSplitKeys {
			regionSplitKeys = kvRegionSplitKeys
		}
	} else {
		log.FromContext(ctx).Warn("fail to get region split keys and size", zap.Error(err))
	}

	// split the sorted file into ranges, by default 96MB per range
	ranges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys)
	if err != nil {
		return err
	}

	if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
		subCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		var startKey, endKey []byte
		if len(ranges[0].start) > 0 {
			startKey = codec.EncodeBytes(nil, ranges[0].start)
		}
		if len(ranges[len(ranges)-1].end) > 0 {
			endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
		}
		done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
		if err != nil {
			return errors.Trace(err)
		}
		defer func() {
			cancel()
			<-done
		}()
	}

	log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID),
		zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))

	failpoint.Inject("ReadyForImportEngine", func() {})

	for {
		unfinishedRanges := lf.unfinishedRanges(ranges)
		if len(unfinishedRanges) == 0 {
			break
		}
		log.FromContext(ctx).Info("import engine unfinished ranges", zap.Int("count", len(unfinishedRanges)))

		// if all the kv pairs can fit in one region, skip splitting regions. TiDB will split one region for
		// the table when the table is created.
		needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
		// split region by given ranges
		for i := 0; i < maxRetryTimes; i++ {
			err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
			if err == nil || common.IsContextCanceledError(err) {
				break
			}

			log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engineUUID),
				log.ShortError(err), zap.Int("retry", i))
		}
		if err != nil {
			log.FromContext(ctx).Error("split & scatter ranges failed", zap.Stringer("uuid", engineUUID), log.ShortError(err))
			return err
		}

		// start to write to kv and ingest
		err = local.writeAndIngestByRanges(ctx, lf, unfinishedRanges, regionSplitSize, regionSplitKeys)
		if err != nil {
			log.FromContext(ctx).Error("write and ingest engine failed", log.ShortError(err))
			return err
		}
	}

	log.FromContext(ctx).Info("import engine success", zap.Stringer("uuid", engineUUID),
		zap.Int64("size", lfTotalSize), zap.Int64("kvs", lfLength),
		zap.Int64("importedSize", lf.importedKVSize.Load()), zap.Int64("importedCount", lf.importedKVCount.Load()))
	return nil
}

func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error) {
	logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect local duplicate keys")
	defer func() {
		logger.End(zap.ErrorLevel, err)
	}()

	atomicHasDupe := atomic.NewBool(false)
	duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
		local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
	if err != nil {
		return false, errors.Trace(err)
	}
	if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter); err != nil {
		return false, errors.Trace(err)
	}
	return atomicHasDupe.Load(), nil
}

func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error) {
	logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect remote duplicate keys")
	defer func() {
		logger.End(zap.ErrorLevel, err)
	}()

	atomicHasDupe := atomic.NewBool(false)
	duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
		local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
	if err != nil {
		return false, errors.Trace(err)
	}
	if err := duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory); err != nil {
		return false, errors.Trace(err)
	}
	return atomicHasDupe.Load(), nil
}

func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) (err error) {
	logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[resolve-dupe] resolve duplicate rows")
	defer func() {
		logger.End(zap.ErrorLevel, err)
	}()

	switch algorithm {
	case config.DupeResAlgRecord, config.DupeResAlgNone:
		logger.Warn("[resolve-dupe] skipping resolution due to selected algorithm. this table will become inconsistent!", zap.Stringer("algorithm", algorithm))
		return nil
	case config.DupeResAlgRemove:
	default:
		panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
	}

	// TODO: reuse the *kv.SessionOptions from NewEncoder for picking the correct time zone.
	decoder, err := kv.NewTableKVDecoder(tbl, tableName, &kv.SessionOptions{
		SQLMode: mysql.ModeStrictAllTables,
	}, log.FromContext(ctx))
	if err != nil {
		return err
	}

	errLimiter := rate.NewLimiter(1, 1)
	pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows")
	err = local.errorMgr.ResolveAllConflictKeys(
		ctx, tableName, pool,
		func(ctx context.Context, handleRows [][2][]byte) error {
			for {
				err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder)
				if err == nil {
					return nil
				}
				if types.ErrBadNumber.Equal(err) {
					logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
					return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName)
				}
				if log.IsContextCanceledError(err) {
					return err
				}
				if !tikverror.IsErrWriteConflict(errors.Cause(err)) {
					logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
				}
				if err = errLimiter.Wait(ctx); err != nil {
					return err
				}
			}
		},
	)
	return errors.Trace(err)
}

func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, handleRows [][2][]byte, decoder *kv.TableKVDecoder) (err error) {
	// Starts a Delete transaction.
	txn, err := local.tikvCli.Begin()
	if err != nil {
		return err
	}
	defer func() {
		if err == nil {
			err = txn.Commit(ctx)
		} else {
			if rollbackErr := txn.Rollback(); rollbackErr != nil {
				logger.Warn("failed to rollback transaction", zap.Error(rollbackErr))
			}
		}
	}()

	deleteKey := func(key []byte) error {
		logger.Debug("[resolve-dupe] will delete key", logutil.Key("key", key))
		return txn.Delete(key)
	}

	// Collect all rows & index keys into the deletion transaction.
	// (if the number of duplicates is small this should fit entirely in memory)
	// (Txn's MemBuf's bufferSizeLimit is currently infinity)
	for _, handleRow := range handleRows {
		logger.Debug("[resolve-dupe] found row to resolve",
			logutil.Key("handle", handleRow[0]),
			logutil.Key("row", handleRow[1]))

		if err := deleteKey(handleRow[0]); err != nil {
			return err
		}

		handle, err := decoder.DecodeHandleFromRowKey(handleRow[0])
		if err != nil {
			return err
		}

		err = decoder.IterRawIndexKeys(handle, handleRow[1], deleteKey)
		if err != nil {
			return err
		}
	}

	logger.Debug("[resolve-dupe] number of KV pairs to be deleted", zap.Int("count", txn.Len()))
	return nil
}

func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
	// the only way to reset the engine + reclaim the space is to delete and reopen it ?
	localEngine := local.lockEngine(engineUUID, importMutexStateClose)
	if localEngine == nil {
		log.FromContext(ctx).Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID))
		return nil
	}
	defer localEngine.unlock()
	if err := localEngine.Close(); err != nil {
		return err
	}
	if err := localEngine.Cleanup(local.localStoreDir); err != nil {
		return err
	}
	db, err := local.openEngineDB(engineUUID, false)
	if err == nil {
		localEngine.db = db
		localEngine.engineMeta = engineMeta{}
		if !common.IsDirExists(localEngine.sstDir) {
			if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil {
				return errors.Trace(err)
			}
		}
		if err = local.allocateTSIfNotExists(ctx, localEngine); err != nil {
			return errors.Trace(err)
		}
	}
	localEngine.pendingFileSize.Store(0)
	localEngine.finishedRanges.reset()

	return err
}

func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
	localEngine := local.lockEngine(engineUUID, importMutexStateClose)
	// release this engine after import success
	if localEngine == nil {
		log.FromContext(ctx).Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID))
		return nil
	}
	defer localEngine.unlock()

	// Since closing the engine causes all subsequent operations on it to panic,
	// we make sure to delete it from the engine map before calling Close().
	// (Note that Close() returning an error does _not_ mean the pebble DB
	// remains open/usable.)
	local.engines.Delete(engineUUID)
	err := localEngine.Close()
	if err != nil {
		return err
	}
	err = localEngine.Cleanup(local.localStoreDir)
	if err != nil {
		return err
	}
	localEngine.TotalSize.Store(0)
	localEngine.Length.Store(0)
	return nil
}
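
// Editorial note (not part of the upstream file): ResetEngine and CleanupEngine differ
// in what survives. ResetEngine closes and reopens the underlying pebble DB but keeps
// the Engine registered (resetting its metadata and counters), whereas CleanupEngine
// removes the engine from the map entirely before closing it and deleting its files.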

func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.CheckCtx) error {
	return local.targetInfoGetter.CheckRequirements(ctx, checkCtx)
}

func (local *local) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
	return local.targetInfoGetter.FetchRemoteTableModels(ctx, schemaName)
}

func (local *local) MakeEmptyRows() kv.Rows {
	return local.encBuilder.MakeEmptyRows()
}

func (local *local) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
	return local.encBuilder.NewEncoder(ctx, tbl, options)
}

func engineSSTDir(storeDir string, engineUUID uuid.UUID) string {
	return filepath.Join(storeDir, engineUUID.String()+".sst")
}

func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) {
	e, ok := local.engines.Load(engineUUID)
	if !ok {
		return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
	}
	engine := e.(*Engine)
	return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
	w := &Writer{
		engine:             engine,
		memtableSizeLimit:  cacheSize,
		kvBuffer:           kvBuffer,
		isKVSorted:         cfg.IsKVSorted,
		isWriteBatchSorted: true,
	}
	// Pre-allocate a buffer large enough to avoid frequent runtime.growslice calls;
	// this can save roughly 3% of CPU.
	if !w.isKVSorted {
		w.writeBatch = make([]common.KvPair, units.MiB)
	}
	engine.localWriters.Store(w, nil)
	return w, nil
}
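
// Editorial note (not part of the upstream file): whether the pre-allocated writeBatch
// is ever used depends on cfg.IsKVSorted. The reading here (an assumption about the
// Writer implementation, which lives elsewhere in this package) is that a sorted writer
// can stream KV pairs straight into SST files, while an unsorted writer buffers them in
// writeBatch and sorts before flushing, which is why only the unsorted path pays for
// the large up-front allocation.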

func (local *local) isIngestRetryable(
	ctx context.Context,
	resp *sst.IngestResponse,
	region *split.RegionInfo,
	metas []*sst.SSTMeta,
) (retryType, *split.RegionInfo, error) {
	if resp.GetError() == nil {
		return retryNone, nil, nil
	}

	getRegion := func() (*split.RegionInfo, error) {
		for i := 0; ; i++ {
			newRegion, err := local.splitCli.GetRegion(ctx, region.Region.GetStartKey())
			if err != nil {
				return nil, errors.Trace(err)
			}
			if newRegion != nil {
				return newRegion, nil
			}
			log.FromContext(ctx).Warn("get region by key return nil, will retry", logutil.Region(region.Region), logutil.Leader(region.Leader),
				zap.Int("retry", i))
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(time.Second):
			}
		}
	}

	var newRegion *split.RegionInfo
	var err error
	switch errPb := resp.GetError(); {
	case errPb.NotLeader != nil:
		if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil {
			newRegion = &split.RegionInfo{
				Leader: newLeader,
				Region: region.Region,
			}
		} else {
			newRegion, err = getRegion()
			if err != nil {
				return retryNone, nil, errors.Trace(err)
			}
		}
		// TODO: in some cases TiKV may return a retryable error even though the ingest
		// has actually succeeded, and retrying the ingest directly can then cause TiKV
		// to panic. So always return retryWrite here to avoid that issue.
		// See: https://github.com/tikv/tikv/issues/9496
		return retryWrite, newRegion, common.ErrKVNotLeader.GenWithStack(errPb.GetMessage())
	case errPb.EpochNotMatch != nil:
		if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
			var currentRegion *metapb.Region
			for _, r := range currentRegions {
				if insideRegion(r, metas) {
					currentRegion = r
					break
				}
			}
			if currentRegion != nil {
				var newLeader *metapb.Peer
				for _, p := range currentRegion.Peers {
					if p.GetStoreId() == region.Leader.GetStoreId() {
						newLeader = p
						break
					}
				}
				if newLeader != nil {
					newRegion = &split.RegionInfo{
						Leader: newLeader,
						Region: currentRegion,
					}
				}
			}
		}
		retryTy := retryNone
		if newRegion != nil {
			retryTy = retryWrite
		}
		return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
	case strings.Contains(errPb.Message, "raft: proposal dropped"):
		newRegion, err = getRegion()
		if err != nil {
			return retryNone, nil, errors.Trace(err)
		}
		return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
	case errPb.ServerIsBusy != nil:
		return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
	case errPb.RegionNotFound != nil:
		return retryNone, nil, common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage())
	case errPb.ReadIndexNotReady != nil:
		// This error occurs while the region is splitting; the message looks like:
		//   read index not ready, reason can not read index due to split, region 64037
		// Scheduling has been paused, but only temporarily: if the next request takes
		// too long, scheduling may be enabled again. On a key-range border, another
		// engine sharing this region may also try to split it and trigger this error.
		newRegion, err = getRegion()
		if err != nil {
			return retryNone, nil, errors.Trace(err)
		}
		return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage())
	case errPb.DiskFull != nil:
		return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
	}
	// All other ingest errors (stale command, etc.) will be retried from writeAndIngestByRange.
	// A single named error, ErrKVIngestFailed, represents them all for now;
	// they can be split into separate errors later if needed.
	return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage())
}
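
// Editorial summary (not part of the upstream file) of the retry decision implemented
// by isIngestRetryable above:
//
//	NotLeader                        -> retryWrite, with the reported leader or a re-fetched region
//	EpochNotMatch (region matched)   -> retryWrite, with the matching current region
//	EpochNotMatch (no match)         -> retryNone
//	"raft: proposal dropped"         -> retryWrite, with a re-fetched region
//	ReadIndexNotReady                -> retryWrite, with a re-fetched region
//	ServerIsBusy / RegionNotFound    -> retryNone, with a named error for the caller
//	DiskFull                         -> retryNone, reported as non-retryable
//	any other error                  -> retryNone, wrapped as ErrKVIngestFailed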

// nextKey returns the smallest []byte that is strictly greater than the given key.
// Special case: an empty key means infinity in this context, so it is returned as-is.
func nextKey(key []byte) []byte {
	if len(key) == 0 {
		return []byte{}
	}

	// in tikv <= 4.x, tikv will truncate the row key, so we should fetch the next valid row key
	// See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41
	if tablecodec.IsRecordKey(key) {
		tableID, handle, _ := tablecodec.DecodeRecordKey(key)
		nextHandle := handle.Next()
		// int handle overflow, use the next table prefix as nextKey
		if nextHandle.Compare(handle) <= 0 {
			return tablecodec.EncodeTablePrefix(tableID + 1)
		}
		return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle)
	}

	// if key is an index, directly append a 0x00 to the key.
	res := make([]byte, 0, len(key)+1)
	res = append(res, key...)
	res = append(res, 0)
	return res
}
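
// Editorial note (not part of the upstream file): a quick illustration of nextKey,
// assuming an int-handle table with table ID 100 (values are hypothetical):
//
//	nextKey(row key of handle 5)          -> row key of handle 6
//	nextKey(row key of handle MaxInt64)   -> EncodeTablePrefix(101)   (handle overflow)
//	nextKey(index key k)                  -> k with a trailing 0x00 appended
//	nextKey([]byte{})                     -> []byte{}                 (empty means infinity)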

func (local *local) EngineFileSizes() (res []backend.EngineFileSize) {
	local.engines.Range(func(k, v interface{}) bool {
		engine := v.(*Engine)
		res = append(res, engine.getEngineFileSize())
		return true
	})
	return
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// getSplitConfFromStore reads a TiKV store's config over its status address and
// returns the region split size, the region split keys, and any error.
func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) {
	var (
		nested struct {
			Coprocessor struct {
				RegionSplitSize string `json:"region-split-size"`
				RegionSplitKeys int64  `json:"region-split-keys"`
			} `json:"coprocessor"`
		}
	)
	if err := tls.WithHost(host).GetJSON(ctx, "/config", &nested); err != nil {
		return 0, 0, errors.Trace(err)
	}
	splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	return splitSize, nested.Coprocessor.RegionSplitKeys, nil
}
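
// Editorial note (not part of the upstream file): the store's /config endpoint is
// expected to return JSON whose coprocessor section looks roughly like
//
//	{"coprocessor": {"region-split-size": "96MiB", "region-split-keys": 960000}}
//
// (values illustrative). region-split-size is a human-readable size parsed with
// units.FromHumanSize, while region-split-keys is decoded directly as an integer.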

// getRegionSplitSizeKeys returns the region split size and region split keys from the
// first non-TiFlash store whose config can be fetched, or an error if none is reachable.
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) {
	stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
	if err != nil {
		return 0, 0, err
	}
	for _, store := range stores {
		if store.StatusAddress == "" || engine.IsTiFlash(store) {
			continue
		}
		serverInfo := infoschema.ServerInfo{
			Address:    store.Address,
			StatusAddr: store.StatusAddress,
		}
		serverInfo.ResolveLoopBackAddr()
		regionSplitSize, regionSplitKeys, err := getSplitConfFromStoreFunc(ctx, serverInfo.StatusAddr, tls)
		if err == nil {
			return regionSplitSize, regionSplitKeys, nil
		}
		log.FromContext(ctx).Warn("get region split size and keys failed", zap.Error(err), zap.String("store", serverInfo.StatusAddr))
	}
	return 0, 0, errors.New("get region split size and keys failed")
}
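
A minimal usage sketch (not part of local.go above; pdCli, tls, and ctx are assumed to exist, and the fallback values are illustrative) showing how getRegionSplitSizeKeys might be called:

splitSize, splitKeys, err := getRegionSplitSizeKeys(ctx, pdCli, tls)
if err != nil {
	log.FromContext(ctx).Warn("failed to get region split config, using fallback values", zap.Error(err))
	splitSize, splitKeys = 96*units.MiB, 960000 // illustrative fallback values
}
log.FromContext(ctx).Info("region split config",
	zap.Int64("size", splitSize), zap.Int64("keys", splitKeys))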

Related information

tidb source code directory

Related articles

tidb duplicate source code

tidb engine source code

tidb iterator source code

tidb key_adapter source code

tidb local_freebsd source code

tidb local_unix source code

tidb local_unix_generic source code

tidb local_windows source code

tidb localhelper source code
