#include "cReactor.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <netdb.h>
/* Forward declaration: the type object itself is defined at the end of this
 * file, but cReactor_New() needs to reference it before that point. */
staticforward PyTypeObject cReactorType;
/*
 * Method table for the reactor object.  It is searched with Py_FindMethod()
 * by cReactor_getattr() (attribute lookup) and by cReactor_init() (to obtain
 * the bound "initThreading" method).
 *
 * Entries flagged (METH_VARARGS | METH_KEYWORDS) take a third keyword-dict
 * argument and are therefore cast to PyCFunction.  The final all-NULL entry
 * terminates the table.
 */
static PyMethodDef cReactor_methods[] =
{
/* Name resolution and run-loop control. */
{ "resolve", (PyCFunction)cReactor_resolve,
(METH_VARARGS | METH_KEYWORDS), "resolve" },
{ "run", cReactor_run,
METH_VARARGS, "run" },
{ "stop", cReactor_stop,
METH_VARARGS, "stop" },
{ "crash", cReactor_crash,
METH_VARARGS, "crash" },
{ "iterate", (PyCFunction)cReactor_iterate,
(METH_VARARGS | METH_KEYWORDS), "iterate" },
/* System events. */
{ "fireSystemEvent", cReactor_fireSystemEvent,
METH_VARARGS, "fireSystemEvent" },
{ "addSystemEventTrigger", (PyCFunction)cReactor_addSystemEventTrigger,
(METH_VARARGS | METH_KEYWORDS), "addSystemEventTrigger" },
{ "removeSystemEventTrigger", cReactor_removeSystemEventTrigger,
METH_VARARGS, "removeSystemEventTrigger" },
/* Delayed calls. */
{ "callLater", (PyCFunction)cReactorTime_callLater,
(METH_VARARGS | METH_KEYWORDS), "callLater" },
{ "getDelayedCalls", cReactorTime_getDelayedCalls,
METH_VARARGS, "getDelayedCalls" },
/* NOTE(review): registered with METH_KEYWORDS alone and without the
 * PyCFunction cast used by the other keyword-taking entries -- confirm this
 * matches the signature of cReactorTime_cancelCallLater. */
{ "cancelCallLater", cReactorTime_cancelCallLater,
METH_KEYWORDS, "cancelCallLater" },
/* TCP. */
{ "listenTCP", (PyCFunction)cReactorTCP_listenTCP,
(METH_VARARGS | METH_KEYWORDS), "listenTCP" },
{ "connectTCP", cReactorTCP_connectTCP,
METH_VARARGS, "connectTCP" },
/* Threads. */
{ "callFromThread", (PyCFunction)cReactorThread_callFromThread,
(METH_VARARGS | METH_KEYWORDS), "callFromThread" },
{ "callInThread", (PyCFunction)cReactorThread_callInThread,
(METH_VARARGS | METH_KEYWORDS), "callInThread" },
{ "suggestThreadPoolSize", cReactorThread_suggestThreadPoolSize,
METH_VARARGS, "suggestThreadPoolSize" },
{ "wakeUp", cReactorThread_wakeUp,
METH_VARARGS, "wakeUp" },
{ "initThreading", cReactorThread_initThreading,
METH_VARARGS, "initThreading" },
/* Sentinel. */
{ NULL, NULL, METH_VARARGS, NULL },
};
/*
 * Shared body for entry points that have no implementation.
 *
 * self, args -- ignored.
 * text       -- message attached to the exception.
 *
 * Always returns NULL with NotImplementedError set as the Python error.
 */
