#include "db_config.h"
#ifndef lint
static const char revid[] = "$Id: rep_util.c,v 1.2 2004/03/30 01:23:56 jtownsen Exp $";
#endif
#ifndef NO_SYSTEM_INCLUDES
#include <stdlib.h>
#include <string.h>
#endif
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_shash.h"
#include "dbinc/btree.h"
#include "dbinc/fop.h"
#include "dbinc/hash.h"
#include "dbinc/log.h"
#include "dbinc/lock.h"
#include "dbinc/qam.h"
#include "dbinc/txn.h"
#ifdef REP_DIAGNOSTIC
static void __rep_print_logmsg __P((DB_ENV *, const DBT *, DB_LSN *));
#endif
/*
 * __rep_check_alloc --
 *	Ensure r->array has room for at least n entries beyond the
 *	r->npages already recorded.  Capacity starts at 20 LSN_PAGE
 *	entries and doubles until large enough.  Returns 0 or the error
 *	from __os_realloc; on error r->nalloc still describes the last
 *	successful allocation.
 */
int
__rep_check_alloc(dbenv, r, n)
	DB_ENV *dbenv;
	TXN_RECS *r;
	int n;
{
	int newsize, ret;

	for (;;) {
		if (r->nalloc >= r->npages + n)
			return (0);

		newsize = (r->nalloc == 0) ? 20 : 2 * r->nalloc;
		ret = __os_realloc(dbenv,
		    newsize * sizeof(LSN_PAGE), &r->array);
		if (ret != 0)
			return (ret);
		r->nalloc = newsize;
	}
}
/*
 * __rep_send_message --
 *	Build a REP_CONTROL header and hand it, together with the payload
 *	dbtp, to the application's dbenv->rep_send callback.
 *
 *	eid	target site id passed through to rep_send.
 *	rtype	message type stored in the control header.
 *	lsnp	LSN stored in the header; a zero LSN is used when NULL.
 *	dbtp	payload; an empty DBT is substituted when NULL.
 *	flags	stored in the header; DB_LOG_PERM also selects
 *		DB_REP_PERMANENT for the callback.
 *
 *	Returns the callback's return value, and bumps the sent or failed
 *	message statistic accordingly.
 */
int
__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags)
	DB_ENV *dbenv;
	int eid;
	u_int32_t rtype;
	DB_LSN *lsnp;
	const DBT *dbtp;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REP *rep;
	DBT cdbt, scrap_dbt;
	REP_CONTROL cntrl;
	int ret;
	u_int32_t myflags, rectype;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	/* Fill in the control header that accompanies every message. */
	memset(&cntrl, 0, sizeof(cntrl));
	if (lsnp == NULL)
		ZERO_LSN(cntrl.lsn);
	else
		cntrl.lsn = *lsnp;
	cntrl.rectype = rtype;
	cntrl.flags = flags;
	cntrl.rep_version = DB_REPVERSION;
	cntrl.log_version = DB_LOGVERSION;
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	cntrl.gen = rep->gen;
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);

	memset(&cdbt, 0, sizeof(cdbt));
	cdbt.data = &cntrl;
	cdbt.size = sizeof(cntrl);

	/* Never hand the send callback a NULL record. */
	if (dbtp == NULL) {
		memset(&scrap_dbt, 0, sizeof(DBT));
		dbtp = &scrap_dbt;
	}

#ifdef DIAGNOSTIC
	if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
		__rep_print_message(dbenv, eid, &cntrl, "rep_send_message");
#endif
#ifdef REP_DIAGNOSTIC
	if (rtype == REP_LOG)
		__rep_print_logmsg(dbenv, dbtp, lsnp);
#endif
	/*
	 * Choose callback flags: DB_LOG_PERM requests permanence, non-log
	 * messages are sent unbuffered, and plain log records get no flags.
	 */
	myflags = 0;
	if (LF_ISSET(DB_LOG_PERM))
		myflags = DB_REP_PERMANENT;
	else if (rtype != REP_LOG)
		myflags = DB_REP_NOBUFFER;
	else if (dbtp->data != NULL && dbtp->size >= sizeof(rectype)) {
		/*
		 * Peek at the leading record type of the log record.  Only
		 * do so when the buffer actually holds that many bytes; an
		 * empty (scrap) or truncated record would otherwise be read
		 * out of bounds.  Commit and checkpoint records are marked
		 * DB_LOG_PERM in the header.
		 */
		memcpy(&rectype, dbtp->data, sizeof(rectype));
		if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
			F_SET(&cntrl, DB_LOG_PERM);
	}

	ret = dbenv->rep_send(dbenv, &cdbt, dbtp, &cntrl.lsn, eid, myflags);

	/* Statistics are updated without holding the region mutex. */
	if (ret == 0)
		rep->stat.st_msgs_sent++;
	else
		rep->stat.st_msgs_send_failures++;

