dgxq.c   [plain text]


/*
 * Copyright (c) 2010 Apple Inc. All rights reserved.
 *
 * @APPLE_LICENSE_HEADER_START@
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1.  Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 * 2.  Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 * 3.  Neither the name of Apple Inc. ("Apple") nor the names of its
 *     contributors may be used to endorse or promote products derived from
 *     this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Portions of this software have been released under the following terms:
 *
 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
 *
 * To anyone who acknowledges that this file is provided "AS IS"
 * without any express or implied warranty:
 * permission to use, copy, modify, and distribute this file for any
 * purpose is hereby granted without fee, provided that the above
 * copyright notices and this notice appears in all source code copies,
 * and that none of the names of Open Software Foundation, Inc., Hewlett-
 * Packard Company or Digital Equipment Corporation be used
 * in advertising or publicity pertaining to distribution of the software
 * without specific, written prior permission.  Neither Open Software
 * Foundation, Inc., Hewlett-Packard Company nor Digital
 * Equipment Corporation makes any representations about the suitability
 * of this software for any purpose.
 *
 * Copyright (c) 2007, Novell, Inc. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1.  Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 * 2.  Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 * 3.  Neither the name of Novell Inc. nor the names of its contributors
 *     may be used to endorse or promote products derived from this
 *     this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @APPLE_LICENSE_HEADER_END@
 */

/*
**
**  NAME:
**
**      dgxq.c
**
**  FACILITY:
**
**      Remote Procedure Call (RPC)
**
**  ABSTRACT:
**
**  DG protocol service routines.  Handles transmit queues.
**
**
*/

#include <dg.h>
#include <dgxq.h>
#include <dgpkt.h>
#include <dgcall.h>

/*========================================================================= */

/*
 * Maximum number of sendmsg attempts rpc__dg_xmitq_elt_xmit makes for a
 * single (non-broadcast) packet when the socket keeps reporting
 * EWOULDBLOCK and the caller asked for a blocking send.
 */
#define MAX_SENDMSG_RETRIES 5

/*========================================================================= */

/*
 * Transmit queue element free list.
 *
 * NOTE(review): neither xqe_free_list nor XQE_FREE_LIST_MAX_LENGTH is
 * referenced by any routine in this file (hence ATTRIBUTE_UNUSED);
 * presumably a leftover from an earlier xqe caching scheme -- confirm
 * before relying on it.
 */
#define XQE_FREE_LIST_MAX_LENGTH    32

INTERNAL struct {
    rpc_dg_xmitq_elt_p_t head;      /* presumably first free element */
    unsigned16 length;              /* presumably number of free elements */
} xqe_free_list ATTRIBUTE_UNUSED;

/*========================================================================= */

/*
 * com_timeout controllable parameters...
 *
 * There is one of these per com_timeout "knob" setting;
 * rpc__dg_xmitq_awaiting_ack_tmo keeps a table of them indexed by the
 * knob value.
 *
 * max_xq_awaiting_ack_time:
 *     The maximum time (an rpc_clock_t interval, built with RPC_CLOCK_SEC)
 *     that we will attempt to retransmit pkts to a receiver without any
 *     apparent type of acknowledgment.  This is a general xq related value
 *     and mechanism that removes the need for the old server specific
 *     "final" state timeout processing.
 */

typedef struct {
    unsigned32 max_xq_awaiting_ack_time;
} com_timeout_params_t;

/*========================================================================= */

/*
 * R P C _ _ D G _ X M I T Q _ A W A I T I N G _ A C K _ T M O
 *
 * Return true iff the xmitq is awaiting an ack and it's been waiting
 * too long.
 *
 * com_timeout_knob selects a row of xq_com_timeout_params below.  The
 * infinite setting (rpc_c_binding_infinite_timeout) never times out, and
 * any knob value beyond the end of the table is treated the same way so
 * that the table is never indexed out of bounds.
 */

