// WebSocketStream.cpp
#include "config.h"
#include "WebSocketStream.h"

#include "DataReference.h"
#include "NetworkConnectionToWebProcessMessages.h"
#include "NetworkProcessConnection.h"
#include "NetworkSocketStreamMessages.h"
#include "WebCoreArgumentCoders.h"
#include "WebProcess.h"
#include "WebSocketIdentifier.h"
#include <WebCore/CookieRequestHeaderFieldProxy.h>
#include <WebCore/SocketStreamError.h>
#include <WebCore/SocketStreamHandleClient.h>
#include <utility>
#include <wtf/NeverDestroyed.h>
namespace WebKit {
using namespace WebCore;
// Taken by every function in this file before it touches the registry below.
static Lock globalWebSocketStreamMapLock;

// Returns the process-wide registry that maps a WebSocketIdentifier to its
// WebSocketStream. The stored pointers are raw and non-owning: a stream adds
// itself in its constructor and removes itself in its destructor. The map is
// created on first use and, being wrapped in NeverDestroyed, is never torn down.
static HashMap<WebSocketIdentifier, WebSocketStream*>& globalWebSocketStreamMap()
{
    static NeverDestroyed<HashMap<WebSocketIdentifier, WebSocketStream*>> streamsByIdentifier;
    return streamsByIdentifier.get();
}
// Looks up the stream registered under `identifier`, holding the registry lock
// for the duration of the lookup. The returned pointer is non-owning.
// NOTE(review): presumably null when nothing is registered under the identifier
// (HashMap::get on a pointer-valued map) -- confirm against WTF.
WebSocketStream* WebSocketStream::streamWithIdentifier(WebSocketIdentifier identifier)
{
    LockHolder holder(globalWebSocketStreamMapLock);
    auto& registry = globalWebSocketStreamMap();
    return registry.get(identifier);
}
void WebSocketStream::networkProcessCrashed()
{
Vector<RefPtr<WebSocketStream>> sockets;
{
LockHolder locker(globalWebSocketStreamMapLock);
sockets.reserveInitialCapacity(globalWebSocketStreamMap().size());
for (auto& stream : globalWebSocketStreamMap().values())
sockets.uncheckedAppend(stream);
}
for (auto& stream : sockets) {
for (auto& callback : stream->m_sendDataCallbacks.values())
callback(false);
for (auto& callback : stream->m_sendHandshakeCallbacks.values())
callback(false, false);
stream->m_client.didFailSocketStream(*stream, SocketStreamError(0, { }, "Network process crashed."));
stream = nullptr;
}
LockHolder locker(globalWebSocketStreamMapLock);
globalWebSocketStreamMap().clear();
}
// Factory: heap-allocates a WebSocketStream for `url` and hands the initial
// reference to the caller. `credentialPartition` is forwarded unchanged to the
// constructor.
Ref<WebSocketStream> WebSocketStream::create(const URL& url, SocketStreamHandleClient& client, const String& credentialPartition)
{
    auto* newStream = new WebSocketStream(url, client, credentialPartition);
    return adoptRef(*newStream);
}
// Generates a fresh identifier for this stream, asks the network process to
// create the matching socket stream for `url` / `cachePartition`, and then
// registers this object in the global identifier -> stream registry.
WebSocketStream::WebSocketStream(const URL& url, WebCore::SocketStreamHandleClient& client, const String& cachePartition)
    : SocketStreamHandle(url, client)
    , m_identifier(WebSocketIdentifier::generate())
    , m_client(client)
{
    auto& networkConnection = WebProcess::singleton().ensureNetworkProcessConnection().connection();
    networkConnection.send(Messages::NetworkConnectionToWebProcess::CreateSocketStream(url, cachePartition, m_identifier), 0);

    // Registration happens under the registry lock; the identifier was just
    // generated, so it must not be present yet.
    LockHolder holder(globalWebSocketStreamMapLock);
    auto& registry = globalWebSocketStreamMap();
    ASSERT(!registry.contains(m_identifier));
    registry.set(m_identifier, this);
}
// Unregisters this stream from the global identifier -> stream registry.
WebSocketStream::~WebSocketStream()
{
    LockHolder locker(globalWebSocketStreamMapLock);
    // Deliberately no ASSERT that the entry is still present:
    // networkProcessCrashed() removes registry entries while only dropping its own
    // temporary reference, so a stream kept alive by another owner can reach this
    // destructor after its entry is already gone. Removing an absent key is a
    // no-op, so the unconditional remove() covers both cases.
    globalWebSocketStreamMap().remove(m_identifier);
}
// Connection used by send(): the WebProcess's connection to the network process,
// obtained through ensureNetworkProcessConnection() on every call.
IPC::Connection* WebSocketStream::messageSenderConnection() const
{
    auto& networkProcessConnection = WebProcess::singleton().ensureNetworkProcessConnection();
    return &networkProcessConnection.connection();
}
// Destination ID for outgoing messages: this stream's identifier as a raw
// 64-bit value.
uint64_t WebSocketStream::messageSenderDestinationID() const
{
    const uint64_t destinationID = m_identifier.toUInt64();
    return destinationID;
}
void WebSocketStream::platformSend(const uint8_t* data, size_t length, Function<void(bool)>&& completionHandler)
{
static uint64_t nextDataIdentifier = 1;
uint64_t dataIdentifier = nextDataIdentifier++;
send(Messages::NetworkSocketStream::SendData(IPC::DataReference(data, length), dataIdentifier));
ASSERT(!m_sendDataCallbacks.contains(dataIdentifier));
m_sendDataCallbacks.add(dataIdentifier, WTFMove(completionHandler));
}
void WebSocketStream::platformSendHandshake(const uint8_t* data, size_t length, const Optional<CookieRequestHeaderFieldProxy>& headerFieldProxy, Function<void(bool, bool)>&& completionHandler)
{
static uint64_t nextDataIdentifier = 1;
uint64_t dataIdentifier = nextDataIdentifier++;
send(Messages::NetworkSocketStream::SendHandshake(IPC::DataReference(data, length), headerFieldProxy, dataIdentifier));
ASSERT(!m_sendHandshakeCallbacks.contains(dataIdentifier));
m_sendHandshakeCallbacks.add(dataIdentifier, WTFMove(completionHandler));
}
// Completion notification for a platformSend() call: removes the handler stored
// under `identifier` and invokes it with `success`.
void WebSocketStream::didSendData(uint64_t identifier, bool success)
{
    ASSERT(m_sendDataCallbacks.contains(identifier));
    // The identifier arrives from another process. If no handler is stored under
    // it, take() yields a null Function; invoking that would crash a release
    // build, so the notification is ignored instead.
    if (auto completionHandler = m_sendDataCallbacks.take(identifier))
        completionHandler(success);
}
// Completion notification for a platformSendHandshake() call: removes the
// handler stored under `identifier` and invokes it with `success` and
// `didAccessSecureCookies`.
void WebSocketStream::didSendHandshake(uint64_t identifier, bool success, bool didAccessSecureCookies)
{
    ASSERT(m_sendHandshakeCallbacks.contains(identifier));
    // The identifier arrives from another process. If no handler is stored under
    // it, take() yields a null Function; invoking that would crash a release
    // build, so the notification is ignored instead.
    if (auto completionHandler = m_sendHandshakeCallbacks.take(identifier))
        completionHandler(success, didAccessSecureCookies);
}
// Closing is delegated to the network process: this only sends it a Close
// message addressed to this stream.
void WebSocketStream::platformClose()
{
    send(Messages::NetworkSocketStream::Close { });
}
size_t WebSocketStream::bufferedAmount()
{
return m_bufferedAmount;
}
// Marks the handle as Open and then forwards the notification to the client.
// The state is updated first, so the client already observes Open from inside
// its callback.
void WebSocketStream::didOpenSocketStream()
{
    m_state = Open;

    auto& self = *this;
    m_client.didOpenSocketStream(self);
}
// Marks the handle as Closed and then forwards the notification to the client.
// The state is updated first, so the client already observes Closed from inside
// its callback.
void WebSocketStream::didCloseSocketStream()
{
    m_state = Closed;

    auto& self = *this;
    m_client.didCloseSocketStream(self);
}
// Hands received bytes to the client. The client interface takes a
// `const char*` plus a length, so the byte pointer is reinterpreted; no copy is
// made here.
void WebSocketStream::didReceiveSocketStreamData(const IPC::DataReference& data)
{
    const auto* bytes = reinterpret_cast<const char*>(data.data());
    const auto byteCount = data.size();
    m_client.didReceiveSocketStreamData(*this, bytes, byteCount);
}
// Forwards a receive failure to the client unchanged; no local state is
// touched.
void WebSocketStream::didFailToReceiveSocketStreamData()
{
    auto& self = *this;
    m_client.didFailToReceiveSocketStreamData(self);
}
// Stores the new buffered amount (later returned by bufferedAmount()) and then
// notifies the client with the same value.
void WebSocketStream::didUpdateBufferedAmount(uint64_t newAmount)
{
    m_bufferedAmount = newAmount;

    auto& self = *this;
    m_client.didUpdateBufferedAmount(self, newAmount);
}
// Forwards a socket stream failure to the client, moving `error` along rather
// than copying it.
void WebSocketStream::didFailSocketStream(WebCore::SocketStreamError&& error)
{
    auto& self = *this;
    m_client.didFailSocketStream(self, WTFMove(error));
}
}