CVS User Account cvsuser
Thu Dec 8 15:51:26 PST 2005
Log Message:
-----------
Feature Request #1280 - Reduce generation of LISTEN/NOTIFY entries to cut pg_listener bloat

Attached is a patch to the slon code which should dramatically reduce
the amount of pg_listener bloat.

The notion is that when the slon is working well, processing events with
great regularity, we switch over to do polling, and UNLISTEN to events
being handled via pg_listener, which entirely eliminates the
tendency for pg_listener to bloat up.

By now, people probably know my tendency to prefer adaptive
self-configuration, where possible  :-) .

We have two states enumerated:

enum pstate_enum {POLL=1, LISTEN};
static enum pstate_enum pstate;

Polling times are managed via poll_sleep...
static int poll_sleep;

- Any time the event loop finds events, the sleep time gets set to 0 so
that the next iteration starts immediately.

- Any time the event loop doesn't find events, we double poll_sleep and
add sync_interval (the -s option value), i.e.
poll_sleep = poll_sleep * 2 + sync_interval.

This causes the polling interval to roughly double on each idle pass,
heading rapidly towards sync_interval_timeout.

If it reaches sync_interval_timeout, we switch states from POLL to
LISTEN, and turn on listening to the node's Event.

And when we get an event, again, we switch back to POLLing mode.

So, we'll generally have two kinds of behaviour:

1.  If your application is applying changes very infrequently, then
you'll LISTEN, waking up once in a while when there's an update.  Not
many events generated will mean not many dead pg_listener tuples.

2.  If your application applies changes very frequently, then you'll
mostly POLL, which will generate exactly 0 dead pg_listener tuples  :-) .

Both of those share the same "not many dead pg_listener tuples" property.

This would doubtless resolve the common desire for "fewer dead
pg_listener tuples," particularly in combination with a previous patch
that eliminates using LISTEN/NOTIFY for event confirmations.

Modified Files:
--------------
    slony1-engine/src/slon:
        local_listen.c (r1.35 -> r1.36)
        remote_listen.c (r1.27 -> r1.28)

-------------- next part --------------
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.27
retrieving revision 1.28
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.27 -r1.28
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -58,6 +58,10 @@
 static int remoteListen_receive_events(SlonNode * node,
 					SlonConn * conn, struct listat * listat);
 
+static int poll_sleep;
+enum pstate_enum {POLL=1, LISTEN};
+static enum pstate_enum pstate;
+
 extern char *lag_interval;
 
 /* ----------
@@ -98,8 +102,11 @@
 	listat_tail = NULL;
 	dstring_init(&query1);
 
+	poll_sleep = 0;
+	pstate = POLL;      /* Initially, start in Polling mode */
+
 	sprintf(conn_symname, "node_%d_listen", node->no_id);
-/*	sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name); */
+	sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name);
 
 	/*
 	 * Work until doomsday
@@ -229,13 +236,23 @@
 			 * register the node connection.
 			 */
 			slon_mkquery(&query1,
-				     "listen \"_%s_Event\"; "
+				     /* "listen \"_%s_Event\"; " */
 				     /*	 skip confirms "listen \"_%s_Confirm\"; " */
-				     "select %s.registerNodeConnection(%d); ",
-				     rtcfg_cluster_name, /* rtcfg_cluster_name, */
+				     "select _%s.registerNodeConnection(%d); ",
+				     /* rtcfg_cluster_name,  */
 				     rtcfg_namespace, rtcfg_nodeid);
+
+			if (pstate == LISTEN) {
+				slon_appendquery(&query1, 
+						 "listen \"_%s_Event\"; ",
+						 rtcfg_cluster_name);
+			} else {
+				slon_appendquery(&query1, 
+						 "unlisten \"_%s_Event\"; ",
+						 rtcfg_cluster_name);
+			}
 			res = PQexec(dbconn, dstring_data(&query1));
