#include <unistd.h>
#include <stdio.h>
#include <math.h>
#include <sys/wait.h>
#include <sys/param.h>
#include <sys/kdebug.h>
#include <sys/types.h>
#include <sys/ptrace.h>
#include <semaphore.h>
#include <stdlib.h>
#include <pthread.h>
#include <fcntl.h>
#include <errno.h>
#include <err.h>
#include <string.h>
#include <spawn.h>
#include <spawn_private.h>
#include <sys/spawn_internal.h>
#include <mach-o/dyld.h>
#include <libkern/OSAtomic.h>
#include <mach/mach_time.h>
#include <mach/mach.h>
#include <mach/task.h>
#include <mach/semaphore.h>
/* How the leader thread (id 0) wakes the remaining threads on each iteration. */
typedef enum wake_type { WAKE_BROADCAST_ONESEM, WAKE_BROADCAST_PERTHREAD, WAKE_CHAIN } wake_type_t;
/* Scheduling policy that thread_setup() applies to each thread. */
typedef enum my_policy_type { MY_POLICY_REALTIME, MY_POLICY_TIMESHARE, MY_POLICY_FIXEDPRI } my_policy_type_t;
/*
 * NOTE: this shadows the standard assert() and takes a second argument.
 * When `truth` is false it prints the calling pthread and source line and
 * then jumps to `label`. It does not abort.
 */
#define assert(truth, label) do { if(!(truth)) { printf("Thread %p: failure on line %d\n", pthread_self(), __LINE__); goto label; } } while (0)
/* Real-time constraint / computation budgets (ns) used by thread_setup(). */
#define CONSTRAINT_NANOS (20000000ll)
#define COMPUTATION_NANOS (10000000ll)
/*
 * Default -trace threshold (ns). A round whose worst latency relative to the
 * first thread exceeds it emits a kdebug trace point.
 */
#define TRACEWORTHY_NANOS (10000000ll)
/* debug_log() prints only in DEBUG builds; otherwise it compiles to nothing. */
#if DEBUG
#define debug_log(args...) printf(args)
#else
#define debug_log(args...) do { } while(0)
#endif
void* child_thread_func(void *arg);
void print_usage();
int thread_setup(int my_id);
my_policy_type_t parse_thread_policy(const char *str);
int thread_finish_iteration();
void selfexec_with_apptype(int argc, char *argv[]);
/* Number of worker threads (argv[1]). */
int g_numthreads;
/* Wakeup pattern (argv[2]). */
wake_type_t g_waketype;
/* NOTE(review): holds my_policy_type_t values (argv[3]) but is declared with the Mach policy_t type. */
policy_t g_policy;
/* Number of measured iterations (argv[4]). */
int g_iterations;
/* Timebase used by abs_to_nanos()/nanos_to_abs(); filled in by main(). */
struct mach_timebase_info g_mti;
/* Posted once by each thread at check-in, and by the last thread to finish each iteration. */
semaphore_t g_main_sem;
/* Per-thread mach_absolute_time() at wakeup for the current iteration. */
uint64_t *g_thread_endtimes_abs;
/* Threads that have finished the current iteration; reset by main before each one. */
volatile int32_t g_done_threads;
/* Command-line flags: -spin, -verbose, -affinity. */
boolean_t g_do_spin = FALSE;
boolean_t g_verbose = FALSE;
boolean_t g_do_affinity = FALSE;
/* Time at which main signalled the leader for the current iteration. */
uint64_t g_starttime_abs;
#if MIMIC_DIGI_LEAD_TIME
/* Thread chosen (randomly, by the leader) to spin for g_spinlength_abs each iteration. */
int g_long_spinid;
uint64_t g_spinlength_abs;
#endif
/* Single semaphore used by broadcast-single-sem. */
semaphore_t g_machsem;
/* Semaphore the leader waits on; aliases g_semarr[0] for chain / broadcast-per-thread. */
semaphore_t g_leadersem;
/* One semaphore per thread for chain / broadcast-per-thread. */
semaphore_t *g_semarr;
/*
 * Convert a mach_absolute_time() value (or delta) to nanoseconds using the
 * cached timebase ratio g_mti.numer / g_mti.denom. The product is formed in
 * double precision and truncated toward zero.
 */
uint64_t
abs_to_nanos(uint64_t abstime)
{
	double ns_per_tick = (double)g_mti.numer / (double)g_mti.denom;
	double nanoseconds = abstime * ns_per_tick;

	return (uint64_t)nanoseconds;
}
/*
 * Convert nanoseconds to mach absolute-time units, the inverse of
 * abs_to_nanos(). It uses the ratio g_mti.denom / g_mti.numer, computes in
 * double precision and truncates toward zero.
 */
