CVS User Account cvsuser
Mon Mar 7 23:27:08 PST 2005
Log Message:
-----------
Major changes to log shipping, allowing it to support substantially
all events (to the degree supportable).  Notably, COPY_SET now 
copies the contents of tables in newly subscribed sets.

Also includes a new event, ACCEPT_SET, which addresses a race condition
where updates might be lost. If ACCEPT_SET is received before the
MOVE_SET has been processed, then the slon will wait until it has
received both.

Modified Files:
--------------
    slony1-engine/src/backend:
        slony1_funcs.sql (r1.55 -> r1.56)
    slony1-engine/src/slon:
        cleanup_thread.c (r1.19 -> r1.20)
        remote_worker.c (r1.76 -> r1.77)
        slon.h (r1.45 -> r1.46)

Added Files:
-----------
    slony1-engine/src/ducttape:
        test_8_logship (r1.1)

-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.55
retrieving revision 1.56
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.55 -r1.56
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -1877,6 +1877,21 @@
 		end loop;
 	end if;
 
+	-- On the new origin, raise an event - ACCEPT_SET
+	if v_local_node_id = p_new_origin then
+		-- Find the event number from the origin
+		select max(ev_seqno) as seqno into v_sub_row 
+			from @NAMESPACE at .sl_event
+			where ev_type = ''MOVE_SET'' and
+			  ev_data1 = p_set_id and
+			  ev_data2 = p_old_origin and
+			  ev_data3 = p_new_origin and
+			  ev_origin = p_old_origin;
+		
+		perform @NAMESPACE at .createEvent(''_ at CLUSTERNAME@'', ''ACCEPT_SET'', 
+			p_set_id, p_old_origin, p_new_origin, v_sub_row.seqno);
+	end if;
+
 	-- ----
 	-- Next we have to reverse the subscription path
 	-- ----
@@ -4894,6 +4909,36 @@
 --
 --	Called by slonik during the function upgrade process. 
 -- ----------------------------------------------------------------------
+create or replace function @NAMESPACE at .add_missing_table_field (text, text, text, text) 
+returns bool as '
+DECLARE
+  p_namespace alias for $1;
+  p_table     alias for $2;
+  p_field     alias for $3;
+  p_type      alias for $4;
+  v_row       record;
+  v_query     text;
+BEGIN
+  select 1 into v_row from pg_namespace n, pg_class c, pg_attribute a
+     where quote_ident(n.nspname) = p_namespace and 
+         c.relnamespace = n.oid and
+         quote_ident(c.relname) = p_table and
+         a.attrelid = c.oid and
+         quote_ident(a.attname) = p_field;
+  if not found then
+    raise notice ''Upgrade table %.% - add field %'', p_namespace, p_table, p_field;
+    v_query := ''alter table '' || p_namespace || ''.'' || p_table || '' add column '';
+    v_query := v_query || p_field || '' '' || p_type || '';'';
+    execute v_query;
+    return ''t'';
+  else
+    return ''f'';
+  end if;
+END;' language plpgsql;
+
+comment on function @NAMESPACE at .add_missing_table_field (text, text, text, text) 
+is 'Add a column of a given type to a table if it is missing';
+
 create or replace function @NAMESPACE at .upgradeSchema(text)
 returns text as '
 
@@ -4945,7 +4990,6 @@
 		execute ''alter table @NAMESPACE at .sl_node add column no_spool boolean'';
 		update @NAMESPACE at .sl_node set no_spool = false;
 	end if;
-
 	return p_old;
 end;
 ' language plpgsql;
@@ -4990,6 +5034,6 @@
 			where con_origin = @NAMESPACE at .getLocalNodeId('_ at CLUSTERNAME@')
 			group by 1, 2
 		);
-comment on view @NAMESPACE at .sl_status is 'View showing how far behind remote nodes are.
-';
+
+comment on view @NAMESPACE at .sl_status is 'View showing how far behind remote nodes are.';
 
