source.c   [plain text]


/*
 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
static void _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval);

#define DISPATCH_TIMERS_UNREGISTER 0x1
#define DISPATCH_TIMERS_RETAIN_2 0x2
static void _dispatch_timers_update(dispatch_unote_t du, uint32_t flags);
static void _dispatch_timers_unregister(dispatch_timer_source_refs_t dt);

static void _dispatch_source_timer_configure(dispatch_source_t ds);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_t ds, dispatch_unote_t du);

#pragma mark -
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
		unsigned long mask, dispatch_queue_t dq)
{
	dispatch_source_refs_t dr;
	dispatch_source_t ds;

	dr = dux_create(dst, handle, mask)._dr;
	if (unlikely(!dr)) {
		return DISPATCH_BAD_INPUT;
	}

	ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
			DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
	ds->dq_label = "source";
	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->ds_refs = dr;
	dr->du_owner_wref = _dispatch_ptr2wref(ds);

	if (slowpath(!dq)) {
		dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
	} else {
		_dispatch_retain((dispatch_queue_t _Nonnull)dq);
	}
	ds->do_targetq = dq;
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
		_dispatch_source_set_interval(ds, handle);
	}
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}

void
_dispatch_source_dispose(dispatch_source_t ds, bool *allow_free)
{
	_dispatch_object_debug(ds, "%s", __func__);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
	_dispatch_unote_dispose(ds->ds_refs);
	ds->ds_refs = NULL;
	_dispatch_queue_destroy(ds->_as_dq, allow_free);
}

void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if (unlikely(!(dqf & (DQF_LEGACY|DSF_CANCELED)))) {
		DISPATCH_CLIENT_CRASH(ds, "Release of a source that has not been "
				"cancelled, but has a mandatory cancel handler");
	}
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
}

unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (ds->dq_atomic_flags & DSF_CANCELED) {
		return 0;
	}
#if DISPATCH_USE_MEMORYSTATUS
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	return dr->du_fflags;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return 0;
	}
#endif
	return dr->du_ident;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
#if DISPATCH_USE_MEMORYSTATUS
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_vmpressure_override) {
		return NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	if (dr->du_memorypressure_override) {
		return NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
	uint64_t value = os_atomic_load2o(ds, ds_data, relaxed);
	return (unsigned long)(
		ds->ds_refs->du_data_action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET
		? DISPATCH_SOURCE_GET_DATA(value) : value);
}

size_t
dispatch_source_get_extended_data(dispatch_source_t ds,
		dispatch_source_extended_data_t edata, size_t size)
{
	size_t target_size = MIN(size,
		sizeof(struct dispatch_source_extended_data_s));
	if (size > 0) {
		unsigned long data, status = 0;
		if (ds->ds_refs->du_data_action
				== DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
			uint64_t combined = os_atomic_load(&ds->ds_data, relaxed);
			data = DISPATCH_SOURCE_GET_DATA(combined);
			status = DISPATCH_SOURCE_GET_STATUS(combined);
		} else {
			data = dispatch_source_get_data(ds);
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, data)
				+ sizeof(edata->data)) {
			edata->data = data;
		}
		if (size >= offsetof(struct dispatch_source_extended_data_s, status)
				+ sizeof(edata->status)) {
			edata->status = status;
		}
		if (size > sizeof(struct dispatch_source_extended_data_s)) {
			memset(
				(char *)edata + sizeof(struct dispatch_source_extended_data_s),
				0, size - sizeof(struct dispatch_source_extended_data_s));
		}
	}
	return target_size;
}

DISPATCH_NOINLINE
void
_dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
		unsigned long val)
{
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	int filter = ds->ds_refs->du_filter;

	if (unlikely(dqf & (DSF_CANCELED | DSF_DELETED))) {
		return;
	}

	switch (filter) {
	case DISPATCH_EVFILT_CUSTOM_ADD:
		os_atomic_add2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_OR:
		os_atomic_or2o(ds, ds_pending_data, val, relaxed);
		break;
	case DISPATCH_EVFILT_CUSTOM_REPLACE:
		os_atomic_store2o(ds, ds_pending_data, val, relaxed);
		break;
	default:
		DISPATCH_CLIENT_CRASH(filter, "Invalid source type");
	}

	dx_wakeup(ds, _dispatch_qos_from_pp(pp), DISPATCH_WAKEUP_MAKE_DIRTY);
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_source_merge_data(ds, 0, val);
}

#pragma mark -
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
{
	return os_atomic_load(&dr->ds_handler[kind], relaxed);
}
#define _dispatch_source_get_event_handler(dr) \
	_dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
#define _dispatch_source_get_cancel_handler(dr) \
	_dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
#define _dispatch_source_get_registration_handler(dr) \
	_dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
		bool block)
{
	// sources don't propagate priority by default
	const dispatch_block_flags_t flags =
			DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (func) {
		uintptr_t dc_flags = 0;

		if (kind != DS_EVENT_HANDLER) {
			dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
		}
		if (block) {
#ifdef __BLOCKS__
			_dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
#endif /* __BLOCKS__ */
		} else {
			dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
			_dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
					0, flags, dc_flags);
		}
		_dispatch_trace_continuation_push(ds->_as_dq, dc);
	} else {
		dc->dc_flags = 0;
		dc->dc_func = NULL;
	}
	return dc;
}

DISPATCH_NOINLINE
static void
_dispatch_source_handler_dispose(dispatch_continuation_t dc)
{
#ifdef __BLOCKS__
	if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
		Block_release(dc->dc_ctxt);
	}
#endif /* __BLOCKS__ */
	if (dc->dc_voucher) {
		_voucher_release(dc->dc_voucher);
		dc->dc_voucher = VOUCHER_INVALID;
	}
	_dispatch_continuation_free(dc);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_take(dispatch_source_t ds, long kind)
{
	return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_free(dispatch_source_t ds, long kind)
{
	dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
	if (dc) _dispatch_source_handler_dispose(dc);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
	if (dc) _dispatch_source_handler_dispose(dc);
}

DISPATCH_NOINLINE
static void
_dispatch_source_set_handler_slow(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);

	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = NULL;
	_dispatch_source_handler_replace(ds, kind, dc);
}

DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, long kind,
		dispatch_continuation_t dc)
{
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
		_dispatch_source_handler_replace(ds, kind, dc);
		return dx_vtable(ds)->do_resume(ds, false);
	}
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
				"after it has been activated");
	}
	_dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
	if (kind == DS_REGISTN_HANDLER) {
		_dispatch_bug_deprecated("Setting registration handler after "
				"the source has been activated");
	}
	dc->dc_data = (void *)kind;
	_dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
			_dispatch_source_set_handler_slow, 0);
}

#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}

#ifdef __BLOCKS__
DISPATCH_NOINLINE
static void
_dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler(ds, handler);
}

void
dispatch_source_set_mandatory_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler(ds, handler);
}
#endif /* __BLOCKS__ */

DISPATCH_NOINLINE
static void
_dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
}

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	if (unlikely(!_dispatch_queue_is_legacy(ds->_as_dq))) {
		DISPATCH_CLIENT_CRASH(0, "Cannot set a non mandatory handler on "
				"this source");
	}
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}

void
dispatch_source_set_mandatory_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DQF_LEGACY);
	return _dispatch_source_set_cancel_handler_f(ds, handler);
}

#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
}

#pragma mark -
#pragma mark dispatch_source_invoke

static void
_dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
	if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}

static void
_dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc;

	dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_dispose(dc);
	}
	if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
}

static void
_dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
		dispatch_invoke_flags_t flags)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
	uint64_t prev;

	if (dr->du_is_timer && !(dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		prev = _dispatch_source_timer_data(ds, dr);
	} else {
		prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	}
	if (dr->du_data_action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev != 0) || !dc) {
		return;
	}
	_dispatch_continuation_pop(dc, NULL, flags, cq);
	if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_AFTER)) {
		_dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
		dispatch_release(ds); // dispatch_after sources are one-shot
	}
}

DISPATCH_NOINLINE
static void
_dispatch_source_refs_finalize_unregistration(dispatch_source_t ds)
{
	dispatch_queue_flags_t dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
			DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
	if (dqf & DSF_CANCEL_WAITER) {
		_dispatch_wake_by_address(&ds->dq_atomic_flags);
	}
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	_dispatch_release_tailcall(ds); // the retain is done at creation time
}

