Sat Sep 8 19:37:07 PDT 2007
- Previous message: [Slony1-commit] slony1-engine/src/slon remote_worker.c
- Next message: [Slony1-commit] slony1-engine/doc/adminguide logshipping.sgml
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slony_logshipper In directory main.slony.info:/tmp/cvs-serv17109/src/slony_logshipper Added Files: .cvsignore Makefile dbutil.c ipcutil.c parser.y scan.l slony_logshipper.c slony_logshipper.h Log Message: Add slony_logshipper to HEAD. Jan --- NEW FILE: .cvsignore --- slony_logshipper parser.c y.tab.h scan.c --- NEW FILE: slony_logshipper.h --- /*------------------------------------------------------------------------- * slony_logshipper.h * * Definitions for slony_logshipper * * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * * $Id: slony_logshipper.h,v 1.2 2007-09-09 02:37:05 wieck Exp $ *------------------------------------------------------------------------- */ /* ---------- * SlonDString * ---------- */ #define SLON_DSTRING_SIZE_INIT 256 #define SLON_DSTRING_SIZE_INC 2 typedef struct { size_t n_alloc; size_t n_used; char *data; } SlonDString; #define dstring_init(__ds) \ do { \ (__ds)->n_alloc = SLON_DSTRING_SIZE_INIT; \ (__ds)->n_used = 0; \ (__ds)->data = malloc(SLON_DSTRING_SIZE_INIT); \ if ((__ds)->data == NULL) { \ perror("dstring_init: malloc()"); \ exit(-1); \ } \ } while (0) #define dstring_reset(__ds) \ do { \ (__ds)->n_used = 0; \ (__ds)->data[0] = '\0'; \ } while (0) #define dstring_free(__ds) \ do { \ free((__ds)->data); \ (__ds)->n_used = 0; \ (__ds)->data = NULL; \ } while (0) #define dstring_nappend(__ds,__s,__n) \ do { \ if ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ { \ while ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ if ((__ds)->data == NULL) \ { \ perror("dstring_nappend: realloc()"); \ exit(-1); \ } \ } \ memcpy(&((__ds)->data[(__ds)->n_used]), (__s), (__n)); \ (__ds)->n_used += (__n); \ } while (0) #define dstring_append(___ds,___s) \ do { \ register int ___n = strlen((___s)); \ dstring_nappend((___ds),(___s),___n); \ } while (0) #define dstring_addchar(__ds,__c) \ do { \ if ((__ds)->n_used + 1 >= (__ds)->n_alloc) \ { \ (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ if ((__ds)->data == NULL) \ { \ perror("dstring_append: realloc()"); \ exit(-1); \ } \ } \ (__ds)->data[(__ds)->n_used++] = (__c); \ } while (0) #define dstring_terminate(__ds) \ do { \ (__ds)->data[(__ds)->n_used] = '\0'; \ } while (0) #define dstring_data(__ds) ((__ds)->data) /* * Parser data structures */ typedef struct AttElem_s { char *attname; char *attvalue; struct AttElem_s *next; } AttElem; typedef struct AttElemList_s { AttElem *list_head; AttElem *list_tail; } AttElemList; typedef struct InsertStmt_s { char *namespace; char *tablename; AttElemList *attributes; } InsertStmt; typedef struct UpdateStmt_s { char *namespace; char *tablename; AttElemList *changes; AttElemList *qualification; } UpdateStmt; typedef struct DeleteStmt_s { char *namespace; char *tablename; int only; AttElemList *qualification; } DeleteStmt; typedef struct CopyStmt_s { char *namespace; char *tablename; AttElemList *attributes; char *from; } CopyStmt; typedef struct RenameObject_s { char *old_namespace; char *old_name; char *new_namespace; char *new_name; struct RenameObject_s *next; } RenameObject; typedef struct ProcessingCommand_s { char *command; struct ProcessingCommand_s *next; } ProcessingCommand; typedef enum { LOG_DEBUG = 0, LOG_INFO, LOG_WARN, LOG_ERROR } log_level; #ifndef MSGMAX #define MSGMAX 1024 #endif /* * Globals in slony_logshipper.c */ extern int parse_errors; extern char *current_file; extern int opt_quiet; extern PGconn *dbconn; extern bool logfile_switch_requested; extern bool wait_for_resume; extern bool shutdown_smart_requested; extern bool shutdown_immed_requested; extern SlonDString errlog_messages; extern char *archive_dir; extern char *destination_dir; extern char *destination_conninfo; extern char *logfile_path; extern int max_archives; extern char *cluster_name; extern char *namespace; extern RenameObject *rename_list; extern ProcessingCommand *pre_processing_commands; extern ProcessingCommand *post_processing_commands; extern ProcessingCommand *error_commands; /* * Functions in slony_logshipper.c */ extern int process_check_at_counter(char *at_counter); extern int process_simple_sql(char *sql); extern int process_start_transaction(char *sql); extern int process_end_transaction(char *sql); extern int process_insert(InsertStmt *stmt); extern int process_update(UpdateStmt *stmt); extern int process_delete(DeleteStmt *stmt); extern int process_copy(CopyStmt *stmt); extern int process_copydata(char *line); extern int process_copyend(void); extern void config_add_rename(RenameObject *entry); extern int lookup_rename(char *namespace, char *name, char **use_namespace, char **use_name); extern void errlog(log_level level, char *fmt, ...); /* * Functions in dbutil.c */ int slon_mkquery(SlonDString * dsp, char *fmt,...); int slon_appendquery(SlonDString * dsp, char *fmt,...); /* * Functions in ipcutil.c */ int ipc_init(char *archive_dir); int ipc_finish(bool force); int ipc_poll(bool blocking); int ipc_send_path(char *logfname); int ipc_recv_path(char *buf); int ipc_send_term(char *archive_dir, bool immediate); int ipc_send_logswitch(char *archive_dir); int ipc_send_resume(char *archive_dir); int ipc_lock(void); int ipc_unlock(void); int ipc_cleanup(char *archive_dir); void ipc_set_shutdown_smart(void); void ipc_set_shutdown_immed(void); /* * Parser related globals */ extern int yylineno; extern char *yytext; extern FILE *yyin; extern char yychunk[]; extern void scan_new_input_file(FILE *in); extern void scan_push_string(char *str); extern int scan_yyinput(void); extern void scan_copy_start(void); extern void parse_error(const char *str); extern void yyerror(const char *str); extern int yyparse(void); extern int yylex(void); /* * Local Variables: * tab-width: 4 * c-indent-level: 4 * c-basic-offset: 4 * End: */ --- NEW FILE: Makefile --- # ---------- # Makefile for src/slony_logshipper # # Copyright (c) 2003-2004, PostgreSQL Global Development Group # Author: Jan Wieck, Afilias USA INC. # # $Id: Makefile,v 1.2 2007-09-09 02:37:05 wieck Exp $ # ---------- slony_subdir = src/slony_logshipper slony_top_builddir = ../.. SLFILEDESC="Slony command interpreter" include $(slony_top_builddir)/Makefile.global ifeq ($(PORTNAME), aix) CFLAGS += -D_LARGE_FILES endif CFLAGS += -I$(slony_top_builddir) -DPGSHARE="\"$(pgsharedir)\"" PROG = slony_logshipper ifeq ($(PORTNAME), win) PROG = slony_logshipper.exe LDFLAG = $(LDFLAG) -lpgport endif ifeq ($(PORTNAME), win32) PROG = slony_logshipper.exe LDFLAG = $(LDFLAG) -lpgport endif OBJS = \ slony_logshipper.o \ dbutil.o \ ipcutil.o \ parser.o $(WIN32RES) \ ../parsestatements/scanner.o DISTFILES = Makefile $(wildcard *.c) $(wildcard *.h) $(wildcard *.l) $(wildcard *.y) ALL = \ $(PROG) all: $(ALL) $(PROG): $(OBJS) $(CC) $(CFLAGS) $(OBJS) $(LDFLAGS) -o $(PROG) slony_logshipper.o: slony_logshipper.c slony_logshipper.h dbutil.o: dbutil.c slony_logshipper.h parser.o: parser.c scan.c parser.c: parser.y slony_logshipper.h ifdef YACC $(YACC) -d $(YFLAGS) $< mv -f y.tab.c parser.c else @echo "Missing yacc $< $@" @exit 1 endif scan.c: scan.l slony_logshipper.h ifdef FLEX $(FLEX) $(FLEXFLAGS) -o'$@' $< else @echo "Missing flex $< $@" @exit endif clean distclean: rm -f $(ALL) $(OBJS) $(PROG).core rm -f parser.c scan.c y.tab.h maintainer-clean: clean rm -f parser.c scan.c y.tab.h install: all installdirs $(INSTALL_SCRIPT) $(PROG) $(DESTDIR)$(slonbindir) installdirs: $(mkinstalldirs) $(DESTDIR)$(slonbindir) distdir: $(DISTFILES) mkdir $(distdir)/$(subdir) -chmod 777 $(distdir)/$(subdir) for file in $(DISTFILES) ; do \ cp $$file $(distdir)/$(subdir)/$$file || exit; \ done --- NEW FILE: parser.y --- %{ /*------------------------------------------------------------------------- * parser.y * * The slony_logshipper command language grammar * * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * * $Id: parser.y,v 1.2 2007-09-09 02:37:05 wieck Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" #include "libpq-fe.h" #include "slony_logshipper.h" #include "../parsestatements/scanner.h" extern int STMTS[MAXSTATEMENTS]; [...1072 lines suppressed...] errlog(LOG_ERROR, "%s:%d: ERROR %s\n", current_file, yylineno, msg); parse_errors++; } /* * Include the output of fles for the scanner here. */ #include "scan.c" /* * Local Variables: * tab-width: 4 * c-indent-level: 4 * c-basic-offset: 4 * End: */ --- NEW FILE: dbutil.c --- /*------------------------------------------------------------------------- * dbutil.c * * General database support functions. * * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * * $Id: dbutil.c,v 1.2 2007-09-09 02:37:05 wieck Exp $ *------------------------------------------------------------------------- */ #ifndef WIN32 #include <stdio.h> #include <stdlib.h> #include <stdarg.h> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> #endif #include "postgres.h" #include "libpq-fe.h" #include "slony_logshipper.h" /* * Local functions */ static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap); /* ---------- * slon_mkquery * * A simple query formatting and quoting function using dynamic string * buffer allocation. * Similar to sprintf() it uses formatting symbols: * %s String argument * %q Quoted literal (\ and ' will be escaped) * %d Integer argument * ---------- */ int slon_mkquery(SlonDString * dsp, char *fmt,...) { va_list ap; dstring_reset(dsp); va_start(ap, fmt); slon_appendquery_int(dsp, fmt, ap); va_end(ap); dstring_terminate(dsp); return 0; } /* ---------- * slon_appendquery * * Append query string material to an existing dynamic string. * ---------- */ int slon_appendquery(SlonDString * dsp, char *fmt,...) { va_list ap; va_start(ap, fmt); slon_appendquery_int(dsp, fmt, ap); va_end(ap); dstring_terminate(dsp); return 0; } /* ---------- * slon_appendquery_int * * Implementation of slon_mkquery() and slon_appendquery(). * ---------- */ static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap) { char *s; char buf[64]; while (*fmt) { switch (*fmt) { case '%': fmt++; switch (*fmt) { case 's': s = va_arg(ap, char *); dstring_append(dsp, s); fmt++; break; case 'q': s = va_arg(ap, char *); while (s && *s != '\0') { switch (*s) { case '\'': dstring_addchar(dsp, '\''); break; case '\\': dstring_addchar(dsp, '\\'); break; default: break; } dstring_addchar(dsp, *s); s++; } fmt++; break; case 'Q': s = va_arg(ap, char *); while (s && *s != '\0') { switch (*s) { case '\'': case '\\': dstring_addchar(dsp, '\\'); break; default: break; } dstring_addchar(dsp, *s); s++; } fmt++; break; case 'd': sprintf(buf, "%d", va_arg(ap, int)); dstring_append(dsp, buf); fmt++; break; default: dstring_addchar(dsp, '%'); dstring_addchar(dsp, *fmt); fmt++; break; } break; case '\\': fmt++; dstring_addchar(dsp, *fmt); fmt++; break; default: dstring_addchar(dsp, *fmt); fmt++; break; } } dstring_terminate(dsp); return 0; } --- NEW FILE: slony_logshipper.c --- /*------------------------------------------------------------------------- * slony_logshipper.c * * A configuration and admin script utility for Slony-I. * * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * * $Id: slony_logshipper.c,v 1.2 2007-09-09 02:37:05 wieck Exp $ *------------------------------------------------------------------------- */ #ifndef WIN32 #include <stdio.h> #include <stdlib.h> #include <stdarg.h> #include <unistd.h> #include <fcntl.h> [...1371 lines suppressed...] } free(buf); if (archscan_sort_out(ent->right) < 0) return -1; free(ent->fname); free(ent); return 0; } /* * Local Variables: * tab-width: 4 * c-indent-level: 4 * c-basic-offset: 4 * End: */ --- NEW FILE: ipcutil.c --- /*------------------------------------------------------------------------- * ipcutil.c * * Semaphore and message passing support for the slony_logshipper. * * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * * $Id: ipcutil.c,v 1.2 2007-09-09 02:37:05 wieck Exp $ *------------------------------------------------------------------------- */ #ifndef WIN32 #include <stdio.h> #include <stdlib.h> #include <stdarg.h> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/msg.h> #include <sys/wait.h> #include <errno.h> #include <signal.h> #endif #include "postgres.h" #include "libpq-fe.h" #include "slony_logshipper.h" /* * The daemonized logshipper keeps a sorted queue of archive * files that need processing. */ typedef struct queue_elem_s { char *archive_path; struct queue_elem_s *next; } queue_elem; /* * Static data */ static char *ipc_archive_dir = NULL; static key_t semkey; static key_t msgkey; static int semid; static int msgid; static int ipc_creator; static queue_elem *archive_queue_head = NULL; static queue_elem *archive_queue_tail = NULL; /* * Local functions */ static int ipc_generate_keys(char *archive_dir); static void ipc_sighandler(int sig); static int ipc_add_path(char *path); static int ipc_send_code(char *archive_dir, int code); /* * ipc_init() - * * Called to setup or connect to the semaphore set and message queue. */ int ipc_init(char *archive_dir) { struct sembuf sops[2]; if (ipc_generate_keys(archive_dir) < 0) return -1; /* * We eventually have to start over again in case * the existing daemon destroys the semaphore set * after we attached and before we can lock it. */ while (true) { /* * Create or attach to the semaphore set */ semid = semget(semkey, 2, 0700 | IPC_CREAT); if (semid < 0) { fprintf(stderr, "cannot create or attache to semaphore set\n" "semget(): %s\n", strerror(errno)); return -1; } /* * We now do two initial operations with NOWAIT: * wait for #1 =0 * inc sem #1 +1 * We never again touch semaphore #1, so this either succeeds, meaning * that we created the set and hold the current lock. Or it fails with * EAGAIN, meaning we attached to an existing set. Or it fails with * EIDRM, meaning the set was destroyed. */ sops[0].sem_num = 1; sops[0].sem_op = 0; sops[0].sem_flg = IPC_NOWAIT; sops[1].sem_num = 1; sops[1].sem_op = 1; sops[1].sem_flg = 0; if (semop(semid, sops, 2) < 0) { if (errno == EIDRM) continue; if (errno != EAGAIN) { fprintf(stderr, "semop failed in ipc_init(): %s", strerror(errno)); return -1; } /* * Grab the lock on semaphore #0 */ if (ipc_lock() < 0) { /* * Since theres a gap between attaching and locking, the * set could have been destroyed. In that case, start over. */ if (errno == EIDRM) continue; fprintf(stderr, "semop() failed in ipc_init(): %s\n", strerror(errno)); return -1; } ipc_creator = 0; } else { /* * Initial semop succeeded - we are the creator of this set. */ ipc_creator = 1; signal(SIGINT, ipc_sighandler); signal(SIGTERM, ipc_sighandler); signal(SIGPIPE, ipc_sighandler); } break; } /* * At this point we have the semaphore set and it is locked. */ msgid = msgget(msgkey, 0700 | IPC_CREAT); if (msgid < 0) { fprintf(stderr, "msgget() failed in ipc_init(): %s\n", strerror(errno)); if (ipc_creator) { if (semctl(semid, 0, IPC_RMID) < 0) fprintf(stderr, "semctl() failed in ipc_init(): %s\n", strerror(errno)); } return -1; } return ipc_creator; } /* * ipc_finish() - * * Locks and destroys the semaphore set and message queue if called * in the daemonized queue runner. If force isn't given, it will * only do so if the archive queue is empty after locking the set * and draining the message queue. */ int ipc_finish(bool force) { if (ipc_creator) { if (!force) { /* * We are the creator of the semaphore set, so if this isn't * a force operation, we lock it first, poll the message queue * and check that we have an empty queue. */ if (ipc_lock() < 0) { fprintf(stderr, "semop() failed in ipc_finish(): %s\n", strerror(errno)); return -1; } if (ipc_poll(false) < 0) return -1; if (archive_queue_head != NULL) { if (ipc_unlock() < 0) { fprintf(stderr, "semop() failed in ipc_finish(): %s\n", strerror(errno)); return -1; } return 1; } } /* * At this point, we are either forced to stop or we have a lock * and the queue is empty. */ if (msgctl(msgid, IPC_RMID, NULL) < 0) { fprintf(stderr, "msgctl() failed in ipc_finish(): %s\n", strerror(errno)); semctl(semid, 0, IPC_RMID); return -1; } if (semctl(semid, 0, IPC_RMID) < 0) { fprintf(stderr, "semctl() failed in ipc_finish(): %s\n", strerror(errno)); return -1; } } return 0; } /* * ipc_poll() - * * Check for incoming messages */ int ipc_poll(bool blocking) { int rc; struct { long mtype; char mtext[MSGMAX]; } msg; while(true) { rc = msgrcv(msgid, &msg, sizeof(msg), 0, (blocking) ? 0 : IPC_NOWAIT); if (rc < 0) { if (errno == ENOMSG) return 0; fprintf(stderr, "msgrcv() failed in ipc_poll(): %s\n", strerror(errno)); return -1; } if (msg.mtype == 2) shutdown_smart_requested = true; else if (msg.mtype == 3) shutdown_immed_requested = true; else if (msg.mtype == 4) wait_for_resume = false; else if (msg.mtype == 5) logfile_switch_requested = true; else if (ipc_add_path(msg.mtext) < 0) return -1; if (blocking) break; } return 0; } /* * ipc_add_path() - * * Add an archive path to the sorted queue */ static int ipc_add_path(char *path) { queue_elem **elemp; queue_elem *elem; if ((elem = (queue_elem *)malloc(sizeof(queue_elem))) == NULL) { fprintf(stderr, "out of memory in ipc_add_path()\n"); return -1; } if ((elem->archive_path = strdup(path)) == NULL) { fprintf(stderr, "out of memory in ipc_add_path()\n"); return -1; } elem->next = NULL; /* * See if we have to insert it in front of something else */ for (elemp = &archive_queue_head; *elemp != NULL; elemp = &((*elemp)->next)) { if (strcmp(elem->archive_path, (*elemp)->archive_path) < 0) { elem->next = *elemp; *elemp = elem; return 0; } } if (archive_queue_head == NULL) { archive_queue_head = elem; archive_queue_tail = elem; } else { archive_queue_tail->next = elem; archive_queue_tail = elem; } return 0; } /* * ipc_send_path() - * * Add an archive path to the daemons archive queue. Done by calling * ipc_add_path() or sending the path to the daemon. */ int ipc_send_path(char *logfname) { struct { long mtype; char mtext[MSGMAX]; } msg; if (strlen(logfname) > (MSGMAX - 1)) { fprintf(stderr, "path exceeds MSGMAX: %s\n", logfname); return -1; } /* * As the creator, we are also the consumer, so we simply add the * file to the queue. */ if (ipc_creator) return ipc_add_path(logfname); msg.mtype = 1; strcpy(msg.mtext, logfname); if (msgsnd(msgid, &msg, strlen(logfname) + 1, 0) < 0) { fprintf(stderr, "msgsnd() in ipc_send_path() failed: %s\n", strerror(errno)); return -1; } return 0; } /* * ipc_recv_path() * * Get the next archive file to process from the queue. */ int ipc_recv_path(char *buf) { queue_elem *elem; int rc; struct { long mtype; char mtext[MSGMAX]; } msg; while (true) { if (ipc_poll(false) < 0) { ipc_finish(true); return -1; } /* * If something requested an immediate shutdown, don't report any * more logfiles back. */ if (shutdown_immed_requested) { ipc_finish(true); return 0; } /* * If a smart shutdown was requested, try to close the queue * but don't force it. */ if (shutdown_smart_requested) { if ((rc = ipc_finish(false)) == 0) { return 0; } if (rc < 0) { ipc_finish(true); return -1; } } /* * If we have something in the queue, return that. */ if (archive_queue_head != NULL) { elem = archive_queue_head; archive_queue_head = archive_queue_head->next; if (archive_queue_head == NULL) archive_queue_tail = NULL; strcpy(buf, elem->archive_path); free(elem->archive_path); free(elem); return 1; } /* * Receive one single message blocking for it. */ rc = msgrcv(msgid, &msg, sizeof(msg), 0, 0); if (rc < 0) { if (errno == EINTR) continue; fprintf(stderr, "msgrcv() failed in ipc_recv_path(): %s\n", strerror(errno)); ipc_finish(true); return -1; } if (msg.mtype == 2) shutdown_smart_requested = true; else if (msg.mtype == 3) shutdown_immed_requested = true; else if (msg.mtype == 4) wait_for_resume = false; else if (msg.mtype == 5) logfile_switch_requested = true; else if (ipc_add_path(msg.mtext) < 0) { ipc_finish(true); return -1; } } } /* * ipc_send_term() - * * Send a terminate request to the daemon. */ int ipc_send_term(char *archive_dir, bool immediate) { return ipc_send_code(archive_dir, (immediate) ? 3 : 2); } /* * ipc_send_logswitch() - * * Send a logswitch code to the daemon. */ int ipc_send_logswitch(char *archive_dir) { return ipc_send_code(archive_dir, 5); } /* * ipc_send_resume() - * * Send a resume (after error) code to the daemon. */ int ipc_send_resume(char *archive_dir) { return ipc_send_code(archive_dir, 4); } /* * ipc_send_code() - * * Support function for ipc_send_term() and ipc_send_resume(). */ static int ipc_send_code(char *archive_dir, int code) { struct { long mtype; char mtext[1]; } msg; if (ipc_generate_keys(archive_dir) < 0) return -1; if ((semid = semget(semkey, 0, 0)) < 0) { if (!opt_quiet) fprintf(stderr, "no logshipper daemon running\n"); return 2; } if (ipc_lock() < 0) return -1; if ((msgid = msgget(msgkey, 0)) < 0) { fprintf(stderr, "msgget() failed in ipc_send_code(): %s\n", strerror(errno)); ipc_unlock(); return -1; } msg.mtype = (long)code; if (msgsnd(msgid, &msg, 0, 0) < 0) { fprintf(stderr, "msgsnd() failed in ipc_send_code(): %s\n", strerror(errno)); ipc_unlock(); return -1; } return ipc_unlock(); } /* * ipc_lock() * * Lock the semaphore. */ int ipc_lock(void) { struct sembuf sops[1] = {{0, -1, 0}}; if (semop(semid, sops, 1) < 0) { fprintf(stderr, "semop() failed in ipc_lock(): %s\n", strerror(errno)); return -1; } return 0; } /* * ipc_unlock() * * Unlock the semaphore. */ int ipc_unlock(void) { struct sembuf sops[1] = {{0, 1, 0}}; if (semop(semid, sops, 1) < 0) { fprintf(stderr, "semop() failed in ipc_lock(): %s\n", strerror(errno)); return -1; } return 0; } /* * ipc_cleanup() - * * Destroy the semaphore set and message queue. Use with caution, this * code does not check if there is a daemon running. */ int ipc_cleanup(char *archive_dir) { int rc = 0; if (ipc_generate_keys(archive_dir) < 0) return -1; if ((semid = semget(semkey, 0, 0)) >= 0) { if (semctl(semid, 0, IPC_RMID) < 0) { fprintf(stderr, "semctl() failed in ipc_cleanup(): %s\n", strerror(errno)); rc = -1; } else if (!opt_quiet) fprintf(stderr, "semaphore set removed\n"); rc = 1; } else if (!opt_quiet) fprintf(stderr, "no semaphore set found\n"); if ((msgid = msgget(msgkey, 0)) >= 0) { if (msgctl(msgid, IPC_RMID, NULL) < 0) { fprintf(stderr, "msgctl() failed in ipc_cleanup(): %s\n", strerror(errno)); rc = -1; } else if (!opt_quiet) fprintf(stderr, "message queue removed\n"); if (rc >= 0) rc |= 2; } else if (!opt_quiet) fprintf(stderr, "no message queue found\n"); return rc; } /* * ipc_set_shutdown_smart() - * * Put the queue into smart shutdown mode. This will cause * ipc_recv_path() to return 0 once the queue is empty. */ void ipc_set_shutdown_smart(void) { shutdown_smart_requested = true; } /* * ipc_set_shutdown_immed() - * * Put the queue into immediate shutdown mode. This will cause * ipc_recv_path() to return 0 at the next call. */ void ipc_set_shutdown_immed(void) { shutdown_immed_requested = true; } /* * ipc_generate_keys() - * * Generate the semkey and msgkey used for the Sys-V IPC objects. */ static int ipc_generate_keys(char *archive_dir) { if (ipc_archive_dir != NULL) { free(ipc_archive_dir); ipc_archive_dir = NULL; } /* ---- * Compute the two IPC keys used for the semaphore set and the * message queue. * ---- */ semkey = ftok(archive_dir, 64); if (semkey < 0) { fprintf(stderr, "ftok(%s, 64): %s\n", archive_dir, strerror(errno)); return -1; } msgkey = ftok(archive_dir, 65); if (msgkey < 0) { fprintf(stderr, "ftok(%s, 65): %s\n", archive_dir, strerror(errno)); return -1; } ipc_archive_dir = strdup(archive_dir); if (ipc_archive_dir == NULL) { fprintf(stderr, "out of memory in ipc_generate_keys()\n"); return -1; } return 0; } /* * ipc_sighandler() * * Called on SIGINT and SIGTERM. Puts the daemon into immediate * shutdown mode by sending ourself a -T message. */ static void ipc_sighandler(int sig) { struct { long mtype; char mtext[1]; } msg; msg.mtype = 3; msgsnd(msgid, &msg, 0, 0); } --- NEW FILE: scan.l --- %{ /*------------------------------------------------------------------------- * scan.l * * Flex keyword and token scanner for slony_logshipper * * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * * $Id: scan.l,v 1.2 2007-09-09 02:37:05 wieck Exp $ *------------------------------------------------------------------------- */ /* * Structure used to implement the input buffer stack. */ struct __yy_buffer { YY_BUFFER_STATE buffer; /* lexer buffer to restore on pop */ long lineno; /* line number to restore on pop */ char * fileName; /* file name to restore on pop */ FILE * yyin; /* yyin to restore on pop */ struct __yy_buffer * prev; /* pointer to previous stack frame */ } * yy_buffer = NULL; /* * Structure to hold defined symbols */ typedef struct _symbol { char * name; /* Name of symbol with % prepended */ char * value; /* Value of symbol */ struct _symbol * next; /* Pointer to next symbol */ } symbol; /* * A buffer to send large items like literals in pieces to the parser. */ char yychunk[YY_BUF_SIZE / 2]; /* * Local data */ static symbol * symbols; /* Head of list of symbols */ static char *getSymbol(const char * name); /* Return a symbol's value */ static void addSymbol(char * str); /* Add a new symbol */ static void freeSymbols(void); /* Free all symbols */ static void pushBuffer(char *context);/* Push lexer buffer onto the stack */ static void popBuffer( void ); /* Pop previous lexer buffer */ extern char * current_file; %} %option 8bit %option noyywrap %option yylineno %option case-insensitive %option pointer %x incl define %x IN_STRING %x COPYSTART COPY COPYLS digit [0-9] ident_start [A-Za-z\200-\377_] ident_cont [A-Za-z\200-\377_0-9\$] space [ \t\n\r\f] instr_start (.|{space}) quoted_ident (\"[^\"]*\")+ identifier ({ident_start}{ident_cont}*|{quoted_ident}) conf_comment (#[^\n]*) exec_ddl (--{space}DDL_SCRIPT{space}*) archive_comment (--[^\n]*) %% include{space}* { BEGIN(incl); } define{space}* { BEGIN(define); } %{ /* ---------- * Keywords * ---------- */ %} start_config { return K_START_CONFIG; } start_archive { return K_START_ARCHIVE; } {conf_comment} { return K_CONF_COMMENT; } {archive_comment} { return K_ARCHIVE_COMMENT; } {exec_ddl} { return K_EXEC_DDL; } analyze { return K_ANALYZE; } and { return K_AND; } archive { return K_ARCHIVE; } archives { return K_ARCHIVES; } cluster { return K_CLUSTER; } command { return K_COMMAND; } commit { return K_COMMIT; } copy { return K_COPY; } database { return K_DATABASE; } delete { return K_DELETE; } destination { return K_DESTINATION; } dir { return K_DIR; } error { return K_ERROR; } from { return K_FROM; } ignore { return K_IGNORE; } insert { return K_INSERT; } into { return K_INTO; } logfile { return K_LOGFILE; } max { return K_MAX; } name { return K_NAME; } namespace { return K_NAMESPACE; } null { return K_NULL; } only { return K_ONLY; } post { return K_POST; } pre { return K_PRE; } processing { return K_PROCESSING; } rename { return K_RENAME; } replica { return K_REPLICA; } select { return K_SELECT; } session_replication_role { return K_SESSION_ROLE; } set { return K_SET; } start { return K_START; } table { return K_TABLE; } to { return K_TO; } transaction { return K_TRANSACTION; } update { return K_UPDATE; } vacuum { return K_VACUUM; } values { return K_VALUES; } where { return K_WHERE; } archivetracking_offline { return T_TRACKING_FUNCTION; } finishtableaftercopy { return T_FINISH_FUNCTION; } sequencesetvalue_offline { return T_SEQSETVAL_FUNCTION; } setval { return T_PGSETVAL_FUNCTION; } %{ /* ---------- * Generic "things" * ---------- */ %} {digit}+ { return T_NUMBER; } {identifier} { return T_IDENT; } {space}+ ; ' { char *cp = yychunk; int len = 0; int c; BEGIN(IN_STRING); while(len < sizeof(yychunk) - 1) { c = input(); if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside literal\n", current_file); BEGIN(INITIAL); break; } if (c == '\\') { c = input(); if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside literal\n", current_file); BEGIN(INITIAL); break; } *cp++ = c; len++; continue; } if (c == '\'') { c = input(); if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside literal\n", current_file); BEGIN(INITIAL); break; } if (c == '\'') { *cp++ = c; len++; continue; } unput(c); *cp = '\0'; BEGIN(INITIAL); return T_LITERAL; } *cp++ = c; len++; } *cp = '\0'; return T_LITERAL_PART; } <IN_STRING>{instr_start} { char *cp = yychunk; int len = 0; int c; bool chunk_start; BEGIN(IN_STRING); chunk_start = true; while(len < sizeof(yychunk) - 1) { if (chunk_start) { c = *yytext; chunk_start = false; } else { c = input(); } if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside literal\n", current_file); BEGIN(INITIAL); break; } if (c == '\\') { c = input(); if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside literal\n", current_file); BEGIN(INITIAL); break; } *cp++ = c; len++; continue; } if (c == '\'') { c = input(); if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside literal\n", current_file); BEGIN(INITIAL); break; } if (c == '\'') { *cp++ = c; len++; continue; } unput(c); *cp = '\0'; BEGIN(INITIAL); return T_LITERAL; } *cp++ = c; len++; } *cp = '\0'; return T_LITERAL_PART; } <COPYSTART>{space}+ { BEGIN(COPYLS); } <COPYLS>\\.\n { BEGIN(INITIAL); return T_COPYEND; } <COPYLS>\\.\r\n { BEGIN(INITIAL); return T_COPYEND; } <COPYLS>.|{space} { char *cp = yychunk; int len = 0; int c; bool chunk_start; BEGIN(COPY); chunk_start = true; while(len < sizeof(yychunk) - 2) { if (chunk_start) { c = yytext[0]; chunk_start = false; } else { c = input(); } if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside of COPY data\n", current_file); BEGIN(INITIAL); return T_COPYEND; } *cp++ = c; len++; if (c == '\n') { *cp = '\0'; BEGIN(COPYLS); return T_COPYDATA; } if (len == 5) { *cp = '\0'; return T_COPYDATA_PART; } } *cp = '\0'; return T_COPYDATA_PART; } <COPY>.|{space} { char *cp = yychunk; int len = 0; int c; bool chunk_start; chunk_start = true; while(len < sizeof(yychunk) - 2) { if (chunk_start) { c = yytext[0]; chunk_start = false; } else { c = input(); } if (c == EOF) { errlog(LOG_ERROR, "%s: EOF inside of COPY data\n", current_file); BEGIN(INITIAL); return T_COPYEND; } *cp++ = c; len++; if (c == '\n') { *cp = '\0'; BEGIN(COPYLS); return T_COPYDATA; } if (len == 5) { *cp = '\0'; return T_COPYDATA_PART; } } *cp = '\0'; return T_COPYDATA_PART; } %{ /* ' { start_charpos = yytext; BEGIN(IN_STRING); } <IN_STRING>\\. { } <IN_STRING>\\ { } <IN_STRING>'' { } <IN_STRING>' { yyleng += (yytext - start_charpos) - 2; yytext = start_charpos + 1; BEGIN(INITIAL); return T_LITERAL; } <IN_STRING>[^'\\]+ {} */ %} <define>{identifier}{space}+.*";" { addSymbol( yytext ); BEGIN(INITIAL); } @{identifier} { char * value = getSymbol( yytext ); if( value ) { pushBuffer( strdup( current_file )); yy_scan_string( value ); } } <incl>\<[^\>]+\>{space}*";"? { char * fileName = strdup( yytext + 1 ); /* Skip '<' */ *strchr( fileName, '>' ) = '\0'; /* Trim '>' */ pushBuffer( fileName ); if(( yyin = fopen( fileName, "r" )) == NULL ) { errlog(LOG_ERROR, "Include file (%s) not found\n", fileName ); exit( 1 ); } yy_switch_to_buffer( yy_create_buffer( yyin, YY_BUF_SIZE )); BEGIN(INITIAL); } <<EOF>> { if( yy_buffer == NULL ) { yyterminate(); if (YY_CURRENT_BUFFER) yy_delete_buffer(YY_CURRENT_BUFFER); } else popBuffer(); } #[^\r\n]* ; . { return yytext[0]; } %% void scan_new_input_file(FILE *in) { while (yy_buffer != NULL) popBuffer(); if (YY_CURRENT_BUFFER) yy_delete_buffer(YY_CURRENT_BUFFER); yy_switch_to_buffer(yy_create_buffer(in, YY_BUF_SIZE)); yylineno = 1; freeSymbols(); } void scan_push_string(char *str) { pushBuffer(strdup("init")); yy_scan_string (str); } int scan_yyinput(void) { return input(); } void scan_copy_start(void) { BEGIN(COPYSTART); } void pushBuffer( char * context ) { struct __yy_buffer * yb = malloc( sizeof( *yb )); yb->buffer = YY_CURRENT_BUFFER; yb->lineno = yylineno; yb->fileName = current_file; yb->yyin = yyin; yb->prev = yy_buffer; yy_buffer = yb; current_file = context; yylineno = 1; yyin = NULL; } static void popBuffer( void ) { struct __yy_buffer * yb = yy_buffer; if( yyin != NULL ) fclose( yyin ); yy_delete_buffer( YY_CURRENT_BUFFER ); yy_switch_to_buffer( yy_buffer->buffer ); current_file = yy_buffer->fileName; yylineno = yy_buffer->lineno; yyin = yy_buffer->yyin; yy_buffer = yy_buffer->prev; free( yb ); } static void addSymbol( char * str ) { char * name = str; char * value = str; symbol * sym = NULL; while( *value != ' ' && *value != '\t' ) value++; *(value++) = '\0'; while( *value == ' ' || *value == '\t' ) value++; value[strlen(value) - 1 ] = '\0'; sym = malloc( sizeof( *sym )); sym->value = strdup( value ); sym->next = NULL; /* Store the symbol name in searchable form with a leading @ */ sym->name = malloc( strlen( name ) + 1 + 1 ); sym->name[0] = '@'; strcpy( sym->name+1, name ); if( symbols != NULL ) sym->next = symbols; symbols = sym; } static char * getSymbol( const char * name ) { symbol * sym; for( sym = symbols; sym; sym = sym->next ) { if( strcmp( name, sym->name ) == 0 ) return( sym->value ); } return( NULL ); } static void freeSymbols( void ) { symbol * sym = symbols; while( sym ) { symbol * victim = sym; sym = sym->next; free( victim->name ); free( victim->value ); free( victim ); } symbols = NULL; } /* * Local Variables: * tab-width: 4 * c-indent-level: 4 * c-basic-offset: 4 * End: */
- Previous message: [Slony1-commit] slony1-engine/src/slon remote_worker.c
- Next message: [Slony1-commit] slony1-engine/doc/adminguide logshipping.sgml
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list