#include "jabberd.h"

#include <string.h>
#include <unistd.h>
/*
 * Fork a child that runs args[0] with its stdin and stdout attached to pipes.
 *
 * args : NULL-terminated argv array; args[0] is the program path. If the path
 *        contains a '/', the child changes into that directory first and
 *        executes the final path component relative to it.
 * in   : receives the descriptor the parent reads the child's stdout from.
 * out  : receives the descriptor the parent writes to reach the child's stdin.
 *
 * Returns the child's pid in the parent. If the pipes or the fork cannot be
 * created, returns r_ERR with *in and *out set to -1 and no descriptors left
 * open. The child never returns from this function: it either becomes the new
 * program or terminates with status 1.
 */
int exec_and_capture(char* const args[], int* in, int* out)
{
    int left_fds[2];    /* parent writes [1] -> child stdin reads [0]  */
    int right_fds[2];   /* child stdout writes [1] -> parent reads [0] */
    int pid;
    int rc;
    char *filename;
    char *slash;

    /* Never leave stale descriptors with the caller on a failure path. */
    *in = -1;
    *out = -1;

    if (pipe(left_fds) < 0)
        return r_ERR;
    if (pipe(right_fds) < 0)
    {
        close(left_fds[0]);
        close(left_fds[1]);
        return r_ERR;
    }

    pid = fork();
    if (pid < 0)
    {
        close(left_fds[0]);
        close(left_fds[1]);
        close(right_fds[0]);
        close(right_fds[1]);
        return r_ERR;
    }

    if (pid > 0)
    {
        /* Parent: keep only its own ends of the two pipes. */
        close(left_fds[0]);
        close(right_fds[1]);
        *in = right_fds[0];
        *out = left_fds[1];
        return pid;
    }

    /*
     * Child from here on. Every failure uses _exit() so that the parent's
     * atexit handlers and buffered stdio output are not run/flushed twice.
     */
    close(left_fds[1]);
    close(right_fds[0]);

    if (left_fds[0] != 0)
    {
        if (dup2(left_fds[0], 0) < 0)
            _exit(1);
        close(left_fds[0]);
    }
    if (right_fds[1] != 1)
    {
        if (dup2(right_fds[1], 1) < 0)
            _exit(1);
        close(right_fds[1]);
    }

    if (args == NULL || args[0] == NULL)
        _exit(1);

    filename = args[0];
    slash = strrchr(args[0], '/');
    if (slash != NULL)
    {
        filename = slash + 1;
        if (slash == args[0])
        {
            /* "/prog": the directory part is the root itself. */
            rc = chdir("/");
        }
        else
        {
            /* Cut the path at the last '/' only for the duration of chdir(),
             * so the program still receives the full path as argv[0]. */
            *slash = '\0';
            rc = chdir(args[0]);
            *slash = '/';
        }
        if (rc < 0)
            _exit(1);
    }

    execv(filename, args);
    /* execv() only returns on failure. */
    _exit(1);

    return 0;   /* not reached */
}
/*
 * Split a command line on spaces into a NULL-terminated argv-style array.
 *
 * p      : pool handed to pstrdup()/pmalloco() for the string copy and the
 *          returned array.
 * cmdstr : command line to split; it is not modified (a copy is tokenized).
 *
 * Runs of spaces act as a single separator and leading/trailing spaces are
 * ignored, so "a  b " yields {"a", "b", NULL}. There is no fixed limit on the
 * number of tokens. Only the space character separates tokens; no quoting or
 * escaping is interpreted.
 *
 * Returns NULL if cmdstr is NULL (or cannot be copied); otherwise an array
 * whose first element is the program name, or NULL if cmdstr held no tokens.
 */
char** tokenize_args(pool p, const char* cmdstr)
{
    char** result = NULL;
    char* data = NULL;
    char* cur = NULL;
    int tokencnt = 0;
    int i = 0;

    if (cmdstr == NULL)
        return NULL;

    data = pstrdup(p, cmdstr);
    if (data == NULL)
        return NULL;

    /* Pass 1: count the tokens so the array can be sized exactly. */
    cur = data;
    while (*cur != '\0')
    {
        while (*cur == ' ')
            cur++;
        if (*cur == '\0')
            break;
        tokencnt++;
        while (*cur != '\0' && *cur != ' ')
            cur++;
    }

    result = pmalloco(p, (tokencnt + 1) * sizeof(char*));

    /* Pass 2: record the start of each token and terminate it in place. */
    cur = data;
    while (*cur != '\0')
    {
        while (*cur == ' ')
            cur++;
        if (*cur == '\0')
            break;
        result[i++] = cur;
        while (*cur != '\0' && *cur != ' ')
            cur++;
        if (*cur == ' ')
            *cur++ = '\0';
    }
    result[i] = NULL;

    return result;
}
/*
 * State of an exec'd component process, as used in this file:
 *   p_OPEN    - initial state assigned by base_exec_config.
 *   p_CLOSED  - assigned when the stream root carries the wrong namespace;
 *               base_exec_process_io then destroys the write queue instead of
 *               launching the process again.
 *   p_RESTART - assigned on XSTREAM_CLOSE / XSTREAM_ERR; makes
 *               base_exec_process_io leave its loop and launch the process
 *               again.
 */