void
_dispatch_source_refs_unregister(dispatch_source_t ds, uint32_t options)
{
	_dispatch_object_debug(ds, "%s", __func__);
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	dispatch_source_refs_t dr = ds->ds_refs;

	if (dr->du_is_timer) {
		// Because of the optimization to unregister fired oneshot timers
		// from the target queue, we can't trust _dispatch_unote_registered()
		// to tell the truth, it may not have happened yet
		if (dqf & DSF_ARMED) {
			_dispatch_timers_unregister(ds->ds_timer_refs);
			_dispatch_release_2(ds);
		}
		dr->du_ident = DISPATCH_TIMER_IDENT_CANCELED;
	} else {
		if (_dispatch_unote_needs_rearm(dr) && !(dqf & DSF_ARMED)) {
			options |= DU_UNREGISTER_IMMEDIATE_DELETE;
		}
		if (!_dispatch_unote_unregister(dr, options)) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dr);
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
			return; // deferred unregistration
		}
	}

	ds->ds_is_installed = true;
	_dispatch_source_refs_finalize_unregistration(ds);
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
			// the test is inside the loop because it's convenient but the
			// result should not change for the duration of the rmw_loop
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_refs_resume(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	if (dr->du_is_timer) {
		_dispatch_timers_update(dr, 0);
		return true;
	}
	if (unlikely(!_dispatch_source_tryarm(ds))) {
		return false;
	}
	_dispatch_unote_resume(dr);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dr);
	return true;
}

void
_dispatch_source_refs_register(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t kbp;

	dispatch_assert(!ds->ds_is_installed);

	if (dr->du_is_timer) {
		dispatch_queue_t dq = ds->_as_dq;
		kbp = _dispatch_queue_compute_priority_and_wlh(dq, NULL);
		// aggressively coalesce background/maintenance QoS timers
		// <rdar://problem/12200216&27342536>
		if (_dispatch_qos_is_background(_dispatch_priority_qos(kbp))) {
			if (dr->du_fflags & DISPATCH_TIMER_STRICT) {
				_dispatch_ktrace1(DISPATCH_PERF_strict_bg_timer, ds);
			} else {
				dr->du_fflags |= DISPATCH_TIMER_BACKGROUND;
				dr->du_ident = _dispatch_source_timer_idx(dr);
			}
		}
		_dispatch_timers_update(dr, 0);
		return;
	}

	if (unlikely(!_dispatch_source_tryarm(ds) ||
			!_dispatch_unote_register(dr, wlh, pri))) {
		// Do the parts of dispatch_source_refs_unregister() that
		// are required after this partial initialization.
		_dispatch_source_refs_finalize_unregistration(ds);
	} else {
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, dr);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}

static void
_dispatch_source_set_event_handler_context(void *ctxt)
{
	dispatch_source_t ds = ctxt;
	dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);

	if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
		dc->dc_ctxt = ds->do_ctxt;
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_install(dispatch_source_t ds, dispatch_wlh_t wlh,
		dispatch_priority_t pri)
{
	_dispatch_source_refs_register(ds, wlh, pri);
	ds->ds_is_installed = true;
}

void
_dispatch_source_finalize_activation(dispatch_source_t ds, bool *allow_resume)
{
	dispatch_continuation_t dc;
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_priority_t pri;
	dispatch_wlh_t wlh;

	if (unlikely(dr->du_is_direct &&
			(_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
		return _dispatch_source_refs_unregister(ds, 0);
	}

	dc = _dispatch_source_get_event_handler(dr);
	if (dc) {
		if (_dispatch_object_is_barrier(dc)) {
			_dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
		}
		ds->dq_priority = _dispatch_priority_from_pp_strip_flags(dc->dc_priority);
		if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
			_dispatch_barrier_async_detached_f(ds->_as_dq, ds,
					_dispatch_source_set_event_handler_context);
		}
	}

	// call "super"
	_dispatch_queue_finalize_activation(ds->_as_dq, allow_resume);

	if (dr->du_is_direct && !ds->ds_is_installed) {
		dispatch_queue_t dq = ds->_as_dq;
		pri = _dispatch_queue_compute_priority_and_wlh(dq, &wlh);
		if (pri) _dispatch_source_install(ds, wlh, pri);
	}
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	dispatch_source_t ds = dou._ds;
	dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_flags_t dqf;

	if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN) &&
			_dispatch_unote_wlh_changed(dr, _dispatch_get_wlh())) {
		dqf = _dispatch_queue_atomic_flags_set_orig(ds->_as_dq,
				DSF_WLH_CHANGED);
		if (!(dqf & DSF_WLH_CHANGED)) {
			_dispatch_bug_deprecated("Changing target queue "
					"hierarchy after source was activated");
		}
	}

	if (_dispatch_queue_class_probe(ds)) {
		// Intentionally always drain even when on the manager queue
		// and not the source's regular target queue: we need to be able
		// to drain timer setting and the like there.
		dispatch_with_disabled_narrowing(dic, {
			retq = _dispatch_queue_serial_drain(ds->_as_dq, dic, flags, owned);
		});
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in wakeup should be consistent.

	dispatch_queue_t dkq = &_dispatch_mgr_q;
	bool prevent_starvation = false;

	if (dr->du_is_direct) {
		dkq = ds->do_targetq;
	}

	if (dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
			// timer has to be configured on the kevent queue
			if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_timer_configure(ds);
		}
	}

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_install(ds, _dispatch_get_wlh(),
				_dispatch_get_basepri());
	}

	if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return ds->do_targetq;
	}

	if (_dispatch_source_get_registration_handler(dr)) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds, dq, flags);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	if ((dqf & DSF_DEFERRED_DELETE) && !(dqf & DSF_ARMED)) {
unregister_event:
		// DSF_DELETE: Pending source kevent unregistration has been completed
		// !DSF_ARMED: event was delivered and can safely be unregistered
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_IMMEDIATE_DELETE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the kevent
		// queue after event delivery.
		if (dq == ds->do_targetq) {
			_dispatch_source_latch_and_call(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);

			// starvation avoidance: if the source triggers itself then force a
			// re-queue to give other things already queued on the target queue
			// a chance to run.
			//
			// however, if the source is directly targeting an overcommit root
			// queue, this would requeue the source and ask for a new overcommit
			// thread right away.
			prevent_starvation = dq->do_targetq ||
					!(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
			if (prevent_starvation &&
					os_atomic_load2o(ds, ds_pending_data, relaxed)) {
				retq = ds->do_targetq;
			}
		} else {
			// there is no point trying to be eager, the next thing to do is
			// to deliver the event
			return ds->do_targetq;
		}
	}

	if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
			} else if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
				if (!(dqf & DSF_ARMED)) {
					goto unregister_event;
				}
				// we need to wait for the EV_DELETE
				return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
			}
		}
		if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr))) {
			retq = ds->do_targetq;
		} else {
			_dispatch_source_cancel_callout(ds, dq, flags);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		}
		prevent_starvation = false;
	}

	if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
			// no need for resume when we can directly unregister the kevent
			goto unregister_event;
		}
		if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
			// do not try to rearm the kevent if the source is suspended
			// from the source handler
			return ds->do_targetq;
		}
		if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
			// keep the old behavior to force re-enqueue to our target queue
			// for the rearm.
			//
			// if the handler didn't run, or this is a pending delete
			// or our target queue is a global queue, then starvation is
			// not a concern and we can rearm right away.
			return ds->do_targetq;
		}
		if (unlikely(!_dispatch_source_refs_resume(ds))) {
			goto unregister_event;
		}
		if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
			// try to redrive the drain from under the lock for sources
			// targeting an overcommit root queue to avoid parking
			// when the next event has already fired
			_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
		}
	}

	return retq;
}

DISPATCH_NOINLINE
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds, dic, flags,
			DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
}

