/*
 * Greenplum ic_tcp source
 * Greenplum ic_tcp code
 * File path: /src/backend/cdb/motion/ic_tcp.c
 */
/*-------------------------------------------------------------------------
* ic_tcp.c
* Interconnect code specific to TCP transport.
*
* Portions Copyright (c) 2005-2008, Greenplum, Inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/cdb/motion/ic_tcp.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "common/ip.h"
#include "nodes/execnodes.h" /* ExecSlice, SliceTable */
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h"
#include "cdb/cdbselect.h"
#include "cdb/tupchunklist.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp.h"
#ifdef ENABLE_IC_PROXY
#include "ic_proxy_backend.h"
#endif /* ENABLE_IC_PROXY */
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
/* Unit conversion factors: microseconds and milliseconds per second. */
#define USECS_PER_SECOND 1000000
#define MSECS_PER_SECOND 1000
/*
 * GpMonotonicTime: used to guarantee that the elapsed time is in
 * the monotonic order between two gp_get_monotonic_time calls.
 *
 * Holds a pair of struct timeval samples: the begin time and the end time
 * of a measured interval.
 */
typedef struct GpMonotonicTime
{
struct timeval beginTime;
struct timeval endTime;
} GpMonotonicTime;
/*
 * Prototypes for the file-local timing helpers that operate on
 * GpMonotonicTime. Their definitions are not in this part of the file;
 * judging by the names they set/sample the interval and report the elapsed
 * time in milliseconds or microseconds -- confirm against the definitions.
 */
static void gp_set_monotonic_begin_time(GpMonotonicTime *time);
static void gp_get_monotonic_time(GpMonotonicTime *time);
static inline uint64 gp_get_elapsed_ms(GpMonotonicTime *time);
static inline uint64 gp_get_elapsed_us(GpMonotonicTime *time);
static inline int timeCmp(struct timeval *t1, struct timeval *t2);
/*
 * Connection retry intervals, in milliseconds (per the _MS suffix).
 * NOTE(review): these are not used in the part of the file shown here;
 * presumably the "aggressive" value applies when
 * transportStates->aggressiveRetry is set -- confirm at the use site.
 */
#define CONNECT_RETRY_MS 4000
#define CONNECT_AGGRESSIVERETRY_MS 500
/*
 * Backlog for the listen() call: it is important that this be something
 * like a good match for the maximum number of QEs. Slow insert performance
 * will result if it is too low.
 *
 * The listener backlog is calculated at listener-creation time; the value
 * here is the default. InitMotionTCP() passes it to
 * setupTCPListeningSocket(), and startOutgoingConnections() compares the
 * expected number of connections against it.
 */
int listenerBacklog = 128;
/*
 * Our timeout value for select() and other socket operations.
 * InitMotionTCP() sets it to 500 ms.
 */
static struct timeval tval;
/*
 * getMotionConn
 *
 * Return the address of slot iConn in pEntry's MotionConn array.
 *
 * The entry pointer, its conns array and the upper bound of the index are
 * verified with Assert() only, so release builds perform no checking.
 */
static inline MotionConn *
getMotionConn(ChunkTransportStateEntry *pEntry, int iConn)
{
	Assert(pEntry);
	Assert(pEntry->conns);
	Assert(iConn < pEntry->numConns);

	return &pEntry->conns[iConn];
}
/*
 * Forward declarations of the file-local functions of the TCP interconnect.
 */

/* Outgoing (sender side) connection setup and registration. */
static ChunkTransportStateEntry *startOutgoingConnections(ChunkTransportState *transportStates,
ExecSlice *sendSlice,
int *pOutgoingCount);
static void format_fd_set(StringInfo buf, int nfds, mpp_fd_set *fds, char *pfx, char *sfx);
static void setupOutgoingConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn);
static void updateOutgoingConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, int errnoSave);
static void sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn);
/* Incoming (receiver side) connection acceptance and registration. */
static bool readRegisterMessage(ChunkTransportState *transportStates,
MotionConn *conn);
static MotionConn *acceptIncomingConnection(void);
static void flushInterconnectListenerBacklog(void);
static void waitOnOutbound(ChunkTransportStateEntry *pEntry);
/* Tuple chunk receive/send entry points for the TCP transport. */
static TupleChunkListItem RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute);
static TupleChunkListItem RecvTupleChunkFromTCP(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute);
static void SendEosTCP(ChunkTransportState *transportStates,
int motNodeID, TupleChunkListItem tcItem);
static bool SendChunkTCP(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, TupleChunkListItem tcItem, int16 motionId);
static bool flushBuffer(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, int16 motionId);
static void doSendStopMessageTCP(ChunkTransportState *transportStates, int16 motNodeID);
/* Debug-only helpers, compiled in when AMS_VERBOSE_LOGGING is defined. */
#ifdef AMS_VERBOSE_LOGGING
static void dumpEntryConnections(int elevel, ChunkTransportStateEntry *pEntry);
static void print_connection(ChunkTransportState *transportStates, int fd, const char *msg);
#endif
/*
 * setupTCPListeningSocket
 *
 * Create a non-blocking TCP listening socket on a system-chosen port.
 *
 * PARAMETERS
 *	 backlog          - backlog argument handed to listen().
 *	 listenerSocketFd - out: descriptor of the listening socket. Set to -1
 *	                    on entry, so it stays -1 if an error is raised.
 *	 listenerPort     - out: port number picked by the system, in host byte
 *	                    order. Set to 0 on entry.
 *
 * The bind address is interconnect_address when the address type is
 * unicast, otherwise the wildcard address (interconnect_address must then
 * be NULL).
 *
 * Raises ERROR if getaddrinfo() fails or if no candidate address can be
 * turned into a listening socket. The error detail names the failing call
 * and reports the errno of that call.
 */
static void
setupTCPListeningSocket(int backlog, int *listenerSocketFd, uint16 *listenerPort)
{
	int			errnoSave = 0;
	int			fd = -1;
	const char *fun;
	struct sockaddr_storage addr;
	socklen_t	addrlen;
	struct addrinfo hints;
	struct addrinfo *addrs = NULL,
			   *rp;
	int			s;
	char		service[32];

	*listenerSocketFd = -1;
	*listenerPort = 0;

	/*
	 * we let the system pick the TCP port here so we don't have to manage
	 * port resources ourselves.  So set the port to 0 (any port)
	 */
	snprintf(service, sizeof(service), "%d", 0);
	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = AF_UNSPEC;	/* Allow IPv4 or IPv6 */
	hints.ai_socktype = SOCK_STREAM;	/* Reliable, two-way byte stream */
	hints.ai_protocol = 0;		/* Any protocol - TCP implied for network use
								 * due to SOCK_STREAM */

	if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST)
	{
		Assert(interconnect_address && strlen(interconnect_address) > 0);
		hints.ai_flags |= AI_NUMERICHOST;
		ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
				  (errmsg("getaddrinfo called with unicast address: %s",
						  interconnect_address)));
	}
	else
	{
		Assert(interconnect_address == NULL);
		hints.ai_flags |= AI_PASSIVE;
		ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
				  (errmsg("getaddrinfo called with wildcard address")));
	}

	s = getaddrinfo(interconnect_address, service, &hints, &addrs);
	if (s != 0)
		elog(ERROR, "getaddrinfo says %s", gai_strerror(s));

	/*
	 * getaddrinfo() returns a list of address structures, one for each valid
	 * address and family we can use.
	 *
	 * Try each address until we successfully bind.  If socket (or bind)
	 * fails, we (close the socket and) try the next address.  This can happen
	 * if the system supports IPv6, but IPv6 is disabled from working, or if
	 * it supports IPv6 and IPv4 is disabled.
	 */

	/*
	 * If there is both an AF_INET6 and an AF_INET choice, we prefer the
	 * AF_INET6, because on UNIX it can receive either protocol, whereas
	 * AF_INET can only get IPv4.  Otherwise we'd need to bind two sockets,
	 * one for each protocol.
	 *
	 * Why not just use AF_INET6 in the hints?  That works perfect if we know
	 * this machine supports IPv6 and IPv6 is enabled, but we don't know that.
	 */
#ifdef HAVE_IPV6
	if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6)
	{
		/*
		 * We got both an INET and INET6 possibility, but we want to prefer
		 * the INET6 one if it works.  Reverse the order we got from
		 * getaddrinfo so that we try things in our preferred order.  If we
		 * got more possibilities (other AFs??), I don't think we care about
		 * them, so don't worry if the list is more than two, we just
		 * rearrange the first two.
		 */
		struct addrinfo *temp = addrs->ai_next; /* second node */

		addrs->ai_next = addrs->ai_next->ai_next;	/* point old first node to
													 * third node if any */
		temp->ai_next = addrs;	/* point second node to first */
		addrs = temp;			/* start the list with the old second node */
	}
#endif

	fun = "socket";
	for (rp = addrs; rp != NULL; rp = rp->ai_next)
	{
		/*
		 * getaddrinfo gives us all the parameters for the socket() call as
		 * well as the parameters for the bind() call.
		 */
		fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (fd == -1)
		{
			/* remember which call failed and why, for the error report */
			fun = "socket";
			errnoSave = errno;
			continue;
		}

		/*
		 * we let the system pick the TCP port here so we don't have to
		 * manage port resources ourselves.
		 */
		if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0)
			break;				/* Success */

		/* capture errno before closing: close may overwrite it */
		fun = "bind";
		errnoSave = errno;
		closesocket(fd);
		fd = -1;
	}

	if (fd == -1)
	{
		/* report the failure of the last candidate we tried */
		errno = errnoSave;
		goto error;
	}

	/* Make socket non-blocking. */
	fun = "fcntl(O_NONBLOCK)";
	if (!pg_set_noblock(fd))
		goto error;

	fun = "listen";
	if (listen(fd, backlog) < 0)
		goto error;

	/* Get the listening socket's port number. */
	fun = "getsockname";
	addrlen = sizeof(addr);
	if (getsockname(fd, (struct sockaddr *) &addr, &addrlen) < 0)
		goto error;

	/* Give results to caller. */
	*listenerSocketFd = fd;

	/* display which port was chosen by the system. */
	if (addr.ss_family == AF_INET6)
		*listenerPort = ntohs(((struct sockaddr_in6 *) &addr)->sin6_port);
	else
		*listenerPort = ntohs(((struct sockaddr_in *) &addr)->sin_port);

	freeaddrinfo(addrs);
	return;

error:
	/* Release resources first, then restore errno for the %m below. */
	errnoSave = errno;
	if (fd >= 0)
		closesocket(fd);
	freeaddrinfo(addrs);
	errno = errnoSave;
	ereport(ERROR,
			(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			 errmsg("interconnect Error: Could not set up tcp listener socket"),
			 errdetail("%s: %m", fun)));
}								/* setupTCPListeningSocket */
/*
 * InitMotionTCP
 *
 * Initialize the TCP-specific parts of the interconnect: set the shared
 * socket timeout (tval) to 500 ms and create the listening socket.
 *
 * listenerSocketFd and listenerPort are output parameters filled in by
 * setupTCPListeningSocket().
 */
void
InitMotionTCP(int *listenerSocketFd, uint16 *listenerPort)
{
	/* half a second, expressed in microseconds */
	tval.tv_usec = 500000;
	tval.tv_sec = 0;

	setupTCPListeningSocket(listenerBacklog, listenerSocketFd, listenerPort);
}
/*
 * CleanupMotionTCP
 *
 * Counterpart of InitMotionTCP(). This function itself releases nothing.
 */
void
CleanupMotionTCP(void)
{
	/* intentionally empty */
}
/* Function readPacket() is used to read in the next packet from the given
 * MotionConn.
 *
 * This call blocks until the packet is read in, and is part of a
 * global scheme where senders block until the entire message is sent, and
 * receivers block until the entire message is read. Both use non-blocking
 * socket calls so that we can handle any PG interrupts.
 *
 * Note, that for speed we want to read a message all in one go,
 * header and all. A consequence is that we may read in part of the
 * next message, which we've got to keep track of ... recvBytes holds
 * the byte-count of the unprocessed messages.
 *
 * On return, conn->msgPos points at the start of the packet, conn->msgSize
 * holds the size taken from the first sizeof(uint32) bytes of its header,
 * and conn->recvBytes is the number of buffered bytes starting at msgPos.
 *
 * PARAMETERS
 *	 conn - MotionConn to read the packet from.
 *	 transportStates - interconnect state; its teardownActive flag is passed
 *	                   to the interrupt checks, and it is handed to
 *	                   checkForCancelFromQD() on the dispatcher.
 *
 * Raises ERROR if recv() or select() fails with anything other than
 * EINTR/EWOULDBLOCK, or if the peer closes the connection.
 */
/* static inline void */
void
readPacket(MotionConn *conn, ChunkTransportState *transportStates)
{
int n,
bytesRead = conn->recvBytes;
bool gotHeader = false,
gotPacket = false;
mpp_fd_set rset;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: (fd %d) (max %d) outstanding bytes %d", conn->sockfd, Gp_max_packet_size, conn->recvBytes);
#endif
/* do we have a complete message waiting to be processed ? */
if (conn->recvBytes >= PACKET_HEADER_SIZE)
{
/* the message size is the first uint32 of the header */
memcpy(&conn->msgSize, conn->msgPos, sizeof(uint32));
gotHeader = true;
if (conn->recvBytes >= conn->msgSize)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: returning previously read data (%d)", conn->recvBytes);
#endif
return;
}
}
/*
 * partial message waiting in recv buffer! Move to head of buffer:
 * eliminate the slack (which will always be at the beginning) in the
 * buffer
 */
