#include <sys_defs.h>
#include <errno.h>
#include <unistd.h>
#include <msg.h>
#include <mymalloc.h>
#include <stringops.h>
#include <vstring.h>
#include <vstream.h>
#include <attr.h>
#include <dict.h>
#include <mail_proto.h>
#include <mail_params.h>
#include <clnt_stream.h>
#include <dict_proxy.h>
/*
 * Per-table handle for a dictionary that is accessed through a proxy
 * service. The request functions in this file receive a DICT pointer and
 * cast it to DICT_PROXY, which is why the generic DICT part must remain
 * the first member.
 */
typedef struct {
DICT dict;			/* generic part, must be first */
CLNT_STREAM *clnt;		/* client stream used for every request */
const char *service;		/* service name, used in diagnostics */
int inst_flags;			/* dict_flags & DICT_FLAG_INST_MASK, sent with each request */
VSTRING *reskey;		/* receives the key of a sequence reply */
VSTRING *result;		/* receives the value of a lookup or sequence reply */
} DICT_PROXY;
/* Shorthand for the string content of a VSTRING buffer. */
#define STR(x) vstring_str(x)
/* True when the content of VSTRING v compares equal to the C string s. */
#define VSTREQ(v,s) (strcmp(STR(v),s) == 0)
/*
 * Client streams, one per service. Each is created on first use by
 * dict_proxy_open() and then shared by every DICT_PROXY handle that talks
 * to the same service: proxymap_stream for O_RDONLY opens,
 * proxywrite_stream for O_RDWR opens.
 */
static CLNT_STREAM *proxymap_stream;
static CLNT_STREAM *proxywrite_stream;
/*
 * dict_proxy_sequence - step through a table via the proxy service
 *
 * Sends a PROXY_REQ_SEQUENCE request with the table name, the request flags
 * and the caller's "function" argument (passed through unchanged), then
 * reads back a status, a key and a value.
 *
 * PROXY_STAT_OK: *key and *value point into buffers owned by the handle;
 * they are overwritten by the next request that uses those buffers. The
 * result is DICT_STAT_SUCCESS. PROXY_STAT_NOKEY, PROXY_STAT_RETRY and
 * PROXY_STAT_CONFIG clear both pointers and return DICT_STAT_FAIL or
 * DICT_STAT_ERROR with the matching dict error code. PROXY_STAT_BAD and
 * PROXY_STAT_DENY are reported with msg_fatal(); the code has no statement
 * after those calls and so relies on msg_fatal() not returning.
 *
 * A failed exchange or an unrecognized status causes the client stream to
 * be recovered and the request to be repeated after one second, without
 * any limit on the number of attempts.
 */
