CVS User Account cvsuser
Mon Mar 20 14:20:49 PST 2006
Log Message:
-----------
Revisions to listen path generation based on code from Florian Pflug.

General idea: use reachability to decide whether paths (in sl_path)
should be used as possible listen paths (i.e., as providers in
sl_listen).

This should eliminate the problem in 1.1 where some listen paths were
actually infeasible, e.g., some nodes listened on paths that could not
possibly provide messages from the desired origin.

This also includes a script (test_listen_path_gen.sql) to help test
the results.  It seeds sample sl_path/sl_subscribe configurations for
a number of replication layouts and checks whether they lead to
feasible sets of listen paths.

Modified Files:
--------------
    slony1-engine/src/backend:
        slony1_funcs.sql (r1.80 -> r1.81)

Added Files:
-----------
    slony1-engine/src/backend:
        test_listen_path_gen.sql (r1.1)

-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.80
retrieving revision 1.81
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.80 -r1.81
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -4994,25 +4994,46 @@
 returns int
 as '
 declare
-	v_row			record;
+	v_receiver record ;
+	v_provider record ;
+	v_origin record ;
 begin
 	-- First remove the entire configuration
 	delete from @NAMESPACE at .sl_listen;
 
-	-- The loop over every possible pair of origin, receiver
-	for v_row in select N1.no_id as origin, N2.no_id as receiver
-			from @NAMESPACE at .sl_node N1, @NAMESPACE at .sl_node N2
-			where N1.no_id <> N2.no_id
-	loop
-		perform @NAMESPACE at .RebuildListenEntriesOne(v_row.origin, v_row.receiver);
+	-- Loop over every (receiver, provider) pair that has an sl_path entry
+	for v_receiver in select no_id from @NAMESPACE at .sl_node loop
+		for v_provider in select pa_server as no_id from @NAMESPACE at .sl_path where pa_client = v_receiver.no_id loop
+
+			-- Find all nodes that v_provider.no_id can receive events from without using v_receiver.no_id
+			for v_origin in select r.no_id from @NAMESPACE at .ReachableFromNode(v_provider.no_id, array[v_receiver.no_id]) as r(no_id) loop
+
+				-- If v_receiver.no_id subscribes to a set originating at v_origin.no_id, events must travel
+				-- the same path as the data: listen for that origin only via the subscription provider.
+				perform 1 from @NAMESPACE at .sl_subscribe
+					join @NAMESPACE at .sl_set on sl_set.set_id = sl_subscribe.sub_set
+					where
+						sub_receiver = v_receiver.no_id and
+						sub_provider != v_provider.no_id and
+						set_origin = v_origin.no_id ;
+				if not found then
+					insert into @NAMESPACE at .sl_listen (li_receiver, li_provider, li_origin)
+						values (v_receiver.no_id, v_provider.no_id, v_origin.no_id) ;
+				end if ;
+
+
 	end loop;
 
+		end loop ;
+	end loop ;
+
+	-- Always 0, the same value the previous implementation returned
 	return 0;
 end;
 ' language plpgsql;
 
 comment on function @NAMESPACE at .RebuildListenEntries() is
-'RebuildListenEntries(p_provider, p_receiver)
+'RebuildListenEntries()
 
 Invoked by various subscription and path modifying functions, this
 rewrites the sl_listen entries, adding in all the ones required to
@@ -5020,93 +5041,44 @@
 
 
 -- ----------------------------------------------------------------------
--- FUNCTION RebuildListenEntriesOne (origin, receiver)
+-- FUNCTION ReachableFromNode (receiver, blacklist)
 --
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE at .RebuildListenEntriesOne(int4, int4)
-returns int4
-as '
+create or replace function @NAMESPACE at .ReachableFromNode(int4, int4[]) returns setof int4 as '
 declare
-	p_origin		alias for $1;
-	p_receiver		alias for $2;
-	v_row			record;
+	p_receiver alias for $1 ;
+	p_blacklist alias for $2 ;
+	v_ignore int4[] ;
+	v_reachable_edge_last int4[] ;
+	v_reachable_edge_new int4[] default \'{}\' ;
+	v_server record ;
 begin