void
_dispatch_source_wakeup(dispatch_source_t ds, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in wakeup and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);

	if (dr->du_is_direct) {
		dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && dr->du_is_timer &&
			os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
		// timer has to be configured on the kevent queue
		tq = dkq;
	} else if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		tq = dkq;
	} else if (_dispatch_source_get_registration_handler(dr)) {
		// The registration handler needs to be delivered to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if (deferred_delete && !(dqf & DSF_ARMED)) {
		// Pending source kevent unregistration has been completed
		// or EV_ONESHOT event can be acknowledged
		tq = dkq;
	} else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
			os_atomic_load2o(ds, ds_pending_data, relaxed)) {
		// The source has pending data to deliver to the target queue.
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	} else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (!(dqf & DSF_DELETED)) {
			if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
				// timers can cheat if not armed because there's nothing left
				// to do on the manager queue and unregistration can happen
				// on the regular target queue
				tq = DISPATCH_QUEUE_WAKEUP_TARGET;
			} else {
				tq = dkq;
			}
		} else if (_dispatch_source_get_event_handler(dr) ||
				_dispatch_source_get_cancel_handler(dr) ||
				_dispatch_source_get_registration_handler(dr)) {
			tq = DISPATCH_QUEUE_WAKEUP_TARGET;
		}
	} else if (_dispatch_unote_needs_rearm(dr) &&
			!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
		// The source needs to be rearmed on the kevent queue.
		tq = dkq;
	}
	if (!tq && _dispatch_queue_class_probe(ds)) {
		tq = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	if ((tq == DISPATCH_QUEUE_WAKEUP_TARGET) &&
			ds->do_targetq == &_dispatch_mgr_q) {
		tq = DISPATCH_QUEUE_WAKEUP_MGR;
	}

	return _dispatch_queue_class_wakeup(ds->_as_dq, qos, flags, tq);
}

void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancellation,
	// unregister the source, and deallocate it. We would
	// need to therefore retain/release before setting the bit
	_dispatch_retain_2(ds);

	dispatch_queue_t q = ds->_as_dq;
	if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
		_dispatch_release_2_tailcall(ds);
	} else {
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}

void
dispatch_source_cancel_and_wait(dispatch_source_t ds)
{
	dispatch_queue_flags_t old_dqf, dqf, new_dqf;
	dispatch_source_refs_t dr = ds->ds_refs;

	if (unlikely(_dispatch_source_get_cancel_handler(dr))) {
		DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
	}

	_dispatch_object_debug(ds, "%s", __func__);
	os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
		new_dqf = old_dqf | DSF_CANCELED;
		if (old_dqf & DSF_CANCEL_WAITER) {
			os_atomic_rmw_loop_give_up(break);
		}
		if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
			// just add DSF_CANCELED
		} else if ((old_dqf & DSF_DEFERRED_DELETE) || !dr->du_is_direct) {
			new_dqf |= DSF_CANCEL_WAITER;
		}
	});
	dqf = new_dqf;

	if (old_dqf & DQF_RELEASED) {
		DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
	}
	if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
		return;
	}
	if (dqf & DSF_CANCEL_WAITER) {
		goto wakeup;
	}

	// simplified version of _dispatch_queue_drain_try_lock
	// that also sets the DIRTY bit on failure to lock
	uint64_t set_owner_and_set_full_width = _dispatch_lock_value_for_self() |
			DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
	uint64_t old_state, new_state;

	os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
		new_state = old_state;
		if (likely(_dq_state_is_runnable(old_state) &&
				!_dq_state_drain_locked(old_state))) {
			new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
			new_state |= set_owner_and_set_full_width;
		} else if (old_dqf & DSF_CANCELED) {
			os_atomic_rmw_loop_give_up(break);
		} else {
			// this case needs a release barrier, hence the seq_cst above
			new_state |= DISPATCH_QUEUE_DIRTY;
		}
	});

	if (unlikely(_dq_state_is_suspended(old_state))) {
		if (unlikely(_dq_state_suspend_cnt(old_state))) {
			DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
		}
		// inactive sources have never been registered and there is no need
		// to wait here because activation will notice and mark the source
		// as deleted without ever trying to use the fd or mach port.
		return dispatch_activate(ds);
	}

	if (likely(_dq_state_is_runnable(old_state) &&
			!_dq_state_drain_locked(old_state))) {
		// same thing _dispatch_source_invoke2() does when handling cancellation
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
		if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
			_dispatch_source_refs_unregister(ds, 0);
			dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
			if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
				_dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
			}
		}
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
	} else if (unlikely(_dq_state_drain_locked_by_self(old_state))) {
		DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
				"called from a source handler");
	} else {
		dispatch_qos_t qos;
wakeup:
		qos = _dispatch_qos_from_pp(_dispatch_get_priority());
		dx_wakeup(ds, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
		dispatch_activate(ds);
	}

	dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
		if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
			if (!os_atomic_cmpxchgv2o(ds, dq_atomic_flags,
					dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
				continue;
			}
			dqf |= DSF_CANCEL_WAITER;
		}
		_dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}
}

void
_dispatch_source_merge_evt(dispatch_unote_t du, uint32_t flags, uintptr_t data,
		uintptr_t status, pthread_priority_t pp)
{
	dispatch_source_refs_t dr = du._dr;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	dispatch_wakeup_flags_t wflags = 0;
	dispatch_queue_flags_t dqf;

	if (_dispatch_unote_needs_rearm(dr) || (flags & (EV_DELETE | EV_ONESHOT))) {
		// once we modify the queue atomic flags below, it will allow concurrent
		// threads running _dispatch_source_invoke2 to dispose of the source,
		// so we can't safely borrow the reference we get from the muxnote udata
		// anymore, and need our own
		wflags = DISPATCH_WAKEUP_CONSUME_2;
		_dispatch_retain_2(ds); // rdar://20382435
	}

	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			!(flags & EV_DELETE)) {
		dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
				DSF_DEFERRED_DELETE, DSF_ARMED);
		if (flags & EV_VANISHED) {
			_dispatch_bug_kevent_client("kevent", dr->du_type->dst_kind,
					"monitored resource vanished before the source "
					"cancel handler was invoked", 0);
		}
		_dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
				(flags & EV_VANISHED) ? "vanished" :
				"deferred delete oneshot", dr);
	} else if (flags & (EV_DELETE | EV_ONESHOT)) {
		_dispatch_source_refs_unregister(ds, DU_UNREGISTER_ALREADY_DELETED);
		_dispatch_debug("kevent-source[%p]: deleted kevent[%p]", ds, dr);
		if (flags & EV_DELETE) goto done;
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	} else if (_dispatch_unote_needs_rearm(dr)) {
		dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dr);
	} else {
		dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
	}

	if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
		goto done; // rdar://20204025
	}

	dispatch_unote_action_t action = dr->du_data_action;
	if ((flags & EV_UDATA_SPECIFIC) && (flags & EV_ONESHOT) &&
			(flags & EV_VANISHED)) {
		// if the resource behind the ident vanished, the event handler can't
		// do anything useful anymore, so do not try to call it at all
		//
		// Note: if the kernel doesn't support EV_VANISHED we always get it
		// back unchanged from the flags passed at EV_ADD (registration) time
		// Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
		// if we get both bits it was a real EV_VANISHED delivery
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
#if HAVE_MACH
	} else if (dr->du_filter == EVFILT_MACHPORT) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
#endif
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_SET) {
		os_atomic_store2o(ds, ds_pending_data, data, relaxed);
	} else if (action == DISPATCH_UNOTE_ACTION_DATA_ADD) {
		os_atomic_add2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR) {
		os_atomic_or2o(ds, ds_pending_data, data, relaxed);
	} else if (data && action == DISPATCH_UNOTE_ACTION_DATA_OR_STATUS_SET) {
		// We combine the data and status into a single 64-bit value.
		uint64_t odata, ndata;
		uint64_t value = DISPATCH_SOURCE_COMBINE_DATA_AND_STATUS(data, status);
		os_atomic_rmw_loop2o(ds, ds_pending_data, odata, ndata, relaxed, {
            ndata = DISPATCH_SOURCE_GET_DATA(odata) | value;
		});
	} else if (data) {
		DISPATCH_INTERNAL_CRASH(action, "Unexpected source action value");
	}
	_dispatch_debug("kevent-source[%p]: merged kevent[%p]", ds, dr);

done:
	_dispatch_object_debug(ds, "%s", __func__);
	dx_wakeup(ds, _dispatch_qos_from_pp(pp), wflags | DISPATCH_WAKEUP_MAKE_DIRTY);
}

