greenplumn cdbendpoint source code
File path: /src/backend/cdb/endpoint/cdbendpoint.c
/*-------------------------------------------------------------------------
* cdbendpoint.c
*
* An endpoint is a query result source for a parallel retrieve cursor on a
* dedicated QE. One parallel retrieve cursor could have multiple endpoints
* on different QEs to allow retrieving in parallel.
*
* This file implements the sender part of an endpoint.
*
* Endpoints may exist on the coordinator or segments, depending on the query
* of the PARALLEL RETRIEVE CURSOR:
* (1) An endpoint is on the QD only if the result of the parallel cursor's
* query needs to be finally gathered by the master, e.g.:
* > DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM T1 ORDER BY C1;
* (2) The endpoints are on specific segments if direct dispatch happens,
* e.g.:
* > DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM T1 WHERE C1=1;
* (3) The endpoints are on all segments, e.g.:
* > DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM T1;
*
* When a parallel retrieve cursor is declared, the query plan will be
* dispatched to the corresponding QEs. Before the query execution, endpoints
* will be created first on the QEs. An instance of the Endpoint struct in
* shared memory represents the endpoint. Through the Endpoint, the client can
* learn the endpoint's identification (endpoint name), location (dbid, host,
* port and session id), and the state of the retrieve session. All of this
* information can be obtained on the QD by the UDF gp_get_endpoints(), which
* dispatches endpoint queries, or in a QE's retrieve session by the UDF
* gp_get_segment_endpoints().
*
* Instead of returning the query result to the QD through a normal dest
* receiver, an endpoint writes the results through a TQueueDestReceiver into a
* shared memory message queue, from which a different process can retrieve
* them. See SetupEndpointExecState(). The information about the message queue
* is also stored in the Endpoint so that the retrieve session on the same QE
* can find it.
*
* The authentication token is stored in a separate structure,
* EndpointTokenEntry, so that the token is the same for all backends within
* the same session under the same postmaster. The token is created on each QE
* after the plan gets dispatched.
*
* DECLARE returns only when the endpoint and token are ready and the query
* has started execution. See WaitEndpointsReady().
*
* When the query finishes, the endpoint won't be destroyed immediately since
* we may still want to check its state on the QD. In the implementation,
* DestroyEndpointExecState() blocks until the parallel retrieve cursor is
* closed explicitly through a CLOSE statement or an error happens.
*
* The UDF gp_wait_parallel_retrieve_cursor() is supplied as a helper function
* to monitor the retrieve state. It should be run in the declaring
* transaction block on the QD.
*
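* A typical end-to-end flow, as a rough sketch (the RETRIEVE statement
* syntax, the gp_retrieve_conn session option, and the arguments of
* gp_wait_parallel_retrieve_cursor() shown below are assumptions and may
* differ between versions):
*
* -- On the QD, inside a transaction block:
* > BEGIN;
* > DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM T1;
* > SELECT * FROM gp_get_endpoints();  -- endpoint names, hosts, auth token
*
* -- In a retrieve session connected to the target segment
* -- (e.g. started with the gp_retrieve_conn=true session option):
* > RETRIEVE ALL FROM ENDPOINT <endpoint name>;
*
* -- Back on the QD:
* > SELECT * FROM gp_wait_parallel_retrieve_cursor('c1', <timeout>);
* > CLOSE c1;
* > COMMIT;
*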
* Copyright (c) 2020-Present VMware, Inc. or its affiliates
*
* IDENTIFICATION
* src/backend/cdb/endpoint/cdbendpoint.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/session.h"
#include "access/tupdesc.h"
#include "access/xact.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "libpq-fe.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/procsignal.h"
#include "utils/backend_cancel.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbendpoint.h"
#include "cdbendpoint_private.h"
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#define WAIT_ENDPOINT_TIMEOUT_MS 100
/*
* The size of the endpoint tuple queue in bytes.
* This value follows the upstream PARALLEL_TUPLE_QUEUE_SIZE.
*/
#define ENDPOINT_TUPLE_QUEUE_SIZE 65536
#define SHMEM_ENDPOINTS_ENTRIES "SharedMemoryEndpointEntries"
#define SHMEM_ENPOINTS_SESSION_INFO "EndpointsSessionInfosHashtable"
#ifdef FAULT_INJECTOR
#define DUMMY_ENDPOINT_NAME "DUMMYENDPOINTNAME"
#define DUMMY_CURSOR_NAME "DUMMYCURSORNAME"
#endif
static EndpointExecState * CurrentEndpointExecState;
typedef struct EndpointTokenTag
{
int sessionID;
Oid userID;
} EndpointTokenTag;
/*
* EndpointTokenHash is located in shared memory on each segment for
* authentication purposes.
*/
typedef struct EndpointTokenEntry
{
EndpointTokenTag tag;
/* The auth token for this session. */
int8 token[ENDPOINT_TOKEN_ARR_LEN];
/* How many endpoints refer to this entry. */
uint16 refCount;
} EndpointTokenEntry;
/* Shared hash table for session infos */
static HTAB *EndpointTokenHash = NULL;
/* Point to Endpoint entries in shared memory */
static struct EndpointData *sharedEndpoints = NULL;
/* Init helper functions */
static void InitSharedEndpoints(void);
/* Token utility functions */
static const int8 *create_endpoint_token(void);
/* Endpoint helper functions */
static Endpoint *alloc_endpoint(const char *cursorName, dsm_handle dsmHandle);
static void free_endpoint(Endpoint *endpoint);
static void create_and_connect_mq(TupleDesc tupleDesc,
dsm_segment **mqSeg /* out */ ,
shm_mq_handle **mqHandle /* out */ );
static void detach_mq(dsm_segment *dsmSeg);
static void setup_endpoint_token_entry(void);
static void wait_receiver(void);
static void unset_endpoint_sender_pid(Endpoint *endPoint);
static void abort_endpoint(void);
static void wait_parallel_retrieve_close(void);
/*
* Calculate the shared memory size for PARALLEL RETRIEVE CURSOR execution.
*/
Size
EndpointShmemSize(void)
{
Size size;
size = MAXALIGN(mul_size(MAX_ENDPOINT_SIZE, sizeof(struct EndpointData)));
/*
* Maximum parallel retrieve cursor session number should be no more than
* the maximum endpoint number, so use MAX_ENDPOINT_SIZE here.
*/
size = add_size(
size, hash_estimate_size(MAX_ENDPOINT_SIZE, sizeof(EndpointTokenEntry)));
return size;
}
/*
* Initialize shared memory for PARALLEL RETRIEVE CURSOR.
*/
void
EndpointShmemInit(void)
{
bool found;
HASHCTL hctl;
sharedEndpoints = (Endpoint *)
ShmemInitStruct(SHMEM_ENDPOINTS_ENTRIES,
MAXALIGN(mul_size(MAX_ENDPOINT_SIZE, sizeof(struct EndpointData))),
&found);
if (!found)
InitSharedEndpoints();
MemSet(&hctl, 0, sizeof(hctl));
hctl.keysize = sizeof(EndpointTokenTag);
hctl.entrysize = sizeof(EndpointTokenEntry);
hctl.hash = tag_hash;
EndpointTokenHash =
ShmemInitHash(SHMEM_ENPOINTS_SESSION_INFO, MAX_ENDPOINT_SIZE,
MAX_ENDPOINT_SIZE, &hctl, HASH_ELEM | HASH_FUNCTION);
}
/*
* Initialize shared memory Endpoint array.
*/
static void
InitSharedEndpoints()
{
Endpoint *endpoints = sharedEndpoints;
for (int i = 0; i < MAX_ENDPOINT_SIZE; ++i)
{
endpoints[i].name[0] = '\0';
endpoints[i].cursorName[0] = '\0';
endpoints[i].databaseID = InvalidOid;
endpoints[i].senderPid = InvalidPid;
endpoints[i].receiverPid = InvalidPid;
endpoints[i].mqDsmHandle = DSM_HANDLE_INVALID;
endpoints[i].sessionDsmHandle = DSM_HANDLE_INVALID;
endpoints[i].sessionID = InvalidEndpointSessionId;
endpoints[i].userID = InvalidOid;
endpoints[i].state = ENDPOINTSTATE_INVALID;
endpoints[i].empty = true;
InitSharedLatch(&endpoints[i].ackDone);
}
}
/*
* Get the endpoint location. Currently used in EXPLAIN only.
*/
enum EndPointExecPosition
GetParallelCursorEndpointPosition(PlannedStmt *plan)
{
if (plan->planTree->flow->flotype == FLOW_SINGLETON)
{
if (plan->planTree->flow->locustype == CdbLocusType_SegmentGeneral)
return ENDPOINT_ON_SINGLE_QE;
else
return ENDPOINT_ON_ENTRY_DB;
}
else if (plan->slices[0].directDispatch.isDirectDispatch &&
plan->slices[0].directDispatch.contentIds != NULL)
{
return ENDPOINT_ON_SOME_QE;
}
else
return ENDPOINT_ON_ALL_QE;
}
/*
* The QD waits until the cursor is ready for retrieving on the related segments.
*/
void
WaitEndpointsReady(EState *estate)
{
Assert(estate);
CdbDispatcherState *ds = estate->dispatcherState;
cdbdisp_checkDispatchAckMessage(ds, ENDPOINT_READY_ACK_MSG, -1);
check_parallel_retrieve_cursor_errors(estate);
}
/*
* Get or create an authentication token for the current session.
*/
static const int8 *
create_endpoint_token(void)
{
static int sessionId = InvalidEndpointSessionId;
static int8 currentToken[ENDPOINT_TOKEN_ARR_LEN] = {0};
/* Generate a new token only if gp_session_id has changed */
if (sessionId != gp_session_id)
{
sessionId = gp_session_id;
if (!pg_strong_random(currentToken, ENDPOINT_TOKEN_ARR_LEN))
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("failed to generate a new random token for session id %d", sessionId)));
}
return currentToken;
}
/*
* Send an acknowledgement message to the QD.
*/
void
EndpointNotifyQD(const char *message)
{
NotifyMyFrontEnd(CDB_NOTIFY_ENDPOINT_ACK, message, MyProcPid);
pq_flush();
}
/*
* Allocate and initialize an endpoint and then create a dest receiver for
* PARALLEL RETRIEVE CURSOR. The dest receiver is based on the shm_mq mechanism
* that is also used by upstream parallel workers.
*/
void
SetupEndpointExecState(TupleDesc tupleDesc, const char *cursorName,
CmdType operation, DestReceiver **endpointDest)
{
shm_mq_handle *shmMqHandle;
allocEndpointExecState();
/*
* The message queue needs to be created first since the dsm_handle has to
* be ready when creating the Endpoint entry.
*/
create_and_connect_mq(tupleDesc, &(CurrentEndpointExecState->dsmSeg), &shmMqHandle);
/*
* Allocate the endpoint and set it as the active one for the sender.
*/
CurrentEndpointExecState->endpoint =
alloc_endpoint(cursorName, dsm_segment_handle(CurrentEndpointExecState->dsmSeg));
setup_endpoint_token_entry();
CurrentEndpointExecState->dest = CreateTupleQueueDestReceiver(shmMqHandle);
(CurrentEndpointExecState->dest->rStartup)(CurrentEndpointExecState->dest, operation, tupleDesc);
*endpointDest = CurrentEndpointExecState->dest;
}
/*
* Wait until the endpoint finishes and then clean up.
*
* Even if the queue is large enough to hold all the tuples to send, the
* sender must wait for a receiver to attach to the message queue before
* detaching it. If the queue gets detached before a receiver attaches, it
* can never be attached by a receiver afterwards.
*
* All other endpoint info should also be cleaned up here.
*/
void
DestroyEndpointExecState()
{
DestReceiver *endpointDest = CurrentEndpointExecState->dest;
Assert(CurrentEndpointExecState->endpoint);
Assert(CurrentEndpointExecState->dsmSeg);
/*
* Wait for the receiver to start retrieving tuples. The ackDone latch will
* be reset for re-use when retrieving finishes. See notify_sender() callers.
*/
wait_receiver();
/*
* tqueueShutdownReceiver() (the rShutdown callback) calls shm_mq_detach(),
* so it needs to be called before detach_mq(). The retrieving session will
* set the ackDone latch again after shm_mq_detach() is called here.
*/
(*endpointDest->rShutdown) (endpointDest);
(*endpointDest->rDestroy) (endpointDest);
CurrentEndpointExecState->dest = NULL;
/*
* Wait until all data is retrieved by the receiver. This is needed because
* even after the endpoint has written all data into the shared message
* queue, the retrieve session may not have received it all yet.
*/
wait_receiver();
LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
unset_endpoint_sender_pid(CurrentEndpointExecState->endpoint);
LWLockRelease(ParallelCursorEndpointLock);
/* Notify QD */
EndpointNotifyQD(ENDPOINT_FINISHED_ACK_MSG);
/*
* Once all data has been sent, park the process and wait for the QD to
* close the cursor. The purpose is to not clean up the Endpoint entry until
* CLOSE/COMMIT/ABORT (i.e. until PortalCleanup gets executed), so the user
* can still see the finished endpoint status through the gp_get_endpoints()
* UDF. This is needed because the pg_cursors view can still see the
* PARALLEL RETRIEVE CURSOR.
*/
wait_parallel_retrieve_close();
LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
free_endpoint(CurrentEndpointExecState->endpoint);
LWLockRelease(ParallelCursorEndpointLock);
CurrentEndpointExecState->endpoint = NULL;
detach_mq(CurrentEndpointExecState->dsmSeg);
CurrentEndpointExecState->dsmSeg = NULL;
CurrentEndpointExecState = NULL;
}
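/*
 * The following is a minimal illustrative sketch (not part of the original
 * file, and not compiled: it is guarded by the hypothetical
 * ENDPOINT_USAGE_SKETCH macro) of how SetupEndpointExecState() and
 * DestroyEndpointExecState() above are assumed to be driven on a QE: set up
 * the shm_mq-backed dest receiver, run the cursor's plan into it, then block
 * in DestroyEndpointExecState() until the retrieve session drains the queue
 * and the QD closes the cursor.
 */
#ifdef ENDPOINT_USAGE_SKETCH
static void
endpoint_sender_sketch(TupleDesc tupleDesc, const char *cursorName)
{
	DestReceiver *endpointDest = NULL;

	/* Create the Endpoint entry, the message queue and the dest receiver. */
	SetupEndpointExecState(tupleDesc, cursorName, CMD_SELECT, &endpointDest);

	/*
	 * Hypothetical placeholder: execute the cursor's plan, pushing every
	 * result tuple into endpointDest (a TQueueDestReceiver).
	 */
	run_plan_into_dest(endpointDest);

	/* Wait for the receiver, notify the QD, and free the Endpoint entry. */
	DestroyEndpointExecState();
}
#endif							/* ENDPOINT_USAGE_SKETCH */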
/*
* Allocate an Endpoint entry in shared memory.
*
* cursorName - the parallel retrieve cursor name.
* dsmHandle - dsm handle of shared memory message queue.
*/
static Endpoint *
alloc_endpoint(const char *cursorName, dsm_handle dsmHandle)
{
int i;
int foundIdx = -1;
Endpoint *ret = NULL;
dsm_handle session_dsm_handle;
session_dsm_handle = GetSessionDsmHandle();
if (session_dsm_handle == DSM_HANDLE_INVALID)
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("failed to create the per-session DSM segment.")));
LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
#ifdef FAULT_INJECTOR
/* inject fault "skip" to set end-point shared memory slot full */
if (SIMPLE_FAULT_INJECTOR("alloc_endpoint_slot_full") == FaultInjectorTypeSkip)
{
for (i = 0; i < MAX_ENDPOINT_SIZE; ++i)
{
if (sharedEndpoints[i].empty)
{
/* pretend to set a valid endpoint */
snprintf(sharedEndpoints[i].name, NAMEDATALEN, "%s",
DUMMY_ENDPOINT_NAME);
snprintf(sharedEndpoints[i].cursorName, NAMEDATALEN, "%s",
DUMMY_CURSOR_NAME);
sharedEndpoints[i].databaseID = MyDatabaseId;
sharedEndpoints[i].mqDsmHandle = DSM_HANDLE_INVALID;
sharedEndpoints[i].sessionDsmHandle = DSM_HANDLE_INVALID;
sharedEndpoints[i].sessionID = gp_session_id;
sharedEndpoints[i].userID = GetSessionUserId();
sharedEndpoints[i].senderPid = InvalidPid;
sharedEndpoints[i].receiverPid = InvalidPid;
sharedEndpoints[i].empty = false;
}
}
}
if (SIMPLE_FAULT_INJECTOR("alloc_endpoint_slot_full_reset") == FaultInjectorTypeSkip)
{
for (i = 0; i < MAX_ENDPOINT_SIZE; ++i)
{
if (endpoint_name_equals(sharedEndpoints[i].name,
DUMMY_ENDPOINT_NAME))
{
sharedEndpoints[i].mqDsmHandle = DSM_HANDLE_INVALID;
sharedEndpoints[i].empty = true;
}
}
}
#endif
/* find an available slot */
for (i = 0; i < MAX_ENDPOINT_SIZE; ++i)
{
if (sharedEndpoints[i].empty)
{
foundIdx = i;
break;
}
}
if (foundIdx == -1)
ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("failed to allocate endpoint for session id %d", gp_session_id)));
generate_endpoint_name(sharedEndpoints[i].name, cursorName);
StrNCpy(sharedEndpoints[i].cursorName, cursorName, NAMEDATALEN);
sharedEndpoints[i].databaseID = MyDatabaseId;
sharedEndpoints[i].sessionID = gp_session_id;
sharedEndpoints[i].userID = GetSessionUserId();
sharedEndpoints[i].senderPid = MyProcPid;
sharedEndpoints[i].receiverPid = InvalidPid;
sharedEndpoints[i].state = ENDPOINTSTATE_READY;
sharedEndpoints[i].empty = false;
sharedEndpoints[i].mqDsmHandle = dsmHandle;
sharedEndpoints[i].sessionDsmHandle = session_dsm_handle;
OwnLatch(&sharedEndpoints[i].ackDone);
ret = &sharedEndpoints[i];
LWLockRelease(ParallelCursorEndpointLock);
return ret;
}
/*
* Create and setup the shared memory message queue.
*
* Create a dsm which contains a TOC (table of contents). It has 3 parts:
* 1. Tuple's TupleDesc length.
* 2. Tuple's TupleDesc.
* 3. Shared memory message queue.
*/
static void
create_and_connect_mq(TupleDesc tupleDesc, dsm_segment **mqSeg /* out */ ,
shm_mq_handle **mqHandle /* out */ )
{
shm_toc *toc;
shm_mq *mq;
shm_toc_estimator tocEst;
Size tocSize;
int tupdescLen;
char *tupdescSer;
char *tdlenSpace;
char *tupdescSpace;
TupleDescNode *node = makeNode(TupleDescNode);
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: create and setup the shared memory message queue");
/* Serialize TupleDesc */
node->natts = tupleDesc->natts;
node->tuple = tupleDesc;
tupdescSer =
serializeNode((Node *) node, &tupdescLen, NULL /* uncompressed_size */ );
/* Estimate the dsm size */
shm_toc_initialize_estimator(&tocEst);
shm_toc_estimate_chunk(&tocEst, sizeof(tupdescLen));
shm_toc_estimate_chunk(&tocEst, tupdescLen);
shm_toc_estimate_chunk(&tocEst, ENDPOINT_TUPLE_QUEUE_SIZE);
shm_toc_estimate_keys(&tocEst, 3);
tocSize = shm_toc_estimate(&tocEst);
/* Create dsm and initialize toc. */
*mqSeg = dsm_create(tocSize, 0);
/* Make sure the dsm sticks around up until session exit */
dsm_pin_mapping(*mqSeg);
toc = shm_toc_create(ENDPOINT_MSG_QUEUE_MAGIC, dsm_segment_address(*mqSeg),
tocSize);
tdlenSpace = shm_toc_allocate(toc, sizeof(tupdescLen));
memcpy(tdlenSpace, &tupdescLen, sizeof(tupdescLen));
shm_toc_insert(toc, ENDPOINT_KEY_TUPLE_DESC_LEN, tdlenSpace);
tupdescSpace = shm_toc_allocate(toc, tupdescLen);
memcpy(tupdescSpace, tupdescSer, tupdescLen);
shm_toc_insert(toc, ENDPOINT_KEY_TUPLE_DESC, tupdescSpace);
mq = shm_mq_create(shm_toc_allocate(toc, ENDPOINT_TUPLE_QUEUE_SIZE),
ENDPOINT_TUPLE_QUEUE_SIZE);
shm_toc_insert(toc, ENDPOINT_KEY_TUPLE_QUEUE, mq);
shm_mq_set_sender(mq, MyProc);
*mqHandle = shm_mq_attach(mq, *mqSeg, NULL);
if (*mqHandle == NULL)
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("attach to endpoint shared message queue failed")));
}
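/*
 * A minimal sketch (not part of the original file; guarded by the
 * hypothetical ENDPOINT_USAGE_SKETCH macro) of how a retrieve session is
 * assumed to attach to the queue laid out by create_and_connect_mq() above:
 * attach the dsm by the handle recorded in the Endpoint, look up the three
 * TOC entries, rebuild the TupleDesc, and attach to the shm_mq as the
 * receiver. The real receiver-side logic lives in the retrieve part of the
 * endpoint code; this is only an illustrative approximation.
 */
#ifdef ENDPOINT_USAGE_SKETCH
static shm_mq_handle *
attach_endpoint_mq_sketch(dsm_handle handle, TupleDesc *tupleDescOut)
{
	dsm_segment *mqSeg = dsm_attach(handle);
	shm_toc    *toc = shm_toc_attach(ENDPOINT_MSG_QUEUE_MAGIC,
									 dsm_segment_address(mqSeg));
	int		   *tdlen = (int *) shm_toc_lookup(toc, ENDPOINT_KEY_TUPLE_DESC_LEN, false);
	char	   *tdser = (char *) shm_toc_lookup(toc, ENDPOINT_KEY_TUPLE_DESC, false);
	shm_mq	   *mq = (shm_mq *) shm_toc_lookup(toc, ENDPOINT_KEY_TUPLE_QUEUE, false);
	TupleDescNode *node;

	/* Rebuild the TupleDesc that the sender serialized with serializeNode(). */
	node = (TupleDescNode *) deserializeNode(tdser, *tdlen);
	*tupleDescOut = node->tuple;

	/* Attach to the queue as the receiver end. */
	shm_mq_set_receiver(mq, MyProc);
	return shm_mq_attach(mq, mqSeg, NULL);
}
#endif							/* ENDPOINT_USAGE_SKETCH */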
/*
* Create/reuse EndpointTokenEntry for current session in shared memory.
* EndpointTokenEntry is used for authentication in the retrieve sessions.
*/
static void
setup_endpoint_token_entry()
{
EndpointTokenEntry *infoEntry = NULL;
bool found = false;
EndpointTokenTag tag;
const int8 *token = NULL;
tag.sessionID = gp_session_id;
tag.userID = GetSessionUserId();
LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
infoEntry = (EndpointTokenEntry *) hash_search(EndpointTokenHash, &tag, HASH_ENTER, &found);
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: Finish endpoint init. Found EndpointTokenEntry? %d", found);
/*
* Save the token if this is the first time we create an endpoint in the
* current session. One session maps to exactly one token.
*/
if (!found)
{
token = create_endpoint_token();
memcpy(infoEntry->token, token, ENDPOINT_TOKEN_ARR_LEN);
infoEntry->refCount = 0;
}
infoEntry->refCount++;
Assert(infoEntry->refCount <= MAX_ENDPOINT_SIZE);
LWLockRelease(ParallelCursorEndpointLock);
}
/*
* Check whether the connection to the QD is still alive.
*/
static bool
checkQDConnectionAlive()
{
ssize_t ret;
char buf;
Assert(MyProcPort != NULL);
if (MyProcPort->sock < 0)
return false;
#ifndef WIN32
ret = recv(MyProcPort->sock, &buf, 1, MSG_PEEK | MSG_DONTWAIT);
#else
ret = recv(MyProcPort->sock, &buf, 1, MSG_PEEK | MSG_PARTIAL);
#endif
if (ret == 0) /* socket has been closed. EOF */
return false;
if (ret > 0) /* data waiting on socket, it must be OK. */
return true;
if (ret == -1) /* error, or would block. */
{
if (errno == EAGAIN || errno == EINPROGRESS)
return true; /* connection intact, no data available */
else
return false;
}
/* not reached */
return true;
}
/*
* wait_receiver - wait for the receiver to retrieve at least once from the
* shared memory message queue.
*
* Even if the queue is attached only by the sender and is large enough for
* all tuples, the sender should wait for the receiver, because if the sender
* detaches from the queue first, the queue becomes unavailable to the receiver.
*/
static void
wait_receiver(void)
{
EndpointExecState * state = CurrentEndpointExecState;
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: wait receiver");
while (true)
{
int wr = 0;
CHECK_FOR_INTERRUPTS();
if (QueryFinishPending)
break;
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: sender wait latch in wait_receiver()");
wr = WaitLatchOrSocket(&state->endpoint->ackDone,
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT | WL_SOCKET_READABLE,
MyProcPort->sock,
WAIT_ENDPOINT_TIMEOUT_MS,
PG_WAIT_PARALLEL_RETRIEVE_CURSOR);
if (wr & WL_SOCKET_READABLE)
{
if (!checkQDConnectionAlive())
{
ereport(LOG,
(errmsg("CDB_ENDPOINT: sender found that the connection to QD is broken: %m")));
abort_endpoint();
proc_exit(0);
}
}
if (wr & WL_POSTMASTER_DEATH)
{
abort_endpoint();
ereport(LOG,
(errmsg("CDB_ENDPOINT: postmaster exit, close shared memory message queue")));
proc_exit(0);
}
if (wr & WL_LATCH_SET)
{
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: sender reset latch in wait_receiver()");
ResetLatch(&state->endpoint->ackDone);
break;
}
}
}
/*
* Detach the shared memory message queue.
* This should happen after freeing the endpoint, otherwise
* endpoint->mqDsmHandle would become an invalid handle.
*/
static void
detach_mq(dsm_segment *dsmSeg)
{
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: sender message queue detaching. '%p'",
(void *) dsmSeg);
Assert(dsmSeg);
dsm_detach(dsmSeg);
}
/*
* Unset endpoint sender pid.
*
* Clear the sender pid in the Endpoint entry when the endpoint finishes its
* job or aborts.
* Needs to be called with exclusive lock on ParallelCursorEndpointLock.
*/
static void
unset_endpoint_sender_pid(Endpoint *endpoint)
{
Assert(endpoint);
Assert(!endpoint->empty);
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: unset endpoint sender pid");
/*
* Only the endpoint QE/entry DB executes this function to unset the sender
* pid. The sender pid in the Endpoint entry must be MyProcPid or InvalidPid.
*/
Assert(MyProcPid == endpoint->senderPid ||
endpoint->senderPid == InvalidPid);
Assert(!am_cursor_retrieve_handler);
endpoint->senderPid = InvalidPid;
}
/*
* abort_endpoint - xact abort routine for endpoint
*/
static void
abort_endpoint(void)
{
EndpointExecState * state = CurrentEndpointExecState;
if (state->dest)
{
/*
* The rShutdown callback calls shm_mq_detach(), so it needs to be called
* before detach_mq() to clean up.
*/
DestReceiver *endpointDest = state->dest;
(*endpointDest->rShutdown) (endpointDest);
(*endpointDest->rDestroy) (endpointDest);
state->dest = NULL;
}
if (state->endpoint)
{
LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
/*
* These two calls should happen within the same locked section, so that the
* retriever's abort path will not perform extra work.
*/
unset_endpoint_sender_pid(state->endpoint);
free_endpoint(state->endpoint);
LWLockRelease(ParallelCursorEndpointLock);
/* Notify QD */
EndpointNotifyQD(ENDPOINT_FINISHED_ACK_MSG);
state->endpoint = NULL;
}
/*
* During xact abort, make sure the endpoint cleanup is done first. If
* detach_mq() were called to detach the message queue first, the retriever
* might read NULL from the message queue and then mark itself down.
*
* So here, make sure the retrieve abort is signalled before the endpoint
* detaches the message queue.
*/
if (state->dsmSeg)
{
detach_mq(state->dsmSeg);
state->dsmSeg = NULL;
}
}
/*
* Wait for PARALLEL RETRIEVE CURSOR cleanup after the endpoint sends all data.
*
* Once all data has been sent, park the process and wait for the QD to close
* the cursor. The purpose is to not clean up the Endpoint entry until
* CLOSE/COMMIT/ABORT (i.e. until PortalCleanup gets executed).
*/
static void
wait_parallel_retrieve_close(void)
{
ResetLatch(&MyProc->procLatch);
while (true)
{
int wr;
CHECK_FOR_INTERRUPTS();
if (QueryFinishPending || QueryCancelPending)
break;
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: wait for parallel retrieve cursor close");
wr = WaitLatchOrSocket(&MyProc->procLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT | WL_SOCKET_READABLE,
MyProcPort->sock,
WAIT_ENDPOINT_TIMEOUT_MS,
PG_WAIT_PARALLEL_RETRIEVE_CURSOR);
if (wr & WL_POSTMASTER_DEATH)
{
ereport(LOG,
(errmsg("CDB_ENDPOINT: postmaster exit, close shared memory message queue")));
proc_exit(0);
}
if (wr & WL_SOCKET_READABLE)
{
if (!checkQDConnectionAlive())
{
ereport(LOG,
(errmsg("CDB_ENDPOINT: sender found that the connection to QD is broken: %m")));
proc_exit(0);
}
}
if (wr & WL_LATCH_SET)
ResetLatch(&MyProc->procLatch);
}
}
/*
* free_endpoint - Frees the given endpoint.
*
* Needs to be called with exclusive lock on ParallelCursorEndpointLock.
*/
static void
free_endpoint(Endpoint *endpoint)
{
EndpointTokenTag tag;
EndpointTokenEntry *infoEntry = NULL;
bool found;
Assert(endpoint);
Assert(!endpoint->empty);
elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: free endpoint '%s'", endpoint->name);
endpoint->databaseID = InvalidOid;
endpoint->mqDsmHandle = DSM_HANDLE_INVALID;
endpoint->sessionDsmHandle = DSM_HANDLE_INVALID;
endpoint->empty = true;
MemSet(endpoint->name, 0, NAMEDATALEN);
MemSet(endpoint->cursorName, 0, NAMEDATALEN);
ResetLatch(&endpoint->ackDone);
DisownLatch(&endpoint->ackDone);
tag.sessionID = endpoint->sessionID;
tag.userID = endpoint->userID;
infoEntry = (EndpointTokenEntry *) hash_search(
EndpointTokenHash, &tag, HASH_FIND, &found);
Assert(found);
infoEntry->refCount--;
if (infoEntry->refCount == 0)
hash_search(EndpointTokenHash, &tag, HASH_REMOVE, NULL);
endpoint->sessionID = InvalidEndpointSessionId;
endpoint->userID = InvalidOid;
}
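/*
 * Return the Endpoint entry in the shared memory array by the given index.
 */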
Endpoint *
get_endpointdesc_by_index(int index)
{
Assert(index > -1 && index < MAX_ENDPOINT_SIZE);
return &sharedEndpoints[index];
}
/*
* find_endpoint - Find the endpoint by the given endpoint name and session id.
*
* For the endpoint (sender), the session_id is gp_session_id, since it is the
* same session that created the parallel retrieve cursor.
* For the retriever, the session_id is looked up by the token when performing
* the authentication.
*
* The caller is responsible for acquiring ParallelCursorEndpointLock lock.
*/
Endpoint *
find_endpoint(const char *endpointName, int sessionID)
{
Endpoint *res = NULL;
Assert(endpointName && strlen(endpointName) > 0);
Assert(LWLockHeldByMe(ParallelCursorEndpointLock));
Assert(sessionID != InvalidEndpointSessionId);
for (int i = 0; i < MAX_ENDPOINT_SIZE; ++i)
{
if (!sharedEndpoints[i].empty &&
sharedEndpoints[i].sessionID == sessionID &&
endpoint_name_equals(sharedEndpoints[i].name, endpointName) &&
sharedEndpoints[i].databaseID == MyDatabaseId)
{
res = &sharedEndpoints[i];
break;
}
}
return res;
}
/*
* Find the token in the hash table based on the given session id and user.
*/
void
get_token_from_session_hashtable(int sessionId, Oid userID, int8 *token /* out */ )
{
EndpointTokenEntry *infoEntry = NULL;
EndpointTokenTag tag;
tag.sessionID = sessionId;
tag.userID = userID;
LWLockAcquire(ParallelCursorEndpointLock, LW_SHARED);
infoEntry = (EndpointTokenEntry *) hash_search(EndpointTokenHash, &tag,
HASH_FIND, NULL);
if (infoEntry == NULL)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("token for user id: %u, session: %d doesn't exist",
tag.userID, sessionId)));
memcpy(token, infoEntry->token, ENDPOINT_TOKEN_ARR_LEN);
LWLockRelease(ParallelCursorEndpointLock);
}
/*
* Get the corresponding session id by the given token.
*/
int
get_session_id_from_token(Oid userID, const int8 *token)
{
int sessionId = InvalidEndpointSessionId;
EndpointTokenEntry *infoEntry = NULL;
HASH_SEQ_STATUS status;
LWLockAcquire(ParallelCursorEndpointLock, LW_SHARED);
hash_seq_init(&status, EndpointTokenHash);
while ((infoEntry = (EndpointTokenEntry *) hash_seq_search(&status)) != NULL)
{
if (endpoint_token_hex_equals(infoEntry->token, token) &&
userID == infoEntry->tag.userID)
{
sessionId = infoEntry->tag.sessionID;
hash_seq_term(&status);
break;
}
}
LWLockRelease(ParallelCursorEndpointLock);
return sessionId;
}
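/*
 * A minimal sketch (not part of the original file; guarded by the
 * hypothetical ENDPOINT_USAGE_SKETCH macro) of how the retrieve side is
 * assumed to combine the lookups above: map (user, token) to a session id,
 * then locate the endpoint by name within that session while holding the
 * endpoint lock.
 */
#ifdef ENDPOINT_USAGE_SKETCH
static Endpoint *
locate_endpoint_for_retrieve_sketch(Oid userID, const int8 *token,
									const char *endpointName)
{
	Endpoint   *endpoint;
	int			sessionID = get_session_id_from_token(userID, token);

	if (sessionID == InvalidEndpointSessionId)
		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
						errmsg("no session matches the given token")));

	LWLockAcquire(ParallelCursorEndpointLock, LW_SHARED);
	endpoint = find_endpoint(endpointName, sessionID);
	LWLockRelease(ParallelCursorEndpointLock);

	/*
	 * Note: in real code the caller would keep the lock (or re-check) while
	 * using the returned entry, since it points into shared memory.
	 */
	return endpoint;
}
#endif							/* ENDPOINT_USAGE_SKETCH */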
/*
* Called during xaction abort.
*/
void
AtAbort_EndpointExecState()
{
EndpointExecState *state = CurrentEndpointExecState;
if (state != NULL)
{
abort_endpoint();
pfree(state);
CurrentEndpointExecState = NULL;
}
}
/* allocate new EndpointExecState and set it to CurrentEndpointExecState */
void
allocEndpointExecState()
{
EndpointExecState *endpointExecState;
MemoryContext oldcontext;
/* Previous endpoint estate should be cleaned up. */
Assert(!CurrentEndpointExecState);
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
endpointExecState = palloc0(sizeof(EndpointExecState));
CurrentEndpointExecState = endpointExecState;
MemoryContextSwitchTo(oldcontext);
}