CVS User Account cvsuser
Fri Feb 24 12:02:41 PST 2006
Log Message:
-----------
Implementation of "SWITCH LOG (id = <nodeid>)".

The slonik command SWITCH LOG will call the stored procedure
    
	logswitch_start()

which will adjust the sl_log_status sequence or bail out with an
ERROR if the previous logswitch has not completed yet. The slon
cleanup thread will call the stored procedure

    logswitch_finish()

which, if a log switch is in progress (sl_log_status is 2 or 3), will
check whether the old log is logically empty and, if so, truncate the
old sl_log_? and set sl_log_status to its final value, completing the
switch.

This patch also includes the completion of functionality in the log
trigger and the remote worker thread to actually use sl_log_2.

Jan

Modified Files:
--------------
    slony1-engine/src/backend:
        slony1_funcs.c (r1.38 -> r1.39)
        slony1_funcs.sql (r1.76 -> r1.77)
    slony1-engine/src/slon:
        cleanup_thread.c (r1.29 -> r1.30)
        remote_worker.c (r1.104 -> r1.105)
    slony1-engine/src/slonik:
        parser.y (r1.23 -> r1.24)
        scan.l (r1.23 -> r1.24)
        slonik.c (r1.55 -> r1.56)
        slonik.h (r1.25 -> r1.26)

-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.76
retrieving revision 1.77
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.76 -r1.77
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -5090,6 +5090,133 @@
 Updates the respective reloids in sl_table and sl_seqeunce based on
 their respective FQN';
 
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logswitch_start()
+--
+--	Called by slonik to initiate a switch from sl_log_1 to sl_log_2 and
+--  vice versa.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE at .logswitch_start()
+returns int4 as '
+DECLARE
+	v_current_status	int4;
+BEGIN
+	-- ----
+	-- Grab the central configuration lock to prevent race conditions
+	-- while changing the sl_log_status sequence value.
+	-- ----
+	lock table @NAMESPACE at .sl_config_lock;
+
+	-- ----
+	-- Get the current log status.
+	-- ----
+	select last_value into v_current_status from @NAMESPACE at .sl_log_status;
+
+	-- ----
+	-- status = 0: sl_log_1 active, sl_log_2 clean
+	-- Initiate a switch to sl_log_2.
+	-- ----
+	if v_current_status = 0 then
+		perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 3);
+		raise notice ''Logswitch to sl_log_2 initiated'';
+		return 2;
+	end if;
+
+	-- ----
+	-- status = 1: sl_log_2 active, sl_log_1 clean
+	-- Initiate a switch to sl_log_1.
+	-- ----
+	if v_current_status = 1 then
+		perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 2);
+		raise notice ''Logswitch to sl_log_1 initiated'';
+		return 1;
+	end if;
+
+	raise exception ''Previous logswitch still in progress'';
+END;
+' language plpgsql;
+
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logswitch_finish()
+--
+--	Called from the cleanup thread to finish a logswitch that is in
+--  progress, provided the old log table is already empty.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE at .logswitch_finish()
+returns int4 as '
+DECLARE
+	v_current_status	int4;
+	v_dummy				record;
+BEGIN
+	-- ----
+	-- Grab the central configuration lock to prevent race conditions
+	-- while changing the sl_log_status sequence value.
+	-- ----
+	lock table @NAMESPACE at .sl_config_lock;
+
+	-- ----
+	-- Get the current log status.
+	-- ----
+	select last_value into v_current_status from @NAMESPACE at .sl_log_status;
+
+	-- ----
+	-- status value 0 or 1 means that there is no log switch in progress
+	-- ----
+	if v_current_status = 0 or v_current_status = 1 then
+		return 0;
+	end if;
+
+	-- ----
+	-- status = 2: sl_log_1 active, cleanup sl_log_2
+	-- ----
+	if v_current_status = 2 then
+		-- ----
+		-- The cleanup thread calls us after it did the delete and
+		-- vacuum of both log tables. If sl_log_2 is empty now, we
+		-- can truncate it and the log switch is done.
+		-- ----
+		for v_dummy in select 1 from @NAMESPACE at .sl_log_2 loop
+			-- ----
+			-- Found a row ... log switch is still in progress.
+			-- ----
+			raise notice ''Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'';
+			return -1;
+		end loop;
+
+		raise notice ''Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'';
+		truncate @NAMESPACE at .sl_log_2;
+		perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 0);
+		return 1;
+	end if;
+
+	-- ----
+	-- status = 3: sl_log_2 active, cleanup sl_log_1
+	-- ----
+	if v_current_status = 3 then
+		-- ----
+		-- The cleanup thread calls us after it did the delete and
+		-- vacuum of both log tables. If sl_log_1 is empty now, we
+		-- can truncate it and the log switch is done.
+		-- ----
+		for v_dummy in select 1 from @NAMESPACE at .sl_log_1 loop
+			-- ----
+			-- Found a row ... log switch is still in progress.
+			-- ----
+			raise notice ''Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'';
+			return -1;
+		end loop;
+
+		raise notice ''Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'';
+		truncate @NAMESPACE at .sl_log_1;
+		perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 1);
+		return 2;
+	end if;
+END;
+' language plpgsql;
+
+
 -- ----------------------------------------------------------------------
 -- FUNCTION upgradeSchema(old_version)
         -- upgrade sl_node
