greenplumn ic_udpifc 源码
greenplumn ic_udpifc 代码
文件路径:/src/backend/cdb/motion/ic_udpifc.c
/*-------------------------------------------------------------------------
* ic_udpifc.c
* Interconnect code specific to UDP transport.
*
* Portions Copyright (c) 2005-2011, Greenplum Inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Copyright (c) 2011-2012, EMC Corporation
*
*
* IDENTIFICATION
* src/backend/cdb/motion/ic_udpifc.c
*
*-------------------------------------------------------------------------
*/
#ifdef WIN32
/*
* Need this to get WSAPoll (poll). And it
* has to be set before any header from the Win32 API is loaded.
*/
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include "postgres.h"
#include <pthread.h>
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "access/transam.h"
#include "access/xact.h"
#include "common/ip.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "port/atomics.h"
#include "port/pg_crc32c.h"
#include "pgstat.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/faultinjector.h"
#include "cdb/tupchunklist.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbicudpfaultinjection.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
/*
* If we have old platform sdk headers, WSAPoll() might not be there.
*
* POLLIN is used as the probe: when it is not defined, the event flags,
* the pollfd structure and the WSAPoll() prototype are declared here.
*/
#ifndef POLLIN
/* Event flag definitions for WSAPoll(). */
#define POLLRDNORM 0x0100
#define POLLRDBAND 0x0200
/* POLLIN is the union of the two "readable" flags above. */
#define POLLIN (POLLRDNORM | POLLRDBAND)
#define POLLPRI 0x0400
#define POLLWRNORM 0x0010
#define POLLOUT (POLLWRNORM)
#define POLLWRBAND 0x0020
#define POLLERR 0x0001
#define POLLHUP 0x0002
#define POLLNVAL 0x0004
/* Per-socket poll record handed to WSAPoll(). */
typedef struct pollfd
{
/* socket to be polled */
SOCKET fd;
/* event flags the caller asks for */
SHORT events;
/* event flags reported back by the call */
SHORT revents;
} WSAPOLLFD, *PWSAPOLLFD, FAR * LPWSAPOLLFD;
/* Prototype of WSAPoll(): array of poll records, its length, and a timeout. */
__control_entrypoint(DllExport)
WINSOCK_API_LINKAGE
int
WSAAPI
WSAPoll(
IN OUT LPWSAPOLLFD fdArray,
IN ULONG fds,
IN INT timeout
);
#endif
#define poll WSAPoll
/*
* Postgres normally uses its own custom select implementation
* on Windows, but it does not implement exceptfds, which
* we use here. So, undef this to use the normal Winsock version
* for now.
*/
#undef select
#endif
/* Index of the last entry of timeoutArray; larger try counts saturate here. */
#define MAX_TRY (11)

/*
 * Backoff schedule indexed by the number of tries: the value doubles from
 * the third entry on until it reaches 512, where it stays.  The array
 * holds MAX_TRY + 1 entries.
 */
int
timeoutArray[] =
{
	1, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512,
	512							/* MAX_TRY */
};

/*
 * TIMEOUT
 *		Look up the timeout for the given try count, clamping the index to
 *		MAX_TRY.  Note that the argument is evaluated more than once.
 */
#define TIMEOUT(try) (timeoutArray[(try) < MAX_TRY ? (try) : MAX_TRY])
/* Time unit conversion factors. */
#define USECS_PER_SECOND 1000000
#define MSECS_PER_SECOND 1000
/* 1/4 sec in msec */
#define RX_THREAD_POLL_TIMEOUT (250)
/*
* Flags definitions for flag-field of UDP-messages
*
* We use bit operations to test these, so every flag is a distinct power
* of two and several flags can be combined in one flag-field.
*/
#define UDPIC_FLAGS_RECEIVER_TO_SENDER (1)
#define UDPIC_FLAGS_ACK (2)
#define UDPIC_FLAGS_STOP (4)
#define UDPIC_FLAGS_EOS (8)
#define UDPIC_FLAGS_NAK (16)
#define UDPIC_FLAGS_DISORDER (32)
#define UDPIC_FLAGS_DUPLICATE (64)
#define UDPIC_FLAGS_CAPACITY (128)
/*
* 128 KiB.
* NOTE(review): presumably the smallest acceptable UDP socket buffer size;
* confirm at the use site, which is outside this section.
*/
#define UDPIC_MIN_BUF_SIZE (128 * 1024)
/*
 * ConnHtabBin
 *
 * One bin of a connection hash table: a connection pointer together with
 * a link to the next bin.
 */
typedef struct ConnHtabBin
{
	MotionConn *conn;			/* connection referenced by this bin */
	struct ConnHtabBin *next;	/* next bin in the list */
} ConnHtabBin;
/*
 * ConnHashTable
 *
 * A connection hash table: an array of bin lists plus the bookkeeping
 * that goes with it.
 */
typedef struct ConnHashTable
{
	MemoryContext cxt;			/* memory context associated with the table */
	ConnHtabBin **table;		/* array of bin list heads */
	int			size;			/* table size */
} ConnHashTable;
/*
* CONN_HASH_VALUE
* Raw hash of a packet header: (srcPid XOR dstPid) + dstContentId, as uint32.
* The value is not reduced to a table index here.
*/
#define CONN_HASH_VALUE(icpkt) ((uint32)((((icpkt)->srcPid ^ (icpkt)->dstPid)) + (icpkt)->dstContentId))
/*
* CONN_HASH_MATCH
* True when two headers agree on motion node id, source and destination
* content ids, receive and send slice indexes, source and destination
* pids, and interconnect instance id. Both arguments are evaluated
* several times.
*/
#define CONN_HASH_MATCH(a, b) (((a)->motNodeId == (b)->motNodeId && \
(a)->dstContentId == (b)->dstContentId && \
(a)->srcContentId == (b)->srcContentId && \
(a)->recvSliceIndex == (b)->recvSliceIndex && \
(a)->sendSliceIndex == (b)->sendSliceIndex && \
(a)->srcPid == (b)->srcPid && \
(a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId))
/*
* Cursor IC table definition.
*
* For cursor case, there may be several concurrent interconnect
* instances on QD. The table is used to track the status of the
* instances, which is quite useful for "ACK the past and NAK the future" paradigm.
*
* CURSOR_IC_TABLE_SIZE is the number of hash buckets in that table; an
* entry is placed in bucket (icId % CURSOR_IC_TABLE_SIZE).
*/
#define CURSOR_IC_TABLE_SIZE (128)
/*
 * CursorICHistoryEntry
 *
 * One record of the cursor IC history: which interconnect instance it
 * describes, the command it belongs to, and whether that instance is
 * still set up.
 */
typedef struct CursorICHistoryEntry
{
	uint32		icId;			/* interconnect instance id */
	uint32		cid;			/* command id */

	/*
	 * Instance status: 1 means the interconnect is set up, 0 means it was
	 * torn down.
	 */
	uint8		status;

	struct CursorICHistoryEntry *next;	/* next entry in the same bucket */
} CursorICHistoryEntry;
/*
 * CursorICHistoryTable
 *
 * A small chained hash table of cursor IC history entries.
 */
typedef struct CursorICHistoryTable
{
	uint32		count;			/* number of entries currently stored */
	CursorICHistoryEntry *table[CURSOR_IC_TABLE_SIZE];	/* bucket heads */
} CursorICHistoryTable;
/*
* Synchronization timeout values
*
* MAIN_THREAD_COND_TIMEOUT_MS - 1/4 second, expressed in milliseconds
*/
#define MAIN_THREAD_COND_TIMEOUT_MS (250)
/*
 * ThreadWaitingState
 *
 * Describes what the main thread (receiver) is waiting for, so that it can
 * be synchronized with the background thread.
 */
typedef struct ThreadWaitingState
{
	bool		waiting;		/* is the main thread waiting at all? */
	int			waitingNode;	/* motion node id being waited on, or -1 */
	int			waitingRoute;	/* route being waited on, or ANY_ROUTE */
	int			reachRoute;		/* starts out as ANY_ROUTE */

	/*
	 * Interconnect instance id being waited on, or -1; needed to
	 * disambiguate for cursors.
	 */
	int			waitingQuery;
} ThreadWaitingState;
/*
 * ReceiveControlInfo
 *
 * Control information for receiving data packets.  Both the main thread
 * (receiver) and the background thread consult it while handling data
 * packets.
 */
typedef struct ReceiveControlInfo
{
	/* What the main thread is currently waiting for. */
	ThreadWaitingState mainWaitingState;

	/* Buffer used at the receiver side to assemble disorder messages. */
	icpkthdr   *disorderBuffer;

	/* Id of the interconnect instance that was torn down last. */
	uint32		lastTornIcId;

	/* History of cursor interconnect instances. */
	CursorICHistoryTable cursorHistoryTable;
} ReceiveControlInfo;

/*
 * The receive control information shared by the main thread (receiver)
 * and the background thread.
 */
static ReceiveControlInfo rx_control_info;
/*
 * RxBufferPool
 *
 * Buffer pool of the receive thread.  It is implemented differently from
 * the send side pool because the receive side needs a ring buffer to make
 * the disorder message handling logic easy to implement.
 */
typedef struct RxBufferPool
{
	int			maxCount;		/* most buffers obtainable from this pool */
	int			count;			/* buffers allocated so far */
	char	   *freeList;		/* list of free buffers */
} RxBufferPool;

/*
 * The pool that keeps received data packets.
 *
 * maxCount starts at 1 so that there is always one buffer available for
 * picking packets from the OS buffer.
 */
static RxBufferPool rx_buffer_pool = {1, 0, NULL};
/*
 * SendBufferPool
 *
 * Buffer pool used on the send side.
 */
typedef struct SendBufferPool
{
	int			maxCount;		/* most buffers the sender may use */
	int			count;			/* buffers the sender already uses */
	ICBufferList freeList;		/* free buffers at the sender side */
} SendBufferPool;

/* The send side buffer pool instance. */
static SendBufferPool snd_buffer_pool;
/*
 * SendControlInfo
 *
 * Control information for sending data packets and handling acks.  The
 * main thread uses it for ack handling and congestion control.
 */
typedef struct SendControlInfo
{
	icpkthdr   *ackBuffer;		/* buffer into which acks are accepted */
	float		cwnd;			/* congestion window */
	float		minCwnd;		/* minimal congestion control window */
	float		ssthresh;		/* slow start threshold */
} SendControlInfo;

/*
 * The send control information used by the main thread for ack handling
 * and congestion control.
 */
static SendControlInfo snd_control_info;
/*
 * ICGlobalControlInfo
 *
 * Control information shared between the main thread (acting as sender,
 * receiver, or both) and the background thread.
 */
typedef struct ICGlobalControlInfo
{
	/* Handle of the background thread. */
	pthread_t	threadHandle;

	/* UDP socket buffer sizes in use. */
	uint32		socketSendBufferSize;
	uint32		socketRecvBufferSize;

	uint64		lastExpirationCheckTime;
	uint64		lastDeadlockCheckTime;

	/* Decides whether to retransmit for capacity based flow control. */
	uint64		lastPacketSendTime;

	/* Memory context of the UDP interconnect. */
	MemoryContext memContext;

	/*
	 * Lock and latch coordinating the main thread with the background
	 * thread.  The lock protects the data the two threads share (connHtab,
	 * the rx buffer pool, mainWaitingState and so on).
	 */
	pthread_mutex_t lock;
	Latch		latch;

	/* Is this process a sender? */
	bool		isSender;

	/* Has the background thread been created? */
	bool		threadCreated;

	/* Error number; really an int, but there is no pg_atomic_int32. */
	pg_atomic_uint32 eno;

	/*
	 * Global connection hash table covering both sending and receiving
	 * connections.  Protected by the lock above.
	 */
	ConnHashTable connHtab;

	/* Connection hash table that caches future packets. */
	ConnHashTable startupCacheHtab;

	/* Set by the main thread to ask the background thread to exit. */
	pg_atomic_uint32 shutdown;

	/*
	 * Used by the ic thread in a QE to identify the ic instance currently
	 * served and to handle mismatched packets.  The QD does not use it:
	 * because of cursors the QD may receive packets for open cursors with
	 * a lower instance id, so it relies on cursorHistoryTable instead.
	 */
	uint32		ic_instance_id;
} ICGlobalControlInfo;

/*
 * The control information shared by senders, receivers and the background
 * thread.
 */
static ICGlobalControlInfo ic_control_info;
/*
* Macro for unack queue ring, round trip time (RTT) and expiration period (RTO)
*
* UNACK_QUEUE_RING_SLOTS_NUM - the number of slots in the unack queue ring.
* this value should be greater than or equal to 2.
* TIMER_SPAN - timer period in us
* TIMER_CHECKING_PERIOD - timer checking period in us
* UNACK_QUEUE_RING_LENGTH - the whole time span of the unack queue ring
* DEFAULT_RTT - default rtt in us.
* MIN_RTT - min rtt in us
* MAX_RTT - max rtt in us
* RTT_SHIFT_COEFFICIENT - coefficient for RTT computation
*
* DEFAULT_DEV - default round trip standard deviation
* MAX_DEV - max dev
* DEV_SHIFT_COEFFICIENT - coefficient for DEV computation
*
* MAX_EXPIRATION_PERIOD - max expiration period in us
* MIN_EXPIRATION_PERIOD - min expiration period in us
* MAX_TIME_NO_TIMER_CHECKING - max time without checking timer
* DEADLOCK_CHECKING_TIME - deadlock checking time
*
* MAX_SEQS_IN_DISORDER_ACK - max number of sequences that can be transmitted in a
* disordered packet ack.
*
*
* Considerations on the settings of the values:
*
* TIMER_SPAN and UNACK_QUEUE_RING_SLOTS_NUM define the ring period.
* Currently, it is UNACK_QUEUE_RING_LENGTH (default 10 seconds).
*
* The definition of UNACK_QUEUE_RING_LENGTH is quite related to the size of
* sender side buffer and the size we may resend in a burst for an expiration event
* (which may overwhelm switch or OS if it is too large).
* Thus, we do not want to send too much data in a single expiration event. Here, a
* relatively large UNACK_QUEUE_RING_SLOTS_NUM value is used to avoid that.
*
* If the sender side buffer is X (MB), then each slot holds about
* X/UNACK_QUEUE_RING_SLOTS_NUM on average. Even if we have a very large sender buffer,
* for example 100MB, there is only about 100MB/2000 = 50KB per slot.
* This is fine for the OS (generally with a 2MB buffer for each socket) and the switch.
*
* Note that even when the buffers are not evenly distributed in the ring and there are some packet
* losses, the congestion control mechanism, the disorder and duplicate packet handling logic will
* assure the number of outstanding buffers (in unack queues) to be not very large.
*
* MIN_RTT/MAX_RTT/DEFAULT_RTT/MIN_EXPIRATION_PERIOD/MAX_EXPIRATION_PERIOD gives some heuristic values about
* the computation of RTT and expiration period. RTT and expiration period (RTO) are not
* constant for various kinds of hardware and workloads. Thus, they are computed dynamically.
* But we also want to bound the values of RTT and MAX_EXPIRATION_PERIOD. It is
* because there are some faults that may make RTT a very abnormal value. Thus, RTT and
* expiration period are upper and lower bounded.
*
* MAX_SEQS_IN_DISORDER_ACK should be smaller than (MIN_PACKET_SIZE - sizeof(icpkthdr))/sizeof(uint32).
* It is due to the limitation of the ack receive buffer size.
*
*/
/* Number of slots in the unack queue ring. */
#define UNACK_QUEUE_RING_SLOTS_NUM (2000)
/* The timer period setting scaled by 1000 (64-bit arithmetic via ULL). */
#define TIMER_SPAN (Gp_interconnect_timer_period * 1000ULL) /* default: 5ms */
/*
* NOTE(review): unlike TIMER_SPAN this setting is used unscaled; confirm
* the unit expected at the use sites.
*/
#define TIMER_CHECKING_PERIOD (Gp_interconnect_timer_checking_period) /* default: 20ms */
/* Whole ring period: 2000 slots * 5ms = 10s with the default timer period. */
#define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN)
#define DEFAULT_RTT (Gp_interconnect_default_rtt * 1000) /* default: 20ms */
#define MIN_RTT (100) /* 0.1ms */
#define MAX_RTT (200 * 1000) /* 200ms */
#define RTT_SHIFT_COEFFICIENT (3) /* RTT_COEFFICIENT 1/8 (0.125) */
#define DEFAULT_DEV (0)
/* The deviation bounds reuse the RTT bounds. */
#define MIN_DEV MIN_RTT
#define MAX_DEV MAX_RTT
#define DEV_SHIFT_COEFFICIENT (2) /* DEV_COEFFICIENT 1/4 (0.25) */
#define MAX_EXPIRATION_PERIOD (1000 * 1000) /* 1s */
#define MIN_EXPIRATION_PERIOD (Gp_interconnect_min_rto * 1000) /* default: 20ms */
#define MAX_TIME_NO_TIMER_CHECKING (50 * 1000) /* 50ms */
#define DEADLOCK_CHECKING_TIME (512 * 1000) /* 512ms */
/* Max number of sequences carried in one disordered packet ack. */
#define MAX_SEQS_IN_DISORDER_ACK (4)
/*
 * UnackQueueRing
 *
 * A ring of queues of unacked packets, used to find expired packets in
 * constant time.
 *
 * Every slot of the ring stands for a fixed time span (for example 1ms)
 * and owns a buffer list holding the packets that expire within that
 * span.
 *
 * If the current time pointer (time t) points to slot 1, then slot 2
 * covers the span from t + 1ms to t + 2ms.  An expiration check starts at
 * the last recorded current time and resends all queued packets until it
 * reaches the slot the updated current time points to.
 */
typedef struct UnackQueueRing
{
	/* Current time saved when the time wheel was checked for expiration. */
	uint64		currentTime;

	/* Slot index that corresponds to currentTime. */
	int			idx;

	/* Number of outstanding packets in the unack queue ring. */
	int			numOutStanding;

	/*
	 * Number of outstanding packets that use the shared bandwidth in the
	 * congestion window.
	 */
	int			numSharedOutStanding;

	/* The time slots. */
	ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM];
} UnackQueueRing;

/*
 * The single unack queue ring shared by all connections in a process.
 */
static UnackQueueRing unack_queue_ring = {0, 0, 0};
/*
* Sender-side socket bookkeeping: socket descriptor, port and address
* family. They start out as -1 / 0 / 0.
* NOTE(review): presumably filled in when the sender socket is set up; the
* code doing so is outside this section.
*/
static int ICSenderSocket = -1;
static uint16 ICSenderPort = 0;
static int ICSenderFamily = 0;
/*
 * AckSendParam
 *
 * Everything needed to send one ack: the ack header and the address of
 * the peer it goes to.
 */
typedef struct AckSendParam AckSendParam;

struct AckSendParam
{
	icpkthdr	msg;			/* header of the ack */
	struct sockaddr_storage peer;	/* peer address for the ack */
	socklen_t	peer_len;		/* length that goes with peer */
};
/*
 * ICStatistics
 *
 * Assorted statistics about the internals of the interconnect.
 *
 * Note that these statistics are not accurate for the multiple cursor
 * case on QD.
 *
 * totalRecvQueueSize        - sum of receive queue sizes sampled when the
 *                             main thread tries to get a packet.
 * recvQueueSizeCountingTime - number of samples in totalRecvQueueSize.
 * totalCapacity             - sum of capacities sampled when packets are
 *                             about to be sent.
 * capacityCountingTime      - number of samples in totalCapacity.
 * totalBuffers              - sum of available buffers sampled when
 *                             sending packets.
 * bufferCountingTime        - number of samples in totalBuffers.
 * activeConnectionsNum      - number of active connections.
 * retransmits               - number of packet retransmits.
 * startupCachedPktNum       - NOTE(review): undocumented in the original;
 *                             presumably the number of packets placed in
 *                             the startup cache -- confirm.
 * mismatchNum               - number of mismatched packets received.
 * crcErrors                 - number of crc errors.
 * sndPktNum                 - number of packets sent by the sender.
 * recvPktNum                - number of packets received by the receiver.
 * disorderedPktNum          - number of disordered packets.
 * duplicatedPktNum          - number of duplicate packets.
 * recvAckNum                - number of acks received.
 * statusQueryMsgNum         - number of status query messages sent.
 */
typedef struct ICStatistics ICStatistics;

struct ICStatistics
{
	uint64		totalRecvQueueSize;
	uint64		recvQueueSizeCountingTime;
	uint64		totalCapacity;
	uint64		capacityCountingTime;
	uint64		totalBuffers;
	uint64		bufferCountingTime;
	uint32		activeConnectionsNum;
	int32		retransmits;
	int32		startupCachedPktNum;
	int32		mismatchNum;
	int32		crcErrors;
	int32		sndPktNum;
	int32		recvPktNum;
	int32		disorderedPktNum;
	int32		duplicatedPktNum;
	int32		recvAckNum;
	int32		statusQueryMsgNum;
};

/* The statistics instance of the UDP interconnect. */
static ICStatistics ic_statistics;
/*=========================================================================
* STATIC FUNCTIONS declarations
*/
/* Cursor IC History table related functions. */
static void initCursorICHistoryTable(CursorICHistoryTable *t);
static void addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid);
static void updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status);
static CursorICHistoryEntry *getCursorIcEntry(CursorICHistoryTable *t, uint32 icId);
static void pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId);
static void purgeCursorIcEntry(CursorICHistoryTable *t);
static void resetMainThreadWaiting(ThreadWaitingState *state);
static void setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int route, int icId);
/* Background thread error handling functions. */
static void checkRxThreadError(void);
static void setRxThreadError(int eno);
static void resetRxThreadError(void);
static void SendDummyPacket(void);
static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort);
static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type);
static void setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFamily);
static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates,
ExecSlice *sendSlice,
int *pOutgoingCount);
static void setupOutgoingUDPConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn);
/* Connection hash table functions. */
static bool initConnHashTable(ConnHashTable *ht, MemoryContext ctx);
static bool connAddHash(ConnHashTable *ht, MotionConn *conn);
static MotionConn *findConnByHeader(ConnHashTable *ht, icpkthdr *hdr);
static void destroyConnHashTable(ConnHashTable *ht);
static inline void sendAckWithParam(AckSendParam *param);
static void sendAck(MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq);
static void sendDisorderAck(MotionConn *conn, uint32 seq, uint32 extraSeq, uint32 lostPktCnt);
static void sendStatusQueryMessage(MotionConn *conn, int fd, uint32 seq);
static inline void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen);
static void putRxBufferAndSendAck(MotionConn *conn, AckSendParam *param);
static inline void putRxBufferToFreeList(RxBufferPool *p, icpkthdr *buf);
static inline icpkthdr *getRxBufferFromFreeList(RxBufferPool *p);
static icpkthdr *getRxBuffer(RxBufferPool *p);
/* ICBufferList functions. */
static inline void icBufferListInitHeadLink(ICBufferLink *link);
static inline void icBufferListInit(ICBufferList *list, ICBufferListType type);
static inline bool icBufferListIsHead(ICBufferList *list, ICBufferLink *link);
static inline ICBufferLink *icBufferListFirst(ICBufferList *list);
static inline int icBufferListLength(ICBufferList *list);
static inline ICBuffer *icBufferListDelete(ICBufferList *list, ICBuffer *buf);
static inline ICBuffer *icBufferListPop(ICBufferList *list);
static void icBufferListFree(ICBufferList *list);
static inline ICBuffer *icBufferListAppend(ICBufferList *list, ICBuffer *buf);
static void icBufferListReturn(ICBufferList *list, bool inExpirationQueue);
static ChunkTransportState *SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable);
static inline TupleChunkListItem RecvTupleChunkFromAnyUDPIFC_Internal(ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute);
static inline TupleChunkListItem RecvTupleChunkFromUDPIFC_Internal(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute);
static void TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
bool hasErrors);
static void freeDisorderedPackets(MotionConn *conn);
static void prepareRxConnForRead(MotionConn *conn);
static TupleChunkListItem RecvTupleChunkFromAnyUDPIFC(ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute);
static TupleChunkListItem RecvTupleChunkFromUDPIFC(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute);
static TupleChunkListItem receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
int16 motNodeID, int16 *srcRoute, MotionConn *conn);
static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent);
static void SendEosUDPIFC(ChunkTransportState *transportStates,
int motNodeID, TupleChunkListItem tcItem);
static bool SendChunkUDPIFC(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, TupleChunkListItem tcItem, int16 motionId);
static void doSendStopMessageUDPIFC(ChunkTransportState *transportStates, int16 motNodeID);
static bool dispatcherAYT(void);
static void checkQDConnectionAlive(void);
static void *rxThreadFunc(void *arg);
static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now);
static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry);
static void handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, int16 motionId);
static void handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt);
static bool handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param, bool *wakeup_mainthread);
static bool handleAckForDuplicatePkt(MotionConn *conn, icpkthdr *pkt);
static bool handleAckForDisorderPkt(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, icpkthdr *pkt);
static inline void prepareXmit(MotionConn *conn);
static inline void addCRC(icpkthdr *pkt);
static inline bool checkCRC(icpkthdr *pkt);
static void sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn);
static void sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, ICBuffer *buf, MotionConn *conn);
static inline uint64 computeExpirationPeriod(MotionConn *conn, uint32 retry);
static ICBuffer *getSndBuffer(MotionConn *conn);
/* Declared with an explicit (void) so that it is a real prototype, like its neighbours. */
static void initSndBufferPool(void);
static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now);
static void initUnackQueueRing(UnackQueueRing *uqr);
static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
static void checkDeadlock(ChunkTransportStateEntry *pEntry, MotionConn *conn);
static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void cleanupStartupCache(void);
static void handleCachedPackets(void);
static uint64 getCurrentTime(void);
static void initMutex(pthread_mutex_t *mutex);
static inline void logPkt(char *prefix, icpkthdr *pkt);
static void aggregateStatistics(ChunkTransportStateEntry *pEntry);
static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);
static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail);
/* #define TRANSFER_PROTOCOL_STATS */
#ifdef TRANSFER_PROTOCOL_STATS
/*
 * TransProtoEvent
 *
 * Kinds of events recorded in the transfer protocol statistics list.
 *
 * The typedef is folded into the enum definition: ISO C does not permit
 * naming an enum type before it is defined, so the separate forward
 * typedef only compiled as a compiler extension.
 */
typedef enum TransProtoEvent
{
	TPE_DATA_PKT_SEND,
	TPE_ACK_PKT_QUERY
} TransProtoEvent;
/*
 * TransProtoStatEntry
 *
 * One recorded transfer protocol event, kept in a singly linked list.
 */
typedef struct TransProtoStatEntry
{
	struct TransProtoStatEntry *next;	/* following entry in the list */

	/* Basic information */
	uint32		time;			/* time relative to the list's start time */
	TransProtoEvent event;		/* what happened */
	int			dstPid;			/* dstPid taken from the packet header */
	uint32		seq;			/* seq taken from the packet header */

	/*
	 * More attributes can be added on demand, for example: float cwnd;
	 * int capacity;
	 */
} TransProtoStatEntry;
/*
 * TransProtoStats
 *
 * The list of recorded transfer protocol events together with the mutex
 * that guards it.
 */
typedef struct TransProtoStats
{
	pthread_mutex_t lock;		/* guards every field below */
	TransProtoStatEntry *head;	/* first entry */
	TransProtoStatEntry *tail;	/* last entry */
	uint64		count;			/* number of entries in the list */
	uint64		startTime;		/* reference time for entry timestamps */
} TransProtoStats;

/* The statistics list; it starts out empty. */
static TransProtoStats trans_proto_stats =
{
	PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0
};
/*
* initTransProtoStats
* Initialize the transport protocol states data structures.
*/
static void
initTransProtoStats()
{
pthread_mutex_lock(&trans_proto_stats.lock);
while (trans_proto_stats.head)
{
TransProtoStatEntry *cur = NULL;
cur = trans_proto_stats.head;
trans_proto_stats.head = trans_proto_stats.head->next;
free(cur);
trans_proto_stats.count--;
}
trans_proto_stats.head = NULL;
trans_proto_stats.tail = NULL;
trans_proto_stats.count = 0;
trans_proto_stats.startTime = getCurrentTime();
pthread_mutex_unlock(&trans_proto_stats.lock);
}
/*
 * updateStats
 *		Append one event record to the transport protocol statistics list.
 *
 * event is stored as is; dstPid and seq are copied from pkt.  conn is not
 * used at present.  If no memory is available the event is silently
 * dropped.
 */
static void
updateStats(TransProtoEvent event, MotionConn *conn, icpkthdr *pkt)
{
	TransProtoStatEntry *entry;

	entry = (TransProtoStatEntry *) malloc(sizeof(TransProtoStatEntry));
	if (entry == NULL)
		return;
	memset(entry, 0, sizeof(*entry));

	pthread_mutex_lock(&trans_proto_stats.lock);

	/* Link the record at the tail; an empty list also gets a new head. */
	if (trans_proto_stats.count != 0)
		trans_proto_stats.tail->next = entry;
	else
		trans_proto_stats.head = entry;
	trans_proto_stats.tail = entry;
	trans_proto_stats.count++;

	/* Fill in the record while the lock is still held. */
	entry->time = getCurrentTime() - trans_proto_stats.startTime;
	entry->event = event;
	entry->dstPid = pkt->dstPid;
	entry->seq = pkt->seq;

	/*
	 * Other attributes can be added on demand, e.g. cwnd taken from
	 * snd_control_info.cwnd or capacity taken from conn->capacity.
	 */

	pthread_mutex_unlock(&trans_proto_stats.lock);
}
static void
dumpTransProtoStats()
{
char tmpbuf[32];
snprintf(tmpbuf, 32, "%d." UINT64_FORMAT "txt", MyProcPid, getCurrentTime());
FILE *ofile = fopen(tmpbuf, "w+");
pthread_mutex_lock(&trans_proto_stats.lock);
while (trans_proto_stats.head)
{
TransProtoStatEntry *cur = NULL;
cur = trans_proto_stats.head;
trans_proto_stats.head = trans_proto_stats.head->next;
fprintf(ofile, "time %d event %d seq %d destpid %d\n", cur->time, cur->event, cur->seq, cur->dstPid);
free(cur);
trans_proto_stats.count--;
}
trans_proto_stats.tail = NULL;
pthread_mutex_unlock(&trans_proto_stats.lock);
fclose(ofile);
}
#endif /* TRANSFER_PROTOCOL_STATS */
/*
 * initCursorICHistoryTable
 *		Put the cursor ic history table into its empty state: all buckets
 *		cleared and the entry count at zero.  Existing entries are not
 *		freed here.
 */
static void
initCursorICHistoryTable(CursorICHistoryTable *t)
{
	memset(t->table, 0, sizeof(t->table));
	t->count = 0;
}
/*
 * addCursorIcEntry
 *		Add an entry to the cursor ic table.
 *
 * The entry is allocated in ic_control_info.memContext, marked as set up
 * (status 1) and pushed onto the front of the bucket chosen by
 * icId % CURSOR_IC_TABLE_SIZE.  No check is made for an existing entry
 * with the same icId.
 */