static int dict_proxy_sequence(DICT *dict, int function,
                               const char **key, const char **value)
{
    const char *myname = "dict_proxy_sequence";
    DICT_PROXY *proxy = (DICT_PROXY *) dict;
    VSTREAM *fp;
    int     reply;
    int     attempts = 0;
    int     rqst_flags;

    /* Discard whatever a previous request left in the reply buffers. */
    VSTRING_RESET(proxy->reskey);
    VSTRING_TERMINATE(proxy->reskey);
    VSTRING_RESET(proxy->result);
    VSTRING_TERMINATE(proxy->result);

    /* Per-instance flags plus the per-request subset of the dict flags. */
    rqst_flags = (dict->flags & DICT_FLAG_RQST_MASK) | proxy->inst_flags;

    for (;;) {
        fp = clnt_stream_access(proxy->clnt);
        errno = 0;
        attempts++;

        /* Send the request, flush, and expect exactly three attributes. */
        if (attr_print(fp, ATTR_FLAG_NONE,
                       SEND_ATTR_STR(MAIL_ATTR_REQ, PROXY_REQ_SEQUENCE),
                       SEND_ATTR_STR(MAIL_ATTR_TABLE, dict->name),
                       SEND_ATTR_INT(MAIL_ATTR_FLAGS, rqst_flags),
                       SEND_ATTR_INT(MAIL_ATTR_FUNC, function),
                       ATTR_TYPE_END) == 0
            && vstream_fflush(fp) == 0
            && attr_scan(fp, ATTR_FLAG_STRICT,
                         RECV_ATTR_INT(MAIL_ATTR_STATUS, &reply),
                         RECV_ATTR_STR(MAIL_ATTR_KEY, proxy->reskey),
                         RECV_ATTR_STR(MAIL_ATTR_VALUE, proxy->result),
                         ATTR_TYPE_END) == 3) {
            if (msg_verbose)
                msg_info("%s: table=%s flags=%s func=%d -> status=%d key=%s val=%s",
                         myname, dict->name, dict_flags_str(rqst_flags),
                         function, reply, STR(proxy->reskey),
                         STR(proxy->result));
            switch (reply) {
            case PROXY_STAT_BAD:
                msg_fatal("%s sequence failed for table \"%s\" function %d: "
                          "invalid request",
                          proxy->service, dict->name, function);
            case PROXY_STAT_DENY:
                msg_fatal("%s service is not configured for table \"%s\"",
                          proxy->service, dict->name);
            case PROXY_STAT_OK:
                *key = STR(proxy->reskey);
                *value = STR(proxy->result);
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_SUCCESS);
            case PROXY_STAT_NOKEY:
                *key = *value = 0;
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_FAIL);
            case PROXY_STAT_RETRY:
                *key = *value = 0;
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_RETRY, DICT_STAT_ERROR);
            case PROXY_STAT_CONFIG:
                *key = *value = 0;
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_CONFIG, DICT_STAT_ERROR);
            default:
                /* Unknown status: log it, then fall out of the switch and retry. */
                msg_warn("%s sequence failed for table \"%s\" function %d: "
                         "unexpected reply status %d",
                         proxy->service, dict->name, function, reply);
            }
        } else if (msg_verbose || attempts > 1
                   || (errno != 0 && errno != EPIPE && errno != ENOENT)) {
            /* The first EPIPE/ENOENT/no-errno failure is retried silently. */
            msg_warn("%s: service %s: %m", myname, VSTREAM_PATH(fp));
        }

        /* Reset the connection and pause before the next attempt. */
        clnt_stream_recover(proxy->clnt);
        sleep(1);
    }
}
/*
 * dict_proxy_lookup - look up one key via the proxy service
 *
 * Sends a PROXY_REQ_LOOKUP request with the table name, the request flags
 * and the key, then reads back a status and a value.
 *
 * PROXY_STAT_OK returns the value, which lives in a buffer owned by the
 * handle and is overwritten by the next request that uses that buffer.
 * PROXY_STAT_NOKEY, PROXY_STAT_RETRY and PROXY_STAT_CONFIG return a null
 * pointer with dict error DICT_ERR_NONE, DICT_ERR_RETRY and DICT_ERR_CONFIG
 * respectively. PROXY_STAT_BAD and PROXY_STAT_DENY are reported with
 * msg_fatal(); the code relies on that call not returning.
 *
 * A failed exchange or an unrecognized status causes the client stream to
 * be recovered and the request to be repeated after one second, without
 * any limit on the number of attempts.
 */
