greenplum pipe 源码

  • 2022-08-18
  • 浏览 (536)

greenplum pipe 代码

文件路径:/gpcontrib/orafce/pipe.c

#include "postgres.h"
#include "funcapi.h"
#include "fmgr.h"
#include "access/htup_details.h"
#include "storage/shmem.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
#include "storage/lwlock.h"
#include "miscadmin.h"
#include "string.h"
#include "lib/stringinfo.h"
#include "catalog/pg_type.h"
#include "utils/builtins.h"
#include "utils/date.h"
#include "utils/numeric.h"

#include "shmmc.h"
#include "pipe.h"
#include "orafce.h"
#include "builtins.h"

/*
 * @ Pavel Stehule 2006-2018
 */

/*
 * Indirection over GetCurrentTimestamp() so that a build can supply its
 * own definition before this point.
 */
#ifndef _GetCurrentTimestamp
#define _GetCurrentTimestamp()	GetCurrentTimestamp()
#endif

/*
 * GetNowFloat() is the "now" value used by the WATCH_PRE/WATCH_POST
 * deadlines.  With HAVE_INT64_TIMESTAMP the timestamp is divided by
 * 1000000.0 (presumably microseconds -> seconds; confirm against the
 * server's timestamp representation); otherwise it is used unchanged.
 */
#ifndef GetNowFloat
#ifdef HAVE_INT64_TIMESTAMP
#define GetNowFloat()   ((float8) _GetCurrentTimestamp() / 1000000.0)
#else
#define GetNowFloat()   _GetCurrentTimestamp()
#endif
#endif

/*
 * Integer results of the polling functions: RESULT_DATA when the retry
 * loop finished, RESULT_WAIT when WATCH_POST hit its deadline.
 */
#define RESULT_DATA	0
#define RESULT_WAIT	1

/* Default timeout of send_message/receive_message when none is passed. */
#define ONE_YEAR (60*60*24*365)

/*
 * fmgr V1 registration records for every SQL-callable function defined in
 * this file.
 */
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_text);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_text);
PG_FUNCTION_INFO_V1(dbms_pipe_send_message);
PG_FUNCTION_INFO_V1(dbms_pipe_receive_message);
PG_FUNCTION_INFO_V1(dbms_pipe_unique_session_name);
PG_FUNCTION_INFO_V1(dbms_pipe_list_pipes);
PG_FUNCTION_INFO_V1(dbms_pipe_next_item_type);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe_2);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe_1);
PG_FUNCTION_INFO_V1(dbms_pipe_reset_buffer);
PG_FUNCTION_INFO_V1(dbms_pipe_purge);
PG_FUNCTION_INFO_V1(dbms_pipe_remove_pipe);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_date);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_date);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_timestamp);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_timestamp);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_number);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_number);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_bytea);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_bytea);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_record);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_record);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_integer);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_bigint);

/*
 * Tag stored with every packed field (message_data_item.type) and returned
 * to SQL by dbms_pipe_next_item_type().  IT_NO_MORE_ITEMS is what
 * next_item_type reports when there is no input buffer.
 * NOTE(review): the numeric codes presumably follow Oracle's DBMS_PIPE
 * item type codes - confirm before renumbering.
 */
typedef enum {
	IT_NO_MORE_ITEMS = 0,
	IT_NUMBER = 9,
	IT_VARCHAR = 11,
	IT_DATE = 12,
	IT_TIMESTAMPTZ = 13,
	IT_BYTEA = 23,
	IT_RECORD = 24
} message_data_type;

/*
 * One node of a pipe's singly linked message queue.  Nodes are allocated
 * with ora_salloc() in new_last() and released with ora_sfree().
 */
typedef struct _queue_item {
	void *ptr;						/* queued message (a message_buffer copy) */
	struct _queue_item *next_item;	/* next node, NULL at the tail */
} queue_item;

/*
 * One slot of the pipes[] array that lives in shared memory.
 */
typedef struct {
	bool is_valid;		/* slot is in use */
	bool registered;	/* explicitly created; unregistered pipes are dropped
						 * by remove_first() once their queue is empty */
	char *pipe_name;	/* name, stored via ora_scstring() */
	char *creator;		/* owner name for a private pipe, NULL otherwise */
	Oid  uid;			/* owner id checked against GetUserId() in find_pipe() */
	struct _queue_item *items;	/* head of the message queue */
	int16 count;		/* number of queued messages */
	int16 limit;		/* maximum count, -1 disables the check in new_last() */
	int size;			/* sum of the sizes of the queued messages */
} orafce_pipe;

/*
 * Header that precedes the payload of every field packed into a message
 * buffer (see pack_field()).
 */
typedef struct {
	int32 size;					/* payload length in bytes, before alignment */
	message_data_type type;		/* kind of value stored in the payload */
	Oid tupType;				/* composite type for IT_RECORD, else InvalidOid */
} message_data_item;

/*
 * Header of a message: a contiguous block holding this header followed by
 * the packed fields.
 */
typedef struct {
	int32 size;					/* bytes used, header included */
	int32 items_count;			/* fields packed and not yet unpacked */
	message_data_item *next;	/* next field to write (output buffer) or to
								 * read (input buffer) */
} message_buffer;

/* Aligned size of the buffer header and the address of the first field. */
#define message_buffer_size		(MAXALIGN(sizeof(message_buffer)))
#define message_buffer_get_content(buf)	((message_data_item *) (((char*)buf)+message_buffer_size))


/*
 * Aligned size of a field header, the address of a field's payload, and the
 * address of the field that follows it (payload length rounded up with
 * MAXALIGN).
 */
