/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_util.c,v 1.45 2008/04/30 02:33:34 alexg Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"

/*
 * Queue a future attempt to (re-)connect to the site identified by eid.
 *
 * A retry record is allocated and stamped with the time at which the attempt
 * should be made.  Normally that is "now" plus the configured
 * connection_retry_wait, and the record is appended to the tail of the retry
 * queue.  When "immediate" is TRUE the time is "now" with no added wait, and
 * the record goes at the _head_ of the queue instead.  Appending delayed
 * entries at the tail and zero-wait entries at the head is what keeps the
 * queue in time order without any searching.
 *
 * The site is marked SITE_IDLE and made to point at its retry record.
 *
 * Returns the allocation error if the record cannot be allocated, otherwise
 * the result of waking the main thread.
 *
 * PUBLIC: int __repmgr_schedule_connection_attempt __P((ENV *, u_int, int));
 *
 * !!!
 * Caller should hold mutex.
 *
 * Unless an error occurs, we always attempt to wake the main thread;
 * __repmgr_bust_connection relies on this behavior.
 */
int
__repmgr_schedule_connection_attempt(env, eid, immediate)
	ENV *env;
	u_int eid;
	int immediate;
{
	DB_REP *db_rep;
	REPMGR_RETRY *attempt;
	REPMGR_SITE *site;
	db_timespec when;
	int ret;

	db_rep = env->rep_handle;

	ret = __os_malloc(env, sizeof(*attempt), &attempt);
	if (ret != 0)
		return (ret);

	__os_gettime(env, &when, 1);
	if (!immediate) {
		/* Ordinary case: wait out the retry interval, go to the end. */
		TIMESPEC_ADD_DB_TIMEOUT(&when, db_rep->connection_retry_wait);
		TAILQ_INSERT_TAIL(&db_rep->retries, attempt, entries);
	} else {
		/* Zero wait: earliest possible time, so it belongs in front. */
		TAILQ_INSERT_HEAD(&db_rep->retries, attempt, entries);
	}
	attempt->eid = eid;
	attempt->time = when;

	/* Tie the site to its pending retry. */
	site = SITE_FROM_EID(eid);
	site->state = SITE_IDLE;
	site->ref.retry = attempt;

	return (__repmgr_wake_main_thread(env));
}

/*
 * Prepare a connection to read the start of a new incoming message: put it
 * in SIZES_PHASE and load its iovecs with the three header fields (message
 * type, control size, rec size), in that order.
 *
 * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
 */
void
__repmgr_reset_for_reading(con)
	REPMGR_CONNECTION *con;
{
	REPMGR_IOVECS *hdr;

	hdr = &con->iovecs;

	con->reading_phase = SIZES_PHASE;

	__repmgr_iovec_init(hdr);
	__repmgr_add_buffer(hdr, &con->msg_type, sizeof(con->msg_type));
	__repmgr_add_buffer(hdr,
	    &con->control_size_buf, sizeof(con->control_size_buf));
	__repmgr_add_buffer(hdr,
	    &con->rec_size_buf, sizeof(con->rec_size_buf));
}

/*
 * Allocate and initialize a REPMGR_CONNECTION for socket s in the given
 * state, append it to the environment's list of connections, and hand it
 * back through connp.  The eid field is deliberately left alone, since it
 * isn't needed and/or immediately known in all cases.
 *
 * Returns 0 on success.  On failure nothing is added to the list and *connp
 * is not written.
 *
 * PUBLIC: int __repmgr_new_connection __P((ENV *, REPMGR_CONNECTION **,
 * PUBLIC:				   socket_t, int));
 */
int
__repmgr_new_connection(env, connp, s, state)
	ENV *env;
	REPMGR_CONNECTION **connp;
	socket_t s;
	int state;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	int ret;

	db_rep = env->rep_handle;

	ret = __os_calloc(env, 1, sizeof(REPMGR_CONNECTION), &conn);
	if (ret != 0)
		return (ret);

	ret = __repmgr_alloc_cond(&conn->drained);
	if (ret != 0) {
		/* Don't leak the half-built connection. */
		__os_free(env, conn);
		return (ret);
	}

	conn->blockers = 0;
	conn->fd = s;
	conn->state = state;

	/* Nothing is waiting to be sent yet. */
	STAILQ_INIT(&conn->outbound_queue);
	conn->out_queue_length = 0;

	__repmgr_reset_for_reading(conn);
	TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);

	*connp = conn;
	return (0);
}

#define	INITIAL_SITES_ALLOCATION	10		/* Arbitrary guess. */

/*
 * Add a site with the given network address and initial state to the site
 * table, and return a pointer to the new entry through sitep.  The entry's
 * index in the table is its EID.
 *
 * The table is grown on demand: INITIAL_SITES_ALLOCATION entries the first
 * time, doubling thereafter.  The address structure is copied bytewise into
 * the new entry.
 *
 * Returns 0 on success, or the error from growing the table (in which case
 * no site is added).
 *
 * PUBLIC: int __repmgr_new_site __P((ENV *, REPMGR_SITE**,
 * PUBLIC:     const repmgr_netaddr_t *, int));
 *
 * !!!
 * Caller must hold mutex.
 */
