TiDB parallel_apply source code
File path: /executor/parallel_apply.go
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"runtime/trace"
"sync"
"sync/atomic"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)
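// result is the unit passed from the worker goroutines to Next through resultChkCh:
// either a filled chunk or an error.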
type result struct {
chk *chunk.Chunk
err error
}
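// outerRow carries an outer-side row together with the result of evaluating the outer filter on it.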
type outerRow struct {
row *chunk.Row
selected bool // whether this row is selected by the outer filter
}
// ParallelNestedLoopApplyExec is the executor for apply.
type ParallelNestedLoopApplyExec struct {
baseExecutor
// outer-side fields
outerExec Executor
outerFilter expression.CNFExprs
outerList *chunk.List
outer bool
// inner-side fields
// use slices since the inner side is executed in parallel
corCols [][]*expression.CorrelatedColumn
innerFilter []expression.CNFExprs
innerExecs []Executor
innerList []*chunk.List
innerChunk []*chunk.Chunk
innerSelected [][]bool
innerIter []chunk.Iterator
outerRow []*chunk.Row
hasMatch []bool
hasNull []bool
joiners []joiner
// fields about concurrency control
concurrency int
started uint32
drained uint32 // drained != 0 indicates there is no more data
freeChkCh chan *chunk.Chunk
resultChkCh chan result
outerRowCh chan outerRow
exit chan struct{}
workerWg sync.WaitGroup
notifyWg sync.WaitGroup
// fields about cache
cache *applyCache
useCache bool
cacheHitCounter int64
cacheAccessCounter int64
memTracker *memory.Tracker // track memory usage.
}
// Open implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Open(ctx context.Context) error {
err := e.outerExec.Open(ctx)
if err != nil {
return err
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.outerList = chunk.NewList(retTypes(e.outerExec), e.initCap, e.maxChunkSize)
e.outerList.GetMemTracker().SetLabel(memory.LabelForOuterList)
e.outerList.GetMemTracker().AttachTo(e.memTracker)
e.innerList = make([]*chunk.List, e.concurrency)
e.innerChunk = make([]*chunk.Chunk, e.concurrency)
e.innerSelected = make([][]bool, e.concurrency)
e.innerIter = make([]chunk.Iterator, e.concurrency)
e.outerRow = make([]*chunk.Row, e.concurrency)
e.hasMatch = make([]bool, e.concurrency)
e.hasNull = make([]bool, e.concurrency)
for i := 0; i < e.concurrency; i++ {
e.innerChunk[i] = newFirstChunk(e.innerExecs[i])
e.innerList[i] = chunk.NewList(retTypes(e.innerExecs[i]), e.initCap, e.maxChunkSize)
e.innerList[i].GetMemTracker().SetLabel(memory.LabelForInnerList)
e.innerList[i].GetMemTracker().AttachTo(e.memTracker)
}
e.freeChkCh = make(chan *chunk.Chunk, e.concurrency)
e.resultChkCh = make(chan result, e.concurrency+1) // innerWorkers + outerWorker
e.outerRowCh = make(chan outerRow)
e.exit = make(chan struct{})
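// Pre-fill freeChkCh with one result chunk per inner worker; these chunks are
// recycled between Next and the inner workers.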
for i := 0; i < e.concurrency; i++ {
e.freeChkCh <- newFirstChunk(e)
}
if e.useCache {
if e.cache, err = newApplyCache(e.ctx); err != nil {
return err
}
e.cache.GetMemTracker().AttachTo(e.memTracker)
}
return nil
}
// Next implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if atomic.LoadUint32(&e.drained) == 1 {
req.Reset()
return nil
}
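// Start the outer worker, all inner workers and the notify worker lazily on the first call to Next.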
if atomic.CompareAndSwapUint32(&e.started, 0, 1) {
e.workerWg.Add(1)
go e.outerWorker(ctx)
for i := 0; i < e.concurrency; i++ {
e.workerWg.Add(1)
workID := i
go e.innerWorker(ctx, workID)
}
e.notifyWg.Add(1)
go e.notifyWorker(ctx)
}
result := <-e.resultChkCh
if result.err != nil {
return result.err
}
if result.chk == nil { // no more data
req.Reset()
atomic.StoreUint32(&e.drained, 1)
return nil
}
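// Swap the result data into the caller's chunk and recycle the swapped-out chunk via freeChkCh.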
req.SwapColumns(result.chk)
e.freeChkCh <- result.chk
return nil
}
// Close implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Close() error {
e.memTracker = nil
if atomic.LoadUint32(&e.started) == 1 {
close(e.exit)
e.notifyWg.Wait()
e.started = 0
}
// All workers must have finished before the outer executor is closed.
// Otherwise we may get a data race.
err := e.outerExec.Close()
if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.useCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
hitRatio = float64(e.cacheHitCounter) / float64(e.cacheAccessCounter)
}
runtimeStats.setCacheInfo(true, hitRatio)
} else {
runtimeStats.setCacheInfo(false, 0)
}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", e.concurrency))
}
return err
}
// notifyWorker waits for all inner/outer workers to finish and then puts an empty
// chunk into resultChkCh to notify the upper executor that there is no more data.
func (e *ParallelNestedLoopApplyExec) notifyWorker(ctx context.Context) {
defer e.handleWorkerPanic(ctx, &e.notifyWg)
e.workerWg.Wait()
e.putResult(nil, nil)
}
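// outerWorker fetches chunks from the outer executor, evaluates the outer filter on them,
// and sends the outer rows one by one to outerRowCh for the inner workers to consume.
// It closes outerRowCh once the outer side is exhausted.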
func (e *ParallelNestedLoopApplyExec) outerWorker(ctx context.Context) {
defer trace.StartRegion(ctx, "ParallelApplyOuterWorker").End()
defer e.handleWorkerPanic(ctx, &e.workerWg)
var selected []bool
var err error
for {
failpoint.Inject("parallelApplyOuterWorkerPanic", nil)
chk := newFirstChunk(e.outerExec)
if err := Next(ctx, e.outerExec, chk); err != nil {
e.putResult(nil, err)
return
}
if chk.NumRows() == 0 {
close(e.outerRowCh)
return
}
e.outerList.Add(chk)
outerIter := chunk.NewIterator4Chunk(chk)
selected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, outerIter, selected)
if err != nil {
e.putResult(nil, err)
return
}
for i := 0; i < chk.NumRows(); i++ {
row := chk.GetRow(i)
select {
case e.outerRowCh <- outerRow{&row, selected[i]}:
case <-e.exit:
return
}
}
}
}
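// innerWorker repeatedly takes a free result chunk from freeChkCh, fills it with join
// results via fillInnerChunk, and hands it to Next through resultChkCh. It exits when
// there is no more outer data or the executor is being closed.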
func (e *ParallelNestedLoopApplyExec) innerWorker(ctx context.Context, id int) {
defer trace.StartRegion(ctx, "ParallelApplyInnerWorker").End()
defer e.handleWorkerPanic(ctx, &e.workerWg)
for {
var chk *chunk.Chunk
select {
case chk = <-e.freeChkCh:
case <-e.exit:
return
}
failpoint.Inject("parallelApplyInnerWorkerPanic", nil)
err := e.fillInnerChunk(ctx, id, chk)
if err == nil && chk.NumRows() == 0 { // no more data, this goroutine can exit
return
}
if e.putResult(chk, err) {
return
}
}
}
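// putResult sends a result to resultChkCh and reports whether the worker should exit
// because the executor is being closed.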
func (e *ParallelNestedLoopApplyExec) putResult(chk *chunk.Chunk, err error) (exit bool) {
select {
case e.resultChkCh <- result{chk, err}:
return false
case <-e.exit:
return true
}
}
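// handleWorkerPanic recovers a panicked worker, turns the panic into an error sent
// through resultChkCh, and marks the worker done on its WaitGroup.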
func (e *ParallelNestedLoopApplyExec) handleWorkerPanic(ctx context.Context, wg *sync.WaitGroup) {
if r := recover(); r != nil {
err := errors.Errorf("%v", r)
logutil.Logger(ctx).Error("parallel nested loop join worker panicked", zap.Error(err), zap.Stack("stack"))
e.resultChkCh <- result{nil, err}
}
if wg != nil {
wg.Done()
}
}
// fetchAllInners reads all data from the inner table and stores them in a List.
func (e *ParallelNestedLoopApplyExec) fetchAllInners(ctx context.Context, id int) (err error) {
var key []byte
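// Bind the current outer row's values to the correlated columns; when the cache is
// enabled, also encode these values into the cache key.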
for _, col := range e.corCols[id] {
*col.Data = e.outerRow[id].GetDatum(col.Index, col.RetType)
if e.useCache {
if key, err = codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, key, *col.Data); err != nil {
return err
}
}
}
if e.useCache { // look up the cache
atomic.AddInt64(&e.cacheAccessCounter, 1)
failpoint.Inject("parallelApplyGetCachePanic", nil)
value, err := e.cache.Get(key)
if err != nil {
return err
}
if value != nil {
e.innerList[id] = value
atomic.AddInt64(&e.cacheHitCounter, 1)
return nil
}
}
err = e.innerExecs[id].Open(ctx)
defer terror.Call(e.innerExecs[id].Close)
if err != nil {
return err
}
if e.useCache {
// create a new list here since the existing one may already be referenced by the cache
e.innerList[id] = chunk.NewList(retTypes(e.innerExecs[id]), e.initCap, e.maxChunkSize)
} else {
e.innerList[id].Reset()
}
innerIter := chunk.NewIterator4Chunk(e.innerChunk[id])
for {
err := Next(ctx, e.innerExecs[id], e.innerChunk[id])
if err != nil {
return err
}
if e.innerChunk[id].NumRows() == 0 {
break
}
e.innerSelected[id], err = expression.VectorizedFilter(e.ctx, e.innerFilter[id], innerIter, e.innerSelected[id])
if err != nil {
return err
}
for row := innerIter.Begin(); row != innerIter.End(); row = innerIter.Next() {
if e.innerSelected[id][row.Idx()] {
e.innerList[id].AppendRow(row)
}
}
}
if e.useCache { // update the cache
failpoint.Inject("parallelApplySetCachePanic", nil)
if _, err := e.cache.Set(key, e.innerList[id]); err != nil {
return err
}
}
return nil
}
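// fetchNextOuterRow returns the next outer row that passed the outer filter. For outer
// joins, rows filtered out on the outer side are emitted directly as miss-matched rows.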
func (e *ParallelNestedLoopApplyExec) fetchNextOuterRow(id int, req *chunk.Chunk) (row *chunk.Row, exit bool) {
for {
select {
case outerRow, ok := <-e.outerRowCh:
if !ok { // no more data
return nil, false
}
if !outerRow.selected {
if e.outer {
e.joiners[id].onMissMatch(false, *outerRow.row, req)
if req.IsFull() {
return nil, false
}
}
continue // try the next outer row
}
return outerRow.row, false
case <-e.exit:
return nil, true
}
}
}
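// fillInnerChunk fills req with join results: for each outer row it fetches all matching
// inner rows (or loads them from the cache) and joins them against the outer row until
// req is full or there is no more outer data.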
func (e *ParallelNestedLoopApplyExec) fillInnerChunk(ctx context.Context, id int, req *chunk.Chunk) (err error) {
req.Reset()
for {
if e.innerIter[id] == nil || e.innerIter[id].Current() == e.innerIter[id].End() {
if e.outerRow[id] != nil && !e.hasMatch[id] {
e.joiners[id].onMissMatch(e.hasNull[id], *e.outerRow[id], req)
}
var exit bool
e.outerRow[id], exit = e.fetchNextOuterRow(id, req)
if exit || req.IsFull() || e.outerRow[id] == nil {
return nil
}
e.hasMatch[id] = false
e.hasNull[id] = false
err = e.fetchAllInners(ctx, id)
if err != nil {
return err
}
e.innerIter[id] = chunk.NewIterator4List(e.innerList[id])
e.innerIter[id].Begin()
}
matched, isNull, err := e.joiners[id].tryToMatchInners(*e.outerRow[id], e.innerIter[id], req)
e.hasMatch[id] = e.hasMatch[id] || matched
e.hasNull[id] = e.hasNull[id] || isNull
if err != nil || req.IsFull() {
return err
}
}
}
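The pipeline above boils down to a pair of channels that recycle result chunks between the consumer (Next) and the worker goroutines: workers take an empty buffer from a free channel, fill it, and push it to a result channel; the consumer swaps the data out and returns the buffer for reuse. The following standalone sketch illustrates only that pattern; it is not TiDB code, and buffer, freeCh and resultCh are hypothetical stand-ins for chunk.Chunk, freeChkCh and resultChkCh.

package main

import (
	"fmt"
	"sync"
)

// buffer stands in for chunk.Chunk in this simplified sketch.
type buffer struct{ rows []int }

func main() {
	const workers = 3
	freeCh := make(chan *buffer, workers)   // plays the role of freeChkCh
	resultCh := make(chan *buffer, workers) // plays the role of resultChkCh
	var wg sync.WaitGroup

	// Seed the free channel with one buffer per worker, like Open does.
	for i := 0; i < workers; i++ {
		freeCh <- &buffer{}
	}

	// Workers: take a free buffer, fill it, hand it to the consumer.
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			buf := <-freeCh
			buf.rows = append(buf.rows, id) // "fill" the buffer
			resultCh <- buf
		}(w)
	}

	// Like notifyWorker: signal "no more data" once every worker is done.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Consumer (the role of Next): drain results and recycle buffers.
	for buf := range resultCh {
		fmt.Println("rows:", buf.rows)
		buf.rows = buf.rows[:0]
		freeCh <- buf // return the buffer for reuse
	}
}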