Wed Sep 28 15:39:31 PDT 2005
- Previous message: [Slony1-commit] By cbbrowne: Changes to handling of log shipping: 1.
- Next message: [Slony1-commit] By cbbrowne: Release notes
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Changes to handling of log shipping:
1. COPY statements include field names if the subscriber is running
PG version >= 7.4
This was a requested feature, and allows a replica to have additional
fields not found in the original system.
2. Log shipping time stamps
The log shipping function calls have been augmented with the timestamp
of the end of the SYNC.
Documentation has been modified accordingly.
Modified Files:
--------------
slony1-engine/doc/adminguide:
logshipping.sgml (r1.10 -> r1.11)
slony1-engine/src/backend:
slony1_funcs.sql (r1.66 -> r1.67)
slony1_funcs.v73.sql (r1.7 -> r1.8)
slony1_funcs.v74.sql (r1.6 -> r1.7)
slony1-engine/src/slon:
remote_worker.c (r1.89 -> r1.90)
slony1-engine/tools:
slony1_dump.sh (r1.1 -> r1.2)
-------------- next part --------------
Index: logshipping.sgml
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/doc/adminguide/logshipping.sgml,v
retrieving revision 1.10
retrieving revision 1.11
diff -Ldoc/adminguide/logshipping.sgml -Ldoc/adminguide/logshipping.sgml -u -w -r1.10 -r1.11
--- doc/adminguide/logshipping.sgml
+++ doc/adminguide/logshipping.sgml
@@ -238,13 +238,36 @@
dashes at the end of the <quote>header</quote> material:
<programlisting>
--- Slony-I sync log
--- Node 11, event 745
+-- Slony-I log shipping archive
+-- Node 11, Event 656
start transaction;
-select "_T1".setsyncTracking_offline(1, '744', '745');
---------------------------------------------------------------------
+
+select "_T1".setsyncTracking_offline(1, '655', '656', '2005-09-23 18:37:40.206342');
+-- end of log archiving header
</programlisting></para></listitem>
+
+<listitem><para> Note that the header includes a timestamp indicating
+SYNC time.
+<programlisting>
+-- Slony-I log shipping archive
+-- Node 11, Event 109
+start transaction;
+
+select "_T1".setsyncTracking_offline(1, '96', '109', '2005-09-23 19:01:31.267403');
+-- end of log archiving header
+</programlisting></para>
+
+<para> This timestamp represents the time at which the
+<command>SYNC</command> was issued on the origin node.</para>
+
+<para> The value is stored in the log shipping configuration table
+sl_setsync_offline.</para>
+
+<para> If you are constructing a temporal database, this is likely to
+be the time that you will want to apply to all of the data in
+the given log shipping transaction log. </para>
+</listitem>
</itemizedlist>
</sect2>
</sect1>
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.66
retrieving revision 1.67
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.66 -r1.67
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -5196,3 +5196,29 @@
comment on view @NAMESPACE@.sl_status is 'View showing how far behind remote nodes are.';
+-- ----------------------------------------------------------------------
+-- FUNCTION copyFields (tab_id)
+--
+-- Builds the parenthesized, comma-separated list of quoted column names
+-- for the replicated table identified by tab_id, in attribute order,
+-- for use as the column list of a COPY statement.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.copyFields(integer)
+returns text
+as '
+declare
+ result text;
+ prefix text;
+ prec record;
+begin
+ result := '''';
+ prefix := ''(''; -- Initially, prefix is the opening paren
+
+ -- Walk the user columns (attnum > 0) of the table in attribute order.
+ -- Dropped columns remain in pg_attribute flagged attisdropped and
+ -- must be skipped, because COPY cannot name them.
+ for prec in
+  select @NAMESPACE@.slon_quote_input(a.attname) as colname
+  from @NAMESPACE@.sl_table t, pg_catalog.pg_attribute a
+  where t.tab_id = $1
+  and t.tab_reloid = a.attrelid
+  and a.attnum > 0
+  and not a.attisdropped
+  order by a.attnum
+ loop
+ result := result || prefix || prec.colname;
+ prefix := '',''; -- Subsequently, prepend columns with commas
+ end loop;
+ result := result || '')'';
+ return result;
+end;
+' language plpgsql;
+
+comment on function @NAMESPACE@.copyFields(integer) is
+'Return a string consisting of what should be appended to a COPY statement
+to specify fields for the passed-in tab_id.
+
+In PG versions > 7.3, this looks like (field1,field2,...fieldn)';
Index: slony1_funcs.v74.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v74.sql,v
retrieving revision 1.6
retrieving revision 1.7
diff -Lsrc/backend/slony1_funcs.v74.sql -Lsrc/backend/slony1_funcs.v74.sql -u -w -r1.6 -r1.7
--- src/backend/slony1_funcs.v74.sql
+++ src/backend/slony1_funcs.v74.sql
@@ -26,3 +26,13 @@
end;
' language plpgsql;
+comment on function @NAMESPACE@.truncateTable(text) is
+'Delete all data from specified table';
+
+-- pre74(): version probe. This is the slony1_funcs.v74.sql variant and
+-- returns 0; the slony1_funcs.v73.sql variant returns 1.
+create or replace function @NAMESPACE@.pre74()
+returns integer
+as 'select 0;' language sql;
+
+comment on function @NAMESPACE@.pre74() is
+'Returns 1 or 0 based on whether or not the DB is running a
+version earlier than 7.4';
Index: slony1_funcs.v73.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v73.sql,v
retrieving revision 1.7
retrieving revision 1.8
diff -Lsrc/backend/slony1_funcs.v73.sql -Lsrc/backend/slony1_funcs.v73.sql -u -w -r1.7 -r1.8
--- src/backend/slony1_funcs.v73.sql
+++ src/backend/slony1_funcs.v73.sql
@@ -26,3 +26,13 @@
end;
' language plpgsql;
+comment on function @NAMESPACE@.truncateTable(text) is
+'Delete all data from specified table';
+
+-- pre74(): version probe. This is the slony1_funcs.v73.sql variant and
+-- returns 1; the slony1_funcs.v74.sql variant returns 0.
+create or replace function @NAMESPACE@.pre74()
+returns integer
+as 'select 1;' language sql;
+
+comment on function @NAMESPACE@.pre74() is
+'Returns 1 or 0 based on whether or not the DB is running a
+version earlier than 7.4';
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.89
retrieving revision 1.90
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.89 -r1.90
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -248,11 +248,12 @@
static int open_log_archive (int node_id, char *seqbuf);
static int close_log_archive ();
static void terminate_log_archive ();
-static int generate_archive_header (int node_id, char *seqbuf);
+static int generate_archive_header (int node_id, const char *seqbuf);
static int submit_query_to_archive(SlonDString *ds);
static int submit_string_to_archive (const char *s);
static int submit_raw_data_to_archive (const char *s);
-static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf);
+static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq,
+ const char *seqbuf, const char *timestamp);
static int write_void_log (int node_id, char *seqbuf, const char *message);
#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
@@ -1397,7 +1398,7 @@
node->no_id, archive_tmp, strerror(errno));
slon_abort();
}
- rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf);
+ rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
if (rc < 0) {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not generate DDL archive tracker %s - %s",
@@ -2364,11 +2365,14 @@
SlonDString indexregenquery;
int ntuples1;
int ntuples2;
+ int ntuples3;
int tupno1;
int tupno2;
PGresult *res1;
PGresult *res2;
PGresult *res3;
+ PGresult *res4;
+ int nodeon73;
int rc;
int set_origin = 0;
SlonNode *sub_node;
@@ -2922,15 +2926,60 @@
/*
* Begin a COPY from stdin for the table on the local DB
*/
- slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: "
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"Begin COPY of table %s\n",
node->no_id, tab_fqname);
+ dstring_init(&query2);
+ slon_mkquery(&query2, "select %s.copyFields(%d);",
+ rtcfg_namespace, tab_id);
+
+ /* Ask the provider for the column list of this table */
+ res3 = PQexec(pro_dbconn, dstring_data(&query2));
+
+ /*
+  * Check the status of the result we just obtained (res3); the
+  * error message below already reports res3.
+  */
+ if (PQresultStatus(res3) != PGRES_TUPLES_OK) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
+ node->no_id, dstring_data(&query2),
+ PQresultErrorMessage(res3));
+ PQclear(res3);
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ /* query2 was initialized just above; release it as the pre74() error path does */
+ dstring_free(&query2);
+ terminate_log_archive();
+ return -1;
+ }
+
+ slon_mkquery(&query2, "select %s.pre74();",
+ rtcfg_namespace);
+ res4 = PQexec(loc_dbconn, dstring_data(&query2));
+
+ if (PQresultStatus(res4) != PGRES_TUPLES_OK) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
+ node->no_id, dstring_data(&query2),
+ PQresultErrorMessage(res4));
+ PQclear(res4);
+ PQclear(res3);
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ dstring_free(&query2);
+ terminate_log_archive();
+ return -1;
+ }
+
+ /* Are we running on < PG 7.4??? pre74() returns 1 if so, 0 otherwise */
+ nodeon73 = atoi(PQgetvalue(res4, 0, 0));
+ /* The value has been converted; release the result so it is not leaked */
+ PQclear(res4);
+
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+ " nodeon73 is %d\n",
+ node->no_id, nodeon73);
+
slon_mkquery(&query1,
"select %s.truncateTable('%s'); "
- "copy %s from stdin; ",
+ "copy %s %s from stdin; ",
rtcfg_namespace,
- tab_fqname, tab_fqname);
+ tab_fqname, tab_fqname,
+ nodeon73 ? "" : PQgetvalue(res3, 0, 0)
+ );
res2 = PQexec(loc_dbconn, dstring_data(&query1));
if (PQresultStatus(res2) != PGRES_COPY_IN)
{
@@ -2946,9 +2995,9 @@
return -1;
}
if (archive_dir) {
- slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname);
slon_mkquery(&query1,
- "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname);
+ "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
+ nodeon73 ? "" : PQgetvalue(res3, 0, 0));
rc = submit_query_to_archive(&query1);
if (rc < 0) {
slon_log(SLON_ERROR, "remoteWorkerThread_d: "
@@ -2965,7 +3014,8 @@
* Begin a COPY to stdout for the table on the provider DB
*/
slon_mkquery(&query1,
- "copy %s to stdout; ", tab_fqname);
+ "copy %s %s to stdout; ", tab_fqname, PQgetvalue(res3, 0, 0));
+ PQclear(res3);
res3 = PQexec(pro_dbconn, dstring_data(&query1));
if (PQresultStatus(res3) != PGRES_COPY_OUT)
{
@@ -3471,7 +3521,6 @@
ssy_maxxid = PQgetvalue(res1, 0, 2);
ssy_xip = PQgetvalue(res1, 0, 3);
- dstring_init(&query2);
slon_mkquery(&query2,
"log_xid >= '%s' or (log_xid >= '%s'",
ssy_maxxid, ssy_minxid);
@@ -4018,8 +4067,12 @@
*/
if (archive_dir)
{
+ slon_log(SLON_DEBUG2, "writing archive log...\n");
+ fflush(stderr);
+ fflush(stdout);
rc = logarchive_tracking(rtcfg_namespace, sub_set,
- PQgetvalue(res1, tupno1, 1), seqbuf);
+ PQgetvalue(res1, tupno1, 1), seqbuf,
+ event->ev_timestamp_c);
if (rc < 0) {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Cannot write to archive file %s - %s",
@@ -4904,16 +4957,27 @@
int close_log_archive () {
int rc = 0;
if (archive_dir) {
- rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n");
+ rc = fprintf(archive_fp,
+ "\n------------------------------------------------------------------\n"
+ "-- End Of Archive Log\n"
+ "------------------------------------------------------------------\n"
+ "commit;\n"
+ "vacuum analyze %s.sl_setsync_offline;\n",
+ rtcfg_namespace);
rc = fclose(archive_fp);
rc = rename(archive_tmp, archive_name);
}
return rc;
}
-int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf) {
- return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s');\n-- end of log archiving header\n------------------------------------------------------------------\n-- start of Slony-I data\n------------------------------------------------------------------\n",
- namespace, sub_set, firstseq, seqbuf);
+/*
+ * logarchive_tracking() writes to archive_fp the call to
+ * <namespace>.setsyncTracking_offline(sub_set, firstseq, seqbuf, timestamp)
+ * followed by the "end of log archiving header" / "start of Slony-I data"
+ * marker lines. Returns the result of fprintf().
+ */
+int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq,
+ const char *seqbuf, const char *timestamp) {
+ return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
+ "-- end of log archiving header\n"
+ "------------------------------------------------------------------\n"
+ "-- start of Slony-I data\n"
+ "------------------------------------------------------------------\n",
+ namespace, sub_set, firstseq, seqbuf, timestamp);
}
int submit_query_to_archive(SlonDString *ds) {
@@ -4935,15 +4999,12 @@
}
}
-int generate_archive_header (int node_id, char *seqbuf) {
- time_t now;
- now = time(NULL);
+/*
+ * generate_archive_header() writes to archive_fp the two comment lines
+ * that identify the archive (node id and event) followed by
+ * "start transaction;". Returns the result of fprintf().
+ */
+int generate_archive_header (int node_id, const char *seqbuf) {
return fprintf(archive_fp,
"-- Slony-I log shipping archive\n"
"-- Node %d, Event %s\n"
- "-- at... %s\n"
"start transaction;\n",
- node_id, seqbuf, ctime(&now));
+ node_id, seqbuf);
}
/* write_void_log() writes out a "void" log consisting of the message
Index: slony1_dump.sh
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/tools/slony1_dump.sh,v
retrieving revision 1.1
retrieving revision 1.2
diff -Ltools/slony1_dump.sh -Ltools/slony1_dump.sh -u -w -r1.1 -r1.2
--- tools/slony1_dump.sh
+++ tools/slony1_dump.sh
@@ -91,15 +91,15 @@
create table $clname.sl_setsync_offline (
ssy_setid int4,
ssy_seqno int8,
+ ssy_synctime timestamptz,
CONSTRAINT "sl_setsync-pkey"
PRIMARY KEY (ssy_setid)
);
-
--- ----------------------------------------------------------------------
+-- -----------------------------------------------------------------------------
-- FUNCTION sequenceSetValue_offline (seq_id, seq_origin, ev_seqno, last_value)
--- ----------------------------------------------------------------------
+-- -----------------------------------------------------------------------------
create or replace function $clname.sequenceSetValue_offline(int4, int8) returns int4
as '
declare
@@ -127,15 +127,16 @@
end;
' language plpgsql;
--- ----------------------------------------------------------------------
--- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value)
--- ----------------------------------------------------------------------
+-- ---------------------------------------------------------------------------------------
+-- FUNCTION setsyncTracking_offline (set_id, old_seq, new_seq, sync_time)
+--
+-- The fourth argument is the sync time stored in sl_setsync_offline.ssy_synctime;
+-- it must be part of the signature because the body aliases \$4.
+-- ---------------------------------------------------------------------------------------
-create or replace function $clname.setsyncTracking_offline(int4, int8, int8) returns int8
+create or replace function $clname.setsyncTracking_offline(int4, int8, int8, timestamptz) returns int8
as '
declare
p_set_id alias for \$1;
p_old_seq alias for \$2;
p_new_seq alias for \$3;
+ p_sync_time alias for \$4;
v_row record;
begin
select ssy_seqno into v_row from $clname.sl_setsync_offline
@@ -148,8 +149,9 @@
raise exception ''Slony-I: set % is on sync %, this archive log expects %'',
p_set_id, v_row.ssy_seqno, p_old_seq;
end if;
+ -- One % placeholder per argument, so the sync time is actually reported
+ raise notice ''Slony-I: Process set % sync % time %'', p_set_id, p_new_seq, p_sync_time;
- update $clname.sl_setsync_offline set ssy_seqno = p_new_seq
+ update $clname.sl_setsync_offline set ssy_seqno = p_new_seq, ssy_synctime = p_sync_time
where ssy_setid = p_set_id;
return p_new_seq;
end;
- Previous message: [Slony1-commit] By cbbrowne: Changes to handling of log shipping: 1.
- Next message: [Slony1-commit] By cbbrowne: Release notes
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list