tidb restore_ebs_meta 源码

  • 2022-09-19
  • 浏览 (547)

tidb restore_ebs_meta 代码

文件路径:/br/pkg/task/restore_ebs_meta.go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package task

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"os"
	"strings"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/aws"
	"github.com/pingcap/tidb/br/pkg/config"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/glue"
	"github.com/pingcap/tidb/br/pkg/pdutil"
	"github.com/pingcap/tidb/br/pkg/summary"
	"github.com/spf13/cobra"
	pd "github.com/tikv/pd/client"
	"go.uber.org/zap"
)

const (
	flagMetaPhase        = "meta-phase"
	flagDataPhase        = "data-phase"
	flagOutputMetaFile   = "output-file"
	flagVolumeType       = "volume-type"
	flagVolumeIOPS       = "volume-iops"
	flagVolumeThroughput = "volume-throughput"
)

// DefineRestoreSnapshotFlags defines common flags for the backup command.
func DefineRestoreSnapshotFlags(command *cobra.Command) {
	command.Flags().Bool(flagMetaPhase, false, "restore meta phase for snapshot based restore")
	command.Flags().Bool(flagDataPhase, false, "restore data phase for snapshot based restore")
	command.Flags().String(flagOutputMetaFile, "output.json", "the file path of output meta file")
	command.Flags().Bool(flagSkipAWS, false, "don't access to aws environment if set to true")
	command.Flags().Uint(flagCloudAPIConcurrency, defaultCloudAPIConcurrency, "concurrency of calling cloud api")
	command.Flags().String(flagVolumeType, string(config.GP3Volume), "volume type: gp3, io1, io2")
	command.Flags().Int64(flagVolumeIOPS, 0, "volume iops(0 means default for that volume type)")
	command.Flags().Int64(flagVolumeThroughput, 0, "volume throughout in MiB/s(0 means default for that volume type)")
	command.Flags().String(flagProgressFile, "progress.txt", "the file name of progress file")

	_ = command.Flags().MarkHidden(flagMetaPhase)
	_ = command.Flags().MarkHidden(flagDataPhase)
	_ = command.Flags().MarkHidden(flagOutputMetaFile)
	_ = command.Flags().MarkHidden(flagSkipAWS)
	_ = command.Flags().MarkHidden(flagCloudAPIConcurrency)
	_ = command.Flags().MarkHidden(flagVolumeType)
	_ = command.Flags().MarkHidden(flagVolumeIOPS)
	_ = command.Flags().MarkHidden(flagVolumeThroughput)
	_ = command.Flags().MarkHidden(flagProgressFile)
}

// RunRestoreEBSMeta phase 1 of EBS based restore
func RunRestoreEBSMeta(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
	cfg.Adjust()
	helper := restoreEBSMetaHelper{
		rootCtx: c,
		g:       g,
		cmdName: cmdName,
		cfg:     cfg,
	}
	defer helper.close()
	return helper.restore()
}

type restoreEBSMetaHelper struct {
	rootCtx context.Context
	g       glue.Glue
	cmdName string
	cfg     *RestoreConfig

	metaInfo *config.EBSBasedBRMeta
	pdc      *pdutil.PdController
}

// we don't call close of fields on failure, outer logic should call helper.close.
func (h *restoreEBSMetaHelper) preRestore(ctx context.Context) error {
	_, externStorage, err := GetStorage(ctx, h.cfg.Config.Storage, &h.cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}

	// read meta from s3
	metaInfo, err := config.NewMetaFromStorage(ctx, externStorage)
	if err != nil {
		return errors.Trace(err)
	}
	if FullBackupType(metaInfo.GetFullBackupType()) != FullBackupTypeEBS {
		log.Error("invalid meta file", zap.Reflect("meta", metaInfo))
		return errors.New("invalid meta file, only support aws-ebs now")
	}
	h.metaInfo = metaInfo

	var (
		tlsConf *tls.Config
	)
	pdAddress := strings.Join(h.cfg.PD, ",")
	if len(pdAddress) == 0 {
		return errors.Annotate(berrors.ErrInvalidArgument, "pd address can not be empty")
	}

	securityOption := pd.SecurityOption{}
	if h.cfg.TLS.IsEnabled() {
		securityOption.CAPath = h.cfg.TLS.CA
		securityOption.CertPath = h.cfg.TLS.Cert
		securityOption.KeyPath = h.cfg.TLS.Key
		tlsConf, err = h.cfg.TLS.ToTLSConfig()
		if err != nil {
			return errors.Trace(err)
		}
	}

	controller, err := pdutil.NewPdController(ctx, pdAddress, tlsConf, securityOption)
	if err != nil {
		log.Error("fail to create pd controller", zap.Error(err))
		return errors.Trace(err)
	}

	h.pdc = controller

	// todo: check whether target cluster is compatible with the backup
	// but cluster hasn't bootstrapped, we cannot get cluster version from pd now.
	return nil
}