#pragma mark -
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_timer_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mgr_trace_timers_wakes(void)
{
	uint32_t qos;

	if (_dispatch_timers_will_wake) {
		if (slowpath(DISPATCH_TIMER_WAKE_ENABLED())) {
			for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
				if (_dispatch_timers_will_wake & (1 << qos)) {
					_dispatch_trace_timer_wake(_dispatch_trace_next_timer[qos]);
				}
			}
		}
		_dispatch_timers_will_wake = 0;
	}
}
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_mgr_trace_timers_wakes()
#endif

#define _dispatch_source_timer_telemetry_enabled() false

DISPATCH_NOINLINE
static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		dispatch_clock_t clock, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, clock, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, clock, values);
		asm(""); // prevent tailcall
	}
}

DISPATCH_NOINLINE
static void
_dispatch_source_timer_configure(dispatch_source_t ds)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	dtc = os_atomic_xchg2o(dt, dt_pending_config, NULL, dependency);
	if (dtc->dtc_clock == DISPATCH_CLOCK_MACH) {
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
	} else {
		dt->du_fflags &= ~(uint32_t)DISPATCH_TIMER_CLOCK_MACH;
	}
	dt->dt_timer = dtc->dtc_timer;
	free(dtc);
	if (ds->ds_is_installed) {
		// Clear any pending data that might have accumulated on
		// older timer params <rdar://problem/8574886>
		os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
		_dispatch_timers_update(dt, 0);
	}
}

static dispatch_timer_config_t
_dispatch_source_timer_config_create(dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_config_t dtc;
	dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
	if (unlikely(interval == 0)) {
		if (start != DISPATCH_TIME_FOREVER) {
			_dispatch_bug_deprecated("Setting timer interval to 0 requests "
					"a 1ns timer, did you mean FOREVER (a one-shot timer)?");
		}
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		dtc->dtc_clock = DISPATCH_CLOCK_WALL;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		if (interval < 1) {
			// rdar://problem/7287561 interval must be at least one in
			// in order to avoid later division by zero when calculating
			// the missed interval count. (NOTE: the wall clock's
			// interval is already "fixed" to be 1 or more)
			interval = 1;
		}
		leeway = _dispatch_time_nano2mach(leeway);
		dtc->dtc_clock = DISPATCH_CLOCK_MACH;
	}
	if (interval < INT64_MAX && leeway > interval / 2) {
		leeway = interval / 2;
	}

	dtc->dtc_timer.target = start;
	dtc->dtc_timer.interval = interval;
	if (start + leeway < INT64_MAX) {
		dtc->dtc_timer.deadline = start + leeway;
	} else {
		dtc->dtc_timer.deadline = INT64_MAX;
	}
	return dtc;
}

DISPATCH_NOINLINE
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
	dispatch_timer_config_t dtc;

	if (unlikely(!dt->du_is_timer || (dt->du_fflags&DISPATCH_TIMER_INTERVAL))) {
		DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
	}

	dtc = _dispatch_source_timer_config_create(start, interval, leeway);
	_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
	dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
	if (dtc) free(dtc);
	dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}

static void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target -= (target % interval);
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	dr->dt_timer.target = target;
	dr->dt_timer.deadline = target + leeway;
	dr->dt_timer.interval = interval;
	_dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer);
}

#pragma mark -
#pragma mark dispatch_after

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		void *ctxt, void *handler, bool block)
{
	dispatch_timer_source_refs_t dt;
	dispatch_source_t ds;
	uint64_t leeway, delta;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		if (block) {
			return dispatch_async(queue, handler);
		}
		return dispatch_async_f(queue, ctxt, handler);
	}
	leeway = delta / 10; // <rdar://problem/13447496>

	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
	dt = ds->ds_timer_refs;

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (block) {
		_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
	} else {
		_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
	}
	// reference `ds` so that it doesn't show up as a leak
	dc->dc_data = ds;
	_dispatch_trace_continuation_push(ds->_as_dq, dc);
	os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

	if ((int64_t)when < 0) {
		// wall clock
		when = (dispatch_time_t)-((int64_t)when);
	} else {
		// absolute clock
		dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
		leeway = _dispatch_time_nano2mach(leeway);
	}
	dt->dt_timer.target = when;
	dt->dt_timer.interval = UINT64_MAX;
	dt->dt_timer.deadline = when + leeway;
	dispatch_activate(ds);
}

DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_after(when, queue, ctxt, func, false);
}

#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	_dispatch_after(when, queue, NULL, work, true);
}
#endif

#pragma mark -
#pragma mark dispatch_timers

/*
 * The dispatch_timer_heap_t structure is a double min-heap of timers,
 * interleaving the by-target min-heap in the even slots, and the by-deadline
 * in the odd ones.
 *
 * The min element of these is held inline in the dispatch_timer_heap_t
 * structure, and further entries are held in segments.
 *
 * dth_segments is the number of allocated segments.
 *
 * Segment 0 has a size of `DISPATCH_HEAP_INIT_SEGMENT_CAPACITY` pointers
 * Segment k has a size of (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (k - 1))
 *
 * Segment n (dth_segments - 1) is the last segment and points its final n
 * entries to previous segments. Its address is held in the `dth_heap` field.
 *
 * segment n   [ regular timer pointers | n-1 | k | 0 ]
 *                                         |    |   |
 * segment n-1 <---------------------------'    |   |
 * segment k   <--------------------------------'   |
 * segment 0   <------------------------------------'
 */
#define DISPATCH_HEAP_INIT_SEGMENT_CAPACITY 8u

/*
 * There are two min-heaps stored interleaved in a single array,
 * even indices are for the by-target min-heap, and odd indices for
 * the by-deadline one.
 */
