repmgr_net.c   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2007 Oracle.  All rights reserved.
 *
 * $Id: repmgr_net.c,v 1.55 2007/06/11 18:29:34 alanb Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"
#include "dbinc/mp.h"

/*
 * The functions in this module implement a simple wire protocol for
 * transmitting messages, both replication messages and our own internal control
 * messages.  The protocol is as follows:
 *
 *      1 byte          - message type  (defined in repmgr_int.h)
 *      4 bytes         - size of control
 *      4 bytes         - size of rec
 *      ? bytes         - control
 *      ? bytes         - rec
 *
 * where both sizes are 32-bit binary integers in network byte order.
 * Either control or rec can have zero length, but even in this case the
 * 4-byte length will be present.
 *     Putting both lengths right up at the front allows us to read in fewer
 * phases, and allows us to allocate buffer space for both parts (plus a wrapper
 * struct) at once.
 */

/*
 * In sending a message, we first try to send it in-line, in the sending thread,
 * and without first copying the message, by using scatter/gather I/O, using
 * iovecs to point to the various pieces of the message.  If that all works
 * without blocking, that's optimal.
 *     If we find that, for a particular connection, we can't send without
 * blocking, then we must copy the message for sending later in the select()
 * thread.  In the course of doing that, we might as well "flatten" the message,
 * forming one single buffer, to simplify life.  Not only that, once we've gone
 * to the trouble of doing that, other sites to which we also want to send the
 * message (in the case of a broadcast), may as well take advantage of the
 * simplified structure also.
 *     This structure holds it all.  Note that this structure, and the
 * "flat_msg" structure, are allocated separately, because (1) the flat_msg
 * version is usually not needed; and (2) when it is needed, it will need to
 * live longer than the wrapping sending_msg structure.
 *     Note that, for the broadcast case, where we're going to use this
 * repeatedly, the iovecs is a template that must be copied, since in normal use
 * the iovecs pointers and lengths get adjusted after every partial write.
 */
struct sending_msg {
	REPMGR_IOVECS iovecs;
	u_int8_t type;
	u_int32_t control_size_buf, rec_size_buf;
	REPMGR_FLAT *fmsg;
};

static int __repmgr_send_broadcast
    __P((DB_ENV *, const DBT *, const DBT *, u_int *, u_int *));
static void setup_sending_msg
    __P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
    __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
static int enqueue_msg
    __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static int flatten __P((DB_ENV *, struct sending_msg *));
static REPMGR_SITE *__repmgr_available_site __P((DB_ENV *, int));

