#include <NetInfo/config.h>
#include "ni_server.h"
#include "ni_globals.h"
#include "notify.h"
#include "event.h"
#include <NetInfo/system_log.h>
#include <NetInfo/socket_lock.h>
#include "ni_dir.h"
#include "getstuff.h"
#include <sys/errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <stdlib.h>
extern SVCXPRT * svcfd_create(int, u_int, u_int);
extern int svc_maxfd;
extern int _rpc_dtablesize();
extern void readall_cleanup(void);
void log_connection_info(int fd);
int ni_svc_connections;
int ni_svc_topconnections;
int ni_svc_maxconnections;
int ni_svc_lruclosed;
typedef struct lrustuff
{
unsigned len;
int *val;
} lrustuff;
static void
fd_and(unsigned max, fd_set *a, fd_set *b, fd_set *c)
{
int i;
for (i = 0; i < howmany(max, NFDBITS); i++)
{
c->fds_bits[i] = a->fds_bits[i] & b->fds_bits[i];
}
}
#ifdef notdef
static void
fd_or(unsigned max, fd_set *a, fd_set *b, fd_set *c)
{
int i;
for (i = 0; i < howmany(max, NFDBITS); i++)
{
c->fds_bits[i] = a->fds_bits[i] | b->fds_bits[i];
}
}
#endif
static void
fd_clr(unsigned max, fd_set *a, fd_set *b, fd_set *c)
{
int i;
for (i = 0; i < howmany(max, NFDBITS); i++)
{
c->fds_bits[i] = ~(a->fds_bits[i]) & b->fds_bits[i];
}
}
static int
bitcount(unsigned u)
{
int count;
for (count = 0; u > 0; u >>=1) count += (u & 1);
return count;
}
static int
fd_count(unsigned max, fd_set *fds)
{
int i, count;
count = 0;
for (i = 0; i < howmany(max, NFDBITS); i++)
{
count += bitcount((unsigned)fds->fds_bits[i]);
}
return count;
}
static lrustuff
lru_init(unsigned max)
{
lrustuff lru;
lru.val = (int *)malloc(sizeof(int) * max);
lru.len = 0;
return lru;
}
static void
lru_markone(lrustuff *lru, unsigned which)
{
int i, j, mark, new;
new = 1;
mark = lru->len;
for (i = 0; i < lru->len; i++)
{
if (lru->val[i] == which)
{
new = 0;
mark = i;
lru->len--;
break;
}
}
for (j = mark; j > 0; j--)
{
lru->val[j] = lru->val[j - 1];
}
lru->val[0] = which;
lru->len++;
if (new == 1)
{
setsockopt(which, IPPROTO_TCP, TCP_NODELAY, &new, sizeof(int));
}
}
static void
lru_mark(unsigned max, lrustuff *lru, fd_set *fds)
{
int i, j;
fd_mask mask, mask2;
for (i = 0; i < howmany(max, NFDBITS); i++)
{
mask = fds->fds_bits[i];
for (j = 0, mask2 = 1; mask && j < NFDBITS; j++, mask2 <<= 1)
{
if (mask & mask2)
{
lru_markone(lru, NFDBITS * i + j);
mask ^= mask2;
}
}
}
}
static void
lru_unmarkone(lrustuff *lru, unsigned which)
{
int i;
for (i = 0; i < lru->len; i++)
{
if (lru->val[i] == which)
{
while (i < lru->len)
{
lru->val[i] = lru->val[i + 1];
i++;
}
lru->len--;
return;
}
}
}
static void
lru_unmark(unsigned max, lrustuff *lru, fd_set *fds)
{
int i, j;
fd_mask mask, mask2;
for (i = 0; i < howmany(max, NFDBITS); i++)
{
mask = fds->fds_bits[i];
for (j = 0, mask2 = 1; mask && j < NFDBITS; j++, mask2 <<= 1)
{
if (mask & mask2)
{
lru_unmarkone(lru, NFDBITS * i + j);
mask ^= mask2;
}
}
}
}
static void
lru_close(lrustuff *lru)
{
fd_set mask;
int fd;
fd = lru->val[lru->len - 1];
log_connection_info(fd);
lru_unmarkone(lru, fd);
FD_ZERO(&mask);
FD_SET(fd, &mask);
socket_lock();
close(fd);
svc_getreqset(&mask);
socket_unlock();
if (FD_ISSET(fd, &svc_fdset))
{
system_log(LOG_ERR, "closed descriptor is still set");
}
ni_svc_lruclosed++;
}
#if CONNECTION_CHECK
void
open_connections(unsigned maxfds, fd_set *fds)
{
int newfd;
struct sockaddr_in from;
int fromlen;
SVCXPRT *transp;
if (FD_ISSET(tcp_sock, fds))
{
FD_CLR(tcp_sock, fds);
fromlen = sizeof(from);
socket_lock();
newfd = accept(tcp_sock, (struct sockaddr *)&from, &fromlen);
socket_unlock();
if (newfd >= 0)
{
if (is_trusted_network(db_ni, &from))
{
transp = (SVCXPRT *)svcfd_create(newfd, NI_SENDSIZE, NI_RECVSIZE);
if (transp != NULL)
{
transp->xp_raddr = from;
transp->xp_addrlen = fromlen;
if (!FD_ISSET(newfd, &svc_fdset))
{
system_log(LOG_ERR, "new descriptor is not set");
}
}
else
{
socket_lock();
close(newfd);
socket_unlock();
if (FD_ISSET(newfd, &svc_fdset))
{
system_log(LOG_ERR, "closed descriptor is still set");
}
}
}
else
{
socket_lock();
close(newfd);
socket_unlock();
}
}
}
if (FD_ISSET(udp_sock, fds))
{
socket_lock();
svc_getreqset(fds);
socket_unlock();
}
}
#endif
void
ni_svc_run(int maxlisteners)
{
fd_set readfds;
fd_set orig;
fd_set fds;
fd_set save;
fd_set shut;
unsigned maxfds;
unsigned lastfd;
extern int errno;
int event_fd;
lrustuff lru;
struct timeval now;
bool_t saw_ebadf = FALSE;
FD_ZERO(&readfds);
FD_ZERO(&orig);
FD_ZERO(&fds);
FD_ZERO(&save);
FD_ZERO(&shut);
gettimeofday(&now, NULL);
cleanuptime = now.tv_sec + cleanupwait;
orig = svc_fdset;
maxfds = _rpc_dtablesize();
if (FD_SETSIZE < maxfds)
{
maxfds = FD_SETSIZE;
}
lru = lru_init(maxfds);
ni_svc_connections = 0;
ni_svc_topconnections = 0;
ni_svc_maxconnections = maxlisteners;
ni_svc_lruclosed = 0;
while (!shutdown_server)
{
if (readall_done) readall_cleanup();
fd_clr(maxfds, &clnt_fdset, &svc_fdset, &readfds);
event_fd = event_pipe[0];
if (event_fd >= 0) FD_SET(event_fd, &readfds);
lastfd = svc_maxfd;
if ((event_fd >= 0) && (event_fd > lastfd)) lastfd = event_fd;
switch (select(lastfd+1, &readfds, NULL, NULL, NULL))
{
case -1:
if (errno == EBADF)
{
if (saw_ebadf)
{
system_log(LOG_WARNING, "2nd straight bad file number in readfds; flushing LRU cache (%d entr%s)", ni_svc_connections, ni_svc_connections != 1 ? "ies" : "y");
for (; ni_svc_connections > 0; ni_svc_connections--)
{
lru_close(&lru);
}
saw_ebadf = FALSE;
}
else
{
system_log(LOG_WARNING, "Bad file number in readfds; cocking...");
saw_ebadf = TRUE;
}
}
else if (errno != EINTR)
{
saw_ebadf = FALSE;
system_log(LOG_ERR, "unexpected errno: %m");
sleep(10);
}
else
{
saw_ebadf = FALSE;
if (readall_done) readall_cleanup();
}
break;
case 0:
saw_ebadf = FALSE;
break;
default:
saw_ebadf = FALSE;
if (readall_done) readall_cleanup();
if (event_fd >= 0 && FD_ISSET(event_fd, &readfds))
{
FD_CLR(event_fd, &readfds);
event_handle();
}
fd_clr(maxfds, &clnt_fdset, &svc_fdset, &save);
fd_and(maxfds, &orig, &readfds, &fds);
#if CONNECTION_CHECK
open_connections(maxfds, &fds);
#else
socket_lock();
svc_getreqset(&fds);
socket_unlock();
#endif
fd_clr(maxfds, &save, &svc_fdset, &fds);
fd_clr(maxfds, &clnt_fdset, &fds, &fds);
ni_svc_connections += fd_count(maxfds, &fds);
if (ni_svc_connections > ni_svc_topconnections)
{
ni_svc_topconnections = ni_svc_connections;
}
lru_mark(maxfds, &lru, &fds);
fd_clr(maxfds, &orig, &readfds, &fds);
lru_mark(maxfds, &lru, &fds);
socket_lock();
svc_getreqset(&fds);
socket_unlock();
fd_clr(maxfds, &svc_fdset, &save, &shut);
fd_clr(maxfds, &clnt_fdset, &shut, &shut);
ni_svc_connections -= fd_count(maxfds, &shut);
lru_unmark(maxfds, &lru, &shut);
if (ni_svc_connections > ni_svc_maxconnections)
{
system_log(LOG_INFO, "Over max FDs; closing LRU (used %d, max %d)", ni_svc_connections, ni_svc_maxconnections);
}
while (ni_svc_connections > ni_svc_maxconnections)
{
lru_close(&lru);
ni_svc_connections--;
}
}
gettimeofday(&now, NULL);
if ((0 < cleanupwait) && (now.tv_sec > cleanuptime))
{
#ifdef FLUSHCACHE
ni_forget(db_ni);
#endif
cleanuptime = now.tv_sec + cleanupwait;
system_log(LOG_DEBUG, "cleaning up...");
if (i_am_clone)
{
dir_clonecheck();
have_transferred = 0;
}
else
{
notify_resync();
}
}
}
}
void
log_connection_info(int fd)
{
struct sockaddr_in us;
struct sockaddr_in them;
int count;
count = sizeof(us);
if (0 != getsockname(fd, (struct sockaddr *)&us, &count))
{
system_log(LOG_DEBUG, "lru_close %d; can't getsockname - %m", fd);
return;
}
count = sizeof(them);
if (0 != getpeername(fd, (struct sockaddr *)&them, &count))
{
system_log(LOG_DEBUG, "lru_close %d: can't getpeername - %m", fd);
return;
}
system_log(LOG_DEBUG, "lru_close %d from %s:%hu to %s:%hu", fd,
inet_ntoa(us.sin_addr), ntohs(us.sin_port),
inet_ntoa(them.sin_addr), ntohs(them.sin_port));
}