#include <sys_defs.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <ctype.h>
#include <stdarg.h>
#ifdef STRCASECMP_IN_STRINGS_H
#include <strings.h>
#endif
#include <msg.h>
#include <mymalloc.h>
#include <vstring.h>
#include <vstream.h>
#include <netstring.h>
#include <dict.h>
#include <mail_params.h>
#include <record.h>
#include <rec_type.h>
#include <mail_proto.h>
#include <cleanup_user.h>
#include <mail_date.h>
#include <mail_conf.h>
#include <debug_peer.h>
#include <mail_stream.h>
#include <namadr_list.h>
#include <quote_822_local.h>
#include <match_parent_style.h>
#include <lex_822.h>
#include <verp_sender.h>
#include <mail_server.h>
#include <qmqpd.h>
/*
 * Tunable parameters. main() lists the address of each of these in the
 * configuration tables that it passes to single_server_main().
 */

/* Handed to netstring_setup() for the client stream in qmqpd_proto(). */
int var_qmqpd_timeout;
/* Argument to sleep() in qmqpd_reply() whenever the reply status is not OK. */
int var_qmqpd_err_sleep;
/* When non-empty, written as one extra recipient record after the client's. */
char *var_always_bcc;
/* When non-empty, written as a REC_TYPE_FILT record when the queue file is opened. */
char *var_filter_xport;
/* Pattern text from which pre_jail_init() builds the qmqpd_clients list. */
char *var_qmqpd_clients;

/* Shorthands for the string pointer and the length of a VSTRING. */
#define STR(x) vstring_str(x)
#define LEN(x) VSTRING_LEN(x)

/* Values for the log_message argument of qmqpd_reply(). */
#define DO_LOG 1
#define DONT_LOG 0

/*
 * Client access list: created in pre_jail_init(), consulted with
 * namadr_list_match() in qmqpd_proto() before any message is received.
 */
static NAMADR_LIST *qmqpd_clients;
/* qmqpd_open_file - connect to the cleanup service and begin a queue file */

static void qmqpd_open_file(QMQPD_STATE *state)
{
    int     handshake_ok = 0;

    /*
     * Contact the cleanup service and send it the processing flags. A
     * failure of either step is fatal.
     */
    state->dest = mail_stream_service(MAIL_CLASS_PUBLIC, var_cleanup_service);
    if (state->dest != 0)
        handshake_ok = (attr_print(state->dest->stream, ATTR_FLAG_NONE,
                                   ATTR_TYPE_NUM, MAIL_ATTR_FLAGS,
                                   CLEANUP_FLAG_FILTER,
                                   ATTR_TYPE_END) == 0);
    if (!handshake_ok)
        msg_fatal("unable to connect to the %s %s service",
                  MAIL_CLASS_PUBLIC, var_cleanup_service);

    /*
     * Remember the output stream and a private copy of the queue ID, then
     * log the client together with that queue ID.
     */
    state->cleanup = state->dest->stream;
    state->queue_id = mystrdup(state->dest->id);
    msg_info("%s: client=%s", state->queue_id, state->namaddr);

    /*
     * First records: the time stamp, and the content filter transport when
     * one is configured.
     */
    rec_fprintf(state->cleanup, REC_TYPE_TIME, "%ld", (long) state->time);
    if (var_filter_xport[0] != 0)
        rec_fprintf(state->cleanup, REC_TYPE_FILT, "%s", var_filter_xport);
}
/* qmqpd_read_content - read the message content netstring into memory */

static void qmqpd_read_content(QMQPD_STATE *state)
{
    VSTREAM *peer = state->client;
    VSTRING *content = state->message;

    /*
     * Update the progress indicator before doing I/O, so that an error
     * report can say what was going on. The size is capped by
     * var_message_limit.
     */
    state->where = "receiving message content";
    netstring_get(peer, content, var_message_limit);
}
/* qmqpd_copy_sender - read the sender netstring, write FROM (and VERP) records */

