Mon Aug 20 11:22:28 PDT 2007
- Previous message: [Slony1-commit] slony1-engine/tools slony1_dump.sh
- Next message: [Slony1-commit] slony1-engine/tests/testlogship ddl_updates.sql generate_dml.sh init_data.sql
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv10238/src/slon
Modified Files:
remote_worker.c
Log Message:
Forward patching the new archive tracking into HEAD
Jan
Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.150
retrieving revision 1.151
diff -C2 -d -r1.150 -r1.151
*** remote_worker.c 29 Jul 2007 17:38:23 -0000 1.150
--- remote_worker.c 20 Aug 2007 18:22:26 -0000 1.151
***************
*** 260,265 ****
static int archive_append_data(SlonNode *node, const char *s, int len);
static int archive_tracking(SlonNode *node, const char *namespace,
! int sub_set, const char *firstseq,
! const char *seqbuf, const char *timestamp);
--- 260,264 ----
static int archive_append_data(SlonNode *node, const char *s, int len);
static int archive_tracking(SlonNode *node, const char *namespace,
! PGconn *local_dbconn);
***************
*** 691,705 ****
slon_retry();
! for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next)
! {
! for(pset = pinfo->set_head; pset != NULL; pset = pset->next)
! {
! if (archive_tracking(node, rtcfg_namespace,
! pset->set_id, pset->ssy_seqno, seqbuf,
! event->ev_timestamp_c) < 0)
! slon_retry();
! strcpy(pset->ssy_seqno, seqbuf);
! }
! }
/*
--- 690,696 ----
slon_retry();
! if (archive_tracking(node, rtcfg_namespace,
! local_dbconn) < 0)
! slon_retry();
/*
***************
*** 875,891 ****
"select %s.dropSet_int(%d); ",
rtcfg_namespace, set_id);
-
- /*
- * The table deleted needs to be dropped from log shipping too
- */
- if (archive_dir)
- {
- slon_mkquery(&lsquery,
- "delete from %s.sl_setsync_offline "
- " where ssy_setid= %d;",
- rtcfg_namespace, set_id);
- if (archive_append_ds(node, &lsquery) < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "MERGE_SET") == 0)
--- 866,869 ----
***************
*** 901,917 ****
set_id, add_id);
- /*
- * Log shipping gets the change here that we need to delete
- * the table being merged from the set being maintained.
- */
- if (archive_dir)
- {
- slon_mkquery(&lsquery,
- "delete from %s.sl_setsync_offline "
- " where ssy_setid= %d;",
- rtcfg_namespace, add_id);
- if (archive_append_ds(node, &lsquery) < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
--- 879,882 ----
***************
*** 1281,1293 ****
need_reloadListen = true;
- if (archive_dir)
- {
- (void) slon_mkquery(&lsquery,
- "delete from %s.sl_setsync_offline "
- " where ssy_setid= %d;",
- rtcfg_namespace, sub_set);
- if (archive_append_ds(node, &lsquery) < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
--- 1246,1249 ----
***************
*** 3424,3450 ****
return -1;
}
- if (archive_dir)
- {
- (void) slon_mkquery(&lsquery,
- "insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) "
- "values ('%d', '%s');",
- rtcfg_namespace, set_id, ssy_seqno);
- rc = archive_append_ds(node, &lsquery);
- if (rc < 0)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- " could not insert to sl_setsync_offline",
- node->no_id);
- PQclear(res1);
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- dstring_free(&query2);
- dstring_free(&query3);
- dstring_free(&lsquery);
- dstring_free(&indexregenquery);
- archive_terminate(node);
- return -1;
- }
- }
PQclear(res1);
gettimeofday(&tv_now, NULL);
--- 3380,3383 ----
***************
*** 3896,3932 ****
PQclear(res2);
-
- /*
- * Add a call to the setsync tracking function to the archive log.
- * This function ensures that all archive log files are applied in
- * the right order.
- */
- if (archive_dir)
- {
- for (pset = provider->set_head; pset; pset = pset->next)
- if (pset->set_id == sub_set) break;
- if (pset == NULL)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "set %d not found in runtime config\n",
- node->no_id, sub_set);
- slon_retry();
- }
-
- slon_log(SLON_DEBUG2, "writing archive log...\n");
- fflush(stderr);
- fflush(stdout);
- rc = archive_tracking(node, rtcfg_namespace, sub_set,
- pset->ssy_seqno, seqbuf,
- event->ev_timestamp_c);
- if (rc < 0)
- slon_retry();
-
- strcpy(pset->ssy_seqno, seqbuf);
- }
}
PQclear(res1);
/*
* We didn't add anything good in the provider clause. That shouldn't
* be!
--- 3829,3848 ----
PQclear(res2);
}
PQclear(res1);
/*
+ * Add a call to the archive tracking function to the archive log.
+ * This function ensures that all archive log files are applied in
+ * the right order.
+ */
+ if (archive_dir)
+ {
+ rc = archive_tracking(node, rtcfg_namespace, local_dbconn);
+ if (rc < 0)
+ slon_retry();
+ }
+
+ /*
* We didn't add anything good in the provider clause. That shouldn't
* be!
***************
*** 5167,5171 ****
"------------------------------------------------------------------\n"
"commit;\n"
! "vacuum analyze %s.sl_setsync_offline;\n",
rtcfg_namespace);
if (rc < 0)
--- 5083,5087 ----
"------------------------------------------------------------------\n"
"commit;\n"
! "vacuum analyze %s.sl_archive_tracking;\n",
rtcfg_namespace);
if (rc < 0)
***************
*** 5220,5232 ****
*/
static int
! archive_tracking(SlonNode *node, const char *namespace, int sub_set,
! const char *firstseq, const char *seqbuf,
! const char *timestamp)
{
! int rc;
if (!archive_dir)
return 0;
if (node->archive_fp == NULL)
{
--- 5136,5178 ----
*/
static int
! archive_tracking(SlonNode *node, const char *namespace,
! PGconn *local_dbconn)
{
! SlonDString query;
! PGresult *res;
! int rc;
if (!archive_dir)
return 0;
+ dstring_init(&query);
+ slon_mkquery(&query,
+ "update %s.sl_archive_counter "
+ " set ac_num = ac_num + 1, "
+ " ac_timestamp = CURRENT_TIMESTAMP; "
+ "select ac_num, ac_timestamp from %s.sl_archive_counter; ",
+ namespace, namespace);
+ res = PQexec(local_dbconn, dstring_data(&query));
+ if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: \"%s\" %s %s\n",
+ node->no_id, dstring_data(&query),
+ PQresStatus(rc),
+ PQresultErrorMessage(res));
+ PQclear(res);
+ dstring_free(&query);
+ return -1;
+ }
+ if ((rc = PQntuples(res)) != 1)
+ {
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: expected 1 row in sl_archive_counter - found %d",
+ node->no_id, rc);
+ PQclear(res);
+ dstring_free(&query);
+ return -1;
+ }
+
if (node->archive_fp == NULL)
{
***************
*** 5234,5247 ****
"Cannot write to archive file %s - not open\n",
node->no_id, node->archive_temp);
return -1;
}
rc = fprintf(node->archive_fp,
! "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
"-- end of log archiving header\n"
"------------------------------------------------------------------\n"
"-- start of Slony-I data\n"
"------------------------------------------------------------------\n",
! namespace, sub_set, firstseq, seqbuf, timestamp);
if (rc < 0)
{
--- 5180,5197 ----
"Cannot write to archive file %s - not open\n",
node->no_id, node->archive_temp);
+ PQclear(res);
+ dstring_free(&query);
return -1;
}
rc = fprintf(node->archive_fp,
! "\nselect %s.archiveTracking_offline('%s', '%s');\n"
"-- end of log archiving header\n"
"------------------------------------------------------------------\n"
"-- start of Slony-I data\n"
"------------------------------------------------------------------\n",
! namespace, PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1));
! PQclear(res);
! dstring_free(&query);
if (rc < 0)
{
- Previous message: [Slony1-commit] slony1-engine/tools slony1_dump.sh
- Next message: [Slony1-commit] slony1-engine/tests/testlogship ddl_updates.sql generate_dml.sh init_data.sql
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list