typedef enum { p_OPEN, p_CLOSED, p_RESTART } pstate;
/*
 * Per-instance bookkeeping for one exec'd process. A single object is
 * allocated in base_exec_config and handed to both the I/O thread
 * (base_exec_process_io) and the delivery handler (base_exec_deliver).
 */
typedef struct
{
char** args; /* NULL-terminated argv produced by tokenize_args */
int pid; /* value returned by exec_and_capture */
pstate state; /* see pstate */
pool mempool; /* set to the instance's pool (id->p) */
instance inst; /* owning instance */
int in; /* descriptor the I/O thread reads from */
int out; /* descriptor the I/O thread writes to */
pth_msgport_t write_queue; /* packets queued by base_exec_deliver */
pth_event_t e_write; /* PTH_EVENT_MSG event on write_queue; created on XSTREAM_ROOT */
pth_event_t e_read; /* readable-fd event on 'in' */
pth_event_t events; /* event ring passed to pth_wait */
} *process_info, _process_info;
/*
 * Message placed on process_info.write_queue. 'head' is the first member so
 * a pointer to this struct can be cast to pth_message_t* for
 * pth_msgport_put / pth_msgport_get.
 */
typedef struct
{
pth_message_t head; /* message header; must stay first (see cast above) */
dpacket packet; /* packet to be written to the process */
} *process_write_buf, _process_write_buf;
/*
 * Delivery handler: queue packet p for the I/O thread to write to the process.
 *
 * i    : instance the packet was delivered to (not used here).
 * p    : packet to queue; the queue entry is allocated with pmalloco from p->p.
 * args : the process_info registered together with this handler.
 *
 * Always returns r_DONE.
 */
result base_exec_deliver(instance i, dpacket p, void* args)
{
    process_info proc = (process_info)args;
    process_write_buf entry = pmalloco(p->p, sizeof(_process_write_buf));

    entry->packet = p;
    pth_msgport_put(proc->write_queue, (pth_message_t*)entry);

    return r_DONE;
}
/*
 * xstream callback for data read from the process (installed through
 * xstream_new in base_exec_process_io).
 *
 * type : XSTREAM_ROOT, XSTREAM_NODE, XSTREAM_CLOSE or XSTREAM_ERR.
 * x    : node belonging to the event.
 * arg  : the process_info of the process being read.
 *
 * XSTREAM_ROOT  - checks the root's xmlns. On a mismatch an error is written
 *                 to the process and the state becomes p_CLOSED. Otherwise a
 *                 stream header is written back and the write-queue event is
 *                 created and joined with the read event.
 * XSTREAM_NODE  - wraps the node in a dpacket and delivers it.
 * XSTREAM_CLOSE,
 * XSTREAM_ERR   - frees the node and sets the state to p_RESTART.
 */