#ifdef DIAGNOSTIC
	if (ret != 0 && FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
		__db_err(dbenv, "rep_send_function returned: %d", ret);
#endif
	return (ret);
}
#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *	Pretty-print the log record in logdbt (at *lsnp) via the
 *	DB_TXN_PRINT recovery dispatch table.
 *
 *	The print dispatch table is built lazily on the first call and
 *	cached for the life of the process.  Both the table pointer AND
 *	its size must have static storage duration.  Otherwise the
 *	"ptabsize == 0" test is true on every call and all of the
 *	*_init_print registrations are redone for each message.
 */
static void
__rep_print_logmsg(dbenv, logdbt, lsnp)
	DB_ENV *dbenv;
	const DBT *logdbt;
	DB_LSN *lsnp;
{
	static int (**ptab)__P((DB_ENV *,
	    DBT *, DB_LSN *, db_recops, void *)) = NULL;
	static size_t ptabsize = 0;

	if (ptabsize == 0) {
		(void)__bam_init_print(dbenv, &ptab, &ptabsize);
		(void)__crdel_init_print(dbenv, &ptab, &ptabsize);
		(void)__db_init_print(dbenv, &ptab, &ptabsize);
		(void)__dbreg_init_print(dbenv, &ptab, &ptabsize);
		(void)__fop_init_print(dbenv, &ptab, &ptabsize);
		(void)__ham_init_print(dbenv, &ptab, &ptabsize);
		(void)__qam_init_print(dbenv, &ptab, &ptabsize);
		(void)__txn_init_print(dbenv, &ptab, &ptabsize);
	}

	(void)__db_dispatch(dbenv,
	    ptab, ptabsize, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
}
#endif
/*
 * __rep_new_master --
 *	Handle notification that site eid is master, with generation
 *	cntrl->gen.
 *
 *	If the generation or master id differ from what is recorded, the
 *	new values are stored and REP_F_NOARCHIVE | REP_F_RECOVER are set.
 *	Then either everything is requested from the master (empty local
 *	log) or verification of our last log record is requested.
 *
 *	Returns:
 *	- 0 when the master did not change;
 *	- DB_REP_NEWMASTER when it did;
 *	- otherwise an error from the log cursor routines.
 *
 *	On error REP_F_RECOVER is cleared again, because verify_lsn was
 *	not set.
 */
int
__rep_new_master(dbenv, cntrl, eid)
	DB_ENV *dbenv;
	REP_CONTROL *cntrl;
	int eid;
{
	DB_LOG *dblp;
	DB_LOGC *logc;
	DB_LSN last_lsn, lsn;
	DB_REP *db_rep;
	DBT dbt;
	LOG *lp;
	REP *rep;
	int change, ret, t_ret;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	ret = 0;
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	__rep_elect_done(dbenv, rep);
	change = rep->gen != cntrl->gen || rep->master_id != eid;
	if (change) {
#ifdef DIAGNOSTIC
		if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
			__db_err(dbenv,
			    "Updating gen from %lu to %lu from master %d",
			    (u_long)rep->gen, (u_long)cntrl->gen, eid);
#endif
		rep->gen = cntrl->gen;
		if (rep->egen <= rep->gen)
			rep->egen = rep->gen + 1;
#ifdef DIAGNOSTIC
		if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
			__db_err(dbenv,
			    "Updating egen to %lu", (u_long)rep->egen);
#endif
		rep->master_id = eid;
		rep->stat.st_master_changes++;
		F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER);
	}
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);

	/*
	 * Snapshot the end of our log.  last_lsn is backed up by lp->len
	 * unless the offset is at or before the log file header.
	 */
	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;
	R_LOCK(dbenv, &dblp->reginfo);
	last_lsn = lsn = lp->lsn;
	if (last_lsn.offset > sizeof(LOGP))
		last_lsn.offset -= lp->len;
	R_UNLOCK(dbenv, &dblp->reginfo);

	if (!change) {
		/* Same master: we may still have catching up to do. */
		ret = 0;
		if (F_ISSET(rep, REP_F_RECOVER)) {
			MUTEX_LOCK(dbenv, db_rep->db_mutexp);
			lsn = lp->verify_lsn;
			MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
			/*
			 * Re-request verification of the record currently
			 * being verified (the one just read and checked),
			 * not the tail of the log.
			 */
			if (!IS_ZERO_LSN(lsn))
				(void)__rep_send_message(dbenv, eid,
				    REP_VERIFY_REQ, &lsn, NULL, 0);
		} else {
			if (log_compare(&lsn, &cntrl->lsn) < 0)
				(void)__rep_send_message(dbenv,
				    eid, REP_ALL_REQ, &lsn, NULL, 0);
			MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
			F_CLR(rep, REP_F_NOARCHIVE);
			MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
		}
		return (ret);
	}

	/* From here on the master changed (change is nonzero). */
	if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