/*
 * __repmgr_send --
 *	The send function for DB_ENV->rep_set_transport.
 *
 * !!!
 * This is only ever called as the replication transport call-back, which means
 * it's either on one of our message processing threads or an application
 * thread.  It mustn't be called from the select() thread, because we might call
 * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
 * select() thread.
 *
 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
 * PUBLIC:     const DB_LSN *, int, u_int32_t));
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
	DB_ENV *dbenv;
	const DBT *control, *rec;
	const DB_LSN *lsnp;
	int eid;
	u_int32_t flags;
{
	DB_REP *db_rep;
	u_int nsites, npeers, available, needed;
	int ret, t_ret;
	REPMGR_SITE *site;
	REPMGR_CONNECTION *conn;

	db_rep = dbenv->rep_handle;

	LOCK_MUTEX(db_rep->mutex);
	if (eid == DB_EID_BROADCAST) {
		if ((ret = __repmgr_send_broadcast(dbenv, control, rec,
			 &nsites, &npeers)) != 0)
			goto out;
	} else {
		/*
		 * If this is a request that can be sent anywhere, then see if
		 * we can send it to our peer (to save load on the master), but
		 * not if it's a rerequest, 'cuz that likely means we tried this
		 * already and failed.
		 */
		if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
		    DB_REP_ANYWHERE &&
		    IS_VALID_EID(db_rep->peer) &&
		    (site = __repmgr_available_site(dbenv, db_rep->peer)) !=
		    NULL) {
			RPRINT(dbenv, (dbenv, "sending request to peer"));
		} else if ((site = __repmgr_available_site(dbenv, eid)) ==
		    NULL) {
			RPRINT(dbenv, (dbenv,
			    "ignoring message sent to unavailable site"));
			ret = DB_REP_UNAVAIL;
			goto out;
		}

		conn = site->ref.conn;
		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
		    control, rec)) == DB_REP_UNAVAIL &&
		    (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
			ret = t_ret;
		if (ret != 0)
			goto out;

		nsites = 1;
		npeers = site->priority > 0 ? 1 : 0;
	}
	/*
	 * Right now, nsites and npeers represent the (maximum) number of sites
	 * we've attempted to begin sending the message to.  Of course we
	 * haven't really received any ack's yet.  But since we've only sent to
	 * nsites/npeers other sites, that's the maximum number of ack's we
	 * could possibly expect.  If even that number fails to satisfy our PERM
	 * policy, there's no point waiting for something that will never
	 * happen.
	 */
	if (LF_ISSET(DB_REP_PERMANENT)) {
		switch (db_rep->perm_policy) {
		case DB_REPMGR_ACKS_NONE:
			goto out;

		case DB_REPMGR_ACKS_ONE:
			needed = 1;
			available = nsites;
			break;

		case DB_REPMGR_ACKS_ALL:
			/* Number of sites in the group besides myself. */
			needed = __repmgr_get_nsites(db_rep) - 1;
			available = nsites;
			break;

		case DB_REPMGR_ACKS_ONE_PEER:
			needed = 1;
			available = npeers;
			break;

		case DB_REPMGR_ACKS_ALL_PEERS:
			/*
			 * Too hard to figure out "needed", since we're not
			 * keeping track of how many peers we have; so just skip
			 * the optimization in this case.
			 */
			needed = 1;
			available = npeers;
			break;

		case DB_REPMGR_ACKS_QUORUM:
			/*
			 * The minimum number of acks necessary to ensure that
			 * the transaction is durable if an election is held.
			 */
			needed = (__repmgr_get_nsites(db_rep) - 1) / 2;
			available = npeers;
			break;

		default:
			COMPQUIET(available, 0);
			COMPQUIET(needed, 0);
			(void)__db_unknown_path(dbenv, "__repmgr_send");
			break;
		}
		if (available < needed) {
			ret = DB_REP_UNAVAIL;
			goto out;
		}
		/* In ALL_PEERS case, display of "needed" might be confusing. */
		RPRINT(dbenv, (dbenv,
		    "will await acknowledgement: need %u", needed));
		ret = __repmgr_await_ack(dbenv, lsnp);
	}

out:	UNLOCK_MUTEX(db_rep->mutex);
	if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
		STAT(db_rep->region->mstat.st_perm_failed++);
		DB_EVENT(dbenv, DB_EVENT_REP_PERM_FAILED, NULL);
	}
	return (ret);
}

static REPMGR_SITE *
__repmgr_available_site(dbenv, eid)
	DB_ENV *dbenv;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = dbenv->rep_handle;
	site = SITE_FROM_EID(eid);
	if (site->state != SITE_CONNECTED)
		return (NULL);

	if (F_ISSET(site->ref.conn, CONN_CONNECTING))
		return (NULL);
	return (site);
}

/*
 * Sends message to all sites with which we currently have an active
 * connection.  Sets result parameters according to how many sites we attempted
 * to begin sending to, even if we did nothing more than queue it for later
 * delivery.
 *
 * !!!
 * Caller must hold dbenv->mutex.
 *
 * !!!
 * Note that this cannot be called from the select() thread, in case we call
 * __repmgr_bust_connection(..., FALSE).
 */
static int
__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
	DB_ENV *dbenv;
	const DBT *control, *rec;
	u_int *nsitesp, *npeersp;
{
	DB_REP *db_rep;
	struct sending_msg msg;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int nsites, npeers;
	int ret;

	db_rep = dbenv->rep_handle;

	setup_sending_msg(&msg, REPMGR_REP_MESSAGE, control, rec);
	nsites = npeers = 0;

	/*
	 * Traverse the connections list.  Here, even in bust_connection, we
	 * don't unlink the current list entry, so we can use the TAILQ_FOREACH
	 * macro.
	 */
	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
		if (F_ISSET(conn, CONN_CONNECTING | CONN_DEFUNCT) ||
		    !IS_VALID_EID(conn->eid))
			continue;

		if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
			site = SITE_FROM_EID(conn->eid);
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else if (ret == DB_REP_UNAVAIL) {
			if ((ret = __repmgr_bust_connection(
			     dbenv, conn, FALSE)) != 0)
				return (ret);
		} else
			return (ret);
	}

	*nsitesp = nsites;
	*npeersp = npeers;
	return (0);
}

