diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql index 7588f00..9e6ebf6 100644 --- a/src/backend/slony1_funcs.sql +++ b/src/backend/slony1_funcs.sql @@ -809,6 +809,11 @@ declare v_old_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check if the node exists -- ---- select * into v_old_row @@ -855,6 +860,11 @@ declare v_node_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that we are the node to activate and that we are -- currently disabled. -- ---- @@ -898,6 +908,11 @@ declare v_sub_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that the node is inactive -- ---- select * into v_node_row @@ -1002,6 +1017,11 @@ declare v_idx integer; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that this got called on a different node -- ---- if @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@') = ANY (p_no_ids) then @@ -1067,6 +1087,11 @@ declare v_tab_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- If the dropped node is a remote node, clean the configuration -- from all traces for it. -- ---- @@ -1130,6 +1155,11 @@ declare v_n int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- All consistency checks first if p_is_candidate then @@ -1211,6 +1241,11 @@ declare v_restart_required boolean; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + v_restart_required:=false; -- -- any nodes other than the backup receiving @@ -1289,6 +1324,11 @@ declare v_row record; v_new_event bigint; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + select * into v_row from @NAMESPACE@.sl_event where ev_origin = p_failed_node @@ -1331,8 +1371,13 @@ as $$ declare begin - perform @NAMESPACE@.failoverSet_int(p_failed_node, - p_backup_node,p_seq_no); + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + perform @NAMESPACE@.failoverSet_int(p_failed_node, + p_backup_node,p_seq_no); notify "_@CLUSTERNAME@_Restart"; return 0; @@ -1352,6 +1397,11 @@ declare v_last_sync int8; v_set int4; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + SELECT max(ev_seqno) into v_last_sync FROM @NAMESPACE@.sl_event where ev_origin=p_failed_node; if v_last_sync > p_last_seqno then @@ -1529,41 +1579,42 @@ as $$ declare v_dummy int4; begin - select 1 into v_dummy from @NAMESPACE@.sl_node where - no_id = p_no_id; + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + update @NAMESPACE@.sl_node set + no_active = np.no_active, + no_comment = np.no_comment, + no_failed = np.no_failed + from @NAMESPACE@.sl_node np + where np.no_id = p_no_provider + and sl_node.no_id = p_no_id; if not found then insert into @NAMESPACE@.sl_node (no_id, no_active, no_comment,no_failed) - select p_no_id, no_active, p_no_comment,no_failed + select p_no_id, no_active, p_no_comment, no_failed from @NAMESPACE@.sl_node where no_id = p_no_provider; - else - update @NAMESPACE@.sl_node set - no_active= np.no_active - ,no_comment=np.no_comment - ,no_failed=np.no_failed - from @NAMESPACE@.sl_node np - where np.no_id=p_no_provider - and sl_node.no_id=p_no_id; - end if; - select 1 into v_dummy from @NAMESPACE@.sl_path where - pa_server=p_no_provider and pa_client=p_no_id; - if not found then - insert into @NAMESPACE@.sl_path - (pa_server, pa_client, pa_conninfo, pa_connretry) - select pa_server, p_no_id, '', pa_connretry - from @NAMESPACE@.sl_path - where pa_client = p_no_provider; - end if; - select 1 into v_dummy from @NAMESPACE@.sl_path where - pa_server=p_no_id and pa_client=p_no_provider; - if not found then - insert into @NAMESPACE@.sl_path - (pa_server, pa_client, pa_conninfo, pa_connretry) - select p_no_id, pa_client, '', pa_connretry - from @NAMESPACE@.sl_path - where pa_server = p_no_provider; end if; + + insert into @NAMESPACE@.sl_path + (pa_server, pa_client, pa_conninfo, pa_connretry) + select pa_server, p_no_id, '', pa_connretry + from @NAMESPACE@.sl_path + where pa_client = p_no_provider + and (pa_server, p_no_id) not in (select pa_server, pa_client + from @NAMESPACE@.sl_path); + + insert into @NAMESPACE@.sl_path + (pa_server, pa_client, pa_conninfo, pa_connretry) + select p_no_id, pa_client, '', pa_connretry + from @NAMESPACE@.sl_path + where pa_server = p_no_provider + and (p_no_id, pa_client) not in (select pa_server, pa_client + from @NAMESPACE@.sl_path); + insert into @NAMESPACE@.sl_subscribe (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) select sub_set, sub_provider, p_no_id, sub_forward, sub_active @@ -1596,6 +1647,11 @@ as $$ declare v_row record; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + perform "pg_catalog".setval('@NAMESPACE@.sl_local_node_id', p_no_id); perform @NAMESPACE@.resetSession(); for v_row in select sub_set from @NAMESPACE@.sl_subscribe @@ -1662,6 +1718,11 @@ declare v_dummy int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check if the path already exists -- ---- select 1 into v_dummy @@ -1723,6 +1784,11 @@ declare v_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- There should be no existing subscriptions. Auto unsubscribing -- is considered too dangerous. -- ---- @@ -1774,6 +1840,11 @@ returns int4 as $$ begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Remove any dangling sl_listen entries with the server -- as provider and the client as receiver. This must have -- been cleared out before, but obviously was not. @@ -1837,6 +1908,11 @@ as $$ declare v_exists int4; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + select 1 into v_exists from @NAMESPACE@.sl_listen where li_origin = p_li_origin @@ -1909,6 +1985,11 @@ create or replace function @NAMESPACE@.dropListen_int (p_li_origin int4, p_li_pr returns int4 as $$ begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + delete from @NAMESPACE@.sl_listen where li_origin = p_li_origin and li_provider = p_li_provider @@ -1938,6 +2019,11 @@ as $$ declare v_local_node_id int4; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + v_local_node_id := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@'); insert into @NAMESPACE@.sl_set @@ -1962,6 +2048,11 @@ as $$ declare v_dummy int4; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + select 1 into v_dummy from @NAMESPACE@.sl_set where set_id = p_set_id @@ -2008,6 +2099,11 @@ declare v_tab_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that the set exists and that we are the origin -- and that it is not already locked. -- ---- @@ -2077,6 +2173,11 @@ declare v_tab_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that the set exists and that we are the origin -- and that it is not already locked. -- ---- @@ -2141,6 +2242,11 @@ declare v_lv_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that the set is locked and that this locking -- happened long enough ago. -- ---- @@ -2233,6 +2339,11 @@ declare v_last_sync int8; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Get our local node ID -- ---- v_local_node_id := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@'); @@ -2406,6 +2517,11 @@ declare v_origin int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that the set exists and originates here -- ---- select set_origin into v_origin from @NAMESPACE@.sl_set @@ -2441,6 +2557,11 @@ declare v_tab_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Restore all tables original triggers and rules and remove -- our replication stuff. -- ---- @@ -2492,6 +2613,11 @@ declare in_progress boolean; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that both sets exist and originate here -- ---- if p_set_id = p_add_id then @@ -2596,6 +2722,11 @@ create or replace function @NAMESPACE@.mergeSet_int (p_set_id int4, p_add_id int returns int4 as $$ begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + update @NAMESPACE@.sl_sequence set seq_set = p_set_id where seq_set = p_add_id; @@ -2627,6 +2758,11 @@ declare v_set_origin int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that we are the origin of the set -- ---- select set_origin into v_set_origin @@ -2683,6 +2819,11 @@ declare v_prec record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- For sets with a remote origin, check that we are subscribed -- to that set. Otherwise we ignore the table because it might -- not even exist in our database. @@ -2790,6 +2931,11 @@ declare v_set_id int4; v_set_origin int4; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + -- ---- -- Determine the set_id -- ---- @@ -2843,6 +2989,11 @@ declare v_sub_provider int4; v_tab_reloid oid; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + -- ---- -- Determine the set_id -- ---- @@ -2903,6 +3054,11 @@ declare v_set_origin int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that we are the origin of the set -- ---- select set_origin into v_set_origin @@ -2956,6 +3112,11 @@ declare v_sync_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- For sets with a remote origin, check that we are subscribed -- to that set. Otherwise we ignore the sequence because it might -- not even exist in our database. @@ -3048,6 +3209,11 @@ declare v_set_origin int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Determine set id for this sequence -- ---- select seq_set into v_set_id from @NAMESPACE@.sl_sequence where seq_id = p_seq_id; @@ -3103,6 +3269,11 @@ declare v_sync_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Determine set id for this sequence -- ---- select seq_set into v_set_id from @NAMESPACE@.sl_sequence where seq_id = p_seq_id; @@ -3167,6 +3338,11 @@ declare v_origin int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Get the tables current set -- ---- select tab_set into v_old_set_id from @NAMESPACE@.sl_table @@ -3250,6 +3426,11 @@ returns int4 as $$ begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Move the table to the new set -- ---- update @NAMESPACE@.sl_table @@ -3276,6 +3457,11 @@ declare v_origin int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Get the sequences current set -- ---- select seq_set into v_old_set_id from @NAMESPACE@.sl_sequence @@ -3358,6 +3544,11 @@ returns int4 as $$ begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Move the sequence to the new set -- ---- update @NAMESPACE@.sl_sequence @@ -3382,7 +3573,6 @@ declare v_fqname text; v_found integer; begin - -- ---- -- Get the sequences fully qualified name -- ---- @@ -3823,6 +4013,11 @@ declare v_missing_sets text; v_ev_seqno bigint; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + -- -- Check that the receiver exists -- @@ -3913,6 +4108,11 @@ declare v_ev_seqno2 int8; v_rec record; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + -- -- Check that the receiver exists -- @@ -4055,6 +4255,11 @@ declare v_seq_id bigint; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Lookup the set origin -- ---- select set_origin into v_set_origin @@ -4189,6 +4394,11 @@ declare v_tab_row record; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- Check that this is called on the receiver node -- ---- if p_sub_receiver != @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@') then @@ -4282,6 +4492,11 @@ as $$ declare begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- All the real work is done before event generation on the -- subscriber. -- ---- @@ -4344,6 +4559,11 @@ declare v_n int4; begin -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + + -- ---- -- The real work is done in the replication engine. All -- we have to do here is remembering that it happened. -- ---- @@ -4748,6 +4968,11 @@ as $$ declare v_row record; begin + -- ---- + -- Grab the central configuration lock + -- ---- + lock table @NAMESPACE@.sl_config_lock; + -- First remove the entire configuration delete from @NAMESPACE@.sl_listen; diff --git a/src/slon/remote_worker.c b/src/slon/remote_worker.c index 71f1b3b..cafd083 100644 --- a/src/slon/remote_worker.c +++ b/src/slon/remote_worker.c @@ -246,7 +246,7 @@ static void monitor_subscriber_query(PerfMon * pm); static void monitor_subscriber_iud(PerfMon * pm); static void adjust_provider_info(SlonNode * node, - WorkerGroupData * wd, int cleanup, int event_provider); + WorkerGroupData * wd, int cleanup); static int query_execute(SlonNode * node, PGconn *dbconn, SlonDString * dsp); static void query_append_event(SlonDString * dsp, @@ -391,7 +391,7 @@ remoteWorkerThread_main(void *cdata) if (curr_config != rtcfg_seq_get()) { - adjust_provider_info(node, wd, false, -1); + adjust_provider_info(node, wd, false); curr_config = rtcfg_seq_get(); /* @@ -1530,7 +1530,7 @@ remoteWorkerThread_main(void *cdata) * Thread exit time has arrived. Disconnect from all data providers and * free memory */ - adjust_provider_info(node, wd, true, -1); + adjust_provider_info(node, wd, true); slon_disconnectdb(local_conn); dstring_free(&query1); @@ -1555,8 +1555,7 @@ remoteWorkerThread_main(void *cdata) * ---------- */ static void -adjust_provider_info(SlonNode * node, WorkerGroupData * wd, int cleanup, - int event_provider) +adjust_provider_info(SlonNode * node, WorkerGroupData * wd, int cleanup) { ProviderInfo *provider; ProviderInfo *provnext; @@ -1757,45 +1756,6 @@ adjust_provider_info(SlonNode * node, WorkerGroupData * wd, int cleanup, provider->pa_conninfo = strdup(rtcfg_node->pa_conninfo); } } - - /* - * Step 4. - * - * If we don't have ANY provider at this point, fall back - * on the node that we got this event from. - */ - if (event_provider >= 0 && wd->provider_head == NULL) - { - /* - * No provider entry found. Create a new one. - */ - provider = (ProviderInfo *) - malloc(sizeof(ProviderInfo)); - memset(provider, 0, sizeof(ProviderInfo)); - provider->no_id = event_provider; - provider->wd = wd; - - dstring_init(&provider->helper_query); - - /* - * Add the provider to our work group - */ - DLLIST_ADD_TAIL(wd->provider_head, wd->provider_tail, - provider); - - /* - * Copy the runtime configurations conninfo into the provider - * info. - */ - rtcfg_node = rtcfg_findNode(provider->no_id); - if (rtcfg_node != NULL) - { - provider->pa_connretry = rtcfg_node->pa_connretry; - if (rtcfg_node->pa_conninfo != NULL) - provider->pa_conninfo = - strdup(rtcfg_node->pa_conninfo); - } - } } @@ -3630,7 +3590,6 @@ sync_event(SlonNode * node, SlonConn * local_conn, SlonDString lsquery; SlonDString *provider_query; SlonDString actionseq_subquery; - SlonSet * set_ptr = NULL; int actionlist_len; int64 min_ssy_seqno; @@ -3663,40 +3622,23 @@ sync_event(SlonNode * node, SlonConn * local_conn, } } - /* - * Make sure that we have the event provider in our provider list. - */ - for (provider = wd->provider_head; provider; provider = provider->next) - { - if (provider->no_id == event->event_provider) - break; - } - if (provider == NULL) - { - rtcfg_lock(); - /** - * is this remote_worker a set origin? - * if not we can ignore the SYNC event. - */ - for(set_ptr = rtcfg_set_list_head ; - set_ptr != NULL; set_ptr = set_ptr->next) - - { - if ( set_ptr->set_origin == node->no_id && set_ptr->sub_active) - break; - } - if ( set_ptr != NULL && event->event_provider != node->no_id && set_ptr->sub_active ) - { - //adjust_provider_info(node, wd, false, event->event_provider); - slon_log(SLON_ERROR,"remoteWorkerThread_%d: event provider %d is not in the provider list\n", - node->no_id,event->event_provider); - rtcfg_unlock(); - slon_retry(); - } - rtcfg_unlock(); - } - + * If the provider list is empty, there are no sets from this + * origin that are replicated. + */ + if (wd->provider_head == NULL) + { + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT + " not subscribed to any sets from this origin\n", + node->no_id, event->ev_seqno); + if (archive_dir) + { + rc = archive_close(node); + if (rc < 0) + slon_retry(); + } + return 0; + } /* * Establish all required data provider connections