#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>
#include <kern/assert.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/wait_queue.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>
#include <ddb/tr.h>
#if CONFIG_MACF_MACH
#include <security/mac_mach_internal.h>
#endif
/*
 * NOTE(review): presumably the *addresses* of these two globals serve as
 * the 64-bit wait events (IPC_MQUEUE_FULL / IPC_MQUEUE_RECEIVE) used
 * throughout this file — confirm against the macro definitions in the
 * corresponding header.
 */
int ipc_mqueue_full;
int ipc_mqueue_rcv;
/* ddb trace support (ddb/tr.h) compiled out */
#define TR_ENABLE 0
/* forward declaration; defined later in this file */
void ipc_mqueue_receive_results(wait_result_t result);
/*
 * Routine:	ipc_mqueue_init
 * Purpose:
 *	Prepare a message queue for use, either as a port-set queue
 *	(is_set) or as an ordinary port message queue.
 */
void
ipc_mqueue_init(
	ipc_mqueue_t mqueue,
	boolean_t is_set)
{
	if (!is_set) {
		/* ordinary port: FIFO wait queue, empty message list, default limit */
		wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
		ipc_kmsg_queue_init(&mqueue->imq_messages);
		mqueue->imq_fullwaiters = FALSE;
		mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
		mqueue->imq_msgcount = 0;
		mqueue->imq_seqno = 0;
		return;
	}

	/* port set: only the set wait queue is used */
	wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
}
/*
 * Routine:	ipc_mqueue_member
 * Purpose:
 *	Report whether the port's message queue is currently linked
 *	into the given port-set's wait queue.
 */
boolean_t
ipc_mqueue_member(
	ipc_mqueue_t port_mqueue,
	ipc_mqueue_t set_mqueue)
{
	return wait_queue_member(&port_mqueue->imq_wait_queue,
				 &set_mqueue->imq_set_queue);
}
/*
 * Routine:	ipc_mqueue_remove
 * Purpose:
 *	Unlink this message queue from the given port-set's wait
 *	queue.
 * Returns:
 *	The result of wait_queue_unlink().
 */
kern_return_t
ipc_mqueue_remove(
	ipc_mqueue_t mqueue,
	ipc_mqueue_t set_mqueue)
{
	return wait_queue_unlink(&mqueue->imq_wait_queue,
				 &set_mqueue->imq_set_queue);
}
/*
 * Routine:	ipc_mqueue_remove_from_all
 * Purpose:
 *	Unlink this message queue from every port set it is a
 *	member of.
 */
void
ipc_mqueue_remove_from_all(
	ipc_mqueue_t mqueue)
{
	wait_queue_unlink_all(&mqueue->imq_wait_queue);
}
/*
 * Routine:	ipc_mqueue_remove_all
 * Purpose:
 *	Unlink every member message queue from this port-set
 *	queue.
 */
void
ipc_mqueue_remove_all(
	ipc_mqueue_t mqueue)
{
	wait_queue_set_unlink_all(&mqueue->imq_set_queue);
}
/*
 * Routine:	ipc_mqueue_add
 * Purpose:
 *	Link a port's message queue into a port-set's wait queue,
 *	then hand any messages already queued on the port to threads
 *	currently waiting for a message (they become reachable via
 *	the new link).
 * Conditions:
 *	Takes splsched and the port mqueue lock internally.
 * Returns:
 *	KERN_SUCCESS, or the failure code from wait_queue_link().
 */
