#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>
#include <errno.h>
#include <libkern/OSAtomic.h>
#include "darwintest_defaults.h"
#include <darwintest_utils.h>
struct context {
pthread_cond_t cond;
pthread_mutex_t mutex;
pthread_cond_t ready_cond;
long waiters;
long count;
bool ready;
char _padding[7];
};
static void *wait_thread(void *ptr) {
struct context *context = ptr;
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_mutex_lock(&context->mutex), "pthread_mutex_lock");
context->ready = true;
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_cond_signal(&context->ready_cond), "pthread_cond_signal");
bool loop = true;
while (loop) {
if (context->count > 0) {
++context->waiters;
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_cond_wait(&context->cond, &context->mutex), "[%ld] pthread_rwlock_unlock", context->count);
--context->waiters;
--context->count;
} else {
loop = false;
}
}
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_mutex_unlock(&context->mutex), "[%ld] pthread_mutex_unlock", context->count);
return NULL;
}
T_DECL(cond, "pthread_cond",
T_META_ALL_VALID_ARCHS(YES), T_META_TIMEOUT(120), T_META_CHECK_LEAKS(NO))
{
struct context context = {
.cond = PTHREAD_COND_INITIALIZER,
.mutex = PTHREAD_MUTEX_INITIALIZER,
.ready_cond = PTHREAD_COND_INITIALIZER,
.waiters = 0,
.count = 50000 * dt_ncpu(),
.ready = false,
};
int i;
int res;
int threads = 2;
pthread_t p[threads];
for (i = 0; i < threads; ++i) {
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_mutex_lock(&context.mutex), "pthread_mutex_lock");
context.ready = false;
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_create(&p[i], NULL, wait_thread, &context), "pthread_create");
do {
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_cond_wait(&context.ready_cond, &context.mutex), "pthread_cond_wait");
} while (context.ready == false);
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_mutex_unlock(&context.mutex), "pthread_mutex_lock");
T_LOG("Thread %d ready.", i);
}
T_LOG("All threads ready.");
long half = context.count / 2;
bool loop = true;
while (loop) {
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_mutex_lock(&context.mutex), "[%ld] pthread_mutex_lock", context.count);
if (context.waiters) {
char *str;
if (context.count > half) {
str = "pthread_cond_broadcast";
res = pthread_cond_broadcast(&context.cond);
} else {
str = "pthread_cond_signal";
res = pthread_cond_signal(&context.cond);
}
T_QUIET;
T_ASSERT_POSIX_ZERO(res, "[%ld] %s", context.count, str);
}
if (context.count <= 0) {
loop = false;
T_PASS("Completed stres test successfully.");
}
T_QUIET;
T_ASSERT_POSIX_ZERO(pthread_mutex_unlock(&context.mutex),
"[%ld] pthread_mutex_unlock", context.count);
}
for (i = 0; i < threads; ++i) {
T_ASSERT_POSIX_ZERO(pthread_join(p[i], NULL), NULL);
}
}
#pragma mark invalid concurrent mutex use
#if 0
static pthread_cond_t concurrent_cond = PTHREAD_COND_INITIALIZER;
static void *
invalid_wait_thread(void *arg)
{
pthread_mutex_t *mutex = arg;
int rc = pthread_mutex_lock(mutex);
T_ASSERT_POSIX_ZERO(rc, "lock mutex");
while (true) {
rc = pthread_cond_wait(&concurrent_cond, mutex);
if (rc == EINVAL) {
T_PASS("Detected EINVAL");
T_END;
} else {
T_ASSERT_POSIX_ZERO(rc, "cond_wait");
}
}
}
T_DECL(cond_invalid_concurrent_mutex, "Detect concurrent waits with different mutexes as invalid")
{
int rc;
pthread_t threads[2];
pthread_mutex_t mutexes[2] = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER,
};
for (int i = 0; i < 2; i++) {
rc = pthread_create(&threads[i], NULL, invalid_wait_thread,
&mutexes[i]);
T_ASSERT_POSIX_ZERO(rc, "pthread_create");
}
pthread_join(threads[0], NULL);
}
#endif
#pragma mark mutex ping pong test
struct cond_mutex_ping_pong_ctx_s {
pthread_mutex_t mutex;
int arrived;
int group;
};
static struct {
pthread_mutex_t sync_mutex;
pthread_cond_t sync_cond;
int group;
pthread_cond_t shared_cond;
} ping_pong = {
.sync_mutex = PTHREAD_MUTEX_INITIALIZER,
.sync_cond = PTHREAD_COND_INITIALIZER,
.group = 0,
.shared_cond = PTHREAD_COND_INITIALIZER,
};
#define PING_PONG_NGROUPS 2
#define PING_PONG_GROUP_NTHREADS 3
#define PING_PONG_ITERATIONS 5000
static void *
ping_pong_thread(void *arg)
{
int rc;
struct cond_mutex_ping_pong_ctx_s *ctx = arg;
for (int i = 1; i < PING_PONG_ITERATIONS; i++) {
if (i % 5000 == 0) {
T_LOG("Iteration %d", i);
}
rc = pthread_mutex_lock(&ping_pong.sync_mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "lock sync_mutex");
while (ping_pong.group != ctx->group) {
rc = pthread_cond_wait(&ping_pong.sync_cond, &ping_pong.sync_mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "sync cond_wait");
}
rc = pthread_mutex_unlock(&ping_pong.sync_mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "unlock sync_mutex");
rc = pthread_mutex_lock(&ctx->mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "lock mutex");
ctx->arrived++;
if (ctx->arrived == i * PING_PONG_GROUP_NTHREADS) {
rc = pthread_cond_broadcast(&ping_pong.shared_cond);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "shared cond_broadcast");
rc = pthread_mutex_lock(&ping_pong.sync_mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "lock sync_mutex");
ping_pong.group = (ping_pong.group + 1) % PING_PONG_NGROUPS;
rc = pthread_mutex_unlock(&ping_pong.sync_mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "unlock sync_mutex");
rc = pthread_cond_broadcast(&ping_pong.sync_cond);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "sync cond_broadcast");
} else {
while (ctx->arrived < i * PING_PONG_GROUP_NTHREADS) {
rc = pthread_cond_wait(&ping_pong.shared_cond, &ctx->mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "shared cond_wait");
}
}
rc = pthread_mutex_unlock(&ctx->mutex);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "unlock mutex");
}
return NULL;
}
T_DECL(cond_mutex_ping_pong, "Wait on the same condition variable with different mutexes",
T_META_ENVVAR("PTHREAD_MUTEX_USE_ULOCK=1"))
{
int rc;
pthread_t threads[PING_PONG_NGROUPS][PING_PONG_GROUP_NTHREADS];
struct cond_mutex_ping_pong_ctx_s ctxs[PING_PONG_NGROUPS];
for (int i = 0; i < PING_PONG_NGROUPS; i++) {
struct cond_mutex_ping_pong_ctx_s *ctx = &ctxs[i];
*ctx = (struct cond_mutex_ping_pong_ctx_s){
.mutex = PTHREAD_MUTEX_INITIALIZER,
.group = i,
};
for (int j = 0; j < PING_PONG_GROUP_NTHREADS; j++) {
rc = pthread_create(&threads[i][j], NULL, ping_pong_thread, ctx);
T_ASSERT_POSIX_ZERO(rc, "pthread_create");
}
}
for (int i = 0; i < PING_PONG_NGROUPS; i++) {
for (int j = 0; j < PING_PONG_GROUP_NTHREADS; j++) {
rc = pthread_join(threads[i][j], NULL);
T_ASSERT_POSIX_ZERO(rc, "pthread_join");
}
}
}
#pragma mark signal_thread_np tests
static struct signal_thread_ctx_s {
pthread_mutex_t mutex;
pthread_cond_t cond;
bool signaled;
} signal_thread_ctx = {
.mutex = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
};
static void *
chosen_waiter(void *arg __unused)
{
struct signal_thread_ctx_s *ctx = &signal_thread_ctx;
int rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "chosen waiter lock");
while (!ctx->signaled) {
rc = pthread_cond_wait(&ctx->cond, &ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "chosen waiter cond_wait");
}
T_PASS("chosen waiter woke");
T_END;
}
static void *
other_waiter_thread(void *arg __unused)
{
struct signal_thread_ctx_s *ctx = &signal_thread_ctx;
int rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "other waiter lock");
while (true) {
rc = pthread_cond_wait(&ctx->cond, &ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "other waiter cond_wait");
}
T_ASSERT_FAIL("Not reached");
return NULL;
}
T_DECL(cond_signal_thread_np_waiting, "signal a specific thread that's waiting")
{
int rc;
struct signal_thread_ctx_s *ctx = &signal_thread_ctx;
pthread_attr_t other_attr;
rc = pthread_attr_init(&other_attr);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "pthread_attr_init");
rc = pthread_attr_set_qos_class_np(&other_attr,
QOS_CLASS_USER_INTERACTIVE, 0);
T_ASSERT_POSIX_ZERO(rc, "pthread_attr_set_qos_class_np");
pthread_t other;
rc = pthread_create(&other, &other_attr, other_waiter_thread, NULL);
T_ASSERT_POSIX_ZERO(rc, "create other thread");
pthread_t chosen;
rc = pthread_create(&chosen, NULL, chosen_waiter, NULL);
T_ASSERT_POSIX_ZERO(rc, "create chosen thread");
T_LOG("Waiting for threads to wait");
sleep(5);
rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "lock mutex");
ctx->signaled = true;
rc = pthread_mutex_unlock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "unlock mutex");
rc = pthread_cond_signal_thread_np(&ctx->cond, chosen);
T_ASSERT_POSIX_ZERO(rc, "cond_signal_thread_np");
pthread_join(chosen, NULL);
}
static void *
absent_chosen_waiter(void *arg __unused)
{
T_LOG("chosen thread doing nothing forever");
while (true) {
sleep(100);
}
T_ASSERT_FAIL("Not reached");
return NULL;
}
static void *
not_absent_waiter(void *arg __unused)
{
struct signal_thread_ctx_s *ctx = &signal_thread_ctx;
int rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "other waiter lock");
while (!ctx->signaled) {
rc = pthread_cond_wait(&ctx->cond, &ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "other waiter cond_wait");
}
T_PASS("other waiter woke");
T_END;
}
T_DECL(cond_signal_thread_np_not_waiting, "signal a specific thread that isn't waiting")
{
int rc;
struct signal_thread_ctx_s *ctx = &signal_thread_ctx;
pthread_t other;
rc = pthread_create(&other, NULL, not_absent_waiter, NULL);
T_ASSERT_POSIX_ZERO(rc, "create other thread");
pthread_t chosen;
rc = pthread_create(&chosen, NULL, absent_chosen_waiter, NULL);
T_ASSERT_POSIX_ZERO(rc, "create chosen thread");
T_LOG("Waiting for threads to wait");
sleep(5);
rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "lock mutex");
ctx->signaled = true;
rc = pthread_mutex_unlock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "unlock mutex");
rc = pthread_cond_signal_thread_np(&ctx->cond, chosen);
T_ASSERT_POSIX_ZERO(rc, "cond_signal_thread_np");
pthread_join(other, NULL);
}
#pragma mark cancel signal race test
static struct cancel_signal_race_context_s {
pthread_mutex_t mutex;
pthread_cond_t cond;
} cancel_signal_race_context = {
.mutex = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
};
static void
cancelee_cleanup_handler(void *arg __unused)
{
T_LOG("cancelee cleanup handler");
struct cancel_signal_race_context_s *ctx = &cancel_signal_race_context;
int rc = pthread_mutex_unlock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "cleanup mutex unlock");
}
static void *
cancelee_thread(void *arg __unused)
{
struct cancel_signal_race_context_s *ctx = &cancel_signal_race_context;
int rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "disabled cancelation of cancelee thread");
rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "cancelee lock");
rc = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
if (rc) {
T_ASSERT_POSIX_ZERO(rc, "cancelation re-enabled");
}
pthread_cleanup_push(cancelee_cleanup_handler, NULL);
rc = pthread_cond_wait(&ctx->cond, &ctx->mutex);
pthread_cleanup_pop(0);
int rc2 = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
T_QUIET; T_ASSERT_POSIX_ZERO(rc2, "re-disabled cancelation of cancelee thread");
T_ASSERT_POSIX_ZERO(rc, "cancelee woke from cond_wait");
rc = pthread_mutex_unlock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "cancelee unlocked");
return NULL;
}
static struct {
int dummy;
} other_thread_timed_out;
static void *
other_racing_thread(void *arg __unused)
{
struct cancel_signal_race_context_s *ctx = &cancel_signal_race_context;
int rc = pthread_mutex_lock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc, "other lock");
struct timespec ts = {
.tv_sec = 10,
};
rc = pthread_cond_timedwait_relative_np(&ctx->cond, &ctx->mutex, &ts);
int rc2 = pthread_mutex_unlock(&ctx->mutex);
T_ASSERT_POSIX_ZERO(rc2, "other thread unlocked");
if (rc == ETIMEDOUT) {
T_LOG("other thread timed out");
return &other_thread_timed_out;
} else {
T_ASSERT_POSIX_ZERO(rc, "other thread woke from wait");
return NULL;
}
}
T_DECL(cond_cancel_signal_race, "Validate waiter cancelation does not eat wakes",
T_META_ENVVAR("PTHREAD_MUTEX_USE_ULOCK=1"))
{
int rc;
struct cancel_signal_race_context_s *ctx = &cancel_signal_race_context;
pthread_attr_t cancelee_attr;
rc = pthread_attr_init(&cancelee_attr);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "pthread_attr_init");
rc = pthread_attr_set_qos_class_np(&cancelee_attr,
QOS_CLASS_USER_INTERACTIVE, 0);
T_ASSERT_POSIX_ZERO(rc, "pthread_attr_set_qos_class_np");
pthread_t cancelee;
rc = pthread_create(&cancelee, &cancelee_attr, cancelee_thread, NULL);
T_ASSERT_POSIX_SUCCESS(rc, "create cancelee");
pthread_attr_t other_attr;
rc = pthread_attr_init(&other_attr);
T_QUIET; T_ASSERT_POSIX_ZERO(rc, "pthread_attr_init");
rc = pthread_attr_set_qos_class_np(&other_attr,
QOS_CLASS_USER_INITIATED, 0);
T_ASSERT_POSIX_ZERO(rc, "pthread_attr_set_qos_class_np");
pthread_t other;
rc = pthread_create(&other, &other_attr, other_racing_thread, NULL);
T_ASSERT_POSIX_SUCCESS(rc, "create other thread");
sleep(2);
rc = pthread_cond_signal(&ctx->cond);
(void)pthread_cancel(cancelee);
T_ASSERT_POSIX_ZERO(rc, "signal cancelee");
void *cancelee_retval, *other_retval;
rc = pthread_join(cancelee, &cancelee_retval);
T_ASSERT_POSIX_ZERO(rc, "join cancelee");
rc = pthread_join(other, &other_retval);
T_ASSERT_POSIX_ZERO(rc, "join other");
if (cancelee_retval == PTHREAD_CANCELED) {
T_LOG("cancelee was canceled");
T_ASSERT_EQ(other_retval, NULL, "other thread must have woken");
} else {
T_LOG("cancelee was not canceled quickly enough");
T_ASSERT_EQ(cancelee_retval, NULL, "cancelee returned success");
T_ASSERT_EQ(other_retval, &other_thread_timed_out, "other thread timed out");
}
}