/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2001-2002
 *	Sleepycat Software.  All rights reserved.
 */
#include "db_config.h"

#ifndef lint
static const char revid[] = "$Id: rep_record.c,v 1.1.1.1 2003/02/15 04:56:12 zarzycki Exp $";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <stdlib.h>
#include <string.h>
#endif

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/log.h"
#include "dbinc/rep.h"
#include "dbinc/txn.h"

static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *));
static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
static int __rep_lsn_cmp __P((const void *, const void *));
static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *));

/*
 * A "simple" log record type is one that needs no processing beyond being
 * written to the log: anything other than a txn_regop, a checkpoint or a
 * dbreg_register record.
 */
#define	IS_SIMPLE(R)	((R) != DB___txn_regop && \
    (R) != DB___txn_ckp && (R) != DB___dbreg_register)

/*
 * __rep_process_message --
 *
 * This routine takes an incoming message and processes it.
 *
 * control: contains the control fields from the record
 * rec: contains the actual record
 * eidp: contains the machine id of the sender of the message;
 *	in the case of a DB_NEWMASTER message, returns the eid
 *	of the new master.
 *
 * Returns 0 when the message was handled (or deliberately ignored) and
 * nothing needs to be reported, EINVAL for a missing control argument, a
 * version mismatch or an unknown message type, one of DB_REP_HOLDELECTION,
 * DB_REP_NEWSITE, DB_REP_DUPMASTER, DB_REP_OUTDATED or DB_REP_NEWMASTER
 * when the corresponding case below returns it, or whatever error the
 * underlying log, send or apply routine returned.
 *
 * PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *));
 */
int
__rep_process_message(dbenv, control, rec, eidp)
	DB_ENV *dbenv;
	DBT *control, *rec;
	int *eidp;
{
	DB_LOG *dblp;
	DB_LOGC *logc;
	DB_LSN init_lsn, lsn, newfilelsn, oldfilelsn;
	DB_REP *db_rep;
	DBT *d, data_dbt, lsndbt, mylog;
	LOG *lp;
	REP *rep;
	REP_CONTROL *rp;
	REP_VOTE_INFO *vi;
	u_int32_t bytes, gen, gbytes, type, unused;
	int check_limit, cmp, done, do_req, i;
	int master, old, recovering, ret, t_ret, *tally;

	PANIC_CHECK(dbenv);
	/* Label the check with this interface's name, not rep_stat's. */
	ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle,
	    "rep_process_message", DB_INIT_TXN);

	/* Control argument must be non-Null. */
	if (control == NULL || control->size == 0) {
		__db_err(dbenv,
	"DB_ENV->rep_process_message: control argument must be specified");
		return (EINVAL);
	}

	ret = 0;
	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;

	/* Snapshot the generation and recovery state under the rep mutex. */
	MUTEX_LOCK(dbenv, db_rep->mutexp);
	gen = rep->gen;
	recovering = F_ISSET(rep, REP_F_RECOVER);

	rep->stat.st_msgs_processed++;
	MUTEX_UNLOCK(dbenv, db_rep->mutexp);

	rp = (REP_CONTROL *)control->data;

#if 0
	__rep_print_message(dbenv, *eidp, rp, "rep_process_message");
