#include "router.h"
#define MAX_JID 3072 // node(1023) + '@'(1) + domain(1023) + '/'(1) + resource(1023) + '\0'(1)
#define MAX_MESSAGE 65535
#define SECS_PER_DAY 86400
#define BYTES_PER_MEG 1048576
/** Walk context handed to _router_broadcast() through xhash_walk(); filled in by _router_advertise(). */
typedef struct broadcast_st {
/* the router on whose route table the advertisement is walked */
router_t r;
/* component that triggered the advertisement; the walker skips it */
component_t src;
/* presence packet that is copied out to every other non-legacy component */
nad_t nad;
} *broadcast_t;
/**
 * xhash_walk() callback used while advertising a name.
 *
 * val is the component owning the visited route and arg is the broadcast
 * context. A copy of the context's packet is written to the component unless
 * it is the advertisement's source or is flagged as legacy.
 */
static void _router_broadcast(xht routes, const char *key, void *val, void *arg) {
    broadcast_t ctx = (broadcast_t) arg;
    component_t peer = (component_t) val;
    nad_t copy;

    /* deliver only to non-legacy components other than the source */
    if(peer != ctx->src && !peer->legacy) {
        copy = nad_copy(ctx->nad);
        sx_nad_write(peer->s, copy);
    }
}
/**
 * Tell every route in r->routes (via _router_broadcast) that a domain has
 * come online, or gone offline when unavail is non-zero.
 *
 * Builds a <presence from='domain'/> packet in the component namespace,
 * adding type='unavailable' when unavail is set, walks the route table with
 * it and then frees the template packet.
 */
static void _router_advertise(router_t r, char *domain, component_t src, int unavail) {
    struct broadcast_st walk_ctx;
    nad_t pres;
    int comp_ns;

    log_debug(ZONE, "advertising %s to all routes (unavail=%d)", domain, unavail);

    /* build the presence template */
    pres = nad_new(src->s->nad_cache);
    comp_ns = nad_add_namespace(pres, uri_COMPONENT, NULL);
    nad_append_elem(pres, comp_ns, "presence", 0);
    nad_append_attr(pres, -1, "from", domain);
    if(unavail != 0)
        nad_append_attr(pres, -1, "type", "unavailable");

    /* hand it to every route; the walker sends copies */
    walk_ctx.r = r;
    walk_ctx.src = src;
    walk_ctx.nad = pres;
    xhash_walk(r->routes, _router_broadcast, (void *) &walk_ctx);

    nad_free(pres);
}
/**
 * xhash_walk() callback that informs one component (arg) about an existing
 * route (key). Routes owned by that same component are skipped; for all
 * others a <presence from='key'/> packet is written to the component.
 */
static void _router_advertise_reverse(xht routes, const char *key, void *val, void *arg) {
    component_t newcomer = (component_t) arg;
    component_t owner = (component_t) val;
    nad_t pres;
    int comp_ns;

    if(owner != newcomer) {
        log_debug(ZONE, "informing component about %s", key);

        pres = nad_new(newcomer->s->nad_cache);
        comp_ns = nad_add_namespace(pres, uri_COMPONENT, NULL);
        nad_append_elem(pres, comp_ns, "presence", 0);
        nad_append_attr(pres, -1, "from", (char *) key);

        sx_nad_write(newcomer->s, pres);
    }
}
/**
 * Verify a legacy component's <handshake/> packet.
 *
 * The expected value is the SHA-1 hex digest of the stream id concatenated
 * with the router's local secret. On a match the packet is emptied and sent
 * back and the stream is authenticated; on any failure a not-authorized
 * stream error is sent, the stream is closed and the packet is freed.
 *
 * @param comp component that sent the handshake
 * @param nad  the handshake packet; consumed on every path
 */
