Jan Wieck wieck at lists.slony.info
Sat Sep 8 19:37:07 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slony_logshipper
In directory main.slony.info:/tmp/cvs-serv17109/src/slony_logshipper

Added Files:
	.cvsignore Makefile dbutil.c ipcutil.c parser.y scan.l 
	slony_logshipper.c slony_logshipper.h 
Log Message:
Add slony_logshipper to HEAD.

Jan


--- NEW FILE: .cvsignore ---
slony_logshipper
parser.c
y.tab.h
scan.c

--- NEW FILE: slony_logshipper.h ---
/*-------------------------------------------------------------------------
 * slony_logshipper.h
 *
 *	Definitions for slony_logshipper
 *
 *	Copyright (c) 2003-2004, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *
 *	$Id: slony_logshipper.h,v 1.2 2007-09-09 02:37:05 wieck Exp $
 *-------------------------------------------------------------------------
 */


/* ----------
 * SlonDString
 * ----------
 */
#define		SLON_DSTRING_SIZE_INIT	256
#define		SLON_DSTRING_SIZE_INC	2

typedef struct
{
	size_t		n_alloc;
	size_t		n_used;
	char	   *data;
}	SlonDString;

#define		dstring_init(__ds) \
do { \
	(__ds)->n_alloc = SLON_DSTRING_SIZE_INIT; \
	(__ds)->n_used = 0; \
	(__ds)->data = malloc(SLON_DSTRING_SIZE_INIT); \
	if ((__ds)->data == NULL) { \
		perror("dstring_init: malloc()"); \
		exit(-1); \
	} \
} while (0)
#define		dstring_reset(__ds) \
do { \
	(__ds)->n_used = 0; \
	(__ds)->data[0] = '\0'; \
} while (0)
#define		dstring_free(__ds) \
do { \
	free((__ds)->data); \
	(__ds)->n_used = 0; \
	(__ds)->data = NULL; \
} while (0)
#define		dstring_nappend(__ds,__s,__n) \
do { \
	if ((__ds)->n_used + (__n) >= (__ds)->n_alloc)	\
	{ \
		while ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \
			(__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \
		(__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \
		if ((__ds)->data == NULL) \
		{ \
			perror("dstring_nappend: realloc()"); \
			exit(-1); \
		} \
	} \
	memcpy(&((__ds)->data[(__ds)->n_used]), (__s), (__n)); \
	(__ds)->n_used += (__n); \
} while (0)
#define		dstring_append(___ds,___s) \
do { \
	register int ___n = strlen((___s)); \
	dstring_nappend((___ds),(___s),___n); \
} while (0)
#define		dstring_addchar(__ds,__c) \
do { \
	if ((__ds)->n_used + 1 >= (__ds)->n_alloc)	\
	{ \
		(__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \
		(__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \
		if ((__ds)->data == NULL) \
		{ \
			perror("dstring_append: realloc()"); \
			exit(-1); \
		} \
	} \
	(__ds)->data[(__ds)->n_used++] = (__c); \
} while (0)
#define		dstring_terminate(__ds) \
do { \
	(__ds)->data[(__ds)->n_used] = '\0'; \
} while (0)
#define		dstring_data(__ds)	((__ds)->data)


/*
 * Parser data structures
 */
typedef struct AttElem_s {
	char			   *attname;
	char			   *attvalue;
	struct AttElem_s  *next;
} AttElem;

typedef struct AttElemList_s {
	AttElem			   *list_head;
	AttElem			   *list_tail;
} AttElemList;

typedef struct InsertStmt_s {
	char			   *namespace;
	char			   *tablename;
	AttElemList		   *attributes;
} InsertStmt;

typedef struct UpdateStmt_s {
	char			   *namespace;
	char			   *tablename;
	AttElemList		   *changes;
	AttElemList		   *qualification;
} UpdateStmt;

typedef struct DeleteStmt_s {
	char			   *namespace;
	char			   *tablename;
	int					only;
	AttElemList		   *qualification;
} DeleteStmt;

typedef struct CopyStmt_s {
	char			   *namespace;
	char			   *tablename;
	AttElemList		   *attributes;
	char			   *from;
} CopyStmt;

typedef struct RenameObject_s {
	char			   *old_namespace;
	char			   *old_name;
	char			   *new_namespace;
	char			   *new_name;
	struct RenameObject_s *next;
} RenameObject;

typedef struct ProcessingCommand_s {
	char			   *command;
	struct ProcessingCommand_s *next;
} ProcessingCommand;


typedef enum {
	LOG_DEBUG = 0,
	LOG_INFO,
	LOG_WARN,
	LOG_ERROR
} log_level;


#ifndef MSGMAX
#define MSGMAX 1024
#endif


/*
 * Globals in slony_logshipper.c
 */
extern int			parse_errors;
extern char		   *current_file;
extern int			opt_quiet;
extern PGconn	   *dbconn;
extern bool			logfile_switch_requested;
extern bool			wait_for_resume;
extern bool			shutdown_smart_requested;
extern bool			shutdown_immed_requested;
extern SlonDString	errlog_messages;

extern char		   *archive_dir;
extern char		   *destination_dir;
extern char		   *destination_conninfo;
extern char		   *logfile_path;
extern int			max_archives;
extern char		   *cluster_name;
extern char		   *namespace;

extern RenameObject			   *rename_list;
extern ProcessingCommand	   *pre_processing_commands;
extern ProcessingCommand	   *post_processing_commands;
extern ProcessingCommand	   *error_commands;

/*
 * Functions in slony_logshipper.c
 */
extern int		process_check_at_counter(char *at_counter);
extern int		process_simple_sql(char *sql);
extern int		process_start_transaction(char *sql);
extern int		process_end_transaction(char *sql);
extern int		process_insert(InsertStmt *stmt);
extern int		process_update(UpdateStmt *stmt);
extern int		process_delete(DeleteStmt *stmt);
extern int		process_copy(CopyStmt *stmt);
extern int		process_copydata(char *line);
extern int		process_copyend(void);
extern void		config_add_rename(RenameObject *entry);
extern int		lookup_rename(char *namespace, char *name,
							char **use_namespace, char **use_name);
extern void		errlog(log_level level, char *fmt, ...);

/*
 * Functions in dbutil.c
 */
int			slon_mkquery(SlonDString * dsp, char *fmt,...);
int			slon_appendquery(SlonDString * dsp, char *fmt,...);


/*
 * Functions in ipcutil.c
 */
int			ipc_init(char *archive_dir);
int			ipc_finish(bool force);
int			ipc_poll(bool blocking);
int			ipc_send_path(char *logfname);
int			ipc_recv_path(char *buf);
int			ipc_send_term(char *archive_dir, bool immediate);
int			ipc_send_logswitch(char *archive_dir);
int			ipc_send_resume(char *archive_dir);
int			ipc_lock(void);
int			ipc_unlock(void);
int			ipc_cleanup(char *archive_dir);
void		ipc_set_shutdown_smart(void);
void		ipc_set_shutdown_immed(void);


/*
 * Parser related globals
 */
extern int	yylineno;
extern char *yytext;
extern FILE *yyin;
extern char	yychunk[];

extern void scan_new_input_file(FILE *in);
extern void scan_push_string(char *str);
extern int	scan_yyinput(void);
extern void	scan_copy_start(void);

extern void parse_error(const char *str);

extern void yyerror(const char *str);
extern int	yyparse(void);
extern int	yylex(void);


/*
 * Local Variables:
 *	tab-width: 4
 *	c-indent-level: 4
 *	c-basic-offset: 4
 * End:
 */

--- NEW FILE: Makefile ---
# ----------
# Makefile for src/slony_logshipper
#
#	Copyright (c) 2003-2004, PostgreSQL Global Development Group
#	Author: Jan Wieck, Afilias USA INC.
#
#	$Id: Makefile,v 1.2 2007-09-09 02:37:05 wieck Exp $
# ----------

slony_subdir = src/slony_logshipper
slony_top_builddir = ../..
SLFILEDESC="Slony command interpreter"
include $(slony_top_builddir)/Makefile.global

ifeq ($(PORTNAME), aix)
  CFLAGS += -D_LARGE_FILES
endif

CFLAGS += -I$(slony_top_builddir) -DPGSHARE="\"$(pgsharedir)\"" 


PROG		= slony_logshipper

ifeq ($(PORTNAME), win)
PROG            = slony_logshipper.exe
LDFLAG		= $(LDFLAG) -lpgport
endif
ifeq ($(PORTNAME), win32)
PROG            = slony_logshipper.exe
LDFLAG		= $(LDFLAG) -lpgport
endif

OBJS		= 					\
	slony_logshipper.o			\
	dbutil.o					\
	ipcutil.o					\
	parser.o $(WIN32RES)		\
	../parsestatements/scanner.o

DISTFILES = Makefile $(wildcard *.c) $(wildcard *.h) $(wildcard *.l) $(wildcard *.y)

ALL =					\
	$(PROG)


all:	$(ALL)

$(PROG):	$(OBJS)
	$(CC) $(CFLAGS) $(OBJS) $(LDFLAGS) -o $(PROG)
slony_logshipper.o:			slony_logshipper.c slony_logshipper.h
dbutil.o:			dbutil.c slony_logshipper.h
parser.o:			parser.c scan.c

parser.c:			parser.y slony_logshipper.h
ifdef YACC
	$(YACC) -d $(YFLAGS) $<
	mv -f y.tab.c parser.c
else
	@echo "Missing yacc $< $@"
	@exit 1
endif

scan.c:				scan.l slony_logshipper.h
ifdef FLEX 
	$(FLEX) $(FLEXFLAGS) -o'$@' $<
else
	@echo "Missing flex $< $@"
	@exit
endif

clean distclean:
	rm -f $(ALL) $(OBJS) $(PROG).core
	rm -f parser.c scan.c y.tab.h

maintainer-clean:	clean
	rm -f parser.c scan.c y.tab.h

install: all installdirs
	$(INSTALL_SCRIPT) $(PROG) $(DESTDIR)$(slonbindir)


installdirs:
	$(mkinstalldirs) $(DESTDIR)$(slonbindir)


distdir: $(DISTFILES)
	mkdir $(distdir)/$(subdir)
	-chmod 777 $(distdir)/$(subdir)
	for file in $(DISTFILES) ; do \
      cp $$file $(distdir)/$(subdir)/$$file || exit; \
    done

--- NEW FILE: parser.y ---
%{
/*-------------------------------------------------------------------------
 * parser.y
 *
 *	The slony_logshipper command language grammar
 *
 *	Copyright (c) 2003-2004, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *
 *	$Id: parser.y,v 1.2 2007-09-09 02:37:05 wieck Exp $
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "libpq-fe.h"
#include "slony_logshipper.h"
#include "../parsestatements/scanner.h"
extern int STMTS[MAXSTATEMENTS];

[...1072 lines suppressed...]
	errlog(LOG_ERROR, "%s:%d: ERROR %s\n", current_file,
				yylineno, msg);
	parse_errors++;
}


/*
 * Include the output of fles for the scanner here.
 */
#include "scan.c"



/*
 * Local Variables:
 *  tab-width: 4
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 */

--- NEW FILE: dbutil.c ---
/*-------------------------------------------------------------------------
 * dbutil.c
 *
 *	General database support functions.
 *
 *	Copyright (c) 2003-2004, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *
 *	$Id: dbutil.c,v 1.2 2007-09-09 02:37:05 wieck Exp $
 *-------------------------------------------------------------------------
 */


#ifndef WIN32
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
#include "postgres.h"
#include "libpq-fe.h"

#include "slony_logshipper.h"


/*
 * Local functions
 */
static int	slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap);


/* ----------
 * slon_mkquery
 *
 *	A simple query formatting and quoting function using dynamic string
 *	buffer allocation.
 *	Similar to sprintf() it uses formatting symbols:
 *		%s		String argument
 *		%q		Quoted literal (\ and ' will be escaped)
 *		%d		Integer argument
 * ----------
 */
int
slon_mkquery(SlonDString * dsp, char *fmt,...)
{
	va_list		ap;

	dstring_reset(dsp);

	va_start(ap, fmt);
	slon_appendquery_int(dsp, fmt, ap);
	va_end(ap);

	dstring_terminate(dsp);

	return 0;
}


/* ----------
 * slon_appendquery
 *
 *	Append query string material to an existing dynamic string.
 * ----------
 */
int
slon_appendquery(SlonDString * dsp, char *fmt,...)
{
	va_list		ap;

	va_start(ap, fmt);
	slon_appendquery_int(dsp, fmt, ap);
	va_end(ap);

	dstring_terminate(dsp);

	return 0;
}


/* ----------
 * slon_appendquery_int
 *
 *	Implementation of slon_mkquery() and slon_appendquery().
 * ----------
 */
static int
slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap)
{
	char	   *s;
	char		buf[64];

	while (*fmt)
	{
		switch (*fmt)
		{
			case '%':
				fmt++;
				switch (*fmt)
				{
					case 's':
						s = va_arg(ap, char *);
						dstring_append(dsp, s);
						fmt++;
						break;

					case 'q':
						s = va_arg(ap, char *);
						while (s && *s != '\0')
						{
							switch (*s)
							{
								case '\'':
									dstring_addchar(dsp, '\'');
									break;
								case '\\':
									dstring_addchar(dsp, '\\');
									break;
								default:
									break;
							}
							dstring_addchar(dsp, *s);
							s++;
						}
						fmt++;
						break;

					case 'Q':
						s = va_arg(ap, char *);
						while (s && *s != '\0')
						{
							switch (*s)
							{
								case '\'':
								case '\\':
									dstring_addchar(dsp, '\\');
									break;
								default:
									break;
							}
							dstring_addchar(dsp, *s);
							s++;
						}
						fmt++;
						break;

					case 'd':
						sprintf(buf, "%d", va_arg(ap, int));
						dstring_append(dsp, buf);
						fmt++;
						break;

					default:
						dstring_addchar(dsp, '%');
						dstring_addchar(dsp, *fmt);
						fmt++;
						break;
				}
				break;

			case '\\':
				fmt++;
				dstring_addchar(dsp, *fmt);
				fmt++;
				break;

			default:
				dstring_addchar(dsp, *fmt);
				fmt++;
				break;
		}
	}

	dstring_terminate(dsp);

	return 0;
}

--- NEW FILE: slony_logshipper.c ---
/*-------------------------------------------------------------------------
 * slony_logshipper.c
 *
 *	A configuration and admin script utility for Slony-I.
 *
 *	Copyright (c) 2003-2004, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *
 *	$Id: slony_logshipper.c,v 1.2 2007-09-09 02:37:05 wieck Exp $
 *-------------------------------------------------------------------------
 */


#ifndef WIN32
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <fcntl.h>
[...1371 lines suppressed...]
	}
	free(buf);

	if (archscan_sort_out(ent->right) < 0)
		return -1;

	free(ent->fname);
	free(ent);

	return 0;
}


/*
 * Local Variables:
 *	tab-width: 4
 *	c-indent-level: 4
 *	c-basic-offset: 4
 * End:
 */

--- NEW FILE: ipcutil.c ---
/*-------------------------------------------------------------------------
 * ipcutil.c
 *
 *	Semaphore and message passing support for the slony_logshipper.
 *
 *	Copyright (c) 2003-2004, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *
 *	$Id: ipcutil.c,v 1.2 2007-09-09 02:37:05 wieck Exp $
 *-------------------------------------------------------------------------
 */


#ifndef WIN32
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#endif
#include "postgres.h"
#include "libpq-fe.h"

#include "slony_logshipper.h"


/*
 * The daemonized logshipper keeps a sorted queue of archive
 * files that need processing.
 */
typedef struct	queue_elem_s {
	char				   *archive_path;
	struct queue_elem_s	   *next;
} queue_elem;


/*
 * Static data
 */
static char			   *ipc_archive_dir = NULL;
static key_t			semkey;
static key_t			msgkey;
static int				semid;
static int				msgid;
static int				ipc_creator;
static queue_elem	   *archive_queue_head = NULL;
static queue_elem	   *archive_queue_tail = NULL;


/*
 * Local functions
 */
static int	ipc_generate_keys(char *archive_dir);
static void	ipc_sighandler(int sig);
static int	ipc_add_path(char *path);
static int	ipc_send_code(char *archive_dir, int code);


/*
 * ipc_init() -
 *
 *	Called to setup or connect to the semaphore set and message queue.
 */
int
ipc_init(char *archive_dir)
{
	struct sembuf	sops[2];

	if (ipc_generate_keys(archive_dir) < 0)
		return -1;

	/*
	 * We eventually have to start over again in case
	 * the existing daemon destroys the semaphore set
	 * after we attached and before we can lock it.
	 */
	while (true)
	{
		/*
		 * Create or attach to the semaphore set
		 */
		semid = semget(semkey, 2, 0700 | IPC_CREAT);
		if (semid < 0)
		{
			fprintf(stderr, "cannot create or attache to semaphore set\n"
							"semget(): %s\n", strerror(errno));
			return -1;
		}

		/*
		 * We now do two initial operations with NOWAIT:
		 *		wait for #1 =0
		 *		inc sem  #1 +1
		 * We never again touch semaphore #1, so this either succeeds, meaning
		 * that we created the set and hold the current lock. Or it fails with
		 * EAGAIN, meaning we attached to an existing set. Or it fails with
		 * EIDRM, meaning the set was destroyed.
		 */
		sops[0].sem_num = 1;
		sops[0].sem_op  = 0;
		sops[0].sem_flg = IPC_NOWAIT;
		sops[1].sem_num = 1;
		sops[1].sem_op  = 1;
		sops[1].sem_flg = 0;

		if (semop(semid, sops, 2) < 0)
		{
			if (errno == EIDRM)
				continue;

			if (errno != EAGAIN)
			{
				fprintf(stderr, "semop failed in ipc_init(): %s",
						strerror(errno));
				return -1;
			}

			/*
			 * Grab the lock on semaphore #0
			 */
			if (ipc_lock() < 0)
			{
				/*
				 * Since theres a gap between attaching and locking, the
				 * set could have been destroyed. In that case, start over.
				 */
				if (errno == EIDRM)
					continue;

				fprintf(stderr, "semop() failed in ipc_init(): %s\n",
						strerror(errno));
				return -1;
			}

			ipc_creator = 0;
		}
		else
		{
			/*
			 * Initial semop succeeded - we are the creator of this set.
			 */
			ipc_creator = 1;
			signal(SIGINT, ipc_sighandler);
			signal(SIGTERM, ipc_sighandler);
			signal(SIGPIPE, ipc_sighandler);
		}

		break;
	}

	/*
	 * At this point we have the semaphore set and it is locked.
	 */
	msgid = msgget(msgkey, 0700 | IPC_CREAT);
	if (msgid < 0)
	{
		fprintf(stderr, "msgget() failed in ipc_init(): %s\n", strerror(errno));
		if (ipc_creator)
		{
			if (semctl(semid, 0, IPC_RMID) < 0)
				fprintf(stderr, "semctl() failed in ipc_init(): %s\n",
						strerror(errno));
		}
		return -1;
	}

	return ipc_creator;
}


/*
 * ipc_finish() -
 *
 *	Locks and destroys the semaphore set and message queue if called
 *	in the daemonized queue runner. If force isn't given, it will
 *	only do so if the archive queue is empty after locking the set
 *	and draining the message queue.
 */
int
ipc_finish(bool force)
{
	if (ipc_creator)
	{
		if (!force)
		{
			/*
			 * We are the creator of the semaphore set, so if this isn't
			 * a force operation, we lock it first, poll the message queue
			 * and check that we have an empty queue.
			 */
			if (ipc_lock() < 0)
			{
				fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
						strerror(errno));
				return -1;
			}

			if (ipc_poll(false) < 0)
				return -1;
			
			if (archive_queue_head != NULL)
			{
				if (ipc_unlock() < 0)
				{
					fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
							strerror(errno));
					return -1;
				}
				return 1;
			}
		}
		/*
		 * At this point, we are either forced to stop or we have a lock
		 * and the queue is empty.
		 */
		if (msgctl(msgid, IPC_RMID, NULL) < 0)
		{
			fprintf(stderr, "msgctl() failed in ipc_finish(): %s\n",
					strerror(errno));
			semctl(semid, 0, IPC_RMID);
			return -1;
		}

		if (semctl(semid, 0, IPC_RMID) < 0)
		{
			fprintf(stderr, "semctl() failed in ipc_finish(): %s\n",
					strerror(errno));
			return -1;
		}
	}

	return 0;
}


