tidb column 源码
tidb column 代码
文件路径:/util/column-mapping/column.go
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package column
import (
"fmt"
"strconv"
"strings"
"sync"
"github.com/pingcap/errors"
selector "github.com/pingcap/tidb/util/table-rule-selector"
)
var (
// for partition ID, ref definition of partitionID
instanceIDBitSize = 4
schemaIDBitSize = 7
tableIDBitSize = 8
maxOriginID int64 = 17592186044416
)
// SetPartitionRule sets bit size of schema ID and table ID
func SetPartitionRule(instanceIDSize, schemaIDSize, tableIDSize int) {
instanceIDBitSize = instanceIDSize
schemaIDBitSize = schemaIDSize
tableIDBitSize = tableIDSize
maxOriginID = 1 << uint(64-instanceIDSize-schemaIDSize-tableIDSize-1)
}
// Expr indicates how to handle column mapping
type Expr string
// poor Expr
const (
AddPrefix Expr = "add prefix"
AddSuffix Expr = "add suffix"
PartitionID Expr = "partition id"
)
// Exprs is some built-in expression for column mapping
// only supports some poor expressions now,
// we would unify tableInfo later and support more
var Exprs = map[Expr]func(*mappingInfo, []interface{}) ([]interface{}, error){
AddPrefix: addPrefix, // arguments contains prefix
AddSuffix: addSuffix, // arguments contains suffix
// arguments contains [instance_id, prefix of schema, prefix of table]
// we would compute a ID like
// [1:1 bit][2:9 bits][3:10 bits][4:44 bits] int64 (using default bits length)
// # 1 useless, no reason
// # 2 schema ID (schema suffix)
// # 3 table ID (table suffix)
// # 4 origin ID (>= 0, <= 17592186044415)
//
// others: schema = arguments[1] or arguments[1] + arguments[3] + schema suffix
// table = arguments[2] or arguments[2] + arguments[3] + table suffix
// example: schema = schema_1 table = t_1
// => arguments[1] = "schema", arguments[2] = "t", arguments[3] = "_"
// if arguments[1]/arguments[2] == "", it means we don't use schemaID/tableID to compute partition ID
// if length of arguments is < 4, arguments[3] is set to "" (empty string)
PartitionID: partitionID,
}
// Rule is a rule to map column
// TODO: we will do it later, if we need to implement a real column mapping, we need table structure of source and target system
type Rule struct {
PatternSchema string `yaml:"schema-pattern" json:"schema-pattern" toml:"schema-pattern"`
PatternTable string `yaml:"table-pattern" json:"table-pattern" toml:"table-pattern"`
SourceColumn string `yaml:"source-column" json:"source-column" toml:"source-column"` // modify, add refer column, ignore
TargetColumn string `yaml:"target-column" json:"target-column" toml:"target-column"` // add column, modify
Expression Expr `yaml:"expression" json:"expression" toml:"expression"`
Arguments []string `yaml:"arguments" json:"arguments" toml:"arguments"`
CreateTableQuery string `yaml:"create-table-query" json:"create-table-query" toml:"create-table-query"`
}
// ToLower covert schema/table parttern to lower case
func (r *Rule) ToLower() {
r.PatternSchema = strings.ToLower(r.PatternSchema)
r.PatternTable = strings.ToLower(r.PatternTable)
}
// Valid checks validity of rule.
// add prefix/suffix: it should have target column and one argument
// partition id: it should have 3 to 4 arguments
func (r *Rule) Valid() error {
if _, ok := Exprs[r.Expression]; !ok {
return errors.NotFoundf("expression %s", r.Expression)
}
if r.TargetColumn == "" {
return errors.NotValidf("rule need to be applied a target column")
}
if r.Expression == AddPrefix || r.Expression == AddSuffix {
if len(r.Arguments) != 1 {
return errors.NotValidf("arguments %v for add prefix/suffix", r.Arguments)
}
}
if r.Expression == PartitionID {
switch len(r.Arguments) {
case 3, 4:
return nil
default:
return errors.NotValidf("arguments %v for patition id", r.Arguments)
}
}
return nil
}
// Adjust normalizes the rule into an easier-to-process form, e.g. filling in
// optional arguments with the default values.
func (r *Rule) Adjust() {
if r.Expression == PartitionID && len(r.Arguments) == 3 {
r.Arguments = append(r.Arguments, "")
}
}
// check source and target position
func (r *Rule) adjustColumnPosition(source, target int) (src int, targ int, err error) {
// if not found target, ignore it
if target == -1 {
return source, target, errors.NotFoundf("target column %s", r.TargetColumn)
}
return source, target, nil
}
type mappingInfo struct {
ignore bool
sourcePosition int
targetPosition int
rule *Rule
instanceID int64
schemaID int64
tableID int64
}
// Mapping maps column to something by rules
type Mapping struct {
selector.Selector
caseSensitive bool
cache struct {
sync.RWMutex
infos map[string]*mappingInfo
}
}
// NewMapping returns a column mapping
func NewMapping(caseSensitive bool, rules []*Rule) (*Mapping, error) {
m := &Mapping{
Selector: selector.NewTrieSelector(),
caseSensitive: caseSensitive,
}
m.resetCache()
for _, rule := range rules {
if err := m.AddRule(rule); err != nil {
return nil, errors.Annotatef(err, "initial rule %+v in mapping", rule)
}
}
return m, nil
}
func (m *Mapping) addOrUpdateRule(rule *Rule, isUpdate bool) error {
if m == nil || rule == nil {
return nil
}
err := rule.Valid()
if err != nil {
return errors.Trace(err)
}
if !m.caseSensitive {
rule.ToLower()
}
rule.Adjust()
m.resetCache()
if isUpdate {
err = m.Insert(rule.PatternSchema, rule.PatternTable, rule, selector.Replace)
} else {
err = m.Insert(rule.PatternSchema, rule.PatternTable, rule, selector.Insert)
}
if err != nil {
var method string
if isUpdate {
method = "update"
} else {
method = "add"
}
return errors.Annotatef(err, "%s rule %+v into mapping", method, rule)
}
return nil
}
// AddRule adds a rule into mapping
func (m *Mapping) AddRule(rule *Rule) error {
return m.addOrUpdateRule(rule, false)
}
// UpdateRule updates mapping rule
func (m *Mapping) UpdateRule(rule *Rule) error {
return m.addOrUpdateRule(rule, true)
}
// RemoveRule removes a rule from mapping
func (m *Mapping) RemoveRule(rule *Rule) error {
if m == nil || rule == nil {
return nil
}
if !m.caseSensitive {
rule.ToLower()
}
m.resetCache()
err := m.Remove(rule.PatternSchema, rule.PatternTable)
if err != nil {
return errors.Annotatef(err, "remove rule %+v from mapping", rule)
}
return nil
}
// HandleRowValue handles row value
func (m *Mapping) HandleRowValue(schema, table string, columns []string, vals []interface{}) ([]interface{}, []int, error) {
if m == nil {
return vals, nil, nil
}
schemaL, tableL := schema, table
if !m.caseSensitive {
schemaL, tableL = strings.ToLower(schema), strings.ToLower(table)
}
info, err := m.queryColumnInfo(schemaL, tableL, columns)
if err != nil {
return nil, nil, errors.Trace(err)
}
if info.ignore {
return vals, nil, nil
}
exp, ok := Exprs[info.rule.Expression]
if !ok {
return nil, nil, errors.NotFoundf("column mapping expression %s", info.rule.Expression)
}
vals, err = exp(info, vals)
if err != nil {
return nil, nil, errors.Trace(err)
}
return vals, []int{info.sourcePosition, info.targetPosition}, nil
}
// HandleDDL handles ddl
func (m *Mapping) HandleDDL(schema, table string, columns []string, statement string) (string, []int, error) {
if m == nil {
return statement, nil, nil
}
schemaL, tableL := schema, table
if !m.caseSensitive {
schemaL, tableL = strings.ToLower(schema), strings.ToLower(table)
}
info, err := m.queryColumnInfo(schemaL, tableL, columns)
if err != nil {
return statement, nil, errors.Trace(err)
}
if info.ignore {
return statement, nil, nil
}
m.resetCache()
// only output erro now, wait fix it manually
return statement, nil, errors.Errorf("ddl %s @ column mapping rule %s/%s:%+v not implemented", statement, schema, table, info.rule)
}
func (m *Mapping) queryColumnInfo(schema, table string, columns []string) (*mappingInfo, error) {
m.cache.RLock()
ci, ok := m.cache.infos[tableName(schema, table)]
m.cache.RUnlock()
if ok {
return ci, nil
}
var info = &mappingInfo{
ignore: true,
}
rules := m.Match(schema, table)
if len(rules) == 0 {
m.cache.Lock()
m.cache.infos[tableName(schema, table)] = info
m.cache.Unlock()
return info, nil
}
var (
schemaRules []*Rule
tableRules = make([]*Rule, 0, 1)
)
// classify rules into schema level rules and table level
// table level rules have highest priority
for i := range rules {
rule, ok := rules[i].(*Rule)
if !ok {
return nil, errors.NotValidf("column mapping rule %+v", rules[i])
}
if len(rule.PatternTable) == 0 {
schemaRules = append(schemaRules, rule)
} else {
tableRules = append(tableRules, rule)
}
}
// only support one expression for one table now, refine it later
var rule *Rule
if len(table) == 0 || len(tableRules) == 0 {
if len(schemaRules) != 1 {
return nil, errors.NotSupportedf("`%s`.`%s` matches %d schema column mapping rules which should be one. It's", schema, table, len(schemaRules))
}
rule = schemaRules[0]
} else {
if len(tableRules) != 1 {
return nil, errors.NotSupportedf("`%s`.`%s` matches %d table column mapping rules which should be one. It's", schema, table, len(tableRules))
}
rule = tableRules[0]
}
if rule == nil {
m.cache.Lock()
m.cache.infos[tableName(schema, table)] = info
m.cache.Unlock()
return info, nil
}
// compute source and target column position
sourcePosition := findColumnPosition(columns, rule.SourceColumn)
targetPosition := findColumnPosition(columns, rule.TargetColumn)
sourcePosition, targetPosition, err := rule.adjustColumnPosition(sourcePosition, targetPosition)
if err != nil {
return nil, errors.Trace(err)
}
info = &mappingInfo{
sourcePosition: sourcePosition,
targetPosition: targetPosition,
rule: rule,
}
// if expr is partition ID, compute schema and table ID
if rule.Expression == PartitionID {
info.instanceID, info.schemaID, info.tableID, err = computePartitionID(schema, table, rule)
if err != nil {
return nil, errors.Trace(err)
}
}
m.cache.Lock()
m.cache.infos[tableName(schema, table)] = info
m.cache.Unlock()
return info, nil
}
func (m *Mapping) resetCache() {
m.cache.Lock()
m.cache.infos = make(map[string]*mappingInfo)
m.cache.Unlock()
}
func findColumnPosition(cols []string, col string) int {
for i := range cols {
if cols[i] == col {
return i
}
}
return -1
}
func tableName(schema, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}
func addPrefix(info *mappingInfo, vals []interface{}) ([]interface{}, error) {
prefix := info.rule.Arguments[0]
originStr, ok := vals[info.targetPosition].(string)
if !ok {
return nil, errors.NotValidf("column %d value is not string, but %v, which is", info.targetPosition, vals[info.targetPosition])
}
// fast to concatenated string
rawByte := make([]byte, 0, len(prefix)+len(originStr))
rawByte = append(rawByte, prefix...)
rawByte = append(rawByte, originStr...)
vals[info.targetPosition] = string(rawByte)
return vals, nil
}
func addSuffix(info *mappingInfo, vals []interface{}) ([]interface{}, error) {
suffix := info.rule.Arguments[0]
originStr, ok := vals[info.targetPosition].(string)
if !ok {
return nil, errors.NotValidf("column %d value is not string, but %v, which is", info.targetPosition, vals[info.targetPosition])
}
rawByte := make([]byte, 0, len(suffix)+len(originStr))
rawByte = append(rawByte, originStr...)
rawByte = append(rawByte, suffix...)
vals[info.targetPosition] = string(rawByte)
return vals, nil
}
func partitionID(info *mappingInfo, vals []interface{}) ([]interface{}, error) {
// only int64 now
var (
originID int64
err error
isChars bool
)
switch rawID := vals[info.targetPosition].(type) {
case int:
originID = int64(rawID)
case int8:
originID = int64(rawID)
case int32:
originID = int64(rawID)
case int64:
originID = rawID
case uint:
originID = int64(rawID)
case uint16:
originID = int64(rawID)
case uint32:
originID = int64(rawID)
case uint64:
originID = int64(rawID)
case string:
originID, err = strconv.ParseInt(rawID, 10, 64)
if err != nil {
return nil, errors.NotValidf("column %d value is not int, but %v, which is", info.targetPosition, vals[info.targetPosition])
}
isChars = true
default:
return nil, errors.NotValidf("type %T(%v)", vals[info.targetPosition], vals[info.targetPosition])
}
if originID >= maxOriginID || originID < 0 {
return nil, errors.NotValidf("id must less than %d, greater than or equal to 0, but got %d, which is", maxOriginID, originID)
}
originID = info.instanceID | info.schemaID | info.tableID | originID
if isChars {
vals[info.targetPosition] = strconv.FormatInt(originID, 10)
} else {
vals[info.targetPosition] = originID
}
return vals, nil
}
func computePartitionID(schema, table string, rule *Rule) (instanceID int64, schemaID int64, tableID int64, err error) {
shiftCnt := uint(63)
if instanceIDBitSize > 0 && len(rule.Arguments[0]) > 0 {
var instanceIDUnsign uint64
shiftCnt = shiftCnt - uint(instanceIDBitSize)
instanceIDUnsign, err = strconv.ParseUint(rule.Arguments[0], 10, instanceIDBitSize)
if err != nil {
return
}
instanceID = int64(instanceIDUnsign << shiftCnt)
}
sep := rule.Arguments[3]
if schemaIDBitSize > 0 && len(rule.Arguments[1]) > 0 {
shiftCnt = shiftCnt - uint(schemaIDBitSize)
schemaID, err = computeID(schema, rule.Arguments[1], sep, schemaIDBitSize, shiftCnt)
if err != nil {
return
}
}
if tableIDBitSize > 0 && len(rule.Arguments[2]) > 0 {
shiftCnt = shiftCnt - uint(tableIDBitSize)
tableID, err = computeID(table, rule.Arguments[2], sep, tableIDBitSize, shiftCnt)
}
return
}
func computeID(name string, prefix, sep string, bitSize int, shiftCount uint) (int64, error) {
if name == prefix {
return 0, nil
}
prefix += sep
if len(prefix) >= len(name) || prefix != name[:len(prefix)] {
return 0, errors.NotValidf("%s is not the prefix of %s", prefix, name)
}
idStr := name[len(prefix):]
id, err := strconv.ParseUint(idStr, 10, bitSize)
if err != nil {
return 0, errors.NotValidf("the suffix of %s can't be converted to int64", idStr)
}
return int64(id << shiftCount), nil
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