#define message_data_item_size	(MAXALIGN(sizeof(message_data_item)))
#define message_data_get_content(msg) (((char *)msg) + message_data_item_size)
#define message_data_item_next(msg) \
	((message_data_item *) (message_data_get_content(msg) + MAXALIGN(msg->size)))

/* Per-query state of dbms_pipe_list_pipes(). */
typedef struct PipesFctx {
	int pipe_nth;	/* index of the next pipes[] slot to examine */
} PipesFctx;

/*
 * Layout of the "dbms_pipe" shared memory segment set up by
 * ora_lock_shmem(): a fixed header followed by the area managed by the
 * ora_salloc()/ora_sfree() allocator.
 */
typedef struct
{
#if PG_VERSION_NUM >= 90600

	int tranche_id;			/* tranche the embedded lock is registered under */
	LWLock shmem_lock;		/* lock protecting the whole segment */
#else

	LWLockId shmem_lockid;	/* lock obtained with LWLockAssign() */

#endif

	orafce_pipe *pipes;		/* pipes array, allocated inside data */
	alert_event *events;	/* alert events array, allocated inside data */
	alert_lock *locks;		/* alert locks array, allocated inside data */
	size_t size;			/* segment size minus sh_memory_size */
	int sid;				/* last session id handed out */
	vardata data[1]; /* flexible array member */
} sh_memory;

/* Number of bytes occupied by the header in front of data. */
#define sh_memory_size			(offsetof(sh_memory, data))

/*
 * Backend-local message buffers: output_buffer collects packed fields until
 * send_message, input_buffer holds the message fetched by receive_message.
 */
message_buffer *output_buffer = NULL;
message_buffer *input_buffer = NULL;

/* Backend-local pointer to the shared pipes array; NULL until attached. */
orafce_pipe* pipes = NULL;

#define NOT_INITIALIZED		NULL

/* Lock guarding the shared segment; set by ora_lock_shmem(). */
LWLockId shmem_lockid = NOT_INITIALIZED;

int sid;                                 /* session id */

/* Defined outside this file; assigned by ora_lock_shmem(). */
extern alert_event *events;
extern alert_lock  *locks;

/*
 * Append one field to a message buffer.
 *
 * buffer  - buffer being filled; the caller must have created it zeroed so
 *           that alignment padding stays zero
 * type    - tag stored with the field
 * size    - number of payload bytes to copy from ptr
 * ptr     - payload
 * tupType - composite type oid stored with the field
 *
 * Raises ERROR when the field does not fit within LOCALMSGSZ.
 */

static void
pack_field(message_buffer *buffer, message_data_type type,
			int32 size, void *ptr, Oid tupType)
{
	message_data_item *item;
	int needed;

	/* space consumed by the field header plus the aligned payload */
	needed = MAXALIGN(size) + message_data_item_size;
	if (MAXALIGN(buffer->size) + needed > LOCALMSGSZ - message_buffer_size)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory"),
				 errdetail("Packed message is bigger than local buffer."),
				 errhint("Increase LOCALMSGSZ in 'pipe.h' and recompile library.")));

	/* a buffer without a write position starts at its first field slot */
	item = buffer->next;
	if (item == NULL)
		item = buffer->next = message_buffer_get_content(buffer);

	item->tupType = tupType;
	item->type = type;
	item->size = size;

	/* only the payload is written; padding bytes keep their zero fill */
	memcpy(message_data_get_content(item), ptr, size);

	buffer->next = message_data_item_next(item);
	buffer->items_count++;
	buffer->size += needed;
}


/*
 * Read the field at the buffer's current position and advance past it.
 *
 * Stores the field's type, payload length and tuple type through the
 * output parameters and returns a pointer to the payload inside the
 * buffer.  When the last field has been consumed, buffer->next becomes
 * NULL.  The caller guarantees that at least one field is pending.
 */
static void*
unpack_field(message_buffer *buffer, message_data_type *type,
				int32 *size, Oid *tupType)
{
	message_data_item *item;

	Assert(buffer != NULL);
	Assert(buffer->items_count > 0);
	Assert(buffer->next != NULL);

	item = buffer->next;

	*tupType = item->tupType;
	*type = item->type;
	*size = item->size;

	buffer->items_count -= 1;
	if (buffer->items_count > 0)
		buffer->next = message_data_item_next(item);
	else
		buffer->next = NULL;

	return message_data_get_content(item);
}


/*
 * Attach this backend to the "dbms_pipe" shared memory segment (creating
 * and initializing it on first use) and take the segment lock.
 *
 * size        - total size of the segment, header included
 * max_pipes   - number of pipe slots to allocate on first initialization
 * max_events  - number of alert event slots to allocate
 * max_locks   - number of alert lock slots to allocate
 * reset       - passed to ora_sinit() when attaching to an existing segment
 *
 * Every path that returns has acquired shmem_lockid in exclusive mode; the
 * caller is responsible for LWLockRelease(shmem_lockid).  The return value
 * is "pipes != NULL".
 */

