#include <sys_defs.h>
#ifdef HAS_LMDB
#include <sys/stat.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <msg.h>
#include <mymalloc.h>
#include <htable.h>
#include <iostuff.h>
#include <vstring.h>
#include <myflock.h>
#include <stringops.h>
#include <slmdb.h>
#include <dict.h>
#include <dict_lmdb.h>
#include <warn_stat.h>
/*
 * LMDB dictionary handle. The functions in this file receive a DICT pointer
 * and cast it to DICT_LMDB, so the generic DICT part has to stay the first
 * member.
 */
typedef struct {
DICT dict; /* generic dictionary part; must be first */
SLMDB slmdb; /* handle passed to all slmdb_xxx() calls */
VSTRING *key_buf; /* key copy for sequence(); allocated on first use */
VSTRING *val_buf; /* value copy for lookup()/sequence(); allocated on first use */
} DICT_LMDB;
/* File name suffix; not referenced in the code visible in this file. */
#define DICT_LMDB_SUFFIX "lmdb"
/*
 * SCOPY - copy at most "size" bytes of "data" into the VSTRING "buf" with
 * vstring_strncpy(), allocating "buf" on first use, and evaluate to the
 * buffer's string pointer. "buf" is evaluated more than once and may be
 * assigned to, so it must be a side-effect free lvalue.
 * NOTE(review): the result is presumably null-terminated by
 * vstring_strncpy() - confirm against vstring(3).
 */
#define SCOPY(buf, data, size) \
vstring_str(vstring_strncpy(buf ? buf : (buf = vstring_alloc(10)), data, size))
/*
 * Third and fourth arguments of slmdb_init() in dict_lmdb_open().
 * NOTE(review): presumably the map size growth factor and the upper bound
 * for the map size - confirm against slmdb(3).
 */
#define DICT_LMDB_SIZE_INCR 2
#define DICT_LMDB_SIZE_MAX SSIZE_T_MAX
/*
 * Retry limits handed to slmdb_control() in dict_lmdb_open(). The bulk
 * limit is twice the number of bits in a size_t.
 */
#define DICT_LMDB_API_RETRY_LIMIT 2
#define DICT_LMDB_BULK_RETRY_LIMIT \
((int) (2 * sizeof(size_t) * CHAR_BIT))
/*
 * dict_lmdb_lookup - find the value stored under a key
 *
 * dict: dictionary handle (really a DICT_LMDB).
 * name: null-terminated lookup key.
 *
 * Returns a pointer to a copy of the value, held in a buffer that belongs
 * to the handle and that is reused by later lookup and sequence calls, or
 * a null pointer when the key is not in the database. dict->error is reset
 * to 0. Lock failures and LMDB errors other than MDB_NOTFOUND are reported
 * with msg_fatal().
 */
