#include "db_config.h"
#include "db_int.h"
#include "dbinc/log.h"
/* Forward declaration of the file-local helper defined later in this file. */
static int __rep_chk_newfile __P((DB_ENV *, DB_LOGC *, REP *,
REP_CONTROL *, int));
/*
 * __rep_allreq --
 *	Send log records to the site identified by eid.  Reading starts at
 *	the LSN carried in the control message (or at the first record in
 *	the log when that LSN is zero or the initial LSN) and continues
 *	until the end of the log is reached, the throttle state switches to
 *	REP_LOG_MORE, or an error occurs.
 *
 *	Records are sent through the bulk buffer when REP_C_BULK is
 *	configured, otherwise (or when a record overflows the bulk buffer)
 *	one at a time through __rep_send_throttle.
 *
 *	Returns 0 on success or a non-zero error.  DB_NOTFOUND and
 *	DB_REP_UNAVAIL coming out of the send loop are mapped to 0.
 */
int
__rep_allreq(dbenv, rp, eid)
	DB_ENV *dbenv;
	REP_CONTROL *rp;
	int eid;
{
	DB_LOGC *logc;
	DB_LSN log_end, oldfilelsn;
	DB_REP *db_rep;
	DBT data_dbt, newfiledbt;
	REP *rep;
	REP_BULK bulk;
	REP_THROTTLE repth;
	uintptr_t bulkoff;
	u_int32_t bulkflags, end_flag, flags, use_bulk, version;
	int bulk_alloced, ret, t_ret;

	ret = 0;
	bulk_alloced = 0;
	/* Read at the err label, so it must be defined on every path. */
	end_flag = 0;
	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	if ((ret = __log_cursor(dbenv, &logc)) != 0)
		return (ret);
	memset(&data_dbt, 0, sizeof(data_dbt));

	/*
	 * Sample the bulk configuration once into a local so the rest of
	 * the function uses a single consistent value.  Remember whether
	 * the allocation actually succeeded: only then may the buffer be
	 * handed to __rep_bulk_free.
	 */
	use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
	if (use_bulk) {
		if ((ret = __rep_bulk_alloc(dbenv, &bulk, eid,
		    &bulkoff, &bulkflags, REP_BULK_LOG)) != 0)
			goto err;
		bulk_alloced = 1;
	}

	/* Throttle state is needed even in bulk mode (overflow records). */
	memset(&repth, 0, sizeof(repth));
	REP_SYSTEM_LOCK(dbenv);
	repth.gbytes = rep->gbytes;
	repth.bytes = rep->bytes;
	oldfilelsn = repth.lsn = rp->lsn;
	repth.type = REP_LOG;
	repth.data_dbt = &data_dbt;
	REP_SYSTEM_UNLOCK(dbenv);

	/*
	 * Fetch the LSN of the last record so the loop below can recognise
	 * the end of the log and set REPCTL_LOG_END.
	 */
	if ((ret = __logc_get(logc, &log_end, &data_dbt, DB_LAST)) != 0) {
		/* On a master, a missing last record is not an error. */
		if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER))
			ret = 0;
		goto err;
	}

	/* A zero or initial LSN means "start from the first record". */
	flags = IS_ZERO_LSN(rp->lsn) ||
	    IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET;
	ret = __logc_get(logc, &repth.lsn, &data_dbt, flags);

	/*
	 * The requester asked for the beginning of the log but the first
	 * record available is not in log file 1: report REP_VERIFY_FAIL.
	 */
	if (ret == 0 && repth.lsn.file != 1 && flags == DB_FIRST) {
		(void)__rep_send_message(dbenv, eid,
		    REP_VERIFY_FAIL, &repth.lsn, NULL, 0, 0);
		goto err;
	}

	/*
	 * The requested LSN was not found; check whether it lies at a log
	 * file boundary and, if so, continue from the cursor's position.
	 */
	if (ret == DB_NOTFOUND) {
		ret = __rep_chk_newfile(dbenv, logc, rep, rp, eid);
		if (ret == 0)
			ret = __logc_get(logc, &repth.lsn,
			    &data_dbt, DB_CURRENT);
		if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) {
			ret = 0;
			goto err;
		}
		if (ret != 0)
			goto err;
	}

	/* Send records until end of log, throttling, or an error. */
	for (end_flag = 0;
	    ret == 0 && repth.type != REP_LOG_MORE && end_flag == 0;
	    ret = __logc_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) {
		/* Crossing into a new log file: announce it with its version. */
		if (repth.lsn.file != oldfilelsn.file) {
			if ((ret = __logc_version(logc, &version)) != 0)
				break;
			memset(&newfiledbt, 0, sizeof(newfiledbt));
			newfiledbt.data = &version;
			newfiledbt.size = sizeof(version);
			(void)__rep_send_message(dbenv,
			    eid, REP_NEWFILE, &oldfilelsn, &newfiledbt, 0, 0);
		}

		/*
		 * Flag the last record, but only when this site is a master
		 * or has completed startup.
		 */
		end_flag = (LOG_COMPARE(&repth.lsn, &log_end) >= 0 &&
		    (F_ISSET(rep, REP_F_MASTER) ||
		    rep->stat.st_startup_complete)) ?
		    REPCTL_LOG_END : 0;

		if (use_bulk)
			ret = __rep_bulk_message(dbenv, &bulk, &repth,
			    &repth.lsn, &data_dbt, (REPCTL_RESEND | end_flag));
		/* Not bulk, or the record overflowed the bulk buffer. */
		if (!use_bulk || ret == DB_REP_BULKOVF)
			ret = __rep_send_throttle(dbenv,
			    eid, &repth, 0, end_flag);
		if (ret != 0)
			break;

		/* Track the LSN just past the record that was sent. */
		oldfilelsn = repth.lsn;
		oldfilelsn.offset += logc->len;
	}

	/* Running off the end of the log or losing the peer is not fatal. */
	if (ret == DB_NOTFOUND || ret == DB_REP_UNAVAIL)
		ret = 0;