PyObject *
cReactor_not_implemented(PyObject *self, PyObject *args, const char *text)
{
    PyObject *failure = NULL;

    UNUSED(args);
    UNUSED(self);

    PyErr_SetString(PyExc_NotImplementedError, text);
    return failure;
}
/*
 * reactor.resolve(name, type=1, timeout=10) -> Deferred
 *
 * Looks `name` up with gethostbyname() (a blocking call) and schedules the
 * outcome on the returned Deferred through cReactorUtil_AddDelayedCall():
 *
 *   - success: the Deferred's `callback` is scheduled with the address
 *     formatted by inet_ntoa();
 *   - lookup failure, unexpected address length, or `type` other than 1:
 *     the Deferred's `errback` is scheduled with a descriptive string.
 *
 * `timeout` is parsed for interface compatibility but is not used.
 *
 * Returns a new reference to the Deferred, or NULL with a Python exception
 * set if argument parsing, Deferred creation, attribute lookup or building
 * the argument tuple fails.
 */
PyObject *
cReactor_resolve(PyObject *self, PyObject *args, PyObject *kw)
{
    cReactor *reactor = (cReactor *)self;
    const char *name;
    const char *message;
    struct hostent *host;
    struct in_addr addr;
    PyObject *defer;
    PyObject *errback = NULL;
    PyObject *callback = NULL;
    PyObject *target;
    PyObject *defer_args;
    int type = 1;
    int timeout = 10;
    static char *kwlist[] = { "name", "type", "timeout", NULL };

    if (!PyArg_ParseTupleAndKeywords(args, kw, "s|ii:resolve", kwlist,
                                     &name, &type, &timeout))
    {
        return NULL;
    }

    defer = cReactorUtil_CreateDeferred();
    if (!defer)
    {
        return NULL;
    }

    errback = PyObject_GetAttrString(defer, "errback");
    if (!errback)
    {
        goto fail;
    }
    callback = PyObject_GetAttrString(defer, "callback");
    if (!callback)
    {
        goto fail;
    }

    /* Work out which side of the Deferred fires and with what string.
     * Everything except a successful IPv4 lookup goes to the errback. */
    target = errback;
    if (type != 1)
    {
        message = "only type 1 is supported";
    }
    else
    {
        host = gethostbyname(name);
        if (!host)
        {
            message = hstrerror(h_errno);
        }
        else if ((size_t)host->h_length != sizeof(addr))
        {
            message = "h_length != sizeof(addr)";
        }
        else
        {
            memcpy(&addr, host->h_addr_list[0], sizeof(addr));
            message = inet_ntoa(addr);
            target = callback;
        }
    }

    /* Py_BuildValue copies the string, so the static buffers returned by
     * inet_ntoa()/hstrerror() need not outlive this call.  It can fail, and
     * a NULL tuple must not be scheduled or DECREF'd. */
    defer_args = Py_BuildValue("(s)", message);
    if (!defer_args)
    {
        goto fail;
    }

    /* NOTE(review): the result of cReactorUtil_AddDelayedCall is ignored,
     * exactly as before -- confirm whether it can report failure. */
    cReactorUtil_AddDelayedCall(reactor, 0, target, defer_args, NULL);
    Py_DECREF(defer_args);

    Py_DECREF(errback);
    Py_DECREF(callback);
    return defer;

fail:
    Py_XDECREF(callback);
    Py_XDECREF(errback);
    Py_DECREF(defer);
    return NULL;
}
/*
 * Mark the reactor as fully stopped.  The loop in cReactor_run() keeps
 * iterating until it observes CREACTOR_STATE_STOPPED.
 * NOTE(review): presumably invoked once "shutdown" event processing has
 * completed -- the caller is not in this file; confirm.
 */
void
cReactor_stop_finish(cReactor *reactor)
{
reactor->state = CREACTOR_STATE_STOPPED;
}
/*
 * Begin shutting the reactor down: switch to the STOPPING state, then fire
 * the "shutdown" system event.  Shared by cReactor_stop() and by the signal
 * check at the end of iterate_internal().
 */
static void
stop_internal(cReactor *reactor)
{
/* The state is changed before the event is fired. */
reactor->state = CREACTOR_STATE_STOPPING;
fireSystemEvent_internal(reactor, "shutdown");
}
/*
 * reactor.stop()
 *
 * Takes no arguments.  Starts the shutdown sequence via stop_internal() and
 * returns None; returns NULL with an exception set if arguments were passed.
 */
