CVS User Account cvsuser
Mon Nov 21 21:20:12 PST 2005
Log Message:
-----------
Restructuring of the watchdog process structure.

The watchdog process (the original pid started as slon) will now only
die if it receives the appropriate signal (usually SIGINT or SIGTERM).
If the worker process dies for any other reason, it will be
restarted.

Jan

Modified Files:
--------------
    slony1-engine/src/slon:
        cleanup_thread.c (r1.27 -> r1.28)
        dbutils.c (r1.16 -> r1.17)
        local_listen.c (r1.33 -> r1.34)
        misc.c (r1.20 -> r1.21)
        remote_listen.c (r1.23 -> r1.24)
        remote_worker.c (r1.97 -> r1.98)
        runtime_config.c (r1.25 -> r1.26)
        scheduler.c (r1.21 -> r1.22)
        slon.c (r1.59 -> r1.60)
        slon.h (r1.53 -> r1.54)
        sync_thread.c (r1.15 -> r1.16)

-------------- next part --------------
Index: scheduler.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/scheduler.c,v
retrieving revision 1.21
retrieving revision 1.22
diff -Lsrc/slon/scheduler.c -Lsrc/slon/scheduler.c -u -w -r1.21 -r1.22
--- src/slon/scheduler.c
+++ src/slon/scheduler.c
@@ -50,8 +50,8 @@
 static pthread_t sched_main_thread;
 static pthread_t sched_scheduler_thread;
 
-static pthread_mutex_t sched_master_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t sched_master_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t sched_master_lock;
+static pthread_cond_t sched_master_cond;
 
 
 /*
@@ -66,7 +66,7 @@
 /*
  * ---------- sched_start_mainloop
  *
- * Called from main() before starting up any worker thread.
+ * Called from SlonMain() before starting up any worker thread.
  *
  * This will spawn the event scheduling thread that does the central select(2)
  * system call. ----------
@@ -74,17 +74,41 @@
 int
 sched_start_mainloop(void)
 {
+	sched_status = SCHED_STATUS_OK;
+	sched_waitqueue_head = NULL;
+	sched_waitqueue_tail = NULL;
+	sched_numfd = 0;
+	FD_ZERO(&sched_fdset_read);
+	FD_ZERO(&sched_fdset_write);
+
 	/*
 	 * Remember the main threads identifier
 	 */
 	sched_main_thread = pthread_self();
 
 	/*
+	 * Initialize the master lock and condition variables
+	 */
+	if (pthread_mutex_init(&sched_master_lock, NULL) < 0)
+	{
+		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_init() - %s\n",
+				strerror(errno));
+		return -1;
+	}
+	if (pthread_cond_init(&sched_master_cond, NULL) < 0)
+	{
+		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_cond_init() - %s\n",
+				strerror(errno));
+		return -1;
+	}
+
+	/*
 	 * Grab the scheduler master lock
 	 */
 	if (pthread_mutex_lock(&sched_master_lock) < 0)
 	{
-		perror("sched_start_mainloop: pthread_mutex_lock()");
+		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_lock() - %s\n",
+				strerror(errno));
 		return -1;
 	}
 
@@ -93,7 +117,8 @@
 	 */
 	if (pthread_create(&sched_scheduler_thread, NULL, sched_mainloop, NULL) < 0)
 	{
-		perror("sched_start_mainloop: pthread_create()");
+		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_create() - %s\n",
+				strerror(errno));
 		return -1;
 	}
 
@@ -102,7 +127,8 @@
 	 */
 	if (pthread_cond_wait(&sched_master_cond, &sched_master_lock) < 0)
 	{
-		perror("sched_start_mainloop: pthread_cond_wait()");
+		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_cond_wait() - %s\n",
+				strerror(errno));
 		return -1;
 	}
 
@@ -111,7 +137,8 @@
 	 */
 	if (pthread_mutex_unlock(&sched_master_lock) < 0)
 	{
-		perror("sched_start_mainloop: pthread_mutex_unlock()");
+		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_unlock() - %s\n",
+				strerror(errno));
 		return -1;
 	}
 
@@ -329,7 +356,7 @@
 		if (pipewrite(sched_wakeuppipe[1], "x", 1) < 0)
 		{
 			perror("sched_wait_conn: write()");
-			slon_abort();
+			slon_restart();
 		}
 	}
 	pthread_mutex_unlock(&sched_master_lock);
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.97
retrieving revision 1.98
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.97 -r1.98
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -314,7 +314,7 @@
 	 * Connect to the local database
 	 */
 	if ((local_conn = slon_connectdb(rtcfg_conninfo, "remote_worker")) == NULL)
-		slon_abort();
+		slon_retry();
 	local_dbconn = local_conn->dbconn;
 
 	/*
@@ -324,7 +324,7 @@
 				 "select %s.setSessionRole('_%s', 'slon'); ",
 				 rtcfg_namespace, rtcfg_cluster_name);
 	if (query_execute(node, local_dbconn, &query1) < 0)
-		slon_abort();
+		slon_retry();
 
 	/*
 	 * Work until shutdown or node destruction
@@ -370,7 +370,7 @@
 						 "remoteWorkerThread_%d: got message "
 						 "condition but queue is empty\n",
 						 node->no_id);
-				slon_abort();
+				slon_retry();
 			}
 		}
 		msg = node->message_head;
@@ -412,7 +412,7 @@
 			slon_log(SLON_FATAL,
 					 "remoteWorkerThread_%d: unknown WMSG type %d\n",
 					 node->no_id, msg->msg_type);
-			slon_abort();
+			slon_retry();
 		}
 		event_ok = true;
 		event = (SlonWorkMsg_event *) msg;
@@ -502,7 +502,7 @@
 						}
 						if (event->ev_seqno >= quit_sync_finalsync) {
 							slon_log(SLON_FATAL, "ABORT at sync %d per command line request%n", quit_sync_finalsync);
-							slon_abort();
+							slon_retry();
 						}
 					}
 				}
@@ -533,7 +533,7 @@
 				 * the transaction yet.
 				 */
 				if (query_execute(node, local_dbconn, &query1) < 0)