static void _router_process_handshake(component_t comp, nad_t nad) {
    char *hash;
    size_t hashlen;

    /* a sha1 hex digest is exactly 40 characters */
    if(NAD_CDATA_L(nad, 0) != 40) {
        log_debug(ZONE, "handshake isn't long enough to be a sha1 hash");
        sx_error(comp->s, stream_err_NOT_AUTHORIZED, "handshake isn't long enough to be a sha1 hash");
        sx_close(comp->s);

        nad_free(nad);
        return;
    }

    /* the buffer holds id + secret first, then is reused for the 40 char digest + NUL */
    hashlen = strlen(comp->s->id) + strlen(comp->r->local_secret) + 1;
    if(hashlen < 41)
        hashlen = 41;

    hash = malloc(hashlen);
    if(hash == NULL) {
        /* can't compute our side of the hash, so we can't authorize them */
        log_debug(ZONE, "out of memory while checking handshake");
        sx_error(comp->s, stream_err_NOT_AUTHORIZED, "unable to verify handshake");
        sx_close(comp->s);

        nad_free(nad);
        return;
    }

    snprintf(hash, hashlen, "%s%s", comp->s->id, comp->r->local_secret);

    /* hash in place */
    shahash_r(hash, hash);

    log_debug(ZONE, "checking their hash %.*s against our hash %s", 40, NAD_CDATA(nad, 0), hash);

    if(strncmp(hash, NAD_CDATA(nad, 0), 40) == 0) {
        log_debug(ZONE, "handshake succeeded");

        free(hash);

        /* respond with an empty handshake */
        nad->elems[0].icdata = nad->elems[0].itail = -1;
        nad->elems[0].lcdata = nad->elems[0].ltail = 0;
        sx_nad_write(comp->s, nad);

        sx_auth(comp->s, "handshake", comp->s->req_to);

        return;
    }

    log_debug(ZONE, "auth failed");

    free(hash);

    sx_error(comp->s, stream_err_NOT_AUTHORIZED, "hash didn't match, auth failed");
    sx_close(comp->s);

    nad_free(nad);
}
static void _router_process_bind(component_t comp, nad_t nad) {
int attr;
jid_t name;
alias_t alias;
char *user, *c;
attr = nad_find_attr(nad, 0, -1, "name", NULL);
if(attr < 0 || (name = jid_new(comp->r->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) {
log_debug(ZONE, "no or invalid 'name' on bind packet, bouncing");
nad_set_attr(nad, 0, -1, "error", "400", 3);
sx_nad_write(comp->s, nad);
return;
}
user = strdup(comp->s->auth_id);
c = strchr(user, '@');
if(c != NULL) *c = '\0';
if(strcmp(user, name->domain) != 0 && !aci_check(comp->r->aci, "bind", user)) {
log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to bind '%s', but their username (%s) is not permitted to bind other names", comp->ip, comp->port, name->domain, user);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
nad_set_attr(nad, 0, -1, "error", "403", 3);
sx_nad_write(comp->s, nad);
jid_free(name);
free(user);
return;
}
if(xhash_get(comp->r->routes, name->domain) != NULL) {
log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to bind '%s', but it's already bound", comp->ip, comp->port, name->domain);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
nad_set_attr(nad, 0, -1, "error", "409", 3);
sx_nad_write(comp->s, nad);
jid_free(name);
free(user);
return;
}
for(alias = comp->r->aliases; alias != NULL; alias = alias->next)
if(strcmp(alias->name, name->domain) == 0) {
log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to bind '%s', but that name is aliased", comp->ip, comp->port);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
nad_set_attr(nad, 0, -1, "error", "409", 3);
sx_nad_write(comp->s, nad);
jid_free(name);
free(user);
return;
}
if(nad_find_elem(nad, 0, NAD_ENS(nad, 0), "default", 1) >= 0) {
if(!aci_check(comp->r->aci, "default-route", user)) {
log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to bind '%s' as the default route, but their username (%s) is not permitted to set a default route", comp->ip, comp->port, name->domain, user);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
nad_set_attr(nad, 0, -1, "error", "403", 3);
sx_nad_write(comp->s, nad);
jid_free(name);
free(user);
return;
}
if(comp->r->default_route != NULL) {
log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to bind '%s' as the default route, but one already exists", comp->ip, comp->port, name->domain);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
nad_set_attr(nad, 0, -1, "error", "409", 3);
sx_nad_write(comp->s, nad);
jid_free(name);
return;
}
log_write(comp->r->log, LOG_NOTICE, "[%s] set as default route", name->domain);
comp->r->default_route = strdup(name->domain);
}
if(nad_find_elem(nad, 0, NAD_ENS(nad, 0), "log", 1) >= 0) {
if(!aci_check(comp->r->aci, "log", user)) {
log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to bind '%s' as a log sink, but their username (%s) is not permitted to do this", comp->ip, comp->port, name->domain, user);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
nad_set_attr(nad, 0, -1, "error", "403", 3);
sx_nad_write(comp->s, nad);
jid_free(name);
free(user);
return;
}
log_write(comp->r->log, LOG_NOTICE, "[%s] set as log sink", name->domain);
xhash_put(comp->r->log_sinks, pstrdup(xhash_pool(comp->r->log_sinks), name->domain), (void *) comp);
}
free(user);
xhash_put(comp->r->routes, pstrdup(xhash_pool(comp->r->routes), name->domain), (void *) comp);
xhash_put(comp->routes, pstrdup(xhash_pool(comp->routes), name->domain), (void *) comp);
log_write(comp->r->log, LOG_NOTICE, "[%s] online (bound to %s, port %d)", name->domain, comp->ip, comp->port);
nad_set_attr(nad, 0, -1, "name", NULL, 0);
sx_nad_write(comp->s, nad);
_router_advertise(comp->r, name->domain, comp, 0);
xhash_walk(comp->r->routes, _router_advertise_reverse, (void *) comp);
for(alias = comp->r->aliases; alias != NULL; alias = alias->next) {
if(strcmp(alias->target, name->domain) == 0) {
xhash_put(comp->r->routes, pstrdup(xhash_pool(comp->r->routes), alias->name), (void *) comp);
xhash_put(comp->routes, pstrdup(xhash_pool(comp->r->routes), alias->name), (void *) comp);
log_write(comp->r->log, LOG_NOTICE, "[%s] online (alias of '%s', bound to %s, port %d)", alias->name, name->domain, comp->ip, comp->port);
_router_advertise(comp->r, alias->name, comp, 0);
}
}
jid_free(name);
}
/**
 * Handle an <unbind/> request from a component.
 *
 * A missing or unparsable 'name' is bounced with error 400, and a name that
 * is not in this component's own route table with 404. Otherwise the name is
 * removed from the log sinks, the router-wide routes and the component's
 * routes, the default route is cleared if it was this name, the request is
 * acknowledged, and the name is advertised as unavailable.
 */
static void _router_process_unbind(component_t comp, nad_t nad) {
    jid_t target = NULL;
    int name_attr;

    name_attr = nad_find_attr(nad, 0, -1, "name", NULL);
    if(name_attr >= 0)
        target = jid_new(comp->r->pc, NAD_AVAL(nad, name_attr), NAD_AVAL_L(nad, name_attr));

    if(target == NULL) {
        log_debug(ZONE, "no or invalid 'name' on unbind packet, bouncing");
        nad_set_attr(nad, 0, -1, "error", "400", 3);
        sx_nad_write(comp->s, nad);
        return;
    }

    /* a component may only unbind names it holds itself */
    if(xhash_get(comp->routes, target->domain) == NULL) {
        log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to unbind '%s', but it's not bound to this component", comp->ip, comp->port, target->domain);
        nad_set_attr(nad, 0, -1, "name", NULL, 0);
        nad_set_attr(nad, 0, -1, "error", "404", 3);
        sx_nad_write(comp->s, nad);
        jid_free(target);
        return;
    }

    /* forget the name everywhere */
    xhash_zap(comp->r->log_sinks, target->domain);
    xhash_zap(comp->r->routes, target->domain);
    xhash_zap(comp->routes, target->domain);

    if(comp->r->default_route != NULL) {
        if(strcmp(comp->r->default_route, target->domain) == 0) {
            log_write(comp->r->log, LOG_NOTICE, "[%s] default route offline", target->domain);
            free(comp->r->default_route);
            comp->r->default_route = NULL;
        }
    }

    log_write(comp->r->log, LOG_NOTICE, "[%s] offline", target->domain);

    /* acknowledge, then let the others know */
    nad_set_attr(nad, 0, -1, "name", NULL, 0);
    sx_nad_write(comp->s, nad);

    _router_advertise(comp->r, target->domain, comp, 1);

    jid_free(target);
}
/**
 * Deliver a packet to a component.
 *
 * If the component has a throttle queue the packet is queued instead of
 * sent. Non-legacy components get the packet as is. For legacy components
 * an 'error' attribute on the outer element is turned into a stanza error
 * on element 1 (bad-request for "400", service-unavailable otherwise) and
 * only element 1 is written.
 */
static void _router_comp_write(component_t comp, nad_t nad) {
    int err_attr;
    int is_400;

    if(comp->tq != NULL) {
        log_debug(ZONE, "%s port %d is throttled, jqueueing packet", comp->ip, comp->port);
        jqueue_push(comp->tq, nad, 0);
    }
    else if(!comp->legacy) {
        sx_nad_write(comp->s, nad);
    }
    else {
        log_debug(ZONE, "packet for legacy component, munging");

        err_attr = nad_find_attr(nad, 0, -1, "error", NULL);
        if(err_attr >= 0) {
            is_400 = NAD_AVAL_L(nad, err_attr) == 3 && strncmp("400", NAD_AVAL(nad, err_attr), 3) == 0;
            if(is_400)
                stanza_error(nad, 1, stanza_err_BAD_REQUEST);
            else
                stanza_error(nad, 1, stanza_err_SERVICE_UNAVAILABLE);
        }

        /* legacy components only see the inner element */
        sx_nad_write_elem(comp->s, nad, 1);
    }
}
/**
 * xhash_walk() callback over the log sinks: writes a copy of the routed
 * packet (arg) to the sink component (val) registered under key.
 */
static void _router_route_log_sink(xht log_sinks, const char *key, void *val, void *arg) {
    component_t sink = (component_t) val;
    nad_t routed = (nad_t) arg;
    nad_t copy;

    log_debug(ZONE, "copying route to '%s' (%s, port %d)", key, sink->ip, sink->port);

    copy = nad_copy(routed);
    _router_comp_write(sink, copy);
}
/**
 * Write a routed message to the message log if it qualifies.
 *
 * The payload (element 1) must be a <message/> of type "chat", or of type
 * "groupchat" when group chat logging is enabled and the route's 'from' does
 * not start with the configured MUC filter prefix. Packets whose route 'to'
 * starts with "c2s" are skipped. The route and message from/to addresses
 * must all be present and parse as jids. The checks run in the same order
 * as the single combined condition they replace.
 */
static void _router_route_message_log(component_t comp, nad_t nad) {
    int attr_route_from, attr_route_to, attr_msg_from, attr_msg_to;
    jid_t jid_route_from = NULL, jid_route_to = NULL;
    jid_t jid_msg_from = NULL, jid_msg_to = NULL;

    if(NAD_ENAME_L(nad, 1) != 7 || strncmp("message", NAD_ENAME(nad, 1), 7) != 0)
        return;

    attr_route_from = nad_find_attr(nad, 0, -1, "from", NULL);
    if(attr_route_from < 0)
        return;

    /* chat always qualifies; groupchat only when enabled and not filtered */
    if(nad_find_attr(nad, 1, -1, "type", "chat") < 0) {
        if(!comp->r->log_group_chats)
            return;
        if(strncmp(NAD_AVAL(nad, attr_route_from), comp->r->filter_muc_messages_from, strlen(comp->r->filter_muc_messages_from)) == 0)
            return;
        if(nad_find_attr(nad, 1, -1, "type", "groupchat") < 0)
            return;
    }

    attr_route_to = nad_find_attr(nad, 0, -1, "to", NULL);
    if(attr_route_to < 0)
        return;

    if(strncmp(NAD_AVAL(nad, attr_route_to), "c2s", 3) == 0)
        return;

    /* the route addresses are only parsed as a validity check */
    jid_route_from = jid_new(comp->r->pc, NAD_AVAL(nad, attr_route_from), NAD_AVAL_L(nad, attr_route_from));
    if(jid_route_from == NULL)
        goto done;

    jid_route_to = jid_new(comp->r->pc, NAD_AVAL(nad, attr_route_to), NAD_AVAL_L(nad, attr_route_to));
    if(jid_route_to == NULL)
        goto done;

    attr_msg_from = nad_find_attr(nad, 1, -1, "from", NULL);
    if(attr_msg_from < 0)
        goto done;

    attr_msg_to = nad_find_attr(nad, 1, -1, "to", NULL);
    if(attr_msg_to < 0)
        goto done;

    jid_msg_from = jid_new(comp->r->pc, NAD_AVAL(nad, attr_msg_from), NAD_AVAL_L(nad, attr_msg_from));
    if(jid_msg_from == NULL)
        goto done;

    jid_msg_to = jid_new(comp->r->pc, NAD_AVAL(nad, attr_msg_to), NAD_AVAL_L(nad, attr_msg_to));
    if(jid_msg_to == NULL)
        goto done;

    message_log(nad, comp->r, jid_full(jid_msg_from), jid_full(jid_msg_to));

done:
    if(jid_msg_from != NULL)
        jid_free(jid_msg_from);
    if(jid_msg_to != NULL)
        jid_free(jid_msg_to);
    if(jid_route_from != NULL)
        jid_free(jid_route_from);
    if(jid_route_to != NULL)
        jid_free(jid_route_to);
}

/**
 * Handle a <route/> packet from a component.
 *
 * Packets already carrying an 'error' attribute are dropped to avoid loops.
 * Without a 'type' the packet is unicast: both addresses must be valid, the
 * 'from' domain must be bound to the sender, and the packet goes to the
 * route for the 'to' domain or else the default route; failures are bounced
 * to the sender with an 'error' attribute (400/401/404). With type
 * "broadcast" a copy goes to every other connected component. Any other
 * type is dropped.
 *
 * NOTE(review): to/from are backed by the stack structs sto/sfrom yet are
 * handed to jid_free() on the bounce paths, while the unicast success path
 * releases neither. Whether that is correct depends on jid_reset()/jid_free()
 * which are not visible here -- confirm.
 *
 * @param comp sending component
 * @param nad  the route packet; consumed on every path
 */
static void _router_process_route(component_t comp, nad_t nad) {
    int atype, ato, afrom;
    struct jid_st sto, sfrom;
    jid_t to = NULL, from = NULL;
    component_t target;
    union xhashv xhv;

    if(nad_find_attr(nad, 0, -1, "error", NULL) >= 0) {
        log_debug(ZONE, "dropping error packet, trying to avoid loops");
        nad_free(nad);
        return;
    }

    atype = nad_find_attr(nad, 0, -1, "type", NULL);
    ato = nad_find_attr(nad, 0, -1, "to", NULL);
    afrom = nad_find_attr(nad, 0, -1, "from", NULL);

    sto.pc = sfrom.pc = comp->r->pc;
    if(ato >= 0) to = jid_reset(&sto, NAD_AVAL(nad, ato), NAD_AVAL_L(nad, ato));
    if(afrom >= 0) from = jid_reset(&sfrom, NAD_AVAL(nad, afrom), NAD_AVAL_L(nad, afrom));

    /* unicast */
    if(atype < 0) {
        if(to == NULL || from == NULL) {
            log_debug(ZONE, "unicast route with missing or invalid to or from, bouncing");
            nad_set_attr(nad, 0, -1, "error", "400", 3);
            _router_comp_write(comp, nad);
            if (to != NULL)
                jid_free(to);
            if (from != NULL)
                jid_free(from);
            return;
        }

        log_debug(ZONE, "unicast route from %s to %s", from->domain, to->domain);

        /* the sender must own the source domain */
        if(xhash_get(comp->routes, from->domain) == NULL) {
            log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to send a packet from '%s', but that name is not bound to this component", comp->ip, comp->port, from->domain);
            nad_set_attr(nad, 0, -1, "error", "401", 3);
            _router_comp_write(comp, nad);
            jid_free(to);
            jid_free(from);
            return;
        }

        target = xhash_get(comp->r->routes, to->domain);
        if(target == NULL) {
            /* never hand a packet from the default route back to itself */
            if(comp->r->default_route != NULL && strcmp(from->domain, comp->r->default_route) == 0) {
                /* the unbound name is the destination, not the (bound) source */
                log_debug(ZONE, "%s is unbound, bouncing", to->domain);
                nad_set_attr(nad, 0, -1, "error", "404", 3);
                _router_comp_write(comp, nad);
                jid_free(to);
                jid_free(from);
                return;
            }
            target = xhash_get(comp->r->routes, comp->r->default_route);
        }

        if(target == NULL) {
            log_debug(ZONE, "%s is unbound, and no default route, bouncing", to->domain);
            nad_set_attr(nad, 0, -1, "error", "404", 3);
            _router_comp_write(comp, nad);
            jid_free(to);
            jid_free(from);
            return;
        }

        /* copy to any log sinks */
        if(xhash_count(comp->r->log_sinks) > 0)
            xhash_walk(comp->r->log_sinks, _router_route_log_sink, (void *) nad);

        log_debug(ZONE, "writing route for '%s' to %s, port %d", to->domain, target->ip, target->port);

        if (comp->r->message_logging_enabled)
            _router_route_message_log(comp, nad);

        _router_comp_write(target, nad);

        return;
    }

    /* broadcast */
    if(NAD_AVAL_L(nad, atype) == 9 && strncmp("broadcast", NAD_AVAL(nad, atype), 9) == 0) {
        if(from == NULL) {
            log_debug(ZONE, "broadcast route with missing or invalid from, bouncing");
            nad_set_attr(nad, 0, -1, "error", "400", 3);
            _router_comp_write(comp, nad);
            if (to != NULL)
                jid_free(to);
            return;
        }

        log_debug(ZONE, "broadcast route from %s", from->domain);

        if(xhash_get(comp->routes, from->domain) == NULL) {
            log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to send a packet from '%s', but that name is not bound to this component", comp->ip, comp->port, from->domain);
            nad_set_attr(nad, 0, -1, "error", "401", 3);
            _router_comp_write(comp, nad);
            if (to != NULL)
                jid_free(to);
            if (from != NULL)
                jid_free(from);
            return;
        }

        /* every connected component except the sender gets its own copy */
        if(xhash_iter_first(comp->r->components))
            do {
                xhv.comp_val = &target;
                xhash_iter_get(comp->r->components, NULL, xhv.val);

                if(target != comp) {
                    log_debug(ZONE, "writing broadcast to %s, port %d", target->ip, target->port);

                    _router_comp_write(target, nad_copy(nad));
                }
            } while(xhash_iter_next(comp->r->components));

        if (to != NULL)
            jid_free(to);
        if (from != NULL)
            jid_free(from);
        nad_free(nad);

        return;
    }

    log_debug(ZONE, "unknown route type '%.*s', dropping", NAD_AVAL_L(nad, atype), NAD_AVAL(nad, atype));

    if (to != NULL)
        jid_free(to);
    if (from != NULL)
        jid_free(from);
    nad_free(nad);
}
/**
 * Handle a <throttle/> packet, which toggles throttling for the component.
 *
 * With no throttle queue present the packet is echoed back and a new queue
 * is created, so later writes are held. With a queue present it is detached
 * first, the packet is echoed back, every held packet is written out in
 * pull order and the queue is freed.
 */
static void _router_process_throttle(component_t comp, nad_t nad) {
    jqueue_t held = comp->tq;
    nad_t queued;

    if(held == NULL) {
        /* echo before creating the queue, otherwise the reply would be held too */
        _router_comp_write(comp, nad);

        log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] throttling packets on request", comp->ip, comp->port);
        comp->tq = jqueue_new();
        return;
    }

    log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] unthrottling packets on request", comp->ip, comp->port);

    /* detach the queue so the writes below go straight out */
    comp->tq = NULL;

    _router_comp_write(comp, nad);

    for(queued = jqueue_pull(held); queued != NULL; queued = jqueue_pull(held))
        _router_comp_write(comp, queued);

    jqueue_free(held);
}
/**
 * Stream (sx) event callback for a component connection.
 *
 * - WANT_READ / WANT_WRITE: ask mio to watch the fd.
 * - READ: recv() into the supplied buffer, honouring the byte rate limit;
 *   returns the byte count, 0 when nothing can be read right now, or -1
 *   after killing the stream on error / EOF.
 * - WRITE: send() the buffer; returns bytes written, 0 on a transient
 *   error, or -1 after killing the stream.
 * - ERROR: logs the stream error.
 * - STREAM: recognises the "jabber:component:accept" namespace and flags
 *   the component as legacy.
 * - OPEN: for legacy components, binds the requested name (and aliases
 *   targeting it) and advertises it.
 * - PACKET: handles the legacy handshake, rewraps legacy packets in a
 *   <route/>, then dispatches bind / unbind / route / throttle.
 * - CLOSED: closes the fd through mio.
 *
 * @param s    the stream
 * @param e    the event
 * @param data event payload (buffer, error or packet depending on e)
 * @param arg  the component the stream belongs to
 */
