greenplumn ic_proxy_main 源码

  • 2022-08-18
  • 浏览 (426)

greenplumn ic_proxy_main 代码

文件路径:/src/backend/cdb/motion/ic_proxy_main.c

/*-------------------------------------------------------------------------
 *
 * ic_proxy_main.c
 *
 *	  The main loop of the ic-proxy, it listens for both new peers and new
 *	  clients, it also establish the peer connections.
 *
 *
 * Copyright (c) 2020-Present VMware, Inc. or its affiliates.
 *
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"

#include "ic_proxy_server.h"
#include "ic_proxy_addr.h"
#include "ic_proxy_pkt_cache.h"

#include <uv.h>

#include <unistd.h>

static void ic_proxy_server_peer_listener_init(uv_loop_t *loop);

static uv_loop_t	ic_proxy_server_loop;
static uv_signal_t	ic_proxy_server_signal_hup;
static uv_signal_t	ic_proxy_server_signal_int;
static uv_signal_t	ic_proxy_server_signal_term;
static uv_signal_t	ic_proxy_server_signal_stop;
static uv_timer_t	ic_proxy_server_timer;

static uv_tcp_t		ic_proxy_peer_listener;
static bool			ic_proxy_peer_listening;
static bool			ic_proxy_peer_relistening;

static uv_pipe_t	ic_proxy_client_listener;
static bool			ic_proxy_client_listening;

static int			ic_proxy_server_exit_code = 1;

/* pipe to check whether postmaster is alive */
static uv_pipe_t	ic_proxy_postmaster_pipe;

/*
 * The peer listener is closed.
 */
static void
ic_proxy_server_peer_listener_on_closed(uv_handle_t *handle)
{
	ic_proxy_log(LOG, "ic-proxy-server: peer listener: closed");

	/* A new peer listener will be created on the next timer callback */
	ic_proxy_peer_listening = false;

	/* If relisten is requested, do it now */
	if (ic_proxy_peer_relistening)
	{
		ic_proxy_peer_relistening = false;
		ic_proxy_server_peer_listener_init(handle->loop);
	}
}

/*
 * New peer arrives.
 */
static void
ic_proxy_server_on_new_peer(uv_stream_t *server, int status)
{
	ICProxyPeer *peer;
	int			ret;

	if (status < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: new peer error: %s",
					 uv_strerror(status));

		uv_close((uv_handle_t *) server,
				 ic_proxy_server_peer_listener_on_closed);
		return;
	}

	ic_proxy_log(LOG, "ic-proxy-server: new peer to the server");

	peer = ic_proxy_peer_new(server->loop,
							 IC_PROXY_INVALID_CONTENT, IC_PROXY_INVALID_DBID);

	ret = uv_accept(server, (uv_stream_t *) &peer->tcp);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: fail to accept new peer: %s",
					 uv_strerror(ret));
		ic_proxy_peer_free(peer);
		return;
	}

	/* TODO: it is better to only touch the states in peer.c */
	peer->state |= IC_PROXY_PEER_STATE_ACCEPTED;

	/* Dump some connection information, not very useful though */
	{
		struct sockaddr_storage peeraddr;
		int			addrlen = sizeof(peeraddr);
		char		name[HOST_NAME_MAX];

		uv_tcp_getpeername(&peer->tcp, (struct sockaddr *) &peeraddr, &addrlen);
		if (peeraddr.ss_family == AF_INET)
		{
			struct sockaddr_in *peeraddr4 = (struct sockaddr_in *) &peeraddr;

			uv_ip4_name(peeraddr4, name, sizeof(name));

			ic_proxy_log(LOG, "ic-proxy-server: the new peer is from %s:%d",
						 name, ntohs(peeraddr4->sin_port));
		}
		else if (peeraddr.ss_family == AF_INET6)
		{
			struct sockaddr_in6 *peeraddr6 = (struct sockaddr_in6 *) &peeraddr;

			uv_ip6_name(peeraddr6, name, sizeof(name));

			ic_proxy_log(LOG, "ic-proxy-server: the new peer is from %s:%d",
						 name, ntohs(peeraddr6->sin6_port));
		}
	}

	ic_proxy_peer_read_hello(peer);
}