/*
 * __repmgr_send_one --
 *	Send a message to a site, or if you can't just yet, make a copy of it
 * and arrange to have it sent later.  'rec' may be NULL, in which case we send
 * a zero length and no data.
 *
 * If we get an error, we take care of cleaning up the connection (calling
 * __repmgr_bust_connection()), so that the caller needn't do so.
 *
 * !!!
 * Note that the mutex should be held through this call.
 * It doubles as a synchronizer to make sure that two threads don't
 * intersperse writes that are part of two single messages.
 *
 * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
 * PUBLIC:    u_int, const DBT *, const DBT *));
 */
int
__repmgr_send_one(dbenv, conn, msg_type, control, rec)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
	u_int msg_type;
	const DBT *control, *rec;
{
	struct sending_msg msg;

	setup_sending_msg(&msg, msg_type, control, rec);
	return (__repmgr_send_internal(dbenv, conn, &msg));
}

/*
 * Attempts a "best effort" to send a message on the given site.  If there is an
 * excessive backlog of message already queued on the connection, we simply drop
 * this message, and still return 0 even in this case.
 */
static int
__repmgr_send_internal(dbenv, conn, msg)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
{
#define	OUT_QUEUE_LIMIT 10	/* arbitrary, for now */
	REPMGR_IOVECS iovecs;
	SITE_STRING_BUFFER buffer;
	int ret;
	size_t nw;
	size_t total_written;

	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		/*
		 * Output to this site is currently owned by the select()
		 * thread, so we can't try sending in-line here.  We can only
		 * queue the msg for later.
		 */
		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
		    __repmgr_format_eid_loc(dbenv->rep_handle,
		    conn->eid, buffer)));
		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
			return (enqueue_msg(dbenv, conn, msg, 0));
		else {
			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
			STAT(dbenv->rep_handle->
			    region->mstat.st_msgs_dropped++);
			return (0);
		}
	}

	/*
	 * Send as much data to the site as we can, without blocking.  Keep
	 * writing as long as we're making some progress.  Make a scratch copy
	 * of iovecs for our use, since we destroy it in the process of
	 * adjusting pointers after each partial I/O.
	 */
	memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
	total_written = 0;
	while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
	    iovecs.count-iovecs.offset, &nw)) == 0) {
		total_written += nw;
		if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
			return (0);
	}

	if (ret != WOULDBLOCK) {
		__db_err(dbenv, ret, "socket writing failure");
		return (DB_REP_UNAVAIL);
	}

	RPRINT(dbenv, (dbenv, "wrote only %lu bytes to %s",
	    (u_long)total_written,
	    __repmgr_format_eid_loc(dbenv->rep_handle, conn->eid, buffer)));
	/*
	 * We can't send any more without blocking: queue (a pointer to) a
	 * "flattened" copy of the message, so that the select() thread will
	 * finish sending it later.
	 */
	if ((ret = enqueue_msg(dbenv, conn, msg, total_written)) != 0)
		return (ret);

	STAT(dbenv->rep_handle->region->mstat.st_msgs_queued++);

	/*
	 * Wake the main select thread so that it can discover that it has
	 * received ownership of this connection.  Note that we didn't have to
	 * do this in the previous case (above), because the non-empty queue
	 * implies that the select() thread is already managing ownership of
	 * this connection.
	 */
#ifdef DB_WIN32
	if (WSAEventSelect(conn->fd, conn->event_object,
	    FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(dbenv, ret, "can't add FD_WRITE event bit");
		return (ret);
	}
#endif
	return (__repmgr_wake_main_thread(dbenv));
}

/*
 * PUBLIC: int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
 *
 * Count up how many sites have ack'ed the given LSN.  Returns TRUE if enough
 * sites have ack'ed; FALSE otherwise.
 *
 * !!!
 * Caller must hold the mutex.
 */