static void
addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid)
{
	MemoryContext old;
	CursorICHistoryEntry *p;
	uint32		index = icId % CURSOR_IC_TABLE_SIZE;

	old = MemoryContextSwitchTo(ic_control_info.memContext);
	p = palloc0(sizeof(struct CursorICHistoryEntry));
	MemoryContextSwitchTo(old);

	p->icId = icId;
	p->cid = cid;
	p->status = 1;
	p->next = t->table[index];
	t->table[index] = p;
	t->count++;

	/* icId and cid are unsigned; %u keeps large ids from printing negative. */
	elog(DEBUG2, "add icid %u cid %u status %d", p->icId, p->cid, p->status);
}
/*
 * updateCursorIcEntry
 *		Store a new status in the cursor ic entry of the given interconnect
 *		instance id.  Nothing happens if no such entry exists.
 *
 * An interconnect instance is in one of two states:
 *		1 - the interconnect is set up
 *		0 - the interconnect was torn down
 */
static void
updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status)
{
	CursorICHistoryEntry *entry = t->table[icId % CURSOR_IC_TABLE_SIZE];

	/* Walk the bucket until the id matches or the chain ends. */
	while (entry != NULL && entry->icId != icId)
		entry = entry->next;

	if (entry != NULL)
		entry->status = status;
}
/*
 * getCursorIcEntry
 *		Look up the cursor entry of an interconnect id.
 *
 * Returns the first entry in the id's bucket whose icId matches, or NULL
 * when there is none.
 */
static CursorICHistoryEntry *
getCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
{
	CursorICHistoryEntry *entry;

	for (entry = t->table[icId % CURSOR_IC_TABLE_SIZE];
		 entry != NULL;
		 entry = entry->next)
	{
		if (entry->icId == icId)
			break;
	}

	/* NULL here means the chain was exhausted without a match. */
	return entry;
}
/*
 * pruneCursorIcEntry
 *		Remove and free every entry whose icId is smaller than the given
 *		prune point, in all buckets, decrementing the table count for each
 *		removed entry.
 */
static void
pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
{
	int			bucket;

	for (bucket = 0; bucket < CURSOR_IC_TABLE_SIZE; bucket++)
	{
		/*
		 * link always addresses the pointer that leads to the entry being
		 * examined: first the bucket head, later the next field of the
		 * last entry that was kept.
		 */
		CursorICHistoryEntry **link = &t->table[bucket];

		while (*link != NULL)
		{
			CursorICHistoryEntry *entry = *link;

			if (entry->icId < icId)
			{
				/* older than the prune point: unlink it and free it */
				*link = entry->next;
				pfree(entry);
				t->count--;
			}
			else
				link = &entry->next;
		}
	}
}
/*
 * purgeCursorIcEntry
 *		Clean cursor ic history table.
 *
 * Frees every entry in every bucket and resets the entry count, so that
 * the count keeps matching the (now empty) table.
 */
static void
purgeCursorIcEntry(CursorICHistoryTable *t)
{
	int			index;

	for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++)
	{
		/* Pop from the head so the bucket never points at freed memory. */
		while (t->table[index])
		{
			CursorICHistoryEntry *trash = t->table[index];

			t->table[index] = trash->next;
			pfree(trash);
		}
	}

	/* All entries are gone; do not leave a stale count behind. */
	t->count = 0;
}
/*
 * resetMainThreadWaiting
 *		Mark the main thread as not waiting and restore the "nothing
 *		selected" values: node and query -1, both routes ANY_ROUTE.
 */
static void
resetMainThreadWaiting(ThreadWaitingState *state)
{
	state->waiting = false;

	state->waitingQuery = -1;
	state->waitingNode = -1;

	state->reachRoute = ANY_ROUTE;
	state->waitingRoute = ANY_ROUTE;
}
/*
 * setMainThreadWaiting
 *		Record that the main thread is waiting for the given motion node,
 *		route and interconnect instance.  reachRoute is reset to ANY_ROUTE.
 */
static void
setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int route, int icId)
{
	/* what is being waited for */
	state->waitingQuery = icId;
	state->waitingNode = motNodeId;
	state->waitingRoute = route;

	/* no route recorded yet */
	state->reachRoute = ANY_ROUTE;

	state->waiting = true;
}
/*
 * checkRxThreadError
 *		Called in the main thread to find out whether the background
 *		thread recorded an error.
 *
 * If an error number has been recorded, it is copied into errno and
 * reported with ereport(ERROR), so that %m expands to its message.
 */
static void
checkRxThreadError()
{
	int			recorded = pg_atomic_read_u32(&ic_control_info.eno);

	/* Nothing recorded: nothing to report. */
	if (recorded == 0)
		return;

	errno = recorded;
	ereport(ERROR,
			(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			 errmsg("interconnect encountered an error"),
			 errdetail("%s: %m", "in receive background thread")));
}
/*
* setRxThreadError
* Set the error no in background thread.
*
* Record the error in background thread. Main thread checks the errors periodically.
* If main thread will find it, main thread will handle it.
*/
static void
setRxThreadError(int eno)
{
uint32 expected = 0;
/* always let main thread know the error that occurred first. */
if (pg_atomic_compare_exchange_u32(&ic_control_info.eno, &expected, (uint32) eno))
{
write_log("Interconnect error: in background thread, set ic_control_info.eno to %d, rx_buffer_pool.count %d, rx_buffer_pool.maxCount %d", expected, rx_buffer_pool.count, rx_buffer_pool.maxCount);
}
}
/*
* resetRxThreadError
* Reset the error no.
*
*/
static void
resetRxThreadError()
{
pg_atomic_write_u32(&ic_control_info.eno, 0);
}
/*
 * setupUDPListeningSocket
 *      Create, bind and configure a UDP socket for the interconnect.
 *
 * Resolves interconnect_address, then tries each returned address until one
 * can be used to create a socket, bind it and read back its local address.
 * The socket is then switched to non-blocking mode and its receive and send
 * buffer sizes are set through setUDPSocketBufferSize(); the resulting sizes
 * are stored in ic_control_info.
 *
 * Output parameters (written only once the whole setup has succeeded, so a
 * failure never leaves the caller holding a descriptor that was closed here):
 *   listenerSocketFd - the socket descriptor
 *   listenerPort     - port reported by getsockname(), in host byte order
 *   txFamily         - AF_INET6 or AF_INET, taken from the bound address
 *
 * On any failure the socket (if one was opened) is closed, the address list
 * is released and an ERROR is raised.
 */
static void
setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFamily)
{
    struct addrinfo *addrs = NULL;
    struct addrinfo *addr;
    struct addrinfo hints;
    int         ret;
    int         ic_socket = PGINVALID_SOCKET;
    int         tries = 0;
    struct sockaddr_storage listenerAddr;
    socklen_t   listenerAddrlen;
    uint32      socketSendBufferSize;
    uint32      socketRecvBufferSize;
    uint16      boundPort;
    int         boundFamily;
    int         saved_errno;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
    hints.ai_protocol = 0;
    hints.ai_addrlen = 0;
    hints.ai_addr = NULL;
    hints.ai_canonname = NULL;
    hints.ai_next = NULL;
    hints.ai_flags |= AI_NUMERICHOST;

#ifdef USE_ASSERT_CHECKING
    if (gp_udpic_network_disable_ipv6)
        hints.ai_family = AF_INET;
#endif

    if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST)
    {
        Assert(interconnect_address && strlen(interconnect_address) > 0);
        hints.ai_flags |= AI_NUMERICHOST;
        ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                  (errmsg("getaddrinfo called with unicast address: %s",
                          interconnect_address)));
    }
    else
    {
        Assert(interconnect_address == NULL);
        hints.ai_flags |= AI_PASSIVE;
        ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                  (errmsg("getaddrinfo called with wildcard address")));
    }

    /*
     * Restrict what IP address we will listen on to just the one that was
     * used to create this QE session.
     *
     * NOTE(review): the wildcard branch above asserts that
     * interconnect_address is NULL, while the assertion below requires it to
     * be non-empty; both cannot hold in an assert-enabled build.  Left
     * unchanged here -- confirm which one is intended.
     */
    Assert(interconnect_address && strlen(interconnect_address) > 0);
    ret = pg_getaddrinfo_all(interconnect_address, NULL, &hints, &addrs);
    if (ret || !addrs)
    {
        ereport(LOG,
                (errmsg("could not resolve address for UDP IC socket %s: %s",
                        interconnect_address,
                        gai_strerror(ret))));
        goto startup_failed;
    }

    /*
     * On some platforms, pg_getaddrinfo_all() may return multiple addresses
     * only one of which will actually work (eg, both IPv6 and IPv4 addresses
     * when kernel will reject IPv6).  Worse, the failure may occur at the
     * bind() or perhaps even connect() stage.  So we must loop through the
     * results till we find a working combination.  We will generate DEBUG
     * messages, but no error, for bogus combinations.
     */
    for (addr = addrs; addr != NULL; addr = addr->ai_next)
    {
#ifdef HAVE_UNIX_SOCKETS
        /* Ignore AF_UNIX sockets, if any are returned. */
        if (addr->ai_family == AF_UNIX)
            continue;
#endif

        ereportif(++tries > 1 && gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                  errmsg("trying another address for UDP interconnect socket"));

        ic_socket = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
        if (ic_socket == PGINVALID_SOCKET)
        {
            ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                      (errcode_for_socket_access(),
                       errmsg("could not create UDP interconnect socket: %m")));
            continue;
        }

        /*
         * Bind the socket to a kernel assigned ephemeral port on the
         * interconnect_address.
         */
        if (bind(ic_socket, addr->ai_addr, addr->ai_addrlen) < 0)
        {
            ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                      (errcode_for_socket_access(),
                       errmsg("could not bind UDP interconnect socket: %m")));
            closesocket(ic_socket);
            ic_socket = PGINVALID_SOCKET;
            continue;
        }

        /*
         * Call getsockname() to eventually obtain the assigned ephemeral
         * port.  The length argument is value-result, so it is reset to the
         * full buffer size on every attempt.
         */
        listenerAddrlen = sizeof(listenerAddr);
        if (getsockname(ic_socket, (struct sockaddr *) &listenerAddr, &listenerAddrlen) < 0)
        {
            ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                      (errcode_for_socket_access(),
                       errmsg("could not get address of socket for UDP interconnect: %m")));
            closesocket(ic_socket);
            ic_socket = PGINVALID_SOCKET;
            continue;
        }

        /* If we get here, we have a working socket */
        break;
    }

    if (!addr || ic_socket == PGINVALID_SOCKET)
        goto startup_failed;

    /* Work out the kernel assigned port and the address family. */
    if (listenerAddr.ss_family == AF_INET6)
    {
        boundPort = ntohs(((struct sockaddr_in6 *) &listenerAddr)->sin6_port);
        boundFamily = AF_INET6;
    }
    else
    {
        boundPort = ntohs(((struct sockaddr_in *) &listenerAddr)->sin_port);
        boundFamily = AF_INET;
    }

    /* Set up socket non-blocking mode */
    if (!pg_set_noblock(ic_socket))
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not set UDP interconnect socket to nonblocking mode: %m")));
        goto startup_failed;
    }

    /*
     * Set up the socket's send and receive buffer sizes.
     * setUDPSocketBufferSize() returns -1 converted to uint32 on failure.
     */
    socketRecvBufferSize = setUDPSocketBufferSize(ic_socket, SO_RCVBUF);
    if (socketRecvBufferSize == (uint32) -1)
        goto startup_failed;
    ic_control_info.socketRecvBufferSize = socketRecvBufferSize;

    socketSendBufferSize = setUDPSocketBufferSize(ic_socket, SO_SNDBUF);
    if (socketSendBufferSize == (uint32) -1)
        goto startup_failed;
    ic_control_info.socketSendBufferSize = socketSendBufferSize;

    /* Everything worked: only now hand the results to the caller. */
    *listenerSocketFd = ic_socket;
    *listenerPort = boundPort;
    *txFamily = boundFamily;

    pg_freeaddrinfo_all(hints.ai_family, addrs);
    return;

startup_failed:
    /* Keep errno intact across the cleanup calls; the message uses %m. */
    saved_errno = errno;
    if (addrs)
        pg_freeaddrinfo_all(hints.ai_family, addrs);
    if (ic_socket != PGINVALID_SOCKET)
        closesocket(ic_socket);
    errno = saved_errno;
    ereport(ERROR,
            (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
             errmsg("interconnect error: Could not set up udp interconnect socket: %m")));
}
/*
 * initMutex
 *      Initialize a mutex of type PTHREAD_MUTEX_ERRORCHECK.
 *
 * mutex: storage for the mutex to initialize.
 *
 * The attribute object is only needed while pthread_mutex_init() runs, so it
 * is destroyed again before returning.
 */
static void
initMutex(pthread_mutex_t *mutex)
{
    pthread_mutexattr_t m_atts;

    pthread_mutexattr_init(&m_atts);
    pthread_mutexattr_settype(&m_atts, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(mutex, &m_atts);
    pthread_mutexattr_destroy(&m_atts);
}
/*
 * ic_set_pthread_sigmasks
 *      Block every signal in the calling thread and hand back the previous
 *      mask, because we don't want the interconnect thread to run our signal
 *      handlers.
 *
 * old_sigs: receives the signal mask that was in effect before the call.
 *
 * Does nothing on WIN32.
 */
static void
ic_set_pthread_sigmasks(sigset_t *old_sigs)
{
#ifdef WIN32
    (void) old_sigs;
#else
    sigset_t    allSignals;
    int         rc;

    sigfillset(&allSignals);
    rc = pthread_sigmask(SIG_BLOCK, &allSignals, old_sigs);
    if (rc != 0)
        elog(ERROR, "Failed to get pthread signal masks with return value: %d", rc);
#endif
}
/*
 * ic_reset_pthread_sigmasks
 *      Install the given signal mask in the calling thread.
 *
 * sigs: the mask to install, e.g. the one returned by
 *       ic_set_pthread_sigmasks().
 *
 * Does nothing on WIN32.
 */
static void
ic_reset_pthread_sigmasks(sigset_t *sigs)
{
#ifdef WIN32
    (void) sigs;
#else
    int         rc = pthread_sigmask(SIG_SETMASK, sigs, NULL);

    if (rc != 0)
        elog(ERROR, "Failed to reset pthread signal masks with return value: %d", rc);
#endif
}
/*
 * InitMotionUDPIFC
 *      Initialize UDP specific comms, and create rx-thread.
 *
 * listenerSocketFd, listenerPort: outputs, filled in by
 * setupUDPListeningSocket() for the listening socket.
 *
 * Steps, in order: reset the global control data, create the interconnect
 * memory context, build the two connection hash tables, open the listening
 * and sending sockets, initialize receive/send control data, and finally
 * start the receive thread.  Reports FATAL if the startup-cache hash table
 * cannot be allocated or the thread cannot be created.
 */
void
InitMotionUDPIFC(int *listenerSocketFd, uint16 *listenerPort)
{
int pthread_err;
/* Receives the address family of the listening socket; not read again here. */
int txFamily = -1;
/* attributes of the thread we're creating */
pthread_attr_t t_atts;
/* Signal mask saved while all signals are blocked around pthread_create(). */
sigset_t pthread_sigs;
MemoryContext old;
#ifdef USE_ASSERT_CHECKING
set_test_mode();
#endif
/* Initialize global ic control data. */
pg_atomic_init_u32(&ic_control_info.eno, 0);
ic_control_info.isSender = false;
/* Initial values; setupUDPListeningSocket() overwrites them below. */
ic_control_info.socketSendBufferSize = 2 * 1024 * 1024;
ic_control_info.socketRecvBufferSize = 2 * 1024 * 1024;
ic_control_info.memContext = AllocSetContextCreate(TopMemoryContext,
"UdpInterconnectMemContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
initMutex(&ic_control_info.lock);
InitLatch(&ic_control_info.latch);
pg_atomic_init_u32(&ic_control_info.shutdown, 0);
ic_control_info.threadCreated = false;
ic_control_info.ic_instance_id = 0;
old = MemoryContextSwitchTo(ic_control_info.memContext);
/* connHtab lives in the interconnect memory context (palloc). */
initConnHashTable(&ic_control_info.connHtab, ic_control_info.memContext);
/*
 * The startup cache table is given a NULL context, which makes
 * initConnHashTable() use malloc(); that is the only case in which it can
 * return false.
 */
if (!initConnHashTable(&ic_control_info.startupCacheHtab, NULL))
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("failed to initialize connection htab for startup cache")));
/*
 * setup listening socket and sending socket for Interconnect.
 */
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily);
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily);
/* Initialize receive control data. */
resetMainThreadWaiting(&rx_control_info.mainWaitingState);
/* allocate a buffer for sending disorder messages */
rx_control_info.disorderBuffer = palloc0(MIN_PACKET_SIZE);
rx_control_info.lastTornIcId = 0;
initCursorICHistoryTable(&rx_control_info.cursorHistoryTable);
/* Initialize receive buffer pool */
rx_buffer_pool.count = 0;
rx_buffer_pool.maxCount = 1;
rx_buffer_pool.freeList = NULL;
/* Initialize send control data */
snd_control_info.cwnd = 0;
snd_control_info.minCwnd = 0;
snd_control_info.ackBuffer = palloc0(MIN_PACKET_SIZE);
MemoryContextSwitchTo(old);
#ifdef TRANSFER_PROTOCOL_STATS
initMutex(&trans_proto_stats.lock);
#endif
/* Start up our rx-thread */
/*
 * save ourselves some memory: the defaults for thread stack size are
 * large (1M+)
 */
pthread_attr_init(&t_atts);
pthread_attr_setstacksize(&t_atts, Max(PTHREAD_STACK_MIN, (128 * 1024)));
/*
 * Block all signals while the thread is created and restore the previous
 * mask right afterwards (see ic_set_pthread_sigmasks: we don't want the
 * thread to run our signal handlers).
 */
ic_set_pthread_sigmasks(&pthread_sigs);
pthread_err = pthread_create(&ic_control_info.threadHandle, &t_atts, rxThreadFunc, NULL);
ic_reset_pthread_sigmasks(&pthread_sigs);
pthread_attr_destroy(&t_atts);
if (pthread_err != 0)
{
ic_control_info.threadCreated = false;
ereport(FATAL,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("InitMotionLayerIPC: failed to create thread"),
errdetail("pthread_create() failed with err %d", pthread_err)));
}
/* CleanupMotionUDPIFC() only joins the thread when this flag is set. */
ic_control_info.threadCreated = true;
return;
}
/*
 * CleanupMotionUDPIFC
 *      Clean up UDP specific stuff such as cursor ic hash table, thread etc.
 *
 * Order matters: the receive thread is asked to shut down and joined first,
 * so that the hash tables, buffers and memory context can then be released
 * without taking ic_control_info.lock.  Finally the sender socket is closed
 * and its globals are reset.
 */
void
CleanupMotionUDPIFC(void)
{
elog(DEBUG2, "udp-ic: telling receiver thread to shutdown.");
/*
 * We should not hold any lock when we reach here even when we report
 * FATAL errors. Just in case, We still release the locks here.
 */
pthread_mutex_unlock(&ic_control_info.lock);
/* Shutdown rx thread. */
pg_atomic_write_u32(&ic_control_info.shutdown, 1);
/* Only join if InitMotionUDPIFC() actually managed to create the thread. */
if (ic_control_info.threadCreated)
pthread_join(ic_control_info.threadHandle, NULL);
elog(DEBUG2, "udp-ic: receiver thread shutdown.");
purgeCursorIcEntry(&rx_control_info.cursorHistoryTable);
destroyConnHashTable(&ic_control_info.connHtab);
/* background thread exited, we can do the cleanup without locking. */
cleanupStartupCache();
destroyConnHashTable(&ic_control_info.startupCacheHtab);
/* free the disorder buffer */
pfree(rx_control_info.disorderBuffer);
rx_control_info.disorderBuffer = NULL;
/* free the buffer for acks */
pfree(snd_control_info.ackBuffer);
snd_control_info.ackBuffer = NULL;
MemoryContextDelete(ic_control_info.memContext);
/* Close the sender socket and reset the sender globals. */
if (ICSenderSocket >= 0)
closesocket(ICSenderSocket);
ICSenderSocket = -1;
ICSenderPort = 0;
ICSenderFamily = 0;
#ifdef USE_ASSERT_CHECKING
/*
 * Check malloc times, in Interconnect part, memory are carefully released
 * in tear down code (even when error occurred). But if a FATAL error is
 * reported, tear down code will not be executed. Thus, it is still
 * possible the malloc times and free times do not match when we reach
 * here. The process will die in this case, the mismatch does not
 * introduce issues.
 */
if (icudp_malloc_times != 0)
elog(LOG, "WARNING: malloc times and free times do not match.");
#endif
}
/*
 * initConnHashTable
 *      Set up an empty connection hash table.
 *
 * ht:  table to initialize.
 * cxt: memory context that owns the table; when NULL the bucket array is
 *      obtained from malloc() instead of palloc0().
 *
 * The bucket count is twice the segment count on the dispatcher and
 * ic_htab_size otherwise.
 *
 * Returns false only when malloc() fails (NULL cxt); true otherwise.
 */
static bool
initConnHashTable(ConnHashTable *ht, MemoryContext cxt)
{
    int         slot;

    ht->cxt = cxt;

    if (Gp_role == GP_ROLE_DISPATCH)
        ht->size = getgpsegmentCount() * 2;
    else
        ht->size = ic_htab_size;
    Assert(ht->size > 0);

    if (ht->cxt == NULL)
    {
        ht->table = (struct ConnHtabBin **) malloc(ht->size * sizeof(struct ConnHtabBin *));
        if (ht->table == NULL)
            return false;
    }
    else
        ht->table = (struct ConnHtabBin **) palloc0(ht->size * sizeof(struct ConnHtabBin *));

    /* Every bucket starts out as an empty chain. */
    slot = 0;
    while (slot < ht->size)
    {
        ht->table[slot] = NULL;
        slot++;
    }

    return true;
}
/*
 * connAddHash
 *      Insert a connection into the hash table unless it is already there.
 *
 * Presence is decided by comparing connection pointers within the bucket,
 * so CONN_HASH_MATCH() is not needed here.
 *
 * Returns false only when malloc() fails (table without a memory context);
 * a duplicate insert is reported at DEBUG5 and still returns true.
 */
static bool
connAddHash(ConnHashTable *ht, MotionConn *conn)
{
    struct ConnHtabBin *entry;
    struct ConnHtabBin *fresh;
    uint32      bucket = CONN_HASH_VALUE(&conn->conn_info) % ht->size;

    /* Already chained in this bucket?  Then there is nothing to add. */
    entry = ht->table[bucket];
    while (entry != NULL)
    {
        if (entry->conn == conn)
        {
            elog(DEBUG5, "connAddHash(): duplicate ?! node %d route %d", conn->conn_info.motNodeId, conn->route);
            return true;        /* false *only* indicates memory-alloc
                                 * failure. */
        }
        entry = entry->next;
    }

    /* Allocate the chain node from whichever allocator owns this table. */
    if (ht->cxt != NULL)
    {
        MemoryContext saved = MemoryContextSwitchTo(ht->cxt);

        fresh = (struct ConnHtabBin *) palloc0(sizeof(struct ConnHtabBin));
        MemoryContextSwitchTo(saved);
    }
    else
    {
        fresh = (struct ConnHtabBin *) malloc(sizeof(struct ConnHtabBin));
        if (fresh == NULL)
            return false;
    }

    /* Push the new node onto the front of the bucket chain. */
    fresh->conn = conn;
    fresh->next = ht->table[bucket];
    ht->table[bucket] = fresh;

    ic_statistics.activeConnectionsNum++;
    return true;
}
/*
 * connDelHash
 *      Remove a connection from the hash table, if it is present.
 *
 * As in connAddHash(), entries are identified by connection pointer, so
 * CONN_HASH_MATCH() is not used.  A connection that is not in the table is
 * silently ignored.
 */
static void
connDelHash(ConnHashTable *ht, MotionConn *conn)
{
    struct ConnHtabBin **linkp;
    struct ConnHtabBin *victim;
    uint32      bucket = CONN_HASH_VALUE(&conn->conn_info) % ht->size;

    /* Walk the chain by the address of the pointer that leads to each node. */
    linkp = &ht->table[bucket];
    while (*linkp != NULL && (*linkp)->conn != conn)
        linkp = &(*linkp)->next;

    victim = *linkp;
    if (victim == NULL)
        return;                 /* not in the table */

    /* Unlink, then release with the allocator the table was built with. */
    *linkp = victim->next;
    if (ht->cxt)
        pfree(victim);
    else
        free(victim);

    ic_statistics.activeConnectionsNum--;
}
/*
 * findConnByHeader
 *      Look up the connection that matches a packet header.
 *
 * With the new mirroring scheme, the interconnect is no longer involved:
 * we don't have to disambiguate anymore.
 *
 * NOTE: the icpkthdr field dstListenerPort is used for disambiguation.
 * on receivers it may not match the actual port (it may have an extra bit
 * set (1<<31)).
 *
 * Returns the first connection in the header's bucket for which
 * CONN_HASH_MATCH() holds, or NULL when there is none.  Uses write_log()
 * (guarded by the DEBUG5 level check) rather than elog.
 */
static MotionConn *
findConnByHeader(ConnHashTable *ht, icpkthdr *hdr)
{
    struct ConnHtabBin *entry;
    uint32      hashcode = CONN_HASH_VALUE(hdr) % ht->size;

    entry = ht->table[hashcode];
    while (entry != NULL)
    {
        if (CONN_HASH_MATCH(&entry->conn->conn_info, hdr))
        {
            MotionConn *ret = entry->conn;

            if (DEBUG5 >= log_min_messages)
                write_log("findConnByHeader: found. route %d state %d hashcode %d conn %p",
                          ret->route, ret->state, hashcode, ret);
            return ret;
        }
        entry = entry->next;
    }

    if (DEBUG5 >= log_min_messages)
        write_log("findConnByHeader: not found! (hdr->srcPid %d "
                  "hdr->srcContentId %d hdr->dstContentId %d hdr->dstPid %d sess(%d:%d) cmd(%d:%d)) hashcode %d",
                  hdr->srcPid, hdr->srcContentId, hdr->dstContentId, hdr->dstPid, hdr->sessionId,
                  gp_session_id, hdr->icId, ic_control_info.ic_instance_id, hashcode);

    return NULL;
}
/*
 * destroyConnHashTable
 *      Release every chain node and the bucket array of a connection hash
 *      table, leaving it empty (table NULL, size 0).
 *
 * Memory is released with pfree() when the table has a memory context and
 * with free() otherwise, mirroring initConnHashTable()/connAddHash().
 *
 * A table whose bucket array is NULL -- because initConnHashTable() failed
 * to allocate it, or because the table was already destroyed -- is handled
 * without touching any bucket.
 */
static void
destroyConnHashTable(ConnHashTable *ht)
{
    int         i;

    /* Nothing was allocated (or it is already gone): just mark it empty. */
    if (ht->table == NULL)
    {
        ht->size = 0;
        return;
    }

    for (i = 0; i < ht->size; i++)
    {
        struct ConnHtabBin *trash;

        /* Pop nodes off the front of the chain until it is empty. */
        while (ht->table[i] != NULL)
        {
            trash = ht->table[i];
            ht->table[i] = trash->next;

            if (ht->cxt)
                pfree(trash);
            else
                free(trash);
        }
    }

    if (ht->cxt)
        pfree(ht->table);
    else
        free(ht->table);

    ht->table = NULL;
    ht->size = 0;
}
/*
 * sendControlMessage
 *      Send one control packet to a peer.
 *
 * pkt:     packet to send; pkt->len bytes are transmitted.
 * fd:      socket to send on.
 * addr, peerLen: destination address and its length.
 *
 * In assert-enabled builds the message may be dropped on purpose by the
 * fault injector.  A CRC is added first when gp_interconnect_full_crc is
 * set.  A short send is only logged.
 */
static inline void
sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen)
{
    char        errDetail[100];
    int         sent;

#ifdef USE_ASSERT_CHECKING
    if (testmode_inject_fault(gp_udpic_dropacks_percent))
    {
#ifdef AMS_VERBOSE_LOGGING
        write_log("THROW CONTROL MESSAGE with seq %d extraSeq %d srcpid %d despid %d", pkt->seq, pkt->extraSeq, pkt->srcPid, pkt->dstPid);
#endif
        return;
    }
#endif

    /* Add CRC for the control message. */
    if (gp_interconnect_full_crc)
        addCRC(pkt);

    snprintf(errDetail, sizeof(errDetail), "Send control message: got error with seq %u", pkt->seq);

    /* Retry for infinite times since we have no retransmit mechanism for control message */
    sent = sendtoWithRetry(fd, (const char *) pkt, pkt->len, 0, addr, peerLen, -1, errDetail);
    if (sent < pkt->len)
        write_log("sendcontrolmessage: got error %d errno %d seq %d", sent, errno, pkt->seq);
}
/*
 * setAckSendParam
 *      Fill in the parameters for an ack that will be sent later.
 *
 * The message starts as a copy of the connection's header (conn_info) and
 * then gets the given flags and sequence numbers; its length is that of a
 * bare header.  The peer address is copied from the connection as well, so
 * the ack can be sent by sendAckWithParam() without the connection.
 */
static inline void
setAckSendParam(AckSendParam *param, MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq)
{
    memcpy(&param->msg, (char *) &conn->conn_info, sizeof(icpkthdr));
    param->msg.flags = flags;
    param->msg.seq = seq;
    param->msg.extraSeq = extraSeq;
    param->msg.len = sizeof(icpkthdr);
    param->peer = conn->peer;
    param->peer_len = conn->peer_len;
}
/*
 * sendAckWithParam
 *      Send the acknowledgment described by 'param' to its peer.
 *
 * The message and destination were prepared earlier by setAckSendParam();
 * the packet goes out on UDP_listenerFd.
 */
static inline void
sendAckWithParam(AckSendParam *param)
{
    sendControlMessage(&param->msg, UDP_listenerFd, (struct sockaddr *) &param->peer, param->peer_len);
}
/*
 * sendAck
 *      Build an acknowledgment for a connection and send it right away.
 *
 * The packet is a copy of the connection's header with the given flags and
 * sequence numbers, sized as a bare header, and is sent to the connection's
 * peer on UDP_listenerFd.
 */
static void
sendAck(MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq)
{
    icpkthdr    ack;

    /* Start from the connection's own header, then patch the ack fields. */
    memcpy(&ack, (char *) &conn->conn_info, sizeof(ack));
    ack.len = sizeof(icpkthdr);
    ack.flags = flags;
    ack.seq = seq;
    ack.extraSeq = extraSeq;

#ifdef AMS_VERBOSE_LOGGING
    write_log("sendack: flags 0x%x node %d route %d seq %d extraSeq %d",
              ack.flags, ack.motNodeId, conn->route, ack.seq, ack.extraSeq);
#endif

    sendControlMessage(&ack, UDP_listenerFd, (struct sockaddr *) &conn->peer, conn->peer_len);
}
/*
 * sendDisorderAck
 *      Send a disorder message to the sender.
 *
 * Whenever the receiver detects a disorder packet, it will assemble a
 * disorder message which contains the sequence numbers of the possibly lost
 * packets.
 *
 * This function only writes the header of that message into the shared
 * rx_control_info.disorderBuffer: a copy of the connection header with
 * UDPIC_FLAGS_DISORDER added, the given sequence numbers, and a length that
 * covers the header plus lostPktCnt uint32 values.
 */
