#include "StreamSource.h"
#include <errno.h>
#include <string>
#include "misc.h"
using namespace std;
// Type name handed to the Source base-class constructor by every StreamSource.
CFStringRef gStreamSourceName = CFSTR("StreamSource");
// Size in bytes of the stack buffer used for each CFReadStreamRead call, and
// therefore the largest chunk forwarded to the destination in one piece.
const CFIndex kMaximumSize = 2048;
// Constructs a source that reads from `input` and feeds the attribute `name`
// of `transform` (both are passed straight through to the Source base class).
//
// The stream is retained for the lifetime of this object and released in the
// destructor. The mReading dispatch group is created and entered here; the
// matching dispatch_group_leave is issued by DoActivate().
StreamSource::StreamSource(CFReadStreamRef input, Transform* transform, CFStringRef name)
    : Source(gStreamSourceName, transform, name),
      mReadStream(input),
      mReading(dispatch_group_create())
{
    // Take our own reference on the caller's stream.
    CFRetain(mReadStream);

    // Hold the group "busy" until DoActivate() has scheduled the read.
    dispatch_group_enter(mReading);
}
void StreamSource::BackgroundActivate()
{
CFIndex result = 0;
do
{
UInt8 buffer[kMaximumSize];
result = CFReadStreamRead(mReadStream, buffer, kMaximumSize);
if (result > 0) {
CFDataRef data = CFDataCreate(NULL, buffer, result);
CFErrorRef error = mDestination->SetAttribute(mDestinationName, data);
CFRelease(data);
if (error != NULL) {
return; }
}
} while (result > 0);
if (result < 0)
{
CFErrorRef error = CFReadStreamCopyError(mReadStream);
mDestination->SetAttribute(mDestinationName, error);
if (error)
{
CFRelease(error);
}
}
else
{
mDestination->SetAttribute(mDestinationName, NULL); }
}
void StreamSource::DoActivate()
{
CFRetain(mDestination->GetCFObject());
dispatch_group_async(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
this->BackgroundActivate();
CFRelease(mDestination->GetCFObject());
});
dispatch_group_leave(mReading);
}
void StreamSource::Finalize()
{
dispatch_group_notify(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
delete this;
});
}
// Releases the resources acquired in the constructor: the dispatch group
// created there and the reference taken on the read stream. Both members are
// cleared to NULL after being released.
StreamSource::~StreamSource()
{
    // Dispatch group created by dispatch_group_create() in the constructor.
    dispatch_release(mReading);
    mReading = NULL;

    // Balances the CFRetain in the constructor.
    CFRelease(mReadStream);
    mReadStream = NULL;
}
// Returns true when `object` passes the base-class Source::Equal check and
// its read stream compares equal (per CFEqual) to this source's read stream;
// returns false otherwise.
Boolean StreamSource::Equal(const CoreFoundationObject* object)
{
    // The base class must agree first; only then is the cast below performed.
    if (!Source::Equal(object))
    {
        return false;
    }

    const StreamSource* other = (const StreamSource*) object;
    return CFEqual(other->mReadStream, mReadStream);
}
// Factory: heap-allocates a StreamSource for the given stream, transform and
// attribute name, and returns the result of wrapping it with
// CoreFoundationHolder::MakeHolder under gInternalCFObjectName.
CFTypeRef StreamSource::Make(CFReadStreamRef input, Transform* transform, CFStringRef name)
{
    StreamSource* source = new StreamSource(input, transform, name);
    return CoreFoundationHolder::MakeHolder(gInternalCFObjectName, source);
}
// Returns the base-class debug description followed by ": Stream " and the
// address of the underlying read stream, e.g. "...: Stream (mReadStream = 0x...)".
string StreamSource::DebugDescription()
{
    // Format the pointer first; 256 bytes is far more than "%p" can produce.
    char streamText[256];
    snprintf(streamText, sizeof(streamText), "(mReadStream = %p)", mReadStream);

    string description = Source::DebugDescription();
    description += ": Stream ";
    description += streamText;
    return description;
}