CVS User Account cvsuser
Mon Nov 29 21:37:50 PST 2004
Log Message:
-----------
Added in some code to process syncs; still very rudimentary

Modified Files:
--------------
    slony1-engine/src/slonspool:
        slonspool (r1.1 -> r1.2)

-------------- next part --------------
Index: slonspool
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonspool/slonspool,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slonspool/slonspool -Lsrc/slonspool/slonspool -u -w -r1.1 -r1.2
--- src/slonspool/slonspool
+++ src/slonspool/slonspool
@@ -2,7 +2,7 @@
 
 use Pg;
 use Getopt::Long;
-my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $node);
+my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $subnode, $node);
 my @SETS;
 my $dbh;
 process_options();
@@ -50,8 +50,6 @@
 		    and PGC.relnamespace = PGN.oid 
 		    order by tab_id; 
 		   };
-    
-    
   }
   $sth=$dbh->exec($tquery);
   while (@row=$sth->fetchrow) {
@@ -62,7 +60,7 @@
     my $line = "*" x 16384;
     $ret = $dbh->getline($line, 16384);
     while ($line ne "\\.") {
-      print SUBSCRIBE, line, "\n";
+      print SUBSCRIBE line, "\n";
       $ret = $dbh->getline($line, 16384);
     }
     print SUBSCRIBE "\.\n";
@@ -80,7 +78,6 @@
     my ($nsp, $seqname) = @row;
     `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`;
   }
-  
   # Next, populate Sync information
   # Use the last SYNC's snapshot information and set
   # the action sequence list to all actions after
@@ -103,7 +100,100 @@
 }
 
 sub listen_to_node {
+  while (1) {
+    process_event();
+    sleep 2;
+  }
+}
   
+sub process_event {
+  my $dsn = "dbname=$database host=$host port=$port user=$user";
+  if ($password) {
+    $dsn .= " password=$password";
+  }
+  my $dbh = Pg::connectdb($dsn);
+  print "Result:",  $dbh->status, " OK=", PGRES_CONNECTION_OK, "\n";
+  my $sync_event;
+  my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm
+                    where con_origin = $node order by con_seqno desc limit 1;};
+  print $last_seq, "\n";
+  my $res = $dbh->exec($last_seq);
+  while (my @row = $res->fetchrow) {
+    ($sync_event) = @row;
+    print "Last sync: $sync_event\n";
+  }
+  $sync_event++;
+  print "Next sync: $sync_event\n";
+
+  my $get_tables = qq{ select tab_id from "_$cluster".sl_table where tab_set in ($sets); };
+  my $origin_qualification = " (log_origin = $sub_node) ";
+  my $table_qualification = " ( log_tableid in (";
+  my $res = $dbh->exec($get_tables);
+  my @TABLES;
+  while (my @row=$sth->fetchrow) {
+    my ($table_id) = @row;
+    push @TABLES, $table_id;
+  }
+  $table_qualification .= join(',', @TABLES);
+  $table_qualification .= "))";
+  my $qualification .= " $origin_qualification and $table_qualification ";
+  my $get_event_info = qq{select ev_minxid, ev_maxxid, ev_xip from "_$cluster".sl_event where ev_seqno = $sync_event;};
+  my $res = $dbh->exec($get_event_info);
+  while (my @row=$res->fetchrow) {
+    my ($minxid, $maxxid, $ev_zip) = @row;
+    if ($ev_zip) {
+      $ev_zip = s/'//g;  # Strip off unnecessary quotes
+      $qualification .= "and ($log_xid < '$maxxid' and \"_$cluster\".xxid_lt_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
+      $qualification .= "and ($log_xid >= '$minxid' and \"_$cluster\".xxid_ge_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
+    } else {
+      $qualification .= "and ($log_xid < '$maxxid') ";
+      $qualification .= "and ($log_xid >= '$minxid') ";
+    }
+  }
+
+  my $tables_query = qq{t.tab_id, t.tab_reloid, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace;};
+  $res = $dbh->exec($tables_query);
+  while (my @row = $res->fetchrow) {
+    my ($id, $oid, $namespace, $tname) = @row;
+    $TABLENAME[$i] = qq{"$namespace".$tname};
+  }
+
+  my $cursor_query = qq{
+  declare LOG cursor for
+    select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata
+    from "_$cluster".sl_log_1
+    where $qualification
+    order by log_actionseq;};
+
+
+  my $syncname=sprintf("log-%8d", $sync_event);
+  open(LOGOUTPUT, ">$spoolpath/$syncname");
+  print LOGOUTPUT "-- Data for sync $sync_event\n";
+  print LOGOUTPUT "-- ", `date`;
+  print LOGOUTPUT "BEGIN;\n";
+  my $begin = $dbh->exec("begin;");
+  my $cursorexec = $dbh->exec($cursor_query);
+  my $foundsome = "YES";
+  while ($foundsome eq "YES") {
+    $foundsome = "NO";
+    my $res = $dbh->exec("fetch forward 100 in LOG;");
+    while (my @row = $res->fetchrow) {
+      $foundsome = "YES";
+      my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row;
+      if ($cmddata eq "I") {
+	printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata;
+      } elsif ($cmddata eq "U") {
+	printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata;
+      } elsif ($cmddata eq "D") {
+	printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata;
+      }
+    }
+  }
+  close LOGOUTPUT;
+  my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp)
+                         values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); };
+  print "Confirm: $confirmation\n";
+  my $cursorexec = $dbh->exec($confirmation);
 }
 
 sub connect_to_node {
@@ -119,13 +209,14 @@
   $goodopts = GetOptions("help", "database=s", "host=s", "user=s", 
 			 "cluster=s", "password=s", "port=s", "sets=s",
 			 "spoolpath=s", "spoolname=s", "pgbins=s", 
-			 "maxsize=i", "maxage=i", "node=i");
+			 "maxsize=i", "maxage=i", "node=i", "subnode=i");
 
   if (defined ($opt_help)) {
     show_usage();
   }
 
   $cluster=$opt_cluster if (defined($opt_cluster));
+  $subnode = $opt_subnode if (defined ($opt_subnode));
   $node = $opt_node if (defined($opt_node));
   $database=$opt_database if (defined ($opt_database));
   $user = $opt_user if (defined ($opt_user));
@@ -154,6 +245,7 @@
   print qq{slonspool:
      --help                get help
      --cluster=s           Slony-I cluster name
+     --subnode=s   Node number subscribed through
      --node=i              Node number to use to request 
      --pgbins=s            Location of PostgreSQL binaries including slonik and pg_dump
      --database=s          database to connect to


More information about the Slony1-commit mailing list