/* cReactorTransport.c */
#include "cReactor.h"
#include <unistd.h>
/* Forward declaration of the transport type object; its definition is at the
 * end of this file. */
staticforward PyTypeObject cReactorTransportType;
/* Cached value returned for the "__implements__" attribute.  It is built on
 * the first call to cReactorTransport_New() and shared by every transport. */
static PyObject * cReactorTransport__implements__ = NULL;
/*
 * Run the transport's read handler.
 *
 * Does nothing when no do_read handler has been installed.
 */
void
cReactorTransport_Read(cReactorTransport *transport)
{
    if (!transport->do_read)
    {
        return;
    }

    (*transport->do_read)(transport);
}
/*
 * Push pending output and, if appropriate, ask the producer for more data.
 *
 * 1. When a do_write handler is installed and the output buffer holds data,
 *    the handler is invoked.
 * 2. When a non-streaming producer (producer_streaming == 0) is registered
 *    and the output buffer is empty afterwards, its resumeProducing() method
 *    is called.  An exception raised by that call is printed with
 *    PyErr_Print() and otherwise ignored.
 *
 * The output buffer is allocated lazily by the Python-level write() method,
 * so it may still be NULL here (for example when a producer was registered
 * before anything was written).  A NULL buffer is treated as empty instead of
 * being handed to cReactorBuffer_DataAvailable().
 */
void
cReactorTransport_Write(cReactorTransport *transport)
{
    PyObject *result;

    if (   transport->do_write
        && transport->out_buf
        && (cReactorBuffer_DataAvailable(transport->out_buf) > 0))
    {
        (*transport->do_write)(transport);
    }

    /* Re-check the buffer: the write handler above may have drained it. */
    if (   transport->producer
        && (transport->producer_streaming == 0)
        && (   !transport->out_buf
            || (cReactorBuffer_DataAvailable(transport->out_buf) == 0)))
    {
        result = PyObject_CallMethod(transport->producer, "resumeProducing", NULL);
        if (result)
        {
            Py_DECREF(result);
        }
        else
        {
            PyErr_Print();
        }
    }
}
/*
 * Shut the transport down.
 *
 * If a producer is registered, its stopProducing() method is called (an
 * exception from that call is printed with PyErr_Print() and ignored) and the
 * transport's reference to the producer is released.  Afterwards the do_close
 * handler is invoked when one is installed.
 *
 * stopProducing() runs arbitrary Python code, which may call back into
 * unregisterProducer() and reset transport->producer to NULL.  The producer
 * field is therefore re-read after the call and released through a temporary
 * rather than being dereferenced blindly.
 */
void
cReactorTransport_Close(cReactorTransport *transport)
{
    PyObject *producer;
    PyObject *result;

    producer = transport->producer;
    if (producer)
    {
        /* Keep the producer alive for the duration of the callback. */
        Py_INCREF(producer);
        result = PyObject_CallMethod(producer, "stopProducing", NULL);
        if (result)
        {
            Py_DECREF(result);
        }
        else
        {
            PyErr_Print();
        }
        Py_DECREF(producer);

        /* The callback may already have unregistered the producer; clear the
         * field before dropping whatever reference is still held. */
        producer = transport->producer;
        transport->producer = NULL;
        Py_XDECREF(producer);
    }

    if (transport->do_close)
    {
        (*transport->do_close)(transport);
    }
}
/*
 * transport.write(data)
 *
 * Appends the given string to the transport's output buffer, creating the
 * buffer on first use (cReactorBuffer_New is given twice the length of this
 * first chunk), and sets POLLOUT in the transport's event mask.
 *
 * Returns None, or NULL with an exception set when the arguments are invalid.
 */
static PyObject *
cReactorTransport_write(PyObject *self, PyObject *args)
{
    cReactorTransport *transport = (cReactorTransport *)self;
    char *data;
    int data_len;

    if (!PyArg_ParseTuple(args, "s#:write", &data, &data_len))
    {
        return NULL;
    }

    /* The output buffer is created lazily by the first write. */
    if (transport->out_buf == NULL)
    {
        transport->out_buf = cReactorBuffer_New(data_len * 2);
    }
    cReactorBuffer_Write(transport->out_buf, data, data_len);

    /* Request write events now that there is data queued. */
    *transport->event_mask |= POLLOUT;

    Py_INCREF(Py_None);
    return Py_None;
}
/*
 * transport.loseConnection()
 *
 * Takes no arguments.  Moves the transport into the CLOSING state and
 * returns None; returns NULL with an exception set on bad arguments.
 */
static PyObject *
cReactorTransport_loseConnection(PyObject *self, PyObject *args)
{
    if (PyArg_ParseTuple(args, ":loseConnection"))
    {
        ((cReactorTransport *)self)->state = CREACTOR_TRANSPORT_STATE_CLOSING;
        Py_INCREF(Py_None);
        return Py_None;
    }

    return NULL;
}
/*
 * transport.getPeer()
 *
 * Takes no arguments.  Delegates to the transport's get_peer handler and
 * returns its result.  Raises NotImplementedError("getPeer") when no handler
 * is installed.
 */
