#include "internal.h"
#if DISPATCH_EVENT_BACKEND_WINDOWS
static HANDLE hPort = NULL;
enum _dispatch_windows_port {
DISPATCH_PORT_POKE = 0,
DISPATCH_PORT_TIMER_CLOCK_WALL,
DISPATCH_PORT_TIMER_CLOCK_UPTIME,
DISPATCH_PORT_TIMER_CLOCK_MONOTONIC,
DISPATCH_PORT_FILE_HANDLE,
DISPATCH_PORT_PIPE_HANDLE_READ,
DISPATCH_PORT_PIPE_HANDLE_WRITE,
DISPATCH_PORT_SOCKET_READ,
DISPATCH_PORT_SOCKET_WRITE,
};
enum _dispatch_muxnote_events {
DISPATCH_MUXNOTE_EVENT_READ = 1 << 0,
DISPATCH_MUXNOTE_EVENT_WRITE = 1 << 1,
};
#pragma mark dispatch_unote_t
typedef struct dispatch_muxnote_s {
LIST_ENTRY(dispatch_muxnote_s) dmn_list;
LIST_HEAD(, dispatch_unote_linkage_s) dmn_readers_head;
LIST_HEAD(, dispatch_unote_linkage_s) dmn_writers_head;
os_atomic(uintptr_t) dmn_refcount;
dispatch_unote_ident_t dmn_ident;
int8_t dmn_filter;
enum _dispatch_muxnote_handle_type {
DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID,
DISPATCH_MUXNOTE_HANDLE_TYPE_FILE,
DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE,
DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET,
} dmn_handle_type;
enum _dispatch_muxnote_events dmn_events;
HANDLE dmn_event;
HANDLE dmn_thread;
os_atomic(bool) dmn_stop;
long dmn_network_events;
PTP_WAIT dmn_threadpool_wait;
} *dispatch_muxnote_t;
static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s)
_dispatch_sources[DSL_HASH_SIZE];
DISPATCH_ALWAYS_INLINE
static inline struct dispatch_muxnote_bucket_s *
_dispatch_unote_muxnote_bucket(uint32_t ident)
{
return &_dispatch_sources[DSL_HASH(ident)];
}
DISPATCH_ALWAYS_INLINE
static inline dispatch_muxnote_t
_dispatch_unote_muxnote_find(struct dispatch_muxnote_bucket_s *dmb,
dispatch_unote_ident_t ident, int8_t filter)
{
dispatch_muxnote_t dmn;
if (filter == EVFILT_WRITE) filter = EVFILT_READ;
LIST_FOREACH(dmn, dmb, dmn_list) {
if (dmn->dmn_ident == ident && dmn->dmn_filter == filter) {
break;
}
}
return dmn;
}
static dispatch_muxnote_t
_dispatch_muxnote_create(dispatch_unote_t du,
enum _dispatch_muxnote_events events)
{
dispatch_muxnote_t dmn;
int8_t filter = du._du->du_filter;
HANDLE handle = (HANDLE)du._du->du_ident;
dmn = _dispatch_calloc(1, sizeof(*dmn));
if (dmn == NULL) {
DISPATCH_INTERNAL_CRASH(0, "_dispatch_calloc");
}
os_atomic_store(&dmn->dmn_refcount, 1, relaxed);
dmn->dmn_ident = (dispatch_unote_ident_t)handle;
dmn->dmn_filter = filter;
dmn->dmn_events = events;
LIST_INIT(&dmn->dmn_readers_head);
LIST_INIT(&dmn->dmn_writers_head);
switch (filter) {
case EVFILT_SIGNAL:
WIN_PORT_ERROR();
free(dmn);
return NULL;
case EVFILT_WRITE:
case EVFILT_READ:
switch (GetFileType(handle)) {
case FILE_TYPE_UNKNOWN:
(void)dispatch_assume(GetLastError() == NO_ERROR);
DISPATCH_INTERNAL_CRASH(0, "unknown handle type");
case FILE_TYPE_REMOTE:
DISPATCH_INTERNAL_CRASH(0, "unused handle type");
case FILE_TYPE_CHAR:
WIN_PORT_ERROR();
free(dmn);
return NULL;
case FILE_TYPE_DISK:
dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_FILE;
break;
case FILE_TYPE_PIPE:
dmn->dmn_handle_type = _dispatch_handle_is_socket(handle)
? DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET
: DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE;
break;
}
break;
default:
DISPATCH_INTERNAL_CRASH(0, "unexpected filter");
}
return dmn;
}
static void
_dispatch_muxnote_stop(dispatch_muxnote_t dmn)
{
if (dmn->dmn_thread) {
os_atomic_store(&dmn->dmn_stop, true, relaxed);
SetEvent(dmn->dmn_event);
do {
CancelIoEx((HANDLE)dmn->dmn_ident, NULL);
} while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT);
CloseHandle(dmn->dmn_thread);
dmn->dmn_thread = NULL;
}
if (dmn->dmn_threadpool_wait) {
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
FALSE);
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
dmn->dmn_threadpool_wait = NULL;
}
if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) {
WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
}
}
static void
_dispatch_muxnote_dispose(dispatch_muxnote_t dmn)
{
if (dmn->dmn_thread || dmn->dmn_threadpool_wait) {
DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread");
}
if (dmn->dmn_event) {
CloseHandle(dmn->dmn_event);
}
free(dmn);
}
static void
_dispatch_muxnote_retain(dispatch_muxnote_t dmn)
{
uintptr_t refcount = os_atomic_inc(&dmn->dmn_refcount, relaxed);
if (refcount == 0) {
DISPATCH_INTERNAL_CRASH(0, "muxnote refcount overflow");
}
if (refcount == 1) {
DISPATCH_INTERNAL_CRASH(0, "retained a disposing muxnote");
}
}
static void
_dispatch_muxnote_release(dispatch_muxnote_t dmn)
{
uintptr_t refcount = os_atomic_dec(&dmn->dmn_refcount, relaxed);
if (refcount == 0) {
_dispatch_muxnote_dispose(dmn);
} else if (refcount == UINTPTR_MAX) {
DISPATCH_INTERNAL_CRASH(0, "muxnote refcount underflow");
}
}
static unsigned WINAPI
_dispatch_pipe_monitor_thread(void *context)
{
dispatch_muxnote_t dmn = (dispatch_muxnote_t)context;
HANDLE hPipe = (HANDLE)dmn->dmn_ident;
do {
char cBuffer[1];
DWORD dwNumberOfBytesTransferred;
OVERLAPPED ov = {0};
BOOL bSuccess = ReadFile(hPipe, cBuffer, 0,
&dwNumberOfBytesTransferred, &ov);
DWORD dwBytesAvailable;
DWORD dwError = GetLastError();
if (!bSuccess && dwError == ERROR_IO_PENDING) {
bSuccess = GetOverlappedResult(hPipe, &ov,
&dwNumberOfBytesTransferred, TRUE);
dwError = GetLastError();
}
if (bSuccess) {
bSuccess = PeekNamedPipe(hPipe, NULL, 0, NULL, &dwBytesAvailable,
NULL);
dwError = GetLastError();
}
if (bSuccess) {
if (dwBytesAvailable == 0) {
continue;
}
} else if (dwError == ERROR_NO_DATA) {
Sleep(0);
continue;
} else {
_dispatch_debug("pipe[0x%llx]: GetLastError() returned %lu",
(long long)hPipe, dwError);
if (dwError == ERROR_OPERATION_ABORTED) {
continue;
}
os_atomic_store(&dmn->dmn_stop, true, relaxed);
dwBytesAvailable = 0;
}
_dispatch_muxnote_retain(dmn);
bSuccess = PostQueuedCompletionStatus(hPort,
dwBytesAvailable, (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_READ,
(LPOVERLAPPED)dmn);
if (!bSuccess) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
WaitForSingleObject(dmn->dmn_event, INFINITE);
} while (!os_atomic_load(&dmn->dmn_stop, relaxed));
_dispatch_debug("pipe[0x%llx]: monitor exiting", (long long)hPipe);
return 0;
}
static DWORD
_dispatch_pipe_write_availability(HANDLE hPipe)
{
IO_STATUS_BLOCK iosb;
FILE_PIPE_LOCAL_INFORMATION fpli;
NTSTATUS status = _dispatch_NtQueryInformationFile(hPipe, &iosb, &fpli,
sizeof(fpli), FilePipeLocalInformation);
if (!NT_SUCCESS(status)) {
return 1;
}
return fpli.WriteQuotaAvailable;
}
static VOID CALLBACK
_dispatch_socket_callback(PTP_CALLBACK_INSTANCE inst, void *context,
PTP_WAIT pwa, TP_WAIT_RESULT res)
{
dispatch_muxnote_t dmn = (dispatch_muxnote_t)context;
SOCKET sock = (SOCKET)dmn->dmn_ident;
WSANETWORKEVENTS events;
if (WSAEnumNetworkEvents(sock, (WSAEVENT)dmn->dmn_event, &events) == 0) {
long lNetworkEvents = events.lNetworkEvents;
DWORD dwBytesAvailable = 1;
if (lNetworkEvents & FD_CLOSE) {
dwBytesAvailable = 0;
lNetworkEvents |= FD_READ | FD_WRITE;
} else if (lNetworkEvents & FD_READ) {
ioctlsocket(sock, FIONREAD, &dwBytesAvailable);
}
if (lNetworkEvents & FD_READ) {
_dispatch_muxnote_retain(dmn);
if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable,
(ULONG_PTR)DISPATCH_PORT_SOCKET_READ, (LPOVERLAPPED)dmn)) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
if (lNetworkEvents & FD_WRITE) {
_dispatch_muxnote_retain(dmn);
if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable,
(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE, (LPOVERLAPPED)dmn)) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
} else {
_dispatch_debug("socket[0x%llx]: WSAEnumNetworkEvents() failed (%d)",
(long long)sock, WSAGetLastError());
}
SetThreadpoolWait(pwa, dmn->dmn_event, NULL);
}
static BOOL
_dispatch_io_trigger(dispatch_muxnote_t dmn)
{
BOOL bSuccess;
long lNetworkEvents;
switch (dmn->dmn_handle_type) {
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
DISPATCH_INTERNAL_CRASH(0, "invalid handle");
case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
_dispatch_muxnote_retain(dmn);
bSuccess = PostQueuedCompletionStatus(hPort, 0,
(ULONG_PTR)DISPATCH_PORT_FILE_HANDLE, (LPOVERLAPPED)dmn);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
break;
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) &&
!dmn->dmn_thread) {
dmn->dmn_thread = (HANDLE)_beginthreadex( NULL,
1, _dispatch_pipe_monitor_thread,
(void *)dmn, 0, NULL);
if (!dmn->dmn_thread) {
DISPATCH_INTERNAL_CRASH(errno, "_beginthread");
}
dmn->dmn_event = CreateEventW(NULL, FALSE,
FALSE, NULL);
if (!dmn->dmn_event) {
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
}
}
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
_dispatch_muxnote_retain(dmn);
DWORD available =
_dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident);
bSuccess = PostQueuedCompletionStatus(hPort, available,
(ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE,
(LPOVERLAPPED)dmn);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
break;
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
if (!dmn->dmn_event) {
dmn->dmn_event = CreateEventW(NULL, FALSE,
FALSE, NULL);
if (!dmn->dmn_event) {
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
}
}
if (!dmn->dmn_threadpool_wait) {
dmn->dmn_threadpool_wait = CreateThreadpoolWait(
_dispatch_socket_callback, dmn,
NULL);
if (!dmn->dmn_threadpool_wait) {
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateThreadpoolWait");
}
SetThreadpoolWait(dmn->dmn_threadpool_wait, dmn->dmn_event,
NULL);
}
lNetworkEvents = FD_CLOSE;
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
lNetworkEvents |= FD_READ;
}
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
lNetworkEvents |= FD_WRITE;
}
if (dmn->dmn_network_events != lNetworkEvents) {
if (WSAEventSelect((SOCKET)dmn->dmn_ident, (WSAEVENT)dmn->dmn_event,
lNetworkEvents) != 0) {
DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
}
dmn->dmn_network_events = lNetworkEvents;
}
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
if (send((SOCKET)dmn->dmn_ident, "", 0, 0) == 0) {
_dispatch_muxnote_retain(dmn);
bSuccess = PostQueuedCompletionStatus(hPort, 1,
(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE,
(LPOVERLAPPED)dmn);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
}
break;
}
return TRUE;
}
DISPATCH_ALWAYS_INLINE
static inline enum _dispatch_muxnote_events
_dispatch_unote_required_events(dispatch_unote_t du)
{
switch (du._du->du_filter) {
case DISPATCH_EVFILT_CUSTOM_ADD:
case DISPATCH_EVFILT_CUSTOM_OR:
case DISPATCH_EVFILT_CUSTOM_REPLACE:
return 0;
case EVFILT_WRITE:
return DISPATCH_MUXNOTE_EVENT_WRITE;
default:
return DISPATCH_MUXNOTE_EVENT_READ;
}
}
bool
_dispatch_unote_register_muxed(dispatch_unote_t du)
{
struct dispatch_muxnote_bucket_s *dmb;
dispatch_muxnote_t dmn;
enum _dispatch_muxnote_events events;
events = _dispatch_unote_required_events(du);
dmb = _dispatch_unote_muxnote_bucket(du._du->du_ident);
dmn = _dispatch_unote_muxnote_find(dmb, du._du->du_ident,
du._du->du_filter);
if (dmn) {
WIN_PORT_ERROR();
DISPATCH_INTERNAL_CRASH(0, "muxnote updating is not supported");
} else {
dmn = _dispatch_muxnote_create(du, events);
if (!dmn) {
return false;
}
if (_dispatch_io_trigger(dmn) == FALSE) {
_dispatch_muxnote_release(dmn);
return false;
}
LIST_INSERT_HEAD(dmb, dmn, dmn_list);
}
dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
switch (dmn->dmn_handle_type) {
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
DISPATCH_INTERNAL_CRASH(0, "invalid handle");
case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
if (events & DISPATCH_MUXNOTE_EVENT_READ) {
LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link);
} else if (events & DISPATCH_MUXNOTE_EVENT_WRITE) {
LIST_INSERT_HEAD(&dmn->dmn_writers_head, dul, du_link);
}
break;
}
dul->du_muxnote = dmn;
_dispatch_unote_state_set(du, DISPATCH_WLH_ANON, DU_STATE_ARMED);
return true;
}
void
_dispatch_unote_resume_muxed(dispatch_unote_t du)
{
dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
dispatch_muxnote_t dmn = dul->du_muxnote;
dispatch_assert(_dispatch_unote_registered(du));
_dispatch_io_trigger(dmn);
}
bool
_dispatch_unote_unregister_muxed(dispatch_unote_t du)
{
dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
dispatch_muxnote_t dmn = dul->du_muxnote;
switch (dmn->dmn_handle_type) {
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
DISPATCH_INTERNAL_CRASH(0, "invalid handle");
case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
LIST_REMOVE(dul, du_link);
_LIST_TRASH_ENTRY(dul, du_link);
break;
}
dul->du_muxnote = NULL;
LIST_REMOVE(dmn, dmn_list);
_dispatch_muxnote_stop(dmn);
_dispatch_muxnote_release(dmn);
_dispatch_unote_state_set(du, DU_STATE_UNREGISTERED);
return true;
}
static void
_dispatch_event_merge_file_handle(dispatch_muxnote_t dmn)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_retain_unote_owner(du);
dispatch_assert(dux_needs_rearm(du._du));
_dispatch_unote_state_clear_bit(du, DU_STATE_ARMED);
os_atomic_store2o(du._dr, ds_pending_data, ~1, relaxed);
dux_merge_evt(du._du, EV_ADD | EV_ENABLE | EV_DISPATCH, 1, 0);
}
LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_retain_unote_owner(du);
dispatch_assert(dux_needs_rearm(du._du));
_dispatch_unote_state_clear_bit(du, DU_STATE_ARMED);
os_atomic_store2o(du._dr, ds_pending_data, ~1, relaxed);
dux_merge_evt(du._du, EV_ADD | EV_ENABLE | EV_DISPATCH, 1, 0);
}
_dispatch_muxnote_release(dmn);
}
static void
_dispatch_event_merge_pipe_handle_read(dispatch_muxnote_t dmn,
DWORD dwBytesAvailable)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_retain_unote_owner(du);
dispatch_unote_state_t du_state = _dispatch_unote_state(du);
du_state &= ~DU_STATE_ARMED;
uintptr_t data = dwBytesAvailable;
uint32_t flags;
if (dwBytesAvailable > 0) {
flags = EV_ADD | EV_ENABLE | EV_DISPATCH;
} else {
du_state |= DU_STATE_NEEDS_DELETE;
flags = EV_DELETE | EV_DISPATCH;
}
_dispatch_unote_state_set(du, du_state);
os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
dux_merge_evt(du._du, flags, data, 0);
}
SetEvent(dmn->dmn_event);
_dispatch_muxnote_release(dmn);
}
static void
_dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn,
DWORD dwBytesAvailable)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_retain_unote_owner(du);
_dispatch_unote_state_clear_bit(du, DU_STATE_ARMED);
uintptr_t data = dwBytesAvailable;
if (dwBytesAvailable > 0) {
os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
} else {
os_atomic_store2o(du._dr, ds_pending_data, 0, relaxed);
}
dux_merge_evt(du._du, EV_ADD | EV_ENABLE | EV_DISPATCH, data, 0);
}
_dispatch_muxnote_release(dmn);
}
static void
_dispatch_event_merge_socket(dispatch_unote_t du, DWORD dwBytesAvailable)
{
_dispatch_retain_unote_owner(du);
dispatch_unote_state_t du_state = _dispatch_unote_state(du);
du_state &= ~DU_STATE_ARMED;
uintptr_t data = dwBytesAvailable;
uint32_t flags;
if (dwBytesAvailable > 0) {
flags = EV_ADD | EV_ENABLE | EV_DISPATCH;
} else {
du_state |= DU_STATE_NEEDS_DELETE;
flags = EV_DELETE | EV_DISPATCH;
}
_dispatch_unote_state_set(du, du_state);
os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
dux_merge_evt(du._du, flags, data, 0);
}
static void
_dispatch_event_merge_socket_read(dispatch_muxnote_t dmn,
DWORD dwBytesAvailable)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_event_merge_socket(du, dwBytesAvailable);
}
_dispatch_muxnote_release(dmn);
}
static void
_dispatch_event_merge_socket_write(dispatch_muxnote_t dmn,
DWORD dwBytesAvailable)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_event_merge_socket(du, dwBytesAvailable);
}
_dispatch_muxnote_release(dmn);
}
#pragma mark timers
typedef struct _dispatch_windows_timeout_s {
PTP_TIMER pTimer;
enum _dispatch_windows_port ullIdent;
bool bArmed;
} *dispatch_windows_timeout_t;
#define DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(clock) \
[DISPATCH_CLOCK_##clock] = { \
.pTimer = NULL, \
.ullIdent = DISPATCH_PORT_TIMER_CLOCK_##clock, \
.bArmed = FALSE, \
}
static struct _dispatch_windows_timeout_s _dispatch_windows_timeout[] = {
DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(WALL),
DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(UPTIME),
DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(MONOTONIC),
};
static void
_dispatch_event_merge_timer(dispatch_clock_t clock)
{
uint32_t tidx = DISPATCH_TIMER_INDEX(clock, 0);
_dispatch_windows_timeout[clock].bArmed = FALSE;
_dispatch_timers_heap_dirty(_dispatch_timers_heap, tidx);
_dispatch_timers_heap[tidx].dth_needs_program = true;
_dispatch_timers_heap[tidx].dth_armed = false;
}
static void CALLBACK
_dispatch_timer_callback(PTP_CALLBACK_INSTANCE Instance, PVOID Context,
PTP_TIMER Timer)
{
BOOL bSuccess;
bSuccess = PostQueuedCompletionStatus(hPort, 0, (ULONG_PTR)Context,
NULL);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
void
_dispatch_event_loop_timer_arm(dispatch_timer_heap_t dth DISPATCH_UNUSED,
uint32_t tidx, dispatch_timer_delay_s range,
dispatch_clock_now_cache_t nows)
{
dispatch_windows_timeout_t timer;
FILETIME ftDueTime;
LARGE_INTEGER liTime;
switch (DISPATCH_TIMER_CLOCK(tidx)) {
case DISPATCH_CLOCK_WALL:
timer = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
liTime.QuadPart = range.delay +
_dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
break;
case DISPATCH_CLOCK_UPTIME:
case DISPATCH_CLOCK_MONOTONIC:
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
liTime.QuadPart = -((range.delay + 99) / 100);
break;
}
if (timer->pTimer == NULL) {
timer->pTimer = CreateThreadpoolTimer(_dispatch_timer_callback,
(LPVOID)timer->ullIdent, NULL);
if (timer->pTimer == NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"CreateThreadpoolTimer");
}
}
ftDueTime.dwHighDateTime = liTime.HighPart;
ftDueTime.dwLowDateTime = liTime.LowPart;
SetThreadpoolTimer(timer->pTimer, &ftDueTime, 0,
0);
timer->bArmed = TRUE;
}
void
_dispatch_event_loop_timer_delete(dispatch_timer_heap_t dth DISPATCH_UNUSED,
uint32_t tidx)
{
dispatch_windows_timeout_t timer;
switch (DISPATCH_TIMER_CLOCK(tidx)) {
case DISPATCH_CLOCK_WALL:
timer = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
break;
case DISPATCH_CLOCK_UPTIME:
case DISPATCH_CLOCK_MONOTONIC:
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
break;
}
SetThreadpoolTimer(timer->pTimer, NULL, 0,
0);
timer->bArmed = FALSE;
}
#pragma mark dispatch_loop
static void
_dispatch_windows_port_init(void *context DISPATCH_UNUSED)
{
hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
if (hPort == NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"CreateIoCompletionPort");
}
#if DISPATCH_USE_MGR_THREAD
_dispatch_trace_item_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
dx_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
#endif
}
void
_dispatch_event_loop_poke(dispatch_wlh_t wlh DISPATCH_UNUSED,
uint64_t dq_state DISPATCH_UNUSED, uint32_t flags DISPATCH_UNUSED)
{
static dispatch_once_t _dispatch_windows_port_init_pred;
BOOL bSuccess;
dispatch_once_f(&_dispatch_windows_port_init_pred, NULL,
_dispatch_windows_port_init);
bSuccess = PostQueuedCompletionStatus(hPort, 0, DISPATCH_PORT_POKE,
NULL);
(void)dispatch_assume(bSuccess);
}
DISPATCH_NOINLINE
void
_dispatch_event_loop_drain(uint32_t flags)
{
DWORD dwNumberOfBytesTransferred;
ULONG_PTR ulCompletionKey;
LPOVERLAPPED pOV;
BOOL bSuccess;
pOV = (LPOVERLAPPED)&pOV;
bSuccess = GetQueuedCompletionStatus(hPort, &dwNumberOfBytesTransferred,
&ulCompletionKey, &pOV,
(flags & KEVENT_FLAG_IMMEDIATE) ? 0 : INFINITE);
while (bSuccess) {
switch (ulCompletionKey) {
case DISPATCH_PORT_POKE:
break;
case DISPATCH_PORT_TIMER_CLOCK_WALL:
_dispatch_event_merge_timer(DISPATCH_CLOCK_WALL);
break;
case DISPATCH_PORT_TIMER_CLOCK_UPTIME:
_dispatch_event_merge_timer(DISPATCH_CLOCK_UPTIME);
break;
case DISPATCH_PORT_TIMER_CLOCK_MONOTONIC:
_dispatch_event_merge_timer(DISPATCH_CLOCK_MONOTONIC);
break;
case DISPATCH_PORT_FILE_HANDLE:
_dispatch_event_merge_file_handle((dispatch_muxnote_t)pOV);
break;
case DISPATCH_PORT_PIPE_HANDLE_READ:
_dispatch_event_merge_pipe_handle_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
case DISPATCH_PORT_PIPE_HANDLE_WRITE:
_dispatch_event_merge_pipe_handle_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
case DISPATCH_PORT_SOCKET_READ:
_dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
case DISPATCH_PORT_SOCKET_WRITE:
_dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
default:
DISPATCH_INTERNAL_CRASH(ulCompletionKey,
"unsupported completion key");
}
bSuccess = GetQueuedCompletionStatus(hPort,
&dwNumberOfBytesTransferred, &ulCompletionKey, &pOV, 0);
}
if (bSuccess == FALSE && pOV != NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"GetQueuedCompletionStatus");
}
}
void
_dispatch_event_loop_cancel_waiter(dispatch_sync_context_t dsc DISPATCH_UNUSED)
{
WIN_PORT_ERROR();
}
void
_dispatch_event_loop_wake_owner(dispatch_sync_context_t dsc,
dispatch_wlh_t wlh, uint64_t old_state, uint64_t new_state)
{
(void)dsc; (void)wlh; (void)old_state; (void)new_state;
}
void
_dispatch_event_loop_wait_for_ownership(dispatch_sync_context_t dsc)
{
if (dsc->dsc_release_storage) {
_dispatch_queue_release_storage(dsc->dc_data);
}
}
void
_dispatch_event_loop_end_ownership(dispatch_wlh_t wlh, uint64_t old_state,
uint64_t new_state, uint32_t flags)
{
(void)wlh; (void)old_state; (void)new_state; (void)flags;
}
#if DISPATCH_WLH_DEBUG
void
_dispatch_event_loop_assert_not_owned(dispatch_wlh_t wlh)
{
(void)wlh;
}
#endif
void
_dispatch_event_loop_leave_immediate(uint64_t dq_state)
{
(void)dq_state;
}
#endif // DISPATCH_EVENT_BACKEND_WINDOWS