static const char *dict_proxy_lookup(DICT *dict, const char *key)
{
    const char *myname = "dict_proxy_lookup";
    DICT_PROXY *proxy = (DICT_PROXY *) dict;
    VSTREAM *fp;
    int     reply;
    int     attempts = 0;
    int     rqst_flags;

    /* Discard the value left behind by a previous request. */
    VSTRING_RESET(proxy->result);
    VSTRING_TERMINATE(proxy->result);

    /* Per-instance flags plus the per-request subset of the dict flags. */
    rqst_flags = (dict->flags & DICT_FLAG_RQST_MASK) | proxy->inst_flags;

    for (;;) {
        fp = clnt_stream_access(proxy->clnt);
        errno = 0;
        attempts++;

        /* Send the request, flush, and expect exactly two attributes. */
        if (attr_print(fp, ATTR_FLAG_NONE,
                       SEND_ATTR_STR(MAIL_ATTR_REQ, PROXY_REQ_LOOKUP),
                       SEND_ATTR_STR(MAIL_ATTR_TABLE, dict->name),
                       SEND_ATTR_INT(MAIL_ATTR_FLAGS, rqst_flags),
                       SEND_ATTR_STR(MAIL_ATTR_KEY, key),
                       ATTR_TYPE_END) == 0
            && vstream_fflush(fp) == 0
            && attr_scan(fp, ATTR_FLAG_STRICT,
                         RECV_ATTR_INT(MAIL_ATTR_STATUS, &reply),
                         RECV_ATTR_STR(MAIL_ATTR_VALUE, proxy->result),
                         ATTR_TYPE_END) == 2) {
            if (msg_verbose)
                msg_info("%s: table=%s flags=%s key=%s -> status=%d result=%s",
                         myname, dict->name,
                         dict_flags_str(rqst_flags), key,
                         reply, STR(proxy->result));
            switch (reply) {
            case PROXY_STAT_BAD:
                msg_fatal("%s lookup failed for table \"%s\" key \"%s\": "
                          "invalid request",
                          proxy->service, dict->name, key);
            case PROXY_STAT_DENY:
                msg_fatal("%s service is not configured for table \"%s\"",
                          proxy->service, dict->name);
            case PROXY_STAT_OK:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, STR(proxy->result));
            case PROXY_STAT_NOKEY:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, (char *) 0);
            case PROXY_STAT_RETRY:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_RETRY, (char *) 0);
            case PROXY_STAT_CONFIG:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_CONFIG, (char *) 0);
            default:
                /* Unknown status: log it, then fall out of the switch and retry. */
                msg_warn("%s lookup failed for table \"%s\" key \"%s\": "
                         "unexpected reply status %d",
                         proxy->service, dict->name, key, reply);
            }
        } else if (msg_verbose || attempts > 1
                   || (errno != 0 && errno != EPIPE && errno != ENOENT)) {
            /* The first EPIPE/ENOENT/no-errno failure is retried silently. */
            msg_warn("%s: service %s: %m", myname, VSTREAM_PATH(fp));
        }

        /* Reset the connection and pause before the next attempt. */
        clnt_stream_recover(proxy->clnt);
        sleep(1);
    }
}
/*
 * dict_proxy_update - store one key/value pair via the proxy service
 *
 * Sends a PROXY_REQ_UPDATE request with the table name, the request flags,
 * the key and the value, then reads back a single status attribute.
 *
 * PROXY_STAT_OK returns DICT_STAT_SUCCESS and PROXY_STAT_NOKEY returns
 * DICT_STAT_FAIL, both with DICT_ERR_NONE. PROXY_STAT_RETRY and
 * PROXY_STAT_CONFIG return DICT_STAT_ERROR with DICT_ERR_RETRY and
 * DICT_ERR_CONFIG respectively. PROXY_STAT_BAD and PROXY_STAT_DENY are
 * reported with msg_fatal(); the code relies on that call not returning.
 *
 * A failed exchange or an unrecognized status causes the client stream to
 * be recovered and the request to be repeated after one second, without
 * any limit on the number of attempts.
 */