PRIVATE boolean rpc__dg_xmitq_awaiting_ack_tmo
(
    rpc_dg_xmitq_p_t xq,
    unsigned32 com_timeout_knob
)
{
    rpc_clock_t timestamp, wait_time;
    static const com_timeout_params_t xq_com_timeout_params[] = {
                            /* max_xq_awaiting_ack_time */
        /*  0 min */        {RPC_CLOCK_SEC(1)},
        /*  1 */            {RPC_CLOCK_SEC(2)},
        /*  2 */            {RPC_CLOCK_SEC(4)},
        /*  3 */            {RPC_CLOCK_SEC(8)},
        /*  4 */            {RPC_CLOCK_SEC(15)},
        /*  5 def */        {RPC_CLOCK_SEC(30)},
        /*  6 */            {RPC_CLOCK_SEC(2*30)},
        /*  7 */            {RPC_CLOCK_SEC(4*30)},
        /*  8 */            {RPC_CLOCK_SEC(8*30)},
        /*  9 */            {RPC_CLOCK_SEC(16*30)},
        /* 10 infinite */   {RPC_CLOCK_SEC(0)}
    };

    /*
     * Nothing is outstanding, so nothing can have timed out.
     */
    if (! xq->awaiting_ack)
    {
        return (false);
    }

    /*
     * The infinite setting never times out.  Treat an out-of-range knob
     * the same way rather than reading past the end of the table.
     */
    if (com_timeout_knob == rpc_c_binding_infinite_timeout
        || com_timeout_knob >= sizeof(xq_com_timeout_params)
                               / sizeof(xq_com_timeout_params[0]))
    {
        return (false);
    }

    timestamp = xq->awaiting_ack_timestamp;
    wait_time = xq_com_timeout_params[com_timeout_knob].max_xq_awaiting_ack_time;

    if (rpc__clock_aged(timestamp, wait_time))
    {
        RPC_DBG_GPRINTF(
            ("(rpc__dg_xmitq_awaiting_ack_tmo) timeout (timestamp=%lu, wait_time=%lu, now=%lu) [%s]\n",
            (unsigned long)timestamp, (unsigned long)wait_time,
            (unsigned long)rpc__clock_stamp(),
            rpc__dg_act_seq_string(&xq->hdr)));
        return (true);
    }

    return (false);
}

/*
 * S E N D _ B R O A D C A S T
 *
 * Send a datagram out all the appropriate broadcast addresses.
 *
 * The broadcast address list is fetched from the NAF the first time it
 * is needed and cached on the call's socket pool element (sp->brd_addrs).
 * Each broadcast address is given the call's endpoint before the
 * datagram is sent to it.  No status is returned: failures are reported
 * through the debug print machinery only.  If the endpoint cannot be
 * obtained nothing is sent; an address whose endpoint cannot be set is
 * skipped; the first send failure stops the loop.
 */

INTERNAL void send_broadcast (
        rpc_dg_call_p_t  /*call*/,
        rpc_socket_iovec_p_t  /*iov*/,
        int  /*iovlen*/
    );

INTERNAL void send_broadcast
(
    rpc_dg_call_p_t call,
    rpc_socket_iovec_p_t iov,
    int iovlen
)
{
    unsigned32 st, j;
    rpc_socket_error_t serr;
    int i;
    size_t sentcc = 0, sendcc = 0;
    rpc_dg_sock_pool_elt_p_t sp;
    unsigned_char_p_t endpoint = NULL;
    rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call;

    assert(RPC_DG_CALL_IS_CLIENT(call));

    sp = ccall->c.sock_ref;

    /*
     * See if we have already inquired about the broadcast addresses
     * for this socket.  If not, enable broadcasts on the socket (if
     * necessary) and call the NAF routine for the broadcast addresses.
     * (Note that we're ignoring the result of "rpc__socket_set_broadcast"
     * because what's the worst that can happen?)
     */

    if (sp->brd_addrs == NULL)
    {
        (void) rpc__socket_set_broadcast(sp->sock);

        rpc__naf_get_broadcast(rpc_g_protseq_id[sp->pseq_id].naf_id,
                               ccall->h->c.c.rpc_addr->rpc_protseq_id,
                               &sp->brd_addrs, &st);
        if (st != rpc_s_ok || sp->brd_addrs == NULL)
        {
            return;
        }
    }

    for (i = 0; i < iovlen; i++)
        sendcc += iov[i].iov_len;

    /*
     * Every broadcast address must carry the call's endpoint.  Without
     * the endpoint there is nothing useful to send, so give up.
     */
    rpc__naf_addr_inq_endpoint(call->addr, &endpoint, &st);
    if (st != rpc_s_ok)
    {
        RPC_DBG_GPRINTF(("(send_broadcast) can't inquire endpoint, st = %x\n",
            st));
        if (endpoint != NULL)
        {
            RPC_MEM_FREE(endpoint, RPC_C_MEM_STRING);
        }
        return;
    }

    for (j = 0; j < sp->brd_addrs->len; j++)
    {
        rpc__naf_addr_set_endpoint(endpoint, &sp->brd_addrs->addrs[j], &st);
        if (st != rpc_s_ok)
        {
            /*
             * Sending to an address without the right endpoint is
             * pointless; skip it and try the remaining addresses.
             */
            RPC_DBG_GPRINTF(("(send_broadcast) can't set endpoint on broadcast addr %lu, st = %x\n",
                (unsigned long)j, st));
            continue;
        }

        RPC_DG_SOCKET_SENDMSG_OOL(sp->sock, iov, iovlen,
                sp->brd_addrs->addrs[j], &sentcc, &serr);

        RPC_DG_SOCK_UPDATE_ERR_COUNT(sp, serr);

        if (RPC_SOCKET_IS_ERR(serr) || sentcc != sendcc)
        {
            RPC_DBG_GPRINTF(("(send_broadcast) sendmsg failed, sendcc = %lu, sentcc = %lu, error = %d\n",
                (unsigned long)sendcc, (unsigned long)sentcc,
                RPC_SOCKET_ETOI(serr)));
            break;
        }
        RPC_DG_STATS_INCR(pkts_sent);
        RPC_DG_STATS_INCR(brds_sent);
    }

    RPC_MEM_FREE(endpoint, RPC_C_MEM_STRING);
}