static void qmqpd_copy_sender(QMQPD_STATE *state)
{
    static char verp_delims[] = "-=";
    char   *delim;
    char   *suffix;
    int     verp_requested = 0;

    /*
     * Read the sender and null-terminate it so that the string functions
     * below can be used.
     */
    state->where = "receiving sender address";
    netstring_get(state->client, state->buf, var_line_limit);
    VSTRING_TERMINATE(state->buf);

    /*
     * VERP is requested when the sender ends in "-@[]" with something in
     * front of it, and the character before the first '@' is neither the
     * first character of the address nor within two positions of that
     * suffix.
     */
    suffix = vstring_end(state->buf) - 4;
    if (suffix > STR(state->buf) && strcmp(suffix, "-@[]") == 0) {
        delim = strchr(STR(state->buf), '@');
        if (delim != 0) {
            delim -= 1;
            verp_requested = (delim < suffix - 2
                              && delim > STR(state->buf));
        }
    }

    /*
     * The character before the '@' becomes the first VERP delimiter. Take it
     * out of the address by shifting the text up to the suffix one position
     * to the left, then cut off the suffix.
     */
    if (verp_requested) {
        verp_delims[0] = delim[0];
        if (verp_delims_verify(verp_delims) != 0) {
            state->err |= CLEANUP_STAT_CONT;
            vstring_sprintf(state->why_rejected, "Invalid VERP delimiters: \"%s\". Need two characters from \"%s\"",
                            verp_delims, var_verp_filter);
        }
        memmove(delim, delim + 1, suffix - delim - 1);
        vstring_truncate(state->buf, suffix - STR(state->buf) - 1);
    }

    /*
     * Write the sender record, followed by the VERP record when applicable.
     * Nothing is written once an error has been flagged.
     */
    if (state->err == CLEANUP_STAT_OK
        && REC_PUT_BUF(state->cleanup, REC_TYPE_FROM, state->buf) < 0)
        state->err = CLEANUP_STAT_WRITE;
    if (verp_requested
        && state->err == CLEANUP_STAT_OK
        && rec_put(state->cleanup, REC_TYPE_VERP, verp_delims, 2) < 0)
        state->err = CLEANUP_STAT_WRITE;

    /*
     * Keep a private copy of the (possibly rewritten) sender.
     */
    state->sender = mystrndup(STR(state->buf), LEN(state->buf));
}
/* qmqpd_put_attr - write one name=value attribute record */

static void qmqpd_put_attr(QMQPD_STATE *state, const char *name,
                           const char *value)
{
    rec_fprintf(state->cleanup, REC_TYPE_ATTR, "%s=%s", name, value);
}

/* qmqpd_write_attributes - record client name, address, origin and protocol */

static void qmqpd_write_attributes(QMQPD_STATE *state)
{
    qmqpd_put_attr(state, MAIL_ATTR_CLIENT_NAME, state->name);
    qmqpd_put_attr(state, MAIL_ATTR_CLIENT_ADDR, state->addr);
    qmqpd_put_attr(state, MAIL_ATTR_ORIGIN, state->namaddr);
    qmqpd_put_attr(state, MAIL_ATTR_PROTO_NAME, state->protocol);
}
/* qmqpd_copy_recipients - read recipient netstrings, write RCPT records */

static void qmqpd_copy_recipients(QMQPD_STATE *state)
{
    int     ch;

    state->where = "receiving recipient address";

    /*
     * Keep reading recipients until the next input character is a comma.
     * Any other character is pushed back so that netstring_get() sees the
     * complete netstring.
     */
    for (;;) {
        ch = VSTREAM_GETC(state->client);
        if (ch == ',')
            break;
        vstream_ungetc(state->client, ch);
        netstring_get(state->client, state->buf, var_line_limit);

        /*
         * Recipients are still read and counted after an error, but they are
         * no longer written.
         */
        if (state->err == CLEANUP_STAT_OK
            && REC_PUT_BUF(state->cleanup, REC_TYPE_RCPT, state->buf) < 0)
            state->err = CLEANUP_STAT_WRITE;
        state->rcpt_count++;

        /*
         * Keep a private copy of the first recipient only.
         */
        if (state->recipient == 0)
            state->recipient = mystrndup(STR(state->buf), LEN(state->buf));
    }

    /*
     * Add the always_bcc address as one more recipient when it is set.
     */
    if (state->err == CLEANUP_STAT_OK
        && var_always_bcc[0] != 0
        && rec_fputs(state->cleanup, REC_TYPE_RCPT, var_always_bcc) < 0)
        state->err = CLEANUP_STAT_WRITE;
}
/* qmqpd_next_line - find the next line, or line fragment, in the message */

