repmgr_queue.c   [plain text]


/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2006,2007 Oracle.  All rights reserved.
 *
 * $Id: repmgr_queue.c,v 1.9 2007/05/17 15:15:51 bostic Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"

/*
 * The input message queue: a singly-linked tail queue (STAILQ) of
 * struct __repmgr_message entries, together with a running element count.
 */
typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER;
struct __repmgr_queue {
	int size;		/* Number of messages currently on the list. */
	QUEUE_HEADER header;	/* Head of the message list. */
};

/*
 * __repmgr_queue_create --
 *	Allocate an empty input message queue and attach it to the given
 *	replication handle as db_rep->input_queue.
 *
 *	Returns 0 on success, or the error returned by __os_calloc, in which
 *	case db_rep->input_queue is left untouched.
 *
 * PUBLIC: int __repmgr_queue_create __P((DB_ENV *, DB_REP *));
 */
int
__repmgr_queue_create(dbenv, db_rep)
	DB_ENV *dbenv;
	DB_REP *db_rep;
{
	REPMGR_QUEUE *queue;
	int ret;

	ret = __os_calloc(dbenv, 1, sizeof(REPMGR_QUEUE), &queue);
	if (ret == 0) {
		/* Start out with an empty list and a zero element count. */
		STAILQ_INIT(&queue->header);
		queue->size = 0;

		db_rep->input_queue = queue;
	}
	return (ret);
}

/*
 * Frees not only the queue header, but also any messages that may be on it,
 * along with their data buffers.
 *
 * PUBLIC: void __repmgr_queue_destroy __P((DB_ENV *));
 */
void
__repmgr_queue_destroy(dbenv)
	DB_ENV *dbenv;
{
	REPMGR_QUEUE *q;
	REPMGR_MESSAGE *m;

	if ((q = dbenv->rep_handle->input_queue) == NULL)
		return;

	while (!STAILQ_EMPTY(&q->header)) {
		m = STAILQ_FIRST(&q->header);
		STAILQ_REMOVE_HEAD(&q->header, entries);
		__os_free(dbenv, m);
	}
	__os_free(dbenv, q);
}

/*
 * PUBLIC: int __repmgr_queue_get __P((DB_ENV *, REPMGR_MESSAGE **));
 *
 * Get the first input message from the queue and return it to the caller.  The
 * caller hereby takes responsibility for the entire message buffer, and should
 * free it when done.
 *
 * If the queue is empty, this call blocks until either a message is put on
 * the queue or db_rep->finished becomes set.
 *
 * Returns:
 *	0		 a message was removed from the head of the queue and
 *			 stored in *msgp.
 *	DB_REP_UNAVAIL	 db_rep->finished was set; *msgp is not written.  Note
 *			 that "finished" is tested before the queue, so this is
 *			 returned even if messages are still queued.
 *	other non-zero	 the wait primitive failed (the pthread_cond_wait
 *			 return value, or GetLastError() on Windows); *msgp is
 *			 not written.
 *
 * Note that caller is NOT expected to hold the mutex.  This is asymmetric with
 * put(), because put() is expected to be called in a loop after select, where
 * it's already necessary to be holding the mutex.
 */
int
__repmgr_queue_get(dbenv, msgp)
	DB_ENV *dbenv;
	REPMGR_MESSAGE **msgp;
{
	DB_REP *db_rep;
	REPMGR_QUEUE *q;
	REPMGR_MESSAGE *m;
	int ret;

	ret = 0;
	db_rep = dbenv->rep_handle;
	q = db_rep->input_queue;

	/*
	 * LOCK_MUTEX/UNLOCK_MUTEX are macros defined outside this file.
	 * NOTE(review): if LOCK_MUTEX can return from the enclosing function
	 * on failure, that is a hidden exit path here -- confirm against its
	 * definition.
	 */
	LOCK_MUTEX(db_rep->mutex);

	/*
	 * Wait for work or for shutdown.  The condition is re-tested after
	 * every wakeup, so a wakeup that finds the queue still empty simply
	 * goes back to waiting.
	 */
	while (STAILQ_EMPTY(&q->header) && !db_rep->finished) {
#ifdef DB_WIN32
		/*
		 * Windows: clear the "non-empty" event while still holding the
		 * mutex, then hand the mutex to SignalObjectAndWait as the
		 * object to signal and wait on the event.  The mutex has to be
		 * taken again once the wait completes.
		 */
		if (!ResetEvent(db_rep->queue_nonempty)) {
			ret = GetLastError();
			goto err;
		}
		/*
		 * NOTE(review): if this call fails, control reaches the
		 * UNLOCK_MUTEX at "err" without re-locking; whether the mutex
		 * is still held at that point is not evident from this file --
		 * confirm.
		 */
		if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty,
			INFINITE, FALSE) != WAIT_OBJECT_0) {
			ret = GetLastError();
			goto err;
		}
		LOCK_MUTEX(db_rep->mutex);
#else
		/*
		 * POSIX: wait on the condition variable, passing the mutex
		 * that was locked above.
		 */
		if ((ret = pthread_cond_wait(&db_rep->queue_nonempty,
		    &db_rep->mutex)) != 0)
			goto err;
#endif
	}
	if (db_rep->finished)
		ret = DB_REP_UNAVAIL;
	else {
		/* Detach the head message and pass ownership to the caller. */
		m = STAILQ_FIRST(&q->header);
		STAILQ_REMOVE_HEAD(&q->header, entries);
		q->size--;
		*msgp = m;
	}

	/* Common exit for both the success and the failure paths. */
err:
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}

/*
 * __repmgr_queue_put --
 *	Append a message to the tail of the input queue, bump the element
 *	count, and signal the "queue non-empty" condition.  Since
 *	__repmgr_queue_get removes from the head, messages are handed out in
 *	the order in which they were put.
 *
 *	Returns whatever __repmgr_signal returns.
 *
 * PUBLIC: int __repmgr_queue_put __P((DB_ENV *, REPMGR_MESSAGE *));
 *
 * !!!
 * Caller must hold db_rep->mutex (the mutex __repmgr_queue_get locks).
 */
int
__repmgr_queue_put(dbenv, msg)
	DB_ENV *dbenv;
	REPMGR_MESSAGE *msg;
{
	DB_REP *rep;
	int ret;

	rep = dbenv->rep_handle;

	/* Link the message in at the tail and account for it. */
	STAILQ_INSERT_TAIL(&rep->input_queue->header, msg, entries);
	rep->input_queue->size++;

	/* Let a waiting consumer know there is something to pick up. */
	ret = __repmgr_signal(&rep->queue_nonempty);
	return (ret);
}

/*
 * PUBLIC: int __repmgr_queue_size __P((DB_ENV *));
 *
 * !!!
 * Caller must hold repmgr->mutex.
 */
int
__repmgr_queue_size(dbenv)
	DB_ENV *dbenv;
{
	return (dbenv->rep_handle->input_queue->size);
}