tidb writer_util source code
File path: /dumpling/export/writer_util.go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package export
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/dumpling/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
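// lengthLimit is the flush threshold for the staging buffer (1 MiB): once a
// buffer grows past this size it is handed off to the writerPipe for writing.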
const lengthLimit = 1048576
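// pool recycles staging buffers across chunks and tables to reduce
// allocations and GC pressure.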
var pool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}
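// writerPipe decouples row serialization from storage I/O: the dumping
// goroutine pushes filled buffers into input while Run drains them to the
// underlying ExternalFileWriter. An illustrative sketch of the producer side
// (this mirrors how WriteInsert below drives the pipe; it is not a public API):
//
//	wp := newWriterPipe(w, fileSizeLimit, statementSizeLimit, metrics, labels)
//	go wp.Run(tctx)
//	wp.input <- buf // hand off a filled *bytes.Buffer
//	close(wp.input) // no more data
//	<-wp.closed     // wait for the drain goroutine to exit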
type writerPipe struct {
input chan *bytes.Buffer
closed chan struct{}
errCh chan error
metrics *metrics
labels prometheus.Labels
finishedFileSize uint64
currentFileSize uint64
currentStatementSize uint64
fileSizeLimit uint64
statementSizeLimit uint64
w storage.ExternalFileWriter
}
func newWriterPipe(
w storage.ExternalFileWriter,
fileSizeLimit,
statementSizeLimit uint64,
metrics *metrics,
labels prometheus.Labels,
) *writerPipe {
return &writerPipe{
input: make(chan *bytes.Buffer, 8),
closed: make(chan struct{}),
errCh: make(chan error, 1),
w: w,
metrics: metrics,
labels: labels,
currentFileSize: 0,
currentStatementSize: 0,
fileSizeLimit: fileSizeLimit,
statementSizeLimit: statementSizeLimit,
}
}
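// Run drains buffers from input and writes them to storage, recording write
// metrics along the way. It returns when input is closed or the context is
// done; after the first write error it reports the error on errCh and
// discards any remaining buffers.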
func (b *writerPipe) Run(tctx *tcontext.Context) {
defer close(b.closed)
var errOccurs bool
receiveChunkTime := time.Now()
for {
select {
case s, ok := <-b.input:
if !ok {
return
}
if errOccurs {
continue
}
ObserveHistogram(b.metrics.receiveWriteChunkTimeHistogram, time.Since(receiveChunkTime).Seconds())
receiveChunkTime = time.Now()
err := writeBytes(tctx, b.w, s.Bytes())
ObserveHistogram(b.metrics.writeTimeHistogram, time.Since(receiveChunkTime).Seconds())
AddGauge(b.metrics.finishedSizeGauge, float64(s.Len()))
b.finishedFileSize += uint64(s.Len())
s.Reset()
pool.Put(s)
if err != nil {
errOccurs = true
b.errCh <- err
}
receiveChunkTime = time.Now()
case <-tctx.Done():
return
}
}
}
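// AddFileSize accounts newly buffered bytes against both the current file
// and the current INSERT statement.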
func (b *writerPipe) AddFileSize(fileSize uint64) {
b.currentFileSize += fileSize
b.currentStatementSize += fileSize
}
func (b *writerPipe) Error() error {
select {
case err := <-b.errCh:
return err
default:
return nil
}
}
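// ShouldSwitchFile reports whether the current output file has reached
// fileSizeLimit and a new file should be started.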
func (b *writerPipe) ShouldSwitchFile() bool {
return b.fileSizeLimit != UnspecifiedSize && b.currentFileSize >= b.fileSizeLimit
}
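// ShouldSwitchStatement reports whether either the file size limit or the
// per-statement size limit has been reached, ending the current INSERT.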
func (b *writerPipe) ShouldSwitchStatement() bool {
return (b.fileSizeLimit != UnspecifiedSize && b.currentFileSize >= b.fileSizeLimit) ||
(b.statementSizeLimit != UnspecifiedSize && b.currentStatementSize >= b.statementSizeLimit)
}
// WriteMeta writes MetaIR to a storage.ExternalFileWriter
func WriteMeta(tctx *tcontext.Context, meta MetaIR, w storage.ExternalFileWriter) error {
tctx.L().Debug("start dumping meta data", zap.String("target", meta.TargetName()))
specCmtIter := meta.SpecialComments()
for specCmtIter.HasNext() {
if err := write(tctx, w, fmt.Sprintf("%s\n", specCmtIter.Next())); err != nil {
return err
}
}
if err := write(tctx, w, meta.MetaSQL()); err != nil {
return err
}
tctx.L().Debug("finish dumping meta data", zap.String("target", meta.TargetName()))
return nil
}
// WriteInsert writes TableDataIR to a storage.ExternalFileWriter in SQL format.
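// Rows are grouped into multi-row INSERT statements bounded by
// cfg.StatementSize; they are staged in a pooled buffer that is flushed to
// the writerPipe whenever it exceeds lengthLimit, and the function returns
// once cfg.FileSize is exceeded so the caller can rotate to a new file.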
func WriteInsert(
pCtx *tcontext.Context,
cfg *Config,
meta TableMeta,
tblIR TableDataIR,
w storage.ExternalFileWriter,
metrics *metrics,
) (n uint64, err error) {
fileRowIter := tblIR.Rows()
if !fileRowIter.HasNext() {
return 0, fileRowIter.Error()
}
bf := pool.Get().(*bytes.Buffer)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}
wp := newWriterPipe(w, cfg.FileSize, cfg.StatementSize, metrics, cfg.Labels)
// use context.Background here to make sure writerPipe can drain all the chunks left in the pipeline
ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
wp.Run(ctx)
wg.Done()
}()
defer func() {
cancel()
wg.Wait()
}()
specCmtIter := meta.SpecialComments()
for specCmtIter.HasNext() {
bf.WriteString(specCmtIter.Next())
bf.WriteByte('\n')
}
wp.currentFileSize += uint64(bf.Len())
var (
insertStatementPrefix string
row = MakeRowReceiver(meta.ColumnTypes())
counter uint64
lastCounter uint64
escapeBackslash = cfg.EscapeBackslash
)
defer func() {
if err != nil {
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", lastCounter),
zap.Uint64("finished size", wp.finishedFileSize),
log.ShortError(err))
SubGauge(metrics.finishedRowsGauge, float64(lastCounter))
SubGauge(metrics.finishedSizeGauge, float64(wp.finishedFileSize))
} else {
pCtx.L().Debug("finish dumping table(chunk)",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", counter),
zap.Uint64("finished size", wp.finishedFileSize))
summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
summary.CollectSuccessUnit("total rows", 1, counter)
}
}()
selectedField := meta.SelectedField()
// if the table has generated columns, dump only the explicitly selected fields
if selectedField != "" && selectedField != "*" {
insertStatementPrefix = fmt.Sprintf("INSERT INTO %s (%s) VALUES\n",
wrapBackTicks(escapeString(meta.TableName())), selectedField)
} else {
insertStatementPrefix = fmt.Sprintf("INSERT INTO %s VALUES\n",
wrapBackTicks(escapeString(meta.TableName())))
}
insertStatementPrefixLen := uint64(len(insertStatementPrefix))
for fileRowIter.HasNext() {
wp.currentStatementSize = 0
bf.WriteString(insertStatementPrefix)
wp.AddFileSize(insertStatementPrefixLen)
for fileRowIter.HasNext() {
lastBfSize := bf.Len()
if selectedField != "" {
if err = fileRowIter.Decode(row); err != nil {
return counter, errors.Trace(err)
}
row.WriteToBuffer(bf, escapeBackslash)
} else {
bf.WriteString("()")
}
counter++
wp.AddFileSize(uint64(bf.Len()-lastBfSize) + 2) // 2 is for ",\n" and ";\n"
failpoint.Inject("ChaosBrokenWriterConn", func(_ failpoint.Value) {
failpoint.Return(0, errors.New("connection is closed"))
})
fileRowIter.Next()
shouldSwitch := wp.ShouldSwitchStatement()
if fileRowIter.HasNext() && !shouldSwitch {
bf.WriteString(",\n")
} else {
bf.WriteString(";\n")
}
if bf.Len() >= lengthLimit {
select {
case <-pCtx.Done():
return counter, pCtx.Err()
case err = <-wp.errCh:
return counter, err
case wp.input <- bf:
bf = pool.Get().(*bytes.Buffer)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}
AddGauge(metrics.finishedRowsGauge, float64(counter-lastCounter))
lastCounter = counter
}
}
if shouldSwitch {
break
}
}
if wp.ShouldSwitchFile() {
break
}
}
if bf.Len() > 0 {
wp.input <- bf
}
close(wp.input)
<-wp.closed
AddGauge(metrics.finishedRowsGauge, float64(counter-lastCounter))
lastCounter = counter
if err = fileRowIter.Error(); err != nil {
return counter, errors.Trace(err)
}
return counter, wp.Error()
}
// WriteInsertInCsv writes TableDataIR to a storage.ExternalFileWriter in CSV format.
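// The statement size limit does not apply to CSV output, so the pipe is
// created with UnspecifiedSize; each row is terminated with "\r\n" and the
// function returns once cfg.FileSize is exceeded so the caller can rotate
// to a new file.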
func WriteInsertInCsv(
pCtx *tcontext.Context,
cfg *Config,
meta TableMeta,
tblIR TableDataIR,
w storage.ExternalFileWriter,
metrics *metrics,
) (n uint64, err error) {
fileRowIter := tblIR.Rows()
if !fileRowIter.HasNext() {
return 0, fileRowIter.Error()
}
bf := pool.Get().(*bytes.Buffer)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}
wp := newWriterPipe(w, cfg.FileSize, UnspecifiedSize, metrics, cfg.Labels)
opt := &csvOption{
nullValue: cfg.CsvNullValue,
separator: []byte(cfg.CsvSeparator),
delimiter: []byte(cfg.CsvDelimiter),
}
// use context.Background here to make sure writerPipe can drain all the chunks left in the pipeline
ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
wp.Run(ctx)
wg.Done()
}()
defer func() {
cancel()
wg.Wait()
}()
var (
row = MakeRowReceiver(meta.ColumnTypes())
counter uint64
lastCounter uint64
escapeBackslash = cfg.EscapeBackslash
selectedFields = meta.SelectedField()
)
defer func() {
if err != nil {
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", lastCounter),
zap.Uint64("finished size", wp.finishedFileSize),
log.ShortError(err))
SubGauge(metrics.finishedRowsGauge, float64(lastCounter))
SubGauge(metrics.finishedSizeGauge, float64(wp.finishedFileSize))
} else {
pCtx.L().Debug("finish dumping table(chunk)",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", counter),
zap.Uint64("finished size", wp.finishedFileSize))
summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
summary.CollectSuccessUnit("total rows", 1, counter)
}
}()
if !cfg.NoHeader && len(meta.ColumnNames()) != 0 && selectedFields != "" {
for i, col := range meta.ColumnNames() {
bf.Write(opt.delimiter)
escapeCSV([]byte(col), bf, escapeBackslash, opt)
bf.Write(opt.delimiter)
if i != len(meta.ColumnTypes())-1 {
bf.Write(opt.separator)
}
}
bf.WriteByte('\r')
bf.WriteByte('\n')
}
wp.currentFileSize += uint64(bf.Len())
for fileRowIter.HasNext() {
lastBfSize := bf.Len()
if selectedFields != "" {
if err = fileRowIter.Decode(row); err != nil {
return counter, errors.Trace(err)
}
row.WriteToBufferInCsv(bf, escapeBackslash, opt)
}
counter++
wp.currentFileSize += uint64(bf.Len()-lastBfSize) + 1 // +1 approximates the "\r\n" line terminator written below
bf.WriteByte('\r')
bf.WriteByte('\n')
if bf.Len() >= lengthLimit {
select {
case <-pCtx.Done():
return counter, pCtx.Err()
case err = <-wp.errCh:
return counter, err
case wp.input <- bf:
bf = pool.Get().(*bytes.Buffer)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}
AddGauge(metrics.finishedRowsGauge, float64(counter-lastCounter))
lastCounter = counter
}
}
fileRowIter.Next()
if wp.ShouldSwitchFile() {
break
}
}
if bf.Len() > 0 {
wp.input <- bf
}
close(wp.input)
<-wp.closed
AddGauge(metrics.finishedRowsGauge, float64(counter-lastCounter))
lastCounter = counter
if err = fileRowIter.Error(); err != nil {
return counter, errors.Trace(err)
}
return counter, wp.Error()
}
func write(tctx *tcontext.Context, writer storage.ExternalFileWriter, str string) error {
_, err := writer.Write(tctx, []byte(str))
if err != nil {
// str might be very long; only log the first 200 chars
outputLength := len(str)
if outputLength >= 200 {
outputLength = 200
}
tctx.L().Warn("fail to write",
zap.String("heading 200 characters", str[:outputLength]),
zap.Error(err))
}
return errors.Trace(err)
}
func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []byte) error {
_, err := writer.Write(tctx, p)
if err != nil {
// p might be very long; only log the first 200 bytes
outputLength := len(p)
if outputLength >= 200 {
outputLength = 200
}
tctx.L().Warn("fail to write",
zap.ByteString("heading 200 characters", p[:outputLength]),
zap.Error(err))
if strings.Contains(err.Error(), "Part number must be an integer between 1 and 10000") {
err = errors.Annotate(err, "workaround: dump file exceeding 50GB, please specify -F=256MB -r=200000 to avoid this problem")
}
}
return errors.Trace(err)
}
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context), error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
zap.Error(err))
return nil, nil, errors.Trace(err)
}
tctx.L().Debug("opened file", zap.String("path", fullPath))
tearDownRoutine := func(ctx context.Context) {
err := writer.Close(ctx)
if err == nil {
return
}
err = errors.Trace(err)
tctx.L().Warn("fail to close file",
zap.String("path", fullPath),
zap.Error(err))
}
return writer, tearDownRoutine, nil
}
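// buildInterceptFileWriter is like buildFileWriter, but it defers the actual
// file creation until the first Write call, which avoids leaving empty data
// files on the target storage for tables that produce no rows.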
func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(context.Context)) {
fileName += compressFileSuffix(compressType)
var writer storage.ExternalFileWriter
fullPath := s.URI() + "/" + fileName
fileWriter := &InterceptFileWriter{}
initRoutine := func() error {
// use the separate context pCtx here to make sure the context used by the ExternalFileWriter
// won't be canceled before close, which would otherwise cause a context-canceled error when closing gcs's Writer
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName)
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
zap.Error(err))
return newWriterError(err)
}
writer = w
pCtx.L().Debug("opened file", zap.String("path", fullPath))
fileWriter.ExternalFileWriter = writer
return nil
}
fileWriter.initRoutine = initRoutine
tearDownRoutine := func(ctx context.Context) {
if writer == nil {
return
}
pCtx.L().Debug("tear down lazy file writer...", zap.String("path", fullPath))
err := writer.Close(ctx)
if err != nil {
pCtx.L().Warn("fail to close file",
zap.String("path", fullPath),
zap.Error(err))
}
}
return fileWriter, tearDownRoutine
}
// LazyStringWriter is an interceptor of io.StringWriter,
// which lazily creates the file the first time the StringWriter needs to write something.
type LazyStringWriter struct {
initRoutine func() error
sync.Once
io.StringWriter
err error
}
// WriteString implements io.StringWriter. It lazily creates the underlying file on the first call and returns an error if that initialization failed.
func (l *LazyStringWriter) WriteString(str string) (int, error) {
l.Do(func() { l.err = l.initRoutine() })
if l.err != nil {
return 0, errors.Errorf("open file error: %s", l.err.Error())
}
return l.StringWriter.WriteString(str)
}
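// writerError marks errors returned by the underlying storage writer so they
// can be distinguished from serialization errors.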
type writerError struct {
error
}
func (e *writerError) Error() string {
return e.error.Error()
}
func newWriterError(err error) error {
if err == nil {
return nil
}
return &writerError{error: err}
}
// InterceptFileWriter is an interceptor of storage.ExternalFileWriter,
// tracking whether anything has been written through it.
type InterceptFileWriter struct {
storage.ExternalFileWriter
sync.Once
SomethingIsWritten bool
initRoutine func() error
err error
}
// Write implements storage.ExternalFileWriter.Write. It lazily creates the underlying file on the first call and records whether any bytes have been written.
func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error) {
w.Do(func() { w.err = w.initRoutine() })
if len(p) > 0 {
w.SomethingIsWritten = true
}
if w.err != nil {
return 0, errors.Annotate(w.err, "open file error")
}
n, err := w.ExternalFileWriter.Write(ctx, p)
return n, newWriterError(err)
}
// Close closes the InterceptFileWriter
func (w *InterceptFileWriter) Close(ctx context.Context) error {
return w.ExternalFileWriter.Close(ctx)
}
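// wrapBackTicks quotes an identifier with backticks unless it already looks
// quoted, e.g. t1 becomes `t1` while `t1` is returned unchanged.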
func wrapBackTicks(identifier string) string {
if !strings.HasPrefix(identifier, "`") && !strings.HasSuffix(identifier, "`") {
return wrapStringWith(identifier, "`")
}
return identifier
}
func wrapStringWith(str string, wrapper string) string {
return fmt.Sprintf("%s%s%s", wrapper, str, wrapper)
}
func compressFileSuffix(compressType storage.CompressType) string {
switch compressType {
case storage.NoCompression:
return ""
case storage.Gzip:
return ".gz"
default:
return ""
}
}
// FileFormat is the format of the output file. Currently SQL text and CSV file formats are supported.
type FileFormat int32
const (
// FileFormatUnknown indicates the given file type is unknown
FileFormatUnknown FileFormat = iota
// FileFormatSQLText indicates the given file type is sql type
FileFormatSQLText
// FileFormatCSV indicates the given file type is csv type
FileFormatCSV
)
const (
// FileFormatSQLTextString indicates the string/suffix of sql type file
FileFormatSQLTextString = "sql"
// FileFormatCSVString indicates the string/suffix of csv type file
FileFormatCSVString = "csv"
)
// String implements the fmt.Stringer interface.
func (f FileFormat) String() string {
switch f {
case FileFormatSQLText:
return strings.ToUpper(FileFormatSQLTextString)
case FileFormatCSV:
return strings.ToUpper(FileFormatCSVString)
default:
return "unknown"
}
}
// Extension returns the extension for specific format.
//
// text -> "sql"
// csv -> "csv"
func (f FileFormat) Extension() string {
switch f {
case FileFormatSQLText:
return FileFormatSQLTextString
case FileFormatCSV:
return FileFormatCSVString
default:
return "unknown_format"
}
}
// WriteInsert writes TableDataIR to a storage.ExternalFileWriter in SQL or CSV format.
func (f FileFormat) WriteInsert(
pCtx *tcontext.Context,
cfg *Config,
meta TableMeta,
tblIR TableDataIR,
w storage.ExternalFileWriter,
metrics *metrics,
) (uint64, error) {
switch f {
case FileFormatSQLText:
return WriteInsert(pCtx, cfg, meta, tblIR, w, metrics)
case FileFormatCSV:
return WriteInsertInCsv(pCtx, cfg, meta, tblIR, w, metrics)
default:
return 0, errors.Errorf("unknown file format")
}
}
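// Example (illustrative sketch only; tctx, cfg, meta, tblIR, w and metrics are
// assumed to be provided by the surrounding dumpling code, e.g. its
// table-dumping loop):
//
//	format := FileFormatSQLText
//	rows, err := format.WriteInsert(tctx, cfg, meta, tblIR, w, metrics)
//	if err != nil {
//		return errors.Trace(err)
//	}
//	tctx.L().Info("table data dumped", zap.Uint64("rows", rows))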