#include "cReactor.h"
#include <signal.h>
#include <unistd.h>
static int cReactorThread_initThreadpool(cReactor *reactor);
static void *
worker_thread_main(void *arg)
{
cReactorThread *thread;
cReactorJob *job;
PyThreadState *thread_state;
PyThreadState *old_thread_state;
PyObject *result;
int done;
sigset_t sigmask;
sigfillset(&sigmask);
pthread_sigmask(SIG_SETMASK, &sigmask, NULL);
thread = (cReactorThread *)arg;
thread_state = PyThreadState_New(thread->interp);
done = 0;
while (! done)
{
job = cReactorJobQueue_PopWait(thread->reactor->worker_queue);
switch (job->type)
{
case CREACTOR_JOB_APPLY:
PyEval_AcquireLock();
old_thread_state = PyThreadState_Swap(thread_state);
result = PyEval_CallObjectWithKeywords(job->u.apply.callable,
job->u.apply.args,
job->u.apply.kw);
Py_XDECREF(result);
if (! result)
{
PyErr_Print();
}
cReactorJob_Destroy(job);
PyThreadState_Swap(old_thread_state);
PyEval_ReleaseLock();
break;
case CREACTOR_JOB_EXIT:
done = 1;
cReactorJob_Destroy(job);
break;
}
}
PyThreadState_Delete(thread_state);
return NULL;
}
static void
wake_up_internal(cReactor *reactor)
{
char byte;
byte = 'W';
write(reactor->ctrl_pipe, &byte, 1);
}
PyObject *
cReactorThread_callInThread(PyObject *self, PyObject *args, PyObject *kw)
{
PyObject *req_args;
PyObject *callable;
cReactor *reactor;
PyObject *callable_args;
cReactorJob *job;
reactor = (cReactor *)self;
req_args = PyTuple_GetSlice(args, 0, 1);
if (!PyArg_ParseTuple(req_args, "O:callInThread", &callable))
{
Py_DECREF(req_args);
return NULL;
}
Py_DECREF(req_args);
if (!PyCallable_Check(callable))
{
PyErr_SetString(PyExc_ValueError,
"callInThread arg 1 is not callable!");
return NULL;
}
if (!reactor->thread_pool) {
if (cReactorThread_initThreadpool(reactor) != 0)
return NULL;
}
callable_args = PyTuple_GetSlice(args, 1, PyTuple_Size(args));
job = cReactorJob_NewApply(callable, callable_args, kw);
Py_DECREF(callable_args);
cReactorJobQueue_AddJob(reactor->worker_queue, job);
Py_INCREF(Py_None);
return Py_None;
}
PyObject *
cReactorThread_callFromThread(PyObject *self, PyObject *args, PyObject *kw)
{
PyObject *req_args;
PyObject *callable;
cReactor *reactor;
PyObject *callable_args;
cReactorJob *job;
reactor = (cReactor *)self;
req_args = PyTuple_GetSlice(args, 0, 1);
if (!PyArg_ParseTuple(req_args, "O:callFromThread", &callable))
{
Py_DECREF(req_args);
return NULL;
}
Py_DECREF(req_args);
if (!PyCallable_Check(callable))
{
PyErr_SetString(PyExc_ValueError, "callFromThread arg 1 is not callable!");
return NULL;
}
if (! reactor->multithreaded)
{
PyErr_SetString(PyExc_RuntimeError,
"callFromThread received before initThreading!");
return NULL;
}
callable_args = PyTuple_GetSlice(args, 1, PyTuple_Size(args));
job = cReactorJob_NewApply(callable, callable_args, kw);
Py_DECREF(callable_args);
cReactorJobQueue_AddJob(reactor->main_queue, job);
wake_up_internal(reactor);
Py_INCREF(Py_None);
return Py_None;
}
PyObject *
cReactorThread_wakeUp(PyObject *self, PyObject *args)
{
cReactor *reactor;
if (!PyArg_ParseTuple(args, ":wakeUp"))
{
return NULL;
}
reactor = (cReactor *)self;
wake_up_internal(reactor);
Py_INCREF(Py_None);
return Py_None;
}
PyObject *
cReactorThread_suggestThreadPoolSize(PyObject *self, PyObject *args)
{
int pool_size;
cReactor *reactor;
reactor = (cReactor *)self;
if (!PyArg_ParseTuple(args, "i:suggestThreadPoolSize", &pool_size))
{
return NULL;
}
reactor->req_thread_pool_size = pool_size;
Py_INCREF(Py_None);
return Py_None;
}
PyObject *
cReactorThread_initThreading(PyObject *self, PyObject *args)
{
cReactor *reactor;
reactor = (cReactor *)self;
if (!PyArg_ParseTuple(args, ":initThreading"))
{
return NULL;
}
if (! reactor->multithreaded)
{
PyEval_InitThreads();
reactor->multithreaded = 1;
reactor->main_queue = cReactorJobQueue_New();
}
Py_INCREF(Py_None);
return Py_None;
}
static int
cReactorThread_initThreadpool(cReactor *reactor)
{
PyThreadState *thread_state;
int i;
cReactorThread *thread;
if (reactor->thread_pool)
return 0;
if (!reactor->multithreaded)
{
PyObject *threadable_init, *obj;
threadable_init = cReactorUtil_FromImport("twisted.python.threadable",
"init");
if (!threadable_init)
return -1;
obj = PyObject_CallFunction(threadable_init, "(i)", 1);
Py_DECREF(threadable_init);
Py_XDECREF(obj);
if (!obj)
return -1;
if (!reactor->multithreaded) {
PyErr_SetString(PyExc_RuntimeError,
"initThreading failed to init threading");
}
}
reactor->worker_queue = cReactorJobQueue_New();
if (reactor->req_thread_pool_size < 1)
{
reactor->req_thread_pool_size = 1;
}
thread_state = PyThreadState_Get();
for (i = 0; i < reactor->req_thread_pool_size; ++i)
{
thread = (cReactorThread *)malloc(sizeof(cReactorThread));
if (!thread) {
PyErr_SetString(PyExc_MemoryError,
"could not allocate a worker thread");
return -1;
}
memset(thread, 0x00, sizeof(cReactorThread));
thread->reactor = reactor;
thread->interp = thread_state->interp;
thread->next = reactor->thread_pool;
reactor->thread_pool = thread;
pthread_create(&thread->thread_id,
NULL,
worker_thread_main,
thread);
}
return 0;
}
void
cReactorThread_freeThreadpool(cReactor *reactor)
{
cReactorThread *thread;
PyThreadState *thread_state;
if (! reactor->multithreaded)
{
return;
}
thread_state = PyThreadState_Swap(NULL);
PyEval_ReleaseLock();
thread = reactor->thread_pool;
while (thread)
{
cReactorJobQueue_AddJob(reactor->worker_queue, cReactorJob_NewExit());
thread = thread->next;
}
thread = reactor->thread_pool;
while (thread)
{
pthread_join(thread->thread_id, NULL);
thread = thread->next;
}
PyEval_AcquireLock();
PyThreadState_Swap(thread_state);
}
cReactorJob *
cReactorJob_NewApply(PyObject *callable, PyObject *args, PyObject *kw)
{
cReactorJob *job;
job = (cReactorJob *)malloc(sizeof(cReactorJob));
memset(job, 0x00, sizeof(cReactorJob));
job->type = CREACTOR_JOB_APPLY;
Py_INCREF(callable);
job->u.apply.callable = callable;
Py_XINCREF(args);
job->u.apply.args = args;
Py_XINCREF(kw);
job->u.apply.kw = kw;
return job;
}
cReactorJob *
cReactorJob_NewExit(void)
{
cReactorJob *job;
job = (cReactorJob *)malloc(sizeof(cReactorJob));
memset(job, 0x00, sizeof(cReactorJob));
job->type = CREACTOR_JOB_EXIT;
return job;
}
void
cReactorJob_Destroy(cReactorJob *job)
{
switch (job->type)
{
case CREACTOR_JOB_APPLY:
Py_DECREF(job->u.apply.callable);
Py_XDECREF(job->u.apply.args);
Py_XDECREF(job->u.apply.kw);
break;
case CREACTOR_JOB_EXIT:
break;
}
free(job);
}
cReactorJobQueue *
cReactorJobQueue_New(void)
{
cReactorJobQueue *queue;
queue = (cReactorJobQueue *)malloc(sizeof(cReactorJobQueue));
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
queue->jobs = NULL;
return queue;
}
void
cReactorJobQueue_Destroy(cReactorJobQueue *queue)
{
if (queue)
{
pthread_mutex_destroy(&queue->lock);
pthread_cond_destroy(&queue->cond);
free(queue);
}
}
void
cReactorJobQueue_AddJob(cReactorJobQueue *queue, cReactorJob *job)
{
cReactorJob *search;
pthread_mutex_lock(&queue->lock);
search = queue->jobs;
if (search)
{
while (search->next)
{
search = search->next;
}
search->next = job;
job->next = NULL;
}
else
{
queue->jobs = job;
job->next = NULL;
}
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}
cReactorJob *
cReactorJobQueue_Pop(cReactorJobQueue *queue)
{
cReactorJob *job = NULL;
pthread_mutex_lock(&queue->lock);
if (queue->jobs)
{
job = queue->jobs;
queue->jobs = job->next;
job->next = NULL;
}
pthread_mutex_unlock(&queue->lock);
return job;
}
cReactorJob *
cReactorJobQueue_PopWait(cReactorJobQueue *queue)
{
cReactorJob *job = NULL;
pthread_mutex_lock(&queue->lock);
while (! job)
{
if (! queue->jobs)
{
pthread_cond_wait(&queue->cond, &queue->lock);
}
if (queue->jobs)
{
job = queue->jobs;
queue->jobs = job->next;
job->next = NULL;
}
}
pthread_mutex_unlock(&queue->lock);
return job;
}