-					slon_abort();
+					slon_retry();
 
 				/*
 				 * Process the sync and apply the replication data. If
@@ -552,7 +552,7 @@
 				 */
 				slon_mkquery(&query2, "rollback transaction");
 				if (query_execute(node, local_dbconn, &query2) < 0)
-					slon_abort();
+					slon_retry();
 
 				if ((rc = sched_msleep(node, seconds * 1000)) != SCHED_STATUS_OK)
 					break;
@@ -577,7 +577,7 @@
 			slon_appendquery(&query1, "commit transaction;");
 
 			if (query_execute(node, local_dbconn, &query1) < 0)
-				slon_abort();
+				slon_retry();
 		}
 		else
 		{
@@ -614,7 +614,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 
@@ -638,7 +638,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -668,18 +668,18 @@
 
 					slon_appendquery(&query1, "commit transaction; ");
 					if (query_execute(node, local_dbconn, &query1) < 0)
-						slon_abort();
+						slon_retry();
 
 					slon_mkquery(&query1, "select %s.uninstallNode(); ",
 								 rtcfg_namespace);
 					if (query_execute(node, local_dbconn, &query1) < 0)
-						slon_abort();
+						slon_retry();
 
 					slon_mkquery(&query1, "drop schema %s cascade; ",
 								 rtcfg_namespace);
 					query_execute(node, local_dbconn, &query1);
 
-					slon_abort();
+					slon_retry();
 				}
 
 				/*
@@ -695,7 +695,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -720,7 +720,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -744,7 +744,7 @@
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
 
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -766,7 +766,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -788,7 +788,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 
@@ -812,7 +812,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -833,13 +833,13 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					slon_mkquery(&query1, 
 						     "delete from %s.sl_setsync_offline "
@@ -849,13 +849,13 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = close_log_archive();
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -879,13 +879,13 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = slon_mkquery(&query1, 
 							  "delete from %s.sl_setsync_offline "
@@ -895,13 +895,13 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = close_log_archive();
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -917,7 +917,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -933,7 +933,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -949,7 +949,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -965,7 +965,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -982,7 +982,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -999,7 +999,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -1017,7 +1017,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -1035,7 +1035,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -1093,7 +1093,7 @@
 					{
 						slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET not received yet - sleep\n");
 						if (sched_msleep(node, 10000) != SCHED_STATUS_OK)
-							slon_abort();
+							slon_retry();
 						PQclear(res);
 						res = PQexec(local_dbconn, dstring_data(&query2));
 					}
@@ -1106,7 +1106,7 @@
 					query_append_event(&query1, event);
 					slon_appendquery(&query1, "commit transaction;");
 					query_execute(node, local_dbconn, &query1);
-					slon_abort();
+					slon_retry();
 
 					need_reloadListen = true;
 			    } else {
@@ -1134,7 +1134,7 @@
 						 rtcfg_namespace,
 						 set_id, old_origin, new_origin);
 				if (query_execute(node, local_dbconn, &query1) < 0)
-					slon_abort();
+					slon_retry();
 
 				slon_mkquery(&query1,
 							 "select sub_provider from %s.sl_subscribe "
@@ -1147,7 +1147,7 @@
 							 node->no_id, dstring_data(&query1),
 							 PQresultErrorMessage(res));
 					PQclear(res);
-					slon_abort();
+					slon_retry();
 				}
 				if (PQntuples(res) == 1)
 				{
@@ -1183,7 +1183,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 				need_reloadListen = true;
@@ -1207,7 +1207,7 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 				need_reloadListen = true;
@@ -1269,7 +1269,7 @@
 						 * ...
 						 */
 						if (query_execute(node, local_dbconn, &query1) < 0)
-							slon_abort();
+							slon_retry();
 
 						/*
 						 * If the copy succeeds, exit the loop and let the
@@ -1295,7 +1295,7 @@
 								 "sleep %d seconds\n",
 								 node->no_id, sub_set, sleeptime);
 						if (query_execute(node, local_dbconn, &query2) < 0)
-							slon_abort();
+							slon_retry();
 						sched_rc = sched_msleep(node, sleeptime * 1000);
 						if (sched_rc != SCHED_STATUS_OK)
 						{
@@ -1341,13 +1341,13 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					slon_mkquery(&query1, 
 						     "delete from %s.sl_setsync_offline "
@@ -1357,13 +1357,13 @@
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					rc = close_log_archive();
 					if (rc < 0) {
 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", 
 							 node->no_id, archive_tmp, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 				}
 			}
@@ -1388,28 +1388,28 @@
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not open DDL archive file %s - %s",
 					       node->no_id, archive_tmp, strerror(errno));
-					    slon_abort();
+					    slon_retry();
 					}
 					generate_archive_header(node->no_id, seqbuf);
 					if (rc < 0) {
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not generate DDL archive header %s - %s",
 						     node->no_id, archive_tmp, strerror(errno));
-					    slon_abort();
+					    slon_retry();
 					}
 					rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
 					if (rc < 0) {
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not generate DDL archive tracker %s - %s",
 						     node->no_id, archive_tmp, strerror(errno));
-					    slon_abort();
+					    slon_retry();
 					}
 					rc = submit_string_to_archive(ddl_script);
 					if (rc < 0) {
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not submit DDL Script %s - %s",
 						     node->no_id, archive_tmp, strerror(errno));
-					    slon_abort();
+					    slon_retry();
 					}
 					
 					rc = close_log_archive();
@@ -1417,7 +1417,7 @@
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not close DDL Script %s - %s",
 						     node->no_id, archive_tmp, strerror(errno));
-					    slon_abort();
+					    slon_retry();
 					}
 				    }
 				}
@@ -1456,7 +1456,7 @@
 				slon_mkquery(&query1, "rollback transaction;");
 			}
 			if (query_execute(node, local_dbconn, &query1) < 0)
-				slon_abort();
+				slon_retry();
 
 			if (need_reloadListen)
 			{
@@ -1601,7 +1601,7 @@
 						slon_log(SLON_FATAL, "remoteWorkerThread_%d: ",
 								 "pthread_create() - %s\n",
 								 node->no_id, strerror(errno));
-						slon_abort();
+						slon_retry();
 					}
 					slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
 							 "helper thread for provider %d created\n",
@@ -1884,7 +1884,7 @@
 	if (msg == NULL)
 	{
 		perror("remoteWorker_event: malloc()");
-		slon_abort();
+		slon_retry();
 	}
 	memset(msg, 0, sizeof(SlonWorkMsg_event));
 
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.23
retrieving revision 1.24
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.23 -r1.24
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -478,7 +478,7 @@
 			if (listat == NULL)
 			{
 				perror("remoteListen_adjust_listat: malloc()");
-				slon_abort();
+				slon_restart();
 			}
 			memset(listat, 0, sizeof(struct listat));
 
Index: sync_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/sync_thread.c,v
retrieving revision 1.15
retrieving revision 1.16
diff -Lsrc/slon/sync_thread.c -Lsrc/slon/sync_thread.c -u -w -r1.15 -r1.16
--- src/slon/sync_thread.c
+++ src/slon/sync_thread.c
@@ -62,7 +62,7 @@
 	 * Connect to the local database
 	 */
 	if ((conn = slon_connectdb(rtcfg_conninfo, "local_sync")) == NULL)
