CVS User Account cvsuser
Fri Sep 24 23:12:44 PDT 2004
Log Message:
-----------
add syslog support, pid file writing and other bells

Modified Files:
--------------
    slony1-engine/src/slon:
        cleanup_thread.c (r1.16 -> r1.17)
        confoptions.c (r1.1 -> r1.2)
        confoptions.h (r1.1 -> r1.2)
        dbutils.c (r1.11 -> r1.12)
        misc.c (r1.12 -> r1.13)
        remote_listen.c (r1.15 -> r1.16)
        remote_worker.c (r1.61 -> r1.62)
        runtime_config.c (r1.19 -> r1.20)
        scheduler.c (r1.15 -> r1.16)
        slon.c (r1.29 -> r1.30)
        sync_thread.c (r1.12 -> r1.13)

-------------- next part --------------
Index: confoptions.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.1 -r1.2
--- src/slon/confoptions.h
+++ src/slon/confoptions.h
@@ -11,6 +11,8 @@
 extern double real_placeholder;
 extern char *string_placeholder;
 
+extern char *pid_file;
+
 
 extern int vac_frequency;
 extern int slon_log_level;
@@ -19,6 +21,9 @@
 
 extern int sync_group_maxsize;
 
+char *Syslog_ident;
+char *Syslog_facility;
+int Use_syslog;
 bool logpid;
 bool logtimestamp;
 
@@ -145,7 +150,23 @@
 		0,
 		100
 	},
-
+#ifdef HAVE_SYSLOG
+	{
+		{
+			(const char *)"syslog",
+			gettext_noop("Uses syslog for logging."),
+			gettext_noop("If this parameter is 1, messages go both to syslog "
+					"and the standard output. A value of 2 sends output only to syslog. "
+					"(Some messages will still go to the standard output/error.) The "
+					"default is 0, which means syslog is off."),
+			SLON_C_INT
+		},
+		&Use_syslog,
+		0,
+		0,
+		2
+	},
+#endif
         NULL
 };
 static struct config_int ConfigureNamesBool[] =
@@ -202,6 +223,39 @@
                 &string_placeholder,                                                                    /* var_name */
                 "default"                                                                               /* default value */
         },
+	{
+		{
+			(const char *)"pid_file",
+			gettext_noop("Where to write the pid file"),
+			NULL,
+			SLON_C_STRING
+		},
+		&pid_file,
+		NULL
+	},
+#ifdef HAVE_SYSLOG
+	{
+		{
+			(const char *)"syslog_facility",
+			gettext_noop("Sets the syslog \"facility\" to be used when syslog enabled."),
+			gettext_noop("Valid values are LOCAL0, LOCAL1, LOCAL2, LOCAL3, "
+				"LOCAL4, LOCAL5, LOCAL6, LOCAL7."),
+			SLON_C_STRING
+		},
+		&Syslog_ident,
+		"LOCAL0"
+	},
+	{
+		{
+			(const char *)"syslog_ident",
+			gettext_noop("Sets the program name used to identify slon messages in syslog."),
+			NULL,
+			SLON_C_STRING
+		},
+		&Syslog_ident,
+		"slon"
+	},
+#endif
 	NULL
 };
 
Index: scheduler.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/scheduler.c,v
retrieving revision 1.15
retrieving revision 1.16
diff -Lsrc/slon/scheduler.c -Lsrc/slon/scheduler.c -u -w -r1.15 -r1.16
--- src/slon/scheduler.c
+++ src/slon/scheduler.c
@@ -37,9 +37,8 @@
 #endif
 
 
-/* ----------
- * Static data
- * ----------
+/*
+ * ---------- Static data ----------
  */
 static int				sched_status = SCHED_STATUS_OK;
 
@@ -59,9 +58,8 @@
 static sigset_t			sched_sigset;
 
 
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
  */
 static void			   *sched_mainloop(void *);
 static void				sched_sighandler(int signo);
@@ -70,14 +68,13 @@
 static void				sched_remove_fdset(int fd, fd_set *fds);
 
 
-/* ----------
- * sched_start_mainloop
+/*
+ * ---------- sched_start_mainloop
  *
  *	Called from main() before starting up any worker thread.
  *
- *	This will spawn the event scheduling thread that does the
- *	central select(2) system call.
- * ----------
+ * This will spawn the event scheduling thread that does the central select(2)
+ * system call. ----------
  */
 int
 sched_start_mainloop(void)
@@ -88,9 +85,9 @@
 	sched_main_thread = pthread_self();
 
 	/*
-	 * Block signals. Since sched_start_mainloop() is called before
-	 * any other thread is created, this will be inherited by all
-	 * threads in the system. 
+	 * Block signals. Since sched_start_mainloop() is called before any
+	 * other thread is created, this will be inherited by all threads in
+	 * the system.
 	 */
 	sigemptyset(&sched_sigset);
 	sigaddset(&sched_sigset, SIGHUP);
@@ -106,7 +103,6 @@
 		perror("sched_start_mainloop: pthread_mutex_lock()");
 		return -1;
 	}
-
 	/*
 	 * Start the scheduler thread
 	 */
@@ -115,7 +111,6 @@
 		perror("sched_start_mainloop: pthread_create()");
 		return -1;
 	}
-
 	/*
 	 * When the scheduler is ready, he'll signal the scheduler cond
 	 */
@@ -124,7 +119,6 @@
 		perror("sched_start_mainloop: pthread_cond_wait()");
 		return -1;
 	}
-
 	/*
 	 * Release the scheduler lock
 	 */
@@ -133,7 +127,6 @@
 		perror("sched_start_mainloop: pthread_mutex_unlock()");
 		return -1;
 	}
-
 	/*
 	 * Check for errors
 	 */
@@ -147,13 +140,12 @@
 }
 
 
-/* ----------
- * sched_wait_mainloop
+/*
+ * ---------- sched_wait_mainloop
  *
- *	Called from main() after all working threads according to the
- *	initial configuration are started. Will wait until the scheduler
- *	mainloop terminates.
- * ----------
+ * Called from main() after all working threads according to the initial
+ * configuration are started. Will wait until the scheduler mainloop
+ * terminates. ----------
  */
 int
 sched_wait_mainloop(void)
@@ -174,11 +166,13 @@
 
 	switch (signo)
 	{
-		case SIGHUP:	sched_sighuphandler(signo);
+	case SIGHUP:
+		sched_sighuphandler(signo);
 						break;
 
 		case SIGINT:
-		case SIGTERM:	sched_sighandler(signo);
+	case SIGTERM:
+		sched_sighandler(signo);
 						break;
 	}
 
@@ -190,19 +184,17 @@
 		perror("sched_wait_mainloop: pthread_join()");
 		return -1;
 	}
-
 	return 0;
 }
 
 
-/* ----------
- * sched_wait_conn
+/*
+ * ---------- sched_wait_conn
  *
  *	Assumes that the thread holds the lock on conn->conn_lock.
  *
- *	Adds the connection to the central wait queue and wakes up
- *	the scheduler thread to reloop onto the select(2) call.
- * ----------
+ * Adds the connection to the central wait queue and wakes up the scheduler
+ * thread to reloop onto the select(2) call. ----------
  */
 int
 sched_wait_conn(SlonConn *conn, int condition)
@@ -218,7 +210,6 @@
 		pthread_mutex_unlock(&sched_master_lock);
 		return -1;
 	}
-
 	/*
 	 * Remember the event we're waiting for and add the database 
 	 * connection to the fdset(s)
@@ -235,8 +226,8 @@
 	DLLIST_ADD_HEAD(sched_waitqueue_head, sched_waitqueue_tail, conn);
 
 	/*
-	 * Give the scheduler thread a heads up, release the master lock
-	 * and wait for it to tell us that the event we're waiting for happened.
+	 * Give the scheduler thread a heads up, release the master lock and
+	 * wait for it to tell us that the event we're waiting for happened.
 	 */
 	if (write(sched_wakeuppipe[1], "x", 1) < 0)
 	{
@@ -264,14 +255,13 @@
 }
 
 
-/* ----------
- * sched_wait_time
+/*
+ * ---------- sched_wait_time
  *
  *	Assumes that the thread holds the lock on conn->conn_lock.
  *
- *	Like sched_wait_conn() but with a timeout. Can be called without
- *	any read/write condition to wait for to resemble a pure timeout
- *	mechanism.
+ * Like sched_wait_conn() but with a timeout. Can be called without any
+ * read/write condition to wait for to resemble a pure timeout mechanism.
  * ----------
  */
 int
@@ -294,11 +284,10 @@
 }
 
 
-/* ----------
- * sched_msleep
+/*
+ * ---------- sched_msleep
  *
- *	Use the schedulers event loop to sleep for msec milliseconds.
- * ----------
+ * Use the schedulers event loop to sleep for msec milliseconds. ----------
  */
 int
 sched_msleep(SlonNode *node, int msec)
@@ -311,8 +300,7 @@
 	{
 		snprintf(dummyconn_name, 64, "msleep_node_%d", node->no_id);
 		conn = slon_make_dummyconn(dummyconn_name);
-	}
-	else
+	} else
 		conn = slon_make_dummyconn("msleep_local");
 
 	rc = sched_wait_time(conn, 0, msec);
@@ -322,11 +310,10 @@
 }
 
 
-/* ----------
- * sched_get_status
+/*
+ * ---------- sched_get_status
  *
- *	Return the current scheduler status in a thread safe fashion
- * ----------
+ * Return the current scheduler status in a thread safe fashion ----------
  */
 int
 sched_get_status(void)
@@ -340,13 +327,12 @@
 }
 
 
-/* ----------
- * sched_wakeup_node
+/*
+ * ---------- sched_wakeup_node
  *
- *	Wakeup the threads (listen and worker) of one or all remote
- *	nodes to cause them rechecking the current runtime status or
- *	adjust their configuration to changes.
- * ----------
+ * Wakeup the threads (listen and worker) of one or all remote nodes to cause
+ * them rechecking the current runtime status or adjust their configuration
+ * to changes. ----------
  */
 int
 sched_wakeup_node (int no_id)
@@ -382,7 +368,6 @@
 			slon_abort();
 		}
 	}
-
 	pthread_mutex_unlock(&sched_master_lock);
 
 	remoteWorker_wakeup(no_id);
@@ -394,11 +379,10 @@
 }
 
 
-/* ----------
- * sched_mainloop
+/*
+ * ---------- sched_mainloop
  *
- *	The thread handling the master scheduling.
- * ----------
+ * The thread handling the master scheduling. ----------
  */
 static void *
 sched_mainloop(void *dummy)
@@ -413,8 +397,8 @@
 	int				i;
 
 	/*
-	 * Grab the scheduler master lock. This will wait until the
-	 * main thread acutally blocks on the master cond.
+	 * Grab the scheduler master lock. This will wait until the main
+	 * thread actually blocks on the master cond.
 	 */
 	pthread_mutex_lock(&sched_master_lock);
 
@@ -425,8 +409,8 @@
 	FD_ZERO(&sched_fdset_write);
 
 	/*
-	 * Create a pipe used by the main thread to cleanly
-	 * wakeup the scheduler on signals.
+	 * Create a pipe used by the main thread to cleanly wakeup the
+	 * scheduler on signals.
 	 */
 	if (pipe(sched_wakeuppipe) < 0)
 	{
@@ -438,8 +422,8 @@
 	sched_add_fdset(sched_wakeuppipe[0], &sched_fdset_read);
 
 	/*
-	 * Done with all initialization. Let the main thread go
-	 * ahead and get everyone else dancing.
+	 * Done with all initialization. Let the main thread go ahead and get
+	 * everyone else dancing.
 	 */
 	pthread_cond_signal(&sched_master_cond);
 
@@ -465,9 +449,9 @@
 		}
 
 		/*
-		 * Check if any of the connections in the wait queue
-		 * have reached there timeout. While doing so, we also
-		 * remember the closest timeout in the future.
+		 * Check if any of the connections in the wait queue have
+		 * reached their timeout. While doing so, we also remember
+		 * the closest timeout in the future.
 		 */
 		tv = NULL;
 		gettimeofday(&now, NULL);
@@ -498,12 +482,11 @@
 				conn = next;
 				continue;
 			}
-
 			if (conn->condition & SCHED_WAIT_TIMEOUT)
 			{
 				/*
-				 * This connection has a timeout. Calculate the
-				 * time until that.
+				 * This connection has a timeout. Calculate
+				 * the time until that.
 				 */
 				timeout.tv_sec  = conn->timeout.tv_sec  - now.tv_sec;
 				timeout.tv_usec = conn->timeout.tv_usec - now.tv_usec;
@@ -520,10 +503,11 @@
 						(timeout.tv_sec == 0 && timeout.tv_usec < 20000))
 				{
 					/*
-					 * Remove the connection from the wait queue. We
-					 * consider everything closer than 20 msec being
-					 * elapsed to avoid a full scheduler round just
-					 * for one kernel tick.
+					 * Remove the connection from the
+					 * wait queue. We consider everything
+					 * closer than 20 msec being elapsed
+					 * to avoid a full scheduler round
+					 * just for one kernel tick.
 					 */
 					DLLIST_REMOVE(sched_waitqueue_head, 
 								sched_waitqueue_tail, conn);
@@ -538,11 +522,11 @@
 					pthread_mutex_lock(&(conn->conn_lock));
 					pthread_cond_signal(&(conn->conn_cond));
 					pthread_mutex_unlock(&(conn->conn_lock));
-				}
-				else
+				} else
 				{
 					/*
-					 * Timeout not elapsed. Remember the nearest.
+					 * Timeout not elapsed. Remember the
+					 * nearest.
 					 */
 					if (tv == NULL ||
 							timeout.tv_sec < min_timeout.tv_sec ||
@@ -555,7 +539,6 @@
 					}
 				}
 			}
-
 			conn = next;
 		}
 
@@ -575,7 +558,6 @@
 			sched_status = SCHED_STATUS_ERROR;
 			break;
 		}
-
 		/*
 		 * Check the special pipe for a heads up.
 		 */
@@ -591,7 +573,6 @@
 				break;
 			}
 		}
-
 		/*
 		 * Check all remaining connections if the IO condition the
 		 * thread is waiting for has occured.
@@ -624,7 +605,6 @@
 					continue;
 				}
 			}
-
 			if (conn->condition & SCHED_WAIT_SOCK_WRITE)
 			{
 				if (FD_ISSET(PQsocket(conn->dbconn), &wfds))
@@ -650,16 +630,15 @@
 					continue;
 				}
 			}
-
 			conn = conn->next;
 		}
 	}
 
 	/*
-	 * If we reach here the scheduler runmode has been changed by
-	 * by the main threads signal handler. We currently hold the
-	 * master lock. First we close the scheduler heads-up socket
-	 * pair so nobody will think we're listening any longer.
+	 * If we reach here the scheduler runmode has been changed by the
+	 * main threads signal handler. We currently hold the master lock.
+	 * First we close the scheduler heads-up socket pair so nobody will
+	 * think we're listening any longer.
 	 */
 	close(sched_wakeuppipe[0]);
 	close(sched_wakeuppipe[1]);
@@ -697,17 +676,16 @@
 }
 
 
-/* ----------
- * sched_sighandler
+/*
+ * ---------- sched_sighandler
  *
- *	After starting up the sole purpose of the main thread (the original
- *	process) is to respond to signals while waiting for the scheduler to
- *	finish. The signal handler is here because it is actually
- *	sched_start_mainloop() that arranges for the signal handling and
- *	sched_wait_mainloop() that enables them. And the only one really
- *	interested in the signals is the scheduler thread ... but that's doing
- *	select(2) mainly and we have to avoid race conditions with signals.
- * ----------
+ * After starting up the sole purpose of the main thread (the original process)
+ * is to respond to signals while waiting for the scheduler to finish. The
+ * signal handler is here because it is actually sched_start_mainloop() that
+ * arranges for the signal handling and sched_wait_mainloop() that enables
+ * them. And the only one really interested in the signals is the scheduler
+ * thread ... but that's doing select(2) mainly and we have to avoid race
+ * conditions with signals. ----------
  */
 static void
 sched_sighandler(int signo)
