tidb engine 源码
tidb engine 代码
文件路径:/ddl/ingest/engine.go
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingest
import (
"context"
"strconv"
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// One engine for one index reorg task, each task will create several new writers under the
// Opened Engine. Note engineInfo is not thread safe.
type engineInfo struct {
ctx context.Context
jobID int64
indexID int64
openedEngine *backend.OpenedEngine
uuid uuid.UUID
cfg *backend.EngineConfig
writerCount int
writerCache generic.SyncMap[int, *backend.LocalEngineWriter]
memRoot MemRoot
diskRoot DiskRoot
}
// NewEngineInfo create a new EngineInfo struct.
func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig,
en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *engineInfo {
return &engineInfo{
ctx: ctx,
jobID: jobID,
indexID: indexID,
cfg: cfg,
openedEngine: en,
uuid: uuid,
writerCount: wCnt,
writerCache: generic.NewSyncMap[int, *backend.LocalEngineWriter](wCnt),
memRoot: memRoot,
diskRoot: diskRoot,
}
}
// Flush imports all the key-values in engine to the storage.
func (ei *engineInfo) Flush() error {
err := ei.openedEngine.Flush(ei.ctx)
if err != nil {
logutil.BgLogger().Error(LitErrFlushEngineErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return err
}
return nil
}
func (ei *engineInfo) Clean() {
if ei.openedEngine == nil {
return
}
indexEngine := ei.openedEngine
closedEngine, err := indexEngine.Close(ei.ctx, ei.cfg)
if err != nil {
logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
// Here the local intermediate files will be removed.
err = closedEngine.Cleanup(ei.ctx)
if err != nil {
logutil.BgLogger().Error(LitErrCleanEngineErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
}
func (ei *engineInfo) ImportAndClean() error {
// Close engine and finish local tasks of lightning.
logutil.BgLogger().Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
indexEngine := ei.openedEngine
closeEngine, err1 := indexEngine.Close(ei.ctx, ei.cfg)
if err1 != nil {
logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err1),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrCloseEngineErr)
}
ei.openedEngine = nil
err := ei.diskRoot.UpdateUsageAndQuota()
if err != nil {
logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return err
}
// Ingest data to TiKV.
logutil.BgLogger().Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID),
zap.Int64("index ID", ei.indexID),
zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10)))
err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrIngestDataErr)
}
// Clean up the engine local workspace.
err = closeEngine.Cleanup(ei.ctx)
if err != nil {
logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrCloseEngineErr)
}
return nil
}
// WriterContext is used to keep a lightning local writer for each backfill worker.
type WriterContext struct {
ctx context.Context
lWrite *backend.LocalEngineWriter
}
func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) {
ei.memRoot.RefreshConsumption()
ok := ei.memRoot.CheckConsume(StructSizeWriterCtx)
if !ok {
return nil, genEngineAllocMemFailedErr(ei.memRoot, ei.jobID, ei.indexID)
}
wCtx, err := ei.newWriterContext(id)
if err != nil {
logutil.BgLogger().Error(LitErrCreateContextFail, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID),
zap.Int("worker ID", id))
return nil, err
}
ei.memRoot.Consume(StructSizeWriterCtx)
logutil.BgLogger().Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID),
zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id),
zap.Int64("allocate memory", StructSizeWriterCtx),
zap.Int64("current memory usage", ei.memRoot.CurrentUsage()),
zap.Int64("max memory quota", ei.memRoot.MaxMemoryQuota()))
return wCtx, err
}
// newWriterContext will get worker local writer from engine info writer cache first, if exists.
// If local writer not exist, then create new one and store it into engine info writer cache.
// note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to
// make sure the safe.
func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
lWrite, exist := ei.writerCache.Load(workerID)
if !exist {
var err error
lWrite, err = ei.openedEngine.LocalWriter(ei.ctx, &backend.LocalWriterConfig{})
if err != nil {
return nil, err
}
// Cache the local writer.
ei.writerCache.Store(workerID, lWrite)
}
return &WriterContext{
ctx: ei.ctx,
lWrite: lWrite,
}, nil
}
// WriteRow Write one row into local writer buffer.
func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
kvs := make([]common.KvPair, 1)
kvs[0].Key = key
kvs[0].Val = idxVal
row := kv.MakeRowsFromKvPairs(kvs)
return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row)
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