TestReplication.java   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1997-2003
 *	Sleepycat Software.  All rights reserved.
 *
 * $Id: TestReplication.java,v 1.2 2004/03/30 01:24:39 jtownsen Exp $
 */

/*
 * Simple test of replication, merely to exercise the individual
 * methods in the API.  Rather than use TCP/IP, our transport
 * mechanism is just an ArrayList of byte arrays.
 * It's managed like a queue, and synchronization is via
 * the ArrayList object itself and java's wait/notify.
 * It's not terribly extensible, but it's fine for a small test.
 */

package com.sleepycat.test;

import com.sleepycat.db.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Vector;

public class TestReplication extends Thread
    implements DbRepTransport
{
    public static final String MASTER_ENVDIR = "./master";
    public static final String CLIENT_ENVDIR = "./client";

    private Vector queue = new Vector();
    private DbEnv master_env;
    private DbEnv client_env;

    private static void mkdir(String name)
        throws IOException
    {
        (new File(name)).mkdir();
    }


    // The client thread runs this
    public void run()
    {
        try {
            System.err.println("c10");
            client_env = new DbEnv(0);
            System.err.println("c11");
            client_env.set_rep_transport(1, this);
            System.err.println("c12");
            client_env.open(CLIENT_ENVDIR, Db.DB_CREATE | Db.DB_INIT_MPOOL, 0);
            System.err.println("c13");
            Dbt myid = new Dbt("master01".getBytes());
            System.err.println("c14");
            client_env.rep_start(myid, Db.DB_REP_CLIENT);
            System.err.println("c15");
            DbEnv.RepProcessMessage processMsg = new DbEnv.RepProcessMessage();
            processMsg.envid = 2;
            System.err.println("c20");
            boolean running = true;

            Dbt control = new Dbt();
            Dbt rec = new Dbt();

            while (running) {
                int msgtype = 0;

                System.err.println("c30");
                synchronized (queue) {
                    if (queue.size() == 0) {
                        System.err.println("c40");
                        sleepShort();
                    }
                    else {
                        msgtype = ((Integer)queue.firstElement()).intValue();
                        queue.removeElementAt(0);
                        byte[] data;

                        System.err.println("c50 " + msgtype);

                        switch (msgtype) {
                            case -1:
                                running = false;
                                break;
                            case 1:
                                data = (byte[])queue.firstElement();
                                queue.removeElementAt(0);
                                control.set_data(data);
                                control.set_size(data.length);
                                break;
                            case 2:
                                control.set_data(null);
                                control.set_size(0);
                                break;
                            case 3:
                                data = (byte[])queue.firstElement();
                                queue.removeElementAt(0);
                                rec.set_data(data);
                                rec.set_size(data.length);
                                break;
                            case 4:
                                rec.set_data(null);
                                rec.set_size(0);
                                break;
                        }

                    }
                }
                System.err.println("c60");
                if (msgtype == 3 || msgtype == 4) {
                    System.out.println("client: Got message");
                    client_env.rep_process_message(control, rec,
                                                   processMsg);
                }
            }
            System.err.println("c70");
            Db db = new Db(client_env, 0);
            db.open(null, "x.db", null, Db.DB_BTREE, 0, 0);
            Dbt data = new Dbt();
            System.err.println("c80");
            db.get(null, new Dbt("Hello".getBytes()), data, 0);
            System.err.println("c90");
            System.out.println("Hello " + new String(data.get_data(), data.get_offset(), data.get_size()));
            System.err.println("c95");
            client_env.close(0);
        }
        catch (Exception e) {
            System.err.println("client exception: " + e);
        }
    }

    // Implements DbTransport
    public int send(DbEnv env, Dbt control, Dbt rec, int flags, int envid)
        throws DbException
    {
        System.out.println("Send to " + envid);
        if (envid == 1) {
            System.err.println("Unexpected envid = " + envid);
            return 0;
        }

        int nbytes = 0;

        synchronized (queue) {
            System.out.println("Sending message");
            byte[] data = control.get_data();
            if (data != null && data.length > 0) {
                queue.addElement(new Integer(1));
                nbytes += data.length;
                byte[] newdata = new byte[data.length];
                System.arraycopy(data, 0, newdata, 0, data.length);
                queue.addElement(newdata);
            }
            else
            {
                queue.addElement(new Integer(2));
            }

            data = rec.get_data();
            if (data != null && data.length > 0) {
                queue.addElement(new Integer(3));
                nbytes += data.length;
                byte[] newdata = new byte[data.length];
                System.arraycopy(data, 0, newdata, 0, data.length);
                queue.addElement(newdata);
            }
            else
            {
                queue.addElement(new Integer(4));
            }
            System.out.println("MASTER: sent message");
        }
        return 0;
    }

    public void sleepShort()
    {
        try {
            sleep(100);
        }
        catch (InterruptedException ie)
        {
        }
    }

    public void send_terminator()
    {
        synchronized (queue) {
            queue.addElement(new Integer(-1));
        }
    }

    public void master()
    {
        try {
            master_env = new DbEnv(0);
            master_env.set_rep_transport(2, this);
            master_env.open(MASTER_ENVDIR, Db.DB_CREATE | Db.DB_INIT_MPOOL, 0644);
            System.err.println("10");
            Dbt myid = new Dbt("client01".getBytes());
            master_env.rep_start(myid, Db.DB_REP_MASTER);
            System.err.println("10");
            Db db = new Db(master_env, 0);
            System.err.println("20");
            db.open(null, "x.db", null, Db.DB_BTREE, Db.DB_CREATE, 0644);
            System.err.println("30");
            db.put(null, new Dbt("Hello".getBytes()),
                   new Dbt("world".getBytes()), 0);
            System.err.println("40");
            //DbEnv.RepElectResult electionResult = new DbEnv.RepElectResult();
            //master_env.rep_elect(2, 2, 3, 4, electionResult);
            db.close(0);
            System.err.println("50");
            master_env.close(0);
            send_terminator();
        }
        catch (Exception e) {
            System.err.println("client exception: " + e);
        }
    }

    public static void main(String[] args)
    {
        // The test should only take a few milliseconds.
        // give it 10 seconds before bailing out.
        TimelimitThread t = new TimelimitThread(1000*10);
        t.start();

        try {
            mkdir(CLIENT_ENVDIR);
            mkdir(MASTER_ENVDIR);

            TestReplication rep = new TestReplication();

            // Run the client as a seperate thread.
            rep.start();

            // Run the master.
            rep.master();

            // Wait for the master to finish.
            rep.join();
        }
        catch (Exception e)
        {
            System.err.println("Exception: " + e);
        }
        t.finished();
    }

}

class TimelimitThread extends Thread
{
    long nmillis;
    boolean finished = false;

    TimelimitThread(long nmillis)
    {
        this.nmillis = nmillis;
    }

    public void finished()
    {
        finished = true;
    }

    public void run()
    {
        long targetTime = System.currentTimeMillis() + nmillis;
        long curTime;

        while (!finished &&
               ((curTime = System.currentTimeMillis()) < targetTime)) {
            long diff = targetTime - curTime;
            if (diff > 100)
                diff = 100;
            try {
                sleep(diff);
            }
            catch (InterruptedException ie) {
            }
        }
        System.err.println("");
        System.exit(1);
    }
}