empty:		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
		F_CLR(rep, REP_F_NOARCHIVE | REP_F_READY | REP_F_RECOVER);
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
		if (!IS_INIT_LSN(cntrl->lsn))
			(void)__rep_send_message(dbenv, rep->master_id,
			    REP_ALL_REQ, &lsn, NULL, 0);
		return (DB_REP_NEWMASTER);
	} else if (last_lsn.offset <= sizeof(LOGP)) {
		/*
		 * The log just switched files.  Find the last record in the
		 * previous files.
		 */
		if ((ret = __log_cursor(dbenv, &logc)) != 0)
			goto recover_err;
		memset(&dbt, 0, sizeof(dbt));
		ret = __log_c_get(logc, &last_lsn, &dbt, DB_LAST);
		if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
			ret = t_ret;
		if (ret == DB_NOTFOUND)
			goto empty;
		if (ret != 0)
			goto recover_err;
	}

	MUTEX_LOCK(dbenv, db_rep->db_mutexp);
	lp->verify_lsn = last_lsn;
	MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
	(void)__rep_send_message(dbenv,
	    eid, REP_VERIFY_REQ, &last_lsn, NULL, 0);
	return (DB_REP_NEWMASTER);

recover_err:
	/*
	 * REP_F_RECOVER was set above but verify_lsn was never
	 * established, so clear it.  This covers cursor open and read
	 * failures alike.
	 */
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	F_CLR(rep, REP_F_RECOVER);
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
	return (ret);
}
/*
 * __rep_is_client --
 *	Return nonzero when replication is on and either REP_F_UPGRADE or
 *	REP_F_LOGSONLY is set in the region.  The flags are read under
 *	the replication mutex.  Returns 0 when replication is off.
 */
int
__rep_is_client(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REP *rep;
	int isclient;

	isclient = 0;
	if (REP_ON(dbenv)) {
		db_rep = dbenv->rep_handle;
		rep = db_rep->region;

		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
		isclient = F_ISSET(rep, REP_F_UPGRADE | REP_F_LOGSONLY);
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
	}
	return (isclient);
}
int
__rep_noarchive(dbenv)
DB_ENV *dbenv;
{
DB_REP *db_rep;
REP *rep;
if (!REP_ON(dbenv))
return (0);
db_rep = dbenv->rep_handle;
rep = db_rep->region;
return (F_ISSET(rep, REP_F_NOARCHIVE));
}
/*
 * __rep_send_vote --
 *	Pack egen, priority, nsites and tiebreaker into a REP_VOTE_INFO
 *	and send it to site eid as a message of type vtype at *lsnp.
 *	Any send error is ignored.
 */
void
__rep_send_vote(dbenv, lsnp, nsites, pri, tiebreaker, egen, eid, vtype)
	DB_ENV *dbenv;
	DB_LSN *lsnp;
	int eid, nsites, pri, tiebreaker;
	u_int32_t egen, vtype;
{
	REP_VOTE_INFO vote;
	DBT payload;

	memset(&vote, 0, sizeof(vote));
	vote.priority = pri;
	vote.egen = egen;
	vote.tiebreaker = tiebreaker;
	vote.nsites = nsites;

	memset(&payload, 0, sizeof(payload));
	payload.size = sizeof(vote);
	payload.data = &vote;

	(void)__rep_send_message(dbenv, eid, vtype, lsnp, &payload, 0);
}
/*
 * __rep_elect_done --
 *	Reset the election state in rep.  This clears the phase and tally
 *	flags and zeroes the site and vote counts.  If an election tally
 *	was in progress (IN_ELECTION_TALLY), egen is incremented.
 *	Callers in this file hold the replication mutex around this call.
 */
