greenplumn nodeShareInputScan source code
File path: /src/backend/executor/nodeShareInputScan.c
/*-------------------------------------------------------------------------
*
* nodeShareInputScan.c
*
* A Share Input Scan node is used to share the result of an operation in
* two different branches in the plan tree.
*
* These come in two variants: local, and cross-slice.
*
* Local shares
* ------------
*
* In local mode, all the consumers are in the same slice as the producer.
* In that case, there's no need to communicate across processes, so we
* rely entirely on data structures in backend-private memory to track the
* state.
*
* In local mode, there is no difference between producer and consumer
* nodes. In ExecInitShareInputScan(), the producer node stores the
* PlanState of the shared child node where all the nodes can find it.
* The first ExecShareInputScan() call initializes the store.
*
* A local-mode ShareInputScan is quite similar to PostgreSQL's CteScan,
* but there are some implementation differences. CteScan uses a special
* PARAM_EXEC entry to hold the shared state, while ShareInputScan uses
* an entry in es_sharenode instead.
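*
* For orientation, here is a hypothetical plan shape (roughly what
* EXPLAIN would show; not taken from any particular query). A shared
* subtree typically hangs under a Sequence node, with the producer
* branch filling the store and the other branches re-reading it:
*
*   Sequence
*     -> Shared Scan (share slice:id 0:0)        producer, fills store
*           -> Seq Scan on foo
*     -> Hash Join
*           -> Shared Scan (share slice:id 0:0)  consumer, re-reads it
*           -> ...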
*
* Cross-slice shares
* ------------------
*
* A cross-slice share works basically the same as a local one, except
* that the producing slice makes the underlying tuplestore available to
* other processes, by forcing it to be written to a file on disk. The
* first ExecShareInputScan() call in the producing slice materializes
* the whole tuplestore, and advertises that it's ready in shared memory.
* Consumer slices wait for that before trying to read the store.
*
* The producer and the consumers communicate the status of the scan using
* shared memory. There's a hash table in shared memory, containing a
* 'shareinput_Xslice_state' struct for each shared scan. The producer uses
* a condition variable to wake up consumers, when the tuplestore is fully
* materialized, and the consumers use the same condition variable to inform
* the producer when they're done reading it. The producer slice keeps the
* underlying tuplestore open, until all the consumers have finished.
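*
* A rough timeline of that handshake (a condensed sketch of the
* shareinput_* IPC functions at the bottom of this file):
*
*   producer slice                      consumer slice(s)
*   --------------                      -----------------
*   materialize tuplestore to file
*   set ready = 1, broadcast CV   --->  wake up, open shared tuplestore
*                                       read tuples ...
*   sleep on CV                   <---  increment ndone; the last one
*                                       broadcasts the CV
*   close tuplestore, delete files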
*
*
* Portions Copyright (c) 2007-2008, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/executor/nodeShareInputScan.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeShareInputScan.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/condition_variable.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "utils/faultinjector.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/tuplestore.h"
#include "port/atomics.h"
/*
* In a cross-slice ShareInputScan, the producer and consumer processes
* communicate using shared memory. There's a hash table containing one
* 'shareinput_Xslice_state' for each in-progress shared input scan.
*
* The hash table itself, and the fields within every entry, are protected
* by ShareInputScanLock. (Although some operations get away without the
* lock, when the field is atomic and/or there's only one possible writer.)
*
* All producers and consumers that participate in a shared scan hold
* a reference to the 'shareinput_Xslice_state' entry of the scan, for
* the whole lifecycle of the node from ExecInitShareInputScan() to
* ExecEndShareInputScan() (although it can be released early by
* ExecSquelchShareInputScan()). The entry in the hash table is created by
* the first participant that initializes, which is not necessarily the
* producer! When the last participant releases the entry, it is removed
* from the hash table.
*/
typedef struct shareinput_tag
{
int32 session_id;
int32 command_count;
int32 share_id;
} shareinput_tag;
typedef struct shareinput_Xslice_state
{
shareinput_tag tag; /* hash key */
int refcount; /* reference count of this entry */
pg_atomic_uint32 ready; /* is the input fully materialized and ready to be read? */
pg_atomic_uint32 ndone; /* # of consumers that have finished the scan */
/*
* ready_done_cv is used for signaling when the scan becomes "ready", and
* when it becomes "done". The producer wakes up everyone waiting on this
* condition variable when it sets ready = true. Also, when the last
* consumer finishes the scan (ndone reaches nconsumers), it wakes up the
* producer using this same condition variable.
*/
ConditionVariable ready_done_cv;
} shareinput_Xslice_state;
/* shared memory hash table holding 'shareinput_Xslice_state' entries */
static HTAB *shareinput_Xslice_hash = NULL;
/*
* The tuplestore files for all share input scans are held in one SharedFileSet.
* The SharedFileSet is attached to a single DSM segment that persists until
* postmaster shutdown. When the reference count of the SharedFileSet reaches
* zero, the SharedFileSet is automatically destroyed, but it is re-initialized
* the next time it's needed.
*
* The SharedFileSet deletes any remaining files when the reference count
* reaches zero, but we don't rely on that mechanism. All the files are
* held in the same SharedFileSet, so it cannot be recycled until all
* ShareInputScans in the system have finished, which might never happen if
* new queries are started continuously. The shareinput_Xslice_state entries
* are reference counted separately, and we clean up the files backing each
* individual ShareInputScan whenever its reference count reaches zero.
*/
static dsm_handle *shareinput_Xslice_dsm_handle_ptr;
static SharedFileSet *shareinput_Xslice_fileset;
/*
* 'shareinput_reference' represents a reference or "lease" to an entry
* in the shared memory hash table. It is used for garbage collection of
* the entries, on transaction abort.
*
* These are allocated in TopMemoryContext, and held in the
* 'shareinput_Xslice_refs' list.
*/
typedef struct shareinput_Xslice_reference
{
int share_id;
shareinput_Xslice_state *xslice_state;
ResourceOwner owner;
dlist_node node;
} shareinput_Xslice_reference;
static dlist_head shareinput_Xslice_refs = DLIST_STATIC_INIT(shareinput_Xslice_refs);
static bool shareinput_resowner_callback_registered = false;
/*
* For local (i.e. intra-slice) variants, we use a 'shareinput_local_state'
* to track the status. It is analogous to the 'shareinput_Xslice_state' used for
* cross-slice scans, but we don't need to keep it in shared memory. These
* are held in estate->es_sharenode, indexed by share_id.
*/
typedef struct shareinput_local_state
{
bool ready;
bool closed;
int ndone;
int nsharers;
/*
* This points to the child node that's being shared. Set by
* ExecInitShareInputScan() of the instance that has the child.
*/
PlanState *childState;
/* Tuplestore that holds the result */
Tuplestorestate *ts_state;
} shareinput_local_state;
static shareinput_Xslice_reference *get_shareinput_reference(int share_id);
static void release_shareinput_reference(shareinput_Xslice_reference *ref);
static void shareinput_release_callback(ResourceReleasePhase phase,
bool isCommit,
bool isTopLevel,
void *arg);
static void shareinput_writer_notifyready(shareinput_Xslice_reference *ref);
static void shareinput_reader_waitready(shareinput_Xslice_reference *ref);
static void shareinput_reader_notifydone(shareinput_Xslice_reference *ref, int nconsumers);
static void shareinput_writer_waitdone(shareinput_Xslice_reference *ref, int nconsumers);
static void ExecShareInputScanExplainEnd(PlanState *planstate, struct StringInfoData *buf);
/*
* init_tuplestore_state
* Initialize the tuplestore state for the Shared node if the state
* is not initialized.
*/
static void
init_tuplestore_state(ShareInputScanState *node)
{
EState *estate = node->ss.ps.state;
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
shareinput_local_state *local_state = node->local_state;
Tuplestorestate *ts;
int tsptrno;
TupleTableSlot *outerslot;
Assert(!node->isready);
Assert(node->ts_state == NULL);
Assert(node->ts_pos == -1);
if (sisc->cross_slice)
{
if (!node->ref)
elog(ERROR, "cannot execute ShareInputScan that was not initialized");
}
if (!local_state->ready)
{
if (currentSliceId == sisc->producer_slice_id || estate->es_plannedstmt->numSlices == 1)
{
/* We are the producer */
if (sisc->cross_slice)
{
char rwfile_prefix[100];
elog(DEBUG1, "SISC writer (shareid=%d, slice=%d): No tuplestore yet, creating tuplestore",
sisc->share_id, currentSliceId);
ts = tuplestore_begin_heap(true, /* randomAccess */
false, /* interXact */
10); /* maxKBytes FIXME */
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
tuplestore_make_shared(ts,
get_shareinput_fileset(),
rwfile_prefix);
#ifdef FAULT_INJECTOR
if (SIMPLE_FAULT_INJECTOR("sisc_xslice_temp_files") == FaultInjectorTypeSkip)
{
const char *filename = tuplestore_get_buffilename(ts);
if (!filename)
ereport(NOTICE, (errmsg("sisc_xslice: buffilename is null")));
else if (strstr(filename, "base/" PG_TEMP_FILES_DIR) == filename)
ereport(NOTICE, (errmsg("sisc_xslice: Use default tablespace")));
else if (strstr(filename, "pg_tblspc/") == filename)
ereport(NOTICE, (errmsg("sisc_xslice: Use temp tablespace")));
else
ereport(NOTICE, (errmsg("sisc_xslice: Unexpected prefix of the tablespace path")));
}
#endif
}
else
{
/* intra-slice */
ts = tuplestore_begin_heap(true, /* randomAccess */
false, /* interXact */
PlanStateOperatorMemKB((PlanState *) node));
/*
* Offer extra memory usage info for EXPLAIN ANALYZE.
*
* If this is a cross-slice share, the tuplestore uses very
* little memory, because it has to materialize the result to
* a file anyway, so that it can be shared across processes.
* In that case, reporting memory usage doesn't make much
* sense. The "work_mem wanted" value would be particularly
* nonsensical, as we would write to a file regardless of
* work_mem. So only track memory usage in the non-cross-slice
* case.
*/
if (node->ss.ps.instrument && node->ss.ps.instrument->need_cdb)
{
/* Let the tuplestore share our Instrumentation object. */
tuplestore_set_instrument(ts, node->ss.ps.instrument);
/* Request a callback at end of query. */
node->ss.ps.cdbexplainfun = ExecShareInputScanExplainEnd;
}
}
for (;;)
{
outerslot = ExecProcNode(local_state->childState);
if (TupIsNull(outerslot))
break;
tuplestore_puttupleslot(ts, outerslot);
}
if (sisc->cross_slice)
{
tuplestore_freeze(ts);
shareinput_writer_notifyready(node->ref);
}
tuplestore_rescan(ts);
}
else
{
/*
* We are a consumer slice. Wait for the producer to create the
* tuplestore.
*/
char rwfile_prefix[100];
Assert(sisc->cross_slice);
shareinput_reader_waitready(node->ref);
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
ts = tuplestore_open_shared(get_shareinput_fileset(), rwfile_prefix);
}
local_state->ts_state = ts;
local_state->ready = true;
tsptrno = 0;
}
else
{
/* Another local reader */
ts = local_state->ts_state;
tsptrno = tuplestore_alloc_read_pointer(ts, (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND));
tuplestore_select_read_pointer(ts, tsptrno);
tuplestore_rescan(ts);
}
node->ts_state = ts;
node->ts_pos = tsptrno;
node->isready = true;
}
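/*
* For orientation, the producer path through init_tuplestore_state()
* condenses to the following sequence (a summary of the code above,
* not a separate code path):
*
*    ts = tuplestore_begin_heap(true, false, maxKBytes);
*    if (sisc->cross_slice)
*        tuplestore_make_shared(ts, get_shareinput_fileset(), prefix);
*    for (;;)
*    {
*        outerslot = ExecProcNode(local_state->childState);
*        if (TupIsNull(outerslot))
*            break;
*        tuplestore_puttupleslot(ts, outerslot);
*    }
*    if (sisc->cross_slice)
*    {
*        tuplestore_freeze(ts);                     -- flush to file
*        shareinput_writer_notifyready(node->ref);  -- wake consumers
*    }
*    tuplestore_rescan(ts);                         -- rewind for reading
*/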
/* ------------------------------------------------------------------
* ExecShareInputScan
* Retrieve a tuple from the ShareInputScan
* ------------------------------------------------------------------
*/
static TupleTableSlot *
ExecShareInputScan(PlanState *pstate)
{
ShareInputScanState *node = castNode(ShareInputScanState, pstate);
ShareInputScan *sisc = (ShareInputScan *) pstate->plan;
EState *estate;
ScanDirection dir;
bool forward;
TupleTableSlot *slot;
/*
* get state info from node
*/
estate = pstate->state;
dir = estate->es_direction;
forward = ScanDirectionIsForward(dir);
if (sisc->this_slice_id != currentSliceId && estate->es_plannedstmt->numSlices != 1)
elog(ERROR, "cannot execute alien Share Input Scan");
/* if first time call, need to initialize the tuplestore state. */
if (!node->isready)
init_tuplestore_state(node);
/*
* Return NULL immediately if this node's output is to be discarded.
* This can improve performance considerably when the tuplestore is huge:
* with discard_output set, the ShareInputScan does not need to read any
* tuples from the tuplestore. discard_output is set when this
* ShareInputScan is one of a Sequence's subplans but not the last one.
*/
if (sisc->discard_output)
return NULL;
slot = node->ss.ps.ps_ResultTupleSlot;
Assert(!node->local_state->closed);
tuplestore_select_read_pointer(node->ts_state, node->ts_pos);
while(1)
{
bool gotOK;
gotOK = tuplestore_gettupleslot(node->ts_state, forward, false, slot);
if (!gotOK)
return NULL;
SIMPLE_FAULT_INJECTOR("execshare_input_next");
return slot;
}
Assert(!"should not be here");
return NULL;
}
/* ------------------------------------------------------------------
* ExecInitShareInputScan
* ------------------------------------------------------------------
*/
ShareInputScanState *
ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags)
{
ShareInputScanState *sisstate;
Plan *outerPlan;
PlanState *childState;
Assert(innerPlan(node) == NULL);
/* create state data structure */
sisstate = makeNode(ShareInputScanState);
sisstate->ss.ps.plan = (Plan *) node;
sisstate->ss.ps.state = estate;
sisstate->ss.ps.ExecProcNode = ExecShareInputScan;
sisstate->ts_state = NULL;
sisstate->ts_pos = -1;
/*
* init child node.
* if outerPlan is NULL, this is a no-op (so that the shared child node is
* only initialized once).
*/
/*
* initialize child nodes
*
* Like a Material node, we shield the child node from the need to support
* BACKWARD, or MARK/RESTORE.
*/
eflags &= ~(EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
outerPlan = outerPlan(node);
childState = ExecInitNode(outerPlan, estate, eflags);
outerPlanState(sisstate) = childState;
Assert(node->scan.plan.qual == NULL);
sisstate->ss.ps.qual = NULL;
/* Misc initialization
*
* Create expression context
*/
ExecAssignExprContext(estate, &sisstate->ss.ps);
/*
* Initialize result slot and type.
*/
ExecInitResultTupleSlotTL(&sisstate->ss.ps, &TTSOpsMinimalTuple);
sisstate->ss.ps.ps_ProjInfo = NULL;
/*
* When doing EXPLAIN only, we won't actually execute anything, so don't
* bother initializing the state. This isn't merely an optimization:
* closing a cross-slice ShareInputScan waits for the consumers to finish,
* but if we don't execute anything, it will hang forever.
*
* We could also exit here immediately if this is an "alien" node, i.e.
* a node that doesn't execute in this slice, but we can't easily
* detect that here.
*/
if ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0)
return sisstate;
shareinput_local_state *local_state;
/* expand the list if necessary */
while (list_length(estate->es_sharenode) <= node->share_id)
{
local_state = palloc0(sizeof(shareinput_local_state));
local_state->ready = false;
estate->es_sharenode = lappend(estate->es_sharenode, local_state);
}
local_state = list_nth(estate->es_sharenode, node->share_id);
/*
* Accumulate the number of CTE consumers executed in this slice.
* During squelching, the last consumer to finish in the current slice
* uses this count to wake up the corresponding CTE producer, so that
* the producer can clean up the materialized tuplestore.
*/
if (currentSliceId == node->this_slice_id &&
currentSliceId != node->producer_slice_id)
local_state->nsharers++;
if (childState)
local_state->childState = childState;
sisstate->local_state = local_state;
/* Get a lease on the shared state */
if (node->cross_slice)
sisstate->ref = get_shareinput_reference(node->share_id);
else
sisstate->ref = NULL;
return sisstate;
}
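/*
* A note on es_sharenode: it is a plain List in the per-query EState,
* grown on demand by the loop above so that
* list_nth(estate->es_sharenode, share_id) always resolves. Every
* ShareInputScan with the same share_id in this process therefore ends
* up pointing at the same shareinput_local_state.
*/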
/*
* ExecShareInputScanExplainEnd
* Called before ExecutorEnd to finish EXPLAIN ANALYZE reporting.
*
* Some of the cleanup that ordinarily would occur during ExecEndShareInputScan()
* needs to be done earlier in order to report statistics to EXPLAIN ANALYZE.
* Note that ExecEndShareInputScan() will still be called during ExecutorEnd().
*/
static void
ExecShareInputScanExplainEnd(PlanState *planstate, struct StringInfoData *buf)
{
ShareInputScan *sisc = (ShareInputScan *) planstate->plan;
shareinput_local_state *local_state = ((ShareInputScanState *) planstate)->local_state;
/*
* Release tuplestore resources
*/
if (!sisc->cross_slice && local_state && local_state->ts_state)
{
tuplestore_end(local_state->ts_state);
local_state->ts_state = NULL;
}
}
/* ------------------------------------------------------------------
* ExecEndShareInputScan
* ------------------------------------------------------------------
*/
void
ExecEndShareInputScan(ShareInputScanState *node)
{
EState *estate = node->ss.ps.state;
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
shareinput_local_state *local_state = node->local_state;
/* clean up tuple table */
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
if (node->ref)
{
if (sisc->this_slice_id == currentSliceId || estate->es_plannedstmt->numSlices == 1)
{
/*
* The producer needs to wait for all the consumers to finish.
* Consumers signal the producer that they're done reading,
* but are free to exit immediately after that.
*/
if (currentSliceId == sisc->producer_slice_id)
{
if (!local_state->ready)
init_tuplestore_state(node);
shareinput_writer_waitdone(node->ref, sisc->nconsumers);
}
else
{
if (!local_state->closed)
{
shareinput_reader_notifydone(node->ref, sisc->nconsumers);
local_state->closed = true;
}
}
}
release_shareinput_reference(node->ref);
node->ref = NULL;
}
if (local_state && local_state->ts_state)
{
tuplestore_end(local_state->ts_state);
local_state->ts_state = NULL;
}
/*
* Shut down the subplan. The first scanner of the underlying shared
* input does the shutdown; for all other scanners this is a no-op,
* because their outerPlanState is NULL.
*/
ExecEndNode(outerPlanState(node));
}
/* ------------------------------------------------------------------
* ExecReScanShareInputScan
* ------------------------------------------------------------------
*/
void
ExecReScanShareInputScan(ShareInputScanState *node)
{
/* On first call, initialize the tuplestore state */
if (!node->isready)
init_tuplestore_state(node);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
Assert(node->ts_pos != -1);
tuplestore_select_read_pointer(node->ts_state, node->ts_pos);
tuplestore_rescan(node->ts_state);
}
/*
* This is called when the node above us has finished and will not need any more
* rows from us.
*/
void
ExecSquelchShareInputScan(ShareInputScanState *node)
{
EState *estate = node->ss.ps.state;
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
shareinput_local_state *local_state = node->local_state;
/* clean up tuple table */
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
/*
* If this ShareInputScan is shared within the same slice then its
* subtree may still need to be executed and the motions in the subtree
* cannot yet be stopped. Thus, don't recurse in this case.
*
* In squelching a cross-slice ShareInputScan writer, we need to ensure
* we don't block any reader on other slices as a result of not
* materializing the shared plan.
*
* Note that we emphatically can't "fake" an empty tuple store and just
* go ahead waking up the readers because that can lead to wrong results.
*/
if (sisc->cross_slice && node->ref)
{
if (currentSliceId == sisc->producer_slice_id || estate->es_plannedstmt->numSlices == 1)
{
/*
* We are the producer. If we haven't materialized the tuplestore
* yet, we need to do it now, even though we won't need the data
* for anything. There might be other consumers that need it, and
* they will hang waiting for us forever otherwise.
*/
if (!local_state->ready)
{
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): initializing because squelched",
sisc->share_id, currentSliceId);
init_tuplestore_state(node);
}
}
else
{
/* We are a consumer. Let the producer know that we're done. */
Assert(!local_state->closed);
local_state->ndone++;
if (local_state->ndone == local_state->nsharers)
{
shareinput_reader_notifydone(node->ref, sisc->nconsumers);
local_state->closed = true;
}
release_shareinput_reference(node->ref);
node->ref = NULL;
}
}
}
/*************************************************************************
* IPC, for cross-slice variants.
**************************************************************************/
/*
* When creating a tuplestore file that will be accessed by
* multiple processes, shareinput_create_bufname_prefix() is used to
* construct the name for it.
*/
void
shareinput_create_bufname_prefix(char* p, int size, int share_id)
{
snprintf(p, size, "SIRW_%d_%d_%d",
gp_session_id, gp_command_count, share_id);
}
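/*
* For example, with gp_session_id = 42, gp_command_count = 7 and
* share_id = 3, this yields the prefix "SIRW_42_7_3". The
* (session, command, share) triple is the same key that identifies the
* scan's entry in the shared-memory hash table, which keeps the file
* names unique across concurrent queries.
*/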
/*
* Initialization of the shared hash table for cross-slice communication.
*
* XXX: Use MaxBackends to size it, on the assumption that max_connections
* will scale according to query complexity. This is quite fuzzy: you could
* create a query with tons of cross-slice ShareInputScans but only a few
* slices, but that ought to be rare enough in practice. This isn't a hard
* limit anyway, the hash table will use up any "slop" in shared memory if
* needed.
*/
#define N_SHAREINPUT_SLOTS() (MaxBackends * 5)
Size
ShareInputShmemSize(void)
{
Size size;
size = hash_estimate_size(N_SHAREINPUT_SLOTS(), sizeof(shareinput_Xslice_state));
return size;
}
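/*
* For example, with MaxBackends = 100 this asks for room for
* 5 * 100 = 500 hash entries; hash_estimate_size() converts that entry
* count and entry size into the number of shared-memory bytes needed
* for the table.
*/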
void
ShareInputShmemInit(void)
{
bool found;
shareinput_Xslice_dsm_handle_ptr =
ShmemInitStruct("ShareInputScan DSM handle", sizeof(dsm_handle), &found);
if (!found)
{
HASHCTL info;
/* GPDB_12_MERGE_FIXME: would be nicer to store this hash in the DSM segment or DSA */
info.keysize = sizeof(shareinput_tag);
info.entrysize = sizeof(shareinput_Xslice_state);
shareinput_Xslice_hash = ShmemInitHash("ShareInputScan notifications",
N_SHAREINPUT_SLOTS(),
N_SHAREINPUT_SLOTS(),
&info,
HASH_ELEM | HASH_BLOBS);
}
}
/*
* Get reference to the SharedFileSet used to hold all the tuplestore files.
*
* This is exported so that it can also be used by the INITPLAN function
* tuplestores.
*/
SharedFileSet *
get_shareinput_fileset(void)
{
dsm_handle handle;
if (shareinput_Xslice_fileset == NULL)
{
dsm_segment *seg;
LWLockAcquire(ShareInputScanLock, LW_EXCLUSIVE);
handle = *shareinput_Xslice_dsm_handle_ptr;
if (handle)
{
seg = dsm_attach(handle);
if (seg == NULL)
elog(ERROR, "could not attach to ShareInputScan DSM segment");
dsm_pin_mapping(seg);
shareinput_Xslice_fileset = dsm_segment_address(seg);
}
else
{
seg = dsm_create(sizeof(SharedFileSet), 0);
dsm_pin_segment(seg);
*shareinput_Xslice_dsm_handle_ptr = dsm_segment_handle(seg);
dsm_pin_mapping(seg);
shareinput_Xslice_fileset = dsm_segment_address(seg);
}
if (shareinput_Xslice_fileset->refcnt == 0)
SharedFileSetInit(shareinput_Xslice_fileset, seg);
else
SharedFileSetAttach(shareinput_Xslice_fileset, seg);
LWLockRelease(ShareInputScanLock);
}
return shareinput_Xslice_fileset;
}
/*
* Get a reference to the slot in shared memory for this shared scan.
*
* If the slot doesn't exist yet, it is created and initialized into
* "not ready" state.
*
* The reference is tracked by the current ResourceOwner, and will be
* automatically released on abort.
*/
static shareinput_Xslice_reference *
get_shareinput_reference(int share_id)
{
shareinput_tag tag;
shareinput_Xslice_state *xslice_state;
bool found;
shareinput_Xslice_reference *ref;
/* Register our resource owner callback to clean up on first call. */
if (!shareinput_resowner_callback_registered)
{
RegisterResourceReleaseCallback(shareinput_release_callback, NULL);
shareinput_resowner_callback_registered = true;
}
ref = MemoryContextAllocZero(TopMemoryContext,
sizeof(shareinput_Xslice_reference));
LWLockAcquire(ShareInputScanLock, LW_EXCLUSIVE);
tag.session_id = gp_session_id;
tag.command_count = gp_command_count;
tag.share_id = share_id;
xslice_state = hash_search(shareinput_Xslice_hash,
&tag,
HASH_ENTER_NULL,
&found);
if (!found)
{
if (xslice_state == NULL)
{
pfree(ref);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of cross-slice ShareInputScan slots")));
}
xslice_state->refcount = 0;
pg_atomic_init_u32(&xslice_state->ready, 0);
pg_atomic_init_u32(&xslice_state->ndone, 0);
ConditionVariableInit(&xslice_state->ready_done_cv);
}
xslice_state->refcount++;
ref->share_id = share_id;
ref->xslice_state = xslice_state;
ref->owner = CurrentResourceOwner;
dlist_push_head(&shareinput_Xslice_refs, &ref->node);
LWLockRelease(ShareInputScanLock);
return ref;
}
/*
* Release reference to a shared scan.
*
* The reference count in the shared memory slot is decreased, and if
* it reaches zero, it is destroyed.
*/
static void
release_shareinput_reference(shareinput_Xslice_reference *ref)
{
shareinput_Xslice_state *state = ref->xslice_state;
LWLockAcquire(ShareInputScanLock, LW_EXCLUSIVE);
if (state->refcount == 1)
{
bool found;
(void) hash_search(shareinput_Xslice_hash,
&state->tag,
HASH_REMOVE,
&found);
Assert(found);
}
else
state->refcount--;
dlist_delete(&ref->node);
LWLockRelease(ShareInputScanLock);
pfree(ref);
}
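/*
* The intended lease lifecycle, as used by the executor nodes above
* (a summary, not an additional API):
*
*    ref = get_shareinput_reference(share_id);  -- ExecInitShareInputScan()
*    ... produce or consume the tuplestore ...
*    release_shareinput_reference(ref);         -- ExecEndShareInputScan()
*                                                  or ExecSquelchShareInputScan()
*
* If the transaction aborts in between, shareinput_release_callback()
* below releases any leases still held by the aborting ResourceOwner.
*/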
/*
* Callback to release references on transaction abort.
*/
static void
shareinput_release_callback(ResourceReleasePhase phase,
bool isCommit,
bool isTopLevel,
void *arg)
{
dlist_mutable_iter miter;
if (phase != RESOURCE_RELEASE_BEFORE_LOCKS)
return;
dlist_foreach_modify(miter, &shareinput_Xslice_refs)
{
shareinput_Xslice_reference *ref =
dlist_container(shareinput_Xslice_reference,
node,
miter.cur);
if (ref->owner == CurrentResourceOwner)
{
if (isCommit)
elog(WARNING, "shareinput lease reference leak: lease %p still referenced", ref);
release_shareinput_reference(ref);
}
}
}
/*
* shareinput_reader_waitready
*
* Called by the reader (consumer) to wait for the writer (producer) to produce
* all the tuples and write them to disk.
*
* This is a blocking operation.
*/
static void
shareinput_reader_waitready(shareinput_Xslice_reference *ref)
{
shareinput_Xslice_state *state = ref->xslice_state;
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Waiting for producer",
ref->share_id, currentSliceId);
/*
* Wait until the producer sets 'ready' to true. The producer will
* use the condition variable to wake us up.
*/
for (;;)
{
/*
* state->ready is set via pg_atomic_exchange_u32() in
* shareinput_writer_notifyready(), which acts as a memory barrier,
* so we always read the latest value here.
*/
int ready = pg_atomic_read_u32(&state->ready);
if (ready)
break;
ConditionVariableSleep(&state->ready_done_cv, WAIT_EVENT_SHAREINPUT_SCAN);
}
ConditionVariableCancelSleep();
/* it's ready now */
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
ref->share_id, currentSliceId);
}
/*
* shareinput_writer_notifyready
*
* Called by the writer (producer) once it is done producing all tuples and
* writing them to disk. It notifies all the readers (consumers) that tuples
* are ready to be read from disk.
*/
static void
shareinput_writer_notifyready(shareinput_Xslice_reference *ref)
{
shareinput_Xslice_state *state = ref->xslice_state;
uint32 old_ready PG_USED_FOR_ASSERTS_ONLY = pg_atomic_exchange_u32(&state->ready, 1);
Assert(old_ready == 0);
#ifdef FAULT_INJECTOR
SIMPLE_FAULT_INJECTOR("shareinput_writer_notifyready");
#endif
ConditionVariableBroadcast(&state->ready_done_cv);
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): wrote notify_ready",
ref->share_id, currentSliceId);
}
/*
* shareinput_reader_notifydone
*
* Called by the reader (consumer) to notify the writer (producer) that
* it is done reading tuples from disk.
*
* This is a non-blocking operation.
*/
static void
shareinput_reader_notifydone(shareinput_Xslice_reference *ref, int nconsumers)
{
shareinput_Xslice_state *state = ref->xslice_state;
int ndone = pg_atomic_add_fetch_u32(&state->ndone, 1);
/* If we were the last consumer, wake up the producer. */
if (ndone >= nconsumers)
ConditionVariableBroadcast(&state->ready_done_cv);
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): wrote notify_done",
ref->share_id, currentSliceId);
}
/*
* shareinput_writer_waitdone
*
* Called by the writer (producer) to wait for the "done" notification from
* all readers (consumers).
*
* This is a blocking operation.
*/
static void
shareinput_writer_waitdone(shareinput_Xslice_reference *ref, int nconsumers)
{
shareinput_Xslice_state *state = ref->xslice_state;
int ready = pg_atomic_read_u32(&state->ready);
if (!ready)
elog(ERROR, "shareinput_writer_waitdone() called without creating the tuplestore");
ConditionVariablePrepareToSleep(&state->ready_done_cv);
for (;;)
{
/*
* state->ndone is advanced via pg_atomic_add_fetch_u32() in
* shareinput_reader_notifydone(), which acts as a memory barrier,
* so we always read the latest value here.
*/
int ndone = pg_atomic_read_u32(&state->ndone);
if (ndone < nconsumers)
{
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): waiting for DONE message from %d / %d readers",
ref->share_id, currentSliceId, nconsumers - ndone, nconsumers);
ConditionVariableSleep(&state->ready_done_cv, WAIT_EVENT_SHAREINPUT_SCAN);
continue;
}
ConditionVariableCancelSleep();
if (ndone > nconsumers)
elog(WARNING, "%d sharers of ShareInputScan reported to be done, but only %d were expected",
ndone, nconsumers);
break;
}
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): got DONE message from %d readers",
ref->share_id, currentSliceId, nconsumers);
/* it's all done now */
}