--- /dev/null
+++ src/ducttape/test_8_logship
@@ -0,0 +1,310 @@
+#!/bin/sh
+# $Id: test_8_logship,v 1.1 2005/03/07 23:27:02 cbbrowne Exp $
+# **********
+# test_8_logship
+#
+# 	This test script creates a standalone pgbench database 
+#	as slony_test1 and then:
+#
+#	- initializes a primary node and starts the node daemon
+#	- creates a set containing all 4 pgbench tables
+#	- creates a second database as slony_test2
+#	- adds database slony_test2 to the system
+#	- starts the second replication daemon
+#	- creates the pgbench tables (schema only)
+#	- subscribes the replication set from the primary node
+#
+#  The nature of the test has to do with the use of the new slonik
+#  log shipping functionality...
+# **********
+
+export PATH
+TMPOUT=/tmp/output.$$
+LOGSHIPDIR=/tmp/logs.$$
+mkdir -p $LOGSHIPDIR
+DB1=slony_test1
+DB2=slony_test
+CLUSTERNAME=T1
+PGBENCH_SCALE=1
+PGBENCH_CLIENTS=5
+PGBENCH_TRANS=`expr 30000 / $PGBENCH_CLIENTS`
+DEBUGLEVEL=4
+
+trap '
+	echo ""
+	echo "**** user abort"
+	if [ ! -z $pgbench_pid ] ; then
+		echo "**** killing pgbench"
+		kill -15 $pgbench_pid
+	fi
+	if [ ! -z $slon1_pid ] ; then
+		echo "**** killing node daemon 1"
+		kill -15 $slon1_pid
+	fi
+	if [ ! -z $slon2_pid ] ; then
+		echo "**** killing node daemon 2"
+		kill -15 $slon2_pid
+	fi
+	exit 1
+' 2 15
+
+######################################################################
+# Preparations ... create a standalone pgbench database and
+# have the "application" (pgbench) running.
+######################################################################
+
+#####
+# Make sure the install is up to date
+#####
+WGM=`which gmake`
+if [ -z $WGM ] ; then
+    MAKE=make
+    CGNU=`make -v | grep GNU`
+    if [ -z $CGNU ] ; then
+	echo "GNU Make not found - please install GNU Make"
+	exit 1
+    fi
+else
+    MAKE=gmake
+fi
+echo -n "**** running 'make install' in src directory ... "
+if ! ${MAKE} -C .. install >$TMPOUT 2>&1 ; then
+    echo "failed"; cat $TMPOUT; rm $TMPOUT; exit 1
+fi
+echo "done"
+rm $TMPOUT
+
+PREAMBLE_FILE=/tmp/preamble.$$
+cat <<EOF > $PREAMBLE_FILE
+define origin 11;
+define sub1 22;
+cluster name = $CLUSTERNAME;
+node @origin admin conninfo='dbname=$DB1';
+node @sub1 admin conninfo='dbname=$DB2';
+EOF
+
+
+#####
+# Remove old databases, if they exist
+#####
+echo "**** remove old test databases"
+dropdb $DB1 || echo "**** ignored"
+sleep 1
+dropdb $DB2 || echo "**** ignored"
+sleep 1
+
+#####
+# Create the "Primary Node"
+#####
+echo "**** creating database for Node 11"
+
+createdb $DB1 || exit 1
+pgbench -i -s $PGBENCH_SCALE $DB1
+pg_dump -s $DB1 >pgbench_schema.sql
+
+#####
+# Start pgbench in the background and give it rampup time
+#####
+pgbench -n -s $PGBENCH_SCALE -c $PGBENCH_CLIENTS -t $PGBENCH_TRANS $DB1 &
+pgbench_pid=$!
+echo "**** pgbench is running in background with pid $pgbench_pid"
+echo -n "**** sleeping 10 seconds to give pgbench time for rampup ... "
+sleep 10
+echo "done"
+
+echo ""
+echo "**********************************************************************"
+echo "**** $DB1 is now a standalone database with a running pgbench"
+echo "**********************************************************************"
+echo ""
+
+######################################################################
+# Setup DB1 as the primary cluster T1 node, start the node daemon,
+# and create a replication set containing the pgbench tables.
+######################################################################
+
+echo "**** initializing $DB1 as Primary Node for Slony-I cluster $CLUSTERNAME"
+slonik <<_EOF_
+	include <$PREAMBLE_FILE>;
+	init cluster (id = @origin, comment = 'Node @origin');
+	echo 'Database $DB1 initialized as Node 11';
+_EOF_
+if [ $? -ne 0 ] ; then
+	kill $pgbench_pid;
+	exit 1
+fi
+
+echo "**** starting the Slony-I node daemon for $DB1"
+xterm -title "Slon node 11" -e sh -c "slon -d$DEBUGLEVEL -s500 -g10 $CLUSTERNAME dbname=$DB1; echo -n 'Enter>'; read line" &
+slon1_pid=$!
+echo "slon[$slon1_pid] on dbname=$DB1"
+
+echo "**** creating a replication set containing the 4 pgbench tables ... "
+slonik <<_EOF_
+	include <$PREAMBLE_FILE>;
+	try {
+		table add key (node id = @origin, fully qualified name = 'public.history');
+	}
+	on error {
+		exit 1;
+	}
+
+	try {
+		create set (id = 1, origin = @origin, comment = 'Set 1 - pgbench tables');
+		set add table (set id = 1, origin = @origin,
+			id = 1, fully qualified name = 'public.accounts',
+			comment = 'Table accounts');
+		set add table (set id = 1, origin = @origin,
+			id = 2, fully qualified name = 'public.branches',
+			comment = 'Table branches');
+		set add table (set id = 1, origin = @origin,
+			id = 3, fully qualified name = 'public.tellers',
+			comment = 'Table tellers');
+		set add table (set id = 1, origin = @origin,
+			id = 4, fully qualified name = 'public.history',
+			key = serial, comment = 'Table accounts');
+	}
+	on error {
+		exit 1;
+	}
+_EOF_
+
+if [ $? -ne 0 ] ; then
+	echo "failed"
+	kill $pgbench_pid 2>/dev/null
+	kill $slon1_pid 2>/dev/null
+	cat $TMPOUT
+	rm $TMPOUT
+	exit 1
+fi
+echo "**** set created"
+
+#####
+# Check that pgbench is still running
+#####
+if ! kill -0 $pgbench_pid 2>/dev/null ; then
+	echo "**** pgbench terminated ???"
+	kill $slon1_pid 2>/dev/null
+	exit 1
+fi
+
+echo ""
+echo "**********************************************************************"
+echo "**** $DB1 is now the Slony-I origin for set 1"
+echo "**********************************************************************"
+echo ""
+
+######################################################################
+# Setup DB2 as a subscriber node and let it subscribe the replication
+# set of the running pgbench
+######################################################################
+echo "**** creating database for node 22"
+if ! createdb $DB2 ; then
+	kill $pgbench_pid 2>/dev/null
+	kill $slon1_pid 2>/dev/null
+	exit 1
+fi
+
+echo "**** initializing $DB2 as node 22 of Slony-I cluster $CLUSTERNAME"
+slonik <<_EOF_
+	include <$PREAMBLE_FILE>;
+	echo 'Creating node 22';
+	try {
+		store node (id = @sub1, comment = 'node @sub1', event node = @origin);
+        } on error {
+	    echo 'could not establish node @sub1';
+	    exit -1;
+	}
+	try {
+		store path (server = @origin, client = @sub1, conninfo = 'dbname=$DB1');
+		store path (server = @sub1, client = @origin, conninfo = 'dbname=$DB2');
+	}
+	on error { 
+	    echo 'could not establish paths between @origin and @sub1';
+	    exit -1; 
+	}
+	echo 'Database $DB2 added as node @sub1';
+_EOF_
+if [ $? -ne 0 ] ; then
+	kill $pgbench_pid 2>/dev/null
+	kill $slon1_pid 2>/dev/null
+	exit 1
+fi
+
+echo "**** starting the Slony-I node daemon for $DB1"
+xterm -title "Slon node 22" -e sh -c "slon -d$DEBUGLEVEL -s10000 -o10000 -g10 -a $LOGSHIPDIR $CLUSTERNAME dbname=$DB2; echo -n 'Enter>'; read line" &
+slon2_pid=$!
+echo "slon[$slon2_pid] on dbname=$DB2"
+
+#####
+# Check that pgbench is still running
+#####
+if ! kill -0 $pgbench_pid 2>/dev/null ; then
+	echo "**** pgbench terminated ???"
+	kill $slon1_pid 2>/dev/null
+	exit 1
+fi
+
+######################################################################
+# And now comes the moment where the big elephant starts to pee
+# and the attendants in the first row climb on their chairs ...
+######################################################################
+echo "**** creating pgbench tables and subscribing node 22 to set 1"
+(
+	cat pgbench_schema.sql
+) | psql -q $DB2
+slonik <<_EOF_
+	include <$PREAMBLE_FILE>;
+	subscribe set ( id = 1, provider = @origin, receiver = @sub1, forward = yes );
+_EOF_
+
+echo ""
+echo "**********************************************************************"
+echo "**** $DB2 should now be copying data and attempting to catch up."
+echo "**********************************************************************"
+echo ""
+
+echo -n "**** waiting for pgbench to finish "
+while kill -0 $pgbench_pid 2>/dev/null ; do
+	echo -n "."
+	sleep 10
+done
+echo "**** pgbench finished"
+echo "**** please terminate the replication engines when caught up."
+wait $slon1_pid
+wait $slon2_pid
+
+kill $pgbench_pid 2>/dev/null
+kill $slon1_pid 2>/dev/null
+kill $slon2_pid 2>/dev/null
+
+echo -n "**** comparing databases ... "
+psql $DB1 >dump.tmp.1.$$ <<_EOF_
+	select 'accounts:'::text, aid, bid, abalance, filler
+			from accounts order by aid;
+	select 'branches:'::text, bid, bbalance, filler
+			from branches order by bid;
+	select 'tellers:'::text, tid, bid, tbalance, filler
+			from tellers order by tid;
+	select 'history:'::text, tid, bid, aid, delta, mtime, filler,
+			"_Slony-I_${CLUSTERNAME}_rowID" from history order by "_Slony-I_${CLUSTERNAME}_rowID";
+_EOF_
+psql $DB2 >dump.tmp.2.$$ <<_EOF_
+	select 'accounts:'::text, aid, bid, abalance, filler
+			from accounts order by aid;
+	select 'branches:'::text, bid, bbalance, filler
+			from branches order by bid;
+	select 'tellers:'::text, tid, bid, tbalance, filler
+			from tellers order by tid;
+	select 'history:'::text, tid, bid, aid, delta, mtime, filler,
+			"_Slony-I_${CLUSTERNAME}_rowID" from history order by "_Slony-I_${CLUSTERNAME}_rowID";
+_EOF_
+
+if diff dump.tmp.1.$$ dump.tmp.2.$$ >test_1.diff ; then
+	echo "success - databases are equal."
+	rm dump.tmp.?.$$
+	rm test_1.diff
+else
+	echo "FAILED - see test_1.diff for database differences"
+fi
+rm $PREAMBLE_FILE
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.76
retrieving revision 1.77
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.76 -r1.77
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -249,7 +249,9 @@
 static int generate_archive_header (int node_id, char *seqbuf);
 static int submit_query_to_archive(SlonDString *ds);
 static int submit_string_to_archive (const char *s);