bool
ora_lock_shmem(size_t size, int max_pipes, int max_events, int max_locks, bool reset)
{
	int i;
	bool found;

	sh_memory *sh_mem;

	if (pipes == NULL)
	{
		sh_mem = ShmemInitStruct("dbms_pipe", size, &found);
		if (sh_mem == NULL)
			ereport(FATAL,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory"),
					 errdetail("Failed while allocation block %lu bytes in shared memory.", (unsigned long) size)));

		if (!found)
		{
			/* first backend to get here: build the segment from scratch */

#if PG_VERSION_NUM >= 90600

			sh_mem->tranche_id = LWLockNewTrancheId();
			LWLockInitialize(&sh_mem->shmem_lock, sh_mem->tranche_id);

			{

#if PG_VERSION_NUM >= 100000

				LWLockRegisterTranche(sh_mem->tranche_id, "orafce");

#else

				static LWLockTranche tranche;

				tranche.name = "orafce";
				tranche.array_base = &sh_mem->shmem_lock;
				tranche.array_stride = sizeof(LWLock);
				LWLockRegisterTranche(sh_mem->tranche_id, &tranche);

#endif

				shmem_lockid = &sh_mem->shmem_lock;
			}

#else

			shmem_lockid = sh_mem->shmem_lockid = LWLockAssign();

#endif

			LWLockAcquire(shmem_lockid, LW_EXCLUSIVE);

			sh_mem->size = size - sh_memory_size;

			/*
			 * The allocator manages only the data area, which begins
			 * sh_memory_size bytes into the segment.  Hand it the size of
			 * that area (as the attach path below does), not the size of
			 * the whole segment, so it cannot run past the segment's end.
			 */
			ora_sinit(sh_mem->data, sh_mem->size, true);
			pipes = sh_mem->pipes = ora_salloc(max_pipes*sizeof(orafce_pipe));
			sid = sh_mem->sid = 1;
			for (i = 0; i < max_pipes; i++)
				pipes[i].is_valid = false;

			events = sh_mem->events = ora_salloc(max_events*sizeof(alert_event));
			locks = sh_mem->locks = ora_salloc(max_locks*sizeof(alert_lock));

			for (i = 0; i < max_events; i++)
			{
				events[i].event_name = NULL;
				events[i].max_receivers = 0;
				events[i].receivers = NULL;
				events[i].messages = NULL;
			}
			for (i = 0; i < max_locks; i++)
			{
				locks[i].sid = -1;
				locks[i].echo = NULL;
			}

		}
		else if (pipes == NULL)
		{
			/* segment already exists: attach and pick up the shared pointers */

#if PG_VERSION_NUM >= 90600


#if PG_VERSION_NUM >= 100000

			LWLockRegisterTranche(sh_mem->tranche_id, "orafce");

#else

			static LWLockTranche tranche;

			tranche.name = "orafce";
			tranche.array_base = &sh_mem->shmem_lock;
			tranche.array_stride = sizeof(LWLock);
			LWLockRegisterTranche(sh_mem->tranche_id, &tranche);

#endif

			shmem_lockid = &sh_mem->shmem_lock;

#else

			shmem_lockid = sh_mem->shmem_lockid;

#endif

			pipes = sh_mem->pipes;
			LWLockAcquire(shmem_lockid, LW_EXCLUSIVE);

			ora_sinit(sh_mem->data, sh_mem->size, reset);
			/* each attaching backend takes the next session id */
			sid = ++(sh_mem->sid);
			events = sh_mem->events;
			locks = sh_mem->locks;
		}
	}
	else
	{
		/* already attached: only the lock is needed */
		LWLockAcquire(shmem_lockid, LW_EXCLUSIVE);
	}

	return pipes != NULL;
}


/*
 * Look up a pipe by name and, unless only_check is set, claim a free slot
 * for it when it does not exist yet.
 *
 * pipe_name  - name to search for (compared byte-wise, exact length)
 * created    - set to true only when a new slot was claimed by this call
 * only_check - when true, never create; return NULL if the pipe is missing
 *
 * Returns the slot, or NULL when the pipe is missing (only_check) or no
 * slot could be claimed.  If the pipe has a creator and belongs to another
 * user, the segment lock is released and an ERROR is raised; callers are
 * therefore expected to hold shmem_lockid.
 *
 * The lookup is a linear scan; it could be sped up with access/hash.h.
 */

static orafce_pipe*
find_pipe(text* pipe_name, bool* created, bool only_check)
{
	int i;
	orafce_pipe *result = NULL;

	*created = false;
	for (i = 0; i < MAX_PIPES; i++)
	{
		if (pipes[i].is_valid &&
			strncmp((char*)VARDATA(pipe_name), pipes[i].pipe_name, VARSIZE(pipe_name) - VARHDRSZ) == 0
			&& (strlen(pipes[i].pipe_name) == (VARSIZE(pipe_name) - VARHDRSZ)))
		{
			/* check owner if non public pipe */

			if (pipes[i].creator != NULL && pipes[i].uid != GetUserId())
			{
				LWLockRelease(shmem_lockid);
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("insufficient privilege"),
						 errdetail("Insufficient privilege to access pipe")));
			}

			return &pipes[i];
		}
	}

	if (only_check)
		return result;

	for (i = 0; i < MAX_PIPES; i++)
		if (!pipes[i].is_valid)
		{
			if (NULL != (pipes[i].pipe_name = ora_scstring(pipe_name)))
			{
				pipes[i].is_valid = true;
				pipes[i].registered = false;
				pipes[i].creator = NULL;
				pipes[i].uid = -1;

				/*
				 * Start from an empty queue explicitly instead of relying
				 * on whatever the slot last contained.
				 */
				pipes[i].items = NULL;
				pipes[i].count = 0;
				pipes[i].size = 0;
				pipes[i].limit = -1;

				*created = true;
				result = &pipes[i];
			}
			/* only the first free slot is tried */
			break;
		}

	return result;
}


/*
 * Append ptr to the tail of the pipe's queue.
 *
 * Returns false, leaving the queue untouched, when the pipe already holds
 * "limit" messages (a limit of -1 disables the check) or when no queue node
 * can be allocated.
 */