static void
sendDisorderAck(MotionConn *conn, uint32 seq, uint32 extraSeq, uint32 lostPktCnt)
{
    icpkthdr   *pkt = rx_control_info.disorderBuffer;

    memcpy(pkt, (char *) &conn->conn_info, sizeof(icpkthdr));
    pkt->len = lostPktCnt * sizeof(uint32) + sizeof(icpkthdr);
    pkt->flags |= UDPIC_FLAGS_DISORDER;
    pkt->seq = seq;
    pkt->extraSeq = extraSeq;

#ifdef AMS_VERBOSE_LOGGING
    if (!(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6))
    {
        write_log("UDP Interconnect bug (in sendDisorderAck): trying to send ack when we don't know where to send to %s", conn->remoteHostAndPort);
    }
#endif

    sendControlMessage(pkt, UDP_listenerFd, (struct sockaddr *) &conn->peer, conn->peer_len);
}
/*
 * sendStatusQueryMessage
 *      Used by senders to send a status query message for a connection to
 *      receivers.
 *
 * When receivers get such a message, they will respond with
 * the connection status (consumed seq, received seq ...).
 *
 * The query is a bare header copied from the connection, flagged
 * UDPIC_FLAGS_CAPACITY, carrying 'seq' and an extraSeq of 0, and is sent on
 * the given socket 'fd'.
 */
static void
sendStatusQueryMessage(MotionConn *conn, int fd, uint32 seq)
{
    icpkthdr    query;

    memcpy(&query, (char *) &conn->conn_info, sizeof(query));
    query.len = sizeof(query);
    query.flags = UDPIC_FLAGS_CAPACITY;
    query.extraSeq = 0;
    query.seq = seq;

#ifdef TRANSFER_PROTOCOL_STATS
    updateStats(TPE_ACK_PKT_QUERY, conn, &query);
#endif

    sendControlMessage(&query, fd, (struct sockaddr *) &conn->peer, conn->peer_len);
}
/*
 * putRxBufferAndSendAck
 *      Return a buffer and send an acknowledgment.
 *
 * Takes the packet at the head of the connection's receive queue, removes
 * it from the queue, returns it to the receive buffer free list and records
 * its sequence number in conn->conn_info.extraSeq.  An ack is produced only
 * for even sequence numbers, or always when the queue capacity is 1.
 *
 * param: when non-NULL, the ack is only prepared in *param (for the caller
 *        to send later); when NULL, the ack is sent immediately.
 *
 * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
 */
static void
putRxBufferAndSendAck(MotionConn *conn, AckSendParam *param)
{
icpkthdr *buf;
uint32 seq;
buf = (icpkthdr *) conn->pkt_q[conn->pkt_q_head];
if (buf == NULL)
{
/* Drop the lock before reporting FATAL. */
pthread_mutex_unlock(&ic_control_info.lock);
elog(FATAL, "putRxBufferAndSendAck: buffer is NULL");
}
/* Remember the sequence number before the buffer goes back to the free list. */
seq = buf->seq;
#ifdef AMS_VERBOSE_LOGGING
elog(LOG, "putRxBufferAndSendAck conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, seq, buf->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif
/* Remove the head entry and advance the head index with wrap-around. */
conn->pkt_q[conn->pkt_q_head] = NULL;
conn->pBuff = NULL;
conn->pkt_q_head = (conn->pkt_q_head + 1) % conn->pkt_q_capacity;
conn->pkt_q_size--;
#ifdef AMS_VERBOSE_LOGGING
elog(LOG, "putRxBufferAndSendAck conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, seq, buf->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif
putRxBufferToFreeList(&rx_buffer_pool, buf);
conn->conn_info.extraSeq = seq;
/* Send an Ack to the sender. */
if ((seq % 2 == 0) || (conn->pkt_q_capacity == 1))
{
if (param != NULL)
{
/* Deferred: the caller sends it, e.g. after releasing the lock. */
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, seq);
}
else
{
sendAck(conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, seq);
}
}
}
/*
 * MlPutRxBufferIFC
 *      Release the receive buffer currently held by a connection.
 *
 * The cdbmotion code has discarded our pointer to the motion-conn
 * structure, but has enough info to fully specify it: the connection is
 * looked up from the motion node id and the route.
 *
 * The buffer is returned under ic_control_info.lock; any resulting ack is
 * only prepared there and is sent after the lock has been released.
 * Reports FATAL when the connection holds no buffer.
 */
void
MlPutRxBufferIFC(ChunkTransportState *transportStates, int motNodeID, int route)
{
    ChunkTransportStateEntry *pEntry = NULL;
    MotionConn *conn = NULL;
    AckSendParam param;

    getChunkTransportState(transportStates, motNodeID, &pEntry);

    conn = pEntry->conns + route;

    /* A zero msg.len afterwards means that no ack was prepared. */
    memset(&param, 0, sizeof(AckSendParam));

    pthread_mutex_lock(&ic_control_info.lock);

    if (conn->pBuff != NULL)
    {
        putRxBufferAndSendAck(conn, &param);
    }
    else
    {
        pthread_mutex_unlock(&ic_control_info.lock);
        elog(FATAL, "Interconnect error: tried to release a NULL buffer");
    }

    pthread_mutex_unlock(&ic_control_info.lock);

    /*
     * real ack sending is after lock release to decrease the lock holding
     * time.
     */
    if (param.msg.len != 0)
        sendAckWithParam(&param);
}
/*
 * getRxBuffer
 *      Get a receive buffer.
 *
 * A buffer from the free list is preferred.  When the free list is empty a
 * new buffer of Gp_max_packet_size bytes is malloc'ed, unless the pool
 * count already exceeds maxCount.
 *
 * Returns NULL when the pool limit is exceeded or malloc() fails.
 *
 * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static icpkthdr *
getRxBuffer(RxBufferPool *p)
{
    icpkthdr   *fresh;

#ifdef USE_ASSERT_CHECKING
    if (FINC_HAS_FAULT(FINC_RX_BUF_NULL) &&
        testmode_inject_fault(gp_udpic_fault_inject_percent))
        return NULL;
#endif

    /* we have buffers available in our freelist */
    if (p->freeList != NULL)
        return getRxBufferFromFreeList(p);

    if (p->count > p->maxCount)
    {
        if (DEBUG3 >= log_min_messages)
            write_log("Interconnect ran out of rx-buffers count/max %d/%d", p->count, p->maxCount);
        return NULL;
    }

    /* malloc is used for thread safety. */
    fresh = (icpkthdr *) malloc(Gp_max_packet_size);

    /*
     * Note: we return NULL if the malloc() fails -- and the background
     * thread will set the error.  Main thread will check the error, report
     * it and start teardown.
     */
    if (fresh != NULL)
        p->count++;

    return fresh;
}
/*
 * putRxBufferToFreeList
 *      Push a receive buffer onto the front of the pool's free list.
 *
 * The first pointer-sized bytes of the buffer are reused to store the link
 * to the previous list head.
 *
 * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
 */
static inline void
putRxBufferToFreeList(RxBufferPool *p, icpkthdr *buf)
{
    char      **linkSlot = (char **) buf;

    *linkSlot = p->freeList;
    p->freeList = (char *) buf;
}
/*
 * getRxBufferFromFreeList
 *      Pop the first receive buffer off the pool's free list.
 *
 * The list must not be empty: the head is dereferenced to find the next
 * entry.
 *
 * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static inline icpkthdr *
getRxBufferFromFreeList(RxBufferPool *p)
{
    char       *head = p->freeList;

    /* The link to the next free buffer is stored at the start of the buffer. */
    p->freeList = *(char **) head;

    return (icpkthdr *) head;
}
/*
 * freeRxBuffer
 *      Release a receive buffer back to the system and take it off the
 *      pool's buffer count.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static inline void
freeRxBuffer(RxBufferPool *p, icpkthdr *buf)
{
    p->count--;
    free(buf);
}
/*
 * Set UDP IC send/receive socket buffer size.
 *
 * We must carefully size the UDP IC socket's send/receive buffers.  If the
 * size is too small, say 128K, and send queue depth and receive queue depth
 * are large, then there might be a lot of dropped/reordered packets.  We
 * start trying from a size of 2MB (unless Gp_udp_bufsize_k is specified),
 * and halve it after every failed setsockopt() until it would drop below
 * UDPIC_MIN_BUF_SIZE.  For a given size setting to be successful, the
 * corresponding UDP kernel buffer size params must be adequate.
 *
 * buffer_type: SO_SNDBUF or SO_RCVBUF.
 *
 * Returns the size that was accepted, or -1 (converted to uint32) when no
 * size down to UDPIC_MIN_BUF_SIZE was accepted.
 */
static uint32
setUDPSocketBufferSize(int ic_socket, int buffer_type)
{
    int         curr_size;
    ACCEPT_TYPE_ARG3 option_len = 0;

    Assert(buffer_type == SO_SNDBUF || buffer_type == SO_RCVBUF);

    if (Gp_udp_bufsize_k)
        curr_size = Gp_udp_bufsize_k * 1024;
    else
        curr_size = 2048 * 1024;
    option_len = sizeof(curr_size);

    for (;;)
    {
        if (setsockopt(ic_socket, SOL_SOCKET, buffer_type, (const char *) &curr_size, option_len) >= 0)
            break;

        ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
                  (errmsg("UDP-IC: setsockopt %s failed to set buffer size = %d bytes: %m",
                          buffer_type == SO_SNDBUF ? "send": "receive",
                          curr_size)));

        /* Back off to half the size; give up below the minimum. */
        curr_size >>= 1;
        if (curr_size < UDPIC_MIN_BUF_SIZE)
            return -1;
    }

    ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3,
              (errmsg("UDP-IC: socket %s current buffer size = %d bytes",
                      buffer_type == SO_SNDBUF ? "send": "receive",
                      curr_size)));

    return curr_size;
}
#if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING)
/*
 * icBufferListLog
 *      Write the list header and every buffer on the list to the log.
 *
 * The walk stops at the head link or after list->length entries, whichever
 * comes first.
 */
static void
icBufferListLog(ICBufferList *list)
{
    ICBufferLink *cursor;
    int         remaining;
    int         position;

    write_log("Length %d, type %d headptr %p", list->length, list->type, &list->head);

    cursor = list->head.next;
    remaining = list->length;
    for (position = 0; cursor != &list->head && remaining > 0; position++, remaining--)
    {
        ICBuffer   *buf;

        /* The list type tells which embedded link the cursor points at. */
        if (list->type == ICBufferListType_Primary)
            buf = GET_ICBUFFER_FROM_PRIMARY(cursor);
        else
            buf = GET_ICBUFFER_FROM_SECONDARY(cursor);

        write_log("Node %d, linkptr %p", position, cursor);
        logPkt("from list", buf->pkt);
        cursor = cursor->next;
    }
}
#endif
#ifdef USE_ASSERT_CHECKING
/*
 * icBufferListCheck
 *      Buffer list sanity check (assert-enabled builds only).
 *
 * prefix: text identifying the caller, included in the log messages.
 * list:   the list to verify.
 *
 * Verifies that the list pointer is not NULL, that the length is not
 * negative, that an empty list's head links are consistent, and that
 * following 'length' next-links from the first entry ends up back at the
 * head link.  On any violation the problem is logged, the process sleeps
 * for 120 seconds and then calls abort().
 */
static void
icBufferListCheck(char *prefix, ICBufferList *list)
{
if (list == NULL)
{
write_log("ICBufferList ERROR %s: NULL list", prefix);
goto error;
}
if (list->length < 0)
{
write_log("ICBufferList ERROR %s: list length %d < 0 ", prefix, list->length);
goto error;
}
/* With length 0, flag head links that differ from each other and prev is not the head itself. */
if (list->length == 0 && (list->head.prev != list->head.next && list->head.prev != &list->head))
{
write_log("ICBufferList ERROR %s: length is 0, &list->head %p, prev %p, next %p", prefix, &list->head, list->head.prev, list->head.next);
icBufferListLog(list);
goto error;
}
/* Step 'length' links forward from the first entry ... */
int len = list->length;
ICBufferLink *link = list->head.next;
while (len > 0)
{
link = link->next;
len--;
}
/* ... which must land exactly on the head link again. */
if (link != &list->head)
{
write_log("ICBufferList ERROR: %s len %d", prefix, list->length);
icBufferListLog(list);
goto error;
}
return;
error:
write_log("wait for 120s and then abort.");
/* pg_usleep() is given 120000000, matching the 120s in the message above. */
pg_usleep(120000000);
abort();
}
#endif
/*
 * icBufferListInitHeadLink
 *      Make a link refer to itself in both directions.
 */
static inline void
icBufferListInitHeadLink(ICBufferLink *link)
{
    link->prev = link;
    link->next = link;
}
/*
 * icBufferListInit
 *      Turn 'list' into an empty list of the given type: length 0 and a
 *      head link that points to itself.
 */
static inline void
icBufferListInit(ICBufferList *list, ICBufferListType type)
{
    icBufferListInitHeadLink(&list->head);
    list->length = 0;
    list->type = type;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListInit", list);
#endif
}
/*
 * icBufferListIsHead
 *      Tell whether 'link' is the head link of 'list'.
 *
 * This function is often used as the end condition of an iteration of the
 * list.
 */
static inline bool
icBufferListIsHead(ICBufferList *list, ICBufferLink *link)
{
    ICBufferLink *headLink;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListIsHead", list);
#endif

    headLink = &list->head;
    return (headLink == link);
}
/*
 * icBufferListFirst
 *      Return the link that follows the head link.
 *
 * Note that the head link is a pseudo link used only to ease the operations
 * of the link list.  If the list only contains the head link, this function
 * will return the head link.
 */
static inline ICBufferLink *
icBufferListFirst(ICBufferList *list)
{
    ICBufferLink *first;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListFirst", list);
#endif

    first = list->head.next;
    return first;
}
/*
 * icBufferListLength
 *      Return the number of buffers recorded in the list's length field.
 */
static inline int
icBufferListLength(ICBufferList *list)
{
    int         count;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListLength", list);
#endif

    count = list->length;
    return count;
}
/*
 * icBufferListDelete
 *      Unlink a buffer from the buffer list and return the buffer.
 *
 * The list type selects which of the buffer's two embedded links (primary
 * or secondary) is unlinked.  The buffer's own link pointers are left as
 * they were.
 */
static inline ICBuffer *
icBufferListDelete(ICBufferList *list, ICBuffer *buf)
{
    ICBufferLink *victim;
    ICBufferLink *before;
    ICBufferLink *after;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListDelete", list);
#endif

    if (list->type == ICBufferListType_Primary)
        victim = &buf->primary;
    else
        victim = &buf->secondary;

    /* Join the two neighbours around the removed link. */
    before = victim->prev;
    after = victim->next;
    before->next = after;
    after->prev = before;

    list->length--;

    return buf;
}
/*
 * icBufferListPop
 *      Remove the first buffer from the list and return it.
 *
 * Returns NULL when the list length is 0.
 */
static inline ICBuffer *
icBufferListPop(ICBufferList *list)
{
    ICBufferLink *first;
    ICBufferLink *before;
    ICBufferLink *after;
    ICBuffer   *popped;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListPop", list);
#endif

    if (list->length == 0)
        return NULL;

    first = icBufferListFirst(list);

    /* Map the embedded link back to the buffer that contains it. */
    if (list->type == ICBufferListType_Primary)
        popped = GET_ICBUFFER_FROM_PRIMARY(first);
    else
        popped = GET_ICBUFFER_FROM_SECONDARY(first);

    before = first->prev;
    after = first->next;
    before->next = after;
    after->prev = before;

    list->length--;

    return popped;
}
/*
 * icBufferListFree
 *      Pop every buffer off the list and pfree() it.
 */
static void
icBufferListFree(ICBufferList *list)
{
    ICBuffer   *victim;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListFree", list);
#endif

    for (;;)
    {
        victim = icBufferListPop(list);
        if (victim == NULL)
            break;
        pfree(victim);
    }
}
/*
 * icBufferListAppend
 *      Link a buffer in at the tail of a list and return the buffer.
 *
 * The list type selects which of the buffer's two embedded links (primary
 * or secondary) is used.
 */
static inline ICBuffer *
icBufferListAppend(ICBufferList *list, ICBuffer *buf)
{
    ICBufferLink *fresh;
    ICBufferLink *tail;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListAppend", list);
#endif

    if (list->type == ICBufferListType_Primary)
        fresh = &buf->primary;
    else
        fresh = &buf->secondary;

    /* The current tail is whatever the head's prev pointer refers to. */
    tail = list->head.prev;

    fresh->prev = tail;
    fresh->next = &list->head;
    tail->next = fresh;
    list->head.prev = fresh;

    list->length++;

    return buf;
}
/*
 * icBufferListReturn
 *      Move every buffer of the list to the send pool's free buffer list.
 *
 * inExpirationQueue: when true, each buffer is also removed from its slot
 * of the unack queue ring and the outstanding counters are decremented.
 * The shared outstanding counter is decremented only while at least one
 * buffer still remains on 'list' after the pop.
 */
static void
icBufferListReturn(ICBufferList *list, bool inExpirationQueue)
{
    ICBuffer   *current;

#ifdef USE_ASSERT_CHECKING
    icBufferListCheck("icBufferListReturn", list);
#endif

    for (current = icBufferListPop(list);
         current != NULL;
         current = icBufferListPop(list))
    {
        if (inExpirationQueue)  /* the buf is in also in the expiration queue */
        {
            ICBufferList *slot = &unack_queue_ring.slots[current->unackQueueRingSlot];

            icBufferListDelete(slot, current);
            unack_queue_ring.numOutStanding--;
            if (icBufferListLength(list) >= 1)
                unack_queue_ring.numSharedOutStanding--;
        }

        icBufferListAppend(&snd_buffer_pool.freeList, current);
    }
}
/*
 * initUnackQueueRing
 *      Initialize an unack queue ring.
 *
 * Sets the current time and the current slot index (time pointer) to 0,
 * clears both outstanding counters, and initializes every slot as an empty
 * list of the secondary type.
 */
static void
initUnackQueueRing(UnackQueueRing *uqr)
{
    int         slot;

    uqr->currentTime = 0;
    uqr->idx = 0;
    uqr->numOutStanding = 0;
    uqr->numSharedOutStanding = 0;

    slot = 0;
    while (slot < UNACK_QUEUE_RING_SLOTS_NUM)
    {
        icBufferListInit(&uqr->slots[slot], ICBufferListType_Secondary);
        slot++;
    }
}
/*
 * computeExpirationPeriod
 *      Compute expiration period according to the connection information.
 *
 * Considerations on expiration period computation:
 *
 * RTT is dynamically computed, and expiration period is based on RTT
 * values.  We cannot simply use RTT as the expiration value, since real
 * workload does not always have a stable RTT.  A small constant value is
 * multiplied to the RTT value to make the resending logic insensitive to
 * the frequent small changes of RTT.
 *
 * The value is (rtt + 4 * dev) shifted left by the retry count (capped at
 * 12), clamped to [MIN_EXPIRATION_PERIOD, MAX_EXPIRATION_PERIOD].
 */
static inline uint64
computeExpirationPeriod(MotionConn *conn, uint32 retry)
{
    uint32      factor;

    /*
     * In fault injection mode, we often use DEFAULT_RTT, because the
     * intentional large percent of packet/ack losses will make the RTT too
     * large.  This will lead to a slow retransmit speed.  In real hardware
     * environment/workload, we do not expect such a packet loss pattern.
     */
#ifdef USE_ASSERT_CHECKING
    if (udp_testmode)
        return DEFAULT_RTT;
#endif

    /* Cap the back-off exponent at 12. */
    factor = retry;
    if (factor > 12)
        factor = 12;

    return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, (conn->rtt + (conn->dev << 2)) << (factor)));
}
/*
 * initSndBufferPool
 *      Initialize the send buffer pool.
 *
 * The initial maxCount is set to 1 for gp_interconnect_snd_queue_depth = 1
 * case, then there is at least an extra free buffer to send for that case.
 * For any other queue depth it starts at 0.
 */
static void
initSndBufferPool(SendBufferPool *p)
{
    icBufferListInit(&p->freeList, ICBufferListType_Primary);
    p->count = 0;

    if (Gp_interconnect_snd_queue_depth == 1)
        p->maxCount = 1;
    else
        p->maxCount = 0;
}
/*
 * cleanSndBufferPool
 *      Free every buffer on the pool's free list and zero its counters.
 */
static inline void
cleanSndBufferPool(SendBufferPool *p)
{
    icBufferListFree(&p->freeList);

    p->maxCount = 0;
    p->count = 0;
}
/*
 * getSndBuffer
 *      Get a send buffer for a connection.
 *
 * Different flow control mechanisms use different buffer management
 * policies.  Capacity based flow control uses per-connection buffer policy
 * and Loss based flow control uses shared buffer policy.
 *
 * A buffer from the pool's free list is preferred; otherwise a new one is
 * allocated in InterconnectContext as long as the pool count is below
 * maxCount.
 *
 * Return NULL when no free buffer available.
 */
static ICBuffer *
getSndBuffer(MotionConn *conn)
{
    ICBuffer   *fresh;
    MemoryContext savedContext;

    ic_statistics.totalBuffers += (icBufferListLength(&snd_buffer_pool.freeList) + snd_buffer_pool.maxCount - snd_buffer_pool.count);
    ic_statistics.bufferCountingTime++;

    /* Capacity based flow control does not use shared buffers */
    if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
    {
        Assert(icBufferListLength(&conn->unackQueue) + icBufferListLength(&conn->sndQueue) <= Gp_interconnect_snd_queue_depth);
        if (icBufferListLength(&conn->unackQueue) + icBufferListLength(&conn->sndQueue) >= Gp_interconnect_snd_queue_depth)
            return NULL;
    }

    /* Reuse a previously returned buffer when one is available. */
    if (icBufferListLength(&snd_buffer_pool.freeList) > 0)
        return icBufferListPop(&snd_buffer_pool.freeList);

    /* The pool is not allowed to grow any further. */
    if (snd_buffer_pool.count >= snd_buffer_pool.maxCount)
        return NULL;

    savedContext = MemoryContextSwitchTo(InterconnectContext);

    fresh = (ICBuffer *) palloc0(Gp_max_packet_size + sizeof(ICBuffer));
    snd_buffer_pool.count++;
    fresh->conn = NULL;
    fresh->nRetry = 0;
    icBufferListInitHeadLink(&fresh->primary);
    icBufferListInitHeadLink(&fresh->secondary);
    fresh->unackQueueRingSlot = 0;

    MemoryContextSwitchTo(savedContext);

    return fresh;
}
/*
 * startOutgoingUDPConnections
 *		Create the sending-side transport state for sendSlice and initialize
 *		one MotionConn per process of its parent (receiving) slice.
 *
 * This should not be called for root slices (i.e. QD ones) since they don't
 * ever have outgoing connections.
 *
 * PARAMETERS
 *
 *	 transportStates - interconnect context the new entry is created in.
 *	 sendSlice		 - slice that this process is a member of.
 *	 pOutgoingCount	 - output: number of connections that were initialized.
 *
 * RETURNS
 *	 The ChunkTransportStateEntry created for the sending motion node.
 *
 * NOTE: the send buffer pool must have been initialized before this is
 * called, because every initialized connection raises the pool's maxCount
 * and immediately draws its first buffer from it.
 */
static ChunkTransportStateEntry *
startOutgoingUDPConnections(ChunkTransportState *transportStates,
							ExecSlice *sendSlice,
							int *pOutgoingCount)
{
	ChunkTransportStateEntry *entry;
	ExecSlice  *parentSlice;
	ListCell   *lc;
	int			slot = 0;		/* index into entry->conns */
	int			nextRoute = 0;	/* route number for the next live conn */

	*pOutgoingCount = 0;

	parentSlice = &transportStates->sliceTable->slices[sendSlice->parentIndex];

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "Interconnect seg%d slice%d setting up sending motion node",
			 GpIdentity.segindex, sendSlice->sliceIndex);

	entry = createChunkTransportState(transportStates,
									  sendSlice,
									  parentSlice,
									  list_length(parentSlice->primaryProcesses));

	Assert(entry && entry->valid);

	/*
	 * Walk the receiving processes; each list position owns the conn slot at
	 * the same position.  Positions without a CdbProcess entry (NB: some
	 * mirrors could be down) keep their slot but are left uninitialized.
	 */
	foreach(lc, parentSlice->primaryProcesses)
	{
		MotionConn *conn = &entry->conns[slot++];
		CdbProcess *proc = (CdbProcess *) lfirst(lc);

		if (proc == NULL)
			continue;

		conn->cdbProc = proc;
		icBufferListInit(&conn->sndQueue, ICBufferListType_Primary);
		icBufferListInit(&conn->unackQueue, ICBufferListType_Primary);
		conn->capacity = Gp_interconnect_queue_depth;

		/* send buffer pool must be initialized before this. */
		snd_buffer_pool.maxCount += Gp_interconnect_snd_queue_depth;
		snd_control_info.cwnd += 1;

		/* should have at least one buffer for each connection */
		conn->curBuff = getSndBuffer(conn);
		Assert(conn->curBuff != NULL);

		conn->rtt = DEFAULT_RTT;
		conn->dev = DEFAULT_DEV;
		conn->deadlockCheckBeginTime = 0;
		conn->tupleCount = 0;
		conn->msgSize = sizeof(conn->conn_info);
		conn->sentSeq = 0;
		conn->receivedAckSeq = 0;
		conn->consumedSeq = 0;
		conn->pBuff = (uint8 *) conn->curBuff->pkt;
		conn->state = mcsSetupOutgoingConnection;
		conn->route = nextRoute++;

		(*pOutgoingCount)++;
	}

	/* all outgoing connections share the process-wide sender socket */
	entry->txfd = ICSenderSocket;
	entry->txport = ICSenderPort;
	entry->txfd_family = ICSenderFamily;

	return entry;
}
/*
 * getSockAddr
 *		Translate a numeric IP address string and a port number into a
 *		socket address.
 *
 * peer / peer_len - output: resolved address (the storage is zeroed first)
 *					 and its length
 * listenerAddr	   - numeric host address; no name resolution is performed
 * listenerPort	   - port number
 *
 * Raises ERROR (ERRCODE_GP_INTERCONNECTION_ERROR) if the address cannot be
 * parsed.
 */
static void
getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort)
{
	struct addrinfo hint;
	struct addrinfo *result = NULL;
	char		portText[32];
	int			rc;

	/* Build the lookup hint: UDP, any address family, numeric input only */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_DGRAM;
	hint.ai_family = AF_UNSPEC; /* v4, v6, even unix in the future */
	hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */
#ifdef AI_NUMERICSERV
	hint.ai_flags |= AI_NUMERICSERV;
#endif

	snprintf(portText, sizeof(portText), "%d", listenerPort);

	rc = pg_getaddrinfo_all(listenerAddr, portText, &hint, &result);
	if (rc != 0 || result == NULL)
	{
		if (result != NULL)
			pg_freeaddrinfo_all(hint.ai_family, result);

		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error: Could not parse remote listener address: '%s' port '%d': %s",
						listenerAddr, listenerPort, gai_strerror(rc)),
				 errdetail("getaddrinfo() unable to parse address: '%s'",
						   listenerAddr)));
	}

	/*
	 * Name resolution is disabled, so getaddrinfo hands back a single entry;
	 * only the first one is used.
	 */
	elog(DEBUG1, "GetSockAddr socket ai_family %d ai_socktype %d ai_protocol %d for %s ", result->ai_family, result->ai_socktype, result->ai_protocol, listenerAddr);

	memset(peer, 0, sizeof(struct sockaddr_storage));
	memcpy(peer, result->ai_addr, result->ai_addrlen);
	*peer_len = result->ai_addrlen;

	pg_freeaddrinfo_all(result->ai_family, result);
}
/*
 * setupOutgoingUDPConnection
 *		Finish the setup of one outgoing (sending) connection.
 *
 * Resolves the peer's listener address, adapts it to the address family of
 * the sender socket if needed, fills in the connection header
 * (conn->conn_info) and registers the connection in the connection hash
 * table.
 *
 * transportStates - interconnect context; only its slice table is consulted
 * pEntry		   - transport state entry that owns conn
 * conn			   - connection in state mcsSetupOutgoingConnection with a
 *					 valid cdbProc
 *
 * Raises ERROR if the peer address cannot be parsed, or if the sender socket
 * is IPv4-only while the peer address is IPv6.
 */
void
setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	CdbProcess *peerProc = conn->cdbProc;
	SliceTable *slices = transportStates->sliceTable;

	Assert(conn->state == mcsSetupOutgoingConnection);
	Assert(conn->cdbProc);

	conn->wakeup_ms = 0;
	conn->remoteContentId = peerProc->contentid;
	conn->stat_min_ack_time = ~((uint64) 0);

	/*
	 * Save the information for the error message if getaddrinfo fails.  An
	 * address containing ':' is wrapped in brackets.
	 */
	if (strchr(peerProc->listenerAddr, ':') == 0)
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
				 "%s:%d", peerProc->listenerAddr, peerProc->listenerPort);
	else
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
				 "[%s]:%d", peerProc->listenerAddr, peerProc->listenerPort);

	/* Resolve the socket address to send to */
	getSockAddr(&conn->peer, &conn->peer_len, peerProc->listenerAddr, peerProc->listenerPort);

	/* Replace the saved text with the formatted destination address */
	format_sockaddr(&conn->peer, conn->remoteHostAndPort,
					sizeof(conn->remoteHostAndPort));

	Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6);

#ifdef USE_ASSERT_CHECKING
	{
		/* cross-check the recorded socket family against the real socket */
		struct sockaddr_storage localAddr;
		socklen_t	localAddrLen;

		MemSet(&localAddr, 0, sizeof(localAddr));
		localAddrLen = sizeof(localAddr);

		if (getsockname(pEntry->txfd, (struct sockaddr *) &localAddr, &localAddrLen) == -1)
		{
			ereport(ERROR,
					(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					 errmsg("interconnect Error: Could not get port from socket"),
					 errdetail("%m")));
		}
		Assert(pEntry->txfd_family == localAddr.ss_family);
	}
