#include "Python.h"
#include <sys/poll.h>
#include <pthread.h>
/* Evaluate `x` and discard the result, e.g. to silence "unused parameter"
 * warnings.  The argument is parenthesized so the cast covers the whole
 * expression: without it, UNUSED(a + b) expands to ((void)a + b), which
 * does not compile. */
#define UNUSED(x) ((void)(x))
/* Named integer constant.  Presumably the number of cReactorEventPhase
 * values (that enum declares three) -- confirm before relying on it. */
enum { CREACTOR_NUM_EVENT_PHASES = 3 };
/* Event phase identifiers (before / during / after).  The values are
 * explicit and contiguous, starting at zero. */
typedef enum _cReactorEventPhase
{
    CREACTOR_EVENT_PHASE_BEFORE = 0,
    CREACTOR_EVENT_PHASE_DURING = 1,
    CREACTOR_EVENT_PHASE_AFTER  = 2
} cReactorEventPhase;
/* Run state stored in cReactor.state: stopped, running, or stopping. */
typedef enum _cReactorState
{
    CREACTOR_STATE_STOPPED  = 0,
    CREACTOR_STATE_RUNNING  = 1,
    CREACTOR_STATE_STOPPING = 2
} cReactorState;
/* State stored in cReactorTransport.state: active, closing, or closed. */
typedef enum _cReactorTransportState
{
    CREACTOR_TRANSPORT_STATE_ACTIVE  = 0,
    CREACTOR_TRANSPORT_STATE_CLOSING = 1,
    CREACTOR_TRANSPORT_STATE_CLOSED  = 2
} cReactorTransportState;
/* Forward declarations, so the structures and prototypes that follow can
 * refer to each other by pointer before the full definitions appear.
 * _cReactorMethod, _cReactorBuffer and _cEventTriggers are only ever used
 * through pointers in this part of the file; their layouts are not given
 * here. */

/* Reactor, transports and scheduling. */
typedef struct _cReactor          cReactor;
typedef struct _cReactorTransport cReactorTransport;
typedef struct _cReactorBuffer    cReactorBuffer;
typedef struct _cReactorMethod    cReactorMethod;
typedef struct _cDelayedCall      cDelayedCall;
typedef struct _cEventTriggers    cEventTriggers;

/* Jobs, job queues and threads. */
typedef struct _cReactorJob       cReactorJob;
typedef struct _cReactorJobQueue  cReactorJobQueue;
typedef struct _cReactorThread    cReactorThread;
/* Kind tag stored in cReactorJob.type.  Numbering starts at 1, so a
 * zero-filled job does not match either kind. */
typedef enum _cReactorJobType
{
    CREACTOR_JOB_APPLY = 1,
    CREACTOR_JOB_EXIT  = 2
} cReactorJobType;
/* One job record.  Jobs chain together through `next`; `type` carries a
 * cReactorJobType value and `u` holds the per-kind payload. */
struct _cReactorJob
{
    cReactorJob     *next;  /* Link to another job (NULL-terminated list, presumably). */
    cReactorJobType  type;  /* Presumably selects which member of `u` is meaningful. */

    /* Only an `apply` payload is declared: a callable plus its positional
     * and keyword arguments. */
    union
    {
        struct
        {
            PyObject *callable;
            PyObject *args;
            PyObject *kw;
        } apply;
    } u;
};
/* A job list bundled with a pthread mutex and condition variable.
 * NOTE(review): presumably `lock` guards `jobs` and `cond` is signalled
 * when a job is added -- confirm against the queue implementation. */
struct _cReactorJobQueue
{
    pthread_mutex_t  lock;
    pthread_cond_t   cond;
    cReactorJob     *jobs;  /* Head of the job chain (linked via cReactorJob.next). */
};
/* Per-thread record.  Records chain through `next`; cReactor.thread_pool
 * holds a pointer to one of them. */
