Jan Wieck wieck at lists.slony.info
Mon Aug 20 10:02:30 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv9726/src/slon

Modified Files:
      Tag: REL_1_2_STABLE
	remote_worker.c 
Log Message:
I've had it now with the setsync tracking in offline archives.

The whole idea of tracking the setsync status is bogus to begin with, since
the real online replica doesn't update the sl_setsync table on every
event.

I added another table, sl_archive_counter, where the log-writing slon
simply tracks when it wrote the last offline archive file and maintains
a counter. This counter is now tracked in the offline replica and must
increment gap-free.

Jan


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.124.2.19
retrieving revision 1.124.2.20
diff -C2 -d -r1.124.2.19 -r1.124.2.20
*** remote_worker.c	10 Aug 2007 18:32:21 -0000	1.124.2.19
--- remote_worker.c	20 Aug 2007 17:02:28 -0000	1.124.2.20
***************
*** 263,268 ****
  static int	archive_append_data(SlonNode *node, const char *s, int len);
  static int archive_tracking(SlonNode *node, const char *namespace, 
! 					int sub_set, const char *firstseq,
! 					const char *seqbuf, const char *timestamp);
  
  
--- 263,267 ----
  static int	archive_append_data(SlonNode *node, const char *s, int len);
  static int archive_tracking(SlonNode *node, const char *namespace, 
! 					PGconn *local_dbconn);
  
  
***************
*** 692,706 ****
  					slon_retry();
  
! 				for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next)
! 				{
! 					for(pset = pinfo->set_head; pset != NULL; pset = pset->next)
! 					{
! 						if (archive_tracking(node, rtcfg_namespace, 
! 								pset->set_id, pset->ssy_seqno, seqbuf, 
! 								event->ev_timestamp_c) < 0)
! 							slon_retry();
! 						strcpy(pset->ssy_seqno, seqbuf);
! 					}
! 				}
  
  				/*
--- 691,697 ----
  					slon_retry();
  
! 				if (archive_tracking(node, rtcfg_namespace, 
! 						local_dbconn) < 0)
! 					slon_retry();
  
  				/*
***************
*** 876,892 ****
  								 "select %s.dropSet_int(%d); ",
  								 rtcfg_namespace, set_id);
- 
- 				/*
- 				 * The table deleted needs to be dropped from log shipping too
- 				 */
- 				if (archive_dir)
- 				{
- 					slon_mkquery(&lsquery,
- 								 "delete from %s.sl_setsync_offline "
- 								 "  where ssy_setid= %d;",
- 								 rtcfg_namespace, set_id);
- 					if (archive_append_ds(node, &lsquery) < 0)
- 						slon_retry();
- 				}
  			}
  			else if (strcmp(event->ev_type, "MERGE_SET") == 0)
--- 867,870 ----
***************
*** 902,918 ****
  								 set_id, add_id);
  
- 				/*
- 				 * Log shipping gets the change here that we need to delete
- 				 * the table being merged from the set being maintained.
- 				 */
- 				if (archive_dir)
- 				{
- 					slon_mkquery(&lsquery,
- 							  "delete from %s.sl_setsync_offline "
- 							  "  where ssy_setid= %d;",
- 							  rtcfg_namespace, add_id);
- 					if (archive_append_ds(node, &lsquery) < 0)
- 						slon_retry();
- 				}
  			}
  			else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
--- 880,883 ----
***************
*** 1302,1314 ****
  
  				need_reloadListen = true;
- 				if (archive_dir)
- 				{
- 					slon_mkquery(&lsquery,
- 								 "delete from %s.sl_setsync_offline "
- 								 "  where ssy_setid= %d;",
- 								 rtcfg_namespace, sub_set);
- 					if (archive_append_ds(node, &lsquery) < 0)
- 						slon_retry();
- 				}
  			}
  			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