int
__repmgr_is_permanent(dbenv, lsnp)
	DB_ENV *dbenv;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int eid, nsites, npeers;
	int is_perm, has_missing_peer;

	db_rep = dbenv->rep_handle;

	if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
		return (TRUE);

	nsites = npeers = 0;
	has_missing_peer = FALSE;
	for (eid = 0; eid < db_rep->site_cnt; eid++) {
		site = SITE_FROM_EID(eid);
		if (site->priority == -1) {
			/*
			 * Never connected to this site: since we can't know
			 * whether it's a peer, assume the worst.
			 */
			has_missing_peer = TRUE;
			continue;
		}

		if (log_compare(&site->max_ack, lsnp) >= 0) {
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else {
			/* This site hasn't ack'ed the message. */
			if (site->priority > 0)
				has_missing_peer = TRUE;
		}
	}

	switch (db_rep->perm_policy) {
	case DB_REPMGR_ACKS_ONE:
		is_perm = (nsites >= 1);
		break;
	case DB_REPMGR_ACKS_ONE_PEER:
		is_perm = (npeers >= 1);
		break;
	case DB_REPMGR_ACKS_QUORUM:
		/*
		 * The minimum number of acks necessary to ensure that the
		 * transaction is durable if an election is held (given that we
		 * always conduct elections according to the standard,
		 * recommended practice of requiring votes from a majority of
		 * sites).
		 */
		if (__repmgr_get_nsites(db_rep) == 2) {
			/*
			 * A group of 2 sites is, as always, a special case.
			 * For a transaction to be durable the other site has to
			 * have received it.
			 */
			is_perm = (npeers >= 1);
		} else
			is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2);
		break;
	case DB_REPMGR_ACKS_ALL:
		/* Adjust by 1, since get_nsites includes local site. */
		is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1);
		break;
	case DB_REPMGR_ACKS_ALL_PEERS:
		if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) {
			/* Assume missing site might be a peer. */
			has_missing_peer = TRUE;
		}
		is_perm = !has_missing_peer;
		break;
	default:
		is_perm = FALSE;
		(void)__db_unknown_path(dbenv, "__repmgr_is_permanent");
	}
	return (is_perm);
}

/*
 * Abandons a connection, to recover from an error.  Upon entry the conn struct
 * must be on the connections list.
 *
 * If the 'do_close' flag is true, we do the whole job; the clean-up includes
 * removing the struct from the list and freeing all its memory, so upon return
 * the caller must not refer to it any further.  Otherwise, we merely mark the
 * connection for clean-up later by the main thread.
 *
 * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
 * PUBLIC:     REPMGR_CONNECTION *, int));
 *
 * !!!
 * Caller holds mutex.
 */
