#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <db.h>
/*
 * Minimal portability layer so the rest of the file can use one set of
 * thread and mutex names (thread_t, thread_create, thread_join, mutex_t,
 * mutex_init, mutex_lock, mutex_unlock) on both Win32 and pthreads.
 * Every macro evaluates to 0 on success; the Win32 variants evaluate to -1
 * on failure, while the pthread variants pass through whatever the
 * underlying pthread call returns.
 */
#ifdef _WIN32
#include <windows.h>
/* Path delimiter character for this platform. */
#define PATHD '\\'
/* getopt/optarg are declared by hand here; the non-Windows branch gets
 * them from <unistd.h>. */
extern int getopt(int, char * const *, const char *);
extern char *optarg;
/* A thread is identified by its Win32 HANDLE. */
typedef HANDLE thread_t;
/*
 * Start a thread running func(arg) and store its handle in *thrp.
 * The attr argument is accepted for signature parity and is not used.
 */
#define thread_create(thrp, attr, func, arg) \
(((*(thrp) = CreateThread(NULL, 0, \
(LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
/*
 * Wait for thread thr to finish, then fetch its exit code into *statusp
 * (statusp is cast to LPDWORD). Yields 0 only if both the wait and the
 * exit-code query succeed.
 * NOTE(review): statusp is forwarded to GetExitCodeThread as-is; confirm
 * that a NULL statusp is acceptable before passing one.
 */
#define thread_join(thr, statusp) \
((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
/* A mutex is a Win32 mutex HANDLE. */
typedef HANDLE mutex_t;
/* Create an unnamed, initially unowned mutex in *m; attr is not used. */
#define mutex_init(m, attr) \
(((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
/* Block without timeout until *m is acquired. */
#define mutex_lock(m) \
((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
/* Release *m. */
#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
/* Path delimiter character for this platform. */
#define PATHD '/'
/* On this branch every wrapper maps one-to-one onto its pthread call. */
typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg) \
pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))
typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr) pthread_mutex_init((m), (attr))
#define mutex_lock(m) pthread_mutex_lock(m)
#define mutex_unlock(m) pthread_mutex_unlock(m)
#endif
/* Number of writer threads that main() starts. */
#define NUMWRITERS 5
/*
 * Counter used by writer_thread() to give each thread its own number;
 * it is only modified while thread_num_lock is held.
 */
int global_thread_num;
/* Protects global_thread_num. Initialized in main() before threads start. */
mutex_t thread_num_lock;
/* Forward declarations for the functions defined in this file. */
int count_records(DB *, DB_TXN *);
int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
int usage(void);
void *writer_thread(void *);
/*
 * usage --
 *	Write the command-line synopsis to stderr.
 *
 * Returns EXIT_FAILURE so that callers can "return (usage());" directly.
 */
int
usage()
{
	/* The synopsis is a constant string, so no formatting is required. */
	fputs(" [-h <database_home_directory>]\n", stderr);

	return EXIT_FAILURE;
}
/*
 * main --
 *	Open a transactional Berkeley DB environment and a database inside
 *	it, run NUMWRITERS writer threads against that database, wait for
 *	them, and close everything again.
 *
 * Options:
 *	-h dir	environment home directory (defaults to the current directory)
 *
 * Returns EXIT_SUCCESS only if the environment and database were set up,
 * every writer thread was started and finished successfully, and both
 * handles closed cleanly. Returns EXIT_FAILURE otherwise.
 */
int
main(int argc, char *argv[])
{
	DB *dbp = NULL;
	DB_ENV *envp = NULL;
	thread_t writer_threads[NUMWRITERS];
	void *thread_status;
	int ch, i, num_started, ret, ret_t;
	u_int32_t env_flags;
	char *db_home_dir;
	const char *prog_name = "txn_guide";
	const char *file_name = "mydb.db";

#ifdef _WIN32
	db_home_dir = ".\\";
#else
	db_home_dir = "./";
#endif

	while ((ch = getopt(argc, argv, "h:")) != EOF)
		switch (ch) {
		case 'h':
			db_home_dir = optarg;
			break;
		case '?':
		default:
			return (usage());
		}

	/* Create the environment handle. */
	ret = db_env_create(&envp, 0);
	if (ret != 0) {
		fprintf(stderr, "Error creating environment handle: %s\n",
		    db_strerror(ret));
		goto err;
	}

	/*
	 * Configure deadlock detection before opening the environment.
	 * writer_thread() handles the resulting DB_LOCK_DEADLOCK returns.
	 */
	ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
	if (ret != 0) {
		fprintf(stderr, "Error setting lock detect: %s\n",
		    db_strerror(ret));
		goto err;
	}

	env_flags =
	    DB_CREATE |		/* Create the environment if needed */
	    DB_RECOVER |	/* Run recovery on open */
	    DB_INIT_LOCK |	/* Locking subsystem */
	    DB_INIT_LOG |	/* Logging subsystem */
	    DB_INIT_TXN |	/* Transaction subsystem */
	    DB_INIT_MPOOL |	/* Memory pool (cache) subsystem */
	    DB_THREAD;		/* Handles are shared between threads */

	ret = envp->open(envp, db_home_dir, env_flags, 0);
	if (ret != 0) {
		fprintf(stderr, "Error opening environment: %s\n",
		    db_strerror(ret));
		goto err;
	}

	/*
	 * open_db() reports its own errors. It may leave a handle in dbp
	 * even on failure, which the cleanup code below closes.
	 */
	ret = open_db(&dbp, prog_name, file_name,
	    envp, DB_DUPSORT);
	if (ret != 0)
		goto err;

	/* The writers use this mutex, so it must be valid before they start. */
	if (mutex_init(&thread_num_lock, NULL) != 0) {
		fprintf(stderr, "Error initializing thread number mutex\n");
		ret = EXIT_FAILURE;
		goto err;
	}

	/*
	 * Start the writers. Only successfully created threads are counted,
	 * so the join loop never touches an uninitialized thread handle.
	 */
	num_started = 0;
	for (i = 0; i < NUMWRITERS; i++) {
		if (thread_create(&writer_threads[num_started], NULL,
		    writer_thread, (void *)dbp) != 0) {
			fprintf(stderr, "Error creating writer thread %d\n",
			    i + 1);
			ret = EXIT_FAILURE;
			break;
		}
		num_started++;
	}

	/*
	 * Wait for every started writer and fold its result into ret.
	 * thread_status is cleared before each join because the Win32
	 * thread_join only stores a DWORD into it.
	 */
	for (i = 0; i < num_started; i++) {
		thread_status = NULL;
		if (thread_join(writer_threads[i], &thread_status) != 0) {
			fprintf(stderr, "Error joining writer thread\n");
			ret = EXIT_FAILURE;
		} else if (thread_status != (void *)EXIT_SUCCESS) {
			fprintf(stderr, "A writer thread reported failure\n");
			ret = EXIT_FAILURE;
		}
	}

err:
	/* Close the database before the environment it lives in. */
	if (dbp != NULL) {
		ret_t = dbp->close(dbp, 0);
		if (ret_t != 0) {
			fprintf(stderr, "%s database close failed: %s\n",
			    file_name, db_strerror(ret_t));
			ret = ret_t;
		}
	}

	if (envp != NULL) {
		ret_t = envp->close(envp, 0);
		if (ret_t != 0) {
			fprintf(stderr, "environment close failed: %s\n",
			    db_strerror(ret_t));
			ret = ret_t;
		}
	}

	printf("I'm all done.\n");
	return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
/*
 * writer_thread --
 *	Thread body: perform 50 transactions, each of which writes one
 *	random integer under every key in key_strings.
 *
 * args is the shared DB handle (passed as void * by the thread API).
 *
 * A put that returns DB_LOCK_DEADLOCK aborts the current transaction and
 * restarts it from txn_begin, up to max_retries times per transaction.
 *
 * Returns (void *)EXIT_SUCCESS when all transactions commit, or
 * (void *)EXIT_FAILURE on the first unrecoverable error.
 */
void *
writer_thread(void *args)
{
	DB *dbp;
	DB_ENV *envp;
	DBT key, value;
	DB_TXN *txn;
	int i, j, num_keys, ret, thread_num;
	unsigned int payload;
	int retry_count, max_retries = 20;
	char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
	    "key 5", "key 6", "key 7", "key 8",
	    "key 9", "key 10"};

	dbp = (DB *)args;
	envp = dbp->get_env(dbp);

	/* Derive the loop bound from the array so the two cannot drift apart. */
	num_keys = (int)(sizeof(key_strings) / sizeof(key_strings[0]));

	/* Take the next thread number under the lock. */
	(void)mutex_lock(&thread_num_lock);
	global_thread_num++;
	thread_num = global_thread_num;
	(void)mutex_unlock(&thread_num_lock);

	/*
	 * NOTE(review): srand()/rand() use process-wide state in many C
	 * libraries, so per-thread seeding may not give independent
	 * sequences. Confirm whether that matters here.
	 */
	srand((unsigned int)thread_num);

	for (i = 0; i < 50; i++) {
		retry_count = 0;

retry:
		ret = envp->txn_begin(envp, NULL, &txn, 0);
		if (ret != 0) {
			envp->err(envp, ret, "txn_begin failed");
			return ((void *)EXIT_FAILURE);
		}

		for (j = 0; j < num_keys; j++) {
			/* The key includes its terminating NUL byte. */
			memset(&key, 0, sizeof(DBT));
			key.data = key_strings[j];
			key.size = (u_int32_t)strlen(key_strings[j]) + 1;

			/*
			 * The sum is computed in unsigned arithmetic because
			 * rand() may return up to INT_MAX, and signed
			 * overflow of rand() + i would be undefined. The
			 * stored size is unchanged (sizeof(unsigned int) ==
			 * sizeof(int)).
			 */
			memset(&value, 0, sizeof(DBT));
			payload = (unsigned int)rand() + (unsigned int)i;
			value.data = &payload;
			value.size = sizeof(payload);

			switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
			case 0:
				break;
			case DB_KEYEXIST:
				/* Not treated as an error; keep writing. */
				printf("Got keyexists.\n");
				break;
			case DB_LOCK_DEADLOCK:
				/*
				 * Abort before retrying. A failed abort is
				 * reported and ends the thread.
				 */
				ret = txn->abort(txn);
				if (ret != 0) {
					envp->err(envp, ret,
					    "txn abort failed");
					return ((void *)EXIT_FAILURE);
				}
				if (retry_count < max_retries) {
					printf("Writer %i: Got DB_LOCK_DEADLOCK.\n",
					    thread_num);
					printf("Writer %i: Retrying write operation.\n",
					    thread_num);
					retry_count++;
					goto retry;
				}
				printf("Writer %i: ", thread_num);
				printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
				printf("Writer %i: Giving up.\n", thread_num);
				return ((void *)EXIT_FAILURE);
			default:
				envp->err(envp, ret, "db put failed");
				ret = txn->abort(txn);
				if (ret != 0)
					envp->err(envp, ret,
					    "txn abort failed");
				return ((void *)EXIT_FAILURE);
			}
		}

		/*
		 * The count runs outside the transaction (NULL txn);
		 * count_records() opens its cursor with DB_READ_UNCOMMITTED.
		 */
		printf("Thread %i. Record count: %i\n", thread_num,
		    count_records(dbp, NULL));

		ret = txn->commit(txn, 0);
		if (ret != 0) {
			envp->err(envp, ret, "txn commit failed");
			return ((void *)EXIT_FAILURE);
		}
	}
	return ((void *)EXIT_SUCCESS);
}
/*
 * count_records --
 *	Walk the whole database with a cursor and count the records seen.
 *
 * dbp	database to scan
 * txn	transaction to open the cursor under (may be NULL)
 *
 * The cursor is opened with DB_READ_UNCOMMITTED. Errors are reported via
 * dbp->err and are not returned to the caller: the result is the number
 * of records counted before the scan stopped (0 if the cursor could not
 * be opened).
 */
int
count_records(DB *dbp, DB_TXN *txn)
{
	DBC *cur = NULL;
	DBT k, v;
	int total = 0;
	int rc;

	rc = dbp->cursor(dbp, txn, &cur, DB_READ_UNCOMMITTED);
	if (rc != 0) {
		dbp->err(dbp, rc, "count_records: cursor open failed.");
	} else {
		memset(&k, 0, sizeof(DBT));
		memset(&v, 0, sizeof(DBT));

		/* Step forward until the cursor stops returning records. */
		while ((rc = cur->get(cur, &k, &v, DB_NEXT)) == 0)
			total++;

		/* DB_NOTFOUND is the normal end of the scan. */
		if (rc != DB_NOTFOUND)
			dbp->err(dbp, rc, "Count records unspecified error");
	}

	/* Close the cursor whenever one was handed back. */
	if (cur != NULL) {
		rc = cur->close(cur);
		if (rc != 0)
			dbp->err(dbp, rc,
			    "count_records: cursor close failed.");
	}

	return (total);
}
/*
 * open_db --
 *	Create a DB handle inside envp, optionally apply extra flags, and
 *	open file_name as a btree database.
 *
 * dbpp		receives the new handle once db_create succeeds
 * progname	prefix for the message printed if db_create fails
 * file_name	database file to open (created if needed)
 * envp		environment the database belongs to
 * extra_flags	passed to DB->set_flags when non-zero
 *
 * Returns EXIT_SUCCESS or EXIT_FAILURE. If a step after db_create fails,
 * *dbpp still holds the handle and the caller is expected to close it.
 */
int
open_db(DB **dbpp, const char *progname, const char *file_name,
    DB_ENV *envp, u_int32_t extra_flags)
{
	const u_int32_t flags =
	    DB_CREATE | DB_READ_UNCOMMITTED | DB_AUTO_COMMIT;
	DB *handle;
	int rc;

	if ((rc = db_create(&handle, envp, 0)) != 0) {
		fprintf(stderr, "%s: %s\n", progname,
		    db_strerror(rc));
		return (EXIT_FAILURE);
	}

	/* Publish the handle now so later failures can be cleaned up. */
	*dbpp = handle;

	if (extra_flags != 0 &&
	    (rc = handle->set_flags(handle, extra_flags)) != 0) {
		handle->err(handle, rc,
		    "open_db: Attempt to set extra flags failed.");
		return (EXIT_FAILURE);
	}

	/* No explicit transaction is passed; DB_AUTO_COMMIT is set in flags. */
	rc = handle->open(handle, NULL, file_name, NULL, DB_BTREE, flags, 0);
	if (rc != 0) {
		handle->err(handle, rc, "Database '%s' open failed",
		    file_name);
		return (EXIT_FAILURE);
	}

	return (EXIT_SUCCESS);
}