void base_exec_handle_xstream_event(int type, xmlnode x, void* arg)
{
    process_info proc = (process_info)arg;
    xmlnode reply_node;
    char* reply_text;

    if (type == XSTREAM_ROOT)
    {
        if (j_strcmp(xmlnode_get_attrib(x, "xmlns"), "jabber:component:exec") != 0)
        {
            /* Wrong namespace: report it to the process and shut down. */
            log_warn(proc->inst->id, "Recv'd invalid namespace. Stopping component.");
            MIO_WRITE_FUNC(proc->out, SERROR_NAMESPACE, strlen(SERROR_NAMESPACE));
            proc->state = p_CLOSED;
            xmlnode_free(x);
            return;
        }

        /* Answer with our own stream header. */
        reply_node = xstream_header("jabber:component:exec",NULL, proc->inst->id);
        reply_text = xstream_header_char(reply_node);
        MIO_WRITE_FUNC(proc->out, reply_text, strlen(reply_text));
        xmlnode_free(reply_node);

        /* From now on the I/O thread also wakes up for queued packets. */
        proc->e_write = pth_event(PTH_EVENT_MSG, proc->write_queue);
        proc->events = pth_event_concat(proc->e_read, proc->e_write, NULL);

        xmlnode_free(x);
    }
    else if (type == XSTREAM_NODE)
    {
        deliver(dpacket_new(x), proc->inst);
    }
    else if (type == XSTREAM_CLOSE || type == XSTREAM_ERR)
    {
        xmlnode_free(x);
        proc->state = p_RESTART;
    }
}
void* base_exec_process_io(void* threadarg)
{
process_info pi = (process_info)threadarg;
int retcode = 0;
char readbuf[1024];
int readlen = 0;
xstream xs;
process_write_buf pwb;
char* writebuf;
pi->e_read = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE, pi->in);
pi->events = pth_event_concat(pi->e_read, NULL);
xs = xstream_new(pi->mempool, base_exec_handle_xstream_event, threadarg);
while (pth_wait(pi->events) > 0)
{
if (pth_event_occurred(pi->e_read))
{
readlen = MIO_READ_FUNC(pi->in, readbuf, sizeof(readbuf));
if (readlen <= 0)
{
log_debug(ZONE,"base_exec_process_io Read error on process!\n");
break;
}
if (xstream_eat(xs, readbuf, readlen) > XSTREAM_NODE)
break;
if (pi->state == p_CLOSED || pi->state == p_RESTART)
break;
}
if (pth_event_occurred(pi->e_write))
{
pwb = (process_write_buf)pth_msgport_get(pi->write_queue);
if(pwb == NULL) break;
writebuf = xmlnode2tstr(pwb->packet->x);
if (MIO_WRITE_FUNC(pi->out, writebuf, strlen(writebuf)) < 0)
{
log_debug(ZONE,"base_exec_process_io Write error.\n");
pool_free(pwb->packet->p);
break;
}
pool_free(pwb->packet->p);
}
}
close(pi->out);
close(pi->in);
pth_event_free(pi->e_read, PTH_FREE_THIS);
pth_event_free(pi->e_write, PTH_FREE_THIS);
pth_waitpid(pi->pid, &retcode, 0);
if (pi->state != p_CLOSED)
{
pi->pid = exec_and_capture(pi->args, &(pi->in), &(pi->out));
pth_spawn(PTH_ATTR_DEFAULT, base_exec_process_io, (void*) pi);
}
else
{
pth_msgport_destroy(pi->write_queue);
}
return NULL;
}
/*
 * Configuration handler for the "exec" tag.
 *
 * id  : NULL during the validation pass; otherwise the instance being
 *       configured.
 * x   : the configuration node; its character data is the command line.
 * arg : not used.
 *
 * Validation pass: returns r_PASS when x carries a command line, otherwise
 * sets an "error" attribute on x and returns r_ERR.
 * Configuration pass: allocates the process_info from the instance pool,
 * launches the process, spawns its I/O thread, registers the delivery handler
 * and returns r_DONE.
 */
result base_exec_config(instance id, xmlnode x, void *arg)
{
    process_info proc = NULL;

    if (id != NULL)
    {
        /* Real configuration: set up the bookkeeping and start the process. */
        proc = pmalloco(id->p, sizeof(_process_info));
        proc->inst = id;
        proc->mempool = id->p;
        proc->write_queue = pth_msgport_create(id->id);
        proc->state = p_OPEN;

        proc->args = tokenize_args(proc->mempool, xmlnode_get_data(x));
        proc->pid = exec_and_capture(proc->args, &(proc->in), &(proc->out));

        pth_spawn(PTH_ATTR_DEFAULT, base_exec_process_io, (void*) proc);
        register_phandler(id, o_DELIVER, base_exec_deliver, (void*) proc);

        log_debug(ZONE,"base_exec_config performing configuration %s\n",xmlnode2str(x));
        return r_DONE;
    }

    /* Validation only: the tag has to contain something to run. */
    if (xmlnode_get_data(x) != NULL)
    {
        log_debug(ZONE,"base_exec_config validating configuration\n");
        return r_PASS;
    }

    log_debug(ZONE,"base_exec_config error: no script provided\n");
    xmlnode_put_attrib(x,"error","'exec' tag must contain a command line to run");
    return r_ERR;
}
/*
 * Module entry point: registers base_exec_config as the handler for the
 * "exec" configuration tag.
 */
void base_exec(void)
{
    log_debug(ZONE,"base_exec loading...\n");

    /* No extra argument is passed along to the config handler. */
    register_config("exec", base_exec_config, NULL);
}