#endif

	/* Complain if we see an improper version number. */
	if (rp->rep_version != DB_REPVERSION) {
		__db_err(dbenv,
		    "unexpected replication message version %d, expected %d",
		    rp->rep_version, DB_REPVERSION);
		return (EINVAL);
	}
	if (rp->log_version != DB_LOGVERSION) {
		__db_err(dbenv,
		    "unexpected log record version %d, expected %d",
		    rp->log_version, DB_LOGVERSION);
		return (EINVAL);
	}

	/*
	 * Check for generation number matching.  Ignore any old messages
	 * except requests that are indicative of a new client that needs
	 * to get in sync.
	 */
	if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
	    rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ) {
		/*
		 * We don't hold the rep mutex, and could miscount if we race.
		 */
		rep->stat.st_msgs_badgen++;
		return (0);
	}

	/*
	 * A message from a newer generation that doesn't itself tell us who
	 * the master is: ask everyone who the master is.
	 */
	if (rp->gen > gen && rp->rectype != REP_ALIVE &&
	    rp->rectype != REP_NEWMASTER)
		return (__rep_send_message(dbenv,
		    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0));

	/*
	 * We need to check if we're in recovery and if we are
	 * then we need to ignore any messages except VERIFY, VOTE,
	 * ELECT (the master might fail while we are recovering), and
	 * ALIVE_REQ.
	 */
	if (recovering)
		switch(rp->rectype) {
		case REP_ALIVE:
		case REP_ALIVE_REQ:
		case REP_ELECT:
		case REP_NEWCLIENT:
		case REP_NEWMASTER:
		case REP_NEWSITE:
		case REP_VERIFY:
			/*
			 * Only let the message through if its LSN is the
			 * one we are currently trying to verify.
			 */
			R_LOCK(dbenv, &dblp->reginfo);
			cmp = log_compare(&lp->verify_lsn, &rp->lsn);
			R_UNLOCK(dbenv, &dblp->reginfo);
			if (cmp != 0)
				goto skip;
			/* FALLTHROUGH */
		case REP_VOTE1:
		case REP_VOTE2:
			break;
		default:
skip:			/*
			 * We don't hold the rep mutex, and could
			 * miscount if we race.
			 */
			rep->stat.st_msgs_recover++;

			/* Check for need to retransmit. */
			R_LOCK(dbenv, &dblp->reginfo);
			do_req = *eidp == rep->master_id &&
			    ++lp->rcvd_recs >= lp->wait_recs;
			if (do_req) {
				/*
				 * Double the wait interval, but never let
				 * it exceed max_gap.
				 */
				lp->wait_recs *= 2;
				if (lp->wait_recs > rep->max_gap)
					lp->wait_recs = rep->max_gap;
				lp->rcvd_recs = 0;
				lsn = lp->verify_lsn;
			}
			R_UNLOCK(dbenv, &dblp->reginfo);
			if (do_req)
				ret = __rep_send_message(dbenv, *eidp,
				    REP_VERIFY_REQ, &lsn, NULL, 0);

			return (ret);
		}

	switch(rp->rectype) {
	case REP_ALIVE:
		ANYSITE(dbenv);
		if (rp->gen > gen && rp->flags)
			return (__rep_new_master(dbenv, rp, *eidp));
		break;
	case REP_ALIVE_REQ:
		ANYSITE(dbenv);
		dblp = dbenv->lg_handle;
		R_LOCK(dbenv, &dblp->reginfo);
		lsn = ((LOG *)dblp->reginfo.primary)->lsn;
		R_UNLOCK(dbenv, &dblp->reginfo);
		return (__rep_send_message(dbenv,
		    *eidp, REP_ALIVE, &lsn, NULL,
		    F_ISSET(dbenv, DB_ENV_REP_MASTER) ? 1 : 0));
	case REP_ALL_REQ:
		MASTER_ONLY(dbenv);
		gbytes = bytes = 0;
		MUTEX_LOCK(dbenv, db_rep->mutexp);
		gbytes = rep->gbytes;
		bytes = rep->bytes;
		MUTEX_UNLOCK(dbenv, db_rep->mutexp);
		/* A limit of 0/0 means no throttling. */
		check_limit = gbytes != 0 || bytes != 0;
		if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
			return (ret);
		memset(&data_dbt, 0, sizeof(data_dbt));
		oldfilelsn = lsn = rp->lsn;
		type = REP_LOG;
		for (ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
		    ret == 0 && type == REP_LOG;
		    ret = logc->get(logc, &lsn, &data_dbt, DB_NEXT)) {
			/*
			 * lsn.offset will only be 0 if this is the
			 * beginning of the log;  DB_SET, but not DB_NEXT,
			 * can set the log cursor to [n][0].
			 */
			if (lsn.offset == 0)
				ret = __rep_send_message(dbenv, *eidp,
				    REP_NEWFILE, &lsn, NULL, 0);
			else {
				/*
				 * DB_NEXT will never run into offsets
				 * of 0;  thus, when a log file changes,
				 * we'll have a real log record with
				 * some lsn [n][m], and we'll also want to send
				 * a NEWFILE message with lsn [n][0].
				 * So that the client can detect gaps,
				 * send in the rec parameter the
				 * last LSN in the old file.
				 */
				if (lsn.file != oldfilelsn.file) {
					newfilelsn.file = lsn.file;
					newfilelsn.offset = 0;

					memset(&lsndbt, 0, sizeof(DBT));
					lsndbt.size = sizeof(DB_LSN);
					lsndbt.data = &oldfilelsn;

					if ((ret = __rep_send_message(dbenv,
					    *eidp, REP_NEWFILE, &newfilelsn,
					    &lsndbt, 0)) != 0)
						break;
				}
				if (check_limit) {
					/*
					 * data_dbt.size is only the size of
					 * the log record;  it doesn't count
					 * the size of the control structure.
					 * Factor that in as well so we're
					 * not off by a lot if our log
					 * records are small.
					 */
					while (bytes < data_dbt.size +
					    sizeof(REP_CONTROL)) {
						if (gbytes > 0) {
							bytes += GIGABYTE;
							--gbytes;
							continue;
						}
						/*
						 * We don't hold the rep mutex,
						 * and may miscount.
						 */
						rep->stat.st_nthrottles++;
						/*
						 * Out of budget: send this
						 * record as LOG_MORE, which
						 * also ends the loop.
						 */
						type = REP_LOG_MORE;
						goto send;
					}
					bytes -= (data_dbt.size +
					    sizeof(REP_CONTROL));
				}
send:				ret = __rep_send_message(dbenv, *eidp,
				    type, &lsn, &data_dbt, 0);
			}

			/*
			 * In case we're about to change files and need it
			 * for a NEWFILE message, save the current LSN.
			 */
			oldfilelsn = lsn;
		}

		/* Running off the end of the log is the normal way out. */
		if (ret == DB_NOTFOUND)
			ret = 0;
		if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
			ret = t_ret;
		return (ret);
	case REP_ELECT:
		if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
			R_LOCK(dbenv, &dblp->reginfo);
			lsn = lp->lsn;
			R_UNLOCK(dbenv, &dblp->reginfo);
			MUTEX_LOCK(dbenv, db_rep->mutexp);
			rep->gen++;
			MUTEX_UNLOCK(dbenv, db_rep->mutexp);
			return (__rep_send_message(dbenv,
			    *eidp, REP_NEWMASTER, &lsn, NULL, 0));
		}
		MUTEX_LOCK(dbenv, db_rep->mutexp);
		ret = IN_ELECTION(rep) ? 0 : DB_REP_HOLDELECTION;
		MUTEX_UNLOCK(dbenv, db_rep->mutexp);
		return (ret);
#ifdef NOTYET
	case REP_FILE: /* TODO */
		CLIENT_ONLY(dbenv);
		break;
	case REP_FILE_REQ:
		MASTER_ONLY(dbenv);
		return (__rep_send_file(dbenv, rec, *eidp));
		break;
