Greenplum logtape 源码
Greenplum logtape 代码
文件路径:/src/backend/utils/sort/logtape.c
/*-------------------------------------------------------------------------
*
* logtape.c
* Management of "logical tapes" within temporary files.
*
* This module exists to support sorting via multiple merge passes (see
* tuplesort.c). Merging is an ideal algorithm for tape devices, but if
* we implement it on disk by creating a separate file for each "tape",
* there is an annoying problem: the peak space usage is at least twice
* the volume of actual data to be sorted. (This must be so because each
* datum will appear in both the input and output tapes of the final
* merge pass. For seven-tape polyphase merge, which is otherwise a
* pretty good algorithm, peak usage is more like 4x actual data volume.)
*
* We can work around this problem by recognizing that any one tape
* dataset (with the possible exception of the final output) is written
* and read exactly once in a perfectly sequential manner. Therefore,
* a datum once read will not be required again, and we can recycle its
* space for use by the new tape dataset(s) being generated. In this way,
* the total space usage is essentially just the actual data volume, plus
* insignificant bookkeeping and start/stop overhead.
*
* Few OSes allow arbitrary parts of a file to be released back to the OS,
* so we have to implement this space-recycling ourselves within a single
* logical file. logtape.c exists to perform this bookkeeping and provide
* the illusion of N independent tape devices to tuplesort.c. Note that
* logtape.c itself depends on buffile.c to provide a "logical file" of
* larger size than the underlying OS may support.
*
* For simplicity, we allocate and release space in the underlying file
* in BLCKSZ-size blocks. Space allocation boils down to keeping track
* of which blocks in the underlying file belong to which logical tape,
* plus any blocks that are free (recycled and not yet reused).
* The blocks in each logical tape form a chain, with a prev- and next-
* pointer in each block.
*
* The initial write pass is guaranteed to fill the underlying file
* perfectly sequentially, no matter how data is divided into logical tapes.
* Once we begin merge passes, the access pattern becomes considerably
* less predictable --- but the seeking involved should be comparable to
* what would happen if we kept each logical tape in a separate file,
* so there's no serious performance penalty paid to obtain the space
* savings of recycling. We try to localize the write accesses by always
* writing to the lowest-numbered free block when we have a choice; it's
* not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
* policy for free blocks would be better?)
*
* To further make the I/Os more sequential, we can use a larger buffer
* when reading, and read multiple blocks from the same tape in one go,
* whenever the buffer becomes empty.
*
* To support the above policy of writing to the lowest free block, the
* freelist is a min heap.
*
* Since all the bookkeeping and buffer memory is allocated with palloc(),
* and the underlying file(s) are made with OpenTemporaryFile, all resources
* for a logical tape set are certain to be cleaned up even if processing
* is aborted by ereport(ERROR). To avoid confusion, the caller should take
* care that all calls for a single LogicalTapeSet are made in the same
* palloc context.
*
* To support parallel sort operations involving coordinated callers to
* tuplesort.c routines across multiple workers, it is necessary to
* concatenate each worker BufFile/tapeset into one single logical tapeset
* managed by the leader. Workers should have produced one final
* materialized tape (their entire output) when this happens in leader.
* There will always be the same number of runs as input tapes, and the same
* number of input tapes as participants (worker Tuplesortstates).
*
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/utils/sort/logtape.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/buffile.h"
#include "utils/builtins.h"
#include "utils/logtape.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
/*
* A TapeBlockTrailer is stored at the end of each BLCKSZ block.
*
* The trailer links the blocks of one tape into a chain. The first block
* of a tape has prev == -1. The last block of a tape has no successor, so
* its 'next' field instead stores the number of valid payload bytes on the
* block, negated (see TapeBlockSetNBytes). Therefore next < 0 indicates
* the last block.
*/
typedef struct TapeBlockTrailer
{
long prev; /* previous block on this tape, or -1 on first
* block */
long next; /* next block on this tape, or the negated # of
* valid bytes on the last block (if < 0) */
} TapeBlockTrailer;
/*
 * Accessors for a BLCKSZ-sized tape block.
 *
 * TapeBlockPayloadSize		usable data bytes per block (block size minus
 *							the trailer).
 * TapeBlockGetTrailer(buf)	pointer to the trailer at the end of block 'buf'.
 * TapeBlockIsLast(buf)		true if 'buf' is the last block of its tape
 *							(trailer's next < 0).
 * TapeBlockGetNBytes(buf)	number of valid payload bytes in 'buf': the
 *							negated 'next' on the last block, otherwise a
 *							full payload.
 * TapeBlockSetNBytes(buf, nbytes)
 *							mark 'buf' as the last block holding 'nbytes'
 *							valid bytes, by storing -(nbytes) in 'next'.
 *
 * Macro arguments are parenthesized so that compound expressions (pointer
 * arithmetic, conditional expressions, ...) can safely be passed as 'buf'.
 * Note that 'buf' is evaluated more than once by TapeBlockGetNBytes.
 */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