static const char *dict_lmdb_lookup(DICT *dict, const char *name)
{
    DICT_LMDB *self = (DICT_LMDB *) dict;
    const char *found = 0;
    MDB_val key;
    MDB_val val;
    size_t  name_len = strlen(name);
    int     with_null;
    int     rc;

    dict->error = 0;

    /* The handle must permit at least one of the two key conventions. */
    if ((dict->flags & (DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL)) == 0)
        msg_panic("dict_lmdb_lookup: no DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL flag");

    /* Optionally search with a lower-cased copy of the key. */
    if (dict->flags & DICT_FLAG_FOLD_FIX) {
        if (dict->fold_buf == 0)
            dict->fold_buf = vstring_alloc(10);
        vstring_strcpy(dict->fold_buf, name);
        name = lowercase(vstring_str(dict->fold_buf));
    }

    /* Hold a shared lock for the duration of the database access. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_SHARED) < 0)
            msg_fatal("%s: lock dictionary: %m", dict->name);
    }

    /*
     * Probe first with the key's terminating null byte included in the key
     * length, then without it. A hit clears the flag for the other
     * convention, so that later requests on this handle skip it.
     */
    for (with_null = 1; found == 0 && with_null >= 0; with_null--) {
        const int this_flag =
            with_null ? DICT_FLAG_TRY1NULL : DICT_FLAG_TRY0NULL;
        const int other_flag =
            with_null ? DICT_FLAG_TRY0NULL : DICT_FLAG_TRY1NULL;

        if ((dict->flags & this_flag) == 0)
            continue;
        key.mv_data = (void *) name;
        key.mv_size = name_len + with_null;
        rc = slmdb_get(&self->slmdb, &key, &val);
        if (rc == 0) {
            dict->flags &= ~other_flag;
            found = SCOPY(self->val_buf, val.mv_data, val.mv_size);
        } else if (rc != MDB_NOTFOUND) {
            msg_fatal("error reading %s:%s: %s",
                      dict->type, dict->name, mdb_strerror(rc));
        }
    }

    /* Drop the shared lock. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
            msg_fatal("%s: unlock dictionary: %m", dict->name);
    }
    return (found);
}
/*
 * dict_lmdb_update - add or replace a database entry
 *
 * dict:  dictionary handle (really a DICT_LMDB).
 * name:  null-terminated key.
 * value: null-terminated value.
 *
 * Returns the slmdb_put() status unchanged: 0 after a successful write, or
 * MDB_KEYEXIST when the key already exists and DICT_FLAG_DUP_IGNORE or
 * DICT_FLAG_DUP_WARN is set. Other duplicate-key cases, other LMDB errors
 * and lock failures are reported with msg_fatal(). dict->error is reset
 * to 0.
 */
static int dict_lmdb_update(DICT *dict, const char *name, const char *value)
{
    const int null_flags = DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL;
    DICT_LMDB *self = (DICT_LMDB *) dict;
    MDB_val key;
    MDB_val val;
    size_t  extra;
    int     put_flags;
    int     rc;

    dict->error = 0;

    /* The handle must permit at least one of the two key conventions. */
    if ((dict->flags & null_flags) == 0)
        msg_panic("dict_lmdb_update: no DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL flag");

    /* Optionally store a lower-cased copy of the key. */
    if (dict->flags & DICT_FLAG_FOLD_FIX) {
        if (dict->fold_buf == 0)
            dict->fold_buf = vstring_alloc(10);
        vstring_strcpy(dict->fold_buf, name);
        name = lowercase(vstring_str(dict->fold_buf));
    }

    /*
     * When both conventions are still allowed, settle on one now; the
     * build-time macro LMDB_NO_TRAILING_NULL selects which one survives.
     */
    if ((dict->flags & null_flags) == null_flags) {
#ifdef LMDB_NO_TRAILING_NULL
        dict->flags &= ~DICT_FLAG_TRY1NULL;
#else
        dict->flags &= ~DICT_FLAG_TRY0NULL;
#endif
    }

    /* With DICT_FLAG_TRY1NULL, key and value include their null byte. */
    extra = (dict->flags & DICT_FLAG_TRY1NULL) ? 1 : 0;
    key.mv_data = (void *) name;
    key.mv_size = strlen(name) + extra;
    val.mv_data = (void *) value;
    val.mv_size = strlen(value) + extra;

    /* Hold an exclusive lock while writing. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_EXCLUSIVE) < 0)
            msg_fatal("%s: lock dictionary: %m", dict->name);
    }

    /* Only overwrite an existing entry when the caller asked for that. */
    if (dict->flags & DICT_FLAG_DUP_REPLACE)
        put_flags = 0;
    else
        put_flags = MDB_NOOVERWRITE;
    rc = slmdb_put(&self->slmdb, &key, &val, put_flags);

    switch (rc) {
    case 0:
        break;
    case MDB_KEYEXIST:
        /* Duplicate key: stay silent, warn, or give up, as configured. */
        if ((dict->flags & DICT_FLAG_DUP_IGNORE) == 0) {
            if (dict->flags & DICT_FLAG_DUP_WARN)
                msg_warn("%s:%s: duplicate entry: \"%s\"",
                         dict->type, dict->name, name);
            else
                msg_fatal("%s:%s: duplicate entry: \"%s\"",
                          dict->type, dict->name, name);
        }
        break;
    default:
        msg_fatal("error updating %s:%s: %s",
                  dict->type, dict->name, mdb_strerror(rc));
        break;
    }

    /* Drop the exclusive lock. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
            msg_fatal("%s: unlock dictionary: %m", dict->name);
    }
    return (rc);
}
/*
 * dict_lmdb_delete - remove a database entry
 *
 * dict: dictionary handle (really a DICT_LMDB).
 * name: null-terminated key.
 *
 * Returns 0 when an entry was deleted and 1 when the key was not found.
 * LMDB errors other than MDB_NOTFOUND, and lock failures, are reported
 * with msg_fatal(). dict->error is reset to 0.
 */
static int dict_lmdb_delete(DICT *dict, const char *name)
{
    DICT_LMDB *self = (DICT_LMDB *) dict;
    MDB_val key;
    size_t  name_len = strlen(name);
    int     with_null;
    int     rc = 1;                     /* "not found" until proven otherwise */

    dict->error = 0;

    /* The handle must permit at least one of the two key conventions. */
    if ((dict->flags & (DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL)) == 0)
        msg_panic("dict_lmdb_delete: no DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL flag");

    /* Optionally delete by a lower-cased copy of the key. */
    if (dict->flags & DICT_FLAG_FOLD_FIX) {
        if (dict->fold_buf == 0)
            dict->fold_buf = vstring_alloc(10);
        vstring_strcpy(dict->fold_buf, name);
        name = lowercase(vstring_str(dict->fold_buf));
    }

    /* Hold an exclusive lock while modifying the database. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_EXCLUSIVE) < 0)
            msg_fatal("%s: lock dictionary: %m", dict->name);
    }

    /*
     * Try the key with its terminating null byte included in the key
     * length first, then without it. A successful delete clears the flag
     * for the other convention and ends the search.
     */
    for (with_null = 1; rc > 0 && with_null >= 0; with_null--) {
        const int this_flag =
            with_null ? DICT_FLAG_TRY1NULL : DICT_FLAG_TRY0NULL;
        const int other_flag =
            with_null ? DICT_FLAG_TRY0NULL : DICT_FLAG_TRY1NULL;

        if ((dict->flags & this_flag) == 0)
            continue;
        key.mv_data = (void *) name;
        key.mv_size = name_len + with_null;
        rc = slmdb_del(&self->slmdb, &key);
        if (rc == 0) {
            dict->flags &= ~other_flag;
        } else if (rc == MDB_NOTFOUND) {
            rc = 1;
        } else {
            msg_fatal("error deleting from %s:%s: %s",
                      dict->type, dict->name, mdb_strerror(rc));
        }
    }

    /* Drop the exclusive lock. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
            msg_fatal("%s: unlock dictionary: %m", dict->name);
    }
    return (rc);
}
/*
 * dict_lmdb_sequence - step through the database with a cursor
 *
 * dict:     dictionary handle (really a DICT_LMDB).
 * function: DICT_SEQ_FUN_FIRST or DICT_SEQ_FUN_NEXT; anything else is a
 *           panic.
 * key:      receives a pointer to a copy of the entry's key.
 * value:    receives a pointer to a copy of the entry's value, or to an
 *           empty string when the stored value is empty.
 *
 * Returns 0 when an entry was produced and 1 when the cursor reports
 * MDB_NOTFOUND; *key and *value are left alone in the latter case. The
 * copies live in buffers owned by the handle. Other LMDB errors and lock
 * failures are reported with msg_fatal(). dict->error is reset to 0.
 */
static int dict_lmdb_sequence(DICT *dict, int function,
                              const char **key, const char **value)
{
    static const char myname[] = "dict_lmdb_sequence";
    DICT_LMDB *self = (DICT_LMDB *) dict;
    MDB_val cur_key;
    MDB_val cur_val;
    MDB_cursor_op cursor_op;
    int     rc;

    dict->error = 0;

    /* Translate the request into an LMDB cursor operation. */
    if (function == DICT_SEQ_FUN_FIRST)
        cursor_op = MDB_FIRST;
    else if (function == DICT_SEQ_FUN_NEXT)
        cursor_op = MDB_NEXT;
    else
        msg_panic("%s: invalid function: %d", myname, function);

    /* Hold a shared lock for the duration of the database access. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_SHARED) < 0)
            msg_fatal("%s: lock dictionary: %m", dict->name);
    }

    rc = slmdb_cursor_get(&self->slmdb, &cur_key, &cur_val, cursor_op);
    if (rc == 0) {
        /* Hand out private copies instead of pointers into the database. */
        *key = SCOPY(self->key_buf, cur_key.mv_data, cur_key.mv_size);
        if (cur_val.mv_data == 0 || cur_val.mv_size == 0)
            *value = "";
        else
            *value = SCOPY(self->val_buf, cur_val.mv_data, cur_val.mv_size);
    } else if (rc == MDB_NOTFOUND) {
        /* No entry at this cursor position: report "no more entries". */
        rc = 1;
    } else {
        msg_fatal("error seeking %s:%s: %s",
                  dict->type, dict->name, mdb_strerror(rc));
    }

    /* Drop the shared lock. */
    if (dict->flags & DICT_FLAG_LOCK) {
        if (myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
            msg_fatal("%s: unlock dictionary: %m", dict->name);
    }
    return (rc);
}
/*
 * dict_lmdb_close - release a dictionary handle
 *
 * Closes the underlying SLMDB handle (its result is not examined), frees
 * whichever of the key, value and case-folding buffers were allocated,
 * and finally releases the handle itself with dict_free().
 */
static void dict_lmdb_close(DICT *dict)
{
    DICT_LMDB *self = (DICT_LMDB *) dict;

    slmdb_close(&self->slmdb);

    /* The buffers are allocated on demand, so any of them may be unset. */
    if (self->key_buf != 0)
        vstring_free(self->key_buf);
    if (self->val_buf != 0)
        vstring_free(self->val_buf);
    if (dict->fold_buf != 0)
        vstring_free(dict->fold_buf);

    dict_free(dict);
}
static void dict_lmdb_longjmp(void *context, int val)
{
DICT_LMDB *dict_lmdb = (DICT_LMDB *) context;
dict_longjmp(&dict_lmdb->dict, val);
}
static void dict_lmdb_notify(void *context, int error_code,...)
{
DICT_LMDB *dict_lmdb = (DICT_LMDB *) context;
va_list ap;
va_start(ap, error_code);
switch (error_code) {
case MDB_SUCCESS:
msg_info("database %s:%s: using size limit %lu during open",
dict_lmdb->dict.type, dict_lmdb->dict.name,
(unsigned long) va_arg(ap, size_t));
break;
case MDB_MAP_FULL:
msg_info("database %s:%s: using size limit %lu after MDB_MAP_FULL",
dict_lmdb->dict.type, dict_lmdb->dict.name,
(unsigned long) va_arg(ap, size_t));
break;
case MDB_MAP_RESIZED:
msg_info("database %s:%s: using size limit %lu after MDB_MAP_RESIZED",
dict_lmdb->dict.type, dict_lmdb->dict.name,
(unsigned long) va_arg(ap, size_t));
break;
case MDB_READERS_FULL:
msg_info("database %s:%s: pausing after MDB_READERS_FULL",
dict_lmdb->dict.type, dict_lmdb->dict.name);
break;
default:
msg_warn("unknown MDB error code: %d", error_code);
break;
}
va_end(ap);
}
static void dict_lmdb_assert(void *context, const char *text)
{
DICT_LMDB *dict_lmdb = (DICT_LMDB *) context;
msg_fatal("%s:%s: internal error: %s",
dict_lmdb->dict.type, dict_lmdb->dict.name, text);
}
/*
 * dict_lmdb_open - open an LMDB database and wrap it in a DICT handle
 *
 * path:       database path without suffix; "." DICT_TYPE_LMDB is appended.
 * open_flags: open(2)-style flags; O_RDONLY selects a read-only database.
 * dict_flags: DICT_FLAG_XXX bits that control locking, key folding, the
 *             trailing-null convention and bulk-update mode.
 *
 * Returns the new dictionary handle, or the result of dict_surrogate()
 * when slmdb_init() or slmdb_open() fails. Lock and fstat() failures are
 * reported with msg_fatal().
 */
DICT *dict_lmdb_open(const char *path, int open_flags, int dict_flags)
{
DICT_LMDB *dict_lmdb;
DICT *dict;
struct stat st;
SLMDB slmdb;
char *mdb_path;
int mdb_flags, slmdb_flags, status;
int db_fd;
/*
 * Common exit: evaluate the result first (it may refer to mdb_path), then
 * release the path string and return.
 */
#define DICT_LMDB_OPEN_RETURN(d) do { \
DICT *__d = (d); \
myfree(mdb_path); \
return (__d); \
} while (0)
/* Database file name; released by DICT_LMDB_OPEN_RETURN(). */
mdb_path = concatenate(path, "." DICT_TYPE_LMDB, (char *) 0);
/* Translate the open flags and dictionary flags into LMDB/SLMDB flags. */
mdb_flags = MDB_NOSUBDIR | MDB_NOLOCK;
if (open_flags == O_RDONLY)
mdb_flags |= MDB_RDONLY;
slmdb_flags = 0;
if (dict_flags & DICT_FLAG_BULK_UPDATE)
slmdb_flags |= SLMDB_FLAG_BULK;
/*
 * When the LMDB headers do not define MDB_NOMEMINIT, bulk updates use
 * MDB_WRITEMAP. NOTE(review): presumably a workaround for LMDB versions
 * without MDB_NOMEMINIT support - confirm the motivation.
 */
#ifndef MDB_NOMEMINIT
if (dict_flags & DICT_FLAG_BULK_UPDATE)
mdb_flags |= MDB_WRITEMAP;
#endif
/*
 * An open failure is not fatal here: the caller gets a surrogate
 * dictionary that carries the error text. slmdb_close() is not called on
 * this path.
 */
if ((status = slmdb_init(&slmdb, dict_lmdb_map_size, DICT_LMDB_SIZE_INCR,
DICT_LMDB_SIZE_MAX)) != 0
|| (status = slmdb_open(&slmdb, mdb_path, open_flags, mdb_flags,
slmdb_flags)) != 0) {
dict = dict_surrogate(DICT_TYPE_LMDB, path, open_flags, dict_flags,
"open database %s: %s", mdb_path, mdb_strerror(status));
DICT_LMDB_OPEN_RETURN(dict);
}
/*
 * In bulk-update mode, take an exclusive lock on the database file and
 * then replace it with a shared lock that remains in place.
 * NOTE(review): presumably this waits until no other process is using
 * the file before the bulk update starts - confirm.
 */
db_fd = slmdb_fd(&slmdb);
if (dict_flags & DICT_FLAG_BULK_UPDATE) {
if (myflock(db_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_EXCLUSIVE) < 0)
msg_fatal("%s: lock dictionary: %m", mdb_path);
if (myflock(db_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_SHARED) < 0)
msg_fatal("%s: unlock dictionary: %m", mdb_path);
}
/*
 * Build the handle. The local SLMDB state is copied by value into the
 * handle; the code below uses only the copy.
 */
dict_lmdb = (DICT_LMDB *) dict_alloc(DICT_TYPE_LMDB, path, sizeof(*dict_lmdb));
dict_lmdb->slmdb = slmdb;
dict_lmdb->dict.lookup = dict_lmdb_lookup;
dict_lmdb->dict.update = dict_lmdb_update;
dict_lmdb->dict.delete = dict_lmdb_delete;
dict_lmdb->dict.sequence = dict_lmdb_sequence;
dict_lmdb->dict.close = dict_lmdb_close;
/* Record file descriptor, modification time and owner of the database. */
if (fstat(db_fd, &st) < 0)
msg_fatal("dict_lmdb_open: fstat: %m");
dict_lmdb->dict.lock_fd = dict_lmdb->dict.stat_fd = db_fd;
dict_lmdb->dict.lock_type = MYFLOCK_STYLE_FCNTL;
dict_lmdb->dict.mtime = st.st_mtime;
dict_lmdb->dict.owner.uid = st.st_uid;
dict_lmdb->dict.owner.status = (st.st_uid != 0);
/* The copy buffers are allocated on first use by SCOPY(). */
dict_lmdb->key_buf = 0;
dict_lmdb->val_buf = 0;
/*
 * With DICT_FLAG_LOCK, warn when the file at "path" (without suffix) is
 * newer than the database, unless it was modified within the last 100
 * seconds.
 */
if ((dict_flags & DICT_FLAG_LOCK) != 0
&& stat(path, &st) == 0
&& st.st_mtime > dict_lmdb->dict.mtime
&& st.st_mtime < time((time_t *) 0) - 100)
msg_warn("database %s is older than source file %s", mdb_path, path);
/* Flags that this implementation always sets on its handles. */
#define DICT_LMDB_IMPL_FLAGS (DICT_FLAG_FIXED | DICT_FLAG_MULTI_WRITER)
dict_lmdb->dict.flags = dict_flags | DICT_LMDB_IMPL_FLAGS;
/* If the caller chose neither trailing-null convention, allow both. */
if ((dict_flags & (DICT_FLAG_TRY0NULL | DICT_FLAG_TRY1NULL)) == 0)
dict_lmdb->dict.flags |= (DICT_FLAG_TRY0NULL | DICT_FLAG_TRY1NULL);
if (dict_flags & DICT_FLAG_FOLD_FIX)
dict_lmdb->dict.fold_buf = vstring_alloc(10);
if (dict_flags & DICT_FLAG_BULK_UPDATE)
dict_jmp_alloc(&dict_lmdb->dict);
/*
 * Register retry limits and callbacks with the SLMDB layer, with this
 * handle as the callback context. The notify callback is installed only
 * when verbose logging is on. A non-zero result is treated as a
 * programming error.
 */
if (slmdb_control(&dict_lmdb->slmdb,
CA_SLMDB_CTL_API_RETRY_LIMIT(DICT_LMDB_API_RETRY_LIMIT),
CA_SLMDB_CTL_BULK_RETRY_LIMIT(DICT_LMDB_BULK_RETRY_LIMIT),
CA_SLMDB_CTL_LONGJMP_FN(dict_lmdb_longjmp),
CA_SLMDB_CTL_NOTIFY_FN(msg_verbose ?
dict_lmdb_notify : (SLMDB_NOTIFY_FN) 0),
CA_SLMDB_CTL_ASSERT_FN(dict_lmdb_assert),
CA_SLMDB_CTL_CB_CONTEXT((void *) dict_lmdb),
CA_SLMDB_CTL_END) != 0)
msg_panic("dict_lmdb_open: slmdb_control: %m");
/* In verbose mode, log the size limit that is in effect after open. */
if (msg_verbose)
dict_lmdb_notify((void *) dict_lmdb, MDB_SUCCESS,
slmdb_curr_limit(&dict_lmdb->slmdb));
DICT_LMDB_OPEN_RETURN(DICT_DEBUG (&dict_lmdb->dict));
}
#endif