PyObject *
cReactor_stop(PyObject *self, PyObject *args)
{
    PyObject *none = Py_None;

    if (PyArg_ParseTuple(args, ":stop"))
    {
        stop_internal((cReactor *)self);
        Py_INCREF(none);
        return none;
    }

    return NULL;
}
/*
 * Number of the most recent signal caught by cReactor_sighandler(), or 0 if
 * none has arrived since iterate_internal_init() reset it.  It is written
 * from a signal handler, so it must be a volatile sig_atomic_t: that is the
 * only object type the C standard allows a handler to store to.
 */
static volatile sig_atomic_t received_signal;

/*
 * Signal handler installed for SIGINT and SIGTERM.  It only records the
 * signal number; iterate_internal() notices the flag and starts shutdown.
 */
static void
cReactor_sighandler(int sig)
{
    received_signal = sig;
}
/*
 * Rebuild reactor->pollfd_array from the transport list.
 *
 * - Grows the array (to twice the current transport count) when it is too
 *   small.  Allocation failure is fatal: the function returns void and its
 *   caller polls the array unconditionally, so there is no way to report
 *   the error and continuing would dereference NULL.
 * - Unlinks every transport in the CLOSED state, closes it and drops the
 *   list's reference to it.
 * - For every remaining transport, fills in one pollfd entry: POLLIN when
 *   the transport is ACTIVE and has a read handler, POLLOUT when it has a
 *   write handler and either buffered output or a producer.
 * - Points transport->event_mask at the entry's `events` field.  These
 *   pointers refer into the array and are only valid until the next
 *   rebuild.
 *
 * On return num_transports equals the number of live transports and the
 * stale flag is cleared.
 */
static void
iterate_rebuild_pollfd_arrray(cReactor *reactor)
{
    unsigned int num_transports;
    struct pollfd *pfd;
    cReactorTransport *transport;
    cReactorTransport *shadow;
    cReactorTransport *target;

    /* num_transports still counts closed transports at this point, so an
     * array of that size is always large enough for the survivors. */
    if (reactor->pollfd_size < reactor->num_transports)
    {
        /* Every entry is rewritten below, so nothing needs preserving. */
        free(reactor->pollfd_array);
        reactor->pollfd_size = reactor->num_transports * 2;
        reactor->pollfd_array = malloc(sizeof(*reactor->pollfd_array)
                                       * reactor->pollfd_size);
        if (!reactor->pollfd_array)
        {
            Py_FatalError("cReactor: out of memory growing the pollfd array");
        }
    }

    num_transports = 0;
    pfd = reactor->pollfd_array;
    transport = reactor->transports;
    shadow = NULL;    /* last live transport visited; NULL while at the head */

    while (transport)
    {
        if (transport->state == CREACTOR_TRANSPORT_STATE_CLOSED)
        {
            /* Unlink before releasing, so the list never holds a dead node. */
            target = transport;
            transport = transport->next;
            if (shadow)
            {
                shadow->next = transport;
            }
            else
            {
                reactor->transports = transport;
            }
            cReactorTransport_Close(target);
            Py_DECREF((PyObject *)target);
        }
        else
        {
            pfd->fd = transport->fd;
            pfd->events = 0;

            if ((transport->state == CREACTOR_TRANSPORT_STATE_ACTIVE)
                && transport->do_read)
            {
                pfd->events |= POLLIN;
            }

            if (transport->do_write
                && ((cReactorBuffer_DataAvailable(transport->out_buf) > 0)
                    || transport->producer))
            {
                pfd->events |= POLLOUT;
            }

            transport->event_mask = &pfd->events;

            ++pfd;
            ++num_transports;
            shadow = transport;
            transport = transport->next;
        }
    }

    reactor->num_transports = num_transports;
    reactor->pollfd_stale = 0;
}
/*
 * Dispatch the results of the most recent poll() call.
 *
 * The pollfd array and the transport list are walked in lock step; entry i
 * of the array is expected to describe the i-th transport in the list (this
 * is how iterate_rebuild_pollfd_arrray() lays them out).
 *
 * For each transport with pending events:
 *   POLLIN  -> cReactorTransport_Read()
 *   POLLOUT -> cReactorTransport_Write()
 *   any other bit -> the transport is marked CLOSED and the pollfd array is
 *                    flagged stale so the next rebuild reaps it.
 */