@@ -721,7 +699,6 @@
 		slon_log(SLON_FATAL, "sched_sighandler: called in non-main thread\n");
 		slon_abort();
 	}
-
 	/*
 	 * Set the scheduling status to shutdown
 	 */
@@ -743,7 +720,6 @@
 		pthread_mutex_unlock(&sched_master_lock);
 		exit(-1);
 	}
-
 	/*
 	 * Unlock the master mutex
 	 */
@@ -759,12 +735,11 @@
 }
 
 
-/* ----------
- * sched_add_fdset
+/*
+ * ---------- sched_add_fdset
  *
- *	Add a file descriptor to one of the global scheduler sets and
- *	adjust sched_numfd accordingly.
- * ----------
+ * Add a file descriptor to one of the global scheduler sets and adjust
+ * sched_numfd accordingly. ----------
  */
 static void
 sched_add_fdset(int fd, fd_set *fds)
@@ -775,12 +750,11 @@
 }
 
 
-/* ----------
- * sched_add_fdset
+/*
+ * ---------- sched_remove_fdset
  *
- *	Remove a file descriptor from one of the global scheduler sets and
- *	adjust sched_numfd accordingly.
- * ----------
+ * Remove a file descriptor from one of the global scheduler sets and adjust
+ * sched_numfd accordingly. ----------
  */
 static void
 sched_remove_fdset(int fd, fd_set *fds)
@@ -798,5 +772,3 @@
 		}
 	}
 }
-
-
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.61
retrieving revision 1.62
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.61 -r1.62
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -28,9 +28,8 @@
 #include "slon.h"
 
 
-/* ----------
- * Local definitions
- * ----------
+/*
+ * ---------- Local definitions ----------
  */
 
 /*
@@ -45,7 +44,8 @@
  * Message structure resulting from a remote event
  */
 typedef struct SlonWorkMsg_event_s SlonWorkMsg_event;
-struct SlonWorkMsg_event_s {
+struct SlonWorkMsg_event_s
+{
 	int			msg_type;
 	SlonWorkMsg_event *prev;
 	SlonWorkMsg_event *next;
@@ -75,7 +75,8 @@
  * Message structure resulting from a remote confirm
  */
 typedef struct SlonWorkMsg_confirm_s SlonWorkMsg_confirm;
-struct SlonWorkMsg_confirm_s {
+struct SlonWorkMsg_confirm_s
+{
 	int			msg_type;
 	SlonWorkMsg_confirm *prev;
 	SlonWorkMsg_confirm *next;
@@ -90,7 +91,8 @@
 /*
  * Generic message header
  */
-struct SlonWorkMsg_s {
+struct SlonWorkMsg_s
+{
 	int			msg_type;
 	SlonWorkMsg *prev;
 	SlonWorkMsg *next;
@@ -103,7 +105,8 @@
 typedef struct WorkerGroupLine_s WorkerGroupLine;
 
 
-struct ProviderSet_s {
+struct ProviderSet_s
+{
 	int			set_id;
 	int			sub_forward;
 
@@ -112,7 +115,8 @@
 };
 
 
-typedef enum {
+typedef enum
+{
 	SLON_WG_IDLE,
 	SLON_WG_BUSY,
 	SLON_WG_DONE,
@@ -121,14 +125,16 @@
 } WorkGroupStatus;
 
 
-typedef enum {
+typedef enum
+{
 	SLON_WGLC_ACTION,
 	SLON_WGLC_DONE,
 	SLON_WGLC_ERROR
 } WorkGroupLineCode;
 
 
-struct ProviderInfo_s {
+struct ProviderInfo_s
+{
 	int			no_id;
 	char	   *pa_conninfo;
 	int			pa_connretry;
@@ -150,7 +156,8 @@
 };
 
 
-struct WorkerGroupData_s {
+struct WorkerGroupData_s
+{
 	SlonNode		   *node;
 
 	char			   *tab_forward;
@@ -173,7 +180,8 @@
 };
 
 
-struct WorkerGroupLine_s {
+struct WorkerGroupLine_s
+{
 	WorkGroupLineCode	code;
 	ProviderInfo	   *provider;
 	SlonDString			data;
@@ -184,10 +192,11 @@
 
 
 /*
- * Global status for all remote worker threads, remembering the
- * last seen confirmed sequence number.
+ * Global status for all remote worker threads, remembering the last seen
+ * confirmed sequence number.
  */
-struct node_confirm_status {
+struct node_confirm_status
+{
 	int			con_origin;
 	int			con_received;
 	int64		con_seqno;
@@ -202,32 +211,36 @@
 int		sync_group_maxsize;
 
 
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
  */
-static void	adjust_provider_info(SlonNode *node,
+static void 
+adjust_provider_info(SlonNode * node,
 					WorkerGroupData *wd, int cleanup);
-static int	query_execute(SlonNode *node, PGconn *dbconn, 
+static int 
+query_execute(SlonNode * node, PGconn * dbconn,
 					SlonDString *dsp);
-static void	query_append_event(SlonDString *dsp, 
+static void 
+query_append_event(SlonDString * dsp,
 					SlonWorkMsg_event *event);
-static void	store_confirm_forward(SlonNode *node, SlonConn *conn,
+static void 
+store_confirm_forward(SlonNode * node, SlonConn * conn,
 					SlonWorkMsg_confirm *confirm);
 static int64 get_last_forwarded_confirm(int origin, int receiver);
-static int	copy_set(SlonNode *node, SlonConn *local_conn, int set_id,
+static int 
+copy_set(SlonNode * node, SlonConn * local_conn, int set_id,
 					SlonWorkMsg_event *event);
-static int	sync_event(SlonNode *node, SlonConn *local_conn,
+static int 
+sync_event(SlonNode * node, SlonConn * local_conn,
 					WorkerGroupData *wd,SlonWorkMsg_event *event);
 static void	*sync_helper(void *cdata);
 
 
-/* ----------
- * slon_remoteWorkerThread
+/*
+ * ---------- slon_remoteWorkerThread
  *
- *	Listen for events on the local database connection. This means,
- *	events generated by the local node only.
- * ----------
+ * Listen for events on the local database connection. This means, events
+ * generated by the local node only. ----------
  */
 void *
 remoteWorkerThread_main(void *cdata)
@@ -278,9 +291,8 @@
 	local_dbconn = local_conn->dbconn;
 
 	/*
-	 * Put the connection into replication mode and listen on the
-	 * special relation telling what node daemon this connection
-	 * belongs to.
+	 * Put the connection into replication mode and listen on the special
+	 * relation telling what node daemon this connection belongs to.
 	 */
 	slon_mkquery(&query1,
 			"select %s.setSessionRole('_%s', 'slon'); "
@@ -296,8 +308,8 @@
 	while (true)
 	{
 		/*
-		 * If we got the special WMSG_WAKEUP, check the current runmode
-		 * of the scheduler and the status of our node.
+		 * If we got the special WMSG_WAKEUP, check the current
+		 * runmode of the scheduler and the status of our node.
 		 */
 		if (sched_get_status() != SCHED_STATUS_OK)
 			break;
@@ -315,12 +327,10 @@
 				adjust_provider_info(node, wd, false);
 				curr_config = rtcfg_seq_get();
 			}
-
 			rtcfg_unlock();
 
 			check_config = false;
 		}
-
 		/*
 		 * Receive the next message from the queue. If there is no
 		 * one present, wait on the condition variable.
@@ -355,7 +365,6 @@
 			check_config = true;
 			continue;
 		}
-
 		/*
 		 * Process confirm messages.
 		 */
@@ -369,7 +378,6 @@
 			free(msg);
 			continue;
 		}
-
 		/*
 		 * This must be an event message then.
 		 */
@@ -380,7 +388,6 @@
 					node->no_id, msg->msg_type);
 			slon_abort();
 		}
-
 		event_ok = true;
 		event = (SlonWorkMsg_event *)msg;
 		sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
@@ -393,10 +400,10 @@
 		/*
 		 * Construct the queries to begin a transaction, notify on
 		 * the event and confirm relations, insert the event into our
-		 * local sl_event table and confirm it in our local sl_confirm
-		 * table. When this transaction commits, every other remote
-		 * node listening for events with us as a provider will pick
-		 * up the news.
+		 * local sl_event table and confirm it in our local
+		 * sl_confirm table. When this transaction commits, every
+		 * other remote node listening for events with us as a
+		 * provider will pick up the news.
 		 */
 		slon_mkquery(&query1, 
 				"begin transaction; "
@@ -439,19 +446,19 @@
 				}
 				pthread_mutex_unlock(&(node->message_lock));
 			}
-
 			while (true)
 			{
 				/*
-				 * Execute the forwarding and notify stuff, but
-				 * do not commit the transaction yet.
+				 * Execute the forwarding and notify stuff,
+				 * but do not commit the transaction yet.
 				 */
 				if (query_execute(node, local_dbconn, &query1) < 0)
 					slon_abort();
 
 				/*
-				 * Process the sync and apply the replication data.
-				 * If successful, exit this loop and commit the transaction.
+				 * Process the sync and apply the replication
+				 * data. If successful, exit this loop and
+				 * commit the transaction.
 				 */
 				seconds = sync_event(node, local_conn, wd, event);
 				if (seconds == 0)
@@ -459,10 +466,9 @@
 					rc = SCHED_STATUS_OK;
 					break;
 				}
-				
 				/*
-				 * Something went wrong. Rollback and try again
-				 * after the specified timeout.
+				 * Something went wrong. Rollback and try
+				 * again after the specified timeout.
 				 */
 				slon_mkquery(&query2, "rollback transaction");
 				if (query_execute(node, local_dbconn, &query2) < 0)
@@ -491,22 +497,22 @@
 
 			if (query_execute(node, local_dbconn, &query1) < 0)
 				slon_abort();
-		}
-		else
+		} else
 		{
 			/*
-			 * Avoid deadlock problems during configuration changes
-			 * by locking the central confiuration lock right from
-			 * the start.
+			 * Avoid deadlock problems during configuration
+			 * changes by locking the central configuration lock
+			 * right from the start.
 			 */
 			slon_appendquery(&query1,
 					"lock table %s.sl_config_lock; ",
 					rtcfg_namespace);
 
 			/*
-			 * Simple configuration events. Call the corresponding
-			 * runtime config function, add the query to call the
-			 * configuration event specific stored procedure.
+			 * Simple configuration events. Call the
+			 * corresponding runtime config function, add the
+			 * query to call the configuration event specific
+			 * stored procedure.
 			 */
 			if (strcmp(event->ev_type, "STORE_NODE") == 0)
 			{
@@ -520,8 +526,7 @@
 						"select %s.storeNode_int(%d, '%q'); ",
 						rtcfg_namespace,
 						no_id, no_comment);
-			}
-			else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
+			} else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
 			{
 				int		no_id = (int) strtol(event->ev_data1, NULL, 10);
 
@@ -532,8 +537,7 @@
 						"select %s.enableNode_int(%d); ",
 						rtcfg_namespace,
 						no_id);
-			}
-			else if (strcmp(event->ev_type, "DROP_NODE") == 0)
+			} else if (strcmp(event->ev_type, "DROP_NODE") == 0)
 			{
 				int		no_id = (int) strtol(event->ev_data1, NULL, 10);
 
@@ -546,10 +550,11 @@
 						no_id);
 
 				/*
-				 * If this is our own nodeid, then calling disableNode_int()
-				 * will destroy the whole configuration including the
-				 * entire schema. Make sure we call just that and get
-				 * out of here ASAP!
+				 * If this is our own nodeid, then calling
+				 * disableNode_int() will destroy the whole
+				 * configuration including the entire schema.
+				 * Make sure we call just that and get out of
+				 * here ASAP!
 				 */
 				if (no_id == rtcfg_nodeid)
 				{
@@ -572,15 +577,14 @@
 
 					slon_abort();
 				}
-
 				/*
-				 * this is a remote node. Arrange for daemon restart.
+				 * this is a remote node. Arrange for daemon
+				 * restart.
 				 */
 				slon_appendquery(&query1,
 						"notify \"_%s_Restart\"; ",
 						rtcfg_cluster_name);
-			}
-			else if (strcmp(event->ev_type, "STORE_PATH") == 0)
+			} else if (strcmp(event->ev_type, "STORE_PATH") == 0)
 			{
 				int		pa_server = (int) strtol(event->ev_data1, NULL, 10);
 				int		pa_client = (int) strtol(event->ev_data2, NULL, 10);
@@ -594,8 +598,7 @@
 						"select %s.storePath_int(%d, %d, '%q', %d); ",
 						rtcfg_namespace,
 						pa_server, pa_client, pa_conninfo, pa_connretry);
-			}
-			else if (strcmp(event->ev_type, "DROP_PATH") == 0)
+			} else if (strcmp(event->ev_type, "DROP_PATH") == 0)
 			{
 				int		pa_server = (int) strtol(event->ev_data1, NULL, 10);
 				int		pa_client = (int) strtol(event->ev_data2, NULL, 10);
@@ -607,8 +610,7 @@
 						"select %s.dropPath_int(%d, %d); ",
 						rtcfg_namespace,
 						pa_server, pa_client);
-			}
-			else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
+			} else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
 			{
 				int		li_origin = (int) strtol(event->ev_data1, NULL, 10);
 				int		li_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -621,8 +623,7 @@
 						"select %s.storeListen_int(%d, %d, %d); ",
 						rtcfg_namespace,
 						li_origin, li_provider, li_receiver);
-			}
-			else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
+			} else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
 			{
 				int		li_origin = (int) strtol(event->ev_data1, NULL, 10);
 				int		li_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -635,8 +636,7 @@
 						"select %s.dropListen_int(%d, %d, %d); ",
 						rtcfg_namespace,
 						li_origin, li_provider, li_receiver);
-			}
-			else if (strcmp(event->ev_type, "STORE_SET") == 0)
+			} else if (strcmp(event->ev_type, "STORE_SET") == 0)
 			{
 				int		set_id = (int) strtol(event->ev_data1, NULL, 10);
 				int		set_origin = (int) strtol(event->ev_data2, NULL, 10);
@@ -649,8 +649,7 @@
 						"select %s.storeSet_int(%d, %d, '%q'); ",
 						rtcfg_namespace,
 						set_id, set_origin, set_comment);
-			}
-			else if (strcmp(event->ev_type, "DROP_SET") == 0)
+			} else if (strcmp(event->ev_type, "DROP_SET") == 0)
 			{
 				int		set_id = (int) strtol(event->ev_data1, NULL, 10);
 
@@ -659,8 +658,7 @@
 				slon_appendquery(&query1,
 						"select %s.dropSet_int(%d); ",
 						rtcfg_namespace, set_id);
-			}
-			else if (strcmp(event->ev_type, "MERGE_SET") == 0)
+			} else if (strcmp(event->ev_type, "MERGE_SET") == 0)
 			{
 				int		set_id = (int) strtol(event->ev_data1, NULL, 10);
 				int		add_id = (int) strtol(event->ev_data2, NULL, 10);
@@ -671,26 +669,23 @@
 						"select %s.mergeSet_int(%d, %d); ",
 						rtcfg_namespace,
 						set_id, add_id);
