#include "jabberd.h"
extern pool jabberd__runtime;
/*
 * Values stored in accept_instance.state.
 * A_ERROR is assigned when the bound socket reports an error or is closed;
 * A_READY is assigned once a connecting component passes the handshake.
 */
#define A_ERROR -1
#define A_READY 1
/*
 * One entry in the singly linked list of packets waiting for a component
 * to connect. base_accept_queue() allocates each entry from the pool of
 * the packet it wraps, so the entry goes away together with that packet.
 */
typedef struct queue_struct
{
/* time(NULL) at the moment the packet was queued; compared against the timeout */
int stamp;
/* the packet being held */
xmlnode x;
/* next entry in the list, or NULL at the tail */
struct queue_struct *next;
} *queue, _queue;
/*
 * Per-<accept/> state, created once in base_accept_config() and passed as
 * the opaque argument to the mio callback, the delivery handler and the
 * heartbeat.
 */
typedef struct accept_instance_st
{
/* connection currently bound to this instance; set after a good handshake, NULL after MIO_CLOSED */
mio m;
/* A_READY or A_ERROR once a connection has been seen (initial value comes from pmalloco) */
int state;
/* stream id taken from the header we sent; hashed together with secret to verify the handshake */
char *id;
/* pool used for allocations owned by this instance (the owning instance's pool) */
pool p;
/* the jabberd instance this accept section belongs to */
instance i;
/* <ip/> to listen on, as read from the config (may be NULL) */
char *ip;
/* <secret/> shared with the connecting component */
char *secret;
/* <port/> to listen on */
int port;
/* <timeout/>: max queue age in time() units (seconds); also the heartbeat interval. Defaults to 10 */
int timeout;
/* non-zero when <restrict/> is configured: incoming packets must come from a host routed to this instance */
int restrict_var;
/* xdb cache used for offline storage; only set when <offline/> is configured */
xdbcache offline;
/* jid built from the instance id, used as the owner of the offline storage */
jid offjid;
/* head of the list of packets waiting for a component to connect (newest first) */
queue q;
} *accept_instance, _accept_instance;
/*
 * Hold a packet until a component connects.
 *
 * The list entry is allocated from the packet's own pool and pushed onto
 * the head of ai->q, stamped with the current time so the heartbeat can
 * expire it later. Does nothing when either argument is NULL.
 */
void base_accept_queue(accept_instance ai, xmlnode x)
{
    queue entry;

    if(ai != NULL && x != NULL)
    {
        entry = pmalloco(xmlnode_pool(x), sizeof(_queue));
        entry->x = x;
        entry->stamp = time(NULL);

        /* push on the front of the list */
        entry->next = ai->q;
        ai->q = entry;
    }
}
/*
 * Delivery handler registered for the instance.
 *
 * arg is the accept_instance. When a component is connected and
 * authenticated (state == A_READY) the packet is written straight to its
 * socket; otherwise it is parked on the pending queue. Always reports
 * r_DONE.
 */
result base_accept_deliver(instance i, dpacket p, void* arg)
{
    accept_instance ai = (accept_instance)arg;

    if (ai->state != A_READY)
        base_accept_queue(ai, p->x);   /* nobody to send to yet */
    else
        mio_write(ai->m, p->x, NULL, 0);

    return r_DONE;
}
/*
 * mio callback for sockets accepted on the configured port.
 *
 *   m     - the connection the event belongs to
 *   state - MIO_XML_ROOT, MIO_XML_NODE, MIO_ERROR or MIO_CLOSED
 *   arg   - the accept_instance this listener was created for
 *   x     - the parsed node for the XML events
 *
 * MIO_XML_ROOT:  checks the stream namespace and answers with our own
 *                stream header, remembering its id for the handshake.
 * MIO_XML_NODE:  on the bound, authenticated connection the packet is
 *                delivered into the server. On any other connection the
 *                node must be a <handshake/> whose content matches the hash
 *                of stream id + secret; on success the connection becomes
 *                the bound one and stored/queued packets are flushed to it.
 * MIO_ERROR:     bounces whatever mio still had queued for the socket.
 * MIO_CLOSED:    unbinds the socket.
 *
 * Paths that leave the switch with `break` free x at the bottom; paths that
 * hand x on (mio_write / deliver) or never owned a node `return` directly.
 */