+static int submit_raw_data_to_archive (const char *s);
 static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf);
+static int write_void_log (int node_id, char *seqbuf, const char *message);
 
 #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
 
@@ -587,6 +589,9 @@
 								 no_id, no_comment, no_spool);
 
 				need_reloadListen = true;
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE");
+
 			}
 			else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
 			{
@@ -601,6 +606,9 @@
 								 no_id);
 
 				need_reloadListen = true;
+
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE");
 			}
 			else if (strcmp(event->ev_type, "DROP_NODE") == 0)
 			{
@@ -650,6 +658,8 @@
 								 rtcfg_cluster_name);
 
 				need_reloadListen = true;
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE");
 			}
 			else if (strcmp(event->ev_type, "STORE_PATH") == 0)
 			{
@@ -667,6 +677,8 @@
 							pa_server, pa_client, pa_conninfo, pa_connretry);
 
 				need_reloadListen = true;
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH");
 			}
 			else if (strcmp(event->ev_type, "DROP_PATH") == 0)
 			{
@@ -682,6 +694,8 @@
 								 pa_server, pa_client);
 
 				need_reloadListen = true;
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH");
 			}
 			else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
 			{
@@ -696,6 +710,8 @@
 								 "select %s.storeListen_int(%d, %d, %d); ",
 								 rtcfg_namespace,
 								 li_origin, li_provider, li_receiver);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN");
 			}
 			else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
 			{
@@ -710,6 +726,8 @@
 								 "select %s.dropListen_int(%d, %d, %d); ",
 								 rtcfg_namespace,
 								 li_origin, li_provider, li_receiver);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN");
 			}
 			else if (strcmp(event->ev_type, "STORE_SET") == 0)
 			{
@@ -724,6 +742,9 @@
 								 "select %s.storeSet_int(%d, %d, '%q'); ",
 								 rtcfg_namespace,
 								 set_id, set_origin, set_comment);