-		slon_abort();
+		slon_retry();
 	dbconn = conn->dbconn;
 
 	/*
@@ -106,7 +106,7 @@
 					 "syncThread: \"%s\" - %s",
 					 dstring_data(&query1), PQresultErrorMessage(res));
 			PQclear(res);
-			slon_abort();
+			slon_retry();
 			break;
 		}
 
@@ -134,7 +134,7 @@
 						 "syncThread: \"%s\" - %s",
 						 dstring_data(&query2), PQresultErrorMessage(res));
 				PQclear(res);
-				slon_abort();
+				slon_retry();
 				break;
 			}
 			slon_log(SLON_DEBUG2,
@@ -152,7 +152,7 @@
 						 "syncThread: \"commit transaction;\" - %s",
 						 PQresultErrorMessage(res));
 				PQclear(res);
-				slon_abort();
+				slon_retry();
 			}
 			PQclear(res);
 
@@ -175,7 +175,7 @@
 						 "syncThread: \"rollback transaction;\" - %s",
 						 PQresultErrorMessage(res));
 				PQclear(res);
-				slon_abort();
+				slon_retry();
 			}
 			PQclear(res);
 		}
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.53
retrieving revision 1.54
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.53 -r1.54
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -325,8 +325,9 @@
  */
 extern pid_t slon_pid;
 #ifndef WIN32
-extern pid_t slon_ppid;
-extern pid_t slon_cpid;
+extern pthread_mutex_t slon_watchdog_lock;
+extern pid_t slon_watchdog_pid;
+extern pid_t slon_worker_pid;
 #endif
 extern char *rtcfg_cluster_name;
 extern char *rtcfg_namespace;
@@ -349,12 +350,34 @@
 #ifndef WIN32
 #define slon_abort() \
 do { \
-	kill((slon_ppid == 0 ? slon_pid : slon_ppid), SIGTERM); \
+	pthread_mutex_lock(&slon_watchdog_lock); \
+	if (slon_watchdog_pid >= 0) { \
+		slon_log(SLON_DEBUG2, "slon_abort() from pid=%d\n", slon_pid); \
+		kill(slon_watchdog_pid, SIGTERM); \
+		slon_watchdog_pid = -1; \
+	} \
+	pthread_mutex_unlock(&slon_watchdog_lock); \
 	pthread_exit(NULL); \
 } while (0)
 #define slon_restart() \
 do { \
-	kill((slon_ppid == 0 ? slon_pid : slon_ppid), SIGHUP); \
+	pthread_mutex_lock(&slon_watchdog_lock); \
+	if (slon_watchdog_pid >= 0) { \
+		slon_log(SLON_DEBUG2, "slon_restart() from pid=%d\n", slon_pid); \
+		kill(slon_watchdog_pid, SIGHUP); \
+		slon_watchdog_pid = -1; \
+	} \
+	pthread_mutex_unlock(&slon_watchdog_lock); \
+} while (0)
+#define slon_retry() \
+do { \
+	pthread_mutex_lock(&slon_watchdog_lock); \
+	if (slon_watchdog_pid >= 0) { \
+		slon_log(SLON_DEBUG2, "slon_retry() from pid=%d\n", slon_pid); \
+		kill(slon_watchdog_pid, SIGUSR1); \
+		slon_watchdog_pid = -1; \
+	} \
+	pthread_mutex_unlock(&slon_watchdog_lock); \
 } while (0)
 #else /* WIN32 */
 /* On win32, we currently just bail out and let the service control manager
@@ -369,13 +392,16 @@
     WSACleanup(); \
     exit(1); \
 } while (0)
+#define slon_retry() \
+do { \
+    WSACleanup(); \
+    exit(1); \
+} while (0)
 #endif
 
 extern void slon_exit(int code);
 extern void Usage(char * const argv[]);
 
-extern int	slon_restart_request;
-extern int watchdog_pipe[];
 extern int sched_wakeuppipe[];
 extern pthread_mutex_t slon_wait_listen_lock;
 extern pthread_cond_t slon_wait_listen_cond;
Index: runtime_config.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v
retrieving revision 1.25
retrieving revision 1.26
diff -Lsrc/slon/runtime_config.c -Lsrc/slon/runtime_config.c -u -w -r1.25 -r1.26
--- src/slon/runtime_config.c
+++ src/slon/runtime_config.c
@@ -34,8 +34,9 @@
  */
 pid_t		slon_pid;
 #ifndef WIN32
-pid_t		slon_cpid;
-pid_t		slon_ppid;
+pthread_mutex_t slon_watchdog_lock;
+pid_t		slon_watchdog_pid;
+pid_t		slon_worker_pid;
 #endif
 char	   *rtcfg_cluster_name = NULL;
 char	   *rtcfg_namespace = NULL;
@@ -133,7 +134,7 @@
 	if (node == NULL)
 	{
 		perror("rtcfg_storeNode: malloc()");
-		slon_abort();
+		slon_retry();
 	}
 	memset(node, 0, sizeof(SlonNode));
 
@@ -230,7 +231,7 @@
 
 		slon_log(SLON_FATAL,
 				 "enableNode: unknown node ID %d\n", no_id);