kern_return_t
ipc_mqueue_add(
ipc_mqueue_t port_mqueue,
ipc_mqueue_t set_mqueue)
{
wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg, next;
kern_return_t kr;
spl_t s;
kr = wait_queue_link(port_waitq, set_waitq);
if (kr != KERN_SUCCESS)
return kr;
/*
 * Now that the set is linked, try to deliver each message already
 * queued on the port to a waiting receiver.
 */
s = splsched();
imq_lock(port_mqueue);
kmsgq = &port_mqueue->imq_messages;
for (kmsg = ipc_kmsg_queue_first(kmsgq);
kmsg != IKM_NULL;
kmsg = next) {
/* capture the successor first: kmsg may be dequeued below */
next = ipc_kmsg_queue_next(kmsgq, kmsg);
for (;;) {
thread_t th;
/*
 * Pull one waiting receiver off the wait queue; the
 * "identity" variant returns the thread still locked.
 */
th = wait_queue_wakeup64_identity_locked(
port_waitq,
IPC_MQUEUE_RECEIVE,
THREAD_AWAKENED,
FALSE);
/* no receiver waiting: leave remaining messages queued */
if (th == THREAD_NULL)
goto leave;
/*
 * Check the receiver's buffer against the message size plus
 * the trailer it requested.  Too small: report
 * MACH_RCV_TOO_LARGE with the needed size.  If the receiver
 * used MACH_RCV_LARGE it keeps the message queued and will
 * retry with a bigger buffer, so try the next waiter for
 * this same kmsg.
 */
if (th->ith_msize <
kmsg->ikm_header->msgh_size +
REQUESTED_TRAILER_SIZE(th->ith_option)) {
th->ith_state = MACH_RCV_TOO_LARGE;
th->ith_msize = kmsg->ikm_header->msgh_size;
if (th->ith_option & MACH_RCV_LARGE) {
th->ith_kmsg = IKM_NULL;
th->ith_seqno = 0;
thread_unlock(th);
/* message stays queued; look for another receiver */
continue;
}
} else {
th->ith_state = MACH_MSG_SUCCESS;
}
/*
 * Hand the message to this receiver (even on TOO_LARGE
 * without MACH_RCV_LARGE) and release the queue slot it
 * occupied.
 */
ipc_kmsg_rmqueue(kmsgq, kmsg);
ipc_mqueue_release_msgcount(port_mqueue);
th->ith_kmsg = kmsg;
th->ith_seqno = port_mqueue->imq_seqno++;
thread_unlock(th);
break;
}
}
leave:
imq_unlock(port_mqueue);
splx(s);
return KERN_SUCCESS;
}
/*
 * Routine:	ipc_mqueue_changed
 * Purpose:
 *	Wake up all threads waiting to receive on this mqueue with
 *	THREAD_RESTART, which ipc_mqueue_receive_results() translates
 *	to MACH_RCV_PORT_CHANGED.
 * Conditions:
 *	Uses the *_locked wakeup variant — presumably the caller holds
 *	the mqueue lock at splsched; confirm at call sites.
 */
void
ipc_mqueue_changed(
ipc_mqueue_t mqueue)
{
wait_queue_wakeup64_all_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_RECEIVE,
THREAD_RESTART,
FALSE);
}
/*
 * Routine:	ipc_mqueue_send
 * Purpose:
 *	Deliver a message to the queue, blocking (ABORTSAFE) while
 *	the queue is at its limit unless the send bypasses the limit
 *	(MACH_SEND_ALWAYS, or a send-once right in the header bits).
 * Returns:
 *	MACH_MSG_SUCCESS	message posted to the queue
 *	MACH_SEND_TIMED_OUT	queue full, MACH_SEND_TIMEOUT elapsed
 *	MACH_SEND_INTERRUPTED	queue full, wait was aborted
 */
mach_msg_return_t
ipc_mqueue_send(
ipc_mqueue_t mqueue,
ipc_kmsg_t kmsg,
mach_msg_option_t option,
mach_msg_timeout_t send_timeout)
{
int wresult;
spl_t s;
s = splsched();
imq_lock(mqueue);
/*
 * Reserve a queue slot up front (imq_msgcount++) if the queue has
 * room or the sender is exempt from the limit; the slot is consumed
 * by ipc_mqueue_post() below.
 */
if (!imq_full(mqueue) ||
(option & MACH_SEND_ALWAYS) ||
(MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
MACH_MSG_TYPE_PORT_SEND_ONCE)) {
mqueue->imq_msgcount++;
assert(mqueue->imq_msgcount > 0);
imq_unlock(mqueue);
splx(s);
} else {
thread_t cur_thread = current_thread();
uint64_t deadline;
/* zero timeout: fail immediately rather than block */
if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
imq_unlock(mqueue);
splx(s);
return MACH_SEND_TIMED_OUT;
}
mqueue->imq_fullwaiters = TRUE;
thread_lock(cur_thread);
if (option & MACH_SEND_TIMEOUT)
clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
else
deadline = 0;
/* queue ourselves on the IPC_MQUEUE_FULL event, then block */
wresult = wait_queue_assert_wait64_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_FULL,
THREAD_ABORTSAFE, deadline,
cur_thread);
thread_unlock(cur_thread);
imq_unlock(mqueue);
splx(s);
if (wresult == THREAD_WAITING) {
wresult = thread_block(THREAD_CONTINUE_NULL);
counter(c_ipc_mqueue_send_block++);
}
switch (wresult) {
case THREAD_TIMED_OUT:
assert(option & MACH_SEND_TIMEOUT);
return MACH_SEND_TIMED_OUT;
case THREAD_AWAKENED:
/*
 * The waker reserved a queue slot on our behalf (see
 * ipc_mqueue_release_msgcount), so fall through to post.
 */
assert(mqueue->imq_msgcount > 0);
break;
case THREAD_INTERRUPTED:
return MACH_SEND_INTERRUPTED;
case THREAD_RESTART:
default:
panic("ipc_mqueue_send");
}
}
ipc_mqueue_post(mqueue, kmsg);
return MACH_MSG_SUCCESS;
}
/*
 * Routine:	ipc_mqueue_release_msgcount
 * Purpose:
 *	Release one reserved message-queue slot.  If the queue drops
 *	below its limit and senders are blocked for space, wake one
 *	and transfer the slot to it (imq_msgcount is re-incremented
 *	on its behalf; the awakened sender skips its own reservation).
 * Conditions:
 *	The mqueue must be locked (asserted).
 */
