#include "internal.h"
#include <os/assumes.h>
#include <mach/mach_port.h>
OS_OBJECT_CLASS_DECL(os_workgroup);
#if !USE_OBJC
OS_OBJECT_VTABLE_INSTANCE(os_workgroup,
(void (*)(_os_object_t))_os_workgroup_xref_dispose,
(void (*)(_os_object_t))_os_workgroup_dispose);
#endif // USE_OBJC
#define WORKGROUP_CLASS OS_OBJECT_VTABLE(os_workgroup)
OS_OBJECT_CLASS_DECL(os_workgroup_interval);
#if !USE_OBJC
OS_OBJECT_VTABLE_INSTANCE(os_workgroup_interval,
(void (*)(_os_object_t))_os_workgroup_interval_xref_dispose,
(void (*)(_os_object_t))_os_workgroup_interval_dispose);
#endif // USE_OBJC
#define WORKGROUP_INTERVAL_CLASS OS_OBJECT_VTABLE(os_workgroup_interval)
OS_OBJECT_CLASS_DECL(os_workgroup_parallel);
#if !USE_OBJC
OS_OBJECT_VTABLE_INSTANCE(os_workgroup_parallel,
(void (*)(_os_object_t))_os_workgroup_xref_dispose,
(void (*)(_os_object_t))_os_workgroup_dispose);
#endif // USE_OBJC
#define WORKGROUP_PARALLEL_CLASS OS_OBJECT_VTABLE(os_workgroup_parallel)
#pragma mark Internal functions
static const struct os_workgroup_attr_s _os_workgroup_attr_default = {
.sig = _OS_WORKGROUP_ATTR_RESOLVED_INIT,
.wg_type = OS_WORKGROUP_TYPE_DEFAULT,
.wg_attr_flags = 0,
};
static const struct os_workgroup_attr_s _os_workgroup_interval_attr_default = {
.sig = _OS_WORKGROUP_ATTR_RESOLVED_INIT,
.wg_type = OS_WORKGROUP_INTERVAL_TYPE_DEFAULT,
.wg_attr_flags = OS_WORKGROUP_ATTR_NONPROPAGATING
};
static const struct os_workgroup_attr_s _os_workgroup_parallel_attr_default = {
.sig = _OS_WORKGROUP_ATTR_RESOLVED_INIT,
.wg_type = OS_WORKGROUP_TYPE_PARALLEL,
.wg_attr_flags = OS_WORKGROUP_ATTR_NONPROPAGATING |
OS_WORKGROUP_ATTR_UNDIFFERENTIATED,
};
void
_os_workgroup_xref_dispose(os_workgroup_t wg)
{
os_workgroup_arena_t arena = wg->wg_arena;
if (arena == NULL) {
return;
}
arena->destructor(arena->client_arena);
free(arena);
}
void
_os_workgroup_interval_xref_dispose(os_workgroup_interval_t wgi)
{
uint64_t wg_state = wgi->wg_state;
if (wg_state & OS_WORKGROUP_INTERVAL_STARTED) {
os_crash("BUG IN CLIENT: Releasing last reference to workgroup interval "
"while an interval has been started");
}
}
static inline bool
_os_workgroup_is_configurable(uint64_t wg_state)
{
return (wg_state & OS_WORKGROUP_OWNER) == OS_WORKGROUP_OWNER;
}
void
_os_workgroup_dispose(os_workgroup_t wg)
{
dispatch_assert(wg->joined_cnt == 0);
kern_return_t kr;
uint64_t wg_state = os_atomic_load(&wg->wg_state, relaxed);
if (_os_workgroup_is_configurable(wg_state)) {
kr = work_interval_destroy(wg->wi);
} else {
kr = mach_port_mod_refs(mach_task_self(), wg->port, MACH_PORT_RIGHT_SEND, -1);
}
os_assumes(kr == KERN_SUCCESS);
if (wg_state & OS_WORKGROUP_LABEL_NEEDS_FREE) {
free((void *)wg->name);
}
}
void
_os_workgroup_debug(os_workgroup_t wg, char *buf, size_t size)
{
snprintf(buf, size, "wg[%p] = {xref = %d, ref = %d, name = %s}",
(void *) wg, wg->do_xref_cnt + 1, wg->do_ref_cnt + 1, wg->name);
}
void
_os_workgroup_interval_dispose(os_workgroup_interval_t wgi)
{
work_interval_instance_free(wgi->wii);
}
#define os_workgroup_inc_refcount(wg) \
_os_object_retain_internal(wg->_as_os_obj);
#define os_workgroup_dec_refcount(wg) \
_os_object_release_internal(wg->_as_os_obj);
void
_os_workgroup_tsd_cleanup(void *ctxt)
{
os_workgroup_t wg = (os_workgroup_t) ctxt;
if (wg != NULL) {
char buf[512];
snprintf(buf, sizeof(buf), "BUG IN CLIENT: Thread exiting without leaving workgroup '%s'", wg->name);
os_crash(buf);
}
}
static os_workgroup_t
_os_workgroup_get_current(void)
{
return (os_workgroup_t) pthread_getspecific(_os_workgroup_key);
}
static void
_os_workgroup_set_current(os_workgroup_t new_wg)
{
if (new_wg != NULL) {
os_workgroup_inc_refcount(new_wg);
}
os_workgroup_t old_wg = _os_workgroup_get_current();
pthread_setspecific(_os_workgroup_key, new_wg);
if (old_wg != NULL) {
os_workgroup_dec_refcount(old_wg);
}
}
static inline bool
_os_workgroup_attr_is_resolved(os_workgroup_attr_t attr)
{
return (attr->sig == _OS_WORKGROUP_ATTR_RESOLVED_INIT);
}
static inline bool
_os_workgroup_client_attr_initialized(os_workgroup_attr_t attr)
{
return (attr->sig == _OS_WORKGROUP_ATTR_SIG_DEFAULT_INIT) ||
(attr->sig == _OS_WORKGROUP_ATTR_SIG_EMPTY_INIT);
}
static inline bool
_os_workgroup_attr_is_propagating(os_workgroup_attr_t attr)
{
return (attr->wg_attr_flags & OS_WORKGROUP_ATTR_NONPROPAGATING) == 0;
}
static inline bool
_os_workgroup_attr_is_differentiated(os_workgroup_attr_t attr)
{
return (attr->wg_attr_flags & OS_WORKGROUP_ATTR_UNDIFFERENTIATED) == 0;
}
static inline bool
_os_workgroup_type_is_interval_type(os_workgroup_type_t wg_type)
{
return (wg_type >= OS_WORKGROUP_INTERVAL_TYPE_DEFAULT) &&
(wg_type <= OS_WORKGROUP_INTERVAL_TYPE_COREMEDIA);
}
static bool
_os_workgroup_type_is_audio_type(os_workgroup_type_t wg_type)
{
return (wg_type == OS_WORKGROUP_INTERVAL_TYPE_COREAUDIO) ||
(wg_type == OS_WORKGROUP_INTERVAL_TYPE_AUDIO_CLIENT);
}
static inline bool
_os_workgroup_type_is_parallel_type(os_workgroup_type_t wg_type)
{
return wg_type == OS_WORKGROUP_TYPE_PARALLEL;
}
static inline bool
_os_workgroup_type_is_default_type(os_workgroup_type_t wg_type)
{
return wg_type == OS_WORKGROUP_TYPE_DEFAULT;
}
static inline bool
_os_workgroup_has_backing_workinterval(os_workgroup_t wg)
{
return wg->wi != NULL;
}
#if !TARGET_OS_SIMULATOR
static os_workgroup_type_t
_wi_flags_to_wg_type(uint32_t wi_flags)
{
uint32_t type = wi_flags & WORK_INTERVAL_TYPE_MASK;
bool is_unrestricted = (wi_flags & WORK_INTERVAL_FLAG_UNRESTRICTED);
switch (type) {
case WORK_INTERVAL_TYPE_DEFAULT:
return OS_WORKGROUP_TYPE_DEFAULT;
case WORK_INTERVAL_TYPE_COREAUDIO:
return (is_unrestricted ? OS_WORKGROUP_INTERVAL_TYPE_AUDIO_CLIENT :
OS_WORKGROUP_INTERVAL_TYPE_COREAUDIO);
case WORK_INTERVAL_TYPE_COREANIMATION:
return OS_WORKGROUP_INTERVAL_TYPE_COREANIMATION;
case WORK_INTERVAL_TYPE_HID_DELIVERY:
return OS_WORKGROUP_INTERVAL_TYPE_HID_DELIVERY;
case WORK_INTERVAL_TYPE_COREMEDIA:
return OS_WORKGROUP_INTERVAL_TYPE_COREMEDIA;
case WORK_INTERVAL_TYPE_CA_CLIENT:
return OS_WORKGROUP_INTERVAL_TYPE_CA_CLIENT;
default:
{
char buf[512];
snprintf(buf, sizeof(buf), "BUG IN DISPATCH: Invalid wi flags = %u", wi_flags);
os_crash(buf);
}
}
}
#endif
static work_interval_t
_os_workgroup_create_work_interval(os_workgroup_attr_t attr)
{
uint32_t flags = WORK_INTERVAL_FLAG_JOINABLE;
switch (attr->wg_type) {
case OS_WORKGROUP_INTERVAL_TYPE_DEFAULT:
flags |= WORK_INTERVAL_TYPE_DEFAULT | WORK_INTERVAL_FLAG_UNRESTRICTED;
break;
case OS_WORKGROUP_INTERVAL_TYPE_COREAUDIO:
flags |= (WORK_INTERVAL_TYPE_COREAUDIO |
WORK_INTERVAL_FLAG_ENABLE_AUTO_JOIN |
WORK_INTERVAL_FLAG_ENABLE_DEFERRED_FINISH);
break;
case OS_WORKGROUP_INTERVAL_TYPE_COREANIMATION:
flags |= WORK_INTERVAL_TYPE_COREANIMATION;
break;
case OS_WORKGROUP_INTERVAL_TYPE_CA_RENDER_SERVER:
flags |= WORK_INTERVAL_TYPE_CA_RENDER_SERVER;
break;
case OS_WORKGROUP_INTERVAL_TYPE_HID_DELIVERY:
flags |= WORK_INTERVAL_TYPE_HID_DELIVERY;
break;
case OS_WORKGROUP_INTERVAL_TYPE_COREMEDIA:
flags |= WORK_INTERVAL_TYPE_COREMEDIA;
break;
case OS_WORKGROUP_INTERVAL_TYPE_AUDIO_CLIENT:
flags |= (WORK_INTERVAL_TYPE_COREAUDIO | WORK_INTERVAL_FLAG_UNRESTRICTED |
WORK_INTERVAL_FLAG_ENABLE_AUTO_JOIN |
WORK_INTERVAL_FLAG_ENABLE_DEFERRED_FINISH);
break;
case OS_WORKGROUP_INTERVAL_TYPE_CA_CLIENT:
flags |= WORK_INTERVAL_TYPE_CA_CLIENT | WORK_INTERVAL_FLAG_UNRESTRICTED;
break;
case OS_WORKGROUP_TYPE_DEFAULT:
flags |= WORK_INTERVAL_FLAG_UNRESTRICTED;
break;
default:
os_crash("Creating an os_workgroup of unknown type");
}
if (_os_workgroup_attr_is_differentiated(attr)) {
flags |= WORK_INTERVAL_FLAG_GROUP;
}
work_interval_t wi;
int rv = work_interval_create(&wi, flags);
if (rv) {
errno = rv;
return NULL;
}
return wi;
}
static inline bool
_os_workgroup_join_token_initialized(os_workgroup_join_token_t token)
{
return (token->sig == _OS_WORKGROUP_JOIN_TOKEN_SIG_INIT);
}
static inline void
_os_workgroup_set_name(os_workgroup_t wg, const char *name)
{
if (name) {
const char *tmp = _dispatch_strdup_if_mutable(name);
if (tmp != name) {
wg->wg_state |= OS_WORKGROUP_LABEL_NEEDS_FREE;
name = tmp;
}
}
wg->name = name;
}
static inline bool
_os_workgroup_client_attr_is_valid(os_workgroup_attr_t attr)
{
return (attr && _os_workgroup_client_attr_initialized(attr));
}
static inline bool
_start_time_is_in_past(os_clockid_t clock, uint64_t start)
{
switch (clock) {
case OS_CLOCK_MACH_ABSOLUTE_TIME:
return start <= mach_absolute_time();
}
}
#pragma mark Private functions
int
os_workgroup_attr_set_interval_type(os_workgroup_attr_t attr,
os_workgroup_interval_type_t interval_type)
{
int ret = 0;
if (_os_workgroup_client_attr_is_valid(attr) &&
_os_workgroup_type_is_interval_type(interval_type)) {
attr->wg_type = interval_type;
} else {
ret = EINVAL;
}
return ret;
}
int
os_workgroup_attr_set_flags(os_workgroup_attr_t attr,
os_workgroup_attr_flags_t flags)
{
int ret = 0;
if (_os_workgroup_client_attr_is_valid(attr)) {
attr->wg_attr_flags = flags;
} else {
ret = EINVAL;
}
return ret;
}
os_workgroup_t
os_workgroup_interval_copy_current_4AudioToolbox(void)
{
os_workgroup_t wg = _os_workgroup_get_current();
if (wg) {
if (_os_workgroup_type_is_audio_type(wg->wg_type)) {
wg = os_retain(wg);
} else {
wg = NULL;
}
}
return wg;
}
#pragma mark Public functions
os_workgroup_t
os_workgroup_create(const char *name, os_workgroup_attr_t attr)
{
os_workgroup_t wg = NULL;
work_interval_t wi = NULL;
os_workgroup_attr_s wga;
if (attr == NULL) {
wga = _os_workgroup_attr_default;
attr = &wga;
} else {
if (!_os_workgroup_client_attr_is_valid(attr)) {
errno = EINVAL;
return NULL;
}
wga = *attr;
attr = &wga;
switch (attr->sig) {
case _OS_WORKGROUP_ATTR_SIG_DEFAULT_INIT:
{
if (attr->wg_attr_flags == 0) {
attr->wg_attr_flags = _os_workgroup_attr_default.wg_attr_flags;
}
if (attr->wg_type == 0) {
attr->wg_type = _os_workgroup_attr_default.wg_type;
}
}
case _OS_WORKGROUP_ATTR_SIG_EMPTY_INIT:
break;
default:
errno = EINVAL;
return NULL;
}
attr->sig = _OS_WORKGROUP_ATTR_RESOLVED_INIT;
}
os_assert(_os_workgroup_attr_is_resolved(attr));
if (!_os_workgroup_type_is_default_type(attr->wg_type)){
errno = EINVAL;
return NULL;
}
if (_os_workgroup_attr_is_propagating(attr)) {
errno = ENOTSUP;
return NULL;
}
wi = _os_workgroup_create_work_interval(attr);
if (wi == NULL) {
return NULL;
}
wg = (os_workgroup_t) _os_object_alloc(WORKGROUP_CLASS,
sizeof(struct os_workgroup_s));
wg->wi = wi;
wg->wg_state = OS_WORKGROUP_OWNER;
wg->wg_type = attr->wg_type;
_os_workgroup_set_name(wg, name);
return wg;
}
os_workgroup_interval_t
os_workgroup_interval_create(const char *name, os_clockid_t clock,
os_workgroup_attr_t attr)
{
os_workgroup_interval_t wgi = NULL;
work_interval_t wi = NULL;
os_workgroup_attr_s wga;
if (attr == NULL) {
wga = _os_workgroup_interval_attr_default;
attr = &wga;
} else {
if (!_os_workgroup_client_attr_is_valid(attr)) {
errno = EINVAL;
return NULL;
}
wga = *attr;
attr = &wga;
if (attr->sig == _OS_WORKGROUP_ATTR_SIG_EMPTY_INIT) {
} else if (attr->sig == _OS_WORKGROUP_ATTR_SIG_DEFAULT_INIT) {
if (attr->wg_attr_flags == 0) {
attr->wg_attr_flags = _os_workgroup_interval_attr_default.wg_attr_flags;
}
if (attr->wg_type == 0) {
attr->wg_type = _os_workgroup_interval_attr_default.wg_type;
}
} else {
errno = EINVAL;
return NULL;
}
attr->sig = _OS_WORKGROUP_ATTR_RESOLVED_INIT;
}
os_assert(_os_workgroup_attr_is_resolved(attr));
if (!_os_workgroup_type_is_interval_type(attr->wg_type) ||
!_os_workgroup_attr_is_differentiated(attr)){
errno = EINVAL;
return NULL;
}
if (_os_workgroup_attr_is_propagating(attr)) {
errno = ENOTSUP;
return NULL;
}
wi = _os_workgroup_create_work_interval(attr);
if (wi == NULL) {
return NULL;
}
wgi = (os_workgroup_interval_t) _os_object_alloc(WORKGROUP_INTERVAL_CLASS,
sizeof(struct os_workgroup_interval_s));
wgi->wi = wi;
wgi->clock = clock;
wgi->wii = work_interval_instance_alloc(wi);
wgi->wii_lock = OS_UNFAIR_LOCK_INIT;
wgi->wg_type = attr->wg_type;
wgi->wg_state = OS_WORKGROUP_OWNER;
_os_workgroup_set_name(wgi->_as_wg, name);
return wgi;
}
int
os_workgroup_join_self(os_workgroup_t wg, os_workgroup_join_token_t token,
os_workgroup_index * __unused id_out)
{
return os_workgroup_join(wg, token);
}
void
os_workgroup_leave_self(os_workgroup_t wg, os_workgroup_join_token_t token)
{
return os_workgroup_leave(wg, token);
}
#pragma mark Public functions
os_workgroup_parallel_t
os_workgroup_parallel_create(const char *name, os_workgroup_attr_t attr)
{
os_workgroup_parallel_t wgp = NULL;
os_workgroup_attr_s wga;
if (attr == NULL) {
wga = _os_workgroup_parallel_attr_default;
attr = &wga;
} else {
if (!_os_workgroup_client_attr_is_valid(attr)) {
errno = EINVAL;
return NULL;
}
wga = *attr;
attr = &wga;
switch (attr->sig) {
case _OS_WORKGROUP_ATTR_SIG_DEFAULT_INIT:
{
if (attr->wg_attr_flags == 0) {
attr->wg_attr_flags = _os_workgroup_parallel_attr_default.wg_attr_flags;
}
if (attr->wg_type == 0) {
attr->wg_type = _os_workgroup_parallel_attr_default.wg_type;
}
}
case _OS_WORKGROUP_ATTR_SIG_EMPTY_INIT:
break;
default:
errno = EINVAL;
return NULL;
}
attr->sig = _OS_WORKGROUP_ATTR_RESOLVED_INIT;
}
os_assert(_os_workgroup_attr_is_resolved(attr));
if (!_os_workgroup_type_is_parallel_type(attr->wg_type)) {
errno = EINVAL;
return NULL;
}
if (_os_workgroup_attr_is_propagating(attr)) {
errno = ENOTSUP;
return NULL;
}
wgp = (os_workgroup_t) _os_object_alloc(WORKGROUP_PARALLEL_CLASS,
sizeof(struct os_workgroup_parallel_s));
wgp->wi = NULL;
wgp->wg_state = OS_WORKGROUP_OWNER;
wgp->wg_type = attr->wg_type;
_os_workgroup_set_name(wgp, name);
return wgp;
}
int
os_workgroup_copy_port(os_workgroup_t wg, mach_port_t *mach_port_out)
{
os_assert(wg != NULL);
os_assert(mach_port_out != NULL);
*mach_port_out = MACH_PORT_NULL;
uint64_t wg_state = os_atomic_load(&wg->wg_state, relaxed);
if (wg_state & OS_WORKGROUP_CANCELED) {
return EINVAL;
}
if (!_os_workgroup_has_backing_workinterval(wg)) {
return EINVAL;
}
if (_os_workgroup_is_configurable(wg_state)) {
return work_interval_copy_port(wg->wi, mach_port_out);
}
kern_return_t kr = mach_port_mod_refs(mach_task_self(), wg->port,
MACH_PORT_RIGHT_SEND, 1);
os_assumes(kr == KERN_SUCCESS);
*mach_port_out = wg->port;
return 0;
}
os_workgroup_t
os_workgroup_create_with_port(const char *name, mach_port_t port)
{
if (!MACH_PORT_VALID(port)) {
return NULL;
}
#if !TARGET_OS_SIMULATOR
uint32_t wi_flags = 0;
int ret = work_interval_get_flags_from_port(port, &wi_flags);
if (ret != 0) {
errno = ret;
return NULL;
}
#endif
os_workgroup_t wg = NULL;
wg = (os_workgroup_t) _os_object_alloc(WORKGROUP_CLASS,
sizeof(struct os_workgroup_s));
_os_workgroup_set_name(wg, name);
kern_return_t kr;
kr = mach_port_mod_refs(mach_task_self(), port, MACH_PORT_RIGHT_SEND, 1);
os_assumes(kr == KERN_SUCCESS);
wg->port = port;
#if !TARGET_OS_SIMULATOR
wg->wg_type = _wi_flags_to_wg_type(wi_flags);
#else
wg->wg_type = OS_WORKGROUP_TYPE_DEFAULT;
#endif
return wg;
}
os_workgroup_t
os_workgroup_create_with_workgroup(const char *name, os_workgroup_t wg)
{
uint64_t wg_state = os_atomic_load(&wg->wg_state, relaxed);
if (wg_state & OS_WORKGROUP_CANCELED) {
errno = EINVAL;
return NULL;
}
os_workgroup_t new_wg = NULL;
new_wg = (os_workgroup_t) _os_object_alloc(WORKGROUP_CLASS,
sizeof(struct os_workgroup_s));
_os_workgroup_set_name(new_wg, name);
new_wg->wg_type = wg->wg_type;
if (_os_workgroup_has_backing_workinterval(wg)) {
kern_return_t kr;
if (_os_workgroup_is_configurable(wg_state)) {
kr = work_interval_copy_port(wg->wi, &new_wg->port);
} else {
kr = mach_port_mod_refs(mach_task_self(), wg->port, MACH_PORT_RIGHT_SEND, 1);
new_wg->port = wg->port;
}
os_assumes(kr == KERN_SUCCESS);
}
return new_wg;
}
int
os_workgroup_max_parallel_threads(os_workgroup_t wg, os_workgroup_mpt_attr_t __unused attr)
{
os_assert(wg != NULL);
qos_class_t qos = QOS_CLASS_USER_INTERACTIVE;
switch (wg->wg_type) {
case OS_WORKGROUP_INTERVAL_TYPE_COREAUDIO:
case OS_WORKGROUP_INTERVAL_TYPE_AUDIO_CLIENT:
return pthread_time_constraint_max_parallelism(0);
default:
return pthread_qos_max_parallelism(qos, 0);
}
}
int
os_workgroup_join(os_workgroup_t wg, os_workgroup_join_token_t token)
{
os_workgroup_t cur_wg = _os_workgroup_get_current();
if (cur_wg) {
return EALREADY;
}
uint64_t wg_state = os_atomic_load(&wg->wg_state, relaxed);
if (wg_state & OS_WORKGROUP_CANCELED) {
return EINVAL;
}
int rv = 0;
if (_os_workgroup_has_backing_workinterval(wg)) {
if (_os_workgroup_is_configurable(wg_state)) {
rv = work_interval_join(wg->wi);
} else {
rv = work_interval_join_port(wg->port);
}
}
if (rv) {
errno = rv;
return rv;
}
os_atomic_inc(&wg->joined_cnt, relaxed);
bzero(token, sizeof(struct os_workgroup_join_token_s));
token->sig = _OS_WORKGROUP_JOIN_TOKEN_SIG_INIT;
token->thread = _dispatch_thread_port();
token->old_wg = cur_wg;
token->new_wg = wg;
_os_workgroup_set_current(wg);
return 0;
}
void
os_workgroup_leave(os_workgroup_t wg, os_workgroup_join_token_t token)
{
if (!_os_workgroup_join_token_initialized(token)) {
os_crash("Join token is corrupt");
}
if (token->thread != _dispatch_thread_port()) {
os_crash("Join token provided is for a different thread");
}
os_workgroup_t cur_wg = _os_workgroup_get_current();
if ((token->new_wg != cur_wg) || (cur_wg != wg)) {
os_crash("Join token provided is for a different workgroup than the "
"last one joined by thread");
}
os_assert(token->old_wg == NULL);
if (_os_workgroup_has_backing_workinterval(wg)) {
dispatch_assume(work_interval_leave() == 0);
}
uint32_t old_joined_cnt = os_atomic_dec_orig(&wg->joined_cnt, relaxed);
if (old_joined_cnt == 0) {
DISPATCH_INTERNAL_CRASH(0, "Joined count underflowed");
}
_os_workgroup_set_current(NULL);
}
int
os_workgroup_set_working_arena(os_workgroup_t wg, void * _Nullable client_arena,
uint32_t max_workers, os_workgroup_working_arena_destructor_t destructor)
{
size_t arena_size;
if (os_mul_and_add_overflow(sizeof(mach_port_t), max_workers, sizeof(struct os_workgroup_arena_s), &arena_size)) {
errno = ENOMEM;
return errno;
}
os_workgroup_arena_t wg_arena = calloc(arena_size, 1);
if (wg_arena == NULL) {
errno = ENOMEM;
return errno;
}
wg_arena->max_workers = max_workers;
wg_arena->client_arena = client_arena;
wg_arena->destructor = destructor;
_os_workgroup_atomic_flags old_state, new_state;
os_workgroup_arena_t old_arena = NULL;
bool success = os_atomic_rmw_loop(&wg->wg_atomic_flags, old_state, new_state, relaxed, {
if (_wg_joined_cnt(old_state) > 0) { os_atomic_rmw_loop_give_up(break);
}
old_arena = _wg_arena(old_state);
new_state = old_state;
new_state &= ~OS_WORKGROUP_ARENA_MASK;
new_state |= (uint64_t) wg_arena;
});
if (!success) {
errno = EBUSY;
free(wg_arena);
return errno;
}
if (old_arena) {
old_arena->destructor(old_arena->client_arena);
free(old_arena);
}
return 0;
}
void *
os_workgroup_get_working_arena(os_workgroup_t wg, os_workgroup_index *_Nullable index_out)
{
if (_os_workgroup_get_current() != wg) {
os_crash("Thread is not a member of the workgroup");
}
dispatch_assert(wg->joined_cnt > 0);
os_workgroup_arena_t arena = os_atomic_load(&wg->wg_arena, relaxed);
if (arena == NULL) {
return NULL;
}
if (index_out != NULL && arena->max_workers == 0) {
os_crash("The arena associated with workgroup is not to be partitioned");
}
if (index_out) {
uint32_t found_index = 0;
bool found = false;
for (uint32_t i = 0; i < arena->max_workers; i++) {
if (arena->arena_indices[i] == _dispatch_thread_port()) {
found_index = i;
found = true;
break;
}
}
if (!found) {
found_index = os_atomic_inc_orig(&arena->next_worker_index, relaxed);
if (found_index >= arena->max_workers) {
os_crash("Exceeded the maximum number of workers who can access the arena");
}
arena->arena_indices[found_index] = _dispatch_thread_port();
}
*index_out = found_index;
}
return arena->client_arena;
}
void
os_workgroup_cancel(os_workgroup_t wg)
{
os_atomic_or(&wg->wg_state, OS_WORKGROUP_CANCELED, relaxed);
}
bool
os_workgroup_testcancel(os_workgroup_t wg)
{
return os_atomic_load(&wg->wg_state, relaxed) & OS_WORKGROUP_CANCELED;
}
int
os_workgroup_interval_start(os_workgroup_interval_t wgi, uint64_t start,
uint64_t deadline, os_workgroup_interval_data_t __unused data)
{
os_workgroup_t cur_wg = _os_workgroup_get_current();
if (cur_wg != wgi->_as_wg) {
os_crash("Thread is not a member of the workgroup");
}
if (deadline < start || (!_start_time_is_in_past(wgi->clock, start))) {
return EINVAL;
}
bool success = os_unfair_lock_trylock(&wgi->wii_lock);
if (!success) {
return EBUSY;
}
int rv = 0;
uint64_t old_state, new_state;
os_atomic_rmw_loop(&wgi->wg_state, old_state, new_state, relaxed, {
if (old_state & (OS_WORKGROUP_CANCELED | OS_WORKGROUP_INTERVAL_STARTED)) {
rv = EINVAL;
os_atomic_rmw_loop_give_up(break);
}
if (!_os_workgroup_is_configurable(old_state)) {
rv = EPERM;
os_atomic_rmw_loop_give_up(break);
}
new_state = old_state | OS_WORKGROUP_INTERVAL_STARTED;
});
if (rv) {
os_unfair_lock_unlock(&wgi->wii_lock);
return rv;
}
work_interval_instance_t wii = wgi->wii;
work_interval_instance_clear(wii);
work_interval_instance_set_start(wii, start);
work_interval_instance_set_deadline(wii, deadline);
rv = work_interval_instance_start(wii);
if (rv != 0) {
rv = errno;
os_atomic_and(&wgi->wg_state, ~OS_WORKGROUP_INTERVAL_STARTED, relaxed);
}
os_unfair_lock_unlock(&wgi->wii_lock);
return rv;
}
int
os_workgroup_interval_update(os_workgroup_interval_t wgi, uint64_t deadline,
os_workgroup_interval_data_t __unused data)
{
os_workgroup_t cur_wg = _os_workgroup_get_current();
if (cur_wg != wgi->_as_wg) {
os_crash("Thread is not a member of the workgroup");
}
bool success = os_unfair_lock_trylock(&wgi->wii_lock);
if (!success) {
return EBUSY;
}
uint64_t wg_state = os_atomic_load(&wgi->wg_state, relaxed);
if (!_os_workgroup_is_configurable(wg_state)) {
os_unfair_lock_unlock(&wgi->wii_lock);
return EPERM;
}
if (!(wg_state & OS_WORKGROUP_INTERVAL_STARTED)) {
os_unfair_lock_unlock(&wgi->wii_lock);
return EINVAL;
}
work_interval_instance_t wii = wgi->wii;
work_interval_instance_set_deadline(wii, deadline);
int rv = work_interval_instance_update(wii);
if (rv != 0) {
rv = errno;
}
os_unfair_lock_unlock(&wgi->wii_lock);
return rv;
}
int
os_workgroup_interval_finish(os_workgroup_interval_t wgi,
os_workgroup_interval_data_t __unused data)
{
os_workgroup_t cur_wg = _os_workgroup_get_current();
if (cur_wg != wgi->_as_wg) {
os_crash("Thread is not a member of the workgroup");
}
bool success = os_unfair_lock_trylock(&wgi->wii_lock);
if (!success) {
return EBUSY;
}
uint64_t wg_state = os_atomic_load(&wgi->wg_state, relaxed);
if (!_os_workgroup_is_configurable(wg_state)) {
os_unfair_lock_unlock(&wgi->wii_lock);
return EPERM;
}
if (!(wg_state & OS_WORKGROUP_INTERVAL_STARTED)) {
os_unfair_lock_unlock(&wgi->wii_lock);
return EINVAL;
}
work_interval_instance_t wii = wgi->wii;
uint64_t current_finish = 0;
switch (wgi->clock) {
case OS_CLOCK_MACH_ABSOLUTE_TIME:
current_finish = mach_absolute_time();
break;
}
work_interval_instance_set_finish(wii, current_finish);
int rv = work_interval_instance_finish(wii);
if (rv != 0) {
rv = errno;
} else {
os_atomic_and(&wgi->wg_state, ~OS_WORKGROUP_INTERVAL_STARTED, relaxed);
}
os_unfair_lock_unlock(&wgi->wii_lock);
return rv;
}