#endif

	/*
	 * If the socket was created with a different address family than the
	 * place we are sending to, we might need to do something special.
	 */
	if (pEntry->txfd_family != conn->peer.ss_family)
	{
		if (pEntry->txfd_family != AF_INET6)
		{
			/*
			 * The socket is IPv4-only (AF_INET) but the destination is IPv6.
			 * A V4-mapped destination could in principle be converted back
			 * to IPv4, but there is currently no code path where that could
			 * happen, so treat it as an error.
			 */
			elog(ERROR, "Trying to use an IPv4 (AF_INET) socket to send to an IPv6 address");
		}
		else
		{
			/*
			 * AF_INET6 socket, IPv4 (AF_INET) destination.  On Linux glibc
			 * handles this automatically, but on MAC OSX and Solaris the
			 * address must be rewritten as a V4-MAPPED address in AF_INET6
			 * format.
			 */
			struct sockaddr_storage mappedStorage;
			struct sockaddr_in6 *mapped = (struct sockaddr_in6 *) &mappedStorage;
			const struct sockaddr_in *v4 = (const struct sockaddr_in *) &conn->peer;
			unsigned char *addrBytes;

			memset(&mappedStorage, 0, sizeof(mappedStorage));

			elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address.");

			mappedStorage.ss_family = AF_INET6;
			mapped->sin6_family = AF_INET6;
			mapped->sin6_port = v4->sin_port;
			mapped->sin6_flowinfo = 0;

			/* ::ffff:a.b.c.d -- 80 zero bits, 16 one bits, then the v4 address */
			addrBytes = (unsigned char *) &mapped->sin6_addr;
			memset(addrBytes, '\0', sizeof(mapped->sin6_addr));
			addrBytes[10] = 0xff;
			addrBytes[11] = 0xff;
			memcpy(addrBytes + 12, &(v4->sin_addr), 4);

			mapped->sin6_scope_id = 0;

			/* copy it back */
			memcpy(&conn->peer, &mappedStorage, sizeof(struct sockaddr_in6));
			conn->peer_len = sizeof(struct sockaddr_in6);
		}
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		ereport(DEBUG1, (errmsg("Interconnect connecting to seg%d slice%d %s "
								"pid=%d sockfd=%d",
								conn->remoteContentId,
								pEntry->recvSlice->sliceIndex,
								conn->remoteHostAndPort,
								conn->cdbProc->pid,
								conn->sockfd)));

	/* Build the connection header used for the connection request */
	MemSet(&conn->conn_info, 0, sizeof(conn->conn_info));
	conn->conn_info.len = 0;
	conn->conn_info.flags = 0;
	conn->conn_info.motNodeId = pEntry->motNodeId;

	conn->conn_info.recvSliceIndex = pEntry->recvSlice->sliceIndex;
	conn->conn_info.sendSliceIndex = pEntry->sendSlice->sliceIndex;
	conn->conn_info.srcContentId = GpIdentity.segindex;
	conn->conn_info.dstContentId = conn->cdbProc->contentid;

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "setupOutgoingUDPConnection: node %d route %d srccontent %d dstcontent %d: %s",
			 pEntry->motNodeId, conn->route, GpIdentity.segindex, conn->cdbProc->contentid, conn->remoteHostAndPort);

	/* the upper 16 bits of Gp_listener_port are used as the source port */
	conn->conn_info.srcListenerPort = (Gp_listener_port >> 16) & 0x0ffff;
	conn->conn_info.srcPid = MyProcPid;
	conn->conn_info.dstPid = conn->cdbProc->pid;
	conn->conn_info.dstListenerPort = conn->cdbProc->listenerPort;

	conn->conn_info.sessionId = gp_session_id;
	conn->conn_info.icId = slices->ic_instance_id;

	connAddHash(&ic_control_info.connHtab, conn);

	/*
	 * No need to get the connection lock here, since background rx thread
	 * will never access send connections.
	 */
	conn->msgPos = NULL;
	conn->msgSize = sizeof(conn->conn_info);
	conn->stillActive = true;
	conn->conn_info.seq = 1;

	Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6);
}								/* setupOutgoingUDPConnection */
/*
 * handleCachedPackets
 *		Deliver the packets held in the startup cache to the connections
 *		that have been set up in the meantime, then discard the cache.
 *
 * Every bin of ic_control_info.startupCacheHtab is visited.  For each cached
 * connection, the first pkt_q_size slots of its packet queue are examined:
 *
 *	 - a packet whose header matches no connection in connHtab is returned
 *	   to the rx buffer free list;
 *	 - otherwise the packet is passed to handleDataPacket(); if that returns
 *	   false the buffer is returned to the free list as well.  Any ack
 *	   prepared by handleDataPacket() is sent.
 *
 * rx_buffer_pool.maxCount is decremented once per cached packet
 * (presumably undoing an increment made when the packet was cached -- that
 * code is outside this function).
 *
 * Finally the cached connection is removed from the startup cache hash
 * table and released with free().
 */
static void
handleCachedPackets(void)
{
	MotionConn *cachedConn = NULL;
	MotionConn *setupConn = NULL;
	ConnHtabBin *bin = NULL;
	icpkthdr   *pkt = NULL;
	AckSendParam param;
	int			i = 0;
	int			j = 0;
	bool		dummy;

	for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
	{
		bin = ic_control_info.startupCacheHtab.table[i];

		while (bin)
		{
			cachedConn = bin->conn;
			setupConn = NULL;

			for (j = 0; j < cachedConn->pkt_q_size; j++)
			{
				pkt = (icpkthdr *) cachedConn->pkt_q[j];

				if (pkt == NULL)
					continue;

				rx_buffer_pool.maxCount--;

				/* look up this pkt's connection in connHtab */
				setupConn = findConnByHeader(&ic_control_info.connHtab, pkt);
				if (setupConn == NULL)
				{
					/* mismatch! */
					putRxBufferToFreeList(&rx_buffer_pool, pkt);
					cachedConn->pkt_q[j] = NULL;
					continue;
				}

				memset(&param, 0, sizeof(param));
				if (!handleDataPacket(setupConn, pkt, &cachedConn->peer, &cachedConn->peer_len, &param, &dummy))
				{
					/* no need to cache this packet */
					putRxBufferToFreeList(&rx_buffer_pool, pkt);
				}

				ic_statistics.recvPktNum++;

				/* a non-zero length means handleDataPacket prepared an ack */
				if (param.msg.len != 0)
					sendAckWithParam(&param);

				cachedConn->pkt_q[j] = NULL;
			}

			/* advance before connDelHash() removes the current bin */
			bin = bin->next;
			connDelHash(&ic_control_info.startupCacheHtab, cachedConn);

			/*
			 * MPP-19981 free the cached connections; otherwise memory leak
			 * would be introduced.
			 */
			free(cachedConn->pkt_q);
			free(cachedConn);
		}
	}
}
/*
 * SetupUDPIFCInterconnect_Internal
 * Internal function for setting up UDP interconnect.
 *
 * Builds and returns a new ChunkTransportState for the local slice of the
 * given slice table:
 * - a receiving transport state entry is created for every child slice, and
 * every connection with a CdbProcess gets its packet queue allocated and is
 * registered in ic_control_info.connHtab;
 * - if the local slice has a parent, the send buffer pool and the unack queue
 * ring are initialized and the outgoing connections are set up;
 * - if gp_interconnect_cache_future_packets is set, cached packets are
 * processed via handleCachedPackets().
 *
 * ic_control_info.lock is acquired on entry and released just before the
 * normal return. If an error is thrown in between, the lock is still held
 * when control leaves this function; the caller's PG_CATCH block
 * (SetupUDPIFCInterconnect) releases it.
 */
static ChunkTransportState *
SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
{
int i,
n;
ListCell *cell;
ExecSlice *mySlice;
ExecSlice *aSlice;
MotionConn *conn = NULL;
int incoming_count = 0;
int outgoing_count = 0;
int expectedTotalIncoming = 0;
int expectedTotalOutgoing = 0;
ChunkTransportStateEntry *sendingChunkTransportState = NULL;
ChunkTransportState *interconnect_context;
/* held until the end of this function; see the header comment */
pthread_mutex_lock(&ic_control_info.lock);
Assert(sliceTable->ic_instance_id > 0);
if (Gp_role == GP_ROLE_DISPATCH)
{
Assert(gp_interconnect_id == sliceTable->ic_instance_id);
/*
* QD use cursorHistoryTable to handle mismatch packets, no
* need to update ic_control_info.ic_instance_id
*/
}
else
{
/*
* update ic_control_info.ic_instance_id, it is mainly used
* by rx thread to handle mismatch packets
*/
ic_control_info.ic_instance_id = sliceTable->ic_instance_id;
}
interconnect_context = palloc0(sizeof(ChunkTransportState));
/* initialize state variables */
Assert(interconnect_context->size == 0);
interconnect_context->size = CTS_INITIAL_SIZE;
interconnect_context->states = palloc0(CTS_INITIAL_SIZE * sizeof(ChunkTransportStateEntry));
interconnect_context->networkTimeoutIsLogged = false;
interconnect_context->teardownActive = false;
interconnect_context->activated = false;
interconnect_context->incompleteConns = NIL;
/* the context works on its own copy of the slice table */
interconnect_context->sliceTable = copyObject(sliceTable);
interconnect_context->sliceId = sliceTable->localSlice;
/* install the UDPIFC implementations of the transport callbacks */
interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromUDPIFC;
interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyUDPIFC;
interconnect_context->SendEos = SendEosUDPIFC;
interconnect_context->SendChunk = SendChunkUDPIFC;
interconnect_context->doSendStopMessage = doSendStopMessageUDPIFC;
mySlice = &interconnect_context->sliceTable->slices[sliceTable->localSlice];
Assert(mySlice &&
mySlice->sliceIndex == sliceTable->localSlice);
#ifdef USE_ASSERT_CHECKING
set_test_mode();
#endif
if (Gp_role == GP_ROLE_DISPATCH)
{
/*
* Prune the history table if it is too large
*
* We only keep history of constant length so that
* - The history table takes only constant amount of memory.
* - It is long enough so that it is almost impossible to receive
* packets from an IC instance that is older than the first one
* in the history.
*/
if (rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE))
{
uint32 prune_id = sliceTable->ic_instance_id - CURSOR_IC_TABLE_SIZE;
/*
* Only prune if we didn't underflow -- also we want the prune id
* to be newer than the limit (hysteresis)
*/
if (prune_id < sliceTable->ic_instance_id)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, sliceTable->ic_instance_id);
pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, prune_id);
}
}
/* record this interconnect instance together with the current command count */
addCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id, gp_command_count);
}
/* now we'll do some setup for each of our Receiving Motion Nodes. */
foreach(cell, mySlice->children)
{
int numProcs;
int childId = lfirst_int(cell);
ChunkTransportStateEntry *pEntry = NULL;
aSlice = &interconnect_context->sliceTable->slices[childId];
numProcs = list_length(aSlice->primaryProcesses);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Setup recving connections: my slice %d, childId %d",
mySlice->sliceIndex, childId);
pEntry = createChunkTransportState(interconnect_context, aSlice, mySlice, numProcs);
Assert(pEntry);
Assert(pEntry->valid);
for (i = 0; i < pEntry->numConns; i++)
{
conn = &pEntry->conns[i];
conn->cdbProc = list_nth(aSlice->primaryProcesses, i);
/* connections without a CdbProcess entry are left uninitialized */
if (conn->cdbProc)
{
expectedTotalIncoming++;
/* rx_buffer_queue */
conn->pkt_q_capacity = Gp_interconnect_queue_depth;
conn->pkt_q_size = 0;
conn->pkt_q_head = 0;
conn->pkt_q_tail = 0;
SIMPLE_FAULT_INJECTOR("interconnect_setup_palloc");
conn->pkt_q = (uint8 **) palloc0(conn->pkt_q_capacity * sizeof(uint8 *));
/* update the max buffer count of our rx buffer pool. */
rx_buffer_pool.maxCount += conn->pkt_q_capacity;
/*
* connection header info (defining characteristics of this
* connection)
*/
MemSet(&conn->conn_info, 0, sizeof(conn->conn_info));
conn->route = i;
conn->conn_info.seq = 1;
conn->stillActive = true;
conn->remapper = CreateTupleRemapper();
incoming_count++;
conn->conn_info.motNodeId = pEntry->motNodeId;
conn->conn_info.recvSliceIndex = mySlice->sliceIndex;
conn->conn_info.sendSliceIndex = aSlice->sliceIndex;
conn->conn_info.srcContentId = conn->cdbProc->contentid;
conn->conn_info.dstContentId = GpIdentity.segindex;
conn->conn_info.srcListenerPort = conn->cdbProc->listenerPort;
conn->conn_info.srcPid = conn->cdbProc->pid;
conn->conn_info.dstPid = MyProcPid;
/* the upper 16 bits of Gp_listener_port are used as our listener port here */
conn->conn_info.dstListenerPort = (Gp_listener_port >> 16) & 0x0ffff;
conn->conn_info.sessionId = gp_session_id;
conn->conn_info.icId = sliceTable->ic_instance_id;
conn->conn_info.flags = UDPIC_FLAGS_RECEIVER_TO_SENDER;
connAddHash(&ic_control_info.connHtab, conn);
}
}
}
/* reset the send-side congestion control state before any sender setup */
snd_control_info.cwnd = 0;
snd_control_info.minCwnd = 0;
snd_control_info.ssthresh = 0;
/* Initiate outgoing connections. */
if (mySlice->parentIndex != -1)
{
initSndBufferPool(&snd_buffer_pool);
initUnackQueueRing(&unack_queue_ring);
ic_control_info.isSender = true;
ic_control_info.lastExpirationCheckTime = getCurrentTime();
ic_control_info.lastPacketSendTime = ic_control_info.lastExpirationCheckTime;
ic_control_info.lastDeadlockCheckTime = ic_control_info.lastExpirationCheckTime;
sendingChunkTransportState = startOutgoingUDPConnections(interconnect_context, mySlice, &expectedTotalOutgoing);
n = sendingChunkTransportState->numConns;
for (i = 0; i < n; i++)
{ /* loop to set up outgoing connections */
conn = &sendingChunkTransportState->conns[i];
if (conn->cdbProc)
{
setupOutgoingUDPConnection(interconnect_context, sendingChunkTransportState, conn);
outgoing_count++;
}
}
/*
* startOutgoingUDPConnections() raised cwnd by one and the pool's maxCount
* by the send queue depth per connection; use those totals as the minimum
* window and the initial slow-start threshold.
*/
snd_control_info.minCwnd = snd_control_info.cwnd;
snd_control_info.ssthresh = snd_buffer_pool.maxCount;
#ifdef TRANSFER_PROTOCOL_STATS
initTransProtoStats();
#endif
}
else
{
ic_control_info.isSender = false;
ic_control_info.lastExpirationCheckTime = 0;
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG1, (errmsg("SetupUDPInterconnect will activate "
"%d incoming, %d outgoing routes for ic_instancce_id %d. "
"Listening on ports=%d/%d sockfd=%d.",
expectedTotalIncoming, expectedTotalOutgoing, sliceTable->ic_instance_id,
Gp_listener_port & 0x0ffff, (Gp_listener_port >> 16) & 0x0ffff, UDP_listenerFd)));
/*
* If there are packets cached by background thread, add them to the
* connections.
*/
if (gp_interconnect_cache_future_packets)
handleCachedPackets();
interconnect_context->activated = true;
pthread_mutex_unlock(&ic_control_info.lock);
return interconnect_context;
}
/*
 * SetupUDPIFCInterconnect
 *		Set up the UDP interconnect for the query described by estate.
 *
 * The real work is done by SetupUDPIFCInterconnect_Internal(); this wrapper
 * handles errors thrown during setup and, on success, attaches the new
 * interconnect context to the executor state.
 */
void
SetupUDPIFCInterconnect(EState *estate)
{
	ChunkTransportState *icContext = NULL;

	PG_TRY();
	{
		/*
		 * The rx-thread might have set an error since last teardown,
		 * technically it is not part of current query, discard it directly.
		 */
		resetRxThreadError();

		icContext = SetupUDPIFCInterconnect_Internal(estate->es_sliceTable);

		/* Internal error if we locked the mutex but forgot to unlock it. */
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/*
		 * Empty the connection hash table so that the rx pthread stops
		 * handling packets for these connections; otherwise it could touch
		 * memory whose context (InterconnectContext) is about to be reset,
		 * which could panic the process.
		 */
		ConnHashTable *connTable = &ic_control_info.connHtab;

		for (int slot = 0; slot < connTable->size; slot++)
		{
			struct ConnHtabBin *bin;
			struct ConnHtabBin *following;

			for (bin = connTable->table[slot]; bin != NULL; bin = following)
			{
				/* remember the successor: connDelHash() pfrees the bin */
				following = bin->next;
				connDelHash(connTable, bin->conn);
			}
		}

		pthread_mutex_unlock(&ic_control_info.lock);

		PG_RE_THROW();
	}
	PG_END_TRY();

	/* publish the finished context on the executor state */
	icContext->estate = estate;
	estate->interconnect_context = icContext;
	estate->es_interconnect_is_setup = true;

	/* Check if any of the QEs has already finished with error */
	if (Gp_role == GP_ROLE_DISPATCH)
		checkForCancelFromQD(icContext);
}
/*
 * freeDisorderedPackets
 *		Return every packet still stored in the connection's packet queue to
 *		the rx buffer free list and clear the queue slots.
 *
 * Does nothing if the connection has no packet queue.
 */
static void
freeDisorderedPackets(MotionConn *conn)
{
	int			slot;

	if (conn->pkt_q == NULL)
		return;

	/* scan the whole queue, not just the occupied range */
	for (slot = 0; slot < conn->pkt_q_capacity; slot++)
	{
		icpkthdr   *pkt = (icpkthdr *) conn->pkt_q[slot];

		if (pkt == NULL)
			continue;

		if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elog(DEBUG1, "CLEAR Out-of-order PKT: conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, pkt->seq, pkt->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);

		/* return the buffer into the free list. */
		putRxBufferToFreeList(&rx_buffer_pool, pkt);
		conn->pkt_q[slot] = NULL;
	}
}
/*
 * chunkTransportStateEntryInitialized
 *		Check whether the transport state entry for a motion node is
 *		initialized.
 *
 * transportStates - interconnect context holding the states array
 * motNodeID	   - 1-based motion node id; entry N lives at states[N - 1]
 *
 * Returns true only if motNodeID lies within [1, transportStates->size] and
 * the corresponding entry is marked valid.  Ids below 1 are rejected
 * explicitly so that states[-1] is never read.
 */
static bool
chunkTransportStateEntryInitialized(ChunkTransportState *transportStates,
									int16 motNodeID)
{
	/* out-of-range ids can never refer to an initialized entry */
	if (motNodeID < 1 || motNodeID > transportStates->size)
		return false;

	if (!transportStates->states[motNodeID - 1].valid)
		return false;

	return true;
}
/*
 * computeNetworkStatistics
 *		Fold one sample into running min/max/sum statistics.
 *
 * value - the new sample
 * min	 - in/out: smallest sample seen so far
 * max	 - in/out: largest sample seen so far
 * sum	 - in/out: running total of all samples (the caller derives the
 *		   average from it)
 */
static inline void
computeNetworkStatistics(uint64 value, uint64 *min, uint64 *max, double *sum)
{
	*sum += value;

	if (*min >= value)
		*min = value;

	if (*max <= value)
		*max = value;
}
/*
 * TeardownUDPIFCInterconnect_Internal
 * Helper function for TeardownUDPIFCInterconnect.
 *
 * Releases the send side (if this slice has a parent) and the receive side
 * (one entry per child slice) of the given interconnect context, trims the rx
 * buffer pool back to its maximum, logs the interconnect statistics, and
 * finally frees the context itself. Returns early, after writing a LOG
 * message, if the context, its slice table or its states array is missing.
 *
 * transportStates - interconnect context to tear down; it is pfree'd here
 * hasErrors - only used in the log messages emitted by this function
 *
 * Interrupts are held and ic_control_info.lock is taken for the main part of
 * the work.
 *
 * Developers should pay attention to:
 *
 * 1) Do not handle interrupts/throw errors in Teardown, otherwise, Teardown may be called twice.
 * It will introduce an undefined behavior. And memory leaks will be introduced.
 *
 * 2) Be careful about adding elog/ereport/write_log in Teardown function,
 * esp, out of HOLD_INTERRUPTS/RESUME_INTERRUPTS pair, since elog/ereport/write_log may
 * handle interrupts.
 *
 */
static void
TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
bool hasErrors)
{
ChunkTransportStateEntry *pEntry = NULL;
int i;
ExecSlice *mySlice;
MotionConn *conn;
/* min values start at the largest uint64 so that the first sample replaces them */
uint64 maxRtt = 0;
double avgRtt = 0;
uint64 minRtt = ~((uint64) 0);
uint64 maxDev = 0;
double avgDev = 0;
uint64 minDev = ~((uint64) 0);
bool isReceiver = false;
if (transportStates == NULL || transportStates->sliceTable == NULL)
{
elog(LOG, "TeardownUDPIFCInterconnect: missing slice table.");
return;
}
if (!transportStates->states)
{
elog(LOG, "TeardownUDPIFCInterconnect: missing states.");
return;
}
mySlice = &transportStates->sliceTable->slices[transportStates->sliceId];
HOLD_INTERRUPTS();
/* Log the start of TeardownInterconnect. */
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
{
/* elevel stays 0 (no message) for a normal teardown below DEBUG verbosity */
int elevel = 0;
if (hasErrors || !transportStates->activated)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = LOG;
else
elevel = DEBUG1;
}
else if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = DEBUG4;
if (elevel)
ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
"%s; setup was %s",
GpIdentity.segindex, mySlice->sliceIndex,
hasErrors ? "hasErrors" : "normal",
transportStates->activated ? "completed" : "exited")));
/* if setup did not complete, log the slicetable */
if (!transportStates->activated &&
gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog_node_display(DEBUG3, "local slice table", transportStates->sliceTable, true);
}
/*
* add lock to protect the hash table, since background thread is still
* working.
*/
pthread_mutex_lock(&ic_control_info.lock);
if (gp_interconnect_cache_future_packets)
cleanupStartupCache();
/*
* Now "normal" connections which made it through our peer-registration
* step. With these we have to worry about "in-flight" data.
*/
if (mySlice->parentIndex != -1)
{
ExecSlice *parentSlice;
parentSlice = &transportStates->sliceTable->slices[mySlice->parentIndex];
/* cleanup a Sending motion node. */
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect seg%d slice%d closing connections to slice%d (%d peers)",
GpIdentity.segindex, mySlice->sliceIndex, mySlice->parentIndex,
list_length(parentSlice->primaryProcesses));
/*
* In the olden days, we required that the error case successfully
* transmit and end-of-stream message here. But the introduction of
* cdbdisp_check_estate_for_cancel() alleviates for the QD case, and
* the cross-connection of writer gangs in the dispatcher (propagation
* of cancel between them) fixes the I-S case.
*
* So the call to forceEosToPeers() is no longer required.
*/
if (chunkTransportStateEntryInitialized(transportStates, mySlice->sliceIndex))
{
/* now it is safe to remove. */
pEntry = removeChunkTransportState(transportStates, mySlice->sliceIndex);
/* connection array allocation may fail in interconnect setup. */
if (pEntry->conns)
{
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->cdbProc == NULL)
continue;
/* compute some statistics */
computeNetworkStatistics(conn->rtt, &minRtt, &maxRtt, &avgRtt);
computeNetworkStatistics(conn->dev, &minDev, &maxDev, &avgDev);
/*
* Hand the queued and unacked buffers back; the second argument for the
* unack queue depends on the flow control method (NOTE(review): its exact
* meaning is defined by icBufferListReturn, which is outside this chunk).
*/
icBufferListReturn(&conn->sndQueue, false);
icBufferListReturn(&conn->unackQueue, Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY ? false : true);
connDelHash(&ic_control_info.connHtab, conn);
}
/* avgRtt/avgDev held running sums up to here; turn them into averages */
avgRtt = avgRtt / pEntry->numConns;
avgDev = avgDev / pEntry->numConns;
/* free all send side buffers */
cleanSndBufferPool(&snd_buffer_pool);
}
}
#ifdef TRANSFER_PROTOCOL_STATS
dumpTransProtoStats();
#endif
}
/*
* Previously, there is a piece of code that deals with pending stops. Now
* it is delegated to background rx thread which will deal with any
* mismatched packets.
*/
/*
* cleanup all of our Receiving Motion nodes, these get closed immediately
* (the receiver know for real if they want to shut down -- they aren't
* going to be processing any more data).
*/
ListCell *cell;
foreach(cell, mySlice->children)
{
ExecSlice *aSlice;
int childId = lfirst_int(cell);
aSlice = &transportStates->sliceTable->slices[childId];
/*
* First check whether the entry is initialized to avoid the potential
* errors thrown out from the removeChunkTransportState, which may
* introduce some memory leaks.
*/
if (chunkTransportStateEntryInitialized(transportStates, aSlice->sliceIndex))
{
/* remove it */
pEntry = removeChunkTransportState(transportStates, aSlice->sliceIndex);
Assert(pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect closing connections from slice%d",
aSlice->sliceIndex);
isReceiver = true;
if (pEntry->conns)
{
/*
* receivers know that they no longer care about data from
* below ... so we can safely discard data queued in both
* directions
*/
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->cdbProc == NULL)
continue;
/* out of memory has occurred, break out */
if (!conn->pkt_q)
break;
/* give back the rx pool capacity this connection added during setup */
rx_buffer_pool.maxCount -= conn->pkt_q_capacity;
connDelHash(&ic_control_info.connHtab, conn);
/*
* putRxBufferAndSendAck() dequeues messages and moves
* them to pBuff
*/
while (conn->pkt_q_size > 0)
{
putRxBufferAndSendAck(conn, NULL);
}
/* we also need to clear all the out-of-order packets */
freeDisorderedPackets(conn);
/* free up the packet queue */
pfree(conn->pkt_q);
conn->pkt_q = NULL;
/* free up the tuple remapper */
if (conn->remapper)
DestroyTupleRemapper(conn->remapper);
}
pfree(pEntry->conns);
pEntry->conns = NULL;
}
}
}
/*
* now that we've moved active rx-buffers to the freelist, we can prune
* the freelist itself
*/
while (rx_buffer_pool.count > rx_buffer_pool.maxCount)
{
icpkthdr *buf = NULL;
/* If this happened, there are some memory leaks.. */
if (rx_buffer_pool.freeList == NULL)
{
/* drop the lock before raising FATAL */
pthread_mutex_unlock(&ic_control_info.lock);
elog(FATAL, "freelist NULL: count %d max %d buf %p", rx_buffer_pool.count, rx_buffer_pool.maxCount, rx_buffer_pool.freeList);
}
buf = getRxBufferFromFreeList(&rx_buffer_pool);
freeRxBuffer(&rx_buffer_pool, buf);
}
/*
* Update the history of interconnect instance id.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
updateCursorIcEntry(&rx_control_info.cursorHistoryTable, transportStates->sliceTable->ic_instance_id, 0);
}
else if (Gp_role == GP_ROLE_EXECUTE)
{
rx_control_info.lastTornIcId = transportStates->sliceTable->ic_instance_id;
}
/*
* Report the collected statistics at LOG level when
* gp_interconnect_log_stats is set, at DEBUG1 otherwise. The min values
* are printed as 0 if no sample ever replaced their initial all-ones value.
*/
elog((gp_interconnect_log_stats ? LOG : DEBUG1), "Interconnect State: "
"isSender %d isReceiver %d "
"snd_queue_depth %d recv_queue_depth %d Gp_max_packet_size %d "
"UNACK_QUEUE_RING_SLOTS_NUM %d TIMER_SPAN %lld DEFAULT_RTT %d "
"hasErrors %d, ic_instance_id %d ic_id_last_teardown %d "
"snd_buffer_pool.count %d snd_buffer_pool.maxCount %d snd_sock_bufsize %d recv_sock_bufsize %d "
"snd_pkt_count %d retransmits %d crc_errors %d"
" recv_pkt_count %d recv_ack_num %d"
" recv_queue_size_avg %f"
" capacity_avg %f"
" freebuf_avg %f "
"mismatch_pkt_num %d disordered_pkt_num %d duplicated_pkt_num %d"
" rtt/dev [" UINT64_FORMAT "/" UINT64_FORMAT ", %f/%f, " UINT64_FORMAT "/" UINT64_FORMAT "] "
" cwnd %f status_query_msg_num %d",
ic_control_info.isSender, isReceiver,
Gp_interconnect_snd_queue_depth, Gp_interconnect_queue_depth, Gp_max_packet_size,
UNACK_QUEUE_RING_SLOTS_NUM, TIMER_SPAN, DEFAULT_RTT,
hasErrors, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId,
snd_buffer_pool.count, snd_buffer_pool.maxCount, ic_control_info.socketSendBufferSize, ic_control_info.socketRecvBufferSize,
ic_statistics.sndPktNum, ic_statistics.retransmits, ic_statistics.crcErrors,
ic_statistics.recvPktNum, ic_statistics.recvAckNum,
(double) ((double) ic_statistics.totalRecvQueueSize) / ((double) ic_statistics.recvQueueSizeCountingTime),
(double) ((double) ic_statistics.totalCapacity) / ((double) ic_statistics.capacityCountingTime),
(double) ((double) ic_statistics.totalBuffers) / ((double) ic_statistics.bufferCountingTime),
ic_statistics.mismatchNum, ic_statistics.disorderedPktNum, ic_statistics.duplicatedPktNum,
(minRtt == ~((uint64) 0) ? 0 : minRtt), (minDev == ~((uint64) 0) ? 0 : minDev), avgRtt, avgDev, maxRtt, maxDev,
snd_control_info.cwnd, ic_statistics.statusQueryMsgNum);
/* reset the per-instance sender flag and statistics */
ic_control_info.isSender = false;
memset(&ic_statistics, 0, sizeof(ICStatistics));
pthread_mutex_unlock(&ic_control_info.lock);
/* reset the rx thread network error flag */
resetRxThreadError();
transportStates->activated = false;
transportStates->sliceTable = NULL;
/*
* NOTE(review): transportStates was already checked for NULL at the top and
* dereferenced just above, so this NULL test cannot fail here.
*/
if (transportStates != NULL)
{
if (transportStates->states != NULL)
{
pfree(transportStates->states);
transportStates->states = NULL;
}
pfree(transportStates);
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
elog(DEBUG1, "TeardownUDPIFCInterconnect successful");
RESUME_INTERRUPTS();
}
/*
 * TeardownUDPIFCInterconnect
 * Tear down UDP interconnect.
 *
 * This function is called to release the resources used by interconnect.
 * The work is delegated to TeardownUDPIFCInterconnect_Internal(); this
 * wrapper only deals with ic_control_info.lock around that call.
 *
 * transportStates - interconnect context to tear down
 * hasErrors - passed through to the internal function
 */
void
TeardownUDPIFCInterconnect(ChunkTransportState *transportStates,
bool hasErrors)
{
PG_TRY();
{
TeardownUDPIFCInterconnect_Internal(transportStates, hasErrors);
/*
* The internal function must have released the lock already, so this
* unlock attempt is expected to fail (return non-zero). Note that the
* call only exists in assert-enabled builds.
*/
Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
}
PG_CATCH();
{
/* an error escaped the teardown: release the lock before re-throwing */
pthread_mutex_unlock(&ic_control_info.lock);
PG_RE_THROW();
}
PG_END_TRY();
}
/*
 * prepareRxConnForRead
 *		Point the connection's read state at the packet at the head of its
 *		packet queue.
 *
 * After the call pBuff and msgPos both reference the head packet, and
 * msgSize and recvBytes are set to the length stored in that packet's
 * header.  The head slot is expected to be non-NULL.
 *
 * MUST BE CALLED WITH ic_control_info.lock LOCKED.
 */