static bool
new_last(orafce_pipe *p, void *ptr)
{
	queue_item *node;
	queue_item *tail;

	if (p->limit != -1 && p->count >= p->limit)
		return false;

	node = ora_salloc(sizeof(queue_item));
	if (node == NULL)
		return false;

	node->ptr = ptr;
	node->next_item = NULL;

	/* empty queue: the new node becomes the head */
	if (p->items == NULL)
	{
		p->items = node;
		p->count = 1;
		return true;
	}

	/* otherwise walk to the current tail and hook the node behind it */
	for (tail = p->items; tail->next_item != NULL; tail = tail->next_item)
		;

	tail->next_item = node;
	p->count += 1;

	return true;
}


/*
 * Detach the head of the pipe's queue and return the message it carried.
 *
 * *found reports whether a queue node existed at all; the returned pointer
 * is the node's ptr (NULL when the queue was empty).  A pipe that was not
 * explicitly registered is released as soon as its queue becomes empty.
 */
static void*
remove_first(orafce_pipe *p, bool *found)
{
	struct _queue_item *head = p->items;
	void *message;

	*found = false;
	if (head == NULL)
		return NULL;

	*found = true;
	message = head->ptr;
	p->items = head->next_item;
	p->count -= 1;
	ora_sfree(head);

	/* implicit pipes do not outlive their last message */
	if (!p->registered && p->items == NULL)
	{
		ora_sfree(p->pipe_name);
		p->is_valid = false;
	}

	return message;
}


/*
 * Take the oldest message out of the named pipe and return a copy of it
 * allocated in TopMemoryContext, or NULL when nothing was fetched.
 *
 * *found is written only when an existing pipe was inspected; it is the
 * flag reported by remove_first().  The segment lock is taken and released
 * inside this function.
 */

static message_buffer*
get_from_pipe(text *pipe_name, bool *found)
{
	message_buffer *local_copy = NULL;
	message_buffer *shared_msg;
	orafce_pipe *slot;
	bool created;

	if (!ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS, false))
		return NULL;

	slot = find_pipe(pipe_name, &created,false);

	/* a pipe that had to be created just now cannot hold a message */
	if (slot != NULL && !created)
	{
		shared_msg = remove_first(slot, found);
		if (shared_msg != NULL)
		{
			slot->size -= shared_msg->size;

			local_copy = (message_buffer*) MemoryContextAlloc(TopMemoryContext, shared_msg->size);
			memcpy(local_copy, shared_msg, shared_msg->size);
			ora_sfree(shared_msg);
		}
	}

	LWLockRelease(shmem_lockid);

	return local_copy;
}


/*
 * Queue a copy of the message in the named pipe, creating the pipe when
 * needed.  With ptr == NULL nothing is queued and the call only registers
 * the pipe.
 *
 * limit is applied when limit_is_valid and either the pipe is new or the
 * requested limit is larger than the current one.
 *
 * Returns true on success.  A pipe created by this call is released again
 * if the message could not be stored.  The segment lock is taken and
 * released inside this function.
 */

static bool
add_to_pipe(text *pipe_name, message_buffer *ptr, int limit, bool limit_is_valid)
{
	orafce_pipe *slot;
	message_buffer *shared_copy;
	bool created;
	bool stored = false;

	if (!ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS,false))
		return false;

	slot = find_pipe(pipe_name, &created, false);
	if (slot != NULL)
	{
		if (created)
			slot->registered = (ptr == NULL);

		if (limit_is_valid && (created || slot->limit < limit))
			slot->limit = limit;

		if (ptr == NULL)
			stored = true;
		else
		{
			shared_copy = ora_salloc(ptr->size);
			if (shared_copy != NULL)
			{
				memcpy(shared_copy, ptr, ptr->size);
				if (new_last(slot, shared_copy))
				{
					slot->size += ptr->size;
					stored = true;
				}
				else
					ora_sfree(shared_copy);
			}

			/* the pipe was created here but its first message did not fit */
			if (!stored && created)
			{
				ora_sfree(slot->pipe_name);
				slot->is_valid = false;
			}
		}
	}

	LWLockRelease(shmem_lockid);
	return stored;
}


/*
 * Drop every queued message of the named pipe and, unless this is a purge
 * of an explicitly registered pipe, release the pipe slot as well.
 *
 * pipe_name - pipe to empty; a missing pipe is silently ignored
 * purge     - true: keep registered pipes (only their content goes away);
 *             false: remove the pipe unconditionally
 *
 * The function does not take the segment lock itself; the callers in this
 * file hold it around the call.
 */
static void
remove_pipe(text *pipe_name, bool purge)
{
	orafce_pipe *p;
	bool created;

	if (NULL != (p = find_pipe(pipe_name, &created, true)))
	{
		queue_item *q = p->items;
		while (q != NULL)
		{
			queue_item *aux_q;

			aux_q = q->next_item;
			if (q->ptr)
				ora_sfree(q->ptr);
			ora_sfree(q);
			q = aux_q;
		}
		p->items = NULL;
		p->size = 0;
		p->count = 0;
		if (!(purge && p->registered))
		{
			ora_sfree(p->pipe_name);
			p->is_valid = false;

			/*
			 * A private pipe owns a shared-memory copy of its creator's
			 * name.  The slot's creator pointer is overwritten with NULL
			 * when the slot is reused, so free the string here or it is
			 * lost for good.
			 */
			if (p->creator != NULL)
			{
				ora_sfree(p->creator);
				p->creator = NULL;
			}
		}
	}
}


/*
 * SQL function: report the type tag of the next field in the received
 * message, or IT_NO_MORE_ITEMS when nothing is left to unpack.
 *
 * An input buffer can exist with no pending field (a message sent without
 * any packed item); its "next" pointer then addresses memory behind the
 * buffer, so the item count must be checked before dereferencing it.
 */