uint64_t
nanos_to_abs(uint64_t ns)
{
	double ticks_per_ns = (double)g_mti.denom / (double)g_mti.numer;
	double ticks = ns * ticks_per_ns;

	return (uint64_t)ticks;
}
/*
 * Map a policy name from the command line ("timeshare", "realtime" or
 * "fixed") to its my_policy_type_t value. Any other string prints an error
 * and terminates the process with exit status 1.
 */
my_policy_type_t
parse_thread_policy(const char *str)
{
	static const struct {
		const char *name;
		my_policy_type_t policy;
	} known_policies[] = {
		{ "timeshare", MY_POLICY_TIMESHARE },
		{ "realtime",  MY_POLICY_REALTIME },
		{ "fixed",     MY_POLICY_FIXEDPRI },
	};
	size_t idx;

	for (idx = 0; idx < sizeof(known_policies) / sizeof(known_policies[0]); idx++) {
		if (strcmp(str, known_policies[idx].name) == 0) {
			return known_policies[idx].policy;
		}
	}

	printf("Invalid thread policy %s\n", str);
	exit(1);
}
/*
 * Map a wakeup-pattern name from the command line ("chain",
 * "broadcast-single-sem" or "broadcast-per-thread") to its wake_type_t.
 * An unknown name prints the usage text and exits with status 1.
 */
wake_type_t
parse_wakeup_pattern(const char *str)
{
	static const struct {
		const char *name;
		wake_type_t pattern;
	} known_patterns[] = {
		{ "chain",                WAKE_CHAIN },
		{ "broadcast-single-sem", WAKE_BROADCAST_ONESEM },
		{ "broadcast-per-thread", WAKE_BROADCAST_PERTHREAD },
	};
	size_t idx;

	for (idx = 0; idx < sizeof(known_patterns) / sizeof(known_patterns[0]); idx++) {
		if (strcmp(str, known_patterns[idx].name) == 0) {
			return known_patterns[idx].pattern;
		}
	}

	print_usage();
	exit(1);
}
/*
 * Apply the scheduling policy selected by g_policy to the calling thread.
 * When -affinity was given, also apply an affinity tag.
 *
 * my_id: index of the calling thread. It is only used to choose the affinity
 *        tag (my_id % 2) when g_do_affinity is set.
 * Returns 0 on success and 1 on failure. On failure the assert macro (or the
 * default case) prints a diagnostic first.
 */
int
thread_setup(int my_id)
{
	/*
	 * mach_thread_self() returns a new send right on every call. Take it
	 * once and release it on every exit path, so that repeated setup calls
	 * do not leak port references.
	 */
	mach_port_t self = mach_thread_self();
	kern_return_t res;
	int ret = 1;

	switch (g_policy) {
	case MY_POLICY_TIMESHARE:
		/* Default scheduling: nothing to change. */
		break;
	case MY_POLICY_REALTIME:
	{
		thread_time_constraint_policy_data_t pol;

		/*
		 * NOTE(review): period is a raw absolute-time value. Unlike
		 * constraint/computation it is not passed through nanos_to_abs(),
		 * so its wall-clock length depends on the timebase. Confirm this
		 * is intended.
		 */
		pol.period = 100000;
		pol.constraint = nanos_to_abs(CONSTRAINT_NANOS);
		pol.computation = nanos_to_abs(COMPUTATION_NANOS);
		pol.preemptible = 0;
		res = thread_policy_set(self, THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t) &pol, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
		assert(res == KERN_SUCCESS, out);
		break;
	}
	case MY_POLICY_FIXEDPRI:
	{
		thread_extended_policy_data_t pol;

		/* timeshare = 0 requests a fixed (non-decaying) priority. */
		pol.timeshare = 0;
		res = thread_policy_set(self, THREAD_EXTENDED_POLICY, (thread_policy_t) &pol, THREAD_EXTENDED_POLICY_COUNT);
		assert(res == KERN_SUCCESS, out);
		break;
	}
	default:
		printf("invalid policy type\n");
		goto out;
	}

	if (g_do_affinity) {
		thread_affinity_policy_data_t affinity;

		/* Split threads across two affinity sets by parity of their id. */
		affinity.affinity_tag = my_id % 2;
		res = thread_policy_set(self, THREAD_AFFINITY_POLICY, (thread_policy_t)&affinity, THREAD_AFFINITY_POLICY_COUNT);
		assert(res == KERN_SUCCESS, out);
	}

	ret = 0;
out:
	mach_port_deallocate(mach_task_self(), self);
	return ret;
}
/*
 * Called by each worker after it has recorded its wakeup time for the
 * current iteration.
 *
 * It atomically increments g_done_threads. The thread that brings the count
 * to g_numthreads posts g_main_sem so main can collect the timestamps.
 * With -spin:
 *   - normal builds: the other threads busy-loop until every thread is done;
 *   - MIMIC_DIGI_LEAD_TIME builds: only thread g_long_spinid spins, until
 *     g_starttime_abs + g_spinlength_abs, before checking in.
 *
 * id: the calling thread's index.
 * Returns 0, or semaphore_signal()'s result for the last thread to finish.
 */