/*
* When multiple tapes are being written to concurrently (as in HashAgg),
* avoid excessive fragmentation by preallocating block numbers to individual
* tapes. Each tape's preallocation list starts at TAPE_WRITE_PREALLOC_MIN
* blocks and doubles in size on each refill, up to TAPE_WRITE_PREALLOC_MAX
* blocks (see ltsGetPreallocBlock()).
*
* No filesystem operations are performed for preallocation; only the block
* numbers are reserved. This may lead to sparse writes, which will cause
* ltsWriteBlock() to fill in holes with zeros.
*/
#define TAPE_WRITE_PREALLOC_MIN 8
#define TAPE_WRITE_PREALLOC_MAX 128
/*
* This data structure represents a single "logical tape" within the set
* of logical tapes stored in the same file.
*
* While writing, we hold the current partially-written data block in the
* buffer. While reading, we can hold multiple blocks in the buffer. Note
* that we don't retain the trailers of a block when it's read into the
* buffer: each subsequent block is read in starting right after the
* previous block's payload, overwriting its trailer. The buffer therefore
* contains one large contiguous chunk of data from the tape.
*/
typedef struct LogicalTape
{
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
/*
* Block numbers of the first, current, and next block of the tape.
* A value of -1 means "none".
*
* The "current" block number is only valid when writing, or reading from
* a frozen tape. (When reading from an unfrozen tape, we use a larger
* read buffer that holds multiple blocks, so the "current" block is
* ambiguous.)
*
* When concatenation of worker tape BufFiles is performed, an offset to
* the first block in the unified BufFile space is applied during reads;
* that offset is kept in offsetBlockNumber and is zero for tapes that
* are written locally.
*/
long firstBlockNumber;
long curBlockNumber;
long nextBlockNumber;
long offsetBlockNumber;
/*
* Buffer for current data block(s). It is allocated lazily, so 'buffer'
* may be NULL even when buffer_size has already been chosen.
*/
char *buffer; /* physical buffer (separately palloc'd) */
int buffer_size; /* allocated size of the buffer */
int max_size; /* highest useful, safe buffer_size */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
/*
* Preallocated block numbers are held in an array sorted in descending
* order; blocks are consumed from the end of the array (lowest block
* numbers first).
*/
long *prealloc;
int nprealloc; /* number of elements currently in the list */
int prealloc_size; /* number of elements the list can hold */
} LogicalTape;
/*
* This data structure represents a set of related "logical tapes" sharing
* space in a single underlying file. (But that "file" may be multiple files
* if needed to escape OS limits on file size; buffile.c handles that for us.)
* The number of tapes is chosen at creation, and can later be increased with
* LogicalTapeSetExtend().
*/
struct LogicalTapeSet
{
BufFile *pfile; /* underlying file for whole tape set */
/*
* File size tracking. nBlocksWritten is the size of the underlying file,
* in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
* by ltsGetFreeBlock(), and it is always greater than or equal to
* nBlocksWritten. Blocks between nBlocksWritten and nBlocksAllocated are
* blocks that have been allocated for a tape, but have not been written
* to the underlying file yet. nHoleBlocks tracks the total number of
* blocks that are in unused holes between worker spaces following BufFile
* concatenation.
*/
long nBlocksAllocated; /* # of blocks allocated */
long nBlocksWritten; /* # of blocks used in underlying file */
long nHoleBlocks; /* # of "hole" blocks left */
/*
* We store the numbers of recycled-and-available blocks in freeBlocks[],
* organized as a min heap so that the lowest free block is handed out
* first. When there are no such blocks, we extend the underlying file.
*
* If forgetFreeSpace is true then any freed blocks are simply forgotten
* rather than being remembered in freeBlocks[]. See notes for
* LogicalTapeSetForgetFreeSpace().
*/
bool forgetFreeSpace; /* true if we no longer remember free blocks */
long *freeBlocks; /* resizable array holding minheap */
long nFreeBlocks; /* # of currently free blocks */
Size freeBlocksLen; /* current allocated length of freeBlocks[] */
/* The array of logical tapes. */
int nTapes; /* # of logical tapes in set */
LogicalTape *tapes; /* has nTapes entries */
};
/*
* Prototypes for file-local helpers: block-level I/O on the underlying
* file, free-block bookkeeping, worker tape concatenation, and per-tape
* initialization.
*/
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
SharedFileSet *fileset);
static void ltsInitTape(LogicalTape *lt);
static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
/*
 * Return a pstrdup()'d copy of the name of the BufFile backing this tape
 * set, or NULL if the tape set has no underlying file.
 */
char *
LogicalTapeGetBufFilename(const LogicalTapeSet *lts)
{
	if (lts->pfile == NULL)
		return NULL;

	return pstrdup(BufFileGetFilename(lts->pfile));
}
/*
 * Write a block-sized buffer to the specified block of the underlying file.
 *
 * lts:      tape set whose underlying file is written
 * blocknum: block number (in BLCKSZ units) to write to
 * buffer:   BLCKSZ bytes of data to write
 *
 * No need for an error return convention; we ereport() on any error.
 */
static void
ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
{
	/*
	 * BufFile does not support "holes", so if we're about to write a block
	 * that's past the current end of file, fill the space between the current
	 * end of file and the target block with zeros.
	 *
	 * This should happen rarely, otherwise you are not writing very
	 * sequentially.  In current use, this only happens when the sort ends
	 * writing a run, and switches to another tape.  The last block of the
	 * previous tape isn't flushed to disk until the end of the sort, so you
	 * get one-block hole, where the last block of the previous tape will
	 * later go.
	 *
	 * Note that BufFile concatenation can leave "holes" in BufFile between
	 * worker-owned block ranges.  These are tracked for reporting purposes
	 * only.  We never read from nor write to these hole blocks, and so they
	 * are not considered here.
	 */
	if (blocknum > lts->nBlocksWritten)
	{
		PGAlignedBlock zerobuf;

		/* The filler block never changes, so zero it only once. */
		MemSet(zerobuf.data, 0, sizeof(zerobuf));

		/*
		 * Each recursive call appends exactly one block at the current end
		 * of file and advances nBlocksWritten, so this loop terminates and
		 * the recursion never goes more than one level deep.
		 */
		while (blocknum > lts->nBlocksWritten)
			ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
	}

	/* Write the requested block */
	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to block %ld of temporary file",
						blocknum)));
	BufFileWrite(lts->pfile, buffer, BLCKSZ);

	/* Update nBlocksWritten, if we extended the file */
	if (blocknum == lts->nBlocksWritten)
		lts->nBlocksWritten++;
}
/*
 * Fetch one BLCKSZ-sized block, identified by its block number in the
 * underlying file, into 'buffer'.
 *
 * Failures (seek error, short read) are reported with ereport(ERROR), so
 * there is no error return.  Callers are expected to ask only for blocks
 * that are known to exist.
 */