+
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET");
 			}
 			else if (strcmp(event->ev_type, "DROP_SET") == 0)
 			{
@@ -734,18 +755,46 @@
 				slon_appendquery(&query1,
 								 "select %s.dropSet_int(%d); ",
 								 rtcfg_namespace, set_id);
+
+				/* The table deleted needs to be
+				 * dropped from log shipping too */
+				if (archive_dir) {
+				    rc = open_log_archive(rtcfg_nodeid, seqbuf);
+				    rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+				    slon_mkquery(&query1, 
+						 "delete from %s.sl_setsync_offline "
+						 "  where ssy_setid= %d;",
+						 rtcfg_namespace, set_id);
+				    rc = submit_query_to_archive(&query1);
+				    rc = close_log_archive();
+				}
 			}
 			else if (strcmp(event->ev_type, "MERGE_SET") == 0)
 			{
 				int			set_id = (int)strtol(event->ev_data1, NULL, 10);
 				int			add_id = (int)strtol(event->ev_data2, NULL, 10);
-
+				char dropquery[280];
 				rtcfg_dropSet(add_id);
 
 				slon_appendquery(&query1,
 								 "select %s.mergeSet_int(%d, %d); ",
 								 rtcfg_namespace,
 								 set_id, add_id);
+				
+				/* Log shipping gets the change here
+				 * that we need to delete the table
+				 * being merged from the set being
+				 * maintained. */
+				if (archive_dir) {
+				    rc = open_log_archive(rtcfg_nodeid, seqbuf);
+				    rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+				    rc = slon_mkquery(&query1, 
+						      "delete from %s.sl_setsync_offline "
+						      "  where ssy_setid= %d;",
+						      rtcfg_namespace, add_id);
+				    rc = submit_query_to_archive(&query1);
+				    rc = close_log_archive();
+				}
 			}
 			else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
 			{
@@ -754,6 +803,8 @@
 				 * subscribed sets yet and table information is not maintained
 				 * in the runtime configuration.
 				 */
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE");
 			}
 			else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
 			{
@@ -762,6 +813,8 @@
 				 * subscribed sets yet and sequences information is not
 				 * maintained in the runtime configuration.
 				 */
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE");
 			}
 			else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0)
 			{
@@ -770,6 +823,8 @@
 				slon_appendquery(&query1, "select %s.setDropTable_int(%d);",
 								 rtcfg_namespace,
 								 tab_id);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE");
 			}
 			else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0)
 			{
@@ -778,6 +833,8 @@
 				slon_appendquery(&query1, "select %s.setDropSequence_int(%d);",
 								 rtcfg_namespace,
 								 seq_id);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE");
 			}
 			else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0)
 			{
@@ -787,6 +844,8 @@
 				slon_appendquery(&query1, "select %s.setMoveTable_int(%d, %d);",
 								 rtcfg_namespace,
 								 tab_id, new_set_id);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE");
 			}
 			else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0)
 			{
@@ -796,6 +855,8 @@
 				slon_appendquery(&query1, "select %s.setMoveSequence_int(%d, %d);",
 								 rtcfg_namespace,
 								 seq_id, new_set_id);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE");
 			}
 			else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
 			{
@@ -806,6 +867,8 @@
 								 "select %s.storeTrigger_int(%d, '%q'); ",
 								 rtcfg_namespace,
 								 trig_tabid, trig_tgname);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER");
 			}
 			else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
 			{
@@ -816,6 +879,70 @@
 								 "select %s.dropTrigger_int(%d, '%q'); ",
 								 rtcfg_namespace,
 								 trig_tabid, trig_tgname);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER");
