tidb search 源码

  • 2022-09-19
  • 浏览 (650)

tidb search 代码


// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"sort"
	"strings"
	"sync"

	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	// NOTE(review): the import lines for the errors, log, storage, stream,
	// utils, codec, zap and errgroup packages used below are not present in
	// this extract; restore them from the upstream file before building.
)

// Comparator decides whether src matches dst under some relationship
// (for example, "src starts with dst").
type Comparator interface {
	// Compare reports whether src satisfies the relationship with dst.
	Compare(src, dst []byte) bool
}

// startWithComparator is the Comparator implementation that reports whether
// src starts with dst. It carries no state.
type startWithComparator struct{}

// NewStartWithComparator create a comparator to compare whether src starts with dst
func NewStartWithComparator() Comparator {
	return new(startWithComparator)

// Compare whether src starts with dst
func (ec *startWithComparator) Compare(src, dst []byte) bool {
	return bytes.HasPrefix(src, dst)

// StreamKVInfo stores kv info searched from log data files.
type StreamKVInfo struct {
	// Key is the decoded raw key, rendered as upper-case hex.
	Key string `json:"key"`
	// EncodedKey is the hex of the key exactly as stored in the data file;
	// it is used for internal bookkeeping and excluded from JSON output.
	EncodedKey string `json:"-"`
	WriteType  byte   `json:"write-type"`
	StartTs    uint64 `json:"start-ts"`
	CommitTs   uint64 `json:"commit-ts"`
	CFName     string `json:"cf-name"`
	// Value and ShortValue hold base64-encoded payloads and are omitted from
	// JSON output when empty.
	Value      string `json:"value,omitempty"`
	ShortValue string `json:"short-value,omitempty"`
}

// StreamBackupSearch is used for searching key from log data files
type StreamBackupSearch struct {
	storage    storage.ExternalStorage
	comparator Comparator
	searchKey  []byte // hex string
	startTs    uint64
	endTs      uint64

// NewStreamBackupSearch creates an instance of StreamBackupSearch
func NewStreamBackupSearch(storage storage.ExternalStorage, comparator Comparator, searchKey []byte) *StreamBackupSearch {
	bs := &StreamBackupSearch{
		storage:    storage,
		comparator: comparator,

	bs.searchKey = codec.EncodeBytes([]byte{}, searchKey)
	return bs

// SetStartTS set start timestamp searched from
func (s *StreamBackupSearch) SetStartTS(startTs uint64) {
	s.startTs = startTs

// SetEndTs set end timestamp searched to
func (s *StreamBackupSearch) SetEndTs(endTs uint64) {
	s.endTs = endTs

func (s *StreamBackupSearch) readDataFiles(ctx context.Context, ch chan<- *backuppb.DataFileInfo) error {
	opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()}
	pool := utils.NewWorkerPool(64, "read backup meta")
	eg, egCtx := errgroup.WithContext(ctx)
	err := s.storage.WalkDir(egCtx, opt, func(path string, size int64) error {
		if !strings.Contains(path, stream.GetStreamBackupMetaPrefix()) {
			return nil

		pool.ApplyOnErrorGroup(eg, func() error {
			m := &backuppb.Metadata{}
			b, err := s.storage.ReadFile(egCtx, path)
			if err != nil {
				return errors.Trace(err)
			err = m.Unmarshal(b)
			if err != nil {
				return errors.Trace(err)

			s.resolveMetaData(egCtx, m, ch)
			log.Debug("read backup meta file", zap.String("path", path))
			return nil

		return nil

	if err != nil {
		return errors.Trace(err)

	return eg.Wait()

func (s *StreamBackupSearch) resolveMetaData(ctx context.Context, metaData *backuppb.Metadata, ch chan<- *backuppb.DataFileInfo) {
	for _, file := range metaData.Files {
		if file.IsMeta {

		// TODO dynamically configure filter policy
		if bytes.Compare(s.searchKey, file.StartKey) < 0 {
		if bytes.Compare(s.searchKey, file.EndKey) > 0 {

		if s.startTs > 0 {
			if file.MaxTs < s.startTs {
		if s.endTs > 0 {
			if file.MinTs > s.endTs {

		ch <- file

// Search kv entries from log data files
func (s *StreamBackupSearch) Search(ctx context.Context) ([]*StreamKVInfo, error) {
	dataFilesCh := make(chan *backuppb.DataFileInfo, 32)
	entriesCh, errCh := make(chan *StreamKVInfo, 64), make(chan error, 8)
	go func() {
		defer close(dataFilesCh)
		if err := s.readDataFiles(ctx, dataFilesCh); err != nil {
			errCh <- err

	pool := utils.NewWorkerPool(16, "search key")
	var wg sync.WaitGroup

	for dataFile := range dataFilesCh {
		file := dataFile
		pool.Apply(func() {
			defer wg.Done()
			if err := s.searchFromDataFile(ctx, file, entriesCh); err != nil {
				errCh <- err

	go func() {

	for err := range errCh {
		return nil, errors.Trace(err)

	defaultCFEntries := make(map[string]*StreamKVInfo, 64)
	writeCFEntries := make(map[string]*StreamKVInfo, 64)

	for entry := range entriesCh {
		if entry.CFName == writeCFName {
			writeCFEntries[entry.EncodedKey] = entry
		} else if entry.CFName == defaultCFName {
			defaultCFEntries[entry.EncodedKey] = entry

	entries := s.mergeCFEntries(defaultCFEntries, writeCFEntries)
	return entries, nil

func (s *StreamBackupSearch) searchFromDataFile(ctx context.Context, dataFile *backuppb.DataFileInfo, ch chan<- *StreamKVInfo) error {
	buff, err := s.storage.ReadFile(ctx, dataFile.Path)
	if err != nil {
		return errors.Annotatef(err, "read data file error, file: %s", dataFile.Path)

	if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], dataFile.GetSha256()) {
		return errors.Annotatef(err, "validate checksum failed, file: %s", dataFile.Path)

	iter := stream.NewEventIterator(buff)
	for iter.Valid() {
		if err := iter.GetError(); err != nil {
			return errors.Trace(err)

		k, v := iter.Key(), iter.Value()
		if !s.comparator.Compare(k, s.searchKey) {

		_, ts, err := codec.DecodeUintDesc(k[len(k)-8:])
		if err != nil {
			return errors.Annotatef(err, "decode ts from key error, file: %s", dataFile.Path)

		k = k[:len(k)-8]
		_, rawKey, err := codec.DecodeBytes(k, nil)
		if err != nil {
			return errors.Annotatef(err, "decode raw key error, file: %s", dataFile.Path)

		if dataFile.Cf == writeCFName {
			rawWriteCFValue := new(stream.RawWriteCFValue)
			if err := rawWriteCFValue.ParseFrom(v); err != nil {
				return errors.Annotatef(err, "parse raw write cf value error, file: %s", dataFile.Path)

			valueStr := ""
			if rawWriteCFValue.HasShortValue() {
				valueStr = base64.StdEncoding.EncodeToString(rawWriteCFValue.GetShortValue())

			kvInfo := &StreamKVInfo{
				WriteType:  rawWriteCFValue.GetWriteType(),
				CFName:     dataFile.Cf,
				CommitTs:   ts,
				StartTs:    rawWriteCFValue.GetStartTs(),
				Key:        strings.ToUpper(hex.EncodeToString(rawKey)),
				EncodedKey: hex.EncodeToString(iter.Key()),
				ShortValue: valueStr,
			ch <- kvInfo
		} else if dataFile.Cf == defaultCFName {
			kvInfo := &StreamKVInfo{
				CFName:     dataFile.Cf,
				StartTs:    ts,
				Key:        strings.ToUpper(hex.EncodeToString(rawKey)),
				EncodedKey: hex.EncodeToString(iter.Key()),
				Value:      base64.StdEncoding.EncodeToString(v),
			ch <- kvInfo

	log.Info("finish search data file", zap.String("file", dataFile.Path))
	return nil

func (s *StreamBackupSearch) mergeCFEntries(defaultCFEntries, writeCFEntries map[string]*StreamKVInfo) []*StreamKVInfo {
	entries := make([]*StreamKVInfo, 0, len(defaultCFEntries)+len(writeCFEntries))
	mergedDefaultCFKeys := make(map[string]struct{}, 16)
	for _, entry := range writeCFEntries {
		entries = append(entries, entry)
		if entry.ShortValue != "" {

		keyBytes, err := hex.DecodeString(entry.Key)
		if err != nil {
			log.Warn("hex decode key failed", zap.String("key", entry.Key), zap.String("encode-key", entry.EncodedKey), zap.Error(err))

		encodedKey := codec.EncodeBytes([]byte{}, keyBytes)
		defaultCFKey := hex.EncodeToString(codec.EncodeUintDesc(encodedKey, entry.StartTs))
		defaultCFEntry, ok := defaultCFEntries[defaultCFKey]
		if !ok {

		entry.Value = defaultCFEntry.Value
		mergedDefaultCFKeys[defaultCFKey] = struct{}{}

	for key, entry := range defaultCFEntries {
		if _, ok := mergedDefaultCFKeys[key]; ok {

		entries = append(entries, entry)

	sort.Slice(entries, func(i, j int) bool {
		return entries[i].CommitTs < entries[j].CommitTs

	return entries


tidb 源码目录


tidb batcher 源码

tidb client 源码

tidb data 源码

tidb db 源码

tidb import 源码

tidb import_retry 源码

tidb merge 源码

tidb pipeline_items 源码

tidb range 源码

tidb rawkv_client 源码

0  赞