tidb table_lock 源码
tidb table_lock 代码
文件路径:/ddl/table_lock.go
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/dbterror"
)
func onLockTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
arg := &LockTablesArg{}
if err := job.DecodeArgs(arg); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// Unlock table first.
if arg.IndexOfUnlock < len(arg.UnlockTables) {
return unlockTables(d, t, job, arg)
}
// Check table locked by other, this can be only checked at the first time.
if arg.IndexOfLock == 0 {
for i, tl := range arg.LockTables {
job.SchemaID = tl.SchemaID
job.TableID = tl.TableID
tbInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, err
}
err = checkTableLocked(tbInfo, arg.LockTables[i].Tp, arg.SessionInfo)
if err != nil {
// If any request table was locked by other session, just cancel this job.
// No need to rolling back the unlocked tables, MySQL will release the lock first
// and block if the request table was locked by other.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}
}
// Lock tables.
if arg.IndexOfLock < len(arg.LockTables) {
job.SchemaID = arg.LockTables[arg.IndexOfLock].SchemaID
job.TableID = arg.LockTables[arg.IndexOfLock].TableID
var tbInfo *model.TableInfo
tbInfo, err = GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, err
}
err = lockTable(tbInfo, arg.IndexOfLock, arg)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
switch tbInfo.Lock.State {
case model.TableLockStateNone:
// none -> pre_lock
tbInfo.Lock.State = model.TableLockStatePreLock
tbInfo.Lock.TS = t.StartTS
ver, err = updateVersionAndTableInfo(d, t, job, tbInfo, true)
// If the state of the lock is public, it means the lock is a read lock and already locked by other session,
// so this request of lock table doesn't need pre-lock state, just update the TS and table info is ok.
case model.TableLockStatePreLock, model.TableLockStatePublic:
tbInfo.Lock.State = model.TableLockStatePublic
tbInfo.Lock.TS = t.StartTS
ver, err = updateVersionAndTableInfo(d, t, job, tbInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
arg.IndexOfLock++
job.Args = []interface{}{arg}
if arg.IndexOfLock == len(arg.LockTables) {
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, nil)
}
default:
job.State = model.JobStateCancelled
return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("table lock", tbInfo.Lock.State)
}
}
return ver, err
}
// findSessionInfoIndex gets the index of sessionInfo in the sessions. return -1 if sessions doesn't contain the sessionInfo.
func findSessionInfoIndex(sessions []model.SessionInfo, sessionInfo model.SessionInfo) int {
for i := range sessions {
if sessions[i].ServerID == sessionInfo.ServerID && sessions[i].SessionID == sessionInfo.SessionID {
return i
}
}
return -1
}
// lockTable uses to check table locked and acquire the table lock for the request session.
func lockTable(tbInfo *model.TableInfo, idx int, arg *LockTablesArg) error {
if !tbInfo.IsLocked() {
tbInfo.Lock = &model.TableLockInfo{
Tp: arg.LockTables[idx].Tp,
}
tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo)
return nil
}
// If the state of the lock is in pre-lock, then the lock must be locked by the current request. So we can just return here.
// Because the lock/unlock job must be serial execution in DDL owner now.
if tbInfo.Lock.State == model.TableLockStatePreLock {
return nil
}
if (tbInfo.Lock.Tp == model.TableLockRead && arg.LockTables[idx].Tp == model.TableLockRead) ||
(tbInfo.Lock.Tp == model.TableLockReadOnly && arg.LockTables[idx].Tp == model.TableLockReadOnly) {
sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo)
// repeat lock.
if sessionIndex >= 0 {
return nil
}
tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo)
return nil
}
// Unlock tables should execute before lock tables.
// Normally execute to here is impossible.
return infoschema.ErrTableLocked.GenWithStackByArgs(tbInfo.Name.L, tbInfo.Lock.Tp, tbInfo.Lock.Sessions[0])
}
// checkTableLocked uses to check whether table was locked.
func checkTableLocked(tbInfo *model.TableInfo, lockTp model.TableLockType, sessionInfo model.SessionInfo) error {
if !tbInfo.IsLocked() {
return nil
}
if tbInfo.Lock.State == model.TableLockStatePreLock {
return nil
}
if (tbInfo.Lock.Tp == model.TableLockRead && lockTp == model.TableLockRead) ||
(tbInfo.Lock.Tp == model.TableLockReadOnly && lockTp == model.TableLockReadOnly) {
return nil
}
sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, sessionInfo)
// If the request session already locked the table before, In other words, repeat lock.
if sessionIndex >= 0 {
if tbInfo.Lock.Tp == lockTp {
return nil
}
// If no other session locked this table, and it is not a read only lock (session unrelated).
if len(tbInfo.Lock.Sessions) == 1 && tbInfo.Lock.Tp != model.TableLockReadOnly {
return nil
}
}
return infoschema.ErrTableLocked.GenWithStackByArgs(tbInfo.Name.L, tbInfo.Lock.Tp, tbInfo.Lock.Sessions[0])
}
// unlockTables uses unlock a batch of table lock one by one.
func unlockTables(d *ddlCtx, t *meta.Meta, job *model.Job, arg *LockTablesArg) (ver int64, err error) {
if arg.IndexOfUnlock >= len(arg.UnlockTables) {
return ver, nil
}
job.SchemaID = arg.UnlockTables[arg.IndexOfUnlock].SchemaID
job.TableID = arg.UnlockTables[arg.IndexOfUnlock].TableID
tbInfo, err := getTableInfo(t, job.TableID, job.SchemaID)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
// The table maybe has been dropped. just ignore this err and go on.
arg.IndexOfUnlock++
job.Args = []interface{}{arg}
return ver, nil
}
return ver, err
}
needUpdateTableInfo := unlockTable(tbInfo, arg)
if needUpdateTableInfo {
ver, err = updateVersionAndTableInfo(d, t, job, tbInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
}
arg.IndexOfUnlock++
job.Args = []interface{}{arg}
return ver, nil
}
// unlockTable uses to unlock table lock that hold by the session.
func unlockTable(tbInfo *model.TableInfo, arg *LockTablesArg) (needUpdateTableInfo bool) {
if !tbInfo.IsLocked() {
return false
}
if arg.IsCleanup {
tbInfo.Lock = nil
return true
}
sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo)
if sessionIndex < 0 {
// When session clean table lock, session maybe send unlock table even the table lock maybe not hold by the session.
// so just ignore and return here.
return false
}
oldSessionInfo := tbInfo.Lock.Sessions
tbInfo.Lock.Sessions = oldSessionInfo[:sessionIndex]
tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, oldSessionInfo[sessionIndex+1:]...)
if len(tbInfo.Lock.Sessions) == 0 {
tbInfo.Lock = nil
}
return true
}
func onUnlockTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
arg := &LockTablesArg{}
if err := job.DecodeArgs(arg); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
ver, err = unlockTables(d, t, job, arg)
if arg.IndexOfUnlock == len(arg.UnlockTables) {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, nil)
}
return ver, err
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