// NetworkCacheIOChannelSoup.cpp
#include "config.h"
#include "NetworkCacheIOChannel.h"
#if ENABLE(NETWORK_CACHE)
#include "NetworkCacheFileSystem.h"
#include <wtf/MainThread.h>
#include <wtf/RunLoop.h>
#include <wtf/glib/GUniquePtr.h>
namespace WebKit {
namespace NetworkCache {
// Upper bound, in bytes, on the scratch buffer allocated for a read. Requests
// larger than this are served by reading repeatedly into the same buffer.
static const size_t gDefaultReadBufferSize = 4096;
// Opens the file at |filePath| in the mode selected by |type|:
//  - Type::Create deletes any file already at that path, then creates a new
//    private file and keeps its output stream.
//  - Type::Write opens the file for reading and writing and keeps the IO stream.
//  - Type::Read opens the file for reading and keeps its input stream.
// GIO errors are not collected here (every GError** argument is null).
// NOTE(review): a failed open presumably leaves the matching stream member
// null, which read() and write() check before doing any I/O.
IOChannel::IOChannel(const String& filePath, Type type)
    : m_path(filePath)
    , m_type(type)
{
    auto path = WebCore::fileSystemRepresentation(filePath);
    GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data()));
    switch (m_type) {
    case Type::Create: {
        g_file_delete(file.get(), nullptr, nullptr);
        m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr)));
#if !HAVE(STAT_BIRTHTIME)
        // Record the creation time in an extended attribute. time_t is not
        // guaranteed to have the width G_GUINT64_FORMAT expects, so convert
        // explicitly: passing a mismatched type through printf-style varargs
        // is undefined behaviour.
        auto creationTime = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
        GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, static_cast<guint64>(creationTime)));
        g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr);
