tidb tikv_driver 源码
tidb tikv_driver 代码
文件路径:/store/driver/tikv_driver.go
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package driver
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net/url"
"strings"
"sync"
"time"
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/copr"
derr "github.com/pingcap/tidb/store/driver/error"
txn_driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
type storeCache struct {
sync.Mutex
cache map[string]*tikvStore
}
var mc storeCache
func init() {
mc.cache = make(map[string]*tikvStore)
rand.Seed(time.Now().UnixNano())
}
// Option is a function that changes some config of Driver
type Option func(*TiKVDriver)
// WithSecurity changes the config.Security used by tikv driver.
func WithSecurity(s config.Security) Option {
return func(c *TiKVDriver) {
c.security = s
}
}
// WithTiKVClientConfig changes the config.TiKVClient used by tikv driver.
func WithTiKVClientConfig(client config.TiKVClient) Option {
return func(c *TiKVDriver) {
c.tikvConfig = client
}
}
// WithTxnLocalLatches changes the config.TxnLocalLatches used by tikv driver.
func WithTxnLocalLatches(t config.TxnLocalLatches) Option {
return func(c *TiKVDriver) {
c.txnLocalLatches = t
}
}
// WithPDClientConfig changes the config.PDClient used by tikv driver.
func WithPDClientConfig(client config.PDClient) Option {
return func(c *TiKVDriver) {
c.pdConfig = client
}
}
// TiKVDriver implements engine TiKV.
type TiKVDriver struct {
pdConfig config.PDClient
security config.Security
tikvConfig config.TiKVClient
txnLocalLatches config.TxnLocalLatches
}
// Open opens or creates an TiKV storage with given path using global config.
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
func (d TiKVDriver) Open(path string) (kv.Storage, error) {
return d.OpenWithOptions(path)
}
func (d *TiKVDriver) setDefaultAndOptions(options ...Option) {
tidbCfg := config.GetGlobalConfig()
d.pdConfig = tidbCfg.PDClient
d.security = tidbCfg.Security
d.tikvConfig = tidbCfg.TiKVClient
d.txnLocalLatches = tidbCfg.TxnLocalLatches
for _, f := range options {
f(d)
}
}
// OpenWithOptions is used by other program that use tidb as a library, to avoid modifying GlobalConfig
// unspecified options will be set to global config
func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage, error) {
mc.Lock()
defer mc.Unlock()
d.setDefaultAndOptions(options...)
etcdAddrs, disableGC, err := config.ParsePath(path)
if err != nil {
return nil, errors.Trace(err)
}
pdCli, err := pd.NewClient(etcdAddrs, pd.SecurityOption{
CAPath: d.security.ClusterSSLCA,
CertPath: d.security.ClusterSSLCert,
KeyPath: d.security.ClusterSSLKey,
},
pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(d.tikvConfig.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(d.tikvConfig.GrpcKeepAliveTimeout) * time.Second,
}),
),
pd.WithCustomTimeoutOption(time.Duration(d.pdConfig.PDServerTimeout)*time.Second),
pd.WithForwardingOption(config.GetGlobalConfig().EnableForwarding))
pdCli = util.InterceptedPDClient{Client: pdCli}
if err != nil {
return nil, errors.Trace(err)
}
// FIXME: uuid will be a very long and ugly string, simplify it.
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO()))
if store, ok := mc.cache[uuid]; ok {
return store, nil
}
tlsConfig, err := d.security.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
spkv, err := tikv.NewEtcdSafePointKV(etcdAddrs, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
pdClient := tikv.CodecPDClient{Client: pdCli}
s, err := tikv.NewKVStore(uuid, &pdClient, spkv, tikv.NewRPCClient(tikv.WithSecurity(d.security)))
if err != nil {
return nil, errors.Trace(err)
}
if d.txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(d.txnLocalLatches.Capacity)
}
coprCacheConfig := &config.GetGlobalConfig().TiKVClient.CoprCache
coprStore, err := copr.NewStore(s, coprCacheConfig)
if err != nil {
return nil, errors.Trace(err)
}
store := &tikvStore{
KVStore: s,
etcdAddrs: etcdAddrs,
tlsConfig: tlsConfig,
memCache: kv.NewCacheDB(),
enableGC: !disableGC,
coprStore: coprStore,
}
mc.cache[uuid] = store
return store, nil
}
type tikvStore struct {
*tikv.KVStore
etcdAddrs []string
tlsConfig *tls.Config
memCache kv.MemManager // this is used to query from memory
enableGC bool
gcWorker *gcworker.GCWorker
coprStore *copr.Store
}
// Name gets the name of the storage engine
func (s *tikvStore) Name() string {
return "TiKV"
}
// Describe returns of brief introduction of the storage
func (s *tikvStore) Describe() string {
return "TiKV is a distributed transactional key-value database"
}
var ldflagGetEtcdAddrsFromConfig = "0" // 1:Yes, otherwise:No
const getAllMembersBackoff = 5000
// EtcdAddrs returns etcd server addresses.
func (s *tikvStore) EtcdAddrs() ([]string, error) {
if s.etcdAddrs == nil {
return nil, nil
}
if ldflagGetEtcdAddrsFromConfig == "1" {
// For automated test purpose.
// To manipulate connection to etcd by mandatorily setting path to a proxy.
cfg := config.GetGlobalConfig()
return strings.Split(cfg.Path, ","), nil
}
ctx := context.Background()
bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
etcdAddrs := make([]string, 0)
pdClient := s.GetPDClient()
if pdClient == nil {
return nil, errors.New("Etcd client not found")
}
for {
members, err := pdClient.GetAllMembers(ctx)
if err != nil {
err := bo.Backoff(tikv.BoRegionMiss(), err)
if err != nil {
return nil, err
}
continue
}
for _, member := range members {
if len(member.ClientUrls) > 0 {
u, err := url.Parse(member.ClientUrls[0])
if err != nil {
logutil.BgLogger().Error("fail to parse client url from pd members", zap.String("client_url", member.ClientUrls[0]), zap.Error(err))
return nil, err
}
etcdAddrs = append(etcdAddrs, u.Host)
}
}
return etcdAddrs, nil
}
}
// TLSConfig returns the tls config to connect to etcd.
func (s *tikvStore) TLSConfig() *tls.Config {
return s.tlsConfig
}
// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once.
func (s *tikvStore) StartGCWorker() error {
if !s.enableGC {
return nil
}
gcWorker, err := gcworker.NewGCWorker(s, s.GetPDClient())
if err != nil {
return derr.ToTiDBErr(err)
}
gcWorker.Start()
s.gcWorker = gcWorker
return nil
}
func (s *tikvStore) GetClient() kv.Client {
return s.coprStore.GetClient()
}
func (s *tikvStore) GetMPPClient() kv.MPPClient {
return s.coprStore.GetMPPClient()
}
// Close and unregister the store.
func (s *tikvStore) Close() error {
mc.Lock()
defer mc.Unlock()
delete(mc.cache, s.UUID())
if s.gcWorker != nil {
s.gcWorker.Close()
}
s.coprStore.Close()
err := s.KVStore.Close()
return derr.ToTiDBErr(err)
}
// GetMemCache return memory manager of the storage
func (s *tikvStore) GetMemCache() kv.MemManager {
return s.memCache
}
// Begin a global transaction.
func (s *tikvStore) Begin(opts ...tikv.TxnOption) (kv.Transaction, error) {
txn, err := s.KVStore.Begin(opts...)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
return txn_driver.NewTiKVTxn(txn), err
}
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *tikvStore) GetSnapshot(ver kv.Version) kv.Snapshot {
return txn_driver.NewSnapshot(s.KVStore.GetSnapshot(ver.Ver))
}
// CurrentVersion returns current max committed version with the given txnScope (local or global).
func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) {
ver, err := s.KVStore.CurrentTimestamp(txnScope)
return kv.NewVersion(ver), derr.ToTiDBErr(err)
}
// ShowStatus returns the specified status of the storage
func (s *tikvStore) ShowStatus(ctx context.Context, key string) (interface{}, error) {
return nil, kv.ErrNotImplemented
}
// GetLockWaits get return lock waits info
func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
stores := s.GetRegionCache().GetStoresByType(tikvrpc.TiKV)
//nolint: prealloc
var result []*deadlockpb.WaitForEntry
for _, store := range stores {
resp, err := s.GetTiKVClient().SendRequest(context.TODO(), store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdLockWaitInfo, &kvrpcpb.GetLockWaitInfoRequest{}), time.Second*30)
if err != nil {
logutil.BgLogger().Warn("query lock wait info failed", zap.Error(err))
continue
}
if resp.Resp == nil {
logutil.BgLogger().Warn("lock wait info from store is nil")
continue
}
entries := resp.Resp.(*kvrpcpb.GetLockWaitInfoResponse).Entries
result = append(result, entries...)
}
return result, nil
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