// SocketStreamHandleSoup.cpp
#include "config.h"
#include "SocketStreamHandle.h"
#include "KURL.h"
#include "Logging.h"
#include "NotImplemented.h"
#include "SocketStreamError.h"
#include "SocketStreamHandleClient.h"
#include <gio/gio.h>
#include <glib.h>
#include <wtf/NotFound.h>
#include <wtf/Vector.h>
#include <wtf/gobject/GOwnPtr.h>
#include <wtf/text/CString.h>
// Size, in bytes, of the buffer allocated in connected() and passed to each
// asynchronous read of the socket's input stream.
#define READ_BUFFER_SIZE 1024
namespace WebCore {
// Callbacks handed to GIO/GLib below. Their last argument is the handle id that was
// passed as user data when the operation was started, not a SocketStreamHandle pointer.
static void connectedCallback(GSocketClient*, GAsyncResult*, void*);
static void readReadyCallback(GInputStream*, GAsyncResult*, void*);
static gboolean writeReadyCallback(GPollableOutputStream*, void*);
// Registry of handles keyed by the id returned from activateHandle(). The callbacks
// resolve their id through this map and treat a missing id as a deactivated handle.
static HashMap<void*, SocketStreamHandle*> gActiveHandles;
// Resolves a handle id to the SocketStreamHandle registered under it.
// Returns 0 when no handle is currently registered for |id|.
static SocketStreamHandle* getHandleFromId(void* id)
{
    return gActiveHandles.contains(id) ? gActiveHandles.get(id) : 0;
}
// Drops |handle| from the registry of active handles, so that callbacks
// carrying its id will no longer find it.
static void deactivateHandle(SocketStreamHandle* handle)
{
    void* const handleId = handle->id();
    gActiveHandles.remove(handleId);
}
// Registers |handle| under a freshly allocated id and returns that id.
// Ids come from a counter that starts at 1, so the first id is never null.
static void* activateHandle(SocketStreamHandle* handle)
{
    static gint nextHandleId = 1;

    const gint assignedId = nextHandleId;
    ++nextHandleId;

    void* id = GINT_TO_POINTER(assignedId);
    gActiveHandles.set(id, handle);
    return id;
}
// Registers the new handle and starts an asynchronous connection to the host
// named in |url|. TLS is enabled for the "wss" scheme. When the URL carries no
// explicit port, 443 is used for "wss" and 80 otherwise.
SocketStreamHandle::SocketStreamHandle(const KURL& url, SocketStreamHandleClient* client)
    : SocketStreamHandleBase(url, client)
    , m_readBuffer(0)
{
    const bool usesTLS = url.protocolIs("wss");

    unsigned int port;
    if (url.hasPort())
        port = url.port();
    else
        port = usesTLS ? 443 : 80;

    // The id, rather than |this|, is what the completion callback receives.
    m_id = activateHandle(this);

    GRefPtr<GSocketClient> socketClient = adoptGRef(g_socket_client_new());
    if (usesTLS)
        g_socket_client_set_tls(socketClient.get(), TRUE);

    g_socket_client_connect_to_host_async(
        socketClient.get(),
        url.host().utf8().data(),
        port,
        0,
        reinterpret_cast<GAsyncReadyCallback>(connectedCallback),
        m_id);
}
// Destructor: unregisters the handle so that callbacks carrying its id no longer
// resolve to it, then clears the client pointer.
// NOTE(review): m_readBuffer is only released in platformClose(); if a handle can be
// destroyed without platformClose() having run, the buffer would leak — confirm.
SocketStreamHandle::~SocketStreamHandle()
{
deactivateHandle(this);
setClient(0);
}
// Called once the asynchronous connect has finished.
// On failure (|error| set) the client is told about the error and nothing else
// happens. On success the handle adopts |socketConnection|, caches its input and
// output streams, starts the first asynchronous read, switches to the Open state
// and notifies the client.
void SocketStreamHandle::connected(GSocketConnection* socketConnection, GError* error)
{
    if (error) {
        m_client->didFailSocketStream(this, SocketStreamError(error->code, error->message));
        return;
    }

    m_socketConnection = adoptGRef(socketConnection);

    GIOStream* ioStream = G_IO_STREAM(m_socketConnection.get());
    m_outputStream = G_POLLABLE_OUTPUT_STREAM(g_io_stream_get_output_stream(ioStream));
    m_inputStream = g_io_stream_get_input_stream(ioStream);

    m_readBuffer = new char[READ_BUFFER_SIZE];
    g_input_stream_read_async(
        m_inputStream.get(),
        m_readBuffer,
        READ_BUFFER_SIZE,
        G_PRIORITY_DEFAULT,
        0,
        reinterpret_cast<GAsyncReadyCallback>(readReadyCallback),
        m_id);

    // Hold a reference to this object for the duration of the client callback.
    RefPtr<SocketStreamHandle> protect(this);
    m_state = Open;
    m_client->didOpenSocketStream(this);
}
// Called when an asynchronous read has finished.
// |error| set: the client is told about the failure and reading stops.
// |bytesRead| == 0: the handle is closed.
// Otherwise the bytes in the read buffer are delivered to the client and, as long
// as the input stream is still set afterwards, the next read is scheduled.
void SocketStreamHandle::readBytes(signed long bytesRead, GError* error)
{
    if (error) {
        m_client->didFailSocketStream(this, SocketStreamError(error->code, error->message));
        return;
    }

    if (bytesRead == 0) {
        close();
        return;
    }

    // Hold a reference to this object for the duration of the client callback.
    RefPtr<SocketStreamHandle> protect(this);
    m_client->didReceiveSocketStreamData(this, m_readBuffer, bytesRead);

    // platformClose() clears m_inputStream, so a null stream here means the
    // handle was closed during the callback and no further read must be issued.
    if (!m_inputStream)
        return;

    g_input_stream_read_async(
        m_inputStream.get(),
        m_readBuffer,
        READ_BUFFER_SIZE,
        G_PRIORITY_DEFAULT,
        0,
        reinterpret_cast<GAsyncReadyCallback>(readReadyCallback),
        m_id);
}
// Called when the output stream has been reported writable.
// Sends pending data if any is buffered; otherwise stops watching the socket
// for writability.
void SocketStreamHandle::writeReady()
{
    if (bufferedAmount()) {
        sendPendingData();
        return;
    }

    stopWaitingForSocketWritability();
}
int SocketStreamHandle::platformSend(const char* data, int length)
{
GOwnPtr<GError> error;
gssize written = g_pollable_output_stream_write_nonblocking(m_outputStream.get(), data, length, 0, &error.outPtr());
if (error) {
if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
beginWaitingForSocketWritability();
else
m_client->didFailSocketStream(this, SocketStreamError(error->code, error->message));
return 0;
}
if (written < length)
beginWaitingForSocketWritability();
return written;
}
// Tears down the connection and notifies the client that the stream is closed.
//
// The handle is unregistered first so that callbacks carrying its id no longer
// resolve to it. The socket connection, if any, is then closed (a close error is
// reported to the client), the cached streams are dropped, the read buffer is
// released, and finally the client receives didCloseSocketStream().
void SocketStreamHandle::platformClose()
{
    deactivateHandle(this);
    stopWaitingForSocketWritability();

    if (m_socketConnection) {
        GOwnPtr<GError> error;
        g_io_stream_close(G_IO_STREAM(m_socketConnection.get()), 0, &error.outPtr());
        if (error)
            m_client->didFailSocketStream(this, SocketStreamError(error->code, error->message));
        m_socketConnection = 0;
    }

    m_outputStream = 0;
    m_inputStream = 0;

    // The buffer is allocated with new[] in connected(), so it must be released
    // with delete[]. Reset the pointer so a later call cannot free it again.
    delete[] m_readBuffer;
    m_readBuffer = 0;

    m_client->didCloseSocketStream(this);
}
// Authentication hooks. None of the four functions below is implemented in this
// file: each one only invokes notImplemented() and ignores its arguments.

// Not implemented; the challenge is ignored.
void SocketStreamHandle::didReceiveAuthenticationChallenge(const AuthenticationChallenge&)
{
notImplemented();
}
// Not implemented; the challenge and credential are ignored.
void SocketStreamHandle::receivedCredential(const AuthenticationChallenge&, const Credential&)
{
notImplemented();
}
// Not implemented; the challenge is ignored.
void SocketStreamHandle::receivedRequestToContinueWithoutCredential(const AuthenticationChallenge&)
{
notImplemented();
}
// Not implemented; the challenge is ignored.
void SocketStreamHandle::receivedCancellation(const AuthenticationChallenge&)
{
notImplemented();
}
void SocketStreamHandle::beginWaitingForSocketWritability()
{
if (m_writeReadySource) return;
m_writeReadySource = adoptGRef(g_pollable_output_stream_create_source(m_outputStream.get(), 0));
g_source_set_callback(m_writeReadySource.get(), reinterpret_cast<GSourceFunc>(writeReadyCallback), m_id, 0);
g_source_attach(m_writeReadySource.get(), 0);
}
void SocketStreamHandle::stopWaitingForSocketWritability()
{
if (!m_writeReadySource) return;
g_source_remove(g_source_get_id(m_writeReadySource.get()));
m_writeReadySource = 0;
}
// Completion callback for g_socket_client_connect_to_host_async().
// |id| is the handle id passed as user data when the connect was started.
//
// The connect operation is always finished, even when the handle it belongs to
// is no longer registered. In that case the new connection, if any, is closed
// (errors ignored) and released. Otherwise the result is forwarded to the handle.
static void connectedCallback(GSocketClient* client, GAsyncResult* result, void* id)
{
    GOwnPtr<GError> error;
    GSocketConnection* socketConnection = g_socket_client_connect_to_host_finish(client, result, &error.outPtr());

    SocketStreamHandle* handle = getHandleFromId(id);
    if (!handle) {
        if (socketConnection) {
            g_io_stream_close(G_IO_STREAM(socketConnection), 0, 0);
            // The finish call hands us an owned reference. Nobody adopts it on
            // this path, so drop it here to avoid leaking the connection.
            g_object_unref(socketConnection);
        }
        return;
    }

    // connected() adopts the connection reference on success.
    handle->connected(socketConnection, error.get());
}
// Completion callback for g_input_stream_read_async().
// |id| is the handle id passed as user data when the read was started. The read
// is always finished; its result is forwarded only if the handle is still
// registered.
static void readReadyCallback(GInputStream* stream, GAsyncResult* result, void* id)
{
    GOwnPtr<GError> readError;
    gssize byteCount = g_input_stream_read_finish(stream, result, &readError.outPtr());

    if (SocketStreamHandle* handle = getHandleFromId(id))
        handle->readBytes(byteCount, readError.get());
}
// Source callback invoked when the output stream is writable.
// |id| is the handle id passed as user data. Returns TRUE after notifying a
// registered handle, and FALSE when no handle is registered for |id|.
static gboolean writeReadyCallback(GPollableOutputStream*, void* id)
{
    if (SocketStreamHandle* handle = getHandleFromId(id)) {
        handle->writeReady();
        return TRUE;
    }

    return FALSE;
}
}