static void
iterate_process_pollfd_array(cReactor *reactor)
{
    struct pollfd *pfd;
    cReactorTransport *transport;

    for (pfd = reactor->pollfd_array, transport = reactor->transports;
         transport;
         ++pfd, transport = transport->next)
    {
        if (pfd->fd != transport->fd)
        {
            /* Consistency check: array and list are out of sync.  Trap into
             * the debugger for THIS process only; kill(0, ...) would deliver
             * the signal to every process in the process group. */
            raise(SIGTRAP);
        }

        if (!pfd->revents)
        {
            continue;
        }

        if (pfd->revents & POLLIN)
        {
            cReactorTransport_Read(transport);
        }

        if (pfd->revents & POLLOUT)
        {
            cReactorTransport_Write(transport);
        }

        if (pfd->revents & (~(POLLIN | POLLOUT)))
        {
            transport->state = CREACTOR_TRANSPORT_STATE_CLOSED;
            reactor->pollfd_stale = 1;
        }
    }
}
/*
 * Read handler for the read end of the reactor's control pipe.
 *
 * The bytes carry no information, so they are simply discarded.  The pipe
 * is drained completely instead of 16 bytes at a time, so a burst of
 * wake-up bytes does not cause a string of immediate poll() returns.  This
 * relies on the read end being non-blocking, which iterate_internal_init()
 * sets up before registering this handler: once the pipe is empty read()
 * returns a short count or -1 and the loop ends.
 */
static void
ctrl_pipe_do_read(cReactorTransport *transport)
{
    char buf[16];
    ssize_t got;

    do
    {
        got = read(transport->fd, buf, sizeof(buf));
    }
    while (got == (ssize_t)sizeof(buf));   /* full buffer: maybe more pending */
}
/*
 * One-time setup for the iteration machinery.
 *
 * - Clears the pending-signal flag and installs cReactor_sighandler for
 *   SIGINT and SIGTERM.
 * - Creates the control pipe.  The read end is made non-blocking and
 *   registered as a transport with ctrl_pipe_do_read as its read handler;
 *   the write end is stored in reactor->ctrl_pipe.
 *
 * Returns 0 on success, -1 with a Python exception set on failure.  On
 * failure both pipe descriptors are closed and reactor->ctrl_pipe is left
 * at -1.
 */
static int
iterate_internal_init(cReactor *reactor)
{
    cReactorTransport *transport;
    int ctrl_pipes[2];

    received_signal = 0;
    signal(SIGINT, cReactor_sighandler);
    signal(SIGTERM, cReactor_sighandler);

    if (pipe(ctrl_pipes) < 0)
    {
        PyErr_SetFromErrno(PyExc_RuntimeError);
        return -1;
    }

    if (fcntl(ctrl_pipes[0], F_SETFL, O_NONBLOCK) < 0)
    {
        /* Build the exception first: close() may overwrite errno. */
        PyErr_SetFromErrno(PyExc_RuntimeError);
        close(ctrl_pipes[0]);
        close(ctrl_pipes[1]);
        return -1;
    }

    reactor->ctrl_pipe = ctrl_pipes[1];

    transport = cReactorTransport_New(reactor,
                                      ctrl_pipes[0],
                                      ctrl_pipe_do_read,
                                      NULL,
                                      NULL);
    if (!transport)
    {
        /* cReactor_AddTransport() dereferences its argument, so a failed
         * construction must stop here.
         * NOTE(review): assumes cReactorTransport_New does not take
         * ownership of the descriptor when it fails -- confirm. */
        if (!PyErr_Occurred())
        {
            PyErr_NoMemory();
        }
        reactor->ctrl_pipe = -1;
        close(ctrl_pipes[0]);
        close(ctrl_pipes[1]);
        return -1;
    }

    cReactor_AddTransport(reactor, transport);
    return 0;
}
/*
 * Run one pass of the reactor loop.
 *
 * delay -- upper bound on how long poll() may block, in the units returned
 *          by cReactorUtil_NextMethodDelay() and accepted by poll() as its
 *          timeout; a negative value means "no bound from the caller".
 *
 * Steps:
 *   1. Pick the poll timeout: the smaller of `delay` and the delay until
 *      the next scheduled call, ignoring whichever is negative.
 *   2. Rebuild the pollfd array if it was flagged stale.
 *   3. poll(), releasing the interpreter lock around it when the reactor
 *      is in multithreaded mode.
 *   4. Dispatch I/O events (skipped if poll() was interrupted by a signal).
 *   5. Run due delayed calls.
 *   6. Drain the main-thread job queue, if there is one.
 *   7. If a signal was caught while RUNNING, start shutdown.
 *
 * Returns 0 on success, -1 with RuntimeError set if poll() failed for a
 * reason other than EINTR.
 */