Index: slony1_funcs.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.c,v
retrieving revision 1.38
retrieving revision 1.39
diff -Lsrc/backend/slony1_funcs.c -Lsrc/backend/slony1_funcs.c -u -w -r1.38 -r1.39
--- src/backend/slony1_funcs.c
+++ src/backend/slony1_funcs.c
@@ -99,6 +99,7 @@
 	void	   *plan_insert_log_1;
 	void	   *plan_insert_log_2;
 	void	   *plan_record_sequences;
+	void	   *plan_get_logstatus;
 
 	text	   *cmdtype_I;
 	text	   *cmdtype_U;
@@ -451,10 +452,33 @@
 	 */
 	if (!TransactionIdEquals(cs->currentXid, newXid))
 	{
+		int32	log_status;
+
 		/*
 		 * Determine the currently active log table
 		 */
-		cs->plan_active_log = cs->plan_insert_log_1;
+		if(SPI_execp(cs->plan_get_logstatus, NULL, NULL, 0) < 0)
+			elog(ERROR, "Slony-I: cannot determine log status");
+		if (SPI_processed != 1)
+			elog(ERROR, "Slony-I: cannot determine log status");
+		
+		log_status = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
+									SPI_tuptable->tupdesc, 1, NULL));
+		SPI_freetuptable(SPI_tuptable);
+
+		switch(log_status)
+		{
+			case 0:
+			case 2:		cs->plan_active_log = cs->plan_insert_log_1;
+						break;
+			
+			case 1:
+			case 3:		cs->plan_active_log = cs->plan_insert_log_2;
+						break;
+
+			default:	elog(ERROR, "Slony-I: illegal log status %d", log_status);
+						break;
+		}
 
 		cs->currentXid = newXid;
 	}
@@ -1425,6 +1449,10 @@
 		VARATT_SIZEP(cs->cmdtype_D) = VARHDRSZ + 1;
 		*VARDATA(cs->cmdtype_D) = 'D';
 
+		sprintf(query, "SELECT last_value::int4 FROM %s.sl_log_status",
+				cs->clusterident);
+		cs->plan_get_logstatus = SPI_saveplan(SPI_prepare(query, 0, NULL));
+
 		cs->cmddata_size = 8192;
 		cs->cmddata_buf = (text *) malloc(8192);
 
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.104
retrieving revision 1.105
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.104 -r1.105
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -4833,6 +4833,8 @@
 	struct timeval tv_start;
 	struct timeval tv_now;
 	int			first_fetch;
+	int			log_status;
+	int			rc;
 
 	WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE];
 	int			data_line_alloc;
@@ -4841,6 +4843,7 @@
 
 	PGresult   *res;
 	PGresult   *res2;
+	PGresult   *res3;
 	int			ntuples;
 	int			tupno;
 
