#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
/*
 * Prototypes for the file-local (static) helpers used by the replication
 * record-processing code below.  __P() wraps the parameter lists.
 */
static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *));
static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
static int __rep_getnext __P((ENV *, DB_THREAD_INFO *));
static int __rep_lsn_cmp __P((const void *, const void *));
static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *));
static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
DBT *, db_timespec *, DB_LSN *));
static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *));
static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t));
/*
 * The macros below are written for use inside __rep_process_message: in
 * addition to their arguments they reference names from the enclosing scope
 * (env, eid, ret, rep, rp, recovering) and jump to its "errlock" label.
 */

/*
 * MASTER_ONLY --
 *	If REP_F_MASTER is not set on rep, log the message, set ret to EINVAL
 *	and jump to errlock.
 */
#define MASTER_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_F_MASTER)) { \
RPRINT(env, DB_VERB_REP_MSGS, \
(env, "Master record received on client")); \
REP_PRINT_MESSAGE(env, \
eid, rp, "rep_process_message", 0); \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
/*
 * CLIENT_ONLY --
 *	If REP_F_CLIENT is not set on rep, set ret to DB_REP_DUPMASTER and
 *	jump to errlock.  When leases are in use the lease is asserted to be
 *	expired; otherwise the message is printed and a REP_DUPMASTER message
 *	is broadcast first.
 */
#define CLIENT_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_F_CLIENT)) { \
RPRINT(env, DB_VERB_REP_MSGS, \
(env, "Client record received on master")); \
\
if (IS_USING_LEASES(env)) \
DB_ASSERT(env, \
__rep_lease_check(env, 0) == \
DB_REP_LEASE_EXPIRED); \
else { \
REP_PRINT_MESSAGE(env, \
eid, rp, "rep_process_message", 0); \
(void)__rep_send_message(env, DB_EID_BROADCAST, \
REP_DUPMASTER, NULL, NULL, 0, 0); \
} \
ret = DB_REP_DUPMASTER; \
goto errlock; \
} \
} while (0)
/*
 * CLIENT_REREQ --
 *	Post-processing for request messages serviced on a client: count the
 *	request and, if the handler left ret == DB_NOTFOUND, count a miss and
 *	replace ret with the result of __rep_skip_msg.
 */
#define CLIENT_REREQ do { \
if (F_ISSET(rep, REP_F_CLIENT)) { \
STAT(rep->stat.st_client_svc_req++); \
if (ret == DB_NOTFOUND) { \
STAT(rep->stat.st_client_svc_miss++); \
ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
} \
} \
} while (0)
/*
 * MASTER_UPDATE --
 *	While holding the replication system lock, set DB_REGENV_REPLOCKED on
 *	renv and store the current time in renv->op_timestamp.
 */
#define MASTER_UPDATE(env, renv) do { \
REP_SYSTEM_LOCK(env); \
F_SET((renv), DB_REGENV_REPLOCKED); \
(void)time(&(renv)->op_timestamp); \
REP_SYSTEM_UNLOCK(env); \
} while (0)
/*
 * RECOVERING_SKIP --
 *	On a client with the "recovering" flag set, count the message, hand it
 *	to __rep_skip_msg and jump to errlock with that result in ret.
 */
#define RECOVERING_SKIP do { \
if (IS_REP_CLIENT(env) && recovering) { \
\
STAT(rep->stat.st_msgs_recover++); \
ret = __rep_skip_msg(env, rep, eid, rp->rectype); \
goto errlock; \
} \
} while (0)
/*
 * RECOVERING_LOG_SKIP --
 *	Skip a log-carrying message (via __rep_skip_msg, then errlock) when
 *	REP_F_DELAY is set, when no master is known, or when recovering and
 *	either REP_F_RECOVER_LOG is not set or the message LSN is past
 *	rep->last_lsn.
 */
#define RECOVERING_LOG_SKIP do { \
if (F_ISSET(rep, REP_F_DELAY) || \
rep->master_id == DB_EID_INVALID || \
(recovering && \
(!F_ISSET(rep, REP_F_RECOVER_LOG) || \
LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0))) { \
\
STAT(rep->stat.st_msgs_recover++); \
ret = __rep_skip_msg(env, rep, eid, rp->rectype); \
goto errlock; \
} \
} while (0)
/*
 * ANYSITE --
 *	Expands to nothing; it only marks message types whose handling is not
 *	gated by MASTER_ONLY/CLIENT_ONLY.
 */