static int
iterate_internal(cReactor *reactor, int delay)
{
    int method_delay;
    int sleep_delay;
    int poll_res;
    int poll_errno;
    PyObject *result;
    cReactorJob *job;
    PyThreadState *thread_state = NULL;

    method_delay = cReactorUtil_NextMethodDelay(reactor);
    if (method_delay < 0)
    {
        /* NOTE(review): a negative value presumably means nothing is
         * scheduled -- confirm in cReactorUtil_NextMethodDelay. */
        sleep_delay = delay;
    }
    else if (delay >= 0)
    {
        sleep_delay = (method_delay < delay) ? method_delay : delay;
    }
    else
    {
        sleep_delay = method_delay;
    }

    if (reactor->pollfd_stale)
    {
        iterate_rebuild_pollfd_arrray(reactor);
    }

    if (reactor->multithreaded)
    {
        thread_state = PyThreadState_Swap(NULL);
        PyEval_ReleaseLock();
    }

    poll_res = poll(reactor->pollfd_array,
                    reactor->num_transports,
                    sleep_delay);
    /* Capture errno immediately: re-acquiring the interpreter lock below
     * may call into the threading library and overwrite it. */
    poll_errno = errno;

    if (reactor->multithreaded)
    {
        PyEval_AcquireLock();
        PyThreadState_Swap(thread_state);
    }

    if (poll_res < 0)
    {
        if (poll_errno != EINTR)
        {
            errno = poll_errno;
            PyErr_SetFromErrno(PyExc_RuntimeError);
            return -1;
        }
        /* EINTR: a signal arrived; fall through so it is noticed below. */
    }
    else
    {
        iterate_process_pollfd_array(reactor);
    }

    cReactorUtil_RunDelayedCalls(reactor);

    if (reactor->main_queue)
    {
        for ( ; ; )
        {
            job = cReactorJobQueue_Pop(reactor->main_queue);
            if (!job)
            {
                break;
            }

            switch (job->type)
            {
                case CREACTOR_JOB_APPLY:
                    result = PyEval_CallObjectWithKeywords(job->u.apply.callable,
                                                           job->u.apply.args,
                                                           job->u.apply.kw);
                    if (!result)
                    {
                        /* A failing job must not abort the loop: report it
                         * and carry on. */
                        PyErr_Print();
                    }
                    else
                    {
                        Py_DECREF(result);
                    }
                    break;

                case CREACTOR_JOB_EXIT:
                    break;

                default:
                    break;
            }
            cReactorJob_Destroy(job);
        }
    }

    if (received_signal)
    {
        /* Only the first signal starts shutdown; afterwards the state is
         * no longer RUNNING. */
        if (reactor->state == CREACTOR_STATE_RUNNING)
        {
            stop_internal(reactor);
        }
    }

    return 0;
}
/*
 * reactor.iterate(delay=0)
 *
 * Runs a single pass of the reactor loop.  `delay`, when given, is
 * converted with cReactorUtil_ConvertDelay(); a negative conversion result
 * is treated as an error.  With no argument the pass does not block.
 *
 * Returns None, or NULL with an exception set if the arguments are invalid
 * or the pass itself fails.
 */