void
ipc_mqueue_release_msgcount(
ipc_mqueue_t mqueue)
{
assert(imq_held(mqueue));
assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
mqueue->imq_msgcount--;
if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
if (wait_queue_wakeup64_one_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_FULL,
THREAD_AWAKENED,
FALSE) != KERN_SUCCESS) {
/* nobody actually waiting anymore */
mqueue->imq_fullwaiters = FALSE;
} else {
/* transfer the freed slot to the awakened sender */
mqueue->imq_msgcount++;
}
}
}
/*
 * Routine:	ipc_mqueue_post
 * Purpose:
 *	Deliver kmsg directly to a waiting receiver if one exists;
 *	otherwise enqueue it on the mqueue.  The caller must have
 *	reserved a queue slot (imq_msgcount) — see ipc_mqueue_send.
 * Conditions:
 *	Takes splsched and the mqueue lock internally.
 */
void
ipc_mqueue_post(
register ipc_mqueue_t mqueue,
register ipc_kmsg_t kmsg)
{
spl_t s;
s = splsched();
imq_lock(mqueue);
for (;;) {
wait_queue_t waitq = &mqueue->imq_wait_queue;
thread_t receiver;
/* pull one waiting receiver; returned locked if non-NULL */
receiver = wait_queue_wakeup64_identity_locked(
waitq,
IPC_MQUEUE_RECEIVE,
THREAD_AWAKENED,
FALSE);
if (receiver == THREAD_NULL) {
/* no receiver: consume the reserved slot and enqueue */
assert(mqueue->imq_msgcount > 0);
ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
break;
}
/*
 * Compare the receiver's buffer against the message size plus
 * its requested trailer.
 */
if (receiver->ith_msize <
(kmsg->ikm_header->msgh_size) +
REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
receiver->ith_msize = kmsg->ikm_header->msgh_size;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
receiver->ith_state = MACH_MSG_SUCCESS;
}
/*
 * Hand the message over unless the receiver asked for
 * MACH_RCV_LARGE and the buffer is too small (in which case it
 * just learns the size and the loop tries the next receiver).
 */
if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
!(receiver->ith_option & MACH_RCV_LARGE)) {
receiver->ith_kmsg = kmsg;
receiver->ith_seqno = mqueue->imq_seqno++;
thread_unlock(receiver);
/* message consumed a slot; release it (may wake a sender) */
ipc_mqueue_release_msgcount(mqueue);
break;
}
receiver->ith_kmsg = IKM_NULL;
receiver->ith_seqno = 0;
thread_unlock(receiver);
}
imq_unlock(mqueue);
splx(s);
current_task()->messages_sent++;
return;
}
/*
 * Routine:	ipc_mqueue_receive_results
 * Purpose:
 *	Translate the wait result of a blocked receive into the
 *	current thread's ith_state return code.  On THREAD_AWAKENED
 *	the state was already set by the waker and is only validated.
 */
