# -*- perl -*-
# RServ.pm
# Vadim Mikheev, (c) 2000, PostgreSQL Inc.

package RServ;

require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
	Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
	$debug $quiet $verbose
	);
@EXPORT_OK = qw();
use strict;
use Pg;

my $debug = 0;
my $quiet = 1;
my $verbose = 0;

$debug = 1;
$quiet = 0;
$verbose = 1;

my %Mtables = ();
my %Stables = ();

sub GetServerId
{
    my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);

    print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
			  " host='$Host' AND dbase='$DB'");

    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }
    
    if ($result->cmdTuples && $result->cmdTuples > 1)
    {
        printf STDERR "Duplicate host definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    print STDERR "GetServerId($DB,$Host) == $row[0]\n" if ($debug);

    return $row[0];
}

sub PrepareSnapshot
{
    my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;

    if ($mserver == $sserver) {
	print STDERR "master and slave numbers are same [$mserver] !\n";
	return(-1);
    }

    print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # dump master server ID into snapshot file (to prevent replication
    # of colums from master back to slave)
    print $outf "-- SERVER $mserver\n";

    # first, we must know for wich tables the slave subscribed
    my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
    return (-1) if ($result == -1);

    my @row;
    while (@row = $result->fetchrow) {
	$Stables{$row[0]} = 1;
    }
    
    print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);

    Exec($mconn,"BEGIN");
    Exec($mconn,"set transaction isolation level serializable");
    
    # MAP oid --> tabname, keyname, key_type
    my $sql = qq{
    	select pgc.oid, pgc.relname, pga.attname, pgt.typname
		from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
			pg_type pgt
		where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
			AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
    };
    $result = Exec($mconn,$sql);

    while (@row = $result->fetchrow)
    {
		printf "$row[0], $row[1], $row[2]\n" if ($debug);
 		if (ref($onlytables) eq 'HASH') {
 			next unless (exists $onlytables->{$row[1]});
 			$onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
 		}
	push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
    }

    print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
    if (! %Mtables) {
    	print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
	RollbackAndQuit($mconn);
    }

    # Read last succeeded sync
    $sql = qq{
    	select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
	where server = $sserver AND syncid =
		(select max(syncid) from _RSERV_SYNC_
			where server = $sserver AND status > 0)
    };
    
    $result = Exec($mconn,$sql);
 
    my @lastsync = $result->fetchrow;
    print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);

    # exclude data which originated from master server
    my $sel_server = " and l.server = $mserver ";

    my $sinfo = "";
    if (@lastsync && $lastsync[3] ne '')	# sync info
    {
	$sinfo = "and (l.logid >= $lastsync[3]";
	$sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
	$sinfo .= ")";
    }

    my $havedeal = 0;
    
    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
	" where l.delete = 1 $sinfo $sel_server order by l.reloid";
    
    printf "DELETED: $sql\n" if $debug;
    
    $result = $mconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    
    my $lastoid = -1;
    while (@row = $result->fetchrow) {
	next unless exists $Mtables{$row[0]};
	next unless exists $Stables{$Mtables{$row[0]}[0]};

	if ($lastoid != $row[0]) {
	    if ($lastoid == -1) {
		my $syncid = GetSYNCID($mconn, $outf);
		return($syncid) if $syncid < 0;
		$havedeal = 1;
	    } else {
		printf $outf "\\.\n";
	    }
	    printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
	    $lastoid = $row[0];
	}
	if (! defined $row[1]) {
	    print STDERR "NULL key\n" unless ($quiet);
	    $mconn->exec("ROLLBACK");
	    return(-2);
	}
	printf $outf "%s\n", OutputValue($row[1]);
    }
    printf $outf "\\.\n" if ($lastoid != -1);
    
    # UPDATED rows
    
    my ($taboid, $tabname, $tabkey);
    foreach $taboid (keys %Mtables)
    {
	my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
	next unless exists $Stables{$tabname};

	my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';

	$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
	  "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
	  " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
	  $sel_server;
	
	printf "UPDATED: $sql\n" if $debug;
	
	$result = $mconn->exec($sql);
	if ($result->resultStatus ne PGRES_TUPLES_OK)
	{
	    printf $outf "-- ERROR\n" if $havedeal;
	    print STDERR $mconn->errorMessage unless ($quiet);
	    $mconn->exec("ROLLBACK");
	    return(-1);
	}
	next if $result->ntuples <= 0;
	if (! $havedeal)
	{
	    my $syncid = GetSYNCID($mconn, $outf);
	    return($syncid) if $syncid < 0;
	    $havedeal = 1;
	}
	printf $outf "-- UPDATE $tabname\n";
	printf "-- UPDATE $tabname\n" if $debug;
	while (@row = $result->fetchrow)
	{
	    for (my $i = 0; $i <= $#row; $i++)
	    {
		printf $outf "	" if $i;
		printf "	" if $i && $debug;
		printf $outf "%s", OutputValue($row[$i]);
		printf "%s", OutputValue($row[$i]) if $debug;;
	    }
	    printf $outf "\n";
	    printf "\n" if $debug;
	}
	printf $outf "\\.\n";
	printf "\\.\n" if $debug;;
    }

    # INSERTED rows

    foreach $taboid (keys %Mtables)
    {
	my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
	next unless exists $Stables{$tabname};

	my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';

	$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
	  "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
	  " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
	  $sel_server;
	
	printf "INSERTED: $sql\n" if $debug;
	
	$result = $mconn->exec($sql);
	if ($result->resultStatus ne PGRES_TUPLES_OK)
	{
	    printf $outf "-- ERROR\n" if $havedeal;
	    print STDERR $mconn->errorMessage unless ($quiet);
	    $mconn->exec("ROLLBACK");
	    return(-1);
	}
	next if $result->ntuples <= 0;
	if (! $havedeal)
	{
	    my $syncid = GetSYNCID($mconn, $outf);
	    return($syncid) if $syncid < 0;
	    $havedeal = 1;
	}
	printf $outf "-- INSERT $tabname\n";
	printf "-- INSERT $tabname\n" if $debug;
	while (@row = $result->fetchrow)
	{
	    for (my $i = 0; $i <= $#row; $i++)
	    {
		printf $outf "	" if $i;
		printf "	" if $i && $debug;
		printf $outf "%s", OutputValue($row[$i]);
		printf "%s", OutputValue($row[$i]) if $debug;
	    }
	    printf $outf "\n";
	    printf "\n" if $debug;
	}
	printf $outf "\\.\n";
	printf "\\.\n" if $debug;;
    }
    
    
    unless ($havedeal)
    {
    	print STDERR "hon't have deal, rollback...\n" if ($debug);
	$mconn->exec("ROLLBACK");
	return(0);
    }
    
    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
	printf $outf "-- ERROR\n";
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    
    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
	printf $outf "-- ERROR\n";
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }

    printf $outf "-- OK\n";
    printf "-- OK\n" if $debug;
    
    return(1);
    
}