static void
ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
{
	size_t		got;

	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to block %ld of temporary file",
						blocknum)));

	got = BufFileRead(lts->pfile, buffer, BLCKSZ);
	if (got == BLCKSZ)
		return;					/* full block read, all good */

	ereport(ERROR,
			(errcode_for_file_access(),
			 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
					blocknum, got, (size_t) BLCKSZ)));
}
/*
 * Refill the tape's read buffer from the start, pulling in as many
 * consecutive blocks of the tape as fit.
 *
 * Each block is read directly behind the payload of the previous one, so
 * the trailers get overwritten and the buffer ends up holding contiguous
 * tape data.  Blocks of unfrozen tapes are released for reuse as soon as
 * they have been read.
 *
 * Returns true if any data was loaded, false if the tape is at EOF.
 */
static bool
ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
	lt->pos = 0;
	lt->nbytes = 0;

	for (;;)
	{
		char	   *dest = lt->buffer + lt->nbytes;
		long		tapeblock = lt->nextBlockNumber;
		long		fileblock;

		/* No further block on this tape: EOF */
		if (tapeblock == -1L)
			break;

		/* Translate to a file block number (worker offset, for leader) */
		fileblock = tapeblock + lt->offsetBlockNumber;

		ltsReadBlock(lts, fileblock, (void *) dest);
		if (!lt->frozen)
			ltsReleaseBlock(lts, fileblock);
		lt->curBlockNumber = tapeblock;

		lt->nbytes += TapeBlockGetNBytes(dest);

		if (TapeBlockIsLast(dest))
		{
			/* That was the final block; remember we hit EOF */
			lt->nextBlockNumber = -1L;
			break;
		}
		lt->nextBlockNumber = TapeBlockGetTrailer(dest)->next;

		/* Stop once there is no room left for another whole block */
		if (lt->buffer_size - lt->nbytes <= BLCKSZ)
			break;
	}

	return lt->nbytes > 0;
}
/*
 * Exchange the heap entries at indexes 'a' and 'b'.
 *
 * The temporary has the heap's element type (long), so values are moved
 * without a round trip through an unsigned type.
 */
static inline void
swap_nodes(long *heap, unsigned long a, unsigned long b)
{
	long		tmp = heap[a];

	heap[a] = heap[b];
	heap[b] = tmp;
}
/*
 * Index of the left child of heap node 'i' in an array-based binary heap.
 */
static inline unsigned long
left_offset(unsigned long i)
{
	unsigned long doubled = i + i;

	return doubled + 1;
}
/*
 * Index of the right child of heap node 'i' in an array-based binary heap.
 *
 * The parameter is unsigned long, matching left_offset() and
 * parent_offset(), so that a large index is neither truncated on the way
 * in nor subject to wraparound at the width of plain 'unsigned'.
 */
static inline unsigned long
right_offset(unsigned long i)
{
	return 2 * i + 2;
}
/*
 * Index of the parent of heap node 'i' in an array-based binary heap.
 * Only meaningful for i > 0; the root has no parent.
 */
static inline unsigned long
parent_offset(unsigned long i)
{
	unsigned long below = i - 1;

	return below >> 1;
}
/*
 * Select the lowest currently unused block by taking the first element from
 * the freelist min heap.
 *
 * If the freelist is empty, a brand-new block number is allocated at the
 * end of the file's allocated range instead (nBlocksAllocated is advanced).
 *
 * Returns the chosen block number.
 */
static long
ltsGetFreeBlock(LogicalTapeSet *lts)
{
	long	   *heap = lts->freeBlocks;
	long		blocknum;
	unsigned long heapsize;
	unsigned long pos;

	/* freelist empty; allocate a new block */
	if (lts->nFreeBlocks == 0)
		return lts->nBlocksAllocated++;

	/* a single entry needs no heap maintenance */
	if (lts->nFreeBlocks == 1)
	{
		lts->nFreeBlocks--;
		return lts->freeBlocks[0];
	}

	/* take top of minheap */
	blocknum = heap[0];

	/* replace with end of minheap array */
	heap[0] = heap[--lts->nFreeBlocks];

	/*
	 * Sift down.  heapsize uses the same unsigned type as the node indexes,
	 * so the bounds checks below compare like with like.
	 */
	pos = 0;
	heapsize = (unsigned long) lts->nFreeBlocks;
	while (true)
	{
		unsigned long left = left_offset(pos);
		unsigned long right = right_offset(pos);
		unsigned long min_child;

		/*
		 * Pick the smaller child.  Since right == left + 1, a node can have
		 * a left child without a right one, but never the reverse.
		 */
		if (left < heapsize && right < heapsize)
			min_child = (heap[left] < heap[right]) ? left : right;
		else if (left < heapsize)
			min_child = left;
		else
			break;				/* reached a leaf */

		/* heap property restored? */
		if (heap[min_child] >= heap[pos])
			break;

		swap_nodes(heap, min_child, pos);
		pos = min_child;
	}

	return blocknum;
}
/*
 * Hand out the lowest block number held in the tape's preallocation list.
 *
 * When the list is exhausted it is refilled from the tape set's free
 * blocks.  The list is created with TAPE_WRITE_PREALLOC_MIN entries and
 * doubles on each subsequent refill until it holds
 * TAPE_WRITE_PREALLOC_MAX entries.
 */
