tidb autoid 源码
tidb autoid 代码
文件路径:/meta/autoid/autoid.go
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package autoid
import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"sync"
"time"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
tikvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
// Attention:
// For reading cluster TiDB memory tables, the system schema/table should be same.
// Once the system schema/table id been allocated, it can't be changed any more.
// Change the system schema/table id may have the compatibility problem.
const (
// SystemSchemaIDFlag is the system schema/table id flag, uses the highest bit position as system schema ID flag, it's exports for test.
SystemSchemaIDFlag = 1 << 62
// InformationSchemaDBID is the information_schema schema id, it's exports for test.
InformationSchemaDBID int64 = SystemSchemaIDFlag | 1
// PerformanceSchemaDBID is the performance_schema schema id, it's exports for test.
PerformanceSchemaDBID int64 = SystemSchemaIDFlag | 10000
// MetricSchemaDBID is the metrics_schema schema id, it's exported for test.
MetricSchemaDBID int64 = SystemSchemaIDFlag | 20000
)
const (
minStep = 30000
maxStep = 2000000
defaultConsumeTime = 10 * time.Second
minIncrement = 1
maxIncrement = 65535
)
// RowIDBitLength is the bit number of a row id in TiDB.
const RowIDBitLength = 64
const (
// AutoRandomShardBitsDefault is the default number of shard bits.
AutoRandomShardBitsDefault = 5
// AutoRandomRangeBitsDefault is the default number of range bits.
AutoRandomRangeBitsDefault = 64
// AutoRandomShardBitsMax is the max number of shard bits.
AutoRandomShardBitsMax = 15
// AutoRandomRangeBitsMax is the max number of range bits.
AutoRandomRangeBitsMax = 64
// AutoRandomRangeBitsMin is the min number of range bits.
AutoRandomRangeBitsMin = 32
// AutoRandomIncBitsMin is the min number of auto random incremental bits.
AutoRandomIncBitsMin = 27
)
// AutoRandomShardBitsNormalize normalizes the auto random shard bits.
func AutoRandomShardBitsNormalize(shard int, colName string) (ret uint64, err error) {
if shard == types.UnspecifiedLength {
return AutoRandomShardBitsDefault, nil
}
if shard <= 0 {
return 0, dbterror.ErrInvalidAutoRandom.FastGenByArgs(AutoRandomNonPositive)
}
if shard > AutoRandomShardBitsMax {
errMsg := fmt.Sprintf(AutoRandomOverflowErrMsg, AutoRandomShardBitsMax, shard, colName)
return 0, dbterror.ErrInvalidAutoRandom.FastGenByArgs(errMsg)
}
return uint64(shard), nil
}
// AutoRandomRangeBitsNormalize normalizes the auto random range bits.
func AutoRandomRangeBitsNormalize(rangeBits int) (ret uint64, err error) {
if rangeBits == types.UnspecifiedLength {
return AutoRandomRangeBitsDefault, nil
}
if rangeBits < AutoRandomRangeBitsMin || rangeBits > AutoRandomRangeBitsMax {
errMsg := fmt.Sprintf(AutoRandomInvalidRangeBits, AutoRandomRangeBitsMin, AutoRandomRangeBitsMax, rangeBits)
return 0, dbterror.ErrInvalidAutoRandom.FastGenByArgs(errMsg)
}
return uint64(rangeBits), nil
}
// Test needs to change it, so it's a variable.
var step = int64(30000)
// AllocatorType is the type of allocator for generating auto-id. Different type of allocators use different key-value pairs.
type AllocatorType uint8
const (
// RowIDAllocType indicates the allocator is used to allocate row id.
RowIDAllocType AllocatorType = iota
// AutoIncrementType indicates the allocator is used to allocate auto increment value.
AutoIncrementType
// AutoRandomType indicates the allocator is used to allocate auto-shard id.
AutoRandomType
// SequenceType indicates the allocator is used to allocate sequence value.
SequenceType
)
func (a AllocatorType) String() string {
switch a {
case RowIDAllocType:
return "_tidb_rowid"
case AutoIncrementType:
return "auto_increment"
case AutoRandomType:
return "auto_random"
case SequenceType:
return "sequence"
}
return "unknown"
}
// CustomAutoIncCacheOption is one kind of AllocOption to customize the allocator step length.
type CustomAutoIncCacheOption int64
// ApplyOn implements the AllocOption interface.
func (step CustomAutoIncCacheOption) ApplyOn(alloc *allocator) {
if step == 0 {
return
}
alloc.step = int64(step)
alloc.customStep = true
}
// AllocOptionTableInfoVersion is used to pass the TableInfo.Version to the allocator.
type AllocOptionTableInfoVersion uint16
// ApplyOn implements the AllocOption interface.
func (v AllocOptionTableInfoVersion) ApplyOn(alloc *allocator) {
alloc.tbVersion = uint16(v)
}
// AllocOption is a interface to define allocator custom options coming in future.
type AllocOption interface {
ApplyOn(*allocator)
}
// Allocator is an auto increment id generator.
// Just keep id unique actually.
type Allocator interface {
// Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
// It gets a batch of autoIDs at a time. So it does not need to access storage for each call.
// The consecutive feature is used to insert multiple rows in a statement.
// increment & offset is used to validate the start position (the allocator's base is not always the last allocated id).
// The returned range is (min, max]:
// case increment=1 & offset=1: you can derive the ids like min+1, min+2... max.
// case increment=x & offset=y: you firstly need to seek to firstID by `SeekToFirstAutoIDXXX`, then derive the IDs like firstID, firstID + increment * 2... in the caller.
Alloc(ctx context.Context, n uint64, increment, offset int64) (int64, int64, error)
// AllocSeqCache allocs sequence batch value cached in table level(rather than in alloc), the returned range covering
// the size of sequence cache with it's increment. The returned round indicates the sequence cycle times if it is with
// cycle option.
AllocSeqCache() (min int64, max int64, round int64, err error)
// Rebase rebases the autoID base for table with tableID and the new base value.
// If allocIDs is true, it will allocate some IDs and save to the cache.
// If allocIDs is false, it will not allocate IDs.
Rebase(ctx context.Context, newBase int64, allocIDs bool) error
// ForceRebase set the next global auto ID to newBase.
ForceRebase(newBase int64) error
// RebaseSeq rebases the sequence value in number axis with tableID and the new base value.
RebaseSeq(newBase int64) (int64, bool, error)
// Base return the current base of Allocator.
Base() int64
// End is only used for test.
End() int64
// NextGlobalAutoID returns the next global autoID.
NextGlobalAutoID() (int64, error)
GetType() AllocatorType
}
// Allocators represents a set of `Allocator`s.
type Allocators []Allocator
// NewAllocators packs multiple `Allocator`s into Allocators.
func NewAllocators(allocators ...Allocator) Allocators {
return allocators
}
// Get returns the Allocator according to the AllocatorType.
func (all Allocators) Get(allocType AllocatorType) Allocator {
for _, a := range all {
if a.GetType() == allocType {
return a
}
}
return nil
}
// Filter filters all the allocators that match pred.
func (all Allocators) Filter(pred func(Allocator) bool) Allocators {
var ret Allocators
for _, a := range all {
if pred(a) {
ret = append(ret, a)
}
}
return ret
}
type allocator struct {
mu sync.Mutex
base int64
end int64
store kv.Storage
// dbID is current database's ID.
dbID int64
tbID int64
tbVersion uint16
isUnsigned bool
lastAllocTime time.Time
step int64
customStep bool
allocType AllocatorType
sequence *model.SequenceInfo
}
// GetStep is only used by tests
func GetStep() int64 {
return step
}
// SetStep is only used by tests
func SetStep(s int64) {
step = s
}
// Base implements autoid.Allocator Base interface.
func (alloc *allocator) Base() int64 {
return alloc.base
}
// End implements autoid.Allocator End interface.
func (alloc *allocator) End() int64 {
return alloc.end
}
// NextGlobalAutoID implements autoid.Allocator NextGlobalAutoID interface.
func (alloc *allocator) NextGlobalAutoID() (int64, error) {
var autoID int64
startTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
var err1 error
autoID, err1 = alloc.getIDAccessor(txn).Get()
if err1 != nil {
return errors.Trace(err1)
}
return nil
})
metrics.AutoIDHistogram.WithLabelValues(metrics.GlobalAutoID, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if alloc.isUnsigned {
return int64(uint64(autoID) + 1), err
}
return autoID + 1, err
}
func (alloc *allocator) rebase4Unsigned(ctx context.Context, requiredBase uint64, allocIDs bool) error {
// Satisfied by alloc.base, nothing to do.
if requiredBase <= uint64(alloc.base) {
return nil
}
// Satisfied by alloc.end, need to update alloc.base.
if requiredBase <= uint64(alloc.end) {
alloc.base = int64(requiredBase)
return nil
}
ctx, allocatorStats, commitDetail := getAllocatorStatsFromCtx(ctx)
if allocatorStats != nil {
allocatorStats.rebaseCount++
defer func() {
if commitDetail != nil {
allocatorStats.mergeCommitDetail(*commitDetail)
}
}()
}
var newBase, newEnd uint64
startTime := time.Now()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
if allocatorStats != nil {
txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats)
}
idAcc := alloc.getIDAccessor(txn)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
}
uCurrentEnd := uint64(currentEnd)
if allocIDs {
newBase = mathutil.Max(uCurrentEnd, requiredBase)
newEnd = mathutil.Min(math.MaxUint64-uint64(alloc.step), newBase) + uint64(alloc.step)
} else {
if uCurrentEnd >= requiredBase {
newBase = uCurrentEnd
newEnd = uCurrentEnd
// Required base satisfied, we don't need to update KV.
return nil
}
// If we don't want to allocate IDs, for example when creating a table with a given base value,
// We need to make sure when other TiDB server allocates ID for the first time, requiredBase + 1
// will be allocated, so we need to increase the end to exactly the requiredBase.
newBase = requiredBase
newEnd = requiredBase
}
_, err1 = idAcc.Inc(int64(newEnd - uCurrentEnd))
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return err
}
alloc.base, alloc.end = int64(newBase), int64(newEnd)
return nil
}
func (alloc *allocator) rebase4Signed(ctx context.Context, requiredBase int64, allocIDs bool) error {
// Satisfied by alloc.base, nothing to do.
if requiredBase <= alloc.base {
return nil
}
// Satisfied by alloc.end, need to update alloc.base.
if requiredBase <= alloc.end {
alloc.base = requiredBase
return nil
}
ctx, allocatorStats, commitDetail := getAllocatorStatsFromCtx(ctx)
if allocatorStats != nil {
allocatorStats.rebaseCount++
defer func() {
if commitDetail != nil {
allocatorStats.mergeCommitDetail(*commitDetail)
}
}()
}
var newBase, newEnd int64
startTime := time.Now()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
if allocatorStats != nil {
txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats)
}
idAcc := alloc.getIDAccessor(txn)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
}
if allocIDs {
newBase = mathutil.Max(currentEnd, requiredBase)
newEnd = mathutil.Min(math.MaxInt64-alloc.step, newBase) + alloc.step
} else {
if currentEnd >= requiredBase {
newBase = currentEnd
newEnd = currentEnd
// Required base satisfied, we don't need to update KV.
return nil
}
// If we don't want to allocate IDs, for example when creating a table with a given base value,
// We need to make sure when other TiDB server allocates ID for the first time, requiredBase + 1
// will be allocated, so we need to increase the end to exactly the requiredBase.
newBase = requiredBase
newEnd = requiredBase
}
_, err1 = idAcc.Inc(newEnd - currentEnd)
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return err
}
alloc.base, alloc.end = newBase, newEnd
return nil
}
// rebase4Sequence won't alloc batch immediately, cause it won't cache value in allocator.
func (alloc *allocator) rebase4Sequence(requiredBase int64) (int64, bool, error) {
startTime := time.Now()
alreadySatisfied := false
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
acc := meta.NewMeta(txn).GetAutoIDAccessors(alloc.dbID, alloc.tbID)
currentEnd, err := acc.SequenceValue().Get()
if err != nil {
return err
}
if alloc.sequence.Increment > 0 {
if currentEnd >= requiredBase {
// Required base satisfied, we don't need to update KV.
alreadySatisfied = true
return nil
}
} else {
if currentEnd <= requiredBase {
// Required base satisfied, we don't need to update KV.
alreadySatisfied = true
return nil
}
}
// If we don't want to allocate IDs, for example when creating a table with a given base value,
// We need to make sure when other TiDB server allocates ID for the first time, requiredBase + 1
// will be allocated, so we need to increase the end to exactly the requiredBase.
_, err = acc.SequenceValue().Inc(requiredBase - currentEnd)
return err
})
// TODO: sequence metrics
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return 0, false, err
}
if alreadySatisfied {
return 0, true, nil
}
return requiredBase, false, err
}
// Rebase implements autoid.Allocator Rebase interface.
// The requiredBase is the minimum base value after Rebase.
// The real base may be greater than the required base.
func (alloc *allocator) Rebase(ctx context.Context, requiredBase int64, allocIDs bool) error {
alloc.mu.Lock()
defer alloc.mu.Unlock()
if alloc.isUnsigned {
return alloc.rebase4Unsigned(ctx, uint64(requiredBase), allocIDs)
}
return alloc.rebase4Signed(ctx, requiredBase, allocIDs)
}
// ForceRebase implements autoid.Allocator ForceRebase interface.
func (alloc *allocator) ForceRebase(requiredBase int64) error {
if requiredBase == -1 {
return ErrAutoincReadFailed.GenWithStack("Cannot force rebase the next global ID to '0'")
}
alloc.mu.Lock()
defer alloc.mu.Unlock()
startTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := alloc.getIDAccessor(txn)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
}
var step int64
if !alloc.isUnsigned {
step = requiredBase - currentEnd
} else {
uRequiredBase, uCurrentEnd := uint64(requiredBase), uint64(currentEnd)
step = int64(uRequiredBase - uCurrentEnd)
}
_, err1 = idAcc.Inc(step)
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return err
}
alloc.base, alloc.end = requiredBase, requiredBase
return nil
}
// Rebase implements autoid.Allocator RebaseSeq interface.
// The return value is quite same as expression function, bool means whether it should be NULL,
// here it will be used in setval expression function (true meaning the set value has been satisfied, return NULL).
// case1:When requiredBase is satisfied with current value, it will return (0, true, nil),
// case2:When requiredBase is successfully set in, it will return (requiredBase, false, nil).
// If some error occurs in the process, return it immediately.
func (alloc *allocator) RebaseSeq(requiredBase int64) (int64, bool, error) {
alloc.mu.Lock()
defer alloc.mu.Unlock()
return alloc.rebase4Sequence(requiredBase)
}
func (alloc *allocator) GetType() AllocatorType {
return alloc.allocType
}
// NextStep return new auto id step according to previous step and consuming time.
func NextStep(curStep int64, consumeDur time.Duration) int64 {
failpoint.Inject("mockAutoIDCustomize", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(3)
}
})
failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(step)
}
})
consumeRate := defaultConsumeTime.Seconds() / consumeDur.Seconds()
res := int64(float64(curStep) * consumeRate)
if res < minStep {
return minStep
} else if res > maxStep {
return maxStep
}
return res
}
// NewAllocator returns a new auto increment id generator on the store.
func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool,
allocType AllocatorType, opts ...AllocOption) Allocator {
alloc := &allocator{
store: store,
dbID: dbID,
tbID: tbID,
isUnsigned: isUnsigned,
step: step,
lastAllocTime: time.Now(),
allocType: allocType,
}
for _, fn := range opts {
fn.ApplyOn(alloc)
}
return alloc
}
// NewSequenceAllocator returns a new sequence value generator on the store.
func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.SequenceInfo) Allocator {
return &allocator{
store: store,
dbID: dbID,
tbID: tbID,
// Sequence allocator is always signed.
isUnsigned: false,
lastAllocTime: time.Now(),
allocType: SequenceType,
sequence: info,
}
}
// NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo.
func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators {
var allocs []Allocator
dbID := tblInfo.GetDBID(schemaID)
idCacheOpt := CustomAutoIncCacheOption(tblInfo.AutoIdCache)
tblVer := AllocOptionTableInfoVersion(tblInfo.Version)
hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
if hasRowID || hasAutoIncID {
alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer)
allocs = append(allocs, alloc)
}
hasAutoRandID := tblInfo.ContainsAutoRandomBits()
if hasAutoRandID {
alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer)
allocs = append(allocs, alloc)
}
if tblInfo.IsSequence() {
allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.ID, tblInfo.Sequence))
}
return NewAllocators(allocs...)
}
// Alloc implements autoid.Allocator Alloc interface.
// For autoIncrement allocator, the increment and offset should always be positive in [1, 65535].
// Attention:
// When increment and offset is not the default value(1), the return range (min, max] need to
// calculate the correct start position rather than simply the add 1 to min. Then you can derive
// the successive autoID by adding increment * cnt to firstID for (n-1) times.
//
// Example:
// (6, 13] is returned, increment = 4, offset = 1, n = 2.
// 6 is the last allocated value for other autoID or handle, maybe with different increment and step,
// but actually we don't care about it, all we need is to calculate the new autoID corresponding to the
// increment and offset at this time now. To simplify the rule is like (ID - offset) % increment = 0,
// so the first autoID should be 9, then add increment to it to get 13.
func (alloc *allocator) Alloc(ctx context.Context, n uint64, increment, offset int64) (min int64, max int64, err error) {
if alloc.tbID == 0 {
return 0, 0, errInvalidTableID.GenWithStackByArgs("Invalid tableID")
}
if n == 0 {
return 0, 0, nil
}
if alloc.allocType == AutoIncrementType || alloc.allocType == RowIDAllocType {
if !validIncrementAndOffset(increment, offset) {
return 0, 0, errInvalidIncrementAndOffset.GenWithStackByArgs(increment, offset)
}
}
alloc.mu.Lock()
defer alloc.mu.Unlock()
if alloc.isUnsigned {
return alloc.alloc4Unsigned(ctx, n, increment, offset)
}
return alloc.alloc4Signed(ctx, n, increment, offset)
}
func (alloc *allocator) AllocSeqCache() (min int64, max int64, round int64, err error) {
alloc.mu.Lock()
defer alloc.mu.Unlock()
return alloc.alloc4Sequence()
}
func validIncrementAndOffset(increment, offset int64) bool {
return (increment >= minIncrement && increment <= maxIncrement) && (offset >= minIncrement && offset <= maxIncrement)
}
// CalcNeededBatchSize is used to calculate batch size for autoID allocation.
// It firstly seeks to the first valid position based on increment and offset,
// then plus the length remained, which could be (n-1) * increment.
func CalcNeededBatchSize(base, n, increment, offset int64, isUnsigned bool) int64 {
if increment == 1 {
return n
}
if isUnsigned {
// SeekToFirstAutoIDUnSigned seeks to the next unsigned valid position.
nr := SeekToFirstAutoIDUnSigned(uint64(base), uint64(increment), uint64(offset))
// Calculate the total batch size needed.
nr += (uint64(n) - 1) * uint64(increment)
return int64(nr - uint64(base))
}
nr := SeekToFirstAutoIDSigned(base, increment, offset)
// Calculate the total batch size needed.
nr += (n - 1) * increment
return nr - base
}
// CalcSequenceBatchSize calculate the next sequence batch size.
func CalcSequenceBatchSize(base, size, increment, offset, min, max int64) (int64, error) {
// The sequence is positive growth.
if increment > 0 {
if increment == 1 {
// Sequence is already allocated to the end.
if base >= max {
return 0, ErrAutoincReadFailed
}
// The rest of sequence < cache size, return the rest.
if max-base < size {
return max - base, nil
}
// The rest of sequence is adequate.
return size, nil
}
nr, ok := SeekToFirstSequenceValue(base, increment, offset, min, max)
if !ok {
return 0, ErrAutoincReadFailed
}
// The rest of sequence < cache size, return the rest.
if max-nr < (size-1)*increment {
return max - base, nil
}
return (nr - base) + (size-1)*increment, nil
}
// The sequence is negative growth.
if increment == -1 {
if base <= min {
return 0, ErrAutoincReadFailed
}
if base-min < size {
return base - min, nil
}
return size, nil
}
nr, ok := SeekToFirstSequenceValue(base, increment, offset, min, max)
if !ok {
return 0, ErrAutoincReadFailed
}
// The rest of sequence < cache size, return the rest.
if nr-min < (size-1)*(-increment) {
return base - min, nil
}
return (base - nr) + (size-1)*(-increment), nil
}
// SeekToFirstSequenceValue seeks to the next valid value (must be in range of [MIN, max]),
// the bool indicates whether the first value is got.
// The seeking formula is describe as below:
//
// nr := (base + increment - offset) / increment
//
// first := nr*increment + offset
// Because formula computation will overflow Int64, so we transfer it to uint64 for distance computation.
func SeekToFirstSequenceValue(base, increment, offset, min, max int64) (int64, bool) {
if increment > 0 {
// Sequence is already allocated to the end.
if base >= max {
return 0, false
}
uMax := EncodeIntToCmpUint(max)
uBase := EncodeIntToCmpUint(base)
uOffset := EncodeIntToCmpUint(offset)
uIncrement := uint64(increment)
if uMax-uBase < uIncrement {
// Enum the possible first value.
for i := uBase + 1; i <= uMax; i++ {
if (i-uOffset)%uIncrement == 0 {
return DecodeCmpUintToInt(i), true
}
}
return 0, false
}
nr := (uBase + uIncrement - uOffset) / uIncrement
nr = nr*uIncrement + uOffset
first := DecodeCmpUintToInt(nr)
return first, true
}
// Sequence is already allocated to the end.
if base <= min {
return 0, false
}
uMin := EncodeIntToCmpUint(min)
uBase := EncodeIntToCmpUint(base)
uOffset := EncodeIntToCmpUint(offset)
uIncrement := uint64(-increment)
if uBase-uMin < uIncrement {
// Enum the possible first value.
for i := uBase - 1; i >= uMin; i-- {
if (uOffset-i)%uIncrement == 0 {
return DecodeCmpUintToInt(i), true
}
}
return 0, false
}
nr := (uOffset - uBase + uIncrement) / uIncrement
nr = uOffset - nr*uIncrement
first := DecodeCmpUintToInt(nr)
return first, true
}
// SeekToFirstAutoIDSigned seeks to the next valid signed position.
func SeekToFirstAutoIDSigned(base, increment, offset int64) int64 {
nr := (base + increment - offset) / increment
nr = nr*increment + offset
return nr
}
// SeekToFirstAutoIDUnSigned seeks to the next valid unsigned position.
func SeekToFirstAutoIDUnSigned(base, increment, offset uint64) uint64 {
nr := (base + increment - offset) / increment
nr = nr*increment + offset
return nr
}
func (alloc *allocator) alloc4Signed(ctx context.Context, n uint64, increment, offset int64) (min int64, max int64, err error) {
// Check offset rebase if necessary.
if offset-1 > alloc.base {
if err := alloc.rebase4Signed(ctx, offset-1, true); err != nil {
return 0, 0, err
}
}
// CalcNeededBatchSize calculates the total batch size needed.
n1 := CalcNeededBatchSize(alloc.base, int64(n), increment, offset, alloc.isUnsigned)
// Condition alloc.base+N1 > alloc.end will overflow when alloc.base + N1 > MaxInt64. So need this.
if math.MaxInt64-alloc.base <= n1 {
return 0, 0, ErrAutoincReadFailed
}
// The local rest is not enough for allocN, skip it.
if alloc.base+n1 > alloc.end {
var newBase, newEnd int64
startTime := time.Now()
nextStep := alloc.step
if !alloc.customStep {
// Although it may skip a segment here, we still think it is consumed.
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
}
ctx, allocatorStats, commitDetail := getAllocatorStatsFromCtx(ctx)
if allocatorStats != nil {
allocatorStats.allocCount++
defer func() {
if commitDetail != nil {
allocatorStats.mergeCommitDetail(*commitDetail)
}
}()
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("alloc.alloc4Signed", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
if allocatorStats != nil {
txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats)
}
idAcc := alloc.getIDAccessor(txn)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed on global base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := mathutil.Min(math.MaxInt64-newBase, nextStep)
// The global rest is not enough for alloc.
if tmpStep < n1 {
return ErrAutoincReadFailed
}
newEnd, err1 = idAcc.Inc(tmpStep)
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if newBase == math.MaxInt64 {
return 0, 0, ErrAutoincReadFailed
}
alloc.base, alloc.end = newBase, newEnd
}
logutil.Logger(context.TODO()).Debug("alloc N signed ID",
zap.Uint64("from ID", uint64(alloc.base)),
zap.Uint64("to ID", uint64(alloc.base+n1)),
zap.Int64("table ID", alloc.tbID),
zap.Int64("database ID", alloc.dbID))
min = alloc.base
alloc.base += n1
return min, alloc.base, nil
}
func (alloc *allocator) alloc4Unsigned(ctx context.Context, n uint64, increment, offset int64) (min int64, max int64, err error) {
// Check offset rebase if necessary.
if uint64(offset-1) > uint64(alloc.base) {
if err := alloc.rebase4Unsigned(ctx, uint64(offset-1), true); err != nil {
return 0, 0, err
}
}
// CalcNeededBatchSize calculates the total batch size needed.
n1 := CalcNeededBatchSize(alloc.base, int64(n), increment, offset, alloc.isUnsigned)
// Condition alloc.base+n1 > alloc.end will overflow when alloc.base + n1 > MaxInt64. So need this.
if math.MaxUint64-uint64(alloc.base) <= uint64(n1) {
return 0, 0, ErrAutoincReadFailed
}
// The local rest is not enough for alloc, skip it.
if uint64(alloc.base)+uint64(n1) > uint64(alloc.end) {
var newBase, newEnd int64
startTime := time.Now()
nextStep := alloc.step
if !alloc.customStep {
// Although it may skip a segment here, we still treat it as consumed.
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
}
ctx, allocatorStats, commitDetail := getAllocatorStatsFromCtx(ctx)
if allocatorStats != nil {
allocatorStats.allocCount++
defer func() {
if commitDetail != nil {
allocatorStats.mergeCommitDetail(*commitDetail)
}
}()
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("alloc.alloc4Unsigned", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
if allocatorStats != nil {
txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats)
}
idAcc := alloc.getIDAccessor(txn)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed on new base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := int64(mathutil.Min(math.MaxUint64-uint64(newBase), uint64(nextStep)))
// The global rest is not enough for alloc.
if tmpStep < n1 {
return ErrAutoincReadFailed
}
newEnd, err1 = idAcc.Inc(tmpStep)
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if uint64(newBase) == math.MaxUint64 {
return 0, 0, ErrAutoincReadFailed
}
alloc.base, alloc.end = newBase, newEnd
}
logutil.Logger(context.TODO()).Debug("alloc unsigned ID",
zap.Uint64(" from ID", uint64(alloc.base)),
zap.Uint64("to ID", uint64(alloc.base+n1)),
zap.Int64("table ID", alloc.tbID),
zap.Int64("database ID", alloc.dbID))
min = alloc.base
// Use uint64 n directly.
alloc.base = int64(uint64(alloc.base) + uint64(n1))
return min, alloc.base, nil
}
func getAllocatorStatsFromCtx(ctx context.Context) (context.Context, *AllocatorRuntimeStats, **tikvutil.CommitDetails) {
var allocatorStats *AllocatorRuntimeStats
var commitDetail *tikvutil.CommitDetails
ctxValue := ctx.Value(AllocatorRuntimeStatsCtxKey)
if ctxValue != nil {
allocatorStats = ctxValue.(*AllocatorRuntimeStats)
ctx = context.WithValue(ctx, tikvutil.CommitDetailCtxKey, &commitDetail)
}
return ctx, allocatorStats, &commitDetail
}
// alloc4Sequence is used to alloc value for sequence, there are several aspects different from autoid logic.
// 1: sequence allocation don't need check rebase.
// 2: sequence allocation don't need auto step.
// 3: sequence allocation may have negative growth.
// 4: sequence allocation batch length can be dissatisfied.
// 5: sequence batch allocation will be consumed immediately.
func (alloc *allocator) alloc4Sequence() (min int64, max int64, round int64, err error) {
increment := alloc.sequence.Increment
offset := alloc.sequence.Start
minValue := alloc.sequence.MinValue
maxValue := alloc.sequence.MaxValue
cacheSize := alloc.sequence.CacheValue
if !alloc.sequence.Cache {
cacheSize = 1
}
var newBase, newEnd int64
startTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnMeta)
err = kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
acc := meta.NewMeta(txn).GetAutoIDAccessors(alloc.dbID, alloc.tbID)
var (
err1 error
seqStep int64
)
// Get the real offset if the sequence is in cycle.
// round is used to count cycle times in sequence with cycle option.
if alloc.sequence.Cycle {
// GetSequenceCycle is used to get the flag `round`, which indicates whether the sequence is already in cycle.
round, err1 = acc.SequenceCycle().Get()
if err1 != nil {
return err1
}
if round > 0 {
if increment > 0 {
offset = alloc.sequence.MinValue
} else {
offset = alloc.sequence.MaxValue
}
}
}
// Get the global new base.
newBase, err1 = acc.SequenceValue().Get()
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed.
seqStep, err1 = CalcSequenceBatchSize(newBase, cacheSize, increment, offset, minValue, maxValue)
if err1 != nil && err1 == ErrAutoincReadFailed {
if !alloc.sequence.Cycle {
return err1
}
// Reset the sequence base and offset.
if alloc.sequence.Increment > 0 {
newBase = alloc.sequence.MinValue - 1
offset = alloc.sequence.MinValue
} else {
newBase = alloc.sequence.MaxValue + 1
offset = alloc.sequence.MaxValue
}
err1 = acc.SequenceValue().Put(newBase)
if err1 != nil {
return err1
}
// Reset sequence round state value.
round++
// SetSequenceCycle is used to store the flag `round` which indicates whether the sequence is already in cycle.
// round > 0 means the sequence is already in cycle, so the offset should be minvalue / maxvalue rather than sequence.start.
// TiDB is a stateless node, it should know whether the sequence is already in cycle when restart.
err1 = acc.SequenceCycle().Put(round)
if err1 != nil {
return err1
}
// Recompute the sequence next batch size.
seqStep, err1 = CalcSequenceBatchSize(newBase, cacheSize, increment, offset, minValue, maxValue)
if err1 != nil {
return err1
}
}
var delta int64
if alloc.sequence.Increment > 0 {
delta = seqStep
} else {
delta = -seqStep
}
newEnd, err1 = acc.SequenceValue().Inc(delta)
return err1
})
// TODO: sequence metrics
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return 0, 0, 0, err
}
logutil.Logger(context.TODO()).Debug("alloc sequence value",
zap.Uint64(" from value", uint64(newBase)),
zap.Uint64("to value", uint64(newEnd)),
zap.Int64("table ID", alloc.tbID),
zap.Int64("database ID", alloc.dbID))
return newBase, newEnd, round, nil
}
func (alloc *allocator) getIDAccessor(txn kv.Transaction) meta.AutoIDAccessor {
acc := meta.NewMeta(txn).GetAutoIDAccessors(alloc.dbID, alloc.tbID)
switch alloc.allocType {
case RowIDAllocType:
return acc.RowID()
case AutoIncrementType:
return acc.IncrementID(alloc.tbVersion)
case AutoRandomType:
return acc.RandomID()
case SequenceType:
return acc.SequenceValue()
}
return nil
}
const signMask uint64 = 0x8000000000000000
// EncodeIntToCmpUint make int v to comparable uint type
func EncodeIntToCmpUint(v int64) uint64 {
return uint64(v) ^ signMask
}
// DecodeCmpUintToInt decodes the u that encoded by EncodeIntToCmpUint
func DecodeCmpUintToInt(u uint64) int64 {
return int64(u ^ signMask)
}
// TestModifyBaseAndEndInjection exported for testing modifying the base and end.
func TestModifyBaseAndEndInjection(alloc Allocator, base, end int64) {
alloc.(*allocator).mu.Lock()
alloc.(*allocator).base = base
alloc.(*allocator).end = end
alloc.(*allocator).mu.Unlock()
}
// ShardIDFormat is used to calculate the bit length of different segments in auto id.
// Generally, an auto id is consist of 4 segments: sign bit, reserved bits, shard bits and incremental bits.
// Take "a BIGINT AUTO_INCREMENT PRIMARY KEY" as an example, assume that the `shard_row_id_bits` = 5,
// the layout is like
//
// | [sign_bit] (1 bit) | [reserved bits] (0 bits) | [shard_bits] (5 bits) | [incremental_bits] (64-1-5=58 bits) |
//
// Please always use NewShardIDFormat() to instantiate.
type ShardIDFormat struct {
FieldType *types.FieldType
ShardBits uint64
// Derived fields.
IncrementalBits uint64
}
// NewShardIDFormat create an instance of ShardIDFormat.
// RangeBits means the bit length of the sign bit + shard bits + incremental bits.
// If RangeBits is 0, it will be calculated according to field type automatically.
func NewShardIDFormat(fieldType *types.FieldType, shardBits, rangeBits uint64) ShardIDFormat {
var incrementalBits uint64
if rangeBits == 0 {
// Zero means that the range bits is not specified. We interpret it as the length of BIGINT.
incrementalBits = RowIDBitLength - shardBits
} else {
incrementalBits = rangeBits - shardBits
}
hasSignBit := !mysql.HasUnsignedFlag(fieldType.GetFlag())
if hasSignBit {
incrementalBits--
}
return ShardIDFormat{
FieldType: fieldType,
ShardBits: shardBits,
IncrementalBits: incrementalBits,
}
}
// IncrementalBitsCapacity returns the max capacity of incremental section of the current format.
func (s *ShardIDFormat) IncrementalBitsCapacity() uint64 {
return uint64(s.IncrementalMask())
}
// IncrementalMask returns 00..0[11..1], where [11..1] is the incremental part of the current format.
func (s *ShardIDFormat) IncrementalMask() int64 {
return (1 << s.IncrementalBits) - 1
}
// Compose generates an auto ID based on the given shard and an incremental ID.
func (s *ShardIDFormat) Compose(shard int64, id int64) int64 {
return ((shard & ((1 << s.ShardBits) - 1)) << s.IncrementalBits) | id
}
type allocatorRuntimeStatsCtxKeyType struct{}
// AllocatorRuntimeStatsCtxKey is the context key of allocator runtime stats.
var AllocatorRuntimeStatsCtxKey = allocatorRuntimeStatsCtxKeyType{}
// AllocatorRuntimeStats is the execution stats of auto id allocator.
type AllocatorRuntimeStats struct {
*txnsnapshot.SnapshotRuntimeStats
*execdetails.RuntimeStatsWithCommit
allocCount int
rebaseCount int
}
// NewAllocatorRuntimeStats return a new AllocatorRuntimeStats.
func NewAllocatorRuntimeStats() *AllocatorRuntimeStats {
return &AllocatorRuntimeStats{
SnapshotRuntimeStats: &txnsnapshot.SnapshotRuntimeStats{},
}
}
func (e *AllocatorRuntimeStats) mergeCommitDetail(detail *tikvutil.CommitDetails) {
if detail == nil {
return
}
if e.RuntimeStatsWithCommit == nil {
e.RuntimeStatsWithCommit = &execdetails.RuntimeStatsWithCommit{}
}
e.RuntimeStatsWithCommit.MergeCommitDetails(detail)
}
// String implements the RuntimeStats interface.
func (e *AllocatorRuntimeStats) String() string {
if e.allocCount == 0 && e.rebaseCount == 0 {
return ""
}
var buf bytes.Buffer
buf.WriteString("auto_id_allocator: {")
initialSize := buf.Len()
if e.allocCount > 0 {
buf.WriteString("alloc_cnt: ")
buf.WriteString(strconv.FormatInt(int64(e.allocCount), 10))
}
if e.rebaseCount > 0 {
if buf.Len() > initialSize {
buf.WriteString(", ")
}
buf.WriteString("rebase_cnt: ")
buf.WriteString(strconv.FormatInt(int64(e.rebaseCount), 10))
}
if e.SnapshotRuntimeStats != nil {
stats := e.SnapshotRuntimeStats.String()
if stats != "" {
if buf.Len() > initialSize {
buf.WriteString(", ")
}
buf.WriteString(e.SnapshotRuntimeStats.String())
}
}
if e.RuntimeStatsWithCommit != nil {
stats := e.RuntimeStatsWithCommit.String()
if stats != "" {
if buf.Len() > initialSize {
buf.WriteString(", ")
}
buf.WriteString(stats)
}
}
buf.WriteString("}")
return buf.String()
}
// Clone implements the RuntimeStats interface.
func (e *AllocatorRuntimeStats) Clone() *AllocatorRuntimeStats {
newRs := &AllocatorRuntimeStats{
allocCount: e.allocCount,
rebaseCount: e.rebaseCount,
}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats
}
if e.RuntimeStatsWithCommit != nil {
newRs.RuntimeStatsWithCommit = e.RuntimeStatsWithCommit.Clone().(*execdetails.RuntimeStatsWithCommit)
}
return newRs
}
// Merge implements the RuntimeStats interface.
func (e *AllocatorRuntimeStats) Merge(other *AllocatorRuntimeStats) {
if other == nil {
return
}
if other.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
e.SnapshotRuntimeStats = other.SnapshotRuntimeStats.Clone()
} else {
e.SnapshotRuntimeStats.Merge(other.SnapshotRuntimeStats)
}
}
if other.RuntimeStatsWithCommit != nil {
if e.RuntimeStatsWithCommit == nil {
e.RuntimeStatsWithCommit = other.RuntimeStatsWithCommit.Clone().(*execdetails.RuntimeStatsWithCommit)
} else {
e.RuntimeStatsWithCommit.Merge(other.RuntimeStatsWithCommit)
}
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