#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
#ifdef REP_DIAGNOSTIC
#include "dbinc/db_page.h"
#include "dbinc/fop.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/qam.h"
#endif
/*
 * TIMESTAMP_CHECK --
 *	Expire a stale replication lockout recorded in the environment region.
 *
 *	If renv->op_timestamp is non-zero and op_timestamp + DB_REGENV_TIMEOUT
 *	is earlier than ts, clear DB_REGENV_REPLOCKED and reset op_timestamp
 *	to 0 while holding the replication system lock.  The initial test is
 *	made before the lock is taken.
 *
 *	Every use of a macro parameter is parenthesized so that arbitrary
 *	expressions (for example "&obj") can be passed.  renv is evaluated
 *	more than once, so it must not have side effects.
 */
#define	TIMESTAMP_CHECK(env, ts, renv) do {				\
	if ((renv)->op_timestamp != 0 &&				\
	    (renv)->op_timestamp + DB_REGENV_TIMEOUT < (ts)) {		\
		REP_SYSTEM_LOCK(env);					\
		F_CLR((renv), DB_REGENV_REPLOCKED);			\
		(renv)->op_timestamp = 0;				\
		REP_SYSTEM_UNLOCK(env);					\
	}								\
} while (0)
/* Prototypes for helpers that are private to this file. */
static int __rep_lockout_int __P((ENV *, REP *, u_int32_t *, u_int32_t,
const char *, u_int32_t));
static int __rep_newmaster_empty __P((ENV *, int));
#ifdef REP_DIAGNOSTIC
/* Log-record printer; only declared when REP_DIAGNOSTIC is defined. */
static void __rep_print_logmsg __P((ENV *, const DBT *, DB_LSN *));
#endif
int
__rep_bulk_message(env, bulk, repth, lsn, dbt, flags)
ENV *env;
REP_BULK *bulk;
REP_THROTTLE *repth;
DB_LSN *lsn;
const DBT *dbt;
u_int32_t flags;
{
DB_REP *db_rep;
REP *rep;
__rep_bulk_args b_args;
size_t len;
int ret;
u_int32_t recsize, typemore;
u_int8_t *p;
db_rep = env->rep_handle;
rep = db_rep->region;
ret = 0;
recsize = dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);
MUTEX_LOCK(env, rep->mtx_clientdb);
while (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
__os_yield(env, 1, 0);
MUTEX_LOCK(env, rep->mtx_clientdb);
}
if (recsize > bulk->len) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"bulk_msg: Record %d (0x%x) larger than entire buffer 0x%x",
recsize, recsize, bulk->len));
STAT(rep->stat.st_bulk_overflows++);
(void)__rep_send_bulk(env, bulk, flags);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
return (DB_REP_BULKOVF);
}
while (recsize + *(bulk->offp) > bulk->len) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"bulk_msg: Record %lu (%#lx) doesn't fit. Send %lu (%#lx) now.",
(u_long)recsize, (u_long)recsize,
(u_long)bulk->len, (u_long)bulk->len));
STAT(rep->stat.st_bulk_fills++);
if ((ret = __rep_send_bulk(env, bulk, flags)) != 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
return (ret);
}
}
if (bulk->type == REP_BULK_LOG)
typemore = REP_LOG_MORE;
else
typemore = REP_PAGE_MORE;
if (repth != NULL) {
if ((ret = __rep_send_throttle(env,
bulk->eid, repth, REP_THROTTLE_ONLY, flags)) != 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
return (ret);
}
if (repth->type == typemore) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"bulk_msg: Record %lu (0x%lx) hit throttle limit.",
(u_long)recsize, (u_long)recsize));
MUTEX_UNLOCK(env, rep->mtx_clientdb);
return (ret);
}
}
p = bulk->addr + *(bulk->offp);
b_args.len = dbt->size;
b_args.lsn = *lsn;
b_args.bulkdata = *dbt;
if (*(bulk->offp) == 0)
bulk->lsn = *lsn;
if (rep->version < DB_REPVERSION_47) {
len = 0;
memcpy(p, &dbt->size, sizeof(dbt->size));
p += sizeof(dbt->size);
memcpy(p, lsn, sizeof(DB_LSN));
p += sizeof(DB_LSN);
memcpy(p, dbt->data, dbt->size);
p += dbt->size;
} else if ((ret = __rep_bulk_marshal(env, &b_args, p,
bulk->len, &len)) != 0)
goto err;
*(bulk->offp) = (uintptr_t)p + (uintptr_t)len - (uintptr_t)bulk->addr;
STAT(rep->stat.st_bulk_records++);
if (LF_ISSET(REPCTL_PERM)) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"bulk_msg: Send buffer after copy due to PERM"));
ret = __rep_send_bulk(env, bulk, flags);
}
err:
MUTEX_UNLOCK(env, rep->mtx_clientdb);
return (ret);
}
/*
 * __rep_send_bulk --
 *	Transmit the current contents of a bulk buffer and mark it empty.
 *
 *	Does nothing (returns 0) when the buffer holds no data.  Otherwise
 *	BULK_XMIT is set for the duration of the send, rep->mtx_clientdb is
 *	released around the call to __rep_send_message and re-acquired
 *	afterwards, so the function unlocks and relocks that mutex itself.
 *	Any send failure is reported as DB_REP_UNAVAIL.
 */
int
__rep_send_bulk(env, bulkp, ctlflags)
	ENV *env;
	REP_BULK *bulkp;
	u_int32_t ctlflags;
{
	DBT payload;
	REP *rep;
	int ret;

	/* Nothing buffered, nothing to send. */
	if (*(bulkp->offp) == 0)
		return (0);

	rep = env->rep_handle->region;

	/* Flag the buffer as in flight before letting go of the mutex. */
	FLD_SET(*(bulkp->flagsp), BULK_XMIT);
	DB_INIT_DBT(payload, bulkp->addr, *(bulkp->offp));
	MUTEX_UNLOCK(env, rep->mtx_clientdb);

	RPRINT(env, DB_VERB_REP_MSGS, (env,
	    "send_bulk: Send %d (0x%x) bulk buffer bytes",
	    payload.size, payload.size));
	STAT(rep->stat.st_bulk_transfers++);

	ret = __rep_send_message(env,
	    bulkp->eid, bulkp->type, &bulkp->lsn, &payload, ctlflags, 0);
	if (ret != 0)
		ret = DB_REP_UNAVAIL;

	/* Back under the mutex: reset the buffer and clear the flag. */
	MUTEX_LOCK(env, rep->mtx_clientdb);
	*(bulkp->offp) = 0;
	FLD_CLR(*(bulkp->flagsp), BULK_XMIT);
	return (ret);
}
/*
 * __rep_bulk_alloc --
 *	Initialize a REP_BULK descriptor and allocate its MEGABYTE-sized
 *	buffer.
 *
 *	*offp and *flagsp are zeroed and remembered in the descriptor as the
 *	buffer's fill offset and flag word.  Returns the result of
 *	__os_malloc; on failure only the length field of the (zeroed)
 *	descriptor has been set.
 */