/*
 * R P C _ _ D G _ X M I T Q _ E L T _ X M I T
 *
 * Send the request/response packet denoted by the transmit queue element
 * on the specified call.
 *
 * The call must be locked.  The xmitq's prototype header (xq->hdr) is
 * filled in from the xqe (fragnum, flags, length, a fresh serial number)
 * and sent, together with the xqe's body and the bodies of any buffers
 * chained from it via more_data, as a single datagram.  For response
 * packets the header's sequence number is temporarily replaced with
 * call->high_seq and restored after the send.
 *
 * block:  if true, EWOULDBLOCK send failures are retried (at most
 *         MAX_SENDMSG_RETRIES attempts in total), waiting for the socket
 *         to become writable between attempts; if false, the send is
 *         abandoned on the first EWOULDBLOCK.
 *
 * A disabled socket or an auth pre_send failure faults the call via
 * rpc__dg_call_signal_failure.  Other send errors are only logged.
 */

PRIVATE void rpc__dg_xmitq_elt_xmit
(
    rpc_dg_xmitq_elt_p_t xqe,
    rpc_dg_call_p_t call,
    boolean32 block
)
{
    rpc_socket_iovec_t iov[RPC_C_DG_MAX_NUM_PKTS_IN_FRAG+1];
    int iovlen;
    rpc_dg_xmitq_elt_p_t last_xqe = xqe;
    rpc_dg_xmitq_p_t xq = &call->xq;
    rpc_key_info_p_t key_info;
    rpc_dg_auth_epv_p_t auth_epv;
    size_t sentcc = 0;  /* printed on failure even if sendmsg didn't set it */
    size_t sendcc;
    unsigned32  original_seq = 0;
    rpc_socket_error_t serr;
    unsigned16 i;
    int ptype;
#ifdef MISPACKED_HDR
    rpc_dg_raw_pkt_hdr_t raw_hdr;
#endif

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * First, make sure the socket we're about to use has not been
     * disabled.  If it has been, we might as well fault the call
     * now.
     */

    if (RPC_DG_SOCK_IS_DISABLED(call->sock_ref))
    {
        RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
            ("(rpc__dg_xmitq_elt_xmit) socket %p has been disabled\n",
            call->sock_ref->sock));
        rpc__dg_call_signal_failure(call, rpc_s_socket_failure);
        return;
    }

    key_info = call->key_info;
    auth_epv = call->auth_epv;

    /*
     * Fill in the prototype header in the transmit queue using values
     * from transmit queue element.
     */

    xq->hdr.fragnum = xqe->fragnum;
    xq->hdr.flags   = xq->base_flags | xqe->flags;
    xq->hdr.flags2  = xq->base_flags2;
    xq->hdr.len     = xqe->frag_len;
    xq->hdr.auth_proto = 0;

    /*
     * Tag this packet with a serial number, and record that number
     * into the packet header.
     */

    xqe->serial_num = xq->next_serial_num++;
    xq->hdr.serial_hi = (xqe->serial_num & 0xFF00) >> 8;
    xq->hdr.serial_lo = xqe->serial_num & 0xFF;

    ptype = RPC_DG_HDR_INQ_PTYPE(&xq->hdr);

    /*
     * For response packets we need to use the highest sequence number
     * we've seen up to this time, so that the client can keep track of
     * the sequence number space.  To work with 1.5.1 clients, we still
     * need to use the original sequence number for facks, ping responses,
     * etc.  The original is put back at the end of this routine.
     */

    if (ptype == RPC_C_DG_PT_RESPONSE)
    {
        original_seq = xq->hdr.seq;
        xq->hdr.seq = call->high_seq;
    }

    /*
     * Only authenticate data packets -- setting key_info to NULL
     * disables authentication machinery for this packet.
     */

    if (!RPC_DG_PT_IS_DATA(ptype))
        key_info = NULL;

    /*
     * Set "last frag" bit as necessary.  (Note that we make sure that
     * last frags never ask for facks.  Also, we don't set any of the
     * frag bits if all the data is being sent in a single packet [i.e.,
     * we're NOT fragmenting at all]).
     */

    if (xq->push && xqe == xq->tail &&
            RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG))
        xq->hdr.flags |= RPC_C_DG_PF_LAST_FRAG | RPC_C_DG_PF_NO_FACK;

    /*
     * If this is the call's final xqe (fragmented or not) or we're
     * requesting a fack, then start up the awaiting_ack detection
     * machinery (if it isn't already setup).  The receiver needs to
     * send a xmitq acknowledgment (i.e. a fack, working, response or
     * ack pkt) sooner or later.
     */

    if (! xq->awaiting_ack
        && (! RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG)
            || RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_LAST_FRAG)
            || ! RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_NO_FACK)))
    {
        RPC_DG_XMITQ_AWAITING_ACK_SET(xq);
    }

    /*
     * If we're authenticating, set auth_proto field in packet header
     * to indicate which auth protocol we're using.
     */
    if (key_info != NULL)
        xq->hdr.auth_proto = auth_epv->auth_proto;

    /*
     * Set up the I/O vector: iov[0] points to the header (from the
     * transmit queue) and iov[1..] point to the body of this xqe and of
     * every buffer chained from it via more_data.  iov[1] is filled in
     * even when the body is empty, because the authentication code below
     * may extend it to carry the auth overhead.  Note that in the case
     * of "mispacked header" systems, we have to make a correctly formatted
     * header before we can transmit.
     */

    iov[0].iov_base = (byte_p_t) &xq->hdr;
    iov[0].iov_len  = RPC_C_DG_RAW_PKT_HDR_SIZE;
    iovlen = 1;
    sendcc = iov[0].iov_len;

    iov[1].iov_base = (byte_p_t) xqe->body;
    iov[1].iov_len  = xqe->body_len;

    if (xqe->body_len != 0)
    {
        iov[iovlen].iov_base = (byte_p_t) last_xqe->body;
        iov[iovlen].iov_len  = last_xqe->body_len;
        sendcc += iov[iovlen++].iov_len;

        while (last_xqe->more_data != NULL)
        {
            iov[iovlen].iov_base = (byte_p_t) last_xqe->more_data->body;
            iov[iovlen].iov_len  = last_xqe->more_data->body_len;
            sendcc += iov[iovlen++].iov_len;
            last_xqe = last_xqe->more_data;
        }
    }

    assert(iovlen > 0 && iovlen <= RPC_C_DG_MAX_NUM_PKTS_IN_FRAG + 1);
    assert(xqe->frag_len == 0 ||
           (unsigned32)sendcc == (xqe->frag_len + RPC_C_DG_RAW_PKT_HDR_SIZE));

    xqe->frag_len = xq->hdr.len = sendcc - RPC_C_DG_RAW_PKT_HDR_SIZE;

    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
        ("(rpc__dg_xmitq_elt_xmit) %s %lu.%u.%u len=%lu %s\n",
        rpc__dg_pkt_name(RPC_DG_HDR_INQ_PTYPE(&call->xq.hdr)),
        (unsigned long)call->xq.hdr.seq, xqe->fragnum,
        xqe->serial_num, (unsigned long)xq->hdr.len,
        RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_NO_FACK) ? "" : "frq"));

