#include <sys_defs.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#ifdef STRCASECMP_IN_STRINGS_H
#include <strings.h>
#endif
#include <msg.h>
#include <mymalloc.h>
#include <vstring.h>
#include <vstream.h>
#include <split_at.h>
#include <valid_hostname.h>
#include <argv.h>
#include <stringops.h>
#include <myflock.h>
#include <sane_time.h>
#include <dict.h>
#include <mail_queue.h>
#include <mail_params.h>
#include <canon_addr.h>
#include <record.h>
#include <rec_type.h>
#include <sent.h>
#include <deliver_completed.h>
#include <opened.h>
#include <verp_sender.h>
#include <mail_proto.h>
#include <qmgr_user.h>
#include <split_addr.h>
#include <dsn_mask.h>
#include <rec_attr_map.h>
#include <rewrite_clnt.h>
#include <resolve_clnt.h>
#include "qmgr.h"
int qmgr_message_count;
int qmgr_recipient_count;
static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
const char *queue_id, int qflags)
{
QMGR_MESSAGE *message;
message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE));
qmgr_message_count++;
message->flags = 0;
message->qflags = qflags;
message->tflags = 0;
message->tflags_offset = 0;
message->rflags = QMGR_READ_FLAG_DEFAULT;
message->fp = 0;
message->refcount = 0;
message->single_rcpt = 0;
message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0;
message->create_time = 0;
GETTIMEOFDAY(&message->active_time);
message->queued_time = sane_time();
message->refill_time = 0;
message->data_offset = 0;
message->queue_id = mystrdup(queue_id);
message->queue_name = mystrdup(queue_name);
message->encoding = 0;
message->sender = 0;
message->dsn_envid = 0;
message->dsn_ret = 0;
message->filter_xport = 0;
message->inspect_xport = 0;
message->redirect_addr = 0;
message->data_size = 0;
message->cont_length = 0;
message->warn_offset = 0;
message->warn_time = 0;
message->rcpt_offset = 0;
message->verp_delims = 0;
message->client_name = 0;
message->client_addr = 0;
message->client_port = 0;
message->client_proto = 0;
message->client_helo = 0;
message->sasl_method = 0;
message->sasl_username = 0;
message->sasl_sender = 0;
message->rewrite_context = 0;
recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
message->rcpt_count = 0;
message->rcpt_limit = var_qmgr_msg_rcpt_limit;
message->rcpt_unread = 0;
QMGR_LIST_INIT(message->job_list);
return (message);
}
static void qmgr_message_close(QMGR_MESSAGE *message)
{
vstream_fclose(message->fp);
message->fp = 0;
}
static int qmgr_message_open(QMGR_MESSAGE *message)
{
if (message->fp)
msg_panic("%s: queue file is open", message->queue_id);
if ((message->fp = mail_queue_open(message->queue_name,
message->queue_id,
O_RDWR, 0)) == 0) {
if (errno != ENOENT)
msg_fatal("open %s %s: %m", message->queue_name, message->queue_id);
msg_warn("open %s %s: %m", message->queue_name, message->queue_id);
return (-1);
}
return (0);
}
static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message)
{
VSTRING *buf;
long orig_offset, extra_offset;
int rec_type;
char *start;
buf = vstring_alloc(100);
if ((orig_offset = vstream_ftell(message->fp)) < 0)
msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
if (vstream_fseek(message->fp, 0, SEEK_SET) < 0)
msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
message->rcpt_unread = 0;
for (;;) {
rec_type = rec_get(message->fp, buf, 0);
if (rec_type <= 0)
break;
start = vstring_str(buf);
if (msg_verbose > 1)
msg_info("old-style scan record %c %s", rec_type, start);
if (rec_type == REC_TYPE_END)
break;
if (rec_type == REC_TYPE_DONE
|| rec_type == REC_TYPE_RCPT
|| rec_type == REC_TYPE_DRCP) {
message->rcpt_unread++;
continue;
}
if (rec_type == REC_TYPE_MESG) {
if (message->data_offset == 0) {
if ((message->data_offset = vstream_ftell(message->fp)) < 0)
msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
if ((extra_offset = atol(start)) <= message->data_offset)
msg_fatal("bad extra offset %s file %s",
start, VSTREAM_PATH(message->fp));
if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0)
msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
message->data_size = extra_offset - message->data_offset;
}
continue;
}
}
if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0)
msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
vstring_free(buf);
if (message->data_offset == 0 || rec_type != REC_TYPE_END)
msg_fatal("%s: envelope records out of order", message->queue_id);
}
static int qmgr_message_read(QMGR_MESSAGE *message)
{
VSTRING *buf;
int rec_type;
long curr_offset;
long save_offset = message->rcpt_offset;
int save_unread = message->rcpt_unread;
char *start;
int recipient_limit;
const char *error_text;
char *name;
char *value;
char *orig_rcpt = 0;
int count;
int dsn_notify = 0;
char *dsn_orcpt = 0;
int n;
int have_log_client_attr = 0;
buf = vstring_alloc(100);
if (message->rcpt_offset) {
if (message->rcpt_list.len)
msg_panic("%s: recipient list not empty on recipient reload",
message->queue_id);
if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0)
msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
message->rcpt_offset = 0;
recipient_limit = message->rcpt_limit - message->rcpt_count;
} else {
recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count;
if (recipient_limit < message->rcpt_limit)
recipient_limit = message->rcpt_limit;
}
if (recipient_limit > 5000)
recipient_limit = 5000;
if (recipient_limit <= 0)
msg_panic("%s: no recipient slots available", message->queue_id);
if (msg_verbose)
msg_info("%s: recipient limit %d", message->queue_id, recipient_limit);
for (;;) {
if ((curr_offset = vstream_ftell(message->fp)) < 0)
msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
if (curr_offset == message->data_offset && curr_offset > 0) {
if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0)
msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
curr_offset += message->data_size;
}
rec_type = rec_get(message->fp, buf, 0);
start = vstring_str(buf);
if (msg_verbose > 1)
msg_info("record %c %s", rec_type, start);
if (rec_type <= 0) {
msg_warn("%s: message rejected: missing end record",
message->queue_id);
break;
}
if (rec_type == REC_TYPE_END) {
message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
break;
}
if (rec_type == REC_TYPE_ATTR) {
if ((error_text = split_nameval(start, &name, &value)) != 0) {
msg_warn("%s: ignoring bad attribute: %s: %.200s",
message->queue_id, error_text, start);
rec_type = REC_TYPE_ERROR;
break;
}
if ((n = rec_attr_map(name)) != 0) {
start = value;
rec_type = n;
}
}
if (rec_type == REC_TYPE_RCPT) {
if (message->rcpt_offset == 0) {
message->rcpt_unread--;
recipient_list_add(&message->rcpt_list, curr_offset,
dsn_orcpt ? dsn_orcpt : "",
dsn_notify ? dsn_notify : 0,
orig_rcpt ? orig_rcpt : "", start);
if (dsn_orcpt) {
myfree(dsn_orcpt);
dsn_orcpt = 0;
}
if (orig_rcpt) {
myfree(orig_rcpt);
orig_rcpt = 0;
}
if (dsn_notify)
dsn_notify = 0;
if (message->rcpt_list.len >= recipient_limit) {
if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0)
msg_fatal("vstream_ftell %s: %m",
VSTREAM_PATH(message->fp));
if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
break;
if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER)
continue;
if (curr_offset > message->data_offset) {
message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
break;
}
if (vstream_fseek(message->fp, message->data_offset
+ message->data_size, SEEK_SET) < 0)
msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
continue;
}
}
continue;
}
if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) {
if (message->rcpt_offset == 0) {
message->rcpt_unread--;
if (dsn_orcpt) {
myfree(dsn_orcpt);
dsn_orcpt = 0;
}
if (orig_rcpt) {
myfree(orig_rcpt);
orig_rcpt = 0;
}
if (dsn_notify)
dsn_notify = 0;
}
continue;
}
if (rec_type == REC_TYPE_DSN_ORCPT) {
if (dsn_orcpt != 0) {
msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>",
message->queue_id, dsn_orcpt);
myfree(dsn_orcpt);
dsn_orcpt = 0;
}
if (message->rcpt_offset == 0)
dsn_orcpt = mystrdup(start);
continue;
}
if (rec_type == REC_TYPE_DSN_NOTIFY) {
if (dsn_notify != 0) {
msg_warn("%s: ignoring out-of-order DSN notify flags <%d>",
message->queue_id, dsn_notify);
dsn_notify = 0;
}
if (message->rcpt_offset == 0) {
if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n))
msg_warn("%s: ignoring malformed DSN notify flags <%.200s>",
message->queue_id, start);
else
dsn_notify = n;
continue;
}
}
if (rec_type == REC_TYPE_ORCP) {
if (orig_rcpt != 0) {
msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
message->queue_id, orig_rcpt);
myfree(orig_rcpt);
orig_rcpt = 0;
}
if (message->rcpt_offset == 0)
orig_rcpt = mystrdup(start);
continue;
}
if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
continue;
if (rec_type == REC_TYPE_SIZE) {
if (message->data_offset == 0) {
if ((count = sscanf(start, "%ld %ld %d %d %ld",
&message->data_size, &message->data_offset,
&message->rcpt_unread, &message->rflags,
&message->cont_length)) >= 3) {
if (message->data_offset <= 0 || message->data_size <= 0) {
msg_warn("%s: invalid size record: %.100s",
message->queue_id, start);
rec_type = REC_TYPE_ERROR;
break;
}
if (message->rflags & ~QMGR_READ_FLAG_USER) {
msg_warn("%s: invalid flags in size record: %.100s",
message->queue_id, start);
rec_type = REC_TYPE_ERROR;
break;
}
} else if (count == 1) {
qmgr_message_oldstyle_scan(message);
} else {
msg_warn("%s: message rejected: weird size record",
message->queue_id);
rec_type = REC_TYPE_ERROR;
break;
}
}
if (message->cont_length == 0) {
message->cont_length = message->data_size;
} else if (message->cont_length < 0) {
msg_warn("%s: invalid size record: %.100s",
message->queue_id, start);
rec_type = REC_TYPE_ERROR;
break;
}
continue;
}
if (rec_type == REC_TYPE_TIME) {
if (message->arrival_time.tv_sec == 0)
REC_TYPE_TIME_SCAN(start, message->arrival_time);
continue;
}
if (rec_type == REC_TYPE_CTIME) {
if (message->create_time == 0)
message->create_time = atol(start);
continue;
}
if (rec_type == REC_TYPE_FILT) {
if (message->filter_xport != 0)
myfree(message->filter_xport);
message->filter_xport = mystrdup(start);
continue;
}
if (rec_type == REC_TYPE_INSP) {
if (message->inspect_xport != 0)
myfree(message->inspect_xport);
message->inspect_xport = mystrdup(start);
continue;
}
if (rec_type == REC_TYPE_RDR) {
if (message->redirect_addr != 0)
myfree(message->redirect_addr);
message->redirect_addr = mystrdup(start);
continue;
}
if (rec_type == REC_TYPE_FROM) {
if (message->sender == 0) {
message->sender = mystrdup(start);
opened(message->queue_id, message->sender,
message->cont_length, message->rcpt_unread,
"queue %s", message->queue_name);
}
continue;
}
if (rec_type == REC_TYPE_DSN_ENVID) {
if (message->dsn_envid == 0)
message->dsn_envid = mystrdup(start);
}
if (rec_type == REC_TYPE_DSN_RET) {
if (message->dsn_ret == 0) {
if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n))
msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s",
message->queue_id, start);
else
message->dsn_ret = n;
}
}
if (rec_type == REC_TYPE_ATTR) {
if (strcmp(name, MAIL_ATTR_ENCODING) == 0) {
if (message->encoding != 0)
myfree(message->encoding);
message->encoding = mystrdup(value);
}
else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) {
if (have_log_client_attr == 0 && message->client_name == 0)
message->client_name = mystrdup(value);
} else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) {
if (have_log_client_attr == 0 && message->client_addr == 0)
message->client_addr = mystrdup(value);
} else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) {
if (have_log_client_attr == 0 && message->client_proto == 0)
message->client_proto = mystrdup(value);
} else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) {
if (have_log_client_attr == 0 && message->client_helo == 0)
message->client_helo = mystrdup(value);
}
else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) {
if (message->client_name != 0)
myfree(message->client_name);
message->client_name = mystrdup(value);
have_log_client_attr = 1;
} else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) {
if (message->client_addr != 0)
myfree(message->client_addr);
message->client_addr = mystrdup(value);
have_log_client_attr = 1;
} else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) {
if (message->client_port != 0)
myfree(message->client_port);
message->client_port = mystrdup(value);
have_log_client_attr = 1;
} else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) {
if (message->client_proto != 0)
myfree(message->client_proto);
message->client_proto = mystrdup(value);
have_log_client_attr = 1;
} else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) {
if (message->client_helo != 0)
myfree(message->client_helo);
message->client_helo = mystrdup(value);
have_log_client_attr = 1;
} else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) {
if (message->sasl_method == 0)
message->sasl_method = mystrdup(value);
else
msg_warn("%s: ignoring multiple %s attribute: %s",
message->queue_id, MAIL_ATTR_SASL_METHOD, value);
} else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) {
if (message->sasl_username == 0)
message->sasl_username = mystrdup(value);
else
msg_warn("%s: ignoring multiple %s attribute: %s",
message->queue_id, MAIL_ATTR_SASL_USERNAME, value);
} else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) {
if (message->sasl_sender == 0)
message->sasl_sender = mystrdup(value);
else
msg_warn("%s: ignoring multiple %s attribute: %s",
message->queue_id, MAIL_ATTR_SASL_SENDER, value);
} else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) {
if (message->rewrite_context == 0)
message->rewrite_context = mystrdup(value);
else
msg_warn("%s: ignoring multiple %s attribute: %s",
message->queue_id, MAIL_ATTR_RWR_CONTEXT, value);
}
else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) {
message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value));
if (message->tflags == DEL_REQ_FLAG_RECORD)
message->tflags_offset = curr_offset;
else
message->tflags_offset = 0;
}
continue;
}
if (rec_type == REC_TYPE_WARN) {
if (message->warn_offset == 0) {
message->warn_offset = curr_offset;
REC_TYPE_WARN_SCAN(start, message->warn_time);
}
continue;
}
if (rec_type == REC_TYPE_VERP) {
if (message->verp_delims == 0) {
if (message->sender == 0 || message->sender[0] == 0) {
msg_warn("%s: ignoring VERP request for null sender",
message->queue_id);
} else if (verp_delims_verify(start) != 0) {
msg_warn("%s: ignoring bad VERP request: \"%.100s\"",
message->queue_id, start);
} else {
message->single_rcpt = 1;
message->verp_delims = mystrdup(start);
}
}
continue;
}
}
if (dsn_orcpt != 0) {
if (rec_type > 0)
msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>",
message->queue_id, dsn_orcpt);
myfree(dsn_orcpt);
}
if (orig_rcpt != 0) {
if (rec_type > 0)
msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
message->queue_id, orig_rcpt);
myfree(orig_rcpt);
}
message->refill_time = sane_time();
if (message->dsn_envid == 0)
message->dsn_envid = mystrdup("");
if (message->encoding == 0)
message->encoding = mystrdup(MAIL_ATTR_ENC_NONE);
if (message->client_name == 0)
message->client_name = mystrdup("");
if (message->client_addr == 0)
message->client_addr = mystrdup("");
if (message->client_port == 0)
message->client_port = mystrdup("");
if (message->client_proto == 0)
message->client_proto = mystrdup("");
if (message->client_helo == 0)
message->client_helo = mystrdup("");
if (message->sasl_method == 0)
message->sasl_method = mystrdup("");
if (message->sasl_username == 0)
message->sasl_username = mystrdup("");
if (message->sasl_sender == 0)
message->sasl_sender = mystrdup("");
if (message->rewrite_context == 0)
message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL);
if (message->create_time == 0)
message->create_time = message->arrival_time.tv_sec;
vstring_free(buf);
if (message->rcpt_unread < 0
|| (message->rcpt_offset == 0 && message->rcpt_unread != 0)) {
msg_warn("%s: rcpt count mismatch (%d)",
message->queue_id, message->rcpt_unread);
message->rcpt_unread = 0;
}
if (rec_type <= 0) {
} else if (message->arrival_time.tv_sec == 0) {
msg_warn("%s: message rejected: missing arrival time record",
message->queue_id);
} else if (message->sender == 0) {
msg_warn("%s: message rejected: missing sender record",
message->queue_id);
} else if (message->data_offset == 0) {
msg_warn("%s: message rejected: missing size record",
message->queue_id);
} else {
return (0);
}
message->rcpt_offset = save_offset;
message->rcpt_unread = save_unread;
recipient_list_free(&message->rcpt_list);
recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
return (-1);
}
void qmgr_message_update_warn(QMGR_MESSAGE *message)
{
if (qmgr_message_open(message)
|| vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0
|| rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT,
REC_TYPE_WARN_ARG(0)) < 0
|| vstream_fflush(message->fp))
msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
qmgr_message_close(message);
}
void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset)
{
if (offset <= 0)
msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset);
if (qmgr_message_open(message)
|| rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0
|| vstream_fflush(message->fp))
msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
qmgr_message_close(message);
}
static int qmgr_message_sort_compare(const void *p1, const void *p2)
{
RECIPIENT *rcpt1 = (RECIPIENT *) p1;
RECIPIENT *rcpt2 = (RECIPIENT *) p2;
QMGR_QUEUE *queue1;
QMGR_QUEUE *queue2;
char *at1;
char *at2;
int result;
queue1 = rcpt1->u.queue;
queue2 = rcpt2->u.queue;
if (queue1 != 0 && queue2 == 0)
return (-1);
if (queue1 == 0 && queue2 != 0)
return (1);
if (queue1 != 0 && queue2 != 0) {
if ((result = strcmp(queue1->transport->name,
queue2->transport->name)) != 0)
return (result);
if ((result = strcmp(queue1->name, queue2->name)) != 0)
return (result);
}
at1 = strrchr(rcpt1->address, '@');
at2 = strrchr(rcpt2->address, '@');
if (at1 == 0 && at2 != 0)
return (1);
if (at1 != 0 && at2 == 0)
return (-1);
if (at1 != 0 && at2 != 0
&& (result = strcasecmp(at1, at2)) != 0)
return (result);
return (strcmp(rcpt1->address, rcpt2->address));
}
static void qmgr_message_sort(QMGR_MESSAGE *message)
{
qsort((char *) message->rcpt_list.info, message->rcpt_list.len,
sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare);
if (msg_verbose) {
RECIPIENT_LIST list = message->rcpt_list;
RECIPIENT *rcpt;
msg_info("start sorted recipient list");
for (rcpt = list.info; rcpt < list.info + list.len; rcpt++)
msg_info("qmgr_message_sort: %s", rcpt->address);
msg_info("end sorted recipient list");
}
}
static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient,
const char *addr, RESOLVE_REPLY *reply)
{
#define QMGR_REDIRECT(rp, tp, np) do { \
(rp)->flags = 0; \
vstring_strcpy((rp)->transport, (tp)); \
vstring_strcpy((rp)->nexthop, (np)); \
} while (0)
if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0)
resolve_clnt_query_from(message->sender, addr, reply);
else
resolve_clnt_verify_from(message->sender, addr, reply);
if (reply->flags & RESOLVE_FLAG_FAIL) {
QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY,
"4.3.0 address resolver failure");
return (0);
} else if (reply->flags & RESOLVE_FLAG_ERROR) {
QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR,
"5.1.3 bad address syntax");
return (0);
} else {
return (0);
}
}
static void qmgr_message_resolve(QMGR_MESSAGE *message)
{
static ARGV *defer_xport_argv;
RECIPIENT_LIST list = message->rcpt_list;
RECIPIENT *recipient;
QMGR_TRANSPORT *transport = 0;
QMGR_QUEUE *queue = 0;
RESOLVE_REPLY reply;
VSTRING *queue_name;
char *at;
char **cpp;
char *nexthop;
ssize_t len;
int status;
DSN dsn;
MSG_STATS stats;
DSN *saved_dsn;
#define STREQ(x,y) (strcmp(x,y) == 0)
#define STR vstring_str
#define LEN VSTRING_LEN
resolve_clnt_init(&reply);
queue_name = vstring_alloc(1);
for (recipient = list.info; recipient < list.info + list.len; recipient++) {
if (message->redirect_addr) {
if (recipient > list.info) {
recipient->u.queue = 0;
continue;
}
message->rcpt_offset = 0;
message->rcpt_unread = 0;
rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr,
reply.recipient);
RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
if (qmgr_resolve_one(message, recipient,
recipient->address, &reply) < 0)
continue;
if (!STREQ(recipient->address, STR(reply.recipient)))
RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
}
else if (message->filter_xport
&& (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) {
reply.flags = 0;
vstring_strcpy(reply.transport, message->filter_xport);
if ((nexthop = split_at(STR(reply.transport), ':')) == 0
|| *nexthop == 0)
nexthop = var_myhostname;
vstring_strcpy(reply.nexthop, nexthop);
vstring_strcpy(reply.recipient, recipient->address);
}
else {
if (qmgr_resolve_one(message, recipient,
recipient->address, &reply) < 0)
continue;
if (!STREQ(recipient->address, STR(reply.recipient)))
RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
}
if (recipient->address[0] == 0) {
QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR,
"5.1.3 null recipient address");
}
if (reply.flags & RESOLVE_CLASS_LOCAL) {
at = strrchr(STR(reply.recipient), '@');
len = (at ? (at - STR(reply.recipient))
: strlen(STR(reply.recipient)));
if (strncasecmp(STR(reply.recipient), var_double_bounce_sender,
len) == 0
&& !var_double_bounce_sender[len]) {
status = sent(message->tflags, message->queue_id,
QMGR_MSG_STATS(&stats, message), recipient,
"none", DSN_SIMPLE(&dsn, "2.0.0",
"undeliverable postmaster notification discarded"));
if (status == 0) {
deliver_completed(message->fp, recipient->offset);
#if 0
msg_warn("%s: undeliverable postmaster notification discarded",
message->queue_id);
#endif
} else
message->flags |= status;
continue;
}
}
if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) {
if (defer_xport_argv == 0)
defer_xport_argv = argv_split(var_defer_xports, " \t\r\n,");
for (cpp = defer_xport_argv->argv; *cpp; cpp++)
if (strcmp(*cpp, STR(reply.transport)) == 0)
break;
if (*cpp) {
QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY,
"4.3.2 deferred transport");
}
}
if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) {
if ((transport = qmgr_transport_find(STR(reply.transport))) == 0)
transport = qmgr_transport_create(STR(reply.transport));
queue = 0;
}
if ((message->qflags & QMGR_FLUSH_EACH) != 0
&& QMGR_TRANSPORT_THROTTLED(transport))
qmgr_transport_unthrottle(transport);
if (QMGR_TRANSPORT_THROTTLED(transport)) {
saved_dsn = transport->dsn;
if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) {
nexthop = qmgr_error_nexthop(saved_dsn);
vstring_strcpy(reply.nexthop, nexthop);
myfree(nexthop);
queue = 0;
} else {
qmgr_defer_recipient(message, recipient, saved_dsn);
continue;
}
}
vstring_strcpy(queue_name, STR(reply.nexthop));
if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0
&& strcmp(transport->name, MAIL_SERVICE_RETRY) != 0
&& transport->recipient_limit == 1) {
at = strrchr(STR(reply.recipient), '@');
len = (at ? (at - STR(reply.recipient))
: strlen(STR(reply.recipient)));
vstring_strncpy(queue_name, STR(reply.recipient), len);
if (*var_rcpt_delim && split_addr(STR(queue_name), *var_rcpt_delim))
vstring_truncate(queue_name, strlen(STR(queue_name)));
vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop));
}
lowercase(STR(queue_name));
if (queue == 0 || !STREQ(queue->name, STR(queue_name))) {
if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0)
queue = qmgr_queue_create(transport, STR(queue_name),
STR(reply.nexthop));
}
if ((message->qflags & QMGR_FLUSH_EACH) != 0
&& QMGR_QUEUE_THROTTLED(queue))
qmgr_queue_unthrottle(queue);
if (QMGR_QUEUE_THROTTLED(queue)) {
saved_dsn = queue->dsn;
if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) {
qmgr_defer_recipient(message, recipient, saved_dsn);
continue;
}
}
recipient->u.queue = queue;
}
resolve_clnt_free(&reply);
vstring_free(queue_name);
}
static void qmgr_message_assign(QMGR_MESSAGE *message)
{
RECIPIENT_LIST list = message->rcpt_list;
RECIPIENT *recipient;
QMGR_ENTRY *entry = 0;
QMGR_QUEUE *queue;
QMGR_JOB *job = 0;
QMGR_PEER *peer = 0;
#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit)))
for (recipient = list.info; recipient < list.info + list.len; recipient++) {
if ((queue = recipient->u.queue) == 0)
continue;
if (job == 0 || queue->transport != job->transport) {
job = qmgr_job_obtain(message, queue->transport);
peer = 0;
}
if (peer == 0 || queue != peer->queue)
peer = qmgr_peer_obtain(job, queue);
entry = peer->entry_list.prev;
if (message->single_rcpt || entry == 0
|| !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len))
entry = qmgr_entry_create(peer, message);
recipient_list_add(&entry->rcpt_list, recipient->offset,
recipient->dsn_orcpt, recipient->dsn_notify,
recipient->orig_addr, recipient->address);
job->rcpt_count++;
message->rcpt_count++;
qmgr_recipient_count++;
}
recipient_list_free(&message->rcpt_list);
recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
for (job = message->job_list.next; job; job = job->message_peers.next)
if (job->selected_entries < job->read_entries
&& job->blocker_tag != job->transport->blocker_tag)
job->transport->candidate_cache_current = 0;
}
static void qmgr_message_move_limits(QMGR_MESSAGE *message)
{
QMGR_JOB *job;
for (job = message->job_list.next; job; job = job->message_peers.next)
qmgr_job_move_limits(job);
}
void qmgr_message_free(QMGR_MESSAGE *message)
{
QMGR_JOB *job;
if (message->refcount != 0)
msg_panic("qmgr_message_free: reference len: %d", message->refcount);
if (message->fp)
msg_panic("qmgr_message_free: queue file is open");
while ((job = message->job_list.next) != 0)
qmgr_job_free(job);
myfree(message->queue_id);
myfree(message->queue_name);
if (message->dsn_envid)
myfree(message->dsn_envid);
if (message->encoding)
myfree(message->encoding);
if (message->sender)
myfree(message->sender);
if (message->verp_delims)
myfree(message->verp_delims);
if (message->filter_xport)
myfree(message->filter_xport);
if (message->inspect_xport)
myfree(message->inspect_xport);
if (message->redirect_addr)
myfree(message->redirect_addr);
if (message->client_name)
myfree(message->client_name);
if (message->client_addr)
myfree(message->client_addr);
if (message->client_port)
myfree(message->client_port);
if (message->client_proto)
myfree(message->client_proto);
if (message->client_helo)
myfree(message->client_helo);
if (message->sasl_method)
myfree(message->sasl_method);
if (message->sasl_username)
myfree(message->sasl_username);
if (message->sasl_sender)
myfree(message->sasl_sender);
if (message->rewrite_context)
myfree(message->rewrite_context);
recipient_list_free(&message->rcpt_list);
qmgr_message_count--;
myfree((char *) message);
}
QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
int qflags, mode_t mode)
{
const char *myname = "qmgr_message_alloc";
QMGR_MESSAGE *message;
if (msg_verbose)
msg_info("%s: %s %s", myname, queue_name, queue_id);
message = qmgr_message_create(queue_name, queue_id, qflags);
#define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT)
if (qmgr_message_open(message) < 0) {
qmgr_message_free(message);
return (0);
}
if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) {
msg_info("%s: skipped, still being delivered", queue_id);
qmgr_message_close(message);
qmgr_message_free(message);
return (QMGR_MESSAGE_LOCKED);
}
if (qmgr_message_read(message) < 0) {
qmgr_message_close(message);
qmgr_message_free(message);
return (0);
} else {
if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0)
msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp));
if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT)
msg_fatal("%s: %s: remove %s %s: %m", myname,
queue_id, MAIL_QUEUE_DEFER, queue_id);
qmgr_message_sort(message);
qmgr_message_resolve(message);
qmgr_message_sort(message);
qmgr_message_assign(message);
qmgr_message_close(message);
if (message->rcpt_offset == 0)
qmgr_message_move_limits(message);
return (message);
}
}
QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message)
{
const char *myname = "qmgr_message_realloc";
if (message->rcpt_offset <= 0)
msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset);
if (msg_verbose)
msg_info("%s: %s %s offset %ld", myname, message->queue_name,
message->queue_id, message->rcpt_offset);
if (qmgr_message_open(message) < 0)
return (0);
if (qmgr_message_read(message) < 0) {
qmgr_message_close(message);
return (0);
} else {
qmgr_message_sort(message);
qmgr_message_resolve(message);
qmgr_message_sort(message);
qmgr_message_assign(message);
qmgr_message_close(message);
if (message->rcpt_offset == 0)
qmgr_message_move_limits(message);
return (message);
}
}