Wed Nov 9 16:50:43 PST 2005
- Previous message: [Slony1-commit] By wieck: Use a new table sl_nodelock to guard against concurrent slon
- Next message: [Slony1-commit] By cbbrowne: Note (per Vivek Khera) that LOCK SET waits for old
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Use a new table, sl_nodelock, to guard against concurrent slon daemons
for the same node and to register all node connections for
the terminateNodeConnections() function.
Jan
Modified Files:
--------------
slony1-engine/src/backend:
slony1_base.sql (r1.28 -> r1.29)
slony1_funcs.c (r1.34 -> r1.35)
slony1_funcs.sql (r1.71 -> r1.72)
slony1-engine/src/slon:
local_listen.c (r1.32 -> r1.33)
remote_listen.c (r1.22 -> r1.23)
remote_worker.c (r1.95 -> r1.96)
-------------- next part --------------
Index: slony1_base.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_base.sql,v
retrieving revision 1.28
retrieving revision 1.29
diff -Lsrc/backend/slony1_base.sql -Lsrc/backend/slony1_base.sql -u -w -r1.28 -r1.29
--- src/backend/slony1_base.sql
+++ src/backend/slony1_base.sql
@@ -35,6 +35,23 @@
-- ----------------------------------------------------------------------
+-- TABLE sl_nodelock
+-- ----------------------------------------------------------------------
+create table @NAMESPACE@.sl_nodelock (
+ nl_nodeid int4,
+ nl_conncnt serial,
+ nl_backendpid int4,
+
+ CONSTRAINT "sl_nodelock-pkey"
+ PRIMARY KEY (nl_nodeid, nl_conncnt)
+);
+comment on table @NAMESPACE@.sl_nodelock is 'Used to prevent multiple slon instances and to identify the backends to kill in terminateNodeConnections().';
+comment on column @NAMESPACE@.sl_nodelock.nl_nodeid is 'Clients node_id';
+comment on column @NAMESPACE@.sl_nodelock.nl_conncnt is 'Clients connection number';
+comment on column @NAMESPACE@.sl_nodelock.nl_backendpid is 'PID of database backend owning this lock';
+
+
+-- ----------------------------------------------------------------------
-- TABLE sl_set
-- ----------------------------------------------------------------------
create table @NAMESPACE@.sl_set (
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.71
retrieving revision 1.72
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.71 -r1.72
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -233,28 +233,44 @@
grant execute on function @NAMESPACE@.logTrigger () to public;
-- ----------------------------------------------------------------------
--- FUNCTION terminateNodeConnections (name)
+-- FUNCTION terminateNodeConnections (failed_node)
--
--
-- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.terminateNodeConnections (name) returns int4
- as '$libdir/slony1_funcs', '_Slony_I_terminateNodeConnections'
- language C;
+create or replace function @NAMESPACE@.terminateNodeConnections (int4) returns int4
+as '
+declare
+ p_failed_node alias for $1;
+ v_row record;
+begin
+ for v_row in select nl_nodeid, nl_conncnt,
+ nl_backendpid from @NAMESPACE@.sl_nodelock
+ where nl_nodeid = p_failed_node for update
+ loop
+ perform @NAMESPACE@.killBackend(v_row.nl_backendpid, 15);
+ delete from @NAMESPACE@.sl_nodelock
+ where nl_nodeid = v_row.nl_nodeid
+ and nl_conncnt = v_row.nl_conncnt;
+ end loop;
-comment on function @NAMESPACE@.terminateNodeConnections (name) is
- 'terminates connections to the node and terminates the process';
+ return 0;
+end;
+' language plpgsql;
+
+comment on function @NAMESPACE@.terminateNodeConnections (int4) is
+ 'terminates all backends that have registered to be from the given node';
-- ----------------------------------------------------------------------
--- FUNCTION cleanupListener ()
+-- FUNCTION killBackend (pid, signo)
--
--
-- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.cleanupListener () returns int4
- as '$libdir/slony1_funcs', '_Slony_I_cleanupListener'
+create or replace function @NAMESPACE@.killBackend (int4, int4) returns int4
+ as '$libdir/slony1_funcs', '_Slony_I_killBackend'
language C;
-comment on function @NAMESPACE@.cleanupListener() is
- 'look for stale pg_listener entries and submit Async_Unlisten() to them';
+comment on function @NAMESPACE@.killBackend(int4, int4) is
+ 'Send a signal to a postgres process. Requires superuser rights';
-- ----------------------------------------------------------------------
-- FUNCTION slon_quote_brute(text)
@@ -435,6 +451,56 @@
-- ----------------------------------------------------------------------
+-- FUNCTION cleanupNodelock ()
+--
+-- Remove old entries from the nodelock table
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.cleanupNodelock ()
+returns int4
+as '
+declare
+ v_row record;
+begin
+ for v_row in select nl_nodeid, nl_conncnt, nl_backendpid
+ from @NAMESPACE@.sl_nodelock
+ for update
+ loop
+ if @NAMESPACE@.killBackend(v_row.nl_backendpid, 0) < 0 then
+ raise notice ''Slony-I: cleanup stale sl_nodelock entry for pid=%'',
+ v_row.nl_backendpid;
+ delete from @NAMESPACE@.sl_nodelock where
+ nl_nodeid = v_row.nl_nodeid and
+ nl_conncnt = v_row.nl_conncnt;
+ end if;
+ end loop;
+
+ return 0;
+end;
+' language plpgsql;
+
+
+-- ----------------------------------------------------------------------
+-- FUNCTION registerNodeConnection (nodeid)
+--
+--
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.registerNodeConnection (int4)
+returns int4
+as '
+declare
+ p_nodeid alias for $1;
+begin
+ insert into @NAMESPACE@.sl_nodelock
+ (nl_nodeid, nl_backendpid)
+ values
+ (p_nodeid, pg_backend_pid());
+
+ return 0;
+end;
+' language plpgsql;
+
+
+-- ----------------------------------------------------------------------
-- FUNCTION initializeLocalNode (no_id, no_comment)
--
-- Initializes a new node.
@@ -937,8 +1003,7 @@
-- ----
-- Terminate all connections of the failed node the hard way
-- ----
- perform @NAMESPACE@.terminateNodeConnections(
- ''_@CLUSTERNAME@_Node_'' || p_failed_node);
+ perform @NAMESPACE@.terminateNodeConnections(p_failed_node);
-- ----
-- Move the sets
@@ -4227,6 +4292,11 @@
end if;
end loop;
+ -- ----
+ -- Also remove stale entries from the nodelock table.
+ -- ----
+ perform @NAMESPACE@.cleanupNodelock();
+
return 0;
end;
' language plpgsql;
@@ -5105,6 +5175,25 @@
execute ''alter table @NAMESPACE@.sl_node add column no_spool boolean'';
update @NAMESPACE@.sl_node set no_spool = false;
end if;
+
+ -- ----
+ -- Changes for 1.1.3
+ -- ----
+ if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'', ''1.1.0'', ''1.1.1'', ''1.1.2'') then
+ -- Add new table sl_nodelock
+ execute ''create table @NAMESPACE@.sl_nodelock (
+ nl_nodeid int4,
+ nl_conncnt serial,
+ nl_backendpid int4,
+
+ CONSTRAINT "sl_nodelock-pkey"
+ PRIMARY KEY (nl_nodeid, nl_conncnt)
+ )'';
+ -- Drop obsolete functions
+ execute ''drop function @NAMESPACE@.terminateNodeConnections(name)'';
+ execute ''drop function @NAMESPACE@.cleanupListener()'';
+ end if;
+
return p_old;
end;
' language plpgsql;
Index: slony1_funcs.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.c,v
retrieving revision 1.34
retrieving revision 1.35
diff -Lsrc/backend/slony1_funcs.c -Lsrc/backend/slony1_funcs.c -u -w -r1.34 -r1.35
--- src/backend/slony1_funcs.c
+++ src/backend/slony1_funcs.c
@@ -44,8 +44,7 @@
PG_FUNCTION_INFO_V1(_Slony_I_logTrigger);
PG_FUNCTION_INFO_V1(_Slony_I_denyAccess);
PG_FUNCTION_INFO_V1(_Slony_I_lockedSet);
-PG_FUNCTION_INFO_V1(_Slony_I_terminateNodeConnections);
-PG_FUNCTION_INFO_V1(_Slony_I_cleanupListener);
+PG_FUNCTION_INFO_V1(_Slony_I_killBackend);
PG_FUNCTION_INFO_V1(_slon_quote_ident);
@@ -59,8 +58,7 @@
Datum _Slony_I_logTrigger(PG_FUNCTION_ARGS);
Datum _Slony_I_denyAccess(PG_FUNCTION_ARGS);
Datum _Slony_I_lockedSet(PG_FUNCTION_ARGS);
-Datum _Slony_I_terminateNodeConnections(PG_FUNCTION_ARGS);
-Datum _Slony_I_cleanupListener(PG_FUNCTION_ARGS);
+Datum _Slony_I_killBackend(PG_FUNCTION_ARGS);
Datum _slon_quote_ident(PG_FUNCTION_ARGS);
@@ -1000,92 +998,21 @@
Datum
-_Slony_I_terminateNodeConnections(PG_FUNCTION_ARGS)
+_Slony_I_killBackend(PG_FUNCTION_ARGS)
{
- Name relname = PG_GETARG_NAME(0);
- void *plan;
- Oid argtypes[1];
- Datum args [1];
- int i;
int32 pid;
- bool isnull;
-
- if (SPI_connect() < 0)
- elog(ERROR, "Slony-I: SPI_connect() failed in terminateNodeConnections()");
-
- argtypes[0] = NAMEOID;
- plan = SPI_prepare("select listenerpid "
- " from \"pg_catalog\".pg_listener "
- " where relname = $1; ",
- 1, argtypes);
- if (plan == NULL)
- elog(ERROR, "Slony-I: SPI_prepare() failed in terminateNodeConnections()");
-
- args[0] = NameGetDatum(relname);
- if (SPI_execp(plan, args, NULL, 0) != SPI_OK_SELECT)
- elog(ERROR, "Slony-I: SPI_execp() failed in terminateNodeConnections()");
-
- for (i = 0; i < SPI_processed; i++)
- {
- pid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[i],
- SPI_tuptable->tupdesc, 1, &isnull));
- elog(NOTICE, "Slony-I: terminating DB connection of failed node "
- "with pid %d", pid);
- kill(pid, SIGTERM);
- }
-
- SPI_finish();
-
- return (Datum)0;
-}
-
+ int32 signo;
-Datum
-_Slony_I_cleanupListener(PG_FUNCTION_ARGS)
-{
- static void *plan = NULL;
- int i;
- int32 pid;
- char *relname;
- bool isnull;
-
-
- if (SPI_connect() < 0)
- elog(ERROR, "Slony-I: SPI_connect() failed in cleanupListener()");
-
- if (plan == NULL)
- {
- plan = SPI_saveplan(SPI_prepare("select relname, listenerpid "
- " from \"pg_catalog\".pg_listener; ",
- 0, NULL));
- if (plan == NULL)
- elog(ERROR, "Slony-I: SPI_prepare() failed in cleanupListener()");
- }
-
- if (SPI_execp(plan, NULL, NULL, 0) != SPI_OK_SELECT)
- elog(ERROR, "Slony-I: SPI_execp() failed in cleanupListener()");
-
- for (i = 0; i < SPI_processed; i++)
- {
- pid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[i],
- SPI_tuptable->tupdesc, 2, &isnull));
- if (kill(pid, 0) < 0)
- {
- if (errno == ESRCH)
- {
- relname = SPI_getvalue(SPI_tuptable->vals[i],
- SPI_tuptable->tupdesc, 1);
+ if (!superuser())
+ elog(ERROR, "Slony-I: insufficient privilege for killBackend");
- elog(NOTICE, "Slony-I: removing stale pg_listener entry "
- "for pid %d, relname %s", pid, relname);
- Async_Unlisten(relname, pid);
- }
- }
- }
+ pid = PG_GETARG_INT32(0);
+ signo = PG_GETARG_INT32(1);
- SPI_finish();
+ if (kill(pid, signo) < 0)
+ PG_RETURN_INT32(-1);
- return (Datum)0;
+ PG_RETURN_INT32(0);
}
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.95
retrieving revision 1.96
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.95 -r1.96
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -318,14 +318,11 @@
local_dbconn = local_conn->dbconn;
/*
- * Put the connection into replication mode and listen on the special
- * relation telling what node daemon this connection belongs to.
+ * Put the connection into replication mode
*/
slon_mkquery(&query1,
- "select %s.setSessionRole('_%s', 'slon'); "
- "listen \"_%s_Node_%d\"; ",
- rtcfg_namespace, rtcfg_cluster_name,
- rtcfg_cluster_name, rtcfg_nodeid);
+ "select %s.setSessionRole('_%s', 'slon'); ",
+ rtcfg_namespace, rtcfg_cluster_name);
if (query_execute(node, local_dbconn, &query1) < 0)
slon_abort();
@@ -2500,12 +2497,11 @@
}
}
/*
- * Listen on the special relation telling what node daemon this connection
- * belongs to.
+ * Register this connection in sl_nodelock
*/
slon_mkquery(&query1,
- "listen \"_%s_Node_%d\"; ",
- rtcfg_cluster_name, rtcfg_nodeid);
+ "select %s.registerNodeConnection(%d); ",
+ rtcfg_namespace, rtcfg_nodeid);
if (query_execute(node, pro_dbconn, &query1) < 0)
{
slon_disconnectdb(pro_conn);
@@ -3897,8 +3893,8 @@
* Listen on the special relation telling our node relationship
*/
slon_mkquery(&query,
- "listen \"_%s_Node_%d\"; ",
- rtcfg_cluster_name, rtcfg_nodeid);
+ "select %s.registerNodeConnection(%d); ",
+ rtcfg_namespace, rtcfg_nodeid);
if (query_execute(node, provider->conn->dbconn, &query) < 0)
{
TERMINATE_QUERY_AND_ARCHIVE;
@@ -4522,19 +4518,6 @@
}
}
PQclear(res1);
-
- /*
- * Start listening on the special relation that will cause our local
- * connection to be killed when the provider node fails.
- */
- slon_mkquery(&query,
- "listen \"_%s_Node_%d\"; ",
- rtcfg_cluster_name, provider->no_id);
- if (query_execute(node, local_dbconn, &query) < 0)
- {
- dstring_free(&query);
- return 60;
- }
}
/*
@@ -4580,21 +4563,6 @@
}
PQclear(res1);
}
- for (provider = wd->provider_head; provider; provider = provider->next)
- {
- /*
- * Stop listening on the special relations that will cause our local
- * connection to be killed when the provider node fails.
- */
- slon_mkquery(&query,
- "unlisten \"_%s_Node_%d\"; ",
- rtcfg_cluster_name, provider->no_id);
- if (query_execute(node, local_dbconn, &query) < 0)
- {
- TERMINATE_QUERY_AND_ARCHIVE;
- return 60;
- }
- }
/*
* Get the nodes rowid sequence at that sync time just in case we are
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.22
retrieving revision 1.23
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.22 -r1.23
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -224,15 +224,16 @@
/*
* Listen on the connection for events and confirmations
+ * and register the node connection.
*/
slon_mkquery(&query1,
"listen \"_%s_Event\"; "
"listen \"_%s_Confirm\"; "
- "listen \"_%s_Node_%d\"; ",
+ "select %s.registerNodeConnection(%d); ",
rtcfg_cluster_name, rtcfg_cluster_name,
- rtcfg_cluster_name, rtcfg_nodeid);
+ rtcfg_namespace, rtcfg_nodeid);
res = PQexec(dbconn, dstring_data(&query1));
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR,
"remoteListenThread_%d: \"%s\" - %s",
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.32
retrieving revision 1.33
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.32 -r1.33
--- src/slon/local_listen.c
+++ src/slon/local_listen.c
@@ -69,10 +69,8 @@
* Listen for local events
*/
slon_mkquery(&query1,
- "select %s.cleanupListener(); "
"listen \"_%s_Event\"; "
"listen \"_%s_Restart\"; ",
- rtcfg_namespace,
rtcfg_cluster_name, rtcfg_cluster_name);
res = PQexec(dbconn, dstring_data(&query1));
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -90,11 +88,13 @@
* Check that we are the only slon daemon connected.
*/
slon_mkquery(&query1,
- "select true from \"pg_catalog\".pg_listener "
- " where relname = '_%s_Restart';",
- rtcfg_cluster_name, rtcfg_cluster_name);
+ "select %s.cleanupNodelock(); "
+ "insert into %s.sl_nodelock values ("
+ " %d, 0, \"pg_catalog\".pg_backend_pid()); ",
+ rtcfg_namespace, rtcfg_namespace,
+ rtcfg_nodeid);
res = PQexec(dbconn, dstring_data(&query1));
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
slon_log(SLON_FATAL,
"localListenThread: \"%s\" - %s",
@@ -103,14 +103,6 @@
dstring_free(&query1);
slon_abort();
}
- if (PQntuples(res) != 1)
- {
- slon_log(SLON_FATAL,
- "localListenThread: Another slon daemon is serving this node already\n");
- PQclear(res);
- dstring_free(&query1);
- slon_abort();
- }
PQclear(res);
/*
- Previous message: [Slony1-commit] By wieck: Use a new table sl_nodelock to guard against concurrent slon
- Next message: [Slony1-commit] By cbbrowne: Note (per Vivek Khera) that LOCK SET waits for old
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list