-			}
-			else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
+			} else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
 			{
 				/*
 				 * Nothing to do ATM ... we don't support
 				 * adding tables to subscribed sets yet and
-				 * table information is not maintained in
-				 * the runtime configuration.
+				 * table information is not maintained in the
+				 * runtime configuration.
 				 */
-			}
-			else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
+			} else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
 			{
 				/*
 				 * Nothing to do ATM ... we don't support
-				 * adding sequences to subscribed sets yet and
-				 * sequences information is not maintained in
-				 * the runtime configuration.
+				 * adding sequences to subscribed sets yet
+				 * and sequences information is not
+				 * maintained in the runtime configuration.
 				 */
-			}
-			else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
+			} else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
 			{
 				int		trig_tabid = (int) strtol(event->ev_data1, NULL, 10);
 				char   *trig_tgname = event->ev_data2;
@@ -699,8 +694,7 @@
 						"select %s.storeTrigger_int(%d, '%q'); ",
 						rtcfg_namespace,
 						trig_tabid, trig_tgname);
-			}
-			else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
+			} else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
 			{
 				int		trig_tabid = (int) strtol(event->ev_data1, NULL, 10);
 				char   *trig_tgname = event->ev_data2;
@@ -709,8 +703,7 @@
 						"select %s.dropTrigger_int(%d, '%q'); ",
 						rtcfg_namespace,
 						trig_tabid, trig_tgname);
-			}
-			else if (strcmp(event->ev_type, "MOVE_SET") == 0)
+			} else if (strcmp(event->ev_type, "MOVE_SET") == 0)
 			{
 				int		set_id = (int) strtol(event->ev_data1, NULL, 10);
 				int		old_origin = (int) strtol(event->ev_data2, NULL, 10);
@@ -719,10 +712,11 @@
 				PGresult   *res;
 
 				/*
-				 * Move set is a little more tricky. The stored
-				 * proc does a lot of rearranging in the provider
-				 * chain. To catch up with that, we need to execute
-				 * it now and select the resulting provider for us.
+				 * Move set is a little more tricky. The
+				 * stored proc does a lot of rearranging in
+				 * the provider chain. To catch up with that,
+				 * we need to execute it now and select the
+				 * resulting provider for us.
 				 */
 				slon_appendquery(&query1,
 						"select %s.moveSet_int(%d, %d, %d); ",
@@ -757,8 +751,7 @@
 				rtcfg_moveSet(set_id, old_origin, new_origin, sub_provider);
 
 				dstring_reset(&query1);
-			}
-			else if (strcmp(event->ev_type, "FAILOVER_SET") == 0)
+			} else if (strcmp(event->ev_type, "FAILOVER_SET") == 0)
 			{
 				int		failed_node = (int) strtol(event->ev_data1, NULL, 10);
 				int		backup_node = (int) strtol(event->ev_data2, NULL, 10);
@@ -770,8 +763,7 @@
 						"select %s.failoverSet_int(%d, %d, %d); ",
 						rtcfg_namespace,
 						failed_node, backup_node, set_id);
-			}
-			else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
+			} else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
 			{
 				int		sub_set = (int) strtol(event->ev_data1, NULL, 10);
 				int		sub_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -785,8 +777,7 @@
 						"select %s.subscribeSet_int(%d, %d, %d, '%q'); ",
 						rtcfg_namespace,
 						sub_set, sub_provider, sub_receiver, sub_forward);
-			}
-			else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
+			} else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
 			{
 				int		sub_set = (int) strtol(event->ev_data1, NULL, 10);
 				int		sub_provider = (int) strtol(event->ev_data2, NULL, 10);
@@ -809,9 +800,11 @@
 					while (true)
 					{
 						/*
-						 * If we received this event from another
-						 * node than the data provider, wait until the
-						 * data provider has synced up far enough.
+						 * If we received this event
+						 * from another node than the
+						 * data provider, wait until
+						 * the data provider has
+						 * synced up far enough.
 						 */
 						if (event->event_provider != sub_provider)
 						{
@@ -836,18 +829,20 @@
 								continue;
 							}
 						}
-						
 						/*
-						 * Execute the config changes so far, but don't
-						 * commit the transaction yet. We have to copy
-						 * the data now ...
+						 * Execute the config changes
+						 * so far, but don't commit
+						 * the transaction yet. We
+						 * have to copy the data now
+						 * ...
 						 */
 						if (query_execute(node, local_dbconn, &query1) < 0)
 							slon_abort();
 
 						/*
-						 * If the copy succeeds, exit the loop and let
-						 * the transaction commit.
+						 * If the copy succeeds, exit
+						 * the loop and let the
+						 * transaction commit.
 						 */
 						if (copy_set(node, local_conn, sub_set, event) == 0)
 						{
@@ -859,10 +854,11 @@
 							sched_rc = SCHED_STATUS_OK;
 							break;
 						}
-
 						/*
-						 * Data copy for new enabled set has failed.
-						 * Rollback the transaction, sleep and try again.
+						 * Data copy for new enabled
+						 * set has failed. Rollback
+						 * the transaction, sleep and
+						 * try again.
 						 */
 						if (query_execute(node, local_dbconn, &query2) < 0)
 							slon_abort();
@@ -877,15 +873,14 @@
 							event_ok = false;
 							break;
 						}
-
 						if (sleeptime < 60)
 							sleeptime *= 2;
 					}
-				}
-				else
+				} else
 				{
 					/*
-					 * Somebody else got enabled, just remember it
+					 * Somebody else got enabled, just
+					 * remember it
 					 */
 					slon_appendquery(&query1,
 							"select %s.enableSubscription(%d, %d, %d); ",
@@ -893,23 +888,21 @@
 							sub_set, sub_provider, sub_receiver);
 				}
 
-			}
-			else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
+			} else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
 			{
 				int		sub_set = (int) strtol(event->ev_data1, NULL, 10);
 				int		sub_receiver = (int) strtol(event->ev_data2, NULL, 10);
 
 				/*
-				 * All the real work is done when the config utility
-				 * calls unsubscribeSet() itself. Just propagate the
-				 * event here.
+				 * All the real work is done when the config
+				 * utility calls unsubscribeSet() itself.
+				 * Just propagate the event here.
 				 */
 				slon_appendquery(&query1,
 						"select %s.unsubscribeSet_int(%d, %d); ",
 						rtcfg_namespace,
 						sub_set, sub_receiver);
-			}
-			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
+			} else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
 			{
 				int		ddl_setid = (int) strtol(event->ev_data1, NULL, 10);
 				char   *ddl_script = event->ev_data2;
@@ -919,8 +912,7 @@
 						"select %s.ddlScript_int(%d, '%q', %d); ",
 						rtcfg_namespace,
 						ddl_setid, ddl_script, ddl_only_on_node);
-			}
-			else
+			} else
 			{
 printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
 node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
@@ -934,8 +926,7 @@
 			{
 				query_append_event(&query1, event);
 				slon_appendquery(&query1, "commit transaction;");
-			}
-			else
+			} else
 			{
 				slon_mkquery(&query1, "rollback transaction;");
 			}
@@ -950,8 +941,8 @@
 	}
 
 	/*
-	 * Thread exit time has arrived.
-	 * Disconnect from all data providers and free memory
+	 * Thread exit time has arrived. Disconnect from all data providers
+	 * and free memory
 	 */
 	adjust_provider_info(node, wd, true);
 
@@ -1004,8 +995,8 @@
 	for (provider = wd->provider_head; provider; provider = provider->next)
 	{
 		/*
-		 * We create a lock here and keep it until we made
-		 * our final decision about what to do with the helper thread.
+		 * We create a lock here and keep it until we made our final
+		 * decision about what to do with the helper thread.
 		 */
 		pthread_mutex_lock(&(provider->helper_lock));
 
@@ -1023,10 +1014,10 @@
 	/*
 	 * Step 2.
 	 *
-	 *	Add all currently replicated sets (back) to the providers
-	 *	adding new providers as necessary. This step is skippen in
-	 *	cleanup mode, causing all providers to become obsolete
-	 *	and thus the whole provider info extinct.
+	 * Add all currently replicated sets (back) to the providers adding new
+	 * providers as necessary. This step is skipped in cleanup mode,
+	 * causing all providers to become obsolete and thus the whole
+	 * provider info extinct.
 	 */
 	if (!cleanup)
 	{
@@ -1056,7 +1047,8 @@
 				if (provider == NULL)
 				{
 					/*
-					 * No provider entry found. Create a new one.
+					 * No provider entry found. Create a
+					 * new one.
 					 */
 					provider = (ProviderInfo *)
 							malloc(sizeof(ProviderInfo));
@@ -1065,9 +1057,9 @@
 					provider->wd = wd;
 
 					/*
-					 * Also create a helper thread for this
-					 * provider, which will actually run the
-					 * log data selection for us.
+					 * Also create a helper thread for
+					 * this provider, which will actually
+					 * run the log data selection for us.
 					 */
 					pthread_mutex_init(&(provider->helper_lock), NULL);
 					pthread_mutex_lock(&(provider->helper_lock));
@@ -1087,7 +1079,8 @@
 							node->no_id, provider->no_id);
 
 					/*
-					 * Add more workgroup data lines to the pool.
+					 * Add more workgroup data lines to
+					 * the pool.
 					 */
 					for (i = 0; i < SLON_WORKLINES_PER_HELPER; i++)
 					{
@@ -1107,8 +1100,8 @@
 							provider);
 					
 					/*
-					 * Copy the runtime configurations conninfo
-					 * into the provider info.
+					 * Copy the runtime configurations
+					 * conninfo into the provider info.
 					 */
 					rtcfg_node = rtcfg_findNode(provider->no_id);
 					if (rtcfg_node != NULL)
@@ -1119,7 +1112,6 @@
 									strdup(rtcfg_node->pa_conninfo);
 					}
 				}
-
 				/*
 				 * Add the set to the list of sets we get
 				 * from this provider.
@@ -1139,7 +1131,6 @@
 			}
 		}
 	}
-
 	/*
 	 * Step 3.
 	 *
@@ -1159,8 +1150,8 @@
 		if (provider->set_head == NULL)
 		{
 			/*
-			 * Tell this helper thread to exit, join him and destroy
-			 * thread related data.
+			 * Tell this helper thread to exit, join him and
+			 * destroy thread related data.
 			 */
 			provider->helper_status = SLON_WG_EXIT;
 			pthread_cond_signal(&(provider->helper_cond));
@@ -1202,7 +1193,6 @@
 				slon_disconnectdb(provider->conn);
 				provider->conn = NULL;
 			}
-
 			/*
 			 * Free other resources
 			 */
@@ -1218,7 +1208,6 @@
 
 			continue;
 		}
-
 		/*
 		 * If the connection info has changed, we have to reconnect.
 		 */
@@ -1242,22 +1231,20 @@
 			else
 				provider->pa_conninfo = strdup(rtcfg_node->pa_conninfo);
 		}
-
 		/*
-		 * Unlock the helper thread ... he should now go and wait
-		 * for work.
+		 * Unlock the helper thread ... he should now go and wait for
+		 * work.
 		 */
 		pthread_mutex_unlock(&(provider->helper_lock));
 	}
 }
 
 
-/* ----------
- * remoteWorker_event
+/*
+ * ---------- remoteWorker_event
  *
- *	Used by the remoteListeThread to forward events selected from
- *	the event provider database to the remote nodes worker thread.
- *----------
+ * Used by the remoteListenThread to forward events selected from the event
+ * provider database to the remote nodes worker thread. ----------
  */
 void
 remoteWorker_event(int event_provider,
@@ -1297,7 +1284,6 @@
 				"remoteWorker_event: ignore new events due to shutdown\n");
 		return;
 	}
-
 	/*
 	 * Find the node, make sure it is active and that this event is not
 	 * already queued or processed.
@@ -1331,11 +1317,10 @@
 				ev_origin, ev_seqno);
 		return;
 	}
-
 	/*
-	 * We lock the worker threads message queue before bumping the
-	 * nodes last known event sequence to avoid that another listener
-	 * queues a later message before we can insert this one.
+	 * We lock the worker threads message queue before bumping the nodes
+	 * last known event sequence to avoid that another listener queues a
+	 * later message before we can insert this one.
 	 */
 	pthread_mutex_lock(&(node->message_lock));
 	node->last_event = ev_seqno;
@@ -1377,47 +1362,72 @@
 	msg->event_provider	= event_provider;
 	msg->ev_origin		= ev_origin;
 	msg->ev_seqno		= ev_seqno;
-	msg->ev_timestamp_c = cp; strcpy(cp, ev_timestamp); cp += len_timestamp;
-	msg->ev_minxid_c = cp; strcpy(cp, ev_minxid); cp += len_minxid;
-	msg->ev_maxxid_c = cp; strcpy(cp, ev_maxxid); cp += len_maxxid;
-	msg->ev_xip = cp; strcpy(cp, ev_xip); cp += len_xip;
-	msg->ev_type = cp; strcpy(cp, ev_type); cp += len_type;
+	msg->ev_timestamp_c = cp;
+	strcpy(cp, ev_timestamp);
+	cp += len_timestamp;
+	msg->ev_minxid_c = cp;
+	strcpy(cp, ev_minxid);
+	cp += len_minxid;
+	msg->ev_maxxid_c = cp;
+	strcpy(cp, ev_maxxid);
+	cp += len_maxxid;
+	msg->ev_xip = cp;
+	strcpy(cp, ev_xip);
+	cp += len_xip;
+	msg->ev_type = cp;
+	strcpy(cp, ev_type);
+	cp += len_type;
 	if (ev_data1 != NULL)
 	{
-		msg->ev_data1 = cp; strcpy(cp, ev_data1); cp += len_data1;
+		msg->ev_data1 = cp;
+		strcpy(cp, ev_data1);
+		cp += len_data1;
 	}
 	if (ev_data2 != NULL)
 	{
-		msg->ev_data2 = cp; strcpy(cp, ev_data2); cp += len_data2;
+		msg->ev_data2 = cp;
+		strcpy(cp, ev_data2);
+		cp += len_data2;
 	}
 	if (ev_data3 != NULL)
 	{
-		msg->ev_data3 = cp; strcpy(cp, ev_data3); cp += len_data3;
+		msg->ev_data3 = cp;
+		strcpy(cp, ev_data3);
+		cp += len_data3;
 	}
 	if (ev_data4 != NULL)
 	{
-		msg->ev_data4 = cp; strcpy(cp, ev_data4); cp += len_data4;
+		msg->ev_data4 = cp;
+		strcpy(cp, ev_data4);
+		cp += len_data4;
 	}
 	if (ev_data5 != NULL)
 	{
-		msg->ev_data5 = cp; strcpy(cp, ev_data5); cp += len_data5;
+		msg->ev_data5 = cp;
+		strcpy(cp, ev_data5);
+		cp += len_data5;
 	}
 	if (ev_data6 != NULL)
 	{
-		msg->ev_data6 = cp; strcpy(cp, ev_data6); cp += len_data6;
+		msg->ev_data6 = cp;
+		strcpy(cp, ev_data6);
+		cp += len_data6;
 	}
 	if (ev_data7 != NULL)
 	{
-		msg->ev_data7 = cp; strcpy(cp, ev_data7); cp += len_data7;
+		msg->ev_data7 = cp;
+		strcpy(cp, ev_data7);
+		cp += len_data7;
 	}
 	if (ev_data8 != NULL)
 	{
-		msg->ev_data8 = cp; strcpy(cp, ev_data8); cp += len_data8;
+		msg->ev_data8 = cp;
+		strcpy(cp, ev_data8);
+		cp += len_data8;
 	}
