#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/fop.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/qam.h"
#include "dbinc/txn.h"
static int __rep_check_uid __P((DB_ENV *, u_int8_t *, u_int8_t *, u_int8_t *));
static int __rep_filedone __P((DB_ENV *, int, REP *, __rep_fileinfo_args *,
u_int32_t));
static int __rep_find_dbs __P((DB_ENV *, u_int8_t **, size_t *,
size_t *, u_int32_t *));
static int __rep_get_fileinfo __P((DB_ENV *, const char *,
const char *, __rep_fileinfo_args *, u_int8_t *, u_int32_t *));
static int __rep_get_file_list __P((DB_ENV *, DB_FH *, DBT *));
static int __rep_log_setup __P((DB_ENV *,
REP *, u_int32_t, u_int32_t, DB_LSN *));
static int __rep_mpf_open __P((DB_ENV *, DB_MPOOLFILE **,
__rep_fileinfo_args *, u_int32_t));
static int __rep_nextfile __P((DB_ENV *, int, REP *));
static int __rep_page_gap __P((DB_ENV *, REP *, __rep_fileinfo_args *,
u_int32_t));
static int __rep_page_sendpages __P((DB_ENV *, int,
__rep_fileinfo_args *, DB_MPOOLFILE *, DB *));
static int __rep_queue_filedone __P((DB_ENV *, REP *, __rep_fileinfo_args *));
static int __rep_remove_all __P((DB_ENV *, DBT *));
static int __rep_remove_file __P((DB_ENV *, u_int8_t *, const char *,
u_int32_t, u_int32_t));
static int __rep_remove_logs __P((DB_ENV *));
static int __rep_remove_by_list __P((DB_ENV *, void *, u_int32_t));
static int __rep_remove_by_prefix __P((DB_ENV *, const char *, const char *,
size_t, APPNAME));
static int __rep_walk_dir __P((DB_ENV *, const char *, u_int8_t **, u_int8_t *,
size_t *, size_t *, u_int32_t *));
static int __rep_write_page __P((DB_ENV *, REP *, __rep_fileinfo_args *));
/*
 * __rep_update_req --
 *	Answer a REP_UPDATE_REQ from site "eid" with a REP_UPDATE message
 *	that lists this site's databases, the first LSN it can supply and
 *	the log version of that LSN.
 *
 *	The first LSN is the one returned by __log_get_stable_lsn, or the
 *	first log record when that reports DB_NOTFOUND; if the log has no
 *	records at all, an initial LSN and DB_LOGVERSION are advertised.
 *	The message is sent best-effort (the send status is ignored);
 *	errors while building it are returned.
 */
int
__rep_update_req(dbenv, eid)
	DB_ENV *dbenv;
	int eid;
{
	DBT updbt, vdbt;
	DB_LOG *dblp;
	DB_LOGC *logc;
	DB_LSN lsn;
	size_t filelen, filesz, updlen;
	u_int32_t filecnt, flag, version;
	u_int8_t *buf, *fp;
	int ret, t_ret;

	dblp = dbenv->lg_handle;
	logc = NULL;
	filecnt = 0;
	filelen = 0;
	updlen = 0;
	filesz = MEGABYTE;
	if ((ret = __os_calloc(dbenv, 1, filesz, &buf)) != 0)
		return (ret);

	/*
	 * File records are appended after room reserved for the update
	 * header.  __rep_find_dbs may grow -- and therefore move -- the
	 * buffer; on return fp points just past the file records, so the
	 * current start of the allocation is recovered from it before buf
	 * is used or freed.
	 */
	fp = buf + sizeof(__rep_update_args);
	ret = __rep_find_dbs(dbenv, &fp, &filesz, &filelen, &filecnt);
	buf = fp - filelen - sizeof(__rep_update_args);
	if (ret != 0)
		goto err;

	flag = DB_SET;
	if ((ret = __log_get_stable_lsn(dbenv, &lsn)) != 0) {
		if (ret != DB_NOTFOUND)
			goto err;
		/* No stable LSN: start from the first log record. */
		ret = 0;
		flag = DB_FIRST;
	}

	/* Position a log cursor on that LSN to learn its log version. */
	if ((ret = __log_cursor(dbenv, &logc)) != 0)
		goto err;
	memset(&vdbt, 0, sizeof(vdbt));
	if ((ret = __logc_get(logc, &lsn, &vdbt, flag)) != 0) {
		if (ret != DB_NOTFOUND)
			goto err;
		/* No log records: advertise an initial LSN, current version. */
		INIT_LSN(lsn);
		version = DB_LOGVERSION;
	} else if ((ret = __logc_version(logc, &version)) != 0)
		goto err;

	/* Fill in the update header in the space reserved at buf. */
	if ((ret = __rep_update_buf(buf, filesz, &updlen,
	    &lsn, version, filecnt)) != 0)
		goto err;

	DB_INIT_DBT(updbt, buf, filelen + updlen);
	LOG_SYSTEM_LOCK(dbenv);
	lsn = ((LOG *)dblp->reginfo.primary)->lsn;
	LOG_SYSTEM_UNLOCK(dbenv);
	(void)__rep_send_message(
	    dbenv, eid, REP_UPDATE, &lsn, &updbt, 0, 0);

err:	__os_free(dbenv, buf);
	if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
/*
 * __rep_find_dbs --
 *	Append a fileinfo record for every database this environment
 *	holds.  On-disk files come first: from the home directory, or from
 *	each configured data directory.  Named in-memory databases follow.
 *
 *	*fp is the current write position in the caller's buffer, *fileszp
 *	the buffer's allocated size, *filelenp the number of record bytes
 *	written so far and *filecntp the number of files found.  All four
 *	are updated.
 */
static int
__rep_find_dbs(dbenv, fp, fileszp, filelenp, filecntp)
	DB_ENV *dbenv;
	u_int8_t **fp;
	size_t *fileszp, *filelenp;
	u_int32_t *filecntp;
{
	int ret;
	char **ddir, *real_dir;
	size_t startlen;
	u_int8_t *origfp;

	ret = 0;
	real_dir = NULL;

	if (dbenv->db_data_dir == NULL) {
		ret = __rep_walk_dir(dbenv, dbenv->db_home, fp, NULL,
		    fileszp, filelenp, filecntp);
	} else {
		startlen = *filelenp;
		for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) {
			if ((ret = __db_appname(dbenv, DB_APP_NONE,
			    *ddir, 0, NULL, &real_dir)) != 0)
				break;
			/*
			 * origfp marks where the data-directory records begin,
			 * so __rep_walk_dir can spot a directory listed twice.
			 * An earlier walk may have reallocated the buffer, so
			 * derive it from the current write position instead
			 * of caching a pointer that could be stale.
			 */
			origfp = *fp - (*filelenp - startlen);
			if ((ret = __rep_walk_dir(dbenv, real_dir, fp, origfp,
			    fileszp, filelenp, filecntp)) != 0)
				break;
			__os_free(dbenv, real_dir);
			real_dir = NULL;
		}
	}

	/* Named in-memory databases are listed after the on-disk files. */
	if (ret == 0)
		ret = __rep_walk_dir(dbenv, NULL,
		    fp, NULL, fileszp, filelenp, filecntp);

	if (real_dir != NULL)
		__os_free(dbenv, real_dir);
	return (ret);
}
/*
 * __rep_walk_dir --
 *	Append a fileinfo record for every database found in one place:
 *	directory "dir" when it is non-NULL, otherwise the named in-memory
 *	databases reported by __memp_inmemlist.  Names beginning with
 *	"__db", "DB_CONFIG" or "log" are skipped, and so is any file that
 *	__rep_get_fileinfo fails on.
 *
 *	Buffer protocol (followed by both callers in this file): the
 *	caller allocates a buffer, reserves sizeof(__rep_update_args)
 *	bytes at its start for the update header, and passes *fp pointing
 *	just past that header with *filelenp == 0.  Records are appended
 *	at *fp, and *fp, *filelenp, *fileszp and *filecntp are updated.
 *	Throughout, *fp == start + sizeof(__rep_update_args) + *filelenp.
 *	That relationship is how the allocation start is found when the
 *	buffer has to grow, and how callers must find it again afterwards.
 *
 *	origfp, when non-NULL, marks where the data-directory records
 *	begin.  If the first usable file here is already listed there,
 *	the rest of this directory is skipped as a duplicate.
 */
