Wed Dec 7 03:52:44 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Remove pg_listener activity for event confirmations - this
- Next message: [Slony1-commit] By dpage: Comment out redundant query parameter and fix query so it
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Fix for the excessive memory allocation problem when replicating
data with large attributes.
New config options
sync_max_rowsize (default 8k)
sync_max_largemem (default 5M)
Slon will try to keep the memory allocation "per provider" within
500 x sync_max_rowsize ... 500 x sync_max_rowsize + sync_max_largemem.
With the default settings, this means 5-10 MB.
Jan
Modified Files:
--------------
slony1-engine/share:
slon.conf-sample (r1.3 -> r1.4)
slony1-engine/src/slon:
confoptions.h (r1.26 -> r1.27)
remote_worker.c (r1.102 -> r1.103)
slon.h (r1.56 -> r1.57)
-------------- next part --------------
Index: slon.conf-sample
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/share/slon.conf-sample,v
retrieving revision 1.3
retrieving revision 1.4
diff -Lshare/slon.conf-sample -Lshare/slon.conf-sample -u -w -r1.3 -r1.4
--- share/slon.conf-sample
+++ share/slon.conf-sample
@@ -31,6 +31,20 @@
# Range: [0,100], default: 6
#sync_group_maxsize=6
+# Size above which an sl_log_? row's log_cmddata is considered large.
+# Up to 500 rows of this size are allowed in memory at once. Rows larger
+# than that count against the sync_max_largemem space, allocated and freed
+# on demand.
+# Range: [1024,32768], default: 8192
+#sync_max_rowsize=8192
+
+# Maximum amount of memory allowed for large rows. Note that the algorithm
+# will stop fetching rows AFTER this amount is exceeded, not BEFORE. This
+# is done to ensure that a single row exceeding this limit alone does not
+# stall replication.
+# Range: [1048576,1073741824], default: 5242880
+#sync_max_largemem=5242880
+
# If this parameter is 1, messages go both to syslog and the standard
# output. A value of 2 sends output only to syslog (some messages will
# still go to the standard output/error). The default is 0, which means
Index: confoptions.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.26
retrieving revision 1.27
diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.26 -r1.27
--- src/slon/confoptions.h
+++ src/slon/confoptions.h
@@ -21,6 +21,8 @@
extern int slon_log_level;
extern int sync_interval;
extern int sync_interval_timeout;
+extern int sync_max_rowsize;
+extern int sync_max_largemem;
extern int sync_group_maxsize;
extern int desired_sync_time;
@@ -218,6 +220,30 @@
0,
2147483647
},
+ {
+ {
+ (const char *)"sync_max_rowsize", /* conf name */
+ gettext_noop("sl_log_? rows larger than that are read separately"), /* short desc */
+ gettext_noop("sl_log_? rows larger than that are read separately"), /* long desc */
+ SLON_C_INT /* config type */
+ },
+ &sync_max_rowsize, /* var name */
+ 8192, /* default val */
+ 1024, /* min val */
+ 32768 /* max val */
+ },
+ {
+ {
+ (const char *)"sync_max_largemem", /* conf name */
+ gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"), /* short desc */
+ gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"), /* long desc */
+ SLON_C_INT /* config type */
+ },
+ &sync_max_largemem, /* var name */
+ 5242880, /* default val */
+ 1048576, /* min val */
+ 1073741824 /* max val */
+ },
{0}
};
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.102
retrieving revision 1.103
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.102 -r1.103
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -171,6 +171,7 @@
pthread_mutex_t workdata_lock;
WorkGroupStatus workgroup_status;
+ int workdata_largemem;
pthread_cond_t repldata_cond;
WorkerGroupLine *repldata_head;
@@ -188,6 +189,7 @@
ProviderInfo *provider;
SlonDString data;
SlonDString log;
+ int line_largemem;
WorkerGroupLine *prev;
WorkerGroupLine *next;
@@ -212,6 +214,8 @@
pthread_mutex_t node_confirm_lock = PTHREAD_MUTEX_INITIALIZER;
int sync_group_maxsize;
+int sync_max_rowsize;
+int sync_max_largemem;
int last_sync_group_size;
int next_sync_group_size;
@@ -304,6 +308,7 @@
pthread_mutex_lock(&(wd->workdata_lock));
wd->workgroup_status = SLON_WG_IDLE;
wd->node = node;
+ wd->workdata_largemem = 0;
wd->tab_fqname_size = SLON_MAX_PATH;
wd->tab_fqname = (char **)malloc(sizeof(char *) * wd->tab_fqname_size);
@@ -1700,6 +1705,7 @@
line = (WorkerGroupLine *) malloc(sizeof(WorkerGroupLine));
memset(line, 0, sizeof(WorkerGroupLine));
+ line->line_largemem = 0;
dstring_init(&(line->data));
dstring_init(&(line->log));
DLLIST_ADD_TAIL(wd->linepool_head, wd->linepool_tail,
@@ -4533,8 +4539,26 @@
for (wgline = lines_head; wgline; wgline = wgnext)
{
wgnext = wgline->next;
+ if (wgline->line_largemem > 0)
+ {
+ /*
+ * Really free the lines that contained large rows
+ */
+ dstring_free(&(wgline->data));
+ dstring_free(&(wgline->log));
+ dstring_init(&(wgline->data));
+ dstring_init(&(wgline->log));
+ wd->workdata_largemem -= wgline->line_largemem;
+ wgline->line_largemem = 0;
+ }
+ else
+ {
+ /*
+ * just reset (and allow to grow further) the small ones
+ */
dstring_reset(&(wgline->data));
dstring_reset(&(wgline->log));
+ }
DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, wgline);
}
if (num_errors == 1)
@@ -4783,6 +4807,7 @@
PGconn *dbconn;
WorkerGroupLine *line = NULL;
SlonDString query;
+ SlonDString query2;
int errors;
int alloc_lines = 0;
struct timeval tv_start;
@@ -4795,6 +4820,7 @@
int data_line_last;
PGresult *res;
+ PGresult *res2;
int ntuples;
int tupno;
@@ -4802,6 +4828,7 @@
int line_ncmds;
dstring_init(&query);
+ dstring_init(&query2);
for (;;)
{
@@ -4862,8 +4889,13 @@
slon_mkquery(&query,
"declare LOG cursor for select "
" log_origin, log_xid, log_tableid, "
- " log_actionseq, log_cmdtype, log_cmddata "
+ " log_actionseq, log_cmdtype, "
+ " octet_length(log_cmddata), "
+ " case when octet_length(log_cmddata) <= %d "
+ " then log_cmddata "
+ " else null end "
"from %s.sl_log_1 %s order by log_actionseq; ",
+ sync_max_rowsize,
rtcfg_namespace,
dstring_data(&(provider->helper_qualification)));
@@ -4926,19 +4958,20 @@
* have available line buffers.
*/
pthread_mutex_lock(&(wd->workdata_lock));
- if (data_line_alloc == 0 /* || oversize */)
+ if (data_line_alloc == 0 ||
+ wd->workdata_largemem > sync_max_largemem)
{
/*
 * First make sure that the overall memory usage is
 * inside bounds.
 */
- if (0 /* oversize */)
+ if (wd->workdata_largemem > sync_max_largemem)
{
slon_log(SLON_DEBUG4,
"remoteHelperThread_%d_%d: wait for oversize memory to free\n",
node->no_id, provider->no_id);
- while (/* oversize && */
+ while (wd->workdata_largemem > sync_max_largemem &&
wd->workgroup_status == SLON_WG_BUSY)
{
pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock));
@@ -5064,10 +5097,49 @@
NULL, 10);
char *log_actionseq = PQgetvalue(res, tupno, 3);
char *log_cmdtype = PQgetvalue(res, tupno, 4);
- char *log_cmddata = PQgetvalue(res, tupno, 5);
+ int log_cmdsize = strtol(PQgetvalue(res, tupno, 5),
+ NULL, 10);
+ char *log_cmddata = PQgetvalue(res, tupno, 6);
+ int largemem = 0;
tupno++;
+ if (log_cmdsize >= sync_max_rowsize)
+ {
+ slon_mkquery(&query2,
+ "select log_cmddata "
+ "from %s.sl_log_1 "
+ "where log_origin = '%s' "
+ " and log_xid = '%s' "
+ " and log_actionseq = '%s'",
+ rtcfg_namespace,
+ log_origin, log_xid, log_actionseq);
+ res2 = PQexec(dbconn, dstring_data(&query2));
+ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s",
+ node->no_id, provider->no_id,
+ dstring_data(&query),
+ PQresultErrorMessage(res2));
+ PQclear(res2);
+ errors++;
+ break;
+ }
+ if (PQntuples(res2) != 1)
+ {
+ slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: large log_cmddata for actionseq %s not found\n",
+ node->no_id, provider->no_id,
+ dstring_data(&query),
+ log_actionseq);
+ PQclear(res2);
+ errors++;
+ break;
+ }
+
+ log_cmddata = PQgetvalue(res2, 0, 0);
+ largemem = log_cmdsize;
+ }
+
/*
* This can happen if the table belongs to a set that
* already has a better sync status than the event we're
@@ -5093,11 +5165,13 @@
rtcfg_namespace,
log_origin, log_xid, log_tableid,
log_actionseq, log_cmdtype, log_cmddata);
+ largemem *= 2;
}
/*
* Add the actual replicating command to the line buffer
*/
+ line->line_largemem += largemem;
switch (*log_cmdtype)
{
case 'I':
@@ -5137,6 +5211,30 @@
line_ncmds = 0;
}
+
+ /*
+ * If this was a large log_cmddata entry
+ * (> sync_max_rowsize), add this to the memory
+ * usage of the workgroup and check if we are
+ * exceeding limits.
+ */
+ if (largemem > 0)
+ {
+ PQclear(res2);
+ pthread_mutex_lock(&(wd->workdata_lock));
+ wd->workdata_largemem += largemem;
+ if (wd->workdata_largemem >= sync_max_largemem)
+ {
+ /*
+ * This is it ... we exit the loop here
+ * and wait for the worker to apply enough
+ * of the large rows first.
+ */
+ pthread_mutex_unlock(&(wd->workdata_lock));
+ break;
+ }
+ pthread_mutex_unlock(&(wd->workdata_lock));
+ }
}
/*
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.56
retrieving revision 1.57
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.56 -r1.57
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -509,6 +509,8 @@
* ----------
*/
extern int sync_group_maxsize;
+extern int sync_max_rowsize;
+extern int sync_max_largemem;
/* ----------
- Previous message: [Slony1-commit] By cbbrowne: Remove pg_listener activity for event confirmations - this
- Next message: [Slony1-commit] By dpage: Comment out redundant query parameter and fix query so it
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list