mpsc_queue.h   [plain text]


/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#ifndef _KERN_MPSC_QUEUE_H_
#define _KERN_MPSC_QUEUE_H_

#ifdef XNU_KERNEL_PRIVATE

#include <machine/atomic.h>
#include <kern/macro_help.h>
#include <kern/thread_call.h>

#endif // XNU_KERNEL_PRIVATE

#include <sys/cdefs.h>

__BEGIN_DECLS

/*!
 * @typedef struct mpsc_queue_chain
 *
 * @brief
 * Intrusive linkage embedded in every element that can be put on an MPSC
 * queue.
 *
 * @discussion
 * mpqc_next points at the element that follows this one in the queue; it is
 * NULL for the element that currently terminates the list. The field is
 * _Atomic because producers and the consumer access it concurrently.
 */
struct mpsc_queue_chain {
	struct mpsc_queue_chain *_Atomic mpqc_next;
};
typedef struct mpsc_queue_chain *mpsc_queue_chain_t;

/*!
 * @typedef struct mpsc_queue_head
 *
 * @brief
 * The type for a multi-producer single-consumer queue.
 *
 * @discussion
 * MPSC queues allow for producers to not be affected by other producers or the
 * consumer. Which means in turn that having producers in interrupt context
 * does not require that other producers disable interrupts like a traditional
 * spinlock based approach would require.
 *
 * These queues shine when data is produced from the entire system and is
 * consumed from a single serial context (logging, tracing, ...).
 * mpsc_daemon_queue_t is provided as a fully ready/easy-to-use pre-packaged
 * solution for these common use cases.
 *
 * - mpsc_queue_append() can be used to append a single item
 * - mpsc_queue_append_list() can be used to append a batch of items at once.
 *
 * Functions for the consumer side assume proper serialization that is not
 * provided by the MPSC queue itself. Dequeuing doesn't require preemption
 * to be disabled.
 *
 * <h2>Algorithm</h2>
 *
 * The base of the enqueue algorithm is a single atomic exchange (first half,
 * called __mpsc_queue_append_update_tail) and a list fixup (2nd half, called
 * __mpsc_queue_append_update_prev).
 *
 * Graphically, enqueuing `X` looks like this, with each step being done
 * atomically (for the empty queue case, `tail` points to `head`):
 *
 *     | orig state          | update_tail         | update_prev         |
 *     +---------------------+---------------------+---------------------+
 *     |                     |                     |                     |
 *     | head -> e1 -> e2 -. | head -> e1 -> e2 -. | head -> e1 -> e2 -. |
 *     |                   | |                   | |                   | |
 *     |         ,- ... <--' |         ,- ... <--' |         ,- ... <--' |
 *     |         |           |         |           |         |           |
 *     |         v           |         v           |         v           |
 *     | tail -> eN -> NULL  | tail    eN -> NULL  | tail    eN          |
 *     |                     |   |                 |   |     |           |
 *     |                     |   |                 |   |     v           |
 *     |         X -> NULL   |   `---> X -> NULL   |   '---> X -> NULL   |
 *     |                     |                     |                     |
 *     +---------------------+---------------------+---------------------+
 *
 *
 * There is a small 1-instruction gap of inconsistency which makes the chosen
 * algorithm non-linearizable, and requires enqueuers to disable preemption
 * during the enqueue so as not to starve the consumer forever.
 *
 * As far as memory visibility is concerned, enqueuing performs the tail
 * exchange in update_tail with release ordering, which pairs with the memory
 * fences in mpsc_queue_dequeue_batch().
 *
 * Note: as far as the data structure in memory is concerned, its layout is
 *       equivalent to a BSD <sys/queue.h> STAILQ. However, because of this
 *       inconsistency window and memory ordering concerns, it is incorrect to
 *       use STAILQ macros on an MPSC queue.
 */
