greenplumn cdbappendonlyxlog 源码
greenplumn cdbappendonlyxlog 代码
文件路径:/src/backend/cdb/cdbappendonlyxlog.c
/*-------------------------------------------------------------------------
 *
 * cdbappendonlyxlog.c
 *
 * Portions Copyright (c) 2009-2010, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbappendonlyxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/file.h>
#include "access/aomd.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "cdb/cdbappendonlyxlog.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "utils/faultinjector.h"
#include "utils/faultinjector_lists.h"
/*
 * Insert an AO XLOG/AOCO record.
 *
 * This is also used with 0 length, to mark creation of a new segfile.
 */
void
xlog_ao_insert(RelFileNode relFileNode, int32 segmentFileNum,
			   int64 offset, void *buffer, int32 bufferLen)
{
	xl_ao_insert	xlaoinsert;
	xlaoinsert.target.node = relFileNode;
	xlaoinsert.target.segment_filenum = segmentFileNum;
	xlaoinsert.target.offset = offset;
	XLogBeginInsert();
	XLogRegisterData((char*) &xlaoinsert, SizeOfAOInsert);
	if (bufferLen != 0)
		XLogRegisterData((char*) buffer, bufferLen);
	SIMPLE_FAULT_INJECTOR("xlog_ao_insert");
	XLogInsert(RM_APPEND_ONLY_ID, XLOG_APPENDONLY_INSERT);
	wait_to_avoid_large_repl_lag();
}
static void
ao_insert_replay(XLogReaderState *record)
{
	char	   *dbPath;
	char		path[MAXPGPATH];
	int			written_len;
	File		file;
	int			fileFlags;
	xl_ao_insert *xlrec = (xl_ao_insert *) XLogRecGetData(record);
	char	   *buffer = (char *) xlrec + SizeOfAOInsert;
	uint32		len = XLogRecGetDataLen(record) - SizeOfAOInsert;
	dbPath = GetDatabasePath(xlrec->target.node.dbNode,
							 xlrec->target.node.spcNode);
	if (xlrec->target.segment_filenum == 0)
		snprintf(path, MAXPGPATH, "%s/%u", dbPath, xlrec->target.node.relNode);
	else
		snprintf(path, MAXPGPATH, "%s/%u.%u", dbPath, xlrec->target.node.relNode, xlrec->target.segment_filenum);
	pfree(dbPath);
	fileFlags = O_RDWR | PG_BINARY;
	/* When writing from the beginning of the file, it might not exist yet. Create it. */
	if (xlrec->target.offset == 0)
		fileFlags |= O_CREAT;
	file = PathNameOpenFile(path, fileFlags);
	if (file < 0)
	{
		XLogAOSegmentFile(xlrec->target.node, xlrec->target.segment_filenum);
		return;
	}
	written_len = FileWrite(file, buffer, len, xlrec->target.offset,
							WAIT_EVENT_COPY_FILE_WRITE);
	if (written_len < 0 || written_len != len)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("failed to write %d bytes in file \"%s\": %m",
						len,
						path)));
	}
	register_dirty_segment_ao(xlrec->target.node,
							  xlrec->target.segment_filenum,
							  file);
	FileClose(file);
}
/*
 * AO/CO truncate xlog record insertion.
 */
void xlog_ao_truncate(RelFileNode relFileNode, int32 segmentFileNum, int64 offset)
{
	xl_ao_truncate	xlaotruncate;
	xlaotruncate.target.node = relFileNode;
	xlaotruncate.target.segment_filenum = segmentFileNum;
	xlaotruncate.target.offset = offset;
	XLogBeginInsert();
	XLogRegisterData((char*) &xlaotruncate, sizeof(xl_ao_truncate));
	XLogInsert(RM_APPEND_ONLY_ID, XLOG_APPENDONLY_TRUNCATE);
}
static void
ao_truncate_replay(XLogReaderState *record)
{
	char	   *dbPath;
	char		path[MAXPGPATH];
	File		file;
	xl_ao_truncate *xlrec = (xl_ao_truncate*) XLogRecGetData(record);
	dbPath = GetDatabasePath(xlrec->target.node.dbNode,
							 xlrec->target.node.spcNode);
	if (xlrec->target.segment_filenum == 0)
		snprintf(path, MAXPGPATH, "%s/%u", dbPath, xlrec->target.node.relNode);
	else
		snprintf(path, MAXPGPATH, "%s/%u.%u", dbPath, xlrec->target.node.relNode, xlrec->target.segment_filenum);
	pfree(dbPath);
	dbPath = NULL;
	file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
	if (file < 0)
	{
		/*
		 * Primary creates the file first and then writes the xlog record for
		 * the creation for AO tables similar to heap.  Hence, file can get
		 * created on primary without writing xlog record if failure happens
		 * on primary just after creating the file. This creates situation
		 * where VACUUM can generate truncate record based on aoseg entry with
		 * eof 0 and file present on primary. Then during replay mirror may
		 * not have the file, as was never created on mirror. So, avoid adding
		 * the entry to invalid hash table for truncate at offset zero
		 * (EOF=0).  This avoids mirror PANIC, as anyways truncate to zero is
		 * same as file not present.
		 */
		if (xlrec->target.offset != 0)
			XLogAOSegmentFile(xlrec->target.node, xlrec->target.segment_filenum);
		return;
	}
	if (FileTruncate(file, xlrec->target.offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
	{
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("failed to truncate file \"%s\" to offset:" INT64_FORMAT " : %m",
						path, xlrec->target.offset)));
	}
	FileClose(file);
}
void
appendonly_redo(XLogReaderState *record)
{
	uint8         info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
	/*
	 * Do not perform redo of AO XLOG records for crash recovery mode. We do
	 * not need to replay AO XLOG records in this case because fsync
	 * is performed on file close.
	 */
	if (IsCrashRecoveryOnly())
		return;
	switch (info)
	{
		case XLOG_APPENDONLY_INSERT:
			ao_insert_replay(record);
			break;
		case XLOG_APPENDONLY_TRUNCATE:
			ao_truncate_replay(record);
			break;
		default:
			elog(PANIC, "appendonly_redo: unknown code %u", info);
	}
}
相关信息
相关文章
greenplumn cdbappendonlystorageformat 源码
greenplumn cdbappendonlystorageread 源码
greenplumn cdbappendonlystoragewrite 源码
greenplumn cdbbufferedappend 源码
greenplumn cdbdistributedsnapshot 源码
                        
                            0
                        
                        
                             赞
                        
                    
                    
                热门推荐
- 
                        2、 - 优质文章
- 
                        3、 gate.io
- 
                        8、 openharmony
- 
                        9、 golang