Datum
dbms_pipe_next_item_type (PG_FUNCTION_ARGS)
{
	if (input_buffer == NULL ||
		input_buffer->items_count <= 0 ||
		input_buffer->next == NULL)
		PG_RETURN_INT32(IT_NO_MORE_ITEMS);

	PG_RETURN_INT32(input_buffer->next->type);
}


/*
 * Reset a message buffer of "size" bytes to the empty state: all bytes
 * zeroed, no items, write position at the first field slot.
 */
static void
init_buffer(message_buffer *buffer, int32 size)
{
	memset(buffer, 0, size);

	buffer->next = message_buffer_get_content(buffer);
	buffer->items_count = 0;
	buffer->size = message_buffer_size;
}

/*
 * Return buffer unchanged when it already exists; otherwise allocate a new
 * one of "size" bytes in TopMemoryContext, initialize it and return it.
 */
static message_buffer*
check_buffer(message_buffer *buffer, int32 size)
{
	message_buffer *fresh;

	if (buffer != NULL)
		return buffer;

	fresh = (message_buffer*) MemoryContextAlloc(TopMemoryContext, size);
	if (fresh == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory"),
				 errdetail("Failed while allocation block %d bytes in memory.", size)));

	init_buffer(fresh, size);

	return fresh;
}