sub OutputValue
{
	my ($val) = @_; # @_[0];

	return("\\N") unless defined $val;

	$val =~ s/\\/\\\\/g;
	$val =~ s/	/\\011/g;
	$val =~ s/\n/\\012/g;
	$val =~ s/\'/\\047/g;

	return($val);
}

# Get syncid for new snapshot
sub GetSYNCID
{
    my ($conn, $outf) = @_; # (@_[0], @_[1]);
    
    my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
	print STDERR $conn->errorMessage unless ($quiet);
	$conn->exec("ROLLBACK");
	return(-1);
    }
 
    my @row = $result->fetchrow;
    
    printf $outf "-- SYNCID $row[0]\n";
    printf "-- SYNCID $row[0]\n" if $debug;
    return($row[0]);
}


sub CleanLog
{
    my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
    
    my $result = $conn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
	print STDERR $conn->errorMessage unless ($quiet);
	$conn->exec("ROLLBACK");
	return(-1);
    }
    
    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
	" where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
	" where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
    
    printf "$sql\n" if $debug;
    
    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
	print STDERR $conn->errorMessage unless ($quiet);
	return(-1);
    }
    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow)
    {
	$maxid = $row[0] if $maxid eq '';
	last if $row[0] > $maxid;
	my @ids = split(/[ 	]+,[ 	]+/, $row[1]);
	foreach my $aid (@ids)
	{
	    $active{$aid} = 1 unless exists $active{$aid};
	}
    }
    if ($maxid eq '')
    {
	print STDERR "No Sync IDs\n" unless ($quiet);
	return(0);
    }
    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';
    #if (ref($onlytables) eq 'HASH') {
    #	foreach my $onlytable (keys %{$onlytables}) {
    #		$sinfo
    #	}
    #}
    $sql = "delete from _RSERV_LOG_ where " . 
	"logtime < now() - '$howold second'::interval AND $sinfo";
    
    printf "$sql\n" if $debug;
    
    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
	print STDERR $conn->errorMessage unless ($quiet);
	$conn->exec("ROLLBACK");
	return(-1);
    }
    $maxid = $result->cmdTuples;
    
    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
	print STDERR $conn->errorMessage unless ($quiet);
	$conn->exec("ROLLBACK");
	return(-1);
    }
    
    return($maxid);
}