PyObject *
cReactor_iterate(PyObject *self, PyObject *args, PyObject *kw)
{
    cReactor *reactor = (cReactor *)self;
    PyObject *delay_obj = NULL;
    int delay = 0;
    static char *kwlist[] = { "delay", NULL };

    /* The text after ':' is the function name used in error messages, so it
     * has to be the method's name rather than the argument's. */
    if (!PyArg_ParseTupleAndKeywords(args, kw, "|O:iterate", kwlist, &delay_obj))
    {
        return NULL;
    }

    if (delay_obj)
    {
        delay = cReactorUtil_ConvertDelay(delay_obj);
        if (delay < 0)
        {
            return NULL;
        }
    }

    if (iterate_internal(reactor, delay) < 0)
    {
        return NULL;
    }

    Py_INCREF(Py_None);
    return Py_None;
}
/*
 * reactor.run()
 *
 * Only legal while the reactor is STOPPED; otherwise RuntimeError is raised
 * with a message saying whether it is running or in the middle of stopping.
 *
 * Switches to RUNNING, fires the "startup" system event, then iterates with
 * no caller-imposed timeout until the state becomes STOPPED.  The thread
 * pool is released before returning None.
 *
 * Returns NULL with an exception set on bad arguments, wrong state, or a
 * failed iteration.
 */
PyObject *
cReactor_run(PyObject *self, PyObject *args)
{
    cReactor *reactor = (cReactor *)self;
    const char *complaint;

    if (!PyArg_ParseTuple(args, ":run"))
    {
        return NULL;
    }

    if (reactor->state != CREACTOR_STATE_STOPPED)
    {
        complaint = (reactor->state == CREACTOR_STATE_RUNNING)
                        ? "the reactor was already running!"
                        : "the reactor was trying to stop!";
        PyErr_SetString(PyExc_RuntimeError, complaint);
        return NULL;
    }

    reactor->state = CREACTOR_STATE_RUNNING;
    fireSystemEvent_internal(reactor, "startup");

    for ( ; ; )
    {
        if (reactor->state == CREACTOR_STATE_STOPPED)
        {
            break;
        }
        if (iterate_internal(reactor, -1) < 0)
        {
            /* NOTE(review): the state is left as-is and the thread pool is
             * not freed on this path. */
            return NULL;
        }
    }

    cReactorThread_freeThreadpool(reactor);

    Py_INCREF(Py_None);
    return Py_None;
}
/*
 * reactor.crash()
 *
 * Takes no arguments.  Forces the state straight to STOPPED without firing
 * the "shutdown" event, so a running cReactor_run() loop exits after its
 * current pass.  Returns None, or NULL if arguments were passed.
 */
PyObject *
cReactor_crash(PyObject *self, PyObject *args)
{
    PyObject *none = Py_None;

    if (PyArg_ParseTuple(args, ":crash"))
    {
        ((cReactor *)self)->state = CREACTOR_STATE_STOPPED;
        Py_INCREF(none);
        return none;
    }

    return NULL;
}
/*
 * Register `transport` with `reactor`.
 *
 * The transport is pushed onto the front of the reactor's singly linked
 * transport list, the transport count is bumped, and the pollfd array is
 * flagged stale so it is rebuilt before the next poll().  No reference
 * count is changed here.
 */
void
cReactor_AddTransport(cReactor *reactor, cReactorTransport *transport)
{
    cReactorTransport *old_head = reactor->transports;

    reactor->pollfd_stale = 1;
    reactor->num_transports += 1;

    transport->next = old_head;
    reactor->transports = transport;
}
/*
 * Second-stage initialisation of a freshly allocated reactor whose
 * attr_dict already exists.
 *
 * - Stores "__implements__" (built from the interface names below) and
 *   "__class__" (the object's type) in the attribute dictionary.
 * - Sets the state to STOPPED.
 * - Passes the bound "initThreading" method to
 *   twisted.python.threadable.whenThreaded.
 * - Sets up signal handling and the control pipe.
 *
 * Returns 0 on success, -1 with a Python exception set on failure.
 */
