CVS User Account cvsuser
Wed Sep 28 15:39:31 PDT 2005
Log Message:
-----------
Changes to handling of log shipping:

1.  COPY statements include field names if the subscriber is running
    PG version >= 7.4

    This was a requested feature, and allows a replica to have additional
    fields not found in the original system.

2.  Log shipping time stamps

    The log shipping function calls have been augmented with the timestamp
    of the end of the SYNC.

    Documentation has been modified accordingly.

Modified Files:
--------------
    slony1-engine/doc/adminguide:
        logshipping.sgml (r1.10 -> r1.11)
    slony1-engine/src/backend:
        slony1_funcs.sql (r1.66 -> r1.67)
        slony1_funcs.v73.sql (r1.7 -> r1.8)
        slony1_funcs.v74.sql (r1.6 -> r1.7)
    slony1-engine/src/slon:
        remote_worker.c (r1.89 -> r1.90)
    slony1-engine/tools:
        slony1_dump.sh (r1.1 -> r1.2)

-------------- next part --------------
Index: logshipping.sgml
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/doc/adminguide/logshipping.sgml,v
retrieving revision 1.10
retrieving revision 1.11
diff -Ldoc/adminguide/logshipping.sgml -Ldoc/adminguide/logshipping.sgml -u -w -r1.10 -r1.11
--- doc/adminguide/logshipping.sgml
+++ doc/adminguide/logshipping.sgml
@@ -238,13 +238,36 @@
 dashes at the end of the <quote>header</quote> material:
 
 <programlisting>
--- Slony-I sync log
--- Node 11, event 745
+-- Slony-I log shipping archive
+-- Node 11, Event 656
 start transaction;
-select "_T1".setsyncTracking_offline(1, '744', '745');
---------------------------------------------------------------------
+
+select "_T1".setsyncTracking_offline(1, '655', '656', '2005-09-23 18:37:40.206342');
+-- end of log archiving header
 </programlisting></para></listitem>
 
+
+<listitem><para> Note that the header includes a timestamp indicating
+SYNC time. 
+<programlisting>
+-- Slony-I log shipping archive
+-- Node 11, Event 109
+start transaction;
+
+select "_T1".setsyncTracking_offline(1, '96', '109', '2005-09-23 19:01:31.267403');
+-- end of log archiving header
+</programlisting></para>
+
+<para> This timestamp represents the time at which the
+<command>SYNC</command> was issued on the origin node.</para>
+
+<para> The value is stored in the log shipping configuration table
+sl_setsync_offline.</para>
+
+<para> If you are constructing a temporal database, this is likely to
+represent the time you will wish to have apply to all of the data in
+the given log shipping transaction log. </para>
+</listitem>
 </itemizedlist>
 </sect2>
 </sect1>
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.66
retrieving revision 1.67
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.66 -r1.67
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -5196,3 +5196,29 @@
 
 comment on view @NAMESPACE at .sl_status is 'View showing how far behind remote nodes are.';
 