int
thread_finish_iteration(int id)
{
	int32_t new;
	int res = 0;
	/* volatile keeps the floating-point busy work from being optimized away. */
	volatile float x = 0.0;
	volatile float y = 0.0;

	(void)id;	/* only read when MIMIC_DIGI_LEAD_TIME is enabled */

	debug_log("Thread %p finished iteration.\n", pthread_self());

#if MIMIC_DIGI_LEAD_TIME
	if (g_do_spin) {
		if (g_long_spinid == id) {
			uint64_t endspin;

			endspin = g_starttime_abs + g_spinlength_abs;
			while (mach_absolute_time() < endspin) {
				y = y + 1.5 + x;
				x = sqrt(y);
			}
		}
	}
#endif

	new = OSAtomicIncrement32(&g_done_threads);
	debug_log("New value is %d\n", new);

	if (new == g_numthreads) {
		debug_log("Thread %p signalling main thread.\n", pthread_self());
		res = semaphore_signal(g_main_sem);
	} else {
		/*
		 * Use "#if !X" instead of "#ifndef X" so this is the exact
		 * complement of the "#if MIMIC_DIGI_LEAD_TIME" block above. With
		 * -DMIMIC_DIGI_LEAD_TIME=0, "#ifndef" compiled out both spin paths
		 * and -spin silently did nothing.
		 */
#if !MIMIC_DIGI_LEAD_TIME
		if (g_do_spin) {
			while (g_done_threads < g_numthreads) {
				y = y + 1.5 + x;
				x = sqrt(y);
			}
		}
#endif
	}
	return res;
}
/*
 * Worker thread body. arg carries the thread's index (0..g_numthreads-1)
 * cast through uintptr_t. Index 0 is the leader.
 *
 * The thread applies the scheduling policy and checks in with main. Then, on
 * every iteration, it blocks until woken, stores mach_absolute_time() in
 * g_thread_endtimes_abs[my_id], and calls thread_finish_iteration().
 * Main wakes the leader through g_leadersem; the leader then wakes the other
 * threads according to g_waketype. Any failure terminates the whole process
 * with exit(1).
 */