int
__rep_bulk_alloc(env, bulkp, eid, offp, flagsp, type)
	ENV *env;
	REP_BULK *bulkp;
	int eid;
	uintptr_t *offp;
	u_int32_t *flagsp, type;
{
	int ret;

	memset(bulkp, 0, sizeof(REP_BULK));
	*flagsp = 0;
	*offp = 0;

	bulkp->len = MEGABYTE;
	ret = __os_malloc(env, bulkp->len, &bulkp->addr);
	if (ret == 0) {
		bulkp->offp = offp;
		bulkp->type = type;
		bulkp->eid = eid;
		bulkp->flagsp = flagsp;
	}
	return (ret);
}
/*
 * __rep_bulk_free --
 *	Send whatever is still held in a bulk buffer, then release the
 *	buffer's memory.
 *
 *	The flush runs with the client database mutex held, as
 *	__rep_send_bulk unlocks and relocks that mutex internally.  The
 *	memory is freed whether or not the flush succeeded; the flush result
 *	is returned.
 */
int
__rep_bulk_free(env, bulkp, flags)
	ENV *env;
	REP_BULK *bulkp;
	u_int32_t flags;
{
	DB_REP *db_rep;
	int flush_ret;

	db_rep = env->rep_handle;

	MUTEX_LOCK(env, db_rep->region->mtx_clientdb);
	flush_ret = __rep_send_bulk(env, bulkp, flags);
	MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb);

	__os_free(env, bulkp->addr);
	return (flush_ret);
}
/*
 * __rep_send_message --
 *	Build a replication control record for a message of type rtype and
 *	pass it, together with the optional payload dbt, to the send
 *	callback stored in db_rep->send.
 *
 *	lsnp may be NULL, in which case a zero LSN is placed in the control
 *	record.  dbt may be NULL, in which case an all-zero DBT is sent.
 *	ctlflags are REPCTL_* values stored in the control record; repflags
 *	are DB_REP_* values forwarded to the send callback.
 *
 *	Returns the send callback's return value; 0 without sending when the
 *	message type translates to REP_INVALID for the older version in use;
 *	or the result of __env_panic when rep->version is greater than
 *	DB_REPVERSION.
 */
int
__rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags)
ENV *env;
int eid;
u_int32_t rtype;
DB_LSN *lsnp;
const DBT *dbt;
u_int32_t ctlflags, repflags;
{
DBT cdbt, scrap_dbt;
DB_ENV *dbenv;
DB_LOG *dblp;
DB_REP *db_rep;
LOG *lp;
REP *rep;
REP_46_CONTROL cntrl46;
REP_OLD_CONTROL ocntrl;
__rep_control_args cntrl;
db_timespec msg_time;
int ret;
u_int32_t myflags, rectype;
u_int8_t buf[__REP_CONTROL_SIZE];
size_t len;
dbenv = env->dbenv;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
ret = 0;
#if defined(DEBUG_ROP) || defined(DEBUG_WOP)
/* In these debug builds a missing send callback means "do nothing". */
if (db_rep->send == NULL)
return (0);
#endif
/* Start every candidate control layout from all-zero bytes. */
memset(&cntrl, 0, sizeof(cntrl));
memset(&ocntrl, 0, sizeof(ocntrl));
memset(&cntrl46, 0, sizeof(cntrl46));
if (lsnp == NULL)
ZERO_LSN(cntrl.lsn);
else
cntrl.lsn = *lsnp;
/*
 * Pick the message number for the version recorded in the region:
 * unchanged for our own version, translated for an older one.  A
 * type that translates to REP_INVALID is silently dropped (ret is
 * still 0 here).  A version newer than ours is treated as fatal.
 */
if (rep->version == DB_REPVERSION)
cntrl.rectype = rtype;
else if (rep->version < DB_REPVERSION) {
cntrl.rectype = __rep_msg_to_old(rep->version, rtype);
RPRINT(env, DB_VERB_REP_MSGS, (env,
"rep_send_msg: rtype %lu to version %lu record %lu.",
(u_long)rtype, (u_long)rep->version,
(u_long)cntrl.rectype));
if (cntrl.rectype == REP_INVALID)
return (ret);
} else {
__db_errx(env,
"rep_send_message: Unknown rep version %lu, my version %lu",
(u_long)rep->version, (u_long)DB_REPVERSION);
return (__env_panic(env, EINVAL));
}
cntrl.flags = ctlflags;
cntrl.rep_version = rep->version;
cntrl.log_version = lp->persist.version;
cntrl.gen = rep->gen;
/* Substitute an empty DBT so the callback never sees a NULL payload. */
if (dbt == NULL) {
memset(&scrap_dbt, 0, sizeof(DBT));
dbt = &scrap_dbt;
}
/*
 * Derive the transport flags: REPCTL_PERM maps to DB_REP_PERMANENT;
 * otherwise anything that is not a REP_LOG message, or is a resend,
 * is marked DB_REP_NOBUFFER.
 */
myflags = repflags;
if (FLD_ISSET(ctlflags, REPCTL_PERM))
myflags |= DB_REP_PERMANENT;
else if (rtype != REP_LOG || FLD_ISSET(ctlflags, REPCTL_RESEND))
myflags |= DB_REP_NOBUFFER;
/*
 * For a REP_LOG message sent without REPCTL_PERM, read the leading
 * 32-bit record type from the payload; DB___txn_regop and
 * DB___txn_ckp records get REPCTL_PERM set in the control record
 * (myflags is not changed).
 * NOTE(review): this dereferences dbt->data, so callers presumably
 * never pass a NULL dbt with rtype == REP_LOG -- confirm.
 */
if (rtype == REP_LOG && !FLD_ISSET(ctlflags, REPCTL_PERM)) {
LOGCOPY_32(env, &rectype, dbt->data);
if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
F_SET(&cntrl, REPCTL_PERM);
}
if (F_ISSET(rep, REP_F_GROUP_ESTD))
F_SET(&cntrl, REPCTL_GROUP_ESTD);
/* DB_REP_ANYWHERE is only passed on when speaking our own version. */
if (rep->version != DB_REPVERSION)
FLD_CLR(myflags, DB_REP_ANYWHERE);
/*
 * A master using leases stamps REPCTL_PERM messages with
 * REPCTL_LEASE and the current time.
 */
if (IS_REP_MASTER(env) && IS_USING_LEASES(env) &&
FLD_ISSET(ctlflags, REPCTL_PERM)) {
F_SET(&cntrl, REPCTL_LEASE);
DB_ASSERT(env, rep->version == DB_REPVERSION);
__os_gettime(env, &msg_time, 1);
cntrl.msg_sec = (u_int32_t)msg_time.tv_sec;
cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec;
}
REP_PRINT_MESSAGE(env, eid, &cntrl, "rep_send_message", myflags);
#ifdef REP_DIAGNOSTIC
if (FLD_ISSET(
env->dbenv->verbose, DB_VERB_REP_MSGS) && rtype == REP_LOG)
__rep_print_logmsg(env, dbt, lsnp);
#endif
/* A permanent message must carry a non-zero LSN. */
DB_ASSERT(env, !FLD_ISSET(myflags, DB_REP_PERMANENT) ||
!IS_ZERO_LSN(cntrl.lsn));
/*
 * Choose the on-the-wire control layout by version: the old
 * structure up to DB_REPVERSION_45 (where REPCTL_INIT is rewritten
 * as REPCTL_INIT_45 for version 45 itself), the 4.6 structure for
 * DB_REPVERSION_46, and the marshaled form otherwise.
 */
memset(&cdbt, 0, sizeof(cdbt));
if (rep->version <= DB_REPVERSION_45) {
if (rep->version == DB_REPVERSION_45 &&
F_ISSET(&cntrl, REPCTL_INIT)) {
F_CLR(&cntrl, REPCTL_INIT);
F_SET(&cntrl, REPCTL_INIT_45);
}
ocntrl.rep_version = cntrl.rep_version;
ocntrl.log_version = cntrl.log_version;
ocntrl.lsn = cntrl.lsn;
ocntrl.rectype = cntrl.rectype;
ocntrl.gen = cntrl.gen;
ocntrl.flags = cntrl.flags;
cdbt.data = &ocntrl;
cdbt.size = sizeof(ocntrl);
} else if (rep->version == DB_REPVERSION_46) {
cntrl46.rep_version = cntrl.rep_version;
cntrl46.log_version = cntrl.log_version;
cntrl46.lsn = cntrl.lsn;
cntrl46.rectype = cntrl.rectype;
cntrl46.gen = cntrl.gen;
cntrl46.msg_time.tv_sec = (time_t)cntrl.msg_sec;
cntrl46.msg_time.tv_nsec = (long)cntrl.msg_nsec;
cntrl46.flags = cntrl.flags;
cdbt.data = &cntrl46;
cdbt.size = sizeof(cntrl46);
} else {
(void)__rep_control_marshal(env, &cntrl, buf,
__REP_CONTROL_SIZE, &len);
DB_INIT_DBT(cdbt, buf, len);
}
ret = db_rep->send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags);
/*
 * Log a failure; with HAVE_STATISTICS also count the outcome.  The
 * closing brace of the "if" is supplied by whichever preprocessor
 * branch is compiled.
 */
