// Copyright 2017 PingCAP, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
var _ Executor = &AnalyzeExec{}
// AnalyzeExec represents Analyze executor.
type AnalyzeExec struct {
tasks []*analyzeTask
wg util.WaitGroupWrapper
opts map[ast.AnalyzeOptionType]uint64
OptionsMap map[int64]core.V2AnalyzeOptions
var (
// RandSeed is the seed for randing package.
// It's public for test.
RandSeed = int64(1)
// MaxRegionSampleSize is the max sample size for one region when analyze v1 collects samples from table.
// It's public for test.
MaxRegionSampleSize = int64(1000)
const (
maxSketchSize = 10000
type taskType int
const (
colTask taskType = iota
// Next implements the Executor Next interface.
func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return err
taskCh := make(chan *analyzeTask, len(e.tasks))
resultsCh := make(chan *statistics.AnalyzeResults, len(e.tasks))
if len(e.tasks) < concurrency {
concurrency = len(e.tasks)
for i := 0; i < concurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
for _, task := range e.tasks {
prepareV2AnalyzeJobInfo(task.colExec, false)
AddNewAnalyzeJob(e.ctx, task.job)
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
for _, task := range e.tasks {
taskCh <- task
pruneMode := variable.PartitionPruneMode(e.ctx.GetSessionVars().PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
err = e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh)
for _, task := range e.tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
if err != nil {
return err
failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
err = e.handleGlobalStats(ctx, needGlobalStats, globalStatsMap)
if err != nil {
return err
err = e.saveV2AnalyzeOpts()
if err != nil {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
if e.ctx.GetSessionVars().InRestrictedSQL {
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema))
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema), handle.WithTableStatsByQuery())
func (e *AnalyzeExec) saveV2AnalyzeOpts() error {
if !variable.PersistAnalyzeOptions.Load() || len(e.OptionsMap) == 0 {
return nil
// only to save table options if dynamic prune mode
dynamicPrune := variable.PartitionPruneMode(e.ctx.GetSessionVars().PartitionPruneMode.Load()) == variable.Dynamic
toSaveMap := make(map[int64]core.V2AnalyzeOptions)
for id, opts := range e.OptionsMap {
if !opts.IsPartition || !dynamicPrune {
toSaveMap[id] = opts
sql := new(strings.Builder)
sqlexec.MustFormatSQL(sql, "REPLACE INTO mysql.analyze_options (table_id,sample_num,sample_rate,buckets,topn,column_choice,column_ids) VALUES ")
idx := 0
for _, opts := range toSaveMap {
sampleNum := opts.RawOpts[ast.AnalyzeOptNumSamples]
sampleRate := float64(0)
if val, ok := opts.RawOpts[ast.AnalyzeOptSampleRate]; ok {
sampleRate = math.Float64frombits(val)
buckets := opts.RawOpts[ast.AnalyzeOptNumBuckets]
topn := int64(-1)
if val, ok := opts.RawOpts[ast.AnalyzeOptNumTopN]; ok {
topn = int64(val)
colChoice := opts.ColChoice.String()
colIDs := make([]string, len(opts.ColumnList))
for i, colInfo := range opts.ColumnList {
colIDs[i] = strconv.FormatInt(colInfo.ID, 10)
colIDStrs := strings.Join(colIDs, ",")
sqlexec.MustFormatSQL(sql, "(%?,%?,%?,%?,%?,%?,%?)", opts.PhyTableID, sampleNum, sampleRate, buckets, topn, colChoice, colIDStrs)
if idx < len(toSaveMap)-1 {
sqlexec.MustFormatSQL(sql, ",")
idx += 1
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, sql.String())
if err != nil {
return err
return nil
func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable()
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
if !historicalStatsEnabled {
return nil
is := domain.GetDomain(e.ctx).InfoSchema()
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
tblInfo := tbl.Meta()
dbInfo, existed := is.SchemaByTable(tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
return nil
// handleResultsError will handle the error fetch from resultsCh and record it in log
func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
var err error
for panicCnt < concurrency {
results, ok := <-resultsCh
if !ok {
if results.Err != nil {
err = results.Err
if isAnalyzeWorkerPanic(err) {
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
finishJobWithLog(e.ctx, results.Job, err)
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's normal virtual column, skip.
if hg == nil {
histIDs = append(histIDs, hg.ID)
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable(), e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := e.recordHistoricalStats(results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
return err
func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
resultsCh <- &statistics.AnalyzeResults{
Err: getAnalyzePanicErr(r),
Job: task.job,
for {
var ok bool
task, ok = <-taskCh
if !ok {
StartAnalyzeJob(e.ctx, task.job)
switch task.taskType {
case colTask:
resultsCh <- analyzeColumnsPushDownEntry(task.colExec)
case idxTask:
resultsCh <- analyzeIndexPushdown(task.idxExec)
case fastTask:
resultsCh <- analyzeFastExec(task.fastExec)
case pkIncrementalTask:
resultsCh <- analyzePKIncremental(task.colIncrementalExec)
case idxIncrementalTask:
resultsCh <- analyzeIndexIncremental(task.idxIncrementalExec)
type analyzeTask struct {
taskType taskType
idxExec *AnalyzeIndexExec
colExec *AnalyzeColumnsExec
fastExec *AnalyzeFastExec
idxIncrementalExec *analyzeIndexIncrementalExec
colIncrementalExec *analyzePKIncrementalExec
job *statistics.AnalyzeJob
type baseAnalyzeExec struct {
ctx sessionctx.Context
tableID statistics.AnalyzeTableID
concurrency int
analyzePB *tipb.AnalyzeReq
opts map[ast.AnalyzeOptionType]uint64
job *statistics.AnalyzeJob
snapshot uint64
// AddNewAnalyzeJob records the new analyze job.
func AddNewAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob) {
if job == nil {
var instance string
serverInfo, err := infosync.GetServerInfo()
if err != nil {
logutil.BgLogger().Error("failed to get server info", zap.Error(err))
instance = "unknown"
} else {
instance = fmt.Sprintf("%s:%d", serverInfo.IP, serverInfo.Port)
statsHandle := domain.GetDomain(ctx).StatsHandle()
err = statsHandle.InsertAnalyzeJob(job, instance, ctx.GetSessionVars().ConnectionID)
if err != nil {
logutil.BgLogger().Error("failed to insert analyze job", zap.Error(err))
// StartAnalyzeJob marks the state of the analyze job as running and sets the start time.
func StartAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob) {
if job == nil || job.ID == nil {
job.StartTime = time.Now()
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err))
// UpdateAnalyzeJob updates count of the processed rows when increment reaches a threshold.
func UpdateAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) {
if job == nil || job.ID == nil {
delta := job.Progress.Update(rowCount)
if delta == 0 {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, delta, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err))
// FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `meetError` and sets the end time.
func FinishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
job.EndTime = time.Now()
var sql string
var args []interface{}
// process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job
// is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly.
if analyzeErr != nil {
failReason := analyzeErr.Error()
const textMaxLength = 65535
if len(failReason) > textMaxLength {
failReason = failReason[:textMaxLength]
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []interface{}{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []interface{}{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
FinishAnalyzeJob(sctx, job, analyzeErr)
if job != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()))