void*
child_thread_func(void *arg)
{
	int my_id = (int)(uintptr_t)arg;
	int res;
	int i, j;

	/*
	 * NOTE(review): the return value is ignored, so the thread continues
	 * even if its policy could not be applied.
	 */
	thread_setup(my_id);

	/* Check in: main waits for one g_main_sem post per thread before iterating. */
	OSAtomicIncrement32(&g_done_threads);
	semaphore_signal(g_main_sem);

	for (i = 0; i < g_iterations; i++) {
		if (my_id == 0) {
			res = semaphore_wait(g_leadersem);
			assert(res == 0, fail);
			g_thread_endtimes_abs[my_id] = mach_absolute_time();
#if MIMIC_DIGI_LEAD_TIME
			g_long_spinid = rand() % g_numthreads;
#endif
			switch (g_waketype) {
			case WAKE_CHAIN:
				/*
				 * g_semarr has exactly g_numthreads entries. With a single
				 * thread there is nobody to wake, and g_semarr[1] would be
				 * out of bounds.
				 */
				if (g_numthreads > 1) {
					semaphore_signal(g_semarr[my_id + 1]);
				}
				break;
			case WAKE_BROADCAST_ONESEM:
				/*
				 * NOTE(review): this presumably wakes only threads already
				 * blocked on g_machsem. Main's usleep between iterations
				 * appears to be what lets workers re-block in time. Confirm.
				 */
				semaphore_signal_all(g_machsem);
				break;
			case WAKE_BROADCAST_PERTHREAD:
				for (j = 1; j < g_numthreads; j++) {
					semaphore_signal(g_semarr[j]);
				}
				break;
			default:
				printf("Invalid wakeup type?!\n");
				exit(1);
			}
		} else {
			switch (g_waketype) {
			case WAKE_BROADCAST_ONESEM:
				res = semaphore_wait(g_machsem);
				assert(res == KERN_SUCCESS, fail);
				g_thread_endtimes_abs[my_id] = mach_absolute_time();
				break;
			case WAKE_BROADCAST_PERTHREAD:
				res = semaphore_wait(g_semarr[my_id]);
				assert(res == 0, fail);
				g_thread_endtimes_abs[my_id] = mach_absolute_time();
				break;
			case WAKE_CHAIN:
				res = semaphore_wait(g_semarr[my_id]);
				assert(res == 0, fail);
				g_thread_endtimes_abs[my_id] = mach_absolute_time();
				/* Pass the wakeup along; the last thread in the chain has no successor. */
				if (my_id < (g_numthreads - 1)) {
					res = semaphore_signal(g_semarr[my_id + 1]);
					assert(res == 0, fail);
				}
				break;
			default:
				printf("Invalid wake type.\n");
				goto fail;
			}
		}

		res = thread_finish_iteration(my_id);
		assert(res == 0, fail);
	}
	return NULL;
fail:
	exit(1);
}
/* Write the command-line synopsis to stdout. */
void
print_usage()
{
	static const char usage_text[] =
	    "Usage: zn <num threads> <chain | broadcast-single-sem | broadcast-per-thread> <realtime | timeshare | fixed> <num iterations> [-trace <traceworthy latency in ns>] [-spin] [-affinity] [-verbose]\n";

	fputs(usage_text, stdout);
}
void
compute_stats(uint64_t *values, uint64_t count, float *averagep, uint64_t *maxp, uint64_t *minp, float *stddevp)
{
int i;
uint64_t _sum = 0;
uint64_t _max = 0;
uint64_t _min = UINT64_MAX;
float _avg = 0;
float _dev = 0;
for (i = 0; i < count; i++) {
_sum += values[i];
_max = values[i] > _max ? values[i] : _max;
_min = values[i] < _min ? values[i] : _min;
}
_avg = ((float)_sum) / ((float)count);
_dev = 0;
for (i = 0; i < count; i++) {
_dev += powf((((float)values[i]) - _avg), 2);
}
_dev /= count;
_dev = sqrtf(_dev);
*averagep = _avg;
*maxp = _max;
*minp = _min;
*stddevp = _dev;
}
/*
 * Compute and print max/min/avg/stddev of `count` latency samples (ns),
 * converted to microseconds, under the heading `title`.
 */
static void
print_latency_summary(const char *title, uint64_t *samples_ns, uint64_t count)
{
	float avg, stddev;
	uint64_t max, min;

	compute_stats(samples_ns, count, &avg, &max, &min, &stddev);
	printf("%s\n", title);
	printf("Max:\t\t%.2f us\n", ((float)max) / 1000.0);
	printf("Min:\t\t%.2f us\n", ((float)min) / 1000.0);
	printf("Avg:\t\t%.2f us\n", avg / 1000.0);
	printf("Stddev:\t\t%.2f us\n", stddev / 1000.0);
}

/*
 * zn: measure how long it takes to wake g_numthreads threads using the
 * chosen wakeup pattern and scheduling policy.
 *
 * Each iteration, main signals the leader and waits for every thread to
 * record its wakeup time. It then records the worst latency twice: measured
 * from main's signal, and measured relative to the leader's wakeup.
 * Returns 0 on success and 1 on bad arguments or setup failure.
 */
