#include "netmanager.h"
#include "protocol.h"
#include "transfer.h"
#include "netconnection.h"
#include "neterror.h"
namespace Security {
namespace Network {
//
// Construct a Manager that starts with zero active transfers, connection
// retention switched on, and no observer attached.
//
Manager::Manager()
    : mActiveTransfers(0),
      mRetainConnections(true),
      mObserver(NULL)
{
}
//
// Destroy a Manager. The destructor body performs no explicit cleanup.
// NOTE(review): anything still held in mTransfers, mConnections or mMorgue is
// not released here -- confirm that the members or their owners handle it.
//
Manager::~Manager()
{
}
//
// Put a Transfer under management.
// The transfer must be in the "cold" state (checked by assertion only);
// it is recorded in mTransfers and moved to the "warm" state.
//
void Manager::add(Transfer *xfer)
{
    assert(xfer->state() == Transfer::cold);    // only cold transfers may be added

    mTransfers.insert(xfer);
    xfer->mState = Transfer::warm;
}
//
// Take a Transfer out of management.
// The transfer must currently be in mTransfers and must not be active;
// both conditions are checked by assertion only.
//
void Manager::remove(Transfer *xfer)
{
    assert(mTransfers.find(xfer) != mTransfers.end());  // must be under management
    assert(xfer->state() != Transfer::active);          // must not be running

    mTransfers.erase(xfer);
}
//
// Start a managed Transfer.
// The transfer must be under management and in the "warm" state (asserted).
// On success it is marked active, its observer hook is invoked with
// Observer::transferStarting, and the active-transfer count is incremented.
// If anything in that sequence throws, the transfer is marked failed and the
// exception is re-thrown to the caller.
//
void Manager::start(Transfer *xfer)
{
    assert(mTransfers.find(xfer) != mTransfers.end());  // must be under management
    assert(xfer->state() == Transfer::warm);            // must be ready to go

    try {
        xfer->start();
        xfer->mState = Transfer::active;
        xfer->observe(Observer::transferStarting);
        ++mActiveTransfers;
        secdebug("netmanager", "%ld active transfers", mActiveTransfers);
    } catch (...) {
        // record the failure, then let the caller see the original exception
        xfer->mState = Transfer::failed;
        secdebug("netmanager", "Transfer %p failed to start", xfer);
        throw;
    }
}
//
// Abort a managed Transfer.
//  - active:           ask the transfer to abort; if that throws, call
//                      Transfer::abort() explicitly (qualified call) instead.
//  - finished/failed:  nothing to do; the request is logged and ignored.
//  - any other state:  treated as a caller error (assertion failure).
//
void Manager::abort(Transfer *xfer)
{
    assert(mTransfers.find(xfer) != mTransfers.end());  // must be under management

    switch (xfer->state()) {
    case Transfer::finished:
    case Transfer::failed:
        // already over; there is nothing left to stop
        secdebug("netmanager", "xfer %p abort ignored (already done)", xfer);
        break;

    case Transfer::active:
        try {
            secdebug("netmanager", "xfer %p request abort", xfer);
            xfer->abort();
        } catch (...) {
            // the regular abort threw; fall back to Transfer's own implementation
            secdebug("netmanager", "xfer %p failed to abort; forcing the issue", xfer);
            xfer->Transfer::abort();
        }
        break;

    default:
        // aborting a transfer in any other state is not supported
        assert(false);
    }
}
//
// Account for a Transfer that has stopped running.
// The transfer must be under management and be either finished or failed,
// and at least one transfer must be counted as active (all asserted).
// Only the active-transfer count is adjusted; the transfer stays in mTransfers.
//
void Manager::done(Transfer *xfer)
{
    assert(mTransfers.find(xfer) != mTransfers.end());  // must be under management
    assert(xfer->state() == Transfer::finished || xfer->state() == Transfer::failed);
    assert(mActiveTransfers > 0);                       // counter must not underflow

    --mActiveTransfers;
    secdebug("netmanager", "%ld active transfers", mActiveTransfers);
}
//
// Register an I/O client with the transfer engine (forwards to mEngine.add).
//
void Manager::addIO(TransferEngine::Client *client)
{
    mEngine.add(client);
}
//
// Unregister an I/O client from the transfer engine (forwards to mEngine.remove).
//
void Manager::removeIO(TransferEngine::Client *client)
{
    mEngine.remove(client);
}
//
// Hand a Connection back to the Manager once its user is finished with it.
// When connection retention is enabled the connection is passed to
// mConnections.retain(); otherwise it is closed via closeConnection().
//
void Manager::retainConnection(Connection *connection)
{
    if (!mRetainConnections) {
        // retention is switched off: dispose of the connection instead
        closeConnection(connection);
        return;
    }

    mConnections.retain(connection);
}
//
// Retire a Connection.
// The connection is removed from mConnections and placed in mMorgue.
// It is not deleted here; prepare() deletes everything found in mMorgue.
//
void Manager::closeConnection(Connection *connection)
{
    mConnections.remove(connection);
    mMorgue.insert(connection);     // deletion is deferred to prepare()
}
//
// Look for a reusable Connection to the given host.
// Candidates are fetched from mConnections one at a time. The first one whose
// validate() succeeds is marked with restarting(true) and returned. Candidates
// that fail validation are logged and skipped.
// Returns NULL when mConnections has nothing (more) to offer for this host.
// NOTE(review): termination relies on mConnections.get() not handing back the
// same failed connection again -- confirm against the pool implementation.
//
Connection *Manager::pickConnection(const HostTarget &host)
{
    for (;;) {
        Connection *candidate = mConnections.get(host);
        if (!candidate)
            break;      // pool has no further candidates

        if (!candidate->validate()) {
            secdebug("netmanager", "%p connection %p failed to validate",
                this, candidate);
            continue;   // try the next candidate
        }

        candidate->restarting(true);
        return candidate;
    }

    return NULL;
}
//
// Enable or disable connection retention.
// This only sets the flag that retainConnection() consults; nothing else is
// touched by this call.
//
void Manager::reuseConnections(bool retain)
{
    mRetainConnections = retain;
}
//
// Flush the connection pool (forwards to mConnections.purge()).
//
void Manager::flushConnections()
{
    mConnections.purge();
}
//
// Schedule a Timer on the Manager's timer queue for the absolute time `when`.
//
void Manager::setTimer(Timer *timer, Time::Absolute when)
{
    mTimers.schedule(timer, when);
}
//
// Cancel a Timer if it is currently scheduled; otherwise do nothing.
//
void Manager::clearTimer(Timer *timer)
{
    if (!timer->scheduled())
        return;     // not scheduled, nothing to cancel

    mTimers.unschedule(timer);
}
//
// Run pending timers.
// Timers are popped from mTimers (passing the current time on each call)
// until pop() yields NULL, and action() is invoked on each one.
// An exception escaping a timer's action is logged and discarded so that the
// remaining timers still get their turn.
//
// All messages are logged under the "netmanager" debug scope so that the
// start, completion and failure of a timer show up together.
//
void Manager::runTimers()
{
    while (Timer *top = static_cast<Timer *>(mTimers.pop(Time::now()))) {
        secdebug("netmanager", "%p timer %p executing at %.3f",
            this, top, Time::now().internalForm());
        try {
            top->action();
            secdebug("netmanager", "%p timer %p done", this, top);
        } catch (...) {
            // deliberately best-effort: one failing timer must not stop the rest
            secdebug("netmanager",
                "%p server timer %p failed with exception", this, top);
        }
    }
}
//
// Perform one non-looping round of work.
// Housekeeping is done via prepare(); then, if the engine is not empty,
// the engine is invoked once with no arguments.
//
void Manager::step()
{
    prepare();

    if (mEngine.isEmpty())
        return;     // nothing registered with the engine

    secdebug("mgrstep", "operations step");
    mEngine();
}
//
// Run the Manager until no transfers are active or stopTime has passed.
// Each pass performs housekeeping via prepare(), then lets the engine run for
// the interval between now and the earlier of stopTime and mTimers.next()
// (just stopTime when no timers are queued).
// NOTE(review): if mTimers.next() is already in the past, the interval handed
// to mEngine may be negative -- confirm the engine tolerates that.
//
void Manager::run(Time::Absolute stopTime)
{
    secdebug("netmanager",
        "starting run with %ld active transfers", mActiveTransfers);

    while (mActiveTransfers > 0) {
        prepare();

        // wake up no later than the next timer, and never later than stopTime
        Time::Absolute limit = stopTime;
        if (!mTimers.empty())
            limit = min(stopTime, mTimers.next());

        mEngine(limit - Time::now());

        if (Time::now() > stopTime)
            break;      // out of time
    }

    secdebug("netmanager", "ending run");
}
//
// Run the Manager with Time::heatDeath() as the stop time
// (presumably "no practical deadline" -- see Time::heatDeath).
//
void Manager::run()
{
    run(Time::heatDeath());
}
//
// Housekeeping performed before each engine pass.
//  1. Delete every Connection parked in mMorgue and empty the morgue.
//  2. Run pending timers via runTimers().
//
void Manager::prepare()
{
    if (!mMorgue.empty()) {
        // size() yields a size_t; convert explicitly to match the %ld format
        secdebug("netmanager",
            "clearing morgue of %ld connections", static_cast<long>(mMorgue.size()));
        for (set<Connection *>::iterator it = mMorgue.begin(); it != mMorgue.end(); ++it)
            delete *it;
        mMorgue.clear();    // drop the now-dangling pointers
    }

    runTimers();
}
} }