#include <sys/time.h>
#include <time.h>
#include <vstream.h>
#include <scan_dir.h>
#include <recipient_list.h>
#include <dsn.h>
/*
 * Forward declarations. Every structure tag used in this header gets a
 * typedef of the same name up front, so the structure definitions further
 * down can point at one another regardless of definition order.
 */

/* Queue manager objects. */
typedef struct QMGR_MESSAGE QMGR_MESSAGE;
typedef struct QMGR_TRANSPORT QMGR_TRANSPORT;
typedef struct QMGR_QUEUE QMGR_QUEUE;
typedef struct QMGR_JOB QMGR_JOB;
typedef struct QMGR_PEER QMGR_PEER;
typedef struct QMGR_ENTRY QMGR_ENTRY;
typedef struct QMGR_SCAN QMGR_SCAN;

/* next/prev pointer pairs, one flavor per linkable object type. */
typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST;
typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST;
typedef struct QMGR_JOB_LIST QMGR_JOB_LIST;
typedef struct QMGR_PEER_LIST QMGR_PEER_LIST;
typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST;
/*
 * QMGR_LIST_ROTATE - rotate a doubly-linked list so that "object" becomes
 * the last element and its former successor becomes the first.
 *
 * head    list head: an lvalue with "next" (first) and "prev" (last) members
 * object  pointer to a member of the list
 * peers   name of the link member inside the object (has next/prev)
 *
 * How it works: the first two statements join the ends of the list into a
 * ring, then the ring is cut open right after "object". Relative order of
 * the elements is preserved.
 *
 * The list must not be empty: head.next and head.prev are dereferenced
 * unconditionally. "head" and "object" are evaluated more than once, so do
 * not pass expressions with side effects. The expansion is a brace-enclosed
 * block, not a do/while(0) statement; take care when using it as the
 * unbraced body of an "if" that has an "else".
 *
 * The head and object arguments are parenthesized so that expressions such
 * as "*headp" or "array + i" expand with the intended precedence.
 */
#define QMGR_LIST_ROTATE(head, object, peers) { \
    (head).next->peers.prev = (head).prev; \
    (head).prev->peers.next = (head).next; \
    (head).next = (object)->peers.next; \
    (head).next->peers.prev = 0; \
    (head).prev = (object); \
    (object)->peers.next = 0; \
}
/*
 * QMGR_LIST_UNLINK - remove "object" from a doubly-linked list.
 *
 * head    list head: an lvalue with "next" (first) and "prev" (last) members
 * type    pointer type of the list elements, used to declare temporaries
 * object  pointer to the element to remove
 * peers   name of the link member inside the object (has next/prev)
 *
 * The neighbors (or the list head, when the object is first and/or last)
 * are re-pointed around the object, and the object's own links are reset
 * to null afterwards.
 *
 * The temporaries are named _next and _prev; the "object" expression must
 * not refer to variables with those names. "head" and "object" are
 * evaluated more than once. The expansion is a brace-enclosed block, not a
 * do/while(0) statement.
 *
 * The head and object arguments are parenthesized so that expressions such
 * as "*headp" or "array + i" expand with the intended precedence.
 */
#define QMGR_LIST_UNLINK(head, type, object, peers) { \
    type _next = (object)->peers.next; \
    type _prev = (object)->peers.prev; \
    if (_prev) { \
        _prev->peers.next = _next; \
    } else { \
        (head).next = _next; \
    } \
    if (_next) { \
        _next->peers.prev = _prev; \
    } else { \
        (head).prev = _prev; \
    } \
    (object)->peers.next = (object)->peers.prev = 0; \
}
/*
 * QMGR_LIST_LINK - insert "object" between "pred" and "succ".
 *
 * head    list head: an lvalue with "next" (first) and "prev" (last) members
 * pred    pointer to the element that will precede the object, or null
 *         to make the object the first element
 * object  pointer to the element to insert
 * succ    pointer to the element that will follow the object, or null to
 *         make the object the last element
 * peers   name of the link member inside the objects (has next/prev)
 *
 * The macro does not check that pred and succ are adjacent; the caller is
 * responsible for passing a consistent pair. pred and succ must be pointer
 * expressions (they are dereferenced in the expansion). All arguments
 * except "peers" are evaluated more than once. The expansion is a
 * brace-enclosed block, not a do/while(0) statement.
 *
 * The head, pred, object and succ arguments are parenthesized so that
 * expressions such as "*headp" or "array + i" expand with the intended
 * precedence.
 */
