#include "jabberd.h"
/*
 * Handle of the stdin/stdout I/O thread. It is assigned by
 * base_stdout_config() when it spawns base_stdoutin(), and is the target of
 * the SIGUSR2 raised by base_stdout_shutdown(). NULL means no thread handle
 * has been stored yet (or the spawn did not return one).
 */
pth_t main__thread = NULL;
/*
 * Queue entry used to hand one outbound packet to the I/O thread through a
 * Pth message port.
 *
 * `head` must stay the first member: entries are cast to pth_message_t *
 * when they are put on the port and cast back to `drop` when they are taken
 * off it again.
 *
 * `drop` is the pointer type, `_drop` the struct type itself (used for
 * sizeof when allocating an entry).
 */
typedef struct
{
pth_message_t head; /* Pth message header; keep first (see above) */
dpacket p;          /* the packet waiting to be written to stdout */
} *drop, _drop;
/*
 * Heartbeat callback that watches the parent process.
 *
 * The parent PID seen on the first invocation is remembered; on every call
 * the current parent PID is compared against it. If they differ, an alert is
 * logged and the whole process exits with status 1.
 *
 * arg: unused.
 * Returns r_DONE whenever the parent PID is unchanged.
 */
result base_stdout_heartbeat(void *arg)
{
    static int original_ppid = 0;   /* 0 == not recorded yet */

    if(original_ppid == 0)
    {
        original_ppid = getppid();
    }

    if(original_ppid == getppid())
    {
        return r_DONE;
    }

    log_alert("stdout","Parent PID has changed, Server Exiting");
    exit(1);

    return r_DONE;  /* not reached; keeps every path returning a value */
}
/*
 * Packet handler: queues a packet for the I/O thread instead of writing it
 * to stdout directly.
 *
 * i:   instance the handler was registered on (unused here).
 * p:   packet to send; the queue entry is allocated from the packet's own
 *      pool (p->p), so it lives exactly as long as the packet does.
 * arg: the pth_msgport_t the I/O thread is listening on.
 *
 * Always returns r_DONE.
 */
result base_stdout_phandler(instance i, dpacket p, void *arg)
{
    drop entry;
    pth_msgport_t port = (pth_msgport_t)arg;

    log_debug(ZONE,"stdout packet being queued");

    /* wrap the packet in a message the port can carry */
    entry = pmalloco(p->p, sizeof(*entry));
    entry->p = p;

    /* head is the first member, so this is the same address as entry */
    pth_msgport_put(port, &entry->head);

    return r_DONE;
}
/*
 * xstream callback for data parsed from stdin.
 *
 * type: the XSTREAM_* event code reported by the stream parser.
 * x:    the node that belongs to the event.
 * arg:  unused.
 *
 * A complete node (XSTREAM_NODE) is wrapped in a dpacket and handed to
 * deliver(); the node is not freed here in that case. For every other event,
 * including the stream root, the node is freed.
 */
void base_stdin_packets(int type, xmlnode x, void *arg)
{
    if(type == XSTREAM_NODE)
    {
        log_debug(ZONE,"stdin incoming packet");
        deliver(dpacket_new(x), NULL);
        return;
    }

    if(type == XSTREAM_ROOT)
    {
        log_debug(ZONE,"stdin opened stream");
    }

    /* root element and anything unrecognised: nothing to keep */
    xmlnode_free(x);
}
void *base_stdoutin(void *arg)
{
pth_msgport_t mp = (pth_msgport_t)arg;
xstream xs;
int len;
char buff[1024], *block;
dpacket p = NULL;
drop d;
xmlnode x;
pth_event_t eread, emp, ering;
pool xsp;
int sig;
sigset_t sigs;
pth_event_t esig;
sigemptyset(&sigs);
sigaddset(&sigs,SIGUSR2);
log_debug(ZONE,"io thread starting");
x = xstream_header("jabber:component:exec",NULL,NULL);
block = xstream_header_char(x);
MIO_WRITE_FUNC(STDOUT_FILENO,block,strlen(block));
xmlnode_free(x);
xsp = pool_new();
xs = xstream_new(xsp, base_stdin_packets, NULL);
eread = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE,STDIN_FILENO);
emp = pth_event(PTH_EVENT_MSG,mp);
esig = pth_event(PTH_EVENT_SIGS,&sigs,&sig);
ering = pth_event_concat(esig,eread, emp, NULL);
while(pth_wait(ering) > 0)
{
if(pth_event_occurred(esig))
{
break;
}
if(pth_event_occurred(eread))
{
log_debug(ZONE,"stdin read event");
len = MIO_READ_FUNC(STDIN_FILENO, buff, 1024);
if(len <= 0) break;
if(xstream_eat(xs, buff, len) > XSTREAM_NODE) break;
}
if(pth_event_occurred(emp))
{
log_debug(ZONE,"io incoming message event for stdout");
d = (drop)pth_msgport_get(mp);
p = d->p;
block = xmlnode2str(p->x);
if(MIO_WRITE_FUNC(STDOUT_FILENO, block, strlen(block)) <= 0)
break;
pool_free(p->p);
p = NULL;
}
}
log_debug(ZONE,"thread exiting");
pth_event_free(ering, PTH_FREE_ALL);
pth_msgport_destroy(mp);
pool_free(xsp);
return NULL;
}
void base_stdout_shutdown(void *arg)
{
drop d;
pth_msgport_t mp=(pth_msgport_t)arg;
while((d = (drop)pth_msgport_get(mp)) != NULL)
{
pool_free(d->p->p);
}
if(main__thread!=NULL) pth_raise(main__thread,SIGUSR2);
}
/*
 * Configuration handler for the "stdout" element.
 *
 * id:  instance being configured, or NULL.
 * x:   the configuration node (not examined here).
 * arg: unused.
 *
 * With id == NULL only the parent-PID heartbeat is registered (interval
 * argument 2 -- presumably seconds, confirm against register_beat) and
 * r_PASS is returned.
 *
 * With a real instance, the first call creates the shared message port,
 * spawns the I/O thread (storing its handle in main__thread) and registers
 * base_stdout_shutdown() as a cleanup on that instance's pool. Every call
 * then registers base_stdout_phandler() on the instance with the shared port
 * as its argument and returns r_DONE.
 */
result base_stdout_config(instance id, xmlnode x, void *arg)
{
    static pth_msgport_t out_port = NULL;   /* shared by every instance */

    if(id != NULL)
    {
        log_debug(ZONE,"base_stdout_config performing configuration");

        /* one port and one I/O thread, set up on first use */
        if(out_port == NULL)
        {
            out_port = pth_msgport_create("base_stdout");
            main__thread = pth_spawn(PTH_ATTR_DEFAULT, base_stdoutin, (void *)out_port);
            pool_cleanup(id->p, base_stdout_shutdown, (void*)out_port);
        }

        register_phandler(id, o_DELIVER, base_stdout_phandler, (void *)out_port);
        return r_DONE;
    }

    /* no instance supplied */
    register_beat(2,base_stdout_heartbeat,NULL);
    return r_PASS;
}
/*
 * Module entry point: logs that the module is loading and registers
 * base_stdout_config() as the handler for the "stdout" configuration
 * element.
 */
void base_stdout(void)
{
    log_debug(ZONE,"base_stdout loading...\n");

    register_config("stdout",base_stdout_config,NULL);
}