@@ -4901,11 +4904,48 @@
 			}
 
 			/*
+			 * Get the current sl_log_status value
+			 */
+			slon_mkquery(&query, "select last_value from %s.sl_log_status",
+						rtcfg_namespace);
+			res3 = PQexec(dbconn, dstring_data(&query));
+			rc = PQresultStatus(res3);
+			if (rc != PGRES_TUPLES_OK)
+			{
+				slon_log(SLON_ERROR,
+						 "remoteWorkerThread_%d: \"%s\" %s %s",
+						 node->no_id, dstring_data(&query),
+						 PQresStatus(rc),
+						 PQresultErrorMessage(res3));
+				PQclear(res3);
+				errors++;
+				break;
+			}
+			if (PQntuples(res3) != 1)
+			{
+				slon_log(SLON_ERROR,
+						 "remoteWorkerThread_%d: \"%s\" %s returned %d tuples\n",
+						 node->no_id, dstring_data(&query),
+						 PQresStatus(rc), PQntuples(res3));
+				PQclear(res3);
+				errors++;
+				break;
+			}
+			log_status = strtol(PQgetvalue(res3, 0, 0), NULL, 10);
+			PQclear(res3);
+			slon_log(SLON_DEBUG2,
+					 "remoteWorkerThread_%d: current log_status = %d\n", 
+					 node->no_id, log_status);
+					
+			/*
 			 * Open a cursor that reads the log data.
 			 *
-			 * TODO: need to change this into a conditional sl_log_n selection
-			 * depending on the logstatus.
+			 *	Depending on sl_log_status select from sl_log_1,
+			 *	sl_log_2 or both.
 			 */
+			switch (log_status)
+			{
+				case 0:
 			slon_mkquery(&query,
 						 "declare LOG cursor for select "
 						 "    log_origin, log_xid, log_tableid, "
@@ -4918,6 +4958,60 @@
 						 sync_max_rowsize,
 						 rtcfg_namespace,
 						 dstring_data(&(provider->helper_qualification)));
+					break;
+
+				case 1:
+					slon_mkquery(&query,
+						 "declare LOG cursor for select "
+						 "    log_origin, log_xid, log_tableid, "
+						 "    log_actionseq, log_cmdtype, "
+						 "    octet_length(log_cmddata), "
+						 "    case when octet_length(log_cmddata) <= %d "
+						 "        then log_cmddata "
+						 "        else null end "
+						 "from %s.sl_log_2 %s order by log_actionseq; ",
+						 sync_max_rowsize,
+						 rtcfg_namespace,
+						 dstring_data(&(provider->helper_qualification)));
+					break;
+
+				case 2:
+				case 3:
+					slon_mkquery(&query,
+						 "declare LOG cursor for select * from ("
+						 "  select log_origin, log_xid, log_tableid, "
+						 "    log_actionseq, log_cmdtype, "
+						 "    octet_length(log_cmddata), "
+						 "    case when octet_length(log_cmddata) <= %d "
+						 "        then log_cmddata "
+						 "        else null end "
+						 "  from %s.sl_log_1 %s "
+						 "  union all "
+						 "  select log_origin, log_xid, log_tableid, "
+						 "    log_actionseq, log_cmdtype, "
+						 "    octet_length(log_cmddata), "
+						 "    case when octet_length(log_cmddata) <= %d "
+						 "        then log_cmddata "
+						 "        else null end "
+						 "  from %s.sl_log_2 %s) as log_union "
+						 "order by log_actionseq; ",
+						 sync_max_rowsize,
+						 rtcfg_namespace,
+						 dstring_data(&(provider->helper_qualification)),
+						 sync_max_rowsize,
+						 rtcfg_namespace,
+						 dstring_data(&(provider->helper_qualification)));
+					break;
+
+				default:
+					slon_log(SLON_ERROR,
+						 "remoteWorkerThread_%d: unexpected log_status %d\n",
+						 node->no_id, log_status);
+					errors++;
+					break;
+			}
+			if (errors)
+				break;
 
 			gettimeofday(&tv_start, NULL);
 			first_fetch = true;
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.29
retrieving revision 1.30
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.29 -r1.30
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -123,7 +123,8 @@
 	 * cluster will run into conflicts due to trying to vacuum pg_listener
 	 * concurrently
 	 */