#endif
	case REP_LOG:
	case REP_LOG_MORE:
		CLIENT_ONLY(dbenv);
		if ((ret = __rep_apply(dbenv, rp, rec)) != 0)
			return (ret);
		if (rp->rectype == REP_LOG_MORE) {
			/*
			 * master_id lives in the rep region; read it under
			 * the rep mutex, as every other reader in this file
			 * does, rather than the rep database mutex.
			 */
			MUTEX_LOCK(dbenv, db_rep->mutexp);
			master = rep->master_id;
			MUTEX_UNLOCK(dbenv, db_rep->mutexp);
			R_LOCK(dbenv, &dblp->reginfo);
			lsn = lp->lsn;
			R_UNLOCK(dbenv, &dblp->reginfo);
			/* The master throttled; ask for the rest. */
			ret = __rep_send_message(dbenv, master,
			    REP_ALL_REQ, &lsn, NULL, 0);
		}
		return (ret);
	case REP_LOG_REQ:
		MASTER_ONLY(dbenv);
		if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
			return (ret);
		memset(&data_dbt, 0, sizeof(data_dbt));
		lsn = rp->lsn;

		/*
		 * There are three different cases here.
		 * 1. We asked for a particular LSN and got it.
		 * 2. We asked for an LSN of X,0 which is invalid and got the
		 *	first log record in a particular file.
		 * 3. We asked for an LSN and it's not found because it is
		 *	beyond the end of a log file and we need a NEWFILE msg.
		 */
		ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
		cmp = log_compare(&lsn, &rp->lsn);

		if (ret == 0 && cmp == 0) /* Case 1 */
			ret = __rep_send_message(dbenv, *eidp,
			    REP_LOG, &rp->lsn, &data_dbt, 0);
		else if (ret == DB_NOTFOUND ||
		    (ret == 0 && cmp < 0 && rp->lsn.offset == 0))
			/* Cases 2 and 3: Send a NEWFILE message. */
			ret = __rep_send_message(dbenv, *eidp,
			    REP_NEWFILE, &lsn, NULL, 0);

		if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
			ret = t_ret;
		return (ret);
	case REP_NEWSITE:
		/* We don't hold the rep mutex, and may miscount. */
		rep->stat.st_newsites++;

		/* This is a rebroadcast; simply tell the application. */
		if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
			dblp = dbenv->lg_handle;
			lp = dblp->reginfo.primary;
			R_LOCK(dbenv, &dblp->reginfo);
			lsn = lp->lsn;
			R_UNLOCK(dbenv, &dblp->reginfo);
			(void)__rep_send_message(dbenv,
			    *eidp, REP_NEWMASTER, &lsn, NULL, 0);
		}
		return (DB_REP_NEWSITE);
	case REP_NEWCLIENT:
		/*
		 * This message was received and should have resulted in the
		 * application entering the machine ID in its machine table.
		 * We respond to this with an ALIVE to send relevant information
		 * to the new client.  But first, broadcast the new client's
		 * record to all the clients.
		 */
		if ((ret = __rep_send_message(dbenv,
		    DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0)) != 0)
			return (ret);

		if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
			return (0);

		/* FALLTHROUGH */
	case REP_MASTER_REQ:
		ANYSITE(dbenv);
		if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
			return (0);
		dblp = dbenv->lg_handle;
		lp = dblp->reginfo.primary;
		R_LOCK(dbenv, &dblp->reginfo);
		lsn = lp->lsn;
		R_UNLOCK(dbenv, &dblp->reginfo);
		return (__rep_send_message(dbenv,
		    *eidp, REP_NEWMASTER, &lsn, NULL, 0));
	case REP_NEWFILE:
		CLIENT_ONLY(dbenv);
		return (__rep_apply(dbenv, rp, rec));
	case REP_NEWMASTER:
		ANYSITE(dbenv);
		if (F_ISSET(dbenv, DB_ENV_REP_MASTER) &&
		    *eidp != dbenv->rep_eid) {
			/* We don't hold the rep mutex, and may miscount. */
			rep->stat.st_dupmasters++;
			return (DB_REP_DUPMASTER);
		}
		return (__rep_new_master(dbenv, rp, *eidp));
	case REP_PAGE: /* TODO */
		CLIENT_ONLY(dbenv);
		break;
	case REP_PAGE_REQ: /* TODO */
		MASTER_ONLY(dbenv);
		break;
	case REP_PLIST: /* TODO */
		CLIENT_ONLY(dbenv);
		break;
	case REP_PLIST_REQ: /* TODO */
		MASTER_ONLY(dbenv);
		break;
	case REP_VERIFY:
		CLIENT_ONLY(dbenv);
		DB_ASSERT((F_ISSET(rep, REP_F_RECOVER) &&
		    !IS_ZERO_LSN(lp->verify_lsn)) ||
		    (!F_ISSET(rep, REP_F_RECOVER) &&
		    IS_ZERO_LSN(lp->verify_lsn)));
		/* Nothing is being verified; ignore the message. */
		if (IS_ZERO_LSN(lp->verify_lsn))
			return (0);

		if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
			return (ret);
		memset(&mylog, 0, sizeof(mylog));
		if ((ret = logc->get(logc, &rp->lsn, &mylog, DB_SET)) != 0)
			goto rep_verify_err;
		if (mylog.size == rec->size &&
		    memcmp(mylog.data, rec->data, rec->size) == 0) {
			/*
			 * If we're a logs-only client, we can simply truncate
			 * the log to the point where it last agreed with the
			 * master's;  otherwise, recover to that point.
			 */
			R_LOCK(dbenv, &dblp->reginfo);
			ZERO_LSN(lp->verify_lsn);
			R_UNLOCK(dbenv, &dblp->reginfo);
			if (F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) {
				INIT_LSN(init_lsn);
				if ((ret = dbenv->log_flush(dbenv,
				    &rp->lsn)) != 0 ||
				    (ret = __log_vtruncate(dbenv,
				    &rp->lsn, &init_lsn)) != 0)
					goto rep_verify_err;
			} else if ((ret = __db_apprec(dbenv, &rp->lsn, 0)) != 0)
				goto rep_verify_err;

			/*
			 * The log has been truncated (either by __db_apprec or
			 * directly).  We want to make sure we're waiting for
			 * the LSN at the new end-of-log, not some later point.
			 */
			R_LOCK(dbenv, &dblp->reginfo);
			lp->ready_lsn = lp->lsn;
			ZERO_LSN(lp->waiting_lsn);
			R_UNLOCK(dbenv, &dblp->reginfo);

			/*
			 * Discard any log records we have queued;  we're
			 * about to re-request them, and can't trust the
			 * ones in the queue.
			 */
			MUTEX_LOCK(dbenv, db_rep->db_mutexp);
			if ((ret = db_rep->rep_db->truncate(db_rep->rep_db,
			    NULL, &unused, 0)) != 0) {
				MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
				goto rep_verify_err;
			}
			rep->stat.st_log_queued = 0;
			MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);

			MUTEX_LOCK(dbenv, db_rep->mutexp);
			F_CLR(rep, REP_F_RECOVER);

			/*
			 * If the master_id is invalid, this means that since
			 * the last record was sent, somebody declared an
			 * election and we may not have a master to request
			 * things of.
			 *
			 * This is not an error;  when we find a new master,
			 * we'll re-negotiate where the end of the log is and
			 * try to bring ourselves up to date again anyway.
			 */
			if ((master = rep->master_id) == DB_EID_INVALID) {
				DB_ASSERT(IN_ELECTION(rep));
				MUTEX_UNLOCK(dbenv, db_rep->mutexp);
				ret = 0;
			} else {
				MUTEX_UNLOCK(dbenv, db_rep->mutexp);
				ret = __rep_send_message(dbenv, master,
				    REP_ALL_REQ, &rp->lsn, NULL, 0);
			}
		} else if ((ret =
		    logc->get(logc, &lsn, &mylog, DB_PREV)) == 0) {
			/*
			 * The records differ: step back one record and ask
			 * the sender to verify that one instead.
			 */
			R_LOCK(dbenv, &dblp->reginfo);
			lp->verify_lsn = lsn;
			lp->rcvd_recs = 0;
			lp->wait_recs = rep->request_gap;
			R_UNLOCK(dbenv, &dblp->reginfo);
			ret = __rep_send_message(dbenv,
			    *eidp, REP_VERIFY_REQ, &lsn, NULL, 0);
		}

