greenplumn ic_proxy_main 源码
greenplumn ic_proxy_main 代码
文件路径:/src/backend/cdb/motion/ic_proxy_main.c
/*-------------------------------------------------------------------------
*
* ic_proxy_main.c
*
* The main loop of the ic-proxy, it listens for both new peers and new
* clients, it also establish the peer connections.
*
*
* Copyright (c) 2020-Present VMware, Inc. or its affiliates.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "ic_proxy_server.h"
#include "ic_proxy_addr.h"
#include "ic_proxy_pkt_cache.h"
#include <uv.h>
#include <unistd.h>
static void ic_proxy_server_peer_listener_init(uv_loop_t *loop);
static uv_loop_t ic_proxy_server_loop;
static uv_signal_t ic_proxy_server_signal_hup;
static uv_signal_t ic_proxy_server_signal_int;
static uv_signal_t ic_proxy_server_signal_term;
static uv_signal_t ic_proxy_server_signal_stop;
static uv_timer_t ic_proxy_server_timer;
static uv_tcp_t ic_proxy_peer_listener;
static bool ic_proxy_peer_listening;
static bool ic_proxy_peer_relistening;
static uv_pipe_t ic_proxy_client_listener;
static bool ic_proxy_client_listening;
static int ic_proxy_server_exit_code = 1;
/* pipe to check whether postmaster is alive */
static uv_pipe_t ic_proxy_postmaster_pipe;
/*
* The peer listener is closed.
*/
static void
ic_proxy_server_peer_listener_on_closed(uv_handle_t *handle)
{
ic_proxy_log(LOG, "ic-proxy-server: peer listener: closed");
/* A new peer listener will be created on the next timer callback */
ic_proxy_peer_listening = false;
/* If relisten is requested, do it now */
if (ic_proxy_peer_relistening)
{
ic_proxy_peer_relistening = false;
ic_proxy_server_peer_listener_init(handle->loop);
}
}
/*
* New peer arrives.
*/
static void
ic_proxy_server_on_new_peer(uv_stream_t *server, int status)
{
ICProxyPeer *peer;
int ret;
if (status < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: new peer error: %s",
uv_strerror(status));
uv_close((uv_handle_t *) server,
ic_proxy_server_peer_listener_on_closed);
return;
}
ic_proxy_log(LOG, "ic-proxy-server: new peer to the server");
peer = ic_proxy_peer_new(server->loop,
IC_PROXY_INVALID_CONTENT, IC_PROXY_INVALID_DBID);
ret = uv_accept(server, (uv_stream_t *) &peer->tcp);
if (ret < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: fail to accept new peer: %s",
uv_strerror(ret));
ic_proxy_peer_free(peer);
return;
}
/* TODO: it is better to only touch the states in peer.c */
peer->state |= IC_PROXY_PEER_STATE_ACCEPTED;
/* Dump some connection information, not very useful though */
{
struct sockaddr_storage peeraddr;
int addrlen = sizeof(peeraddr);
char name[HOST_NAME_MAX];
uv_tcp_getpeername(&peer->tcp, (struct sockaddr *) &peeraddr, &addrlen);
if (peeraddr.ss_family == AF_INET)
{
struct sockaddr_in *peeraddr4 = (struct sockaddr_in *) &peeraddr;
uv_ip4_name(peeraddr4, name, sizeof(name));
ic_proxy_log(LOG, "ic-proxy-server: the new peer is from %s:%d",
name, ntohs(peeraddr4->sin_port));
}
else if (peeraddr.ss_family == AF_INET6)
{
struct sockaddr_in6 *peeraddr6 = (struct sockaddr_in6 *) &peeraddr;
uv_ip6_name(peeraddr6, name, sizeof(name));
ic_proxy_log(LOG, "ic-proxy-server: the new peer is from %s:%d",
name, ntohs(peeraddr6->sin6_port));
}
}
ic_proxy_peer_read_hello(peer);
}
/*
* Setup the peer listener.
*
* The peer listener listens on a tcp socket, the peer connections will come
* through it.
*/
static void
ic_proxy_server_peer_listener_init(uv_loop_t *loop)
{
const ICProxyAddr *addr;
uv_tcp_t *listener = &ic_proxy_peer_listener;
int fd = -1;
int ret;
if (ic_proxy_addrs == NIL)
return;
if (ic_proxy_peer_listening)
return;
/* Get the addr from the gp_interconnect_proxy_addresses */
addr = ic_proxy_get_my_addr();
if (addr == NULL)
/* Cannot get my addr, maybe the setting is invalid */
return;
#if IC_PROXY_LOG_LEVEL <= LOG
{
char name[HOST_NAME_MAX] = "unknown";
int port = 0;
int family;
int ret;
ret = ic_proxy_extract_sockaddr((struct sockaddr *) &addr->sockaddr,
name, sizeof(name), &port, &family);
if (ret == 0)
ic_proxy_log(LOG,
"ic-proxy-server: setting up peer listener on %s:%s (%s:%d family=%d)",
addr->hostname, addr->service, name, port, family);
else
ic_proxy_log(WARNING,
"ic-proxy-server: setting up peer listener on %s:%s (%s:%d family=%d) (fail to extract the address: %s)",
addr->hostname, addr->service, name, port, family,
uv_strerror(ret));
}
#endif /* IC_PROXY_LOG_LEVEL <= LOG */
/*
* It is important to set TCP_NODELAY, otherwise we will suffer from
* significant latency and get very bad OLTP performance.
*/
uv_tcp_init(loop, listener);
uv_tcp_nodelay(listener, true);
ret = uv_tcp_bind(listener, (struct sockaddr *) &addr->sockaddr, 0);
if (ret < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to bind: %s",
uv_strerror(ret));
return;
}
ret = uv_listen((uv_stream_t *) listener,
IC_PROXY_BACKLOG, ic_proxy_server_on_new_peer);
if (ret < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to listen: %s",
uv_strerror(ret));
return;
}
uv_fileno((uv_handle_t *) listener, &fd);
ic_proxy_log(LOG, "ic-proxy-server: tcp: listening on socket %d", fd);
ic_proxy_peer_listening = true;
}
/*
* Reinit the peer listener.
*/
static void
ic_proxy_server_peer_listener_reinit(uv_loop_t *loop)
{
const ICProxyAddr *myaddr = ic_proxy_get_my_addr();
if (ic_proxy_peer_relistening)
return;
if (ic_proxy_peer_listening)
{
/*
* We are listening already, so must first close the current one, we
* keep the ic_proxy_peer_listening as true during the process to
* prevent double connect.
*/
ic_proxy_log(LOG, "ic-proxy-server: closing the legacy peer listener");
/* Only recreate a new listener if an address is assigned to us */
ic_proxy_peer_relistening = !!myaddr;
uv_close((uv_handle_t *) &ic_proxy_peer_listener,
ic_proxy_server_peer_listener_on_closed);
}
else if (myaddr)
{
/* Otherwise simply establish a new one */
ic_proxy_peer_relistening = false;
ic_proxy_server_peer_listener_init(loop);
}
}
/*
* The client listener is closed.
*/
static void
ic_proxy_server_client_listener_on_closed(uv_handle_t *handle)
{
ic_proxy_log(LOG, "ic-proxy-server: client listener: closed");
/* A new client listener will be created on the next timer callback */
ic_proxy_client_listening = false;
}
/*
* New client arrives.
*/
static void
ic_proxy_server_on_new_client(uv_stream_t *server, int status)
{
ICProxyClient *client;
int ret;
if (status < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: new client error: %s",
uv_strerror(status));
uv_close((uv_handle_t *) server,
ic_proxy_server_client_listener_on_closed);
return;
}
ic_proxy_log(LOG, "ic-proxy-server: new client to the server");
client = ic_proxy_client_new(server->loop, false);
ret = uv_accept(server, ic_proxy_client_get_stream(client));
if (ret < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: fail to accept new client: %s",
uv_strerror(ret));
return;
}
ic_proxy_client_read_hello(client);
}
/*
* Setup the client listener.
*
* The client listener listens on a domain socket, the client connections will
* come through it.
*/
static void
ic_proxy_server_client_listener_init(uv_loop_t *loop)
{
uv_pipe_t *listener = &ic_proxy_client_listener;
char path[MAXPGPATH];
int fd = -1;
int ret;
if (ic_proxy_client_listening)
return;
ic_proxy_build_server_sock_path(path, sizeof(path));
/* FIXME: do not unlink here */
ic_proxy_log(LOG, "unlink(%s) ...", path);
unlink(path);
ic_proxy_log(LOG, "ic-proxy-server: setting up client listener on address %s",
path);
ret = uv_pipe_init(loop, listener, false);
if (ret < 0)
{
ic_proxy_log(WARNING,
"ic-proxy-server: fail to init a client listener: %s",
uv_strerror(ret));
return;
}
ret = uv_pipe_bind(listener, path);
if (ret < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: pipe: fail to bind(%s): %s",
path, uv_strerror(ret));
return;
}
ret = uv_listen((uv_stream_t *) listener,
IC_PROXY_BACKLOG, ic_proxy_server_on_new_client);
if (ret < 0)
{
ic_proxy_log(WARNING, "ic-proxy-server: pipe: fail to listen on path %s: %s",
path, uv_strerror(ret));
return;
}
uv_fileno((uv_handle_t *) listener, &fd);
ic_proxy_log(LOG, "ic-proxy-server: pipe: listening on socket %d", fd);
/*
* Dump the inode of the domain socket file, this helps us to know that the
* file is replaced by someone. This is not likely to happen, we have
* carefully choosen the file path to not conflict with each other.
*/
{
struct stat st;
stat(path, &st);
ic_proxy_log(LOG, "ic-proxy-server: dev=%lu, inode=%lu, path=%s",
st.st_dev, st.st_ino, path);
}
ic_proxy_client_listening = true;
}
/*
* Establish the peer connections.
*
* A proxy connects to all the other proxies, all these connections form the
* proxy network. Only one connection is needed between 2 proxies, this is
* ensured by a policy that "proxy X connects to proxy Y iff X > Y". To support
* mirror promotion, X attempts to connect to Y even if Y is a mirror, or even
* if we have connected to Y's primary. In fact we do not know whether Y is a
* mirror or not, and we do not care.
*/
static void
ic_proxy_server_ensure_peers(uv_loop_t *loop)
{
ListCell *cell;
foreach(cell, ic_proxy_addrs)
{
ICProxyAddr *addr = lfirst(cell);
ICProxyPeer *peer;
if (addr->content >= GpIdentity.segindex)
continue;
if (addr->dbid == GpIdentity.dbid)
continue; /* do not connect to my primary / mirror */
/*
* First get the peer with the peer id, then connect to it. The peer
* can be a placeholder, can be in the progress of a connection, or can
* be connected, the ic_proxy_peer_connect() function will take care of
* the state.
*/
peer = ic_proxy_peer_blessed_lookup(loop, addr->content, addr->dbid);
ic_proxy_peer_connect(peer, (struct sockaddr_in *) addr);
}
}
/*
* Drop legacy peers.
*
* The list ic_proxy_removed_addrs contains both removed and updated addresses,
* the corresponding peers must be disconnected before taking further actions.
*/
static void
ic_proxy_server_drop_legacy_peers(uv_loop_t *loop)
{
ListCell *cell;
const ICProxyAddr *myaddr = ic_proxy_get_my_addr();
/*
* Also take the chance to check the peer listener.
*
* If myaddr cannot be found at all, the address must have been removed,
* close the current listener.
*/
if (!myaddr)
ic_proxy_server_peer_listener_reinit(loop);
foreach(cell, ic_proxy_removed_addrs)
{
ICProxyAddr *addr = lfirst(cell);
ICProxyPeer *peer;
/*
* Also take the chance to check the peer listener.
*
* If myaddr appears in the removed list, then the address must have
* been changed or removed, no need to compare the sockaddrs again.
*/
if (myaddr && myaddr->dbid == addr->dbid)
ic_proxy_server_peer_listener_reinit(loop);
/*
* Refer to ic_proxy_server_ensure_peers() on why we need below checks.
*/
if (addr->content >= GpIdentity.segindex)
continue;
if (addr->dbid == GpIdentity.dbid)
continue; /* do not connect to my primary / mirror */
peer = ic_proxy_peer_lookup(addr->content, addr->dbid);
if (!peer)
continue;
ic_proxy_peer_disconnect(peer);
}
}
/*
* Timer handler.
*
* This is used to maintain the proxy-proxy network, as well as the client and
* peer listeners.
*/
static void
ic_proxy_server_on_timer(uv_timer_t *timer)
{
ic_proxy_server_peer_listener_init(timer->loop);
ic_proxy_server_ensure_peers(timer->loop);
ic_proxy_server_client_listener_init(timer->loop);
}
/*
* Signal handler.
*
* Signals are handled via the signalfd() call in libuv, so this is a normal
* callback as others, nothing special, errors can be raised, too.
*/
static void
ic_proxy_server_on_signal(uv_signal_t *handle, int signum)
{
ic_proxy_log(WARNING, "ic-proxy-server: received signal %d", signum);
if (signum == SIGHUP)
{
ProcessConfigFile(PGC_SIGHUP);
ic_proxy_reload_addresses(handle->loop);
ic_proxy_server_drop_legacy_peers(handle->loop);
ic_proxy_server_peer_listener_init(handle->loop);
ic_proxy_server_ensure_peers(handle->loop);
ic_proxy_server_client_listener_init(handle->loop);
}
else
{
uv_stop(handle->loop);
}
}
/*
* callback when received data from ic_proxy_postmaster_pipe
*/
static void
ic_proxy_server_on_read_postmaster_pipe(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
/* return the pkt to cache freelist, we don't care about the buffer content */
if (buf->base)
ic_proxy_pkt_cache_free(buf->base);
/* nread = 0 means EAGAIN and EWOULDBLOCK, while nread = EOF means postmaster is dead */
if (nread == UV_EOF)
proc_exit(1);
else if (nread < 0)
ic_proxy_log(FATAL, "read on postmaster death monitoring pipe failed: %s", uv_strerror(nread));
else if (nread > 0)
ic_proxy_log(FATAL, "unexpected data in postmaster death monitoring pipe with length: %ld", nread);
}
/*
* The main loop of the ic-proxy.
*/
int
ic_proxy_server_main(void)
{
char path[MAXPGPATH];
ic_proxy_log(LOG, "ic-proxy-server: setting up");
ic_proxy_pkt_cache_init(IC_PROXY_MAX_PKT_SIZE);
uv_loop_init(&ic_proxy_server_loop);
ic_proxy_reload_addresses(&ic_proxy_server_loop);
ic_proxy_router_init(&ic_proxy_server_loop);
ic_proxy_peer_table_init();
ic_proxy_client_table_init();
ic_proxy_peer_listening = false;
ic_proxy_peer_relistening = false;
ic_proxy_client_listening = false;
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_hup);
uv_signal_start(&ic_proxy_server_signal_hup, ic_proxy_server_on_signal, SIGHUP);
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_int);
uv_signal_start(&ic_proxy_server_signal_int, ic_proxy_server_on_signal, SIGINT);
/* on master */
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_term);
uv_signal_start(&ic_proxy_server_signal_term, ic_proxy_server_on_signal, SIGTERM);
/* on segments */
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_stop);
uv_signal_start(&ic_proxy_server_signal_stop, ic_proxy_server_on_signal, SIGQUIT);
/* TODO: we could stop the timer if all the peers are connected */
uv_timer_init(&ic_proxy_server_loop, &ic_proxy_server_timer);
uv_timer_start(&ic_proxy_server_timer, ic_proxy_server_on_timer, 100, 1000);
/* monitor the postmaster pipe to check whether postmaster is still alive */
uv_pipe_init(&ic_proxy_server_loop, &ic_proxy_postmaster_pipe, false);
uv_pipe_open(&ic_proxy_postmaster_pipe, postmaster_alive_fds[POSTMASTER_FD_WATCH]);
uv_read_start((uv_stream_t *)&ic_proxy_postmaster_pipe, ic_proxy_pkt_cache_alloc_buffer,
ic_proxy_server_on_read_postmaster_pipe);
ic_proxy_log(LOG, "ic-proxy-server: running");
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/*
* return non-zero value so we are restarted by the postmaster, but this
* behavior can be controled by calling ic_proxy_server_quit()
*/
ic_proxy_server_exit_code = 1;
uv_run(&ic_proxy_server_loop, UV_RUN_DEFAULT);
uv_loop_close(&ic_proxy_server_loop);
ic_proxy_log(LOG, "ic-proxy-server: closing");
ic_proxy_client_table_uninit();
ic_proxy_peer_table_uninit();
ic_proxy_router_uninit();
ic_proxy_build_server_sock_path(path, sizeof(path));
#if 0
ic_proxy_log(LOG, "unlink(%s) ...", path);
unlink(path);
#endif
ic_proxy_pkt_cache_uninit();
ic_proxy_log(LOG, "ic-proxy-server: closed with code %d",
ic_proxy_server_exit_code);
return ic_proxy_server_exit_code;
}
void
ic_proxy_server_quit(uv_loop_t *loop, bool relaunch)
{
ic_proxy_log(LOG, "ic-proxy-server: quiting");
if (relaunch)
/* return non-zero value so we are restarted by the postmaster */
ic_proxy_server_exit_code = 1;
else
ic_proxy_server_exit_code = 0;
/*
* we can't close the loop directly, we need to properly shutdown all the
* clients first.
*/
if (ic_proxy_peer_listening)
{
/* cancel pending relistening request */
ic_proxy_peer_relistening = false;
uv_unref((uv_handle_t *) &ic_proxy_peer_listener);
uv_close((uv_handle_t *) &ic_proxy_peer_listener, NULL);
}
if (ic_proxy_client_listening)
{
uv_unref((uv_handle_t *) &ic_proxy_client_listener);
uv_close((uv_handle_t *) &ic_proxy_client_listener, NULL);
}
uv_timer_stop(&ic_proxy_server_timer);
uv_unref((uv_handle_t *) &ic_proxy_server_signal_hup);
uv_unref((uv_handle_t *) &ic_proxy_server_signal_term);
uv_unref((uv_handle_t *) &ic_proxy_server_signal_stop);
#if 0
uv_client_table_disconnect_all();
#endif
/*
* do not close the loop directly, it will quit automatically after all the
* clients are closed.
*/
#if 0
uv_loop_close(loop);
#endif
}
相关信息
相关文章
greenplumn ic_proxy_backend 源码
greenplumn ic_proxy_backend 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