CVS User Account cvsuser
Wed Oct 26 22:46:02 PDT 2005
Log Message:
-----------
On the first SYNC, compress the (possibly huge) "log_actionseq not in (...)" list
into a set of range exclusions of the form "not (log_actionseq between a and b or log_actionseq between c and d ...)".

If the SUBSCRIBE_SET event runs for a very long time, a great many events can
be in progress between the time it starts and that first SYNC.  If so, the
list of log_actionseq values can contain thousands of items, or even several
orders of magnitude more.

compress_actionseq() takes the "huge list" (a string of comma delimited values)
and has a state machine to parse out the numbers, in order.

Each number is compared to the current 'range'...
- If it falls inside the range, or extends it by one at either end, we keep building the current "between a and b" clause.
- Otherwise the current range is closed and a new one starts; a closed range holding a single value becomes a "log_actionseq <> x" clause.

What generally happens if the event runs for a long time is that there will
be some really enormous runs of consecutive values.  Happily, the new
function shrinks each such run into a single "between a and b" clause (for
example, with b = a + 50000), which means we end up with a query that is 4K
in size rather than 40MB.

Hannu Krosing added logic at the end that handles the case where there
was NOTHING in the set of log_actionseq values.

Modified Files:
--------------
    slony1-engine/src/slon:
        remote_worker.c (r1.90 -> r1.91)

-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.90
retrieving revision 1.91
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.90 -r1.91
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -256,6 +256,8 @@
 				const char *seqbuf, const char *timestamp);
 static int write_void_log (int node_id, char *seqbuf, const char *message);
 