-	while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, SLON_CLEANUP_SLEEP * 1000 + vac_bias + (rand() % (SLON_CLEANUP_SLEEP * 166))) == SCHED_STATUS_OK)
+	// while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, SLON_CLEANUP_SLEEP * 1000 + vac_bias + (rand() % (SLON_CLEANUP_SLEEP * 166))) == SCHED_STATUS_OK)
+	while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 300 * 1000) == SCHED_STATUS_OK)
 	{
 		/*
 		 * Call the stored procedure cleanupEvent()
@@ -146,7 +147,7 @@
 				 TIMEVAL_DIFF(&tv_start, &tv_end));
 
 		/*
-		 * Clean up the logs
+		 * Clean up the logs and, if a log switch is pending, try to finish it
 		 */
 		gettimeofday(&tv_start, NULL);
 		slon_mkquery(&query2,
@@ -180,15 +181,17 @@
 						 "and log_xid < '%s'; "
 						 "delete from %s.sl_seqlog "
 						 "where seql_origin = '%s' "
-						 "and seql_ev_seqno < '%s'; ",
+						 "and seql_ev_seqno < '%s'; "
+						 "select %s.logswitch_finish(); ",
 						 rtcfg_namespace, PQgetvalue(res, t, 0),
 						 PQgetvalue(res, t, 2),
 						 rtcfg_namespace, PQgetvalue(res, t, 0),
 						 PQgetvalue(res, t, 2),
 						 rtcfg_namespace, PQgetvalue(res, t, 0),
-						 PQgetvalue(res, t, 1));
+						 PQgetvalue(res, t, 1),
+						 rtcfg_namespace);
 			res2 = PQexec(dbconn, dstring_data(&query2));
-			if (PQresultStatus(res2) != PGRES_COMMAND_OK)
+			if (PQresultStatus(res2) != PGRES_TUPLES_OK)
 			{
 				slon_log(SLON_FATAL,
 						 "cleanupThread: \"%s\" - %s",
Index: scan.l
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/scan.l,v
retrieving revision 1.23
retrieving revision 1.24
diff -Lsrc/slonik/scan.l -Lsrc/slonik/scan.l -u -w -r1.23 -r1.24
--- src/slonik/scan.l
+++ src/slonik/scan.l
@@ -97,6 +97,7 @@
 key			{ return K_KEY;				}
 listen			{ return K_LISTEN;			}
 lock			{ return K_LOCK;			}
+log				{ return K_LOG;				}
 merge			{ return K_MERGE;			}
 move			{ return K_MOVE;			}
 name			{ return K_NAME;			}
@@ -123,6 +124,7 @@
 store			{ return K_STORE;			}
 subscribe		{ return K_SUBSCRIBE;			}
 success			{ return K_SUCCESS;			}
+switch			{ return K_SWITCH;			}
 table			{ return K_TABLE;			}
 timeout			{ return K_TIMEOUT;			}
 trigger			{ return K_TRIGGER;			}
Index: slonik.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.h,v
retrieving revision 1.25
retrieving revision 1.26
diff -Lsrc/slonik/slonik.h -Lsrc/slonik/slonik.h -u -w -r1.25 -r1.26
--- src/slonik/slonik.h
+++ src/slonik/slonik.h
@@ -48,6 +48,7 @@
 typedef struct SlonikStmt_ddl_script_s SlonikStmt_ddl_script;
 typedef struct SlonikStmt_update_functions_s SlonikStmt_update_functions;
 typedef struct SlonikStmt_wait_event_s SlonikStmt_wait_event;
+typedef struct SlonikStmt_switch_log_s SlonikStmt_switch_log;
 
 typedef enum
 {
@@ -85,6 +86,7 @@
 	STMT_UNSUBSCRIBE_SET,
 	STMT_UPDATE_FUNCTIONS,
 	STMT_WAIT_EVENT,
+	STMT_SWITCH_LOG,
 	STMT_ERROR
 }	Slonik_stmttype;
 
@@ -423,6 +425,13 @@
 };
 
 
+struct SlonikStmt_switch_log_s
+{
+	SlonikStmt	hdr;
+	int			no_id;
+};
+
+
 
 
 extern SlonikScript *parser_script;
@@ -544,6 +553,7 @@
 extern int	slonik_ddl_script(SlonikStmt_ddl_script * stmt);
 extern int	slonik_update_functions(SlonikStmt_update_functions * stmt);
 extern int	slonik_wait_event(SlonikStmt_wait_event * stmt);
+extern int	slonik_switch_log(SlonikStmt_switch_log * stmt);
 
 extern int	slon_scanint64(char *str, int64 * result);
 
Index: parser.y
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/parser.y,v
retrieving revision 1.23
retrieving revision 1.24
diff -Lsrc/slonik/parser.y -Lsrc/slonik/parser.y -u -w -r1.23 -r1.24
--- src/slonik/parser.y
+++ src/slonik/parser.y
@@ -159,6 +159,7 @@
 %type <statement>	stmt_update_functions
 %type <statement>	stmt_repair_config
 %type <statement>	stmt_wait_event
+%type <statement>	stmt_switch_log
 %type <opt_list>	option_list
 %type <opt_list>	option_list_item
 %type <opt_list>	option_list_items
@@ -201,6 +202,7 @@
 %token	K_KEY
 %token	K_LISTEN
 %token	K_LOCK
+%token	K_LOG
 %token	K_MERGE
 %token	K_MOVE
 %token	K_NAME
@@ -227,6 +229,7 @@
 %token	K_STORE
 %token	K_SUBSCRIBE
 %token	K_SUCCESS
+%token	K_SWITCH
 %token	K_TABLE
 %token	K_TIMEOUT
 %token	K_TRIGGER
@@ -475,6 +478,8 @@
 						{ $$ = $1; }
 					| stmt_wait_event
 						{ $$ = $1; }
+					| stmt_switch_log
+						{ $$ = $1; }
 					| stmt_error ';' 
 						{ yyerrok;
 						  $$ = $1; }
@@ -1443,6 +1448,32 @@
 					}
 					;
 