static PyObject *
cReactorTransport_getPeer(PyObject *self, PyObject *args)
{
    cReactorTransport *transport = (cReactorTransport *)self;

    if (!PyArg_ParseTuple(args, ":getPeer"))
    {
        return NULL;
    }

    if (transport->get_peer)
    {
        return (*transport->get_peer)(transport);
    }

    PyErr_SetString(PyExc_NotImplementedError, "getPeer");
    return NULL;
}
/*
 * transport.getHost()
 *
 * Takes no arguments.  Delegates to the transport's get_host handler and
 * returns its result.  Raises NotImplementedError("getHost") when no handler
 * is installed.
 */
static PyObject *
cReactorTransport_getHost(PyObject *self, PyObject *args)
{
    cReactorTransport *transport = (cReactorTransport *)self;

    if (!PyArg_ParseTuple(args, ":getHost"))
    {
        return NULL;
    }

    if (transport->get_host)
    {
        return (*transport->get_host)(transport);
    }

    PyErr_SetString(PyExc_NotImplementedError, "getHost");
    return NULL;
}
/*
 * transport.registerProducer(producer, streaming)
 *
 * Stores a new reference to `producer` together with the integer `streaming`
 * flag, and sets POLLOUT in the transport's event mask.
 *
 * Raises ValueError when a producer is already registered.  Returns None on
 * success, NULL with an exception set otherwise.
 */
static PyObject *
cReactorTransport_registerProducer(PyObject *self, PyObject *args)
{
    cReactorTransport *transport = (cReactorTransport *)self;
    PyObject *producer;
    int streaming;

    if (!PyArg_ParseTuple(args, "Oi:registerProducer", &producer, &streaming))
    {
        return NULL;
    }

    /* Only one producer may be attached at a time. */
    if (transport->producer != NULL)
    {
        PyErr_SetString(PyExc_ValueError, "a producer is already registered!");
        return NULL;
    }

    /* The transport owns a reference for as long as the producer is set. */
    transport->producer_streaming = streaming;
    transport->producer = producer;
    Py_INCREF(producer);

    *transport->event_mask |= POLLOUT;

    Py_INCREF(Py_None);
    return Py_None;
}
/*
 * transport.unregisterProducer()
 *
 * Takes no arguments.  Releases the transport's reference to the registered
 * producer, if any, and returns None.  Returns NULL with an exception set on
 * bad arguments.
 *
 * The producer field is cleared before the reference is dropped: releasing
 * the last reference can run the producer's destructor, and that code must
 * not be able to observe (or release again) a pointer to a dying object
 * through this transport.
 */
static PyObject *
cReactorTransport_unregisterProducer(PyObject *self, PyObject *args)
{
    cReactorTransport *transport;
    PyObject *old_producer;

    transport = (cReactorTransport *)self;

    if (!PyArg_ParseTuple(args, ":unregisterProducer"))
    {
        return NULL;
    }

    old_producer = transport->producer;
    transport->producer = NULL;
    Py_XDECREF(old_producer);

    Py_INCREF(Py_None);
    return Py_None;
}
/*
 * Allocate and initialise a new transport object.
 *
 * reactor   - reactor the transport belongs to; a new reference is taken and
 *             released again in the deallocator.
 * fd        - file descriptor stored on the transport.
 * do_read,
 * do_write,
 * do_close  - handlers invoked by cReactorTransport_Read/Write/Close; any of
 *             them may be NULL.
 *
 * All other fields start out NULL/0 and the state is ACTIVE.  On the first
 * call the shared "__implements__" value is built as well.
 *
 * Returns the new transport, or NULL when the "__implements__" value could
 * not be created or the object allocation failed.
 */
cReactorTransport *
cReactorTransport_New(cReactor *reactor,
                      int fd,
                      cReactorTransportReadFunc do_read,
                      cReactorTransportWriteFunc do_write,
                      cReactorTransportCloseFunc do_close)
{
    cReactorTransport *transport;
    static const char *interfaces[] =
    {
        "ITransport",
        "IConsumer",
    };

    /* Build the shared __implements__ value once, on first use. */
    if (! cReactorTransport__implements__)
    {
        cReactorTransport__implements__ = cReactorUtil_MakeImplements(interfaces,
            sizeof(interfaces) / sizeof(interfaces[0]));
        if (! cReactorTransport__implements__)
        {
            return NULL;
        }
    }

    /* The type object is statically initialised with a NULL ob_type, so it
     * is patched here before the first instance is created. */
    cReactorTransportType.ob_type = &PyType_Type;

    transport = PyObject_New(cReactorTransport, &cReactorTransportType);
    if (! transport)
    {
        /* Allocation failed; PyObject_New has already set MemoryError. */
        return NULL;
    }

    transport->next = NULL;
    transport->state = CREACTOR_TRANSPORT_STATE_ACTIVE;
    transport->fd = fd;
    transport->event_mask = NULL;
    transport->do_read = do_read;
    transport->do_write = do_write;
    transport->do_close = do_close;
    transport->get_peer = NULL;
    transport->get_host = NULL;
    transport->out_buf = NULL;
    transport->object = NULL;
    Py_INCREF(reactor);
    transport->reactor = reactor;
    transport->producer = NULL;
    transport->producer_streaming = 0;

    return transport;
}
/*
 * Type deallocator: release everything the transport owns and free it.
 *
 * Releases the output buffer (if one was ever allocated), the reference to a
 * still-registered producer, and the reference to the reactor taken in
 * cReactorTransport_New().
 *
 * NOTE(review): transport->object is not released here; nothing in this file
 * takes a reference for it, so ownership is assumed to lie elsewhere.
 */