void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
thread_t self = current_thread();
mach_msg_option_t option = self->ith_option;
switch (saved_wait_result) {
case THREAD_TIMED_OUT:
self->ith_state = MACH_RCV_TIMED_OUT;
return;
case THREAD_INTERRUPTED:
self->ith_state = MACH_RCV_INTERRUPTED;
return;
case THREAD_RESTART:
/* e.g. port removed from set — see ipc_mqueue_changed */
self->ith_state = MACH_RCV_PORT_CHANGED;
return;
case THREAD_AWAKENED:
switch (self->ith_state) {
case MACH_RCV_SCATTER_SMALL:
case MACH_RCV_TOO_LARGE:
if (option & MACH_RCV_LARGE) {
/* message left queued for a retry with a bigger buffer */
return;
}
/* fall through: message was handed to us despite the error */
case MACH_MSG_SUCCESS:
return;
default:
panic("ipc_mqueue_receive_results: strange ith_state");
}
default:
panic("ipc_mqueue_receive_results: strange wait_result");
}
}
/*
 * Routine:	ipc_mqueue_receive_continue
 * Purpose:
 *	Continuation run when a receive that blocked with a
 *	continuation resumes: convert the wait result into ith_state,
 *	then hand off to mach_msg_receive_continue() to finish the
 *	receive (presumably it does not return here — confirm).
 */
void
ipc_mqueue_receive_continue(
__unused void *param,
wait_result_t wresult)
{
ipc_mqueue_receive_results(wresult);
mach_msg_receive_continue();
}
/*
 * Routine:	ipc_mqueue_receive
 * Purpose:
 *	Receive a message from the mqueue (a port queue or a port
 *	set).  If a message is immediately available it is selected
 *	into the current thread's ith_* fields; otherwise the thread
 *	blocks on IPC_MQUEUE_RECEIVE until a sender posts one, the
 *	timeout expires, or the wait is interrupted/restarted.  The
 *	outcome is reported via self->ith_state (and ith_kmsg etc.),
 *	not a return value.
 * Conditions:
 *	Takes splsched and mqueue locks internally.
 */
void
ipc_mqueue_receive(
ipc_mqueue_t mqueue,
mach_msg_option_t option,
mach_msg_size_t max_size,
mach_msg_timeout_t rcv_timeout,
int interruptible)
{
ipc_kmsg_queue_t kmsgs;
wait_result_t wresult;
thread_t self;
uint64_t deadline;
spl_t s;
#if CONFIG_MACF_MACH
ipc_labelh_t lh;
task_t task;
int rc;
#endif
s = splsched();
imq_lock(mqueue);
self = current_thread();
if (imq_is_set(mqueue)) {
wait_queue_link_t wql;
ipc_mqueue_t port_mq;
queue_t q;
q = &mqueue->imq_setlinks;
/* scan member port queues for one with a pending message */
search_set:
queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
port_mq = (ipc_mqueue_t)wql->wql_queue;
kmsgs = &port_mq->imq_messages;
/*
 * Lock ordering requires try-lock here: on failure, drop
 * the set lock, pause, and restart the scan from the top.
 */
if (!imq_lock_try(port_mq)) {
imq_unlock(mqueue);
splx(s);
mutex_pause(0);
s = splsched();
imq_lock(mqueue);
goto search_set;
}
if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
imq_unlock(port_mq);
continue;
}
/*
 * Move this link to the tail of the set's link list —
 * presumably to rotate service among member ports; confirm.
 */
queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
imq_unlock(mqueue);
/* dequeue the message from the member port's queue */
ipc_mqueue_select(port_mq, option, max_size);
imq_unlock(port_mq);
#if CONFIG_MACF_MACH
/* MAC policy check on the sender's label; on denial the
 * receive is failed with MACH_RCV_INVALID_DATA. */
if (self->ith_kmsg != NULL &&
self->ith_kmsg->ikm_sender != NULL) {
lh = self->ith_kmsg->ikm_sender->label;
task = current_task();
tasklabel_lock(task);
ip_lock(lh->lh_port);
rc = mac_port_check_receive(&task->maclabel,
&lh->lh_label);
ip_unlock(lh->lh_port);
tasklabel_unlock(task);
if (rc)
self->ith_state = MACH_RCV_INVALID_DATA;
}
#endif
splx(s);
return;
}
} else {
/* single port queue: take the first message if any */
kmsgs = &mqueue->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
ipc_mqueue_select(mqueue, option, max_size);
imq_unlock(mqueue);
#if CONFIG_MACF_MACH
if (self->ith_kmsg != NULL &&
self->ith_kmsg->ikm_sender != NULL) {
lh = self->ith_kmsg->ikm_sender->label;
task = current_task();
tasklabel_lock(task);
ip_lock(lh->lh_port);
rc = mac_port_check_receive(&task->maclabel,
&lh->lh_label);
ip_unlock(lh->lh_port);
tasklabel_unlock(task);
if (rc)
self->ith_state = MACH_RCV_INVALID_DATA;
}
#endif
splx(s);
return;
}
}
/* nothing available: a zero timeout means fail immediately */
if (option & MACH_RCV_TIMEOUT) {
if (rcv_timeout == 0) {
imq_unlock(mqueue);
splx(s);
self->ith_state = MACH_RCV_TIMED_OUT;
return;
}
}
/*
 * Publish our receive parameters so a sender (ipc_mqueue_post /
 * ipc_mqueue_add) can deliver directly to us, then wait.
 */