if (conn->recvBytes != 0)
memmove(conn->pBuff, conn->msgPos, conn->recvBytes);
conn->msgPos = conn->pBuff;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: %s on previous call msgSize %d", gotHeader ? "got header" : "no header", conn->msgSize);
#endif
/*
 * Keep receiving until a whole packet is buffered or the buffer is full.
 * NOTE(review): if the header announces a msgSize larger than
 * Gp_max_packet_size, this loop ends with a full buffer but no complete
 * packet, and no error is raised here -- confirm callers cope with that.
 */
while (!gotPacket && bytesRead < Gp_max_packet_size)
{
/* see if user canceled and stuff like that */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
/*
 * we read at the end of the buffer, we've eliminated any slack above
 */
if ((n = recv(conn->sockfd, conn->pBuff + bytesRead,
Gp_max_packet_size - bytesRead, 0)) < 0)
{
if (errno == EINTR)
continue;
if (errno == EWOULDBLOCK)
{
/*
 * Nothing to read yet: wait with select() until the socket becomes
 * readable, then go around the outer loop and recv() again.
 */
int retry = 0;
do
{
/* select() may modify its timeout argument, so use a fresh copy */
struct timeval timeout = tval;
/*
 * Periodically check for a cancel from the QD: once retry exceeds 4
 * passes, each pass waiting at most one tval timeout in select().
 */
if (retry++ > 4)
{
retry = 0;
/* check to see if the dispatcher should cancel */
if (Gp_role == GP_ROLE_DISPATCH)
{
checkForCancelFromQD(transportStates);
}
}
/* see if user canceled and stuff like that */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
n = select(conn->sockfd + 1, (fd_set *) &rset, NULL, NULL, &timeout);
/* timeout or signal: "continue" re-tests (n < 1) and waits again */
if (n == 0 || (n < 0 && errno == EINTR))
continue;
else if (n < 0)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error reading an incoming packet"),
errdetail("select from seg%d at %s: %m",
conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
while (n < 1);
}
else
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error reading an incoming packet"),
errdetail("read from seg%d at %s: %m",
conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
else if (n == 0)
{
/* recv() returned 0: the peer closed the connection mid-stream */
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket(); breaking in while (fd %d) recvBytes %d msgSize %d", conn->sockfd, conn->recvBytes, conn->msgSize);
print_connection(transportStates, conn->sockfd, "interconnect error on");
#endif
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error: connection closed prematurely"),
errdetail("from Remote Connection: contentId=%d at %s",
conn->remoteContentId, conn->remoteHostAndPort)));
break;
}
else
{
/* got n more bytes: account for them and see whether we are done */
bytesRead += n;
if (!gotHeader && bytesRead >= PACKET_HEADER_SIZE)
{
/* got the header */
memcpy(&conn->msgSize, conn->msgPos, sizeof(uint32));
gotHeader = true;
}
conn->recvBytes = bytesRead;
if (gotHeader && bytesRead >= conn->msgSize)
gotPacket = true;
}
}
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: got %d bytes", conn->recvBytes);
#endif
}
/*
 * flushIncomingData
 *
 * Read and discard whatever is currently readable on fd.
 *
 * The loop stops as soon as recv() reports end-of-stream (0) or an error
 * other than EINTR; on a non-blocking socket that includes "no data
 * available". A recv() interrupted by a signal is retried, so that a signal
 * does not cut the drain short while data is still pending.
 */
static void
flushIncomingData(int fd)
{
	/* scratch space only; the contents are never looked at */
	static char trash[8192];
	int			bytes;

	/*
	 * If we're in TeardownInterconnect, we should only have to call recv() a
	 * couple of times to empty out our socket buffers
	 */
	do
	{
		bytes = recv(fd, trash, sizeof(trash), 0);
	} while (bytes > 0 || (bytes < 0 && errno == EINTR));
}
/*
 * startOutgoingConnections
 *
 * Prepare (but do not yet open) the outgoing connections from sendSlice to
 * its parent slice.
 *
 * This should not be called for root slices (i.e. QD ones) since they don't
 * ever have outgoing connections.
 *
 * PARAMETERS
 *	 transportStates - interconnect state; its sliceTable is used to find
 *	                   the receiving (parent) slice, and its aggressiveRetry
 *	                   flag may be updated.
 *	 sendSlice       - slice that this process is a member of.
 *	 pOutgoingCount  - out: number of connections put into the
 *	                   mcsSetupOutgoingConnection state.
 *
 * RETURNS
 *	 The ChunkTransportStateEntry created for the sending motion node, with
 *	 one MotionConn slot per entry of the receiving slice's primaryProcesses
 *	 list. Slots whose process entry is NULL are left untouched.
 */
static ChunkTransportStateEntry *
startOutgoingConnections(ChunkTransportState *transportStates,
						 ExecSlice *sendSlice,
						 int *pOutgoingCount)
{
	ChunkTransportStateEntry *pEntry;
	ExecSlice  *recvSlice;
	ListCell   *cell;
	int			slot;

	*pOutgoingCount = 0;

	recvSlice = &transportStates->sliceTable->slices[sendSlice->parentIndex];

	/*
	 * Aggressive retry is switched off when the GUC is off, and switched on
	 * when the GUC is on and the expected number of connections exceeds the
	 * listener backlog.  In the remaining case the flag is left as it was.
	 */
	if (!gp_interconnect_aggressive_retry)
		transportStates->aggressiveRetry = false;
	else if ((list_length(recvSlice->children) * list_length(sendSlice->segments)) > listenerBacklog)
		transportStates->aggressiveRetry = true;

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG4, "Interconnect seg%d slice%d setting up sending motion node (aggressive retry is %s)",
			 GpIdentity.segindex, sendSlice->sliceIndex,
			 (transportStates->aggressiveRetry ? "active" : "inactive"));

	pEntry = createChunkTransportState(transportStates,
									   sendSlice,
									   recvSlice,
									   list_length(recvSlice->primaryProcesses));

	/*
	 * Walk the receiving processes and the MotionConn slots in lock step:
	 * every non-NULL process gets a buffer and is marked as needing a
	 * connection to that backend's listening port.
	 */
	slot = 0;
	foreach(cell, recvSlice->primaryProcesses)
	{
		CdbProcess *target = (CdbProcess *) lfirst(cell);
		MotionConn *conn = &pEntry->conns[slot++];

		if (target == NULL)
			continue;

		conn->cdbProc = target;
		conn->pBuff = palloc(Gp_max_packet_size);
		conn->state = mcsSetupOutgoingConnection;
		*pOutgoingCount += 1;
	}

	return pEntry;
}								/* startOutgoingConnections */
/*
 * setupOutgoingConnection
 *
 * Called by SetupInterconnect when conn->state == mcsSetupOutgoingConnection.
 * Closes any socket left over from a previous attempt, creates a new
 * non-blocking socket and starts connecting to the receiver's listener
 * address and port (conn->cdbProc->listenerAddr / listenerPort).
 *
 * When the interconnect type is "proxy" (ENABLE_IC_PROXY builds), the
 * connection request is queued with ic_proxy_backend_connect() instead and
 * the conn is marked mcsStarted right away.
 *
 * On return, state is:
 *	  mcsSetupOutgoingConnection if failed and caller should retry.
 *	  mcsConnecting if non-blocking connect() is pending.  Caller should
 *		  send registration message when socket becomes write-ready.
 *	  mcsSendRegMsg or mcsStarted if connect() completed successfully.
 *
 * Raises ERROR if the address cannot be translated, or if the socket cannot
 * be created or made non-blocking.  The address list returned by
 * getaddrinfo is released before every exit taken in this function,
 * including those error exits.
 */
static void
setupOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	CdbProcess *cdbProc = conn->cdbProc;
	int			n;
	int			ret;
	int			errnoSave;
	char		portNumberStr[32];
	struct addrinfo *addrs = NULL;
	struct addrinfo hint;

	Assert(conn->cdbProc);
	Assert(conn->state == mcsSetupOutgoingConnection);

	conn->wakeup_ms = 0;
	conn->remoteContentId = cdbProc->contentid;

	/*
	 * record the destination IP addr and port for error messages.  Since the
	 * IP addr might be IPv6, it might have ':' embedded, so in that case, put
	 * '[]' around it so we can see that the string is an IP and port
	 * (otherwise it might look just like an IP).
	 */
	if (strchr(cdbProc->listenerAddr, ':') != NULL)
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
				 "[%s]:%d", cdbProc->listenerAddr, cdbProc->listenerPort);
	else
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
				 "%s:%d", cdbProc->listenerAddr, cdbProc->listenerPort);

	/* Might be retrying due to connection failure etc.  Close old socket. */
	if (conn->sockfd >= 0)
	{
		closesocket(conn->sockfd);
		conn->sockfd = -1;
	}

#ifdef ENABLE_IC_PROXY
	if (Gp_interconnect_type == INTERCONNECT_TYPE_PROXY)
	{
		/*
		 * Using libuv pipe to register backend to proxy.
		 * ic_proxy_backend_connect only appends the connect request into
		 * connection queue and waits for the libuv_run_loop to handle the
		 * queue.
		 */
		ic_proxy_backend_connect(transportStates->proxyContext,
								 pEntry, conn, true);

		conn->pBuff = palloc(Gp_max_packet_size);
		conn->recvBytes = 0;
		conn->msgPos = NULL;
		conn->msgSize = PACKET_HEADER_SIZE;
		conn->state = mcsStarted;
		conn->stillActive = true;
		conn->tupleCount = 0;
		conn->remoteContentId = conn->cdbProc->contentid;
		return;
	}
#endif							/* ENABLE_IC_PROXY */

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_STREAM;
	hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6  */
#ifdef AI_NUMERICSERV
	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;	/* Never do name
														 * resolution */
#else
	hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */
#endif

	snprintf(portNumberStr, sizeof(portNumberStr), "%d", cdbProc->listenerPort);

	ret = pg_getaddrinfo_all(cdbProc->listenerAddr, portNumberStr, &hint, &addrs);
	if (ret || !addrs)
	{
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);

		ereport(ERROR,
				(errmsg("could not translate host addr \"%s\", port \"%d\" to address: %s",
						cdbProc->listenerAddr, cdbProc->listenerPort, gai_strerror(ret))));
	}

	/*
	 * Since we aren't using name resolution, getaddrinfo will return only 1
	 * entry
	 */

	/*
	 * Create a socket.  getaddrinfo() returns the parameters needed by
	 * socket()
	 */
	conn->sockfd = socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol);
	if (conn->sockfd < 0)
	{
		/*
		 * The address list is malloc'ed by getaddrinfo, so it would leak if
		 * we raised the error without freeing it.  Keep errno intact for %m.
		 */
		errnoSave = errno;
		pg_freeaddrinfo_all(hint.ai_family, addrs);
		errno = errnoSave;
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error setting up outgoing connection"),
				 errdetail("%s: %m", "socket")));
	}

	/* make socket non-blocking BEFORE we connect. */
	if (!pg_set_noblock(conn->sockfd))
	{
		errnoSave = errno;
		pg_freeaddrinfo_all(hint.ai_family, addrs);
		errno = errnoSave;
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error setting up outgoing connection"),
				 errdetail("%s: %m", "fcntl(O_NONBLOCK)")));
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		ereport(DEBUG1, (errmsg("Interconnect connecting to seg%d slice%d %s "
								"pid=%d sockfd=%d",
								conn->remoteContentId,
								pEntry->recvSlice->sliceIndex,
								conn->remoteHostAndPort,
								conn->cdbProc->pid,
								conn->sockfd)));

	/*
	 * Initiate the connection.
	 *
	 * NOTE(review): an error raised by the interrupt check inside this loop
	 * still bypasses the release of addrs.
	 */
	for (;;)
	{							/* connect() EINTR retry loop */
		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

		n = connect(conn->sockfd, addrs->ai_addr, addrs->ai_addrlen);

		/* Non-blocking socket never connects immediately, but check anyway. */
		if (n == 0)
		{
			/*
			 * The address is no longer needed; free it first so that an
			 * error raised while sending the registration cannot leak it.
			 */
			pg_freeaddrinfo_all(hint.ai_family, addrs);
			sendRegisterMessage(transportStates, pEntry, conn);
			return;
		}

		/* Retry if a signal was received. */
		if (errno == EINTR)
			continue;

		/* Normal case: select() will tell us when connection is made. */
		if (errno == EINPROGRESS ||
			errno == EWOULDBLOCK)
		{
			conn->state = mcsConnecting;
			pg_freeaddrinfo_all(hint.ai_family, addrs);
			return;
		}

		/*
		 * connect() failed.  Capture errno before freeing the address list
		 * (which may overwrite it), then log the error.  Caller should retry.
		 */
		errnoSave = errno;
		pg_freeaddrinfo_all(hint.ai_family, addrs);
		updateOutgoingConnection(transportStates, pEntry, conn, errnoSave);
		return;
	}							/* connect() EINTR retry loop */
}								/* setupOutgoingConnection */
/*
 * updateOutgoingConnection
 *
 * Called when connect() succeeds or fails.
 *
 * errnoSave is the errno of the connect attempt, or -1 to ask this function
 * to fetch the pending socket error with getsockopt(SO_ERROR).
 *
 * On success (error value 0) the registration message is sent via
 * sendRegisterMessage().  On failure the error is logged at LOG level and
 * conn->state is reset to mcsSetupOutgoingConnection so the caller retries.
 * Raises ERROR only if getsockopt() itself fails.
 */
