/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1999,2007 Oracle.  All rights reserved.
 *
 * $Id: bt_compact.c,v 12.62 2007/05/17 15:14:46 bostic Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"

static int __bam_compact_dups __P((DBC *,
     PAGE **, u_int32_t, int, DB_COMPACT *, int *));
static int __bam_compact_int __P((DBC *,
     DBT *, DBT *, u_int32_t, int *, DB_COMPACT *, int *));
static int __bam_csearch __P((DBC *, DBT *, u_int32_t, int));
static int __bam_merge __P((DBC *,
     DBC *, u_int32_t, DBT *, DB_COMPACT *, int *));
static int __bam_merge_internal __P((DBC *, DBC *, int, DB_COMPACT *, int *));
static int __bam_merge_pages __P((DBC *, DBC *, DB_COMPACT *));
static int __bam_merge_records __P((DBC *, DBC *, u_int32_t, DB_COMPACT *));
static int __bam_truncate_internal_overflow __P((DBC *, PAGE *, DB_COMPACT *));
static int __bam_truncate_overflow __P((DBC *,
     db_pgno_t, db_pgno_t, DB_COMPACT *));
static int __bam_truncate_page __P((DBC *, PAGE **, int));
static int __bam_truncate_root_page __P((DBC *,
     PAGE *, u_int32_t, DB_COMPACT *));

#ifdef HAVE_FTRUNCATE
static int __bam_free_freelist __P((DB *, DB_TXN *));
static int __bam_savekey __P((DBC *, int, DBT *));
static int __bam_setup_freelist __P((DB *, struct pglist *, u_int32_t));
static int __bam_truncate_internal __P((DB *, DB_TXN *, DB_COMPACT *));
#endif

#define	SAVE_START						\
	do {							\
		save_data = *c_data;				\
		ret = __db_retcopy(dbenv,			\
		     &save_start, current.data, current.size,	\
		     &save_start.data, &save_start.ulen);	\
	} while (0)

/*
 * Only restore those things that are negated by aborting the
 * transaction.  We don't restore the number of deadlocks, for example.
 */
#define	RESTORE_START						\
	do {							\
		c_data->compact_pages_free =			\
		      save_data.compact_pages_free;		\
		c_data->compact_levels = save_data.compact_levels; \
		c_data->compact_truncate = save_data.compact_truncate; \
		ret = __db_retcopy(dbenv, &current,		\
		     save_start.data, save_start.size,		\
		     &current.data, &current.ulen);		\
	} while (0)
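/*
 * Illustrative sketch, not part of the original source: __bam_compact is
 * reached through the public DB->compact method.  A call such as the
 * following drives the loop below (the fields and flags are the public
 * DB_COMPACT interface; the surrounding error handling is assumed):
 *
 *	DB_COMPACT c_data;
 *	int ret;
 *
 *	memset(&c_data, 0, sizeof(c_data));
 *	c_data.compact_fillpercent = 80;
 *	ret = dbp->compact(dbp,
 *	    NULL, NULL, NULL, &c_data, DB_FREE_SPACE, NULL);
 *
 * DB_FREE_SPACE asks for freed pages to be returned to the file system;
 * DB_FREELIST_ONLY stops after the free list has been sorted.
 */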
/*
 * __bam_compact -- compact a btree.
 *
 * PUBLIC: int __bam_compact __P((DB *, DB_TXN *,
 * PUBLIC:     DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
 */
int
__bam_compact(dbp, txn, start, stop, c_data, flags, end)
	DB *dbp;
	DB_TXN *txn;
	DBT *start, *stop;
	DB_COMPACT *c_data;
	u_int32_t flags;
	DBT *end;
{
	DBT current, save_start;
	DBC *dbc;
	DB_COMPACT save_data;
	DB_ENV *dbenv;
	u_int32_t factor;
	int deadlock, done, ret, span, t_ret, txn_local;
#ifdef HAVE_FTRUNCATE
	struct pglist *list;
	db_pgno_t last_pgno;
	u_int32_t nelems, truncated;
#endif

	dbenv = dbp->dbenv;

	memset(&current, 0, sizeof(current));
	memset(&save_start, 0, sizeof(save_start));
	dbc = NULL;
	factor = 0;
	deadlock = done = ret = span = 0;
#ifdef HAVE_FTRUNCATE
	list = NULL;
	last_pgno = 0;
	nelems = truncated = 0;
#endif

	/*
	 * We pass "current" to the internal routine, indicating where that
	 * routine should begin its work and expecting that it will return to
	 * us the last key that it processed.
	 */
	if (start != NULL && (ret = __db_retcopy(dbenv,
	     &current, start->data, start->size,
	     &current.data, &current.ulen)) != 0)
		return (ret);

	if (IS_DB_AUTO_COMMIT(dbp, txn))
		txn_local = 1;
	else
		txn_local = 0;
	if (!LF_ISSET(DB_FREE_SPACE | DB_FREELIST_ONLY))
		goto no_free;
	if (LF_ISSET(DB_FREELIST_ONLY))
		LF_SET(DB_FREE_SPACE);

#ifdef HAVE_FTRUNCATE
	/* Sort the freelist and set up the in-memory list representation. */
	if (txn_local && (ret = __txn_begin(dbenv, NULL, &txn, 0)) != 0)
		goto err;

	if ((ret = __db_free_truncate(dbp,
	     txn, flags, c_data, &list, &nelems, &last_pgno)) != 0) {
		LF_CLR(DB_FREE_SPACE);
		goto terr;
	}

	/* If the freelist is empty and we are not filling, get out. */
	if (nelems == 0 && LF_ISSET(DB_FREELIST_ONLY)) {
		ret = 0;
		LF_CLR(DB_FREE_SPACE);
		goto terr;
	}
	if ((ret = __bam_setup_freelist(dbp, list, nelems)) != 0) {
		/* Someone else owns the free list. */
		if (ret == EBUSY)
			ret = 0;
	}

	/* Commit the txn and release the meta page lock. */
terr:	if (txn_local) {
		if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0)
			ret = t_ret;
		txn = NULL;
	}
	if (ret != 0)
		goto err;

	/* Save the number truncated so far; we will add what we get below. */
	truncated = c_data->compact_pages_truncated;
	if (LF_ISSET(DB_FREELIST_ONLY))
		goto done;
#endif

	/*
	 * We want factor to be the target number of free bytes on each page,
	 * so we know when to stop adding items to a page.   Make sure to
	 * subtract the page overhead when computing this target.  This can
	 * result in a 1-2% error on the smallest page.
	 * First figure out how many bytes we should use:
	 */
no_free:
	factor = dbp->pgsize - SIZEOF_PAGE;
	if (c_data->compact_fillpercent != 0) {
		factor *= c_data->compact_fillpercent;
		factor /= 100;
	}
	/* Now convert to the number of free bytes to target. */
	factor = (dbp->pgsize - SIZEOF_PAGE) - factor;

	if (c_data->compact_pages == 0)
		c_data->compact_pages = DB_MAX_PAGES;

	do {
		deadlock = 0;

		SAVE_START;
		if (ret != 0)
			break;

		if (txn_local) {
			if ((ret = __txn_begin(dbenv, NULL, &txn, 0)) != 0)
				break;

			if (c_data->compact_timeout != 0 &&
			    (ret = __txn_set_timeout(txn,
			    c_data->compact_timeout,
			    DB_SET_LOCK_TIMEOUT)) != 0)
				goto err;
		}

		if ((ret = __db_cursor(dbp, txn, &dbc, 0)) != 0)
			goto err;

		if ((ret = __bam_compact_int(dbc, &current, stop, factor,
		     &span, c_data, &done)) ==
		     DB_LOCK_DEADLOCK && txn_local) {
			/*
			 * We retry on deadlock.  Cancel the statistics
			 * and reset the start point to before this
			 * iteration.
			 */
			deadlock = 1;
			c_data->compact_deadlock++;
			RESTORE_START;
		}

		if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
			ret = t_ret;

err:		if (txn_local && txn != NULL) {
			if (ret == 0 && deadlock == 0)
				ret = __txn_commit(txn, DB_TXN_NOSYNC);
			else if ((t_ret = __txn_abort(txn)) != 0 && ret == 0)
				ret = t_ret;
			txn = NULL;
		}
	} while (ret == 0 && !done);

	if (ret == 0 && end != NULL)
		ret = __db_retcopy(dbenv, end, current.data, current.size,
		    &end->data, &end->ulen);
	if (current.data != NULL)
		__os_free(dbenv, current.data);
	if (save_start.data != NULL)
		__os_free(dbenv, save_start.data);

#ifdef HAVE_FTRUNCATE
	/*
	 * Finish up truncation work.  If there are pages left in the free
	 * list then search the internal nodes of the tree as we may have
	 * missed some while walking the leaf nodes.  Then calculate how
	 * many pages we have truncated and release the in-memory free list.
	 */
done:	if (LF_ISSET(DB_FREE_SPACE)) {
		DBMETA *meta;
		db_pgno_t pgno;

		pgno = PGNO_BASE_MD;
		done = 1;
		if (ret == 0 && !LF_ISSET(DB_FREELIST_ONLY) && (t_ret =
		    __memp_fget(dbp->mpf, &pgno, txn, 0, &meta)) == 0) {
			done = meta->free == PGNO_INVALID;
			ret = __memp_fput(dbp->mpf, meta, dbc->priority);
		}

		if (!done)
			ret = __bam_truncate_internal(dbp, txn, c_data);

		/* Clean up the free list. */
		if (list != NULL)
			__os_free(dbenv, list);

		if ((t_ret =
		    __memp_fget(dbp->mpf, &pgno, txn, 0, &meta)) == 0) {
			c_data->compact_pages_truncated =
			    truncated + last_pgno - meta->last_pgno;
			if ((t_ret = __memp_fput(dbp->mpf,
			    meta, dbc->priority)) != 0 && ret == 0)
				ret = t_ret;
		} else if (ret == 0)
			ret = t_ret;

		if ((t_ret = __bam_free_freelist(dbp, txn)) != 0 && ret == 0)
			ret = t_ret;
	}
#endif

	return (ret);
}
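/*
 * Worked example of the factor computation above, illustrative only
 * (assuming SIZEOF_PAGE, the page header size, is 26 bytes): with
 * pgsize = 4096 and compact_fillpercent = 80,
 *
 *	usable = 4096 - 26       = 4070 bytes
 *	target = 4070 * 80 / 100 = 3256 bytes of data per page
 *	factor = 4070 - 3256     =  814 bytes of free space
 *
 * so a leaf page is considered for filling only while its free space
 * exceeds 814 bytes.
 */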
/*
 * __bam_csearch -- isolate search code for bam_compact.
 * This routine hides the differences between searching
 * a BTREE and a RECNO from the rest of the code.
 */
#define	CS_READ		0	/* We are just reading. */
#define	CS_PARENT	1	/* We want the parent too, write lock. */
#define	CS_NEXT		2	/* Get the next page. */
#define	CS_NEXT_WRITE	3	/* Get the next page and write lock. */
#define	CS_DEL		4	/* Get a stack to delete a page. */
#define	CS_START	5	/* Starting level for stack, write lock. */
#define	CS_GETRECNO	0x80	/* Extract record number from start. */

static int
__bam_csearch(dbc, start, sflag, level)
	DBC *dbc;
	DBT *start;
	u_int32_t sflag;
	int level;
{
	BTREE_CURSOR *cp;
	int not_used, ret;

	cp = (BTREE_CURSOR *)dbc->internal;

	if (dbc->dbtype == DB_RECNO) {
		/* If GETRECNO is not set the cp->recno is what we want. */
		if (FLD_ISSET(sflag, CS_GETRECNO)) {
			if (start == NULL || start->size == 0)
				cp->recno = 1;
			else if ((ret =
			     __ram_getno(dbc, start, &cp->recno, 0)) != 0)
				return (ret);
			FLD_CLR(sflag, CS_GETRECNO);
		}
		switch (sflag) {
		case CS_READ:
			sflag = SR_READ;
			break;
		case CS_NEXT:
			sflag = SR_PARENT | SR_READ;
			break;
		case CS_START:
			level = LEAFLEVEL;
			/* FALLTHROUGH */
		case CS_DEL:
		case CS_NEXT_WRITE:
			sflag = SR_STACK;
			break;
		case CS_PARENT:
			sflag = SR_PARENT | SR_WRITE;
			break;
		default:
			return (__db_panic(dbc->dbp->dbenv, EINVAL));
		}
		if ((ret = __bam_rsearch(dbc,
		     &cp->recno, sflag, level, &not_used)) != 0)
			return (ret);
		/* Reset the cursor's recno to the beginning of the page. */
		cp->recno -= cp->csp->indx;
	} else {
		FLD_CLR(sflag, CS_GETRECNO);
		switch (sflag) {
		case CS_READ:
			sflag = SR_READ | SR_DUPFIRST;
			break;
		case CS_DEL:
			sflag = SR_DEL;
			break;
		case CS_NEXT:
			sflag = SR_NEXT;
			break;
		case CS_NEXT_WRITE:
			sflag = SR_NEXT | SR_WRITE;
			break;
		case CS_START:
			sflag = SR_START | SR_WRITE;
			break;
		case CS_PARENT:
			sflag = SR_PARENT | SR_WRITE;
			break;
		default:
			return (__db_panic(dbc->dbp->dbenv, EINVAL));
		}
		if (start == NULL || start->size == 0)
			FLD_SET(sflag, SR_MIN);

		if ((ret = __bam_search(dbc,
		     cp->root, start, sflag, level, NULL, &not_used)) != 0)
			return (ret);
	}
	return (0);
}
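/*
 * Illustrative note: __bam_compact_int below uses __bam_csearch in two
 * steps.  A read pass descends to the leaf level,
 *
 *	ret = __bam_csearch(dbc, start, CS_READ | CS_GETRECNO, LEAFLEVEL);
 *
 * and a later pass re-acquires the same leaf with its parent write
 * locked before any modification,
 *
 *	ret = __bam_csearch(dbc, start, CS_PARENT, LEAFLEVEL);
 *
 * The CS_* values are translated to SR_* search flags above, hiding the
 * btree/recno differences from the caller.
 */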
/*
 * __bam_compact_int -- internal compaction routine.
 *	Called either with a cursor on the main database
 * or a cursor initialized to the root of an off page duplicate
 * tree.
 */
static int
__bam_compact_int(dbc, start, stop, factor, spanp, c_data, donep)
	DBC *dbc;
	DBT *start, *stop;
	u_int32_t factor;
	int *spanp;
	DB_COMPACT *c_data;
	int *donep;
{
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBC *ndbc;
	DB_ENV *dbenv;
	DB_LOCK nolock;
	EPG *epg;
	DB_MPOOLFILE *dbmp;
	PAGE *pg, *ppg, *npg;
	db_pgno_t npgno;
	db_recno_t next_recno;
	u_int32_t sflag;
	int check_dups, check_trunc, done, level;
	int merged, nentry, next_page, pgs_done, ret, t_ret, tdone;

#ifdef	DEBUG
#define	CTRACE(dbc, location, t, start, f) do {			\
		DBT __trace;					\
		DB_SET_DBT(__trace, t, strlen(t));		\
		DEBUG_LWRITE(					\
		    dbc, (dbc)->txn, location, &__trace, start, f) \
	} while (0)
#define	PTRACE(dbc, location, p, start, f) do {			\
		char __buf[32];					\
		(void)snprintf(__buf,				\
		    sizeof(__buf), "pgno: %lu", (u_long)p);	\
		CTRACE(dbc, location, __buf, start, f);		\
	} while (0)
#else
#define	CTRACE(dbc, location, t, start, f)
#define	PTRACE(dbc, location, p, start, f)
#endif

	ndbc = NULL;
	pg = NULL;
	npg = NULL;

	done = 0;
	tdone = 0;
	pgs_done = 0;
	next_recno = 0;
	next_page = 0;
	LOCK_INIT(nolock);
	check_trunc = c_data->compact_truncate != PGNO_INVALID;
	check_dups = (!F_ISSET(dbc, DBC_OPD) &&
	     F_ISSET(dbc->dbp, DB_AM_DUP)) || check_trunc;

	dbp = dbc->dbp;
	dbenv = dbp->dbenv;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;

	/* Search down the tree for the starting point. */
	if ((ret = __bam_csearch(dbc,
	    start, CS_READ | CS_GETRECNO, LEAFLEVEL)) != 0) {
		/* It's not an error to compact an empty db. */
		if (ret == DB_NOTFOUND)
			ret = 0;
		done = 1;
		goto err;
	}

	/*
	 * Get the first leaf page.  The loop below will change pg, so
	 * we clear the stack reference to avoid putting a page twice.
	 */
	pg = cp->csp->page;
	cp->csp->page = NULL;
	next_recno = cp->recno;

next:	/*
	 * This is the start of the main compaction loop.  There are 3
	 * parts to the process:
	 * 1) Walk the leaf pages of the tree looking for a page to
	 *	process.  We do this with read locks.  Save the
	 *	key from the page and release it.
	 * 2) Set up a cursor stack which will write lock the page
	 *	and enough of its ancestors to get the job done.
	 *	This could go to the root if we might delete a subtree
	 *	or we have record numbers to update.
	 * 3) Loop fetching pages after the above page and move enough
	 *	data to fill it.
	 * We exit the loop if we are at the end of the leaf pages, are
	 * about to lock a new subtree (we span) or on error.
	 */

	/* Walk the pages looking for something to fill up. */
	while ((npgno = NEXT_PGNO(pg)) != PGNO_INVALID) {
		c_data->compact_pages_examine++;
		PTRACE(dbc, "Next", PGNO(pg), start, 0);

		/* If we have fetched the next page, get the new key. */
		if (next_page == 1 &&
		    dbc->dbtype != DB_RECNO && NUM_ENT(pg) != 0) {
			if ((ret = __db_ret(dbp, dbc->txn, pg, 0, start,
			    &start->data, &start->ulen)) != 0)
				goto err;
		}
		next_recno += NUM_ENT(pg);
		if (P_FREESPACE(dbp, pg) > factor ||
		     (check_trunc && PGNO(pg) > c_data->compact_truncate))
			break;
		/*
		 * The page does not need more data or to be swapped;
		 * check to see if we want to look at possible duplicate
		 * trees or overflow records and then move on to the
		 * next page.
		 */
		cp->recno += NUM_ENT(pg);
		next_page = 1;
		tdone = pgs_done;
		PTRACE(dbc, "Dups", PGNO(pg), start, 0);
		if (check_dups && (ret = __bam_compact_dups(
		     dbc, &pg, factor, 0, c_data, &pgs_done)) != 0)
			goto err;
		npgno = NEXT_PGNO(pg);
		if ((ret = __memp_fput(dbmp, pg, dbc->priority)) != 0)
			goto err;
		pg = NULL;
		/*
		 * If we don't do anything we don't need to hold
		 * the lock on the previous page, so couple always.
		 */
		if ((ret = __db_lget(dbc, tdone == pgs_done ?
		    LCK_COUPLE_ALWAYS : LCK_COUPLE, npgno,
		    DB_LOCK_READ, 0, &cp->csp->lock)) != 0)
			goto err;
		if ((ret = __memp_fget(dbmp, &npgno, dbc->txn, 0, &pg)) != 0)
			goto err;
	}

	/*
	 * When we get here we have 3 cases:
	 * 1) We've reached the end of the leaf linked list and are done.
	 * 2) A page whose freespace exceeds our target and therefore needs
	 *	to have data added to it.
	 * 3) A page that doesn't have too much free space but needs to be
	 *	checked for truncation.
	 * In both cases 2 and 3, we need that page's first key or record
	 * number.  We may already have it, if not get it here.
	 */
	if ((nentry = NUM_ENT(pg)) != 0) {
		next_page = 0;
		/* Get a copy of the first recno on the page. */
		if (dbc->dbtype == DB_RECNO) {
			if ((ret = __db_retcopy(dbp->dbenv, start,
			     &cp->recno, sizeof(cp->recno),
			     &start->data, &start->ulen)) != 0)
				goto err;
		} else if (start->size == 0 &&
		     (ret = __db_ret(dbp, dbc->txn, pg, 0, start,
		     &start->data, &start->ulen)) != 0)
			goto err;

		if (npgno == PGNO_INVALID) {
			/* End of the tree, check its duplicates and exit. */
			PTRACE(dbc, "GoDone", PGNO(pg), start, 0);
			if (check_dups && (ret = __bam_compact_dups(dbc,
			   &pg, factor, 0, c_data, &pgs_done)) != 0)
				goto err;
			c_data->compact_pages_examine++;
			done = 1;
			goto done;
		}
	}

	/* Release the page so we don't deadlock getting its parent. */
	if ((ret = __LPUT(dbc, cp->csp->lock)) != 0)
		goto err;
	if ((ret = __memp_fput(dbmp, pg, dbc->priority)) != 0)
		goto err;
	BT_STK_CLR(cp);
	pg = NULL;

	/*
	 * Setup the cursor stack. There are 3 cases:
	 * 1) the page is empty and will be deleted: nentry == 0.
	 * 2) the next page has the same parent: *spanp == 0.
	 * 3) the next page has a different parent: *spanp == 1.
	 *
	 * We now need to search the tree again, getting a write lock
	 * on the page we are going to merge or delete.  We do this by
	 * searching down the tree and locking as much of the subtree
	 * above the page as needed.  In the case of a delete we will
	 * find the maximal subtree that can be deleted.  In the case
	 * of merge if the current page and the next page are siblings
	 * with the same parent then we only need to lock the parent.
	 * Otherwise *span will be set and we need to search to find the
	 * lowest common ancestor.  Dbc will be set to contain the subtree
	 * containing the page to be merged or deleted.  Ndbc will contain
	 * the minimal subtree containing that page and its next sibling.
	 * In all cases for DB_RECNO we simplify things and get the whole
	 * tree if we need more than a single parent.
	 */

	/* Case 1 -- page is empty. */
	if (nentry == 0) {
		CTRACE(dbc, "Empty", "", start, 0);
		if (next_page == 1)
			sflag = CS_NEXT_WRITE;
		else
			sflag = CS_DEL;
		if ((ret = __bam_csearch(dbc, start, sflag, LEAFLEVEL)) != 0)
			goto err;

		pg = cp->csp->page;
		/* Check to see if the page is still empty. */
		if (NUM_ENT(pg) != 0)
			npgno = PGNO(pg);
		else {
			npgno = NEXT_PGNO(pg);
			/* If this is now the root, we are very done. */
			if (PGNO(pg) == cp->root)
				done = 1;
			else {
				if ((ret = __bam_dpages(dbc, 0, 0)) != 0)
					goto err;
				c_data->compact_pages_free++;
				goto next_no_release;
			}
		}
		goto next_page;
	}

	/* Case 3 -- different parents. */
	if (*spanp) {
		CTRACE(dbc, "Span", "", start, 0);
		if (ndbc == NULL && (ret = __dbc_dup(dbc, &ndbc, 0)) != 0)
			goto err;
		ncp = (BTREE_CURSOR *)ndbc->internal;
		ncp->recno = next_recno;

		/*
		 * Search the tree looking for the next page after the
		 * current key.  For RECNO get the whole stack.
		 * For BTREE the return will contain the stack that
		 * dominates both the current and next pages.
		 */
		if ((ret = __bam_csearch(ndbc, start, CS_NEXT_WRITE, 0)) != 0)
			goto err;

		if (dbc->dbtype == DB_RECNO) {
			/*
			 * The record we are looking for may have moved
			 * to the previous page.  This page should
			 * be at the beginning of its parent.
			 * If not, then start over.
			 */
			if (ncp->csp[-1].indx != 0) {
				*spanp = 0;
				goto deleted;
			}
		}
		if ((ret = __memp_dirty(dbp->mpf,
		    &ncp->csp->page, dbc->txn, dbc->priority, 0)) != 0)
			goto err;

		PTRACE(dbc, "SDups", PGNO(ncp->csp->page), start, 0);
		if (check_dups && (ret = __bam_compact_dups(ndbc,
		     &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
			goto err;

		/* Check to see if the tree collapsed. */
		if (PGNO(ncp->csp->page) == ncp->root)
			goto done;

		/*
		 * We need the stacks to be the same height
		 * so that we can merge parents.
		 */
		level = LEVEL(ncp->sp->page);
		sflag = CS_START;
		if ((ret = __bam_csearch(dbc, start, sflag, level)) != 0)
			goto err;
		pg = cp->csp->page;
		*spanp = 0;

		/*
		 * The page may have emptied while we waited for the lock.
		 * Reset npgno so we re-get this page when we go back to
		 * the top.
		 */
		if (NUM_ENT(pg) == 0) {
			npgno = PGNO(pg);
			goto next_page;
		}

		if (check_trunc && PGNO(pg) > c_data->compact_truncate) {
			pgs_done++;
			/* Get a fresh low numbered page. */
			if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
				goto err1;
		}

		if ((ret = __memp_dirty(dbp->mpf,
		    &cp->csp->page, dbc->txn, dbc->priority, 0)) != 0)
			goto err1;
		pg = cp->csp->page;
		npgno = NEXT_PGNO(pg);

		PTRACE(dbc, "SDups", PGNO(pg), start, 0);
		if (check_dups && (ret = __bam_compact_dups(dbc,
		     &cp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
			goto err1;

		/*
		 * We may have dropped our locks, check again
		 * to see if we still need to fill this page and
		 * we are in a spanning situation.
		 */
		if (P_FREESPACE(dbp, pg) <= factor ||
		     cp->csp[-1].indx != NUM_ENT(cp->csp[-1].page) - 1)
			goto next_page;

		/*
		 * Try to move things into a single parent.
		 */
		merged = 0;
		for (epg = cp->sp; epg != cp->csp; epg++) {
			if (PGNO(epg->page) == cp->root)
				continue;
			PTRACE(dbc, "PMerge", PGNO(epg->page), start, 0);
			if ((ret = __bam_merge_internal(dbc,
			    ndbc, LEVEL(epg->page), c_data, &merged)) != 0)
				goto err1;
			if (merged)
				break;
		}

		/* If we merged the parent, then we no longer span. */
		if (merged) {
			pgs_done++;
			if (cp->csp->page == NULL)
				goto deleted;
			npgno = PGNO(pg);
			goto next_page;
		}
		PTRACE(dbc, "SMerge", PGNO(cp->csp->page), start, 0);
		npgno = NEXT_PGNO(ncp->csp->page);
		if ((ret = __bam_merge(dbc,
		     ndbc, factor, stop, c_data, &done)) != 0)
			goto err1;
		pgs_done++;
		/*
		 * __bam_merge could have freed our stack if it
		 * deleted a page possibly collapsing the tree.
		 */
		if (cp->csp->page == NULL)
			goto deleted;
		cp->recno += NUM_ENT(pg);

		/* If we did not bump to the next page something
		 * did not fit. */
		if (npgno != NEXT_PGNO(pg)) {
			npgno = NEXT_PGNO(pg);
			goto next_page;
		}
	} else {
		/* Case 2 -- same parents. */
		CTRACE(dbc, "Sib", "", start, 0);
		if ((ret =
		     __bam_csearch(dbc, start, CS_PARENT, LEAFLEVEL)) != 0)
			goto err;

		pg = cp->csp->page;
		npgno = PGNO(pg);

		/* We now have a write lock, recheck the page. */
		if ((nentry = NUM_ENT(pg)) == 0)
			goto next_page;

		if ((ret = __memp_dirty(dbp->mpf,
		    &cp->csp->page, dbc->txn, dbc->priority, 0)) != 0)
			goto err;
		pg = cp->csp->page;
		npgno = NEXT_PGNO(pg);

		/* Check duplicate trees, we have a write lock on the page. */
		PTRACE(dbc, "SibDup", PGNO(pg), start, 0);
		if (check_dups && (ret = __bam_compact_dups(dbc,
		     &cp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
			goto err1;
		pg = cp->csp->page;

		/* Check to see if the tree collapsed. */
		if (PGNO(pg) == cp->root)
			goto err1;
		DB_ASSERT(dbenv, cp->csp - cp->sp == 1);

		if (check_trunc && PGNO(pg) > c_data->compact_truncate) {
			pgs_done++;
			/* Get a fresh low numbered page. */
			if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
				goto err1;
		}

		/* After re-locking check to see if we still need to fill. */
		if (P_FREESPACE(dbp, pg) <= factor)
			goto next_page;

		/* If they have the same parent, just dup the cursor. */
		if (ndbc != NULL && (ret = __dbc_close(ndbc)) != 0)
			goto err1;
		if ((ret = __dbc_dup(dbc, &ndbc, DB_POSITION)) != 0)
			goto err1;
		ncp = (BTREE_CURSOR *)ndbc->internal;

		/*
		 * ncp->recno needs to have the recno of the next page.
		 * Bump it by the number of records on the current page.
		 */
		ncp->recno += NUM_ENT(pg);
	}

	/* Fetch pages until we fill this one. */
	while (!done && npgno != PGNO_INVALID &&
	     P_FREESPACE(dbp, pg) > factor && c_data->compact_pages != 0) {
		/*
		 * If our current position is the last one on a parent
		 * page, then we are about to merge across different
		 * internal nodes.  Thus, we need to lock higher up
		 * in the tree.  We will exit the routine and commit
		 * what we have done so far.  Set spanp so we know
		 * we are in this case when we come back.
		 */
		if (cp->csp[-1].indx == NUM_ENT(cp->csp[-1].page) - 1) {
			*spanp = 1;
			npgno = PGNO(pg);
			next_recno = cp->recno;
			goto next_page;
		}

		/* Lock and get the next page. */
		if ((ret = __db_lget(dbc, LCK_COUPLE,
		     npgno, DB_LOCK_WRITE, 0, &ncp->lock)) != 0)
			goto err1;
		if ((ret = __memp_fget(dbmp, &npgno,
		    dbc->txn, DB_MPOOL_DIRTY, &npg)) != 0)
			goto err1;

		/* Fix up the next page cursor with its parent node. */
		if ((ret = __memp_fget(dbmp, &PGNO(cp->csp[-1].page),
		    dbc->txn, 0, &ppg)) != 0)
			goto err1;
		BT_STK_PUSH(dbenv, ncp, ppg,
		     cp->csp[-1].indx + 1, nolock, DB_LOCK_NG, ret);
		if (ret != 0)
			goto err1;

		/* Put the page on the stack. */
		BT_STK_ENTER(dbenv, ncp, npg, 0, ncp->lock, DB_LOCK_WRITE, ret);

		LOCK_INIT(ncp->lock);
		npg = NULL;

		c_data->compact_pages_examine++;

		PTRACE(dbc, "MDups", PGNO(ncp->csp->page), start, 0);
		if (check_dups && (ret = __bam_compact_dups(ndbc,
		     &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
			goto err1;

		npgno = NEXT_PGNO(ncp->csp->page);
		/*
		 * Merge the pages.  This will either free the next
		 * page or just update its parent pointer.
		 */
		PTRACE(dbc, "Merge", PGNO(cp->csp->page), start, 0);
		if ((ret = __bam_merge(dbc,
		     ndbc, factor, stop, c_data, &done)) != 0)
			goto err1;

		pgs_done++;

		/*
		 * __bam_merge could have freed our stack if it
		 * deleted a page possibly collapsing the tree.
		 */
		if (cp->csp->page == NULL)
			goto deleted;

		/* If we did not bump to the next page something
		 * did not fit. */
		if (npgno != NEXT_PGNO(pg))
			break;
	}

	/* Bottom of the main loop.  Move to the next page. */
	npgno = NEXT_PGNO(pg);
	cp->recno += NUM_ENT(pg);
	next_recno = cp->recno;

next_page:
	if ((ret = __bam_stkrel(dbc,
	     pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
		goto err1;
	if (ndbc != NULL &&
	     (ret = __bam_stkrel(ndbc, pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
		goto err1;

next_no_release:
	pg = NULL;

	if (npgno == PGNO_INVALID || c_data->compact_pages == 0)
		done = 1;
	if (!done) {
		/*
		 * If we are at the end of this parent commit the
		 * transaction so we don't tie things up.
		 */
		if (pgs_done != 0 && *spanp) {
deleted:		if (((ret = __bam_stkrel(ndbc, 0)) != 0 ||
			     (ret = __dbc_close(ndbc)) != 0))
				goto err;
			*donep = 0;
			return (0);
		}

		/* Reget the next page to look at. */
		cp->recno = next_recno;
		if ((ret = __db_lget(dbc, pgs_done ?
		    LCK_COUPLE_ALWAYS : LCK_COUPLE, npgno,
		    DB_LOCK_READ, 0, &cp->csp->lock)) != 0 ||
		    (ret = __memp_fget(dbmp, &npgno, dbc->txn, 0, &pg)) != 0)
			goto err;
		next_page = 1;
		goto next;
	}

done:
	if (0) {
		/* We come here if pg is the same as cp->csp->page. */
err1:		pg = NULL;
	}
err:	if (dbc != NULL &&
	    (t_ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0 && ret == 0)
		ret = t_ret;
	if (ndbc != NULL) {
		if ((t_ret = __bam_stkrel(ndbc, STK_CLRDBC)) != 0 && ret == 0)
			ret = t_ret;
		else if ((t_ret = __dbc_close(ndbc)) != 0 && ret == 0)
			ret = t_ret;
	}
	if (pg != NULL && (t_ret =
	     __memp_fput(dbmp, pg, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (npg != NULL && (t_ret =
	     __memp_fput(dbmp, npg, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	*donep = done;

	return (ret);
}
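/*
 * Illustrative note on the decision made in __bam_merge below.  The data
 * held by the next page occupies
 *
 *	used(npg) = (pgsize - P_OVERHEAD(dbp)) - P_FREESPACE(dbp, npg)
 *
 * bytes, so a bulk page merge is attempted only when
 *
 *	P_FREESPACE(dbp, pg) - used(npg) >= factor
 *
 * that is, when everything on npg fits on pg while still leaving the
 * target free space; otherwise records are moved one at a time by
 * __bam_merge_records, which may stop part way through the page.
 */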
/*
 * __bam_merge -- do actual merging of leaf pages.
 */
static int
__bam_merge(dbc, ndbc, factor, stop, c_data, donep)
	DBC *dbc, *ndbc;
	u_int32_t factor;
	DBT *stop;
	DB_COMPACT *c_data;
	int *donep;
{
	BTREE_CURSOR *cp, *ncp;
	BTREE *t;
	DB *dbp;
	PAGE *pg, *npg;
	db_indx_t adj, nent;
	db_recno_t recno;
	int cmp, ret;
	int (*func) __P((DB *, const DBT *, const DBT *));

	dbp = dbc->dbp;
	t = dbp->bt_internal;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;

	nent = NUM_ENT(npg);

	/* If the page is empty just throw it away. */
	if (nent == 0)
		goto free;

	adj = TYPE(npg) == P_LBTREE ? P_INDX : O_INDX;
	/* Find if the stopping point is on this page. */
	if (stop != NULL && stop->size != 0) {
		if (dbc->dbtype == DB_RECNO) {
			if ((ret = __ram_getno(dbc, stop, &recno, 0)) != 0)
				goto err;
			if (ncp->recno > recno) {
				*donep = 1;
				if (cp->recno > recno)
					goto done;
			}
		} else {
			func = TYPE(npg) == P_LBTREE ?
			     (dbp->dup_compare == NULL ?
			     __bam_defcmp : dbp->dup_compare) : t->bt_compare;
			if ((ret = __bam_cmp(dbp, dbc->txn, stop, npg,
			     nent - adj, func, &cmp)) != 0)
				goto err;

			/*
			 * If the last record is beyond the stopping
			 * point we are done after this page.  If the
			 * first record is beyond the stopping point
			 * don't even bother with this page.
			 */
			if (cmp <= 0) {
				*donep = 1;
				if ((ret = __bam_cmp(dbp, dbc->txn, stop,
				     npg, 0, func, &cmp)) != 0)
					goto err;
				if (cmp <= 0)
					goto done;
			}
		}
	}

	/*
	 * If there is too much data then just move records one at a time.
	 * Otherwise copy the data space over and fix up the index table.
	 * If we are on the left most child we will affect our parent's
	 * index entry so we call merge_records to figure out key sizes.
	 */
	if ((dbc->dbtype == DB_BTREE &&
	     ncp->csp[-1].indx == 0 && ncp->csp[-1].entries != 1) ||
	     (int)(P_FREESPACE(dbp, pg) -
	     ((dbp->pgsize - P_OVERHEAD(dbp)) -
	     P_FREESPACE(dbp, npg))) < (int)factor)
		ret = __bam_merge_records(dbc, ndbc, factor, c_data);
	else
free:		ret = __bam_merge_pages(dbc, ndbc, c_data);

done:
err:	return (ret);
}

static int
__bam_merge_records(dbc, ndbc, factor, c_data)
	DBC *dbc, *ndbc;
	u_int32_t factor;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BKEYDATA *bk, *tmp_bk;
	BTREE *t;
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT a, b, data, hdr;
	DB_ENV *dbenv;
	EPG *epg;
	PAGE *pg, *npg;
	db_indx_t adj, indx, nent, *ninp, pind;
	int32_t adjust;
	u_int32_t freespace, nksize, pfree, size;
	int first_dup, is_dup, next_dup, n_ok, ret;
	size_t (*func) __P((DB *, const DBT *, const DBT *));

	dbp = dbc->dbp;
	dbenv = dbp->dbenv;
	t = dbp->bt_internal;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	pind = NUM_ENT(pg);
	n_ok = 0;
	adjust = 0;
	ret = 0;
	nent = NUM_ENT(npg);

	DB_ASSERT(dbenv, nent != 0);

	/* See if we want to swap out this page. */
	if (c_data->compact_truncate != PGNO_INVALID &&
	     PGNO(npg) > c_data->compact_truncate) {
		/* Get a fresh low numbered page. */
		if ((ret = __bam_truncate_page(ndbc, &npg, 1)) != 0)
			goto err;
	}

	ninp = P_INP(dbp, npg);

	/*
	 * pg is the page that is being filled, it is in the stack in cp.
	 * npg is the next page, it is in the stack in ncp.
	 */
	freespace = P_FREESPACE(dbp, pg);

	adj = TYPE(npg) == P_LBTREE ? P_INDX : O_INDX;
	/*
	 * Loop through the records and find the stopping point.
	 */
	for (indx = 0; indx < nent; indx += adj)  {
		bk = GET_BKEYDATA(dbp, npg, indx);

		/* Size of the key. */
		size = BITEM_PSIZE(bk);

		/* Size of the data. */
		if (TYPE(pg) == P_LBTREE)
			size += BITEM_PSIZE(GET_BKEYDATA(dbp, npg, indx + 1));
		/*
		 * If we are at a duplicate set, skip ahead and
		 * get the total size for the group.
		 */
		n_ok = adj;
		if (TYPE(pg) == P_LBTREE &&
		     indx < nent - adj &&
		     ninp[indx] == ninp[indx + adj]) {
			do {
				/* Size of index for key reference. */
				size += sizeof(db_indx_t);
				n_ok++;
				/* Size of data item. */
				size += BITEM_PSIZE(
				     GET_BKEYDATA(dbp, npg, indx + n_ok));
				n_ok++;
			} while (indx + n_ok < nent &&
			     ninp[indx] == ninp[indx + n_ok]);
		}
		/* If the next set will not fit on the page we are done. */
		if (freespace < size)
			break;

		/*
		 * Otherwise figure out if we are past the goal and if
		 * adding this set will put us closer to the goal than
		 * we are now.
		 */
		if ((freespace - size) < factor) {
			if (freespace - factor > factor - (freespace - size))
				indx += n_ok;
			break;
		}
		freespace -= size;
		indx += n_ok - adj;
	}

	if (indx == 0)
		goto done;
	if (TYPE(pg) != P_LBTREE && TYPE(pg) != P_LDUP) {
		if (indx == nent)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		goto no_check;
	}

	/*
	 * We need to update npg's parent key.  Avoid creating a new key
	 * that will be too big.  Get what space will be available on the
	 * parents.  Then if there will not be room for this key, see if
	 * prefix compression will make it work, if not backup till we
	 * find something that will.  (Needless to say, this is a very
	 * unlikely event.)  If we are deleting this page then we will
	 * need to propagate the next key to our grandparents, so we
	 * see if that will fit.
	 */
	pfree = dbp->pgsize;
	for (epg = &ncp->csp[-1]; epg >= ncp->sp; epg--)
		if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
			bi = GET_BINTERNAL(dbp, epg->page, epg->indx);
			/* Add back in the key we will be deleting. */
			freespace += BINTERNAL_PSIZE(bi->len);
			if (freespace < pfree)
				pfree = freespace;
			if (epg->indx != 0)
				break;
		}

	/*
	 * If we are at the end, we will delete this page.  We need to
	 * check the next parent key only if we are the leftmost page and
	 * will therefore have to propagate the key up the tree.
	 */
	if (indx == nent) {
		if (ncp->csp[-1].indx != 0 || ncp->csp[-1].entries == 1 ||
		     BINTERNAL_PSIZE(GET_BINTERNAL(dbp,
		     ncp->csp[-1].page, 1)->len) <= pfree)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		indx -= adj;
	}

	bk = GET_BKEYDATA(dbp, npg, indx);
	if (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (F_ISSET(dbc, DBC_OPD)) {
			if (dbp->dup_compare == __bam_defcmp)
				func = __bam_defpfx;
			else
				func = NULL;
		} else
			func = t->bt_prefix;
	} else
		func = NULL;

	/* Skip to the beginning of a duplicate set. */
	while (indx != 0 && ninp[indx] == ninp[indx - adj])
		indx -= adj;

	while (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (B_TYPE(bk->type) != B_KEYDATA)
			goto noprefix;
		/*
		 * Figure out if we can truncate this key.
		 * Code borrowed from bt_split.c
		 */
		if (func == NULL)
			goto noprefix;
		tmp_bk = GET_BKEYDATA(dbp, npg, indx - adj);
		if (B_TYPE(tmp_bk->type) != B_KEYDATA)
			goto noprefix;
		memset(&a, 0, sizeof(a));
		a.size = tmp_bk->len;
		a.data = tmp_bk->data;
		memset(&b, 0, sizeof(b));
		b.size = bk->len;
		b.data = bk->data;
		nksize = (u_int32_t)func(dbp, &a, &b);
		if (BINTERNAL_PSIZE(nksize) < pfree)
			break;
noprefix:
		/* Skip to the beginning of a duplicate set. */
		do {
			indx -= adj;
		} while (indx != 0 &&  ninp[indx] == ninp[indx - adj]);

		bk = GET_BKEYDATA(dbp, npg, indx);
	}

	if (indx == 0)
		goto done;

	DB_ASSERT(dbenv, indx <= nent);

	/* Loop through the records and move them from npg to pg. */
no_check:
	is_dup = first_dup = next_dup = 0;
	if ((ret = __memp_dirty(dbp->mpf,
	    &cp->csp->page, dbc->txn, dbc->priority, 0)) != 0 ||
	    (ret = __memp_dirty(dbp->mpf,
	    &ncp->csp->page, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	ninp = P_INP(dbp, npg);
	do {
		bk = GET_BKEYDATA(dbp, npg, 0);
		/* Figure out if we are in a duplicate group or not. */
		if ((NUM_ENT(npg) % 2) == 0) {
			if (NUM_ENT(npg) > 2 && ninp[0] == ninp[2]) {
				if (!is_dup) {
					first_dup = 1;
					is_dup = 1;
				} else
					first_dup = 0;

				next_dup = 1;
			} else if (next_dup) {
				is_dup = 1;
				first_dup = 0;
				next_dup = 0;
			} else
				is_dup = 0;
		}

		if (is_dup && !first_dup && (pind % 2) == 0) {
			/* Duplicate key. */
			if ((ret = __bam_adjindx(dbc,
			     pg, pind, pind - P_INDX, 1)) != 0)
				goto err;
			if (!next_dup)
				is_dup = 0;
		} else
			switch (B_TYPE(bk->type)) {
			case B_KEYDATA:
				hdr.data = bk;
				hdr.size = SSZA(BKEYDATA, data);
				data.size = bk->len;
				data.data = bk->data;
				if ((ret = __db_pitem(dbc, pg, pind,
				     BKEYDATA_SIZE(bk->len), &hdr,
				     &data)) != 0)
					goto err;
				break;
			case B_OVERFLOW:
			case B_DUPLICATE:
				data.size = BOVERFLOW_SIZE;
				data.data = bk;
				if ((ret = __db_pitem(dbc, pg, pind,
				     BOVERFLOW_SIZE, &data, NULL)) != 0)
					goto err;
				break;
			default:
				__db_errx(dbenv,
				    "Unknown record format, page %lu, indx 0",
				    (u_long)PGNO(pg));
				ret = EINVAL;
				goto err;
			}
		pind++;
		if (next_dup && (NUM_ENT(npg) % 2) == 0) {
			if ((ret = __bam_adjindx(ndbc,
			     npg, 0, O_INDX, 0)) != 0)
				goto err;
		} else {
			if ((ret = __db_ditem(ndbc,
			     npg, 0, BITEM_SIZE(bk))) != 0)
				goto err;
		}
		adjust++;
	} while (--indx != 0);

	DB_ASSERT(dbenv, NUM_ENT(npg) != 0);

	if (adjust != 0 &&
	     (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))) {
		DB_ASSERT(dbenv, cp->csp - cp->sp == ncp->csp - ncp->sp);
		if (TYPE(pg) == P_LBTREE)
			adjust /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -adjust)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, adjust)) != 0)
			goto err;
	}

	/* Update parent with new key. */
	if (ndbc->dbtype == DB_BTREE &&
	     (ret = __bam_pupdate(ndbc, pg)) != 0)
		goto err;

done:	ret = __bam_stkrel(ndbc, STK_CLRDBC);

err:	return (ret);
}
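/*
 * Illustrative note for __bam_merge_pages below.  The bulk copy moves
 * npg's item data as one block and then rewrites the index table: an
 * item at offset off on npg ends up at
 *
 *	off - (pgsize - HOFFSET(pg))
 *
 * on pg, because the block is placed immediately below pg's existing
 * data, which begins HOFFSET(pg) bytes into the page.  For example
 * (numbers assumed), with pgsize = 4096 and HOFFSET(pg) = 3000, an item
 * at offset 3500 on npg is re-indexed to 3500 - 1096 = 2404.
 */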
static int
__bam_merge_pages(dbc, ndbc, c_data)
	DBC *dbc, *ndbc;
	DB_COMPACT *c_data;
{
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT data, hdr, ind;
	DB_MPOOLFILE *dbmp;
	PAGE *pg, *npg;
	db_indx_t nent, *ninp, *pinp;
	db_pgno_t ppgno;
	u_int8_t *bp;
	u_int32_t len;
	int i, level, ret;

	COMPQUIET(ppgno, PGNO_INVALID);
	dbp = dbc->dbp;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	nent = NUM_ENT(npg);

	/* If the page is empty just throw it away. */
	if (nent == 0)
		goto free;

	if ((ret = __memp_dirty(dbp->mpf,
	    &cp->csp->page, dbc->txn, dbc->priority, 0)) != 0 ||
	    (ret = __memp_dirty(dbp->mpf,
	    &ncp->csp->page, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	DB_ASSERT(dbp->dbenv, nent == NUM_ENT(npg));

	/* Bulk copy the data to the new page. */
	len = dbp->pgsize - HOFFSET(npg);
	if (DBC_LOGGING(dbc)) {
		data.data = (u_int8_t *)npg + HOFFSET(npg);
		data.size = len;
		ind.data = P_INP(dbp, npg);
		ind.size = NUM_ENT(npg) * sizeof(db_indx_t);
		if ((ret = __bam_merge_log(dbp,
		     dbc->txn, &LSN(pg), 0, PGNO(pg),
		     &LSN(pg), PGNO(npg), &LSN(npg), NULL, &data, &ind)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(pg));
	LSN(npg) = LSN(pg);
	bp = (u_int8_t *)pg + HOFFSET(pg) - len;
	memcpy(bp, (u_int8_t *)npg + HOFFSET(npg), len);

	/* Copy index table offset by what was there already. */
	pinp = P_INP(dbp, pg) + NUM_ENT(pg);
	ninp = P_INP(dbp, npg);
	for (i = 0; i < NUM_ENT(npg); i++)
		*pinp++ = *ninp++ - (dbp->pgsize - HOFFSET(pg));
	HOFFSET(pg) -= len;
	NUM_ENT(pg) += i;

	NUM_ENT(npg) = 0;
	HOFFSET(npg) += len;

	if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD)) {
		DB_ASSERT(dbp->dbenv, cp->csp - cp->sp == ncp->csp - ncp->sp);
		if (TYPE(pg) == P_LBTREE)
			i /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -i)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, i)) != 0)
			goto err;
	}

free:	/*
	 * __bam_dpages may decide to collapse the tree.
	 * This can happen if we have the root and there
	 * are exactly 2 pointers left in it.
	 * If it can collapse the tree we must free the other
	 * stack since it will no longer be valid.  This
	 * must be done beforehand because we cannot
	 * hold a page pinned if it might be truncated.
	 */
	if (PGNO(ncp->sp->page) == ncp->root &&
	     NUM_ENT(ncp->sp->page) == 2) {
		if ((ret = __bam_stkrel(dbc, STK_CLRDBC | STK_PGONLY)) != 0)
			goto err;
		level = LEVEL(ncp->sp->page);
		ppgno = PGNO(ncp->csp[-1].page);
	} else
		level = 0;

	if (c_data->compact_truncate > PGNO(npg))
		c_data->compact_truncate--;
	if ((ret = __bam_dpages(ndbc,
	     0, ndbc->dbtype == DB_RECNO ? 0 : 1)) != 0)
		goto err;
	npg = NULL;
	c_data->compact_pages_free++;
	c_data->compact_pages--;
	if (level != 0) {
		if ((ret = __memp_fget(dbmp,
		    &ncp->root, dbc->txn, 0, &npg)) != 0)
			goto err;
		if (level == LEVEL(npg))
			level = 0;
		if ((ret = __memp_fput(dbmp, npg, dbc->priority)) != 0)
			goto err;
		npg = NULL;
		if (level != 0) {
			c_data->compact_levels++;
			c_data->compact_pages_free++;
			if (c_data->compact_truncate > ppgno)
				c_data->compact_truncate--;
			if (c_data->compact_pages != 0)
				c_data->compact_pages--;
		}
	}

err:	return (ret);
}

/*
 * __bam_merge_internal --
 *	Merge internal nodes of the tree.
 */
static int
__bam_merge_internal(dbc, ndbc, level, c_data, merged)
	DBC *dbc, *ndbc;
	int level;
	DB_COMPACT *c_data;
	int *merged;
{
	BINTERNAL bi, *bip, *fip;
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT data, hdr;
	DB_MPOOLFILE *dbmp;
	EPG *epg, *save_csp, *nsave_csp;
	PAGE *pg, *npg;
	RINTERNAL *rk;
	db_indx_t indx, pind;
	db_pgno_t ppgno;
	int32_t trecs;
	u_int16_t size;
	u_int32_t freespace, pfree;
	int ret;

	COMPQUIET(bip, NULL);
	COMPQUIET(ppgno, PGNO_INVALID);

	/*
	 * ndbc will contain the dominating parent of the subtree.
	 * dbc will have the tree containing the left child.
	 *
	 * The stacks descend to the leaf level.
	 * If this is a recno tree then both stacks will start at the root.
	 */
	dbp = dbc->dbp;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	*merged = 0;
	ret = 0;

	/*
	 * Set the stacks to the level requested.
	 * Save the old value to restore when we exit.
	 */
	save_csp = cp->csp;
	cp->csp = &cp->csp[-level + 1];
	pg = cp->csp->page;
	pind = NUM_ENT(pg);

	nsave_csp = ncp->csp;
	ncp->csp = &ncp->csp[-level + 1];
	npg = ncp->csp->page;
	indx = NUM_ENT(npg);

	/*
	 * The caller may have two stacks that include common ancestors, we
	 * check here for convenience.
	 */
	if (npg == pg)
		goto done;

	if ((ret = __memp_dirty(dbmp,
	    &cp->csp->page, dbc->txn, dbc->priority, 0)) != 0 ||
	    (ret = __memp_dirty(dbmp,
	    &ncp->csp->page, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	pg = cp->csp->page;
	npg = ncp->csp->page;

	if (TYPE(pg) == P_IBTREE) {
		/*
		 * Check for overflow keys on both pages while we have
		 * them locked.
		 */
		if ((ret =
		     __bam_truncate_internal_overflow(dbc, pg, c_data)) != 0)
			goto err;
		if ((ret =
		     __bam_truncate_internal_overflow(dbc, npg, c_data)) != 0)
			goto err;
	}

	/*
	 * If we are about to move data off the left most page of an
	 * internal node we will need to update its parents, make sure there
	 * will be room for the new key on all the parents in the stack.
	 * If not, move less data.
	 */
	fip = NULL;
	if (TYPE(pg) == P_IBTREE) {
		/* See where we run out of space. */
		freespace = P_FREESPACE(dbp, pg);
		/*
		 * The leftmost key of an internal page is not accurate.
		 * Go up the tree to find a non-leftmost parent.
		 */
		epg = ncp->csp;
		while (--epg >= ncp->sp && epg->indx == 0)
			continue;
		fip = bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
		epg = ncp->csp;

		for (indx = 0;;) {
			size = BINTERNAL_PSIZE(bip->len);
			if (size > freespace)
				break;
			freespace -= size;
			if (++indx >= NUM_ENT(npg))
				break;
			bip = GET_BINTERNAL(dbp, npg, indx);
		}

		/* See if we are deleting the page and we are not
		 * left most. */
		if (indx == NUM_ENT(npg) && epg[-1].indx != 0)
			goto fits;

		pfree = dbp->pgsize;
		for (epg--; epg >= ncp->sp; epg--)
			if ((freespace =
			    P_FREESPACE(dbp, epg->page)) < pfree) {
				bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
				/* Add back in the key we will be deleting. */
				freespace += BINTERNAL_PSIZE(bip->len);
				if (freespace < pfree)
					pfree = freespace;
				if (epg->indx != 0)
					break;
			}
		epg = ncp->csp;

		/* If we are at the end of the page we will delete it. */
		if (indx == NUM_ENT(npg)) {
			if (NUM_ENT(epg[-1].page) == 1)
				goto fits;
			bip = GET_BINTERNAL(dbp,
			     epg[-1].page, epg[-1].indx + 1);
		} else
			bip = GET_BINTERNAL(dbp, npg, indx);

		/* Back up until we have a key that fits. */
		while (indx != 0 && BINTERNAL_PSIZE(bip->len) > pfree) {
			indx--;
			bip = GET_BINTERNAL(dbp, npg, indx);
		}
		if (indx == 0)
			goto done;
	}

fits:	memset(&bi, 0, sizeof(bi));
	memset(&hdr, 0, sizeof(hdr));
	memset(&data, 0, sizeof(data));
	trecs = 0;

	/*
	 * Copy data between internal nodes till one is full
	 * or the other is empty.
	 */
	do {
		if (dbc->dbtype == DB_BTREE) {
			bip = GET_BINTERNAL(dbp, npg, 0);
			size = fip == NULL ?
			     BINTERNAL_SIZE(bip->len) :
			     BINTERNAL_SIZE(fip->len);
			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
				break;

			if (fip == NULL) {
				data.size = bip->len;
				data.data = bip->data;
			} else {
				data.size = fip->len;
				data.data = fip->data;
			}
			bi.len = data.size;
			B_TSET(bi.type, bip->type);
			bi.pgno = bip->pgno;
			bi.nrecs = bip->nrecs;
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))
				trecs += (int32_t)bip->nrecs;
		} else {
			rk = GET_RINTERNAL(dbp, npg, 0);
			size = RINTERNAL_SIZE;
			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
				break;

			hdr.data = rk;
			hdr.size = size;
			trecs += (int32_t)rk->nrecs;
		}
		if ((ret = __db_pitem(dbc, pg, pind, size, &hdr, &data)) != 0)
			goto err;
		pind++;
		if (fip != NULL) {
			/* Reset size to be for the record being deleted. */
			size = BINTERNAL_SIZE(bip->len);
			fip = NULL;
		}
		if ((ret = __db_ditem(ndbc, npg, 0, size)) != 0)
			goto err;
		*merged = 1;
	} while (--indx != 0);

	if (c_data->compact_truncate != PGNO_INVALID &&
	     PGNO(pg) > c_data->compact_truncate && cp->csp != cp->sp) {
		if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
			goto err;
	}

	if (NUM_ENT(npg) != 0 && c_data->compact_truncate != PGNO_INVALID &&
	     PGNO(npg) > c_data->compact_truncate && ncp->csp != ncp->sp) {
		if ((ret = __bam_truncate_page(ndbc, &npg, 1)) != 0)
			goto err;
	}

	if (!*merged)
		goto done;

	if (trecs != 0) {
		DB_ASSERT(dbp->dbenv, cp->csp - cp->sp == ncp->csp - ncp->sp);
		cp->csp--;
		if ((ret = __bam_adjust(dbc, trecs)) != 0)
			goto err;

		ncp->csp--;
		if ((ret = __bam_adjust(ndbc, -trecs)) != 0)
			goto err;
		ncp->csp++;
	}
	cp->csp = save_csp;

	/*
	 * Either we emptied the page or we need to update its
	 * parent to reflect the first page we now point to.
	 * First get rid of the bottom of the stack,
	 * bam_dpages will clear the stack.  We can drop
	 * the locks on those pages as we have not done
	 * anything to them.
	 */
	do {
		if ((ret = __memp_fput(dbmp,
		    nsave_csp->page, dbc->priority)) != 0)
			goto err;
		if ((ret = __LPUT(dbc, nsave_csp->lock)) != 0)
			goto err;
		nsave_csp--;
	} while (nsave_csp != ncp->csp);

	if (NUM_ENT(npg) == 0)  {
		/*
		 * __bam_dpages may decide to collapse the tree
		 * so we need to free our other stack.  The tree
		 * will change in height and our stack will no longer
		 * be valid.
		 */
		if (PGNO(ncp->sp->page) == ncp->root &&
		     NUM_ENT(ncp->sp->page) == 2) {
			if ((ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0)
				goto err;
			level = LEVEL(ncp->sp->page);
			ppgno = PGNO(ncp->csp[-1].page);
		} else
			level = 0;
		if (c_data->compact_truncate > PGNO(npg))
			c_data->compact_truncate--;
		ret = __bam_dpages(ndbc,
		     0, ndbc->dbtype == DB_RECNO ? 0 : 1);
		c_data->compact_pages_free++;
		if (ret == 0 && level != 0) {
			if ((ret = __memp_fget(dbmp,
			    &ncp->root, dbc->txn, 0, &npg)) != 0)
				goto err;
			if (level == LEVEL(npg))
				level = 0;
			if ((ret = __memp_fput(dbmp, npg, dbc->priority)) != 0)
				goto err;
			npg = NULL;
			if (level != 0) {
				c_data->compact_levels++;
				c_data->compact_pages_free++;
				if (c_data->compact_truncate > ppgno)
					c_data->compact_truncate--;
				if (c_data->compact_pages != 0)
					c_data->compact_pages--;
			}
		}
	} else
		ret = __bam_pupdate(ndbc, npg);
	return (ret);

done:
err:	cp->csp = save_csp;
	ncp->csp = nsave_csp;

	return (ret);
}

/*
 * __bam_compact_dups -- try to compress off page dup trees.
 * We may or may not have a write lock on this page.
 */
*/ if ((ret = __memp_fget(dbmp, &bo->pgno, dbc->txn, 0, &dpg)) != 0) goto err; level = dpg->level; if ((ret = __memp_fput(dbmp, dpg, dbc->priority)) != 0) goto err; if (level == LEAFLEVEL) continue; if ((ret = __dbc_newopd(dbc, bo->pgno, NULL, &opd)) != 0) return (ret); if (!have_lock) { if ((ret = __db_lget(dbc, 0, PGNO(pg), DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0) goto err; have_lock = 1; if ((ret = __memp_dirty(dbp->mpf, ppg, dbc->txn, dbc->priority, 0)) != 0) goto err; pg = *ppg; } (*donep)++; memset(&start, 0, sizeof(start)); do { if ((ret = __bam_compact_int(opd, &start, NULL, factor, &span, c_data, &done)) != 0) break; } while (!done); if (start.data != NULL) __os_free(dbenv, start.data); if (ret != 0) goto err; ret = __dbc_close(opd); opd = NULL; if (ret != 0) goto err; } err: if (opd != NULL && (t_ret = __dbc_close(opd)) != 0 && ret == 0) ret = t_ret; return (ret); } /* * __bam_truncate_page -- swap a page with a lower numbered page. * The cusor has a stack which includes at least the * immediate parent of this page. */ static int __bam_truncate_page(dbc, pgp, update_parent) DBC *dbc; PAGE **pgp; int update_parent; { BTREE_CURSOR *cp; DB *dbp; DBT data, hdr, ind; DB_LSN lsn; EPG *epg; PAGE *newpage; db_pgno_t newpgno, *pgnop; int ret; dbp = dbc->dbp; /* * We want to free a page that lives in the part of the file that * can be truncated, so we're going to move it onto a free page * that is in the part of the file that need not be truncated. * Since the freelist is ordered now, we can simply call __db_new * which will grab the first element off the freelist; we know this * is the lowest numbered free page. */ if ((ret = __db_new(dbc, P_DONTEXTEND | TYPE(*pgp), &newpage)) != 0) return (ret); /* * If newpage is null then __db_new would have had to allocate * a new page from the filesystem, so there is no reason * to continue this action. */ if (newpage == NULL) return (0); /* * It is possible that a higher page is allocated if other threads * are allocating at the same time, if so, just put it back. */ if (PGNO(newpage) > PGNO(*pgp)) { /* Its unfortunate but you can't just free a new overflow. */ if (TYPE(newpage) == P_OVERFLOW) OV_LEN(newpage) = 0; return (__db_free(dbc, newpage)); } if ((ret = __memp_dirty(dbp->mpf, &newpage, dbc->txn, dbc->priority, 0)) != 0) goto err; /* Log if necessary. */ if (DBC_LOGGING(dbc)) { hdr.data = *pgp; hdr.size = P_OVERHEAD(dbp); if (TYPE(*pgp) == P_OVERFLOW) { data.data = (u_int8_t *)*pgp + P_OVERHEAD(dbp); data.size = OV_LEN(*pgp); ind.size = 0; } else { data.data = (u_int8_t *)*pgp + HOFFSET(*pgp); data.size = dbp->pgsize - HOFFSET(*pgp); ind.data = P_INP(dbp, *pgp); ind.size = NUM_ENT(*pgp) * sizeof(db_indx_t); } if ((ret = __bam_merge_log(dbp, dbc->txn, &LSN(newpage), 0, PGNO(newpage), &LSN(newpage), PGNO(*pgp), &LSN(*pgp), &hdr, &data, &ind)) != 0) goto err; } else LSN_NOT_LOGGED(LSN(newpage)); newpgno = PGNO(newpage); lsn = LSN(newpage); memcpy(newpage, *pgp, dbp->pgsize); PGNO(newpage) = newpgno; LSN(newpage) = lsn; /* Empty the old page. */ if ((ret = __memp_dirty(dbp->mpf, pgp, dbc->txn, dbc->priority, 0)) != 0) goto err; if (TYPE(*pgp) == P_OVERFLOW) OV_LEN(*pgp) = 0; else { HOFFSET(*pgp) = dbp->pgsize; NUM_ENT(*pgp) = 0; } LSN(*pgp) = lsn; /* Update siblings. 
/*
 * __bam_truncate_page -- swap a page with a lower numbered page.
 *	The cursor has a stack which includes at least the
 * immediate parent of this page.
 */
static int
__bam_truncate_page(dbc, pgp, update_parent)
	DBC *dbc;
	PAGE **pgp;
	int update_parent;
{
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT data, hdr, ind;
	DB_LSN lsn;
	EPG *epg;
	PAGE *newpage;
	db_pgno_t newpgno, *pgnop;
	int ret;

	dbp = dbc->dbp;

	/*
	 * We want to free a page that lives in the part of the file that
	 * can be truncated, so we're going to move it onto a free page
	 * that is in the part of the file that need not be truncated.
	 * Since the freelist is ordered now, we can simply call __db_new
	 * which will grab the first element off the freelist; we know this
	 * is the lowest numbered free page.
	 */
	if ((ret = __db_new(dbc,
	    P_DONTEXTEND | TYPE(*pgp), &newpage)) != 0)
		return (ret);

	/*
	 * If newpage is null then __db_new would have had to allocate
	 * a new page from the filesystem, so there is no reason
	 * to continue this action.
	 */
	if (newpage == NULL)
		return (0);

	/*
	 * It is possible that a higher page is allocated if other threads
	 * are allocating at the same time, if so, just put it back.
	 */
	if (PGNO(newpage) > PGNO(*pgp)) {
		/* It's unfortunate, but you can't just free a new
		 * overflow. */
		if (TYPE(newpage) == P_OVERFLOW)
			OV_LEN(newpage) = 0;
		return (__db_free(dbc, newpage));
	}

	if ((ret = __memp_dirty(dbp->mpf,
	    &newpage, dbc->txn, dbc->priority, 0)) != 0)
		goto err;

	/* Log if necessary. */
	if (DBC_LOGGING(dbc)) {
		hdr.data = *pgp;
		hdr.size = P_OVERHEAD(dbp);
		if (TYPE(*pgp) == P_OVERFLOW) {
			data.data = (u_int8_t *)*pgp + P_OVERHEAD(dbp);
			data.size = OV_LEN(*pgp);
			ind.size = 0;
		} else {
			data.data = (u_int8_t *)*pgp + HOFFSET(*pgp);
			data.size = dbp->pgsize - HOFFSET(*pgp);
			ind.data = P_INP(dbp, *pgp);
			ind.size = NUM_ENT(*pgp) * sizeof(db_indx_t);
		}
		if ((ret = __bam_merge_log(dbp, dbc->txn,
		      &LSN(newpage), 0, PGNO(newpage), &LSN(newpage),
		      PGNO(*pgp), &LSN(*pgp), &hdr, &data, &ind)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(newpage));

	newpgno = PGNO(newpage);
	lsn = LSN(newpage);
	memcpy(newpage, *pgp, dbp->pgsize);
	PGNO(newpage) = newpgno;
	LSN(newpage) = lsn;

	/* Empty the old page. */
	if ((ret = __memp_dirty(dbp->mpf,
	    pgp, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	if (TYPE(*pgp) == P_OVERFLOW)
		OV_LEN(*pgp) = 0;
	else {
		HOFFSET(*pgp) = dbp->pgsize;
		NUM_ENT(*pgp) = 0;
	}
	LSN(*pgp) = lsn;

	/* Update siblings. */
	switch (TYPE(newpage)) {
	case P_OVERFLOW:
	case P_LBTREE:
	case P_LRECNO:
	case P_LDUP:
		if (NEXT_PGNO(newpage) == PGNO_INVALID &&
		    PREV_PGNO(newpage) == PGNO_INVALID)
			break;
		if ((ret = __bam_relink(dbc, *pgp, PGNO(newpage))) != 0)
			goto err;
		break;
	default:
		break;
	}
	cp = (BTREE_CURSOR *)dbc->internal;

	/*
	 * Now, if we free this page, it will get truncated, when we free
	 * all the pages after it in the file.
	 */
	ret = __db_free(dbc, *pgp);
	/* db_free always puts the page. */
	*pgp = newpage;

	if (ret != 0)
		return (ret);

	if (!update_parent)
		goto done;

	/* Update the parent. */
	epg = &cp->csp[-1];
	if ((ret = __memp_dirty(dbp->mpf,
	    &epg->page, dbc->txn, dbc->priority, 0)) != 0)
		return (ret);

	switch (TYPE(epg->page)) {
	case P_IBTREE:
		pgnop = &GET_BINTERNAL(dbp, epg->page, epg->indx)->pgno;
		break;
	case P_IRECNO:
		pgnop = &GET_RINTERNAL(dbp, epg->page, epg->indx)->pgno;
		break;
	default:
		pgnop = &GET_BOVERFLOW(dbp, epg->page, epg->indx)->pgno;
		break;
	}
	if (DBC_LOGGING(dbc)) {
		if ((ret = __bam_pgno_log(dbp, dbc->txn, &LSN(epg->page),
		    0, PGNO(epg->page), &LSN(epg->page), (u_int32_t)epg->indx,
		    *pgnop, PGNO(newpage))) != 0)
			return (ret);
	} else
		LSN_NOT_LOGGED(LSN(epg->page));

	*pgnop = PGNO(newpage);
	cp->csp->page = newpage;

done:	return (0);

err:	(void)__memp_fput(dbp->mpf, newpage, dbc->priority);
	return (ret);
}

/*
 * __bam_truncate_overflow -- find overflow pages to truncate.
 *	Walk the pages of an overflow chain and swap out
 * high numbered pages.  We are passed the first page
 * but only deal with the second and subsequent pages.
 */
static int
__bam_truncate_overflow(dbc, pgno, pg_lock, c_data)
	DBC *dbc;
	db_pgno_t pgno;
	db_pgno_t pg_lock;
	DB_COMPACT *c_data;
{
	DB *dbp;
	DB_LOCK lock;
	PAGE *page;
	int ret, t_ret;

	dbp = dbc->dbp;
	page = NULL;
	LOCK_INIT(lock);

	if ((ret = __memp_fget(dbp->mpf, &pgno, dbc->txn, 0, &page)) != 0)
		return (ret);

	while ((pgno = NEXT_PGNO(page)) != PGNO_INVALID) {
		if ((ret = __memp_fput(dbp->mpf, page, dbc->priority)) != 0)
			return (ret);
		if ((ret = __memp_fget(dbp->mpf,
		    &pgno, dbc->txn, 0, &page)) != 0)
			return (ret);
		if (pgno <= c_data->compact_truncate)
			continue;
		if (pg_lock != PGNO_INVALID) {
			if ((ret = __db_lget(dbc,
			     0, pg_lock, DB_LOCK_WRITE, 0, &lock)) != 0)
				break;
			pg_lock = PGNO_INVALID;
		}
		if ((ret = __bam_truncate_page(dbc, &page, 0)) != 0)
			break;
	}

	if (page != NULL && (t_ret = __memp_fput(
	     dbp->mpf, page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __bam_truncate_root_page -- swap a page which is
 *    the root of an off page dup tree or the head of an overflow.
 * The page is referenced by the pg/indx passed in.
 */
static int
__bam_truncate_root_page(dbc, pg, indx, c_data)
	DBC *dbc;
	PAGE *pg;
	u_int32_t indx;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BOVERFLOW *bo;
	DB *dbp;
	DBT orig;
	PAGE *page;
	db_pgno_t newpgno, *pgnop;
	int ret, t_ret;

	COMPQUIET(c_data, NULL);
	COMPQUIET(bo, NULL);
	COMPQUIET(newpgno, PGNO_INVALID);
	dbp = dbc->dbp;
	page = NULL;
	if (TYPE(pg) == P_IBTREE) {
		bi = GET_BINTERNAL(dbp, pg, indx);
		if (B_TYPE(bi->type) == B_OVERFLOW) {
			bo = (BOVERFLOW *)(bi->data);
			pgnop = &bo->pgno;
		} else
			pgnop = &bi->pgno;
	} else {
		bo = GET_BOVERFLOW(dbp, pg, indx);
		pgnop = &bo->pgno;
	}

	DB_ASSERT(dbp->dbenv, IS_DIRTY(pg));

	if ((ret = __memp_fget(dbp->mpf, pgnop, dbc->txn, 0, &page)) != 0)
		goto err;

	/*
	 * If this is a multiply referenced overflow key, then we will just
	 * copy it and decrement the reference count.  This is part of a
	 * fix to get rid of multiple references.
	 */
	if (TYPE(page) == P_OVERFLOW && OV_REF(page) > 1) {
		if ((ret = __db_ovref(dbc, bo->pgno)) != 0)
			goto err;
		memset(&orig, 0, sizeof(orig));
		if ((ret = __db_goff(dbp, dbc->txn, &orig,
		     bo->tlen, bo->pgno, &orig.data, &orig.size)) == 0)
			ret = __db_poff(dbc, &orig, &newpgno);
		if (orig.data != NULL)
			__os_free(dbp->dbenv, orig.data);
		if (ret != 0)
			goto err;
	} else {
		if ((ret = __bam_truncate_page(dbc, &page, 0)) != 0)
			goto err;
		newpgno = PGNO(page);
		/* If we could not allocate from the free list, give up. */
		if (newpgno == *pgnop)
			goto err;
	}

	/* Update the reference. */
	if (DBC_LOGGING(dbc)) {
		if ((ret = __bam_pgno_log(dbp,
		     dbc->txn, &LSN(pg), 0, PGNO(pg),
		     &LSN(pg), (u_int32_t)indx, *pgnop, newpgno)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(pg));

	*pgnop = newpgno;

err:	if (page != NULL && (t_ret =
	      __memp_fput(dbp->mpf, page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __bam_truncate_internal_overflow -- find overflow keys
 *	on internal pages and if they have high page
 * numbers swap them with lower pages and truncate them.
 * Note that if there are overflow keys in the internal
 * nodes they will get copied adding pages to the database.
 */
static int
__bam_truncate_internal_overflow(dbc, page, c_data)
	DBC *dbc;
	PAGE *page;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BOVERFLOW *bo;
	db_indx_t indx;
	int ret;

	COMPQUIET(bo, NULL);
	ret = 0;
	for (indx = 0; indx < NUM_ENT(page); indx++) {
		bi = GET_BINTERNAL(dbc->dbp, page, indx);
		if (B_TYPE(bi->type) != B_OVERFLOW)
			continue;
		bo = (BOVERFLOW *)(bi->data);
		if (bo->pgno > c_data->compact_truncate && (ret =
		     __bam_truncate_root_page(dbc, page, indx, c_data)) != 0)
			break;
		if ((ret = __bam_truncate_overflow(
		     dbc, bo->pgno, PGNO_INVALID, c_data)) != 0)
			break;
	}
	return (ret);
}

#ifdef HAVE_FTRUNCATE
/*
 * __bam_savekey -- save the key from an internal page.
 *	We need to save information so that we can
 * fetch the next internal node of the tree.  This means
 * we need the btree key on this current page, or the
 * next record number.
 */
static int
__bam_savekey(dbc, next, start)
	DBC *dbc;
	int next;
	DBT *start;
{
	BINTERNAL *bi;
	BOVERFLOW *bo;
	BTREE_CURSOR *cp;
	DB *dbp;
	DB_ENV *dbenv;
	PAGE *pg;
	RINTERNAL *ri;
	db_indx_t indx, top;

	dbp = dbc->dbp;
	dbenv = dbp->dbenv;
	cp = (BTREE_CURSOR *)dbc->internal;
	pg = cp->csp->page;

	if (dbc->dbtype == DB_RECNO) {
		if (next)
			for (indx = 0, top = NUM_ENT(pg);
			    indx != top; indx++) {
				ri = GET_RINTERNAL(dbp, pg, indx);
				cp->recno += ri->nrecs;
			}
		return (__db_retcopy(dbenv, start, &cp->recno,
		     sizeof(cp->recno), &start->data, &start->ulen));
	}

	bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
	if (B_TYPE(bi->type) == B_OVERFLOW) {
		bo = (BOVERFLOW *)(bi->data);
		return (__db_goff(dbp, dbc->txn, start,
		     bo->tlen, bo->pgno, &start->data, &start->ulen));
	}
	return (__db_retcopy(dbenv,
	    start, bi->data, bi->len, &start->data, &start->ulen));
}
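/*
 * Illustrative note for __bam_truncate_internal below: internal pages
 * are visited one level at a time and all locks are dropped between
 * steps, so the position is carried in a DBT rather than in a cursor.
 * Each iteration follows this pattern, a sketch of the calls made below:
 *
 *	__bam_csearch(dbc, &start, CS_READ | CS_GETRECNO, level);
 *	__bam_savekey(dbc, ..., &start);	(save where to resume)
 *	__bam_stkrel(dbc, STK_NOLOCK);		(drop the stack and locks)
 *	__bam_csearch(dbc, &start, CS_PARENT | CS_GETRECNO, level);
 *	__bam_truncate_page(dbc, &pg, 1);	(swap with a low free page)
 *
 * A deadlock or an intervening update therefore costs at most a
 * re-search from the saved key.
 */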
/*
 * __bam_truncate_internal --
 *	Find high numbered pages in the internal nodes of a tree and
 *	swap them.
 */
static int
__bam_truncate_internal(dbp, txn, c_data)
	DB *dbp;
	DB_TXN *txn;
	DB_COMPACT *c_data;
{
	BTREE_CURSOR *cp;
	DBC *dbc;
	DBT start;
	PAGE *pg;
	db_pgno_t pgno;
	u_int32_t sflag;
	int level, local_txn, ret, t_ret;

	dbc = NULL;
	memset(&start, 0, sizeof(start));

	if (IS_DB_AUTO_COMMIT(dbp, txn)) {
		local_txn = 1;
		txn = NULL;
	} else
		local_txn = 0;

	level = LEAFLEVEL + 1;
	sflag = CS_READ | CS_GETRECNO;

new_txn:
	if (local_txn &&
	     (ret = __txn_begin(dbp->dbenv, NULL, &txn, 0)) != 0)
		goto err;

	if ((ret = __db_cursor(dbp, txn, &dbc, 0)) != 0)
		goto err;
	cp = (BTREE_CURSOR *)dbc->internal;

	pgno = PGNO_INVALID;
	do {
		if ((ret = __bam_csearch(dbc, &start, sflag, level)) != 0) {
			/* No more at this level, go up one. */
			if (ret == DB_NOTFOUND) {
				level++;
				if (start.data != NULL)
					__os_free(dbp->dbenv, start.data);
				memset(&start, 0, sizeof(start));
				sflag = CS_READ | CS_GETRECNO;
				continue;
			}
			goto err;
		}
		c_data->compact_pages_examine++;

		pg = cp->csp->page;
		pgno = PGNO(pg);

		sflag = CS_NEXT | CS_GETRECNO;
		/* Grab info about the page and drop the stack. */
		if (pgno != cp->root && (ret = __bam_savekey(dbc,
		     pgno <= c_data->compact_truncate, &start)) != 0)
			goto err;

		if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
			goto err;
		if (pgno == cp->root)
			break;

		if (pgno <= c_data->compact_truncate)
			continue;

		/* Reget the page with a write lock, and its parent too. */
		if ((ret = __bam_csearch(dbc,
		     &start, CS_PARENT | CS_GETRECNO, level)) != 0)
			goto err;
		pg = cp->csp->page;
		pgno = PGNO(pg);

		if (pgno > c_data->compact_truncate) {
			if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
				goto err;
		}
		if ((ret = __bam_stkrel(dbc,
		     pgno > c_data->compact_truncate ? 0 : STK_NOLOCK)) != 0)
			goto err;

		/* We are locking subtrees, so drop the write locks asap. */
		if (local_txn && pgno > c_data->compact_truncate)
			break;
	} while (pgno != cp->root);

	if ((ret = __dbc_close(dbc)) != 0)
		goto err;
	dbc = NULL;
	if (local_txn) {
		if ((ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0)
			goto err;
		txn = NULL;
	}
	if (pgno != ((BTREE *)dbp->bt_internal)->bt_root)
		goto new_txn;

err:	if (dbc != NULL && (t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0)
		ret = t_ret;
	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	if (local_txn &&
	     txn != NULL && (t_ret = __txn_abort(txn)) != 0 && ret == 0)
		ret = t_ret;
	if (start.data != NULL)
		__os_free(dbp->dbenv, start.data);
	return (ret);
}

static int
__bam_setup_freelist(dbp, list, nelems)
	DB *dbp;
	struct pglist *list;
	u_int32_t nelems;
{
	DB_MPOOLFILE *mpf;
	db_pgno_t *plist;
	int ret;

	mpf = dbp->mpf;

	if ((ret = __memp_alloc_freelist(mpf, nelems, &plist)) != 0)
		return (ret);

	while (nelems-- != 0)
		*plist++ = list++->pgno;

	return (0);
}

static int
__bam_free_freelist(dbp, txn)
	DB *dbp;
	DB_TXN *txn;
{
	DBC *dbc;
	DB_LOCK lock;
	int auto_commit, ret, t_ret;

	LOCK_INIT(lock);
	auto_commit = ret = 0;

	/*
	 * If we are not in a transaction then we need to get
	 * a lock on the meta page, otherwise we should already
	 * have the lock.
	 */
	dbc = NULL;
	if (IS_DB_AUTO_COMMIT(dbp, txn)) {
		/*
		 * We must not time out the lock or we will not free the
		 * list.  We ignore errors from txn_begin as there is
		 * little that the application can do with the error and
		 * we want to get the lock and free the list if at all
		 * possible.
		 */
		if (__txn_begin(dbp->dbenv, NULL, &txn, 0) == 0) {
			(void)__lock_set_timeout(dbp->dbenv,
			    txn->locker, 0, DB_SET_TXN_TIMEOUT);
			(void)__lock_set_timeout(dbp->dbenv,
			    txn->locker, 0, DB_SET_LOCK_TIMEOUT);
			auto_commit = 1;
		}

		/* Get a cursor so we can call __db_lget. */
		if ((ret = __db_cursor(dbp, txn, &dbc, 0)) != 0)
			return (ret);

		if ((ret = __db_lget(dbc,
		     0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &lock)) != 0)
			goto err;
	}

	ret = __memp_free_freelist(dbp->mpf);

err:	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;

	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	if (auto_commit &&
	    (t_ret = __txn_abort(txn)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
#endif