mupdate-slave.c   [plain text]


/* mupdate-slave.c -- cyrus murder database clients
 *
 * $Id: mupdate-slave.c,v 1.29 2007/02/05 18:41:47 jeaton Exp $
 * Copyright (c) 1998-2003 Carnegie Mellon University.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer. 
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The name "Carnegie Mellon University" must not be used to
 *    endorse or promote products derived from this software without
 *    prior written permission. For permission or any other legal
 *    details, please contact  
 *      Office of Technology Transfer
 *      Carnegie Mellon University
 *      5000 Forbes Avenue
 *      Pittsburgh, PA  15213-3890
 *      (412) 268-4387, fax: (412) 268-7395
 *      tech-transfer@andrew.cmu.edu
 *
 * 4. Redistributions of any form whatsoever must retain the following
 *    acknowledgment:
 *    "This product includes software developed by Computing Services
 *     at Carnegie Mellon University (http://www.cmu.edu/computing/)."
 *
 * CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
 * THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
 * FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
 * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <config.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <ctype.h>
#include <sasl/sasl.h>
#include <sasl/saslutil.h>
#include <syslog.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#else
#include <varargs.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/stat.h>
#include <netinet/in.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "prot.h"
#include "xmalloc.h"
#include "xstrlcpy.h"
#include "xstrlcat.h"
#include "global.h"
#include "assert.h"
#include "imparse.h"
#include "iptostring.h"
#include "mupdate.h"
#include "exitcodes.h"

/* Returns file descriptor of kick socket (or does not return) */
static int open_kick_socket() 
{
    int r,s,len;
    char fnamebuf[2048];
    struct sockaddr_un srvaddr;
    mode_t oldumask;

    s = socket(AF_UNIX, SOCK_STREAM, 0);
    if (s == -1) {
	syslog(LOG_ERR, "socket: %m");
	fatal("socket failed", EC_OSERR);
    }

    strlcpy(fnamebuf, config_dir, sizeof(fnamebuf));
    strlcat(fnamebuf, FNAME_MUPDATE_TARGET_SOCK, sizeof(fnamebuf));

    (void) unlink(fnamebuf);
    memset((char *)&srvaddr, 0, sizeof(srvaddr));
    srvaddr.sun_family = AF_UNIX;
    strlcpy(srvaddr.sun_path, fnamebuf, sizeof(srvaddr.sun_path));
    len = strlen(srvaddr.sun_path) + sizeof(srvaddr.sun_family) + 1;
    oldumask = umask((mode_t) 0); /* for Linux */
    r = bind(s, (struct sockaddr *)&srvaddr, len);
    umask(oldumask); /* for Linux */
    chmod(fnamebuf, 0777); /* for DUX */
    if (r == -1) {
	syslog(LOG_ERR, "bind: %s: %m", fnamebuf);
	fatal("bind failed", EC_OSERR);
    }
    r = listen(s, 10);
    if (r == -1) {
	syslog(LOG_ERR, "listen: %m");
	fatal("listen failed", EC_OSERR);
    }

    return s;
}

/* Accept up to max_fds connections on kicksock, put the fds into
 * the array fd_list (atleast max_fds big), and the number of connections
 * into num_fds */
static int get_kick_fds(int kicksock,
			int *fd_list, int *num_fds, int max_fds) {
    fd_set read_set;
    int highest_fd = kicksock + 1;
    struct timeval tv;

    tv.tv_sec = 0;
    tv.tv_usec = 0;

    FD_ZERO(&read_set);
    FD_SET(kicksock, &read_set);

    *num_fds = 0;

    for(*num_fds = 0; *num_fds<max_fds; (*num_fds)++) {
        int gotdata;
	fd_set rset;

	rset = read_set;
	gotdata = select(highest_fd, &rset, NULL, NULL, &tv);

	if(gotdata == -1) {
	  /* Select Error! */
	  syslog(LOG_ERR, "kicksock select failed");
	  return -1;
	} else if (gotdata != 0 && FD_ISSET(kicksock, &rset)) {
	  struct sockaddr_un clientaddr;
	  int len = sizeof(clientaddr);

	  fd_list[*num_fds] =
	      accept(kicksock, (struct sockaddr *)&clientaddr, (socklen_t *)&len);
	  if (fd_list[*num_fds] == -1) {
	    syslog(LOG_WARNING, "kicksock accept() failed: %m %d", kicksock);
	    return -1;
	  }
	} else {
	  /* Timeout Expired, we're done! */
	  break;
	}
    }

    return 0;
}

#define KICK_FDS_LEN 5

