tidb schema 源码

  • 2022-09-19
  • 浏览 (574)

tidb schema 代码


// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package backup

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	// NOTE(review): the third-party imports this file relies on (errors, log,
	// zap, errgroup, opentracing, model, handle, kv, utils, glue, metautil,
	// checksum, summary, logutil) are not present in this listing; restore
	// them from the upstream file.
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
)

const (
	// DefaultSchemaConcurrency is the default number of the concurrent
	// backup schema tasks.
	DefaultSchemaConcurrency = 64
)

type schemaInfo struct {
	tableInfo  *model.TableInfo
	dbInfo     *model.DBInfo
	crc64xor   uint64
	totalKvs   uint64
	totalBytes uint64
	stats      *handle.JSONTable

// Schemas is task for backuping schemas.
type Schemas struct {
	// name -> schema
	schemas map[string]*schemaInfo

func NewBackupSchemas() *Schemas {
	return &Schemas{
		schemas: make(map[string]*schemaInfo),

func (ss *Schemas) AddSchema(
	dbInfo *model.DBInfo, tableInfo *model.TableInfo,
) {
	if tableInfo == nil {
		ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{
			dbInfo: dbInfo,
	name := fmt.Sprintf("%s.%s",
		utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
	ss.schemas[name] = &schemaInfo{
		tableInfo: tableInfo,
		dbInfo:    dbInfo,

// BackupSchemas backups table info, including checksum and stats.
func (ss *Schemas) BackupSchemas(
	ctx context.Context,
	metaWriter *metautil.MetaWriter,
	store kv.Storage,
	statsHandle *handle.Handle,
	backupTS uint64,
	concurrency uint,
	copConcurrency uint,
	skipChecksum bool,
	updateCh glue.Progress,
) error {
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("Schemas.BackupSchemas", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)

	workerPool := utils.NewWorkerPool(concurrency, "Schemas")
	errg, ectx := errgroup.WithContext(ctx)
	startAll := time.Now()
	op := metautil.AppendSchema
	metaWriter.StartWriteMetasAsync(ctx, op)
	for _, s := range ss.schemas {
		schema := s
		// Because schema.dbInfo is a pointer that many tables point to.
		// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
		if utils.IsSysDB(schema.dbInfo.Name.L) {
			schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)

		workerPool.ApplyOnErrorGroup(errg, func() error {
			if schema.tableInfo != nil {
				logger := log.With(
					zap.String("db", schema.dbInfo.Name.O),
					zap.String("table", schema.tableInfo.Name.O),

				if !skipChecksum {
					logger.Info("Calculate table checksum start")
					start := time.Now()
					err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
					if err != nil {
						return errors.Trace(err)
					logger.Info("Calculate table checksum completed",
						zap.Uint64("Crc64Xor", schema.crc64xor),
						zap.Uint64("TotalKvs", schema.totalKvs),
						zap.Uint64("TotalBytes", schema.totalBytes),
						zap.Duration("take", time.Since(start)))
				if statsHandle != nil {
					if err := schema.dumpStatsToJSON(statsHandle); err != nil {
						logger.Error("dump table stats failed", logutil.ShortError(err))
			// Send schema to metawriter
			s, err := schema.encodeToSchema()
			if err != nil {
				return errors.Trace(err)
			if err := metaWriter.Send(s, op); err != nil {
				return errors.Trace(err)
			if updateCh != nil {
			return nil
	if err := errg.Wait(); err != nil {
		return errors.Trace(err)
	log.Info("Backup calculated table checksum into metas", zap.Duration("take", time.Since(startAll)))
	summary.CollectDuration("backup checksum", time.Since(startAll))
	return metaWriter.FinishWriteMetas(ctx, op)

// Len returns the number of schemas.
func (ss *Schemas) Len() int {
	return len(ss.schemas)

func (s *schemaInfo) calculateChecksum(
	ctx context.Context,
	client kv.Client,
	backupTS uint64,
	concurrency uint,
) error {
	exe, err := checksum.NewExecutorBuilder(s.tableInfo, backupTS).
	if err != nil {
		return errors.Trace(err)

	checksumResp, err := exe.Execute(ctx, client, func() {
		// TODO: update progress here.
	if err != nil {
		return errors.Trace(err)

	s.crc64xor = checksumResp.Checksum
	s.totalKvs = checksumResp.TotalKvs
	s.totalBytes = checksumResp.TotalBytes
	return nil

func (s *schemaInfo) dumpStatsToJSON(statsHandle *handle.Handle) error {
	jsonTable, err := statsHandle.DumpStatsToJSON(
		s.dbInfo.Name.String(), s.tableInfo, nil)
	if err != nil {
		return errors.Trace(err)

	s.stats = jsonTable
	return nil

func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
	dbBytes, err := json.Marshal(s.dbInfo)
	if err != nil {
		return nil, errors.Trace(err)

	var tableBytes []byte
	if s.tableInfo != nil {
		tableBytes, err = json.Marshal(s.tableInfo)
		if err != nil {
			return nil, errors.Trace(err)
	var statsBytes []byte
	if s.stats != nil {
		statsBytes, err = json.Marshal(s.stats)
		if err != nil {
			return nil, errors.Trace(err)

	return &backuppb.Schema{
		Db:         dbBytes,
		Table:      tableBytes,
		Crc64Xor:   s.crc64xor,
		TotalKvs:   s.totalKvs,
		TotalBytes: s.totalBytes,
		Stats:      statsBytes,
	}, nil


tidb 源码目录


tidb check 源码

tidb client 源码

tidb metrics 源码

tidb push 源码

0  赞