int
__repmgr_bust_connection(dbenv, conn, do_close)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
	int do_close;
{
	DB_REP *db_rep;
	int connecting, ret, eid;

	db_rep = dbenv->rep_handle;
	ret = 0;

	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
	eid = conn->eid;
	connecting = F_ISSET(conn, CONN_CONNECTING);
	if (do_close)
		__repmgr_cleanup_connection(dbenv, conn);
	else {
		F_SET(conn, CONN_DEFUNCT);
		conn->eid = -1;
	}

	/*
	 * When we first accepted the incoming connection, we set conn->eid to
	 * -1 to indicate that we didn't yet know what site it might be from.
	 * If we then get here because we later decide it was a redundant
	 * connection, the following scary stuff will correctly not happen.
	 */
	if (IS_VALID_EID(eid)) {
		/* schedule_connection_attempt wakes the main thread. */
		if ((ret = __repmgr_schedule_connection_attempt(
		    dbenv, (u_int)eid, FALSE)) != 0)
			return (ret);

		/*
		 * If this connection had gotten no further than the CONNECTING
		 * state, this can't count as a loss of connection to the
		 * master.
		 */
		if (!connecting && eid == db_rep->master_eid) {
			(void)__memp_set_config(
			    dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
			if ((ret = __repmgr_init_election(
			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
				return (ret);
		}
	} else if (!do_close) {
		/*
		 * One way or another, make sure the main thread is poked, so
		 * that we do the deferred clean-up.
		 */
		ret = __repmgr_wake_main_thread(dbenv);
	}
	return (ret);
}

/*
 * PUBLIC: void __repmgr_cleanup_connection
 * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
 */
void
__repmgr_cleanup_connection(dbenv, conn)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	QUEUED_OUTPUT *out;
	REPMGR_FLAT *msg;
	DBT *dbt;

	db_rep = dbenv->rep_handle;

	TAILQ_REMOVE(&db_rep->connections, conn, entries);
	if (conn->fd != INVALID_SOCKET) {
		(void)closesocket(conn->fd);
#ifdef DB_WIN32
		(void)WSACloseEvent(conn->event_object);
#endif
	}

	/*
	 * Deallocate any input and output buffers we may have.
	 */
	if (conn->reading_phase == DATA_PHASE) {
		if (conn->msg_type == REPMGR_REP_MESSAGE)
			__os_free(dbenv, conn->input.rep_message);
		else {
			dbt = &conn->input.repmgr_msg.cntrl;
			__os_free(dbenv, dbt->data);
			dbt = &conn->input.repmgr_msg.rec;
			if (dbt->size > 0)
				__os_free(dbenv, dbt->data);
		}
	}
	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
		out = STAILQ_FIRST(&conn->outbound_queue);
		STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
		msg = out->msg;
		if (--msg->ref_count <= 0)
			__os_free(dbenv, msg);
		__os_free(dbenv, out);
	}

	__os_free(dbenv, conn);
}

static int
enqueue_msg(dbenv, conn, msg, offset)
	DB_ENV *dbenv;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	size_t offset;
{
	QUEUED_OUTPUT *q_element;
	int ret;

	if (msg->fmsg == NULL && ((ret = flatten(dbenv, msg)) != 0))
		return (ret);
	if ((ret = __os_malloc(dbenv, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
		return (ret);
	q_element->msg = msg->fmsg;
	msg->fmsg->ref_count++;	/* encapsulation would be sweeter */
	q_element->offset = offset;

	/* Put it on the connection's outbound queue. */
	STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
	conn->out_queue_length++;
	return (0);
}

/*
 * The 'rec' DBT can be NULL, in which case we treat it like a zero-length DBT.
 * But 'control' is always present.
 */
static void
setup_sending_msg(msg, type, control, rec)
	struct sending_msg *msg;
	u_int type;
	const DBT *control, *rec;
{
	u_int32_t rec_size;

	/*
	 * The wire protocol is documented in a comment at the top of this
	 * module.
	 */
	__repmgr_iovec_init(&msg->iovecs);
	msg->type = type;
	__repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));

	msg->control_size_buf = htonl(control->size);
	__repmgr_add_buffer(&msg->iovecs,
	    &msg->control_size_buf, sizeof(msg->control_size_buf));

	rec_size = rec == NULL ? 0 : rec->size;
	msg->rec_size_buf = htonl(rec_size);
	__repmgr_add_buffer(
	    &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));

	if (control->size > 0)
		__repmgr_add_dbt(&msg->iovecs, control);

	if (rec_size > 0)
		__repmgr_add_dbt(&msg->iovecs, rec);

	msg->fmsg = NULL;
}

/*
 * Convert a message stored as iovec pointers to various pieces, into flattened
 * form, by copying all the pieces, and then make the iovec just point to the
 * new simplified form.
 */
static int
flatten(dbenv, msg)
	DB_ENV *dbenv;
	struct sending_msg *msg;
{
	u_int8_t *p;
	size_t msg_size;
	int i, ret;

	DB_ASSERT(dbenv, msg->fmsg == NULL);

	msg_size = msg->iovecs.total_bytes;
	if ((ret = __os_malloc(dbenv, sizeof(*msg->fmsg) + msg_size,
	    &msg->fmsg)) != 0)
		return (ret);
	msg->fmsg->length = msg_size;
	msg->fmsg->ref_count = 0;
	p = &msg->fmsg->data[0];

	for (i = 0; i < msg->iovecs.count; i++) {
		memcpy(p, msg->iovecs.vectors[i].iov_base,
		    msg->iovecs.vectors[i].iov_len);
		p = &p[msg->iovecs.vectors[i].iov_len];
	}
	__repmgr_iovec_init(&msg->iovecs);
	__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
	return (0);
}

/*
 * PUBLIC: int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
 */
int
__repmgr_find_site(dbenv, host, port)
	DB_ENV *dbenv;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int i;

	db_rep = dbenv->rep_handle;
	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];

		if (strcmp(site->net_addr.host, host) == 0 &&
		    site->net_addr.port == port)
			return ((int)i);
	}

	return (-1);
}

/*
 * Stash a copy of the given host name and port number into a convenient data
 * structure so that we can save it permanently.  This is kind of like a
 * constructor for a netaddr object, except that the caller supplies the memory
 * for the base struct (though not the subordinate attachments).
 *
 * All inputs are assumed to have been already validated.
 *
 * PUBLIC: int __repmgr_pack_netaddr __P((DB_ENV *, const char *,
 * PUBLIC:     u_int, ADDRINFO *, repmgr_netaddr_t *));
 */
