#include "internal.h"
#include <string.h>
#if DISPATCH_USE_INTERNAL_WORKQUEUE
#pragma mark static data for monitoring subsystem
// Per-QoS-bucket bookkeeping used by the monitoring subsystem to decide
// when a global root queue should be poked with additional workers.
typedef struct dispatch_workq_monitor_s {
// The dispatch_queue_global_t root queue being monitored by this bucket.
dispatch_queue_global_t dq;
// Number of registered workers observed runnable in the last monitoring pass.
int32_t num_runnable;
// Desired number of runnable workers (set from active CPU count at init).
int32_t target_runnable;
// Lock protecting registered_tids and num_registered_tids.
dispatch_unfair_lock_s registered_tid_lock;
// Array (WORKQ_MAX_TRACKED_TIDS entries, allocated at init) of the tids of
// the worker threads currently registered with this pool.
dispatch_tid *registered_tids;
// Number of valid entries at the front of registered_tids.
int num_registered_tids;
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
// One monitor per QoS bucket.
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_NBUCKETS];
#pragma mark Implementation of the monitoring subsystem.
// Maximum number of worker tids tracked per monitor (also the allocated
// size of each monitor's registered_tids array).
#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
// Multiplier applied to the active CPU count to derive the global soft cap
// on runnable workers when deciding whether to oversubscribe.
#define WORKQ_OVERSUBSCRIBE_FACTOR 2
static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
static dispatch_once_t _dispatch_workq_init_once_pred;
/*
 * Registers the calling worker thread with the monitor for root_q's QoS
 * bucket so the monitoring subsystem can track its runnable state.
 * Lazily initializes the whole monitoring subsystem on first call.
 */
void
_dispatch_workq_worker_register(dispatch_queue_global_t root_q)
{
	dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
#if HAVE_DISPATCH_WORKQ_MONITORING
	dispatch_qos_t qos = _dispatch_priority_qos(root_q->dq_priority);
	if (qos == 0) qos = DISPATCH_QOS_DEFAULT;
	int bucket = DISPATCH_QOS_BUCKET(qos);
	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[bucket];
	dispatch_assert(mon->dq == root_q);

	dispatch_tid tid = _dispatch_tid_self();
	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	// registered_tids holds WORKQ_MAX_TRACKED_TIDS slots, so any count
	// strictly below WORKQ_MAX_TRACKED_TIDS leaves a valid free slot.
	// (The earlier "< WORKQ_MAX_TRACKED_TIDS-1" bound was off by one and
	// wasted the final slot.)
	dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS);
	int worker_id = mon->num_registered_tids++;
	mon->registered_tids[worker_id] = tid;
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}
/*
 * Removes the calling worker thread from its QoS bucket's monitor.
 * Uses swap-with-last removal so the registered_tids array stays dense.
 */
void
_dispatch_workq_worker_unregister(dispatch_queue_global_t root_q)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
	dispatch_qos_t qos = _dispatch_priority_qos(root_q->dq_priority);
	if (qos == 0) {
		qos = DISPATCH_QOS_DEFAULT;
	}
	dispatch_workq_monitor_t mon =
			&_dispatch_workq_monitors[DISPATCH_QOS_BUCKET(qos)];
	dispatch_assert(mon->dq == root_q);

	dispatch_tid self_tid = _dispatch_tid_self();
	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	int count = mon->num_registered_tids;
	for (int slot = 0; slot < count; slot++) {
		if (mon->registered_tids[slot] != self_tid) {
			continue;
		}
		// Found our tid: move the last entry into this slot, clear the
		// vacated tail entry, and shrink the array by one.
		int tail = count - 1;
		mon->registered_tids[slot] = mon->registered_tids[tail];
		mon->registered_tids[tail] = 0;
		mon->num_registered_tids = tail;
		break;
	}
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}
#if HAVE_DISPATCH_WORKQ_MONITORING
#if defined(__linux__)
/*
 * Counts how many of mon's registered worker threads are currently
 * runnable (state 'R') by inspecting /proc/<tid>/stat, and stores the
 * result in mon->num_runnable.  Holds registered_tid_lock for the
 * duration of the scan.
 */