thread_lock(self);
self->ith_state = MACH_RCV_IN_PROGRESS;
self->ith_option = option;
self->ith_msize = max_size;
if (option & MACH_RCV_TIMEOUT)
clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
else
deadline = 0;
wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
IPC_MQUEUE_RECEIVE,
interruptible, deadline,
self);
thread_unlock(self);
imq_unlock(mqueue);
splx(s);
if (wresult == THREAD_WAITING) {
counter((interruptible == THREAD_ABORTSAFE) ?
c_ipc_mqueue_receive_block_user++ :
c_ipc_mqueue_receive_block_kernel++);
/*
 * With a continuation set, block through it — presumably
 * control does not come back here in that case; confirm.
 */
if (self->ith_continuation)
thread_block(ipc_mqueue_receive_continue);
wresult = thread_block(THREAD_CONTINUE_NULL);
}
/* translate the wait result into self->ith_state */
ipc_mqueue_receive_results(wresult);
}
/*
 * Routine:	ipc_mqueue_select
 * Purpose:
 *	Dequeue the first message on mqueue for the current thread,
 *	recording the result in the thread's ith_* fields.  If the
 *	message (plus its requested trailer) exceeds max_size, report
 *	MACH_RCV_TOO_LARGE; with MACH_RCV_LARGE the message stays
 *	queued so the caller can retry with a larger buffer.
 * Conditions:
 *	Caller holds the mqueue lock (ipc_mqueue_release_msgcount
 *	asserts imq_held) and the queue is non-empty (asserted).
 */
void
ipc_mqueue_select(
ipc_mqueue_t mqueue,
mach_msg_option_t option,
mach_msg_size_t max_size)
{
thread_t self = current_thread();
ipc_kmsg_t kmsg;
mach_msg_return_t mr;
mach_msg_size_t rcv_size;
mr = MACH_MSG_SUCCESS;
kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
assert(kmsg != IKM_NULL);
/* size as it will appear after copyout into the receiver's map */
rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
mr = MACH_RCV_TOO_LARGE;
if (option & MACH_RCV_LARGE) {
/* leave the message queued; tell the caller the size needed */
self->ith_kmsg = IKM_NULL;
self->ith_msize = rcv_size;
self->ith_seqno = 0;
self->ith_state = mr;
return;
}
}
/* dequeue (even on TOO_LARGE without MACH_RCV_LARGE) */
ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
/* free the queue slot; may wake a blocked sender */
ipc_mqueue_release_msgcount(mqueue);
self->ith_seqno = mqueue->imq_seqno++;
self->ith_kmsg = kmsg;
self->ith_state = mr;
current_task()->messages_received++;
return;
}
/*
 * Routine:	ipc_mqueue_destroy
 * Purpose:
 *	Tear down a message queue: wake all senders blocked for
 *	space, then dequeue and destroy every pending message.
 * Conditions:
 *	Takes splsched and the mqueue lock internally; the lock is
 *	dropped around each ipc_kmsg_destroy_dest() call, so new
 *	state changes can interleave between iterations.
 */