#ifdef MISPACKED_HDR
    /* !!! ...compress hdr pointed to by iov[0] into raw_hdr... !!! */
    compress_hdr(&xq->hdr, &raw_hdr);
    iov[0].iov_base = (byte_p_t) &raw_hdr;
#endif

    /*
     * Attach any authentication information now.  The auth module's
     * overhead goes at the end of the last body buffer (cksum points
     * just past the data in it).
     */

    if (key_info != NULL)
    {
        unsigned32 st;
        int overhead;

        dce_pointer_t cksum = last_xqe->body->args + last_xqe->body_len;

        (*auth_epv->pre_send) (key_info, xqe, &xq->hdr, iov, iovlen, cksum, &st);
        if (st != 0)
        {
            RPC_DBG_GPRINTF(
                ("(rpc__dg_xmitq_elt_xmit) auth pre_send failed, status = %x\n",
                st));
            rpc__dg_call_signal_failure (call, st);
            return;
        }
        overhead = auth_epv->overhead;

        if (iovlen == 1)
        {
            /* Empty body: use the iov[1] slot prepared above. */
            iovlen = 2;
            iov[1].iov_len += overhead;
        }
        else
        {
            iov[iovlen-1].iov_len += overhead;
        }
        sendcc += overhead;
    }
    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
        ("(rpc__dg_xmitq_elt_xmit) iovlen %d, sendcc %lu\n",
         iovlen, (unsigned long)sendcc));

    /*
     * Send out the datagram (checking whether we should broadcast it).
     * For the non-broadcast cast, we accept a few EWOULDBLOCK send
     * failures (by sleeping for a bit and then trying again).  We wish
     * we could have the socket in blocking I/O mode, but the receive
     * path really wants the socket in NON-blocking mode.
     */

    if (RPC_DG_FLAG_IS_SET(call->xq.base_flags, RPC_C_DG_PF_BROADCAST))
    {
        send_broadcast(call, iov, iovlen);
    }
    else
    {
        for (i = 0; i < MAX_SENDMSG_RETRIES; i++)
        {
            RPC_DG_SOCKET_SENDMSG(call->sock_ref->sock, iov, iovlen,
                               call->addr, &sentcc, &serr);

            RPC_DG_SOCK_UPDATE_ERR_COUNT(call->sock_ref, serr);

            if (! RPC_SOCKET_IS_ERR(serr) && sentcc == sendcc)
            {
                RPC_DG_STATS_INCR(pkts_sent);
                RPC_DG_STATS_INCR(pstats[RPC_DG_HDR_INQ_PTYPE(&xq->hdr)].sent);
                break;
            }

            if (! RPC_SOCKET_ERR_EQ(serr, RPC_C_SOCKET_EWOULDBLOCK))
            {
                RPC_DBG_GPRINTF(
                    ("(rpc__dg_xmitq_elt_xmit) sendmsg failed, sendcc = %lu, sentcc = %lu, error = %d\n",
                    (unsigned long)sendcc, (unsigned long)sentcc,
                    RPC_SOCKET_ETOI(serr)));
                break;
            }

            if (! block)
                break;

            /*
             * Handle EWOULDBLOCKs waiting for the condition to go away.
             */

            RPC_DBG_PRINTF(rpc_e_dbg_xmit, 2,
                ("(rpc__dg_xmitq_elt_xmit) sendmsg failed with EWOULDBLOCK; waiting\n"));

            rpc__socket_nowriteblock_wait(call->sock_ref->sock, NULL);
        }

        /*
         * Falling out of the loop without a break means every attempt hit
         * EWOULDBLOCK; say so rather than dropping the packet silently.
         */
        if (i == MAX_SENDMSG_RETRIES)
        {
            RPC_DBG_GPRINTF(
                ("(rpc__dg_xmitq_elt_xmit) sendmsg still EWOULDBLOCK after %d attempts; packet not sent\n",
                MAX_SENDMSG_RETRIES));
        }
    }

    xq->timestamp = rpc__clock_stamp();

    if (RPC_DG_HDR_INQ_PTYPE(&xq->hdr) == RPC_C_DG_PT_RESPONSE)
    {
        xq->hdr.seq = original_seq;
    }
}