static int qmqpd_next_line(VSTRING *message, char **start, int *len,
                           char **next)
{
    char   *line = *next;
    char   *buf_end = STR(message) + LEN(message);
    char   *line_max = line + var_line_limit;
    char   *scan;

    /*
     * On return, *start and *len describe the text found (without the
     * newline) and *next is where the following call resumes. The result is
     * '\n' for a complete line, the last character of the text when the scan
     * stopped at the length limit or at the end of the buffer, or -1 when
     * nothing is left.
     */
#define UCHARPTR(x) ((unsigned char *) (x))

    *start = line;
    for (scan = line; scan < buf_end; scan++) {
        if (*scan == '\n') {
            *len = scan - line;
            *next = scan + 1;
            return ('\n');
        }
        if (scan >= line_max) {
            *len = scan - line;
            *next = scan;
            return (UCHARPTR(scan)[-1]);
        }
    }

    /*
     * End of buffer: hand back what is left, if anything.
     */
    *next = scan;
    *len = scan - line;
    if (*len > 0)
        return (UCHARPTR(scan)[-1]);
    return (-1);
}
/* qmqpd_write_content - write the Received: header and the message content */

static void qmqpd_write_content(QMQPD_STATE *state)
{
    char   *start;
    char   *next;
    int     len;
    int     rec_type;
    int     first = 1;
    int     ch;

    /*
     * Start the message content segment and prepend our own Received:
     * header. The "for <recipient>" clause is present only when there is
     * exactly one recipient.
     */
    rec_fputs(state->cleanup, REC_TYPE_MESG, "");
    rec_fprintf(state->cleanup, REC_TYPE_NORM, "Received: from %s (%s [%s])",
                state->name, state->name, state->addr);
    if (state->rcpt_count == 1 && state->recipient) {
        rec_fprintf(state->cleanup, REC_TYPE_NORM,
                    "\tby %s (%s) with %s id %s",
                    var_myhostname, var_mail_name,
                    state->protocol, state->queue_id);
        quote_822_local(state->buf, state->recipient);
        rec_fprintf(state->cleanup, REC_TYPE_NORM,
                    "\tfor <%s>; %s", STR(state->buf), mail_date(state->time));
    } else {
        rec_fprintf(state->cleanup, REC_TYPE_NORM,
                    "\tby %s (%s) with %s",
                    var_myhostname, var_mail_name, state->protocol);
        rec_fprintf(state->cleanup, REC_TYPE_NORM,
                    "\tid %s; %s", state->queue_id, mail_date(state->time));
    }
#ifdef RECEIVED_ENVELOPE_FROM
    quote_822_local(state->buf, state->sender);
    rec_fprintf(state->cleanup, REC_TYPE_NORM,
                "\t(envelope-from <%s>)", STR(state->buf));
#endif

    /*
     * Copy the message one line (or line fragment) at a time. A complete
     * line becomes a REC_TYPE_NORM record, a fragment that was cut short
     * becomes a REC_TYPE_CONT record.
     */
    for (next = STR(state->message); /* void */ ; /* void */ ) {
        if ((ch = qmqpd_next_line(state->message, &start, &len, &next)) < 0)
            break;
        if (ch == '\n')
            rec_type = REC_TYPE_NORM;
        else
            rec_type = REC_TYPE_CONT;
        if (first) {

            /*
             * A leading "From " line, possibly prefixed with '>' characters,
             * is turned into an X-Mailbox-Line: header. The line is not
             * null-terminated at its own end, so the length has to be given
             * as a precision ("%.*s"). As a field width ("%*s") it would
             * only set a minimum, and the record would run on to the end of
             * the message buffer.
             */
            if (strncmp(start + strspn(start, ">"), "From ", 5) == 0) {
                rec_fprintf(state->cleanup, rec_type,
                            "X-Mailbox-Line: %.*s", len, start);
                continue;
            }
            first = 0;

            /*
             * When the first real line starts with space or tab, write an
             * empty record before it.
             */
            if (len > 0 && IS_SPACE_TAB(start[0]))
                rec_put(state->cleanup, REC_TYPE_NORM, "", 0);
        }
        if (rec_put(state->cleanup, rec_type, start, len) < 0) {
            state->err = CLEANUP_STAT_WRITE;
            return;
        }
    }
}
/* qmqpd_close_file - finish the queue file, or abandon it after an error */

