greenplumn gpcloud 源码

  • 2022-08-18
  • 浏览 (538)

greenplumn gpcloud 代码

文件路径:/gpcontrib/gpcloud/src/gpcloud.cpp

extern "C" {
#include "postgres.h"

/*
 * Spinlocks are in PostgreSQL defined with the register storage class,
 * which is perfectly legal C but which generates a warning when compiled
 * as C++. Ignore this warning as it's a false positive in this case.
 */
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-register"
#endif

#include "access/external.h"
#include "access/extprotocol.h"
#include "access/xact.h"
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/resowner.h"

#ifdef __clang__
#pragma clang diagnostic pop
#endif

/* Do the module magic dance */
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(s3_export);
PG_FUNCTION_INFO_V1(s3_import);

Datum s3_export(PG_FUNCTION_ARGS);
Datum s3_import(PG_FUNCTION_ARGS);
}

#include "gpreader.h"
#include "gpwriter.h"

string s3extErrorMessage;

/*
 * To detect the query interruption event (triggered by user), we should
 * consider both QueryCancelPending variable and transaction status. Here
 * QueryCancelPending is not sufficient because it will be reset before the
 * extprotocol last call, then it is hard to distinguish normal exit/finish
 * from abnormal transaction abort.
 *
 * IsAbortInProgress() can also be reset by GPDB while downloading file, we
 * must save the state
 */
static bool queryCancelFlag = false;
bool S3QueryIsAbortInProgress(void) {
    bool queryIsBeingCancelled = QueryCancelPending || IsAbortInProgress();

    // We need a thread-safe way to query and set queryCancelFlag.
    // queryCancelFlag is set to TRUE for the first time when cancel signal is
    // detected. If cancel signal is already captured, value will not be
    // swapped and queryIsCancelledAlready is true.
    bool queryIsCancelledAlready =
        !__sync_bool_compare_and_swap(&queryCancelFlag, false, queryIsBeingCancelled);

    return queryIsBeingCancelled || queryIsCancelledAlready;
}

void *S3Alloc(size_t size) {
    return palloc(size);
}

/*
 * MemoryContext will be free automatically when query is completed or aborted.
 * So no need to call pfree() again unless it is really needed inside query execution.
 * Otherwise might free already cleaned memory context for failed query.
 *
 * Currently download/upload buffer are using memory from MemoryContext.
 */
void S3Free(void *p) {
    // pfree(p);
}

/*
 * Set up the thread signal mask, we don't want to run our signal handlers
 * in downloading and uploading threads.
 */
void MaskThreadSignals() {
    sigset_t sigs;

    if (pthread_equal(main_tid, pthread_self())) {
        S3ERROR("thread_mask is called from main thread!");
        return;
    }

    sigemptyset(&sigs);

    /* make our thread to ignore these signals (which should allow that they be
     * delivered to the main thread) */
    sigaddset(&sigs, SIGHUP);
    sigaddset(&sigs, SIGINT);
    sigaddset(&sigs, SIGTERM);
    sigaddset(&sigs, SIGALRM);
    sigaddset(&sigs, SIGUSR1);
    sigaddset(&sigs, SIGUSR2);

    pthread_sigmask(SIG_BLOCK, &sigs, NULL);
}

/*
 * Detect data format
 * used to set file extension on S3 in gpwriter.
 */
static const char *getFormatStr(FunctionCallInfo fcinfo) {
    Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo);
    ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
    char fmtcode = exttbl->fmtcode;

    if (fmttype_is_text(fmtcode)) return "txt";
    if (fmttype_is_csv(fmtcode)) return "csv";
    return S3_DEFAULT_FORMAT;
}

bool hasHeader = false;

char eolString[EOL_CHARS_MAX_LEN + 1] = "\n";  // LF by default