#define QMGR_LIST_LINK(head, pred, object, succ, peers) { \
    (object)->peers.prev = (pred); \
    (object)->peers.next = (succ); \
    if (pred) { \
        (pred)->peers.next = (object); \
    } else { \
        (head).next = (object); \
    } \
    if (succ) { \
        (succ)->peers.prev = (object); \
    } else { \
        (head).prev = (object); \
    } \
}
/*
 * QMGR_LIST_PREPEND - insert "object" at the front of a doubly-linked list.
 *
 * head    list head: an lvalue with "next" (first) and "prev" (last) members
 * object  pointer to the element to insert
 * peers   name of the link member inside the object (has next/prev)
 *
 * When the list is empty (head.next is null) the object also becomes the
 * last element. "head" and "object" are evaluated more than once. The
 * expansion is a brace-enclosed block, not a do/while(0) statement.
 *
 * The head and object arguments are parenthesized so that expressions such
 * as "*headp" or "array + i" expand with the intended precedence.
 */
#define QMGR_LIST_PREPEND(head, object, peers) { \
    (object)->peers.next = (head).next; \
    (object)->peers.prev = 0; \
    if ((head).next) { \
        (head).next->peers.prev = (object); \
    } else { \
        (head).prev = (object); \
    } \
    (head).next = (object); \
}
/*
 * QMGR_LIST_APPEND - insert "object" at the end of a doubly-linked list.
 *
 * head    list head: an lvalue with "next" (first) and "prev" (last) members
 * object  pointer to the element to insert
 * peers   name of the link member inside the object (has next/prev)
 *
 * When the list is empty (head.prev is null) the object also becomes the
 * first element. "head" and "object" are evaluated more than once. The
 * expansion is a brace-enclosed block, not a do/while(0) statement.
 *
 * The head and object arguments are parenthesized so that expressions such
 * as "*headp" or "array + i" expand with the intended precedence.
 */
#define QMGR_LIST_APPEND(head, object, peers) { \
    (object)->peers.prev = (head).prev; \
    (object)->peers.next = 0; \
    if ((head).prev) { \
        (head).prev->peers.next = (object); \
    } else { \
        (head).next = (object); \
    } \
    (head).prev = (object); \
}
/*
 * QMGR_LIST_INIT - make a list head (or an object's link member) empty by
 * setting both its "prev" and "next" pointers to null.
 *
 * head    an lvalue with "next" and "prev" members; evaluated twice
 *
 * The expansion is a brace-enclosed block, not a do/while(0) statement.
 * The argument is parenthesized so that expressions such as "*headp"
 * expand with the intended precedence.
 */
#define QMGR_LIST_INIT(head) { \
    (head).prev = 0; \
    (head).next = 0; \
}
/*
 * next/prev pointer pair for doubly-linked lists of QMGR_TRANSPORT objects.
 * The same structure serves as a list head (see qmgr_transport_list) and as
 * the per-object link member (QMGR_TRANSPORT.peers); the member names are
 * the ones the QMGR_LIST_* macros operate on.
 */
struct QMGR_TRANSPORT_LIST {
/* In a list head: first element. In an object link: successor. */
QMGR_TRANSPORT *next;
/* In a list head: last element. In an object link: predecessor. */
QMGR_TRANSPORT *prev;
};
/*
 * Global transport bookkeeping, defined elsewhere.
 */
/* Hash table handle; judging by the name, transports keyed by name - confirm. */
extern struct HTABLE *qmgr_transport_byname;
/* List head for QMGR_TRANSPORT objects. */
extern QMGR_TRANSPORT_LIST qmgr_transport_list;
/*
 * next/prev pointer pair for doubly-linked lists of QMGR_QUEUE objects.
 * Used both as a list head (QMGR_TRANSPORT.queue_list) and as the
 * per-object link member (QMGR_QUEUE.peers).
 */
struct QMGR_QUEUE_LIST {
/* In a list head: first element. In an object link: successor. */
QMGR_QUEUE *next;
/* In a list head: last element. In an object link: predecessor. */
QMGR_QUEUE *prev;
};
/*
 * next/prev pointer pair for doubly-linked lists of QMGR_JOB objects.
 * Used both as a list head (e.g. QMGR_TRANSPORT.job_list) and as a
 * per-object link member (e.g. QMGR_JOB.transport_peers).
 */
