/* IODataQueueClient.c */
#include "IODataQueueClientPrivate.h"
#include <IOKit/IODataQueueShared.h>
#include <mach/message.h>
#include <mach/mach_port.h>
#include <mach/port.h>
#include <mach/mach_init.h>
#include <IOKit/OSMessageNotification.h>
#include <libkern/OSAtomic.h>
static IOReturn _IODataQueueSendDataAvailableNotification(IODataQueueMemory *dataQueue, mach_msg_header_t *msgh);
/*
 * Report whether the queue currently holds at least one entry.
 * A NULL queue reports no data. NOTE(review): head/tail are read with
 * plain (non-atomic) loads here, so the answer is only a snapshot —
 * presumably acceptable for a polling predicate; confirm against callers.
 */
Boolean IODataQueueDataAvailable(IODataQueueMemory *dataQueue)
{
    if (dataQueue == NULL) {
        return 0;
    }
    return (Boolean)(dataQueue->head != dataQueue->tail);
}
/*
 * __IODataQueuePeek - return a pointer to the oldest entry without
 * consuming it.
 *
 * dataQueue: shared-memory queue header; NULL returns NULL.
 * qSize:     caller-supplied queue capacity in bytes; 0 means use
 *            dataQueue->queueSize. (An explicit size lets the caller avoid
 *            trusting the queueSize field in shared memory.)
 *
 * Returns a pointer into the shared buffer, or NULL when the queue is
 * empty or the head offset lies beyond the buffer.
 */