static int
__rep_walk_dir(dbenv, dir, fp, origfp, fileszp, filelenp, filecntp)
	DB_ENV *dbenv;
	const char *dir;
	u_int8_t **fp, *origfp;
	size_t *fileszp, *filelenp;
	u_int32_t *filecntp;
{
	DBT namedbt, uiddbt;
	__rep_fileinfo_args tmpfp;
	size_t len, used;
	int cnt, first_file, i, ret;
	u_int8_t *base, *rfp, uid[DB_FILE_ID_LEN];
	char *file, **names, *subdb;

	memset(&namedbt, 0, sizeof(namedbt));
	memset(&uiddbt, 0, sizeof(uiddbt));
	if (dir == NULL) {
		RPRINT(dbenv, (dbenv,
		    "Walk_dir: Getting info for in-memory named files"));
		if ((ret = __memp_inmemlist(dbenv, &names, &cnt)) != 0)
			return (ret);
	} else {
		RPRINT(dbenv, (dbenv,
		    "Walk_dir: Getting info for dir: %s", dir));
		if ((ret = __os_dirlist(dbenv, dir, &names, &cnt)) != 0)
			return (ret);
	}
	rfp = NULL;
	if (fp != NULL)
		rfp = *fp;
	RPRINT(dbenv, (dbenv, "Walk_dir: Dir %s has %d files",
	    (dir == NULL) ? "INMEM" : dir, cnt));
	first_file = 1;
	for (i = 0; i < cnt; i++) {
		RPRINT(dbenv, (dbenv,
		    "Walk_dir: File %d name: %s", i, names[i]));
		if (strncmp(names[i], "__db", 4) == 0)
			continue;
		if (strncmp(names[i], "DB_CONFIG", 9) == 0)
			continue;
		if (strncmp(names[i], "log", 3) == 0)
			continue;
		if (dir == NULL) {
			file = NULL;
			subdb = names[i];
		} else {
			file = names[i];
			subdb = NULL;
		}
		if ((ret = __rep_get_fileinfo(dbenv,
		    file, subdb, &tmpfp, uid, filecntp)) != 0) {
			RPRINT(dbenv, (dbenv,
			    "Walk_dir: File %d %s: returned error %s",
			    i, names[i], db_strerror(ret)));
			ret = 0;
			continue;
		}
		RPRINT(dbenv, (dbenv,
	"Walk_dir: File %d (of %d) %s at 0x%lx: pgsize %lu, max_pgno %lu",
		    tmpfp.filenum, *filecntp, names[i], P_TO_ULONG(rfp),
		    (u_long)tmpfp.pgsize, (u_long)tmpfp.max_pgno));
		if (first_file && origfp != NULL) {
			if (rfp != origfp &&
			    (ret = __rep_check_uid(dbenv, origfp,
			    origfp + *filelenp, uid)) != 0) {
				/* Already listed: skip this directory. */
				if (ret == DB_KEYEXIST) {
					ret = 0;
					(*filecntp)--;
				}
				goto err;
			}
			first_file = 0;
		}
		DB_SET_DBT(namedbt, names[i], strlen(names[i]) + 1);
		DB_SET_DBT(uiddbt, uid, DB_FILE_ID_LEN);

		/*
		 * Marshal the record at rfp, offering only the space that is
		 * left in the buffer.  On ENOMEM, double the buffer.  The true
		 * start of the allocation is re-derived from the relationship
		 * described above, because realloc needs the address of the
		 * pointer to the allocation start.  Then try again.  origfp is
		 * only consulted above, before any reallocation in this call,
		 * so it is never used after it could have moved.
		 */
		used = sizeof(__rep_update_args) + *filelenp;
retry:		ret = __rep_fileinfo_buf(rfp, *fileszp - used, &len,
		    tmpfp.pgsize, tmpfp.pgno, tmpfp.max_pgno,
		    tmpfp.filenum, tmpfp.id, tmpfp.type,
		    tmpfp.flags, &uiddbt, &namedbt);
		if (ret == ENOMEM) {
			base = rfp - used;
			*fileszp *= 2;
			if ((ret = __os_realloc(dbenv, *fileszp, &base)) != 0)
				break;
			rfp = base + used;
			/* Keep the caller's write position valid too. */
			*fp = rfp;
			goto retry;
		}
		if (ret != 0)
			break;
		rfp += len;
		*fp = rfp;
		*filelenp += len;
	}
err:
	__os_dirfree(dbenv, names, cnt);
	return (ret);
}
/*
 * __rep_check_uid --
 *	Scan the marshalled fileinfo records in [fp, endfp) for one whose
 *	file uid equals "uid".  Returns DB_KEYEXIST on a match, 0 if there
 *	is none, or the error from decoding a record.
 */
static int
__rep_check_uid(dbenv, fp, endfp, uid)
	DB_ENV *dbenv;
	u_int8_t *fp, *endfp, *uid;
{
	__rep_fileinfo_args *rfp;
	u_int8_t *fuid;
	int ret;
	void *next;

	ret = 0;
	next = fp;
	rfp = NULL;
	/*
	 * endfp is one past the last byte of valid record data.  Stop on
	 * reaching it: decoding a record that starts at endfp would read
	 * buffer space that was never written.
	 */
	while ((u_int8_t *)next < endfp) {
		if ((ret =
		    __rep_fileinfo_read(dbenv, next, &next, &rfp)) != 0) {
			__db_errx(dbenv, "Rep_check_uid: Could not malloc");
			goto err;
		}
		fuid = (u_int8_t *)rfp->uid.data;
		if (memcmp(fuid, uid, DB_FILE_ID_LEN) == 0) {
			RPRINT(dbenv, (dbenv,
			    "Check_uid: Found matching file."));
			ret = DB_KEYEXIST;
			goto err;
		}
		__os_free(dbenv, rfp);
		rfp = NULL;
	}
err:
	if (rfp != NULL)
		__os_free(dbenv, rfp);
	return (ret);
}
/*
 * __rep_get_fileinfo --
 *	Open database "file" (or in-memory database "subdb") read-only and
 *	describe it in *rfp:
 *	- page size;
 *	- last page from the meta page (0 for queue databases);
 *	- type and flags;
 *	- the next file number from *filecntp, which is incremented;
 *	- the dbreg log id if an open handle has the same file uid,
 *	  otherwise DB_LOGFILEID_INVALID.
 *	The file uid is also copied into "uid".  The meta page is read
 *	under a read lock.
 */
static int
__rep_get_fileinfo(dbenv, file, subdb, rfp, uid, filecntp)
	DB_ENV *dbenv;
	const char *file, *subdb;
	__rep_fileinfo_args *rfp;
	u_int8_t *uid;
	u_int32_t *filecntp;
{
	DB *dbp, *entdbp;
	DB_LOCK lk;
	DB_LOG *dblp;
	DBC *dbc;
	DBMETA *dbmeta;
	PAGE *pagep;
	int i, ret, t_ret;

	dbp = NULL;
	dbc = NULL;
	pagep = NULL;
	LOCK_INIT(lk);

	if ((ret = __db_create_internal(&dbp, dbenv, 0)) != 0)
		goto err;
	if ((ret = __db_open(dbp, NULL, file, subdb, DB_UNKNOWN,
	    DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0),
	    0, PGNO_BASE_MD)) != 0)
		goto err;
	if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
		goto err;
	if ((ret = __db_lget(
	    dbc, 0, dbp->meta_pgno, DB_LOCK_READ, 0, &lk)) != 0)
		goto err;
	if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, dbc->txn,
	    0, &pagep)) != 0)
		goto err;

	dbmeta = (DBMETA *)pagep;
	rfp->pgno = 0;
	if (dbp->type == DB_QUEUE)
		rfp->max_pgno = 0;
	else
		rfp->max_pgno = dbmeta->last_pgno;
	rfp->pgsize = dbp->pgsize;
	memcpy(uid, dbp->fileid, DB_FILE_ID_LEN);
	rfp->filenum = (*filecntp)++;
	rfp->type = (u_int32_t)dbp->type;
	rfp->flags = dbp->flags;
	rfp->id = DB_LOGFILEID_INVALID;
	ret = __memp_fput(dbp->mpf, pagep, dbc->priority);
	pagep = NULL;