struct _cReactorThread
{
    cReactorThread     *next;       /* Link to another thread record. */
    pthread_t           thread_id;  /* pthread handle for this thread. */
    cReactor           *reactor;    /* Reactor this record is associated with. */
    PyInterpreterState *interp;     /* Python interpreter state pointer. */
};
/* Callback signatures stored in a cReactorTransport.  Each one receives
 * the transport it is attached to. */

/* Stored in do_read / do_write / do_close; no return value. */
typedef void (*cReactorTransportReadFunc)(cReactorTransport *transport);
typedef void (*cReactorTransportWriteFunc)(cReactorTransport *transport);
typedef void (*cReactorTransportCloseFunc)(cReactorTransport *transport);

/* Stored in get_peer / get_host; return a Python object pointer. */
typedef PyObject *(*cReactorTransportGetPeerFunc)(cReactorTransport *transport);
typedef PyObject *(*cReactorTransportGetHostFunc)(cReactorTransport *transport);
/* Transport object.  It begins with PyObject_HEAD, so instances are Python
 * objects.  Member order is part of the layout and must not be changed. */
struct _cReactorTransport
{
    PyObject_HEAD

    cReactorTransport            *next;        /* Link to another transport. */
    cReactorTransportState        state;       /* ACTIVE / CLOSING / CLOSED. */
    int                           fd;          /* File descriptor for this transport. */
    short                        *event_mask;  /* NOTE(review): likely points at a pollfd
                                                * `events` field -- confirm. */

    /* Callbacks; see the cReactorTransport*Func typedefs. */
    cReactorTransportReadFunc     do_read;
    cReactorTransportWriteFunc    do_write;
    cReactorTransportCloseFunc    do_close;
    cReactorTransportGetPeerFunc  get_peer;
    cReactorTransportGetHostFunc  get_host;

    cReactorBuffer               *out_buf;     /* Presumably pending outgoing data. */
    PyObject                     *object;      /* Associated Python object; its role is
                                                * not visible in this header. */
    cReactor                     *reactor;     /* Reactor associated with this transport. */
    PyObject                     *producer;    /* Python producer object, if any. */
    int                           producer_streaming;  /* Flag kept alongside `producer`. */
};
/* Delayed-call object.  It begins with PyObject_HEAD, so instances are
 * Python objects.  It stores a callable with its arguments and a
 * `struct timeval` time stamp. */
struct _cDelayedCall
{
    PyObject_HEAD

    cReactor       *reactor;    /* Reactor associated with this call. */
    struct timeval  call_time;  /* Presumably the absolute time the call is due. */
    PyObject       *callable;
    PyObject       *args;
    PyObject       *kw;
    int             called;     /* Presumably non-zero once the call has run -- confirm. */
    cDelayedCall   *next;       /* Link to another delayed call. */
};
/* Reactor object.  It begins with PyObject_HEAD, so instances are Python
 * objects.  Member order is part of the layout and must not be changed. */
struct _cReactor
{
    PyObject_HEAD

    cReactorState      state;           /* STOPPED / RUNNING / STOPPING. */
    int                ctrl_pipe;       /* Presumably a control-pipe file descriptor. */
    PyObject          *attr_dict;       /* Python object; presumably an attribute dict. */

    cDelayedCall      *timed_methods;   /* Chain of delayed calls (via cDelayedCall.next). */
    cEventTriggers    *event_triggers;  /* Opaque here; see cSystemEvent_FreeTriggers. */

    cReactorTransport *transports;      /* Chain of transports (via cReactorTransport.next). */
    unsigned int       num_transports;

    struct pollfd     *pollfd_array;    /* Array of pollfd entries. */
    unsigned int       pollfd_size;     /* Size associated with pollfd_array. */
    int                pollfd_stale;    /* NOTE(review): looks like a "rebuild the pollfd
                                         * array" flag -- confirm. */

    int                multithreaded;   /* Flag. */
    cReactorJobQueue  *main_queue;
    cReactorThread    *thread_pool;     /* Chain of thread records. */
    cReactorJobQueue  *worker_queue;
    int                req_thread_pool_size;
};
/* Reactor construction and run-loop entry points.  The functions taking
 * (self, args[, kw]) and returning PyObject * have the shape of Python
 * C-API method functions; their definitions are not in this header. */