static long
ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
{
	int			slot;

	/* The array is in descending order, so the lowest block is last. */
	if (lt->nprealloc > 0)
	{
		lt->nprealloc--;
		return lt->prealloc[lt->nprealloc];
	}

	/* List is empty: create it, or grow it if it may still grow. */
	if (lt->prealloc == NULL)
	{
		lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
		lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
	}
	else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
	{
		lt->prealloc_size = Min(lt->prealloc_size * 2,
								TAPE_WRITE_PREALLOC_MAX);
		lt->prealloc = (long *) repalloc(lt->prealloc,
										 sizeof(long) * lt->prealloc_size);
	}

	/*
	 * Refill from the back of the array towards the front.  Free blocks
	 * arrive lowest-first, which yields the descending order we want.
	 */
	lt->nprealloc = lt->prealloc_size;
	for (slot = lt->nprealloc - 1; slot >= 0; slot--)
	{
		lt->prealloc[slot] = ltsGetFreeBlock(lts);

		/* verify descending order */
		Assert(slot == lt->nprealloc - 1 ||
			   lt->prealloc[slot] > lt->prealloc[slot + 1]);
	}

	lt->nprealloc--;
	return lt->prealloc[lt->nprealloc];
}
/*
 * Return a block# to the freelist.
 *
 * The block number is inserted into the freeBlocks[] min heap, enlarging
 * the array if needed.  Nothing is recorded if the tape set has been told
 * to forget free space, or if the freelist cannot be enlarged any further
 * without exceeding MaxAllocSize; in those cases the block is simply
 * leaked.
 */
static void
ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
{
	long	   *heap;
	unsigned long pos;

	/*
	 * Do nothing if we're no longer interested in remembering free space.
	 */
	if (lts->forgetFreeSpace)
		return;

	/*
	 * Enlarge freeBlocks array if full.
	 */
	if (lts->nFreeBlocks >= lts->freeBlocksLen)
	{
		/*
		 * If the freelist becomes very large, just return and leak this free
		 * block.  The limit applies to the size of the array in bytes, so
		 * the element count must be scaled by sizeof(long) before comparing
		 * it with MaxAllocSize.
		 */
		if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
			return;

		lts->freeBlocksLen *= 2;
		lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
											lts->freeBlocksLen * sizeof(long));
	}

	heap = lts->freeBlocks;
	pos = lts->nFreeBlocks;

	/* place entry at end of minheap array */
	heap[pos] = blocknum;
	lts->nFreeBlocks++;

	/* sift up */
	while (pos != 0)
	{
		unsigned long parent = parent_offset(pos);

		if (heap[parent] < heap[pos])
			break;

		swap_nodes(heap, parent, pos);
		pos = parent;
	}
}
/*
 * Claim ownership of a set of logical tapes from existing shared BufFiles.
 *
 * lts:     leader's tape set; tapes 0 .. nTapes-2 become the worker tapes,
 *          the last tape is the leader's own and is left untouched
 * shared:  per-worker metadata, indexed by worker number
 * fileset: shared file set from which each worker's BufFile is opened
 *
 * Caller should be leader process.  Though tapes are marked as frozen in
 * workers, they are not frozen when opened within leader, since unfrozen tapes
 * use a larger read buffer.  (Frozen tapes have smaller read buffer, optimized
 * for random access.)
 */
static void
ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
					 SharedFileSet *fileset)
{
	LogicalTape *lt = NULL;
	long		tapeblocks = 0L;
	long		nphysicalblocks = 0L;
	int			i;

	/* Should have at least one worker tape, plus leader's tape */
	Assert(lts->nTapes >= 2);

	/*
	 * Build concatenated view of all BufFiles, remembering the block number
	 * where each source file begins.  No changes are needed for leader/last
	 * tape.
	 */
	for (i = 0; i < lts->nTapes - 1; i++)
	{
		char		filename[MAXPGPATH];
		BufFile    *file;
		int64		filesize;

		lt = &lts->tapes[i];

		/* Each worker's file is named after its worker number */
		pg_itoa(i, filename);
		file = BufFileOpenShared(fileset, filename);
		filesize = BufFileSize(file);

		/*
		 * Stash first BufFile, and concatenate subsequent BufFiles to that.
		 * Store block offset into each tape as we go.
		 */
		lt->firstBlockNumber = shared[i].firstblocknumber;
		if (i == 0)
		{
			lts->pfile = file;
			lt->offsetBlockNumber = 0L;
		}
		else
		{
			lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
		}
		/* Don't allocate more for read buffer than could possibly help */
		lt->max_size = Min(MaxAllocSize, filesize);
		tapeblocks = filesize / BLCKSZ;
		nphysicalblocks += tapeblocks;
	}

	/*
	 * Set # of allocated blocks, as well as # blocks written.  Use extent of
	 * new BufFile space (from 0 to end of last worker's tape space) for this.
	 * Allocated/written blocks should include space used by holes left
	 * between concatenated BufFiles.
	 */
	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
	lts->nBlocksWritten = lts->nBlocksAllocated;

	/*
	 * Compute number of hole blocks so that we can later work backwards, and
	 * instrument number of physical blocks.  We don't simply use physical
	 * blocks directly for instrumentation because this would break if we ever
	 * subsequently wrote to the leader tape.
	 *
	 * Working backwards like this keeps our options open.  If shared BufFiles
	 * ever support being written to post-export, logtape.c can automatically
	 * take advantage of that.  We'd then support writing to the leader tape
	 * while recycling space from worker tapes, because the leader tape has a
	 * zero offset (write routines won't need to have extra logic to apply an
	 * offset).
	 *
	 * The only thing that currently prevents writing to the leader tape from
	 * working is the fact that BufFiles opened using BufFileOpenShared() are
	 * read-only by definition, but that could be changed if it seemed
	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
	 * descriptor" error, so tuplesort must avoid writing to the leader tape
	 * altogether.
	 */
	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
}
/*
 * Put a per-tape struct into its initial state: an empty tape in write
 * mode, with no blocks, no buffer and no preallocation list.  The I/O
 * buffer is allocated lazily, on first use.
 */