/*
 * R P C _ _ D G _ X M I T Q _ I N I T
 *
 * Fully initialize a transmit queue (rpc_dg_xmit_q_t).
 *
 * The per-call reset is delegated to rpc__dg_xmitq_reinit.  This routine
 * then sets the fields reinit deliberately leaves alone (and which
 * therefore survive from one call to the next): the receive/send TSDU
 * and fragment size limits, the maximum blast size, the xq timer state
 * and high_cwindow.  Only the constant parts of the prototype packet
 * header (RPC protocol version, local NDR drep and a zero auth_proto)
 * are filled in here; the rest isn't known yet.
 */

PRIVATE void rpc__dg_xmitq_init
(
    rpc_dg_xmitq_p_t xq
)
{
    /*
     * The caller is presumed to hold the call lock or to have the call
     * to itself; there is no call handle here to assert that on, i.e.
     * RPC_DG_CALL_LOCK_ASSERT(call);
     */

    /* Per-call state first. */
    rpc__dg_xmitq_reinit(xq);

    /* Packet and fragment size limits all start at the initial size. */
    xq->max_rcv_tsdu   = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
    xq->max_snd_tsdu   = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
    xq->max_frag_size  = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
    xq->snd_frag_size  = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
    xq->max_blast_size = RPC_C_DG_INITIAL_MAX_BLAST_SIZE;

    /* Transmit timer and congestion window history. */
    xq->xq_timer          = RPC_C_DG_INITIAL_XQ_TIMER;
    xq->xq_timer_throttle = 1;
    xq->high_cwindow      = 0;

    /* Header fields that never change for this queue. */
    RPC_DG_HDR_SET_VERS(&xq->hdr);
    RPC_DG_HDR_SET_DREP(&xq->hdr);
    xq->hdr.auth_proto = 0;
}

