Tue Nov 22 05:12:06 PST 2005
- Previous message: [Slony1-commit] By wieck: Restructuring of the watchdog process structure.
- Next message: [Slony1-commit] By wieck: Adjust .cvsignore to now generated files.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- A little attempt to make the source code readable again (reformatting of comments that had been screwed by Darcy's pgindent run and such). Jan Modified Files: -------------- slony1-engine/src/backend: slony1_funcs.c (r1.36 -> r1.37) slony1-engine/src/slon: cleanup_thread.c (r1.28 -> r1.29) confoptions.c (r1.14 -> r1.15) confoptions.h (r1.25 -> r1.26) dbutils.c (r1.17 -> r1.18) local_listen.c (r1.34 -> r1.35) misc.c (r1.21 -> r1.22) remote_listen.c (r1.24 -> r1.25) remote_worker.c (r1.98 -> r1.99) runtime_config.c (r1.26 -> r1.27) scheduler.c (r1.22 -> r1.23) slon.c (r1.60 -> r1.61) slon.h (r1.54 -> r1.55) snmp_thread.c (r1.2 -> r1.3) sync_thread.c (r1.16 -> r1.17) slony1-engine/src/slonik: dbutil.c (r1.8 -> r1.9) slonik.c (r1.52 -> r1.53) slonik.h (r1.24 -> r1.25) -------------- next part -------------- Index: slony1_funcs.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.c,v retrieving revision 1.36 retrieving revision 1.37 diff -Lsrc/backend/slony1_funcs.c -Lsrc/backend/slony1_funcs.c -u -w -r1.36 -r1.37 --- src/backend/slony1_funcs.c +++ src/backend/slony1_funcs.c @@ -1085,10 +1085,9 @@ slon_quote_identifier(const char *ident) { /* - * Can avoid quoting if ident starts with a lowercase letter or - * underscore and contains only lowercase letters, digits, and - * underscores, *and* is not any SQL keyword. Otherwise, supply - * quotes. + * Can avoid quoting if ident starts with a lowercase letter or underscore + * and contains only lowercase letters, digits, and underscores, *and* is + * not any SQL keyword. Otherwise, supply quotes. */ int nquotes = 0; bool safe; @@ -1097,8 +1096,8 @@ char *optr; /* - * would like to use <ctype.h> macros here, but they might yield - * unwanted locale-specific results... + * would like to use <ctype.h> macros here, but they might yield unwanted + * locale-specific results... */ safe = ((ident[0] >= 'a' && ident[0] <= 'z') || ident[0] == '_'); @@ -1123,13 +1122,13 @@ if (safe) { /* - * Check for keyword. This test is overly strong, since many of - * the "keywords" known to the parser are usable as column names, - * but the parser doesn't provide any easy way to test for whether - * an identifier is safe or not... so be safe not sorry. + * Check for keyword. This test is overly strong, since many of the + * "keywords" known to the parser are usable as column names, but the + * parser doesn't provide any easy way to test for whether an + * identifier is safe or not... so be safe not sorry. * - * Note: ScanKeywordLookup() does case-insensitive comparison, but - * that's fine, since we already know we have all-lower-case. + * Note: ScanKeywordLookup() does case-insensitive comparison, but that's + * fine, since we already know we have all-lower-case. */ if (ScanKeywordLookup(ident) != NULL) safe = false; Index: confoptions.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v retrieving revision 1.25 retrieving revision 1.26 diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.25 -r1.26 Index: scheduler.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/scheduler.c,v retrieving revision 1.22 retrieving revision 1.23 diff -Lsrc/slon/scheduler.c -Lsrc/slon/scheduler.c -u -w -r1.22 -r1.23 --- src/slon/scheduler.c +++ src/slon/scheduler.c @@ -1,4 +1,4 @@ -/*------------------------------------------------------------------------- +/* ---------------------------------------------------------------------- * scheduler.c * * Event scheduling subsystem for slon. @@ -7,7 +7,7 @@ * Author: Jan Wieck, Afilias USA INC. * * $Id$ - *------------------------------------------------------------------------- + * ---------------------------------------------------------------------- */ @@ -36,8 +36,9 @@ #define PF_LOCAL PF_UNIX #endif -/* - * ---------- Static data ---------- +/* ---------- + * Static data + * ---------- */ static int sched_status = SCHED_STATUS_OK; @@ -54,8 +55,9 @@ static pthread_cond_t sched_master_cond; -/* - * ---------- Local functions ---------- +/* ---------- + * Local functions + * ---------- */ static void *sched_mainloop(void *); static void sched_add_fdset(int fd, fd_set * fds); @@ -63,13 +65,14 @@ static void sched_shutdown(); -/* - * ---------- sched_start_mainloop +/* ---------- + * sched_start_mainloop * * Called from SlonMain() before starting up any worker thread. * * This will spawn the event scheduling thread that does the central select(2) - * system call. ---------- + * system call. + * ---------- */ int sched_start_mainloop(void) @@ -155,12 +158,13 @@ } -/* - * ---------- sched_wait_mainloop +/* ---------- + * sched_wait_mainloop * * Called from main() after all working threads according to the initial * configuration are started. Will wait until the scheduler mainloop - * terminates. ---------- + * terminates. + * ---------- */ int sched_wait_mainloop(void) @@ -177,13 +181,14 @@ } -/* - * ---------- sched_wait_conn +/* ---------- + * sched_wait_conn * * Assumes that the thread holds the lock on conn->conn_lock. * * Adds the connection to the central wait queue and wakes up the scheduler - * thread to reloop onto the select(2) call. ---------- + * thread to reloop onto the select(2) call. + * ---------- */ int sched_wait_conn(SlonConn * conn, int condition) @@ -245,8 +250,8 @@ } -/* - * ---------- sched_wait_time +/* ---------- + * sched_wait_time * * Assumes that the thread holds the lock on conn->conn_lock. * @@ -274,10 +279,11 @@ } -/* - * ---------- sched_msleep +/* ---------- + * sched_msleep * - * Use the schedulers event loop to sleep for msec milliseconds. ---------- + * Use the schedulers event loop to sleep for msec milliseconds. + * ---------- */ int sched_msleep(SlonNode * node, int msec) @@ -301,10 +307,11 @@ } -/* - * ---------- sched_get_status +/* ---------- + * sched_get_status * - * Return the current scheduler status in a thread safe fashion ---------- + * Return the current scheduler status in a thread safe fashion + * ---------- */ int sched_get_status(void) @@ -318,12 +325,13 @@ } -/* - * ---------- sched_wakeup_node +/* ---------- + * sched_wakeup_node * * Wakeup the threads (listen and worker) of one or all remote nodes to cause * them rechecking the current runtime status or adjust their configuration - * to changes. ---------- + * to changes. + * ---------- */ int sched_wakeup_node(int no_id) @@ -370,10 +378,11 @@ } -/* - * ---------- sched_mainloop +/* ---------- + * sched_mainloop * - * The thread handling the master scheduling. ---------- + * The thread handling the master scheduling. + * ---------- */ static void * sched_mainloop(void *dummy) @@ -626,10 +635,8 @@ */ /* - close(sched_wakeuppipe[0]); - sched_wakeuppipe[0] = -1; - close(sched_wakeuppipe[1]); - sched_wakeuppipe[1] = -1; + * close(sched_wakeuppipe[0]); sched_wakeuppipe[0] = -1; + * close(sched_wakeuppipe[1]); sched_wakeuppipe[1] = -1; */ /* @@ -664,11 +671,12 @@ } -/* - * ---------- sched_add_fdset +/* ---------- + * sched_add_fdset * * Add a file descriptor to one of the global scheduler sets and adjust - * sched_numfd accordingly. ---------- + * sched_numfd accordingly. + * ---------- */ static void sched_add_fdset(int fd, fd_set * fds) @@ -679,11 +687,12 @@ } -/* - * ---------- sched_add_fdset +/* ---------- + * sched_add_fdset * * Remove a file descriptor from one of the global scheduler sets and adjust - * sched_numfd accordingly. ---------- + * sched_numfd accordingly. + * ---------- */ static void sched_remove_fdset(int fd, fd_set * fds) @@ -701,3 +710,5 @@ } } } + + Index: snmp_thread.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/snmp_thread.c,v retrieving revision 1.2 retrieving revision 1.3 diff -Lsrc/slon/snmp_thread.c -Lsrc/slon/snmp_thread.c -u -w -r1.2 -r1.3 --- src/slon/snmp_thread.c +++ src/slon/snmp_thread.c @@ -17,7 +17,8 @@ extern int slon_log_level; -void init_nstAgentSubagentObject(void) +void +init_nstAgentSubagentObject(void) { static oid nstAgentSubagentObject_oid[] = { 1, 3, 6, 1, 4, 1, 20366, 32, 2, 3, 32, 1 }; @@ -29,7 +30,8 @@ } -void *snmpThread_main(void *dummy) +void * +snmpThread_main(void *dummy) { int agentx_subagent=1; @@ -52,11 +54,12 @@ netsnmp_ds_set_boolean(NETSNMP_DS_APPLICATION_ID, NETSNMP_DS_AGENT_ROLE, 1); - /* ******************************************************************** + /* + * ******************************************************************** * If we are running slon as root allow the snmp agent to have full * access to it's internals, this is required to run as a master agent - * (from my understanding) - **********************************************************************/ + * (from my understanding) ******************************************************************** + */ if (getuid() !=0) { @@ -68,7 +71,10 @@ init_agent("slon-demon"); - /* initialize the mib code found in: init_nstAgentSubagentObject from nstAgentSubagentObject.C */ + /* + * initialize the mib code found in: init_nstAgentSubagentObject from + * nstAgentSubagentObject.C + */ init_nstAgentSubagentObject(); /* initialize vacm/usm access control */ @@ -84,7 +90,8 @@ /* If we're going to be a snmp master agent, initial the ports */ if (!agentx_subagent) { - init_master_agent(); /* open the port to listen on (defaults to udp:161) */ + init_master_agent(); /* open the port to listen on (defaults to + * udp:161) */ } while(true) Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.98 retrieving revision 1.99 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.98 -r1.99 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -29,8 +29,9 @@ #include "confoptions.h" -/* - * ---------- Local definitions ---------- +/* ---------- + * Local definitions + * ---------- */ /* @@ -224,8 +225,10 @@ int min_sync; int quit_sync_provider; int quit_sync_finalsync; -/* - * ---------- Local functions ---------- + +/* ---------- + * Local functions + * ---------- */ static void adjust_provider_info(SlonNode * node, WorkerGroupData * wd, int cleanup); @@ -260,11 +263,12 @@ #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive(); -/* - * ---------- slon_remoteWorkerThread +/* ---------- + * slon_remoteWorkerThread * * Listen for events on the local database connection. This means, events - * generated by the local node only. ---------- + * generated by the local node only. + * ---------- */ void * remoteWorkerThread_main(void *cdata) @@ -478,9 +482,12 @@ * Estimate an "ideal" number of syncs based on how long * they took last time */ - if (desired_sync_time != 0) { + if (desired_sync_time != 0) + { ideal_sync = (last_sync_group_size * desired_sync_time) / last_sync_length; - } else { + } + else + { ideal_sync = sync_group_maxsize; } max_sync = ((last_sync_group_size * 110) / 100) + 1; @@ -494,13 +501,20 @@ } - /* Quit upon receiving event # quit_sync_number from node # quit_sync_provider */ - if (quit_sync_provider != 0) { - if (quit_sync_provider == node->no_id) { - if ((next_sync_group_size + (event->ev_seqno)) > quit_sync_finalsync) { + /* + * Quit upon receiving event # quit_sync_number from node # + * quit_sync_provider + */ + if (quit_sync_provider != 0) + { + if (quit_sync_provider == node->no_id) + { + if ((next_sync_group_size + (event->ev_seqno)) > quit_sync_finalsync) + { next_sync_group_size = quit_sync_finalsync - event->ev_seqno; } - if (event->ev_seqno >= quit_sync_finalsync) { + if (event->ev_seqno >= quit_sync_finalsync) + { slon_log(SLON_FATAL, "ABORT at sync %d per command line request%n", quit_sync_finalsync); slon_retry(); } @@ -609,9 +623,11 @@ no_id, no_comment, no_spool); need_reloadListen = true; - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -633,9 +649,11 @@ need_reloadListen = true; - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -690,9 +708,11 @@ rtcfg_cluster_name); need_reloadListen = true; - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -715,9 +735,11 @@ pa_server, pa_client, pa_conninfo, pa_connretry); need_reloadListen = true; - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -738,9 +760,11 @@ pa_server, pa_client); need_reloadListen = true; - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -761,9 +785,11 @@ "select %s.storeListen_int(%d, %d, %d); ", rtcfg_namespace, li_origin, li_provider, li_receiver); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -783,9 +809,11 @@ "select %s.dropListen_int(%d, %d, %d); ", rtcfg_namespace, li_origin, li_provider, li_receiver); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -807,9 +835,11 @@ rtcfg_namespace, set_id, set_origin, set_comment); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -826,17 +856,21 @@ "select %s.dropSet_int(%d); ", rtcfg_namespace, set_id); - /* The table deleted needs to be - * dropped from log shipping too */ - if (archive_dir) { + /* + * The table deleted needs to be dropped from log shipping too + */ + if (archive_dir) + { rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -846,13 +880,15 @@ " where ssy_setid= %d;", rtcfg_namespace, set_id); rc = submit_query_to_archive(&query1); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = close_log_archive(); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -863,6 +899,7 @@ { int set_id = (int)strtol(event->ev_data1, NULL, 10); int add_id = (int)strtol(event->ev_data2, NULL, 10); + rtcfg_dropSet(add_id); slon_appendquery(&query1, @@ -870,19 +907,22 @@ rtcfg_namespace, set_id, add_id); - /* Log shipping gets the change here - * that we need to delete the table - * being merged from the set being - * maintained. */ - if (archive_dir) { + /* + * Log shipping gets the change here that we need to delete + * the table being merged from the set being maintained. + */ + if (archive_dir) + { rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -892,13 +932,15 @@ " where ssy_setid= %d;", rtcfg_namespace, add_id); rc = submit_query_to_archive(&query1); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = close_log_archive(); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -912,9 +954,11 @@ * subscribed sets yet and table information is not maintained * in the runtime configuration. */ - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -928,9 +972,11 @@ * subscribed sets yet and sequences information is not * maintained in the runtime configuration. */ - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -944,9 +990,11 @@ slon_appendquery(&query1, "select %s.setDropTable_int(%d);", rtcfg_namespace, tab_id); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -960,9 +1008,11 @@ slon_appendquery(&query1, "select %s.setDropSequence_int(%d);", rtcfg_namespace, seq_id); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -977,9 +1027,11 @@ slon_appendquery(&query1, "select %s.setMoveTable_int(%d, %d);", rtcfg_namespace, tab_id, new_set_id); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -994,9 +1046,11 @@ slon_appendquery(&query1, "select %s.setMoveSequence_int(%d, %d);", rtcfg_namespace, seq_id, new_set_id); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1012,9 +1066,11 @@ "select %s.storeTrigger_int(%d, '%q'); ", rtcfg_namespace, trig_tabid, trig_tgname); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1030,9 +1086,11 @@ "select %s.dropTrigger_int(%d, '%q'); ", rtcfg_namespace, trig_tabid, trig_tgname); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1041,8 +1099,10 @@ } else if (strcmp(event->ev_type, "ACCEPT_SET") == 0) { - int set_id, old_origin, - new_origin, event_no; + int set_id, + old_origin, + new_origin, + event_no; PGresult *res; slon_log(SLON_DEBUG2, "start processing ACCEPT_SET\n"); @@ -1057,17 +1117,17 @@ slon_log(SLON_DEBUG2, "got parms ACCEPT_SET\n"); - /* If we're a remote node, and haven't yet - * received the MOVE/FAILOVER_SET event from the - * new origin, then we'll need to sleep a - * bit... This avoids a race condition - * where new SYNCs take place on the new - * origin, and are ignored on some - * subscribers (and their children) - * because the MOVE_SET wasn't yet - * received and processed */ + /* + * If we're a remote node, and haven't yet received the + * MOVE/FAILOVER_SET event from the new origin, then we'll + * need to sleep a bit... This avoids a race condition where + * new SYNCs take place on the new origin, and are ignored on + * some subscribers (and their children) because the MOVE_SET + * wasn't yet received and processed + */ - if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) { + if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) + { slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin\n"); slon_mkquery(&query2, "select 1 from %s.sl_event " @@ -1109,7 +1169,9 @@ slon_retry(); need_reloadListen = true; - } else { + } + else + { slon_log(SLON_DEBUG2, "ACCEPT_SET - on origin node...\n"); } @@ -1178,9 +1240,11 @@ rtcfg_namespace, failed_node, backup_node, set_id); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1202,9 +1266,11 @@ "select %s.subscribeSet_int(%d, %d, %d, '%q'); ", rtcfg_namespace, sub_set, sub_provider, sub_receiver, sub_forward); - if (archive_dir) { + if (archive_dir) + { rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET"); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1316,9 +1382,11 @@ rtcfg_namespace, sub_set, sub_provider, sub_receiver); } - /* Note: No need to do anything based - on archive_dir here; copy_set does - that nicely already. */ + + /* + * Note: No need to do anything based on archive_dir here; + * copy_set does that nicely already. + */ need_reloadListen = true; } else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0) @@ -1336,15 +1404,18 @@ sub_set, sub_receiver); need_reloadListen = true; - if (archive_dir) { + if (archive_dir) + { rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1354,13 +1425,15 @@ " where ssy_setid= %d;", rtcfg_namespace, sub_set); rc = submit_query_to_archive(&query1); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = close_log_archive(); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); @@ -1380,32 +1453,38 @@ ddl_setid, ddl_script, ddl_only_on_node); /* DDL_SCRIPT needs to be turned into a log shipping script */ - if (archive_dir) { - if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) { + if (archive_dir) + { + if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) + { rc = open_log_archive(node->no_id, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not open DDL archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } generate_archive_header(node->no_id, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not generate DDL archive header %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not generate DDL archive tracker %s - %s", node->no_id, archive_tmp, strerror(errno)); slon_retry(); } rc = submit_string_to_archive(ddl_script); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not submit DDL Script %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -1413,7 +1492,8 @@ } rc = close_log_archive(); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not close DDL Script %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -1500,6 +1580,10 @@ } +/* ---------- + * adjust_provider_info + * ---------- + */ static void adjust_provider_info(SlonNode * node, WorkerGroupData * wd, int cleanup) { @@ -1772,11 +1856,12 @@ } -/* - * ---------- remoteWorker_event +/* ---------- + * remoteWorker_event * * Used by the remoteListeThread to forward events selected from the event - * provider database to the remote nodes worker thread. ---------- + * provider database to the remote nodes worker thread. + * ---------- */ void remoteWorker_event(int event_provider, @@ -1971,11 +2056,12 @@ } -/* - * ---------- remoteWorker_wakeup +/* ---------- + * remoteWorker_wakeup * * Send a special WAKEUP message to a worker, causing it to recheck the runmode - * and the runtime configuration. ---------- + * and the runtime configuration. + * ---------- */ void remoteWorker_wakeup(int no_id) @@ -2020,10 +2106,11 @@ } -/* - * ---------- remoteWorker_confirm +/* ---------- + * remoteWorker_confirm * - * Add a confirm message to the remote worker message queue ---------- + * Add a confirm message to the remote worker message queue + * ---------- */ void remoteWorker_confirm(int no_id, @@ -2121,10 +2208,11 @@ } -/* - * ---------- query_execute +/* ---------- + * query_execute * - * Execute a query string that does not return a result set. ---------- + * Execute a query string that does not return a result set. + * ---------- */ static int query_execute(SlonNode * node, PGconn *dbconn, SlonDString * dsp) @@ -2151,8 +2239,8 @@ } -/* - * ---------- query_append_event +/* ---------- + * query_append_event * * Add queries to a dstring that notify for Event and Confirm and that insert a * duplicate of an event record as well as the confirmation for it. @@ -2220,10 +2308,11 @@ } -/* - * ---------- store_confirm_forward +/* ---------- + * store_confirm_forward * - * Call the forwardConfirm() stored procedure. ---------- + * Call the forwardConfirm() stored procedure. + * ---------- */ static void store_confirm_forward(SlonNode * node, SlonConn * conn, @@ -2314,11 +2403,12 @@ } -/* - * ---------- get_last_forwarded_confirm +/* ---------- + * get_last_forwarded_confirm * * Look what confirmed event seqno we forwarded last for a given origin+receiver - * pair. ---------- + * pair. + * ---------- */ static int64 get_last_forwarded_confirm(int origin, int receiver) @@ -2348,6 +2438,10 @@ } +/* ---------- + * copy_set + * ---------- + */ static int copy_set(SlonNode * node, SlonConn * local_conn, int set_id, SlonWorkMsg_event * event) @@ -2385,7 +2479,6 @@ #ifdef HAVE_PQPUTCOPYDATA char *copydata = NULL; - #else char copybuf[8192]; int copydone; @@ -2465,12 +2558,16 @@ /* Log Shipping Support begins... */ - /* - Open the log, put the header in - Isn't it convenient that seqbuf was just populated??? :-) + + /* + * - Open the log, put the header in Isn't it convenient that seqbuf was + * just populated??? :-) */ - if (archive_dir) { + if (archive_dir) + { rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not open COPY SET archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -2483,7 +2580,8 @@ return -1; } rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not generate COPY SET archive header %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -2496,6 +2594,7 @@ return -1; } } + /* * Register this connection in sl_nodelock */ @@ -2578,12 +2677,16 @@ } } - /* check tables/sequences in set to make sure they are there - * and in good order. Don't copy any data yet; we want to - * just do a first pass that finds "bozo errors" */ + /* + * check tables/sequences in set to make sure they are there and in good + * order. Don't copy any data yet; we want to just do a first pass that + * finds "bozo errors" + */ - /* Check tables and sequences in set to make sure they are all - * appropriately configured... */ + /* + * Check tables and sequences in set to make sure they are all + * appropriately configured... + */ /* * Select the list of all tables the provider currently has in the set. @@ -2700,7 +2803,8 @@ slon_mkquery(&query3, "select * from %s limit 0;", tab_fqname); res2 = PQexec(loc_dbconn, dstring_data(&query3)); - if (PQresultStatus(res2) != PGRES_TUPLES_OK) { + if (PQresultStatus(res2) != PGRES_TUPLES_OK) + { slon_log (SLON_ERROR, "remoteWorkerThread_%d: Could not find table %s " "on subscriber\n", node->no_id, tab_fqname); PQclear(res2); @@ -2718,6 +2822,7 @@ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " "all tables for set %d found on subscriber\n", node->no_id, set_id); + /* * Add in the sequences contained in the set */ @@ -3002,7 +3107,8 @@ res3 = PQexec(pro_dbconn, dstring_data(&query2)); - if (PQresultStatus(res3) != PGRES_TUPLES_OK) { + if (PQresultStatus(res3) != PGRES_TUPLES_OK) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n", node->no_id, dstring_data(&query2), PQresultErrorMessage(res3)); @@ -3021,7 +3127,8 @@ rtcfg_namespace); res4 = PQexec(loc_dbconn, dstring_data(&query2)); - if (PQresultStatus(res4) != PGRES_TUPLES_OK) { + if (PQresultStatus(res4) != PGRES_TUPLES_OK) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n", node->no_id, dstring_data(&query2), PQresultErrorMessage(res4)); @@ -3068,12 +3175,14 @@ terminate_log_archive(); return -1; } - if (archive_dir) { + if (archive_dir) + { slon_mkquery(&query1, "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname, nodeon73 ? "" : PQgetvalue(res3, 0, 0)); rc = submit_query_to_archive(&query1); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_d: " "Could not generate copy_set request for %s - %s", node->no_id, tab_fqname, strerror(errno)); @@ -3087,6 +3196,7 @@ return -1; } } + /* * Begin a COPY to stdout for the table on the provider DB */ @@ -3148,9 +3258,11 @@ terminate_log_archive(); return -1; } - if (archive_dir) { + if (archive_dir) + { rc = fwrite(copydata, 1, len, archive_fp); - if (rc != len) { + if (rc != len) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "PQputCopyData() - log shipping - %s", node->no_id, strerror(errno)); @@ -3255,7 +3367,8 @@ terminate_log_archive(); return -1; } - if (archive_dir) { + if (archive_dir) + { rc = submit_string_to_archive("\\."); } #else /* ! HAVE_PQPUTCOPYDATA */ @@ -3293,9 +3406,11 @@ } } PQputline(loc_dbconn, "\\.\n"); - if (archive_dir) { + if (archive_dir) + { rc = submit_string_to_archive("\\."); } + /* * End the COPY to stdout on the provider */ @@ -3366,7 +3481,8 @@ terminate_log_archive(); return -1; } - if (archive_dir) { + if (archive_dir) + { submit_query_to_archive(&query1); } @@ -3437,7 +3553,8 @@ "select \"pg_catalog\".setval('%q', '%s'); ", seq_fqname, seql_last_value); - if (archive_dir) { + if (archive_dir) + { submit_query_to_archive(&query1); } } @@ -3722,13 +3839,15 @@ terminate_log_archive(); return -1; } - if (archive_dir) { + if (archive_dir) + { slon_mkquery(&query1, "insert into %s.sl_setsync_offline () " "values ('%d', '%d');", rtcfg_namespace, set_id, ssy_seqno); rc = submit_query_to_archive(&query1); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " " could not insert to sl_setsync_offline", node->no_id); @@ -3747,9 +3866,11 @@ node->no_id, TIMEVAL_DIFF(&tv_start2, &tv_now)); - if (archive_dir) { + if (archive_dir) + { rc = close_log_archive(); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " " could not close archive log %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -3795,6 +3916,11 @@ return 0; } + +/* ---------- + * sync_event + * ---------- + */ static int sync_event(SlonNode * node, SlonConn * local_conn, WorkerGroupData * wd, SlonWorkMsg_event * event) @@ -3832,13 +3958,14 @@ dstring_init(&query); /* - * If this slon is running in log archiving mode, open a - * temporary file for it. + * If this slon is running in log archiving mode, open a temporary file + * for it. */ if (archive_dir) { rc = open_log_archive(node->no_id, seqbuf); - if (rc == -1) { + if (rc == -1) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Cannot open archive file %s - %s\n", node->no_id, archive_tmp, strerror(errno)); @@ -3846,7 +3973,8 @@ return 60; } rc = generate_archive_header(node->no_id, seqbuf); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Cannot write to archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -4169,9 +4297,12 @@ slon_log(SLON_DEBUG2, " ssy_action_list value: %s length: %d\n", ssy_action_list, actionlist_len); - if (actionlist_len == 0) { + if (actionlist_len == 0) + { slon_appendquery(provider_qual, "\n) "); - } else { + } + else + { dstring_init(&actionseq_subquery); compress_actionseq(ssy_action_list, &actionseq_subquery); slon_appendquery(provider_qual, @@ -4183,9 +4314,9 @@ PQclear(res2); /* - * Add a call to the setsync tracking function to - * the archive log. This function ensures that all - * archive log files are applied in the right order. + * Add a call to the setsync tracking function to the archive log. + * This function ensures that all archive log files are applied in + * the right order. */ if (archive_dir) { @@ -4195,7 +4326,8 @@ rc = logarchive_tracking(rtcfg_namespace, sub_set, PQgetvalue(res1, tupno1, 1), seqbuf, event->ev_timestamp_c); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Cannot write to archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -4231,9 +4363,11 @@ "no sets need syncing for this event\n", node->no_id); dstring_free(&query); - if (archive_dir) { + if (archive_dir) + { rc = close_log_archive(); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not close out archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -4351,13 +4485,18 @@ PQclear(res1); /* - * Add the user data modification part to - * the archive log. + * Add the user data modification part to the archive log. */ - if (archive_dir) { + if (archive_dir) + { rc = submit_string_to_archive(dstring_data(&(wgline->data))); - /* rc = fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); */ - if (rc < 0) { + + /* + * rc = fprintf(archive_fp, "%s", + * dstring_data(&(wgline->data))); + */ + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Cannot write to archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -4512,7 +4651,8 @@ rtcfg_namespace, seql_seqid, seql_last_value); rc = submit_query_to_archive(&query); - if (rc < 0) { + if (rc < 0) + { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Cannot write to archive file %s - %s", node->no_id, archive_tmp, strerror(errno)); @@ -4609,8 +4749,8 @@ PQclear(res1); /* - * Add the final commit to the archive log, close it and rename - * the temporary file to the real log chunk filename. + * Add the final commit to the archive log, close it and rename the + * temporary file to the real log chunk filename. */ if (archive_dir) { @@ -4630,6 +4770,10 @@ } +/* ---------- + * sync_helper + * ---------- + */ static void * sync_helper(void *cdata) { @@ -5007,32 +5151,43 @@ } } -/* Functions for processing log archives... - - - First, you open the log archive using open_log_archive() - - - Second, you generate the header using generate_archive_header() - - - Third, you need to set up the sync tracking function in the log - using logarchive_tracking() - - ============= Here Ends The Header of the Log Shipping Archive ================== - - Then come the various queries (inserts/deletes/updates) that - comprise the "body" of the SYNC. Probably submitted using - submit_query_to_archive(). - - ============= Here Ends The Body of the Log Shipping Archive ================== - - Finally, the log ends, notably with a COMMIT statement, generated - using close_log_archive(), which closes the file and renames it - from ".tmp" form to the final name. +/* ---------- + * Functions for processing log archives... + * + * - First, you open the log archive using open_log_archive() + * + * - Second, you generate the header using generate_archive_header() + * + * - Third, you need to set up the sync tracking function in the log + * using logarchive_tracking() + * + * ======== Here Ends The Header of the Log Shipping Archive ======== + * + * Then come the various queries (inserts/deletes/updates) that + * comprise the "body" of the SYNC. Probably submitted using + * submit_query_to_archive(). + * + * ======== Here Ends The Body of the Log Shipping Archive ======== + * + * Finally, the log ends, notably with a COMMIT statement, generated + * using close_log_archive(), which closes the file and renames it + * from ".tmp" form to the final name. + * ---------- */ -/* Stores the archive name in archive_name (as .sql name) and archive_tmp (.tmp file) */ -int open_log_archive (int node_id, char *seqbuf) { +/* ---------- + * open_log_archive + * + * Stores the archive name in archive_name (as .sql name) and + * archive_tmp (.tmp file) + * ---------- + */ +int +open_log_archive(int node_id, char *seqbuf) +{ int i; + sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id); for (i = strlen(seqbuf); i < 20; i++) strcat(archive_name, "0"); @@ -5041,16 +5196,27 @@ strcpy(archive_tmp, archive_name); strcat(archive_tmp, ".tmp"); archive_fp = fopen(archive_tmp, "w"); - if (archive_fp == NULL) { + if (archive_fp == NULL) + { return -1; - } else { + } + else + { return 0; } } -int close_log_archive () { +/* ---------- + * close_log_archive + * ---------- + */ +int +close_log_archive() +{ int rc = 0; - if (archive_dir) { + + if (archive_dir) + { rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n" "-- End Of Archive Log\n" @@ -5069,9 +5235,16 @@ return rc; } -int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, - const char *seqbuf, const char *timestamp) { - return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" +/* ---------- + * logarchive_tracking + * ---------- + */ +int +logarchive_tracking(const char *namespace, int sub_set, const char *firstseq, + const char *seqbuf, const char *timestamp) +{ + return fprintf(archive_fp, + "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" "-- end of log archiving header\n" "------------------------------------------------------------------\n" "-- start of Slony-I data\n" @@ -5079,26 +5252,58 @@ namespace, sub_set, firstseq, seqbuf, timestamp); } -int submit_query_to_archive(SlonDString *ds) { +/* ---------- + * submit_query_to_archive + * ---------- + */ +int +submit_query_to_archive(SlonDString * ds) +{ return fprintf(archive_fp, "%s\n", ds->data); } -int submit_string_to_archive (const char *s) { +/* ---------- + * submit_string_to_archive + * ---------- + */ +int +submit_string_to_archive(const char *s) +{ return fprintf(archive_fp, "%s\n", s); } -/* Raw form used for COPY where we don't want any extra cr/lf output */ -int submit_raw_data_to_archive (const char *s) { +/* ---------- + * submit_raw_data_to_archive + * + * Raw form used for COPY where we don't want any extra cr/lf output + * ---------- + */ +int +submit_raw_data_to_archive(const char *s) +{ return fprintf(archive_fp, "%s", s); } -void terminate_log_archive () { - if (archive_fp) { +/* ---------- + * terminate_log_archive + * ---------- + */ +void +terminate_log_archive() +{ + if (archive_fp) + { fclose(archive_fp); } } -int generate_archive_header (int node_id, const char *seqbuf) { +/* ---------- + * generate_archive_header + * ---------- + */ +int +generate_archive_header(int node_id, const char *seqbuf) +{ return fprintf(archive_fp, "-- Slony-I log shipping archive\n" "-- Node %d, Event %s\n" @@ -5106,11 +5311,18 @@ node_id, seqbuf); } -/* write_void_log() writes out a "void" log consisting of the message - * which must either be a valid SQL query or a SQL comment. */ - -int write_void_log (int node_id, char *seqbuf, const char *message) { +/* ---------- + * write_void_log + * + * writes out a "void" log consisting of the message which must either + * be a valid SQL query or a SQL comment. + * ---------- + */ +int +write_void_log(int node_id, char *seqbuf, const char *message) +{ int rc; + rc = open_log_archive(node_id, seqbuf); if (rc < 0) return rc; @@ -5124,20 +5336,23 @@ return rc; } -/* given a string consisting of a list of actionseq values, return a - string that compresses this into a set of log_actionseq ranges - - Thus, "'13455','13456','13457','13458','13459','13460','13462'" - compresses into... - - log_actionseq not between '13455' and '13460' and - log_actionseq <> '13462' - - There is an expectation that the actionseq values are being - returned more or less in order; if that is even somewhat loosely - the case, this will lead to a pretty spectacular compression of - values if the SUBSCRIBE_SET runs for a long time thereby leading to - there being Really A Lot of log entries to exclude. */ +/* ---------- + * given a string consisting of a list of actionseq values, return a + * string that compresses this into a set of log_actionseq ranges + * + * Thus, "'13455','13456','13457','13458','13459','13460','13462'" + * compresses into... + * + * log_actionseq not between '13455' and '13460' and + * log_actionseq <> '13462' + * + * There is an expectation that the actionseq values are being + * returned more or less in order; if that is even somewhat loosely + * the case, this will lead to a pretty spectacular compression of + * values if the SUBSCRIBE_SET runs for a long time thereby leading to + * there being Really A Lot of log entries to exclude. + * ---------- + */ #define START_STATE 1 #define COLLECTING_DIGITS 2 @@ -5146,12 +5361,21 @@ #define MINMAXINITIAL -1 -void compress_actionseq (const char *ssy_actionlist, SlonDString *action_subquery) { +/* ---------- + * compress_actionseq + * ---------- + */ +void +compress_actionseq(const char *ssy_actionlist, SlonDString * action_subquery) +{ int state; - int curr_number, curr_min, curr_max; + int curr_number, + curr_min, + curr_max; int curr_digit; int first_subquery; char curr_char; + curr_min = MINMAXINITIAL; curr_max = MINMAXINITIAL; first_subquery = 1; @@ -5159,143 +5383,196 @@ slon_mkquery(action_subquery, " "); slon_log(SLON_DEBUG3, "compress_actionseq(list,subquery) Action list: %s\n", ssy_actionlist); - while (state != DONE) { + while (state != DONE) + { curr_char = *ssy_actionlist; - switch (curr_char) { + switch (curr_char) + { case '\0': state = DONE; break; case '0': curr_digit = 0; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '1': curr_digit = 1; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '2': curr_digit = 2; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '3': curr_digit = 3; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '4': curr_digit = 4; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '5': curr_digit = 5; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '6': curr_digit = 6; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '7': curr_digit = 7; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '8': curr_digit = 8; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '9': curr_digit = 9; - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { curr_number = curr_number * 10 + curr_digit; - } else { + } + else + { state = COLLECTING_DIGITS; curr_number = curr_digit; } break; case '\'': case ',': - if (state == COLLECTING_DIGITS) { + if (state == COLLECTING_DIGITS) + { /* Finished another number... Fold it into the ranges... */ slon_log(SLON_DEBUG4, "Finished number: %d\n", curr_number); - /* If we haven't a range, then the range is the current - number */ - if (curr_min == MINMAXINITIAL) { + + /* + * If we haven't a range, then the range is the current + * number + */ + if (curr_min == MINMAXINITIAL) + { curr_min = curr_number; } - if (curr_max == MINMAXINITIAL) { + if (curr_max == MINMAXINITIAL) + { curr_max = curr_number; } - /* If the number pushes the range outwards by 1, - then shift the range by 1... */ - if (curr_number == curr_min - 1) { + + /* + * If the number pushes the range outwards by 1, then + * shift the range by 1... + */ + if (curr_number == curr_min - 1) + { curr_min --; } - if (curr_number == curr_max + 1) { + if (curr_number == curr_max + 1) + { curr_max ++; } /* If the number is inside the range, do nothing */ - if ((curr_number >= curr_min) && (curr_number <= curr_max)) { + if ((curr_number >= curr_min) && (curr_number <= curr_max)) + { /* Do nothing - inside the range */ } - /* If the number is outside the range, then - generate a subquery based on the range, and - have the new number become the new range */ - if ((curr_number < curr_min - 1) || (curr_number > curr_max + 1)) { - if (first_subquery) { + /* + * If the number is outside the range, then generate a + * subquery based on the range, and have the new number + * become the new range + */ + if ((curr_number < curr_min - 1) || (curr_number > curr_max + 1)) + { + if (first_subquery) + { first_subquery = 0; - } else { + } + else + { slon_appendquery(action_subquery, " and "); } - if (curr_max == curr_min) { + if (curr_max == curr_min) + { slon_log(SLON_DEBUG4, "simple entry - %d\n", curr_max); slon_appendquery(action_subquery, " log_actionseq <> '%d' ", curr_max); - } else { + } + else + { slon_log(SLON_DEBUG4, "between entry - %d %d\n", curr_min, curr_max); slon_appendquery(action_subquery, @@ -5313,17 +5590,24 @@ ssy_actionlist++; } /* process last range, if it exists */ - if (curr_min || curr_max) { - if (first_subquery) { + if (curr_min || curr_max) + { + if (first_subquery) + { first_subquery = 0; - } else { + } + else + { slon_appendquery(action_subquery, " and "); } - if (curr_max == curr_min) { + if (curr_max == curr_min) + { slon_log(SLON_DEBUG4, "simple entry - %d\n", curr_max); slon_appendquery(action_subquery, " log_actionseq <> '%d' ", curr_max); - } else { + } + else + { slon_log(SLON_DEBUG4, "between entry - %d %d\n", curr_min, curr_max); slon_appendquery(action_subquery, @@ -5335,3 +5619,5 @@ } slon_log(SLON_DEBUG3, " compressed actionseq subquery... %s\n", dstring_data(action_subquery)); } + + Index: remote_listen.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v retrieving revision 1.24 retrieving revision 1.25 diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.24 -r1.25 --- src/slon/remote_listen.c +++ src/slon/remote_listen.c @@ -1,4 +1,4 @@ -/*------------------------------------------------------------------------- +/* ---------------------------------------------------------------------- * remote_listen.c * * Implementation of the thread listening for events on @@ -8,7 +8,7 @@ * Author: Jan Wieck, Afilias USA INC. * * $Id$ - *------------------------------------------------------------------------- + * ---------------------------------------------------------------------- */ @@ -29,8 +29,8 @@ #include "slon.h" -/* - * ---------- struct listat +/* ---------- + * struct listat * * local data structure for nodes we are currently listening for events from. * ---------- @@ -44,8 +44,9 @@ }; -/* - * ---------- Local functions ---------- +/* ---------- + * Local functions + * ---------- */ static void remoteListen_adjust_listat(SlonNode * node, struct listat **listat_head, @@ -59,11 +60,12 @@ extern char *lag_interval; -/* - * ---------- slon_remoteListenThread +/* ---------- + * slon_remoteListenThread * * Listen for events on a remote database connection. This means, events - * generated by every other node we listen for on this one. ---------- + * generated by every other node we listen for on this one. + * ---------- */ void * remoteListenThread_main(void *cdata) @@ -223,8 +225,8 @@ dbconn = conn->dbconn; /* - * Listen on the connection for events and confirmations - * and register the node connection. + * Listen on the connection for events and confirmations and + * register the node connection. */ slon_mkquery(&query1, "listen \"_%s_Event\"; " @@ -385,8 +387,8 @@ } -/* - * ---------- remoteListen_adjust_listat +/* ---------- + * remoteListen_adjust_listat * * local function to (re)adjust the known nodes to the global configuration. * ---------- @@ -489,10 +491,11 @@ } -/* - * ---------- remoteListen_cleanup +/* ---------- + * remoteListen_cleanup * - * Free resources used by the remoteListen thread ---------- + * Free resources used by the remoteListen thread + * ---------- */ static void remoteListen_cleanup(struct listat **listat_head, struct listat **listat_tail) @@ -510,13 +513,14 @@ } -/* - * ---------- remoteListen_forward_confirm +/* ---------- + * remoteListen_forward_confirm * - * Read the last confirmed event sequence for all nodes from the remote database - * and forward it to the local database so that the cleanup process can know - * when all nodes have confirmed an event so it may be safely thrown away (together - * with its log data). ---------- + * Read the last confirmed event sequence for all nodes from the remote + * database and forward it to the local database so that the cleanup + * process can know when all nodes have confirmed an event so it may + * be safely thrown away (together with its log data). + * ---------- */ static int remoteListen_forward_confirm(SlonNode * node, SlonConn * conn) @@ -574,8 +578,8 @@ } -/* - * ---------- remoteListen_receive_events +/* ---------- + * remoteListen_receive_events * * Retrieve all new events that origin from nodes for which we listen on this * node as provider and add them to the node specific worker message queue. @@ -622,7 +626,8 @@ rtcfg_lock(); where_or_or = "where"; - if (lag_interval) { + if (lag_interval) + { dstring_init(&q2); slon_mkquery(&q2, "where ev_timestamp < now() - '%s'::interval and (", lag_interval); where_or_or = dstring_data(&q2); @@ -646,7 +651,8 @@ where_or_or = "or"; listat = listat->next; } - if (lag_interval) { + if (lag_interval) + { slon_appendquery(&query, ")"); } slon_appendquery(&query, " order by e.ev_origin, e.ev_seqno"); Index: sync_thread.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/sync_thread.c,v retrieving revision 1.16 retrieving revision 1.17 diff -Lsrc/slon/sync_thread.c -Lsrc/slon/sync_thread.c -u -w -r1.16 -r1.17 --- src/slon/sync_thread.c +++ src/slon/sync_thread.c @@ -11,9 +11,6 @@ */ -/* Note that in 1.1, generate_sync_event() is a stored procedure that - does roughly the same thing as this... */ - #include <pthread.h> #include <stdio.h> @@ -31,15 +28,16 @@ #include "slon.h" -/* - * ---------- Global variables ---------- +/* ---------- + * Global variables + * ---------- */ int sync_interval; int sync_interval_timeout; -/* - * ---------- slon_localSyncThread +/* ---------- + * slon_localSyncThread * * Generate SYNC event if local database activity created new log info. * ---------- Index: slon.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v retrieving revision 1.54 retrieving revision 1.55 diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.54 -r1.55 --- src/slon/slon.h +++ src/slon/slon.h @@ -324,6 +324,7 @@ * ---------- */ extern pid_t slon_pid; + #ifndef WIN32 extern pthread_mutex_t slon_watchdog_lock; extern pid_t slon_watchdog_pid; @@ -575,7 +576,6 @@ #define pipewrite(a,b,c) write(a,b,c) #endif - #endif /* SLON_H_INCLUDED */ Index: runtime_config.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v retrieving revision 1.26 retrieving revision 1.27 diff -Lsrc/slon/runtime_config.c -Lsrc/slon/runtime_config.c -u -w -r1.26 -r1.27 --- src/slon/runtime_config.c +++ src/slon/runtime_config.c @@ -29,10 +29,12 @@ #include "slon.h" -/* - * ---------- Global data ---------- +/* ---------- + * Global data + * ---------- */ pid_t slon_pid; + #ifndef WIN32 pthread_mutex_t slon_watchdog_lock; pid_t slon_watchdog_pid; @@ -52,8 +54,9 @@ SlonNode *rtcfg_node_list_tail = NULL; -/* - * ---------- Local data ---------- +/* ---------- + * Local data + * ---------- */ static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t cfgseq_lock = PTHREAD_MUTEX_INITIALIZER; @@ -70,14 +73,16 @@ static struct to_activate *to_activate_tail = NULL; -/* - * ---------- Local functions ---------- +/* ---------- + * Local functions + * ---------- */ static void rtcfg_startStopNodeThread(SlonNode * node); -/* - * ---------- rtcfg_lock ---------- +/* ---------- + * rtcfg_lock + * ---------- */ void rtcfg_lock(void) @@ -86,8 +91,9 @@ } -/* - * ---------- rtcfg_unlock ---------- +/* ---------- + * rtcfg_unlock + * ---------- */ void rtcfg_unlock(void) @@ -96,8 +102,9 @@ } -/* - * ---------- rtcfg_storeNode ---------- +/* ---------- + * rtcfg_storeNode + * ---------- */ void rtcfg_storeNode(int no_id, char *no_comment) @@ -151,13 +158,14 @@ } -/* - * ---------- rtcfg_setNodeLastEvent() +/* ---------- + * rtcfg_setNodeLastEvent() * * Set the last_event field in the node runtime structure. * * Returns: 0 if the event_seq is <= the known value -1 if the node is - * not known event_seq otherwise ---------- + * not known event_seq otherwise + * ---------- */ int64 rtcfg_setNodeLastEvent(int no_id, int64 event_seq) @@ -189,10 +197,11 @@ } -/* - * ---------- rtcfg_getNodeLastEvent +/* ---------- + * rtcfg_getNodeLastEvent * - * Read the nodes last_event field ---------- + * Read the nodes last_event field + * ---------- */ int64 rtcfg_getNodeLastEvent(int no_id) @@ -214,8 +223,9 @@ } -/* - * ---------- rtcfg_enableNode ---------- +/* ---------- + * rtcfg_enableNode + * ---------- */ void rtcfg_enableNode(int no_id) @@ -249,8 +259,9 @@ } -/* - * ---------- slon_disableNode ---------- +/* ---------- + * slon_disableNode + * ---------- */ void rtcfg_disableNode(int no_id) @@ -286,8 +297,9 @@ } -/* - * ---------- rtcfg_findNode ---------- +/* ---------- + * rtcfg_findNode + * ---------- */ SlonNode * rtcfg_findNode(int no_id) @@ -304,8 +316,9 @@ } -/* - * ---------- rtcfg_storePath ---------- +/* ---------- + * rtcfg_storePath + * ---------- */ void rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry) @@ -350,8 +363,9 @@ } -/* - * ---------- rtcfg_dropPath ---------- +/* ---------- + * rtcfg_dropPath + * ---------- */ void rtcfg_dropPath(int pa_server) @@ -406,8 +420,9 @@ } -/* - * ---------- rtcfg_storeListen ---------- +/* ---------- + * rtcfg_storeListen + * ---------- */ void rtcfg_reloadListen(PGconn *db) @@ -470,8 +485,9 @@ } -/* - * ---------- rtcfg_storeListen ---------- +/* ---------- + * rtcfg_storeListen + * ---------- */ void rtcfg_storeListen(int li_origin, int li_provider) @@ -535,8 +551,9 @@ } -/* - * ---------- rtcfg_dropListen ---------- +/* ---------- + * rtcfg_dropListen + * ---------- */ void rtcfg_dropListen(int li_origin, int li_provider) @@ -593,6 +610,10 @@ } +/* ---------- + * rtcfg_storeSet + * ---------- + */ void rtcfg_storeSet(int set_id, int set_origin, char *set_comment) { @@ -656,6 +677,10 @@ } +/* ---------- + * rtcfg_dropSet + * ---------- + */ void rtcfg_dropSet(int set_id) { @@ -690,6 +715,10 @@ rtcfg_unlock(); } +/* ---------- + * rtcfg_moveSet + * ---------- + */ void rtcfg_moveSet(int set_id, int old_origin, int new_origin, int sub_provider) { @@ -738,6 +767,10 @@ } +/* ---------- + * rtcfg_storeSubscribe + * ---------- + */ void rtcfg_storeSubscribe(int sub_set, int sub_provider, char *sub_forward) { @@ -783,6 +816,10 @@ } +/* ---------- + * rtcfg_enableSubscription + * ---------- + */ void rtcfg_enableSubscription(int sub_set, int sub_provider, char *sub_forward) { @@ -825,6 +862,10 @@ } +/* ---------- + * rtcfg_unsubscribeSet + * ---------- + */ void rtcfg_unsubscribeSet(int sub_set) { @@ -866,8 +907,9 @@ } -/* - * ---------- rtcfg_startStopNodeThread ---------- +/* ---------- + * rtcfg_startStopNodeThread + * ---------- */ static void rtcfg_startStopNodeThread(SlonNode * node) @@ -1011,6 +1053,10 @@ } +/* ---------- + * rtcfg_needActivate + * ---------- + */ void rtcfg_needActivate(int no_id) { @@ -1027,6 +1073,10 @@ } +/* ---------- + * rtcfg_doActivate + * ---------- + */ void rtcfg_doActivate(void) { @@ -1041,6 +1091,10 @@ } +/* ---------- + * rtcfg_joinAllRemoteThreads + * ---------- + */ void rtcfg_joinAllRemoteThreads(void) { @@ -1096,6 +1150,10 @@ } +/* ---------- + * rtcfg_seq_bump + * ---------- + */ void rtcfg_seq_bump(void) { @@ -1105,6 +1163,10 @@ } +/* ---------- + * rtcfg_seq_get + * ---------- + */ int64 rtcfg_seq_get(void) { Index: misc.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/misc.c,v retrieving revision 1.21 retrieving revision 1.22 diff -Lsrc/slon/misc.c -Lsrc/slon/misc.c -u -w -r1.21 -r1.22 --- src/slon/misc.c +++ src/slon/misc.c @@ -63,13 +63,16 @@ extern char *Syslog_ident; static void write_syslog(int level, const char *line); - #else #define Use_syslog 0 #endif /* HAVE_SYSLOG */ +/* ---------- + * slon_log + * ---------- + */ void slon_log(Slon_Log_Level level, char *fmt,...) { @@ -196,11 +199,12 @@ } -/* +/* ---------- * scanint8 --- try to parse a string into an int8. * * If errorOK is false, ereport a useful error message if the string is bad. If * errorOK is true, just return "false" for bad input. + * ---------- */ int slon_scanint64(char *str, int64 * result) @@ -263,6 +267,10 @@ } #if HAVE_SYSLOG +/* ---------- + * write_syslog + * ---------- + */ static void write_syslog(int level, const char *line) { @@ -365,3 +373,5 @@ } #endif /* HAVE_SYSLOG */ + + Index: local_listen.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v retrieving revision 1.34 retrieving revision 1.35 diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.34 -r1.35 Index: cleanup_thread.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v retrieving revision 1.28 retrieving revision 1.29 diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.28 -r1.29 --- src/slon/cleanup_thread.c +++ src/slon/cleanup_thread.c @@ -28,8 +28,9 @@ #include "slon.h" -/* - * ---------- Global data ---------- +/* ---------- + * Global data + * ---------- */ int vac_frequency = SLON_VACUUM_FREQUENCY; static int vac_bias = 0; @@ -49,15 +50,18 @@ "pg_catalog.pg_listener" }; -#define MAX_VAC_TABLE 8 /* Add to this if additional tables are added above */ +#define MAX_VAC_TABLE 8 /* Add to this if additional tables are added + * above */ -static char tstring[255]; /* string used to store table names for the VACUUM statements */ +static char tstring[255]; /* string used to store table names for the + * VACUUM statements */ -/* - * ---------- cleanupThread_main +/* ---------- + * cleanupThread_main * * Periodically calls the stored procedure to remove old events and log data and - * vacuums those tables. ---------- + * vacuums those tables. + * ---------- */ void * cleanupThread_main(void *dummy) @@ -79,9 +83,12 @@ slon_log(SLON_DEBUG1, "cleanupThread: thread starts\n"); - /* Want the vacuum time bias to be between 0 and 100 seconds, - * hence between 0 and 100000 */ - if (vac_bias == 0) { + /* + * Want the vacuum time bias to be between 0 and 100 seconds, hence + * between 0 and 100000 + */ + if (vac_bias == 0) + { vac_bias = rand() % ( SLON_CLEANUP_SLEEP * 166 ); } slon_log(SLON_DEBUG4, "cleanupThread: bias = %d\n", vac_bias); @@ -111,10 +118,10 @@ /* * Loop until shutdown time arrived * - * Note the introduction of vac_bias and an up-to-100s random - * "fuzz"; this reduces the likelihood that having multiple - * slons hitting the same cluster will run into conflicts due - * to trying to vacuum pg_listener concurrently + * Note the introduction of vac_bias and an up-to-100s random "fuzz"; this + * reduces the likelihood that having multiple slons hitting the same + * cluster will run into conflicts due to trying to vacuum pg_listener + * concurrently */ while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, SLON_CLEANUP_SLEEP * 1000 + vac_bias + (rand() % (SLON_CLEANUP_SLEEP * 166))) == SCHED_STATUS_OK) { @@ -205,23 +212,30 @@ if (vac_frequency != 0 && ++vac_count >= vac_frequency) { unsigned long latest_xid; + vac_count = 0; latest_xid = get_earliest_xid(dbconn); - if (earliest_xid != latest_xid) { + if (earliest_xid != latest_xid) + { vacuum_action = "vacuum analyze"; - } else { + } + else + { vacuum_action = "analyze"; - slon_log(SLON_DEBUG4, "cleanupThread: xid %d still active - analyze instead\n", + slon_log(SLON_DEBUG4, + "cleanupThread: xid %d still active - analyze instead\n", earliest_xid); } earliest_xid = latest_xid; + /* * Build the query string for vacuuming replication runtime data * and event tables */ dstring_init(&query3); gettimeofday(&tv_start, NULL); - for (t=0; t < MAX_VAC_TABLE; t++) { + for (t = 0; t < MAX_VAC_TABLE; t++) + { sprintf(tstring, table_list[t], rtcfg_namespace); slon_mkquery(&query3, "%s %s;", @@ -235,8 +249,10 @@ "cleanupThread: \"%s\" - %s", dstring_data(&query3), PQresultErrorMessage(res)); PQclear(res); - /* slon_retry(); - break; */ + + /* + * slon_retry(); break; + */ } } PQclear(res); @@ -271,22 +287,30 @@ pthread_exit(NULL); } -/* get_earliest_xid() reads the earliest XID that is still active. - The idea is that if, between cleanupThread iterations, this XID has - not changed, then an old transaction is still in progress, - PostgreSQL is holding onto the tuples, and there is no value in - doing VACUUMs of the various Slony-I tables. +/* ---------- + * get_earliest_xid() + * + * reads the earliest XID that is still active. + * + * The idea is that if, between cleanupThread iterations, this XID has + * not changed, then an old transaction is still in progress, + * PostgreSQL is holding onto the tuples, and there is no value in + * doing VACUUMs of the various Slony-I tables. + * ---------- */ - -static unsigned long get_earliest_xid (PGconn *dbconn) { +static unsigned long +get_earliest_xid(PGconn *dbconn) +{ long long xid; PGresult *res; SlonDString query1; + dstring_init(&query1); slon_mkquery(&query1, "select %s.getMinXid();", rtcfg_namespace); res = PQexec(dbconn, dstring_data(&query1)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { slon_log(SLON_FATAL, "cleanupThread: could not getMinXid()!\n"); PQclear(res); slon_retry(); @@ -296,3 +320,5 @@ slon_log(SLON_DEBUG3, "cleanupThread: minxid: %d\n", xid); return (unsigned long) xid; } + + Index: confoptions.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.c,v retrieving revision 1.14 retrieving revision 1.15 diff -Lsrc/slon/confoptions.c -Lsrc/slon/confoptions.c -u -w -r1.14 -r1.15 Index: dbutils.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/dbutils.c,v retrieving revision 1.17 retrieving revision 1.18 diff -Lsrc/slon/dbutils.c -Lsrc/slon/dbutils.c -u -w -r1.17 -r1.18 --- src/slon/dbutils.c +++ src/slon/dbutils.c @@ -1,4 +1,4 @@ -/*------------------------------------------------------------------------- +/* ---------------------------------------------------------------------- * dbutils.c * * Database utility functions for Slony-I @@ -7,7 +7,7 @@ * Author: Jan Wieck, Afilias USA INC. * * $Id$ - *------------------------------------------------------------------------- + * ---------------------------------------------------------------------- */ @@ -31,20 +31,21 @@ static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap); -/* +/* ---- * This mutex is used to wrap around PQconnectdb. There's a problem that * occurs when your libpq is compiled with libkrb (kerberos) which is not * threadsafe. It is especially odd because I'm not using kerberos. * * This is fixed in libpq in 8.0, but for now (and for older versions we'll just * use this mutex. - * + * ---- */ static pthread_mutex_t slon_connect_lock = PTHREAD_MUTEX_INITIALIZER; -/* - * ---------- slon_connectdb ---------- +/* ---------- + * slon_connectdb + * ---------- */ SlonConn * slon_connectdb(char *conninfo, char *symname) @@ -103,8 +104,9 @@ } -/* - * ---------- slon_disconnectdb ---------- +/* ---------- + * slon_disconnectdb + * ---------- */ void slon_disconnectdb(SlonConn * conn) @@ -124,8 +126,9 @@ } -/* - * ---------- slon_make_dummyconn ---------- +/* ---------- + * slon_make_dummyconn + * ---------- */ SlonConn * slon_make_dummyconn(char *symname) @@ -155,8 +158,9 @@ } -/* - * ---------- slon_free_dummyconn ---------- +/* ---------- + * slon_free_dummyconn + * ---------- */ void slon_free_dummyconn(SlonConn * conn) @@ -179,10 +183,11 @@ } -/* - * ---------- db_getLocalNodeId +/* ---------- + * db_getLocalNodeId * - * Query a connection for the value of sequence sl_local_node_id ---------- + * Query a connection for the value of sequence sl_local_node_id + * ---------- */ int db_getLocalNodeId(PGconn *conn) @@ -323,13 +328,15 @@ } -/* - * ---------- slon_mkquery +/* ---------- + * slon_mkquery * * A simple query formatting and quoting function using dynamic string buffer - * allocation. Similar to sprintf() it uses formatting symbols: %s - * tring argument %q Quoted literal (\ and ' will be escaped) %d - * nteger argument ---------- + * allocation. Similar to sprintf() it uses formatting symbols: + * %s String argument + * %q Quoted literal (\ and ' will be escaped) + * %d Integer argument + * ---------- */ int slon_mkquery(SlonDString * dsp, char *fmt,...) @@ -348,10 +355,11 @@ } -/* - * ---------- slon_appendquery +/* ---------- + * slon_appendquery * - * Append query string material to an existing dynamic string. ---------- + * Append query string material to an existing dynamic string. + * ---------- */ int slon_appendquery(SlonDString * dsp, char *fmt,...) @@ -368,10 +376,11 @@ } -/* - * ---------- slon_appendquery_int +/* ---------- + * slon_appendquery_int * - * Implementation of slon_mkquery() and slon_appendquery(). ---------- + * Implementation of slon_mkquery() and slon_appendquery(). + * ---------- */ static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap) Index: slon.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v retrieving revision 1.60 retrieving revision 1.61 diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.60 -r1.61 --- src/slon/slon.c +++ src/slon/slon.c @@ -36,8 +36,9 @@ #include "confoptions.h" -/* - * ---------- Global data ---------- +/* ---------- + * Global data + * ---------- */ #ifndef WIN32 #define SLON_WATCHDOG_NORMAL 0 @@ -51,8 +52,9 @@ pthread_mutex_t slon_wait_listen_lock; pthread_cond_t slon_wait_listen_cond; -/* - * ---------- Local data ---------- +/* ---------- + * Local data + * ---------- */ static pthread_t local_event_thread; static pthread_t local_cleanup_thread; @@ -66,6 +68,7 @@ static char *const *main_argv; static void SlonMain(void); + #ifndef WIN32 static void SlonWatchdog(void); static void sighandler(int signo); @@ -78,7 +81,12 @@ int child_status; -void Usage(char * const argv[]) +/* ---------- + * Usage + * ---------- + */ +void +Usage(char *const argv[]) { fprintf(stderr, "usage: %s [options] clustername conninfo\n", argv[0]); fprintf(stderr, "\n"); @@ -106,8 +114,9 @@ } -/* - * ---------- main ---------- +/* ---------- + * main + * ---------- */ int main(int argc, char *const argv[]) @@ -296,11 +305,13 @@ } #ifdef WIN32 + /* * Startup the network subsystem, in case our libpq doesn't */ err = WSAStartup(MAKEWORD(1, 1), &wsaData); - if (err != 0) { + if (err != 0) + { slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err); exit(-1); } @@ -331,9 +342,11 @@ slon_exit(-1); } - /* There is no watchdog process on win32. We delegate restarting and - * other such tasks to the Service Control Manager. And win32 doesn't - * support signals, so we don't need to catch them... */ + /* + * There is no watchdog process on win32. We delegate restarting and other + * such tasks to the Service Control Manager. And win32 doesn't support + * signals, so we don't need to catch them... + */ #ifndef WIN32 SlonWatchdog(); #else @@ -343,11 +356,17 @@ } -static void SlonMain(void) +/* ---------- + * SlonMain + * ---------- + */ +static void +SlonMain(void) { PGresult *res; SlonDString query; - int i,n; + int i, + n; char pipe_c; PGconn *startup_conn; @@ -431,7 +450,6 @@ slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", errno,strerror(errno)); slon_abort(); } - #endif slon_log(SLON_DEBUG2, "main: main process started\n"); @@ -573,7 +591,8 @@ PQclear(res); /* - * Read configuration table sl_subscribe - only subscriptions for local node + * Read configuration table sl_subscribe - only subscriptions for local + * node */ slon_mkquery(&query, "select sub_set, sub_provider, sub_forward, sub_active " @@ -651,10 +670,10 @@ slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n"); /* - * Create the local event thread that monitors the local node - * for administrative events to adjust the configuration at - * runtime. We wait here until the local listen thread has - * checked that there is no other slon daemon running. + * Create the local event thread that monitors the local node for + * administrative events to adjust the configuration at runtime. We wait + * here until the local listen thread has checked that there is no other + * slon daemon running. */ pthread_mutex_lock(&slon_wait_listen_lock); if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0) @@ -700,6 +719,7 @@ slon_retry(); } #endif + /* * Wait until the scheduler has shut down all remote connections */ @@ -746,9 +766,15 @@ } #ifndef WIN32 -static void SlonWatchdog(void) +/* ---------- + * SlonWatchdog + * ---------- + */ +static void +SlonWatchdog(void) { pid_t pid; + #if !defined(CYGWIN) && !defined(WIN32) struct sigaction act; #endif @@ -860,6 +886,10 @@ } +/* ---------- + * sighandler + * ---------- + */ static void sighandler(int signo) { @@ -900,6 +930,10 @@ } } +/* ---------- + * slon_terminate_worker + * ---------- + */ void slon_terminate_worker() { @@ -916,6 +950,10 @@ } #endif +/* ---------- + * slon_exit + * ---------- + */ void slon_exit(int code) { Index: slonik.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.h,v retrieving revision 1.24 retrieving revision 1.25 diff -Lsrc/slonik/slonik.h -Lsrc/slonik/slonik.h -u -w -r1.24 -r1.25 --- src/slonik/slonik.h +++ src/slonik/slonik.h @@ -556,7 +556,6 @@ #ifdef HAVE_PQSETNOTICERECEIVER void db_notice_recv(void *arg, const PGresult *res); - #else void db_notice_recv(void *arg, const char *msg); #endif Index: dbutil.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/dbutil.c,v retrieving revision 1.8 retrieving revision 1.9 diff -Lsrc/slonik/dbutil.c -Lsrc/slonik/dbutil.c -u -w -r1.8 -r1.9 --- src/slonik/dbutil.c +++ src/slonik/dbutil.c @@ -70,7 +70,6 @@ PQresultErrorMessage(res)); } } - #else /* !HAVE_PQSETNOTICERECEIVER */ /* ---------- Index: slonik.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v retrieving revision 1.52 retrieving revision 1.53 diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.52 -r1.53 --- src/slonik/slonik.c +++ src/slonik/slonik.c @@ -81,7 +81,8 @@ replace_token(char *resout, char *lines, const char *token, const char *replacement) { int numlines = 1; - int i,o; + int i, + o; char result_set[4096]; int toklen, replen; @@ -123,13 +124,15 @@ * expanded? If so there is more complete code available in the * PostgreSQL backend that could be adapted. */ -char *get_sharepath(const char *path) +char * +get_sharepath(const char *path) { int i; char *result; result = (char *)malloc(MAX_PATH+1); - if (!result) { + if (!result) + { printf("memory allocation failure.\n"); exit(1); } @@ -1918,7 +1921,8 @@ #define ROWIDBITS "_Slony-I__rowID" - if (strlen(stmt->script->clustername) + strlen("ROWIDBITS") > NAMEDATALEN) { + if (strlen(stmt->script->clustername) + strlen("ROWIDBITS") > NAMEDATALEN) + { printf ("Cluster name %s too long to permit creation of columns containing %s - maximum name length: %d\n", stmt->script->clustername, ROWIDBITS, NAMEDATALEN); return -1;
- Previous message: [Slony1-commit] By wieck: Restructuring of the watchdog process structure.
- Next message: [Slony1-commit] By wieck: Adjust .cvsignore to now generated files.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list