-
 	/*
-	 * Add the message to the queue and trigger the condition
-	 * variable in case the worker is idle.
+	 * Add the message to the queue and trigger the condition variable in
+	 * case the worker is idle.
 	 */
 	DLLIST_ADD_TAIL(node->message_head, node->message_tail,
 			(SlonWorkMsg *)msg);
@@ -1426,12 +1436,11 @@
 }
 
 
-/* ----------
- * remoteWorker_wakeup
+/*
+ * ---------- remoteWorker_wakeup
  *
- *	Send a special WAKEUP message to a worker, causing it to recheck
- *	the runmode and the runtime configuration.
- * ----------
+ * Send a special WAKEUP message to a worker, causing it to recheck the runmode
+ * and the runtime configuration. ----------
  */
 void
 remoteWorker_wakeup(int no_id)
@@ -1440,8 +1449,8 @@
 	SlonWorkMsg *msg;
 
 	/*
-	 * Can't wakeup myself, can I? No, we never have a 
-	 * "remote" worker for our own node ID. 
+	 * Can't wakeup myself, can I? No, we never have a "remote" worker
+	 * for our own node ID.
 	 */
 	if (no_id == rtcfg_nodeid)
 		return;
@@ -1476,11 +1485,10 @@
 }
 
 
-/* ----------
- * remoteWorker_confirm
+/*
+ * ---------- remoteWorker_confirm
  *
- *	Add a confirm message to the remote worker message queue
- * ----------
+ * Add a confirm message to the remote worker message queue ----------
  */
 void
 remoteWorker_confirm(int no_id,
@@ -1534,13 +1542,15 @@
 	for (oldmsg = (SlonWorkMsg_confirm *)(node->message_head);
 			oldmsg; oldmsg = (SlonWorkMsg_confirm *)(oldmsg->next))
 	{
-		if (oldmsg->msg_type == WMSG_CONFIRM) {
+		if (oldmsg->msg_type == WMSG_CONFIRM)
+		{
 			if (oldmsg->con_origin == con_origin &&
 				oldmsg->con_received == con_received)
 			{
 				/*
-				 * Existing message found. Change it if new seqno is
-				 * greater than old. Otherwise just ignore this confirm.
+				 * Existing message found. Change it if new
+				 * seqno is greater than old. Otherwise just
+				 * ignore this confirm.
 				 */
 				if (oldmsg->con_seqno < con_seqno)
 				{
@@ -1569,19 +1579,18 @@
 			(SlonWorkMsg *)msg);
 
 	/*
-	 * Send a condition signal to the worker thread in case it is
-	 * waiting for new messages.
+	 * Send a condition signal to the worker thread in case it is waiting
+	 * for new messages.
 	 */
 	pthread_cond_signal(&(node->message_cond));
 	pthread_mutex_unlock(&(node->message_lock));
 }
 
 
-/* ----------
- * query_execute
+/*
+ * ---------- query_execute
  *
- *	Execute a query string that does not return a result set.
- * ----------
+ * Execute a query string that does not return a result set. ----------
  */
 static int
 query_execute(SlonNode *node, PGconn *dbconn, SlonDString *dsp)
@@ -1608,12 +1617,11 @@
 }
 
 
-/* ----------
- * query_append_event
+/*
+ * ---------- query_append_event
  *
- *	Add queries to a dstring that notify for Event and Confirm and
- *	that insert a duplicate of an event record as well as the
- *	confirmation for it.
+ * Add queries to a dstring that notify for Event and Confirm and that insert a
+ * duplicate of an event record as well as the confirmation for it.
  * ----------
  */
 static void
@@ -1630,14 +1638,22 @@
 			"     ev_minxid, ev_maxxid, ev_xip, ev_type ",
 			rtcfg_cluster_name, rtcfg_cluster_name,
 			rtcfg_namespace);
-	if (event->ev_data1 != NULL)	dstring_append(dsp, ", ev_data1");
-	if (event->ev_data2 != NULL)	dstring_append(dsp, ", ev_data2");
-	if (event->ev_data3 != NULL)	dstring_append(dsp, ", ev_data3");
-	if (event->ev_data4 != NULL)	dstring_append(dsp, ", ev_data4");
-	if (event->ev_data5 != NULL)	dstring_append(dsp, ", ev_data5");
-	if (event->ev_data6 != NULL)	dstring_append(dsp, ", ev_data6");
-	if (event->ev_data7 != NULL)	dstring_append(dsp, ", ev_data7");
-	if (event->ev_data8 != NULL)	dstring_append(dsp, ", ev_data8");
+	if (event->ev_data1 != NULL)
+		dstring_append(dsp, ", ev_data1");
+	if (event->ev_data2 != NULL)
+		dstring_append(dsp, ", ev_data2");
+	if (event->ev_data3 != NULL)
+		dstring_append(dsp, ", ev_data3");
+	if (event->ev_data4 != NULL)
+		dstring_append(dsp, ", ev_data4");
+	if (event->ev_data5 != NULL)
+		dstring_append(dsp, ", ev_data5");
+	if (event->ev_data6 != NULL)
+		dstring_append(dsp, ", ev_data6");
+	if (event->ev_data7 != NULL)
+		dstring_append(dsp, ", ev_data7");
+	if (event->ev_data8 != NULL)
+		dstring_append(dsp, ", ev_data8");
 	slon_appendquery(dsp,
 			"    ) values ('%d', '%s', '%s', '%s', '%s', '%q', '%s'",
 			event->ev_origin, seqbuf, event->ev_timestamp_c,
@@ -1669,11 +1685,10 @@
 }
 
 
-/* ----------
- * store_confirm_forward
+/*
+ * ---------- store_confirm_forward
  *
- *	Call the forwardConfirm() stored procedure.
- * ----------
+ * Call the forwardConfirm() stored procedure. ----------
  */
 static void
 store_confirm_forward(SlonNode *node, SlonConn *conn,
@@ -1686,8 +1701,8 @@
 	int				cstat_found = false;
 
 	/*
-	 * Check the global confirm status if we already know about
-	 * this confirmation.
+	 * Check the global confirm status if we already know about this
+	 * confirmation.
 	 */
 	pthread_mutex_lock(&node_confirm_lock);
 	for (cstat = node_confirm_head; cstat; cstat = cstat->next)
@@ -1701,14 +1716,15 @@
 			if (cstat->con_seqno >= confirm->con_seqno)
 			{
 				/*
-				 * Confirm status is newer or equal, ignore message.
+				 * Confirm status is newer or equal, ignore
+				 * message.
 				 */
 				pthread_mutex_unlock(&node_confirm_lock);
 				return;
 			}
 			/*
-			 * Set the confirm status to the new seqno and continue
-			 * below.
+			 * Set the confirm status to the new seqno and
+			 * continue below.
 			 */
 			cstat_found = true;
 			cstat->con_seqno = confirm->con_seqno;
@@ -1728,12 +1744,11 @@
 		cstat->con_seqno = confirm->con_seqno;
 		DLLIST_ADD_TAIL(node_confirm_head, node_confirm_tail, cstat);
 	}
-
 	pthread_mutex_unlock(&node_confirm_lock);
 
 	/*
-	 * Call the stored procedure to forward this status through
-	 * the table sl_confirm.
+	 * Call the stored procedure to forward this status through the table
+	 * sl_confirm.
 	 */
 	dstring_init(&query);
 	sprintf(seqbuf, INT64_FORMAT, confirm->con_seqno);
@@ -1765,12 +1780,11 @@
 }
 
 
-/* ----------
- * get_last_forwarded_confirm
+/*
+ * ---------- get_last_forwarded_confirm
  *
- *	Look what confirmed event seqno we forwarded last for
- *	a given origin+receiver pair.
- * ----------
+ * Look what confirmed event seqno we forwarded last for a given origin+receiver
+ * pair. ----------
  */
 static int64
 get_last_forwarded_confirm(int origin, int receiver)
@@ -1778,8 +1792,8 @@
 	struct node_confirm_status *cstat;
 
 	/*
-	 * Check the global confirm status if we already know about
-	 * this confirmation.
+	 * Check the global confirm status if we already know about this
+	 * confirmation.
 	 */
 	pthread_mutex_lock(&node_confirm_lock);
 	for (cstat = node_confirm_head; cstat; cstat = cstat->next)
@@ -1863,7 +1877,6 @@
 				node->no_id, set_id);
 		return -1;
 	}
-
 	if ((sub_node = rtcfg_findNode(sub_provider)) == NULL)
 	{
 		rtcfg_unlock();
@@ -1919,18 +1932,15 @@
 		dstring_free(&query1);
 		return -1;
 	}
-
 	/*
-	 * Begin a serialized transaction and check if our xmin
-	 * in the snapshot is > than ev_maxxid. This ensures that
-	 * all transactions that have been in progress when the
-	 * subscription got enabled (which is after the triggers
-	 * on the tables have been defined), have finished.
-	 * Otherwise a long running open transaction would not
-	 * have the trigger definitions yet, and an insert would
-	 * not get logged. But if it still runs when we start to
-	 * copy the set, then we don't see the row either and it
-	 * would get lost.
+	 * Begin a serialized transaction and check if our xmin in the
+	 * snapshot is greater than ev_maxxid. This ensures that all transactions
+	 * that have been in progress when the subscription got enabled
+	 * (which is after the triggers on the tables have been defined),
+	 * have finished. Otherwise a long running open transaction would not
+	 * have the trigger definitions yet, and an insert would not get
+	 * logged. But if it still runs when we start to copy the set, then
+	 * we don't see the row either and it would get lost.
 	 */
 	if (sub_provider == set_origin)
 	{
@@ -1961,8 +1971,7 @@
 			return -1;
 		}
 		PQclear(res1);
-	}
-	else
+	} else
 	{
 		slon_mkquery(&query1,
 				"start transaction; "
@@ -1976,7 +1985,8 @@
 	}
 
 	/*
-	 * Select the list of all tables the provider currently has in the set.
+	 * Select the list of all tables the provider currently has in the
+	 * set.
 	 */
 	slon_mkquery(&query1,
 			"select T.tab_id, "
@@ -2021,8 +2031,8 @@
 				node->no_id, tab_fqname);
 
 		/*
-		 * Find out if the table we're copying has the special
-		 * slony serial number key on the provider DB
+		 * Find out if the table we're copying has the special slony
+		 * serial number key on the provider DB
 		 */
 		slon_mkquery(&query1,
 				"select %s.tableHasSerialKey('%q');",
@@ -2086,15 +2096,13 @@
 				slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
 						"table %s Slony-I serial key added local\n",
 						node->no_id, tab_fqname);
-			}
-			else
+			} else
 			{
 				slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
 						"local table %s already has Slony-I serial key\n",
 						node->no_id, tab_fqname);
 			}
-		}
-		else
+		} else
 		{
 			slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
 					"table %s does not require Slony-I serial key\n",
@@ -2103,9 +2111,9 @@
 
 
 		/*
-		 * Call the setAddTable_int() stored procedure. Up to now, while
-		 * we have not been subscribed to the set, this should have been
-		 * suppressed.
+		 * Call the setAddTable_int() stored procedure. Up to now,
+		 * while we have not been subscribed to the set, this should
+		 * have been suppressed.
 		 */
 		slon_mkquery(&query1,
 				"select %s.setAddTable_int(%d, %d, '%q', '%q', '%q'); ",
@@ -2118,7 +2126,6 @@
 			dstring_free(&query1);
 			return -1;
 		}
-
 		/*
 		 * Copy the content of sl_trigger for this table
 		 */
@@ -2157,8 +2164,8 @@
 
 		/*
 		 * Begin a COPY from stdin for the table on the local DB
-		 * TODO: use the transaction safe truncate table on 7.4 or better
-		 *       instead of delete.
+		 * TODO: use the transaction safe truncate table on 7.4 or
+		 * better instead of delete.
 		 */
 		slon_mkquery(&query1,
 				"delete from only %s; "
@@ -2176,7 +2183,6 @@
 			dstring_free(&query1);
 			return -1;
 		}
-
 		/*
 		 * Begin a COPY to stdout for the table on the provider DB
 		 */
@@ -2202,7 +2208,6 @@
 			dstring_free(&query1);
 			return -1;
 		}
-
 		/*
 		 * Copy the data over
 		 */
@@ -2248,7 +2253,6 @@
 			dstring_free(&query1);
 			return -1;
 		}
-
 		/*
 		 * Check that the COPY to stdout on the provider node
 		 * finished successful.
@@ -2308,8 +2312,7 @@
 				copybuf[2] == '\0')
 			{
 				copydone = true;
-			}
-			else
+			} else
 			{
 				switch(rc)
 				{
@@ -2384,7 +2387,6 @@
 			dstring_free(&query1);
 			return -1;
 		}
-
 		gettimeofday(&tv_now, NULL);
 		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 				"%.3f seconds to copy table %s\n",
@@ -2487,16 +2489,15 @@
 				node->no_id, seql_seqid, seq_fqname, seql_last_value);
 
 		/*
-		 * sequence with ID 0 is a nodes rowid ... only remember
-		 * in seqlog.
+		 * sequence with ID 0 is a nodes rowid ... only remember in
+		 * seqlog.
 		 */
 		if (strtol(seql_seqid, NULL, 10) != 0)
 		{
 		slon_mkquery(&query1,
 				"select \"pg_catalog\".setval('%q', '%s'); ",
 				seq_fqname, seql_last_value);
-		}
-		else
+		} else
 			dstring_reset(&query1);
 		slon_appendquery(&query1,
 				"insert into %s.sl_seqlog "
@@ -2526,18 +2527,18 @@
 	gettimeofday(&tv_start2, NULL);
 
 	/*
-	 * It depends on who is our data provider how we construct
-	 * the initial setsync status.
+	 * It depends on who is our data provider how we construct the
+	 * initial setsync status.
 	 */
 	if (set_origin == node->no_id)
 	{
 		/*
-		 * Our provider is the origin, so we have to construct
-		 * the setsync from scratch.
+		 * Our provider is the origin, so we have to construct the
+		 * setsync from scratch.
 		 * 
-		 * The problem at hand is that the data is something between
-		 * two SYNC points. So to get to the next sync point, we'll
-		 * have to take this and all 
+		 * The problem at hand is that the data is something between two
+		 * SYNC points. So to get to the next sync point, we'll have
+		 * to take this and all
 		 */
 		slon_mkquery(&query1,
 				"select max(ev_seqno) as ssy_seqno "
@@ -2568,8 +2569,8 @@
 		if (PQgetisnull(res1, 0, 0))
 		{
 			/*
-			 * No SYNC event found, so we initialize the setsync to
-			 * the event point of the ENABLE_SUBSCRIPTION 
+			 * No SYNC event found, so we initialize the setsync
+			 * to the event point of the ENABLE_SUBSCRIPTION
 			 */
 			ssy_seqno	= seqbuf;
 			ssy_minxid	= event->ev_minxid_c;
@@ -2587,12 +2588,12 @@
 					"from %s.sl_log_2 where log_origin = %d; ",
 					rtcfg_namespace, node->no_id,
 					rtcfg_namespace, node->no_id);
-		}
-		else
+		} else
 		{
 			/*
-			 * Use the last SYNC's snapshot information and
-			 * set the action sequence list to all actions after that.
+			 * Use the last SYNC's snapshot information and set
+			 * the action sequence list to all actions after
+			 * that.
 			 */
 			slon_mkquery(&query1,
 					"select ev_seqno, ev_minxid, ev_maxxid, ev_xip "
@@ -2621,7 +2622,6 @@
 				dstring_free(&query1);
 				return -1;
 			}
-
 			ssy_seqno	= PQgetvalue(res1, 0, 0);
 			ssy_minxid	= PQgetvalue(res1, 0, 1);
 			ssy_maxxid	= PQgetvalue(res1, 0, 2);
@@ -2683,8 +2683,7 @@
 		}
 		dstring_terminate(&ssy_action_list);
 		PQclear(res2);
-	}
-	else
+	} else
 	{
 		/*
 		 * Our provider is another subscriber, so we can copy the
@@ -2716,7 +2715,6 @@
 			dstring_free(&query1);
 			return -1;
 		}
-
 		dstring_init(&ssy_action_list);
 		ssy_seqno	= PQgetvalue(res1, 0, 0);
 		ssy_minxid	= PQgetvalue(res1, 0, 1);
@@ -2745,7 +2743,6 @@
 		dstring_free(&query1);
 		return -1;
 	}
-
 	gettimeofday(&tv_now, NULL);
 	slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 			"%.3f seconds to build initial setsync status\n",
@@ -2753,8 +2750,8 @@
 			TIMEVAL_DIFF(&tv_start2, &tv_now));
 
 	/*
-	 * Roll back the transaction we used on the provider and close
-	 * the database connection.
+	 * Roll back the transaction we used on the provider and close the
+	 * database connection.
 	 */
 	slon_mkquery(&query1, "rollback transaction");
 	if (query_execute(node, pro_dbconn, &query1) < 0)
@@ -2824,7 +2821,6 @@
 				dstring_free(&query);
 				return 10;
 			}
