Jan Wieck wieck at lists.slony.info
Mon Aug 20 11:22:28 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv10238/src/slon

Modified Files:
	remote_worker.c 
Log Message:
Forward patching the new archive tracking into HEAD


Jan


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.150
retrieving revision 1.151
diff -C2 -d -r1.150 -r1.151
*** remote_worker.c	29 Jul 2007 17:38:23 -0000	1.150
--- remote_worker.c	20 Aug 2007 18:22:26 -0000	1.151
***************
*** 260,265 ****
  static int	archive_append_data(SlonNode *node, const char *s, int len);
  static int archive_tracking(SlonNode *node, const char *namespace, 
! 					int sub_set, const char *firstseq,
! 					const char *seqbuf, const char *timestamp);
  
  
--- 260,264 ----
  static int	archive_append_data(SlonNode *node, const char *s, int len);
  static int archive_tracking(SlonNode *node, const char *namespace, 
! 					PGconn *local_dbconn);
  
  
***************
*** 691,705 ****
  					slon_retry();
  
! 				for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next)
! 				{
! 					for(pset = pinfo->set_head; pset != NULL; pset = pset->next)
! 					{
! 						if (archive_tracking(node, rtcfg_namespace, 
! 								pset->set_id, pset->ssy_seqno, seqbuf, 
! 								event->ev_timestamp_c) < 0)
! 							slon_retry();
! 						strcpy(pset->ssy_seqno, seqbuf);
! 					}
! 				}
  
  				/*
--- 690,696 ----
  					slon_retry();
  
! 				if (archive_tracking(node, rtcfg_namespace, 
! 						local_dbconn) < 0)
! 					slon_retry();
  
  				/*
***************
*** 875,891 ****
  								 "select %s.dropSet_int(%d); ",
  								 rtcfg_namespace, set_id);
- 
- 				/*
- 				 * The table deleted needs to be dropped from log shipping too
- 				 */
- 				if (archive_dir)
- 				{
- 					slon_mkquery(&lsquery,
- 								 "delete from %s.sl_setsync_offline "
- 								 "  where ssy_setid= %d;",
- 								 rtcfg_namespace, set_id);
- 					if (archive_append_ds(node, &lsquery) < 0)
- 						slon_retry();
- 				}
  			}
  			else if (strcmp(event->ev_type, "MERGE_SET") == 0)
--- 866,869 ----
***************
*** 901,917 ****
  								 set_id, add_id);
  
- 				/*
- 				 * Log shipping gets the change here that we need to delete
- 				 * the table being merged from the set being maintained.
- 				 */
- 				if (archive_dir)
- 				{
- 					slon_mkquery(&lsquery,
- 							  "delete from %s.sl_setsync_offline "
- 							  "  where ssy_setid= %d;",
- 							  rtcfg_namespace, add_id);
- 					if (archive_append_ds(node, &lsquery) < 0)
- 						slon_retry();
- 				}
  			}
  			else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
--- 879,882 ----
***************
*** 1281,1293 ****
  
  				need_reloadListen = true;
- 				if (archive_dir)
- 				{
- 					(void) slon_mkquery(&lsquery,
- 								 "delete from %s.sl_setsync_offline "
- 								 "  where ssy_setid= %d;",
- 								 rtcfg_namespace, sub_set);
- 					if (archive_append_ds(node, &lsquery) < 0)
- 						slon_retry();
- 				}
  			}
  			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
--- 1246,1249 ----
***************
*** 3424,3450 ****
  		return -1;
  	}
- 	if (archive_dir)
- 	{
- 		(void) slon_mkquery(&lsquery,
- 			     "insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) "
- 			     "values ('%d', '%s');",
- 			     rtcfg_namespace, set_id, ssy_seqno);
- 		rc = archive_append_ds(node, &lsquery);
- 		if (rc < 0)
- 		{
- 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- 					 " could not insert to sl_setsync_offline",
- 					 node->no_id);
- 			PQclear(res1);
- 			slon_disconnectdb(pro_conn);
- 			dstring_free(&query1);
- 			dstring_free(&query2);
- 			dstring_free(&query3);
- 			dstring_free(&lsquery);
- 			dstring_free(&indexregenquery);
- 			archive_terminate(node);
- 			return -1;
- 		}
- 	}
  	PQclear(res1);
  	gettimeofday(&tv_now, NULL);
--- 3380,3383 ----
***************
*** 3896,3932 ****
  
  			PQclear(res2);
