Chris Browne cbbrowne at lists.slony.info
Mon May 26 14:09:50 PDT 2008
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv12883/slon

Modified Files:
	local_listen.c remote_listen.c remote_worker.c 
Log Message:
Remove the code that generates and listens for LISTEN/NOTIFY
  notifications for Events and Confirms.

remote_listen.c now functions solely via polling, which we already know
works fine, as we have had a polling mode for quite a while in the 2.0
code tree.

The slon _Restart LISTEN/NOTIFY has been left alone; verifying that it
is handled correctly is left as a separate task, to be addressed
independently of this change.


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.168
retrieving revision 1.169
diff -C2 -d -r1.168 -r1.169
*** remote_worker.c	26 May 2008 20:09:54 -0000	1.168
--- remote_worker.c	26 May 2008 21:09:48 -0000	1.169
***************
*** 243,247 ****
  			  SlonDString *dsp);
  static void query_append_event(SlonDString *dsp,
! 				   SlonWorkMsg_event *event, bool suppress_notify);
  static void store_confirm_forward(SlonNode *node, SlonConn *conn,
  					  SlonWorkMsg_confirm *confirm);
--- 243,247 ----
  			  SlonDString *dsp);
  static void query_append_event(SlonDString *dsp,
! 							   SlonWorkMsg_event *event);
  static void store_confirm_forward(SlonNode *node, SlonConn *conn,
  					  SlonWorkMsg_confirm *confirm);
***************
*** 291,295 ****
  	bool		event_ok;
  	bool		need_reloadListen = false;
- 	bool		suppress_notify;
  
  	slon_log(SLON_INFO,
--- 291,294 ----
***************
*** 503,508 ****
  
  		/*
! 		 * Construct the queries to begin a transaction, notify on the event
! 		 * and confirm relations, insert the event into our local sl_event
  		 * table and confirm it in our local sl_confirm table. When this
  		 * transaction commits, every other remote node listening for events
--- 502,507 ----
  
  		/*
! 		 * Construct the queries to begin a transaction, insert the event 
! 		 * into our local sl_event
  		 * table and confirm it in our local sl_confirm table. When this
  		 * transaction commits, every other remote node listening for events
***************
*** 624,628 ****
  			{
  				/*
! 				 * Execute the forwarding and notify stuff, but do not commit
  				 * the transaction yet.
  				 */
--- 623,627 ----
  			{
  				/*
! 				 * Execute the forwarding stuff, but do not commit
  				 * the transaction yet.
  				 */
***************
*** 663,674 ****
  			dstring_reset(&query1);
  			last_sync_group_size = 0;
- 			suppress_notify = FALSE;
  			for (i = 0; i < sync_group_size; i++)
  			{
! 				query_append_event(&query1, sync_group[i], suppress_notify);
! 				if (i < (sync_group_size - 1))
! 					free(sync_group[i]);
! 				last_sync_group_size++;
! 				suppress_notify = TRUE;
  			}
  			slon_appendquery(&query1, "commit transaction;");
--- 662,671 ----
  			dstring_reset(&query1);
  			last_sync_group_size = 0;
  			for (i = 0; i < sync_group_size; i++)
  			{
! 					query_append_event(&query1, sync_group[i]);
! 					if (i < (sync_group_size - 1))
! 							free(sync_group[i]);
! 					last_sync_group_size++;
  			}
  			slon_appendquery(&query1, "commit transaction;");
***************
*** 1041,1045 ****
  									 "notify \"_%s_Restart\"; ",
  									 rtcfg_cluster_name);
! 					query_append_event(&query1, event, FALSE);
  					slon_appendquery(&query1, "commit transaction;");
  					query_execute(node, local_dbconn, &query1);
--- 1038,1042 ----
  									 "notify \"_%s_Restart\"; ",
  									 rtcfg_cluster_name);
! 					query_append_event(&query1, event);
  					slon_appendquery(&query1, "commit transaction;");
  					query_execute(node, local_dbconn, &query1);