static void parseFormatOpts(FunctionCallInfo fcinfo) {
    Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo);
    ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);

    const char fmtcode = exttbl->fmtcode;
    const List *fmtopts = exttbl->options;
    char *newline_str = NULL;

    // only TEXT and CSV have detailed options
    if (fmttype_is_csv(fmtcode) || fmttype_is_text(fmtcode)) {
        ListCell *option;

        foreach (option, fmtopts) {
            DefElem *defel = (DefElem *)lfirst(option);
            if (strcmp(defel->defname, "header") == 0) {
                hasHeader = defGetBoolean(defel);
            } else if (strcmp(defel->defname, "newline") == 0) {
                newline_str = defGetString(defel);
            }
        }

        // default end-of-line terminator
        eolString[0] = '\n';
        eolString[1] = '\0';

        if (newline_str != NULL) {
            if (pg_strcasecmp(newline_str, "crlf") == 0) {
                eolString[0] = '\r';
                eolString[1] = '\n';
                eolString[2] = '\0';
            } else if (pg_strcasecmp(newline_str, "cr") == 0) {
                eolString[0] = '\r';
                eolString[1] = '\0';
            } else if (pg_strcasecmp(newline_str, "lf") == 0) {
                eolString[0] = '\n';
                eolString[1] = '\0';
            } else {  // should never come here
	      ereport(ERROR, errmsg("invalid value for NEWLINE (%s), "
				    "valid options are: 'LF', 'CRLF', 'CR'",
				    newline_str));
            }
        }
    }
}

typedef struct gpcloudResHandle {
    GPReader *gpreader;
    GPWriter *gpwriter;

    ResourceOwner owner; /* owner of this handle */

    struct gpcloudResHandle *next;
    struct gpcloudResHandle *prev;
} gpcloudResHandle;

// Linked list of opened "handles", which are allocated in TopMemoryContext, and tracked by resource
// owners.
static gpcloudResHandle *openedResHandles;

static bool isGpcloudResReleaseCallbackRegistered;

static gpcloudResHandle *createGpcloudResHandle(void) {
    gpcloudResHandle *resHandle;

    resHandle = (gpcloudResHandle *)MemoryContextAlloc(TopMemoryContext, sizeof(gpcloudResHandle));
    resHandle->gpreader = NULL;
    resHandle->gpwriter = NULL;

    resHandle->owner = CurrentResourceOwner;
    resHandle->next = openedResHandles;
    resHandle->prev = NULL;

    if (openedResHandles) {
        openedResHandles->prev = resHandle;
    }

    openedResHandles = resHandle;

    return resHandle;
}

static void destroyGpcloudResHandle(gpcloudResHandle *resHandle) {
    if (resHandle == NULL) return;

    /* unlink from linked list first */
    if (resHandle->prev)
        resHandle->prev->next = resHandle->next;
    else
        openedResHandles = resHandle->next;

    if (resHandle->next) {
        resHandle->next->prev = resHandle->prev;
    }

    if (resHandle->gpreader != NULL) {
        if (!reader_cleanup(&resHandle->gpreader)) {
            elog(WARNING, "Failed to cleanup gpcloud extension: %s", s3extErrorMessage.c_str());
        }
    }

    if (resHandle->gpwriter != NULL) {
        if (!writer_cleanup(&resHandle->gpwriter)) {
            elog(WARNING, "Failed to cleanup gpcloud extension: %s", s3extErrorMessage.c_str());
        }
    }

    thread_cleanup();
    pfree(resHandle);
}

/*
 * Close any open handles on abort.
 */
static void gpcloudAbortCallback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel,
                                 void *arg) {
    gpcloudResHandle *curr;
    gpcloudResHandle *next;

    if (phase != RESOURCE_RELEASE_AFTER_LOCKS) return;

    next = openedResHandles;
    while (next) {
        curr = next;
        next = curr->next;

        if (curr->owner == CurrentResourceOwner) {
            if (isCommit)
                elog(WARNING, "gpcloud external table reference leak: %p still referenced", curr);

            destroyGpcloudResHandle(curr);
        }
    }
}

/*
 * Import data into GPDB.
 * invoked by GPDB, be careful with C++ exceptions.
 */