/*
 * ipc_poll() -
 *
 *	Check for incoming messages
 */
int
ipc_poll(bool blocking)
{
	int		rc;
	struct {
		long		mtype;
		char		mtext[MSGMAX];
	}	msg;

	while(true)
	{
		rc = msgrcv(msgid, &msg, sizeof(msg), 0, 
				(blocking) ? 0 : IPC_NOWAIT);
		if (rc < 0)
		{
			if (errno == ENOMSG)
				return 0;

			fprintf(stderr, "msgrcv() failed in ipc_poll(): %s\n",
					strerror(errno));
			return -1;
		}

		if (msg.mtype == 2)
			shutdown_smart_requested = true;
		else if (msg.mtype == 3)
			shutdown_immed_requested = true;
		else if (msg.mtype == 4)
			wait_for_resume = false;
		else if (msg.mtype == 5)
			logfile_switch_requested = true;
		else
			if (ipc_add_path(msg.mtext) < 0)
				return -1;

		if (blocking)
			break;
	}

	return 0;
}


/*
 * ipc_add_path() -
 *
 *	Add an archive path to the sorted queue
 */
static int
ipc_add_path(char *path)
{
	queue_elem	  **elemp;
	queue_elem	   *elem;

	if ((elem = (queue_elem *)malloc(sizeof(queue_elem))) == NULL)
	{
		fprintf(stderr, "out of memory in ipc_add_path()\n");
		return -1;
	}
	if ((elem->archive_path = strdup(path)) == NULL)
	{
		fprintf(stderr, "out of memory in ipc_add_path()\n");
		return -1;
	}
	elem->next = NULL;

	/*
	 * See if we have to insert it in front of something else
	 */
	for (elemp = &archive_queue_head; *elemp != NULL; elemp = &((*elemp)->next))
	{
		if (strcmp(elem->archive_path, (*elemp)->archive_path) < 0)
		{
			elem->next = *elemp;
			*elemp = elem;
			return 0;
		}
	}
	

	if (archive_queue_head == NULL)
	{
		archive_queue_head = elem;
		archive_queue_tail = elem;
	}
	else
	{
		archive_queue_tail->next = elem;
		archive_queue_tail = elem;
	}

	return 0;
}