Datum
dbms_pipe_pack_message_text(PG_FUNCTION_ARGS)
{
	text *str = PG_GETARG_TEXT_PP(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_VARCHAR,
		VARSIZE_ANY_EXHDR(str), VARDATA_ANY(str), InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_date(PG_FUNCTION_ARGS)
{
	DateADT dt = PG_GETARG_DATEADT(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_DATE,
			   sizeof(dt), &dt, InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_timestamp(PG_FUNCTION_ARGS)
{
	TimestampTz dt = PG_GETARG_TIMESTAMPTZ(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_TIMESTAMPTZ,
			   sizeof(dt), &dt, InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_number(PG_FUNCTION_ARGS)
{
	Numeric num = PG_GETARG_NUMERIC(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_NUMBER,
			   VARSIZE(num) - VARHDRSZ, VARDATA(num), InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_bytea(PG_FUNCTION_ARGS)
{
	bytea *data = PG_GETARG_BYTEA_P(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_BYTEA,
		VARSIZE_ANY_EXHDR(data), VARDATA_ANY(data), InvalidOid);

	PG_RETURN_VOID();
}

/*
 * Store three non-null arguments in a function call info structure, using
 * whichever argument layout the server version provides.
 */
static void
init_args_3(FunctionCallInfo info, Datum arg0, Datum arg1, Datum arg2)
{
#if PG_VERSION_NUM >= 120000

	/* value and null flag live side by side in args[] */
	info->args[0].isnull = false;
	info->args[0].value = arg0;

	info->args[1].isnull = false;
	info->args[1].value = arg1;

	info->args[2].isnull = false;
	info->args[2].value = arg2;

#else

	/* separate arrays for values and null flags */
	info->argnull[0] = false;
	info->arg[0] = arg0;

	info->argnull[1] = false;
	info->arg[1] = arg1;

	info->argnull[2] = false;
	info->arg[2] = arg2;

#endif
}


/*
 * SQL function: append a composite value to the local output buffer as an
 * IT_RECORD field.  The value is serialized with record_send() and its
 * type oid is stored with the field, so only typed records can be packed.
 */

Datum
dbms_pipe_pack_message_record(PG_FUNCTION_ARGS)
{
	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
	Oid tupType = HeapTupleHeaderGetTypeId(rec);
	bytea *wire;

#if PG_VERSION_NUM >= 120000

	LOCAL_FCINFO(info, 3);

#else

	FunctionCallInfoData info_data;
	FunctionCallInfo info = &info_data;

#endif

	/*
	 * record_send() caches data in fcinfo->flinfo->fn_extra, so it cannot
	 * be reached through DirectFunctionCall3; build a call info that
	 * carries this function's own flinfo instead.
	 */
	InitFunctionCallInfoData(*info, fcinfo->flinfo, 3, InvalidOid, NULL, NULL);
	init_args_3(info, PointerGetDatum(rec), ObjectIdGetDatum(tupType), Int32GetDatum(-1));

	wire = (bytea*) DatumGetPointer(record_send(info));

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);

	/*
	 * NOTE(review): the length passed here is VARSIZE(), i.e. it includes
	 * the varlena header, while the source pointer already skips the
	 * header.  The other pack functions pass the header-less length.
	 * Confirm whether the unpack side depends on the extra bytes before
	 * changing this.
	 */
	pack_field(output_buffer, IT_RECORD,
			   VARSIZE(wire), VARDATA(wire), tupType);

	PG_RETURN_VOID();
}


/*
 * Common body of the dbms_pipe_unpack_message_* functions.
 *
 * Returns SQL NULL when there is no received message or no field left in
 * it.  Raises ERROR when the next field's type differs from dtype.
 * Otherwise converts the field to a Datum and, once the last field has
 * been consumed, frees the input buffer.
 */
static Datum
dbms_pipe_unpack_message(PG_FUNCTION_ARGS, message_data_type dtype)
{
	message_data_type pending_type;
	message_data_type field_type;
	int32 field_len;
	Oid tupType;
	void *payload;
	Datum result;

	if (input_buffer == NULL)
		PG_RETURN_NULL();
	if (input_buffer->items_count <= 0 || input_buffer->next == NULL)
		PG_RETURN_NULL();
	if (input_buffer->next->type == IT_NO_MORE_ITEMS)
		PG_RETURN_NULL();

	/* refuse to hand a field to the wrong unpack function */
	pending_type = input_buffer->next->type;
	if (pending_type != dtype)
		ereport(ERROR,
				(errcode(ERRCODE_DATATYPE_MISMATCH),
				 errmsg("datatype mismatch"),
				 errdetail("unpack unexpected type: %d", pending_type)));

	payload = unpack_field(input_buffer, &field_type, &field_len, &tupType);
	Assert(payload != NULL);

	switch (field_type)
	{
		case IT_DATE:
			result = DateADTGetDatum(*(DateADT*)payload);
			break;
		case IT_TIMESTAMPTZ:
			result = TimestampTzGetDatum(*(TimestampTz*)payload);
			break;
		case IT_BYTEA:
		case IT_NUMBER:
		case IT_VARCHAR:
			/* header-less payload: wrap it in a fresh varlena */
			result = PointerGetDatum(cstring_to_text_with_len(payload, field_len));
			break;
		case IT_RECORD:
		{
#if PG_VERSION_NUM >= 120000

			LOCAL_FCINFO(info, 3);

#else

			FunctionCallInfoData info_data;
			FunctionCallInfo info = &info_data;

#endif

			StringInfoData	wire;
			text		   *copy = cstring_to_text_with_len(payload, field_len);

			/* expose the copied bytes to record_recv() as a StringInfo */
			wire.data = VARDATA(copy);
			wire.len = VARSIZE(copy) - VARHDRSZ;
			wire.maxlen = wire.len;
			wire.cursor = 0;

			/*
			 * record_recv() caches data in fcinfo->flinfo->fn_extra, so it
			 * cannot be reached through DirectFunctionCall3; pass a call
			 * info carrying this function's own flinfo.
			 */
			InitFunctionCallInfoData(*info, fcinfo->flinfo, 3, InvalidOid, NULL, NULL);
			init_args_3(info, PointerGetDatum(&wire), ObjectIdGetDatum(tupType), Int32GetDatum(-1));

			result = record_recv(info);
			break;
		}
		default:
			elog(ERROR, "unexpected type: %d", field_type);
			result = (Datum) 0;	/* keep compiler quiet */
	}

	/* the message is fully consumed: release it */
	if (input_buffer->items_count == 0)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	PG_RETURN_DATUM(result);
}


Datum
dbms_pipe_unpack_message_text(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_VARCHAR);
}


Datum
dbms_pipe_unpack_message_date(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_DATE);
}

Datum
dbms_pipe_unpack_message_timestamp(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_TIMESTAMPTZ);
}


Datum
dbms_pipe_unpack_message_number(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_NUMBER);
}


Datum
dbms_pipe_unpack_message_bytea(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_BYTEA);
}


Datum
dbms_pipe_unpack_message_record(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_RECORD);
}


/*
 * WATCH_PRE / WATCH_POST bracket a block that is retried until it leaves
 * the loop itself (break or return) or a deadline passes.
 *
 *   t  - timeout added to the current time to form the deadline
 *   et - float8 variable receiving the deadline
 *   c  - integer variable used as iteration counter
 *
 * WATCH_PRE opens a do-loop that WATCH_POST closes, so the two must always
 * be used as a pair inside one function.  After each unsuccessful pass
 * WATCH_POST makes the enclosing function return RESULT_WAIT once the
 * deadline is reached, checks for interrupts on every 100th pass and
 * sleeps for 10000 microseconds.  With t == 0 the body runs at most once.
 *
 * The counter is the macro's own parameter; the macro must not depend on
 * the caller happening to name its variable "cycle".
 */
#define WATCH_PRE(t, et, c) \
(et) = GetNowFloat() + (float8) (t); (c) = 0; \
do \
{

#define WATCH_POST(t,et,c) \
if (GetNowFloat() >= (et)) \
PG_RETURN_INT32(RESULT_WAIT); \
if ((c)++ % 100 == 0) \
CHECK_FOR_INTERRUPTS(); \
pg_usleep(10000L); \
} while(true && (t) != 0);


/*
 * SQL function: fetch the next message of a pipe into the local input
 * buffer, polling until one arrives or the timeout expires.
 *
 * Argument 0 is the pipe name (NULL raises ERROR), argument 1 the timeout
 * (NULL selects ONE_YEAR).  Any previously received message is discarded.
 * Returns RESULT_DATA when the polling loop ended, RESULT_WAIT on timeout.
 */
Datum
dbms_pipe_receive_message(PG_FUNCTION_ARGS)
{
	text *pipe_name;
	int timeout;
	int cycle = 0;
	float8 endtime;
	bool found = false;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));
	pipe_name = PG_GETARG_TEXT_P(0);

	timeout = PG_ARGISNULL(1) ? ONE_YEAR : PG_GETARG_INT32(1);

	/* drop whatever is left of the previous message */
	if (input_buffer != NULL)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	WATCH_PRE(timeout, endtime, cycle);
	input_buffer = get_from_pipe(pipe_name, &found);
	if (input_buffer != NULL)
	{
		/* the copied "next" pointer still refers to the sender's buffer */
		input_buffer->next = message_buffer_get_content(input_buffer);
		break;
	}
	/* a queue entry existed but carried no message */
	if (found)
		break;

	WATCH_POST(timeout, endtime, cycle);
	PG_RETURN_INT32(RESULT_DATA);
}


/*
 * SQL function: queue the local output buffer in a pipe, retrying until it
 * is accepted or the timeout expires.
 *
 * Argument 0 is the pipe name (NULL raises ERROR), argument 1 the timeout
 * (NULL selects ONE_YEAR), argument 2 an optional pipe limit.  On success
 * the output buffer is reset and RESULT_DATA is returned; on timeout
 * RESULT_WAIT is returned and the output buffer is left as it was.
 */