/*
 * Setup the peer listener.
 *
 * The peer listener listens on a tcp socket, the peer connections will come
 * through it.
 */
static void
ic_proxy_server_peer_listener_init(uv_loop_t *loop)
{
	const ICProxyAddr *addr;
	uv_tcp_t   *listener = &ic_proxy_peer_listener;
	int			fd = -1;
	int			ret;

	if (ic_proxy_addrs == NIL)
		return;

	if (ic_proxy_peer_listening)
		return;

	/* Get the addr from the gp_interconnect_proxy_addresses */
	addr = ic_proxy_get_my_addr();
	if (addr == NULL)
		/* Cannot get my addr, maybe the setting is invalid */
		return;

#if IC_PROXY_LOG_LEVEL <= LOG
	{
		char		name[HOST_NAME_MAX] = "unknown";
		int			port = 0;
		int			family;
		int			ret;

		ret = ic_proxy_extract_sockaddr((struct sockaddr *) &addr->sockaddr,
									name, sizeof(name), &port, &family);
		if (ret == 0)
			ic_proxy_log(LOG,
						 "ic-proxy-server: setting up peer listener on %s:%s (%s:%d family=%d)",
						 addr->hostname, addr->service, name, port, family);
		else
			ic_proxy_log(WARNING,
						 "ic-proxy-server: setting up peer listener on %s:%s (%s:%d family=%d) (fail to extract the address: %s)",
						 addr->hostname, addr->service, name, port, family,
						 uv_strerror(ret));
	}
#endif /* IC_PROXY_LOG_LEVEL <= LOG */

	/*
	 * It is important to set TCP_NODELAY, otherwise we will suffer from
	 * significant latency and get very bad OLTP performance.
	 */
	uv_tcp_init(loop, listener);
	uv_tcp_nodelay(listener, true);

	ret = uv_tcp_bind(listener, (struct sockaddr *) &addr->sockaddr, 0);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to bind: %s",
					 uv_strerror(ret));
		return;
	}

	ret = uv_listen((uv_stream_t *) listener,
					IC_PROXY_BACKLOG, ic_proxy_server_on_new_peer);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to listen: %s",
					 uv_strerror(ret));
		return;
	}

	uv_fileno((uv_handle_t *) listener, &fd);
	ic_proxy_log(LOG, "ic-proxy-server: tcp: listening on socket %d", fd);

	ic_proxy_peer_listening = true;
}

/*
 * Reinit the peer listener.
 */
static void
ic_proxy_server_peer_listener_reinit(uv_loop_t *loop)
{
	const ICProxyAddr *myaddr = ic_proxy_get_my_addr();

	if (ic_proxy_peer_relistening)
		return;

	if (ic_proxy_peer_listening)
	{
		/*
		 * We are listening already, so must first close the current one, we
		 * keep the ic_proxy_peer_listening as true during the process to
		 * prevent double connect.
		 */
		ic_proxy_log(LOG, "ic-proxy-server: closing the legacy peer listener");

		/* Only recreate a new listener if an address is assigned to us */
		ic_proxy_peer_relistening = !!myaddr;

		uv_close((uv_handle_t *) &ic_proxy_peer_listener,
				 ic_proxy_server_peer_listener_on_closed);
	}
	else if (myaddr)
	{
		/* Otherwise simply establish a new one */
		ic_proxy_peer_relistening = false;
		ic_proxy_server_peer_listener_init(loop);
	}
}

/*
 * The client listener is closed.
 */
static void
ic_proxy_server_client_listener_on_closed(uv_handle_t *handle)
{
	ic_proxy_log(LOG, "ic-proxy-server: client listener: closed");

	/* A new client listener will be created on the next timer callback */
	ic_proxy_client_listening = false;
}

/*
 * New client arrives.
 */
