diff --git a/clustertest/disorder/tests/BasicTest.js b/clustertest/disorder/tests/BasicTest.js index a678732..bc525ba 100644 *** a/clustertest/disorder/tests/BasicTest.js --- b/clustertest/disorder/tests/BasicTest.js *************** function BasicTest(coordinator, results) *** 10,15 **** --- 10,20 ---- this.tableIdCounter=1; this.sequenceIdCounter=1; this.currentOrigin='db1'; + this.compareQueryList = [ + ['SELECT c_id,c_name,c_total_orders,c_total_value FROM disorder.do_customer order by c_id','c_id'] + ,['SELECT i_id,i_name,i_price,i_in_production FROM disorder.do_item order by i_id','i_id'] + ,['SELECT ii_id, ii_in_stock,ii_reserved,ii_total_sold FROM disorder.do_inventory order by ii_id','ii_id'] + ]; } /** *************** BasicTest.prototype.dropDb = function(db *** 367,373 **** * */ BasicTest.prototype.getSyncWaitTime = function() { ! return 60; } /** --- 372,378 ---- * */ BasicTest.prototype.getSyncWaitTime = function() { ! return 3*60; } /** *************** BasicTest.prototype.seedData = function( *** 603,613 **** BasicTest.prototype.compareDb=function(lhs_db, rhs_db) { //Compare the results. this.coordinator.log("BasicTest.prototype.compareDb ["+lhs_db + ","+rhs_db + "] - begin"); ! var queryList = [ ! ['SELECT c_id,c_name,c_total_orders,c_total_value FROM disorder.do_customer order by c_id','c_id'] ! ,['SELECT i_id,i_name,i_price,i_in_production FROM disorder.do_item order by i_id','i_id'] ! ,['SELECT ii_id, ii_in_stock,ii_reserved,ii_total_sold FROM disorder.do_inventory order by ii_id','ii_id'] ! ]; compareFinished = { onEvent : function(object, event) { --- 608,614 ---- BasicTest.prototype.compareDb=function(lhs_db, rhs_db) { //Compare the results. this.coordinator.log("BasicTest.prototype.compareDb ["+lhs_db + ","+rhs_db + "] - begin"); ! compareFinished = { onEvent : function(object, event) { *************** BasicTest.prototype.compareDb=function(l *** 622,630 **** ! for(var idx=0; idx < queryList.length; idx++) { ! 
var compareOp = this.coordinator.createCompareOperation(lhs_db,rhs_db,queryList[idx][0], ! queryList[idx][1]); this.coordinator.registerObserver(compareOp, Packages.info.slony.clustertest.testcoordinator.Coordinator.EVENT_FINISHED, new Packages.info.slony.clustertest.testcoordinator.script.ExecutionObserver(compareFinished)); --- 623,631 ---- ! for(var idx=0; idx < this.compareQueryList.length; idx++) { ! var compareOp = this.coordinator.createCompareOperation(lhs_db,rhs_db,this.compareQueryList[idx][0], ! this.compareQueryList[idx][1]); this.coordinator.registerObserver(compareOp, Packages.info.slony.clustertest.testcoordinator.Coordinator.EVENT_FINISHED, new Packages.info.slony.clustertest.testcoordinator.script.ExecutionObserver(compareFinished)); *************** BasicTest.prototype.populateReviewTable= *** 780,783 **** connection.close(); } this.coordinator.log('populating review table on ' + node_id + " - complete"); ! } \ No newline at end of file --- 781,810 ---- connection.close(); } this.coordinator.log('populating review table on ' + node_id + " - complete"); ! } ! ! BasicTest.prototype.updateReviewTable=function(node_id,text) { ! this.coordinator.log('updating review table ' + node_id); ! var connection=this.coordinator.createJdbcConnection('db' + node_id); ! var stat = connection.createStatement(); ! try { ! stat.execute("update disorder.do_item_review set comments=E'" + ! text + "';"); ! var count=stat.getUpdateCount(); ! this.testResults.assertCheck('items updated',count>0,true); ! if(count==0) { ! exit(-1); ! } ! ! } ! catch(error) { ! this.coordinator.log('error updating the review table:' + ! error.getMessage()); ! this.testResults.assertCheck('review update failed',true,false); ! ! } ! finally { ! stat.close(); ! connection.close(); ! } ! 
this.coordinator.log('updating review table on ' + node_id + " - complete");} \ No newline at end of file diff --git a/clustertest/disorder/tests/MultinodeFailover.js b/clustertest/disorder/tests/MultinodeFailover.js index ...164eb22 . *** a/clustertest/disorder/tests/MultinodeFailover.js --- b/clustertest/disorder/tests/MultinodeFailover.js *************** *** 0 **** --- 1,142 ---- + + + coordinator.includeFile('disorder/tests/FailNodeTest.js'); + + MultinodeFailover = function(coordinator, testResults) { + Failover.call(this, coordinator, testResults); + this.testDescription='Test the FAILOVER command. This test will try FAILOVER' + +' with multiple nodes failing'; + this.compareQueryList.push(['select i_id,comments from disorder.do_item_review order by i_id','i_id']); + + } + MultinodeFailover.prototype = new Failover(); + MultinodeFailover.prototype.constructor = MultinodeFailover; + + MultinodeFailover.prototype.runTest = function() { + this.coordinator.log("MultinodeFailover.prototype.runTest - begin"); + this.testResults.newGroup("Multinode Fail Over Test"); + this.setupReplication(); + this.addCompletePaths(); + /** + * Start the slons. + */ + this.slonArray = []; + for ( var idx = 1; idx <= this.getNodeCount(); idx++) { + this.slonArray[idx - 1] = this.coordinator.createSlonLauncher('db' + idx); + this.slonArray[idx - 1].run(); + } + this.addCompletePaths(); + /** + * Add some tables to replication. + * + */ + this.addTables(); + + /** + * Subscribe the first node. + */ + this.subscribeSet(1,1, 1, [ 2, 3 ]); + this.subscribeSet(1,1, 3, [ 4, 5 ]); + this.slonikSync(1,1); + this.createSecondSet(2); + this.subscribeSet(2,2,2,[3,4,5]); + this.slonikSync(2,2); + var load = this.generateLoad(); + java.lang.Thread.sleep(10*1000); + this.slonikSync(1,1); + this.populateReviewTable(2); + /** + * make sure the _review data makes it to + * all slaves, then let some SYNC events get + * genereated. Next we FAILOVER. 
+ */ + this.slonikSync(2,2); + java.lang.Thread.sleep(10*1000); + this.failover(1,3,2,3); + load.stop(); + this.coordinator.join(load); + /** + * rebuild the nodes. + */ + this.dropTwoNodes(1,2,3); + this.slonikSync(1,3); + this.compareDb('db3','db4'); + this.compareDb('db3','db5'); + this.reAddNode(1,3,3); + + /** + * perform some updates on node3 to the review table + */ + this.updateReviewTable(3,'From node 3'); + this.moveSet(1,3,1); + this.reAddNode(2,1,1); + this.addCompletePaths(); + this.subscribeSet(2,3,3,[2]); + this.moveSet(2,3,2); + + /** + * generate some load (node1) and + * reviews on node2. Let multiple txn snapshots be + * generated. + */ + load=this.generateLoad(); + for(var idx=0; idx < 20; idx++) + { + this.updateReviewTable(2,'From node 2.' + idx); + java.lang.Thread.sleep(1000); + } + /** + * failover. Node 1=>3, node2=>4 + */ + this.failover(1,3,2,4); + load.stop(); + this.coordinator.join(load); + this.dropTwoNodes(1,2,3); + this.reAddNode(1,3,3); + this.reAddNode(2,3,3); + this.addCompletePaths(); + this.subscribeSet(1,3,3,[1,2]); + this.subscribeSet(2,4,4,[2,1]); + this.slonikSync(1,1); + this.moveSet(1,3,1); + this.moveSet(2,4,2); + + + this.slonikSync(1,1); + this.slonikSync(2,2); + for ( var idx = 1; idx <= this.getNodeCount(); idx++) { + this.slonArray[idx - 1].stop(); + this.coordinator.join(this.slonArray[idx - 1]); + } + this.compareDb('db1','db3'); + this.compareDb('db2','db3'); + this.compareDb('db3','db4'); + this.compareDb('db3','db5'); + + } + + MultinodeFailover.prototype.failover=function(originA,backupA,originB,backupB) + { + var slonikPreamble = this.getSlonikPreamble(); + var slonikScript = 'echo \'MultinodeFailover.prototype.failover\';\n'; + slonikScript += 'FAILOVER( node=(id=' + originA + ',backup node=' + backupA +')' + + ', node=(id=' + originB + ',backup node=' + backupB + '));\n'; + var slonik=this.coordinator.createSlonik('failover',slonikPreamble,slonikScript); + slonik.run(); + this.coordinator.join(slonik); + 
this.testResults.assertCheck('failover passes',slonik.getReturnCode(),0); + + } + + MultinodeFailover.prototype.dropTwoNodes=function(node1,node2,event_node) + { + var slonikPreamble = this.getSlonikPreamble(); + var slonikScript = 'echo \'MultinodeFailover.prototype.dropTwoNodes\';\n'; + slonikScript+= 'drop node(id=\'' + node1 + ',' + node2 + '\',event node = ' + event_node + ');\nuninstall node(id='+node1+');\nuninstall node(id='+node2+');\n' + + var slonik=this.coordinator.createSlonik('drop node',slonikPreamble,slonikScript); + slonik.run(); + this.coordinator.join(slonik); + this.testResults.assertCheck('drop 2 nodes passes',slonik.getReturnCode(),0); + + } diff --git a/clustertest/disorder/tests/disorder_tests.js b/clustertest/disorder/tests/disorder_tests.js index bf3449f..20a2246 100644 *** a/clustertest/disorder/tests/disorder_tests.js --- b/clustertest/disorder/tests/disorder_tests.js *************** coordinator.includeFile('disorder/tests/ *** 25,30 **** --- 25,31 ---- coordinator.includeFile('disorder/tests/MergeSet.js'); coordinator.includeFile('disorder/tests/BulkAddingTest.js'); coordinator.includeFile('disorder/tests/WaitForTest.js'); + coordinator.includeFile('disorder/tests/MultinodeFailover.js'); var tests = [new EmptySet(coordinator,results) ,new OmitCopy(coordinator,results) *************** var tests = *** 50,62 **** ,new MergeSet(coordinator,results) ,new BulkAddingTest(coordinator,results) ,new WaitForTest(coordinator,results) //Below tests are known to fail. ,new UnsubscribeBeforeEnable(coordinator,results) ,new DropSet(coordinator,results) //fails bug 133 ,new CleanupTest(coordinator,results) //cleanup_interval does not (yet) do what the test wants ]; ! 
//tests=[new WaitForTest(coordinator,results)]; var basicTest = new BasicTest(coordinator,results); --- 51,64 ---- ,new MergeSet(coordinator,results) ,new BulkAddingTest(coordinator,results) ,new WaitForTest(coordinator,results) + ,new MultinodeFailover(coordinator,results) //Below tests are known to fail. ,new UnsubscribeBeforeEnable(coordinator,results) ,new DropSet(coordinator,results) //fails bug 133 ,new CleanupTest(coordinator,results) //cleanup_interval does not (yet) do what the test wants ]; ! //tests=[new MultinodeFailover(coordinator,results)]; var basicTest = new BasicTest(coordinator,results); diff --git a/doc/adminguide/slonik_ref.sgml b/doc/adminguide/slonik_ref.sgml index 39d7fb9..d676093 100644 *** a/doc/adminguide/slonik_ref.sgml --- b/doc/adminguide/slonik_ref.sgml *************** INIT CLUSTER ( *** 652,658 **** ID = ival ! Node ID of the node to remove. EVENT NODE = ival Node ID of the node to generate the event. --- 652,660 ---- ID = ival ! Node ID of the node to remove. This can be ! either a single node id or a comma separated list of nodes ! EVENT NODE = ival Node ID of the node to generate the event. *************** INIT CLUSTER ( *** 670,675 **** --- 672,678 ---- Example DROP NODE ( ID = 2, EVENT NODE = 1 ); + DROP NODE (ID='3,4,5', EVENT NODE=1); Locking Behaviour *************** INIT CLUSTER ( *** 704,709 **** --- 707,714 ---- Version Information This command was introduced in &slony1; 1.0 In version 2.0, the default value for EVENT NODE was removed, so a node must be specified. + In version 2.2 support for dropping multiple nodes in a single + command was added *************** MOVE SET ( *** 2657,2662 **** --- 2662,2673 ---- configuration with . + + If multiple set origin nodes have failed then you should tell FAILOVER + about all of them at once. This is done by passing a list of + NODE=(ID=val,BACKUP NODE=val), NODE=(ID=val2, BACKUP NODE=val2) to failover. 
+ + ID = ival ID of the failed node *************** FAILOVER ( *** 2678,2685 **** ID = 1, BACKUP NODE = 2 ); ! ! Locking Behaviour Exclusive locks on each replicated table will be taken out --- 2689,2703 ---- ID = 1, BACKUP NODE = 2 ); ! ! #example of multiple nodes ! FAILOVER( ! NODE=(ID=1, BACKUP NODE=2), ! NODE=(ID=3, BACKUP NODE=4) ! ); ! ! ! Locking Behaviour Exclusive locks on each replicated table will be taken out *************** FAILOVER ( *** 2697,2709 **** linkend="stmtmoveset"> instead, as that does not abandon the failed node. ! ! If there are many nodes in a cluster, and failover includes ! dropping out additional nodes (e.g. when it ! is necessary to treat all nodes at a site ! including an origin as well as subscribers as failed), it is ! necessary to carefully sequence the actions. ! --- 2715,2725 ---- linkend="stmtmoveset"> instead, as that does not abandon the failed node. ! If a second failure occurs in the middle of a FAILOVER ! operation then recovery might be complicated. ! ! ! *************** FAILOVER ( *** 2716,2721 **** --- 2732,2739 ---- Version Information This command was introduced in &slony1; 1.0 In version 2.0, the default BACKUP NODE value of 1 was removed, so it is mandatory to provide a value for this parameter. + In version 2.1 support was added for passing multiple nodes to + a single failover command diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql index ea04ec1..4627dbf 100644 *** a/src/backend/slony1_funcs.sql --- b/src/backend/slony1_funcs.sql *************** begin *** 1228,1233 **** --- 1228,1239 ---- end if; end loop; + --blank the paths for the failed node. 
+ --this ensures that *this* node won't be pulling + --data from the failed node (if the failed node can be accessed) + update @NAMESPACE@.sl_path set pa_conninfo='' WHERE + pa_server=p_failed_node; + -- Rewrite sl_listen table perform @NAMESPACE@.RebuildListenEntries(); diff --git a/src/slonik/parser.y b/src/slonik/parser.y index 4852de7..3049126 100644 *** a/src/slonik/parser.y --- b/src/slonik/parser.y *************** static int assign_options(statement_opti *** 57,62 **** --- 57,63 ---- option_list *opt_list; SlonikAdmInfo *adm_info; SlonikStmt *statement; + struct failed_node_entry * failed_node_entry; } %type id *************** static int assign_options(statement_opti *** 119,124 **** --- 120,126 ---- %type option_item_id %type option_item_literal %type option_item_yn + %type fail_node_list /* *************** stmt_drop_node : lno K_DROP K_NODE opti *** 617,622 **** --- 619,625 ---- { SlonikStmt_drop_node *new; statement_option opt[] = { + STMT_OPTION_STR( O_ID, NULL ), STMT_OPTION_INT( O_ID, -1 ), STMT_OPTION_INT( O_EVENT_NODE, -1 ), STMT_OPTION_END *************** stmt_drop_node : lno K_DROP K_NODE opti *** 631,638 **** if (assign_options(opt, $4) == 0) { ! new->no_id = opt[0].ival; ! new->ev_origin = opt[1].ival; } else parser_errors++; --- 634,670 ---- if (assign_options(opt, $4) == 0) { ! if(opt[0].ival > -1 ) ! { ! new->no_id_list=malloc(sizeof(int)*2); ! new->no_id_list[0]=opt[1].ival; ! new->no_id_list[1]=-1; ! } ! else ! { ! char * token; ! char * saveptr=NULL; ! int cnt; ! char * option_copy=strdup(opt[0].str); ! for(cnt=0,token=strtok_r(option_copy,",", ! &saveptr); ! token != NULL; cnt++, ! token=strtok_r(NULL,",",&saveptr)); ! free(option_copy); ! new->no_id_list=malloc(sizeof(int)*(cnt+1)); ! cnt=0; ! option_copy=strdup(opt[0].str); ! for(token=strtok_r(option_copy,",",&saveptr); ! token!=NULL; ! token=strtok_r(NULL,",",&saveptr)) ! { ! new->no_id_list[cnt++]=atoi(token); ! } ! free(option_copy); ! new->no_id_list[cnt]=-1; ! } ! 
new->ev_origin = opt[2].ival; ! } else parser_errors++; *************** stmt_drop_node : lno K_DROP K_NODE opti *** 640,647 **** $$ = (SlonikStmt *)new; } ; ! stmt_failed_node : lno K_FAILOVER option_list { SlonikStmt_failed_node *new; statement_option opt[] = { --- 672,693 ---- $$ = (SlonikStmt *)new; } ; + stmt_failed_node : lno K_FAILOVER '(' fail_node_list ')' ';' + { + SlonikStmt_failed_node *new; + + new = (SlonikStmt_failed_node *) + malloc(sizeof(SlonikStmt_failed_node)); + memset(new, 0, sizeof(SlonikStmt_failed_node)); + new->hdr.stmt_type = STMT_FAILED_NODE; + new->hdr.stmt_filename = current_file; + new->hdr.stmt_lno = $1; ! new->nodes=$4; ! ! $$ = (SlonikStmt *)new; ! } ! | lno K_FAILOVER option_list { SlonikStmt_failed_node *new; statement_option opt[] = { *************** stmt_failed_node : lno K_FAILOVER option *** 656,666 **** new->hdr.stmt_type = STMT_FAILED_NODE; new->hdr.stmt_filename = current_file; new->hdr.stmt_lno = $1; if (assign_options(opt, $3) == 0) { ! new->no_id = opt[0].ival; ! new->backup_node = opt[1].ival; } else parser_errors++; --- 702,715 ---- new->hdr.stmt_type = STMT_FAILED_NODE; new->hdr.stmt_filename = current_file; new->hdr.stmt_lno = $1; + new->nodes=(struct failed_node_entry*) + malloc(sizeof(struct failed_node_entry)*1); + memset(new->nodes,0, sizeof(struct failed_node_entry)); if (assign_options(opt, $3) == 0) { ! new->nodes->no_id = opt[0].ival; ! 
new->nodes->backup_node = opt[1].ival; } else parser_errors++; *************** stmt_failed_node : lno K_FAILOVER option *** 669,674 **** --- 718,770 ---- } ; + fail_node_list : K_NODE '=' '(' option_list_items ')' + { + struct failed_node_entry *new; + statement_option opt[] = { + STMT_OPTION_INT( O_ID, -1 ), + STMT_OPTION_INT( O_BACKUP_NODE, -1 ), + STMT_OPTION_END + }; + + new = (struct failed_node_entry *) + malloc(sizeof(struct failed_node_entry)); + memset(new, 0, sizeof(struct failed_node_entry)); + if (assign_options(opt, $4) == 0) + { + new->no_id = opt[0].ival; + new->backup_node = opt[1].ival; + } + else + parser_errors++; + + $$ = new; + + } + | K_NODE '=' '(' option_list_items ')' ',' fail_node_list + { + struct failed_node_entry *new; + statement_option opt[] = { + STMT_OPTION_INT( O_ID, -1 ), + STMT_OPTION_INT( O_BACKUP_NODE, -1 ), + STMT_OPTION_END + }; + + new = (struct failed_node_entry *) + malloc(sizeof(struct failed_node_entry)); + memset(new, 0, sizeof(struct failed_node_entry)); + if (assign_options(opt, $4) == 0) + { + new->no_id = opt[0].ival; + new->backup_node = opt[1].ival; + } + else + parser_errors++; + new->next=$7; + $$ = new; + + }; + stmt_uninstall_node : lno K_UNINSTALL K_NODE option_list { SlonikStmt_uninstall_node *new; *************** option_list_item : K_ID '=' option_item_ *** 1535,1540 **** --- 1631,1641 ---- $3->opt_code = O_ID; $$ = $3; } + | K_ID '=' option_item_literal + { + $3->opt_code= O_ID; + $$=$3; + } | K_BACKUP K_NODE '=' option_item_id { $4->opt_code = O_BACKUP_NODE; diff --git a/src/slonik/slonik.c b/src/slonik/slonik.c index 388bc62..bdd6451 100644 *** a/src/slonik/slonik.c --- b/src/slonik/slonik.c *************** script_check_stmts(SlonikScript * script *** 390,408 **** SlonikStmt_drop_node *stmt = (SlonikStmt_drop_node *) hdr; if (stmt->ev_origin < 0) { printf("%s:%d: Error: require EVENT NODE\n", hdr->stmt_filename, hdr->stmt_lno); errors++; } ! if (stmt->ev_origin == stmt->no_id) { ! 
printf("%s:%d: Error: " ! "Node ID and event node cannot be identical\n", ! hdr->stmt_filename, hdr->stmt_lno); errors++; } if (script_check_adminfo(hdr, stmt->ev_origin) < 0) errors++; } --- 390,425 ---- SlonikStmt_drop_node *stmt = (SlonikStmt_drop_node *) hdr; + if (stmt->ev_origin < 0) { printf("%s:%d: Error: require EVENT NODE\n", hdr->stmt_filename, hdr->stmt_lno); errors++; } ! if(stmt->no_id_list == NULL || ! stmt->no_id_list[0] == -1) { ! printf("%s:%d: Error: A node id must be provided", ! hdr->stmt_filename, hdr->stmt_lno); errors++; } + else + { + int cnt; + for(cnt=0;stmt->no_id_list[cnt]!=-1;cnt++) + { + if(stmt->no_id_list[cnt]==stmt->ev_origin) + { + printf("%s:%d: Error: " + "Node ID (%d) and event node cannot be identical\n", + hdr->stmt_filename, hdr->stmt_lno, + stmt->no_id_list[cnt]); + errors++; + } + } + + } if (script_check_adminfo(hdr, stmt->ev_origin) < 0) errors++; } *************** script_check_stmts(SlonikScript * script *** 412,433 **** { SlonikStmt_failed_node *stmt = (SlonikStmt_failed_node *) hdr; ! if (stmt->backup_node < 0) { ! printf("%s:%d: Error: require BACKUP NODE\n", ! hdr->stmt_filename, hdr->stmt_lno); errors++; } ! if (stmt->backup_node == stmt->no_id) { ! printf("%s:%d: Error: " ! "Node ID and backup node cannot be identical\n", ! hdr->stmt_filename, hdr->stmt_lno); ! errors++; } ! if (script_check_adminfo(hdr, stmt->backup_node) < 0) ! errors++; } break; --- 429,465 ---- { SlonikStmt_failed_node *stmt = (SlonikStmt_failed_node *) hdr; + struct failed_node_entry* node=NULL; ! if(stmt->nodes == NULL) { ! printf("%s:%d: Error: require at least one failed node\n", ! hdr->stmt_filename, hdr->stmt_lno); errors++; } ! for(node=stmt->nodes; node != NULL; ! node=node->next) { ! if (node->backup_node < 0) ! { ! printf("%s:%d: Error: require BACKUP NODE\n", ! hdr->stmt_filename, hdr->stmt_lno); ! errors++; ! } ! if (node->backup_node == node->no_id) ! { ! printf("%s:%d: Error: " ! "Node ID and backup node cannot be identical\n", ! 
hdr->stmt_filename, hdr->stmt_lno); ! errors++; ! } ! if (script_check_adminfo(hdr, node->backup_node) < 0) ! errors++; } ! /** ! * todo: verify that one backup node isn't also ! * a failing node. ! */ } break; *************** int *** 2437,2445 **** --- 2469,2479 ---- slonik_drop_node(SlonikStmt_drop_node * stmt) { SlonikAdmInfo *adminfo1; + SlonikAdmInfo *adminfo2; SlonDString query; SlonikAdmInfo * curAdmInfo; int rc; + int no_id_idx; adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->ev_origin); if (adminfo1 == NULL) *************** slonik_drop_node(SlonikStmt_drop_node * *** 2448,2501 **** if (db_begin_xact((SlonikStmt *) stmt, adminfo1,false) < 0) return -1; ! if(!auto_wait_disabled) { ! for(curAdmInfo = stmt->hdr.script->adminfo_list; ! curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next) { ! if(curAdmInfo->no_id == stmt->no_id) ! continue; ! if(slonik_is_slony_installed((SlonikStmt*)stmt,curAdmInfo) > 0 ) { ! rc=slonik_wait_config_caughtup(curAdmInfo,(SlonikStmt*)stmt, ! stmt->no_id); ! if(rc < 0) ! return rc; } } - } - - dstring_init(&query); - - slon_mkquery(&query, - "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" - "select \"_%s\".dropNode(%d); ", - stmt->hdr.script->clustername, - stmt->hdr.script->clustername, - stmt->hdr.script->clustername, - stmt->no_id); - /** - * we disable auto wait because we perform a wait - * above ignoring the node being dropped. - */ - if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, - stmt->hdr.script,true) < 0) - { dstring_free(&query); - return -1; } - /** - * if we have a conninfo for the node being dropped - * we want to clear out the last seqid. - */ - adminfo1 = get_adminfo(&stmt->hdr,stmt->no_id); - if(adminfo1 != NULL) { - adminfo1->last_event=-1; - } - - dstring_free(&query); return 0; } --- 2482,2553 ---- if (db_begin_xact((SlonikStmt *) stmt, adminfo1,false) < 0) return -1; ! for(no_id_idx=0; stmt->no_id_list[no_id_idx]!=-1;no_id_idx++) { ! if(!auto_wait_disabled) { ! 
for(curAdmInfo = stmt->hdr.script->adminfo_list; ! curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next) { ! int skip=0; ! int list_idx; ! /** ! * If we have admin info for any of the nodes being dropped ! * we disable 'wait for' on that node. ! */ ! for(list_idx=0; stmt->no_id_list[list_idx] != -1; list_idx++) ! { ! ! if(curAdmInfo->no_id==stmt->no_id_list[list_idx]) ! { ! skip=1; ! break; ! } ! } ! if(skip) ! continue; ! if(slonik_is_slony_installed((SlonikStmt*)stmt,curAdmInfo) > 0 ) ! { ! rc=slonik_wait_config_caughtup(curAdmInfo,(SlonikStmt*)stmt, ! stmt->no_id_list[no_id_idx]); ! if(rc < 0) ! return rc; ! } ! } + + } + dstring_init(&query); + + slon_mkquery(&query, + "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" + "select \"_%s\".dropNode(%d); ", + stmt->hdr.script->clustername, + stmt->hdr.script->clustername, + stmt->hdr.script->clustername, + stmt->no_id_list[no_id_idx]); + /** + * we disable auto wait because we perform a wait + * above ignoring the node being dropped. + */ + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,true) < 0) + { + dstring_free(&query); + return -1; + } + /** + * if we have a conninfo for the node being dropped + * we want to clear out the last seqid. + */ + adminfo2 = get_adminfo(&stmt->hdr,stmt->no_id_list[no_id_idx]); + if(adminfo2 != NULL) { + adminfo2->last_event=-1; } dstring_free(&query); } return 0; } *************** typedef struct *** 2518,2523 **** --- 2570,2577 ---- failnode_node **subscribers; failnode_node *max_node; int64 max_seqno; + int old_origin; + int backup_origin; } failnode_set; *************** slonik_failed_node(SlonikStmt_failed_nod *** 2526,2824 **** { SlonikAdmInfo *adminfo1; SlonDString query; - int num_nodes; - int num_sets; int n, i, j, k; ! ! failnode_node *nodeinfo; ! failnode_set *setinfo; ! char *failsetbuf; ! char *failnodebuf; PGresult *res1; PGresult *res2; PGresult *res3; ! int64 max_seqno_total = 0; ! failnode_node *max_node_total = NULL; ! int rc = 0; ! 
! adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->backup_node); ! if (adminfo1 == NULL) ! return -1; ! ! if (db_begin_xact((SlonikStmt *) stmt, adminfo1,false) < 0) ! return -1; ! ! dstring_init(&query); ! ! /* ! * On the backup node select a list of all active nodes except for the ! * failed node. ! */ ! slon_mkquery(&query, ! "select no_id from \"_%s\".sl_node " ! " where no_id <> %d " ! " and no_active " ! " order by no_id; ", ! stmt->hdr.script->clustername, ! stmt->no_id); ! res1 = db_exec_select((SlonikStmt *) stmt, adminfo1, &query); ! if (res1 == NULL) ! { ! dstring_free(&query); ! return -1; ! } ! num_nodes = PQntuples(res1); ! ! /* ! * Get a list of all sets that are subscribed more than once directly from ! * the origin ! */ ! slon_mkquery(&query, ! "select S.set_id, count(S.set_id) " ! " from \"_%s\".sl_set S, \"_%s\".sl_subscribe SUB " ! " where S.set_id = SUB.sub_set " ! " and S.set_origin = %d " ! " and SUB.sub_provider = %d " ! " and SUB.sub_active " ! " group by set_id ", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->no_id, stmt->no_id); ! res2 = db_exec_select((SlonikStmt *) stmt, adminfo1, &query); ! if (res2 == NULL) { ! PQclear(res1); ! dstring_free(&query); ! return -1; } - num_sets = PQntuples(res2); - - /* - * Allocate and initialize memory to hold some config info - */ - failsetbuf = malloc( sizeof(failnode_set) * num_sets); - failnodebuf = malloc( sizeof(failnode_node) * (num_nodes - +num_sets*num_nodes)); - memset(failsetbuf,0,sizeof(failnode_set) * num_sets); - memset(failnodebuf,0,sizeof(failnode_node) * (num_nodes - + (num_sets * num_nodes) )); - nodeinfo = (failnode_node *) failnodebuf; - setinfo = (failnode_set *) failsetbuf; ! for (i = 0; i < num_sets; i++) ! { ! setinfo[i].subscribers = (failnode_node **) ! (failnodebuf+ sizeof(failnode_node) * ! (num_nodes + (i*num_nodes))); ! } ! /* ! * Connect to all these nodes and determine if there is a node daemon ! * running on that node. */ ! 
for (i = 0; i < num_nodes; i++) { ! nodeinfo[i].no_id = (int)strtol(PQgetvalue(res1, i, 0), NULL, 10); ! nodeinfo[i].adminfo = get_active_adminfo((SlonikStmt *) stmt, ! nodeinfo[i].no_id); ! if (nodeinfo[i].adminfo == NULL) ! { ! PQclear(res1); ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); return -1; - } slon_mkquery(&query, ! "lock table \"_%s\".sl_config_lock; " ! "select nl_backendpid from \"_%s\".sl_nodelock " ! " where nl_nodeid = \"_%s\".getLocalNodeId('_%s') and " ! " exists (select 1 from pg_catalog.pg_stat_activity " ! " where procpid = nl_backendpid);", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, stmt->hdr.script->clustername, ! stmt->hdr.script->clustername); ! res3 = db_exec_select((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query); ! if (res3 == NULL) { - PQclear(res1); - PQclear(res2); - free(failnodebuf); - free(failsetbuf); dstring_free(&query); return -1; } ! if (PQntuples(res3) == 0) ! { ! nodeinfo[i].has_slon = false; ! nodeinfo[i].slon_pid = 0; ! } ! else ! { ! nodeinfo[i].has_slon = true; ! nodeinfo[i].slon_pid = (int)strtol(PQgetvalue(res3, 0, 0), NULL, 10); ! } ! PQclear(res3); ! } ! PQclear(res1); ! ! /* ! * For every set we're interested in lookup the direct subscriber nodes. ! */ ! for (i = 0; i < num_sets; i++) ! { ! setinfo[i].set_id = (int)strtol(PQgetvalue(res2, i, 0), NULL, 10); ! setinfo[i].num_directsub = (int)strtol(PQgetvalue(res2, i, 1), NULL, 10); ! ! if (setinfo[i].num_directsub <= 1) ! continue; ! slon_mkquery(&query, ! "select sub_receiver " ! " from \"_%s\".sl_subscribe " ! " where sub_set = %d " ! " and sub_provider = %d " ! " and sub_active and sub_forward; ", stmt->hdr.script->clustername, ! setinfo[i].set_id, ! stmt->no_id); ! ! res3 = db_exec_select((SlonikStmt *) stmt, adminfo1, &query); ! if (res3 == NULL) { ! free(failnodebuf); ! free(failsetbuf); dstring_free(&query); return -1; } ! n = PQntuples(res3); ! ! for (j = 0; j < n; j++) { ! 
int sub_receiver = (int)strtol(PQgetvalue(res3, j, 0), NULL, 10); ! ! for (k = 0; k < num_nodes; k++) { ! if (nodeinfo[k].no_id == sub_receiver) ! { ! setinfo[i].subscribers[setinfo[i].num_subscribers] = ! &nodeinfo[k]; ! setinfo[i].num_subscribers++; ! break; ! } } ! if (k == num_nodes) { ! printf("node %d not found - inconsistent configuration\n", ! sub_receiver); free(failnodebuf); free(failsetbuf); - PQclear(res3); - PQclear(res2); dstring_free(&query); return -1; } } ! PQclear(res3); ! } ! PQclear(res2); ! ! /* ! * Execute the failedNode() procedure, first on the backup node, then on ! * all other nodes. ! */ ! slon_mkquery(&query, ! "lock table \"_%s\".sl_config_lock; " ! "select \"_%s\".failedNode(%d, %d); ", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->no_id, stmt->backup_node); ! printf("executing failedNode() on %d\n",adminfo1->no_id); ! if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0) ! { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; ! } ! for (i = 0; i < num_nodes; i++) ! { ! if (nodeinfo[i].no_id == stmt->backup_node) ! continue; ! ! if (db_exec_command((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query) < 0) { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; } ! } ! ! /* ! * Big danger from now on, we commit the work done so far ! */ ! for (i = 0; i < num_nodes; i++) ! { ! if (db_commit_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0) { free(failnodebuf); free(failsetbuf); dstring_free(&query); return -1; } ! } ! ! /* ! * Wait until all slon replication engines that were running have ! * restarted. ! */ ! n = 0; ! while (n < num_nodes) ! { ! sleep(1); ! n = 0; ! for (i = 0; i < num_nodes; i++) { ! if (!nodeinfo[i].has_slon) ! { ! n++; continue; ! } ! ! slon_mkquery(&query, ! "select nl_backendpid from \"_%s\".sl_nodelock " ! " where nl_backendpid <> %d " ! " and nl_nodeid = \"_%s\".getLocalNodeId('_%s');", ! stmt->hdr.script->clustername, ! 
nodeinfo[i].slon_pid, ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername ! ); ! res1 = db_exec_select((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query); ! if (res1 == NULL) { free(failnodebuf); free(failsetbuf); dstring_free(&query); return -1; } ! if (PQntuples(res1) == 1) ! { ! nodeinfo[i].has_slon = false; ! n++; ! } ! ! PQclear(res1); ! if (db_rollback_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0) { free(failnodebuf); free(failsetbuf); --- 2580,2861 ---- { SlonikAdmInfo *adminfo1; SlonDString query; + SlonDString failed_node_list; int n, i, j, k; ! int num_origins=0; ! int cur_origin_idx=0; ! char **failsetbuf; ! char **failnodebuf; PGresult *res1; PGresult *res2; PGresult *res3; ! int64 * max_seqno_total = 0; ! failnode_node **max_node_total = NULL; ! struct failed_node_entry * node_entry = stmt->nodes; ! int rc = 0; ! ! dstring_init(&failed_node_list); ! for(node_entry=stmt->nodes; node_entry != NULL; ! node_entry=node_entry->next) { ! if ( node_entry==stmt->nodes) ! slon_appendquery(&failed_node_list,"%d",node_entry->no_id); ! else ! slon_appendquery(&failed_node_list,",%d",node_entry->no_id); ! num_origins++; } ! ! dstring_init(&query); ! failnodebuf = (char**) malloc ( sizeof(char*) * num_origins); ! failsetbuf = (char**) malloc ( sizeof(char*) * num_origins); ! max_seqno_total = (int64*) malloc ( sizeof(int64) * num_origins); ! max_node_total = (failnode_node **) malloc ( sizeof(failnode_node*) * ! num_origins); ! memset(max_node_total,0, sizeof(failnode_node*) * num_origins); ! memset(max_seqno_total,0, sizeof(int64) * num_origins); ! /** ! * for each failed node */ ! cur_origin_idx=0; ! for(node_entry=stmt->nodes; node_entry != NULL; ! node_entry=node_entry->next,cur_origin_idx++) { ! failnode_node *nodeinfo; ! failnode_set *setinfo; ! adminfo1 = get_active_adminfo((SlonikStmt *) stmt, node_entry->backup_node); ! 
if (adminfo1 == NULL) return -1; + if (db_begin_xact((SlonikStmt *) stmt, adminfo1,false) < 0) + return -1; + /* + * On the backup node select a list of all active nodes except for the + * failed nodes. + */ slon_mkquery(&query, ! "select no_id from \"_%s\".sl_node " ! " where no_id not in ( %s ) " ! " and no_active " ! " order by no_id; ", stmt->hdr.script->clustername, ! failed_node_list); ! res1 = db_exec_select((SlonikStmt *) stmt, adminfo1, &query); ! if (res1 == NULL) { dstring_free(&query); return -1; } ! node_entry->num_nodes = PQntuples(res1); ! ! /* ! * Get a list of all sets that are subscribed more than once ! directly from the origin ! */ slon_mkquery(&query, ! "select S.set_id, count(S.set_id) " ! " from \"_%s\".sl_set S, \"_%s\".sl_subscribe SUB " ! " where S.set_id = SUB.sub_set " ! " and S.set_origin = %d " ! " and SUB.sub_provider = %d " ! " and SUB.sub_active " ! " group by set_id ", stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! node_entry->no_id, node_entry->no_id); ! res2 = db_exec_select((SlonikStmt *) stmt, adminfo1, &query); ! if (res2 == NULL) { ! PQclear(res1); dstring_free(&query); return -1; } ! node_entry->num_sets = PQntuples(res2); ! ! /* ! * Allocate and initialize memory to hold some config info ! */ ! failsetbuf[cur_origin_idx] = malloc( sizeof(failnode_set) * ! node_entry->num_sets); ! failnodebuf[cur_origin_idx] = malloc( sizeof(failnode_node) ! * (node_entry->num_nodes ! +node_entry->num_sets ! *node_entry->num_nodes)); ! memset(failsetbuf[cur_origin_idx],0,sizeof(failnode_set) * ! node_entry->num_sets); ! memset(failnodebuf[cur_origin_idx],0,sizeof(failnode_node) ! * (node_entry->num_nodes + (node_entry->num_sets ! * node_entry->num_nodes) )); ! ! nodeinfo = (failnode_node *) failnodebuf[cur_origin_idx]; ! setinfo = (failnode_set *) failsetbuf[cur_origin_idx]; ! ! for (i = 0; i < node_entry->num_sets; i++) { ! setinfo[i].subscribers = (failnode_node **) ! 
(failnodebuf[cur_origin_idx]+ sizeof(failnode_node) * ! (node_entry->num_nodes + (i*node_entry->num_nodes))); ! } ! ! /* ! * Connect to all these nodes and determine if there is a node daemon ! * running on that node. ! */ ! for (i = 0; i < node_entry->num_nodes; i++) ! { ! nodeinfo[i].no_id = (int)strtol(PQgetvalue(res1, i, 0), NULL, 10); ! nodeinfo[i].adminfo = get_active_adminfo((SlonikStmt *) stmt, ! nodeinfo[i].no_id); ! if (nodeinfo[i].adminfo == NULL) { ! PQclear(res1); ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; } ! ! slon_mkquery(&query, ! "lock table \"_%s\".sl_config_lock; " ! "select nl_backendpid from \"_%s\".sl_nodelock " ! " where nl_nodeid = \"_%s\".getLocalNodeId('_%s') and " ! " exists (select 1 from pg_catalog.pg_stat_activity " ! " where procpid = nl_backendpid);", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername); ! res3 = db_exec_select((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query); ! if (res3 == NULL) { ! PQclear(res1); ! PQclear(res2); free(failnodebuf); free(failsetbuf); dstring_free(&query); return -1; } + if (PQntuples(res3) == 0) + { + nodeinfo[i].has_slon = false; + nodeinfo[i].slon_pid = 0; + } + else + { + nodeinfo[i].has_slon = true; + nodeinfo[i].slon_pid = (int)strtol(PQgetvalue(res3, 0, 0), NULL, 10); + } + PQclear(res3); } ! PQclear(res1); ! ! /* ! * For every set we're interested in lookup the direct subscriber nodes. ! */ ! for (i = 0; i < node_entry->num_sets; i++) { ! setinfo[i].set_id = (int)strtol(PQgetvalue(res2, i, 0), NULL, 10); ! setinfo[i].num_directsub = (int)strtol(PQgetvalue(res2, i, 1), NULL, 10); ! ! if (setinfo[i].num_directsub <= 1) ! continue; ! ! slon_mkquery(&query, ! "select sub_receiver " ! " from \"_%s\".sl_subscribe " ! " where sub_set = %d " ! " and sub_provider = %d " ! " and sub_receiver not in (%s) " ! " and sub_active and sub_forward; ", ! stmt->hdr.script->clustername, ! 
setinfo[i].set_id, ! node_entry->no_id,dstring_data(&failed_node_list)); ! ! res3 = db_exec_select((SlonikStmt *) stmt, adminfo1, &query); ! if (res3 == NULL) ! { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; ! } ! n = PQntuples(res3); ! ! for (j = 0; j < n; j++) ! { ! int sub_receiver = (int)strtol(PQgetvalue(res3, j, 0), NULL, 10); ! ! for (k = 0; k < node_entry->num_nodes; k++) ! { ! if (nodeinfo[k].no_id == sub_receiver) ! { ! setinfo[i].subscribers[setinfo[i].num_subscribers] = ! &nodeinfo[k]; ! setinfo[i].num_subscribers++; ! break; ! } ! } ! if (k == node_entry->num_nodes) ! { ! printf("node %d not found - inconsistent configuration\n", ! sub_receiver); ! free(failnodebuf); ! free(failsetbuf); ! PQclear(res3); ! PQclear(res2); ! dstring_free(&query); ! return -1; ! } ! } ! PQclear(res3); } ! PQclear(res2); ! ! /* ! * Execute the failedNode() procedure, first on the backup node, then on ! * all other nodes. ! */ ! slon_mkquery(&query, ! "lock table \"_%s\".sl_config_lock; " ! "select \"_%s\".failedNode(%d, %d); ", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! node_entry->no_id, node_entry->backup_node); ! printf("executing failedNode() on %d\n",adminfo1->no_id); ! if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0) { free(failnodebuf); free(failsetbuf); dstring_free(&query); return -1; } ! for (i = 0; i < node_entry->num_nodes; i++) { ! if (nodeinfo[i].no_id == node_entry->backup_node) continue; ! ! if (db_exec_command((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query) < 0) { free(failnodebuf); free(failsetbuf); dstring_free(&query); return -1; } ! } ! ! /* ! * Big danger from now on, we commit the work done so far ! */ ! for (i = 0; i < node_entry->num_nodes; i++) ! { ! if (db_commit_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0) { free(failnodebuf); free(failsetbuf); *************** slonik_failed_node(SlonikStmt_failed_nod *** 2827,3172 **** } } } /* ! 
* Determine the absolutely last event sequence known from the failed ! * node. */ ! slon_mkquery(&query, ! "select max(ev_seqno) " ! " from \"_%s\".sl_event " ! " where ev_origin = %d; ", ! stmt->hdr.script->clustername, ! stmt->no_id); ! for (i = 0; i < num_nodes; i++) { ! res1 = db_exec_select((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query); ! if (res1 != NULL) { ! if (PQntuples(res1) == 1) { ! int64 max_seqno; ! ! slon_scanint64(PQgetvalue(res1, 0, 0), &max_seqno); ! if (max_seqno > max_seqno_total) { ! max_seqno_total = max_seqno; ! max_node_total = &nodeinfo[i]; } } - PQclear(res1); } - else - rc = -1; } ! ! /* ! * For every set determine the direct subscriber with the highest applied ! * sync, preferring the backup node. ! */ ! for (i = 0; i < num_sets; i++) { ! setinfo[i].max_node = NULL; ! setinfo[i].max_seqno = 0; ! ! if (setinfo[i].num_directsub <= 1) { ! int64 ev_seqno; ! ! slon_mkquery(&query, ! "select max(ev_seqno) " ! " from \"_%s\".sl_event " ! " where ev_origin = %d " ! " and ev_type = 'SYNC'; ", ! stmt->hdr.script->clustername, ! stmt->no_id); ! res1 = db_exec_select((SlonikStmt *) stmt, ! adminfo1, &query); ! if (res1 == NULL) { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; } ! slon_scanint64(PQgetvalue(res1, 0, 0), &ev_seqno); ! ! setinfo[i].max_seqno = ev_seqno; ! ! PQclear(res1); ! ! continue; } ! ! slon_mkquery(&query, ! "select ssy_seqno " ! " from \"_%s\".sl_setsync " ! " where ssy_setid = %d; ", ! stmt->hdr.script->clustername, ! setinfo[i].set_id); ! ! for (j = 0; j < setinfo[i].num_subscribers; j++) { - int64 ssy_seqno; ! res1 = db_exec_select((SlonikStmt *) stmt, ! setinfo[i].subscribers[j]->adminfo, &query); ! if (res1 == NULL) { ! free(failsetbuf); ! free(failnodebuf); ! dstring_free(&query); ! return -1; } - if (PQntuples(res1) == 1) - { - slon_scanint64(PQgetvalue(res1, 0, 0), &ssy_seqno); ! if (setinfo[i].subscribers[j]->no_id == stmt->backup_node) { ! if (ssy_seqno >= setinfo[i].max_seqno) { ! 
setinfo[i].max_node = setinfo[i].subscribers[j]; ! setinfo[i].max_seqno = ssy_seqno; } } else { ! if (ssy_seqno > setinfo[i].max_seqno) ! { ! setinfo[i].max_node = setinfo[i].subscribers[j]; ! setinfo[i].max_seqno = ssy_seqno; ! } } ! ! if (ssy_seqno > max_seqno_total) ! max_seqno_total = ssy_seqno; ! } ! else ! { ! printf("can't get setsync status for set %d from node %d\n", ! setinfo[i].set_id, setinfo[i].subscribers[j]->no_id); ! rc = -1; } - - PQclear(res1); - } - } - - /* - * Now switch the backup node to receive all sets from those highest - * nodes. - */ - for (i = 0; i < num_sets; i++) - { - int use_node; - - if (setinfo[i].num_directsub <= 1) - { - use_node = stmt->backup_node; - } - else if (setinfo[i].max_node == NULL) - { - printf("no setsync status for set %d found at all\n", - setinfo[i].set_id); - rc = -1; - use_node = stmt->backup_node; - } - else - { - printf("IMPORTANT: Last known SYNC for set %d = " - INT64_FORMAT "\n", - setinfo[i].set_id, - setinfo[i].max_seqno); - use_node = setinfo[i].max_node->no_id; - - setinfo[i].max_node->num_sets++; } ! if (use_node != stmt->backup_node) { ! /** ! * commit the transaction so a new transaction ! * is ready for the lock table ! */ ! if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0) { ! free(failsetbuf); ! free(failnodebuf); ! dstring_free(&query); ! return -1; } ! slon_mkquery(&query, ! "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" ! "select \"_%s\".storeListen(%d,%d,%d); " ! "select \"_%s\".subscribeSet_int(%d,%d,%d,'t','f'); ", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->no_id, use_node, stmt->backup_node, ! stmt->hdr.script->clustername, ! setinfo[i].set_id, use_node, stmt->backup_node); ! if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0) ! rc = -1; ! } ! } ! ! /* ! * Commit the transaction on the backup node to activate those changes. ! */ ! if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0) ! rc = -1; ! 
! /* ! * Now execute all FAILED_NODE events on the node that had the highest of ! * all events alltogether. ! */ ! if (max_node_total != NULL) ! { ! for (i = 0; i < num_sets; i++) ! { ! char ev_seqno_c[NAMEDATALEN]; ! char ev_seqfake_c[NAMEDATALEN]; ! ! sprintf(ev_seqno_c, INT64_FORMAT, setinfo[i].max_seqno); ! sprintf(ev_seqfake_c, INT64_FORMAT, ++max_seqno_total); ! if (db_commit_xact((SlonikStmt *) stmt, max_node_total->adminfo) ! < 0) { ! return -1; } - slon_mkquery(&query, - "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" - "select \"_%s\".failedNode2(%d,%d,%d,'%s','%s'); ", - stmt->hdr.script->clustername, - stmt->hdr.script->clustername, - stmt->hdr.script->clustername, - stmt->no_id, stmt->backup_node, - setinfo[i].set_id, ev_seqno_c, ev_seqfake_c); - printf("NOTICE: executing \"_%s\".failedNode2 on node %d\n", - stmt->hdr.script->clustername, - max_node_total->adminfo->no_id); - if (db_exec_command((SlonikStmt *) stmt, - max_node_total->adminfo, &query) < 0) - rc = -1; else { ! SlonikAdmInfo * failed_conn_info=NULL; ! SlonikAdmInfo * last_conn_info=NULL; ! bool temp_conn_info=false; ! /** ! * now wait for the FAILOVER to finish. ! * To do this we must wait for the FAILOVER_EVENT ! * which has ev_origin=stmt->no_id (the failed node) ! * but was incjected into the sl_event table on the ! * most ahead node (max_node_total->adminfo) ! * to be confirmed by the backup node. ! * ! * Then we wait for the backup node to send an event ! * and be confirmed elsewhere. ! * ! */ ! SlonikStmt_wait_event wait_event; ! wait_event.hdr=*(SlonikStmt*)stmt; ! wait_event.wait_origin=stmt->no_id; /*failed node*/ ! wait_event.wait_on=max_node_total->adminfo->no_id; ! wait_event.wait_confirmed=-1; ! wait_event.wait_timeout=0; ! /** ! * see if we can find a admconninfo ! * for the failed node. */ ! ! for(failed_conn_info = stmt->hdr.script->adminfo_list; ! failed_conn_info != NULL; ! failed_conn_info=failed_conn_info->next) ! { ! ! 
if(failed_conn_info->no_id==stmt->no_id) ! { ! break; ! } ! last_conn_info=failed_conn_info; ! } ! if(failed_conn_info == NULL) ! { ! temp_conn_info=true; ! last_conn_info->next = malloc(sizeof(SlonikAdmInfo)); ! memset(last_conn_info->next,0,sizeof(SlonikAdmInfo)); ! failed_conn_info=last_conn_info->next; ! failed_conn_info->no_id=stmt->no_id; ! failed_conn_info->stmt_filename="slonik generated"; ! failed_conn_info->stmt_lno=-1; ! failed_conn_info->conninfo=""; ! failed_conn_info->script=last_conn_info->script; ! } ! ! failed_conn_info->last_event=max_seqno_total; ! ! /* ! * commit all open transactions despite of all possible errors ! * otherwise the WAIT FOR will not work. ! **/ ! for (i = 0; i < num_nodes; i++) { ! if (db_commit_xact((SlonikStmt *) stmt, ! nodeinfo[i].adminfo) < 0) ! rc = -1; } ! ! rc = slonik_wait_event(&wait_event); ! if(rc < 0) ! { ! /** ! * pretty serious? how do we recover? ! */ ! printf("%s:%d error waiting for event\n", ! stmt->hdr.stmt_filename, stmt->hdr.stmt_lno); ! } ! ! if(temp_conn_info) { ! last_conn_info->next=failed_conn_info->next; ! free(failed_conn_info); ! } - slon_mkquery(&query, ! "lock table \"_%s\".sl_event_lock; " ! "select \"_%s\".createEvent('_%s', 'SYNC'); ", stmt->hdr.script->clustername, stmt->hdr.script->clustername, ! stmt->hdr.script->clustername); ! if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, ! stmt->hdr.script,1) < 0) { ! printf("%s:%d: error submitting SYNC event to backup node" ! ,stmt->hdr.stmt_filename, stmt->hdr.stmt_lno); ! } ! ! }/*else*/ ! } ! } ! /* ! * commit all open transactions despite of all possible errors ! */ ! for (i = 0; i < num_nodes; i++) ! { ! if (db_commit_xact((SlonikStmt *) stmt, ! nodeinfo[i].adminfo) < 0) ! rc = -1; } - free(failsetbuf); free(failnodebuf); --- 2864,3291 ---- } } } + /** + * end of loop + */ + /* ! * Wait until all slon replication engines that were running have ! * restarted. */ ! cur_origin_idx=0; ! for(node_entry=stmt->nodes; node_entry != NULL; ! 
node_entry=node_entry->next) { ! failnode_node *nodeinfo = (failnode_node *) failnodebuf[cur_origin_idx]; ! ! n = 0; ! while (n < node_entry->num_nodes) { ! sleep(1); ! n = 0; ! for (i = 0; i < node_entry->num_nodes; i++) { ! if (!nodeinfo[i].has_slon) { ! n++; ! continue; ! } ! ! slon_mkquery(&query, ! "select nl_backendpid from \"_%s\".sl_nodelock " ! " where nl_backendpid <> %d " ! " and nl_nodeid = \"_%s\".getLocalNodeId('_%s');", ! stmt->hdr.script->clustername, ! nodeinfo[i].slon_pid, ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername ! ); ! res1 = db_exec_select((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query); ! if (res1 == NULL) ! { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; ! } ! if (PQntuples(res1) == 1) ! { ! nodeinfo[i].has_slon = false; ! n++; ! } ! ! PQclear(res1); ! if (db_rollback_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0) ! { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; } } } } ! ! cur_origin_idx=0; ! for(node_entry=stmt->nodes; node_entry != NULL; ! node_entry=node_entry->next, cur_origin_idx++) { ! ! failnode_node * nodeinfo = (failnode_node *) failnodebuf[cur_origin_idx]; ! /* ! * Determine the absolutely last event sequence known from the failed ! * node. ! */ ! slon_mkquery(&query, ! "select max(ev_seqno) " ! " from \"_%s\".sl_event " ! " where ev_origin = %d; ", ! stmt->hdr.script->clustername, ! node_entry->no_id); ! for (i = 0; i < node_entry->num_nodes; i++) { ! res1 = db_exec_select((SlonikStmt *) stmt, nodeinfo[i].adminfo, &query); ! if (res1 != NULL) { ! if (PQntuples(res1) == 1) ! { ! int64 max_seqno; ! ! slon_scanint64(PQgetvalue(res1, 0, 0), &max_seqno); ! if (max_seqno > max_seqno_total[cur_origin_idx]) ! { ! max_seqno_total[cur_origin_idx] = max_seqno; ! max_node_total[cur_origin_idx] = &nodeinfo[i]; ! } ! } ! PQclear(res1); } ! else ! rc = -1; } ! ! /* ! * For every set determine the direct subscriber with the highest ! *applied ! 
* sync, preferring the backup node. ! */ ! failnode_set * setinfo = (failnode_set *) failsetbuf[cur_origin_idx]; ! adminfo1 = get_active_adminfo((SlonikStmt *) stmt, node_entry->backup_node); ! for (i = 0; i < node_entry->num_sets; i++) { ! setinfo[i].max_node = NULL; ! setinfo[i].max_seqno = 0; ! ! if (setinfo[i].num_directsub <= 1) { ! int64 ev_seqno; ! slon_mkquery(&query, ! "select max(ev_seqno) " ! " from \"_%s\".sl_event " ! " where ev_origin = %d " ! " and ev_type = 'SYNC'; ", ! stmt->hdr.script->clustername, ! node_entry->no_id); ! res1 = db_exec_select((SlonikStmt *) stmt, ! adminfo1, &query); ! if (res1 == NULL) ! { ! free(failnodebuf); ! free(failsetbuf); ! dstring_free(&query); ! return -1; ! } ! slon_scanint64(PQgetvalue(res1, 0, 0), &ev_seqno); ! ! setinfo[i].max_seqno = ev_seqno; ! ! PQclear(res1); ! ! continue; } ! slon_mkquery(&query, ! "select ssy_seqno " ! " from \"_%s\".sl_setsync " ! " where ssy_setid = %d; ", ! stmt->hdr.script->clustername, ! setinfo[i].set_id); ! ! for (j = 0; j < setinfo[i].num_subscribers; j++) ! { ! int64 ssy_seqno; ! ! res1 = db_exec_select((SlonikStmt *) stmt, ! setinfo[i].subscribers[j]->adminfo, &query); ! if (res1 == NULL) { ! free(failsetbuf); ! free(failnodebuf); ! ! dstring_free(&query); ! return -1; ! } ! if (PQntuples(res1) == 1) ! { ! slon_scanint64(PQgetvalue(res1, 0, 0), &ssy_seqno); ! ! if (setinfo[i].subscribers[j]->no_id == node_entry->backup_node) { ! if (ssy_seqno >= setinfo[i].max_seqno) ! { ! setinfo[i].max_node = setinfo[i].subscribers[j]; ! setinfo[i].max_seqno = ssy_seqno; ! } } + else + { + if (ssy_seqno > setinfo[i].max_seqno) + { + setinfo[i].max_node = setinfo[i].subscribers[j]; + setinfo[i].max_seqno = ssy_seqno; + } + } + + if (ssy_seqno > max_seqno_total[cur_origin_idx]) + max_seqno_total[cur_origin_idx] = ssy_seqno; } else { ! printf("can't get setsync status for set %d from node %d\n", ! setinfo[i].set_id, setinfo[i].subscribers[j]->no_id); ! rc = -1; } ! ! PQclear(res1); } } ! /* ! 
* Now switch the backup node to receive all sets from those highest ! * nodes. ! */ ! for (i = 0; i < node_entry->num_sets; i++) { + int use_node; ! if (setinfo[i].num_directsub <= 1) { ! use_node = node_entry->backup_node; } ! else if (setinfo[i].max_node == NULL) { ! printf("no setsync status for set %d found at all\n", ! setinfo[i].set_id); ! rc = -1; ! use_node = node_entry->backup_node; } else { ! printf("IMPORTANT: Last known SYNC for set %d = " ! INT64_FORMAT "\n", ! setinfo[i].set_id, ! setinfo[i].max_seqno); ! use_node = setinfo[i].max_node->no_id; + setinfo[i].max_node->num_sets++; + } ! if (use_node != node_entry->backup_node) ! { ! /** ! * commit the transaction so a new transaction ! * is ready for the lock table */ ! if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0) { ! free(failsetbuf); ! free(failnodebuf); ! dstring_free(&query); ! return -1; } + slon_mkquery(&query, + "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" + "select \"_%s\".storeListen(%d,%d,%d); " + "select \"_%s\".subscribeSet_int(%d,%d,%d,'t','f'); ", + stmt->hdr.script->clustername, + stmt->hdr.script->clustername, + stmt->hdr.script->clustername, + node_entry->no_id, use_node, node_entry->backup_node, + stmt->hdr.script->clustername, + setinfo[i].set_id, use_node, node_entry->backup_node); + if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0) + rc = -1; + } + } + /* + * Commit the transaction on the backup node to activate those changes. 
+ */ + if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0) + rc = -1; + } + + cur_origin_idx=0; + for(node_entry=stmt->nodes; node_entry != NULL; + node_entry=node_entry->next, cur_origin_idx++) + { + failnode_set * setinfo = (failnode_set *) failsetbuf[cur_origin_idx]; + failnode_node * nodeinfo = (failnode_node *) failnodebuf[cur_origin_idx]; + adminfo1 = get_active_adminfo((SlonikStmt *) stmt, node_entry->backup_node); + /* + * Now execute all FAILED_NODE events on the node that had the highest of + * all events alltogether. + */ + if (max_node_total[cur_origin_idx] != NULL) + { + for (i = 0; i < node_entry->num_sets; i++) + { + char ev_seqno_c[NAMEDATALEN]; + char ev_seqfake_c[NAMEDATALEN]; ! sprintf(ev_seqno_c, INT64_FORMAT, setinfo[i].max_seqno); ! sprintf(ev_seqfake_c, INT64_FORMAT, ! ++max_seqno_total[cur_origin_idx]); ! if (db_commit_xact((SlonikStmt *) stmt, ! max_node_total[cur_origin_idx]->adminfo) ! < 0) { ! return -1; } slon_mkquery(&query, ! "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" ! "select \"_%s\".failedNode2(%d,%d,%d,'%s','%s'); ", stmt->hdr.script->clustername, stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! node_entry->no_id, node_entry->backup_node, ! setinfo[i].set_id, ev_seqno_c, ev_seqfake_c); ! printf("NOTICE: executing \"_%s\".failedNode2 on node %d\n", ! stmt->hdr.script->clustername, ! max_node_total[cur_origin_idx]->adminfo->no_id); ! if (db_exec_command((SlonikStmt *) stmt, ! max_node_total[cur_origin_idx]->adminfo, ! &query) < 0) ! rc = -1; ! else { ! SlonikAdmInfo * failed_conn_info=NULL; ! SlonikAdmInfo * last_conn_info=NULL; ! bool temp_conn_info=false; ! /** ! * now wait for the FAILOVER to finish. ! * To do this we must wait for the FAILOVER_EVENT ! * which has ev_origin=stmt->no_id (the failed node) ! * but was incjected into the sl_event table on the ! * most ahead node (max_node_total->adminfo) ! * to be confirmed by the backup node. ! * ! 
* Then we wait for the backup node to send an event ! * and be confirmed elsewhere. ! * ! */ ! ! ! SlonikStmt_wait_event wait_event; ! wait_event.hdr=*(SlonikStmt*)stmt; ! wait_event.wait_origin=node_entry->no_id; /*failed node*/ ! wait_event.wait_on=max_node_total[cur_origin_idx]->adminfo->no_id; ! wait_event.wait_confirmed=node_entry->backup_node; ! wait_event.wait_timeout=0; ! ! /** ! * see if we can find a admconninfo ! * for the failed node. ! */ ! ! for(failed_conn_info = stmt->hdr.script->adminfo_list; ! failed_conn_info != NULL; ! failed_conn_info=failed_conn_info->next) ! { ! ! if(failed_conn_info->no_id==node_entry->no_id) ! { ! break; ! } ! last_conn_info=failed_conn_info; ! } ! if(failed_conn_info == NULL) ! { ! temp_conn_info=true; ! last_conn_info->next = malloc(sizeof(SlonikAdmInfo)); ! memset(last_conn_info->next,0,sizeof(SlonikAdmInfo)); ! failed_conn_info=last_conn_info->next; ! failed_conn_info->no_id=node_entry->no_id; ! failed_conn_info->stmt_filename="slonik generated"; ! failed_conn_info->stmt_lno=-1; ! failed_conn_info->conninfo=""; ! failed_conn_info->script=last_conn_info->script; ! } ! ! failed_conn_info->last_event=max_seqno_total[cur_origin_idx]; ! ! /* ! * commit all open transactions despite of all possible errors ! * otherwise the WAIT FOR will not work. ! **/ ! for (i = 0; i < node_entry->num_nodes; i++) ! { ! if (db_commit_xact((SlonikStmt *) stmt, ! nodeinfo[i].adminfo) < 0) ! rc = -1; ! } ! ! ! rc = slonik_wait_event(&wait_event); ! if(rc < 0) ! { ! /** ! * pretty serious? how do we recover? ! */ ! printf("%s:%d error waiting for event\n", ! stmt->hdr.stmt_filename, stmt->hdr.stmt_lno); ! } ! ! if(temp_conn_info) ! { ! last_conn_info->next=failed_conn_info->next; ! free(failed_conn_info); ! ! } ! ! slon_mkquery(&query, ! "lock table \"_%s\".sl_event_lock; " ! "select \"_%s\".createEvent('_%s', 'SYNC'); ", ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername, ! stmt->hdr.script->clustername); ! 
if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, ! stmt->hdr.script,1) < 0) ! { ! printf("%s:%d: error submitting SYNC event to backup node" ! ,stmt->hdr.stmt_filename, stmt->hdr.stmt_lno); ! } ! ! }/*else*/ ! } ! } ! /* ! * commit all open transactions despite of all possible errors ! */ ! for (i = 0; i < node_entry->num_nodes; i++) ! { ! if (db_commit_xact((SlonikStmt *) stmt, ! nodeinfo[i].adminfo) < 0) ! rc = -1; ! } } free(failsetbuf); free(failnodebuf); diff --git a/src/slonik/slonik.h b/src/slonik/slonik.h index b26adbb..fedeb10 100644 *** a/src/slonik/slonik.h --- b/src/slonik/slonik.h *************** struct SlonikStmt_store_node_s *** 197,212 **** struct SlonikStmt_drop_node_s { SlonikStmt hdr; ! int no_id; int ev_origin; }; struct SlonikStmt_failed_node_s { SlonikStmt hdr; ! int no_id; ! int backup_node; }; --- 197,218 ---- struct SlonikStmt_drop_node_s { SlonikStmt hdr; ! int * no_id_list; int ev_origin; }; + struct failed_node_entry { + int no_id; + int backup_node; + struct failed_node_entry * next; + int num_sets; + int num_nodes; + }; struct SlonikStmt_failed_node_s { SlonikStmt hdr; ! struct failed_node_entry * nodes; };