static void
updateOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, int errnoSave)
{
	socklen_t	optlen = sizeof(errnoSave);

	/* Get errno value indicating success or failure. */
	if (errnoSave == -1)
	{
		int			rc = getsockopt(conn->sockfd, SOL_SOCKET, SO_ERROR,
									(void *) &errnoSave, &optlen);

		if (rc != 0)
		{
			/* getsockopt failed */
			ereport(ERROR,
					(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					 errmsg("interconnect could not connect to seg%d %s",
							conn->remoteContentId, conn->remoteHostAndPort),
					 errdetail("%s sockfd=%d: %m",
							   "getsockopt(SO_ERROR)", conn->sockfd)));
		}
	}

	/* Success!  Advance to next state. */
	if (errnoSave == 0)
	{
		sendRegisterMessage(transportStates, pEntry, conn);
		return;
	}

	/* Failure: make the saved error visible to %m and log it. */
	errno = errnoSave;
	ereport(LOG,
			(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			 errmsg("interconnect could not connect to seg%d %s pid=%d; will retry; %s: %m",
					conn->remoteContentId, conn->remoteHostAndPort,
					conn->cdbProc->pid, "connect")));

	/* Tell caller to close the socket and try again. */
	conn->state = mcsSetupOutgoingConnection;
}								/* updateOutgoingConnection */
/*
 * sendRegisterMessage
 *
 * Send a Register message to the remote destination on the other end of
 * conn.  In this file it is called from setupOutgoingConnection() and
 * updateOutgoingConnection() once connect() has succeeded, and it is
 * designed to be called again while conn->state == mcsSendRegMsg to push
 * out the remainder of a partially sent message.
 *
 * PARAMETERS
 *	 transportStates - interconnect state (slice table, teardown flag).
 *	 pEntry          - transport state entry owning conn; supplies the send
 *	                   and receive slice indexes.
 *	 conn            - MotionConn to send the message out on.
 *
 * On return, state is:
 *	  mcsSendRegMsg if registration message has not been completely sent.
 *		  Caller should retry when socket becomes write-ready.
 *	  mcsStarted if registration message has been sent.  Caller can start
 *		  sending data.
 *
 * Raises ERROR if getsockname() or send() fails.
 */
static void
sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	SliceTable *sliceTbl = transportStates->sliceTable;
	int			remaining;
	int			sent;

	/* First call for this connection: build the message in conn->pBuff. */
	if (conn->state != mcsSendRegMsg)
	{
		struct sockaddr_storage localAddr;
		socklen_t	addrsize = sizeof(localAddr);
		RegisterMessage *regMsg = (RegisterMessage *) conn->pBuff;

		Assert(conn->cdbProc &&
			   conn->pBuff &&
			   sizeof(*regMsg) <= Gp_max_packet_size);

		/* Save local host and port for log messages. */
		if (getsockname(conn->sockfd, (struct sockaddr *) &localAddr, &addrsize))
		{
			ereport(ERROR,
					(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					 errmsg("interconnect error after making connection"),
					 errdetail("getsockname sockfd=%d remote=%s: %m",
							   conn->sockfd, conn->remoteHostAndPort)));
		}
		format_sockaddr(&localAddr, conn->localHostAndPort,
						sizeof(conn->localHostAndPort));

		if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
			ereport(LOG,
					(errmsg("interconnect sending registration message to seg%d slice%d %s pid=%d from seg%d slice%d %s sockfd=%d",
							conn->remoteContentId,
							pEntry->recvSlice->sliceIndex,
							conn->remoteHostAndPort,
							conn->cdbProc->pid,
							GpIdentity.segindex,
							pEntry->sendSlice->sliceIndex,
							conn->localHostAndPort,
							conn->sockfd)));

		regMsg->msgBytes = sizeof(*regMsg);
		regMsg->recvSliceIndex = pEntry->recvSlice->sliceIndex;
		regMsg->sendSliceIndex = pEntry->sendSlice->sliceIndex;
		regMsg->srcContentId = GpIdentity.segindex;
		regMsg->srcListenerPort = Gp_listener_port & 0x0ffff;
		regMsg->srcPid = MyProcPid;
		regMsg->srcSessionId = gp_session_id;
		regMsg->srcCommandCount = sliceTbl->ic_instance_id;

		/* msgPos tracks how far into the message we have sent so far */
		conn->state = mcsSendRegMsg;
		conn->msgPos = conn->pBuff;
		conn->msgSize = sizeof(*regMsg);
	}

	/* Send as much as we can. */
	for (;;)
	{
		remaining = conn->pBuff + conn->msgSize - conn->msgPos;
		sent = send(conn->sockfd, conn->msgPos, remaining, 0);

		if (sent == remaining)
			break;				/* whole message is out */

		if (sent >= 0)
		{
			/* partial write: advance and try again */
			conn->msgPos += sent;
			continue;
		}

		if (errno == EWOULDBLOCK)
			return;				/* call me again to send the rest */

		if (errno == EINTR)
		{
			ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
			continue;
		}

		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error writing registration message to seg%d at %s",
						conn->remoteContentId,
						conn->remoteHostAndPort),
				 errdetail("write pid=%d sockfd=%d local=%s: %m",
						   conn->cdbProc->pid,
						   conn->sockfd,
						   conn->localHostAndPort)));
	}

	/* Sent it all. */
	conn->state = mcsStarted;
	conn->msgPos = NULL;
	conn->msgSize = PACKET_HEADER_SIZE;
	conn->stillActive = true;
}								/* sendRegisterMessage */
/* Function readRegisterMessage() reads a "Register" message off of the conn
 * and places it in the right MotionLayerEntry conn slot based on the contents
 * of the register message.
 *
 * The function may be called repeatedly for the same conn: while the
 * message is only partially received, conn->state stays mcsRecvRegMsg and
 * conn->msgPos records how far the read has progressed.
 *
 * PARAMETERS
 *
 *	 transportStates - interconnect state; supplies the slice table, the
 *	                   receiving slice id and the teardown flag.
 *	 conn - MotionConn to read the register message from.
 *
 * Returns true if message has been received; or false if caller must retry
 * when socket becomes read-ready.
 *
 * true is also returned when the connection was dropped instead of being
 * registered (peer closed it, or the message belongs to an old session or
 * command). In that case conn->sockfd is -1 and conn->pBuff has been freed.
 * On a successful registration the contents of *conn are moved into the
 * entry's slot and *conn is zeroed with state mcsNull.
 *
 * Raises ERROR on a receive failure or on a malformed, unexpected or
 * duplicate registration message.
 */
static bool
readRegisterMessage(ChunkTransportState *transportStates,
MotionConn *conn)
{
int bytesToReceive;
int bytesReceived;
int iconn;
RegisterMessage *regMsg;
RegisterMessage msg;
MotionConn *newConn;
ChunkTransportStateEntry *pEntry = NULL;
CdbProcess *cdbproc = NULL;
ListCell *lc;
SliceTable *sliceTbl = transportStates->sliceTable;
/* Get ready to receive the Register message. */
if (conn->state != mcsRecvRegMsg)
{
conn->state = mcsRecvRegMsg;
conn->msgSize = sizeof(*regMsg);
conn->msgPos = conn->pBuff;
Assert(conn->pBuff &&
sizeof(*regMsg) <= Gp_max_packet_size);
}
/* Receive all that is available, up to the expected message size. */
for (;;)
{
/* bytes still missing = end of expected message - current position */
bytesToReceive = conn->pBuff + conn->msgSize - conn->msgPos;
bytesReceived = recv(conn->sockfd, conn->msgPos, bytesToReceive, 0);
if (bytesReceived == bytesToReceive)
break;
else if (bytesReceived > 0)
conn->msgPos += bytesReceived;
else if (bytesReceived == 0)
{
elog(LOG, "Interconnect error reading register message from %s: connection closed",
conn->remoteHostAndPort);
/* maybe this peer is already retrying ? */
goto old_conn;
}
else if (errno == EWOULDBLOCK)
return false; /* call me again to receive the rest */
else if (errno == EINTR)
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
else
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error reading register message from %s",
conn->remoteHostAndPort),
errdetail("read sockfd=%d local=%s: %m",
conn->sockfd,
conn->localHostAndPort)));
}
}
/*
 * Got the whole message. Copy its fields into a local struct; the values
 * are taken as-is (no byte-order conversion is performed here).
 */
regMsg = (RegisterMessage *) conn->pBuff;
msg.msgBytes = regMsg->msgBytes;
msg.recvSliceIndex = regMsg->recvSliceIndex;
msg.sendSliceIndex = regMsg->sendSliceIndex;
msg.srcContentId = regMsg->srcContentId;
msg.srcListenerPort = regMsg->srcListenerPort;
msg.srcPid = regMsg->srcPid;
msg.srcSessionId = regMsg->srcSessionId;
msg.srcCommandCount = regMsg->srcCommandCount;
/* Check for valid message format. */
if (msg.msgBytes != sizeof(*regMsg))
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error reading register message from %s: format not recognized",
conn->remoteHostAndPort),
errdetail("msgBytes=%d expected=%d sockfd=%d local=%s",
msg.msgBytes, (int) sizeof(*regMsg),
conn->sockfd, conn->localHostAndPort)));
}
/* get rid of old connections first */
if (msg.srcSessionId != gp_session_id ||
msg.srcCommandCount < sliceTbl->ic_instance_id)
{
/*
 * This is an old connection, which can be safely ignored. We get this
 * kind of stuff for cases in which one gang participating in the
 * interconnect exited a query before calling SetupInterconnect().
 * Later queries wind up receiving their registration messages.
 */
elog(LOG, "Received invalid, old registration message: "
"will ignore ('expected:received' session %d:%d ic-id %d:%d)",
gp_session_id, msg.srcSessionId,
sliceTbl->ic_instance_id, msg.srcCommandCount);
goto old_conn;
}
/* Verify that the message pertains to one of our receiving Motion nodes. */
if (msg.sendSliceIndex > 0 &&
msg.sendSliceIndex <= transportStates->size &&
msg.recvSliceIndex == transportStates->sliceId &&
msg.srcContentId >= -1)
{
/* this is a good connection */
}
else
{
/* something is wrong */
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error: Invalid registration message received from %s",
conn->remoteHostAndPort),
errdetail("sendSlice=%d recvSlice=%d srcContentId=%d srcPid=%d srcListenerPort=%d srcSessionId=%d srcCommandCount=%d motnode=%d",
msg.sendSliceIndex, msg.recvSliceIndex,
msg.srcContentId, msg.srcPid,
msg.srcListenerPort, msg.srcSessionId,
msg.srcCommandCount, msg.sendSliceIndex)));
}
/*
 * Find state info for the specified Motion node. The sender's slice
 * number equals the motion node id.
 */
getChunkTransportState(transportStates, msg.sendSliceIndex, &pEntry);
Assert(pEntry);
/*
 * Look for the sender among the sending slice's processes, matching on
 * content id, listener port and pid.
 * NOTE(review): presumably foreach_with_count leaves iconn at the index of
 * the matching cell on break, and at the list length when nothing matched;
 * the check right after the loop relies on that -- confirm against the
 * macro definition.
 */
foreach_with_count(lc, pEntry->sendSlice->primaryProcesses, iconn)
{
cdbproc = (CdbProcess *)lfirst(lc);
if (!cdbproc)
continue;
if (msg.srcContentId == cdbproc->contentid &&
msg.srcListenerPort == cdbproc->listenerPort &&
msg.srcPid == cdbproc->pid)
break;
}
/* no matching process: the sender is not one we expect */
if (iconn == list_length(pEntry->sendSlice->primaryProcesses))
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error: Invalid registration message received from %s",
conn->remoteHostAndPort),
errdetail("sendSlice=%d srcContentId=%d srcPid=%d srcListenerPort=%d",
msg.sendSliceIndex, msg.srcContentId,
msg.srcPid, msg.srcListenerPort)));
}
/*
 * Allocate MotionConn slot corresponding to sender's position in the
 * sending slice's CdbProc list.
 */
newConn = getMotionConn(pEntry, iconn);
/* a slot that already has a socket or a state is already registered */
if (newConn->sockfd != -1 ||
newConn->state != mcsNull)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error: Duplicate registration message received from %s",
conn->remoteHostAndPort),
errdetail("Already accepted registration from %s for sendSlice=%d srcContentId=%d srcPid=%d srcListenerPort=%d",
newConn->remoteHostAndPort, msg.sendSliceIndex,
msg.srcContentId, msg.srcPid, msg.srcListenerPort)));
}
/* message looks good */
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
ereport(LOG,
(errmsg("interconnect seg%d slice%d sockfd=%d accepted registration message from seg%d slice%d %s pid=%d",
GpIdentity.segindex, msg.recvSliceIndex, conn->sockfd,
msg.srcContentId, msg.sendSliceIndex,
conn->remoteHostAndPort, msg.srcPid)));
}
/*
 * Copy caller's temporary MotionConn to its assigned slot. This is a
 * struct copy, so the slot takes over the socket and the buffer pointer.
 */
*newConn = *conn;
newConn->cdbProc = cdbproc;
newConn->remoteContentId = msg.srcContentId;
/*
 * The caller's MotionConn object is no longer valid.
 */
MemSet(conn, 0, sizeof(*conn));
conn->state = mcsNull;
/*
 * Prepare to begin reading tuples.
 */
