Tue Apr 3 14:55:06 PDT 2007
- Previous message: [Slony1-commit] slony1-engine RELEASE-1.2.10 configure
- Next message: [Slony1-commit] slony1-engine/src/slonik slonik.c
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv3668/src/slon
Modified Files:
Tag: REL_1_2_STABLE
remote_worker.c
Log Message:
Update to DDL handling - when a script is specified with "EXECUTE ONLY ON"
a specific node, slonik should invoke it only on that node.
Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.124.2.12
retrieving revision 1.124.2.13
diff -C2 -d -r1.124.2.12 -r1.124.2.13
*** remote_worker.c 6 Mar 2007 18:47:45 -0000 1.124.2.12
--- remote_worker.c 3 Apr 2007 21:55:03 -0000 1.124.2.13
***************
*** 269,272 ****
--- 269,275 ----
static void compress_actionseq(const char *ssy_actionseq, SlonDString * action_subquery);
+ static int process_ddl_script(SlonWorkMsg_event * event,SlonNode * node,
+ PGconn * local_dbconn, char * seqbuf );
+ static int check_set_subscriber(int set_id, int node_id,PGconn * local_dbconn);
/* ----------
***************
*** 1340,1438 ****
else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
{
! int ddl_setid = (int)strtol(event->ev_data1, NULL, 10);
! char *ddl_script = event->ev_data2;
! int ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10);
! int num_statements = -1, stmtno;
!
! PGresult *res;
! ExecStatusType rstat;
!
!
! slon_appendquery(&query1,
! "select %s.ddlScript_prepare_int(%d, %d); ",
! rtcfg_namespace,
! ddl_setid, ddl_only_on_node);
!
! if (query_execute(node, local_dbconn, &query1) < 0) {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %\n",
! node->no_id, ddl_setid, ddl_only_on_node);
! slon_retry();
! }
!
! num_statements = scan_for_statements (ddl_script);
! slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n",
! node->no_id, num_statements);
! if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n",
! node->no_id, num_statements);
! slon_retry();
! }
!
! for (stmtno=0; stmtno < num_statements; stmtno++) {
! int startpos, endpos;
! char *dest;
! if (stmtno == 0)
! startpos = 0;
! else
! startpos = STMTS[stmtno-1];
!
! endpos = STMTS[stmtno];
! dest = (char *) malloc (endpos - startpos + 1);
! if (dest == 0) {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n",
! node->no_id, endpos - startpos + 1);
! slon_retry();
! }
! strncpy(dest, ddl_script + startpos, endpos-startpos);
! dest[STMTS[stmtno]-startpos] = 0;
! slon_mkquery(&query1, dest);
! slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n",
! node->no_id, stmtno, dest);
! free(dest);
!
! res = PQexec(local_dbconn, dstring_data(&query1));
!
! if (PQresultStatus(res) != PGRES_COMMAND_OK &&
! PQresultStatus(res) != PGRES_TUPLES_OK &&
! PQresultStatus(res) != PGRES_EMPTY_QUERY)
! {
! rstat = PQresultStatus(res);
! slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat));
! dstring_free(&query1);
! slon_retry();
! }
! rstat = PQresultStatus(res);
! slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat));
! }
!
! slon_mkquery(&query1, "select %s.ddlScript_complete_int(%d, %d); ",
! rtcfg_namespace,
! ddl_setid,
! ddl_only_on_node);
!
! /* DDL_SCRIPT needs to be turned into a log shipping script */
! /* Note that the issue about parsing that mandates breaking
! up compound statements into
! individually-processed statements does not apply to log
! shipping as psql parses and processes each statement
! individually */
!
! if (archive_dir)
! {
! if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
! {
!
! if (archive_open(node, seqbuf) < 0)
! slon_retry();
! if (archive_tracking(node, rtcfg_namespace,
! ddl_setid, seqbuf, seqbuf,
! event->ev_timestamp_c) < 0)
! slon_retry();
! if (archive_append_str(node, ddl_script) < 0)
! slon_retry();
! if (archive_close(node) < 0)
! slon_retry();
! }
! }
}
else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
--- 1343,1347 ----
else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
{
! process_ddl_script(event,node,local_dbconn,seqbuf);
}
else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
***************
*** 6098,6099 ****
--- 6007,6181 ----
slon_log(SLON_DEBUG4, " compressed actionseq subquery... %s\n", dstring_data(action_subquery));
}
+
+
+ /**
+ *
+ * Process a ddl_script command.
+ */
+ static int process_ddl_script(SlonWorkMsg_event * event,SlonNode * node,
+ PGconn * local_dbconn,
+ char * seqbuf)
+ {
+ int ddl_setid = (int)strtol(event->ev_data1, NULL, 10);
+ char *ddl_script = event->ev_data2;
+ int ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10);
+ int num_statements = -1, stmtno;
+ int node_in_set;
+ int localNodeId;
+ PGresult *res;
+ ExecStatusType rstat;
+ SlonDString query1;
+
+
+
+ dstring_init(&query1);
+ /**
+ * Check to make sure this node is part of the set
+ */
+ slon_log(SLON_INFO, "Checking local node id\n");
+ localNodeId = db_getLocalNodeId(local_dbconn);
+ slon_log(SLON_INFO,"Found local node id\n");
+ node_in_set = check_set_subscriber(ddl_setid,localNodeId,local_dbconn);
+
+ if(!node_in_set) {
+ /**
+ *
+ * Node is not part of the set.
+ * Do not forward teh DDL to the node,
+ * nor should it be included in the log for log-shipping.
+ */
+ slon_log(SLON_INFO,"Not forwarding DDL to node %d for set %d\n",
+ node->no_id,ddl_setid);
+
+ }
+ else
+ {
+ slon_appendquery(&query1,
+ "select %s.ddlScript_prepare_int(%d, %d); ",
+ rtcfg_namespace,
+ ddl_setid, ddl_only_on_node);
+
+ if (query_execute(node, local_dbconn, &query1) < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %\n",
+ node->no_id, ddl_setid, ddl_only_on_node);
+ slon_retry();
+ }
+
+ num_statements = scan_for_statements (ddl_script);
+ slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n",
+ node->no_id, num_statements);
+ if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n",
+ node->no_id, num_statements);
+ slon_retry();
+ }
+
+ for (stmtno=0; stmtno < num_statements; stmtno++) {
+ int startpos, endpos;
+ char *dest;
+ if (stmtno == 0)
+ startpos = 0;
+ else
+ startpos = STMTS[stmtno-1];
+
+ endpos = STMTS[stmtno];
+ dest = (char *) malloc (endpos - startpos + 1);
+ if (dest == 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n",
+ node->no_id, endpos - startpos + 1);
+ slon_retry();
+ }
+ strncpy(dest, ddl_script + startpos, endpos-startpos);
+ dest[STMTS[stmtno]-startpos] = 0;
+ slon_mkquery(&query1, dest);
+ slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n",
+ node->no_id, stmtno, dest);
+ free(dest);
+
+ res = PQexec(local_dbconn, dstring_data(&query1));
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK &&
+ PQresultStatus(res) != PGRES_EMPTY_QUERY)
+ {
+ rstat = PQresultStatus(res);
+ slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat));
+ dstring_free(&query1);
+ slon_retry();
+ }
+ rstat = PQresultStatus(res);
+ slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat));
+ }
+
+ slon_mkquery(&query1, "select %s.ddlScript_complete_int(%d, %d); ",
+ rtcfg_namespace,
+ ddl_setid,
+ ddl_only_on_node);
+
+ if (query_execute(node, local_dbconn, &query1) < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL completion failed - set %d - only on node %\n",
+ node->no_id, ddl_setid, ddl_only_on_node);
+ slon_retry();
+ }
+
+ /* DDL_SCRIPT needs to be turned into a log shipping script */
+ /* Note that the issue about parsing that mandates breaking
+ up compound statements into
+ individually-processed statements does not apply to log
+ shipping as psql parses and processes each statement
+ individually */
+
+ if (archive_dir)
+ {
+ if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
+ {
+
+ if (archive_open(node, seqbuf) < 0)
+ slon_retry();
+ if (archive_tracking(node, rtcfg_namespace,
+ ddl_setid, seqbuf, seqbuf,
+ event->ev_timestamp_c) < 0)
+ slon_retry();
+ if (archive_append_str(node, ddl_script) < 0)
+ slon_retry();
+ if (archive_close(node) < 0)
+ slon_retry();
+ }
+ }
+ }/*else node a subscriber */
+
+ dstring_free(&query1);
+
+ }
+
+ /**
+ * Checks to see if the node specified is a member of the set.
+ *
+ */
+ static int check_set_subscriber(int set_id, int node_id,PGconn * local_dbconn)
+ {
+
+
+ SlonDString query1;
+ PGresult* res;
+ dstring_init(&query1);
+
+ slon_appendquery(&query1,"select 1 from %s.sl_subscribe WHERE sub_set=%d AND sub_receiver=%d for update"
+ ,rtcfg_namespace,set_id,node_id);
+ res = PQexec(local_dbconn,dstring_data(&query1));
+ if(PQresultStatus(res)!=PGRES_TUPLES_OK) {
+ slon_log(SLON_ERROR,"remoteWorkerThread_%d: DDL preperation can not check set membership"
+ ,node_id);
+ dstring_free(&query1);
+ slon_retry();
+ }
+ dstring_free(&query1);
+ if(PQntuples(res)==0) {
+ PQclear(res);
+ return 0;
+ }
+ PQclear(res);
+ return 1;
+
+
+ }
- Previous message: [Slony1-commit] slony1-engine RELEASE-1.2.10 configure
- Next message: [Slony1-commit] slony1-engine/src/slonik slonik.c
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list