#include <sys_defs.h>
#include <time.h>
#include <string.h>
#include <msg.h>
#include <vstring.h>
#include <vstream.h>
#include <vstring_vstream.h>
#include <events.h>
#include <iostuff.h>
#include <mail_queue.h>
#include <mail_proto.h>
#include <recipient_list.h>
#include <mail_params.h>
#include <deliver_request.h>
#include <verp_sender.h>
#include "qmgr.h"
/*
 * Count of delivery requests that have been sent and whose final reply has
 * not been processed yet: qmgr_deliver() increments it after a request was
 * sent successfully, qmgr_deliver_update() decrements it once the reply
 * has been handled.
 */
int qmgr_deliver_concurrency;
/*
 * Outcome codes produced by the reply-parsing helpers in this file.
 * DELIVER_STAT_DEFER is reported when the peer sent a non-zero status;
 * DELIVER_STAT_CRASH when the peer disconnected early or sent a malformed
 * reply. The helpers return a literal 0 (same value as DELIVER_STAT_OK)
 * when the peer sent a zero status.
 */
#define DELIVER_STAT_OK 0
#define DELIVER_STAT_DEFER 1
#define DELIVER_STAT_CRASH 2
/* qmgr_deliver_initial_reply - read the peer's opening status report */

static int qmgr_deliver_initial_reply(VSTREAM *stream)
{
    int     peer_status;

    /*
     * A negative peekfd() result is treated as the peer having gone away
     * before it said anything.
     */
    if (peekfd(vstream_fileno(stream)) < 0) {
        msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /*
     * Exactly one attribute (the numeric status) must be received.
     */
    if (attr_scan(stream, ATTR_FLAG_STRICT,
                  ATTR_TYPE_NUM, MAIL_ATTR_STATUS, &peer_status,
                  ATTR_TYPE_END) != 1) {
        msg_warn("%s: malformed response", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /*
     * Any non-zero status from the peer is folded into "defer".
     */
    if (peer_status != 0)
        return (DELIVER_STAT_DEFER);
    return (0);
}
/* qmgr_deliver_final_reply - read the peer's closing status report */

static int qmgr_deliver_final_reply(VSTREAM *stream, VSTRING *reason)
{
    int     peer_status;

    /*
     * A negative peekfd() result is treated as the peer having gone away
     * without reporting; "reason" is left untouched in that case.
     */
    if (peekfd(vstream_fileno(stream)) < 0) {
        msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /*
     * Both attributes (the reason text, stored into "reason", and the
     * numeric status) must be received.
     */
    if (attr_scan(stream, ATTR_FLAG_STRICT,
                  ATTR_TYPE_STR, MAIL_ATTR_WHY, reason,
                  ATTR_TYPE_NUM, MAIL_ATTR_STATUS, &peer_status,
                  ATTR_TYPE_END) != 2) {
        msg_warn("%s: malformed response", VSTREAM_PATH(stream));
        return (DELIVER_STAT_CRASH);
    }

    /*
     * Any non-zero status from the peer is folded into "defer".
     */
    if (peer_status != 0)
        return (DELIVER_STAT_DEFER);
    return (0);
}
/* qmgr_deliver_send_request - write a delivery request to the peer */

static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
{
    QMGR_MESSAGE *message = entry->message;
    QMGR_RCPT_LIST rcpts = entry->rcpt_list;
    QMGR_RCPT *rcpt_end = rcpts.info + rcpts.len;
    QMGR_RCPT *rcpt;
    VSTRING *verp_buf = 0;
    char   *envelope_sender;
    int     request_flags;

    /*
     * With VERP delimiters set, the envelope sender is derived from the
     * message sender and the address of the first recipient in the list;
     * otherwise the message sender is used unchanged.
     */
    if (message->verp_delims != 0) {
        verp_buf = vstring_alloc(100);
        verp_sender(verp_buf, message->verp_delims,
                    message->sender, rcpts.info->address);
        envelope_sender = vstring_str(verp_buf);
    } else {
        envelope_sender = message->sender;
    }

    /*
     * Combine the per-message flags with either the "bounce" or the
     * default request flags, depending on inspect_xport.
     */
    request_flags = message->tflags;
    if (message->inspect_xport)
        request_flags |= DEL_REQ_FLAG_BOUNCE;
    else
        request_flags |= DEL_REQ_FLAG_DEFLT;

    /*
     * Per-message attributes.
     */
    attr_print(stream, ATTR_FLAG_MORE,
               ATTR_TYPE_NUM, MAIL_ATTR_FLAGS, request_flags,
               ATTR_TYPE_STR, MAIL_ATTR_QUEUE, message->queue_name,
               ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, message->queue_id,
               ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, message->data_offset,
               ATTR_TYPE_LONG, MAIL_ATTR_SIZE, message->data_size,
               ATTR_TYPE_STR, MAIL_ATTR_NEXTHOP, entry->queue->nexthop,
               ATTR_TYPE_STR, MAIL_ATTR_ENCODING, message->encoding,
               ATTR_TYPE_STR, MAIL_ATTR_SENDER, envelope_sender,
               ATTR_TYPE_STR, MAIL_ATTR_ERRTO, message->errors_to,
               ATTR_TYPE_STR, MAIL_ATTR_RRCPT, message->return_receipt,
               ATTR_TYPE_LONG, MAIL_ATTR_TIME, message->arrival_time,
               ATTR_TYPE_STR, MAIL_ATTR_CLIENT_NAME, message->client_name,
               ATTR_TYPE_STR, MAIL_ATTR_CLIENT_ADDR, message->client_addr,
               ATTR_TYPE_STR, MAIL_ATTR_PROTO_NAME, message->client_proto,
               ATTR_TYPE_STR, MAIL_ATTR_HELO_NAME, message->client_helo,
               ATTR_TYPE_END);

    /*
     * The VERP buffer backs envelope_sender, so it may only be released
     * after the attributes above have been written.
     */
    if (verp_buf != 0)
        vstring_free(verp_buf);

    /*
     * One record per recipient, followed by a record with a zero offset
     * that closes the request.
     */
    for (rcpt = rcpts.info; rcpt < rcpt_end; rcpt++) {
        attr_print(stream, ATTR_FLAG_MORE,
                   ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, rcpt->offset,
                   ATTR_TYPE_STR, MAIL_ATTR_ORCPT, rcpt->orig_rcpt,
                   ATTR_TYPE_STR, MAIL_ATTR_RECIP, rcpt->address,
                   ATTR_TYPE_END);
    }
    attr_print(stream, ATTR_FLAG_NONE,
               ATTR_TYPE_NUM, MAIL_ATTR_OFFSET, 0,
               ATTR_TYPE_END);

    /*
     * Write errors are detected here, when the stream is flushed.
     */
    if (vstream_fflush(stream) != 0) {
        msg_warn("write to process (%s): %m", entry->queue->transport->name);
        return (-1);
    }
    if (msg_verbose)
        msg_info("qmgr_deliver: site `%s'", entry->queue->name);
    return (0);
}
/* qmgr_deliver_abort - timer callback: no delivery status arrived in time */

static void qmgr_deliver_abort(int unused_event, char *context)
{
    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;

    /*
     * Reported through msg_fatal(), naming the queue file and the
     * transport that the entry belongs to.
     */
    msg_fatal("%s: timeout receiving delivery status from transport: %s",
              entry->message->queue_id, entry->queue->transport->name);
}
/* qmgr_deliver_update - read event callback: process final delivery status */

static void qmgr_deliver_update(int unused_event, char *context)
{
    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
    QMGR_QUEUE *queue = entry->queue;
    QMGR_TRANSPORT *transport = queue->transport;
    QMGR_MESSAGE *message = entry->message;
    VSTRING *reason = vstring_alloc(1);
    int     status;

    /*
     * The peer became readable, so the timeout requested by qmgr_deliver()
     * for this entry is no longer needed.
     */
    event_cancel_timer(qmgr_deliver_abort, context);

    /*
     * Retrieve the delivery status. "reason" receives the peer's
     * explanation text, if the reply gets that far.
     */
    status = qmgr_deliver_final_reply(entry->stream, reason);

    /*
     * The peer disconnected early or sent a malformed reply. Flag the
     * message as deferred, throttle the whole transport, and defer the
     * transport's work using the reason recorded by the throttle call.
     */
    if (status == DELIVER_STAT_CRASH) {
        message->flags |= DELIVER_STAT_DEFER;
        qmgr_transport_throttle(transport, "unknown mail transport error");
        msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
                 transport->name);
        qmgr_defer_transport(transport, transport->reason);
    }

    /*
     * The peer asked for another attempt. When it also supplied a reason,
     * throttle this queue, and defer its pending entries once the queue's
     * window has dropped to zero.
     */
    if (status == DELIVER_STAT_DEFER) {
        message->flags |= DELIVER_STAT_DEFER;
        if (VSTRING_LEN(reason)) {
            qmgr_queue_throttle(queue, vstring_str(reason));
            if (queue->window == 0)
                qmgr_defer_todo(queue, queue->reason);
        }
    }

    /*
     * No problem reported: mark transport and queue as usable again.
     *
     * This must not happen after a crash. A premature disconnect never
     * fills in "reason", so testing only for an empty reason would undo
     * the transport throttle that was applied above.
     */
    if (status != DELIVER_STAT_CRASH && VSTRING_LEN(reason) == 0) {
        qmgr_transport_unthrottle(transport);
        qmgr_queue_unthrottle(queue);
    }

    /*
     * Stop watching the descriptor before closing the stream.
     */
    event_disable_readwrite(vstream_fileno(entry->stream));
    if (vstream_fclose(entry->stream) != 0)
        msg_warn("qmgr_deliver_update: close delivery stream: %m");
    entry->stream = 0;
    qmgr_deliver_concurrency--;

    /*
     * Hand the finished entry back. NOTE(review): presumably the entry
     * must not be touched after this call -- confirm in qmgr_entry_done().
     */
    qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
    vstring_free(reason);
}
/* qmgr_deliver - hand one queue entry to a connected delivery process */

void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
{
    QMGR_QUEUE *queue;
    QMGR_ENTRY *entry;

    /*
     * The peer must first report that it is ready.
     */
    if (qmgr_deliver_initial_reply(stream) != 0)
        goto transport_unavailable;

    /*
     * Pick a queue for this transport and then an entry from that queue.
     * With nothing selected, just drop the connection.
     */
    queue = qmgr_queue_select(transport);
    if (queue == 0) {
        (void) vstream_fclose(stream);
        return;
    }
    entry = qmgr_entry_select(queue);
    if (entry == 0) {
        (void) vstream_fclose(stream);
        return;
    }

    /*
     * Send the request. On failure, undo the entry selection before
     * treating the transport as unavailable.
     */
    if (qmgr_deliver_send_request(entry, stream) < 0) {
        qmgr_entry_unselect(queue, entry);
        goto transport_unavailable;
    }

    /*
     * The request is on its way. Remember the stream with the entry,
     * have qmgr_deliver_update() called when the peer becomes readable,
     * and have qmgr_deliver_abort() called after var_daemon_timeout.
     */
    entry->stream = stream;
    qmgr_deliver_concurrency++;
    event_enable_read(vstream_fileno(stream),
                      qmgr_deliver_update, (char *) entry);
    event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout);
    return;

    /*
     * Shared failure path: throttle the transport, defer its work with
     * the reason recorded by the throttle call, and close the connection.
     */
transport_unavailable:
    qmgr_transport_throttle(transport, "mail transport unavailable");
    qmgr_defer_transport(transport, transport->reason);
    (void) vstream_fclose(stream);
}