static int dict_proxy_update(DICT *dict, const char *key, const char *value)
{
    const char *myname = "dict_proxy_update";
    DICT_PROXY *proxy = (DICT_PROXY *) dict;
    VSTREAM *fp;
    int     reply;
    int     attempts = 0;
    int     rqst_flags;

    /* Per-instance flags plus the per-request subset of the dict flags. */
    rqst_flags = (dict->flags & DICT_FLAG_RQST_MASK) | proxy->inst_flags;

    for (;;) {
        fp = clnt_stream_access(proxy->clnt);
        errno = 0;
        attempts++;

        /* Send the request, flush, and expect exactly one attribute. */
        if (attr_print(fp, ATTR_FLAG_NONE,
                       SEND_ATTR_STR(MAIL_ATTR_REQ, PROXY_REQ_UPDATE),
                       SEND_ATTR_STR(MAIL_ATTR_TABLE, dict->name),
                       SEND_ATTR_INT(MAIL_ATTR_FLAGS, rqst_flags),
                       SEND_ATTR_STR(MAIL_ATTR_KEY, key),
                       SEND_ATTR_STR(MAIL_ATTR_VALUE, value),
                       ATTR_TYPE_END) == 0
            && vstream_fflush(fp) == 0
            && attr_scan(fp, ATTR_FLAG_STRICT,
                         RECV_ATTR_INT(MAIL_ATTR_STATUS, &reply),
                         ATTR_TYPE_END) == 1) {
            if (msg_verbose)
                msg_info("%s: table=%s flags=%s key=%s value=%s -> status=%d",
                         myname, dict->name, dict_flags_str(rqst_flags),
                         key, value, reply);
            switch (reply) {
            case PROXY_STAT_BAD:
                msg_fatal("%s update failed for table \"%s\" key \"%s\": "
                          "invalid request",
                          proxy->service, dict->name, key);
            case PROXY_STAT_DENY:
                msg_fatal("%s update access is not configured for table \"%s\"",
                          proxy->service, dict->name);
            case PROXY_STAT_OK:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_SUCCESS);
            case PROXY_STAT_NOKEY:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_FAIL);
            case PROXY_STAT_RETRY:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_RETRY, DICT_STAT_ERROR);
            case PROXY_STAT_CONFIG:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_CONFIG, DICT_STAT_ERROR);
            default:
                /* Unknown status: log it, then fall out of the switch and retry. */
                msg_warn("%s update failed for table \"%s\" key \"%s\": "
                         "unexpected reply status %d",
                         proxy->service, dict->name, key, reply);
            }
        } else if (msg_verbose || attempts > 1
                   || (errno != 0 && errno != EPIPE && errno != ENOENT)) {
            /* The first EPIPE/ENOENT/no-errno failure is retried silently. */
            msg_warn("%s: service %s: %m", myname, VSTREAM_PATH(fp));
        }

        /* Reset the connection and pause before the next attempt. */
        clnt_stream_recover(proxy->clnt);
        sleep(1);
    }
}
/*
 * dict_proxy_delete - remove one key via the proxy service
 *
 * Sends a PROXY_REQ_DELETE request with the table name, the request flags
 * and the key, then reads back a single status attribute.
 *
 * PROXY_STAT_OK returns DICT_STAT_SUCCESS and PROXY_STAT_NOKEY returns
 * DICT_STAT_FAIL, both with DICT_ERR_NONE. PROXY_STAT_RETRY and
 * PROXY_STAT_CONFIG return DICT_STAT_ERROR with DICT_ERR_RETRY and
 * DICT_ERR_CONFIG respectively. PROXY_STAT_BAD and PROXY_STAT_DENY are
 * reported with msg_fatal(); the code relies on that call not returning.
 *
 * A failed exchange or an unrecognized status causes the client stream to
 * be recovered and the request to be repeated after one second, without
 * any limit on the number of attempts.
 */