/*
 * ipc_send_path() -
 *
 *	Add an archive path to the daemons archive queue. Done by calling
 *	ipc_add_path() or sending the path to the daemon.
 */
int
ipc_send_path(char *logfname)
{
	struct {
		long		mtype;
		char		mtext[MSGMAX];
	}	msg;

	if (strlen(logfname) > (MSGMAX - 1))
	{
		fprintf(stderr, "path exceeds MSGMAX: %s\n", logfname);
		return -1;
	}

	/*
	 * As the creator, we are also the consumer, so we simply add the
	 * file to the queue.
	 */
	if (ipc_creator)
		return ipc_add_path(logfname);

	msg.mtype = 1;
	strcpy(msg.mtext, logfname);
	if (msgsnd(msgid, &msg, strlen(logfname) + 1, 0) < 0)
	{
		fprintf(stderr, "msgsnd() in ipc_send_path() failed: %s\n",
				strerror(errno));
		return -1;
	}

	return 0;
}


/*
 * ipc_recv_path()
 *
 *	Get the next archive file to process from the queue.
 */
int
ipc_recv_path(char *buf)
{
	queue_elem	   *elem;
	int				rc;
	struct {
		long		mtype;
		char		mtext[MSGMAX];
	}	msg;

	while (true)
	{
		if (ipc_poll(false) < 0)
		{
			ipc_finish(true);
			return -1;
		}

		/*
		 * If something requested an immediate shutdown, don't report any
		 * more logfiles back.
		 */
		if (shutdown_immed_requested)
		{
			ipc_finish(true);
			return 0;
		}

		/* 
		 * If a smart shutdown was requested, try to close the queue
		 * but don't force it.
		 */
		if (shutdown_smart_requested)
		{
			if ((rc = ipc_finish(false)) == 0)
			{
				return 0;
			}
			if (rc < 0)
			{
				ipc_finish(true);
				return -1;
			}
		}

		/*
		 * If we have something in the queue, return that.
		 */
		if (archive_queue_head != NULL)
		{
			elem = archive_queue_head;
			archive_queue_head = archive_queue_head->next;
			if (archive_queue_head == NULL)
				archive_queue_tail = NULL;

			strcpy(buf, elem->archive_path);
			free(elem->archive_path);
			free(elem);
			return 1;
		}

		/*
		 * Receive one single message blocking for it.
		 */
		rc = msgrcv(msgid, &msg, sizeof(msg), 0, 0);
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;

			fprintf(stderr, "msgrcv() failed in ipc_recv_path(): %s\n",
					strerror(errno));
			ipc_finish(true);
			return -1;
		}

		if (msg.mtype == 2)
			shutdown_smart_requested = true;
		else if (msg.mtype == 3)
			shutdown_immed_requested = true;
		else if (msg.mtype == 4)
			wait_for_resume = false;
		else if (msg.mtype == 5)
			logfile_switch_requested = true;
		else
			if (ipc_add_path(msg.mtext) < 0)
			{
				ipc_finish(true);
				return -1;
			}
	}
}