#define ANYSITE(rep)
int
__rep_process_message(dbenv, control, rec, eid, ret_lsnp)
DB_ENV *dbenv;
DBT *control, *rec;
int eid;
DB_LSN *ret_lsnp;
{
DBT data_dbt;
DB_LOG *dblp;
DB_LSN last_lsn, lsn;
DB_REP *db_rep;
DB_THREAD_INFO *ip;
ENV *env;
LOG *lp;
REGENV *renv;
REGINFO *infop;
REP *rep;
REP_46_CONTROL *rp46;
REP_OLD_CONTROL *orp;
__rep_control_args *rp, tmprp;
__rep_egen_args egen_arg;
size_t len;
u_int32_t gen, rep_version;
int cmp, do_sync, lockout, recovering, ret, t_ret;
time_t savetime;
u_int8_t buf[__REP_MAXMSG_SIZE];
env = dbenv->env;
ENV_REQUIRES_CONFIG_XX(
env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP);
if (control == NULL || control->size == 0) {
__db_errx(env,
"DB_ENV->rep_process_message: control argument must be specified");
return (EINVAL);
}
if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) {
__db_errx(env,
"Environment not configured as replication master or client");
return (EINVAL);
}
if ((ret = __dbt_usercopy(env, control)) != 0 ||
(ret = __dbt_usercopy(env, rec)) != 0) {
__dbt_userfree(env, control, rec, NULL);
__db_errx(env,
"DB_ENV->rep_process_message: error retrieving DBT contents");
return ret;
}
ret = 0;
do_sync = 0;
lockout = 0;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
infop = env->reginfo;
renv = infop->primary;
rep_version = ((REP_OLD_CONTROL *)control->data)->rep_version;
if (rep_version <= DB_REPVERSION_45) {
orp = (REP_OLD_CONTROL *)control->data;
if (rep_version == DB_REPVERSION_45 &&
F_ISSET(orp, REPCTL_INIT_45)) {
F_CLR(orp, REPCTL_INIT_45);
F_SET(orp, REPCTL_INIT);
}
tmprp.rep_version = orp->rep_version;
tmprp.log_version = orp->log_version;
tmprp.lsn = orp->lsn;
tmprp.rectype = orp->rectype;
tmprp.gen = orp->gen;
tmprp.flags = orp->flags;
tmprp.msg_sec = 0;
tmprp.msg_nsec = 0;
} else if (rep_version == DB_REPVERSION_46) {
rp46 = (REP_46_CONTROL *)control->data;
tmprp.rep_version = rp46->rep_version;
tmprp.log_version = rp46->log_version;
tmprp.lsn = rp46->lsn;
tmprp.rectype = rp46->rectype;
tmprp.gen = rp46->gen;
tmprp.flags = rp46->flags;
tmprp.msg_sec = (u_int32_t)rp46->msg_time.tv_sec;
tmprp.msg_nsec = (u_int32_t)rp46->msg_time.tv_nsec;
} else
if ((ret = __rep_control_unmarshal(env, &tmprp,
control->data, control->size, NULL)) != 0)
return (ret);
rp = &tmprp;
if (ret_lsnp != NULL)
ZERO_LSN(*ret_lsnp);
ENV_ENTER(env, ip);
REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
if (rp->rep_version < DB_REPVERSION) {
if (rp->rep_version < DB_REPVERSION_MIN) {
__db_errx(env,
"unsupported old replication message version %lu, minimum version %d",
(u_long)rp->rep_version, DB_REPVERSION_MIN);
ret = EINVAL;
goto errlock;
}
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Received record %lu with old rep version %lu",
(u_long)rp->rectype, (u_long)rp->rep_version));
rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype);
DB_ASSERT(env, rp->rectype != REP_INVALID);
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Converted to record %lu with old rep version %lu",
(u_long)rp->rectype, (u_long)rp->rep_version));
} else if (rp->rep_version > DB_REPVERSION) {
__db_errx(env,
"unexpected replication message version %lu, expected %d",
(u_long)rp->rep_version, DB_REPVERSION);
ret = EINVAL;
goto errlock;
}
if (rp->log_version < DB_LOGVERSION) {
if (rp->log_version < DB_LOGVERSION_MIN) {
__db_errx(env,
"unsupported old replication log version %lu, minimum version %d",
(u_long)rp->log_version, DB_LOGVERSION_MIN);
ret = EINVAL;
goto errlock;
}
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Received record %lu with old log version %lu",
(u_long)rp->rectype, (u_long)rp->log_version));
} else if (rp->log_version > DB_LOGVERSION) {
__db_errx(env,
"unexpected log record version %lu, expected %d",
(u_long)rp->log_version, DB_LOGVERSION);
ret = EINVAL;
goto errlock;
}
REP_SYSTEM_LOCK(env);
if (F_ISSET(rep, REP_F_READY_MSG)) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Racing replication msg lockout, ignore message."));
if (F_ISSET(rp, REPCTL_PERM))
ret = DB_REP_IGNORE;
REP_SYSTEM_UNLOCK(env);
if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) {
STAT(rep->stat.st_client_svc_req++);
STAT(rep->stat.st_client_svc_miss++);
(void)__rep_send_message(env,
eid, REP_REREQUEST, NULL, NULL, 0, 0);
}
goto out;
}
rep->msg_th++;
gen = rep->gen;
recovering = F_ISSET(rep, REP_F_RECOVER_MASK);
savetime = renv->rep_timestamp;
STAT(rep->stat.st_msgs_processed++);
REP_SYSTEM_UNLOCK(env);
if (!IS_USING_LEASES(env) &&
(F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) {
__db_errx(env,
"Inconsistent lease configuration");
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Client received lease message and not using leases"));
ret = EINVAL;
ret = __env_panic(env, ret);
goto errlock;
}
if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) {
STAT(rep->stat.st_msgs_badgen++);
if (F_ISSET(rp, REPCTL_PERM))
ret = DB_REP_IGNORE;
goto errlock;
}
if (rp->gen > gen) {
if (F_ISSET(rep, REP_F_MASTER)) {
STAT(rep->stat.st_dupmasters++);
ret = DB_REP_DUPMASTER;
if (IS_USING_LEASES(env))
DB_ASSERT(env,
__rep_lease_check(env, 0) ==
DB_REP_LEASE_EXPIRED);
else if (rp->rectype != REP_DUPMASTER)
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_DUPMASTER,
NULL, NULL, 0, 0);
goto errlock;
}
if (rp->rectype == REP_ALIVE ||
rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
REP_SYSTEM_LOCK(env);
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Updating gen from %lu to %lu",
(u_long)gen, (u_long)rp->gen));
rep->master_id = DB_EID_INVALID;
gen = rep->gen = rp->gen;
REP_SYSTEM_UNLOCK(env);
if (rp->rectype == REP_ALIVE)
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
NULL, 0, 0);
} else if (rp->rectype != REP_NEWMASTER) {
if (__rep_check_doreq(env, rep))
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_MASTER_REQ,
NULL, NULL, 0, 0);
goto errlock;
}
}
if (F_ISSET(rp, REPCTL_GROUP_ESTD)) {
REP_SYSTEM_LOCK(env);
#ifdef DIAGNOSTIC
if (!F_ISSET(rep, REP_F_GROUP_ESTD))
RPRINT(env, DB_VERB_REP_MSGS, (env,
"I am now part of an established group"));
#endif
F_SET(rep, REP_F_GROUP_ESTD);
REP_SYSTEM_UNLOCK(env);
}
switch (rp->rectype) {
case REP_ALIVE:
ANYSITE(rep);
if (rp->rep_version < DB_REPVERSION_47)
egen_arg.egen = *(u_int32_t *)rec->data;
else if ((ret = __rep_egen_unmarshal(env, &egen_arg,
rec->data, rec->size, NULL)) != 0)
return (ret);
REP_SYSTEM_LOCK(env);
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Received ALIVE egen of %lu, mine %lu",
(u_long)egen_arg.egen, (u_long)rep->egen));
if (egen_arg.egen > rep->egen) {
__rep_elect_done(env, rep, 0);
rep->egen = egen_arg.egen;
F_SET(rep, REP_F_EGENUPDATE);
}
REP_SYSTEM_UNLOCK(env);
break;
case REP_ALIVE_REQ:
ANYSITE(rep);
LOG_SYSTEM_LOCK(env);
lsn = lp->lsn;
LOG_SYSTEM_UNLOCK(env);
#ifdef CONFIG_TEST
if (F_ISSET(rep, REP_F_MASTER))
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
#endif
REP_SYSTEM_LOCK(env);
egen_arg.egen = rep->egen;
REP_SYSTEM_UNLOCK(env);
if (rep->version < DB_REPVERSION_47)
DB_INIT_DBT(data_dbt, &egen_arg.egen,
sizeof(egen_arg.egen));
else {
if ((ret = __rep_egen_marshal(env,
&egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0)
goto errlock;
DB_INIT_DBT(data_dbt, buf, len);
}
(void)__rep_send_message(env,
eid, REP_ALIVE, &lsn, &data_dbt, 0, 0);
break;
case REP_ALL_REQ:
RECOVERING_SKIP;
ret = __rep_allreq(env, rp, eid);
CLIENT_REREQ;
break;
case REP_BULK_LOG:
RECOVERING_LOG_SKIP;
CLIENT_ONLY(rep, rp);
ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp);
break;
case REP_BULK_PAGE:
CLIENT_ONLY(rep, rp);
ret = __rep_bulk_page(env, ip, eid, rp, rec);
break;
case REP_DUPMASTER:
if (F_ISSET(rep, REP_F_MASTER))
ret = DB_REP_DUPMASTER;
break;
#ifdef NOTYET
case REP_FILE:
CLIENT_ONLY(rep, rp);
break;
case REP_FILE_REQ:
ret = __rep_send_file(env, rec, eid);
break;
#endif
case REP_FILE_FAIL:
CLIENT_ONLY(rep, rp);
break;
case REP_LEASE_GRANT:
MASTER_ONLY(rep, rp);
ret = __rep_lease_grant(env, rp, rec, eid);
break;
case REP_LOG:
case REP_LOG_MORE:
RECOVERING_LOG_SKIP;
CLIENT_ONLY(rep, rp);
ret = __rep_log(env, ip, rp, rec, savetime, ret_lsnp);
break;
case REP_LOG_REQ:
RECOVERING_SKIP;
if (F_ISSET(rp, REPCTL_INIT))
MASTER_UPDATE(env, renv);
ret = __rep_logreq(env, rp, rec, eid);
CLIENT_REREQ;
break;
case REP_NEWSITE:
STAT(rep->stat.st_newsites++);
if (F_ISSET(rep, REP_F_MASTER)) {
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
LOG_SYSTEM_LOCK(env);
lsn = lp->lsn;
LOG_SYSTEM_UNLOCK(env);
(void)__rep_send_message(env,
eid, REP_NEWMASTER, &lsn, NULL, 0, 0);
if (IS_USING_LEASES(env))
(void)__rep_lease_refresh(env);
}
ret = DB_REP_NEWSITE;
break;
case REP_NEWCLIENT:
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
ret = DB_REP_NEWSITE;
if (F_ISSET(rep, REP_F_CLIENT)) {
REP_SYSTEM_LOCK(env);
egen_arg.egen = rep->egen;
if (eid == rep->master_id) {
rep->master_id = DB_EID_INVALID;
if (F_ISSET(rep, REP_F_READY_MSG))
goto errhlk;
if ((t_ret =
__rep_lockout_msg(env, rep, 1)) != 0) {
ret = t_ret;
goto errhlk;
}
lockout = 1;
REP_SYSTEM_UNLOCK(env);
MUTEX_LOCK(env, rep->mtx_clientdb);
REP_SYSTEM_LOCK(env);
if (F_ISSET(rep, REP_F_READY_API |
REP_F_READY_OP)) {
RPRINT(env, DB_VERB_REP_MSGS, (env,
"NEWCLIENT is cleaning up old internal init for invalid master"));
t_ret = __rep_init_cleanup(env,
rep, DB_FORCE);
F_CLR(rep, REP_F_RECOVER_MASK);
}
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (t_ret != 0) {
ret = t_ret;
RPRINT(env, DB_VERB_REP_MSGS, (env,
"NEWCLIENT error cleaning up internal init for invalid master: %d", ret));
goto errhlk;
}
F_CLR(rep, REP_F_READY_MSG);
lockout = 0;
}
REP_SYSTEM_UNLOCK(env);
if (rep->version < DB_REPVERSION_47)
DB_INIT_DBT(data_dbt, &egen_arg.egen,
sizeof(egen_arg.egen));
else {
if ((ret = __rep_egen_marshal(env, &egen_arg,
buf, __REP_EGEN_SIZE, &len)) != 0)
goto errlock;
DB_INIT_DBT(data_dbt, buf, len);
}
(void)__rep_send_message(env, DB_EID_BROADCAST,
REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
break;
}
case REP_MASTER_REQ:
RECOVERING_SKIP;
if (F_ISSET(rep, REP_F_MASTER)) {
LOG_SYSTEM_LOCK(env);
lsn = lp->lsn;
LOG_SYSTEM_UNLOCK(env);
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
if (IS_USING_LEASES(env))
(void)__rep_lease_refresh(env);
}
if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
REP_SYSTEM_LOCK(env);
egen_arg.egen = rep->egen;
if (eid == rep->master_id)
rep->master_id = DB_EID_INVALID;
REP_SYSTEM_UNLOCK(env);
if (rep->version < DB_REPVERSION_47)
DB_INIT_DBT(data_dbt, &egen_arg.egen,
sizeof(egen_arg.egen));
else {
if ((ret = __rep_egen_marshal(env, &egen_arg,
buf, __REP_EGEN_SIZE, &len)) != 0)
goto errlock;
DB_INIT_DBT(data_dbt, buf, len);
}
(void)__rep_send_message(env, eid,
REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
}
break;
case REP_NEWFILE:
RECOVERING_LOG_SKIP;
CLIENT_ONLY(rep, rp);
ret = __rep_apply(env,
ip, rp, rec, ret_lsnp, NULL, &last_lsn);
if (ret == DB_REP_LOGREADY)
ret = __rep_logready(env, rep, savetime, &last_lsn);
break;
case REP_NEWMASTER:
ANYSITE(rep);
if (F_ISSET(rep, REP_F_MASTER) &&
eid != rep->eid) {
STAT(rep->stat.st_dupmasters++);
ret = DB_REP_DUPMASTER;
if (IS_USING_LEASES(env))
DB_ASSERT(env,
__rep_lease_check(env, 0) ==
DB_REP_LEASE_EXPIRED);
else
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_DUPMASTER,
NULL, NULL, 0, 0);
break;
}
if ((ret =
__rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER)
ret = __rep_fire_newmaster(env, rp->gen, eid);
break;
case REP_PAGE:
case REP_PAGE_MORE:
CLIENT_ONLY(rep, rp);
ret = __rep_page(env, ip, eid, rp, rec);
if (ret == DB_REP_PAGEDONE)
ret = 0;
break;
case REP_PAGE_FAIL:
CLIENT_ONLY(rep, rp);
ret = __rep_page_fail(env, ip, eid, rp, rec);
break;
case REP_PAGE_REQ:
RECOVERING_SKIP;
MASTER_UPDATE(env, renv);
ret = __rep_page_req(env, ip, eid, rp, rec);
CLIENT_REREQ;
break;
case REP_REREQUEST:
CLIENT_ONLY(rep, rp);
STAT(rep->stat.st_client_rerequests++);
ret = __rep_resend_req(env, 1);
break;
case REP_START_SYNC:
RECOVERING_SKIP;
MUTEX_LOCK(env, rep->mtx_clientdb);
cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
if (cmp <= 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
do_sync = 1;
} else {
STAT(rep->stat.st_startsync_delayed++);
if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0)
rep->ckp_lsn = rp->lsn;
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Delayed START_SYNC memp_sync due to missing records."));
RPRINT(env, DB_VERB_REP_MSGS, (env,
"ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]",
(u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset,
(u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset));
MUTEX_UNLOCK(env, rep->mtx_clientdb);
}
break;
case REP_UPDATE:
CLIENT_ONLY(rep, rp);
ret = __rep_update_setup(env, eid, rp, rec);
break;
case REP_UPDATE_REQ:
MASTER_ONLY(rep, rp);
infop = env->reginfo;
renv = infop->primary;
MASTER_UPDATE(env, renv);
ret = __rep_update_req(env, rp, eid);
break;
case REP_VERIFY:
if (recovering) {
MUTEX_LOCK(env, rep->mtx_clientdb);
cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (cmp != 0) {
ret = __rep_skip_msg(
env, rep, eid, rp->rectype);
break;
}
}
CLIENT_ONLY(rep, rp);
ret = __rep_verify(env, rp, rec, eid, savetime);
break;
case REP_VERIFY_FAIL:
CLIENT_ONLY(rep, rp);
ret = __rep_verify_fail(env, rp, eid);
break;
case REP_VERIFY_REQ:
RECOVERING_SKIP;
ret = __rep_verify_req(env, rp, eid);
CLIENT_REREQ;
break;
case REP_VOTE1:
ret = __rep_vote1(env, rp, rec, eid);
break;
case REP_VOTE2:
ret = __rep_vote2(env, rp, rec, eid);
break;
default:
__db_errx(env,
"DB_ENV->rep_process_message: unknown replication message: type %lu",
(u_long)rp->rectype);
ret = EINVAL;
break;
}
errlock:
REP_SYSTEM_LOCK(env);
errhlk: if (lockout)
F_CLR(rep, REP_F_READY_MSG);
rep->msg_th--;
REP_SYSTEM_UNLOCK(env);
if (do_sync) {
MUTEX_LOCK(env, rep->mtx_ckp);
lsn = rp->lsn;
ret = __memp_sync(
env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn);
MUTEX_UNLOCK(env, rep->mtx_ckp);
RPRINT(env, DB_VERB_REP_MSGS,
(env, "ALIVE: Completed sync [%lu][%lu]",
(u_long)lsn.file, (u_long)lsn.offset));
}
out:
if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
if (ret_lsnp != NULL)
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
__dbt_userfree(env, control, rec, NULL);
ENV_LEAVE(env, ip);
return (ret);
}
int
__rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp)
ENV *env;
DB_THREAD_INFO *ip;
__rep_control_args *rp;
DBT *rec;
DB_LSN *ret_lsnp;
int *is_dupp;
DB_LSN *last_lsnp;
{
DB *dbp;
DBT control_dbt, key_dbt;
DBT rec_dbt;
DB_LOG *dblp;
DB_LSN max_lsn, save_lsn;
DB_REP *db_rep;
LOG *lp;
REP *rep;
db_timespec msg_time, max_ts;
u_int32_t gen;
int cmp, event, master, ret, set_apply, t_ret;
COMPQUIET(gen, 0);
COMPQUIET(master, DB_EID_INVALID);
db_rep = env->rep_handle;
rep = db_rep->region;
event = ret = set_apply = 0;
memset(&control_dbt, 0, sizeof(control_dbt));
memset(&rec_dbt, 0, sizeof(rec_dbt));
ZERO_LSN(max_lsn);
timespecclear(&max_ts);
timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
cmp = -2;
dblp = env->lg_handle;
MUTEX_LOCK(env, rep->mtx_clientdb);
if (db_rep->rep_db == NULL &&
(ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
goto out;
}
dbp = db_rep->rep_db;
lp = dblp->reginfo.primary;
REP_SYSTEM_LOCK(env);
if (F_ISSET(rep, REP_F_RECOVER_LOG) &&
LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0)
lp->ready_lsn = rep->first_lsn;
cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
if (F_ISSET(rep, REP_F_READY_APPLY) && cmp >= 0)
F_SET(rep, REP_F_SKIPPED_APPLY);
if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0)
cmp = -1;
if (cmp == 0) {
if (F_ISSET(rep, REP_F_READY_APPLY)) {
RPRINT(env, DB_VERB_REP_MISC, (env,
"rep_apply: In election. Ignoring [%lu][%lu]",
(u_long)rp->lsn.file, (u_long)rp->lsn.offset));
REP_SYSTEM_UNLOCK(env);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
goto out;
}
rep->apply_th++;
set_apply = 1;
RPRINT(env, DB_VERB_REP_MISC, (env,
"rep_apply: Set apply_th %d", rep->apply_th));
REP_SYSTEM_UNLOCK(env);
if ((ret = __rep_process_rec(env, ip,
rp, rec, &max_ts, &max_lsn)) != 0)
goto err;
__os_gettime(env, &lp->rcvd_ts, 1);
ZERO_LSN(lp->max_wait_lsn);
while (ret == 0 &&
LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
gap_check:
if ((ret = __rep_remfirst(env, ip,
&control_dbt, &rec_dbt)) != 0)
goto err;
rp = (__rep_control_args *)control_dbt.data;
timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
rec = &rec_dbt;
if ((ret = __rep_process_rec(env, ip,
rp, rec, &max_ts, &max_lsn)) != 0)
goto err;
--rep->stat.st_log_queued;
lp->rcvd_ts = lp->last_ts;
lp->wait_ts = rep->request_gap;
if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) {
__os_gettime(env, &lp->rcvd_ts, 1);
ret = 0;
break;
} else if (ret != 0)
goto err;
}
if (!IS_ZERO_LSN(lp->waiting_lsn) &&
LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
if (__rep_check_doreq(env, rep) && (ret =
__rep_loggap_req(env, rep, &rp->lsn, 0)) != 0)
goto err;
} else {
lp->wait_ts = rep->request_gap;
ZERO_LSN(lp->max_wait_lsn);
}
} else if (cmp > 0) {
REP_SYSTEM_UNLOCK(env);
memset(&key_dbt, 0, sizeof(key_dbt));
key_dbt.data = rp;
key_dbt.size = sizeof(*rp);
ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
if (ret == 0) {
rep->stat.st_log_queued++;
__os_gettime(env, &lp->last_ts, 1);
#ifdef HAVE_STATISTICS
STAT(rep->stat.st_log_queued_total++);
if (rep->stat.st_log_queued_max <
rep->stat.st_log_queued)
rep->stat.st_log_queued_max =
rep->stat.st_log_queued;
#endif
}
if (ret == DB_KEYEXIST)
ret = 0;
if (ret != 0)
goto done;
if (IS_ZERO_LSN(lp->waiting_lsn) ||
LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0)
lp->waiting_lsn = rp->lsn;
if (__rep_check_doreq(env, rep) &&
(ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0))
goto err;
if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
max_lsn = rp->lsn;
ret = DB_REP_NOTPERM;
}
goto done;
} else {
STAT(rep->stat.st_log_duplicated++);
REP_SYSTEM_UNLOCK(env);
if (is_dupp != NULL)
*is_dupp = 1;
if (F_ISSET(rp, REPCTL_PERM))
max_lsn = lp->max_perm_lsn;
if (IS_USING_LEASES(env) &&
F_ISSET(rp, REPCTL_LEASE) &&
timespecisset(&msg_time)) {
if (timespeccmp(&msg_time, &lp->max_lease_ts, >))
max_ts = msg_time;
else
max_ts = lp->max_lease_ts;
}
goto done;
}
if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0)
goto gap_check;
done:
err:
REP_SYSTEM_LOCK(env);
if (ret == 0 &&
F_ISSET(rep, REP_F_RECOVER_LOG) &&
!IS_ZERO_LSN(rep->last_lsn) &&
LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) {
*last_lsnp = max_lsn;
ZERO_LSN(rep->last_lsn);
ZERO_LSN(max_lsn);
ret = DB_REP_LOGREADY;
}
if (set_apply) {
rep->apply_th--;
RPRINT(env, DB_VERB_REP_MISC, (env,
"rep_apply: Decrement apply_th %d", rep->apply_th));
}
if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
!IS_ZERO_LSN(max_lsn)) {
if (ret_lsnp != NULL)
*ret_lsnp = max_lsn;
ret = DB_REP_ISPERM;
DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0);
lp->max_perm_lsn = max_lsn;
}
if ((ret == 0 || ret == DB_REP_ISPERM) &&
rep->stat.st_startup_complete == 0 &&
!F_ISSET(rep, REP_F_RECOVER_LOG) &&
((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) ||
(cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) {
rep->stat.st_startup_complete = 1;
event = 1;
gen = rep->gen;
master = rep->master_id;
}
REP_SYSTEM_UNLOCK(env);
if (!IS_ZERO_LSN(rep->ckp_lsn) &&
LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) >= 0) {
save_lsn = rep->ckp_lsn;
ZERO_LSN(rep->ckp_lsn);
} else
ZERO_LSN(save_lsn);
if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) &&
timespecisset(&max_ts)) {
if ((t_ret = __rep_update_grant(env, &max_ts)) != 0)
ret = t_ret;
else if (timespeccmp(&max_ts, &lp->max_lease_ts, >))
lp->max_lease_ts = max_ts;
}
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (!IS_ZERO_LSN(save_lsn)) {
MUTEX_LOCK(env, rep->mtx_ckp);
RPRINT(env, DB_VERB_REP_MISC, (env,
"Starting delayed __memp_sync call [%lu][%lu]",
(u_long)save_lsn.file, (u_long)save_lsn.offset));
t_ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &save_lsn);
MUTEX_UNLOCK(env, rep->mtx_ckp);
}
if (event) {
RPRINT(env, DB_VERB_REP_MISC, (env,
"Start-up is done [%lu][%lu]",
(u_long)rp->lsn.file, (u_long)rp->lsn.offset));
if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) {
DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM);
ret = t_ret;
goto out;
}
}
if (ret == 0 && rp->rectype == REP_NEWFILE && lp->db_log_autoremove)
__log_autoremove(env);
if (control_dbt.data != NULL)
__os_ufree(env, control_dbt.data);
if (rec_dbt.data != NULL)
__os_ufree(env, rec_dbt.data);
out:
switch (ret) {
case 0:
break;
case DB_REP_ISPERM:
RPRINT(env, DB_VERB_REP_MSGS,
(env, "Returning ISPERM [%lu][%lu], cmp = %d",
(u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
break;
case DB_REP_LOGREADY:
RPRINT(env, DB_VERB_REP_MSGS, (env,
"Returning LOGREADY up to [%lu][%lu], cmp = %d",
(u_long)last_lsnp->file,
(u_long)last_lsnp->offset, cmp));
break;
case DB_REP_NOTPERM:
if (!F_ISSET(rep, REP_F_RECOVER_LOG) &&
!IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
*ret_lsnp = max_lsn;
RPRINT(env, DB_VERB_REP_MSGS,
(env, "Returning NOTPERM [%lu][%lu], cmp = %d",
(u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
break;
default:
RPRINT(env, DB_VERB_REP_MSGS,
(env, "Returning %d [%lu][%lu], cmp = %d", ret,
(u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
break;
}
return (ret);
}
/*
 * __rep_process_txn --
 *	Apply the log records belonging to the transaction whose commit or
 *	prepare record is given in rec.  The write locks listed in the record
 *	are acquired first, the transaction's LSNs are collected and sorted,
 *	and each record is then dispatched with DB_TXN_APPLY.
 *
 *	A __txn_regop record whose opcode is not TXN_COMMIT is ignored and 0
 *	is returned.
 */
int
__rep_process_txn(env, rec)
	ENV *env;
	DBT *rec;
{
	DBT data_dbt, *lock_dbt;
	DB_LOCKER *locker;
	DB_LOCKREQ req, *lvp;
	DB_LOGC *logc;
	DB_LSN prev_lsn, *lsnp;
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	DB_TXNHEAD *txninfo;
	LSN_COLLECTION lc;
	REP *rep;
	__txn_regop_args *txn_args;
	__txn_regop_42_args *txn42_args;
	__txn_xa_regop_args *prep_args;
	u_int32_t rectype;
	u_int idx;
	int ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	/* Everything the cleanup code inspects starts out empty. */
	txninfo = NULL;
	logc = NULL;
	prep_args = NULL;
	txn42_args = NULL;
	txn_args = NULL;

	ENV_ENTER(env, ip);

	memset(&lc, 0, sizeof(lc));
	memset(&data_dbt, 0, sizeof(data_dbt));
	if (F_ISSET(env, ENV_THREAD))
		F_SET(&data_dbt, DB_DBT_REALLOC);

	/*
	 * Decode the record: it is either a prepare record or a regop record
	 * in the current or the 4.2 layout.  In each case pull out the
	 * previous LSN and the lock list.
	 */
	LOGCOPY_32(env, &rectype, rec->data);
	if (rectype != DB___txn_regop) {
		DB_ASSERT(env, rectype == DB___txn_xa_regop);
		ret = __txn_xa_regop_read(env, rec->data, &prep_args);
		if (ret != 0)
			return (ret);
		prev_lsn = prep_args->prev_lsn;
		lock_dbt = &prep_args->locks;
	} else if (rep->version >= DB_REPVERSION_44) {
		ret = __txn_regop_read(env, rec->data, &txn_args);
		if (ret != 0)
			return (ret);
		if (txn_args->opcode != TXN_COMMIT) {
			__os_free(env, txn_args);
			return (0);
		}
		prev_lsn = txn_args->prev_lsn;
		lock_dbt = &txn_args->locks;
	} else {
		ret = __txn_regop_42_read(env, rec->data, &txn42_args);
		if (ret != 0)
			return (ret);
		if (txn42_args->opcode != TXN_COMMIT) {
			__os_free(env, txn42_args);
			return (0);
		}
		prev_lsn = txn42_args->prev_lsn;
		lock_dbt = &txn42_args->locks;
	}

	/* Get a locker id and take the write locks named in the record. */
	ret = __lock_id(env, NULL, &locker);
	if (ret != 0)
		goto err1;

	ret = __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt);
	if (ret != 0)
		goto err;

	/* Gather the transaction's LSNs and put them in log order. */
	ret = __rep_collect_txn(env, &prev_lsn, &lc);
	if (ret != 0)
		goto err;
	qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);

	ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo);
	if (ret != 0)
		goto err;

	ret = __log_cursor(env, &logc);
	if (ret != 0)
		goto err;

	/* Read and apply each collected record in turn. */
	for (idx = 0; idx < lc.nlsns; idx++) {
		lsnp = &lc.array[idx];
		ret = __logc_get(logc, lsnp, &data_dbt, DB_SET);
		if (ret != 0) {
			__db_errx(env, "failed to read the log at [%lu][%lu]",
			    (u_long)lsnp->file, (u_long)lsnp->offset);
			goto err;
		}
		ret = __db_dispatch(env, &env->recover_dtab,
		    &data_dbt, lsnp, DB_TXN_APPLY, txninfo);
		if (ret != 0) {
			__db_errx(env, "transaction failed at [%lu][%lu]",
			    (u_long)lsnp->file, (u_long)lsnp->offset);
			goto err;
		}
	}

	/* Release all locks and the locker id; keep the first error. */
err:	memset(&req, 0, sizeof(req));
	req.op = DB_LOCK_PUT_ALL;
	t_ret = __lock_vec(env, locker, 0, &req, 1, &lvp);
	if (t_ret != 0 && ret == 0)
		ret = t_ret;

	t_ret = __lock_id_free(env, locker);
	if (t_ret != 0 && ret == 0)
		ret = t_ret;

	/* Free whatever was allocated along the way. */
err1:	if (txn_args != NULL)
		__os_free(env, txn_args);
	if (txn42_args != NULL)
		__os_free(env, txn42_args);
	if (prep_args != NULL)
		__os_free(env, prep_args);
	if (lc.array != NULL)
		__os_free(env, lc.array);

	if (logc != NULL) {
		t_ret = __logc_close(logc);
		if (t_ret != 0 && ret == 0)
			ret = t_ret;
	}

	if (txninfo != NULL)
		__db_txnlist_end(env, txninfo);

	if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
		__os_ufree(env, data_dbt.data);

#ifdef HAVE_STATISTICS
	if (ret == 0)
		rep->stat.st_txns_applied++;
#endif

	return (ret);
}
/*
 * __rep_collect_txn --
 *	Walk a transaction's log records backwards starting at *lsnp and
 *	append each record's LSN to the collection lc.  A __txn_child record
 *	is not appended itself; instead the child's chain (c_lsn) is collected
 *	recursively and the walk continues at the record's prev_lsn.
 *
 *	*lsnp is updated as the walk proceeds.  The collection array grows by
 *	doubling, starting at 20 entries.
 */
static int
__rep_collect_txn(env, lsnp, lc)
	ENV *env;
	DB_LSN *lsnp;
	LSN_COLLECTION *lc;
{
	__txn_child_args *child;
	DB_LOGC *cursor;
	DB_LSN child_lsn;
	DBT logrec;
	u_int32_t rectype;
	u_int newsize;
	int ret, t_ret;

	memset(&logrec, 0, sizeof(logrec));
	F_SET(&logrec, DB_DBT_REALLOC);

	ret = __log_cursor(env, &cursor);
	if (ret != 0)
		return (ret);

	for (;;) {
		/* A zero LSN marks the start of the chain. */
		if (IS_ZERO_LSN(*lsnp))
			break;
		ret = __logc_get(cursor, lsnp, &logrec, DB_SET);
		if (ret != 0)
			break;

		LOGCOPY_32(env, &rectype, logrec.data);
		if (rectype != DB___txn_child) {
			/* Make room for one more LSN if necessary. */
			if (lc->nalloc < lc->nlsns + 1) {
				if (lc->nalloc == 0)
					newsize = 20;
				else
					newsize = lc->nalloc * 2;
				ret = __os_realloc(env,
				    newsize * sizeof(DB_LSN), &lc->array);
				if (ret != 0)
					goto err;
				lc->nalloc = newsize;
			}
			lc->array[lc->nlsns++] = *lsnp;

			/*
			 * The next LSN to visit is read from the record at an
			 * offset of two 32-bit words.
			 */
			LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)logrec.data +
			    sizeof(u_int32_t) + sizeof(u_int32_t));
			continue;
		}

		/* Child record: collect the child's chain, then move on. */
		ret = __txn_child_read(env, logrec.data, &child);
		if (ret != 0)
			goto err;
		child_lsn = child->c_lsn;
		*lsnp = child->prev_lsn;
		__os_free(env, child);

		ret = __rep_collect_txn(env, &child_lsn, lc);
		if (ret != 0)
			goto err;
	}

	/* Only a failed log read leaves the loop with an error set. */
	if (ret != 0)
		__db_errx(env, "collect failed at: [%lu][%lu]",
		    (u_long)lsnp->file, (u_long)lsnp->offset);

err:	t_ret = __logc_close(cursor);
	if (t_ret != 0 && ret == 0)
		ret = t_ret;
	if (logrec.data != NULL)
		__os_ufree(env, logrec.data);
	return (ret);
}
static int
__rep_lsn_cmp(lsn1, lsn2)
const void *lsn1, *lsn2;
{
return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2));
}
/*
 * __rep_newfile --
 *	Handle a NEWFILE message: if the file following rp->lsn.file is beyond
 *	the file of lp->ready_lsn, switch to a new log file and advance
 *	ready_lsn to the LSN returned by __log_newfile.
 *
 *	Nothing is done (and 0 is returned) when REP_F_NEWFILE is already set
 *	or when the message refers to a file we already have.
 *
 *	rep->mtx_clientdb is released around the log-file switch and
 *	re-acquired afterwards, so the caller is presumably expected to hold
 *	it on entry -- confirm against callers.
 */
static int
__rep_newfile(env, rp, rec)
	ENV *env;
	__rep_control_args *rp;
	DBT *rec;
{
	DB_LOG *dblp;
	DB_LSN new_lsn;
	LOG *lp;
	REP *rep;
	__rep_newfile_args nf_args;
	int ret;

	rep = env->rep_handle->region;
	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;

	/* Already switching files, or the message is for an old file. */
	if (F_ISSET(rep, REP_F_NEWFILE) ||
	    rp->lsn.file + 1 <= lp->ready_lsn.file)
		return (0);

	/*
	 * Work out the log version for the new file: from the control
	 * message when no record is supplied, otherwise from the record in
	 * the raw (pre-4.7) or marshalled format.
	 */
	if (rec == NULL || rec->size == 0) {
		RPRINT(env, DB_VERB_REP_MISC, (env,
"rep_newfile: Old-style NEWFILE msg.  Use control msg log version: %lu",
		    (u_long) rp->log_version));
		nf_args.version = rp->log_version;
	} else if (rp->rep_version < DB_REPVERSION_47) {
		nf_args.version = *(u_int32_t *)rec->data;
	} else {
		ret = __rep_newfile_unmarshal(env, &nf_args,
		    rec->data, rec->size, NULL);
		if (ret != 0)
			return (ret);
	}
	RPRINT(env, DB_VERB_REP_MISC,
	    (env, "rep_newfile: File %lu vers %lu",
	    (u_long)rp->lsn.file + 1, (u_long)nf_args.version));

	/* Flag the switch so concurrent callers back off. */
	REP_SYSTEM_LOCK(env);
	F_SET(rep, REP_F_NEWFILE);
	REP_SYSTEM_UNLOCK(env);

	/* Create the new log file without holding mtx_clientdb. */
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	LOG_SYSTEM_LOCK(env);
	ret = __log_newfile(dblp, &new_lsn, 0, nf_args.version);
	LOG_SYSTEM_UNLOCK(env);
	MUTEX_LOCK(env, rep->mtx_clientdb);

	REP_SYSTEM_LOCK(env);
	F_CLR(rep, REP_F_NEWFILE);
	REP_SYSTEM_UNLOCK(env);

	if (ret == 0)
		lp->ready_lsn = new_lsn;
	return (ret);
}
static int
__rep_do_ckp(env, rec, rp)
ENV *env;
DBT *rec;
__rep_control_args *rp;
{
DB_ENV *dbenv;
__txn_ckp_args *ckp_args;
DB_LSN ckp_lsn;
REP *rep;
int ret;
dbenv = env->dbenv;
if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0)
return (ret);
ckp_lsn = ckp_args->ckp_lsn;
__os_free(env, ckp_args);
rep = env->rep_handle->region;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
DB_TEST_WAIT(env, env->test_check);
(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1);
MUTEX_LOCK(env, rep->mtx_ckp);
ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn);
MUTEX_UNLOCK(env, rep->mtx_ckp);
(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0);
if (ret == 0)
ret = __txn_updateckp(env, &rp->lsn);
else {
__db_errx(env, "Error syncing ckp [%lu][%lu]",
(u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
ret = __env_panic(env, ret);
}
MUTEX_LOCK(env, rep->mtx_clientdb);
return (ret);
}
/*
 * __rep_remfirst --
 *	Fetch the first key/data pair from the temporary database
 *	(db_rep->rep_db) into cntrl and rec, then delete it.
 *
 *	Both DBTs are marked DB_DBT_REALLOC before the fetch, so the caller
 *	is responsible for the buffers they end up pointing at.  The first
 *	error encountered (get, delete or cursor close) is returned.
 */
static int
__rep_remfirst(env, ip, cntrl, rec)
	ENV *env;
	DB_THREAD_INFO *ip;
	DBT *cntrl;
	DBT *rec;
{
	DBC *cursor;
	int close_ret, ret;

	ret = __db_cursor(env->rep_handle->rep_db, ip, NULL, &cursor, 0);
	if (ret != 0)
		return (ret);

	/* Let the access method (re)allocate the return buffers. */
	F_SET(cntrl, DB_DBT_REALLOC);
	F_SET(rec, DB_DBT_REALLOC);

	ret = __dbc_get(cursor, cntrl, rec, DB_RMW | DB_FIRST);
	if (ret == 0)
		ret = __dbc_del(cursor, 0);

	/* Always close the cursor; report its error only if none so far. */
	close_ret = __dbc_close(cursor);
	if (ret == 0)
		ret = close_ret;

	return (ret);
}
/*
 * __rep_getnext --
 *	Look at the first entry of the temporary database (db_rep->rep_db)
 *	and set lp->waiting_lsn to its LSN.  If the database is empty,
 *	waiting_lsn is zeroed and DB_NOTFOUND is returned.
 *
 *	Only the key is of interest: the data DBT is a zero-length partial
 *	get.
 */
static int
__rep_getnext(env, ip)
	ENV *env;
	DB_THREAD_INFO *ip;
{
	DBC *cursor;
	DBT key, data;
	LOG *lp;
	__rep_control_args *first;
	int close_ret, ret;

	lp = env->lg_handle->reginfo.primary;

	ret = __db_cursor(env->rep_handle->rep_db, ip, NULL, &cursor, 0);
	if (ret != 0)
		return (ret);

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));
	/* Ask for no data bytes at all. */
	F_SET(&data, DB_DBT_PARTIAL);
	data.dlen = 0;
	data.ulen = 0;

	ret = __dbc_get(cursor, &key, &data, DB_FIRST);
	if (ret == 0) {
		/* The key is the control structure of the queued record. */
		first = (__rep_control_args *)key.data;
		lp->waiting_lsn = first->lsn;
	} else if (ret == DB_NOTFOUND) {
		ZERO_LSN(lp->waiting_lsn);
	}

	/* Close the cursor; its error matters only if the get succeeded. */
	close_ret = __dbc_close(cursor);
	if (close_ret != 0 && ret == 0)
		ret = close_ret;
	return (ret);
}
/*
 * __rep_process_rec --
 *	Apply one replicated record, described by the control structure rp
 * with payload rec.
 *
 * A REP_NEWFILE message is handed to __rep_newfile and the function then
 * returns 0.  For anything else, the first 32-bit field of rec->data is
 * read as the log record type.  The record is written to the local log via
 * __log_rep_put (checkpoint records are written later, in their own case,
 * unless REP_F_RECOVER_LOG is set) and then handled according to its type.
 *
 * Outputs:
 *	*ret_lsnp is set to rp->lsn when REP_F_RECOVER_LOG is set, when a
 *	checkpoint record's key already exists in db_rep->rep_db, or when
 *	processing succeeded and rp carries REPCTL_PERM.
 *	*ret_tsp is set to the message timestamp (rp->msg_sec/msg_nsec) when
 *	leases are in use and rp carries REPCTL_LEASE.
 *
 * Returns 0 on success, DB_REP_NOTPERM when the checkpoint key was already
 * present in rep_db, or the error produced by a failing helper.  Failures
 * while applying a transaction are routed through __env_panic.
 */