rep_verify_err:	if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
			ret = t_ret;
		return (ret);
	case REP_VERIFY_FAIL:
		rep->stat.st_outdated++;
		return (DB_REP_OUTDATED);
	case REP_VERIFY_REQ:
		MASTER_ONLY(dbenv);
		type = REP_VERIFY;
		if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
			return (ret);
		d = &data_dbt;
		memset(d, 0, sizeof(data_dbt));
		F_SET(logc, DB_LOG_SILENT_ERR);
		ret = logc->get(logc, &rp->lsn, d, DB_SET);
		/*
		 * If the LSN was invalid, then we might get a not
		 * found, we might get an EIO, we could get anything.
		 * If we get a DB_NOTFOUND, then there is a chance that
		 * the LSN comes before the first file present in which
		 * case we need to return a fail so that the client can return
		 * a DB_OUTDATED.
		 */
		if (ret == DB_NOTFOUND &&
		    __log_is_outdated(dbenv, rp->lsn.file, &old) == 0 &&
		    old != 0)
			type = REP_VERIFY_FAIL;

		/* On any failed get, reply without a record. */
		if (ret != 0)
			d = NULL;

		ret = __rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0);
		if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
			ret = t_ret;
		return (ret);
	case REP_VOTE1:
		if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
				__db_err(dbenv, "Master received vote");
#endif
			R_LOCK(dbenv, &dblp->reginfo);
			lsn = lp->lsn;
			R_UNLOCK(dbenv, &dblp->reginfo);
			return (__rep_send_message(dbenv,
			    *eidp, REP_NEWMASTER, &lsn, NULL, 0));
		}

		vi = (REP_VOTE_INFO *)rec->data;
		MUTEX_LOCK(dbenv, db_rep->mutexp);

		/*
		 * If you get a vote and you're not in an election, simply
		 * return an indicator to hold an election which will trigger
		 * this site to send its vote again.
		 */
		if (!IN_ELECTION(rep)) {
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
				__db_err(dbenv,
				    "Not in election, but received vote1");
#endif
			ret = DB_REP_HOLDELECTION;
			goto unlock;
		}

		/* Phase 1 is already over; late VOTE1s are ignored. */
		if (F_ISSET(rep, REP_F_EPHASE2))
			goto unlock;

		/* Check if this site knows about more sites than we do. */
		if (vi->nsites > rep->nsites)
			rep->nsites = vi->nsites;

		/* Check if we've heard from this site already. */
		tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
		for (i = 0; i < rep->sites; i++) {
			if (tally[i] == *eidp)
				/* Duplicate vote. */
				goto unlock;
		}

		/*
		 * We are keeping vote, let's see if that changes our count of
		 * the number of sites.
		 */
		if (rep->sites + 1 > rep->nsites)
			rep->nsites = rep->sites + 1;
		if (rep->nsites > rep->asites) {
			if ((ret = __rep_grow_sites(dbenv, rep->nsites)) != 0)
				goto unlock;
			/*
			 * Growing the site table may have moved the tally
			 * array (presumably updating rep->tally_off -- the
			 * grow routine is not in this file), so re-derive
			 * the pointer rather than write through a possibly
			 * stale one.
			 */
			tally =
			    R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
		}

		tally[rep->sites] = *eidp;
		rep->sites++;

		/*
		 * Change winners if the incoming record has a higher
		 * priority, or an equal priority but a larger LSN, or
		 * an equal priority and LSN but higher "tiebreaker" value.
		 */
#ifdef DIAGNOSTIC
		if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
			__db_err(dbenv,
			    "%s(eid)%d (pri)%d (gen)%d (sites)%d [%d,%d]",
			    "Existing vote: ",
			    rep->winner, rep->w_priority, rep->w_gen,
			    rep->sites, rep->w_lsn.file, rep->w_lsn.offset);
			__db_err(dbenv,
			    "Incoming vote: (eid)%d (pri)%d (gen)%d [%d,%d]",
			    *eidp, vi->priority, rp->gen, rp->lsn.file,
			    rp->lsn.offset);
		}
#endif
		cmp = log_compare(&rp->lsn, &rep->w_lsn);
		if (vi->priority > rep->w_priority ||
		    (vi->priority != 0 && vi->priority == rep->w_priority &&
		    (cmp > 0 ||
		    (cmp == 0 && vi->tiebreaker > rep->w_tiebreaker)))) {
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
				__db_err(dbenv, "Accepting new vote");
#endif
			rep->winner = *eidp;
			rep->w_priority = vi->priority;
			rep->w_lsn = rp->lsn;
			rep->w_gen = rp->gen;
			/*
			 * Remember the new winner's tiebreaker too, so that
			 * later ties are compared against the current
			 * winner and not a previous one.
			 */
			rep->w_tiebreaker = vi->tiebreaker;
		}
		master = rep->winner;
		lsn = rep->w_lsn;
		/* Phase 1 ends once every known site has voted. */
		done = rep->sites == rep->nsites && rep->w_priority != 0;
		if (done) {
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
				__db_err(dbenv, "Phase1 election done");
				__db_err(dbenv, "Voting for %d%s",
				    master, master == rep->eid ? "(self)" : "");
			}
#endif
			F_CLR(rep, REP_F_EPHASE1);
			F_SET(rep, REP_F_EPHASE2);
		}

		/* We are the winner: count our own vote, nothing to send. */
		if (done && master == rep->eid) {
			rep->votes++;
			MUTEX_UNLOCK(dbenv, db_rep->mutexp);
			return (0);
		}
		MUTEX_UNLOCK(dbenv, db_rep->mutexp);

		/* Vote for someone else. */
		if (done)
			return (__rep_send_message(dbenv,
			    master, REP_VOTE2, NULL, NULL, 0));

		/* Election is still going on. */
		break;
	case REP_VOTE2:
