#include "sm.h"
#ifdef STORAGE_DB
#include <db.h>
typedef struct drvdata_st {
DB_ENV *env;
char *path;
int sync;
xht dbs;
xht filters;
} *drvdata_t;
typedef struct dbdata_st {
drvdata_t data;
DB *db;
} *dbdata_t;
union xhashv {
void **val;
dbdata_t *dbd_val;
};
static st_ret_t _st_db_add_type(st_driver_t drv, const char *type) {
drvdata_t data = (drvdata_t) drv->private;
dbdata_t dbd;
int err;
dbd = (dbdata_t) malloc(sizeof(struct dbdata_st));
memset(dbd, 0, sizeof(struct dbdata_st));
dbd->data = data;
if((err = db_create(&(dbd->db), data->env, 0)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't create db handle: %s", db_strerror(err));
free(dbd);
return st_FAILED;
}
if((err = dbd->db->set_flags(dbd->db, DB_DUP)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't set database for duplicate storage: %s", db_strerror(err));
dbd->db->close(dbd->db, 0);
free(dbd);
return st_FAILED;
}
if((err = dbd->db->open(dbd->db, NULL, "sm.db", type, DB_HASH, DB_AUTO_COMMIT | DB_CREATE, 0)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't open storage db: %s", db_strerror(err));
dbd->db->close(dbd->db, 0);
free(dbd);
return st_FAILED;
}
xhash_put(data->dbs, type, dbd);
return st_SUCCESS;
}
static st_ret_t _st_db_cursor_new(st_driver_t drv, dbdata_t dbd, DBC **cursor, DB_TXN **txnid) {
int err;
if(txnid != NULL)
if((err = dbd->data->env->txn_begin(dbd->data->env, NULL, txnid, DB_TXN_SYNC)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't begin new transaction: %s", db_strerror(err));
return st_FAILED;
}
if(txnid == NULL)
err = dbd->db->cursor(dbd->db, NULL, cursor, 0);
else
err = dbd->db->cursor(dbd->db, *txnid, cursor, 0);
if(err != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't create cursor: %s", db_strerror(err));
if(txnid != NULL)
(*txnid)->abort(*txnid);
return st_FAILED;
}
return st_SUCCESS;
}
static st_ret_t _st_db_cursor_free(st_driver_t drv, dbdata_t dbd, DBC *cursor, DB_TXN *txnid) {
int err;
if((err = cursor->c_close(cursor)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't close cursor: %s", db_strerror(err));
if(txnid != NULL)
txnid->abort(txnid);
return st_FAILED;
}
if(txnid != NULL)
if((err = txnid->commit(txnid, DB_TXN_SYNC)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't commit transaction: %s", db_strerror(err));
return st_FAILED;
}
return st_SUCCESS;
}
static void _st_db_object_serialise(os_object_t o, char **buf, int *len) {
char *key, *xml, *xmlstr;
void *val;
os_type_t ot;
int cur = 0, xlen;
log_debug(ZONE, "serialising object");
*buf = NULL;
*len = 0;
if(os_object_iter_first(o))
do {
os_object_iter_get(o, &key, &val, &ot);
log_debug(ZONE, "serialising key %s", key);
ser_string_set(key, &cur, buf, len);
ser_int_set(ot, &cur, buf, len);
switch(ot) {
case os_type_BOOLEAN:
ser_int_set(((int) val) != 0, &cur, buf, len);
break;
case os_type_INTEGER:
ser_int_set((int) val, &cur, buf, len);
break;
case os_type_STRING:
ser_string_set((char *) val, &cur, buf, len);
break;
case os_type_NAD:
nad_print((nad_t) val, 0, &xml, &xlen);
xmlstr = (char *) malloc(sizeof(char) * (xlen + 1));
sprintf(xmlstr, "%.*s", xlen, xml);
ser_string_set(xmlstr, &cur, buf, len);
free(xmlstr);
break;
case os_type_UNKNOWN:
break;
}
} while(os_object_iter_next(o));
*len = cur;
}
static os_object_t _st_db_object_deserialise(st_driver_t drv, os_t os, const char *buf, int len) {
os_object_t o;
int cur;
char *key, *sval;
int ot;
int ival;
nad_t nad;
log_debug(ZONE, "deserialising object");
o = os_object_new(os);
cur = 0;
while(cur < len) {
if(ser_string_get(&key, &cur, buf, len) != 0 || ser_int_get(&ot, &cur, buf, len) != 0) {
log_debug(ZONE, "ran off the end of the buffer");
return o;
}
log_debug(ZONE, "deserialising key %s", key);
switch((os_type_t) ot) {
case os_type_BOOLEAN:
ser_int_get(&ival, &cur, buf, len);
ival = (ival != 0);
os_object_put(o, key, &ival, os_type_BOOLEAN);
break;
case os_type_INTEGER:
ser_int_get(&ival, &cur, buf, len);
os_object_put(o, key, &ival, os_type_INTEGER);
break;
case os_type_STRING:
ser_string_get(&sval, &cur, buf, len);
os_object_put(o, key, sval, os_type_STRING);
free(sval);
break;
case os_type_NAD:
ser_string_get(&sval, &cur, buf, len);
nad = nad_parse(drv->st->sm->router->nad_cache, sval, strlen(sval));
free(sval);
if(nad == NULL) {
log_write(drv->st->sm->log, LOG_ERR, "db: unable to parse stored XML - database corruption?");
return NULL;
}
os_object_put(o, key, nad, os_type_NAD);
nad_free(nad);
break;
case os_type_UNKNOWN:
break;
}
free(key);
}
return o;
}
static st_ret_t _st_db_put_guts(st_driver_t drv, const char *type, const char *owner, os_t os, dbdata_t dbd, DBC *c, DB_TXN *t) {
DBT key, val;
os_object_t o;
char *buf;
int len, err;
memset(&key, 0, sizeof(DBT));
memset(&val, 0, sizeof(DBT));
key.data = (char *) owner;
key.size = strlen(owner);
if(os_iter_first(os))
do {
o = os_iter_object(os);
_st_db_object_serialise(o, &buf, &len);
val.data = buf;
val.size = len;
if((err = c->c_put(c, &key, &val, DB_KEYLAST)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't store value for type %s owner %s in storage db: %s", type, owner, db_strerror(err));
free(buf);
return st_FAILED;
}
free(buf);
} while(os_iter_next(os));
return st_SUCCESS;
}
static st_ret_t _st_db_put(st_driver_t drv, const char *type, const char *owner, os_t os) {
drvdata_t data = (drvdata_t) drv->private;
dbdata_t dbd = xhash_get(data->dbs, type);
DBC *c;
DB_TXN *t;
st_ret_t ret;
if(os_count(os) == 0)
return st_SUCCESS;
ret = _st_db_cursor_new(drv, dbd, &c, &t);
if(ret != st_SUCCESS)
return ret;
ret = _st_db_put_guts(drv, type, owner, os, dbd, c, t);
if(ret != st_SUCCESS) {
t->abort(t);
_st_db_cursor_free(drv, dbd, c, NULL);
return st_FAILED;
}
return _st_db_cursor_free(drv, dbd, c, t);
}
static st_ret_t _st_db_get(st_driver_t drv, const char *type, const char *owner, const char *filter, os_t *os) {
drvdata_t data = (drvdata_t) drv->private;
dbdata_t dbd = xhash_get(data->dbs, type);
DBC *c;
DB_TXN *t;
st_ret_t ret;
DBT key, val;
st_filter_t f;
int err;
os_object_t o;
char *cfilter;
ret = _st_db_cursor_new(drv, dbd, &c, &t);
if(ret != st_SUCCESS)
return ret;
f = NULL;
if(filter != NULL) {
f = xhash_get(data->filters, filter);
if(f == NULL) {
f = storage_filter(filter);
cfilter = pstrdup(xhash_pool(data->filters), filter);
xhash_put(data->filters, cfilter, (void *) f);
pool_cleanup(xhash_pool(data->filters), (pool_cleaner) pool_free, f->p);
}
}
memset(&key, 0, sizeof(DBT));
memset(&val, 0, sizeof(DBT));
key.data = (char *) owner;
key.size = strlen(owner);
*os = os_new();
err = c->c_get(c, &key, &val, DB_SET);
while(err == 0) {
o = _st_db_object_deserialise(drv, *os, val.data, val.size);
if(o != NULL && !storage_match(f, o, *os))
os_object_free(o);
err = c->c_get(c, &key, &val, DB_NEXT_DUP);
}
if(err != 0 && err != DB_NOTFOUND) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't move cursor for type %s owner %s in storage db: %s", type, owner, db_strerror(err));
t->abort(t);
_st_db_cursor_free(drv, dbd, c, NULL);
os_free(*os);
return st_FAILED;
}
ret = _st_db_cursor_free(drv, dbd, c, t);
if(ret != st_SUCCESS) {
os_free(*os);
return ret;
}
if(os_count(*os) == 0) {
os_free(*os);
return st_NOTFOUND;
}
return st_SUCCESS;
}
static st_ret_t _st_db_delete_guts(st_driver_t drv, const char *type, const char *owner, const char *filter, dbdata_t dbd, DBC *c, DB_TXN *t) {
drvdata_t data = (drvdata_t) drv->private;
DBT key, val;
st_filter_t f;
int err;
os_t os;
os_object_t o;
char *cfilter;
f = NULL;
if(filter != NULL) {
f = xhash_get(data->filters, filter);
if(f == NULL) {
f = storage_filter(filter);
cfilter = pstrdup(xhash_pool(data->filters), filter);
xhash_put(data->filters, cfilter, (void *) f);
pool_cleanup(xhash_pool(data->filters), (pool_cleaner) pool_free, f->p);
}
}
memset(&key, 0, sizeof(DBT));
memset(&val, 0, sizeof(DBT));
key.data = (char *) owner;
key.size = strlen(owner);
os = os_new();
err = c->c_get(c, &key, &val, DB_SET);
while(err == 0) {
o = _st_db_object_deserialise(drv, os, val.data, val.size);
if(o != NULL && storage_match(f, o, os))
err = c->c_del(c, 0);
if(err == 0)
err = c->c_get(c, &key, &val, DB_NEXT_DUP);
}
os_free(os);
if(err != 0 && err != DB_NOTFOUND) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't move cursor for type %s owner %s in storage db: %s", type, owner, db_strerror(err));
return st_FAILED;
}
return st_SUCCESS;
}
static st_ret_t _st_db_delete(st_driver_t drv, const char *type, const char *owner, const char *filter) {
drvdata_t data = (drvdata_t) drv->private;
dbdata_t dbd = xhash_get(data->dbs, type);
DBC *c;
DB_TXN *t;
st_ret_t ret;
ret = _st_db_cursor_new(drv, dbd, &c, &t);
if(ret != st_SUCCESS)
return ret;
ret = _st_db_delete_guts(drv, type, owner, filter, dbd, c, t);
if(ret != st_SUCCESS) {
t->abort(t);
_st_db_cursor_free(drv, dbd, c, NULL);
return st_FAILED;
}
return _st_db_cursor_free(drv, dbd, c, t);
}
static st_ret_t _st_db_replace(st_driver_t drv, const char *type, const char *owner, const char *filter, os_t os) {
drvdata_t data = (drvdata_t) drv->private;
dbdata_t dbd = xhash_get(data->dbs, type);
DBC *c;
DB_TXN *t;
st_ret_t ret;
ret = _st_db_cursor_new(drv, dbd, &c, &t);
if(ret != st_SUCCESS)
return ret;
ret = _st_db_delete_guts(drv, type, owner, filter, dbd, c, t);
if(ret != st_SUCCESS) {
t->abort(t);
_st_db_cursor_free(drv, dbd, c, NULL);
return st_FAILED;
}
if(os_count(os) == 0)
return _st_db_cursor_free(drv, dbd, c, t);
ret = _st_db_put_guts(drv, type, owner, os, dbd, c, t);
if(ret != st_SUCCESS) {
t->abort(t);
_st_db_cursor_free(drv, dbd, c, NULL);
return st_FAILED;
}
return _st_db_cursor_free(drv, dbd, c, t);
}
static void _st_db_free(st_driver_t drv) {
drvdata_t data = (drvdata_t) drv->private;
const char *key;
dbdata_t dbd;
DB_ENV *env;
union xhashv xhv;
xhv.dbd_val = &dbd;
if(xhash_iter_first(data->dbs))
do {
xhash_iter_get(data->dbs, &key, xhv.val);
log_debug(ZONE, "closing %s db", key);
dbd->db->close(dbd->db, 0);
free(dbd);
} while(xhash_iter_next(data->dbs));
xhash_free(data->dbs);
xhash_free(data->filters);
data->env->close(data->env, 0);
if (db_env_create(&env, 0) == 0)
env->remove(env, data->path, 0);
free(data);
}
static void _st_db_panic(DB_ENV *env, int errval) {
log_t log = (log_t) env->app_private;
log_write(log, LOG_CRIT, "db: corruption detected! close all jabberd processes and run db_recover");
exit(2);
}
st_ret_t st_db_init(st_driver_t drv) {
char *path;
int err;
DB_ENV *env;
drvdata_t data;
path = config_get_one(drv->st->sm->config, "storage.db.path", 0);
if(path == NULL) {
log_write(drv->st->sm->log, LOG_ERR, "db: no path specified in config file");
return st_FAILED;
}
if((err = db_env_create(&env, 0)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't create environment: %s", db_strerror(err));
return st_FAILED;
}
if((err = env->set_paniccall(env, _st_db_panic)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't set panic call: %s", db_strerror(err));
return st_FAILED;
}
env->app_private = drv->st->sm->log;
if((err = env->open(env, path, DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_LOG | DB_INIT_TXN | DB_CREATE, 0)) != 0) {
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't open environment: %s", db_strerror(err));
env->close(env, 0);
return st_FAILED;
}
data = (drvdata_t) malloc(sizeof(struct drvdata_st));
memset(data, 0, sizeof(struct drvdata_st));
data->env = env;
data->path = path;
if(config_get_one(drv->st->sm->config, "storage.db.sync", 0) != NULL)
data->sync = 1;
data->dbs = xhash_new(101);
data->filters = xhash_new(17);
drv->private = (void *) data;
drv->add_type = _st_db_add_type;
drv->put = _st_db_put;
drv->get = _st_db_get;
drv->replace = _st_db_replace;
drv->delete = _st_db_delete;
drv->free = _st_db_free;
return st_SUCCESS;
}
#endif