+create or replace function @NAMESPACE at .copyFields(integer) 
+returns text
+as '
+declare
+	result text;
+	prefix text;
+	prec record;
+begin
+	result := '''';
+	prefix := ''('';   -- Initially, prefix is the opening paren
+
+	for prec in select @NAMESPACE at .slon_quote_input(a.attname) as column from @NAMESPACE at .sl_table t, pg_catalog.pg_attribute a where t.tab_id = $1 and t.tab_reloid = a.attrelid and a.attnum > 0 order by attnum
+	loop
+		result := result || prefix || prec.column;
+		prefix := '','';   -- Subsequently, prepend columns with commas
+	end loop;
+	result := result || '')'';
+	return result;
+end;
+' language plpgsql;
+
+comment on function @NAMESPACE at .copyFields(integer) is
+'Return a string consisting of what should be appended to a COPY statement
+to specify fields for the passed-in tab_id.  
+
+In PG versions > 7.3, this looks like (field1,field2,...fieldn)';
Index: slony1_funcs.v74.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v74.sql,v
retrieving revision 1.6
retrieving revision 1.7
diff -Lsrc/backend/slony1_funcs.v74.sql -Lsrc/backend/slony1_funcs.v74.sql -u -w -r1.6 -r1.7
--- src/backend/slony1_funcs.v74.sql
+++ src/backend/slony1_funcs.v74.sql
@@ -26,3 +26,13 @@
 end;
 ' language plpgsql;
 
+comment on function @NAMESPACE at .truncateTable(text) is
+'Delete all data from specified table';
+
+create or replace function @NAMESPACE at .pre74()
+returns integer
+as 'select 0;' language sql;
+
+comment on function @NAMESPACE at .pre74() is 
+'Returns 1 or 0 based on whether or not the DB is running a
+version earlier than 7.4';
Index: slony1_funcs.v73.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v73.sql,v
retrieving revision 1.7
retrieving revision 1.8
diff -Lsrc/backend/slony1_funcs.v73.sql -Lsrc/backend/slony1_funcs.v73.sql -u -w -r1.7 -r1.8
--- src/backend/slony1_funcs.v73.sql
+++ src/backend/slony1_funcs.v73.sql
@@ -26,3 +26,13 @@
 end;
 ' language plpgsql;
 
+comment on function @NAMESPACE at .truncateTable(text) is
+'Delete all data from specified table';
+
+create or replace function @NAMESPACE at .pre74()
+returns integer
+as 'select 1;' language sql;
+
+comment on function @NAMESPACE at .pre74() is 
+'Returns 1 or 0 based on whether or not the DB is running a
+version earlier than 7.4';
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.89
retrieving revision 1.90
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.89 -r1.90
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -248,11 +248,12 @@
 static int open_log_archive (int node_id, char *seqbuf);
 static int close_log_archive ();
 static void terminate_log_archive ();
-static int generate_archive_header (int node_id, char *seqbuf);
+static int generate_archive_header (int node_id, const char *seqbuf);
 static int submit_query_to_archive(SlonDString *ds);
 static int submit_string_to_archive (const char *s);
 static int submit_raw_data_to_archive (const char *s);
-static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf);
+static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, 
+				const char *seqbuf, const char *timestamp);
 static int write_void_log (int node_id, char *seqbuf, const char *message);
 
 #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
@@ -1397,7 +1398,7 @@
 						     node->no_id, archive_tmp, strerror(errno));
 					    slon_abort();
 					}
-					rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf);
+					rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
 					if (rc < 0) {
 					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						     "Could not generate DDL archive tracker %s - %s",
@@ -2364,11 +2365,14 @@
 	SlonDString indexregenquery;
 	int			ntuples1;
 	int			ntuples2;
+	int			ntuples3;
 	int			tupno1;
 	int			tupno2;
 	PGresult   *res1;
 	PGresult   *res2;
 	PGresult   *res3;
+	PGresult   *res4;
+	int        nodeon73;
 	int			rc;
 	int			set_origin = 0;
 	SlonNode   *sub_node;
@@ -2922,15 +2926,60 @@
 		/*
 		 * Begin a COPY from stdin for the table on the local DB
 		 */
-		slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: "
+		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
 			 "Begin COPY of table %s\n",
 			 node->no_id, tab_fqname);
 
+		dstring_init(&query2);
+		slon_mkquery(&query2, "select %s.copyFields(%d);",
+			     rtcfg_namespace, tab_id);
+
+		res3 = PQexec(pro_dbconn, dstring_data(&query2));
+
+		if (PQresultStatus(res2) != PGRES_TUPLES_OK) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
+				 node->no_id, dstring_data(&query2),
+				 PQresultErrorMessage(res3));
+			PQclear(res3);
+			PQclear(res1);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+
+		slon_mkquery(&query2, "select %s.pre74();",
+			     rtcfg_namespace);
+		res4 = PQexec(loc_dbconn, dstring_data(&query2));
+		
+		if (PQresultStatus(res4) != PGRES_TUPLES_OK) {
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
+				 node->no_id, dstring_data(&query2),
+				 PQresultErrorMessage(res4));
+			PQclear(res4);
+			PQclear(res3);
+			PQclear(res1);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			dstring_free(&query2);
+			terminate_log_archive();
+			return -1;
+		}
+
+		/* Are we running on < PG 7.4???  result =  */
+		nodeon73 = atoi(PQgetvalue(res4, 0, 0));
+
+		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+			 " nodeon73 is %d\n",
+			 node->no_id, nodeon73);
+		
 		slon_mkquery(&query1,
 			     "select %s.truncateTable('%s'); "
-			     "copy %s from stdin; ",
+			     "copy %s %s from stdin; ",
 			     rtcfg_namespace,
-			     tab_fqname, tab_fqname);
+			     tab_fqname, tab_fqname,
+			     nodeon73 ? "" : PQgetvalue(res3, 0, 0)
+			);
 		res2 = PQexec(loc_dbconn, dstring_data(&query1));
 		if (PQresultStatus(res2) != PGRES_COPY_IN)
 		{
@@ -2946,9 +2995,9 @@
 			return -1;
 		}
 		if (archive_dir) {
-			slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname);
 			slon_mkquery(&query1,
-				     "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname);
+				     "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
+				     nodeon73 ? "" : PQgetvalue(res3, 0, 0));
 			rc = submit_query_to_archive(&query1);
 			if (rc < 0) {
 				slon_log(SLON_ERROR, "remoteWorkerThread_d: "
@@ -2965,7 +3014,8 @@
 		 * Begin a COPY to stdout for the table on the provider DB
 		 */
 		slon_mkquery(&query1,
-			     "copy %s to stdout; ", tab_fqname);
+			     "copy %s %s to stdout; ", tab_fqname, PQgetvalue(res3, 0, 0));
+		PQclear(res3);
 		res3 = PQexec(pro_dbconn, dstring_data(&query1));
 		if (PQresultStatus(res3) != PGRES_COPY_OUT)
 		{
@@ -3471,7 +3521,6 @@
 			ssy_maxxid = PQgetvalue(res1, 0, 2);
 			ssy_xip = PQgetvalue(res1, 0, 3);
 
-			dstring_init(&query2);
 			slon_mkquery(&query2,
 				     "log_xid >= '%s' or (log_xid >= '%s'",
 				     ssy_maxxid, ssy_minxid);
@@ -4018,8 +4067,12 @@
 			 */
 			if (archive_dir)
 			{
+				slon_log(SLON_DEBUG2, "writing archive log...\n");
+				fflush(stderr);
+				fflush(stdout);
 			  rc = logarchive_tracking(rtcfg_namespace, sub_set, 
-						   PQgetvalue(res1, tupno1, 1), seqbuf);
+							 PQgetvalue(res1, tupno1, 1), seqbuf, 
+							 event->ev_timestamp_c);
 			  if (rc < 0) {
 			    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 				     "Cannot write to archive file %s - %s",
@@ -4904,16 +4957,27 @@
 int close_log_archive () {
 	int rc = 0;
 	if (archive_dir) {
-		rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n");
+		rc = fprintf(archive_fp, 
+			     "\n------------------------------------------------------------------\n"
+			     "-- End Of Archive Log\n"
+			     "------------------------------------------------------------------\n"
+			     "commit;\n"
+			     "vacuum analyze %s.sl_setsync_offline;\n", 
+			     rtcfg_namespace);
 		rc = fclose(archive_fp);
 		rc = rename(archive_tmp, archive_name);
 	}
 	return rc;
 }
 
-int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf) {
-	return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s');\n-- end of log archiving header\n------------------------------------------------------------------\n-- start of Slony-I data\n------------------------------------------------------------------\n",
-		       namespace, sub_set, firstseq, seqbuf);
+int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, 
+			 const char *seqbuf, const char *timestamp) {
+	return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
+		       "-- end of log archiving header\n"
+		       "------------------------------------------------------------------\n"
+		       "-- start of Slony-I data\n"
+		       "------------------------------------------------------------------\n",
+		       namespace, sub_set, firstseq, seqbuf, timestamp);
 }
 
 int submit_query_to_archive(SlonDString *ds) {
@@ -4935,15 +4999,12 @@
 	}
 }
 
-int generate_archive_header (int node_id, char *seqbuf) {
-	time_t now;
-	now = time(NULL);
+int generate_archive_header (int node_id, const char *seqbuf) {
 	return fprintf(archive_fp, 
 		       "-- Slony-I log shipping archive\n"
 		       "-- Node %d, Event %s\n"
-		       "-- at... %s\n"
 		       "start transaction;\n",
-		       node_id, seqbuf, ctime(&now));
+		       node_id, seqbuf);
 }
 
 /* write_void_log() writes out a "void" log consisting of the message
Index: slony1_dump.sh
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/tools/slony1_dump.sh,v
retrieving revision 1.1
retrieving revision 1.2
diff -Ltools/slony1_dump.sh -Ltools/slony1_dump.sh -u -w -r1.1 -r1.2
--- tools/slony1_dump.sh
+++ tools/slony1_dump.sh
@@ -91,15 +91,15 @@
 create table $clname.sl_setsync_offline (
 	ssy_setid			int4,
 	ssy_seqno			int8,
+	ssy_synctime                    timestamptz,
 
 	CONSTRAINT "sl_setsync-pkey"
 		PRIMARY KEY (ssy_setid)
 );
 
-
--- ----------------------------------------------------------------------
+-- -----------------------------------------------------------------------------
 -- FUNCTION sequenceSetValue_offline (seq_id, seq_origin, ev_seqno, last_value)
--- ----------------------------------------------------------------------
+-- -----------------------------------------------------------------------------
 create or replace function $clname.sequenceSetValue_offline(int4, int8) returns int4
 as '
 declare
@@ -127,15 +127,16 @@
 end;
 ' language plpgsql;
 
--- ----------------------------------------------------------------------
--- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value)
--- ----------------------------------------------------------------------
+-- ---------------------------------------------------------------------------------------
+-- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value, sync_time)
+-- ---------------------------------------------------------------------------------------
 create or replace function $clname.setsyncTracking_offline(int4, int8, int8) returns int8
 as '
 declare
 	p_set_id	alias for \$1;
 	p_old_seq	alias for \$2;
 	p_new_seq	alias for \$3;
+        p_sync_time     alias for \$4;
 	v_row		record;
 begin
 	select ssy_seqno into v_row from $clname.sl_setsync_offline
@@ -148,8 +149,9 @@
 		raise exception ''Slony-I: set % is on sync %, this archive log expects %'', 
 			p_set_id, v_row.ssy_seqno, p_old_seq;
 	end if;
+	raise notice ''Slony-I: Process set % sync % time'', p_set_id, p_new_seq, p_sync_time;
 
-	update $clname.sl_setsync_offline set ssy_seqno = p_new_seq
+	update $clname.sl_setsync_offline set ssy_seqno = p_new_seq, ssy_synctime = p_sync_time
 		where ssy_setid = p_set_id;
 	return p_new_seq;
 end;


More information about the Slony1-commit mailing list