struct QMGR_JOB_LIST {
/* In a list head: first element. In an object link: successor. */
QMGR_JOB *next;
/* In a list head: last element. In an object link: predecessor. */
QMGR_JOB *prev;
};
/*
 * Per-transport state: limits and tuning parameters, the queues and jobs
 * associated with the transport (each reachable through a hash table handle
 * and through linked lists), and the linkage into a transport list.
 *
 * NOTE(review): unless stated otherwise, the field descriptions below are
 * inferred from field names and types only; confirm against the code that
 * maintains them.
 */
struct QMGR_TRANSPORT {
/* Status bits; QMGR_TRANSPORT_THROTTLED() tests QMGR_TRANSPORT_STAT_DEAD here. */
int flags;
/* Presumably a count of requests that are still outstanding - confirm. */
int pending;
/* Transport name string. */
char *name;
/* Presumably per-destination concurrency limit and its initial value. */
int dest_concurrency_limit;
int init_dest_concurrency;
/* Presumably the recipient limit per delivery request. */
int recipient_limit;
/* Recipient slot accounting; exact meaning not visible here - confirm. */
int rcpt_per_stack;
int rcpt_unused;
/* Presumably batch size and delay for re-reading recipients of a message. */
int refill_limit;
int refill_delay;
/* Slot-based scheduling parameters; semantics live in the job scheduler. */
int slot_cost;
int slot_loan;
int slot_loan_factor;
int min_slots;
/* Queues of this transport: hash table handle plus linked list head. */
struct HTABLE *queue_byname;
QMGR_QUEUE_LIST queue_list;
/* Jobs of this transport: hash table handle plus two linked list heads. */
struct HTABLE *job_byname;
QMGR_JOB_LIST job_list;
/* By name, the same jobs in time order - confirm. */
QMGR_JOB_LIST job_bytime;
/* Scheduler cursors into the job lists. */
QMGR_JOB *job_current;
QMGR_JOB *job_next_unread;
/* Cached scheduler candidate, the job it relates to, and a timestamp. */
QMGR_JOB *candidate_cache;
QMGR_JOB *candidate_cache_current;
time_t candidate_cache_time;
/* Tag value; QMGR_QUEUE and QMGR_JOB carry a field of the same name. */
int blocker_tag;
/* Link member for lists of transports (pass "peers" to QMGR_LIST_*). */
QMGR_TRANSPORT_LIST peers;
/* Delivery status information; presumably the reason when throttled. */
DSN *dsn;
};
/* Status bit for QMGR_TRANSPORT.flags; tested by QMGR_TRANSPORT_THROTTLED(). */
#define QMGR_TRANSPORT_STAT_DEAD (1<<1)
/* Callback type that receives a transport and a stream; see qmgr_transport_alloc(). */
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
/*
 * Transport operations. Only the prototypes are visible in this header;
 * see the implementing source file for their contracts.
 */
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
/* Non-zero when the QMGR_TRANSPORT_STAT_DEAD bit is set in (t)->flags. */
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
/*
 * next/prev pointer pair for doubly-linked lists of QMGR_ENTRY objects.
 * Used both as a list head (e.g. QMGR_QUEUE.todo) and as a per-object link
 * member (e.g. QMGR_ENTRY.queue_peers).
 */
struct QMGR_ENTRY_LIST {
/* In a list head: first element. In an object link: successor. */
QMGR_ENTRY *next;
/* In a list head: last element. In an object link: predecessor. */
QMGR_ENTRY *prev;
};
/*
 * Per-queue state. A queue belongs to one transport, keeps two lists of
 * entries (todo and busy) with a counter for each, and is linked into a
 * list of queues via "peers".
 *
 * NOTE(review): unless stated otherwise, the field descriptions below are
 * inferred from field names and types only; confirm against the code that
 * maintains them.
 */