static int dict_proxy_delete(DICT *dict, const char *key)
{
    const char *myname = "dict_proxy_delete";
    DICT_PROXY *proxy = (DICT_PROXY *) dict;
    VSTREAM *fp;
    int     reply;
    int     attempts = 0;
    int     rqst_flags;

    /* Per-instance flags plus the per-request subset of the dict flags. */
    rqst_flags = (dict->flags & DICT_FLAG_RQST_MASK) | proxy->inst_flags;

    for (;;) {
        fp = clnt_stream_access(proxy->clnt);
        errno = 0;
        attempts++;

        /* Send the request, flush, and expect exactly one attribute. */
        if (attr_print(fp, ATTR_FLAG_NONE,
                       SEND_ATTR_STR(MAIL_ATTR_REQ, PROXY_REQ_DELETE),
                       SEND_ATTR_STR(MAIL_ATTR_TABLE, dict->name),
                       SEND_ATTR_INT(MAIL_ATTR_FLAGS, rqst_flags),
                       SEND_ATTR_STR(MAIL_ATTR_KEY, key),
                       ATTR_TYPE_END) == 0
            && vstream_fflush(fp) == 0
            && attr_scan(fp, ATTR_FLAG_STRICT,
                         RECV_ATTR_INT(MAIL_ATTR_STATUS, &reply),
                         ATTR_TYPE_END) == 1) {
            if (msg_verbose)
                msg_info("%s: table=%s flags=%s key=%s -> status=%d",
                         myname, dict->name, dict_flags_str(rqst_flags),
                         key, reply);
            switch (reply) {
            case PROXY_STAT_BAD:
                msg_fatal("%s delete failed for table \"%s\" key \"%s\": "
                          "invalid request",
                          proxy->service, dict->name, key);
            case PROXY_STAT_DENY:
                msg_fatal("%s update access is not configured for table \"%s\"",
                          proxy->service, dict->name);
            case PROXY_STAT_OK:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_SUCCESS);
            case PROXY_STAT_NOKEY:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_NONE, DICT_STAT_FAIL);
            case PROXY_STAT_RETRY:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_RETRY, DICT_STAT_ERROR);
            case PROXY_STAT_CONFIG:
                DICT_ERR_VAL_RETURN(dict, DICT_ERR_CONFIG, DICT_STAT_ERROR);
            default:
                /* Unknown status: log it, then fall out of the switch and retry. */
                msg_warn("%s delete failed for table \"%s\" key \"%s\": "
                         "unexpected reply status %d",
                         proxy->service, dict->name, key, reply);
            }
        } else if (msg_verbose || attempts > 1
                   || (errno != 0 && errno != EPIPE && errno != ENOENT)) {
            /* The first EPIPE/ENOENT/no-errno failure is retried silently. */
            msg_warn("%s: service %s: %m", myname, VSTREAM_PATH(fp));
        }

        /* Reset the connection and pause before the next attempt. */
        clnt_stream_recover(proxy->clnt);
        sleep(1);
    }
}
/*
 * dict_proxy_close - dispose of a proxy dictionary handle
 *
 * Frees the two reply buffers and then the handle itself. The client
 * stream referenced by the handle is not released here.
 */
static void dict_proxy_close(DICT *dict)
{
    DICT_PROXY *proxy = (DICT_PROXY *) dict;

    /* Per-handle reply buffers first, the handle last. */
    vstring_free(proxy->reskey);
    vstring_free(proxy->result);
    dict_free(dict);
}
/*
 * dict_proxy_open - open a table through the proxymap or proxywrite service
 *
 * map is the name of the table, sent to the service as-is. open_flags
 * selects the service: exactly O_RDONLY uses var_proxymap_service, any
 * value that includes O_RDWR uses var_proxywrite_service, and anything
 * else is reported with msg_fatal(). dict_flags supplies the instance
 * flags sent to the server and the non-implementation flags of the
 * resulting handle.
 *
 * When dict_flags contains DICT_FLAG_NO_PROXY the request is handed
 * straight to dict_open() and no proxy handle is created.
 *
 * The client stream for a service is created on the first open and reused
 * by later opens. The open request is repeated, with a one-second pause
 * and a stream recovery in between, until the server answers with a
 * recognized status; there is no limit on the number of attempts.
 * PROXY_STAT_BAD and PROXY_STAT_DENY are reported with msg_fatal(); the
 * code relies on that call not returning.
 *
 * Returns the new handle, passed through DICT_DEBUG().
 */
