tidb stream 源码
tidb stream 代码
文件路径:/br/pkg/task/stream.go
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/docker/go-units"
"github.com/fatih/color"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/streamhelper"
advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
)
// Names of the command-line flags registered by the log backup (stream) commands below.
const (
// flagYes names the boolean flag (shorthand `-y`) that skips the prompts of `log truncate`.
flagYes = "yes"
// flagUntil names the flag carrying the TS up to which `log truncate` removes backup data.
flagUntil = "until"
// flagStreamJSONOutput names the boolean flag that makes `log status` print JSON.
flagStreamJSONOutput = "json"
// flagStreamTaskName names the flag carrying the name of the log backup task.
flagStreamTaskName = "task-name"
// flagStreamStartTS names the flag carrying the start TS for `log start`.
flagStreamStartTS = "start-ts"
// flagStreamEndTS names the hidden flag carrying the end TS for `log start`.
flagStreamEndTS = "end-ts"
// flagGCSafePointTTS names the flag carrying the TTL (in seconds) of BR's GC safepoint.
flagGCSafePointTTS = "gc-ttl"
)
var (
// Command names of the log backup sub-commands. They are used as the keys of
// StreamCommandMap and passed around as cmdName.
StreamStart = "log start"
StreamStop = "log stop"
StreamPause = "log pause"
StreamResume = "log resume"
StreamStatus = "log status"
StreamTruncate = "log truncate"
StreamMetadata = "log metadata"
StreamCtl = "log ctl"
// skipSummaryCommandList holds the commands for which RunStreamCommand
// does not call summary.Summary when it returns.
skipSummaryCommandList = map[string]struct{}{
StreamStatus: {},
StreamTruncate: {},
}
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
rawKVBatchCount = 64
// NOTE(review): streamShiftDuration is consumed outside this section of the file;
// presumably it is a time window used when shifting restore timestamps — confirm.
streamShiftDuration = time.Hour
)
// StreamCommandMap maps a log backup command name to the function implementing it.
// RunStreamCommand looks the command up here and rejects names that are not present.
var StreamCommandMap = map[string]func(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error{
StreamStart: RunStreamStart,
StreamStop: RunStreamStop,
StreamPause: RunStreamPause,
StreamResume: RunStreamResume,
StreamStatus: RunStreamStatus,
StreamTruncate: RunStreamTruncate,
StreamMetadata: RunStreamMetadata,
// `log ctl` runs the checkpoint advancer daemon.
StreamCtl: RunStreamAdvancer,
}
// StreamConfig specifies the configuration of the log backup (stream) commands.
// It embeds the common Config and adds the per-command options parsed by the
// ParseStream*FromFlags methods.
type StreamConfig struct {
Config
// TaskName is the name of the log backup task; it must not be empty
// (see ParseStreamCommonFromFlags).
TaskName string `json:"task-name" toml:"task-name"`
// StartTS usually equals the tso of full-backup, but user can reset it.
// A zero value is replaced by the current TS when the task starts.
StartTS uint64 `json:"start-ts" toml:"start-ts"`
// EndTS is the TS after which observing stops; it must be larger than the current TS at start.
EndTS uint64 `json:"end-ts" toml:"end-ts"`
// SafePointTTL ensures TiKV can scan entries not being GC at [startTS, currentTS].
// It is the TTL, in seconds, of the service safepoint BR registers.
SafePointTTL int64 `json:"safe-point-ttl" toml:"safe-point-ttl"`
// Spec for the command `truncate`: the TS until which backup data is removed.
Until uint64 `json:"until" toml:"until"`
// DryRun makes `truncate` run without really deleting files or writing metadata back.
DryRun bool `json:"dry-run" toml:"dry-run"`
// SkipPrompt makes `truncate` skip all confirmation prompts.
SkipPrompt bool `json:"skip-prompt" toml:"skip-prompt"`
// Spec for the command `status`: print the status as JSON instead of a table.
JSONOutput bool `json:"json-output" toml:"json-output"`
// Spec for the command `advancer`: the config handed to the checkpoint advancer.
AdvancerCfg advancercfg.Config `json:"advancer-config" toml:"advancer-config"`
}
// makeStorage parses cfg.Storage (with cfg.BackendOptions) into a storage
// backend and opens an external storage on top of it, honoring the
// NoCreds / SendCreds settings of the config.
func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStorage, error) {
	backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
	if err != nil {
		return nil, errors.Trace(err)
	}

	extStorage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
		NoCredentials:   cfg.NoCreds,
		SendCredentials: cfg.SendCreds,
	})
	if err != nil {
		return nil, errors.Trace(err)
	}
	return extStorage, nil
}
// DefineStreamStartFlags defines flags used for `stream start`.
//
// Besides the common task-name flag it registers:
//   - start-ts: the TS from which log backup starts (empty means "current ts");
//   - end-ts (hidden): the TS after which observing stops;
//   - gc-ttl (hidden): the TTL in seconds of BR's GC safepoint.
func DefineStreamStartFlags(flags *pflag.FlagSet) {
	DefineStreamCommonFlags(flags)

	flags.String(flagStreamStartTS, "",
		"usually equals last full backupTS, used for backup log. Default value is current ts.\n"+
			"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'.")

	// 999999999999999999 means 2090-11-18 22:07:45
	// The two halves of the usage text are separated explicitly; without the
	// separator the help output read "...after endTSsupport TSO or datetime".
	flags.String(flagStreamEndTS, "999999999999999999", "end ts, indicate stopping observe after endTS. "+
		"support TSO or datetime")
	// MarkHidden only fails when the flag does not exist, which cannot happen here.
	_ = flags.MarkHidden(flagStreamEndTS)

	flags.Int64(flagGCSafePointTTS, utils.DefaultStreamStartSafePointTTL,
		"the TTL (in seconds) that PD holds for BR's GC safepoint")
	_ = flags.MarkHidden(flagGCSafePointTTS)
}
// DefineStreamPauseFlags defines flags used for `stream pause`: the common
// task-name flag plus the TTL of the GC safepoint kept while the task is paused.
func DefineStreamPauseFlags(flags *pflag.FlagSet) {
	const ttlUsage = "the TTL (in seconds) that PD holds for BR's GC safepoint"

	DefineStreamCommonFlags(flags)
	flags.Int64(flagGCSafePointTTS, utils.DefaultStreamPauseSafePointTTL, ttlUsage)
}
// DefineStreamCommonFlags defines the flags shared by the `stream task`
// commands; currently that is only the task name, which has no default.
func DefineStreamCommonFlags(flags *pflag.FlagSet) {
	const taskNameUsage = "The task name for the backup log task."
	flags.String(flagStreamTaskName, "", taskNameUsage)
}
// DefineStreamStatusCommonFlags defines flags used for `stream status`.
// The task name defaults to stream.WildCard, and a boolean flag switches the
// output to JSON.
func DefineStreamStatusCommonFlags(flags *pflag.FlagSet) {
	const (
		taskNameUsage = "The task name for backup stream log. If default, get status of all of tasks"
		jsonUsage     = "Print JSON as the output."
	)
	flags.String(flagStreamTaskName, stream.WildCard, taskNameUsage)
	flags.Bool(flagStreamJSONOutput, false, jsonUsage)
}
// DefineStreamTruncateLogFlags defines flags used for `stream truncate`:
//   - until: the TS until which backup data is removed;
//   - the dry-run flag: go through the command without deleting files;
//   - yes / -y: skip all prompts.
func DefineStreamTruncateLogFlags(flags *pflag.FlagSet) {
	// The two halves of the usage text are separated by a space; without it
	// the help output read "...until this TS.(support TSO or datetime...".
	flags.String(flagUntil, "", "Remove all backup data until this TS. "+
		"(support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'.)")
	flags.Bool(flagDryRun, false, "Run the command but don't really delete the files.")
	flags.BoolP(flagYes, "y", false, "Skip all prompts and always execute the command.")
}
// ParseStreamStatusFromFlags parses the parameters for `stream status`:
// the JSON output switch first, then the common task-name flag.
func (cfg *StreamConfig) ParseStreamStatusFromFlags(flags *pflag.FlagSet) error {
	jsonOutput, err := flags.GetBool(flagStreamJSONOutput)
	// The field is stored before the error check, so it is written even on failure.
	cfg.JSONOutput = jsonOutput
	if err != nil {
		return errors.Trace(err)
	}

	err = cfg.ParseStreamCommonFromFlags(flags)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}