-	-- 1. If the receiver is subscribed to any set from the origin,
-	--    listen on the same provider(s).
-	for v_row in select distinct sub_provider
-			from @NAMESPACE at .sl_subscribe, @NAMESPACE at .sl_set,
-				@NAMESPACE at .sl_path
-			where sub_set = set_id
-			and set_origin = p_origin
-			and sub_receiver = p_receiver
-			and sub_provider = pa_server
-			and sub_receiver = pa_client
-	loop
-		perform @NAMESPACE at .storeListen_int(p_origin, 
-				v_row.sub_provider, p_receiver);
-	end loop;
-	if found then
-		return 1;
-	end if;
-
-	-- 2. If the receiver has a direct path to the provider,
-	--    use that.
-	if exists (select true
+	v_reachable_edge_last := array[p_receiver] ;
+	v_ignore := p_blacklist || array[p_receiver] ;
+	return next p_receiver ;  -- the receiver itself is always part of the result
+	while v_reachable_edge_last != \'{}\' loop  -- breadth-first, one frontier per round
+		v_reachable_edge_new := \'{}\' ;
+		for v_server in select pa_server as no_id
 			from @NAMESPACE at .sl_path
-			where pa_server = p_origin
-			and pa_client = p_receiver)
-	then
-		perform @NAMESPACE at .storeListen_int(p_origin, p_origin, p_receiver);
-		return 1;
-	end if;
-
-	-- 3. Listen on every node that is either provider for the
-	--    receiver or is using the receiver as provider (follow the
-	--    normal subscription routes).
-	for v_row in select distinct provider from (
-			select sub_provider as provider
-					from @NAMESPACE at .sl_subscribe
-					where sub_receiver = p_receiver
-			union
-			select sub_receiver as provider
-					from @NAMESPACE at .sl_subscribe
-					where sub_provider = p_receiver
-					and exists (select true from @NAMESPACE at .sl_path
-								where pa_server = sub_receiver
-								and pa_client = sub_provider)
-			) as S
+			where pa_client = ANY(v_reachable_edge_last) and pa_server != ALL(v_ignore)
 	loop
-		perform @NAMESPACE at .storeListen_int(p_origin,
-				v_row.provider, p_receiver);
-	end loop;
-	if found then
-		return 1;
+			if v_server.no_id != ALL(v_ignore) then  -- several frontier nodes may share one server
+				v_ignore := v_ignore || array[v_server.no_id] ;
+				v_reachable_edge_new := v_reachable_edge_new || array[v_server.no_id] ;
+				return next v_server.no_id ;
 	end if;
-
-	-- 4. If all else fails - meaning there are no subscriptions to
-	--    guide us to the right path - use every node we have a path
-	--    to as provider. This normally only happens when the cluster
-	--    is built or a new node added. This brute force fallback
-	--    ensures that events will propagate if possible at all.
-	for v_row in select pa_server as provider
-			from @NAMESPACE at .sl_path
-			where pa_client = p_receiver
-	loop
-		perform @NAMESPACE at .storeListen_int(p_origin, 
-				v_row.provider, p_receiver);
 	end loop;
-	if found then
-		return 1;
-	end if;
-
-	return 0;
+		v_reachable_edge_last := v_reachable_edge_new ;
+	end loop ;
+	return ;
 end;
 ' language plpgsql;
 
-comment on function @NAMESPACE at .RebuildListenEntriesOne(int4, int4) is
-'RebuildListenEntriesOne(p_origin, p_receiver)
+comment on function @NAMESPACE at .ReachableFromNode(int4, int4[]) is
+'ReachableFromNode(receiver, blacklist)
 
-Rebuilding of sl_listen entries for one origin, receiver pair.';
+Find all nodes that <receiver> can receive events from without
+using nodes in <blacklist> as a relay; <receiver> itself is included.';
 
 
 -- ----------------------------------------------------------------------
