Thu Dec 8 15:51:26 PST 2005
- Previous message: [Slony1-commit] By dpage: Comment out redundant query parameter and fix query so it
- Next message: [Slony1-commit] By cbbrowne: Feature Request #1280 - Reduce generation of LISTEN/NOTIFY
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Feature Request #1280 - Reduce generation of LISTEN/NOTIFY entries to cut pg_listener bloat Attached is a patch to the slon code which should dramatically reduce the amount of pg_listener bloat. The notion is that when the slon is working well, processing events with great regularity, we switch over to do polling, and UNLISTEN to events being handled via pg_listener, which entirely eliminates the tendancy for pg_listener to bloat up. By now, people probably know my tendancy to prefer adaptive self-configuration, where possible :-) . We have two states enumerated: enum pstate_enum {POLL=1, LISTEN}; static enum pstate_enum pstate; Polling times are managed via poll_sleep... static int poll_sleep; - Any time the event loop finds events, the sleep time gets set to 0 so that the next iteration starts immediately. - Any time the event loop doesn't find events, we double poll_sleep and add sync_interval (the -s option value) This causes polling to rapidly double, heading towards... OUCH! sync_interval_timeout. If it reaches sync_interval_timeout, we switch states from POLL to LISTEN, and turn on listening to the node's Event. And when we get an event, again, we switch back to POLLing mode. So, we'll generally have two kinds of behaviour: 1. If your application is applying changes very infrequently, then you'll LISTEN, waking up once in a while when there's an update. Not many events generated will mean not many dead pg_listener tuples. 2. If your application applies changes very frequently, then you'll mostly POLL, which will generate exactly 0 dead pg_listener tuples :-) . Both of those have the similar "not many dead pg_listener tuples" property. This would doubtless resolve the common desire for "less dead pg_listener tuples," particularly in combination with a previous patch that eliminates using LISTEN/NOTIFY for event confirmations. Modified Files: -------------- slony1-engine/src/slon: local_listen.c (r1.35 -> r1.36) remote_listen.c (r1.27 -> r1.28) -------------- next part -------------- Index: remote_listen.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v retrieving revision 1.27 retrieving revision 1.28 diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.27 -r1.28 --- src/slon/remote_listen.c +++ src/slon/remote_listen.c @@ -58,6 +58,10 @@ static int remoteListen_receive_events(SlonNode * node, SlonConn * conn, struct listat * listat); +static int poll_sleep; +enum pstate_enum {POLL=1, LISTEN}; +static enum pstate_enum pstate; + extern char *lag_interval; /* ---------- @@ -98,8 +102,11 @@ listat_tail = NULL; dstring_init(&query1); + poll_sleep = 0; + pstate = POLL; /* Initially, start in Polling mode */ + sprintf(conn_symname, "node_%d_listen", node->no_id); -/* sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name); */ + sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name); /* * Work until doomsday @@ -229,13 +236,23 @@ * register the node connection. */ slon_mkquery(&query1, - "listen \"_%s_Event\"; " + /* "listen \"_%s_Event\"; " */ /* skip confirms "listen \"_%s_Confirm\"; " */ - "select %s.registerNodeConnection(%d); ", - rtcfg_cluster_name, /* rtcfg_cluster_name, */ + "select _%s.registerNodeConnection(%d); ", + /* rtcfg_cluster_name, */ rtcfg_namespace, rtcfg_nodeid); + + if (pstate == LISTEN) { + slon_appendquery(&query1, + "listen \"_%s_Event\"; ", + rtcfg_cluster_name); + } else { + slon_appendquery(&query1, + "unlisten \"_%s_Event\"; ", + rtcfg_cluster_name); + } res = PQexec(dbconn, dstring_data(&query1)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) + if (PQresultStatus(res) != PGRES_COMMAND_OK) { slon_log(SLON_ERROR, "remoteListenThread_%d: \"%s\" - %s", @@ -299,6 +316,7 @@ /* * Receive events from the provider node */ + enum pstate_enum oldpstate = pstate; rc = remoteListen_receive_events(node, conn, listat_head); if (rc < 0) { @@ -313,6 +331,41 @@ continue; } + if (oldpstate != pstate) { /* Switched states... */ + switch (pstate) { + case POLL: + slon_log(SLON_DEBUG2, + "remoteListenThread_%d: UNLISTEN\n", + node->no_id); + + slon_mkquery(&query1, + "unlisten \"_%s_Event\"; ", + rtcfg_cluster_name); + break; + case LISTEN: + slon_log(SLON_DEBUG2, + "remoteListenThread_%d: LISTEN\n", + node->no_id); + slon_mkquery(&query1, + "listen \"_%s_Event\"; ", + rtcfg_cluster_name); + break; + } + res = PQexec(dbconn, dstring_data(&query1)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + slon_log(SLON_ERROR, + "remoteListenThread_%d: \"%s\" - %s", + node->no_id, + dstring_data(&query1), PQresultErrorMessage(res)); + PQclear(res); + slon_disconnectdb(conn); + free(conn_conninfo); + conn = NULL; + conn_conninfo = NULL; + continue; + } + } /* * If the remote node notified for new confirmations, read them and @@ -343,7 +396,7 @@ /* * Wait for notification. */ - rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 10000); + rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep); if (rc == SCHED_STATUS_CANCEL) continue; if (rc != SCHED_STATUS_OK) @@ -746,6 +799,16 @@ (PQgetisnull(res, tupno, 14)) ? NULL : PQgetvalue(res, tupno, 14)); } + if (ntuples > 0) { + poll_sleep = 0; + pstate = POLL; + } else { + poll_sleep = poll_sleep * 2 + sync_interval; + if (poll_sleep > sync_interval_timeout) { + poll_sleep = sync_interval_timeout; + pstate = LISTEN; + } + } PQclear(res); return 0; Index: local_listen.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v retrieving revision 1.35 retrieving revision 1.36 diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.35 -r1.36 --- src/slon/local_listen.c +++ src/slon/local_listen.c @@ -49,6 +49,7 @@ PGnotify *notification; char restart_notify[256]; int restart_request; + int poll_sleep = 0; slon_log(SLON_DEBUG1, "localListenThread: thread starts\n"); @@ -69,9 +70,10 @@ * Listen for local events */ slon_mkquery(&query1, - "listen \"_%s_Event\"; " + /* "listen \"_%s_Event\"; " */ "listen \"_%s_Restart\"; ", - rtcfg_cluster_name, rtcfg_cluster_name); + /* rtcfg_cluster_name, */ + rtcfg_cluster_name); res = PQexec(dbconn, dstring_data(&query1)); if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -636,6 +638,7 @@ */ if (ntuples > 0) { + poll_sleep = 0; /* drop polling time back to 0... */ res = PQexec(dbconn, "commit transaction"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -654,6 +657,12 @@ /* * No database events received. Rollback instead. */ + + /* Increase the amount of time to sleep, to a max of sync_interval_timeout */ + poll_sleep += sync_interval; + if (poll_sleep > sync_interval_timeout) { + poll_sleep = sync_interval_timeout; + } res = PQexec(dbconn, "rollback transaction;"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -668,9 +677,9 @@ } /* - * Wait for notify + * Wait for notify or for timeout */ - if (sched_wait_conn(conn, SCHED_WAIT_SOCK_READ) != SCHED_STATUS_OK) + if (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep) != SCHED_STATUS_OK) break; }
- Previous message: [Slony1-commit] By dpage: Comment out redundant query parameter and fix query so it
- Next message: [Slony1-commit] By cbbrowne: Feature Request #1280 - Reduce generation of LISTEN/NOTIFY
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list