-		slon_abort();
+		slon_retry();
 		return;
 	}
 
@@ -265,7 +266,7 @@
 
 		slon_log(SLON_FATAL,
 				 "enableNode: unknown node ID %d\n", no_id);
-		slon_abort();
+		slon_retry();
 		return;
 	}
 
@@ -449,7 +450,7 @@
 				 PQresultErrorMessage(res));
 		PQclear(res);
 		dstring_free(&query);
-		slon_exit(-1);
+		slon_retry();
 	}
 	for (i = 0, n = PQntuples(res); i < n; i++)
 	{
@@ -485,7 +486,7 @@
 	{
 		slon_log(SLON_FATAL,
 				 "storeListen: unknown node ID %d\n", li_provider);
-		slon_abort();
+		slon_retry();
 		return;
 	}
 
@@ -517,7 +518,7 @@
 	if (listen == NULL)
 	{
 		perror("rtcfg_storeListen: malloc()");
-		slon_abort();
+		slon_retry();
 	}
 	memset(listen, 0, sizeof(SlonListen));
 
@@ -550,7 +551,7 @@
 	{
 		slon_log(SLON_FATAL,
 				 "dropListen: unknown node ID %d\n", li_provider);
-		slon_abort();
+		slon_retry();
 		return;
 	}
 
@@ -638,7 +639,7 @@
 	if (set == NULL)
 	{
 		perror("rtcfg_storeSet: malloc()");
-		slon_abort();
+		slon_retry();
 	}
 	memset(set, 0, sizeof(SlonSet));
 
@@ -733,7 +734,7 @@
 	 */
 	rtcfg_unlock();
 	slon_log(SLON_FATAL, "rtcfg_moveSet(): set %d not found\n", set_id);
-	slon_abort();
+	slon_retry();
 }
 
 
@@ -778,7 +779,7 @@
 	slon_log(SLON_FATAL,
 			 "storeSubscribe: set %d not found\n", sub_set);
 	rtcfg_unlock();
-	slon_abort();
+	slon_retry();
 }
 
 
@@ -820,7 +821,7 @@
 	slon_log(SLON_FATAL,
 			 "enableSubscription: set %d not found\n", sub_set);
 	rtcfg_unlock();
-	slon_abort();
+	slon_retry();
 }
 
 
@@ -861,7 +862,7 @@
 	slon_log(SLON_FATAL,
 			 "unsubscribeSet: set %d not found\n", sub_set);
 	rtcfg_unlock();
-	slon_abort();
+	slon_retry();
 }
 
 
@@ -892,7 +893,7 @@
 							 "remoteWorkerThread - %s\n",
 							 strerror(errno));
 					rtcfg_unlock();
-					slon_abort();
+					slon_retry();
 				}
 				node->worker_status = SLON_TSTAT_RUNNING;
 				break;
@@ -946,7 +947,7 @@
 							 "remoteListenThread - %s\n",
 							 strerror(errno));
 					rtcfg_unlock();
-					slon_abort();
+					slon_retry();
 				}
 				break;
 
@@ -1019,7 +1020,7 @@
 	if (anode == NULL)
 	{
 		perror("rtcfg_needActivate: malloc()");
-		slon_abort();
+		slon_retry();
 	}
 	anode->no_id = no_id;
 	DLLIST_ADD_TAIL(to_activate_head, to_activate_tail, anode);
Index: misc.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/misc.c,v
retrieving revision 1.20
retrieving revision 1.21
diff -Lsrc/slon/misc.c -Lsrc/slon/misc.c -u -w -r1.20 -r1.21
--- src/slon/misc.c
+++ src/slon/misc.c
@@ -146,7 +146,7 @@
 		{
 			perror("slon_log: malloc()");
 			pthread_mutex_unlock(&log_mutex);
-			slon_abort();
+			slon_retry();
 		}
 	}
 	outbuf[0] = 0;
@@ -175,7 +175,7 @@
 		if (outbuf == NULL)
 		{
 			perror("slon_log: realloc()");
-			slon_abort();
+			slon_retry();
 		}
 	}
 #ifdef HAVE_SYSLOG
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.33
retrieving revision 1.34
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.33 -r1.34
--- src/slon/local_listen.c
+++ src/slon/local_listen.c
@@ -56,7 +56,7 @@
 	 * Connect to the local database
 	 */
 	if ((conn = slon_connectdb(rtcfg_conninfo, "local_listen")) == NULL)
-		slon_abort();
+		slon_retry();
 	dbconn = conn->dbconn;
 
 	/*
@@ -80,7 +80,7 @@
 				 dstring_data(&query1), PQresultErrorMessage(res));
 		PQclear(res);
 		dstring_free(&query1);
-		slon_abort();
+		slon_retry();
 	}
 	PQclear(res);
 
@@ -131,7 +131,7 @@
 					 PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query1);
-			slon_abort();
+			slon_retry();
 			break;
 		}
 		PQclear(res);
@@ -150,14 +150,13 @@
 		if (restart_request)
 		{
 			slon_log(SLON_INFO,
-					 "localListenThread: got restart notification - "
-					 "signal scheduler\n");
+					 "localListenThread: got restart notification\n");
 #ifndef WIN32
-			kill(getppid(), SIGHUP);
+			slon_restart();
 #else
 			/* XXX */
 			/* Win32 defer to service manager to restart for now */
-			slon_abort();
+			slon_restart();
 #endif
 		}
 
@@ -183,7 +182,7 @@
 					 dstring_data(&query1), PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query1);
-			slon_abort();
+			slon_retry();
 			break;
 		}
 		ntuples = PQntuples(res);
@@ -271,7 +270,7 @@
 					slon_log(SLON_FATAL, "localListenThread: \"%s\" %s",
 							 notify_query, PQresultErrorMessage(notify_res));
 					PQclear(notify_res);
-					slon_abort();
+					slon_restart();
 				}
 				PQclear(notify_res);
 
@@ -511,7 +510,7 @@
 						 PQresultErrorMessage(res2));
 					dstring_free(&query2);
 					PQclear(res2);