static void
ltsInitTape(LogicalTape *lt)
{
	/* state flags */
	lt->writing = true;
	lt->frozen = false;
	lt->dirty = false;

	/* no blocks belong to the tape yet */
	lt->firstBlockNumber = -1L;
	lt->curBlockNumber = -1L;
	lt->nextBlockNumber = -1L;
	lt->offsetBlockNumber = 0L;

	/* buffer is not allocated yet */
	lt->buffer = NULL;
	lt->buffer_size = 0;
	lt->pos = 0;
	lt->nbytes = 0;

	/* a palloc() request larger than MaxAllocSize would fail */
	lt->max_size = MaxAllocSize;

	/* empty preallocation list */
	lt->prealloc = NULL;
	lt->nprealloc = 0;
	lt->prealloc_size = 0;
}
/*
 * Allocate the tape's read buffer on demand and load the start of the tape
 * into it.
 *
 * Deferring the allocation until the tape is actually read avoids wasting
 * memory when many tapes are open at once but not all of them are active
 * between rewinding and reading.  The buffer size must already have been
 * set.
 */
static void
ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
	Assert(lt->buffer_size > 0);
	lt->buffer = palloc(lt->buffer_size);

	/* Start from an empty buffer, positioned on the tape's first block */
	lt->nbytes = 0;
	lt->pos = 0;
	lt->nextBlockNumber = lt->firstBlockNumber;

	/* Loads the first block(s); leaves the buffer empty for an empty tape */
	(void) ltsReadFillBuffer(lts, lt);
}
/*
 * Create a set of logical tapes in a temporary underlying file.
 *
 * ntapes:  number of tapes in the set (must be > 0)
 * shared:  leader only: array of per-worker tape metadata, else NULL
 * fileset: shared file set for parallel workers and the leader, else NULL
 * worker:  worker number for parallel workers, else -1
 *
 * Each tape is initialized in write state.  Serial callers pass ntapes,
 * NULL argument for shared, and -1 for worker.  Parallel worker callers
 * pass ntapes, a shared file handle, NULL shared argument, and their own
 * worker number.  Leader callers, which claim shared worker tapes here,
 * must supply non-sentinel values for all arguments except worker number,
 * which should be -1.
 *
 * Leader caller is passing back an array of metadata each worker captured
 * when LogicalTapeFreeze() was called for their final result tapes.  Passed
 * tapes array is actually sized ntapes - 1, because it includes only
 * worker tapes, whereas leader requires its own leader tape.  Note that we
 * rely on the assumption that reclaimed worker tapes will only be read
 * from once by leader, and never written to again (tapes are initialized
 * for writing, but that's only to be consistent).  Leader may not write to
 * its own tape purely due to a restriction in the shared buffile
 * infrastructure that may be lifted in the future.
 *
 * Returns the newly allocated tape set.
 */
LogicalTapeSet *
LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
					 int worker)
{
	LogicalTapeSet *lts;
	int			i;

	/*
	 * Create top-level struct including per-tape LogicalTape structs.
	 */
	Assert(ntapes > 0);
	lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
	lts->nBlocksAllocated = 0L;
	lts->nBlocksWritten = 0L;
	lts->nHoleBlocks = 0L;
	lts->forgetFreeSpace = false;
	lts->freeBlocksLen = 32;	/* reasonable initial guess */
	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
	lts->nFreeBlocks = 0;
	lts->nTapes = ntapes;
	lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));

	for (i = 0; i < ntapes; i++)
		ltsInitTape(&lts->tapes[i]);

	/*
	 * Create temp BufFile storage as required.
	 *
	 * Leader concatenates worker tapes, which requires special adjustment to
	 * final tapeset data.  Things are simpler for the worker case and the
	 * serial case, though.  They are generally very similar -- workers use a
	 * shared fileset, whereas serial sorts use a conventional serial BufFile.
	 */
	if (shared)
		ltsConcatWorkerTapes(lts, shared, fileset);
	else if (fileset)
	{
		char		filename[MAXPGPATH];
		workfile_set *work_set;

		/* The worker's file is named after its worker number */
		pg_itoa(worker, filename);
		work_set = workfile_mgr_create_set("LogicalTape", filename, false /* hold pin */);
		lts->pfile = BufFileCreateShared(fileset, filename, work_set);
	}
	else
	{
		lts->pfile = BufFileCreateTemp("LogicalTape", false);
	}

	return lts;
}
/*
 * Close a logical tape set and release all resources.
 *
 * Closes the underlying file and frees every per-tape buffer and
 * preallocation list, the tape array, the freelist, and the tape set
 * itself.  'lts' must not be used afterwards.
 */
