#include "ringBufferIo.h"
#include <strings.h>
#include <stdio.h>
#include <stdlib.h>
#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacErrors.h>
/*
 * Mutex held by the ring logging helpers in this file around their printf()
 * calls, so that each trace record is emitted as one uninterrupted unit.
 * It has external linkage; NOTE(review): presumably other test code shares
 * it for its own printf() output -- confirm against the header.
 */
pthread_mutex_t printfMutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Initialize a caller-provided RingBuffer.
 *
 * ring        : ring to initialize; any previous contents are overwritten
 *               (nothing previously owned by it is freed here).
 * bufName     : label used in trace output; the pointer is stored, not
 *               copied, so it must outlive the ring.
 * numElements : number of RingElements to allocate.
 * bufSize     : capacity in bytes of each element's data buffer.
 *
 * The function has no way to report failure to its caller, so an allocation
 * failure is treated as fatal: a message is written to stderr and the
 * process aborts, rather than continuing with NULL buffers.
 */
void ringBufSetup(
    RingBuffer *ring,
    const char *bufName,
    size_t numElements,
    size_t bufSize)
{
    size_t dex;

    memset(ring, 0, sizeof(*ring));
    ring->bufName = bufName;

    /*
     * calloc() both zero-fills the element array and rejects a
     * numElements * sizeof(RingElement) product that would overflow.
     * A NULL result is only an error when elements were actually requested.
     */
    ring->elements = (RingElement *)calloc(numElements, sizeof(RingElement));
    if((ring->elements == NULL) && (numElements != 0)) {
        fprintf(stderr, "***ringBufSetup: cannot allocate %lu elements\n",
            (unsigned long)numElements);
        abort();
    }
    ring->numElements = numElements;

    for(dex = 0; dex < numElements; dex++) {
        RingElement *elt = &ring->elements[dex];

        elt->buf = (unsigned char *)malloc(bufSize);
        if((elt->buf == NULL) && (bufSize != 0)) {
            fprintf(stderr, "***ringBufSetup: cannot allocate %lu byte buffer\n",
                (unsigned long)bufSize);
            abort();
        }
        elt->capacity = bufSize;
    }

    /* Equal reader and writer indices mean "ring empty". */
    ring->writerDex = 0;
    ring->readerDex = 0;
}
/* Set to 1 to compile in the printf-based ring trace helpers defined below. */
#define LOG_RING 0
/*
 * Set to 1 (together with LOG_RING) to make the read/write trace helpers
 * also hex-dump the bytes that were transferred.
 */
#define LOG_RING_DUMP 0
#if LOG_RING
/*
 * Trace one write into a ring element: prints the byte count, the ring's
 * name and the element index, all under printfMutex. When LOG_RING_DUMP is
 * enabled the bytes at 'from' are also dumped in hex, 16 per line.
 */
static void logRingWrite(
    RingBuffer *ring,
    size_t written,
    unsigned dex,
    void *from)
{
    unsigned byteCount = (unsigned)written;

    pthread_mutex_lock(&printfMutex);
    printf("+++ wrote %4u bytes to %s buf %2u\n", byteCount, ring->bufName, dex);
#if LOG_RING_DUMP
    {
        unsigned char *bytes = (unsigned char *)from;
        unsigned pos = 0;

        while(pos < written) {
            printf("%02X ", bytes[pos]);
            /* Break the line after every 16th byte, except after the last byte. */
            if(((pos % 16) == 15) && (((size_t)pos + 1) < written)) {
                printf("\n");
            }
            pos++;
        }
        printf("\n");
    }
#endif
    pthread_mutex_unlock(&printfMutex);
}
/*
 * Trace one read out of a ring element: prints the byte count, the ring's
 * name and the element index, all under printfMutex. When LOG_RING_DUMP is
 * enabled the bytes at 'to' are also dumped in hex, 16 per line.
 */
static void logRingRead(
    RingBuffer *ring,
    size_t bytesRead,
    unsigned dex,
    void *to)
{
    unsigned byteCount = (unsigned)bytesRead;

    pthread_mutex_lock(&printfMutex);
    printf("--- read %4u bytes from %s buf %2u\n", byteCount, ring->bufName, dex);
#if LOG_RING_DUMP
    {
        unsigned char *bytes = (unsigned char *)to;
        unsigned pos = 0;

        while(pos < bytesRead) {
            printf("%02X ", bytes[pos]);
            /* Break the line after every 16th byte, except after the last byte. */
            if(((pos % 16) == 15) && (((size_t)pos + 1) < bytesRead)) {
                printf("\n");
            }
            pos++;
        }
        printf("\n");
    }
#endif
    pthread_mutex_unlock(&printfMutex);
}
/*
 * Trace a stall: prints which side ('readerOrWriter') stalled, the ring's
 * name and the element index involved, under printfMutex.
 */