int
__repmgr_pack_netaddr(dbenv, host, port, list, addr)
	DB_ENV *dbenv;
	const char *host;
	u_int port;
	ADDRINFO *list;
	repmgr_netaddr_t *addr;
{
	int ret;

	DB_ASSERT(dbenv, host != NULL);

	if ((ret = __os_strdup(dbenv, host, &addr->host)) != 0)
		return (ret);
	addr->port = (u_int16_t)port;
	addr->address_list = list;
	addr->current = NULL;
	return (0);
}

/*
 * PUBLIC: int __repmgr_getaddr __P((DB_ENV *,
 * PUBLIC:     const char *, u_int, int, ADDRINFO **));
 */
int
__repmgr_getaddr(dbenv, host, port, flags, result)
	DB_ENV *dbenv;
	const char *host;
	u_int port;
	int flags;    /* Matches struct addrinfo declaration. */
	ADDRINFO **result;
{
	ADDRINFO *answer, hints;
	char buffer[10];		/* 2**16 fits in 5 digits. */
#ifdef DB_WIN32
	int ret;
#endif

	/*
	 * Ports are really 16-bit unsigned values, but it's too painful to
	 * push that type through the API.
	 */
	if (port > UINT16_MAX) {
		__db_errx(dbenv, "port %u larger than max port %u",
		    port, UINT16_MAX);
		return (EINVAL);
	}

#ifdef DB_WIN32
	if (!dbenv->rep_handle->wsa_inited &&
	    (ret = __repmgr_wsa_init(dbenv)) != 0)
		return (ret);
#endif

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = flags;
	(void)snprintf(buffer, sizeof(buffer), "%u", port);

	/*
	 * Although it's generally bad to discard error information, the return
	 * code from __db_getaddrinfo is undependable.  Our callers at least
	 * would like to be able to distinguish errors in getaddrinfo (which we
	 * want to consider to be re-tryable), from other failure (e.g., EINVAL,
	 * above).
	 */
	if (__db_getaddrinfo(dbenv, host, port, buffer, &hints, &answer) != 0)
		return (DB_REP_UNAVAIL);
	*result = answer;

	return (0);
}

/*
 * Adds a new site to our array of known sites (unless it already exists),
 * and schedules it for immediate connection attempt.  Whether it exists or not,
 * we set newsitep, either to the already existing site, or to the newly created
 * site.  Unless newsitep is passed in as NULL, which is allowed.
 *
 * PUBLIC: int __repmgr_add_site
 * PUBLIC:     __P((DB_ENV *, const char *, u_int, REPMGR_SITE **));
 *
 * !!!
 * Caller is expected to hold the mutex.
 */