newConn->state = mcsStarted;
newConn->msgPos = NULL;
newConn->msgSize = 0;
newConn->stillActive = true;
/* add the socket to the entry's read set and track the highest fd in it */
MPP_FD_SET(newConn->sockfd, &pEntry->readSet);
if (newConn->sockfd > pEntry->highReadSock)
pEntry->highReadSock = newConn->sockfd;
#ifdef AMS_VERBOSE_LOGGING
dumpEntryConnections(DEBUG4, pEntry);
#endif
/* we've completed registration of this connection */
return true;
old_conn:
/* Drop the connection: close the socket and release the receive buffer. */
shutdown(conn->sockfd, SHUT_RDWR);
closesocket(conn->sockfd);
conn->sockfd = -1;
pfree(conn->pBuff);
conn->pBuff = NULL;
/*
 * this connection is done, but with sockfd == -1 isn't a "success"
 */
return true;
} /* readRegisterMessage */
/*
 * acceptIncomingConnection
 *
 * accept() one connection request that is pending on the listening socket
 * (TCP_listenerFd) and wrap it in a newly palloc'ed MotionConn.
 *
 * Returns the new MotionConn (state mcsAccepted, socket switched to
 * non-blocking mode, remote/local host:port strings filled in), or NULL if
 * the listening socket does not have any pending connection requests.
 *
 * Raises ERROR when accept() fails in a way that indicates a bug or resource
 * exhaustion, and when the accepted socket cannot be inspected or made
 * non-blocking.  In the latter two cases the accepted socket is closed
 * before the error is raised so that the descriptor is not leaked.
 */
static MotionConn *
acceptIncomingConnection(void)
{
    int         newsockfd;
    socklen_t   addrsize;
    MotionConn *conn;
    struct sockaddr_storage remoteAddr;
    struct sockaddr_storage localAddr;

    /*
     * Accept a connection.
     */
    for (;;)
    {                           /* loop until success or EWOULDBLOCK */
        MemSet(&remoteAddr, 0, sizeof(remoteAddr));
        addrsize = sizeof(remoteAddr);
        newsockfd = accept(TCP_listenerFd, (struct sockaddr *) &remoteAddr, &addrsize);
        if (newsockfd >= 0)
            break;

        switch (errno)
        {
            case EINTR:
                /* A signal arrived.  Loop to retry the accept(). */
                break;

            case EWOULDBLOCK:
#if defined(EAGAIN) && (EAGAIN != EWOULDBLOCK)
                /* POSIX allows either value to report an empty queue. */
            case EAGAIN:
#endif
                /* Connection request queue is empty.  Normal return. */
                return NULL;

            case EBADF:
            case EFAULT:
            case EINVAL:
#ifndef _WIN32
            case ENOTSOCK:
#endif
            case EOPNOTSUPP:
                /* Shouldn't get these errors unless there is a bug. */
                ereport(ERROR,
                        (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                         errmsg("interconnect error on listener port %d",
                                Gp_listener_port),
                         errdetail("accept sockfd=%d: %m", TCP_listenerFd)));
                break;          /* not reached */

            case ENOMEM:
            case ENFILE:
            case EMFILE:
            case ENOBUFS:
                /* Out of resources. */
                ereport(ERROR,
                        (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                         errmsg("interconnect error on listener port %d",
                                Gp_listener_port),
                         errdetail("accept sockfd=%d: %m", TCP_listenerFd)));
                break;          /* not reached */

            default:
                /* Network problem, connection aborted, etc.  Continue. */
                ereport(LOG,
                        (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                         errmsg("interconnect connection request not completed on listener port %d",
                                Gp_listener_port),
                         errdetail("accept sockfd=%d: %m", TCP_listenerFd)));
        }                       /* switch (errno) */
    }                           /* loop until success or EWOULDBLOCK */

    /*
     * Create a MotionConn object to hold the connection state.
     */
    conn = palloc0(sizeof(MotionConn));
    conn->sockfd = newsockfd;
    conn->pBuff = palloc(Gp_max_packet_size);
    conn->msgSize = 0;
    conn->recvBytes = 0;
    conn->msgPos = NULL;
    conn->tupleCount = 0;
    conn->stillActive = false;
    conn->state = mcsAccepted;
    conn->remoteContentId = -2;

    /* Save remote and local host:port strings for error messages. */
    format_sockaddr(&remoteAddr, conn->remoteHostAndPort,
                    sizeof(conn->remoteHostAndPort));

    addrsize = sizeof(localAddr);
    if (getsockname(newsockfd, (struct sockaddr *) &localAddr, &addrsize))
    {
        /*
         * Nobody else knows about this socket yet, so close it before
         * raising the error.  errno is preserved for the %m below.
         */
        int         save_errno = errno;

        closesocket(newsockfd);
        conn->sockfd = -1;
        errno = save_errno;

        ereport(ERROR,
                (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                 errmsg("interconnect error after accepting connection"),
                 errdetail("getsockname sockfd=%d remote=%s: %m",
                           newsockfd, conn->remoteHostAndPort)));
    }
    format_sockaddr(&localAddr, conn->localHostAndPort,
                    sizeof(conn->localHostAndPort));

    /* make socket non-blocking */
    if (!pg_set_noblock(newsockfd))
    {
        /* Same as above: do not leak the accepted descriptor. */
        int         save_errno = errno;

        closesocket(newsockfd);
        conn->sockfd = -1;
        errno = save_errno;

        ereport(ERROR,
                (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                 errmsg("interconnect error after accepting connection"),
                 errdetail("fcntl(O_NONBLOCK) sockfd=%d remote=%s local=%s: %m",
                           newsockfd, conn->remoteHostAndPort,
                           conn->localHostAndPort)));
    }

    if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
        elog(DEBUG4, "Interconnect got incoming connection "
             "from remote=%s to local=%s sockfd=%d",
             conn->remoteHostAndPort, conn->localHostAndPort, newsockfd);

    return conn;
}                               /* acceptIncomingConnection */
/*
 * SetupTCPInterconnect
 *
 * Build the TCP interconnect state for the local slice and wait until every
 * expected connection is established.  See ml_ipc.h.
 *
 * The function works in these phases:
 *
 * 1. Allocate a ChunkTransportState, install the TCP send/receive callbacks
 *    and take a private copy of the slice table.
 * 2. For every child slice (a receiving motion), create a transport entry
 *    and count how many inbound connections are expected.
 * 3. If the local slice has a parent, start the outgoing connections.
 * 4. Run a select() loop that accepts inbound connections, reads their
 *    registration messages, drives outbound connect()/registration and
 *    retries failed outbound connections, until both counters reach their
 *    expected totals.
 * 5. Close any leftover unregistered inbound connections, mark the context
 *    activated and store it in estate->interconnect_context.
 *
 * Raises ERROR if gp_interconnect_setup_timeout (interconnect_setup_timeout)
 * expires before all connections are complete, or if select() fails with an
 * error other than EINTR.  Internal inconsistencies are reported as FATAL.
 */