/*
 * ipc_send_term() -
 *
 *	Send a terminate request to the daemon.
 */
int
ipc_send_term(char *archive_dir, bool immediate)
{
	return ipc_send_code(archive_dir, (immediate) ? 3 : 2);
}


/*
 * ipc_send_logswitch() -
 *
 *	Send a logswitch code to the daemon.
 */
int
ipc_send_logswitch(char *archive_dir)
{
	return ipc_send_code(archive_dir, 5);
}


/*
 * ipc_send_resume() -
 *
 *	Send a resume (after error) code to the daemon.
 */
int
ipc_send_resume(char *archive_dir)
{
	return ipc_send_code(archive_dir, 4);
}


/*
 * ipc_send_code() -
 *
 *	Support function for ipc_send_term() and ipc_send_resume().
 */
static int
ipc_send_code(char *archive_dir, int code)
{
	struct {
		long	mtype;
		char	mtext[1];
	} msg;

	if (ipc_generate_keys(archive_dir) < 0)
		return -1;

	if ((semid = semget(semkey, 0, 0)) < 0)
	{
		if (!opt_quiet)
			fprintf(stderr, "no logshipper daemon running\n");
		return 2;
	}

	if (ipc_lock() < 0)
		return -1;

	if ((msgid = msgget(msgkey, 0)) < 0)
	{
		fprintf(stderr, "msgget() failed in ipc_send_code(): %s\n",
				strerror(errno));
		ipc_unlock();
		return -1;
	}
	
	msg.mtype = (long)code;
	if (msgsnd(msgid, &msg, 0, 0) < 0)
	{
		fprintf(stderr, "msgsnd() failed in ipc_send_code(): %s\n",
				strerror(errno));
		ipc_unlock();
		return -1;
	}

	return ipc_unlock();
}