void
LogicalTapeSetClose(LogicalTapeSet *lts)
{
	LogicalTape *lt;
	int			i;

	BufFileClose(lts->pfile);
	for (i = 0; i < lts->nTapes; i++)
	{
		lt = &lts->tapes[i];
		if (lt->buffer)
			pfree(lt->buffer);
		/* a tape that was written but never rewound still owns its list */
		if (lt->prealloc)
			pfree(lt->prealloc);
	}
	pfree(lts->tapes);
	pfree(lts->freeBlocks);
	pfree(lts);
}
/*
* Mark a logical tape set as not needing management of free space anymore.
*
* This should be called if the caller does not intend to write any more data
* into the tape set, but is reading from un-frozen tapes. Since no more
* writes are planned, remembering free blocks is no longer useful. Setting
* this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
* is not designed to handle large numbers of free blocks.
*
* The flag is only ever set here; nothing in this function clears it again.
*/
void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
{
/* From now on ltsReleaseBlock() returns without recording the block. */
lts->forgetFreeSpace = true;
}
/*
 * Write to a logical tape.
 *
 * lts:     tape set
 * tapenum: index of the tape to write to; it must be in write state
 * ptr:     data to write
 * size:    number of bytes to write
 *
 * Data is accumulated in a single-block buffer; each time the buffer fills
 * up, the block is chained to a newly allocated block and written out.
 *
 * There are no error returns; we ereport() on failure.
 */
void
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
				 void *ptr, size_t size)
{
	LogicalTape *lt;
	size_t		nthistime;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->writing);
	Assert(lt->offsetBlockNumber == 0L);

	/* Allocate data buffer and first block on first write */
	if (lt->buffer == NULL)
	{
		lt->buffer = (char *) palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}
	if (lt->curBlockNumber == -1)
	{
		Assert(lt->firstBlockNumber == -1);
		Assert(lt->pos == 0);

		lt->curBlockNumber = ltsGetPreallocBlock(lts, lt);
		lt->firstBlockNumber = lt->curBlockNumber;

		/* the first block of a tape has no predecessor */
		TapeBlockGetTrailer(lt->buffer)->prev = -1L;
	}

	Assert(lt->buffer_size == BLCKSZ);
	while (size > 0)
	{
		if (lt->pos >= (int) TapeBlockPayloadSize)
		{
			/* Buffer full, dump it out */
			long		nextBlockNumber;

			if (!lt->dirty)
			{
				/* Hmm, went directly from reading to writing? */
				elog(ERROR, "invalid logtape state: should be dirty");
			}

			/*
			 * First allocate the next block, so that we can store it in the
			 * 'next' pointer of this block.
			 */
			nextBlockNumber = ltsGetPreallocBlock(lts, lt);

			/* set the next-pointer and dump the current block. */
			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);

			/* initialize the prev-pointer of the next block */
			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
			lt->curBlockNumber = nextBlockNumber;
			lt->pos = 0;
			lt->nbytes = 0;
		}

		/* copy as much as fits into the rest of the current block */
		nthistime = TapeBlockPayloadSize - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(lt->buffer + lt->pos, ptr, nthistime);

		lt->dirty = true;
		lt->pos += nthistime;
		if (lt->nbytes < lt->pos)
			lt->nbytes = lt->pos;
		ptr = (void *) ((char *) ptr + nthistime);
		size -= nthistime;
	}
}
/*
 * Rewind logical tape and switch from writing to reading.
 *
 * The tape must currently be in writing state, or "frozen" in read state.
 *
 * 'buffer_size' specifies how much memory to use for the read buffer.
 * Regardless of the argument, the actual amount of memory used is between
 * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ.  The given value is
 * rounded down and truncated to fit those constraints, if necessary.  If the
 * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
 * byte buffer is used.
 *
 * The read buffer itself is not allocated here; only its size is recorded,
 * and the buffer is allocated when the tape is first read.
 */
void
LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	/*
	 * Round and cap buffer_size if needed.
	 */
	if (lt->frozen)
		buffer_size = BLCKSZ;
	else
	{
		/* need at least one block */
		if (buffer_size < BLCKSZ)
			buffer_size = BLCKSZ;

		/* palloc() larger than max_size is unlikely to be helpful */
		if (buffer_size > lt->max_size)
			buffer_size = lt->max_size;

		/* round down to BLCKSZ boundary */
		buffer_size -= buffer_size % BLCKSZ;
	}

	if (lt->writing)
	{
		/*
		 * Completion of a write phase.  Flush last partial data block, and
		 * rewind for normal (destructive) read.
		 */
		if (lt->dirty)
		{
			/*
			 * As long as we've filled the buffer at least once, its contents
			 * are entirely defined from valgrind's point of view, even though
			 * contents beyond the current end point may be stale.  But it's
			 * possible - at least in the case of a parallel sort - to sort
			 * such small amount of data that we do not fill the buffer even
			 * once.  Tell valgrind that its contents are defined, so it
			 * doesn't bleat.
			 */
			VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
									  lt->buffer_size - lt->nbytes);

			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
		}
		lt->writing = false;
	}
	else
	{
		/*
		 * This is only OK if tape is frozen; we rewind for (another) read
		 * pass.
		 */
		Assert(lt->frozen);
	}

	if (lt->buffer)
		pfree(lt->buffer);

	/* the buffer is lazily allocated, but set the size here */
	lt->buffer = NULL;
	lt->buffer_size = buffer_size;

	/* free the preallocation list, and return unused block numbers */
	if (lt->prealloc != NULL)
	{
		for (int i = lt->nprealloc; i > 0; i--)
			ltsReleaseBlock(lts, lt->prealloc[i - 1]);
		pfree(lt->prealloc);
		lt->prealloc = NULL;
		lt->nprealloc = 0;
		lt->prealloc_size = 0;
	}
}
/*
 * Rewind logical tape and switch from reading to writing.
 *
 * The tape must be in read state and must not be frozen.  Its block chain
 * and buffer are discarded, so the tape starts out empty again.
 *
 * NOTE: we assume the caller has read the tape to the end; otherwise
 * untouched data will not have been freed.  We could add more code to free
 * any unread blocks, but in current usage of this module it'd be useless
 * code.
 */