-
 			sprintf(conn_symname, "subscriber_%d_provider_%d",
 					node->no_id, provider->no_id);
 			provider->conn = slon_connectdb(provider->pa_conninfo,
@@ -2838,7 +2834,6 @@
 				dstring_free(&query);
 				return provider->pa_connretry;
 			}
-
 			/*
 			 * Listen on the special relation telling our node
 			 * relationship
@@ -2853,7 +2848,6 @@
 				provider->conn = NULL;
 				return provider->pa_connretry;
 			}
-
 			slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
 					"connected to data provider %d on '%s'\n",
 					node->no_id, provider->no_id,
@@ -2862,8 +2856,8 @@
 	}
 
 	/*
-	 * Check that all these providers have processed at least up
-	 * to the SYNC event we're handling here.
+	 * Check that all these providers have processed at least up to the
+	 * SYNC event we're handling here.
 	 */
 	for (provider = wd->provider_head; provider; provider = provider->next)
 	{
@@ -2899,7 +2893,6 @@
 				dstring_free(&query);
 				return 10;
 			}
-
 			slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 						"data provider %d confirmed up to "
 						"ev_seqno %s for ev_origin %d - OK\n",
@@ -3009,8 +3002,8 @@
 				node->no_id);
 
 		/*
-		 * Select all sets we receive from this provider and which are
-		 * not synced better than this SYNC already.
+		 * Select all sets we receive from this provider and which
+		 * are not synced better than this SYNC already.
 		 */
 		slon_mkquery(&query,
 				"select SSY.ssy_setid, SSY.ssy_seqno, "
@@ -3037,8 +3030,6 @@
 			dstring_free(&query);
 			return 60;
 		}
-
-
 		/* 
 		 * For every set we receive from this provider
 		 */
@@ -3095,27 +3086,23 @@
 				PQclear(res2);
 				continue;
 			}
-
 			/*
 			 * ... and build up a query qualification that is
 			 * 
-			 *  and (
-             *        (log_tableid in (<tables_in_set>) 
-			 *          and (<snapshot_qual_of_new_sync>)
-			 *          and (<snapshot_qual_of_setsync>)
-			 *        )
-			 *        OR 
-             *        ( <next_set_from_this_provider> )
-			 *  )
+			 * and ( (log_tableid in (<tables_in_set>) and
+			 * (<snapshot_qual_of_new_sync>) and
+			 * (<snapshot_qual_of_setsync>) ) OR (
+			 * <next_set_from_this_provider> ) )
 			 * 
-			 * If we were using AND's there then no rows will ever end
-             * up being selected when you have multiple sets. 
+			 * If we were using AND's there then no rows will ever
+			 * end up being selected when you have multiple sets.
 			 */
 
 			if(added_or_to_provider)
 			  {
 				slon_appendquery(provider_qual, "or (\n    log_tableid in (");
-			  } else {
+			} else
+			{
 				slon_appendquery(provider_qual, " (\n log_tableid in (");
 				added_or_to_provider = 1;
 			  }
@@ -3128,8 +3115,9 @@
 				SlonSet	*rtcfg_set;
 
 				/*
-				 * Remember the fully qualified table name on the fly.
-				 * This might have to become a hashtable someday.
+				 * Remember the fully qualified table name on
+				 * the fly. This might have to become a
+				 * hashtable someday.
 				 */
 				while (tab_id >= wd->tab_fqname_size)
 				{
@@ -3146,8 +3134,8 @@
 				wd->tab_fqname[tab_id] = strdup(PQgetvalue(res2, tupno2, 2));
 
 				/*
-				 * Also remember if the tables log data needs to be
-				 * forwarded.
+				 * Also remember if the tables log data needs
+				 * to be forwarded.
 				 */
 				for (rtcfg_set = rtcfg_set_list_head; rtcfg_set;
 						rtcfg_set = rtcfg_set->next)
@@ -3194,14 +3182,15 @@
 		PQclear(res1);
 
 		/* 
-		 * We didn't add anything good in the provider clause.
-		 * That shouldn't be!
+		 * We didn't add anything good in the provider clause. That
+		 * shouldn't be!
 		 */
 		if(added_or_to_provider)
 		  {
 			/* close out our OR block */
 			slon_appendquery(provider_qual, ")"); 
-		  } else {
+		} else
+		{
 			slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: Didn't add or to provider\n", node->no_id);
 		  }
 	}
@@ -3209,8 +3198,8 @@
 	dstring_free(&new_qual);
 
 	/*
-	 * If we have found no sets needing sync at all, why
-	 * bother the helpers?
+	 * If we have found no sets needing sync at all, why bother the
+	 * helpers?
 	 */
 	if (num_sets == 0)
 	{
@@ -3220,7 +3209,6 @@
 		dstring_free(&query);
 		return 0;
 	}
-
 	/*
 	 * Time to get the helpers busy.
 	 */
@@ -3278,7 +3266,6 @@
 						PQclear(res1);
 						break;
 					}
-
 					if (PQresultStatus(res1) != PGRES_COMMAND_OK)
 					{
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
@@ -3301,8 +3288,7 @@
 									dstring_data(&(wgline->data)),
 									dstring_data(&(wgline->provider->helper_qualification)));
 							num_errors++;
-						}
-						else
+					} else
 							slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: %s\n",
 									node->no_id, dstring_data(&(wgline->data)));
 					}
@@ -3348,8 +3334,7 @@
 	}
 
 	/*
-	 * Inform the helpers that the whole group is done with this
-	 * SYNC.
+	 * Inform the helpers that the whole group is done with this SYNC.
 	 */
 	slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
 			"all helpers done.\n",
@@ -3391,10 +3376,9 @@
 				node->no_id);
 		return 10;
 	}
-
 	/*
-	 * Light's are still green ... update the setsync status of
-	 * all the sets we've just replicated ...
+	 * Lights are still green ... update the setsync status of all the
+	 * sets we've just replicated ...
 	 */
 	slon_mkquery(&query,
 			"update %s.sl_setsync set "
@@ -3435,7 +3419,6 @@
 		}
 		PQclear(res1);
 	}
-
 	for (provider = wd->provider_head; provider; provider = provider->next)
 	{
 		/*
@@ -3454,8 +3437,8 @@
 	}
 
 	/*
-	 * Get the nodes rowid sequence at that sync time just in case
-	 * we are later on asked to restore the node after a failover.
+	 * Get the nodes rowid sequence at that sync time just in case we are
+	 * later on asked to restore the node after a failover.
 	 */
 	slon_mkquery(&query,
 			"select seql_last_value from %s.sl_seqlog "
@@ -3553,7 +3536,6 @@
 			pthread_mutex_unlock(&(provider->helper_lock));
 			continue;
 		}
-
 		/*
 		 * OK, we got work to do.
 		 */
@@ -3561,7 +3543,8 @@
 		pthread_mutex_unlock(&(provider->helper_lock));
 
 		errors = 0;
-		do {
+		do
+		{
 			/*
 			 * Start a transaction
 			 */
@@ -3573,7 +3556,6 @@
 				errors++;
 				break;
 			}
-
 			/*
 			 * Open a cursor that reads the log data.
 			 *
@@ -3596,18 +3578,17 @@
 				errors++;
 				break;
 			}
-
 			/*
 			 * Now fetch the log data and forward it via the line
-			 * pool to the main worker who pushes it into the local
-			 * database.
+			 * pool to the main worker who pushes it into the
+			 * local database.
 			 */
 			alloc_lines = 0;
 			while (errors == 0)
 			{
 				/*
-				 * Allocate at least some lines - ideally the whole
-				 * fetch size.
+				 * Allocate at least some lines - ideally the
+				 * whole fetch size.
 				 */
 				while (alloc_lines == 0 && !errors)
 				{
@@ -3615,8 +3596,8 @@
 							"remoteHelperThread_%d_%d: allocate lines\n",
 							node->no_id, provider->no_id);
 					/*
-					 * Wait until there are lines available in
-					 * the pool.
+					 * Wait until there are lines
+					 * available in the pool.
 					 */
 					pthread_mutex_lock(&(wd->workdata_lock));
 					while (wd->linepool_head == NULL &&
@@ -3626,8 +3607,9 @@
 					}
 
 					/*
-					 * If any error occured somewhere in the group, the
-					 * main worker will set the status to ABORT.
+					 * If any error occurred somewhere in
+					 * the group, the main worker will
+					 * set the status to ABORT.
 					 */
 					if (wd->workgroup_status != SLON_WG_BUSY)
 					{
@@ -3638,10 +3620,9 @@
 						errors++;
 						break;
 					}
-
 					/*
-					 * So far so good. Fill our array of lines
-					 * from the pool.
+					 * So far so good. Fill our array of
+					 * lines from the pool.
 					 */
 					while (alloc_lines < SLON_DATA_FETCH_SIZE &&
 							wd->linepool_head != NULL)
@@ -3662,8 +3643,9 @@
 						node->no_id, provider->no_id, alloc_lines);
 
 				/*
-				 * Now that we have allocated some buffer space,
-				 * try to fetch that many rows from the cursor.
+				 * Now that we have allocated some buffer
+				 * space, try to fetch that many rows from
+				 * the cursor.
 				 */
 				slon_mkquery(&query, "fetch %d from LOG; ", 
 						alloc_lines * SLON_COMMANDS_PER_LINE);
@@ -3678,7 +3660,6 @@
 					errors++;
 					break;
 				}
-
 				if (first_fetch)
 				{
 					gettimeofday(&tv_now, NULL);
@@ -3689,10 +3670,9 @@
 							
 					first_fetch = false;
 				}
-
 				/*
-				 * Fill the line buffers with queries from the
-				 * retrieved log rows.
+				 * Fill the line buffers with queries from
+				 * the retrieved log rows.
 				 */
 				line_no = 0;
 				ntuples = PQntuples(res);
@@ -3716,13 +3696,14 @@
 						line->provider = provider;
 						dstring_reset(&(line->data));
 					}
-
 					/*
-					 * This can happen if the table belongs to a
-					 * set that already has a better sync status
-					 * than the event we're currently processing
-					 * as a result from another SYNC occuring before
-					 * we had started processing the copy_set.
+					 * This can happen if the table
+					 * belongs to a set that already has
+					 * a better sync status than the
+					 * event we're currently processing
+					 * as a result from another SYNC
+					 * occurring before we had started
+					 * processing the copy_set.
 					 */
 					if (log_tableid >= wd->tab_fqname_size ||
 							wd->tab_fqname[log_tableid] == NULL)
@@ -3740,7 +3721,6 @@
 								log_origin, log_xid, log_tableid,
 								log_actionseq, log_cmdtype, log_cmddata);
 					}
-
 					switch (*log_cmdtype)
 					{
 						case 'I':
@@ -3768,8 +3748,9 @@
 				PQclear(res);
 
 				/*
-				 * Now put all the line buffers back. Filled ones
-				 * into the repldata, unused ones into the pool.
+				 * Now put all the line buffers back. Filled
+				 * ones into the repldata, unused ones into
+				 * the pool.
 				 */
 				pthread_mutex_lock(&(wd->workdata_lock));
 				for (tupno = 0; tupno < alloc_lines; tupno++)
@@ -3799,7 +3780,6 @@
 					alloc_lines = 0;
 					break;
 				}
-
 				alloc_lines = 0;
 			}
 		} while (0);
@@ -3822,7 +3802,6 @@
 			pthread_cond_broadcast(&(wd->linepool_cond));
 			pthread_mutex_unlock(&(wd->workdata_lock));
 		}
-
 		/*
 		 * Close the cursor and rollback the transaction.
 		 */
@@ -3887,5 +3866,3 @@
 		pthread_mutex_unlock(&(provider->helper_lock));
 	}
 }
-
-
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.15
retrieving revision 1.16
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.15 -r1.16
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -29,14 +29,14 @@
 #include "slon.h"
 
 
-/* ----------
- * struct listat
+/*
+ * ---------- struct listat
  *
- *	local data structure for nodes we are currently listening
- *	for events from.
+ * local data structure for nodes we are currently listening for events from.
  * ----------
  */
-struct listat {
+struct listat
+{
 	int				li_origin;
 
 	struct listat  *prev;
@@ -44,28 +44,30 @@
 };
 
 
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
  */
-static void		remoteListen_adjust_listat(SlonNode *node,
+static void 
+remoteListen_adjust_listat(SlonNode * node,
 							struct listat **listat_head, 
 							struct listat **listat_tail);
-static void		remoteListen_cleanup(struct listat **listat_head,
+static void 
+remoteListen_cleanup(struct listat ** listat_head,
 							struct listat **listat_tail);
-static int		remoteListen_forward_confirm(SlonNode *node,
+static int 
+remoteListen_forward_confirm(SlonNode * node,
 							SlonConn *conn);
-static int		remoteListen_receive_events(SlonNode *node,
+static int 
+remoteListen_receive_events(SlonNode * node,
 							SlonConn *conn, struct listat *listat);
 
 
 
-/* ----------
- * slon_remoteListenThread
+/*
+ * ---------- remoteListenThread_main
  *
- *	Listen for events on a remote database connection. This means,
- *	events generated by every other node we listen for on this one.
- * ----------
+ * Listen for events on a remote database connection. This means, events
+ * generated by every other node we listen for on this one. ----------
  */
 void *
 remoteListenThread_main(void *cdata)
@@ -109,14 +111,15 @@
 		if (last_config_seq != (new_config_seq = rtcfg_seq_get()))
 		{
 			/*
-			 * Lock the configuration and check if we are (still) supposed
-			 * to exist.
+			 * Lock the configuration and check if we are (still)
+			 * supposed to exist.
 			 */
 			rtcfg_lock();
 
 			/*
-			 * If we have a database connection to the remote node, check
-			 * if there was a change in the connection information.
+			 * If we have a database connection to the remote
+			 * node, check if there was a change in the
+			 * connection information.
 			 */
 			if (conn != NULL)
 			{
@@ -134,7 +137,6 @@
 					conn_conninfo = NULL;
 				}
 			}
-
 			/*
 			 * Check our node's listen_status
 			 */
@@ -149,9 +151,10 @@
 				node->listen_status = SLON_TSTAT_RUNNING;
 
 			/*
-			 * Adjust the listat list and see if there is anything to
-			 * listen for. If not, sleep for a while and check again,
-			 * some node reconfiguration must be going on here.
+			 * Adjust the listat list and see if there is
+			 * anything to listen for. If not, sleep for a while
+			 * and check again, some node reconfiguration must be
+			 * going on here.
 			 */
 			remoteListen_adjust_listat(node, &listat_head, &listat_tail);
 
@@ -168,10 +171,8 @@
 					break;
 				continue;
 			}
-
 			rtcfg_unlock();
 		}
-
 		/*
 		 * Check if we have a database connection
 		 */
@@ -197,10 +198,9 @@
 
 				continue;
 			}
-
 			/*
-			 * Try to establish a database connection to the remote
-			 * node's database.
+			 * Try to establish a database connection to the
+			 * remote node's database.
 			 */
 			conn_conninfo = strdup(node->pa_conninfo);
 			pa_connretry = node->pa_connretry;
@@ -226,7 +226,8 @@
 			dbconn = conn->dbconn;
 
 			/*
-			 * Listen on the connection for events and confirmations
+			 * Listen on the connection for events and
+			 * confirmations
 			 */
 			slon_mkquery(&query1,
 					"listen \"_%s_Event\"; "
@@ -253,7 +254,6 @@
 
 				continue;
 			}
-
 			rc = db_getLocalNodeId(dbconn);
 			if (rc != node->no_id)
 			{
@@ -273,13 +273,11 @@
 
 				continue;
 			}
-
 			slon_log(SLON_DEBUG1,
 					"remoteListenThread_%d: connected to '%s'\n",
 					node->no_id, conn_conninfo);
 
 		}
-
 		/*
 		 * Receive events from the provider node
 		 */
@@ -297,11 +295,10 @@
 
 			continue;
 		}
