diff --git a/clustertest/regression/testddl/init_schema.sql b/clustertest/regression/testddl/init_schema.sql
new file mode 100644
index ee95188..1e94075
*** a/clustertest/regression/testddl/init_schema.sql
--- b/clustertest/regression/testddl/init_schema.sql
*************** zone_id integer
*** 60,62 ****
--- 60,72 ----
  ALTER TABLE ONLY billing_discount
      ADD CONSTRAINT billing_discount_pkey PRIMARY KEY (billing_discount_id);
  
+ CREATE OR REPLACE FUNCTION insert_table1() returns trigger
+ as $$
+ declare
+ 
+ begin
+ insert into table1(data) values (NEW.data);
+ return NEW;
+ end;
+ $$
+ language plpgsql;
\ No newline at end of file
diff --git a/clustertest/regression/testddl/testddl.js b/clustertest/regression/testddl/testddl.js
new file mode 100644
index 4288f49..7ecf475
*** a/clustertest/regression/testddl/testddl.js
--- b/clustertest/regression/testddl/testddl.js
*************** function init_tables() {
*** 29,34 ****
--- 29,37 ----
  		+"set add table (id=4, set id=1, origin=1, fully qualified name = 'public.table4');\n"
  		+"set add table (id=5, set id=1, origin=1, fully qualified name = 'public.table5');\n"
  		+"set add table (id=6, set id=1, origin=1, fully qualified name = 'public.billing_discount');\n"
+ 		+ "set add sequence(id=1,set id=1, origin=1, fully qualified name= 'public.table1_id_seq');\n"
+ 		+ "set add sequence(id=2,set id=1, origin=1, fully qualified name= 'public.table5_id_seq');\n"
+ 
  	return script;
  }
  
*************** function individual_ddl(coordinator, nod
*** 121,126 ****
--- 124,156 ----
  	run_slonik('update ddl',coordinator,preamble,slonikScript);
  }
  
