Mon May 26 14:09:50 PDT 2008
- Previous message: [Slony1-commit] slony1-engine/tests run_test.sh
- Next message: [Slony1-commit] slony1-engine/src/backend slony1_funcs.c slony1_funcs.sql
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv12883/slon
Modified Files:
local_listen.c remote_listen.c remote_worker.c
Log Message:
Remove code that generates and listens for LISTEN/NOTIFY events for
Events and Confirms.
remote_listen.c now functions solely via polling, which we already know
works fine, as we have had a polling mode for quite a while in the 2.0
code tree.
The slon _Restart LISTEN/NOTIFY has been left alone; making sure that it
is handled correctly is left as a further exercise, to be dealt with
independently.
Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.168
retrieving revision 1.169
diff -C2 -d -r1.168 -r1.169
*** remote_worker.c 26 May 2008 20:09:54 -0000 1.168
--- remote_worker.c 26 May 2008 21:09:48 -0000 1.169
***************
*** 243,247 ****
SlonDString *dsp);
static void query_append_event(SlonDString *dsp,
! SlonWorkMsg_event *event, bool suppress_notify);
static void store_confirm_forward(SlonNode *node, SlonConn *conn,
SlonWorkMsg_confirm *confirm);
--- 243,247 ----
SlonDString *dsp);
static void query_append_event(SlonDString *dsp,
! SlonWorkMsg_event *event);
static void store_confirm_forward(SlonNode *node, SlonConn *conn,
SlonWorkMsg_confirm *confirm);
***************
*** 291,295 ****
bool event_ok;
bool need_reloadListen = false;
- bool suppress_notify;
slon_log(SLON_INFO,
--- 291,294 ----
***************
*** 503,508 ****
/*
! * Construct the queries to begin a transaction, notify on the event
! * and confirm relations, insert the event into our local sl_event
* table and confirm it in our local sl_confirm table. When this
* transaction commits, every other remote node listening for events
--- 502,507 ----
/*
! * Construct the queries to begin a transaction, insert the event
! * into our local sl_event
* table and confirm it in our local sl_confirm table. When this
* transaction commits, every other remote node listening for events
***************
*** 624,628 ****
{
/*
! * Execute the forwarding and notify stuff, but do not commit
* the transaction yet.
*/
--- 623,627 ----
{
/*
! * Execute the forwarding stuff, but do not commit
* the transaction yet.
*/
***************
*** 663,674 ****
dstring_reset(&query1);
last_sync_group_size = 0;
- suppress_notify = FALSE;
for (i = 0; i < sync_group_size; i++)
{
! query_append_event(&query1, sync_group[i], suppress_notify);
! if (i < (sync_group_size - 1))
! free(sync_group[i]);
! last_sync_group_size++;
! suppress_notify = TRUE;
}
slon_appendquery(&query1, "commit transaction;");
--- 662,671 ----
dstring_reset(&query1);
last_sync_group_size = 0;
for (i = 0; i < sync_group_size; i++)
{
! query_append_event(&query1, sync_group[i]);
! if (i < (sync_group_size - 1))
! free(sync_group[i]);
! last_sync_group_size++;
}
slon_appendquery(&query1, "commit transaction;");
***************
*** 1041,1045 ****
"notify \"_%s_Restart\"; ",
rtcfg_cluster_name);
! query_append_event(&query1, event, FALSE);
slon_appendquery(&query1, "commit transaction;");
query_execute(node, local_dbconn, &query1);
--- 1038,1042 ----
"notify \"_%s_Restart\"; ",
rtcfg_cluster_name);
! query_append_event(&query1, event);
slon_appendquery(&query1, "commit transaction;");
query_execute(node, local_dbconn, &query1);
***************
*** 1423,1430 ****
if (event_ok)
{
! query_append_event(&query1, event, FALSE);
! slon_appendquery(&query1, "commit transaction;");
! if (archive_close(node) < 0)
! slon_retry();
}
else
--- 1420,1427 ----
if (event_ok)
{
! query_append_event(&query1, event);
! slon_appendquery(&query1, "commit transaction;");
! if (archive_close(node) < 0)
! slon_retry();
}
else
***************
*** 2141,2162 ****
* query_append_event
*
! * Add queries to a dstring that notify for Event and Confirm and that insert a
! * duplicate of an event record as well as the confirmation for it.
! * "suppress_notify" parm permits omitting the notify request if running this many times
* ----------
*/
static void
! query_append_event(SlonDString *dsp, SlonWorkMsg_event *event, bool suppress_notify)
{
char seqbuf[64];
sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
- if (!suppress_notify)
- {
- slon_appendquery(dsp,
- "notify \"_%s_Event\"; ",
- rtcfg_cluster_name);
-
- }
slon_appendquery(dsp,
"insert into %s.sl_event "
--- 2138,2151 ----
* query_append_event
*
! * Add queries to a dstring that insert a duplicate of an event record
! * as well as the confirmation for it.
* ----------
*/
static void
! query_append_event(SlonDString *dsp, SlonWorkMsg_event *event)
{
char seqbuf[64];
sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
slon_appendquery(dsp,
"insert into %s.sl_event "
Index: remote_listen.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.44
retrieving revision 1.45
diff -C2 -d -r1.44 -r1.45
*** remote_listen.c 26 May 2008 20:14:27 -0000 1.44
--- remote_listen.c 26 May 2008 21:09:48 -0000 1.45
***************
*** 56,64 ****
SlonConn * conn, struct listat * listat);
- typedef enum {
- SLON_POLLSTATE_POLL=1,
- SLON_POLLSTATE_LISTEN
- } PollState;
- static PollState poll_state;
static int poll_sleep;
--- 56,59 ----
***************
*** 92,97 ****
int64 new_config_seq = 0;
- PollState oldpstate;
-
slon_log(SLON_INFO,
"remoteListenThread_%d: thread starts\n",
--- 87,90 ----
***************
*** 106,110 ****
poll_sleep = 0;
- poll_state = SLON_POLLSTATE_POLL; /* Initially, start in Polling mode */
sprintf(conn_symname, "node_%d_listen", node->no_id);
--- 99,102 ----
***************
*** 238,263 ****
*/
(void) slon_mkquery(&query1,
- /* "listen \"_%s_Event\"; " */
- /* skip confirms "listen \"_%s_Confirm\"; " */
"select %s.registerNodeConnection(%d); ",
- /* rtcfg_cluster_name, */
rtcfg_namespace, rtcfg_nodeid);
- if (poll_state == SLON_POLLSTATE_LISTEN) {
- slon_appendquery(&query1,
- "listen \"_%s_Event\"; ",
- rtcfg_cluster_name);
- } else {
- slon_appendquery(&query1,
- "unlisten \"_%s_Event\"; ",
- rtcfg_cluster_name);
- }
res = PQexec(dbconn, dstring_data(&query1));
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR,
"remoteListenThread_%d: \"%s\" - %s",
node->no_id,
! dstring_data(&query1), PQresultErrorMessage(res));
PQclear(res);
slon_disconnectdb(conn);
--- 230,243 ----
*/
(void) slon_mkquery(&query1,
"select %s.registerNodeConnection(%d); ",
rtcfg_namespace, rtcfg_nodeid);
res = PQexec(dbconn, dstring_data(&query1));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR,
"remoteListenThread_%d: \"%s\" - %s",
node->no_id,
! dstring_data(&query1), PQresultErrorMessage(res));
PQclear(res);
slon_disconnectdb(conn);
***************
*** 318,322 ****
* Receive events from the provider node
*/
- oldpstate = poll_state;
rc = remoteListen_receive_events(node, conn, listat_head);
if (rc < 0)
--- 298,301 ----
***************
*** 333,371 ****
continue;
}
- if (oldpstate != poll_state) { /* Switched states... */
- switch (poll_state) {
- case SLON_POLLSTATE_POLL:
- slon_log(SLON_DEBUG2,
- "remoteListenThread_%d: UNLISTEN - switch into polling mode\n",
- node->no_id);
-
- (void) slon_mkquery(&query1,
- "unlisten \"_%s_Event\"; ",
- rtcfg_cluster_name);
- break;
- case SLON_POLLSTATE_LISTEN:
- slon_log(SLON_DEBUG2,
- "remoteListenThread_%d: LISTEN - switch from polling mode to use LISTEN\n",
- node->no_id);
- (void) slon_mkquery(&query1,
- "listen \"_%s_Event\"; ",
- rtcfg_cluster_name);
- break;
- }
- res = PQexec(dbconn, dstring_data(&query1));
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- slon_log(SLON_ERROR,
- "remoteListenThread_%d: \"%s\" - %s",
- node->no_id,
- dstring_data(&query1), PQresultErrorMessage(res));
- PQclear(res);
- slon_disconnectdb(conn);
- free(conn_conninfo);
- conn = NULL;
- conn_conninfo = NULL;
- continue;
- }
- }
/*
--- 312,315 ----
***************
*** 375,381 ****
*/
- /* Initially: Let's just blindly check... */
- /* if (forward_confirm)
- { */
rc = remoteListen_forward_confirm(node, conn);
if (rc < 0)
--- 319,322 ----
***************
*** 800,804 ****
if (ntuples > 0) {
if ((sel_max_events > 2) && (sync_group_maxsize > 100)) {
- poll_state = SLON_POLLSTATE_LISTEN;
slon_log(SLON_INFO, "remoteListenThread_%d: drew maximum # of events for %d iterations\n",
node->no_id, sel_max_events);
--- 741,744 ----
***************
*** 808,812 ****
} else {
poll_sleep = 0;
- poll_state = SLON_POLLSTATE_POLL;
}
} else {
--- 748,751 ----
***************
*** 814,822 ****
if (poll_sleep > sync_interval_timeout) {
poll_sleep = sync_interval_timeout;
- poll_state = SLON_POLLSTATE_LISTEN;
}
}
PQclear(res);
- last_event_sel = ntuples;
return 0;
}
--- 753,759 ----
Index: local_listen.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.45
retrieving revision 1.46
diff -C2 -d -r1.45 -r1.46
*** local_listen.c 21 Jan 2008 18:54:11 -0000 1.45
--- local_listen.c 26 May 2008 21:09:48 -0000 1.46
***************
*** 68,74 ****
*/
(void) slon_mkquery(&query1,
- /* "listen \"_%s_Event\"; " */
"listen \"_%s_Restart\"; ",
- /* rtcfg_cluster_name, */
rtcfg_cluster_name);
res = PQexec(dbconn, dstring_data(&query1));
--- 68,72 ----
- Previous message: [Slony1-commit] slony1-engine/tests run_test.sh
- Next message: [Slony1-commit] slony1-engine/src/backend slony1_funcs.c slony1_funcs.sql
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list