Christopher Browne cbbrowne
Fri Sep 23 22:44:34 PDT 2005
Below is a unified diff for a patch that I intend to apply to the
1.1 branch next week in order to generate a 1.1.1 release.

There are three things justifying this:

1.  Ian Burrell's fix to the multibyte problem which bites anyone
using Unicode or Big 5 character sets.

<http://gborg.postgresql.org/pipermail/slony1-commit/2005-July/000655.html>

This is a rather serious problem which needs to be addressed.

2.  Some textual changes (yes, somewhat self-serving :-)) to log
shipping to help us use it to populate temporal databases.  

Specifically, the setsyncTracking_offline() function has been augmented
with a SYNC time parameter which passes in the end time of each SYNC.

In addition, this function raises a notice every time it processes a
log so that the Gentle User will have some indication in logs/stdout
as to what logs have been processed.  That's just too easy an
improvement to pass up :-).
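
To make the new header line concrete, here is a minimal sketch of how
the tracking call is laid out once the SYNC time is included.  It
mirrors the format string in the patched logarchive_tracking(), but
writes into a caller-supplied buffer instead of the archive file; the
helper name format_tracking_call and its buffer handling are
assumptions for illustration, not part of the patch.

```c
#include <stdio.h>

/*
 * Hypothetical helper (not in the patch): format the
 * setsyncTracking_offline() call that heads each archive log.
 * The timestamp is passed through as text, exactly as received.
 * Returns the number of characters written, or -1 on error or
 * if the buffer is too small.
 */
int format_tracking_call(char *buf, size_t buflen,
                         const char *ns, int sub_set,
                         const char *firstseq, const char *seqbuf,
                         const char *timestamp)
{
    int n = snprintf(buf, buflen,
                     "select %s.setsyncTracking_offline(%d, '%s', '%s', '%s');",
                     ns, sub_set, firstseq, seqbuf, timestamp);

    if (n < 0 || (size_t) n >= buflen)
        return -1;              /* output error or truncated */
    return n;
}
```

Called with the namespace "_T1", set 1, sequence numbers 655 and 656,
and the SYNC timestamp, this should yield the same select statement
that appears in the logshipping.sgml example in the diff.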

3.  When working with PostgreSQL versions later than 7.3 (that is,
7.4 and up), the COPY statements now include field names.

This was a feature request a while back which didn't quite make it
into 1.1; it turns out that we need this for handling temporal
databases.
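
As a rough sketch of the idea, the fragment below builds the COPY
statement with or without the field list, following the same
"nodeon73 ? empty : field list" choice that the patch makes in
remote_worker.c.  The helper name build_copy_stmt and the
buffer-based interface are assumptions for illustration; the real
code uses slon_mkquery() and takes the field list from copyFields().

```c
#include <stdio.h>

/*
 * Hypothetical helper (not in the patch): build a "copy ... from stdin"
 * statement.  When pre74 is nonzero the field list is left out, as the
 * patch does for nodes running a PostgreSQL version earlier than 7.4.
 * Returns the number of characters written, or -1 on error or
 * if the buffer is too small.
 */
int build_copy_stmt(char *buf, size_t buflen,
                    const char *tab_fqname, const char *field_list,
                    int pre74)
{
    int n = snprintf(buf, buflen, "copy %s %s from stdin;",
                     tab_fqname, pre74 ? "" : field_list);

    if (n < 0 || (size_t) n >= buflen)
        return -1;              /* output error or truncated */
    return n;
}
```

Note that in the pre-7.4 case the statement simply carries two spaces
between the table name and "from", which is harmless to the parser.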

This set of changes conspicuously does NOT include responses to some
of the recent requests surrounding modifying the way sl_log_1 is
accessed; that's fodder for 1.2, and I will see about getting to that.

Apologies that I haven't been getting to some of these things quickly;
I had other higher priority projects.  (To be followed by some
vacation, in October :-).)

I do have several patches partially tested that need to head into the
code base in October in preparation for 1.2; as that introduces
Windows support, that will doubtless be an exciting time.


Index: doc/adminguide/logshipping.sgml
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/doc/adminguide/logshipping.sgml,v
retrieving revision 1.10
diff -c -u -r1.10 logshipping.sgml
--- doc/adminguide/logshipping.sgml	29 Jun 2005 20:13:37 -0000	1.10
+++ doc/adminguide/logshipping.sgml	23 Sep 2005 19:21:12 -0000
@@ -238,13 +238,36 @@
 dashes at the end of the <quote>header</quote> material:
 
 <programlisting>
--- Slony-I sync log
--- Node 11, event 745
+-- Slony-I log shipping archive
+-- Node 11, Event 656
 start transaction;
-select "_T1".setsyncTracking_offline(1, '744', '745');
---------------------------------------------------------------------
+
+select "_T1".setsyncTracking_offline(1, '655', '656', '2005-09-23 18:37:40.206342');
+-- end of log archiving header
 </programlisting></para></listitem>
 
+
+<listitem><para> Note that the header includes a timestamp indicating
+SYNC time. 
+<programlisting>
+-- Slony-I log shipping archive
+-- Node 11, Event 109
+start transaction;
+
+select "_T1".setsyncTracking_offline(1, '96', '109', '2005-09-23 19:01:31.267403');
+-- end of log archiving header
+</programlisting></para>
+
+<para> This timestamp represents the time at which the
+<command>SYNC</command> was issued on the origin node.</para>
+
+<para> The value is stored in the log shipping configuration table
+sl_setsync_offline.</para>
+
+<para> If you are constructing a temporal database, this is likely to
+be the time you will want to apply to all of the data in the given
+log shipping transaction log. </para>
+</listitem>
 </itemizedlist>
 </sect2>
 </sect1>
Index: src/backend/slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.66
diff -c -u -r1.66 slony1_funcs.sql
--- src/backend/slony1_funcs.sql	20 Jul 2005 09:02:39 -0000	1.66
+++ src/backend/slony1_funcs.sql	23 Sep 2005 19:21:15 -0000
@@ -5196,3 +5196,29 @@
 
 comment on view @NAMESPACE@.sl_status is 'View showing how far behind remote nodes are.';
 
+create or replace function @NAMESPACE@.copyFields(integer) 
+returns text
+as '
+declare
+	result text;
+	prefix text;
+	prec record;
+begin
+	result := '''';
+	prefix := ''('';   -- Initially, prefix is the opening paren
+
+	for prec in select @NAMESPACE@.slon_quote_input(a.attname) as column from @NAMESPACE@.sl_table t, pg_catalog.pg_attribute a where t.tab_id = $1 and t.tab_reloid = a.attrelid and a.attnum > 0 order by attnum
+	loop
+		result := result || prefix || prec.column;
+		prefix := '','';   -- Subsequently, prepend columns with commas
+	end loop;
+	result := result || '')'';
+	return result;
+end;
+' language plpgsql;
+
+comment on function @NAMESPACE@.copyFields(integer) is
+'Return a string consisting of what should be appended to a COPY statement
+to specify fields for the passed-in tab_id.  
+
+In PG versions > 7.3, this looks like (field1,field2,...fieldn)';
Index: src/backend/slony1_funcs.v73.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v73.sql,v
retrieving revision 1.7
diff -c -u -r1.7 slony1_funcs.v73.sql
--- src/backend/slony1_funcs.v73.sql	7 Jun 2005 21:51:05 -0000	1.7
+++ src/backend/slony1_funcs.v73.sql	23 Sep 2005 19:21:16 -0000
@@ -26,3 +26,13 @@
 end;
 ' language plpgsql;
 
+comment on function @NAMESPACE@.truncateTable(text) is
+'Delete all data from specified table';
+
+create or replace function @NAMESPACE@.pre74()
+returns integer
+as 'select 1' language sql;
+
+comment on function @NAMESPACE@.pre74() is 
+'Returns 1/0 based on whether or not the DB is running a
+version earlier than 7.4';
Index: src/backend/slony1_funcs.v74.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v74.sql,v
retrieving revision 1.6
diff -c -u -r1.6 slony1_funcs.v74.sql
--- src/backend/slony1_funcs.v74.sql	19 Apr 2005 15:47:03 -0000	1.6
+++ src/backend/slony1_funcs.v74.sql	23 Sep 2005 19:21:16 -0000
@@ -26,3 +26,13 @@
 end;
 ' language plpgsql;
 
+comment on function @NAMESPACE@.truncateTable(text) is
+'Delete all data from specified table';
+
+create or replace function @NAMESPACE@.pre74()
+returns integer
+as 'select 0' language sql;
+
+comment on function @NAMESPACE@.pre74() is 
+'Returns 1/0 based on whether or not the DB is running a
+version earlier than 7.4';
Index: src/slon/remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.88
diff -c -u -r1.88 remote_worker.c
--- src/slon/remote_worker.c	8 Aug 2005 15:51:18 -0000	1.88
+++ src/slon/remote_worker.c	23 Sep 2005 19:21:21 -0000
@@ -248,11 +248,12 @@
 static int open_log_archive (int node_id, char *seqbuf);
 static int close_log_archive ();
 static void terminate_log_archive ();
-static int generate_archive_header (int node_id, char *seqbuf);
+static int generate_archive_header (int node_id, const char *seqbuf);
 static int submit_query_to_archive(SlonDString *ds);
 static int submit_string_to_archive (const char *s);
 static int submit_raw_data_to_archive (const char *s);
-static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf);
+static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, 
+				const char *seqbuf, const char *timestamp);
 static int write_void_log (int node_id, char *seqbuf, const char *message);
 
 #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
@@ -1397,7 +1398,7 @@
 						     node->no_id, archive_tmp, strerror(errno));
 					    slon_abort();
 					}
-					rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf);
+					rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
 					if (rc < 0) {
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not generate DDL archive tracker %s - %s",
@@ -2364,11 +2365,14 @@
 	SlonDString indexregenquery;
 	int			ntuples1;
 	int			ntuples2;
+	int			ntuples3;
 	int			tupno1;
 	int			tupno2;
 	PGresult   *res1;
 	PGresult   *res2;
 	PGresult   *res3;
+	PGresult   *res4;
+	int        nodeon73;
 	int			rc;
 	int			set_origin = 0;
 	SlonNode   *sub_node;
@@ -2922,15 +2926,60 @@
 		/*
 		 * Begin a COPY from stdin for the table on the local DB
 		 */
-		slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: "
+		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 			 "Begin COPY of table %s\n",
 			 node->no_id, tab_fqname);
 
+		dstring_init(&query2);
+		slon_mkquery(&query2, "select %s.copyFields(%d);",
+			     rtcfg_namespace, tab_id);
+
+		res3 = PQexec(pro_dbconn, dstring_data(&query2));
+
+		if (PQresultStatus(res3) != PGRES_TUPLES_OK) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
+				 node->no_id, dstring_data(&query2),
+				 PQresultErrorMessage(res3));
+			PQclear(res3);
+			PQclear(res1);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+
+		slon_mkquery(&query2, "select %s.pre74();",
+			     rtcfg_namespace);
+		res4 = PQexec(loc_dbconn, dstring_data(&query2));
+		
+		if (PQresultStatus(res4) != PGRES_TUPLES_OK) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
+				 node->no_id, dstring_data(&query2),
+				 PQresultErrorMessage(res4));
+			PQclear(res4);
+			PQclear(res3);
+			PQclear(res1);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			dstring_free(&query2);
+			terminate_log_archive();
+			return -1;
+		}
+
+		/* Are we running on PG < 7.4?  pre74() returns 1 if so, 0 if not */
+		nodeon73 = atoi(PQgetvalue(res4, 0, 0));
+
+		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+			 " nodeon73 is %d\n",
+			 node->no_id, nodeon73);
+		
 		slon_mkquery(&query1,
 			     "select %s.truncateTable('%s'); "
-			     "copy %s from stdin; ",
+			     "copy %s %s from stdin; ",
 			     rtcfg_namespace,
-			     tab_fqname, tab_fqname);
+			     tab_fqname, tab_fqname,
+			     nodeon73 ? "" : PQgetvalue(res3, 0, 0)
+			);
 		res2 = PQexec(loc_dbconn, dstring_data(&query1));
 		if (PQresultStatus(res2) != PGRES_COPY_IN)
 		{
@@ -2946,9 +2995,9 @@
 			return -1;
 		}
 		if (archive_dir) {
-			slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname);
 			slon_mkquery(&query1,
-				     "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname);
+				     "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
+				     nodeon73 ? "" : PQgetvalue(res3, 0, 0));
 			rc = submit_query_to_archive(&query1);
 			if (rc < 0) {
 				slon_log(SLON_ERROR, "remoteWorkerThread_d: "
@@ -2965,7 +3014,8 @@
 		 * Begin a COPY to stdout for the table on the provider DB
 		 */
 		slon_mkquery(&query1,
-			     "copy %s to stdout; ", tab_fqname);
+			     "copy %s %s to stdout; ", tab_fqname, PQgetvalue(res3, 0, 0));
+		PQclear(res3);
 		res3 = PQexec(pro_dbconn, dstring_data(&query1));
 		if (PQresultStatus(res3) != PGRES_COPY_OUT)
 		{
@@ -3471,7 +3521,6 @@
 			ssy_maxxid = PQgetvalue(res1, 0, 2);
 			ssy_xip = PQgetvalue(res1, 0, 3);
 
-			dstring_init(&query2);
 			slon_mkquery(&query2,
 				     "log_xid >= '%s' or (log_xid >= '%s'",
 				     ssy_maxxid, ssy_minxid);
@@ -3692,21 +3741,21 @@
 	 */
 	if (archive_dir)
 	{
-	  rc = open_log_archive(node->no_id, seqbuf);
-	  if (rc == -1) {
-	    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-		     "Cannot open archive file %s - %s\n",
-		     node->no_id, archive_tmp, strerror(errno));
-	    dstring_free(&query);
-	    return 60;
-	  }
-	  rc = generate_archive_header(node->no_id, seqbuf);
-	  if (rc < 0) {
-	    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-		     "Cannot write to archive file %s - %s",
-		     node->no_id, archive_tmp, strerror(errno));
-	    return 60;
-	  }
+		rc = open_log_archive(node->no_id, seqbuf);
+		if (rc == -1) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				 "Cannot open archive file %s - %s\n",
+				 node->no_id, archive_tmp, strerror(errno));
+			dstring_free(&query);
+			return 60;
+		}
+		rc = generate_archive_header(node->no_id, seqbuf);
+		if (rc < 0) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				 "Cannot write to archive file %s - %s",
+				 node->no_id, archive_tmp, strerror(errno));
+			return 60;
+		}
 	}
 
 	/*
@@ -3774,7 +3823,7 @@
 			int64		prov_seqno;
 
 			prov_seqno = get_last_forwarded_confirm(event->ev_origin,
-													provider->no_id);
+								provider->no_id);
 			if (prov_seqno < 0)
 			{
 				slon_log(SLON_WARN, "remoteWorkerThread_%d: "
@@ -4018,14 +4067,18 @@
 			 */
 			if (archive_dir)
 			{
-			  rc = logarchive_tracking(rtcfg_namespace, sub_set, 
-						   PQgetvalue(res1, tupno1, 1), seqbuf);
-			  if (rc < 0) {
-			    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-				     "Cannot write to archive file %s - %s",
-				     node->no_id, archive_tmp, strerror(errno));
-			    return 60;
-			  }
+				slon_log(SLON_DEBUG2, "writing archive log...\n");
+				fflush(stderr);
+				fflush(stdout);
+				rc = logarchive_tracking(rtcfg_namespace, sub_set, 
+							 PQgetvalue(res1, tupno1, 1), seqbuf, 
+							 event->ev_timestamp_c);
+				if (rc < 0) {
+					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+						 "Cannot write to archive file %s - %s",
+						 node->no_id, archive_tmp, strerror(errno));
+					return 60;
+				}
 			}
 		}
 		PQclear(res1);
@@ -4904,16 +4957,27 @@
 int close_log_archive () {
 	int rc = 0;
 	if (archive_dir) {
-		rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n");
+		rc = fprintf(archive_fp, 
+			     "\n------------------------------------------------------------------\n"
+			     "-- End Of Archive Log\n"
+			     "------------------------------------------------------------------\n"
+			     "commit;\n"
+			     "vacuum analyze %s.sl_setsync_offline;\n", 
+			     rtcfg_namespace);
 		rc = fclose(archive_fp);
 		rc = rename(archive_tmp, archive_name);
 	}
 	return rc;
 }
 
-int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf) {
-	return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s');\n-- end of log archiving header\n------------------------------------------------------------------\n-- start of Slony-I data\n------------------------------------------------------------------\n",
-		       namespace, sub_set, firstseq, seqbuf);
+int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, 
+			 const char *seqbuf, const char *timestamp) {
+	return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
+		       "-- end of log archiving header\n"
+		       "------------------------------------------------------------------\n"
+		       "-- start of Slony-I data\n"
+		       "------------------------------------------------------------------\n",
+		       namespace, sub_set, firstseq, seqbuf, timestamp);
 }
 
 int submit_query_to_archive(SlonDString *ds) {
@@ -4935,15 +4999,12 @@
 	}
 }
 
-int generate_archive_header (int node_id, char *seqbuf) {
-	time_t now;
-	now = time(NULL);
+int generate_archive_header (int node_id, const char *seqbuf) {
 	return fprintf(archive_fp, 
 		       "-- Slony-I log shipping archive\n"
 		       "-- Node %d, Event %s\n"
-		       "-- at... %s\n"
 		       "start transaction;\n",
-		       node_id, seqbuf, ctime(&now));
+		       node_id, seqbuf);
 }
 
 /* write_void_log() writes out a "void" log consisting of the message
Index: tools/slony1_dump.sh
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/tools/slony1_dump.sh,v
retrieving revision 1.1
diff -c -u -r1.1 slony1_dump.sh
--- tools/slony1_dump.sh	17 Feb 2005 06:59:05 -0000	1.1
+++ tools/slony1_dump.sh	23 Sep 2005 19:21:21 -0000
@@ -91,15 +91,15 @@
 create table $clname.sl_setsync_offline (
 	ssy_setid			int4,
 	ssy_seqno			int8,
+	ssy_synctime                    timestamptz,
 
 	CONSTRAINT "sl_setsync-pkey"
 		PRIMARY KEY (ssy_setid)
 );
 
-
--- ----------------------------------------------------------------------
+-- -----------------------------------------------------------------------------
 -- FUNCTION sequenceSetValue_offline (seq_id, seq_origin, ev_seqno, last_value)
--- ----------------------------------------------------------------------
+-- -----------------------------------------------------------------------------
 create or replace function $clname.sequenceSetValue_offline(int4, int8) returns int4
 as '
 declare
@@ -127,15 +127,16 @@
 end;
 ' language plpgsql;
 
--- ----------------------------------------------------------------------
--- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value)
--- ----------------------------------------------------------------------
+-- ---------------------------------------------------------------------------------------
+-- FUNCTION setsyncTracking_offline (set_id, old_seq, new_seq, sync_time)
+-- ---------------------------------------------------------------------------------------
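-create or replace function $clname.setsyncTracking_offline(int4, int8, int8) returns int8
+create or replace function $clname.setsyncTracking_offline(int4, int8, int8, timestamptz) returns int8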
 as '
 declare
 	p_set_id	alias for \$1;
 	p_old_seq	alias for \$2;
 	p_new_seq	alias for \$3;
+        p_sync_time     alias for \$4;
 	v_row		record;
 begin
 	select ssy_seqno into v_row from $clname.sl_setsync_offline
@@ -148,8 +149,9 @@
 		raise exception ''Slony-I: set % is on sync %, this archive log expects %'', 
 			p_set_id, v_row.ssy_seqno, p_old_seq;
 	end if;
+	raise notice ''Slony-I: Process set % sync % time %'', p_set_id, p_new_seq, p_sync_time;
 
-	update $clname.sl_setsync_offline set ssy_seqno = p_new_seq
+	update $clname.sl_setsync_offline set ssy_seqno = p_new_seq, ssy_synctime = p_sync_time
 		where ssy_setid = p_set_id;
 	return p_new_seq;
 end;

-- 
(format nil "~S@~S" "cbbrowne" "ca.afilias.info")
<http://dev6.int.libertyrms.com/>
Christopher Browne
(416) 673-4124 (land)