static void
_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
{
	char path[128];
	char buf[4096];
	int running_count = 0;

	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	for (int i = 0; i < mon->num_registered_tids; i++) {
		dispatch_tid tid = mon->registered_tids[i];
		int fd;
		ssize_t bytes_read = -1;

		int r = snprintf(path, sizeof(path), "/proc/%d/stat", tid);
		dispatch_assert(r > 0 && r < (int)sizeof(path));

		fd = open(path, O_RDONLY | O_NONBLOCK);
		if (unlikely(fd == -1)) {
			// A registered worker must unregister before exiting; a
			// missing stat file means it died while still registered.
			DISPATCH_CLIENT_CRASH(tid,
					"workq: registered worker exited prematurely");
		} else {
			bytes_read = read(fd, buf, sizeof(buf)-1);
			(void)close(fd);
		}

		if (bytes_read > 0) {
			buf[bytes_read] = '\0';
			// /proc/[pid]/stat is "pid (comm) state ...".  comm may
			// itself contain spaces or parentheses, so a plain
			// sscanf("%*d %*s %c") can mis-parse it; per proc(5) the
			// robust approach is to find the LAST ')' and read the
			// state character that follows it.
			char *close_paren = strrchr(buf, ')');
			if (close_paren != NULL && close_paren[1] == ' ' &&
					close_paren[2] != '\0') {
				if (close_paren[2] == 'R') {
					running_count++;
				}
			} else {
				_dispatch_debug("workq: parse of state failed for %d", tid);
			}
		} else {
			_dispatch_debug("workq: Failed to read %s", path);
		}
	}
	mon->num_runnable = running_count;
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
}
#else
#error must define _dispatch_workq_count_runnable_workers
#endif
// Iterates 'name' over every QoS bucket index, from the bucket of
// DISPATCH_QOS_MAX down to the bucket of DISPATCH_QOS_MAINTENANCE, so
// higher-priority pools are visited first.
#define foreach_qos_bucket_reverse(name) \
for (name = DISPATCH_QOS_BUCKET(DISPATCH_QOS_MAX); \
name >= DISPATCH_QOS_BUCKET(DISPATCH_QOS_MAINTENANCE); name--)
/*
 * Timer-driven monitoring pass: for each QoS bucket (highest first),
 * counts runnable workers and pokes the root queue with one more worker
 * when a pool with pending work is under its utilization target.
 */
static void
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
{
	// Global soft cap on runnable workers across all pools before we
	// stop oversubscribing.
	int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * (int)dispatch_hw_config(active_cpus);
	int global_runnable = 0, i;

	foreach_qos_bucket_reverse(i) {
		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
		dispatch_queue_global_t dq = mon->dq;

		if (!_dispatch_queue_class_probe(dq)) {
			// No pending work: nothing to count, nothing to poke.
			_dispatch_debug("workq: %s is empty.", dq->dq_label);
			continue;
		}

		_dispatch_workq_count_runnable_workers(mon);
		_dispatch_debug("workq: %s has %d runnable workers (target is %d)",
				dq->dq_label, mon->num_runnable, mon->target_runnable);

		global_runnable += mon->num_runnable;

		if (mon->num_runnable == 0) {
			// Work is pending but no registered worker is runnable;
			// they are likely blocked on I/O or locks.  Use a deeply
			// negative floor so the poke is not suppressed by the
			// count of existing (blocked) workers.
			int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
			_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
					dq->dq_label, floor);
			_dispatch_global_queue_poke(dq, 1, floor);
			global_runnable += 1; // account for the worker we just poked
		} else if (mon->num_runnable < mon->target_runnable &&
				global_runnable < global_soft_max) {
			// Below the per-pool target and the global soft cap:
			// oversubscribe modestly.  Under-utilization may be
			// transitory, so keep the floor small.
			int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable;
			int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
			floor = MAX(floor, floor2);
			_dispatch_debug("workq: %s under utilization target; poking with floor %d",
					dq->dq_label, floor);
			_dispatch_global_queue_poke(dq, 1, floor);
			global_runnable += 1; // account for the worker we just poked
		}
	}
}
#endif // HAVE_DISPATCH_WORKQ_MONITORING
/*
 * One-time initialization of the monitoring subsystem: binds each
 * per-bucket monitor to its root queue, allocates the tid-tracking
 * arrays, and arms a 1s repeating timer on the manager queue to drive
 * _dispatch_workq_monitor_pools().
 */
static void
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
	int i, target_runnable = (int)dispatch_hw_config(active_cpus);
	foreach_qos_bucket_reverse(i) {
		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
		// 'i' is a bucket index, but _dispatch_get_root_queue() takes a
		// QoS value (register/unregister map qos->bucket through
		// DISPATCH_QOS_BUCKET), so convert back before the lookup;
		// passing 'i' directly would bind monitors to the wrong queues
		// and trip the mon->dq == root_q assertions.
		mon->dq = _dispatch_get_root_queue(DISPATCH_QOS_FOR_BUCKET(i), false);
		void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
		mon->registered_tids = buf;
		mon->target_runnable = target_runnable;
	}

	// Repeating 1-second timer on the manager queue drives the
	// monitoring pass.
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, &_dispatch_mgr_q);
	dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0),
			NSEC_PER_SEC, 0);
	dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_pools);
	dispatch_set_context(ds, ds); // self-reference: keeps leak tools quiet
	dispatch_activate(ds);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE