#include <IOKit/assert.h>
#include <IOKit/IOWorkLoop.h>
#include <IOKit/network/IOOutputQueue.h>
#include <IOKit/network/IOBasicOutputQueue.h>
#include <IOKit/network/IOGatedOutputQueue.h>
#include <IOKit/network/IONetworkStats.h>
#include <IOKit/network/IONetworkController.h>
#include "IOMbufQueue.h"
#include <libkern/OSAtomic.h>
#define STATE_IS(bits) (_state == (bits))
#define STATE_HAS(bits) ((_state & (bits)) == (bits))
#define STATE_SET(bits) (_state |= (bits))
#define STATE_CLR(bits) (_state &= ~(bits))
#undef super
#define super OSObject
OSDefineMetaClassAndAbstractStructors( IOOutputQueue, OSObject )
OSMetaClassDefineReservedUnused( IOOutputQueue, 1);
OSMetaClassDefineReservedUnused( IOOutputQueue, 2);
OSMetaClassDefineReservedUnused( IOOutputQueue, 3);
OSMetaClassDefineReservedUnused( IOOutputQueue, 4);
OSMetaClassDefineReservedUnused( IOOutputQueue, 5);
OSMetaClassDefineReservedUnused( IOOutputQueue, 6);
OSMetaClassDefineReservedUnused( IOOutputQueue, 7);
OSMetaClassDefineReservedUnused( IOOutputQueue, 8);
OSMetaClassDefineReservedUnused( IOOutputQueue, 9);
OSMetaClassDefineReservedUnused( IOOutputQueue, 10);
OSMetaClassDefineReservedUnused( IOOutputQueue, 11);
OSMetaClassDefineReservedUnused( IOOutputQueue, 12);
OSMetaClassDefineReservedUnused( IOOutputQueue, 13);
OSMetaClassDefineReservedUnused( IOOutputQueue, 14);
OSMetaClassDefineReservedUnused( IOOutputQueue, 15);
bool IOOutputQueue::init()
{
if (super::init() == false)
return false;
_callEntry = thread_call_allocate((thread_call_func_t) &runServiceThread,
(void *) this);
if (_callEntry == 0)
return false;
return true;
}
void IOOutputQueue::free()
{
if (_callEntry)
{
cancelServiceThread();
thread_call_free(_callEntry);
_callEntry = 0;
}
super::free();
}
bool IOOutputQueue::scheduleServiceThread(void * param)
{
return thread_call_enter1(_callEntry, (thread_call_param_t) param);
}
bool IOOutputQueue::cancelServiceThread()
{
if (_callEntry == 0)
return false;
else
return thread_call_cancel(_callEntry);
}
void
IOOutputQueue::runServiceThread(thread_call_param_t param0,
thread_call_param_t param1)
{
assert(param0);
((IOOutputQueue *) param0)->serviceThread(param1);
}
void IOOutputQueue::serviceThread(void * param)
{
}
IOOutputAction IOOutputQueue::getOutputHandler() const
{
return (IOOutputAction) &IOOutputQueue::enqueue;
}
IONetworkData * IOOutputQueue::getStatisticsData() const
{
return 0;
}
OSMetaClassDefineReservedUsed( IOOutputQueue, 0);
UInt32 IOOutputQueue::getMbufPriority(mbuf_t m)
{
return 0;
}
#undef super
#define super IOOutputQueue
OSDefineMetaClassAndStructors( IOBasicOutputQueue, IOOutputQueue )
#define QUEUE_LOCK IOLockLock(_queueLock)
#define QUEUE_UNLOCK IOLockUnlock(_queueLock)
#define kIOOutputQueueSignature ((void *) 0xfacefeed)
IOReturn
IOBasicOutputQueue::dispatchNetworkDataNotification(void * target,
void * param,
IONetworkData * data,
UInt32 type)
{
IOBasicOutputQueue * self = (IOBasicOutputQueue *) target;
return self->handleNetworkDataAccess(data, type, param);
}
bool IOBasicOutputQueue::init(OSObject * target,
IOOutputAction action,
UInt32 capacity,
UInt32 priorities)
{
if (super::init() == false)
return false;
if ((target == 0) || (action == 0) || (priorities == 0) || (priorities > 256))
return false;
_target = target;
_action = action;
_statsData = IONetworkData::withInternalBuffer(
kIOOutputQueueStatsKey,
sizeof(IOOutputQueueStats),
kIONetworkDataBasicAccessTypes,
this,
(IONetworkData::Action)
&IOBasicOutputQueue::dispatchNetworkDataNotification,
kIOOutputQueueSignature);
if (_statsData == 0)
return false;
_stats = (IOOutputQueueStats *) _statsData->getBuffer();
assert(_stats);
_stats->capacity = capacity;
_priorities = priorities;
_primaryQueues = IONew(IOMbufQueue, priorities);
_shadowQueues = IONew(IOMbufQueue, priorities);
if ( (_primaryQueues == 0) || (_shadowQueues == 0) )
return false;
for(UInt32 i = 0; i < priorities; i++)
{
IOMbufQueueInit(&(_primaryQueues[i]), capacity);
IOMbufQueueInit(&(_shadowQueues[i]), capacity);
}
_inQueues = _primaryQueues;
_queueLock = IOLockAlloc();
if (_queueLock == 0)
return false;
return true;
}
IOBasicOutputQueue *
IOBasicOutputQueue::withTarget(IONetworkController * target,
UInt32 capacity)
{
return IOBasicOutputQueue::withTarget(target, capacity, 1 );
}
IOBasicOutputQueue *
IOBasicOutputQueue::withTarget(IONetworkController * target,
UInt32 capacity,
UInt32 priorities)
{
IOBasicOutputQueue * queue = new IOBasicOutputQueue;
if (queue && !queue->init(target, target->getOutputHandler(), capacity, priorities))
{
queue->release();
queue = 0;
}
return queue;
}
IOBasicOutputQueue *
IOBasicOutputQueue::withTarget(OSObject * target,
IOOutputAction action,
UInt32 capacity)
{
return IOBasicOutputQueue::withTarget(target, action, capacity, 1 );
}
IOBasicOutputQueue *
IOBasicOutputQueue::withTarget(OSObject * target,
IOOutputAction action,
UInt32 capacity,
UInt32 priorities)
{
IOBasicOutputQueue * queue = new IOBasicOutputQueue;
if (queue && !queue->init(target, action, capacity, priorities))
{
queue->release();
queue = 0;
}
return queue;
}
void IOBasicOutputQueue::free()
{
cancelServiceThread();
if (_queueLock)
{
flush();
IOLockFree(_queueLock);
_queueLock = 0;
}
if(_primaryQueues) IODelete(_primaryQueues, IOMbufQueue, _priorities);
if(_shadowQueues) IODelete(_shadowQueues, IOMbufQueue, _priorities);
_primaryQueues = _shadowQueues = 0;
if (_statsData)
{
_statsData->release();
_statsData = 0;
}
super::free();
}
void IOBasicOutputQueue::serviceThread(void * param)
{
QUEUE_LOCK;
STATE_CLR((uintptr_t) param);
STATE_SET(kStateOutputActive);
dequeue();
QUEUE_UNLOCK;
}
UInt32 IOBasicOutputQueue::enqueue(mbuf_t m, void * param)
{
bool success;
UInt32 priority = getMbufPriority(m);
if ( priority >= _priorities )
{
priority = _priorities - 1;
}
QUEUE_LOCK;
success = IOMbufQueueEnqueue(&(_inQueues[priority]), m);
if ( STATE_IS( kStateRunning ) )
{
STATE_SET( kStateOutputActive );
dequeue();
}
QUEUE_UNLOCK;
if (success == false)
{
OSAddAtomic( IOMbufFree(m),
(SInt32 *) &_stats->dropCount );
}
return 0;
}
void IOBasicOutputQueue::dequeue()
{
IOMbufQueue * outQueues = _primaryQueues;
UInt32 newState = 0;
UInt32 myServiceCount;
_inQueues = _shadowQueues;
UInt32 priority = 0;
while ( STATE_IS( kStateRunning | kStateOutputActive ) &&
priority < _priorities )
{
if (IOMbufQueueGetSize(&(outQueues[priority])) > 0)
{
myServiceCount = _serviceCount;
QUEUE_UNLOCK;
output( &(outQueues[priority]), &newState );
QUEUE_LOCK;
if ( newState )
{
if ( myServiceCount != _serviceCount )
newState &= ~kStateOutputStalled;
STATE_SET( newState );
}
int newPriority = -1;
for (UInt32 i = 0; i < _priorities; i++)
{
IOMbufQueueEnqueue( &(outQueues[i]), &(_inQueues[i]));
if ((newPriority < 0) && (i <= priority) &&
(IOMbufQueueGetSize(&(outQueues[i])) > 0))
{
newPriority = i;
}
}
if (newPriority >= 0)
{
priority = newPriority;
continue;
}
}
priority++;
}
_inQueues = _primaryQueues;
STATE_CLR( kStateOutputActive );
if ( newState & kStateOutputServiceMask )
{
scheduleServiceThread(
(void *)(uintptr_t) (newState & kStateOutputServiceMask));
}
if (_waitDequeueDone)
{
_waitDequeueDone = false;
thread_wakeup((void *) &_waitDequeueDone);
}
}
void IOBasicOutputQueue::output(IOMbufQueue * queue, UInt32 * state)
{
mbuf_t pkt;
UInt32 status;
do {
pkt = IOMbufQueueDequeue(queue);
assert(pkt);
status = (_target->*_action)( pkt, 0 );
if ( status == ( kIOOutputStatusAccepted | kIOOutputCommandNone ) )
{
_stats->outputCount++;
}
else
{
switch (status & kIOOutputStatusMask)
{
default:
case kIOOutputStatusAccepted:
_stats->outputCount++;
break;
case kIOOutputStatusRetry:
IOMbufQueuePrepend(queue, pkt);
_stats->retryCount++;
break;
}
switch (status & kIOOutputCommandMask)
{
case kIOOutputCommandStall:
*state = kStateOutputStalled;
_stats->stallCount++;
break;
default:
break;
}
}
}
while ( IOMbufQueueGetSize(queue) && (*state == 0) );
}
bool IOBasicOutputQueue::start()
{
QUEUE_LOCK;
STATE_SET( kStateRunning );
STATE_CLR( kStateOutputStalled );
_serviceCount++;
if ( STATE_IS( kStateRunning ) )
{
STATE_SET( kStateOutputActive );
dequeue();
}
QUEUE_UNLOCK;
return true;
}
bool IOBasicOutputQueue::stop()
{
bool wasRunning;
QUEUE_LOCK;
wasRunning = STATE_HAS( kStateRunning );
STATE_CLR( kStateRunning );
if ( STATE_HAS( kStateOutputActive ) )
{
_waitDequeueDone = true;
assert_wait((void *) &_waitDequeueDone, false);
}
QUEUE_UNLOCK;
thread_block((void (*)(void*, int)) 0);
return wasRunning;
}
bool IOBasicOutputQueue::service(IOOptionBits options)
{
bool doDequeue = false;
bool async = (options & kServiceAsync);
UInt32 oldState;
QUEUE_LOCK;
oldState = _state;
STATE_CLR( kStateOutputStalled );
_serviceCount++;
bool workToDo = false;
for(UInt32 i = 0; i < _priorities; i++)
{
if(IOMbufQueueGetSize(&(_primaryQueues[i])) > 0)
{
workToDo = true;
break;
}
}
if ( ( oldState & kStateOutputStalled ) &&
STATE_IS( kStateRunning ) &&
workToDo )
{
doDequeue = true;
STATE_SET( kStateOutputActive );
if (async == false) dequeue();
}
QUEUE_UNLOCK;
if ( doDequeue && async )
{
scheduleServiceThread();
}
return doDequeue;
}
UInt32 IOBasicOutputQueue::flush()
{
UInt32 flushCount;
mbuf_t m;
flushCount = 0;
for(UInt32 i = 0; i < _priorities; i++)
{
QUEUE_LOCK;
m = IOMbufQueueDequeueAll( &(_inQueues[i]) );
QUEUE_UNLOCK;
flushCount = IOMbufFree(m);
OSAddAtomic(flushCount, (SInt32 *) &_stats->dropCount);
}
return flushCount;
}
bool IOBasicOutputQueue::setCapacity(UInt32 capacity)
{
QUEUE_LOCK;
for(UInt32 i = 0; i < _priorities; i++)
{
IOMbufQueueSetCapacity(&(_primaryQueues[i]), capacity);
IOMbufQueueSetCapacity(&(_shadowQueues[i]), capacity);
}
_stats->capacity = capacity * _priorities;
QUEUE_UNLOCK;
return true;
}
UInt32 IOBasicOutputQueue::getCapacity() const
{
return _stats->capacity;
}
UInt32 IOBasicOutputQueue::getSize() const
{
UInt32 total = 0;
for(UInt32 i = 0; i < _priorities; i++)
{
total += IOMbufQueueGetSize(&(_primaryQueues[i]));
}
return total;
}
UInt32 IOBasicOutputQueue::getDropCount()
{
return _stats->dropCount;
}
UInt32 IOBasicOutputQueue::getOutputCount()
{
return _stats->outputCount;
}
UInt32 IOBasicOutputQueue::getRetryCount()
{
return _stats->retryCount;
}
UInt32 IOBasicOutputQueue::getStallCount()
{
return _stats->stallCount;
}
UInt32 IOBasicOutputQueue::getState() const
{
return _state;
}
IOReturn
IOBasicOutputQueue::handleNetworkDataAccess(IONetworkData * data,
UInt32 accessType,
void * arg)
{
IOReturn ret = kIOReturnSuccess;
assert(data && (arg == kIOOutputQueueSignature));
switch (accessType)
{
case kIONetworkDataAccessTypeRead:
case kIONetworkDataAccessTypeSerialize:
{
UInt32 size;
QUEUE_LOCK;
size = getSize(); for(UInt32 i = 0; i < _priorities; i++)
{
size += IOMbufQueueGetSize(&(_shadowQueues[i]));
}
QUEUE_UNLOCK;
_stats->size = size;
break;
}
default:
ret = kIOReturnNotWritable;
break;
}
return ret;
}
IONetworkData * IOBasicOutputQueue::getStatisticsData() const
{
return _statsData;
}
#undef super
#define super IOBasicOutputQueue
OSDefineMetaClassAndStructors( IOGatedOutputQueue, IOBasicOutputQueue )
bool IOGatedOutputQueue::init(OSObject * target,
IOOutputAction action,
IOWorkLoop * workloop,
UInt32 capacity,
UInt32 priorities)
{
if (super::init(target, action, capacity, priorities) == false)
return false;
if (OSDynamicCast(IOWorkLoop, workloop) == 0)
return false;
_gate = IOCommandGate::commandGate(this);
if (!_gate || (workloop->addEventSource(_gate) != kIOReturnSuccess))
return false;
_interruptSrc = IOInterruptEventSource::interruptEventSource(
this,
(IOInterruptEventSource::Action) restartDeferredOutput
);
if ( !_interruptSrc ||
(workloop->addEventSource(_interruptSrc) != kIOReturnSuccess) )
return false;
return true;
}
IOGatedOutputQueue *
IOGatedOutputQueue::withTarget(IONetworkController * target,
IOWorkLoop * workloop,
UInt32 capacity)
{
return IOGatedOutputQueue::withTarget(target, workloop, capacity, 1 );
}
IOGatedOutputQueue *
IOGatedOutputQueue::withTarget(IONetworkController * target,
IOWorkLoop * workloop,
UInt32 capacity,
UInt32 priorities)
{
IOGatedOutputQueue * queue = new IOGatedOutputQueue;
if (queue && !queue->init(target, target->getOutputHandler(), workloop,
capacity, priorities))
{
queue->release();
queue = 0;
}
return queue;
}
IOGatedOutputQueue *
IOGatedOutputQueue::withTarget(OSObject * target,
IOOutputAction action,
IOWorkLoop * workloop,
UInt32 capacity)
{
return IOGatedOutputQueue::withTarget(target, action, workloop, capacity, 1 );
}
IOGatedOutputQueue *
IOGatedOutputQueue::withTarget(OSObject * target,
IOOutputAction action,
IOWorkLoop * workloop,
UInt32 capacity,
UInt32 priorities)
{
IOGatedOutputQueue * queue = new IOGatedOutputQueue;
if (queue && !queue->init(target, action, workloop, capacity, priorities))
{
queue->release();
queue = 0;
}
return queue;
}
void IOGatedOutputQueue::free()
{
cancelServiceThread();
if (_gate)
{
IOWorkLoop *wl = _gate->getWorkLoop();
if(wl) wl->removeEventSource(_gate);
_gate->release();
_gate = 0;
}
if (_interruptSrc)
{
IOWorkLoop * wl = _interruptSrc->getWorkLoop();
if (wl) wl->removeEventSource(_interruptSrc);
_interruptSrc->release();
_interruptSrc = 0;
}
super::free();
}
void IOGatedOutputQueue::gatedOutput(OSObject * ,
IOGatedOutputQueue * self,
IOMbufQueue * queue,
UInt32 * state)
{
mbuf_t pkt;
UInt32 status;
do {
pkt = IOMbufQueueDequeue(queue);
assert(pkt);
status = ((self->_target)->*(self->_action))( pkt, 0 );
if ( status == ( kIOOutputStatusAccepted | kIOOutputCommandNone ) )
{
self->_stats->outputCount++;
}
else
{
switch (status & kIOOutputStatusMask)
{
default:
case kIOOutputStatusAccepted:
self->_stats->outputCount++;
break;
case kIOOutputStatusRetry:
IOMbufQueuePrepend(queue, pkt);
self->_stats->retryCount++;
break;
}
switch (status & kIOOutputCommandMask)
{
case kIOOutputCommandStall:
*state = kStateOutputStalled;
self->_stats->stallCount++;
break;
default:
break;
}
}
}
while ( IOMbufQueueGetSize(queue) && (*state == 0) );
}
enum {
kStateOutputDeferred = 0x100
};
void IOGatedOutputQueue::output(IOMbufQueue * queue, UInt32 * state)
{
if ( _gate->attemptAction((IOCommandGate::Action)
&IOGatedOutputQueue::gatedOutput,
(void *) this,
(void *) queue,
(void *) state) == kIOReturnCannotLock )
{
*state = kStateOutputDeferred;
}
}
bool IOGatedOutputQueue::scheduleServiceThread(void * param)
{
if ( ((uintptr_t) param) & kStateOutputDeferred )
{
_interruptSrc->interruptOccurred(0, 0, 0);
return true;
}
else
{
return super::scheduleServiceThread(param);
}
}
void IOGatedOutputQueue::restartDeferredOutput(
OSObject * owner,
IOInterruptEventSource * sender,
int count)
{
IOGatedOutputQueue * self = (IOGatedOutputQueue *) owner;
self->serviceThread((void *) kStateOutputDeferred);
}