+static void compress_actionseq (const char *ssy_actionseq, SlonDString *action_subquery);
+
 #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
 
 /*
@@ -3726,6 +3728,9 @@
 	SlonDString new_qual;
 	SlonDString query;
 	SlonDString *provider_qual;
+	SlonDString actionseq_subquery;
+
+	int actionlist_len;
 
 	gettimeofday(&tv_start, NULL);
 	slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT
@@ -4051,12 +4056,20 @@
 				slon_appendquery(provider_qual,
 								 "(log_xid >= '%s')",
 								 ssy_maxxid);
-			if (strlen(ssy_action_list) != 0)
-				slon_appendquery(provider_qual,
-								 " and log_actionseq not in (%s)\n) ",
-								 ssy_action_list);
-			else
+			actionlist_len = strlen(ssy_action_list);
+			slon_log(SLON_DEBUG2, " ssy_action_list value: %s length: %d\n",  
+				 ssy_action_list, actionlist_len);
+
+			if (actionlist_len == 0) {
 				slon_appendquery(provider_qual, "\n) ");
+			} else {
+				dstring_init(&actionseq_subquery);
+				compress_actionseq(ssy_action_list, &actionseq_subquery);
+				slon_appendquery(provider_qual,
+						 " and (%s)\n) ",
+						 dstring_data(&actionseq_subquery));
+				dstring_free(&actionseq_subquery);
+			}
 
 			PQclear(res2);
 
@@ -5024,3 +5037,215 @@
 	rc = close_log_archive();
 	return rc;
 }
+ 
+/*
+ * Append one exclusion clause for the inclusive range [lo, hi] to dsp,
+ * preceded by " and " unless it is the first clause (*first_subquery is
+ * then cleared).  The bounds are formatted here and handed to
+ * slon_appendquery() as strings: it is only relied on for %s and %d in
+ * this file, and %d would truncate 64-bit values.
+ */
+static void
+compress_actionseq_append(SlonDString *dsp, int *first_subquery,
+						  int64 lo, int64 hi)
+{
+	char		lobuf[32];
+	char		hibuf[32];
+
+	snprintf(lobuf, sizeof(lobuf), INT64_FORMAT, lo);
+	snprintf(hibuf, sizeof(hibuf), INT64_FORMAT, hi);
+
+	if (*first_subquery)
+		*first_subquery = 0;
+	else
+		slon_appendquery(dsp, " and ");
+
+	if (lo == hi)
+	{
+		slon_log(SLON_DEBUG4, "simple entry - %s\n", hibuf);
+		slon_appendquery(dsp, " log_actionseq <> '%s' ", hibuf);
+	}
+	else
+	{
+		slon_log(SLON_DEBUG4, "between entry - %s %s\n", lobuf, hibuf);
+		slon_appendquery(dsp, " log_actionseq not between '%s' and '%s' ",
+						 lobuf, hibuf);
+	}
+}
+
+/*
+ * compress_actionseq
+ *
+ * Given ssy_actionseq, a string listing log_actionseq values -- in
+ * practice a comma-separated list of quoted numbers such as
+ *
+ *     '13455','13456','13457','13458','13459','13460','13462'
+ *
+ * -- reset *action_subquery (via slon_mkquery) and fill it with a
+ * boolean SQL expression that is true for every log_actionseq NOT in
+ * the list.  Runs of consecutive values collapse into "not between"
+ * clauses, so the example above becomes
+ *
+ *     log_actionseq not between '13455' and '13460' and
+ *     log_actionseq <> '13462'
+ *
+ * Each value either extends the current range by one at either end,
+ * falls inside it, or closes it and starts a new one.  Compression is
+ * therefore best when values arrive (roughly) in order, which is what
+ * makes a long-running SUBSCRIBE_SET -- with Really A Lot of log entries
+ * to exclude -- cheap to express.  Out-of-order values are still
+ * excluded correctly, just with less compression, since every range
+ * only ever contains values that were actually listed.
+ *
+ * Any non-digit character (quote, comma, whitespace, end of string)
+ * ends a number, so a final value without a trailing delimiter is not
+ * lost.  Values are accumulated as int64 because action sequence
+ * numbers can outgrow a plain int (NOTE(review): assumes log_actionseq
+ * is a bigint column -- confirm against the schema); digit strings too
+ * long for int64 are not detected.
+ *
+ * If the list contains no numbers at all the expression is " true ",
+ * so a caller that wraps the result as " and (%s)" still produces
+ * valid SQL.
+ */
+static void
+compress_actionseq(const char *ssy_actionseq, SlonDString *action_subquery)
+{
+	const char *p;
+	int64		curr_number = 0;
+	int64		range_min = 0;
+	int64		range_max = 0;
+	int			in_number = 0;		/* accumulating digits of curr_number */
+	int			have_range = 0;		/* range_min..range_max is valid */
+	int			first_subquery = 1;
+
+	slon_mkquery(action_subquery, " ");
+	slon_log(SLON_DEBUG3, "compress_actionseq(list,subquery) Action list: %s\n",
+			 ssy_actionseq);
+
+	for (p = ssy_actionseq;; p++)
+	{
+		if (*p >= '0' && *p <= '9')
+		{
+			/* '0'..'9' are contiguous in every C character set */
+			curr_number = (in_number ? curr_number * 10 : 0) + (*p - '0');
+			in_number = 1;
+			continue;
+		}
+
+		/* Any non-digit, including the terminating NUL, ends a number. */
+		if (in_number)
+		{
+			in_number = 0;
+			slon_log(SLON_DEBUG4, "Finished number: " INT64_FORMAT "\n",
+					 curr_number);
+
+			if (!have_range)
+			{
+				range_min = range_max = curr_number;
+				have_range = 1;
+			}
+			else if (curr_number == range_min - 1)
+				range_min = curr_number;	/* extends range downwards */
+			else if (curr_number == range_max + 1)
+				range_max = curr_number;	/* extends range upwards */
+			else if (curr_number < range_min || curr_number > range_max)
+			{
+				/* Not adjacent: flush the range and start a new one here. */
+				compress_actionseq_append(action_subquery, &first_subquery,
+										  range_min, range_max);
+				range_min = range_max = curr_number;
+			}
+			/* otherwise the number is already inside the range */
+		}
+
+		if (*p == '\0')
+			break;
+	}
+
+	if (have_range)
+		compress_actionseq_append(action_subquery, &first_subquery,
+								  range_min, range_max);
+	else
+		slon_appendquery(action_subquery, " true ");	/* nothing to exclude */
+
+	slon_log(SLON_DEBUG3, " compressed actionseq subquery... %s\n",
+			 dstring_data(action_subquery));
+}


More information about the Slony1-commit mailing list