static int
__rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
ENV *env;
DB_THREAD_INFO *ip;
__rep_control_args *rp;
DBT *rec;
db_timespec *ret_tsp;
DB_LSN *ret_lsnp;
{
DB *dbp;
DBT control_dbt, key_dbt, rec_dbt;
DB_REP *db_rep;
REP *rep;
db_timespec msg_time;
u_int32_t rectype, txnid;
int ret, t_ret;
db_rep = env->rep_handle;
rep = db_rep->region;
dbp = db_rep->rep_db;
ret = 0;
/*
 * REP_NEWFILE is a message type, not a log record type, so it is handled
 * before the record type is extracted from the payload.
 *
 * NOTE(review): the result of __rep_newfile is stored in ret but never
 * used; 0 is returned unconditionally.  Confirm that ignoring a failure
 * here is intentional.
 */
if (rp->rectype == REP_NEWFILE) {
ret = __rep_newfile(env, rp, rec);
return (0);
}
/* Extract the 32-bit log record type from the start of the payload. */
LOGCOPY_32(env, &rectype, rec->data);
/*
 * Zero these DBTs so the cleanup at "out" can tell whether
 * __rep_remfirst stored anything in them.
 */
memset(&control_dbt, 0, sizeof(control_dbt));
memset(&rec_dbt, 0, sizeof(rec_dbt));
timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
/*
 * Every record except a checkpoint is written to the log immediately.
 * When REP_F_RECOVER_LOG is set, checkpoints are written here as well and
 * no per-type processing is done: the LSN is reported and we jump to the
 * common exit path.
 */
if (rectype != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0)
return (ret);
STAT(rep->stat.st_log_records++);
if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
*ret_lsnp = rp->lsn;
goto out;
}
}
switch (rectype) {
case DB___dbreg_register:
/*
 * The transaction id is read from the second 32-bit field of the
 * record.  Only records whose id is TXN_INVALID are dispatched here.
 */
LOGCOPY_32(env, &txnid,
(u_int8_t *)rec->data + sizeof(u_int32_t));
if (txnid == TXN_INVALID)
ret = __db_dispatch(env, &env->recover_dtab,
rec, &rp->lsn, DB_TXN_APPLY, NULL);
break;
case DB___txn_regop:
/*
 * Retry for as long as applying the transaction reports
 * DB_LOCK_DEADLOCK.  DBREP_OPENFILES is set after the first
 * __txn_openfiles attempt regardless of its result, so the open is
 * attempted at most once.
 */
do {
ret = 0;
if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
ret = __txn_openfiles(env, ip, NULL, 1);
F_SET(db_rep, DBREP_OPENFILES);
}
if (ret == 0)
ret = __rep_process_txn(env, rec);
} while (ret == DB_LOCK_DEADLOCK);
/* Flush the log unless the environment is configured DB_ENV_TXN_NOSYNC. */
if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
ret = __log_flush(env, NULL);
/* Any failure on this path is reported and escalated via __env_panic. */
if (ret != 0) {
__db_errx(env, "Error processing txn [%lu][%lu]",
(u_long)rp->lsn.file, (u_long)rp->lsn.offset);
ret = __env_panic(env, ret);
}
break;
case DB___txn_xa_regop:
/*
 * Flush the log, then record this LSN as rep->max_prep_lsn.  The LSN
 * is recorded whether or not the flush succeeded.
 */
