// TxnGuideInMemory.cpp
#include <iostream>
#include <db_cxx.h>
// Minimal portability layer: the same thread_* / mutex_* macro names are
// defined on top of the Win32 API or on top of pthreads, so the rest of the
// file is written once against these names.
#ifdef _WIN32
#include <windows.h>
// Path delimiter character for this platform.
#define PATHD '\\'
extern "C" {
extern int getopt(int, char * const *, const char *);
extern char *optarg;
}
typedef HANDLE thread_t;
// Starts `func(arg)` via CreateThread and stores the handle in *thrp.
// Evaluates to 0 on success and -1 if CreateThread returned NULL.
// `attr` is accepted for signature parity with the pthread variant but is
// not used in the expansion.
#define thread_create(thrp, attr, func, arg) \
(((*(thrp) = CreateThread(NULL, 0, \
(LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// Waits without timeout for the thread handle, then fetches its exit code
// into *statusp. Evaluates to 0 only if both steps succeed, -1 otherwise.
// NOTE(review): statusp is forwarded to GetExitCodeThread as-is, so a NULL
// statusp is passed straight through -- confirm that is acceptable.
#define thread_join(thr, statusp) \
((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
typedef HANDLE mutex_t;
// Creates an unnamed, initially unowned mutex and stores its handle in *m.
// Evaluates to 0 on success, -1 if CreateMutex returned NULL. `attr` is
// not used in the expansion.
#define mutex_init(m, attr) \
(((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
// Blocks without timeout until the mutex is acquired; 0 on success, -1
// for any other wait result.
#define mutex_lock(m) \
((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
// Releases the mutex; 0 on success, -1 if ReleaseMutex reported failure.
#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
// Path delimiter character for this platform.
#define PATHD '/'
typedef pthread_t thread_t;
// The macros below forward directly to the pthread function of the same
// purpose, so each one evaluates to that function's return value.
#define thread_create(thrp, attr, func, arg) \
pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))
typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr) pthread_mutex_init((m), (attr))
#define mutex_lock(m) pthread_mutex_lock(m)
#define mutex_unlock(m) pthread_mutex_unlock(m)
#endif
// Number of writer threads that main() creates and joins.
#define NUMWRITERS 5
// Counter from which each writer thread derives its own thread number.
// It is incremented and read only while thread_num_lock is held.
int global_thread_num;
// Protects global_thread_num; initialised in main() before any writer
// thread is created.
mutex_t thread_num_lock;
// Forward declarations for the functions defined later in this file.
int countRecords(Db *, DbTxn *);
int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
// NOTE(review): usage() is declared here but no definition or call is
// visible in this file -- confirm whether it is still needed.
int usage(void);
void *writerThread(void *);
// Program entry point.
//
// Sets up a private, in-memory transactional environment, opens one
// in-memory database in it, runs NUMWRITERS writer threads against that
// database, and finally closes the database and the environment.
//
// Returns EXIT_SUCCESS when every step succeeded, EXIT_FAILURE otherwise.
// Handles that were created are closed on every path, including after a
// failed open.
int
main(void)
{
    Db *dbp = NULL;
    DbEnv *envp = NULL;
    thread_t writerThreads[NUMWRITERS];
    int startedThreads = 0;          // Only these slots hold valid handles.
    int exitStatus = EXIT_SUCCESS;
    const char *progName = "TxnGuideInMemory";

    const u_int32_t envFlags =
        DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG |
        DB_INIT_TXN | DB_INIT_MPOOL | DB_PRIVATE | DB_THREAD;

    try {
        envp = new DbEnv(0);

        // Keep the log and the cache in memory (10 MB each).
        envp->log_set_config(DB_LOG_IN_MEMORY, 1);
        envp->set_lg_bsize(10 * 1024 * 1024);
        envp->set_cachesize(0, 10 * 1024 * 1024, 1);

        // Let the library run deadlock detection with the
        // DB_LOCK_MINWRITE policy.
        envp->set_lk_detect(DB_LOCK_MINWRITE);

        envp->open(NULL, envFlags, 0);

        if (openDb(&dbp, progName, NULL, envp, DB_DUPSORT) !=
            EXIT_SUCCESS) {
            // openDb already reported the error. Do not start writers
            // against a database handle that failed to open.
            exitStatus = EXIT_FAILURE;
        } else if (mutex_init(&thread_num_lock, NULL) != 0) {
            std::cerr << progName << ": mutex_init failed" << std::endl;
            exitStatus = EXIT_FAILURE;
        } else {
            for (int i = 0; i < NUMWRITERS; i++) {
                if (thread_create(&writerThreads[startedThreads], NULL,
                    writerThread, static_cast<void *>(dbp)) != 0) {
                    std::cerr << progName
                              << ": failed to create writer thread "
                              << i << std::endl;
                    exitStatus = EXIT_FAILURE;
                    continue;
                }
                startedThreads++;
            }

            // Join only the threads that were actually created; the
            // remaining array slots are uninitialised.
            for (int i = 0; i < startedThreads; i++)
                (void)thread_join(writerThreads[i], NULL);
        }
    } catch (DbException &e) {
        std::cerr << "Error opening database environment: "
                  << std::endl;
        std::cerr << e.what() << std::endl;
        exitStatus = EXIT_FAILURE;
    }

    // Close the database first, then the environment. Each close has its
    // own try block so a failure closing the database does not leave the
    // environment open.
    try {
        if (dbp != NULL)
            dbp->close(0);
    } catch (DbException &e) {
        std::cerr << "Error closing database and environment."
                  << std::endl;
        std::cerr << e.what() << std::endl;
        exitStatus = EXIT_FAILURE;
    }
    try {
        if (envp != NULL)
            envp->close(0);
    } catch (DbException &e) {
        std::cerr << "Error closing database and environment."
                  << std::endl;
        std::cerr << e.what() << std::endl;
        exitStatus = EXIT_FAILURE;
    }

    if (exitStatus != EXIT_SUCCESS)
        return (exitStatus);

    std::cout << "I'm all done." << std::endl;
    return (EXIT_SUCCESS);
}
void *
writerThread(void *args)
{
int j, thread_num;
int max_retries = 20; char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
"key 5", "key 6", "key 7", "key 8",
"key 9", "key 10"};
Db *dbp = (Db *)args;
DbEnv *envp = dbp->get_env();
(void)mutex_lock(&thread_num_lock);
global_thread_num++;
thread_num = global_thread_num;
(void)mutex_unlock(&thread_num_lock);
srand(thread_num);
for (int i=0; i<50; i++) {
DbTxn *txn;
bool retry = true;
int retry_count = 0;
while (retry) {
try {
txn = NULL;
envp->txn_begin(NULL, &txn, 0);
for (j = 0; j < 10; j++) {
Dbt key, value;
key.set_data(key_strings[j]);
key.set_size((u_int32_t)strlen(key_strings[j]) + 1);
int payload = rand() + i;
value.set_data(&payload);
value.set_size(sizeof(int));
dbp->put(txn, &key, &value, 0);
}
std::cout << thread_num << " : Found "
<< countRecords(dbp, txn)
<< " records in the database." << std::endl;
std::cout << thread_num << " : committing txn : " << i
<< std::endl;
try {
txn->commit(0);
retry = false;
txn = NULL;
} catch (DbException &e) {
std::cout << "Error on txn commit: "
<< e.what() << std::endl;
}
} catch (DbDeadlockException &) {
if (txn != NULL)
(void)txn->abort();
if (retry_count < max_retries) {
std::cerr << "############### Writer " << thread_num
<< ": Got DB_LOCK_DEADLOCK.\n"
<< "Retrying write operation." << std::endl;
retry_count++;
retry = true;
} else {
std::cerr << "Writer " << thread_num
<< ": Got DeadLockException and out of "
<< "retries. Giving up." << std::endl;
retry = false;
}
} catch (DbException &e) {
std::cerr << "db put failed" << std::endl;
std::cerr << e.what() << std::endl;
if (txn != NULL)
txn->abort();
retry = false;
} catch (std::exception &ee) {
std::cerr << "Unknown exception: " << ee.what() << std::endl;
return (0);
}
}
}
return (0);
}
// Counts the records visible in `dbp` by walking a cursor opened under
// transaction `txn` with DB_NEXT until get() stops returning 0.
//
// Returns the number of records visited. If a non-deadlock DbException
// interrupts the walk, it is reported on std::cerr and the count reached
// so far is returned.
//
// Throws DbDeadlockException (rethrown unchanged) so the caller can abort
// and retry its transaction. The cursor is closed before the exception
// leaves this function.
int
countRecords(Db *dbp, DbTxn *txn)
{
    Dbc *cursorp = NULL;
    int count = 0;

    try {
        dbp->cursor(txn, &cursorp, 0);

        Dbt key, value;
        while (cursorp->get(&key, &value, DB_NEXT) == 0)
            count++;
    } catch (DbDeadlockException &) {
        std::cerr << "countRecords: got deadlock" << std::endl;
        // cursorp is still NULL if cursor() itself raised the deadlock.
        if (cursorp != NULL) {
            try {
                cursorp->close();
            } catch (DbException &e) {
                // Report, but let the deadlock be what the caller sees.
                std::cerr << "countRecords: cursor close failed:"
                          << std::endl;
                std::cerr << e.what() << std::endl;
            }
        }
        // Rethrow the original exception object rather than a copy.
        throw;
    } catch (DbException &e) {
        std::cerr << "countRecords error:" << std::endl;
        std::cerr << e.what() << std::endl;
    }

    if (cursorp != NULL) {
        try {
            cursorp->close();
        } catch (DbException &e) {
            std::cerr << "countRecords: cursor close failed:" << std::endl;
            std::cerr << e.what() << std::endl;
        }
    }

    return (count);
}
// Creates a Db handle in `envp` and opens it as a BTREE database.
//
// dbpp:       receives the new handle. It is stored as soon as the handle
//             is constructed, so it is set even when a later step fails;
//             the caller stays responsible for closing it.
// progname:   program name used as a prefix in error messages.
// fileName:   database file name, passed through to Db::open().
// envp:       environment the database belongs to.
// extraFlags: flags applied with Db::set_flags() before opening; 0 skips
//             that call.
//
// Returns EXIT_SUCCESS on success. Returns EXIT_FAILURE after reporting on
// std::cerr when a step throws DbException or returns a non-zero code.
int
openDb(Db **dbpp, const char *progname, const char *fileName,
    DbEnv *envp, u_int32_t extraFlags)
{
    const u_int32_t openFlags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;

    try {
        Db *dbp = new Db(envp, 0);
        *dbpp = dbp;

        // Return codes are checked as well as exceptions so a failure is
        // never silently ignored.
        if (extraFlags != 0 && dbp->set_flags(extraFlags) != 0) {
            std::cerr << progname << ": openDb: set_flags failed"
                      << std::endl;
            return (EXIT_FAILURE);
        }

        if (dbp->open(NULL, fileName, NULL, DB_BTREE, openFlags, 0) != 0) {
            std::cerr << progname << ": openDb: db open failed:"
                      << std::endl;
            return (EXIT_FAILURE);
        }
    } catch (DbException &e) {
        std::cerr << progname << ": openDb: db open failed:" << std::endl;
        std::cerr << e.what() << std::endl;
        return (EXIT_FAILURE);
    }

    return (EXIT_SUCCESS);
}