Wed Oct 25 05:57:07 PDT 2006
- Previous message: [Slony1-commit] By cbbrowne: Bug #1591 - large tuples in sl_log_2 not being found - The
- Next message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Fixed archive log writing.
Jan
Tags:
----
REL_1_1_STABLE
Modified Files:
--------------
slony1-engine/src/slon:
remote_worker.c (r1.86.2.14 -> r1.86.2.15)
slon.h (r1.48.2.1 -> r1.48.2.2)
-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.86.2.14
retrieving revision 1.86.2.15
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.86.2.14 -r1.86.2.15
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -240,21 +240,20 @@
WorkerGroupData * wd, SlonWorkMsg_event * event);
static void *sync_helper(void *cdata);
-static char archive_name[SLON_MAX_PATH];
-static char archive_tmp[SLON_MAX_PATH];
-static FILE *archive_fp = NULL;
-static int open_log_archive (int node_id, char *seqbuf);
-static int close_log_archive ();
-static void terminate_log_archive ();
-static int generate_archive_header (int node_id, const char *seqbuf);
-static int submit_query_to_archive(SlonDString *ds);
-static int submit_string_to_archive (const char *s);
-static int submit_raw_data_to_archive (const char *s);
-static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq,
+static int archive_open (SlonNode *node, char *seqbuf);
+static int archive_close (SlonNode *node);
+static void archive_terminate (SlonNode *node);
+static int archive_append_ds (SlonNode *node, SlonDString *ds);
+static int archive_append_str (SlonNode *node, const char *s);
+static int archive_append_data (SlonNode *node, const char *data, int len);
+
+
+static int archive_tracking (SlonNode *node, const char *namespace,
+ int sub_set, const char *firstseq,
const char *seqbuf, const char *timestamp);
-static void write_void_log (int node_id, char *seqbuf, const char *message);
-#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
+static void archive_void_log (SlonNode *node, char *seqbuf, const char *message);
+
/*
* ---------- slon_remoteWorkerThread
@@ -592,7 +591,7 @@
need_reloadListen = true;
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE");
+ archive_void_log (node, seqbuf, "-- STORE_NODE");
}
else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
@@ -610,7 +609,7 @@
need_reloadListen = true;
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE");
+ archive_void_log (node, seqbuf, "-- ENABLE_NODE");
}
else if (strcmp(event->ev_type, "DROP_NODE") == 0)
{
@@ -661,7 +660,7 @@
need_reloadListen = true;
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE");
+ archive_void_log (node, seqbuf, "-- DROP_NODE");
}
else if (strcmp(event->ev_type, "STORE_PATH") == 0)
{
@@ -680,7 +679,7 @@
need_reloadListen = true;
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH");
+ archive_void_log (node, seqbuf, "-- STORE_PATH");
}
else if (strcmp(event->ev_type, "DROP_PATH") == 0)
{
@@ -697,7 +696,7 @@
need_reloadListen = true;
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH");
+ archive_void_log (node, seqbuf, "-- DROP_PATH");
}
else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
{
@@ -713,7 +712,7 @@
rtcfg_namespace,
li_origin, li_provider, li_receiver);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN");
+ archive_void_log (node, seqbuf, "-- STORE_LISTEN");
}
else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
{
@@ -729,7 +728,7 @@
rtcfg_namespace,
li_origin, li_provider, li_receiver);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN");
+ archive_void_log (node, seqbuf, "-- DROP_LISTEN");
}
else if (strcmp(event->ev_type, "STORE_SET") == 0)
{
@@ -746,7 +745,7 @@
set_id, set_origin, set_comment);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET");
+ archive_void_log (node, seqbuf, "-- STORE_SET");
}
else if (strcmp(event->ev_type, "DROP_SET") == 0)
{
@@ -761,14 +760,17 @@
/* The table deleted needs to be
* dropped from log shipping too */
if (archive_dir) {
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
slon_mkquery(&query1,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, set_id);
- rc = submit_query_to_archive(&query1);
- rc = close_log_archive();
+
+ if (archive_open(node, seqbuf) < 0 ||
+ archive_append_ds(node, &query1) < 0 ||
+ archive_close(node) < 0)
+ {
+ slon_abort();
+ }
}
}
else if (strcmp(event->ev_type, "MERGE_SET") == 0)
@@ -787,14 +789,17 @@
* being merged from the set being
* maintained. */
if (archive_dir) {
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- rc = slon_mkquery(&query1,
+ slon_mkquery(&query1,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, add_id);
- rc = submit_query_to_archive(&query1);
- rc = close_log_archive();
+
+ if (archive_open(node, seqbuf) < 0 ||
+ archive_append_ds(node, &query1) < 0 ||
+ archive_close(node) < 0)
+ {
+ slon_abort();
+ }
}
}
else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
@@ -805,7 +810,7 @@
* in the runtime configuration.
*/
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE");
+ archive_void_log (node, seqbuf, "-- SET_ADD_TABLE");
}
else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
{
@@ -815,7 +820,7 @@
* maintained in the runtime configuration.
*/
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE");
+ archive_void_log (node, seqbuf, "-- SET_ADD_SEQUENCE");
}
else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0)
{
@@ -825,7 +830,7 @@
rtcfg_namespace,
tab_id);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE");
+ archive_void_log (node, seqbuf, "-- SET_DROP_TABLE");
}
else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0)
{
@@ -835,7 +840,7 @@
rtcfg_namespace,
seq_id);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE");
+ archive_void_log (node, seqbuf, "-- SET_DROP_SEQUENCE");
}
else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0)
{
@@ -846,7 +851,7 @@
rtcfg_namespace,
tab_id, new_set_id);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE");
+ archive_void_log (node, seqbuf, "-- SET_MOVE_TABLE");
}
else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0)
{
@@ -857,7 +862,7 @@
rtcfg_namespace,
seq_id, new_set_id);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE");
+ archive_void_log (node, seqbuf, "-- SET_MOVE_SEQUENCE");
}
else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
{
@@ -869,7 +874,7 @@
rtcfg_namespace,
trig_tabid, trig_tgname);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER");
+ archive_void_log (node, seqbuf, "-- STORE_TRIGGER");
}
else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
{
@@ -881,7 +886,7 @@
rtcfg_namespace,
trig_tabid, trig_tgname);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER");
+ archive_void_log (node, seqbuf, "-- DROP_TRIGGER");
}
else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
{
@@ -1023,7 +1028,7 @@
failed_node, backup_node, set_id);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET");
+ archive_void_log (node, seqbuf, "-- FAILOVER_SET");
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
@@ -1041,7 +1046,7 @@
rtcfg_namespace,
sub_set, sub_provider, sub_receiver, sub_forward);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET");
+ archive_void_log (node, seqbuf, "-- SUBSCRIBE_SET");
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
@@ -1169,14 +1174,17 @@
need_reloadListen = true;
if (archive_dir) {
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
slon_mkquery(&query1,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, sub_set);
- rc = submit_query_to_archive(&query1);
- rc = close_log_archive();
+
+ if (archive_open(node, seqbuf) < 0 ||
+ archive_append_ds(node, &query1) < 0 ||
+ archive_close(node) < 0)
+ {
+ slon_abort();
+ }
}
}
else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
@@ -1194,46 +1202,19 @@
/* DDL_SCRIPT needs to be turned into a log shipping script */
if (archive_dir) {
if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) {
-
- rc = open_log_archive(node->no_id, seqbuf);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not open DDL archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_open(node, seqbuf) < 0)
slon_abort();
- }
- generate_archive_header(node->no_id, seqbuf);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not generate DDL archive header %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ rc = archive_tracking(node, rtcfg_namespace, ddl_setid,
+ seqbuf, seqbuf, event->ev_timestamp_c);
+ if (rc < 0)
slon_abort();
- }
- rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not generate DDL archive tracker %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_append_str(node, ddl_script) < 0)
slon_abort();
- }
- rc = submit_string_to_archive(ddl_script);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not submit DDL Script %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_abort();
- }
-
- rc = close_log_archive();
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not close DDL Script %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_close(node) < 0)
slon_abort();
}
}
}
- }
else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
{
int reset_config_setid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1244,14 +1225,14 @@
rtcfg_namespace,
reset_config_setid, reset_configonly_on_node);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- RESET_CONFIG");
+ archive_void_log (node, seqbuf, "-- RESET_CONFIG");
}
else
{
printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
if (archive_dir)
- write_void_log (rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!");
+ archive_void_log (node, seqbuf, "-- UNHANDLED EVENT!!!");
}
/*
@@ -2281,30 +2262,13 @@
Isn't it convenient that seqbuf was just populated??? :-)
*/
if (archive_dir) {
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not open COPY SET archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- dstring_free(&query2);
- dstring_free(&query3);
- dstring_free(&indexregenquery);
- terminate_log_archive();
- return -1;
- }
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not generate COPY SET archive header %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_open(node, seqbuf) < 0)
+ {
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
return -1;
}
}
@@ -2321,7 +2285,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -2354,7 +2318,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (*(PQgetvalue(res1, 0, 0)) == 't')
@@ -2368,7 +2332,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
PQclear(res1);
@@ -2385,7 +2349,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2428,7 +2392,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2465,7 +2429,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2492,7 +2456,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2521,7 +2485,7 @@
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query3);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2561,7 +2525,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2587,7 +2551,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2625,7 +2589,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2666,7 +2630,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2693,7 +2657,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2717,7 +2681,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
@@ -2756,7 +2720,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -2780,7 +2744,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples2 = PQntuples(res2);
@@ -2798,7 +2762,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2828,7 +2792,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -2848,7 +2812,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -2876,25 +2840,21 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir) {
slon_mkquery(&query1,
"delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
nodeon73 ? "" : PQgetvalue(res3, 0, 0));
- rc = submit_query_to_archive(&query1);
+ rc = archive_append_ds(node, &query1);
if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_d: "
- "Could not generate copy_set request for %s - %s",
- node->no_id, tab_fqname, strerror(errno));
-
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2925,7 +2885,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -2956,15 +2916,12 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir) {
- rc = fwrite(copydata, 1, len, archive_fp);
- if (rc != len) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "PQputCopyData() - log shipping - %s",
- node->no_id, strerror(errno));
+ rc = archive_append_data(node, copydata, len);
+ if (rc < 0) {
#ifdef SLON_MEMDEBUG
memset(copydata, 88, len);
#endif
@@ -2978,7 +2935,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3002,7 +2959,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3026,7 +2983,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
PQclear(res3);
@@ -3046,7 +3003,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
res2 = PQgetResult(loc_dbconn);
@@ -3063,11 +3020,12 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir) {
- rc = submit_string_to_archive("\\.");
+ if (archive_append_str(node, "\\.") < 0)
+ slon_abort();
}
#else /* ! HAVE_PQPUTCOPYDATA */
copydone = false;
@@ -3092,12 +3050,14 @@
PQputline(loc_dbconn, copybuf);
PQputline(loc_dbconn, "\n");
if (archive_dir)
- submit_string_to_archive(copybuf);
+ if (archive_append_str(node, copybuf) < 0)
+ slon_abort();
break;
case 1:
PQputline(loc_dbconn, copybuf);
if (archive_dir)
- submit_raw_data_to_archive(copybuf);
+ if (archive_append_data(node, copybuf, strlen(copybuf) < 0)
+ slon_abort();
break;
}
@@ -3105,7 +3065,8 @@
}
PQputline(loc_dbconn, "\\.\n");
if (archive_dir) {
- rc = submit_string_to_archive("\\.");
+ if (archive_append_str(node, "\\.") < 0)
+ slon_abort();
}
/*
* End the COPY to stdout on the provider
@@ -3124,7 +3085,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
PQclear(res3);
@@ -3149,7 +3110,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
#endif /* HAVE_PQPUTCOPYDATA */
@@ -3174,11 +3135,12 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir) {
- submit_query_to_archive(&query1);
+ if (archive_append_ds(node, &query1) < 0)
+ slon_abort();
}
gettimeofday(&tv_now, NULL);
@@ -3224,7 +3186,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -3250,7 +3212,8 @@
seq_fqname, seql_last_value);
if (archive_dir) {
- submit_query_to_archive(&query1);
+ if (archive_append_ds(node, &query1) < 0)
+ slon_abort();
}
}
else
@@ -3270,7 +3233,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3317,7 +3280,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQntuples(res1) != 1)
@@ -3331,7 +3294,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQgetisnull(res1, 0, 0))
@@ -3381,7 +3344,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQntuples(res1) != 1)
@@ -3395,7 +3358,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ssy_seqno = PQgetvalue(res1, 0, 0);
@@ -3441,7 +3404,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res2);
@@ -3485,7 +3448,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQntuples(res1) != 1)
@@ -3499,7 +3462,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
dstring_init(&ssy_action_list);
@@ -3531,7 +3494,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir) {
@@ -3539,17 +3502,14 @@
"insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) "
"values ('%d', '%d');",
rtcfg_namespace, set_id, ssy_seqno);
- rc = submit_query_to_archive(&query1);
+ rc = archive_append_ds(node, &query1);
if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- " could not insert to sl_setsync_offline",
- node->no_id);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3560,17 +3520,14 @@
TIMEVAL_DIFF(&tv_start2, &tv_now));
if (archive_dir) {
- rc = close_log_archive();
+ rc = archive_close(node);
if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- " could not close archive log %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3588,7 +3545,7 @@
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
slon_disconnectdb(pro_conn);
@@ -3646,21 +3603,12 @@
*/
if (archive_dir)
{
- rc = open_log_archive(node->no_id, seqbuf);
- if (rc == -1) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot open archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
+ rc = archive_open(node, seqbuf);
+ if (rc < 0)
+ {
dstring_free(&query);
return 60;
}
- rc = generate_archive_header(node->no_id, seqbuf);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- return 60;
- }
}
/*
@@ -3684,7 +3632,8 @@
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"No pa_conninfo for data provider %d\n",
node->no_id, provider->no_id);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 10;
}
sprintf(conn_symname, "subscriber_%d_provider_%d",
@@ -3697,7 +3646,8 @@
"cannot connect to data provider %d on '%s'\n",
node->no_id, provider->no_id,
provider->pa_conninfo);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return provider->pa_connretry;
}
@@ -3709,7 +3659,8 @@
rtcfg_namespace, rtcfg_nodeid);
if (query_execute(node, provider->conn->dbconn, &query) < 0)
{
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_disconnectdb(provider->conn);
provider->conn = NULL;
return provider->pa_connretry;
@@ -3745,7 +3696,8 @@
"for ev_origin %d\n",
node->no_id, provider->no_id,
event->ev_origin);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 10;
}
if (prov_seqno < event->ev_seqno)
@@ -3755,7 +3707,8 @@
"ev_seqno " INT64_FORMAT " for ev_origin %d\n",
node->no_id, provider->no_id,
prov_seqno, event->ev_origin);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 10;
}
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -3822,7 +3775,8 @@
PQresultErrorMessage(res1));
PQclear(res1);
dstring_free(&new_qual);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
@@ -3871,7 +3825,8 @@
PQclear(res2);
PQclear(res1);
dstring_free(&new_qual);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
ntuples2 = PQntuples(res2);
@@ -3994,17 +3949,13 @@
slon_log(SLON_DEBUG2, "writing archive log...\n");
fflush(stderr);
fflush(stdout);
- rc = logarchive_tracking(rtcfg_namespace, sub_set,
+ rc = archive_tracking(node, rtcfg_namespace, sub_set,
PQgetvalue(res1, tupno1, 1), seqbuf,
event->ev_timestamp_c);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (rc < 0)
return 60;
}
}
- }
PQclear(res1);
/*
@@ -4034,14 +3985,10 @@
node->no_id);
dstring_free(&query);
if (archive_dir) {
- rc = close_log_archive();
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not close out archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ rc = archive_close(node);
+ if (rc < 0)
return 60;
}
- }
return 0;
}
@@ -4157,15 +4104,10 @@
* the archive log.
*/
if (archive_dir) {
- rc = submit_string_to_archive(dstring_data(&(wgline->data)));
- /* rc = fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); */
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ rc = archive_append_ds(node, &(wgline->data));
+ if (rc < 0)
return 60;
}
- }
break;
case SLON_WGLC_DONE:
@@ -4245,7 +4187,8 @@
*/
if (num_errors != 0)
{
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
node->no_id);
return 10;
@@ -4282,7 +4225,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_disconnectdb(provider->conn);
provider->conn = NULL;
return 20;
@@ -4300,7 +4244,8 @@
if (query_execute(node, local_dbconn, &query) < 0)
{
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
@@ -4313,15 +4258,11 @@
"select %s.sequenceSetValue_offline(%s,'%s');\n",
rtcfg_namespace,
seql_seqid, seql_last_value);
- rc = submit_query_to_archive(&query);
- if (rc < 0) {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ rc = archive_append_ds(node, &query);
+ if (rc < 0)
return 60;
}
}
- }
PQclear(res1);
}
@@ -4361,7 +4302,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
node->no_id);
return 10;
@@ -4387,7 +4329,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
if (PQntuples(res1) > 0)
@@ -4401,7 +4344,8 @@
if (query_execute(node, local_dbconn, &query) < 0)
{
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -4416,7 +4360,8 @@
*/
if (archive_dir)
{
- close_log_archive();
+ if (archive_close(node) < 0)
+ slon_abort();
}
/*
@@ -4811,49 +4756,85 @@
/* Functions for processing log archives...
- - First, you open the log archive using open_log_archive()
-
- - Second, you generate the header using generate_archive_header()
+ - First, you open the log archive using archive_open()
- - Third, you need to set up the sync tracking function in the log
+ - Second, you need to set up the sync tracking function in the log
using archive_tracking()
============= Here Ends The Header of the Log Shipping Archive ==================
Then come the various queries (inserts/deletes/updates) that
comprise the "body" of the SYNC. Probably submitted using
- submit_query_to_archive().
+ archive_append_*().
============= Here Ends The Body of the Log Shipping Archive ==================
Finally, the log ends, notably with a COMMIT statement, generated
- using close_log_archive(), which closes the file and renames it
+ using archive_close(), which closes the file and renames it
from ".tmp" form to the final name.
*/
/* Stores the archive name in node->archive_name (as .sql name) and node->archive_temp (.tmp file) */
-int open_log_archive (int node_id, char *seqbuf) {
+static int
+archive_open (SlonNode *node, char *seqbuf)
+{
int i;
- sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id);
+ int rc;
+
+ if (node->archive_name == NULL)
+ {
+ node->archive_name = malloc(SLON_MAX_PATH);
+ node->archive_temp = malloc(SLON_MAX_PATH);
+ if (node->archive_name == NULL || node->archive_temp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Out of memory in archive_open()", node->no_id);
+ slon_abort();
+ }
+ }
+
+ sprintf(node->archive_name, "%s/slony1_log_%d_",
+ archive_dir, node->no_id);
for (i = strlen(seqbuf); i < 20; i++)
- strcat(archive_name, "0");
- strcat(archive_name, seqbuf);
- strcat(archive_name, ".sql");
- strcpy(archive_tmp, archive_name);
- strcat(archive_tmp, ".tmp");
- archive_fp = fopen(archive_tmp, "w");
- if (archive_fp == NULL) {
+ strcat(node->archive_name, "0");
+ strcat(node->archive_name, seqbuf);
+ strcat(node->archive_name, ".sql");
+ strcpy(node->archive_temp, node->archive_name);
+ strcat(node->archive_temp, ".tmp");
+
+ node->archive_fp = fopen(node->archive_temp, "w");
+ if (node->archive_fp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not open archive file %s - %s",
+ node->archive_temp, strerror(errno));
return -1;
- } else {
- return 0;
}
+ rc = fprintf(node->archive_fp,
+ "-- Slony-I log shipping archive\n"
+ "-- Node %d, Event %s\n"
+ "start transaction;\n",
+ node->no_id, seqbuf);
+ if (rc < 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not open archive file %s - %s",
+ node->archive_temp, strerror(errno));
+ return -1;
}
-int close_log_archive () {
+ return 0;
+}
+
+
+static int
+archive_close (SlonNode *node)
+{
int rc = 0;
- if (archive_dir) {
- rc = fprintf(archive_fp,
+
+ if (node->archive_fp != NULL) {
+ rc = fprintf(node->archive_fp,
"\n------------------------------------------------------------------\n"
"-- End Of Archive Log\n"
"------------------------------------------------------------------\n"
@@ -4861,59 +4842,135 @@
"vacuum analyze %s.sl_setsync_offline;\n",
rtcfg_namespace);
if ( rc < 0 )
+ {
+ archive_terminate(node);
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not write to archive file %s - %s",
+ node->archive_temp, strerror(errno));
return -1;
- rc = fclose(archive_fp);
- archive_fp = NULL;
+ }
+
+ rc = fclose(node->archive_fp);
+ node->archive_fp = NULL;
if ( rc < 0 )
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not close archive file %s - %s",
+ node->archive_temp, strerror(errno));
+ return -1;
+ }
+
+ rc = rename(node->archive_temp, node->archive_name);
+ if (rc < 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not rename archive file %s to %s - %s",
+ node->archive_temp, node->archive_name, strerror(errno));
return -1;
- rc = rename(archive_tmp, archive_name);
}
- return rc;
}
-int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq,
- const char *seqbuf, const char *timestamp) {
- return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
- "-- end of log archiving header\n"
- "------------------------------------------------------------------\n"
- "-- start of Slony-I data\n"
- "------------------------------------------------------------------\n",
- namespace, sub_set, firstseq, seqbuf, timestamp);
+ return 0;
}
-int submit_query_to_archive(SlonDString *ds) {
- return fprintf(archive_fp, "%s\n", ds->data);
+
+static void
+archive_terminate (SlonNode *node)
+{
+ if (node->archive_fp != NULL) {
+ fclose(node->archive_fp);
+ node->archive_fp = NULL;
+ }
}
-int submit_string_to_archive (const char *s) {
- return fprintf(archive_fp, "%s\n", s);
+
+static int
+archive_append_ds (SlonNode *node, SlonDString *ds)
+{
+ int rc;
+
+ rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds));
+ if (rc < 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not write to archive file %s - %s",
+ node->archive_temp, strerror(errno));
+ return -1;
}
-/* Raw form used for COPY where we don't want any extra cr/lf output */
-int submit_raw_data_to_archive (const char *s) {
- return fprintf(archive_fp, "%s", s);
+ return 0;
}
-void terminate_log_archive () {
- if (archive_fp) {
- fclose(archive_fp);
+
+static int
+archive_append_str (SlonNode *node, const char *str)
+{
+ int rc;
+
+ rc = fprintf(node->archive_fp, "%s\n", str);
+ if (rc < 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not write to archive file %s - %s",
+ node->archive_temp, strerror(errno));
+ return -1;
}
+
+ return 0;
}
-int generate_archive_header (int node_id, const char *seqbuf) {
- return fprintf(archive_fp,
- "-- Slony-I log shipping archive\n"
- "-- Node %d, Event %s\n"
- "start transaction;\n",
- node_id, seqbuf);
+
+static int
+archive_append_data (SlonNode *node, const char *data, int len) {
+ int rc;
+
+ rc = fwrite(data, len, 1, node->archive_fp);
+ if (rc != 1)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not write to archive file %s - %s",
+ node->archive_temp, strerror(errno));
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static int
+archive_tracking (SlonNode *node, const char *namespace, int sub_set,
+ const char *firstseq, const char *seqbuf,
+ const char *timestamp)
+{
+ int rc;
+
+ rc = fprintf(node->archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
+ "-- end of log archiving header\n"
+ "------------------------------------------------------------------\n"
+ "-- start of Slony-I data\n"
+ "------------------------------------------------------------------\n",
+ namespace, sub_set, firstseq, seqbuf, timestamp);
+
+ if (rc < 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not write to archive file %s - %s",
+ node->archive_temp, strerror(errno));
+ return -1;
}
-/* write_void_log() writes out a "void" log consisting of the message
+ return 0;
+}
+
+
+/* archive_void_log() writes out a "void" log consisting of the message
* which must either be a valid SQL query or a SQL comment. */
-void write_void_log (int node_id, char *seqbuf, const char *message) {
- open_log_archive(node_id, seqbuf);
- generate_archive_header(node_id, seqbuf);
- submit_string_to_archive(message);
- close_log_archive();
+static void
+archive_void_log (SlonNode *node, char *seqbuf, const char *message) {
+ archive_open(node, seqbuf);
+ archive_append_str(node, message);
+ archive_close(node);
}
+
+
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.48.2.1
retrieving revision 1.48.2.2
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.48.2.1 -r1.48.2.2
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -106,6 +106,10 @@
SlonWorkMsg *message_head;
SlonWorkMsg *message_tail;
+ char *archive_name;
+ char *archive_temp;
+ FILE *archive_fp;
+
SlonNode *prev;
SlonNode *next;
};
- Previous message: [Slony1-commit] By cbbrowne: Bug #1591 - large tuples in sl_log_2 not being found - The
- Next message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list