tidb analyze_idx 源码

  • 2022-09-19
  • 浏览 (545)

tidb analyze_idx 代码

文件路径:/executor/analyze_idx.go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"
	"math"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/ranger"
	"github.com/pingcap/tipb/go-tipb"
	"go.uber.org/zap"
)

// AnalyzeIndexExec represents analyze index push down executor.
// Besides the embedded baseAnalyzeExec it holds the index being analyzed and
// the distsql results of the analyze requests dispatched by fetchAnalyzeResult.
type AnalyzeIndexExec struct {
	baseAnalyzeExec

	// idxInfo is the metadata of the index this executor analyzes.
	idxInfo        *model.IndexInfo
	// isCommonHandle, together with idxInfo.Primary, makes fetchAnalyzeResult
	// build handle ranges instead of index ranges.
	isCommonHandle bool
	// result is the SelectResult of the request over the main ranges.
	result         distsql.SelectResult
	// countNullRes is the SelectResult of the extra request over the null range;
	// it stays nil unless open dispatches that request.
	countNullRes   distsql.SelectResult
}

// analyzeIndexPushdown builds the histogram, CM sketch, FM sketch and TopN of
// the index handled by idxExec and wraps them into an AnalyzeResults. When
// building the statistics fails, the returned AnalyzeResults only carries the
// error and the job.
func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults {
	// A single-column index skips null rows in the main scan: the histogram then
	// holds no null values and its `NullCount` comes from a separate distsql call
	// over the null range. For a multi-column index "null" is not well defined for
	// a row, so the full range is scanned, rows with null fields end up in the
	// histogram, and `NullCount` is always 0.
	ranges := ranger.FullRange()
	if len(idxExec.idxInfo.Columns) == 1 {
		ranges = ranger.FullNotNullRange()
	}

	hist, cms, fms, topN, err := idxExec.buildStats(ranges, true)
	if err != nil {
		return &statistics.AnalyzeResults{Err: err, Job: idxExec.job}
	}

	// Row count = null rows + Count of the last bucket + rows held by TopN.
	totalRows := hist.NullCount
	if bucketNum := hist.Len(); bucketNum > 0 {
		totalRows += hist.Buckets[bucketNum-1].Count
	}
	if topNRows := topN.TotalCount(); topNRows > 0 {
		totalRows += int64(topNRows)
	}

	// Fall back to Version1 when the request does not carry a version.
	version := statistics.Version1
	if reqVersion := idxExec.analyzePB.IdxReq.Version; reqVersion != nil {
		version = int(*reqVersion)
	}

	return &statistics.AnalyzeResults{
		TableID: idxExec.tableID,
		Ars: []*statistics.AnalyzeResult{{
			Hist:    []*statistics.Histogram{hist},
			Cms:     []*statistics.CMSketch{cms},
			TopNs:   []*statistics.TopN{topN},
			Fms:     []*statistics.FMSketch{fms},
			IsIndex: 1,
		}},
		Job:      idxExec.job,
		StatsVer: version,
		Count:    totalRows,
		Snapshot: idxExec.snapshot,
	}
}

// buildStats dispatches the analyze requests for ranges and assembles the
// histogram, CM sketch, FM sketch and TopN of the index from the responses.
// If a null-range result was fetched, the Count of its last bucket becomes the
// histogram's NullCount. Both results are closed before returning; a close
// error is reported only when no earlier error occurred.
func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, fms *statistics.FMSketch, topN *statistics.TopN, err error) {
	if openErr := e.open(ranges, considerNull); openErr != nil {
		return nil, nil, nil, nil, openErr
	}
	defer func() {
		// err is the named result, so the close error can still be surfaced here.
		if closeErr := closeAll(e.result, e.countNullRes); err == nil {
			err = closeErr
		}
	}()

	if hist, cms, fms, topN, err = e.buildStatsFromResult(e.result, true); err != nil {
		return nil, nil, nil, nil, err
	}

	if e.countNullRes != nil {
		nullHist, _, _, _, nullErr := e.buildStatsFromResult(e.countNullRes, false)
		if nullErr != nil {
			return nil, nil, nil, nil, nullErr
		}
		if bucketNum := nullHist.Len(); bucketNum > 0 {
			hist.NullCount = nullHist.Buckets[bucketNum-1].Count
		}
	}

	hist.ID = e.idxInfo.ID
	return hist, cms, fms, topN, nil
}