/*
 * ipc_lock()
 *
 *	Lock the semaphore.
 */
int
ipc_lock(void)
{
	struct sembuf	sops[1] = {{0, -1, 0}};

	if (semop(semid, sops, 1) < 0)
	{
		fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
				strerror(errno));
		return -1;
	}
	return 0;
}


/*
 * ipc_unlock()
 *
 *	Unlock the semaphore.
 */
int
ipc_unlock(void)
{
	struct sembuf	sops[1] = {{0, 1, 0}};

	if (semop(semid, sops, 1) < 0)
	{
		fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
				strerror(errno));
		return -1;
	}
	return 0;
}


/*
 * ipc_cleanup() -
 *
 *	Destroy the semaphore set and message queue. Use with caution, this
 *	code does not check if there is a daemon running.
 */
int
ipc_cleanup(char *archive_dir)
{
	int			rc = 0;

	if (ipc_generate_keys(archive_dir) < 0)
		return -1;

	if ((semid = semget(semkey, 0, 0)) >= 0)
	{
		if (semctl(semid, 0, IPC_RMID) < 0)
		{
			fprintf(stderr, "semctl() failed in ipc_cleanup(): %s\n",
					strerror(errno));
			rc = -1;
		}
		else
			if (!opt_quiet)
				fprintf(stderr, "semaphore set removed\n");
			rc = 1;
	}
	else
		if (!opt_quiet)
			fprintf(stderr, "no semaphore set found\n");

	if ((msgid = msgget(msgkey, 0)) >= 0)
	{
		if (msgctl(msgid, IPC_RMID, NULL) < 0)
		{
			fprintf(stderr, "msgctl() failed in ipc_cleanup(): %s\n",
					strerror(errno));
			rc = -1;
		}
		else
			if (!opt_quiet)
				fprintf(stderr, "message queue removed\n");
		if (rc >= 0)
			rc |= 2;
	}
	else
		if (!opt_quiet)
			fprintf(stderr, "no message queue found\n");

	return rc;
}


