CVS User Account cvsuser
Wed Oct 12 22:14:43 PDT 2005
Log Message:
-----------
Fixed failover processing.

In the case of failover we need the same ACCEPT_SET guarding, so that
the remaining subscribers do not process any events from the backup
node before they have finished processing all remaining events from the
failed node that might still be queued up on the backup.

Processing of the ACCEPT_SET event must also restart slon in order to
guarantee that the new configuration is loaded.


Jan

Tags:
----
REL_1_1_STABLE

Modified Files:
--------------
    slony1-engine/src/backend:
        slony1_funcs.sql (r1.64.2.7 -> r1.64.2.8)
    slony1-engine/src/slon:
        remote_worker.c (r1.86.2.5 -> r1.86.2.6)
    slony1-engine/src/slonik:
        slonik.c (r1.42.2.1 -> r1.42.2.2)

-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.64.2.7
retrieving revision 1.64.2.8
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.64.2.7 -r1.64.2.8
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -943,41 +943,6 @@
 -- Note that the following code should all become obsolete in the wake
 -- of the availability of RebuildListenEntries()...
 
-if false then
-	-- ----
-	-- Let every node that listens for something on the failed node
-	-- listen for that on the backup node instead.
-	-- ----
-	for v_row in select * from @NAMESPACE at .sl_listen
-			where li_provider = p_failed_node
-				and li_receiver <> p_backup_node
-	loop
-		perform @NAMESPACE at .storeListen_int(v_row.li_origin,
-				p_backup_node, v_row.li_receiver);
-	end loop;
-
-	-- ----
-	-- Let the backup node listen for all events where the
-	-- failed node did listen for it.
-	-- ----
-	for v_row in select li_origin, li_provider
-			from @NAMESPACE at .sl_listen
-			where li_receiver = p_failed_node
-				and li_provider <> p_backup_node
-	loop
-		perform @NAMESPACE at .storeListen_int(v_row.li_origin,
-				v_row.li_provider, p_backup_node);
-	end loop;
-
-	-- ----
-	-- Remove all sl_listen entries that receive anything from the
-	-- failed node.
-	-- ----
-	delete from @NAMESPACE at .sl_listen
-			where li_provider = p_failed_node
-				or li_receiver = p_failed_node;
-end if;
-
 	-- ----
 	-- Move the sets
 	-- ----
@@ -1009,12 +974,10 @@
 				loop
 					perform @NAMESPACE at .alterTableRestore(v_row2.tab_id);
 				end loop;
-			end if;
 
 			update @NAMESPACE at .sl_set set set_origin = p_backup_node
 					where set_id = v_row.set_id;
 
-			if p_backup_node = @NAMESPACE at .getLocalNodeId(''_ at CLUSTERNAME@'') then
 				delete from @NAMESPACE at .sl_setsync
 						where ssy_setid = v_row.set_id;
 
@@ -1106,6 +1069,8 @@
 				p_failed_node, p_ev_seqno;
 	end if;
 