void
SetupTCPInterconnect(EState *estate)
{
int i,
index,
n;
ListCell *cell;
ExecSlice *mySlice;
ExecSlice *aSlice;
MotionConn *conn;
SliceTable *sliceTable = estate->es_sliceTable;
/* connections completed so far / connections we must complete */
int incoming_count = 0;
int outgoing_count = 0;
int expectedTotalIncoming = 0;
int expectedTotalOutgoing = 0;
/* number of passes through the select() loop; also scales retry delay */
int iteration = 0;
GpMonotonicTime startTime;
StringInfoData logbuf;
/* milliseconds since startTime, refreshed after every wait */
uint64 elapsed_ms = 0;
uint64 last_qd_check_ms = 0;
/* we can have at most one of these. */
ChunkTransportStateEntry *sendingChunkTransportState = NULL;
ChunkTransportState *interconnect_context;
SIMPLE_FAULT_INJECTOR("interconnect_setup_palloc");
interconnect_context = palloc0(sizeof(ChunkTransportState));
/* initialize state variables */
Assert(interconnect_context->size == 0);
interconnect_context->estate = estate;
interconnect_context->size = CTS_INITIAL_SIZE;
interconnect_context->states = palloc0(CTS_INITIAL_SIZE * sizeof(ChunkTransportStateEntry));
interconnect_context->teardownActive = false;
interconnect_context->activated = false;
interconnect_context->networkTimeoutIsLogged = false;
interconnect_context->incompleteConns = NIL;
interconnect_context->sliceTable = copyObject(sliceTable);
interconnect_context->sliceId = sliceTable->localSlice;
/* install the TCP implementations of the transport callbacks */
interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromTCP;
interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyTCP;
interconnect_context->SendEos = SendEosTCP;
interconnect_context->SendChunk = SendChunkTCP;
interconnect_context->doSendStopMessage = doSendStopMessageTCP;
#ifdef ENABLE_IC_PROXY
ic_proxy_backend_init_context(interconnect_context);
#endif /* ENABLE_IC_PROXY */
/* from here on we work with our private copy of the slice table */
mySlice = &interconnect_context->sliceTable->slices[sliceTable->localSlice];
/*
 * NOTE(review): sliceTable has already been dereferenced above, so the
 * "sliceTable &&" part of this assertion can no longer catch a NULL.
 */
Assert(sliceTable &&
mySlice->sliceIndex == sliceTable->localSlice);
/* start the clock that elapsed_ms is measured against */
gp_set_monotonic_begin_time(&startTime);
/* now we'll do some setup for each of our Receiving Motion Nodes. */
foreach(cell, mySlice->children)
{
int totalNumProcs;
int childId = lfirst_int(cell);
ChunkTransportStateEntry *pEntry = NULL;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "Setting up RECEIVING motion node %d", childId);
#endif
aSlice = &interconnect_context->sliceTable->slices[childId];
/*
* If we're using directed-dispatch we have dummy primary-process
* entries, so we count the entries.
*/
totalNumProcs = list_length(aSlice->primaryProcesses);
pEntry = createChunkTransportState(interconnect_context, aSlice, mySlice, totalNumProcs);
for (i = 0; i < totalNumProcs; i++)
{
CdbProcess *cdbProc;
cdbProc = list_nth(aSlice->primaryProcesses, i);
/* only non-NULL process entries will actually connect to us */
if (cdbProc)
expectedTotalIncoming++;
#ifdef ENABLE_IC_PROXY
if (Gp_interconnect_type == INTERCONNECT_TYPE_PROXY)
{
conn = &pEntry->conns[i];
conn->cdbProc = list_nth(aSlice->primaryProcesses, i);
if (conn->cdbProc)
{
/*
 * In proxy mode the inbound connection is counted as complete right
 * away; it is not accepted through the listener in the loop below.
 */
incoming_count++;
/*
* Using libuv pipe to register backend to proxy.
* ic_proxy_backend_connect only appends the connect request
* into connection queue and waits for the libuv_run_loop to
* handle the queue.
*/
ic_proxy_backend_connect(interconnect_context->proxyContext,
pEntry, conn, false /* isSender */);
conn->pBuff = palloc(Gp_max_packet_size);
conn->recvBytes = 0;
conn->msgPos = NULL;
conn->msgSize = 0;
conn->state = mcsStarted;
conn->stillActive = true;
conn->tupleCount = 0;
conn->remoteContentId = conn->cdbProc->contentid;
conn->remapper = CreateTupleRemapper();
}
}
#endif /* ENABLE_IC_PROXY */
}
}
/*
* Initiate outgoing connections.
*
* startOutgoingConnections() and createChunkTransportState() must not be
* called during the lifecycle of sendingChunkTransportState, they will
* repalloc() interconnect_context->states so sendingChunkTransportState
* points to invalid memory.
*/
if (mySlice->parentIndex != -1)
sendingChunkTransportState = startOutgoingConnections(interconnect_context, mySlice, &expectedTotalOutgoing);
#ifdef ENABLE_IC_PROXY
if (Gp_interconnect_type == INTERCONNECT_TYPE_PROXY)
{
for (i = 0; i < expectedTotalOutgoing; i++)
{
conn = &sendingChunkTransportState->conns[i];
setupOutgoingConnection(interconnect_context,
sendingChunkTransportState, conn);
}
/* all outgoing connections are treated as complete in proxy mode */
outgoing_count = expectedTotalOutgoing;
}
/*
* Before ic_proxy_backend_run_loop, we have already gone through all the
* incoming and outgoing connections and appended them to the connect queue.
* ic_proxy_backend_run_loop will trigger the uv_loop and begin to handle
* the connect events in a parallel and asynchronous way.
*
* Note that the domain socket fds are bound to libuv pipe handles, but we
* still depend on ic_tcp code to send/recv interconnect data based on
* these fds and to close these fds in the teardown function. As a result, we
* should not touch the libuv pipe handles until ic_tcp closes all the fds in
* the teardown function. In future, we should retire the ic_tcp code in the
* ic_proxy backend and use libuv to handle connection setup, data transfer
* and teardown in a unified way.
*/
ic_proxy_backend_run_loop(interconnect_context->proxyContext);
#endif /* ENABLE_IC_PROXY */
/* warn early when the listen() backlog may be too small for the fan-in */
if (expectedTotalIncoming > listenerBacklog)
ereport(WARNING, (errmsg("SetupTCPInterconnect: too many expected incoming connections(%d), Interconnect setup might possibly fail", expectedTotalIncoming),
errhint("Try enlarging the gp_interconnect_tcp_listener_backlog GUC value and OS net.core.somaxconn parameter")));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG1, (errmsg("SetupInterconnect will activate "
"%d incoming, %d outgoing routes. "
"Listening on port=%d sockfd=%d.",
expectedTotalIncoming, expectedTotalOutgoing,
Gp_listener_port, TCP_listenerFd)));
/*
* Loop until all connections are completed or time limit is exceeded.
*/
while (outgoing_count < expectedTotalOutgoing ||
incoming_count < expectedTotalIncoming)
{ /* select() loop */
struct timeval timeout;
mpp_fd_set rset,
wset,
eset;
/* highest fd placed in any set; stays -1 if there is nothing to wait on */
int highsock = -1;
/* upper bound for this pass (20 minutes); narrowed by the code below */
uint64 timeout_ms = 20 * 60 * 1000;
int outgoing_fail_count = 0;
int select_errno;
iteration++;
MPP_FD_ZERO(&rset);
MPP_FD_ZERO(&wset);
MPP_FD_ZERO(&eset);
/* Expecting any new inbound connections? */
if (incoming_count < expectedTotalIncoming)
{
if (TCP_listenerFd < 0)
{
elog(FATAL, "SetupTCPInterconnect: bad listener");
}
MPP_FD_SET(TCP_listenerFd, &rset);
highsock = TCP_listenerFd;
}
/* Inbound connections awaiting registration message */
foreach(cell, interconnect_context->incompleteConns)
{
conn = (MotionConn *) lfirst(cell);
if (conn->state != mcsRecvRegMsg || conn->sockfd < 0)
{
elog(FATAL, "SetupTCPInterconnect: incomplete connection bad state or bad fd");
}
MPP_FD_SET(conn->sockfd, &rset);
highsock = Max(highsock, conn->sockfd);
}
/*
 * Outgoing connections.  outgoing_count is recomputed from scratch on
 * every pass by counting the connections that are in state mcsStarted.
 */
outgoing_count = 0;
n = sendingChunkTransportState ? sendingChunkTransportState->numConns : 0;
for (i = 0; i < n; i++)
{
index = i;
conn = &sendingChunkTransportState->conns[index];
/*
 * Time to cancel incomplete connect() and retry?  wakeup_ms is a
 * deadline on the same clock as elapsed_ms; 20ms of slack is allowed.
 */
if (conn->state == mcsConnecting &&
conn->wakeup_ms > 0 &&
conn->wakeup_ms <= elapsed_ms + 20)
{
ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect timeout: Connection "
"to seg%d %s from local port %s was not "
"complete after " UINT64_FORMAT
"ms " UINT64_FORMAT " elapsed. Will retry.",
conn->remoteContentId,
conn->remoteHostAndPort,
conn->localHostAndPort,
conn->wakeup_ms, (elapsed_ms + 20))));
conn->state = mcsSetupOutgoingConnection;
}
/* Time to connect? */
if (conn->state == mcsSetupOutgoingConnection &&
conn->wakeup_ms <= elapsed_ms + 20)
{
setupOutgoingConnection(interconnect_context, sendingChunkTransportState, conn);
switch (conn->state)
{
case mcsSetupOutgoingConnection:
/*
 * Retry failed connection after awhile.  The delay grows by one
 * second for every pass through the select() loop so far.
 */
conn->wakeup_ms = (iteration - 1) * 1000 + elapsed_ms;
break;
case mcsConnecting:
/* Set time limit for connect() to complete. */
if (interconnect_context->aggressiveRetry)
conn->wakeup_ms = CONNECT_AGGRESSIVERETRY_MS + elapsed_ms;
else
conn->wakeup_ms = CONNECT_RETRY_MS + elapsed_ms;
break;
default:
/* no deadline needed for any other state */
conn->wakeup_ms = 0;
break;
}
}
/* What events are we watching for? */
switch (conn->state)
{
case mcsNull:
break;
case mcsSetupOutgoingConnection:
/* waiting for its retry time; nothing to select() on */
outgoing_fail_count++;
break;
case mcsConnecting:
if (conn->sockfd < 0)
{
elog(FATAL, "SetupTCPInterconnect: bad fd, mcsConnecting");
}
/* connect() completion is reported as writable or exceptional */
MPP_FD_SET(conn->sockfd, &wset);
MPP_FD_SET(conn->sockfd, &eset);
highsock = Max(highsock, conn->sockfd);
break;
case mcsSendRegMsg:
if (conn->sockfd < 0)
{
elog(FATAL, "SetupTCPInterconnect: bad fd, mcsSendRegMsg");
}
MPP_FD_SET(conn->sockfd, &wset);
highsock = Max(highsock, conn->sockfd);
break;
case mcsStarted:
outgoing_count++;
break;
default:
elog(FATAL, "SetupTCPInterconnect: bad connection state");
}
/* never sleep past the earliest pending deadline */
if (conn->wakeup_ms > 0)
timeout_ms = Min(timeout_ms, conn->wakeup_ms - elapsed_ms);
} /* loop to set up outgoing connections */
/* Break out of select() loop if completed all connections. */
if (outgoing_count == expectedTotalOutgoing &&
incoming_count == expectedTotalIncoming)
break;
/*
* Been here long? Bail if gp_interconnect_setup_timeout exceeded.
*/
if (interconnect_setup_timeout > 0)
{
/* the setting is in seconds; convert to milliseconds */
int to = interconnect_setup_timeout * 1000;
if (to <= elapsed_ms + 20)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect timeout: Unable to "
"complete setup of all connections "
"within time limit."),
errdetail("Completed %d of %d incoming and "
"%d of %d outgoing connections. "
"gp_interconnect_setup_timeout = %d "
"seconds.",
incoming_count, expectedTotalIncoming,
outgoing_count, expectedTotalOutgoing,
interconnect_setup_timeout)
));
/* don't wait for more than 500ms */
timeout_ms = Min(500, Min(timeout_ms, to - elapsed_ms));
}
/* check if segments have errors already for every 2 seconds */
if (Gp_role == GP_ROLE_DISPATCH && elapsed_ms - last_qd_check_ms > 2000)
{
last_qd_check_ms = elapsed_ms;
checkForCancelFromQD(interconnect_context);
}
/*
* If no socket events to wait for, loop to retry after a pause.
*/
if (highsock < 0)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE &&
(timeout_ms > 0 || iteration > 2))
ereport(LOG, (errmsg("SetupInterconnect+" UINT64_FORMAT
"ms: pause " UINT64_FORMAT "ms "
"outgoing_fail=%d iteration=%d",
elapsed_ms, timeout_ms,
outgoing_fail_count, iteration)
));
/* Shouldn't be in this loop unless we have some work to do. */
if (outgoing_fail_count <= 0)
{
elog(FATAL, "SetupInterconnect: invalid outgoing count");
}
/* Wait until earliest wakeup time or overall timeout. */
if (timeout_ms > 0)
{
ML_CHECK_FOR_INTERRUPTS(interconnect_context->teardownActive);
pg_usleep(timeout_ms * 1000);
ML_CHECK_FOR_INTERRUPTS(interconnect_context->teardownActive);
}
/* Back to top of loop and look again. */
elapsed_ms = gp_get_elapsed_ms(&startTime);
continue;
}
/*
* Wait for socket events.
*
* In order to handle errors at intervals less than the full timeout
* length, we limit our select(2) wait to a maximum of 500ms.
*/
if (timeout_ms > 0)
{
/* split the millisecond budget into the timeval's sec/usec fields */
timeout.tv_sec = timeout_ms / 1000; /* 0 */
timeout.tv_usec = (timeout_ms - (timeout.tv_sec * 1000)) * 1000;
Assert(timeout_ms == timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
}
else
timeout.tv_sec = timeout.tv_usec = 0;
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
/* log the fd sets we are about to wait on */
initStringInfo(&logbuf);
format_fd_set(&logbuf, highsock + 1, &rset, "r={", "} ");
format_fd_set(&logbuf, highsock + 1, &wset, "w={", "} ");
format_fd_set(&logbuf, highsock + 1, &eset, "e={", "}");
elapsed_ms = gp_get_elapsed_ms(&startTime);
ereport(DEBUG1, (errmsg("SetupInterconnect+" UINT64_FORMAT
"ms: select() "
"Interest: %s. timeout=" UINT64_FORMAT "ms "
"outgoing_fail=%d iteration=%d",
elapsed_ms, logbuf.data, timeout_ms,
outgoing_fail_count, iteration)));
pfree(logbuf.data);
MemSet(&logbuf, 0, sizeof(logbuf));
}
ML_CHECK_FOR_INTERRUPTS(interconnect_context->teardownActive);
n = select(highsock + 1, (fd_set *) &rset, (fd_set *) &wset, (fd_set *) &eset, &timeout);
/* remember errno now; the calls below may overwrite it */
select_errno = errno;
ML_CHECK_FOR_INTERRUPTS(interconnect_context->teardownActive);
if (Gp_role == GP_ROLE_DISPATCH)
checkForCancelFromQD(interconnect_context);
elapsed_ms = gp_get_elapsed_ms(&startTime);
/*
* Log the select() if requested.
*/
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG ||
n != expectedTotalIncoming + expectedTotalOutgoing)
{
int elevel = (n == expectedTotalIncoming + expectedTotalOutgoing)
? DEBUG1 : LOG;
initStringInfo(&logbuf);
if (n > 0)
{
appendStringInfo(&logbuf, "result=%d Ready: ", n);
format_fd_set(&logbuf, highsock + 1, &rset, "r={", "} ");
format_fd_set(&logbuf, highsock + 1, &wset, "w={", "} ");
format_fd_set(&logbuf, highsock + 1, &eset, "e={", "}");
}
else
appendStringInfoString(&logbuf, n < 0 ? "error" : "timeout");
ereport(elevel, (errmsg("SetupInterconnect+" UINT64_FORMAT "ms: select() %s",
elapsed_ms, logbuf.data)));
pfree(logbuf.data);
MemSet(&logbuf, 0, sizeof(logbuf));
}
}
/* An error other than EINTR is not acceptable */
if (n < 0)
{
if (select_errno == EINTR)
continue;
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error in select: %s",
strerror(select_errno))));
}
/*
* check our connections that have been accept()ed but have not yet sent
* a register message. we don't know which motion node these apply to
* until we actually receive the REGISTER message. this is why they are
* all in a single list.
*
* NOTE: we don't use foreach() here because we want to trim from the
* list as we go.
*
* We used to bail out of the while loop when incoming_count hit
* expectedTotalIncoming, but that causes problems if some connections
* are left over -- better to just process them here.
*
* From here on, n counts the select() events not yet handled; every
* handler below decrements it.
*/
cell = list_head(interconnect_context->incompleteConns);
while (n > 0 && cell != NULL)
{
conn = (MotionConn *) lfirst(cell);
/*
* we'll get the next cell ready now in case we need to delete the
* cell that corresponds to our MotionConn
*/
cell = lnext(cell);
if (MPP_FD_ISSET(conn->sockfd, &rset))
{
n--;
if (readRegisterMessage(interconnect_context, conn))
{
/*
* We're done with this connection (either it is bogus
* (and has been dropped), or we've added it to the
* appropriate hash table)
*/
interconnect_context->incompleteConns = list_delete_ptr(interconnect_context->incompleteConns, conn);
/* is the connection ready ? (sockfd is -1 if it was dropped) */
if (conn->sockfd != -1)
incoming_count++;
if (conn->pBuff)
pfree(conn->pBuff);
/* Free temporary MotionConn storage. */
pfree(conn);
}
}
}
/*
* Someone tickling our listener port? Accept pending connections.
*/
if (MPP_FD_ISSET(TCP_listenerFd, &rset))
{
n--;
/* drain the whole accept queue; NULL means it is empty */
while ((conn = acceptIncomingConnection()) != NULL)
{
/*
* get the connection ready for a subsequent call to
* readRegisterMessage()
*/
conn->state = mcsRecvRegMsg;
conn->msgSize = sizeof(RegisterMessage);
conn->msgPos = conn->pBuff;
conn->remapper = CreateTupleRemapper();
interconnect_context->incompleteConns = lappend(interconnect_context->incompleteConns, conn);
}
}
/*
* Check our outgoing connections.
*/
i = 0;
while (n > 0 &&
outgoing_count < expectedTotalOutgoing &&
i < sendingChunkTransportState->numConns)
{ /* loop to check outgoing connections */
conn = &sendingChunkTransportState->conns[i++];
switch (conn->state)
{
case mcsConnecting:
/*
 * Has connect() succeeded or failed?
 *
 * NOTE(review): n is decremented once here even if the socket is
 * set in both wset and eset; select() counts set bits across all
 * sets, so that case would trip the "extra select events" check
 * below -- confirm whether it can occur on supported platforms.
 */
if (MPP_FD_ISSET(conn->sockfd, &wset) ||
MPP_FD_ISSET(conn->sockfd, &eset))
{
n--;
updateOutgoingConnection(interconnect_context, sendingChunkTransportState, conn, -1);
switch (conn->state)
{
case mcsSetupOutgoingConnection:
/* Failed. Wait awhile before retrying. */
conn->wakeup_ms = (iteration - 1) * 1000 + elapsed_ms;
break;
case mcsSendRegMsg:
/* Connected, but reg msg not fully sent. */
conn->wakeup_ms = 0;
break;
case mcsStarted:
/* Connected, sent reg msg, ready to rock. */
outgoing_count++;
break;
default:
elog(FATAL, "SetupInterconnect: bad outgoing state");
}
}
break;
case mcsSendRegMsg:
/* Ready to continue sending? */
if (MPP_FD_ISSET(conn->sockfd, &wset))
{
n--;
sendRegisterMessage(interconnect_context, sendingChunkTransportState, conn);
if (conn->state == mcsStarted)
outgoing_count++;
}
break;
default:
break;
}
} /* loop to check outgoing connections */
/* By now we have dealt with all the events reported by select(). */
if (n != 0)
elog(FATAL, "SetupInterconnect: extra select events.");
} /* select() loop */
/*
* if everything really got setup properly then we shouldn't have any
* incomplete connections.
*
* XXX: In some cases (when the previous query got 'fast-track cancelled'
* because of an error during setup) we can wind up with connections here
* which ought to have been cleaned up. These connections should be closed
* out here. It would obviously be better if we could avoid these
* connections in the first place!
*/
if (list_length(interconnect_context->incompleteConns) != 0)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG2, "Incomplete connections after known connections done, cleaning %d",
list_length(interconnect_context->incompleteConns));
/* pop entries off the head until the list is empty */
while ((cell = list_head(interconnect_context->incompleteConns)) != NULL)
{
conn = (MotionConn *) lfirst(cell);
if (conn->sockfd != -1)
{
flushIncomingData(conn->sockfd);
shutdown(conn->sockfd, SHUT_WR);
closesocket(conn->sockfd);
conn->sockfd = -1;
}
interconnect_context->incompleteConns = list_delete_ptr(interconnect_context->incompleteConns, conn);
/*
 * NOTE(review): conn->remapper (created when the connection was
 * accepted) is not destroyed here, whereas TeardownTCPInterconnect()
 * destroys it for incomplete connections -- confirm this is intended.
 */
if (conn->pBuff)
pfree(conn->pBuff);
pfree(conn);
}
}
interconnect_context->activated = true;
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
{
elapsed_ms = gp_get_elapsed_ms(&startTime);
/* at terse level, only log setups that used 10% or more of the timeout */
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE ||
elapsed_ms >= 0.1 * 1000 * interconnect_setup_timeout)
elog(LOG, "SetupInterconnect+" UINT64_FORMAT "ms: Activated %d incoming, "
"%d outgoing routes.",
elapsed_ms, incoming_count, outgoing_count);
}
/* publish the finished context to the executor state */
estate->interconnect_context = interconnect_context;
estate->es_interconnect_is_setup = true;
} /* SetupTCPInterconnect */
/*
 * TeardownTCPInterconnect
 *
 * Clean up the interconnect resources that were allocated during
 * SetupTCPInterconnect().  This function should ALWAYS be called after
 * setup to avoid leaking resources (like sockets), even if setup did not
 * complete correctly.  As a result, it must complete successfully even if
 * setup didn't.
 *
 * transportStates: the context built by SetupTCPInterconnect(); it is freed
 *		here and must not be used afterwards.  If it is NULL, or has no slice
 *		table, a LOG message is emitted and nothing else is done.
 * hasErrors: true when tearing down after an error.  Interrupts are held
 *		for the duration of the teardown, and the wait for receivers to
 *		close outbound connections is skipped.
 *
 * SetupInterconnect() always gets called under the ExecutorState
 * MemoryContext.  This context is destroyed at the end of the query and all
 * memory that gets allocated under it is free'd.  We don't have to worry
 * about pfree() but we definitely have to worry about socket resources.
 */