static int _router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
    component_t comp = (component_t) arg;
    sx_buf_t buf = (sx_buf_t) data;
    int rlen, len, attr, ns, sns;
    sx_error_t *sxe;
    nad_t nad;
    struct jid_st sto, sfrom;
    jid_t to, from;
    alias_t alias;

    switch(e) {
        case event_WANT_READ:
            log_debug(ZONE, "want read");
            mio_read(comp->r->mio, comp->fd);
            break;

        case event_WANT_WRITE:
            log_debug(ZONE, "want write");
            mio_write(comp->r->mio, comp->fd);
            break;

        case event_READ:
            log_debug(ZONE, "reading from %d", comp->fd);

            /* work out how much we may read under the byte rate limit */
            if(comp->rate != NULL) {
                if(rate_check(comp->rate) == 0) {
                    /* only log the first time the limit bites */
                    if(!comp->rate_log) {
                        log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] is being byte rate limited", comp->ip, comp->port);
                        comp->rate_log = 1;
                    }

                    log_debug(ZONE, "%d is throttled, delaying read", comp->fd);

                    buf->len = 0;
                    return 0;
                }

                rlen = rate_left(comp->rate);
                if(rlen > buf->len)
                    rlen = buf->len;
            }
            else
                rlen = buf->len;

            len = recv(comp->fd, buf->data, rlen, 0);

            /* account for what was read */
            if(comp->rate != NULL && len > 0) {
                comp->rate_log = 0;
                rate_add(comp->rate, len);
            }

            if(len < 0) {
                if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) {
                    buf->len = 0;
                    return 0;
                }

                log_debug(ZONE, "read failed: %s", strerror(errno));

                sx_kill(comp->s);

                return -1;
            }

            else if(len == 0) {
                /* peer closed the connection */
                sx_kill(comp->s);

                return -1;
            }

            log_debug(ZONE, "read %d bytes", len);

            buf->len = len;

            return len;

        case event_WRITE:
            log_debug(ZONE, "writing to %d", comp->fd);

            len = send(comp->fd, buf->data, buf->len, 0);
            if(len >= 0) {
                log_debug(ZONE, "%d bytes written", len);
                return len;
            }

            if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN)
                return 0;

            log_debug(ZONE, "write failed: %s", strerror(errno));

            sx_kill(comp->s);

            return -1;

        case event_ERROR:
            sxe = (sx_error_t *) data;
            log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] error: %s (%s)", comp->ip, comp->port, sxe->generic, sxe->specific);

            break;

        case event_STREAM:
            /* only the legacy component namespace needs handling here */
            if(s->ns == NULL || strcmp("jabber:component:accept", s->ns) != 0)
                return 0;

            comp->legacy = 1;

            /* legacy auth needs the shared secret */
            if(comp->r->local_secret == NULL) {
                sx_error(s, stream_err_INVALID_NAMESPACE, "support for legacy components not available");
                sx_close(s);
                return 0;
            }

            /* the stream 'to' is the name a legacy component binds */
            if(s->req_to == NULL) {
                sx_error(s, stream_err_HOST_UNKNOWN, "no 'to' attribute on stream header");
                sx_close(s);
                return 0;
            }

            break;

        case event_OPEN:
            log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] authenticated as %s", comp->ip, comp->port, comp->s->auth_id);

            /* legacy components bind implicitly to the name they asked for */
            if(comp->legacy) {
                if(xhash_get(comp->r->routes, s->req_to) != NULL) {
                    sx_error(s, stream_err_HOST_UNKNOWN, "requested name is already in use");
                    sx_close(s);
                    return 0;
                }

                for(alias = comp->r->aliases; alias != NULL; alias = alias->next)
                    if(strcmp(alias->name, s->req_to) == 0) {
                        sx_error(s, stream_err_HOST_UNKNOWN, "requested name is already in use");
                        sx_close(s);
                        return 0;
                    }

                xhash_put(comp->r->routes, pstrdup(xhash_pool(comp->r->routes), s->req_to), (void *) comp);
                xhash_put(comp->routes, pstrdup(xhash_pool(comp->routes), s->req_to), (void *) comp);

                log_write(comp->r->log, LOG_NOTICE, "[%s] online (bound to %s, port %d)", s->req_to, comp->ip, comp->port);

                _router_advertise(comp->r, s->req_to, comp, 0);

                /* bind aliases that target this name */
                for(alias = comp->r->aliases; alias != NULL; alias = alias->next) {
                    if(strcmp(alias->target, s->req_to) == 0) {
                        xhash_put(comp->r->routes, pstrdup(xhash_pool(comp->r->routes), alias->name), (void *) comp);
                        /* was duplicated from the router-wide pool; use the per-component pool like the primary name */
                        xhash_put(comp->routes, pstrdup(xhash_pool(comp->routes), alias->name), (void *) comp);

                        log_write(comp->r->log, LOG_NOTICE, "[%s] online (alias of '%s', bound to %s, port %d)", alias->name, s->req_to, comp->ip, comp->port);

                        _router_advertise(comp->r, alias->name, comp, 0);
                    }
                }
            }

            break;

        case event_PACKET:
            nad = (nad_t) data;

            /* before auth only a legacy handshake is accepted */
            if(comp->s->state == state_STREAM) {
                if(!comp->legacy) {
                    log_debug(ZONE, "stream is preauth, dropping packet");
                    nad_free(nad);
                    return 0;
                }

                if(NAD_ENAME_L(nad, 0) != 9 || strncmp("handshake", NAD_ENAME(nad, 0), NAD_ENAME_L(nad, 0)) != 0) {
                    log_debug(ZONE, "unknown preauth packet %.*s, dropping", NAD_ENAME_L(nad, 0), NAD_ENAME(nad, 0));

                    nad_free(nad);
                    return 0;
                }

                _router_process_handshake(comp, nad);

                return 0;
            }

            /* legacy packets get wrapped in a <route/> so the rest can treat them uniformly */
            if(comp->legacy) {
                log_debug(ZONE, "packet from legacy component, munging it");

                /* NOTE(review): to/from are never released here although other code in
                 * this file calls jid_free() on jids obtained the same way -- confirm */
                sto.pc = sfrom.pc = comp->r->pc;

                attr = nad_find_attr(nad, 0, -1, "to", NULL);
                if(attr < 0 || (to = jid_reset(&sto, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) {
                    log_debug(ZONE, "invalid or missing 'to' address on legacy packet, dropping it");
                    nad_free(nad);
                    return 0;
                }

                attr = nad_find_attr(nad, 0, -1, "from", NULL);
                if(attr < 0 || (from = jid_reset(&sfrom, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) {
                    log_debug(ZONE, "invalid or missing 'from' address on legacy packet, dropping it");
                    nad_free(nad);
                    return 0;
                }

                /* unlink the jabber:component:accept namespace from the element's list */
                ns = nad_find_namespace(nad, 0, "jabber:component:accept", NULL);
                if(ns >= 0) {
                    if(nad->elems[0].ns == ns)
                        nad->elems[0].ns = nad->nss[nad->elems[0].ns].next;
                    else {
                        for(sns = nad->elems[0].ns; sns >= 0 && nad->nss[sns].next != ns; sns = nad->nss[sns].next);
                        /* only unlink if a predecessor was actually found */
                        if(sns >= 0)
                            nad->nss[sns].next = nad->nss[nad->nss[sns].next].next;
                    }
                }

                /* make sure the element carries the client namespace */
                ns = nad_find_namespace(nad, 0, uri_CLIENT, NULL);
                if(ns < 0) {
                    ns = nad_add_namespace(nad, uri_CLIENT, NULL);
                    nad->scope = -1;
                    nad->nss[ns].next = nad->elems[0].ns;
                    nad->elems[0].ns = ns;
                }
                nad->elems[0].my_ns = ns;

                /* wrap it in a route addressed by domain */
                ns = nad_add_namespace(nad, uri_COMPONENT, "comp");
                nad_wrap_elem(nad, 0, ns, "route");

                nad_set_attr(nad, 0, -1, "to", to->domain, 0);
                nad_set_attr(nad, 0, -1, "from", from->domain, 0);
            }

            /* top element must be in the component namespace; a negative index means it has none */
            if(NAD_ENS(nad, 0) < 0 || NAD_NURI_L(nad, NAD_ENS(nad, 0)) != strlen(uri_COMPONENT) || strncmp(uri_COMPONENT, NAD_NURI(nad, NAD_ENS(nad, 0)), strlen(uri_COMPONENT)) != 0) {
                log_debug(ZONE, "invalid packet namespace, dropping");
                nad_free(nad);
                return 0;
            }

            /* dispatch on element name */
            if(NAD_ENAME_L(nad, 0) == 4 && strncmp("bind", NAD_ENAME(nad, 0), 4) == 0) {
                _router_process_bind(comp, nad);
                return 0;
            }

            if(NAD_ENAME_L(nad, 0) == 6 && strncmp("unbind", NAD_ENAME(nad, 0), 6) == 0) {
                _router_process_unbind(comp, nad);
                return 0;
            }

            if(NAD_ENAME_L(nad, 0) == 5 && strncmp("route", NAD_ENAME(nad, 0), 5) == 0) {
                _router_process_route(comp, nad);
                return 0;
            }

            if(NAD_ENAME_L(nad, 0) == 8 && strncmp("throttle", NAD_ENAME(nad, 0), 8) == 0) {
                _router_process_throttle(comp, nad);
                return 0;
            }

            log_debug(ZONE, "unknown packet, dropping");

            nad_free(nad);
            return 0;

        case event_CLOSED:
            mio_close(comp->r->mio, comp->fd);
            break;
    }

    return 0;
}
/**
 * Decide whether a new connection from ip may proceed.
 *
 * Checks the configured access list first and then, if a connection rate
 * limit is configured, the per-address connection rate (creating the rate
 * tracker on first sight of the address).
 *
 * @return 0 if the connection is allowed, 1 if it must be refused
 */
static int _router_accept_check(router_t r, int fd, char *ip) {
    rate_t tracker;

    if(access_check(r->access, ip) == 0) {
        log_write(r->log, LOG_NOTICE, "[%d] [%s] access denied by configuration", fd, ip);
        return 1;
    }

    /* no connection rate limit configured */
    if(r->conn_rate_total == 0)
        return 0;

    tracker = (rate_t) xhash_get(r->conn_rates, ip);
    if(tracker == NULL) {
        tracker = rate_new(r->conn_rate_total, r->conn_rate_seconds, r->conn_rate_wait);
        xhash_put(r->conn_rates, pstrdup(xhash_pool(r->conn_rates), ip), (void *) tracker);
    }

    if(rate_check(tracker) == 0) {
        log_write(r->log, LOG_NOTICE, "[%d] [%s] is being rate limited", fd, ip);
        return 1;
    }

    /* count this connection */
    rate_add(tracker, 1);

    return 0;
}
/**
 * xhash_walk() callback over a component's own routes: removes the name
 * (key) from the log sinks, the router-wide routes and the component's
 * routes, clears the default route if it was this name, and advertises the
 * name as unavailable. arg is the component being torn down.
 */
static void _router_route_unbind_walker(xht routes, const char *key, void *val, void *arg) {
    component_t owner = (component_t) arg;
    int was_default;

    xhash_zap(owner->r->log_sinks, key);
    xhash_zap(owner->r->routes, key);
    xhash_zap(owner->routes, key);

    was_default = owner->r->default_route != NULL && strcmp(key, owner->r->default_route) == 0;
    if(was_default) {
        log_write(owner->r->log, LOG_NOTICE, "[%s] default route offline", key);
        free(owner->r->default_route);
        owner->r->default_route = NULL;
    }

    log_write(owner->r->log, LOG_NOTICE, "[%s] offline", key);

    /* let the remaining components know */
    _router_advertise(owner->r, (char *) key, owner, 1);
}
/**
 * mio callback for both the listening socket and component connections.
 *
 * For action_ACCEPT arg is the router; for every other action it is the
 * component the fd belongs to.
 *
 * - READ: records activity; kills the stream if the socket reports zero
 *   readable bytes, otherwise lets sx read.
 * - WRITE: records activity and lets sx write.
 * - CLOSE: unbinds all of the component's routes, removes it from the
 *   component table, releases its resources and queues its stream on the
 *   router's dead list.
 * - ACCEPT: runs the access/rate checks, creates the component for the new
 *   fd, registers it and starts the server side of the stream.
 *
 * @return for READ/WRITE whatever sx_can_read()/sx_can_write() return (0
 *         when the stream was killed); 1 from ACCEPT to refuse the
 *         connection; 0 otherwise
 */
int router_mio_callback(mio_t m, mio_action_t a, int fd, void *data, void *arg) {
    component_t comp = (component_t) arg;
    router_t r = (router_t) arg;
    struct sockaddr_storage sa;
    socklen_t namelen = sizeof(sa);
    int port, nbytes;

    switch(a) {
        case action_READ:
            log_debug(ZONE, "read action on fd %d", fd);

            /* they did something */
            comp->last_activity = time(NULL);

            /* readable with nothing to read means the peer went away. if the
             * ioctl itself fails nbytes is meaningless, so fall through and let
             * the read path detect the error */
            if(ioctl(fd, FIONREAD, &nbytes) == 0 && nbytes == 0) {
                sx_kill(comp->s);
                return 0;
            }

            return sx_can_read(comp->s);

        case action_WRITE:
            log_debug(ZONE, "write action on fd %d", fd);

            /* update activity timestamp */
            comp->last_activity = time(NULL);

            return sx_can_write(comp->s);

        case action_CLOSE:
            log_debug(ZONE, "close action on fd %d", fd);

            r = comp->r;

            log_write(r->log, LOG_NOTICE, "[%s, port=%d] disconnect", comp->ip, comp->port);

            /* unbind names */
            xhash_walk(comp->routes, _router_route_unbind_walker, (void *) comp);

            /* deregister component */
            xhash_zap(r->components, comp->ipport);

            xhash_free(comp->routes);

            if(comp->tq != NULL)
                jqueue_free(comp->tq);

            rate_free(comp->rate);

            /* the stream is released later from the dead queue */
            jqueue_push(comp->r->dead, (void *) comp->s, 0);

            free(comp);

            break;

        case action_ACCEPT:
            log_debug(ZONE, "accept action on fd %d", fd);

            /* zeroed so a failed getpeername() doesn't leave garbage behind */
            memset(&sa, 0, sizeof(sa));
            getpeername(fd, (struct sockaddr *) &sa, &namelen);
            port = j_inet_getport(&sa);

            log_write(r->log, LOG_NOTICE, "[%s, port=%d] connect", (char *) data, port);

            if(_router_accept_check(r, fd, (char *) data) != 0)
                return 1;

            comp = (component_t) calloc(1, sizeof(struct component_st));
            if(comp == NULL) {
                /* refuse the connection the same way a failed accept check does */
                log_write(r->log, LOG_ERR, "[%s, port=%d] out of memory, dropping connection", (char *) data, port);
                return 1;
            }

            comp->r = r;

            comp->fd = fd;

            /* size each buffer by its own declaration so ip:port isn't cut to the length of ip */
            snprintf(comp->ip, sizeof(comp->ip), "%s", (char *) data);
            comp->port = port;

            snprintf(comp->ipport, sizeof(comp->ipport), "%s:%d", comp->ip, comp->port);

            comp->s = sx_new(r->sx_env, fd, _router_sx_callback, (void *) comp);
            mio_app(m, fd, router_mio_callback, (void *) comp);

            if(r->byte_rate_total != 0)
                comp->rate = rate_new(r->byte_rate_total, r->byte_rate_seconds, r->byte_rate_wait);

            comp->routes = xhash_new(51);

            /* register component */
            xhash_put(r->components, comp->ipport, (void *) comp);

#ifdef HAVE_SSL
            sx_server_init(comp->s, SX_SSL_STARTTLS_OFFER | SX_SASL_OFFER);
#else
            sx_server_init(comp->s, SX_SASL_OFFER);
#endif

            break;
    }

    return 0;
}
/**
 * Append one line describing a message to the router's message log.
 *
 * The text of the first <body/> element found in nad is written as
 * "(date)<tab>(from)<tab>(to)<tab>(body)" with newlines in the body escaped
 * as the two characters '\' 'n'. If the log file did not exist beforehand a
 * short comment header is written first. The process umask is set to 0077
 * before the file is opened.
 *
 * @param nad      packet containing the message
 * @param r        router; supplies the log handle and the message log path
 * @param msg_from sender address to record
 * @param msg_to   recipient address to record
 * @return 0 on success or when there is no (non-empty) body to log, 1 if the
 *         log file could not be opened or written
 */
int message_log(nad_t nad, router_t r, char *msg_from, char *msg_to)
{
    time_t t;
    char *time_str;
    char time_pos[32];
    size_t time_sz;
    struct stat filestat;
    FILE *message_file;
    int new_msg_file = 0;
    int i;
    int nad_body_len = 0;
    const char *nad_body = NULL;
    const char *nad_body_end;
    int body_count;
    char body[MAX_MESSAGE*2];

    assert(nad != NULL);

    /* timestamp: ctime() text with its trailing newline turned into a space.
     * copied out so the shared ctime() buffer is left untouched */
    t = time(NULL);
    time_str = ctime(&t);
    snprintf(time_pos, sizeof(time_pos), "%s", time_str != NULL ? time_str : "");
    time_sz = strlen(time_pos);
    if (time_sz > 0)
        time_pos[time_sz-1] = ' ';

    /* find the first <body/>.
     * NOTE(review): the scan stops at the first element with an empty name
     * rather than at an element count -- confirm this can't run off the end */
    for (i = 0; NAD_ENAME_L(nad, i) > 0; i++)
    {
        if((NAD_ENAME_L(nad, i) == 4) && (strncmp("body", NAD_ENAME(nad, i), 4) == 0))
        {
            nad_body_len = NAD_CDATA_L(nad, i);
            if (nad_body_len > 0) {
                nad_body = NAD_CDATA(nad, i);
            } else {
                log_write(r->log, LOG_NOTICE, "message_log received a message with empty body");
                return 0;
            }
            break;
        }
    }

    /* nothing to log */
    if (nad_body == NULL) {
        return 0;
    }

    /* copy the body, escaping newlines; the bound leaves room for a two
     * character escape plus the terminator */
    nad_body_end = nad_body + nad_body_len;
    for (body_count = 0; (nad_body < nad_body_end) && (body_count < (MAX_MESSAGE*2)-3); nad_body++) {
        if (*nad_body == '\n') {
            body[body_count++] = '\\';
            body[body_count++] = 'n';
        } else {
            body[body_count++] = *nad_body;
        }
    }
    body[body_count] = '\0';

    /* the log is private to the server user */
    umask((mode_t) 0077);

    /* a file that can't be stat'ed is treated as new and gets the header */
    if (stat(r->message_logging_fullpath, &filestat)) {
        new_msg_file = 1;
    }

    if ((message_file = fopen(r->message_logging_fullpath, "a")) == NULL)
    {
        log_write(r->log, LOG_ERR, "Unable to open message log for writing: %s", strerror(errno));
        return 1;
    }

    /* fprintf() reports failure with a negative return, never with 0 */
    if (new_msg_file) {
        if (fprintf(message_file, "# This message log is created by the jabberd router.\n") < 0)
            goto write_failed;
        fprintf(message_file, "# See router.xml for logging options.\n");
        fprintf(message_file, "# Format: (Date)<tab>(From JID)<tab>(To JID)<tab>(Message Body)<line end>\n");
    }

    if (fprintf(message_file, "%s\t%s\t%s\t%s\n", time_pos, msg_from, msg_to, body) < 0)
        goto write_failed;

    /* buffered data is only flushed here, so this is where a full disk shows up */
    if (fclose(message_file) != 0)
    {
        log_write(r->log, LOG_ERR, "Unable to write to message log: %s", strerror(errno));
        return 1;
    }

    return 0;

write_failed:
    log_write(r->log, LOG_ERR, "Unable to write to message log: %s", strerror(errno));
    fclose(message_file);
    return 1;
}
/**
 * Roll the message log if it is old enough or large enough.
 *
 * The log is rolled when its birth time is at least message_logging_roll_days
 * days ago, or its size is at least message_logging_roll_megs megabytes (a
 * value of 0 disables the respective check). Rolling shifts every existing
 * "<log>.N.gz" to "<log>.N+1.gz", renames the live log to "<log>.0", starts
 * gzip on it in a child process and creates a fresh log with the comment
 * header.
 *
 * NOTE(review): the gzip child is not waited for in this function; confirm
 * that it is reaped elsewhere.
 *
 * @param r router holding the message logging configuration
 * @return 0 if nothing needed doing, the log can't be stat'ed, or the roll
 *         succeeded; 1 if a rename failed or the new log could not be opened
 */
int roll_message_log(router_t r)
{
    char logfile_fullpath[243] = {0};
    char logfile_compressed_path[255] = {0};
    char logfile_compressed_path_new[255] = {0};
    char logfile_uncompressed_path_new[255] = {0};
    struct stat64 filestat;
    struct stat64 compressed_stat;
    long curr_time = time(NULL);
    int num_old_files;
    pid_t pid;
    int i;
    FILE *message_file;

    snprintf(logfile_fullpath, sizeof(logfile_fullpath), "%s/%s", r->message_logging_dir, r->message_logging_file);

    if (stat64(logfile_fullpath, &filestat))
    {
        log_debug(ZONE, "cannot stat message log: %s", logfile_fullpath);
        return 0;
    }

    /* nothing to do unless the age or size threshold has been reached */
    if (!(((r->message_logging_roll_days > 0) && (filestat.st_birthtime + (r->message_logging_roll_days*SECS_PER_DAY) <= curr_time)) ||
          ((r->message_logging_roll_megs > 0) && (filestat.st_size > 0) && (r->message_logging_roll_megs <= ((long long)filestat.st_size/BYTES_PER_MEG)))))
        return 0;

    /* count the consecutive compressed logs, starting at .0.gz */
    for (num_old_files = 0; ; num_old_files++)
    {
        snprintf(logfile_compressed_path, sizeof(logfile_compressed_path), "%s/%s.%d.gz", r->message_logging_dir, r->message_logging_file, num_old_files);
        if (stat64(logfile_compressed_path, &compressed_stat))
        {
            break;
        }
    }
    num_old_files--;

    umask((mode_t) 0077);

    /* shift them up by one, highest first so nothing is overwritten */
    for (i = num_old_files; i >= 0; i--)
    {
        snprintf(logfile_compressed_path, sizeof(logfile_compressed_path), "%s/%s.%d.gz", r->message_logging_dir, r->message_logging_file, i);
        snprintf(logfile_compressed_path_new, sizeof(logfile_compressed_path_new), "%s/%s.%d.gz", r->message_logging_dir, r->message_logging_file, i+1);
        if ((rename(logfile_compressed_path, logfile_compressed_path_new)) != 0)
        {
            log_write(r->log, LOG_ERR, "Unable to rename message log: %s ... to: %s", logfile_compressed_path, logfile_compressed_path_new);
            return 1;
        }
    }

    /* move the live log out of the way */
    snprintf(logfile_uncompressed_path_new, sizeof(logfile_uncompressed_path_new), "%s/%s.0", r->message_logging_dir, r->message_logging_file);
    if ((rename(logfile_fullpath, logfile_uncompressed_path_new)) != 0)
    {
        log_write(r->log, LOG_ERR, "Unable to rename message log: %s ... to: %s : %s", logfile_fullpath, logfile_uncompressed_path_new, strerror(errno));
        return 1;
    }

    /* compress it in the background */
    if ((pid = fork()) < 0)
    {
        log_write(r->log, LOG_ERR, "fork() problem when attempting to roll and compress logs: %s", strerror(errno));
    } else if (pid == 0)
    {
        execl("/usr/bin/gzip", "gzip", "-q", logfile_uncompressed_path_new, (char *)NULL);

        /* only reached if exec failed. the child must stop here, otherwise it
         * would carry on running the router's code as a second copy */
        log_write(r->log, LOG_ERR, "execl() error: %s", strerror(errno));
        _exit(127);
    }

    /* start a fresh log with the header */
    if ((message_file = fopen(logfile_fullpath, "w+")) == NULL)
    {
        log_write(r->log, LOG_ERR, "Unable to open message log for writing: %s: %s", logfile_fullpath, strerror(errno));
        return 1;
    }
    fprintf(message_file, "# This message log is created by the jabberd router.\n");
    fprintf(message_file, "# See router.xml for logging options.\n");
    fprintf(message_file, "# Format: (Date)<tab>(From JID)<tab>(To JID)<tab>(Message Body)<line end>\n");
    fclose(message_file);

    return 0;
}