// open dispatches the analyze request over ranges and stores its result in
// e.result. When considerNull is true and the index has exactly one column, it
// also dispatches a second request over the null range and stores that result
// in e.countNullRes.
//
// On success the caller owns both results and must close them. On failure
// nothing is left open: callers only register their cleanup after open
// succeeds, so a result obtained before the failure is closed here.
func (e *AnalyzeIndexExec) open(ranges []*ranger.Range, considerNull bool) error {
	if err := e.fetchAnalyzeResult(ranges, false); err != nil {
		return err
	}
	if !considerNull || len(e.idxInfo.Columns) != 1 {
		return nil
	}
	if err := e.fetchAnalyzeResult(ranger.NullRange(), true); err != nil {
		// The first request already succeeded; release it so it does not leak.
		// The close error is dropped on purpose: the fetch error is the root
		// cause and is the one worth reporting.
		_ = e.result.Close()
		e.result = nil
		return err
	}
	return nil
}

// fetchAnalyzeResult turns the given ranges into a `kv.Request`, sends it as an
// analyze request, and keeps the returned `SelectResult` on the executor.
// `isNullRange` tells whether ranges is the special null range used to count
// nulls of a single-column index: if so the result goes to e.countNullRes,
// otherwise to e.result.
func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRange bool) error {
	sessVars := e.ctx.GetSessionVars()
	tableIDs := []int64{e.tableID.GetStatisticsID()}

	var builder distsql.RequestBuilder
	var kvReqBuilder *distsql.RequestBuilder
	switch {
	case e.isCommonHandle && e.idxInfo.Primary:
		// The primary index of a common-handle table is scanned by handle ranges.
		kvReqBuilder = builder.SetHandleRangesForTables(sessVars.StmtCtx, tableIDs, true, ranges, nil)
	default:
		kvReqBuilder = builder.SetIndexRangesForTables(sessVars.StmtCtx, tableIDs, e.idxInfo.ID, ranges)
	}
	kvReqBuilder.SetResourceGroupTagger(sessVars.StmtCtx.GetResourceGroupTagger())

	// Without analyze snapshot: max start TS under RC. With it: the executor's
	// snapshot under SI.
	startTS, isoLevel := uint64(math.MaxUint64), kv.RC
	if sessVars.EnableAnalyzeSnapshot {
		startTS, isoLevel = e.snapshot, kv.SI
	}

	kvReqBuilder = kvReqBuilder.SetAnalyzeRequest(e.analyzePB, isoLevel)
	kvReqBuilder = kvReqBuilder.SetStartTS(startTS)
	kvReqBuilder = kvReqBuilder.SetKeepOrder(true)
	kvReqBuilder = kvReqBuilder.SetConcurrency(e.concurrency)
	kvReq, err := kvReqBuilder.Build()
	if err != nil {
		return err
	}

	selectResult, err := distsql.Analyze(context.TODO(), e.ctx.GetClient(), kvReq, sessVars.KVVars, sessVars.InRestrictedSQL, sessVars.StmtCtx)
	if err != nil {
		return err
	}

	if !isNullRange {
		e.result = selectResult
		return nil
	}
	e.countNullRes = selectResult
	return nil
}