void
TeardownTCPInterconnect(ChunkTransportState *transportStates, bool hasErrors)
{
    ListCell   *cell;
    ChunkTransportStateEntry *pEntry = NULL;
    int         i;
    ExecSlice  *mySlice;
    MotionConn *conn;

    if (transportStates == NULL || transportStates->sliceTable == NULL)
    {
        elog(LOG, "TeardownTCPInterconnect: missing slice table.");
        return;
    }

    /*
     * if we're already trying to clean up after an error -- don't allow
     * signals to interrupt us
     */
    if (hasErrors)
        HOLD_INTERRUPTS();

    mySlice = &transportStates->sliceTable->slices[transportStates->sliceId];

    /* Log the start of TeardownInterconnect. */
    if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
    {
        /* 0 means "do not log" */
        int         elevel = 0;

        if (hasErrors || !transportStates->activated)
        {
            if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
                elevel = LOG;
            else
                elevel = DEBUG1;
        }
        else if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
            elevel = DEBUG4;

        if (elevel)
            ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
                                    "%s; setup was %s",
                                    GpIdentity.segindex, mySlice->sliceIndex,
                                    hasErrors ? "error" : "normal",
                                    transportStates->activated ? "completed" : "exited")));

        /* if setup did not complete, log the slicetable */
        if (!transportStates->activated &&
            gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
            elog_node_display(DEBUG3, "local slice table", transportStates->sliceTable, true);
    }

    /*
     * phase 1 mark all sockets (senders and receivers) with shutdown(2),
     * start with incomplete connections (if any).
     */

    /*
     * The incompleteConns list is only used as a staging area for
     * MotionConns during SetupInterconnect().  So we only expect to have
     * entries here if SetupInterconnect() did not finish correctly.
     *
     * NOTE: we don't use foreach() here because we want to trim from the
     * list as we go.
     */
    if (transportStates->incompleteConns &&
        gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
        elog(DEBUG2, "Found incomplete conn. length %d", list_length(transportStates->incompleteConns));

    /*
     * These are connected inbound peers that we haven't dealt with quite yet
     */
    while ((cell = list_head(transportStates->incompleteConns)) != NULL)
    {
        conn = (MotionConn *) lfirst(cell);

        /* they're incomplete, so just slam them shut. */
        if (conn->sockfd != -1)
        {
            flushIncomingData(conn->sockfd);
            shutdown(conn->sockfd, SHUT_WR);
            closesocket(conn->sockfd);
            conn->sockfd = -1;
        }

        /* free up the tuple remapper */
        if (conn->remapper)
        {
            DestroyTupleRemapper(conn->remapper);
            conn->remapper = NULL;
        }

        /*
         * Remove the entry by pointer identity.  MotionConn is not a Node,
         * so list_delete(), which compares members with equal(), is not the
         * right primitive here; list_delete_ptr() matches what
         * SetupTCPInterconnect() does for the same list.
         */
        transportStates->incompleteConns = list_delete_ptr(transportStates->incompleteConns, conn);
    }

    list_free(transportStates->incompleteConns);
    transportStates->incompleteConns = NIL;

    /*
     * Now "normal" connections which made it through our peer-registration
     * step. With these we have to worry about "in-flight" data.
     */
    if (mySlice->parentIndex != -1)
    {
        /* cleanup a Sending motion node. */
        if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
            elog(DEBUG3, "Interconnect seg%d slice%d closing connections to slice%d",
                 GpIdentity.segindex, mySlice->sliceIndex, mySlice->parentIndex);

        getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);

        for (i = 0; i < pEntry->numConns; i++)
        {
            conn = pEntry->conns + i;

            /* stop sending only; the socket itself is closed in phase 2 */
            if (conn->sockfd >= 0)
                shutdown(conn->sockfd, SHUT_WR);

            /* free up the tuple remapper */
            if (conn->remapper)
            {
                DestroyTupleRemapper(conn->remapper);
                conn->remapper = NULL;
            }
        }
    }

    /*
     * cleanup all of our Receiving Motion nodes, these get closed
     * immediately (the receivers know for real if they want to shut down --
     * they aren't going to be processing any more data).
     */
    foreach(cell, mySlice->children)
    {
        ExecSlice  *aSlice;
        int         childId = lfirst_int(cell);

        aSlice = &transportStates->sliceTable->slices[childId];

        getChunkTransportState(transportStates, aSlice->sliceIndex, &pEntry);

        if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
            elog(DEBUG3, "Interconnect closing connections from slice%d",
                 aSlice->sliceIndex);

        /*
         * receivers know that they no longer care about data from below ...
         * so we can safely discard data queued in both directions
         */
        for (i = 0; i < pEntry->numConns; i++)
        {
            conn = pEntry->conns + i;

            if (conn->sockfd >= 0)
            {
                flushIncomingData(conn->sockfd);
                shutdown(conn->sockfd, SHUT_WR);
                closesocket(conn->sockfd);
                conn->sockfd = -1;

                /* free up the tuple remapper */
                if (conn->remapper)
                {
                    DestroyTupleRemapper(conn->remapper);
                    conn->remapper = NULL;
                }
            }
        }
        removeChunkTransportState(transportStates, aSlice->sliceIndex);
        pfree(pEntry->conns);
    }

    /*
     * phase 2: wait on all sockets for completion, when complete call close
     * and free (if required)
     */
    if (mySlice->parentIndex != -1)
    {
        /* cleanup a Sending motion node. */
        getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);

        /*
         * On a normal teardown routine, sender has sent an EOS packet and
         * disabled further send operations on phase 1. sender can't close
         * the connection immediately because EOS packet or data packets
         * within the kernel sending buffer may be lost on some platform if
         * sender close the connection totally.
         *
         * The correct way is sender blocks on the connection until receivers
         * get the EOS packets and close the peer, then it's safe for sender
         * to close the connection totally.
         *
         * If some errors are happening, senders can skip this step to avoid
         * hung issues, QD will take care of the error handling.
         */
        if (!hasErrors)
            waitOnOutbound(pEntry);

        for (i = 0; i < pEntry->numConns; i++)
        {
            conn = pEntry->conns + i;

            if (conn->sockfd >= 0)
            {
                closesocket(conn->sockfd);
                conn->sockfd = -1;
            }
        }
        pEntry = removeChunkTransportState(transportStates, mySlice->sliceIndex);
    }

    /*
     * If there are clients waiting on our listener; we *must* disconnect
     * them; otherwise we'll be out of sync with the client (we may accept
     * them on a subsequent query!)
     */
    if (TCP_listenerFd != -1)
        flushInterconnectListenerBacklog();

    transportStates->activated = false;
    transportStates->sliceTable = NULL;

#ifdef ENABLE_IC_PROXY
    ic_proxy_backend_close_context(transportStates);
#endif   /* ENABLE_IC_PROXY */

    if (transportStates->states != NULL)
        pfree(transportStates->states);
    pfree(transportStates);

    if (hasErrors)
        RESUME_INTERRUPTS();

#ifdef AMS_VERBOSE_LOGGING
    if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
        elog(DEBUG4, "TeardownInterconnect successful");
#endif
}
#ifdef AMS_VERBOSE_LOGGING
/*
 * dumpEntryConnections
 *
 * Emit one log line, at level elevel, for every connection slot of the
 * given transport entry.  A slot with no socket and state mcsNull is
 * reported as "not connected"; any other slot is reported with its remote
 * content id, peer pid (0 if no process is attached), socket and the
 * remote/local address strings.
 */
void
dumpEntryConnections(int elevel, ChunkTransportStateEntry *pEntry)
{
    int         slot = 0;

    while (slot < pEntry->numConns)
    {
        MotionConn *c = pEntry->conns + slot;
        bool        unconnected = (c->sockfd == -1 && c->state == mcsNull);

        if (unconnected)
        {
            elog(elevel, "... motNodeId=%d conns[%d]: not connected",
                 pEntry->motNodeId, slot);
        }
        else
        {
            elog(elevel, "... motNodeId=%d conns[%d]: "
                 "%d pid=%d sockfd=%d remote=%s local=%s",
                 pEntry->motNodeId, slot,
                 c->remoteContentId,
                 c->cdbProc ? c->cdbProc->pid : 0,
                 c->sockfd,
                 c->remoteHostAndPort,
                 c->localHostAndPort);
        }

        slot++;
    }
}
/*
 * print_connection
 *
 * Debug helper: log (at DEBUG2) the local and remote port numbers of the
 * connected socket fd, prefixed with msg.
 *
 * A getpeername()/getsockname() failure is reported at LOG while a teardown
 * is active and at ERROR otherwise.  When it is only a LOG, execution
 * continues, so both address structs are zeroed up front; a lookup that
 * failed is then printed as port 0 instead of reading uninitialized memory.
 *
 * NOTE(review): the addresses are read as struct sockaddr_in; this assumes
 * the port field of other address families sits at the same offset --
 * confirm for IPv6 sockets.
 */
static void
print_connection(ChunkTransportState *transportStates, int fd, const char *msg)
{
    struct sockaddr_in local,
                remote;
    socklen_t   len;
    int         errlevel = transportStates->teardownActive ? LOG : ERROR;

    MemSet(&local, 0, sizeof(local));
    MemSet(&remote, 0, sizeof(remote));

    len = sizeof(remote);
    if (getpeername(fd, (struct sockaddr *) &remote, &len) < 0)
    {
        elog(errlevel, "print_connection(%d, %s): can't get peername err: %m",
             fd, msg);
    }

    len = sizeof(local);
    if (getsockname(fd, (struct sockaddr *) &local, &len) < 0)
    {
        elog(errlevel, "print_connection(%d, %s): can't get localname err: %m",
             fd, msg);
    }

    elog(DEBUG2, "%s: w/ports (%d/%d)",
         msg, ntohs(local.sin_port), ntohs(remote.sin_port));
}
#endif
/*
 * format_fd_set
 *
 * Append to buf a comma-separated list of the descriptors in the range
 * [1, nfds) that are members of fds, wrapped between the strings pfx and
 * sfx.  Used to build the select() log messages.
 *
 * NOTE(review): the scan starts at descriptor 1, so descriptor 0 is never
 * listed even if it is a member of the set -- confirm this is intentional.
 */