int
__repmgr_new_site(env, sitep, addr, state)
	ENV *env;
	REPMGR_SITE **sitep;
	const repmgr_netaddr_t *addr;
	int state;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	SITE_STRING_BUFFER loc;
	u_int capacity, eid;
	int ret;

	db_rep = env->rep_handle;

	if (db_rep->site_cnt >= db_rep->site_max) {
		/* Table is full (or not yet allocated): make room. */
		if (db_rep->site_max == 0)
			capacity = INITIAL_SITES_ALLOCATION;
		else
			capacity = db_rep->site_max * 2;

		ret = __os_realloc(env,
		    sizeof(REPMGR_SITE) * capacity, &db_rep->sites);
		if (ret != 0)
			return (ret);
		db_rep->site_max = capacity;
	}

	/* The next free slot's index becomes the new site's EID. */
	eid = db_rep->site_cnt++;
	site = &db_rep->sites[eid];

	memcpy(&site->net_addr, addr, sizeof(*addr));
	ZERO_LSN(site->max_ack);
	site->flags = 0;
	timespecclear(&site->last_rcvd_timestamp);
	site->state = state;

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "EID %u is assigned for %s",
	    eid, __repmgr_format_site_loc(site, loc)));

	*sitep = site;
	return (0);
}

/*
 * Destructor for a repmgr_netaddr_t: releases the address list and the host
 * string if present, and NULLs the corresponding pointers.  The structure
 * itself is not freed.
 *
 * PUBLIC: void __repmgr_cleanup_netaddr __P((ENV *, repmgr_netaddr_t *));
 */
void
__repmgr_cleanup_netaddr(env, addr)
	ENV *env;
	repmgr_netaddr_t *addr;
{
	if (addr->address_list != NULL) {
		__os_freeaddrinfo(env, addr->address_list);
		addr->current = NULL;
		addr->address_list = NULL;
	}

	if (addr->host != NULL) {
		__os_free(env, addr->host);
		addr->host = NULL;
	}
}

/*
 * Reset a set of iovecs to the empty state.
 *
 * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
 */
void
__repmgr_iovec_init(v)
	REPMGR_IOVECS *v;
{
	v->count = 0;
	v->offset = 0;
	v->total_bytes = 0;
}

/*
 * Append a raw buffer (address, length) to a set of iovecs and add its
 * length to the running byte total.
 *
 * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
 *
 * !!!
 * There is no checking for overflow of the vectors[5] array.
 */
void
__repmgr_add_buffer(v, address, length)
	REPMGR_IOVECS *v;
	void *address;
	size_t length;
{
	db_iovec_t *slot;

	slot = &v->vectors[v->count++];
	slot->iov_base = address;
	slot->iov_len = length;

	v->total_bytes += length;
}

/*
 * Append the data/size described by a DBT to a set of iovecs and add its
 * size to the running byte total.  As with __repmgr_add_buffer, no check is
 * made for overflow of the vectors array.
 *
 * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
 */
void
__repmgr_add_dbt(v, dbt)
	REPMGR_IOVECS *v;
	const DBT *dbt;
{
	db_iovec_t *slot;

	slot = &v->vectors[v->count++];
	slot->iov_base = dbt->data;
	slot->iov_len = dbt->size;

	v->total_bytes += dbt->size;
}

/*
 * Account for byte_count bytes having been transferred by an I/O operation
 * on a set of iovecs, so that the same set can be handed to the next I/O
 * call to pick up where the last one stopped.
 *
 * Vectors that were transferred completely are skipped; the vector in which
 * the transfer ended has its base and length adjusted to describe only the
 * untransferred remainder, and v->offset is left pointing at the first
 * vector that still has data.
 *
 * Returns TRUE if the set of buffers is now fully consumed, FALSE if more
 * remains.
 *
 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
 */
int
__repmgr_update_consumed(v, byte_count)
	REPMGR_IOVECS *v;
	size_t byte_count;
{
	db_iovec_t *iov;
	int i;

	i = v->offset;
	for (;;) {
		DB_ASSERT(NULL, i < v->count && byte_count > 0);
		iov = &v->vectors[i];

		if (byte_count > iov->iov_len) {
			/*
			 * The transfer covered this whole vector and then
			 * some.  Deduct it and look at the next one.
			 */
			byte_count -= iov->iov_len;
			i++;
			continue;
		}

		/*
		 * The transfer ended inside (or exactly at the end of) this
		 * vector.  The test above guarantees byte_count <= iov_len
		 * here, so the subtraction cannot underflow.
		 */
		iov->iov_len -= (u_int32_t)byte_count;
		if (iov->iov_len > 0) {
			/*
			 * Part of this vector remains: slide its base past
			 * the transferred bytes and resume here next time.
			 */
			iov->iov_base = (void *)
			    ((u_int8_t *)iov->iov_base + byte_count);
			v->offset = i;
		} else {
			/*
			 * Ended exactly on a vector boundary: resume with
			 * the following vector.
			 */
			v->offset = i + 1;
		}

		/* Once offset reaches count there is nothing left. */
		return (v->offset >= v->count);
	}
}

/*
 * Build the buffer that advertises our own network address, in the form
 * published as cdata via rep_start, and set up the given DBT to describe it.
 *
 * The buffer holds the 2-byte port number in network byte order, followed
 * immediately by the host name including its terminating NUL.  It is
 * dynamically allocated; on success ownership passes to the caller, who must
 * free dbt->data.
 *
 * Returns 0 on success, or the allocation error (dbt is then untouched).
 *
 * PUBLIC: int __repmgr_prepare_my_addr __P((ENV *, DBT *));
 */
int
__repmgr_prepare_my_addr(env, dbt)
	ENV *env;
	DBT *dbt;
{
	DB_REP *db_rep;
	size_t host_len, total;
	u_int16_t net_port;
	u_int8_t *buf;
	int ret;

	db_rep = env->rep_handle;

	net_port = htons(db_rep->my_addr.port);

	/* Host name length counts the trailing NUL. */
	host_len = strlen(db_rep->my_addr.host) + 1;
	total = sizeof(net_port) + host_len;

	ret = __os_malloc(env, total, &buf);
	if (ret != 0)
		return (ret);

	DB_INIT_DBT(*dbt, buf, total);

	/* Port first, then the host string right behind it. */
	memcpy(buf, &net_port, sizeof(net_port));
	memcpy(buf + sizeof(net_port), db_rep->my_addr.host, host_len);

	return (0);
}

/*
 * Report nsites, the number of sites in the replication group.  A positive
 * application-configured value takes precedence; failing that, the answer is
 * the number of sites we know of.
 *
 * !!!
 * This may only be called after the environment has been opened, because we
 * assume we have a rep region.  That should be OK, because we only need this
 * for starting an election, or counting acks after sending a PERM message.
 *
 * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *));
 */
u_int
__repmgr_get_nsites(db_rep)
	DB_REP *db_rep;
{
	REP *rep;
	u_int nsites;

	rep = db_rep->region;

	if (rep->config_nsites > 0)
		nsites = (u_int)rep->config_nsites;
	else {
		/* Every other site in our table, plus one for ourself. */
		nsites = db_rep->site_cnt + 1;
	}

	return (nsites);
}

/*
 * Handle a failure in a repmgr thread: ask the repmgr threads to stop, then
 * panic the environment with the given error.  The results of both calls
 * are deliberately ignored.
 *
 * PUBLIC: void __repmgr_thread_failure __P((ENV *, int));
 */
void
__repmgr_thread_failure(env, why)
	ENV *env;
	int why;
{
	(void)__repmgr_stop_threads(env);

	(void)__env_panic(env, why);
}

/*
 * Produce a printable description of the site with the given EID, for use
 * in messages.  An EID that is not valid yields a fixed placeholder string.
 * The text is written into buffer, which must be at least as big as
 * MAX_SITE_LOC_STRING; the return value is the buffer handed back by the
 * formatting step, for convenient use as a function argument.
 *
 * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *));
 */
char *
__repmgr_format_eid_loc(db_rep, eid, buffer)
	DB_REP *db_rep;
	int eid;
	char *buffer;
{
	char *text;

	if (IS_VALID_EID(eid))
		text = __repmgr_format_site_loc(SITE_FROM_EID(eid), buffer);
	else {
		snprintf(buffer,
		    MAX_SITE_LOC_STRING, "(unidentified site)");
		text = buffer;
	}

	return (text);
}

/*
 * Write "site <host>:<port>" for the given site into buffer (limited to
 * MAX_SITE_LOC_STRING bytes) and return buffer.
 *
 * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
 */
char *
__repmgr_format_site_loc(site, buffer)
	REPMGR_SITE *site;
	char *buffer;
{
	char *host;
	u_long port;

	host = site->net_addr.host;
	port = (u_long)site->net_addr.port;

	snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu", host, port);
	return (buffer);
}

/*
 * Call rep_start on behalf of the replication manager, supplying our own
 * network address as the cdata.  The temporary address buffer is released
 * whether or not rep_start succeeds; a rep_start failure is reported through
 * __db_err before being returned.
 *
 * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t));
 */
int
__repmgr_repstart(env, flags)
	ENV *env;
	u_int32_t flags;
{
	DBT cdata;
	int ret;

	ret = __repmgr_prepare_my_addr(env, &cdata);
	if (ret != 0)
		return (ret);

	ret = __rep_start(env->dbenv, &cdata, flags);

	/* The buffer was only needed for the duration of the call. */
	__os_free(env, cdata.data);

	if (ret != 0)
		__db_err(env, ret, "rep_start");
	return (ret);
}