static int
cReactor_init(cReactor *reactor)
{
    PyObject *when_threaded;
    PyObject *init_threading;
    PyObject *obj;
    int status;
    static const char * interfaces[] =
    {
        "IReactorCore",
        "IReactorTime",
        "IReactorTCP",
        "IReactorThreads",
    };

    obj = cReactorUtil_MakeImplements(interfaces,
                                      sizeof(interfaces) / sizeof(interfaces[0]));
    if (!obj)
    {
        return -1;
    }
    /* PyDict_SetItemString takes its own reference, so ours has to be
     * released whether or not the insert succeeded. */
    status = PyDict_SetItemString(reactor->attr_dict, "__implements__", obj);
    Py_DECREF(obj);
    if (status != 0)
    {
        return -1;
    }

    /* ob_type is a borrowed pointer; the dictionary adds its own reference. */
    obj = (PyObject *)reactor->ob_type;
    if (PyDict_SetItemString(reactor->attr_dict, "__class__", obj) != 0)
    {
        return -1;
    }

    reactor->state = CREACTOR_STATE_STOPPED;

    when_threaded = cReactorUtil_FromImport("twisted.python.threadable",
                                            "whenThreaded");
    if (!when_threaded)
    {
        return -1;
    }

    init_threading = Py_FindMethod(cReactor_methods,
                                   (PyObject *)reactor,
                                   "initThreading");
    if (!init_threading)
    {
        Py_DECREF(when_threaded);
        return -1;
    }

    obj = PyObject_CallFunction(when_threaded, "(O)", init_threading);
    Py_DECREF(when_threaded);
    Py_DECREF(init_threading);
    if (!obj)
    {
        return -1;
    }
    Py_DECREF(obj);

    if (iterate_internal_init(reactor))
    {
        return -1;
    }

    return 0;
}
/*
 * Allocate and initialise a new reactor object.
 *
 * Returns a new reference, or NULL with a Python exception set if any
 * allocation or initialisation step fails.
 */
PyObject *
cReactor_New(void)
{
    cReactor *reactor;
    PyObject *attr_dict;

    /* The type object is statically initialised with a NULL type pointer;
     * patch it in before the first instance is created. */
    cReactorType.ob_type = &PyType_Type;

    /* Create the dictionary first: cReactor_dealloc() releases attr_dict
     * unconditionally, so an object must never reach it without one. */
    attr_dict = PyDict_New();
    if (!attr_dict)
    {
        return NULL;
    }

    reactor = PyObject_New(cReactor, &cReactorType);
    if (!reactor)
    {
        Py_DECREF(attr_dict);
        return NULL;
    }

    /* PyObject_New leaves the body uninitialised; give every field a safe
     * value before anything can fail and trigger deallocation. */
    reactor->ctrl_pipe = -1;
    reactor->attr_dict = attr_dict;
    reactor->timed_methods = NULL;
    reactor->event_triggers = NULL;
    reactor->transports = NULL;
    reactor->num_transports = 0;
    reactor->pollfd_array = NULL;
    reactor->pollfd_size = 0;
    reactor->pollfd_stale = 0;
    reactor->multithreaded = 0;
    reactor->main_queue = NULL;
    reactor->thread_pool = NULL;
    reactor->worker_queue = NULL;
    reactor->req_thread_pool_size = 3;

    if (cReactor_init(reactor) < 0)
    {
        Py_DECREF((PyObject *)reactor);
        return NULL;
    }

    return (PyObject *)reactor;
}
/*
 * tp_dealloc for the reactor: releases the attribute dictionary, delayed
 * calls, system event triggers, the list's reference to every registered
 * transport and the pollfd array, then frees the object.
 *
 * NOTE(review): reactor->ctrl_pipe (the write end of the control pipe) is
 * not closed here -- confirm whether it is closed elsewhere.
 */