static void
cReactorTransport_dealloc(PyObject *self)
{
    cReactorTransport *transport;
    PyObject *producer;

    transport = (cReactorTransport *)self;

    /* out_buf stays NULL until the first write(); skip the destroy call in
     * that case rather than relying on cReactorBuffer_Destroy accepting NULL. */
    if (transport->out_buf)
    {
        cReactorBuffer_Destroy(transport->out_buf);
        transport->out_buf = NULL;
    }

    /* registerProducer() took a reference; drop it if the transport is freed
     * without having been closed or unregistered first. */
    producer = transport->producer;
    transport->producer = NULL;
    Py_XDECREF(producer);

    Py_DECREF(transport->reactor);
    transport->reactor = NULL;

    PyObject_Del(self);
}
static PyMethodDef cReactorTransport_methods[] =
{
{ "write", cReactorTransport_write, METH_VARARGS, "write" },
{ "loseConnection", cReactorTransport_loseConnection, METH_VARARGS, "loseConnection" },
{ "getPeer", cReactorTransport_getPeer, METH_VARARGS, "getPeer" },
{ "getHost", cReactorTransport_getHost, METH_VARARGS, "getHost" },
{ "registerProducer", cReactorTransport_registerProducer, METH_VARARGS, "registerProducer" },
{ "unregisterProducer", cReactorTransport_unregisterProducer, METH_VARARGS, "unregisterProducer" },
{ NULL, NULL, METH_VARARGS, NULL },
};
/*
 * Attribute lookup for transport objects.
 *
 * Resolution order:
 *   1. a method from cReactorTransport_methods;
 *   2. "__implements__"  - the shared interface value (new reference);
 *   3. "disconnecting"   - integer 1 when the state is CLOSING or beyond,
 *                          otherwise 0.
 *
 * Any other name raises AttributeError with the attribute name as message.
 */
static PyObject *
cReactorTransport_getattr(PyObject *self, char *attr)
{
    cReactorTransport *transport = (cReactorTransport *)self;
    PyObject *found = Py_FindMethod(cReactorTransport_methods, self, attr);

    if (found != NULL)
    {
        return found;
    }

    /* Not a method: discard the lookup error and try the data attributes. */
    PyErr_Clear();

    if (!strcmp(attr, "__implements__"))
    {
        Py_INCREF(cReactorTransport__implements__);
        return cReactorTransport__implements__;
    }

    if (!strcmp(attr, "disconnecting"))
    {
        long closing = (transport->state >= CREACTOR_TRANSPORT_STATE_CLOSING);
        return PyInt_FromLong(closing);
    }

    PyErr_SetString(PyExc_AttributeError, attr);
    return NULL;
}
/*
 * repr(transport): always the fixed string "<cReactorTransport>"; the
 * instance itself is not inspected.
 */
static PyObject *
cReactorTransport_repr(PyObject *self)
{
    PyObject *text;

    UNUSED(self);

    text = PyString_FromString("<cReactorTransport>");
    return text;
}
/* Type object for transports.  The initializer is positional; the slot names
 * in the comments follow the Python 2 PyTypeObject field order (verify
 * against the Python headers in use).  ob_type is left NULL here and is
 * assigned in cReactorTransport_New(). */
static PyTypeObject cReactorTransportType =
{
PyObject_HEAD_INIT(NULL)
0, /* ob_size */
"cReactorTransport", /* tp_name */
sizeof(cReactorTransport), /* tp_basicsize */
0, /* tp_itemsize */
cReactorTransport_dealloc, /* tp_dealloc */
NULL, /* tp_print */
cReactorTransport_getattr, /* tp_getattr */
NULL, /* tp_setattr */
NULL, /* tp_compare */
cReactorTransport_repr, /* tp_repr */
NULL, /* tp_as_number */
NULL, /* tp_as_sequence */
NULL, /* tp_as_mapping */
NULL, /* tp_hash */
NULL, /* tp_call */
NULL, /* tp_str */
NULL, /* tp_getattro */
NULL, /* tp_setattro */
NULL, /* tp_as_buffer */
0, /* tp_flags */
NULL, /* tp_doc */
NULL, /* tp_traverse */
NULL, /* tp_clear */
NULL, /* tp_richcompare */
0, /* tp_weaklistoffset */
};