sub ApplySnapshot
{
    my ($sconn, $inpf, $multimaster, $onlytables) = @_; # (@_[0], @_[1]);

    my $serverId;

    my $result = $sconn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $sconn->errorMessage unless ($quiet);
	$sconn->exec("ROLLBACK");
	return(-1);
    }
    
    $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $sconn->errorMessage unless ($quiet);
	$sconn->exec("ROLLBACK");
	return(-1);
    }
    
    # MAP name --> oid, keyname, keynum
    my $sql = qq{
    	select pgc.oid, pgc.relname, pga.attname, rt.key
	from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
	where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
		AND pga.attnum = rt.key
    };
    
    $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
	print STDERR $sconn->errorMessage unless ($quiet);
	$sconn->exec("ROLLBACK");
	return(-1);
    }
    %Stables = ();
    while (my @row = $result->fetchrow) {
	#	printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
 		if (ref($onlytables) eq 'HASH') {
 			next unless (exists $onlytables->{$row[1]});
 			$onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
 		}
	push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);

    my $ok = 0;
    my $syncid = -1;
    while(<$inpf>) {
	$_ =~ s/\n//;
	my ($cmt, $cmd, $prm) = split (/[ 	]+/, $_, 3);
	die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
	if ($cmt ne '--') {
	    printf STDERR "Invalid format\n" unless ($quiet);
	    $sconn->exec("ROLLBACK");
	    return(-2);
	}
	if ($cmd eq 'DELETE') {
	    if ($syncid == -1) {
		printf STDERR "Sync ID unspecified\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-2);
	    }
	    $result = DoDelete($sconn, $inpf, $prm); 
	    if ($result) {
		$sconn->exec("ROLLBACK");
		return($result);
	    }
	} elsif ($cmd eq 'INSERT') {
	    if ($syncid == -1) {
		printf STDERR "Sync ID unspecified\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-2);
	    }
	    $result = DoInsert($sconn, $inpf, $prm);
	    if ($result) {
		$sconn->exec("ROLLBACK");
		return($result);
	    }
	} elsif ($cmd eq 'UPDATE') {
	    if ($syncid == -1) {
		printf STDERR "Sync ID unspecified\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-2);
	    }
	    $result = DoUpdate($sconn, $inpf, $prm);
	    if ($result) {
		$sconn->exec("ROLLBACK");
		return($result);
	    }
	} elsif ($cmd eq 'SYNCID') {
	    if ($syncid != -1) {
		printf STDERR "Second Sync ID ?!\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-2);
	    }
	    if ($prm !~ /^\d+$/) {
		printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-2);
	    }
	    $syncid = $prm;
	    
	    printf STDERR "Sync ID $syncid\n" unless ($quiet);
	    
	    $result = $sconn->exec(qq{
	    	select syncid, synctime
		from _RSERV_SLAVE_SYNC_
		where syncid = 
			(select max(syncid) from _RSERV_SLAVE_SYNC_)
	    });
	    if ($result->resultStatus ne PGRES_TUPLES_OK) {
		print STDERR "can't get current syncid from _rserv_slave_sync_: ",$sconn->errorMessage unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-1);
	    }

	    my @row = $result->fetchrow;
	    print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
	    if (! defined $row[0]) {
		$result = Exec($sconn,qq{
			insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
			values ($syncid, now())
		});
	    } elsif ($row[0] >= $prm) {
		printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(0);
	    } else {
		$result = Exec($sconn,qq{
			update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
		});
	    }
	    if ($result->resultStatus ne PGRES_COMMAND_OK) {
		print STDERR $sconn->errorMessage unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-1);
	    }
	} elsif ($cmd eq 'OK') {
	    $ok = 1;
	    if ($multimaster) {
	    # now, update server in _rserv_log_ based on transaction xid
		ExecFatch($sconn,"select count(*) from _rserv_log_");
		ExecDebug($sconn,"select * from _rserv_log_");
		my $keys_sql = qq{
			update _rserv_log_ set server=$serverId
			where logid = (select _rserv_xid_())
		};
				
		Exec($sconn,$keys_sql);
	    }
	    last;
	} elsif ($cmd eq 'ERROR') {
	    printf STDERR "ERROR signaled\n" unless ($quiet);
	    $sconn->exec("ROLLBACK");
	    return(-2);
	} elsif ($cmd eq 'SERVER') {
	    if ($prm !~ /^\d+$/) {
		printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
		$sconn->exec("ROLLBACK");
		return(-2);
	    }
	    $serverId = $prm;
	    print STDERR "Server ID $serverId\n" unless ($quiet);
	} else {
	    printf STDERR "Unknown command $cmd\n" unless ($quiet);
	    $sconn->exec("ROLLBACK");
	    return(-2);
	}
    }
    
    if (! $ok) {
	printf STDERR "No OK flag in input\n" unless ($quiet);
	$sconn->exec("ROLLBACK");
	return(-2);
    }
    
    $result = $sconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $sconn->errorMessage unless ($quiet);
	$sconn->exec("ROLLBACK");
	return(-1);
    }
    
    return(1);
}