/*
 * ipc_set_shutdown_smart() -
 *
 *	Put the queue into smart shutdown mode. This will cause
 *	ipc_recv_path() to return 0 once the queue is empty.
 */
void
ipc_set_shutdown_smart(void)
{
	shutdown_smart_requested = true;
}


/*
 * ipc_set_shutdown_immed() -
 *
 *	Put the queue into immediate shutdown mode. This will cause
 *	ipc_recv_path() to return 0 at the next call.
 */
void
ipc_set_shutdown_immed(void)
{
	shutdown_immed_requested = true;
}


/*
 * ipc_generate_keys() -
 *
 *	Generate the semkey and msgkey used for the Sys-V IPC objects.
 */
static int
ipc_generate_keys(char *archive_dir)
{
	if (ipc_archive_dir != NULL)
	{
		free(ipc_archive_dir);
		ipc_archive_dir = NULL;
	}

	/* ----
	 * Compute the two IPC keys used for the semaphore set and the
	 * message queue.
	 * ----
	 */
	semkey = ftok(archive_dir, 64);
	if (semkey < 0)
	{
		fprintf(stderr, "ftok(%s, 64): %s\n", archive_dir, strerror(errno));
		return -1;
	}
	msgkey = ftok(archive_dir, 65);
	if (msgkey < 0)
	{
		fprintf(stderr, "ftok(%s, 65): %s\n", archive_dir, strerror(errno));
		return -1;
	}

	ipc_archive_dir = strdup(archive_dir);
	if (ipc_archive_dir == NULL)
	{
		fprintf(stderr, "out of memory in ipc_generate_keys()\n");
		return -1;
	}

	return 0;
}


/*
 * ipc_sighandler()
 *
 *	Called on SIGINT and SIGTERM. Puts the daemon into immediate
 *	shutdown mode by sending ourself a -T message.
 */
static void
ipc_sighandler(int sig)
{
	struct {
		long	mtype;
		char	mtext[1];
	} msg;

	msg.mtype = 3;
	msgsnd(msgid, &msg, 0, 0);
}



--- NEW FILE: scan.l ---
%{
/*-------------------------------------------------------------------------
 * scan.l
 *
 *	Flex keyword and token scanner for slony_logshipper
 *
 *	Copyright (c) 2003-2004, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *
 *	$Id: scan.l,v 1.2 2007-09-09 02:37:05 wieck Exp $
 *-------------------------------------------------------------------------
 */

/*
 * Structure used to implement the input buffer stack.
 */
struct __yy_buffer
{
	YY_BUFFER_STATE	     buffer;		/* lexer buffer to restore on pop	*/
	long		         lineno;		/* line number to restore on pop	*/
	char               * fileName;		/* file name to restore on pop		*/
	FILE			   * yyin;			/* yyin to restore on pop			*/
	struct __yy_buffer * prev;			/* pointer to previous stack frame	*/
} * yy_buffer = NULL;

/*
 * Structure to hold defined symbols
 */
typedef struct _symbol
{
    char           * name;				/* Name of symbol with % prepended	*/
	char           * value;				/* Value of symbol					*/
	struct _symbol * next;				/* Pointer to next symbol			*/
} symbol;

/*
 * A buffer to send large items like literals in pieces to the parser.
 */
char yychunk[YY_BUF_SIZE / 2];

/*
 * Local data
 */
static symbol * symbols;				/* Head of list of symbols			*/

static char  *getSymbol(const char * name);	/* Return a symbol's value		*/
static void   addSymbol(char * str);	/* Add a new symbol					*/
static void   freeSymbols(void);		/* Free all symbols                 */
static void   pushBuffer(char *context);/* Push lexer buffer onto the stack	*/
static void   popBuffer( void );		/* Pop previous lexer buffer		*/

extern char * current_file;
%}

%option 8bit
%option noyywrap
%option yylineno
%option case-insensitive
%option pointer

%x  incl define
%x	IN_STRING
%x	COPYSTART COPY COPYLS

digit			[0-9]
ident_start		[A-Za-z\200-\377_]
ident_cont		[A-Za-z\200-\377_0-9\$]
space			[ \t\n\r\f]
instr_start		(.|{space})