+			}
+			else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
+			{
+			    int set_id = (int) strtol(event->ev_data1, NULL, 10);
+			    int old_origin = (int) strtol(event->ev_data2, NULL, 10);
+			    int new_origin = (int) strtol(event->ev_data3, NULL, 10);
+			    int seq_no = (int) strtol(event->ev_data4, NULL, 10);
+			    PGresult   *res;
+			    
+			    /* If we're a remote node, and haven't yet
+			     * received the MOVE_SET event from the
+			     * new origin, then we'll need to sleep a
+			     * bit...  This avoids a race condition
+			     * where new SYNCs take place on the new
+			     * origin, and are ignored on some
+			     * subscribers (and their children)
+			     * because the MOVE_SET wasn't yet
+			     * received and processed  */
+
+			    if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) {
+				slon_mkquery(&query1, 
+					     "select 1 from %s.sl_event accept "
+					     "where "
+					     "   accept.ev_type = 'ACCEPT_SET' and "
+					     "   accept.ev_origin = %d and "
+					     "   accept.ev_data1 = %d and "
+					     "   accept.ev_data2 = %d and "
+					     "   accept.ev_data3 = %d and "
+					     "   accept.ev_data4 = %d and "
+					     "   not exists  "
+					     "   (select 1 from %s.sl_event move "
+					     "    where "
+					     "      accept.ev_origin = move.ev_data3 and "
+					     "      move.ev_type = 'MOVE_SET' and "
+					     "      move.ev_data1 = accept.ev_data1 and "
+					     "      move.ev_data2 = accept.ev_data2 and "
+					     "      move.ev_data3 = accept.ev_data3 and "
+					     "      move.ev_seqno = %d); ",
+					     
+					     rtcfg_namespace, 
+					     old_origin, set_id, old_origin, new_origin, seq_no,
+					     rtcfg_namespace, seq_no);
+				res = PQexec(local_dbconn, dstring_data(&query1));
+				while (PQntuples(res) > 0) {
+				    int sleeptime = 15;
+				    int sched_rc;
+				    slon_log(SLON_WARN, "remoteWorkerThread_%d: "
+					     "accept set: node has not yet received MOVE_SET event "
+					     "for set %d old origin %d new origin - sleep %d seconds\n",
+					     rtcfg_nodeid, set_id, old_origin, new_origin, sleeptime);
+				    sched_rc = sched_msleep(node, sleeptime * 1000);
+				    if (sched_rc != SCHED_STATUS_OK) {
+					event_ok = false;
+					break;
+				    } else {
+					if (sleeptime < 60)
+					    sleeptime *= 2;
+				    }
+				    res = PQexec(local_dbconn, dstring_data(&query1));
+				}
+			    }
+			    
 			}
 			else if (strcmp(event->ev_type, "MOVE_SET") == 0)
 			{
@@ -831,6 +958,7 @@
 				 * that, we need to execute it now and select the resulting
 				 * provider for us.
 				 */
+
 				slon_appendquery(&query1,
 								 "select %s.moveSet_int(%d, %d, %d); ",
 								 rtcfg_namespace,
@@ -880,6 +1008,8 @@
 								 rtcfg_namespace,
 								 failed_node, backup_node, set_id);
 
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET");
 				need_reloadListen = true;
 			}
 			else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
@@ -896,7 +1026,8 @@
 							"select %s.subscribeSet_int(%d, %d, %d, '%q'); ",
 								 rtcfg_namespace,
 						   sub_set, sub_provider, sub_receiver, sub_forward);
-
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET");
 				need_reloadListen = true;
 			}
 			else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
@@ -1004,7 +1135,9 @@
 									 rtcfg_namespace,
 									 sub_set, sub_provider, sub_receiver);
 				}
-
+				/* Note: No need to do anything based
+				   on archive_dir here; copy_set does
+				   that nicely already.  */
 				need_reloadListen = true;
 			}
 			else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
@@ -1022,6 +1155,16 @@
 								 sub_set, sub_receiver);
 
 				need_reloadListen = true;
+				if (archive_dir) {
+					rc = open_log_archive(rtcfg_nodeid, seqbuf);
+					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+					slon_mkquery(&query1, 
+						     "delete from %s.sl_setsync_offline "
+						     "  where ssy_setid= %d;",
+						     rtcfg_namespace, sub_set);
+					rc = submit_query_to_archive(&query1);
+					rc = close_log_archive();
+				}
 			}
 			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
 			{
@@ -1087,11 +1230,15 @@
 								 "select %s.updateReloid(%d, '%q', %d); ",
 								 rtcfg_namespace,
 							   reset_config_setid, reset_configonly_on_node);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- RESET_CONFIG");
 			}
 			else
 			{
 				printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
 					   node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
+				if (archive_dir)
+					write_void_log (rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!");
 			}
 
 			/*
@@ -2107,6 +2254,33 @@
 	dstring_init(&query1);
 	sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
 
+
+	/* Log Shipping Support begins... */
+	/*  - Open the log, put the header in
+	    Isn't it convenient that seqbuf was just populated???  :-)
+	*/
+	if (archive_dir) {
+		rc = open_log_archive(rtcfg_nodeid, seqbuf);
+		if (rc < 0) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				 "Could not open COPY SET archive file %s - %s",
+				 node->no_id, archive_tmp, strerror(errno));
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+		rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+		if (rc < 0) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				 "Could not generate COPY SET archive header %s - %s",
+				 node->no_id, archive_tmp, strerror(errno));
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+	}
 	/*
 	 * Listen on the special relation telling what node daemon this connection
 	 * belongs to.
@@ -2118,6 +2292,7 @@
 	{
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		terminate_log_archive();
 		return -1;
 	}
 
@@ -2147,6 +2322,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		if (*(PQgetvalue(res1, 0, 0)) == 't')
@@ -2157,6 +2333,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		PQclear(res1);
@@ -2170,6 +2347,7 @@
 		{
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 	}
@@ -2199,6 +2377,7 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		terminate_log_archive();
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -2236,6 +2415,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2259,6 +2439,7 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				terminate_log_archive();
 				return -1;
 			}
 			rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2279,6 +2460,7 @@
 					PQclear(res1);
 					slon_disconnectdb(pro_conn);
 					dstring_free(&query1);
+					terminate_log_archive();
 					return -1;
 				}
 				slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
@@ -2314,6 +2496,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 
@@ -2334,6 +2517,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		ntuples2 = PQntuples(res2);
@@ -2348,6 +2532,7 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				terminate_log_archive();
 				return -1;
 			}
 		}
@@ -2356,6 +2541,10 @@
 		/*
 		 * Begin a COPY from stdin for the table on the local DB
 		 */
