// ThreadConditionWin.cpp [plain text]
#include "config.h"
#include "Logging.h"
#include "Threading.h"
#include <limits>
#include <errno.h>
namespace WebCore {
// Largest value a long can hold; used as the maximum count when the queue
// semaphore is created. The call is parenthesised so a max() macro cannot
// interfere with it.
static const long MaxSemaphoreCount = (std::numeric_limits<long>::max)();
// Initialises the bookkeeping counters to zero and creates the three kernel
// objects this condition is built on:
//   m_gate  - semaphore created with initial count 1 and maximum count 1
//   m_queue - semaphore created with initial count 0 and maximum count
//             MaxSemaphoreCount
//   m_mutex - mutex created without initial ownership
//
// If any of the three cannot be created, the ones that were created are
// closed and every handle member is reset to null. Failure is not reported to
// the caller.
// NOTE(review): wait()/signal()/broadcast() do not check for null handles, so
// a condition whose construction failed is not usable - confirm callers can
// tolerate that.
ThreadCondition::ThreadCondition()
{
    m_condition.m_timedOut = 0;
    m_condition.m_blocked = 0;
    m_condition.m_waitingForRemoval = 0;

    m_condition.m_gate = ::CreateSemaphore(0, 1, 1, 0);
    m_condition.m_queue = ::CreateSemaphore(0, 0, MaxSemaphoreCount, 0);
    m_condition.m_mutex = ::CreateMutex(0, 0, 0);

    if (!m_condition.m_gate || !m_condition.m_queue || !m_condition.m_mutex) {
        // Partial failure: release whatever was created, and clear each
        // member so the stale handle value is never closed a second time
        // (the value may have been reused for an unrelated object by then).
        if (m_condition.m_gate) {
            ::CloseHandle(m_condition.m_gate);
            m_condition.m_gate = 0;
        }
        if (m_condition.m_queue) {
            ::CloseHandle(m_condition.m_queue);
            m_condition.m_queue = 0;
        }
        if (m_condition.m_mutex) {
            ::CloseHandle(m_condition.m_mutex);
            m_condition.m_mutex = 0;
        }
    }
}

// Closes the kernel objects owned by this condition. Null handles (left by a
// failed construction) are skipped rather than passed to CloseHandle.
ThreadCondition::~ThreadCondition()
{
    if (m_condition.m_gate)
        ::CloseHandle(m_condition.m_gate);
    if (m_condition.m_queue)
        ::CloseHandle(m_condition.m_queue);
    if (m_condition.m_mutex)
        ::CloseHandle(m_condition.m_mutex);
}
// Blocks the calling thread on this condition until it is released through
// m_queue, then re-enters `mutex` before returning.
//
// `mutex` is left with LeaveCriticalSection before blocking and re-entered
// with EnterCriticalSection on the way out, so the caller is expected to own
// it on entry.
//
// Bookkeeping fields used (all live in m_condition):
//   m_gate              - semaphore taken briefly to register as a waiter;
//                         signal()/broadcast() in this file take it and do not
//                         release it when they wake waiters, leaving the
//                         release to one of the code paths below.
//   m_queue             - semaphore the waiter actually sleeps on.
//   m_mutex             - protects the counters while they are updated.
//   m_blocked           - incremented here on entry.
//   m_waitingForRemoval - decremented here after waking when it is non-zero.
//   m_timedOut          - incremented here when a waiter wakes while
//                         m_waitingForRemoval is zero.
void ThreadCondition::wait(Mutex& mutex)
{
PlatformMutex& cs = mutex.impl();
// Register as a blocked waiter. The gate is held only for the increment.
DWORD res = ::WaitForSingleObject(m_condition.m_gate, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
++m_condition.m_blocked;
res = ::ReleaseSemaphore(m_condition.m_gate, 1, 0);
ASSERT(res);
// Give up the caller's mutex, then sleep until the queue semaphore is released.
::LeaveCriticalSection(&cs.m_internalMutex);
res = ::WaitForSingleObject(m_condition.m_queue, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
// Woken: update the counters under m_mutex. Snapshots of both counters are
// taken first because the decisions after the mutex is released use the values
// as they were on wake-up.
res = ::WaitForSingleObject(m_condition.m_mutex, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
size_t wasWaiting = m_condition.m_waitingForRemoval;
size_t wasTimedOut = m_condition.m_timedOut;
if (wasWaiting != 0) {
// This thread is one of the waiters that was counted for removal.
if (--m_condition.m_waitingForRemoval == 0) {
// Last counted waiter to leave.
if (m_condition.m_blocked != 0) {
// Other threads are still registered as blocked: release the gate here and
// clear wasWaiting so the release further down is skipped.
res = ::ReleaseSemaphore(m_condition.m_gate, 1, 0); ASSERT(res);
wasWaiting = 0;
}
else if (m_condition.m_timedOut != 0)
// Nobody is blocked any more, so the timed-out count is simply reset.
m_condition.m_timedOut = 0;
}
} else if (++m_condition.m_timedOut == ((std::numeric_limits<unsigned>::max)() / 2)) {
// Woke while no removal was pending: counted in m_timedOut. When that counter
// reaches half the unsigned range it is folded back into m_blocked under the
// gate and reset.
// NOTE(review): every wait in this function uses INFINITE, so it is unclear
// from this file how this branch is reached - confirm.
res = ::WaitForSingleObject(m_condition.m_gate, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
m_condition.m_blocked -= m_condition.m_timedOut;
res = ::ReleaseSemaphore(m_condition.m_gate, 1, 0);
ASSERT(res);
m_condition.m_timedOut = 0;
}
res = ::ReleaseMutex(m_condition.m_mutex);
ASSERT(res);
// wasWaiting is exactly 1 only when this thread was the last counted waiter
// and did not already release the gate above.
if (wasWaiting == 1) {
// Take one unit from the queue semaphore for each timed-out waiter recorded in
// the snapshot, then release the gate.
for ( ; wasTimedOut; --wasTimedOut) {
res = ::WaitForSingleObject(m_condition.m_queue, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
}
res = ::ReleaseSemaphore(m_condition.m_gate, 1, 0);
ASSERT(res);
}
// Re-enter the caller's mutex before returning.
::EnterCriticalSection (&cs.m_internalMutex);
}
void ThreadCondition::signal()
{
unsigned signals = 0;
DWORD res = ::WaitForSingleObject(m_condition.m_mutex, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
if (m_condition.m_waitingForRemoval != 0) { if (m_condition.m_blocked == 0) {
res = ::ReleaseMutex(m_condition.m_mutex);
ASSERT(res);
return;
}
++m_condition.m_waitingForRemoval;
--m_condition.m_blocked;
signals = 1;
} else {
res = ::WaitForSingleObject(m_condition.m_gate, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
if (m_condition.m_blocked > m_condition.m_timedOut) {
if (m_condition.m_timedOut != 0) {
m_condition.m_blocked -= m_condition.m_timedOut;
m_condition.m_timedOut = 0;
}
signals = m_condition.m_waitingForRemoval = 1;
--m_condition.m_blocked;
} else {
res = ::ReleaseSemaphore(m_condition.m_gate, 1, 0);
ASSERT(res);
}
}
res =::ReleaseMutex(m_condition.m_mutex);
ASSERT(res);
if (signals) {
res = ::ReleaseSemaphore(m_condition.m_queue, signals, 0);
ASSERT(res);
}
}
void ThreadCondition::broadcast()
{
unsigned signals = 0;
WORD res = ::WaitForSingleObject(m_condition.m_mutex, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
if (m_condition.m_waitingForRemoval != 0) { if (m_condition.m_blocked == 0) {
res = ::ReleaseMutex(m_condition.m_mutex);
ASSERT(res);
return;
}
m_condition.m_waitingForRemoval += (signals = m_condition.m_blocked);
m_condition.m_blocked = 0;
} else {
res = ::WaitForSingleObject(m_condition.m_gate, INFINITE);
ASSERT(res == WAIT_OBJECT_0);
if (m_condition.m_blocked > m_condition.m_timedOut) {
if (m_condition.m_timedOut != 0) {
m_condition.m_blocked -= m_condition.m_timedOut;
m_condition.m_timedOut = 0;
}
signals = m_condition.m_waitingForRemoval = m_condition.m_blocked;
m_condition.m_blocked = 0;
} else {
res = ::ReleaseSemaphore(m_condition.m_gate, 1, 0);
ASSERT(res);
}
}
res = ::ReleaseMutex(m_condition.m_mutex);
ASSERT(res);
if (signals) {
res = ::ReleaseSemaphore(m_condition.m_queue, signals, 0);
ASSERT(res);
}
}
}