ParkingLot.cpp   [plain text]


/*
 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 */

#include "config.h"
#include <wtf/ParkingLot.h>

#include <condition_variable>
#include <mutex>
#include <thread>
#include <wtf/DataLog.h>
#include <wtf/HashFunctions.h>
#include <wtf/StringPrintStream.h>
#include <wtf/ThreadSpecific.h>
#include <wtf/Threading.h>
#include <wtf/Vector.h>
#include <wtf/WeakRandom.h>
#include <wtf/WordLock.h>

namespace WTF {

namespace {

const bool verbose = false;

struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
    WTF_MAKE_FAST_ALLOCATED;
public:
    
    ThreadData();
    ~ThreadData();

    Ref<Thread> thread;
    
    Mutex parkingLock;
    ThreadCondition parkingCondition;

    const void* address { nullptr };
    
    ThreadData* nextInQueue { nullptr };
    
    intptr_t token { 0 };
};

enum class DequeueResult {
    Ignore,
    RemoveAndContinue,
    RemoveAndStop
};

struct Bucket {
    WTF_MAKE_FAST_ALLOCATED;
public:
    Bucket()
        : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock.
    {
    }
    
    void enqueue(ThreadData* data)
    {
        if (verbose)
            dataLog(toString(Thread::current(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
        ASSERT(data->address);
        ASSERT(!data->nextInQueue);
        
        if (queueTail) {
            queueTail->nextInQueue = data;
            queueTail = data;
            return;
        }

        queueHead = data;
        queueTail = data;
    }

    template<typename Functor>
    void genericDequeue(const Functor& functor)
    {
        if (verbose)
            dataLog(toString(Thread::current(), ": dequeueing from bucket at ", RawPointer(this), "\n"));
        
        if (!queueHead) {
            if (verbose)
                dataLog(toString(Thread::current(), ": empty.\n"));
            return;
        }

        // This loop is a very clever abomination. The induction variables are the pointer to the
        // pointer to the current node, and the pointer to the previous node. This gives us everything
        // we need to both proceed forward to the next node, and to remove nodes while maintaining the
        // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
        // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
        // we'd want queueTail to be set to nullptr. This works because:
        //
        //     currentPtr == &queueHead
        //     previous == nullptr
        //
        // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
        // that used to point to this node to instead point to this node's successor. Another example:
        // if we were at the second node in the queue, then we'd have:
        //
        //     currentPtr == &queueHead->nextInQueue
        //     previous == queueHead
        //
        // If this node is not equal to queueTail, then removing it simply means making
        // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
        // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
        // queueTail to previous, which in this case is queueHead - thus making the queue look like a
        // proper one-element queue with queueHead == queueTail.
        bool shouldContinue = true;
        ThreadData** currentPtr = &queueHead;
        ThreadData* previous = nullptr;

        MonotonicTime time = MonotonicTime::now();
        bool timeToBeFair = false;
        if (time > nextFairTime)
            timeToBeFair = true;
        
        bool didDequeue = false;
        
        while (shouldContinue) {
            ThreadData* current = *currentPtr;
            if (verbose)
                dataLog(toString(Thread::current(), ": got thread ", RawPointer(current), "\n"));
            if (!current)
                break;
            DequeueResult result = functor(current, timeToBeFair);
            switch (result) {
            case DequeueResult::Ignore:
                if (verbose)
                    dataLog(toString(Thread::current(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
                previous = current;
                currentPtr = &(*currentPtr)->nextInQueue;
                break;
            case DequeueResult::RemoveAndStop:
                shouldContinue = false;
                FALLTHROUGH;
            case DequeueResult::RemoveAndContinue:
                if (verbose)
                    dataLog(toString(Thread::current(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
                if (current == queueTail)
                    queueTail = previous;
                didDequeue = true;
                *currentPtr = current->nextInQueue;
                current->nextInQueue = nullptr;
                break;
            }
        }
        
        if (timeToBeFair && didDequeue)
            nextFairTime = time + Seconds::fromMilliseconds(random.get());

        ASSERT(!!queueHead == !!queueTail);
    }
    
    ThreadData* dequeue()
    {
        ThreadData* result = nullptr;
        genericDequeue(
            [&] (ThreadData* element, bool) -> DequeueResult {
                result = element;
                return DequeueResult::RemoveAndStop;
            });
        return result;
    }

    ThreadData* queueHead { nullptr };
    ThreadData* queueTail { nullptr };

    // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
    // this lock.
    WordLock lock;
    
    MonotonicTime nextFairTime;
    
    WeakRandom random;

    // Put some distane between buckets in memory. This is one of several mitigations against false
    // sharing.
    char padding[64];
};

struct Hashtable;

// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
Vector<Hashtable*>* hashtables;
WordLock hashtablesLock;

struct Hashtable {
    unsigned size;
    Atomic<Bucket*> data[1];

    static Hashtable* create(unsigned size)
    {
        ASSERT(size >= 1);
        
        Hashtable* result = static_cast<Hashtable*>(
            fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
        result->size = size;

        {
            // This is not fast and it's not data-access parallel, but that's fine, because
            // hashtable resizing is guaranteed to be rare and it will never happen in steady
            // state.
            WordLockHolder locker(hashtablesLock);
            if (!hashtables)
                hashtables = new Vector<Hashtable*>();
            hashtables->append(result);
        }
        
        return result;
    }

    static void destroy(Hashtable* hashtable)
    {
        {
            // This is not fast, but that's OK. See comment in create().
            WordLockHolder locker(hashtablesLock);
            hashtables->removeFirst(hashtable);
        }
        
        fastFree(hashtable);
    }
};

Atomic<Hashtable*> hashtable;
Atomic<unsigned> numThreads;

// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
// memory usage per thread will still be less than 1KB.
const unsigned maxLoadFactor = 3;

const unsigned growthFactor = 2;

unsigned hashAddress(const void* address)
{
    return WTF::PtrHash<const void*>::hash(address);
}

Hashtable* ensureHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = hashtable.load();

        if (currentHashtable)
            return currentHashtable;

        if (!currentHashtable) {
            currentHashtable = Hashtable::create(maxLoadFactor);
            if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
                if (verbose)
                    dataLog(toString(Thread::current(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
                return currentHashtable;
            }

            Hashtable::destroy(currentHashtable);
        }
    }
}

// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
Vector<Bucket*> lockHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = ensureHashtable();

        ASSERT(currentHashtable);

        // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
        // we can lock all of the buckets, not just the ones that are materialized.
        Vector<Bucket*> buckets;
        for (unsigned i = currentHashtable->size; i--;) {
            Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];

            for (;;) {
                Bucket* bucket = bucketPointer.load();

                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }

                buckets.append(bucket);
                break;
            }
        }

        // Now lock the buckets in the right order.
        std::sort(buckets.begin(), buckets.end());
        for (Bucket* bucket : buckets)
            bucket->lock.lock();

        // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
        // now.
        if (hashtable.load() == currentHashtable)
            return buckets;

        // The hashtable rehashed. Unlock everything and try again.
        for (Bucket* bucket : buckets)
            bucket->lock.unlock();
    }
}

void unlockHashtable(const Vector<Bucket*>& buckets)
{
    for (Bucket* bucket : buckets)
        bucket->lock.unlock();
}

// Rehash the hashtable to handle numThreads threads.
void ensureHashtableSize(unsigned numThreads)
{
    // We try to ensure that the size of the hashtable used for thread queues is always large enough
    // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
    // hashtable. This does just that. Note that we never free the old spine, since we never lock
    // around spine accesses (i.e. the "hashtable" global variable).

    // First do a fast check to see if rehashing is needed.
    Hashtable* oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(Thread::current(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        return;
    }

    // Seems like we *might* have to rehash, so lock the hashtable and try again.
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    // Check again, since the hashtable could have rehashed while we were locking it. Also,
    // lockHashtable() creates an initial hashtable for us.
    oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(Thread::current(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        unlockHashtable(bucketsToUnlock);
        return;
    }

    Vector<Bucket*> reusableBuckets = bucketsToUnlock;

    // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
    // are placed into the vector in queue order.
    Vector<ThreadData*> threadDatas;
    for (Bucket* bucket : reusableBuckets) {
        while (ThreadData* threadData = bucket->dequeue())
            threadDatas.append(threadData);
    }

    unsigned newSize = numThreads * growthFactor * maxLoadFactor;
    RELEASE_ASSERT(newSize > oldHashtable->size);
    
    Hashtable* newHashtable = Hashtable::create(newSize);
    if (verbose)
        dataLog(toString(Thread::current(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(Thread::current(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
        unsigned hash = hashAddress(threadData->address);
        unsigned index = hash % newHashtable->size;
        if (verbose)
            dataLog(toString(Thread::current(), ": index = ", index, "\n"));
        Bucket* bucket = newHashtable->data[index].load();
        if (!bucket) {
            if (reusableBuckets.isEmpty())
                bucket = new Bucket();
            else
                bucket = reusableBuckets.takeLast();
            newHashtable->data[index].store(bucket);
        }
        
        bucket->enqueue(threadData);
    }
    
    // At this point there may be some buckets left unreused. This could easily happen if the
    // number of enqueued threads right now is low but the high watermark of the number of threads
    // enqueued was high. We place these buckets into the hashtable basically at random, just to
    // make sure we don't leak them.
    for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
        Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
        if (bucketPtr.load())
            continue;
        bucketPtr.store(reusableBuckets.takeLast());
    }
    
    // Since we increased the size of the hashtable, we should have exhausted our preallocated
    // buckets by now.
    ASSERT(reusableBuckets.isEmpty());
    
    // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
    // roll. After we install the new hashtable, we can release all bucket locks.
    
    bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
    RELEASE_ASSERT(result);

    unlockHashtable(bucketsToUnlock);
}

ThreadData::ThreadData()
    : thread(Thread::current())
{
    unsigned currentNumThreads;
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        currentNumThreads = oldNumThreads + 1;
        if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
            break;
    }

    ensureHashtableSize(currentNumThreads);
}

ThreadData::~ThreadData()
{
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
            break;
    }
}

ThreadData* myThreadData()
{
    static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
    static std::once_flag initializeOnce;
    std::call_once(
        initializeOnce,
        [] {
            threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
        });
    
    RefPtr<ThreadData>& result = **threadData;
    
    if (!result)
        result = adoptRef(new ThreadData());
    
    return result.get();
}

template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket;
        for (;;) {
            bucket = bucketPointer.load();
            if (!bucket) {
                bucket = new Bucket();
                if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                    delete bucket;
                    continue;
                }
            }
            break;
        }
        if (verbose)
            dataLog(toString(Thread::current(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        ThreadData* threadData = functor();
        bool result;
        if (threadData) {
            if (verbose)
                dataLog(toString(Thread::current(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
            bucket->enqueue(threadData);
            result = true;
        } else
            result = false;
        bucket->lock.unlock();
        return result;
    }
}

enum class BucketMode {
    EnsureNonEmpty,
    IgnoreEmpty
};

template<typename DequeueFunctor, typename FinishFunctor>
bool dequeue(
    const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
    const FinishFunctor& finishFunctor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket = bucketPointer.load();
        if (!bucket) {
            if (bucketMode == BucketMode::IgnoreEmpty)
                return false;

            for (;;) {
                bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }
                break;
            }
        }

        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        bucket->genericDequeue(dequeueFunctor);
        bool result = !!bucket->queueHead;
        finishFunctor(result);
        bucket->lock.unlock();
        return result;
    }
}

} // anonymous namespace

NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
    const void* address,
    const ScopedLambda<bool()>& validation,
    const ScopedLambda<void()>& beforeSleep,
    const TimeWithDynamicClockType& timeout)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": parking.\n"));
    
    ThreadData* me = myThreadData();
    me->token = 0;

    // Guard against someone calling parkConditionally() recursively from beforeSleep().
    RELEASE_ASSERT(!me->address);

    bool enqueueResult = enqueue(
        address,
        [&] () -> ThreadData* {
            if (!validation())
                return nullptr;

            me->address = address;
            return me;
        });

    if (!enqueueResult)
        return ParkResult();

    beforeSleep();
    
    bool didGetDequeued;
    {
        MutexLocker locker(me->parkingLock);
        while (me->address && timeout.nowWithSameClock() < timeout) {
            me->parkingCondition.timedWait(
                me->parkingLock, timeout.approximateWallTime());
            
            // It's possible for the OS to decide not to wait. If it does that then it will also
            // decide not to release the lock. If there's a bug in the time math, then this could
            // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
            // spin.
            me->parkingLock.unlock();
            me->parkingLock.lock();
        }
        ASSERT(!me->address || me->address == address);
        didGetDequeued = !me->address;
    }
    
    if (didGetDequeued) {
        // Great! We actually got dequeued rather than the timeout expiring.
        ParkResult result;
        result.wasUnparked = true;
        result.token = me->token;
        return result;
    }

    // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.

    bool didDequeue = false;
    dequeue(
        address, BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (element == me) {
                didDequeue = true;
                return DequeueResult::RemoveAndStop;
            }
            return DequeueResult::Ignore;
        },
        [] (bool) { });
    
    // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
    // If didDequeue is false, then someone unparked us.
    
    RELEASE_ASSERT(!me->nextInQueue);

    // Make sure that no matter what, me->address is null after this point.
    {
        MutexLocker locker(me->parkingLock);
        if (!didDequeue) {
            // If we did not dequeue ourselves, then someone else did. They will set our address to
            // null. We don't want to proceed until they do this, because otherwise, they may set
            // our address to null in some distant future when we're already trying to wait for
            // other things.
            while (me->address)
                me->parkingCondition.wait(me->parkingLock);
        }
        me->address = nullptr;
    }

    ParkResult result;
    result.wasUnparked = !didDequeue;
    if (!didDequeue) {
        // If we were unparked then there should be a token.
        result.token = me->token;
    }
    return result;
}

NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking one.\n"));
    
    UnparkResult result;

    RefPtr<ThreadData> threadData;
    result.mayHaveMoreThreads = dequeue(
        address,
        // Why is this here?
        // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
        // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
        // some operation while holding the bucket lock, which usually goes into the finish func.
        // But if that operation is a no-op, then it's not clear why we need this.
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            result.didUnparkThread = true;
            return DequeueResult::RemoveAndStop;
        },
        [] (bool) { });

    if (!threadData) {
        ASSERT(!result.didUnparkThread);
        result.mayHaveMoreThreads = false;
        return result;
    }
    
    ASSERT(threadData->address);
    
    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
        threadData->token = 0;
    }
    threadData->parkingCondition.signal();

    return result;
}

NEVER_INLINE void ParkingLot::unparkOneImpl(
    const void* address,
    const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking one the hard way.\n"));
    
    RefPtr<ThreadData> threadData;
    bool timeToBeFair = false;
    dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool passedTimeToBeFair) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            timeToBeFair = passedTimeToBeFair;
            return DequeueResult::RemoveAndStop;
        },
        [&] (bool mayHaveMoreThreads) {
            UnparkResult result;
            result.didUnparkThread = !!threadData;
            result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
            if (timeToBeFair)
                RELEASE_ASSERT(threadData);
            result.timeToBeFair = timeToBeFair;
            intptr_t token = callback(result);
            if (threadData)
                threadData->token = token;
        });

    if (!threadData)
        return;

    ASSERT(threadData->address);
    
    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    // At this point, the threadData may die. Good thing we have a RefPtr<> on it.
    threadData->parkingCondition.signal();
}

NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
{
    if (!count)
        return 0;
    
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));
    
    Vector<RefPtr<ThreadData>, 8> threadDatas;
    dequeue(
        address,
        // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
        // but that seems wrong.
        BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (verbose)
                dataLog(toString(Thread::current(), ": Observing element with address = ", RawPointer(element->address), "\n"));
            if (element->address != address)
                return DequeueResult::Ignore;
            threadDatas.append(element);
            if (threadDatas.size() == count)
                return DequeueResult::RemoveAndStop;
            return DequeueResult::RemoveAndContinue;
        },
        [] (bool) { });

    for (RefPtr<ThreadData>& threadData : threadDatas) {
        if (verbose)
            dataLog(toString(Thread::current(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
        ASSERT(threadData->address);
        {
            MutexLocker locker(threadData->parkingLock);
            threadData->address = nullptr;
        }
        threadData->parkingCondition.signal();
    }

    if (verbose)
        dataLog(toString(Thread::current(), ": done unparking.\n"));
    
    return threadDatas.size();
}

NEVER_INLINE void ParkingLot::unparkAll(const void* address)
{
    unparkCount(address, UINT_MAX);
}

NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
{
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    Hashtable* currentHashtable = hashtable.load();
    for (unsigned i = currentHashtable->size; i--;) {
        Bucket* bucket = currentHashtable->data[i].load();
        if (!bucket)
            continue;
        for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
            callback(currentThreadData->thread.get(), currentThreadData->address);
    }
    
    unlockHashtable(bucketsToUnlock);
}

} // namespace WTF