ret = __log_flush(env, NULL);
rep->max_prep_lsn = rp->lsn;
RPRINT(env, DB_VERB_REP_MSGS,
(env, "process_rec: prepare at [%lu][%lu]",
(u_long)rep->max_prep_lsn.file,
(u_long)rep->max_prep_lsn.offset));
break;
case DB___txn_ckp:
/*
 * Insert the checkpoint record into rep_db, keyed by the control
 * structure.  With DB_NOOVERWRITE an existing key yields DB_KEYEXIST,
 * which is reported to the caller as DB_REP_NOTPERM along with the
 * record's LSN.
 */
memset(&key_dbt, 0, sizeof(key_dbt));
key_dbt.data = rp;
key_dbt.size = sizeof(*rp);
ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
if (ret == DB_KEYEXIST) {
if (ret_lsnp != NULL)
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
if (ret != 0)
break;
/*
 * Perform the checkpoint and, if it succeeds, write the record to the
 * log flagged DB_LOG_CHKPNT.  __rep_remfirst runs whether or not the
 * checkpoint succeeded and deletes the first rep_db entry --
 * presumably the one inserted above; confirm against the database's
 * key ordering.  Its error is kept only if nothing failed earlier.
 */
if ((ret = __rep_do_ckp(env, rec, rp)) == 0)
ret = __log_rep_put(env, &rp->lsn, rec,
DB_LOG_CHKPNT);
if ((t_ret = __rep_remfirst(env, ip,
&control_dbt, &rec_dbt)) != 0 && ret == 0)
ret = t_ret;
/* Flush the log once the checkpoint record has been written. */
if (ret == 0)
ret = __log_flush(env, NULL);
break;
default:
break;
}
out:
/*
 * NOTE(review): ret_lsnp and ret_tsp are dereferenced here (and in the
 * REP_F_RECOVER_LOG branch above) without a NULL check, while the
 * checkpoint branch does check ret_lsnp.  Confirm callers always pass
 * valid pointers.
 */