if (ret != 0) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"rep_send_function returned: %d", ret));
#ifdef HAVE_STATISTICS
rep->stat.st_msgs_send_failures++;
} else
rep->stat.st_msgs_sent++;
#else
}
#endif
return (ret);
}
#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *	Diagnostic helper: run a log record through the print dispatch
 *	table (DB_TXN_PRINT).
 *
 *	The dispatch table is a function-local static that is populated by
 *	the per-access-method *_init_print routines on the first call only.
 *	Return values of the init and dispatch calls are ignored.
 */
static void
__rep_print_logmsg(env, logdbt, lsnp)
	ENV *env;
	const DBT *logdbt;
	DB_LSN *lsnp;
{
	static int need_init = 1;
	static DB_DISTAB dtab;
	DBT *rec;

	if (need_init != 0) {
		need_init = 0;

		(void)__bam_init_print(env, &dtab);
		(void)__crdel_init_print(env, &dtab);
		(void)__db_init_print(env, &dtab);
		(void)__dbreg_init_print(env, &dtab);
		(void)__fop_init_print(env, &dtab);
		(void)__ham_init_print(env, &dtab);
		(void)__qam_init_print(env, &dtab);
		(void)__txn_init_print(env, &dtab);
	}

	/* __db_dispatch takes a non-const DBT. */
	rec = (DBT *)logdbt;
	(void)__db_dispatch(env, &dtab, rec, lsnp, DB_TXN_PRINT, NULL);
}
#endif
/*
 * __rep_new_master --
 *	React to a message that identifies eid as the master at generation
 *	cntrl->gen.
 *
 *	If the generation or master id differs from what the region records,
 *	message threads are locked out, client state is reset, and the new
 *	generation, master id, log version and replication version are
 *	stored.  The function then either requests an update (local log is
 *	empty, or no usable starting point was found) or sends a
 *	REP_VERIFY_REQ, and returns DB_REP_NEWMASTER.
 *
 *	If nothing changed, it may re-request records and returns the result
 *	of that request (0 if none was made).
 *
 *	On error the recovery and delay flags are cleared, the message
 *	lockout is released if this call set it, and the error is returned.
 */