err:
	/*
	 * Hand the bulk buffer back on every path once it has been
	 * allocated.  Previously the early exits above jumped past this
	 * call and never released the buffer.  On those early exits nothing
	 * has been added to the buffer and end_flag is still 0.
	 */
	if (bulk_alloced && (t_ret = __rep_bulk_free(dbenv, &bulk,
	    (REPCTL_RESEND | end_flag))) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
/*
 * __rep_log --
 *	Hand an incoming log record to __rep_apply and act on the result.
 *	When the control message is of type REP_LOG_MORE, follow up with a
 *	forced gap request for the next records.
 *
 *	Returns the result of __rep_apply / __rep_logready, or of the
 *	follow-up __rep_loggap_req when one is issued.
 */
int
__rep_log(dbenv, rp, rec, savetime, ret_lsnp)
	DB_ENV *dbenv;
	REP_CONTROL *rp;
	DBT *rec;
	time_t savetime;
	DB_LSN *ret_lsnp;
{
	DB_LOG *dblp;
	DB_LSN last_lsn, req_lsn;
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	int cur_master, is_dup, ret;

	is_dup = 0;
	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;

	ret = __rep_apply(dbenv, rp, rec, ret_lsnp, &is_dup, &last_lsn);
	if (ret == DB_REP_LOGREADY) {
		/* Finish log recovery; any failure ends processing here. */
		if ((ret =
		    __rep_logready(dbenv, rep, savetime, &last_lsn)) != 0)
			return (ret);
	} else if (ret == DB_REP_ISPERM ||
	    ret == DB_REP_NOTPERM || ret == 0) {
		/* A duplicate record needs no further handling. */
		if (is_dup)
			return (ret);
	} else
		return (ret);

	/* Only a REP_LOG_MORE message triggers a follow-up request. */
	if (rp->rectype != REP_LOG_MORE)
		return (ret);

	/* The master id is sampled before taking the client mutex. */
	cur_master = rep->master_id;
	MUTEX_LOCK(dbenv, rep->mtx_clientdb);

	/* Ask from whichever is later: ready_lsn or this message's LSN. */
	req_lsn = lp->ready_lsn;
	if (LOG_COMPARE(&rp->lsn, &req_lsn) > 0)
		req_lsn = rp->lsn;

	/* Without a known master there is nobody to ask. */
	if (cur_master == DB_EID_INVALID) {
		MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
		return (0);
	}

	/* Not currently waiting on a gap: reset the wait counter. */
	if (IS_ZERO_LSN(lp->waiting_lsn))
		lp->wait_recs = rep->max_gap;
	ret = __rep_loggap_req(dbenv, rep, &req_lsn, REP_GAP_FORCE);
	MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
	return (ret);
}
/*
 * __rep_bulk_log --
 *	Process a bulk log message by passing it to __log_rep_split.  If
 *	that reports DB_REP_LOGREADY, complete log recovery through
 *	__rep_logready; any other result is returned unchanged.
 */
int
__rep_bulk_log(dbenv, rp, rec, savetime, ret_lsnp)
	DB_ENV *dbenv;
	REP_CONTROL *rp;
	DBT *rec;
	time_t savetime;
	DB_LSN *ret_lsnp;
{
	DB_LSN final_lsn;
	DB_REP *db_rep;
	REP *rep;
	int ret;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	ret = __log_rep_split(dbenv, rp, rec, ret_lsnp, &final_lsn);
	if (ret == DB_REP_LOGREADY)
		ret = __rep_logready(dbenv, rep, savetime, &final_lsn);
	return (ret);
}
/*
 * __rep_logreq --
 *	Answer a request for log records.  The record at the LSN in the
 *	control message is sent first.  When rec carries a non-empty
 *	payload it is read as a DB_LSN upper bound, and the records that
 *	follow are sent until that bound is reached, the throttle state
 *	leaves REP_LOG, or an error occurs.
 *
 *	Returns 0 on success or a non-zero error; a master that cannot
 *	locate the requested LSN returns EINVAL.
 */
int
__rep_logreq(dbenv, rp, rec, eid)
	DB_ENV *dbenv;
	REP_CONTROL *rp;
	DBT *rec;
	int eid;
{
	DB_LOGC *logc;
	DB_LSN first_lsn, prev_end;
	DB_REP *db_rep;
	DBT data_dbt, newfiledbt;
	REP *rep;
	REP_BULK bulk;
	REP_THROTTLE repth;
	uintptr_t bulkoff;
	u_int32_t bulkflags, use_bulk, version;
	int ret, t_ret;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	if (rec != NULL && rec->size != 0) {
		RPRINT(dbenv, (dbenv,
		    "[%lu][%lu]: LOG_REQ max lsn: [%lu][%lu]",
		    (u_long) rp->lsn.file, (u_long)rp->lsn.offset,
		    (u_long)((DB_LSN *)rec->data)->file,
		    (u_long)((DB_LSN *)rec->data)->offset));
	}

	memset(&data_dbt, 0, sizeof(data_dbt));
	first_lsn = rp->lsn;
	prev_end = first_lsn;

	if ((ret = __log_cursor(dbenv, &logc)) != 0)
		return (ret);

	/* Look up the record that was asked for. */
	ret = __logc_get(logc, &first_lsn, &data_dbt, DB_SET);
	if (ret == DB_NOTFOUND) {
		/* The LSN may sit at a log file boundary. */
		ret = __rep_chk_newfile(dbenv, logc, rep, rp, eid);
		if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) {
			__db_errx(dbenv,
			    "Request for LSN [%lu][%lu] fails",
			    (u_long)rp->lsn.file,
			    (u_long)rp->lsn.offset);
			ret = EINVAL;
		}
	} else if (ret == 0)
		(void)__rep_send_message(dbenv,
		    eid, REP_LOG, &first_lsn, &data_dbt, REPCTL_RESEND, 0);
	if (ret != 0)
		goto err;

	/* Sample the bulk setting once and use the local from here on. */
	use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
	if (use_bulk && (ret = __rep_bulk_alloc(dbenv, &bulk, eid,
	    &bulkoff, &bulkflags, REP_BULK_LOG)) != 0)
		goto err;

	memset(&repth, 0, sizeof(repth));
	REP_SYSTEM_LOCK(dbenv);
	repth.gbytes = rep->gbytes;
	repth.bytes = rep->bytes;
	repth.type = REP_LOG;
	repth.data_dbt = &data_dbt;
	REP_SYSTEM_UNLOCK(dbenv);

	/* Only iterate when the requester supplied an upper-bound LSN. */
	while (ret == 0 && rec != NULL && rec->size != 0 &&
	    repth.type == REP_LOG) {
		ret = __logc_get(logc, &repth.lsn, &data_dbt, DB_NEXT);
		if (ret != 0) {
			/* A master running off the end of its log is fine. */
			if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER))
				ret = 0;
			break;
		}

		/* Stop once the upper bound has been reached. */
		if (LOG_COMPARE(&repth.lsn, (DB_LSN *)rec->data) >= 0)
			break;

		/* Entering a new log file: announce it with its version. */
		if (repth.lsn.file != prev_end.file) {
			if ((ret = __logc_version(logc, &version)) != 0)
				break;
			memset(&newfiledbt, 0, sizeof(newfiledbt));
			newfiledbt.data = &version;
			newfiledbt.size = sizeof(version);
			(void)__rep_send_message(dbenv,
			    eid, REP_NEWFILE, &prev_end, &newfiledbt, 0, 0);
		}

		/*
		 * Try the bulk buffer first when enabled; fall back to a
		 * throttled single send when bulk is off or the record
		 * overflowed the buffer.
		 */
		if (!use_bulk ||
		    (ret = __rep_bulk_message(dbenv, &bulk, &repth,
		    &repth.lsn, &data_dbt, REPCTL_RESEND)) == DB_REP_BULKOVF)
			ret = __rep_send_throttle(dbenv, eid, &repth, 0, 0);
		if (ret != 0) {
			/* An unavailable peer ends the loop without error. */
			if (ret == DB_REP_UNAVAIL)
				ret = 0;
			break;
		}

		/* Remember the LSN just past the record that was sent. */
		prev_end = repth.lsn;
		prev_end.offset += logc->len;
	}

	/* Hand the bulk buffer back, passing the resend flag. */
	if (use_bulk && (t_ret = __rep_bulk_free(dbenv, &bulk,
	    REPCTL_RESEND)) != 0 && ret == 0)
		ret = t_ret;