int
main(int argc, char **argv)
{
	int i;
	int res;
	pthread_t *threads;
	uint64_t *worst_latencies_ns;
	uint64_t *worst_latencies_from_first_ns;
	uint64_t traceworthy_latency_ns = TRACEWORTHY_NANOS;
	boolean_t seen_apptype = FALSE;

	srand(time(NULL));

	/*
	 * The minimum is program name + 4 positional arguments = 5. The maximum
	 * adds every option: "-trace <ns>" (2), "-spin", "-affinity", "-verbose",
	 * and the "-switched_apptype" marker that selfexec_with_apptype()
	 * appends when re-executing, giving 11. The old limit of 10 rejected the
	 * re-exec'd command line whenever the user passed all options.
	 */
	if (argc < 5 || argc > 11) {
		print_usage();
		goto fail;
	}

	g_numthreads = atoi(argv[1]);
	g_waketype = parse_wakeup_pattern(argv[2]);
	g_policy = parse_thread_policy(argv[3]);
	g_iterations = atoi(argv[4]);

	/*
	 * atoi() yields 0 for non-numeric input. Both counts size allocations
	 * and index arrays below, so reject anything non-positive.
	 */
	if (g_numthreads <= 0 || g_iterations <= 0) {
		print_usage();
		goto fail;
	}

	for (i = 5; i < argc; i++) {
		if (strcmp(argv[i], "-spin") == 0) {
			g_do_spin = TRUE;
		} else if (strcmp(argv[i], "-verbose") == 0) {
			g_verbose = TRUE;
		} else if ((strcmp(argv[i], "-trace") == 0) &&
		    (i < (argc - 1))) {
			traceworthy_latency_ns = strtoull(argv[++i], NULL, 10);
		} else if (strcmp(argv[i], "-affinity") == 0) {
			g_do_affinity = TRUE;
		} else if (strcmp(argv[i], "-switched_apptype") == 0) {
			seen_apptype = TRUE;
		} else {
			print_usage();
			goto fail;
		}
	}

	/*
	 * First run: re-exec ourselves with an app process type.
	 * selfexec_with_apptype() uses POSIX_SPAWN_SETEXEC, so it does not
	 * return on success.
	 */
	if (!seen_apptype) {
		selfexec_with_apptype(argc, argv);
	}

	mach_timebase_info(&g_mti);

#if MIMIC_DIGI_LEAD_TIME
	g_spinlength_abs = nanos_to_abs(COMPUTATION_NANOS) / 2;
#endif

	threads = malloc(sizeof(*threads) * g_numthreads);
	assert(threads, fail);
	g_thread_endtimes_abs = malloc(sizeof(*g_thread_endtimes_abs) * g_numthreads);
	assert(g_thread_endtimes_abs, fail);
	worst_latencies_ns = malloc(sizeof(*worst_latencies_ns) * g_iterations);
	assert(worst_latencies_ns, fail);
	worst_latencies_from_first_ns = malloc(sizeof(*worst_latencies_from_first_ns) * g_iterations);
	assert(worst_latencies_from_first_ns, fail);

	res = semaphore_create(mach_task_self(), &g_main_sem, SYNC_POLICY_FIFO, 0);
	assert(res == KERN_SUCCESS, fail);

	if (g_waketype == WAKE_CHAIN || g_waketype == WAKE_BROADCAST_PERTHREAD) {
		g_semarr = malloc(sizeof(semaphore_t) * g_numthreads);
		assert(g_semarr != NULL, fail);
		for (i = 0; i < g_numthreads; i++) {
			res = semaphore_create(mach_task_self(), &g_semarr[i], SYNC_POLICY_FIFO, 0);
			assert(res == KERN_SUCCESS, fail);
		}
		/* The leader (thread 0) waits on its own per-thread semaphore. */
		g_leadersem = g_semarr[0];
	} else {
		res = semaphore_create(mach_task_self(), &g_machsem, SYNC_POLICY_FIFO, 0);
		assert(res == KERN_SUCCESS, fail);
		res = semaphore_create(mach_task_self(), &g_leadersem, SYNC_POLICY_FIFO, 0);
		assert(res == KERN_SUCCESS, fail);
	}

	g_done_threads = 0;
	for (i = 0; i < g_numthreads; i++) {
		res = pthread_create(&threads[i], NULL, child_thread_func, (void*)(uintptr_t)i);
		assert(res == 0, fail);
	}

	res = setpriority(PRIO_DARWIN_ROLE, 0, PRIO_DARWIN_ROLE_UI_FOCAL);
	assert(res == 0, fail);
	thread_setup(0);

	/* Under fixed priority, raise main's importance above the workers'. */
	if (g_policy == MY_POLICY_FIXEDPRI) {
		thread_precedence_policy_data_t prec;
		mach_msg_type_number_t count;
		boolean_t get_default = FALSE;

		count = THREAD_PRECEDENCE_POLICY_COUNT;
		res = thread_policy_get(mach_thread_self(), THREAD_PRECEDENCE_POLICY, (thread_policy_t) &prec, &count, &get_default);
		assert(res == 0, fail);
		prec.importance += 16;
		res = thread_policy_set(mach_thread_self(), THREAD_PRECEDENCE_POLICY, (thread_policy_t) &prec, THREAD_PRECEDENCE_POLICY_COUNT);
		assert(res == 0, fail);
	}

	/* Wait for every worker to check in before the first iteration. */
	for (i = 0; i < g_numthreads; i++) {
		res = semaphore_wait(g_main_sem);
		assert(res == 0, fail);
	}

	usleep(g_numthreads * 10);

	for (i = 0; i < g_iterations; i++) {
		int j;
		uint64_t worst_abs = 0;

		g_done_threads = 0;
		OSMemoryBarrier();

		g_starttime_abs = mach_absolute_time();
		semaphore_signal(g_leadersem);

		/* The last thread to finish this iteration posts g_main_sem. */
		res = semaphore_wait(g_main_sem);
		assert(res == KERN_SUCCESS, fail);

		/* Worst wakeup latency measured from main's signal. */
		for (j = 0; j < g_numthreads; j++) {
			uint64_t latency_abs;

			latency_abs = g_thread_endtimes_abs[j] - g_starttime_abs;
			worst_abs = worst_abs < latency_abs ? latency_abs : worst_abs;
		}
		worst_latencies_ns[i] = abs_to_nanos(worst_abs);

		/* Worst wakeup latency measured relative to the leader's wakeup. */
		worst_abs = 0;
		for (j = 1; j < g_numthreads; j++) {
			uint64_t latency_abs;

			latency_abs = g_thread_endtimes_abs[j] - g_thread_endtimes_abs[0];
			worst_abs = worst_abs < latency_abs ? latency_abs : worst_abs;
		}
		worst_latencies_from_first_ns[i] = abs_to_nanos(worst_abs);

		/* Emit a kdebug trace point for rounds above the -trace threshold. */
		if (worst_latencies_from_first_ns[i] > traceworthy_latency_ns) {
			if (g_verbose) {
				printf("Worst on this round was %.2f us.\n", ((float)worst_latencies_from_first_ns[i]) / 1000.0);
			}

			(void)kdebug_trace(0xeeeee0 | DBG_FUNC_NONE,
			    worst_latencies_from_first_ns[i] >> 32,
			    worst_latencies_from_first_ns[i] & 0xFFFFFFFF,
			    traceworthy_latency_ns >> 32,
			    traceworthy_latency_ns & 0xFFFFFFFF);
		}

		usleep(g_numthreads * 10);
	}

	for (i = 0; i < g_numthreads; i++) {
		res = pthread_join(threads[i], NULL);
		assert(res == 0, fail);
	}

	print_latency_summary("Results (from a stop):", worst_latencies_ns, g_iterations);
	putchar('\n');
	print_latency_summary("Results (relative to first thread):", worst_latencies_from_first_ns, g_iterations);

	return 0;
fail:
	return 1;
}
/*
 * Re-execute this same binary with "-switched_apptype" appended to argv and
 * the process type set to POSIX_SPAWN_PROC_TYPE_APP_DEFAULT. Because of
 * POSIX_SPAWN_SETEXEC, posix_spawn replaces the current process image, so
 * this function does not return on success. Every failure terminates the
 * process via err()/errc().
 */