struct mpsc_queue_head {
	/* Sentinel element: mpqh_head.mpqc_next is the first queued element. */
	struct mpsc_queue_chain mpqh_head;
	/* Last queued element, or &mpqh_head while the queue is empty. */
	struct mpsc_queue_chain *_Atomic mpqh_tail;
};
typedef struct mpsc_queue_head *mpsc_queue_head_t;

/*!
 * @macro MPSC_QUEUE_INITIALIZER
 *
 * @brief
 * Static initializer producing an empty MPSC queue head.
 *
 * @discussion
 * Only the tail is set explicitly: it is pointed back at the queue's own
 * sentinel head, which is how an empty queue is represented. The sentinel's
 * next pointer is left to the implicit zero-initialization.
 *
 * @param head
 * The name of the variable being initialized.
 */
#define MPSC_QUEUE_INITIALIZER(head) \
	{ \
	        .mpqh_tail = &(head).mpqh_head, \
	}

#ifdef XNU_KERNEL_PRIVATE

/*!
 * @function mpsc_queue_init
 *
 * @brief
 * Runtime counterpart of MPSC_QUEUE_INITIALIZER: sets `q` up as an empty
 * queue.
 *
 * @discussion
 * The sentinel head is given no successor and the tail is pointed back at the
 * sentinel.
 *
 * The caller must make sure the object holding the queue head is fully
 * initialized before it becomes visible to other threads/cores.
 *
 * @param q
 * The queue to initialize.
 */
static inline void
mpsc_queue_init(mpsc_queue_head_t q)
{
	mpsc_queue_chain_t sentinel = &q->mpqh_head;

	os_atomic_init(&sentinel->mpqc_next, NULL);
	os_atomic_init(&q->mpqh_tail, sentinel);
}

/*!
 * @typedef enum mpsc_queue_options
 *
 * @brief
 * Flags accepted by MPSC enqueue operations such as mpsc_daemon_enqueue().
 *
 * @const MPSC_QUEUE_NONE
 * No special behavior requested.
 *
 * @const MPSC_QUEUE_DISABLE_PREEMPTION
 * Ask the enqueue to make sure preemption is disabled for its duration.
 */
typedef enum mpsc_queue_options {
	MPSC_QUEUE_NONE                = 0x0,
	MPSC_QUEUE_DISABLE_PREEMPTION  = 0x1,
} mpsc_queue_options_t;

/*!
 * @const MPSC_QUEUE_NOTQUEUED_MARKER
 *
 * @brief
 * Poison value (~0ul cast to a chain pointer) that implementations may store
 * in an element's linkage while that element is not on any MPSC queue.
 *
 * @discussion
 * The value is distinct from NULL, which terminates a queued list, and is
 * not expected to be a valid element address.
 */
#define MPSC_QUEUE_NOTQUEUED_MARKER \
	((mpsc_queue_chain_t)~0UL)

/*!
 * @macro mpsc_queue_element
 *
 * @brief
 * Recovers a pointer to the enclosing element from a pointer to its embedded
 * MPSC chain linkage.
 *
 * @param ptr
 * Pointer to the linkage member.
 *
 * @param type
 * The type of the enclosing element.
 *
 * @param field
 * The name of the linkage member inside `type`.
 */
#define mpsc_queue_element(ptr, type, field) \
	__container_of(ptr, type, field)


#pragma mark Advanced Multi Producer calls

/**
 * @function __mpsc_queue_append_update_tail
 *
 * @brief
 * First half of an enqueue onto a multi-producer single-consumer queue:
 * publishes `elm` as the new tail.
 *
 * @discussion
 * The element's own linkage is cleared first. The queue tail is then swapped
 * to `elm` with release ordering. After this call `elm` is the tail but is
 * not yet reachable from its predecessor; __mpsc_queue_append_update_prev()
 * closes that gap. Splitting the enqueue lets algorithms do things (such as
 * taking a refcount) between the two halves.
 *
 * Preemption should be disabled from before this call until
 * __mpsc_queue_append_update_prev() has returned.
 *
 * @param q
 * The queue to update.
 *
 * @param elm
 * The element to append to `q`.
 *
 * @returns
 * The previous tail: a token to later pass to
 * __mpsc_queue_append_update_prev() to complete the enqueue.
 */