err:
	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
/*
 * __rep_loggap_req --
 *	Send a request for missing log records (REP_LOG_REQ, or REP_ALL_REQ
 *	when no upper bound is known) to the master.  If no master is
 *	known, broadcast REP_MASTER_REQ instead.  Updates lp->max_wait_lsn
 *	as a side effect.  Always returns 0.
 *
 *	lsnp is dereferenced when REP_GAP_FORCE is set and must then be
 *	non-NULL.
 *
 *	NOTE(review): __rep_log calls this with rep->mtx_clientdb held;
 *	presumably every caller is expected to -- confirm.
 */
int
__rep_loggap_req(dbenv, rep, lsnp, gapflags)
	DB_ENV *dbenv;
	REP *rep;
	DB_LSN *lsnp;
	u_int32_t gapflags;
{
	DB_LOG *dblp;
	DBT max_lsn_dbt, *max_lsn_dbtp;
	DB_LSN next_lsn;
	LOG *lp;
	u_int32_t ctlflags, flags, type;
	int new_range;

	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;

	/* A forced request starts at the caller's LSN, else at ready_lsn. */
	if (FLD_ISSET(gapflags, REP_GAP_FORCE))
		next_lsn = *lsnp;
	else
		next_lsn = lp->ready_lsn;

	ctlflags = 0;
	type = REP_LOG_REQ;

	/*
	 * A request with an upper bound is built when the caller forces or
	 * re-requests, when no max_wait_lsn is recorded yet, or when the
	 * given LSN equals the recorded max_wait_lsn.  Otherwise only the
	 * single record at next_lsn is asked for.
	 */
	new_range =
	    FLD_ISSET(gapflags, (REP_GAP_FORCE | REP_GAP_REREQUEST)) ||
	    IS_ZERO_LSN(lp->max_wait_lsn) ||
	    (lsnp != NULL && LOG_COMPARE(lsnp, &lp->max_wait_lsn) == 0);

	if (!new_range) {
		max_lsn_dbtp = NULL;
		lp->max_wait_lsn = next_lsn;
		flags = DB_REP_REREQUEST;
	} else {
		lp->max_wait_lsn = lp->waiting_lsn;

		/*
		 * On a forced request whose LSN is at or beyond waiting_lsn,
		 * use rep->last_lsn as the bound during log recovery and
		 * otherwise clear the bound.
		 */
		if (FLD_ISSET(gapflags, REP_GAP_FORCE) &&
		    LOG_COMPARE(&lp->max_wait_lsn, lsnp) <= 0) {
			if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
				DB_ASSERT(dbenv, LOG_COMPARE(lsnp,
				    &rep->last_lsn) <= 0);
				lp->max_wait_lsn = rep->last_lsn;
			} else {
				ZERO_LSN(lp->max_wait_lsn);
			}
		}

		/* With no upper bound, ask for everything. */
		if (IS_ZERO_LSN(lp->max_wait_lsn))
			type = REP_ALL_REQ;

		memset(&max_lsn_dbt, 0, sizeof(max_lsn_dbt));
		max_lsn_dbt.data = &lp->max_wait_lsn;
		max_lsn_dbt.size = sizeof(lp->max_wait_lsn);
		max_lsn_dbtp = &max_lsn_dbt;

		flags = FLD_ISSET(gapflags, REP_GAP_REREQUEST) ?
		    DB_REP_REREQUEST : DB_REP_ANYWHERE;
	}

	/* No master known: ask who the master is instead. */
	if (rep->master_id == DB_EID_INVALID) {
		(void)__rep_send_message(dbenv, DB_EID_BROADCAST,
		    REP_MASTER_REQ, NULL, NULL, 0, 0);
		return (0);
	}

	STAT(rep->stat.st_log_requested++);
	if (F_ISSET(rep, REP_F_RECOVER_LOG))
		ctlflags = REPCTL_INIT;
	(void)__rep_send_message(dbenv, rep->master_id,
	    type, &next_lsn, max_lsn_dbtp, ctlflags, flags);
	return (0);
}
/*
 * __rep_logready --
 *	Flush the log and run __rep_verify_match against last_lsnp.  On
 *	success, clear the log-recovery state in the rep region under the
 *	system lock and return 0.  If either step fails, report the failure
 *	and return the result of __db_panic.
 */