Datum
dbms_pipe_send_message(PG_FUNCTION_ARGS)
{
	text *pipe_name;
	int timeout;
	int limit = 0;
	bool valid_limit;
	int cycle = 0;
	float8 endtime;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));
	pipe_name = PG_GETARG_TEXT_P(0);

	/* sending without packing anything sends an empty message */
	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);

	timeout = PG_ARGISNULL(1) ? ONE_YEAR : PG_GETARG_INT32(1);

	valid_limit = !PG_ARGISNULL(2);
	if (valid_limit)
		limit = PG_GETARG_INT32(2);

	/* XXX Strange?  A pending received message is discarded here too. */
	if (input_buffer != NULL)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	WATCH_PRE(timeout, endtime, cycle);
	if (add_to_pipe(pipe_name, output_buffer, limit, valid_limit))
		break;
	WATCH_POST(timeout, endtime, cycle);

	init_buffer(output_buffer, LOCALMSGSZ);

	PG_RETURN_INT32(RESULT_DATA);
}


/*
 * SQL function: return a name of the form PG$PIPE$<sid>$<pid>, built from
 * this backend's session id and process id.
 */
Datum
dbms_pipe_unique_session_name (PG_FUNCTION_ARGS)
{
	int cycle = 0;
	int timeout = 10;
	float8 endtime;
	text *session_name;
	StringInfoData name_buf;

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{
		initStringInfo(&name_buf);
		appendStringInfo(&name_buf,"PG$PIPE$%d$%d",sid, MyProcPid);

		session_name = cstring_to_text_with_len(name_buf.data, name_buf.len);
		pfree(name_buf.data);
		LWLockRelease(shmem_lockid);

		PG_RETURN_TEXT_P(session_name);
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_NULL();
}

/* Number of columns in the rows produced by dbms_pipe_list_pipes(). */
#define DB_PIPES_COLS		6

/*
 * Set-returning SQL function: one row per valid pipe with the columns
 * name, items, size, limit, private and owner.
 *
 * The segment lock is taken during the first call and released only when
 * the scan over pipes[] is finished.
 * NOTE(review): the lock therefore stays held between the individual
 * calls of the set-returning protocol; confirm this is acceptable if the
 * executor stops fetching rows early.
 */
Datum
dbms_pipe_list_pipes(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	TupleDesc        tupdesc;
	AttInMetadata   *attinmeta;
	PipesFctx       *fctx;

	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	if (SRF_IS_FIRSTCALL())
	{
		int		i;
		MemoryContext  oldcontext;
		bool has_lock = false;

		/* acquire the segment lock before anything is set up */
		WATCH_PRE(timeout, endtime, cycle);
		if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS, false))
		{
			has_lock = true;
			break;
		}
		WATCH_POST(timeout, endtime, cycle);
		if (!has_lock)
			LOCK_ERROR();

		funcctx = SRF_FIRSTCALL_INIT();
		/* state that must survive across calls lives in the multi-call context */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		fctx = palloc(sizeof(PipesFctx));
		funcctx->user_fctx = fctx;
		fctx->pipe_nth = 0;

#if PG_VERSION_NUM >= 120000

		tupdesc = CreateTemplateTupleDesc(DB_PIPES_COLS);

#else

		tupdesc = CreateTemplateTupleDesc(DB_PIPES_COLS, false);

#endif

		/* describe the result row; i counts the attributes as they are added */
		i = 0;
		TupleDescInitEntry(tupdesc, ++i, "name",    VARCHAROID, -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "items",   INT4OID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "size",    INT4OID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "limit",   INT4OID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "private", BOOLOID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "owner",   VARCHAROID, -1, 0);
		Assert(i == DB_PIPES_COLS);

		attinmeta = TupleDescGetAttInMetadata(tupdesc);
		funcctx->attinmeta = attinmeta;

		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	fctx = (PipesFctx *) funcctx->user_fctx;

	/* resume the scan where the previous call stopped */
	while (fctx->pipe_nth < MAX_PIPES)
	{
		if (pipes[fctx->pipe_nth].is_valid)
		{
			Datum		result;
			HeapTuple	tuple;
			char	   *values[DB_PIPES_COLS];
			char		items[16];
			char		size[16];
			char		limit[16];

			/* name */
			values[0] = pipes[fctx->pipe_nth].pipe_name;
			/* items */
			snprintf(items, lengthof(items), "%d", pipes[fctx->pipe_nth].count);
			values[1] = items;
			/* size */
			snprintf(size, lengthof(size), "%d", pipes[fctx->pipe_nth].size);
			values[2] = size;
			/* limit: a pipe without limit (-1) yields NULL */
			if (pipes[fctx->pipe_nth].limit != -1)
			{
				snprintf(limit, lengthof(limit), "%d", pipes[fctx->pipe_nth].limit);
				values[3] = limit;
			}
			else
				values[3] = NULL;
			/* private: true when the pipe has a creator */
			values[4] = (pipes[fctx->pipe_nth].creator ? "true" : "false");
			/* owner: NULL for pipes without a creator */
			values[5] = pipes[fctx->pipe_nth].creator;

			tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
			result = HeapTupleGetDatum(tuple);

			/* advance before returning so the next call starts at the following slot */
			fctx->pipe_nth += 1;
			SRF_RETURN_NEXT(funcctx, result);
		}
		fctx->pipe_nth += 1;
	}

	/* scan finished: give up the lock taken in the first call */
	LWLockRelease(shmem_lockid);
	SRF_RETURN_DONE(funcctx);
}

/*
 * secondary functions
 */