#define DTH_HEAP_ID_MASK (DTH_ID_COUNT - 1)
#define DTH_HEAP_ID(idx) ((idx) & DTH_HEAP_ID_MASK)
#define DTH_IDX_FOR_HEAP_ID(idx, heap_id) \
		(((idx) & ~DTH_HEAP_ID_MASK) | (heap_id))

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_capacity(uint32_t segments)
{
	if (segments == 0) return 2;
	uint32_t seg_no = segments - 1;
	// for C = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY,
	// 2 + C + SUM(C << (i-1), i = 1..seg_no) - seg_no
	return 2 + (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << seg_no) - seg_no;
}

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_grow(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = dth->dth_segments++;
	void **heap, **heap_prev = dth->dth_heap;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
	}
	heap = _dispatch_calloc(seg_capacity, sizeof(void *));
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap[seg_capacity - prev_seg_no],
				&heap_prev[prev_seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	if (seg_no > 0) {
		heap[seg_capacity - seg_no] = heap_prev;
	}
	dth->dth_heap = heap;
}

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_shrink(dispatch_timer_heap_t dth)
{
	uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
	uint32_t seg_no = --dth->dth_segments;
	void **heap = dth->dth_heap, **heap_prev = NULL;

	if (seg_no > 0) {
		seg_capacity <<= (seg_no - 1);
		heap_prev = heap[seg_capacity - seg_no];
	}
	if (seg_no > 1) {
		uint32_t prev_seg_no = seg_no - 1;
		uint32_t prev_seg_capacity = seg_capacity >> 1;
		memcpy(&heap_prev[prev_seg_capacity - prev_seg_no],
				&heap[seg_capacity - prev_seg_no],
				prev_seg_no * sizeof(void *));
	}
	dth->dth_heap = heap_prev;
	free(heap);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_timer_source_refs_t *
_dispatch_timer_heap_get_slot(dispatch_timer_heap_t dth, uint32_t idx)
{
	uint32_t seg_no, segments = dth->dth_segments;
	void **segment;

	if (idx < DTH_ID_COUNT) {
		return &dth->dth_min[idx];
	}
	idx -= DTH_ID_COUNT;

	// Derive the segment number from the index. Naming
	// DISPATCH_HEAP_INIT_SEGMENT_CAPACITY `C`, the segments index ranges are:
	// 0: 0 .. (C - 1)
	// 1: C .. 2 * C - 1
	// k: 2^(k-1) * C .. 2^k * C - 1
	// so `k` can be derived from the first bit set in `idx`
	seg_no = (uint32_t)(__builtin_clz(DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1) -
			__builtin_clz(idx | (DISPATCH_HEAP_INIT_SEGMENT_CAPACITY - 1)));
	if (seg_no + 1 == segments) {
		segment = dth->dth_heap;
	} else {
		uint32_t seg_capacity = DISPATCH_HEAP_INIT_SEGMENT_CAPACITY;
		seg_capacity <<= (segments - 2);
		segment = dth->dth_heap[seg_capacity - seg_no - 1];
	}
	if (seg_no) {
		idx -= DISPATCH_HEAP_INIT_SEGMENT_CAPACITY << (seg_no - 1);
	}
	return (dispatch_timer_source_refs_t *)(segment + idx);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_set(dispatch_timer_source_refs_t *slot,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	*slot = dt;
	dt->dt_heap_entry[DTH_HEAP_ID(idx)] = idx;
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_parent(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	idx = (idx - DTH_ID_COUNT) / 2; // go to the parent
	return DTH_IDX_FOR_HEAP_ID(idx, heap_id);
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_left_child(uint32_t idx)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);
	// 2 * (idx - heap_id) + DTH_ID_COUNT + heap_id
	return 2 * idx + DTH_ID_COUNT - heap_id;
}

#if DISPATCH_HAVE_TIMER_COALESCING
DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_skip(uint32_t idx, uint32_t count)
{
	uint32_t heap_id = DTH_HEAP_ID(idx);

	idx -= heap_id;
	if (unlikely(idx + DTH_ID_COUNT == count)) {
		// reaching `count` doesn't mean we're done, but there is a weird
		// corner case if the last item of the heap is a left child:
		//
		//     /\
		//    /  \
		//   /  __\
		//  /__/
		//     ^
		//
		// The formula below would return the sibling of `idx` which is
		// out of bounds. Fortunately, the correct answer is the same
		// as for idx's parent
		idx = _dispatch_timer_heap_parent(idx);
	}

	//
	// When considering the index in a non interleaved, 1-based array
	// representation of a heap, hence looking at (idx / DTH_ID_COUNT + 1)
	// for a given idx in our dual-heaps, that index is in one of two forms:
	//
	//     (a) 1xxxx011111    or    (b) 111111111
	//         d    i    0              d       0
	//
	// The first bit set is the row of the binary tree node (0-based).
	// The following digits from most to least significant represent the path
	// to that node, where `0` is a left turn and `1` a right turn.
	//
	// For example 0b0101 (5) is a node on row 2 accessed going left then right:
	//
	// row 0          1
	//              /   .
	// row 1      2       3
	//           . \     . .
	// row 2    4   5   6   7
	//         : : : : : : : :
	//
	// Skipping a sub-tree in walk order means going to the sibling of the last
	// node reached after we turned left. If the node was of the form (a),
	// this node is 1xxxx1, which for the above example is 0b0011 (3).
	// If the node was of the form (b) then we never took a left, meaning
	// we reached the last element in traversal order.
	//

	//
	// we want to find
	// - the least significant bit set to 0 in (idx / DTH_ID_COUNT + 1)
	// - which is offset by log_2(DTH_ID_COUNT) from the position of the least
	//   significant 0 in (idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//   since idx is a multiple of DTH_ID_COUNT and DTH_ID_COUNT a power of 2.
	// - which in turn is the same as the position of the least significant 1 in
	//   ~(idx + DTH_ID_COUNT + DTH_ID_COUNT - 1)
	//
	dispatch_static_assert(powerof2(DTH_ID_COUNT));
	idx += DTH_ID_COUNT + DTH_ID_COUNT - 1;
	idx >>= __builtin_ctz(~idx);

	//
	// `idx` is now either:
	// - 0 if it was the (b) case above, in which case the walk is done
	// - 1xxxx0 as the position in a 0 based array representation of a non
	//   interleaved heap, so we just have to compute the interleaved index.
	//
	return likely(idx) ? DTH_ID_COUNT * idx + heap_id : UINT32_MAX;
}

DISPATCH_ALWAYS_INLINE
static inline uint32_t
_dispatch_timer_heap_walk_next(uint32_t idx, uint32_t count)
{
	//
	// Goes to the next element in heap walk order, which is the prefix ordered
	// walk of the tree.
	//
	// From a given node, the next item to return is the left child if it
	// exists, else the first right sibling we find by walking our parent chain,
	// which is exactly what _dispatch_timer_heap_walk_skip() returns.
	//
	uint32_t lchild = _dispatch_timer_heap_left_child(idx);
	if (lchild < count) {
		return lchild;
	}
	return _dispatch_timer_heap_walk_skip(idx, count);
}

DISPATCH_NOINLINE
static uint64_t
_dispatch_timer_heap_max_target_before(dispatch_timer_heap_t dth, uint64_t limit)
{
	dispatch_timer_source_refs_t dri;
	uint32_t idx = _dispatch_timer_heap_left_child(DTH_TARGET_ID);
	uint32_t count = dth->dth_count;
	uint64_t tmp, target = dth->dth_min[DTH_TARGET_ID]->dt_timer.target;

	while (idx < count) {
		dri = *_dispatch_timer_heap_get_slot(dth, idx);
		tmp = dri->dt_timer.target;
		if (tmp > limit) {
			// skip subtree since none of the targets below can be before limit
			idx = _dispatch_timer_heap_walk_skip(idx, count);
		} else {
			target = tmp;
			idx = _dispatch_timer_heap_walk_next(idx, count);
		}
	}
	return target;
}
#endif // DISPATCH_HAVE_TIMER_COALESCING

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_resift(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt, uint32_t idx)
{
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, target) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_TARGET_ID]));
	dispatch_static_assert(offsetof(struct dispatch_timer_source_s, deadline) ==
			offsetof(struct dispatch_timer_source_s, heap_key[DTH_DEADLINE_ID]));
#define dth_cmp(hid, dt1, op, dt2) \
		(((dt1)->dt_timer.heap_key)[hid] op ((dt2)->dt_timer.heap_key)[hid])

	dispatch_timer_source_refs_t *pslot, pdt;
	dispatch_timer_source_refs_t *cslot, cdt;
	dispatch_timer_source_refs_t *rslot, rdt;
	uint32_t cidx, dth_count = dth->dth_count;
	dispatch_timer_source_refs_t *slot;
	int heap_id = DTH_HEAP_ID(idx);
	bool sifted_up = false;

	// try to sift up

	slot = _dispatch_timer_heap_get_slot(dth, idx);
	while (idx >= DTH_ID_COUNT) {
		uint32_t pidx = _dispatch_timer_heap_parent(idx);
		pslot = _dispatch_timer_heap_get_slot(dth, pidx);
		pdt = *pslot;
		if (dth_cmp(heap_id, pdt, <=, dt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, pdt, idx);
		slot = pslot;
		idx = pidx;
		sifted_up = true;
	}
	if (sifted_up) {
		goto done;
	}

	// try to sift down

	while ((cidx = _dispatch_timer_heap_left_child(idx)) < dth_count) {
		uint32_t ridx = cidx + DTH_ID_COUNT;
		cslot = _dispatch_timer_heap_get_slot(dth, cidx);
		cdt = *cslot;
		if (ridx < dth_count) {
			rslot = _dispatch_timer_heap_get_slot(dth, ridx);
			rdt = *rslot;
			if (dth_cmp(heap_id, cdt, >, rdt)) {
				cidx = ridx;
				cdt = rdt;
				cslot = rslot;
			}
		}
		if (dth_cmp(heap_id, dt, <=, cdt)) {
			break;
		}
		_dispatch_timer_heap_set(slot, cdt, idx);
		slot = cslot;
		idx = cidx;
	}

done:
	_dispatch_timer_heap_set(slot, dt, idx);
#undef dth_cmp
}

DISPATCH_ALWAYS_INLINE
static void
_dispatch_timer_heap_insert(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count += DTH_ID_COUNT) - DTH_ID_COUNT;

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], ==,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], ==,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		dt->dt_heap_entry[DTH_TARGET_ID] = DTH_TARGET_ID;
		dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_DEADLINE_ID;
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = dt;
		return;
	}

	if (unlikely(idx + DTH_ID_COUNT >
			_dispatch_timer_heap_capacity(dth->dth_segments))) {
		_dispatch_timer_heap_grow(dth);
	}
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_TARGET_ID);
	_dispatch_timer_heap_resift(dth, dt, idx + DTH_DEADLINE_ID);
}