+		slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: "
+			 "Begin COPY of table %s\n",
+			 node->no_id, tab_fqname);
+
 		slon_mkquery(&query1,
 					 "select %s.truncateTable('%s'); "
 					 "copy %s from stdin; ",
@@ -2372,9 +2561,25 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
+		if (archive_dir) {
+			slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname);
+			slon_mkquery(&query1,
+				     "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname);
+			rc = submit_query_to_archive(&query1);
+			if (rc < 0) {
+				slon_log(SLON_ERROR, "remoteWorkerThread_d: "
+					 "Could not generate copy_set request for %s - %s",
+					 node->no_id, tab_fqname, strerror(errno));
 
+				slon_disconnectdb(pro_conn);
+				dstring_free(&query1);
+				terminate_log_archive();
+				return -1;
+			}
+		}
 		/*
 		 * Begin a COPY to stdout for the table on the provider DB
 		 */
@@ -2398,6 +2603,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 
@@ -2410,7 +2616,6 @@
 			int			len = strlen(copydata);
 
 			copysize += (int64) len;
-
 			if (PQputCopyData(loc_dbconn, copydata, len) != 1)
 			{
 				slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
@@ -2426,8 +2631,30 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				terminate_log_archive();
 				return -1;
 			}
+			if (archive_dir) {
+				rc = fwrite(copydata, 1, len, archive_fp);
+				if (rc != len) {
+					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+						 "PQputCopyData() - log shipping - %s",
+						 node->no_id, strerror(errno));
+#ifdef SLON_MEMDEBUG
+					memset(copydata, 88, len);
+#endif
+					PQfreemem(copydata);
+					PQputCopyEnd(loc_dbconn, "Slony-I: copy set operation");
+					PQclear(res3);
+					PQclear(res2);
+					PQclear(res1);
+					slon_disconnectdb(pro_conn);
+					dstring_free(&query1);
+					terminate_log_archive();
+					return -1;
+					
+				}
+			}
 #ifdef SLON_MEMDEBUG
 			memset(copydata, 88, len);
 #endif
@@ -2444,6 +2671,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 
@@ -2464,6 +2692,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		PQclear(res3);
@@ -2480,6 +2709,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		res2 = PQgetResult(loc_dbconn);
@@ -2493,8 +2723,12 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
+		if (archive_dir) {
+			rc = submit_string_to_archive("\\.");
+		}
 #else							/* ! HAVE_PQPUTCOPYDATA */
 		copydone = false;
 		while (!copydone)
@@ -2517,16 +2751,22 @@
 					case 0:
 						PQputline(loc_dbconn, copybuf);
 						PQputline(loc_dbconn, "\n");
+					if (archive_dir)
+						submit_string_to_archive(copybuf);
 						break;
 					case 1:
 						PQputline(loc_dbconn, copybuf);
+					if (archive_dir)
+						submit_raw_data_to_archive(copybuf);
 						break;
 
 				}
 			}
 		}
 		PQputline(loc_dbconn, "\\.\n");
-
+		if (archive_dir) {
+			rc = submit_string_to_archive("\\\.");
+		}
 		/*
 		 * End the COPY to stdout on the provider
 		 */
@@ -2541,6 +2781,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		PQclear(res3);
@@ -2562,6 +2803,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 #endif   /* HAVE_PQPUTCOPYDATA */
@@ -2580,8 +2822,13 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
+		if (archive_dir) {
+			submit_query_to_archive(&query1);
+		}
+
 		gettimeofday(&tv_now, NULL);
 		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 				 "%.3f seconds to copy table %s\n",
@@ -2616,6 +2863,7 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		terminate_log_archive();
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -2638,6 +2886,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 	}
@@ -2670,6 +2919,7 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		terminate_log_archive();
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -2683,6 +2933,8 @@
 				 "set last_value of sequence %s (%s) to %s\n",
 				 node->no_id, seql_seqid, seq_fqname, seql_last_value);
 
+		
+
 		/*
 		 * sequence with ID 0 is a nodes rowid ... only remember in seqlog.
 		 */
@@ -2691,6 +2943,10 @@
 			slon_mkquery(&query1,
 						 "select \"pg_catalog\".setval('%q', '%s'); ",
 						 seq_fqname, seql_last_value);
+
+			if (archive_dir) {
+				submit_query_to_archive(&query1);
+			}
 		}
 		else
 			dstring_reset(&query1);
