SocketStreamHandleCurl.cpp [plain text]
#include "config.h"
#include "SocketStreamHandle.h"
#if USE(CURL)
#include "Logging.h"
#include "SocketStreamHandleClient.h"
#include "URL.h"
#include <wtf/MainThread.h>
#include <wtf/text/CString.h>
namespace WebCore {
SocketStreamHandle::SocketStreamHandle(const URL& url, SocketStreamHandleClient& client)
: SocketStreamHandleBase(url, client)
{
LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client);
ASSERT(isMainThread());
startThread();
}
SocketStreamHandle::~SocketStreamHandle()
{
LOG(Network, "SocketStreamHandle %p delete", this);
ASSERT(!m_workerThread);
}
int SocketStreamHandle::platformSend(const char* data, int length)
{
LOG(Network, "SocketStreamHandle %p platformSend", this);
ASSERT(isMainThread());
startThread();
auto copy = createCopy(data, length);
std::lock_guard<Lock> lock(m_mutexSend);
m_sendData.append(SocketData { WTFMove(copy), length });
return length;
}
void SocketStreamHandle::platformClose()
{
LOG(Network, "SocketStreamHandle %p platformClose", this);
ASSERT(isMainThread());
stopThread();
m_client.didCloseSocketStream(*this);
}
bool SocketStreamHandle::readData(CURL* curlHandle)
{
ASSERT(!isMainThread());
const int bufferSize = 1024;
std::unique_ptr<char[]> data(new char[bufferSize]);
size_t bytesRead = 0;
CURLcode ret = curl_easy_recv(curlHandle, data.get(), bufferSize, &bytesRead);
if (ret == CURLE_OK && bytesRead >= 0) {
m_mutexReceive.lock();
m_receiveData.append(SocketData { WTFMove(data), static_cast<int>(bytesRead) });
m_mutexReceive.unlock();
ref();
callOnMainThread([this] {
didReceiveData();
deref();
});
return true;
}
if (ret == CURLE_AGAIN)
return true;
return false;
}
bool SocketStreamHandle::sendData(CURL* curlHandle)
{
ASSERT(!isMainThread());
while (true) {
m_mutexSend.lock();
if (!m_sendData.size()) {
m_mutexSend.unlock();
break;
}
auto sendData = m_sendData.takeFirst();
m_mutexSend.unlock();
int totalBytesSent = 0;
while (totalBytesSent < sendData.size) {
size_t bytesSent = 0;
CURLcode ret = curl_easy_send(curlHandle, sendData.data.get() + totalBytesSent, sendData.size - totalBytesSent, &bytesSent);
if (ret == CURLE_OK)
totalBytesSent += bytesSent;
else
break;
}
if (totalBytesSent < sendData.size) {
const int restLength = sendData.size - totalBytesSent;
auto copy = createCopy(sendData.data.get() + totalBytesSent, restLength);
std::lock_guard<Lock> lock(m_mutexSend);
m_sendData.prepend(SocketData { WTFMove(copy), restLength });
return false;
}
}
return true;
}
bool SocketStreamHandle::waitForAvailableData(CURL* curlHandle, std::chrono::milliseconds selectTimeout)
{
ASSERT(!isMainThread());
std::chrono::microseconds usec = std::chrono::duration_cast<std::chrono::microseconds>(selectTimeout);
struct timeval timeout;
if (usec <= std::chrono::microseconds(0)) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
} else {
timeout.tv_sec = usec.count() / 1000000;
timeout.tv_usec = usec.count() % 1000000;
}
long socket;
if (curl_easy_getinfo(curlHandle, CURLINFO_LASTSOCKET, &socket) != CURLE_OK)
return false;
fd_set fdread;
FD_ZERO(&fdread);
FD_SET(socket, &fdread);
int rc = ::select(0, &fdread, nullptr, nullptr, &timeout);
return rc == 1;
}
void SocketStreamHandle::startThread()
{
ASSERT(isMainThread());
if (m_workerThread)
return;
ref();
m_workerThread = createThread("WebSocket thread", [this] {
ASSERT(!isMainThread());
CURL* curlHandle = curl_easy_init();
if (!curlHandle)
return;
curl_easy_setopt(curlHandle, CURLOPT_URL, m_url.host().utf8().data());
curl_easy_setopt(curlHandle, CURLOPT_PORT, m_url.port());
curl_easy_setopt(curlHandle, CURLOPT_CONNECT_ONLY, 1L);
if (curl_easy_perform(curlHandle) != CURLE_OK)
return;
ref();
callOnMainThread([this] {
if (refCount() > 1)
didOpenSocket();
deref();
});
while (!m_stopThread) {
sendData(curlHandle);
if (waitForAvailableData(curlHandle, std::chrono::milliseconds(20)))
readData(curlHandle);
}
curl_easy_cleanup(curlHandle);
});
}
void SocketStreamHandle::stopThread()
{
ASSERT(isMainThread());
if (!m_workerThread)
return;
m_stopThread = true;
waitForThreadCompletion(m_workerThread);
m_workerThread = 0;
deref();
}
void SocketStreamHandle::didReceiveData()
{
ASSERT(isMainThread());
m_mutexReceive.lock();
auto receiveData = WTFMove(m_receiveData);
m_mutexReceive.unlock();
for (auto& socketData : receiveData) {
if (socketData.size > 0) {
if (state() == Open)
m_client.didReceiveSocketStreamData(*this, socketData.data.get(), socketData.size);
} else
platformClose();
}
}
void SocketStreamHandle::didOpenSocket()
{
ASSERT(isMainThread());
m_state = Open;
m_client.didOpenSocketStream(*this);
}
std::unique_ptr<char[]> SocketStreamHandle::createCopy(const char* data, int length)
{
std::unique_ptr<char[]> copy(new char[length]);
memcpy(copy.get(), data, length);
return WTFMove(copy);
}
}
#endif