struct QMGR_QUEUE {
/* Flag bits; the bit definitions are not visible in this header. */
int dflags;
/* Presumably the time of the most recent completed delivery. */
time_t last_done;
/* Queue name and next-hop strings. */
char *name;
char *nexthop;
/* Presumably the number of entries on the todo and busy lists. */
int todo_refcount;
int busy_refcount;
/* QMGR_QUEUE_THROTTLED() reports the queue as throttled when this is <= 0. */
int window;
/* Transport that this queue is associated with. */
QMGR_TRANSPORT *transport;
/* List heads for entries; see QMGR_QUEUE_TODO and QMGR_QUEUE_BUSY. */
QMGR_ENTRY_LIST todo;
QMGR_ENTRY_LIST busy;
/* Link member for lists of queues (pass "peers" to QMGR_LIST_*). */
QMGR_QUEUE_LIST peers;
/* Delivery status information; presumably the reason when throttled. */
DSN *dsn;
/* Presumably the earliest time for the next congestion warning. */
time_t clog_time_to_warn;
/* Tag value; QMGR_TRANSPORT and QMGR_JOB carry a field of the same name. */
int blocker_tag;
};
/*
 * Selectors matching the two entry lists of a queue (QMGR_QUEUE.todo and
 * QMGR_QUEUE.busy). NOTE(review): presumably passed to functions that must
 * know which list an entry is on - confirm against the callers.
 */
#define QMGR_QUEUE_TODO 1
#define QMGR_QUEUE_BUSY 2
/* Global counter, defined elsewhere; by name, the number of queues. */
extern int qmgr_queue_count;
/*
 * Queue operations. Only the prototypes are visible in this header; see
 * the implementing source file for their contracts.
 */
extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *);
extern void qmgr_queue_done(QMGR_QUEUE *);
extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *);
extern void qmgr_queue_unthrottle(QMGR_QUEUE *);
extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *);
/* True when the queue's window is zero or negative. */
#define QMGR_QUEUE_THROTTLED(q) ((q)->window <= 0)
/*
 * One queue entry: a set of recipients of one message, tied to one queue
 * and one peer. The entry can be linked into two independent lists at the
 * same time, using a separate link member for each.
 *
 * NOTE(review): field descriptions are inferred from names and types;
 * confirm against the code that maintains them.
 */
struct QMGR_ENTRY {
/* Stream handle; presumably the delivery agent connection while busy. */
VSTREAM *stream;
/* Message that this entry belongs to. */
QMGR_MESSAGE *message;
/* Recipients covered by this entry. */
RECIPIENT_LIST rcpt_list;
/* Queue that this entry is associated with. */
QMGR_QUEUE *queue;
/* Peer that this entry is associated with. */
QMGR_PEER *peer;
/* Link member for a queue's entry lists (QMGR_QUEUE.todo / busy). */
QMGR_ENTRY_LIST queue_peers;
/* Link member for a peer's entry list (QMGR_PEER.entry_list). */
QMGR_ENTRY_LIST peer_peers;
};
/*
 * Queue entry operations. Only the prototypes are visible in this header;
 * see the implementing source file for their contracts.
 * NOTE(review): the int argument of qmgr_entry_done() is presumably one of
 * QMGR_QUEUE_TODO / QMGR_QUEUE_BUSY - confirm.
 */
extern QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *);
extern void qmgr_entry_unselect(QMGR_ENTRY *);
extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *);
extern void qmgr_entry_done(QMGR_ENTRY *, int);
extern QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *, QMGR_MESSAGE *);
/*
 * In-memory state of one message: flags, the open queue file, time stamps,
 * file offsets, envelope attributes, the recipients that are currently
 * loaded, and the list of jobs for this message.
 *
 * NOTE(review): unless stated otherwise, the field descriptions below are
 * inferred from field names and types only; confirm against the code that
 * reads the queue file.
 */