void
ipc_mqueue_destroy(
ipc_mqueue_t mqueue)
{
ipc_kmsg_queue_t kmqueue;
ipc_kmsg_t kmsg;
spl_t s;
s = splsched();
imq_lock(mqueue);
mqueue->imq_fullwaiters = FALSE;
/* release every sender blocked on IPC_MQUEUE_FULL */
wait_queue_wakeup64_all_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_FULL,
THREAD_AWAKENED,
FALSE);
kmqueue = &mqueue->imq_messages;
while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
/* destroy may block/do heavy work: drop the lock around it */
imq_unlock(mqueue);
splx(s);
ipc_kmsg_destroy_dest(kmsg);
s = splsched();
imq_lock(mqueue);
}
imq_unlock(mqueue);
splx(s);
}
/*
 * Routine:	ipc_mqueue_set_qlimit
 * Purpose:
 *	Change the queue's message limit.  When the limit is raised,
 *	wake up to (new - old) senders blocked for space, reserving a
 *	queue slot (imq_msgcount++) for each one awakened.
 * Conditions:
 *	qlimit must not exceed MACH_PORT_QLIMIT_MAX (asserted).
 */
void
ipc_mqueue_set_qlimit(
ipc_mqueue_t mqueue,
mach_port_msgcount_t qlimit)
{
spl_t s;
assert(qlimit <= MACH_PORT_QLIMIT_MAX);
s = splsched();
imq_lock(mqueue);
if (qlimit > mqueue->imq_qlimit) {
mach_port_msgcount_t i, wakeup;
wakeup = qlimit - mqueue->imq_qlimit;
for (i = 0; i < wakeup; i++) {
if (wait_queue_wakeup64_one_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_FULL,
THREAD_AWAKENED,
FALSE) == KERN_NOT_WAITING) {
/* no more blocked senders */
mqueue->imq_fullwaiters = FALSE;
break;
}
/* slot reserved on behalf of the awakened sender */
mqueue->imq_msgcount++;
}
}
mqueue->imq_qlimit = qlimit;
imq_unlock(mqueue);
splx(s);
}
/*
 * Routine:	ipc_mqueue_set_seqno
 * Purpose:
 *	Overwrite the sequence number that will be stamped on the
 *	next message received from this queue.
 */
void
ipc_mqueue_set_seqno(
	ipc_mqueue_t mqueue,
	mach_port_seqno_t seqno)
{
	spl_t spl = splsched();

	imq_lock(mqueue);
	mqueue->imq_seqno = seqno;
	imq_unlock(mqueue);
	splx(spl);
}
/*
 * Routine:	ipc_mqueue_copyin
 * Purpose:
 *	Resolve a port name in a space to the message queue to
 *	receive from.  Accepts a receive right or a port-set right;
 *	anything else fails.
 * Returns:
 *	MACH_MSG_SUCCESS	*mqueuep/*objectp set; *objectp holds
 *				a new reference and is unlocked
 *	MACH_RCV_INVALID_NAME	space dead, name not found, or name
 *				does not denote receive/pset rights
 */
mach_msg_return_t
ipc_mqueue_copyin(
ipc_space_t space,
mach_port_name_t name,
ipc_mqueue_t *mqueuep,
ipc_object_t *objectp)
{
ipc_entry_t entry;
ipc_object_t object;
ipc_mqueue_t mqueue;
is_read_lock(space);
if (!space->is_active) {
is_read_unlock(space);
return MACH_RCV_INVALID_NAME;
}
entry = ipc_entry_lookup(space, name);
if (entry == IE_NULL) {
is_read_unlock(space);
return MACH_RCV_INVALID_NAME;
}
object = entry->ie_object;
if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
ipc_port_t port;
/* receive right: lock the port, then drop the space lock */
port = (ipc_port_t) object;
assert(port != IP_NULL);
ip_lock(port);
assert(ip_active(port));
assert(port->ip_receiver_name == name);
assert(port->ip_receiver == space);
is_read_unlock(space);
mqueue = &port->ip_messages;
} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
ipc_pset_t pset;
/* port-set right: lock the pset, then drop the space lock */
pset = (ipc_pset_t) object;
assert(pset != IPS_NULL);
ips_lock(pset);
assert(ips_active(pset));
assert(pset->ips_local_name == name);
is_read_unlock(space);
mqueue = &pset->ips_messages;
} else {
is_read_unlock(space);
return MACH_RCV_INVALID_NAME;
}
/*
 * Take a reference on the object for the caller, then drop the
 * port/pset lock taken above (io_unlock covers both cases).
 */
io_reference(object);
io_unlock(object);
*objectp = object;
*mqueuep = mqueue;
return MACH_MSG_SUCCESS;
}