Datum s3_import(PG_FUNCTION_ARGS) {
    /* Must be called via the external table format manager */
    if (!CALLED_AS_EXTPROTOCOL(fcinfo))
        elog(ERROR, "extprotocol_import: not called by external protocol manager");

    /* Get our internal description of the protocol */
    gpcloudResHandle *resHandle = (gpcloudResHandle *)EXTPROTOCOL_GET_USER_CTX(fcinfo);

    /* last call. destroy reader */
    if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)) {
        destroyGpcloudResHandle(resHandle);

        EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
        PG_RETURN_INT32(0);
    }

    /* first call. do any desired init */
    if (resHandle == NULL) {
        if (!isGpcloudResReleaseCallbackRegistered) {
            RegisterResourceReleaseCallback(gpcloudAbortCallback, NULL);
            isGpcloudResReleaseCallbackRegistered = true;
        }
        resHandle = createGpcloudResHandle();

        queryCancelFlag = false;
        const char *url_with_options = EXTPROTOCOL_GET_URL(fcinfo);

        // has HEADER? and newline EOL?
        parseFormatOpts(fcinfo);

        thread_setup();

        resHandle->gpreader = reader_init(url_with_options);
        if (!resHandle->gpreader) {
            ereport(ERROR, errmsg("Failed to init gpcloud extension (segid = %d, "
				  "segnum = %d), please check your "
				  "configurations and network connection: %s",
				  s3ext_segid, s3ext_segnum, s3extErrorMessage.c_str()));
        }

        EXTPROTOCOL_SET_USER_CTX(fcinfo, resHandle);
    }

    char *data_buf = EXTPROTOCOL_GET_DATABUF(fcinfo);
    int32 data_len = EXTPROTOCOL_GET_DATALEN(fcinfo);

    if (!reader_transfer_data(resHandle->gpreader, data_buf, data_len)) {
        ereport(ERROR,
                errmsg("s3_import: could not read data: %s", s3extErrorMessage.c_str()));
    }
    PG_RETURN_INT32(data_len);
}

/*
 * Export data out of GPDB.
 * invoked by GPDB, be careful with C++ exceptions.
 */
Datum s3_export(PG_FUNCTION_ARGS) {
    /* Must be called via the external table format manager */
    if (!CALLED_AS_EXTPROTOCOL(fcinfo))
        elog(ERROR, "extprotocol_import: not called by external protocol manager");

    /* Get our internal description of the protocol */
    gpcloudResHandle *resHandle = (gpcloudResHandle *)EXTPROTOCOL_GET_USER_CTX(fcinfo);

    /* last call. destroy writer */
    if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)) {
        destroyGpcloudResHandle(resHandle);

        EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
        PG_RETURN_INT32(0);
    }

    /* first call. do any desired init */
    if (resHandle == NULL) {
        if (!isGpcloudResReleaseCallbackRegistered) {
            RegisterResourceReleaseCallback(gpcloudAbortCallback, NULL);
            isGpcloudResReleaseCallbackRegistered = true;
        }
        resHandle = createGpcloudResHandle();

        queryCancelFlag = false;
        const char *url_with_options = EXTPROTOCOL_GET_URL(fcinfo);
        const char *format = getFormatStr(fcinfo);

        thread_setup();

        resHandle->gpwriter = writer_init(url_with_options, format);
        if (!resHandle->gpwriter) {
	  ereport(ERROR, errmsg("Failed to init gpcloud extension (segid = %d, "
				"segnum = %d), please check your "
				"configurations and network connection: %s",
				s3ext_segid, s3ext_segnum, s3extErrorMessage.c_str()));
        }

        EXTPROTOCOL_SET_USER_CTX(fcinfo, resHandle);
    }

    char *data_buf = EXTPROTOCOL_GET_DATABUF(fcinfo);
    int32 data_len = EXTPROTOCOL_GET_DATALEN(fcinfo);

    if (!writer_transfer_data(resHandle->gpwriter, data_buf, data_len)) {
        ereport(ERROR,
                errmsg("s3_export: could not write data: %s", s3extErrorMessage.c_str()));
    }

    PG_RETURN_INT32(data_len);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn compress_writer 源码

greenplumn decompress_reader 源码

greenplumn gpreader 源码

greenplumn gpwriter 源码

greenplumn s3bucket_reader 源码

greenplumn s3common_reader 源码

greenplumn s3common_writer 源码

greenplumn s3conf 源码

greenplumn s3http_headers 源码

greenplumn s3interface 源码

0  赞