/*
 * R P C _ _ D G _ X M I T Q _ R E I N I T
 *
 * Reset the per-call state of a transmit queue (rpc_dg_xmit_q_t) so it
 * can be reused for a new call: the queue is emptied (without freeing
 * anything), fragment/serial numbering restarts, and flow-control and
 * acknowledgment state return to their initial values.  Size limits,
 * timer state and the prototype packet header are NOT touched; those
 * are retained between calls (rpc__dg_xmitq_init sets them).
 */

PRIVATE void rpc__dg_xmitq_reinit
(
    rpc_dg_xmitq_p_t xq
)
{
    /*
     * The caller is presumed to hold the call lock or to have the call
     * to itself; there is no call handle here to assert that on, i.e.
     * RPC_DG_CALL_LOCK_ASSERT(call);
     */

    /* No queued, unsent, partial or retransmit elements. */
    xq->rexmitq      = NULL;
    xq->part_xqe     = NULL;
    xq->tail         = NULL;
    xq->first_unsent = NULL;
    xq->head         = NULL;

    /* Numbering starts over. */
    xq->next_fragnum     = 0;
    xq->next_serial_num  = 0;
    xq->last_fack_serial = -1;

    /* Flow control back to its starting point. */
    xq->window_size    = RPC_C_DG_INITIAL_WINDOW_SIZE;
    xq->cwindow_size   = 0;
    xq->blast_size     = 0;
    xq->freqs_out      = 0;
    xq->rexmit_timeout = RPC_C_DG_INITIAL_REXMIT_TIMEOUT;

    /* Nothing pushed, nothing awaiting acknowledgment. */
    xq->push         = false;
    xq->awaiting_ack = false;

    /*
     * The "wait for the first fack" logic is temporarily disabled, by
     * pretending the first fack has already been seen, to recover small
     * in/out performance.  To be revisited.
     */
    xq->first_fack_seen = true;
}

/*
 * R P C _ _ D G _ X M I T Q _ F R E E
 *
 * Release every element on a transmit queue (rpc_dg_xmit_q_t) back via
 * rpc__dg_pkt_free_xqe and leave the queue empty.  The transmit queue
 * structure itself is NOT freed, since it's (assumed to be) embedded in
 * a larger structure.  Because the queued data is discarded, any xmitq
 * related ack must already have arrived; the awaiting_ack state is
 * cleared and any blast_size is reset.  xq->part_xqe is not touched.
 */