// buildStatsFromResult drains result and merges every response into one set of
// index statistics. Each raw chunk is decoded as a tipb.AnalyzeIndexResp and
// folded in through updateIndexResult. The CM sketch and TopN are only built
// when needCMS is true; otherwise they are returned as nil. The loop stops with
// ErrQueryInterrupted once the session's Killed flag is 1.
func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, needCMS bool) (*statistics.Histogram, *statistics.CMSketch, *statistics.FMSketch, *statistics.TopN, error) {
	failpoint.Inject("buildStatsFromResult", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return(nil, nil, nil, nil, errors.New("mock buildStatsFromResult error"))
		}
	})

	bucketLimit := int(e.opts[ast.AnalyzeOptNumBuckets])
	topNLimit := int(e.opts[ast.AnalyzeOptNumTopN])

	var (
		hist = &statistics.Histogram{}
		cms  *statistics.CMSketch
		topn *statistics.TopN
	)
	if needCMS {
		depth := int32(e.opts[ast.AnalyzeOptCMSketchDepth])
		width := int32(e.opts[ast.AnalyzeOptCMSketchWidth])
		cms = statistics.NewCMSketch(depth, width)
		topn = statistics.NewTopN(topNLimit)
	}
	fms := statistics.NewFMSketch(maxSketchSize)

	// Fall back to Version1 when the request does not carry a version.
	statsVer := statistics.Version1
	if reqVersion := e.analyzePB.IdxReq.Version; reqVersion != nil {
		statsVer = int(*reqVersion)
	}

	for {
		failpoint.Inject("mockKillRunningAnalyzeIndexJob", func() {
			dom := domain.GetDomain(e.ctx)
			dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
		})
		if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 {
			return nil, nil, nil, nil, errors.Trace(ErrQueryInterrupted)
		}
		failpoint.Inject("mockSlowAnalyzeIndex", func() {
			time.Sleep(1000 * time.Second)
		})

		raw, err := result.NextRaw(context.TODO())
		if err != nil {
			return nil, nil, nil, nil, err
		}
		// A nil chunk marks the end of the result.
		if raw == nil {
			break
		}

		resp := &tipb.AnalyzeIndexResp{}
		if err = resp.Unmarshal(raw); err != nil {
			return nil, nil, nil, nil, err
		}

		hist, cms, fms, topn, err = updateIndexResult(e.ctx, resp, e.job, hist, cms, fms, topn,
			e.idxInfo, bucketLimit, topNLimit, statsVer)
		if err != nil {
			return nil, nil, nil, nil, err
		}
	}

	if needCMS {
		// Values kept in TopN are removed from the histogram.
		if topn.TotalCount() > 0 {
			hist.RemoveVals(topn.TopN)
		}
		if cms != nil {
			cms.CalcDefaultValForAnalyze(uint64(hist.NDV))
		}
	}
	return hist, cms, fms, topn, nil
}

// buildSimpleStats dispatches the analyze requests for ranges and returns only
// the FM sketch built from the main result, plus the histogram built from the
// null-range result when that result exists and has at least one bucket
// (nullHist is nil otherwise).
//
// Both results are closed before returning; a close error is reported only
// when no earlier error occurred. Any error from reading either result is
// returned and no partial statistics are handed back.
func (e *AnalyzeIndexExec) buildSimpleStats(ranges []*ranger.Range, considerNull bool) (fms *statistics.FMSketch, nullHist *statistics.Histogram, err error) {
	if err = e.open(ranges, considerNull); err != nil {
		return nil, nil, err
	}
	defer func() {
		err1 := closeAll(e.result, e.countNullRes)
		if err == nil {
			err = err1
		}
	}()
	_, _, fms, _, err = e.buildStatsFromResult(e.result, false)
	if err != nil {
		// Previously this error was overwritten by the explicit nil in the
		// returns below, so a failed or interrupted scan looked successful.
		return nil, nil, err
	}
	if e.countNullRes != nil {
		// Use distinct local names so the named results are not shadowed.
		nullRangeHist, _, _, _, nullErr := e.buildStatsFromResult(e.countNullRes, false)
		if nullErr != nil {
			return nil, nil, nullErr
		}
		if nullRangeHist.Len() > 0 {
			return fms, nullRangeHist, nil
		}
	}
	return fms, nil, nil
}