static void
prepareRxConnForRead(MotionConn *conn)
{
	uint8	   *headPkt;

	elog(DEBUG3, "In prepareRxConnForRead: conn %p, q_head %d q_tail %d q_size %d", conn, conn->pkt_q_head, conn->pkt_q_tail, conn->pkt_q_size);

	Assert(conn->pkt_q[conn->pkt_q_head] != NULL);

	headPkt = conn->pkt_q[conn->pkt_q_head];

	conn->pBuff = headPkt;
	conn->msgPos = headPkt;

	/* the packet starts with an icpkthdr carrying the packet length */
	conn->msgSize = ((icpkthdr *) headPkt)->len;
	conn->recvBytes = conn->msgSize;
}
/*
 * receiveChunksUDPIFC
 *		Receive chunks from the senders.
 *
 * Registers what the main thread is waiting for (a specific route when conn
 * is given, any route otherwise), builds a wait event set and then runs
 * receiveChunksUDPIFCLoop(), which contains the main receive logic.
 *
 * The wait event set always contains the interconnect latch and postmaster
 * death.  On the dispatcher the socket fds returned by
 * cdbdisp_getWaitSocketFds() are added as well.
 *
 * pTransportStates - interconnect context
 * pEntry			- transport state entry to receive for
 * motNodeID		- motion node being waited on
 * srcRoute			- output; set to conn->route for a directed receive
 * conn				- connection to receive from, or NULL for any route
 *
 * Returns whatever receiveChunksUDPIFCLoop() returns.  The wait event set
 * and the fd array are released on both the normal and the error path.
 *
 * MUST BE CALLED WITH ic_control_info.lock LOCKED.
 */
static TupleChunkListItem
receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
					int16 motNodeID, int16 *srcRoute, MotionConn *conn)
{
	TupleChunkListItem chunk = NULL;
	WaitEventSet *eventSet = NULL;
	int		   *dispatchFds = NULL;
	int			fdCount = 0;
	int			eventSlots = 0;

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "receivechunksUDP: motnodeid %d", motNodeID);
#endif

	Assert(pTransportStates);
	Assert(pTransportStates->sliceTable);

	if (conn == NULL)
	{
		/* non-directed receive */
		setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, ANY_ROUTE,
							 pTransportStates->sliceTable->ic_instance_id);
	}
	else
	{
		*srcRoute = conn->route;
		setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, conn->route,
							 pTransportStates->sliceTable->ic_instance_id);
	}

	/* two fixed slots (latch and postmaster death) plus any waited fds */
	eventSlots = 2;
	if (Gp_role == GP_ROLE_DISPATCH)
	{
		/* get all wait sock fds */
		dispatchFds = cdbdisp_getWaitSocketFds(pTransportStates->estate->dispatcherState, &fdCount);
		if (dispatchFds != NULL)
			eventSlots += fdCount;
	}

	eventSet = CreateWaitEventSet(CurrentMemoryContext, eventSlots);

	/*
	 * PG_TRY/PG_CATCH guarantees that the wait event set is destroyed
	 * (closing the epoll fd) even if the receive loop throws.
	 */
	PG_TRY();
	{
		AddWaitEventToSet(eventSet, WL_LATCH_SET, PGINVALID_SOCKET, &ic_control_info.latch, NULL);
		AddWaitEventToSet(eventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);

		for (int fdIdx = 0; fdIdx < fdCount; fdIdx++)
			AddWaitEventToSet(eventSet, WL_SOCKET_READABLE, dispatchFds[fdIdx], NULL, NULL);

		chunk = receiveChunksUDPIFCLoop(pTransportStates, pEntry, srcRoute, conn, eventSet, eventSlots);
	}
	PG_CATCH();
	{
		FreeWaitEventSet(eventSet);
		if (dispatchFds != NULL)
			pfree(dispatchFds);

		PG_RE_THROW();
	}
	PG_END_TRY();

	/* normal path: same cleanup as in the error path */
	FreeWaitEventSet(eventSet);
	if (dispatchFds != NULL)
		pfree(dispatchFds);

	return chunk;
}
/*
* receiveChunksUDPIFCLoop
* Wait until a packet is available for the main thread and return its chunks.
*
* Each iteration first looks at rx_control_info.mainWaitingState.reachRoute.
* When it names a route (is not ANY_ROUTE), that connection is prepared for
* reading, ic_control_info.lock is released and the result of RecvTupleChunk()
* is returned. Otherwise the latch is reset, the lock is released and the
* function blocks in WaitEventSetWait() for up to MAIN_THREAD_COND_TIMEOUT_MS,
* then checks for rx-thread errors, interrupts and (on the dispatcher)
* cancellation before re-taking the lock and trying again.
*
* pTransportStates: interconnect state, passed to RecvTupleChunk() and the checks.
* pEntry: transport entry whose conns array is indexed by the reached route.
* srcRoute: for a non-directed receive, set to the route the data arrived on.
* conn: NULL for a non-directed receive; only its NULL-ness is used here.
* waitset: wait event set to block on.
* nevent: number of WaitEvent slots allocated for (and passed to) the wait call.
*
* The first lock operation performed is an unlock and the lock is re-acquired
* at the bottom of every iteration, so the function expects to be entered with
* ic_control_info.lock held and returns with it released.
*/
static TupleChunkListItem
receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent)
{
TupleChunkListItem tcItem = NULL;
MotionConn *rxconn = NULL;
int retries = 0;
bool directed = false;
WaitEvent *rEvents = NULL;
if (conn != NULL)
directed = true;
rEvents = palloc(nevent * sizeof(WaitEvent)); /* returned events */
/* we didn't have any data, so we've got to read it from the network. */
for (;;)
{
/* 1. Do we have data ready */
/*
* NOTE(review): reachRoute is presumably filled in by the rx thread when
* a packet for the waited-on route arrives -- confirm at its writer.
*/
if (rx_control_info.mainWaitingState.reachRoute != ANY_ROUTE)
{
rxconn = pEntry->conns + rx_control_info.mainWaitingState.reachRoute;
prepareRxConnForRead(rxconn);
elog(DEBUG2, "receiveChunksUDPIFC: non-directed rx woke on route %d", rx_control_info.mainWaitingState.reachRoute);
resetMainThreadWaiting(&rx_control_info.mainWaitingState);
}
/* refresh the per-entry statistics on every pass, data or not */
aggregateStatistics(pEntry);
if (rxconn != NULL)
{
Assert(rxconn->pBuff);
/* release the lock before handing the packet to the chunk parser */
pthread_mutex_unlock(&ic_control_info.lock);
elog(DEBUG2, "got data with length %d", rxconn->recvBytes);
/* successfully read into this connection's buffer. */
tcItem = RecvTupleChunk(rxconn, pTransportStates);
/* only a non-directed caller needs to be told where the data came from */
if (!directed)
*srcRoute = rxconn->route;
pfree(rEvents);
return tcItem;
}
retries++;
/*
* Ok, we've processed all the items currently in the queue. Arm the
* latch (before releasing the mutex), and wait for more messages to
* arrive. The RX thread will wake us up using the latch.
*/
ResetLatch(&ic_control_info.latch);
pthread_mutex_unlock(&ic_control_info.lock);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
elog(DEBUG5, "waiting (timed) on route %d %s", rx_control_info.mainWaitingState.waitingRoute,
(rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE ? "(any route)" : ""));
}
/*
* Wait for data to become ready.
*
* In the QD, also wake up immediately if any QE reports an
* error through the main QD-QE libpq connection. For that, ask
* the dispatcher for a file descriptor to wait on for that.
*/
int rc = WaitEventSetWait(waitset, MAIN_THREAD_COND_TIMEOUT_MS, rEvents, nevent, WAIT_EVENT_INTERCONNECT);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG && rc == 0)
elog(DEBUG2, "receiveChunksUDPIFC(): WaitEventSetWait timeout after %d ms", MAIN_THREAD_COND_TIMEOUT_MS);
/* check the potential errors in rx thread. */
checkRxThreadError();
/* do not check interrupts when holding the lock */
ML_CHECK_FOR_INTERRUPTS(pTransportStates->teardownActive);
/*
* check to see if the dispatcher should cancel
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
/* rc is used as the number of valid entries in rEvents */
for (int i = 0; i < rc; i++)
if (rEvents[i].events & WL_SOCKET_READABLE)
{
/* event happened on wait fds, need to check cancel */
checkForCancelFromQD(pTransportStates);
break;
}
}
/*
* 1. NIC on master (and thus the QD connection) may become bad, check
* it. 2. Postmaster may become invalid, check it
*/
/* the mask makes these checks run once every 64 iterations */
if ((retries & 0x3f) == 0)
{
checkQDConnectionAlive();
if (!PostmasterIsAlive())
ereport(FATAL,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect failed to recv chunks"),
errdetail("Postmaster is not alive.")));
}
/* re-take the lock before inspecting the waiting state again */
pthread_mutex_lock(&ic_control_info.lock);
} /* for (;;) */
/* We either got data, or get cancelled. We never make it out to here. */
return NULL; /* make GCC behave */
}
/*
 * RecvTupleChunkFromAnyUDPIFC_Internal
 *      Receive tuple chunks from any route (connection) of a motion node.
 *
 * The connections are scanned round-robin, starting at pEntry->scanStart.
 * The first one with queued packets is read immediately.  If none has data
 * but at least one is still active, the call falls through to
 * receiveChunksUDPIFC() and waits.  If none is active, NULL is returned.
 *
 * transportStates: interconnect state; must be non-NULL and activated.
 * motNodeID: motion node whose transport entry is used.
 * srcRoute: out parameter, set to the route the chunks were read from.
 *
 * ic_control_info.lock is taken here and released on every return path
 * (receiveChunksUDPIFC() releases it as a side effect).
 */
static inline TupleChunkListItem
RecvTupleChunkFromAnyUDPIFC_Internal(ChunkTransportState *transportStates,
                                     int16 motNodeID,
                                     int16 *srcRoute)
{
    ChunkTransportStateEntry *pEntry = NULL;
    TupleChunkListItem tcItem = NULL;
    int         activeCount = 0;
    int         scanned;
    int         slot;

    if (!transportStates)
    {
        elog(FATAL, "RecvTupleChunkFromAnyUDPIFC: missing context");
    }
    else if (!transportStates->activated)
    {
        elog(FATAL, "RecvTupleChunkFromAnyUDPIFC: interconnect context not active!");
    }

    getChunkTransportState(transportStates, motNodeID, &pEntry);

    /* resume the scan where the previous call left off */
    slot = pEntry->scanStart;

    pthread_mutex_lock(&ic_control_info.lock);

    for (scanned = 0; scanned < pEntry->numConns; scanned++, slot++)
    {
        MotionConn *candidate;

        /* wrap around to the first connection */
        if (slot >= pEntry->numConns)
            slot = 0;

        candidate = &pEntry->conns[slot];

        if (candidate->stillActive)
            activeCount++;

        ic_statistics.totalRecvQueueSize += candidate->pkt_q_size;
        ic_statistics.recvQueueSizeCountingTime++;

        if (candidate->pkt_q_size <= 0)
            continue;

        /* this connection has queued data: consume it right away */
        prepareRxConnForRead(candidate);
        pthread_mutex_unlock(&ic_control_info.lock);

        tcItem = RecvTupleChunk(candidate, transportStates);
        *srcRoute = candidate->route;
        pEntry->scanStart = slot + 1;
        return tcItem;
    }

    /* no data pending in our queue */

#ifdef AMS_VERBOSE_LOGGING
    elog(LOG, "RecvTupleChunkFromAnyUDPIFC(): activeCount is %d", activeCount);
#endif

    if (activeCount == 0)
    {
        /* nobody left to wait for */
        pthread_mutex_unlock(&ic_control_info.lock);
        return NULL;
    }

    /* receiveChunksUDPIFC() releases ic_control_info.lock as a side-effect */
    tcItem = receiveChunksUDPIFC(transportStates, pEntry, motNodeID, srcRoute, NULL);

    /* next scan starts just after the route that delivered the data */
    pEntry->scanStart = *srcRoute + 1;

    return tcItem;
}
/*
* RecvTupleChunkFromAnyUDPIFC
* Receive tuple chunks from any route (connections)
*
* Wrapper around RecvTupleChunkFromAnyUDPIFC_Internal() that adds handling of
* ic_control_info.lock around the call.
*
* transportStates, motNodeID, srcRoute: passed through to the internal routine.
* Returns whatever the internal routine returns.
*/
static TupleChunkListItem
RecvTupleChunkFromAnyUDPIFC(ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute)
{
TupleChunkListItem icItem = NULL;
PG_TRY();
{
icItem = RecvTupleChunkFromAnyUDPIFC_Internal(transportStates, motNodeID, srcRoute);
/* error if mutex still held (debug build only) */
/*
* The unlock attempt is expected to fail (non-zero). Without assertions
* this line performs no unlock at all.
*/
Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
}
PG_CATCH();
{
/* An error escaped: drop the lock in case it is still held, then re-throw. */
pthread_mutex_unlock(&ic_control_info.lock);
PG_RE_THROW();
}
PG_END_TRY();
return icItem;
}
/*
 * RecvTupleChunkFromUDPIFC_Internal
 *      Receive tuple chunks from one specific route (connection).
 *
 * transportStates: interconnect state; must be non-NULL and activated.
 * motNodeID: motion node whose transport entry is used.
 * srcRoute: index of the connection to read from.
 *
 * Returns NULL if the connection is no longer active.  If a packet is
 * already queued at the head of the connection's receive queue it is
 * consumed directly; otherwise the call waits in receiveChunksUDPIFC().
 *
 * ic_control_info.lock is taken here and released on every return path
 * (receiveChunksUDPIFC() releases it as a side effect).
 */
static inline TupleChunkListItem
RecvTupleChunkFromUDPIFC_Internal(ChunkTransportState *transportStates,
                                  int16 motNodeID,
                                  int16 srcRoute)
{
    ChunkTransportStateEntry *pEntry = NULL;
    MotionConn *conn = NULL;
    int16       reportedRoute;  /* required by receiveChunksUDPIFC, not used here */

    if (!transportStates)
    {
        elog(FATAL, "RecvTupleChunkFromUDPIFC: missing context");
    }
    else if (!transportStates->activated)
    {
        elog(FATAL, "RecvTupleChunkFromUDPIFC: interconnect context not active!");
    }

#ifdef AMS_VERBOSE_LOGGING
    elog(LOG, "RecvTupleChunkFromUDPIFC().");
#endif

    /* check em' */
    ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

#ifdef AMS_VERBOSE_LOGGING
    elog(DEBUG5, "RecvTupleChunkFromUDPIFC(motNodID=%d, srcRoute=%d)", motNodeID, srcRoute);
#endif

    getChunkTransportState(transportStates, motNodeID, &pEntry);
    conn = &pEntry->conns[srcRoute];

#ifdef AMS_VERBOSE_LOGGING
    if (!conn->stillActive)
    {
        elog(LOG, "RecvTupleChunkFromUDPIFC(): connection inactive ?!");
    }
#endif

    pthread_mutex_lock(&ic_control_info.lock);

    if (!conn->stillActive)
    {
        pthread_mutex_unlock(&ic_control_info.lock);
        return NULL;
    }

    ic_statistics.totalRecvQueueSize += conn->pkt_q_size;
    ic_statistics.recvQueueSizeCountingTime++;

    if (conn->pkt_q[conn->pkt_q_head] == NULL)
    {
        /*
         * no existing data, we've got to read a packet;
         * receiveChunksUDPIFC() releases ic_control_info.lock as a
         * side-effect
         */
        return receiveChunksUDPIFC(transportStates, pEntry, motNodeID, &reportedRoute, conn);
    }

    /* a packet is already waiting at the head of the queue */
    prepareRxConnForRead(conn);
    pthread_mutex_unlock(&ic_control_info.lock);

    return RecvTupleChunk(conn, transportStates);
}
/*
* RecvTupleChunkFromUDPIFC
* Receive tuple chunks from a specific route (connection)
*
* Wrapper around RecvTupleChunkFromUDPIFC_Internal() that adds handling of
* ic_control_info.lock around the call.
*
* transportStates, motNodeID, srcRoute: passed through to the internal routine.
* Returns whatever the internal routine returns.
*/
static TupleChunkListItem
RecvTupleChunkFromUDPIFC(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute)
{
TupleChunkListItem icItem = NULL;
PG_TRY();
{
icItem = RecvTupleChunkFromUDPIFC_Internal(transportStates, motNodeID, srcRoute);
/* error if mutex still held (debug build only) */
/*
* The unlock attempt is expected to fail (non-zero). Without assertions
* this line performs no unlock at all.
*/
Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
}
PG_CATCH();
{
/* An error escaped: drop the lock in case it is still held, then re-throw. */
pthread_mutex_unlock(&ic_control_info.lock);
PG_RE_THROW();
}
PG_END_TRY();
return icItem;
}
/*
 * markUDPConnInactiveIFC
 *      Flag a connection as no longer active.
 *
 * The stillActive flag is cleared while holding ic_control_info.lock.
 */
void
markUDPConnInactiveIFC(MotionConn *conn)
{
    pthread_mutex_lock(&ic_control_info.lock);

    conn->stillActive = false;

    pthread_mutex_unlock(&ic_control_info.lock);
}
/*
 * aggregateStatistics
 *      Recompute the per-entry ack/resend/drop statistics from scratch.
 *
 * The entry-level counters are reset and then rebuilt by folding in the
 * counters of every connection of the entry: sums for the totals and counts,
 * maximum/minimum for the max/min fields.
 */
static void
aggregateStatistics(ChunkTransportStateEntry *pEntry)
{
    int         idx = 0;

    /* start from neutral values: zero for sums/max, all-ones for the min */
    pEntry->stat_total_ack_time = 0;
    pEntry->stat_count_acks = 0;
    pEntry->stat_max_ack_time = 0;
    pEntry->stat_min_ack_time = ~((uint64) 0);
    pEntry->stat_count_resent = 0;
    pEntry->stat_max_resent = 0;
    pEntry->stat_count_dropped = 0;

    while (idx < pEntry->numConns)
    {
        MotionConn *c = pEntry->conns + idx;

        /* ack timing */
        pEntry->stat_total_ack_time += c->stat_total_ack_time;
        pEntry->stat_count_acks += c->stat_count_acks;
        pEntry->stat_max_ack_time = Max(pEntry->stat_max_ack_time, c->stat_max_ack_time);
        pEntry->stat_min_ack_time = Min(pEntry->stat_min_ack_time, c->stat_min_ack_time);

        /* retransmissions and drops */
        pEntry->stat_count_resent += c->stat_count_resent;
        pEntry->stat_max_resent = Max(pEntry->stat_max_resent, c->stat_max_resent);
        pEntry->stat_count_dropped += c->stat_count_dropped;

        idx++;
    }
}
/*
 * logPkt
 *      Write the header fields of a packet to the log.
 *
 * prefix: text placed at the start of the log line.
 * pkt: packet header to dump.  The packet is labelled "ACK" when its
 *      UDPIC_FLAGS_RECEIVER_TO_SENDER flag is set and "DATA" otherwise.
 */
static inline void
logPkt(char *prefix, icpkthdr *pkt)
{
    const char *kind;

    if (pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER)
        kind = "ACK";
    else
        kind = "DATA";

    write_log("%s [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
              "srcContentId %d dstDesContentId %d "
              "srcPid %d dstPid %d "
              "srcListenerPort %d dstListernerPort %d "
              "sendSliceIndex %d recvSliceIndex %d "
              "sessionId %d icId %d "
              "flags %d ",
              prefix, kind,
              pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
              pkt->srcContentId, pkt->dstContentId,
              pkt->srcPid, pkt->dstPid,
              pkt->srcListenerPort, pkt->dstListenerPort,
              pkt->sendSliceIndex, pkt->recvSliceIndex,
              pkt->sessionId, pkt->icId,
              pkt->flags);
}
/*
* handleAckedPacket
* Called by sender to process acked packet.
*
* Remove it from unack queue and unack queue ring, change the rtt ...
*
* RTT (Round Trip Time) is computed as the time between we send the packet
* and receive the acknowledgement for the packet. When an acknowledgement
* is received, an estimated RTT value (called SRTT, smoothed RTT) is updated
* by using the following equation. And we also set a limitation of the max
* value and min value for SRTT.
* (1) SRTT = (1 - g) SRTT + g x RTT (0 < g < 1)
* where RTT is the measured round trip time of the packet. In implementation,
* g is set to 1/8. In order to compute expiration period, we also compute an
* estimated delay variance SDEV by using:
* (2) SDEV = (1 - h) x SDEV + h x |SERR| (0 < h < 1, In implementation, h is set to 1/4)
* where SERR is calculated by using:
* (3) SERR = RTT - SRTT
* Expiration period determines the timing we resend a packet. A long RTT means
* a long expiration period. Delay variance is used to incorporate the variance
* of workload/network variances at different time. When a packet is retransmitted,
* we back off exponentially the expiration period.
* (4) exp_period = (SRTT + y x SDEV) << retry
* Here y is a constant (In implementation, we use 4) and retry is the times the
* packet is retransmitted.
*
* ackConn: connection whose unack queue holds the buffer.
* buf: the acknowledged buffer; it ends up on snd_buffer_pool.freeList.
* now: current time, used to compute the ack latency of the buffer.
*/
static void
handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now)
{
uint64 ackTime = 0;
/* must be evaluated before the buffer is unlinked from the unack queue */
bool bufIsHead = (&buf->primary == icBufferListFirst(&ackConn->unackQueue));
buf = icBufferListDelete(&ackConn->unackQueue, buf);
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
{
/* also unlink the buffer from its slot in the unack queue ring */
buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
unack_queue_ring.numOutStanding--;
/*
* Mirrors sendBuffers(), which bumps numSharedOutStanding only when the
* connection already has another buffer outstanding.
*/
if (icBufferListLength(&ackConn->unackQueue) >= 1)
unack_queue_ring.numSharedOutStanding--;
ackTime = now - buf->sentTime;
/*
* In udp_testmode, we do not change rtt dynamically due to the large
* number of packet losses introduced by fault injection code. This
* can decrease the testing time.
*/
#ifdef USE_ASSERT_CHECKING
if (!udp_testmode)
#endif
{
uint64 newRTT = 0;
uint64 newDEV = 0;
/* only buffers that were never retransmitted feed the estimators */
if (buf->nRetry == 0)
{
/* equation (1), clamped to [MIN_RTT, MAX_RTT] */
newRTT = buf->conn->rtt - (buf->conn->rtt >> RTT_SHIFT_COEFFICIENT) + (ackTime >> RTT_SHIFT_COEFFICIENT);
newRTT = Min(MAX_RTT, Max(newRTT, MIN_RTT));
buf->conn->rtt = newRTT;
/* equation (2) using |ackTime - newRTT|, clamped to [MIN_DEV, MAX_DEV] */
newDEV = buf->conn->dev - (buf->conn->dev >> DEV_SHIFT_COEFFICIENT) + ((Max(ackTime, newRTT) - Min(ackTime, newRTT)) >> DEV_SHIFT_COEFFICIENT);
newDEV = Min(MAX_DEV, Max(newDEV, MIN_DEV));
buf->conn->dev = newDEV;
/* adjust the congestion control window. */
/*
* Below ssthresh the window grows by one per ack, otherwise by 1/cwnd;
* it is capped at the send buffer pool's maxCount.
* NOTE(review): the 1/cwnd step only makes sense if cwnd is a
* floating-point field -- confirm in its declaration.
*/
if (snd_control_info.cwnd < snd_control_info.ssthresh)
snd_control_info.cwnd += 1;
else
snd_control_info.cwnd += 1 / snd_control_info.cwnd;
snd_control_info.cwnd = Min(snd_control_info.cwnd, snd_buffer_pool.maxCount);
}
}
}
/*
* ackTime is only measured in the loss-based branch above; with any other
* flow control method it is still 0 when folded into these statistics.
*/
buf->conn->stat_total_ack_time += ackTime;
buf->conn->stat_max_ack_time = Max(ackTime, buf->conn->stat_max_ack_time);
buf->conn->stat_min_ack_time = Min(ackTime, buf->conn->stat_min_ack_time);
/*
* only change receivedAckSeq when it is the smallest pkt we sent and have
* not received ack for it.
*/
if (bufIsHead)
ackConn->receivedAckSeq = buf->pkt->seq;
/* The first packet acts like a connect setup packet */
if (buf->pkt->seq == 1)
ackConn->state = mcsStarted;
/* the buffer is no longer in flight: hand it back to the free list */
icBufferListAppend(&snd_buffer_pool.freeList, buf);
#ifdef AMS_VERBOSE_LOGGING
write_log("REMOVEPKT %d from unack queue for route %d (retry %d) sndbufmaxcount %d sndbufcount %d sndbuffreelistlen %d, sntSeq %d consumedSeq %d recvAckSeq %d capacity %d, sndQ %d, unackQ %d", buf->pkt->seq, ackConn->route, buf->nRetry, snd_buffer_pool.maxCount, snd_buffer_pool.count, icBufferListLength(&snd_buffer_pool.freeList), buf->conn->sentSeq, buf->conn->consumedSeq, buf->conn->receivedAckSeq, buf->conn->capacity, icBufferListLength(&buf->conn->sndQueue), icBufferListLength(&buf->conn->unackQueue));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
icBufferListLog(&buf->conn->unackQueue);
icBufferListLog(&buf->conn->sndQueue);
}
#endif
}
/*
 * handleAcks
 *      Handle acks incoming from our upstream peers.
 *
 * Reads packets from pEntry->txfd into snd_control_info.ackBuffer until
 * recvfrom() reports that nothing is left (EWOULDBLOCK), at which point the
 * entry statistics are re-aggregated and the function returns.  Packets that
 * are too short, whose length does not match their header, that fail the CRC
 * check (when gp_interconnect_full_crc is on) or that are not addressed to
 * this process/session/command are skipped.
 *
 * transportStates: interconnect state (slice table, teardown flag).
 * pEntry: transport entry whose transmit socket is drained.
 *
 * Returns true if a stop message was received on a still-active connection
 * that had not been asked to stop before (the caller will clean up).
 * Raises ERROR if recvfrom() fails with anything other than
 * EWOULDBLOCK/EINTR.
 */
static bool
handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry)
{
    bool        ret = false;
    MotionConn *ackConn = NULL;
    int         n;
    struct sockaddr_storage peer;
    socklen_t   peerlen;
    struct icpkthdr *pkt = snd_control_info.ackBuffer;

    /*
     * Never cleared inside this call: once an ack makes progress, buffers
     * are (re)tried for the connection of every later matching ack as well.
     */
    bool        shouldSendBuffers = false;
    SliceTable *sliceTbl = transportStates->sliceTable;

    for (;;)
    {
        /* ready to read on our socket ? */
        peerlen = sizeof(peer);
        n = recvfrom(pEntry->txfd, (char *) pkt, MIN_PACKET_SIZE, 0,
                     (struct sockaddr *) &peer, &peerlen);

        if (n < 0)
        {
            /*
             * Capture errno right away: the interrupt check below may run
             * arbitrary code that overwrites it, which would make the EINTR
             * test unreliable.
             */
            int         save_errno = errno;

            if (save_errno == EWOULDBLOCK)  /* had nothing to read. */
            {
                aggregateStatistics(pEntry);
                return ret;
            }

            ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

            if (save_errno == EINTR)
                continue;

            ereport(ERROR,
                    (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                     errmsg("interconnect error waiting for peer ack"),
                     errdetail("During recvfrom() call.")));
        }
        else if (n < sizeof(struct icpkthdr))
        {
            /* too short to even hold a header: ignore */
            continue;
        }
        else if (n != pkt->len)
        {
            /* received size disagrees with the header: ignore */
            continue;
        }

        /*
         * check the CRC of the payload.
         */
        if (gp_interconnect_full_crc)
        {
            if (!checkCRC(pkt))
            {
                pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &ic_statistics.crcErrors, 1);
                if (DEBUG2 >= log_min_messages)
                    write_log("received network data error, dropping bad packet, user data unaffected.");
                continue;
            }
        }

        /*
         * read packet, is this the ack we want ?
         */
        if (pkt->srcContentId == GpIdentity.segindex &&
            pkt->srcPid == MyProcPid &&
            pkt->srcListenerPort == ((Gp_listener_port >> 16) & 0x0ffff) &&
            pkt->sessionId == gp_session_id &&
            pkt->icId == sliceTbl->ic_instance_id)
        {
            /*
             * packet is for me. Note here we do not need to get a connection
             * lock here, since background rx thread only read the hash table.
             */
            ackConn = findConnByHeader(&ic_control_info.connHtab, pkt);

            if (ackConn == NULL)
            {
                elog(LOG, "Received ack for unknown connection (flags 0x%x)", pkt->flags);
                continue;
            }

            ackConn->stat_count_acks++;
            ic_statistics.recvAckNum++;

            uint64      now = getCurrentTime();

            ackConn->deadlockCheckBeginTime = now;

            /*
             * We simply disregard pkt losses (NAK) due to process start race
             * (that is, sender is started earlier than receiver. rx
             * background thread may receive packets when connections are not
             * created yet).
             *
             * Another option is to resend the packet immediately, but
             * experiments do not show any benefits.
             */
            if (pkt->flags & UDPIC_FLAGS_NAK)
                continue;

            /* single-pass block: every path through it ends in a break */
            while (true)
            {
                if (pkt->flags & UDPIC_FLAGS_CAPACITY)
                {
                    /* receiver consumed more packets: widen our capacity */
                    if (pkt->extraSeq > ackConn->consumedSeq)
                    {
                        ackConn->capacity += pkt->extraSeq - ackConn->consumedSeq;
                        ackConn->consumedSeq = pkt->extraSeq;
                        shouldSendBuffers = true;
                    }
                }
                else if (pkt->flags & UDPIC_FLAGS_DUPLICATE)
                {
                    if (DEBUG1 >= log_min_messages)
                        write_log("GOTDUPACK [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);

                    shouldSendBuffers |= (handleAckForDuplicatePkt(ackConn, pkt));
                    break;
                }
                else if (pkt->flags & UDPIC_FLAGS_DISORDER)
                {
                    if (DEBUG1 >= log_min_messages)
                        write_log("GOTDISORDER [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);

                    shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, pEntry, ackConn, pkt));
                    break;
                }

                /*
                 * don't get out of the loop if pkt->seq equals to
                 * ackConn->receivedAckSeq, need to check UDPIC_FLAGS_STOP
                 * flag
                 */
                if (pkt->seq < ackConn->receivedAckSeq)
                {
                    if (DEBUG1 >= log_min_messages)
                        write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
                    break;
                }

                /* haven't gotten a stop request, maybe this is one ? */
                if ((pkt->flags & UDPIC_FLAGS_STOP) && !ackConn->stopRequested && ackConn->stillActive)
                {
#ifdef AMS_VERBOSE_LOGGING
                    elog(LOG, "got ack with stop; srcpid %d dstpid %d cmd %d flags 0x%x pktseq %d connseq %d", pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, pkt->seq, ackConn->conn_info.seq);
#endif
                    ackConn->stopRequested = true;
                    ackConn->conn_info.flags |= UDPIC_FLAGS_STOP;
                    ret = true;
                    /* continue to deal with acks */
                }

                /* nothing new is acknowledged by a repeat of the last acked seq */
                if (pkt->seq == ackConn->receivedAckSeq)
                {
                    if (DEBUG1 >= log_min_messages)
                        write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
                    break;
                }

                /* deal with a regular ack. */
                if (pkt->flags & UDPIC_FLAGS_ACK)
                {
                    ICBufferLink *link = NULL;
                    ICBufferLink *next = NULL;
                    ICBuffer   *buf = NULL;

#ifdef AMS_VERBOSE_LOGGING
                    write_log("GOTACK [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);
#endif

                    /* release every unacked buffer up to and including pkt->seq */
                    link = icBufferListFirst(&ackConn->unackQueue);
                    buf = GET_ICBUFFER_FROM_PRIMARY(link);

                    while (!icBufferListIsHead(&ackConn->unackQueue, link) && buf->pkt->seq <= pkt->seq)
                    {
                        /* grab the successor first: the buffer is unlinked below */
                        next = link->next;
                        handleAckedPacket(ackConn, buf, now);
                        shouldSendBuffers = true;
                        link = next;
                        buf = GET_ICBUFFER_FROM_PRIMARY(link);
                    }
                }
                break;
            }

            /*
             * When there is a capacity increase or some outstanding buffers
             * removed from the unack queue ring, we should try to send
             * buffers for the connection. Even when stop is received, we
             * still send here, since in STOP/EOS race case, we may have been
             * in EOS sending logic and will not check stop message.
             */
            if (shouldSendBuffers)
                sendBuffers(transportStates, pEntry, ackConn);
        }
        else if (DEBUG1 >= log_min_messages)
            write_log("handleAck: not the ack we're looking for (flags 0x%x)...mot(%d) content(%d:%d) srcpid(%d:%d) dstpid(%d) srcport(%d:%d) dstport(%d) sess(%d:%d) cmd(%d:%d)",
                      pkt->flags, pkt->motNodeId,
                      pkt->srcContentId, GpIdentity.segindex,
                      pkt->srcPid, MyProcPid,
                      pkt->dstPid,
                      pkt->srcListenerPort, ((Gp_listener_port >> 16) & 0x0ffff),
                      pkt->dstListenerPort,
                      pkt->sessionId, gp_session_id,
                      pkt->icId, sliceTbl->ic_instance_id);
    }
}
/*
 * addCRC
 *      Compute the CRC32C of a packet and store it in the packet's crc field.
 *
 * The checksum covers pkt->len bytes starting at the header, so it includes
 * whatever the crc field holds on entry; prepareXmit() zeroes that field
 * before calling here.
 */
