Jan Wieck wieck at lists.slony.info
Thu Aug 23 11:13:01 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv30310

Modified Files:
	remote_worker.c slon.h 
Log Message:
Change the file names of the archive logs so that they are based on the
internal archive tracking counter. That way, a mechanism that applies
the archives to the offline replica can easily figure out which file
needs to be applied next by looking at sl_archive_tracking.

Jan


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.151
retrieving revision 1.152
diff -C2 -d -r1.151 -r1.152
*** remote_worker.c	20 Aug 2007 18:22:26 -0000	1.151
--- remote_worker.c	23 Aug 2007 18:12:59 -0000	1.152
***************
*** 253,257 ****
  
  
! static int	archive_open(SlonNode *node, char *seqbuf);
  static int	archive_close(SlonNode *node);
  static void archive_terminate(SlonNode *node);
--- 253,258 ----
  
  
! static int	archive_open(SlonNode *node, char *seqbuf, 
! 					PGconn *dbconn);
  static int	archive_close(SlonNode *node);
  static void archive_terminate(SlonNode *node);
***************
*** 259,264 ****
  static int	archive_append_str(SlonNode *node, const char *s);
  static int	archive_append_data(SlonNode *node, const char *s, int len);
- static int archive_tracking(SlonNode *node, const char *namespace, 
- 					PGconn *local_dbconn);
  
  
--- 260,263 ----
***************
*** 680,688 ****
  			if (archive_dir)
  			{
- 				ProviderInfo   *pinfo;
- 				ProviderSet	   *pset;
  				char			buf[256];
  
! 				if (archive_open(node, seqbuf) < 0)
  					slon_retry();
  				sprintf(buf, "-- %s", event->ev_type);
--- 679,685 ----
  			if (archive_dir)
  			{
  				char			buf[256];
  
! 				if (archive_open(node, seqbuf, local_dbconn) < 0)
  					slon_retry();
  				sprintf(buf, "-- %s", event->ev_type);
***************
*** 690,697 ****
  					slon_retry();
  
- 				if (archive_tracking(node, rtcfg_namespace, 
- 						local_dbconn) < 0)
- 					slon_retry();
- 
  				/*
  				 * Leave the archive open for event specific actions.
--- 687,690 ----
***************
*** 3471,3475 ****
  	if (archive_dir)
  	{
! 		rc = archive_open(node, seqbuf);
  		if (rc < 0)
  		{
--- 3464,3468 ----
  	if (archive_dir)
  	{
! 		rc = archive_open(node, seqbuf, local_dbconn);
  		if (rc < 0)
  		{
***************
*** 3833,3848 ****
  
  		/*
- 		 * Add a call to the archive tracking function to the archive log.
- 		 * This function ensures that all archive log files are applied in
- 		 * the right order.
- 		 */
- 		if (archive_dir)
- 		{
- 			rc = archive_tracking(node, rtcfg_namespace, local_dbconn);
- 			if (rc < 0)
- 				slon_retry();
- 		}
- 
- 		/*
  		 * We didn't add anything good in the provider clause. That shouldn't
  		 * be!
--- 3826,3829 ----
***************
*** 4969,4975 ****
   * - Second, you generate the header using generate_archive_header()
   *
-  * - Third, you need to set up the sync tracking function in the log
-  *	 using logarchive_tracking()
-  *
   * ========  Here Ends The Header of the Log Shipping Archive ========
   *
--- 4950,4953 ----
***************
*** 4995,5000 ****
   */
  static int
! archive_open(SlonNode *node, char *seqbuf)
  {
  	int		i;
  	int		rc;
--- 4973,4980 ----
   */
  static int
! archive_open(SlonNode *node, char *seqbuf, PGconn *dbconn)
  {
+ 	SlonDString query;
+ 	PGresult	*res;
  	int		i;
  	int		rc;
***************
*** 5007,5011 ****
  		node->archive_name = malloc(SLON_MAX_PATH);
  		node->archive_temp = malloc(SLON_MAX_PATH);
! 		if (node->archive_name == NULL || node->archive_temp == NULL)
  		{
  			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
--- 4987,4994 ----
  		node->archive_name = malloc(SLON_MAX_PATH);
  		node->archive_temp = malloc(SLON_MAX_PATH);
! 		node->archive_counter = malloc(64);
! 		node->archive_timestamp = malloc(256);
! 		if (node->archive_name == NULL || node->archive_temp == NULL || 
! 			node->archive_counter == NULL || node->archive_timestamp == NULL)
  		{
  			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
***************
*** 5024,5032 ****
  	}
  
  	sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir, 
  			node->no_id);
! 	for (i = strlen(seqbuf); i < 20; i++)
  		strcat(node->archive_name, "0");
! 	strcat(node->archive_name, seqbuf);
  	strcat(node->archive_name, ".sql");
  	strcpy(node->archive_temp, node->archive_name);
--- 5007,5048 ----
  	}
  
+ 	dstring_init(&query);
+ 	slon_mkquery(&query,
+ 			"update %s.sl_archive_counter "
+ 			"    set ac_num = ac_num + 1, "
+ 			"        ac_timestamp = CURRENT_TIMESTAMP; "
+ 			"select ac_num, ac_timestamp from %s.sl_archive_counter; ",
+ 			rtcfg_namespace, rtcfg_namespace);
+ 	res = PQexec(dbconn, dstring_data(&query));
+ 	if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
+ 	{
+ 		slon_log(SLON_ERROR,
+ 				 "remoteWorkerThread_%d: \"%s\" %s %s\n",
+ 				 node->no_id, dstring_data(&query),
+ 				 PQresStatus(rc),
+ 				 PQresultErrorMessage(res));
+ 		PQclear(res);
+ 		dstring_free(&query);
+ 		return -1;
+ 	}
+ 	if ((rc = PQntuples(res)) != 1)
+ 	{
+ 		slon_log(SLON_ERROR,
+ 				 "remoteWorkerThread_%d: expected 1 row in sl_archive_counter - found %d",
+ 				 node->no_id, rc);
+ 		PQclear(res);
+ 		dstring_free(&query);
+ 		return -1;
+ 	}
+ 	strcpy(node->archive_counter, PQgetvalue(res, 0, 0));
+ 	strcpy(node->archive_timestamp, PQgetvalue(res, 0, 1));
+ 	PQclear(res);
+ 	dstring_free(&query);
+ 	
  	sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir, 
  			node->no_id);
! 	for (i = strlen(node->archive_counter); i < 20; i++)
  		strcat(node->archive_name, "0");
! 	strcat(node->archive_name, node->archive_counter);
  	strcat(node->archive_name, ".sql");
  	strcpy(node->archive_temp, node->archive_name);
***************
*** 5042,5050 ****
  
  	rc = fprintf(node->archive_fp,
! 				   "-- Slony-I log shipping archive\n"
! 				   "-- Node %d, Event %s\n"
! 				   "set session_replication_role to replica;\n"
! 				   "start transaction;\n",
! 				   node->no_id, seqbuf);
  	if (rc < 0)
  	{
--- 5058,5082 ----
  
  	rc = fprintf(node->archive_fp,
! 		"------------------------------------------------------------------\n"
! 		"-- Slony-I log shipping archive\n"
! 		"-- Node %d, Event %s\n"
! 		"------------------------------------------------------------------\n"
! 		"set session_replication_role to replica;\n"
! 		"start transaction;\n",
! 		node->no_id, seqbuf);
! 	if (rc < 0)
! 	{
! 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! 				"Cannot write to archive file %s - %s\n",
! 				node->no_id, node->archive_temp, strerror(errno));
! 		return -1;
! 	}
! 	rc = fprintf(node->archive_fp,
! 		"select %s.archiveTrackingOffline('%s', '%s');\n"
! 		"-- end of log archiving header\n"
! 		"------------------------------------------------------------------\n"
! 		"-- start of Slony-I data\n"
! 		"------------------------------------------------------------------\n",
! 		rtcfg_namespace, node->archive_counter, node->archive_timestamp);
  	if (rc < 0)
  	{
***************
*** 5132,5209 ****
  
  /* ----------
-  * archive_tracking
-  * ----------
-  */
- static int
- archive_tracking(SlonNode *node, const char *namespace, 
- 					PGconn *local_dbconn)
- {
- 	SlonDString query;
- 	PGresult	*res;
- 	int			rc;
- 
- 	if (!archive_dir)
- 		return 0;
- 
- 	dstring_init(&query);
- 	slon_mkquery(&query,
- 			"update %s.sl_archive_counter "
- 			"    set ac_num = ac_num + 1, "
- 			"        ac_timestamp = CURRENT_TIMESTAMP; "
- 			"select ac_num, ac_timestamp from %s.sl_archive_counter; ",
- 			namespace, namespace);
- 	res = PQexec(local_dbconn, dstring_data(&query));
- 	if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
- 	{
- 		slon_log(SLON_ERROR,
- 				 "remoteWorkerThread_%d: \"%s\" %s %s\n",
- 				 node->no_id, dstring_data(&query),
- 				 PQresStatus(rc),
- 				 PQresultErrorMessage(res));
- 		PQclear(res);
- 		dstring_free(&query);
- 		return -1;
- 	}
- 	if ((rc = PQntuples(res)) != 1)
- 	{
- 		slon_log(SLON_ERROR,
- 				 "remoteWorkerThread_%d: expected 1 row in sl_archive_counter - found %d",
- 				 node->no_id, rc);
- 		PQclear(res);
- 		dstring_free(&query);
- 		return -1;
- 	}
- 
- 	if (node->archive_fp == NULL)
- 	{
- 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- 				"Cannot write to archive file %s - not open\n",
- 				node->no_id, node->archive_temp);
- 		PQclear(res);
- 		dstring_free(&query);
- 		return -1;
- 	}
- 
- 	rc = fprintf(node->archive_fp, 
- 		"\nselect %s.archiveTracking_offline('%s', '%s');\n"
- 		"-- end of log archiving header\n"
- 		"------------------------------------------------------------------\n"
- 		"-- start of Slony-I data\n"
- 		"------------------------------------------------------------------\n",
- 		namespace, PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1));
- 	PQclear(res);
- 	dstring_free(&query);
- 	if (rc < 0)
- 	{
- 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- 				"Cannot write to archive file %s - %s\n",
- 				node->no_id, node->archive_temp, strerror(errno));
- 		return -1;
- 	}
- 
- 	return 0;
- }
- 
- /* ----------
   * archive_append_ds
   * ----------
--- 5164,5167 ----

Index: slon.h
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.63
retrieving revision 1.64
diff -C2 -d -r1.63 -r1.64
*** slon.h	20 Apr 2007 20:53:18 -0000	1.63
--- slon.h	23 Aug 2007 18:12:59 -0000	1.64
***************
*** 111,114 ****
--- 111,116 ----
  	char	   *archive_name;
  	char	   *archive_temp;
+ 	char	   *archive_counter;
+ 	char	   *archive_timestamp;
  	FILE	   *archive_fp;
  



More information about the Slony1-commit mailing list