-
 		/*
-		 * If the remote node notified for new confirmations,
-		 * read them and queue them into the remote worker for
-		 * storage in our local database.
+		 * If the remote node notified for new confirmations, read
+		 * them and queue them into the remote worker for storage in
+		 * our local database.
 		 */
 		if (forward_confirm)
 		{
@@ -321,7 +318,6 @@
 			}
 			forward_confirm = false;
 		}
-
 		/*
 		 * Wait for notification.
 		 */
@@ -358,7 +354,6 @@
 		conn = NULL;
 		conn_conninfo = NULL;
 	}
-
 	remoteListen_cleanup(&listat_head, &listat_tail);
 
 	rtcfg_lock();
@@ -372,11 +367,10 @@
 }
 
 
-/* ----------
- * remoteListen_adjust_listat
+/*
+ * ---------- remoteListen_adjust_listat
  *
- *	local function to (re)adjust the known nodes to the global
- *	configuration.
+ * local function to (re)adjust the known nodes to the global configuration.
  * ----------
  */
 static void
@@ -425,7 +419,6 @@
 			DLLIST_REMOVE(*listat_head, *listat_tail, listat);
 			free(listat);
 		}
-
 		listat = linext;
 	}
 
@@ -478,11 +471,10 @@
 }
 
 
-/* ----------
- * remoteListen_cleanup
+/*
+ * ---------- remoteListen_cleanup
  *
- *	Free resources used by the remoteListen thread
- * ----------
+ * Free resources used by the remoteListen thread ----------
  */
 static void
 remoteListen_cleanup(struct listat **listat_head, struct listat **listat_tail)
@@ -500,14 +492,13 @@
 }
 
 
-/* ----------
- * remoteListen_forward_confirm
+/*
+ * ---------- remoteListen_forward_confirm
  *
- *	Read the last confirmed event sequence for all nodes from the
- *	remote database and forward it to the local database so that
- *	the cleanup process will know when all nodes have confirmed
- *	an event and it can be thrown away (together with its log data).
- * ----------
+ * Read the last confirmed event sequence for all nodes from the remote database
+ * and forward it to the local database so that the cleanup process will know
+ * when all nodes have confirmed an event and it can be thrown away (together
+ * with its log data). ----------
  */
 static int
 remoteListen_forward_confirm(SlonNode *node, SlonConn *conn)
@@ -542,10 +533,9 @@
 		PQclear(res);
 		return -1;
 	}
-
 	/*
-	 * We actually do not do the forwardiing ourself here. We send
-	 * a special message to the remote worker for that node.
+	 * We actually do not do the forwarding ourselves here. We send a
+	 * special message to the remote worker for that node.
 	 */
 	ntuples = PQntuples(res);
 	for (tupno = 0; tupno < ntuples; tupno++)
@@ -565,12 +555,11 @@
 }
 
 
-/* ----------
- * remoteListen_receive_events
+/*
+ * ---------- remoteListen_receive_events
  *
- *	Retrieve all new events that origin from nodes for which we
- *	listen on this node as provider and add them to the node
- *	specific worker message queue..
+ * Retrieve all new events that originate from nodes for which we listen on this
+ * node as provider and add them to the node specific worker message queue.
  * ----------
  */
 static int
@@ -590,15 +579,15 @@
 	dstring_init(&query);
 
 	/*
-	 * In the runtime configuration info for the node, we remember
-	 * the last event sequence that we actually have received. If the
-	 * remote worker thread has processed it yet or not isn't important,
-	 * we have it in the message queue at least and don't need to
-	 * select it again.
+	 * In the runtime configuration info for the node, we remember the
+	 * last event sequence that we actually have received. If the remote
+	 * worker thread has processed it yet or not isn't important, we have
+	 * it in the message queue at least and don't need to select it
+	 * again.
 	 *
-	 * So the query we construct contains a qualification
-	 * (ev_origin = <remote_node> and ev_seqno > <last_seqno>) per
-	 * remote node we're listen for here.
+	 * So the query we construct contains a qualification (ev_origin =
+	 * <remote_node> and ev_seqno > <last_seqno>) per remote node we're
+	 * listening for here.
 	 */
 	slon_mkquery(&query,
 			"select ev_origin, ev_seqno, ev_timestamp, "
@@ -625,7 +614,6 @@
 			dstring_free(&query);
 			return -1;
 		}
-
 		sprintf(seqno_buf, INT64_FORMAT, origin->last_event);
 		slon_appendquery(&query,
 				" %s (e.ev_origin = '%d' and e.ev_seqno > '%s')",
@@ -647,7 +635,6 @@
 		dstring_free(&query);
 		return -1;
 	}
-
 	time(&timeout);
 	timeout += 300;
 	while (PQisBusy(conn->dbconn) != 0)
@@ -726,5 +713,3 @@
 
 	return 0;
 }
-
-
Index: sync_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/sync_thread.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -Lsrc/slon/sync_thread.c -Lsrc/slon/sync_thread.c -u -w -r1.12 -r1.13
--- src/slon/sync_thread.c
+++ src/slon/sync_thread.c
@@ -28,16 +28,15 @@
 #include "slon.h"
 
 
-/* ----------
- * Global variables
- * ----------
+/*
+ * ---------- Global variables ----------
  */
 int		sync_interval;
 int		sync_interval_timeout;
 
 
-/* ----------
- * slon_localSyncThread
+/*
+ * ---------- slon_localSyncThread
  *
  *	Generate SYNC event if local database activity created new log info.
  * ----------
@@ -64,15 +63,15 @@
 	dbconn = conn->dbconn;
 
 	/*
-	 * We don't initialize the last known action sequence to the
-	 * actual value. This causes that we create a SYNC event
-	 * allways on startup, just in case.
+	 * We don't initialize the last known action sequence to the actual
+	 * value. This ensures that we always create a SYNC event on startup,
+	 * just in case.
 	 */
 	last_actseq_buf[0] = '\0';
 
 	/*
-	 * Build the query that starts a transaction and retrieves
-	 * the last value from the action sequence.
+	 * Build the query that starts a transaction and retrieves the last
+	 * value from the action sequence.
 	 */
 	dstring_init(&query1);
 	slon_mkquery(&query1, 
@@ -107,7 +106,6 @@
 			slon_abort();
 			break;
 		}
-
 		/*
 		 * Check if it's identical to the last know seq or if the
 		 * sync interval timeout has arrived.
@@ -120,7 +118,8 @@
 		{
 			/*
 			 * Action sequence has changed, generate a SYNC event
-			 * and read the resulting currval of the event sequence.
+			 * and read the resulting currval of the event
+			 * sequence.
 			 */
 			strcpy(last_actseq_buf, PQgetvalue(res, 0, 0));
 
@@ -159,8 +158,7 @@
 			 */
 			timeout_count = (sync_interval_timeout == 0) ? 0 :
 					sync_interval_timeout - sync_interval;
-		}
-		else
+		} else
 		{
 			/*
 			 * No database activity detected - rollback.
@@ -186,5 +184,3 @@
 	slon_log(SLON_DEBUG1, "syncThread: thread done\n");
 	pthread_exit(NULL);
 }
-
-
Index: runtime_config.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v
retrieving revision 1.19
retrieving revision 1.20
diff -Lsrc/slon/runtime_config.c -Lsrc/slon/runtime_config.c -u -w -r1.19 -r1.20
--- src/slon/runtime_config.c
+++ src/slon/runtime_config.c
@@ -29,9 +29,8 @@
 #include "slon.h"
 
 
-/* ----------
- * Global data
- * ----------
+/*
+ * ---------- Global data ----------
  */
 pid_t					slon_pid;
 char				   *rtcfg_cluster_name = NULL;
@@ -48,15 +47,15 @@
 SlonNode			   *rtcfg_node_list_tail = NULL;
 
 
-/* ----------
- * Local data
- * ----------
+/*
+ * ---------- Local data ----------
  */
 static pthread_mutex_t	config_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t	cfgseq_lock = PTHREAD_MUTEX_INITIALIZER;
 static int64			cfgseq = 0;
 
-struct to_activate {
+struct to_activate
+{
 	int					no_id;
 
 	struct to_activate *prev;
@@ -66,16 +65,14 @@
 static struct to_activate *to_activate_tail = NULL;
 
 
-/* ----------
- * Local functions
- * ----------
+/*
+ * ---------- Local functions ----------
  */
 static void			rtcfg_startStopNodeThread(SlonNode *node);
 
 
-/* ----------
- * rtcfg_lock
- * ----------
+/*
+ * ---------- rtcfg_lock ----------
  */
 void
 rtcfg_lock(void)
@@ -84,9 +81,8 @@
 }
 
 
-/* ----------
- * rtcfg_unlock
- * ----------
+/*
+ * ---------- rtcfg_unlock ----------
  */
 void
 rtcfg_unlock(void)
@@ -95,9 +91,8 @@
 }
 
 
-/* ----------
- * rtcfg_storeNode
- * ----------
+/*
+ * ---------- rtcfg_storeNode ----------
  */
 void
 rtcfg_storeNode(int no_id, char *no_comment)
@@ -122,7 +117,6 @@
 		rtcfg_unlock();
 		return;
 	}
-
 	/*
 	 * Add the new node to our in-memory configuration.
 	 */
@@ -151,15 +145,13 @@
 }
 
 
-/* ----------
- * rtcfg_setNodeLastEvent()
+/*
+ * ---------- rtcfg_setNodeLastEvent()
  *
  *	Set the last_event field in the node runtime structure.
  *
- *	Returns:	0 if the event_seq is <= the known value
- *				-1 if the node is not known
- *				event_seq otherwise
- * ----------
+ * Returns:	0 if the event_seq is <= the known value, -1 if the node is
+ * not known, event_seq otherwise. ----------
  */
 int64
 rtcfg_setNodeLastEvent(int no_id, int64 event_seq)
@@ -174,11 +166,9 @@
 		{
 			node->last_event = event_seq;
 			retval = event_seq;
-		}
-		else
+		} else
 			retval = 0;
-	}
-	else
+	} else
 		retval = -1;
 
 	rtcfg_unlock();
@@ -191,11 +181,10 @@
 }
 
 
-/* ----------
- * rtcfg_getNodeLastEvent
+/*
+ * ---------- rtcfg_getNodeLastEvent
  *
- *	Read the nodes last_event field
- * ----------
+ * Read the nodes last_event field ----------
  */
 int64
 rtcfg_getNodeLastEvent(int no_id)
@@ -207,8 +196,7 @@
 	if ((node = rtcfg_findNode(no_id)) != NULL)
 	{
 		retval = node->last_event;
-	}
-	else
+	} else
 		retval = -1;
 
 	rtcfg_unlock();
@@ -217,9 +205,8 @@
 }
 
 
-/* ----------
- * rtcfg_enableNode
- * ----------
+/*
+ * ---------- rtcfg_enableNode ----------
  */
 void
 rtcfg_enableNode(int no_id)
@@ -238,7 +225,6 @@
 		slon_abort();
 		return;
 	}
-
 	/*
 	 * Activate the node
 	 */
@@ -253,9 +239,8 @@
 }
 
 
-/* ----------
- * slon_disableNode
- * ----------
+/*
+ * ---------- rtcfg_disableNode ----------
  */
 void
 rtcfg_disableNode(int no_id)
@@ -274,7 +259,6 @@
 		slon_abort();
 		return;
 	}
-
 	/*
 	 * Deactivate the node
 	 */
@@ -286,14 +270,13 @@
 	rtcfg_seq_bump();
 
 	/*
-	rtcfg_startStopNodeThread(node);
+	 * rtcfg_startStopNodeThread(node);
 	*/
 }
 
 
-/* ----------
- * rtcfg_findNode
- * ----------
+/*
+ * ---------- rtcfg_findNode ----------
  */
 SlonNode *
 rtcfg_findNode(int no_id)
@@ -310,9 +293,8 @@
 }
 
 
-/* ----------
- * rtcfg_storePath
- * ----------
+/*
+ * ---------- rtcfg_storePath ----------
  */
 void
 rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
@@ -334,7 +316,6 @@
 		rtcfg_lock();
 		node = rtcfg_findNode(pa_server);
 	}
-
 	/*
 	 * Store the (new) conninfo to the node
 	 */
@@ -357,9 +338,8 @@
 }
 
 
-/* ----------
- * rtcfg_dropPath
- * ----------
+/*
+ * ---------- rtcfg_dropPath ----------
  */
 void
 rtcfg_dropPath(int pa_server)
@@ -380,11 +360,9 @@
 
 		return;
 	}
-
 	/*
-	 * Drop all listen information as well
-	 * at this provider. Without a path we
-	 * cannot listen.
+	 * Drop all listen information as well at this provider. Without a
+	 * path we cannot listen.
 	 */
 	while (node->listen_head != NULL)
 	{
@@ -415,9 +393,8 @@
 }
 
 
-/* ----------
- * rtcfg_storeListen
- * ----------
+/*
+ * ---------- rtcfg_storeListen ----------
  */
 void
 rtcfg_storeListen(int li_origin, int li_provider)
@@ -435,10 +412,9 @@
 		slon_abort();
 		return;
 	}
-
 	/*
-	 * Check if we already listen for events from that origin
-	 * at this provider.
+	 * Check if we already listen for events from that origin at this
+	 * provider.
 	 */
 	for (listen = node->listen_head; listen; listen = listen->next)
 	{
@@ -481,9 +457,8 @@
 }
 
 
-/* ----------
- * rtcfg_dropListen
- * ----------
+/*
+ * ---------- rtcfg_dropListen ----------
  */
 void
 rtcfg_dropListen(int li_origin, int li_provider)
@@ -501,10 +476,8 @@
 		slon_abort();
 		return;
 	}
