Sat Sep 8 07:21:43 PDT 2007
- Previous message: [Slony1-commit] slony1-engine/src/slon remote_worker.c
- Next message: [Slony1-commit] slony1-engine/src/ducttape test_8_logshipper.conf
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slony_logshipper
In directory main.slony.info:/tmp/cvs-serv20140/src/slony_logshipper
Added Files:
Tag: REL_1_2_STABLE
.cvsignore Makefile dbutil.c ipcutil.c parser.y scan.l
slony_logshipper.c slony_logshipper.h
Log Message:
Add the slony_logshipper.
This is a standalone utility that can be used with the slon -x option
to postprocess slony archive log files. Since it does not require any
changes in the existing slony or slonik functionality, I decided to
the current stable branch to be released with 1.2.12.
Jan
--- NEW FILE: .cvsignore ---
slony_logshipper
parser.c
y.tab.h
scan.c
--- NEW FILE: slony_logshipper.h ---
/*-------------------------------------------------------------------------
* slony_logshipper.h
*
* Definitions for slony_logshipper
*
* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* $Id: slony_logshipper.h,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
*-------------------------------------------------------------------------
*/
/* ----------
* SlonDString
* ----------
*/
#define SLON_DSTRING_SIZE_INIT 256
#define SLON_DSTRING_SIZE_INC 2
typedef struct
{
size_t n_alloc;
size_t n_used;
char *data;
} SlonDString;
#define dstring_init(__ds) \
do { \
(__ds)->n_alloc = SLON_DSTRING_SIZE_INIT; \
(__ds)->n_used = 0; \
(__ds)->data = malloc(SLON_DSTRING_SIZE_INIT); \
if ((__ds)->data == NULL) { \
perror("dstring_init: malloc()"); \
exit(-1); \
} \
} while (0)
#define dstring_reset(__ds) \
do { \
(__ds)->n_used = 0; \
(__ds)->data[0] = '\0'; \
} while (0)
#define dstring_free(__ds) \
do { \
free((__ds)->data); \
(__ds)->n_used = 0; \
(__ds)->data = NULL; \
} while (0)
#define dstring_nappend(__ds,__s,__n) \
do { \
if ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \
{ \
while ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \
(__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \
(__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \
if ((__ds)->data == NULL) \
{ \
perror("dstring_nappend: realloc()"); \
exit(-1); \
} \
} \
memcpy(&((__ds)->data[(__ds)->n_used]), (__s), (__n)); \
(__ds)->n_used += (__n); \
} while (0)
#define dstring_append(___ds,___s) \
do { \
register int ___n = strlen((___s)); \
dstring_nappend((___ds),(___s),___n); \
} while (0)
#define dstring_addchar(__ds,__c) \
do { \
if ((__ds)->n_used + 1 >= (__ds)->n_alloc) \
{ \
(__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \
(__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \
if ((__ds)->data == NULL) \
{ \
perror("dstring_append: realloc()"); \
exit(-1); \
} \
} \
(__ds)->data[(__ds)->n_used++] = (__c); \
} while (0)
#define dstring_terminate(__ds) \
do { \
(__ds)->data[(__ds)->n_used] = '\0'; \
} while (0)
#define dstring_data(__ds) ((__ds)->data)
/*
* Parser data structures
*/
typedef struct AttElem_s {
char *attname;
char *attvalue;
struct AttElem_s *next;
} AttElem;
typedef struct AttElemList_s {
AttElem *list_head;
AttElem *list_tail;
} AttElemList;
typedef struct InsertStmt_s {
char *namespace;
char *tablename;
AttElemList *attributes;
} InsertStmt;
typedef struct UpdateStmt_s {
char *namespace;
char *tablename;
AttElemList *changes;
AttElemList *qualification;
} UpdateStmt;
typedef struct DeleteStmt_s {
char *namespace;
char *tablename;
int only;
AttElemList *qualification;
} DeleteStmt;
typedef struct CopyStmt_s {
char *namespace;
char *tablename;
AttElemList *attributes;
char *from;
} CopyStmt;
typedef struct RenameObject_s {
char *old_namespace;
char *old_name;
char *new_namespace;
char *new_name;
struct RenameObject_s *next;
} RenameObject;
typedef struct ProcessingCommand_s {
char *command;
struct ProcessingCommand_s *next;
} ProcessingCommand;
typedef enum {
LOG_DEBUG = 0,
LOG_INFO,
LOG_WARN,
LOG_ERROR
} log_level;
#ifndef MSGMAX
#define MSGMAX 1024
#endif
/*
* Globals in slony_logshipper.c
*/
extern int parse_errors;
extern char *current_file;
extern int opt_quiet;
extern PGconn *dbconn;
extern bool logfile_switch_requested;
extern bool wait_for_resume;
extern bool shutdown_smart_requested;
extern bool shutdown_immed_requested;
extern SlonDString errlog_messages;
extern char *archive_dir;
extern char *destination_dir;
extern char *destination_conninfo;
extern char *logfile_path;
extern int max_archives;
extern char *cluster_name;
extern char *namespace;
extern RenameObject *rename_list;
extern ProcessingCommand *pre_processing_commands;
extern ProcessingCommand *post_processing_commands;
extern ProcessingCommand *error_commands;
/*
* Functions in slony_logshipper.c
*/
extern int process_check_at_counter(char *at_counter);
extern int process_simple_sql(char *sql);
extern int process_start_transaction(char *sql);
extern int process_end_transaction(char *sql);
extern int process_insert(InsertStmt *stmt);
extern int process_update(UpdateStmt *stmt);
extern int process_delete(DeleteStmt *stmt);
extern int process_copy(CopyStmt *stmt);
extern int process_copydata(char *line);
extern int process_copyend(void);
extern void config_add_rename(RenameObject *entry);
extern int lookup_rename(char *namespace, char *name,
char **use_namespace, char **use_name);
extern void errlog(log_level level, char *fmt, ...);
/*
* Functions in dbutil.c
*/
int slon_mkquery(SlonDString * dsp, char *fmt,...);
int slon_appendquery(SlonDString * dsp, char *fmt,...);
/*
* Functions in ipcutil.c
*/
int ipc_init(char *archive_dir);
int ipc_finish(bool force);
int ipc_poll(bool blocking);
int ipc_send_path(char *logfname);
int ipc_recv_path(char *buf);
int ipc_send_term(char *archive_dir, bool immediate);
int ipc_send_logswitch(char *archive_dir);
int ipc_send_resume(char *archive_dir);
int ipc_lock(void);
int ipc_unlock(void);
int ipc_cleanup(char *archive_dir);
void ipc_set_shutdown_smart(void);
void ipc_set_shutdown_immed(void);
/*
* Parser related globals
*/
extern int yylineno;
extern char *yytext;
extern FILE *yyin;
extern char yychunk[];
extern void scan_new_input_file(FILE *in);
extern void scan_push_string(char *str);
extern int scan_yyinput(void);
extern void scan_copy_start(void);
extern void parse_error(const char *str);
extern void yyerror(const char *str);
extern int yyparse(void);
extern int yylex(void);
/*
* Local Variables:
* tab-width: 4
* c-indent-level: 4
* c-basic-offset: 4
* End:
*/
--- NEW FILE: Makefile ---
# ----------
# Makefile for src/slony_logshipper
#
# Copyright (c) 2003-2004, PostgreSQL Global Development Group
# Author: Jan Wieck, Afilias USA INC.
#
# $Id: Makefile,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
# ----------
slony_subdir = src/slony_logshipper
slony_top_builddir = ../..
SLFILEDESC="Slony command interpreter"
include $(slony_top_builddir)/Makefile.global
ifeq ($(PORTNAME), aix)
CFLAGS += -D_LARGE_FILES
endif
CFLAGS += -I$(slony_top_builddir) -DPGSHARE="\"$(pgsharedir)\""
PROG = slony_logshipper
ifeq ($(PORTNAME), win)
PROG = slony_logshipper.exe
LDFLAG = $(LDFLAG) -lpgport
endif
ifeq ($(PORTNAME), win32)
PROG = slony_logshipper.exe
LDFLAG = $(LDFLAG) -lpgport
endif
OBJS = \
slony_logshipper.o \
dbutil.o \
ipcutil.o \
parser.o $(WIN32RES) \
../parsestatements/scanner.o
DISTFILES = Makefile $(wildcard *.c) $(wildcard *.h) $(wildcard *.l) $(wildcard *.y)
ALL = \
$(PROG)
all: $(ALL)
$(PROG): $(OBJS)
$(CC) $(CFLAGS) $(OBJS) $(LDFLAGS) -o $(PROG)
slony_logshipper.o: slony_logshipper.c slony_logshipper.h
dbutil.o: dbutil.c slony_logshipper.h
parser.o: parser.c scan.c
parser.c: parser.y slony_logshipper.h
ifdef YACC
$(YACC) -d $(YFLAGS) $<
mv -f y.tab.c parser.c
else
@echo "Missing yacc $< $@"
@exit 1
endif
scan.c: scan.l slony_logshipper.h
ifdef FLEX
$(FLEX) $(FLEXFLAGS) -o'$@' $<
else
@echo "Missing flex $< $@"
@exit
endif
clean distclean:
rm -f $(ALL) $(OBJS) $(PROG).core
rm -f parser.c scan.c y.tab.h
maintainer-clean: clean
rm -f parser.c scan.c y.tab.h
install: all installdirs
$(INSTALL_SCRIPT) $(PROG) $(DESTDIR)$(slonbindir)
installdirs:
$(mkinstalldirs) $(DESTDIR)$(slonbindir)
distdir: $(DISTFILES)
mkdir $(distdir)/$(subdir)
-chmod 777 $(distdir)/$(subdir)
for file in $(DISTFILES) ; do \
cp $$file $(distdir)/$(subdir)/$$file || exit; \
done
--- NEW FILE: parser.y ---
%{
/*-------------------------------------------------------------------------
* parser.y
*
* The slony_logshipper command language grammar
*
* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* $Id: parser.y,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "slony_logshipper.h"
#include "../parsestatements/scanner.h"
extern int STMTS[MAXSTATEMENTS];
[...1061 lines suppressed...]
errlog(LOG_ERROR, "%s:%d: ERROR %s\n", current_file,
yylineno, msg);
parse_errors++;
}
/*
* Include the output of fles for the scanner here.
*/
#include "scan.c"
/*
* Local Variables:
* tab-width: 4
* c-indent-level: 4
* c-basic-offset: 4
* End:
*/
--- NEW FILE: dbutil.c ---
/*-------------------------------------------------------------------------
* dbutil.c
*
* General database support functions.
*
* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* $Id: dbutil.c,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
*-------------------------------------------------------------------------
*/
#ifndef WIN32
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
#include "postgres.h"
#include "libpq-fe.h"
#include "slony_logshipper.h"
/*
* Local functions
*/
static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap);
/* ----------
* slon_mkquery
*
* A simple query formatting and quoting function using dynamic string
* buffer allocation.
* Similar to sprintf() it uses formatting symbols:
* %s String argument
* %q Quoted literal (\ and ' will be escaped)
* %d Integer argument
* ----------
*/
int
slon_mkquery(SlonDString * dsp, char *fmt,...)
{
va_list ap;
dstring_reset(dsp);
va_start(ap, fmt);
slon_appendquery_int(dsp, fmt, ap);
va_end(ap);
dstring_terminate(dsp);
return 0;
}
/* ----------
* slon_appendquery
*
* Append query string material to an existing dynamic string.
* ----------
*/
int
slon_appendquery(SlonDString * dsp, char *fmt,...)
{
va_list ap;
va_start(ap, fmt);
slon_appendquery_int(dsp, fmt, ap);
va_end(ap);
dstring_terminate(dsp);
return 0;
}
/* ----------
* slon_appendquery_int
*
* Implementation of slon_mkquery() and slon_appendquery().
* ----------
*/
static int
slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap)
{
char *s;
char buf[64];
while (*fmt)
{
switch (*fmt)
{
case '%':
fmt++;
switch (*fmt)
{
case 's':
s = va_arg(ap, char *);
dstring_append(dsp, s);
fmt++;
break;
case 'q':
s = va_arg(ap, char *);
while (s && *s != '\0')
{
switch (*s)
{
case '\'':
dstring_addchar(dsp, '\'');
break;
case '\\':
dstring_addchar(dsp, '\\');
break;
default:
break;
}
dstring_addchar(dsp, *s);
s++;
}
fmt++;
break;
case 'Q':
s = va_arg(ap, char *);
while (s && *s != '\0')
{
switch (*s)
{
case '\'':
case '\\':
dstring_addchar(dsp, '\\');
break;
default:
break;
}
dstring_addchar(dsp, *s);
s++;
}
fmt++;
break;
case 'd':
sprintf(buf, "%d", va_arg(ap, int));
dstring_append(dsp, buf);
fmt++;
break;
default:
dstring_addchar(dsp, '%');
dstring_addchar(dsp, *fmt);
fmt++;
break;
}
break;
case '\\':
fmt++;
dstring_addchar(dsp, *fmt);
fmt++;
break;
default:
dstring_addchar(dsp, *fmt);
fmt++;
break;
}
}
dstring_terminate(dsp);
return 0;
}
--- NEW FILE: slony_logshipper.c ---
/*-------------------------------------------------------------------------
* slony_logshipper.c
*
* A configuration and admin script utility for Slony-I.
*
* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* $Id: slony_logshipper.c,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
*-------------------------------------------------------------------------
*/
#ifndef WIN32
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <fcntl.h>
[...1371 lines suppressed...]
}
free(buf);
if (archscan_sort_out(ent->right) < 0)
return -1;
free(ent->fname);
free(ent);
return 0;
}
/*
* Local Variables:
* tab-width: 4
* c-indent-level: 4
* c-basic-offset: 4
* End:
*/
--- NEW FILE: ipcutil.c ---
/*-------------------------------------------------------------------------
* ipcutil.c
*
* Semaphore and message passing support for the slony_logshipper.
*
* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* $Id: ipcutil.c,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
*-------------------------------------------------------------------------
*/
#ifndef WIN32
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#endif
#include "postgres.h"
#include "libpq-fe.h"
#include "slony_logshipper.h"
/*
* The daemonized logshipper keeps a sorted queue of archive
* files that need processing.
*/
typedef struct queue_elem_s {
char *archive_path;
struct queue_elem_s *next;
} queue_elem;
/*
* Static data
*/
static char *ipc_archive_dir = NULL;
static key_t semkey;
static key_t msgkey;
static int semid;
static int msgid;
static int ipc_creator;
static queue_elem *archive_queue_head = NULL;
static queue_elem *archive_queue_tail = NULL;
/*
* Local functions
*/
static int ipc_generate_keys(char *archive_dir);
static void ipc_sighandler(int sig);
static int ipc_add_path(char *path);
static int ipc_send_code(char *archive_dir, int code);
/*
* ipc_init() -
*
* Called to setup or connect to the semaphore set and message queue.
*/
int
ipc_init(char *archive_dir)
{
struct sembuf sops[2];
if (ipc_generate_keys(archive_dir) < 0)
return -1;
/*
* We eventually have to start over again in case
* the existing daemon destroys the semaphore set
* after we attached and before we can lock it.
*/
while (true)
{
/*
* Create or attach to the semaphore set
*/
semid = semget(semkey, 2, 0700 | IPC_CREAT);
if (semid < 0)
{
fprintf(stderr, "cannot create or attache to semaphore set\n"
"semget(): %s\n", strerror(errno));
return -1;
}
/*
* We now do two initial operations with NOWAIT:
* wait for #1 =0
* inc sem #1 +1
* We never again touch semaphore #1, so this either succeeds, meaning
* that we created the set and hold the current lock. Or it fails with
* EAGAIN, meaning we attached to an existing set. Or it fails with
* EIDRM, meaning the set was destroyed.
*/
sops[0].sem_num = 1;
sops[0].sem_op = 0;
sops[0].sem_flg = IPC_NOWAIT;
sops[1].sem_num = 1;
sops[1].sem_op = 1;
sops[1].sem_flg = 0;
if (semop(semid, sops, 2) < 0)
{
if (errno == EIDRM)
continue;
if (errno != EAGAIN)
{
fprintf(stderr, "semop failed in ipc_init(): %s",
strerror(errno));
return -1;
}
/*
* Grab the lock on semaphore #0
*/
if (ipc_lock() < 0)
{
/*
* Since theres a gap between attaching and locking, the
* set could have been destroyed. In that case, start over.
*/
if (errno == EIDRM)
continue;
fprintf(stderr, "semop() failed in ipc_init(): %s\n",
strerror(errno));
return -1;
}
ipc_creator = 0;
}
else
{
/*
* Initial semop succeeded - we are the creator of this set.
*/
ipc_creator = 1;
signal(SIGINT, ipc_sighandler);
signal(SIGTERM, ipc_sighandler);
signal(SIGPIPE, ipc_sighandler);
}
break;
}
/*
* At this point we have the semaphore set and it is locked.
*/
msgid = msgget(msgkey, 0700 | IPC_CREAT);
if (msgid < 0)
{
fprintf(stderr, "msgget() failed in ipc_init(): %s\n", strerror(errno));
if (ipc_creator)
{
if (semctl(semid, 0, IPC_RMID) < 0)
fprintf(stderr, "semctl() failed in ipc_init(): %s\n",
strerror(errno));
}
return -1;
}
return ipc_creator;
}
/*
* ipc_finish() -
*
* Locks and destroys the semaphore set and message queue if called
* in the daemonized queue runner. If force isn't given, it will
* only do so if the archive queue is empty after locking the set
* and draining the message queue.
*/
int
ipc_finish(bool force)
{
if (ipc_creator)
{
if (!force)
{
/*
* We are the creator of the semaphore set, so if this isn't
* a force operation, we lock it first, poll the message queue
* and check that we have an empty queue.
*/
if (ipc_lock() < 0)
{
fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
strerror(errno));
return -1;
}
if (ipc_poll(false) < 0)
return -1;
if (archive_queue_head != NULL)
{
if (ipc_unlock() < 0)
{
fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
strerror(errno));
return -1;
}
return 1;
}
}
/*
* At this point, we are either forced to stop or we have a lock
* and the queue is empty.
*/
if (msgctl(msgid, IPC_RMID, NULL) < 0)
{
fprintf(stderr, "msgctl() failed in ipc_finish(): %s\n",
strerror(errno));
semctl(semid, 0, IPC_RMID);
return -1;
}
if (semctl(semid, 0, IPC_RMID) < 0)
{
fprintf(stderr, "semctl() failed in ipc_finish(): %s\n",
strerror(errno));
return -1;
}
}
return 0;
}
/*
* ipc_poll() -
*
* Check for incoming messages
*/
int
ipc_poll(bool blocking)
{
int rc;
struct {
long mtype;
char mtext[MSGMAX];
} msg;
while(true)
{
rc = msgrcv(msgid, &msg, sizeof(msg), 0,
(blocking) ? 0 : IPC_NOWAIT);
if (rc < 0)
{
if (errno == ENOMSG)
return 0;
fprintf(stderr, "msgrcv() failed in ipc_poll(): %s\n",
strerror(errno));
return -1;
}
if (msg.mtype == 2)
shutdown_smart_requested = true;
else if (msg.mtype == 3)
shutdown_immed_requested = true;
else if (msg.mtype == 4)
wait_for_resume = false;
else if (msg.mtype == 5)
logfile_switch_requested = true;
else
if (ipc_add_path(msg.mtext) < 0)
return -1;
if (blocking)
break;
}
return 0;
}
/*
* ipc_add_path() -
*
* Add an archive path to the sorted queue
*/
static int
ipc_add_path(char *path)
{
queue_elem **elemp;
queue_elem *elem;
if ((elem = (queue_elem *)malloc(sizeof(queue_elem))) == NULL)
{
fprintf(stderr, "out of memory in ipc_add_path()\n");
return -1;
}
if ((elem->archive_path = strdup(path)) == NULL)
{
fprintf(stderr, "out of memory in ipc_add_path()\n");
return -1;
}
elem->next = NULL;
/*
* See if we have to insert it in front of something else
*/
for (elemp = &archive_queue_head; *elemp != NULL; elemp = &((*elemp)->next))
{
if (strcmp(elem->archive_path, (*elemp)->archive_path) < 0)
{
elem->next = *elemp;
*elemp = elem;
return 0;
}
}
if (archive_queue_head == NULL)
{
archive_queue_head = elem;
archive_queue_tail = elem;
}
else
{
archive_queue_tail->next = elem;
archive_queue_tail = elem;
}
return 0;
}
/*
* ipc_send_path() -
*
* Add an archive path to the daemons archive queue. Done by calling
* ipc_add_path() or sending the path to the daemon.
*/
int
ipc_send_path(char *logfname)
{
struct {
long mtype;
char mtext[MSGMAX];
} msg;
if (strlen(logfname) > (MSGMAX - 1))
{
fprintf(stderr, "path exceeds MSGMAX: %s\n", logfname);
return -1;
}
/*
* As the creator, we are also the consumer, so we simply add the
* file to the queue.
*/
if (ipc_creator)
return ipc_add_path(logfname);
msg.mtype = 1;
strcpy(msg.mtext, logfname);
if (msgsnd(msgid, &msg, strlen(logfname) + 1, 0) < 0)
{
fprintf(stderr, "msgsnd() in ipc_send_path() failed: %s\n",
strerror(errno));
return -1;
}
return 0;
}
/*
* ipc_recv_path()
*
* Get the next archive file to process from the queue.
*/
int
ipc_recv_path(char *buf)
{
queue_elem *elem;
int rc;
struct {
long mtype;
char mtext[MSGMAX];
} msg;
while (true)
{
if (ipc_poll(false) < 0)
{
ipc_finish(true);
return -1;
}
/*
* If something requested an immediate shutdown, don't report any
* more logfiles back.
*/
if (shutdown_immed_requested)
{
ipc_finish(true);
return 0;
}
/*
* If a smart shutdown was requested, try to close the queue
* but don't force it.
*/
if (shutdown_smart_requested)
{
if ((rc = ipc_finish(false)) == 0)
{
return 0;
}
if (rc < 0)
{
ipc_finish(true);
return -1;
}
}
/*
* If we have something in the queue, return that.
*/
if (archive_queue_head != NULL)
{
elem = archive_queue_head;
archive_queue_head = archive_queue_head->next;
if (archive_queue_head == NULL)
archive_queue_tail = NULL;
strcpy(buf, elem->archive_path);
free(elem->archive_path);
free(elem);
return 1;
}
/*
* Receive one single message blocking for it.
*/
rc = msgrcv(msgid, &msg, sizeof(msg), 0, 0);
if (rc < 0)
{
if (errno == EINTR)
continue;
fprintf(stderr, "msgrcv() failed in ipc_recv_path(): %s\n",
strerror(errno));
ipc_finish(true);
return -1;
}
if (msg.mtype == 2)
shutdown_smart_requested = true;
else if (msg.mtype == 3)
shutdown_immed_requested = true;
else if (msg.mtype == 4)
wait_for_resume = false;
else if (msg.mtype == 5)
logfile_switch_requested = true;
else
if (ipc_add_path(msg.mtext) < 0)
{
ipc_finish(true);
return -1;
}
}
}
/*
* ipc_send_term() -
*
* Send a terminate request to the daemon.
*/
int
ipc_send_term(char *archive_dir, bool immediate)
{
return ipc_send_code(archive_dir, (immediate) ? 3 : 2);
}
/*
* ipc_send_logswitch() -
*
* Send a logswitch code to the daemon.
*/
int
ipc_send_logswitch(char *archive_dir)
{
return ipc_send_code(archive_dir, 5);
}
/*
* ipc_send_resume() -
*
* Send a resume (after error) code to the daemon.
*/
int
ipc_send_resume(char *archive_dir)
{
return ipc_send_code(archive_dir, 4);
}
/*
* ipc_send_code() -
*
* Support function for ipc_send_term() and ipc_send_resume().
*/
static int
ipc_send_code(char *archive_dir, int code)
{
struct {
long mtype;
char mtext[1];
} msg;
if (ipc_generate_keys(archive_dir) < 0)
return -1;
if ((semid = semget(semkey, 0, 0)) < 0)
{
if (!opt_quiet)
fprintf(stderr, "no logshipper daemon running\n");
return 2;
}
if (ipc_lock() < 0)
return -1;
if ((msgid = msgget(msgkey, 0)) < 0)
{
fprintf(stderr, "msgget() failed in ipc_send_code(): %s\n",
strerror(errno));
ipc_unlock();
return -1;
}
msg.mtype = (long)code;
if (msgsnd(msgid, &msg, 0, 0) < 0)
{
fprintf(stderr, "msgsnd() failed in ipc_send_code(): %s\n",
strerror(errno));
ipc_unlock();
return -1;
}
return ipc_unlock();
}
/*
* ipc_lock()
*
* Lock the semaphore.
*/
int
ipc_lock(void)
{
struct sembuf sops[1] = {{0, -1, 0}};
if (semop(semid, sops, 1) < 0)
{
fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
strerror(errno));
return -1;
}
return 0;
}
/*
* ipc_unlock()
*
* Unlock the semaphore.
*/
int
ipc_unlock(void)
{
struct sembuf sops[1] = {{0, 1, 0}};
if (semop(semid, sops, 1) < 0)
{
fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
strerror(errno));
return -1;
}
return 0;
}
/*
* ipc_cleanup() -
*
* Destroy the semaphore set and message queue. Use with caution, this
* code does not check if there is a daemon running.
*/
int
ipc_cleanup(char *archive_dir)
{
int rc = 0;
if (ipc_generate_keys(archive_dir) < 0)
return -1;
if ((semid = semget(semkey, 0, 0)) >= 0)
{
if (semctl(semid, 0, IPC_RMID) < 0)
{
fprintf(stderr, "semctl() failed in ipc_cleanup(): %s\n",
strerror(errno));
rc = -1;
}
else
if (!opt_quiet)
fprintf(stderr, "semaphore set removed\n");
rc = 1;
}
else
if (!opt_quiet)
fprintf(stderr, "no semaphore set found\n");
if ((msgid = msgget(msgkey, 0)) >= 0)
{
if (msgctl(msgid, IPC_RMID, NULL) < 0)
{
fprintf(stderr, "msgctl() failed in ipc_cleanup(): %s\n",
strerror(errno));
rc = -1;
}
else
if (!opt_quiet)
fprintf(stderr, "message queue removed\n");
if (rc >= 0)
rc |= 2;
}
else
if (!opt_quiet)
fprintf(stderr, "no message queue found\n");
return rc;
}
/*
* ipc_set_shutdown_smart() -
*
* Put the queue into smart shutdown mode. This will cause
* ipc_recv_path() to return 0 once the queue is empty.
*/
void
ipc_set_shutdown_smart(void)
{
shutdown_smart_requested = true;
}
/*
* ipc_set_shutdown_immed() -
*
* Put the queue into immediate shutdown mode. This will cause
* ipc_recv_path() to return 0 at the next call.
*/
void
ipc_set_shutdown_immed(void)
{
shutdown_immed_requested = true;
}
/*
* ipc_generate_keys() -
*
* Generate the semkey and msgkey used for the Sys-V IPC objects.
*/
static int
ipc_generate_keys(char *archive_dir)
{
if (ipc_archive_dir != NULL)
{
free(ipc_archive_dir);
ipc_archive_dir = NULL;
}
/* ----
* Compute the two IPC keys used for the semaphore set and the
* message queue.
* ----
*/
semkey = ftok(archive_dir, 64);
if (semkey < 0)
{
fprintf(stderr, "ftok(%s, 64): %s\n", archive_dir, strerror(errno));
return -1;
}
msgkey = ftok(archive_dir, 65);
if (msgkey < 0)
{
fprintf(stderr, "ftok(%s, 65): %s\n", archive_dir, strerror(errno));
return -1;
}
ipc_archive_dir = strdup(archive_dir);
if (ipc_archive_dir == NULL)
{
fprintf(stderr, "out of memory in ipc_generate_keys()\n");
return -1;
}
return 0;
}
/*
* ipc_sighandler()
*
* Called on SIGINT and SIGTERM. Puts the daemon into immediate
* shutdown mode by sending ourself a -T message.
*/
static void
ipc_sighandler(int sig)
{
struct {
long mtype;
char mtext[1];
} msg;
msg.mtype = 3;
msgsnd(msgid, &msg, 0, 0);
}
--- NEW FILE: scan.l ---
%{
/*-------------------------------------------------------------------------
* scan.l
*
* Flex keyword and token scanner for slony_logshipper
*
* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* $Id: scan.l,v 1.1.2.1 2007-09-08 14:21:40 wieck Exp $
*-------------------------------------------------------------------------
*/
/*
* Structure used to implement the input buffer stack.
*/
struct __yy_buffer
{
YY_BUFFER_STATE buffer; /* lexer buffer to restore on pop */
long lineno; /* line number to restore on pop */
char * fileName; /* file name to restore on pop */
FILE * yyin; /* yyin to restore on pop */
struct __yy_buffer * prev; /* pointer to previous stack frame */
} * yy_buffer = NULL;
/*
* Structure to hold defined symbols
*/
typedef struct _symbol
{
char * name; /* Name of symbol with % prepended */
char * value; /* Value of symbol */
struct _symbol * next; /* Pointer to next symbol */
} symbol;
/*
* A buffer to send large items like literals in pieces to the parser.
*/
char yychunk[YY_BUF_SIZE / 2];
/*
* Local data
*/
static symbol * symbols; /* Head of list of symbols */
static char *getSymbol(const char * name); /* Return a symbol's value */
static void addSymbol(char * str); /* Add a new symbol */
static void freeSymbols(void); /* Free all symbols */
static void pushBuffer(char *context);/* Push lexer buffer onto the stack */
static void popBuffer( void ); /* Pop previous lexer buffer */
extern char * current_file;
%}
%option 8bit
%option noyywrap
%option yylineno
%option case-insensitive
%option pointer
%x incl define
%x IN_STRING
%x COPYSTART COPY COPYLS
digit [0-9]
ident_start [A-Za-z\200-\377_]
ident_cont [A-Za-z\200-\377_0-9\$]
space [ \t\n\r\f]
instr_start (.|{space})
quoted_ident (\"[^\"]*\")+
identifier ({ident_start}{ident_cont}*|{quoted_ident})
conf_comment (#[^\n]*)
exec_ddl (--{space}DDL_SCRIPT{space}*)
archive_comment (--[^\n]*)
%%
include{space}* { BEGIN(incl); }
define{space}* { BEGIN(define); }
%{
/* ----------
* Keywords
* ----------
*/
%}
start_config { return K_START_CONFIG; }
start_archive { return K_START_ARCHIVE; }
{conf_comment} { return K_CONF_COMMENT; }
{archive_comment} { return K_ARCHIVE_COMMENT; }
{exec_ddl} { return K_EXEC_DDL; }
analyze { return K_ANALYZE; }
and { return K_AND; }
archive { return K_ARCHIVE; }
archives { return K_ARCHIVES; }
cluster { return K_CLUSTER; }
command { return K_COMMAND; }
commit { return K_COMMIT; }
copy { return K_COPY; }
database { return K_DATABASE; }
delete { return K_DELETE; }
destination { return K_DESTINATION; }
dir { return K_DIR; }
error { return K_ERROR; }
from { return K_FROM; }
ignore { return K_IGNORE; }
insert { return K_INSERT; }
into { return K_INTO; }
logfile { return K_LOGFILE; }
max { return K_MAX; }
name { return K_NAME; }
namespace { return K_NAMESPACE; }
null { return K_NULL; }
only { return K_ONLY; }
post { return K_POST; }
pre { return K_PRE; }
processing { return K_PROCESSING; }
rename { return K_RENAME; }
select { return K_SELECT; }
set { return K_SET; }
start { return K_START; }
table { return K_TABLE; }
to { return K_TO; }
transaction { return K_TRANSACTION; }
update { return K_UPDATE; }
vacuum { return K_VACUUM; }
values { return K_VALUES; }
where { return K_WHERE; }
archivetracking_offline { return T_TRACKING_FUNCTION; }
finishtableaftercopy { return T_FINISH_FUNCTION; }
sequencesetvalue_offline { return T_SEQSETVAL_FUNCTION; }
setval { return T_PGSETVAL_FUNCTION; }
%{
/* ----------
* Generic "things"
* ----------
*/
%}
{digit}+ { return T_NUMBER; }
{identifier} { return T_IDENT; }
{space}+ ;
' {
char *cp = yychunk;
int len = 0;
int c;
BEGIN(IN_STRING);
while(len < sizeof(yychunk) - 1)
{
c = input();
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside literal\n",
current_file);
BEGIN(INITIAL);
break;
}
if (c == '\\')
{
c = input();
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside literal\n",
current_file);
BEGIN(INITIAL);
break;
}
*cp++ = c;
len++;
continue;
}
if (c == '\'')
{
c = input();
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside literal\n",
current_file);
BEGIN(INITIAL);
break;
}
if (c == '\'')
{
*cp++ = c;
len++;
continue;
}
unput(c);
*cp = '\0';
BEGIN(INITIAL);
return T_LITERAL;
}
*cp++ = c;
len++;
}
*cp = '\0';
return T_LITERAL_PART;
}
<IN_STRING>{instr_start} {
char *cp = yychunk;
int len = 0;
int c;
bool chunk_start;
BEGIN(IN_STRING);
chunk_start = true;
while(len < sizeof(yychunk) - 1)
{
if (chunk_start)
{
c = *yytext;
chunk_start = false;
}
else
{
c = input();
}
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside literal\n",
current_file);
BEGIN(INITIAL);
break;
}
if (c == '\\')
{
c = input();
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside literal\n",
current_file);
BEGIN(INITIAL);
break;
}
*cp++ = c;
len++;
continue;
}
if (c == '\'')
{
c = input();
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside literal\n",
current_file);
BEGIN(INITIAL);
break;
}
if (c == '\'')
{
*cp++ = c;
len++;
continue;
}
unput(c);
*cp = '\0';
BEGIN(INITIAL);
return T_LITERAL;
}
*cp++ = c;
len++;
}
*cp = '\0';
return T_LITERAL_PART;
}
<COPYSTART>{space}+ {
BEGIN(COPYLS);
}
<COPYLS>\\.\n {
BEGIN(INITIAL);
return T_COPYEND;
}
<COPYLS>\\.\r\n {
BEGIN(INITIAL);
return T_COPYEND;
}
<COPYLS>.|{space} {
char *cp = yychunk;
int len = 0;
int c;
bool chunk_start;
BEGIN(COPY);
chunk_start = true;
while(len < sizeof(yychunk) - 2)
{
if (chunk_start)
{
c = yytext[0];
chunk_start = false;
}
else
{
c = input();
}
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside of COPY data\n",
current_file);
BEGIN(INITIAL);
return T_COPYEND;
}
*cp++ = c;
len++;
if (c == '\n')
{
*cp = '\0';
BEGIN(COPYLS);
return T_COPYDATA;
}
if (len == 5)
{
*cp = '\0';
return T_COPYDATA_PART;
}
}
*cp = '\0';
return T_COPYDATA_PART;
}
<COPY>.|{space} {
char *cp = yychunk;
int len = 0;
int c;
bool chunk_start;
chunk_start = true;
while(len < sizeof(yychunk) - 2)
{
if (chunk_start)
{
c = yytext[0];
chunk_start = false;
}
else
{
c = input();
}
if (c == EOF)
{
errlog(LOG_ERROR, "%s: EOF inside of COPY data\n",
current_file);
BEGIN(INITIAL);
return T_COPYEND;
}
*cp++ = c;
len++;
if (c == '\n')
{
*cp = '\0';
BEGIN(COPYLS);
return T_COPYDATA;
}
if (len == 5)
{
*cp = '\0';
return T_COPYDATA_PART;
}
}
*cp = '\0';
return T_COPYDATA_PART;
}
%{ /*
' {
start_charpos = yytext;
BEGIN(IN_STRING);
}
<IN_STRING>\\. { }
<IN_STRING>\\ { }
<IN_STRING>'' { }
<IN_STRING>' {
yyleng += (yytext - start_charpos) - 2;
yytext = start_charpos + 1;
BEGIN(INITIAL);
return T_LITERAL;
}
<IN_STRING>[^'\\]+ {}
*/
%}
<define>{identifier}{space}+.*";" { addSymbol( yytext ); BEGIN(INITIAL); }
@{identifier} {
char * value = getSymbol( yytext );
if( value )
{
pushBuffer( strdup( current_file ));
yy_scan_string( value );
}
}
<incl>\<[^\>]+\>{space}*";"? {
char * fileName = strdup( yytext + 1 ); /* Skip '<' */
*strchr( fileName, '>' ) = '\0'; /* Trim '>' */
pushBuffer( fileName );
if(( yyin = fopen( fileName, "r" )) == NULL )
{
errlog(LOG_ERROR, "Include file (%s) not found\n", fileName );
exit( 1 );
}
yy_switch_to_buffer( yy_create_buffer( yyin, YY_BUF_SIZE ));
BEGIN(INITIAL);
}
<<EOF>> {
if( yy_buffer == NULL )
{
yyterminate();
if (YY_CURRENT_BUFFER)
yy_delete_buffer(YY_CURRENT_BUFFER);
}
else
popBuffer();
}
#[^\r\n]* ;
. { return yytext[0]; }
%%
void
scan_new_input_file(FILE *in)
{
while (yy_buffer != NULL)
popBuffer();
if (YY_CURRENT_BUFFER)
yy_delete_buffer(YY_CURRENT_BUFFER);
yy_switch_to_buffer(yy_create_buffer(in, YY_BUF_SIZE));
yylineno = 1;
freeSymbols();
}
void scan_push_string(char *str)
{
pushBuffer(strdup("init"));
yy_scan_string (str);
}
int scan_yyinput(void)
{
return input();
}
void scan_copy_start(void)
{
BEGIN(COPYSTART);
}
void pushBuffer( char * context )
{
struct __yy_buffer * yb = malloc( sizeof( *yb ));
yb->buffer = YY_CURRENT_BUFFER;
yb->lineno = yylineno;
yb->fileName = current_file;
yb->yyin = yyin;
yb->prev = yy_buffer;
yy_buffer = yb;
current_file = context;
yylineno = 1;
yyin = NULL;
}
static void popBuffer( void )
{
struct __yy_buffer * yb = yy_buffer;
if( yyin != NULL )
fclose( yyin );
yy_delete_buffer( YY_CURRENT_BUFFER );
yy_switch_to_buffer( yy_buffer->buffer );
current_file = yy_buffer->fileName;
yylineno = yy_buffer->lineno;
yyin = yy_buffer->yyin;
yy_buffer = yy_buffer->prev;
free( yb );
}
static void addSymbol( char * str )
{
char * name = str;
char * value = str;
symbol * sym = NULL;
while( *value != ' ' && *value != '\t' )
value++;
*(value++) = '\0';
while( *value == ' ' || *value == '\t' )
value++;
value[strlen(value) - 1 ] = '\0';
sym = malloc( sizeof( *sym ));
sym->value = strdup( value );
sym->next = NULL;
/* Store the symbol name in searchable form with a leading @ */
sym->name = malloc( strlen( name ) + 1 + 1 );
sym->name[0] = '@';
strcpy( sym->name+1, name );
if( symbols != NULL )
sym->next = symbols;
symbols = sym;
}
static char * getSymbol( const char * name )
{
symbol * sym;
for( sym = symbols; sym; sym = sym->next )
{
if( strcmp( name, sym->name ) == 0 )
return( sym->value );
}
return( NULL );
}
static void freeSymbols( void )
{
symbol * sym = symbols;
while( sym )
{
symbol * victim = sym;
sym = sym->next;
free( victim->name );
free( victim->value );
free( victim );
}
symbols = NULL;
}
/*
* Local Variables:
* tab-width: 4
* c-indent-level: 4
* c-basic-offset: 4
* End:
*/
- Previous message: [Slony1-commit] slony1-engine/src/slon remote_worker.c
- Next message: [Slony1-commit] slony1-engine/src/ducttape test_8_logshipper.conf
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list