-					slon_abort();
+					slon_retry();
 				}
 				if (PQntuples(res2) != 1)
 				{
@@ -520,7 +519,7 @@
 						 set_id);
 					dstring_free(&query2);
 					PQclear(res2);
-					slon_abort();
+					slon_retry();
 				}
 				
 				sub_provider =
@@ -645,7 +644,7 @@
 						 dstring_data(&query1), PQresultErrorMessage(res));
 				PQclear(res);
 				dstring_free(&query1);
-				slon_abort();
+				slon_retry();
 				break;
 			}
 			PQclear(res);
@@ -662,7 +661,7 @@
 						 "localListenThread: \"rollback transaction;\" - %s",
 						 PQresultErrorMessage(res));
 				PQclear(res);
-				slon_abort();
+				slon_retry();
 				break;
 			}
 			PQclear(res);
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.27
retrieving revision 1.28
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.27 -r1.28
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -97,7 +97,7 @@
 #else
 		exit(0);
 #endif
-		/* slon_abort(); */
+		/* slon_retry(); */
 	}
 	dbconn = conn->dbconn;
 
@@ -129,7 +129,7 @@
 					 "cleanupThread: \"%s\" - %s",
 					 dstring_data(&query1), PQresultErrorMessage(res));
 			PQclear(res);
-			slon_abort();
+			slon_retry();
 			break;
 		}
 		PQclear(res);
@@ -158,7 +158,7 @@
 					 "cleanupThread: \"%s\" - %s",
 					 dstring_data(&query2), PQresultErrorMessage(res));
 			PQclear(res);
-			slon_abort();
+			slon_retry();
 			break;
 		}
 		n = PQntuples(res);
@@ -188,7 +188,7 @@
 						 dstring_data(&query2), PQresultErrorMessage(res2));
 				PQclear(res);
 				PQclear(res2);
-				slon_abort();
+				slon_retry();
 				break;
 			}
 			PQclear(res2);
@@ -235,7 +235,7 @@
 						 "cleanupThread: \"%s\" - %s",
 						 dstring_data(&query3), PQresultErrorMessage(res));
 				PQclear(res);
-				/* slon_abort();
+				/* slon_retry();
 				   break; */
 			}
 			}
@@ -289,7 +289,7 @@
 	if (PQresultStatus(res) != PGRES_TUPLES_OK) {
 		slon_log(SLON_FATAL, "cleanupThread: could not getMinXid()!\n");
 		PQclear(res);
-		slon_abort();
+		slon_retry();
 		return -1;
 	} 
 	xid = strtoll(PQgetvalue(res, 0, 0), NULL, 10);
Index: dbutils.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/dbutils.c,v
retrieving revision 1.16
retrieving revision 1.17
diff -Lsrc/slon/dbutils.c -Lsrc/slon/dbutils.c -u -w -r1.16 -r1.17
--- src/slon/dbutils.c
+++ src/slon/dbutils.c
@@ -139,7 +139,7 @@
 	if (conn == NULL)
 	{
 		perror("slon_make_dummyconn: malloc()");
-		slon_abort();
+		slon_retry();
 	}
 	memset(conn, 0, sizeof(SlonConn));
 	conn->symname = strdup(symname);
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.59
retrieving revision 1.60
diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.59 -r1.60
--- src/slon/slon.c
+++ src/slon/slon.c
@@ -40,12 +40,16 @@
  * ---------- Global data ----------
  */
 #ifndef WIN32
-int			watchdog_pipe[2];
+#define		SLON_WATCHDOG_NORMAL		0
+#define		SLON_WATCHDOG_RESTART		1
+#define		SLON_WATCHDOG_RETRY			2
+#define		SLON_WATCHDOG_SHUTDOWN		3
+int			watchdog_status = SLON_WATCHDOG_NORMAL;
 #endif
 int			sched_wakeuppipe[2];
 
-pthread_mutex_t slon_wait_listen_lock = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t slon_wait_listen_cond = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t slon_wait_listen_lock;
+pthread_cond_t slon_wait_listen_cond;
 
 /*
  * ---------- Local data ----------
@@ -61,12 +65,11 @@
 static pthread_t main_thread;
 static char *const *main_argv;
 
-static void SlonMain(PGconn *startup_conn);
+static void SlonMain(void);
 #ifndef WIN32
 static void SlonWatchdog(void);
 static void sighandler(int signo);
-static void main_sigalrmhandler(int signo);
-static void slon_kill_child(void);
+static void slon_terminate_worker(void);
 #endif
 
 int			slon_log_level;
@@ -115,7 +118,6 @@
 	PGresult   *res;
 	int			i	  ,
 				n;
-	PGconn	   *startup_conn;
 	int			c;
 	int			errors = 0;
 	char		pipe_c;
@@ -240,8 +242,14 @@
 	 */
 	slon_pid = getpid();
 #ifndef WIN32
-	slon_cpid = 0;
-	slon_ppid = 0;
+	if (pthread_mutex_init(&slon_watchdog_lock, NULL) < 0)
+	{
+		slon_log(SLON_FATAL, "slon: pthread_mutex_init() - %s\n", 
+				strerror(errno));
+		exit(-1);
+	}
+	slon_watchdog_pid = slon_pid;
+	slon_worker_pid = -1;
 #endif
 	main_argv = argv;
 
@@ -294,45 +302,10 @@
     err = WSAStartup(MAKEWORD(1, 1), &wsaData);
     if (err != 0) {
 		slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err);
-		slon_exit(-1);
+		exit(-1);
     }
 #endif
     