PRIVATE void rpc__dg_xmitq_free
(
    rpc_dg_xmitq_p_t xq,
    rpc_dg_call_p_t call
)
{
    rpc_dg_xmitq_elt_p_t victim;

    /*
     * The caller is presumed to hold the call lock or to have the call
     * to itself, i.e. RPC_DG_CALL_LOCK_ASSERT(call);
     */

    RPC_DG_XMITQ_AWAITING_ACK_CLR(xq);

    /* Pop elements off the head until the queue is empty. */
    for (victim = xq->head; victim != NULL; victim = xq->head)
    {
        xq->head = victim->next;
        rpc__dg_pkt_free_xqe(victim, call);
    }

    xq->rexmitq      = NULL;
    xq->tail         = NULL;
    xq->first_unsent = NULL;

    /* Forget any blast that was set up. */
    xq->blast_size = 0;
}

/*
 * R P C _ _ D G _ X M I T Q _ A P P E N D _ P P
 *
 * Append a transmit queue's partial packet to the transmit queue itself.
 *
 * The partial xqe (xq->part_xqe) may be a chain of buffers linked through
 * more_data; its frag_len is set to the total body length of the chain.
 * It is given the next fragment number, linked onto the tail of the
 * queue (becoming first_unsent if nothing else is unsent), and
 * xq->part_xqe is cleared.  For data packets on a call with key_info,
 * the fragment is padded up to the auth module's block size, an extra
 * buffer is chained on if the last one has no room for the auth
 * overhead, and the fragment is encrypted via auth_epv->encrypt.
 *
 * st:  set to rpc_s_ok, or to the failure status from
 *      rpc__dg_pkt_alloc_xqe or the encrypt routine (the xqe has already
 *      been queued by then).
 */

PRIVATE void rpc__dg_xmitq_append_pp
(
        rpc_dg_call_p_t call,
        unsigned32 *st
)
{
    rpc_dg_xmitq_p_t xq = &call->xq;
    rpc_dg_xmitq_elt_p_t xqe = xq->part_xqe;
    rpc_key_info_p_t key_info = call->key_info;
    int ptype;
    unsigned32 frag_length; /* # bytes in the body of the fragment */
    rpc_dg_xmitq_elt_p_t last_xqe = xqe;

    *st = rpc_s_ok;

    RPC_DG_CALL_LOCK_ASSERT(call);

    if (xqe == NULL)
        return;

    /*
     * Compute the fragment length (sum over the more_data chain) and
     * store it.  last_xqe ends up at the final buffer of the chain.
     */

    frag_length = last_xqe->body_len;

    while (last_xqe->more_data != NULL)
    {
        frag_length += last_xqe->more_data->body_len;
        last_xqe = last_xqe->more_data;
    }

    xqe->frag_len = frag_length;

    xqe->next       = NULL;
    xqe->fragnum    = xq->next_fragnum++;
    xqe->flags      = 0;

    /*
     * Add the partial xqe to the queue.
     */

    if (xq->head == NULL)
    {
        xq->head = xq->tail = xq->first_unsent = xqe;
    }
    else
    {
        xq->tail->next = xqe;
        xq->tail = xqe;
        if (xq->first_unsent == NULL)
            xq->first_unsent = xqe;
    }

    /*
     * "Normal" idempotent operations with *large ins* get tagged as
     * non-idempotent calls.  Old (1.5.1) servers (and 2.0 as well) can
     * dispose of their response packet in the case of idempotent calls
     * with "small outs".  V2.0 clients dispose of acknowledged input
     * stream frags hence, they are unable to rerun an idempotent call
     * with large ins in the event that the server response is lost.
     *
     * The condition below is triggered for fragnum 0 only if we aren't
     * 'pushing' the queue; ie. there are more packets to follow so we
     * can assume the call has large INS.
     */

    if (xqe->fragnum == 0 && ! xq->push)
    {
        if (RPC_DG_HDR_INQ_PTYPE(&xq->hdr) == RPC_C_DG_PT_REQUEST
            && ! RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_MAYBE))
        {
            xq->base_flags &= ~RPC_C_DG_PF_IDEMPOTENT;
        }
    }

    /*
     * Set the "frag" bit appropriately.  The only time we don't set
     * this bit is when the first packet is being pushed from xmitq_push.
     * If a packet is appended at any time before we begin "pushing,"
     * we can be sure that the xmit is fragmented.  Once the flag is
     * set, it never gets unset, so no need to worry that the last frag
     * will reset the flag.
     */

    if (! xq->push)
    {
        xq->base_flags |= RPC_C_DG_PF_FRAG;
    }

    xq->part_xqe = NULL;

    /*
     * Only encrypt data packets -- setting key_info to NULL
     * disables encrypt machinery for this packet.
     */

    ptype = RPC_DG_HDR_INQ_PTYPE(&xq->hdr);
    if (!RPC_DG_PT_IS_DATA(ptype))
        key_info = NULL;

    if (key_info != NULL)
    {
        rpc_dg_auth_epv_p_t auth_epv = call->auth_epv;
        unsigned32 blocksize = auth_epv->blocksize;

        /*
         * If the packet length isn't a multiple of the encryption block
         * size, round it up now; the padding is added to the last buffer
         * of the chain.  A blocksize of 0 or 1 needs no padding (and 0
         * would otherwise be a division by zero).
         */
        if (blocksize > 1)
        {
            frag_length = ((frag_length + blocksize - 1)
                           / blocksize) * blocksize;
            last_xqe->body_len += (frag_length - xqe->frag_len);
            xqe->frag_len = frag_length;
        }

        assert(RPC_C_DG_RAW_PKT_HDR_SIZE + frag_length + auth_epv->overhead <= xq->snd_frag_size);

        if (last_xqe->body_len + auth_epv->overhead
            > RPC_C_DG_MAX_PKT_BODY_SIZE)
        {
            /*
             * This can happen if the fragment gets pushed.  Chain on an
             * extra buffer to hold the auth overhead.
             */
            last_xqe->more_data = rpc__dg_pkt_alloc_xqe(call, st);
            if (*st != rpc_s_ok)
                return;
        }
        (*auth_epv->encrypt) (key_info, xqe, st);
        if (*st != rpc_s_ok)
            return;
    }
}