@@ -2706,6 +2962,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 	}
@@ -2749,6 +3006,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		if (PQntuples(res1) != 1)
@@ -2759,6 +3017,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		if (PQgetisnull(res1, 0, 0))
@@ -2805,6 +3064,7 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				terminate_log_archive();
 				return -1;
 			}
 			if (PQntuples(res1) != 1)
@@ -2815,6 +3075,7 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				terminate_log_archive();
 				return -1;
 			}
 			ssy_seqno = PQgetvalue(res1, 0, 0);
@@ -2859,6 +3120,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		ntuples1 = PQntuples(res2);
@@ -2899,6 +3161,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		if (PQntuples(res1) != 1)
@@ -2909,6 +3172,7 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			terminate_log_archive();
 			return -1;
 		}
 		dstring_init(&ssy_action_list);
@@ -2937,14 +3201,45 @@
 	{
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		terminate_log_archive();
+		return -1;
+	}
+	if (archive_dir) {
+		slon_mkquery(&query1,
+			     "insert into %s.sl_setsync_offline "
+			     "(ssy_setid, ssy_seqno) "
+			     "values ('%d', '%s');",
+			     rtcfg_namespace, set_id, ssy_seqno);
+		rc = submit_query_to_archive(&query1);
+		if (rc < 0) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				 " could not insert to sl_setsync_offline",
+				 node->no_id);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
 		return -1;
 	}
+	}
 	gettimeofday(&tv_now, NULL);
 	slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 			 "%.3f seconds to build initial setsync status\n",
 			 node->no_id,
 			 TIMEVAL_DIFF(&tv_start2, &tv_now));
 
+	if (archive_dir) {
+		rc = close_log_archive();
+		if (rc < 0) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				 " could not close archive log %s - %s",
+				 node->no_id, archive_tmp, strerror(errno));
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+	}
+		
+
 	/*
 	 * Roll back the transaction we used on the provider and close the
 	 * database connection.
@@ -2954,6 +3249,7 @@
 	{
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		terminate_log_archive();
 		return -1;
 	}
 	slon_disconnectdb(pro_conn);
@@ -2966,11 +3262,9 @@
 	gettimeofday(&tv_now, NULL);
 	slon_log(SLON_DEBUG1, "copy_set %d done in %.3f seconds\n", set_id,
 			 TIMEVAL_DIFF(&tv_start, &tv_now));
-
 	return 0;
 }
 
-
 static int
 sync_event(SlonNode * node, SlonConn * local_conn,
 		   WorkerGroupData * wd, SlonWorkMsg_event * event)
@@ -4219,9 +4513,11 @@
 
 int close_log_archive () {
   int rc;
+	if (archive_dir) {
   rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n");
   rc = fclose(archive_fp);
   rc = rename(archive_tmp, archive_name);
+	}
   return rc;
 }
 
@@ -4231,13 +4527,18 @@
 }
 
 int submit_query_to_archive(SlonDString *ds) {
-  return fprintf(archive_fp, "%s\n", *ds->data);
+	/* Write the query text plus newline to the open archive log.
+	 * Use the dstring_data() accessor for consistency with the other
+	 * call sites (e.g. PQexec(dbconn, dstring_data(&query3))) rather
+	 * than reaching into ds->data directly. */
+	return fprintf(archive_fp, "%s\n", dstring_data(ds));
 }
 
+/* Append one SQL statement (or comment) line to the currently open
+ * archive log, followed by a newline.  Returns fprintf's result. */
 int submit_string_to_archive (const char *s) {
   return fprintf(archive_fp, "%s\n", s);
 }
 