static void qmqpd_close_file(QMQPD_STATE *state)
{
    int     write_failed;

    /*
     * When all is well so far, write the closing records and flush the
     * stream. The first failing step ends the sequence.
     */
    if (state->err == CLEANUP_STAT_OK) {
        write_failed = (rec_fputs(state->cleanup, REC_TYPE_XTRA, "") < 0);
        if (!write_failed)
            write_failed = (rec_fputs(state->cleanup, REC_TYPE_END, "") < 0);
        if (!write_failed)
            write_failed = (vstream_fflush(state->cleanup) != 0);
        if (write_failed)
            state->err = CLEANUP_STAT_WRITE;
    }

    /*
     * Clean up after any error. Otherwise collect the final status, with
     * explanatory text going into why_rejected.
     */
    if (state->err != 0)
        mail_stream_cleanup(state->dest);
    else
        state->err = mail_stream_finish(state->dest, state->why_rejected);
    state->dest = 0;
}
/* qmqpd_reply - send a status netstring to the client, optionally log it */

static void qmqpd_reply(QMQPD_STATE *state, int log_message,
                        int status_code, const char *fmt,...)
{
    va_list ap;
    int     success;

    /*
     * With soft_bounce turned on, a hard error is reported as retryable.
     */
    if (var_soft_bounce && status_code == QMQPD_STAT_HARD)
        status_code = QMQPD_STAT_RETRY;
    success = (status_code == QMQPD_STAT_OK);

    /*
     * The reply is the status code character followed by the formatted
     * text.
     */
    VSTRING_RESET(state->buf);
    VSTRING_ADDCH(state->buf, status_code);
    va_start(ap, fmt);
    vstring_vsprintf_append(state->buf, fmt, ap);
    va_end(ap);
    NETSTRING_PUT_BUF(state->client, state->buf);

    /*
     * Log the text without the status code character: informational for
     * success, a warning for everything else.
     */
    if (log_message) {
        if (success)
            msg_info("%s: %s: %s",
                     state->queue_id, state->namaddr, STR(state->buf) + 1);
        else
            msg_warn("%s: %s: %s",
                     state->queue_id, state->namaddr, STR(state->buf) + 1);
    }

    /*
     * Pause before flushing anything other than a success reply.
     */
    if (!success)
        sleep(var_qmqpd_err_sleep);
    netstring_fflush(state->client);
}
/* qmqpd_send_status - translate the error flags into a reply for the client */