-	/*
-	 * Connect to the local database to read the initial configuration
-	 */
-
-
-	startup_conn = PQconnectdb(rtcfg_conninfo);
-	if (startup_conn == NULL)
-	{
-		slon_log(SLON_FATAL, "main: PQconnectdb() failed\n");
-		slon_exit(-1);
-	}
-	if (PQstatus(startup_conn) != CONNECTION_OK)
-	{
-		slon_log(SLON_FATAL, "main: Cannot connect to local database - %s\n",
-				 PQerrorMessage(startup_conn));
-		PQfinish(startup_conn);
-		slon_exit(-1);
-	}
-
-	/*
-	 * Get our local node ID
-	 */
-	rtcfg_nodeid = db_getLocalNodeId(startup_conn);
-	if (rtcfg_nodeid < 0)
-	{
-		slon_log(SLON_FATAL, "main: Node is not initialized properly\n");
-		slon_exit(-1);
-	}
-	if (db_checkSchemaVersion(startup_conn) < 0)
-	{
-		slon_log(SLON_FATAL, "main: Node has wrong Slony-I schema or module version loaded\n");
-		slon_exit(-1);
-	}
-	slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid);
-
 	if (pid_file)
 	{
 		FILE	   *pidfile;
@@ -350,16 +323,8 @@
 	}
 
 	/*
-	 * Pipes to be used as communication devices between the parent (watchdog)
-	 * and child (worker) processes.
+	 * Create the pipe used to kick the workers scheduler thread
 	 */
-#ifndef WIN32
-	if (pgpipe(watchdog_pipe) < 0)
-	{
-		slon_log(SLON_FATAL, "slon: parent pipe create failed -(%d) %s\n", errno,strerror(errno));
-		slon_exit(-1);
-	}
-#endif
 	if (pgpipe(sched_wakeuppipe) < 0)
 	{
 		slon_log(SLON_FATAL, "slon: sched_wakeuppipe create failed -(%d) %s\n", errno,strerror(errno));
@@ -370,93 +335,113 @@
 	 * other such tasks to the Service Control Manager. And win32 doesn't
 	 * support signals, so we don't need to catch them... */
 #ifndef WIN32
-	/*
-	 * Fork here to allow parent process to trap signals and child process to 
-	 * handle real processing work creating a watchdog and worker process
-	 * hierarchy
-	 */
-	if ((slon_cpid = fork()) < 0)
-	{
-		slon_log(SLON_FATAL, "Fork failed -(%d) %s\n", errno,strerror(errno));
-		slon_exit(-1);
-	}
-	else if (slon_cpid == 0) /* child */
-		SlonMain(startup_conn);
-	else
 		SlonWatchdog();
 #else
-	SlonMain(startup_conn);
+	SlonMain();
 #endif
 	exit(0);
 }
 
 
-static void SlonMain(PGconn *startup_conn)
+static void SlonMain(void)
 {
 		PGresult *res;
 		SlonDString query;
 		int i,n;
 		char		pipe_c;
+	PGconn		   *startup_conn;
 
 		slon_pid = getpid();
 #ifndef WIN32
-		slon_ppid = getppid();
+	slon_worker_pid = slon_pid;
 #endif
 
-		slon_log(SLON_DEBUG2, "main: main process started\n");
-#ifndef WIN32
+	if (pthread_mutex_init(&slon_wait_listen_lock, NULL) < 0)
+	{
+		slon_log(SLON_FATAL, "main: pthread_mutex_init() failed - %s\n",
+				strerror(errno));
+		slon_abort();
+	}
+	if (pthread_cond_init(&slon_wait_listen_cond, NULL) < 0)
+	{
+		slon_log(SLON_FATAL, "main: pthread_cond_init() failed - %s\n",
+				strerror(errno));
+		slon_abort();
+	}
+
 		/*
-		 * Wait for the parent process to initialize
+	 * Connect to the local database to read the initial configuration
 		 */
-		if (read(watchdog_pipe[0], &pipe_c, 1) != 1)
+	startup_conn = PQconnectdb(rtcfg_conninfo);
+	if (startup_conn == NULL)
 		{
-			slon_log(SLON_FATAL, "main: read from parent pipe failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: PQconnectdb() failed\n");
+		slon_retry();
+		exit(-1);
 		}
-
-		if (pipe_c != 'p')
+	if (PQstatus(startup_conn) != CONNECTION_OK)
 		{
-			slon_log(SLON_FATAL, "main: incorrect data from parent pipe -(%c)\n",pipe_c);
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: Cannot connect to local database - %s\n",
+				 PQerrorMessage(startup_conn));
+		PQfinish(startup_conn);
+		slon_retry();
+		exit(-1);
 		}
 
-		slon_log(SLON_DEBUG2, "main: begin signal handler setup\n");
+	/*
+	 * Get our local node ID
+	 */
+	rtcfg_nodeid = db_getLocalNodeId(startup_conn);
+	if (rtcfg_nodeid < 0)
+	{
+		slon_log(SLON_FATAL, "main: Node is not initialized properly\n");
+		slon_retry();
+		exit(-1);
+	}
+	if (db_checkSchemaVersion(startup_conn) < 0)
+	{
+		slon_log(SLON_FATAL, "main: Node has wrong Slony-I schema or module version loaded\n");
+		slon_abort();
+	}
+	slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid);
 
+#ifndef WIN32
 		if (signal(SIGHUP,SIG_IGN) == SIG_ERR)
 		{
-			slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: SIGHUP signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_abort();
 		}
 		if (signal(SIGINT,SIG_IGN) == SIG_ERR)
 		{
-			slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: SIGINT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_abort();
 		}
 		if (signal(SIGTERM,SIG_IGN) == SIG_ERR)
 		{
-			slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: SIGTERM signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_abort();
 		}
 		if (signal(SIGCHLD,SIG_IGN) == SIG_ERR)
 		{
-			slon_log(SLON_FATAL, "slon: SIGCHLD signal handler setup failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: SIGCHLD signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_abort();
 		}
 		if (signal(SIGQUIT,SIG_IGN) == SIG_ERR)
 		{
-			slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_abort();
 		}
 
-		slon_log(SLON_DEBUG2, "main: end signal handler setup\n");
 #endif
 
+	slon_log(SLON_DEBUG2, "main: main process started\n");
+
 		/*
 		 * Start the event scheduling system
 		 */
 		slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n");
 		if (sched_start_mainloop() < 0)
-			slon_exit(-1);
+		slon_retry();
 
 		slon_log(SLON_CONFIG, "main: loading current cluster configuration\n");
 
@@ -471,7 +456,7 @@
 			slon_log(SLON_FATAL, "Cannot start transaction - %s\n",
 					 PQresultErrorMessage(res));
 			PQclear(res);
-			slon_exit(-1);
+		slon_retry();
 		}
 		PQclear(res);
 
@@ -494,7 +479,7 @@
 					 PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query);
