Sat Nov 13 04:53:32 PST 2004
- Previous message: [Slony1-commit] By cbbrowne: Make "ducttape" scripts that use pgbench aware of varying
- Next message: [Slony1-general] Re: [Slony1-commit] By cbbrowne: Add FUNCTION generate_sync_event () - to try to add SYNCs
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Here is a first full functional and activated rebuiltListenEntries().
Any change to sl_node, sl_path and sl_subscribe causes slon to reload
the listen configuration. The logic is slightly different from what
was first discussed and probably not in its final state. But it passes
various configurations without any need to restart slon.
Jan
Modified Files:
--------------
slony1-engine/src/backend:
slony1_funcs.sql (r1.41 -> r1.42)
slony1-engine/src/ducttape:
test_1_pgbench (r1.16 -> r1.17)
test_2_pgbench (r1.9 -> r1.10)
slony1-engine/src/slon:
local_listen.c (r1.26 -> r1.27)
remote_worker.c (r1.65 -> r1.66)
runtime_config.c (r1.21 -> r1.22)
slon.c (r1.36 -> r1.37)
slon.h (r1.40 -> r1.41)
-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.41
retrieving revision 1.42
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.41 -r1.42
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -814,6 +814,7 @@
-- Note that the following code should all become obsolete in the wake
-- of the availability of RebuildListenEntries()...
+if false then
-- ----
-- Let every node that listens for something on the failed node
-- listen for that on the backup node instead.
@@ -846,6 +847,7 @@
delete from @NAMESPACE at .sl_listen
where li_provider = p_failed_node
or li_receiver = p_failed_node;
+end if;
-- ----
-- Move the sets
@@ -1360,6 +1362,8 @@
p_provider alias for $2;
p_receiver alias for $3;
begin
+ return -1;
+
perform @NAMESPACE at .storeListen_int (p_origin, p_provider, p_receiver);
return @NAMESPACE at .createEvent (''_ at CLUSTERNAME@'', ''STORE_LISTEN'',
p_origin, p_provider, p_receiver);
@@ -1447,6 +1451,8 @@
p_li_provider alias for $2;
p_li_receiver alias for $3;
begin
+ return -1;
+
perform @NAMESPACE at .dropListen_int(p_li_origin,
p_li_provider, p_li_receiver);
@@ -1473,6 +1479,8 @@
p_li_provider alias for $2;
p_li_receiver alias for $3;
begin
+ return -1;
+
-- ----
-- Grab the central configuration lock
-- ----
@@ -3940,6 +3948,21 @@
-- The real work is done in the replication engine. All
-- we have to do here is remembering that it happened.
-- ----
+
+ -- ----
+ -- Well, not only ... we might be missing an important event here
+ -- ----
+ if not exists (select true from @NAMESPACE at .sl_path
+ where pa_server = p_sub_provider
+ and pa_client = p_sub_receiver)
+ then
+ insert into @NAMESPACE at .sl_path
+ (pa_server, pa_client, pa_conninfo, pa_connretry)
+ values
+ (p_sub_provider, p_sub_receiver,
+ ''<event pending>'', 10);
+ end if;
+
update @NAMESPACE at .sl_subscribe
set sub_active = ''t''
where sub_set = p_sub_set
@@ -4498,7 +4521,7 @@
-- ----------------------------------------------------------------------
--- FUNCTION RebuildListenEntries (provider, receiver)
+-- FUNCTION RebuildListenEntries ()
--
-- Revises sl_listen rules based on contents of sl_path and
-- sl_subscribe
@@ -4508,61 +4531,18 @@
as '
declare
v_row record;
- v_origin int4;
- v_receiver int4;
- v_done boolean;
-
begin
- return 0;
- -- 0. Drop out listens
+ -- First remove the entire configuration
delete from @NAMESPACE at .sl_listen;
- -- 1. Add listens pointed out by subscriptions - sl_listen
- -- @NAMESPACE at .storelisten(origin, provider, receiver)
- for v_row in select sub_provider, sub_receiver
- from @NAMESPACE at .sl_subscribe s
- where exists (select true from @NAMESPACE at .sl_path p where
- p.pa_server = s.sub_provider and
- p.pa_client = s.sub_receiver)
- loop
- -- raise notice ''Listen based on subscription: server % client %'', v_row.sub_provider, v_row.sub_receiver;
- perform @NAMESPACE at .storelisten(v_row.sub_provider, v_row.sub_provider, v_row.sub_receiver);
- end loop;
-
- -- 2. Add direct listens pointed out in sl_path
- for v_row in select pa_server, pa_client
- from @NAMESPACE at .sl_path path
- where not exists (select true from @NAMESPACE at .sl_listen listen
- where path.pa_server = listen.li_origin and
- path.pa_client = listen.li_receiver)
- loop
- -- raise notice ''Listen based on path: server % client %'', v_row.pa_server, v_row.pa_client;
- perform @NAMESPACE at .storelisten(v_row.pa_server, v_row.pa_server, v_row.pa_client);
- end loop;
-
- -- 3. Iterate until we cannot iterate any more...
- -- Add in indirect listens based on what is in sl_listen and sl_path
- v_done := ''f'';
- while not v_done loop
- v_done := ''t'';
- for v_row in select li_origin, pa_server, pa_client
- from @NAMESPACE at .sl_path path, @NAMESPACE at .sl_listen listen
- where
- li_receiver = pa_server
- and li_origin <> pa_client
- and not exists (select true from @NAMESPACE at .sl_listen listen2
- where listen2.li_origin = listen.li_origin and
- listen2.li_receiver = path.pa_client)
- and exists (select true from @NAMESPACE at .sl_path p
- where p.pa_server = path.pa_server and
- p.pa_client = path.pa_client )
-
+ -- The loop over every possible pair of origin, receiver
+ for v_row in select N1.no_id as origin, N2.no_id as receiver
+ from @NAMESPACE at .sl_node N1, @NAMESPACE at .sl_node N2
+ where N1.no_id <> N2.no_id
loop
- -- raise notice ''Listen based on listen/path - ORG: % PROV:% REC:%'', v_row.li_origin,v_row.pa_server,v_row.pa_client;
- perform @NAMESPACE at .storelisten(v_row.li_origin,v_row.pa_server,v_row.pa_client);
- v_done := ''f'';
- end loop;
+ perform @NAMESPACE at .RebuildListenEntriesOne(v_row.origin, v_row.receiver);
end loop;
+
return 0;
end;
' language plpgsql;
@@ -4574,6 +4554,97 @@
rewrites the sl_listen entries, adding in all the ones required to
allow communications between nodes in the Slony-I cluster.';
+
+-- ----------------------------------------------------------------------
+-- FUNCTION RebuildListenEntriesOne (origin, receiver)
+--
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE at .RebuildListenEntriesOne(int4, int4)
+returns int4
+as '
+declare
+ p_origin alias for $1;
+ p_receiver alias for $2;
+ v_row record;
+begin
+ -- 1. If the receiver is subscribed to any set from the origin,
+ -- listen on the same provider(s).
+ for v_row in select distinct sub_provider
+ from @NAMESPACE at .sl_subscribe, @NAMESPACE at .sl_set,
+ @NAMESPACE at .sl_path
+ where sub_set = set_id
+ and set_origin = p_origin
+ and sub_receiver = p_receiver
+ and sub_provider = pa_server
+ and sub_receiver = pa_client
+ loop
+ perform @NAMESPACE at .storeListen_int(p_origin,
+ v_row.sub_provider, p_receiver);
+ end loop;
+ if found then
+ return 1;
+ end if;
+
+ -- 2. If the receiver has a direct path to the provider,
+ -- use that.
+ if exists (select true
+ from @NAMESPACE at .sl_path
+ where pa_server = p_origin
+ and pa_client = p_receiver)
+ then
+ perform @NAMESPACE at .storeListen_int(p_origin, p_origin, p_receiver);
+ return 1;
+ end if;
+
+ -- 3. Listen on every node that is either provider for the
+ -- receiver or is using the receiver as provider (follow the
+ -- normal subscription routes).
+ for v_row in select distinct provider from (
+ select sub_provider as provider
+ from @NAMESPACE at .sl_subscribe
+ where sub_receiver = p_receiver
+ union
+ select sub_receiver as provider
+ from @NAMESPACE at .sl_subscribe
+ where sub_provider = p_receiver
+ and exists (select true from @NAMESPACE at .sl_path
+ where pa_server = sub_receiver
+ and pa_client = sub_provider)
+ ) as S
+ loop
+ perform @NAMESPACE at .storeListen_int(p_origin,
+ v_row.provider, p_receiver);
+ end loop;
+ if found then
+ return 1;
+ end if;
+
+ -- 4. If all else fails - meaning there are no subscriptions to
+ -- guide us to the right path - use every node we have a path
+ -- to as provider. This normally only happens when the cluster
+ -- is built or a new node added. This brute force fallback
+ -- ensures that events will propagate if possible at all.
+ for v_row in select pa_server as provider
+ from @NAMESPACE at .sl_path
+ where pa_client = p_receiver
+ loop
+ perform @NAMESPACE at .storeListen_int(p_origin,
+ v_row.provider, p_receiver);
+ end loop;
+ if found then
+ return 1;
+ end if;
+
+ return 0;
+end;
+' language plpgsql;
+
+comment on function @NAMESPACE at .RebuildListenEntriesOne(int4, int4) is
+'RebuildListenEntriesOne(p_origin, p_receiver)
+
+Rebuilding of sl_listen entries for one origin, receiver pair.';
+
+
-- ----------------------------------------------------------------------
-- FUNCTION generate_sync_event ()
--
Index: test_2_pgbench
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_2_pgbench,v
retrieving revision 1.9
retrieving revision 1.10
diff -Lsrc/ducttape/test_2_pgbench -Lsrc/ducttape/test_2_pgbench -u -w -r1.9 -r1.10
--- src/ducttape/test_2_pgbench
+++ src/ducttape/test_2_pgbench
@@ -219,8 +219,6 @@
store node (id = 2, comment = 'Node 2');
store path (server = 1, client = 2, conninfo = 'dbname=$DB1');
store path (server = 2, client = 1, conninfo = 'dbname=$DB2');
- store listen (origin = 1, receiver = 2);
- store listen (origin = 2, receiver = 1);
}
on error { exit -1; }
echo 'Database $DB2 added as Node 2';
@@ -298,11 +296,6 @@
store path (server = 3, client = 1, conninfo = 'dbname=$DB3');
store path (server = 3, client = 2, conninfo = 'dbname=$DB3');
- store listen (origin = 1, receiver = 3, provider = 2);
- store listen (origin = 2, receiver = 3);
- store listen (origin = 3, receiver = 2);
- store listen (origin = 3, receiver = 1, provider = 2);
-
echo 'Database $DB2 added as Node 2';
_EOF_
Index: test_1_pgbench
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_pgbench,v
retrieving revision 1.16
retrieving revision 1.17
diff -Lsrc/ducttape/test_1_pgbench -Lsrc/ducttape/test_1_pgbench -u -w -r1.16 -r1.17
--- src/ducttape/test_1_pgbench
+++ src/ducttape/test_1_pgbench
@@ -203,8 +203,6 @@
store node (id = 2, comment = 'Node 2');
store path (server = 1, client = 2, conninfo = 'dbname=$DB1');
store path (server = 2, client = 1, conninfo = 'dbname=$DB2');
- store listen (origin = 1, receiver = 2);
- store listen (origin = 2, receiver = 1);
}
on error { exit -1; }
echo 'Database $DB2 added as Node 2';
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.65
retrieving revision 1.66
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.65 -r1.66
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -257,6 +257,7 @@
int64 curr_config = -1;
char seqbuf[64];
int event_ok;
+ int need_reloadListen = false;
slon_log(SLON_DEBUG1,
"remoteWorkerThread_%d: thread starts\n",
@@ -526,6 +527,8 @@
"select %s.storeNode_int(%d, '%q'); ",
rtcfg_namespace,
no_id, no_comment);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
{
int no_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -537,6 +540,8 @@
"select %s.enableNode_int(%d); ",
rtcfg_namespace,
no_id);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "DROP_NODE") == 0)
{
int no_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -584,6 +589,8 @@
slon_appendquery(&query1,
"notify \"_%s_Restart\"; ",
rtcfg_cluster_name);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "STORE_PATH") == 0)
{
int pa_server = (int)strtol(event->ev_data1, NULL, 10);
@@ -598,6 +605,8 @@
"select %s.storePath_int(%d, %d, '%q', %d); ",
rtcfg_namespace,
pa_server, pa_client, pa_conninfo, pa_connretry);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "DROP_PATH") == 0)
{
int pa_server = (int)strtol(event->ev_data1, NULL, 10);
@@ -610,6 +619,8 @@
"select %s.dropPath_int(%d, %d); ",
rtcfg_namespace,
pa_server, pa_client);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
{
int li_origin = (int)strtol(event->ev_data1, NULL, 10);
@@ -781,6 +792,8 @@
rtcfg_moveSet(set_id, old_origin, new_origin, sub_provider);
dstring_reset(&query1);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "FAILOVER_SET") == 0)
{
int failed_node = (int)strtol(event->ev_data1, NULL, 10);
@@ -793,6 +806,8 @@
"select %s.failoverSet_int(%d, %d, %d); ",
rtcfg_namespace,
failed_node, backup_node, set_id);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
{
int sub_set = (int)strtol(event->ev_data1, NULL, 10);
@@ -807,6 +822,8 @@
"select %s.subscribeSet_int(%d, %d, %d, '%q'); ",
rtcfg_namespace,
sub_set, sub_provider, sub_receiver, sub_forward);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
{
int sub_set = (int)strtol(event->ev_data1, NULL, 10);
@@ -918,6 +935,7 @@
sub_set, sub_provider, sub_receiver);
}
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
{
int sub_set = (int)strtol(event->ev_data1, NULL, 10);
@@ -932,6 +950,8 @@
"select %s.unsubscribeSet_int(%d, %d); ",
rtcfg_namespace,
sub_set, sub_receiver);
+
+ need_reloadListen = true;
} else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
{
int ddl_setid = (int)strtol(event->ev_data1, NULL, 10);
@@ -962,6 +982,12 @@
}
if (query_execute(node, local_dbconn, &query1) < 0)
slon_abort();
+
+ if (need_reloadListen)
+ {
+ rtcfg_reloadListen(local_dbconn);
+ need_reloadListen = false;
+ }
}
#ifdef SLON_MEMDEBUG
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.36
retrieving revision 1.37
diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.36 -r1.37
--- src/slon/slon.c
+++ src/slon/slon.c
@@ -344,29 +344,9 @@
PQclear(res);
/*
- * Read configuration table sl_listen - the interesting pieces
+ * Load the initial listen configuration
*/
- slon_mkquery(&query,
- "select li_origin, li_provider "
- "from %s.sl_listen where li_receiver = %d",
- rtcfg_namespace, rtcfg_nodeid);
- res = PQexec(startup_conn, dstring_data(&query));
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- slon_log(SLON_FATAL, "main: Cannot get listen config - %s",
- PQresultErrorMessage(res));
- PQclear(res);
- dstring_free(&query);
- slon_exit(-1);
- }
- for (i = 0, n = PQntuples(res); i < n; i++)
- {
- int li_origin = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
- int li_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
-
- rtcfg_storeListen(li_origin, li_provider);
- }
- PQclear(res);
+ rtcfg_reloadListen(startup_conn);
/*
* Read configuration table sl_set
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.40
retrieving revision 1.41
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.40 -r1.41
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -363,6 +363,7 @@
int pa_connretry);
extern void rtcfg_dropPath(int pa_server);
+extern void rtcfg_reloadListen(PGconn *db);
extern void rtcfg_storeListen(int li_origin, int li_provider);
extern void rtcfg_dropListen(int li_origin, int li_provider);
Index: runtime_config.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v
retrieving revision 1.21
retrieving revision 1.22
diff -Lsrc/slon/runtime_config.c -Lsrc/slon/runtime_config.c -u -w -r1.21 -r1.22
--- src/slon/runtime_config.c
+++ src/slon/runtime_config.c
@@ -397,6 +397,69 @@
* ---------- rtcfg_storeListen ----------
*/
void
+rtcfg_reloadListen(PGconn *db)
+{
+ SlonDString query;
+ PGresult *res;
+ int i, n;
+ SlonNode *node;
+ SlonListen *listen;
+
+ /*
+ * Clear out the entire Listen configuration
+ */
+ rtcfg_lock();
+ for (node = rtcfg_node_list_head; node; node = node->next)
+ {
+ while((listen = node->listen_head) != NULL)
+ {
+ DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
+ free(listen);
+ }
+ }
+ rtcfg_unlock();
+ rtcfg_seq_bump();
+
+ /*
+ * Read configuration table sl_listen - the interesting pieces
+ */
+ dstring_init(&query);
+
+ slon_mkquery(&query,
+ "select li_origin, li_provider "
+ "from %s.sl_listen where li_receiver = %d",
+ rtcfg_namespace, rtcfg_nodeid);
+ res = PQexec(db, dstring_data(&query));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_FATAL, "Cannot get listen config - %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
+ dstring_free(&query);
+ slon_exit(-1);
+ }
+ for (i = 0, n = PQntuples(res); i < n; i++)
+ {
+ int li_origin = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
+ int li_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
+
+ rtcfg_storeListen(li_origin, li_provider);
+ }
+ PQclear(res);
+
+ dstring_free(&query);
+
+ for (node = rtcfg_node_list_head; node; node = node->next)
+ {
+ rtcfg_startStopNodeThread(node);
+ }
+}
+
+
+/*
+ * ---------- rtcfg_storeListen ----------
+ */
+void
rtcfg_storeListen(int li_origin, int li_provider)
{
SlonNode *node;
@@ -412,6 +475,7 @@
slon_abort();
return;
}
+
/*
* Check if we already listen for events from that origin at this
* provider.
@@ -476,6 +540,7 @@
slon_abort();
return;
}
+
/*
* Find that listen entry at this provider.
*/
@@ -501,6 +566,7 @@
return;
}
}
+
rtcfg_unlock();
/*
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.26
retrieving revision 1.27
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.26 -r1.27
--- src/slon/local_listen.c
+++ src/slon/local_listen.c
@@ -227,6 +227,8 @@
if (no_id != rtcfg_nodeid)
rtcfg_storeNode(no_id, no_comment);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "ENABLE_NODE") == 0)
{
@@ -239,6 +241,8 @@
if (no_id != rtcfg_nodeid)
rtcfg_enableNode(no_id);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "DROP_NODE") == 0)
{
@@ -272,6 +276,8 @@
slon_abort();
}
PQclear(notify_res);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "STORE_PATH") == 0)
{
@@ -290,6 +296,8 @@
if (pa_client == rtcfg_nodeid)
rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "DROP_PATH") == 0)
{
@@ -304,6 +312,8 @@
if (pa_client == rtcfg_nodeid)
rtcfg_dropPath(pa_server);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "STORE_LISTEN") == 0)
{
@@ -523,6 +533,8 @@
dstring_free(&query2);
rtcfg_moveSet(set_id, old_origin, new_origin, sub_provider);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "FAILOVER_SET") == 0)
{
@@ -552,6 +564,8 @@
if (sub_receiver == rtcfg_nodeid)
rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "ENABLE_SUBSCRIPTION") == 0)
{
@@ -570,6 +584,8 @@
if (sub_receiver == rtcfg_nodeid)
rtcfg_enableSubscription(sub_set, sub_provider, sub_forward);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "UNSUBSCRIBE_SET") == 0)
{
@@ -584,6 +600,8 @@
if (sub_receiver == rtcfg_nodeid)
rtcfg_unsubscribeSet(sub_set);
+
+ rtcfg_reloadListen(dbconn);
}
else if (strcmp(ev_type, "DDL_SCRIPT") == 0)
{
- Previous message: [Slony1-commit] By cbbrowne: Make "ducttape" scripts that use pgbench aware of varying
- Next message: [Slony1-general] Re: [Slony1-commit] By cbbrowne: Add FUNCTION generate_sync_event () - to try to add SYNCs
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list