PyObject *cReactor_New(void);
PyObject *cReactor_resolve(PyObject *self, PyObject *args, PyObject *kw);
PyObject *cReactor_run(PyObject *self, PyObject *args);
PyObject *cReactor_stop(PyObject *self, PyObject *args);
void      cReactor_stop_finish(cReactor *reactor);
PyObject *cReactor_crash(PyObject *self, PyObject *args);
PyObject *cReactor_iterate(PyObject *self, PyObject *args, PyObject *kw);
/* System-event API: three Python-callable-shaped entry points, followed by
 * two C-level helpers that operate directly on a reactor or trigger set. */
PyObject *cReactor_fireSystemEvent(PyObject *self, PyObject *args);
PyObject *cReactor_addSystemEventTrigger(PyObject *self,
                                         PyObject *args,
                                         PyObject *kw);
PyObject *cReactor_removeSystemEventTrigger(PyObject *self, PyObject *args);

void fireSystemEvent_internal(cReactor *reactor, const char *event_type);
void cSystemEvent_FreeTriggers(cEventTriggers *triggers);
/* Transport constructor.  Takes the reactor, a file descriptor and the
 * read / write / close callbacks; returns a transport pointer. */
cReactorTransport *cReactorTransport_New(cReactor *reactor,
                                         int fd,
                                         cReactorTransportReadFunc do_read,
                                         cReactorTransportWriteFunc do_write,
                                         cReactorTransportCloseFunc do_close);

/* Read / write / close operations on an existing transport. */
void cReactorTransport_Read(cReactorTransport *transport);
void cReactorTransport_Write(cReactorTransport *transport);
void cReactorTransport_Close(cReactorTransport *transport);
/* Buffer API.  cReactorBuffer is opaque in this header; all sizes and
 * offsets are unsigned int. */
cReactorBuffer *cReactorBuffer_New(unsigned int size);
void            cReactorBuffer_Destroy(cReactorBuffer *buffer);

void cReactorBuffer_Write(cReactorBuffer *buffer,
                          const void *data,
                          unsigned int size);

unsigned int         cReactorBuffer_DataAvailable(cReactorBuffer *buffer);
const unsigned char *cReactorBuffer_GetPtr(cReactorBuffer *buffer);
void                 cReactorBuffer_Seek(cReactorBuffer *buffer,
                                         unsigned int forward);
/* Miscellaneous helpers that produce or consume Python objects. */
PyObject *cReactorUtil_FromImport(const char *name, const char *from_item);
PyObject *cReactorUtil_MakeImplements(const char **names,
                                      unsigned int num_names);
PyObject *cReactorUtil_CreateDeferred(void);

/* Converts a Python delay object to an int.
 * NOTE(review): the unit is presumably milliseconds, matching the
 * `delay_ms` parameters used elsewhere in this header -- confirm. */
int cReactorUtil_ConvertDelay(PyObject *delay_obj);
/* Helpers for a reactor's delayed calls.  The meaning of the int return
 * values is not visible in this header. */
cDelayedCall *cReactorUtil_AddDelayedCall(cReactor *reactor,
                                          int delay_ms,
                                          PyObject *callable,
                                          PyObject *args,
                                          PyObject *kw);

void cReactorUtil_InsertDelayedCall(cReactor *reactor, cDelayedCall *call);
int  cReactorUtil_RemoveDelayedCall(cReactor *reactor, cDelayedCall *call);
int  cReactorUtil_ReInsertDelayedCall(cReactor *reactor, cDelayedCall *call);
int  cReactorUtil_NextMethodDelay(cReactor *reactor);
/* Method-list helpers.  cReactorMethod is opaque in this header.  The
 * Add/Remove functions take the list by address (cReactorMethod **). */
int cReactorUtil_AddMethod(cReactorMethod **list,
                           PyObject *callable,
                           PyObject *args,
                           PyObject *kw);