static inline void
addCRC(icpkthdr *pkt)
{
    pg_crc32c   computed;

    INIT_CRC32C(computed);
    COMP_CRC32C(computed, pkt, pkt->len);
    FIN_CRC32C(computed);

    pkt->crc = computed;
}
/*
 * checkCRC
 *      Verify the CRC32C carried in a received packet.
 *
 * The crc field is zeroed before the checksum is recomputed over pkt->len
 * bytes, and it is left zeroed afterwards.
 *
 * Returns true when the recomputed checksum matches the received one.
 */
static inline bool
checkCRC(icpkthdr *pkt)
{
    pg_crc32c   received = pkt->crc;
    pg_crc32c   recomputed;

    /* the sender computed the checksum with the crc field cleared */
    pkt->crc = 0;

    INIT_CRC32C(recomputed);
    COMP_CRC32C(recomputed, pkt, pkt->len);
    FIN_CRC32C(recomputed);

    return received == recomputed;
}
/*
 * prepareXmit
 *      Stamp the connection's header onto its current buffer so the buffer
 *      is ready to be transmitted.
 *
 * The header template (conn_info) gets the message length and a zero crc,
 * is copied to the front of pBuff, and its sequence number is then advanced
 * for the next packet.  With gp_interconnect_full_crc enabled the checksum
 * of the finished packet is filled in as well.
 */
static inline void
prepareXmit(MotionConn *conn)
{
    Assert(conn != NULL);

    /* finalize the header template for this packet */
    conn->conn_info.crc = 0;
    conn->conn_info.len = conn->msgSize;

    memcpy(conn->pBuff, &conn->conn_info, sizeof(conn->conn_info));

    /* increase the sequence no */
    conn->conn_info.seq += 1;

    if (gp_interconnect_full_crc)
        addCRC((icpkthdr *) conn->pBuff);
}
/*
 * sendtoWithRetry
 *      sendto() wrapper that retries when the call is interrupted.
 *
 * socket, message, length, flags, dest_addr, dest_len: passed to sendto().
 * retry: if positive, the maximum number of sendto() attempts; otherwise the
 *        call is retried without limit.  Only EINTR triggers a retry.
 * errDetail: extra text appended to the error/log detail.
 *
 * Returns the sendto() result.  A negative result is returned (instead of
 * raising an error) for EAGAIN, EFAULT and EPERM, and when the retry limit
 * is exhausted; EPERM is additionally logged.  Any other failure raises
 * ERROR.
 */
static ssize_t
sendtoWithRetry(int socket, const void *message, size_t length,
                int flags, const struct sockaddr *dest_addr,
                socklen_t dest_len, int retry, const char *errDetail)
{
    int32       n;
    int         count = 0;

    for (;;)
    {
        int         save_errno;

        /*
         * If given retry count is positive, retry up to the limited times.
         * Otherwise, retry for unlimited times until succeed.
         */
        if (retry > 0 && ++count > retry)
            return n;

        n = sendto(socket, message, length, flags, dest_addr, dest_len);
        if (n >= 0)
            return n;

        save_errno = errno;

        /* interrupted: try again */
        if (save_errno == EINTR)
            continue;

        /*
         * EAGAIN: no space ? not an error.
         *
         * EFAULT: In Linux system call, it only happens when copying a socket
         * address into kernel space failed, which is less likely to happen,
         * but mocked heavily by our fault injection in regression tests.
         */
        if (save_errno == EAGAIN || save_errno == EFAULT)
            return n;

        /*
         * If Linux iptables (nf_conntrack?) drops an outgoing packet, it may
         * return an EPERM to the application. This might be simply because of
         * traffic shaping or congestion, so ignore it.
         */
        if (save_errno == EPERM)
        {
            ereport(LOG,
                    (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                     errmsg("Interconnect error writing an outgoing packet: %m"),
                     errdetail("error during sendto() %s", errDetail)));
            return n;
        }

        ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                        errmsg("Interconnect error writing an outgoing packet: %m"),
                        errdetail("error during sendto() call (error:%d).\n"
                                  "%s", save_errno, errDetail)));
        /* not reached */
        return n;
    }
}
/*
 * sendOnce
 *      Transmit one buffer to the peer of a connection.
 *
 * transportStates: not used by this function.
 * pEntry: supplies the transmit socket (txfd).
 * buf: buffer whose packet is sent.
 * conn: destination connection (peer address and description for messages).
 *
 * In assert-enabled builds the packet may be dropped on purpose by the fault
 * injector.  A short or failed transmit is only logged here.
 */
static void
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, ICBuffer *buf, MotionConn *conn)
{
    char        errDetail[100];
    int32       sent;

#ifdef USE_ASSERT_CHECKING
    /* test hook: pretend the packet was lost on the wire */
    if (testmode_inject_fault(gp_udpic_dropxmit_percent))
    {
#ifdef AMS_VERBOSE_LOGGING
        write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
#endif
        return;
    }
#endif

    snprintf(errDetail, sizeof(errDetail), "For Remote Connection: contentId=%d at %s",
             conn->remoteContentId,
             conn->remoteHostAndPort);

    sent = sendtoWithRetry(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
                           (struct sockaddr *) &conn->peer, conn->peer_len, -1, errDetail);

    /* the whole packet went out: nothing more to do */
    if (sent == buf->pkt->len)
        return;

    if (DEBUG1 >= log_min_messages)
        write_log("Interconnect error writing an outgoing packet [seq %d]: short transmit (given %d sent %d) during sendto() call."
                  "For Remote Connection: contentId=%d at %s", buf->pkt->seq, buf->pkt->len, sent,
                  conn->remoteContentId,
                  conn->remoteHostAndPort);
#ifdef AMS_VERBOSE_LOGGING
    logPkt("PKT DETAILS ", buf->pkt);
#endif
}
/*
* handleStopMsgs
* handle stop messages.
*
* Walks all connections of the entry. Every connection that is still active
* and has a stop request pending gets an EOS packet prepared in its current
* buffer, has its send and unack queues returned, and is marked inactive.
* After the last connection has been visited, pending acks are polled; if
* handling them reports a new stop request the walk restarts from the first
* connection.
*
* transportStates: interconnect state, passed to pollAcks()/handleAcks().
* pEntry: transport entry whose connections are processed.
* motionId: motion node id, used for logging only.
*/
static void
handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, int16 motionId)
{
int i = 0;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "handleStopMsgs: node %d", motionId);
#endif
while (i < pEntry->numConns)
{
MotionConn *conn = NULL;
conn = pEntry->conns + i;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "handleStopMsgs: node %d route %d %s %s", motionId, conn->route,
(conn->stillActive ? "active" : "NOT active"), (conn->stopRequested ? "stop requested" : ""));
elog(DEBUG3, "handleStopMsgs: node %d route %d msgSize %d", motionId, conn->route, conn->msgSize);
#endif
/*
* MPP-2427: we're guaranteed to have recently flushed, but this might
* not be empty (if we got a stop on a buffer that wasn't the one we
* were sending) ... empty it first so the outbound buffer is empty
* when we get here.
*/
if (conn->stillActive && conn->stopRequested)
{
/* mark buffer empty */
conn->tupleCount = 0;
conn->msgSize = sizeof(conn->conn_info);
/* now send our stop-ack EOS */
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
Assert(conn->curBuff != NULL);
/* one payload byte ('S') directly after the header */
conn->pBuff[conn->msgSize] = 'S';
conn->msgSize += 1;
prepareXmit(conn);
/* now ready to actually send */
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "handleStopMsgs: node %d route %d, seq %d", motionId, i, conn->conn_info.seq);
/* place it into the send queue */
icBufferListAppend(&conn->sndQueue, conn->curBuff);
/* return all buffers */
/*
* NOTE(review): the EOS buffer appended just above is part of sndQueue
* when the queue is returned here -- confirm what icBufferListReturn()
* does with it.
*/
icBufferListReturn(&conn->sndQueue, false);
icBufferListReturn(&conn->unackQueue, Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY ? false : true);
/* reset the connection to an empty, finished state */
conn->tupleCount = 0;
conn->msgSize = sizeof(conn->conn_info);
conn->state = mcsEosSent;
conn->curBuff = NULL;
conn->pBuff = NULL;
conn->stillActive = false;
conn->stopRequested = false;
}
i++;
/* after the last connection, look for acks that arrived in the meantime */
if (i == pEntry->numConns)
{
/* NOTE(review): the third argument is presumably a zero timeout -- confirm. */
if (pollAcks(transportStates, pEntry->txfd, 0))
{
if (handleAcks(transportStates, pEntry))
{
/* more stops found, loop again. */
i = 0;
continue;
}
}
}
}
}
/*
* sendBuffers
* Called by sender to send the buffers in the send queue.
*
* Send the buffers in the send queue of the connection if there is capacity left
* and the congestion control condition is satisfied.
*
* Here, we make sure that a connection can have at least one outstanding buffer.
* This is very important for two reasons:
*
* 1) The handling logic of the ack of the outstanding buffer can always send a buffer
* in the send queue. Otherwise, there may be a deadlock.
* 2) This makes sure that any connection can have a minimum bandwidth for data
* sending.
*
* After sending a buffer, the buffer will be placed into both the unack queue and
* the corresponding queue in the unack queue ring.
*
* transportStates: interconnect state, passed on to sendOnce().
* pEntry: transport entry, passed on to sendOnce().
* conn: connection whose send queue is drained.
*/
static void
sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
while (conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
{
ICBuffer *buf = NULL;
/*
* Loss-based flow control: the congestion window only limits a connection
* that already has something outstanding, so the first outstanding buffer
* of a connection is always allowed.
*/
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS &&
(icBufferListLength(&conn->unackQueue) > 0 &&
unack_queue_ring.numSharedOutStanding >= (snd_control_info.cwnd - snd_control_info.minCwnd)))
break;
/* for connection setup, we only allow one outstanding packet. */
if (conn->state == mcsSetupOutgoingConnection && icBufferListLength(&conn->unackQueue) >= 1)
break;
buf = icBufferListPop(&conn->sndQueue);
uint64 now = getCurrentTime();
/* initialize the per-transmission bookkeeping of the buffer */
buf->sentTime = now;
buf->unackQueueRingSlot = -1;
buf->nRetry = 0;
buf->conn = conn;
conn->capacity--;
icBufferListAppend(&conn->unackQueue, buf);
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
{
unack_queue_ring.numOutStanding++;
/* only buffers beyond a connection's first outstanding one count as shared */
if (icBufferListLength(&conn->unackQueue) > 1)
unack_queue_ring.numSharedOutStanding++;
putIntoUnackQueueRing(&unack_queue_ring,
buf,
computeExpirationPeriod(buf->conn, buf->nRetry),
now);
}
/*
* Note the place of sendOnce here. If we send before appending it to
* the unack queue and putting it into unack queue ring, and there is
* a network error occurred in the sendOnce function, error message
* will be output. In the time of error message output, interrupts is
* potentially checked, if there is a pending query cancel, it will
* lead to a dangled buffer (memory leak).
*/
#ifdef TRANSFER_PROTOCOL_STATS
updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt);
#endif
sendOnce(transportStates, pEntry, buf, conn);
ic_statistics.sndPktNum++;
#ifdef AMS_VERBOSE_LOGGING
logPkt("SEND PKT DETAIL", buf->pkt);
#endif
/* remember the sequence number of the latest packet handed to the network */
buf->conn->sentSeq = buf->pkt->seq;
}
}
/*
 * handleDisorderPacket
 *      Called by rx thread to assemble and send a disorder message.
 *
 * In current implementation, we limit the number of lost packet sequence numbers
 * in the disorder message by the MIN_PACKET_SIZE. There are two reasons here:
 *
 * 1) The maximal number of lost packet sequence numbers are actually bounded by the
 *    receive queue depth whose maximal value is very large. Since we share the packet
 *    receive and ack receive in the background thread, the size of disorder should be
 *    also limited by the max packet size.
 * 2) We can use Gp_max_packet_size here to limit the number of lost packet sequence numbers.
 *    But considering we do not want to let senders send many packets when getting a lost
 *    message. Here we use MIN_PACKET_SIZE.
 *
 *
 * the format of a disorder message:
 * I) pkt header
 *  - seq      -> packet sequence number that triggers the disorder message
 *  - extraSeq -> the largest seq of the received packets
 *  - flags    -> UDPIC_FLAGS_DISORDER
 *  - len      -> sizeof(icpkthdr) + sizeof(uint32) * (lost pkt count)
 * II) content
 *  - an array of lost pkt sequence numbers (uint32)
 *
 * conn: receiving connection whose packet queue is inspected.
 * pos: queue position at which the scan stops (exclusive).
 * tailSeq: sequence number corresponding to the queue tail slot.
 * pkt: the packet that triggered the disorder message.
 *
 * The queue is walked from pkt_q_tail up to pos; every empty slot contributes
 * its sequence number to the list placed right after the header in
 * rx_control_info.disorderBuffer, up to MAX_SEQS_IN_DISORDER_ACK entries.
 */
static void
handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt)
{
    int         start = 0;
    uint32      lostPktCnt = 0;
    uint32     *curSeq = (uint32 *) &rx_control_info.disorderBuffer[1];
    uint32      maxSeqs = MAX_SEQS_IN_DISORDER_ACK;

#ifdef AMS_VERBOSE_LOGGING
    write_log("PROCESS_DISORDER PKT BEGIN:");
#endif

    start = conn->pkt_q_tail;

    while (start != pos && lostPktCnt < maxSeqs)
    {
        if (conn->pkt_q[start] == NULL)
        {
            /* empty slot: this sequence number has not been received yet */
            *curSeq = tailSeq;
            lostPktCnt++;

#ifdef AMS_VERBOSE_LOGGING
            /*
             * Log before advancing the cursor, so the entry that was just
             * recorded is printed rather than the next, still unwritten,
             * slot.
             */
            write_log("PROCESS_DISORDER add seq [%d], lostPktCnt %d", *curSeq, lostPktCnt);
#endif

            curSeq++;
        }

        tailSeq++;
        start = (start + 1) % conn->pkt_q_capacity;
    }

#ifdef AMS_VERBOSE_LOGGING
    write_log("PROCESS_DISORDER PKT END:");
#endif

    /* when reaching here, cnt must not be 0 */
    sendDisorderAck(conn, pkt->seq, conn->conn_info.seq - 1, lostPktCnt);
}
/*
 * handleAckForDisorderPkt
 *		Called by sender to deal with acks for disorder packet.
 *
 * The message body, located right after the icpkthdr, is an array of uint32
 * sequence numbers the receiver reported as lost; pkt->seq is the sequence
 * number of the packet that triggered the message.
 *
 * The unack queue of 'conn' is walked from the front:
 *	- a buffer whose seq equals the current lost seq is retransmitted;
 *	- a buffer whose seq is below the current lost seq is handed to
 *	  handleAckedPacket();
 *	- the buffer whose seq equals pkt->seq is handed to handleAckedPacket()
 *	  and ends the walk.
 *
 * The function only acts on the third consecutive message carrying the same
 * extraSeq; all other messages return false immediately.  The counters used
 * for this are function-level statics, i.e. they are not kept per connection.
 *
 * Returns true if at least one buffer was passed to handleAckedPacket().
 */
static bool
handleAckForDisorderPkt(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry,
MotionConn *conn,
icpkthdr *pkt)
{
ICBufferLink *link = NULL;
ICBuffer *buf = NULL;
ICBufferLink *next = NULL;
uint64 now = getCurrentTime();
uint32 *curLostPktSeq = 0;
int lostPktCnt = 0;
/* number of repeats seen for lastSeq, and the extraSeq being counted */
static uint32 times = 0;
static uint32 lastSeq = 0;
bool shouldSendBuffers = false;
if (pkt->extraSeq != lastSeq)
{
/* first message with this extraSeq: start counting, do nothing yet */
lastSeq = pkt->extraSeq;
times = 0;
return false;
}
else
{
/* act only when the counter reaches exactly 2 */
times++;
if (times != 2)
return false;
}
/* the lost-sequence array follows the header; its length is derived from pkt->len */
curLostPktSeq = (uint32 *) &pkt[1];
lostPktCnt = (pkt->len - sizeof(icpkthdr)) / sizeof(uint32);
/*
 * Resend all the missed packets and remove received packets from queues
 */
link = icBufferListFirst(&conn->unackQueue);
buf = GET_ICBUFFER_FROM_PRIMARY(link);
#ifdef AMS_VERBOSE_LOGGING
write_log("DISORDER: pktlen %d cnt %d pktseq %d first loss %d buf %p",
pkt->len, lostPktCnt, pkt->seq, *curLostPktSeq, buf);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
icBufferListLog(&conn->unackQueue);
icBufferListLog(&conn->sndQueue);
}
#endif
/*
 * iterate the unack queue
 *
 * The walk stops at the end of the list, at the first buffer newer than
 * pkt->seq, or once every reported lost sequence has been consumed.
 */
while (!icBufferListIsHead(&conn->unackQueue, link) && buf->pkt->seq <= pkt->seq && lostPktCnt > 0)
{
#ifdef AMS_VERBOSE_LOGGING
write_log("DISORDER: bufseq %d curlostpkt %d cnt %d buf %p pkt->seq %d",
buf->pkt->seq, *curLostPktSeq, lostPktCnt, buf, pkt->seq);
#endif
if (buf->pkt->seq == pkt->seq)
{
/* the packet that triggered the message was received: ack it and stop */
handleAckedPacket(conn, buf, now);
shouldSendBuffers = true;
break;
}
if (buf->pkt->seq == *curLostPktSeq)
{
/* this is a lost packet, retransmit */
buf->nRetry++;
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
{
/* move the buffer to the ring slot matching its new expiration period */
buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
putIntoUnackQueueRing(&unack_queue_ring, buf,
computeExpirationPeriod(buf->conn, buf->nRetry), now);
}
#ifdef TRANSFER_PROTOCOL_STATS
updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt);
#endif
sendOnce(transportStates, pEntry, buf, buf->conn);
#ifdef AMS_VERBOSE_LOGGING
write_log("RESEND a buffer for DISORDER: seq %d", buf->pkt->seq);
logPkt("DISORDER RESEND DETAIL ", buf->pkt);
#endif
ic_statistics.retransmits++;
/* advance both the lost-sequence cursor and the queue cursor */
curLostPktSeq++;
lostPktCnt--;
link = link->next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
}
else if (buf->pkt->seq < *curLostPktSeq)
{
/* remove packet already received. */
/* fetch the successor first; it is needed after handleAckedPacket() */
next = link->next;
handleAckedPacket(conn, buf, now);
shouldSendBuffers = true;
link = next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
}
else /* buf->pkt->seq > *curPktSeq */
{
/*
 * this case is introduced when the disorder message tell you a
 * pkt is lost. But when we handle this message, a message (for
 * example, duplicate ack, or another disorder message) arriving
 * before this message already removed the pkt.
 */
curLostPktSeq++;
lostPktCnt--;
}
}
/*
 * For loss based flow control, set ssthresh to half of cwnd (but not below
 * minCwnd) and set cwnd to that same value.
 */
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
{
snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, snd_control_info.minCwnd);
snd_control_info.cwnd = snd_control_info.ssthresh;
}
#ifdef AMS_VERBOSE_LOGGING
write_log("After DISORDER: sndQ %d unackQ %d",
icBufferListLength(&conn->sndQueue), icBufferListLength(&conn->unackQueue));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
icBufferListLog(&conn->unackQueue);
icBufferListLog(&conn->sndQueue);
}
#endif
return shouldSendBuffers;
}
/*
 * handleAckForDuplicatePkt
 *		Sender-side processing of a DUPLICATE notification.
 *
 * Two ranges of the unack queue are released through handleAckedPacket():
 *	1. every leading buffer whose seq is <= pkt->extraSeq;
 *	2. the single buffer whose seq equals pkt->seq, if it is still queued.
 *
 * A message with pkt->seq <= pkt->extraSeq is considered invalid: it is
 * logged and ignored.
 *
 * Returns true if at least one buffer was released.
 */
static bool
handleAckForDuplicatePkt(MotionConn *conn, icpkthdr *pkt)
{
	ICBufferLink *cursor = NULL;
	ICBufferLink *following = NULL;
	ICBuffer   *queued = NULL;
	uint64		now = getCurrentTime();
	bool		released = false;

#ifdef AMS_VERBOSE_LOGGING
	write_log("RESEND the unacked buffers in the queue due to %s", pkt->len == 0 ? "PROCESS_START_RACE" : "DISORDER");
#endif

	/* Indicate a bug here. */
	if (pkt->seq <= pkt->extraSeq)
	{
		write_log("ERROR: invalid duplicate message: seq %d extraSeq %d", pkt->seq, pkt->extraSeq);
		return false;
	}

	cursor = icBufferListFirst(&conn->unackQueue);
	queued = GET_ICBUFFER_FROM_PRIMARY(cursor);

	/* Phase 1: the contiguous prefix covered by extraSeq. */
	for (;;)
	{
		if (icBufferListIsHead(&conn->unackQueue, cursor))
			break;
		if (queued->pkt->seq > pkt->extraSeq)
			break;

		/* remember the successor before the buffer is processed */
		following = cursor->next;
		handleAckedPacket(conn, queued, now);
		released = true;

		cursor = following;
		queued = GET_ICBUFFER_FROM_PRIMARY(cursor);
	}

	/* Phase 2: look for the one buffer matching the duplicate's seq. */
	for (;;)
	{
		if (icBufferListIsHead(&conn->unackQueue, cursor))
			break;
		if (queued->pkt->seq > pkt->seq)
			break;

		following = cursor->next;

		if (queued->pkt->seq == pkt->seq)
		{
			handleAckedPacket(conn, queued, now);
			released = true;
			break;
		}

		cursor = following;
		queued = GET_ICBUFFER_FROM_PRIMARY(cursor);
	}

	return released;
}
/*
 * checkNetworkTimeout
 *		Check the network timeout case for a buffer that is being resent.
 *
 * buf - the resent buffer; its nRetry and sentTime fields are examined
 * now - current time in microseconds, compared against buf->sentTime
 * networkTimeoutIsLogged - in/out flag; set to true once the WARNING below
 *		has been emitted so that it is not repeated
 *
 * Raises ERROR when the buffer has been retried more than
 * Gp_interconnect_min_retries_before_timeout times and more than
 * Gp_interconnect_transmit_timeout seconds have passed since sentTime.
 */
static inline void
checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
{
	/*
	 * Using only the time since the packet was first sent to decide on a
	 * timeout is not enough, since there is a possibility the sender process
	 * is not scheduled or is blocked by the OS for a long time. In that case
	 * only a few retries were attempted. Thus, the GUC
	 * Gp_interconnect_min_retries_before_timeout is taken into account as
	 * well.
	 */
	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG &&
		buf->nRetry % Gp_interconnect_debug_retry_interval == 0)
	{
		/*
		 * The elapsed time is a uint64; print it through an explicit
		 * unsigned long long cast so the format matches the argument on
		 * platforms where uint64 is not "unsigned long".
		 */
		ereport(LOG,
				(errmsg("resending packet (seq %d) to %s (pid %d cid %d) with %d retries in %llu seconds",
						buf->pkt->seq, buf->conn->remoteHostAndPort,
						buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry,
						(unsigned long long) ((now - buf->sentTime) / 1000 / 1000))));
	}

	if ((buf->nRetry > Gp_interconnect_min_retries_before_timeout) &&
		(now - buf->sentTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect encountered a network error, please check your network"),
				 errdetail("Failed to send packet (seq %u) to %s (pid %d cid %d) after %u retries in %d seconds.",
						   buf->pkt->seq, buf->conn->remoteHostAndPort,
						   buf->pkt->dstPid, buf->pkt->dstContentId,
						   buf->nRetry, Gp_interconnect_transmit_timeout)));
	}

	/*
	 * The default value of Gp_interconnect_transmit_timeout is one hour. It
	 * takes too long to detect a network error that way, which is not user
	 * friendly.
	 *
	 * Packets may be dropped repeatedly on some specific ports. We'd better
	 * have a warning message for this case and give the DBA a chance to
	 * detect the problem earlier. Since packets are also dropped when the
	 * network is merely bad, we should not error out here, but just give a
	 * warning message. Erroring out is still handled by the GUC
	 * Gp_interconnect_transmit_timeout as above. Note that the warning
	 * message should be printed only once for each statement.
	 */
	if ((buf->nRetry >= Gp_interconnect_min_retries_before_timeout) && !(*networkTimeoutIsLogged))
	{
		ereport(WARNING,
				(errmsg("interconnect may encountered a network error, please check your network"),
				 errdetail("Failing to send packet (seq %u) to %s (pid %d cid %d) after %u retries.",
						   buf->pkt->seq, buf->conn->remoteHostAndPort,
						   buf->pkt->dstPid, buf->pkt->dstContentId,
						   buf->nRetry)));
		*networkTimeoutIsLogged = true;
	}
}
/*
 * checkExpiration
 *		Resend every buffer found in an expired slot of the unack queue ring
 *		and re-file it in the ring under its new expiration period.
 *
 * The ring clock (unack_queue_ring.currentTime) is stepped forward one
 * TIMER_SPAN at a time while 'now' is at least one TIMER_SPAN ahead of it,
 * visiting at most UNACK_QUEUE_RING_SLOTS_NUM slots.  Afterwards the clock is
 * aligned to 'now', and if anything was resent the congestion window is
 * reduced.
 *
 * 'triggerConn' is not used in the body.  checkNetworkTimeout() may raise an
 * ERROR from inside the loop.
 */
static void
checkExpiration(ChunkTransportState *transportStates,
				ChunkTransportStateEntry *pEntry,
				MotionConn *triggerConn,
				uint64 now)
{
	int			visited;
	int			resent = 0;

	Assert(unack_queue_ring.currentTime != 0);

	for (visited = 0;
		 now >= (unack_queue_ring.currentTime + TIMER_SPAN) && visited < UNACK_QUEUE_RING_SLOTS_NUM;
		 visited++)
	{
		/* drain the current slot: everything in it has expired */
		for (;;)
		{
			ICBuffer   *expired = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx]);

			if (expired == NULL)
				break;

			expired->nRetry++;
			putIntoUnackQueueRing(&unack_queue_ring,
								  expired,
								  computeExpirationPeriod(expired->conn, expired->nRetry),
								  now);

#ifdef TRANSFER_PROTOCOL_STATS
			updateStats(TPE_DATA_PKT_SEND, expired->conn, expired->pkt);
#endif

			sendOnce(transportStates, pEntry, expired, expired->conn);

			/* bookkeeping for this retransmission */
			resent++;
			ic_statistics.retransmits++;
			expired->conn->stat_count_resent++;
			expired->conn->stat_max_resent = Max(expired->conn->stat_max_resent,
												 expired->conn->stat_count_resent);

			checkNetworkTimeout(expired, now, &transportStates->networkTimeoutIsLogged);

#ifdef AMS_VERBOSE_LOGGING
			write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d",
					  expired->pkt->seq, expired->nRetry, expired->conn->rtt, expired->conn->route);
			logPkt("RESEND PKT in checkExpiration", expired->pkt);
#endif
		}

		/* step the ring clock and move on to the next slot */
		unack_queue_ring.currentTime += TIMER_SPAN;
		unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM);
	}

	/*
	 * The function may not have been called for a long time; align the ring
	 * clock with 'now' regardless of how far the loop above got.
	 */
	unack_queue_ring.currentTime = now - (now % TIMER_SPAN);

	if (resent > 0)
	{
		snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, snd_control_info.minCwnd);
		snd_control_info.cwnd = snd_control_info.minCwnd;
	}
}
/*
 * checkDeadlock
 *		Check whether deadlock occurs on a connection.
 *
 * When a connection has nothing left in its unack queue, no capacity, and
 * still has buffers waiting in its send queue, a status query message is sent
 * to the peer once the connection has gone without progress for longer than
 * the deadlock check period.  This avoids a potential deadlock when acks are
 * lost continuously.  Packet resending does not help in that situation,
 * because the packets in the unack queue may already have been removed once
 * the sender learned that they were buffered in the receiver side queue.
 *
 * Some considerations on the deadlock check period:
 *
 * Potential deadlock occurs rarely.  According to our experiments on various
 * workloads and hardware, it occurred only when fault injection was enabled
 * and a large number of packets and acknowledgments were discarded.  Thus, a
 * relatively large deadlock check period is used here.
 *
 * Raises ERROR if no response arrived within
 * Gp_interconnect_transmit_timeout seconds.
 */
