greenplumn ic_proxy_router 源码
greenplumn ic_proxy_router 代码
文件路径:/src/backend/cdb/motion/ic_proxy_router.c
/*-------------------------------------------------------------------------
*
* ic_proxy_router.c
*
* Interconnect Proxy Router
*
* A router routes a packet to the correct target, a client or a peer.
*
* Copyright (c) 2020-Present VMware, Inc. or its affiliates.
*
*
*-------------------------------------------------------------------------
*/
#include "ic_proxy.h"
#include "ic_proxy_server.h"
#include "ic_proxy_router.h"
#include "ic_proxy_packet.h"
#include "ic_proxy_pkt_cache.h"
#include "ic_proxy_server.h"
typedef struct ICProxyWriteReq ICProxyWriteReq;
typedef struct ICProxyLoopback ICProxyLoopback;
/*
* A router write request.
*
* It is similar to the libuv write request, however the router will take care
* of the common part, such as the freeing of the packet and the request, so
* the caller can focus on the real business.
*/
struct ICProxyWriteReq
{
uv_write_t req; /* the libuv write request */
ic_proxy_sent_cb callback; /* the callback */
void *opaque; /* the callback data */
};
/*
* The loopback packet queue.
*
* Loopback packets can not be routed immediately, refer to ICProxyDelay for
* details. They are first put in a queue, and are actually routed in a libuv
* check callback, it is triggered after all the current I/O events are
* handled, so there will be no misordered packets, and no reentrance to some
* critical functions.
*/
struct ICProxyLoopback
{
uv_check_t check; /* the libuv check handle */
List *queue; /* List<ICProxyWriteReq *> */
};
static ICProxyLoopback ic_proxy_router_loopback;
/*
* The loopback check is triggered.
*/
static void
ic_proxy_router_loopback_on_check(uv_check_t *handle)
{
List *queue;
ListCell *cell;
/*
* Stop the check callback, all the queued loopback packets will be routed
* in this round. This must happen before routing the packets, so if new
* loopback packets are queued, the check callback can be re-turned on,
* those packets will be handled next round.
*
* TODO: the new loopback packets can be handled in this round, too.
* Queueing them to next round means it will not be triggered until some
* I/O events happen. In current logic there is the per second timer, so
* in worst case the new loopback packets are delayed for 1 second. If the
* timer is paused in the future, as an optimization, then in worst case
* the new packets may not get a chance to be routed. The only concern on
* handling them now is that if a infinite ping-pong happens, this function
* would never return to the mainloop. It's unlikely to happen, though,
* and we could prevent that by adding a round count limit.
*/
uv_check_stop(&ic_proxy_router_loopback.check);
/*
* We must detach the queue before handling it, in case some new packets
* are queued during the process
*/
queue = ic_proxy_router_loopback.queue;
ic_proxy_router_loopback.queue = NIL;
foreach(cell, queue)
{
ICProxyDelay *delay = lfirst(cell);
ICProxyClient *client;
ICProxyKey key;
/* Loopback packets are always to a loopback client */
ic_proxy_key_from_p2c_pkt(&key, delay->pkt);
client = ic_proxy_client_blessed_lookup(handle->loop, &key);
ic_proxy_log(LOG, "ic-proxy-router: looped back %s to %s",
ic_proxy_pkt_to_str(delay->pkt),
ic_proxy_client_get_name(client));
ic_proxy_client_on_p2c_data(client, delay->pkt,
delay->callback, delay->opaque);
/* do not forget to call the callback */
if (delay->callback)
delay->callback(delay->opaque, delay->pkt, 0);
/* and do not forget to free the memory */
ic_proxy_free(delay);
}
list_free(queue);
}
/*
* Push a loopback packet to the queue.
*/
static void
ic_proxy_router_loopback_push(ICProxyPkt *pkt,
ic_proxy_sent_cb callback, void *opaque)
{
ICProxyDelay *delay;
ic_proxy_log(LOG, "ic-proxy-router: looping back %s",
ic_proxy_pkt_to_str(pkt));
/*
* Enable the libuv check callback if not yet.
*/
if (ic_proxy_router_loopback.queue == NIL)
uv_check_start(&ic_proxy_router_loopback.check,
ic_proxy_router_loopback_on_check);
/*
* Loopback packets are always to a loopback client, so it's safe to pass
* NULL as the peer.
*/
delay = ic_proxy_peer_build_delay(NULL, pkt, callback, opaque);
ic_proxy_router_loopback.queue = lappend(ic_proxy_router_loopback.queue, delay);
}
/*
* Initialize the router.
*/
void
ic_proxy_router_init(uv_loop_t *loop)
{
uv_check_init(loop, &ic_proxy_router_loopback.check);
ic_proxy_router_loopback.queue = NIL;
}
/*
* Cleanup the router.
*/
void
ic_proxy_router_uninit(void)
{
List *queue;
ListCell *cell;
queue = ic_proxy_router_loopback.queue;
ic_proxy_router_loopback.queue = NIL;
uv_check_stop(&ic_proxy_router_loopback.check);
foreach(cell, queue)
{
ICProxyDelay *delay = lfirst(cell);
/*
* TODO: this function is only called on exiting, so it's better to
* drop the callbacks silently, right?
*/
#if 0
if (delay->callback)
delay->callback(delay->opaque, pkt, UV_ECANCELED);
#endif
ic_proxy_pkt_cache_free(delay->pkt);
ic_proxy_free(delay);
}
list_free(queue);
}
/*
* Route a packet.
*/
void
ic_proxy_router_route(uv_loop_t *loop, ICProxyPkt *pkt,
ic_proxy_sent_cb callback, void *opaque)
{
if (pkt->dstDbid == pkt->srcDbid)
{
/*
* For a loopback target, we do not need to send the packet via a peer,
* we could pass the packet to the target client via
* ic_proxy_client_on_p2c_data(), however that function is not
* reentrantable, which happens on PAUSE & RESUME messages, so we must
* schedule it in a libuv check callback.
*
* TODO: when callback is NULL, we could pass the packet immediately.
*/
ic_proxy_router_loopback_push(pkt, callback, opaque);
}
else if (pkt->dstDbid == GpIdentity.dbid)
{
ICProxyClient *client;
ICProxyKey key;
ic_proxy_key_from_p2c_pkt(&key, pkt);
client = ic_proxy_client_blessed_lookup(loop, &key);
ic_proxy_log(LOG, "ic-proxy-router: routing %s to %s",
ic_proxy_pkt_to_str(pkt),
ic_proxy_client_get_name(client));
ic_proxy_client_on_p2c_data(client, pkt, callback, opaque);
}
else
{
ICProxyPeer *peer;
peer = ic_proxy_peer_blessed_lookup(loop,
pkt->dstContentId, pkt->dstDbid);
ic_proxy_log(LOG, "ic-proxy-router: routing %s to %s",
ic_proxy_pkt_to_str(pkt), peer->name);
ic_proxy_peer_route_data(peer, pkt, callback, opaque);
}
}
/*
* The packet is written.
*/
static void
ic_proxy_router_on_write(uv_write_t *req, int status)
{
ICProxyWriteReq *wreq = (ICProxyWriteReq *) req;
ICProxyPkt *pkt = req->data;
if (status < 0)
ic_proxy_log(LOG, "ic-proxy-router: fail to send %s: %s",
ic_proxy_pkt_to_str(pkt), uv_strerror(status));
else
ic_proxy_log(LOG, "ic-proxy-router: sent %s",
ic_proxy_pkt_to_str(pkt));
if (wreq->callback)
wreq->callback(wreq->opaque, pkt, status);
ic_proxy_pkt_cache_free(pkt);
ic_proxy_free(req);
}
/*
* Write a packet to a libuv stream.
*
* This is a simple wrapper for the uv_write() function. The boring parts,
* like buffer & request management, are handled by this wrapper, so the caller
* can focus on the real business.
*
* It can write the packet at a specific offset, this is useful when writing
* data from the client to the backend, the backend wants headless data, so the
* client can specify sizeof(ICProxyPkt) as the offset.
*
* - stream: the target stream, usually a peer or a client;
* - pkt: the data to write, the ownership is taken;
* - offset: the data offset to write from, usually 0 when writing to a peer,
* or sizeof(ICProxyPkt) when writing to a client;
* - callback: the callback function;
* - opaque: the callback data;
*/
void
ic_proxy_router_write(uv_stream_t *stream, ICProxyPkt *pkt, int32 offset,
ic_proxy_sent_cb callback, void *opaque)
{
ICProxyWriteReq *wreq;
uv_buf_t wbuf;
ic_proxy_log(LOG, "ic-proxy-router: sending %s", ic_proxy_pkt_to_str(pkt));
wreq = ic_proxy_new(ICProxyWriteReq);
wreq->req.data = pkt;
wreq->callback = callback;
wreq->opaque = opaque;
wbuf.base = ((char *) pkt) + offset;
wbuf.len = pkt->len - offset;
uv_write(&wreq->req, stream, &wbuf, 1, ic_proxy_router_on_write);
}
相关信息
相关文章
greenplumn ic_proxy_backend 源码
greenplumn ic_proxy_backend 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