DISPATCH_NOINLINE
static void
_dispatch_timer_heap_remove(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	uint32_t idx = (dth->dth_count -= DTH_ID_COUNT);

	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");

	if (idx == 0) {
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_TARGET_ID], ==, dt,
				"target slot");
		DISPATCH_TIMER_ASSERT(dth->dth_min[DTH_DEADLINE_ID], ==, dt,
				"deadline slot");
		dth->dth_min[DTH_TARGET_ID] = dth->dth_min[DTH_DEADLINE_ID] = NULL;
		goto clear_heap_entry;
	}

	for (uint32_t heap_id = 0; heap_id < DTH_ID_COUNT; heap_id++) {
		dispatch_timer_source_refs_t *slot, last_dt;
		slot = _dispatch_timer_heap_get_slot(dth, idx + heap_id);
		last_dt = *slot; *slot = NULL;
		if (last_dt != dt) {
			uint32_t removed_idx = dt->dt_heap_entry[heap_id];
			_dispatch_timer_heap_resift(dth, last_dt, removed_idx);
		}
	}
	if (unlikely(idx <= _dispatch_timer_heap_capacity(dth->dth_segments - 1))) {
		_dispatch_timer_heap_shrink(dth);
	}

clear_heap_entry:
	dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
	dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_timer_heap_update(dispatch_timer_heap_t dth,
		dispatch_timer_source_refs_t dt)
{
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_TARGET_ID], !=,
			DTH_INVALID_ID, "target idx");
	DISPATCH_TIMER_ASSERT(dt->dt_heap_entry[DTH_DEADLINE_ID], !=,
			DTH_INVALID_ID, "deadline idx");


	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_TARGET_ID]);
	_dispatch_timer_heap_resift(dth, dt, dt->dt_heap_entry[DTH_DEADLINE_ID]);
}

DISPATCH_ALWAYS_INLINE
static bool
_dispatch_timer_heap_has_new_min(dispatch_timer_heap_t dth,
		uint32_t count, uint32_t mask)
{
	dispatch_timer_source_refs_t dt;
	bool changed = false;
	uint64_t tmp;
	uint32_t tidx;

	for (tidx = 0; tidx < count; tidx++) {
		if (!(mask & (1u << tidx))) {
			continue;
		}

		dt = dth[tidx].dth_min[DTH_TARGET_ID];
		tmp = dt ? dt->dt_timer.target : UINT64_MAX;
		if (dth[tidx].dth_target != tmp) {
			dth[tidx].dth_target = tmp;
			changed = true;
		}
		dt = dth[tidx].dth_min[DTH_DEADLINE_ID];
		tmp = dt ? dt->dt_timer.deadline : UINT64_MAX;
		if (dth[tidx].dth_deadline != tmp) {
			dth[tidx].dth_deadline = tmp;
			changed = true;
		}
	}
	return changed;
}

static inline void
_dispatch_timers_unregister(dispatch_timer_source_refs_t dt)
{
	uint32_t tidx = dt->du_ident;
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];

	_dispatch_timer_heap_remove(heap, dt);
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = NULL;
}

static inline void
_dispatch_timers_register(dispatch_timer_source_refs_t dt, uint32_t tidx)
{
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	if (_dispatch_unote_registered(dt)) {
		DISPATCH_TIMER_ASSERT(dt->du_ident, ==, tidx, "tidx");
		_dispatch_timer_heap_update(heap, dt);
	} else {
		dt->du_ident = tidx;
		_dispatch_timer_heap_insert(heap, dt);
	}
	_dispatch_timers_reconfigure = true;
	_dispatch_timers_processing_mask |= 1 << tidx;
	dispatch_assert(dt->du_wlh == NULL || dt->du_wlh == DISPATCH_WLH_ANON);
	dt->du_wlh = DISPATCH_WLH_ANON;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_source_timer_tryarm(dispatch_source_t ds)
{
	dispatch_queue_flags_t oqf, nqf;
	return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
		if (oqf & (DSF_CANCELED | DQF_RELEASED)) {
			// do not install a cancelled timer
			os_atomic_rmw_loop_give_up(break);
		}
		nqf = oqf | DSF_ARMED;
	});
}

// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_unote_t du, uint32_t flags)
{
	dispatch_timer_source_refs_t dr = du._dt;
	dispatch_source_t ds = _dispatch_source_from_refs(dr);
	const char *verb = "updated";
	bool will_register, disarm = false;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	if (unlikely(dr->du_ident == DISPATCH_TIMER_IDENT_CANCELED)) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		return;
	}

	// Unregister timers that are unconfigured, disabled, suspended or have
	// missed intervals. Rearm after dispatch_set_timer(), resume or source
	// invoke will reenable them
	will_register = !(flags & DISPATCH_TIMERS_UNREGISTER) &&
			dr->dt_timer.target < INT64_MAX &&
			!os_atomic_load2o(ds, ds_pending_data, relaxed) &&
			!DISPATCH_QUEUE_IS_SUSPENDED(ds) &&
			!os_atomic_load2o(dr, dt_pending_config, relaxed);
	if (likely(!_dispatch_unote_registered(dr))) {
		dispatch_assert((flags & DISPATCH_TIMERS_RETAIN_2) == 0);
		if (unlikely(!will_register || !_dispatch_source_timer_tryarm(ds))) {
			return;
		}
		verb = "armed";
	} else if (unlikely(!will_register)) {
		disarm = true;
		verb = "disarmed";
	}

	// The heap owns a +2 on dispatch sources it references
	//
	// _dispatch_timers_run2() also sometimes passes DISPATCH_TIMERS_RETAIN_2
	// when it wants to take over this +2 at the same time we are unregistering
	// the timer from the heap.
	//
	// Compute our refcount balance according to these rules, if our balance
	// would become negative we retain the source upfront, if it is positive, we
	// get rid of the extraneous refcounts after we're done touching the source.
	int refs = will_register ? -2 : 0;
	if (_dispatch_unote_registered(dr) && !(flags & DISPATCH_TIMERS_RETAIN_2)) {
		refs += 2;
	}
	if (refs < 0) {
		dispatch_assert(refs == -2);
		_dispatch_retain_2(ds);
	}

	uint32_t tidx = _dispatch_source_timer_idx(dr);
	if (unlikely(_dispatch_unote_registered(dr) &&
			(!will_register || dr->du_ident != tidx))) {
		_dispatch_timers_unregister(dr);
	}
	if (likely(will_register)) {
		_dispatch_timers_register(dr, tidx);
	}

	if (disarm) {
		_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	}
	_dispatch_debug("kevent-source[%p]: %s timer[%p]", ds, verb, dr);
	_dispatch_object_debug(ds, "%s", __func__);
	if (refs > 0) {
		dispatch_assert(refs == 2);
		_dispatch_release_2_tailcall(ds);
	}
}

#define DISPATCH_TIMER_MISSED_MARKER  1ul

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_compute_missed(dispatch_timer_source_refs_t dt,
		uint64_t now, unsigned long prev)
{
	uint64_t missed = (now - dt->dt_timer.target) / dt->dt_timer.interval;
	if (++missed + prev > LONG_MAX) {
		missed = LONG_MAX - prev;
	}
	if (dt->dt_timer.interval < INT64_MAX) {
		uint64_t push_by = missed * dt->dt_timer.interval;
		dt->dt_timer.target += push_by;
		dt->dt_timer.deadline += push_by;
	} else {
		dt->dt_timer.target = UINT64_MAX;
		dt->dt_timer.deadline = UINT64_MAX;
	}
	prev += missed;
	return prev;
}