static inline mpsc_queue_chain_t
__mpsc_queue_append_update_tail(mpsc_queue_head_t q, mpsc_queue_chain_t elm)
{
	mpsc_queue_chain_t old_tail;

	/* The new element terminates the list until something is appended after it. */
	os_atomic_store(&elm->mpqc_next, NULL, relaxed);
	old_tail = os_atomic_xchg(&q->mpqh_tail, elm, release);

	return old_tail;
}

/**
 * @function __mpsc_queue_append_was_empty
 *
 * @brief
 * Reports whether `q` held no elements at the moment
 * __mpsc_queue_append_update_tail() swapped its tail.
 *
 * @discussion
 * An empty queue's tail is its own sentinel head, so the check is whether the
 * previous tail returned by update_tail is that sentinel.
 *
 * @param q
 * The queue the enqueue was performed on.
 *
 * @param prev
 * The token returned by __mpsc_queue_append_update_tail().
 *
 * @returns
 * true if the queue was empty, false otherwise.
 */
static inline bool
__mpsc_queue_append_was_empty(mpsc_queue_head_t q, mpsc_queue_chain_t prev)
{
	mpsc_queue_chain_t sentinel = &q->mpqh_head;

	return prev == sentinel;
}

/**
 * @function __mpsc_queue_append_update_prev
 *
 * @brief
 * Second half of the enqueue operation onto a multi-producer single-consumer
 * queue.
 *
 * @discussion
 * This links `prev` (the previous tail) to `elm`, making `elm` reachable
 * from the queue head. Until this is done, `elm` is already the queue tail
 * but cannot be reached from its predecessor; this is the inconsistency
 * window of the enqueue.
 *
 * Keeping the two halves separate lets algorithms do things (such as taking a
 * refcount) after __mpsc_queue_append_update_tail() and before calling this
 * function.
 *
 * Preemption should be disabled before calling
 * __mpsc_queue_append_update_tail(), and until
 * __mpsc_queue_append_update_prev() has returned.
 *
 * @param prev
 * The token returned by __mpsc_queue_append_update_tail().
 *
 * @param elm
 * The element to append to the queue; for a list append, the first element
 * of the list.
 */
static inline void
__mpsc_queue_append_update_prev(mpsc_queue_chain_t prev, mpsc_queue_chain_t elm)
{
	/* Close the inconsistency window: predecessor -> elm. */
	os_atomic_store(&prev->mpqc_next, elm, relaxed);
}


#pragma mark Multi Producer calls

/**
 * @function mpsc_queue_append_list
 *
 * @brief
 * Enqueues a list of elements onto a queue.
 *
 * @discussion
 * This enqueues a list that has to be fully formed from `first` to `last`
 * at the end of `q`. The linkage of `last` is reset to NULL as part of the
 * enqueue, so the caller does not need to terminate the list.
 *
 * Preemption should be disabled when calling mpsc_queue_append_list().
 *
 * @param q
 * The queue to update.
 *
 * @param first
 * The first of the list elements being appended.
 *
 * @param last
 * The last of the list elements being appended (may equal `first`).
 *
 * @returns
 * true if `q` was empty right before the list was appended, false otherwise.
 */
static inline bool
mpsc_queue_append_list(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	/* Publish `last` as the new tail first, then link the old tail to `first`. */
	mpsc_queue_chain_t prev = __mpsc_queue_append_update_tail(q, last);
	__mpsc_queue_append_update_prev(prev, first);
	return __mpsc_queue_append_was_empty(q, prev);
}

/**
 * @function mpsc_queue_append
 *
 * @brief
 * Enqueues an element onto a queue.
 *
 * @discussion
 * This is mpsc_queue_append_list() applied to a single-element list.
 *
 * Preemption should be disabled when calling mpsc_queue_append().
 *
 * @param q
 * The queue to update.
 *
 * @param elm
 * The element to append.
 *
 * @returns
 * true if `q` was empty right before `elm` was appended, false otherwise.
 */
