#include "xfercore.h"
#include <Security/debugging.h>
namespace Security {
namespace Network {
TransferEngine::Client::Client()
: mMode(invalidInput), mAutoCopyOut(false),
mSink(NULL), mSource(NULL),
mAutoFlush(true),
mReadBuffer(16384), mWriteBuffer(16384)
{
}
TransferEngine::Client::~Client()
{
}
void TransferEngine::add(Client *client)
{
client->io = client->fileDesc(); Selector::add(client->io, *client, input | critical); }
void TransferEngine::remove(Client *client)
{
#ifndef NDEBUG
if (!client->mReadBuffer.isEmpty())
debug("xferengine", "xfer %p(%d) HAD %ld BYTES READ LEFT",
client, client->fileDesc(), client->mReadBuffer.length());
if (!client->mWriteBuffer.isEmpty())
debug("xferengine", "xfer %p(%d) HAD %ld BYTES WRITE LEFT",
client, client->fileDesc(), client->mWriteBuffer.length());
#endif //NDEBUG
if (client->io.fd () != -1) { Selector::remove(client->io);
}
client->io = FileDesc(); }
void TransferEngine::Client::mode(InputMode newMode)
{
debug("xferengine", "xfer %p(%d) switching to mode %d", this, fileDesc(), newMode);
switch (newMode) {
case rawInput:
case lineInput:
mMode = newMode;
break;
case connecting:
enable(output);
mMode = connecting;
break;
default:
assert(false); }
}
void TransferEngine::Client::mode(Sink &sink, size_t byteCount)
{
mMode = autoReadInput;
mSink = &sink;
mResidualReadCount = byteCount;
debug("xferengine", "xfer %p(%d) switching to autoReadInput (%ld bytes)",
this, fileDesc(), byteCount);
}
void TransferEngine::Client::mode(Source &source, size_t byteCount)
{
assert (!mAutoCopyOut); mAutoCopyOut = true;
mSource = &source;
mResidualWriteCount = byteCount;
debug("xferengine", "xfer %p(%d) enabling autoCopyOut mode (%ld bytes)",
this, fileDesc(), byteCount);
enable(output);
}
void TransferEngine::Client::printf(const char *format, ...)
{
va_list args;
va_start(args, format);
vprintf(format, args);
va_end(args);
}
void TransferEngine::Client::vprintf(const char *format, va_list args)
{
mWriteBuffer.vprintf(format, args);
#if !defined(NDEBUG)
char buffer[1024];
vsnprintf(buffer, sizeof(buffer), format, args);
debug("engineio", "%p(%d) <-- %s", this, fileDesc(), buffer);
#endif //NDEBUG
startOutput();
}
void TransferEngine::Client::printfe(const char *format, ...)
{
va_list args;
va_start(args, format);
vprintfe(format, args);
va_end(args);
}
void TransferEngine::Client::vprintfe(const char *format, va_list args)
{
mWriteBuffer.vprintf(format, args);
mWriteBuffer.printf("\r\n");
#if !defined(NDEBUG)
char buffer[1024];
vsnprintf(buffer, sizeof(buffer), format, args);
debug("engineio", "%p(%d) <-- %s[CRNL]", this, fileDesc(), buffer);
#endif //NDEBUG
startOutput();
}
void TransferEngine::Client::flushOutput(bool autoFlush)
{
mAutoFlush = autoFlush;
debug("engineio", "%p(%d) output flush %s", this, fileDesc(), autoFlush? "on" : "off");
if (mAutoFlush)
startOutput();
}
void TransferEngine::Client::startOutput()
{
if (mAutoFlush) {
if (mAutoCopyOut && !mWriteBuffer.isFull())
autoCopy(); if (!mWriteBuffer.isEmpty()) {
mWriteBuffer.write(*this);
if (mAutoFlush || !mWriteBuffer.isEmpty()) { enable(output); } else {
disable(output); }
}
}
}
void TransferEngine::Client::flushInput()
{
if (!mReadBuffer.isEmpty()) {
debug("engineio", "flushing %ld bytes of input", mReadBuffer.length());
mReadBuffer.clear();
mInputFlushed = true; }
}
size_t TransferEngine::Client::autoCopy()
{
size_t len = mWriteBuffer.available(); if (mResidualWriteCount && mResidualWriteCount < len)
len = mResidualWriteCount;
void *addr; mWriteBuffer.locatePut(addr, len);
mSource->produce(addr, len);
debug("xferengine", "xfer %p(%d) autoCopyOut source delivered %ld bytes",
this, fileDesc(), len);
mWriteBuffer.usePut(len);
return len;
}
void TransferEngine::Client::notify(int fd, Type type)
{
try {
if (type & Selector::output) {
if (mMode == connecting) {
Socket s; s = fd; int error = s.error();
debug("xferengine", "xfer %p(%d) connect (errno %d)",
this, fd, error);
transit(connectionDone, NULL, error);
return;
}
if (mAutoCopyOut && !mWriteBuffer.isFull()) {
if (autoCopy() == 0) {
switch (mSource->state()) {
case Source::stalled:
debug("xferengine", "xfer %p(%d) autoCopyOut source is stalled", this, fd);
break;
case Source::endOfData:
mAutoCopyOut = false; debug("xferengine", "xfer %p(%d) autoCopyOut end of data", this, fd);
if (mResidualWriteCount > 0)
debug("xferengine", "xfer %p(%d) has %ld autoCopy bytes left",
this, fd, mResidualWriteCount);
transit(autoWriteDone);
if (!isActive())
return; break;
default:
assert(false);
}
}
}
if (mWriteBuffer.isEmpty()) { debug("xferengine", "xfer %p(%d) disabling output (empty)", this, fd);
disable(output);
} else { size_t length = mWriteBuffer.write(*this);
debug("xferengine", "xfer %p(%d) writing %ld bytes", this, fd, length);
}
}
if (type & Selector::input) {
IFDEBUG(debug("xferengine", "xfer %p(%d) input ready %d bytes",
this, fd, io.iocget<int>(FIONREAD)));
do {
mInputFlushed = false;
if (!atEnd() && mReadBuffer.read(*this) == 0 && !atEnd()) {
mReadBuffer.read(*this, true);
}
if (mReadBuffer.isEmpty() && atEnd()) {
transit(endOfInput);
break;
}
switch (mMode) {
case rawInput:
rawInputTransit();
break;
case lineInput:
if (!lineInputTransit())
return; break;
case autoReadInput:
autoReadInputTransit();
if (mMode != autoIODone)
break;
case autoIODone:
mMode = invalidInput; transit(autoReadDone); if (!isActive()) return; assert(mMode != invalidInput); break;
case connecting:
{
Socket s; s = fd;
debug("xferengine",
"fd %d input while connecting (errno=%d, type=%d)",
fd, s.error(), type);
UnixError::throwMe(ECONNREFUSED); }
default:
debug("xferengine", "mode error in input sequencer (mode=%d)", mMode);
assert(false);
}
if (!io) flushInput();
} while (!mReadBuffer.isEmpty());
}
} catch (const CssmCommonError &err) {
transitError(err);
} catch (...) {
transitError(UnixError::make(EIO)); }
}
void TransferEngine::Client::rawInputTransit()
{
char *addr; size_t length = mReadBuffer.length();
mReadBuffer.locateGet(addr, length);
IFDEBUG(debug("engineio", "%p(%d) --> %d bytes RAW",
this, fileDesc(), io.iocget<int>(FIONREAD)));
transit(inputAvailable, addr, length);
if (!mInputFlushed)
mReadBuffer.useGet(length);
}
bool TransferEngine::Client::lineInputTransit()
{
char *line; size_t length = mReadBuffer.length();
mReadBuffer.locateGet(line, length);
char *nl;
for (nl = line; nl < line + length && *nl != '\n'; nl++) ;
if (nl == line + length) return false;
if (nl > line && nl[-1] == '\r') { nl[-1] = '\0'; debug("engineio", "%p(%d) --> %s", this, fileDesc(), line);
transit(inputAvailable, line, nl - line - 1);
} else { nl[0] = '\0'; debug("engineio", "%p(%d) [IMPROPER] --> %s", this, fileDesc(), line);
transit(inputAvailable, line, nl - line);
}
if (!mInputFlushed)
mReadBuffer.useGet(nl - line + 1);
return true;
}
void TransferEngine::Client::autoReadInputTransit()
{
debug("xferengine", "xfer %p(%d) %ld pending %d available",
this, fileDesc(), mReadBuffer.length(), io.iocget<int>(FIONREAD));
void *data; size_t length = mReadBuffer.length();
if (mResidualReadCount && mResidualReadCount < length)
length = mResidualReadCount;
mReadBuffer.locateGet(data, length);
debug("engineio", "%p(%d) --> %ld bytes autoReadInput", this, fileDesc(), length);
mSink->consume(data, length);
if (!mInputFlushed)
mReadBuffer.useGet(length);
if (mResidualReadCount && (mResidualReadCount -= length) == 0)
mMode = autoIODone;
}
void TransferEngine::Client::tickle()
{
notify(io, input | critical);
}
size_t TransferEngine::Client::read(void *data, size_t size)
{ return io.read(data, size); }
size_t TransferEngine::Client::write(const void *data, size_t size)
{ return io.write(data, size); }
bool TransferEngine::Client::atEnd() const
{ return io.atEnd(); }
} }