DISPATCH_ALWAYS_INLINE
static inline unsigned long
_dispatch_source_timer_data(dispatch_source_t ds, dispatch_unote_t du)
{
	dispatch_timer_source_refs_t dr = du._dt;
	unsigned long data, prev, clear_prev = 0;

	os_atomic_rmw_loop2o(ds, ds_pending_data, prev, clear_prev, relaxed, {
		data = prev >> 1;
		if (unlikely(prev & DISPATCH_TIMER_MISSED_MARKER)) {
			os_atomic_rmw_loop_give_up(goto handle_missed_intervals);
		}
	});
	return data;

handle_missed_intervals:
	// The timer may be in _dispatch_source_invoke2() already for other
	// reasons such as running the registration handler when ds_pending_data
	// is changed by _dispatch_timers_run2() without holding the drain lock.
	//
	// We hence need dependency ordering to pair with the release barrier
	// done by _dispatch_timers_run2() when setting the MISSED_MARKER bit.
	os_atomic_thread_fence(dependency);
	dr = os_atomic_force_dependency_on(dr, data);

	uint64_t now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(dr->du_ident));
	if (now >= dr->dt_timer.target) {
		OS_COMPILER_CAN_ASSUME(dr->dt_timer.interval < INT64_MAX);
		data = _dispatch_source_timer_compute_missed(dr, now, data);
	}

	// When we see the MISSED_MARKER the manager has given up on this timer
	// and expects the handler to call "resume".
	//
	// However, it may not have reflected this into the atomic flags yet
	// so make sure _dispatch_source_invoke2() sees the timer is disarmed
	//
	// The subsequent _dispatch_source_refs_resume() will enqueue the source
	// on the manager and make the changes to `ds_timer` above visible.
	_dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
	os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
	return data;
}

static inline void
_dispatch_timers_run2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	dispatch_timer_source_refs_t dr;
	dispatch_source_t ds;
	uint64_t data, pending_data;
	uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);

	while ((dr = _dispatch_timers_heap[tidx].dth_min[DTH_TARGET_ID])) {
		DISPATCH_TIMER_ASSERT(dr->du_filter, ==, DISPATCH_EVFILT_TIMER,
				"invalid filter");
		DISPATCH_TIMER_ASSERT(dr->du_ident, ==, tidx, "tidx");
		DISPATCH_TIMER_ASSERT(dr->dt_timer.target, !=, 0, "missing target");
		ds = _dispatch_source_from_refs(dr);
		if (dr->dt_timer.target > now) {
			// Done running timers for now.
			break;
		}
		if (dr->du_fflags & DISPATCH_TIMER_AFTER) {
			_dispatch_trace_timer_fire(dr, 1, 1);
			_dispatch_source_merge_evt(dr, EV_ONESHOT, 1, 0, 0);
			_dispatch_debug("kevent-source[%p]: fired after timer[%p]", ds, dr);
			_dispatch_object_debug(ds, "%s", __func__);
			continue;
		}

		data = os_atomic_load2o(ds, ds_pending_data, relaxed);
		if (unlikely(data)) {
			// the release barrier is required to make the changes
			// to `ds_timer` visible to _dispatch_source_timer_data()
			if (os_atomic_cmpxchg2o(ds, ds_pending_data, data,
					data | DISPATCH_TIMER_MISSED_MARKER, release)) {
				_dispatch_timers_update(dr, DISPATCH_TIMERS_UNREGISTER);
				continue;
			}
		}

		data = _dispatch_source_timer_compute_missed(dr, now, 0);
		_dispatch_timers_update(dr, DISPATCH_TIMERS_RETAIN_2);
		pending_data = data << 1;
		if (!_dispatch_unote_registered(dr) && dr->dt_timer.target < INT64_MAX){
			// if we unregistered because of suspension we have to fake we
			// missed events.
			pending_data |= DISPATCH_TIMER_MISSED_MARKER;
			os_atomic_store2o(ds, ds_pending_data, pending_data, release);
		} else {
			os_atomic_store2o(ds, ds_pending_data, pending_data, relaxed);
		}
		_dispatch_trace_timer_fire(dr, data, data);
		_dispatch_debug("kevent-source[%p]: fired timer[%p]", ds, dr);
		_dispatch_object_debug(ds, "%s", __func__);
		dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_timers_run(dispatch_clock_now_cache_t nows)
{
	uint32_t tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (_dispatch_timers_heap[tidx].dth_count) {
			_dispatch_timers_run2(nows, tidx);
		}
	}
}

#if DISPATCH_HAVE_TIMER_COALESCING
#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
#if DISPATCH_HAVE_TIMER_QOS
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
#endif
};
#endif // DISPATCH_HAVE_TIMER_COALESCING

static inline dispatch_timer_delay_s
_dispatch_timers_get_delay(dispatch_timer_heap_t dth, dispatch_clock_t clock,
		uint32_t qos, dispatch_clock_now_cache_t nows)
{
	uint64_t target = dth->dth_target, deadline = dth->dth_deadline;
	uint64_t delta = INT64_MAX, dldelta = INT64_MAX;
	dispatch_timer_delay_s rc;

	dispatch_assert(target <= deadline);
	if (delta == 0 || target >= INT64_MAX) {
		goto done;
	}

	if (qos < DISPATCH_TIMER_QOS_COUNT && dth->dth_count > 2) {
#if DISPATCH_HAVE_TIMER_COALESCING
		// Timer pre-coalescing <rdar://problem/13222034>
		// When we have several timers with this target/deadline bracket:
		//
		//      Target        window  Deadline
		//        V           <-------V
		// t1:    [...........|.................]
		// t2:         [......|.......]
		// t3:             [..|..........]
		// t4:                | [.............]
		//                 ^
		//          Optimal Target
		//
		// Coalescing works better if the Target is delayed to "Optimal", by
		// picking the latest target that isn't too close to the deadline.
		uint64_t window = _dispatch_kevent_coalescing_window[qos];
		if (target + window < deadline) {
			uint64_t latest = deadline - window;
			target = _dispatch_timer_heap_max_target_before(dth, latest);
		}
#endif
	}

	uint64_t now = _dispatch_time_now_cached(clock, nows);
	if (target <= now) {
		delta = 0;
		dldelta = 0;
		goto done;
	}

	uint64_t tmp = target - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < delta) {
		delta = tmp;
	}

	tmp = deadline - now;
	if (clock != DISPATCH_CLOCK_WALL) {
		tmp = _dispatch_time_mach2nano(tmp);
	}
	if (tmp < dldelta) {
		dldelta = tmp;
	}

done:
	rc.delay = delta;
	rc.leeway = delta < INT64_MAX ? dldelta - delta : INT64_MAX;
	return rc;
}

static bool
_dispatch_timers_program2(dispatch_clock_now_cache_t nows, uint32_t tidx)
{
	uint32_t qos = DISPATCH_TIMER_QOS(tidx);
	dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
	dispatch_timer_heap_t heap = &_dispatch_timers_heap[tidx];
	dispatch_timer_delay_s range;

	range = _dispatch_timers_get_delay(heap, clock, qos, nows);
	if (range.delay == 0 || range.delay >= INT64_MAX) {
		_dispatch_trace_next_timer_set(NULL, qos);
		if (heap->dth_flags & DTH_ARMED) {
			_dispatch_event_loop_timer_delete(tidx);
		}
		return range.delay == 0;
	}

	_dispatch_trace_next_timer_set(heap->dth_min[DTH_TARGET_ID], qos);
	_dispatch_trace_next_timer_program(range.delay, qos);
	_dispatch_event_loop_timer_arm(tidx, range, nows);
	return false;
}

DISPATCH_NOINLINE
static bool
_dispatch_timers_program(dispatch_clock_now_cache_t nows)
{
	bool poll = false;
	uint32_t tidx, timerm = _dispatch_timers_processing_mask;

	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		if (timerm & (1 << tidx)) {
			poll |= _dispatch_timers_program2(nows, tidx);
		}
	}
	return poll;
}

DISPATCH_NOINLINE
static bool
_dispatch_timers_configure(void)
{
	// Find out if there is a new target/deadline on the timer lists
	return _dispatch_timer_heap_has_new_min(_dispatch_timers_heap,
			countof(_dispatch_timers_heap), _dispatch_timers_processing_mask);
}

static inline bool
_dispatch_mgr_timers(void)
{
	dispatch_clock_now_cache_s nows = { };
	bool expired = _dispatch_timers_expired;
	if (unlikely(expired)) {
		_dispatch_timers_run(&nows);
	}
	_dispatch_mgr_trace_timers_wakes();
	bool reconfigure = _dispatch_timers_reconfigure;
	if (unlikely(reconfigure || expired)) {
		if (reconfigure) {
			reconfigure = _dispatch_timers_configure();
			_dispatch_timers_reconfigure = false;
		}
		if (reconfigure || expired) {
			expired = _dispatch_timers_expired = _dispatch_timers_program(&nows);
		}
		_dispatch_timers_processing_mask = 0;
	}
	return expired;
}

#pragma mark -
#pragma mark dispatch_mgr