***************
*** 1423,1430 ****
  			if (event_ok)
  			{
! 				query_append_event(&query1, event, FALSE);
! 				slon_appendquery(&query1, "commit transaction;");
! 				if (archive_close(node) < 0)
! 					slon_retry();
  			}
  			else
--- 1420,1427 ----
  			if (event_ok)
  			{
! 					query_append_event(&query1, event);
! 					slon_appendquery(&query1, "commit transaction;");
! 					if (archive_close(node) < 0)
! 							slon_retry();
  			}
  			else
***************
*** 2141,2162 ****
   * query_append_event
   *
!  * Add queries to a dstring that notify for Event and Confirm and that insert a
!  * duplicate of an event record as well as the confirmation for it.
!  * "suppress_notify" parm permits omitting the notify request if running this many times
   * ----------
   */
  static void
! query_append_event(SlonDString *dsp, SlonWorkMsg_event *event, bool suppress_notify)
  {
  	char		seqbuf[64];
  
  	sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
- 	if (!suppress_notify)
- 	{
- 		slon_appendquery(dsp,
- 						 "notify \"_%s_Event\"; ",
- 						 rtcfg_cluster_name);
- 
- 	}
  	slon_appendquery(dsp,
  					 "insert into %s.sl_event "
--- 2138,2151 ----
   * query_append_event
   *
!  * Add queries to a dstring that insert a duplicate of an event record
!  * as well as the confirmation for it.  
   * ----------
   */
  static void
! query_append_event(SlonDString *dsp, SlonWorkMsg_event *event)
  {
  	char		seqbuf[64];
  
  	sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
  	slon_appendquery(dsp,
  					 "insert into %s.sl_event "

Index: remote_listen.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.44
retrieving revision 1.45
diff -C2 -d -r1.44 -r1.45
*** remote_listen.c	26 May 2008 20:14:27 -0000	1.44
--- remote_listen.c	26 May 2008 21:09:48 -0000	1.45
***************
*** 56,64 ****
  					SlonConn * conn, struct listat * listat);
  
- typedef enum {
- 	SLON_POLLSTATE_POLL=1, 
- 	SLON_POLLSTATE_LISTEN
- } PollState;
- static PollState	poll_state;
  static int			poll_sleep;
  
--- 56,59 ----
***************
*** 92,97 ****
  	int64		new_config_seq = 0;
  
- 	PollState	oldpstate;
- 
  	slon_log(SLON_INFO,
  			 "remoteListenThread_%d: thread starts\n",
--- 87,90 ----
***************
*** 106,110 ****
  
  	poll_sleep = 0;
- 	poll_state = SLON_POLLSTATE_POLL;	/* Initially, start in Polling mode */
  
  	sprintf(conn_symname, "node_%d_listen", node->no_id);
--- 99,102 ----
***************
*** 238,263 ****
  			 */
  			(void) slon_mkquery(&query1,
- 				     /* "listen \"_%s_Event\"; " */
- 				     /*	 skip confirms "listen \"_%s_Confirm\"; " */
  				     "select %s.registerNodeConnection(%d); ",
- 				     /* rtcfg_cluster_name,  */
  				     rtcfg_namespace, rtcfg_nodeid);
  
- 			if (poll_state == SLON_POLLSTATE_LISTEN) {
- 				slon_appendquery(&query1, 
- 						 "listen \"_%s_Event\"; ",
- 						 rtcfg_cluster_name);
- 			} else {
- 				slon_appendquery(&query1, 
- 						 "unlisten \"_%s_Event\"; ",
- 						 rtcfg_cluster_name);
- 			}
  			res = PQexec(dbconn, dstring_data(&query1));
! 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
  			{
  				slon_log(SLON_ERROR,
  					 "remoteListenThread_%d: \"%s\" - %s",
  					 node->no_id,
! 					 dstring_data(&query1), PQresultErrorMessage(res));
  				PQclear(res);
  				slon_disconnectdb(conn);
--- 230,243 ----
  			 */
  			(void) slon_mkquery(&query1,
  				     "select %s.registerNodeConnection(%d); ",
  				     rtcfg_namespace, rtcfg_nodeid);
  
  			res = PQexec(dbconn, dstring_data(&query1));
! 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			{
  				slon_log(SLON_ERROR,
  					 "remoteListenThread_%d: \"%s\" - %s",
  					 node->no_id,
! 						 dstring_data(&query1), PQresultErrorMessage(res));
  				PQclear(res);
  				slon_disconnectdb(conn);
***************
*** 318,322 ****
  		 * Receive events from the provider node
  		 */
- 		oldpstate = poll_state;
  		rc = remoteListen_receive_events(node, conn, listat_head);
  		if (rc < 0)
--- 298,301 ----
***************
*** 333,371 ****
  			continue;
  		}
- 		if (oldpstate != poll_state) { /* Switched states... */
- 			switch (poll_state) {
- 			case SLON_POLLSTATE_POLL:
- 				slon_log(SLON_DEBUG2, 
- 					 "remoteListenThread_%d: UNLISTEN - switch into polling mode\n",
- 					 node->no_id);
- 
- 				(void) slon_mkquery(&query1,
- 					     "unlisten \"_%s_Event\"; ",
- 					     rtcfg_cluster_name);
- 				break;
- 			case SLON_POLLSTATE_LISTEN:
- 				slon_log(SLON_DEBUG2, 
- 					 "remoteListenThread_%d: LISTEN - switch from polling mode to use LISTEN\n",
- 					 node->no_id);
- 				(void) slon_mkquery(&query1,
- 					     "listen \"_%s_Event\"; ",
- 					     rtcfg_cluster_name);
- 				break;
- 			}			
- 			res = PQexec(dbconn, dstring_data(&query1));
- 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
- 			{
- 				slon_log(SLON_ERROR,
- 					 "remoteListenThread_%d: \"%s\" - %s",
- 					 node->no_id,
- 					 dstring_data(&query1), PQresultErrorMessage(res));
- 				PQclear(res);
- 				slon_disconnectdb(conn);
- 				free(conn_conninfo);
- 				conn = NULL;
- 				conn_conninfo = NULL;
- 				continue;
- 			}
- 		}
  
  		/*
--- 312,315 ----
***************
*** 375,381 ****
  		 */
  		
- 		/* Initially: Let's just blindly check... */
- 		/* if (forward_confirm)
- 		   { */
  		rc = remoteListen_forward_confirm(node, conn);
  		if (rc < 0)
--- 319,322 ----
***************
*** 800,804 ****
  	if (ntuples > 0) {
  			if ((sel_max_events > 2) && (sync_group_maxsize > 100)) {
- 					poll_state = SLON_POLLSTATE_LISTEN;
  					slon_log(SLON_INFO, "remoteListenThread_%d: drew maximum # of events for %d iterations\n",
  							 node->no_id, sel_max_events);
--- 741,744 ----
***************
*** 808,812 ****
  			} else {
  					poll_sleep = 0;
- 					poll_state = SLON_POLLSTATE_POLL;
  			}
  	} else {
--- 748,751 ----
***************
*** 814,822 ****
  			if (poll_sleep > sync_interval_timeout) {
  					poll_sleep = sync_interval_timeout;
- 					poll_state = SLON_POLLSTATE_LISTEN;
  			}
  	}
  	PQclear(res);
- 	last_event_sel = ntuples;
  	return 0;
  }
--- 753,759 ----

Index: local_listen.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.45
retrieving revision 1.46
diff -C2 -d -r1.45 -r1.46
*** local_listen.c	21 Jan 2008 18:54:11 -0000	1.45
--- local_listen.c	26 May 2008 21:09:48 -0000	1.46
***************
*** 68,74 ****
  	 */
  	(void) slon_mkquery(&query1,
- 		     /* "listen \"_%s_Event\"; " */
  		     "listen \"_%s_Restart\"; ",
- 		     /*	 rtcfg_cluster_name,  */
  		     rtcfg_cluster_name);
  	res = PQexec(dbconn, dstring_data(&query1));
--- 68,72 ----



More information about the Slony1-commit mailing list