Fri Feb 24 12:02:41 PST 2006
- Previous message: [Slony1-commit] By cbbrowne: SQL had been modified; expected results should also be...
- Next message: [Slony1-commit] By cbbrowne: Point to new location of RPMs, apply <file> tag to several
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Implementation of "SWITCH LOG (id = <nodeid>)".
The slonik command SWITCH LOG will call the stored procedure
logswitch_start()
which will adjust the sl_log_status sequence, or bail out with an
ERROR if the previous logswitch has not completed yet. The slon
cleanup thread will call the stored procedure
logswitch_finish()
which, while a switch is in progress (log_status 2 or 3), checks whether
the old log is logically empty and, if so, truncates the old sl_log_? and
sets sl_log_status to its final value, completing the switch.
This patch also completes the functionality in the log trigger and the
remote worker thread that is needed to actually use sl_log_2.
Jan
Modified Files:
--------------
slony1-engine/src/backend:
slony1_funcs.c (r1.38 -> r1.39)
slony1_funcs.sql (r1.76 -> r1.77)
slony1-engine/src/slon:
cleanup_thread.c (r1.29 -> r1.30)
remote_worker.c (r1.104 -> r1.105)
slony1-engine/src/slonik:
parser.y (r1.23 -> r1.24)
scan.l (r1.23 -> r1.24)
slonik.c (r1.55 -> r1.56)
slonik.h (r1.25 -> r1.26)
-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.76
retrieving revision 1.77
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.76 -r1.77
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -5090,6 +5090,133 @@
Updates the respective reloids in sl_table and sl_seqeunce based on
their respective FQN';
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logswitch_start()
+--
+-- Called by slonik (SWITCH LOG) to initiate a switch from sl_log_1 to
+-- sl_log_2 or vice versa.
+--
+-- Returns 2 after starting a switch to sl_log_2 (status 0 -> 3) and
+-- 1 after starting a switch to sl_log_1 (status 1 -> 2). Any other
+-- status means a previous switch has not been finished yet; an
+-- exception is raised in that case.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE at .logswitch_start()
+returns int4 as '
+DECLARE
+ v_current_status int4;
+BEGIN
+ -- ----
+ -- Grab the central configuration lock to prevent race conditions
+ -- while changing the sl_log_status sequence value.
+ -- ----
+ lock table @NAMESPACE at .sl_config_lock;
+
+ -- ----
+ -- Get the current log status (last value of the sl_log_status sequence).
+ -- ----
+ select last_value into v_current_status from @NAMESPACE at .sl_log_status;
+
+ -- ----
+ -- status = 0: sl_log_1 active, sl_log_2 clean
+ -- Initiate a switch to sl_log_2 by moving to status 3.
+ -- ----
+ if v_current_status = 0 then
+ perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 3);
+ raise notice ''Logswitch to sl_log_2 initiated'';
+ return 2;
+ end if;
+
+ -- ----
+ -- status = 1: sl_log_2 active, sl_log_1 clean
+ -- Initiate a switch to sl_log_1 by moving to status 2.
+ -- ----
+ if v_current_status = 1 then
+ perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 2);
+ raise notice ''Logswitch to sl_log_1 initiated'';
+ return 1;
+ end if;
+
+ -- ----
+ -- Any other status (2 or 3) means logswitch_finish() has not completed
+ -- the previous switch yet, so refuse to start another one.
+ -- ----
+ raise exception ''Previous logswitch still in progress'';
+END;
+' language plpgsql;
+
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logswitch_finish()
+--
+-- Called from the cleanup thread to eventually finish a logswitch
+-- that is in progress.
+--
+-- Returns 0 if no switch is in progress (status 0 or 1), -1 if the old
+-- log table still contains rows, 1 after completing a switch to sl_log_1
+-- (status 2 -> 0, sl_log_2 truncated) and 2 after completing a switch to
+-- sl_log_2 (status 3 -> 1, sl_log_1 truncated).
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE at .logswitch_finish()
+returns int4 as '
+DECLARE
+ v_current_status int4;
+ v_dummy record;
+BEGIN
+ -- ----
+ -- Grab the central configuration lock to prevent race conditions
+ -- while changing the sl_log_status sequence value.
+ -- ----
+ lock table @NAMESPACE at .sl_config_lock;
+
+ -- ----
+ -- Get the current log status.
+ -- ----
+ select last_value into v_current_status from @NAMESPACE at .sl_log_status;
+
+ -- ----
+ -- status value 0 or 1 means that there is no log switch in progress
+ -- ----
+ if v_current_status = 0 or v_current_status = 1 then
+ return 0;
+ end if;
+
+ -- ----
+ -- status = 2: sl_log_1 active, cleanup sl_log_2
+ -- ----
+ if v_current_status = 2 then
+ -- ----
+ -- The cleanup thread calls us right after its delete statements
+ -- on the log tables (same query string). If sl_log_2 is empty
+ -- now, we can truncate it and the log switch is done.
+ -- ----
+ for v_dummy in select 1 from @NAMESPACE at .sl_log_2 loop
+ -- ----
+ -- Found a row ... log switch is still in progress.
+ -- ----
+ raise notice ''Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'';
+ return -1;
+ end loop;
+
+ raise notice ''Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'';
+ truncate @NAMESPACE at .sl_log_2;
+ perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 0);
+ return 1;
+ end if;
+
+ -- ----
+ -- status = 3: sl_log_2 active, cleanup sl_log_1
+ -- ----
+ if v_current_status = 3 then
+ -- ----
+ -- The cleanup thread calls us right after its delete statements
+ -- on the log tables (same query string). If sl_log_1 is empty
+ -- now, we can truncate it and the log switch is done.
+ -- ----
+ for v_dummy in select 1 from @NAMESPACE at .sl_log_1 loop
+ -- ----
+ -- Found a row ... log switch is still in progress.
+ -- ----
+ raise notice ''Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'';
+ return -1;
+ end loop;
+
+ raise notice ''Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'';
+ truncate @NAMESPACE at .sl_log_1;
+ perform "pg_catalog".setval(''@NAMESPACE at .sl_log_status'', 1);
+ return 2;
+ end if;
+
+ -- ----
+ -- NOTE(review): a status outside 0..3 reaches the end of the function
+ -- without a RETURN, which PL/pgSQL reports as a runtime error.
+ -- ----
+END;
+' language plpgsql;
+
+
-- ----------------------------------------------------------------------
-- FUNCTION upgradeSchema(old_version)
-- upgrade sl_node
Index: slony1_funcs.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.c,v
retrieving revision 1.38
retrieving revision 1.39
diff -Lsrc/backend/slony1_funcs.c -Lsrc/backend/slony1_funcs.c -u -w -r1.38 -r1.39
--- src/backend/slony1_funcs.c
+++ src/backend/slony1_funcs.c
@@ -99,6 +99,7 @@
void *plan_insert_log_1;
void *plan_insert_log_2;
void *plan_record_sequences;
+ void *plan_get_logstatus;
text *cmdtype_I;
text *cmdtype_U;
@@ -451,10 +452,33 @@
*/
if (!TransactionIdEquals(cs->currentXid, newXid))
{
+ int32 log_status;
+
/*
* Determine the currently active log table
*/
- cs->plan_active_log = cs->plan_insert_log_1;
+ if(SPI_execp(cs->plan_get_logstatus, NULL, NULL, 0) < 0)
+ elog(ERROR, "Slony-I: cannot determine log status");
+ if (SPI_processed != 1)
+ elog(ERROR, "Slony-I: cannot determine log status");
+
+ log_status = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc, 1, NULL));
+ SPI_freetuptable(SPI_tuptable);
+
+ switch(log_status)
+ {
+ case 0:
+ case 2: cs->plan_active_log = cs->plan_insert_log_1;
+ break;
+
+ case 1:
+ case 3: cs->plan_active_log = cs->plan_insert_log_2;
+ break;
+
+ default: elog(ERROR, "Slony-I: illegal log status %d", log_status);
+ break;
+ }
cs->currentXid = newXid;
}
@@ -1425,6 +1449,10 @@
VARATT_SIZEP(cs->cmdtype_D) = VARHDRSZ + 1;
*VARDATA(cs->cmdtype_D) = 'D';
+ sprintf(query, "SELECT last_value::int4 FROM %s.sl_log_status",
+ cs->clusterident);
+ cs->plan_get_logstatus = SPI_saveplan(SPI_prepare(query, 0, NULL));
+
cs->cmddata_size = 8192;
cs->cmddata_buf = (text *) malloc(8192);
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.104
retrieving revision 1.105
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.104 -r1.105
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -4833,6 +4833,8 @@
struct timeval tv_start;
struct timeval tv_now;
int first_fetch;
+ int log_status;
+ int rc;
WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE];
int data_line_alloc;
@@ -4841,6 +4843,7 @@
PGresult *res;
PGresult *res2;
+ PGresult *res3;
int ntuples;
int tupno;
@@ -4901,11 +4904,48 @@
}
/*
+ * Get the current sl_log_status value
+ */
+ slon_mkquery(&query, "select last_value from %s.sl_log_status",
+ rtcfg_namespace);
+ res3 = PQexec(dbconn, dstring_data(&query));
+ rc = PQresultStatus(res3);
+ if (rc != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: \"%s\" %s %s",
+ node->no_id, dstring_data(&query),
+ PQresStatus(rc),
+ PQresultErrorMessage(res3));
+ PQclear(res3);
+ errors++;
+ break;
+ }
+ if (PQntuples(res3) != 1)
+ {
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: \"%s\" %s returned %d tuples\n",
+ node->no_id, dstring_data(&query),
+ PQresStatus(rc), PQntuples(res3));
+ PQclear(res3);
+ errors++;
+ break;
+ }
+ log_status = strtol(PQgetvalue(res3, 0, 0), NULL, 10);
+ PQclear(res3);
+ slon_log(SLON_DEBUG2,
+ "remoteWorkerThread_%d: current log_status = %d\n",
+ node->no_id, log_status);
+
+ /*
* Open a cursor that reads the log data.
*
- * TODO: need to change this into a conditional sl_log_n selection
- * depending on the logstatus.
+ * Depending on sl_log_status select from sl_log_1,
+ * sl_log_2 or both.
*/
+ switch (log_status)
+ {
+ case 0:
slon_mkquery(&query,
"declare LOG cursor for select "
" log_origin, log_xid, log_tableid, "
@@ -4918,6 +4958,60 @@
sync_max_rowsize,
rtcfg_namespace,
dstring_data(&(provider->helper_qualification)));
+ break;
+
+ case 1:
+ slon_mkquery(&query,
+ "declare LOG cursor for select "
+ " log_origin, log_xid, log_tableid, "
+ " log_actionseq, log_cmdtype, "
+ " octet_length(log_cmddata), "
+ " case when octet_length(log_cmddata) <= %d "
+ " then log_cmddata "
+ " else null end "
+ "from %s.sl_log_2 %s order by log_actionseq; ",
+ sync_max_rowsize,
+ rtcfg_namespace,
+ dstring_data(&(provider->helper_qualification)));
+ break;
+
+ case 2:
+ case 3:
+ slon_mkquery(&query,
+ "declare LOG cursor for select * from ("
+ " select log_origin, log_xid, log_tableid, "
+ " log_actionseq, log_cmdtype, "
+ " octet_length(log_cmddata), "
+ " case when octet_length(log_cmddata) <= %d "
+ " then log_cmddata "
+ " else null end "
+ " from %s.sl_log_1 %s "
+ " union all "
+ " select log_origin, log_xid, log_tableid, "
+ " log_actionseq, log_cmdtype, "
+ " octet_length(log_cmddata), "
+ " case when octet_length(log_cmddata) <= %d "
+ " then log_cmddata "
+ " else null end "
+ " from %s.sl_log_2 %s) as log_union "
+ "order by log_actionseq; ",
+ sync_max_rowsize,
+ rtcfg_namespace,
+ dstring_data(&(provider->helper_qualification)),
+ sync_max_rowsize,
+ rtcfg_namespace,
+ dstring_data(&(provider->helper_qualification)));
+ break;
+
+ default:
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: unexpected log_status %d\n",
+ node->no_id, log_status);
+ errors++;
+ break;
+ }
+ if (errors)
+ break;
gettimeofday(&tv_start, NULL);
first_fetch = true;
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.29
retrieving revision 1.30
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.29 -r1.30
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -123,7 +123,8 @@
* cluster will run into conflicts due to trying to vacuum pg_listener
* concurrently
*/
- while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, SLON_CLEANUP_SLEEP * 1000 + vac_bias + (rand() % (SLON_CLEANUP_SLEEP * 166))) == SCHED_STATUS_OK)
+ // while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, SLON_CLEANUP_SLEEP * 1000 + vac_bias + (rand() % (SLON_CLEANUP_SLEEP * 166))) == SCHED_STATUS_OK)
+ while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 300 * 1000) == SCHED_STATUS_OK)
{
/*
* Call the stored procedure cleanupEvent()
@@ -146,7 +147,7 @@
TIMEVAL_DIFF(&tv_start, &tv_end));
/*
- * Clean up the logs
+ * Clean up the logs and eventually finish switching logs
*/
gettimeofday(&tv_start, NULL);
slon_mkquery(&query2,
@@ -180,15 +181,17 @@
"and log_xid < '%s'; "
"delete from %s.sl_seqlog "
"where seql_origin = '%s' "
- "and seql_ev_seqno < '%s'; ",
+ "and seql_ev_seqno < '%s'; "
+ "select %s.logswitch_finish(); ",
rtcfg_namespace, PQgetvalue(res, t, 0),
PQgetvalue(res, t, 2),
rtcfg_namespace, PQgetvalue(res, t, 0),
PQgetvalue(res, t, 2),
rtcfg_namespace, PQgetvalue(res, t, 0),
- PQgetvalue(res, t, 1));
+ PQgetvalue(res, t, 1),
+ rtcfg_namespace);
res2 = PQexec(dbconn, dstring_data(&query2));
- if (PQresultStatus(res2) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
{
slon_log(SLON_FATAL,
"cleanupThread: \"%s\" - %s",
Index: scan.l
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/scan.l,v
retrieving revision 1.23
retrieving revision 1.24
diff -Lsrc/slonik/scan.l -Lsrc/slonik/scan.l -u -w -r1.23 -r1.24
--- src/slonik/scan.l
+++ src/slonik/scan.l
@@ -97,6 +97,7 @@
key { return K_KEY; }
listen { return K_LISTEN; }
lock { return K_LOCK; }
+log { return K_LOG; }
merge { return K_MERGE; }
move { return K_MOVE; }
name { return K_NAME; }
@@ -123,6 +124,7 @@
store { return K_STORE; }
subscribe { return K_SUBSCRIBE; }
success { return K_SUCCESS; }
+switch { return K_SWITCH; }
table { return K_TABLE; }
timeout { return K_TIMEOUT; }
trigger { return K_TRIGGER; }
Index: slonik.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.h,v
retrieving revision 1.25
retrieving revision 1.26
diff -Lsrc/slonik/slonik.h -Lsrc/slonik/slonik.h -u -w -r1.25 -r1.26
--- src/slonik/slonik.h
+++ src/slonik/slonik.h
@@ -48,6 +48,7 @@
typedef struct SlonikStmt_ddl_script_s SlonikStmt_ddl_script;
typedef struct SlonikStmt_update_functions_s SlonikStmt_update_functions;
typedef struct SlonikStmt_wait_event_s SlonikStmt_wait_event;
+typedef struct SlonikStmt_switch_log_s SlonikStmt_switch_log;
typedef enum
{
@@ -85,6 +86,7 @@
STMT_UNSUBSCRIBE_SET,
STMT_UPDATE_FUNCTIONS,
STMT_WAIT_EVENT,
+ STMT_SWITCH_LOG,
STMT_ERROR
} Slonik_stmttype;
@@ -423,6 +425,13 @@
};
+/*
+ * Parsed form of the slonik statement SWITCH LOG (id = <nodeid>).
+ */
+struct SlonikStmt_switch_log_s
+{
+ SlonikStmt hdr;
+ int no_id; /* node to run logswitch_start() on; -1 if id was not given */
+};
+
+
extern SlonikScript *parser_script;
@@ -544,6 +553,7 @@
extern int slonik_ddl_script(SlonikStmt_ddl_script * stmt);
extern int slonik_update_functions(SlonikStmt_update_functions * stmt);
extern int slonik_wait_event(SlonikStmt_wait_event * stmt);
+extern int slonik_switch_log(SlonikStmt_switch_log * stmt);
extern int slon_scanint64(char *str, int64 * result);
Index: parser.y
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/parser.y,v
retrieving revision 1.23
retrieving revision 1.24
diff -Lsrc/slonik/parser.y -Lsrc/slonik/parser.y -u -w -r1.23 -r1.24
--- src/slonik/parser.y
+++ src/slonik/parser.y
@@ -159,6 +159,7 @@
%type <statement> stmt_update_functions
%type <statement> stmt_repair_config
%type <statement> stmt_wait_event
+%type <statement> stmt_switch_log
%type <opt_list> option_list
%type <opt_list> option_list_item
%type <opt_list> option_list_items
@@ -201,6 +202,7 @@
%token K_KEY
%token K_LISTEN
%token K_LOCK
+%token K_LOG
%token K_MERGE
%token K_MOVE
%token K_NAME
@@ -227,6 +229,7 @@
%token K_STORE
%token K_SUBSCRIBE
%token K_SUCCESS
+%token K_SWITCH
%token K_TABLE
%token K_TIMEOUT
%token K_TRIGGER
@@ -475,6 +478,8 @@
{ $$ = $1; }
| stmt_wait_event
{ $$ = $1; }
+ | stmt_switch_log
+ { $$ = $1; }
| stmt_error ';'
{ yyerrok;
$$ = $1; }
@@ -1443,6 +1448,32 @@
}
;
+/*
+ * SWITCH LOG (id = <nodeid>);
+ *
+ * Builds a SlonikStmt_switch_log. The option table holds only ID, which
+ * defaults to -1 when omitted; the script checker rejects that value.
+ */
+stmt_switch_log : lno K_SWITCH K_LOG option_list
+ {
+ SlonikStmt_switch_log *new;
+ statement_option opt[] = {
+ STMT_OPTION_INT( O_ID, -1 ),
+ STMT_OPTION_END
+ };
+
+ new = (SlonikStmt_switch_log *)
+ malloc(sizeof(SlonikStmt_switch_log));
+ memset(new, 0, sizeof(SlonikStmt_switch_log));
+ new->hdr.stmt_type = STMT_SWITCH_LOG;
+ new->hdr.stmt_filename = current_file;
+ new->hdr.stmt_lno = $1;
+
+ /* on failure no_id stays 0 from the memset() and parser_errors is bumped */
+ if (assign_options(opt, $4) == 0)
+ {
+ new->no_id = opt[0].ival;
+ }
+ else
+ parser_errors++;
+
+ $$ = (SlonikStmt *)new;
+ }
+ ;
+
option_list : ';'
{ $$ = NULL; }
| '(' option_list_items ')' ';'
Index: slonik.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v
retrieving revision 1.55
retrieving revision 1.56
diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.55 -r1.56
--- src/slonik/slonik.c
+++ src/slonik/slonik.c
@@ -1119,6 +1119,25 @@
}
break;
+ case STMT_SWITCH_LOG:
+ {
+ SlonikStmt_switch_log *stmt =
+ (SlonikStmt_switch_log *) hdr;
+
+ if (stmt->no_id == -1)
+ {
+ printf("%s:%d: Error: "
+ "node ID must be specified\n",
+ hdr->stmt_filename, hdr->stmt_lno);
+ errors++;
+ }
+
+ if (script_check_adminfo(hdr, stmt->no_id) < 0)
+ errors++;
+
+ }
+ break;
+
}
hdr = hdr->next;
@@ -1544,6 +1563,16 @@
}
break;
+ case STMT_SWITCH_LOG:
+ {
+ SlonikStmt_switch_log *stmt =
+ (SlonikStmt_switch_log *) hdr;
+
+ if (slonik_switch_log(stmt) < 0)
+ errors++;
+ }
+ break;
+
}
if (current_try_level == 0)
@@ -4071,6 +4100,35 @@
}
+/*
+ * slonik_switch_log
+ *
+ * Execute a SWITCH LOG statement: call the logswitch_start() stored
+ * procedure in the cluster schema "_<clustername>" on node stmt->no_id.
+ * Returns 0 on success and -1 if get_active_adminfo(), db_begin_xact()
+ * or the query fails. The integer returned by logswitch_start() is not
+ * inspected, and no commit is issued here (presumably handled by the
+ * statement driver -- TODO confirm).
+ */
+int
+slonik_switch_log(SlonikStmt_switch_log * stmt)
+{
+ SlonikAdmInfo *adminfo1;
+ SlonDString query;
+
+ adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->no_id);
+ if (adminfo1 == NULL)
+ return -1;
+
+ if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
+ return -1;
+
+ dstring_init(&query);
+
+ /* logswitch_start() raises an error while a previous switch is pending */
+ slon_mkquery(&query,
+ "select \"_%s\".logswitch_start(); ",
+ stmt->hdr.script->clustername);
+ if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ {
+ dstring_free(&query);
+ return -1;
+ }
+
+ dstring_free(&query);
+ return 0;
+}
+
+
/*
* scanint8 --- try to parse a string into an int8.
*
- Previous message: [Slony1-commit] By cbbrowne: SQL had been modified; expected results should also be...
- Next message: [Slony1-commit] By cbbrowne: Point to new location of RPMs, apply <file> tag to several
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list