static void mupdate_listen(mupdate_handle *handle, int pingtimeout)
{
    int gotdata = 0;
    fd_set rset, read_set;
    int highest_fd, kicksock;
    int waiting_for_noop = 0;
    int kick_fds[KICK_FDS_LEN];
    int num_kick_fds = 0;
    
    if (!handle || !handle->saslcompleted) return;

    /* don't handle connections (and drop current connections)
     * while we sync */
    mupdate_unready();

    /* First, resync the database */
    if(mupdate_synchronize(handle)) return;

    mupdate_signal_db_synced();
    
    /* Okay, we're all set to go */
    mupdate_ready();

    kicksock = open_kick_socket();
    highest_fd = ((kicksock > handle->conn->sock) ? kicksock : handle->conn->sock) + 1;

    FD_ZERO(&read_set);
    FD_SET(handle->conn->sock, &read_set);
    FD_SET(kicksock, &read_set);

    /* Now just listen to the rest of the updates */
    while(1) {
	struct timeval tv;

	tv.tv_sec = pingtimeout;
	tv.tv_usec = 0;

	prot_flush(handle->conn->out);

	rset = read_set;

	gotdata = select(highest_fd, &rset, NULL, NULL, &tv);

	if(gotdata == -1) {
	    /* Oops? */
	    syslog(LOG_ERR, "select failed");
	    break;
	} else if(gotdata != 0) {
	    if (FD_ISSET(handle->conn->sock, &rset)) {
		/* If there is a fatal error, die, other errors ignore */
		if (mupdate_scarf(handle, cmd_change, NULL, 
				  waiting_for_noop, NULL) != 0) {
		    break;
		}
	    } 
	    
	    /* If we were waiting on a noop, we no longer are.
	     * If we have been kicked, tell them we're done now */
	    if(waiting_for_noop) {
		waiting_for_noop = 0;

		for(;num_kick_fds;num_kick_fds--) {
		    if (write(kick_fds[num_kick_fds-1], "ok", 2) < 0) {
			syslog(LOG_WARNING,
			       "can't write to IPC socket (ignoring)");
		    }
		    close(kick_fds[num_kick_fds-1]);
		}
	    }
	    
	    if (FD_ISSET(kicksock, &rset)) {
	        /* We were kicked--collect outstanding kicks! */
	        if(get_kick_fds(kicksock,
				kick_fds, &num_kick_fds, KICK_FDS_LEN)) {
		    /* Nonzero return code -- Error */
		    break;
		}

		prot_printf(handle->conn->out, "N%u NOOP\r\n", handle->tagn++);
		prot_flush(handle->conn->out);
		waiting_for_noop = 1;
	    }
	} else /* (gotdata == 0) */ {
	    /* Timeout, send a NOOP */
	    if(!waiting_for_noop) {
		prot_printf(handle->conn->out, "N%u NOOP\r\n", handle->tagn++);
		prot_flush(handle->conn->out);
		waiting_for_noop = 1;
	    } else {
		/* We were already waiting on a noop! */
		syslog(LOG_ERR, "connection to master timed out.");
		break;
	    }
	}
    } /* Loop */

    /* Don't leak the descriptors! */
    for(;num_kick_fds;num_kick_fds--) {
      close(kick_fds[num_kick_fds-1]);
    }
    close(kicksock);
}

void *mupdate_client_start(void *rock __attribute__((unused)))
{
    mupdate_handle *h = NULL;
    int retry_delay = 20, real_delay;
    int ret;

    srand(time(NULL) * getpid());

    if(!config_mupdate_server) {
	fatal("couldn't get mupdate server name", EC_UNAVAILABLE);
    }

    retry_delay = config_getint(IMAPOPT_MUPDATE_RETRY_DELAY);
    if(retry_delay < 0) {
	fatal("invalid value for mupdate_retry_delay", EC_UNAVAILABLE);
    }

    while(1) {
	ret = mupdate_connect(config_mupdate_server, NULL, &h, NULL);
	if(ret) {
	    syslog(LOG_ERR,"couldn't connect to mupdate server");
	    goto retry;
	}
   
	syslog(LOG_ERR, "successful mupdate connection to %s",
	       config_mupdate_server);

	mupdate_listen(h, retry_delay);

    retry:
	/* Cleanup */
	mupdate_disconnect(&h);

	real_delay = retry_delay + (rand() % (retry_delay / 2));
	
	syslog(LOG_ERR,
	       "retrying connection to mupdate server in %d seconds",
	       real_delay);

	/* Wait before retrying */
	sleep(real_delay);
    }

    return NULL;
}

void *mupdate_placebo_kick_start(void *rock __attribute__((unused)))
{
    int kicksock, kickconn = -1;
    
    kicksock = open_kick_socket();

    /* Now just listen to the rest of the updates */
    while(1) {
        struct sockaddr_un clientaddr;
	int len;
      
	/* Only handle one kick at a time -- they're fast */
	len = sizeof(clientaddr);
	kickconn =
	    accept(kicksock, (struct sockaddr *)&clientaddr, (socklen_t *)&len);
	
	if (kickconn == -1) {
	  syslog(LOG_WARNING, "accept(): %m");
	  break;
	} else {
	  if (write(kickconn, "ok", 2) < 0) {
	    syslog(LOG_WARNING, "can't write to IPC socket?");
	  }
	  close(kickconn);
	  kickconn = -1;
	}
    } /* Loop */

    /* Don't leak the descriptor! */
    if(kickconn >= 0) close(kickconn);
    close(kicksock);

    return NULL;
}