// CurlJobManager.cpp
#include "config.h"
#if USE(CURL)
#include "CurlJobManager.h"
#include <chrono>
#include <thread>
#include <wtf/MainThread.h>
#include <wtf/text/CString.h>
using namespace WebCore;
namespace WebCore {
// Constructs the manager. No work is done here; the worker thread is only
// created later, by startThreadIfNeeded().
CurlJobManager::CurlJobManager() = default;
// Shuts the manager down: stopThread() clears the run flag and, if a worker
// thread exists, waits for it to finish before destruction proceeds.
CurlJobManager::~CurlJobManager()
{
stopThread();
}
// Queues |curlHandle| for the worker thread and makes sure that thread is
// running. Must be called on the main thread. Always returns true.
//
// The handle is only appended to the pending list here; it is handed to the
// curl multi handle later, when the worker thread runs updateHandleList().
bool CurlJobManager::add(CURL* curlHandle)
{
    ASSERT(isMainThread());

    {
        // Keep the critical section limited to the list mutation so the lock
        // is released before the thread is (re)started.
        LockHolder pendingListLock(m_mutex);
        m_pendingHandleList.append(curlHandle);
    }

    startThreadIfNeeded();

    return true;
}
// Queues |curlHandle| for removal. The handle is only recorded in the removed
// list (under m_mutex); the actual detachment from the curl multi handle
// happens when the worker thread runs updateHandleList(). Always returns true.
bool CurlJobManager::remove(CURL* curlHandle)
{
    LockHolder removedListLock(m_mutex);

    m_removedHandleList.append(curlHandle);

    return true;
}
// Returns the number of handles currently in the active list, read while
// holding m_mutex.
int CurlJobManager::getActiveCount() const
{
    LockHolder lock(m_mutex);

    const int activeCount = m_activeHandleList.size();
    return activeCount;
}
// Returns the number of handles waiting in the pending list (queued by add()
// but not yet processed by updateHandleList()), read while holding m_mutex.
int CurlJobManager::getPendingCount() const
{
    LockHolder lock(m_mutex);

    const int pendingCount = m_pendingHandleList.size();
    return pendingCount;
}
// Starts the worker thread unless runThread() reports that it is already
// supposed to be running. Must be called on the main thread.
void CurlJobManager::startThreadIfNeeded()
{
    ASSERT(isMainThread());

    // Run flag already set: nothing to do.
    if (runThread())
        return;

    // A previous worker may still be winding down after the run flag was
    // cleared; wait for it before replacing m_thread.
    if (m_thread)
        m_thread->waitForCompletion();

    // Set the flag before creating the thread so the new worker's loop
    // condition holds on entry.
    setRunThread(true);

    m_thread = Thread::create("curlThread", [this] {
        workerThread();
    });
}
// Asks the worker thread to stop and, if one exists, blocks until it has
// finished, then drops the reference to it.
void CurlJobManager::stopThread()
{
    // Clearing the flag makes the worker's `while (runThread())` loop exit.
    setRunThread(false);

    if (!m_thread)
        return;

    m_thread->waitForCompletion();
    m_thread = nullptr;
}
// Clears the run flag when there is neither an active nor a pending handle.
// The thread is not joined here; the worker loop simply exits on its next
// runThread() check.
void CurlJobManager::stopThreadIfIdle()
{
    // Still work in flight or queued: keep the worker running.
    if (getActiveCount() || getPendingCount())
        return;

    setRunThread(false);
}
// Applies all queued list changes while holding m_mutex: first detaches every
// handle in the removed list from the curl multi handle, then attaches every
// handle in the pending list. Both queues are empty afterwards.
//
// Removals run before additions, and each queue is processed in insertion
// order. Return values of removeFromCurl()/addToCurl() are not inspected, so
// a handle that fails to attach or detach is still dropped from its queue.
void CurlJobManager::updateHandleList()
{
    LockHolder locker(m_mutex);

    // Walk each queue once and clear it afterwards, instead of repeatedly
    // erasing the front element (which shifts the remaining entries on every
    // iteration). Neither helper touches these two queues, so iterating them
    // in place is safe.
    for (auto* handle : m_removedHandleList)
        removeFromCurl(handle);
    m_removedHandleList.clear();

    for (auto* handle : m_pendingHandleList)
        addToCurl(handle);
    m_pendingHandleList.clear();
}
// Attaches |curlHandle| to the curl multi handle. On CURLM_OK the handle is
// recorded in the active list and true is returned; on any other result the
// active list is left untouched and false is returned.
bool CurlJobManager::addToCurl(CURL* curlHandle)
{
    const CURLMcode addResult = m_curlMultiHandle.addHandle(curlHandle);
    if (addResult != CURLM_OK)
        return false;

    m_activeHandleList.append(curlHandle);
    return true;
}
// Detaches |curlHandle| from the curl multi handle and drops it from the
// active list.
//
// Returns true when the handle was not active to begin with (nothing to do)
// or was detached successfully; returns false when the multi handle refused
// the removal, in which case the handle stays in the active list.
bool CurlJobManager::removeFromCurl(CURL* curlHandle)
{
    int activeIndex = m_activeHandleList.find(curlHandle);

    // A negative index means the handle is not in the active list.
    if (activeIndex < 0)
        return true;

    const CURLMcode removeResult = m_curlMultiHandle.removeHandle(curlHandle);
    if (removeResult != CURLM_OK)
        return false;

    m_activeHandleList.remove(activeIndex);
    return true;
}
void CurlJobManager::workerThread()
{
ASSERT(!isMainThread());
while (runThread()) {
updateHandleList();
int rc = 0;
do {
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
int maxfd = 0;
const int selectTimeoutMS = 5;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = selectTimeoutMS * 1000;
m_curlMultiHandle.getFdSet(fdread, fdwrite, fdexcep, maxfd);
if (maxfd >= 0)
rc = ::select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
} while (rc == -1 && errno == EINTR);
int runningHandles = 0;
while (m_curlMultiHandle.perform(runningHandles) == CURLM_CALL_MULTI_PERFORM) { }
int messagesInQueue = 0;
CURLMsg* msg = m_curlMultiHandle.readInfo(messagesInQueue);
if (!msg)
continue;
CurlJob* job = nullptr;
CURLcode err = curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &job);
UNUSED_PARAM(err);
ASSERT(job);
CurlJobAction action = job->handleCurlMsg(msg);
if (action == CurlJobAction::Finished)
removeFromCurl(msg->easy_handle);
stopThreadIfIdle();
}
}
}
#endif