static void
ic_proxy_server_on_new_client(uv_stream_t *server, int status)
{
	ICProxyClient *client;
	int			ret;

	if (status < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: new client error: %s",
					 uv_strerror(status));

		uv_close((uv_handle_t *) server,
				 ic_proxy_server_client_listener_on_closed);
		return;
	}

	ic_proxy_log(LOG, "ic-proxy-server: new client to the server");

	client = ic_proxy_client_new(server->loop, false);

	ret = uv_accept(server, ic_proxy_client_get_stream(client));
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: fail to accept new client: %s",
					 uv_strerror(ret));
		return;
	}

	ic_proxy_client_read_hello(client);
}

/*
 * Setup the client listener.
 *
 * The client listener listens on a domain socket, the client connections will
 * come through it.
 */
static void
ic_proxy_server_client_listener_init(uv_loop_t *loop)
{
	uv_pipe_t  *listener = &ic_proxy_client_listener;
	char		path[MAXPGPATH];
	int			fd = -1;
	int			ret;

	if (ic_proxy_client_listening)
		return;

	ic_proxy_build_server_sock_path(path, sizeof(path));

	/* FIXME: do not unlink here */
	ic_proxy_log(LOG, "unlink(%s) ...", path);
	unlink(path);

	ic_proxy_log(LOG, "ic-proxy-server: setting up client listener on address %s",
				 path);

	ret = uv_pipe_init(loop, listener, false);
	if (ret < 0)
	{
		ic_proxy_log(WARNING,
					 "ic-proxy-server: fail to init a client listener: %s",
					 uv_strerror(ret));
		return;
	}

	ret = uv_pipe_bind(listener, path);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: pipe: fail to bind(%s): %s",
					 path, uv_strerror(ret));
		return;
	}

	ret = uv_listen((uv_stream_t *) listener,
					IC_PROXY_BACKLOG, ic_proxy_server_on_new_client);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: pipe: fail to listen on path %s: %s",
					 path, uv_strerror(ret));
		return;
	}

	uv_fileno((uv_handle_t *) listener, &fd);
	ic_proxy_log(LOG, "ic-proxy-server: pipe: listening on socket %d", fd);

	/*
	 * Dump the inode of the domain socket file, this helps us to know that the
	 * file is replaced by someone.  This is not likely to happen, we have
	 * carefully choosen the file path to not conflict with each other.
	 */
	{
		struct stat	st;

		stat(path, &st);
		ic_proxy_log(LOG, "ic-proxy-server: dev=%lu, inode=%lu, path=%s",
					 st.st_dev, st.st_ino, path);
	}

	ic_proxy_client_listening = true;
}

/*
 * Establish the peer connections.
 *
 * A proxy connects to all the other proxies, all these connections form the
 * proxy network.  Only one connection is needed between 2 proxies, this is
 * ensured by a policy that "proxy X connects to proxy Y iff X > Y".  To support
 * mirror promotion, X attempts to connect to Y even if Y is a mirror, or even
 * if we have connected to Y's primary.  In fact we do not know whether Y is a
 * mirror or not, and we do not care.
 */
static void
ic_proxy_server_ensure_peers(uv_loop_t *loop)
{
	ListCell   *cell;

	foreach(cell, ic_proxy_addrs)
	{
		ICProxyAddr *addr = lfirst(cell);
		ICProxyPeer *peer;

		if (addr->content >= GpIdentity.segindex)
			continue;
		if (addr->dbid == GpIdentity.dbid)
			continue; /* do not connect to my primary / mirror */

		/*
		 * First get the peer with the peer id, then connect to it.  The peer
		 * can be a placeholder, can be in the progress of a connection, or can
		 * be connected, the ic_proxy_peer_connect() function will take care of
		 * the state.
		 */
		peer = ic_proxy_peer_blessed_lookup(loop, addr->content, addr->dbid);
		ic_proxy_peer_connect(peer, (struct sockaddr_in *) addr);
	}
}

/*
 * Drop legacy peers.
 *
 * The list ic_proxy_removed_addrs contains both removed and updated addresses,
 * the corresponding peers must be disconnected before taking further actions.
 */