static void logRingStall(
    RingBuffer *ring,
    char *readerOrWriter,
    unsigned dex)
{
    const char *ringName = ring->bufName;

    pthread_mutex_lock(&printfMutex);
    printf("=== %s stalled on %s buf %u\n", readerOrWriter, ringName, dex);
    pthread_mutex_unlock(&printfMutex);
}
/*
 * Trace a close: prints the ring's name and which side ('readerOrWriter')
 * closed it, under printfMutex.
 */
static void logRingClose(
    RingBuffer *ring,
    char *readerOrWriter)
{
    const char *ringName = ring->bufName;

    pthread_mutex_lock(&printfMutex);
    printf("=== %s CLOSED by %s\n", ringName, readerOrWriter);
    pthread_mutex_unlock(&printfMutex);
}
/* Trace a reset: prints the ring's name, under printfMutex. */
static void logRingReset(
    RingBuffer *ring)
{
    const char *ringName = ring->bufName;

    pthread_mutex_lock(&printfMutex);
    printf("=== %s RESET\n", ringName);
    pthread_mutex_unlock(&printfMutex);
}
#else
/*
 * Logging disabled: each trace call expands to nothing, so call sites reduce
 * to an empty statement and their arguments are never evaluated.
 */
#define logRingWrite(r, w, d, t)
#define logRingRead(r, b, d, t)
#define logRingStall(r, row, d)
#define logRingClose(r, row)
#define logRingReset(r)
#endif
/*
 * Return a ring to its empty, open state: every element is marked as
 * holding no valid bytes with its read offset rewound, both indices go back
 * to element 0, and the closed flag is cleared. The element buffers are
 * kept, not freed or reallocated.
 */
void ringBufferReset(
    RingBuffer *ring)
{
    unsigned slot = 0;

    while(slot < ring->numElements) {
        ring->elements[slot].validBytes = 0;
        ring->elements[slot].readOffset = 0;
        slot++;
    }

    ring->writerDex = 0;
    ring->readerDex = 0;
    ring->closed = false;
    logRingReset(ring);
}
/*
 * Read callback: copy up to *dataLen bytes out of the read ring
 * (((RingBuffers *)connRef)->rdBuf) into 'data'. Never blocks.
 *
 * connRef : actually a RingBuffers *.
 * data    : destination buffer, at least *dataLen bytes.
 * dataLen : in: bytes requested; out: bytes actually copied. It is updated
 *           on every return path, including errors.
 *
 * Returns:
 *   noErr                - the full request was satisfied (this includes a
 *                          zero-length request on a non-empty ring).
 *   errSSLWouldBlock     - fewer bytes than requested were available.
 *   errSSLClosedAbort    - nothing was available and the ring is closed.
 *   internalComponentErr - a published element held no data; *dataLen
 *                          reports whatever was copied before that point.
 */