struct QMGR_MESSAGE {
/* Several independent flag words; bit definitions are mostly elsewhere. */
int flags;
int qflags;
int tflags;
/* Presumably the queue file offset that corresponds to tflags. */
long tflags_offset;
/* Read-state flags; QMGR_READ_FLAG_SEEN_ALL_NON_RCPT looks like one of them. */
int rflags;
/* Stream handle; presumably the open queue file, if any. */
VSTREAM *fp;
/* Reference count; what holds the references is not visible here. */
int refcount;
/* Presumably non-zero to deliver one recipient at a time. */
int single_rcpt;
/* Time stamps, some with sub-second resolution (struct timeval). */
struct timeval arrival_time;
time_t create_time;
struct timeval active_time;
time_t queued_time;
time_t refill_time;
/* Presumably queue file offset and due time for a delay warning. */
long warn_offset;
time_t warn_time;
/* Presumably the queue file offset where message content starts. */
long data_offset;
/* Queue file identification. */
char *queue_name;
char *queue_id;
/* Envelope and content attributes, as strings. */
char *encoding;
char *sender;
/* DSN-related envelope attributes. */
char *dsn_envid;
int dsn_ret;
char *verp_delims;
/* Presumably transport overrides and a redirect address. */
char *filter_xport;
char *inspect_xport;
char *redirect_addr;
/* Sizes and offsets within the queue file. */
long data_size;
long cont_length;
long rcpt_offset;
/* Attributes of the client that submitted the message. */
char *client_name;
char *client_addr;
char *client_proto;
char *client_helo;
/* SASL attributes of the submission. */
char *sasl_method;
char *sasl_username;
char *sasl_sender;
/* Address rewriting context name. */
char *rewrite_context;
/* Recipients currently held in memory for this message. */
RECIPIENT_LIST rcpt_list;
/* Recipient accounting: presumably in-memory count, limit, and not yet read. */
int rcpt_count;
int rcpt_limit;
int rcpt_unread;
/* List head for jobs; QMGR_JOB.message_peers looks like the matching link. */
QMGR_JOB_LIST job_list;
};
/* Flag bit; by name, a read-state flag (see QMGR_MESSAGE.rflags) - confirm. */
#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT (1<<16)
/*
 * Sentinel pointer value (address 1) that can be told apart from both a
 * null pointer and a real QMGR_MESSAGE. It must never be dereferenced.
 * NOTE(review): presumably returned by qmgr_message_alloc() when the queue
 * file is locked - confirm.
 */
#define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1)
/* Global counters, defined elsewhere. */
extern int qmgr_message_count;
extern int qmgr_recipient_count;
/*
 * Message operations. Only the prototypes are visible in this header; see
 * the implementing source file for their contracts.
 */
extern void qmgr_message_free(QMGR_MESSAGE *);
extern void qmgr_message_update_warn(QMGR_MESSAGE *);
extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
/*
 * QMGR_MSG_STATS - invoke MSG_STATS_INIT2() on "stats", pairing the
 * incoming_arrival member with the message's arrival_time and the
 * active_arrival member with the message's active_time.
 *
 * stats    passed through unchanged as the first MSG_STATS_INIT2() argument
 * message  pointer to a QMGR_MESSAGE; evaluated twice
 *
 * MSG_STATS_INIT2() is defined elsewhere; what it does with the four
 * member/value arguments is not visible here.
 *
 * The message argument is parenthesized so that pointer expressions such
 * as "msgs + i" expand with the intended precedence.
 */
#define QMGR_MSG_STATS(stats, message) \
    MSG_STATS_INIT2(stats, \
                    incoming_arrival, (message)->arrival_time, \
                    active_arrival, (message)->active_time)
/*
 * next/prev pointer pair for doubly-linked lists of QMGR_PEER objects.
 * Used both as a list head (QMGR_JOB.peer_list) and as the per-object link
 * member (QMGR_PEER.peers).
 */
struct QMGR_PEER_LIST {
/* In a list head: first element. In an object link: successor. */
QMGR_PEER *next;
/* In a list head: last element. In an object link: predecessor. */
QMGR_PEER *prev;
};
/*
 * A job ties one message to one transport. It can be linked into several
 * job lists at the same time (one link member per list), takes part in a
 * parent/children "stack" hierarchy, and keeps its own set of peers.
 *
 * NOTE(review): unless stated otherwise, the field descriptions below are
 * inferred from field names and types only; confirm against the job
 * scheduler code.
 */
struct QMGR_JOB {
/* The message and the transport that this job connects. */
QMGR_MESSAGE *message;
QMGR_TRANSPORT *transport;
/* Link member; presumably for QMGR_MESSAGE.job_list. */
QMGR_JOB_LIST message_peers;
/* Link member; presumably for QMGR_TRANSPORT.job_list. */
QMGR_JOB_LIST transport_peers;
/* Link member; presumably for QMGR_TRANSPORT.job_bytime. */
QMGR_JOB_LIST time_peers;
/* Job stack hierarchy: parent pointer, children list head, sibling link. */
QMGR_JOB *stack_parent;
QMGR_JOB_LIST stack_children;
QMGR_JOB_LIST stack_siblings;
/* Presumably this job's depth in the stack hierarchy. */
int stack_level;
/* Tag value; QMGR_TRANSPORT and QMGR_QUEUE carry a field of the same name. */
int blocker_tag;
/* Peers of this job: hash table handle plus linked list head. */
struct HTABLE *peer_byname;
QMGR_PEER_LIST peer_list;
/* Slot accounting used by the scheduler. */
int slots_used;
int slots_available;
/* Entry counters; exact meaning not visible here. */
int selected_entries;
int read_entries;
/* Recipient accounting for this job. */
int rcpt_count;
int rcpt_limit;
};
/*
 * A peer pairs one job with one queue and heads the list of entries that
 * belong to that pair.
 *
 * NOTE(review): field descriptions are inferred from names and types;
 * confirm against the code that maintains them.
 */