int
__repmgr_add_site(dbenv, host, port, newsitep)
	DB_ENV *dbenv;
	const char *host;
	u_int port;
	REPMGR_SITE **newsitep;
{
	DB_REP *db_rep;
	ADDRINFO *address_list;
	repmgr_netaddr_t addr;
	REPMGR_SITE *site;
	int ret, eid;

	ret = 0;
	db_rep = dbenv->rep_handle;

	if (IS_VALID_EID(eid = __repmgr_find_site(dbenv, host, port))) {
		site = SITE_FROM_EID(eid);
		ret = EEXIST;
		goto out;
	}

	if ((ret = __repmgr_getaddr(
	    dbenv, host, port, 0, &address_list)) == DB_REP_UNAVAIL) {
		/* Allow re-tryable errors.  We'll try again later. */
		address_list = NULL;
	} else if (ret != 0)
		return (ret);

	if ((ret = __repmgr_pack_netaddr(
	    dbenv, host, port, address_list, &addr)) != 0) {
		__db_freeaddrinfo(dbenv, address_list);
		return (ret);
	}

	if ((ret = __repmgr_new_site(dbenv, &site, &addr, SITE_IDLE)) != 0) {
		__repmgr_cleanup_netaddr(dbenv, &addr);
		return (ret);
	}

	if (db_rep->selector != NULL &&
	    (ret = __repmgr_schedule_connection_attempt(
	    dbenv, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
		return (ret);

	/* Note that we should only come here for success and EEXIST. */
out:
	if (newsitep != NULL)
		*newsitep = site;
	return (ret);
}

/*
 * Initializes net-related memory in the db_rep handle.
 *
 * PUBLIC: int __repmgr_net_create __P((DB_ENV *, DB_REP *));
 */
int
__repmgr_net_create(dbenv, db_rep)
	DB_ENV *dbenv;
	DB_REP *db_rep;
{
	COMPQUIET(dbenv, NULL);

	db_rep->listen_fd = INVALID_SOCKET;
	db_rep->master_eid = DB_EID_INVALID;

	TAILQ_INIT(&db_rep->connections);
	TAILQ_INIT(&db_rep->retries);

	return (0);
}

/*
 * listen_socket_init --
 *	Initialize a socket for listening.  Sets
 *	a file descriptor for the socket, ready for an accept() call
 *	in a thread that we're happy to let block.
 *
 * PUBLIC:  int __repmgr_listen __P((DB_ENV *));
 */
int
__repmgr_listen(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	ADDRINFO *ai;
	char *why;
	int sockopt, ret;
	socket_t s;

	db_rep = dbenv->rep_handle;

	/* Use OOB value as sentinel to show no socket open. */
	s = INVALID_SOCKET;
	ai = ADDR_LIST_FIRST(&db_rep->my_addr);

	/*
	 * Given the assert is correct, we execute the loop at least once, which
	 * means 'why' will have been set by the time it's needed.  But I guess
	 * lint doesn't know about DB_ASSERT.
	 */
	COMPQUIET(why, "");
	DB_ASSERT(dbenv, ai != NULL);
	for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
			why = "can't create listen socket";
			continue;
		}

		/*
		 * When testing, it's common to kill and restart regularly.  On
		 * some systems, this causes bind to fail with "address in use"
		 * errors unless this option is set.
		 */
		sockopt = 1;
		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
		    sizeof(sockopt)) != 0) {
			why = "can't set REUSEADDR socket option";
			break;
		}

		if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
			why = "can't bind socket to listening address";
			(void)closesocket(s);
			s = INVALID_SOCKET;
			continue;
		}

		if (listen(s, 5) != 0) {
			why = "listen()";
			break;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(dbenv, ret, "can't unblock listen socket");
			goto clean;
		}

		db_rep->listen_fd = s;
		return (0);
	}

	ret = net_errno;
	__db_err(dbenv, ret, why);
clean:	if (s != INVALID_SOCKET)
		(void)closesocket(s);
	return (ret);
}

/*
 * PUBLIC: int __repmgr_net_close __P((DB_ENV *));
 */
int
__repmgr_net_close(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
#ifndef DB_WIN32
	struct sigaction sigact;
#endif
	int ret;

	db_rep = dbenv->rep_handle;
	if (db_rep->listen_fd == INVALID_SOCKET)
		return (0);

	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
		if (conn->fd != INVALID_SOCKET) {
			(void)closesocket(conn->fd);
			conn->fd = INVALID_SOCKET;
#ifdef DB_WIN32
			(void)WSACloseEvent(conn->event_object);
#endif
		}
	}

	ret = 0;
	if (closesocket(db_rep->listen_fd) == SOCKET_ERROR)
		ret = net_errno;

#ifdef DB_WIN32
	/* Shut down the Windows sockets DLL. */
	if (WSACleanup() == SOCKET_ERROR && ret == 0)
		ret = net_errno;
	db_rep->wsa_inited = FALSE;
#else
	/* Restore original SIGPIPE handling configuration. */
	if (db_rep->chg_sig_handler) {
		memset(&sigact, 0, sizeof(sigact));
		sigact.sa_handler = SIG_DFL;
		if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0)
			ret = errno;
	}
#endif
	db_rep->listen_fd = INVALID_SOCKET;
	return (ret);
}

/*
 * PUBLIC: void __repmgr_net_destroy __P((DB_ENV *, DB_REP *));
 */
void
__repmgr_net_destroy(dbenv, db_rep)
	DB_ENV *dbenv;
	DB_REP *db_rep;
{
	REPMGR_CONNECTION *conn;
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	u_int i;

	__repmgr_cleanup_netaddr(dbenv, &db_rep->my_addr);

	if (db_rep->sites == NULL)
		return;

	while (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		TAILQ_REMOVE(&db_rep->retries, retry, entries);
		__os_free(dbenv, retry);
	}

	while (!TAILQ_EMPTY(&db_rep->connections)) {
		conn = TAILQ_FIRST(&db_rep->connections);
		__repmgr_cleanup_connection(dbenv, conn);
	}

	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];
		__repmgr_cleanup_netaddr(dbenv, &site->net_addr);
	}
	__os_free(dbenv, db_rep->sites);
	db_rep->sites = NULL;
}