#include <freeradius-devel/ident.h>
RCSID("$Id$")
#include <freeradius-devel/radiusd.h>
#include <freeradius-devel/rad_assert.h>
#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif
#ifdef DARWIN
#include <mach/task.h>
#include <mach/semaphore.h>
#undef sem_t
#define sem_t semaphore_t
#undef sem_init
#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
#undef sem_wait
#define sem_wait(s) semaphore_wait(*s)
#undef sem_post
#define sem_post(s) semaphore_signal(*s)
#endif
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#ifdef HAVE_PTHREAD_H
#ifdef HAVE_OPENSSL_CRYPTO_H
#include <openssl/crypto.h>
#endif
#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
#ifdef HAVE_OPENSSL_EVP_H
#include <openssl/evp.h>
#endif
#define SEMAPHORE_LOCKED (0)
#define SEMAPHORE_UNLOCKED (1)
#define THREAD_RUNNING (1)
#define THREAD_CANCELLED (2)
#define THREAD_EXITED (3)
#define NUM_FIFOS RAD_LISTEN_MAX
typedef struct THREAD_HANDLE {
struct THREAD_HANDLE *prev;
struct THREAD_HANDLE *next;
pthread_t pthread_id;
int thread_num;
int status;
unsigned int request_count;
time_t timestamp;
REQUEST *request;
} THREAD_HANDLE;
typedef struct request_queue_t {
REQUEST *request;
RAD_REQUEST_FUNP fun;
} request_queue_t;
typedef struct thread_fork_t {
pid_t pid;
int status;
int exited;
} thread_fork_t;
typedef struct THREAD_POOL {
THREAD_HANDLE *head;
THREAD_HANDLE *tail;
int total_threads;
int active_threads;
int max_thread_num;
int start_threads;
int max_threads;
int min_spare_threads;
int max_spare_threads;
unsigned int max_requests_per_thread;
unsigned long request_count;
time_t time_last_spawned;
int cleanup_delay;
int spawn_flag;
#ifdef WNOHANG
pthread_mutex_t wait_mutex;
fr_hash_table_t *waiters;
#endif
sem_t semaphore;
pthread_mutex_t queue_mutex;
int max_queue_size;
int num_queued;
fr_fifo_t *fifo[NUM_FIFOS];
} THREAD_POOL;
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
static time_t last_cleaned = 0;
static time_t almost_now = 0;
static void thread_pool_manage(time_t now);
static const CONF_PARSER thread_config[] = {
{ "start_servers", PW_TYPE_INTEGER, 0, &thread_pool.start_threads, "5" },
{ "max_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_threads, "32" },
{ "min_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads, "3" },
{ "max_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads, "10" },
{ "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
{ "cleanup_delay", PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay, "5" },
{ "max_queue_size", PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size, "65536" },
{ NULL, -1, 0, NULL, NULL }
};
#ifdef HAVE_OPENSSL_CRYPTO_H
static pthread_mutex_t *ssl_mutexes = NULL;
static unsigned long ssl_id_function(void)
{
return (unsigned long) pthread_self();
}
static void ssl_locking_function(int mode, int n, const char *file, int line)
{
file = file;
line = line;
if (mode & CRYPTO_LOCK) {
pthread_mutex_lock(&(ssl_mutexes[n]));
} else {
pthread_mutex_unlock(&(ssl_mutexes[n]));
}
}
static int setup_ssl_mutexes(void)
{
int i;
#ifdef HAVE_OPENSSL_EVP_H
OpenSSL_add_all_algorithms();
#endif
ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
if (!ssl_mutexes) {
radlog(L_ERR, "Error allocating memory for SSL mutexes!");
return 0;
}
for (i = 0; i < CRYPTO_num_locks(); i++) {
pthread_mutex_init(&(ssl_mutexes[i]), NULL);
}
CRYPTO_set_id_callback(ssl_id_function);
CRYPTO_set_locking_callback(ssl_locking_function);
return 1;
}
#endif
#ifdef WNOHANG
static void reap_children(void)
{
pid_t pid;
int status;
thread_fork_t mytf, *tf;
pthread_mutex_lock(&thread_pool.wait_mutex);
do {
retry:
pid = waitpid(0, &status, WNOHANG);
if (pid <= 0) break;
mytf.pid = pid;
tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
if (!tf) goto retry;
tf->status = status;
tf->exited = 1;
} while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
pthread_mutex_unlock(&thread_pool.wait_mutex);
}
#else
#define reap_children()
#endif
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
request_queue_t *entry;
pthread_mutex_lock(&thread_pool.queue_mutex);
thread_pool.request_count++;
if (thread_pool.num_queued >= thread_pool.max_queue_size) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
radlog(L_ERR, "Something is blocking the server. There are %d packets in the queue, waiting to be processed. Ignoring the new request.", thread_pool.max_queue_size);
request->child_state = REQUEST_DONE;
return 0;
}
request->child_state = REQUEST_QUEUED;
request->component = "<core>";
request->module = "<queue>";
entry = rad_malloc(sizeof(*entry));
entry->request = request;
entry->fun = fun;
if (!fr_fifo_push(thread_pool.fifo[request->priority],
entry)) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
request->child_state = REQUEST_DONE;
return 0;
}
thread_pool.num_queued++;
pthread_mutex_unlock(&thread_pool.queue_mutex);
sem_post(&thread_pool.semaphore);
return 1;
}
static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
{
int blocked;
RAD_LISTEN_TYPE i, start;
request_queue_t *entry;
reap_children();
pthread_mutex_lock(&thread_pool.queue_mutex);
for (i = 0; i < RAD_LISTEN_MAX; i++) {
entry = fr_fifo_peek(thread_pool.fifo[i]);
if (!entry ||
(entry->request->master_state != REQUEST_STOP_PROCESSING)) {
continue;
}
entry = fr_fifo_pop(thread_pool.fifo[i]);
rad_assert(entry != NULL);
entry->request->child_state = REQUEST_DONE;
thread_pool.num_queued--;
free(entry);
entry = NULL;
}
start = 0;
retry:
for (i = start; i < RAD_LISTEN_MAX; i++) {
entry = fr_fifo_pop(thread_pool.fifo[i]);
if (entry) {
start = i;
break;
}
}
if (!entry) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
*request = NULL;
*fun = NULL;
return 0;
}
rad_assert(thread_pool.num_queued > 0);
thread_pool.num_queued--;
*request = entry->request;
*fun = entry->fun;
free(entry);
entry = NULL;
rad_assert(*request != NULL);
rad_assert((*request)->magic == REQUEST_MAGIC);
rad_assert(*fun != NULL);
(*request)->component = "<core>";
(*request)->module = "<thread>";
if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
(*request)->module = "<done>";
(*request)->child_state = REQUEST_DONE;
goto retry;
}
rad_assert(almost_now != 0);
blocked = almost_now - (*request)->timestamp;
if (blocked < 5) {
blocked = 0;
} else {
static time_t last_complained = 0;
if (last_complained != almost_now) {
last_complained = almost_now;
} else {
blocked = 0;
}
}
thread_pool.active_threads++;
pthread_mutex_unlock(&thread_pool.queue_mutex);
if (blocked) {
radlog(L_ERR, "Request %u has been waiting in the processing queue for %d seconds. Check that all databases are running properly!",
(*request)->number, blocked);
}
return 1;
}
static void *request_handler_thread(void *arg)
{
RAD_REQUEST_FUNP fun;
THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
do {
DEBUG2("Thread %d waiting to be assigned a request",
self->thread_num);
re_wait:
if (sem_wait(&thread_pool.semaphore) != 0) {
if (errno == EINTR) {
DEBUG2("Re-wait %d", self->thread_num);
goto re_wait;
}
radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
self->thread_num, strerror(errno));
break;
}
DEBUG2("Thread %d got semaphore", self->thread_num);
#ifdef HAVE_OPENSSL_ERR_H
ERR_clear_error ();
#endif
if (!request_dequeue(&self->request, &fun)) continue;
self->request->child_pid = self->pthread_id;
self->request_count++;
DEBUG2("Thread %d handling request %d, (%d handled so far)",
self->thread_num, self->request->number,
self->request_count);
radius_handle_request(self->request, fun);
pthread_mutex_lock(&thread_pool.queue_mutex);
rad_assert(thread_pool.active_threads > 0);
thread_pool.active_threads--;
pthread_mutex_unlock(&thread_pool.queue_mutex);
} while (self->status != THREAD_CANCELLED);
DEBUG2("Thread %d exiting...", self->thread_num);
#ifdef HAVE_OPENSSL_ERR_H
ERR_remove_state(0);
#endif
self->request = NULL;
self->status = THREAD_EXITED;
return NULL;
}
static void delete_thread(THREAD_HANDLE *handle)
{
THREAD_HANDLE *prev;
THREAD_HANDLE *next;
rad_assert(handle->request == NULL);
DEBUG2("Deleting thread %d", handle->thread_num);
prev = handle->prev;
next = handle->next;
rad_assert(thread_pool.total_threads > 0);
thread_pool.total_threads--;
if (prev == NULL) {
rad_assert(thread_pool.head == handle);
thread_pool.head = next;
} else {
prev->next = next;
}
if (next == NULL) {
rad_assert(thread_pool.tail == handle);
thread_pool.tail = prev;
} else {
next->prev = prev;
}
free(handle);
}
static THREAD_HANDLE *spawn_thread(time_t now)
{
int rcode;
THREAD_HANDLE *handle;
pthread_attr_t attr;
if (thread_pool.total_threads >= thread_pool.max_threads) {
DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads);
return NULL;
}
handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
memset(handle, 0, sizeof(THREAD_HANDLE));
handle->prev = NULL;
handle->next = NULL;
handle->thread_num = thread_pool.max_thread_num++;
handle->request_count = 0;
handle->status = THREAD_RUNNING;
handle->timestamp = time(NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
rcode = pthread_create(&handle->pthread_id, &attr,
request_handler_thread, handle);
if (rcode != 0) {
radlog(L_ERR, "Thread create failed: %s",
strerror(rcode));
return NULL;
}
pthread_attr_destroy(&attr);
thread_pool.total_threads++;
DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
handle->thread_num, thread_pool.total_threads);
if (thread_pool.tail) {
thread_pool.tail->next = handle;
handle->prev = thread_pool.tail;
thread_pool.tail = handle;
} else {
rad_assert(thread_pool.head == NULL);
thread_pool.head = thread_pool.tail = handle;
}
thread_pool.time_last_spawned = now;
return handle;
}
int total_active_threads(void)
{
return thread_pool.active_threads;
}
#ifdef WNOHANG
static uint32_t pid_hash(const void *data)
{
const thread_fork_t *tf = data;
return fr_hash(&tf->pid, sizeof(tf->pid));
}
static int pid_cmp(const void *one, const void *two)
{
const thread_fork_t *a = one;
const thread_fork_t *b = two;
return (a->pid - b->pid);
}
#endif
int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
{
int i, rcode;
CONF_SECTION *pool_cf;
time_t now;
now = time(NULL);
rad_assert(spawn_flag != NULL);
rad_assert(*spawn_flag == TRUE);
rad_assert(pool_initialized == FALSE);
pool_cf = cf_subsection_find_next(cs, NULL, "thread");
if (!pool_cf) *spawn_flag = FALSE;
memset(&thread_pool, 0, sizeof(THREAD_POOL));
thread_pool.head = NULL;
thread_pool.tail = NULL;
thread_pool.total_threads = 0;
thread_pool.max_thread_num = 1;
thread_pool.cleanup_delay = 5;
thread_pool.spawn_flag = *spawn_flag;
if (!*spawn_flag) return 0;
#ifdef WNOHANG
if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
strerror(errno));
return -1;
}
thread_pool.waiters = fr_hash_table_create(pid_hash,
pid_cmp,
free);
if (!thread_pool.waiters) {
radlog(L_ERR, "FATAL: Failed to set up wait hash");
return -1;
}
#endif
if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
return -1;
}
if (thread_pool.min_spare_threads < 1)
thread_pool.min_spare_threads = 1;
if (thread_pool.max_spare_threads < 1)
thread_pool.max_spare_threads = 1;
if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
thread_pool.max_spare_threads = thread_pool.min_spare_threads;
if (pool_initialized) {
return 0;
}
memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
if (rcode != 0) {
radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
strerror(errno));
return -1;
}
rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
if (rcode != 0) {
radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
strerror(errno));
return -1;
}
for (i = 0; i < RAD_LISTEN_MAX; i++) {
thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
if (!thread_pool.fifo[i]) {
radlog(L_ERR, "FATAL: Failed to set up request fifo");
return -1;
}
}
#ifdef HAVE_OPENSSL_CRYPTO_H
if (!setup_ssl_mutexes()) {
radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
return -1;
}
#endif
for (i = 0; i < thread_pool.start_threads; i++) {
if (spawn_thread(now) == NULL) {
return -1;
}
}
DEBUG2("Thread pool initialized");
pool_initialized = TRUE;
return 0;
}
int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
{
almost_now = request->timestamp;
if (!thread_pool.spawn_flag) {
radius_handle_request(request, fun);
#ifdef WNOHANG
wait(NULL);
#endif
return 1;
}
if (!request_enqueue(request, fun)) return 0;
if ((last_cleaned < almost_now) ||
(thread_pool.active_threads == thread_pool.total_threads)) {
thread_pool_manage(almost_now);
}
return 1;
}
static void thread_pool_manage(time_t now)
{
int spare;
int i, total;
THREAD_HANDLE *handle, *next;
int active_threads;
active_threads = thread_pool.active_threads;
spare = thread_pool.total_threads - active_threads;
if (debug_flag) {
static int old_total = -1;
static int old_active = -1;
if ((old_total != thread_pool.total_threads) ||
(old_active != active_threads)) {
DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
thread_pool.total_threads, active_threads, spare);
old_total = thread_pool.total_threads;
old_active = active_threads;
}
}
if (spare < thread_pool.min_spare_threads) {
total = thread_pool.min_spare_threads - spare;
DEBUG2("Threads: Spawning %d spares", total);
for (i = 0; i < total; i++) {
handle = spawn_thread(now);
if (handle == NULL) {
return;
}
}
return;
}
if (now == last_cleaned) {
return;
}
last_cleaned = now;
for (handle = thread_pool.head; handle; handle = next) {
next = handle->next;
if (handle->status == THREAD_EXITED) {
delete_thread(handle);
}
}
if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
return;
}
if (spare > thread_pool.max_spare_threads) {
spare -= thread_pool.max_spare_threads;
DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
next = handle->next;
if ((handle->request == NULL) &&
(handle->status == THREAD_RUNNING)) {
handle->status = THREAD_CANCELLED;
sem_post(&thread_pool.semaphore);
spare--;
break;
}
}
}
if (thread_pool.max_requests_per_thread > 0) {
for (handle = thread_pool.head; handle; handle = next) {
next = handle->next;
if ((handle->request == NULL) &&
(handle->status == THREAD_RUNNING) &&
(handle->request_count > thread_pool.max_requests_per_thread)) {
handle->status = THREAD_CANCELLED;
sem_post(&thread_pool.semaphore);
}
}
}
return;
}
#ifdef WNOHANG
pid_t rad_fork(void)
{
pid_t child_pid;
if (!pool_initialized) return fork();
reap_children();
if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
return -1;
}
child_pid = fork();
if (child_pid > 0) {
int rcode;
thread_fork_t *tf;
tf = rad_malloc(sizeof(*tf));
memset(tf, 0, sizeof(*tf));
tf->pid = child_pid;
pthread_mutex_lock(&thread_pool.wait_mutex);
rcode = fr_hash_table_insert(thread_pool.waiters, tf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
if (!rcode) {
radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
(int) child_pid);
free(tf);
}
}
return child_pid;
}
pid_t rad_waitpid(pid_t pid, int *status)
{
int i;
thread_fork_t mytf, *tf;
if (!pool_initialized) return waitpid(pid, status, 0);
if (pid <= 0) return -1;
mytf.pid = pid;
pthread_mutex_lock(&thread_pool.wait_mutex);
tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
if (!tf) return -1;
for (i = 0; i < 100; i++) {
reap_children();
if (tf->exited) {
*status = tf->status;
pthread_mutex_lock(&thread_pool.wait_mutex);
fr_hash_table_delete(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
return pid;
}
usleep(100000);
}
pthread_mutex_lock(&thread_pool.wait_mutex);
fr_hash_table_delete(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
return 0;
}
#else
#endif
void thread_pool_lock(void)
{
pthread_mutex_lock(&thread_pool.queue_mutex);
}
void thread_pool_unlock(void)
{
pthread_mutex_unlock(&thread_pool.queue_mutex);
}
void thread_pool_queue_stats(int *array)
{
int i;
if (pool_initialized) {
for (i = 0; i < RAD_LISTEN_MAX; i++) {
array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
}
} else {
for (i = 0; i < RAD_LISTEN_MAX; i++) {
array[i] = 0;
}
}
}
#endif