if (ret == 0 && F_ISSET(rp, REPCTL_PERM))
*ret_lsnp = rp->lsn;
if (IS_USING_LEASES(env) &&
F_ISSET(rp, REPCTL_LEASE))
*ret_tsp = msg_time;
/* Flush the log when the record carries REPCTL_FLUSH. */
if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH))
ret = __log_flush(env, NULL);
/* Release any buffers that __rep_remfirst left in the scratch DBTs. */
if (control_dbt.data != NULL)
__os_ufree(env, control_dbt.data);
if (rec_dbt.data != NULL)
__os_ufree(env, rec_dbt.data);
return (ret);
}
/*
 * __rep_resend_req --
 *	Re-issue whichever request matches the replication state found in a
 * snapshot of rep->flags.
 *
 *	REP_F_DELAY		nothing is sent.
 *	REP_F_RECOVER_VERIFY	REP_VERIFY_REQ for lp->verify_lsn is sent to
 *				the master, unless that LSN is zero.
 *	REP_F_RECOVER_UPDATE	REP_UPDATE_REQ is sent to the master.
 *	REP_F_RECOVER_PAGE	__rep_pggap_req, under the rep system lock.
 *	otherwise		__rep_loggap_req, under mtx_clientdb.
 *
 * A non-zero rereq passes REP_GAP_REREQUEST to the two gap-request helpers.
 *
 * Returns 0, or the result of the gap-request helper that was called.
 * Results of __rep_send_message are ignored.
 */