void
selfexec_with_apptype(int argc, char *argv[])
{
	extern char **environ;
	char exec_path[PATH_MAX];
	uint32_t exec_path_len = PATH_MAX;
	char *child_argv[argc + 2];	/* our args + marker + terminating NULL */
	posix_spawnattr_t spawn_attr;
	int rc;

	rc = _NSGetExecutablePath(exec_path, &exec_path_len);
	if (rc != 0)
		err(1, "_NSGetExecutablePath");

	/* Forward our own arguments and tag the child so main() skips the re-exec. */
	memcpy(child_argv, argv, (size_t)argc * sizeof(child_argv[0]));
	child_argv[argc] = "-switched_apptype";
	child_argv[argc + 1] = NULL;

	if ((rc = posix_spawnattr_init(&spawn_attr)) != 0)
		errc(1, rc, "posix_spawnattr_init");
	if ((rc = posix_spawnattr_setflags(&spawn_attr, POSIX_SPAWN_SETEXEC)) != 0)
		errc(1, rc, "posix_spawnattr_setflags");
	if ((rc = posix_spawnattr_setprocesstype_np(&spawn_attr, POSIX_SPAWN_PROC_TYPE_APP_DEFAULT)) != 0)
		errc(1, rc, "posix_spawnattr_setprocesstype_np");
	if ((rc = posix_spawn(NULL, exec_path, NULL, &spawn_attr, child_argv, environ)) != 0)
		errc(1, rc, "posix_spawn");
}