// ParseStreamTruncateFromFlags parses the parameters for `stream truncate`:
// the until TS (given as TSO or datetime), the skip-prompt switch and the
// dry-run switch, in that order.
func (cfg *StreamConfig) ParseStreamTruncateFromFlags(flags *pflag.FlagSet) error {
	untilStr, err := flags.GetString(flagUntil)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.Until, err = ParseTSString(untilStr, true)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.SkipPrompt, err = flags.GetBool(flagYes)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.DryRun, err = flags.GetBool(flagDryRun)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}
// ParseStreamStartFromFlags parses the parameters for `stream start`:
// the common task name, the start and end TS (TSO or datetime strings) and the
// GC safepoint TTL. A non-positive TTL falls back to
// utils.DefaultStreamStartSafePointTTL.
func (cfg *StreamConfig) ParseStreamStartFromFlags(flags *pflag.FlagSet) error {
	if err := cfg.ParseStreamCommonFromFlags(flags); err != nil {
		return errors.Trace(err)
	}

	startStr, err := flags.GetString(flagStreamStartTS)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.StartTS, err = ParseTSString(startStr, true)
	if err != nil {
		return errors.Trace(err)
	}

	endStr, err := flags.GetString(flagStreamEndTS)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.EndTS, err = ParseTSString(endStr, true)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.SafePointTTL, err = flags.GetInt64(flagGCSafePointTTS)
	if err != nil {
		return errors.Trace(err)
	}
	if cfg.SafePointTTL <= 0 {
		cfg.SafePointTTL = utils.DefaultStreamStartSafePointTTL
	}
	return nil
}
// ParseStreamPauseFromFlags parses the parameters for `stream pause`:
// the common task name and the GC safepoint TTL. A non-positive TTL falls back
// to utils.DefaultStreamPauseSafePointTTL.
func (cfg *StreamConfig) ParseStreamPauseFromFlags(flags *pflag.FlagSet) error {
	if err := cfg.ParseStreamCommonFromFlags(flags); err != nil {
		return errors.Trace(err)
	}

	var err error
	cfg.SafePointTTL, err = flags.GetInt64(flagGCSafePointTTS)
	if err != nil {
		return errors.Trace(err)
	}
	if cfg.SafePointTTL <= 0 {
		cfg.SafePointTTL = utils.DefaultStreamPauseSafePointTTL
	}
	return nil
}
// ParseStreamCommonFromFlags parses the parameters shared by the `stream task`
// commands. It reads the task name and rejects an empty one with
// berrors.ErrInvalidArgument.
func (cfg *StreamConfig) ParseStreamCommonFromFlags(flags *pflag.FlagSet) error {
	taskName, err := flags.GetString(flagStreamTaskName)
	// The field is stored before the error check, so it is written even on failure.
	cfg.TaskName = taskName
	if err != nil {
		return errors.Trace(err)
	}

	if cfg.TaskName == "" {
		return errors.Annotate(berrors.ErrInvalidArgument, "Miss parameters task-name")
	}
	return nil
}
// streamMgr bundles the handles a log backup command works with.
// bc and httpCli are only populated when the manager is created for
// `stream start` (see NewStreamMgr); they stay nil otherwise.
type streamMgr struct {
// cfg is the configuration of the running command.
cfg *StreamConfig
// mgr is the connection manager to the cluster; it is released by close().
mgr *conn.Mgr
// bc is the backup client holding the external storage of the task.
bc *backup.Client
// httpCli is the http client created to do some requirements check.
httpCli *http.Client
}
// NewStreamMgr creates the manager used by the log backup commands.
//
// It always connects to the cluster through NewMgr. When isStreamStart is true
// it additionally parses the storage backend from cfg, creates a backup client
// bound to that storage, and creates an http client from the manager's TLS
// config; the other commands do not need storage.
//
// On any failure after the cluster connection has been established, the
// connection manager is closed before returning, so nothing leaks. On success
// the caller owns the returned manager and must call close() on it.
func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) {
	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, true, conn.StreamVersionChecker)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Track success with a dedicated flag instead of inspecting `err`: the
	// error variables declared with `:=` inside the branch below are distinct
	// from the outer `err`, so a cleanup keyed on the outer variable would
	// never fire for those failures.
	succeeded := false
	defer func() {
		if !succeeded {
			mgr.Close()
		}
	}()

	// just stream start need Storage
	s := &streamMgr{
		cfg: cfg,
		mgr: mgr,
	}
	if isStreamStart {
		backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
		if err != nil {
			return nil, errors.Trace(err)
		}

		opts := storage.ExternalStorageOptions{
			NoCredentials:   cfg.NoCreds,
			SendCredentials: cfg.SendCreds,
		}
		client, err := backup.NewBackupClient(ctx, mgr)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if err = client.SetStorage(ctx, backend, &opts); err != nil {
			return nil, errors.Trace(err)
		}
		s.bc = client

		// create http client to do some requirements check.
		s.httpCli = httputil.NewClient(mgr.GetTLSConfig())
	}

	succeeded = true
	return s, nil
}
// close releases the cluster connection manager held by s.
func (s *streamMgr) close() {
s.mgr.Close()
}
// setLock asks the backup client to set the lock file on its storage.
// It dereferences s.bc, which NewStreamMgr only populates for `stream start`.
func (s *streamMgr) setLock(ctx context.Context) error {
return s.bc.SetLockFile(ctx)
}
// adjustAndCheckStartTS validates the TS range of the task against the current
// TS fetched from the cluster.
//
// A zero startTS is replaced by the current TS. Afterwards startTS must not be
// larger than the current TS, and endTS must be strictly larger than the
// current TS. Each violation is reported as berrors.ErrInvalidArgument with a
// message naming the offending timestamp.
func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error {
	currentTS, err := s.mgr.GetTS(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	// set currentTS to startTS as a default value
	if s.cfg.StartTS == 0 {
		s.cfg.StartTS = currentTS
	}

	// The two checks are kept separate so that an expired endTS is reported
	// with the endTS message rather than the startTS one.
	if currentTS < s.cfg.StartTS {
		return errors.Annotatef(berrors.ErrInvalidArgument,
			"invalid timestamps, startTS %d should be smaller than currentTS %d",
			s.cfg.StartTS, currentTS)
	}
	if s.cfg.EndTS <= currentTS {
		return errors.Annotatef(berrors.ErrInvalidArgument,
			"invalid timestamps, endTS %d should be larger than currentTS %d",
			s.cfg.EndTS, currentTS)
	}
	return nil
}
// setGCSafePoint registers sp as a service safe point in PD.
// It first checks sp.BackupTS against the GC safepoint and only then updates
// the service safe point; the applied value is logged on success.
func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePoint) error {
	if checkErr := utils.CheckGCSafePoint(ctx, s.mgr.GetPDClient(), sp.BackupTS); checkErr != nil {
		return errors.Annotatef(checkErr,
			"failed to check gc safePoint, ts %v", sp.BackupTS)
	}

	if updateErr := utils.UpdateServiceSafePoint(ctx, s.mgr.GetPDClient(), sp); updateErr != nil {
		return errors.Trace(updateErr)
	}

	log.Info("set stream safePoint", zap.Object("safePoint", sp))
	return nil
}
// buildObserveRanges returns the key ranges the task has to observe: the meta
// range followed by the data ranges built from the table filter at startTS,
// all sorted by their start key.
func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, error) {
	dataRanges, err := stream.BuildObserveDataRanges(
		s.mgr.GetStorage(),
		s.cfg.FilterStr,
		s.cfg.TableFilter,
		s.cfg.StartTS,
	)
	if err != nil {
		return nil, errors.Trace(err)
	}

	metaRange := stream.BuildObserveMetaRange()
	observed := make([]kv.KeyRange, 0, len(dataRanges)+1)
	observed = append(observed, *metaRange)
	observed = append(observed, dataRanges...)

	byStartKey := func(a, b kv.KeyRange) bool {
		return bytes.Compare(a.StartKey, b.StartKey) < 0
	}
	slices.SortFunc(observed, byStartKey)
	return observed, nil
}
// backupFullSchemas writes a backupmeta file for the log backup task into the
// task's storage. The meta records the log startTS, the cluster ID and the
// cluster version, followed by the full schema built at startTS.
func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
	clusterVersion, err := s.mgr.GetClusterVersion(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	writer := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil)
	fillHeader := func(meta *backuppb.BackupMeta) {
		// save log startTS to backupmeta file
		meta.StartVersion = s.cfg.StartTS
		meta.ClusterId = s.bc.GetClusterID()
		meta.ClusterVersion = clusterVersion
	}
	writer.Update(fillHeader)

	fullSchemas, err := backup.BuildFullSchema(s.mgr.GetStorage(), s.cfg.StartTS)
	if err != nil {
		return errors.Trace(err)
	}

	concurrency := uint(mathutil.Min(backup.DefaultSchemaConcurrency, fullSchemas.Len()))
	if backupErr := fullSchemas.BackupSchemas(ctx, writer, s.mgr.GetStorage(), nil,
		s.cfg.StartTS, concurrency, 0, true, nil); backupErr != nil {
		return errors.Trace(backupErr)
	}

	if flushErr := writer.FlushBackupMeta(ctx); flushErr != nil {
		return errors.Trace(flushErr)
	}
	return nil
}
// RunStreamCommand is the entry point of every `stream task` command.
// It adjusts the common config, dispatches to the function registered in
// StreamCommandMap under cmdName, and records the success status (plus the
// failure unit on error). Unless cmdName is listed in skipSummaryCommandList,
// the summary is printed when the function returns.
func RunStreamCommand(
	ctx context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	cfg.Config.adjust()

	defer func() {
		if _, skip := skipSummaryCommandList[cmdName]; skip {
			return
		}
		summary.Summary(cmdName)
	}()

	run, known := StreamCommandMap[cmdName]
	if !known {
		return errors.Annotatef(berrors.ErrInvalidArgument, "invalid command %s", cmdName)
	}

	err := run(ctx, g, cmdName, cfg)
	if err == nil {
		summary.SetSuccessStatus(true)
		return nil
	}

	log.Error("failed to stream", zap.String("command", cmdName), zap.Error(err))
	summary.SetSuccessStatus(false)
	summary.CollectFailureUnit(cmdName, err)
	return err
}
// RunStreamStart specifies starting a stream task.
//
// The steps, in order: connect to the cluster (with storage), verify that
// log backup is enabled, validate the TS range, register a GC service safe
// point at startTS, make sure no other task exists, lock the storage, back up
// the full schemas, build the ranges to observe, and finally put the task
// into the metadata store. The created task is reported via the summary log.
func RunStreamStart(
c context.Context,
g glue.Glue,
cmdName string,
cfg *StreamConfig,
) error {
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()
// Attach a child tracing span when the incoming context carries one.
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("task.RunStreamStart", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
// isStreamStart=true: the manager also creates the backup client and its storage.
streamMgr, err := NewStreamMgr(ctx, cfg, g, true)
if err != nil {
return errors.Trace(err)
}
defer streamMgr.close()
se, err := g.CreateSession(streamMgr.mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
// NOTE(review): this is an unchecked type assertion; it panics if the session
// context does not implement sqlexec.RestrictedSQLExecutor.
execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
supportStream, err := utils.IsLogBackupEnabled(execCtx)
if err != nil {
return errors.Trace(err)
}
if !supportStream {
return errors.New("Unable to create task about log-backup. " +
"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
}
// May replace a zero cfg.StartTS with the current TS.
if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
return errors.Trace(err)
}
// Register a service safe point at startTS with the configured TTL.
if err = streamMgr.setGCSafePoint(
ctx,
utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: cfg.SafePointTTL,
BackupTS: cfg.StartTS,
},
); err != nil {
return errors.Trace(err)
}
cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// It supports single stream log task currently.
if count, err := cli.GetTaskCount(ctx); err != nil {
return errors.Trace(err)
} else if count > 0 {
return errors.Annotate(berrors.ErrStreamLogTaskExist, "It supports single stream log task currently")
}
if err = streamMgr.setLock(ctx); err != nil {
return errors.Trace(err)
}
if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
return errors.Trace(err)
}
ranges, err := streamMgr.buildObserveRanges(ctx)
if err != nil {
return errors.Trace(err)
} else if len(ranges) == 0 {
// nothing to backup
pdAddress := strings.Join(cfg.PD, ",")
log.Warn("Nothing to observe, maybe connected to cluster for restoring",
zap.String("PD address", pdAddress))
return errors.Annotate(berrors.ErrInvalidArgument, "nothing need to observe")
}
// Describe the task: storage, TS range, name, filter, compression and the ranges to observe.
ti := streamhelper.TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Storage: streamMgr.bc.GetStorageBackend(),
StartTs: cfg.StartTS,
EndTs: cfg.EndTS,
Name: cfg.TaskName,
TableFilter: cfg.FilterStr,
CompressionType: backuppb.CompressionType_ZSTD,
},
Ranges: ranges,
Pausing: false,
}
if err = cli.PutTask(ctx, ti); err != nil {
return errors.Trace(err)
}
summary.Log(cmdName, ti.ZapTaskInfo()...)
return nil
}
// RunStreamMetadata reads the TS range covered by the log backup in the
// configured storage and reports its lower and upper bound, both as raw TS and
// as formatted dates, through the summary log.
func RunStreamMetadata(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// Attach a child tracing span when the incoming context carries one.
	parent := opentracing.SpanFromContext(ctx)
	if parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamCheckLog", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	logInfo, err := getLogRange(ctx, &cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}

	minTS, maxTS := logInfo.logMinTS, logInfo.logMaxTS
	minDate := stream.FormatDate(oracle.GetTimeFromTS(minTS))
	maxDate := stream.FormatDate(oracle.GetTimeFromTS(maxTS))

	summary.Log(cmdName,
		zap.Uint64("log-min-ts", minTS),
		zap.String("log-min-date", minDate),
		zap.Uint64("log-max-ts", maxTS),
		zap.String("log-max-date", maxDate),
	)
	return nil
}
// RunStreamStop specifies stopping a stream task.
// It loads the task named cfg.TaskName, deletes it from the metadata store and
// then tries to reset the pause safe point of that task; a failure of the
// latter is only logged as a warning.
func RunStreamStop(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// Attach a child tracing span when the incoming context carries one.
	parent := opentracing.SpanFromContext(ctx)
	if parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamStop", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	sm, err := NewStreamMgr(ctx, cfg, g, false)
	if err != nil {
		return errors.Trace(err)
	}
	defer sm.close()

	metaCli := streamhelper.NewMetaDataClient(sm.mgr.GetDomain().GetEtcdClient())
	// to add backoff
	task, err := metaCli.GetTask(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}

	if err = metaCli.DeleteTask(ctx, cfg.TaskName); err != nil {
		return errors.Trace(err)
	}

	// Best effort: the task is already gone, so a failure here is not fatal.
	pauseSafePoint := utils.BRServiceSafePoint{
		ID:       buildPauseSafePointName(task.Info.Name),
		TTL:      0,
		BackupTS: 0,
	}
	if spErr := sm.setGCSafePoint(ctx, pauseSafePoint); spErr != nil {
		log.Warn("failed to remove safe point", zap.String("error", spErr.Error()))
	}

	summary.Log(cmdName, logutil.StreamBackupTaskInfo(&task.Info))
	return nil
}
// RunStreamPause specifies pausing a stream task.
// A task that is already paused is rejected. Otherwise a service safe point is
// registered just below the task's global checkpoint (with the configured TTL)
// before the task is marked as paused.
func RunStreamPause(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// Attach a child tracing span when the incoming context carries one.
	parent := opentracing.SpanFromContext(ctx)
	if parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamPause", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	sm, err := NewStreamMgr(ctx, cfg, g, false)
	if err != nil {
		return errors.Trace(err)
	}
	defer sm.close()

	metaCli := streamhelper.NewMetaDataClient(sm.mgr.GetDomain().GetEtcdClient())
	// to add backoff
	task, paused, err := metaCli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}
	if paused {
		return errors.Annotatef(berrors.ErrKVUnknown, "The task %s is paused already.", cfg.TaskName)
	}

	checkpointTS, err := task.GetGlobalCheckPointTS(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	pauseSafePoint := utils.BRServiceSafePoint{
		ID:       buildPauseSafePointName(task.Info.Name),
		TTL:      cfg.SafePointTTL,
		BackupTS: checkpointTS - 1,
	}
	if err = sm.setGCSafePoint(ctx, pauseSafePoint); err != nil {
		return errors.Trace(err)
	}

	if err = metaCli.PauseTask(ctx, cfg.TaskName); err != nil {
		return errors.Trace(err)
	}

	summary.Log(cmdName, logutil.StreamBackupTaskInfo(&task.Info))
	return nil
}
// RunStreamResume specifies resuming a stream task.
//
// A task that is not paused is rejected. Before resuming, the task's global
// checkpoint is checked against the GC safepoint: if it has been GC-ed the
// task cannot be resumed. After the task is resumed, its last error is cleaned
// and the pause safe point is reset with a zero TTL; a failure of that last
// step is only logged as a warning.
//
// Every returned error is traced, consistently with the other commands.
func RunStreamResume(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// Attach a child tracing span when the incoming context carries one.
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan(
			"task.RunStreamResume",
			opentracing.ChildOf(span.Context()),
		)
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}

	streamMgr, err := NewStreamMgr(ctx, cfg, g, false)
	if err != nil {
		return errors.Trace(err)
	}
	defer streamMgr.close()

	cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
	// to add backoff
	ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}
	if !isPaused {
		return errors.Annotatef(berrors.ErrKVUnknown,
			"The task %s is active already.", cfg.TaskName)
	}

	globalCheckPointTS, err := ti.GetGlobalCheckPointTS(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	err = utils.CheckGCSafePoint(ctx, streamMgr.mgr.GetPDClient(), globalCheckPointTS)
	if err != nil {
		return errors.Annotatef(err, "the global checkpoint ts: %v(%s) has been gc. ",
			globalCheckPointTS, oracle.GetTimeFromTS(globalCheckPointTS))
	}

	err = cli.ResumeTask(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}

	err = cli.CleanLastErrorOfTask(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}

	// Best effort: the task is already resumed, so a failure here is not fatal.
	if err := streamMgr.setGCSafePoint(ctx,
		utils.BRServiceSafePoint{
			ID:       buildPauseSafePointName(ti.Info.Name),
			TTL:      0,
			BackupTS: globalCheckPointTS,
		},
	); err != nil {
		log.Warn("failed to remove safe point",
			zap.Uint64("safe-point", globalCheckPointTS), zap.String("error", err.Error()))
	}

	summary.Log(cmdName, logutil.StreamBackupTaskInfo(&ti.Info))
	return nil
}
// RunStreamAdvancer runs the checkpoint advancer daemon of log backup
// (the `log ctl` command).
//
// It connects to the cluster, dials etcd with the common config, builds a
// checkpoint advancer configured by cfg.AdvancerCfg, and runs the daemon loop
// returned by daemon.Begin. The function returns once that loop returns.
// The cluster connection manager is closed on every return path.
func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
	ctx, cancel := context.WithCancel(c)
	defer cancel()

	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, false, conn.StreamVersionChecker)
	if err != nil {
		return err
	}
	// Release the connection manager when dialing etcd or starting the daemon
	// fails, and after the daemon loop has finished.
	defer mgr.Close()

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return err
	}

	env := streamhelper.CliEnv(mgr.StoreManager, etcdCLI)
	advancer := streamhelper.NewCheckpointAdvancer(env)
	advancer.UpdateConfig(cfg.AdvancerCfg)
	daemon := streamhelper.NewAdvancerDaemon(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI))
	loop, err := daemon.Begin(ctx)
	if err != nil {
		return err
	}
	loop()
	return nil
}
// checkConfigForStatus verifies that the config can be used by `log status`:
// at least one PD address must be given, otherwise berrors.ErrInvalidArgument
// is returned.
func checkConfigForStatus(cfg *StreamConfig) error {
	if len(cfg.PD) > 0 {
		return nil
	}
	return errors.Annotatef(berrors.ErrInvalidArgument,
		"the command needs access to PD, please specify `-u` or `--pd`")
}
// makeStatusController builds the status controller from the config: it dials
// etcd for the metadata client, picks the table or JSON printer depending on
// cfg.JSONOutput, and connects to the cluster.
// It would better live in the `stream` package, but that is impossible because
// of cyclic requirements.
func makeStatusController(ctx context.Context, cfg *StreamConfig, g glue.Glue) (*stream.StatusController, error) {
	console := glue.GetConsole(g)

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return nil, err
	}
	metaCli := streamhelper.NewMetaDataClient(etcdCLI)

	var printer stream.TaskPrinter
	if cfg.JSONOutput {
		printer = stream.PrintTaskWithJSON(console)
	} else {
		printer = stream.PrintTaskByTable(console)
	}

	connMgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, false, conn.StreamVersionChecker)
	if err != nil {
		return nil, err
	}
	return stream.NewStatusController(metaCli, connMgr, printer), nil
}
// RunStreamStatus prints the status of the stream task named cfg.TaskName.
// The config is validated first, then a status controller is built and asked
// to print the task status.
func RunStreamStatus(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// Attach a child tracing span when the incoming context carries one.
	parent := opentracing.SpanFromContext(ctx)
	if parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamStatus", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	if cfgErr := checkConfigForStatus(cfg); cfgErr != nil {
		return cfgErr
	}

	controller, err := makeStatusController(ctx, cfg, g)
	if err != nil {
		return err
	}
	return controller.PrintStatusOfTask(ctx, cfg.TaskName)
}
// RunStreamTruncate truncates the log that belong to (0, until-ts).
//
// It requires cfg.Until to be set. Unless cfg.SkipPrompt is set, the user is
// asked for confirmation before anything is removed (and once more when the
// requested TS is below the truncate safe point already recorded in storage).
// With cfg.DryRun set, no file is deleted and no metadata is written back.
func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
console := glue.GetConsole(g)
// em emphasizes text in bold; warn renders it bold and bright red.
em := color.New(color.Bold).SprintFunc()
warn := color.New(color.Bold, color.FgHiRed).SprintFunc()
formatTS := func(ts uint64) string {
return oracle.GetTimeFromTS(ts).Format("2006-01-02 15:04:05.0000")
}
if cfg.Until == 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "please provide the `--until` ts")
}
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()
storage, err := cfg.makeStorage(ctx)
if err != nil {
return err
}
// sp is the TS recorded in the truncate safe point file of the storage.
sp, err := restore.GetTSFromFile(ctx, storage, restore.TruncateSafePointFileName)
if err != nil {
return err
}
if cfg.Until < sp {
console.Println("According to the log, you have truncated backup data before", em(formatTS(sp)))
if !cfg.SkipPrompt && !console.PromptBool("Continue? ") {
return nil
}
}
readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
// Log every metadata file about to be rewritten; returning cfg.DryRun as
// `skip` means the write-back is skipped in dry-run mode.
BeforeDoWriteBack: func(path string, last, current *backuppb.Metadata) (skip bool) {
log.Info("Updating metadata.", zap.String("file", path),
zap.Int("data-file-before", len(last.GetFileGroups())),
zap.Int("data-file-after", len(current.GetFileGroups())))
return cfg.DryRun
},
}
if err := metas.LoadUntil(ctx, storage, cfg.Until); err != nil {
return err
}
readMetaDone()
var (
fileCount uint64 = 0
kvCount int64 = 0
totalSize uint64 = 0
shiftUntilTS = metas.CalculateShiftTS(cfg.Until)
)
// Count the file groups fully before shiftUntilTS, their total length and
// their number of entries; the callback never asks to break.
metas.IterateFilesFullyBefore(shiftUntilTS, func(d *backuppb.DataFileGroup) (shouldBreak bool) {
fileCount++
totalSize += d.Length
for _, f := range d.DataFilesInfo {
kvCount += f.NumberOfEntries
}
return
})
console.Printf("We are going to remove %s files, until %s.\n",
em(fileCount),
em(formatTS(cfg.Until)),
)
if !cfg.SkipPrompt && !console.PromptBool(warn("Sure? ")) {
return nil
}
// Record the new truncate safe point before deleting anything, but only
// when it moves forward and this is not a dry run.
if cfg.Until > sp && !cfg.DryRun {
if err := restore.SetTSToFile(
ctx, storage, cfg.Until, restore.TruncateSafePointFileName); err != nil {
return err
}
}
removed := metas.RemoveDataBefore(shiftUntilTS)
// remove log
clearDataFileDone := console.ShowTask(
"Clearing data files... ", glue.WithTimeCost(),
glue.WithConstExtraField("kv-count", kvCount),
glue.WithConstExtraField("kv-size", fmt.Sprintf("%d(%s)", totalSize, units.HumanSize(float64(totalSize)))),
)
// Delete the removed files through a worker pool; a failed deletion is
// reported to the log and the console but does not abort the command.
worker := utils.NewWorkerPool(128, "delete files")
wg := new(sync.WaitGroup)
for _, f := range removed {
if !cfg.DryRun {
wg.Add(1)
// Copy the loop variable so the closure captures this iteration's file.
finalFile := f
worker.Apply(func() {
defer wg.Done()
if err := storage.DeleteFile(ctx, finalFile.Path); err != nil {
log.Warn("File not deleted.", zap.String("path", finalFile.Path), logutil.ShortError(err))
console.Print("\n"+em(finalFile.Path), "not deleted, you may clear it manually:", warn(err))
}
})
}
}
wg.Wait()
clearDataFileDone()
// remove metadata
removeMetaDone := console.ShowTask("Removing metadata... ", glue.WithTimeCost())
if !cfg.DryRun {
if err := metas.DoWriteBack(ctx, storage); err != nil {
return err
}
}
removeMetaDone()
return nil
}
// RunStreamRestore restores stream log.
//
// It reads the TS range of the log backup, defaults cfg.RestoreTS to the
// maximal log TS, and, when cfg.FullBackupStorage is set, takes cfg.StartTS
// from the full backup and checks that the full backup and the log come from
// the same cluster without a gap in between. After the range check it
// optionally restores the full snapshot and then restores the log.
func RunStreamRestore(
c context.Context,
g glue.Glue,
cmdName string,
cfg *RestoreConfig,
) (err error) {
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()
// Attach a child tracing span when the incoming context carries one.
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("task.RunStreamRestore", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
logInfo, err := getLogRange(ctx, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
// Restore up to the end of the log when no restore TS was given.
if cfg.RestoreTS == 0 {
cfg.RestoreTS = logInfo.logMaxTS
}
if len(cfg.FullBackupStorage) > 0 {
startTS, fullClusterID, err := getFullBackupTS(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
// The cluster IDs are only compared when both are known (non-zero).
if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the full snapshot(from cluster ID:%v) and log(from cluster ID:%v) come from different cluster.",
fullClusterID, logInfo.clusterID)
}
cfg.StartTS = startTS
// A full backup taken before the log begins leaves an unrecoverable gap.
if cfg.StartTS < logInfo.logMinTS {
return errors.Annotatef(berrors.ErrInvalidArgument,
"it has gap between full backup ts:%d(%s) and log backup ts:%d(%s). ",
cfg.StartTS, oracle.GetTimeFromTS(cfg.StartTS),
logInfo.logMinTS, oracle.GetTimeFromTS(logInfo.logMinTS))
}
}
log.Info("start restore on point",
zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS),
zap.Uint64("log-min-ts", logInfo.logMinTS), zap.Uint64("log-max-ts", logInfo.logMaxTS))
if err := checkLogRange(cfg.StartTS, cfg.RestoreTS, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
return errors.Trace(err)
}
// The recorder is filled while restoring and consumed at the end of restoreStream.
recorder := tiflashrec.New()
cfg.tiflashRecorder = recorder
// restore full snapshot.
if len(cfg.FullBackupStorage) > 0 {
if err := checkPiTRRequirements(ctx, g, cfg); err != nil {
return errors.Trace(err)
}
// Temporarily point the config at the full backup storage for RunRestore.
// NOTE(review): the log storage is not put back when RunRestore fails.
logStorage := cfg.Config.Storage
cfg.Config.Storage = cfg.FullBackupStorage
// TiFlash replica is restored to down-stream on 'pitr' currently.
if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
return errors.Trace(err)
}
cfg.Config.Storage = logStorage
}
// restore log.
cfg.adjustRestoreConfigForStreamRestore()
if err := restoreStream(ctx, g, cfg, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
return errors.Trace(err)
}
return nil
}
// restoreStream runs the log restore job for the range (cfg.StartTS, cfg.RestoreTS].
//
// It reads the stream metadata and data files for the range, restores the
// meta (DDL) files first and the KV (DML) files afterwards, then cleans up,
// saves the schemas, inserts the GC rows and finally re-applies the recorded
// TiFlash replica settings. A success or failure summary is logged when the
// function returns.
// NOTE(review): logMaxTS is not used in this function.
func restoreStream(
c context.Context,
g glue.Glue,
cfg *RestoreConfig,
logMinTS, logMaxTS uint64,
) (err error) {
var (
totalKVCount uint64
totalSize uint64
// mu guards totalKVCount and totalSize, which are updated through updateStats.
mu sync.Mutex
startTime = time.Now()
)
// Log the summary based on the named result err.
defer func() {
if err != nil {
summary.Log("restore log failed summary", zap.Error(err))
} else {
totalDureTime := time.Since(startTime)
summary.Log("restore log success summary", zap.Duration("total-take", totalDureTime),
zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS),
zap.String("restore-from", stream.FormatDate(oracle.GetTimeFromTS(cfg.StartTS))),
zap.String("restore-to", stream.FormatDate(oracle.GetTimeFromTS(cfg.RestoreTS))),
zap.Uint64("total-kv-count", totalKVCount),
zap.String("total-size", units.HumanSize(float64(totalSize))),
zap.String("average-speed", units.HumanSize(float64(totalSize)/totalDureTime.Seconds())+"/s"),
)
}
}()
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()
// Attach a child tracing span when the incoming context carries one.
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(
"restoreStream",
opentracing.ChildOf(span.Context()),
)
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
cfg.CheckRequirements, true, conn.StreamVersionChecker)
if err != nil {
return errors.Trace(err)
}
defer mgr.Close()
client, err := createRestoreClient(ctx, g, cfg, mgr)
if err != nil {
return errors.Annotate(err, "failed to create restore client")
}
defer client.Close()
currentTS, err := client.GetTS(ctx)
if err != nil {
return errors.Trace(err)
}
client.SetCurrentTS(currentTS)
restoreSchedulers, err := restorePreWork(ctx, client, mgr, false)
if err != nil {
return errors.Trace(err)
}
// Always run the post-work even on error, so we don't stuck in the import
// mode or emptied schedulers
defer restorePostWork(ctx, client, restoreSchedulers)
shiftStartTS, err := client.GetShiftTS(ctx, cfg.StartTS, cfg.RestoreTS)
if err != nil {
return errors.Annotate(err, "failed to get shift TS")
}
// read meta by given ts.
metas, err := client.ReadStreamMetaByTS(ctx, shiftStartTS, cfg.RestoreTS)
if err != nil {
return errors.Trace(err)
}
if len(metas) == 0 {
log.Info("nothing to restore.")
return nil
}
client.SetRestoreRangeTS(cfg.StartTS, cfg.RestoreTS, shiftStartTS)
// read data file by given ts.
dmlFiles, ddlFiles, err := client.ReadStreamDataFiles(ctx, metas)
if err != nil {
return errors.Trace(err)
}
if len(dmlFiles) == 0 && len(ddlFiles) == 0 {
log.Info("nothing to restore.")
return nil
}
// get full backup meta to generate rewrite rules.
fullBackupTables, err := initFullBackupTables(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
// get the schemas ID replace information.
schemasReplace, err := client.InitSchemasReplaceForDDL(&fullBackupTables, cfg.TableFilter)
if err != nil {
return errors.Trace(err)
}
// Track the TiFlash replica of every rewritten table in the recorder and
// strip it from the table info, so the replicas can be restored at the end.
schemasReplace.AfterTableRewritten = func(deleted bool, tableInfo *model.TableInfo) {
// When the table replica changed to 0, the tiflash replica might be set to `nil`.
// We should remove the table if we meet.
if deleted || tableInfo.TiFlashReplica == nil {
cfg.tiflashRecorder.DelTable(tableInfo.ID)
return
}
cfg.tiflashRecorder.AddTable(tableInfo.ID, *tableInfo.TiFlashReplica)
// Remove the replica firstly. Let's restore them at the end.
tableInfo.TiFlashReplica = nil
}
// updateStats accumulates the restored KV count and size under mu.
updateStats := func(kvCount uint64, size uint64) {
mu.Lock()
defer mu.Unlock()
totalKVCount += kvCount
totalSize += size
}
// Restore the meta (DDL) files, one progress unit per file.
pm := g.StartProgress(ctx, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress)
if err = withProgress(pm, func(p glue.Progress) error {
client.RunGCRowsLoader(ctx)
return client.RestoreMetaKVFiles(ctx, ddlFiles, schemasReplace, updateStats, p.Inc)
}); err != nil {
return errors.Annotate(err, "failed to restore meta files")
}
// perform restore kv files
rewriteRules, err := initRewriteRules(client, fullBackupTables)
if err != nil {
return errors.Trace(err)
}
updateRewriteRules(rewriteRules, schemasReplace)
pd := g.StartProgress(ctx, "Restore KV Files", int64(len(dmlFiles)), !cfg.LogProgress)
err = withProgress(pd, func(p glue.Progress) error {
return client.RestoreKVFiles(ctx, rewriteRules, dmlFiles, updateStats, p.Inc)
})
if err != nil {
return errors.Annotate(err, "failed to restore kv files")
}
if err = client.CleanUpKVFiles(ctx); err != nil {
return errors.Annotate(err, "failed to clean up")
}
if err = client.SaveSchemas(ctx, schemasReplace, logMinTS, cfg.RestoreTS); err != nil {
return errors.Trace(err)
}
if err = client.InsertGCRows(ctx); err != nil {
return errors.Annotate(err, "failed to insert rows into gc_delete_range")
}
// Re-apply the TiFlash replica settings recorded during the restore.
// A failing statement is only reported; it does not fail the restore.
if cfg.tiflashRecorder != nil {
sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema())
log.Info("Generating SQLs for restoring TiFlash Replica",
zap.Strings("sqls", sqls))
err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})
if err != nil {
return err
}
}
return nil
}
// createRestoreClient builds a restore.Client for log restore and configures it
// from cfg: external storage, rate limit, crypter, concurrency, switch-mode
// interval, raw KV client, restore stores and the metadata helper.
//
// Once the client has been initialized, any later failure closes it before the
// error is returned.
func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *conn.Mgr) (*restore.Client, error) {
	keepalive := GetKeepalive(&cfg.Config)
	keepalive.PermitWithoutStream = true
	client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepalive, false)

	// err is shared by every step below; the deferred function inspects it to
	// decide whether the client has to be closed, so it must never be shadowed.
	var err error
	if err = client.Init(g, mgr.GetStorage()); err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		if err == nil {
			return
		}
		client.Close()
	}()

	backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
	if err != nil {
		return nil, errors.Trace(err)
	}
	storageOpts := &storage.ExternalStorageOptions{
		NoCredentials:   cfg.NoCreds,
		SendCredentials: cfg.SendCreds,
	}
	if err = client.SetStorage(ctx, backend, storageOpts); err != nil {
		return nil, errors.Trace(err)
	}

	client.SetRateLimit(cfg.RateLimit)
	client.SetCrypter(&cfg.CipherInfo)
	client.SetConcurrency(uint(cfg.Concurrency))
	client.SetSwitchModeInterval(cfg.SwitchModeInterval)
	client.InitClients(backend, false)

	rawKV, err := newRawBatchClient(ctx, cfg.PD, cfg.TLS)
	if err != nil {
		return nil, errors.Trace(err)
	}
	client.SetRawKVClient(rawKV)

	if err = client.LoadRestoreStores(ctx); err != nil {
		return nil, errors.Trace(err)
	}

	client.InitMetadataHelper()
	return client, nil
}
// checkLogRange validates the requested restore range against the range of the
// existing log backup. It returns an ErrInvalidArgument-annotated error when the
// ordering below does not hold, and nil otherwise.
func checkLogRange(restoreFrom, restoreTo, logMinTS, logMaxTS uint64) error {
	// Required ordering of the timestamps:
	//   logMinTS <= restoreFrom <= restoreTo <= logMaxTS
	ordered := logMinTS <= restoreFrom && restoreFrom <= restoreTo && restoreTo <= logMaxTS
	if ordered {
		return nil
	}
	return errors.Annotatef(berrors.ErrInvalidArgument,
		"restore log from %d(%s) to %d(%s), "+
			" but the current existed log from %d(%s) to %d(%s)",
		restoreFrom, oracle.GetTimeFromTS(restoreFrom),
		restoreTo, oracle.GetTimeFromTS(restoreTo),
		logMinTS, oracle.GetTimeFromTS(logMinTS),
		logMaxTS, oracle.GetTimeFromTS(logMaxTS),
	)
}
// withProgress runs cc with the progress p and closes p when cc has returned,
// handing cc's error back to the caller unchanged.
func withProgress(p glue.Progress, cc func(p glue.Progress) error) error {
	defer p.Close()
	err := cc(p)
	return err
}
// countIndices returns the total number of indices over all tables in ts.
// nolint: unused, deadcode
func countIndices(ts map[int64]*metautil.Table) int64 {
	var total int64
	for id := range ts {
		total += int64(len(ts[id].Info.Indices))
	}
	return total
}
// backupLogInfo describes the restorable range of a log backup together with
// the cluster it was taken from, as computed by getLogRange.
type backupLogInfo struct {
	// logMaxTS is the upper bound of the log range; getLogRange never lets it
	// fall below logMinTS.
	logMaxTS uint64
	// logMinTS is the lower bound of the log range: the larger of the backup
	// start version and the truncate safepoint.
	logMinTS uint64
	// clusterID is the cluster ID recorded in the backupmeta file.
	clusterID uint64
}
// getLogRange computes the [log-min-ts, log-max-ts] range of the log backup
// stored at cfg.Storage, along with the cluster ID recorded in its backupmeta.
//
// log-min-ts is the larger of the backup start version and the truncate
// safepoint; log-max-ts is the larger of log-min-ts and the global checkpoint
// read from storage.
func getLogRange(
	ctx context.Context,
	cfg *Config,
) (backupLogInfo, error) {
	var none backupLogInfo

	_, extStorage, err := GetStorage(ctx, cfg.Storage, cfg)
	if err != nil {
		return none, errors.Trace(err)
	}

	// The log start ts comes from the backupmeta file.
	rawMeta, err := extStorage.ReadFile(ctx, metautil.MetaFile)
	if err != nil {
		return none, errors.Trace(err)
	}
	meta := new(backuppb.BackupMeta)
	if err = meta.Unmarshal(rawMeta); err != nil {
		return none, errors.Trace(err)
	}
	startTS := meta.GetStartVersion()

	// The truncate ts comes from TruncateSafePointFileName. A value of 0 means
	// the stream log has never been truncated.
	truncatedTS, err := restore.GetTSFromFile(ctx, extStorage, restore.TruncateSafePointFileName)
	if err != nil {
		return none, errors.Trace(err)
	}
	minTS := mathutil.Max(startTS, truncatedTS)

	// The max ts is the global checkpoint found in storage, but never below minTS.
	checkpointTS, err := getGlobalCheckpointFromStorage(ctx, extStorage)
	if err != nil {
		return none, errors.Trace(err)
	}
	maxTS := mathutil.Max(minTS, checkpointTS)

	info := backupLogInfo{
		logMaxTS:  maxTS,
		logMinTS:  minTS,
		clusterID: meta.ClusterId,
	}
	return info, nil
}
// getGlobalCheckpointFromStorage walks the global-checkpoint directory of the
// log backup in s and returns the largest timestamp found in its ".ts" files.
// Each such file is decoded as a little-endian uint64 taken from its first
// 8 bytes. It returns 0 when no ".ts" file exists.
//
// An error is returned when walking or reading fails, or when a ".ts" file is
// shorter than 8 bytes.
func getGlobalCheckpointFromStorage(ctx context.Context, s storage.ExternalStorage) (uint64, error) {
	var globalCheckPointTS uint64
	opt := storage.WalkOption{SubDir: stream.GetStreamBackupGlobalCheckpointPrefix()}
	err := s.WalkDir(ctx, &opt, func(path string, _ int64) error {
		if !strings.HasSuffix(path, ".ts") {
			return nil
		}
		buff, err := s.ReadFile(ctx, path)
		if err != nil {
			return errors.Trace(err)
		}
		// binary.LittleEndian.Uint64 panics on a slice shorter than 8 bytes, so
		// report an empty or truncated checkpoint file as an error instead of
		// crashing the whole process.
		if len(buff) < 8 {
			return errors.Errorf(
				"invalid global checkpoint file %s: expect at least 8 bytes but got %d",
				path, len(buff))
		}
		ts := binary.LittleEndian.Uint64(buff)
		globalCheckPointTS = mathutil.Max(ts, globalCheckPointTS)
		return nil
	})
	return globalCheckPointTS, errors.Trace(err)
}
// getFullBackupTS reads the backupmeta file of the full backup located at
// cfg.FullBackupStorage and returns its end version (the snapshot ts of the
// full backup) and its cluster ID, in that order.
func getFullBackupTS(
	ctx context.Context,
	cfg *RestoreConfig,
) (uint64, uint64, error) {
	_, extStorage, err := GetStorage(ctx, cfg.FullBackupStorage, &cfg.Config)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	rawMeta, err := extStorage.ReadFile(ctx, metautil.MetaFile)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	meta := new(backuppb.BackupMeta)
	if err = meta.Unmarshal(rawMeta); err != nil {
		return 0, 0, errors.Trace(err)
	}

	snapshotTS, clusterID := meta.GetEndVersion(), meta.GetClusterId()
	return snapshotTS, clusterID, nil
}
// getGlobalResolvedTS parses every metadata file in s, keeps the largest
// resolved ts seen for each store, and returns the maximum of those per-store
// values. It returns 0 when no metadata is found.
func getGlobalResolvedTS(
	ctx context.Context,
	s storage.ExternalStorage,
	helper *stream.MetadataHelper,
) (uint64, error) {
	var (
		mu           sync.Mutex
		tsByStore    = make(map[int64]uint64)
		recordLatest = func(storeID int64, resolvedTS uint64) {
			// The callback below is guarded by a lock, so the map is only ever
			// touched while mu is held.
			mu.Lock()
			defer mu.Unlock()
			if known, ok := tsByStore[storeID]; ok && known >= resolvedTS {
				return
			}
			tsByStore[storeID] = resolvedTS
		}
	)

	err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
		meta, parseErr := helper.ParseToMetadata(raw)
		if parseErr != nil {
			return parseErr
		}
		recordLatest(meta.StoreId, meta.ResolvedTs)
		return nil
	})
	if err != nil {
		return 0, errors.Trace(err)
	}

	// With V3 global-checkpoint advancing, the maximum value among the stores is
	// the global-checkpoint-ts. With V2 global-checkpoint advancing, the minimal
	// value would be needed instead, because each store only maintains its own
	// checkpoint-ts. This function takes the maximum.
	var globalTS uint64
	for _, storeTS := range tsByStore {
		if storeTS > globalTS {
			globalTS = storeTS
		}
	}
	return globalTS, nil
}
// initFullBackupTables loads the tables recorded in a backupmeta file and
// returns those accepted by cfg.TableFilter, keyed by table ID.
//
// The backupmeta is read from cfg.FullBackupStorage when it is set, otherwise
// from cfg.Storage. The meta file named after cfg.StartTS is preferred; when it
// does not exist, the default meta file is used. An entry without table info
// (an empty database) is stored under the database ID.
func initFullBackupTables(
	ctx context.Context,
	cfg *RestoreConfig,
) (map[int64]*metautil.Table, error) {
	backupStorage := cfg.Storage
	if len(cfg.FullBackupStorage) > 0 {
		backupStorage = cfg.FullBackupStorage
	}
	_, s, err := GetStorage(ctx, backupStorage, &cfg.Config)
	if err != nil {
		return nil, errors.Trace(err)
	}

	metaFileName := metautil.CreateMetaFileName(cfg.StartTS)
	found, err := s.FileExists(ctx, metaFileName)
	if err != nil {
		return nil, errors.Annotatef(err, "failed to check filename:%s ", metaFileName)
	}
	if !found {
		metaFileName = metautil.MetaFile
	}

	log.Info("read schemas", zap.String("backupmeta", metaFileName))
	rawMeta, err := s.ReadFile(ctx, metaFileName)
	if err != nil {
		return nil, errors.Trace(err)
	}
	backupMeta := new(backuppb.BackupMeta)
	if err = backupMeta.Unmarshal(rawMeta); err != nil {
		return nil, errors.Trace(err)
	}

	// Read the databases of the full backup to build the table-ID -> table map.
	databases, err := utils.LoadBackupTables(ctx, metautil.NewMetaReader(backupMeta, s, nil))
	if err != nil {
		return nil, errors.Trace(err)
	}

	result := make(map[int64]*metautil.Table)
	for _, db := range databases {
		dbName := db.Info.Name.O
		if sysName, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(sysName) && ok {
			dbName = sysName
		}
		if !cfg.TableFilter.MatchSchema(dbName) {
			continue
		}

		for _, tbl := range db.Tables {
			switch {
			case tbl.Info == nil:
				// The database is empty: record the placeholder under the DB ID.
				result[db.Info.ID] = tbl
			case cfg.TableFilter.MatchTable(dbName, tbl.Info.Name.O):
				result[tbl.Info.ID] = tbl
			}
		}
	}
	return result, nil
}
// initRewriteRules builds the rewrite rules for the given full-backup tables by
// comparing each of them with the table of the same name in the cluster.
//
// System-database tables and entries without table info are skipped, and so is
// any table whose schema cannot be fetched from the cluster. The returned map
// is keyed by the IDs produced by restore.GetRewriteRulesMap. The returned
// error is currently always nil.
func initRewriteRules(client *restore.Client, tables map[int64]*metautil.Table) (map[int64]*restore.RewriteRules, error) {
	// compare table exists in cluster and map[table]table.Info to get rewrite rules.
	rules := make(map[int64]*restore.RewriteRules)
	for _, t := range tables {
		if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok {
			// skip system table for now
			continue
		}
		if t.Info == nil {
			continue
		}

		newTableInfo, err := client.GetTableSchema(client.GetDomain(), t.DB.Name, t.Info.Name)
		if err != nil {
			// If table not existed, skip it directly.
			continue
		}
		// we don't handle index rule in pitr. since we only support pitr on non-exists table.
		tableRules := restore.GetRewriteRulesMap(newTableInfo, t.Info, 0, false)
		for tableID, tableRule := range tableRules {
			rules[tableID] = tableRule
		}

		// Log only the rules generated for this table. Iterating the accumulated
		// rules map here would re-print the rules of every table handled so far.
		log.Info("Using rewrite rule for table.", zap.Stringer("table", t.Info.Name),
			zap.Stringer("database", t.DB.Name),
			zap.Int64("old-id", t.Info.ID),
			zap.Array("rewrite-rules", zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
				for _, r := range tableRules {
					for _, rule := range r.Data {
						if err := ae.AppendObject(logutil.RewriteRuleObject(rule)); err != nil {
							return err
						}
					}
				}
				return nil
			})),
		)
	}
	return rules, nil
}
// updateRewriteRules adds to rules a rewrite rule for every table and partition
// recorded in schemasReplace that has no rule yet. Existing rules are left
// untouched. Databases without old info, system databases, and schemas or
// tables rejected by the table filter are skipped.
func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *stream.SchemasReplace) {
	filter := schemasReplace.TableFilter

	for _, dbReplace := range schemasReplace.DbMap {
		oldDB := dbReplace.OldDBInfo
		if oldDB == nil {
			continue
		}
		dbName := oldDB.Name.O
		if utils.IsSysDB(dbName) || !filter.MatchSchema(dbName) {
			continue
		}

		for oldTableID, tableReplace := range dbReplace.TableMap {
			oldTable := tableReplace.OldTableInfo
			if oldTable == nil || !filter.MatchTable(dbName, oldTable.Name.O) {
				continue
			}

			// addIfMissing registers a rule mapping oldID to newID unless one is
			// already present; it serves both the table and its partitions.
			addIfMissing := func(oldID, newID int64) {
				if _, ok := rules[oldID]; ok {
					return
				}
				log.Info("add rewrite rule",
					zap.String("tableName", dbName+"."+oldTable.Name.O),
					zap.Int64("oldID", oldID), zap.Int64("newID", newID))
				rules[oldID] = restore.GetRewriteRuleOfTable(
					oldID, newID, 0, tableReplace.IndexMap, false)
			}

			addIfMissing(oldTableID, tableReplace.NewTableID)
			for oldPartID, newPartID := range tableReplace.PartitionMap {
				addIfMissing(oldPartID, newPartID)
			}
		}
	}
}
// newRawBatchClient creates a raw KV client for the given PD addresses, using
// the CA, certificate and key from tlsConfig, and wraps it in a batch client
// whose batch size is rawKVBatchCount.
func newRawBatchClient(
	ctx context.Context,
	pdAddrs []string,
	tlsConfig TLSConfig,
) (*restore.RawKVBatchClient, error) {
	rawClient, err := restore.NewRawkvClient(ctx, pdAddrs, config.Security{
		ClusterSSLCA:   tlsConfig.CA,
		ClusterSSLCert: tlsConfig.Cert,
		ClusterSSLKey:  tlsConfig.Key,
	})
	if err != nil {
		return nil, errors.Trace(err)
	}

	batchClient := restore.NewRawKVBatchClient(rawClient, rawKVBatchCount)
	return batchClient, nil
}
// ShiftTS returns a timestamp that is earlier than startTS, leaving a safe
// duration between the two for transactions.
// The physical part of startTS is moved back by streamShiftDuration while the
// logical part is kept. It returns 0 when the shifted physical part would be
// negative.
func ShiftTS(startTS uint64) uint64 {
	shifted := oracle.ExtractPhysical(startTS) - streamShiftDuration.Milliseconds()
	if shifted < 0 {
		return 0
	}
	return oracle.ComposeTS(shifted, oracle.ExtractLogical(startTS))
}
// buildPauseSafePointName returns the pause-safepoint name of a task: the task
// name followed by "_pause_safepoint".
func buildPauseSafePointName(taskName string) string {
	const suffix = "_pause_safepoint"
	// Sprint inserts no separator between string operands.
	return fmt.Sprint(taskName, suffix)
}
// checkPiTRRequirements connects to the cluster described by cfg and checks
// that no user database exists there. If any does, it returns an error
// annotated with ErrDatabasesAlreadyExisted that lists their names.
// The manager created for the check is closed before returning.
func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig) error {
	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, true, conn.StreamVersionChecker)
	if err != nil {
		return errors.Trace(err)
	}
	defer mgr.Close()

	existing := restore.GetExistedUserDBs(mgr.GetDomain())
	if len(existing) == 0 {
		return nil
	}

	names := make([]string, 0, len(existing))
	for _, db := range existing {
		names = append(names, db.Name.O)
	}
	return errors.Annotatef(berrors.ErrDatabasesAlreadyExisted,
		"databases %s existed in restored cluster, please drop them before execute PiTR",
		strings.Join(names, ","))
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