quoted_ident	(\"[^\"]*\")+
identifier		({ident_start}{ident_cont}*|{quoted_ident})

conf_comment	(#[^\n]*)
exec_ddl		(--{space}DDL_SCRIPT{space}*)
archive_comment	(--[^\n]*)

%%

include{space}* 		{ BEGIN(incl);				}
define{space}*  		{ BEGIN(define);			}

%{
/* ----------
 * Keywords
 * ----------
 */
%}
start_config			{ return K_START_CONFIG;	}
start_archive			{ return K_START_ARCHIVE;	}
{conf_comment}			{ return K_CONF_COMMENT;	}
{archive_comment}		{ return K_ARCHIVE_COMMENT;	}
{exec_ddl}				{ return K_EXEC_DDL;		}

analyze					{ return K_ANALYZE;			}
and						{ return K_AND;				}
archive					{ return K_ARCHIVE;			}
archives				{ return K_ARCHIVES;		}
cluster					{ return K_CLUSTER;			}
command					{ return K_COMMAND;			}
commit					{ return K_COMMIT;			}
copy					{ return K_COPY;			}
database				{ return K_DATABASE;		}
delete					{ return K_DELETE;			}
destination				{ return K_DESTINATION;		}
dir						{ return K_DIR;				}
error					{ return K_ERROR;			}
from					{ return K_FROM;			}
ignore					{ return K_IGNORE;			}
insert					{ return K_INSERT;			}
into					{ return K_INTO;			}
logfile					{ return K_LOGFILE;			}
max						{ return K_MAX;				}
name					{ return K_NAME;			}
namespace				{ return K_NAMESPACE;		}
null					{ return K_NULL;			}
only					{ return K_ONLY;			}
post					{ return K_POST;			}
pre						{ return K_PRE;				}
processing				{ return K_PROCESSING;		}
rename					{ return K_RENAME;			}
replica					{ return K_REPLICA;			}
select					{ return K_SELECT;			}
session_replication_role { return K_SESSION_ROLE;	}
set						{ return K_SET;				}
start					{ return K_START;			}
table					{ return K_TABLE;			}
to						{ return K_TO;				}
transaction				{ return K_TRANSACTION;		}
update					{ return K_UPDATE;			}
vacuum					{ return K_VACUUM;			}
values					{ return K_VALUES;			}
where					{ return K_WHERE;			}

archivetracking_offline { return T_TRACKING_FUNCTION; }
finishtableaftercopy	{ return T_FINISH_FUNCTION;	}
sequencesetvalue_offline { return T_SEQSETVAL_FUNCTION;	}
setval 					{ return T_PGSETVAL_FUNCTION;	}

%{
/* ----------
 * Generic "things"
 * ----------
 */
%}
{digit}+				{ return T_NUMBER;			}
{identifier}			{ return T_IDENT;			}

{space}+		;

'				{
					char	   *cp = yychunk;
					int			len = 0;
					int			c;

					BEGIN(IN_STRING);
					while(len < sizeof(yychunk) - 1)
					{
						c = input();
						if (c == EOF)
						{
							errlog(LOG_ERROR, "%s: EOF inside literal\n",
									current_file);
							BEGIN(INITIAL);
							break;
						}
						if (c == '\\')
						{
							c = input();
							if (c == EOF)
							{
								errlog(LOG_ERROR, "%s: EOF inside literal\n",
										current_file);
								BEGIN(INITIAL);
								break;
							}
							*cp++ = c;
							len++;
							continue;
						}
						if (c == '\'')
						{
							c = input();
							if (c == EOF)
							{
								errlog(LOG_ERROR, "%s: EOF inside literal\n",
										current_file);
								BEGIN(INITIAL);
								break;
							}
							if (c == '\'')
							{
								*cp++ = c;
								len++;
								continue;
							}
							unput(c);
							*cp = '\0';
							BEGIN(INITIAL);
							return T_LITERAL;
						}
						*cp++ = c;
						len++;
					}
					*cp = '\0';
					return T_LITERAL_PART;
				}

<IN_STRING>{instr_start}	{
					char	   *cp = yychunk;
					int			len = 0;
					int			c;
					bool		chunk_start;

					BEGIN(IN_STRING);

					chunk_start = true;
					while(len < sizeof(yychunk) - 1)
					{
						if (chunk_start)
						{
							c = *yytext;
							chunk_start = false;
						}
						else
						{
							c = input();
						}
						if (c == EOF)
						{
							errlog(LOG_ERROR, "%s: EOF inside literal\n",
									current_file);
							BEGIN(INITIAL);
							break;
						}
						if (c == '\\')
						{
							c = input();
							if (c == EOF)
							{
								errlog(LOG_ERROR, "%s: EOF inside literal\n",
										current_file);
								BEGIN(INITIAL);
								break;
							}
							*cp++ = c;
							len++;
							continue;
						}
						if (c == '\'')
						{
							c = input();
							if (c == EOF)
							{
								errlog(LOG_ERROR, "%s: EOF inside literal\n",
										current_file);
								BEGIN(INITIAL);
								break;
							}
							if (c == '\'')
							{
								*cp++ = c;
								len++;
								continue;
							}
							unput(c);
							*cp = '\0';
							BEGIN(INITIAL);
							return T_LITERAL;
						}
						*cp++ = c;
						len++;
					}
					*cp = '\0';
					return T_LITERAL_PART;
				}

<COPYSTART>{space}+	{
					BEGIN(COPYLS);
				}
<COPYLS>\\.\n	{
					BEGIN(INITIAL);
					return T_COPYEND;
				}
<COPYLS>\\.\r\n	{
					BEGIN(INITIAL);
					return T_COPYEND;
				}
<COPYLS>.|{space} {
					char	   *cp = yychunk;
					int			len = 0;
					int			c;
					bool		chunk_start;

					BEGIN(COPY);

					chunk_start = true;
					while(len < sizeof(yychunk) - 2)
					{
						if (chunk_start)
						{
							c = yytext[0];
							chunk_start = false;
						}
						else
						{
							c = input();
						}
						if (c == EOF)
						{
							errlog(LOG_ERROR, "%s: EOF inside of COPY data\n",
									current_file);
							BEGIN(INITIAL);
							return T_COPYEND;
						}
						*cp++ = c;
						len++;
						if (c == '\n')
						{
							*cp = '\0';
							BEGIN(COPYLS);
							return T_COPYDATA;
						}
						if (len == 5)
						{
							*cp = '\0';
							return T_COPYDATA_PART;
						}
					}
					*cp = '\0';
					return T_COPYDATA_PART;
				}

<COPY>.|{space}	{
					char	   *cp = yychunk;
					int			len = 0;
					int			c;
					bool		chunk_start;

					chunk_start = true;
					while(len < sizeof(yychunk) - 2)
					{
						if (chunk_start)
						{
							c = yytext[0];
							chunk_start = false;
						}
						else
						{
							c = input();
						}
						if (c == EOF)
						{
							errlog(LOG_ERROR, "%s: EOF inside of COPY data\n",
									current_file);
							BEGIN(INITIAL);
							return T_COPYEND;
						}
						*cp++ = c;
						len++;
						if (c == '\n')
						{
							*cp = '\0';
							BEGIN(COPYLS);
							return T_COPYDATA;
						}
						if (len == 5)
						{
							*cp = '\0';
							return T_COPYDATA_PART;
						}
					}
					*cp = '\0';
					return T_COPYDATA_PART;
				}


%{ /*
'		{ 
			start_charpos = yytext;
			BEGIN(IN_STRING);
		}
<IN_STRING>\\.	{ }
<IN_STRING>\\	{ }
<IN_STRING>''	{ }
<IN_STRING>'	{
			yyleng += (yytext - start_charpos) - 2;
			yytext = start_charpos + 1;
			BEGIN(INITIAL);
			return T_LITERAL;
		}
<IN_STRING>[^'\\]+ {}
*/
%}

<define>{identifier}{space}+.*";" { addSymbol( yytext ); BEGIN(INITIAL); }

@{identifier}   { 
                  char * value = getSymbol( yytext );
                  
                  if( value )
                  {
                    pushBuffer( strdup( current_file ));
                    yy_scan_string( value );
                  }
                }

<incl>\<[^\>]+\>{space}*";"? {

                    char * fileName = strdup( yytext + 1 ); /* Skip '<' */

                    *strchr( fileName, '>' ) = '\0';        /* Trim '>' */

                    pushBuffer( fileName );

                    if(( yyin = fopen( fileName, "r" )) == NULL )
                    {
                        errlog(LOG_ERROR,  "Include file (%s) not found\n", fileName );
                        exit( 1 );
                    }
                 
                    yy_switch_to_buffer( yy_create_buffer( yyin, YY_BUF_SIZE ));

                    BEGIN(INITIAL);
                }

<<EOF>>         {
                    if( yy_buffer == NULL )
					{
                        yyterminate();
						if (YY_CURRENT_BUFFER)
							yy_delete_buffer(YY_CURRENT_BUFFER);
					}
                    else
                        popBuffer();
                }

#[^\r\n]*		;

.		{ return yytext[0];				}

%%

void
scan_new_input_file(FILE *in)
{
	while (yy_buffer != NULL)
		popBuffer();

	if (YY_CURRENT_BUFFER)
		yy_delete_buffer(YY_CURRENT_BUFFER);

	yy_switch_to_buffer(yy_create_buffer(in, YY_BUF_SIZE));

	yylineno = 1;

    freeSymbols();
}

void scan_push_string(char *str)
{
	pushBuffer(strdup("init"));
	yy_scan_string (str);
}


int scan_yyinput(void)
{
	return input();
}

void scan_copy_start(void)
{
	BEGIN(COPYSTART);
}

void pushBuffer( char * context )
{
	struct __yy_buffer * yb = malloc( sizeof( *yb ));

	yb->buffer   = YY_CURRENT_BUFFER;
	yb->lineno   = yylineno;
	yb->fileName = current_file;
	yb->yyin	 = yyin;
	yb->prev     = yy_buffer;

	yy_buffer = yb;

	current_file = context;
	yylineno     = 1;
	yyin         = NULL;
}

static void popBuffer( void )
{
	struct __yy_buffer * yb = yy_buffer;

	if( yyin != NULL )
		fclose( yyin );
	
	yy_delete_buffer( YY_CURRENT_BUFFER );
	yy_switch_to_buffer( yy_buffer->buffer );
	
	current_file = yy_buffer->fileName;
	yylineno     = yy_buffer->lineno;
	yyin         = yy_buffer->yyin;
	
	yy_buffer = yy_buffer->prev;
	
	free( yb );
}

static void addSymbol( char * str )
{
	char   * name  = str;
	char   * value = str;
	symbol * sym   = NULL;

	while( *value != ' ' && *value != '\t' )
		value++;

	*(value++) = '\0';

	while( *value == ' ' || *value == '\t' )
		value++;

	value[strlen(value) - 1 ] = '\0';

	sym = malloc( sizeof( *sym ));
	sym->value = strdup( value );
	sym->next  = NULL;
	
	/* Store the symbol name in searchable form  with a leading @ */

	sym->name  = malloc( strlen( name ) + 1 + 1 );

	sym->name[0] = '@';
	strcpy( sym->name+1, name );

	if( symbols != NULL )
		sym->next = symbols;

	symbols = sym;
}

static char * getSymbol( const char * name )
{
	symbol * sym;

	for( sym = symbols; sym; sym = sym->next )
	{
		if( strcmp( name, sym->name ) == 0 )
			return( sym->value );
	}

	return( NULL );
}

static void freeSymbols( void )
{
	symbol * sym = symbols;

	while( sym )
	{
		symbol * victim = sym;

		sym = sym->next;

		free( victim->name );
		free( victim->value );
		free( victim );
	}

	symbols = NULL;
}
/*
 * Local Variables:
 *  tab-width: 4
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 */



More information about the Slony1-commit mailing list