static void
ic_proxy_server_drop_legacy_peers(uv_loop_t *loop)
{
	ListCell   *cell;
	const ICProxyAddr *myaddr = ic_proxy_get_my_addr();

	/*
	 * Also take the chance to check the peer listener.
	 *
	 * If myaddr cannot be found at all, the address must have been removed,
	 * close the current listener.
	 */
	if (!myaddr)
		ic_proxy_server_peer_listener_reinit(loop);

	foreach(cell, ic_proxy_removed_addrs)
	{
		ICProxyAddr *addr = lfirst(cell);
		ICProxyPeer *peer;

		/*
		 * Also take the chance to check the peer listener.
		 *
		 * If myaddr appears in the removed list, then the address must have
		 * been changed or removed, no need to compare the sockaddrs again.
		 */
		if (myaddr && myaddr->dbid == addr->dbid)
			ic_proxy_server_peer_listener_reinit(loop);

		/*
		 * Refer to ic_proxy_server_ensure_peers() on why we need below checks.
		 */
		if (addr->content >= GpIdentity.segindex)
			continue;
		if (addr->dbid == GpIdentity.dbid)
			continue; /* do not connect to my primary / mirror */

		peer = ic_proxy_peer_lookup(addr->content, addr->dbid);
		if (!peer)
			continue;

		ic_proxy_peer_disconnect(peer);
	}
}

/*
 * Timer handler.
 *
 * This is used to maintain the proxy-proxy network, as well as the client and
 * peer listeners.
 */
static void
ic_proxy_server_on_timer(uv_timer_t *timer)
{
	ic_proxy_server_peer_listener_init(timer->loop);
	ic_proxy_server_ensure_peers(timer->loop);
	ic_proxy_server_client_listener_init(timer->loop);
}

/*
 * Signal handler.
 *
 * Signals are handled via the signalfd() call in libuv, so this is a normal
 * callback as others, nothing special, errors can be raised, too.
 */
static void
ic_proxy_server_on_signal(uv_signal_t *handle, int signum)
{
	ic_proxy_log(WARNING, "ic-proxy-server: received signal %d", signum);

	if (signum == SIGHUP)
	{
		ProcessConfigFile(PGC_SIGHUP);

		ic_proxy_reload_addresses(handle->loop);
		ic_proxy_server_drop_legacy_peers(handle->loop);

		ic_proxy_server_peer_listener_init(handle->loop);
		ic_proxy_server_ensure_peers(handle->loop);
		ic_proxy_server_client_listener_init(handle->loop);
	}
	else
	{
		uv_stop(handle->loop);
	}
}

/*
 * callback when received data from ic_proxy_postmaster_pipe
 */
static void
ic_proxy_server_on_read_postmaster_pipe(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
	/* return the pkt to cache freelist, we don't care about the buffer content */
	if (buf->base)
		ic_proxy_pkt_cache_free(buf->base);

	/* nread = 0 means EAGAIN and EWOULDBLOCK, while nread = EOF means postmaster is dead */
	if (nread == UV_EOF)
		proc_exit(1);
	else if (nread < 0)
		ic_proxy_log(FATAL, "read on postmaster death monitoring pipe failed: %s", uv_strerror(nread));
	else if (nread > 0)
		ic_proxy_log(FATAL, "unexpected data in postmaster death monitoring pipe with length: %ld", nread);
}

/*
 * The main loop of the ic-proxy.
 */
