tidb buffer 源码

  • 2022-09-19
  • 浏览 (533)

tidb buffer 代码

文件路径:/br/pkg/membuf/buffer.go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package membuf

const (
	defaultPoolSize            = 1024
	defaultBlockSize           = 1 << 20 // 1M
	defaultLargeAllocThreshold = 1 << 16 // 64K
)

// Allocator is the abstract interface for allocating and freeing memory.
type Allocator interface {
	Alloc(n int) []byte
	Free([]byte)
}

type stdAllocator struct{}

func (stdAllocator) Alloc(n int) []byte {
	return make([]byte, n)
}

func (stdAllocator) Free(_ []byte) {}

// Pool is like `sync.Pool`, which manages memory for all bytes buffers.
//
// NOTE: we don't used a `sync.Pool` because when will sync.Pool release is depending on the
// garbage collector which always release the memory so late. Use a fixed size chan to reuse
// can decrease the memory usage to 1/3 compare with sync.Pool.
type Pool struct {
	allocator           Allocator
	blockSize           int
	blockCache          chan []byte
	largeAllocThreshold int
}

// Option configures a pool.
type Option func(p *Pool)

// WithPoolSize configures how many blocks cached by this pool.
func WithPoolSize(size int) Option {
	return func(p *Pool) {
		p.blockCache = make(chan []byte, size)
	}
}

// WithBlockSize configures the size of each block.
func WithBlockSize(size int) Option {
	return func(p *Pool) {
		p.blockSize = size
	}
}

// WithAllocator specifies the allocator used by pool to allocate and free memory.
func WithAllocator(allocator Allocator) Option {
	return func(p *Pool) {
		p.allocator = allocator
	}
}

// WithLargeAllocThreshold configures the threshold for large allocation of a Buffer.
// If allocate size is larger than this threshold, bytes will be allocated directly
// by the make built-in function and won't be tracked by the pool.
func WithLargeAllocThreshold(threshold int) Option {
	return func(p *Pool) {
		p.largeAllocThreshold = threshold
	}
}

// NewPool creates a new pool.
func NewPool(opts ...Option) *Pool {
	p := &Pool{
		allocator:           stdAllocator{},
		blockSize:           defaultBlockSize,
		blockCache:          make(chan []byte, defaultPoolSize),
		largeAllocThreshold: defaultLargeAllocThreshold,
	}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func (p *Pool) acquire() []byte {
	select {
	case b := <-p.blockCache:
		return b
	default:
		return p.allocator.Alloc(p.blockSize)
	}
}

func (p *Pool) release(b []byte) {
	select {
	case p.blockCache <- b:
	default:
		p.allocator.Free(b)
	}
}

// NewBuffer creates a new buffer in current pool.
func (p *Pool) NewBuffer() *Buffer {
	return &Buffer{pool: p, bufs: make([][]byte, 0, 128), curBufIdx: -1}
}

// Destroy frees all buffers.
func (p *Pool) Destroy() {
	close(p.blockCache)
	for b := range p.blockCache {
		p.allocator.Free(b)
	}
}

// TotalSize is the total memory size of this Pool.
func (p *Pool) TotalSize() int64 {
	return int64(len(p.blockCache) * p.blockSize)
}

// Buffer represents the reuse buffer.
type Buffer struct {
	pool      *Pool
	bufs      [][]byte
	curBuf    []byte
	curIdx    int
	curBufIdx int
	curBufLen int
}

// addBuf adds buffer to Buffer.
func (b *Buffer) addBuf() {
	if b.curBufIdx < len(b.bufs)-1 {
		b.curBufIdx++
		b.curBuf = b.bufs[b.curBufIdx]
	} else {
		buf := b.pool.acquire()
		b.bufs = append(b.bufs, buf)
		b.curBuf = buf
		b.curBufIdx = len(b.bufs) - 1
	}

	b.curBufLen = len(b.curBuf)
	b.curIdx = 0
}

// Reset resets the buffer.
func (b *Buffer) Reset() {
	if len(b.bufs) > 0 {
		b.curBuf = b.bufs[0]
		b.curBufLen = len(b.bufs[0])
		b.curBufIdx = 0
		b.curIdx = 0
	}
}

// Destroy frees all buffers.
func (b *Buffer) Destroy() {
	for _, buf := range b.bufs {
		b.pool.release(buf)
	}
	b.bufs = nil
}

// TotalSize represents the total memory size of this Buffer.
func (b *Buffer) TotalSize() int64 {
	return int64(len(b.bufs) * b.pool.blockSize)
}

// AllocBytes allocates bytes with the given length.
func (b *Buffer) AllocBytes(n int) []byte {
	if n > b.pool.largeAllocThreshold {
		return make([]byte, n)
	}
	if b.curIdx+n > b.curBufLen {
		b.addBuf()
	}
	idx := b.curIdx
	b.curIdx += n
	return b.curBuf[idx:b.curIdx:b.curIdx]
}

// AddBytes adds the bytes into this Buffer.
func (b *Buffer) AddBytes(bytes []byte) []byte {
	buf := b.AllocBytes(len(bytes))
	copy(buf, bytes)
	return buf
}

相关信息

tidb 源码目录

相关文章

tidb bind_cache 源码

tidb bind_record 源码

tidb handle 源码

tidb session_handle 源码

tidb stat 源码

tidb backup 源码

tidb cmd 源码

tidb debug 源码

tidb main 源码

tidb restore 源码

0  赞