void
__rep_elect_done(dbenv, rep)
	DB_ENV *dbenv;
	REP *rep;
{
	int was_tallying;

#ifndef DIAGNOSTIC
	COMPQUIET(dbenv, NULL);
#endif
	/* Sample the election state before the flags are cleared. */
	was_tallying = IN_ELECTION_TALLY(rep);

	F_CLR(rep, REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY);
	rep->votes = 0;
	rep->sites = 0;

	if (was_tallying)
		rep->egen++;
#ifdef DIAGNOSTIC
	if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
		__db_err(dbenv, "Election done; egen %lu", (u_long)rep->egen);
#endif
}
/*
 * __rep_grow_sites --
 *	Reallocate the two vote tally arrays (tally_off for VOTE1,
 *	v2tally_off for VOTE2) in the shared region.  The new capacity is
 *	max(2 * asites, nsites) REP_VTALLY entries.  Existing tallies are
 *	not copied.
 *
 *	The work is done under the environment region mutex.
 *	- If the first allocation fails, nothing changes.
 *	- If the second allocation fails, both arrays are released, the
 *	  offsets are reset to INVALID_ROFF, and asites/nsites become 0.
 *	Returns 0 or the __db_shalloc error.
 */
int
__rep_grow_sites(dbenv, nsites)
	DB_ENV *dbenv;
	int nsites;
{
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int nalloc, ret, *v1tally, *v2tally;

	rep = ((DB_REP *)dbenv->rep_handle)->region;

	nalloc = 2 * rep->asites;
	if (nalloc < nsites)
		nalloc = nsites;

	infop = dbenv->reginfo;
	renv = infop->primary;

	MUTEX_LOCK(dbenv, &renv->mutex);

	ret = __db_shalloc(infop->addr,
	    nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY), &v1tally);
	if (ret != 0)
		goto done;

	/* Swap in the new VOTE1 array. */
	if (rep->tally_off != INVALID_ROFF)
		__db_shalloc_free(infop->addr,
		    R_ADDR(infop, rep->tally_off));
	rep->tally_off = R_OFFSET(infop, v1tally);

	ret = __db_shalloc(infop->addr,
	    nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY), &v2tally);

	/* The old VOTE2 array is released whether or not that worked. */
	if (rep->v2tally_off != INVALID_ROFF)
		__db_shalloc_free(infop->addr,
		    R_ADDR(infop, rep->v2tally_off));

	if (ret == 0) {
		rep->v2tally_off = R_OFFSET(infop, v2tally);
		rep->asites = nalloc;
		rep->nsites = nsites;
	} else {
		/* Could not get both arrays: drop everything. */
		__db_shalloc_free(infop->addr,
		    R_ADDR(infop, rep->tally_off));
		rep->v2tally_off = rep->tally_off = INVALID_ROFF;
		rep->asites = 0;
		rep->nsites = 0;
	}

done:
	MUTEX_UNLOCK(dbenv, &renv->mutex);
	return (ret);
}
/*
 * __env_rep_enter --
 *	Register a DB_ENV-level operation by incrementing handle_cnt.
 *
 *	While rep->in_recovery is set, the replication mutex is dropped and
 *	the caller sleeps in 1-second naps, logging a message every 60
 *	naps.  This is a no-op when DB_ENV_NOLOCKING is set.
 */
void
__env_rep_enter(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REP *rep;
	int waited;

	if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
		return;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	waited = 0;
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	while (rep->in_recovery) {
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
		(void)__os_sleep(dbenv, 1, 0);
		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);

		waited++;
		if (waited % 60 == 0)
			__db_err(dbenv,
    "DB_ENV handle waiting %d minutes for replication recovery to complete",
			    waited / 60);
	}
	rep->handle_cnt++;
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
/*
 * __env_rep_exit --
 *	Undo __env_rep_enter by decrementing handle_cnt under the
 *	replication mutex.  This is a no-op when DB_ENV_NOLOCKING is set.
 */
void
__env_rep_exit(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REP *rep;

	if (!F_ISSET(dbenv, DB_ENV_NOLOCKING)) {
		db_rep = dbenv->rep_handle;
		rep = db_rep->region;

		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
		rep->handle_cnt--;
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
	}
}
/*
 * __db_rep_enter --
 *	Register a DB-handle operation by incrementing handle_cnt.
 *
 *	Returns:
 *	- DB_LOCK_DEADLOCK if REP_F_READY is set.  The caller first sleeps
 *	  5 seconds unless return_now is set.
 *	- DB_REP_HANDLE_DEAD if checkgen is set and the handle's timestamp
 *	  no longer matches the region's.
 *	- Otherwise 0.  This is also returned immediately when
 *	  DB_ENV_NOLOCKING is set.
 *
 *	The sleep and error message happen after the mutex is released.
 */