int
__rep_logready(dbenv, rep, savetime, last_lsnp)
	DB_ENV *dbenv;
	REP *rep;
	time_t savetime;
	DB_LSN *last_lsnp;
{
	int ret;

	ret = __log_flush(dbenv, NULL);
	if (ret == 0)
		ret = __rep_verify_match(dbenv, last_lsnp, savetime);

	if (ret != 0) {
		__db_errx(dbenv,
	"Client initialization failed. Need to manually restore client");
		return (__db_panic(dbenv, ret));
	}

	/* Recovery is complete: reset the bookkeeping for it. */
	REP_SYSTEM_LOCK(dbenv);
	ZERO_LSN(rep->first_lsn);
	if (rep->originfo != NULL) {
		__os_free(dbenv, rep->originfo);
		rep->originfo = NULL;
	}
	F_CLR(rep, REP_F_RECOVER_LOG);
	REP_SYSTEM_UNLOCK(dbenv);

	return (ret);
}
/*
 * __rep_chk_newfile --
 *	Called when the LSN in the control message could not be found.  If
 *	the log has moved on to a later file than the one requested, find
 *	the last record of the requested file and send a REP_NEWFILE
 *	message carrying the LSN just past that record together with the
 *	log version.
 *
 *	Returns 0 when REP_NEWFILE was sent, and also when a master could
 *	not position the cursor (REP_VERIFY_FAIL is sent in that case).
 *	Returns DB_NOTFOUND when the log has no later file, or when a
 *	non-master cannot position the cursor.  A failure from
 *	__logc_version is returned as is.
 */