int
ic_proxy_server_main(void)
{
	char		path[MAXPGPATH];

	ic_proxy_log(LOG, "ic-proxy-server: setting up");

	ic_proxy_pkt_cache_init(IC_PROXY_MAX_PKT_SIZE);

	uv_loop_init(&ic_proxy_server_loop);

	ic_proxy_reload_addresses(&ic_proxy_server_loop);

	ic_proxy_router_init(&ic_proxy_server_loop);
	ic_proxy_peer_table_init();
	ic_proxy_client_table_init();

	ic_proxy_peer_listening = false;
	ic_proxy_peer_relistening = false;
	ic_proxy_client_listening = false;

	uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_hup);
	uv_signal_start(&ic_proxy_server_signal_hup, ic_proxy_server_on_signal, SIGHUP);

	uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_int);
	uv_signal_start(&ic_proxy_server_signal_int, ic_proxy_server_on_signal, SIGINT);

	/* on master */
	uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_term);
	uv_signal_start(&ic_proxy_server_signal_term, ic_proxy_server_on_signal, SIGTERM);

	/* on segments */
	uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_stop);
	uv_signal_start(&ic_proxy_server_signal_stop, ic_proxy_server_on_signal, SIGQUIT);

	/* TODO: we could stop the timer if all the peers are connected */
	uv_timer_init(&ic_proxy_server_loop, &ic_proxy_server_timer);
	uv_timer_start(&ic_proxy_server_timer, ic_proxy_server_on_timer, 100, 1000);

	/* monitor the postmaster pipe to check whether postmaster is still alive */
	uv_pipe_init(&ic_proxy_server_loop, &ic_proxy_postmaster_pipe, false);
	uv_pipe_open(&ic_proxy_postmaster_pipe, postmaster_alive_fds[POSTMASTER_FD_WATCH]);
	uv_read_start((uv_stream_t *)&ic_proxy_postmaster_pipe, ic_proxy_pkt_cache_alloc_buffer,
				  ic_proxy_server_on_read_postmaster_pipe);

	ic_proxy_log(LOG, "ic-proxy-server: running");

	/* We're now ready to receive signals */
	BackgroundWorkerUnblockSignals();

	/*
	 * return non-zero value so we are restarted by the postmaster, but this
	 * behavior can be controled by calling ic_proxy_server_quit()
	 */
	ic_proxy_server_exit_code = 1;
	uv_run(&ic_proxy_server_loop, UV_RUN_DEFAULT);
	uv_loop_close(&ic_proxy_server_loop);

	ic_proxy_log(LOG, "ic-proxy-server: closing");

	ic_proxy_client_table_uninit();
	ic_proxy_peer_table_uninit();
	ic_proxy_router_uninit();

	ic_proxy_build_server_sock_path(path, sizeof(path));
#if 0
	ic_proxy_log(LOG, "unlink(%s) ...", path);
	unlink(path);
#endif

	ic_proxy_pkt_cache_uninit();

	ic_proxy_log(LOG, "ic-proxy-server: closed with code %d",
				 ic_proxy_server_exit_code);

	return ic_proxy_server_exit_code;
}

void
ic_proxy_server_quit(uv_loop_t *loop, bool relaunch)
{
	ic_proxy_log(LOG, "ic-proxy-server: quiting");

	if (relaunch)
		/* return non-zero value so we are restarted by the postmaster */
		ic_proxy_server_exit_code = 1;
	else
		ic_proxy_server_exit_code = 0;

	/*
	 * we can't close the loop directly, we need to properly shutdown all the
	 * clients first.
	 */
	if (ic_proxy_peer_listening)
	{
		/* cancel pending relistening request */
		ic_proxy_peer_relistening = false;

		uv_unref((uv_handle_t *) &ic_proxy_peer_listener);
		uv_close((uv_handle_t *) &ic_proxy_peer_listener, NULL);
	}
	if (ic_proxy_client_listening)
	{
		uv_unref((uv_handle_t *) &ic_proxy_client_listener);
		uv_close((uv_handle_t *) &ic_proxy_client_listener, NULL);
	}
	uv_timer_stop(&ic_proxy_server_timer);
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_hup);
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_term);
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_stop);

#if 0
	uv_client_table_disconnect_all();
#endif

	/*
	 * do not close the loop directly, it will quit automatically after all the
	 * clients are closed.
	 */
#if 0
	uv_loop_close(loop);
#endif
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbmotion 源码

greenplumn htupfifo 源码

greenplumn ic_common 源码

greenplumn ic_proxy 源码

greenplumn ic_proxy_addr 源码

greenplumn ic_proxy_addr 源码

greenplumn ic_proxy_backend 源码

greenplumn ic_proxy_backend 源码

greenplumn ic_proxy_bgworker 源码

greenplumn ic_proxy_client 源码

0  赞