/*
 * Registration explicit pipes
 *   dbms_pipe.create_pipe(pipe_name varchar, limit := -1 int, private := false bool);
 *
 * Argument 0 is the pipe name (NULL raises ERROR), argument 1 an optional
 * limit (NULL means no limit), argument 2 the private flag (NULL means
 * public).  Raises ERROR when a pipe of that name already exists.  A
 * private pipe records the current user as its owner.
 *
 * When no pipe slot can be obtained the attempt is repeated until the
 * WATCH_POST deadline; WATCH_POST then returns from this function.
 */

Datum
dbms_pipe_create_pipe (PG_FUNCTION_ARGS)
{
	text *pipe_name = NULL;
	int   limit = 0;
	bool  is_private;
	bool  limit_is_valid = false;
	bool  created;
	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));
	else
		pipe_name = PG_GETARG_TEXT_P(0);

	if (!PG_ARGISNULL(1))
	{
		limit = PG_GETARG_INT32(1);
		limit_is_valid = true;
	}

	is_private = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{
		orafce_pipe *p;
		if (NULL != (p = find_pipe(pipe_name, &created, false)))
		{
			if (!created)
			{
				LWLockRelease(shmem_lockid);
				ereport(ERROR,
						(errcode(ERRCODE_DUPLICATE_OBJECT),
						 errmsg("pipe creation error"),
						 errdetail("Pipe is registered.")));
			}
			if (is_private)
			{
				char *user;

				p->uid = GetUserId();

				user = (char*)DirectFunctionCall1(namein,
					    CStringGetDatum(GetUserNameFromId(p->uid, false)));

				/* the owner name must live in shared memory with the pipe */
				p->creator = ora_sstrcpy(user);
				pfree(user);
			}
			p->limit = limit_is_valid ? limit : -1;
			p->registered = true;

			LWLockRelease(shmem_lockid);
			PG_RETURN_VOID();
		}

		/*
		 * No slot could be obtained.  The lock has to be dropped before
		 * sleeping and retrying: the next pass calls ora_lock_shmem()
		 * again, and acquiring the lock while this backend still holds it
		 * would block forever.  Releasing it also lets other sessions free
		 * a slot in the meantime.
		 */
		LWLockRelease(shmem_lockid);
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_VOID();
}


/*
 * Clean local input, output buffers
 */

Datum
dbms_pipe_reset_buffer(PG_FUNCTION_ARGS)
{
	if (output_buffer != NULL)
	{
		pfree(output_buffer);
		output_buffer = NULL;
	}

	if (input_buffer != NULL)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	PG_RETURN_VOID();
}


/*
 * SQL function: delete all messages stored in the named pipe.  A pipe that
 * was created implicitly is removed as well; an explicitly registered pipe
 * is kept, empty.
 */

Datum
dbms_pipe_purge (PG_FUNCTION_ARGS)
{
	int cycle = 0;
	int timeout = 10;
	float8 endtime;
	text *pipe_name = PG_GETARG_TEXT_P(0);

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{
		/* purge mode: registered pipes survive */
		remove_pipe(pipe_name, true);
		LWLockRelease(shmem_lockid);

		PG_RETURN_VOID();
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_VOID();
}

/*
 * SQL function: remove the named pipe together with all messages stored in
 * it.  Nothing happens when the pipe does not exist.
 */

Datum
dbms_pipe_remove_pipe (PG_FUNCTION_ARGS)
{
	int cycle = 0;
	int timeout = 10;
	float8 endtime;
	text *pipe_name = PG_GETARG_TEXT_P(0);

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{
		/* removal mode: the pipe goes away even if it was registered */
		remove_pipe(pipe_name, false);
		LWLockRelease(shmem_lockid);

		PG_RETURN_VOID();
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_VOID();
}


/*
 * Void-returning variants that cannot be expressed as SQL wrappers.
 *
 * Two-argument form of create_pipe: (pipe_name, limit).  A NULL limit is
 * forwarded as -1; the pipe is always public.
 */

Datum
dbms_pipe_create_pipe_2 (PG_FUNCTION_ARGS)
{
	Datum	name_arg;
	int		limit;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));

	name_arg = PG_GETARG_DATUM(0);
	limit = PG_ARGISNULL(1) ? -1 : PG_GETARG_INT32(1);

	DirectFunctionCall3(dbms_pipe_create_pipe,
						name_arg,
						Int32GetDatum(limit),
						BoolGetDatum(false));

	PG_RETURN_VOID();
}

/*
 * One-argument form of create_pipe: (pipe_name).  The limit is forwarded
 * as -1 and the pipe is public.
 */
Datum
dbms_pipe_create_pipe_1 (PG_FUNCTION_ARGS)
{
	Datum	name_arg;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));

	name_arg = PG_GETARG_DATUM(0);

	DirectFunctionCall3(dbms_pipe_create_pipe,
						name_arg,
						Int32GetDatum(-1),
						BoolGetDatum(false));

	PG_RETURN_VOID();
}

Datum
dbms_pipe_pack_message_integer(PG_FUNCTION_ARGS)
{
	/* Casting from int4 to numeric */
	DirectFunctionCall1(dbms_pipe_pack_message_number,
				DirectFunctionCall1(int4_numeric, PG_GETARG_DATUM(0)));

	PG_RETURN_VOID();
}

Datum
dbms_pipe_pack_message_bigint(PG_FUNCTION_ARGS)
{
	/* Casting from int8 to numeric */
	DirectFunctionCall1(dbms_pipe_pack_message_number,
				DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(0)));

	PG_RETURN_VOID();
}

相关信息

greenplumn 源码目录

相关文章

greenplumn aggregate 源码

greenplumn alert 源码

greenplumn assert 源码

greenplumn assert 源码

greenplumn builtins 源码

greenplumn charlen 源码

greenplumn charpad 源码

greenplumn convert 源码

greenplumn datefce 源码

greenplumn file 源码

0  赞