int
__rep_new_master(env, cntrl, eid)
ENV *env;
__rep_control_args *cntrl;
int eid;
{
DBT dbt;
DB_ENV *dbenv;
DB_LOG *dblp;
DB_LOGC *logc;
DB_LSN first_lsn, lsn;
DB_REP *db_rep;
DB_THREAD_INFO *ip;
LOG *lp;
REGENV *renv;
REGINFO *infop;
REP *rep;
db_timeout_t lease_to;
u_int32_t unused;
int change, do_req, lockout, ret, t_ret;
dbenv = env->dbenv;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
ret = 0;
logc = NULL;
lockout = 0;
REP_SYSTEM_LOCK(env);
/* A "change" is a different generation or a different master id. */
change = rep->gen != cntrl->gen || rep->master_id != eid;
if (change) {
/*
 * REP_F_READY_MSG already set: leave via lckout, which only
 * drops the system lock and returns ret (still 0).
 */
if (F_ISSET(rep, REP_F_READY_MSG))
goto lckout;
/* Lock out other message threads; remember that we did so. */
if ((ret = __rep_lockout_msg(env, rep, 1)) != 0)
goto errlck;
lockout = 1;
/*
 * With leases in use, sleep for the wait time reported by
 * __rep_lease_waittime, without holding the system lock.
 */
if (IS_USING_LEASES(env) &&
((lease_to = __rep_lease_waittime(env)) != 0)) {
REP_SYSTEM_UNLOCK(env);
__os_yield(env, 0, (u_long)lease_to);
REP_SYSTEM_LOCK(env);
}
if ((ret = __env_init_rec(env, cntrl->log_version)) != 0)
goto errlck;
/*
 * Swap the system lock for the client database mutex while the
 * log region's request/wait state is reset.
 */
REP_SYSTEM_UNLOCK(env);
MUTEX_LOCK(env, rep->mtx_clientdb);
__os_gettime(env, &lp->rcvd_ts, 1);
lp->wait_ts = rep->request_gap;
ZERO_LSN(lp->verify_lsn);
ZERO_LSN(lp->waiting_lsn);
ZERO_LSN(lp->max_wait_lsn);
/* Open the client bookkeeping database if it is not open yet. */
if (db_rep->rep_db == NULL &&
(ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
goto err;
}
/*
 * The system lock is re-taken before the client database mutex
 * is released, so both are briefly held.
 */
REP_SYSTEM_LOCK(env);
if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) {
ret = __rep_init_cleanup(env, rep, DB_FORCE);
F_CLR(rep, REP_F_RECOVER_MASK);
}
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (ret != 0) {
goto errlck;
}
/* Empty the client database (called with the system lock held). */
ENV_GET_THREAD_INFO(env, ip);
if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused))
!= 0)
goto errlck;
__rep_elect_done(env, rep, 1);
RPRINT(env, DB_VERB_REP_MISC, (env,
"Updating gen from %lu to %lu from master %d",
(u_long)rep->gen, (u_long)cntrl->gen, eid));
rep->gen = cntrl->gen;
(void)__rep_write_gen(env, rep->gen);
/* Keep the election generation strictly above the generation. */
if (rep->egen <= rep->gen)
rep->egen = rep->gen + 1;
rep->master_id = eid;
STAT(rep->stat.st_master_changes++);
rep->stat.st_startup_complete = 0;
__log_set_version(env, cntrl->log_version);
rep->version = cntrl->rep_version;
RPRINT(env, DB_VERB_REP_MISC, (env,
"egen: %lu. rep version %lu",
(u_long)rep->egen, (u_long)rep->version));
if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT))
F_SET(rep, REP_F_DELAY);
F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY);
/* State recorded: release the message lockout. */
F_CLR(rep, REP_F_READY_MSG);
lockout = 0;
} else
__rep_elect_done(env, rep, 1);
REP_SYSTEM_UNLOCK(env);
MUTEX_LOCK(env, rep->mtx_clientdb);
lsn = lp->ready_lsn;
if (!change) {
/*
 * Same master, same generation: optionally re-request records
 * when we are recovering or our ready LSN is behind the LSN in
 * the control record.
 */
ret = 0;
do_req = __rep_check_doreq(env, rep);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (do_req &&
(F_ISSET(rep, REP_F_RECOVER_MASK) ||
LOG_COMPARE(&lsn, &cntrl->lsn) < 0)) {
ret = __rep_resend_req(env, 0);
if (ret != 0)
RPRINT(env, DB_VERB_REP_MISC, (env,
"resend_req ret is %lu", (u_long)ret));
}
/* Not recovering: REP_F_NOARCHIVE is no longer needed. */
if (!F_ISSET(rep, REP_F_RECOVER_MASK)) {
REP_SYSTEM_LOCK(env);
F_CLR(rep, REP_F_NOARCHIVE);
REP_SYSTEM_UNLOCK(env);
}
return (ret);
}
MUTEX_UNLOCK(env, rep->mtx_clientdb);
/* Initial or zero ready LSN: nothing to verify, ask for an update. */
if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
if ((ret = __rep_newmaster_empty(env, eid)) != 0)
goto err;
(void)__memp_set_config(dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
return (DB_REP_NEWMASTER);
}
memset(&dbt, 0, sizeof(dbt));
/*
 * The master's LSN is in an earlier log file than ours: fetch our
 * first log record, and give up on verification (notfound) if even
 * that is in a later file than the master's LSN.
 */
if (cntrl->lsn.file < lsn.file) {
if ((ret = __log_cursor(env, &logc)) != 0)
goto err;
ret = __logc_get(logc, &first_lsn, &dbt, DB_FIRST);
if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (ret == DB_NOTFOUND)
goto notfound;
else if (ret != 0)
goto err;
if (cntrl->lsn.file < first_lsn.file)
goto notfound;
}
/*
 * NOTE(review): __rep_log_backup presumably walks lsn back to an
 * earlier commit or checkpoint record (see the "No commit or ckp
 * found" message below) -- confirm against its definition.
 */
if ((ret = __log_cursor(env, &logc)) != 0)
goto err;
ret = __rep_log_backup(env, rep, logc, &lsn);
if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (ret == DB_NOTFOUND)
goto notfound;
else if (ret != 0)
goto err;
/* Record the LSN to verify and restart the request timer. */
MUTEX_LOCK(env, rep->mtx_clientdb);
lp->verify_lsn = lsn;
__os_gettime(env, &lp->rcvd_ts, 1);
lp->wait_ts = rep->request_gap;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
/* Unless the client is delayed, ask for verification of that LSN. */
if (!F_ISSET(rep, REP_F_DELAY))
(void)__rep_send_message(env,
eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE);
(void)__memp_set_config(dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
return (DB_REP_NEWMASTER);
/*
 * Error exits.  "err" is entered without the system lock and takes
 * it; "errlck" is entered with it held.  Only the REP_F_READY_MSG
 * clear is conditional on lockout; the recovery/delay flags are
 * always cleared.
 */
err:
REP_SYSTEM_LOCK(env);
errlck: if (lockout)
F_CLR(rep, REP_F_READY_MSG);
F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY);
lckout: REP_SYSTEM_UNLOCK(env);
return (ret);
/*
 * No usable starting point: discard the local log (zeroed for
 * in-memory logs, truncated to the initial LSN otherwise), refresh
 * the region's rep_timestamp, and fall back to requesting an update.
 */
notfound:
if (lp->db_log_inmemory)
ZERO_LSN(lsn);
else
INIT_LSN(lsn);
RPRINT(env, DB_VERB_REP_MISC,
(env, "No commit or ckp found. Truncate log."));
ret = lp->db_log_inmemory ?
__log_zero(env, &lsn) :
__log_vtruncate(env, &lsn, &lsn, NULL);
/* DB_NOTFOUND from the truncation is tolerated. */
if (ret != 0 && ret != DB_NOTFOUND)
return (ret);
infop = env->reginfo;
renv = infop->primary;
REP_SYSTEM_LOCK(env);
(void)time(&renv->rep_timestamp);
REP_SYSTEM_UNLOCK(env);
if ((ret = __rep_newmaster_empty(env, eid)) != 0)
goto err;
return (DB_REP_NEWMASTER);
}
/*
 * __rep_newmaster_empty --
 *	Switch the client from the verify phase to the update phase and,
 *	when permitted, ask the master for an update.
 *
 *	With REP_F_DELAY set no request is sent and 0 is returned.  With
 *	REP_C_NOAUTOINIT configured the archive and recovery flags are
 *	cleared and DB_REP_JOIN_FAILURE is returned.  Otherwise a
 *	REP_UPDATE_REQ is sent to eid after both mutexes are released.
 */