- 
- 			/*
- 			 * Add a call to the setsync tracking function to the archive log.
- 			 * This function ensures that all archive log files are applied in
- 			 * the right order.
- 			 */
- 			if (archive_dir)
- 			{
- 				for (pset = provider->set_head; pset; pset = pset->next)
- 					if (pset->set_id == sub_set) break;
- 				if (pset == NULL)
- 				{
- 					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- 							"set %d not found in runtime config\n",
- 							node->no_id, sub_set);
- 					slon_retry();
- 				}
- 
- 				slon_log(SLON_DEBUG2, "writing archive log...\n");
- 				fflush(stderr);
- 				fflush(stdout);
- 				rc = archive_tracking(node, rtcfg_namespace, sub_set,
- 										 pset->ssy_seqno, seqbuf,
- 										 event->ev_timestamp_c);
- 				if (rc < 0)
- 					slon_retry();
- 
- 				strcpy(pset->ssy_seqno, seqbuf);
- 			}
  		}
  		PQclear(res1);
  
  		/*
  		 * We didn't add anything good in the provider clause. That shouldn't
  		 * be!
--- 3829,3848 ----
  
  			PQclear(res2);
  		}
  		PQclear(res1);
  
  		/*
+ 		 * Add a call to the archive tracking function to the archive log.
+ 		 * This function ensures that all archive log files are applied in
+ 		 * the right order.
+ 		 */
+ 		if (archive_dir)
+ 		{
+ 			rc = archive_tracking(node, rtcfg_namespace, local_dbconn);
+ 			if (rc < 0)
+ 				slon_retry();
+ 		}
+ 
+ 		/*
  		 * We didn't add anything good in the provider clause. That shouldn't
  		 * be!
***************
*** 5167,5171 ****
  		"------------------------------------------------------------------\n"
  		"commit;\n"
! 		"vacuum analyze %s.sl_setsync_offline;\n",
  		rtcfg_namespace);
  	if (rc < 0)
--- 5083,5087 ----
  		"------------------------------------------------------------------\n"
  		"commit;\n"
! 		"vacuum analyze %s.sl_archive_tracking;\n",
  		rtcfg_namespace);
  	if (rc < 0)
***************
*** 5220,5232 ****
   */
  static int
! archive_tracking(SlonNode *node, const char *namespace, int sub_set, 
! 					const char *firstseq, const char *seqbuf, 
! 					const char *timestamp)
  {
! 	int		rc;
  
  	if (!archive_dir)
  		return 0;
  
  	if (node->archive_fp == NULL)
  	{
--- 5136,5178 ----
   */
  static int
! archive_tracking(SlonNode *node, const char *namespace, 
! 					PGconn *local_dbconn)
  {
! 	SlonDString query;
! 	PGresult	*res;
! 	int			rc;
  
  	if (!archive_dir)
  		return 0;
  
+ 	dstring_init(&query);
+ 	slon_mkquery(&query,
+ 			"update %s.sl_archive_counter "
+ 			"    set ac_num = ac_num + 1, "
+ 			"        ac_timestamp = CURRENT_TIMESTAMP; "
+ 			"select ac_num, ac_timestamp from %s.sl_archive_counter; ",
+ 			namespace, namespace);
+ 	res = PQexec(local_dbconn, dstring_data(&query));
+ 	if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
+ 	{
+ 		slon_log(SLON_ERROR,
+ 				 "remoteWorkerThread_%d: \"%s\" %s %s\n",
+ 				 node->no_id, dstring_data(&query),
+ 				 PQresStatus(rc),
+ 				 PQresultErrorMessage(res));
+ 		PQclear(res);
+ 		dstring_free(&query);
+ 		return -1;
+ 	}
+ 	if ((rc = PQntuples(res)) != 1)
+ 	{
+ 		slon_log(SLON_ERROR,
+ 				 "remoteWorkerThread_%d: expected 1 row in sl_archive_counter - found %d",
+ 				 node->no_id, rc);
+ 		PQclear(res);
+ 		dstring_free(&query);
+ 		return -1;
+ 	}
+ 
  	if (node->archive_fp == NULL)
  	{
***************
*** 5234,5247 ****
  				"Cannot write to archive file %s - not open\n",
  				node->no_id, node->archive_temp);
  		return -1;
  	}
  
  	rc = fprintf(node->archive_fp, 
! 		"\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
  		"-- end of log archiving header\n"
  		"------------------------------------------------------------------\n"
  		"-- start of Slony-I data\n"
  		"------------------------------------------------------------------\n",
! 		namespace, sub_set, firstseq, seqbuf, timestamp);
  	if (rc < 0)
  	{
--- 5180,5197 ----
  				"Cannot write to archive file %s - not open\n",
  				node->no_id, node->archive_temp);
+ 		PQclear(res);
+ 		dstring_free(&query);
  		return -1;
  	}
  
  	rc = fprintf(node->archive_fp, 
! 		"\nselect %s.archiveTracking_offline('%s', '%s');\n"
  		"-- end of log archiving header\n"
  		"------------------------------------------------------------------\n"
  		"-- start of Slony-I data\n"
  		"------------------------------------------------------------------\n",
! 		namespace, PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1));
! 	PQclear(res);
! 	dstring_free(&query);
  	if (rc < 0)
  	{



More information about the Slony1-commit mailing list