--- 1267,1270 ----
***************
*** 3744,3767 ****
  	if (archive_dir)
  	{
- 		slon_mkquery(&lsquery,
- 			     "insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) "
- 			     "values ('%d', '%s');",
- 			     rtcfg_namespace, set_id, ssy_seqno);
- 		rc = archive_append_ds(node, &lsquery);
- 		if (rc < 0)
- 		{
- 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- 					 " could not add data to archive",
- 					 node->no_id);
- 			slon_disconnectdb(pro_conn);
- 			dstring_free(&query1);
- 			dstring_free(&query2);
- 			dstring_free(&query3);
- 			dstring_free(&lsquery);
- 			dstring_free(&indexregenquery);
- 			archive_terminate(node);
- 			return -1;
- 		}
- 
  		/*
  		 * Refresh the sl_sequence_offline table
--- 3700,3703 ----
***************
*** 4282,4318 ****
  
  			PQclear(res2);
- 
- 			/*
- 			 * Add a call to the setsync tracking function to the archive log.
- 			 * This function ensures that all archive log files are applied in
- 			 * the right order.
- 			 */
- 			if (archive_dir)
- 			{
- 				for (pset = provider->set_head; pset; pset = pset->next)
- 					if (pset->set_id == sub_set) break;
- 				if (pset == NULL)
- 				{
- 					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- 							"set %d not found in runtime config\n",
- 							node->no_id, sub_set);
- 					slon_retry();
- 				}
- 
- 				slon_log(SLON_DEBUG2, "writing archive log...\n");
- 				fflush(stderr);
- 				fflush(stdout);
- 				rc = archive_tracking(node, rtcfg_namespace, sub_set,
- 										 pset->ssy_seqno, seqbuf,
- 										 event->ev_timestamp_c);
- 				if (rc < 0)
- 					slon_retry();
- 
- 				strcpy(pset->ssy_seqno, seqbuf);
- 			}
  		}
  		PQclear(res1);
  
  		/*
  		 * We didn't add anything good in the provider clause. That shouldn't
  		 * be!
--- 4218,4237 ----
  
  			PQclear(res2);
  		}
  		PQclear(res1);
  
  		/*
+ 		 * Add a call to the archive tracking function to the archive log.
+ 		 * This function ensures that all archive log files are applied in
+ 		 * the right order.
+ 		 */
+ 		if (archive_dir)
+ 		{
+ 			rc = archive_tracking(node, rtcfg_namespace, local_dbconn);
+ 			if (rc < 0)
+ 				slon_retry();
+ 		}
+ 
+ 		/*
  		 * We didn't add anything good in the provider clause. That shouldn't
  		 * be!
***************
*** 5592,5596 ****
  		"------------------------------------------------------------------\n"
  		"commit;\n"
! 		"vacuum analyze %s.sl_setsync_offline;\n",
  		rtcfg_namespace);
  	if (rc < 0)
--- 5511,5515 ----
  		"------------------------------------------------------------------\n"
  		"commit;\n"
! 		"vacuum analyze %s.sl_archive_tracking;\n",
  		rtcfg_namespace);
  	if (rc < 0)
***************
*** 5645,5657 ****
   */
  static int
! archive_tracking(SlonNode *node, const char *namespace, int sub_set, 
! 					const char *firstseq, const char *seqbuf, 
! 					const char *timestamp)
  {
! 	int		rc;
  
  	if (!archive_dir)
  		return 0;
  
  	if (node->archive_fp == NULL)
  	{
--- 5564,5606 ----
   */
  static int
! archive_tracking(SlonNode *node, const char *namespace, 
! 					PGconn *local_dbconn)
  {
! 	SlonDString query;
! 	PGresult	*res;
! 	int			rc;
  
  	if (!archive_dir)
  		return 0;
  
+ 	dstring_init(&query);
+ 	slon_mkquery(&query,
+ 			"update %s.sl_archive_counter "
+ 			"    set ac_num = ac_num + 1, "
+ 			"        ac_timestamp = CURRENT_TIMESTAMP; "
+ 			"select ac_num, ac_timestamp from %s.sl_archive_counter; ",
+ 			namespace, namespace);
+ 	res = PQexec(local_dbconn, dstring_data(&query));
+ 	if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
+ 	{
+ 		slon_log(SLON_ERROR,
+ 				 "remoteWorkerThread_%d: \"%s\" %s %s\n",
+ 				 node->no_id, dstring_data(&query),
+ 				 PQresStatus(rc),
+ 				 PQresultErrorMessage(res));
+ 		PQclear(res);
+ 		dstring_free(&query);
+ 		return -1;
+ 	}
+ 	if ((rc = PQntuples(res)) != 1)
+ 	{
+ 		slon_log(SLON_ERROR,
+ 				 "remoteWorkerThread_%d: expected 1 row in sl_archive_counter - found %d",
+ 				 node->no_id, rc);
+ 		PQclear(res);
+ 		dstring_free(&query);
+ 		return -1;
+ 	}
+ 
  	if (node->archive_fp == NULL)
  	{
***************
*** 5659,5672 ****
  				"Cannot write to archive file %s - not open\n",
  				node->no_id, node->archive_temp);
  		return -1;
  	}
  
  	rc = fprintf(node->archive_fp, 
! 		"\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
  		"-- end of log archiving header\n"
  		"------------------------------------------------------------------\n"
  		"-- start of Slony-I data\n"
  		"------------------------------------------------------------------\n",
! 		namespace, sub_set, firstseq, seqbuf, timestamp);
  	if (rc < 0)
  	{
--- 5608,5625 ----
  				"Cannot write to archive file %s - not open\n",
  				node->no_id, node->archive_temp);
+ 		PQclear(res);
+ 		dstring_free(&query);
  		return -1;
  	}
  
  	rc = fprintf(node->archive_fp, 
! 		"\nselect %s.archiveTracking_offline('%s', '%s');\n"
  		"-- end of log archiving header\n"
  		"------------------------------------------------------------------\n"
  		"-- start of Slony-I data\n"
  		"------------------------------------------------------------------\n",
! 		namespace, PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1));
! 	PQclear(res);
! 	dstring_free(&query);
  	if (rc < 0)
  	{



More information about the Slony1-commit mailing list