int
__rep_resend_req(env, rereq)
	ENV *env;
	int rereq;
{
	DB_LSN verify_lsn;
	LOG *lp;
	REP *rep;
	int ret;
	u_int32_t gapflags, state;

	rep = env->rep_handle->region;
	lp = env->lg_handle->reginfo.primary;

	/* Work from one snapshot of the flags for the whole decision. */
	state = rep->flags;
	if (FLD_ISSET(state, REP_F_DELAY))
		return (0);

	gapflags = rereq ? REP_GAP_REREQUEST : 0;

	if (FLD_ISSET(state, REP_F_RECOVER_VERIFY)) {
		/* Copy the LSN under the mutex; send after releasing it. */
		MUTEX_LOCK(env, rep->mtx_clientdb);
		verify_lsn = lp->verify_lsn;
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		if (!IS_ZERO_LSN(verify_lsn))
			(void)__rep_send_message(env, rep->master_id,
			    REP_VERIFY_REQ, &verify_lsn, NULL, 0,
			    DB_REP_REREQUEST);
		return (0);
	}

	if (FLD_ISSET(state, REP_F_RECOVER_UPDATE)) {
		(void)__rep_send_message(env, rep->master_id,
		    REP_UPDATE_REQ, NULL, NULL, 0, 0);
		return (0);
	}

	if (FLD_ISSET(state, REP_F_RECOVER_PAGE)) {
		REP_SYSTEM_LOCK(env);
		ret = __rep_pggap_req(env, rep, NULL, gapflags);
		REP_SYSTEM_UNLOCK(env);
		return (ret);
	}

	MUTEX_LOCK(env, rep->mtx_clientdb);
	ret = __rep_loggap_req(env, rep, NULL, gapflags);
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	return (ret);
}
/*
 * __rep_check_doreq --
 *	Decide whether enough time has passed to issue a request.
 *
 * The time elapsed since lp->rcvd_ts is compared with lp->wait_ts.  When
 * the wait has been met or exceeded, wait_ts is doubled (but never allowed
 * above rep->max_gap) and rcvd_ts is reset to the current time.
 *
 * Returns the result of the comparison: non-zero if a request should be
 * made, 0 otherwise.
 */