static void qmqpd_send_status(QMQPD_STATE *state)
{
    int     err;

    state->where = "sending completion status";
    err = state->err;

    /*
     * Success is the only reply that is not logged here.
     */
    if (err == CLEANUP_STAT_OK) {
        qmqpd_reply(state, DONT_LOG, QMQPD_STAT_OK,
                    "Ok: queued as %s", state->queue_id);
        return;
    }

    /*
     * Several flags can be set at the same time. The first test that matches
     * determines the reply, so the order below matters.
     */
    if ((err & CLEANUP_STAT_BAD) != 0) {
        qmqpd_reply(state, DO_LOG, QMQPD_STAT_RETRY,
                    "Error: internal error %d", err);
        return;
    }
    if ((err & CLEANUP_STAT_SIZE) != 0) {
        qmqpd_reply(state, DO_LOG, QMQPD_STAT_HARD,
                    "Error: message too large");
        return;
    }
    if ((err & CLEANUP_STAT_HOPS) != 0) {
        qmqpd_reply(state, DO_LOG, QMQPD_STAT_HARD,
                    "Error: too many hops");
        return;
    }
    if ((err & CLEANUP_STAT_CONT) != 0) {
        qmqpd_reply(state, DO_LOG, QMQPD_STAT_HARD,
                    "Error: %s", STR(state->why_rejected));
        return;
    }
    if ((err & CLEANUP_STAT_WRITE) != 0) {
        qmqpd_reply(state, DO_LOG, QMQPD_STAT_RETRY,
                    "Error: queue file write error");
        return;
    }
    if ((err & CLEANUP_STAT_RCPT) != 0) {
        qmqpd_reply(state, DO_LOG, QMQPD_STAT_HARD,
                    "Error: no recipients specified");
        return;
    }

    /*
     * None of the known flags is set.
     */
    qmqpd_reply(state, DO_LOG, QMQPD_STAT_RETRY,
                "Error: internal error %d", err);
}
/* qmqpd_receive - receive one message and report the result to the client */

static void qmqpd_receive(QMQPD_STATE *state)
{

    /*
     * The queue file is opened before anything is read from the client.
     */
    qmqpd_open_file(state);

    /*
     * Read the length of the outer netstring; the value itself is not used.
     */
    state->where = "receiving QMQP packet header";
    (void) netstring_get_length(state->client);

    /*
     * The content arrives first and is kept in memory. The envelope follows
     * and is written to the queue file as it is read.
     */
    qmqpd_read_content(state);
    qmqpd_copy_sender(state);
    qmqpd_write_attributes(state);
    qmqpd_copy_recipients(state);

    /*
     * The content is written only when nothing has gone wrong so far.
     */
    if (!state->err)
        qmqpd_write_content(state);

    /*
     * Finish up and tell the client how it went.
     */
    qmqpd_close_file(state);
    qmqpd_send_status(state);
}
/* qmqpd_proto - run the QMQP dialog for one client and handle I/O errors */

static void qmqpd_proto(QMQPD_STATE *state)
{
    int     status;

    netstring_setup(state->client, var_qmqpd_timeout);

    /*
     * The vstream_setjmp() result selects what to do: 0 on the initial pass,
     * a NETSTRING_ERR_* code when control comes back here after a netstring
     * error.
     */
    switch (status = vstream_setjmp(state->client)) {

    default:
        msg_panic("qmqpd_proto: unknown status %d", status);

    case NETSTRING_ERR_EOF:
        state->reason = "lost connection";
        break;

    case NETSTRING_ERR_TIME:
        state->reason = "read/write timeout";
        break;

        /*
         * For format and size errors, try to tell the client what went
         * wrong. The jump target is set up again first; when the reply
         * itself fails, the second vstream_setjmp() result is non-zero and
         * the reply is skipped.
         */
    case NETSTRING_ERR_FORMAT:
        state->reason = "netstring format error";
        if (vstream_setjmp(state->client) == 0)
            if (state->reason && state->where)
                qmqpd_reply(state, DONT_LOG, QMQPD_STAT_HARD, "%s while %s",
                            state->reason, state->where);
        break;

    case NETSTRING_ERR_SIZE:
        state->reason = "netstring length exceeds storage limit";
        if (vstream_setjmp(state->client) == 0)
            if (state->reason && state->where)
                qmqpd_reply(state, DONT_LOG, QMQPD_STAT_HARD, "%s while %s",
                            state->reason, state->where);
        break;

        /*
         * Initial pass: clients that do not match the access list are turned
         * away, all others get to submit a message.
         */
    case 0:
        if (namadr_list_match(qmqpd_clients, state->name, state->addr) == 0) {
            qmqpd_reply(state, DONT_LOG, QMQPD_STAT_HARD,
                        "Error: %s is not authorized to use this service",
                        state->namaddr);
        } else
            qmqpd_receive(state);
        break;
    }

    /*
     * Log why the session ended abnormally. The queue ID is assigned only in
     * qmqpd_open_file(), which is not reached for an unauthorized client,
     * yet the rejection reply can still fail and bring us here. Do not hand
     * a null pointer to "%s" in that case. NOTE(review): this assumes that
     * qmqpd_state_alloc() leaves queue_id null until then - confirm.
     */
    if (state->reason && state->where)
        msg_info("%s: %s: %s while %s",
                 state->queue_id ? state->queue_id : "NOQUEUE",
                 state->namaddr, state->reason, state->where);
}
/* qmqpd_service - service one client connection */

