// AsyncFileStream.cpp [plain text]
#include "config.h"
#include "AsyncFileStream.h"
#include "FileStream.h"
#include "FileStreamClient.h"
#include <mutex>
#include <wtf/AutodrainedPool.h>
#include <wtf/Function.h>
#include <wtf/MainThread.h>
#include <wtf/MessageQueue.h>
#include <wtf/NeverDestroyed.h>
#include <wtf/Threading.h>
#include <wtf/URL.h>
namespace WebCore {
// Per-stream state that the functions in this file touch from both the main thread and the
// file thread. It is owned through AsyncFileStream::m_internals until the destructor hands
// it off.
struct AsyncFileStream::Internals {
    WTF_MAKE_STRUCT_FAST_ALLOCATED;

    explicit Internals(FileStreamClient&);

    // The underlying stream; perform() and close() use it inside functions passed to
    // callOnFileThread().
    FileStream stream;

    // Receiver of completion callbacks; perform() calls it inside a function passed to
    // callOnMainThread().
    FileStreamClient& client;

    // Set to true by ~AsyncFileStream(); perform() reads it to skip operations and callbacks.
#if COMPILER(MSVC)
    // On MSVC the flag has no initializer here; the constructor sets it with atomic_init().
    std::atomic_bool destroyed;
#else
    std::atomic_bool destroyed { false };
#endif
};
// Binds the client reference and default-constructs the stream. On MSVC the `destroyed`
// flag is declared without an initializer, so it is set to false here instead.
inline AsyncFileStream::Internals::Internals(FileStreamClient& client)
    : client(client)
{
#if COMPILER(MSVC)
    std::atomic_init(&destroyed, false);
#endif
}
// Appends `function` to the queue serviced by the "WebCore: AsyncFileStream" thread.
// The thread is created once, on the first call. Callers must be on the main thread and
// must pass a non-null function (both asserted).
static void callOnFileThread(Function<void ()>&& function)
{
    ASSERT(isMainThread());
    ASSERT(function);

    // Wrapped in NeverDestroyed; the file thread's loop below has no exit and keeps waiting
    // on this queue.
    static NeverDestroyed<MessageQueue<Function<void ()>>> queue;

    static std::once_flag createFileThreadOnce;
    std::call_once(createFileThreadOnce, [] {
        Thread::create("WebCore: AsyncFileStream", [] {
            for (;;) {
                // One pool per loop iteration.
                // NOTE(review): presumably drains autoreleased objects created by each
                // function -- confirm against AutodrainedPool.
                AutodrainedPool pool;

                auto nextFunction = queue.get().waitForMessage();

                // Expected non-null: nothing in this file kills the queue.
                ASSERT(nextFunction);
                // Expected callable: `function` is asserted above before it is queued.
                ASSERT(*nextFunction);

                (*nextFunction)();
            }
        });
    });

    auto queuedFunction = makeUnique<Function<void ()>>(WTFMove(function));
    queue.get().append(WTFMove(queuedFunction));
}
AsyncFileStream::AsyncFileStream(FileStreamClient& client)
: m_internals(makeUnique<Internals>(client))
{
ASSERT(isMainThread());
}
// Destroys the stream. Must run on the main thread (asserted). The Internals object is not
// deleted here: ownership is moved into a function sent to the file thread, which moves it
// again into a function sent to the main thread.
AsyncFileStream::~AsyncFileStream()
{
ASSERT(isMainThread());
// Raise the flag first. perform() checks it on the file thread before starting an
// operation and again on the main thread before calling the client.
m_internals->destroyed = true;
// Hand Internals through the file thread and back to the main thread. The inner lambda is
// intentionally empty; it exists only to hold `internals` until it is destroyed.
// NOTE(review): presumably this makes deletion happen after every previously queued
// file-thread function and any main-thread function it scheduled -- that relies on both
// queues running functions in FIFO order; confirm.
callOnFileThread([internals = WTFMove(m_internals)]() mutable {
callOnMainThread([internals = WTFMove(internals)] {
});
});
}
// Runs `operation` on the stream inside a function passed to callOnFileThread(), then passes
// the function it returns, applied to the client, to callOnMainThread(). Each step is
// skipped if `destroyed` has been set by the time it runs.
void AsyncFileStream::perform(WTF::Function<WTF::Function<void(FileStreamClient&)>(FileStream&)>&& operation)
{
    // Internals is captured by reference in both lambdas below. ~AsyncFileStream() routes
    // ownership of the same object through callOnFileThread() and callOnMainThread() too.
    // NOTE(review): presumably that keeps the reference valid while these lambdas run --
    // confirm queue ordering.
    auto& internals = *m_internals;

    callOnFileThread([&internals, operation = WTFMove(operation)] {
        // Skip the operation if the stream was destroyed before this function got to run.
        if (internals.destroyed)
            return;

        auto mainThreadWork = operation(internals.stream);
        callOnMainThread([&internals, mainThreadWork = WTFMove(mainThreadWork)] {
            // Only notify the client while the stream is still alive.
            if (!internals.destroyed)
                mainThreadWork(internals.client);
        });
    });
}
void AsyncFileStream::getSize(const String& path, Optional<WallTime> expectedModificationTime)
{
perform([path = path.isolatedCopy(), expectedModificationTime](FileStream& stream) -> WTF::Function<void(FileStreamClient&)> {
long long size = stream.getSize(path, expectedModificationTime);
return [size](FileStreamClient& client) {
client.didGetSize(size);
};
});
}
void AsyncFileStream::openForRead(const String& path, long long offset, long long length)
{
perform([path = path.isolatedCopy(), offset, length](FileStream& stream) -> WTF::Function<void(FileStreamClient&)> {
bool success = stream.openForRead(path, offset, length);
return [success](FileStreamClient& client) {
client.didOpen(success);
};
});
}
void AsyncFileStream::close()
{
auto& internals = *m_internals;
callOnFileThread([&internals] {
internals.stream.close();
});
}
void AsyncFileStream::read(char* buffer, int length)
{
perform([buffer, length](FileStream& stream) -> WTF::Function<void(FileStreamClient&)> {
int bytesRead = stream.read(buffer, length);
return [bytesRead](FileStreamClient& client) {
client.didRead(bytesRead);
};
});
}
}