static void
cReactor_dealloc(PyObject *self)
{
    cReactor *reactor = (cReactor *)self;
    cReactorTransport *transport;
    cReactorTransport *target;

    /* attr_dict is NULL when cReactor_New() failed to create it and is
     * tearing down the half-built object, hence the X variant. */
    Py_XDECREF(reactor->attr_dict);
    reactor->attr_dict = NULL;

    cReactorUtil_DestroyDelayedCalls(reactor);
    reactor->timed_methods = NULL;

    cSystemEvent_FreeTriggers(reactor->event_triggers);
    reactor->event_triggers = NULL;

    transport = reactor->transports;
    while (transport)
    {
        /* Advance first: the DECREF may free the node. */
        target = transport;
        transport = transport->next;
        Py_DECREF(target);
    }
    reactor->transports = NULL;

    free(reactor->pollfd_array);
    reactor->pollfd_array = NULL;

    PyObject_Del(self);
}
/*
 * tp_getattr for the reactor.
 *
 * Lookup order:
 *   1. the method table (returns a bound method);
 *   2. "__dict__", which yields the attribute dictionary itself;
 *   3. an entry in the attribute dictionary.
 *
 * Returns a NEW reference, or NULL with AttributeError set when the name is
 * not found.
 */
static PyObject *
cReactor_getattr(PyObject *self, char *attr_name)
{
    cReactor *reactor = (cReactor *)self;
    PyObject *obj;

    obj = Py_FindMethod(cReactor_methods, self, attr_name);
    if (obj)
    {
        return obj;
    }
    /* Py_FindMethod leaves an error set on a miss; discard it and fall back
     * to the dictionary. */
    PyErr_Clear();

    if (!strcmp("__dict__", attr_name))
    {
        obj = reactor->attr_dict;
    }
    else
    {
        obj = PyDict_GetItemString(reactor->attr_dict, attr_name);
    }

    if (!obj)
    {
        PyErr_SetString(PyExc_AttributeError, attr_name);
        return NULL;
    }

    /* Both sources above are borrowed; the caller owns what we return, so
     * the reference must be taken here -- for "__dict__" as well. */
    Py_INCREF(obj);
    return obj;
}
/*
 * tp_repr for the reactor: returns a new string of the form
 * "<cReactor instance ADDRESS>".
 */
static PyObject *
cReactor_repr(PyObject *self)
{
    char buf[100];

    /* %p requires a void pointer argument. */
    snprintf(buf, sizeof(buf) - 1, "<cReactor instance %p>", (void *)self);
    buf[sizeof(buf) - 1] = 0x00;

    return PyString_FromString(buf);
}
/*
 * Type object for the reactor.  The initializer is positional; the slot
 * names in the comments follow the classic Python 2 PyTypeObject layout
 * (NOTE(review): confirm against the Python headers this is built with).
 * The type pointer in the header is left NULL here and filled in by
 * cReactor_New().
 */
static PyTypeObject cReactorType =
{
PyObject_HEAD_INIT(NULL)
0, /* ob_size */
"cReactor", /* tp_name */
sizeof(cReactor), /* tp_basicsize */
0, /* tp_itemsize */
cReactor_dealloc, /* tp_dealloc */
NULL, /* tp_print */
cReactor_getattr, /* tp_getattr */
NULL, /* tp_setattr */
NULL, /* tp_compare */
cReactor_repr, /* tp_repr */
NULL, /* tp_as_number */
NULL, /* tp_as_sequence */
NULL, /* tp_as_mapping */
NULL, /* tp_hash */
NULL, /* tp_call */
NULL, /* tp_str */
NULL, /* tp_getattro */
NULL, /* tp_setattro */
NULL, /* tp_as_buffer */
0, /* tp_flags */
NULL, /* tp_doc */
NULL, /* tp_traverse */
NULL, /* tp_clear */
NULL, /* tp_richcompare */
0, /* tp_weaklistoffset */
};