tidb systable_restore 源码

  • 2022-09-19
  • 浏览 (562)

tidb systable_restore 代码


// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (

	berrors "github.com/pingcap/tidb/br/pkg/errors"
	filter "github.com/pingcap/tidb/util/table-filter"

// statsTables is the set of statistics tables living in the `mysql` schema.
// replaceTemporaryTableToSystable refuses to restore any table listed here.
var statsTables = map[string]struct{}{
	"stats_buckets":    {},
	"stats_extended":   {},
	"stats_feedback":   {},
	"stats_fm_sketch":  {},
	"stats_histograms": {},
	"stats_meta":       {},
	"stats_top_n":      {},
}

// unRecoverableTable is the set of `mysql` schema tables that must never be
// overwritten from a backup. replaceTemporaryTableToSystable rejects them.
var unRecoverableTable = map[string]struct{}{
	// Some variables in tidb (e.g. gc_safe_point) cannot be recovered.
	"tidb":             {},
	"global_variables": {},

	"column_stats_usage":               {},
	"capture_plan_baselines_blacklist": {},
	// GC info doesn't need to be recovered.
	"gc_delete_range":      {},
	"gc_delete_range_done": {},

	// schema_index_usage contains table IDs that would need to be rewritten.
	"schema_index_usage": {},
}

// sysPrivilegeTableMap lists the privilege tables that are restored when
// fullClusterRestore=true.
// The value is a filter for a SQL WHERE clause which is used to skip
// clearing or restoring 'cloud_admin'@'%', a special user on TiDB Cloud.
var sysPrivilegeTableMap = map[string]string{
	"user":          "not (user = 'cloud_admin' and host = '%')",       // since v1.0.0
	"db":            "not (user = 'cloud_admin' and host = '%')",       // since v1.0.0
	"tables_priv":   "not (user = 'cloud_admin' and host = '%')",       // since v1.0.0
	"columns_priv":  "not (user = 'cloud_admin' and host = '%')",       // since v1.0.0
	"default_roles": "not (user = 'cloud_admin' and host = '%')",       // since v3.0.0
	"role_edges":    "not (to_user = 'cloud_admin' and to_host = '%')", // since v3.0.0
	"global_priv":   "not (user = 'cloud_admin' and host = '%')",       // since v3.0.8
	"global_grants": "not (user = 'cloud_admin' and host = '%')",       // since v5.0.3
}

