Fri Sep 23 22:44:34 PDT 2005
- Previous message: [Slony1-general] Master or Slave Down
- Next message: [Slony1-general] Forthcoming 1.1.1 release
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Listed below is a context diff for a patch which I intend to apply to the 1.1 branch next week in order to generate a 1.1.1 release. There are three things justifying this: 1. Ian Burrell's fix to the multibyte problem which bites anyone using Unicode or Big 5 character sets. <http://gborg.postgresql.org/pipermail/slony1-commit/2005-July/000655.html> This is a rather serious problem which needs to be addressed. 2. Some textual changes (yes, somewhat self-serving :-)) to log shipping to help us use it to populate temporal databases. Pointedly, the setsyncTracking_offline() function has been augmented with a SYNC time parameter which passes in the end time of each SYNC. In addition, this function raises a notice every time it processes a log so that the Gentle User will have some indication in logs/stdout as to what logs have been processed. That's just too easy an improvement to pass up :-). 3. When working with post-7.3 versions of PostgreSQL, the COPY statements include field names. This was a feature request a while back which didn't quite make it into 1.1; it turns out that we need this for handling temporal databases. This set of changes conspicuously does NOT include responses to some of the recent requests surrounding modifying the way sl_log_1 is accessed; that's fodder for 1.2, and I will see about getting to that. Apologies that I haven't been getting to some of these things quickly; I had other higher priority projects. (To be followed by some vacation, in October :-).) I do have several patches partially tested that need to head into the code base in October in preparation for 1.2; as that introduces Windows support, that will doubtless be an exciting time. Index: doc/adminguide/logshipping.sgml =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/doc/adminguide/logshipping.sgml,v retrieving revision 1.10 diff -c -u -r1.10 logshipping.sgml --- doc/adminguide/logshipping.sgml 29 Jun 2005 20:13:37 -0000 1.10 +++ doc/adminguide/logshipping.sgml 23 Sep 2005 19:21:12 -0000 @@ -238,13 +238,36 @@ dashes at the end of the <quote>header</quote> material: <programlisting> --- Slony-I sync log --- Node 11, event 745 +-- Slony-I log shipping archive +-- Node 11, Event 656 start transaction; -select "_T1".setsyncTracking_offline(1, '744', '745'); --------------------------------------------------------------------- + +select "_T1".setsyncTracking_offline(1, '655', '656', '2005-09-23 18:37:40.206342'); +-- end of log archiving header </programlisting></para></listitem> + +<listitem><para> Note that the header includes a timestamp indicating +SYNC time. +<programlisting> +-- Slony-I log shipping archive +-- Node 11, Event 109 +start transaction; + +select "_T1".setsyncTracking_offline(1, '96', '109', '2005-09-23 19:01:31.267403'); +-- end of log archiving header +</programlisting></para> + +<para> This timestamp represents the time at which the +<command>SYNC</command> was issued on the origin node.</para> + +<para> The value is stored in the log shipping configuration table +sl_setsync_offline.</para> + +<para> If you are constructing a temporal database, this is likely to +represent the time you will wish to have apply to all of the data in +the given log shipping transaction log. </para> +</listitem> </itemizedlist> </sect2> </sect1> Index: src/backend/slony1_funcs.sql =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v retrieving revision 1.66 diff -c -u -r1.66 slony1_funcs.sql --- src/backend/slony1_funcs.sql 20 Jul 2005 09:02:39 -0000 1.66 +++ src/backend/slony1_funcs.sql 23 Sep 2005 19:21:15 -0000 @@ -5196,3 +5196,29 @@ comment on view @NAMESPACE at .sl_status is 'View showing how far behind remote nodes are.'; +create or replace function @NAMESPACE at .copyFields(integer) +returns text +as ' +declare + result text; + prefix text; + prec record; +begin + result := ''''; + prefix := ''(''; -- Initially, prefix is the opening paren + + for prec in select @NAMESPACE at .slon_quote_input(a.attname) as column from @NAMESPACE at .sl_table t, pg_catalog.pg_attribute a where t.tab_id = $1 and t.tab_reloid = a.attrelid and a.attnum > 0 order by attnum + loop + result := result || prefix || prec.column; + prefix := '',''; -- Subsequently, prepend columns with commas + end loop; + result := result || '')''; + return result; +end; +' language plpgsql; + +comment on function @NAMESPACE at .copyFields(integer) is +'Return a string consisting of what should be appended to a COPY statement +to specify fields for the passed-in tab_id. + +In PG versions > 7.3, this looks like (field1,field2,...fieldn)'; Index: src/backend/slony1_funcs.v73.sql =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v73.sql,v retrieving revision 1.7 diff -c -u -r1.7 slony1_funcs.v73.sql --- src/backend/slony1_funcs.v73.sql 7 Jun 2005 21:51:05 -0000 1.7 +++ src/backend/slony1_funcs.v73.sql 23 Sep 2005 19:21:16 -0000 @@ -26,3 +26,13 @@ end; ' language plpgsql; +comment comment on function @NAMESPACE at .truncateTable(text) is +'Delete all data from specified table'; + +create or replace function @NAMESPACE at .pre74() +returns integer +as 'select 1' language sql; + +comment on function @NAMESPACE at .pre74() is +'Returns 1/0 based on whether or not the DB is running a +version earlier than 7.4'; Index: src/backend/slony1_funcs.v74.sql =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.v74.sql,v retrieving revision 1.6 diff -c -u -r1.6 slony1_funcs.v74.sql --- src/backend/slony1_funcs.v74.sql 19 Apr 2005 15:47:03 -0000 1.6 +++ src/backend/slony1_funcs.v74.sql 23 Sep 2005 19:21:16 -0000 @@ -26,3 +26,13 @@ end; ' language plpgsql; +comment on function @NAMESPACE at .truncateTable(text) is +'Delete all data from specified table'; + +create or replace function @NAMESPACE at .pre74() +returns integer +as 'select 0' language sql; + +comment on function @NAMESPACE at .pre74() is +'Returns 1/0 based on whether or not the DB is running a +version earlier than 7.4'; Index: src/slon/remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.88 diff -c -u -r1.88 remote_worker.c --- src/slon/remote_worker.c 8 Aug 2005 15:51:18 -0000 1.88 +++ src/slon/remote_worker.c 23 Sep 2005 19:21:21 -0000 @@ -248,11 +248,12 @@ static int open_log_archive (int node_id, char *seqbuf); static int close_log_archive (); static void terminate_log_archive (); -static int generate_archive_header (int node_id, char *seqbuf); +static int generate_archive_header (int node_id, const char *seqbuf); static int submit_query_to_archive(SlonDString *ds); static int submit_string_to_archive (const char *s); static int submit_raw_data_to_archive (const char *s); -static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf); +static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, + const char *seqbuf, const char *timestamp); static int write_void_log (int node_id, char *seqbuf, const char *message); #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive(); @@ -1397,7 +1398,7 @@ node->no_id, archive_tmp, strerror(errno)); slon_abort(); } - rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf); + rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c); if (rc < 0) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Could not generate DDL archive tracker %s - %s", @@ -2364,11 +2365,14 @@ SlonDString indexregenquery; int ntuples1; int ntuples2; + int ntuples3; int tupno1; int tupno2; PGresult *res1; PGresult *res2; PGresult *res3; + PGresult *res4; + int nodeon73; int rc; int set_origin = 0; SlonNode *sub_node; @@ -2922,15 +2926,60 @@ /* * Begin a COPY from stdin for the table on the local DB */ - slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: " + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " "Begin COPY of table %s\n", node->no_id, tab_fqname); + dstring_init(&query2); + slon_mkquery(&query2, "select %s.copyFields(%d);", + rtcfg_namespace, tab_id); + + res3 = PQexec(pro_dbconn, dstring_data(&query2)); + + if (PQresultStatus(res2) != PGRES_TUPLES_OK) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n", + node->no_id, dstring_data(&query2), + PQresultErrorMessage(res3)); + PQclear(res3); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + + slon_mkquery(&query2, "select %s.pre74();", + rtcfg_namespace); + res4 = PQexec(loc_dbconn, dstring_data(&query2)); + + if (PQresultStatus(res4) != PGRES_TUPLES_OK) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n", + node->no_id, dstring_data(&query2), + PQresultErrorMessage(res4)); + PQclear(res4); + PQclear(res3); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + dstring_free(&query2); + terminate_log_archive(); + return -1; + } + + /* Are we running on < PG 7.4??? result = */ + nodeon73 = atoi(PQgetvalue(res4, 0, 0)); + + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " + " nodeon73 is %d\n", + node->no_id, nodeon73); + slon_mkquery(&query1, "select %s.truncateTable('%s'); " - "copy %s from stdin; ", + "copy %s %s from stdin; ", rtcfg_namespace, - tab_fqname, tab_fqname); + tab_fqname, tab_fqname, + nodeon73 ? "" : PQgetvalue(res3, 0, 0) + ); res2 = PQexec(loc_dbconn, dstring_data(&query1)); if (PQresultStatus(res2) != PGRES_COPY_IN) { @@ -2946,9 +2995,9 @@ return -1; } if (archive_dir) { - slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname); slon_mkquery(&query1, - "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname); + "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname, + nodeon73 ? "" : PQgetvalue(res3, 0, 0)); rc = submit_query_to_archive(&query1); if (rc < 0) { slon_log(SLON_ERROR, "remoteWorkerThread_d: " @@ -2965,7 +3014,8 @@ * Begin a COPY to stdout for the table on the provider DB */ slon_mkquery(&query1, - "copy %s to stdout; ", tab_fqname); + "copy %s %s to stdout; ", tab_fqname, PQgetvalue(res3, 0, 0)); + PQclear(res3); res3 = PQexec(pro_dbconn, dstring_data(&query1)); if (PQresultStatus(res3) != PGRES_COPY_OUT) { @@ -3471,7 +3521,6 @@ ssy_maxxid = PQgetvalue(res1, 0, 2); ssy_xip = PQgetvalue(res1, 0, 3); - dstring_init(&query2); slon_mkquery(&query2, "log_xid >= '%s' or (log_xid >= '%s'", ssy_maxxid, ssy_minxid); @@ -3692,21 +3741,21 @@ */ if (archive_dir) { - rc = open_log_archive(node->no_id, seqbuf); - if (rc == -1) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot open archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - dstring_free(&query); - return 60; - } - rc = generate_archive_header(node->no_id, seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } + rc = open_log_archive(node->no_id, seqbuf); + if (rc == -1) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot open archive file %s - %s\n", + node->no_id, archive_tmp, strerror(errno)); + dstring_free(&query); + return 60; + } + rc = generate_archive_header(node->no_id, seqbuf); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + return 60; + } } /* @@ -3774,7 +3823,7 @@ int64 prov_seqno; prov_seqno = get_last_forwarded_confirm(event->ev_origin, - provider->no_id); + provider->no_id); if (prov_seqno < 0) { slon_log(SLON_WARN, "remoteWorkerThread_%d: " @@ -4018,14 +4067,18 @@ */ if (archive_dir) { - rc = logarchive_tracking(rtcfg_namespace, sub_set, - PQgetvalue(res1, tupno1, 1), seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } + slon_log(SLON_DEBUG2, "writing archive log...\n"); + fflush(stderr); + fflush(stdout); + rc = logarchive_tracking(rtcfg_namespace, sub_set, + PQgetvalue(res1, tupno1, 1), seqbuf, + event->ev_timestamp_c); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + return 60; + } } } PQclear(res1); @@ -4904,16 +4957,27 @@ int close_log_archive () { int rc = 0; if (archive_dir) { - rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n"); + rc = fprintf(archive_fp, + "\n------------------------------------------------------------------\n" + "-- End Of Archive Log\n" + "------------------------------------------------------------------\n" + "commit;\n" + "vacuum analyze %s.sl_setsync_offline;\n", + rtcfg_namespace); rc = fclose(archive_fp); rc = rename(archive_tmp, archive_name); } return rc; } -int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf) { - return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s');\n-- end of log archiving header\n------------------------------------------------------------------\n-- start of Slony-I data\n------------------------------------------------------------------\n", - namespace, sub_set, firstseq, seqbuf); +int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, + const char *seqbuf, const char *timestamp) { + return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" + "-- end of log archiving header\n" + "------------------------------------------------------------------\n" + "-- start of Slony-I data\n" + "------------------------------------------------------------------\n", + namespace, sub_set, firstseq, seqbuf, timestamp); } int submit_query_to_archive(SlonDString *ds) { @@ -4935,15 +4999,12 @@ } } -int generate_archive_header (int node_id, char *seqbuf) { - time_t now; - now = time(NULL); +int generate_archive_header (int node_id, const char *seqbuf) { return fprintf(archive_fp, "-- Slony-I log shipping archive\n" "-- Node %d, Event %s\n" - "-- at... %s\n" "start transaction;\n", - node_id, seqbuf, ctime(&now)); + node_id, seqbuf); } /* write_void_log() writes out a "void" log consisting of the message Index: tools/slony1_dump.sh =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/tools/slony1_dump.sh,v retrieving revision 1.1 diff -c -u -r1.1 slony1_dump.sh --- tools/slony1_dump.sh 17 Feb 2005 06:59:05 -0000 1.1 +++ tools/slony1_dump.sh 23 Sep 2005 19:21:21 -0000 @@ -91,15 +91,15 @@ create table $clname.sl_setsync_offline ( ssy_setid int4, ssy_seqno int8, + ssy_synctime timestamptz, CONSTRAINT "sl_setsync-pkey" PRIMARY KEY (ssy_setid) ); - --- ---------------------------------------------------------------------- +-- ----------------------------------------------------------------------------- -- FUNCTION sequenceSetValue_offline (seq_id, seq_origin, ev_seqno, last_value) --- ---------------------------------------------------------------------- +-- ----------------------------------------------------------------------------- create or replace function $clname.sequenceSetValue_offline(int4, int8) returns int4 as ' declare @@ -127,15 +127,16 @@ end; ' language plpgsql; --- ---------------------------------------------------------------------- --- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value) --- ---------------------------------------------------------------------- +-- --------------------------------------------------------------------------------------- +-- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value, sync_time) +-- --------------------------------------------------------------------------------------- create or replace function $clname.setsyncTracking_offline(int4, int8, int8) returns int8 as ' declare p_set_id alias for \$1; p_old_seq alias for \$2; p_new_seq alias for \$3; + p_sync_time alias for \$4; v_row record; begin select ssy_seqno into v_row from $clname.sl_setsync_offline @@ -148,8 +149,9 @@ raise exception ''Slony-I: set % is on sync %, this archive log expects %'', p_set_id, v_row.ssy_seqno, p_old_seq; end if; + raise notice ''Slony-I: Process set % sync % time'', p_set_id, p_new_seq, p_sync_time; - update $clname.sl_setsync_offline set ssy_seqno = p_new_seq + update $clname.sl_setsync_offline set ssy_seqno = p_new_seq, ssy_synctime = p_sync_time where ssy_setid = p_set_id; return p_new_seq; end; -- (format nil "~S@~S" "cbbrowne" "ca.afilias.info") <http://dev6.int.libertyrms.com/> Christopher Browne (416) 673-4124 (land)
- Previous message: [Slony1-general] Master or Slave Down
- Next message: [Slony1-general] Forthcoming 1.1.1 release
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-general mailing list