// analyzeIndexNDVPushDown collects only the FM sketch of the index handled by
// idxExec and wraps it into an AnalyzeResults. The result's Count is taken from
// the last bucket of the null-range histogram when one is returned; otherwise
// it is left at its zero value. On failure the returned AnalyzeResults only
// carries the error and the job.
func analyzeIndexNDVPushDown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults {
	// A single-column index skips null rows in the main scan: the histogram then
	// holds no null values and its `NullCount` comes from a separate distsql call
	// over the null range. For a multi-column index "null" is not well defined for
	// a row, so the full range is scanned, rows with null fields end up in the
	// histogram, and `NullCount` is always 0.
	singleColumn := len(idxExec.idxInfo.Columns) == 1
	ranges := ranger.FullRange()
	if singleColumn {
		ranges = ranger.FullNotNullRange()
	}

	fms, nullHist, err := idxExec.buildSimpleStats(ranges, singleColumn)
	if err != nil {
		return &statistics.AnalyzeResults{Err: err, Job: idxExec.job}
	}

	// We use histogram to get the Index's ID.
	idHolder := statistics.NewHistogram(idxExec.idxInfo.ID, 0, 0, statistics.Version1, types.NewFieldType(mysql.TypeBlob), 0, 0)

	r := &statistics.AnalyzeResults{
		TableID: idxExec.tableID,
		Ars: []*statistics.AnalyzeResult{{
			Fms:     []*statistics.FMSketch{fms},
			Hist:    []*statistics.Histogram{idHolder},
			IsIndex: 1,
		}},
		Job: idxExec.job,
		// TODO: avoid reusing Version1.
		StatsVer: statistics.Version1,
	}
	if nullHist == nil {
		return r
	}
	if bucketNum := nullHist.Len(); bucketNum > 0 {
		r.Count = nullHist.Buckets[bucketNum-1].Count
	}
	return r
}

// updateIndexResult folds one AnalyzeIndexResp into the statistics accumulated
// so far and returns the updated histogram, CM sketch, FM sketch and TopN.
//
// The response histogram is merged into hist, and job (when non-nil) is updated
// with the response's row count. The CM sketch and TopN are merged only when
// cms is non-nil; a response without a CM sketch is logged and skipped. The FM
// sketch is merged only when both fms and the response's sketch are present.
// Any merge error is returned with all other results nil.
func updateIndexResult(
	ctx sessionctx.Context,
	resp *tipb.AnalyzeIndexResp,
	job *statistics.AnalyzeJob,
	hist *statistics.Histogram,
	cms *statistics.CMSketch,
	fms *statistics.FMSketch,
	topn *statistics.TopN,
	idxInfo *model.IndexInfo,
	numBuckets int,
	numTopN int,
	statsVer int,
) (
	*statistics.Histogram,
	*statistics.CMSketch,
	*statistics.FMSketch,
	*statistics.TopN,
	error,
) {
	respHist := statistics.HistogramFromProto(resp.Hist)
	if job != nil {
		UpdateAnalyzeJob(ctx, job, int64(respHist.TotalRowCount()))
	}

	mergedHist, err := statistics.MergeHistograms(ctx.GetSessionVars().StmtCtx, hist, respHist, numBuckets, statsVer)
	if err != nil {
		return nil, nil, nil, nil, err
	}

	switch {
	case cms == nil:
		// The caller did not ask for a CM sketch; nothing to merge.
	case resp.Cms == nil:
		logutil.Logger(context.TODO()).Warn("nil CMS in response", zap.String("table", idxInfo.Table.O), zap.String("index", idxInfo.Name.O))
	default:
		respCMS, respTopN := statistics.CMSketchAndTopNFromProto(resp.Cms)
		if mergeErr := cms.MergeCMSketch(respCMS); mergeErr != nil {
			return nil, nil, nil, nil, mergeErr
		}
		statistics.MergeTopNAndUpdateCMSketch(topn, respTopN, cms, uint32(numTopN))
	}

	if collector := resp.Collector; fms != nil && collector != nil && collector.FmSketch != nil {
		fms.MergeFMSketch(statistics.FMSketchFromProto(collector.FmSketch))
	}
	return mergedHist, cms, fms, topn, nil
}

相关信息

tidb 源码目录

相关文章

tidb adapter 源码

tidb admin 源码

tidb admin_plugins 源码

tidb admin_telemetry 源码

tidb aggregate 源码

tidb analyze 源码

tidb analyze_col 源码

tidb analyze_col_v2 源码

tidb analyze_fast 源码

tidb analyze_global_stats 源码

0  赞