+stmt_switch_log		: lno K_SWITCH K_LOG option_list
+					{
+						SlonikStmt_switch_log *new;
+						statement_option opt[] = {
+							STMT_OPTION_INT( O_ID, -1 ),
+							STMT_OPTION_END
+						};
+
+						new = (SlonikStmt_switch_log *)
+								malloc(sizeof(SlonikStmt_switch_log));
+						memset(new, 0, sizeof(SlonikStmt_switch_log));
+						new->hdr.stmt_type		= STMT_SWITCH_LOG;
+						new->hdr.stmt_filename	= current_file;
+						new->hdr.stmt_lno		= $1;
+
+						if (assign_options(opt, $4) == 0)
+						{
+							new->no_id			= opt[0].ival;
+						}
+						else
+							parser_errors++;
+
+						$$ = (SlonikStmt *)new;
+					}
+					;
+
 option_list			: ';'
 					{ $$ = NULL; }
 					| '(' option_list_items ')' ';'
Index: slonik.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v
retrieving revision 1.55
retrieving revision 1.56
diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.55 -r1.56
--- src/slonik/slonik.c
+++ src/slonik/slonik.c
@@ -1119,6 +1119,25 @@
 				}
 				break;
 
+			case STMT_SWITCH_LOG:
+				{
+					SlonikStmt_switch_log *stmt =
+					(SlonikStmt_switch_log *) hdr;
+
+					if (stmt->no_id == -1)
+					{
+						printf("%s:%d: Error: "
+							   "node ID must be specified\n",
+							   hdr->stmt_filename, hdr->stmt_lno);
+						errors++;
+					}
+
+					if (script_check_adminfo(hdr, stmt->no_id) < 0)
+						errors++;
+
+				}
+				break;
+
 		}
 
 		hdr = hdr->next;
@@ -1544,6 +1563,16 @@
 				}
 				break;
 
+			case STMT_SWITCH_LOG:
+				{
+					SlonikStmt_switch_log *stmt =
+					(SlonikStmt_switch_log *) hdr;
+
+					if (slonik_switch_log(stmt) < 0)
+						errors++;
+				}
+				break;
+
 		}
 
 		if (current_try_level == 0)
@@ -4071,6 +4100,35 @@
 }
 
 
+int
+slonik_switch_log(SlonikStmt_switch_log * stmt)
+{
+	SlonikAdmInfo *adminfo1;
+	SlonDString query;
+
+	adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->no_id);
+	if (adminfo1 == NULL)
+		return -1;
+
+	if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
+		return -1;
+
+	dstring_init(&query);
+
+	slon_mkquery(&query,
+				 "select \"_%s\".logswitch_start(); ",
+				 stmt->hdr.script->clustername);
+	if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0)
+	{
+		dstring_free(&query);
+		return -1;
+	}
+
+	dstring_free(&query);
+	return 0;
+}
+
+
 /*
  * scanint8 --- try to parse a string into an int8.
  *



More information about the Slony1-commit mailing list