StreamSource.cpp   [plain text]


#include "StreamSource.h"
#include <string>
#include "misc.h"

using namespace std;

CFStringRef gStreamSourceName = CFSTR("StreamSource");

const CFIndex kMaximumSize = 2048;

StreamSource::StreamSource(CFReadStreamRef input, Transform* transform, CFStringRef name)
	: Source(gStreamSourceName, transform, name),
	mReadStream(input),
	mReading(dispatch_group_create())
{
	dispatch_group_enter(mReading);
	CFRetain(mReadStream);
}

void StreamSource::BackgroundActivate()
{
	CFIndex result = 0;
	
	do
	{
		// make a buffer big enough to handle the object
		// NOTE: allocating this on the stack and letting CFDataCreate copy it is _faster_ then malloc and CFDataCreateWithBytes(..., kCFAllactorMalloc) by a fair margin.   At least for 2K chunks.   Retest if changing the size.
		UInt8 buffer[kMaximumSize];
		
		result = CFReadStreamRead(mReadStream, buffer, kMaximumSize);
		
		if (result > 0) // was data returned?
		{
			// make the data and send it to the transform
			CFDataRef data = CFDataCreate(NULL, buffer, result);

			CFErrorRef error = mDestination->SetAttribute(mDestinationName, data);
			
			CFRelease(data);

			if (error != NULL) // we have a problem, there was probably an abort on the chain
			{
				return; // quiesce the source
			}
		}
	} while (result > 0);
	
	if (result < 0)
	{
		// we got an error!
		CFErrorRef error = CFReadStreamCopyError(mReadStream);
		mDestination->SetAttribute(mDestinationName, error);
		if (error)
		{
			// NOTE: CF doesn't always tell us about this error.   Arguably it could be better to
			// "invent" a generic error, but it is a hard argument that we want to crash in CFRelease(NULL)...
			CFRelease(error);
		}
	}
	else
	{
		// send an EOS
		mDestination->SetAttribute(mDestinationName, NULL); // end of stream
	}
}

void StreamSource::DoActivate()
{
	CFRetain(mDestination->GetCFObject());
	dispatch_group_async(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{ 
		this->BackgroundActivate();
		CFRelease(mDestination->GetCFObject());
	});
	dispatch_group_leave(mReading);
}

void StreamSource::Finalize()
{
	dispatch_group_notify(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
		delete this;
	});
}

StreamSource::~StreamSource()
{
	CFRelease(mReadStream);
	mReadStream = NULL;
	dispatch_release(mReading);
	mReading = NULL;
}


Boolean StreamSource::Equal(const CoreFoundationObject* object)
{
	// not equal if we are not the same object
	if (Source::Equal(object))
	{
		const StreamSource* ss = (StreamSource*) object;
		return CFEqual(ss->mReadStream, mReadStream);
	}

	return false;
}



CFTypeRef StreamSource::Make(CFReadStreamRef input, Transform* transform, CFStringRef name)
{
	return CoreFoundationHolder::MakeHolder(gInternalCFObjectName, new StreamSource(input, transform, name));
}



string StreamSource::DebugDescription()
{
	string result = Source::DebugDescription() + ": Stream ";
	
	char buffer[256];
	snprintf(buffer, sizeof(buffer), "(mReadStream = %p)", mReadStream);
	
	result += buffer;
	
	return result;
}