-			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
 				slon_log(SLON_ERROR,
 					 "remoteListenThread_%d: \"%s\" - %s",
@@ -299,6 +316,7 @@
 		/*
 		 * Receive events from the provider node
 		 */
+		enum pstate_enum oldpstate = pstate;
 		rc = remoteListen_receive_events(node, conn, listat_head);
 		if (rc < 0)
 		{
@@ -313,6 +331,41 @@
 
 			continue;
 		}
+		if (oldpstate != pstate) { /* Switched states... */
+			switch (pstate) {
+			case POLL:
+				slon_log(SLON_DEBUG2, 
+					 "remoteListenThread_%d: UNLISTEN\n",
+					 node->no_id);
+
+				slon_mkquery(&query1,
+					     "unlisten \"_%s_Event\"; ",
+					     rtcfg_cluster_name);
+				break;
+			case LISTEN:
+				slon_log(SLON_DEBUG2, 
+					 "remoteListenThread_%d: LISTEN\n",
+					 node->no_id);
+				slon_mkquery(&query1,
+					     "listen \"_%s_Event\"; ",
+					     rtcfg_cluster_name);
+				break;
+			}			
+			res = PQexec(dbconn, dstring_data(&query1));
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			{
+				slon_log(SLON_ERROR,
+					 "remoteListenThread_%d: \"%s\" - %s",
+					 node->no_id,
+					 dstring_data(&query1), PQresultErrorMessage(res));
+				PQclear(res);
+				slon_disconnectdb(conn);
+				free(conn_conninfo);
+				conn = NULL;
+				conn_conninfo = NULL;
+				continue;
+			}
+		}
 
 		/*
 		 * If the remote node notified for new confirmations, read them and
@@ -343,7 +396,7 @@
 		/*
 		 * Wait for notification.
 		 */
-		rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 10000);
+		rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep);
 		if (rc == SCHED_STATUS_CANCEL)
 			continue;
 		if (rc != SCHED_STATUS_OK)
@@ -746,6 +799,16 @@
 		  (PQgetisnull(res, tupno, 14)) ? NULL : PQgetvalue(res, tupno, 14));
 	}
 
+	if (ntuples > 0) {
+		poll_sleep = 0;
+		pstate = POLL;
+	} else {
+		poll_sleep = poll_sleep * 2 + sync_interval;
+		if (poll_sleep > sync_interval_timeout) {
+			poll_sleep = sync_interval_timeout;
+			pstate = LISTEN;
+		}
+	}
 	PQclear(res);
 
 	return 0;
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.35
retrieving revision 1.36
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.35 -r1.36
--- src/slon/local_listen.c
+++ src/slon/local_listen.c
@@ -49,6 +49,7 @@
 	PGnotify   *notification;
 	char		restart_notify[256];
 	int			restart_request;
+	int poll_sleep = 0;
 
 	slon_log(SLON_DEBUG1, "localListenThread: thread starts\n");
 
@@ -69,9 +70,10 @@
 	 * Listen for local events
 	 */
 	slon_mkquery(&query1,
-				 "listen \"_%s_Event\"; "
+		     /* "listen \"_%s_Event\"; " */
 				 "listen \"_%s_Restart\"; ",
-				 rtcfg_cluster_name, rtcfg_cluster_name);
+		     /*	 rtcfg_cluster_name,  */
+		     rtcfg_cluster_name);
 	res = PQexec(dbconn, dstring_data(&query1));
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
@@ -636,6 +638,7 @@
 		 */
 		if (ntuples > 0)
 		{
+			poll_sleep = 0;  /* drop polling time back to 0... */
 			res = PQexec(dbconn, "commit transaction");
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
@@ -654,6 +657,12 @@
 			/*
 			 * No database events received. Rollback instead.
 			 */
+
+			/* Increase the amount of time to sleep, to a max of sync_interval_timeout */
+			poll_sleep += sync_interval;
+			if (poll_sleep > sync_interval_timeout) {
+				poll_sleep = sync_interval_timeout;
+			}
 			res = PQexec(dbconn, "rollback transaction;");
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
@@ -668,9 +677,9 @@
 		}
 
 		/*
-		 * Wait for notify
+		 * Wait for notify or for timeout
 		 */
-		if (sched_wait_conn(conn, SCHED_WAIT_SOCK_READ) != SCHED_STATUS_OK)
+		if (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep) != SCHED_STATUS_OK)
 			break;
 	}
 


More information about the Slony1-commit mailing list