int cReactorUtil_RemoveMethod(cReactorMethod **list, int call_id);

int cReactorUtil_RunDelayedCalls(cReactor *reactor);

/* Callback type accepted by cReactorUtil_ForEachMethod.  It receives a
 * callable, its args and kw, and the caller-supplied user_data pointer. */
typedef void (*cReactorMethodListIterator)(PyObject *callable,
                                           PyObject *args,
                                           PyObject *kw,
                                           void *user_data);

void cReactorUtil_ForEachMethod(cReactorMethod *list,
                                cReactorMethodListIterator func,
                                void *user_data);

/* Teardown helpers. */
void cReactorUtil_DestroyMethods(cReactorMethod *list);
void cReactorUtil_DestroyDelayedCalls(cReactor *reactor);

/* Writes a cReactorEventPhase through `out_phase` for the given string.
 * NOTE(review): the int return presumably reports success or failure --
 * confirm the convention in the implementation. */
int cReactorUtil_GetEventPhase(const char *str, cReactorEventPhase *out_phase);
/* Takes the usual (self, args) pair plus an extra C string `text`. */
PyObject *cReactor_not_implemented(PyObject *self,
                                   PyObject *args,
                                   const char *text);

/* Takes a reactor and a transport; presumably registers the transport
 * with the reactor. */
void cReactor_AddTransport(cReactor *reactor, cReactorTransport *transport);
/* Timer-related entry points, all with Python C-API method shapes. */
PyObject *cReactorTime_callLater(PyObject *self,
                                 PyObject *args,
                                 PyObject *kw);
PyObject *cReactorTime_cancelCallLater(PyObject *self, PyObject *args);
PyObject *cReactorTime_getDelayedCalls(PyObject *self, PyObject *args);
/* TCP entry points.  listenTCP accepts keyword arguments; connectTCP
 * takes only positional arguments. */
PyObject *cReactorTCP_listenTCP(PyObject *self,
                                PyObject *args,
                                PyObject *kw);
PyObject *cReactorTCP_connectTCP(PyObject *self, PyObject *args);
/* Threading entry points with Python C-API method shapes. */
PyObject *cReactorThread_callFromThread(PyObject *self,
                                        PyObject *args,
                                        PyObject *kw);
PyObject *cReactorThread_callInThread(PyObject *self,
                                      PyObject *args,
                                      PyObject *kw);
PyObject *cReactorThread_suggestThreadPoolSize(PyObject *self, PyObject *args);
PyObject *cReactorThread_wakeUp(PyObject *self, PyObject *args);
PyObject *cReactorThread_initThreading(PyObject *self, PyObject *args);

/* C-level helper that takes the reactor directly. */
void cReactorThread_freeThreadpool(cReactor *reactor);
/* Job constructors, one per cReactorJobType value, and the matching
 * destructor. */
cReactorJob *cReactorJob_NewApply(PyObject *callable,
                                  PyObject *args,
                                  PyObject *kw);
cReactorJob *cReactorJob_NewExit(void);
void         cReactorJob_Destroy(cReactorJob *job);

/* Job-queue lifecycle and access.
 * NOTE(review): PopWait presumably blocks until a job is available while
 * Pop returns immediately -- confirm against the implementation. */
cReactorJobQueue *cReactorJobQueue_New(void);
void              cReactorJobQueue_Destroy(cReactorJobQueue *queue);
void              cReactorJobQueue_AddJob(cReactorJobQueue *queue,
                                          cReactorJob *job);
cReactorJob      *cReactorJobQueue_Pop(cReactorJobQueue *queue);
cReactorJob      *cReactorJobQueue_PopWait(cReactorJobQueue *queue);
/* Initialisation hooks; they take no arguments and return nothing. */
void cDelayedCall_init(void);
void cReactorTCP_init(void);

/* Builds a cDelayedCall from a delay (`delay_ms`), a callable and its
 * positional and keyword arguments. */
cDelayedCall *cDelayedCall_new(int delay_ms,
                               PyObject *callable,
                               PyObject *args,
                               PyObject *kw);