+/* Raw form used for COPY data where we don't want any extra cr/lf
+ * output appended: the caller decides whether the buffer carries its
+ * own line termination (see the PQgetline 0-vs-1 cases above). */
+int submit_raw_data_to_archive (const char *s) {
+	return fprintf(archive_fp, "%s", s);
+}
+
 void terminate_log_archive () {
   if (archive_fp) { 
     fclose(archive_fp); 
@@ -4248,17 +4549,20 @@
   time_t now;
   now = time(NULL);
   return fprintf(archive_fp, 
-	  "-- Slony-I sync log\n"
+		       "-- Slony-I log shipping archive\n"
 	  "-- Node %d, Event %s\n"
 	  "-- at... %s\n"
 	  "start transaction;\n",
 	  node_id, seqbuf, ctime(&now));
 }
 
-/*
- * Local Variables:
- *	tab-width: 4
- *	c-indent-level: 4
- *	c-basic-offset: 4
- * End:
- */
+/* write_void_log() writes out a "void" log consisting of the message
+ * which must either be a valid SQL query or a SQL comment.
+ * Returns the result of close_log_archive() (negative on failure) so
+ * callers can detect that the archive could not be written/renamed;
+ * previously the function was declared int but returned nothing. */
+
+int write_void_log (int node_id, char *seqbuf, const char *message) {
+	open_log_archive(node_id, seqbuf);
+	generate_archive_header(node_id, seqbuf);
+	submit_string_to_archive(message);
+	return close_log_archive();
+}
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.45
retrieving revision 1.46
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.45 -r1.46
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -50,7 +50,7 @@
 
 #define SLON_CLEANUP_SLEEP			600 /* sleep 10 minutes between */
  /* cleanup calls */
-#define SLON_VACUUM_FREQUENCY		1	/* vacuum every 3rd cleanup */
+#define SLON_VACUUM_FREQUENCY		3	/* vacuum every 3rd cleanup */
 
 
 typedef enum
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.19
retrieving revision 1.20
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.19 -r1.20
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -32,7 +32,8 @@
  * ---------- Global data ----------
  */
 int			vac_frequency = SLON_VACUUM_FREQUENCY;
-
+static unsigned long earliest_xid = 0;
+static unsigned long get_earliest_xid (PGconn *dbconn);
 /*
  * ---------- cleanupThread_main
  *
@@ -55,6 +56,7 @@
 	int			n	  ,
 				t;
 	int			vac_count = 0;
+	char *vacuum_action;
 
 	slon_log(SLON_DEBUG1, "cleanupThread: thread starts\n");
 
@@ -166,28 +168,44 @@
 		 */
 		if (vac_frequency != 0 && ++vac_count >= vac_frequency)
 		{
+        unsigned long latest_xid;
 			vac_count = 0;
-
+			latest_xid = get_earliest_xid(dbconn);
+			if (earliest_xid != latest_xid) {
+			  vacuum_action = "vacuum analyze";
+			} else {
+			  vacuum_action = "analyze";
+			  slon_log(SLON_DEBUG4, "cleanupThread: xid %lu still active - analyze instead\n",
+				   earliest_xid);
+			}
+            earliest_xid = latest_xid;
 			/*
 			 * Build the query string for vacuuming replication runtime data
 			 * and event tables
 			 */
 			dstring_init(&query3);
 			slon_mkquery(&query3,
-						 "vacuum analyze %s.sl_event; "
-						 "vacuum analyze %s.sl_confirm; "
-						 "vacuum analyze %s.sl_setsync; "
-						 "vacuum analyze %s.sl_log_1; "
-						 "vacuum analyze %s.sl_log_2;"
-						 "vacuum analyze %s.sl_seqlog;"
-						 "vacuum analyze pg_catalog.pg_listener;",
+				     "%s %s.sl_event; "
+				     "%s %s.sl_confirm; "
+				     "%s %s.sl_setsync; "
+				     "%s %s.sl_log_1; "
+				     "%s %s.sl_log_2;"
+				     "%s %s.sl_seqlog;"
+				     "%s pg_catalog.pg_listener;",
+				     vacuum_action,
 						 rtcfg_namespace,
+				     vacuum_action,
 						 rtcfg_namespace,
+				     vacuum_action,
 						 rtcfg_namespace,
+				     vacuum_action,
 						 rtcfg_namespace,
+				     vacuum_action,
 						 rtcfg_namespace,
-						 rtcfg_namespace);
-
+				     vacuum_action,
+				     rtcfg_namespace,
+				     vacuum_action
+				     );
 
 			gettimeofday(&tv_start, NULL);
 			res = PQexec(dbconn, dstring_data(&query3));
@@ -231,3 +249,41 @@
 	slon_log(SLON_DEBUG1, "cleanupThread: thread done\n");
 	pthread_exit(NULL);
 }
+
+
+/* Scan pg_locks for the earliest transaction id still in progress,
+ * allowing for xid wraparound: xids above the 2^31-1 midpoint and
+ * those at/below it are tracked separately, and whichever is earlier
+ * in wraparound order is returned.  Used by the cleanup thread to
+ * decide whether a vacuum would accomplish anything.  Aborts the slon
+ * if pg_locks cannot be read. */
+static unsigned long get_earliest_xid (PGconn *dbconn) {
+	unsigned long lo = 2147483647;			/* 2^31 - 1: xid midpoint */
+	unsigned long minhi = (unsigned long) -1;	/* ULONG_MAX sentinel */
+	unsigned long minlo = lo;
+	unsigned long xid;
+	long n,t;
+	PGresult   *res;
+	SlonDString query1;
+	dstring_init(&query1);
+	slon_mkquery(&query1, "select transaction from pg_catalog.pg_locks where transaction is not null;");
+	res = PQexec(dbconn, dstring_data(&query1));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+		slon_log(SLON_FATAL, "cleanupThread: could not read locks from pg_locks!");
+		PQclear(res);
+		dstring_free(&query1);
+		slon_abort();
+		return -1;	/* not reached if slon_abort() terminates */
+	} else {
+		n = PQntuples(res);
+		for (t = 0; t < n; t++) {
+			/* xids are unsigned; parse with strtoul, not atoi */
+			xid = strtoul(PQgetvalue(res, t, 0), NULL, 10);
+			/* %lu: xid is unsigned long; %d here was UB */
+			slon_log(SLON_DEBUG4, "cleanupThread: xid: %lu\n", xid);
+			if (xid > lo) {
+				if (xid < minlo)
+					minlo = xid;
+			} else {
+				if (xid < minhi)
+					minhi = xid;
+			}
+		}
+	}
+	/* release the result and query string (previously leaked) */
+	PQclear(res);
+	dstring_free(&query1);
+	/* labels now match argument order (was "minhi: ... minlo: ..."
+	 * printed with minlo, minhi) and go through slon_log, not stdout */
+	slon_log(SLON_DEBUG4, "cleanupThread: minlo: %lu minhi: %lu\n",
+		 minlo, minhi);
+	if ((minhi - lo) < minlo)
+		return minlo;
+	else
+		return minhi;
+}


More information about the Slony1-commit mailing list