err:
	/*
	 * Release resources in reverse order of acquisition.  A page still
	 * held belongs to dbp's own mpool file, and must be returned while
	 * the cursor whose priority we pass is still open.
	 */
	if (pagep != NULL && (t_ret =
	    __memp_fput(dbp->mpf, pagep, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (dbc != NULL) {
		if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
			ret = t_ret;
		if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
			ret = t_ret;
	}
	if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0)
		ret = t_ret;

	if (ret == 0) {
		/*
		 * Look for an open dbreg handle on the same file so the
		 * requester can use its log id.  NOTE(review): the scan stops
		 * at the first empty dbentry slot; if slots can be empty in
		 * the middle, later matches are missed and id stays invalid.
		 */
		LOG_SYSTEM_LOCK(dbenv);
		for (dblp = dbenv->lg_handle,
		    i = 0; i < dblp->dbentry_cnt; i++) {
			entdbp = dblp->dbentry[i].dbp;
			if (entdbp == NULL)
				break;
			DB_ASSERT(dbenv, entdbp->log_filename != NULL);
			if (memcmp(uid,
			    entdbp->log_filename->ufid,
			    DB_FILE_ID_LEN) == 0)
				rfp->id = i;
		}
		LOG_SYSTEM_UNLOCK(dbenv);
	}
	return (ret);
}
/*
 * __rep_page_req --
 *	Serve a REP_PAGE_REQ from site "eid": send the requested page range
 *	of one file.  A handle that dbreg already has open is used when the
 *	request's log id names one with the same file uid.  Otherwise the
 *	file's mpool is opened just for this request.  If that open fails,
 *	a master replies REP_FILE_FAIL (and returns the open error), while
 *	a client returns DB_NOTFOUND.
 */
int
__rep_page_req(dbenv, eid, rec)
	DB_ENV *dbenv;
	int eid;
	DBT *rec;
{
	__rep_fileinfo_args *msgfp;
	DB *dbp;
	DBT msgdbt;
	DB_LOG *dblp;
	DB_MPOOLFILE *mpf;
	DB_REP *db_rep;
	REP *rep;
	int ret, t_ret;
	void *next;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	dblp = dbenv->lg_handle;

	if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0)
		return (ret);
	RPRINT(dbenv, (dbenv, "page_req: file %d page %lu to %lu",
	    msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));

	/* Is the file already open under the log id the requester sent? */
	dbp = NULL;
	LOG_SYSTEM_LOCK(dbenv);
	if (msgfp->id >= 0 && dblp->dbentry_cnt > msgfp->id)
		dbp = dblp->dbentry[msgfp->id].dbp;
	if (dbp != NULL) {
		DB_ASSERT(dbenv, dbp->log_filename != NULL);
		if (memcmp(msgfp->uid.data, dbp->log_filename->ufid,
		    DB_FILE_ID_LEN) != 0)
			dbp = NULL;
	}
	LOG_SYSTEM_UNLOCK(dbenv);

	if (dbp != NULL) {
		RPRINT(dbenv, (dbenv,
		    "page_req: found %d in dbreg", msgfp->filenum));
		ret = __rep_page_sendpages(dbenv, eid, msgfp, dbp->mpf, dbp);
		goto done;
	}

	RPRINT(dbenv,
	    (dbenv, "page_req: Open %d via mpf_open", msgfp->filenum));
	if ((ret = __rep_mpf_open(dbenv, &mpf, msgfp, 0)) == 0) {
		ret = __rep_page_sendpages(dbenv, eid, msgfp, mpf, NULL);
		t_ret = __memp_fclose(mpf, 0);
		if (ret == 0 && t_ret != 0)
			ret = t_ret;
		goto done;
	}

	RPRINT(dbenv, (dbenv, "page_req: Open %d failed", msgfp->filenum));
	if (F_ISSET(rep, REP_F_MASTER)) {
		memset(&msgdbt, 0, sizeof(msgdbt));
		msgdbt.data = msgfp;
		msgdbt.size = sizeof(*msgfp);
		(void)__rep_send_message(dbenv, eid, REP_FILE_FAIL,
		    NULL, &msgdbt, 0, 0);
	} else
		ret = DB_NOTFOUND;

done:
	__os_free(dbenv, msgfp);
	return (ret);
}
static int
__rep_page_sendpages(dbenv, eid, msgfp, mpf, dbp)
DB_ENV *dbenv;
int eid;
__rep_fileinfo_args *msgfp;
DB_MPOOLFILE *mpf;
DB *dbp;
{
DB *qdbp;
DBT lockdbt, msgdbt, pgdbt;
DB_LOCK lock;
DB_LOCK_ILOCK lock_obj;
DB_LOCKER *locker;
DB_LOG *dblp;
DB_LSN lsn;
DB_REP *db_rep;
PAGE *pagep;
REP *rep;
REP_BULK bulk;
REP_THROTTLE repth;
db_pgno_t p;
uintptr_t bulkoff;
size_t len, msgsz;
u_int32_t bulkflags, use_bulk;
int opened, ret, t_ret;
u_int8_t *buf;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
locker = NULL;
opened = 0;
qdbp = NULL;
buf = NULL;
bulk.addr = NULL;
use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
if (msgfp->type == (u_int32_t)DB_QUEUE) {
if (dbp == NULL) {
if ((ret = __db_create_internal(&qdbp, dbenv, 0)) != 0)
goto err;
if ((ret = __db_open(qdbp, NULL,
FLD_ISSET(msgfp->flags, DB_AM_INMEM) ?
NULL : msgfp->info.data,
FLD_ISSET(msgfp->flags, DB_AM_INMEM) ?
msgfp->info.data : NULL,
DB_UNKNOWN,
DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ?
DB_THREAD : 0), 0, PGNO_BASE_MD)) != 0)
goto err;
opened = 1;
} else
qdbp = dbp;
}
msgsz = sizeof(__rep_fileinfo_args) + DB_FILE_ID_LEN + msgfp->pgsize;
if ((ret = __os_calloc(dbenv, 1, msgsz, &buf)) != 0)
goto err;
memset(&msgdbt, 0, sizeof(msgdbt));
memset(&pgdbt, 0, sizeof(pgdbt));
RPRINT(dbenv, (dbenv, "sendpages: file %d page %lu to %lu",
msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
memset(&repth, 0, sizeof(repth));
if (use_bulk && (ret = __rep_bulk_alloc(dbenv, &bulk, eid,
&bulkoff, &bulkflags, REP_BULK_PAGE)) != 0)
goto err;
REP_SYSTEM_LOCK(dbenv);
repth.gbytes = rep->gbytes;
repth.bytes = rep->bytes;
repth.type = REP_PAGE;
repth.data_dbt = &msgdbt;
REP_SYSTEM_UNLOCK(dbenv);
LOCK_INIT(lock);
memset(&lock_obj, 0, sizeof(lock_obj));
if ((ret = __lock_id(dbenv, NULL, &locker)) != 0)
goto err;
memcpy(lock_obj.fileid, mpf->fileid, DB_FILE_ID_LEN);
lock_obj.type = DB_PAGE_LOCK;
memset(&lockdbt, 0, sizeof(lockdbt));
lockdbt.data = &lock_obj;
lockdbt.size = sizeof(lock_obj);
for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) {
lock_obj.pgno = p;
if ((ret = __lock_get(dbenv, locker, DB_LOCK_NOWAIT, &lockdbt,
DB_LOCK_READ, &lock)) != 0) {
if (ret == DB_LOCK_NOTGRANTED)
continue;
goto err;
}
if (msgfp->type == (u_int32_t)DB_QUEUE && p != 0)
#ifdef HAVE_QUEUE
ret = __qam_fget(qdbp, &p, NULL,
DB_MPOOL_CREATE, &pagep);
#else
ret = DB_PAGE_NOTFOUND;
#endif
else
ret = __memp_fget(mpf, &p, NULL,
DB_MPOOL_CREATE, &pagep);
if (ret == DB_PAGE_NOTFOUND) {
memset(&pgdbt, 0, sizeof(pgdbt));
ZERO_LSN(lsn);
msgfp->pgno = p;
if (F_ISSET(rep, REP_F_MASTER)) {
ret = 0;
RPRINT(dbenv, (dbenv,
"sendpages: PAGE_FAIL on page %lu",
(u_long)p));
(void)__rep_send_message(dbenv, eid,
REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0);
} else
ret = DB_NOTFOUND;
goto lockerr;
} else if (ret != 0)
goto lockerr;
else
DB_SET_DBT(pgdbt, pagep, msgfp->pgsize);
len = 0;
RPRINT(dbenv, (dbenv,
"sendpages: %lu, page lsn [%lu][%lu]", (u_long)p,
(u_long)pagep->lsn.file, (u_long)pagep->lsn.offset));
ret = __rep_fileinfo_buf(buf, msgsz, &len,
msgfp->pgsize, p, msgfp->max_pgno,
msgfp->filenum, msgfp->id, msgfp->type,
msgfp->flags, &msgfp->uid, &pgdbt);
if (msgfp->type != (u_int32_t)DB_QUEUE || p == 0)
t_ret = __memp_fput(mpf, pagep, DB_PRIORITY_UNCHANGED);
#ifdef HAVE_QUEUE
else
t_ret = __qam_fput(qdbp, p, pagep, qdbp->priority);
#endif
if ((t_ret = __ENV_LPUT(dbenv, lock)) != 0 && ret == 0)
ret = t_ret;
if (ret != 0)
goto err;
DB_ASSERT(dbenv, len <= msgsz);
DB_SET_DBT(msgdbt, buf, len);
dblp = dbenv->lg_handle;
LOG_SYSTEM_LOCK(dbenv);
repth.lsn = ((LOG *)dblp->reginfo.primary)->lsn;
LOG_SYSTEM_UNLOCK(dbenv);
if (use_bulk)
ret = __rep_bulk_message(dbenv, &bulk, &repth,
&repth.lsn, &msgdbt, 0);
if (!use_bulk || ret == DB_REP_BULKOVF)
ret = __rep_send_throttle(dbenv, eid, &repth, 0, 0);
RPRINT(dbenv, (dbenv,
"sendpages: %lu, lsn [%lu][%lu]", (u_long)p,
(u_long)repth.lsn.file, (u_long)repth.lsn.offset));
if (repth.type == REP_PAGE_MORE || ret != 0) {
if (ret == DB_REP_UNAVAIL)
ret = 0;
break;
}
}
if (0) {
lockerr: if ((t_ret = __ENV_LPUT(dbenv, lock)) != 0 && ret == 0)
ret = t_ret;
}
err:
if (use_bulk && bulk.addr != NULL &&
(t_ret = __rep_bulk_free(dbenv, &bulk, 0)) != 0 && ret == 0)
ret = t_ret;
if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 &&
ret == 0)
ret = t_ret;
if (buf != NULL)
__os_free(dbenv, buf);
if (locker != NULL && (t_ret = __lock_id_free(dbenv,
locker)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
 * __rep_update_setup --
 *	Client handling of a REP_UPDATE message.  It does nothing unless
 *	REP_F_RECOVER_UPDATE is set and no election is in progress.
 *	Otherwise it:
 *	- moves from update to page recovery and locks out the API;
 *	- resets the client's log-gap state and empties rep_db;
 *	- records the first/last LSNs, the file count and the file list
 *	  from the message;
 *	- removes the local logs and databases (__rep_remove_all);
 *	- requests the first file.
 *	On failure, page recovery is cleared and REP_F_RECOVER_UPDATE is
 *	set again.
 */
int
__rep_update_setup(dbenv, eid, rp, rec)
	DB_ENV *dbenv;
	int eid;
	REP_CONTROL *rp;
	DBT *rec;
{
	DB_LOG *dblp;
	DB_REP *db_rep;
	LOG *lp;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	__rep_update_args *rup;
	int ret;
	u_int32_t count, infolen;
	void *next;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;
	ret = 0;

	REP_SYSTEM_LOCK(dbenv);
	if (!F_ISSET(rep, REP_F_RECOVER_UPDATE) || IN_ELECTION(rep)) {
		REP_SYSTEM_UNLOCK(dbenv);
		return (0);
	}
	F_CLR(rep, REP_F_RECOVER_UPDATE);
	F_SET(rep, REP_F_RECOVER_PAGE);
	DB_ASSERT(dbenv,
	    !IS_USING_LEASES(dbenv) || __rep_islease_granted(dbenv) == 0);
	if ((ret = __rep_lockout_api(dbenv, rep)) != 0)
		goto err;
	infop = dbenv->reginfo;
	renv = infop->primary;
	(void)time(&renv->rep_timestamp);
	REP_SYSTEM_UNLOCK(dbenv);

	/* Reset log-gap tracking; make sure rep_db exists. */
	MUTEX_LOCK(dbenv, rep->mtx_clientdb);
	lp->wait_recs = rep->request_gap;
	lp->rcvd_recs = 0;
	ZERO_LSN(lp->ready_lsn);
	ZERO_LSN(lp->verify_lsn);
	ZERO_LSN(lp->waiting_lsn);
	ZERO_LSN(lp->max_wait_lsn);
	ZERO_LSN(lp->max_perm_lsn);
	if (db_rep->rep_db == NULL)
		ret = __rep_client_dbinit(dbenv, 0, REP_DB);
	MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
	if (ret != 0)
		goto err_nolock;

	if ((ret = __rep_update_read(dbenv, rec->data, &next, &rup)) != 0)
		goto err_nolock;

	/* Empty rep_db of anything left from before. */
	if ((ret = __db_truncate(db_rep->rep_db, NULL, &count)) != 0) {
		/* rup has not been consumed yet; don't leak it. */
		__os_free(dbenv, rup);
		goto err_nolock;
	}

	REP_SYSTEM_LOCK(dbenv);
	rep->first_lsn = rup->first_lsn;
	rep->first_vers = rup->first_vers;
	rep->last_lsn = rp->lsn;
	rep->nfiles = rup->num_files;
	__os_free(dbenv, rup);

	RPRINT(dbenv, (dbenv,
	    "Update setup for %d files.", rep->nfiles));
	RPRINT(dbenv, (dbenv, "Update setup: First LSN [%lu][%lu].",
	    (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset));
	RPRINT(dbenv, (dbenv, "Update setup: Last LSN [%lu][%lu]",
	    (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));

	/* Keep a private copy of the file list that follows the header. */
	if (rep->nfiles > 0) {
		infolen = rec->size - sizeof(__rep_update_args);
		if ((ret = __os_calloc(dbenv, 1, infolen, &rep->originfo)) != 0)
			goto err;
		memcpy(rep->originfo, next, infolen);
		rep->nextinfo = rep->originfo;
	}

	if ((ret = __rep_remove_all(dbenv, rec)) != 0)
		goto err;
	rep->curfile = 0;
	if ((ret = __rep_nextfile(dbenv, eid, rep)) != 0)
		goto err;

	if (0) {
err_nolock:	REP_SYSTEM_LOCK(dbenv);
	}
err:
	if (ret != 0) {
		if (rep->originfo != NULL) {
			__os_free(dbenv, rep->originfo);
			rep->originfo = NULL;
		}
		RPRINT(dbenv, (dbenv,
		    "Update_setup: Error: Clear PAGE, set UPDATE again. %s",
		    db_strerror(ret)));
		F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY_API |
		    REP_F_READY_OP);
		F_SET(rep, REP_F_RECOVER_UPDATE);
	}
	REP_SYSTEM_UNLOCK(dbenv);
	return (ret);
}
/*
 * __rep_remove_all --
 *	Called from __rep_update_setup: record the local file list in
 *	REP_INITNAME, then remove the local log files and databases.
 *
 *	The file list is an update header followed by one fileinfo record
 *	per file.  It is written with a u_int32_t length prefix and synced
 *	before anything is removed.  After every file has been removed,
 *	the incoming update message "rec" is appended, also
 *	length-prefixed, and synced.
 */
static int
__rep_remove_all(dbenv, rec)
	DB_ENV *dbenv;
	DBT *rec;
{
	__rep_fileinfo_args *finfo;
	DB_FH *fhp;
	DB_LSN unused;
	size_t cnt, filelen, filesz, updlen;
	u_int32_t bufsz, filecnt;
	char *fname;
	int ret, t_ret;
	u_int8_t *buf, *fp, *origfp;

	ZERO_LSN(unused);
	finfo = NULL;
	fname = NULL;
	fhp = NULL;
	filelen = 0;
	filecnt = 0;
	filesz = MEGABYTE;
	if ((ret = __os_calloc(dbenv, 1, filesz, &buf)) != 0)
		return (ret);

	/*
	 * __rep_find_dbs may grow and move the buffer.  On return fp is
	 * just past the file records, so re-derive both the start of the
	 * records and the start of the allocation from it before either is
	 * used or buf is freed.
	 */
	fp = buf + sizeof(__rep_update_args);
	ret = __rep_find_dbs(dbenv, &fp, &filesz, &filelen, &filecnt);
	origfp = fp - filelen;
	buf = origfp - sizeof(__rep_update_args);
	if (ret != 0)
		goto out;
	if ((ret = __rep_update_buf(buf, filesz, &updlen,
	    &unused, 0, filecnt)) != 0)
		goto out;

	/* Persist the list of local files before removing any of them. */
	if ((ret = __db_appname(
	    dbenv, DB_APP_NONE, REP_INITNAME, 0, NULL, &fname)) != 0)
		goto out;
	bufsz = updlen + filelen;
	if ((ret = __os_open(dbenv, fname, 0,
	    DB_OSO_CREATE | DB_OSO_TRUNC, __db_omode(OWNER_RW), &fhp)) != 0 ||
	    (ret = __os_write(dbenv, fhp, &bufsz, sizeof(bufsz), &cnt)) != 0 ||
	    (ret = __os_write(dbenv, fhp, buf, bufsz, &cnt)) != 0 ||
	    (ret = __os_fsync(dbenv, fhp)) != 0) {
		__db_err(dbenv, ret, "%s", fname);
		goto out;
	}

	if ((ret = __rep_remove_logs(dbenv)) != 0)
		goto out;
	if ((ret = __rep_closefiles(dbenv, 0)) != 0)
		goto out;

	/* Walk the records just written and remove each file they name. */
	fp = origfp;
	while (filecnt-- > 0) {
		if ((ret = __rep_fileinfo_read(dbenv,
		    fp, (void*)&fp, &finfo)) != 0)
			goto out;
		if ((ret = __rep_remove_file(dbenv, finfo->uid.data,
		    finfo->info.data, finfo->type, finfo->flags)) != 0)
			goto out;
		__os_free(dbenv, finfo);
		finfo = NULL;
	}

	if ((ret = __os_write(dbenv, fhp,
	    &rec->size, sizeof(rec->size), &cnt)) != 0 ||
	    (ret = __os_write(dbenv, fhp, rec->data, rec->size, &cnt)) != 0 ||
	    (ret = __os_fsync(dbenv, fhp)) != 0) {
		__db_err(dbenv, ret, "%s", fname);
		goto out;
	}

out:
	if (fhp != NULL &&
	    (t_ret = __os_closehandle(dbenv, fhp)) != 0 && ret == 0)
		ret = t_ret;
	if (fname != NULL)
		__os_free(dbenv, fname);
	if (finfo != NULL)
		__os_free(dbenv, finfo);
	__os_free(dbenv, buf);
	return (ret);
}
/*
 * __rep_remove_logs --
 *	Throw away this site's log.  The cache and the log are flushed
 *	first.  Then an in-memory log is reset to a zero LSN, or every
 *	on-disk log file from 1 through the current log file is unlinked.
 *	Unlink failures are ignored.
 */
static int
__rep_remove_logs(dbenv)
	DB_ENV *dbenv;
{
	DB_LOG *dblp;
	DB_LSN lsn;
	LOG *lp;
	u_int32_t fnum, lastfile;
	int ret;
	char *name;

	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;

	/* Flush dirty pages and buffered log records before discarding. */
	if ((ret = __memp_sync_int(dbenv, NULL, 0,
	    DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0 ||
	    (ret = __log_flush(dbenv, NULL)) != 0)
		return (ret);

	if (lp->db_log_inmemory) {
		ZERO_LSN(lsn);
		return (__log_zero(dbenv, &lsn));
	}

	for (lastfile = lp->lsn.file, fnum = 1; fnum <= lastfile; ++fnum) {
		if ((ret = __log_name(dblp, fnum, &name, NULL, 0)) != 0)
			return (ret);
		(void)time(&lp->timestamp);
		(void)__os_unlink(dbenv, name);
		__os_free(dbenv, name);
	}
	return (0);
}
/*
 * __rep_remove_file --
 *	Remove one database named in the local file list.  When queue
 *	support is compiled in, an on-disk queue database is first removed
 *	through __qam_remove using a temporary handle.  __fop_remove then
 *	removes the file identified by uid/name under DB_APP_DATA.
 */
static int
__rep_remove_file(dbenv, uid, name, type, flags)
	DB_ENV *dbenv;
	u_int8_t *uid;
	const char *name;
	u_int32_t type, flags;
{
#ifdef HAVE_QUEUE
	DB *dbp;
	int ret;

	if (type == (u_int32_t)DB_QUEUE && !LF_ISSET(DB_AM_INMEM)) {
		if ((ret = __db_create_internal(&dbp, dbenv, 0)) != 0)
			return (ret);

		/* Don't leak the freshly created handle if this fails. */
		if ((ret = __lock_id(dbenv, NULL, &dbp->locker)) != 0) {
			(void)__db_close(dbp, NULL, DB_NOSYNC);
			return (ret);
		}
		RPRINT(dbenv, (dbenv, "QAM: Unlink %s via __qam_remove", name));
		if ((ret = __qam_remove(dbp, NULL, name, NULL)) != 0) {
			RPRINT(dbenv, (dbenv, "qam_remove returned %d", ret));
			(void)__db_close(dbp, NULL, DB_NOSYNC);
			return (ret);
		}
		if ((ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0)
			return (ret);
	}
#else
	COMPQUIET(type, 0);
	COMPQUIET(flags, 0);
#endif
	return (__fop_remove(dbenv, NULL, uid, name, DB_APP_DATA, 0));
}
/*
 * __rep_bulk_page --
 *	Unpack a bulk page message and hand each contained page to
 *	__rep_page as though it had arrived as its own REP_PAGE message.
 *	Each entry is a u_int32_t length, a DB_LSN, then "length" bytes
 *	of page record.  DB_REP_PAGEDONE from __rep_page ends the scan
 *	with success; any other error ends it and is returned.
 */
int
__rep_bulk_page(dbenv, eid, rp, rec)
	DB_ENV *dbenv;
	int eid;
	REP_CONTROL *rp;
	DBT *rec;
{
	DBT pgrec;
	REP_CONTROL tmprp;
	u_int32_t len;
	int ret;
	u_int8_t *cur, *end;

	memset(&pgrec, 0, sizeof(pgrec));
	/* Each page is presented as a REP_PAGE with its own LSN. */
	memcpy(&tmprp, rp, sizeof(tmprp));
	tmprp.rectype = REP_PAGE;

	ret = 0;
	end = (u_int8_t *)rec->data + rec->size;
	cur = (u_int8_t *)rec->data;
	while (cur < end) {
		memcpy(&len, cur, sizeof(len));
		cur += sizeof(len);
		memcpy(&tmprp.lsn, cur, sizeof(DB_LSN));
		cur += sizeof(DB_LSN);
		pgrec.data = cur;
		pgrec.size = len;
		RPRINT(dbenv, (dbenv,
		    "rep_bulk_page: Processing LSN [%lu][%lu]",
		    (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset));
		RPRINT(dbenv, (dbenv,
    "rep_bulk_page: p %#lx ep %#lx pgrec data %#lx, size %lu (%#lx)",
		    P_TO_ULONG(cur), P_TO_ULONG(end), P_TO_ULONG(pgrec.data),
		    (u_long)pgrec.size, (u_long)pgrec.size));
		ret = __rep_page(dbenv, eid, &tmprp, &pgrec);
		RPRINT(dbenv, (dbenv,
		    "rep_bulk_page: rep_page ret %d", ret));
		if (ret != 0) {
			if (ret == DB_REP_PAGEDONE)
				ret = 0;
			break;
		}
		cur += len;
	}
	return (ret);
}
/*
 * __rep_page --
 *	Client handling of one REP_PAGE message during internal init.
 *
 *	Returns DB_REP_PAGEDONE when page recovery is not in progress or
 *	when the page belongs to a file other than rep->curfile.  Otherwise
 *	the page number is recorded in rep->file_dbp with DB_NOOVERWRITE.
 *	If it was already recorded, it is counted as a duplicate and
 *	ignored.  If not, the page is written into the file's mpool,
 *	npages and last_lsn are advanced, and __rep_filedone does gap
 *	processing and moves on to the next file when this one is
 *	complete.  Work is done holding rep->mtx_clientdb and then the rep
 *	region lock, which are released in the reverse order.
 */
int
__rep_page(dbenv, eid, rp, rec)
DB_ENV *dbenv;
int eid;
REP_CONTROL *rp;
DBT *rec;
{
DB_REP *db_rep;
DBT key, data;
REP *rep;
__rep_fileinfo_args *msgfp;
db_recno_t recno;
int ret;
void *next;
ret = 0;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
if (!F_ISSET(rep, REP_F_RECOVER_PAGE))
return (DB_REP_PAGEDONE);
/* msgfp is allocated here and freed on every exit below. */
if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0)
return (ret);
MUTEX_LOCK(dbenv, rep->mtx_clientdb);
REP_SYSTEM_LOCK(dbenv);
DB_ASSERT(dbenv,
!IS_USING_LEASES(dbenv) || __rep_islease_granted(dbenv) == 0);
RPRINT(dbenv, (dbenv,
"PAGE: Received page %lu from file %d",
(u_long)msgfp->pgno, msgfp->filenum));
/* A page for some other file is stale; tell the caller we're done with it. */
if (msgfp->filenum != rep->curfile) {
RPRINT(dbenv, (dbenv, "Msg file %d != curfile %d",
msgfp->filenum, rep->curfile));
ret = DB_REP_PAGEDONE;
goto err;
}
/* Presumably opens rep->file_dbp (used below) if needed -- confirm in __rep_client_dbinit. */
if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0) {
RPRINT(dbenv, (dbenv,
"PAGE: Client_dbinit %s", db_strerror(ret)));
goto err;
}
/*
 * Record the page as received, keyed by pgno + 1 (presumably because
 * record numbers may not be 0).  DB_NOOVERWRITE detects duplicates.
 */
memset(&key, 0, sizeof(key));
memset(&data, 0, sizeof(data));
recno = (db_recno_t)(msgfp->pgno + 1);
key.data = &recno;
key.ulen = key.size = sizeof(db_recno_t);
key.flags = DB_DBT_USERMEM;
ret = __db_put(rep->file_dbp, NULL, &key, &data, DB_NOOVERWRITE);
if (ret == DB_KEYEXIST) {
RPRINT(dbenv, (dbenv,
"PAGE: Received duplicate page %lu from file %d",
(u_long)msgfp->pgno, msgfp->filenum));
STAT(rep->stat.st_pg_duplicated++);
ret = 0;
goto err;
}
if (ret != 0)
goto err;
RPRINT(dbenv, (dbenv,
"PAGE: Write page %lu into mpool", (u_long)msgfp->pgno));
ret = __rep_write_page(dbenv, rep, msgfp);
if (ret != 0) {
/* Un-record the page so a later copy of it is not rejected as a duplicate. */
(void)__db_del(rep->file_dbp, NULL, &key, 0);
goto err;
}
STAT(rep->stat.st_pg_records++);
rep->npages++;
/* Track the highest LSN seen; it becomes the end of the later log request. */
if (LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0)
rep->last_lsn = rp->lsn;
ret = __rep_filedone(dbenv, eid, rep, msgfp, rp->rectype);
err: REP_SYSTEM_UNLOCK(dbenv);
MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
__os_free(dbenv, msgfp);
return (ret);
}
/*
 * __rep_page_fail --
 *	Client handling of REP_PAGE_FAIL: the sender could not supply the
 *	named page.  For the file currently being fetched, the expected
 *	page range is adjusted and __rep_filedone decides whether the file
 *	is complete.  A failure for any other file is ignored.  Ignored
 *	entirely (returns 0) when page recovery is not in progress.
 */
int
__rep_page_fail(dbenv, eid, rec)
	DB_ENV *dbenv;
	int eid;
	DBT *rec;
{
	DB_REP *db_rep;
	REP *rep;
	__rep_fileinfo_args *msgfp, *rfp;
	int ret;
	void *next;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;

	if (!F_ISSET(rep, REP_F_RECOVER_PAGE))
		return (0);
	if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0)
		return (ret);

	MUTEX_LOCK(dbenv, rep->mtx_clientdb);
	REP_SYSTEM_LOCK(dbenv);
	DB_ASSERT(dbenv,
	    !IS_USING_LEASES(dbenv) || __rep_islease_granted(dbenv) == 0);

	if (msgfp->filenum == rep->curfile) {
		rfp = rep->curinfo;
		if (rfp->type == (u_int32_t)DB_QUEUE) {
			/* Queue: shrink only at the end; skip the failed page. */
			RPRINT(dbenv, (dbenv,
	    "page_fail: BEFORE page %lu failed. ready %lu, max %lu, npages %d",
			    (u_long)msgfp->pgno, (u_long)rep->ready_pg,
			    (u_long)rfp->max_pgno, rep->npages));
			if (msgfp->pgno == rfp->max_pgno)
				--rfp->max_pgno;
			if (msgfp->pgno >= rep->ready_pg) {
				rep->ready_pg = msgfp->pgno + 1;
				rep->npages = rep->ready_pg;
			}
			RPRINT(dbenv, (dbenv,
	    "page_fail: AFTER page %lu failed. ready %lu, max %lu, npages %d",
			    (u_long)msgfp->pgno, (u_long)rep->ready_pg,
			    (u_long)rfp->max_pgno, rep->npages));
		} else
			--rfp->max_pgno;
		ret = __rep_filedone(dbenv, eid, rep, msgfp, REP_PAGE_FAIL);
	} else {
		RPRINT(dbenv, (dbenv, "Msg file %d != curfile %d",
		    msgfp->filenum, rep->curfile));
	}

	REP_SYSTEM_UNLOCK(dbenv);
	MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
	__os_free(dbenv, msgfp);
	return (ret);
}
/*
 * __rep_write_page --
 *	Copy the page image carried in msgfp into the current file.  On
 *	first use, the file is created with __fop_create (unless it is an
 *	in-memory database), and its mpool handle is opened into
 *	rep->file_mpf (with DB_CREATE for in-memory databases).  Non-zero
 *	queue pages are written through rep->queue_dbp; everything else
 *	goes through rep->file_mpf.
 */
static int
__rep_write_page(dbenv, rep, msgfp)
	DB_ENV *dbenv;
	REP *rep;
	__rep_fileinfo_args *msgfp;
{
	__rep_fileinfo_args *rfp;
	int ret;
	void *dst;

	rfp = rep->curinfo;

	if (rep->file_mpf == NULL) {
		if (!F_ISSET(rfp, DB_AM_INMEM)) {
			RPRINT(dbenv, (dbenv,
			    "rep_write_page: Calling fop_create for %s",
			    (char *)rfp->info.data));
			if ((ret = __fop_create(dbenv, NULL, NULL,
			    rfp->info.data, DB_APP_DATA,
			    dbenv->db_mode, 0)) != 0)
				return (ret);
		}
		if ((ret = __rep_mpf_open(dbenv, &rep->file_mpf, rfp,
		    F_ISSET(rfp, DB_AM_INMEM) ? DB_CREATE : 0)) != 0)
			return (ret);
	}

	if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0) {
#ifdef HAVE_QUEUE
		if ((ret = __qam_fget(rep->queue_dbp, &msgfp->pgno, NULL,
		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &dst)) != 0)
			return (ret);
		memcpy(dst, msgfp->info.data, msgfp->pgsize);
		return (__qam_fput(rep->queue_dbp,
		    msgfp->pgno, dst, rep->queue_dbp->priority));
#else
		return (__db_no_queue_am(dbenv));
#endif
	}

	if ((ret = __memp_fget(rep->file_mpf, &msgfp->pgno, NULL,
	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &dst)) != 0)
		return (ret);
	memcpy(dst, msgfp->info.data, msgfp->pgsize);
	return (__memp_fput(rep->file_mpf, dst, rep->file_dbp->priority));
}
/*
 * __rep_page_gap --
 *	Track in-order arrival of the current file's pages and re-request
 *	missing ones.
 *
 *	rep->ready_pg is the next page expected in order.  rep->waiting_pg
 *	is the lowest page received beyond ready_pg (PGNO_INVALID if none).
 *	- A page beyond ready_pg only lowers waiting_pg.
 *	- The page equal to ready_pg advances ready_pg.  Pages already
 *	  recorded in rep->file_dbp are then consumed in order with a
 *	  cursor, keys being pgno + 1.
 *	- A page below ready_pg is ignored.
 *	If a gap remains, or this is REP_PAGE_MORE, missing pages are
 *	requested with __rep_pggap_req when __rep_check_doreq allows it
 *	(forced for REP_PAGE_MORE).
 *
 *	Returns DB_REP_PAGEDONE if msgfp is for a different file.
 *	NOTE(review): when the cursor runs off the end, ret is left as
 *	DB_NOTFOUND or DB_KEYEMPTY and can be returned from here.
 */
static int
__rep_page_gap(dbenv, rep, msgfp, type)
DB_ENV *dbenv;
REP *rep;
__rep_fileinfo_args *msgfp;
u_int32_t type;
{
DB_LOG *dblp;
DBC *dbc;
DBT data, key;
LOG *lp;
__rep_fileinfo_args *rfp;
db_recno_t recno;
int ret, t_ret;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
ret = 0;
dbc = NULL;
rfp = rep->curinfo;
if (rfp->filenum != msgfp->filenum) {
ret = DB_REP_PAGEDONE;
goto err;
}
/* Already past this page: nothing to update. */
if (msgfp->pgno < rep->ready_pg) {
RPRINT(dbenv, (dbenv,
"PAGE_GAP: pgno %lu < ready %lu, waiting %lu",
(u_long)msgfp->pgno, (u_long)rep->ready_pg,
(u_long)rep->waiting_pg));
goto err;
}
RPRINT(dbenv, (dbenv,
"PAGE_GAP: pgno %lu, max_pg %lu ready %lu, waiting %lu max_wait %lu",
(u_long)msgfp->pgno, (u_long)rfp->max_pgno, (u_long)rep->ready_pg,
(u_long)rep->waiting_pg, (u_long)rep->max_wait_pg));
if (msgfp->pgno > rep->ready_pg) {
/* Out of order: remember the lowest such page. */
if (rep->waiting_pg == PGNO_INVALID ||
msgfp->pgno < rep->waiting_pg)
rep->waiting_pg = msgfp->pgno;
} else {
/* In order: advance, then absorb any pages already received past it. */
rep->ready_pg++;
lp->rcvd_recs = 0;
if (rep->ready_pg == rep->waiting_pg) {
lp->wait_recs = 0;
lp->rcvd_recs = 0;
rep->max_wait_pg = PGNO_INVALID;
memset(&key, 0, sizeof(key));
memset(&data, 0, sizeof(data));
if ((ret = __db_cursor(rep->file_dbp, NULL,
&dbc, 0)) != 0)
goto err;
/* Position on waiting_pg's record (key is pgno + 1). */
recno = (db_recno_t)rep->waiting_pg + 1;
key.data = &recno;
key.ulen = key.size = sizeof(db_recno_t);
key.flags = DB_DBT_USERMEM;
ret = __dbc_get(dbc, &key, &data, DB_SET);
if (ret != 0)
goto err;
RPRINT(dbenv, (dbenv,
"PAGE_GAP: Set cursor for ready %lu, waiting %lu",
(u_long)rep->ready_pg, (u_long)rep->waiting_pg));
}
/*
 * dbc is only used here when the block above opened it: the loop
 * condition can hold only if ready_pg == waiting_pg was true there.
 * NOTE(review): a DB_NEXT error other than DB_NOTFOUND/DB_KEYEMPTY
 * still updates waiting_pg from the previous key before the loop exits.
 */
while (ret == 0 && rep->ready_pg == rep->waiting_pg) {
rep->ready_pg++;
ret = __dbc_get(dbc, &key, &data, DB_NEXT);
if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY) {
rep->waiting_pg = PGNO_INVALID;
RPRINT(dbenv, (dbenv,
"PAGE_GAP: Next cursor No next - ready %lu, waiting %lu",
(u_long)rep->ready_pg,
(u_long)rep->waiting_pg));
break;
}
/* Convert the record number back to a page number. */
rep->waiting_pg = *(db_pgno_t *)key.data;
rep->waiting_pg--;
RPRINT(dbenv, (dbenv,
"PAGE_GAP: Next cursor ready %lu, waiting %lu",
(u_long)rep->ready_pg, (u_long)rep->waiting_pg));
}
}
/* Every page of the file is accounted for: no gap to request. */
if (rep->ready_pg > rfp->max_pgno)
goto err;
if ((rep->waiting_pg != PGNO_INVALID &&
rep->ready_pg != rep->waiting_pg) || type == REP_PAGE_MORE) {
/* Start a new wait period if one isn't running. */
if (lp->wait_recs == 0) {
lp->wait_recs = rep->request_gap;
lp->rcvd_recs = 0;
rep->max_wait_pg = PGNO_INVALID;
}
/* The sender was throttled: resume the request from this page. */
if (type == REP_PAGE_MORE)
rfp->pgno = msgfp->pgno;
if ((__rep_check_doreq(dbenv, rep) || type == REP_PAGE_MORE) &&
((ret = __rep_pggap_req(dbenv, rep, rfp,
(type == REP_PAGE_MORE) ? REP_GAP_FORCE : 0)) != 0))
goto err;
} else {
lp->wait_recs = 0;
rep->max_wait_pg = PGNO_INVALID;
}
err:
if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
 * __rep_init_cleanup --
 *	Release internal-init state.  Always:
 *	- closes rep->file_mpf and rep->file_dbp;
 *	- frees rep->curinfo.
 *	When "force" is set, it also:
 *	- closes rep->queue_dbp;
 *	- if an internal-init flag (REP_F_INTERNAL_INIT_MASK) is set, undoes
 *	  the interrupted init: removes the logs and re-initializes the log
 *	  at file 1, removes the files listed in rep->originfo, removes the
 *	  init file only if those steps all succeeded, and frees
 *	  rep->originfo.
 *	Every step runs even if an earlier one fails; the first error is
 *	returned.
 */
int
__rep_init_cleanup(dbenv, rep, force)
DB_ENV *dbenv;
REP *rep;
int force;
{
DB_LOG *dblp;
LOG *lp;
int cleanup_failure, ret, t_ret;
ret = 0;
if (rep->file_mpf != NULL) {
ret = __memp_fclose(rep->file_mpf, 0);
rep->file_mpf = NULL;
}
if (rep->file_dbp != NULL) {
t_ret = __db_close(rep->file_dbp, NULL, DB_NOSYNC);
rep->file_dbp = NULL;
if (t_ret != 0 && ret == 0)
ret = t_ret;
}
/* The queue handle is only closed on a forced cleanup. */
if (force && rep->queue_dbp != NULL) {
t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC);
rep->queue_dbp = NULL;
if (t_ret != 0 && ret == 0)
ret = t_ret;
}
if (rep->curinfo != NULL) {
__os_free(dbenv, rep->curinfo);
rep->curinfo = NULL;
}
if (F_ISSET(rep, REP_F_INTERNAL_INIT_MASK) && force) {
RPRINT(dbenv, (dbenv, "clean up interrupted internal init"));
cleanup_failure = 0;
if ((t_ret = __rep_remove_logs(dbenv)) == 0) {
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
if ((t_ret = __rep_log_setup(dbenv,
rep, 1, DB_LOGVERSION, &lp->ready_lsn)) != 0) {
cleanup_failure = 1;
if (ret == 0)
ret = t_ret;
}
} else {
cleanup_failure = 1;
if (ret == 0)
ret = t_ret;
}
if ((t_ret = __rep_remove_by_list(dbenv,
rep->originfo, rep->nfiles)) != 0) {
cleanup_failure = 1;
if (ret == 0)
ret = t_ret;
}
/* Keep the init file (so cleanup can be retried) unless every step above succeeded. */
if (!cleanup_failure &&
(t_ret = __rep_remove_init_file(dbenv)) != 0) {
if (ret == 0)
ret = t_ret;
}
if (rep->originfo != NULL) {
__os_free(dbenv, rep->originfo);
rep->originfo = NULL;
}
}
return (ret);
}
/*
 * __rep_filedone --
 *	After a page (or page failure) for the current file is handled,
 *	run gap processing.  Once the page count exceeds max_pgno, finish
 *	the file: queue files go through __rep_queue_filedone first.  Then
 *	per-file state is cleaned up and the next file is requested.
 *	A DB_REP_PAGEDONE from gap processing means this message no longer
 *	applies and 0 is returned.  Any other gap-processing result,
 *	including an error, is not returned from here.
 */
static int
__rep_filedone(dbenv, eid, rep, msgfp, type)
	DB_ENV *dbenv;
	int eid;
	REP *rep;
	__rep_fileinfo_args *msgfp;
	u_int32_t type;
{
	__rep_fileinfo_args *rfp;
	int ret;

	if ((ret = __rep_page_gap(dbenv, rep, msgfp, type)) == DB_REP_PAGEDONE)
		return (0);

	rfp = rep->curinfo;
	RPRINT(dbenv, (dbenv, "FILEDONE: have %lu pages. Need %lu.",
	    (u_long)rep->npages, (u_long)rfp->max_pgno + 1));
	/* max_pgno is the last page number, so max_pgno + 1 pages are needed. */
	if (rep->npages <= rfp->max_pgno)
		return (0);

	if (rfp->type == (u_int32_t)DB_QUEUE) {
		ret = __rep_queue_filedone(dbenv, rep, rfp);
		if (ret != DB_REP_PAGEDONE)
			return (ret);
	}

	if ((ret = __rep_init_cleanup(dbenv, rep, 0)) != 0)
		return (ret);
	rep->curfile++;
	return (__rep_nextfile(dbenv, eid, rep));
}
/*
 * __rep_nextfile --
 *	Advance internal init.  Requests are sent to the master if one is
 *	known, otherwise to "eid".
 *	- While files remain, decode the next fileinfo record into
 *	  rep->curinfo, reset the page-tracking state and send a
 *	  REP_PAGE_REQ for it.
 *	- Once all rep->nfiles files are done, sync the cache, switch from
 *	  page recovery to log recovery, set up the log at
 *	  rep->first_lsn.file, and send a REP_LOG_REQ covering
 *	  rep->first_lsn through rep->last_lsn.
 *
 *	Called, and always returns, with the rep region lock held.  The
 *	lock is dropped only around the log setup and the log request.
 */
static int
__rep_nextfile(dbenv, eid, rep)
	DB_ENV *dbenv;
	int eid;
	REP *rep;
{
	DBT dbt;
	int ret;

	if (rep->master_id != DB_EID_INVALID)
		eid = rep->master_id;
	if (rep->curfile == rep->nfiles) {
		RPRINT(dbenv, (dbenv,
		    "NEXTFILE: have %d files. RECOVER_LOG now", rep->nfiles));
		if ((ret = __memp_sync_int(dbenv, NULL, 0,
		    DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0)
			return (ret);
		F_CLR(rep, REP_F_RECOVER_PAGE);
		F_SET(rep, REP_F_RECOVER_LOG);
		memset(&dbt, 0, sizeof(dbt));
		dbt.data = &rep->last_lsn;
		dbt.size = sizeof(rep->last_lsn);
		REP_SYSTEM_UNLOCK(dbenv);
		if ((ret = __rep_log_setup(dbenv, rep,
		    rep->first_lsn.file, rep->first_vers, NULL)) != 0) {
			/*
			 * Callers release the region lock on every return
			 * path, so it must be held again even on failure.
			 */
			REP_SYSTEM_LOCK(dbenv);
			return (ret);
		}
		RPRINT(dbenv, (dbenv,
		    "NEXTFILE: LOG_REQ from LSN [%lu][%lu] to [%lu][%lu]",
		    (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset,
		    (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
		(void)__rep_send_message(dbenv, eid,
		    REP_LOG_REQ, &rep->first_lsn, &dbt,
		    REPCTL_INIT, DB_REP_ANYWHERE);
		REP_SYSTEM_LOCK(dbenv);
		return (0);
	}

	/* Decode the next file's record; nextinfo advances past it. */
	rep->finfo = rep->nextinfo;
	if ((ret = __rep_fileinfo_read(dbenv, rep->finfo, &rep->nextinfo,
	    &rep->curinfo)) != 0) {
		RPRINT(dbenv, (dbenv,
		    "NEXTINFO: Fileinfo read: %s", db_strerror(ret)));
		return (ret);
	}
	DB_ASSERT(dbenv, rep->curinfo->pgno == 0);
	rep->ready_pg = 0;
	rep->npages = 0;
	rep->waiting_pg = PGNO_INVALID;
	rep->max_wait_pg = PGNO_INVALID;
	memset(&dbt, 0, sizeof(dbt));
	RPRINT(dbenv, (dbenv,
	    "Next file %d: pgsize %lu, maxpg %lu", rep->curinfo->filenum,
	    (u_long)rep->curinfo->pgsize, (u_long)rep->curinfo->max_pgno));
	/* The request carries the file's original marshalled record. */
	dbt.data = rep->finfo;
	dbt.size =
	    (u_int32_t)((u_int8_t *)rep->nextinfo - (u_int8_t *)rep->finfo);
	(void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
	    NULL, &dbt, 0, DB_REP_ANYWHERE);
	return (0);
}
/*
 * __rep_mpf_open --
 *	Create a memory-pool file handle in *mpfp for the database described
 *	by rfp and open it through __db_env_mpool.  __db_env_mpool needs a DB
 *	handle, so a temporary one is built on the stack carrying the type,
 *	page size, file id and flags from rfp.  On failure the mpool handle
 *	is closed and *mpfp is set to NULL.
 */
static int
__rep_mpf_open(dbenv, mpfp, rfp, flags)
	DB_ENV *dbenv;
	DB_MPOOLFILE **mpfp;
	__rep_fileinfo_args *rfp;
	u_int32_t flags;
{
	DB db;
	int ret;

	if ((ret = __memp_fcreate(dbenv, mpfp)) != 0)
		return (ret);

	/*
	 * Zero the stack handle first.  Only a handful of fields are filled
	 * in below, and any other field __db_env_mpool happens to read would
	 * otherwise be uninitialized stack memory.
	 */
	memset(&db, 0, sizeof(db));
	db.dbenv = dbenv;
	db.type = (DBTYPE)rfp->type;
	db.pgsize = rfp->pgsize;
	/* NOTE(review): assumes rfp->uid holds at least DB_FILE_ID_LEN bytes. */
	memcpy(db.fileid, rfp->uid.data, DB_FILE_ID_LEN);
	db.flags = rfp->flags;
	/* The handle must not look like an opened database. */
	F_CLR(&db, DB_AM_OPEN_CALLED);
	db.mpf = *mpfp;
	if (F_ISSET(&db, DB_AM_INMEM))
		(void)__memp_set_flags(db.mpf, DB_MPOOL_NOFILE, 1);
	if ((ret = __db_env_mpool(&db, rfp->info.data, flags)) != 0) {
		(void)__memp_fclose(db.mpf, 0);
		*mpfp = NULL;
	}
	return (ret);
}
int
__rep_pggap_req(dbenv, rep, reqfp, gapflags)
DB_ENV *dbenv;
REP *rep;
__rep_fileinfo_args *reqfp;
u_int32_t gapflags;
{
DBT max_pg_dbt;
__rep_fileinfo_args *tmpfp, t;
size_t len;
u_int32_t flags;
int alloc, ret;
ret = 0;
alloc = 0;
if (rep->curinfo == NULL)
return (0);
if (reqfp == NULL) {
if ((ret = __rep_finfo_alloc(dbenv, rep->curinfo, &tmpfp)) != 0)
return (ret);
alloc = 1;
} else {
t = *reqfp;
tmpfp = &t;
}
flags = 0;
memset(&max_pg_dbt, 0, sizeof(max_pg_dbt));
if (FLD_ISSET(gapflags, REP_GAP_FORCE))
tmpfp->pgno++;
else
tmpfp->pgno = rep->ready_pg;
max_pg_dbt.data = rep->finfo;
max_pg_dbt.size =
(u_int32_t)((u_int8_t *)rep->nextinfo - (u_int8_t *)rep->finfo);
if (rep->max_wait_pg == PGNO_INVALID ||
FLD_ISSET(gapflags, REP_GAP_FORCE | REP_GAP_REREQUEST)) {
if (rep->waiting_pg == PGNO_INVALID) {
if (FLD_ISSET(gapflags,
REP_GAP_FORCE | REP_GAP_REREQUEST))
rep->max_wait_pg = rep->curinfo->max_pgno;
else
rep->max_wait_pg = rep->ready_pg;
} else {
if (FLD_ISSET(gapflags, REP_GAP_FORCE) &&
rep->waiting_pg < tmpfp->pgno)
rep->max_wait_pg = rep->curinfo->max_pgno;
else
rep->max_wait_pg = rep->waiting_pg - 1;
}
tmpfp->max_pgno = rep->max_wait_pg;
if (FLD_ISSET(gapflags, REP_GAP_REREQUEST))
flags = DB_REP_REREQUEST;
else
flags = DB_REP_ANYWHERE;
} else {
rep->max_wait_pg = rep->ready_pg;
tmpfp->max_pgno = rep->ready_pg;
flags = DB_REP_REREQUEST;
}
if (rep->master_id != DB_EID_INVALID) {
STAT(rep->stat.st_pg_requested++);
ret = __rep_fileinfo_buf(rep->finfo, max_pg_dbt.size, &len,
tmpfp->pgsize, tmpfp->pgno, tmpfp->max_pgno,
tmpfp->filenum, tmpfp->id, tmpfp->type,
tmpfp->flags, &tmpfp->uid, &tmpfp->info);
DB_ASSERT(dbenv, len == max_pg_dbt.size);
(void)__rep_send_message(dbenv, rep->master_id,
REP_PAGE_REQ, NULL, &max_pg_dbt, 0, flags);
} else
(void)__rep_send_message(dbenv, DB_EID_BROADCAST,
REP_MASTER_REQ, NULL, NULL, 0, 0);
if (alloc)
__os_free(dbenv, tmpfp);
return (ret);
}
int
__rep_finfo_alloc(dbenv, rfpsrc, rfpp)
DB_ENV *dbenv;
__rep_fileinfo_args *rfpsrc, **rfpp;
{
__rep_fileinfo_args *rfp;
size_t size;
int ret;
void *uidp, *infop;
size = sizeof(__rep_fileinfo_args) + rfpsrc->uid.size +
rfpsrc->info.size;
if ((ret = __os_malloc(dbenv, size, &rfp)) != 0)
return (ret);
memcpy(rfp, rfpsrc, sizeof(__rep_fileinfo_args));
uidp = (u_int8_t *)rfp + sizeof(__rep_fileinfo_args);
rfp->uid.data = uidp;
memcpy(uidp, rfpsrc->uid.data, rfpsrc->uid.size);
infop = (u_int8_t *)uidp + rfpsrc->uid.size;
rfp->info.data = infop;
memcpy(infop, rfpsrc->info.data, rfpsrc->info.size);
*rfpp = rfp;
return (ret);
}
/*
 * __rep_log_setup --
 *	Start log file number `file' with log version `version' via
 *	__log_newfile, storing the LSN it reports into *lsnp when lsnp is
 *	non-NULL.  Regardless of that result, set rep->first_lsn from the
 *	log region's current LSN and clear the transaction region's last
 *	checkpoint LSN.  Returns the __log_newfile result.
 */
static int
__rep_log_setup(dbenv, rep, file, version, lsnp)
	DB_ENV *dbenv;
	REP *rep;
	u_int32_t file;
	u_int32_t version;
	DB_LSN *lsnp;
{
	DB_LOG *dblp;
	DB_LSN lsn;
	DB_TXNMGR *mgr;
	DB_TXNREGION *region;
	LOG *lp;
	int ret;

	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;
	mgr = dbenv->tx_handle;
	region = mgr->reginfo.primary;

	LOG_SYSTEM_LOCK(dbenv);
	if ((ret = __log_newfile(dblp, &lsn, file, version)) == 0 &&
	    lsnp != NULL)
		*lsnp = lsn;
	/*
	 * lp->lsn is log region state, so snapshot it while the log region
	 * lock is still held rather than racing other log updates after the
	 * unlock.  NOTE(review): presumably this is the start of the file
	 * __log_newfile just created -- confirm.
	 */
	rep->first_lsn = lp->lsn;
	LOG_SYSTEM_UNLOCK(dbenv);

	/* Clear the last checkpoint LSN under the txn region lock. */
	TXN_SYSTEM_LOCK(dbenv);
	ZERO_LSN(region->last_ckp);
	TXN_SYSTEM_UNLOCK(dbenv);
	return (ret);
}
/*
 * __rep_queue_filedone --
 *	Called once all pages 0..max_pgno of a DB_QUEUE file have arrived.
 *	Opens the queue (kept in rep->queue_dbp across calls) and asks
 *	__queue_pageinfo for its first/last pages to decide whether another
 *	range of pages must be requested.
 *
 *	Returns:
 *	  DB_REP_PAGEDONE - nothing more to request; rep->queue_dbp closed.
 *	  result of __rep_pggap_req - a new page request was issued;
 *	      rep->queue_dbp is left open for the next call.
 *	  other error - rep->queue_dbp closed.
 *	Without HAVE_QUEUE it returns __db_no_queue_am().
 */
static int
__rep_queue_filedone(dbenv, rep, rfp)
DB_ENV *dbenv;
REP *rep;
__rep_fileinfo_args *rfp;
{
#ifndef HAVE_QUEUE
COMPQUIET(rep, NULL);
COMPQUIET(rfp, NULL);
return (__db_no_queue_am(dbenv));
#else
db_pgno_t first, last;
u_int32_t flags;
int empty, ret, t_ret;
ret = 0;
/*
 * First call for this file: sync the buffer cache (presumably so the
 * pages received so far are visible through the new handle -- confirm),
 * then open the queue.  In-memory databases are opened by
 * database name (second name argument) rather than by file name.
 */
if (rep->queue_dbp == NULL) {
if ((ret = __memp_sync_int(dbenv, NULL, 0,
DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0)
goto out;
if ((ret =
__db_create_internal(&rep->queue_dbp, dbenv, 0)) != 0)
goto out;
flags = DB_NO_AUTO_COMMIT |
(F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0);
if ((ret = __db_open(rep->queue_dbp, NULL,
FLD_ISSET(rfp->flags, DB_AM_INMEM) ? NULL : rfp->info.data,
FLD_ISSET(rfp->flags, DB_AM_INMEM) ? rfp->info.data : NULL,
DB_QUEUE, flags, 0, PGNO_BASE_MD)) != 0)
goto out;
}
if ((ret = __queue_pageinfo(rep->queue_dbp,
&first, &last, &empty, 0, 0)) != 0)
goto out;
RPRINT(dbenv, (dbenv,
"Queue fileinfo: first %lu, last %lu, empty %d",
(u_long)first, (u_long)last, empty));
/*
 * max_pgno == 0: the "First req" case.  An empty queue needs nothing
 * more.  If first > last the range presumably wraps, so ask up to the
 * page that holds record number UINT32_MAX; otherwise up to `last'.
 */
if (rfp->max_pgno == 0) {
if (empty)
goto out;
if (first > last) {
rfp->max_pgno =
QAM_RECNO_PAGE(rep->queue_dbp, UINT32_MAX);
} else
rfp->max_pgno = last;
RPRINT(dbenv, (dbenv,
"Queue fileinfo: First req: first %lu, last %lu",
(u_long)first, (u_long)rfp->max_pgno));
goto req;
/*
 * The previously requested range did not end at `last': the "Wrap req"
 * case, which requests pages 1..last.
 */
} else if (rfp->max_pgno != last) {
first = 1;
rfp->max_pgno = last;
RPRINT(dbenv, (dbenv,
"Queue fileinfo: Wrap req: first %lu, last %lu",
(u_long)first, (u_long)last));
/*
 * Both request cases land here: reset the page bookkeeping to cover
 * first..max_pgno, issue the request and return with the queue handle
 * still open.
 */
req:
rep->npages = first;
rep->ready_pg = first;
rep->waiting_pg = rfp->max_pgno + 1;
rep->max_wait_pg = PGNO_INVALID;
ret = __rep_pggap_req(dbenv, rep, rfp, 0);
return (ret);
}
/* Done with this file (or failed): close the handle and report. */
out:
if (rep->queue_dbp != NULL &&
(t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC)) != 0 &&
ret == 0)
ret = t_ret;
rep->queue_dbp = NULL;
if (ret == 0)
ret = DB_REP_PAGEDONE;
return (ret);
#endif
}
/*
 * __rep_remove_init_file --
 *	Remove the REP_INITNAME marker file.  Only a failure to build its
 *	path is reported; the result of the unlink itself is deliberately
 *	ignored.
 */
int
__rep_remove_init_file(dbenv)
	DB_ENV *dbenv;
{
	char *path;
	int ret;

	ret = __db_appname(dbenv, DB_APP_NONE, REP_INITNAME, 0, NULL, &path);
	if (ret == 0) {
		(void)__os_unlink(dbenv, path);
		__os_free(dbenv, path);
	}
	return (ret);
}
/*
 * __rep_reset_init --
 *	Clean up after an interrupted internal init recorded in the
 *	REP_INITNAME marker file.
 *
 *	- If there is no marker file, this does nothing and returns 0.
 *	- If the marker holds no intact file list, only the marker is
 *	  removed.
 *	- Otherwise every log file (LFPREFIX names in the log directory, or
 *	  in db_home when none is configured) is removed, then every
 *	  database file named in the saved list, and finally the marker.
 *
 *	On error the marker file is left in place.
 */
int
__rep_reset_init(dbenv)
	DB_ENV *dbenv;
{
	DB_FH *fhp;
	DBT flist;
	__rep_update_args *upd;
	char *dir, *dir_mem, *marker;
	void *files;
	int ret, t_ret;

	upd = NULL;
	dir_mem = NULL;
	flist.data = NULL;

	if ((ret = __db_appname(
	    dbenv, DB_APP_NONE, REP_INITNAME, 0, NULL, &marker)) != 0)
		return (ret);

	ret = __os_open(dbenv, marker, 0, DB_OSO_RDONLY,
	    __db_omode(OWNER_RW), &fhp);
	if (ret == ENOENT) {
		/* No marker file: nothing was interrupted. */
		ret = 0;
		goto done;
	}
	if (ret != 0)
		goto done;

	RPRINT(dbenv, (dbenv, "Cleaning up interrupted internal init"));

	/* Read the saved list, then always close the marker handle. */
	ret = __rep_get_file_list(dbenv, fhp, &flist);
	t_ret = __os_closehandle(dbenv, fhp);
	if (ret == 0)
		ret = t_ret;
	if (ret != 0)
		goto done;

	if (flist.data != NULL) {
		if (dbenv->db_log_dir != NULL) {
			if ((ret = __db_appname(dbenv, DB_APP_NONE,
			    dbenv->db_log_dir, 0, NULL, &dir)) != 0)
				goto done;
			dir_mem = dir;
		} else
			dir = dbenv->db_home;

		if ((ret = __rep_remove_by_prefix(dbenv,
		    dir, LFPREFIX, sizeof(LFPREFIX)-1, DB_APP_LOG)) != 0)
			goto done;
		if ((ret = __rep_update_read(dbenv,
		    flist.data, &files, &upd)) != 0)
			goto done;
		if ((ret = __rep_remove_by_list(dbenv,
		    files, upd->num_files)) != 0)
			goto done;
	}
	(void)__os_unlink(dbenv, marker);

done:	if (upd != NULL)
		__os_free(dbenv, upd);
	if (dir_mem != NULL)
		__os_free(dbenv, dir_mem);
	if (flist.data != NULL)
		__os_free(dbenv, flist.data);
	__os_free(dbenv, marker);
	return (ret);
}
/*
 * __rep_get_file_list --
 *	Read the file list stored in the internal-init marker file open on
 *	fhp.  The file holds up to two records, each a raw u_int32_t length
 *	(read in native byte order, no swapping) followed by that many bytes.
 *	On success dbt->data (allocated here) and dbt->size describe the
 *	last record read.
 *
 *	If the file is empty or any record is short, this returns 0 with
 *	dbt->data == NULL, meaning "no intact list"; the caller must check
 *	dbt->data.  Read and allocation errors are returned with
 *	dbt->data == NULL.
 *
 *	NOTE(review): the second record is read into the same (realloc'd)
 *	buffer as the first, so a truncated second record discards an intact
 *	first record as well.
 */
static int
__rep_get_file_list(dbenv, fhp, dbt)
DB_ENV *dbenv;
DB_FH *fhp;
DBT *dbt;
{
u_int32_t length;
size_t cnt;
int i, ret;
dbt->data = NULL;
/* At most two records are read. */
for (i = 1; i <= 2; i++) {
if ((ret = __os_read(dbenv,
fhp, &length, sizeof(length), &cnt)) != 0)
goto err;
/* Clean EOF after at least one record: keep what we have. */
if (cnt == 0 && dbt->data != NULL)
break;
/* Empty file or short length read: falls to err with ret == 0. */
if (cnt != sizeof(length))
goto err;
if ((ret = __os_realloc(dbenv,
(size_t)length, &dbt->data)) != 0)
goto err;
/* A short body read also reaches err with ret == 0. */
if ((ret = __os_read(
dbenv, fhp, dbt->data, length, &cnt)) != 0 ||
cnt != (size_t)length)
goto err;
}
/* On the EOF break, a zero-byte read left `length' from the last record. */
dbt->size = length;
return (0);
err:
/* ret may legitimately be 0 here: NULL data tells the caller "no list". */
if (dbt->data != NULL)
__os_free(dbenv, dbt->data);
dbt->data = NULL;
return (ret);
}
/*
 * __rep_remove_by_prefix --
 *	Unlink every entry of directory `dir' whose name begins with the
 *	first pref_len bytes of `prefix'.  Each entry's path is resolved
 *	with __db_appname using `appname'.  Unlink failures are ignored; a
 *	directory-listing or path-resolution error stops the scan and is
 *	returned.
 */
static int
__rep_remove_by_prefix(dbenv, dir, prefix, pref_len, appname)
	DB_ENV *dbenv;
	const char *dir;
	const char *prefix;
	size_t pref_len;
	APPNAME appname;
{
	char **entries, *path;
	int i, nentries, ret;

	if ((ret = __os_dirlist(dbenv, dir, &entries, &nentries)) != 0)
		return (ret);

	for (i = 0; ret == 0 && i < nentries; i++) {
		if (strncmp(entries[i], prefix, pref_len) != 0)
			continue;
		ret = __db_appname(dbenv, appname, entries[i], 0, NULL, &path);
		if (ret == 0) {
			(void)__os_unlink(dbenv, path);
			__os_free(dbenv, path);
		}
	}

	__os_dirfree(dbenv, entries, nentries);
	return (ret);
}
/*
 * __rep_remove_by_list --
 *	Unlink the `count' database files described by the serialized file
 *	info records starting at `filelist' (names resolved in DB_APP_DATA).
 *	Then remove every QUEUE_EXTENT_PREFIX file from each configured data
 *	directory, or from db_home when no data directories are set.  Unlink
 *	failures are ignored; decode, path-resolution or directory-listing
 *	errors stop the work and are returned.
 */
static int
__rep_remove_by_list(dbenv, filelist, count)
	DB_ENV *dbenv;
	void *filelist;
	u_int32_t count;
{
	__rep_fileinfo_args *fi;
	char **dirp, *path;
	int ret;

	fi = NULL;
	ret = 0;

	/* Each decode advances filelist past the record just read. */
	for (; count > 0; count--) {
		if ((ret = __rep_fileinfo_read(dbenv,
		    filelist, &filelist, &fi)) != 0)
			goto done;
		if ((ret = __db_appname(dbenv,
		    DB_APP_DATA, fi->info.data, 0, NULL, &path)) != 0)
			goto done;
		(void)__os_unlink(dbenv, path);
		__os_free(dbenv, path);
		__os_free(dbenv, fi);
		fi = NULL;
	}

	/* Queue extent files are not in the list; remove them by prefix. */
	if (dbenv->db_data_dir == NULL)
		ret = __rep_remove_by_prefix(dbenv, dbenv->db_home,
		    QUEUE_EXTENT_PREFIX, sizeof(QUEUE_EXTENT_PREFIX) - 1,
		    DB_APP_DATA);
	else
		for (dirp = dbenv->db_data_dir;
		    ret == 0 && *dirp != NULL; dirp++) {
			if ((ret = __db_appname(dbenv, DB_APP_NONE,
			    *dirp, 0, NULL, &path)) != 0)
				break;
			ret = __rep_remove_by_prefix(dbenv, path,
			    QUEUE_EXTENT_PREFIX, sizeof(QUEUE_EXTENT_PREFIX)-1,
			    DB_APP_DATA);
			__os_free(dbenv, path);
		}

done:
	if (fi != NULL)
		__os_free(dbenv, fi);
	return (ret);
}