Steve Singer ssinger at ca.afilias.info
Mon Aug 9 07:46:22 PDT 2010
        If we are reshaping a cluster then slonik will contact the receiver
        directly and run update sl_subscribe so the receiver can
        listen from the correct source.
---
 src/backend/slony1_funcs.sql |   24 ++++++++++++++++++
 src/slonik/slonik.c          |   56 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql
index 8c77c8d..773fd36 100644
--- a/src/backend/slony1_funcs.sql
+++ b/src/backend/slony1_funcs.sql
@@ -5885,3 +5885,27 @@ comment on function @NAMESPACE at .replicate_partition(int4, text, text, text, text
 tab_idxname is optional - if NULL, then we use the primary key.
 This function looks up replication configuration via the parent table.';
 
+create or replace function @NAMESPACE at .reshapeSubscription (int4, int4, int4) returns int4 as $$
+declare
+	p_sub_set			alias for $1;
+	p_sub_provider		alias for $2;
+	p_sub_receiver		alias for $3;
+begin
+	-- ----
+	-- Grab the central configuration lock
+	-- ----
+	lock table @NAMESPACE at .sl_config_lock;
+
+	update @NAMESPACE at .sl_subscribe set sub_provider=p_sub_provider
+		   WHERE sub_set=p_sub_set AND sub_receiver=p_sub_receiver;
+	perform @NAMESPACE at .RebuildListenEntries();
+	notify "_ at CLUSTERNAME@_Restart";
+	return 0;
+end
+$$ language plpgsql;
+
+comment on function @NAMESPACE at .reshapeSubscription(int4,int4,int4) is
+'Run on a receiver/subscriber node when the provider for that
+subscription is being changed.  Slonik will invoke this method
+before the SUBSCRIBE_SET event propogates to the receiver
+so listen paths can be updated.';
\ No newline at end of file
diff --git a/src/slonik/slonik.c b/src/slonik/slonik.c
index cb46ffb..35ca6f7 100644
--- a/src/slonik/slonik.c
+++ b/src/slonik/slonik.c
@@ -3444,6 +3444,9 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
 {
 	SlonikAdmInfo *adminfo1;
 	SlonDString query;
+	PGresult    *res1;
+	SlonikAdmInfo * adminfo2;
+	int reshape=0;
 
 	adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->sub_provider);
 	if (adminfo1 == NULL)
@@ -3454,6 +3457,33 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
 
 	dstring_init(&query);
 
+	/**
+	 * If the receiver is already subscribed to
+	 * the set through a different provider
+	 * slonik will need to tell the receiver
+	 * about this change directy.
+	 *
+	 */
+
+	slon_mkquery(&query,"select * FROM \"_%s\".sl_subscribe " \
+				 "where sub_set=%d AND sub_receiver=%d " \
+				 " and sub_active=true and sub_provider<>%d",
+				 stmt->hdr.script->clustername,
+				 stmt->sub_setid,stmt->sub_receiver,
+				 stmt->sub_provider);
+	
+	res1 = db_exec_select((SlonikStmt*) stmt,adminfo1,&query);
+	if(res1 == NULL) {
+		dstring_free(&query);
+		return -1;
+	}
+	if(PQntuples(res1) > 0) 
+	{
+		reshape=1;
+	}
+	PQclear(res1);
+	dstring_reset(&query);
+
 	slon_mkquery(&query,
 				 "select \"_%s\".subscribeSet(%d, %d, %d, '%s', '%s'); ",
 				 stmt->hdr.script->clustername,
@@ -3466,8 +3496,30 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
 		dstring_free(&query);
 		return -1;
 	}
-
-	dstring_free(&query);
+	dstring_reset(&query);
+	if(reshape)
+	{
+		adminfo2 = get_active_adminfo((SlonikStmt *) stmt, stmt->sub_receiver);
+		if(adminfo2 == NULL) 
+		{
+			printf("can not find conninfo for receiver node %d\n",
+				   stmt->sub_receiver);
+			return -1;
+		}
+		slon_mkquery(&query,
+					 "select \"_%s\".reshapeSubscription(%d,%d,%d);",
+					 stmt->hdr.script->clustername,
+					 stmt->sub_provider,stmt->sub_setid,
+					 stmt->sub_receiver);	
+		if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0)
+		{
+			printf("error reshaping subscriber\n");
+			dstring_free(&query);
+			return -1;
+		}
+		
+		dstring_free(&query);
+	}
 	return 0;
 }
 
-- 
1.6.3.3


--------------020106060301020405000608--


More information about the Slony1-patches mailing list