int
__rep_check_doreq(env, rep)
	ENV *env;
	REP *rep;
{
	LOG *lp;
	db_timespec elapsed;
	int due;

	lp = env->lg_handle->reginfo.primary;

	/* elapsed = current time - rcvd_ts. */
	__os_gettime(env, &elapsed, 1);
	timespecsub(&elapsed, &lp->rcvd_ts);

	due = timespeccmp(&elapsed, &lp->wait_ts, >=);
	if (!due)
		return (due);

	/* Adding wait_ts to itself doubles it; then clamp to max_gap. */
	timespecadd(&lp->wait_ts, &lp->wait_ts);
	if (timespeccmp(&lp->wait_ts, &rep->max_gap, >))
		lp->wait_ts = rep->max_gap;

	/* Restart the interval from now. */
	__os_gettime(env, &lp->rcvd_ts, 1);
	return (due);
}
/*
 * __rep_skip_msg --
 *	Decide what, if anything, to send when a message from eid is being
 * skipped.
 *
 * A request is warranted immediately when this site is a client and the
 * skipped message is itself a request type (REP_MSG_REQ); otherwise
 * __rep_check_doreq decides, under mtx_clientdb.  Nothing is ever sent in
 * response to a REP_MASTER_REQ.
 *
 * When a request is warranted:
 *	- no known master: broadcast REP_MASTER_REQ;
 *	- the sender is the master: call __rep_resend_req;
 *	- otherwise, if this site is a client: send REP_REREQUEST to the
 *	  sender.
 *
 * Returns the result of __rep_resend_req when that path is taken, else 0.
 */
