Wed Oct 26 22:46:02 PDT 2005
- Previous message: [Slony1-commit] By cbbrowne: Fix indentation - slony1_dump
- Next message: [Slony1-commit] By cbbrowne: The query looking at copyFields() return codes was looking
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
On the first SYNC, we need to compress the "log_actionseq not in (possibly huge list)" clause
into a set of "not (log_actionseq between a and b or log_actionseq between c and d ...)" clauses.
If the SUBSCRIBE_SET event runs for a very long time, there can be a whole lot
of events on the go between the time it starts and that first SYNC. If so,
the list of log_actionseq values can contain thousands of items, or even
orders of magnitude more.
compress_actionseq() takes the "huge list" (a string of comma delimited values)
and has a state machine to parse out the numbers, in order.
Each number is compared to the current 'range'...
- If it's inside or extends a range, we continue working on a "between a and b" clause
- The value might be a singleton, thus adding a "log_actionseq <> x" clause
What generally happens if the event runs for a long time is that there will
be some really enormous sequences of consecutive values. Happily, the
new function shrinks that into "between a and b" (where b = a + 50000), which
means we have a query that's 4K in size rather than 40MB in size.
Hannu Krosing added logic at the end that handles the case where there
was NOTHING in the set of log_actionseq values.
Modified Files:
--------------
slony1-engine/src/slon:
remote_worker.c (r1.90 -> r1.91)
-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.90
retrieving revision 1.91
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.90 -r1.91
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -256,6 +256,8 @@
const char *seqbuf, const char *timestamp);
static int write_void_log (int node_id, char *seqbuf, const char *message);
+static void compress_actionseq (const char *ssy_actionseq, SlonDString *action_subquery);
+
#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
/*
@@ -3726,6 +3728,9 @@
SlonDString new_qual;
SlonDString query;
SlonDString *provider_qual;
+ SlonDString actionseq_subquery;
+
+ int actionlist_len;
gettimeofday(&tv_start, NULL);
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT
@@ -4051,12 +4056,20 @@
slon_appendquery(provider_qual,
"(log_xid >= '%s')",
ssy_maxxid);
- if (strlen(ssy_action_list) != 0)
- slon_appendquery(provider_qual,
- " and log_actionseq not in (%s)\n) ",
- ssy_action_list);
- else
+ actionlist_len = strlen(ssy_action_list);
+ slon_log(SLON_DEBUG2, " ssy_action_list value: %s length: %d\n",
+ ssy_action_list, actionlist_len);
+
+ if (actionlist_len == 0) {
slon_appendquery(provider_qual, "\n) ");
+ } else {
+ dstring_init(&actionseq_subquery);
+ compress_actionseq(ssy_action_list, &actionseq_subquery);
+ slon_appendquery(provider_qual,
+ " and (%s)\n) ",
+ dstring_data(&actionseq_subquery));
+ dstring_free(&actionseq_subquery);
+ }
PQclear(res2);
@@ -5024,3 +5037,215 @@
rc = close_log_archive();
return rc;
}
+
+/* compress_actionseq(ssy_actionlist, action_subquery)
+ Scan a list of log_actionseq values and write into action_subquery
+ an SQL exclusion expression built from ranges and single values.
+
+ Thus, "'13455','13456','13457','13458','13459','13460','13462'"
+ compresses into...
+
+ log_actionseq not between '13455' and '13460' and
+ log_actionseq <> '13462'
+
+ Only one range is tracked at a time: a value adjacent to it widens
+ it by one, a value further away closes it (emitting a clause) and
+ starts a new range. Values arriving more or less in order therefore
+ compress well; the function has no return value. */
+
+#define START_STATE 1 /* initial state, before any character is scanned */
+#define COLLECTING_DIGITS 2 /* set while a run of digits is being accumulated */
+#define BETWEEN_NUMBERS 3 /* set after a quote or comma has been handled */
+#define DONE 4 /* set when the terminating NUL is reached */
+
+#define MINMAXINITIAL -1 /* sentinel for curr_min/curr_max: no range started yet */
+
+void compress_actionseq (const char *ssy_actionlist, SlonDString *action_subquery) {
+ int state; /* one of the four state constants above */
+ int curr_number, curr_min, curr_max; /* NOTE(review): plain int - confirm log_actionseq values always fit */
+ int curr_digit;
+ int first_subquery; /* 1 until the first clause is emitted; later clauses are joined with " and " */
+ char curr_char;
+ curr_min = MINMAXINITIAL;
+ curr_max = MINMAXINITIAL;
+ first_subquery = 1;
+ state = START_STATE;
+ slon_mkquery(action_subquery, " "); /* presumably (re)initializes the output to one blank - confirm against slon_mkquery */
+
+ slon_log(SLON_DEBUG3, "compress_actionseq(list,subquery) Action list: %s\n", ssy_actionlist);
+ while (state != DONE) { /* one input character per iteration */
+ curr_char = *ssy_actionlist;
+ switch (curr_char) {
+ case '\0': /* end of input; digits not yet followed by a quote or comma are not folded into a range */
+ state = DONE;
+ break;
+ case '0': /* each digit case either extends curr_number or starts a new number */
+ curr_digit = 0;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '1':
+ curr_digit = 1;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '2':
+ curr_digit = 2;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '3':
+ curr_digit = 3;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '4':
+ curr_digit = 4;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '5':
+ curr_digit = 5;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '6':
+ curr_digit = 6;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '7':
+ curr_digit = 7;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '8':
+ curr_digit = 8;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '9':
+ curr_digit = 9;
+ if (state == COLLECTING_DIGITS) {
+ curr_number = curr_number * 10 + curr_digit;
+ } else {
+ state = COLLECTING_DIGITS;
+ curr_number = curr_digit;
+ }
+ break;
+ case '\'': /* a quote or a comma ends the number being collected, if any */
+ case ',':
+ if (state == COLLECTING_DIGITS) {
+ /* A complete number is in curr_number; fold it into the current range. */
+ slon_log(SLON_DEBUG4, "Finished number: %d\n", curr_number);
+ /* No range yet (sentinel still set): the number becomes
+ both ends of the range */
+ if (curr_min == MINMAXINITIAL) {
+ curr_min = curr_number;
+ }
+ if (curr_max == MINMAXINITIAL) {
+ curr_max = curr_number;
+ }
+ /* A number exactly one below or one above the range
+ widens the range by one on that side */
+ if (curr_number == curr_min - 1) {
+ curr_min --;
+ }
+ if (curr_number == curr_max + 1) {
+ curr_max ++;
+ }
+
+ /* A number already inside the range needs no action */
+ if ((curr_number >= curr_min) && (curr_number <= curr_max)) {
+ /* Intentionally empty - the range already covers the number */
+ }
+
+ /* A number more than one away from the range closes it:
+ emit a clause for the old range, then restart the
+ range at the new number */
+ if ((curr_number < curr_min - 1) || (curr_number > curr_max + 1)) {
+ if (first_subquery) {
+ first_subquery = 0;
+ } else {
+ slon_appendquery(action_subquery, " and ");
+ }
+ if (curr_max == curr_min) { /* single value: emit an inequality rather than a range */
+ slon_log(SLON_DEBUG4, "simple entry - %d\n", curr_max);
+ slon_appendquery(action_subquery,
+ " log_actionseq <> '%d' ", curr_max);
+ } else {
+ slon_log(SLON_DEBUG4, "between entry - %d %d\n",
+ curr_min, curr_max);
+ slon_appendquery(action_subquery,
+ " log_actionseq not between '%d' and '%d' ",
+ curr_min, curr_max);
+ }
+ curr_min = curr_number;
+ curr_max = curr_number;
+ }
+ }
+ state = BETWEEN_NUMBERS;
+ curr_number = 0;
+
+ } /* no default case: any other character is skipped and leaves the state unchanged */
+ ssy_actionlist++;
+ }
+ /* Emit a clause for the range still open when the input ended. */
+ if (curr_min || curr_max) { /* NOTE(review): false only when both are 0; the -1 sentinel passes, yielding <> '-1' - confirm intended */
+ if (first_subquery) {
+ first_subquery = 0;
+ } else {
+ slon_appendquery(action_subquery, " and ");
+ }
+ if (curr_max == curr_min) {
+ slon_log(SLON_DEBUG4, "simple entry - %d\n", curr_max);
+ slon_appendquery(action_subquery,
+ " log_actionseq <> '%d' ", curr_max);
+ } else {
+ slon_log(SLON_DEBUG4, "between entry - %d %d\n",
+ curr_min, curr_max);
+ slon_appendquery(action_subquery,
+ " log_actionseq not between '%d' and '%d' ",
+ curr_min, curr_max);
+ }
+
+
+ }
+ slon_log(SLON_DEBUG3, " compressed actionseq subquery... %s\n", dstring_data(action_subquery));
+}
- Previous message: [Slony1-commit] By cbbrowne: Fix indentation - slony1_dump
- Next message: [Slony1-commit] By cbbrowne: The query looking at copyFields() return codes was looking
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list