static void
format_fd_set(StringInfo buf, int nfds, mpp_fd_set *fds, char *pfx, char *sfx)
{
    int         fd = 1;
    int         emitted = 0;

    appendStringInfoString(buf, pfx);

    while (fd < nfds)
    {
        if (MPP_FD_ISSET(fd, fds))
        {
            /* separate every entry after the first with a comma */
            if (emitted > 0)
                appendStringInfoChar(buf, ',');
            appendStringInfo(buf, "%d", fd);
            emitted++;
        }
        fd++;
    }

    appendStringInfoString(buf, sfx);
}
static void
flushInterconnectListenerBacklog(void)
{
int pendingConn,
newfd,
i;
mpp_fd_set rset;
struct timeval timeout;
do
{
MPP_FD_ZERO(&rset);
MPP_FD_SET(TCP_listenerFd, &rset);
timeout.tv_sec = 0;
timeout.tv_usec = 0;
pendingConn = select(TCP_listenerFd + 1, (fd_set *) &rset, NULL, NULL, &timeout);
if (pendingConn > 0)
{
for (i = 0; i < pendingConn; i++)
{
struct sockaddr_storage remoteAddr;
struct sockaddr_storage localAddr;
char remoteHostAndPort[64];
char localHostAndPort[64];
socklen_t addrsize;
addrsize = sizeof(remoteAddr);
newfd = accept(TCP_listenerFd, (struct sockaddr *) &remoteAddr, &addrsize);
if (newfd < 0)
{
ereport(DEBUG3, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error while clearing incoming connections."),
errdetail("%s sockfd=%d: %m", "accept", newfd)));
continue;
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
/* Get remote and local host:port strings for message. */
format_sockaddr(&remoteAddr, remoteHostAndPort,
sizeof(remoteHostAndPort));
addrsize = sizeof(localAddr);
if (getsockname(newfd, (struct sockaddr *) &localAddr, &addrsize))
{
ereport(LOG,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error while clearing incoming connections"),
errdetail("getsockname sockfd=%d remote=%s: %m",
newfd, remoteHostAndPort)));
}
else
{
format_sockaddr(&localAddr, localHostAndPort,
sizeof(localHostAndPort));
ereport(DEBUG2, (errmsg("Interconnect clearing incoming connection "
"from remote=%s to local=%s. sockfd=%d.",
remoteHostAndPort, localHostAndPort,
newfd)));
}
}
/* make socket non-blocking */
if (!pg_set_noblock(newfd))
{
elog(LOG, "During incoming queue flush, could not set non-blocking.");
}
else
{
/* shutdown this socket */
flushIncomingData(newfd);
}
shutdown(newfd, SHUT_WR);
closesocket(newfd);
}
}
else if (pendingConn < 0 && errno != EINTR)
{
ereport(LOG,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error during listener cleanup"),
errdetail("select sockfd=%d: %m", TCP_listenerFd)));
}
/*
* now we either loop through for another check (on EINTR or if we
* cleaned one client) or we're done
*/
}
while (pendingConn != 0);
}
/*
 * waitOnOutbound
 *
 * Block until the peer of every open connection in pEntry has either
 * closed its side (recv() returns 0) or sent a single byte (expected to be
 * the 'S' stop message), polling with select(2) in 500ms slices so that we
 * do not burn CPU while waiting.
 *
 * Returns early, without waiting for the remaining peers, when
 * CancelRequested() or QueryFinishPending is true, or when select() fails
 * with an error other than EINTR.  A connection whose recv() fails with an
 * error other than EAGAIN/EINTR is logged and no longer waited for.
 *
 * Interrupts are checked explicitly instead of through
 * CHECK_FOR_INTERRUPTS(): this function must return locally, otherwise
 * TeardownInterconnect() can't exit cleanly, and swallowing the non-local
 * exit in a PG_TRY block would lose the cancel state.
 */
static void
waitOnOutbound(ChunkTransportStateEntry *pEntry)
{
    mpp_fd_set  pending;        /* sockets we are still waiting on */
    mpp_fd_set  ready;          /* scratch copy handed to select() */
    int         highest = -1;
    int         remaining = 0;
    int         idx;

    /* Collect every open socket of this entry. */
    MPP_FD_ZERO(&pending);
    for (idx = 0; idx < pEntry->numConns; idx++)
    {
        MotionConn *c = &pEntry->conns[idx];

        if (c->sockfd < 0)
            continue;

        MPP_FD_SET(c->sockfd, &pending);
        highest = Max(highest, c->sockfd);
        remaining++;
    }

    while (remaining != 0)
    {
        struct timeval tv;
        int         nready;

        if (CancelRequested() || QueryFinishPending)
        {
#ifdef AMS_VERBOSE_LOGGING
            elog(DEBUG3, "waitOnOutbound(): interrupt pending fast-track");
#endif
            return;
        }

        tv.tv_sec = 0;
        tv.tv_usec = 500000;

        /* select() overwrites its set, so work on a copy */
        memcpy(&ready, &pending, sizeof(mpp_fd_set));

        nready = select(highest + 1, (fd_set *) &ready, NULL, NULL, &tv);

        /* timeout: go around and re-check for interrupts */
        if (nready == 0)
            continue;

        if (nready < 0)
        {
            int         select_err = errno;

            if (select_err == EINTR)
                continue;

            if (CancelRequested() || QueryFinishPending)
                return;

            /*
             * Something unexpected, but probably not horrible warn and return
             */
            elog(LOG, "TeardownTCPInterconnect: waitOnOutbound select errno=%d", select_err);
            return;
        }

        for (idx = 0; idx < pEntry->numConns; idx++)
        {
            MotionConn *c = &pEntry->conns[idx];
            char        byte;
            int         got;

            if (c->sockfd < 0 || !MPP_FD_ISSET(c->sockfd, &ready))
                continue;

            /* ready to read. */
            got = recv(c->sockfd, &byte, sizeof(byte), 0);

            /* nothing there after all; keep waiting on this socket */
            if (got < 0 && (errno == EAGAIN || errno == EINTR))
                continue;

            /* either way we stop waiting on it; we may have finished */
            MPP_FD_CLR(c->sockfd, &pending);
            remaining--;

            if (got == 0 || got == 1)
            {
                /* done: peer closed, or we got a stop message */
                AssertImply(got == 1, byte == 'S');
            }
            else
            {
                /*
                 * Something unexpected, but probably not horrible warn and
                 * move on
                 */
                elog(LOG, "TeardownTCPInterconnect: waitOnOutbound %s: %m", "recv");
            }
        }
    }

    return;
}
/*
 * doSendStopMessageTCP
 *      Tell the senders feeding motion node 'motNodeID' that no more input
 *      is needed.
 *
 * A single 'S' byte is written to every connection whose socket is still in
 * the entry's read set.  Read interest is then deregistered for every
 * connection of the entry, whether or not a stop byte was sent to it.
 */
static void
doSendStopMessageTCP(ChunkTransportState *transportStates, int16 motNodeID)
{
    ChunkTransportStateEntry *entry = NULL;
    const char  stopByte = 'S';
    int         route;

    getChunkTransportState(transportStates, motNodeID, &entry);
    Assert(entry);

    if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
        elog(DEBUG3, "Interconnect needs no more input from slice%d; notifying senders to stop.",
             motNodeID);

    /* Only the receiving side of each connection matters here. */
    for (route = 0; route < entry->numConns; route++)
    {
        MotionConn *peer = &entry->conns[route];

        if (peer->sockfd >= 0 &&
            MPP_FD_ISSET(peer->sockfd, &entry->readSet))
        {
            ssize_t     nsent;

            /* A sender may still be pushing data at us: ask it to stop. */
            for (;;)
            {
                nsent = send(peer->sockfd, &stopByte, sizeof(stopByte), 0);

                /* Finished on success, or on any failure other than EINTR. */
                if (nsent >= 0 || errno != EINTR)
                    break;

                ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
            }

            if (nsent != sizeof(stopByte))
            {
                /*
                 * Not expected: the kernel buffer in the send direction
                 * should be empty.
                 */
                elog(LOG, "SendStopMessage: failed on write. %m");
            }
        }

        /* CRITICAL TO AVOID DEADLOCK */
        DeregisterReadInterest(transportStates, motNodeID, route,
                               "no more input needed");
    }
}
/*
 * RecvTupleChunkFromTCP
 *      Receive the next tuple chunk for motion node 'motNodeID' from the
 *      connection at index 'srcRoute'.
 *
 * Pending interrupts are checked before looking up the connection.
 */
static TupleChunkListItem
RecvTupleChunkFromTCP(ChunkTransportState *transportStates,
                      int16 motNodeID,
                      int16 srcRoute)
{
    ChunkTransportStateEntry *entry = NULL;

    /* Honour pending interrupts first. */
    ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

#ifdef AMS_VERBOSE_LOGGING
    elog(DEBUG5, "RecvTupleChunkFrom(motNodID=%d, srcRoute=%d)", motNodeID, srcRoute);
#endif

    getChunkTransportState(transportStates, motNodeID, &entry);

    return RecvTupleChunk(&entry->conns[srcRoute], transportStates);
}
/*
 * RecvTupleChunkFromAnyTCP
 *      Receive the next tuple chunk for motion node 'motNodeID' from
 *      whichever of its connections has data, and store the index of that
 *      connection in *srcRoute.
 *
 * A connection that already holds buffered bytes (recvBytes != 0) is served
 * without calling select().  Otherwise the function selects on the entry's
 * read set; on the dispatcher it also watches the dispatcher's wait sockets
 * and calls checkForCancelFromQD() when one of them becomes readable.
 *
 * Connections are scanned starting at pEntry->scanStart so that one busy
 * connection is not always preferred over the others.
 *
 * Raises ERROR if select() fails with anything other than EINTR.
 */
static TupleChunkListItem
RecvTupleChunkFromAnyTCP(ChunkTransportState *transportStates,
                         int16 motNodeID,
                         int16 *srcRoute)
{
    ChunkTransportStateEntry *pEntry = NULL;
    TupleChunkListItem tcItem;
    MotionConn *conn;
    mpp_fd_set  rset;
    int         n = 0,
                i,
                index;
    bool        skipSelect = false;
    int         nwaitfds = 0;
    int        *waitFds = NULL;
    int         retry = 0;

#ifdef AMS_VERBOSE_LOGGING
    elog(DEBUG5, "RecvTupleChunkFromAny(motNodeId=%d)", motNodeID);
#endif

    getChunkTransportState(transportStates, motNodeID, &pEntry);

    do
    {
        /*
         * On the dispatcher, periodically check whether the query should be
         * cancelled (the original note says: every 2 seconds).
         */
        if (Gp_role == GP_ROLE_DISPATCH && retry++ > 4)
        {
            retry = 0;
            /* check to see if the dispatcher should cancel */
            checkForCancelFromQD(transportStates);
        }

        struct timeval timeout = tval;
        int         nfds = pEntry->highReadSock;

        /* make sure we check for these. */
        ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

        memcpy(&rset, &pEntry->readSet, sizeof(mpp_fd_set));

        /*
         * since we may have data in a local buffer, we may be able to
         * short-circuit the select() call (and if we don't do this we may
         * wait when we have data ready, since it has already been read)
         */
        for (i = 0; i < pEntry->numConns; i++)
        {
            conn = pEntry->conns + i;

            if (conn->sockfd >= 0 &&
                MPP_FD_ISSET(conn->sockfd, &rset) &&
                conn->recvBytes != 0)
            {
                /*
                 * We have data on this socket, so short-circuit the select.
                 * rset now holds only this socket, so no later connection
                 * can match in the remaining iterations.
                 */
                MPP_FD_ZERO(&rset);
                MPP_FD_SET(conn->sockfd, &rset);
                skipSelect = true;
            }
        }

        if (skipSelect)
            break;

        /*
         * Also monitor the events on dispatch fds, eg, errors or sequence
         * request from QEs.
         */
        nwaitfds = 0;
        if (Gp_role == GP_ROLE_DISPATCH)
        {
            waitFds = cdbdisp_getWaitSocketFds(transportStates->estate->dispatcherState, &nwaitfds);
            if (waitFds != NULL)
            {
                for (i = 0; i < nwaitfds; i++)
                {
                    MPP_FD_SET(waitFds[i], &rset);
                    /* record the max fd number for select() later */
                    if (waitFds[i] > nfds)
                        nfds = waitFds[i];
                }
            }
        }

        /*
         * GPDB_12_MERGE_FIXME: should use WaitEventSetWait() instead of
         * select(); follow the routine in ic_udpifc.c
         */
        n = select(nfds + 1, (fd_set *) &rset, NULL, NULL, &timeout);
        if (n < 0)
        {
            if (errno == EINTR)
            {
                /*
                 * The next iteration fetches a new wait-fd array, so release
                 * this one before retrying; otherwise it is leaked.
                 */
                if (waitFds != NULL)
                {
                    pfree(waitFds);
                    waitFds = NULL;
                }
                continue;
            }

            ereport(ERROR,
                    (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                     errmsg("interconnect error receiving an incoming packet"),
                     errdetail("%s: %m", "select")));
        }
        else if (n > 0 && nwaitfds > 0)
        {
            bool        need_check = false;

            /*
             * Ready dispatcher sockets are not interconnect data: discount
             * them from the ready count so that the loop keeps waiting if
             * nothing else is readable.
             */
            for (i = 0; i < nwaitfds; i++)
            {
                if (MPP_FD_ISSET(waitFds[i], &rset))
                {
                    need_check = true;
                    n--;
                }
            }

            /* handle events on dispatch connection */
            if (need_check)
                checkForCancelFromQD(transportStates);
        }

        /* Done with this iteration's wait-fd array; never leave it dangling. */
        if (waitFds != NULL)
        {
            pfree(waitFds);
            waitFds = NULL;
        }

#ifdef AMS_VERBOSE_LOGGING
        elog(DEBUG5, "RecvTupleChunkFromAny() select() returned %d ready sockets", n);
#endif
    } while (n < 1);

    /*
     * We scan the file descriptors starting from where we left off in the
     * last call (don't continually poll the first when others may be ready!).
     */
    index = pEntry->scanStart;
    for (i = 0; i < pEntry->numConns; i++, index++)
    {
        /*
         * Wrap around by comparison instead of a modulus, i.e. the same as
         * index = ((scanStart + i) % pEntry->numConns).
         */
        if (index >= pEntry->numConns)
            index = 0;

        conn = pEntry->conns + index;

#ifdef AMS_VERBOSE_LOGGING
        if (!conn->stillActive)
        {
            elog(LOG, "RecvTupleChunkFromAny: trying to read on inactive socket %d", conn->sockfd);
        }
#endif

        if (conn->sockfd >= 0 &&
            MPP_FD_ISSET(conn->sockfd, &rset))
        {
#ifdef AMS_VERBOSE_LOGGING
            elog(DEBUG5, "RecvTupleChunkFromAny() (fd %d) %d/%d", conn->sockfd, motNodeID, index);
#endif
            tcItem = RecvTupleChunk(conn, transportStates);

            *srcRoute = index;

            /*
             * advance start point (avoid doing division/modulus operation
             * here)
             */
            pEntry->scanStart = index + 1;

            return tcItem;
        }
    }

    /* we should never ever get here... */
    elog(FATAL, "RecvTupleChunkFromAnyTCP: didn't receive, and didn't get cancelled");
    return NULL;                /* keep the compiler happy */
}
/*
 * SendEosTCP
 *      Queue the end-of-stream chunk 'tcItem' on every outgoing connection
 *      of motion node 'motNodeID', then flush the started connections.
 *
 * See ml_ipc.h.
 */