static int
__rep_newmaster_empty(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	int ret, send_req;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	lp = env->lg_handle->reginfo.primary;
	ret = 0;
	send_req = 0;

	MUTEX_LOCK(env, rep->mtx_clientdb);
	REP_SYSTEM_LOCK(env);

	lp->wait_ts = rep->request_gap;
	F_CLR(rep, REP_F_RECOVER_VERIFY);
	F_SET(rep, REP_F_RECOVER_UPDATE);

	/* A delayed client does nothing further here. */
	if (!F_ISSET(rep, REP_F_DELAY)) {
		if (FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) {
			F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
			ret = DB_REP_JOIN_FAILURE;
		} else
			send_req = 1;
	}

	REP_SYSTEM_UNLOCK(env);
	MUTEX_UNLOCK(env, rep->mtx_clientdb);

	/* The request itself goes out with no mutex held. */
	if (send_req != 0)
		(void)__rep_send_message(env, eid, REP_UPDATE_REQ,
		    NULL, NULL, 0, 0);
	return (ret);
}
/*
 * __rep_noarchive --
 *	Report whether log archiving must currently be avoided.
 *
 *	Returns EINVAL while the environment region is still marked
 *	DB_REGENV_REPLOCKED (after TIMESTAMP_CHECK has had the chance to
 *	expire a stale lockout), 0 when replication is not on for this
 *	environment, and otherwise 1 or 0 according to whether
 *	REP_F_NOARCHIVE is set in the replication region.
 */
