#include "db_config.h"
#define __INCLUDE_NETWORKING 1
#include "db_int.h"
#include "dbinc/mp.h"
struct sending_msg {
REPMGR_IOVECS iovecs;
u_int8_t type;
u_int32_t control_size_buf, rec_size_buf;
REPMGR_FLAT *fmsg;
};
static int __repmgr_send_broadcast
__P((DB_ENV *, const DBT *, const DBT *, u_int *, u_int *));
static void setup_sending_msg
__P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
__P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
static int enqueue_msg
__P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static int flatten __P((DB_ENV *, struct sending_msg *));
static REPMGR_SITE *__repmgr_available_site __P((DB_ENV *, int));
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
DB_ENV *dbenv;
const DBT *control, *rec;
const DB_LSN *lsnp;
int eid;
u_int32_t flags;
{
DB_REP *db_rep;
u_int nsites, npeers, available, needed;
int ret, t_ret;
REPMGR_SITE *site;
REPMGR_CONNECTION *conn;
db_rep = dbenv->rep_handle;
LOCK_MUTEX(db_rep->mutex);
if (eid == DB_EID_BROADCAST) {
if ((ret = __repmgr_send_broadcast(dbenv, control, rec,
&nsites, &npeers)) != 0)
goto out;
} else {
if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
DB_REP_ANYWHERE &&
IS_VALID_EID(db_rep->peer) &&
(site = __repmgr_available_site(dbenv, db_rep->peer)) !=
NULL) {
RPRINT(dbenv, (dbenv, "sending request to peer"));
} else if ((site = __repmgr_available_site(dbenv, eid)) ==
NULL) {
RPRINT(dbenv, (dbenv,
"ignoring message sent to unavailable site"));
ret = DB_REP_UNAVAIL;
goto out;
}
conn = site->ref.conn;
if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
control, rec)) == DB_REP_UNAVAIL &&
(t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
ret = t_ret;
if (ret != 0)
goto out;
nsites = 1;
npeers = site->priority > 0 ? 1 : 0;
}
if (LF_ISSET(DB_REP_PERMANENT)) {
switch (db_rep->perm_policy) {
case DB_REPMGR_ACKS_NONE:
goto out;
case DB_REPMGR_ACKS_ONE:
needed = 1;
available = nsites;
break;
case DB_REPMGR_ACKS_ALL:
needed = __repmgr_get_nsites(db_rep) - 1;
available = nsites;
break;
case DB_REPMGR_ACKS_ONE_PEER:
needed = 1;
available = npeers;
break;
case DB_REPMGR_ACKS_ALL_PEERS:
needed = 1;
available = npeers;
break;
case DB_REPMGR_ACKS_QUORUM:
needed = (__repmgr_get_nsites(db_rep) - 1) / 2;
available = npeers;
break;
default:
COMPQUIET(available, 0);
COMPQUIET(needed, 0);
(void)__db_unknown_path(dbenv, "__repmgr_send");
break;
}
if (available < needed) {
ret = DB_REP_UNAVAIL;
goto out;
}
RPRINT(dbenv, (dbenv,
"will await acknowledgement: need %u", needed));
ret = __repmgr_await_ack(dbenv, lsnp);
}
out: UNLOCK_MUTEX(db_rep->mutex);
if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
STAT(db_rep->region->mstat.st_perm_failed++);
DB_EVENT(dbenv, DB_EVENT_REP_PERM_FAILED, NULL);
}
return (ret);
}
static REPMGR_SITE *
__repmgr_available_site(dbenv, eid)
DB_ENV *dbenv;
int eid;
{
DB_REP *db_rep;
REPMGR_SITE *site;
db_rep = dbenv->rep_handle;
site = SITE_FROM_EID(eid);
if (site->state != SITE_CONNECTED)
return (NULL);
if (F_ISSET(site->ref.conn, CONN_CONNECTING))
return (NULL);
return (site);
}
static int
__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
DB_ENV *dbenv;
const DBT *control, *rec;
u_int *nsitesp, *npeersp;
{
DB_REP *db_rep;
struct sending_msg msg;
REPMGR_CONNECTION *conn;
REPMGR_SITE *site;
u_int nsites, npeers;
int ret;
db_rep = dbenv->rep_handle;
setup_sending_msg(&msg, REPMGR_REP_MESSAGE, control, rec);
nsites = npeers = 0;
TAILQ_FOREACH(conn, &db_rep->connections, entries) {
if (F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT) ||
!IS_VALID_EID(conn->eid))
continue;
if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
site = SITE_FROM_EID(conn->eid);
nsites++;
if (site->priority > 0)
npeers++;
} else if (ret == DB_REP_UNAVAIL) {
if ((ret = __repmgr_bust_connection(
dbenv, conn, FALSE)) != 0)
return (ret);
} else
return (ret);
}
*nsitesp = nsites;
*npeersp = npeers;
return (0);
}
int
__repmgr_send_one(dbenv, conn, msg_type, control, rec)
DB_ENV *dbenv;
REPMGR_CONNECTION *conn;
u_int msg_type;
const DBT *control, *rec;
{
struct sending_msg msg;
setup_sending_msg(&msg, msg_type, control, rec);
return (__repmgr_send_internal(dbenv, conn, &msg));
}
static int
__repmgr_send_internal(dbenv, conn, msg)
DB_ENV *dbenv;
REPMGR_CONNECTION *conn;
struct sending_msg *msg;
{
#define OUT_QUEUE_LIMIT 10
REPMGR_IOVECS iovecs;
SITE_STRING_BUFFER buffer;
int ret;
size_t nw;
size_t total_written;
DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
if (!STAILQ_EMPTY(&conn->outbound_queue)) {
RPRINT(dbenv, (dbenv, "msg to %s to be queued",
__repmgr_format_eid_loc(dbenv->rep_handle,
conn->eid, buffer)));
if (conn->out_queue_length < OUT_QUEUE_LIMIT)
return (enqueue_msg(dbenv, conn, msg, 0));
else {
RPRINT(dbenv, (dbenv, "queue limit exceeded"));
STAT(dbenv->rep_handle->
region->mstat.st_msgs_dropped++);
return (0);
}
}
memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
total_written = 0;
while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
iovecs.count-iovecs.offset, &nw)) == 0) {
total_written += nw;
if (__repmgr_update_consumed(&iovecs, nw))
return (0);
}
if (ret != WOULDBLOCK) {
__db_err(dbenv, ret, "socket writing failure");
return (DB_REP_UNAVAIL);
}
RPRINT(dbenv, (dbenv, "wrote only %lu bytes to %s",
(u_long)total_written,
__repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer)));
if ((ret = enqueue_msg(dbenv, conn, msg, total_written)) != 0)
return (ret);
STAT(dbenv->rep_handle->region->mstat.st_msgs_queued++);
#ifdef DB_WIN32
if (WSAEventSelect(conn->fd, conn->event_object,
FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
ret = net_errno;
__db_err(dbenv, ret, "can't add FD_WRITE event bit");
return (ret);
}
#endif
return (__repmgr_wake_main_thread(dbenv));
}
int
__repmgr_is_permanent(dbenv, lsnp)
DB_ENV *dbenv;
const DB_LSN *lsnp;
{
DB_REP *db_rep;
REPMGR_SITE *site;
u_int eid, nsites, npeers;
int is_perm, has_missing_peer;
db_rep = dbenv->rep_handle;
if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
return (TRUE);
nsites = npeers = 0;
has_missing_peer = FALSE;
for (eid = 0; eid < db_rep->site_cnt; eid++) {
site = SITE_FROM_EID(eid);
if (site->priority == -1) {
has_missing_peer = TRUE;
continue;
}
if (log_compare(&site->max_ack, lsnp) >= 0) {
nsites++;
if (site->priority > 0)
npeers++;
} else {
if (site->priority > 0)
has_missing_peer = TRUE;
}
}
switch (db_rep->perm_policy) {
case DB_REPMGR_ACKS_ONE:
is_perm = (nsites >= 1);
break;
case DB_REPMGR_ACKS_ONE_PEER:
is_perm = (npeers >= 1);
break;
case DB_REPMGR_ACKS_QUORUM:
if (__repmgr_get_nsites(db_rep) == 2) {
is_perm = (npeers >= 1);
} else
is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2);
break;
case DB_REPMGR_ACKS_ALL:
is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1);
break;
case DB_REPMGR_ACKS_ALL_PEERS:
if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) {
has_missing_peer = TRUE;
}
is_perm = !has_missing_peer;
break;
default:
is_perm = FALSE;
(void)__db_unknown_path(dbenv, "__repmgr_is_permanent");
}
return (is_perm);
}
int
__repmgr_bust_connection(dbenv, conn, do_close)
DB_ENV *dbenv;
REPMGR_CONNECTION *conn;
int do_close;
{
DB_REP *db_rep;
int connecting, ret, eid;
db_rep = dbenv->rep_handle;
ret = 0;
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
eid = conn->eid;
connecting = F_ISSET(conn, CONN_CONNECTING);
if (do_close)
__repmgr_cleanup_connection(dbenv, conn);
else {
F_SET(conn, CONN_DEFUNCT);
conn->eid = -1;
}
if (IS_VALID_EID(eid)) {
if ((ret = __repmgr_schedule_connection_attempt(
dbenv, (u_int)eid, FALSE)) != 0)
return (ret);
if (!connecting && eid == db_rep->master_eid) {
(void)__memp_set_config(
dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
if ((ret = __repmgr_init_election(
dbenv, ELECT_FAILURE_ELECTION)) != 0)
return (ret);
}
} else if (!do_close) {
ret = __repmgr_wake_main_thread(dbenv);
}
return (ret);
}
void
__repmgr_cleanup_connection(dbenv, conn)
DB_ENV *dbenv;
REPMGR_CONNECTION *conn;
{
DB_REP *db_rep;
QUEUED_OUTPUT *out;
REPMGR_FLAT *msg;
DBT *dbt;
db_rep = dbenv->rep_handle;
TAILQ_REMOVE(&db_rep->connections, conn, entries);
if (conn->fd != INVALID_SOCKET) {
(void)closesocket(conn->fd);
#ifdef DB_WIN32
(void)WSACloseEvent(conn->event_object);
#endif
}
if (conn->reading_phase == DATA_PHASE) {
if (conn->msg_type == REPMGR_REP_MESSAGE)
__os_free(dbenv, conn->input.rep_message);
else {
dbt = &conn->input.repmgr_msg.cntrl;
__os_free(dbenv, dbt->data);
dbt = &conn->input.repmgr_msg.rec;
if (dbt->size > 0)
__os_free(dbenv, dbt->data);
}
}
while (!STAILQ_EMPTY(&conn->outbound_queue)) {
out = STAILQ_FIRST(&conn->outbound_queue);
STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
msg = out->msg;
if (--msg->ref_count <= 0)
__os_free(dbenv, msg);
__os_free(dbenv, out);
}
__os_free(dbenv, conn);
}
static int
enqueue_msg(dbenv, conn, msg, offset)
DB_ENV *dbenv;
REPMGR_CONNECTION *conn;
struct sending_msg *msg;
size_t offset;
{
QUEUED_OUTPUT *q_element;
int ret;
if (msg->fmsg == NULL && ((ret = flatten(dbenv, msg)) != 0))
return (ret);
if ((ret = __os_malloc(dbenv, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
return (ret);
q_element->msg = msg->fmsg;
msg->fmsg->ref_count++;
q_element->offset = offset;
STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
conn->out_queue_length++;
return (0);
}
static void
setup_sending_msg(msg, type, control, rec)
struct sending_msg *msg;
u_int type;
const DBT *control, *rec;
{
u_int32_t rec_size;
__repmgr_iovec_init(&msg->iovecs);
msg->type = type;
__repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));
msg->control_size_buf = htonl(control->size);
__repmgr_add_buffer(&msg->iovecs,
&msg->control_size_buf, sizeof(msg->control_size_buf));
rec_size = rec == NULL ? 0 : rec->size;
msg->rec_size_buf = htonl(rec_size);
__repmgr_add_buffer(
&msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));
if (control->size > 0)
__repmgr_add_dbt(&msg->iovecs, control);
if (rec_size > 0)
__repmgr_add_dbt(&msg->iovecs, rec);
msg->fmsg = NULL;
}
static int
flatten(dbenv, msg)
DB_ENV *dbenv;
struct sending_msg *msg;
{
u_int8_t *p;
size_t msg_size;
int i, ret;
DB_ASSERT(dbenv, msg->fmsg == NULL);
msg_size = msg->iovecs.total_bytes;
if ((ret = __os_malloc(dbenv, sizeof(*msg->fmsg) + msg_size,
&msg->fmsg)) != 0)
return (ret);
msg->fmsg->length = msg_size;
msg->fmsg->ref_count = 0;
p = &msg->fmsg->data[0];
for (i = 0; i < msg->iovecs.count; i++) {
memcpy(p, msg->iovecs.vectors[i].iov_base,
msg->iovecs.vectors[i].iov_len);
p = &p[msg->iovecs.vectors[i].iov_len];
}
__repmgr_iovec_init(&msg->iovecs);
__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
return (0);
}
int
__repmgr_find_site(dbenv, host, port)
DB_ENV *dbenv;
const char *host;
u_int port;
{
DB_REP *db_rep;
REPMGR_SITE *site;
u_int i;
db_rep = dbenv->rep_handle;
for (i = 0; i < db_rep->site_cnt; i++) {
site = &db_rep->sites[i];
if (strcmp(site->net_addr.host, host) == 0 &&
site->net_addr.port == port)
return ((int)i);
}
return (-1);
}
int
__repmgr_pack_netaddr(dbenv, host, port, list, addr)
DB_ENV *dbenv;
const char *host;
u_int port;
ADDRINFO *list;
repmgr_netaddr_t *addr;
{
int ret;
DB_ASSERT(dbenv, host != NULL);
if ((ret = __os_strdup(dbenv, host, &addr->host)) != 0)
return (ret);
addr->port = (u_int16_t)port;
addr->address_list = list;
addr->current = NULL;
return (0);
}
int
__repmgr_getaddr(dbenv, host, port, flags, result)
DB_ENV *dbenv;
const char *host;
u_int port;
int flags;
ADDRINFO **result;
{
ADDRINFO *answer, hints;
char buffer[10];
#ifdef DB_WIN32
int ret;
#endif
if (port > UINT16_MAX) {
__db_errx(dbenv, "port %u larger than max port %u",
port, UINT16_MAX);
return (EINVAL);
}
#ifdef DB_WIN32
if (!dbenv->rep_handle->wsa_inited &&
(ret = __repmgr_wsa_init(dbenv)) != 0)
return (ret);
#endif
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = flags;
(void)snprintf(buffer, sizeof(buffer), "%u", port);
if (__db_getaddrinfo(dbenv, host, port, buffer, &hints, &answer) != 0)
return (DB_REP_UNAVAIL);
*result = answer;
return (0);
}
int
__repmgr_add_site(dbenv, host, port, newsitep)
DB_ENV *dbenv;
const char *host;
u_int port;
REPMGR_SITE **newsitep;
{
DB_REP *db_rep;
ADDRINFO *address_list;
repmgr_netaddr_t addr;
REPMGR_SITE *site;
int ret, eid;
ret = 0;
db_rep = dbenv->rep_handle;
if (IS_VALID_EID(eid = __repmgr_find_site(dbenv, host, port))) {
site = SITE_FROM_EID(eid);
ret = EEXIST;
goto out;
}
if ((ret = __repmgr_getaddr(
dbenv, host, port, 0, &address_list)) == DB_REP_UNAVAIL) {
address_list = NULL;
} else if (ret != 0)
return (ret);
if ((ret = __repmgr_pack_netaddr(
dbenv, host, port, address_list, &addr)) != 0) {
__db_freeaddrinfo(dbenv, address_list);
return (ret);
}
if ((ret = __repmgr_new_site(dbenv, &site, &addr, SITE_IDLE)) != 0) {
__repmgr_cleanup_netaddr(dbenv, &addr);
return (ret);
}
if (db_rep->selector != NULL &&
(ret = __repmgr_schedule_connection_attempt(
dbenv, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
return (ret);
out:
if (newsitep != NULL)
*newsitep = site;
return (ret);
}
int
__repmgr_net_create(dbenv, db_rep)
DB_ENV *dbenv;
DB_REP *db_rep;
{
COMPQUIET(dbenv, NULL);
db_rep->listen_fd = INVALID_SOCKET;
db_rep->master_eid = DB_EID_INVALID;
TAILQ_INIT(&db_rep->connections);
TAILQ_INIT(&db_rep->retries);
return (0);
}
int
__repmgr_listen(dbenv)
DB_ENV *dbenv;
{
DB_REP *db_rep;
ADDRINFO *ai;
char *why;
int sockopt, ret;
socket_t s;
db_rep = dbenv->rep_handle;
s = INVALID_SOCKET;
ai = ADDR_LIST_FIRST(&db_rep->my_addr);
COMPQUIET(why, "");
DB_ASSERT(dbenv, ai != NULL);
for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {
if ((s = socket(ai->ai_family,
ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
why = "can't create listen socket";
continue;
}
sockopt = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
sizeof(sockopt)) != 0) {
why = "can't set REUSEADDR socket option";
break;
}
if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
why = "can't bind socket to listening address";
(void)closesocket(s);
s = INVALID_SOCKET;
continue;
}
if (listen(s, 5) != 0) {
why = "listen()";
break;
}
if ((ret = __repmgr_set_nonblocking(s)) != 0) {
__db_err(dbenv, ret, "can't unblock listen socket");
goto clean;
}
db_rep->listen_fd = s;
return (0);
}
ret = net_errno;
__db_err(dbenv, ret, why);
clean: if (s != INVALID_SOCKET)
(void)closesocket(s);
return (ret);
}
int
__repmgr_net_close(dbenv)
DB_ENV *dbenv;
{
DB_REP *db_rep;
REPMGR_CONNECTION *conn;
#ifndef DB_WIN32
struct sigaction sigact;
#endif
int ret;
db_rep = dbenv->rep_handle;
if (db_rep->listen_fd == INVALID_SOCKET)
return (0);
TAILQ_FOREACH(conn, &db_rep->connections, entries) {
if (conn->fd != INVALID_SOCKET) {
(void)closesocket(conn->fd);
conn->fd = INVALID_SOCKET;
#ifdef DB_WIN32
(void)WSACloseEvent(conn->event_object);
#endif
}
}
ret = 0;
if (closesocket(db_rep->listen_fd) == SOCKET_ERROR)
ret = net_errno;
#ifdef DB_WIN32
if (WSACleanup() == SOCKET_ERROR && ret == 0)
ret = net_errno;
db_rep->wsa_inited = FALSE;
#else
if (db_rep->chg_sig_handler) {
memset(&sigact, 0, sizeof(sigact));
sigact.sa_handler = SIG_DFL;
if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0)
ret = errno;
}
#endif
db_rep->listen_fd = INVALID_SOCKET;
return (ret);
}
void
__repmgr_net_destroy(dbenv, db_rep)
DB_ENV *dbenv;
DB_REP *db_rep;
{
REPMGR_CONNECTION *conn;
REPMGR_RETRY *retry;
REPMGR_SITE *site;
u_int i;
__repmgr_cleanup_netaddr(dbenv, &db_rep->my_addr);
if (db_rep->sites == NULL)
return;
while (!TAILQ_EMPTY(&db_rep->retries)) {
retry = TAILQ_FIRST(&db_rep->retries);
TAILQ_REMOVE(&db_rep->retries, retry, entries);
__os_free(dbenv, retry);
}
while (!TAILQ_EMPTY(&db_rep->connections)) {
conn = TAILQ_FIRST(&db_rep->connections);
__repmgr_cleanup_connection(dbenv, conn);
}
for (i = 0; i < db_rep->site_cnt; i++) {
site = &db_rep->sites[i];
__repmgr_cleanup_netaddr(dbenv, &site->net_addr);
}
__os_free(dbenv, db_rep->sites);
db_rep->sites = NULL;
}