Mon May 26 14:09:50 PDT 2008
- Previous message: [Slony1-commit] slony1-engine/tests run_test.sh
- Next message: [Slony1-commit] slony1-engine/src/backend slony1_funcs.c slony1_funcs.sql
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv12883/slon

Modified Files:
    local_listen.c remote_listen.c remote_worker.c

Log Message:
Remove code that generates + listens to LISTEN/NOTIFY events for Events + Confirms.

remote_listen.c now functions solely via polling, which we already know works fine, as we have had a polling mode for quite a while in the 2.0 code tree.

The slon _Restart LISTEN/NOTIFY has been left alone; making sure that this is handled correctly will be left as a further exercise to be handled independently.

Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.168
retrieving revision 1.169
diff -C2 -d -r1.168 -r1.169
*** remote_worker.c 26 May 2008 20:09:54 -0000 1.168
--- remote_worker.c 26 May 2008 21:09:48 -0000 1.169
***************
*** 243,247 ****
SlonDString *dsp); static void query_append_event(SlonDString *dsp, ! SlonWorkMsg_event *event, bool suppress_notify); static void store_confirm_forward(SlonNode *node, SlonConn *conn, SlonWorkMsg_confirm *confirm);
--- 243,247 ----
SlonDString *dsp); static void query_append_event(SlonDString *dsp, ! SlonWorkMsg_event *event); static void store_confirm_forward(SlonNode *node, SlonConn *conn, SlonWorkMsg_confirm *confirm);
***************
*** 291,295 ****
bool event_ok; bool need_reloadListen = false; - bool suppress_notify; slon_log(SLON_INFO,
--- 291,294 ----
***************
*** 503,508 ****
/* ! * Construct the queries to begin a transaction, notify on the event ! * and confirm relations, insert the event into our local sl_event * table and confirm it in our local sl_confirm table. When this * transaction commits, every other remote node listening for events
--- 502,507 ----
/* ! * Construct the queries to begin a transaction, insert the event ! * into our local sl_event * table and confirm it in our local sl_confirm table. 
When this * transaction commits, every other remote node listening for events
***************
*** 624,628 ****
{ /* ! * Execute the forwarding and notify stuff, but do not commit * the transaction yet. */
--- 623,627 ----
{ /* ! * Execute the forwarding stuff, but do not commit * the transaction yet. */
***************
*** 663,674 ****
dstring_reset(&query1); last_sync_group_size = 0; - suppress_notify = FALSE; for (i = 0; i < sync_group_size; i++) { ! query_append_event(&query1, sync_group[i], suppress_notify); ! if (i < (sync_group_size - 1)) ! free(sync_group[i]); ! last_sync_group_size++; ! suppress_notify = TRUE; } slon_appendquery(&query1, "commit transaction;");
--- 662,671 ----
dstring_reset(&query1); last_sync_group_size = 0; for (i = 0; i < sync_group_size; i++) { ! query_append_event(&query1, sync_group[i]); ! if (i < (sync_group_size - 1)) ! free(sync_group[i]); ! last_sync_group_size++; } slon_appendquery(&query1, "commit transaction;");
***************
*** 1041,1045 ****
"notify \"_%s_Restart\"; ", rtcfg_cluster_name); ! query_append_event(&query1, event, FALSE); slon_appendquery(&query1, "commit transaction;"); query_execute(node, local_dbconn, &query1);
--- 1038,1042 ----
"notify \"_%s_Restart\"; ", rtcfg_cluster_name); ! query_append_event(&query1, event); slon_appendquery(&query1, "commit transaction;"); query_execute(node, local_dbconn, &query1);
***************
*** 1423,1430 ****
if (event_ok) { ! query_append_event(&query1, event, FALSE); ! slon_appendquery(&query1, "commit transaction;"); ! if (archive_close(node) < 0) ! slon_retry(); } else
--- 1420,1427 ----
if (event_ok) { ! query_append_event(&query1, event); ! slon_appendquery(&query1, "commit transaction;"); ! if (archive_close(node) < 0) ! slon_retry(); } else
***************
*** 2141,2162 ****
* query_append_event * ! * Add queries to a dstring that notify for Event and Confirm and that insert a ! * duplicate of an event record as well as the confirmation for it. ! 
* "suppress_notify" parm permits omitting the notify request if running this many times * ---------- */ static void ! query_append_event(SlonDString *dsp, SlonWorkMsg_event *event, bool suppress_notify) { char seqbuf[64]; sprintf(seqbuf, INT64_FORMAT, event->ev_seqno); - if (!suppress_notify) - { - slon_appendquery(dsp, - "notify \"_%s_Event\"; ", - rtcfg_cluster_name); - - } slon_appendquery(dsp, "insert into %s.sl_event "
--- 2138,2151 ----
* query_append_event * ! * Add queries to a dstring that insert a duplicate of an event record ! * as well as the confirmation for it. * ---------- */ static void ! query_append_event(SlonDString *dsp, SlonWorkMsg_event *event) { char seqbuf[64]; sprintf(seqbuf, INT64_FORMAT, event->ev_seqno); slon_appendquery(dsp, "insert into %s.sl_event "

Index: remote_listen.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.44
retrieving revision 1.45
diff -C2 -d -r1.44 -r1.45
*** remote_listen.c 26 May 2008 20:14:27 -0000 1.44
--- remote_listen.c 26 May 2008 21:09:48 -0000 1.45
***************
*** 56,64 ****
SlonConn * conn, struct listat * listat); - typedef enum { - SLON_POLLSTATE_POLL=1, - SLON_POLLSTATE_LISTEN - } PollState; - static PollState poll_state; static int poll_sleep;
--- 56,59 ----
***************
*** 92,97 ****
int64 new_config_seq = 0; - PollState oldpstate; - slon_log(SLON_INFO, "remoteListenThread_%d: thread starts\n",
--- 87,90 ----
***************
*** 106,110 ****
poll_sleep = 0; - poll_state = SLON_POLLSTATE_POLL; /* Initially, start in Polling mode */ sprintf(conn_symname, "node_%d_listen", node->no_id);
--- 99,102 ----
***************
*** 238,263 ****
*/ (void) slon_mkquery(&query1, - /* "listen \"_%s_Event\"; " */ - /* skip confirms "listen \"_%s_Confirm\"; " */ "select %s.registerNodeConnection(%d); ", - /* rtcfg_cluster_name, */ rtcfg_namespace, rtcfg_nodeid); - if (poll_state == SLON_POLLSTATE_LISTEN) 
{ - slon_appendquery(&query1, - "listen \"_%s_Event\"; ", - rtcfg_cluster_name); - } else { - slon_appendquery(&query1, - "unlisten \"_%s_Event\"; ", - rtcfg_cluster_name); - } res = PQexec(dbconn, dstring_data(&query1)); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) { slon_log(SLON_ERROR, "remoteListenThread_%d: \"%s\" - %s", node->no_id, ! dstring_data(&query1), PQresultErrorMessage(res)); PQclear(res); slon_disconnectdb(conn);
--- 230,243 ----
*/ (void) slon_mkquery(&query1, "select %s.registerNodeConnection(%d); ", rtcfg_namespace, rtcfg_nodeid); res = PQexec(dbconn, dstring_data(&query1)); ! if (PQresultStatus(res) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteListenThread_%d: \"%s\" - %s", node->no_id, ! dstring_data(&query1), PQresultErrorMessage(res)); PQclear(res); slon_disconnectdb(conn);
***************
*** 318,322 ****
* Receive events from the provider node */ - oldpstate = poll_state; rc = remoteListen_receive_events(node, conn, listat_head); if (rc < 0)
--- 298,301 ----
***************
*** 333,371 ****
continue; } - if (oldpstate != poll_state) { /* Switched states... 
*/ - switch (poll_state) { - case SLON_POLLSTATE_POLL: - slon_log(SLON_DEBUG2, - "remoteListenThread_%d: UNLISTEN - switch into polling mode\n", - node->no_id); - - (void) slon_mkquery(&query1, - "unlisten \"_%s_Event\"; ", - rtcfg_cluster_name); - break; - case SLON_POLLSTATE_LISTEN: - slon_log(SLON_DEBUG2, - "remoteListenThread_%d: LISTEN - switch from polling mode to use LISTEN\n", - node->no_id); - (void) slon_mkquery(&query1, - "listen \"_%s_Event\"; ", - rtcfg_cluster_name); - break; - } - res = PQexec(dbconn, dstring_data(&query1)); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - slon_log(SLON_ERROR, - "remoteListenThread_%d: \"%s\" - %s", - node->no_id, - dstring_data(&query1), PQresultErrorMessage(res)); - PQclear(res); - slon_disconnectdb(conn); - free(conn_conninfo); - conn = NULL; - conn_conninfo = NULL; - continue; - } - } /*
--- 312,315 ----
***************
*** 375,381 ****
*/ - /* Initially: Let's just blindly check... */ - /* if (forward_confirm) - { */ rc = remoteListen_forward_confirm(node, conn); if (rc < 0)
--- 319,322 ----
***************
*** 800,804 ****
if (ntuples > 0) { if ((sel_max_events > 2) && (sync_group_maxsize > 100)) { - poll_state = SLON_POLLSTATE_LISTEN; slon_log(SLON_INFO, "remoteListenThread_%d: drew maximum # of events for %d iterations\n", node->no_id, sel_max_events);
--- 741,744 ----
***************
*** 808,812 ****
} else { poll_sleep = 0; - poll_state = SLON_POLLSTATE_POLL; } } else {
--- 748,751 ----
***************
*** 814,822 ****
if (poll_sleep > sync_interval_timeout) { poll_sleep = sync_interval_timeout; - poll_state = SLON_POLLSTATE_LISTEN; } } PQclear(res); - last_event_sel = ntuples; return 0; }
--- 753,759 ----

Index: local_listen.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.45
retrieving revision 1.46
diff -C2 -d -r1.45 -r1.46
*** local_listen.c 21 Jan 2008 18:54:11 -0000 1.45
--- 
local_listen.c 26 May 2008 21:09:48 -0000 1.46
***************
*** 68,74 ****
*/ (void) slon_mkquery(&query1, - /* "listen \"_%s_Event\"; " */ "listen \"_%s_Restart\"; ", - /* rtcfg_cluster_name, */ rtcfg_cluster_name); res = PQexec(dbconn, dstring_data(&query1));
--- 68,72 ----
- Previous message: [Slony1-commit] slony1-engine/tests run_test.sh
- Next message: [Slony1-commit] slony1-engine/src/backend slony1_funcs.c slony1_funcs.sql
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list