static void qmqpd_service(VSTREAM *stream, char *unused_service, char **argv)
{
    QMQPD_STATE *session;

    /*
     * No command-line arguments are accepted.
     */
    if (argv[0] != 0)
        msg_fatal("unexpected command-line argument: %s", argv[0]);

    /*
     * Set up the per-connection state and let the debug_peer module look at
     * the client name and address.
     */
    session = qmqpd_state_alloc(stream);
    debug_peer_check(session->name, session->addr);

    /*
     * Run the protocol, with log entries on both sides.
     */
    msg_info("connect from %s", session->namaddr);
    qmqpd_proto(session);
    msg_info("disconnect from %s", session->namaddr);

    /*
     * Undo the debug_peer_check() call and release the state.
     */
    debug_peer_restore();
    qmqpd_state_free(session);
}
/* pre_accept - terminate the process when a lookup table has changed */

static void pre_accept(char *unused_name, char **unused_argv)
{
    if (!dict_changed())
        return;
    msg_info("lookup table has changed -- exiting");
    exit(0);
}
/* pre_jail_init - one-time initialization: debug_peer module and client list */

static void pre_jail_init(char *unused_name, char **unused_argv)
{
    int     match_flags;

    debug_peer_init();

    /*
     * Build the client access list from the qmqpd_clients setting, using the
     * match flags that match_parent_style() returns for that parameter name.
     */
    match_flags = match_parent_style(VAR_QMQPD_CLIENTS);
    qmqpd_clients = namadr_list_init(match_flags, var_qmqpd_clients);
}
/* main - declare the configuration tables and hand control to the server skeleton */

int main(int argc, char **argv)
{

    /*
     * Time-valued parameters: name, default, and the variable that receives
     * the value. NOTE(review): the two trailing numbers look like lower and
     * upper bounds - confirm against the CONFIG_TIME_TABLE declaration.
     * NOTE(review): the macro names say QMTPD while the variables say qmqpd;
     * presumably that is how the macros are spelled where they are defined -
     * confirm.
     */
static CONFIG_TIME_TABLE time_table[] = {
VAR_QMTPD_TMOUT, DEF_QMTPD_TMOUT, &var_qmqpd_timeout, 1, 0,
VAR_QMTPD_ERR_SLEEP, DEF_QMTPD_ERR_SLEEP, &var_qmqpd_err_sleep, 0, 0,
0,
};

    /*
     * String-valued parameters, laid out the same way. The single 0 entry
     * ends each table.
     */
static CONFIG_STR_TABLE str_table[] = {
VAR_ALWAYS_BCC, DEF_ALWAYS_BCC, &var_always_bcc, 0, 0,
VAR_FILTER_XPORT, DEF_FILTER_XPORT, &var_filter_xport, 0, 0,
VAR_QMQPD_CLIENTS, DEF_QMQPD_CLIENTS, &var_qmqpd_clients, 0, 0,
0,
};

    /*
     * Pass control to the single-client server skeleton, with qmqpd_service()
     * as the service routine, the two tables above, and the pre_jail_init()
     * and pre_accept() hooks. The 0 ends the argument list.
     */
single_server_main(argc, argv, qmqpd_service,
MAIL_SERVER_TIME_TABLE, time_table,
MAIL_SERVER_STR_TABLE, str_table,
MAIL_SERVER_PRE_INIT, pre_jail_init,
MAIL_SERVER_PRE_ACCEPT, pre_accept,
0);
}