void base_accept_process_xml(mio m, int state, void* arg, xmlnode x)
{
    accept_instance ai = (accept_instance)arg;
    xmlnode cur, off;
    queue q, q2;
    char hashbuf[41]; /* presumably 40 hex digits of a SHA-1 digest plus NUL - confirm against shahash_r */
    jpacket jp;

    log_debug(ZONE, "process XML: m:%X state:%d, arg:%X, x:%X", m, state, arg, x);

    switch(state)
    {
    case MIO_XML_ROOT:
        /* only the component-accept namespace is spoken here */
        if (j_strcmp(xmlnode_get_attrib(x, "xmlns"), "jabber:component:accept") != 0)
        {
            log_warn(ai->i->id, "Recv'd invalid namespace. Closing connection.");
            mio_write(m, NULL, SERROR_NAMESPACE, -1);
            mio_close(m);
            break;
        }

        /* reply with our header and keep its id: the handshake hash is built from it */
        cur = xstream_header("jabber:component:accept", NULL, ai->i->id);
        ai->id = pstrdup(ai->p, xmlnode_get_attrib(cur, "id"));
        mio_write(m, NULL, xstream_header_char(cur), -1);
        xmlnode_free(cur);
        break;

    case MIO_XML_NODE:
        /* traffic from the already authenticated connection */
        if(ai->state == A_READY && m == ai->m)
        {
            xmlnode_hide_attrib(x, "etherx:to");
            xmlnode_hide_attrib(x, "etherx:from");

            if(ai->restrict_var)
            {
                /* restricted components may only send well-formed packets
                 * whose sender host is routed to this very instance */
                jp = jpacket_new(x);
                if(jp->type == JPACKET_UNKNOWN || jp->to == NULL || jp->from == NULL || deliver_hostcheck(jp->from->server) != ai->i)
                {
                    jutil_error(x,TERROR_INTERNAL);
                    mio_write(m,x,NULL,0);
                    return;
                }
            }

            deliver(dpacket_new(x), ai->i);
            return;
        }

        /* anything else on an unauthenticated connection must be the handshake */
        if(j_strcmp(xmlnode_get_name(x), "handshake") != 0)
        {
            mio_write(m, NULL, "<stream:error>Must send handshake first.</stream:error>", -1);
            mio_close(m);
            break;
        }

        /* expected digest: hash of stream id followed by the shared secret */
        shahash_r(spools(xmlnode_pool(x), ai->id, ai->secret, xmlnode_pool(x)), hashbuf);
        if(j_strcmp(hashbuf, xmlnode_get_data(x)) != 0)
        {
            mio_write(m, NULL, "<stream:error>Invalid handshake</stream:error>", -1);
            mio_close(m);
            /* Stop here. Without this break a peer with the wrong secret
             * would still be acknowledged, bound as the active connection
             * and handed all stored and queued packets. */
            break;
        }

        /* handshake accepted */
        mio_write(m, NULL, "<handshake/>", -1);

        /* a new authenticated connection replaces any previous one */
        if(ai->m != NULL)
        {
            log_warn(ai->i->id, "Socket override by another connection from %s",mio_ip(m));
            mio_write(ai->m, NULL, "<stream:error>Socket override by another connection.</stream:error>", -1);
            mio_close(ai->m);
        }

        ai->m = m;
        ai->state = A_READY;

        /* replay packets that were stored offline, then write back the emptied container */
        if(ai->offline != NULL)
        {
            off = xdb_get(ai->offline, ai->offjid, "base:accept:offline");
            for(cur = xmlnode_get_firstchild(off); cur != NULL; cur = xmlnode_get_nextsibling(cur))
            {
                /* send a copy; the stored child is hidden so it is not saved again */
                mio_write(m,xmlnode_dup(cur),NULL,0);
                xmlnode_hide(cur);
            }
            xdb_set(ai->offline, ai->offjid, "base:accept:offline", off);
            xmlnode_free(off);
        }

        /* flush packets that were queued in memory while nobody was connected */
        q = ai->q;
        while(q != NULL)
        {
            /* each entry lives in its packet's pool: read next before handing the packet away */
            q2 = q->next;
            mio_write(m, q->x, NULL, 0);
            q = q2;
        }
        ai->q = NULL;
        break;

    case MIO_ERROR:
        /* errors on sockets that never became the bound connection are ignored */
        if(m != ai->m)
            return;

        ai->state = A_ERROR;

        /* bounce everything mio still had pending for this socket */
        while((cur = mio_cleanup(m)) != NULL)
            deliver_fail(dpacket_new(cur), "External Server Error");
        return;

    case MIO_CLOSED:
        if(m != ai->m)
            return;

        log_debug(ZONE,"closing accepted socket");
        ai->m = NULL;
        ai->state = A_ERROR;
        return;

    default:
        return;
    }

    xmlnode_free(x);
}
/*
 * Dispose of a packet that sat in the queue for too long.
 *
 * When offline storage is configured, messages and subscription packets
 * are inserted into the "base:accept:offline" xdb namespace and the packet
 * is freed once xdb_act() returns 0. Every other case (no offline storage,
 * other packet types, or a non-zero xdb_act() result) bounces the packet
 * with "Internal Timeout".
 */