void
LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	Assert(!lt->writing && !lt->frozen);
	lt->writing = true;
	lt->dirty = false;
	lt->firstBlockNumber = -1L;
	lt->curBlockNumber = -1L;
	lt->pos = 0;
	lt->nbytes = 0;

	/* drop the read buffer; the write path allocates its own on demand */
	if (lt->buffer)
		pfree(lt->buffer);
	lt->buffer = NULL;
	lt->buffer_size = 0;
}
/*
 * Read from a logical tape.
 *
 * lts:     tape set
 * tapenum: index of the tape to read from; it must be in read state
 * ptr:     destination buffer
 * size:    number of bytes requested
 *
 * Returns the number of bytes actually copied to 'ptr'.  Early EOF is
 * indicated by return value less than #bytes requested.
 */
size_t
LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
				void *ptr, size_t size)
{
	LogicalTape *lt;
	size_t		nread = 0;
	size_t		nthistime;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(!lt->writing);

	/* the read buffer is allocated on first use */
	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	while (size > 0)
	{
		if (lt->pos >= lt->nbytes)
		{
			/* Try to load more data into buffer. */
			if (!ltsReadFillBuffer(lts, lt))
				break;			/* EOF */
		}

		/* hand out whatever the buffer holds, up to the amount requested */
		nthistime = lt->nbytes - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(ptr, lt->buffer + lt->pos, nthistime);

		lt->pos += nthistime;
		ptr = (void *) ((char *) ptr + nthistime);
		size -= nthistime;
		nread += nthistime;
	}

	return nread;
}
/*
 * "Freeze" the contents of a tape so that it can be read multiple times
 * and/or read backwards.  Once a tape is frozen, its contents will not
 * be released until the LogicalTapeSet is destroyed.  This is expected
 * to be used only for the final output pass of a merge.
 *
 * This *must* be called just at the end of a write pass, before the
 * tape is rewound (after rewind is too late!).  It performs a rewind
 * and switch to read mode "for free".  An immediately following rewind-
 * for-read call is OK but not necessary.
 *
 * share output argument is set with details of storage used for tape after
 * freezing, which may be passed to LogicalTapeSetCreate within leader
 * process later.  This metadata is only of interest to worker callers
 * freezing their final output for leader (single materialized tape).
 * Serial sorts should set share to NULL.
 */
void
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->writing);
	Assert(lt->offsetBlockNumber == 0L);

	/*
	 * Completion of a write phase.  Flush last partial data block, and rewind
	 * for nondestructive read.
	 */
	if (lt->dirty)
	{
		/*
		 * As long as we've filled the buffer at least once, its contents are
		 * entirely defined from valgrind's point of view, even though
		 * contents beyond the current end point may be stale.  But it's
		 * possible - at least in the case of a parallel sort - to sort such
		 * small amount of data that we do not fill the buffer even once. Tell
		 * valgrind that its contents are defined, so it doesn't bleat.
		 */
		VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
								  lt->buffer_size - lt->nbytes);

		TapeBlockSetNBytes(lt->buffer, lt->nbytes);
		ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
	}
	lt->writing = false;
	lt->frozen = true;

	/*
	 * The seek and backspace functions assume a single block read buffer.
	 * That's OK with current usage.  A larger buffer is helpful to make the
	 * read pattern of the backing file look more sequential to the OS, when
	 * we're reading from multiple tapes.  But at the end of a sort, when a
	 * tape is frozen, we only read from a single tape anyway.
	 */
	if (!lt->buffer || lt->buffer_size != BLCKSZ)
	{
		if (lt->buffer)
			pfree(lt->buffer);
		lt->buffer = palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}

	/* Read the first block, or reset if tape is empty */
	lt->curBlockNumber = lt->firstBlockNumber;
	lt->pos = 0;
	lt->nbytes = 0;

	if (lt->firstBlockNumber == -1L)
	{
		/*
		 * Nothing was ever written to this tape, so there is no block to
		 * read back; asking for block -1 would only fail.  Leave the buffer
		 * empty, and mark it as having no predecessor so that a backspace
		 * stops at the beginning of the tape.
		 */
		lt->nextBlockNumber = -1L;
		TapeBlockGetTrailer(lt->buffer)->prev = -1L;
	}
	else
	{
		ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
		if (TapeBlockIsLast(lt->buffer))
			lt->nextBlockNumber = -1L;
		else
			lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
		lt->nbytes = TapeBlockGetNBytes(lt->buffer);
	}

	/* Handle extra steps when caller is to share its tapeset */
	if (share)
	{
		BufFileExportShared(lts->pfile);
		share->firstblocknumber = lt->firstBlockNumber;
	}
}
/*
 * Add additional tapes to this tape set.  Not intended to be used when any
 * tapes are frozen.
 *
 * lts:         tape set to grow
 * nAdditional: number of tapes to add
 *
 * The tape array is reallocated, so pointers into the old array must not be
 * kept across this call.  The new tapes are initialized like those of a
 * freshly created tape set (see ltsInitTape()).
 */
