#include <sys_defs.h>
#include <time.h>
#include <string.h>
#include <msg.h>
#include <vstring.h>
#include <vstream.h>
#include <vstring_vstream.h>
#include <events.h>
#include <iostuff.h>
#include <stringops.h>
#include <mymalloc.h>
#include <mail_queue.h>
#include <mail_proto.h>
#include <recipient_list.h>
#include <mail_params.h>
#include <deliver_request.h>
#include <verp_sender.h>
#include <dsn_util.h>
#include <dsn_buf.h>
#include <dsb_scan.h>
#include <rcpt_print.h>
#include <smtputf8.h>
#include "qmgr.h"
/*
 * Count of delivery requests that have been sent to a delivery agent and
 * whose agent stream is still open. qmgr_deliver() increments it after a
 * request was sent successfully; QMGR_DELIVER_RELEASE_AGENT() decrements it
 * when the agent stream is closed.
 */
int qmgr_deliver_concurrency;
/*
 * Result codes produced by the delivery agent reply parsers in this file.
 * Note that the parsers return a literal 0 for success rather than the
 * DELIVER_STAT_OK name.
 */
#define DELIVER_STAT_OK 0
#define DELIVER_STAT_DEFER 1
#define DELIVER_STAT_CRASH 2
/*
 * qmgr_deliver_initial_reply - read the initial status from a delivery agent
 *
 * stream: connection to the delivery agent.
 *
 * Returns DELIVER_STAT_CRASH (after logging a warning) when peekfd() fails
 * on the stream or when the reply does not consist of exactly one status
 * attribute; DELIVER_STAT_DEFER when the agent sent a non-zero status; and
 * 0 when the agent sent a zero status.
 */
