ConnectionUnix.cpp [plain text]
#include "config.h"
#include "Connection.h"
#include "DataReference.h"
#include "SharedMemory.h"
#include "UnixMessage.h"
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <wtf/Assertions.h>
#include <wtf/StdLibExtras.h>
#include <wtf/UniStdExtras.h>
#if USE(GLIB)
#include <gio/gio.h>
#endif
#if defined(SOCK_SEQPACKET) && !OS(DARWIN)
#define SOCKET_TYPE SOCK_SEQPACKET
#else
#if USE(GLIB)
#define SOCKET_TYPE SOCK_STREAM
#else
#define SOCKET_TYPE SOCK_DGRAM
#endif
#endif // SOCK_SEQPACKET
namespace IPC {
static const size_t messageMaxSize = 4096;
static const size_t attachmentMaxAmount = 254;
class AttachmentInfo {
WTF_MAKE_FAST_ALLOCATED;
public:
AttachmentInfo() = default;
void setType(Attachment::Type type) { m_type = type; }
Attachment::Type type() const { return m_type; }
void setSize(size_t size)
{
ASSERT(m_type == Attachment::MappedMemoryType);
m_size = size;
}
size_t size() const
{
ASSERT(m_type == Attachment::MappedMemoryType);
return m_size;
}
void setNull() { m_isNull = true; }
bool isNull() const { return m_isNull; }
private:
Attachment::Type m_type { Attachment::Uninitialized };
bool m_isNull { false };
size_t m_size { 0 };
};
static_assert(sizeof(MessageInfo) + sizeof(AttachmentInfo) * attachmentMaxAmount <= messageMaxSize, "messageMaxSize is too small.");
void Connection::platformInitialize(Identifier identifier)
{
m_socketDescriptor = identifier;
#if USE(GLIB)
m_socket = adoptGRef(g_socket_new_from_fd(m_socketDescriptor, nullptr));
#endif
m_readBuffer.reserveInitialCapacity(messageMaxSize);
m_fileDescriptors.reserveInitialCapacity(attachmentMaxAmount);
}
void Connection::platformInvalidate()
{
#if USE(GLIB)
m_socket = nullptr;
#else
if (m_socketDescriptor != -1)
closeWithRetry(m_socketDescriptor);
#endif
if (!m_isConnected)
return;
#if USE(GLIB)
m_readSocketMonitor.stop();
m_writeSocketMonitor.stop();
#endif
#if PLATFORM(PLAYSTATION)
if (m_socketMonitor) {
m_socketMonitor->detach();
m_socketMonitor = nullptr;
}
#endif
m_socketDescriptor = -1;
m_isConnected = false;
}
bool Connection::processMessage()
{
if (m_readBuffer.size() < sizeof(MessageInfo))
return false;
uint8_t* messageData = m_readBuffer.data();
MessageInfo messageInfo;
memcpy(&messageInfo, messageData, sizeof(messageInfo));
messageData += sizeof(messageInfo);
if (messageInfo.attachmentCount() > attachmentMaxAmount || (!messageInfo.isBodyOutOfLine() && messageInfo.bodySize() > messageMaxSize)) {
ASSERT_NOT_REACHED();
return false;
}
size_t messageLength = sizeof(MessageInfo) + messageInfo.attachmentCount() * sizeof(AttachmentInfo) + (messageInfo.isBodyOutOfLine() ? 0 : messageInfo.bodySize());
if (m_readBuffer.size() < messageLength)
return false;
size_t attachmentFileDescriptorCount = 0;
size_t attachmentCount = messageInfo.attachmentCount();
Vector<AttachmentInfo> attachmentInfo(attachmentCount);
if (attachmentCount) {
memcpy(attachmentInfo.data(), messageData, sizeof(AttachmentInfo) * attachmentCount);
messageData += sizeof(AttachmentInfo) * attachmentCount;
for (size_t i = 0; i < attachmentCount; ++i) {
switch (attachmentInfo[i].type()) {
case Attachment::MappedMemoryType:
case Attachment::SocketType:
if (!attachmentInfo[i].isNull())
attachmentFileDescriptorCount++;
break;
case Attachment::Uninitialized:
default:
break;
}
}
if (messageInfo.isBodyOutOfLine())
attachmentCount--;
}
Vector<Attachment> attachments(attachmentCount);
RefPtr<WebKit::SharedMemory> oolMessageBody;
size_t fdIndex = 0;
for (size_t i = 0; i < attachmentCount; ++i) {
int fd = -1;
switch (attachmentInfo[i].type()) {
case Attachment::MappedMemoryType:
if (!attachmentInfo[i].isNull())
fd = m_fileDescriptors[fdIndex++];
attachments[attachmentCount - i - 1] = Attachment(fd, attachmentInfo[i].size());
break;
case Attachment::SocketType:
if (!attachmentInfo[i].isNull())
fd = m_fileDescriptors[fdIndex++];
attachments[attachmentCount - i - 1] = Attachment(fd);
break;
case Attachment::Uninitialized:
attachments[attachmentCount - i - 1] = Attachment();
default:
break;
}
}
if (messageInfo.isBodyOutOfLine()) {
ASSERT(messageInfo.bodySize());
if (attachmentInfo[attachmentCount].isNull() || attachmentInfo[attachmentCount].size() != messageInfo.bodySize()) {
ASSERT_NOT_REACHED();
return false;
}
WebKit::SharedMemory::Handle handle;
handle.adoptAttachment(IPC::Attachment(m_fileDescriptors[attachmentFileDescriptorCount - 1], attachmentInfo[attachmentCount].size()));
oolMessageBody = WebKit::SharedMemory::map(handle, WebKit::SharedMemory::Protection::ReadOnly);
if (!oolMessageBody) {
ASSERT_NOT_REACHED();
return false;
}
}
ASSERT(attachments.size() == (messageInfo.isBodyOutOfLine() ? messageInfo.attachmentCount() - 1 : messageInfo.attachmentCount()));
uint8_t* messageBody = messageData;
if (messageInfo.isBodyOutOfLine())
messageBody = reinterpret_cast<uint8_t*>(oolMessageBody->data());
auto decoder = Decoder::create(messageBody, messageInfo.bodySize(), nullptr, WTFMove(attachments));
ASSERT(decoder);
if (!decoder)
return false;
processIncomingMessage(WTFMove(decoder));
if (m_readBuffer.size() > messageLength) {
memmove(m_readBuffer.data(), m_readBuffer.data() + messageLength, m_readBuffer.size() - messageLength);
m_readBuffer.shrink(m_readBuffer.size() - messageLength);
} else
m_readBuffer.shrink(0);
if (attachmentFileDescriptorCount) {
if (m_fileDescriptors.size() > attachmentFileDescriptorCount) {
memmove(m_fileDescriptors.data(), m_fileDescriptors.data() + attachmentFileDescriptorCount, (m_fileDescriptors.size() - attachmentFileDescriptorCount) * sizeof(int));
m_fileDescriptors.shrink(m_fileDescriptors.size() - attachmentFileDescriptorCount);
} else
m_fileDescriptors.shrink(0);
}
return true;
}
static ssize_t readBytesFromSocket(int socketDescriptor, Vector<uint8_t>& buffer, Vector<int>& fileDescriptors)
{
struct msghdr message;
memset(&message, 0, sizeof(message));
struct iovec iov[1];
memset(&iov, 0, sizeof(iov));
message.msg_controllen = CMSG_SPACE(sizeof(int) * attachmentMaxAmount);
MallocPtr<char> attachmentDescriptorBuffer = MallocPtr<char>::malloc(sizeof(char) * message.msg_controllen);
memset(attachmentDescriptorBuffer.get(), 0, sizeof(char) * message.msg_controllen);
message.msg_control = attachmentDescriptorBuffer.get();
size_t previousBufferSize = buffer.size();
buffer.grow(buffer.capacity());
iov[0].iov_base = buffer.data() + previousBufferSize;
iov[0].iov_len = buffer.size() - previousBufferSize;
message.msg_iov = iov;
message.msg_iovlen = 1;
while (true) {
ssize_t bytesRead = recvmsg(socketDescriptor, &message, MSG_NOSIGNAL);
if (bytesRead < 0) {
if (errno == EINTR)
continue;
buffer.shrink(previousBufferSize);
return -1;
}
if (message.msg_flags & MSG_CTRUNC) {
buffer.shrink(previousBufferSize);
return -1;
}
struct cmsghdr* controlMessage;
for (controlMessage = CMSG_FIRSTHDR(&message); controlMessage; controlMessage = CMSG_NXTHDR(&message, controlMessage)) {
if (controlMessage->cmsg_level == SOL_SOCKET && controlMessage->cmsg_type == SCM_RIGHTS) {
if (controlMessage->cmsg_len < CMSG_LEN(0) || controlMessage->cmsg_len > CMSG_LEN(sizeof(int) * attachmentMaxAmount)) {
ASSERT_NOT_REACHED();
break;
}
size_t previousFileDescriptorsSize = fileDescriptors.size();
size_t fileDescriptorsCount = (controlMessage->cmsg_len - CMSG_LEN(0)) / sizeof(int);
fileDescriptors.grow(fileDescriptors.size() + fileDescriptorsCount);
memcpy(fileDescriptors.data() + previousFileDescriptorsSize, CMSG_DATA(controlMessage), sizeof(int) * fileDescriptorsCount);
for (size_t i = 0; i < fileDescriptorsCount; ++i) {
if (!setCloseOnExec(fileDescriptors[previousFileDescriptorsSize + i])) {
ASSERT_NOT_REACHED();
break;
}
}
break;
}
}
buffer.shrink(previousBufferSize + bytesRead);
return bytesRead;
}
return -1;
}
void Connection::readyReadHandler()
{
while (true) {
ssize_t bytesRead = readBytesFromSocket(m_socketDescriptor, m_readBuffer, m_fileDescriptors);
if (bytesRead < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
if (errno == ECONNRESET) {
connectionDidClose();
return;
}
if (m_isConnected) {
WTFLogAlways("Error receiving IPC message on socket %d in process %d: %s", m_socketDescriptor, getpid(), strerror(errno));
connectionDidClose();
}
return;
}
if (!bytesRead) {
connectionDidClose();
return;
}
while (true) {
if (!processMessage())
break;
}
}
}
bool Connection::open()
{
if (!setNonBlock(m_socketDescriptor)) {
ASSERT_NOT_REACHED();
return false;
}
RefPtr<Connection> protectedThis(this);
m_isConnected = true;
#if USE(GLIB)
m_readSocketMonitor.start(m_socket.get(), G_IO_IN, m_connectionQueue->runLoop(), [protectedThis] (GIOCondition condition) -> gboolean {
if (condition & G_IO_HUP || condition & G_IO_ERR || condition & G_IO_NVAL) {
protectedThis->connectionDidClose();
return G_SOURCE_REMOVE;
}
if (condition & G_IO_IN) {
protectedThis->readyReadHandler();
return G_SOURCE_CONTINUE;
}
ASSERT_NOT_REACHED();
return G_SOURCE_REMOVE;
});
#endif
#if PLATFORM(PLAYSTATION)
m_socketMonitor = Thread::create("SocketMonitor", [protectedThis] {
{
int fd;
while ((fd = protectedThis->m_socketDescriptor) != -1) {
int maxFd = fd;
fd_set fdSet;
FD_ZERO(&fdSet);
FD_SET(fd, &fdSet);
if (-1 != select(maxFd + 1, &fdSet, 0, 0, 0)) {
if (FD_ISSET(fd, &fdSet))
protectedThis->readyReadHandler();
}
}
}
});
return true;
#endif
m_connectionQueue->dispatch([protectedThis] {
protectedThis->readyReadHandler();
});
return true;
}
bool Connection::platformCanSendOutgoingMessages() const
{
return !m_pendingOutputMessage;
}
bool Connection::sendOutgoingMessage(std::unique_ptr<Encoder> encoder)
{
COMPILE_ASSERT(sizeof(MessageInfo) + attachmentMaxAmount * sizeof(size_t) <= messageMaxSize, AttachmentsFitToMessageInline);
UnixMessage outputMessage(*encoder);
if (outputMessage.attachments().size() > (attachmentMaxAmount - 1)) {
ASSERT_NOT_REACHED();
return false;
}
size_t messageSizeWithBodyInline = sizeof(MessageInfo) + (outputMessage.attachments().size() * sizeof(AttachmentInfo)) + outputMessage.bodySize();
if (messageSizeWithBodyInline > messageMaxSize && outputMessage.bodySize()) {
RefPtr<WebKit::SharedMemory> oolMessageBody = WebKit::SharedMemory::allocate(encoder->bufferSize());
if (!oolMessageBody)
return false;
WebKit::SharedMemory::Handle handle;
if (!oolMessageBody->createHandle(handle, WebKit::SharedMemory::Protection::ReadOnly))
return false;
outputMessage.messageInfo().setBodyOutOfLine();
memcpy(oolMessageBody->data(), outputMessage.body(), outputMessage.bodySize());
outputMessage.appendAttachment(handle.releaseAttachment());
}
return sendOutputMessage(outputMessage);
}
bool Connection::sendOutputMessage(UnixMessage& outputMessage)
{
ASSERT(!m_pendingOutputMessage);
auto& messageInfo = outputMessage.messageInfo();
struct msghdr message;
memset(&message, 0, sizeof(message));
struct iovec iov[3];
memset(&iov, 0, sizeof(iov));
message.msg_iov = iov;
int iovLength = 1;
iov[0].iov_base = reinterpret_cast<void*>(&messageInfo);
iov[0].iov_len = sizeof(messageInfo);
Vector<AttachmentInfo> attachmentInfo;
MallocPtr<char> attachmentFDBuffer;
auto& attachments = outputMessage.attachments();
if (!attachments.isEmpty()) {
int* fdPtr = 0;
size_t attachmentFDBufferLength = std::count_if(attachments.begin(), attachments.end(),
[](const Attachment& attachment) {
return attachment.fileDescriptor() != -1;
});
if (attachmentFDBufferLength) {
attachmentFDBuffer = MallocPtr<char>::malloc(sizeof(char) * CMSG_SPACE(sizeof(int) * attachmentFDBufferLength));
message.msg_control = attachmentFDBuffer.get();
message.msg_controllen = CMSG_SPACE(sizeof(int) * attachmentFDBufferLength);
memset(message.msg_control, 0, message.msg_controllen);
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&message);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int) * attachmentFDBufferLength);
fdPtr = reinterpret_cast<int*>(CMSG_DATA(cmsg));
}
attachmentInfo.resize(attachments.size());
int fdIndex = 0;
for (size_t i = 0; i < attachments.size(); ++i) {
attachmentInfo[i].setType(attachments[i].type());
switch (attachments[i].type()) {
case Attachment::MappedMemoryType:
attachmentInfo[i].setSize(attachments[i].size());
FALLTHROUGH;
case Attachment::SocketType:
if (attachments[i].fileDescriptor() != -1) {
ASSERT(fdPtr);
fdPtr[fdIndex++] = attachments[i].fileDescriptor();
} else
attachmentInfo[i].setNull();
break;
case Attachment::Uninitialized:
default:
break;
}
}
iov[iovLength].iov_base = attachmentInfo.data();
iov[iovLength].iov_len = sizeof(AttachmentInfo) * attachments.size();
++iovLength;
}
if (!messageInfo.isBodyOutOfLine() && outputMessage.bodySize()) {
iov[iovLength].iov_base = reinterpret_cast<void*>(outputMessage.body());
iov[iovLength].iov_len = outputMessage.bodySize();
++iovLength;
}
message.msg_iovlen = iovLength;
while (sendmsg(m_socketDescriptor, &message, MSG_NOSIGNAL) == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
#if USE(GLIB)
m_pendingOutputMessage = makeUnique<UnixMessage>(WTFMove(outputMessage));
m_writeSocketMonitor.start(m_socket.get(), G_IO_OUT, m_connectionQueue->runLoop(), [this, protectedThis = makeRef(*this)] (GIOCondition condition) -> gboolean {
if (condition & G_IO_OUT) {
ASSERT(m_pendingOutputMessage);
m_connectionQueue->dispatch([this, protectedThis = makeRef(*this)] {
m_writeSocketMonitor.stop();
auto message = WTFMove(m_pendingOutputMessage);
if (m_isConnected) {
sendOutputMessage(*message);
sendOutgoingMessages();
}
});
}
return G_SOURCE_REMOVE;
});
return false;
#else
struct pollfd pollfd;
pollfd.fd = m_socketDescriptor;
pollfd.events = POLLOUT;
pollfd.revents = 0;
poll(&pollfd, 1, -1);
continue;
#endif
}
#if OS(LINUX)
if (errno == EPIPE || errno == ECONNRESET)
#else
if (errno == ECONNRESET)
#endif
{
connectionDidClose();
return false;
}
if (m_isConnected)
WTFLogAlways("Error sending IPC message: %s", strerror(errno));
return false;
}
return true;
}
Connection::SocketPair Connection::createPlatformConnection(unsigned options)
{
int sockets[2];
RELEASE_ASSERT(socketpair(AF_UNIX, SOCKET_TYPE, 0, sockets) != -1);
if (options & SetCloexecOnServer) {
if (!setCloseOnExec(sockets[1]))
RELEASE_ASSERT_NOT_REACHED();
}
if (options & SetCloexecOnClient) {
if (!setCloseOnExec(sockets[0]))
RELEASE_ASSERT_NOT_REACHED();
}
SocketPair socketPair = { sockets[0], sockets[1] };
return socketPair;
}
void Connection::willSendSyncMessage(OptionSet<SendSyncOption>)
{
}
void Connection::didReceiveSyncReply(OptionSet<SendSyncOption>)
{
}
}