// AsyncFileStream.cpp
#include "config.h"
#include "AsyncFileStream.h"
#include "FileStream.h"
#include "FileStreamClient.h"
#include "URL.h"
#include <wtf/AutodrainedPool.h>
#include <wtf/MainThread.h>
#include <wtf/MessageQueue.h>
#include <wtf/NeverDestroyed.h>
namespace WebCore {
// State shared between an AsyncFileStream and the closures it queues. The
// closures hold a plain reference to this object, and ~AsyncFileStream() defers
// deleting it rather than destroying it directly.
struct AsyncFileStream::Internals {
explicit Internals(FileStreamClient&);
// The stream that queued operations act on (see perform() and close()).
FileStream stream;
// Receiver of the completion callbacks (didGetSize, didOpen, didRead, ...).
FileStreamClient& client;
// Set to true by ~AsyncFileStream(). perform() checks it before running an
// operation and again before invoking the client callback.
#if !COMPILER(MSVC)
std::atomic_bool destroyed { false };
#else
// No in-class initializer on MSVC; the constructor calls atomic_init() instead.
// NOTE(review): presumably MSVC rejected the brace initializer above -- confirm.
std::atomic_bool destroyed;
#endif
};
// Binds the client reference; `stream` is default-constructed.
inline AsyncFileStream::Internals::Internals(FileStreamClient& client)
: client(client)
{
#if COMPILER(MSVC)
// The MSVC branch of the struct declares `destroyed` without an initializer,
// so give it its starting value of false here.
atomic_init(&destroyed, false);
#endif
}
static void callOnFileThread(std::function<void()>&& function)
{
ASSERT(isMainThread());
ASSERT(function);
static NeverDestroyed<MessageQueue<std::function<void()>>> queue;
static std::once_flag createFileThreadOnce;
std::call_once(createFileThreadOnce, [] {
createThread("WebCore: AsyncFileStream", [] {
for (;;) {
AutodrainedPool pool;
auto function = queue.get().waitForMessage();
ASSERT(function);
ASSERT(*function);
(*function)();
}
});
});
queue.get().append(std::make_unique<std::function<void()>>(WTF::move(function)));
}
// Allocates the Internals object bound to `client`. Asserts that it is being
// constructed on the main thread.
AsyncFileStream::AsyncFileStream(FileStreamClient& client)
: m_internals(std::make_unique<Internals>(client))
{
ASSERT(isMainThread());
}
// Marks the shared state as destroyed and schedules its deletion instead of
// deleting it directly, because queued closures may still refer to it.
AsyncFileStream::~AsyncFileStream()
{
    ASSERT(isMainThread());

    // From here on perform() closures skip their operation and never call the client.
    m_internals->destroyed = true;

    // Give up ownership and bounce the delete through the file thread and back
    // to the main thread. Assuming both queues run their entries in order, the
    // delete then lands after every closure queued earlier by this object.
    auto* doomedInternals = m_internals.release();
    callOnFileThread([doomedInternals] {
        callOnMainThread([doomedInternals] {
            delete doomedInternals;
        });
    });
}
// Runs `operation` against the stream on the file thread, then runs the
// function it returns against the client on the main thread.
//
// Both steps are skipped if the `destroyed` flag is already set when they are
// about to run.
void AsyncFileStream::perform(std::function<std::function<void(FileStreamClient&)>(FileStream&)> operation)
{
    auto& state = *m_internals;
    callOnFileThread([&state, operation] {
        // File thread: do the stream work unless the owner has gone away.
        if (!state.destroyed) {
            auto notifyClient = operation(state.stream);
            callOnMainThread([&state, notifyClient] {
                // Main thread: re-check, since destruction may have happened
                // while the operation was running.
                if (!state.destroyed)
                    notifyClient(state.client);
            });
        }
    });
}
void AsyncFileStream::getSize(const String& path, double expectedModificationTime)
{
StringCapture capturedPath(path);
perform([capturedPath, expectedModificationTime](FileStream& stream) -> std::function<void(FileStreamClient&)> {
long long size = stream.getSize(capturedPath.string(), expectedModificationTime);
return [size](FileStreamClient& client) {
client.didGetSize(size);
};
});
}
void AsyncFileStream::openForRead(const String& path, long long offset, long long length)
{
StringCapture capturedPath(path);
perform([capturedPath, offset, length](FileStream& stream) -> std::function<void(FileStreamClient&)> {
bool success = stream.openForRead(capturedPath.string(), offset, length);
return [success](FileStreamClient& client) {
client.didOpen(success);
};
});
}
void AsyncFileStream::openForWrite(const String& path)
{
StringCapture capturedPath(path);
perform([capturedPath](FileStream& stream) -> std::function<void(FileStreamClient&)> {
bool success = stream.openForWrite(capturedPath.string());
return [success](FileStreamClient& client) {
client.didOpen(success);
};
});
}
void AsyncFileStream::close()
{
auto& internals = *m_internals;
callOnFileThread([&internals] {
internals.stream.close();
});
}
// Asynchronously calls FileStream::read(buffer, length) and reports the
// returned count through FileStreamClient::didRead().
//
// Only the raw `buffer` pointer is captured, so the memory it points to is
// touched later, from the file thread.
void AsyncFileStream::read(char* buffer, int length)
{
    perform([buffer, length](FileStream& stream) -> std::function<void(FileStreamClient&)> {
        int readCount = stream.read(buffer, length);
        return [readCount](FileStreamClient& client) {
            client.didRead(readCount);
        };
    });
}
void AsyncFileStream::write(const URL& blobURL, long long position, int length)
{
URLCapture capturedURL(blobURL);
perform([capturedURL, position, length](FileStream& stream) -> std::function<void(FileStreamClient&)> {
int bytesWritten = stream.write(capturedURL.url(), position, length);
return [bytesWritten](FileStreamClient& client) {
client.didWrite(bytesWritten);
};
});
}
// Asynchronously calls FileStream::truncate(position) and reports its boolean
// result through FileStreamClient::didTruncate().
void AsyncFileStream::truncate(long long position)
{
    perform([position](FileStream& stream) -> std::function<void(FileStreamClient&)> {
        bool truncated = stream.truncate(position);
        return [truncated](FileStreamClient& client) {
            client.didTruncate(truncated);
        };
    });
}
}