sub DoDelete
{
    my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
	print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
	while (<$inpf>) {
	    my $istring = $_;
	    $istring =~ s/\n//;
	    last if ($istring eq '\.');
	}
	return(0);
    }

    my $ok = 0;
    while(<$inpf>)
    {
	if ($_ !~ /\n$/)
	{
	    printf STDERR "Invalid format\n" unless ($quiet);
	    return(-2);
	}
	my $key = $_;
	$key =~ s/\n//;
	if ($key eq '\.')
	{
	    $ok = 1;
	    last;
	}
	
	my $sql = "delete from \"$tabname\" where ".
	    "\"$Stables{$tabname}->[1]\" = '$key'";
	
	printf "$sql\n" if $debug;
	
	my $result = $sconn->exec($sql);
	if ($result->resultStatus ne PGRES_COMMAND_OK)
	{
	    print STDERR $sconn->errorMessage unless ($quiet);
	    return(-1);
	}
    }
    
    if (! $ok)
    {
	printf STDERR "No end of input in DELETE section\n" unless ($quiet);
	return(-2);
    }
    
    return(0);
}


sub DoUpdate
{
    my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
	print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
	while (<$inpf>) {
	    my $istring = $_;
	    $istring =~ s/\n//;
	    last if ($istring eq '\.');
	}
	return(0);
    }

    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
    
    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;	# max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
	" where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
    
    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
	print STDERR $sconn->errorMessage unless ($quiet);
	return(-1);
    }
    
    my @anames = ();
    while (my @row = $result->fetchrow) {
	$anames[$row[0]] = $row[1];
    }
    
    my $istring;
    my $ok = 0;
    while(<$inpf>) {
	if ($_ !~ /\n$/) {
	    printf STDERR "Invalid format\n" unless ($quiet);
	    return(-2);
	}
	$istring = $_;
	$istring =~ s/\n//;
	if ($istring eq '\.') {
	    $ok = 1;
	    last;
	}
	my @vals = split(/	/, $istring);
	if ($oidkey) {
	    if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
		printf STDERR "Invalid OID\n" unless ($quiet);
		return(-2);
	    }
	    $oidkey = $vals[0];
	} else {
	    unshift @vals, '';
	}
	
	$sql = "update \"$tabname\" set ";
	my $ocnt = 0;
	for (my $i = 1; $i <= $#anames; $i++) {
	    if ($vals[$i] eq '\N') {
		if ($i == $Stables{$tabname}->[2]) {
		    printf STDERR "NULL key\n" unless ($quiet);
		    return(-2);
		}
		$vals[$i] = 'null';
	    } else {
		$vals[$i] = "'" . $vals[$i] . "'";
		next if $i == $Stables{$tabname}->[2];
	    }
	    $ocnt++;
	    $sql .= ', ' if $ocnt > 1;
	    $sql .= "\"$anames[$i]\" = $vals[$i]";
	} if ($oidkey) {
	    $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
	} else {
	    $sql .= " where \"$Stables{$tabname}->[1]\" = ".
		$vals[$Stables{$tabname}->[2]];
	}
	
	printf "$sql\n" if $debug;
	
	$result = $sconn->exec($sql);
	
	if ($result->resultStatus ne PGRES_COMMAND_OK) {
	    print STDERR $sconn->errorMessage unless ($quiet);
	    return(-1);
	}
	next if $result->cmdTuples == 1;	# updated
	
	if ($result->cmdTuples > 1) {
	    printf STDERR "Duplicate keys\n" unless ($quiet);
	    return(-2);
	} 

	# no key - copy
	push @CopyBuf, "$istring\n";
	$CBufLen += length($istring);
	
	if ($CBufLen >= $CBufMax) {
	    $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
	    return($result) if $result;
	    @CopyBuf = ();
	    $CBufLen = 0;
	}
    }
    
    if (! $ok) {
	printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
	return(-2);
    }
    
    if ($CBufLen) {
	print STDERR "@CopyBuf\n" if $debug;
	$result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
	return($result) if $result;
    }

    return(0);
}