struct QMGR_PEER {
/* The job and the queue that this peer connects. */
QMGR_JOB *job;
QMGR_QUEUE *queue;
/* Reference count; presumably the number of entries on entry_list. */
int refcount;
/* List head for entries; QMGR_ENTRY.peer_peers looks like the matching link. */
QMGR_ENTRY_LIST entry_list;
/* Link member for a job's peer list (QMGR_JOB.peer_list). */
QMGR_PEER_LIST peers;
};
/*
 * Prototypes for the remaining queue manager modules. Only the signatures
 * are visible in this header; see the implementing source files for the
 * contracts. The groupings below follow the function name prefixes.
 */
/* Job and peer operations. */
extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *);
extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *);
extern QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *, QMGR_TRANSPORT *);
extern void qmgr_job_free(QMGR_JOB *);
extern void qmgr_job_move_limits(QMGR_JOB *);
extern QMGR_PEER *qmgr_peer_create(QMGR_JOB *, QMGR_QUEUE *);
extern QMGR_PEER *qmgr_peer_find(QMGR_JOB *, QMGR_QUEUE *);
extern QMGR_PEER *qmgr_peer_obtain(QMGR_JOB *, QMGR_QUEUE *);
extern void qmgr_peer_free(QMGR_PEER *);
/* Deferring at transport, queue, or recipient granularity. */
extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *);
extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *);
extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);
/* Bouncing a single recipient. */
extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);
/* Delivery; the counter is defined elsewhere. */
extern int qmgr_deliver_concurrency;
extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *);
/* Active queue management. */
extern int qmgr_active_feed(QMGR_SCAN *, const char *);
extern void qmgr_active_drain(void);
extern void qmgr_active_done(QMGR_MESSAGE *);
/* Takes two strings and a time stamp; by name, moves a queue file - confirm. */
extern void qmgr_move(const char *, const char *, time_t);
/* Enabling everything, one transport, or one queue. */
extern void qmgr_enable_all(void);
extern void qmgr_enable_transport(QMGR_TRANSPORT *);
extern void qmgr_enable_queue(QMGR_QUEUE *);
/*
 * State of a queue directory scan, as used by qmgr_scan_create(),
 * qmgr_scan_request() and qmgr_scan_next().
 *
 * NOTE(review): field descriptions are inferred from names and types;
 * confirm against the scan implementation.
 */
struct QMGR_SCAN {
/* Queue name string. */
char *queue;
/* Presumably QMGR_SCAN_* / QMGR_FLUSH_* bits for the current scan. */
int flags;
/* Presumably the same kind of bits, held for the next scan. */
int nflags;
/* Directory scanner handle (struct SCAN_DIR is declared elsewhere). */
struct SCAN_DIR *handle;
};
/*
 * Scan request bits: five distinct single-bit values that can be OR-ed
 * together. NOTE(review): presumably the int argument of
 * qmgr_scan_request() and the contents of QMGR_SCAN.flags/nflags; the
 * meaning of each bit is not visible here - confirm.
 */
#define QMGR_SCAN_START (1<<0)
#define QMGR_SCAN_ALL (1<<1)
#define QMGR_FLUSH_ONCE (1<<2)
#define QMGR_FLUSH_DFXP (1<<3)
#define QMGR_FLUSH_EACH (1<<4)
/* Queue scan operations; only the prototypes are visible here. */
extern QMGR_SCAN *qmgr_scan_create(const char *);
extern void qmgr_scan_request(QMGR_SCAN *, int);
extern char *qmgr_scan_next(QMGR_SCAN *);
/* Error transport/queue/nexthop helpers; only the prototypes are visible here. */
extern QMGR_TRANSPORT *qmgr_error_transport(const char *);
extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *);
extern char *qmgr_error_nexthop(DSN *);