RepQuoteExample.java [plain text]
package db.repquote;
import java.io.FileNotFoundException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.lang.Thread;
import java.lang.InterruptedException;
import com.sleepycat.db.*;
import db.repquote.RepConfig;
public class RepQuoteExample
{
private RepConfig appConfig;
private RepQuoteEnvironment dbenv;
public static void usage()
{
System.err.println("usage: " + RepConfig.progname);
System.err.println("[-C][-M][-F][-h home][-o host:port]" +
"[-m host:port][-f host:port][-n nsites][-p priority][-v]");
System.err.println(
"\t -C start the site as client of the replication group\n" +
"\t -M start the site as master of the replication group\n" +
"\t -f host:port (optional; f stands for friend and \n" +
"\t indicates a peer relationship to the specified site)\n" +
"\t -h home directory\n" +
"\t -m host:port (required; m stands for me)\n" +
"\t -n nsites (optional; number of sites in replication \n" +
"\t group; defaults to 0\n" +
"\t In which case the number of sites are computed \n" +
"\t dynamically\n" +
"\t -o host:port (optional; o stands for other; any number\n" +
"\t of these may be specified)\n" +
"\t -p priority (optional: defaults to 100)\n" +
"\t -v Enable verbose logging\n");
System.exit(1);
}
public static void main(String[] argv)
throws Exception
{
RepConfig config = new RepConfig();
boolean isPeer;
String tmpHost;
int tmpPort = 0;
for (int i = 0; i < argv.length; i++)
{
isPeer = false;
if (argv[i].compareTo("-C") == 0) {
config.startPolicy = ReplicationManagerStartPolicy.REP_CLIENT;
} else if (argv[i].compareTo("-h") == 0) {
i++;
config.home = argv[i];
} else if (argv[i].compareTo("-M") == 0) {
config.startPolicy = ReplicationManagerStartPolicy.REP_MASTER;
} else if (argv[i].compareTo("-m") == 0) {
i++;
String[] words = argv[i].split(":");
if (words.length != 2) {
System.err.println(
"Invalid host specification host:port needed.");
usage();
}
try {
tmpPort = Integer.parseInt(words[1]);
} catch (NumberFormatException nfe) {
System.err.println("Invalid host specification, " +
"could not parse port number.");
usage();
}
config.setThisHost(words[0], tmpPort);
} else if (argv[i].compareTo("-n") == 0) {
i++;
config.totalSites = Integer.parseInt(argv[i]);
} else if (argv[i].compareTo("-f") == 0 ||
argv[i].compareTo("-o") == 0) {
if (argv[i] == "-f")
isPeer = true;
i++;
String[] words = argv[i].split(":");
if (words.length != 2) {
System.err.println(
"Invalid host specification host:port needed.");
usage();
}
try {
tmpPort = Integer.parseInt(words[1]);
} catch (NumberFormatException nfe) {
System.err.println("Invalid host specification, " +
"could not parse port number.");
usage();
}
config.addOtherHost(words[0], tmpPort, isPeer);
} else if (argv[i].compareTo("-p") == 0) {
i++;
config.priority = Integer.parseInt(argv[i]);
} else if (argv[i].compareTo("-v") == 0) {
config.verbose = true;
} else {
System.err.println("Unrecognized option: " + argv[i]);
usage();
}
}
if ((!config.gotListenAddress()) || config.home.length() == 0)
usage();
RepQuoteExample runner = null;
try {
runner = new RepQuoteExample();
runner.init(config);
runner.doloop();
runner.terminate();
} catch (DatabaseException dbe) {
System.err.println("Caught an exception during " +
"initialization or processing: " + dbe);
if (runner != null)
runner.terminate();
}
}
public RepQuoteExample()
throws DatabaseException
{
appConfig = null;
dbenv = null;
}
public int init(RepConfig config)
throws DatabaseException
{
int ret = 0;
appConfig = config;
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setErrorStream(System.err);
envConfig.setErrorPrefix(RepConfig.progname);
envConfig.setReplicationManagerLocalSite(appConfig.getThisHost());
for (ReplicationHostAddress host = appConfig.getFirstOtherHost();
host != null; host = appConfig.getNextOtherHost())
{
envConfig.replicationManagerAddRemoteSite(host);
}
if (appConfig.totalSites > 0)
envConfig.setReplicationNumSites(appConfig.totalSites);
envConfig.setReplicationPriority(appConfig.priority);
envConfig.setCacheSize(RepConfig.CACHESIZE);
envConfig.setTxnNoSync(true);
envConfig.setEventHandler(new RepQuoteEventHandler());
envConfig.setReplicationManagerAckPolicy(ReplicationManagerAckPolicy.ALL);
envConfig.setAllowCreate(true);
envConfig.setRunRecovery(true);
envConfig.setThreaded(true);
envConfig.setInitializeReplication(true);
envConfig.setInitializeLocking(true);
envConfig.setInitializeLogging(true);
envConfig.setInitializeCache(true);
envConfig.setTransactional(true);
envConfig.setVerboseReplication(appConfig.verbose);
try {
dbenv = new RepQuoteEnvironment(appConfig.getHome(), envConfig);
} catch(FileNotFoundException e) {
System.err.println("FileNotFound exception: " + e);
System.err.println(
"Ensure that the environment directory is pre-created.");
ret = 1;
}
dbenv.replicationManagerStart(3, appConfig.startPolicy);
return ret;
}
public int doloop()
throws DatabaseException
{
Database db = null;
for (;;)
{
if (db == null) {
DatabaseConfig dbconf = new DatabaseConfig();
dbconf.setPageSize(512);
dbconf.setType(DatabaseType.BTREE);
if (dbenv.getIsMaster()) {
dbconf.setAllowCreate(true);
}
dbconf.setTransactional(true);
try {
db = dbenv.openDatabase
(null, RepConfig.progname, null, dbconf);
} catch (java.io.FileNotFoundException e) {
System.err.println("no stock database available yet.");
if (db != null) {
db.close(true);
db = null;
}
try {
Thread.sleep(RepConfig.SLEEPTIME);
} catch (InterruptedException ie) {}
continue;
}
}
BufferedReader stdin =
new BufferedReader(new InputStreamReader(System.in));
System.out.print("QUOTESERVER");
if (!dbenv.getIsMaster())
System.out.print("(read-only)");
System.out.print("> ");
System.out.flush();
String nextline = null;
try {
nextline = stdin.readLine();
} catch (IOException ioe) {
System.err.println("Unable to get data from stdin");
break;
}
String[] words = nextline.split("\\s");
if (words.length == 0 ||
(words.length == 1 && words[0].length() == 0)) {
try {
printStocks(db);
} catch (DeadlockException de) {
continue;
} catch (DatabaseException e) {
System.err.println("Got db exception reading replication" +
"DB: " + e);
System.err.println("Expected if it was due to a dead " +
"replication handle, otherwise an unexpected error.");
db.close(true); db = null;
continue;
}
continue;
}
if (words.length == 1 &&
(words[0].compareToIgnoreCase("quit") == 0 ||
words[0].compareToIgnoreCase("exit") == 0)) {
break;
} else if (words.length != 2) {
System.err.println("Format: TICKER VALUE");
continue;
}
if (!dbenv.getIsMaster()) {
System.err.println("Can't update client.");
continue;
}
DatabaseEntry key = new DatabaseEntry(words[0].getBytes());
DatabaseEntry data = new DatabaseEntry(words[1].getBytes());
db.put(null, key, data);
}
if (db != null)
db.close(true);
return 0;
}
public void terminate()
throws DatabaseException
{
dbenv.close();
}
private void printStocks(Database db)
throws DeadlockException, DatabaseException
{
Cursor dbc = db.openCursor(null, null);
System.out.println("\tSymbol\tPrice");
System.out.println("\t======\t=====");
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
OperationStatus ret;
for (ret = dbc.getFirst(key, data, LockMode.DEFAULT);
ret == OperationStatus.SUCCESS;
ret = dbc.getNext(key, data, LockMode.DEFAULT)) {
String keystr = new String
(key.getData(), key.getOffset(), key.getSize());
String datastr = new String
(data.getData(), data.getOffset(), data.getSize());
System.out.println("\t"+keystr+"\t"+datastr);
}
dbc.close();
}
private
class RepQuoteEventHandler extends EventHandlerAdapter {
public void handleRepClientEvent()
{
dbenv.setIsMaster(false);
}
public void handleRepMasterEvent()
{
dbenv.setIsMaster(true);
}
}
}