Mon Mar 20 14:20:49 PST 2006
- Previous message: [Slony1-commit] By cbbrowne: Revisions to DDL Script processing.
- Next message: [Slony1-commit] By cbbrowne: Add in -x option - command_on_logarchive Upon completing a
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Revisions to listen path generation based on code from Florian Pflug.
General idea: Use the notion of reachability to determine whether paths (in
sl_path) should be used as possible listen paths (i.e., as providers
for sl_listen).
This should eliminate the problem in 1.1 where some listen paths were
actually infeasible, e.g. - some nodes listened on paths that could not
possibly provide messages from the desired origin.
This also includes a script (test_listen_path_gen.sql) to help test
the results. It seeds sample sl_path/sl_subscribe configuration for
a number of replication layouts, and verifies that they lead to feasible
sets of listen paths.
Modified Files:
--------------
slony1-engine/src/backend:
slony1_funcs.sql (r1.80 -> r1.81)
Added Files:
-----------
slony1-engine/src/backend:
test_listen_path_gen.sql (r1.1)
-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.80
retrieving revision 1.81
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.80 -r1.81
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -4994,25 +4994,46 @@
returns int
as '
declare
- v_row record;
+ v_receiver record ;
+ v_provider record ;
+ v_origin record ;
+ v_reachable int4[] ;
begin
-- First remove the entire configuration
delete from @NAMESPACE at .sl_listen;
- -- The loop over every possible pair of origin, receiver
- for v_row in select N1.no_id as origin, N2.no_id as receiver
- from @NAMESPACE at .sl_node N1, @NAMESPACE at .sl_node N2
- where N1.no_id <> N2.no_id
- loop
- perform @NAMESPACE at .RebuildListenEntriesOne(v_row.origin, v_row.receiver);
+ -- Loop over every possible pair of receiver and provider
+ for v_receiver in select no_id from @NAMESPACE at .sl_node loop
+ for v_provider in select pa_server as no_id from @NAMESPACE at .sl_path where pa_client = v_receiver.no_id loop
+
+ -- Find all nodes that v_provider.no_id can receive events from without using v_receiver.no_id
+ for v_origin in select * from @NAMESPACE at .ReachableFromNode(v_provider.no_id, array[v_receiver.no_id]) as r(no_id) loop
+
+ -- If v_receiver.no_id subscribes a set from v_provider.no_id, events have to travel the same
+ -- path as the data. Ignore possible sl_listen that would break that rule.
+ perform 1 from @NAMESPACE at .sl_subscribe
+ join @NAMESPACE at .sl_set on sl_set.set_id = sl_subscribe.sub_set
+ where
+ sub_receiver = v_receiver.no_id and
+ sub_provider != v_provider.no_id and
+ set_origin = v_origin.no_id ;
+ if not found then
+ insert into @NAMESPACE at .sl_listen (li_receiver, li_provider, li_origin)
+ values (v_receiver.no_id, v_provider.no_id, v_origin.no_id) ;
+ end if ;
+
+
end loop;
- return 0;
+ end loop ;
+ end loop ;
+
+ return null ;
end;
-' language plpgsql;
+' language 'plpgsql';
comment on function @NAMESPACE at .RebuildListenEntries() is
-'RebuildListenEntries(p_provider, p_receiver)
+'RebuildListenEntries()
Invoked by various subscription and path modifying functions, this
rewrites the sl_listen entries, adding in all the ones required to
@@ -5020,93 +5041,44 @@
-- ----------------------------------------------------------------------
--- FUNCTION RebuildListenEntriesOne (origin, receiver)
+-- FUNCTION ReachableFromNode (receiver, blacklist)
--
-- ----------------------------------------------------------------------
-create or replace function @NAMESPACE at .RebuildListenEntriesOne(int4, int4)
-returns int4
-as '
+create or replace function @NAMESPACE at .ReachableFromNode(int4, int4[]) returns setof int4 as '
declare
- p_origin alias for $1;
- p_receiver alias for $2;
- v_row record;
+ v_node alias for $1 ;
+ v_blacklist alias for $2 ;
+ v_ignore int4[] ;
+ v_reachable_edge_last int4[] ;
+ v_reachable_edge_new int4[] default \'{}\' ;
+ v_server record ;
begin
- -- 1. If the receiver is subscribed to any set from the origin,
- -- listen on the same provider(s).
- for v_row in select distinct sub_provider
- from @NAMESPACE at .sl_subscribe, @NAMESPACE at .sl_set,
- @NAMESPACE at .sl_path
- where sub_set = set_id
- and set_origin = p_origin
- and sub_receiver = p_receiver
- and sub_provider = pa_server
- and sub_receiver = pa_client
- loop
- perform @NAMESPACE at .storeListen_int(p_origin,
- v_row.sub_provider, p_receiver);
- end loop;
- if found then
- return 1;
- end if;
-
- -- 2. If the receiver has a direct path to the provider,
- -- use that.
- if exists (select true
+ v_reachable_edge_last := array[v_node] ;
+ v_ignore := v_blacklist || array[v_node] ;
+ return next v_node ;
+ while v_reachable_edge_last != \'{}\' loop
+ v_reachable_edge_new := \'{}\' ;
+ for v_server in select pa_server as no_id
from @NAMESPACE at .sl_path
- where pa_server = p_origin
- and pa_client = p_receiver)
- then
- perform @NAMESPACE at .storeListen_int(p_origin, p_origin, p_receiver);
- return 1;
- end if;
-
- -- 3. Listen on every node that is either provider for the
- -- receiver or is using the receiver as provider (follow the
- -- normal subscription routes).
- for v_row in select distinct provider from (
- select sub_provider as provider
- from @NAMESPACE at .sl_subscribe
- where sub_receiver = p_receiver
- union
- select sub_receiver as provider
- from @NAMESPACE at .sl_subscribe
- where sub_provider = p_receiver
- and exists (select true from @NAMESPACE at .sl_path
- where pa_server = sub_receiver
- and pa_client = sub_provider)
- ) as S
+ where pa_client = ANY(v_reachable_edge_last) and pa_server != ALL(v_ignore)
loop
- perform @NAMESPACE at .storeListen_int(p_origin,
- v_row.provider, p_receiver);
- end loop;
- if found then
- return 1;
+ if v_server.no_id != ALL(v_ignore) then
+ v_ignore := v_ignore || array[v_server.no_id] ;
+ v_reachable_edge_new := v_reachable_edge_new || array[v_server.no_id] ;
+ return next v_server.no_id ;
end if;
-
- -- 4. If all else fails - meaning there are no subscriptions to
- -- guide us to the right path - use every node we have a path
- -- to as provider. This normally only happens when the cluster
- -- is built or a new node added. This brute force fallback
- -- ensures that events will propagate if possible at all.
- for v_row in select pa_server as provider
- from @NAMESPACE at .sl_path
- where pa_client = p_receiver
- loop
- perform @NAMESPACE at .storeListen_int(p_origin,
- v_row.provider, p_receiver);
end loop;
- if found then
- return 1;
- end if;
-
- return 0;
+ v_reachable_edge_last := v_reachable_edge_new ;
+ end loop ;
+ return ;
end;
-' language plpgsql;
+' language 'plpgsql';
-comment on function @NAMESPACE at .RebuildListenEntriesOne(int4, int4) is
-'RebuildListenEntriesOne(p_origin, p_receiver)
+comment on function @NAMESPACE at .ReachableFromNode(int4, int4[]) is
+'ReachableFromNode(receiver, blacklist)
-Rebuilding of sl_listen entries for one origin, receiver pair.';
+Find all nodes that <receiver> can receive events from without
+using nodes in <blacklist> as a relay.';
-- ----------------------------------------------------------------------
--- /dev/null
+++ src/backend/test_listen_path_gen.sql
@@ -0,0 +1,261 @@
+--- $Id: test_listen_path_gen.sql,v 1.1 2006/03/20 22:20:48 cbbrowne Exp $
+
+-- This SQL script is used to test the new listen path generation code
+-- to make sure the resulting sl_listen allows all nodes to be
+-- reachable/audible
+
+-- This test basically messes with sl_path, sl_listen, sl_node,
+-- sl_subscribe by hand, assuming that the tables and normal functions exist
+-- in the schema "_slony_regress1"
+
+-- You can get this schema set up either by:
+
+-- 1. Running a slonik script that does "init cluster()" for a cluster
+-- called slony_regress1
+
+-- 2. Running just about any of the test bed scripts, which default to
+-- create the slony_regress1 cluster, and stop the script before
+-- it ends and purges out the databases
+
+-- Some helpers for the test below
+
+-- Scratch table holding the node ids 1..10.  Test3, Test4 and Test5 below
+-- copy it into sl_node and join it against itself to generate sl_path rows.
+create table nodes (id int4 not null primary key) ;
+insert into nodes (id)
+  select s.i from generate_series(1, 10) as s(i) ;
+
+
+-- listener_orphans: every ordered (origin, receiver) pair of distinct nodes
+-- for which sl_listen contains no row with that li_origin and li_receiver.
+create or replace view _slony_regress1.listener_orphans as
+  select o.no_id as origin, r.no_id as receiver
+    from _slony_regress1.sl_node o
+         inner join _slony_regress1.sl_node r on o.no_id <> r.no_id
+   where not exists (select 1 from _slony_regress1.sl_listen l
+                      where l.li_origin = o.no_id
+                        and l.li_receiver = r.no_id);
+
+-- are_all_nodes_audible ()
+--
+-- Calls can_node_hear() for every ordered pair of distinct nodes in sl_node,
+-- raises one notice per pair, and returns the number of pairs for which
+-- can_node_hear() was false (0 means every node can hear every other node).
+create or replace function "_slony_regress1".are_all_nodes_audible () returns int4 as '
+declare
+  v_failed   int4;
+  v_listener record;   -- passed as the receiver argument of can_node_hear
+  v_speaker  record;   -- passed as the origin argument of can_node_hear
+begin
+  v_failed := 0;
+  -- Walk the pairs in no_id order so that the notices come out in the same
+  -- order on every run and the test output can be compared between runs.
+  for v_listener in select no_id from "_slony_regress1".sl_node order by no_id loop
+    for v_speaker in select no_id from "_slony_regress1".sl_node
+                      where no_id <> v_listener.no_id order by no_id loop
+      if "_slony_regress1".can_node_hear (v_listener.no_id, v_speaker.no_id) then
+        raise notice ''Slony-I: Node % can hear %'', v_listener.no_id, v_speaker.no_id;
+      else
+        raise notice ''Slony-I: Node % cannot hear %'', v_listener.no_id, v_speaker.no_id;
+        v_failed := v_failed + 1;
+      end if;
+    end loop;
+  end loop;
+  return v_failed;
+end;' language plpgsql;
+
+
+-- can_node_hear (receiver, origin)
+--
+-- Returns true if <origin> shows up as a provider somewhere along the chain
+-- of sl_listen entries that starts at <receiver> (receiver -> its providers
+-- -> their providers -> ...), and false once every node on that chain has
+-- been examined without finding it.
+--
+-- NOTE: li_origin is not consulted here; every sl_listen row of a node is
+-- followed regardless of which origin it was created for.
+create or replace function "_slony_regress1".can_node_hear (int4, int4) returns boolean as '
+declare
+  v_receiver alias for $1;
+  v_origin   alias for $2;
+  v_rec      record;
+  v_clist    int4[];    -- nodes discovered so far, in discovery order
+  v_csize    int4;      -- number of entries in v_clist
+  i          int4;      -- position of the next v_clist entry to expand
+  add_item   boolean;
+begin
+  v_clist := ARRAY[v_receiver];
+  v_csize := 1;
+  i := 1;
+  -- Breadth-first walk.  Each discovered node is expanded exactly once, and
+  -- the loop ends when no unexpanded node is left, so the function always
+  -- terminates even when the origin cannot be heard.
+  while i <= v_csize loop
+    for v_rec in select distinct li_provider from "_slony_regress1".sl_listen
+                  where li_receiver = v_clist[i] loop
+      if v_rec.li_provider = v_origin then
+        return ''t'';
+      end if;
+      -- Only queue providers that are not already in the list; compare
+      -- against every known entry (index j), not just the current one.
+      add_item := ''t'';
+      for j in 1..v_csize loop
+        if v_clist[j] = v_rec.li_provider then
+          add_item := ''f'';
+          exit;  -- No need to keep searching
+        end if;
+      end loop;
+      if add_item then
+        v_csize := v_csize + 1;
+        v_clist[v_csize] := v_rec.li_provider;
+      end if;
+    end loop;
+    i := i + 1;
+  end loop;
+  return ''f'';
+end;' language plpgsql;
+
+--Test1
+-- 21 <-> 20 <-> 1 <-> 10 <-> 11
+
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node(no_id) values (1);
+insert into _slony_regress1.sl_node(no_id) values (10);
+insert into _slony_regress1.sl_node(no_id) values (11);
+insert into _slony_regress1.sl_node(no_id) values (20);
+insert into _slony_regress1.sl_node(no_id) values (21);
+
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,20,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,11,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (11,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,21,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (21,20,'');
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+select 'Try again, specifying subscriptions, as well.';
+-- Set 1 originates on node 1 and is cascaded down both arms of the chain:
+--   1 -> 10 -> 11   and   1 -> 20 -> 21
+-- Every (provider, receiver) pair below has a matching sl_path entry.
+insert into _slony_regress1.sl_set(set_id, set_origin) values (1, 1);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 1, 10);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 1, 20);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 10, 11);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 20, 21);
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+
+--Test2
+-- 2 <-- 1 --> 4
+-- | ^ |
+-- | / \ |
+-- v / \ v
+-- 3 5
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node(no_id) values (1);
+insert into _slony_regress1.sl_node(no_id) values (2);
+insert into _slony_regress1.sl_node(no_id) values (3);
+insert into _slony_regress1.sl_node(no_id) values (4);
+insert into _slony_regress1.sl_node(no_id) values (5);
+
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,3,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,5,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (2,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (3,2,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (4,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (5,4,'');
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+--Test3
+--Fully meshed setup with 10 nodes
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node (no_id) select * from nodes;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+select n1.id, n2.id, 'test dsn'
+from nodes n1, nodes n2
+where n1.id != n2.id ;
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+--Test4
+--A transitive graph with 10 nodes
+--This should warn about unreachable nodes
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node (no_id) select * from nodes;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+select n1.id, n2.id, 'test dsn'
+from nodes n1, nodes n2
+where n1.id < n2.id ;
+
+select _slony_regress1.rebuildlistenentries();
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+--Test5
+--A (nearly) transitive graph with 10 nodes, but with the missing
+--connection (1 -> 10) added.
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node (no_id) select * from nodes;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+select n1.id, n2.id, 'test dsn'
+from nodes n1, nodes n2
+where n1.id < n2.id ;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+values (10, 1, 'complete the graph...');
+
+select _slony_regress1.rebuildlistenentries();
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+
+--Test6
+-- 21 <-> 20 <-> 1 <-> 10 <-> 11
+
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node(no_id) values (1);
+insert into _slony_regress1.sl_node(no_id) values (10);
+insert into _slony_regress1.sl_node(no_id) values (11);
+insert into _slony_regress1.sl_node(no_id) values (20);
+insert into _slony_regress1.sl_node(no_id) values (21);
+
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,20,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,11,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (11,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,21,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (21,20,'');
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+select 'Try again, specifying subscriptions, as well.';
+-- Two sets, one per arm of the chain:
+--   set 1 (origin 10) is subscribed by node 11 directly from node 10
+--   set 2 (origin 20) is subscribed by node 21 directly from node 20
+insert into _slony_regress1.sl_set(set_id, set_origin) values (1, 10);
+insert into _slony_regress1.sl_set(set_id, set_origin) values (2, 20);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 10, 11);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (2, 20, 21);
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
- Previous message: [Slony1-commit] By cbbrowne: Revisions to DDL Script processing.
- Next message: [Slony1-commit] By cbbrowne: Add in -x option - command_on_logarchive Upon completing a
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list