#ifdef DIAGNOSTIC
		if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
			__db_err(dbenv, "We received a vote%s",
			    F_ISSET(dbenv, DB_ENV_REP_MASTER) ?
			    " (master)" : "");
#endif
		if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
			R_LOCK(dbenv, &dblp->reginfo);
			lsn = lp->lsn;
			R_UNLOCK(dbenv, &dblp->reginfo);
			rep->stat.st_elections_won++;
			return (__rep_send_message(dbenv,
			    *eidp, REP_NEWMASTER, &lsn, NULL, 0));
		}

		MUTEX_LOCK(dbenv, db_rep->mutexp);

		/* If we have priority 0, we should never get a vote. */
		DB_ASSERT(rep->priority != 0);

		if (!IN_ELECTION(rep)) {
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
				__db_err(dbenv, "Not in election, got vote");
#endif
			MUTEX_UNLOCK(dbenv, db_rep->mutexp);
			return (DB_REP_HOLDELECTION);
		}
		/* avoid counting duplicates. */
		rep->votes++;
		/* A strict majority of the known sites wins the election. */
		done = rep->votes > rep->nsites / 2;
		if (done) {
			rep->master_id = rep->eid;
			rep->gen = rep->w_gen + 1;
			ELECTION_DONE(rep);
			F_CLR(rep, REP_F_UPGRADE);
			F_SET(rep, REP_F_MASTER);
			*eidp = rep->master_id;
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
				__db_err(dbenv,
			"Got enough votes to win; election done; winner is %d",
				    rep->master_id);
#endif
		}
		MUTEX_UNLOCK(dbenv, db_rep->mutexp);
		if (done) {
			R_LOCK(dbenv, &dblp->reginfo);
			lsn = lp->lsn;
			R_UNLOCK(dbenv, &dblp->reginfo);

			/* Declare me the winner. */
#ifdef DIAGNOSTIC
			if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
				__db_err(dbenv, "I won, sending NEWMASTER");
#endif
			rep->stat.st_elections_won++;
			/*
			 * NOTE(review): a failed broadcast falls out of the
			 * switch and the function returns 0, so the error
			 * is not reported to the caller -- confirm that
			 * this is intended.
			 */
			if ((ret = __rep_send_message(dbenv, DB_EID_BROADCAST,
			    REP_NEWMASTER, &lsn, NULL, 0)) != 0)
				break;
			return (DB_REP_NEWMASTER);
		}
		break;
	default:
		__db_err(dbenv,
	"DB_ENV->rep_process_message: unknown replication message: type %lu",
		   (u_long)rp->rectype);
		return (EINVAL);
	}

	return (0);

	/* Common exit for the VOTE1 paths that still hold the rep mutex. */
unlock:	MUTEX_UNLOCK(dbenv, db_rep->mutexp);
	return (ret);
}