static int qmgr_deliver_initial_reply(VSTREAM *stream)
{
    int     agent_status;
    int     attr_count;

    /* Nothing can be read from the agent: treat it as gone. */
    if (peekfd(vstream_fileno(stream)) < 0) {
        msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /* Exactly one attribute (the status) must be received. */
    attr_count = attr_scan(stream, ATTR_FLAG_STRICT,
                           RECV_ATTR_INT(MAIL_ATTR_STATUS, &agent_status),
                           ATTR_TYPE_END);
    if (attr_count != 1) {
        msg_warn("%s: malformed response", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /* Any non-zero agent status is collapsed into "defer". */
    if (agent_status != 0)
        return (DELIVER_STAT_DEFER);
    return (0);
}
/*
 * qmgr_deliver_final_reply - read the final delivery status from an agent
 *
 * stream: connection to the delivery agent.
 * dsb:    buffer that receives the DSN information scanned by dsb_scan().
 *
 * Returns DELIVER_STAT_CRASH (after logging a warning) when peekfd() fails
 * on the stream or when the reply does not yield exactly two items (the DSN
 * buffer and the status attribute); DELIVER_STAT_DEFER when the agent sent a
 * non-zero status; and 0 when the agent sent a zero status.
 */
static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
{
    int     agent_status;
    int     attr_count;

    /* Nothing can be read from the agent: treat it as gone. */
    if (peekfd(vstream_fileno(stream)) < 0) {
        msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /* The reply carries the DSN details first, then the numeric status. */
    attr_count = attr_scan(stream, ATTR_FLAG_STRICT,
                           RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
                           RECV_ATTR_INT(MAIL_ATTR_STATUS, &agent_status),
                           ATTR_TYPE_END);
    if (attr_count != 2) {
        msg_warn("%s: malformed response", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /* Any non-zero agent status is collapsed into "defer". */
    if (agent_status != 0)
        return (DELIVER_STAT_DEFER);
    return (0);
}
/*
 * qmgr_deliver_send_request - send one delivery request to a delivery agent
 *
 * entry:  queue entry that supplies the message and its recipient list.
 * stream: connection to the delivery agent.
 *
 * Writes the message-level attributes, followed by one rcpt_print() record
 * per recipient, and flushes the stream.
 *
 * Returns 0 when the flush succeeds, -1 (after logging a warning) when it
 * fails.
 */
static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
{
    QMGR_MESSAGE *message = entry->message;
    RECIPIENT_LIST list = entry->rcpt_list;
    RECIPIENT *rcpt_end = list.info + list.len;
    RECIPIENT *rcpt;
    VSTRING *verp_buf = 0;
    MSG_STATS stats;
    const char *addr;
    char   *sender;
    int     flags;
    int     smtputf8 = message->smtputf8;

    /*
     * Augment the message's SMTPUTF8 flags: a non-empty recipient address
     * that is not all-ASCII but is valid UTF-8 sets the recipient flag, and
     * with VERP delimiters in effect it sets the sender flag as well. This
     * is done only when SMTPUTF8 support is enabled.
     */
    for (rcpt = list.info; rcpt < rcpt_end; rcpt++) {
        if (!var_smtputf8_enable)
            continue;
        addr = rcpt->address;
        if (addr[0] == 0)
            continue;
        if (allascii(addr))
            continue;
        if (!valid_utf8_string(addr, strlen(addr)))
            continue;
        smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
        if (message->verp_delims)
            smtputf8 |= SMTPUTF8_FLAG_SENDER;
    }

    /*
     * With VERP delimiters, the sender address is derived by verp_sender()
     * from the message sender and the first recipient in the list;
     * otherwise the message sender is used unchanged.
     */
    if (message->verp_delims != 0) {
        verp_buf = vstring_alloc(100);
        verp_sender(verp_buf, message->verp_delims,
                    message->sender, list.info);
        sender = vstring_str(verp_buf);
    } else {
        sender = message->sender;
    }

    /* Combine message flags, queue flags, and the bounce/default selector. */
    flags = message->tflags
        | entry->queue->dflags
        | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);

    /* Prepare the statistics record that travels with the request. */
    (void) QMGR_MSG_STATS(&stats, message);

    /* Message-level part of the request. */
    attr_print(stream, ATTR_FLAG_NONE,
               SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
               SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
               SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
               SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
               SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
               SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
               SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
               SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
               SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
               SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
               SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
               SEND_ATTR_FUNC(msg_stats_print, (void *) &stats),
               SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
               SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
               SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
               SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
               SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
               SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
               SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
               SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
               SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
               SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
               SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
               ATTR_TYPE_END);

    /* The VERP sender string is no longer needed once it has been printed. */
    if (verp_buf != 0)
        vstring_free(verp_buf);

    /* Recipient part of the request: one record per recipient. */
    for (rcpt = list.info; rcpt < rcpt_end; rcpt++) {
        attr_print(stream, ATTR_FLAG_NONE,
                   SEND_ATTR_FUNC(rcpt_print, (void *) rcpt),
                   ATTR_TYPE_END);
    }

    /* Write errors surface when the buffered request is flushed. */
    if (vstream_fflush(stream) != 0) {
        msg_warn("write to process (%s): %m", entry->queue->transport->name);
        return (-1);
    }
    if (msg_verbose)
        msg_info("qmgr_deliver: site `%s'", entry->queue->name);
    return (0);
}
/*
 * qmgr_deliver_abort - watchdog timer callback for a pending delivery
 *
 * unused_event: event type, ignored.
 * context:      the QMGR_ENTRY that qmgr_deliver() registered with the timer.
 *
 * Invoked when the timer requested by qmgr_deliver() expires before
 * qmgr_deliver_update() cancels it. Reports the queue ID and the transport
 * name through msg_fatal().
 */
static void qmgr_deliver_abort(int unused_event, void *context)
{
    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;

    msg_fatal("%s: timeout receiving delivery status from transport: %s",
              entry->message->queue_id,
              entry->queue->transport->name);
}
/*
 * qmgr_deliver_update - process the final status report of a delivery agent
 *
 * unused_event: event type, ignored.
 * context:      the QMGR_ENTRY that qmgr_deliver() registered together with
 *               this read-event callback.
 *
 * Cancels the watchdog timer, reads the agent's final reply, updates the
 * throttle state of the transport and of the queue according to the result,
 * closes the agent stream, and disposes of the queue entry.
 */
static void qmgr_deliver_update(int unused_event, void *context)
{
QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
QMGR_QUEUE *queue = entry->queue;
QMGR_TRANSPORT *transport = queue->transport;
QMGR_MESSAGE *message = entry->message;
/* Allocated on first use and then reused by every later invocation. */
static DSN_BUF *dsb;
int status;
/*
 * Detach the delivery agent from a queue entry: stop watching the stream's
 * file descriptor, close the stream, clear entry->stream, and lower the
 * count of outstanding deliveries. The macro evaluates its argument more
 * than once.
 */
#define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
event_disable_readwrite(vstream_fileno(entry->stream)); \
(void) vstream_fclose(entry->stream); \
entry->stream = 0; \
qmgr_deliver_concurrency--; \
} while (0)
if (dsb == 0)
dsb = dsb_create();
/* The agent produced input: stop the timer that qmgr_deliver() requested. */
event_cancel_timer(qmgr_deliver_abort, context);
/* Result is 0, DELIVER_STAT_DEFER or DELIVER_STAT_CRASH; dsb gets the DSN. */
status = qmgr_deliver_final_reply(entry->stream, dsb);
/*
 * The reply was missing or malformed. Throttle the whole transport with a
 * generic 4.3.0 status, log a warning, release the agent, and hand the
 * entry back with qmgr_entry_unselect() before deferring the transport.
 * NOTE(review): the order release -> unselect -> defer looks deliberate;
 * presumably the entry must not be touched after qmgr_defer_transport() -
 * confirm before reordering.
 */
if (status == DELIVER_STAT_CRASH) {
/*
 * NOTE(review): this ORs a DELIVER_STAT_* result code (value 1) into the
 * message flags - presumably it coincides with a message flag bit; confirm.
 */
message->flags |= DELIVER_STAT_DEFER;
/*
 * The disabled alternative would put the transport name into the reason
 * text; it refers to a variable `whatsup' that this function does not
 * declare.
 */
#if 0
whatsup = concatenate("unknown ", transport->name,
" mail transport error", (char *) 0);
qmgr_transport_throttle(transport,
DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
myfree(whatsup);
#else
qmgr_transport_throttle(transport,
DSN_SIMPLE(&dsb->dsn, "4.3.0",
"unknown mail transport error"));
#endif
msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
transport->name);
QMGR_DELIVER_RELEASE_AGENT(entry);
qmgr_entry_unselect(queue, entry);
qmgr_defer_transport(transport, &dsb->dsn);
return;
}
/* Prefix for the reason text when the agent asked to back off from a site. */
#define SUSPENDED "delivery temporarily suspended: "
/*
 * The agent reported a non-zero status. A non-empty DSN status in the reply
 * additionally leads to throttling of this queue.
 */
if (status == DELIVER_STAT_DEFER) {
message->flags |= DELIVER_STAT_DEFER;
if (VSTRING_LEN(dsb->status)) {
/* Replace an invalid status or an empty reason with generic values. */
if (!dsn_valid(vstring_str(dsb->status)))
vstring_strcpy(dsb->status, "4.0.0");
if (VSTRING_LEN(dsb->reason) == 0)
vstring_strcpy(dsb->reason, "unknown error");
vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
/*
 * Throttle only a queue that is currently ready; if that leaves the queue
 * throttled, defer what is still waiting on it via qmgr_defer_todo().
 */
if (QMGR_QUEUE_READY(queue)) {
qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
if (QMGR_QUEUE_THROTTLED(queue))
qmgr_defer_todo(queue, &dsb->dsn);
}
}
}
/*
 * The crash case returned above, so this condition always holds here. The
 * transport responded properly, so unthrottle it; unthrottle the queue as
 * well when the reply carried no reason text.
 */
if (status != DELIVER_STAT_CRASH) {
qmgr_transport_unthrottle(transport);
if (VSTRING_LEN(dsb->reason) == 0)
qmgr_queue_unthrottle(queue);
}
/* Close the agent connection, then finish this entry. */
QMGR_DELIVER_RELEASE_AGENT(entry);
qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
}
/*
 * qmgr_deliver - hand a queue entry to a delivery agent
 *
 * transport: the transport that the delivery agent belongs to.
 * stream:    connection to the delivery agent, or a null pointer when no
 *            connection is available.
 *
 * When the agent is unavailable (no stream, or a bad initial reply), or when
 * the request cannot be sent, the transport is throttled with a fixed
 * "4.3.0 mail transport unavailable" status, its mail is deferred with
 * qmgr_defer_transport(), and the stream (if any) is closed. When there is
 * no queue or entry to deliver, the stream is closed without further action.
 * Otherwise the request is sent, and qmgr_deliver_update() is registered to
 * handle the agent's reply, guarded by the qmgr_deliver_abort() timer.
 */
void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
{
    QMGR_QUEUE *queue;
    QMGR_ENTRY *entry;
    DSN     dsn;
    int     agent_ready;

    /*
     * The agent must exist and must announce a zero initial status. The
     * reason text below is a fixed string; it does not name the transport.
     */
    agent_ready = (stream != 0 && qmgr_deliver_initial_reply(stream) == 0);
    if (!agent_ready) {
        qmgr_transport_throttle(transport,
                                DSN_SIMPLE(&dsn, "4.3.0",
                                           "mail transport unavailable"));
        qmgr_defer_transport(transport, &dsn);
        if (stream != 0)
            (void) vstream_fclose(stream);
        return;
    }

    /*
     * Pick a queue, then an entry from that queue. Without either one there
     * is nothing to send, so simply hang up on the agent.
     */
    queue = qmgr_queue_select(transport);
    if (queue == 0) {
        (void) vstream_fclose(stream);
        return;
    }
    entry = qmgr_entry_select(queue);
    if (entry == 0) {
        (void) vstream_fclose(stream);
        return;
    }

    /*
     * Send the request. On failure, give the entry back before throttling
     * and deferring the transport. NOTE(review): queue and entry are not
     * used after qmgr_defer_transport(); presumably they may be invalid by
     * then - confirm before adding code there.
     */
    if (qmgr_deliver_send_request(entry, stream) < 0) {
        qmgr_entry_unselect(queue, entry);
        qmgr_transport_throttle(transport,
                                DSN_SIMPLE(&dsn, "4.3.0",
                                           "mail transport unavailable"));
        qmgr_defer_transport(transport, &dsn);
        (void) vstream_fclose(stream);
        return;
    }

    /* The request is on its way: wait for the final status report. */
    entry->stream = stream;
    qmgr_deliver_concurrency++;
    event_enable_read(vstream_fileno(stream),
                      qmgr_deliver_update, (void *) entry);

    /* Watchdog in case the agent never reports back. */
    event_request_timer(qmgr_deliver_abort, (void *) entry,
                        var_daemon_timeout);
}