#include "config.h"
#include "ParkingLot.h"
#include "CurrentTime.h"
#include "DataLog.h"
#include "HashFunctions.h"
#include "StringPrintStream.h"
#include "ThreadSpecific.h"
#include "ThreadingPrimitives.h"
#include "Vector.h"
#include "WeakRandom.h"
#include "WordLock.h"
#include <condition_variable>
#include <mutex>
#include <thread>
namespace WTF {
namespace {
// Set to true to get verbose dataLog() tracing of all parking-lot operations.
const bool verbose = false;
// Per-thread parking record. Each thread gets one lazily (see myThreadData())
// and keeps it in thread-local storage. While the thread is parked, the record
// sits on the queue of the hashtable bucket for the address being waited on;
// `address` is non-null exactly while the record is (or may be) enqueued.
struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
    WTF_MAKE_FAST_ALLOCATED;
public:
    ThreadData();
    ~ThreadData();

    // The thread this record belongs to.
    Ref<Thread> thread;

    // Protect the park/unpark handshake: an unparker clears `address` while
    // holding parkingLock, then signals parkingCondition to wake the parker.
    Mutex parkingLock;
    ThreadCondition parkingCondition;

    // Address the thread is parked on; null when not parked / already unparked.
    const void* address { nullptr };

    // Intrusive singly-linked queue link within a Bucket.
    ThreadData* nextInQueue { nullptr };

    // Value handed from unparkOneImpl()'s callback to the unparked thread.
    intptr_t token { 0 };
};
// Verdict returned by the functor passed to Bucket::genericDequeue() for each
// queued thread: keep it and continue scanning, remove it and continue, or
// remove it and stop scanning.
enum class DequeueResult {
    Ignore,
    RemoveAndContinue,
    RemoveAndStop
};
// A hashtable bucket: a WordLock plus an intrusive FIFO queue of parked
// threads. Buckets are created on demand and may be recycled into a new
// hashtable on resize, but are never freed once published, so bucket pointers
// stay valid indefinitely.
struct Bucket {
    WTF_MAKE_FAST_ALLOCATED;
public:
    // Seed the fairness RNG from the bucket's own address so that different
    // buckets make uncorrelated fairness decisions.
    Bucket()
        : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) {
    }

