Fri Sep 24 23:12:44 PDT 2004
- Previous message: [Slony1-commit] By darcyb: Ok it's finaly here, a slon config file.
- Next message: [Slony1-commit] By darcyb: Opps copy/paste function prototypeing is bad mojo, fix
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
add syslog support, pid file writing and other bells
Modified Files:
--------------
slony1-engine/src/slon:
cleanup_thread.c (r1.16 -> r1.17)
confoptions.c (r1.1 -> r1.2)
confoptions.h (r1.1 -> r1.2)
dbutils.c (r1.11 -> r1.12)
misc.c (r1.12 -> r1.13)
remote_listen.c (r1.15 -> r1.16)
remote_worker.c (r1.61 -> r1.62)
runtime_config.c (r1.19 -> r1.20)
scheduler.c (r1.15 -> r1.16)
slon.c (r1.29 -> r1.30)
sync_thread.c (r1.12 -> r1.13)
-------------- next part --------------
Index: confoptions.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.1 -r1.2
--- src/slon/confoptions.h
+++ src/slon/confoptions.h
@@ -11,6 +11,8 @@
extern double real_placeholder;
extern char *string_placeholder;
+extern char *pid_file;
+
extern int vac_frequency;
extern int slon_log_level;
@@ -19,6 +21,9 @@
extern int sync_group_maxsize;
+char *Syslog_ident;
+char *Syslog_facility;
+int Use_syslog;
bool logpid;
bool logtimestamp;
@@ -145,7 +150,23 @@
0,
100
},
-
+#ifdef HAVE_SYSLOG
+ {
+ {
+ (const char *)"syslog",
+ gettext_noop("Uses syslog for logging."),
+ gettext_noop("If this parameter is 1, messages go both to syslog "
+ "and the standard output. A value of 2 sends output only to syslog. "
+ "(Some messages will still go to the standard output/error.) The "
+ "default is 0, which means syslog is off."),
+ SLON_C_INT
+ },
+ &Use_syslog,
+ 0,
+ 0,
+ 2
+ },
+#endif
NULL
};
static struct config_int ConfigureNamesBool[] =
@@ -202,6 +223,39 @@
&string_placeholder, /* var_name */
"default" /* default value */
},
+ {
+ {
+ (const char *)"pid_file",
+ gettext_noop("Where to write the pid file"),
+ NULL,
+ SLON_C_STRING
+ },
+ &pid_file,
+ NULL
+ },
+#ifdef HAVE_SYSLOG
+ {
+ {
+ (const char *)"syslog_facility",
+ gettext_noop("Sets the syslog \"facility\" to be used when syslog enabled."),
+ gettext_noop("Valid values are LOCAL0, LOCAL1, LOCAL2, LOCAL3, "
+ "LOCAL4, LOCAL5, LOCAL6, LOCAL7."),
+ SLON_C_STRING
+ },
+ &Syslog_facility,
+ "LOCAL0"
+ },
+ {
+ {
+ (const char *)"syslog_ident",
+ gettext_noop("Sets the program name used to identify slon messages in syslog."),
+ NULL,
+ SLON_C_STRING
+ },
+ &Syslog_ident,
+ "slon"
+ },
+#endif
NULL
};
Index: scheduler.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/scheduler.c,v
retrieving revision 1.15
retrieving revision 1.16
diff -Lsrc/slon/scheduler.c -Lsrc/slon/scheduler.c -u -w -r1.15 -r1.16
--- src/slon/scheduler.c
+++ src/slon/scheduler.c
@@ -37,9 +37,8 @@
#endif
-/* ----------
- * Static data
- * ----------
+/*
+ * ---------- Static data ----------
*/
static int sched_status = SCHED_STATUS_OK;
@@ -59,9 +58,8 @@
static sigset_t sched_sigset;
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
*/
static void *sched_mainloop(void *);
static void sched_sighandler(int signo);
@@ -70,14 +68,13 @@
static void sched_remove_fdset(int fd, fd_set *fds);
-/* ----------
- * sched_start_mainloop
+/*
+ * ---------- sched_start_mainloop
*
* Called from main() before starting up any worker thread.
*
- * This will spawn the event scheduling thread that does the
- * central select(2) system call.
- * ----------
+ * This will spawn the event scheduling thread that does the central select(2)
+ * system call. ----------
*/
int
sched_start_mainloop(void)
@@ -88,9 +85,9 @@
sched_main_thread = pthread_self();
/*
- * Block signals. Since sched_start_mainloop() is called before
- * any other thread is created, this will be inherited by all
- * threads in the system.
+ * Block signals. Since sched_start_mainloop() is called before any
+ * other thread is created, this will be inherited by all threads in
+ * the system.
*/
sigemptyset(&sched_sigset);
sigaddset(&sched_sigset, SIGHUP);
@@ -106,7 +103,6 @@
perror("sched_start_mainloop: pthread_mutex_lock()");
return -1;
}
-
/*
* Start the scheduler thread
*/
@@ -115,7 +111,6 @@
perror("sched_start_mainloop: pthread_create()");
return -1;
}
-
/*
* When the scheduler is ready, he'll signal the scheduler cond
*/
@@ -124,7 +119,6 @@
perror("sched_start_mainloop: pthread_cond_wait()");
return -1;
}
-
/*
* Release the scheduler lock
*/
@@ -133,7 +127,6 @@
perror("sched_start_mainloop: pthread_mutex_unlock()");
return -1;
}
-
/*
* Check for errors
*/
@@ -147,13 +140,12 @@
}
-/* ----------
- * sched_wait_mainloop
+/*
+ * ---------- sched_wait_mainloop
*
- * Called from main() after all working threads according to the
- * initial configuration are started. Will wait until the scheduler
- * mainloop terminates.
- * ----------
+ * Called from main() after all working threads according to the initial
+ * configuration are started. Will wait until the scheduler mainloop
+ * terminates. ----------
*/
int
sched_wait_mainloop(void)
@@ -174,11 +166,13 @@
switch (signo)
{
- case SIGHUP: sched_sighuphandler(signo);
+ case SIGHUP:
+ sched_sighuphandler(signo);
break;
case SIGINT:
- case SIGTERM: sched_sighandler(signo);
+ case SIGTERM:
+ sched_sighandler(signo);
break;
}
@@ -190,19 +184,17 @@
perror("sched_wait_mainloop: pthread_join()");
return -1;
}
-
return 0;
}
-/* ----------
- * sched_wait_conn
+/*
+ * ---------- sched_wait_conn
*
* Assumes that the thread holds the lock on conn->conn_lock.
*
- * Adds the connection to the central wait queue and wakes up
- * the scheduler thread to reloop onto the select(2) call.
- * ----------
+ * Adds the connection to the central wait queue and wakes up the scheduler
+ * thread to reloop onto the select(2) call. ----------
*/
int
sched_wait_conn(SlonConn *conn, int condition)
@@ -218,7 +210,6 @@
pthread_mutex_unlock(&sched_master_lock);
return -1;
}
-
/*
* Remember the event we're waiting for and add the database
* connection to the fdset(s)
@@ -235,8 +226,8 @@
DLLIST_ADD_HEAD(sched_waitqueue_head, sched_waitqueue_tail, conn);
/*
- * Give the scheduler thread a heads up, release the master lock
- * and wait for it to tell us that the event we're waiting for happened.
+ * Give the scheduler thread a heads up, release the master lock and
+ * wait for it to tell us that the event we're waiting for happened.
*/
if (write(sched_wakeuppipe[1], "x", 1) < 0)
{
@@ -264,14 +255,13 @@
}
-/* ----------
- * sched_wait_time
+/*
+ * ---------- sched_wait_time
*
* Assumes that the thread holds the lock on conn->conn_lock.
*
- * Like sched_wait_conn() but with a timeout. Can be called without
- * any read/write condition to wait for to resemble a pure timeout
- * mechanism.
+ * Like sched_wait_conn() but with a timeout. Can be called without any
+ * read/write condition to wait for to resemble a pure timeout mechanism.
* ----------
*/
int
@@ -294,11 +284,10 @@
}
-/* ----------
- * sched_msleep
+/*
+ * ---------- sched_msleep
*
- * Use the schedulers event loop to sleep for msec milliseconds.
- * ----------
+ * Use the schedulers event loop to sleep for msec milliseconds. ----------
*/
int
sched_msleep(SlonNode *node, int msec)
@@ -311,8 +300,7 @@
{
snprintf(dummyconn_name, 64, "msleep_node_%d", node->no_id);
conn = slon_make_dummyconn(dummyconn_name);
- }
- else
+ } else
conn = slon_make_dummyconn("msleep_local");
rc = sched_wait_time(conn, 0, msec);
@@ -322,11 +310,10 @@
}
-/* ----------
- * sched_get_status
+/*
+ * ---------- sched_get_status
*
- * Return the current scheduler status in a thread safe fashion
- * ----------
+ * Return the current scheduler status in a thread safe fashion ----------
*/
int
sched_get_status(void)
@@ -340,13 +327,12 @@
}
-/* ----------
- * sched_wakeup_node
+/*
+ * ---------- sched_wakeup_node
*
- * Wakeup the threads (listen and worker) of one or all remote
- * nodes to cause them rechecking the current runtime status or
- * adjust their configuration to changes.
- * ----------
+ * Wakeup the threads (listen and worker) of one or all remote nodes to cause
+ * them rechecking the current runtime status or adjust their configuration
+ * to changes. ----------
*/
int
sched_wakeup_node (int no_id)
@@ -382,7 +368,6 @@
slon_abort();
}
}
-
pthread_mutex_unlock(&sched_master_lock);
remoteWorker_wakeup(no_id);
@@ -394,11 +379,10 @@
}
-/* ----------
- * sched_mainloop
+/*
+ * ---------- sched_mainloop
*
- * The thread handling the master scheduling.
- * ----------
+ * The thread handling the master scheduling. ----------
*/
static void *
sched_mainloop(void *dummy)
@@ -413,8 +397,8 @@
int i;
/*
- * Grab the scheduler master lock. This will wait until the
- * main thread acutally blocks on the master cond.
+ * Grab the scheduler master lock. This will wait until the main
+ * thread actually blocks on the master cond.
*/
pthread_mutex_lock(&sched_master_lock);
@@ -425,8 +409,8 @@
FD_ZERO(&sched_fdset_write);
/*
- * Create a pipe used by the main thread to cleanly
- * wakeup the scheduler on signals.
+ * Create a pipe used by the main thread to cleanly wakeup the
+ * scheduler on signals.
*/
if (pipe(sched_wakeuppipe) < 0)
{
@@ -438,8 +422,8 @@
sched_add_fdset(sched_wakeuppipe[0], &sched_fdset_read);
/*
- * Done with all initialization. Let the main thread go
- * ahead and get everyone else dancing.
+ * Done with all initialization. Let the main thread go ahead and get
+ * everyone else dancing.
*/
pthread_cond_signal(&sched_master_cond);
@@ -465,9 +449,9 @@
}
/*
- * Check if any of the connections in the wait queue
- * have reached there timeout. While doing so, we also
- * remember the closest timeout in the future.
+ * Check if any of the connections in the wait queue have
+ * reached their timeout. While doing so, we also remember
+ * the closest timeout in the future.
*/
tv = NULL;
gettimeofday(&now, NULL);
@@ -498,12 +482,11 @@
conn = next;
continue;
}
-
if (conn->condition & SCHED_WAIT_TIMEOUT)
{
/*
- * This connection has a timeout. Calculate the
- * time until that.
+ * This connection has a timeout. Calculate
+ * the time until that.
*/
timeout.tv_sec = conn->timeout.tv_sec - now.tv_sec;
timeout.tv_usec = conn->timeout.tv_usec - now.tv_usec;
@@ -520,10 +503,11 @@
(timeout.tv_sec == 0 && timeout.tv_usec < 20000))
{
/*
- * Remove the connection from the wait queue. We
- * consider everything closer than 20 msec being
- * elapsed to avoid a full scheduler round just
- * for one kernel tick.
+ * Remove the connection from the
+ * wait queue. We consider everything
+ * closer than 20 msec being elapsed
+ * to avoid a full scheduler round
+ * just for one kernel tick.
*/
DLLIST_REMOVE(sched_waitqueue_head,
sched_waitqueue_tail, conn);
@@ -538,11 +522,11 @@
pthread_mutex_lock(&(conn->conn_lock));
pthread_cond_signal(&(conn->conn_cond));
pthread_mutex_unlock(&(conn->conn_lock));
- }
- else
+ } else
{
/*
- * Timeout not elapsed. Remember the nearest.
+ * Timeout not elapsed. Remember the
+ * nearest.
*/
if (tv == NULL ||
timeout.tv_sec < min_timeout.tv_sec ||
@@ -555,7 +539,6 @@
}
}
}
-
conn = next;
}
@@ -575,7 +558,6 @@
sched_status = SCHED_STATUS_ERROR;
break;
}
-
/*
* Check the special pipe for a heads up.
*/
@@ -591,7 +573,6 @@
break;
}
}
-
/*
* Check all remaining connections if the IO condition the
* thread is waiting for has occured.
@@ -624,7 +605,6 @@
continue;
}
}
-
if (conn->condition & SCHED_WAIT_SOCK_WRITE)
{
if (FD_ISSET(PQsocket(conn->dbconn), &wfds))
@@ -650,16 +630,15 @@
continue;
}
}
-
conn = conn->next;
}
}
/*
- * If we reach here the scheduler runmode has been changed by
- * by the main threads signal handler. We currently hold the
- * master lock. First we close the scheduler heads-up socket
- * pair so nobody will think we're listening any longer.
+ * If we reach here the scheduler runmode has been changed by the
+ * main threads signal handler. We currently hold the master lock.
+ * First we close the scheduler heads-up socket pair so nobody will
+ * think we're listening any longer.
*/
close(sched_wakeuppipe[0]);
close(sched_wakeuppipe[1]);
@@ -697,17 +676,16 @@
}
-/* ----------
- * sched_sighandler
+/*
+ * ---------- sched_sighandler
*
- * After starting up the sole purpose of the main thread (the original
- * process) is to respond to signals while waiting for the scheduler to
- * finish. The signal handler is here because it is actually
- * sched_start_mainloop() that arranges for the signal handling and
- * sched_wait_mainloop() that enables them. And the only one really
- * interested in the signals is the scheduler thread ... but that's doing
- * select(2) mainly and we have to avoid race conditions with signals.
- * ----------
+ * After starting up the sole purpose of the main thread (the original process)
+ * is to respond to signals while waiting for the scheduler to finish. The
+ * signal handler is here because it is actually sched_start_mainloop() that
+ * arranges for the signal handling and sched_wait_mainloop() that enables
+ * them. And the only one really interested in the signals is the scheduler
+ * thread ... but that's doing select(2) mainly and we have to avoid race
+ * conditions with signals. ----------
*/
static void
sched_sighandler(int signo)
@@ -721,7 +699,6 @@
slon_log(SLON_FATAL, "sched_sighandler: called in non-main thread\n");
slon_abort();
}
-
/*
* Set the scheduling status to shutdown
*/
@@ -743,7 +720,6 @@
pthread_mutex_unlock(&sched_master_lock);
exit(-1);
}
-
/*
* Unlock the master mutex
*/
@@ -759,12 +735,11 @@
}
-/* ----------
- * sched_add_fdset
+/*
+ * ---------- sched_add_fdset
*
- * Add a file descriptor to one of the global scheduler sets and
- * adjust sched_numfd accordingly.
- * ----------
+ * Add a file descriptor to one of the global scheduler sets and adjust
+ * sched_numfd accordingly. ----------
*/
static void
sched_add_fdset(int fd, fd_set *fds)
@@ -775,12 +750,11 @@
}
-/* ----------
- * sched_add_fdset
+/*
+ * ---------- sched_remove_fdset
*
- * Remove a file descriptor from one of the global scheduler sets and
- * adjust sched_numfd accordingly.
- * ----------
+ * Remove a file descriptor from one of the global scheduler sets and adjust
+ * sched_numfd accordingly. ----------
*/
static void
sched_remove_fdset(int fd, fd_set *fds)
@@ -798,5 +772,3 @@
}
}
}
-
-
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.61
retrieving revision 1.62
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.61 -r1.62
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -28,9 +28,8 @@
#include "slon.h"
-/* ----------
- * Local definitions
- * ----------
+/*
+ * ---------- Local definitions ----------
*/
/*
@@ -45,7 +44,8 @@
* Message structure resulting from a remote event
*/
typedef struct SlonWorkMsg_event_s SlonWorkMsg_event;
-struct SlonWorkMsg_event_s {
+struct SlonWorkMsg_event_s
+{
int msg_type;
SlonWorkMsg_event *prev;
SlonWorkMsg_event *next;
@@ -75,7 +75,8 @@
* Message structure resulting from a remote confirm
*/
typedef struct SlonWorkMsg_confirm_s SlonWorkMsg_confirm;
-struct SlonWorkMsg_confirm_s {
+struct SlonWorkMsg_confirm_s
+{
int msg_type;
SlonWorkMsg_confirm *prev;
SlonWorkMsg_confirm *next;
@@ -90,7 +91,8 @@
/*
* Generic message header
*/
-struct SlonWorkMsg_s {
+struct SlonWorkMsg_s
+{
int msg_type;
SlonWorkMsg *prev;
SlonWorkMsg *next;
@@ -103,7 +105,8 @@
typedef struct WorkerGroupLine_s WorkerGroupLine;
-struct ProviderSet_s {
+struct ProviderSet_s
+{
int set_id;
int sub_forward;
@@ -112,7 +115,8 @@
};
-typedef enum {
+typedef enum
+{
SLON_WG_IDLE,
SLON_WG_BUSY,
SLON_WG_DONE,
@@ -121,14 +125,16 @@
} WorkGroupStatus;
-typedef enum {
+typedef enum
+{
SLON_WGLC_ACTION,
SLON_WGLC_DONE,
SLON_WGLC_ERROR
} WorkGroupLineCode;
-struct ProviderInfo_s {
+struct ProviderInfo_s
+{
int no_id;
char *pa_conninfo;
int pa_connretry;
@@ -150,7 +156,8 @@
};
-struct WorkerGroupData_s {
+struct WorkerGroupData_s
+{
SlonNode *node;
char *tab_forward;
@@ -173,7 +180,8 @@
};
-struct WorkerGroupLine_s {
+struct WorkerGroupLine_s
+{
WorkGroupLineCode code;
ProviderInfo *provider;
SlonDString data;
@@ -184,10 +192,11 @@
/*
- * Global status for all remote worker threads, remembering the
- * last seen confirmed sequence number.
+ * Global status for all remote worker threads, remembering the last seen
+ * confirmed sequence number.
*/
-struct node_confirm_status {
+struct node_confirm_status
+{
int con_origin;
int con_received;
int64 con_seqno;
@@ -202,32 +211,36 @@
int sync_group_maxsize;
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
*/
-static void adjust_provider_info(SlonNode *node,
+static void
+adjust_provider_info(SlonNode * node,
WorkerGroupData *wd, int cleanup);
-static int query_execute(SlonNode *node, PGconn *dbconn,
+static int
+query_execute(SlonNode * node, PGconn * dbconn,
SlonDString *dsp);
-static void query_append_event(SlonDString *dsp,
+static void
+query_append_event(SlonDString * dsp,
SlonWorkMsg_event *event);
-static void store_confirm_forward(SlonNode *node, SlonConn *conn,
+static void
+store_confirm_forward(SlonNode * node, SlonConn * conn,
SlonWorkMsg_confirm *confirm);
static int64 get_last_forwarded_confirm(int origin, int receiver);
-static int copy_set(SlonNode *node, SlonConn *local_conn, int set_id,
+static int
+copy_set(SlonNode * node, SlonConn * local_conn, int set_id,
SlonWorkMsg_event *event);
-static int sync_event(SlonNode *node, SlonConn *local_conn,
+static int
+sync_event(SlonNode * node, SlonConn * local_conn,
WorkerGroupData *wd,SlonWorkMsg_event *event);
static void *sync_helper(void *cdata);
-/* ----------
- * slon_remoteWorkerThread
+/*
+ * ---------- slon_remoteWorkerThread
*
- * Listen for events on the local database connection. This means,
- * events generated by the local node only.
- * ----------
+ * Listen for events on the local database connection. This means, events
+ * generated by the local node only. ----------
*/
void *
remoteWorkerThread_main(void *cdata)
@@ -278,9 +291,8 @@
local_dbconn = local_conn->dbconn;
/*
- * Put the connection into replication mode and listen on the
- * special relation telling what node daemon this connection
- * belongs to.
+ * Put the connection into replication mode and listen on the special
+ * relation telling what node daemon this connection belongs to.
*/
slon_mkquery(&query1,
"select %s.setSessionRole('_%s', 'slon'); "
@@ -296,8 +308,8 @@
while (true)
{
/*
- * If we got the special WMSG_WAKEUP, check the current runmode
- * of the scheduler and the status of our node.
+ * If we got the special WMSG_WAKEUP, check the current
+ * runmode of the scheduler and the status of our node.
*/
if (sched_get_status() != SCHED_STATUS_OK)
break;
@@ -315,12 +327,10 @@
adjust_provider_info(node, wd, false);
curr_config = rtcfg_seq_get();
}
-
rtcfg_unlock();
check_config = false;
}
-
/*
* Receive the next message from the queue. If there is no
* one present, wait on the condition variable.
@@ -355,7 +365,6 @@
check_config = true;
continue;
}
-
/*
* Process confirm messages.
*/
@@ -369,7 +378,6 @@
free(msg);
continue;
}
-
/*
* This must be an event message then.
*/
@@ -380,7 +388,6 @@
node->no_id, msg->msg_type);
slon_abort();
}
-
event_ok = true;
event = (SlonWorkMsg_event *)msg;
sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
@@ -393,10 +400,10 @@
/*
* Construct the queries to begin a transaction, notify on
* the event and confirm relations, insert the event into our
- * local sl_event table and confirm it in our local sl_confirm
- * table. When this transaction commits, every other remote
- * node listening for events with us as a provider will pick
- * up the news.
+ * local sl_event table and confirm it in our local
+ * sl_confirm table. When this transaction commits, every
+ * other remote node listening for events with us as a
+ * provider will pick up the news.
*/
slon_mkquery(&query1,
"begin transaction; "
@@ -439,19 +446,19 @@
}
pthread_mutex_unlock(&(node->message_lock));
}
-
while (true)
{
/*
- * Execute the forwarding and notify stuff, but
- * do not commit the transaction yet.
+ * Execute the forwarding and notify stuff,
+ * but do not commit the transaction yet.
*/
if (query_execute(node, local_dbconn, &query1) < 0)
slon_abort();
/*
- * Process the sync and apply the replication data.
- * If successful, exit this loop and commit the transaction.
+ * Process the sync and apply the replication
+ * data. If successful, exit this loop and
+ * commit the transaction.
*/
seconds = sync_event(node, local_conn, wd, event);
if (seconds == 0)
@@ -459,10 +466,9 @@
rc = SCHED_STATUS_OK;
break;
}
-
/*
- * Something went wrong. Rollback and try again
- * after the specified timeout.
+ * Something went wrong. Rollback and try
+ * again after the specified timeout.
*/
slon_mkquery(&query2, "rollback transaction");
if (query_execute(node, local_dbconn, &query2) < 0)
@@ -491,22 +497,22 @@
if (query_execute(node, local_dbconn, &query1) < 0)
slon_abort();
- }
- else
+ } else
{
/*
- * Avoid deadlock problems during configuration changes
- * by locking the central confiuration lock right from
- * the start.
+ * Avoid deadlock problems during configuration
+ * changes by locking the central confiuration lock
+ * right from the start.
*/
slon_appendquery(&query1,
"lock table %s.sl_config_lock; ",
rtcfg_namespace);
/*
- * Simple configuration events. Call the corresponding
- * runtime config function, add the query to call the
- * configuration event specific stored procedure.
+ * Simple configuration events. Call the
+ * corresponding runtime config function, add the
+ * query to call the configuration event specific
+ * stored procedure.
*/
if (strcmp(event->ev_type, "STORE_NODE") == 0)
{
@@ -520,8 +526,7 @@
"select %s.storeNode_int(%d, '%q'); ",
rtcfg_namespace,
no_id, no_comment);
- }
- else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
+ } else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
{
int no_id = (int) strtol(event->ev_data1, NULL, 10);
@@ -532,8 +537,7 @@
"select %s.enableNode_int(%d); ",
rtcfg_namespace,
no_id);
- }
- else if (strcmp(event->ev_type, "DROP_NODE") == 0)
+ } else if (strcmp(event->ev_type, "DROP_NODE") == 0)
{
int no_id = (int) strtol(event->ev_data1, NULL, 10);
@@ -546,10 +550,11 @@
no_id);
/*
- * If this is our own nodeid, then calling disableNode_int()
- * will destroy the whole configuration including the
- * entire schema. Make sure we call just that and get
- * out of here ASAP!
+ * If this is our own nodeid, then calling
+ * disableNode_int() will destroy the whole
+ * configuration including the entire schema.
+ * Make sure we call just that and get out of
+ * here ASAP!
*/
if (no_id == rtcfg_nodeid)
{
@@ -572,15 +577,14 @@
slon_abort();
}
-
/*
- * this is a remote node. Arrange for daemon restart.
+ * this is a remote node. Arrange for daemon
+ * restart.
*/
slon_appendquery(&query1,
"notify \"_%s_Restart\"; ",
rtcfg_cluster_name);
- }
- else if (strcmp(event->ev_type, "STORE_PATH") == 0)
+ } else if (strcmp(event->ev_type, "STORE_PATH") == 0)
{
int pa_server = (int) strtol(event->ev_data1, NULL, 10);
int pa_client = (int) strtol(event->ev_data2, NULL, 10);
@@ -594,8 +598,7 @@
"select %s.storePath_int(%d, %d, '%q', %d); ",
rtcfg_namespace,
pa_server, pa_client, pa_conninfo, pa_connretry);
- }
- else if (strcmp(event->ev_type, "DROP_PATH") == 0)
+ } else if (strcmp(event->ev_type, "DROP_PATH") == 0)
{
int pa_server = (int) strtol(event->ev_data1, NULL, 10);
int pa_client = (int) strtol(event->ev_data2, NULL, 10);
@@ -607,8 +610,7 @@
"select %s.dropPath_int(%d, %d); ",
rtcfg_namespace,
pa_server, pa_client);
- }
- else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
+ } else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
{
int li_origin = (int) strtol(event->ev_data1, NULL, 10);
int li_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -621,8 +623,7 @@
"select %s.storeListen_int(%d, %d, %d); ",
rtcfg_namespace,
li_origin, li_provider, li_receiver);
- }
- else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
+ } else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
{
int li_origin = (int) strtol(event->ev_data1, NULL, 10);
int li_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -635,8 +636,7 @@
"select %s.dropListen_int(%d, %d, %d); ",
rtcfg_namespace,
li_origin, li_provider, li_receiver);
- }
- else if (strcmp(event->ev_type, "STORE_SET") == 0)
+ } else if (strcmp(event->ev_type, "STORE_SET") == 0)
{
int set_id = (int) strtol(event->ev_data1, NULL, 10);
int set_origin = (int) strtol(event->ev_data2, NULL, 10);
@@ -649,8 +649,7 @@
"select %s.storeSet_int(%d, %d, '%q'); ",
rtcfg_namespace,
set_id, set_origin, set_comment);
- }
- else if (strcmp(event->ev_type, "DROP_SET") == 0)
+ } else if (strcmp(event->ev_type, "DROP_SET") == 0)
{
int set_id = (int) strtol(event->ev_data1, NULL, 10);
@@ -659,8 +658,7 @@
slon_appendquery(&query1,
"select %s.dropSet_int(%d); ",
rtcfg_namespace, set_id);
- }
- else if (strcmp(event->ev_type, "MERGE_SET") == 0)
+ } else if (strcmp(event->ev_type, "MERGE_SET") == 0)
{
int set_id = (int) strtol(event->ev_data1, NULL, 10);
int add_id = (int) strtol(event->ev_data2, NULL, 10);
@@ -671,26 +669,23 @@
"select %s.mergeSet_int(%d, %d); ",
rtcfg_namespace,
set_id, add_id);
- }
- else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
+ } else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
{
/*
* Nothing to do ATM ... we don't support
* adding tables to subscribed sets yet and
- * table information is not maintained in
- * the runtime configuration.
+ * table information is not maintained in the
+ * runtime configuration.
*/
- }
- else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
+ } else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
{
/*
* Nothing to do ATM ... we don't support
- * adding sequences to subscribed sets yet and
- * sequences information is not maintained in
- * the runtime configuration.
+ * adding sequences to subscribed sets yet
+ * and sequences information is not
+ * maintained in the runtime configuration.
*/
- }
- else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
+ } else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
{
int trig_tabid = (int) strtol(event->ev_data1, NULL, 10);
char *trig_tgname = event->ev_data2;
@@ -699,8 +694,7 @@
"select %s.storeTrigger_int(%d, '%q'); ",
rtcfg_namespace,
trig_tabid, trig_tgname);
- }
- else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
+ } else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
{
int trig_tabid = (int) strtol(event->ev_data1, NULL, 10);
char *trig_tgname = event->ev_data2;
@@ -709,8 +703,7 @@
"select %s.dropTrigger_int(%d, '%q'); ",
rtcfg_namespace,
trig_tabid, trig_tgname);
- }
- else if (strcmp(event->ev_type, "MOVE_SET") == 0)
+ } else if (strcmp(event->ev_type, "MOVE_SET") == 0)
{
int set_id = (int) strtol(event->ev_data1, NULL, 10);
int old_origin = (int) strtol(event->ev_data2, NULL, 10);
@@ -719,10 +712,11 @@
PGresult *res;
/*
- * Move set is a little more tricky. The stored
- * proc does a lot of rearranging in the provider
- * chain. To catch up with that, we need to execute
- * it now and select the resulting provider for us.
+ * Move set is a little more tricky. The
+ * stored proc does a lot of rearranging in
+ * the provider chain. To catch up with that,
+ * we need to execute it now and select the
+ * resulting provider for us.
*/
slon_appendquery(&query1,
"select %s.moveSet_int(%d, %d, %d); ",
@@ -757,8 +751,7 @@
rtcfg_moveSet(set_id, old_origin, new_origin, sub_provider);
dstring_reset(&query1);
- }
- else if (strcmp(event->ev_type, "FAILOVER_SET") == 0)
+ } else if (strcmp(event->ev_type, "FAILOVER_SET") == 0)
{
int failed_node = (int) strtol(event->ev_data1, NULL, 10);
int backup_node = (int) strtol(event->ev_data2, NULL, 10);
@@ -770,8 +763,7 @@
"select %s.failoverSet_int(%d, %d, %d); ",
rtcfg_namespace,
failed_node, backup_node, set_id);
- }
- else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
+ } else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
{
int sub_set = (int) strtol(event->ev_data1, NULL, 10);
int sub_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -785,8 +777,7 @@
"select %s.subscribeSet_int(%d, %d, %d, '%q'); ",
rtcfg_namespace,
sub_set, sub_provider, sub_receiver, sub_forward);
- }
- else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
+ } else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
{
int sub_set = (int) strtol(event->ev_data1, NULL, 10);
int sub_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -809,9 +800,11 @@
while (true)
{
/*
- * If we received this event from another
- * node than the data provider, wait until the
- * data provider has synced up far enough.
+ * If we received this event
+ * from another node than the
+ * data provider, wait until
+ * the data provider has
+ * synced up far enough.
*/
if (event->event_provider != sub_provider)
{
@@ -836,18 +829,20 @@
continue;
}
}
-
/*
- * Execute the config changes so far, but don't
- * commit the transaction yet. We have to copy
- * the data now ...
+ * Execute the config changes
+ * so far, but don't commit
+ * the transaction yet. We
+ * have to copy the data now
+ * ...
*/
if (query_execute(node, local_dbconn, &query1) < 0)
slon_abort();
/*
- * If the copy succeeds, exit the loop and let
- * the transaction commit.
+ * If the copy succeeds, exit
+ * the loop and let the
+ * transaction commit.
*/
if (copy_set(node, local_conn, sub_set, event) == 0)
{
@@ -859,10 +854,11 @@
sched_rc = SCHED_STATUS_OK;
break;
}
-
/*
- * Data copy for new enabled set has failed.
- * Rollback the transaction, sleep and try again.
+ * Data copy for new enabled
+ * set has failed. Rollback
+ * the transaction, sleep and
+ * try again.
*/
if (query_execute(node, local_dbconn, &query2) < 0)
slon_abort();
@@ -877,15 +873,14 @@
event_ok = false;
break;
}
-
if (sleeptime < 60)
sleeptime *= 2;
}
- }
- else
+ } else
{
/*
- * Somebody else got enabled, just remember it
+ * Somebody else got enabled, just
+ * remember it
*/
slon_appendquery(&query1,
"select %s.enableSubscription(%d, %d, %d); ",
@@ -893,23 +888,21 @@
sub_set, sub_provider, sub_receiver);
}
- }
- else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
+ } else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
{
int sub_set = (int) strtol(event->ev_data1, NULL, 10);
int sub_receiver = (int) strtol(event->ev_data2, NULL, 10);
/*
- * All the real work is done when the config utility
- * calls unsubscribeSet() itself. Just propagate the
- * event here.
+ * All the real work is done when the config
+ * utility calls unsubscribeSet() itself.
+ * Just propagate the event here.
*/
slon_appendquery(&query1,
"select %s.unsubscribeSet_int(%d, %d); ",
rtcfg_namespace,
sub_set, sub_receiver);
- }
- else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
+ } else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
{
int ddl_setid = (int) strtol(event->ev_data1, NULL, 10);
char *ddl_script = event->ev_data2;
@@ -919,8 +912,7 @@
"select %s.ddlScript_int(%d, '%q', %d); ",
rtcfg_namespace,
ddl_setid, ddl_script, ddl_only_on_node);
- }
- else
+ } else
{
printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
@@ -934,8 +926,7 @@
{
query_append_event(&query1, event);
slon_appendquery(&query1, "commit transaction;");
- }
- else
+ } else
{
slon_mkquery(&query1, "rollback transaction;");
}
@@ -950,8 +941,8 @@
}
/*
- * Thread exit time has arrived.
- * Disconnect from all data providers and free memory
+ * Thread exit time has arrived. Disconnect from all data providers
+ * and free memory
*/
adjust_provider_info(node, wd, true);
@@ -1004,8 +995,8 @@
for (provider = wd->provider_head; provider; provider = provider->next)
{
/*
- * We create a lock here and keep it until we made
- * our final decision about what to do with the helper thread.
+ * We create a lock here and keep it until we made our final
+ * decision about what to do with the helper thread.
*/
pthread_mutex_lock(&(provider->helper_lock));
@@ -1023,10 +1014,10 @@
/*
* Step 2.
*
- * Add all currently replicated sets (back) to the providers
- * adding new providers as necessary. This step is skippen in
- * cleanup mode, causing all providers to become obsolete
- * and thus the whole provider info extinct.
+ * Add all currently replicated sets (back) to the providers adding new
+ * providers as necessary. This step is skipped in cleanup mode,
+ * causing all providers to become obsolete and thus the whole
+ * provider info extinct.
*/
if (!cleanup)
{
@@ -1056,7 +1047,8 @@
if (provider == NULL)
{
/*
- * No provider entry found. Create a new one.
+ * No provider entry found. Create a
+ * new one.
*/
provider = (ProviderInfo *)
malloc(sizeof(ProviderInfo));
@@ -1065,9 +1057,9 @@
provider->wd = wd;
/*
- * Also create a helper thread for this
- * provider, which will actually run the
- * log data selection for us.
+ * Also create a helper thread for
+ * this provider, which will actually
+ * run the log data selection for us.
*/
pthread_mutex_init(&(provider->helper_lock), NULL);
pthread_mutex_lock(&(provider->helper_lock));
@@ -1087,7 +1079,8 @@
node->no_id, provider->no_id);
/*
- * Add more workgroup data lines to the pool.
+ * Add more workgroup data lines to
+ * the pool.
*/
for (i = 0; i < SLON_WORKLINES_PER_HELPER; i++)
{
@@ -1107,8 +1100,8 @@
provider);
/*
- * Copy the runtime configurations conninfo
- * into the provider info.
+ * Copy the runtime configurations
+ * conninfo into the provider info.
*/
rtcfg_node = rtcfg_findNode(provider->no_id);
if (rtcfg_node != NULL)
@@ -1119,7 +1112,6 @@
strdup(rtcfg_node->pa_conninfo);
}
}
-
/*
* Add the set to the list of sets we get
* from this provider.
@@ -1139,7 +1131,6 @@
}
}
}
-
/*
* Step 3.
*
@@ -1159,8 +1150,8 @@
if (provider->set_head == NULL)
{
/*
- * Tell this helper thread to exit, join him and destroy
- * thread related data.
+ * Tell this helper thread to exit, join him and
+ * destroy thread related data.
*/
provider->helper_status = SLON_WG_EXIT;
pthread_cond_signal(&(provider->helper_cond));
@@ -1202,7 +1193,6 @@
slon_disconnectdb(provider->conn);
provider->conn = NULL;
}
-
/*
* Free other resources
*/
@@ -1218,7 +1208,6 @@
continue;
}
-
/*
* If the connection info has changed, we have to reconnect.
*/
@@ -1242,22 +1231,20 @@
else
provider->pa_conninfo = strdup(rtcfg_node->pa_conninfo);
}
-
/*
- * Unlock the helper thread ... he should now go and wait
- * for work.
+ * Unlock the helper thread ... he should now go and wait for
+ * work.
*/
pthread_mutex_unlock(&(provider->helper_lock));
}
}
-/* ----------
- * remoteWorker_event
+/*
+ * ---------- remoteWorker_event
*
- * Used by the remoteListeThread to forward events selected from
- * the event provider database to the remote nodes worker thread.
- *----------
+ * Used by the remoteListenThread to forward events selected from the event
+ * provider database to the remote nodes worker thread. ----------
*/
void
remoteWorker_event(int event_provider,
@@ -1297,7 +1284,6 @@
"remoteWorker_event: ignore new events due to shutdown\n");
return;
}
-
/*
* Find the node, make sure it is active and that this event is not
* already queued or processed.
@@ -1331,11 +1317,10 @@
ev_origin, ev_seqno);
return;
}
-
/*
- * We lock the worker threads message queue before bumping the
- * nodes last known event sequence to avoid that another listener
- * queues a later message before we can insert this one.
+ * We lock the worker thread's message queue before bumping the node's
+ * last known event sequence to avoid that another listener queues a
+ * later message before we can insert this one.
*/
pthread_mutex_lock(&(node->message_lock));
node->last_event = ev_seqno;
@@ -1377,47 +1362,72 @@
msg->event_provider = event_provider;
msg->ev_origin = ev_origin;
msg->ev_seqno = ev_seqno;
- msg->ev_timestamp_c = cp; strcpy(cp, ev_timestamp); cp += len_timestamp;
- msg->ev_minxid_c = cp; strcpy(cp, ev_minxid); cp += len_minxid;
- msg->ev_maxxid_c = cp; strcpy(cp, ev_maxxid); cp += len_maxxid;
- msg->ev_xip = cp; strcpy(cp, ev_xip); cp += len_xip;
- msg->ev_type = cp; strcpy(cp, ev_type); cp += len_type;
+ msg->ev_timestamp_c = cp;
+ strcpy(cp, ev_timestamp);
+ cp += len_timestamp;
+ msg->ev_minxid_c = cp;
+ strcpy(cp, ev_minxid);
+ cp += len_minxid;
+ msg->ev_maxxid_c = cp;
+ strcpy(cp, ev_maxxid);
+ cp += len_maxxid;
+ msg->ev_xip = cp;
+ strcpy(cp, ev_xip);
+ cp += len_xip;
+ msg->ev_type = cp;
+ strcpy(cp, ev_type);
+ cp += len_type;
if (ev_data1 != NULL)
{
- msg->ev_data1 = cp; strcpy(cp, ev_data1); cp += len_data1;
+ msg->ev_data1 = cp;
+ strcpy(cp, ev_data1);
+ cp += len_data1;
}
if (ev_data2 != NULL)
{
- msg->ev_data2 = cp; strcpy(cp, ev_data2); cp += len_data2;
+ msg->ev_data2 = cp;
+ strcpy(cp, ev_data2);
+ cp += len_data2;
}
if (ev_data3 != NULL)
{
- msg->ev_data3 = cp; strcpy(cp, ev_data3); cp += len_data3;
+ msg->ev_data3 = cp;
+ strcpy(cp, ev_data3);
+ cp += len_data3;
}
if (ev_data4 != NULL)
{
- msg->ev_data4 = cp; strcpy(cp, ev_data4); cp += len_data4;
+ msg->ev_data4 = cp;
+ strcpy(cp, ev_data4);
+ cp += len_data4;
}
if (ev_data5 != NULL)
{
- msg->ev_data5 = cp; strcpy(cp, ev_data5); cp += len_data5;
+ msg->ev_data5 = cp;
+ strcpy(cp, ev_data5);
+ cp += len_data5;
}
if (ev_data6 != NULL)
{
- msg->ev_data6 = cp; strcpy(cp, ev_data6); cp += len_data6;
+ msg->ev_data6 = cp;
+ strcpy(cp, ev_data6);
+ cp += len_data6;
}
if (ev_data7 != NULL)
{
- msg->ev_data7 = cp; strcpy(cp, ev_data7); cp += len_data7;
+ msg->ev_data7 = cp;
+ strcpy(cp, ev_data7);
+ cp += len_data7;
}
if (ev_data8 != NULL)
{
- msg->ev_data8 = cp; strcpy(cp, ev_data8); cp += len_data8;
+ msg->ev_data8 = cp;
+ strcpy(cp, ev_data8);
+ cp += len_data8;
}
-
/*
- * Add the message to the queue and trigger the condition
- * variable in case the worker is idle.
+ * Add the message to the queue and trigger the condition variable in
+ * case the worker is idle.
*/
DLLIST_ADD_TAIL(node->message_head, node->message_tail,
(SlonWorkMsg *)msg);
@@ -1426,12 +1436,11 @@
}
-/* ----------
- * remoteWorker_wakeup
+/*
+ * ---------- remoteWorker_wakeup
*
- * Send a special WAKEUP message to a worker, causing it to recheck
- * the runmode and the runtime configuration.
- * ----------
+ * Send a special WAKEUP message to a worker, causing it to recheck the runmode
+ * and the runtime configuration. ----------
*/
void
remoteWorker_wakeup(int no_id)
@@ -1440,8 +1449,8 @@
SlonWorkMsg *msg;
/*
- * Can't wakeup myself, can I? No, we never have a
- * "remote" worker for our own node ID.
+ * Can't wakeup myself, can I? No, we never have a "remote" worker
+ * for our own node ID.
*/
if (no_id == rtcfg_nodeid)
return;
@@ -1476,11 +1485,10 @@
}
-/* ----------
- * remoteWorker_confirm
+/*
+ * ---------- remoteWorker_confirm
*
- * Add a confirm message to the remote worker message queue
- * ----------
+ * Add a confirm message to the remote worker message queue ----------
*/
void
remoteWorker_confirm(int no_id,
@@ -1534,13 +1542,15 @@
for (oldmsg = (SlonWorkMsg_confirm *)(node->message_head);
oldmsg; oldmsg = (SlonWorkMsg_confirm *)(oldmsg->next))
{
- if (oldmsg->msg_type == WMSG_CONFIRM) {
+ if (oldmsg->msg_type == WMSG_CONFIRM)
+ {
if (oldmsg->con_origin == con_origin &&
oldmsg->con_received == con_received)
{
/*
- * Existing message found. Change it if new seqno is
- * greater than old. Otherwise just ignore this confirm.
+ * Existing message found. Change it if new
+ * seqno is greater than old. Otherwise just
+ * ignore this confirm.
*/
if (oldmsg->con_seqno < con_seqno)
{
@@ -1569,19 +1579,18 @@
(SlonWorkMsg *)msg);
/*
- * Send a condition signal to the worker thread in case it is
- * waiting for new messages.
+ * Send a condition signal to the worker thread in case it is waiting
+ * for new messages.
*/
pthread_cond_signal(&(node->message_cond));
pthread_mutex_unlock(&(node->message_lock));
}
-/* ----------
- * query_execute
+/*
+ * ---------- query_execute
*
- * Execute a query string that does not return a result set.
- * ----------
+ * Execute a query string that does not return a result set. ----------
*/
static int
query_execute(SlonNode *node, PGconn *dbconn, SlonDString *dsp)
@@ -1608,12 +1617,11 @@
}
-/* ----------
- * query_append_event
+/*
+ * ---------- query_append_event
*
- * Add queries to a dstring that notify for Event and Confirm and
- * that insert a duplicate of an event record as well as the
- * confirmation for it.
+ * Add queries to a dstring that notify for Event and Confirm and that insert a
+ * duplicate of an event record as well as the confirmation for it.
* ----------
*/
static void
@@ -1630,14 +1638,22 @@
" ev_minxid, ev_maxxid, ev_xip, ev_type ",
rtcfg_cluster_name, rtcfg_cluster_name,
rtcfg_namespace);
- if (event->ev_data1 != NULL) dstring_append(dsp, ", ev_data1");
- if (event->ev_data2 != NULL) dstring_append(dsp, ", ev_data2");
- if (event->ev_data3 != NULL) dstring_append(dsp, ", ev_data3");
- if (event->ev_data4 != NULL) dstring_append(dsp, ", ev_data4");
- if (event->ev_data5 != NULL) dstring_append(dsp, ", ev_data5");
- if (event->ev_data6 != NULL) dstring_append(dsp, ", ev_data6");
- if (event->ev_data7 != NULL) dstring_append(dsp, ", ev_data7");
- if (event->ev_data8 != NULL) dstring_append(dsp, ", ev_data8");
+ if (event->ev_data1 != NULL)
+ dstring_append(dsp, ", ev_data1");
+ if (event->ev_data2 != NULL)
+ dstring_append(dsp, ", ev_data2");
+ if (event->ev_data3 != NULL)
+ dstring_append(dsp, ", ev_data3");
+ if (event->ev_data4 != NULL)
+ dstring_append(dsp, ", ev_data4");
+ if (event->ev_data5 != NULL)
+ dstring_append(dsp, ", ev_data5");
+ if (event->ev_data6 != NULL)
+ dstring_append(dsp, ", ev_data6");
+ if (event->ev_data7 != NULL)
+ dstring_append(dsp, ", ev_data7");
+ if (event->ev_data8 != NULL)
+ dstring_append(dsp, ", ev_data8");
slon_appendquery(dsp,
" ) values ('%d', '%s', '%s', '%s', '%s', '%q', '%s'",
event->ev_origin, seqbuf, event->ev_timestamp_c,
@@ -1669,11 +1685,10 @@
}
-/* ----------
- * store_confirm_forward
+/*
+ * ---------- store_confirm_forward
*
- * Call the forwardConfirm() stored procedure.
- * ----------
+ * Call the forwardConfirm() stored procedure. ----------
*/
static void
store_confirm_forward(SlonNode *node, SlonConn *conn,
@@ -1686,8 +1701,8 @@
int cstat_found = false;
/*
- * Check the global confirm status if we already know about
- * this confirmation.
+ * Check the global confirm status if we already know about this
+ * confirmation.
*/
pthread_mutex_lock(&node_confirm_lock);
for (cstat = node_confirm_head; cstat; cstat = cstat->next)
@@ -1701,14 +1716,15 @@
if (cstat->con_seqno >= confirm->con_seqno)
{
/*
- * Confirm status is newer or equal, ignore message.
+ * Confirm status is newer or equal, ignore
+ * message.
*/
pthread_mutex_unlock(&node_confirm_lock);
return;
}
/*
- * Set the confirm status to the new seqno and continue
- * below.
+ * Set the confirm status to the new seqno and
+ * continue below.
*/
cstat_found = true;
cstat->con_seqno = confirm->con_seqno;
@@ -1728,12 +1744,11 @@
cstat->con_seqno = confirm->con_seqno;
DLLIST_ADD_TAIL(node_confirm_head, node_confirm_tail, cstat);
}
-
pthread_mutex_unlock(&node_confirm_lock);
/*
- * Call the stored procedure to forward this status through
- * the table sl_confirm.
+ * Call the stored procedure to forward this status through the table
+ * sl_confirm.
*/
dstring_init(&query);
sprintf(seqbuf, INT64_FORMAT, confirm->con_seqno);
@@ -1765,12 +1780,11 @@
}
-/* ----------
- * get_last_forwarded_confirm
+/*
+ * ---------- get_last_forwarded_confirm
*
- * Look what confirmed event seqno we forwarded last for
- * a given origin+receiver pair.
- * ----------
+ * Look what confirmed event seqno we forwarded last for a given origin+receiver
+ * pair. ----------
*/
static int64
get_last_forwarded_confirm(int origin, int receiver)
@@ -1778,8 +1792,8 @@
struct node_confirm_status *cstat;
/*
- * Check the global confirm status if we already know about
- * this confirmation.
+ * Check the global confirm status if we already know about this
+ * confirmation.
*/
pthread_mutex_lock(&node_confirm_lock);
for (cstat = node_confirm_head; cstat; cstat = cstat->next)
@@ -1863,7 +1877,6 @@
node->no_id, set_id);
return -1;
}
-
if ((sub_node = rtcfg_findNode(sub_provider)) == NULL)
{
rtcfg_unlock();
@@ -1919,18 +1932,15 @@
dstring_free(&query1);
return -1;
}
-
/*
- * Begin a serialized transaction and check if our xmin
- * in the snapshot is > than ev_maxxid. This ensures that
- * all transactions that have been in progress when the
- * subscription got enabled (which is after the triggers
- * on the tables have been defined), have finished.
- * Otherwise a long running open transaction would not
- * have the trigger definitions yet, and an insert would
- * not get logged. But if it still runs when we start to
- * copy the set, then we don't see the row either and it
- * would get lost.
+ * Begin a serialized transaction and check if our xmin in the
+ * snapshot is greater than ev_maxxid. This ensures that all transactions
+ * that have been in progress when the subscription got enabled
+ * (which is after the triggers on the tables have been defined),
+ * have finished. Otherwise a long running open transaction would not
+ * have the trigger definitions yet, and an insert would not get
+ * logged. But if it still runs when we start to copy the set, then
+ * we don't see the row either and it would get lost.
*/
if (sub_provider == set_origin)
{
@@ -1961,8 +1971,7 @@
return -1;
}
PQclear(res1);
- }
- else
+ } else
{
slon_mkquery(&query1,
"start transaction; "
@@ -1976,7 +1985,8 @@
}
/*
- * Select the list of all tables the provider currently has in the set.
+ * Select the list of all tables the provider currently has in the
+ * set.
*/
slon_mkquery(&query1,
"select T.tab_id, "
@@ -2021,8 +2031,8 @@
node->no_id, tab_fqname);
/*
- * Find out if the table we're copying has the special
- * slony serial number key on the provider DB
+ * Find out if the table we're copying has the special slony
+ * serial number key on the provider DB
*/
slon_mkquery(&query1,
"select %s.tableHasSerialKey('%q');",
@@ -2086,15 +2096,13 @@
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
"table %s Slony-I serial key added local\n",
node->no_id, tab_fqname);
- }
- else
+ } else
{
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
"local table %s already has Slony-I serial key\n",
node->no_id, tab_fqname);
}
- }
- else
+ } else
{
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
"table %s does not require Slony-I serial key\n",
@@ -2103,9 +2111,9 @@
/*
- * Call the setAddTable_int() stored procedure. Up to now, while
- * we have not been subscribed to the set, this should have been
- * suppressed.
+ * Call the setAddTable_int() stored procedure. Up to now,
+ * while we have not been subscribed to the set, this should
+ * have been suppressed.
*/
slon_mkquery(&query1,
"select %s.setAddTable_int(%d, %d, '%q', '%q', '%q'); ",
@@ -2118,7 +2126,6 @@
dstring_free(&query1);
return -1;
}
-
/*
* Copy the content of sl_trigger for this table
*/
@@ -2157,8 +2164,8 @@
/*
* Begin a COPY from stdin for the table on the local DB
- * TODO: use the transaction safe truncate table on 7.4 or better
- * instead of delete.
+ * TODO: use the transaction safe truncate table on 7.4 or
+ * better instead of delete.
*/
slon_mkquery(&query1,
"delete from only %s; "
@@ -2176,7 +2183,6 @@
dstring_free(&query1);
return -1;
}
-
/*
* Begin a COPY to stdout for the table on the provider DB
*/
@@ -2202,7 +2208,6 @@
dstring_free(&query1);
return -1;
}
-
/*
* Copy the data over
*/
@@ -2248,7 +2253,6 @@
dstring_free(&query1);
return -1;
}
-
/*
* Check that the COPY to stdout on the provider node
* finished successful.
@@ -2308,8 +2312,7 @@
copybuf[2] == '\0')
{
copydone = true;
- }
- else
+ } else
{
switch(rc)
{
@@ -2384,7 +2387,6 @@
dstring_free(&query1);
return -1;
}
-
gettimeofday(&tv_now, NULL);
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"%.3f seconds to copy table %s\n",
@@ -2487,16 +2489,15 @@
node->no_id, seql_seqid, seq_fqname, seql_last_value);
/*
- * sequence with ID 0 is a nodes rowid ... only remember
- * in seqlog.
+ * sequence with ID 0 is a node's rowid ... only remember in
+ * seqlog.
*/
if (strtol(seql_seqid, NULL, 10) != 0)
{
slon_mkquery(&query1,
"select \"pg_catalog\".setval('%q', '%s'); ",
seq_fqname, seql_last_value);
- }
- else
+ } else
dstring_reset(&query1);
slon_appendquery(&query1,
"insert into %s.sl_seqlog "
@@ -2526,18 +2527,18 @@
gettimeofday(&tv_start2, NULL);
/*
- * It depends on who is our data provider how we construct
- * the initial setsync status.
+ * It depends on who is our data provider how we construct the
+ * initial setsync status.
*/
if (set_origin == node->no_id)
{
/*
- * Our provider is the origin, so we have to construct
- * the setsync from scratch.
+ * Our provider is the origin, so we have to construct the
+ * setsync from scratch.
*
- * The problem at hand is that the data is something between
- * two SYNC points. So to get to the next sync point, we'll
- * have to take this and all
+ * The problem at hand is that the data is something between two
+ * SYNC points. So to get to the next sync point, we'll have
+ * to take this and all
*/
slon_mkquery(&query1,
"select max(ev_seqno) as ssy_seqno "
@@ -2568,8 +2569,8 @@
if (PQgetisnull(res1, 0, 0))
{
/*
- * No SYNC event found, so we initialize the setsync to
- * the event point of the ENABLE_SUBSCRIPTION
+ * No SYNC event found, so we initialize the setsync
+ * to the event point of the ENABLE_SUBSCRIPTION
*/
ssy_seqno = seqbuf;
ssy_minxid = event->ev_minxid_c;
@@ -2587,12 +2588,12 @@
"from %s.sl_log_2 where log_origin = %d; ",
rtcfg_namespace, node->no_id,
rtcfg_namespace, node->no_id);
- }
- else
+ } else
{
/*
- * Use the last SYNC's snapshot information and
- * set the action sequence list to all actions after that.
+ * Use the last SYNC's snapshot information and set
+ * the action sequence list to all actions after
+ * that.
*/
slon_mkquery(&query1,
"select ev_seqno, ev_minxid, ev_maxxid, ev_xip "
@@ -2621,7 +2622,6 @@
dstring_free(&query1);
return -1;
}
-
ssy_seqno = PQgetvalue(res1, 0, 0);
ssy_minxid = PQgetvalue(res1, 0, 1);
ssy_maxxid = PQgetvalue(res1, 0, 2);
@@ -2683,8 +2683,7 @@
}
dstring_terminate(&ssy_action_list);
PQclear(res2);
- }
- else
+ } else
{
/*
* Our provider is another subscriber, so we can copy the
@@ -2716,7 +2715,6 @@
dstring_free(&query1);
return -1;
}
-
dstring_init(&ssy_action_list);
ssy_seqno = PQgetvalue(res1, 0, 0);
ssy_minxid = PQgetvalue(res1, 0, 1);
@@ -2745,7 +2743,6 @@
dstring_free(&query1);
return -1;
}
-
gettimeofday(&tv_now, NULL);
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"%.3f seconds to build initial setsync status\n",
@@ -2753,8 +2750,8 @@
TIMEVAL_DIFF(&tv_start2, &tv_now));
/*
- * Roll back the transaction we used on the provider and close
- * the database connection.
+ * Roll back the transaction we used on the provider and close the
+ * database connection.
*/
slon_mkquery(&query1, "rollback transaction");
if (query_execute(node, pro_dbconn, &query1) < 0)
@@ -2824,7 +2821,6 @@
dstring_free(&query);
return 10;
}
-
sprintf(conn_symname, "subscriber_%d_provider_%d",
node->no_id, provider->no_id);
provider->conn = slon_connectdb(provider->pa_conninfo,
@@ -2838,7 +2834,6 @@
dstring_free(&query);
return provider->pa_connretry;
}
-
/*
* Listen on the special relation telling our node
* relationship
@@ -2853,7 +2848,6 @@
provider->conn = NULL;
return provider->pa_connretry;
}
-
slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
"connected to data provider %d on '%s'\n",
node->no_id, provider->no_id,
@@ -2862,8 +2856,8 @@
}
/*
- * Check that all these providers have processed at least up
- * to the SYNC event we're handling here.
+ * Check that all these providers have processed at least up to the
+ * SYNC event we're handling here.
*/
for (provider = wd->provider_head; provider; provider = provider->next)
{
@@ -2899,7 +2893,6 @@
dstring_free(&query);
return 10;
}
-
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"data provider %d confirmed up to "
"ev_seqno %s for ev_origin %d - OK\n",
@@ -3009,8 +3002,8 @@
node->no_id);
/*
- * Select all sets we receive from this provider and which are
- * not synced better than this SYNC already.
+ * Select all sets we receive from this provider and which
+ * are not synced better than this SYNC already.
*/
slon_mkquery(&query,
"select SSY.ssy_setid, SSY.ssy_seqno, "
@@ -3037,8 +3030,6 @@
dstring_free(&query);
return 60;
}
-
-
/*
* For every set we receive from this provider
*/
@@ -3095,27 +3086,23 @@
PQclear(res2);
continue;
}
-
/*
* ... and build up a query qualification that is
*
- * and (
- * (log_tableid in (<tables_in_set>)
- * and (<snapshot_qual_of_new_sync>)
- * and (<snapshot_qual_of_setsync>)
- * )
- * OR
- * ( <next_set_from_this_provider> )
- * )
+ * and ( (log_tableid in (<tables_in_set>) and
+ * (<snapshot_qual_of_new_sync>) and
+ * (<snapshot_qual_of_setsync>) ) OR (
+ * <next_set_from_this_provider> ) )
*
- * If we were using AND's there then no rows will ever end
- * up being selected when you have multiple sets.
+ * If we were using AND's there then no rows will ever
+ * end up being selected when you have multiple sets.
*/
if(added_or_to_provider)
{
slon_appendquery(provider_qual, "or (\n log_tableid in (");
- } else {
+ } else
+ {
slon_appendquery(provider_qual, " (\n log_tableid in (");
added_or_to_provider = 1;
}
@@ -3128,8 +3115,9 @@
SlonSet *rtcfg_set;
/*
- * Remember the fully qualified table name on the fly.
- * This might have to become a hashtable someday.
+ * Remember the fully qualified table name on
+ * the fly. This might have to become a
+ * hashtable someday.
*/
while (tab_id >= wd->tab_fqname_size)
{
@@ -3146,8 +3134,8 @@
wd->tab_fqname[tab_id] = strdup(PQgetvalue(res2, tupno2, 2));
/*
- * Also remember if the tables log data needs to be
- * forwarded.
+ * Also remember if the tables log data needs
+ * to be forwarded.
*/
for (rtcfg_set = rtcfg_set_list_head; rtcfg_set;
rtcfg_set = rtcfg_set->next)
@@ -3194,14 +3182,15 @@
PQclear(res1);
/*
- * We didn't add anything good in the provider clause.
- * That shouldn't be!
+ * We didn't add anything good in the provider clause. That
+ * shouldn't be!
*/
if(added_or_to_provider)
{
/* close out our OR block */
slon_appendquery(provider_qual, ")");
- } else {
+ } else
+ {
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: Didn't add or to provider\n", node->no_id);
}
}
@@ -3209,8 +3198,8 @@
dstring_free(&new_qual);
/*
- * If we have found no sets needing sync at all, why
- * bother the helpers?
+ * If we have found no sets needing sync at all, why bother the
+ * helpers?
*/
if (num_sets == 0)
{
@@ -3220,7 +3209,6 @@
dstring_free(&query);
return 0;
}
-
/*
* Time to get the helpers busy.
*/
@@ -3278,7 +3266,6 @@
PQclear(res1);
break;
}
-
if (PQresultStatus(res1) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
@@ -3301,8 +3288,7 @@
dstring_data(&(wgline->data)),
dstring_data(&(wgline->provider->helper_qualification)));
num_errors++;
- }
- else
+ } else
slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: %s\n",
node->no_id, dstring_data(&(wgline->data)));
}
@@ -3348,8 +3334,7 @@
}
/*
- * Inform the helpers that the whole group is done with this
- * SYNC.
+ * Inform the helpers that the whole group is done with this SYNC.
*/
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
"all helpers done.\n",
@@ -3391,10 +3376,9 @@
node->no_id);
return 10;
}
-
/*
- * Light's are still green ... update the setsync status of
- * all the sets we've just replicated ...
+ * Lights are still green ... update the setsync status of all the
+ * sets we've just replicated ...
*/
slon_mkquery(&query,
"update %s.sl_setsync set "
@@ -3435,7 +3419,6 @@
}
PQclear(res1);
}
-
for (provider = wd->provider_head; provider; provider = provider->next)
{
/*
@@ -3454,8 +3437,8 @@
}
/*
- * Get the nodes rowid sequence at that sync time just in case
- * we are later on asked to restore the node after a failover.
+ * Get the node's rowid sequence at that sync time just in case we are
+ * later on asked to restore the node after a failover.
*/
slon_mkquery(&query,
"select seql_last_value from %s.sl_seqlog "
@@ -3553,7 +3536,6 @@
pthread_mutex_unlock(&(provider->helper_lock));
continue;
}
-
/*
* OK, we got work to do.
*/
@@ -3561,7 +3543,8 @@
pthread_mutex_unlock(&(provider->helper_lock));
errors = 0;
- do {
+ do
+ {
/*
* Start a transaction
*/
@@ -3573,7 +3556,6 @@
errors++;
break;
}
-
/*
* Open a cursor that reads the log data.
*
@@ -3596,18 +3578,17 @@
errors++;
break;
}
-
/*
* Now fetch the log data and forward it via the line
- * pool to the main worker who pushes it into the local
- * database.
+ * pool to the main worker who pushes it into the
+ * local database.
*/
alloc_lines = 0;
while (errors == 0)
{
/*
- * Allocate at least some lines - ideally the whole
- * fetch size.
+ * Allocate at least some lines - ideally the
+ * whole fetch size.
*/
while (alloc_lines == 0 && !errors)
{
@@ -3615,8 +3596,8 @@
"remoteHelperThread_%d_%d: allocate lines\n",
node->no_id, provider->no_id);
/*
- * Wait until there are lines available in
- * the pool.
+ * Wait until there are lines
+ * available in the pool.
*/
pthread_mutex_lock(&(wd->workdata_lock));
while (wd->linepool_head == NULL &&
@@ -3626,8 +3607,9 @@
}
/*
- * If any error occured somewhere in the group, the
- * main worker will set the status to ABORT.
+ * If any error occurred somewhere in
+ * the group, the main worker will
+ * set the status to ABORT.
*/
if (wd->workgroup_status != SLON_WG_BUSY)
{
@@ -3638,10 +3620,9 @@
errors++;
break;
}
-
/*
- * So far so good. Fill our array of lines
- * from the pool.
+ * So far so good. Fill our array of
+ * lines from the pool.
*/
while (alloc_lines < SLON_DATA_FETCH_SIZE &&
wd->linepool_head != NULL)
@@ -3662,8 +3643,9 @@
node->no_id, provider->no_id, alloc_lines);
/*
- * Now that we have allocated some buffer space,
- * try to fetch that many rows from the cursor.
+ * Now that we have allocated some buffer
+ * space, try to fetch that many rows from
+ * the cursor.
*/
slon_mkquery(&query, "fetch %d from LOG; ",
alloc_lines * SLON_COMMANDS_PER_LINE);
@@ -3678,7 +3660,6 @@
errors++;
break;
}
-
if (first_fetch)
{
gettimeofday(&tv_now, NULL);
@@ -3689,10 +3670,9 @@
first_fetch = false;
}
-
/*
- * Fill the line buffers with queries from the
- * retrieved log rows.
+ * Fill the line buffers with queries from
+ * the retrieved log rows.
*/
line_no = 0;
ntuples = PQntuples(res);
@@ -3716,13 +3696,14 @@
line->provider = provider;
dstring_reset(&(line->data));
}
-
/*
- * This can happen if the table belongs to a
- * set that already has a better sync status
- * than the event we're currently processing
- * as a result from another SYNC occuring before
- * we had started processing the copy_set.
+ * This can happen if the table
+ * belongs to a set that already has
+ * a better sync status than the
+ * event we're currently processing
+ * as a result from another SYNC
+ * occurring before we had started
+ * processing the copy_set.
*/
if (log_tableid >= wd->tab_fqname_size ||
wd->tab_fqname[log_tableid] == NULL)
@@ -3740,7 +3721,6 @@
log_origin, log_xid, log_tableid,
log_actionseq, log_cmdtype, log_cmddata);
}
-
switch (*log_cmdtype)
{
case 'I':
@@ -3768,8 +3748,9 @@
PQclear(res);
/*
- * Now put all the line buffers back. Filled ones
- * into the repldata, unused ones into the pool.
+ * Now put all the line buffers back. Filled
+ * ones into the repldata, unused ones into
+ * the pool.
*/
pthread_mutex_lock(&(wd->workdata_lock));
for (tupno = 0; tupno < alloc_lines; tupno++)
@@ -3799,7 +3780,6 @@
alloc_lines = 0;
break;
}
-
alloc_lines = 0;
}
} while (0);
@@ -3822,7 +3802,6 @@
pthread_cond_broadcast(&(wd->linepool_cond));
pthread_mutex_unlock(&(wd->workdata_lock));
}
-
/*
* Close the cursor and rollback the transaction.
*/
@@ -3887,5 +3866,3 @@
pthread_mutex_unlock(&(provider->helper_lock));
}
}
-
-
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.15
retrieving revision 1.16
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.15 -r1.16
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -29,14 +29,14 @@
#include "slon.h"
-/* ----------
- * struct listat
+/*
+ * ---------- struct listat
*
- * local data structure for nodes we are currently listening
- * for events from.
+ * local data structure for nodes we are currently listening for events from.
* ----------
*/
-struct listat {
+struct listat
+{
int li_origin;
struct listat *prev;
@@ -44,28 +44,30 @@
};
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
*/
-static void remoteListen_adjust_listat(SlonNode *node,
+static void
+remoteListen_adjust_listat(SlonNode * node,
struct listat **listat_head,
struct listat **listat_tail);
-static void remoteListen_cleanup(struct listat **listat_head,
+static void
+remoteListen_cleanup(struct listat ** listat_head,
struct listat **listat_tail);
-static int remoteListen_forward_confirm(SlonNode *node,
+static int
+remoteListen_forward_confirm(SlonNode * node,
SlonConn *conn);
-static int remoteListen_receive_events(SlonNode *node,
+static int
+remoteListen_receive_events(SlonNode * node,
SlonConn *conn, struct listat *listat);
-/* ----------
- * slon_remoteListenThread
+/*
+ * ---------- slon_remoteListenThread
*
- * Listen for events on a remote database connection. This means,
- * events generated by every other node we listen for on this one.
- * ----------
+ * Listen for events on a remote database connection. This means, events
+ * generated by every other node we listen for on this one. ----------
*/
void *
remoteListenThread_main(void *cdata)
@@ -109,14 +111,15 @@
if (last_config_seq != (new_config_seq = rtcfg_seq_get()))
{
/*
- * Lock the configuration and check if we are (still) supposed
- * to exist.
+ * Lock the configuration and check if we are (still)
+ * supposed to exist.
*/
rtcfg_lock();
/*
- * If we have a database connection to the remote node, check
- * if there was a change in the connection information.
+ * If we have a database connection to the remote
+ * node, check if there was a change in the
+ * connection information.
*/
if (conn != NULL)
{
@@ -134,7 +137,6 @@
conn_conninfo = NULL;
}
}
-
/*
* Check our node's listen_status
*/
@@ -149,9 +151,10 @@
node->listen_status = SLON_TSTAT_RUNNING;
/*
- * Adjust the listat list and see if there is anything to
- * listen for. If not, sleep for a while and check again,
- * some node reconfiguration must be going on here.
+ * Adjust the listat list and see if there is
+ * anything to listen for. If not, sleep for a while
+ * and check again, some node reconfiguration must be
+ * going on here.
*/
remoteListen_adjust_listat(node, &listat_head, &listat_tail);
@@ -168,10 +171,8 @@
break;
continue;
}
-
rtcfg_unlock();
}
-
/*
* Check if we have a database connection
*/
@@ -197,10 +198,9 @@
continue;
}
-
/*
- * Try to establish a database connection to the remote
- * node's database.
+ * Try to establish a database connection to the
+ * remote node's database.
*/
conn_conninfo = strdup(node->pa_conninfo);
pa_connretry = node->pa_connretry;
@@ -226,7 +226,8 @@
dbconn = conn->dbconn;
/*
- * Listen on the connection for events and confirmations
+ * Listen on the connection for events and
+ * confirmations
*/
slon_mkquery(&query1,
"listen \"_%s_Event\"; "
@@ -253,7 +254,6 @@
continue;
}
-
rc = db_getLocalNodeId(dbconn);
if (rc != node->no_id)
{
@@ -273,13 +273,11 @@
continue;
}
-
slon_log(SLON_DEBUG1,
"remoteListenThread_%d: connected to '%s'\n",
node->no_id, conn_conninfo);
}
-
/*
* Receive events from the provider node
*/
@@ -297,11 +295,10 @@
continue;
}
-
/*
- * If the remote node notified for new confirmations,
- * read them and queue them into the remote worker for
- * storage in our local database.
+ * If the remote node notified for new confirmations, read
+ * them and queue them into the remote worker for storage in
+ * our local database.
*/
if (forward_confirm)
{
@@ -321,7 +318,6 @@
}
forward_confirm = false;
}
-
/*
* Wait for notification.
*/
@@ -358,7 +354,6 @@
conn = NULL;
conn_conninfo = NULL;
}
-
remoteListen_cleanup(&listat_head, &listat_tail);
rtcfg_lock();
@@ -372,11 +367,10 @@
}
-/* ----------
- * remoteListen_adjust_listat
+/*
+ * ---------- remoteListen_adjust_listat
*
- * local function to (re)adjust the known nodes to the global
- * configuration.
+ * local function to (re)adjust the known nodes to the global configuration.
* ----------
*/
static void
@@ -425,7 +419,6 @@
DLLIST_REMOVE(*listat_head, *listat_tail, listat);
free(listat);
}
-
listat = linext;
}
@@ -478,11 +471,10 @@
}
-/* ----------
- * remoteListen_cleanup
+/*
+ * ---------- remoteListen_cleanup
*
- * Free resources used by the remoteListen thread
- * ----------
+ * Free resources used by the remoteListen thread ----------
*/
static void
remoteListen_cleanup(struct listat **listat_head, struct listat **listat_tail)
@@ -500,14 +492,13 @@
}
-/* ----------
- * remoteListen_forward_confirm
+/*
+ * ---------- remoteListen_forward_confirm
*
- * Read the last confirmed event sequence for all nodes from the
- * remote database and forward it to the local database so that
- * the cleanup process will know when all nodes have confirmed
- * an event and it can be thrown away (together with its log data).
- * ----------
+ * Read the last confirmed event sequence for all nodes from the remote database
+ * and forward it to the local database so that the cleanup process will know
+ * when all nodes have confirmed an event and it can be thrown away (together
+ * with its log data). ----------
*/
static int
remoteListen_forward_confirm(SlonNode *node, SlonConn *conn)
@@ -542,10 +533,9 @@
PQclear(res);
return -1;
}
-
/*
- * We actually do not do the forwardiing ourself here. We send
- * a special message to the remote worker for that node.
+ * We actually do not do the forwardiing ourself here. We send a
+ * special message to the remote worker for that node.
*/
ntuples = PQntuples(res);
for (tupno = 0; tupno < ntuples; tupno++)
@@ -565,12 +555,11 @@
}
-/* ----------
- * remoteListen_receive_events
+/*
+ * ---------- remoteListen_receive_events
*
- * Retrieve all new events that origin from nodes for which we
- * listen on this node as provider and add them to the node
- * specific worker message queue..
+ * Retrieve all new events that origin from nodes for which we listen on this
+ * node as provider and add them to the node specific worker message queue..
* ----------
*/
static int
@@ -590,15 +579,15 @@
dstring_init(&query);
/*
- * In the runtime configuration info for the node, we remember
- * the last event sequence that we actually have received. If the
- * remote worker thread has processed it yet or not isn't important,
- * we have it in the message queue at least and don't need to
- * select it again.
+ * In the runtime configuration info for the node, we remember the
+ * last event sequence that we actually have received. If the remote
+ * worker thread has processed it yet or not isn't important, we have
+ * it in the message queue at least and don't need to select it
+ * again.
*
- * So the query we construct contains a qualification
- * (ev_origin = <remote_node> and ev_seqno > <last_seqno>) per
- * remote node we're listen for here.
+ * So the query we construct contains a qualification (ev_origin =
+ * <remote_node> and ev_seqno > <last_seqno>) per remote node we're
+ * listen for here.
*/
slon_mkquery(&query,
"select ev_origin, ev_seqno, ev_timestamp, "
@@ -625,7 +614,6 @@
dstring_free(&query);
return -1;
}
-
sprintf(seqno_buf, INT64_FORMAT, origin->last_event);
slon_appendquery(&query,
" %s (e.ev_origin = '%d' and e.ev_seqno > '%s')",
@@ -647,7 +635,6 @@
dstring_free(&query);
return -1;
}
-
time(&timeout);
timeout += 300;
while (PQisBusy(conn->dbconn) != 0)
@@ -726,5 +713,3 @@
return 0;
}
-
-
Index: sync_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/sync_thread.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -Lsrc/slon/sync_thread.c -Lsrc/slon/sync_thread.c -u -w -r1.12 -r1.13
--- src/slon/sync_thread.c
+++ src/slon/sync_thread.c
@@ -28,16 +28,15 @@
#include "slon.h"
-/* ----------
- * Global variables
- * ----------
+/*
+ * ---------- Global variables ----------
*/
int sync_interval;
int sync_interval_timeout;
-/* ----------
- * slon_localSyncThread
+/*
+ * ---------- slon_localSyncThread
*
* Generate SYNC event if local database activity created new log info.
* ----------
@@ -64,15 +63,15 @@
dbconn = conn->dbconn;
/*
- * We don't initialize the last known action sequence to the
- * actual value. This causes that we create a SYNC event
- * allways on startup, just in case.
+ * We don't initialize the last known action sequence to the actual
+ * value. This causes that we create a SYNC event allways on startup,
+ * just in case.
*/
last_actseq_buf[0] = '\0';
/*
- * Build the query that starts a transaction and retrieves
- * the last value from the action sequence.
+ * Build the query that starts a transaction and retrieves the last
+ * value from the action sequence.
*/
dstring_init(&query1);
slon_mkquery(&query1,
@@ -107,7 +106,6 @@
slon_abort();
break;
}
-
/*
* Check if it's identical to the last know seq or if the
* sync interval timeout has arrived.
@@ -120,7 +118,8 @@
{
/*
* Action sequence has changed, generate a SYNC event
- * and read the resulting currval of the event sequence.
+ * and read the resulting currval of the event
+ * sequence.
*/
strcpy(last_actseq_buf, PQgetvalue(res, 0, 0));
@@ -159,8 +158,7 @@
*/
timeout_count = (sync_interval_timeout == 0) ? 0 :
sync_interval_timeout - sync_interval;
- }
- else
+ } else
{
/*
* No database activity detected - rollback.
@@ -186,5 +184,3 @@
slon_log(SLON_DEBUG1, "syncThread: thread done\n");
pthread_exit(NULL);
}
-
-
Index: runtime_config.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v
retrieving revision 1.19
retrieving revision 1.20
diff -Lsrc/slon/runtime_config.c -Lsrc/slon/runtime_config.c -u -w -r1.19 -r1.20
--- src/slon/runtime_config.c
+++ src/slon/runtime_config.c
@@ -29,9 +29,8 @@
#include "slon.h"
-/* ----------
- * Global data
- * ----------
+/*
+ * ---------- Global data ----------
*/
pid_t slon_pid;
char *rtcfg_cluster_name = NULL;
@@ -48,15 +47,15 @@
SlonNode *rtcfg_node_list_tail = NULL;
-/* ----------
- * Local data
- * ----------
+/*
+ * ---------- Local data ----------
*/
static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t cfgseq_lock = PTHREAD_MUTEX_INITIALIZER;
static int64 cfgseq = 0;
-struct to_activate {
+struct to_activate
+{
int no_id;
struct to_activate *prev;
@@ -66,16 +65,14 @@
static struct to_activate *to_activate_tail = NULL;
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
*/
static void rtcfg_startStopNodeThread(SlonNode *node);
-/* ----------
- * rtcfg_lock
- * ----------
+/*
+ * ---------- rtcfg_lock ----------
*/
void
rtcfg_lock(void)
@@ -84,9 +81,8 @@
}
-/* ----------
- * rtcfg_unlock
- * ----------
+/*
+ * ---------- rtcfg_unlock ----------
*/
void
rtcfg_unlock(void)
@@ -95,9 +91,8 @@
}
-/* ----------
- * rtcfg_storeNode
- * ----------
+/*
+ * ---------- rtcfg_storeNode ----------
*/
void
rtcfg_storeNode(int no_id, char *no_comment)
@@ -122,7 +117,6 @@
rtcfg_unlock();
return;
}
-
/*
* Add the new node to our in-memory configuration.
*/
@@ -151,15 +145,13 @@
}
-/* ----------
- * rtcfg_setNodeLastEvent()
+/*
+ * ---------- rtcfg_setNodeLastEvent()
*
* Set the last_event field in the node runtime structure.
*
- * Returns: 0 if the event_seq is <= the known value
- * -1 if the node is not known
- * event_seq otherwise
- * ----------
+ * Returns: 0 if the event_seq is <= the known value -1 if the node is
+ * not known event_seq otherwise ----------
*/
int64
rtcfg_setNodeLastEvent(int no_id, int64 event_seq)
@@ -174,11 +166,9 @@
{
node->last_event = event_seq;
retval = event_seq;
- }
- else
+ } else
retval = 0;
- }
- else
+ } else
retval = -1;
rtcfg_unlock();
@@ -191,11 +181,10 @@
}
-/* ----------
- * rtcfg_getNodeLastEvent
+/*
+ * ---------- rtcfg_getNodeLastEvent
*
- * Read the nodes last_event field
- * ----------
+ * Read the nodes last_event field ----------
*/
int64
rtcfg_getNodeLastEvent(int no_id)
@@ -207,8 +196,7 @@
if ((node = rtcfg_findNode(no_id)) != NULL)
{
retval = node->last_event;
- }
- else
+ } else
retval = -1;
rtcfg_unlock();
@@ -217,9 +205,8 @@
}
-/* ----------
- * rtcfg_enableNode
- * ----------
+/*
+ * ---------- rtcfg_enableNode ----------
*/
void
rtcfg_enableNode(int no_id)
@@ -238,7 +225,6 @@
slon_abort();
return;
}
-
/*
* Activate the node
*/
@@ -253,9 +239,8 @@
}
-/* ----------
- * slon_disableNode
- * ----------
+/*
+ * ---------- slon_disableNode ----------
*/
void
rtcfg_disableNode(int no_id)
@@ -274,7 +259,6 @@
slon_abort();
return;
}
-
/*
* Deactivate the node
*/
@@ -286,14 +270,13 @@
rtcfg_seq_bump();
/*
- rtcfg_startStopNodeThread(node);
+ * rtcfg_startStopNodeThread(node);
*/
}
-/* ----------
- * rtcfg_findNode
- * ----------
+/*
+ * ---------- rtcfg_findNode ----------
*/
SlonNode *
rtcfg_findNode(int no_id)
@@ -310,9 +293,8 @@
}
-/* ----------
- * rtcfg_storePath
- * ----------
+/*
+ * ---------- rtcfg_storePath ----------
*/
void
rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
@@ -334,7 +316,6 @@
rtcfg_lock();
node = rtcfg_findNode(pa_server);
}
-
/*
* Store the (new) conninfo to the node
*/
@@ -357,9 +338,8 @@
}
-/* ----------
- * rtcfg_dropPath
- * ----------
+/*
+ * ---------- rtcfg_dropPath ----------
*/
void
rtcfg_dropPath(int pa_server)
@@ -380,11 +360,9 @@
return;
}
-
/*
- * Drop all listen information as well
- * at this provider. Without a path we
- * cannot listen.
+ * Drop all listen information as well at this provider. Without a
+ * path we cannot listen.
*/
while (node->listen_head != NULL)
{
@@ -415,9 +393,8 @@
}
-/* ----------
- * rtcfg_storeListen
- * ----------
+/*
+ * ---------- rtcfg_storeListen ----------
*/
void
rtcfg_storeListen(int li_origin, int li_provider)
@@ -435,10 +412,9 @@
slon_abort();
return;
}
-
/*
- * Check if we already listen for events from that origin
- * at this provider.
+ * Check if we already listen for events from that origin at this
+ * provider.
*/
for (listen = node->listen_head; listen; listen = listen->next)
{
@@ -481,9 +457,8 @@
}
-/* ----------
- * rtcfg_dropListen
- * ----------
+/*
+ * ---------- rtcfg_dropListen ----------
*/
void
rtcfg_dropListen(int li_origin, int li_provider)
@@ -501,10 +476,8 @@
slon_abort();
return;
}
-
/*
- * Find that listen entry
- * at this provider.
+ * Find that listen entry at this provider.
*/
for (listen = node->listen_head; listen; listen = listen->next)
{
@@ -713,7 +686,8 @@
rtcfg_unlock();
rtcfg_seq_bump();
/*
- * Wakeup the worker threads for the old and new provider
+ * Wakeup the worker threads for the old and new
+ * provider
*/
if (old_provider >= 0 && old_provider != sub_provider)
sched_wakeup_node(old_provider);
@@ -797,7 +771,8 @@
rtcfg_unlock();
rtcfg_seq_bump();
/*
- * Wakeup the worker threads for the old and new provider
+ * Wakeup the worker threads for the old and new
+ * provider
*/
if (old_provider >= 0)
sched_wakeup_node(old_provider);
@@ -812,9 +787,8 @@
}
-/* ----------
- * rtcfg_startStopNodeThread
- * ----------
+/*
+ * ---------- rtcfg_startStopNodeThread ----------
*/
static void
rtcfg_startStopNodeThread(SlonNode *node)
@@ -851,8 +825,7 @@
default:
printf("TODO: ********** rtcfg_startStopNodeThread: restart node worker\n");
}
- }
- else
+ } else
{
/*
* Make sure there is no node worker
@@ -916,8 +889,7 @@
node->listen_status = SLON_TSTAT_NONE;
break;
}
- }
- else
+ } else
{
/*
* Node specific listen thread not required
@@ -1063,6 +1035,3 @@
return retval;
}
-
-
-
Index: misc.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/misc.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -Lsrc/slon/misc.c -Lsrc/slon/misc.c -u -w -r1.12 -r1.13
--- src/slon/misc.c
+++ src/slon/misc.c
@@ -24,6 +24,9 @@
#include <sys/time.h>
#include <sys/types.h>
+#include <syslog.h>
+#include <stdarg.h>
+
#include "libpq-fe.h"
#include "c.h"
@@ -38,6 +41,30 @@
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+#ifdef HAVE_SYSLOG
+/*
+ * 0 = only stdout/stderr
+ * 1 = stdout+stderr and syslog
+ * 2 = syslog only
+ * ... in theory anyway
+ */
+
+#ifndef SLON_SYSLOG_LIMIT
+#define SLON_SYSLOG_LIMIT 128
+#endif
+
+extern int Use_syslog;
+extern char *Syslog_facility; /* openlog() parameters */
+extern char *Syslog_ident;
+
+static void write_syslog(int level, const char *line);
+
+#else
+
+#define Use_syslog 0
+
+#endif /* HAVE_SYSLOG */
+
void
slon_log(SlonLogLevel level, char *fmt, ...)
@@ -51,28 +78,56 @@
char time_buf[128];
time_t stamp_time = time(NULL);
+#ifdef HAVE_SYSLOG
+ int syslog_level = LOG_ERR;
+#endif
if (level > slon_log_level)
return;
switch (level)
{
- case SLON_DEBUG4: level_c = "DEBUG4";
+ case SLON_DEBUG4:
+ level_c = "DEBUG4";
break;
- case SLON_DEBUG3: level_c = "DEBUG3";
+ case SLON_DEBUG3:
+ level_c = "DEBUG3";
break;
- case SLON_DEBUG2: level_c = "DEBUG2";
+ case SLON_DEBUG2:
+ level_c = "DEBUG2";
break;
- case SLON_DEBUG1: level_c = "DEBUG1";
+ case SLON_DEBUG1:
+ level_c = "DEBUG1";
+#ifdef HAVE_SYSLOG
+ syslog_level = LOG_DEBUG;
+#endif
break;
- case SLON_INFO: level_c = "INFO";
+ case SLON_INFO:
+ level_c = "INFO";
+#ifdef HAVE_SYSLOG
+ syslog_level = LOG_INFO;
+#endif
break;
- case SLON_CONFIG: level_c = "CONFIG";
+ case SLON_CONFIG:
+ level_c = "CONFIG";
break;
- case SLON_WARN: level_c = "WARN";
+ case SLON_WARN:
+ level_c = "WARN";
+#ifdef HAVE_SYSLOG
+ syslog_level = LOG_WARNING;
+#endif
break;
- case SLON_ERROR: level_c = "ERROR";
+ case SLON_ERROR:
+ level_c = "ERROR";
+#ifdef HAVE_SYSLOG
+ syslog_level = LOG_ERR;
+#endif
break;
- case SLON_FATAL: level_c = "FATAL";
+ case SLON_FATAL:
+ level_c = "FATAL";
+#ifdef HAVE_SYSLOG
+ syslog_level = LOG_ERR;
+#endif
+
break;
}
@@ -90,15 +145,13 @@
slon_abort();
}
}
-
sprintf(outbuf, "");
- if (logtimestamp == true)
+ if (logtimestamp == true && (Use_syslog != 1))
{
strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S %Z", localtime(&stamp_time));
sprintf(outbuf, "%s ", time_buf);
}
-
if (logpid == true)
{
sprintf(outbuf, "%s[%d] ", outbuf, slon_pid);
@@ -117,7 +170,12 @@
slon_abort();
}
}
-
+#ifdef HAVE_SYSLOG
+ if (Use_syslog >= 1)
+ {
+ write_syslog(syslog_level,outbuf);
+ }
+#endif
fwrite(outbuf, strlen(outbuf), 1, stdout);
fflush(stdout);
pthread_mutex_unlock(&log_mutex);
@@ -129,8 +187,8 @@
/*
* scanint8 --- try to parse a string into an int8.
*
- * If errorOK is false, ereport a useful error message if the string is bad.
- * If errorOK is true, just return "false" for bad input.
+ * If errorOK is false, ereport a useful error message if the string is bad. If
+ * errorOK is true, just return "false" for bad input.
*/
int
slon_scanint64(char *str, int64 *result)
@@ -155,9 +213,9 @@
sign = -1;
/*
- * Do an explicit check for INT64_MIN. Ugly though this is, it's
- * cleaner than trying to get the loop below to handle it
- * portably.
+ * Do an explicit check for INT64_MIN. Ugly though this is,
+ * it's cleaner than trying to get the loop below to handle
+ * it portably.
*/
#ifndef INT64_IS_BUSTED
if (strcmp(ptr, "9223372036854775808") == 0)
@@ -166,8 +224,7 @@
return true;
}
#endif
- }
- else if (*ptr == '+')
+ } else if (*ptr == '+')
ptr++;
/* require at least one digit */
@@ -193,3 +250,106 @@
return true;
}
+#ifdef HAVE_SYSLOG
+/*
+ * write_syslog
+ *
+ * Send one log line to syslog(3) at the given syslog priority.
+ *
+ * level - syslog priority (LOG_DEBUG, LOG_INFO, LOG_WARNING, LOG_ERR, ...)
+ * line  - NUL-terminated message text; may contain embedded newlines
+ *
+ * On the first call the syslog connection is opened using Syslog_ident and
+ * the facility named by Syslog_facility ("LOCAL0" .. "LOCAL7", compared
+ * case-insensitively). An unset or unrecognized facility name falls back
+ * to LOCAL0.
+ *
+ * Messages longer than SLON_SYSLOG_LIMIT bytes, or containing newlines,
+ * are split into several syslog() calls tagged "[seq-chunk]"; all other
+ * messages are sent as a single "[seq]" entry.
+ *
+ * NOTE(review): the static state below is not protected here; this relies
+ * on the caller serializing calls - confirm against slon_log().
+ */
+static void
+write_syslog(int level, const char *line)
+{
+    static const struct
+    {
+        const char *name;
+        int         facility;
+    }           facilities[] =
+    {
+        {"LOCAL0", LOG_LOCAL0},
+        {"LOCAL1", LOG_LOCAL1},
+        {"LOCAL2", LOG_LOCAL2},
+        {"LOCAL3", LOG_LOCAL3},
+        {"LOCAL4", LOG_LOCAL4},
+        {"LOCAL5", LOG_LOCAL5},
+        {"LOCAL6", LOG_LOCAL6},
+        {"LOCAL7", LOG_LOCAL7}
+    };
+
+    static bool openlog_done = false;
+    static unsigned long seq = 0;
+    static int  syslog_fac = LOG_LOCAL0;
+
+    int         len = strlen(line);
+
+    if (Use_syslog == 0)
+        return;
+
+    if (!openlog_done)
+    {
+        /*
+         * Syslog_facility may be NULL when no value was configured, so
+         * only look it up when it is set; otherwise keep LOCAL0.
+         */
+        if (Syslog_facility != NULL)
+        {
+            size_t      f;
+
+            for (f = 0; f < sizeof(facilities) / sizeof(facilities[0]); f++)
+            {
+                if (strcasecmp(Syslog_facility, facilities[f].name) == 0)
+                {
+                    syslog_fac = facilities[f].facility;
+                    break;
+                }
+            }
+        }
+        openlog(Syslog_ident, LOG_PID | LOG_NDELAY, syslog_fac);
+        openlog_done = true;
+    }
+
+    /*
+     * We add a sequence number to each log message to suppress "same"
+     * messages.
+     */
+    seq++;
+
+    /*
+     * Divide into multiple syslog() calls if the message is too long or
+     * if it contains embedded newlines.
+     */
+    if (len > SLON_SYSLOG_LIMIT || strchr(line, '\n') != NULL)
+    {
+        int         chunk_nr = 0;
+
+        while (len > 0)
+        {
+            char        buf[SLON_SYSLOG_LIMIT + 1];
+            char       *nl;
+            int         buflen;
+            int         i;
+
+            /* if we start at a newline, move ahead one char */
+            if (line[0] == '\n')
+            {
+                line++;
+                len--;
+                continue;
+            }
+
+            /* take at most one limit-sized piece, cut at the first newline */
+            strncpy(buf, line, SLON_SYSLOG_LIMIT);
+            buf[SLON_SYSLOG_LIMIT] = '\0';
+            nl = strchr(buf, '\n');
+            if (nl != NULL)
+                *nl = '\0';
+
+            buflen = strlen(buf);
+
+            if (buflen <= 0)
+                return;
+
+            /* already word boundary? */
+            if (!isspace((unsigned char) line[buflen]) &&
+                line[buflen] != '\0')
+            {
+                /* try to divide at word boundary */
+                i = buflen - 1;
+                while (i > 0 && !isspace((unsigned char) buf[i]))
+                    i--;
+
+                if (i > 0)      /* else couldn't divide word boundary */
+                {
+                    buflen = i;
+                    buf[i] = '\0';
+                }
+            }
+
+            chunk_nr++;
+
+            syslog(level, "[%lu-%d] %s", seq, chunk_nr, buf);
+            line += buflen;
+            len -= buflen;
+        }
+    }
+    else
+    {
+        /* message short enough */
+        syslog(level, "[%lu] %s", seq, line);
+    }
+}
+#endif   /* HAVE_SYSLOG */
+
+
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.16
retrieving revision 1.17
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.16 -r1.17
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -28,18 +28,16 @@
#include "slon.h"
-/* ----------
- * Global data
- * ----------
+/*
+ * ---------- Global data ----------
*/
int vac_frequency = SLON_VACUUM_FREQUENCY;
-/* ----------
- * cleanupThread_main
+/*
+ * ---------- cleanupThread_main
*
- * Periodically calls the stored procedure to remove old events
- * and log data and vacuums those tables.
- * ----------
+ * Periodically calls the stored procedure to remove old events and log data and
+ * vacuums those tables. ----------
*/
void *
cleanupThread_main(void *dummy)
@@ -70,8 +68,8 @@
dbconn = conn->dbconn;
/*
- * Build the query string for calling the cleanupEvent()
- * stored procedure
+ * Build the query string for calling the cleanupEvent() stored
+ * procedure
*/
dstring_init(&query1);
slon_mkquery(&query1, "select %s.cleanupEvent(); ", rtcfg_namespace);
@@ -231,5 +229,3 @@
slon_log(SLON_DEBUG1, "cleanupThread: thread done\n");
pthread_exit(NULL);
}
-
-
Index: confoptions.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slon/confoptions.c -Lsrc/slon/confoptions.c -u -w -r1.1 -r1.2
--- src/slon/confoptions.c
+++ src/slon/confoptions.c
@@ -21,7 +21,8 @@
-void build_conf_variables(void)
+void
+build_conf_variables(void)
{
int size_vars;
int num_vars=0;
@@ -32,7 +33,10 @@
{
struct config_bool *conf = &ConfigureNamesBool[i];
- /* Rather than requiring vartype to be filled in by hand, do this: */
+ /*
+ * Rather than requiring vartype to be filled in by hand, do
+ * this:
+ */
conf->gen.vartype = SLON_C_BOOL;
num_vars++;
}
@@ -66,7 +70,6 @@
slon_log(FATAL, "malloc failed");
return;
}
-
num_vars = 0;
for (i = 0; ConfigureNamesBool[i].gen.name; i++)
@@ -113,8 +116,7 @@
size_vars = 100;
conf_vars = (struct config_generic **)
malloc(size_vars * sizeof(struct config_generic *));
- }
- else
+ } else
{
conf_vars = (struct config_generic **)
realloc(conf_variables, size_vars * sizeof(struct config_generic *));
@@ -125,7 +127,6 @@
slon_log(elevel,"malloc failed");
return false; /* out of memory */
}
-
conf_variables = conf_vars;
size_conf_variables = size_vars;
}
@@ -135,7 +136,8 @@
return true;
}
-void InitializeConfOptions(void)
+void
+InitializeConfOptions(void)
{
int i;
char *env;
@@ -171,8 +173,11 @@
char *str;
struct config_int *conf = (struct config_int *) gconf;
*conf->variable = NULL;
+ if (conf->default_val)
+ {
str = strdup(conf->default_val);
*conf->variable = str;
+ }
break;
}
}
@@ -196,57 +201,49 @@
{
*result = true;
}
- }
- else if (strncasecmp(value, "false", len) == 0)
+ } else if (strncasecmp(value, "false", len) == 0)
{
if (result)
{
*result = false;
}
- }
- else if (strncasecmp(value, "yes", len) == 0)
+ } else if (strncasecmp(value, "yes", len) == 0)
{
if (result)
{
*result = true;
}
- }
- else if (strncasecmp(value, "no", len) == 0)
+ } else if (strncasecmp(value, "no", len) == 0)
{
if (result)
{
*result = false;
}
- }
- else if (strncasecmp(value, "on", len) == 0)
+ } else if (strncasecmp(value, "on", len) == 0)
{
if (result)
{
*result = true;
}
- }
- else if (strncasecmp(value, "off", len) == 0)
+ } else if (strncasecmp(value, "off", len) == 0)
{
if (result)
{
*result = false;
}
- }
- else if (strncasecmp(value, "1", len) == 0)
+ } else if (strncasecmp(value, "1", len) == 0)
{
if (result)
{
*result = true;
}
- }
- else if (strncasecmp(value, "o", len) == 0)
+ } else if (strncasecmp(value, "o", len) == 0)
{
if (result)
{
*result = false;
}
- }
- else
+ } else
{
return false;
}
@@ -254,10 +251,9 @@
}
/*
- * Try to parse value as an integer. The accepted formats are the
- * usual decimal, octal, or hexadecimal formats. If the string parses
- * okay, return true, else false. If result is not NULL, return the
- * value there.
+ * Try to parse value as an integer. The accepted formats are the usual
+ * decimal, octal, or hexadecimal formats. If the string parses okay, return
+ * true, else false. If result is not NULL, return the value there.
*/
static bool
parse_int(const char *value, int *result)
@@ -281,9 +277,9 @@
}
/*
- * Try to parse value as a floating point constant in the usual
- * format. If the value parsed okay return true, else false. If
- * result is not NULL, return the semantic value there.
+ * Try to parse value as a floating point constant in the usual format.
+ * If the value parsed okay return true, else false. If result is not NULL,
+ * return the semantic value there.
*/
static bool
parse_real(const char *value, double *result)
@@ -301,7 +297,8 @@
}
-static struct config_generic *find_option(const char *name, int elevel)
+static struct config_generic *
+find_option(const char *name, int elevel)
{
const char *dot;
const char **key = &name;
@@ -355,7 +352,8 @@
}
-bool set_config_option(const char *name, const char *value)
+bool
+set_config_option(const char *name, const char *value)
{
struct config_generic *record;
int elevel;
@@ -369,7 +367,6 @@
slon_log(elevel, "unrecognized configuration parameter \"%s\"\n", name);
return false;
}
-
switch (record->vartype)
{
case SLON_C_BOOL:
@@ -383,8 +380,7 @@
slon_log(elevel, "parameter \"%s\" requires a Boolean value\n", name);
return false;
}
- }
- else
+ } else
{
slon_log(elevel, "parameter \"%s\"\n", name );
}
@@ -410,8 +406,7 @@
newval, name, conf->min, conf->max);
return false;
}
- }
- else
+ } else
{
slon_log(elevel, "parameter \"%s\"\n", name );
}
@@ -436,8 +431,7 @@
newval, name, conf->min, conf->max);
return false;
}
- }
- else
+ } else
{
slon_log(elevel, "parameter \"%s\"\n", name );
}
@@ -456,9 +450,7 @@
{
return false;
}
-
- }
- else
+ } else
{
slon_log(elevel, "parameter \"%s\"\n", name );
free(newval);
@@ -469,4 +461,3 @@
}
return true;
}
-
Index: dbutils.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/dbutils.c,v
retrieving revision 1.11
retrieving revision 1.12
diff -Lsrc/slon/dbutils.c -Lsrc/slon/dbutils.c -u -w -r1.11 -r1.12
--- src/slon/dbutils.c
+++ src/slon/dbutils.c
@@ -32,21 +32,19 @@
static int slon_appendquery_int(SlonDString *dsp, char *fmt, va_list ap);
/*
- * This mutex is used to wrap around PQconnectdb. There's a problem
- * that occurs when your libpq is compiled with libkrb (kerberos)
- * which is not threadsafe. It is especially odd because I'm not using
- * kerberos.
+ * This mutex is used to wrap around PQconnectdb. There's a problem that
+ * occurs when your libpq is compiled with libkrb (kerberos) which is not
+ * threadsafe. It is especially odd because I'm not using kerberos.
*
- * This is fixed in libpq in 8.0, but for now (and for older versions
- * we'll just use this mutex.
+ * This is fixed in libpq in 8.0, but for now (and for older versions we'll just
+ * use this mutex.
*
*/
static pthread_mutex_t slon_connect_lock = PTHREAD_MUTEX_INITIALIZER;
-/* ----------
- * slon_connectdb
- * ----------
+/*
+ * ---------- slon_connectdb ----------
*/
SlonConn *
slon_connectdb(char *conninfo, char *symname)
@@ -75,10 +73,9 @@
PQfinish(dbconn);
return NULL;
}
-
/*
- * Embed it into a SlonConn structure used to exchange it with
- * the scheduler. On return this new connection object is locked.
+ * Embed it into a SlonConn structure used to exchange it with the
+ * scheduler. On return this new connection object is locked.
*/
conn = slon_make_dummyconn(symname);
conn->dbconn = dbconn;
@@ -87,9 +84,8 @@
}
-/* ----------
- * slon_disconnectdb
- * ----------
+/*
+ * ---------- slon_disconnectdb ----------
*/
void
slon_disconnectdb(SlonConn *conn)
@@ -103,16 +99,15 @@
#endif
/*
- * Unlock and destroy the condition and mutex variables
- * and free memory.
+ * Unlock and destroy the condition and mutex variables and free
+ * memory.
*/
slon_free_dummyconn(conn);
}
-/* ----------
- * slon_make_dummyconn
- * ----------
+/*
+ * ---------- slon_make_dummyconn ----------
*/
SlonConn *
slon_make_dummyconn(char *symname)
@@ -142,9 +137,8 @@
}
-/* ----------
- * slon_free_dummyconn
- * ----------
+/*
+ * ---------- slon_free_dummyconn ----------
*/
void
slon_free_dummyconn(SlonConn *conn)
@@ -167,11 +161,10 @@
}
-/* ----------
- * db_getLocalNodeId
+/*
+ * ---------- db_getLocalNodeId
*
- * Query a connection for the value of sequence sl_local_node_id
- * ----------
+ * Query a connection for the value of sequence sl_local_node_id ----------
*/
int
db_getLocalNodeId(PGconn *conn)
@@ -202,7 +195,6 @@
PQclear(res);
return -1;
}
-
/*
* Return the result as an integer value
*/
@@ -213,16 +205,13 @@
}
-/* ----------
- * slon_mkquery
+/*
+ * ---------- slon_mkquery
*
- * A simple query formatting and quoting function using dynamic string
- * buffer allocation.
- * Similar to sprintf() it uses formatting symbols:
- * %s String argument
- * %q Quoted literal (\ and ' will be escaped)
- * %d Integer argument
- * ----------
+ * A simple query formatting and quoting function using dynamic string buffer
+ * allocation. Similar to sprintf() it uses formatting symbols:
+ *   %s  String argument
+ *   %q  Quoted literal (\ and ' will be escaped)
+ *   %d  Integer argument
+ * ----------
*/
int
slon_mkquery(SlonDString *dsp, char *fmt, ...)
@@ -241,11 +230,10 @@
}
-/* ----------
- * slon_appendquery
+/*
+ * ---------- slon_appendquery
*
- * Append query string material to an existing dynamic string.
- * ----------
+ * Append query string material to an existing dynamic string. ----------
*/
int
slon_appendquery(SlonDString *dsp, char *fmt, ...)
@@ -262,11 +250,10 @@
}
-/* ----------
- * slon_appendquery_int
+/*
+ * ---------- slon_appendquery_int
*
- * Implementation of slon_mkquery() and slon_appendquery().
- * ----------
+ * Implementation of slon_mkquery() and slon_appendquery(). ----------
*/
static int
slon_appendquery_int(SlonDString *dsp, char *fmt, va_list ap)
@@ -282,12 +269,14 @@
fmt++;
switch(*fmt)
{
- case 's': s = va_arg(ap, char *);
+ case 's':
+ s = va_arg(ap, char *);
dstring_append(dsp, s);
fmt++;
break;
- case 'q': s = va_arg(ap, char *);
+ case 'q':
+ s = va_arg(ap, char *);
while (s && *s != '\0')
{
switch (*s)
@@ -307,24 +296,28 @@
fmt++;
break;
- case 'd': sprintf(buf, "%d", va_arg(ap, int));
+ case 'd':
+ sprintf(buf, "%d", va_arg(ap, int));
dstring_append(dsp, buf);
fmt++;
break;
- default: dstring_addchar(dsp, '%');
+ default:
+ dstring_addchar(dsp, '%');
dstring_addchar(dsp, *fmt);
fmt++;
break;
}
break;
- case '\\': fmt++;
+ case '\\':
+ fmt++;
dstring_addchar(dsp, *fmt);
fmt++;
break;
- default: dstring_addchar(dsp, *fmt);
+ default:
+ dstring_addchar(dsp, *fmt);
fmt++;
break;
}
@@ -334,5 +327,3 @@
return 0;
}
-
-
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.29
retrieving revision 1.30
diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.29 -r1.30
--- src/slon/slon.c
+++ src/slon/slon.c
@@ -29,9 +29,8 @@
#include "slon.h"
#include "confoptions.h"
-/* ----------
- * Global data
- * ----------
+/*
+ * ---------- Global data ----------
*/
int slon_restart_request = false;
@@ -39,9 +38,8 @@
pthread_cond_t slon_wait_listen_cond = PTHREAD_COND_INITIALIZER;
-/* ----------
- * Local data
- * ----------
+/*
+ * ---------- Local data ----------
*/
static pthread_t local_event_thread;
static pthread_t local_cleanup_thread;
@@ -53,11 +51,10 @@
static void sigalrmhandler(int signo);
int slon_log_level;
+char *pid_file;
-
-/* ----------
- * main
- * ----------
+/*
+ * ---------- main ----------
*/
int
main (int argc, char *const argv[])
@@ -78,7 +75,7 @@
InitializeConfOptions();
- while ((c = getopt(argc, argv, "f:d:s:t:g:c:f:h")) != EOF)
+ while ((c = getopt(argc, argv, "f:d:s:t:g:c:p:h")) != EOF)
{
switch(c)
{
@@ -105,12 +102,16 @@
case 'c':
set_config_option("vac_frequency", optarg);
break;
+ case 'p':
+ set_config_option("pid_file", optarg);
+ break;
case 'h':
errors++;
break;
- default: fprintf(stderr, "unknown option '%c'\n", c);
+ default:
+ fprintf(stderr, "unknown option '%c'\n", c);
errors++;
break;
}
@@ -140,10 +141,10 @@
fprintf(stderr, " -t <milliseconds> SYNC interval timeout (default 60000)\n");
fprintf(stderr, " -g <num> maximum SYNC group size (default 6)\n");
fprintf(stderr, " -c <num> how often to vaccum in cleanup cycles\n");
+ fprintf(stderr, " -p <filename> slon pid file\n");
fprintf(stderr, " -f <filename> slon configuration file\n");
return 1;
}
-
/*
* Make sure the sync interval isn't too small.
*/
@@ -151,8 +152,8 @@
sync_interval_timeout = sync_interval * 2;
/*
- * Remember the cluster name and build the properly quoted
- * namespace identifier
+ * Remember the cluster name and build the properly quoted namespace
+ * identifier
*/
slon_pid = getpid();
rtcfg_cluster_name = (char *)argv[optind];
@@ -175,7 +176,8 @@
rtcfg_conninfo = (char *)argv[++optind];
/*
- * Connect to the local database for reading the initial configuration
+ * Connect to the local database for reading the initial
+ * configuration
*/
startup_conn = PQconnectdb(rtcfg_conninfo);
if (startup_conn == NULL)
@@ -190,7 +192,6 @@
PQfinish(startup_conn);
slon_exit(-1);
}
-
/*
* Get our local node ID
*/
@@ -202,6 +203,23 @@
}
slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid);
+ if (pid_file) /* only write a pid file when the pid_file option is set */
+ {
+ FILE *pidfile; /* stream for the file named by pid_file */
+
+ pidfile=fopen(pid_file,"w");
+ if(pidfile)
+ {
+ fprintf(pidfile,"%d",slon_pid); /* slon_pid was taken from getpid() above */
+ fclose(pidfile);
+ }
+ else
+ {
+ slon_log(SLON_WARN, "Cannot open pid_file \"%s\"\n", pid_file);
+ }
+ }
+
+
/*
* Start the event scheduling system
*/
@@ -260,8 +278,7 @@
*/
rtcfg_nodeactive = no_active;
rtcfg_nodecomment = strdup(no_comment);
- }
- else
+ } else
{
/*
* Add a remote node
@@ -271,8 +288,8 @@
rtcfg_setNodeLastEvent(no_id, last_event);
/*
- * If it is active, remember for activation just before
- * we start processing events.
+ * If it is active, remember for activation just
+ * before we start processing events.
*/
if (no_active)
rtcfg_needActivate(no_id);
@@ -405,8 +422,7 @@
}
if (PQntuples(res) == 0)
strcpy(rtcfg_lastevent, "-1");
- else
- if (PQgetisnull(res, 0, 0))
+ else if (PQgetisnull(res, 0, 0))
strcpy(rtcfg_lastevent, "-1");
else
strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0));
@@ -437,11 +453,10 @@
slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
/*
- * Create the local event thread that is monitoring
- * the local node for administrative events to adjust the
- * configuration at runtime.
- * We wait here until the local listen thread has checked that
- * there is no other slon daemon running.
+ * Create the local event thread that is monitoring the local node
+ * for administrative events to adjust the configuration at runtime.
+ * We wait here until the local listen thread has checked that there
+ * is no other slon daemon running.
*/
pthread_mutex_lock(&slon_wait_listen_lock);
if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
@@ -459,8 +474,8 @@
rtcfg_doActivate();
/*
- * Create the local cleanup thread that will remove old
- * events and log data.
+ * Create the local cleanup thread that will remove old events and
+ * log data.
*/
if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0)
{
@@ -468,10 +483,9 @@
strerror(errno));
slon_abort();
}
-
/*
- * Create the local sync thread that will generate SYNC
- * events if we had local database updates.
+ * Create the local sync thread that will generate SYNC events if we
+ * had local database updates.
*/
if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0)
{
@@ -479,7 +493,6 @@
strerror(errno));
slon_abort();
}
-
/*
* Wait until the scheduler has shut down all remote connections
*/
@@ -526,7 +539,6 @@
"main: cannot restart via execvp(): %s\n", strerror(errno));
exit(-1);
}
-
/*
* That's it.
*/
@@ -538,6 +550,10 @@
void
slon_exit(int code)
{
+ if (pid_file)
+ {
+ unlink(pid_file); /* remove the pid file (if one was configured) before exiting */
+ }
exit(code);
}
@@ -558,9 +574,5 @@
}
exit(-1);
}
-
pthread_kill(main_thread, SIGALRM);
}
-
-
-
- Previous message: [Slony1-commit] By darcyb: Ok it's finaly here, a slon config file.
- Next message: [Slony1-commit] By darcyb: Opps copy/paste function prototypeing is bad mojo, fix
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list