void base_accept_offline(accept_instance ai, xmlnode x)
{
    jpacket p;
    int stored = 0;

    if(ai->offline != NULL)
    {
        p = jpacket_new(x);

        /* only these two packet types are worth keeping for later */
        if(p->type == JPACKET_MESSAGE || p->type == JPACKET_S10N)
            stored = (xdb_act(ai->offline, ai->offjid, "base:accept:offline", "insert", NULL, x) == 0);

        if(stored)
        {
            xmlnode_free(x);
            return;
        }
    }

    deliver_fail(dpacket_new(x),"Internal Timeout");
}
/*
 * Heartbeat: expire packets that have waited longer than ai->timeout.
 *
 * Pass one unlinks every expired entry from ai->q and pushes it onto a
 * private list. Pass two hands each expired packet to
 * base_accept_offline(). The work is split in two because each entry is
 * allocated from its packet's pool, so it may disappear as soon as the
 * packet is disposed of. Always returns r_DONE.
 */
result base_accept_beat(void *arg)
{
    accept_instance ai = (accept_instance)arg;
    queue *link = &ai->q;      /* the pointer that currently refers to `entry` */
    queue expired = NULL;
    queue entry, following;
    int now = time(NULL);

    while((entry = *link) != NULL)
    {
        if((now - entry->stamp) > ai->timeout)
        {
            /* unlink and move to the front of the expired list */
            *link = entry->next;
            entry->next = expired;
            expired = entry;
        }
        else
        {
            /* still fresh, step over it */
            link = &entry->next;
        }
    }

    for(entry = expired; entry != NULL; entry = following)
    {
        /* grab the successor first: the entry may not survive the call */
        following = entry->next;
        base_accept_offline(ai, entry->x);
    }

    return r_DONE;
}
/*
 * Config handler for the <accept/> section.
 *
 * Called with id == NULL to validate the section: it needs a non-zero
 * <port/> and a <secret/> tag. Returns r_PASS, or r_ERR with an "error"
 * attribute set on x.
 *
 * Called with a real instance to apply it: builds the accept_instance from
 * the config, starts listening on ip/port, registers the delivery handler
 * and the queue-expiry heartbeat. Returns r_DONE, or r_ERR with an "error"
 * attribute set on x when the listener cannot be created.
 */
result base_accept_config(instance id, xmlnode x, void *arg)
{
    accept_instance inst;
    char *secret;
    int port;

    port = j_atoi(xmlnode_get_tag_data(x, "port"),0);
    secret = xmlnode_get_tag_data(x, "secret");

    /* validation pass */
    if(id == NULL)
    {
        log_debug(ZONE,"base_accept_config validating configuration...");

        if (port != 0 && xmlnode_get_tag(x,"secret") != NULL)
            return r_PASS;

        xmlnode_put_attrib(x,"error","<accept> requires the following subtags: <port>, and <secret>");
        return r_ERR;
    }

    log_debug(ZONE,"base_accept_config performing configuration %s\n",xmlnode2str(x));

    /* per-instance state lives in the instance's pool */
    inst = pmalloco(id->p, sizeof(_accept_instance));
    inst->i = id;
    inst->p = id->p;
    inst->ip = xmlnode_get_tag_data(x,"ip");
    inst->port = port;
    inst->secret = secret;
    inst->timeout = j_atoi(xmlnode_get_tag_data(x, "timeout"),10);

    if(xmlnode_get_tag(x,"restrict") != NULL)
    {
        inst->restrict_var = 1;
    }

    /* offline storage is optional */
    if(xmlnode_get_tag(x,"offline") != NULL)
    {
        inst->offline = xdb_cache(id);
        inst->offjid = jid_new(id->p,id->id);
    }

    if(mio_listen(inst->port, inst->ip, base_accept_process_xml, (void*)inst, NULL, mio_handlers_new(NULL, NULL, MIO_XML_PARSER)) == NULL)
    {
        xmlnode_put_attrib(x,"error","<accept> unable to listen on the configured ip and port");
        return r_ERR;
    }

    /* packets routed to this instance go through us; the beat expires stale queue entries */
    register_phandler(id, o_DELIVER, base_accept_deliver, (void *)inst);
    register_beat(inst->timeout, base_accept_beat, (void *)inst);

    return r_DONE;
}
/*
 * Module entry point: registers base_accept_config() as the handler for
 * <accept/> configuration sections.
 */
void base_accept(void)
{
    log_debug(ZONE,"base_accept loading...\n");

    register_config("accept",base_accept_config,NULL);
}