Wed Oct 25 06:53:36 PDT 2006
- Previous message: [Slony1-commit] By wieck: Fixed archive log writing.
- Next message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Fixed archive log writing by moving global variables into
the node structure.
Corrected node ids in the handover scripts of ducttape test_1 and
added node handover scripts (move set) to tests 2 and 3.
Tags:
----
REL_1_2_STABLE
Modified Files:
--------------
slony1-engine/src/ducttape:
test_1_handover_to_1 (r1.1 -> r1.1.6.1)
test_1_handover_to_2 (r1.1 -> r1.1.6.1)
slony1-engine/src/slon:
remote_worker.c (r1.124.2.2 -> r1.124.2.3)
slon.h (r1.59 -> r1.59.2.1)
Added Files:
-----------
slony1-engine/src/ducttape:
test_2_handover_to_1 (r1.1.2.1)
test_2_handover_to_2 (r1.1.2.1)
test_3_handover_to_1 (r1.1.2.1)
test_3_handover_to_2 (r1.1.2.1)
-------------- next part --------------
--- /dev/null
+++ src/ducttape/test_2_handover_to_2
@@ -0,0 +1,29 @@
+#!/bin/sh
+
+# **********
+# test_2_handover_to_2
+#
+# Script to change the origin of set 1 from node 1 to node 2.
+# This still requires that node 1 is alive. This is called
+# handover or move, not failover.
+# **********
+
+export PATH
+TMPOUT=/tmp/output.$$
+DB1=slony_test1
+DB2=slony_test2
+
+######################################################################
+# Move set 1 to node 2
+######################################################################
+
+echo "**** Move set 1 to node 2"
+slonik <<_EOF_
+ cluster name = T1;
+ node 1 admin conninfo = 'dbname=$DB1';
+ node 2 admin conninfo = 'dbname=$DB2';
+
+ lock set (id = 1, origin = 1);
+ move set (id = 1, old origin = 1, new origin = 2);
+_EOF_
+
Index: test_1_handover_to_2
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_handover_to_2,v
retrieving revision 1.1
retrieving revision 1.1.6.1
diff -Lsrc/ducttape/test_1_handover_to_2 -Lsrc/ducttape/test_1_handover_to_2 -u -w -r1.1 -r1.1.6.1
--- src/ducttape/test_1_handover_to_2
+++ src/ducttape/test_1_handover_to_2
@@ -3,8 +3,8 @@
# **********
# test_1_handover_to_2
#
-# Script to change the origin of set 1 from node 1 to node 2.
-# This still requires that node 1 is alive. This is called
+# Script to change the origin of set 1 from node 11 to node 22.
+# This still requires that node 11 is alive. This is called
# handover or move, not failover.
# **********
@@ -20,10 +20,10 @@
echo "**** Move set 1 to node 2"
slonik <<_EOF_
cluster name = T1;
- node 1 admin conninfo = 'dbname=$DB1';
- node 2 admin conninfo = 'dbname=$DB2';
+ node 11 admin conninfo = 'dbname=$DB1';
+ node 22 admin conninfo = 'dbname=$DB2';
- lock set (id = 1, origin = 1);
- move set (id = 1, old origin = 1, new origin = 2);
+ lock set (id = 1, origin = 11);
+ move set (id = 1, old origin = 11, new origin = 22);
_EOF_
--- /dev/null
+++ src/ducttape/test_3_handover_to_2
@@ -0,0 +1,29 @@
+#!/bin/sh
+
+# **********
+# test_3_handover_to_2
+#
+# Script to change the origin of set 1 from node 1 to node 2.
+# This still requires that node 1 is alive. This is called
+# handover or move, not failover.
+# **********
+
+export PATH
+TMPOUT=/tmp/output.$$
+DB1=slony_test1
+DB2=slony_test2
+
+######################################################################
+# Move set 1 to node 2
+######################################################################
+
+echo "**** Move set 1 to node 2"
+slonik <<_EOF_
+ cluster name = T1;
+ node 1 admin conninfo = 'dbname=$DB1';
+ node 2 admin conninfo = 'dbname=$DB2';
+
+ lock set (id = 1, origin = 1);
+ move set (id = 1, old origin = 1, new origin = 2);
+_EOF_
+
Index: test_1_handover_to_1
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_handover_to_1,v
retrieving revision 1.1
retrieving revision 1.1.6.1
diff -Lsrc/ducttape/test_1_handover_to_1 -Lsrc/ducttape/test_1_handover_to_1 -u -w -r1.1 -r1.1.6.1
--- src/ducttape/test_1_handover_to_1
+++ src/ducttape/test_1_handover_to_1
@@ -3,8 +3,8 @@
# **********
# test_1_handover_to_2
#
-# Script to change the origin of set 1 from node 1 to node 2.
-# This still requires that node 1 is alive. This is called
+# Script to change the origin of set 1 from node 22 back to node 11.
+# This still requires that both nodes are alive. This is called
# handover or move, not failover.
# **********
@@ -20,10 +20,10 @@
echo "**** Move set 1 to node 2"
slonik <<_EOF_
cluster name = T1;
- node 1 admin conninfo = 'dbname=$DB1';
- node 2 admin conninfo = 'dbname=$DB2';
+ node 11 admin conninfo = 'dbname=$DB1';
+ node 22 admin conninfo = 'dbname=$DB2';
- lock set (id = 1, origin = 2);
- move set (id = 1, old origin = 2, new origin = 1);
+ lock set (id = 1, origin = 22);
+ move set (id = 1, old origin = 22, new origin = 11);
_EOF_
--- /dev/null
+++ src/ducttape/test_3_handover_to_1
@@ -0,0 +1,29 @@
+#!/bin/sh
+
+# **********
+# test_3_handover_to_1
+#
+# Script to change the origin of set 1 from node 2 back to node 1.
+# This still requires that both nodes are alive. This is called
+# handover or move, not failover.
+# **********
+
+export PATH
+TMPOUT=/tmp/output.$$
+DB1=slony_test1
+DB2=slony_test2
+
+######################################################################
+# Move set 1 back to node 1
+######################################################################
+
+echo "**** Move set 1 to node 2"
+slonik <<_EOF_
+ cluster name = T1;
+ node 1 admin conninfo = 'dbname=$DB1';
+ node 2 admin conninfo = 'dbname=$DB2';
+
+ lock set (id = 1, origin = 2);
+ move set (id = 1, old origin = 2, new origin = 1);
+_EOF_
+
--- /dev/null
+++ src/ducttape/test_2_handover_to_1
@@ -0,0 +1,29 @@
+#!/bin/sh
+
+# **********
+# test_2_handover_to_1
+#
+# Script to change the origin of set 1 from node 2 back to node 1.
+# This still requires that both nodes are alive. This is called
+# handover or move, not failover.
+# **********
+
+export PATH
+TMPOUT=/tmp/output.$$
+DB1=slony_test1
+DB2=slony_test2
+
+######################################################################
+# Move set 1 back to node 1
+######################################################################
+
+echo "**** Move set 1 to node 2"
+slonik <<_EOF_
+ cluster name = T1;
+ node 1 admin conninfo = 'dbname=$DB1';
+ node 2 admin conninfo = 'dbname=$DB2';
+
+ lock set (id = 1, origin = 2);
+ move set (id = 1, old origin = 2, new origin = 1);
+_EOF_
+
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.124.2.2
retrieving revision 1.124.2.3
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.124.2.2 -r1.124.2.3
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -254,25 +254,21 @@
WorkerGroupData * wd, SlonWorkMsg_event * event);
static void *sync_helper(void *cdata);
-static char archive_name[SLON_MAX_PATH];
-static char archive_tmp[SLON_MAX_PATH];
-static FILE *archive_fp = NULL;
-static int open_log_archive(int node_id, char *seqbuf);
-static int close_log_archive();
-static void terminate_log_archive();
-static int generate_archive_header(int node_id, const char *seqbuf);
-static int submit_query_to_archive(SlonDString * ds);
-static int submit_string_to_archive(const char *s);
-#ifndef HAVE_PQPUTCOPYDATA
-static int submit_raw_data_to_archive(const char *s);
-#endif
-static int logarchive_tracking(const char *namespace, int sub_set, const char *firstseq,
+
+static int archive_open(SlonNode *node, char *seqbuf);
+static int archive_close(SlonNode *node);
+static void archive_terminate(SlonNode *node);
+static int archive_append_ds(SlonNode *node, SlonDString * ds);
+static int archive_append_str(SlonNode *node, const char *s);
+static int archive_append_data(SlonNode *node, const char *s, int len);
+static int archive_tracking(SlonNode *node, const char *namespace,
+ int sub_set, const char *firstseq,
const char *seqbuf, const char *timestamp);
-static int write_void_log(int node_id, char *seqbuf, const char *message);
+static int archive_void_log(SlonNode *node, char *seqbuf, const char *message);
+
static void compress_actionseq(const char *ssy_actionseq, SlonDString * action_subquery);
-#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
/* ----------
* slon_remoteWorkerThread
@@ -645,14 +641,10 @@
need_reloadListen = true;
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_NODE");
+ rc = archive_void_log(node, seqbuf, "-- STORE_NODE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
- }
}
else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
@@ -671,15 +663,11 @@
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- ENABLE_NODE");
+ rc = archive_void_log(node, seqbuf, "-- ENABLE_NODE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "DROP_NODE") == 0)
{
int no_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -730,15 +718,11 @@
need_reloadListen = true;
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_NODE");
+ rc = archive_void_log(node, seqbuf, "-- DROP_NODE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "STORE_PATH") == 0)
{
int pa_server = (int)strtol(event->ev_data1, NULL, 10);
@@ -757,15 +741,11 @@
need_reloadListen = true;
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_PATH");
+ rc = archive_void_log(node, seqbuf, "-- STORE_PATH");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "DROP_PATH") == 0)
{
int pa_server = (int)strtol(event->ev_data1, NULL, 10);
@@ -782,16 +762,11 @@
need_reloadListen = true;
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_PATH");
+ rc = archive_void_log(node, seqbuf, "-- DROP_PATH");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
-
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
{
int li_origin = (int)strtol(event->ev_data1, NULL, 10);
@@ -807,15 +782,11 @@
li_origin, li_provider, li_receiver);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_LISTEN");
+ rc = archive_void_log(node, seqbuf, "-- STORE_LISTEN");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
{
int li_origin = (int)strtol(event->ev_data1, NULL, 10);
@@ -831,14 +802,10 @@
li_origin, li_provider, li_receiver);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_LISTEN");
+ rc = archive_void_log(node, seqbuf, "-- DROP_LISTEN");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
- }
}
else if (strcmp(event->ev_type, "STORE_SET") == 0)
@@ -857,15 +824,11 @@
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_SET");
+ rc = archive_void_log(node, seqbuf, "-- STORE_SET");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "DROP_SET") == 0)
{
int set_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -881,39 +844,15 @@
*/
if (archive_dir)
{
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, set_id);
- rc = submit_query_to_archive(&lsquery);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_open(node, seqbuf) < 0 ||
+ archive_append_ds(node, &lsquery) < 0 ||
+ archive_close(node) < 0)
slon_retry();
}
- rc = close_log_archive();
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- }
}
else if (strcmp(event->ev_type, "MERGE_SET") == 0)
{
@@ -933,40 +872,16 @@
*/
if (archive_dir)
{
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
rc = slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, add_id);
- rc = submit_query_to_archive(&lsquery);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- rc = close_log_archive();
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_open(node, seqbuf) < 0 ||
+ archive_append_ds(node, &lsquery) < 0 ||
+ archive_close(node) < 0)
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
{
/*
@@ -976,15 +891,11 @@
*/
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE");
+ rc = archive_void_log(node, seqbuf, "-- SET_ADD_TABLE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
{
/*
@@ -994,15 +905,11 @@
*/
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE");
+ rc = archive_void_log(node, seqbuf, "-- SET_ADD_SEQUENCE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0)
{
int tab_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1012,15 +919,11 @@
tab_id);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE");
+ rc = archive_void_log(node, seqbuf, "-- SET_DROP_TABLE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0)
{
int seq_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1030,15 +933,11 @@
seq_id);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE");
+ rc = archive_void_log(node, seqbuf, "-- SET_DROP_SEQUENCE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0)
{
int tab_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1049,15 +948,11 @@
tab_id, new_set_id);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE");
+ rc = archive_void_log(node, seqbuf, "-- SET_MOVE_TABLE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0)
{
int seq_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1068,15 +963,11 @@
seq_id, new_set_id);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE");
+ rc = archive_void_log(node, seqbuf, "-- SET_MOVE_SEQUENCE");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
{
int trig_tabid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1088,15 +979,11 @@
trig_tabid, trig_tgname);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER");
+ rc = archive_void_log(node, seqbuf, "-- STORE_TRIGGER");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
{
int trig_tabid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1108,15 +995,11 @@
trig_tabid, trig_tgname);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER");
+ rc = archive_void_log(node, seqbuf, "-- DROP_TRIGGER");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
}
- }
else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
{
int set_id,
@@ -1289,14 +1172,10 @@
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- FAILOVER_SET");
+ rc = archive_void_log(node, seqbuf, "-- FAILOVER_SET");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
- }
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
@@ -1315,14 +1194,10 @@
sub_set, sub_provider, sub_receiver, sub_forward);
if (archive_dir)
{
- rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET");
+ rc = archive_void_log(node, seqbuf, "-- SUBSCRIBE_SET");
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
- }
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
@@ -1453,39 +1328,15 @@
need_reloadListen = true;
if (archive_dir)
{
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, sub_set);
- rc = submit_query_to_archive(&lsquery);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_open(node, seqbuf) < 0 ||
+ archive_append_ds(node, &lsquery) < 0 ||
+ archive_close(node) < 0)
slon_retry();
}
- rc = close_log_archive();
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- }
}
else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
{
@@ -1572,50 +1423,19 @@
if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
{
- rc = open_log_archive(node->no_id, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not open DDL archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_open(node, seqbuf) < 0)
slon_retry();
- }
- generate_archive_header(node->no_id, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not generate DDL archive header %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_tracking(node, rtcfg_namespace,
+ ddl_setid, seqbuf, seqbuf,
+ event->ev_timestamp_c) < 0)
slon_retry();
- }
- rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not generate DDL archive tracker %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_retry();
- }
- rc = submit_string_to_archive(ddl_script);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not submit DDL Script %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_append_str(node, ddl_script) < 0)
slon_retry();
- }
-
- rc = close_log_archive();
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not close DDL Script %s - %s",
- node->no_id, archive_tmp, strerror(errno));
+ if (archive_close(node) < 0)
slon_retry();
}
}
}
- }
else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
{
int reset_config_setid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1626,14 +1446,16 @@
rtcfg_namespace,
reset_config_setid, reset_configonly_on_node);
if (archive_dir)
- write_void_log(rtcfg_nodeid, seqbuf, "-- RESET_CONFIG");
+ if (archive_void_log(node, seqbuf, "-- RESET_CONFIG") < 0)
+ slon_retry();
}
else
{
printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
if (archive_dir)
- write_void_log(rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!");
+ if (archive_void_log(node, seqbuf, "-- UNHANDLED EVENT!!!") < 0)
+ slon_retry();
}
/*
@@ -2681,34 +2503,16 @@
*/
if (archive_dir)
{
- rc = open_log_archive(rtcfg_nodeid, seqbuf);
+ rc = archive_open(node, seqbuf);
if (rc < 0)
{
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not open COPY SET archive file %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
- return -1;
- }
- rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not generate COPY SET archive header %s - %s",
- node->no_id, archive_tmp, strerror(errno));
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- dstring_free(&query2);
- dstring_free(&query3);
- dstring_free(&lsquery);
- dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2727,7 +2531,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -2761,7 +2565,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (*(PQgetvalue(res1, 0, 0)) == 't')
@@ -2776,7 +2580,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
PQclear(res1);
@@ -2794,7 +2598,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2842,7 +2646,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2880,7 +2684,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2908,7 +2712,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2938,7 +2742,7 @@
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query3);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -2959,7 +2763,7 @@
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query3);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3000,7 +2804,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -3027,7 +2831,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3066,7 +2870,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -3108,7 +2912,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -3136,7 +2940,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -3161,7 +2965,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
@@ -3201,7 +3005,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3226,7 +3030,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples2 = PQntuples(res2);
@@ -3245,7 +3049,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3277,7 +3081,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3299,7 +3103,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3332,7 +3136,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir)
@@ -3340,20 +3144,16 @@
slon_mkquery(&query1,
"delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
nodeon73 ? "" : PQgetvalue(res3, 0, 0));
- rc = submit_query_to_archive(&query1);
+ rc = archive_append_ds(node, &query1);
if (rc < 0)
{
- slon_log(SLON_ERROR, "remoteWorkerThread_d: "
- "Could not generate copy_set request for %s - %s",
- node->no_id, tab_fqname, strerror(errno));
-
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3386,7 +3186,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3418,17 +3218,14 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir)
{
- rc = fwrite(copydata, 1, len, archive_fp);
- if (rc != len)
+ rc = archive_append_data(node, copydata, len);
+ if (rc < 0)
{
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "PQputCopyData() - log shipping - %s",
- node->no_id, strerror(errno));
#ifdef SLON_MEMDEBUG
memset(copydata, 88, len);
#endif
@@ -3443,7 +3240,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3468,7 +3265,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3493,7 +3290,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
PQclear(res3);
@@ -3514,7 +3311,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
res2 = PQgetResult(loc_dbconn);
@@ -3532,12 +3329,12 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir)
{
- rc = submit_string_to_archive("\\.");
+ rc = archive_append_str(node, "\\.");
if (rc < 0) {
PQclear(res2);
PQclear(res1);
@@ -3547,7 +3344,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3574,7 +3371,7 @@
PQputline(loc_dbconn, copybuf);
PQputline(loc_dbconn, "\n");
if (archive_dir) {
- rc = submit_string_to_archive(copybuf);
+ rc = archive_append_str(node, copybuf);
if (rc < 0) {
PQclear(res2);
PQclear(res1);
@@ -3584,7 +3381,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3592,7 +3389,7 @@
case 1:
PQputline(loc_dbconn, copybuf);
if (archive_dir) {
- rc = submit_raw_data_to_archive(copybuf);
+ rc = archive_append_data(node, copybuf, strlen(copybuf));
if (rc < 0) {
PQclear(res2);
PQclear(res1);
@@ -3602,7 +3399,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
@@ -3615,7 +3412,7 @@
PQputline(loc_dbconn, "\\.\n");
if (archive_dir)
{
- rc = submit_string_to_archive("\\.");
+ rc = archive_append_str(node, "\\.");
if (rc < 0) {
PQclear(res2);
PQclear(res1);
@@ -3625,7 +3422,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3648,7 +3445,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
PQclear(res3);
@@ -3674,7 +3471,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
#endif /* HAVE_PQPUTCOPYDATA */
@@ -3700,12 +3497,12 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir)
{
- rc = submit_query_to_archive(&query1);
+ rc = archive_append_ds(node, &query1);
if (rc < 0) {
return -1;
}
@@ -3755,7 +3552,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res1);
@@ -3782,7 +3579,7 @@
if (archive_dir)
{
- rc = submit_query_to_archive(&query1);
+ rc = archive_append_ds(node, &query1);
if (rc < 0) {
PQclear(res1);
slon_disconnectdb(pro_conn);
@@ -3791,7 +3588,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3814,7 +3611,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -3862,7 +3659,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQntuples(res1) != 1)
@@ -3877,7 +3674,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQgetisnull(res1, 0, 0))
@@ -3928,7 +3725,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQntuples(res1) != 1)
@@ -3943,7 +3740,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ssy_seqno = PQgetvalue(res1, 0, 0);
@@ -3990,7 +3787,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
ntuples1 = PQntuples(res2);
@@ -4035,7 +3832,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (PQntuples(res1) != 1)
@@ -4050,7 +3847,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
dstring_init(&ssy_action_list);
@@ -4083,7 +3880,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
if (archive_dir)
@@ -4092,7 +3889,7 @@
"insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) "
"values ('%d', '%d');",
rtcfg_namespace, set_id, ssy_seqno);
- rc = submit_query_to_archive(&lsquery);
+ rc = archive_append_ds(node, &lsquery);
if (rc < 0)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
@@ -4104,7 +3901,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -4116,19 +3913,16 @@
if (archive_dir)
{
- rc = close_log_archive();
+ rc = archive_close(node);
if (rc < 0)
{
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- " could not close archive log %s - %s",
- node->no_id, archive_tmp, strerror(errno));
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
}
@@ -4147,7 +3941,7 @@
dstring_free(&query3);
dstring_free(&lsquery);
dstring_free(&indexregenquery);
- terminate_log_archive();
+ archive_terminate(node);
return -1;
}
slon_disconnectdb(pro_conn);
@@ -4217,21 +4011,10 @@
*/
if (archive_dir)
{
- rc = open_log_archive(node->no_id, seqbuf);
- if (rc == -1)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot open archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
- dstring_free(&query);
- return 60;
- }
- rc = generate_archive_header(node->no_id, seqbuf);
+ rc = archive_open(node, seqbuf);
if (rc < 0)
{
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
+ dstring_free(&query);
return 60;
}
}
@@ -4257,7 +4040,8 @@
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"No pa_conninfo for data provider %d\n",
node->no_id, provider->no_id);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 10;
}
sprintf(conn_symname, "subscriber_%d_provider_%d",
@@ -4270,7 +4054,8 @@
"cannot connect to data provider %d on '%s'\n",
node->no_id, provider->no_id,
provider->pa_conninfo);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return provider->pa_connretry;
}
@@ -4282,7 +4067,8 @@
rtcfg_namespace, rtcfg_nodeid);
if (query_execute(node, provider->conn->dbconn, &query) < 0)
{
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_disconnectdb(provider->conn);
provider->conn = NULL;
return provider->pa_connretry;
@@ -4318,7 +4104,8 @@
"for ev_origin %d\n",
node->no_id, provider->no_id,
event->ev_origin);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 10;
}
if (prov_seqno < event->ev_seqno)
@@ -4328,7 +4115,8 @@
"ev_seqno " INT64_FORMAT " for ev_origin %d\n",
node->no_id, provider->no_id,
prov_seqno, event->ev_origin);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 10;
}
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -4395,7 +4183,8 @@
PQresultErrorMessage(res1));
PQclear(res1);
dstring_free(&new_qual);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
@@ -4444,7 +4233,8 @@
PQclear(res2);
PQclear(res1);
dstring_free(&new_qual);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
ntuples2 = PQntuples(res2);
@@ -4578,16 +4368,11 @@
slon_log(SLON_DEBUG2, "writing archive log...\n");
fflush(stderr);
fflush(stdout);
- rc = logarchive_tracking(rtcfg_namespace, sub_set,
+ rc = archive_tracking(node, rtcfg_namespace, sub_set,
PQgetvalue(res1, tupno1, 1), seqbuf,
event->ev_timestamp_c);
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
- return 60;
- }
+ slon_retry();
}
}
PQclear(res1);
@@ -4620,14 +4405,9 @@
dstring_free(&query);
if (archive_dir)
{
- rc = close_log_archive();
+ rc = archive_close(node);
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not close out archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
- return 60;
- }
+ slon_retry();
}
return 0;
}
@@ -4644,7 +4424,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_disconnectdb(provider->conn);
provider->conn = NULL;
return 20;
@@ -4655,7 +4436,8 @@
slon_log(SLON_ERROR, "remoteWorkerThread_%d: cannot determine current log status\n",
node->no_id);
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_disconnectdb(provider->conn);
provider->conn = NULL;
return 20;
@@ -4778,19 +4560,9 @@
*/
if (archive_dir)
{
- rc = submit_string_to_archive(dstring_data(&(wgline->data)));
-
- /*
- * rc = fprintf(archive_fp, "%s",
- * dstring_data(&(wgline->data)));
- */
+ rc = archive_append_ds(node, &(wgline->data));
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
- return 60;
- }
+ slon_retry();
}
break;
@@ -4889,7 +4661,8 @@
*/
if (num_errors != 0)
{
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
node->no_id);
return 10;
@@ -4926,7 +4699,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_disconnectdb(provider->conn);
provider->conn = NULL;
return 20;
@@ -4944,7 +4718,8 @@
if (query_execute(node, local_dbconn, &query) < 0)
{
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
@@ -4957,14 +4732,9 @@
"select %s.sequenceSetValue_offline(%s,'%s');\n",
rtcfg_namespace,
seql_seqid, seql_last_value);
- rc = submit_query_to_archive(&lsquery);
+ rc = archive_append_ds(node, &lsquery);
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Cannot write to archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
- return 60;
- }
+ slon_retry();
}
}
PQclear(res1);
@@ -5006,7 +4776,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
node->no_id);
return 10;
@@ -5032,7 +4803,8 @@
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
if (PQntuples(res1) > 0)
@@ -5046,7 +4818,8 @@
if (query_execute(node, local_dbconn, &query) < 0)
{
PQclear(res1);
- TERMINATE_QUERY_AND_ARCHIVE;
+ dstring_free(&query);
+ archive_terminate(node);
return 60;
}
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -5061,18 +4834,13 @@
*/
if (archive_dir)
{
- rc = close_log_archive();
+ rc = archive_close(node);
if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "Could not close out archive file %s - %s\n",
- node->no_id, archive_tmp, strerror(errno));
- return 60;
+ slon_retry();
- }
if (command_on_logarchive) {
char command[512];
- sprintf(command, "%s %s", command_on_logarchive, archive_name);
+ sprintf(command, "%s %s", command_on_logarchive, node->archive_name);
slon_log(SLON_INFO, "remoteWorkerThread_%d: Run Archive Command %s\n",
node->no_id, command);
system(command);
@@ -5760,7 +5528,7 @@
/* ----------
* Functions for processing log archives...
*
- * - First, you open the log archive using open_log_archive()
+ * - First, you open the log archive using archive_open()
*
* - Second, you generate the header using generate_archive_header()
*
@@ -5783,47 +5551,76 @@
/* ----------
- * open_log_archive
+ * archive_open
*
* Stores the archive name in archive_name (as .sql name) and
* archive_tmp (.tmp file)
* ----------
*/
-int
-open_log_archive(int node_id, char *seqbuf)
+static int
+archive_open(SlonNode *node, char *seqbuf)
{
int i;
+ int rc;
- sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id);
+ if (node->archive_name == NULL)
+ {
+ node->archive_name = malloc(SLON_MAX_PATH);
+ node->archive_temp = malloc(SLON_MAX_PATH);
+ if (node->archive_name == NULL || node->archive_temp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Out of memory in archive_open()\n",
+ node->no_id);
+ return -1;
+ }
+ }
+
+ sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir,
+ node->no_id);
for (i = strlen(seqbuf); i < 20; i++)
- strcat(archive_name, "0");
- strcat(archive_name, seqbuf);
- strcat(archive_name, ".sql");
- strcpy(archive_tmp, archive_name);
- strcat(archive_tmp, ".tmp");
- archive_fp = fopen(archive_tmp, "w");
- if (archive_fp == NULL)
+ strcat(node->archive_name, "0");
+ strcat(node->archive_name, seqbuf);
+ strcat(node->archive_name, ".sql");
+ strcpy(node->archive_temp, node->archive_name);
+ strcat(node->archive_temp, ".tmp");
+ node->archive_fp = fopen(node->archive_temp, "w");
+ if (node->archive_fp == NULL)
{
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot open archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
return -1;
}
- else
+
+ rc = fprintf(node->archive_fp,
+ "-- Slony-I log shipping archive\n"
+ "-- Node %d, Event %s\n"
+ "start transaction;\n",
+ node->no_id, seqbuf);
+ if (rc < 0)
{
- return 0;
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
+ return -1;
}
+
+ return 0;
}
/* ----------
- * close_log_archive
+ * archive_close
* ----------
*/
-int
-close_log_archive()
+static int
+archive_close(SlonNode *node)
{
int rc = 0;
if (archive_dir)
{
- rc = fprintf(archive_fp,
+ rc = fprintf(node->archive_fp,
"\n------------------------------------------------------------------\n"
"-- End Of Archive Log\n"
"------------------------------------------------------------------\n"
@@ -5831,116 +5628,166 @@
"vacuum analyze %s.sl_setsync_offline;\n",
rtcfg_namespace);
if (rc < 0)
+ {
+ archive_terminate(node);
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
return -1;
- rc = fclose(archive_fp);
- archive_fp = NULL;
+ }
+
+ rc = fclose(node->archive_fp);
+ node->archive_fp = NULL;
if (rc != 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot close archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
return -1;
- rc = rename(archive_tmp, archive_name);
}
- return rc;
+
+ rc = rename(node->archive_temp, node->archive_name);
+ if (rc != 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot rename archive file %s to %s - %s\n",
+ node->no_id, node->archive_temp, node->archive_name,
+ strerror(errno));
+ return -1;
+ }
+ }
+
+ return 0;
}
/* ----------
- * logarchive_tracking
+ * archive_terminate
* ----------
*/
-int
-logarchive_tracking(const char *namespace, int sub_set, const char *firstseq,
- const char *seqbuf, const char *timestamp)
+static void
+archive_terminate(SlonNode *node)
+{
+ if (node->archive_fp != NULL)
{
- return fprintf(archive_fp,
+ fclose(node->archive_fp);
+ node->archive_fp = NULL;
+ }
+}
+
+/* ----------
+ * archive_tracking
+ * ----------
+ */
+static int
+archive_tracking(SlonNode *node, const char *namespace, int sub_set,
+ const char *firstseq, const char *seqbuf,
+ const char *timestamp)
+{
+ int rc;
+
+ rc = fprintf(node->archive_fp,
"\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
"-- end of log archiving header\n"
"------------------------------------------------------------------\n"
"-- start of Slony-I data\n"
"------------------------------------------------------------------\n",
namespace, sub_set, firstseq, seqbuf, timestamp);
+ if (rc < 0)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
+ return -1;
}
-/* ----------
- * submit_query_to_archive
- * ----------
- */
-int
-submit_query_to_archive(SlonDString * ds)
-{
- return fprintf(archive_fp, "%s\n", ds->data);
+ return 0;
}
/* ----------
- * submit_string_to_archive
+ * archive_append_ds
* ----------
*/
-int
-submit_string_to_archive(const char *s)
+static int
+archive_append_ds(SlonNode *node, SlonDString * ds)
{
- return fprintf(archive_fp, "%s\n", s);
-}
+ int rc;
-#ifndef HAVE_PQPUTCOPYDATA
-/* ----------
- * submit_raw_data_to_archive
- *
- * Raw form used for COPY where we don't want any extra cr/lf output
- * ----------
- */
-int
-submit_raw_data_to_archive(const char *s)
+ rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds));
+ if (rc < 0)
{
- return fprintf(archive_fp, "%s", s);
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
+ return -1;
+ }
+
+ return 0;
}
-#endif
/* ----------
- * terminate_log_archive
+ * archive_append_str
* ----------
*/
-void
-terminate_log_archive()
+static int
+archive_append_str(SlonNode *node, const char *s)
{
- if (archive_fp)
+ int rc;
+
+ rc = fprintf(node->archive_fp, "%s\n", s);
+ if (rc < 0)
{
- fclose(archive_fp);
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
+ return -1;
}
+
+ return 0;
}
/* ----------
- * generate_archive_header
+ * archive_append_data
+ *
+ * Raw form used for COPY where we don't want any extra cr/lf output
* ----------
*/
-int
-generate_archive_header(int node_id, const char *seqbuf)
+static int
+archive_append_data(SlonNode *node, const char *s, int len)
{
- return fprintf(archive_fp,
- "-- Slony-I log shipping archive\n"
- "-- Node %d, Event %s\n"
- "start transaction;\n",
- node_id, seqbuf);
+ int rc;
+
+ rc = fwrite(s, len, 1, node->archive_fp);
+ if (rc != 1)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - %s\n",
+ node->no_id, node->archive_temp, strerror(errno));
+ return -1;
+ }
+
+ return 0;
}
/* ----------
- * write_void_log
+ * archive_void_log
*
* writes out a "void" log consisting of the message which must either
* be a valid SQL query or a SQL comment.
* ----------
*/
-int
-write_void_log(int node_id, char *seqbuf, const char *message)
+static int
+archive_void_log(SlonNode *node, char *seqbuf, const char *message)
{
int rc;
- rc = open_log_archive(node_id, seqbuf);
- if (rc < 0)
- return rc;
- rc = generate_archive_header(node_id, seqbuf);
+ rc = archive_open(node, seqbuf);
if (rc < 0)
return rc;
- rc = submit_string_to_archive(message);
+ rc = archive_append_str(node, message);
if (rc < 0)
return rc;
- rc = close_log_archive();
+ rc = archive_close(node);
+
return rc;
}
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.59
retrieving revision 1.59.2.1
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.59 -r1.59.2.1
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -106,6 +106,10 @@
SlonWorkMsg *message_head;
SlonWorkMsg *message_tail;
+ char *archive_name;
+ char *archive_temp;
+ FILE *archive_fp;
+
SlonNode *prev;
SlonNode *next;
};
- Previous message: [Slony1-commit] By wieck: Fixed archive log writing.
- Next message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list