func isUnrecoverableTable(tableName string) bool {
	_, ok := unRecoverableTable[tableName]
	return ok

func isStatsTable(tableName string) bool {
	_, ok := statsTables[tableName]
	return ok

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
	sysDB := mysql.SystemDB

	temporaryDB := utils.TemporaryDBName(sysDB)
	defer rc.cleanTemporaryDatabase(ctx, sysDB)

	if !f.MatchSchema(sysDB) || !rc.withSysTable {
		log.Debug("system database filtered out", zap.String("database", sysDB))
	originDatabase, ok := rc.databases[temporaryDB.O]
	if !ok {
		log.Info("system database not backed up, skipping", zap.String("database", sysDB))
	db, ok := rc.getDatabaseByName(sysDB)
	if !ok {
		// Or should we create the database here?
		log.Warn("target database not exist, aborting", zap.String("database", sysDB))

	tablesRestored := make([]string, 0, len(originDatabase.Tables))
	for _, table := range originDatabase.Tables {
		tableName := table.Info.Name
		if f.MatchTable(sysDB, tableName.O) {
			if err := rc.replaceTemporaryTableToSystable(ctx, table.Info, db); err != nil {
				log.Warn("error during merging temporary tables into system tables",
					zap.Stringer("table", tableName),
			tablesRestored = append(tablesRestored, tableName.L)
	if err := rc.afterSystemTablesReplaced(tablesRestored); err != nil {
		for _, e := range multierr.Errors(err) {
			log.Warn("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e))

// database is a record of a database.
// For fast querying whether a table exists and the temporary database of it.
type database struct {
	ExistingTables map[string]*model.TableInfo
	Name           model.CIStr
	TemporaryName  model.CIStr

// getDatabaseByName make a record of a database from info schema by its name.
func (rc *Client) getDatabaseByName(name string) (*database, bool) {
	infoSchema := rc.dom.InfoSchema()
	schema, ok := infoSchema.SchemaByName(model.NewCIStr(name))
	if !ok {
		return nil, false
	db := &database{
		ExistingTables: map[string]*model.TableInfo{},
		Name:           model.NewCIStr(name),
		TemporaryName:  utils.TemporaryDBName(name),
	for _, t := range schema.Tables {
		db.ExistingTables[t.Name.L] = t
	return db, true

// afterSystemTablesReplaced do some extra work for special system tables.
// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect.
func (rc *Client) afterSystemTablesReplaced(tables []string) error {
	var err error
	for _, table := range tables {
		switch {
		case table == "user":
			if rc.fullClusterRestore {
				log.Info("privilege system table restored, please reconnect to make it effective")
				err = rc.dom.NotifyUpdatePrivilege()
			} else {
				// to make it compatible with older version
				// todo: should we allow restore system table in non-fresh cluster in later br version?
				// if we don't, we can check it at first place.
				err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
					"restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually"))
	return err

// replaceTemporaryTableToSystable replaces the temporary table to real system table.
func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model.TableInfo, db *database) error {
	tableName := ti.Name.L
	execSQL := func(sql string) error {
		// SQLs here only contain table name and database name, seems it is no need to redact them.
		if err := rc.db.se.Execute(ctx, sql); err != nil {
			log.Warn("failed to execute SQL restore system database",
				zap.String("table", tableName),
				zap.Stringer("database", db.Name),
				zap.String("sql", sql),
			return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql)
		log.Info("successfully restore system database",
			zap.String("table", tableName),
			zap.Stringer("database", db.Name),
			zap.String("sql", sql),
		return nil

	// The newly created tables have different table IDs with original tables,
	// 	hence the old statistics are invalid.
	// TODO:
	// 	1   ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id`
	//		BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`.
	//  1.5 ) (Optional) The UPDATE statement sometimes costs, the whole system tables restore step can be place into the restore pipeline.
	//  2   ) Deprecate the origin interface for backing up statistics.
	if isStatsTable(tableName) {
		return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " +
			"the table ID is out-of-date and may corrupt existing statistics")

	if isUnrecoverableTable(tableName) {
		return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring unsupported `mysql` schema table")

	if db.ExistingTables[tableName] != nil {
		whereClause := ""
		if rc.fullClusterRestore && sysPrivilegeTableMap[tableName] != "" {
			// cloud_admin is a special user on tidb cloud, need to skip it.
			whereClause = fmt.Sprintf("WHERE %s", sysPrivilegeTableMap[tableName])
			log.Info("full cluster restore, delete existing data",
				zap.String("table", tableName), zap.Stringer("schema", db.Name))
			deleteSQL := fmt.Sprintf("DELETE FROM %s %s;",
				utils.EncloseDBAndTable(db.Name.L, tableName), whereClause)
			if err := execSQL(deleteSQL); err != nil {
				return err
		log.Info("replace into existing table",
			zap.String("table", tableName),
			zap.Stringer("schema", db.Name))
		// target column order may different with source cluster
		columnNames := make([]string, 0, len(ti.Columns))
		for _, col := range ti.Columns {
			columnNames = append(columnNames, utils.EncloseName(col.Name.L))
		colListStr := strings.Join(columnNames, ",")
		replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s(%s) SELECT %s FROM %s %s;",
			utils.EncloseDBAndTable(db.Name.L, tableName),
			colListStr, colListStr,
			utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
		return execSQL(replaceIntoSQL)

	renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;",
		utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
		utils.EncloseDBAndTable(db.Name.L, tableName),
	return execSQL(renameSQL)

func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) {
	database := utils.TemporaryDBName(originDB)
	log.Debug("dropping temporary database", zap.Stringer("database", database))
	sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L))
	if err := rc.db.se.Execute(ctx, sql); err != nil {
		logutil.WarnTerm("failed to drop temporary database, it should be dropped manually",
			zap.Stringer("database", database),


tidb 源码目录


tidb batcher 源码

tidb client 源码

tidb data 源码

tidb db 源码

tidb import 源码

tidb import_retry 源码

tidb merge 源码

tidb pipeline_items 源码

tidb range 源码

tidb rawkv_client 源码

0  赞