Mon Nov 29 21:37:50 PST 2004
- Previous message: [Slony1-commit] By darcyb: Make it possible to easily obtain the value of a conf option
- Next message: [Slony1-commit] By cbbrowne: The ongoing effort to understand log spooling...
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Added in some code to process syncs; still very rudimentary
Modified Files:
--------------
slony1-engine/src/slonspool:
slonspool (r1.1 -> r1.2)
-------------- next part --------------
Index: slonspool
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonspool/slonspool,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slonspool/slonspool -Lsrc/slonspool/slonspool -u -w -r1.1 -r1.2
--- src/slonspool/slonspool
+++ src/slonspool/slonspool
@@ -2,7 +2,7 @@
use Pg;
use Getopt::Long;
-my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $node);
+my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $subnode, $node);
my @SETS;
my $dbh;
process_options();
@@ -50,8 +50,6 @@
and PGC.relnamespace = PGN.oid
order by tab_id;
};
-
-
}
$sth=$dbh->exec($tquery);
while (@row=$sth->fetchrow) {
@@ -62,7 +60,7 @@
my $line = "*" x 16384;
$ret = $dbh->getline($line, 16384);
while ($line ne "\\.") {
- print SUBSCRIBE, line, "\n";
+ print SUBSCRIBE line, "\n";
$ret = $dbh->getline($line, 16384);
}
print SUBSCRIBE "\.\n";
@@ -80,7 +78,6 @@
my ($nsp, $seqname) = @row;
`$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`;
}
-
# Next, populate Sync information
# Use the last SYNC's snapshot information and set
# the action sequence list to all actions after
@@ -103,7 +100,100 @@
}
sub listen_to_node {
+    for (;;) {    # poll without end
+        process_event();    # handle one event per pass
+        sleep(2);    # rest two seconds between passes
+    }
+}
+sub process_event {
+    # Spool the next unconfirmed event of node $node into a file of SQL
+    # statements under $spoolpath, then confirm it in sl_confirm.  Takes no
+    # arguments, returns nothing, and reads the file-level option variables.
+    my $dsn = "dbname=$database host=$host port=$port user=$user";
+    $dsn .= " password=$password" if ($password);
+    my $dbh = Pg::connectdb($dsn);
+    print "Result:", $dbh->status, " OK=", PGRES_CONNECTION_OK, "\n";
+    if ($dbh->status != PGRES_CONNECTION_OK) {
+        warn "slonspool: connection failed: ", $dbh->errorMessage, "\n";
+        return;
+    }
+    my $sync_event;    # newest confirmed seqno, then advanced to the next event
+    my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm
+      where con_origin = $node order by con_seqno desc limit 1;};
+    print $last_seq, "\n";
+    my $res = $dbh->exec($last_seq);
+    while (my @row = $res->fetchrow) {
+        ($sync_event) = @row;
+        print "Last sync: $sync_event\n";
+    }
+    $sync_event++;
+    print "Next sync: $sync_event\n";
+    # Limit the log rows to the subscribed origin and the tables of the sets.
+    # NOTE(review): $sets is never assigned in the visible code - confirm it is set.
+    my $get_tables = qq{ select tab_id from "_$cluster".sl_table where tab_set in ($sets); };
+    my $origin_qualification = " (log_origin = $subnode) ";
+    $res = $dbh->exec($get_tables);
+    my @TABLES;
+    while (my @row = $res->fetchrow) { push @TABLES, $row[0]; }
+    my $table_qualification = " ( log_tableid in (" . join(',', @TABLES) . "))";
+    my $qualification = " $origin_qualification and $table_qualification ";
+    # Bound the rows by the event's snapshot.  Without an event row there is
+    # nothing to spool yet, so nothing is written or confirmed.
+    my $get_event_info = qq{select ev_minxid, ev_maxxid, ev_xip from "_$cluster".sl_event where ev_seqno = $sync_event;};
+    my $found_event = 0;
+    $res = $dbh->exec($get_event_info);
+    while (my @row = $res->fetchrow) {
+        my ($minxid, $maxxid, $ev_zip) = @row;
+        $found_event = 1;
+        if ($ev_zip) {
+            $ev_zip =~ s/'//g;    # Strip off unnecessary quotes
+            $qualification .= " and (log_xid < '$maxxid' and \"_$cluster\".xxid_lt_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
+            $qualification .= " and (log_xid >= '$minxid' and \"_$cluster\".xxid_ge_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
+        } else {
+            $qualification .= " and (log_xid < '$maxxid') and (log_xid >= '$minxid') ";
+        }
+    }
+    return unless ($found_event);
+    # Index the qualified table names by table id, as the lookups below expect.
+    my $tables_query = qq{select t.tab_id, t.tab_reloid, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace;};
+    $res = $dbh->exec($tables_query);
+    while (my @row = $res->fetchrow) {
+        my ($id, $oid, $namespace, $tname) = @row;
+        $TABLENAME[$id] = qq{"$namespace".$tname};
+    }
+
+    my $cursor_query = qq{
+  declare LOG cursor for
+    select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata
+    from "_$cluster".sl_log_1
+    where $qualification
+    order by log_actionseq;};
+
+    my $syncname = sprintf("log-%08d", $sync_event);    # zero-padded: no spaces in the file name
+    open(my $logoutput, '>', "$spoolpath/$syncname")
+      or die "slonspool: cannot write $spoolpath/$syncname: $!\n";
+    print {$logoutput} "-- Data for sync $sync_event\n", "-- ", `date`, "BEGIN;\n";
+    $dbh->exec("begin;");
+    $dbh->exec($cursor_query);
+    my %statement_for = (I => "insert into %s %s;\n", U => "update only %s set %s;\n", D => "delete from only %s where %s;\n");
+    my $foundsome = "YES";
+    while ($foundsome eq "YES") {
+        $foundsome = "NO";
+        $res = $dbh->exec("fetch forward 100 in LOG;");
+        while (my @row = $res->fetchrow) {
+            $foundsome = "YES";
+            my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row;
+            next unless (exists $statement_for{$cmdtype});    # select by command type; $cmddata is the statement text
+            printf {$logoutput} $statement_for{$cmdtype}, $TABLENAME[$tableid], $cmddata;
+        }
+    }
+    close($logoutput) or die "slonspool: cannot close $spoolpath/$syncname: $!\n";
+    my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp)
+ values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); };
+    print "Confirm: $confirmation\n";
+    $dbh->exec($confirmation);
+    $dbh->exec("commit;");    # without this the confirmation is lost with the open transaction
}
sub connect_to_node {
@@ -119,13 +209,14 @@
$goodopts = GetOptions("help", "database=s", "host=s", "user=s",
"cluster=s", "password=s", "port=s", "sets=s",
"spoolpath=s", "spoolname=s", "pgbins=s",
- "maxsize=i", "maxage=i", "node=i");
+ "maxsize=i", "maxage=i", "node=i", "subnode=i");
if (defined ($opt_help)) {
show_usage();
}
$cluster=$opt_cluster if (defined($opt_cluster));
+ $subnode = $opt_subnode if (defined ($opt_subnode));
$node = $opt_node if (defined($opt_node));
$database=$opt_database if (defined ($opt_database));
$user = $opt_user if (defined ($opt_user));
@@ -154,6 +245,7 @@
print qq{slonspool:
--help get help
--cluster=s Slony-I cluster name
+ --subnode=s Node number subscribed through
--node=i Node number to use to request
--pgbins=s Location of PostgreSQL binaries including slonik and pg_dump
--database=s database to connect to
- Previous message: [Slony1-commit] By darcyb: Make it possible to easily obtain the value of a conf option
- Next message: [Slony1-commit] By cbbrowne: The ongoing effort to understand log spooling...
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list