    // Appends `data` to the tail of the queue. Caller must hold `lock`;
    // `data->address` must already be set and `data` must not be queued.
    void enqueue(ThreadData* data)
    {
        if (verbose)
            dataLog(toString(currentThread(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
        ASSERT(data->address);
        ASSERT(!data->nextInQueue);

        if (queueTail) {
            queueTail->nextInQueue = data;
            queueTail = data;
            return;
        }

        queueHead = data;
        queueTail = data;
    }

    // Walks the queue head-to-tail, asking `functor(thread, timeToBeFair)`
    // what to do with each entry (see DequeueResult). Caller must hold `lock`.
    // `timeToBeFair` is true once the current time passes nextFairTime; if
    // anything is dequeued during a fair window, the next fair time is pushed
    // out by a random amount. NOTE(review): random.get()'s range/units set the
    // fairness interval — confirm against WeakRandom's documentation.
    template<typename Functor>
    void genericDequeue(const Functor& functor)
    {
        if (verbose)
            dataLog(toString(currentThread(), ": dequeueing from bucket at ", RawPointer(this), "\n"));

        if (!queueHead) {
            if (verbose)
                dataLog(toString(currentThread(), ": empty.\n"));
            return;
        }

        bool shouldContinue = true;
        ThreadData** currentPtr = &queueHead;
        ThreadData* previous = nullptr;

        double time = monotonicallyIncreasingTimeMS();
        bool timeToBeFair = false;
        if (time > nextFairTime)
            timeToBeFair = true;

        bool didDequeue = false;

        while (shouldContinue) {
            ThreadData* current = *currentPtr;
            if (verbose)
                dataLog(toString(currentThread(), ": got thread ", RawPointer(current), "\n"));
            if (!current)
                break;
            DequeueResult result = functor(current, timeToBeFair);
            switch (result) {
            case DequeueResult::Ignore:
                if (verbose)
                    dataLog(toString(currentThread(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
                // Keep this entry; advance to its link field.
                previous = current;
                currentPtr = &(*currentPtr)->nextInQueue;
                break;
            case DequeueResult::RemoveAndStop:
                shouldContinue = false;
                FALLTHROUGH;
            case DequeueResult::RemoveAndContinue:
                if (verbose)
                    dataLog(toString(currentThread(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
                // Unlink `current`. `previous` is the last *retained* entry, so
                // it becomes the tail when we remove the current tail.
                if (current == queueTail)
                    queueTail = previous;
                didDequeue = true;
                *currentPtr = current->nextInQueue;
                current->nextInQueue = nullptr;
                break;
            }
        }

        if (timeToBeFair && didDequeue)
            nextFairTime = time + random.get();

        // Head and tail must be both null or both non-null.
        ASSERT(!!queueHead == !!queueTail);
    }

    // Removes and returns the first queued thread, or null if the queue is
    // empty. Caller must hold `lock`.
    ThreadData* dequeue()
    {
        ThreadData* result = nullptr;
        genericDequeue(
            [&] (ThreadData* element, bool) -> DequeueResult {
                result = element;
                return DequeueResult::RemoveAndStop;
            });
        return result;
    }

    ThreadData* queueHead { nullptr };
    ThreadData* queueTail { nullptr };

    // Guards the queue and the fairness state. Also taken on every bucket at
    // once by lockHashtable() during resize/iteration.
    WordLock lock;

    double nextFairTime { 0 };
    WeakRandom random;

    // NOTE(review): presumably pads Bucket past a cache line to limit false
    // sharing between adjacent buckets — confirm the intended target size.
    char padding[64];
};
struct Hashtable;

// Registry of every Hashtable ever created, guarded by hashtablesLock. Old
// tables are retired into this list rather than freed when the table grows;
// only Hashtable::destroy() removes (and frees) an entry.
Vector<Hashtable*>* hashtables;
StaticWordLock hashtablesLock;
// A table of `size` atomic bucket-pointer slots allocated as a single blob
// with a flexible trailing array. Every table ever created is remembered in
// the global `hashtables` registry (under hashtablesLock); destroy() is the
// only path that unregisters and frees one.
struct Hashtable {
    unsigned size;
    Atomic<Bucket*> data[1]; // Really `size` entries; the blob is over-allocated.

    static Hashtable* create(unsigned size)
    {
        ASSERT(size >= 1);

        // sizeof(Hashtable) already includes one trailing slot, hence size - 1.
        size_t allocationSize = sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1);
        Hashtable* table = static_cast<Hashtable*>(fastZeroedMalloc(allocationSize));
        table->size = size;

        {
            WordLockHolder locker(hashtablesLock);
            if (!hashtables)
                hashtables = new Vector<Hashtable*>();
            hashtables->append(table);
        }

        return table;
    }

    static void destroy(Hashtable* hashtable)
    {
        {
            WordLockHolder locker(hashtablesLock);
            hashtables->removeFirst(hashtable);
        }
        fastFree(hashtable);
    }
};
// The currently published hashtable; swapped wholesale by ensureHashtableSize().
Atomic<Hashtable*> hashtable;
// Count of live ThreadData records, i.e. threads that have used the parking lot.
Atomic<unsigned> numThreads;

// Sizing knobs. Note the inverted sense of "load factor" here: the table is
// considered big enough when size / numThreads >= maxLoadFactor, i.e. we keep
// at least maxLoadFactor buckets per thread.
const unsigned maxLoadFactor = 3;
// A resized table gets growthFactor * maxLoadFactor slots per thread, leaving
// headroom before the next resize.
const unsigned growthFactor = 2;
// Hashes a parking address; the caller reduces this modulo the table size to
// pick a bucket slot.
unsigned hashAddress(const void* address)
{
    unsigned hashValue = WTF::PtrHash<const void*>::hash(address);
    return hashValue;
}
// Returns the live hashtable, lazily creating the initial (minimal) one on
// first use. Creation races are resolved with a CAS on the global `hashtable`
// pointer; losers destroy the table they built and retry.
Hashtable* ensureHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = hashtable.load();
        if (currentHashtable)
            return currentHashtable;

        // No table published yet: build the smallest useful table and try to
        // install it.
        currentHashtable = Hashtable::create(maxLoadFactor);
        if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
            if (verbose)
                dataLog(toString(currentThread(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
            return currentHashtable;
        }

        // Another thread won the race (or the weak CAS failed spuriously);
        // discard our table and loop to pick up the published one.
        Hashtable::destroy(currentHashtable);
    }
}
// Locks every bucket of the current hashtable and returns the locked buckets.
// Missing bucket slots are populated first (with CAS, discarding losers) so
// the returned vector covers every slot. Buckets are locked in sorted pointer
// order, giving a canonical acquisition order that cannot deadlock against
// other all-bucket lockers. If a concurrent resize replaced the table while we
// were locking, everything is unlocked and the whole procedure restarts.
Vector<Bucket*> lockHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = ensureHashtable();

        ASSERT(currentHashtable);

        // Collect (and if necessary create) the bucket for every slot.
        Vector<Bucket*> buckets;
        for (unsigned i = currentHashtable->size; i--;) {
            Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];
            for (;;) {
                Bucket* bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        // Another thread installed a bucket first; reload.
                        delete bucket;
                        continue;
                    }
                }
                buckets.append(bucket);
                break;
            }
        }

        // Canonical (sorted-by-address) lock order avoids deadlock.
        std::sort(buckets.begin(), buckets.end());
        for (Bucket* bucket : buckets)
            bucket->lock.lock();

        // Success only if no resize swapped the table while we were locking.
        if (hashtable.load() == currentHashtable)
            return buckets;

        for (Bucket* bucket : buckets)
            bucket->lock.unlock();
    }
}
void unlockHashtable(const Vector<Bucket*>& buckets)
{
for (Bucket* bucket : buckets)
bucket->lock.unlock();
}
// Grows the hashtable if `numThreads` threads need more buckets than the
// current table provides (we want at least maxLoadFactor buckets per thread).
// A resize takes every bucket lock, drains all parked threads, builds a larger
// table, and re-enqueues the threads, recycling the old buckets where
// possible. The old table itself is retired (it stays registered in the
// `hashtables` list), not freed, so stale readers keep a valid pointer.
void ensureHashtableSize(unsigned numThreads)
{
    // Cheap pre-check before paying for all the bucket locks.
    Hashtable* oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(currentThread(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        return;
    }

    // Hold every bucket lock: no thread can park/unpark until we unlock.
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    // Re-check under the locks; a concurrent resize may have beaten us.
    oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(currentThread(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        unlockHashtable(bucketsToUnlock);
        return;
    }

    // Drain every parked thread out of the old buckets so they can be
    // redistributed; the emptied buckets become recyclable.
    Vector<Bucket*> reusableBuckets = bucketsToUnlock;

    Vector<ThreadData*> threadDatas;
    for (Bucket* bucket : reusableBuckets) {
        while (ThreadData* threadData = bucket->dequeue())
            threadDatas.append(threadData);
    }

    // Size with growth headroom beyond the current target.
    unsigned newSize = numThreads * growthFactor * maxLoadFactor;
    RELEASE_ASSERT(newSize > oldHashtable->size);
    Hashtable* newHashtable = Hashtable::create(newSize);
    if (verbose)
        dataLog(toString(currentThread(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));

    // Re-home each drained thread, preferring to recycle old (still locked)
    // buckets for the new table's slots.
    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(currentThread(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
        unsigned hash = hashAddress(threadData->address);
        unsigned index = hash % newHashtable->size;
        if (verbose)
            dataLog(toString(currentThread(), ": index = ", index, "\n"));
        Bucket* bucket = newHashtable->data[index].load();
        if (!bucket) {
            if (reusableBuckets.isEmpty())
                bucket = new Bucket();
            else
                bucket = reusableBuckets.takeLast();
            newHashtable->data[index].store(bucket);
        }

        bucket->enqueue(threadData);
    }

    // Park any leftover recycled buckets in empty slots so none are orphaned.
    for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
        Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
        if (bucketPtr.load())
            continue;
        bucketPtr.store(reusableBuckets.takeLast());
    }

    ASSERT(reusableBuckets.isEmpty());

    // Publish the new table. Must succeed: resizes are serialized by holding
    // all bucket locks, so nobody else can have swapped the pointer.
    bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
    RELEASE_ASSERT(result);

    // Release the original buckets; the recycled ones are now live in the new
    // table and become usable as they unlock.
    unlockHashtable(bucketsToUnlock);
}
// Registers this thread with the parking lot: atomically bumps the global
// thread count, then makes sure the hashtable is big enough for that many
// threads.
ThreadData::ThreadData()
    : thread(Thread::current())
{
    unsigned newNumThreads;
    do {
        unsigned observedNumThreads = numThreads.load();
        newNumThreads = observedNumThreads + 1;
        if (numThreads.compareExchangeWeak(observedNumThreads, newNumThreads))
            break;
    } while (true);
    ensureHashtableSize(newNumThreads);
}
// Deregisters this thread: atomically decrements the global thread count.
ThreadData::~ThreadData()
{
    bool didDecrement = false;
    do {
        unsigned observedNumThreads = numThreads.load();
        didDecrement = numThreads.compareExchangeWeak(observedNumThreads, observedNumThreads - 1);
    } while (!didDecrement);
}
// Returns the calling thread's ThreadData, creating it on first use. The
// ThreadSpecific holder is allocated exactly once and never freed.
// NOTE(review): the deliberate leak presumably sidesteps static-destruction
// ordering problems at shutdown — confirm.
ThreadData* myThreadData()
{
    static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
    static std::once_flag initializeOnce;
    std::call_once(
        initializeOnce,
        [] {
            threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
        });

    RefPtr<ThreadData>& result = **threadData;
    if (!result)
        result = adoptRef(new ThreadData());
    return result.get();
}
// Finds (or lazily creates) the bucket for `address`, locks it, and calls
// `functor()` under the lock. If the functor returns a ThreadData, it is
// appended to the bucket's queue and true is returned; a null return enqueues
// nothing and returns false. If a resize replaced the hashtable between
// picking the bucket and locking it, the whole lookup is retried against the
// new table.
template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket;
        for (;;) {
            bucket = bucketPointer.load();
            if (!bucket) {
                // Install a fresh bucket; lose gracefully if another thread
                // installs one first.
                bucket = new Bucket();
                if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                    delete bucket;
                    continue;
                }
            }
            break;
        }
        if (verbose)
            dataLog(toString(currentThread(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
        bucket->lock.lock();
        if (hashtable.load() != myHashtable) {
            // A resize happened; this bucket may no longer belong to the live
            // table. Retry from the top.
            bucket->lock.unlock();
            continue;
        }

        ThreadData* threadData = functor();
        bool result;
        if (threadData) {
            if (verbose)
                dataLog(toString(currentThread(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
            bucket->enqueue(threadData);
            result = true;
        } else
            result = false;
        bucket->lock.unlock();
        return result;
    }
}
// How dequeue() treats an address whose bucket does not exist yet:
// - EnsureNonEmpty: create the bucket first, so the functors always run with a
//   bucket lock held.
// - IgnoreEmpty: no bucket means nothing to dequeue; return false immediately
//   without running either functor.
enum class BucketMode {
    EnsureNonEmpty,
    IgnoreEmpty
};
// Locks the bucket for `address`, runs `dequeueFunctor` over its queue (see
// Bucket::genericDequeue), then calls `finishFunctor(queueStillNonEmpty)`
// while the lock is still held, and returns whether the queue still has
// entries. With BucketMode::IgnoreEmpty a missing bucket short-circuits to
// false and neither functor runs; with EnsureNonEmpty the bucket is created
// first. The lookup restarts if a resize replaces the hashtable after the
// bucket was locked.
template<typename DequeueFunctor, typename FinishFunctor>
bool dequeue(
    const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
    const FinishFunctor& finishFunctor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket = bucketPointer.load();
        if (!bucket) {
            if (bucketMode == BucketMode::IgnoreEmpty)
                return false;

            // EnsureNonEmpty: install a fresh bucket, CAS-racing other threads.
            for (;;) {
                bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }
                break;
            }
        }

        bucket->lock.lock();
        if (hashtable.load() != myHashtable) {
            // Resized under us; this bucket may have moved. Start over.
            bucket->lock.unlock();
            continue;
        }

        bucket->genericDequeue(dequeueFunctor);
        bool result = !!bucket->queueHead;
        finishFunctor(result);
        bucket->lock.unlock();
        return result;
    }
}
}
// Parks the current thread on `address` if `validation()` holds while the
// bucket lock is held. Returns wasUnparked=false when validation fails or the
// wait times out without being unparked; otherwise wasUnparked=true and
// `token` carries the value supplied by the unparking thread. `beforeSleep`
// runs after this thread is queued but before it blocks.
NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
    const void* address,
    const ScopedLambda<bool()>& validation,
    const ScopedLambda<void()>& beforeSleep,
    const TimeWithDynamicClockType& timeout)
{
    if (verbose)
        dataLog(toString(currentThread(), ": parking.\n"));

    ThreadData* me = myThreadData();
    me->token = 0;

    // A thread can only be parked on one address at a time.
    RELEASE_ASSERT(!me->address);

    // Under the bucket lock: run the caller's validation; only if it passes do
    // we mark ourselves as parked on `address` and join the queue.
    bool enqueueResult = enqueue(
        address,
        [&] () -> ThreadData* {
            if (!validation())
                return nullptr;

            me->address = address;
            return me;
        });

    if (!enqueueResult)
        return ParkResult();

    beforeSleep();

    bool didGetDequeued;
    {
        MutexLocker locker(me->parkingLock);
        // An unparker clears me->address (under parkingLock) before signaling,
        // so a non-null address means we are still queued.
        while (me->address && timeout.nowWithSameClock() < timeout) {
            me->parkingCondition.timedWait(
                me->parkingLock, timeout.approximateWallTime().secondsSinceEpoch().value());

            // NOTE(review): this unlock/relock "flash" appears to defend
            // against timedWait returning without actually waiting, bounding
            // such cases to spinning instead of a stuck lock — confirm against
            // the platform condition-variable behavior.
            me->parkingLock.unlock();
            me->parkingLock.lock();
        }
        ASSERT(!me->address || me->address == address);
        didGetDequeued = !me->address;
    }
    if (didGetDequeued) {
        // Someone unparked us before the timeout; report their token.
        ParkResult result;
        result.wasUnparked = true;
        result.token = me->token;
        return result;
    }

    // We timed out: try to remove ourselves from the queue.
    bool didDequeue = false;
    dequeue(
        address, BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (element == me) {
                didDequeue = true;
                return DequeueResult::RemoveAndStop;
            }
            return DequeueResult::Ignore;
        },
        [] (bool) { });

    // Either we removed ourselves or an unparker did; we cannot still be linked.
    RELEASE_ASSERT(!me->nextInQueue);

    {
        MutexLocker locker(me->parkingLock);
        if (!didDequeue) {
            // An unparker dequeued us concurrently; wait for it to finish the
            // handshake (it clears me->address, then signals).
            while (me->address)
                me->parkingCondition.wait(me->parkingLock);
        }
        me->address = nullptr;
    }

    ParkResult result;
    // If an unparker beat our self-removal, we count as unparked despite the
    // timeout, and take its token.
    result.wasUnparked = !didDequeue;
    if (!didDequeue) {
        result.token = me->token;
    }
    return result;
}
// Unparks at most one thread parked on `address`. Reports whether a thread was
// unparked and whether more threads may remain queued for that address.
NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking one.\n"));

    UnparkResult result;

    // RefPtr keeps the dequeued record alive after the bucket lock is dropped.
    RefPtr<ThreadData> threadData;
    result.mayHaveMoreThreads = dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool) {
            // Skip entries parked on other addresses that share this bucket.
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            result.didUnparkThread = true;
            return DequeueResult::RemoveAndStop;
        },
        [] (bool) { });

    if (!threadData) {
        ASSERT(!result.didUnparkThread);
        result.mayHaveMoreThreads = false;
        return result;
    }

    ASSERT(threadData->address);

    {
        // Clear the parked address under parkingLock so the parked thread
        // observes the handoff, then wake it.
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
        threadData->token = 0;
    }
    threadData->parkingCondition.signal();

    return result;
}
// Unparks at most one thread parked on `address`. `callback` is invoked with
// the UnparkResult while the bucket lock is still held — so the state it
// reports cannot change underneath it — and its return value is stored as the
// unparked thread's token. The thread itself is only signaled after the
// bucket lock is released.
NEVER_INLINE void ParkingLot::unparkOneImpl(
    const void* address,
    const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking one the hard way.\n"));

    // RefPtr keeps the dequeued record alive after the bucket lock is dropped.
    RefPtr<ThreadData> threadData;
    bool timeToBeFair = false;
    dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool passedTimeToBeFair) {
            // Skip entries parked on other addresses that share this bucket.
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            timeToBeFair = passedTimeToBeFair;
            return DequeueResult::RemoveAndStop;
        },
        [&] (bool mayHaveMoreThreads) {
            // Still under the bucket lock here.
            UnparkResult result;
            result.didUnparkThread = !!threadData;
            result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
            if (timeToBeFair)
                RELEASE_ASSERT(threadData);
            result.timeToBeFair = timeToBeFair;
            intptr_t token = callback(result);
            if (threadData)
                threadData->token = token;
        });

    if (!threadData)
        return;

    ASSERT(threadData->address);

    {
        // Clear the parked address under parkingLock so the parked thread
        // observes the handoff, then wake it.
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    threadData->parkingCondition.signal();
}
// Unparks up to `count` threads parked on `address`, in queue (FIFO) order,
// and returns how many were actually unparked.
NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
{
    if (!count)
        return 0;

    if (verbose)
        dataLog(toString(currentThread(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));

    // RefPtrs keep the dequeued records alive after the bucket lock is dropped.
    Vector<RefPtr<ThreadData>, 8> threadDatas;
    dequeue(
        address,
        BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (verbose)
                dataLog(toString(currentThread(), ": Observing element with address = ", RawPointer(element->address), "\n"));
            // Skip entries parked on other addresses that share this bucket.
            if (element->address != address)
                return DequeueResult::Ignore;
            threadDatas.append(element);
            if (threadDatas.size() == count)
                return DequeueResult::RemoveAndStop;
            return DequeueResult::RemoveAndContinue;
        },
        [] (bool) { });

    // Complete the handoff for each collected thread: clear its address under
    // its parkingLock, then signal it awake.
    for (RefPtr<ThreadData>& threadData : threadDatas) {
        if (verbose)
            dataLog(toString(currentThread(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
        ASSERT(threadData->address);
        {
            MutexLocker locker(threadData->parkingLock);
            threadData->address = nullptr;
        }
        threadData->parkingCondition.signal();
    }

    if (verbose)
        dataLog(toString(currentThread(), ": done unparking.\n"));

    return threadDatas.size();
}
// Unparks every thread currently parked on `address` (UINT_MAX effectively
// means "no limit" for unparkCount).
NEVER_INLINE void ParkingLot::unparkAll(const void* address)
{
    unparkCount(address, UINT_MAX);
}
// Invokes `callback(thread, address)` for every currently parked thread.
// Every bucket lock is held for the duration, so no thread can park or unpark
// while the iteration runs.
NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
{
    Vector<Bucket*> lockedBuckets = lockHashtable();

    Hashtable* table = hashtable.load();
    for (unsigned slot = table->size; slot--;) {
        Bucket* bucket = table->data[slot].load();
        if (!bucket)
            continue;
        ThreadData* entry = bucket->queueHead;
        while (entry) {
            callback(entry->thread.get(), entry->address);
            entry = entry->nextInQueue;
        }
    }

    unlockHashtable(lockedBuckets);
}
}