sub DoInsert
{
    my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
	print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
	while (<$inpf>) {
	    my $istring = $_;
	    $istring =~ s/\n//;
	    last if ($istring eq '\.');
	}
	return(0);
    }

    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
    
    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;	# max size of buf for copy
    
    my $istring;
    my $ok = 0;
    while(<$inpf>) {
	if ($_ !~ /\n$/) {
	    printf STDERR "Invalid format\n" unless ($quiet);
	    return(-2);
	}
	$istring = $_;
	$istring =~ s/\n//;
	if ($istring eq '\.') {
	    $ok = 1;
	    last;
	}

	# no key - copy
	push @CopyBuf, "$istring\n";
	$CBufLen += length($istring);
	
	if ($CBufLen >= $CBufMax) {
	    my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
	    return($result) if $result;
	    @CopyBuf = ();
	    $CBufLen = 0;
	}
    }
    
    if (! $ok) {
	printf STDERR "No end of input in INSERT section\n" unless ($quiet);
	return(-2);
    }
    
    if ($CBufLen) {
	print STDERR "@CopyBuf\n" if $debug;
	my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
	return($result) if $result;
    }
    
    return(0);
}


sub DoCopy {
    my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
    
    my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') . 
	"FROM STDIN";
    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_COPY_IN) {
	print STDERR $sconn->errorMessage unless ($quiet);
	return(-1);
    }
    
    foreach my $str (@{$CBuf}) {
	$sconn->putline($str);
    }
    
    $sconn->putline("\\.\n");
    
    if ($sconn->endcopy) {
	print STDERR $sconn->errorMessage unless ($quiet);
	return(-1);
    }
    
    return(0);
}


#
# Returns last SyncID applied on Slave
#
sub GetSyncID {
    my ($sconn) = @_; # (@_[0]);
    
    my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
	print STDERR $sconn->errorMessage unless ($quiet);
	return(-1);
    }
    my @row = $result->fetchrow;
    print STDERR "GetSyncID: ",($row[0] || 'null'),"\n" if ($debug);
    return(undef) unless defined $row[0];	# null
    return($row[0]);
}