int
__db_rep_enter(dbp, checkgen, return_now)
	DB *dbp;
	int checkgen, return_now;
{
	DB_ENV *dbenv;
	DB_REP *db_rep;
	REP *rep;
	int ret;

	dbenv = dbp->dbenv;
	if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	ret = 0;
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	if (F_ISSET(rep, REP_F_READY))
		ret = DB_LOCK_DEADLOCK;
	else if (checkgen && dbp->timestamp != rep->timestamp)
		ret = DB_REP_HANDLE_DEAD;
	else
		rep->handle_cnt++;
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);

	switch (ret) {
	case DB_LOCK_DEADLOCK:
		if (!return_now)
			(void)__os_sleep(dbenv, 5, 0);
		break;
	case DB_REP_HANDLE_DEAD:
		__db_err(dbenv, "%s %s",
		    "replication recovery unrolled committed transactions;",
		    "open DB and DBcursor handles must be closed");
		break;
	default:
		break;
	}
	return (ret);
}
void
__db_rep_exit(dbenv)
DB_ENV *dbenv;
{
DB_REP *db_rep;
REP *rep;
if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
return;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
rep->handle_cnt--;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
/*
 * __op_rep_enter --
 *	Register an operation by incrementing op_cnt.
 *
 *	While REP_F_READY is set, the replication mutex is dropped and the
 *	caller sleeps in OP_WAIT_SECS-second naps.  This is a no-op when
 *	DB_ENV_NOLOCKING is set.
 *
 *	The wait message reports whole minutes of elapsed time.  The
 *	previous code counted naps and divided by 60 as if each nap were
 *	one second.  With 5-second naps it printed "1 minutes" only after
 *	5 real minutes.  Elapsed time is now tracked in seconds, so the
 *	message appears once per minute with the true count, matching
 *	__env_rep_enter.
 */
void
__op_rep_enter(dbenv)
	DB_ENV *dbenv;
{
	enum { OP_WAIT_SECS = 5 };
	DB_REP *db_rep;
	REP *rep;
	int secs;

	if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
		return;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	for (secs = 0; F_ISSET(rep, REP_F_READY);) {
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
		(void)__os_sleep(dbenv, OP_WAIT_SECS, 0);
		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
		secs += OP_WAIT_SECS;
		if (secs % 60 == 0)
			__db_err(dbenv,
	"__op_rep_enter waiting %d minutes for op count to drain",
			    secs / 60);
	}
	rep->op_cnt++;
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
/*
 * __op_rep_exit --
 *	Undo __op_rep_enter by decrementing op_cnt under the replication
 *	mutex.  DB_ASSERT checks that the count is positive first.  This
 *	is a no-op when DB_ENV_NOLOCKING is set.
 */
void
__op_rep_exit(dbenv)
	DB_ENV *dbenv;
{
	DB_REP *db_rep;
	REP *rep;

	if (!F_ISSET(dbenv, DB_ENV_NOLOCKING)) {
		db_rep = dbenv->rep_handle;
		rep = db_rep->region;

		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
		DB_ASSERT(rep->op_cnt > 0);
		--rep->op_cnt;
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
	}
}
/*
 * __rep_get_gen --
 *	Store in *genp the larger of rep->gen and rep->recover_gen.  Both
 *	are read under the replication mutex.
 */
void
__rep_get_gen(dbenv, genp)
	DB_ENV *dbenv;
	u_int32_t *genp;
{
	DB_REP *db_rep;
	REP *rep;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	*genp = rep->gen >= rep->recover_gen ? rep->gen : rep->recover_gen;
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
#ifdef NOTYET
static int __rep_send_file __P((DB_ENV *, DBT *, u_int32_t));
/*
 * __rep_send_file --
 *	Send every page of the database named by rec->data to site eid as
 *	REP_FILE messages.  The flags argument is nonzero only for the
 *	final page.
 *
 *	Each page is read-locked and fetched from the opened handle's
 *	mpool file, sent, then released.  The first send failure stops the
 *	loop and is returned.  Not compiled: guarded by NOTYET.
 */
static int
__rep_send_file(dbenv, rec, eid)
	DB_ENV *dbenv;
	DBT *rec;
	u_int32_t eid;
{
	DB *dbp;
	DB_LOCK lk;
	DB_MPOOLFILE *mpf;
	DBC *dbc;
	DBT rec_dbt;
	PAGE *pagep;
	db_pgno_t last_pgno, pgno;
	int ret, t_ret;

	dbp = NULL;
	dbc = NULL;
	pagep = NULL;
	mpf = NULL;
	LOCK_INIT(lk);

	if ((ret = db_create(&dbp, dbenv, 0)) != 0)
		goto err;
	if ((ret = __db_open(
	    dbp, rec->data, NULL, DB_UNKNOWN, 0, 0, PGNO_BASE_MD)) != 0)
		goto err;
	/* Pages must be fetched through the opened handle's mpool file. */
	mpf = dbp->mpf;
	if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
		goto err;

	memset(&rec_dbt, 0, sizeof(rec_dbt));
	last_pgno = 1;
	for (pgno = 0; pgno <= last_pgno; pgno++) {
		if ((ret = __db_lget(dbc, 0, pgno, DB_LOCK_READ, 0, &lk)) != 0)
			goto err;
		if ((ret = __memp_fget(mpf, &pgno, 0, &pagep)) != 0)
			goto err;
		/* The metadata page tells us how far to go. */
		if (pgno == 0)
			last_pgno = ((DBMETA *)pagep)->last_pgno;
		rec_dbt.data = pagep;
		rec_dbt.size = dbp->pgsize;
		/* Report a failed send instead of silently returning 0. */
		if ((ret = __rep_send_message(dbenv, eid,
		    REP_FILE, NULL, &rec_dbt, pgno == last_pgno)) != 0)
			goto err;
		ret = __memp_fput(mpf, pagep, 0);
		pagep = NULL;
		if (ret != 0)
			goto err;
		ret = __LPUT(dbc, lk);
		LOCK_INIT(lk);
		if (ret != 0)
			goto err;
	}

	/* Release the page before its lock, as the loop body does. */
err:	if (pagep != NULL &&
	    (t_ret = __memp_fput(mpf, pagep, 0)) != 0 && ret == 0)
		ret = t_ret;
	if (LOCK_ISSET(lk) && (t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
		ret = t_ret;
	if (dbc != NULL && (t_ret = __db_c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
#endif
#ifdef DIAGNOSTIC
/*
 * __rep_print_message --
 *	Log one line describing the control header rp.  The line contains
 *	the environment home, the caller-supplied tag str, the generation,
 *	eid, a readable message type name ("NOTYPE" if unrecognized), and
 *	the LSN.
 */
void
__rep_print_message(dbenv, eid, rp, str)
	DB_ENV *dbenv;
	int eid;
	REP_CONTROL *rp;
	char *str;
{
	static const struct {
		u_int32_t rectype;
		const char *name;
	} rep_names[] = {
		{ REP_ALIVE,		"alive" },
		{ REP_ALIVE_REQ,	"alive_req" },
		{ REP_ALL_REQ,		"all_req" },
		{ REP_DUPMASTER,	"dupmaster" },
		{ REP_FILE,		"file" },
		{ REP_FILE_REQ,		"file_req" },
		{ REP_LOG,		"log" },
		{ REP_LOG_MORE,		"log_more" },
		{ REP_LOG_REQ,		"log_req" },
		{ REP_MASTER_REQ,	"master_req" },
		{ REP_NEWCLIENT,	"newclient" },
		{ REP_NEWFILE,		"newfile" },
		{ REP_NEWMASTER,	"newmaster" },
		{ REP_NEWSITE,		"newsite" },
		{ REP_PAGE,		"page" },
		{ REP_PAGE_REQ,		"page_req" },
		{ REP_PLIST,		"plist" },
		{ REP_PLIST_REQ,	"plist_req" },
		{ REP_VERIFY,		"verify" },
		{ REP_VERIFY_FAIL,	"verify_fail" },
		{ REP_VERIFY_REQ,	"verify_req" },
		{ REP_VOTE1,		"vote1" },
		{ REP_VOTE2,		"vote2" }
	};
	const char *type;
	size_t i;

	type = "NOTYPE";
	for (i = 0; i < sizeof(rep_names) / sizeof(rep_names[0]); i++)
		if (rep_names[i].rectype == rp->rectype) {
			type = rep_names[i].name;
			break;
		}

	__db_err(dbenv, "%s %s: gen = %lu eid %d, type %s, LSN [%lu][%lu]",
	    dbenv->db_home, str, (u_long)rp->gen,
	    eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
}
#endif