int
__rep_noarchive(env)
	ENV *env;
{
	DB_REP *db_rep;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	time_t timestamp;

	infop = env->reginfo;
	renv = infop->primary;

	if (F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Pass the address of the local: time(&timestamp). */
		(void)time(&timestamp);
		TIMESTAMP_CHECK(env, timestamp, renv);
		/* Still locked after the expiry check. */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	if (!REP_ON(env))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	return (F_ISSET(rep, REP_F_NOARCHIVE) ? 1 : 0);
}
/*
 * __rep_send_vote --
 *	Package election vote information and send it to eid as a message
 *	of type vtype.
 *
 *	Peers older than DB_REPVERSION_47 get the REP_OLD_VOTE_INFO structure
 *	directly; newer peers get the marshaled __rep_vote_info_args form.
 *	The result of the send is ignored.
 */
void
__rep_send_vote(env, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype, flags)
	ENV *env;
	DB_LSN *lsnp;
	int eid;
	u_int32_t nsites, nvotes, pri;
	u_int32_t flags, egen, tie, vtype;
{
	DB_REP *db_rep;
	DBT vote_dbt;
	REP *rep;
	REP_OLD_VOTE_INFO old_info;
	__rep_vote_info_args info;
	u_int8_t buf[__REP_VOTE_INFO_SIZE];
	size_t marshal_len;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	memset(&info, 0, sizeof(info));
	memset(&vote_dbt, 0, sizeof(vote_dbt));

	if (rep->version >= DB_REPVERSION_47) {
		info.egen = egen;
		info.priority = pri;
		info.nsites = nsites;
		info.nvotes = nvotes;
		info.tiebreaker = tie;
		(void)__rep_vote_info_marshal(env, &info, buf,
		    __REP_VOTE_INFO_SIZE, &marshal_len);
		DB_INIT_DBT(vote_dbt, buf, marshal_len);
	} else {
		/* The old layout stores these three counts as ints. */
		memset(&old_info, 0, sizeof(old_info));
		old_info.egen = egen;
		old_info.priority = (int) pri;
		old_info.nsites = (int) nsites;
		old_info.nvotes = (int) nvotes;
		old_info.tiebreaker = tie;
		vote_dbt.data = &old_info;
		vote_dbt.size = sizeof(old_info);
	}

	(void)__rep_send_message(env, eid, vtype, lsnp, &vote_dbt, flags, 0);
}
void
__rep_elect_done(env, rep, found_master)
ENV *env;
REP *rep;
int found_master;
{
int inelect;
db_timespec endtime;
inelect = IN_ELECTION(rep);
F_CLR(rep,
REP_F_EPHASE0 | REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY);
if (found_master)
F_CLR(rep, REP_F_EGENUPDATE);
rep->sites = 0;
rep->votes = 0;
if (inelect) {
if (timespecisset(&rep->etime)) {
__os_gettime(env, &endtime, 1);
timespecsub(&endtime, &rep->etime);
#ifdef HAVE_STATISTICS
rep->stat.st_election_sec = (u_int32_t)endtime.tv_sec;
rep->stat.st_election_usec = (u_int32_t)
(endtime.tv_nsec / NS_PER_US);
#endif
RPRINT(env, DB_VERB_REP_ELECT, (env,
"Election finished in %lu.%09lu sec",
(u_long)endtime.tv_sec, (u_long)endtime.tv_nsec));
timespecclear(&rep->etime);
}
rep->egen++;
}
RPRINT(env, DB_VERB_REP_ELECT,
(env, "Election done; egen %lu", (u_long)rep->egen));
}
/*
 * __rep_grow_sites --
 *	Replace the two vote tally arrays in the environment region with
 *	larger ones.
 *
 *	The new capacity is the larger of nsites and twice the current
 *	allocation.  If the first allocation fails nothing is changed.  If
 *	the second fails, both tally arrays are released and the tally
 *	offsets and site counts are reset.  Returns the __env_alloc result.
 */
int
__rep_grow_sites(env, nsites)
	ENV *env;
	u_int32_t nsites;
{
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int ret, *tally;
	u_int32_t nalloc;

	rep = env->rep_handle->region;

	nalloc = 2 * rep->asites;
	if (nalloc < nsites)
		nalloc = nsites;

	infop = env->reginfo;
	renv = infop->primary;
	MUTEX_LOCK(env, renv->mtx_regenv);

	/* First array: on failure leave the existing state untouched. */
	ret = __env_alloc(infop,
	    (size_t)nalloc * sizeof(REP_VTALLY), &tally);
	if (ret != 0)
		goto done;
	if (rep->tally_off != INVALID_ROFF)
		__env_alloc_free(infop, R_ADDR(infop, rep->tally_off));
	rep->tally_off = R_OFFSET(infop, tally);

	/* Second array. */
	ret = __env_alloc(infop,
	    (size_t)nalloc * sizeof(REP_VTALLY), &tally);
	if (ret != 0) {
		/* Could not get both: give everything back and reset. */
		if (rep->v2tally_off != INVALID_ROFF)
			__env_alloc_free(infop,
			    R_ADDR(infop, rep->v2tally_off));
		__env_alloc_free(infop, R_ADDR(infop, rep->tally_off));
		rep->v2tally_off = rep->tally_off = INVALID_ROFF;
		rep->asites = 0;
		rep->nsites = 0;
		goto done;
	}
	if (rep->v2tally_off != INVALID_ROFF)
		__env_alloc_free(infop, R_ADDR(infop, rep->v2tally_off));
	rep->v2tally_off = R_OFFSET(infop, tally);
	rep->asites = nalloc;
	rep->nsites = nsites;

done:	MUTEX_UNLOCK(env, renv->mtx_regenv);
	return (ret);
}
/*
 * __env_rep_enter --
 *	Register an environment-level API call with replication by
 *	incrementing rep->handle_cnt, waiting first while REP_F_READY_API
 *	is set.
 *
 *	Returns 0 immediately when DB_ENV_NOLOCKING is set.  When checklock
 *	is non-zero and the region is still marked DB_REGENV_REPLOCKED after
 *	the expiry check, returns EINVAL.  While REP_F_READY_API is set the
 *	call either fails with DB_REP_LOCKOUT (REP_C_NOWAIT configured) or
 *	polls, logging a message every 60 iterations.
 */
int
__env_rep_enter(env, checklock)
	ENV *env;
	int checklock;
{
	DB_REP *db_rep;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int cnt;
	time_t timestamp;

	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	infop = env->reginfo;
	renv = infop->primary;

	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Pass the address of the local: time(&timestamp). */
		(void)time(&timestamp);
		TIMESTAMP_CHECK(env, timestamp, renv);
		/* Still locked after the expiry check. */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	REP_SYSTEM_LOCK(env);
	for (cnt = 0; F_ISSET(rep, REP_F_READY_API);) {
		/* Never sleep or return while holding the system lock. */
		REP_SYSTEM_UNLOCK(env);
		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
			__db_errx(env,
    "Operation locked out.  Waiting for replication lockout to complete");
			return (DB_REP_LOCKOUT);
		}
		__os_yield(env, 1, 0);
		REP_SYSTEM_LOCK(env);
		if (++cnt % 60 == 0)
			__db_errx(env,
    "DB_ENV handle waiting %d minutes for replication lockout to complete",
			    cnt / 60);
	}
	rep->handle_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __env_db_rep_exit --
 *	Drop one reference from rep->handle_cnt under the replication
 *	system lock.  A no-op when DB_ENV_NOLOCKING is set.  Always
 *	returns 0.
 */
int
__env_db_rep_exit(env)
	ENV *env;
{
	REP *rep;

	if (!F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) {
		rep = env->rep_handle->region;

		REP_SYSTEM_LOCK(env);
		rep->handle_cnt--;
		REP_SYSTEM_UNLOCK(env);
	}
	return (0);
}
/*
 * __db_rep_enter --
 *	Register a DB-handle API call with replication by incrementing
 *	rep->handle_cnt.
 *
 *	Returns 0 immediately when DB_ENV_NOLOCKING is set.  Otherwise:
 *	  EINVAL		checklock is non-zero and the region is still
 *				marked DB_REGENV_REPLOCKED after the expiry
 *				check;
 *	  DB_LOCK_DEADLOCK	REP_F_READY_OP is set (after a 5 second yield
 *				unless return_now is non-zero);
 *	  DB_REP_HANDLE_DEAD	checkgen is non-zero and the handle's
 *				timestamp differs from the region's
 *				rep_timestamp;
 *	  0			the call was counted.
 */
int
__db_rep_enter(dbp, checkgen, checklock, return_now)
	DB *dbp;
	int checkgen, checklock, return_now;
{
	DB_REP *db_rep;
	ENV *env;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	time_t timestamp;

	env = dbp->env;
	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	infop = env->reginfo;
	renv = infop->primary;

	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Pass the address of the local: time(&timestamp). */
		(void)time(&timestamp);
		TIMESTAMP_CHECK(env, timestamp, renv);
		/* Still locked after the expiry check. */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	REP_SYSTEM_LOCK(env);
	if (F_ISSET(rep, REP_F_READY_OP)) {
		REP_SYSTEM_UNLOCK(env);
		if (!return_now)
			__os_yield(env, 5, 0);
		return (DB_LOCK_DEADLOCK);
	}

	if (checkgen && dbp->timestamp != renv->rep_timestamp) {
		REP_SYSTEM_UNLOCK(env);
		__db_errx(env, "%s %s",
		    "replication recovery unrolled committed transactions;",
		    "open DB and DBcursor handles must be closed");
		return (DB_REP_HANDLE_DEAD);
	}
	rep->handle_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __op_rep_enter --
 *	Register an operation with replication by incrementing rep->op_cnt,
 *	waiting first while REP_F_READY_OP is set.
 *
 *	Returns 0 immediately when DB_ENV_NOLOCKING is set.  While the
 *	lockout flag is set the call either fails with DB_REP_LOCKOUT
 *	(REP_C_NOWAIT configured) or yields for 5 seconds per iteration,
 *	logging a message each time the accumulated count is a multiple
 *	of 60.
 */
int
__op_rep_enter(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	int waited;

	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	waited = 0;

	REP_SYSTEM_LOCK(env);
	while (F_ISSET(rep, REP_F_READY_OP)) {
		/* Never sleep or return while holding the system lock. */
		REP_SYSTEM_UNLOCK(env);
		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
			__db_errx(env,
    "Operation locked out.  Waiting for replication lockout to complete");
			return (DB_REP_LOCKOUT);
		}
		__os_yield(env, 5, 0);
		waited += 5;
		REP_SYSTEM_LOCK(env);
		if (waited % 60 == 0)
			__db_errx(env,
	"__op_rep_enter waiting %d minutes for lockout to complete",
			    waited / 60);
	}
	rep->op_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __op_rep_exit --
 *	Drop one reference from rep->op_cnt under the replication system
 *	lock, asserting that the count was positive.  A no-op when
 *	DB_ENV_NOLOCKING is set.  Always returns 0.
 */
int
__op_rep_exit(env)
	ENV *env;
{
	REP *rep;

	if (!F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) {
		rep = env->rep_handle->region;

		REP_SYSTEM_LOCK(env);
		DB_ASSERT(env, rep->op_cnt > 0);
		rep->op_cnt--;
		REP_SYSTEM_UNLOCK(env);
	}
	return (0);
}
/*
 * __rep_lockout_api --
 *	Lock out API activity: first wait for rep->op_cnt to drain with
 *	REP_F_READY_OP set, then for rep->handle_cnt to drain with
 *	REP_F_READY_API set.  The second step is skipped if the first
 *	fails; the first non-zero result is returned.
 */
int
__rep_lockout_api(env, rep)
	ENV *env;
	REP *rep;
{
	int ret;

	ret = __rep_lockout_int(env, rep, &rep->op_cnt, 0,
	    "op_cnt", REP_F_READY_OP);
	if (ret == 0)
		ret = __rep_lockout_int(env, rep, &rep->handle_cnt, 0,
		    "handle_cnt", REP_F_READY_API);
	return (ret);
}
/*
 * __rep_lockout_apply --
 *	Set REP_F_READY_APPLY and wait until rep->apply_th is no greater
 *	than apply_th.  Returns the result of __rep_lockout_int.
 */
int
__rep_lockout_apply(env, rep, apply_th)
	ENV *env;
	REP *rep;
	u_int32_t apply_th;
{
	int ret;

	ret = __rep_lockout_int(env, rep, &rep->apply_th, apply_th,
	    "apply_th", REP_F_READY_APPLY);
	return (ret);
}
/*
 * __rep_lockout_msg --
 *	Set REP_F_READY_MSG and wait until rep->msg_th is no greater than
 *	msg_th.  Returns the result of __rep_lockout_int.
 */
int
__rep_lockout_msg(env, rep, msg_th)
	ENV *env;
	REP *rep;
	u_int32_t msg_th;
{
	int ret;

	ret = __rep_lockout_int(env, rep, &rep->msg_th, msg_th,
	    "msg_th", REP_F_READY_MSG);
	return (ret);
}
/*
 * __rep_lockout_int --
 *	Common lockout worker: set lockout_flag in the region, then poll
 *	until *fieldp is no greater than field_val.
 *
 *	Each iteration releases the replication system lock, yields for one
 *	second and re-acquires the lock, so the caller is expected to hold
 *	that lock on entry and still holds it on return.  msg names the
 *	counter in DIAGNOSTIC-only progress messages.  Always returns 0.
 */
static int
__rep_lockout_int(env, rep, fieldp, field_val, msg, lockout_flag)
	ENV *env;
	REP *rep;
	u_int32_t *fieldp;
	const char *msg;
	u_int32_t field_val, lockout_flag;
{
	int wait_cnt;

	wait_cnt = 0;
	F_SET(rep, lockout_flag);

	while (*fieldp > field_val) {
		REP_SYSTEM_UNLOCK(env);
		__os_yield(env, 1, 0);
#ifdef DIAGNOSTIC
		/* One early notice, then one per 60 iterations. */
		if (wait_cnt == 5)
			__db_errx(env,
"Waiting for %s (%lu) to complete replication lockout",
			    msg, (u_long)*fieldp);
		if (++wait_cnt % 60 == 0)
			__db_errx(env,
"Waiting for %s (%lu) to complete replication lockout for %d minutes",
			    msg, (u_long)*fieldp, wait_cnt / 60);
#endif
		REP_SYSTEM_LOCK(env);
	}

	/* msg is only referenced in DIAGNOSTIC builds. */
	COMPQUIET(msg, NULL);
	return (0);
}
/*
 * __rep_send_throttle --
 *	Charge one message against the throttle budget in repth and send it
 *	when appropriate.
 *
 *	The budget is repth->gbytes/repth->bytes; a budget of zero in both
 *	fields means "no limit".  When the remaining budget does not exceed
 *	the message size, repth->type is switched to the matching "more"
 *	type (REP_LOG_MORE or REP_PAGE_MORE).  With REP_THROTTLE_ONLY in
 *	flags the message is sent only in that case; otherwise it is always
 *	sent.  Returns 0, or DB_REP_UNAVAIL if the send fails.
 */
int
__rep_send_throttle(env, eid, repth, flags, ctlflags)
	ENV *env;
	int eid;
	REP_THROTTLE *repth;
	u_int32_t ctlflags, flags;
{
	DB_REP *db_rep;
	REP *rep;
	u_int32_t more_type, msg_size;
	int limited;

	limited = repth->gbytes != 0 || repth->bytes != 0;
	if (!limited && LF_ISSET(REP_THROTTLE_ONLY))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;

	more_type = 0;
	if (repth->type == REP_LOG)
		more_type = REP_LOG_MORE;
	else if (repth->type == REP_PAGE)
		more_type = REP_PAGE_MORE;
	DB_ASSERT(env, more_type != 0);

	/* Charge the payload plus one control structure. */
	msg_size = repth->data_dbt->size + sizeof(__rep_control_args);

	if (limited) {
		/* Borrow whole gigabytes while the byte count is short. */
		while (repth->bytes <= msg_size && repth->gbytes > 0) {
			repth->bytes += GIGABYTE;
			--(repth->gbytes);
		}
		if (repth->bytes <= msg_size) {
			/* Budget exhausted: convert to a "more" message. */
			STAT(rep->stat.st_nthrottles++);
			repth->type = more_type;
		} else
			repth->bytes -= msg_size;
	}

	/* In throttle-only mode, only "more" messages are sent. */
	if (repth->type != more_type && LF_ISSET(REP_THROTTLE_ONLY))
		return (0);

	if (__rep_send_message(env, eid, repth->type,
	    &repth->lsn, repth->data_dbt, (REPCTL_RESEND | ctlflags), 0) != 0)
		return (DB_REP_UNAVAIL);
	return (0);
}
/*
 * __rep_msg_to_old --
 *	Translate a current-version message number into the number used by
 *	an older replication version.
 *
 *	The table is indexed by [version][current message number] and holds
 *	the older version's number, or REP_INVALID when that version has no
 *	equivalent message.  Rows 0-2 are entirely REP_INVALID; in row 3 the
 *	entries at indexes 10 and 24 are REP_INVALID and later entries are
 *	shifted down accordingly; row 4 is the identity mapping.
 *
 *	Returns REP_INVALID, rather than indexing outside the table, when
 *	version is not below DB_REPVERSION or rectype exceeds REP_MAX_MSG.
 */
u_int32_t
__rep_msg_to_old(version, rectype)
	u_int32_t version, rectype;
{
	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
	/* Row 0. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Row 1. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Row 2. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Row 3: indexes 10 and 24 have no older equivalent. */
	{   REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	    8, 9, REP_INVALID, 10, 11, 12, 13, 14,
	    15, 16, 17, 18, 19, 20, 21, 22,
	    REP_INVALID, 23, 24, 25, 26, 27, 28, 29 },
	/* Row 4: identity. */
	{   REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	    8, 9, 10, 11, 12, 13, 14, 15,
	    16, 17, 18, 19, 20, 21, 22, 23,
	    24, 25, 26, 27, 28, 29, 30, 31 }
	};

	/* Never index outside the table. */
	if (version >= DB_REPVERSION || rectype > REP_MAX_MSG)
		return (REP_INVALID);
	return (table[version][rectype]);
}
/*
 * __rep_msg_from_old --
 *	Translate a message number used by an older replication version
 *	into the current-version message number.
 *
 *	The table is indexed by [version][older message number] and holds
 *	the current number, or REP_INVALID when the older number is not
 *	defined for that version.  Rows 0-2 are entirely REP_INVALID; row 3
 *	skips current numbers 10 and 24 and ends with two REP_INVALID
 *	entries; row 4 is the identity mapping.
 *
 *	Returns REP_INVALID, rather than indexing outside the table, when
 *	version is not below DB_REPVERSION or rectype exceeds REP_MAX_MSG.
 *	Both values can originate from a received control record.
 */
u_int32_t
__rep_msg_from_old(version, rectype)
	u_int32_t version, rectype;
{
	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
	/* Row 0. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Row 1. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Row 2. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Row 3: current numbers 10 and 24 are never produced. */
	{   REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	    8, 9, 11, 12, 13, 14, 15, 16,
	    17, 18, 19, 20, 21, 22, 23, 25,
	    26, 27, 28, 29, 30, 31, REP_INVALID, REP_INVALID },
	/* Row 4: identity. */
	{   REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	    8, 9, 10, 11, 12, 13, 14, 15,
	    16, 17, 18, 19, 20, 21, 22, 23,
	    24, 25, 26, 27, 28, 29, 30, 31 }
	};

	/* Never index outside the table. */
	if (version >= DB_REPVERSION || rectype > REP_MAX_MSG)
		return (REP_INVALID);
	return (table[version][rectype]);
}
/*
 * __rep_print --
 *	printf-style replication diagnostic output.
 *
 *	Each message is prefixed with the environment's error prefix when
 *	one is configured; otherwise with "CLIENT" or "MASTER" according to
 *	the region's role flags when replication is on; otherwise with
 *	"REP_UNDEF".  The formatted text is accumulated in a DB_MSGBUF and
 *	flushed before returning.
 */