static void
checkDeadlock(ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	uint64		checkPeriod;
	uint64		now;

	/* Only a connection that is stuck with pending sends is of interest. */
	if (icBufferListLength(&conn->unackQueue) != 0 ||
		conn->capacity != 0 ||
		icBufferListLength(&conn->sndQueue) <= 0)
		return;

	/* we must have received some acks before deadlock occurs. */
	Assert(conn->deadlockCheckBeginTime > 0);

#ifdef USE_ASSERT_CHECKING
	if (udp_testmode)
		checkPeriod = 100000;
	else
#endif
		checkPeriod = DEADLOCK_CHECKING_TIME;

	now = getCurrentTime();

	/* Too early since the last status query sent by this process? */
	if ((now - ic_control_info.lastDeadlockCheckTime) <= checkPeriod)
		return;

	/* Too early since this connection started waiting? */
	if ((now - conn->deadlockCheckBeginTime) <= checkPeriod)
		return;

	/* request the capacity to avoid the deadlock case */
	sendStatusQueryMessage(conn, pEntry->txfd, conn->conn_info.seq - 1);
	ic_control_info.lastDeadlockCheckTime = now;
	ic_statistics.statusQueryMsgNum++;

	/* check network error. */
	if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect encountered a network error, please check your network"),
				 errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.",
						   conn->remoteHostAndPort,
						   conn->conn_info.dstPid,
						   conn->conn_info.dstContentId,
						   Gp_interconnect_transmit_timeout)));
	}
}
/*
 * pollAcks
 *		Timeout polling of acks
 *
 * transportStates - only teardownActive is read, for the interrupt check
 * fd - socket to poll for readability
 * timeout - poll() timeout in milliseconds
 *
 * Returns true when poll() reports the socket as readable or in an error
 * state, false on timeout or when poll() was interrupted by a signal.
 * Raises ERROR on any other poll() failure.
 */
static inline bool
pollAcks(ChunkTransportState *transportStates, int fd, int timeout)
{
	struct pollfd nfd;
	int			n;

	nfd.fd = fd;
	nfd.events = POLLIN;
	nfd.revents = 0;

	n = poll(&nfd, 1, timeout);
	if (n < 0)
	{
		/*
		 * Capture errno right away: the interrupt check below may run
		 * arbitrary code and overwrite it before we look at it.
		 */
		int			save_errno = errno;

		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
		if (save_errno == EINTR)
			return false;

		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error waiting for peer ack"),
				 errdetail("During poll() call.")));
		/* not reached */
	}

	if (n == 0)					/* timeout */
		return false;

	/*
	 * got an ack to handle (possibly a stop message)
	 *
	 * The result of poll() is reported in revents; events is only the
	 * request mask and is always POLLIN here.  A pending socket error is
	 * reported as ready too, so that the caller's receive attempt can observe
	 * it instead of poll() returning the same condition over and over.
	 */
	if (n == 1 && (nfd.revents & (POLLIN | POLLERR)))
		return true;

	return false;
}
/*
 * updateRetransmitStatistics
 *		Account for one retransmission: bump the global retransmit counter
 *		and the connection's resent counter, and raise the connection's
 *		recorded maximum if the resent counter now exceeds it.
 */
static inline void
updateRetransmitStatistics(MotionConn *conn)
{
	ic_statistics.retransmits += 1;
	conn->stat_count_resent += 1;

	if (conn->stat_count_resent > conn->stat_max_resent)
		conn->stat_max_resent = conn->stat_count_resent;
}
/*
 * checkExpirationCapacityFC
 *		Check expiration for capacity based flow control method.
 *
 * If the connection has unacked buffers and at least 'timeout' milliseconds
 * have passed since ic_control_info.lastPacketSendTime, the first buffer of
 * the unack queue is sent again and the retransmit statistics and network
 * timeout checks are applied to it.
 */
static void
checkExpirationCapacityFC(ChunkTransportState *transportStates,
						  ChunkTransportStateEntry *pEntry,
						  MotionConn *conn,
						  int timeout)
{
	uint64		now;
	uint64		sinceLastSend;
	ICBufferLink *firstLink;
	ICBuffer   *firstBuf;

	/* nothing outstanding, nothing to resend */
	if (icBufferListLength(&conn->unackQueue) == 0)
		return;

	now = getCurrentTime();
	sinceLastSend = now - ic_control_info.lastPacketSendTime;

	/* 'timeout' is in milliseconds, timestamps are in microseconds */
	if (sinceLastSend < ((uint64) timeout * 1000))
		return;

	firstLink = icBufferListFirst(&conn->unackQueue);
	firstBuf = GET_ICBUFFER_FROM_PRIMARY(firstLink);

	sendOnce(transportStates, pEntry, firstBuf, firstBuf->conn);
	firstBuf->nRetry++;
	ic_control_info.lastPacketSendTime = now;

	updateRetransmitStatistics(conn);
	checkNetworkTimeout(firstBuf, now, &transportStates->networkTimeoutIsLogged);
}
/*
 * checkExceptions
 *		Check exceptions including packet expiration, deadlock, bg thread
 *		error, NIC failure...
 *
 * 'retry' is the caller's iteration counter.  Callers should start it at 0 so
 * that the expensive deadlock and QD connection checks are skipped while the
 * connection is healthy: the deadlock check runs when (retry & 0x3) == 2 and
 * the QD/postmaster check when (retry & 0x3f) == 2.
 *
 * 'timeout' is the poll timeout in milliseconds that the caller just used; it
 * is only consulted for capacity based flow control.
 */
static void
checkExceptions(ChunkTransportState *transportStates,
				ChunkTransportStateEntry *pEntry,
				MotionConn *conn,
				int retry,
				int timeout)
{
	/* Packet expiration, depending on the flow control method in use. */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
		checkExpirationCapacityFC(transportStates, pEntry, conn, timeout);

	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
	{
		uint64		now = getCurrentTime();

		if (now - ic_control_info.lastExpirationCheckTime > TIMER_CHECKING_PERIOD)
		{
			checkExpiration(transportStates, pEntry, conn, now);
			ic_control_info.lastExpirationCheckTime = now;
		}
	}

	/* Every fourth iteration, starting with iteration 2. */
	if ((retry & 0x3) == 2)
	{
		checkDeadlock(pEntry, conn);
		checkRxThreadError();
		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
	}

	/*
	 * 1. NIC on master (and thus the QD connection) may become bad, check it.
	 * 2. Postmaster may become invalid, check it
	 *
	 * The comparison value is 2 here as well, so that on the initial
	 * iterations this check coincides with the deadlock check above.
	 */
	if ((retry & 0x3f) != 2)
		return;

	checkQDConnectionAlive();

	if (PostmasterIsAlive())
		return;

	ereport(FATAL,
			(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			 errmsg("interconnect failed to send chunks"),
			 errdetail("Postmaster is not alive.")));
}
/*
 * computeTimeout
 *		Compute timeout value in ms.
 *
 * With an empty unack queue the timer checking period is used.  Otherwise the
 * first unacked buffer decides: if it has never been retried and this is the
 * caller's first attempt (retry == 0) the result is 0; for loss based flow
 * control the timer checking period is used; for capacity based flow control
 * the value comes from TIMEOUT() applied to the buffer's retry count.
 */
static inline int
computeTimeout(MotionConn *conn, int retry)
{
	ICBufferLink *firstLink;
	ICBuffer   *firstBuf;

	if (icBufferListLength(&conn->unackQueue) == 0)
		return TIMER_CHECKING_PERIOD;

	firstLink = icBufferListFirst(&conn->unackQueue);
	firstBuf = GET_ICBUFFER_FROM_PRIMARY(firstLink);

	if (retry == 0 && firstBuf->nRetry == 0)
		return 0;

	if (Gp_interconnect_fc_method != INTERCONNECT_FC_METHOD_LOSS)
	{
		/* for capacity based flow control */
		return TIMEOUT(firstBuf->nRetry);
	}

	return TIMER_CHECKING_PERIOD;
}
/*
 * SendChunkUDPIFC
 *		is used to send a tcItem to a single destination. Tuples often are
 *		*very small* we aggregate in our local buffer before sending into the kernel.
 *
 * If the chunk still fits into the connection's current packet buffer
 * (conn->msgSize + chunk length <= Gp_max_packet_size) it is only copied
 * there.  Otherwise the current buffer is appended to the send queue and
 * sendBuffers() is called, then the function waits for a new send buffer
 * while polling for acks and checking for exceptions, and finally starts a
 * new packet containing this chunk.
 *
 * PARAMETERS
 *	 transportStates - interconnect context.
 *	 pEntry - transport state entry; its txfd is polled for acks.
 *	 conn - MotionConn that the tcItem is to be sent to.
 *	 tcItem - message to be sent.
 *	 motionId - Node Motion Id.
 *
 * Every return path returns true.
 */
static bool
SendChunkUDPIFC(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry,
MotionConn *conn,
TupleChunkListItem tcItem,
int16 motionId)
{
int length = tcItem->chunk_length;
int retry = 0;
bool doCheckExpiration = false;
bool gotStops = false;
Assert(conn->msgSize > 0);
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "sendChunk: msgSize %d this chunk length %d conn seq %d",
conn->msgSize, tcItem->chunk_length, conn->conn_info.seq);
#endif
/* fast path: the chunk fits into the packet currently being assembled */
if (conn->msgSize + length <= Gp_max_packet_size)
{
memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
conn->msgSize += length;
conn->tupleCount++;
return true;
}
/* prepare this for transmit */
ic_statistics.totalCapacity += conn->capacity;
ic_statistics.capacityCountingTime++;
/* try to send it */
prepareXmit(conn);
icBufferListAppend(&conn->sndQueue, conn->curBuff);
sendBuffers(transportStates, pEntry, conn);
uint64 now = getCurrentTime();
/*
 * For loss based flow control, force one expiration check (with a zero poll
 * timeout) if none has run for more than MAX_TIME_NO_TIMER_CHECKING.
 */
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
doCheckExpiration = false;
else
doCheckExpiration = (now - ic_control_info.lastExpirationCheckTime) > MAX_TIME_NO_TIMER_CHECKING ? true : false;
/* get a new buffer */
conn->curBuff = NULL;
conn->pBuff = NULL;
/*
 * With lastPacketSendTime at 0, the elapsed time computed by
 * checkExpirationCapacityFC() is measured from time zero.
 */
ic_control_info.lastPacketSendTime = 0;
conn->deadlockCheckBeginTime = now;
/*
 * Loop until a send buffer is obtained.  When doCheckExpiration is set, the
 * first iteration runs unconditionally (getSndBuffer() is not even tried)
 * and the flag is cleared at the end of that iteration.
 */
while (doCheckExpiration || (conn->curBuff = getSndBuffer(conn)) == NULL)
{
int timeout = (doCheckExpiration ? 0 : computeTimeout(conn, retry));
if (pollAcks(transportStates, pEntry->txfd, timeout))
{
if (handleAcks(transportStates, pEntry))
{
/*
 * We make sure that we deal with the stop messages only after
 * we get a buffer. Otherwise, if the stop message is not for
 * this connection, this will lead to an error for the
 * following data sending of this connection.
 */
gotStops = true;
}
}
checkExceptions(transportStates, pEntry, conn, retry++, timeout);
doCheckExpiration = false;
}
conn->pBuff = (uint8 *) conn->curBuff->pkt;
if (gotStops)
{
/* handling stop message will make some connection not active anymore */
handleStopMsgs(transportStates, pEntry, motionId);
gotStops = false;
/* this connection was stopped: the chunk is not copied into the new buffer */
if (!conn->stillActive)
return true;
}
/* reinitialize connection */
conn->tupleCount = 0;
conn->msgSize = sizeof(conn->conn_info);
/* now we can copy the input to the new buffer */
memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
conn->msgSize += length;
conn->tupleCount++;
return true;
}
/*
 * SendEosUDPIFC
 *		broadcast eos messages to receivers.
 *
 * Works in two phases:
 *	1. the EOS chunk is added to every outgoing buffer via doBroadcast(), and
 *	   for each still-active connection the buffer is flagged with
 *	   UDPIC_FLAGS_EOS, queued and sent;
 *	2. each still-active connection is waited on until both its unack queue
 *	   and its send queue are empty, at which point it is marked mcsEosSent
 *	   and inactive.  A connection is given at most MAX_TRY iterations per
 *	   pass; the outer loop repeats until no connection is left waiting.
 *
 * Raises FATAL if transportStates is NULL, or if it is neither activated nor
 * in teardown.
 *
 * See ml_ipc.h
 *
 */
static void
SendEosUDPIFC(ChunkTransportState *transportStates,
int motNodeID,
TupleChunkListItem tcItem)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn;
int i = 0;
int retry = 0;
int activeCount = 0;
int timeout = 0;
if (!transportStates)
{
elog(FATAL, "SendEosUDPIFC: missing interconnect context.");
}
else if (!transportStates->activated && !transportStates->teardownActive)
{
elog(FATAL, "SendEosUDPIFC: context and teardown inactive.");
}
#ifdef AMS_VERBOSE_LOGGING
elog(LOG, "entering seneosudp");
#endif
/* check em' */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
getChunkTransportState(transportStates, motNodeID, &pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect seg%d slice%d sending end-of-stream to slice%d",
GpIdentity.segindex, motNodeID, pEntry->recvSlice->sliceIndex);
/*
 * we want to add our tcItem onto each of the outgoing buffers -- this is
 * guaranteed to leave things in a state where a flush is *required*.
 */
doBroadcast(transportStates, pEntry, tcItem, NULL);
pEntry->sendingEos = true;
/* one timestamp is shared by all connections as their deadlock check start */
uint64 now = getCurrentTime();
/* now flush all of the buffers. */
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->stillActive)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "sent eos to route %d tuplecount %d seq %d flags 0x%x stillActive %s icId %d %d",
conn->route, conn->tupleCount, conn->conn_info.seq,
conn->conn_info.flags, (conn->stillActive ? "true" : "false"),
conn->conn_info.icId, conn->msgSize);
/* prepare this for transmit */
/* sendingEos was set to true just above */
if (pEntry->sendingEos)
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
prepareXmit(conn);
/* place it into the send queue */
icBufferListAppend(&conn->sndQueue, conn->curBuff);
sendBuffers(transportStates, pEntry, conn);
/* reset the per-connection assembly state; the buffer now belongs to the queue */
conn->tupleCount = 0;
conn->msgSize = sizeof(conn->conn_info);
conn->curBuff = NULL;
conn->deadlockCheckBeginTime = now;
activeCount++;
}
}
/*
 * Now waiting for acks from receivers.
 *
 * Note here waiting is done in a separate phase from the EOS sending
 * phase to make the processing faster when a lot of connections are slow
 * and have frequent packet losses. In fault injection tests, we found
 * this.
 *
 */
while (activeCount > 0)
{
/* recount on every pass: only connections that are still waiting are counted */
activeCount = 0;
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->stillActive)
{
retry = 0;
ic_control_info.lastPacketSendTime = 0;
/* wait until this queue is emptied */
while (icBufferListLength(&conn->unackQueue) > 0 ||
icBufferListLength(&conn->sndQueue) > 0)
{
timeout = computeTimeout(conn, retry);
if (pollAcks(transportStates, pEntry->txfd, timeout))
handleAcks(transportStates, pEntry);
checkExceptions(transportStates, pEntry, conn, retry++, timeout);
/* give up on this connection for now and move on to the next one */
if (retry >= MAX_TRY)
break;
}
/* done when the connection has no cdbProc or both queues are drained */
if ((!conn->cdbProc) || (icBufferListLength(&conn->unackQueue) == 0 &&
icBufferListLength(&conn->sndQueue) == 0))
{
conn->state = mcsEosSent;
conn->stillActive = false;
}
else
activeCount++;
}
}
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "SendEosUDPIFC leaving, activeCount %d", activeCount);
}
/*
 * doSendStopMessageUDPIFC
 *		Send stop messages to all senders.
 *
 * Does nothing unless the interconnect context is activated.  All
 * connections of the motion node are visited while holding
 * ic_control_info.lock:
 *	- a connection that already has EOS flagged is simply marked inactive and
 *	  its receive queue is drained;
 *	- any other active connection is marked stopRequested and, if its peer
 *	  address is known, an ack carrying the STOP flag is sent to it.
 */
static void
doSendStopMessageUDPIFC(ChunkTransportState *transportStates, int16 motNodeID)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;
	int			route;

	if (!transportStates->activated)
		return;

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	Assert(pEntry);

	/*
	 * Note: we're only concerned with receivers here.
	 */
	pthread_mutex_lock(&ic_control_info.lock);

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "Interconnect needs no more input from slice%d; notifying senders to stop.",
			 motNodeID);

	for (route = 0; route < pEntry->numConns; route++)
	{
		conn = pEntry->conns + route;

		/*
		 * The stillActive flag of a connection may already have been set to
		 * false by markUDPConnInactiveIFC; such connections are skipped.
		 */
		if (!conn->stillActive)
			continue;

		if (conn->conn_info.flags & UDPIC_FLAGS_EOS)
		{
			/*
			 * A packet containing EOS is already queued and has been acked,
			 * so there is nothing to stop on this connection.
			 */
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "do sendstop: already have queued EOS packet, we're done. node %d route %d",
					 motNodeID, route);

			conn->stillActive = false;

			/* need to drop the queues in the teardown function. */
			while (conn->pkt_q_size > 0)
				putRxBufferAndSendAck(conn, NULL);

			continue;
		}

		conn->stopRequested = true;
		conn->conn_info.flags |= UDPIC_FLAGS_STOP;

#ifdef FAULT_INJECTOR
		if (FaultInjector_InjectFaultIfSet(
										   "interconnect_stop_ack_is_lost",
										   DDLNotSpecified,
										   "" /* databaseName */ ,
										   "" /* tableName */ ) == FaultInjectorTypeSkip)
		{
			continue;
		}
#endif

		/*
		 * The peer address of an incoming connection is not set until its
		 * first packet has arrived.  When the lower slice has no data to
		 * send, that never happens, and no ack is sent to such a connection.
		 */
		if (conn->peer.ss_family != AF_INET && conn->peer.ss_family != AF_INET6)
		{
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "first packet did not arrive yet. don't sent stop message. node %d route %d",
					 motNodeID, route);
			continue;
		}

		{
			uint32		seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;

			sendAck(conn, UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, seq, seq);

			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "sent stop message. node %d route %d seq %d", motNodeID, route, seq);
		}
	}

	pthread_mutex_unlock(&ic_control_info.lock);
}
/*
 * dispatcherAYT
 *		Check the connection from the dispatcher to verify that it is still there.
 *
 * The connection is a struct Port, stored in the global MyProcPort.  The
 * check is a non-blocking one-byte peek on the socket, so no data is
 * consumed.
 *
 * Return true if the dispatcher connection is still alive.
 */
static bool
dispatcherAYT(void)
{
	ssize_t		ret;
	char		buf;

	/*
	 * For background worker or auxiliary process like gdd, there is no
	 * client. As a result, MyProcPort is NULL. We should skip dispatcherAYT
	 * check here.
	 */
	if (MyProcPort == NULL)
		return true;

	if (MyProcPort->sock < 0)
		return false;

#ifndef WIN32
	ret = recv(MyProcPort->sock, &buf, 1, MSG_PEEK | MSG_DONTWAIT);
#else
	ret = recv(MyProcPort->sock, &buf, 1, MSG_PEEK | MSG_PARTIAL);
#endif

	if (ret == 0)				/* socket has been closed. EOF */
		return false;

	if (ret > 0)				/* data waiting on socket, it must be OK. */
		return true;

	if (ret == -1)				/* error, or would be block. */
	{
		/*
		 * "No data available right now" may be reported as either EAGAIN or
		 * EWOULDBLOCK, and an interrupted call (EINTR) says nothing about
		 * the state of the connection.  None of these mean the peer is gone.
		 */
		if (errno == EAGAIN ||
			errno == EWOULDBLOCK ||
			errno == EINTR ||
			errno == EINPROGRESS)
			return true;		/* connection intact, no data available */
		else
			return false;
	}

	/* not reached */
	return true;
}
/*
 * checkQDConnectionAlive
 *		Check whether QD connection is still alive. If not, report error.
 *
 * Only performed when running in the GP_ROLE_EXECUTE role; raises ERROR when
 * dispatcherAYT() reports the connection as gone.
 */
static void
checkQDConnectionAlive(void)
{
	/* other roles have no dispatcher connection to verify here */
	if (Gp_role != GP_ROLE_EXECUTE)
		return;

	if (dispatcherAYT())
		return;

	ereport(ERROR,
			(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			 errmsg("interconnect error segment lost contact with master (recv): %m")));
}
/*
* getCurrentTime
* get current time
*
*/
static uint64
getCurrentTime(void)
{
struct timeval newTime;
int status = 1;
uint64 t = 0;
#if HAVE_LIBRT
/* Use clock_gettime to return monotonic time value. */
struct timespec ts;
status = clock_gettime(CLOCK_MONOTONIC, &ts);
newTime.tv_sec = ts.tv_sec;
newTime.tv_usec = ts.tv_nsec / 1000;
#endif
if (status != 0)
gettimeofday(&newTime, NULL);
t = ((uint64) newTime.tv_sec) * USECS_PER_SECOND + newTime.tv_usec;
return t;
}
/*
 * putIntoUnackQueueRing
 *		Put the buffer into the ring.
 *
 * uqr - the ring to insert into
 * buf - the buffer to file; its unackQueueRingSlot is updated
 * expTime - expiration time from now
 * now - current time, in the same unit as uqr->currentTime
 *
 * The target slot is chosen relative to the ring's current slot from the
 * distance between the expiration time and the ring clock, clamped to the
 * range [TIMER_SPAN, UNACK_QUEUE_RING_LENGTH - 1].
 */
static void
putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now)
{
	uint64		diff = 0;
	int			idx = 0;

	/* The first packet, currentTime is not initialized */
	if (uqr->currentTime == 0)
		uqr->currentTime = now - (now % TIMER_SPAN);

	diff = now + expTime - uqr->currentTime;
	if (diff >= UNACK_QUEUE_RING_LENGTH)
	{
#ifdef AMS_VERBOSE_LOGGING
		write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
#endif
		/* too far in the future for the ring: use the farthest slot */
		diff = UNACK_QUEUE_RING_LENGTH - 1;
	}
	else if (diff < TIMER_SPAN)
	{
		/* never file into the current slot; use at least the next one */
		diff = TIMER_SPAN;
	}

	idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM;

#ifdef AMS_VERBOSE_LOGGING
	write_log("PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT " (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d, nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot, uqr->idx, idx);
#endif

	buf->unackQueueRingSlot = idx;

	/*
	 * Append to the ring that was passed in.  The slot index was computed
	 * from 'uqr', so the buffer must go into that same ring rather than
	 * unconditionally into the global one.
	 */
	icBufferListAppend(&uqr->slots[idx], buf);
}
/*
 * handleDataPacket
 *		Handling the data packet.
 *
 * conn - connection the packet belongs to
 * pkt - the received packet (header followed by data)
 * peer, peerlen - source address of the packet and its length
 * param - filled in through setAckSendParam() with the reply to send, if any
 * wakeup_mainthread - see below
 *
 * Returns true only when the packet was stored in the connection's receive
 * queue (in order or out of order).  Every other path -- status query,
 * unrequested stop, stop-requested or inactive connection, old or duplicate
 * packet, full queue -- returns false.
 * NOTE(review): presumably a false return lets the caller reuse the packet
 * buffer -- confirm at the call site.
 *
 * On return, will set *wakeup_mainthread, if a packet was received successfully
 * and the caller should wake up the main thread, after releasing the mutex.
 */
static bool
handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen,
AckSendParam *param, bool *wakeup_mainthread)
{
/*
 * A header-only packet with the CAPACITY flag is a status query: reply with
 * the connection's current sequence state and do not treat it as data.
 */
if ((pkt->len == sizeof(icpkthdr)) && (pkt->flags & UDPIC_FLAGS_CAPACITY))
{
if (DEBUG1 >= log_min_messages)
write_log("status queuy message received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d", pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);
#ifdef AMS_VERBOSE_LOGGING
logPkt("STATUS QUERY MESSAGE", pkt);
#endif
uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
uint32 extraSeq = conn->stopRequested ? seq : conn->conn_info.extraSeq;
setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, seq, extraSeq);
return false;
}
/*
 * when we're not doing a full-setup on every statement, we've got to
 * update the peer info -- full setups do this at setup-time.
 */
/*
 * Note the change here, for process start race and disordered message, if
 * we do not fill in peer address, then we may send some acks to unknown
 * address. Thus, the following condition is used.
 *
 */
if (pkt->seq <= conn->pkt_q_capacity)
{
/* fill in the peer. Need to cast away "volatile". ugly */
memset((void *) &conn->peer, 0, sizeof(conn->peer));
memcpy((void *) &conn->peer, peer, *peerlen);
conn->peer_len = *peerlen;
conn->conn_info.dstListenerPort = pkt->dstListenerPort;
if (DEBUG2 >= log_min_messages)
write_log("received the head packets when eliding setup, pkt seq %d", pkt->seq);
}
/* data packet */
if (pkt->flags & UDPIC_FLAGS_EOS)
{
if (DEBUG3 >= log_min_messages)
write_log("received packet with EOS motid %d route %d seq %d",
pkt->motNodeId, conn->route, pkt->seq);
}
/*
 * if we got a stop, but didn't request a stop -- ignore, this is a
 * startup blip: we must have acked with a stop -- we don't want to do
 * anything further with the stop-message if we didn't request a stop!
 *
 * this is especially important after eliding setup is enabled.
 */
if (!conn->stopRequested && (pkt->flags & UDPIC_FLAGS_STOP))
{
if (pkt->flags & UDPIC_FLAGS_EOS)
{
write_log("non-requested stop flag, EOS! seq %d, flags 0x%x", pkt->seq, pkt->flags);
}
return false;
}
/*
 * A stop was requested on this connection: the packet's data is not queued;
 * it is only acknowledged with the STOP flag set.
 */