-			slon_exit(-1);
+		slon_retry();
 		}
 		for (i = 0, n = PQntuples(res); i < n; i++)
 		{
@@ -544,7 +529,7 @@
 					 PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query);
-			slon_exit(-1);
+		slon_retry();
 		}
 		for (i = 0, n = PQntuples(res); i < n; i++)
 		{
@@ -575,7 +560,7 @@
 					 PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query);
-			slon_exit(-1);
+		slon_retry();
 		}
 		for (i = 0, n = PQntuples(res); i < n; i++)
 		{
@@ -602,7 +587,7 @@
 					 PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query);
-			slon_exit(-1);
+		slon_retry();
 		}
 		for (i = 0, n = PQntuples(res); i < n; i++)
 		{
@@ -631,7 +616,7 @@
 					 PQresultErrorMessage(res));
 			PQclear(res);
 			dstring_free(&query);
-			slon_exit(-1);
+		slon_retry();
 		}
 		if (PQntuples(res) == 0)
 			strcpy(rtcfg_lastevent, "-1");
@@ -654,7 +639,7 @@
 			slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s\n",
 					 PQresultErrorMessage(res));
 			PQclear(res);
-			slon_exit(-1);
+		slon_retry();
 		}
 		PQclear(res);
 
@@ -676,7 +661,7 @@
 		{
 			slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n",
 					 strerror(errno));
-			slon_abort();
+		slon_retry();
 		}
 		pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock);
 		pthread_mutex_unlock(&slon_wait_listen_lock);
@@ -694,7 +679,7 @@
 		{
 			slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n",
 					 strerror(errno));
-			slon_abort();
+		slon_retry();
 		}
 
 		/*
@@ -705,14 +690,14 @@
 		{
 			slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n",
 					 strerror(errno));
-			slon_abort();
+		slon_retry();
 		}
 #ifdef HAVE_NETSNMP
 		if (pthread_create(&local_snmp_thread, NULL, snmpThread_main, NULL) < 0)
 		{
 			slon_log(SLON_FATAL, "main: cannot create snmpThread -%s\n",
 					strerror(errno));
-			slon_abort();
+		slon_retry();
 		}
 #endif
 		/*
@@ -722,7 +707,7 @@
 		if (sched_wait_mainloop() < 0)
 		{
 			slon_log(SLON_FATAL, "main: scheduler returned with error\n");
-			slon_abort();
+		slon_retry();
 		}
 		slon_log(SLON_DEBUG1, "main: scheduler mainloop returned\n");
 
@@ -730,18 +715,10 @@
 		 * Wait for all remote threads to finish
 		 */
 		main_thread = pthread_self();
-#ifndef WIN32 /* XXX WIN32 no sigalarm, how fix? XXX */
-		signal(SIGALRM, main_sigalrmhandler);
-		alarm(20);
-#endif
 
 		slon_log(SLON_DEBUG2, "main: wait for remote threads\n");
 		rtcfg_joinAllRemoteThreads();
 
-#ifndef WIN32 /* XXX WIN32 XXX */
-		alarm(0);
-#endif
-
 		/*
 		 * Wait for the local threads to finish
 		 */
@@ -763,19 +740,6 @@
 					strerror(errno));
 #endif
 
-		/*
-		 * Tell parent that worker is done
-		 */
-#ifndef WIN32
-		slon_log(SLON_DEBUG2, "main: notify parent that worker is done\n");
-
-		if (write(watchdog_pipe[1], "c", 1) != 1)
-		{
-			slon_log(SLON_FATAL, "main: write to watchdog pipe failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
-		}
-#endif
-
 		slon_log(SLON_DEBUG1, "main: done\n");
 
 		exit(0);
@@ -793,9 +757,6 @@
 		/* 
 		 * Install signal handlers 
 		 */
-		
-		slon_log(SLON_DEBUG2, "slon: begin signal handler setup\n");
-
 #ifndef CYGWIN
 		act.sa_handler = &sighandler; 
 		sigemptyset(&act.sa_mask);
@@ -810,6 +771,16 @@
 			slon_exit(-1);
 		}
 