+ function trigger_function(coordinator) {
+ 	/**
+ 	 * We stop the slons because we want to make sure that a SYNC does not
+ 	 * happen in between the EXECUTE_SCRIPT and the next SYNC.
+ 	 */
+ 	terminate_slon(coordinator);
+ 	var sql = '';
+ 	for(var idx=0; idx < 1000; idx++) {
+ 		sql = sql + "insert into table5(data) values ('seqtest');\n";
+ 	}
+ 	var psql = coordinator.createPsqlCommand('db1',sql);
+ 	psql.run();
+ 	coordinator.join(psql);
+ 	var preamble = get_slonik_preamble();
+ 	var slonikScript = "EXECUTE SCRIPT( SQL='alter table table1 drop column seqed;create trigger table5_trigger "
+ 		+ " before INSERT on public.table5 for each row execute procedure "
+ 		+ " insert_table1();'"
+ 		+ ' ,EVENT NODE=1 );';
+ 	run_slonik('add trigger ddl',coordinator,preamble,slonikScript);
+ 	slonikScript = "EXECUTE SCRIPT( SQL='insert into table5(data) values (9);'"
+ 		+ ' ,EVENT NODE=1);';
+ 	run_slonik('add trigger ddl',coordinator,preamble,slonikScript);
+ 
+ 	launch_slon(coordinator);
+ 
+ }
+ 
  function inline_ddl(coordinator) {
  	premable = get_slonik_preamble();
  
*************** function do_test(coordinator) {
*** 166,171 ****
--- 196,206 ----
  	inline_ddl(coordinator);
  	wait_for_sync(coordinator);
+ 
+ 	trigger_function(coordinator);
+ 	wait_for_sync(coordinator);
+ 
  }
  
  function get_compare_queries() {
diff --git a/doc/adminguide/ddlchanges.sgml b/doc/adminguide/ddlchanges.sgml
new file mode 100644
index 549e576..21f9f34
*** a/doc/adminguide/ddlchanges.sgml
--- b/doc/adminguide/ddlchanges.sgml
*************** replica nodes. It is advisiable to not
*** 66,71 ****
--- 66,85 ----
  or updating rows to a table while a script changing that table (adding or
  deleting columns) is also running.
  
+ &slony1; 2.2.x and higher will replicate the SQL
+ of an EXECUTE SCRIPT as part of a SYNC. Scripts that perform an "ALTER TABLE"
+ to a replicated table will be replicated in the correct order with respect
+ to other concurrent activities on that table because of the exclusive lock
+ that the alter table received on the origin node. If your EXECUTE SCRIPT
+ does not obtain exclusive locks on all of the tables it uses then
+ you need to make sure that any transactions running concurrently with
+ the script are not making changes that can affect the results of the script.
+ For example, if your script does a nextval('some_replicated_seq') and
+ that sequence is being incremented by another transaction at the same time then
+ it is possible that the script sees a different value from the sequence when
+ it runs on a replica than it did on the origin.
+ 
+ 
  
  
  
diff --git a/src/backend/slony1_base.sql b/src/backend/slony1_base.sql
new file mode 100644
index 83ee96f..9be5397
*** a/src/backend/slony1_base.sql
--- b/src/backend/slony1_base.sql
*************** comment on column @NAMESPACE@.sl_compone
*** 728,731 ****
  -- ----------------------------------------------------------------------
  grant usage on schema @NAMESPACE@ to public;
  
- 
--- 728,730 ----
diff --git a/src/backend/slony1_funcs.c b/src/backend/slony1_funcs.c
new file mode 100644
index 0699cbc..0051ea3
*** a/src/backend/slony1_funcs.c
--- b/src/backend/slony1_funcs.c
***************
*** 42,47 ****
--- 42,48 ----
  #include "utils/memutils.h"
  #include "utils/hsearch.h"
  #include "utils/timestamp.h"
+ #include "utils/int8.h"
  #ifdef HAVE_GETACTIVESNAPSHOT
  #include "utils/snapmgr.h"
  #endif
*************** versionFunc(logApply)(PG_FUNCTION_ARGS)
*** 989,995 ****
  			char	   *ddl_script;
  			bool		localNodeFound = true;
  			Datum		script_insert_args[5];
! 			char		query[1024];
  
  			apply_num_script++;
  
--- 990,1003 ----
  			char	   *ddl_script;
  			bool		localNodeFound = true;
  			Datum		script_insert_args[5];
! 			Datum	   *nodeargs;
! 			bool	   *nodeargsnulls;
! 			int			nodeargsn;
! 			Datum	   *seqargs;
! 			bool	   *seqargsnulls;
! 			int			seqargsn;
! 			Datum		array_holder;
! 			Datum		delim_text;
  
  			apply_num_script++;
  
*************** versionFunc(logApply)(PG_FUNCTION_ARGS)
*** 1001,1029 ****
  									   SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
  			if (isnull)
  				elog(ERROR, "Slony-I: log_cmdargs is NULL");
- 
  			deconstruct_array(DatumGetArrayTypeP(dat),
  							  TEXTOID, -1, false, 'i',
! 							  &cmdargs, &cmdargsnulls, &cmdargsn);
! 
  			/*
  			 * The first element is the DDL statement itself.
  			 */
  			ddl_script = DatumGetCString(DirectFunctionCall1(
  												   textout, cmdargs[0]));
- 
  			/*
  			 * If there is an optional node ID list, check that we are in it.
  			 */
! 			if (cmdargsn > 1)
  			{
  				localNodeFound = false;
! 				for (i = 1; i < cmdargsn; i++)
  				{
  					int32		nodeId = DatumGetInt32(
  											DirectFunctionCall1(int4in,
! 							   DirectFunctionCall1(textout, cmdargs[i])));
! 
  					if (nodeId == cs->localNodeId)
  					{
  						localNodeFound = true;
--- 1009,1072 ----
  									   SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
  			if (isnull)
  				elog(ERROR, "Slony-I: log_cmdargs is NULL");
  			deconstruct_array(DatumGetArrayTypeP(dat),
  							  TEXTOID, -1, false, 'i',
! 							  &cmdargs, &cmdargsnulls, &cmdargsn);
! 
! 			nodeargs=NULL;
! 			nodeargsn=0;
! 			seqargs=NULL;
! 			seqargsn=0;
! 			if( cmdargsn >= 2 )
! 			{
! 				delim_text=DirectFunctionCall1(textin,CStringGetDatum(","));
! 				if ( (! cmdargsnulls[1]) )
! 				{
! 					char * astr=DatumGetCString(DirectFunctionCall1(textout,
! 																	cmdargs[1]));
! 
! 					if ( strcmp(astr,""))
! 					{
! 						array_holder = DirectFunctionCall2(text_to_array,cmdargs[1],
! 														   delim_text);
! 						deconstruct_array(DatumGetArrayTypeP(array_holder),
! 										  TEXTOID, -1, false, 'i',
! 										  &nodeargs, &nodeargsnulls, &nodeargsn);
! 					}
! 				}
! 			}
! 			if(cmdargsn >= 3)
! 			{
! 				if ( (! cmdargsnulls[2]) )
! 				{
! 					char * astr=DatumGetCString(DirectFunctionCall1(textout,
! 																	cmdargs[2]));
! 					if( strcmp(astr,"") )
! 					{
! 						array_holder = DirectFunctionCall2(text_to_array,cmdargs[2],
! 														   delim_text);
! 						deconstruct_array(DatumGetArrayTypeP(array_holder),
! 										  TEXTOID, -1, false, 'i',
! 										  &seqargs, &seqargsnulls, &seqargsn);
! 					}
! 				}
! 			}
  			/*
  			 * The first element is the DDL statement itself.
  			 */
  			ddl_script = DatumGetCString(DirectFunctionCall1(
  												   textout, cmdargs[0]));
  			/*
  			 * If there is an optional node ID list, check that we are in it.
  			 */
! 			if (nodeargsn > 0)
  			{
  				localNodeFound = false;
! 				for (i = 0; i < nodeargsn; i++)
  				{
  					int32		nodeId = DatumGetInt32(
  											DirectFunctionCall1(int4in,
! 							   DirectFunctionCall1(textout, nodeargs[i])));
  					if (nodeId == cs->localNodeId)
  					{
  						localNodeFound = true;
*************** versionFunc(logApply)(PG_FUNCTION_ARGS)
*** 1038,1043 ****
--- 1081,1133 ----
  			 */
  			if (localNodeFound)
  			{
+ 				void	   *plan;
+ 				char		query[1024];
+ 				Oid			argtypes[3];
+ 				argtypes[0] = INT4OID;
+ 				argtypes[1] = INT4OID;
+ 				argtypes[2] = INT8OID;
+ 
+ 				snprintf(query,sizeof(query),"select %s.sequenceSetValue($1," \
+ 						 "$2,NULL,$3); ",tg->tg_trigger->tgargs[0]);
+ 				plan = SPI_prepare(query,3,argtypes);
+ 				if ( plan == NULL )
+ 				{
+ 
+ 					elog(ERROR,"could not prepare plan to call sequenceSetValue");
+ 				}
+ 				/**
+ 				 * Before executing the DDL, update the sequences: seqargs (split from log_cmdargs[2]) holds flat (seq_id, origin, last_value) triples.
+ 				 */
+ 				if ( seqargsn > 0 )
+ 				{
+ 
+ 					for( i = 0; (i+2) < seqargsn; i=i+3 )
+ 					{
+ 						Datum call_args[3];
+ 						char call_nulls[3];
+ 						call_args[0] = DirectFunctionCall1(int4in,
+ 										   DirectFunctionCall1(textout,seqargs[i]));
+ 						call_args[1] = DirectFunctionCall1(int4in,
+ 										   DirectFunctionCall1(textout,seqargs[i+1]));
+ 						call_args[2] = DirectFunctionCall1(int8in,
+ 										   DirectFunctionCall1(textout,seqargs[i+2]));
+ 
+ 						call_nulls[0]=' ';
+ 						call_nulls[1]=' ';
+ 						call_nulls[2]=' ';
+ 
+ 						if ( SPI_execp(plan,call_args,call_nulls,0) < 0 )
+ 						{
+ 							elog(ERROR,"error executing sequenceSetValue plan");
+ 
+ 						}
+ 
+ 					}
+ 
+ 
+ 				}
+ 
  				sprintf(query,"set session_replication_role to local;");
  				if(SPI_exec(query,0) < 0)
  				{
*************** versionFunc(logApply)(PG_FUNCTION_ARGS)
*** 1606,1612 ****
  					MemoryContextSwitchTo(oldContext);
  					cacheEnt->typmod[i / 2] =
  						target_rel->rd_att->attrs[colnum - 1]->atttypmod;
- 
  					sprintf(applyQueryPos, "%s%s = $%d",
  							(i > 0) ? " AND " : "",
  							slon_quote_identifier(colname),
--- 1696,1701 ----
diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql
new file mode 100644
index 00aad22..dcbc9d9
*** a/src/backend/slony1_funcs.sql
--- b/src/backend/slony1_funcs.sql
*************** begin
*** 3366,3375 ****
  	execute 'select setval(''' || v_fqname ||
  			''', ' || p_last_value::text || ')';
  
! 	insert into @NAMESPACE@.sl_seqlog
  		(seql_seqid, seql_origin, seql_ev_seqno, seql_last_value)
  		values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value);
! 
  	return p_seq_id;
  end;
  $$ language plpgsql;
--- 3366,3376 ----
  	execute 'select setval(''' || v_fqname ||
  			''', ' || p_last_value::text || ')';
  
! 	if p_ev_seqno is not null then
! 		insert into @NAMESPACE@.sl_seqlog
  		(seql_seqid, seql_origin, seql_ev_seqno, seql_last_value)
  		values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value);
! 	end if;
  	return p_seq_id;
  end;
  $$ language plpgsql;
*************** declare
*** 3391,3400 ****
--- 3392,3404 ----
  	c_found_origin	boolean;
  	c_node			text;
  	c_cmdargs		text[];
+ 	c_nodeargs		text;
+ 	c_delim			text;
  begin
  	c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
  
  	c_cmdargs = array_append('{}'::text[], p_statement);
+ 	c_nodeargs = '';
  	if p_nodes is not null then
  		c_found_origin := 'f';
  		-- p_nodes list needs to consist of a list of nodes that exist
*************** begin
*** 3411,3418 ****
  			if c_local_node = (c_node::integer) then
  				c_found_origin := 't';
  			end if;
! 
! 			c_cmdargs = array_append(c_cmdargs, c_node);
  		end loop;
  
  		if not c_found_origin then
--- 3415,3425 ----
  			if c_local_node = (c_node::integer) then
  				c_found_origin := 't';
  			end if;
! 			if length(c_nodeargs)>0 then
! 				c_nodeargs = c_nodeargs ||','|| c_node;
! 			else
! 				c_nodeargs=c_node;
! 			end if;
  		end loop;
  
  		if not c_found_origin then
*************** begin
*** 3421,3435 ****
  				p_statement, p_nodes, c_local_node;
  		end if;
  	end if;
  
! 	execute p_statement;
! 
  	insert into @NAMESPACE@.sl_log_script
  			(log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
  		values
  			(c_local_node, pg_catalog.txid_current(),
  			nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs);
! 
  	return currval('@NAMESPACE@.sl_action_seq');
  end;
  $$ language plpgsql;
--- 3428,3451 ----
  				p_statement, p_nodes, c_local_node;
  		end if;
  	end if;
  
+ 	c_cmdargs = array_append(c_cmdargs,c_nodeargs);
+ 	c_delim=',';
+ 	c_cmdargs = array_append(c_cmdargs,
! 		(select @NAMESPACE@.string_agg( seq_id::text || c_delim
! 			|| c_local_node::text ||
! 			c_delim || seq_last_value::text)
! 		FROM (
! 			select seq_id,
! 			seq_last_value from @NAMESPACE@.sl_seqlastvalue
! 			where seq_origin = c_local_node) as FOO
! 		where @NAMESPACE@.seqtrack(seq_id,seq_last_value) is not NULL));
  	insert into @NAMESPACE@.sl_log_script
  			(log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
  		values
  			(c_local_node, pg_catalog.txid_current(),
  			nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs);
! 	execute p_statement;
  	return currval('@NAMESPACE@.sl_action_seq');
  end;
  $$ language plpgsql;
*************** begin
*** 6224,6226 ****
--- 6240,6275 ----
  	return v_seq_id;
  end
  $$ language plpgsql;
+ 
+ 
+ 
+ --
+ -- we create a function + aggregate for string_agg to aggregate strings
+ -- some versions of PG (ie prior to 9.0) don't support this
+ CREATE OR replace function @NAMESPACE@.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS
+ $BODY$
+ DECLARE
+ 	c_delim text;
+ BEGIN
+ 	c_delim = ',';
+ 	IF (txt_before IS NULL or txt_before='') THEN
+ 		RETURN txt_new;
+ 	END IF;
+ 	RETURN txt_before || coalesce(c_delim || txt_new, '');
+ END;
+ $BODY$
+ LANGUAGE plpgsql;
+ comment on function @NAMESPACE@.agg_text_sum(text,text) is
+ 'An accumulator function used by the slony string_agg function to
+ aggregate rows into a string';
+ --
+ -- create a string_agg function in the slony schema.
+ -- PG 8.3 does not have this function so we make our own
+ -- when slony stops supporting PG 8.3 we can switch to
+ -- the PG 9.0+ provided version of string_agg
+ --
+ CREATE AGGREGATE @NAMESPACE@.string_agg(text) (
+ SFUNC=@NAMESPACE@.agg_text_sum,
+ STYPE=text,
+ INITCOND=''
+ );
\ No newline at end of file