static inline bool
mpsc_queue_append(mpsc_queue_head_t q, mpsc_queue_chain_t elm)
{
	return mpsc_queue_append_list(q, elm, elm);
}


#pragma mark Single Consumer calls

/**
 * @function mpsc_queue_dequeue_batch
 *
 * @brief
 * Atomically empty a queue at once and return the batch head and tail.
 *
 * @discussion
 * Consumer function, must be called in a serialized way with respect to any
 * other consumer function.
 *
 * @param q
 * The queue.
 *
 * @param tail
 * An out pointer filled with the last element captured.
 *
 * @param dependency
 * A dependency token (to rely on consume / hardware dependencies).
 * When not trying to take advantage of hardware dependencies, just pass NULL.
 *
 * @returns
 * The first element of the batch if any, or NULL if the queue was empty.
 */
mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail,
    os_atomic_dependency_t dependency);

/**
 * @function mpsc_queue_batch_next
 *
 * @brief
 * Steps to the element that follows `cur` in a batch obtained from
 * mpsc_queue_dequeue_batch().
 *
 * @discussion
 * After a batch has been dequeued, walking it no longer requires holding the
 * consumer lock.
 *
 * When the whole batch is consumed, mpsc_queue_batch_foreach_safe() is the
 * preferred interface.
 *
 * @param cur
 * The element being inspected: either the batch head or a value previously
 * returned by mpsc_queue_batch_next().
 *
 * @param tail
 * The last element of the batch.
 *
 * @returns
 * The element after `cur`, if there is one.
 */
mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail);

/**
 * @macro mpsc_queue_batch_foreach_safe
 *
 * @brief
 * Iterates over a batch dequeued with mpsc_queue_dequeue_batch().
 *
 * @discussion
 * The successor of each element is fetched with mpsc_queue_batch_next()
 * before the loop body runs, so the body may repurpose or free the element
 * it is visiting. The loop ends as soon as `item` is assigned NULL.
 *
 * @param item
 * An lvalue assignable from mpsc_queue_chain_t that receives the element
 * currently being visited.
 *
 * @param head
 * The first element of the batch.
 *
 * @param tail
 * The last element of the batch.
 */
#define mpsc_queue_batch_foreach_safe(item, head, tail) \
	        for (mpsc_queue_chain_t __mpsc_next, __mpsc_cur = (head), \
	            __mpsc_last = (tail); \
	            __mpsc_next = mpsc_queue_batch_next(__mpsc_cur, __mpsc_last), \
	            (item) = __mpsc_cur; \
	            __mpsc_cur = __mpsc_next)

/**
 * @function mpsc_queue_restore_batch
 *
 * @brief
 * Puts a batch of elements back at the head of the queue.
 *
 * @discussion
 * This is a consumer function: calls to it must be serialized with respect
 * to every other consumer function.
 *
 * @param q
 * The queue.
 *
 * @param first
 * The first element to put back.
 *
 * @param last
 * The last element to put back. The caller is responsible for having linked
 * the elements from `first` to `last` before calling this function.
 */
void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last);


#pragma mark "GCD"-like facilities

/*!
 * @typedef struct mpsc_daemon_queue
 *
 * @brief
 * Daemon queues are a ready-to-use packaging of the low level MPSC queue
 * primitive.
 *
 * @discussion
 * Using a raw MPSC queue (mpsc_queue_head_t) requires handling the state
 * transitions of the queue and the dequeuing yourself, which is a non-trivial
 * task.
 *
 * Daemon queues are a simple packaged solution that allows MPSC queues to
 * form hierarchies (mostly for layering purposes), and to be serviced at the
 * bottom of such a hierarchy by a thread or a thread call.
 *
 * Daemon queues assume homogeneous items, and are set up with an `invoke`
 * callback that is called in the dequeuer on every item as it is dequeued.
 */
typedef struct mpsc_daemon_queue *mpsc_daemon_queue_t;

