#include <sys_defs.h>
#include <sys/stat.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <utime.h>
#include <errno.h>
#ifndef S_IRWXU
#define S_IRWXU 0700
#endif
#include <msg.h>
#include <events.h>
#include <mymalloc.h>
#include <vstream.h>
#include <mail_params.h>
#include <mail_open_ok.h>
#include <mail_queue.h>
#include <recipient_list.h>
#include <bounce.h>
#include <defer.h>
#include <trace.h>
#include <abounce.h>
#include <rec_type.h>
#include <qmgr_user.h>
#include "qmgr.h"
static void qmgr_active_done_2_bounce_flush(int, char *);
static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
static void qmgr_active_done_3_defer_flush(int, char *);
static void qmgr_active_done_3_defer_warn(int, char *);
static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
static void qmgr_active_corrupt(const char *queue_id)
{
const char *myname = "qmgr_active_corrupt";
if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
if (errno != ENOENT)
msg_fatal("%s: save corrupt file queue %s id %s: %m",
myname, MAIL_QUEUE_ACTIVE, queue_id);
msg_warn("%s: save corrupt file queue %s id %s: %m",
myname, MAIL_QUEUE_ACTIVE, queue_id);
} else {
msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
}
}
static void qmgr_active_defer(const char *queue_name, const char *queue_id,
const char *dest_queue, int delay)
{
const char *myname = "qmgr_active_defer";
const char *path;
struct utimbuf tbuf;
if (msg_verbose)
msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
tbuf.actime = tbuf.modtime = event_time() + delay;
path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
if (utime(path, &tbuf) < 0 && errno != ENOENT)
msg_fatal("%s: update %s time stamps: %m", myname, path);
if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
if (errno != ENOENT)
msg_fatal("%s: rename %s from %s to %s: %m", myname,
queue_id, queue_name, dest_queue);
msg_warn("%s: rename %s from %s to %s: %m", myname,
queue_id, queue_name, dest_queue);
} else if (msg_verbose) {
msg_info("%s: defer %s", myname, queue_id);
}
}
int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
{
const char *myname = "qmgr_active_feed";
QMGR_MESSAGE *message;
struct stat st;
const char *path;
if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
msg_panic("%s: bad queue %s", myname, scan_info->queue);
if (msg_verbose)
msg_info("%s: queue %s", myname, scan_info->queue);
if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
return (0);
if (msg_verbose)
msg_info("%s: %s", myname, path);
if ((scan_info->flags & QMGR_SCAN_ALL) == 0
&& st.st_mtime > time((time_t *) 0) + 1) {
if (msg_verbose)
msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
(long) (st.st_mtime - event_time()));
return (0);
}
if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
if (errno != ENOENT)
msg_fatal("%s: %s: rename from %s to %s: %m", myname,
queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
msg_warn("%s: %s: rename from %s to %s: %m", myname,
queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
return (0);
}
#define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
(st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
scan_info->flags | QMGR_FLUSH_AFTER :
scan_info->flags,
(st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE :
0)) == 0) {
qmgr_active_corrupt(queue_id);
return (0);
} else if (message == QMGR_MESSAGE_LOCKED) {
qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
return (0);
} else {
if (message->refcount == 0)
qmgr_active_done(message);
return (1);
}
}
void qmgr_active_done(QMGR_MESSAGE *message)
{
const char *myname = "qmgr_active_done";
struct stat st;
if (msg_verbose)
msg_info("%s: %s", myname, message->queue_id);
if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
if (st.st_size == 0) {
if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
msg_fatal("remove %s %s: %m",
MAIL_QUEUE_BOUNCE, message->queue_id);
} else {
if (msg_verbose)
msg_info("%s: bounce %s", myname, message->queue_id);
if (message->verp_delims == 0 || var_verp_bounce_off)
abounce_flush(BOUNCE_FLAG_KEEP,
message->queue_name,
message->queue_id,
message->encoding,
message->sender,
message->dsn_envid,
message->dsn_ret,
qmgr_active_done_2_bounce_flush,
(char *) message);
else
abounce_flush_verp(BOUNCE_FLAG_KEEP,
message->queue_name,
message->queue_id,
message->encoding,
message->sender,
message->dsn_envid,
message->dsn_ret,
message->verp_delims,
qmgr_active_done_2_bounce_flush,
(char *) message);
return;
}
}
qmgr_active_done_2_generic(message);
}
static void qmgr_active_done_2_bounce_flush(int status, char *context)
{
QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
message->flags |= status;
qmgr_active_done_2_generic(message);
}
static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
{
const char *myname = "qmgr_active_done_2_generic";
const char *path;
struct stat st;
int status;
if (message->flags
&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
qmgr_active_corrupt(message->queue_id);
qmgr_message_free(message);
return;
}
if (message->rcpt_offset > 0) {
if (qmgr_message_realloc(message) == 0) {
qmgr_active_corrupt(message->queue_id);
qmgr_message_free(message);
} else {
if (message->refcount == 0)
qmgr_active_done(message);
}
return;
}
if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD))
|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
status = trace_flush(message->tflags,
message->queue_name,
message->queue_id,
message->encoding,
message->sender,
message->dsn_envid,
message->dsn_ret);
if (status == 0 && message->tflags_offset)
qmgr_message_kill_record(message, message->tflags_offset);
message->flags |= status;
}
if (message->flags) {
if (event_time() >= message->create_time +
(*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
msg_info("%s: from=<%s>, status=expired, returned to sender",
message->queue_id, message->sender);
if (message->verp_delims == 0 || var_verp_bounce_off)
adefer_flush(BOUNCE_FLAG_KEEP,
message->queue_name,
message->queue_id,
message->encoding,
message->sender,
message->dsn_envid,
message->dsn_ret,
qmgr_active_done_3_defer_flush,
(char *) message);
else
adefer_flush_verp(BOUNCE_FLAG_KEEP,
message->queue_name,
message->queue_id,
message->encoding,
message->sender,
message->dsn_envid,
message->dsn_ret,
message->verp_delims,
qmgr_active_done_3_defer_flush,
(char *) message);
return;
} else if (message->warn_time > 0
&& event_time() >= message->warn_time - 1) {
if (msg_verbose)
msg_info("%s: sending defer warning for %s", myname, message->queue_id);
adefer_warn(BOUNCE_FLAG_KEEP,
message->queue_name,
message->queue_id,
message->encoding,
message->sender,
message->dsn_envid,
message->dsn_ret,
qmgr_active_done_3_defer_warn,
(char *) message);
return;
}
}
qmgr_active_done_3_generic(message);
}
static void qmgr_active_done_3_defer_warn(int status, char *context)
{
QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
if (status == 0)
qmgr_message_update_warn(message);
qmgr_active_done_3_generic(message);
}
static void qmgr_active_done_3_defer_flush(int status, char *context)
{
QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
message->flags = status;
qmgr_active_done_3_generic(message);
}
static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
{
const char *myname = "qmgr_active_done_3_generic";
int delay;
if (message->flags) {
if (message->create_time > 0) {
delay = event_time() - message->create_time;
if (delay > var_max_backoff_time)
delay = var_max_backoff_time;
if (delay < var_min_backoff_time)
delay = var_min_backoff_time;
} else {
delay = var_min_backoff_time;
}
qmgr_active_defer(message->queue_name, message->queue_id,
MAIL_QUEUE_DEFERRED, delay);
}
else {
if (mail_queue_remove(message->queue_name, message->queue_id)) {
if (errno != ENOENT)
msg_fatal("%s: remove %s from %s: %m", myname,
message->queue_id, message->queue_name);
msg_warn("%s: remove %s from %s: %m", myname,
message->queue_id, message->queue_name);
} else {
msg_info("%s: removed", message->queue_id);
}
}
qmgr_message_free(message);
}
void qmgr_active_drain(void)
{
QMGR_TRANSPORT *transport;
while ((transport = qmgr_transport_select()) != 0) {
if (msg_verbose)
msg_info("qmgr_active_drain: allocate %s", transport->name);
qmgr_transport_alloc(transport, qmgr_deliver);
}
}