static int
__rep_skip_msg(env, rep, eid, rectype)
	ENV *env;
	REP *rep;
	int eid;
	u_int32_t rectype;
{
	int want_req;

	if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype))
		want_req = 1;
	else {
		MUTEX_LOCK(env, rep->mtx_clientdb);
		want_req = __rep_check_doreq(env, rep);
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
	}

	if (!want_req || rectype == REP_MASTER_REQ)
		return (0);

	if (rep->master_id == DB_EID_INVALID) {
		(void)__rep_send_message(env,
		    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
		return (0);
	}

	if (eid == rep->master_id)
		return (__rep_resend_req(env, 0));

	if (F_ISSET(rep, REP_F_CLIENT))
		(void)__rep_send_message(env,
		    eid, REP_REREQUEST, NULL, NULL, 0, 0);
	return (0);
}
/*
 * __rep_fire_newmaster --
 *	Fire DB_EVENT_REP_NEWMASTER for generation gen, passing the address
 * of master as the event argument.
 *
 * The event is fired only when gen is greater than
 * rep->newmaster_event_gen, which is then advanced to gen, so a repeated
 * call with the same generation fires nothing.  The check and update are
 * done between REP_EVENT_LOCK and REP_EVENT_UNLOCK.
 *
 * Always returns 0.
 */
static int
__rep_fire_newmaster(env, gen, master)
	ENV *env;
	u_int32_t gen;
	int master;
{
	REP *rep;

	rep = env->rep_handle->region;

	REP_EVENT_LOCK(env);
	if (gen > rep->newmaster_event_gen) {
		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
		rep->newmaster_event_gen = gen;
	}
	REP_EVENT_UNLOCK(env);

	return (0);
}
/*
 * __rep_fire_startupdone --
 *	Fire DB_EVENT_REP_STARTUPDONE for generation gen.
 *
 * If DB_EVENT_REP_NEWMASTER has not yet been fired for this generation
 * (rep->newmaster_event_gen is below gen), it is fired first and the
 * recorded generation is advanced.  STARTUPDONE is then fired only when
 * the recorded generation equals gen; nothing is fired when the recorded
 * generation is already greater than gen.  All of this happens between
 * REP_EVENT_LOCK and REP_EVENT_UNLOCK.
 *
 * Always returns 0.
 */
static int
__rep_fire_startupdone(env, gen, master)
	ENV *env;
	u_int32_t gen;
	int master;
{
	REP *rep;

	rep = env->rep_handle->region;

	REP_EVENT_LOCK(env);

	/* Make sure NEWMASTER precedes STARTUPDONE for this generation. */
	if (gen > rep->newmaster_event_gen) {
		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
		rep->newmaster_event_gen = gen;
	}

	if (gen == rep->newmaster_event_gen)
		__rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL);

	REP_EVENT_UNLOCK(env);

	return (0);
}