TiDB memtable_predicate_extractor source code
File path: /planner/core/memtable_predicate_extractor.go
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"bytes"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"time"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
// MemTablePredicateExtractor is used to extract some predicates from the `WHERE` clause
// and push the predicates down to the data-retrieving stage when reading a memory table.
//
// e.g:
// SELECT * FROM cluster_config WHERE type='tikv' AND instance='192.168.1.9:2379'
// Without pushdown we must request all components in the cluster via the HTTP API to
// retrieve configurations and then filter them by the `type/instance` columns.
//
// The purpose of defining a `MemTablePredicateExtractor` is to optimize this:
// 1. Define a `ClusterConfigTablePredicateExtractor`
// 2. Extract the `type/instance` columns in the logical optimization stage and save them in fields.
// 3. Pass the extractor to the `ClusterReaderExecExec` executor.
// 4. The executor sends requests to the target components instead of all of the components.
type MemTablePredicateExtractor interface {
// Extract extracts the predicates that can be pushed down and returns the remaining predicates
Extract(sessionctx.Context, *expression.Schema, []*types.FieldName, []expression.Expression) (remained []expression.Expression)
explainInfo(p *PhysicalMemTable) string
}
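// A minimal usage sketch (illustrative only; the variables sctx, schema, names and
// predicates stand for what the planner already has at hand when building the plan):
//
//	e := &ClusterTableExtractor{}
//	remained := e.Extract(sctx, schema, names, predicates)
//	// For `WHERE type='tikv' AND instance='192.168.1.9:2379' AND `key` LIKE '%log%'`:
//	//   e.NodeTypes -> {"tikv"}, e.Instances -> {"192.168.1.9:2379"}
//	//   remained    -> [`key` LIKE '%log%'], evaluated later by a Selection on the memtable rows.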
// extractHelper contains some common utility functions for all extractors.
// We define an individual struct instead of a bunch of un-exported functions
// to avoid polluting the global scope of the current package.
type extractHelper struct{}
func (extractHelper) extractColInConsExpr(extractCols map[int64]*types.FieldName, expr *expression.ScalarFunction) (string, []types.Datum) {
args := expr.GetArgs()
col, isCol := args[0].(*expression.Column)
if !isCol {
return "", nil
}
name, found := extractCols[col.UniqueID]
if !found {
return "", nil
}
// All expressions in the IN list must be constants
// SELECT * FROM t1 WHERE c IN ('1', '2')
results := make([]types.Datum, 0, len(args[1:]))
for _, arg := range args[1:] {
constant, ok := arg.(*expression.Constant)
if !ok || constant.DeferredExpr != nil || constant.ParamMarker != nil {
return "", nil
}
results = append(results, constant.Value)
}
return name.ColName.L, results
}
func (extractHelper) extractColBinaryOpConsExpr(extractCols map[int64]*types.FieldName, expr *expression.ScalarFunction) (string, []types.Datum) {
args := expr.GetArgs()
var col *expression.Column
var colIdx int
// c = 'rhs'
// 'lhs' = c
for i := 0; i < 2; i++ {
var isCol bool
col, isCol = args[i].(*expression.Column)
if isCol {
colIdx = i
break
}
}
if col == nil {
return "", nil
}
name, found := extractCols[col.UniqueID]
if !found {
return "", nil
}
// The `lhs/rhs` of EQ expression must be a constant
// SELECT * FROM t1 WHERE c='rhs'
// SELECT * FROM t1 WHERE 'lhs'=c
constant, ok := args[1-colIdx].(*expression.Constant)
if !ok || constant.DeferredExpr != nil || constant.ParamMarker != nil {
return "", nil
}
return name.ColName.L, []types.Datum{constant.Value}
}
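// Example for extractColBinaryOpConsExpr: both `type = 'tikv'` and `'tikv' = type`
// return ("type", ["tikv"]). If the other side is not a plain constant (another
// column, or a `?` prepared-statement parameter), it returns ("", nil) and the
// predicate is left to normal filtering.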
// extract the OR expression, e.g:
// SELECT * FROM t1 WHERE c1='a' OR c1='b' OR c1='c'
func (helper extractHelper) extractColOrExpr(extractCols map[int64]*types.FieldName, expr *expression.ScalarFunction) (string, []types.Datum) {
args := expr.GetArgs()
lhs, ok := args[0].(*expression.ScalarFunction)
if !ok {
return "", nil
}
rhs, ok := args[1].(*expression.ScalarFunction)
if !ok {
return "", nil
}
// Define an inner function to avoid polluting the outer scope
var extract = func(extractCols map[int64]*types.FieldName, fn *expression.ScalarFunction) (string, []types.Datum) {
switch helper.getStringFunctionName(fn) {
case ast.EQ:
return helper.extractColBinaryOpConsExpr(extractCols, fn)
case ast.LogicOr:
return helper.extractColOrExpr(extractCols, fn)
case ast.In:
return helper.extractColInConsExpr(extractCols, fn)
default:
return "", nil
}
}
lhsColName, lhsDatums := extract(extractCols, lhs)
if lhsColName == "" {
return "", nil
}
rhsColName, rhsDatums := extract(extractCols, rhs)
if lhsColName == rhsColName {
return lhsColName, append(lhsDatums, rhsDatums...)
}
return "", nil
}
// merges `lhs` and `datums` with CNF logic
// 1. Returns `datums` set if the `lhs` is an empty set
// 2. Returns the intersection of `datums` and `lhs` if the `lhs` is not an empty set
func (extractHelper) merge(lhs set.StringSet, datums []types.Datum, toLower bool) set.StringSet {
tmpNodeTypes := set.NewStringSet()
for _, datum := range datums {
s, err := datum.ToString()
if err != nil {
return nil
}
if toLower {
s = strings.ToLower(s)
}
tmpNodeTypes.Insert(s)
}
if len(lhs) > 0 {
return lhs.Intersection(tmpNodeTypes)
}
return tmpNodeTypes
}
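// Example for merge: the predicates form a CNF list, so values extracted for the
// same column are intersected. `type='tikv' AND type IN ('tikv','tidb')` merges
// {"tikv"} with {"tikv","tidb"} into {"tikv"}; `type='tikv' AND type='tidb'`
// intersects to the empty set, which lets the caller skip the request entirely.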
func (helper extractHelper) extractCol(
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
extractColName string,
valueToLower bool,
) (
remained []expression.Expression,
skipRequest bool,
result set.StringSet,
) {
remained = make([]expression.Expression, 0, len(predicates))
result = set.NewStringSet()
extractCols := helper.findColumn(schema, names, extractColName)
if len(extractCols) == 0 {
return predicates, false, result
}
// We should use the INTERSECTION of the sets because the predicates form a CNF array
for _, expr := range predicates {
fn, ok := expr.(*expression.ScalarFunction)
if !ok {
continue
}
var colName string
var datums []types.Datum // the memory of datums should not be reused, they will be put into result.
switch helper.getStringFunctionName(fn) {
case ast.EQ:
colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn)
case ast.In:
colName, datums = helper.extractColInConsExpr(extractCols, fn)
case ast.LogicOr:
colName, datums = helper.extractColOrExpr(extractCols, fn)
}
if colName == extractColName {
result = helper.merge(result, datums, valueToLower)
skipRequest = len(result) == 0
} else {
remained = append(remained, expr)
}
// There is no data if the low-level executor skips the request, so the filter can be dropped
if skipRequest {
remained = remained[:0]
break
}
}
return
}
// extracts the string pattern column, e.g:
// SELECT * FROM t WHERE c LIKE '%a%'
// SELECT * FROM t WHERE c LIKE '%a%' AND c REGEXP '.*xxx.*'
// SELECT * FROM t WHERE c LIKE '%a%' OR c REGEXP '.*xxx.*'
func (helper extractHelper) extractLikePatternCol(
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
extractColName string,
toLower bool,
needLike2Regexp bool,
) (
remained []expression.Expression,
patterns []string,
) {
remained = make([]expression.Expression, 0, len(predicates))
extractCols := helper.findColumn(schema, names, extractColName)
if len(extractCols) == 0 {
return predicates, nil
}
// We use a string array to save multiple patterns because Golang and Rust don't
// support Perl-like CNF regular expressions such as (?=expr1)(?=expr2).
// e.g:
// SELECT * FROM t WHERE c LIKE '%a%' AND c LIKE '%b%' AND c REGEXP 'gc.*[0-9]{10,20}'
for _, expr := range predicates {
fn, ok := expr.(*expression.ScalarFunction)
if !ok {
remained = append(remained, expr)
continue
}
var canBuildPattern bool
var pattern string
// We use '|' to combine DNF regular expression: .*a.*|.*b.*
// e.g:
// SELECT * FROM t WHERE c LIKE '%a%' OR c LIKE '%b%'
if fn.FuncName.L == ast.LogicOr && !toLower {
canBuildPattern, pattern = helper.extractOrLikePattern(fn, extractColName, extractCols, needLike2Regexp)
} else {
canBuildPattern, pattern = helper.extractLikePattern(fn, extractColName, extractCols, needLike2Regexp)
}
if canBuildPattern && toLower {
pattern = strings.ToLower(pattern)
}
if canBuildPattern {
patterns = append(patterns, pattern)
} else {
remained = append(remained, expr)
}
}
return
}
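// Example for extractLikePatternCol: `message LIKE '%a%' OR message LIKE '%b%'`
// becomes the single DNF pattern ".*a.*|.*b.*", while
// `message LIKE '%a%' AND message LIKE '%b%'` yields two separate patterns
// [".*a.*", ".*b.*"] that the consumer must match one by one, since Go's regexp
// has no lookahead to express the conjunction as a single pattern.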
func (helper extractHelper) extractOrLikePattern(
orFunc *expression.ScalarFunction,
extractColName string,
extractCols map[int64]*types.FieldName,
needLike2Regexp bool,
) (
ok bool,
pattern string,
) {
predicates := expression.SplitDNFItems(orFunc)
if len(predicates) == 0 {
return false, ""
}
patternBuilder := make([]string, 0, len(predicates))
for _, predicate := range predicates {
fn, ok := predicate.(*expression.ScalarFunction)
if !ok {
return false, ""
}
ok, partPattern := helper.extractLikePattern(fn, extractColName, extractCols, needLike2Regexp)
if !ok {
return false, ""
}
patternBuilder = append(patternBuilder, partPattern)
}
return true, strings.Join(patternBuilder, "|")
}
func (helper extractHelper) extractLikePattern(
fn *expression.ScalarFunction,
extractColName string,
extractCols map[int64]*types.FieldName,
needLike2Regexp bool,
) (
ok bool,
pattern string,
) {
var colName string
var datums []types.Datum
switch fn.FuncName.L {
case ast.EQ, ast.Like, ast.Regexp, ast.RegexpLike:
colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn)
}
if colName == extractColName {
switch fn.FuncName.L {
case ast.EQ:
return true, "^" + regexp.QuoteMeta(datums[0].GetString()) + "$"
case ast.Like:
if needLike2Regexp {
return true, stringutil.CompileLike2Regexp(datums[0].GetString())
}
return true, datums[0].GetString()
case ast.Regexp, ast.RegexpLike:
return true, datums[0].GetString()
default:
return false, ""
}
} else {
return false, ""
}
}
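// Example for extractLikePattern (with needLike2Regexp=true): `message = 'gc'`
// yields the anchored pattern "^gc$", `message LIKE '%gc%'` is converted through
// stringutil.CompileLike2Regexp (roughly ".*gc.*"), and `message REGEXP 'g.*c'`
// passes the pattern "g.*c" through unchanged.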
func (helper extractHelper) findColumn(schema *expression.Schema, names []*types.FieldName, colName string) map[int64]*types.FieldName {
extractCols := make(map[int64]*types.FieldName)
for i, name := range names {
if name.ColName.L == colName {
extractCols[schema.Columns[i].UniqueID] = name
}
}
return extractCols
}
// getTimeFunctionName is used to get the (time) function name.
// For an expression that is pushed down to the coprocessor, the function name differs from
// the normal compare function name, so getTimeFunctionName does a simple function name conversion.
// Currently, this is used to support querying `CLUSTER_SLOW_QUERY` with time conditions.
func (helper extractHelper) getTimeFunctionName(fn *expression.ScalarFunction) string {
switch fn.Function.PbCode() {
case tipb.ScalarFuncSig_GTTime:
return ast.GT
case tipb.ScalarFuncSig_GETime:
return ast.GE
case tipb.ScalarFuncSig_LTTime:
return ast.LT
case tipb.ScalarFuncSig_LETime:
return ast.LE
case tipb.ScalarFuncSig_EQTime:
return ast.EQ
default:
return fn.FuncName.L
}
}
// getStringFunctionName is used to get the (string) function name.
// For an expression that is pushed down to the coprocessor, the function name differs from
// the normal compare function name, so getStringFunctionName does a simple function name conversion.
// Currently, this is used to support querying `CLUSTER_STMT_SUMMARY` with string conditions.
func (helper extractHelper) getStringFunctionName(fn *expression.ScalarFunction) string {
switch fn.Function.PbCode() {
case tipb.ScalarFuncSig_GTString:
return ast.GT
case tipb.ScalarFuncSig_GEString:
return ast.GE
case tipb.ScalarFuncSig_LTString:
return ast.LT
case tipb.ScalarFuncSig_LEString:
return ast.LE
case tipb.ScalarFuncSig_EQString:
return ast.EQ
default:
return fn.FuncName.L
}
}
// extracts the time range column, e.g:
// SELECT * FROM t WHERE time='2019-10-10 10:10:10'
// SELECT * FROM t WHERE time>'2019-10-10 10:10:10' AND time<'2019-10-11 10:10:10'
func (helper extractHelper) extractTimeRange(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
extractColName string,
timezone *time.Location,
) (
remained []expression.Expression,
// unix timestamp in nanoseconds
startTime int64,
endTime int64,
) {
remained = make([]expression.Expression, 0, len(predicates))
extractCols := helper.findColumn(schema, names, extractColName)
if len(extractCols) == 0 {
return predicates, startTime, endTime
}
for _, expr := range predicates {
fn, ok := expr.(*expression.ScalarFunction)
if !ok {
remained = append(remained, expr)
continue
}
var colName string
var datums []types.Datum
fnName := helper.getTimeFunctionName(fn)
switch fnName {
case ast.GT, ast.GE, ast.LT, ast.LE, ast.EQ:
colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn)
}
if colName == extractColName {
timeType := types.NewFieldType(mysql.TypeDatetime)
timeType.SetDecimal(6)
timeDatum, err := datums[0].ConvertTo(ctx.GetSessionVars().StmtCtx, timeType)
if err != nil || timeDatum.Kind() == types.KindNull {
remained = append(remained, expr)
continue
}
mysqlTime := timeDatum.GetMysqlTime()
timestamp := time.Date(mysqlTime.Year(),
time.Month(mysqlTime.Month()),
mysqlTime.Day(),
mysqlTime.Hour(),
mysqlTime.Minute(),
mysqlTime.Second(),
mysqlTime.Microsecond()*1000,
timezone,
).UnixNano()
switch fnName {
case ast.EQ:
startTime = mathutil.Max(startTime, timestamp)
if endTime == 0 {
endTime = timestamp
} else {
endTime = mathutil.Min(endTime, timestamp)
}
case ast.GT:
// FixMe: adding 1ms is not absolutely correct here; it only works because the log search precision is millisecond.
startTime = mathutil.Max(startTime, timestamp+int64(time.Millisecond))
case ast.GE:
startTime = mathutil.Max(startTime, timestamp)
case ast.LT:
if endTime == 0 {
endTime = timestamp - int64(time.Millisecond)
} else {
endTime = mathutil.Min(endTime, timestamp-int64(time.Millisecond))
}
case ast.LE:
if endTime == 0 {
endTime = timestamp
} else {
endTime = mathutil.Min(endTime, timestamp)
}
default:
remained = append(remained, expr)
}
} else {
remained = append(remained, expr)
}
}
return
}
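// Example for extractTimeRange: `time >= '2019-10-10 10:10:10' AND time < '2019-10-11 10:10:10'`
// produces startTime/endTime as unix timestamps in nanoseconds (the strict '<' bound
// is pulled back by 1ms, see the FixMe above). A comparison whose other side is not
// a plain constant is kept in `remained`.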
func (helper extractHelper) parseQuantiles(quantileSet set.StringSet) []float64 {
quantiles := make([]float64, 0, len(quantileSet))
for k := range quantileSet {
v, err := strconv.ParseFloat(k, 64)
if err != nil {
// ignore the parse error; it won't affect the result.
continue
}
quantiles = append(quantiles, v)
}
slices.Sort(quantiles)
return quantiles
}
func (helper extractHelper) parseUint64(uint64Set set.StringSet) []uint64 {
uint64s := make([]uint64, 0, len(uint64Set))
for k := range uint64Set {
v, err := strconv.ParseUint(k, 10, 64)
if err != nil {
// ignore the parse error; it won't affect the result.
continue
}
uint64s = append(uint64s, v)
}
slices.Sort(uint64s)
return uint64s
}
func (helper extractHelper) extractCols(
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
excludeCols set.StringSet,
valueToLower bool) ([]expression.Expression, bool, map[string]set.StringSet) {
cols := map[string]set.StringSet{}
remained := predicates
skipRequest := false
// Extract the label columns.
for _, name := range names {
if excludeCols.Exist(name.ColName.L) {
continue
}
var values set.StringSet
remained, skipRequest, values = helper.extractCol(schema, names, remained, name.ColName.L, valueToLower)
if skipRequest {
return nil, true, nil
}
if len(values) == 0 {
continue
}
cols[name.ColName.L] = values
}
return remained, skipRequest, cols
}
func (helper extractHelper) convertToTime(t int64) time.Time {
if t == 0 || t == math.MaxInt64 {
return time.Now()
}
return time.Unix(0, t)
}
func (extractHelper) convertToBoolSlice(uint64Slice []uint64) []bool {
if len(uint64Slice) == 0 {
return []bool{false, true}
}
var res []bool
// used to keep res unique
b := make(map[bool]struct{}, 2)
for _, l := range uint64Slice {
tmpBool := l == 1
_, ok := b[tmpBool]
if !ok {
b[tmpBool] = struct{}{}
res = append(res, tmpBool)
}
}
return res
}
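// Example for convertToBoolSlice: the uint64 values parsed from is_learner/is_leader
// filters are deduplicated into booleans, so [1] -> [true], [0,1] -> [false,true],
// and an empty slice (no filter at all) also expands to [false,true], i.e.
// "request both roles".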
// ClusterTableExtractor is used to extract some predicates of cluster table.
type ClusterTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
// NodeTypes represents all component types we should send requests to.
// e.g:
// 1. SELECT * FROM cluster_config WHERE type='tikv'
// 2. SELECT * FROM cluster_config WHERE type in ('tikv', 'tidb')
NodeTypes set.StringSet
// Instances represents all component instances we should send requests to.
// e.g:
// 1. SELECT * FROM cluster_config WHERE instance='192.168.1.7:2379'
// 2. SELECT * FROM cluster_config WHERE instance in ('192.168.1.7:2379', '192.168.1.9:2379')
Instances set.StringSet
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *ClusterTableExtractor) Extract(_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
remained, typeSkipRequest, nodeTypes := e.extractCol(schema, names, predicates, "type", true)
remained, addrSkipRequest, instances := e.extractCol(schema, names, remained, "instance", false)
e.SkipRequest = typeSkipRequest || addrSkipRequest
e.NodeTypes = nodeTypes
e.Instances = instances
return remained
}
func (e *ClusterTableExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request:true"
}
r := new(bytes.Buffer)
if len(e.NodeTypes) > 0 {
r.WriteString(fmt.Sprintf("node_types:[%s], ", extractStringFromStringSet(e.NodeTypes)))
}
if len(e.Instances) > 0 {
r.WriteString(fmt.Sprintf("instances:[%s], ", extractStringFromStringSet(e.Instances)))
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
// ClusterLogTableExtractor is used to extract some predicates of `cluster_log`
type ClusterLogTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
// NodeTypes represents all component types we should send requests to.
// e.g:
// 1. SELECT * FROM cluster_log WHERE type='tikv'
// 2. SELECT * FROM cluster_log WHERE type in ('tikv', 'tidb')
NodeTypes set.StringSet
// Instances represents all component instances we should send requests to.
// e.g:
// 1. SELECT * FROM cluster_log WHERE instance='192.168.1.7:2379'
// 2. SELECT * FROM cluster_log WHERE instance in ('192.168.1.7:2379', '192.168.1.9:2379')
Instances set.StringSet
// StartTime represents the beginning time of the log messages
// e.g: SELECT * FROM cluster_log WHERE time>'2019-10-10 10:10:10.999'
StartTime int64
// EndTime represents the ending time of the log messages
// e.g: SELECT * FROM cluster_log WHERE time<'2019-10-11 10:10:10.999'
EndTime int64
// Patterns is used to filter the log messages
// e.g:
// 1. SELECT * FROM cluster_log WHERE message like '%gc%'
// 2. SELECT * FROM cluster_log WHERE message regexp '.*'
Patterns []string
LogLevels set.StringSet
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *ClusterLogTableExtractor) Extract(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `type/instance` columns
remained, typeSkipRequest, nodeTypes := e.extractCol(schema, names, predicates, "type", true)
remained, addrSkipRequest, instances := e.extractCol(schema, names, remained, "instance", false)
remained, levlSkipRequest, logLevels := e.extractCol(schema, names, remained, "level", true)
e.SkipRequest = typeSkipRequest || addrSkipRequest || levlSkipRequest
e.NodeTypes = nodeTypes
e.Instances = instances
e.LogLevels = logLevels
if e.SkipRequest {
return nil
}
remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, remained, "time", time.Local)
// The time unit for searching logs is millisecond.
startTime = startTime / int64(time.Millisecond)
endTime = endTime / int64(time.Millisecond)
e.StartTime = startTime
e.EndTime = endTime
if startTime != 0 && endTime != 0 {
e.SkipRequest = startTime > endTime
}
if e.SkipRequest {
return nil
}
remained, patterns := e.extractLikePatternCol(schema, names, remained, "message", false, true)
e.Patterns = patterns
return remained
}
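// Example for ClusterLogTableExtractor.Extract: for
//
//	SELECT * FROM cluster_log
//	WHERE type='tikv' AND level IN ('warn','error')
//	  AND time>'2019-10-10 10:10:10' AND message LIKE '%gc%'
//
// the extractor ends up with NodeTypes={"tikv"}, LogLevels={"warn","error"},
// StartTime set in milliseconds, and Patterns roughly [".*gc.*"]; only the
// predicates that could not be extracted are returned and re-checked by a Selection.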
func (e *ClusterLogTableExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
r := new(bytes.Buffer)
st, et := e.StartTime, e.EndTime
if st > 0 {
st := time.UnixMilli(st)
r.WriteString(fmt.Sprintf("start_time:%v, ", st.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format(MetricTableTimeFormat)))
}
if et > 0 {
et := time.UnixMilli(et)
r.WriteString(fmt.Sprintf("end_time:%v, ", et.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format(MetricTableTimeFormat)))
}
if len(e.NodeTypes) > 0 {
r.WriteString(fmt.Sprintf("node_types:[%s], ", extractStringFromStringSet(e.NodeTypes)))
}
if len(e.Instances) > 0 {
r.WriteString(fmt.Sprintf("instances:[%s], ", extractStringFromStringSet(e.Instances)))
}
if len(e.LogLevels) > 0 {
r.WriteString(fmt.Sprintf("log_levels:[%s], ", extractStringFromStringSet(e.LogLevels)))
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
const (
// HotRegionTypeRead hot read region.
HotRegionTypeRead = "read"
// HotRegionTypeWrite hot write region.
HotRegionTypeWrite = "write"
)
// HotRegionsHistoryTableExtractor is used to extract some predicates of `tidb_hot_regions_history`
type HotRegionsHistoryTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any PD server.
SkipRequest bool
// StartTime represents the lower bound of the update time.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE update_time>'2019-10-10 10:10:10.999'
StartTime int64
// EndTime represents the upper bound of the update time.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE update_time<'2019-10-11 10:10:10.999'
EndTime int64
// RegionIDs/StoreIDs/PeerIDs represent all region/store/peer ids we should filter in PD to reduce network IO.
// e.g:
// 1. SELECT * FROM tidb_hot_regions_history WHERE region_id=1
// 2. SELECT * FROM tidb_hot_regions_history WHERE region_id in (11, 22)
// Range operations are left to the selection executor above.
RegionIDs []uint64
StoreIDs []uint64
PeerIDs []uint64
// IsLearners/IsLeaders represent whether we should request the learner/leader roles in PD to reduce network IO.
// e.g:
// 1. SELECT * FROM tidb_hot_regions_history WHERE is_learner=1
// 2. SELECT * FROM tidb_hot_regions_history WHERE is_learner in (0,1) -> request all
IsLearners []bool
IsLeaders []bool
// HotRegionTypes represents all hot region types we should filter in PD to reduce network IO.
// e.g:
// 1. SELECT * FROM tidb_hot_regions_history WHERE type='read'
// 2. SELECT * FROM tidb_hot_regions_history WHERE type in ('read', 'write')
// 3. SELECT * FROM tidb_hot_regions_history WHERE type='read' and type='write' -> SkipRequest = true
HotRegionTypes set.StringSet
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *HotRegionsHistoryTableExtractor) Extract(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `region_id/store_id/peer_id` columns
remained, regionIDSkipRequest, regionIDs := e.extractCol(schema, names, predicates, "region_id", false)
remained, storeIDSkipRequest, storeIDs := e.extractCol(schema, names, remained, "store_id", false)
remained, peerIDSkipRequest, peerIDs := e.extractCol(schema, names, remained, "peer_id", false)
e.RegionIDs, e.StoreIDs, e.PeerIDs = e.parseUint64(regionIDs), e.parseUint64(storeIDs), e.parseUint64(peerIDs)
e.SkipRequest = regionIDSkipRequest || storeIDSkipRequest || peerIDSkipRequest
if e.SkipRequest {
return nil
}
// Extract the is_learner/is_leader columns
remained, isLearnerSkipRequest, isLearners := e.extractCol(schema, names, remained, "is_learner", false)
remained, isLeaderSkipRequest, isLeaders := e.extractCol(schema, names, remained, "is_leader", false)
isLearnersUint64, isLeadersUint64 := e.parseUint64(isLearners), e.parseUint64(isLeaders)
e.SkipRequest = isLearnerSkipRequest || isLeaderSkipRequest
if e.SkipRequest {
return nil
}
// uint64 slice to unique bool slice
e.IsLearners = e.convertToBoolSlice(isLearnersUint64)
e.IsLeaders = e.convertToBoolSlice(isLeadersUint64)
// Extract the `type` column
remained, typeSkipRequest, types := e.extractCol(schema, names, remained, "type", false)
e.HotRegionTypes = types
e.SkipRequest = typeSkipRequest
if e.SkipRequest {
return nil
}
// Divide read and write into two requests because their time ranges overlap;
// PD uses [type,time] as the key of hot regions.
if e.HotRegionTypes.Count() == 0 {
e.HotRegionTypes.Insert(HotRegionTypeRead)
e.HotRegionTypes.Insert(HotRegionTypeWrite)
}
remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, remained, "update_time", ctx.GetSessionVars().StmtCtx.TimeZone)
// The time unit for searching hot regions is millisecond
startTime = startTime / int64(time.Millisecond)
endTime = endTime / int64(time.Millisecond)
e.StartTime = startTime
e.EndTime = endTime
if startTime != 0 && endTime != 0 {
e.SkipRequest = startTime > endTime
}
if e.SkipRequest {
return nil
}
return remained
}
func (e *HotRegionsHistoryTableExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
r := new(bytes.Buffer)
st, et := e.StartTime, e.EndTime
if st > 0 {
st := time.UnixMilli(st)
r.WriteString(fmt.Sprintf("start_time:%v, ", st.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format("2006-01-02 15:04:05")))
}
if et > 0 {
et := time.UnixMilli(et)
r.WriteString(fmt.Sprintf("end_time:%v, ", et.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format("2006-01-02 15:04:05")))
}
if len(e.RegionIDs) > 0 {
r.WriteString(fmt.Sprintf("region_ids:[%s], ", extractStringFromUint64Slice(e.RegionIDs)))
}
if len(e.StoreIDs) > 0 {
r.WriteString(fmt.Sprintf("store_ids:[%s], ", extractStringFromUint64Slice(e.StoreIDs)))
}
if len(e.PeerIDs) > 0 {
r.WriteString(fmt.Sprintf("peer_ids:[%s], ", extractStringFromUint64Slice(e.PeerIDs)))
}
if len(e.IsLearners) > 0 {
r.WriteString(fmt.Sprintf("learner_roles:[%s], ", extractStringFromBoolSlice(e.IsLearners)))
}
if len(e.IsLeaders) > 0 {
r.WriteString(fmt.Sprintf("leader_roles:[%s], ", extractStringFromBoolSlice(e.IsLeaders)))
}
if len(e.HotRegionTypes) > 0 {
r.WriteString(fmt.Sprintf("hot_region_types:[%s], ", extractStringFromStringSet(e.HotRegionTypes)))
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
// MetricTableExtractor is used to extract some predicates of metrics_schema tables.
type MetricTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
// StartTime represents the beginning time of metric data.
StartTime time.Time
// EndTime represents the ending time of metric data.
EndTime time.Time
// LabelConditions represents the label conditions of metric data.
LabelConditions map[string]set.StringSet
Quantiles []float64
}
func newMetricTableExtractor() *MetricTableExtractor {
e := &MetricTableExtractor{}
e.StartTime, e.EndTime = e.getTimeRange(0, 0)
return e
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *MetricTableExtractor) Extract(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `quantile` columns
remained, skipRequest, quantileSet := e.extractCol(schema, names, predicates, "quantile", true)
e.Quantiles = e.parseQuantiles(quantileSet)
e.SkipRequest = skipRequest
if e.SkipRequest {
return nil
}
// Extract the `time` columns
remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, remained, "time", ctx.GetSessionVars().StmtCtx.TimeZone)
e.StartTime, e.EndTime = e.getTimeRange(startTime, endTime)
e.SkipRequest = e.StartTime.After(e.EndTime)
if e.SkipRequest {
return nil
}
excludeCols := set.NewStringSet("quantile", "time", "value")
_, skipRequest, extractCols := e.extractCols(schema, names, remained, excludeCols, false)
e.SkipRequest = skipRequest
if e.SkipRequest {
return nil
}
e.LabelConditions = extractCols
// For some metrics, the metric reader can't use the predicates, so keep all label conditions in the remained predicates.
return remained
}
func (e *MetricTableExtractor) getTimeRange(start, end int64) (time.Time, time.Time) {
const defaultMetricQueryDuration = 10 * time.Minute
var startTime, endTime time.Time
if start == 0 && end == 0 {
endTime = time.Now()
return endTime.Add(-defaultMetricQueryDuration), endTime
}
if start != 0 {
startTime = e.convertToTime(start)
}
if end != 0 {
endTime = e.convertToTime(end)
}
if start == 0 {
startTime = endTime.Add(-defaultMetricQueryDuration)
}
if end == 0 {
endTime = startTime.Add(defaultMetricQueryDuration)
}
return startTime, endTime
}
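// Example for getTimeRange: with no time predicate the metric query window
// defaults to [now-10m, now]; `time >= '2020-01-01 00:00:00'` alone gives
// [that instant, that instant+10m], and `time <= '2020-01-01 00:00:00'` alone
// gives [that instant-10m, that instant].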
func (e *MetricTableExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
promQL := e.GetMetricTablePromQL(p.ctx, p.Table.Name.L)
startTime, endTime := e.StartTime, e.EndTime
step := time.Second * time.Duration(p.ctx.GetSessionVars().MetricSchemaStep)
return fmt.Sprintf("PromQL:%v, start_time:%v, end_time:%v, step:%v",
promQL,
startTime.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format(MetricTableTimeFormat),
endTime.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format(MetricTableTimeFormat),
step,
)
}
// GetMetricTablePromQL uses to get the promQL of metric table.
func (e *MetricTableExtractor) GetMetricTablePromQL(sctx sessionctx.Context, lowerTableName string) string {
quantiles := e.Quantiles
def, err := infoschema.GetMetricTableDef(lowerTableName)
if err != nil {
return ""
}
if len(quantiles) == 0 {
quantiles = []float64{def.Quantile}
}
var buf bytes.Buffer
for i, quantile := range quantiles {
promQL := def.GenPromQL(sctx, e.LabelConditions, quantile)
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(promQL)
}
return buf.String()
}
// MetricSummaryTableExtractor is used to extract some predicates of metrics_schema tables.
type MetricSummaryTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
MetricsNames set.StringSet
Quantiles []float64
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *MetricSummaryTableExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
remained, quantileSkip, quantiles := e.extractCol(schema, names, predicates, "quantile", false)
remained, metricsNameSkip, metricsNames := e.extractCol(schema, names, predicates, "metrics_name", true)
e.SkipRequest = quantileSkip || metricsNameSkip
e.Quantiles = e.parseQuantiles(quantiles)
e.MetricsNames = metricsNames
return remained
}
func (e *MetricSummaryTableExtractor) explainInfo(_ *PhysicalMemTable) string {
return ""
}
// InspectionResultTableExtractor is used to extract some predicates of `inspection_result`
type InspectionResultTableExtractor struct {
extractHelper
// SkipInspection means the where clause is always false, so we don't need to request any component
SkipInspection bool
// Rules represents the rules to apply, and we should apply all inspection rules if no rule is specified
// e.g: SELECT * FROM inspection_result WHERE rule in ('ddl', 'config')
Rules set.StringSet
// Items represents the items to apply, and we should apply all inspection items if no item is specified
// e.g: SELECT * FROM inspection_result WHERE item in ('ddl.lease', 'raftstore.threadpool')
Items set.StringSet
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *InspectionResultTableExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
// Extract the `rule/item` columns
remained, ruleSkip, rules := e.extractCol(schema, names, predicates, "rule", true)
remained, itemSkip, items := e.extractCol(schema, names, remained, "item", true)
e.SkipInspection = ruleSkip || itemSkip
e.Rules = rules
e.Items = items
return remained
}
func (e *InspectionResultTableExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipInspection {
return "skip_inspection:true"
}
s := make([]string, 0, 2)
s = append(s, fmt.Sprintf("rules:[%s]", extractStringFromStringSet(e.Rules)))
s = append(s, fmt.Sprintf("items:[%s]", extractStringFromStringSet(e.Items)))
return strings.Join(s, ", ")
}
// InspectionSummaryTableExtractor is used to extract some predicates of `inspection_summary`
type InspectionSummaryTableExtractor struct {
extractHelper
// SkipInspection means the where clause is always false, so we don't need to request any component
SkipInspection bool
// Rules represents the rules to apply, and we should apply all inspection rules if no rule is specified
// e.g: SELECT * FROM inspection_summary WHERE rule in ('ddl', 'config')
Rules set.StringSet
MetricNames set.StringSet
Quantiles []float64
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *InspectionSummaryTableExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
// Extract the `rule` columns
_, ruleSkip, rules := e.extractCol(schema, names, predicates, "rule", true)
// Extract the `metric_name` columns
_, metricNameSkip, metricNames := e.extractCol(schema, names, predicates, "metrics_name", true)
// Extract the `quantile` columns
remained, quantileSkip, quantileSet := e.extractCol(schema, names, predicates, "quantile", false)
e.SkipInspection = ruleSkip || quantileSkip || metricNameSkip
e.Rules = rules
e.Quantiles = e.parseQuantiles(quantileSet)
e.MetricNames = metricNames
return remained
}
func (e *InspectionSummaryTableExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipInspection {
return "skip_inspection: true"
}
r := new(bytes.Buffer)
if len(e.Rules) > 0 {
r.WriteString(fmt.Sprintf("rules:[%s], ", extractStringFromStringSet(e.Rules)))
}
if len(e.MetricNames) > 0 {
r.WriteString(fmt.Sprintf("metric_names:[%s], ", extractStringFromStringSet(e.MetricNames)))
}
if len(e.Quantiles) > 0 {
r.WriteString("quantiles:[")
for i, quantile := range e.Quantiles {
if i > 0 {
r.WriteByte(',')
}
r.WriteString(fmt.Sprintf("%f", quantile))
}
r.WriteString("], ")
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
// InspectionRuleTableExtractor is used to extract some predicates of `inspection_rules`
type InspectionRuleTableExtractor struct {
extractHelper
SkipRequest bool
Types set.StringSet
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *InspectionRuleTableExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
// Extract the `type` columns
remained, tpSkip, tps := e.extractCol(schema, names, predicates, "type", true)
e.SkipRequest = tpSkip
e.Types = tps
return remained
}
func (e *InspectionRuleTableExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
r := new(bytes.Buffer)
if len(e.Types) > 0 {
r.WriteString(fmt.Sprintf("node_types:[%s]", extractStringFromStringSet(e.Types)))
}
return r.String()
}
// SlowQueryExtractor is used to extract some predicates of `slow_query`
type SlowQueryExtractor struct {
extractHelper
SkipRequest bool
TimeRanges []*TimeRange
// Enable is true means the executor should use the time range to locate the slow-log files that need to be parsed.
// Enable is false means the executor should keep the behavior compatible with before, which only parses the
// current slow-log file.
Enable bool
Desc bool
}
// TimeRange is used to check whether a given log should be extracted.
type TimeRange struct {
StartTime time.Time
EndTime time.Time
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *SlowQueryExtractor) Extract(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, predicates, "time", ctx.GetSessionVars().StmtCtx.TimeZone)
e.setTimeRange(startTime, endTime)
e.SkipRequest = e.Enable && e.TimeRanges[0].StartTime.After(e.TimeRanges[0].EndTime)
if e.SkipRequest {
return nil
}
return remained
}
func (e *SlowQueryExtractor) setTimeRange(start, end int64) {
const defaultSlowQueryDuration = 24 * time.Hour
var startTime, endTime time.Time
if start == 0 && end == 0 {
return
}
if start != 0 {
startTime = e.convertToTime(start)
}
if end != 0 {
endTime = e.convertToTime(end)
}
if start == 0 {
startTime = endTime.Add(-defaultSlowQueryDuration)
}
if end == 0 {
endTime = startTime.Add(defaultSlowQueryDuration)
}
timeRange := &TimeRange{
StartTime: startTime,
EndTime: endTime,
}
e.TimeRanges = append(e.TimeRanges, timeRange)
e.Enable = true
}
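// Example for setTimeRange: `time > '2020-03-10 00:00:00'` alone records a range
// starting just after 2020-03-10 00:00:00 and spanning the default 24h window, and
// sets Enable=true, so the executor searches the rotated slow-log files covering
// that window; with no time predicate at all, nothing is recorded, Enable stays
// false, and only the current slow-log file is parsed.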
func (e *SlowQueryExtractor) buildTimeRangeFromKeyRange(keyRanges []*coprocessor.KeyRange) error {
for _, kr := range keyRanges {
startTime, err := e.decodeBytesToTime(kr.Start)
if err != nil {
return err
}
endTime, err := e.decodeBytesToTime(kr.End)
if err != nil {
return err
}
e.setTimeRange(startTime, endTime)
}
return nil
}
func (e *SlowQueryExtractor) decodeBytesToTime(bs []byte) (int64, error) {
if len(bs) >= tablecodec.RecordRowKeyLen {
t, err := tablecodec.DecodeRowKey(bs)
if err != nil {
return 0, nil
}
return e.decodeToTime(t)
}
return 0, nil
}
func (e *SlowQueryExtractor) decodeToTime(handle kv.Handle) (int64, error) {
tp := types.NewFieldType(mysql.TypeDatetime)
col := rowcodec.ColInfo{ID: 0, Ft: tp}
chk := chunk.NewChunkWithCapacity([]*types.FieldType{tp}, 1)
coder := codec.NewDecoder(chk, nil)
_, err := coder.DecodeOne(handle.EncodedCol(0), 0, col.Ft)
if err != nil {
return 0, err
}
datum := chk.GetRow(0).GetDatum(0, tp)
mysqlTime := (&datum).GetMysqlTime()
timestampInNano := time.Date(mysqlTime.Year(),
time.Month(mysqlTime.Month()),
mysqlTime.Day(),
mysqlTime.Hour(),
mysqlTime.Minute(),
mysqlTime.Second(),
mysqlTime.Microsecond()*1000,
time.UTC,
).UnixNano()
return timestampInNano, err
}
// TableStorageStatsExtractor is used to extract some predicates of `disk_usage`.
type TableStorageStatsExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component.
SkipRequest bool
// TableSchema represents the table schemas to apply to, and we should include the disk usage of all tables if no schema is specified.
// e.g: SELECT * FROM information_schema.disk_usage WHERE table_schema in ('test', 'information_schema').
TableSchema set.StringSet
// TableName represents the table names to apply to, and we should include the disk usage of all tables if no table is specified.
// e.g: SELECT * FROM information_schema.disk_usage WHERE table in ('schemata', 'tables').
TableName set.StringSet
}
// Extract implements the MemTablePredicateExtractor Extract interface.
func (e *TableStorageStatsExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `table_schema` columns.
remained, schemaSkip, tableSchema := e.extractCol(schema, names, predicates, "table_schema", true)
// Extract the `table_name` columns.
remained, tableSkip, tableName := e.extractCol(schema, names, remained, "table_name", true)
e.SkipRequest = schemaSkip || tableSkip
if e.SkipRequest {
return nil
}
e.TableSchema = tableSchema
e.TableName = tableName
return remained
}
func (e *TableStorageStatsExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
r := new(bytes.Buffer)
if len(e.TableSchema) > 0 {
r.WriteString(fmt.Sprintf("schema:[%s]", extractStringFromStringSet(e.TableSchema)))
}
if r.Len() > 0 && len(e.TableName) > 0 {
r.WriteString(", ")
}
if len(e.TableName) > 0 {
r.WriteString(fmt.Sprintf("table:[%s]", extractStringFromStringSet(e.TableName)))
}
return r.String()
}
func (e *SlowQueryExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
if !e.Enable {
return fmt.Sprintf("only search in the current '%v' file", p.ctx.GetSessionVars().SlowQueryFile)
}
startTime := e.TimeRanges[0].StartTime.In(p.ctx.GetSessionVars().StmtCtx.TimeZone)
endTime := e.TimeRanges[0].EndTime.In(p.ctx.GetSessionVars().StmtCtx.TimeZone)
return fmt.Sprintf("start_time:%v, end_time:%v",
types.NewTime(types.FromGoTime(startTime), mysql.TypeDatetime, types.MaxFsp).String(),
types.NewTime(types.FromGoTime(endTime), mysql.TypeDatetime, types.MaxFsp).String())
}
// TiFlashSystemTableExtractor is used to extract some predicates of tiflash system table.
type TiFlashSystemTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
// TiFlashInstances represents all tiflash instances we should send requests to.
// e.g:
// 1. SELECT * FROM information_schema.<table_name> WHERE tiflash_instance='192.168.1.7:3930'
// 2. SELECT * FROM information_schema.<table_name> WHERE tiflash_instance in ('192.168.1.7:3930', '192.168.1.9:3930')
TiFlashInstances set.StringSet
// TiDBDatabases represents the TiDB databases to apply to, and we should apply to all TiDB databases if no database is specified.
// e.g: SELECT * FROM information_schema.<table_name> WHERE tidb_database in ('test', 'test2').
TiDBDatabases string
// TiDBTables represents the TiDB tables to apply to, and we should apply to all TiDB tables if no table is specified.
// e.g: SELECT * FROM information_schema.<table_name> WHERE tidb_table in ('t', 't2').
TiDBTables string
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *TiFlashSystemTableExtractor) Extract(_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `tiflash_instance` columns.
remained, instanceSkip, tiflashInstances := e.extractCol(schema, names, predicates, "tiflash_instance", false)
// Extract the `tidb_database` columns.
remained, databaseSkip, tidbDatabases := e.extractCol(schema, names, remained, "tidb_database", true)
// Extract the `tidb_table` columns.
remained, tableSkip, tidbTables := e.extractCol(schema, names, remained, "tidb_table", true)
e.SkipRequest = instanceSkip || databaseSkip || tableSkip
if e.SkipRequest {
return nil
}
e.TiFlashInstances = tiflashInstances
e.TiDBDatabases = extractStringFromStringSet(tidbDatabases)
e.TiDBTables = extractStringFromStringSet(tidbTables)
return remained
}
func (e *TiFlashSystemTableExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request:true"
}
r := new(bytes.Buffer)
if len(e.TiFlashInstances) > 0 {
r.WriteString(fmt.Sprintf("tiflash_instances:[%s], ", extractStringFromStringSet(e.TiFlashInstances)))
}
if len(e.TiDBDatabases) > 0 {
r.WriteString(fmt.Sprintf("tidb_databases:[%s], ", e.TiDBDatabases))
}
if len(e.TiDBTables) > 0 {
r.WriteString(fmt.Sprintf("tidb_tables:[%s], ", e.TiDBTables))
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
// StatementsSummaryExtractor is used to extract some predicates of statements summary table.
type StatementsSummaryExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
// Digests represents the digests to apply to, and we should apply to all digests if no digest is specified.
// e.g: SELECT * FROM STATEMENTS_SUMMARY WHERE digest='8019af26debae8aa7642c501dbc43212417b3fb14e6aec779f709976b7e521be'
Digests set.StringSet
// Enable is true means the executor should use the digests to locate statement summaries.
// Enable is false means the executor should keep the behavior compatible with before.
Enable bool
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *StatementsSummaryExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
// Extract the `digest` column
remained, skip, digests := e.extractCol(schema, names, predicates, "digest", false)
e.SkipRequest = skip
if e.SkipRequest {
return nil
}
if digests.Count() > 0 {
e.Enable = true
e.Digests = digests
}
return remained
}
func (e *StatementsSummaryExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
if !e.Enable {
return ""
}
return fmt.Sprintf("digests: [%s]", extractStringFromStringSet(e.Digests))
}
// TikvRegionPeersExtractor is used to extract some predicates of the tikv_region_peers table.
type TikvRegionPeersExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
// RegionIDs/StoreIDs represent all region/store ids we should filter in PD to reduce network IO.
// e.g:
// 1. SELECT * FROM tikv_region_peers WHERE region_id=1
// 2. SELECT * FROM tikv_region_peers WHERE region_id in (11, 22)
RegionIDs []uint64
StoreIDs []uint64
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *TikvRegionPeersExtractor) Extract(_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `region_id/store_id` columns.
remained, regionIDSkipRequest, regionIDs := e.extractCol(schema, names, predicates, "region_id", false)
remained, storeIDSkipRequest, storeIDs := e.extractCol(schema, names, remained, "store_id", false)
e.RegionIDs, e.StoreIDs = e.parseUint64(regionIDs), e.parseUint64(storeIDs)
e.SkipRequest = regionIDSkipRequest || storeIDSkipRequest
if e.SkipRequest {
return nil
}
return remained
}
func (e *TikvRegionPeersExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request:true"
}
r := new(bytes.Buffer)
if len(e.RegionIDs) > 0 {
r.WriteString(fmt.Sprintf("region_ids:[%s], ", extractStringFromUint64Slice(e.RegionIDs)))
}
if len(e.StoreIDs) > 0 {
r.WriteString(fmt.Sprintf("store_ids:[%s], ", extractStringFromUint64Slice(e.StoreIDs)))
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
// ColumnsTableExtractor is used to extract some predicates of columns table.
type ColumnsTableExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component
SkipRequest bool
TableSchema set.StringSet
TableName set.StringSet
// ColumnName represents all column names we should filter in the memtable.
ColumnName set.StringSet
TableSchemaPatterns []string
TableNamePatterns []string
ColumnNamePatterns []string
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *ColumnsTableExtractor) Extract(_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
remained, tableSchemaSkipRequest, tableSchema := e.extractCol(schema, names, predicates, "table_schema", true)
remained, tableNameSkipRequest, tableName := e.extractCol(schema, names, remained, "table_name", true)
remained, columnNameSkipRequest, columnName := e.extractCol(schema, names, remained, "column_name", true)
e.SkipRequest = columnNameSkipRequest || tableSchemaSkipRequest || tableNameSkipRequest
if e.SkipRequest {
return
}
remained, tableSchemaPatterns := e.extractLikePatternCol(schema, names, remained, "table_schema", true, false)
remained, tableNamePatterns := e.extractLikePatternCol(schema, names, remained, "table_name", true, false)
remained, columnNamePatterns := e.extractLikePatternCol(schema, names, remained, "column_name", true, false)
e.ColumnName = columnName
e.TableName = tableName
e.TableSchema = tableSchema
e.TableSchemaPatterns = tableSchemaPatterns
e.TableNamePatterns = tableNamePatterns
e.ColumnNamePatterns = columnNamePatterns
return remained
}
func (e *ColumnsTableExtractor) explainInfo(_ *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request:true"
}
r := new(bytes.Buffer)
if len(e.TableSchema) > 0 {
r.WriteString(fmt.Sprintf("table_schema:[%s], ", extractStringFromStringSet(e.TableSchema)))
}
if len(e.TableName) > 0 {
r.WriteString(fmt.Sprintf("table_name:[%s], ", extractStringFromStringSet(e.TableName)))
}
if len(e.ColumnName) > 0 {
r.WriteString(fmt.Sprintf("column_name:[%s], ", extractStringFromStringSet(e.ColumnName)))
}
if len(e.TableSchemaPatterns) > 0 {
r.WriteString(fmt.Sprintf("table_schema_pattern:[%s], ", extractStringFromStringSlice(e.TableSchemaPatterns)))
}
if len(e.TableNamePatterns) > 0 {
r.WriteString(fmt.Sprintf("table_name_pattern:[%s], ", extractStringFromStringSlice(e.TableNamePatterns)))
}
if len(e.ColumnNamePatterns) > 0 {
r.WriteString(fmt.Sprintf("column_name_pattern:[%s], ", extractStringFromStringSlice(e.ColumnNamePatterns)))
}
// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
// TiKVRegionStatusExtractor is used to extract the table IDs from the predicates for a single-table region scan.
type TiKVRegionStatusExtractor struct {
extractHelper
tablesID []int64
}
// Extract implements the MemTablePredicateExtractor Extract interface
func (e *TiKVRegionStatusExtractor) Extract(_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
remained, _, tableIDSet := e.extractCol(schema, names, predicates, "table_id", true)
if tableIDSet.Count() < 1 {
return predicates
}
var tableID int64
var err error
for key := range tableIDSet {
tableID, err = strconv.ParseInt(key, 10, 64)
if err != nil {
logutil.BgLogger().Error("extract table_id failed", zap.Error(err), zap.String("tableID", key))
e.tablesID = nil
return predicates
}
e.tablesID = append(e.tablesID, tableID)
}
return remained
}
func (e *TiKVRegionStatusExtractor) explainInfo(_ *PhysicalMemTable) string {
r := new(bytes.Buffer)
if len(e.tablesID) > 0 {
r.WriteString("table_id in {")
for i, tableID := range e.tablesID {
if i > 0 {
r.WriteString(",")
}
r.WriteString(fmt.Sprintf("%v", tableID))
}
r.WriteString("}")
}
return r.String()
}
// GetTablesID returns TablesID
func (e *TiKVRegionStatusExtractor) GetTablesID() []int64 {
return e.tablesID
}