IODataQueueEntry *__IODataQueuePeek(IODataQueueMemory *dataQueue, uint64_t qSize)
{
IODataQueueEntry *entry = 0;
UInt32 headOffset;
UInt32 tailOffset;
if (!dataQueue) {
return NULL;
}
/* The acquire load of tail pairs with the enqueuer's release store,
 * making the entry bytes written before that store visible here. */
headOffset = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->head, __ATOMIC_RELAXED);
tailOffset = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->tail, __ATOMIC_ACQUIRE);
if (headOffset != tailOffset) {
IODataQueueEntry * head = 0;
UInt32 headSize = 0;
UInt32 queueSize = qSize ? qSize : dataQueue->queueSize;
/* Shared memory may be corrupt or hostile: never index past the buffer. */
if (headOffset > queueSize) {
return NULL;
}
head = (IODataQueueEntry *)((char *)dataQueue->queue + headOffset);
headSize = head->size;
/* If the header-inclusive entry at head would run past the end of the
 * buffer (or any of the offset arithmetic would wrap 32 bits), the
 * enqueuer wrapped and the real head entry lives at the buffer start. */
if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
(headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
(headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
(headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
entry = dataQueue->queue;
} else {
entry = head;
}
}
return entry;
}
/*
 * Public peek: trust the queueSize field stored in the shared header
 * (a zero second argument tells the shared helper to read it).
 */
IODataQueueEntry *IODataQueuePeek(IODataQueueMemory *dataQueue)
{
    IODataQueueEntry *head = __IODataQueuePeek(dataQueue, 0);
    return head;
}
/*
 * Private peek variant: the caller supplies the queue capacity explicitly
 * instead of trusting the shared-memory header's queueSize field.
 */
IODataQueueEntry *_IODataQueuePeek(IODataQueueMemory *dataQueue, uint64_t queueSize)
{
    IODataQueueEntry *head = __IODataQueuePeek(dataQueue, queueSize);
    return head;
}
/*
 * __IODataQueueDequeue - remove the oldest entry, optionally copying its
 * payload to the caller.
 *
 * dataQueue: shared-memory queue header.
 * qSize:     caller-supplied capacity in bytes; 0 means use
 *            dataQueue->queueSize.
 * data:      destination buffer, or NULL to discard the head entry.
 * dataSize:  in: capacity of data; out: payload size copied. Required
 *            whenever data is non-NULL.
 *
 * Returns kIOReturnSuccess, kIOReturnBadArgument (NULL queue, or data
 * without dataSize), kIOReturnUnderrun (empty queue), kIOReturnError
 * (corrupt offsets/sizes in shared memory), or kIOReturnNoSpace (caller
 * buffer smaller than the entry payload).
 */
IOReturn
__IODataQueueDequeue(IODataQueueMemory *dataQueue, uint64_t qSize, void *data, uint32_t *dataSize)
{
IOReturn retVal = kIOReturnSuccess;
IODataQueueEntry * entry = 0;
UInt32 entrySize = 0;
UInt32 headOffset = 0;
UInt32 tailOffset = 0;
UInt32 newHeadOffset = 0;
if (!dataQueue || (data && !dataSize)) {
return kIOReturnBadArgument;
}
/* The acquire load of tail pairs with the enqueuer's release store,
 * making the entry contents visible before they are read below. */
headOffset = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->head, __ATOMIC_RELAXED);
tailOffset = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->tail, __ATOMIC_ACQUIRE);
if (headOffset != tailOffset) {
IODataQueueEntry * head = 0;
UInt32 headSize = 0;
UInt32 queueSize = qSize ? qSize : dataQueue->queueSize;
/* Offsets live in shared memory and may be corrupt; bounds-check first. */
if (headOffset > queueSize) {
return kIOReturnError;
}
head = (IODataQueueEntry *)((char *)dataQueue->queue + headOffset);
headSize = head->size;
/* If the header-inclusive entry at head would run past the buffer end
 * (or the arithmetic would wrap 32 bits), the enqueuer wrapped: the
 * real head entry is at the start of the buffer. */
if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
(headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
(headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
(headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
entry = dataQueue->queue;
entrySize = entry->size;
/* Re-validate the wrapped entry's size before advancing head. */
if ((entrySize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
(entrySize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
return kIOReturnError;
}
newHeadOffset = entrySize + DATA_QUEUE_ENTRY_HEADER_SIZE;
} else {
entry = head;
entrySize = entry->size;
/* Validate that the advanced head still lands inside the buffer. */
if ((entrySize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
(entrySize + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headOffset) ||
(entrySize + DATA_QUEUE_ENTRY_HEADER_SIZE + headOffset > queueSize)) {
return kIOReturnError;
}
newHeadOffset = headOffset + entrySize + DATA_QUEUE_ENTRY_HEADER_SIZE;
}
} else {
return kIOReturnUnderrun;
}
if (data) {
if (entrySize > *dataSize) {
return kIOReturnNoSpace;
}
memcpy(data, &(entry->data), entrySize);
*dataSize = entrySize;
}
/* Publish the consumption; release pairs with the enqueuer's acquire
 * load of head so the slot is not overwritten while still being read. */
__c11_atomic_store((_Atomic UInt32 *)&dataQueue->head, newHeadOffset, __ATOMIC_RELEASE);
if (newHeadOffset == tailOffset) {
/*
 * The queue appears empty after this dequeue. The seq_cst fence
 * pairs with the fence on the enqueue path so the two sides agree
 * on who is responsible for the wakeup notification.
 */
__c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
}
return retVal;
}
/*
 * Public dequeue: trust the shared header's queueSize field (signalled by
 * passing 0 to the shared helper).
 */
IOReturn
IODataQueueDequeue(IODataQueueMemory *dataQueue, void *data, uint32_t *dataSize)
{
    IOReturn result = __IODataQueueDequeue(dataQueue, 0, data, dataSize);
    return result;
}
/*
 * Private dequeue variant: the caller supplies the queue capacity
 * explicitly instead of trusting the shared-memory header.
 */
IOReturn _IODataQueueDequeue(IODataQueueMemory *dataQueue, uint64_t queueSize, void *data, uint32_t *dataSize)
{
    IOReturn result = __IODataQueueDequeue(dataQueue, queueSize, data, dataSize);
    return result;
}
/*
 * __IODataQueueEnqueue - append one entry to the circular queue and, when
 * the consumer may be blocked, send the data-available notification.
 *
 * dataQueue: shared-memory queue header (now validated against NULL).
 * qSize:     caller-supplied capacity in bytes; 0 means use
 *            dataQueue->queueSize.
 * msgh:      notification message to send, or NULL to use the message
 *            stored in the queue's appendix.
 * dataSize:  payload size in bytes.
 * data:      payload to copy, or NULL to fill via callback.
 * callback/refcon: writer callback invoked with the in-place entry buffer
 *            when data is NULL.
 *
 * Returns kIOReturnSuccess, kIOReturnBadArgument (NULL queue),
 * kIOReturnOverrun (entry does not fit, or size arithmetic would wrap),
 * kIOReturnUnderrun (corrupt head/tail offsets), or the result of the
 * notification send.
 */
static IOReturn
__IODataQueueEnqueue(IODataQueueMemory *dataQueue, uint64_t qSize, mach_msg_header_t *msgh, uint32_t dataSize, void *data, IODataQueueClientEnqueueReadBytesCallback callback, void * refcon)
{
    UInt32 head;
    UInt32 tail;
    UInt32 newTail = 0;           /* initialized to silence maybe-uninitialized warnings */
    UInt32 queueSize;
    UInt32 entrySize;
    IOReturn retVal = kIOReturnSuccess;
    IODataQueueEntry *entry;

    /* Fix: the original computed queueSize from dataQueue before any
     * validation, so a NULL queue crashed instead of failing cleanly. */
    if (!dataQueue) {
        return kIOReturnBadArgument;
    }

    /* Check before the addition so entrySize can never wrap 32 bits. */
    if (dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) {
        return kIOReturnOverrun;
    }

    queueSize = qSize ? qSize : dataQueue->queueSize;
    entrySize = dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE;

    /* The acquire load of head pairs with the dequeuer's release store. */
    tail = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->tail, __ATOMIC_RELAXED);
    head = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->head, __ATOMIC_ACQUIRE);

    /* Offsets live in shared memory and may be corrupt; never index past
     * the end of the buffer. */
    if (queueSize < tail || queueSize < head) {
        return kIOReturnUnderrun;
    }

    if (tail >= head) {
        /* Free space is [tail, queueSize) plus [0, head). */
        if ((entrySize <= UINT32_MAX - tail) &&
            ((tail + entrySize) <= queueSize)) {
            /* Entry fits contiguously at the current tail. */
            entry = (IODataQueueEntry *)((UInt8 *)dataQueue->queue + tail);
            if (data) {
                memcpy(&(entry->data), data, dataSize);
            } else if (callback) {
                (*callback)(refcon, &(entry->data), dataSize);
            }
            entry->size = dataSize;
            newTail = tail + entrySize;
        } else if (head > entrySize) {
            /* Wrap: place the entry at the start of the buffer. The strict
             * '>' keeps the new tail from landing exactly on head, which
             * would make a full queue indistinguishable from an empty one. */
            entry = (IODataQueueEntry *)((UInt8 *)dataQueue->queue);
            if (data) {
                memcpy(&(entry->data), data, dataSize);
            } else if (callback) {
                (*callback)(refcon, &(entry->data), dataSize);
            }
            entry->size = dataSize;
            /* If a header still fits at the old tail, record the size there
             * too: the dequeuer's bounds check on that stale header is what
             * tells it the queue wrapped. */
            if ((queueSize - tail) >= DATA_QUEUE_ENTRY_HEADER_SIZE) {
                ((IODataQueueEntry *)((UInt8 *)dataQueue->queue + tail))->size = dataSize;
            }
            newTail = entrySize;
        } else {
            retVal = kIOReturnOverrun;
        }
    } else {
        /* Tail has already wrapped; free space is the single gap
         * [tail, head). Strict '>' for the same full-vs-empty reason. */
        if ((head - tail) > entrySize) {
            entry = (IODataQueueEntry *)((UInt8 *)dataQueue->queue + tail);
            if (data) {
                memcpy(&(entry->data), data, dataSize);
            } else if (callback) {
                (*callback)(refcon, &(entry->data), dataSize);
            }
            entry->size = dataSize;
            newTail = tail + entrySize;
        } else {
            retVal = kIOReturnOverrun;
        }
    }

    if (retVal == kIOReturnSuccess) {
        /* Publish the entry; release pairs with the dequeuer's acquire. */
        __c11_atomic_store((_Atomic UInt32 *)&dataQueue->tail, newTail, __ATOMIC_RELEASE);
        if (tail != head) {
            /* The queue looked non-empty, but the consumer may have drained
             * it concurrently. The seq_cst fence pairs with the one on the
             * dequeue path; re-read head to decide if a wakeup is needed. */
            __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
            head = __c11_atomic_load((_Atomic UInt32 *)&dataQueue->head, __ATOMIC_RELAXED);
        }
        if (tail == head) {
            /* Queue was empty when we enqueued: wake the consumer. */
            retVal = _IODataQueueSendDataAvailableNotification(dataQueue, msgh);
        }
#if TARGET_IPHONE_SIMULATOR
        else {
            /* Simulator builds always notify. */
            retVal = _IODataQueueSendDataAvailableNotification(dataQueue, msgh);
        }
#endif
    } else if (retVal == kIOReturnOverrun) {
        /* Best effort: prod the consumer so the full queue gets drained. */
        (void)_IODataQueueSendDataAvailableNotification(dataQueue, msgh);
    }
    return retVal;
}
/*
 * Public enqueue: copy dataSize bytes from data into the queue, trusting
 * the shared header's queueSize and the appendix-stored notification
 * message (zero/NULL arguments to the shared helper).
 */
IOReturn
IODataQueueEnqueue(IODataQueueMemory *dataQueue, void *data, uint32_t dataSize)
{
    IOReturn result = __IODataQueueEnqueue(dataQueue, 0, NULL, dataSize, data, NULL, NULL);
    return result;
}
/*
 * Private enqueue variant: the payload is produced in place by `callback`
 * rather than copied from a buffer, and the caller supplies both the queue
 * capacity and the notification message explicitly.
 */
IOReturn
_IODataQueueEnqueueWithReadCallback(IODataQueueMemory *dataQueue, uint64_t queueSize, mach_msg_header_t *msgh, uint32_t dataSize, IODataQueueClientEnqueueReadBytesCallback callback, void * refcon)
{
    IOReturn result = __IODataQueueEnqueue(dataQueue, queueSize, msgh, dataSize, NULL, callback, refcon);
    return result;
}
/*
 * Block until a data-available notification arrives on notifyPort.
 * Returns kIOReturnBadArgument for a NULL queue or null port; otherwise
 * returns the mach_msg receive status.
 */
IOReturn IODataQueueWaitForAvailableData(IODataQueueMemory *dataQueue, mach_port_t notifyPort)
{
    struct {
        mach_msg_header_t msgHdr;
        mach_msg_trailer_t trailer;
    } msg;

    if (!dataQueue || notifyPort == MACH_PORT_NULL) {
        return kIOReturnBadArgument;
    }
    /* Blocking receive of the (empty) wakeup message. */
    return mach_msg(&msg.msgHdr, MACH_RCV_MSG, 0, sizeof(msg), notifyPort, 0, MACH_PORT_NULL);
}
/*
 * Allocate a receive right suitable for data-available notifications and
 * clamp its queue limit to one message (extra wakeups are redundant, so
 * there is no point buffering more than one).
 * Returns MACH_PORT_NULL on any failure; the receive right is released on
 * the error paths.
 */
mach_port_t IODataQueueAllocateNotificationPort()
{
    mach_port_t port = MACH_PORT_NULL;
    mach_port_limits_t limits;
    mach_msg_type_number_t info_cnt = MACH_PORT_LIMITS_INFO_COUNT;

    if (mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port) != KERN_SUCCESS) {
        return MACH_PORT_NULL;
    }

    /* Fetch the current limits so only mpl_qlimit is modified. */
    if (mach_port_get_attributes(mach_task_self(),
                                 port,
                                 MACH_PORT_LIMITS_INFO,
                                 (mach_port_info_t)&limits,
                                 &info_cnt) != KERN_SUCCESS) {
        goto fail;
    }

    limits.mpl_qlimit = 1;
    if (mach_port_set_attributes(mach_task_self(),
                                 port,
                                 MACH_PORT_LIMITS_INFO,
                                 (mach_port_info_t)&limits,
                                 MACH_PORT_LIMITS_INFO_COUNT) != KERN_SUCCESS) {
        goto fail;
    }
    return port;

fail:
    /* Drop the receive right allocated above before reporting failure. */
    mach_port_mod_refs(mach_task_self(), port, MACH_PORT_RIGHT_RECEIVE, -1);
    return MACH_PORT_NULL;
}
/*
 * Store a prebuilt notification message in the queue's appendix (which sits
 * immediately after the queue buffer: header + queueSize bytes). The
 * enqueue path sends this message when the consumer needs waking. Passing
 * MACH_PORT_NULL disables notifications.
 */
IOReturn IODataQueueSetNotificationPort(IODataQueueMemory *dataQueue, mach_port_t notifyPort)
{
    IODataQueueAppendix *appendix;
    mach_msg_header_t *hdr;

    if (dataQueue == NULL) {
        return kIOReturnBadArgument;
    }

    /* The appendix lives just past the circular buffer. */
    appendix = (IODataQueueAppendix *)((UInt8 *)dataQueue + dataQueue->queueSize + DATA_QUEUE_MEMORY_HEADER_SIZE);
    hdr = &appendix->msgh;
    hdr->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
    hdr->msgh_size = sizeof(*hdr);
    hdr->msgh_remote_port = notifyPort;
    hdr->msgh_local_port = MACH_PORT_NULL;
    hdr->msgh_id = 0;
    return kIOReturnSuccess;
}
/*
 * Send the data-available notification for this queue. A copy of the
 * message header is sent so mach_msg's in-place rewriting never touches
 * the caller's (or the shared appendix's) copy. With no explicit msgh the
 * appendix-stored message is used; an unset remote port means notifications
 * are disabled and the call quietly succeeds. The send uses a zero timeout
 * so a full port (qlimit 1) never blocks the producer; MACH_SEND_TIMED_OUT
 * is passed through to the caller like any other status.
 */
IOReturn _IODataQueueSendDataAvailableNotification(IODataQueueMemory *dataQueue, mach_msg_header_t *msgh)
{
    mach_msg_header_t header;

    if (msgh) {
        header = *msgh;
    } else {
        IODataQueueAppendix *appendix =
            (IODataQueueAppendix *)((UInt8 *)dataQueue + dataQueue->queueSize + DATA_QUEUE_MEMORY_HEADER_SIZE);
        if (appendix->msgh.msgh_remote_port == MACH_PORT_NULL) {
            return kIOReturnSuccess;
        }
        header = appendix->msgh;
    }

    return mach_msg(&header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, header.msgh_size,
                    0, MACH_PORT_NULL, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
}