#endif
        break;
    }
    case Type::Write: {
        m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr));
        break;
    }
    case Type::Read:
        m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr)));
        break;
    }
}
// Marks the channel as deleted and asserts that it had not been marked
// already, i.e. that the destructor is not running twice on the same object.
IOChannel::~IOChannel()
{
    const bool alreadyDeleted = m_wasDeleted.exchange(true);
    RELEASE_ASSERT(!alreadyDeleted);
}
// Factory: allocates a channel for |filePath| opened in mode |type| and
// returns it wrapped in an adopting Ref.
Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type)
{
    auto* newChannel = new IOChannel(filePath, type);
    return adoptRef(*newChannel);
}
// Dispatches |task| on |queue| when one is supplied; with a null |queue| the
// task is dispatched on the main run loop instead.
static inline void runTaskInQueue(Function<void ()>&& task, WorkQueue* queue)
{
    if (!queue) {
        RunLoop::main().dispatch(WTFMove(task));
        return;
    }
    queue->dispatch(WTFMove(task));
}
// Appends the first |size| bytes of |readBuffer| to |data|. When |data| is
// still null it is initialised from those bytes instead.
static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data)
{
    // Use the whole buffer when it was filled completely; otherwise take a
    // sub-buffer covering only the bytes that were actually read.
    GRefPtr<SoupBuffer> chunk = size == readBuffer->length
        ? GRefPtr<SoupBuffer>(readBuffer)
        : adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size));

    if (!data.isNull()) {
        Data chunkData(WTFMove(chunk));
        data = concatenate(data, chunkData);
        return;
    }

    // First chunk: build the Data from a raw pointer and length rather than
    // from the SoupBuffer itself.
    // NOTE(review): presumably this constructor copies the bytes, which matters
    // because callers reuse |readBuffer| for the next read -- confirm against Data.
    data = { reinterpret_cast<const uint8_t*>(chunk->data), size };
}
// State carried through one asynchronous read started by IOChannel::read().
// It is heap-allocated there, passed to GIO as user data, and re-adopted by
// inputStreamReadReadyCallback on every completed chunk.
struct ReadAsyncData {
// Reference to the channel that started the read.
RefPtr<IOChannel> channel;
// Scratch buffer each chunk is read into; it is reused for every chunk.
GRefPtr<SoupBuffer> buffer;
// Queue the completion handler is dispatched on; null selects the main run loop.
RefPtr<WorkQueue> queue;
// Total number of bytes requested by the caller.
size_t bytesToRead;
// Called once with the accumulated data and an error code (0 on success, -1 on failure).
std::function<void (Data&, int error)> completionHandler;
// Bytes accumulated so far.
Data data;
};
// Completion callback for each chunk of the asynchronous read started by
// IOChannel::read(). It re-adopts the ReadAsyncData passed as |userData| and
// then does one of the following:
//  - read error: runs the completion handler with error -1;
//  - end of stream, or all requested bytes gathered: runs it with error 0;
//  - otherwise: appends the chunk and schedules the next read.
static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData)
{
    std::unique_ptr<ReadAsyncData> state(static_cast<ReadAsyncData*>(userData));

    // Transfers |state| into a task that invokes the completion handler on the
    // requested queue. |state| is empty once this returns.
    auto finish = [&state](int error) {
        // Read the queue pointer before ownership moves into the task.
        WorkQueue* targetQueue = state->queue.get();
        runTaskInQueue([owned = WTFMove(state), error] {
            owned->completionHandler(owned->data, error);
        }, targetQueue);
    };

    const gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr);
    if (bytesRead == -1) {
        finish(-1);
        return;
    }

    if (!bytesRead) {
        // End of stream: report whatever has been collected so far.
        finish(0);
        return;
    }

    ASSERT(bytesRead > 0);
    fillDataFromReadBuffer(state->buffer.get(), static_cast<size_t>(bytesRead), state->data);

    const size_t remaining = state->bytesToRead - state->data.size();
    if (!remaining) {
        finish(0);
        return;
    }

    // More to read: reuse the scratch buffer and hand ownership of the state
    // back to GIO until the next completion.
    const size_t chunkSize = std::min(remaining, state->buffer->length);
    char* destination = const_cast<char*>(state->buffer->data);
    g_input_stream_read_async(stream, destination, chunkSize, G_PRIORITY_DEFAULT, nullptr,
        reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), state.release());
}
void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
{
RefPtr<IOChannel> channel(this);
if (!m_inputStream) {
runTaskInQueue([channel, completionHandler] {
Data data;
completionHandler(data, -1);
}, queue);
return;
}
if (!isMainThread()) {
readSyncInThread(offset, size, queue, completionHandler);
return;
}
size_t bufferSize = std::min(size, gDefaultReadBufferSize);
uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, completionHandler, { } };
g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, G_PRIORITY_DEFAULT, nullptr,
reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
}
void IOChannel::readSyncInThread(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
{
ASSERT(!isMainThread());
RefPtr<IOChannel> channel(this);
detachThread(createThread("IOChannel::readSync", [channel, size, queue, completionHandler] {
size_t bufferSize = std::min(size, gDefaultReadBufferSize);
uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
Data data;
size_t pendingBytesToRead = size;
size_t bytesToRead = bufferSize;
do {
gssize bytesRead = g_input_stream_read(channel->m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr);
if (bytesRead == -1) {
runTaskInQueue([channel, completionHandler] {
Data data;
completionHandler(data, -1);
}, queue);
return;
}
if (!bytesRead)
break;
ASSERT(bytesRead > 0);
fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data);
pendingBytesToRead = size - data.size();
bytesToRead = std::min(pendingBytesToRead, readBuffer->length);
} while (pendingBytesToRead);
GRefPtr<SoupBuffer> bufferCapture = data.soupBuffer();
runTaskInQueue([channel, bufferCapture, completionHandler] {
GRefPtr<SoupBuffer> buffer = bufferCapture;
Data data = { WTFMove(buffer) };
completionHandler(data, 0);
}, queue);
}));
}
// State carried through one asynchronous write started by IOChannel::write().
// It is heap-allocated there, passed to GIO as user data, and re-adopted by
// outputStreamWriteReadyCallback on every completed partial write.
struct WriteAsyncData {
// Reference to the channel that started the write.
RefPtr<IOChannel> channel;
// Bytes still to be written; replaced by a sub-buffer after each partial write.
GRefPtr<SoupBuffer> buffer;
// Queue the completion handler is dispatched on; null selects the main run loop.
RefPtr<WorkQueue> queue;
// Called once with an error code (0 on success, -1 on failure).
std::function<void (int error)> completionHandler;
};
// Completion callback for each step of the asynchronous write started by
// IOChannel::write(). It re-adopts the WriteAsyncData passed as |userData| and
// then does one of the following:
//  - write error: runs the completion handler with error -1;
//  - buffer fully written: runs it with error 0;
//  - partial write: schedules another write for the remaining bytes.
static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData)
{
    std::unique_ptr<WriteAsyncData> state(static_cast<WriteAsyncData*>(userData));

    // Transfers |state| into a task that invokes the completion handler on the
    // requested queue. |state| is empty once this returns.
    auto finish = [&state](int error) {
        // Read the queue pointer before ownership moves into the task.
        WorkQueue* targetQueue = state->queue.get();
        runTaskInQueue([owned = WTFMove(state), error] {
            owned->completionHandler(error);
        }, targetQueue);
    };

    const gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr);
    if (bytesWritten == -1) {
        finish(-1);
        return;
    }

    const gssize remaining = state->buffer->length - bytesWritten;
    if (!remaining) {
        finish(0);
        return;
    }

    // Partial write: continue from the first unwritten byte. Ownership of the
    // state goes back to GIO until the next completion.
    state->buffer = adoptGRef(soup_buffer_new_subbuffer(state->buffer.get(), bytesWritten, remaining));
    const char* unwritten = state->buffer->data;
    g_output_stream_write_async(stream, unwritten, remaining, G_PRIORITY_DEFAULT_IDLE, nullptr,
        reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), state.release());
}
void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, std::function<void (int error)> completionHandler)
{
RefPtr<IOChannel> channel(this);
if (!m_outputStream && !m_ioStream) {
runTaskInQueue([channel, completionHandler] {
completionHandler(-1);
}, queue);
return;
}
GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get()));
if (!stream) {
runTaskInQueue([channel, completionHandler] {
completionHandler(-1);
}, queue);
return;
}
WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, completionHandler };
g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), G_PRIORITY_DEFAULT_IDLE, nullptr,
reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData);
}
} }
#endif