void
#ifdef STDC_HEADERS
__rep_print(ENV *env, const char *fmt, ...)
#else
__rep_print(env, fmt, va_alist)
	ENV *env;
	const char *fmt;
	va_dcl
#endif
{
	va_list ap;
	DB_MSGBUF mb;
	REP *rep;
	const char *prefix;

	DB_MSGBUF_INIT(&mb);

	/* Choose the prefix: configured string, role name, or default. */
	prefix = env->dbenv->db_errpfx;
	if (prefix == NULL && REP_ON(env)) {
		rep = env->rep_handle->region;
		if (F_ISSET(rep, REP_F_CLIENT))
			prefix = "CLIENT";
		else if (F_ISSET(rep, REP_F_MASTER))
			prefix = "MASTER";
	}
	if (prefix == NULL)
		prefix = "REP_UNDEF";
	__db_msgadd(env, &mb, "%s: ", prefix);

#ifdef STDC_HEADERS
	va_start(ap, fmt);
#else
	va_start(ap);
#endif
	__db_msgadd_ap(env, &mb, fmt, ap);
	va_end(ap);

	DB_MSGBUF_FLUSH(env, &mb);
}
void
__rep_print_message(env, eid, rp, str, flags)
ENV *env;
int eid;
__rep_control_args *rp;
char *str;
u_int32_t flags;
{
u_int32_t ctlflags, rectype;
char ftype[64], *type;
rectype = rp->rectype;
ctlflags = rp->flags;
if (rp->rep_version != DB_REPVERSION)
rectype = __rep_msg_from_old(rp->rep_version, rectype);
switch (rectype) {
case REP_ALIVE:
type = "alive";
break;
case REP_ALIVE_REQ:
type = "alive_req";
break;
case REP_ALL_REQ:
type = "all_req";
break;
case REP_BULK_LOG:
type = "bulk_log";
break;
case REP_BULK_PAGE:
type = "bulk_page";
break;
case REP_DUPMASTER:
type = "dupmaster";
break;
case REP_FILE:
type = "file";
break;
case REP_FILE_FAIL:
type = "file_fail";
break;
case REP_FILE_REQ:
type = "file_req";
break;
case REP_LEASE_GRANT:
type = "lease_grant";
break;
case REP_LOG:
type = "log";
break;
case REP_LOG_MORE:
type = "log_more";
break;
case REP_LOG_REQ:
type = "log_req";
break;
case REP_MASTER_REQ:
type = "master_req";
break;
case REP_NEWCLIENT:
type = "newclient";
break;
case REP_NEWFILE:
type = "newfile";
break;
case REP_NEWMASTER:
type = "newmaster";
break;
case REP_NEWSITE:
type = "newsite";
break;
case REP_PAGE:
type = "page";
break;
case REP_PAGE_FAIL:
type = "page_fail";
break;
case REP_PAGE_MORE:
type = "page_more";
break;
case REP_PAGE_REQ:
type = "page_req";
break;
case REP_REREQUEST:
type = "rerequest";
break;
case REP_START_SYNC:
type = "start_sync";
break;
case REP_UPDATE:
type = "update";
break;
case REP_UPDATE_REQ:
type = "update_req";
break;
case REP_VERIFY:
type = "verify";
break;
case REP_VERIFY_FAIL:
type = "verify_fail";
break;
case REP_VERIFY_REQ:
type = "verify_req";
break;
case REP_VOTE1:
type = "vote1";
break;
case REP_VOTE2:
type = "vote2";
break;
default:
type = "NOTYPE";
break;
}
ftype[0] = '\0';
if (LF_ISSET(DB_REP_ANYWHERE))
(void)strcat(ftype, " any");
if (FLD_ISSET(ctlflags, REPCTL_FLUSH))
(void)strcat(ftype, " flush");
if (!FLD_ISSET(ctlflags, REPCTL_GROUP_ESTD))
(void)strcat(ftype, " nogroup");
if (FLD_ISSET(ctlflags, REPCTL_LEASE))
(void)strcat(ftype, " lease");
if (LF_ISSET(DB_REP_NOBUFFER))
(void)strcat(ftype, " nobuf");
if (LF_ISSET(DB_REP_PERMANENT))
(void)strcat(ftype, " perm");
if (LF_ISSET(DB_REP_REREQUEST))
(void)strcat(ftype, " rereq");
if (FLD_ISSET(ctlflags, REPCTL_RESEND))
(void)strcat(ftype, " resend");
if (FLD_ISSET(ctlflags, REPCTL_LOG_END))
(void)strcat(ftype, " logend");
RPRINT(env, DB_VERB_REP_MSGS,
(env,
"%s %s: msgv = %lu logv %lu gen = %lu eid %d, type %s, LSN [%lu][%lu] %s",
env->db_home, str,
(u_long)rp->rep_version, (u_long)rp->log_version, (u_long)rp->gen,
eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset, ftype));
DB_ASSERT(env, rp->rep_version <= DB_REPVERSION+10);
DB_ASSERT(env, rp->log_version <= DB_LOGVERSION+10);
}
/*
 * __rep_fire_event --
 *	Deliver a replication event.  The replication manager gets the
 *	first chance to handle it; only when it reports
 *	DB_EVENT_NOT_HANDLED is the event passed on through DB_EVENT.
 */
void
__rep_fire_event(env, event, info)
	ENV *env;
	u_int32_t event;
	void *info;
{
	int handled;

	handled = __repmgr_handle_event(env, event, info);
	DB_ASSERT(env, handled == 0 || handled == DB_EVENT_NOT_HANDLED);

	if (handled != DB_EVENT_NOT_HANDLED)
		return;
	DB_EVENT(env, event, info);
}