--- /dev/null
+++ src/backend/test_listen_path_gen.sql
@@ -0,0 +1,261 @@
+--- $Id: test_listen_path_gen.sql,v 1.1 2006/03/20 22:20:48 cbbrowne Exp $
+
+-- This SQL script is used to test the new listen path generation code
+-- to make sure the resulting sl_listen allows all nodes to be 
+-- reachable/audible
+
+-- WARNING: this test truncates the configuration tables in the schema
+-- "_slony_regress1" (sl_node, sl_path, sl_set, sl_subscribe, sl_listen, ...)
+-- and refills them by hand, assuming those tables and functions exist.
+
+-- You can get this schema set up either by:
+
+-- 1. Running a slonik script that does "init cluster()" for a cluster
+--    called slony_regress1
+
+-- 2. Running just about any of the test bed scripts, which by default
+--    create the slony_regress1 cluster, and stopping the script before
+--    it finishes and purges the databases
+
+-- Some helpers for the test below
+
+create temporary table nodes (id int4 not null primary key) ;  -- node ids 1..10, used by Tests 3-5
+insert into nodes (id) values (1);
+insert into nodes (id) values (2);
+insert into nodes (id) values (3);
+insert into nodes (id) values (4);
+insert into nodes (id) values (5);
+insert into nodes (id) values (6);
+insert into nodes (id) values (7);
+insert into nodes (id) values (8);
+insert into nodes (id) values (9);
+insert into nodes (id) values (10);
+
+
+create or replace view _slony_regress1.listener_orphans as  -- (origin, receiver) pairs lacking any sl_listen entry
+   select o.no_id as origin, r.no_id as receiver
+   from _slony_regress1.sl_node o cross join _slony_regress1.sl_node r
+   where o.no_id <> r.no_id
+     and not exists (select 1 from _slony_regress1.sl_listen l
+                     where l.li_origin = o.no_id and l.li_receiver = r.no_id);
+
+-- are_all_nodes_audible(): NOTICE for every ordered node pair; returns the number of deaf pairs.
+create or replace function "_slony_regress1".are_all_nodes_audible () returns int4 as '
+declare
+   v_deaf_pairs int4 := 0;   -- pairs for which can_node_hear() did not return true
+   v_listener record;
+   v_speaker record;
+begin
+	for v_listener in select no_id from "_slony_regress1".sl_node loop
+		for v_speaker in select no_id from "_slony_regress1".sl_node where no_id <> v_listener.no_id loop
+			if "_slony_regress1".can_node_hear (v_listener.no_id, v_speaker.no_id) then
+				raise notice ''Slony-I: Node % can hear %'', v_listener.no_id, v_speaker.no_id;
+			else
+				raise notice ''Slony-I: Node % cannot hear %'', v_listener.no_id, v_speaker.no_id;
+				v_deaf_pairs := v_deaf_pairs + 1;
+			end if;
+		end loop;
+	end loop;
+	return v_deaf_pairs;
+end;' language plpgsql;
+
+
+-- can_node_hear(receiver, origin)
+--
+-- Returns true when sl_listen holds a chain of entries for li_origin =
+-- origin that leads from receiver back to origin, i.e. when receiver can
+-- actually obtain the events of origin.  Breadth-first search; each node
+-- is queued at most once, so the search also ends if origin is unreachable.
+create or replace function "_slony_regress1".can_node_hear (int4, int4) returns boolean as '
+declare
+   v_receiver alias for $1;
+   v_origin   alias for $2;
+   v_rec    record;
+   v_clist int4[];   -- nodes discovered so far, receiver first
+   v_csize int4;     -- number of entries in v_clist
+   i int4;           -- position in v_clist of the next node to expand
+begin
+  v_clist := ARRAY[v_receiver];
+  v_csize := 1;
+  i := 1;
+  -- Expand nodes in discovery order until no unexpanded node is left.
+  while i <= v_csize loop
+    -- Providers that v_clist[i] listens to for events from v_origin
+    for v_rec in select distinct li_provider from "_slony_regress1".sl_listen
+                 where li_receiver = v_clist[i]
+                   and li_origin = v_origin loop
+      if v_rec.li_provider = v_origin then
+        return ''t'';
+      end if;
+      -- Queue each provider only once; this guarantees termination.
+      if v_rec.li_provider <> ALL (v_clist) then
+        v_csize := v_csize + 1;
+        v_clist[v_csize] := v_rec.li_provider;
+      end if;
+    end loop;
+    i := i + 1;
+  end loop;
+  -- Every node that could relay events from v_origin has been expanded.
+  return ''f'';
+end;' language plpgsql;
+
+--Test1
+-- 21 <-> 20 <-> 1 <-> 10 <-> 11
+
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node(no_id) values (1);
+insert into _slony_regress1.sl_node(no_id) values (10);
+insert into _slony_regress1.sl_node(no_id) values (11);
+insert into _slony_regress1.sl_node(no_id) values (20);
+insert into _slony_regress1.sl_node(no_id) values (21);
+
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,20,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,11,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (11,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,21,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (21,20,'');
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+select 'Try again, specifying subscriptions, as well.';
+insert into _slony_regress1.sl_set(set_id, set_origin) values (1, 1);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 1, 10);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 1, 20);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 10, 11);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 20, 21);  -- 21 is fed by 20 (path 20 -> 21)
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+
+--Test2 (arrows point from each path client to its server)
+--  2 <--- 1 ---> 4
+--  |     ^ ^     |
+--  |    /   \    |
+--  v   /     \   v
+--  3 _/       \_ 5
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node(no_id) values (1);
+insert into _slony_regress1.sl_node(no_id) values (2);
+insert into _slony_regress1.sl_node(no_id) values (3);
+insert into _slony_regress1.sl_node(no_id) values (4);
+insert into _slony_regress1.sl_node(no_id) values (5);
+
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,3,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,5,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (2,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (3,2,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (4,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (5,4,'');
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+--Test3
+--Fully meshed setup with 10 nodes: every node has a path to every other node
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node (no_id) select id from nodes;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+select c.id, s.id, 'test dsn'
+from nodes c cross join nodes s
+where c.id <> s.id ;
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+--Test4
+--A transitive, acyclic graph with 10 nodes: every node i < j is a path client of node j
+--Not every node can hear every other one, so expect "cannot hear" notices
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node (no_id) select id from nodes;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+select n1.id, n2.id, 'test dsn'
+from nodes n1 cross join nodes n2
+where n1.id < n2.id ;
+
+select _slony_regress1.rebuildlistenentries();
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+--Test5
+--The transitive graph from Test4, plus a path from server 1 to client 10,
+--which closes a cycle so that every node should be able to hear every other.
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node (no_id) select id from nodes;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+select n1.id, n2.id, 'test dsn'
+from nodes n1 cross join nodes n2
+where n1.id < n2.id ;
+
+insert into _slony_regress1.sl_path (pa_client, pa_server, pa_conninfo)
+values (10, 1, 'complete the graph...');
+
+select _slony_regress1.rebuildlistenentries();
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+
+--Test6: the Test1 layout, then two sets originating on nodes 10 and 20
+-- 21 <-> 20 <-> 1 <-> 10 <-> 11
+
+truncate _slony_regress1.sl_set, _slony_regress1.sl_setsync, _slony_regress1.sl_table, _slony_regress1.sl_trigger, _slony_regress1.sl_sequence, _slony_regress1.sl_subscribe, _slony_regress1.sl_listen, _slony_regress1.sl_path, _slony_regress1.sl_node;
+
+insert into _slony_regress1.sl_node(no_id) values (1);
+insert into _slony_regress1.sl_node(no_id) values (10);
+insert into _slony_regress1.sl_node(no_id) values (11);
+insert into _slony_regress1.sl_node(no_id) values (20);
+insert into _slony_regress1.sl_node(no_id) values (21);
+
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (1,20,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (10,11,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (11,10,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,1,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (20,21,'');
+insert into _slony_regress1.sl_path(pa_server, pa_client, pa_conninfo) values (21,20,'');
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+
+select 'Try again, specifying subscriptions, as well.';
+insert into _slony_regress1.sl_set(set_id, set_origin) values (1, 10);
+insert into _slony_regress1.sl_set(set_id, set_origin) values (2, 20);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (1, 10, 11);
+insert into _slony_regress1.sl_subscribe(sub_set, sub_provider, sub_receiver) values (2, 20, 21);  -- set 2 originates on node 20
+
+select _slony_regress1.rebuildlistenentries();
+
+select * from _slony_regress1.sl_listen order by li_origin, li_receiver, li_provider;
+select * from _slony_regress1.listener_orphans;
+select "_slony_regress1".are_all_nodes_audible();
+



More information about the Slony1-commit mailing list