/*
 * R P C _ _ D G _ X M I T Q _ R E S T A R T
 *
 * Treat everything already sent on the transmit queue as lost and queue
 * it for retransmission.  This is used when we can only *infer* that the
 * receiver has lost data we sent it, e.g. we received a no-call in
 * response to a ping, or a ping arrived while we are in the xmit of
 * final states.  Recovery is simply to begin sending the xmitq again
 * and hope the flow control logic kicks in.
 *
 * If the call is already set up to transmit, the queue is left alone
 * and transmission is just started ('restarting' is a meat-axe approach,
 * so we defer to normal processing).  Otherwise every packet from the
 * head up to first_unsent is taken out of the congestion window (and
 * out of the outstanding fack-request count if it was one) and linked
 * onto the rexmitq.  The blast size becomes the number of such packets,
 * capped at RPC_C_DG_INITIAL_BLAST_SIZE.  If there were none but an
 * unsent packet is waiting, a blast of one lets it go out.
 */

PRIVATE void rpc__dg_xmitq_restart
(
    rpc_dg_call_p_t call
)
{
    rpc_dg_xmitq_p_t xq = &call->xq;
    rpc_dg_xmitq_elt_p_t elt;
    rpc_dg_xmitq_elt_p_t prev = NULL;
    unsigned32 nqueued = 0;

    /* Normal processing is already about to transmit; let it. */
    if (RPC_DG_CALL_READY_TO_SEND(call))
    {
        RPC_DG_START_XMIT(call);
        return;
    }

    elt = xq->head;
    while (elt != NULL && elt != xq->first_unsent)
    {
        nqueued++;

        /*
         * Drop the packet from the current congestion window.  Last frags
         * and frags without NO_FACK were counted as outstanding fack
         * requests, so un-count those too.
         */
        if (elt->in_cwindow)
        {
            elt->in_cwindow = false;
            xq->cwindow_size--;
            if (RPC_DG_FLAG_IS_SET(elt->flags, RPC_C_DG_PF_LAST_FRAG) ||
                ! RPC_DG_FLAG_IS_SET(elt->flags, RPC_C_DG_PF_NO_FACK))
            {
                xq->freqs_out--;
            }
        }

        /* Append to the retransmit list. */
        if (prev == NULL)
            xq->rexmitq = elt;
        else
            prev->next_rexmit = elt;
        elt->next_rexmit = NULL;
        prev = elt;

        elt = elt->next;
    }

    /* Nothing to resend: make room for the first unsent packet instead. */
    if (nqueued == 0 && xq->first_unsent != NULL)
        nqueued = 1;

    xq->blast_size = MIN(nqueued, RPC_C_DG_INITIAL_BLAST_SIZE);

    if (RPC_DG_CALL_READY_TO_SEND(call))
    {
        RPC_DG_START_XMIT(call);
    }
}