static int
__rep_chk_newfile(dbenv, logc, rep, rp, eid)
	DB_ENV *dbenv;
	DB_LOGC *logc;
	REP *rep;
	REP_CONTROL *rp;
	int eid;
{
	DB_LOG *dblp;
	DB_LSN boundary;
	DBT data_dbt, newfiledbt;
	LOG *lp;
	u_int32_t version;
	int ret;

	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;
	memset(&data_dbt, 0, sizeof(data_dbt));

	/* Snapshot the current end-of-log LSN under the log lock. */
	LOG_SYSTEM_LOCK(dbenv);
	boundary = lp->lsn;
	LOG_SYSTEM_UNLOCK(dbenv);

	/* The log has no file beyond the one requested. */
	if (boundary.file <= rp->lsn.file)
		return (DB_NOTFOUND);

	/*
	 * Position at the start of the next file, then step back to the
	 * last record of the requested file.
	 */
	boundary.file = rp->lsn.file + 1;
	boundary.offset = 0;
	ret = __logc_get(logc, &boundary, &data_dbt, DB_SET);
	if (ret == 0)
		ret = __logc_get(logc, &boundary, &data_dbt, DB_PREV);

	if (ret != 0) {
		RPRINT(dbenv, (dbenv,
		    "Unable to get prev of [%lu][%lu]",
		    (u_long)rp->lsn.file,
		    (u_long)rp->lsn.offset));
		if (!F_ISSET(rep, REP_F_MASTER))
			return (DB_NOTFOUND);
		/* A master tells the requester that verification failed. */
		(void)__rep_send_message(dbenv, eid,
		    REP_VERIFY_FAIL, &rp->lsn,
		    NULL, 0, 0);
		return (0);
	}

	/* Advance past the record just read. */
	boundary.offset += logc->len;
	if ((ret = __logc_version(logc, &version)) != 0)
		return (ret);

	memset(&newfiledbt, 0, sizeof(newfiledbt));
	newfiledbt.data = &version;
	newfiledbt.size = sizeof(version);
	(void)__rep_send_message(dbenv, eid,
	    REP_NEWFILE, &boundary,
	    &newfiledbt, 0, 0);
	return (0);
}