if (conn->stopRequested && conn->stillActive)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG && DEBUG5 >= log_min_messages)
write_log("rx_thread got packet on active connection marked stopRequested. "
"(flags 0x%x) node %d route %d pkt seq %d conn seq %d",
pkt->flags, pkt->motNodeId, conn->route, pkt->seq, conn->conn_info.seq);
/* can we update stillActive ? */
if (DEBUG2 >= log_min_messages)
if (!(pkt->flags & UDPIC_FLAGS_STOP) &&
!(pkt->flags & UDPIC_FLAGS_EOS))
write_log("stop requested but no stop flag on return packet ?!");
if (pkt->flags & UDPIC_FLAGS_EOS)
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
if (conn->conn_info.seq < pkt->seq)
conn->conn_info.seq = pkt->seq; /* note here */
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);
/* we only update stillActive if eos has been sent by peer. */
if (pkt->flags & UDPIC_FLAGS_EOS)
{
if (DEBUG2 >= log_min_messages)
write_log("stop requested and acknowledged by sending peer");
conn->stillActive = false;
}
return false;
}
/* dropped ack or timeout */
/* the packet is older than the next expected seq: just repeat the ack */
if (pkt->seq < conn->conn_info.seq)
{
ic_statistics.duplicatedPktNum++;
if (DEBUG3 >= log_min_messages)
write_log("dropped ack ? ignored data packet w/ cmd %d conn->cmd %d node %d route %d seq %d expected %d flags 0x%x",
pkt->icId, conn->conn_info.icId, pkt->motNodeId,
conn->route, pkt->seq, conn->conn_info.seq, pkt->flags);
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
return false;
}
/* sequence number is correct */
if (!conn->stillActive)
{
/* peer may have dropped ack */
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE &&
DEBUG1 >= log_min_messages)
write_log("received on inactive connection node %d route %d (seq %d pkt->seq %d)",
pkt->motNodeId, conn->route, conn->conn_info.seq, pkt->seq);
if (conn->conn_info.seq < pkt->seq)
conn->conn_info.seq = pkt->seq;
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);
return false;
}
/* headSeq is the seq for the head packet. */
uint32 headSeq = conn->conn_info.seq - conn->pkt_q_size;
/* reject the packet if the queue is full or its seq lies beyond the queue window */
if ((conn->pkt_q_size == conn->pkt_q_capacity) || (pkt->seq - headSeq >= conn->pkt_q_capacity))
{
/*
 * Error case: NO RX SPACE or out of range pkt This indicates a bug.
 */
logPkt("Interconnect error: received a packet when the queue is full ", pkt);
ic_statistics.disorderedPktNum++;
conn->stat_count_dropped++;
return false;
}
/* put the packet at the his position */
bool toWakeup = false;
/* the queue slot is derived from the sequence number (seq 1 maps to slot 0) */
int pos = (pkt->seq - 1) % conn->pkt_q_capacity;
if (conn->pkt_q[pos] == NULL)
{
conn->pkt_q[pos] = (uint8 *) pkt;
/* only a packet stored at the head slot can trigger a wakeup */
if (pos == conn->pkt_q_head)
{
#ifdef AMS_VERBOSE_LOGGING
write_log("SAVE pkt at QUEUE HEAD [seq %d] for node %d route %d, queue head seq %d, queue size %d, queue head %d queue tail %d", pkt->seq, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif
toWakeup = true;
}
if (pos == conn->pkt_q_tail)
{
/* move the queue tail */
/*
 * Advance over this packet and over any packets that arrived earlier out
 * of order and are now contiguous, bumping the expected seq for each.
 */
for (; conn->pkt_q[conn->pkt_q_tail] != NULL && conn->pkt_q_size < conn->pkt_q_capacity;)
{
conn->pkt_q_size++;
conn->pkt_q_tail = (conn->pkt_q_tail + 1) % conn->pkt_q_capacity;
conn->conn_info.seq++;
}
/* set the EOS flag */
/* inspects the last packet covered by the tail, i.e. the slot just before it */
if (((icpkthdr *) (conn->pkt_q[(conn->pkt_q_tail + conn->pkt_q_capacity - 1) % conn->pkt_q_capacity]))->flags & UDPIC_FLAGS_EOS)
{
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
if (DEBUG1 >= log_min_messages)
write_log("RX_THREAD: the packet with EOS flag is available for access in the queue for route %d", conn->route);
}
/* ack data packet */
setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
#ifdef AMS_VERBOSE_LOGGING
write_log("SAVE conn %p pkt at QUEUE TAIL [seq %d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d queue tail %d", conn, pkt->seq, pos, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif
}
else /* deal with out-of-order packet */
{
if (DEBUG1 >= log_min_messages)
write_log("SAVE conn %p OUT-OF-ORDER pkt [seq %d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d queue tail %d", conn, pkt->seq, pos, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
/* send an ack for out-of-order packet */
ic_statistics.disorderedPktNum++;
/* headSeq + pkt_q_size is the sequence number expected at the tail slot */
handleDisorderPacket(conn, pos, headSeq + conn->pkt_q_size, pkt);
}
}
else /* duplicate pkt */
{
if (DEBUG1 >= log_min_messages)
write_log("DUPLICATE pkt [seq %d], [head seq] %d, queue size %d, queue head %d queue tail %d", pkt->seq, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
setAckSendParam(param, conn, UDPIC_FLAGS_DUPLICATE | conn->conn_info.flags, pkt->seq, conn->conn_info.seq - 1);
ic_statistics.duplicatedPktNum++;
return false;
}
/* Was the main thread waiting for something ? */
if (rx_control_info.mainWaitingState.waiting &&
rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId &&
rx_control_info.mainWaitingState.waitingQuery == pkt->icId && toWakeup)
{
if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
{
/* record the first route that became ready; later ones do not overwrite it */
if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
rx_control_info.mainWaitingState.reachRoute = conn->route;
}
else if (rx_control_info.mainWaitingState.waitingRoute == conn->route)
{
if (DEBUG2 >= log_min_messages)
write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
rx_control_info.mainWaitingState.reachRoute = conn->route;
}
/* WAKE MAIN THREAD HERE */
*wakeup_mainthread = true;
}
return true;
}
/*
 * rxThreadFunc
 *		Main function of the receive background thread.
 *
 * Runs until ic_control_info.shutdown is set to 1.  Each iteration:
 *	 1. makes sure a receive buffer is available (getRxBuffer);
 *	 2. unless the previous recvfrom() succeeded, waits in poll() on
 *		UDP_listenerFd for at most RX_THREAD_POLL_TIMEOUT;
 *	 3. reads one datagram and validates it (minimum header size, declared
 *		length equals received length, optional CRC);
 *	 4. under ic_control_info.lock, dispatches it to handleDataPacket()
 *		when a matching connection exists, or to handleMismatch()
 *		otherwise;
 *	 5. after releasing the lock, sets the main thread's latch and/or sends
 *		the prepared ack if the handlers requested it.
 *
 * When a handler returns true the buffer has been taken over by it, so
 * "pkt" is reset and a fresh buffer is fetched on the next iteration.
 * A buffer still held on exit is handed back with freeRxBuffer().
 *
 * "arg" is unused.  The function always returns NULL.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static void *
rxThreadFunc(void *arg)
{
	icpkthdr   *pkt = NULL;
	bool		skip_poll = false;

	for (;;)
	{
		struct pollfd nfd;
		int			n = 0;

		/* check shutdown condition */
		if (pg_atomic_read_u32(&ic_control_info.shutdown) == 1)
		{
			if (DEBUG1 >= log_min_messages)
			{
				write_log("udp-ic: rx-thread shutting down");
			}
			break;
		}

		/* Try to get a buffer */
		if (pkt == NULL)
		{
			pthread_mutex_lock(&ic_control_info.lock);
			pkt = getRxBuffer(&rx_buffer_pool);
			pthread_mutex_unlock(&ic_control_info.lock);

			if (pkt == NULL)
			{
				setRxThreadError(ENOMEM);
				continue;
			}
		}

		if (!skip_poll)
		{
			int			poll_errno;

			/* Do we have inbound traffic to handle ? */
			nfd.fd = UDP_listenerFd;
			nfd.events = POLLIN;

			n = poll(&nfd, 1, RX_THREAD_POLL_TIMEOUT);
			/* Remember errno before any other call can overwrite it. */
			poll_errno = errno;

			if (pg_atomic_read_u32(&ic_control_info.shutdown) == 1)
			{
				if (DEBUG1 >= log_min_messages)
				{
					write_log("udp-ic: rx-thread shutting down");
				}
				break;
			}

			if (n < 0)
			{
				if (poll_errno == EINTR)
					continue;

				/*
				 * ERROR case: if we simply broke out of the loop here, the
				 * query would hang, since the main thread would never be
				 * woken up and senders would not get responses anymore.
				 *
				 * Thus, we set an error flag and let the main thread report
				 * the error.
				 */
				setRxThreadError(poll_errno);
				continue;
			}

			if (n == 0)
				continue;
		}

		/*
		 * NOTE(review): this tests the requested "events" mask rather than
		 * "revents"; since events is always POLLIN, the test is true
		 * whenever n == 1.  It is deliberately left unchanged so that any
		 * condition reported by poll() still proceeds to recvfrom() below.
		 */
		if (skip_poll || (n == 1 && (nfd.events & POLLIN)))
		{
			/* we've got something interesting to read */
			/* handle incoming */
			/* ready to read on our socket */
			MotionConn *conn = NULL;
			int			read_count = 0;
			int			recv_errno;
			struct sockaddr_storage peer;
			socklen_t	peerlen;

			peerlen = sizeof(peer);
			read_count = recvfrom(UDP_listenerFd, (char *) pkt, Gp_max_packet_size, 0,
								  (struct sockaddr *) &peer, &peerlen);

			/*
			 * Capture errno right away: the logging calls below may
			 * overwrite it before it is examined.
			 */
			recv_errno = errno;

			if (pg_atomic_read_u32(&ic_control_info.shutdown) == 1)
			{
				if (DEBUG1 >= log_min_messages)
				{
					write_log("udp-ic: rx-thread shutting down");
				}
				break;
			}

			if (DEBUG5 >= log_min_messages)
				write_log("received inbound len %d", read_count);

			if (read_count < 0)
			{
				skip_poll = false;

				if (recv_errno == EWOULDBLOCK || recv_errno == EINTR)
					continue;

				write_log("Interconnect error: recvfrom (%d)", recv_errno);

				/*
				 * ERROR case: if we simply broke out of the loop here, the
				 * query would hang, since the main thread would never be
				 * woken up and senders would not get responses anymore.
				 *
				 * Thus, we set an error flag and let the main thread report
				 * the error.
				 */
				setRxThreadError(recv_errno);
				continue;
			}

			if (read_count < sizeof(icpkthdr))
			{
				if (DEBUG1 >= log_min_messages)
					write_log("Interconnect error: short conn receive (%d)", read_count);
				continue;
			}

			/*
			 * when we get a "good" recvfrom() result, we can skip poll()
			 * until we get a bad one.
			 */
			skip_poll = true;

			/* length must be >= 0 */
			if (pkt->len < 0)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("received inbound with negative length");
				continue;
			}

			if (pkt->len != read_count)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("received inbound packet [%d], short: read %d bytes, pkt->len %d", pkt->seq, read_count, pkt->len);
				continue;
			}

			/*
			 * check the CRC of the payload.
			 */
			if (gp_interconnect_full_crc)
			{
				if (!checkCRC(pkt))
				{
					pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &ic_statistics.crcErrors, 1);
					if (DEBUG2 >= log_min_messages)
						write_log("received network data error, dropping bad packet, user data unaffected.");
					continue;
				}
			}

#ifdef AMS_VERBOSE_LOGGING
			logPkt("GOT MESSAGE", pkt);
#endif

			bool		wakeup_mainthread = false;
			AckSendParam param;

			memset(&param, 0, sizeof(AckSendParam));

			/*
			 * Get the connection for the pkt.
			 *
			 * The connection hash table should be locked until finishing the
			 * processing of the packet to avoid the connection
			 * addition/removal from the hash table during the mean time.
			 */
			pthread_mutex_lock(&ic_control_info.lock);
			conn = findConnByHeader(&ic_control_info.connHtab, pkt);

			if (conn != NULL)
			{
				/* Handling a regular packet */
				if (handleDataPacket(conn, pkt, &peer, &peerlen, &param, &wakeup_mainthread))
					pkt = NULL;
				ic_statistics.recvPktNum++;
			}
			else
			{
				/*
				 * There may be two kinds of mismatched packets: a) past
				 * packets from a previous command after I was torn down; b)
				 * future packets from the current command before my
				 * connections are built.
				 *
				 * The handling logic is to "Ack the past and Nak the future".
				 */
				if ((pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER) == 0)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("mismatched packet received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d", pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);

#ifdef AMS_VERBOSE_LOGGING
					logPkt("Got a Mismatched Packet", pkt);
#endif

					if (handleMismatch(pkt, &peer, peerlen))
						pkt = NULL;
					ic_statistics.mismatchNum++;
				}
			}
			pthread_mutex_unlock(&ic_control_info.lock);

			if (wakeup_mainthread)
				SetLatch(&ic_control_info.latch);

			/*
			 * real ack sending is after lock release to decrease the lock
			 * holding time.
			 */
			if (param.msg.len != 0)
				sendAckWithParam(&param);
		}

		/* pthread_yield(); */
	}

	/* Before return, we release the packet. */
	if (pkt)
	{
		pthread_mutex_lock(&ic_control_info.lock);
		freeRxBuffer(&rx_buffer_pool, pkt);
		pkt = NULL;
		pthread_mutex_unlock(&ic_control_info.lock);
	}

	/* nothing to return */
	return NULL;
}
/*
 * handleMismatch
 *		Deal with a data packet for which no connection was found.  If the
 *		packet belongs to an already torn-down interconnect instance an
 *		acknowledgment (with STOP) is sent back; packets from the future may
 *		be stashed in the startup cache instead.
 *
 * The caller holds the receiver lock; it is never released here.
 *
 * Returns true only when the packet buffer was taken over by
 * cacheFuturePacket(); in every other case the caller keeps the buffer.
 *
 * For QD:
 *	 1) Not in hashtable : NAK it/Do nothing
 *		Causes: a) Start race
 *				b) An error happened before the entry for the ic instance
 *				   was inserted.
 *				c) From past transactions: should not happen.
 *	 2) Active in hashtable : NAK it/Do nothing
 *		Causes: a) Error reported after the entry was inserted, while the
 *				   connections are not in the hashtable yet and teardown
 *				   has not been called.
 *	 3) Inactive in hashtable: ACK it (with stop)
 *		Causes: a) Normal execution: after teardown of the current command.
 *				b) Error case 2a), after teardown is called.
 *				c) Normal execution: from past history transactions (should
 *				   not happen).
 *
 * For QE:
 *	 1) pkt->id > ic_control_info.ic_instance_id : NAK it/Do nothing
 *		Causes: a) Start race
 *				b) An error happened before ic_instance_id was assigned
 *				   its correct value.
 *	 2) lastTornIcId < pkt->id == ic_control_info.ic_instance_id:
 *		NAK it/Do nothing
 *		Causes: a) Error reported after ic_instance_id was set, while the
 *				   connections are not in the hashtable yet and teardown
 *				   has not been called.
 *	 3) lastTornIcId == pkt->id == ic_control_info.ic_instance_id:
 *		ACK it (with stop)
 *		Causes: a) Normal execution: after teardown of the current command.
 *	 4) pkt->id < ic_control_info.ic_instance_id: NAK it/Do nothing/ACK it.
 *		Causes: a) Should not happen.
 */
static bool
handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
{
	bool		cached = false;
	bool		sendReply = false;
	uint8		replyFlags = 0;

	/*
	 * "ACK the past, NAK the future": old packets get acked, but connection
	 * requests and packets of other sessions must never be.  Explicit NAKs
	 * are not needed; it is enough not to ACK future packets.
	 */
	if (!(pkt->seq > 0 && pkt->sessionId == gp_session_id))
	{
		if (DEBUG1 >= log_min_messages)
			write_log("dropping packet from command-id %d seq %d (my cmd %d)", pkt->icId, pkt->seq, ic_control_info.ic_instance_id);
		return cached;
	}

	switch (Gp_role)
	{
		case GP_ROLE_DISPATCH:
			{
				/*
				 * A QD backend may run several interconnect instances at
				 * once (one per active cursor), so it consults the cursor
				 * history table instead of a plain counter.
				 */
				struct CursorICHistoryEntry *entry =
					getCursorIcEntry(&rx_control_info.cursorHistoryTable, pkt->icId);

				if (entry == NULL)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("GOT A MISMATCH PACKET WITH ID %d HISTORY HAS NO RECORD", pkt->icId);

					/*
					 * Unknown instance: either a future packet (startup
					 * race) or an error struck before the entry was
					 * inserted.  Neither is acked; the peer will be
					 * canceled in the latter case.
					 */
					replyFlags = UDPIC_FLAGS_NAK;
					sendReply = false;

					if (gp_interconnect_cache_future_packets)
						cached = cacheFuturePacket(pkt, peer, peer_len);
				}
				else if (entry->status == 0)
				{
					/* Instance already torn down: ack the past. */
					sendReply = true;
				}
				else
				{
					/*
					 * entry->status == 1: not torn down yet.  This happens
					 * when an error (out-of-memory, network error...)
					 * occurred after the cursor entry was inserted during
					 * interconnect setup.  The peer will be canceled.
					 */
					if (DEBUG1 >= log_min_messages)
						write_log("GOT A MISMATCH PACKET WITH ID %d HISTORY THINKS IT IS ACTIVE", pkt->icId);
					return cached;	/* ignore, no ack */
				}
				break;
			}

		case GP_ROLE_EXECUTE:
			/* A QE can rely on a simple instance counter. */
			if (ic_control_info.ic_instance_id < pkt->icId)
			{
				/* Packet from the future. */
				if (gp_interconnect_cache_future_packets)
					cached = cacheFuturePacket(pkt, peer, peer_len);
			}
			else if (pkt->seq >= 1 && pkt->icId > rx_control_info.lastTornIcId)
			{
				/*
				 * Instance known but not torn down: do not ack;
				 * handleAck() will retransmit.
				 */
				replyFlags = UDPIC_FLAGS_NAK;
				sendReply = false;
			}
			else
			{
				/* Instance torn down: ack the past. */
				sendReply = true;
			}
			break;

		default:
			break;
	}

	if (!sendReply)
		return cached;

	/*
	 * Build a throw-away connection that carries just enough state (header
	 * copy and peer address) for sendAck().
	 */
	MotionConn	dummyconn;
	char		buf[128];		/* numeric IP addresses shouldn't exceed
								 * about 50 chars, but play it safe */

	memcpy(&dummyconn.conn_info, pkt, sizeof(icpkthdr));
	dummyconn.peer = *peer;
	dummyconn.peer_len = peer_len;
	dummyconn.conn_info.flags |= replyFlags;

	if (DEBUG1 >= log_min_messages)
		write_log("ACKING PACKET WITH FLAGS: pkt->seq %d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]",
				  pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, ic_control_info.ic_instance_id);

	format_sockaddr(&dummyconn.peer, buf, sizeof(buf));

	if (DEBUG1 >= log_min_messages)
		write_log("ACKING PACKET TO %s", buf);

	/*
	 * The reply always travels receiver-to-sender.  Unless it is a NAK
	 * (start race), it is the "idle receiver" answer: STOP | ACK | CAPACITY.
	 *
	 * The flags finally sent may additionally carry bits inherited from the
	 * original packet, such as STOP, EOS or CAPACITY.
	 */
	if ((replyFlags & UDPIC_FLAGS_NAK) == 0)
		replyFlags |= UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY;
	replyFlags |= UDPIC_FLAGS_RECEIVER_TO_SENDER;

	sendAck(&dummyconn, replyFlags | dummyconn.conn_info.flags, dummyconn.conn_info.seq, dummyconn.conn_info.seq);

	return cached;
}
/*
 * cacheFuturePacket
 *		Cache the future packets during the setupUDPIFCInterconnect.
 *
 * Looks up (or creates and registers) a placeholder connection for the
 * packet's header in ic_control_info.startupCacheHtab and stores the packet
 * in that connection's pkt_q array at index (seq - 1).
 *
 * pkt		- received packet; on success its buffer is kept by the cache
 * peer		- address of the sender, copied into a newly created connection
 * peer_len - number of valid bytes in *peer
 *
 * Return true if packet is cached, otherwise false.  On allocation or hash
 * insertion failure the error is recorded through setRxThreadError() and
 * false is returned.
 */
static bool
cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
{
	MotionConn *conn;

	conn = findConnByHeader(&ic_control_info.startupCacheHtab, pkt);

	if (conn == NULL)
	{
		int			save_errno;

		/* calloc() hands back zeroed memory, so no separate memset. */
		conn = calloc(1, sizeof(MotionConn));
		if (conn == NULL)
		{
			setRxThreadError(errno);
			return false;
		}

		memcpy(&conn->conn_info, pkt, sizeof(icpkthdr));

		conn->pkt_q_capacity = Gp_interconnect_queue_depth;
		conn->pkt_q_size = Gp_interconnect_queue_depth;

		/* We only use the array to store cached packets. */
		conn->pkt_q = calloc(Gp_interconnect_queue_depth, sizeof(uint8 *));
		if (conn->pkt_q == NULL)
		{
			/*
			 * Allocation failed.  Snapshot errno before free(), which is
			 * allowed to modify it on some C libraries.
			 */
			save_errno = errno;
			free(conn);
			setRxThreadError(save_errno);
			return false;
		}

		/* Put connection to the hashtable. */
		if (!connAddHash(&ic_control_info.startupCacheHtab, conn))
		{
			save_errno = errno;
			free(conn->pkt_q);
			free(conn);
			setRxThreadError(save_errno);
			return false;
		}

		/* Setup the peer sock information. */
		memcpy(&conn->peer, peer, peer_len);
		conn->peer_len = peer_len;
	}

	/*
	 * Reject packets with invalid sequence numbers and packets which have
	 * been cached before.
	 */
	if (pkt->seq > conn->pkt_q_size || pkt->seq == 0 || conn->pkt_q[pkt->seq - 1] != NULL)
		return false;

	conn->pkt_q[pkt->seq - 1] = (uint8 *) pkt;

	/* The cached buffer is accounted for by raising the pool's maxCount. */
	rx_buffer_pool.maxCount++;
	ic_statistics.startupCachedPktNum++;
	return true;
}
/*
 * cleanupStartupCache
 *		Empty the startup cache: every cached packet goes back to the
 *		receive buffer free list, and every placeholder connection is
 *		removed from startupCacheHtab and freed.
 */
static void
cleanupStartupCache()
{
	int			slot;

	for (slot = 0; slot < ic_control_info.startupCacheHtab.size; slot++)
	{
		ConnHtabBin *cur = ic_control_info.startupCacheHtab.table[slot];

		while (cur != NULL)
		{
			MotionConn *victim = cur->conn;
			ConnHtabBin *following;
			int			k;

			/* Hand back every packet still parked on this connection. */
			for (k = 0; k < victim->pkt_q_size; k++)
			{
				icpkthdr   *cachedPkt = (icpkthdr *) victim->pkt_q[k];

				if (cachedPkt != NULL)
				{
					rx_buffer_pool.maxCount--;
					putRxBufferToFreeList(&rx_buffer_pool, cachedPkt);
					victim->pkt_q[k] = NULL;
				}
			}

			/* Pick up the successor before the entry is unlinked. */
			following = cur->next;
			connDelHash(&ic_control_info.startupCacheHtab, victim);

			/*
			 * MPP-19981: the cached connection itself must be released too,
			 * otherwise it would leak.
			 */
			free(victim->pkt_q);
			free(victim);

			cur = following;
		}
	}
}
/* The following functions are facility methods for debugging.
* They are quite useful when there are a large number of connections.
* These functions can be called from gdb to output internal information to a file.
*/
/*
 * dumpICBufferList_Internal
 *		Write the length of a buffer list, followed by one line per linked
 *		buffer describing its packet header, to an already opened stream.
 *
 * The walk stops when it returns to the list head or after list->length
 * nodes, whichever comes first.
 */
static void
dumpICBufferList_Internal(ICBufferList *list, FILE *ofile)
{
	ICBufferLink *cursor;
	int			remaining = list->length;
	int			nodeNo = 0;

	fprintf(ofile, "List Length %d\n", remaining);

	for (cursor = list->head.next;
		 cursor != &list->head && remaining > 0;
		 cursor = cursor->next, remaining--)
	{
		ICBuffer   *buf;
		icpkthdr   *hdr;

		/* The list type decides which embedded link the node hangs on. */
		if (list->type == ICBufferListType_Primary)
			buf = GET_ICBUFFER_FROM_PRIMARY(cursor);
		else
			buf = GET_ICBUFFER_FROM_SECONDARY(cursor);
		hdr = buf->pkt;

		fprintf(ofile, "Node %d, linkptr %p ", nodeNo++, cursor);
		fprintf(ofile, "Packet Content [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
				"srcContentId %d dstDesContentId %d "
				"srcPid %d dstPid %d "
				"srcListenerPort %d dstListernerPort %d "
				"sendSliceIndex %d recvSliceIndex %d "
				"sessionId %d icId %d "
				"flags %d\n",
				hdr->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
				hdr->seq, hdr->extraSeq, hdr->motNodeId, hdr->crc, hdr->len,
				hdr->srcContentId, hdr->dstContentId,
				hdr->srcPid, hdr->dstPid,
				hdr->srcListenerPort, hdr->dstListenerPort,
				hdr->sendSliceIndex, hdr->recvSliceIndex,
				hdr->sessionId, hdr->icId,
				hdr->flags);
	}
}
/*
 * dumpICBufferList
 *		Dump a buffer list into the file named by fname.
 *
 * The file is opened with mode "w+".  If it cannot be opened, nothing is
 * dumped and the function returns silently.
 */
void
dumpICBufferList(ICBufferList *list, const char *fname)
{
	FILE	   *ofile = fopen(fname, "w+");

	/* fopen() returns NULL on failure; writing to it would crash. */
	if (ofile == NULL)
		return;

	dumpICBufferList_Internal(list, ofile);
	fclose(ofile);
}
/*
 * dumpUnackQueueRing
 *		Dump the unack queue ring into the file named by fname: a summary
 *		line with the ring counters, then the contents of every non-empty
 *		slot.
 *
 * The file is opened with mode "w+".  If it cannot be opened, nothing is
 * dumped and the function returns silently.
 */
void
dumpUnackQueueRing(const char *fname)
{
	FILE	   *ofile = fopen(fname, "w+");
	int			i;

	/* fopen() returns NULL on failure; writing to it would crash. */
	if (ofile == NULL)
		return;

	fprintf(ofile, "UnackQueueRing: currentTime " UINT64_FORMAT ", idx %d numOutstanding %d numSharedOutstanding %d\n",
			unack_queue_ring.currentTime, unack_queue_ring.idx,
			unack_queue_ring.numOutStanding, unack_queue_ring.numSharedOutStanding);
	fprintf(ofile, "==================================\n");

	for (i = 0; i < UNACK_QUEUE_RING_SLOTS_NUM; i++)
	{
		if (icBufferListLength(&unack_queue_ring.slots[i]) > 0)
		{
			dumpICBufferList_Internal(&unack_queue_ring.slots[i], ofile);
		}
	}

	fclose(ofile);
}
/*
 * dumpConnections
 *		Dump all connections of a transport state entry into the file named
 *		by fname.
 *
 * For each connection the general state and the conn_info header are
 * written.  Depending on ic_control_info.isSender, either the send and
 * unack queues (sender) or the queued receive packets (receiver) follow.
 *
 * The file is opened with mode "w+".  If it cannot be opened, nothing is
 * dumped and the function returns silently.
 */
void
dumpConnections(ChunkTransportStateEntry *pEntry, const char *fname)
{
	int			i,
				j;
	MotionConn *conn;
	FILE	   *ofile = fopen(fname, "w+");

	/* fopen() returns NULL on failure; writing to it would crash. */
	if (ofile == NULL)
		return;

	fprintf(ofile, "Entry connections: conn num %d \n", pEntry->numConns);
	fprintf(ofile, "==================================\n");

	for (i = 0; i < pEntry->numConns; i++)
	{
		conn = &pEntry->conns[i];

		fprintf(ofile, "conns[%d] motNodeId=%d: remoteContentId=%d pid=%d sockfd=%d remote=%s local=%s "
				"capacity=%d sentSeq=%d receivedAckSeq=%d consumedSeq=%d rtt=" UINT64_FORMAT
				" dev=" UINT64_FORMAT " deadlockCheckBeginTime=" UINT64_FORMAT " route=%d msgSize=%d msgPos=%p"
				" recvBytes=%d tupleCount=%d stillActive=%d stopRequested=%d "
				"state=%d\n",
				i, pEntry->motNodeId,
				conn->remoteContentId,
				conn->cdbProc ? conn->cdbProc->pid : 0,
				conn->sockfd,
				conn->remoteHostAndPort,
				conn->localHostAndPort,
				conn->capacity, conn->sentSeq, conn->receivedAckSeq, conn->consumedSeq,
				conn->rtt, conn->dev, conn->deadlockCheckBeginTime, conn->route, conn->msgSize, conn->msgPos,
				conn->recvBytes, conn->tupleCount, conn->stillActive, conn->stopRequested,
				conn->state);

		fprintf(ofile, "conn_info [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
				"srcContentId %d dstDesContentId %d "
				"srcPid %d dstPid %d "
				"srcListenerPort %d dstListernerPort %d "
				"sendSliceIndex %d recvSliceIndex %d "
				"sessionId %d icId %d "
				"flags %d\n",
				conn->conn_info.flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
				conn->conn_info.seq, conn->conn_info.extraSeq, conn->conn_info.motNodeId, conn->conn_info.crc, conn->conn_info.len,
				conn->conn_info.srcContentId, conn->conn_info.dstContentId,
				conn->conn_info.srcPid, conn->conn_info.dstPid,
				conn->conn_info.srcListenerPort, conn->conn_info.dstListenerPort,
				conn->conn_info.sendSliceIndex, conn->conn_info.recvSliceIndex,
				conn->conn_info.sessionId, conn->conn_info.icId,
				conn->conn_info.flags);

		if (!ic_control_info.isSender)
		{
			/* Receiver side: list whatever sits in the packet queue. */
			fprintf(ofile, "pkt_q_size=%d pkt_q_head=%d pkt_q_tail=%d pkt_q=%p\n", conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail, conn->pkt_q);

			/* The queue array may be absent; then there is nothing to list. */
			if (conn->pkt_q != NULL)
			{
				for (j = 0; j < conn->pkt_q_capacity; j++)
				{
					icpkthdr   *pkt = (icpkthdr *) conn->pkt_q[j];

					if (pkt == NULL)
						continue;

					fprintf(ofile, "Packet (pos %d) Info [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
							"srcContentId %d dstDesContentId %d "
							"srcPid %d dstPid %d "
							"srcListenerPort %d dstListernerPort %d "
							"sendSliceIndex %d recvSliceIndex %d "
							"sessionId %d icId %d "
							"flags %d\n",
							j,
							pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
							pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
							pkt->srcContentId, pkt->dstContentId,
							pkt->srcPid, pkt->dstPid,
							pkt->srcListenerPort, pkt->dstListenerPort,
							pkt->sendSliceIndex, pkt->recvSliceIndex,
							pkt->sessionId, pkt->icId,
							pkt->flags);
				}
			}
		}
		else
		{
			/* Sender side: dump the pending and the unacknowledged buffers. */
			fprintf(ofile, "sndQueue ");
			dumpICBufferList_Internal(&conn->sndQueue, ofile);
			fprintf(ofile, "unackQueue ");
			dumpICBufferList_Internal(&conn->unackQueue, ofile);
		}

		fprintf(ofile, "\n");
	}

	fclose(ofile);
}
/*
 * WaitInterconnectQuitUDPIFC
 *		Ask the interconnect receive thread to stop and wait until it has
 *		exited.
 *
 * Sets ic_control_info.shutdown to 1 and, if the thread was created, pokes
 * it with a dummy packet and joins it.  Afterwards threadCreated is false.
 */
void
WaitInterconnectQuitUDPIFC(void)
{
	/*
	 * Release the lock first, in case the ic thread is blocked waiting for
	 * it.
	 */
	pthread_mutex_unlock(&ic_control_info.lock);

	pg_atomic_write_u32(&ic_control_info.shutdown, 1);

	/* No thread was ever started: nothing to wait for. */
	if (!ic_control_info.threadCreated)
		return;

	SendDummyPacket();
	pthread_join(ic_control_info.threadHandle, NULL);
	ic_control_info.threadCreated = false;
}
/*
 * SendDummyPacket
 *		Send a dummy packet to the interconnect thread so that it leaves
 *		poll() immediately.
 *
 * The destination is interconnect_address combined with the UDP listener
 * port taken from the upper 16 bits of Gp_listener_port.  A non-blocking
 * datagram socket is created for the first usable address, and sendto() is
 * attempted up to 10 times while it fails with EINTR/EAGAIN/EWOULDBLOCK.
 *
 * Failures are only logged (LOG level); the function returns nothing.
 */
static void
SendDummyPacket(void)
{
	int			sockfd = -1;
	int			ret;
	struct addrinfo *addrs = NULL;
	struct addrinfo *rp;
	struct addrinfo hint;
	uint16		udp_listener;
	char		port_str[32] = {0};
	const char *dummy_pkt = "stop it";
	int			counter;
	bool		sent = false;

	/*
	 * Get address info from interconnect udp listener port
	 */
	udp_listener = (Gp_listener_port >> 16) & 0x0ffff;
	snprintf(port_str, sizeof(port_str), "%d", udp_listener);

	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_DGRAM;
	hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */

	/* Never do name resolution */
#ifdef AI_NUMERICSERV
	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
#else
	hint.ai_flags = AI_NUMERICHOST;
#endif

	ret = pg_getaddrinfo_all(interconnect_address, port_str, &hint, &addrs);
	if (ret || !addrs)
	{
		elog(LOG, "send dummy packet failed, pg_getaddrinfo_all(): %m");
		goto cleanup;
	}

	for (rp = addrs; rp != NULL; rp = rp->ai_next)
	{
		/* Create socket according to pg_getaddrinfo_all() */
		sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (sockfd < 0)
			continue;

		if (!pg_set_noblock(sockfd))
		{
			/* Unusable socket: drop it and try the next address. */
			closesocket(sockfd);
			sockfd = -1;
			continue;
		}
		break;
	}

	if (rp == NULL)
	{
		elog(LOG, "send dummy packet failed, create socket failed: %m");
		goto cleanup;
	}

	/*
	 * Send a dummy package to the interconnect listener, try 10 times.
	 *
	 * Success is tracked explicitly: judging by the attempt counter alone
	 * would misreport a send that succeeds on the last attempt as a failure.
	 */
	for (counter = 0; counter < 10; counter++)
	{
		ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen);
		if (ret >= 0)
		{
			sent = true;
			break;
		}

		/* Only transient conditions are worth another attempt. */
		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)
		{
			elog(LOG, "send dummy packet failed, sendto failed: %m");
			goto cleanup;
		}
	}

	if (!sent)
		elog(LOG, "send dummy packet failed, sendto failed with 10 times: %m");

cleanup:
	/* Shared exit path for success and failure alike. */
	if (addrs)
		pg_freeaddrinfo_all(hint.ai_family, addrs);
	if (sockfd != -1)
		closesocket(sockfd);
	return;
}
/*
 * getActiveMotionConns
 *		Return the activeConnectionsNum counter kept in ic_statistics.
 */
uint32
getActiveMotionConns(void)
{
	uint32		active = ic_statistics.activeConnectionsNum;

	return active;
}
相关信息
相关文章
greenplumn ic_proxy_backend 源码
greenplumn ic_proxy_backend 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