tidb job 源码

  • 2022-09-19
  • 浏览 (493)

tidb job 代码

文件路径:/cmd/importer/job.go

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"database/sql"
	"fmt"
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

func addJobs(jobCount int, jobChan chan struct{}) {
	for i := 0; i < jobCount; i++ {
		jobChan <- struct{}{}
	}

	close(jobChan)
}

func doInsert(table *table, db *sql.DB, count int) {
	sqls, err := genRowDatas(table, count)
	if err != nil {
		log.Fatal("generate data failed", zap.Error(err))
	}

	txn, err := db.Begin()
	if err != nil {
		log.Fatal("begin failed", zap.Error(err))
	}

	for _, sql := range sqls {
		_, err = txn.Exec(sql)
		if err != nil {
			log.Fatal("exec failed", zap.Error(err))
		}
	}

	err = txn.Commit()
	if err != nil {
		log.Fatal("commit failed", zap.Error(err))
	}
}

func doJob(table *table, db *sql.DB, batch int, jobChan chan struct{}, doneChan chan struct{}) {
	count := 0
	for range jobChan {
		count++
		if count == batch {
			doInsert(table, db, count)
			count = 0
		}
	}

	if count > 0 {
		doInsert(table, db, count)
	}

	doneChan <- struct{}{}
}

func doWait(doneChan chan struct{}, start time.Time, jobCount int, workerCount int) {
	for i := 0; i < workerCount; i++ {
		<-doneChan
	}

	close(doneChan)

	now := time.Now()
	seconds := now.Unix() - start.Unix()

	tps := int64(-1)
	if seconds > 0 {
		tps = int64(jobCount) / seconds
	}

	fmt.Printf("[importer]total %d cases, cost %d seconds, tps %d, start %s, now %s\n", jobCount, seconds, tps, start, now)
}

func doProcess(table *table, dbs []*sql.DB, jobCount int, workerCount int, batch int) {
	jobChan := make(chan struct{}, 16*workerCount)
	doneChan := make(chan struct{}, workerCount)

	start := time.Now()
	go addJobs(jobCount, jobChan)

	for _, col := range table.columns {
		if col.incremental {
			workerCount = 1
			break
		}
	}
	for i := 0; i < workerCount; i++ {
		go doJob(table, dbs[i], batch, jobChan, doneChan)
	}

	doWait(doneChan, start, jobCount, workerCount)
}

相关信息

tidb 源码目录

相关文章

tidb config 源码

tidb data 源码

tidb db 源码

tidb main 源码

tidb parser 源码

tidb rand 源码

tidb stats 源码

0  赞