#include <sys_defs.h>
#include <errno.h>
#include <string.h>
#include <ctype.h>
#include <stdio.h>
#include <msg.h>
#include <mymalloc.h>
#include <dict.h>
#include <vstring.h>
#include <stringops.h>
#include <auto_clnt.h>
#include <vstream.h>
#include <cfg_parser.h>
#include <db_common.h>
#include <memcache_proto.h>
#include <dict_memcache.h>
typedef struct {
DICT dict;
CFG_PARSER *parser;
void *dbc_ctxt;
char *key_format;
int timeout;
int mc_ttl;
int mc_flags;
int err_pause;
int max_tries;
int max_line;
int max_data;
char *memcache;
AUTO_CLNT *clnt;
VSTRING *clnt_buf;
VSTRING *key_buf;
VSTRING *res_buf;
int error;
DICT *backup;
} DICT_MC;
#define DICT_MC_DEF_HOST "localhost"
#define DICT_MC_DEF_PORT "11211"
#define DICT_MC_DEF_MEMCACHE "inet:" DICT_MC_DEF_HOST ":" DICT_MC_DEF_PORT
#define DICT_MC_DEF_KEY_FMT "%s"
#define DICT_MC_DEF_MC_TTL 3600
#define DICT_MC_DEF_MC_TIMEOUT 2
#define DICT_MC_DEF_MC_FLAGS 0
#define DICT_MC_DEF_MAX_TRY 2
#define DICT_MC_DEF_MAX_LINE 1024
#define DICT_MC_DEF_MAX_DATA 10240
#define DICT_MC_DEF_ERR_PAUSE 1
#define DICT_MC_NAME_MEMCACHE "memcache"
#define DICT_MC_NAME_BACKUP "backup"
#define DICT_MC_NAME_KEY_FMT "key_format"
#define DICT_MC_NAME_MC_TTL "ttl"
#define DICT_MC_NAME_MC_TIMEOUT "timeout"
#define DICT_MC_NAME_MC_FLAGS "flags"
#define DICT_MC_NAME_MAX_TRY "max_try"
#define DICT_MC_NAME_MAX_LINE "line_size_limit"
#define DICT_MC_NAME_MAX_DATA "data_size_limit"
#define DICT_MC_NAME_ERR_PAUSE "retry_pause"
#define STR(x) vstring_str(x)
#define LEN(x) VSTRING_LEN(x)
static int dict_memcache_set(DICT_MC *dict_mc, const char *value, int ttl)
{
VSTREAM *fp;
int count;
size_t data_len = strlen(value);
if (data_len > dict_mc->max_data) {
msg_warn("database %s:%s: data for key %s is too long (%s=%d) "
"-- not stored", DICT_TYPE_MEMCACHE, dict_mc->dict.name,
STR(dict_mc->key_buf), DICT_MC_NAME_MAX_DATA,
dict_mc->max_data);
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, DICT_STAT_FAIL);
}
for (count = 0; count < dict_mc->max_tries; count++) {
if (count > 0)
sleep(dict_mc->err_pause);
if ((fp = auto_clnt_access(dict_mc->clnt)) == 0) {
break;
} else if (memcache_printf(fp, "set %s %d %d %ld",
STR(dict_mc->key_buf), dict_mc->mc_flags,
ttl, (long) data_len) < 0
|| memcache_fwrite(fp, value, strlen(value)) < 0
|| memcache_get(fp, dict_mc->clnt_buf,
dict_mc->max_line) < 0) {
if (count > 0)
msg_warn(errno ? "database %s:%s: I/O error: %m" :
"database %s:%s: I/O error",
DICT_TYPE_MEMCACHE, dict_mc->dict.name);
} else if (strcmp(STR(dict_mc->clnt_buf), "STORED") != 0) {
if (count > 0)
msg_warn("database %s:%s: update failed: %.30s",
DICT_TYPE_MEMCACHE, dict_mc->dict.name,
STR(dict_mc->clnt_buf));
} else {
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, DICT_STAT_SUCCESS);
}
auto_clnt_recover(dict_mc->clnt);
}
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_RETRY, DICT_STAT_ERROR);
}
static const char *dict_memcache_get(DICT_MC *dict_mc)
{
VSTREAM *fp;
long todo;
int count;
for (count = 0; count < dict_mc->max_tries; count++) {
if (count > 0)
sleep(dict_mc->err_pause);
if ((fp = auto_clnt_access(dict_mc->clnt)) == 0) {
break;
} else if (memcache_printf(fp, "get %s", STR(dict_mc->key_buf)) < 0
|| memcache_get(fp, dict_mc->clnt_buf, dict_mc->max_line) < 0) {
if (count > 0)
msg_warn(errno ? "database %s:%s: I/O error: %m" :
"database %s:%s: I/O error",
DICT_TYPE_MEMCACHE, dict_mc->dict.name);
} else if (strcmp(STR(dict_mc->clnt_buf), "END") == 0) {
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, (char *) 0);
} else if (sscanf(STR(dict_mc->clnt_buf),
"VALUE %*s %*s %ld", &todo) != 1
|| todo < 0 || todo > dict_mc->max_data) {
if (count > 0)
msg_warn("%s: unexpected memcache server reply: %.30s",
dict_mc->dict.name, STR(dict_mc->clnt_buf));
} else if (memcache_fread(fp, dict_mc->res_buf, todo) < 0) {
if (count > 0)
msg_warn("%s: EOF receiving memcache server reply",
dict_mc->dict.name);
} else {
if (memcache_get(fp, dict_mc->clnt_buf, dict_mc->max_line) < 0
|| strcmp(STR(dict_mc->clnt_buf), "END") != 0)
auto_clnt_recover(dict_mc->clnt);
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, STR(dict_mc->res_buf));
}
auto_clnt_recover(dict_mc->clnt);
}
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_RETRY, (char *) 0);
}
static int dict_memcache_del(DICT_MC *dict_mc)
{
VSTREAM *fp;
int count;
for (count = 0; count < dict_mc->max_tries; count++) {
if (count > 0)
sleep(dict_mc->err_pause);
if ((fp = auto_clnt_access(dict_mc->clnt)) == 0) {
break;
} else if (memcache_printf(fp, "delete %s", STR(dict_mc->key_buf)) < 0
|| memcache_get(fp, dict_mc->clnt_buf, dict_mc->max_line) < 0) {
if (count > 0)
msg_warn(errno ? "database %s:%s: I/O error: %m" :
"database %s:%s: I/O error",
DICT_TYPE_MEMCACHE, dict_mc->dict.name);
} else if (strcmp(STR(dict_mc->clnt_buf), "DELETED") == 0) {
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, DICT_STAT_SUCCESS);
} else if (strcmp(STR(dict_mc->clnt_buf), "NOT_FOUND") == 0) {
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, DICT_STAT_FAIL);
} else {
if (count > 0)
msg_warn("database %s:%s: delete failed: %.30s",
DICT_TYPE_MEMCACHE, dict_mc->dict.name,
STR(dict_mc->clnt_buf));
}
auto_clnt_recover(dict_mc->clnt);
}
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_RETRY, DICT_STAT_ERROR);
}
static ssize_t dict_memcache_prepare_key(DICT_MC *dict_mc, const char *name)
{
if (dict_mc->dict.flags & DICT_FLAG_FOLD_FIX) {
if (dict_mc->dict.fold_buf == 0)
dict_mc->dict.fold_buf = vstring_alloc(10);
vstring_strcpy(dict_mc->dict.fold_buf, name);
name = lowercase(STR(dict_mc->dict.fold_buf));
}
#define DICT_MC_NO_KEY (0)
#define DICT_MC_NO_QUOTING ((void (*)(DICT *, const char *, VSTRING *)) 0)
if (dict_mc->key_format != 0
&& strcmp(dict_mc->key_format, DICT_MC_DEF_KEY_FMT) != 0) {
VSTRING_RESET(dict_mc->key_buf);
if (db_common_expand(dict_mc->dbc_ctxt, dict_mc->key_format,
name, DICT_MC_NO_KEY, dict_mc->key_buf,
DICT_MC_NO_QUOTING) == 0)
return (0);
} else {
vstring_strcpy(dict_mc->key_buf, name);
}
return (LEN(dict_mc->key_buf));
}
static int dict_memcache_valid_key(DICT_MC *dict_mc,
const char *name,
const char *operation,
void (*log_func) (const char *,...))
{
unsigned char *cp;
int rc;
#define DICT_MC_SKIP(why) do { \
if (msg_verbose || log_func != msg_info) \
log_func("%s: skipping %s for name \"%s\": %s", \
dict_mc->dict.name, operation, name, (why)); \
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, 0); \
} while (0)
if (*name == 0)
DICT_MC_SKIP("empty lookup key");
if ((rc = db_common_check_domain(dict_mc->dbc_ctxt, name)) == 0)
DICT_MC_SKIP("domain mismatch");
if (rc < 0)
DICT_ERR_VAL_RETURN(dict_mc, rc, 0);
if (dict_memcache_prepare_key(dict_mc, name) == 0)
DICT_MC_SKIP("empty lookup key expansion");
for (cp = (unsigned char *) STR(dict_mc->key_buf); *cp; cp++)
if (isascii(*cp) && isspace(*cp))
DICT_MC_SKIP("name contains space");
DICT_ERR_VAL_RETURN(dict_mc, DICT_ERR_NONE, 1);
}
static int dict_memcache_update(DICT *dict, const char *name,
const char *value)
{
const char *myname = "dict_memcache_update";
DICT_MC *dict_mc = (DICT_MC *) dict;
DICT *backup = dict_mc->backup;
int upd_res;
if (dict_memcache_valid_key(dict_mc, name, "update", msg_warn) == 0)
DICT_ERR_VAL_RETURN(dict, dict_mc->error, DICT_STAT_FAIL);
upd_res = dict_memcache_set(dict_mc, value, dict_mc->mc_ttl);
dict->error = dict_mc->error;
if (backup) {
upd_res = backup->update(backup, name, value);
dict->error = backup->error;
}
if (msg_verbose)
msg_info("%s: %s: update key \"%s\"(%s) => \"%s\" %s",
myname, dict_mc->dict.name, name, STR(dict_mc->key_buf),
value, dict_mc->error ? "(memcache error)" : (backup
&& backup->error) ? "(backup error)" : "(no error)");
return (upd_res);
}
static const char *dict_memcache_lookup(DICT *dict, const char *name)
{
const char *myname = "dict_memcache_lookup";
DICT_MC *dict_mc = (DICT_MC *) dict;
DICT *backup = dict_mc->backup;
const char *retval;
if (dict_memcache_valid_key(dict_mc, name, "lookup", msg_info) == 0)
DICT_ERR_VAL_RETURN(dict, dict_mc->error, (char *) 0);
retval = dict_memcache_get(dict_mc);
dict->error = dict_mc->error;
if (backup) {
backup->error = 0;
if (retval == 0) {
retval = backup->lookup(backup, name);
dict->error = backup->error;
if (retval != 0)
dict_memcache_set(dict_mc, retval, dict_mc->mc_ttl);
}
}
if (msg_verbose)
msg_info("%s: %s: key \"%s\"(%s) => %s",
myname, dict_mc->dict.name, name, STR(dict_mc->key_buf),
retval ? retval : dict_mc->error ? "(memcache error)" :
(backup && backup->error) ? "(backup error)" : "(not found)");
return (retval);
}
static int dict_memcache_delete(DICT *dict, const char *name)
{
const char *myname = "dict_memcache_delete";
DICT_MC *dict_mc = (DICT_MC *) dict;
DICT *backup = dict_mc->backup;
int del_res;
if (dict_memcache_valid_key(dict_mc, name, "delete", msg_info) == 0)
DICT_ERR_VAL_RETURN(dict, dict_mc->error, dict_mc->error ?
DICT_STAT_ERROR : DICT_STAT_FAIL);
del_res = dict_memcache_del(dict_mc);
dict->error = dict_mc->error;
if (backup) {
del_res = backup->delete(backup, name);
dict->error = backup->error;
}
if (msg_verbose)
msg_info("%s: %s: delete key \"%s\"(%s) => %s",
myname, dict_mc->dict.name, name, STR(dict_mc->key_buf),
dict_mc->error ? "(memcache error)" : (backup
&& backup->error) ? "(backup error)" : "(no error)");
return (del_res);
}
static int dict_memcache_sequence(DICT *dict, int function, const char **key,
const char **value)
{
const char *myname = "dict_memcache_sequence";
DICT_MC *dict_mc = (DICT_MC *) dict;
DICT *backup = dict_mc->backup;
int seq_res;
if (backup == 0) {
msg_warn("database %s:%s: first/next support requires backup database",
DICT_TYPE_MEMCACHE, dict_mc->dict.name);
DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_FAIL);
} else {
seq_res = backup->sequence(backup, function, key, value);
if (msg_verbose)
msg_info("%s: %s: key \"%s\" => %s",
myname, dict_mc->dict.name, *key ? *key : "(not found)",
*value ? *value : backup->error ? "(backup error)" :
"(not found)");
DICT_ERR_VAL_RETURN(dict, backup->error, seq_res);
}
}
static void dict_memcache_close(DICT *dict)
{
DICT_MC *dict_mc = (DICT_MC *) dict;
cfg_parser_free(dict_mc->parser);
db_common_free_ctx(dict_mc->dbc_ctxt);
if (dict_mc->key_format)
myfree(dict_mc->key_format);
myfree(dict_mc->memcache);
auto_clnt_free(dict_mc->clnt);
vstring_free(dict_mc->clnt_buf);
vstring_free(dict_mc->key_buf);
vstring_free(dict_mc->res_buf);
if (dict->fold_buf)
vstring_free(dict->fold_buf);
if (dict_mc->backup)
dict_close(dict_mc->backup);
dict_free(dict);
}
DICT *dict_memcache_open(const char *name, int open_flags, int dict_flags)
{
DICT_MC *dict_mc;
char *backup;
CFG_PARSER *parser;
if (dict_flags & DICT_FLAG_NO_UNAUTH)
return (dict_surrogate(DICT_TYPE_MEMCACHE, name, open_flags, dict_flags,
"%s:%s map is not allowed for security-sensitive data",
DICT_TYPE_MEMCACHE, name));
open_flags &= (O_RDONLY | O_RDWR | O_WRONLY | O_APPEND);
if (open_flags != O_RDONLY && open_flags != O_RDWR)
return (dict_surrogate(DICT_TYPE_MEMCACHE, name, open_flags, dict_flags,
"%s:%s map requires O_RDONLY or O_RDWR access mode",
DICT_TYPE_MEMCACHE, name));
if ((parser = cfg_parser_alloc(name)) == 0)
return (dict_surrogate(DICT_TYPE_MEMCACHE, name, open_flags, dict_flags,
"open %s: %m", name));
dict_mc = (DICT_MC *) dict_alloc(DICT_TYPE_MEMCACHE, name,
sizeof(*dict_mc));
dict_mc->dict.lookup = dict_memcache_lookup;
if (open_flags == O_RDWR) {
dict_mc->dict.update = dict_memcache_update;
dict_mc->dict.delete = dict_memcache_delete;
}
dict_mc->dict.sequence = dict_memcache_sequence;
dict_mc->dict.close = dict_memcache_close;
dict_mc->dict.flags = dict_flags;
dict_mc->key_buf = vstring_alloc(10);
dict_mc->res_buf = vstring_alloc(10);
dict_mc->parser = parser;
dict_mc->key_format = cfg_get_str(dict_mc->parser, DICT_MC_NAME_KEY_FMT,
DICT_MC_DEF_KEY_FMT, 0, 0);
dict_mc->timeout = cfg_get_int(dict_mc->parser, DICT_MC_NAME_MC_TIMEOUT,
DICT_MC_DEF_MC_TIMEOUT, 0, 0);
dict_mc->mc_ttl = cfg_get_int(dict_mc->parser, DICT_MC_NAME_MC_TTL,
DICT_MC_DEF_MC_TTL, 0, 0);
dict_mc->mc_flags = cfg_get_int(dict_mc->parser, DICT_MC_NAME_MC_FLAGS,
DICT_MC_DEF_MC_FLAGS, 0, 0);
dict_mc->err_pause = cfg_get_int(dict_mc->parser, DICT_MC_NAME_ERR_PAUSE,
DICT_MC_DEF_ERR_PAUSE, 1, 0);
dict_mc->max_tries = cfg_get_int(dict_mc->parser, DICT_MC_NAME_MAX_TRY,
DICT_MC_DEF_MAX_TRY, 1, 0);
dict_mc->max_line = cfg_get_int(dict_mc->parser, DICT_MC_NAME_MAX_LINE,
DICT_MC_DEF_MAX_LINE, 1, 0);
dict_mc->max_data = cfg_get_int(dict_mc->parser, DICT_MC_NAME_MAX_DATA,
DICT_MC_DEF_MAX_DATA, 1, 0);
dict_mc->memcache = cfg_get_str(dict_mc->parser, DICT_MC_NAME_MEMCACHE,
DICT_MC_DEF_MEMCACHE, 0, 0);
dict_mc->clnt = auto_clnt_create(dict_mc->memcache, dict_mc->timeout, 0, 0);
dict_mc->clnt_buf = vstring_alloc(100);
backup = cfg_get_str(dict_mc->parser, DICT_MC_NAME_BACKUP,
(char *) 0, 0, 0);
if (backup) {
dict_mc->backup = dict_open(backup, open_flags, dict_flags);
myfree(backup);
} else
dict_mc->backup = 0;
dict_mc->dbc_ctxt = 0;
db_common_parse(&dict_mc->dict, &dict_mc->dbc_ctxt,
dict_mc->key_format, 1);
db_common_parse_domain(dict_mc->parser, dict_mc->dbc_ctxt);
if (db_common_dict_partial(dict_mc->dbc_ctxt))
dict_mc->dict.flags |= DICT_FLAG_PATTERN;
else
dict_mc->dict.flags |= DICT_FLAG_FIXED;
dict_mc->dict.flags |= DICT_FLAG_MULTI_WRITER;
return (&dict_mc->dict);
}