OSStatus ringReadFunc(
    SSLConnectionRef connRef,
    void *data,
    size_t *dataLen)
{
    RingBuffer *ring = ((RingBuffers *)connRef)->rdBuf;

    /* Equal indices mean the ring is empty. */
    if(ring->writerDex == ring->readerDex) {
        if(ring->closed) {
            /*
             * Look at the indices again now that closed has been seen:
             * the writer may have published an element between the first
             * comparison and the closed test, and that data must still be
             * delivered before the close is reported.
             */
            if(ring->writerDex == ring->readerDex) {
                *dataLen = 0;
                return errSSLClosedAbort;
            }
        }
        else {
            *dataLen = 0;
            return errSSLWouldBlock;
        }
    }

    unsigned char *outp = (unsigned char *)data;
    size_t toMove = *dataLen;
    size_t haveMoved = 0;

    /* A zero-length request skips the loop and returns noErr with 0 bytes. */
    while(toMove != 0) {
        RingElement *elt = &ring->elements[ring->readerDex];
        size_t thisMove = elt->validBytes;

        if(thisMove > toMove) {
            thisMove = toMove;
        }
        if(thisMove == 0) {
            /*
             * The element between reader and writer is empty, which should
             * not happen. Report the bytes already delivered rather than
             * leaving the caller's requested length in *dataLen.
             */
            printf("***thisMove 0!\n");
            *dataLen = haveMoved;
            return internalComponentErr;
        }

        memmove(outp, elt->buf + elt->readOffset, thisMove);
        logRingRead(ring, thisMove, ring->readerDex, outp);
        elt->validBytes -= thisMove;
        elt->readOffset += thisMove;
        toMove -= thisMove;
        haveMoved += thisMove;
        outp += thisMove;

        if(elt->validBytes == 0) {
            /* Element drained: rewind it and advance to the next slot, with wrap. */
            unsigned nextDex;

            elt->readOffset = 0;
            nextDex = ring->readerDex + 1;
            if(nextDex == ring->numElements) {
                nextDex = 0;
            }
            ring->readerDex = nextDex;
        }

        if((toMove != 0) && (ring->readerDex == ring->writerDex)) {
            /* Caught up with the writer; return a short read. */
            logRingStall(ring, "reader ", ring->readerDex);
            break;
        }
    }

    OSStatus ortn = noErr;
    if(haveMoved != *dataLen) {
        if((haveMoved == 0) && ring->closed) {
            ortn = errSSLClosedAbort;
        }
        else {
            ortn = errSSLWouldBlock;
        }
    }
    *dataLen = haveMoved;
    return ortn;
}
/*
 * Write callback: copy *dataLen bytes from 'data' into the write ring
 * (((RingBuffers *)connRef)->wrtBuf), one element at a time.
 *
 * connRef : actually a RingBuffers *.
 * data    : source bytes.
 * dataLen : in: bytes to write; out: bytes copied into the ring.
 *
 * Each loop pass fills the element at writerDex (at most elt->capacity
 * bytes) and then publishes it by advancing writerDex. Because equal
 * indices mean "empty", the writer must not advance onto readerDex; it
 * spins until the reader has moved on.
 *
 * A zero-length write is a no-op: no element is published. Publishing an
 * empty element would leave a slot with validBytes == 0 in the reader's
 * path, which ringReadFunc reports as an internal error and cannot skip.
 *
 * Returns noErr, or errSSLClosedAbort if the ring is closed and nothing
 * was written.
 */
OSStatus ringWriteFunc(
    SSLConnectionRef connRef,
    const void *data,
    size_t *dataLen)
{
    RingBuffer *ring = ((RingBuffers *)connRef)->wrtBuf;
    const unsigned char *inp = (const unsigned char *)data;
    size_t toMove = *dataLen;
    size_t haveMoved = 0;
    OSStatus ortn = noErr;

    while(toMove != 0) {
        RingElement *elt = &ring->elements[ring->writerDex];
        size_t thisMove = toMove;
        unsigned nextDex;

        elt->validBytes = 0;
        if(thisMove > elt->capacity) {
            thisMove = elt->capacity;
        }
        memmove(elt->buf, inp, thisMove);
        /* The trace helper takes a non-const pointer but only reads through it. */
        logRingWrite(ring, thisMove, ring->writerDex, (void *)inp);
        elt->validBytes = thisMove;
        toMove -= thisMove;
        haveMoved += thisMove;
        inp += thisMove;

        nextDex = ring->writerDex + 1;
        if(nextDex == ring->numElements) {
            nextDex = 0;
        }
        if(nextDex == ring->readerDex) {
            logRingStall(ring, "writer", nextDex);
            /*
             * Ring full: busy-wait for the reader to advance.
             * NOTE(review): this spin does not test ring->closed, so it
             * relies on the reader eventually advancing; it also relies on
             * readerDex being re-read on every pass -- confirm the field's
             * qualifiers in the header.
             */
            while(nextDex == ring->readerDex) {
                ;
            }
        }
        ring->writerDex = nextDex;
        if(ring->closed) {
            break;
        }
    }

    if(ring->closed && (haveMoved == 0)) {
        ortn = errSSLClosedAbort;
    }
    *dataLen = haveMoved;
    return ortn;
}
/*
 * Mark both rings of a RingBuffers pair as closed. A NULL 'rbs', or a NULL
 * ring pointer within it, is skipped. Only the closed flags are set; no
 * memory is released here.
 */
void ringBuffersClose(
    RingBuffers *rbs)
{
    if(rbs != NULL) {
        if(rbs->rdBuf != NULL) {
            logRingClose(rbs->rdBuf, "reader");
            rbs->rdBuf->closed = true;
        }
        if(rbs->wrtBuf != NULL) {
            logRingClose(rbs->wrtBuf, "writer");
            rbs->wrtBuf->closed = true;
        }
    }
}