director-connection.c [plain text]
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "network.h"
#include "istream.h"
#include "ostream.h"
#include "str.h"
#include "llist.h"
#include "master-service.h"
#include "mail-host.h"
#include "director.h"
#include "director-host.h"
#include "director-request.h"
#include "user-directory.h"
#include "director-connection.h"
#include <stdlib.h>
#include <unistd.h>
#define DIRECTOR_VERSION_NAME "director"
#define DIRECTOR_VERSION_MAJOR 1
#define DIRECTOR_VERSION_MINOR 0
#define MAX_INBUF_SIZE 1024
#define MAX_OUTBUF_SIZE (1024*1024*10)
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
#define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 10
struct director_connection {
struct director_connection *prev, *next;
struct director *dir;
char *name;
time_t created;
struct director_host *host;
int fd;
struct io *io;
struct istream *input;
struct ostream *output;
struct timeout *to, *to_ping;
struct user_directory_iter *user_iter;
unsigned int in:1;
unsigned int connected:1;
unsigned int version_received:1;
unsigned int me_received:1;
unsigned int handshake_received:1;
unsigned int ignore_host_events:1;
unsigned int handshake_sending_hosts:1;
unsigned int ping_waiting:1;
unsigned int sync_ping:1;
};
static void director_connection_ping(struct director_connection *conn);
static void director_connection_disconnected(struct director_connection **conn);
static bool
director_args_parse_ip_port(struct director_connection *conn,
const char *const *args,
struct ip_addr *ip_r, unsigned int *port_r)
{
if (net_addr2ip(args[0], ip_r) < 0) {
i_error("director(%s): Command has invalid IP address: %s",
conn->name, args[0]);
return FALSE;
}
if (str_to_uint(args[1], port_r) < 0) {
i_error("director(%s): Command has invalid port: %s",
conn->name, args[1]);
return FALSE;
}
return TRUE;
}
static bool director_cmd_me(struct director_connection *conn,
const char *const *args)
{
struct director *dir = conn->dir;
struct director_host *host;
const char *connect_str;
struct ip_addr ip;
unsigned int port;
if (!director_args_parse_ip_port(conn, args, &ip, &port))
return FALSE;
if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) ||
conn->host->port != port)) {
i_error("Remote director thinks it's someone else "
"(connected to %s:%u, remote says it's %s:%u)",
net_ip2addr(&conn->host->ip), conn->host->port,
net_ip2addr(&ip), port);
return FALSE;
}
host = director_host_get(dir, &ip, port);
host->last_failed = 0;
conn->me_received = TRUE;
if (!conn->in)
return TRUE;
i_free(conn->name);
conn->name = i_strdup_printf("%s/left", host->name);
conn->host = host;
host->last_seq = 0;
connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
net_ip2addr(&host->ip), host->port);
if (host->self) {
i_error("Connection from self, dropping");
return FALSE;
} else if (dir->left == NULL) {
} else if (dir->left->host == host) {
i_warning("Dropping existing connection %s "
"in favor of its new connection %s",
dir->left->host->name, host->name);
director_connection_deinit(&dir->left);
} else {
if (director_host_cmp_to_self(dir->left->host, host,
dir->self_host) < 0) {
i_warning("Director connection %s tried to connect to "
"us, should use %s instead",
host->name, dir->left->host->name);
director_connection_send(conn, t_strdup_printf(
"CONNECT\t%s\t%u\n",
net_ip2addr(&dir->left->host->ip),
dir->left->host->port));
director_connection_ping(dir->left);
return FALSE;
}
i_warning("Replacing director connection %s with %s",
dir->left->host->name, host->name);
director_connection_send(dir->left, connect_str);
(void)o_stream_flush(dir->left->output);
director_connection_deinit(&dir->left);
}
dir->left = conn;
if (dir->right != NULL) {
if (dir->left->host != dir->right->host)
director_connection_send(dir->right, connect_str);
else {
}
} else {
(void)director_connect_host(dir, host);
}
return TRUE;
}
static bool
director_user_refresh(struct director *dir, unsigned int username_hash,
struct mail_host *host, time_t timestamp,
struct user **user_r)
{
struct user *user;
bool ret = FALSE;
user = user_directory_lookup(dir->users, username_hash);
if (user == NULL) {
*user_r = user_directory_add(dir->users, username_hash,
host, timestamp);
return TRUE;
}
if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
user_directory_refresh(dir->users, user);
ret = TRUE;
}
if (user->host != host) {
i_error("User hash %u is being redirected to two hosts: "
"%s and %s", username_hash,
net_ip2addr(&user->host->ip),
net_ip2addr(&host->ip));
if (net_ip_cmp(&user->host->ip, &host->ip) > 0) {
user->host->user_count--;
user->host = host;
user->host->user_count++;
}
ret = TRUE;
}
*user_r = user;
return ret;
}
static bool
director_handshake_cmd_user(struct director_connection *conn,
const char *const *args)
{
unsigned int username_hash, timestamp;
struct ip_addr ip;
struct mail_host *host;
struct user *user;
if (str_array_length(args) != 3 ||
str_to_uint(args[0], &username_hash) < 0 ||
net_addr2ip(args[1], &ip) < 0 ||
str_to_uint(args[2], ×tamp) < 0) {
i_error("director(%s): Invalid USER handshake args",
conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
i_error("director(%s): USER used unknown host %s in handshake",
conn->name, args[1]);
return FALSE;
}
director_user_refresh(conn->dir, username_hash, host, timestamp, &user);
return TRUE;
}
static bool
director_cmd_user(struct director_connection *conn, const char *const *args)
{
unsigned int username_hash;
struct ip_addr ip;
struct mail_host *host;
struct user *user;
if (str_array_length(args) != 2 ||
str_to_uint(args[0], &username_hash) < 0 ||
net_addr2ip(args[1], &ip) < 0) {
i_error("director(%s): Invalid USER args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
return TRUE;
}
if (director_user_refresh(conn->dir, username_hash,
host, ioloop_time, &user))
director_update_user(conn->dir, conn->host, user);
return TRUE;
}
static bool director_cmd_director(struct director_connection *conn,
const char *const *args)
{
struct director_host *host;
struct ip_addr ip;
unsigned int port;
if (!director_args_parse_ip_port(conn, args, &ip, &port))
return FALSE;
host = director_host_lookup(conn->dir, &ip, port);
if (host != NULL) {
host->last_failed = 0;
return TRUE;
}
director_host_add(conn->dir, &ip, port);
director_connection_send(conn->dir->right,
t_strdup_printf("DIRECTOR\t%s\t%u\n", net_ip2addr(&ip), port));
return TRUE;
}
static bool
director_cmd_host_hand_start(struct director_connection *conn,
const char *const *args)
{
const ARRAY_TYPE(mail_host) *hosts;
struct mail_host *const *hostp;
unsigned int remote_ring_completed;
if (args == NULL || str_to_uint(args[0], &remote_ring_completed) < 0) {
i_error("director(%s): Invalid HOST-HAND-START args",
conn->name);
return FALSE;
}
if (remote_ring_completed && !conn->dir->ring_handshaked) {
hosts = mail_hosts_get(conn->dir->mail_hosts);
while (array_count(hosts) > 0) {
hostp = array_idx(hosts, 0);
director_remove_host(conn->dir, NULL, NULL, *hostp);
}
} else if (!remote_ring_completed && conn->dir->ring_handshaked) {
conn->ignore_host_events = TRUE;
}
conn->handshake_sending_hosts = TRUE;
return TRUE;
}
static int
director_cmd_is_seen(struct director_connection *conn,
const char *const **_args,
struct director_host **host_r)
{
const char *const *args = *_args;
struct ip_addr ip;
unsigned int port, seq;
struct director_host *host;
if (str_array_length(args) < 3 ||
net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &port) < 0 ||
str_to_uint(args[2], &seq) < 0) {
i_error("director(%s): Command is missing parameters: %s",
conn->name, t_strarray_join(args, " "));
return -1;
}
*_args = args + 3;
host = director_host_lookup(conn->dir, &ip, port);
if (host == NULL) {
*host_r = NULL;
} else {
if (seq <= host->last_seq) {
return 1;
}
*host_r = host;
host->last_seq = seq;
}
return 0;
}
static bool
director_cmd_host_int(struct director_connection *conn, const char *const *args,
struct director_host *dir_host)
{
struct mail_host *host;
struct ip_addr ip;
unsigned int vhost_count;
bool update;
if (str_array_length(args) != 2 ||
net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &vhost_count) < 0) {
i_error("director(%s): Invalid HOST args", conn->name);
return FALSE;
}
if (conn->ignore_host_events) {
i_assert(conn->handshake_sending_hosts);
return TRUE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
host = mail_host_add_ip(conn->dir->mail_hosts, &ip);
update = TRUE;
} else {
update = host->vhost_count != vhost_count;
}
if (update) {
mail_host_set_vhost_count(conn->dir->mail_hosts,
host, vhost_count);
director_update_host(conn->dir, conn->host, dir_host, host);
}
return TRUE;
}
static bool
director_cmd_host_handshake(struct director_connection *conn,
const char *const *args)
{
return director_cmd_host_int(conn, args, NULL);
}
static bool
director_cmd_host(struct director_connection *conn, const char *const *args)
{
struct director_host *dir_host;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
return director_cmd_host_int(conn, args, dir_host);
}
static bool
director_cmd_host_remove(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
if (str_array_length(args) != 1 ||
net_addr2ip(args[0], &ip) < 0) {
i_error("director(%s): Invalid HOST-REMOVE args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL)
director_remove_host(conn->dir, conn->host, dir_host, host);
return TRUE;
}
static bool
director_cmd_host_flush(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
if (str_array_length(args) != 1 ||
net_addr2ip(args[0], &ip) < 0) {
i_error("director(%s): Invalid HOST-FLUSH args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL)
director_flush_host(conn->dir, conn->host, dir_host, host);
return TRUE;
}
static bool
director_cmd_user_move(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
unsigned int username_hash;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
if (str_array_length(args) != 2 ||
str_to_uint(args[0], &username_hash) < 0 ||
net_addr2ip(args[1], &ip) < 0) {
i_error("director(%s): Invalid USER-MOVE args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL) {
director_move_user(conn->dir, conn->host, dir_host,
username_hash, host);
}
return TRUE;
}
static bool
director_cmd_user_killed(struct director_connection *conn,
const char *const *args)
{
unsigned int username_hash;
if (str_array_length(args) != 1 ||
str_to_uint(args[0], &username_hash) < 0) {
i_error("director(%s): Invalid USER-KILLED args", conn->name);
return FALSE;
}
director_user_killed(conn->dir, username_hash);
return TRUE;
}
static bool
director_cmd_user_killed_everywhere(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
if (str_array_length(args) != 1 ||
str_to_uint(args[0], &username_hash) < 0) {
i_error("director(%s): Invalid USER-KILLED-EVERYWHERE args",
conn->name);
return FALSE;
}
director_user_killed_everywhere(conn->dir, conn->host,
dir_host, username_hash);
return TRUE;
}
static void director_handshake_cmd_done(struct director_connection *conn)
{
struct director *dir = conn->dir;
if (dir->debug)
i_debug("Handshaked to %s", conn->host->name);
conn->host->last_failed = 0;
conn->handshake_received = TRUE;
if (conn->in) {
director_connection_send(conn, "DONE\n");
if (dir->right != NULL) {
director_connection_send(dir->right,
t_strdup_printf("DIRECTOR\t%s\t%u\n",
net_ip2addr(&conn->host->ip),
conn->host->port));
}
}
if (dir->left != NULL && dir->right != NULL &&
dir->left->handshake_received && dir->right->handshake_received) {
dir->sync_seq++;
dir->ring_synced = FALSE;
director_connection_send(dir->right,
t_strdup_printf("SYNC\t%s\t%u\t%u\n",
net_ip2addr(&dir->self_ip),
dir->self_port, dir->sync_seq));
}
if (conn->to_ping != NULL)
timeout_remove(&conn->to_ping);
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
director_connection_ping, conn);
}
static bool
director_connection_handle_handshake(struct director_connection *conn,
const char *cmd, const char *const *args)
{
struct director_host *host;
struct ip_addr ip;
unsigned int port;
if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
i_error("director(%s): Wrong protocol in socket "
"(%s vs %s)",
conn->name, args[0], DIRECTOR_VERSION_NAME);
return FALSE;
} else if (atoi(args[1]) != DIRECTOR_VERSION_MAJOR) {
i_error("director(%s): Incompatible protocol version: "
"%u vs %u", conn->name, atoi(args[1]),
DIRECTOR_VERSION_MAJOR);
return FALSE;
}
conn->version_received = TRUE;
return TRUE;
}
if (!conn->version_received) {
i_error("director(%s): Incompatible protocol", conn->name);
return FALSE;
}
if (strcmp(cmd, "ME") == 0 && !conn->me_received &&
str_array_length(args) == 2)
return director_cmd_me(conn, args);
if (!conn->in && strcmp(cmd, "CONNECT") == 0 &&
str_array_length(args) == 2) {
if (!director_args_parse_ip_port(conn, args, &ip, &port))
return FALSE;
conn->dir->right = NULL;
host = director_host_get(conn->dir, &ip, port);
host->last_failed = 0;
if (conn->dir->debug)
i_debug("Received CONNECT reference to %s", host->name);
(void)director_connect_host(conn->dir, host);
return FALSE;
}
if (conn->in && strcmp(cmd, "DIRECTOR") == 0 && conn->me_received)
return director_cmd_director(conn, args);
if (strcmp(cmd, "HOST") == 0) {
if (conn->handshake_sending_hosts)
return director_cmd_host_handshake(conn, args);
else
return director_cmd_host(conn, args);
}
if (conn->handshake_sending_hosts &&
strcmp(cmd, "HOST-HAND-END") == 0) {
conn->ignore_host_events = FALSE;
conn->handshake_sending_hosts = FALSE;
return TRUE;
}
if (conn->in && strcmp(cmd, "HOST-HAND-START") == 0 &&
conn->me_received)
return director_cmd_host_hand_start(conn, args);
if (strcmp(cmd, "USER") == 0 && conn->me_received) {
if (conn->in)
return director_handshake_cmd_user(conn, args);
else
return director_cmd_user(conn, args);
}
if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
!conn->handshake_sending_hosts) {
director_handshake_cmd_done(conn);
return TRUE;
}
i_error("director(%s): Invalid handshake command: %s "
"(in=%d me_received=%d)", conn->name, cmd,
conn->in, conn->me_received);
return FALSE;
}
static bool director_connection_sync(struct director_connection *conn,
const char *const *args, const char *line)
{
struct director *dir = conn->dir;
struct director_host *host;
struct ip_addr ip;
unsigned int port, seq;
if (str_array_length(args) != 3 ||
!director_args_parse_ip_port(conn, args, &ip, &port) ||
str_to_uint(args[2], &seq) < 0) {
i_error("director(%s): Invalid SYNC args", conn->name);
return FALSE;
}
host = director_host_lookup(dir, &ip, port);
if (host == NULL)
return TRUE;
if (host->self) {
if (dir->sync_seq != seq) {
return TRUE;
}
if (!dir->ring_handshaked) {
director_set_ring_handshaked(dir);
} else if (dir->ring_synced) {
i_error("Received SYNC from %s (seq=%u) "
"while already synced", conn->name, seq);
return TRUE;
} else {
if (dir->debug) {
i_debug("Ring is synced (%s sent seq=%u)",
conn->name, seq);
}
director_set_ring_synced(dir);
}
return TRUE;
}
if (dir->right != NULL) {
director_connection_send(dir->right,
t_strconcat(line, "\n", NULL));
}
return TRUE;
}
static bool director_cmd_connect(struct director_connection *conn,
const char *const *args)
{
struct director *dir = conn->dir;
struct director_host *host;
struct ip_addr ip;
unsigned int port;
if (str_array_length(args) != 2 ||
!director_args_parse_ip_port(conn, args, &ip, &port)) {
i_error("director(%s): Invalid CONNECT args", conn->name);
return FALSE;
}
host = director_host_lookup(dir, &ip, port);
if (host == NULL) {
i_error("Received CONNECT request to unknown host %s:%u",
net_ip2addr(&ip), port);
return TRUE;
}
if (dir->right != NULL &&
director_host_cmp_to_self(host, dir->right->host,
dir->self_host) <= 0) {
if (dir->debug) {
i_debug("Ignoring CONNECT request to %s "
"(current right is %s)",
host->name, dir->right->name);
}
return TRUE;
}
if (dir->debug) {
if (dir->right == NULL) {
i_debug("Received CONNECT request to %s, "
"initializing right", host->name);
} else {
i_debug("Received CONNECT request to %s, "
"replacing current right %s",
host->name, dir->right->name);
}
}
(void)director_connect_host(dir, host);
return TRUE;
}
static bool director_cmd_pong(struct director_connection *conn)
{
if (!conn->ping_waiting)
return TRUE;
conn->ping_waiting = FALSE;
timeout_remove(&conn->to_ping);
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
director_connection_ping, conn);
return TRUE;
}
static bool
director_connection_handle_line(struct director_connection *conn,
const char *line)
{
const char *cmd, *const *args;
args = t_strsplit(line, "\t");
cmd = args[0]; args++;
if (cmd == NULL) {
i_error("director(%s): Received empty line", conn->name);
return FALSE;
}
if (strcmp(cmd, "PING") == 0) {
director_connection_send(conn, "PONG\n");
return TRUE;
}
if (strcmp(cmd, "PONG") == 0)
return director_cmd_pong(conn);
if (!conn->handshake_received) {
if (!director_connection_handle_handshake(conn, cmd, args)) {
if (conn->dir->debug) {
i_debug("director(%s): Handshaking failed",
conn->host->name);
}
if (conn->host != NULL)
conn->host->last_failed = ioloop_time;
return FALSE;
}
return TRUE;
}
if (strcmp(cmd, "USER") == 0)
return director_cmd_user(conn, args);
if (strcmp(cmd, "HOST") == 0)
return director_cmd_host(conn, args);
if (strcmp(cmd, "HOST-REMOVE") == 0)
return director_cmd_host_remove(conn, args);
if (strcmp(cmd, "HOST-FLUSH") == 0)
return director_cmd_host_flush(conn, args);
if (strcmp(cmd, "USER-MOVE") == 0)
return director_cmd_user_move(conn, args);
if (strcmp(cmd, "USER-KILLED") == 0)
return director_cmd_user_killed(conn, args);
if (strcmp(cmd, "USER-KILLED-EVERYWHERE") == 0)
return director_cmd_user_killed_everywhere(conn, args);
if (strcmp(cmd, "DIRECTOR") == 0)
return director_cmd_director(conn, args);
if (strcmp(cmd, "SYNC") == 0)
return director_connection_sync(conn, args, line);
if (strcmp(cmd, "CONNECT") == 0)
return director_cmd_connect(conn, args);
i_error("director(%s): Unknown command (in this state): %s",
conn->name, cmd);
return FALSE;
}
static void director_connection_input(struct director_connection *conn)
{
struct director *dir = conn->dir;
char *line;
bool ret;
if (conn->to_ping != NULL)
timeout_reset(conn->to_ping);
switch (i_stream_read(conn->input)) {
case 0:
return;
case -1:
i_error("Director %s disconnected%s", conn->name,
conn->handshake_received ? "" :
" before handshake finished");
director_connection_disconnected(&conn);
return;
case -2:
i_error("BUG: Director %s sent us more than %d bytes",
conn->name, MAX_INBUF_SIZE);
director_connection_disconnected(&conn);
return;
}
director_sync_freeze(dir);
while ((line = i_stream_next_line(conn->input)) != NULL) {
T_BEGIN {
ret = director_connection_handle_line(conn, line);
} T_END;
if (!ret) {
if (dir->debug) {
i_debug("director(%s): Invalid input, disconnecting",
conn->name);
}
director_connection_disconnected(&conn);
break;
}
}
director_sync_thaw(dir);
}
static void director_connection_send_directors(struct director_connection *conn,
string_t *str)
{
struct director_host *const *hostp;
array_foreach(&conn->dir->dir_hosts, hostp) {
str_printfa(str, "DIRECTOR\t%s\t%u\n",
net_ip2addr(&(*hostp)->ip), (*hostp)->port);
}
}
static void
director_connection_send_hosts(struct director_connection *conn, string_t *str)
{
struct mail_host *const *hostp;
str_printfa(str, "HOST-HAND-START\t%u\n", conn->dir->ring_handshaked);
array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp) {
str_printfa(str, "HOST\t%s\t%u\n",
net_ip2addr(&(*hostp)->ip), (*hostp)->vhost_count);
}
str_printfa(str, "HOST-HAND-END\t%u\n", conn->dir->ring_handshaked);
}
static int director_connection_send_users(struct director_connection *conn)
{
struct user *user;
int ret;
o_stream_cork(conn->output);
while ((user = user_directory_iter_next(conn->user_iter)) != NULL) {
if (!user_directory_user_has_connections(conn->dir->users,
user)) {
continue;
}
T_BEGIN {
const char *line;
line = t_strdup_printf("USER\t%u\t%s\t%u\n",
user->username_hash,
net_ip2addr(&user->host->ip),
user->timestamp);
director_connection_send(conn, line);
} T_END;
if (o_stream_get_buffer_used_size(conn->output) >= OUTBUF_FLUSH_THRESHOLD) {
if ((ret = o_stream_flush(conn->output)) <= 0) {
return ret;
}
}
}
user_directory_iter_deinit(&conn->user_iter);
director_connection_send(conn, "DONE\n");
i_assert(conn->io == NULL);
conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
ret = o_stream_flush(conn->output);
o_stream_uncork(conn->output);
return ret;
}
static int director_connection_output(struct director_connection *conn)
{
if (conn->user_iter != NULL)
return director_connection_send_users(conn);
else
return o_stream_flush(conn->output);
}
static void
director_connection_init_timeout(struct director_connection *conn)
{
if (conn->host != NULL)
conn->host->last_failed = ioloop_time;
if (!conn->connected)
i_error("director(%s): Connect timed out", conn->name);
else
i_error("director(%s): Handshaking timed out", conn->name);
director_connection_disconnected(&conn);
}
static struct director_connection *
director_connection_init_common(struct director *dir, int fd)
{
struct director_connection *conn;
conn = i_new(struct director_connection, 1);
conn->created = ioloop_time;
conn->fd = fd;
conn->dir = dir;
conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE);
conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
o_stream_set_flush_callback(conn->output,
director_connection_output, conn);
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
director_connection_init_timeout, conn);
DLLIST_PREPEND(&dir->connections, conn);
return conn;
}
static void director_connection_send_handshake(struct director_connection *conn)
{
director_connection_send(conn, t_strdup_printf(
"VERSION\t"DIRECTOR_VERSION_NAME"\t%u\t%u\n"
"ME\t%s\t%u\n",
DIRECTOR_VERSION_MAJOR, DIRECTOR_VERSION_MINOR,
net_ip2addr(&conn->dir->self_ip), conn->dir->self_port));
}
struct director_connection *
director_connection_init_in(struct director *dir, int fd,
const struct ip_addr *ip)
{
struct director_connection *conn;
conn = director_connection_init_common(dir, fd);
conn->in = TRUE;
conn->connected = TRUE;
conn->name = i_strdup_printf("%s/in", net_ip2addr(ip));
conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
director_connection_send_handshake(conn);
return conn;
}
static void director_connection_connected(struct director_connection *conn)
{
struct director *dir = conn->dir;
string_t *str = t_str_new(1024);
int err;
if ((err = net_geterror(conn->fd)) != 0) {
conn->host->last_failed = ioloop_time;
i_error("director(%s): connect() failed: %s", conn->name,
strerror(err));
director_connection_disconnected(&conn);
return;
}
conn->connected = TRUE;
if (dir->right != NULL) {
if (director_host_cmp_to_self(conn->host, dir->right->host,
dir->self_host) <= 0) {
i_warning("Aborting incorrect outgoing connection to %s "
"(already connected to correct one: %s)",
conn->host->name, dir->right->host->name);
director_connection_deinit(&conn);
return;
}
i_warning("Replacing director connection %s with %s",
dir->right->host->name, conn->host->name);
director_connection_deinit(&dir->right);
}
dir->right = conn;
i_free(conn->name);
conn->name = i_strdup_printf("%s/right", conn->host->name);
io_remove(&conn->io);
director_connection_send_handshake(conn);
director_connection_send_directors(conn, str);
director_connection_send_hosts(conn, str);
director_connection_send(conn, str_c(str));
conn->user_iter = user_directory_iter_init(dir->users);
(void)director_connection_send_users(conn);
}
struct director_connection *
director_connection_init_out(struct director *dir, int fd,
struct director_host *host)
{
struct director_connection *conn;
host->last_seq = 0;
conn = director_connection_init_common(dir, fd);
conn->name = i_strdup_printf("%s/out", host->name);
conn->host = host;
conn->io = io_add(conn->fd, IO_READ,
director_connection_connected, conn);
return conn;
}
void director_connection_deinit(struct director_connection **_conn)
{
struct director_connection *conn = *_conn;
struct director *dir = conn->dir;
*_conn = NULL;
if (dir->debug && conn->host != NULL)
i_debug("Disconnecting from %s", conn->host->name);
if (conn->host != NULL && !conn->in &&
conn->created + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time)
conn->host->last_failed = ioloop_time;
DLLIST_REMOVE(&dir->connections, conn);
if (dir->left == conn)
dir->left = NULL;
if (dir->right == conn)
dir->right = NULL;
if (conn->user_iter != NULL)
user_directory_iter_deinit(&conn->user_iter);
if (conn->to != NULL)
timeout_remove(&conn->to);
if (conn->to_ping != NULL)
timeout_remove(&conn->to_ping);
if (conn->io != NULL)
io_remove(&conn->io);
i_stream_unref(&conn->input);
o_stream_unref(&conn->output);
if (close(conn->fd) < 0)
i_error("close(director connection) failed: %m");
if (conn->in)
master_service_client_connection_destroyed(master_service);
i_free(conn->name);
i_free(conn);
if (dir->left == NULL || dir->right == NULL) {
dir->sync_seq++;
dir->ring_synced = FALSE;
}
}
void director_connection_disconnected(struct director_connection **_conn)
{
struct director_connection *conn = *_conn;
struct director *dir = conn->dir;
director_connection_deinit(_conn);
if (dir->right == NULL)
director_connect(dir);
}
static void director_connection_timeout(struct director_connection *conn)
{
director_connection_disconnected(&conn);
}
void director_connection_send(struct director_connection *conn,
const char *data)
{
unsigned int len = strlen(data);
off_t ret;
if (conn->output->closed || !conn->connected)
return;
ret = o_stream_send(conn->output, data, len);
if (ret != (off_t)len) {
if (ret < 0)
i_error("director(%s): write() failed: %m", conn->name);
else {
i_error("director(%s): Output buffer full, "
"disconnecting", conn->name);
}
o_stream_close(conn->output);
conn->to = timeout_add(0, director_connection_timeout, conn);
}
}
void director_connection_send_except(struct director_connection *conn,
struct director_host *skip_host,
const char *data)
{
if (conn->host != skip_host)
director_connection_send(conn, data);
}
static void director_connection_ping_timeout(struct director_connection *conn)
{
i_error("director(%s): Ping timed out, disconnecting", conn->name);
director_connection_disconnected(&conn);
}
static void director_connection_ping(struct director_connection *conn)
{
conn->sync_ping = FALSE;
if (conn->ping_waiting)
return;
if (conn->to_ping != NULL)
timeout_remove(&conn->to_ping);
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
director_connection_ping_timeout, conn);
director_connection_send(conn, "PING\n");
conn->ping_waiting = TRUE;
}
const char *director_connection_get_name(struct director_connection *conn)
{
return conn->name;
}
struct director_host *
director_connection_get_host(struct director_connection *conn)
{
return conn->host;
}
struct director_connection *
director_connection_find_outgoing(struct director *dir,
struct director_host *host)
{
struct director_connection *conn;
for (conn = dir->connections; conn != NULL; conn = conn->next) {
if (conn->host == host && !conn->in)
return conn;
}
return NULL;
}
void director_connection_cork(struct director_connection *conn)
{
o_stream_cork(conn->output);
}
void director_connection_uncork(struct director_connection *conn)
{
o_stream_uncork(conn->output);
}
void director_connection_wait_sync(struct director_connection *conn)
{
if (conn->ping_waiting || conn->sync_ping)
return;
if (conn->to_ping != NULL)
timeout_remove(&conn->to_ping);
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS,
director_connection_ping, conn);
conn->sync_ping = TRUE;
}
void director_connections_deinit(struct director *dir)
{
struct director_connection *conn;
while (dir->connections != NULL) {
conn = dir->connections;
dir->connections = conn->next;
director_connection_deinit(&conn);
}
}