tidb batch_coprocessor source code
File path: /store/copr/batch_coprocessor.go
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package copr
import (
"bytes"
"context"
"fmt"
"io"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
// batchCopTask comprises multiple copTasks that will be sent to the same store.
type batchCopTask struct {
storeAddr string
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext
regionInfos []RegionInfo // region info for single physical table
// PartitionTableRegions indicates the region infos for each partition table, used when scanning partitions in batch.
// Thus, one of `regionInfos` and `PartitionTableRegions` must be nil.
PartitionTableRegions []*coprocessor.TableRegions
}
type batchCopResponse struct {
pbResp *coprocessor.BatchResponse
detail *CopRuntimeStats
// The batch cop response does not return startKey yet, so batchCop cannot retry partially.
startKey kv.Key
err error
respSize int64
respTime time.Duration
}
// GetData implements the kv.ResultSubset GetData interface.
func (rs *batchCopResponse) GetData() []byte {
return rs.pbResp.Data
}
// GetStartKey implements the kv.ResultSubset GetStartKey interface.
func (rs *batchCopResponse) GetStartKey() kv.Key {
return rs.startKey
}
// GetCopRuntimeStats returns the runtime stats; exec details are currently unavailable, because TiFlash has not collected them for batch cop.
// TODO: will be fixed in the near future.
func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats {
return rs.detail
}
// MemSize returns how many bytes of memory this response uses
func (rs *batchCopResponse) MemSize() int64 {
if rs.respSize != 0 {
return rs.respSize
}
// ignore rs.err
rs.respSize += int64(cap(rs.startKey))
if rs.detail != nil {
rs.respSize += int64(sizeofExecDetails)
}
if rs.pbResp != nil {
// Use an approximate size since it's hard to get an accurate value.
rs.respSize += int64(rs.pbResp.Size())
}
return rs.respSize
}
func (rs *batchCopResponse) RespTime() time.Duration {
return rs.respTime
}
func deepCopyStoreTaskMap(storeTaskMap map[uint64]*batchCopTask) map[uint64]*batchCopTask {
storeTasks := make(map[uint64]*batchCopTask)
for storeID, task := range storeTaskMap {
t := batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
}
t.regionInfos = make([]RegionInfo, len(task.regionInfos))
copy(t.regionInfos, task.regionInfos)
storeTasks[storeID] = &t
}
return storeTasks
}
func regionTotalCount(storeTasks map[uint64]*batchCopTask, candidateRegionInfos []RegionInfo) int {
count := len(candidateRegionInfos)
for _, task := range storeTasks {
count += len(task.regionInfos)
}
return count
}
const (
maxBalanceScore = 100
balanceScoreThreshold = 85
)
// selectRegion selects at most cnt RegionInfos from candidateRegionInfos that belong to storeID.
// If selected[i] is true, candidateRegionInfos[i] has already been selected and should be skipped.
// storeID2RegionIndex maps each storeID to a slice of region indexes, so
// selectRegion can find the RegionInfos that belong to storeID efficiently.
func selectRegion(storeID uint64, candidateRegionInfos []RegionInfo, selected []bool, storeID2RegionIndex map[uint64][]int, cnt int64) []RegionInfo {
regionIndexes, ok := storeID2RegionIndex[storeID]
if !ok {
logutil.BgLogger().Error("selectRegion: storeID2RegionIndex not found", zap.Uint64("storeID", storeID))
return nil
}
var regionInfos []RegionInfo
i := 0
for ; i < len(regionIndexes) && len(regionInfos) < int(cnt); i++ {
idx := regionIndexes[i]
if selected[idx] {
continue
}
selected[idx] = true
regionInfos = append(regionInfos, candidateRegionInfos[idx])
}
// Remove regions that have been selected.
storeID2RegionIndex[storeID] = regionIndexes[i:]
return regionInfos
}
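// exampleSelectRegion is a hypothetical walk-through (not part of the original
// file) of the bookkeeping above: two stores share region index 0, so after
// store 1 takes it, store 2's index slice still begins with 0, but the shared
// selected mask makes selectRegion skip it and consume the next free index.
func exampleSelectRegion() {
	candidates := []RegionInfo{{}, {}, {}} // three regions; the contents do not matter here
	selected := make([]bool, len(candidates))
	index := map[uint64][]int{1: {0, 1}, 2: {0, 2}}
	fmt.Println(len(selectRegion(1, candidates, selected, index, 1))) // 1: takes index 0
	fmt.Println(len(selectRegion(2, candidates, selected, index, 2))) // 1: skips 0, takes 2
}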
// Higher scores mean better balance: (100 - unbalance percentage)
func balanceScore(maxRegionCount, minRegionCount int, balanceContinuousRegionCount int64) int {
if minRegionCount <= 0 {
return math.MinInt32
}
unbalanceCount := maxRegionCount - minRegionCount
if unbalanceCount <= int(balanceContinuousRegionCount) {
return maxBalanceScore
}
return maxBalanceScore - unbalanceCount*100/minRegionCount
}
func isBalance(score int) bool {
return score >= balanceScoreThreshold
}
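// exampleBalanceScore is a hypothetical snippet (not part of the original file)
// showing how balanceScore and isBalance interact: a max/min gap within
// balanceContinuousRegionCount earns the full score, while a gap of 25 regions
// against a minimum of 25 yields 100 - 25*100/25 = 0, far below
// balanceScoreThreshold (85).
func exampleBalanceScore() {
	fmt.Println(balanceScore(30, 25, 20), isBalance(balanceScore(30, 25, 20))) // 100 true
	fmt.Println(balanceScore(50, 25, 20), isBalance(balanceScore(50, 25, 20))) // 0 false
}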
func checkBatchCopTaskBalance(storeTasks map[uint64]*batchCopTask, balanceContinuousRegionCount int64) (int, []string) {
if len(storeTasks) == 0 {
return 0, []string{}
}
maxRegionCount := 0
minRegionCount := math.MaxInt32
balanceInfos := []string{}
for storeID, task := range storeTasks {
cnt := len(task.regionInfos)
if cnt > maxRegionCount {
maxRegionCount = cnt
}
if cnt < minRegionCount {
minRegionCount = cnt
}
balanceInfos = append(balanceInfos, fmt.Sprintf("storeID %d storeAddr %s regionCount %d", storeID, task.storeAddr, cnt))
}
return balanceScore(maxRegionCount, minRegionCount, balanceContinuousRegionCount), balanceInfos
}
// balanceBatchCopTaskWithContinuity tries to balance `continuous regions` among TiFlash stores.
// In fact, absolute continuity is not required; it is enough that the regions' ranges are close enough to be stored in the same TiFlash segment, which enables the internal read optimization.
//
// First, sort candidateRegionInfos by their key ranges.
// Second, build a storeID2RegionIndex data structure to quickly locate the regions of a store (avoiding repeated scans of candidateRegionInfos).
// Third, each store takes balanceContinuousRegionCount regions from the sorted candidateRegionInfos per round. These regions are stored very close to each other in TiFlash.
// Fourth, if the region count is not balanced among the TiFlash stores, it may fall back to the original balance logic.
func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, candidateRegionInfos []RegionInfo, balanceContinuousRegionCount int64) ([]*batchCopTask, int) {
if len(candidateRegionInfos) < 500 {
return nil, 0
}
funcStart := time.Now()
regionCount := regionTotalCount(storeTaskMap, candidateRegionInfos)
storeTasks := deepCopyStoreTaskMap(storeTaskMap)
// Sort regions by their key ranges.
slices.SortFunc(candidateRegionInfos, func(i, j RegionInfo) bool {
// Special case: Sort empty ranges to the end.
if i.Ranges.Len() < 1 || j.Ranges.Len() < 1 {
return i.Ranges.Len() > j.Ranges.Len()
}
// StartKey0 < StartKey1
return bytes.Compare(i.Ranges.At(0).StartKey, j.Ranges.At(0).StartKey) == -1
})
balanceStart := time.Now()
// Build a storeID -> region index slice so we can quickly locate the regions of a store.
storeID2RegionIndex := make(map[uint64][]int)
for i, ri := range candidateRegionInfos {
for _, storeID := range ri.AllStores {
if val, ok := storeID2RegionIndex[storeID]; ok {
storeID2RegionIndex[storeID] = append(val, i)
} else {
storeID2RegionIndex[storeID] = []int{i}
}
}
}
// If selected[i] is true, candidateRegionInfos[i] has been selected by a store, and selectRegion should skip it.
selected := make([]bool, len(candidateRegionInfos))
for {
totalCount := 0
selectCountThisRound := 0
for storeID, task := range storeTasks {
// Each store selects balanceContinuousRegionCount regions from candidateRegionInfos.
// Since candidateRegionInfos is sorted, it is very likely that these regions are close to each other in TiFlash.
regionInfo := selectRegion(storeID, candidateRegionInfos, selected, storeID2RegionIndex, balanceContinuousRegionCount)
task.regionInfos = append(task.regionInfos, regionInfo...)
totalCount += len(task.regionInfos)
selectCountThisRound += len(regionInfo)
}
if totalCount >= regionCount {
break
}
if selectCountThisRound == 0 {
logutil.BgLogger().Error("selectCandidateRegionInfos fail: some region cannot find relevant store.", zap.Int("regionCount", regionCount), zap.Int("candidateCount", len(candidateRegionInfos)))
return nil, 0
}
}
balanceEnd := time.Now()
score, balanceInfos := checkBatchCopTaskBalance(storeTasks, balanceContinuousRegionCount)
if !isBalance(score) {
logutil.BgLogger().Warn("balanceBatchCopTaskWithContinuity is not balance", zap.Int("score", score), zap.Strings("balanceInfos", balanceInfos))
}
totalCount := 0
var res []*batchCopTask
for _, task := range storeTasks {
totalCount += len(task.regionInfos)
if len(task.regionInfos) > 0 {
res = append(res, task)
}
}
if totalCount != regionCount {
logutil.BgLogger().Error("balanceBatchCopTaskWithContinuity error", zap.Int("totalCount", totalCount), zap.Int("regionCount", regionCount))
return nil, 0
}
logutil.BgLogger().Debug("balanceBatchCopTaskWithContinuity time",
zap.Int("candidateRegionCount", len(candidateRegionInfos)),
zap.Int64("balanceContinuousRegionCount", balanceContinuousRegionCount),
zap.Int("balanceScore", score),
zap.Duration("balanceTime", balanceEnd.Sub(balanceStart)),
zap.Duration("totalTime", time.Since(funcStart)))
return res, score
}
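// exampleContinuityRounds is a hypothetical simulation (not part of the
// original file) of the round-based allocation above: with two stores, six
// regions sorted by start key, and balanceContinuousRegionCount = 3, one store
// receives sorted indexes 0-2 and the other 3-5, so each store ends up with a
// contiguous key span.
func exampleContinuityRounds() {
	const perRound = 3
	sorted := []int{0, 1, 2, 3, 4, 5} // region indexes in key order
	stores := map[string][]int{"A": nil, "B": nil}
	next := 0
	for next < len(sorted) {
		for name := range stores {
			take := perRound
			if rest := len(sorted) - next; rest < take {
				take = rest
			}
			stores[name] = append(stores[name], sorted[next:next+take]...)
			next += take
		}
	}
	fmt.Println(stores) // e.g. map[A:[0 1 2] B:[3 4 5]] (map iteration order varies)
}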
// balanceBatchCopTask balances the regions among the available stores. The basic rules are:
// 1. the first region of each original batch cop task stays with its original store, because some
// metadata (like the RPC context) in batchCopTask is tied to it
// 2. for the remaining regions:
// if there is only 1 available store, put the region into that store;
// otherwise, these regions will be balanced among the TiFlash stores.
//
// Currently, there are two balance strategies.
// The first balance strategy: use a greedy algorithm that assigns each region to the store with the lowest weighted region count. This strategy only considers the region count across TiFlash stores.
//
// The second balance strategy: consider not only the region count across TiFlash stores, but also try to make the regions' ranges continuous (stored closely in TiFlash).
// If balanceWithContinuity is true, the second balance strategy is enabled.
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
}
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
// store id -> region signature -> region info. We can see it as store id -> region lists.
storeCandidateRegionMap := make(map[uint64]map[string]RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0
if !isMPP {
for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
batchTask := &batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}
} else {
logutil.BgLogger().Info("detecting available mpp stores")
// decide the available stores
stores := cache.RegionCache.GetTiFlashStores()
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
cur := time.Now()
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]
var last time.Time
var ok bool
mu.Lock()
if last, ok = mppStoreLastFailTime[s.GetAddr()]; ok && cur.Sub(last) < 100*time.Millisecond {
// The interval is so short that the failure probably happened within the same query, so we need not check again.
mu.Unlock()
return
}
mu.Unlock()
resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: tikvrpc.TiFlash,
Req: &mpp.IsAliveRequest{},
Context: kvrpcpb.Context{},
}, 2*time.Second)
if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
errMsg := "store not ready to serve"
if err != nil {
errMsg = err.Error()
}
logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg))
mu.Lock()
mppStoreLastFailTime[s.GetAddr()] = time.Now()
mu.Unlock()
return
}
if cur.Sub(last) < ttl {
logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", last))
return
}
mu.Lock()
defer mu.Unlock()
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
}
}(i)
}
wg.Wait()
}
var candidateRegionInfos []RegionInfo
for _, task := range originalTasks {
for index, ri := range task.regionInfos {
// for each region, figure out the valid store num
validStoreNum := 0
if index == 0 && !isMPP {
continue
}
var validStoreID uint64
for _, storeID := range ri.AllStores {
if _, ok := storeTaskMap[storeID]; ok {
validStoreNum++
// original store id might be invalid, so we have to set it again.
validStoreID = storeID
}
}
if validStoreNum == 0 {
logutil.BgLogger().Warn("Meet regions that don't have an available store. Give up balancing")
return originalTasks
} else if validStoreNum == 1 {
// if only one store is valid, just put the region into storeTaskMap
storeTaskMap[validStoreID].regionInfos = append(storeTaskMap[validStoreID].regionInfos, ri)
} else {
// if more than one store is valid, put the region
// into the store candidate map
totalRegionCandidateNum += validStoreNum
totalRemainingRegionNum++
candidateRegionInfos = append(candidateRegionInfos, ri)
taskKey := ri.Region.String()
for _, storeID := range ri.AllStores {
if _, validStore := storeTaskMap[storeID]; !validStore {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
// duplicated region, should not happen, just give up balancing
logutil.BgLogger().Warn("Meet duplicated region info when trying to balance batch cop task, give up balancing")
return originalTasks
}
storeCandidateRegionMap[storeID][taskKey] = ri
}
}
}
}
// If balanceBatchCopTaskWithContinuity fails (not balanced, or returns nil), we fall back to the original balance logic.
// So storeTaskMap must not be modified.
var contiguousTasks []*batchCopTask
contiguousBalanceScore := 0
if balanceWithContinuity {
contiguousTasks, contiguousBalanceScore = balanceBatchCopTaskWithContinuity(storeTaskMap, candidateRegionInfos, balanceContinuousRegionCount)
if isBalance(contiguousBalanceScore) && contiguousTasks != nil {
return contiguousTasks
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
return store
}
store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal, because we only check the stores affected by this region; in fact, in order
// to find the store with the lowest weightedRegionNum, all stores should be checked, but checking
// only the affected stores is simpler and gives a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}
}
if contiguousTasks != nil {
score, balanceInfos := checkBatchCopTaskBalance(storeTaskMap, balanceContinuousRegionCount)
if !isBalance(score) {
logutil.BgLogger().Warn("Region count is not balance and use contiguousTasks", zap.Int("contiguousBalanceScore", contiguousBalanceScore), zap.Int("score", score), zap.Strings("balanceInfos", balanceInfos))
return contiguousTasks
}
}
var ret []*batchCopTask
for _, task := range storeTaskMap {
if len(task.regionInfos) > 0 {
ret = append(ret, task)
}
}
return ret
}
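// exampleGreedyWeight is a hypothetical illustration (not part of the original
// file) of the weight used by findNextStore above: a store's load is its
// candidate-region count scaled down by avgStorePerRegion plus the regions
// already assigned to it, and the store with the smallest weight receives the
// next region.
func exampleGreedyWeight() {
	avgStorePerRegion := 2.0                 // totalRegionCandidateNum / totalRemainingRegionNum
	candidates := map[uint64]int{1: 4, 2: 6} // storeID -> candidate region count
	assigned := map[uint64]int{1: 3, 2: 1}   // storeID -> regions already assigned
	best, bestWeight := uint64(math.MaxUint64), math.MaxFloat64
	for storeID, cnt := range candidates {
		if w := float64(cnt)/avgStorePerRegion + float64(assigned[storeID]); w < bestWeight {
			best, bestWeight = storeID, w
		}
	}
	fmt.Println(best, bestWeight) // 2 4: store 1 weighs 4/2+3=5, store 2 weighs 6/2+1=4
}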
func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64, partitionIDs []int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
if err != nil {
return nil, err
}
// generate tableRegions for batchCopTasks
convertRegionInfosToPartitionTableRegions(batchTasks, partitionIDs)
return batchTasks, nil
}
// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
// At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`.
// Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table.
func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
cache := store.GetRegionCache()
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
rangesLen := 0
for {
var tasks []*copTask
rangesLen = 0
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
for _, lo := range locations {
tasks = append(tasks, &copTask{
region: lo.Location.Region,
ranges: lo.Ranges,
cmdType: cmdType,
storeType: storeType,
partitionIndex: int64(i),
})
}
}
var batchTasks []*batchCopTask
storeTaskMap := make(map[string]*batchCopTask)
needRetry := false
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, false)
if err != nil {
return nil, errors.Trace(err)
}
// When rpcCtx is nil, it is not only attributed to the missed region, but may also mean that
// some TiFlash stores have crashed and cannot be recovered.
// That is not an error that can be recovered easily, so we treat this error
// the same as an RPC error.
if rpcCtx == nil {
needRetry = true
logutil.BgLogger().Info("retry for TiFlash peer with region missing", zap.Uint64("region id", task.region.GetID()))
// Probably all the regions are invalid. Make the loop continue and mark all the regions invalid.
// Then `splitRegion` will reload these regions.
continue
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
}
if needRetry {
// As mentioned above, a nil rpcCtx is always attributed to failed stores.
// It is like long-polling the store but getting no response. Here we'd better use a
// TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
}
if log.GetLevel() <= zap.DebugLevel {
msg := "Before region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
balanceStart := time.Now()
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasksCore takes too much time",
zap.Duration("elapsed", elapsed),
zap.Duration("balanceElapsed", balanceElapsed),
zap.Int("range len", rangesLen),
zap.Int("task len", len(batchTasks)))
}
metrics.TxnRegionsNumHistogramWithBatchCoprocessor.Observe(float64(len(batchTasks)))
return batchTasks, nil
}
}
func convertRegionInfosToPartitionTableRegions(batchTasks []*batchCopTask, partitionIDs []int64) {
for _, copTask := range batchTasks {
tableRegions := make([]*coprocessor.TableRegions, len(partitionIDs))
// init coprocessor.TableRegions
for j, pid := range partitionIDs {
tableRegions[j] = &coprocessor.TableRegions{
PhysicalTableId: pid,
}
}
// fill region infos
for _, ri := range copTask.regionInfos {
tableRegions[ri.PartitionIndex].Regions = append(tableRegions[ri.PartitionIndex].Regions,
ri.toCoprocessorRegionInfo())
}
count := 0
// clear empty table region
for j := 0; j < len(tableRegions); j++ {
if len(tableRegions[j].Regions) != 0 {
tableRegions[count] = tableRegions[j]
count++
}
}
copTask.PartitionTableRegions = tableRegions[:count]
copTask.regionInfos = nil
}
}
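// exampleCompactNonEmpty is a hypothetical sketch (not part of the original
// file) of the in-place compaction idiom used above to drop partitions that
// received no regions, without allocating a second slice.
func exampleCompactNonEmpty() {
	vals := []int{0, 3, 0, 5} // 0 stands for an empty partition
	count := 0
	for j := 0; j < len(vals); j++ {
		if vals[j] != 0 {
			vals[count] = vals[j]
			count++
		}
	}
	fmt.Println(vals[:count]) // [3 5]
}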
func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.Variables, option *kv.ClientSendOption) kv.Response {
if req.KeepOrder || req.Desc {
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
var tasks []*batchCopTask
var err error
if req.PartitionIDAndRanges != nil {
// For Partition Table Scan
keyRanges := make([]*KeyRanges, 0, len(req.PartitionIDAndRanges))
partitionIDs := make([]int64, 0, len(req.PartitionIDAndRanges))
for _, pi := range req.PartitionIDAndRanges {
keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges))
partitionIDs = append(partitionIDs, pi.ID)
}
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs)
} else {
ranges := NewKeyRanges(req.KeyRanges)
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0)
}
if err != nil {
return copErrorResponse{err}
}
it := &batchCopIterator{
store: c.store.kvStore,
req: req,
finishCh: make(chan struct{}),
vars: vars,
rpcCancel: tikv.NewRPCanceller(),
enableCollectExecutionInfo: option.EnableCollectExecutionInfo,
}
ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, it.rpcCancel)
it.tasks = tasks
it.respChan = make(chan *batchCopResponse, 2048)
go it.run(ctx)
return it
}
type batchCopIterator struct {
store *kvStore
req *kv.Request
finishCh chan struct{}
tasks []*batchCopTask
// Batch results are stored in respChan.
respChan chan *batchCopResponse
vars *tikv.Variables
rpcCancel *tikv.RPCCanceller
wg sync.WaitGroup
// closed indicates whether Close has been called.
// There are two cases in which we need to close the `finishCh` channel: one is when the context is done, the other is
// when Close is called. We use atomic.CompareAndSwap on `closed` to make sure the channel is not closed twice.
closed uint32
enableCollectExecutionInfo bool
}
func (b *batchCopIterator) run(ctx context.Context) {
// We run workers for every batch cop.
for _, task := range b.tasks {
b.wg.Add(1)
boMaxSleep := copNextMaxBackoff
failpoint.Inject("ReduceCopNextMaxBackoff", func(value failpoint.Value) {
if value.(bool) {
boMaxSleep = 2
}
})
bo := backoff.NewBackofferWithVars(ctx, boMaxSleep, b.vars)
go b.handleTask(ctx, bo, task)
}
b.wg.Wait()
close(b.respChan)
}
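// exampleWorkerFanOut is a hypothetical sketch (not part of the original file)
// of the pattern used by run above: spawn one goroutine per task, wait for all
// of them with a WaitGroup, and only then close the shared result channel so
// no worker can send on a closed channel.
func exampleWorkerFanOut() {
	out := make(chan int, 8)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			out <- n * n
		}(i)
	}
	wg.Wait()
	close(out) // safe: every worker has returned
	for v := range out {
		fmt.Println(v)
	}
}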
// Next returns the next coprocessor result.
// NOTE: nil indicates that the iterator has finished, so if the returned ResultSubset is not nil, the reader should continue to call Next().
func (b *batchCopIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
var (
resp *batchCopResponse
ok bool
closed bool
)
// Get next fetched resp from chan
resp, ok, closed = b.recvFromRespCh(ctx)
if !ok || closed {
return nil, nil
}
if resp.err != nil {
return nil, errors.Trace(resp.err)
}
err := b.store.CheckVisibility(b.req.StartTs)
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
}
func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopResponse, ok bool, exit bool) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case resp, ok = <-b.respChan:
return
case <-ticker.C:
if atomic.LoadUint32(b.vars.Killed) == 1 {
resp = &batchCopResponse{err: derr.ErrQueryInterrupted}
ok = true
return
}
case <-b.finishCh:
exit = true
return
case <-ctx.Done():
// We select the ctx.Done() in the thread of `Next` instead of in the worker to avoid the cost of `WithCancel`.
if atomic.CompareAndSwapUint32(&b.closed, 0, 1) {
close(b.finishCh)
}
exit = true
return
}
}
}
// Close releases the resource.
func (b *batchCopIterator) Close() error {
if atomic.CompareAndSwapUint32(&b.closed, 0, 1) {
close(b.finishCh)
}
b.rpcCancel.CancelAll()
b.wg.Wait()
return nil
}
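// exampleCloseOnce is a hypothetical standalone sketch (not part of the
// original file) of the CompareAndSwap pattern shared by Close and
// recvFromRespCh above: whichever caller flips closed from 0 to 1 closes the
// channel, every other caller sees the swap fail, and the channel is closed
// exactly once.
func exampleCloseOnce() {
	var closed uint32
	finishCh := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if atomic.CompareAndSwapUint32(&closed, 0, 1) {
				close(finishCh)
			}
		}()
	}
	wg.Wait()
	<-finishCh // already closed, so this returns immediately
}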
func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *batchCopTask) {
tasks := []*batchCopTask{task}
for idx := 0; idx < len(tasks); idx++ {
ret, err := b.handleTaskOnce(ctx, bo, tasks[idx])
if err != nil {
resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)}
b.sendToRespCh(resp)
break
}
tasks = append(tasks, ret...)
}
b.wg.Done()
}
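// exampleSliceQueue is a hypothetical sketch (not part of the original file)
// of the loop shape in handleTask above: the slice doubles as a work queue
// that grows while it is being indexed, so retry tasks appended by the handler
// are picked up by the same loop.
func exampleSliceQueue() {
	tasks := []string{"t0"}
	handle := func(t string) []string {
		if t == "t0" {
			return []string{"t0-retry1", "t0-retry2"} // pretend the first attempt asked for retries
		}
		return nil
	}
	for idx := 0; idx < len(tasks); idx++ {
		tasks = append(tasks, handle(tasks[idx])...)
	}
	fmt.Println(tasks) // [t0 t0-retry1 t0-retry2]
}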
// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
if batchTask.regionInfos != nil {
var ranges []kv.KeyRange
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *kv.KeyRange) {
ranges = append(ranges, *ran)
})
}
ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0)
return ret, err
}
// Retry Partition Table Scan
keyRanges := make([]*KeyRanges, 0, len(batchTask.PartitionTableRegions))
pid := make([]int64, 0, len(batchTask.PartitionTableRegions))
for _, trs := range batchTask.PartitionTableRegions {
pid = append(pid, trs.PhysicalTableId)
ranges := make([]kv.KeyRange, 0, len(trs.Regions))
for _, ri := range trs.Regions {
for _, ran := range ri.Ranges {
ranges = append(ranges, kv.KeyRange{
StartKey: ran.Start,
EndKey: ran.End,
})
}
}
keyRanges = append(keyRanges, NewKeyRanges(ranges))
}
ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, nil, 0, false, 0, pid)
return ret, err
}
const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.
func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo)
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo())
}
copReq := coprocessor.BatchRequest{
Tp: b.req.Tp,
StartTs: b.req.StartTs,
Data: b.req.Data,
SchemaVer: b.req.SchemaVar,
Regions: regionInfos,
TableRegions: task.PartitionTableRegions,
}
req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(b.req.IsolationLevel),
Priority: priorityToPB(b.req.Priority),
NotFillCache: b.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
TaskId: b.req.TaskID,
})
if b.req.ResourceGroupTagger != nil {
b.req.ResourceGroupTagger(req)
}
req.StoreTp = tikvrpc.TiFlash
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, readTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
}
if err != nil {
err = derr.ToTiDBErr(err)
return nil, errors.Trace(err)
}
defer cancel()
return nil, b.handleStreamedBatchCopResponse(ctx, bo, resp.Resp.(*tikvrpc.BatchCopStreamResponse), task)
}
func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *Backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) {
defer response.Close()
resp := response.BatchResponse
if resp == nil {
// streaming request returns io.EOF, so the first Response is nil.
return
}
for {
err = b.handleBatchCopResponse(bo, resp, task)
if err != nil {
return errors.Trace(err)
}
resp, err = response.Recv()
if err != nil {
if errors.Cause(err) == io.EOF {
return nil
}
if err1 := bo.Backoff(tikv.BoTiKVRPC(), errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}
// No coprocessor.Response for a network error; rebuild the task based on the last successful one.
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return derr.ErrTiFlashServerTimeout
}
}
}
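// exampleDrainStream is a hypothetical reduction (not part of the original
// file) of the receive loop above: handle the current response, then Recv the
// next one until io.EOF, which marks a cleanly finished stream rather than a
// failure.
func exampleDrainStream(first *coprocessor.BatchResponse, recv func() (*coprocessor.BatchResponse, error), handle func(*coprocessor.BatchResponse) error) error {
	resp := first
	if resp == nil {
		return nil // the stream ended before the first message
	}
	for {
		if err := handle(resp); err != nil {
			return err
		}
		var err error
		if resp, err = recv(); err != nil {
			if errors.Cause(err) == io.EOF {
				return nil
			}
			return err
		}
	}
}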
func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) {
if otherErr := response.GetOtherError(); otherErr != "" {
err = errors.Errorf("other error: %s", otherErr)
logutil.BgLogger().Warn("other error",
zap.Uint64("txnStartTS", b.req.StartTs),
zap.String("storeAddr", task.storeAddr),
zap.Error(err))
return errors.Trace(err)
}
if len(response.RetryRegions) > 0 {
logutil.BgLogger().Info("multiple regions are stale and need to be refreshed", zap.Int("region size", len(response.RetryRegions)))
for idx, retry := range response.RetryRegions {
id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version)
logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String()))
b.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch)
if idx >= 10 {
logutil.BgLogger().Info("stale regions are too many, so we omit the rest ones")
break
}
}
return
}
resp := &batchCopResponse{
pbResp: response,
detail: new(CopRuntimeStats),
}
b.handleCollectExecutionInfo(bo, resp, task)
b.sendToRespCh(resp)
return
}
func (b *batchCopIterator) sendToRespCh(resp *batchCopResponse) (exit bool) {
select {
case b.respChan <- resp:
case <-b.finishCh:
exit = true
}
return
}
func (b *batchCopIterator) handleCollectExecutionInfo(bo *Backoffer, resp *batchCopResponse, task *batchCopTask) {
if !b.enableCollectExecutionInfo {
return
}
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = task.storeAddr
}