DICT *dict_proxy_open(const char *map, int open_flags, int dict_flags)
{
    const char *myname = "dict_proxy_open";
    DICT_PROXY *dict_proxy;
    VSTREAM *stream;
    int     server_flags;
    int     status;
    const char *service;
    char   *relative_path;
    char   *kludge = 0;
    char   *prefix;
    CLNT_STREAM **pstream;

    /* The caller asked for direct access: bypass the proxy entirely. */
    if (dict_flags & DICT_FLAG_NO_PROXY)
        return (dict_open(map, open_flags, dict_flags));

    /* Pick the service, and the shared stream slot that goes with it. */
    if (open_flags == O_RDONLY) {
        pstream = &proxymap_stream;
        service = var_proxymap_service;
    } else if ((open_flags & O_RDWR) == O_RDWR) {
        pstream = &proxywrite_stream;
        service = var_proxywrite_service;
    } else
        msg_fatal("%s: %s map open requires O_RDONLY or O_RDWR mode",
                  map, DICT_TYPE_PROXY);

    /*
     * First open for this service: create the client stream. Use the
     * relative MAIL_CLASS_PRIVATE directory when the service endpoint is
     * reachable under that relative path, otherwise prefix it with the
     * queue directory.
     */
    if (*pstream == 0) {
        relative_path = concatenate(MAIL_CLASS_PRIVATE "/",
                                    service, (char *) 0);
        if (access(relative_path, F_OK) == 0)
            prefix = MAIL_CLASS_PRIVATE;
        else
            prefix = kludge = concatenate(var_queue_dir, "/",
                                          MAIL_CLASS_PRIVATE, (char *) 0);
        *pstream = clnt_stream_create(prefix, service, var_ipc_idle_limit,
                                      var_ipc_ttl_limit);
        if (kludge)
            myfree(kludge);
        myfree(relative_path);
    }

    /* Allocate the handle and install the proxy request methods. */
    dict_proxy = (DICT_PROXY *)
        dict_alloc(DICT_TYPE_PROXY, map, sizeof(*dict_proxy));
    dict_proxy->dict.lookup = dict_proxy_lookup;
    dict_proxy->dict.update = dict_proxy_update;
    dict_proxy->dict.delete = dict_proxy_delete;
    dict_proxy->dict.sequence = dict_proxy_sequence;
    dict_proxy->dict.close = dict_proxy_close;
    dict_proxy->inst_flags = (dict_flags & DICT_FLAG_INST_MASK);
    dict_proxy->reskey = vstring_alloc(10);
    dict_proxy->result = vstring_alloc(10);
    dict_proxy->clnt = *pstream;
    dict_proxy->service = service;

    /* Ask the server to open the table and report its flags. */
    for (;;) {
        stream = clnt_stream_access(dict_proxy->clnt);
        errno = 0;
        if (attr_print(stream, ATTR_FLAG_NONE,
                       SEND_ATTR_STR(MAIL_ATTR_REQ, PROXY_REQ_OPEN),
                       SEND_ATTR_STR(MAIL_ATTR_TABLE, dict_proxy->dict.name),
                       SEND_ATTR_INT(MAIL_ATTR_FLAGS, dict_proxy->inst_flags),
                       ATTR_TYPE_END) != 0
            || vstream_fflush(stream)
            || attr_scan(stream, ATTR_FLAG_STRICT,
                         RECV_ATTR_INT(MAIL_ATTR_STATUS, &status),
                         RECV_ATTR_INT(MAIL_ATTR_FLAGS, &server_flags),
                         ATTR_TYPE_END) != 2) {

            /*
             * Arguments follow the format: caller name first, then the
             * service path. Every failure other than EPIPE or ENOENT is
             * logged, including one that leaves errno at zero.
             */
            if (msg_verbose || (errno != EPIPE && errno != ENOENT))
                msg_warn("%s: service %s: %m", myname, VSTREAM_PATH(stream));
        } else {
            if (msg_verbose)
                msg_info("%s: connect to map=%s status=%d server_flags=%s",
                         myname, dict_proxy->dict.name, status,
                         dict_flags_str(server_flags));
            switch (status) {
            case PROXY_STAT_BAD:
                msg_fatal("%s open failed for table \"%s\": invalid request",
                          dict_proxy->service, dict_proxy->dict.name);
            case PROXY_STAT_DENY:
                msg_fatal("%s service is not configured for table \"%s\"",
                          dict_proxy->service, dict_proxy->dict.name);
            case PROXY_STAT_OK:
                /* Caller's flags, with the implementation bits taken from the server. */
                dict_proxy->dict.flags = (dict_flags & ~DICT_FLAG_IMPL_MASK)
                    | (server_flags & DICT_FLAG_IMPL_MASK);
                return (DICT_DEBUG (&dict_proxy->dict));
            default:
                /* Unknown status: log it, then fall out of the switch and retry. */
                msg_warn("%s open failed for table \"%s\": unexpected status %d",
                         dict_proxy->service, dict_proxy->dict.name, status);
            }
        }

        /* Reset the connection and pause before the next attempt. */
        clnt_stream_recover(dict_proxy->clnt);
        sleep(1);
    }
}