/*!
 * @typedef mpsc_daemon_invoke_fn_t
 *
 * @brief
 * The type for MPSC daemon queue invoke callbacks.
 *
 * @param elm
 * The linkage of the item being dequeued; mpsc_queue_element() recovers the
 * enclosing item.
 *
 * @param dq
 * The daemon queue being drained (presumably the queue `elm` was enqueued
 * onto; TODO confirm against the implementation).
 */
typedef void (*mpsc_daemon_invoke_fn_t)(mpsc_queue_chain_t elm,
    mpsc_daemon_queue_t dq);

/*!
 * @enum mpsc_daemon_queue_kind
 *
 * @brief
 * Internal type, not to be used by clients.
 *
 * @discussion
 * Stored in the mpd_kind field of a daemon queue.
 */
typedef enum mpsc_daemon_queue_kind {
	MPSC_QUEUE_KIND_UNKNOWN         = 0,
	MPSC_QUEUE_KIND_NESTED          = 1,
	MPSC_QUEUE_KIND_THREAD          = 2,
	MPSC_QUEUE_KIND_THREAD_CRITICAL = 3,
	MPSC_QUEUE_KIND_THREAD_CALL     = 4,
} mpsc_daemon_queue_kind_t;

/*!
 * @enum mpsc_daemon_queue_state
 *
 * @brief
 * Internal type, not to be used by clients.
 *
 * @discussion
 * Bit flags kept in the atomic mpd_state field of a daemon queue.
 */
__options_decl(mpsc_daemon_queue_state_t, uint32_t, {
	MPSC_QUEUE_STATE_DRAINING = 1 << 0,
	MPSC_QUEUE_STATE_WAKEUP   = 1 << 1,
	MPSC_QUEUE_STATE_CANCELED = 1 << 2,
});

/*!
 * @struct mpsc_daemon_queue
 *
 * @brief
 * Storage behind mpsc_daemon_queue_t.
 *
 * @discussion
 * mpd_kind and mpd_state use types documented as internal; clients are
 * expected to set a daemon queue up through the mpsc_daemon_queue_init_*
 * functions.
 */
struct mpsc_daemon_queue {
	/* how this queue is serviced (MPSC_QUEUE_KIND_*) */
	mpsc_daemon_queue_kind_t    mpd_kind;
	/* MPSC_QUEUE_STATE_* flags */
	mpsc_daemon_queue_state_t _Atomic mpd_state;
	/* callback run on every dequeued item */
	mpsc_daemon_invoke_fn_t     mpd_invoke;
	/* servicing entity; presumably the member used depends on mpd_kind (TODO confirm) */
	union {
		mpsc_daemon_queue_t     mpd_target;
		struct thread          *mpd_thread;
		struct thread_call     *mpd_call;
	};
	/* the pending items */
	struct mpsc_queue_head      mpd_queue;
	/* linkage of this queue itself; presumably used to enqueue it on its target */
	struct mpsc_queue_chain     mpd_chain;
};

/*!
 * @function mpsc_daemon_queue_init_with_thread
 *
 * @brief
 * Sets up `dq` as a base queue that is drained by a dedicated kernel thread.
 *
 * @discussion
 * The thread is allocated by this function and started in assert_wait.
 *
 * @param dq
 * The queue to initialize.
 *
 * @param invoke
 * The callback run on each individual item of the queue during drain.
 *
 * @param pri
 * The scheduler priority of the created thread.
 *
 * @param name
 * The name given to the created thread.
 *
 * @returns
 * A kern_return_t reporting whether the thread could be created.
 */
kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name);


/*!
 * @function mpsc_daemon_queue_init_with_thread_call
 *
 * @brief
 * Sets up `dq` as a base queue that is drained by a thread call.
 *
 * @param dq
 * The queue to initialize.
 *
 * @param invoke
 * The callback run on each individual item of the queue during drain.
 *
 * @param pri
 * The priority at which the thread call runs.
 */
void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri);