+raise notice ''failedNode2(): faking FAILOVER_SET event'';
+
 	insert into @NAMESPACE at .sl_event
 			(ev_origin, ev_seqno, ev_timestamp,
 			ev_minxid, ev_maxxid, ev_xip,
@@ -1135,7 +1100,7 @@
 'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake)
 
 On the node that has the highest sequence number of the failed node,
-fake the FAILED_NODE event.';
+fake the FAILOVER_SET event.';
 
 -- ----------------------------------------------------------------------
 -- FUNCTION failoverSet_int (failed_node, backup_node, set_id)
@@ -1183,6 +1148,15 @@
 		loop
 			perform @NAMESPACE at .alterTableForReplication(v_row.tab_id);
 		end loop;
+		insert into @NAMESPACE at .sl_event
+				(ev_origin, ev_seqno, ev_timestamp,
+				ev_minxid, ev_maxxid, ev_xip,
+				ev_type, ev_data1, ev_data2, ev_data3)
+				values 
+				(p_backup_node, "pg_catalog".nextval(''@NAMESPACE at .sl_event_seq''), CURRENT_TIMESTAMP,
+				''0'', ''0'', '''',
+				''ACCEPT_SET'', p_set_id::text,
+				p_failed_node::text, p_backup_node::text);
 	else
 		delete from @NAMESPACE at .sl_subscribe
 				where sub_set = p_set_id
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.86.2.5
retrieving revision 1.86.2.6
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.86.2.5 -r1.86.2.6
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -905,8 +905,8 @@
 				slon_log(SLON_DEBUG2, "got parms ACCEPT_SET\n");
 				
 			    /* If we're a remote node, and haven't yet
-			     * received the MOVE_SET event from the
-			     * new origin, then we'll need to sleep a
+			     * received the MOVE/FAILOVER_SET event from the
+			     * old origin, then we'll need to sleep a
 			     * bit...  This avoids a race condition
 			     * where new SYNCs take place on the new
 			     * origin, and are ignored on some
@@ -915,49 +915,50 @@
 			     * received and processed  */
 
 			    if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) {
-				    slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin - wait...\n");
-				    slon_mkquery(&query1, 
-						 "select 1 from %s.sl_event accept "
-						 "where "
-						 "   accept.ev_type = 'ACCEPT_SET' and "
-						 "   accept.ev_origin = %d and "
-						 "   accept.ev_data1 = %d and "
-						 "   accept.ev_data2 = %d and "
-						 "   accept.ev_data3 = %d and "
-						 "   not exists  "
-						 "   (select 1 from %s.sl_event move "
+				    slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin\n");
+				    slon_mkquery(&query2, 
+						 "select 1 from %s.sl_event "
 						 "    where "
-						 "      accept.ev_origin = move.ev_data3 and "
-						 "      move.ev_type = 'MOVE_SET' and "
-						 "      move.ev_data1 = accept.ev_data1 and "
-						 "      move.ev_data2 = accept.ev_data2 and "
-						 "      move.ev_data3 = accept.ev_data3); ",
+						 "     (ev_origin = %d and "
+						 "      ev_type = 'MOVE_SET' and "
+						 "      ev_data1 = '%d' and "
+						 "      ev_data2 = '%d' and "
+						 "      ev_data3 = '%d') "
+						 "or "
+						 "     (ev_origin = %d and "
+						 "      ev_type = 'FAILOVER_SET' and "
+						 "      ev_data1 = '%d' and "
+						 "      ev_data2 = '%d' and "
+						 "      ev_data3 = '%d'); ",
 						 
 						 rtcfg_namespace, 
 						 old_origin, set_id, old_origin, new_origin,
-						 rtcfg_namespace);
-				    res = PQexec(local_dbconn, dstring_data(&query1));
-				    while (PQntuples(res) > 0) {
-					    int sleeptime = 15;
-					    int sched_rc;
-					    slon_log(SLON_WARN, "remoteWorkerThread_%d: "
-					     "accept set: node has not yet received MOVE_SET event "
-						     "for set %d old origin %d new origin - sleep %d seconds\n",
-						     rtcfg_nodeid, set_id, old_origin, new_origin, sleeptime);
-					    sched_rc = sched_msleep(node, sleeptime * 1000);
-					    if (sched_rc != SCHED_STATUS_OK) {
-						    event_ok = false;
-						    break;
-					    } else {
-						    if (sleeptime < 60)
-							    sleeptime *= 2;
-					    }
-					    res = PQexec(local_dbconn, dstring_data(&query1));
+						 old_origin, old_origin, new_origin, set_id);
+
+				    res = PQexec(local_dbconn, dstring_data(&query2));
+					while (PQntuples(res) == 0)
+					{
+						slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET not received yet - sleep\n");
+						if (sched_msleep(node, 10000) != SCHED_STATUS_OK)
+							slon_abort();
+						PQclear(res);
+						res = PQexec(local_dbconn, dstring_data(&query2));
 				    }
+					PQclear(res);
+					slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET exists - done\n");
+
+					slon_appendquery(&query1,
+									 "notify \"_%s_Restart\"; ",
+									 rtcfg_cluster_name);
+					query_append_event(&query1, event);
+					slon_appendquery(&query1, "commit transaction;");
+					query_execute(node, local_dbconn, &query1);
+					slon_abort();
+
+					need_reloadListen = true;
 			    } else {
 				    slon_log(SLON_DEBUG2, "ACCEPT_SET - on origin node...\n");
 			    }
-			    slon_log(SLON_DEBUG2, "ACCEPT_SET - done...\n");
 
 			}
 			else if (strcmp(event->ev_type, "MOVE_SET") == 0)
Index: slonik.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v
retrieving revision 1.42.2.1
retrieving revision 1.42.2.2
diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.42.2.1 -r1.42.2.2
--- src/slonik/slonik.c
+++ src/slonik/slonik.c
@@ -2436,6 +2436,7 @@
 typedef struct
 {
 	int			set_id;
+	int			num_directsub;
 	int			num_subscribers;
 	failnode_node **subscribers;
 	failnode_node *max_node;
@@ -2507,8 +2508,7 @@
 				 "    and S.set_origin = %d "
 				 "    and SUB.sub_provider = %d "
 				 "    and SUB.sub_active "
-				 "    group by set_id "
-				 "    having count(S.set_id) > 1",
+				 "    group by set_id ",
 				 stmt->hdr.script->clustername,
 				 stmt->hdr.script->clustername,
 				 stmt->no_id, stmt->no_id);
@@ -2596,6 +2596,10 @@
 	for (i = 0; i < num_sets; i++)
 	{
 		setinfo[i].set_id = (int)strtol(PQgetvalue(res2, i, 0), NULL, 10);
+		setinfo[i].num_directsub = (int)strtol(PQgetvalue(res2, i, 1), NULL, 10);
+
+		if (setinfo[i].num_directsub <= 1)
+			continue;
 
 		slon_mkquery(&query,
 					 "select sub_receiver "
@@ -2772,6 +2776,34 @@
 		setinfo[i].max_node = NULL;
 		setinfo[i].max_seqno = 0;
 
+		if (setinfo[i].num_directsub <= 1)
+		{
+			int64		ev_seqno;
+
+			slon_mkquery(&query,
+					"select max(ev_seqno) "
+					"	from \"_%s\".sl_event "
+					"	where ev_origin = %d "
+					"	and ev_type = 'SYNC'; ",
+					stmt->hdr.script->clustername,
+					stmt->no_id);
+			res1 = db_exec_select((SlonikStmt *) stmt,
+					adminfo1, &query);
+			if (res1 == NULL)
+			{
+				free(configbuf);
+				dstring_free(&query);
+				return -1;
+			}
+			slon_scanint64(PQgetvalue(res1, 0, 0), &ev_seqno);
+
+			setinfo[i].max_seqno = ev_seqno;
+
+			PQclear(res1);
+
+			continue;
+		}
+
 		slon_mkquery(&query,
 					 "select ssy_seqno "
 					 "    from \"_%s\".sl_setsync "
@@ -2835,7 +2867,12 @@
 		int			use_node;
 		SlonikAdmInfo *use_adminfo;
 
-		if (setinfo[i].max_node == NULL)
+		if (setinfo[i].num_directsub <= 1)
+		{
+			use_node = stmt->backup_node;
+			use_adminfo = adminfo1;
+		}
+		else if (setinfo[i].max_node == NULL)
 		{
 			printf("no setsync status for set %d found at all\n",
 				   setinfo[i].set_id);


More information about the Slony1-commit mailing list