void
LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
{
	int			i;
	int			nTapesOrig = lts->nTapes;

	lts->nTapes += nAdditional;

	lts->tapes = (LogicalTape *) repalloc(lts->tapes,
										  lts->nTapes * sizeof(LogicalTape));

	/* only the newly added slots need initializing */
	for (i = nTapesOrig; i < lts->nTapes; i++)
		ltsInitTape(&lts->tapes[i]);
}
/*
 * Backspace the tape a given number of bytes.  (We also support a more
 * general seek interface, see below.)
 *
 * *Only* a frozen-for-read tape can be backed up; we don't support
 * random access during write, and an unfrozen read tape may have
 * already discarded the desired data!
 *
 * Returns the number of bytes backed up.  It can be less than the
 * requested amount, if there isn't that much data before the current
 * position.  The tape is positioned to the beginning of the tape in
 * that case.
 *
 * Raises ERROR if the block chain is found to be inconsistent while
 * walking backwards (a block with no predecessor that is not the tape's
 * first block, or a predecessor whose 'next' link does not point back at
 * the block we came from).
 */
size_t
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
{
	LogicalTape *lt;
	size_t		seekpos = 0;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->frozen);
	Assert(lt->buffer_size == BLCKSZ);

	/* Make sure there is a read buffer to inspect before touching it. */
	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	/*
	 * Easy case for seek within current block.
	 */
	if (size <= (size_t) lt->pos)
	{
		lt->pos -= (int) size;
		return size;
	}

	/*
	 * Not-so-easy case, have to walk back the chain of blocks.  This
	 * implementation would be pretty inefficient for long seeks, but we
	 * really aren't doing that (a seek over one tuple is typical).
	 */
	seekpos = (size_t) lt->pos; /* part within this block */
	while (size > seekpos)
	{
		long		prev = TapeBlockGetTrailer(lt->buffer)->prev;

		if (prev == -1L)
		{
			/* Tried to back up beyond the beginning of tape. */
			if (lt->curBlockNumber != lt->firstBlockNumber)
				elog(ERROR, "unexpected end of tape");
			lt->pos = 0;
			return seekpos;
		}

		ltsReadBlock(lts, prev, (void *) lt->buffer);

		/* Cross-check the forward link of the block we just loaded. */
		if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
			elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
				 prev,
				 TapeBlockGetTrailer(lt->buffer)->next,
				 lt->curBlockNumber);

		lt->nbytes = TapeBlockPayloadSize;
		lt->curBlockNumber = prev;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;

		seekpos += TapeBlockPayloadSize;
	}

	/*
	 * 'seekpos' can now be greater than 'size', because it points to the
	 * beginning of the target block.  The difference is the position within
	 * the page.
	 */
	lt->pos = (int) (seekpos - size);
	return size;
}
/*
 * Seek to an arbitrary position in a logical tape.
 *
 * *Only* a frozen-for-read tape can be seeked.
 *
 * Must be called with a block/offset previously returned by
 * LogicalTapeTell().
 *
 * If 'blocknum' is not the block currently held in the tape's buffer, that
 * block is read in first.  Raises ERROR if 'offset' is larger than the
 * number of valid bytes recorded for the target block.
 */
void
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
				long blocknum, int offset)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->frozen);
	Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
	Assert(lt->buffer_size == BLCKSZ);

	/* Make sure there is a read buffer to load the target block into. */
	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	/* Switch blocks only when the target differs from the loaded one. */
	if (blocknum != lt->curBlockNumber)
	{
		ltsReadBlock(lts, blocknum, (void *) lt->buffer);
		lt->curBlockNumber = blocknum;
		lt->nbytes = TapeBlockPayloadSize;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
	}

	if (offset > lt->nbytes)
		elog(ERROR, "invalid tape seek position");
	lt->pos = offset;
}
/*
 * Obtain current position in a form suitable for a later LogicalTapeSeek.
 *
 * The position is returned through the output parameters: '*blocknum'
 * receives the tape's current block number and '*offset' the current
 * position within that block's buffer.
 *
 * NOTE: it'd be OK to do this during write phase with intention of using
 * the position for a seek after freezing.  Not clear if anyone needs that.
 */
void
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
				long *blocknum, int *offset)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	/* Make sure the tape has a buffer before reporting a position in it. */
	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	Assert(lt->offsetBlockNumber == 0L);

	/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
	Assert(lt->buffer_size == BLCKSZ);

	*blocknum = lt->curBlockNumber;
	*offset = lt->pos;
}
/*
 * Report how much disk space a LogicalTapeSet currently occupies, measured
 * in blocks.
 *
 * The figure is the set's allocated-block counter (nBlocksAllocated) minus
 * its hole-block counter (nHoleBlocks).
 */
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
	long		blocksInUse = lts->nBlocksAllocated - lts->nHoleBlocks;

	return blocksInUse;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