-
 	/*
-	 * Find that listen entry
-	 * at this provider.
+	 * Find that listen entry at this provider.
 	 */
 	for (listen = node->listen_head; listen; listen = listen->next)
 	{
@@ -713,7 +686,8 @@
 			rtcfg_unlock();
 			rtcfg_seq_bump();
 			/*
-			 * Wakeup the worker threads for the old and new provider
+			 * Wakeup the worker threads for the old and new
+			 * provider
 			 */
 			if (old_provider >= 0 && old_provider != sub_provider)
 				sched_wakeup_node(old_provider);
@@ -797,7 +771,8 @@
 			rtcfg_unlock();
 			rtcfg_seq_bump();
 			/*
-			 * Wakeup the worker threads for the old and new provider
+			 * Wakeup the worker threads for the old and new
+			 * provider
 			 */
 			if (old_provider >= 0)
 				sched_wakeup_node(old_provider);
@@ -812,9 +787,8 @@
 }
 
 
-/* ----------
- * rtcfg_startStopNodeThread
- * ----------
+/*
+ * ---------- rtcfg_startStopNodeThread ----------
  */
 static void
 rtcfg_startStopNodeThread(SlonNode *node)
@@ -851,8 +825,7 @@
 			default:
 printf("TODO: ********** rtcfg_startStopNodeThread: restart node worker\n");
 		}
-	}
-	else
+	} else
 	{
 		/*
 		 * Make sure there is no node worker
@@ -916,8 +889,7 @@
 				node->listen_status = SLON_TSTAT_NONE;
 				break;
 		}
-	}
-	else
+	} else
 	{
 		/*
 		 * Node specific listen thread not required
@@ -1063,6 +1035,3 @@
 
 	return retval;
 }
-
-
-
Index: misc.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/misc.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -Lsrc/slon/misc.c -Lsrc/slon/misc.c -u -w -r1.12 -r1.13
--- src/slon/misc.c
+++ src/slon/misc.c
@@ -24,6 +24,9 @@
 #include <sys/time.h>
 #include <sys/types.h>
 
+#include <syslog.h>
+#include <stdarg.h>
+
 #include "libpq-fe.h"
 #include "c.h"
 
@@ -38,6 +41,30 @@
 
 static pthread_mutex_t	log_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+#ifdef HAVE_SYSLOG
+/*
+ * 0 = only stdout/stderr
+ * 1 = stdout+stderr and syslog
+ * 2 = syslog only
+ * ... in theory anyway
+ */
+
+#ifndef SLON_SYSLOG_LIMIT
+#define SLON_SYSLOG_LIMIT 128
+#endif
+
+extern int                     Use_syslog;
+extern char       *Syslog_facility;    /* openlog() parameters */
+extern char       *Syslog_ident;
+
+static void write_syslog(int level, const char *line);
+
+#else
+
+#define Use_syslog 0
+
+#endif   /* HAVE_SYSLOG */
+
 
 void
 slon_log(SlonLogLevel level, char *fmt, ...)
@@ -51,28 +78,56 @@
 	char		time_buf[128];
 	time_t		stamp_time = time(NULL);
 
+#ifdef HAVE_SYSLOG
+	int		syslog_level = LOG_ERR;
+#endif
 	if (level > slon_log_level)
 		return;
 
 	switch (level)
 	{
-		case SLON_DEBUG4:	level_c = "DEBUG4";
+	case SLON_DEBUG4:
+		level_c = "DEBUG4";
 							break;
-		case SLON_DEBUG3:	level_c = "DEBUG3";
+	case SLON_DEBUG3:
+		level_c = "DEBUG3";
 							break;
-		case SLON_DEBUG2:	level_c = "DEBUG2";
+	case SLON_DEBUG2:
+		level_c = "DEBUG2";
 							break;
-		case SLON_DEBUG1:	level_c = "DEBUG1";
+	case SLON_DEBUG1:
+		level_c = "DEBUG1";
+#ifdef HAVE_SYSLOG
+		syslog_level = LOG_DEBUG;
+#endif
 							break;
-		case SLON_INFO:		level_c = "INFO";
+	case SLON_INFO:
+		level_c = "INFO";
+#ifdef HAVE_SYSLOG
+                syslog_level = LOG_INFO;
+#endif
 							break;
-		case SLON_CONFIG:	level_c = "CONFIG";
+	case SLON_CONFIG:
+		level_c = "CONFIG";
 							break;
-		case SLON_WARN:		level_c = "WARN";
+	case SLON_WARN:
+		level_c = "WARN";
+#ifdef HAVE_SYSLOG
+                syslog_level = LOG_WARNING;
+#endif
 							break;
-		case SLON_ERROR:	level_c = "ERROR";
+	case SLON_ERROR:
+		level_c = "ERROR";
+#ifdef HAVE_SYSLOG
+                syslog_level = LOG_ERR;
+#endif
 							break;
-		case SLON_FATAL:	level_c = "FATAL";
+	case SLON_FATAL:
+		level_c = "FATAL";
+#ifdef HAVE_SYSLOG
+                syslog_level = LOG_ERR;
+#endif
+
 							break;
 	}
 
@@ -90,15 +145,13 @@
 			slon_abort();
 		}
 	}
-
 	sprintf(outbuf, "");
 	
-	if (logtimestamp == true)
+	if (logtimestamp == true && (Use_syslog != 1))
 	{
 		strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S %Z", localtime(&stamp_time));
 		sprintf(outbuf, "%s ", time_buf);
 	}
-
 	if (logpid == true)
 	{
 		sprintf(outbuf, "%s[%d] ", outbuf, slon_pid);
@@ -117,7 +170,12 @@
 			slon_abort();
 		}
 	}
-
+#ifdef HAVE_SYSLOG
+	if (Use_syslog >= 1)
+	{
+		write_syslog(syslog_level,outbuf);
+	}
+#endif
 	fwrite(outbuf, strlen(outbuf), 1, stdout);
 	fflush(stdout);
 	pthread_mutex_unlock(&log_mutex);
@@ -129,8 +187,8 @@
 /*
  * scanint8 --- try to parse a string into an int8.
  *
- * If errorOK is false, ereport a useful error message if the string is bad.
- * If errorOK is true, just return "false" for bad input.
+ * If errorOK is false, ereport a useful error message if the string is bad. If
+ * errorOK is true, just return "false" for bad input.
  */
 int
 slon_scanint64(char *str, int64 *result)
@@ -155,9 +213,9 @@
 		sign = -1;
 
 		/*
-		 * Do an explicit check for INT64_MIN.	Ugly though this is, it's
-		 * cleaner than trying to get the loop below to handle it
-		 * portably.
+		 * Do an explicit check for INT64_MIN.	Ugly though this is,
+		 * it's cleaner than trying to get the loop below to handle
+		 * it portably.
 		 */
 #ifndef INT64_IS_BUSTED
 		if (strcmp(ptr, "9223372036854775808") == 0)
@@ -166,8 +224,7 @@
 			return true;
 		}
 #endif
-	}
-	else if (*ptr == '+')
+	} else if (*ptr == '+')
 		ptr++;
 
 	/* require at least one digit */
@@ -193,3 +250,132 @@
 	return true;
 }
 
+#ifdef HAVE_SYSLOG
+/*
+ * write_syslog
+ *
+ * Hand one already formatted log line to syslog(3) at priority "level".
+ *
+ * The first call runs openlog() with Syslog_ident and the facility named
+ * by Syslog_facility (LOCAL0 .. LOCAL7, compared case-insensitively); a
+ * NULL or unrecognized name leaves the LOCAL0 default in place.
+ *
+ * Every call bumps a sequence number that is prefixed to the message. A
+ * line longer than SLON_SYSLOG_LIMIT, or one containing '\n', is sent as
+ * several numbered chunks, cut at newlines and preferably at whitespace.
+ *
+ * NOTE(review): isspace() and strcasecmp() are used below; confirm that
+ * misc.c pulls in <ctype.h> and <strings.h> (directly or via c.h).
+ */
+static void
+write_syslog(int level, const char *line)
+{
+        static const struct
+        {
+                const char *name;
+                int         code;
+        }               facilities[] = {
+                {"LOCAL0", LOG_LOCAL0}, {"LOCAL1", LOG_LOCAL1},
+                {"LOCAL2", LOG_LOCAL2}, {"LOCAL3", LOG_LOCAL3},
+                {"LOCAL4", LOG_LOCAL4}, {"LOCAL5", LOG_LOCAL5},
+                {"LOCAL6", LOG_LOCAL6}, {"LOCAL7", LOG_LOCAL7}
+        };
+        static bool openlog_done = false;
+        static unsigned long seq = 0;
+        static int      syslog_fac = LOG_LOCAL0;
+
+        int                     len = strlen(line);
+
+        if (Use_syslog == 0)
+                return;
+
+        if (!openlog_done)
+        {
+                /* an unset facility string must not reach strcasecmp() */
+                if (Syslog_facility != NULL)
+                {
+                        size_t          f;
+
+                        for (f = 0; f < sizeof(facilities) / sizeof(facilities[0]); f++)
+                        {
+                                if (strcasecmp(Syslog_facility, facilities[f].name) == 0)
+                                {
+                                        syslog_fac = facilities[f].code;
+                                        break;
+                                }
+                        }
+                }
+                openlog(Syslog_ident, LOG_PID | LOG_NDELAY, syslog_fac);
+                openlog_done = true;
+        }
+
+        /*
+         * We add a sequence number to each log message to suppress "same"
+         * messages.
+         */
+        seq++;
+
+        /* divide into multiple syslog() calls if message is too long */
+        /* or if the message contains embedded NewLine(s) '\n' */
+        if (len > SLON_SYSLOG_LIMIT || strchr(line, '\n') != NULL)
+        {
+                int                     chunk_nr = 0;
+
+                while (len > 0)
+                {
+                        char            buf[SLON_SYSLOG_LIMIT + 1];
+                        char       *nl;
+                        int                     buflen;
+                        int                     i;
+
+                        /* if we start at a newline, move ahead one char */
+                        if (line[0] == '\n')
+                        {
+                                line++;
+                                len--;
+                                continue;
+                        }
+
+                        /* copy at most SLON_SYSLOG_LIMIT bytes, then stop at the first newline */
+                        strncpy(buf, line, SLON_SYSLOG_LIMIT);
+                        buf[SLON_SYSLOG_LIMIT] = '\0';
+                        nl = strchr(buf, '\n');
+                        if (nl != NULL)
+                                *nl = '\0';
+
+                        buflen = strlen(buf);
+
+                        if (buflen <= 0)
+                                return;
+
+                        /* already word boundary? */
+                        if (!isspace((unsigned char) line[buflen]) &&
+                                line[buflen] != '\0')
+                        {
+                                /* try to divide at word boundary */
+                                i = buflen - 1;
+                                while (i > 0 && !isspace((unsigned char) buf[i]))
+                                        i--;
+
+                                if (i > 0)              /* else couldn't divide word boundary */
+                                {
+                                        buflen = i;
+                                        buf[i] = '\0';
+                                }
+                        }
+
+                        chunk_nr++;
+
+                        syslog(level, "[%lu-%d] %s", seq, chunk_nr, buf);
+                        line += buflen;
+                        len -= buflen;
+                }
+        }
+        else
+        {
+                /* message short enough */
+                syslog(level, "[%lu] %s", seq, line);
+        }
+}
+#endif   /* HAVE_SYSLOG */
+
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.16
retrieving revision 1.17
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.16 -r1.17
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -28,18 +28,16 @@
 #include "slon.h"
 
 
-/* ----------
- * Global data
- * ----------
+/*
+ * ---------- Global data ----------
  */
 int		vac_frequency = SLON_VACUUM_FREQUENCY;
 
-/* ----------
- * cleanupThread_main
+/*
+ * ---------- cleanupThread_main
  *
- *	Periodically calls the stored procedure to remove old events
- *	and log data and vacuums those tables.
- * ----------
+ * Periodically calls the stored procedure to remove old events and log data and
+ * vacuums those tables. ----------
  */
 void *
 cleanupThread_main(void *dummy)
@@ -70,8 +68,8 @@
 	dbconn = conn->dbconn;
 
 	/*
-	 * Build the query string for calling the cleanupEvent()
-	 * stored procedure
+	 * Build the query string for calling the cleanupEvent() stored
+	 * procedure
 	 */
 	dstring_init(&query1);
 	slon_mkquery(&query1, "select %s.cleanupEvent(); ", rtcfg_namespace);
@@ -231,5 +229,3 @@
 	slon_log(SLON_DEBUG1, "cleanupThread: thread done\n");
 	pthread_exit(NULL);
 }
-
-
Index: confoptions.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slon/confoptions.c -Lsrc/slon/confoptions.c -u -w -r1.1 -r1.2
--- src/slon/confoptions.c
+++ src/slon/confoptions.c
@@ -21,7 +21,8 @@
 
 
 
-void build_conf_variables(void)
+void 
+build_conf_variables(void)
 {
 	int	size_vars;
 	int	num_vars=0;
@@ -32,7 +33,10 @@
 	{
 		struct config_bool *conf = &ConfigureNamesBool[i];
 
-		/* Rather than requiring vartype to be filled in by hand, do this: */
+		/*
+		 * Rather than requiring vartype to be filled in by hand, do
+		 * this:
+		 */
 		conf->gen.vartype = SLON_C_BOOL;
 		num_vars++;
 	}
@@ -66,7 +70,6 @@
 		slon_log(FATAL, "malloc failed");
 		return;
 	}
-
 	num_vars = 0;
 
 	for (i = 0; ConfigureNamesBool[i].gen.name; i++)
@@ -113,8 +116,7 @@
                         size_vars = 100;
                         conf_vars = (struct config_generic **)
                                 malloc(size_vars * sizeof(struct config_generic *));
-                }
-                else
+		} else
                 {
                         conf_vars = (struct config_generic **)
                                 realloc(conf_variables, size_vars * sizeof(struct config_generic *));
@@ -125,7 +127,6 @@
 			slon_log(elevel,"malloc failed");
                         return false;           /* out of memory */
 		}
-
                 conf_variables = conf_vars;
                 size_conf_variables = size_vars;
         }
@@ -135,7 +136,8 @@
         return true;
 }
 
-void InitializeConfOptions(void)
+void 
+InitializeConfOptions(void)
 {
 	int	i;
 	char *env;
@@ -171,8 +173,11 @@
 					char	*str;
 					struct config_int *conf = (struct config_int *) gconf;
 					*conf->variable = NULL;
+				if (conf->default_val)
+				{
 					str = strdup(conf->default_val);
 					*conf->variable = str;
+				}
 					break;
 				}
 		}
@@ -196,57 +201,49 @@
 		{
 			*result = true;
 		}
-	}
-	else if (strncasecmp(value, "false", len) == 0)
+	} else if (strncasecmp(value, "false", len) == 0)
 	{
 		if (result)
 		{
 			*result = false;
 		}
-	}
-        else if (strncasecmp(value, "yes", len) == 0)
+	} else if (strncasecmp(value, "yes", len) == 0)
         {
                 if (result)
                 {
                         *result = true;
                 }
-        }
-        else if (strncasecmp(value, "no", len) == 0)
+	} else if (strncasecmp(value, "no", len) == 0)
         {
                 if (result)
                 {
                         *result = false;
                 }
-        }
-        else if (strncasecmp(value, "on", len) == 0)
+	} else if (strncasecmp(value, "on", len) == 0)
         {
                 if (result)
                 {
                         *result = true;
                 }
-        }
-        else if (strncasecmp(value, "off", len) == 0)
+	} else if (strncasecmp(value, "off", len) == 0)
         {
                 if (result)
                 {
                         *result = false;
                 }
-        }
-        else if (strncasecmp(value, "1", len) == 0)
+	} else if (strncasecmp(value, "1", len) == 0)
         {
                 if (result)
                 {
                         *result = true;
                 }
-        }
-        else if (strncasecmp(value, "o", len) == 0)
+	} else if (strncasecmp(value, "o", len) == 0)
         {
                 if (result)
                 {
                         *result = false;
                 }
-        }
-	else
+	} else
 	{
 		return false;
 	}