/*!
 * @function mpsc_daemon_queue_init_with_target
 *
 * @brief
 * Sets up a daemon queue to target another daemon queue.
 *
 * @discussion
 * The targeting relationship is useful for subsystem layering purposes only.
 * Because draining a given queue is atomic with respect to its target, target
 * queue hierarchies are prone to starvation.
 *
 * @param dq
 * The queue to initialize.
 *
 * @param invoke
 * The invoke function called on individual items on the queue during drain.
 *
 * @param target
 * The target queue of the initialized queue, which has to be initialized with
 * the mpsc_daemon_queue_nested_invoke invoke handler.
 */
void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target);

/*!
 * @function mpsc_daemon_queue_nested_invoke
 *
 * @brief
 * Invoke handler to give to mpsc_daemon_queue_init_* for a queue that other
 * queues will target.
 *
 * @discussion
 * mpsc_daemon_queue_init_with_target() requires its `target` to have been
 * initialized with this handler.
 *
 * @param elm
 * The dequeued element.
 *
 * @param dq
 * The daemon queue being drained.
 */
void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    mpsc_daemon_queue_t dq);

/*!
 * @function mpsc_daemon_queue_cancel_and_wait
 *
 * @brief
 * Cancels `dq` so that the object that owns it can be destroyed.
 *
 * @discussion
 * The queue is canceled and the call then blocks until the cancelation has
 * taken effect, which may include waiting for elements that are being
 * drained.
 *
 * Enqueuing onto the daemon queue after cancelation is undefined, and so is
 * calling this function more than once.
 *
 * Adopters are responsible for tearing down daemon queue hierarchies.
 *
 * @param dq
 * The daemon queue to cancel.
 */
void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq);

/*!
 * @function mpsc_daemon_enqueue
 *
 * @brief
 * Send ("async") an item to a given daemon on a given queue.
 *
 * @discussion
 * Unless MPSC_QUEUE_DISABLE_PREEMPTION is passed in `options`, it is the
 * responsibility of the caller to ensure preemption is disabled when this
 * call is made.
 *
 * @param dq
 * The daemon queue to enqueue the element onto.
 *
 * @param elm
 * The item to enqueue.
 *
 * @param options
 * Options applicable to the enqueue. In particular, passing
 * MPSC_QUEUE_DISABLE_PREEMPTION makes sure preemption is properly disabled
 * during the enqueue.
 */
void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options);


#pragma mark Deferred deallocation daemon

/*!
 * @function thread_deallocate_daemon_init
 *
 * @brief
 * Brings up the deferred deallocation daemon; called by thread_daemon_init().
 *
 * @discussion
 * The deferred deallocation daemon is a daemon queue serviced by a kernel
 * thread, and nested daemon queues target it.
 *
 * It exists to deallocate objects whose deallocation cannot safely happen in
 * the context where it would normally occur. Turnstiles, workqueues and
 * threads are examples of subsystems that use it.
 *
 * @warning
 * Be careful when adding new queues to this daemon: abusing it can result in
 * an unbounded amount of kernel work.
 */
void
thread_deallocate_daemon_init(void);

/*!
 * @function thread_deallocate_daemon_register_queue
 *
 * @brief
 * Registers a queue for deferred deletion with the deferred deallocation
 * daemon at runtime.
 *
 * @param dq
 * The daemon queue to attach to the deferred deallocation daemon.
 *
 * @param invoke
 * The callback the deallocation daemon runs on every element of `dq`.
 */
void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke);


#pragma mark tests
#if DEBUG || DEVELOPMENT

/*
 * Self-test entry point, only declared on DEBUG or DEVELOPMENT builds.
 * NOTE(review): presumably runs `count` ping-pong iterations between MPSC
 * queues and reports a measurement through `out`; confirm against the
 * implementation before relying on this description.
 */
int
mpsc_test_pingpong(uint64_t count, uint64_t *out);

#endif /* DEBUG || DEVELOPMENT */

#endif /* XNU_KERNEL_PRIVATE */

__END_DECLS

#endif /* _KERN_MPSC_QUEUE_H_ */