func (h *restoreEBSMetaHelper) close() {
	if h.pdc != nil {
		h.pdc.Close()
	}
}

func (h *restoreEBSMetaHelper) restore() error {
	var (
		finished   bool
		totalSize  int64
		resolvedTs uint64
		err        error
	)
	defer func() {
		if finished {
			summary.Log("EBS restore success", zap.Int64("size", totalSize), zap.Uint64("resolved_ts", resolvedTs))
		} else {
			summary.Log("EBS restore failed, please check the log for details.")
		}
	}()
	ctx, cancel := context.WithCancel(h.rootCtx)
	defer cancel()

	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("task.RunRestoreEBSMeta", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}

	if err = h.preRestore(ctx); err != nil {
		return errors.Trace(err)
	}

	storeCount := h.metaInfo.GetStoreCount()
	progress := h.g.StartProgress(ctx, h.cmdName, int64(storeCount), !h.cfg.LogProgress)
	defer progress.Close()
	go progressFileWriterRoutine(ctx, progress, int64(storeCount), h.cfg.ProgressFile)

	resolvedTs = h.metaInfo.ClusterInfo.ResolvedTS
	if totalSize, err = h.doRestore(ctx, progress); err != nil {
		return errors.Trace(err)
	}

	if err = h.writeOutputFile(); err != nil {
		return errors.Trace(err)
	}
	finished = true
	return nil
}
func (h *restoreEBSMetaHelper) doRestore(ctx context.Context, progress glue.Progress) (int64, error) {
	log.Info("mark recovering")
	if err := h.pdc.MarkRecovering(ctx); err != nil {
		return 0, errors.Trace(err)
	}
	log.Info("set pd ts = max(resolved_ts, current pd ts)", zap.Uint64("resolved ts", h.metaInfo.ClusterInfo.ResolvedTS))
	if err := h.pdc.ResetTS(ctx, h.metaInfo.ClusterInfo.ResolvedTS); err != nil {
		return 0, errors.Trace(err)
	}

	if h.cfg.SkipAWS {
		for i := 0; i < int(h.metaInfo.GetStoreCount()); i++ {
			progress.Inc()
			log.Info("mock: create volume from snapshot finished.", zap.Int("index", i))
			time.Sleep(800 * time.Millisecond)
		}
		return 1234, nil
	}

	volumeIDMap, totalSize, err := h.restoreVolumes(progress)
	if err != nil {
		return 0, errors.Trace(err)
	}

	h.metaInfo.SetRestoreVolumeIDs(volumeIDMap)
	return totalSize, nil
}

func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[string]string, int64, error) {
	log.Info("create volume from snapshots")
	var (
		ec2Session  *aws.EC2Session
		volumeIDMap = make(map[string]string)
		err         error
		totalSize   int64
	)
	ec2Session, err = aws.NewEC2Session(h.cfg.CloudAPIConcurrency)
	if err != nil {
		return nil, 0, errors.Trace(err)
	}
	defer func() {
		if err != nil {
			log.Error("failed to create all volumes, cleaning up created volume")
			ec2Session.DeleteVolumes(volumeIDMap)
		}
	}()
	volumeIDMap, err = ec2Session.CreateVolumes(h.metaInfo,
		string(h.cfg.VolumeType), h.cfg.VolumeIOPS, h.cfg.VolumeThroughput)
	if err != nil {
		return nil, 0, errors.Trace(err)
	}
	totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress)
	if err != nil {
		return nil, 0, errors.Trace(err)
	}
	return volumeIDMap, totalSize, nil
}

func (h *restoreEBSMetaHelper) writeOutputFile() error {
	data, err := json.Marshal(h.metaInfo)
	if err != nil {
		return errors.Trace(err)
	}
	err = os.WriteFile(h.cfg.OutputFile, data, 0600)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

相关信息

tidb 源码目录

相关文章

tidb backup 源码

tidb backup_ebs 源码

tidb backup_raw 源码

tidb common 源码

tidb restore 源码

tidb restore_data 源码

tidb restore_raw 源码

tidb stream 源码

0  赞