static void
SendEosTCP(ChunkTransportState *transportStates,
           int motNodeID,
           TupleChunkListItem tcItem)
{
    ChunkTransportStateEntry *entry = NULL;
    int         route;

    if (transportStates == NULL)
    {
        elog(FATAL, "SendEosTCP: missing interconnect context.");
    }
    else if (!(transportStates->activated || transportStates->teardownActive))
    {
        elog(FATAL, "SendEosTCP: context and teardown inactive.");
    }

    /* Honour pending interrupts before doing any work. */
    ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

    getChunkTransportState(transportStates, motNodeID, &entry);

    if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
        elog(DEBUG3, "Interconnect seg%d slice%d sending end-of-stream to slice%d",
             GpIdentity.segindex, motNodeID, entry->recvSlice->sliceIndex);

    /*
     * Append tcItem to each outgoing buffer.  This is guaranteed to leave
     * things in a state where a flush is *required*.
     */
    doBroadcast(transportStates, entry, tcItem, NULL);

    /* Push out the buffer of every started connection that has a socket. */
    for (route = 0; route < entry->numConns; route++)
    {
        MotionConn *peer = &entry->conns[route];

        if (peer->sockfd >= 0 && peer->state == mcsStarted)
            flushBuffer(transportStates, entry, peer, motNodeID);

#ifdef AMS_VERBOSE_LOGGING
        /* Emitted once per connection, since it sits inside the loop. */
        elog(DEBUG5, "SendEosTCP() Leaving");
#endif
    }
}
/*
 * flushBuffer
 * Write the connection's pending message (conn->pBuff, conn->msgSize
 * bytes) to its socket.
 *
 * The message size is first stored as a uint32 at the start of the buffer.
 * The send loop then runs until all msgSize bytes are written. Whenever the
 * socket becomes readable the function stops, because the only thing a
 * sender can receive is a stop message or the peer tearing down.
 *
 * Returns true when the whole message was sent; the buffer is then reset to
 * an empty packet (tupleCount = 0, msgSize = PACKET_HEADER_SIZE).
 * Returns false, with conn->stillActive cleared, when the peer asked us to
 * stop or when an error occurred while teardown is active.
 * Raises ERROR on a select()/send() failure outside of teardown.
 *
 * pEntry and motionId are not referenced in this function body.
 */
static bool
flushBuffer(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, int16 motionId)
{
char *sendptr;
int n,
sent = 0;
mpp_fd_set wset;
mpp_fd_set rset;
#ifdef AMS_VERBOSE_LOGGING
{
struct timeval snapTime;
gettimeofday(&snapTime, NULL);
elog(DEBUG5, "----sending chunk @%s.%d time is %d.%d",
__FILE__, __LINE__, (int) snapTime.tv_sec, (int) snapTime.tv_usec);
}
#endif
/* first set header length */
*(uint32 *) conn->pBuff = conn->msgSize;
/* now send message */
sendptr = (char *) conn->pBuff;
sent = 0;
/* each pass sends whatever part of the message is still unsent */
do
{
struct timeval timeout;
/* check for stop message or peer teardown before sending anything */
timeout.tv_sec = 0;
timeout.tv_usec = 0;
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
/*
 * since timeout = 0, select returns immediately and no time is wasted
 * waiting trying to send data on the network
 */
n = select(conn->sockfd + 1, (fd_set *) &rset, NULL, NULL, &timeout);
/* handle errors at the write call, below */
if (n > 0 && MPP_FD_ISSET(conn->sockfd, &rset))
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
/* got a stop message */
conn->stillActive = false;
return false;
}
if ((n = send(conn->sockfd, sendptr + sent, conn->msgSize - sent, 0)) < 0)
{
/* save errno right away: the calls below may overwrite it */
int send_errno = errno;
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
if (send_errno == EINTR)
continue;
/*
 * NOTE(review): only EWOULDBLOCK is tested here; EAGAIN is not checked
 * separately. Confirm the two are equal on every supported platform.
 */
if (send_errno == EWOULDBLOCK)
{
/*
 * The socket cannot take more data right now. Wait, with the tval
 * timeout, until it is writable again or until the peer sends
 * something.
 */
do
{
timeout = tval;
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
MPP_FD_ZERO(&rset);
MPP_FD_ZERO(&wset);
MPP_FD_SET(conn->sockfd, &wset);
MPP_FD_SET(conn->sockfd, &rset);
n = select(conn->sockfd + 1, (fd_set *) &rset, (fd_set *) &wset, NULL, &timeout);
if (n < 0)
{
int select_errno = errno;
/* n is still negative, so the loop condition below repeats the wait */
if (select_errno == EINTR)
continue;
/*
 * if we got an error in teardown, ignore it: treat it
 * as a stop message
 */
if (transportStates->teardownActive)
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
conn->stillActive = false;
return false;
}
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error writing an outgoing packet: %m"),
errdetail("Error during select() call (error: %d), for remote connection: contentId=%d at %s",
select_errno, conn->remoteContentId,
conn->remoteHostAndPort)));
}
/*
 * as a sender... if there is something to read... it must
 * mean it's a StopSendingMessage or the receiver has torn down
 * the interconnect, we don't even bother to read it.
 */
if (MPP_FD_ISSET(conn->sockfd, &rset) || transportStates->teardownActive)
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
conn->stillActive = false;
return false;
}
} while (n < 1);
/* select reported the socket ready; the outer loop retries the send */
}
else
{
/*
 * if we got an error in teardown, ignore it: treat it as a
 * stop message
 */
if (transportStates->teardownActive)
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
conn->stillActive = false;
return false;
}
/* check whether the receiver has torn down the interconnect */
timeout.tv_sec = 0;
timeout.tv_usec = 0;
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
n = select(conn->sockfd + 1, (fd_set *) &rset, NULL, NULL, &timeout);
/*
 * as a sender... if there is something to read... it must
 * mean it's a StopSendingMessage or the receiver has torn down
 * the interconnect, we don't even bother to read it.
 */
if (n > 0 && MPP_FD_ISSET(conn->sockfd, &rset))
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
/* got a stop message */
conn->stillActive = false;
return false;
}
/* a genuine send failure: report it with the errno saved above */
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect error writing an outgoing packet"),
errdetail("Error during send() call (error:%d) for remote connection: contentId=%d at %s",
send_errno, conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
else
{
/* partial or complete write: advance past the bytes already sent */
sent += n;
}
} while (sent < conn->msgSize);
/* everything went out: reset the buffer to an empty packet */
conn->tupleCount = 0;
conn->msgSize = PACKET_HEADER_SIZE;
return true;
}
/*
 * SendChunkTCP
 *      Queue one tuple chunk for a single destination.
 *
 * Tuples are often *very small*, so chunks are aggregated in the
 * connection's local buffer before being handed to the kernel.  The buffer
 * is flushed first when adding this chunk would push the message past
 * Gp_max_packet_size.
 *
 * PARAMETERS
 *   transportStates - passed through to flushBuffer().
 *   pEntry          - passed through to flushBuffer().
 *   conn            - MotionConn that the tcItem is to be sent to.
 *   tcItem          - message to be sent.
 *   motionId        - Node Motion Id.
 *
 * Returns false if the required flush reported failure (the chunk is then
 * not buffered), true once the chunk has been appended to the buffer.
 */
static bool
SendChunkTCP(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, TupleChunkListItem tcItem, int16 motionId)
{
    int         chunkLen = tcItem->chunk_length;

    Assert(conn->msgSize > 0);

#ifdef AMS_VERBOSE_LOGGING
    elog(DEBUG5, "sendChunk: msgSize %d this chunk length %d", conn->msgSize, tcItem->chunk_length);
#endif

    /* Flush first if there is no room left; give up if the flush fails. */
    if (conn->msgSize + chunkLen > Gp_max_packet_size &&
        !flushBuffer(transportStates, pEntry, conn, motionId))
        return false;

    /* Append the chunk right after the bytes already queued. */
    memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
    conn->msgSize += chunkLen;
    conn->tupleCount++;

    return true;
}
/*
 * gp_set_monotonic_begin_time
 *      Start a new measurement: set both beginTime and endTime to the
 *      current time.
 */
static void
gp_set_monotonic_begin_time(GpMonotonicTime *time)
{
    const struct timeval zero = {0, 0};

    /* Clear both stamps, then let gp_get_monotonic_time() fill in endTime. */
    time->beginTime = zero;
    time->endTime = zero;

    gp_get_monotonic_time(time);

    /* The measurement starts at the time just sampled. */
    time->beginTime = time->endTime;
}
/*
 * gp_get_monotonic_time
 *      Advance time->endTime to a strictly later value.
 *
 * The clock is sampled (clock_gettime(CLOCK_MONOTONIC) when HAVE_LIBRT is
 * set, gettimeofday() otherwise).  If the sample is later than the stored
 * endTime it replaces it; otherwise, or if sampling failed, endTime is
 * pushed forward by one microsecond.  The previous endTime is lost.
 *
 * This is intended for computing the elapsed time between two calls, not
 * for obtaining the system time.
 */
static void
gp_get_monotonic_time(GpMonotonicTime *time)
{
    struct timeval now;
    int         rc;

#if HAVE_LIBRT
    /* Use clock_gettime to obtain a monotonic time value. */
    struct timespec mono;

    rc = clock_gettime(CLOCK_MONOTONIC, &mono);
    now.tv_sec = mono.tv_sec;
    now.tv_usec = mono.tv_nsec / 1000;
#else
    gettimeofday(&now, NULL);
    rc = 0;                     /* gettimeofday always succeeds. */
#endif

    if (rc != 0 || timeCmp(&time->endTime, &now) >= 0)
    {
        /*
         * No usable later sample: advance by one microsecond, carrying any
         * overflow into the seconds field.
         */
        time->endTime.tv_usec += 1;
        time->endTime.tv_sec += time->endTime.tv_usec / USECS_PER_SECOND;
        time->endTime.tv_usec %= USECS_PER_SECOND;
        return;
    }

    /* The sample is later than the stored end time: adopt it. */
    time->endTime = now;
}
/*
 * timeCmp
 *      Three-way comparison of two timevals.
 *
 * Returns 1 if t1 > t2, 0 if t1 == t2, and -1 if t1 < t2.  Seconds are
 * compared first; microseconds only break a tie.
 */
static inline int
timeCmp(struct timeval *t1, struct timeval *t2)
{
    if (t1->tv_sec != t2->tv_sec)
        return (t1->tv_sec > t2->tv_sec) ? 1 : -1;

    if (t1->tv_usec != t2->tv_usec)
        return (t1->tv_usec > t2->tv_usec) ? 1 : -1;

    return 0;
}
/*
 * gp_get_elapsed_us
 *      Return the number of microseconds elapsed since time->beginTime.
 *
 * Returns 0 if beginTime has not been set (both fields are 0).
 *
 * beginTime is left untouched; endTime is refreshed through
 * gp_get_monotonic_time() as a side effect.
 */
static inline uint64
gp_get_elapsed_us(GpMonotonicTime *time)
{
    const struct timeval *begin = &time->beginTime;
    const struct timeval *end = &time->endTime;

    if (begin->tv_sec != 0 || begin->tv_usec != 0)
    {
        gp_get_monotonic_time(time);

        return ((end->tv_sec - begin->tv_sec) * USECS_PER_SECOND +
                (end->tv_usec - begin->tv_usec));
    }

    /* The measurement was never started. */
    return 0;
}
/*
 * gp_get_elapsed_ms
 *      Return the elapsed time since time->beginTime in whole milliseconds,
 *      derived from gp_get_elapsed_us() by integer division.
 */
static inline uint64
gp_get_elapsed_ms(GpMonotonicTime *time)
{
    const uint64 elapsedUs = gp_get_elapsed_us(time);

    return elapsedUs / (USECS_PER_SECOND / MSECS_PER_SECOND);
}
相关信息
相关文章
greenplumn ic_proxy_backend 源码
greenplumn ic_proxy_backend 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