#
# Updates _RSERV_SYNC_ on Master with Slave SyncID
#
sub SyncSyncID {
    my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
    
    my $result = $mconn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    
    $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
			  " where server = $sserver AND syncid = $syncid" .
			  " for update");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    my @row = $result->fetchrow;
    if (! defined $row[0]) {
	printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(0);
    }
    if ($row[1] > 0) {
	printf STDERR "SyncID $syncid for server ".
	    "$sserver already updated\n" unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(0);
    }
    $result = $mconn->exec("update _RSERV_SYNC_" .
			  " set synctime = now(), status = 1" .
			  " where server = $sserver AND syncid = $syncid");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    $result = $mconn->exec("delete from _RSERV_SYNC_" .
			  " where server = $sserver AND syncid < $syncid");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    
    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
	print STDERR $mconn->errorMessage unless ($quiet);
	$mconn->exec("ROLLBACK");
	return(-1);
    }
    
    return(1);
}

# stuff moved from perl scripts for better re-use

sub Rollback {
    my $conn = shift @_;

    print STDERR $conn->errorMessage unless ($quiet);
    $conn->exec("ROLLBACK");
}

sub RollbackAndQuit {
    my $conn = shift @_;

    Rollback($conn);
    exit (-1);
}

sub Connect {
	my $info = shift @_;

	print("Connecting to $info\n") if ($debug || $verbose);
	my $conn = Pg::connectdb($info);
	if ($conn->status != PGRES_CONNECTION_OK) {
	    die "Failed opening $info";
	}
	return $conn;
}

sub Exec {
	my $conn = shift || die "Exec needs connection!";
	my $sql = shift || die "Exec needs SQL statement!";
	# used to return error code if no tuples are retured
	my $return_code = shift;

	if ($debug) {
		# re-format SQL in one line (for nicer output)
		$sql =~ s/[\s\n\r]+/ /gs;
		print STDERR "Exec: $sql\n";
	}
	my $result = $conn->exec($sql);
	if ($result->resultStatus eq PGRES_COMMAND_OK) {
		return $result;
	} elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
		print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
		return $result;
	} else {
		if (defined($return_code)) {
			print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
			return($return_code);
		} else {
			RollbackAndQuit($conn)
		}
	}
}

sub Exec2 {
	my $mconn = shift @_;
	my $sconn = shift @_;
	my $sql = shift @_;

	my $result = $mconn->exec($sql);
	RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
	$result = $sconn->exec($sql);
	RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
	# XXX TODO: return results?!
}

# exec sql query and return one row from it
sub ExecFatch {
	my $conn = shift || die "ExecFatch need conn!";
	my $sql = shift || die "ExecFatch need SQL!";

	if ($debug) {
		# re-format SQL in one line (for nicer output)
		$sql =~ s/[\s\n\r]+/ /gs;
		print STDERR "Exec: $sql\n";
	}

	my $result = $conn->exec($sql);
	RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);

	print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);

	my @row = $result->fetchrow;
	print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
	return @row;
}

# exec sql query and dump all rows retured to STDERR (great for debugging)
sub ExecDebug {
	return if (! $debug);

	my $conn = shift || die "ExecDebug need conn!";
	my $sql = shift || die "ExecDebug need SQL!";

	if ($debug) {
		# re-format SQL in one line (for nicer output)
		$sql =~ s/[\s\n\r]+/ /gs;
		print STDERR "Exec: $sql\n";
	}

	my $result = $conn->exec($sql);
	RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);

	print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);

	while (my @row = $result->fetchrow) {
		print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
	}
	return $result->ntuples;
}
sub MkInfo {
	my $db = shift || die "need database name!";
	my $host = shift;
	my $port = shift;
	my $user = shift;
	my $password = shift;

	my $info = "dbname=$db";
	$info = "$info host=$host" if (defined($host));
	$info = "$info port=$port" if (defined($port));
	$info = "$info user=$user" if (defined($user));
	$info = "$info password=$password" if (defined($password));

	return $info;
}

1;