+	if (signal(SIGUSR1,sighandler) == SIG_ERR)
+	{
+		slon_log(SLON_FATAL, "slon: SIGUSR1 signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_exit(-1);
+	}
+	if (signal(SIGALRM,sighandler) == SIG_ERR)
+	{
+		slon_log(SLON_FATAL, "slon: SIGALRM signal handler setup failed -(%d) %s\n", errno,strerror(errno));
+		slon_exit(-1);
+	}
 		if (signal(SIGINT,sighandler) == SIG_ERR)
 		{
 			slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
@@ -831,22 +802,53 @@
 			slon_exit(-1);
 		}
 
-		slon_log(SLON_DEBUG2, "slon: end signal handler setup\n");
+	slon_log(SLON_DEBUG2, "slon: watchdog ready - pid = %d\n", slon_watchdog_pid);
 
-		/*
-		 * Tell worker/scheduler that parent has completed initialization
-		 */
-		if (write(watchdog_pipe[1], "p", 1) != 1)
+	slon_worker_pid = fork();
+	if (slon_worker_pid == 0)
 		{
-			slon_log(SLON_FATAL, "slon: write to pipe failed -(%d) %s\n", errno,strerror(errno));
-			slon_exit(-1);
+		SlonMain();
+		exit(-1);
 		}
 
-		slon_log(SLON_DEBUG2, "slon: wait for main child process\n");
+	slon_log(SLON_DEBUG2, "slon: worker process created - pid = %d\n",
+			slon_worker_pid);
+	while ((pid = wait(&child_status)) != slon_worker_pid)
+	{
+		if (pid < 0 && errno == EINTR)
+			continue;
+
+		slon_log(SLON_DEBUG2, "slon: child terminated status: %d; pid: %d, current worker pid: %d\n", child_status, pid, slon_worker_pid);
+	}
+	slon_log(SLON_DEBUG2, "slon: child terminated status: %d; pid: %d, current worker pid: %d\n", child_status, pid, slon_worker_pid);
 
-		while ((pid = wait(&child_status)) != slon_cpid)
+	alarm(0);
+
+	switch(watchdog_status)
 		{
-			slon_log(SLON_DEBUG2, "slon: child terminated status: %d; pid: %d, current worker pid: %d\n", child_status, pid, slon_cpid);
+		case SLON_WATCHDOG_RESTART:
+			execvp(main_argv[0], main_argv);
+			slon_log(SLON_FATAL, "slon: cannot restart via execvp() - %s\n",
+					strerror(errno));
+			slon_exit(-1);
+			break;
+
+		case SLON_WATCHDOG_NORMAL:
+		case SLON_WATCHDOG_RETRY:
+			watchdog_status = SLON_WATCHDOG_RETRY;
+			slon_log(SLON_DEBUG1, "slon: restart of worker in 10 seconds\n");
+			sleep(10);
+			if (watchdog_status == SLON_WATCHDOG_RETRY)
+			{
+				execvp(main_argv[0], main_argv);
+				slon_log(SLON_FATAL, "slon: cannot restart via execvp() - %s\n",
+						strerror(errno));
+				slon_exit(-1);
+			}
+			break;
+
+		default:
+			break;
 		}
 
 		slon_log(SLON_DEBUG1, "slon: done\n");
@@ -859,120 +861,58 @@
 
 
 static void
-main_sigalrmhandler(int signo)
-{
-	if (pthread_equal(main_thread, pthread_self()))
-	{
-		alarm(0);
-		slon_log(SLON_WARN, "main: shutdown timeout exiting\n");
-		kill(slon_ppid,SIGQUIT);
-		exit(-1);
-	}
-	else
-	{
-		slon_log(SLON_WARN, "main: force SIGALRM the main thread\n");
-		pthread_kill(main_thread,SIGALRM);
-	}
-}
-
-static void
 sighandler(int signo)
 {
 	switch (signo)
 	{
 	case SIGALRM:
+		slon_log(SLON_DEBUG1, "slon: child termination timeout - kill child\n");
+		kill(slon_worker_pid, SIGKILL);
+		break;
+		
 	case SIGCHLD:
 		break;
 		
 	case SIGHUP:
 		slon_log(SLON_DEBUG1, "slon: restart requested\n");
-		slon_kill_child();
-		execvp(main_argv[0], main_argv);
-		slon_log(SLON_FATAL, "slon: cannot restart via execvp(): %s\n", strerror(errno));
-		slon_exit(-1);
+		watchdog_status = SLON_WATCHDOG_RESTART;
+		slon_terminate_worker();
+		break;
+
+	case SIGUSR1:
+		slon_log(SLON_DEBUG1, "slon: retry requested\n");
+		watchdog_status = SLON_WATCHDOG_RETRY;
+		slon_terminate_worker();
 		break;
 
 	case SIGINT:
 	case SIGTERM:
 		slon_log(SLON_DEBUG1, "slon: shutdown requested\n");
-		slon_kill_child();
-		slon_exit(-1);
+		watchdog_status = SLON_WATCHDOG_SHUTDOWN;
+		slon_terminate_worker();
 		break;
 
 	case SIGQUIT:
 		slon_log(SLON_DEBUG1, "slon: shutdown now requested\n");
-		kill(slon_cpid,SIGKILL);
+		kill(slon_worker_pid, SIGKILL);
 		slon_exit(-1);
 		break;
 	}
 }
 
 void
-slon_kill_child()
+slon_terminate_worker()
 {
-	char			pipe_c;
-	struct timeval	tv;
-	fd_set			fds;
-	int				rc;
-	int				fd;
-
-	if (slon_cpid == 0) return;
-
-	tv.tv_sec = 60;
-	tv.tv_usec = 0;
-
 	slon_log(SLON_DEBUG2, "slon: notify worker process to shutdown\n");
 
-	fd = sched_wakeuppipe[1];
-	FD_ZERO(&fds);
-	FD_SET(fd,&fds);
-
-	rc = select(fd + 1, NULL, &fds, NULL, &tv);
-
-	if (rc == 0 || rc < 0)
-	{
-		slon_log(SLON_DEBUG2, "slon: select write to worker timeout\n");
-		kill(slon_cpid,SIGKILL);
-		slon_exit(-1);
-	}
-	
 	if (pipewrite(sched_wakeuppipe[1], "p", 1) != 1)
 	{
 		slon_log(SLON_FATAL, "main: write to worker pipe failed -(%d) %s\n", errno,strerror(errno));
-		kill(slon_cpid,SIGKILL);
+		kill(slon_worker_pid, SIGKILL);
 		slon_exit(-1);
 	}
 
-	slon_log(SLON_DEBUG2, "slon: wait for worker process to shutdown\n");
-
-	fd = watchdog_pipe[0];
-	FD_ZERO(&fds);
-	FD_SET(fd,&fds);
-
-	rc = select(fd + 1, &fds, NULL, NULL, &tv);
-
-	if (rc == 0 || rc < 0)
-	{
-		slon_log(SLON_DEBUG2, "slon: select read from worker pipe timeout\n");
-		kill(slon_cpid,SIGKILL);
-		slon_exit(-1);
-	}
-	
-	if (read(watchdog_pipe[0], &pipe_c, 1) != 1)
-	{
-		slon_log(SLON_FATAL, "slon: read from worker pipe failed -(%d) %s\n", errno,strerror(errno));
-		kill(slon_cpid,SIGKILL);
-		slon_exit(-1);
-	}
-
-	if (pipe_c != 'c')
-	{
-		slon_log(SLON_FATAL, "slon: incorrect data from worker pipe -(%c)\n",pipe_c);
-		kill(slon_cpid,SIGKILL);
-		slon_exit(-1);
-	}
-
-	slon_log(SLON_DEBUG2, "slon: worker process shutdown ok\n");
+	alarm(20);
 }
 #endif
 
@@ -982,11 +922,9 @@
 #ifdef WIN32
     /* Cleanup winsock */
     WSACleanup();
+#endif
     
 	if (pid_file)
-#else
-    if (slon_ppid == 0 && pid_file)
-#endif
 	{
 		slon_log(SLON_DEBUG2, "slon: remove pid file\n");
 		unlink(pid_file);


More information about the Slony1-commit mailing list