/*
 * __rep_apply --
 *
 * Handle incoming log records on a client, applying when possible and
 * entering into the bookkeeping table otherwise.  This is the guts of
 * the routine that handles the state machine that describes how we
 * process and manage incoming log records.
*/ static int __rep_apply(dbenv, rp, rec) DB_ENV *dbenv; REP_CONTROL *rp; DBT *rec; { __dbreg_register_args dbreg_args; __txn_ckp_args ckp_args; DB_REP *db_rep; DBT control_dbt, key_dbt, lsn_dbt, nextrec_dbt, rec_dbt; DB *dbp; DBC *dbc; DB_LOG *dblp; DB_LSN ckp_lsn, lsn, newfile_lsn, next_lsn, waiting_lsn; LOG *lp; REP *rep; REP_CONTROL lsn_rc; u_int32_t rectype, txnid; int cmp, do_req, eid, have_mutex, ret, t_ret; db_rep = dbenv->rep_handle; rep = db_rep->region; dbp = db_rep->rep_db; dbc = NULL; have_mutex = ret = 0; memset(&control_dbt, 0, sizeof(control_dbt)); memset(&rec_dbt, 0, sizeof(rec_dbt)); /* * If this is a log record and it's the next one in line, simply * write it to the log. If it's a "normal" log record, i.e., not * a COMMIT or CHECKPOINT or something that needs immediate processing, * just return. If it's a COMMIT, CHECKPOINT or LOG_REGISTER (i.e., * not SIMPLE), handle it now. If it's a NEWFILE record, then we * have to be prepared to deal with a logfile change. */ dblp = dbenv->lg_handle; R_LOCK(dbenv, &dblp->reginfo); lp = dblp->reginfo.primary; cmp = log_compare(&rp->lsn, &lp->ready_lsn); /* * This is written to assume that you don't end up with a lot of * records after a hole. That is, it optimizes for the case where * there is only a record or two after a hole. If you have a lot * of records after a hole, what you'd really want to do is write * all of them and then process all the commits, checkpoints, etc. * together. That is more complicated processing that we can add * later if necessary. * * That said, I really don't want to do db operations holding the * log mutex, so the synchronization here is tricky. */ if (cmp == 0) { /* We got the log record that we are expecting. */ if (rp->rectype == REP_NEWFILE) { newfile: ret = __rep_newfile(dbenv, rp, rec, &lp->ready_lsn); /* Make this evaluate to a simple rectype. 
*/ rectype = 0; } else { DB_ASSERT(log_compare(&rp->lsn, &lp->lsn) == 0); ret = __log_rep_put(dbenv, &rp->lsn, rec); lp->ready_lsn = lp->lsn; memcpy(&rectype, rec->data, sizeof(rectype)); if (ret == 0) /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ rep->stat.st_log_records++; } while (ret == 0 && IS_SIMPLE(rectype) && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) { /* * We just filled in a gap in the log record stream. * Write subsequent records to the log. */ gap_check: lp->wait_recs = 0; lp->rcvd_recs = 0; R_UNLOCK(dbenv, &dblp->reginfo); if (have_mutex == 0) { MUTEX_LOCK(dbenv, db_rep->db_mutexp); have_mutex = 1; } if (dbc == NULL && (ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) goto err; /* The DBTs need to persist through another call. */ F_SET(&control_dbt, DB_DBT_REALLOC); F_SET(&rec_dbt, DB_DBT_REALLOC); if ((ret = dbc->c_get(dbc, &control_dbt, &rec_dbt, DB_RMW | DB_FIRST)) != 0) goto err; rp = (REP_CONTROL *)control_dbt.data; rec = &rec_dbt; memcpy(&rectype, rec->data, sizeof(rectype)); R_LOCK(dbenv, &dblp->reginfo); /* * We need to check again, because it's possible that * some other thread of control changed the waiting_lsn * or removed that record from the database. */ if (log_compare(&lp->ready_lsn, &rp->lsn) == 0) { if (rp->rectype != REP_NEWFILE) { DB_ASSERT(log_compare (&rp->lsn, &lp->lsn) == 0); ret = __log_rep_put(dbenv, &rp->lsn, rec); lp->ready_lsn = lp->lsn; /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ if (ret == 0) rep->stat.st_log_records++; } else { ret = __rep_newfile(dbenv, rp, rec, &lp->ready_lsn); rectype = 0; } waiting_lsn = lp->waiting_lsn; R_UNLOCK(dbenv, &dblp->reginfo); if ((ret = dbc->c_del(dbc, 0)) != 0) goto err; /* * We may miscount, as we don't hold the rep * mutex. */ --rep->stat.st_log_queued; /* * Update waiting_lsn. We need to move it * forward to the LSN of the next record * in the queue. 
*/ memset(&lsn_dbt, 0, sizeof(lsn_dbt)); F_SET(&lsn_dbt, DB_DBT_USERMEM); lsn_dbt.data = &lsn_rc; lsn_dbt.ulen = sizeof(lsn_rc); memset(&lsn_rc, 0, sizeof(lsn_rc)); /* * If the next item in the database is a log * record--the common case--we're not * interested in its contents, just in its LSN. * If it's a newfile message, though, the * data field may be the LSN of the last * record in the old file, and we need to use * that to determine whether or not there's * a gap. * * Optimize both these cases by doing a partial * get of the data item. If it's a newfile * record, we'll get the whole LSN, and if * it's not, we won't waste time allocating. */ memset(&nextrec_dbt, 0, sizeof(nextrec_dbt)); F_SET(&nextrec_dbt, DB_DBT_USERMEM | DB_DBT_PARTIAL); nextrec_dbt.ulen = nextrec_dbt.dlen = sizeof(newfile_lsn); ZERO_LSN(newfile_lsn); nextrec_dbt.data = &newfile_lsn; ret = dbc->c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_NEXT); if (ret != DB_NOTFOUND && ret != 0) goto err; R_LOCK(dbenv, &dblp->reginfo); if (ret == DB_NOTFOUND) { /* * Do a quick double-check to make * sure waiting_lsn hasn't changed. * It's possible that between the * DB_NOTFOUND return and the R_LOCK, * some record was added to the * database, and we don't want to lose * sight of the fact that it's there. */ if (log_compare(&waiting_lsn, &lp->waiting_lsn) == 0) ZERO_LSN( lp->waiting_lsn); /* * Whether or not the current record is * simple, there's no next one, and * therefore we haven't got anything * else to do right now. Break out. */ break; } DB_ASSERT(lsn_dbt.size == sizeof(lsn_rc)); /* * NEWFILE records have somewhat convoluted * semantics, so there are five cases * pertaining to what the newly-gotten record * is and what we want to do about it. * * 1) This isn't a NEWFILE record. Advance * waiting_lsn and proceed. * * 2) NEWFILE, no LSN stored as the datum, * lsn_rc.lsn == ready_lsn. The NEWFILE * record is next, so set waiting_lsn = * ready_lsn. 
* * 3) NEWFILE, no LSN stored as the datum, but * lsn_rc.lsn > ready_lsn. There's still a * gap; set waiting_lsn = lsn_rc.lsn. * * 4) NEWFILE, newfile_lsn in datum, and it's < * ready_lsn. (If the datum is non-empty, * it's the LSN of the last record in a log * file, not the end of the log, and * lsn_rc.lsn is the LSN of the start of * the new file--we didn't have the end of * the old log handy when we sent the * record.) No gap--we're ready to * proceed. Set both waiting and ready_lsn * to lsn_rc.lsn. * * 5) NEWFILE, newfile_lsn in datum, and it's >= * ready_lsn. We're still missing at * least one record; set waiting_lsn, * but not ready_lsn, to lsn_rc.lsn. */ if (lsn_rc.rectype == REP_NEWFILE && nextrec_dbt.size > 0 && log_compare( &newfile_lsn, &lp->ready_lsn) < 0) /* Case 4. */ lp->ready_lsn = lp->waiting_lsn = lsn_rc.lsn; else { /* Cases 1, 2, 3, and 5. */ DB_ASSERT(log_compare(&lsn_rc.lsn, &lp->ready_lsn) >= 0); lp->waiting_lsn = lsn_rc.lsn; } /* * If the current rectype is simple, we're * done with it, and we should check and see * whether the next record queued is the next * one we're ready for. This is just the loop * condition, so we continue. * * Otherwise, we need to break out of this loop * and process this record first. */ if (!IS_SIMPLE(rectype)) break; } } /* * Check if we're at a gap in the table and if so, whether we * need to ask for any records. 
*/ do_req = 0; if (!IS_ZERO_LSN(lp->waiting_lsn) && log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) { next_lsn = lp->ready_lsn; do_req = ++lp->rcvd_recs >= lp->wait_recs; if (do_req) { lp->wait_recs = rep->request_gap; lp->rcvd_recs = 0; } } R_UNLOCK(dbenv, &dblp->reginfo); if (dbc != NULL) { if ((ret = dbc->c_close(dbc)) != 0) goto err; MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); have_mutex = 0; } dbc = NULL; if (do_req) { MUTEX_LOCK(dbenv, db_rep->mutexp); eid = db_rep->region->master_id; MUTEX_UNLOCK(dbenv, db_rep->mutexp); if (eid != DB_EID_INVALID) { rep->stat.st_log_requested++; if ((ret = __rep_send_message(dbenv, eid, REP_LOG_REQ, &next_lsn, NULL, 0)) != 0) goto err; } } } else if (cmp > 0) { /* * The LSN is higher than the one we were waiting for. * If it is a NEWFILE message, this may not mean that * there's a gap; in some cases, NEWFILE messages contain * the LSN of the beginning of the new file instead * of the end of the old. * * In these cases, the rec DBT will contain the last LSN * of the old file, so we can tell whether there's a gap. */ if (rp->rectype == REP_NEWFILE && rp->lsn.file == lp->ready_lsn.file + 1 && rp->lsn.offset == 0) { DB_ASSERT(rec != NULL && rec->data != NULL && rec->size == sizeof(DB_LSN)); memcpy(&lsn, rec->data, sizeof(DB_LSN)); if (log_compare(&lp->ready_lsn, &lsn) > 0) /* * The last LSN in the old file is smaller * than the one we're expecting, so there's * no gap--the one we're expecting just * doesn't exist. */ goto newfile; } /* * This record isn't in sequence; add it to the table and * update waiting_lsn if necessary. */ memset(&key_dbt, 0, sizeof(key_dbt)); key_dbt.data = rp; key_dbt.size = sizeof(*rp); next_lsn = lp->lsn; do_req = 0; if (lp->wait_recs == 0) { /* * This is a new gap. Initialize the number of * records that we should wait before requesting * that it be resent. We grab the limits out of * the rep without the mutex. 
*/ lp->wait_recs = rep->request_gap; lp->rcvd_recs = 0; } if (++lp->rcvd_recs >= lp->wait_recs) { /* * If we've waited long enough, request the record * and double the wait interval. */ do_req = 1; lp->wait_recs <<= 1; lp->rcvd_recs = 0; if (lp->wait_recs > rep->max_gap) lp->wait_recs = rep->max_gap; } R_UNLOCK(dbenv, &dblp->reginfo); MUTEX_LOCK(dbenv, db_rep->db_mutexp); ret = dbp->put(dbp, NULL, &key_dbt, rec, 0); rep->stat.st_log_queued++; rep->stat.st_log_queued_total++; if (rep->stat.st_log_queued_max < rep->stat.st_log_queued) rep->stat.st_log_queued_max = rep->stat.st_log_queued; MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if (ret != 0) return (ret); R_LOCK(dbenv, &dblp->reginfo); if (IS_ZERO_LSN(lp->waiting_lsn) || log_compare(&rp->lsn, &lp->waiting_lsn) < 0) lp->waiting_lsn = rp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); if (do_req) { /* Request the LSN we are still waiting for. */ MUTEX_LOCK(dbenv, db_rep->mutexp); /* May as well do this after we grab the mutex. */ eid = db_rep->region->master_id; /* * If the master_id is invalid, this means that since * the last record was sent, somebody declared an * election and we may not have a master to request * things of. * * This is not an error; when we find a new master, * we'll re-negotiate where the end of the log is and * try to to bring ourselves up to date again anyway. */ if (eid != DB_EID_INVALID) { rep->stat.st_log_requested++; MUTEX_UNLOCK(dbenv, db_rep->mutexp); ret = __rep_send_message(dbenv, eid, REP_LOG_REQ, &next_lsn, NULL, 0); } else MUTEX_UNLOCK(dbenv, db_rep->mutexp); } return (ret); } else { R_UNLOCK(dbenv, &dblp->reginfo); /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ rep->stat.st_log_duplicated++; } if (ret != 0 || cmp < 0 || (cmp == 0 && IS_SIMPLE(rectype))) goto done; /* * If we got here, then we've got a log record in rp and rec that * we need to process. 
*/ switch(rectype) { case DB___dbreg_register: /* * DB opens occur in the context of a transaction, so we can * simply handle them when we process the transaction. Closes, * however, are not transaction-protected, so we have to * handle them here. * * Note that it should be unsafe for the master to do a close * of a file that was opened in an active transaction, so we * should be guaranteed to get the ordering right. */ memcpy(&txnid, (u_int8_t *)rec->data + ((u_int8_t *)&dbreg_args.txnid - (u_int8_t *)&dbreg_args), sizeof(u_int32_t)); if (txnid == TXN_INVALID && !F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) ret = __db_dispatch(dbenv, dbenv->recover_dtab, dbenv->recover_dtab_size, rec, &rp->lsn, DB_TXN_APPLY, NULL); break; case DB___txn_ckp: /* Sync the memory pool. */ memcpy(&ckp_lsn, (u_int8_t *)rec->data + ((u_int8_t *)&ckp_args.ckp_lsn - (u_int8_t *)&ckp_args), sizeof(DB_LSN)); if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) ret = dbenv->memp_sync(dbenv, &ckp_lsn); else /* * We ought to make sure the logs on a logs-only * replica get flushed now and again. */ ret = dbenv->log_flush(dbenv, &ckp_lsn); /* Update the last_ckp in the txn region. */ if (ret == 0) __txn_updateckp(dbenv, &rp->lsn); break; case DB___txn_regop: if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) do { /* * If an application is doing app-specific * recovery and acquires locks while applying * a transaction, it can deadlock. Any other * locks held by this thread should have been * discarded in the __rep_process_txn error * path, so if we simply retry, we should * eventually succeed. */ ret = __rep_process_txn(dbenv, rec); } while (ret == DB_LOCK_DEADLOCK); break; default: goto err; } /* Check if we need to go back into the table. 
*/ if (ret == 0) { R_LOCK(dbenv, &dblp->reginfo); if (log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) goto gap_check; R_UNLOCK(dbenv, &dblp->reginfo); } done: err: if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0) ret = t_ret; if (have_mutex) MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if (control_dbt.data != NULL) __os_ufree(dbenv, control_dbt.data); if (rec_dbt.data != NULL) __os_ufree(dbenv, rec_dbt.data); return (ret); } /* * __rep_process_txn -- * * This is the routine that actually gets a transaction ready for * processing. * * PUBLIC: int __rep_process_txn __P((DB_ENV *, DBT *)); */ int __rep_process_txn(dbenv, rec) DB_ENV *dbenv; DBT *rec; { DBT data_dbt; DB_LOCKREQ req, *lvp; DB_LOGC *logc; DB_LSN prev_lsn, *lsnp; DB_REP *db_rep; LSN_COLLECTION lc; REP *rep; __txn_regop_args *txn_args; __txn_xa_regop_args *prep_args; u_int32_t lockid, op, rectype; int i, ret, t_ret; int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *)); size_t dtabsize; void *txninfo; db_rep = dbenv->rep_handle; rep = db_rep->region; logc = NULL; txninfo = NULL; memset(&data_dbt, 0, sizeof(data_dbt)); if (F_ISSET(dbenv, DB_ENV_THREAD)) F_SET(&data_dbt, DB_DBT_REALLOC); /* * There are two phases: First, we have to traverse * backwards through the log records gathering the list * of all LSNs in the transaction. Once we have this information, * we can loop through, acquire the locks we need for each record, * and then apply it. */ dtab = NULL; /* * We may be passed a prepare (if we're restoring a prepare * on upgrade) instead of a commit (the common case). * Check which and behave appropriately. */ memcpy(&rectype, rec->data, sizeof(rectype)); memset(&lc, 0, sizeof(lc)); if (rectype == DB___txn_regop) { /* * We're the end of a transaction. Make sure this is * really a commit and not an abort! 
*/ if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0) return (ret); op = txn_args->opcode; prev_lsn = txn_args->prev_lsn; __os_free(dbenv, txn_args); if (op != TXN_COMMIT) return (0); } else { /* We're a prepare. */ DB_ASSERT(rectype == DB___txn_xa_regop); if ((ret = __txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0) return (ret); prev_lsn = prep_args->prev_lsn; __os_free(dbenv, prep_args); } /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */ if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0) return (ret); qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp); if ((ret = dbenv->lock_id(dbenv, &lockid)) != 0) goto err; /* Initialize the getpgno dispatch table. */ if ((ret = __rep_lockpgno_init(dbenv, &dtab, &dtabsize)) != 0) goto err; /* * The set of records for a transaction may include dbreg_register * records. Create a txnlist so that they can keep track of file * state between records. */ if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0) goto err; /* Phase 2: Apply updates. 
*/ if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) goto err; for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) { if ((ret = __rep_lockpages(dbenv, dtab, dtabsize, lsnp, NULL, NULL, lockid)) != 0) goto err; if ((ret = logc->get(logc, lsnp, &data_dbt, DB_SET)) != 0) goto err; if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab, dbenv->recover_dtab_size, &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) goto err; } err: memset(&req, 0, sizeof(req)); req.op = DB_LOCK_PUT_ALL; if ((t_ret = dbenv->lock_vec(dbenv, lockid, DB_LOCK_FREE_LOCKER, &req, 1, &lvp)) != 0 && ret == 0) ret = t_ret; if (lc.nalloc != 0) __os_free(dbenv, lc.array); if ((t_ret = dbenv->lock_id_free(dbenv, lockid)) != 0 && ret == 0) ret = t_ret; if (logc != NULL && (t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; if (txninfo != NULL) __db_txnlist_end(dbenv, txninfo); if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL) __os_ufree(dbenv, data_dbt.data); if (dtab != NULL) __os_free(dbenv, dtab); if (ret == 0) /* * We don't hold the rep mutex, and could miscount if we race. */ rep->stat.st_txns_applied++; return (ret); } /* * __rep_collect_txn * Recursive function that will let us visit every entry in a transaction * chain including all child transactions so that we can then apply * the entire transaction family at once. 
*/ static int __rep_collect_txn(dbenv, lsnp, lc) DB_ENV *dbenv; DB_LSN *lsnp; LSN_COLLECTION *lc; { __txn_child_args *argp; DB_LOGC *logc; DB_LSN c_lsn; DBT data; u_int32_t rectype; int nalloc, ret, t_ret; memset(&data, 0, sizeof(data)); F_SET(&data, DB_DBT_REALLOC); if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) return (ret); while (!IS_ZERO_LSN(*lsnp) && (ret = logc->get(logc, lsnp, &data, DB_SET)) == 0) { memcpy(&rectype, data.data, sizeof(rectype)); if (rectype == DB___txn_child) { if ((ret = __txn_child_read(dbenv, data.data, &argp)) != 0) goto err; c_lsn = argp->c_lsn; *lsnp = argp->prev_lsn; __os_free(dbenv, argp); ret = __rep_collect_txn(dbenv, &c_lsn, lc); } else { if (lc->nalloc < lc->nlsns + 1) { nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; if ((ret = __os_realloc(dbenv, nalloc * sizeof(DB_LSN), &lc->array)) != 0) goto err; lc->nalloc = nalloc; } lc->array[lc->nlsns++] = *lsnp; /* * Explicitly copy the previous lsn. The record * starts with a u_int32_t record type, a u_int32_t * txn id, and then the DB_LSN (prev_lsn) that we * want. We copy explicitly because we have no idea * what kind of record this is. */ memcpy(lsnp, (u_int8_t *)data.data + sizeof(u_int32_t) + sizeof(u_int32_t), sizeof(DB_LSN)); } if (ret != 0) goto err; } err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; if (data.data != NULL) __os_ufree(dbenv, data.data); return (ret); } /* * __rep_lsn_cmp -- * qsort-type-compatible wrapper for log_compare. */ static int __rep_lsn_cmp(lsn1, lsn2) const void *lsn1, *lsn2; { return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2)); } /* * __rep_newfile -- * NEWFILE messages can contain either the last LSN of the old file * or the first LSN of the new one, depending on which we have available * when the message is sent. 
When applying a NEWFILE message, make sure * we haven't already swapped files, as it's possible (given the right sequence * of out-of-order messages) to wind up with a NEWFILE message of each * variety, and __rep_apply won't detect the two as duplicates of each other. */ static int __rep_newfile(dbenv, rc, msgdbt, lsnp) DB_ENV *dbenv; REP_CONTROL *rc; DBT *msgdbt; DB_LSN *lsnp; { DB_LOG *dblp; LOG *lp; u_int32_t newfile; dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; /* * A NEWFILE message containing the old file's LSN will be * accompanied by a NULL rec DBT; one containing the new one's LSN * will need to supply the last record in the old file by * sending it in the rec DBT. */ if (msgdbt == NULL || msgdbt->size == 0) newfile = rc->lsn.file + 1; else newfile = rc->lsn.file; if (newfile > lp->lsn.file) return (__log_newfile(dblp, lsnp)); else { /* We've already applied this NEWFILE. Just ignore it. */ *lsnp = lp->lsn; return (0); } }