void
_dispatch_mgr_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		DISPATCH_UNUSED dispatch_qos_t qos)
{
	uint64_t dq_state;
	_dispatch_trace_continuation_push(dq, dou._do);
	if (unlikely(_dispatch_queue_push_update_tail(dq, dou._do))) {
		_dispatch_queue_push_update_head(dq, dou._do);
		dq_state = os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
		if (!_dq_state_drain_locked_by_self(dq_state)) {
			_dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
		}
	}
}

DISPATCH_NORETURN
void
_dispatch_mgr_queue_wakeup(DISPATCH_UNUSED dispatch_queue_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos,
		DISPATCH_UNUSED dispatch_wakeup_flags_t flags)
{
	DISPATCH_INTERNAL_CRASH(0, "Don't try to wake up or override the manager");
}

#if DISPATCH_USE_MGR_THREAD
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
#if DISPATCH_EVENT_BACKEND_KEVENT
	dispatch_kevent_s evbuf[DISPATCH_DEFERRED_ITEMS_EVENT_COUNT];
#endif
	dispatch_deferred_items_s ddi = {
#if DISPATCH_EVENT_BACKEND_KEVENT
		.ddi_maxevents = DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		.ddi_eventlist = evbuf,
#endif
	};
	bool poll;

	_dispatch_deferred_items_set(&ddi);
	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
		_dispatch_event_loop_drain(poll ? KEVENT_FLAG_IMMEDIATE : 0);
	}
}
#endif // DISPATCH_USE_MGR_THREAD

DISPATCH_NORETURN
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
		dispatch_invoke_context_t dic DISPATCH_UNUSED,
		dispatch_invoke_flags_t flags DISPATCH_UNUSED)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	if (_dispatch_kevent_workqueue_enabled) {
		DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
				"kevent workqueue enabled");
	}
#endif
#if DISPATCH_USE_MGR_THREAD
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_mgr_priority_init();
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
#endif
}

#if DISPATCH_USE_KEVENT_WORKQUEUE

#define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((dispatch_priority_t)~0u)

_Static_assert(WORKQ_KEVENT_EVENT_BUFFER_LEN >=
		DISPATCH_DEFERRED_ITEMS_EVENT_COUNT,
		"our list should not be longer than the kernel's");

DISPATCH_ALWAYS_INLINE
static inline dispatch_priority_t
_dispatch_wlh_worker_thread_init(dispatch_wlh_t wlh,
		dispatch_deferred_items_t ddi)
{
	dispatch_assert(wlh);
	dispatch_priority_t old_dbp;

	pthread_priority_t pp = _dispatch_get_priority();
	if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
		// If this thread does not have the event manager flag set, don't setup
		// as the dispatch manager and let the caller know to only process
		// the delivered events.
		//
		// Also add the NEEDS_UNBIND flag so that
		// _dispatch_priority_compute_update knows it has to unbind
		pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
		if (wlh == DISPATCH_WLH_ANON) {
			pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		} else {
			// pthread sets the flag when it is an event delivery thread
			// so we need to explicitly clear it
			pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
		}
		_dispatch_thread_setspecific(dispatch_priority_key,
				(void *)(uintptr_t)pp);
		if (wlh != DISPATCH_WLH_ANON) {
			_dispatch_debug("wlh[%p]: handling events", wlh);
		} else {
			ddi->ddi_can_stash = true;
		}
		return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
	}

	if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
			!(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
		// When the phtread kext is delivering kevents to us, and pthread
		// root queues are in use, then the pthread priority TSD is set
		// to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
		//
		// Given that this isn't a valid QoS we need to fixup the TSD,
		// and the best option is to clear the qos/priority bits which tells
		// us to not do any QoS related calls on this thread.
		//
		// However, in that case the manager thread is opted out of QoS,
		// as far as pthread is concerned, and can't be turned into
		// something else, so we can't stash.
		pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
	}
	// Managers always park without mutating to a regular worker thread, and
	// hence never need to unbind from userland, and when draining a manager,
	// the NEEDS_UNBIND flag would cause the mutation to happen.
	// So we need to strip this flag
	pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
	_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);

	// ensure kevents registered from this thread are registered at manager QoS
	old_dbp = _dispatch_set_basepri(DISPATCH_PRIORITY_FLAG_MANAGER);
	_dispatch_queue_set_current(&_dispatch_mgr_q);
	_dispatch_queue_mgr_lock(&_dispatch_mgr_q);
	return old_dbp;
}

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_wlh_worker_thread_reset(dispatch_priority_t old_dbp)
{
	bool needs_poll = _dispatch_queue_mgr_unlock(&_dispatch_mgr_q);
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);
	return needs_poll;
}

DISPATCH_ALWAYS_INLINE
static void
_dispatch_wlh_worker_thread(dispatch_wlh_t wlh, dispatch_kevent_t events,
		int *nevents)
{
	_dispatch_introspection_thread_add();
	DISPATCH_PERF_MON_VAR_INIT

	dispatch_deferred_items_s ddi = {
		.ddi_eventlist = events,
	};
	dispatch_priority_t old_dbp;

	old_dbp = _dispatch_wlh_worker_thread_init(wlh, &ddi);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_perfmon_start_impl(true);
	} else {
		dispatch_assert(wlh == DISPATCH_WLH_ANON);
		wlh = DISPATCH_WLH_ANON;
	}
	_dispatch_deferred_items_set(&ddi);
	_dispatch_event_loop_merge(events, *nevents);

	if (old_dbp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
		_dispatch_mgr_queue_drain();
		bool poll = _dispatch_mgr_timers();
		if (_dispatch_wlh_worker_thread_reset(old_dbp)) {
			poll = true;
		}
		if (poll) _dispatch_event_loop_poke(DISPATCH_WLH_MANAGER, 0, 0);
	} else if (ddi.ddi_stashed_dou._do) {
		_dispatch_debug("wlh[%p]: draining deferred item %p", wlh,
				ddi.ddi_stashed_dou._do);
		if (wlh == DISPATCH_WLH_ANON) {
			dispatch_assert(ddi.ddi_nevents == 0);
			_dispatch_deferred_items_set(NULL);
			_dispatch_root_queue_drain_deferred_item(&ddi
					DISPATCH_PERF_MON_ARGS);
		} else {
			_dispatch_root_queue_drain_deferred_wlh(&ddi
					DISPATCH_PERF_MON_ARGS);
		}
	}

	_dispatch_deferred_items_set(NULL);
	if (old_dbp == DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER &&
			!ddi.ddi_stashed_dou._do) {
		_dispatch_perfmon_end(perfmon_thread_event_no_steal);
	}
	_dispatch_debug("returning %d deferred kevents", ddi.ddi_nevents);
	*nevents = ddi.ddi_nevents;
}

DISPATCH_NOINLINE
void
_dispatch_kevent_worker_thread(dispatch_kevent_t *events, int *nevents)
{
	if (!events && !nevents) {
		// events for worker thread request have already been delivered earlier
		return;
	}
	if (!dispatch_assume(*nevents && *events)) return;
	_dispatch_adopt_wlh_anon();
	_dispatch_wlh_worker_thread(DISPATCH_WLH_ANON, *events, nevents);
	_dispatch_reset_wlh();
}


#endif // DISPATCH_USE_KEVENT_WORKQUEUE
#pragma mark -
#pragma mark dispatch_source_debug

static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%x, "
			"mask = 0x%x, pending_data = 0x%llx, registered = %d, "
			"armed = %d, deleted = %d%s, canceled = %d, ",
			target && target->dq_label ? target->dq_label : "", target,
			dr->du_ident, dr->du_fflags, (unsigned long long)ds->ds_pending_data,
			ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
			(bool)(ds->dq_atomic_flags & DSF_DELETED),
			(ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
			(bool)(ds->dq_atomic_flags & DSF_CANCELED));
}

static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
			", interval = 0x%llx, flags = 0x%x }, ",
			(unsigned long long)dr->dt_timer.target,
			(unsigned long long)dr->dt_timer.deadline,
			(unsigned long long)dr->dt_timer.interval, dr->du_fflags);
}

size_t
_dispatch_source_debug(dispatch_source_t ds, char *buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (dr->du_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
			"filter = %s }", dr,  dr->du_is_direct ? " (direct)" : "",
			dr->du_type->dst_kind);
	return offset;
}