@@ -254,10 +251,9 @@
 }
 
 /*
- * Try to parse value as an integer.  The accepted formats are the
- * usual decimal, octal, or hexadecimal formats.  If the string parses
- * okay, return true, else false.  If result is not NULL, return the
- * value there.
+ * Try to parse value as an integer.  The accepted formats are the usual
+ * decimal, octal, or hexadecimal formats.  If the string parses okay, return
+ * true, else false.  If result is not NULL, return the value there.
  */
 static bool
 parse_int(const char *value, int *result)
@@ -281,9 +277,9 @@
 }
 
 /*
- * Try to parse value as a floating point constant in the usual
- * format.      If the value parsed okay return true, else false.  If
- * result is not NULL, return the semantic value there.
+ * Try to parse value as a floating point constant in the usual format.
+ * If the value parsed okay return true, else false.  If result is not NULL,
+ * return the semantic value there.
  */
 static bool
 parse_real(const char *value, double *result)
@@ -301,7 +297,8 @@
 }
 
 
-static struct config_generic *find_option(const char *name, int elevel)
+static struct config_generic *
+find_option(const char *name, int elevel)
 {
 	const char *dot;
 	const char **key = &name;
@@ -355,7 +352,8 @@
 }
 
 
-bool set_config_option(const char *name, const char *value)
+bool 
+set_config_option(const char *name, const char *value)
 {
   struct config_generic *record;
   int elevel;
@@ -369,7 +367,6 @@
     slon_log(elevel, "unrecognized configuration parameter \"%s\"\n", name);
     return false;
   }
-
   switch (record->vartype)
   {
     case SLON_C_BOOL:
@@ -383,8 +380,7 @@
 				slon_log(elevel, "parameter \"%s\" requires a Boolean value\n", name);
 				return false;
 			}
-		}
-		else
+			} else
 		{
 			slon_log(elevel, "parameter \"%s\"\n", name );
 		}
@@ -410,8 +406,7 @@
 					 newval, name, conf->min, conf->max);
 				return false;
 			}
-		}
-		else
+			} else
 		{
 			slon_log(elevel, "parameter \"%s\"\n", name );
 		}
@@ -436,8 +431,7 @@
 					newval, name, conf->min, conf->max);
 				return false;
 			}
-		}
-		else
+			} else
 		{
 			slon_log(elevel, "parameter \"%s\"\n", name );
 		}
@@ -456,9 +450,7 @@
 			{
 				return false;
 			}
-
-		}
-		else
+			} else
 		{
 			slon_log(elevel, "parameter \"%s\"\n", name );
 			free(newval);
@@ -469,4 +461,3 @@
   }
   return true;
 }
-
Index: dbutils.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/dbutils.c,v
retrieving revision 1.11
retrieving revision 1.12
diff -Lsrc/slon/dbutils.c -Lsrc/slon/dbutils.c -u -w -r1.11 -r1.12
--- src/slon/dbutils.c
+++ src/slon/dbutils.c
@@ -32,21 +32,19 @@
 static int	slon_appendquery_int(SlonDString *dsp, char *fmt, va_list ap);
 
 /* 
- * This mutex is used to wrap around PQconnectdb. There's a problem
- * that occurs when your libpq is compiled with libkrb (kerberos)
- * which is not threadsafe.  It is especially odd because I'm not using
- * kerberos. 
+ * This mutex is used to wrap around PQconnectdb. There's a problem that
+ * occurs when your libpq is compiled with libkrb (kerberos) which is not
+ * threadsafe.  It is especially odd because I'm not using kerberos.
  * 
- * This is fixed in libpq in 8.0, but for now (and for older versions
- * we'll just use this mutex. 
+ * This is fixed in libpq in 8.0, but for now (and for older versions) we'll
+ * just use this mutex.
  *
  */
 static pthread_mutex_t slon_connect_lock = PTHREAD_MUTEX_INITIALIZER;
 
 
-/* ----------
- * slon_connectdb
- * ----------
+/*
+ * ---------- slon_connectdb ----------
  */
 SlonConn *
 slon_connectdb(char *conninfo, char *symname)
@@ -75,10 +73,9 @@
 		PQfinish(dbconn);
 		return NULL;
 	}
-
 	/*
-	 * Embed it into a SlonConn structure used to exchange it with
-	 * the scheduler. On return this new connection object is locked.
+	 * Embed it into a SlonConn structure used to exchange it with the
+	 * scheduler. On return this new connection object is locked.
 	 */
 	conn = slon_make_dummyconn(symname);
 	conn->dbconn = dbconn;
@@ -87,9 +84,8 @@
 }
 
 
-/* ----------
- * slon_disconnectdb
- * ----------
+/*
+ * ---------- slon_disconnectdb ----------
  */
 void
 slon_disconnectdb(SlonConn *conn)
@@ -103,16 +99,15 @@
 #endif
 
 	/*
-	 * Unlock and destroy the condition and mutex variables
-	 * and free memory.
+	 * Unlock and destroy the condition and mutex variables and free
+	 * memory.
 	 */
 	slon_free_dummyconn(conn);
 }
 
 
-/* ----------
- * slon_make_dummyconn
- * ----------
+/*
+ * ---------- slon_make_dummyconn ----------
  */
 SlonConn *
 slon_make_dummyconn(char *symname)
@@ -142,9 +137,8 @@
 }
 
 
-/* ----------
- * slon_free_dummyconn
- * ----------
+/*
+ * ---------- slon_free_dummyconn ----------
  */
 void
 slon_free_dummyconn(SlonConn *conn)
@@ -167,11 +161,10 @@
 }
 
 
-/* ----------
- * db_getLocalNodeId
+/*
+ * ---------- db_getLocalNodeId
  *
- *	Query a connection for the value of sequence sl_local_node_id
- * ----------
+ * Query a connection for the value of sequence sl_local_node_id ----------
  */
 int
 db_getLocalNodeId(PGconn *conn)
@@ -202,7 +195,6 @@
 		PQclear(res);
 		return -1;
 	}
-
 	/*
 	 * Return the result as an integer value
 	 */
@@ -213,16 +205,13 @@
 }
 
 
-/* ----------
- * slon_mkquery
+/*
+ * ---------- slon_mkquery
  *
- *	A simple query formatting and quoting function using dynamic string
- *	buffer allocation.
- *	Similar to sprintf() it uses formatting symbols:
- *		%s		String argument
- *		%q		Quoted literal (\ and ' will be escaped)
- *		%d		Integer argument
- * ----------
+ * A simple query formatting and quoting function using dynamic string buffer
+ * allocation. Similar to sprintf() it uses formatting symbols: %s String
+ * argument, %q Quoted literal (\ and ' will be escaped), %d Integer
+ * argument ----------
  */
 int
 slon_mkquery(SlonDString *dsp, char *fmt, ...)
@@ -241,11 +230,10 @@
 }
 
 
-/* ----------
- * slon_appendquery
+/*
+ * ---------- slon_appendquery
  *
- *	Append query string material to an existing dynamic string.
- * ----------
+ * Append query string material to an existing dynamic string. ----------
  */
 int
 slon_appendquery(SlonDString *dsp, char *fmt, ...)
@@ -262,11 +250,10 @@
 }
 
 
-/* ----------
- * slon_appendquery_int
+/*
+ * ---------- slon_appendquery_int
  *
- *	Implementation of slon_mkquery() and slon_appendquery().
- * ----------
+ * Implementation of slon_mkquery() and slon_appendquery(). ----------
  */
 static int
 slon_appendquery_int(SlonDString *dsp, char *fmt, va_list ap)
@@ -282,12 +269,14 @@
 				fmt++;
 				switch(*fmt)
 				{
-					case 's':	s = va_arg(ap, char *);
+			case 's':
+				s = va_arg(ap, char *);
 								dstring_append(dsp, s);
 								fmt++;
 								break;
 
-					case 'q':	s = va_arg(ap, char *);
+			case 'q':
+				s = va_arg(ap, char *);
 								while (s && *s != '\0')
 								{
 									switch (*s)
@@ -307,24 +296,28 @@
 								fmt++;
 								break;
 
-					case 'd':	sprintf(buf, "%d", va_arg(ap, int));
+			case 'd':
+				sprintf(buf, "%d", va_arg(ap, int));
 								dstring_append(dsp, buf);
 								fmt++;
 								break;
 
-					default:	dstring_addchar(dsp, '%');
+			default:
+				dstring_addchar(dsp, '%');
 								dstring_addchar(dsp, *fmt);
 								fmt++;
 								break;
 				}
 				break;
 
-			case '\\':	fmt++;
+		case '\\':
+			fmt++;
 						dstring_addchar(dsp, *fmt);
 						fmt++;
 						break;
 
-			default:	dstring_addchar(dsp, *fmt);
+		default:
+			dstring_addchar(dsp, *fmt);
 						fmt++;
 						break;
 		}
@@ -334,5 +327,3 @@
 
 	return 0;
 }
-
-
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.29
retrieving revision 1.30
diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.29 -r1.30
--- src/slon/slon.c
+++ src/slon/slon.c
@@ -29,9 +29,8 @@
 #include "slon.h"
 #include "confoptions.h"
 
-/* ----------
- * Global data
- * ----------
+/*
+ * ---------- Global data ----------
  */
 int slon_restart_request = false;
 
@@ -39,9 +38,8 @@
 pthread_cond_t		slon_wait_listen_cond = PTHREAD_COND_INITIALIZER;
 
 
-/* ----------
- * Local data
- * ----------
+/*
+ * ---------- Local data ----------
  */
 static pthread_t        local_event_thread;
 static pthread_t        local_cleanup_thread;
@@ -53,11 +51,10 @@
 static void				sigalrmhandler(int signo);
 
 int slon_log_level;
+char *pid_file;
 
-
-/* ----------
- * main
- * ----------
+/*
+ * ---------- main ----------
  */
 int
 main (int argc, char *const argv[])
@@ -78,7 +75,7 @@
         InitializeConfOptions();
 
 
-	while ((c = getopt(argc, argv, "f:d:s:t:g:c:f:h")) != EOF)
+	while ((c = getopt(argc, argv, "f:d:s:t:g:c:p:h")) != EOF)
 	{
 		switch(c)
 		{
@@ -105,12 +102,16 @@
 			case 'c':
 				set_config_option("vac_frequency", optarg);
 				break;
+		case 'p':
+			set_config_option("pid_file", optarg);
+			break;
 
                         case 'h':
 				errors++;
                                 break;
 
-			default:	fprintf(stderr, "unknown option '%c'\n", c);
+		default:
+			fprintf(stderr, "unknown option '%c'\n", c);
 						errors++;
 						break;
 		}
@@ -140,10 +141,10 @@
 		fprintf(stderr, "    -t <milliseconds>     SYNC interval timeout (default 60000)\n");
 		fprintf(stderr, "    -g <num>              maximum SYNC group size (default 6)\n");
 		fprintf(stderr, "    -c <num>		   how often to vaccum in cleanup cycles\n");
+		fprintf(stderr, "    -p <filename>         slon pid file\n");
 		fprintf(stderr, "    -f <filename>	   slon configuration file\n");
 		return 1;
 	}
-
 	/*
 	 * Make sure the sync interval isn't too small.
 	 */
@@ -151,8 +152,8 @@
 		sync_interval_timeout = sync_interval * 2;
 
 	/*
-	 * Remember the cluster name and build the properly quoted 
-	 * namespace identifier
+	 * Remember the cluster name and build the properly quoted namespace
+	 * identifier
 	 */
 	slon_pid = getpid();
 	rtcfg_cluster_name	= (char *)argv[optind];
@@ -175,7 +176,8 @@
 	rtcfg_conninfo = (char *)argv[++optind];
 
 	/*
-	 * Connect to the local database for reading the initial configuration
+	 * Connect to the local database for reading the initial
+	 * configuration
 	 */
 	startup_conn = PQconnectdb(rtcfg_conninfo);
 	if (startup_conn == NULL)
@@ -190,7 +192,6 @@
 		PQfinish(startup_conn);
 		slon_exit(-1);
 	}
-
 	/*
 	 * Get our local node ID
 	 */
@@ -202,6 +203,23 @@
 	}
 	slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid);
 
+	/* Write our pid to pid_file if one was configured; warn on failure. */
+	if (pid_file)
+	{
+		FILE *pidfile = fopen(pid_file, "w");
+
+		if (pidfile)
+		{
+			fprintf(pidfile, "%d", slon_pid);
+			fclose(pidfile);
+		}
+		else
+		{
+			slon_log(SLON_WARN, "Cannot open pid_file \"%s\"\n", pid_file);
+		}
+	}
+
+
 	/*
 	 * Start the event scheduling system
 	 */
@@ -260,8 +278,7 @@
 			 */
 			rtcfg_nodeactive  = no_active;
 			rtcfg_nodecomment = strdup(no_comment);
-		}
-		else
+		} else
 		{
 			/*
 			 * Add a remote node
@@ -271,8 +288,8 @@
 			rtcfg_setNodeLastEvent(no_id, last_event);
 
 			/*
-			 * If it is active, remember for activation just before
-			 * we start processing events.
+			 * If it is active, remember for activation just
+			 * before we start processing events.
 			 */
 			if (no_active)
 				rtcfg_needActivate(no_id);
@@ -405,8 +422,7 @@
 	}
 	if (PQntuples(res) == 0)
 		strcpy(rtcfg_lastevent, "-1");
-	else
-		if (PQgetisnull(res, 0, 0))
+	else if (PQgetisnull(res, 0, 0))
 			strcpy(rtcfg_lastevent, "-1");
 		else
 			strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0));
@@ -437,11 +453,10 @@
 	slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
 
 	/*
-	 * Create the local event thread that is monitoring
-	 * the local node for administrative events to adjust the
-	 * configuration at runtime.
-	 * We wait here until the local listen thread has checked that
-	 * there is no other slon daemon running.
+	 * Create the local event thread that is monitoring the local node
+	 * for administrative events to adjust the configuration at runtime.
+	 * We wait here until the local listen thread has checked that there
+	 * is no other slon daemon running.
 	 */
 	pthread_mutex_lock(&slon_wait_listen_lock);
 	if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
@@ -459,8 +474,8 @@
 	rtcfg_doActivate();
 
 	/*
-	 * Create the local cleanup thread that will remove old
-	 * events and log data.
+	 * Create the local cleanup thread that will remove old events and
+	 * log data.
 	 */
 	if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0)
 	{
@@ -468,10 +483,9 @@
 				strerror(errno));
 		slon_abort();
 	}
-
 	/*
-	 * Create the local sync thread that will generate SYNC
-	 * events if we had local database updates.
+	 * Create the local sync thread that will generate SYNC events if we
+	 * had local database updates.
 	 */
 	if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0)
 	{
@@ -479,7 +493,6 @@
 				strerror(errno));
 		slon_abort();
 	}
-
 	/*
 	 * Wait until the scheduler has shut down all remote connections
 	 */
@@ -526,7 +539,6 @@
 				"main: cannot restart via execvp(): %s\n", strerror(errno));
 		exit(-1);
 	}
-
 	/*
 	 * That's it.
 	 */
@@ -538,6 +550,10 @@
 void
 slon_exit(int code)
 {
+	if (pid_file)
+	{
+		unlink(pid_file);
+	}
 	exit(code);
 }
 
@@ -558,9 +574,5 @@
 		}
 		exit(-1);
 	}
-
 	pthread_kill(main_thread, SIGALRM);
 }
-
-
-


More information about the Slony1-commit mailing list