#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>
#include <kern/assert.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/waitq.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>
#if MACH_FLIPC
#include <ipc/flipc.h>
#endif
#ifdef __LP64__
#include <vm/vm_map.h>
#endif
#include <sys/event.h>
extern char *proc_name_address(void *p);
int ipc_mqueue_full;
int ipc_mqueue_rcv;
void ipc_mqueue_receive_results(wait_result_t result);
static void ipc_mqueue_peek_on_thread(
ipc_mqueue_t port_mq,
mach_msg_option_t option,
thread_t thread);
/*
 * Routine:	ipc_mqueue_init
 * Purpose:
 *	Initialize a newly-allocated message queue.  A "set" queue only
 *	needs its waitq-set initialized; a port queue also gets its
 *	kmsg queue, sequence number, count/limit and full-waiter state.
 */
void
ipc_mqueue_init(
	ipc_mqueue_t	mqueue,
	boolean_t	is_set,
	uint64_t	*reserved_link)
{
	if (!is_set) {
		/* ordinary port message queue */
		waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
		ipc_kmsg_queue_init(&mqueue->imq_messages);
		mqueue->imq_seqno = 0;
		mqueue->imq_msgcount = 0;
		mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
		mqueue->imq_fullwaiters = FALSE;
#if MACH_FLIPC
		mqueue->imq_fport = FPORT_NULL;
#endif
	} else {
		/* port-set queue: prepost-capable waitq set */
		waitq_set_init(&mqueue->imq_set_queue,
			       SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST,
			       reserved_link, NULL);
	}
	klist_init(&mqueue->imq_klist);
}
/*
 * Routine:	ipc_mqueue_deinit
 * Purpose:
 *	Tear down a message queue, releasing the underlying waitq
 *	(port queue) or waitq-set (set queue) resources.
 */
void
ipc_mqueue_deinit(
	ipc_mqueue_t	mqueue)
{
	if (imq_is_set(mqueue)) {
		waitq_set_deinit(&mqueue->imq_set_queue);
	} else {
		waitq_deinit(&mqueue->imq_wait_queue);
	}
}
/*
 * Routine:	imq_reserve_and_lock
 * Purpose:
 *	Pre-allocate prepost reservations for this mqueue's waitq and
 *	return with the mqueue locked (WAITQ_KEEP_LOCKED).
 *	The reservation is released via imq_release_and_unlock().
 */
void
imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost)
{
	uint64_t reservation;

	reservation = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
					    WAITQ_KEEP_LOCKED);
	*reserved_prepost = reservation;
}
/*
 * Routine:	imq_release_and_unlock
 * Purpose:
 *	Drop the mqueue lock, then return any unused prepost
 *	reservation taken by imq_reserve_and_lock().
 * Conditions:
 *	mqueue locked on entry; unlocked on return.
 */
void
imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost)
{
	assert(imq_held(mq));
	/* unlock first; releasing the reservation may itself take locks */
	waitq_unlock(&mq->imq_wait_queue);
	waitq_prepost_release_reserve(reserved_prepost);
}
/*
 * Routine:	ipc_mqueue_member
 * Purpose:
 *	Query whether the port's message queue is currently linked
 *	into the given set's queue.
 */
boolean_t
ipc_mqueue_member(
	ipc_mqueue_t	port_mqueue,
	ipc_mqueue_t	set_mqueue)
{
	return waitq_member(&port_mqueue->imq_wait_queue,
			    &set_mqueue->imq_set_queue);
}
/*
 * Routine:	ipc_mqueue_remove
 * Purpose:
 *	Unlink the port's message queue from the given set's queue.
 * Returns:
 *	Whatever waitq_unlink() reports.
 */
kern_return_t
ipc_mqueue_remove(
	ipc_mqueue_t	mqueue,
	ipc_mqueue_t	set_mqueue)
{
	return waitq_unlink(&mqueue->imq_wait_queue,
			    &set_mqueue->imq_set_queue);
}
/*
 * Routine:	ipc_mqueue_remove_from_all
 * Purpose:
 *	Remove the mqueue from every set it currently belongs to.
 * Conditions:
 *	Nothing locked; takes the mqueue lock, which is dropped by
 *	waitq_unlink_all_unlock().
 */
void
ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
{
	struct waitq *mq_waitq = &mqueue->imq_wait_queue;

	imq_lock(mqueue);
	assert(waitq_valid(mq_waitq));
	/*
	 * The original stored the result in an unused local ('kr'), a
	 * dead store that triggers -Wunused warnings; discard explicitly
	 * since no caller can act on the result.
	 */
	(void)waitq_unlink_all_unlock(mq_waitq);
	/* mqueue unlocked by waitq_unlink_all_unlock() */
}
/*
 * Routine:	ipc_mqueue_remove_all
 * Purpose:
 *	Remove all member queues from this set's message queue.
 * Conditions:
 *	Nothing locked; the lock taken here is dropped by
 *	waitq_set_unlink_all_unlock().
 */
void
ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
{
	imq_lock(mqueue);
	assert(waitqs_is_set(&mqueue->imq_set_queue));
	waitq_set_unlink_all_unlock(&mqueue->imq_set_queue);
	/* mqueue unlocked */
}
/*
 * Routine:	ipc_mqueue_add
 * Purpose:
 *	Link the port's message queue into the set's queue, then try
 *	to hand any messages already queued on the port to threads
 *	that are blocked waiting on the set.
 * Conditions:
 *	Nothing locked on entry; the port mqueue lock is taken and
 *	dropped here.  Caller supplies a pre-allocated waitq link
 *	(reserved_link) and a prepost reservation (reserved_prepost).
 * Returns:
 *	KERN_SUCCESS, or the waitq_link() failure code.
 */
kern_return_t
ipc_mqueue_add(
	ipc_mqueue_t port_mqueue,
	ipc_mqueue_t set_mqueue,
	uint64_t *reserved_link,
	uint64_t *reserved_prepost)
{
	struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
	struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg, next;
	kern_return_t kr;

	assert(reserved_link && *reserved_link != 0);

	imq_lock(port_mqueue);
	kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
	if (kr != KERN_SUCCESS) {
		imq_unlock(port_mqueue);
		return kr;
	}

	/*
	 * Now that the port is a member of the set, threads blocked on
	 * the set may be able to consume messages that were already
	 * queued on the port.  Walk the queued messages, matching each
	 * to a woken waiter.
	 */
	kmsgq = &port_mqueue->imq_messages;
	for (kmsg = ipc_kmsg_queue_first(kmsgq);
	     kmsg != IKM_NULL;
	     kmsg = next) {
		next = ipc_kmsg_queue_next(kmsgq, kmsg);

		for (;;) {
			thread_t th;
			mach_msg_size_t msize;
			spl_t th_spl;

			/* wake and identify one receive waiter (port stays locked) */
			th = waitq_wakeup64_identify_locked(
						port_waitq,
						IPC_MQUEUE_RECEIVE,
						THREAD_AWAKENED, &th_spl,
						reserved_prepost, WAITQ_ALL_PRIORITIES,
						WAITQ_KEEP_LOCKED);
			/* no more receivers: leave remaining messages queued */
			if (th == THREAD_NULL)
				goto leave;

			/*
			 * A waiter that is not mid-receive is either a peek
			 * waiter (satisfy it in place) or something we should
			 * simply skip; in both cases try the next waiter for
			 * this same message.
			 */
			if (th->ith_state != MACH_RCV_IN_PROGRESS) {
				if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
					ipc_mqueue_peek_on_thread(port_mqueue,
								  th->ith_option,
								  th);
				}
				thread_unlock(th);
				splx(th_spl);
				continue;
			}

			/*
			 * Found a receiver: check whether the message (plus
			 * requested trailer) fits in its receive buffer.
			 */
			msize = ipc_kmsg_copyout_size(kmsg, th->map);
			if (th->ith_rsize <
			    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
				th->ith_state = MACH_RCV_TOO_LARGE;
				th->ith_msize = msize;
				if (th->ith_option & MACH_RCV_LARGE) {
					/*
					 * MACH_RCV_LARGE receivers only get the
					 * size/name info; the message stays, so
					 * look for another waiter for it.
					 */
					th->ith_receiver_name = port_mqueue->imq_receiver_name;
					th->ith_kmsg = IKM_NULL;
					th->ith_seqno = 0;
					thread_unlock(th);
					splx(th_spl);
					continue;
				}
			} else {
				th->ith_state = MACH_MSG_SUCCESS;
			}

			/*
			 * Hand the message (or too-large status without
			 * MACH_RCV_LARGE) directly to this receiver.
			 */
			ipc_kmsg_rmqueue(kmsgq, kmsg);
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);
			th->ith_kmsg = kmsg;
			th->ith_seqno = port_mqueue->imq_seqno++;
			thread_unlock(th);
			splx(th_spl);
#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport))
				flipc_msg_ack(node, port_mqueue, TRUE);
#endif
			break;	/* on to the next queued message */
		}
	}
leave:
	imq_unlock(port_mqueue);
	return KERN_SUCCESS;
}
/*
 * Routine:	ipc_mqueue_changed
 * Purpose:
 *	Wake up all receivers blocked on this message queue with
 *	THREAD_RESTART and vanish any attached knotes — the queue's
 *	identity/state has changed out from under them.
 * Conditions:
 *	The message queue is locked by the caller and remains locked
 *	(WAITQ_KEEP_LOCKED) on return.
 */
void
ipc_mqueue_changed(
	ipc_mqueue_t mqueue)
{
	/* notify any registered knotes before kicking the waiters */
	knote_vanish(&mqueue->imq_klist);

	waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
				  IPC_MQUEUE_RECEIVE,
				  THREAD_RESTART,
				  NULL,
				  WAITQ_ALL_PRIORITIES,
				  WAITQ_KEEP_LOCKED);
}
/*
 * Routine:	ipc_mqueue_send
 * Purpose:
 *	Reserve a slot on the message queue for kmsg, blocking (subject
 *	to option/timeout) if the queue is full, then post the message.
 * Conditions:
 *	Called with the mqueue locked; the lock is always dropped
 *	before returning.
 * Returns:
 *	MACH_MSG_SUCCESS	The message was posted.
 *	MACH_SEND_TIMED_OUT	Timed out waiting; caller still owns kmsg.
 *	MACH_SEND_INTERRUPTED	Wait aborted; caller still owns kmsg.
 *	MACH_SEND_INVALID_DEST	Queue restarted (destination died).
 *	MACH_SEND_NO_BUFFER	Kernel-internal limit reached.
 */
mach_msg_return_t
ipc_mqueue_send(
	ipc_mqueue_t mqueue,
	ipc_kmsg_t kmsg,
	mach_msg_option_t option,
	mach_msg_timeout_t send_timeout)
{
	int wresult;

	/*
	 * A slot is granted immediately if:
	 *  1) the queue is below its limit, or
	 *  2) not kernel-full AND the caller used MACH_SEND_ALWAYS, or
	 *  3) not kernel-full AND the destination is a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	     ((option & MACH_SEND_ALWAYS) ||
	      (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	       MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		imq_unlock(mqueue);
	} else {
		thread_t cur_thread = current_thread();
		uint64_t deadline;

		/* a zero timeout means "fail rather than wait" */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			imq_unlock(mqueue);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			imq_unlock(mqueue);
			return MACH_SEND_NO_BUFFER;
		}
		/* record that at least one sender is blocked on queue-full */
		mqueue->imq_fullwaiters = TRUE;

		if (option & MACH_SEND_TIMEOUT)
			clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
		else
			deadline = 0;

		/* block this thread on the IPC_MQUEUE_FULL event */
		wresult = waitq_assert_wait64_locked(
						&mqueue->imq_wait_queue,
						IPC_MQUEUE_FULL,
						THREAD_ABORTSAFE,
						TIMEOUT_URGENCY_USER_NORMAL,
						deadline, TIMEOUT_NO_LEEWAY,
						cur_thread);
		imq_unlock(mqueue);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
			counter(c_ipc_mqueue_send_block++);
		}

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * Proceed: the waker donated a msgcount slot to us
			 * (see ipc_mqueue_release_msgcount), or the queue was
			 * destroyed — ipc_mqueue_post() detects the latter.
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;

		default:
			panic("ipc_mqueue_send");
		}
	}

	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
/*
 * Routine:	ipc_mqueue_override_send
 * Purpose:
 *	Apply a QoS override to the message at the head of a full
 *	queue so that a stuck receiver can be boosted; fires a knote
 *	if the head message's QoS was actually raised.
 * Conditions:
 *	Nothing locked; takes and drops the mqueue lock.
 * NOTE(review): 'extern' on a function definition is redundant
 *	(external linkage is the default) — harmless but unusual.
 */
extern void ipc_mqueue_override_send(
	ipc_mqueue_t mqueue,
	mach_msg_priority_t override)
{
	boolean_t __unused full_queue_empty = FALSE;

	imq_lock(mqueue);
	assert(imq_valid(mqueue));
	assert(!imq_is_set(mqueue));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		/* only notify if the override actually changed the head msg */
		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override))
			KNOTE(&mqueue->imq_klist, 0);
		/* full by msgcount yet no queued message: suspicious state */
		if (!first)
			full_queue_empty = TRUE;
	}
	imq_unlock(mqueue);

#if DEVELOPMENT || DEBUG
	if (full_queue_empty) {
		/* diagnostic only: report the inconsistent full-but-empty queue */
		ipc_port_t port = ip_from_mq(mqueue);
		int dst_pid = 0;
		if (ip_active(port) && !port->ip_tempowner &&
		    port->ip_receiver_name && port->ip_receiver &&
		    port->ip_receiver != ipc_space_kernel) {
			dst_pid = task_pid(port->ip_receiver->is_task);
		}
		printf("%s[%d] could not override mqueue (dst:%d) with 0x%x: "
		       "queue slots are full, but there are no messages!\n",
		       proc_name_address(current_task()->bsd_info),
		       task_pid(current_task()), dst_pid, override);
	}
#endif
}
/*
 * Routine:	ipc_mqueue_release_msgcount
 * Purpose:
 *	Release one reserved message-queue slot.  If the queue drops
 *	below its limit and senders are blocked, wake one sender and
 *	donate the slot to it instead of freeing it.
 * Conditions:
 *	The port mqueue is locked by the caller.
 *	The message corresponding to this slot is already off the queue.
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
{
	(void)set_mq;
	assert(imq_held(port_mq));
	/* either other slots remain, or the queue is now fully drained */
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
		if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
					      IPC_MQUEUE_FULL,
					      THREAD_AWAKENED,
					      NULL,
					      WAITQ_PROMOTE_PRIORITY,
					      WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
			/* nobody was actually waiting after all */
			port_mq->imq_fullwaiters = FALSE;
		} else {
			/* the awakened sender inherits the slot we just freed */
			port_mq->imq_msgcount++;
		}
	}

	/* with no messages left, any outstanding preposts are stale */
	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		waitq_clear_prepost_locked(&port_mq->imq_wait_queue);
	}
}
/*
 * Routine:	ipc_mqueue_post
 * Purpose:
 *	Deliver kmsg either directly to a blocked receiver or by
 *	enqueueing it on the port's message queue.  The sender must
 *	already hold a reserved slot (imq_msgcount), obtained in
 *	ipc_mqueue_send().
 * Conditions:
 *	mqueue unlocked on entry.  If the queue was invalidated
 *	(destroyed) the message is destroyed here instead.
 */
void
ipc_mqueue_post(
	ipc_mqueue_t mqueue,
	ipc_kmsg_t kmsg,
	mach_msg_option_t __unused option)
{
	uint64_t reserved_prepost = 0;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/* reserve prepost objects and lock the queue */
	imq_reserve_and_lock(mqueue, &reserved_prepost);
	if (!imq_valid(mqueue)) {
		/* queue was torn down while the sender was in flight */
		destroy_msg = TRUE;
		goto out_unlock;
	}

	/*
	 * Keep trying to identify a receiver for the message until
	 * one accepts it, or there are none left (then enqueue).
	 */
	for (;;) {
		struct waitq *waitq = &mqueue->imq_wait_queue;
		spl_t th_spl;
		thread_t receiver;
		mach_msg_size_t msize;

		receiver = waitq_wakeup64_identify_locked(waitq,
							  IPC_MQUEUE_RECEIVE,
							  THREAD_AWAKENED,
							  &th_spl,
							  &reserved_prepost,
							  WAITQ_ALL_PRIORITIES,
							  WAITQ_KEEP_LOCKED);
		if (receiver == THREAD_NULL) {
			/*
			 * No receiver: if our reserved slot survived, queue
			 * the message (firing a knote if it became the head);
			 * a zero msgcount means the queue was destroyed and
			 * reset, so destroy the message instead.
			 */
			if (mqueue->imq_msgcount > 0) {
				if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg))
					KNOTE(&mqueue->imq_klist, 0);
				break;
			}
			destroy_msg = TRUE;
			goto out_unlock;
		}

		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			/* peek waiter: enqueue the message, then satisfy the peek */
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver);
			thread_unlock(receiver);
			splx(th_spl);
			break;
		}

		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			/* not actually mid-receive: skip and try the next waiter */
			thread_unlock(receiver);
			splx(th_spl);
			continue;
		}

		/* does the message (plus trailer) fit the receiver's buffer? */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		if (receiver->ith_rsize <
		    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
			receiver->ith_msize = msize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * Hand off the message unless the receiver asked for
		 * MACH_RCV_LARGE and it didn't fit — then it only gets
		 * the size/name info and we look for another receiver.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			thread_unlock(receiver);
			splx(th_spl);

			/* release the sender's reserved slot */
			ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport))
				flipc_msg_ack(node, mqueue, TRUE);
#endif
			break;
		}

		/* MACH_RCV_LARGE and too large: report size/name only */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
		splx(th_spl);
	}

out_unlock:
	/* drop any priority promotion we carried as a one-time boost */
	waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
	imq_release_and_unlock(mqueue, reserved_prepost);
	if (destroy_msg)
		ipc_kmsg_destroy(kmsg);

	current_task()->messages_sent++;
	return;
}
/*
 * Routine:	ipc_mqueue_receive_results
 * Purpose:
 *	Translate the wait result from blocking on a message queue
 *	into the current thread's ith_state receive result.
 */
void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t self = current_thread();
	mach_msg_option_t option = self->ith_option;

	/* why did we wake up? */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* the port/set changed or was destroyed under us */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/* a sender set our ith_state before waking us */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * The message didn't fit our buffer.  With
			 * MACH_RCV_LARGE the sender left only the
			 * size/name info, which is the desired result.
			 */
			if (option & MACH_RCV_LARGE) {
				return;
			}
			/* fallthrough: message was handed over anyway */
		case MACH_MSG_SUCCESS:
		case MACH_PEEK_READY:
			return;

		default:
			panic("ipc_mqueue_receive_results: strange ith_state");
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result");
	}
}
/*
 * Routine:	ipc_mqueue_receive_continue
 * Purpose:
 *	Continuation entered after a receive wait blocks: convert the
 *	wakeup reason into ith_state, then resume message reception.
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();
}
/*
 * Routine:	ipc_mqueue_receive
 * Purpose:
 *	Receive (or peek at) a message from the given queue on the
 *	current thread, blocking if necessary.  Results are delivered
 *	through the thread's ith_* fields.
 * Conditions:
 *	mqueue unlocked on entry; locked here and released inside
 *	ipc_mqueue_receive_on_thread().
 */
void
ipc_mqueue_receive(
	ipc_mqueue_t mqueue,
	mach_msg_option_t option,
	mach_msg_size_t max_size,
	mach_msg_timeout_t rcv_timeout,
	int interruptible)
{
	wait_result_t wresult;
	thread_t self = current_thread();

	imq_lock(mqueue);
	wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
					       rcv_timeout, interruptible,
					       self);
	/* mqueue is unlocked at this point */
	if (wresult == THREAD_NOT_WAITING)
		return;

	if (wresult == THREAD_WAITING) {
		counter((interruptible == THREAD_ABORTSAFE) ?
			c_ipc_mqueue_receive_block_user++ :
			c_ipc_mqueue_receive_block_kernel++);

		if (self->ith_continuation)
			thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED — thread_block() with a continuation should not return here */

		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
struct waitq_set *wqset)
{
ipc_mqueue_t port_mq, *pmq_ptr;
(void)wqset;
port_mq = (ipc_mqueue_t)waitq;
if (ipc_kmsg_queue_empty(&port_mq->imq_messages))
return WQ_ITERATE_INVALIDATE_CONTINUE;
pmq_ptr = (ipc_mqueue_t *)ctx;
if (pmq_ptr)
*pmq_ptr = port_mq;
return WQ_ITERATE_BREAK_KEEP_LOCKED;
}
/*
 * Routine:	ipc_mqueue_receive_on_thread
 * Purpose:
 *	Attempt to receive (or peek) on behalf of 'thread'.  If a
 *	message is immediately available it is selected/peeked and
 *	THREAD_NOT_WAITING is returned; otherwise the thread is
 *	asserted waiting on the queue.
 * Conditions:
 *	mqueue locked on entry; ALWAYS unlocked before returning.
 * Returns:
 *	THREAD_NOT_WAITING	Result already in thread's ith_* fields.
 *	THREAD_RESTART		Queue invalid (being destroyed).
 *	THREAD_WAITING (etc.)	Result of the wait assertion.
 */
wait_result_t
ipc_mqueue_receive_on_thread(
	ipc_mqueue_t mqueue,
	mach_msg_option_t option,
	mach_msg_size_t max_size,
	mach_msg_timeout_t rcv_timeout,
	int interruptible,
	thread_t thread)
{
	wait_result_t wresult;
	uint64_t deadline;

	/* queue invalidated (destroyed): tell the caller to bail */
	if (!imq_valid(mqueue)) {
		imq_unlock(mqueue);
		return THREAD_RESTART;
	}

	if (imq_is_set(mqueue)) {
		/*
		 * Port set: scan preposted member queues for one that
		 * has messages; it comes back locked if found.
		 */
		ipc_mqueue_t port_mq = IMQ_NULL;

		(void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
						 &port_mq,
						 mqueue_process_prepost_receive);

		if (port_mq != IMQ_NULL) {
			/*
			 * We get here holding both the set mqueue and the
			 * port mqueue locks; drop the set lock, consume the
			 * message (or peek), then drop the port lock.
			 */
			imq_unlock(mqueue);

			if (option & MACH_PEEK_MSG)
				ipc_mqueue_peek_on_thread(port_mq, option, thread);
			else
				ipc_mqueue_select_on_thread(port_mq, mqueue, option,
							    max_size, thread);

			imq_unlock(port_mq);
			return THREAD_NOT_WAITING;
		}
	} else if (imq_is_queue(mqueue)) {
		/* plain port queue: take the first message if present */
		ipc_kmsg_queue_t kmsgs;

		kmsgs = &mqueue->imq_messages;
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			if (option & MACH_PEEK_MSG)
				ipc_mqueue_peek_on_thread(mqueue, option, thread);
			else
				ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
							    max_size, thread);
			imq_unlock(mqueue);
			return THREAD_NOT_WAITING;
		}
	} else {
		panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
		      mqueue->imq_wait_queue.waitq_type);
	}

	/*
	 * Nothing to receive: with a zero timeout, report timed-out
	 * immediately instead of blocking.
	 */
	if (option & MACH_RCV_TIMEOUT) {
		if (rcv_timeout == 0) {
			imq_unlock(mqueue);
			thread->ith_state = MACH_RCV_TIMED_OUT;
			return THREAD_NOT_WAITING;
		}
	}

	/* record what this thread is waiting for before blocking */
	thread->ith_option = option;
	thread->ith_rsize = max_size;
	thread->ith_msize = 0;

	if (option & MACH_PEEK_MSG)
		thread->ith_state = MACH_PEEK_IN_PROGRESS;
	else
		thread->ith_state = MACH_RCV_IN_PROGRESS;

	if (option & MACH_RCV_TIMEOUT)
		clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
	else
		deadline = 0;

	wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
					     IPC_MQUEUE_RECEIVE,
					     interruptible,
					     TIMEOUT_URGENCY_USER_NORMAL,
					     deadline,
					     TIMEOUT_NO_LEEWAY,
					     thread);
	/* preposts should be in flight during wait, not after */
	if (wresult == THREAD_AWAKENED)
		panic("ipc_mqueue_receive_on_thread: sleep walking");

	imq_unlock(mqueue);
	return wresult;
}
/*
 * Routine:	ipc_mqueue_peek_on_thread
 * Purpose:
 *	Satisfy a MACH_PEEK_MSG waiter: give the thread a reference
 *	to the (non-empty) port mqueue via ith_peekq and mark it
 *	MACH_PEEK_READY.  The reference is consumed later via
 *	ipc_mqueue_release_peek_ref().
 * Conditions:
 *	port_mq locked by the caller; at least one message queued.
 *
 * Fix: the forward declaration (top of file) is 'static'; the
 * definition omitted the keyword.  That is legal C (the prior
 * declaration fixes internal linkage) but inconsistent and
 * confusing — declare it 'static' here too.
 */
static void
ipc_mqueue_peek_on_thread(
	ipc_mqueue_t port_mq,
	mach_msg_option_t option,
	thread_t thread)
{
	(void)option;	/* referenced only by the assert below */
	assert(option & MACH_PEEK_MSG);
	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);

	/* take a port reference on the thread's behalf */
	ip_reference_mq(port_mq);
	thread->ith_peekq = port_mq;
	thread->ith_state = MACH_PEEK_READY;
}
/*
 * Routine:	ipc_mqueue_select_on_thread
 * Purpose:
 *	Dequeue the first message from port_mq and hand it to 'thread',
 *	unless it is too large and the thread asked for MACH_RCV_LARGE
 *	(then only size/name info is reported and the message stays).
 * Conditions:
 *	port_mq locked by the caller; at least one message queued.
 *	Results land in the thread's ith_* fields.
 */
void
ipc_mqueue_select_on_thread(
	ipc_mqueue_t port_mq,
	ipc_mqueue_t set_mq,
	mach_msg_option_t option,
	mach_msg_size_t max_size,
	thread_t thread)
{
	ipc_kmsg_t kmsg;
	mach_msg_return_t mr = MACH_MSG_SUCCESS;
	mach_msg_size_t msize;

	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
	assert(kmsg != IKM_NULL);

	/* will the message (plus requested trailer) fit the buffer? */
	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
	if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
		mr = MACH_RCV_TOO_LARGE;
		if (option & MACH_RCV_LARGE) {
			/* report size/name only; leave the message queued */
			thread->ith_receiver_name = port_mq->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = msize;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
	}

	/* dequeue and hand over (even if too large without MACH_RCV_LARGE) */
	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport))
		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
#endif
	ipc_mqueue_release_msgcount(port_mq, set_mq);
	thread->ith_seqno = port_mq->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	current_task()->messages_received++;
	return;
}
/*
 * Routine:	ipc_mqueue_peek_locked
 * Purpose:
 *	Peek at the message queue for the message whose sequence
 *	number matches *seqnop (0 or NULL means "first message"),
 *	returning its seqno/size/id/trailer/kmsg without dequeueing.
 * Conditions:
 *	mq must be a port queue (not a set) and locked by the caller.
 * Returns:
 *	1 if a matching message was found, 0 otherwise.
 */
unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,
		       mach_port_seqno_t * seqnop,
		       mach_msg_size_t * msg_sizep,
		       mach_msg_id_t * msg_idp,
		       mach_msg_max_trailer_t * msg_trailerp,
		       ipc_kmsg_t *kmsgp)
{
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg;
	mach_port_seqno_t seqno, msgoff;
	unsigned res = 0;

	assert(!imq_is_set(mq));

	seqno = 0;
	if (seqnop != NULL)
		seqno = *seqnop;

	if (seqno == 0) {
		/* caller wants the first queued message */
		seqno = mq->imq_seqno;
		msgoff = 0;
	} else if (seqno >= mq->imq_seqno &&
		   seqno < mq->imq_seqno + mq->imq_msgcount) {
		/* requested seqno lies within the queued window */
		msgoff = seqno - mq->imq_seqno;
	} else
		goto out;

	/* walk forward msgoff entries from the queue head */
	kmsgq = &mq->imq_messages;
	kmsg = ipc_kmsg_queue_first(kmsgq);
	while (msgoff-- && kmsg != IKM_NULL) {
		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
	}
	if (kmsg == IKM_NULL)
		goto out;

	/* copy out the requested fields (each output is optional) */
	if (seqnop != NULL)
		*seqnop = seqno;
	if (msg_sizep != NULL)
		*msg_sizep = kmsg->ikm_header->msgh_size;
	if (msg_idp != NULL)
		*msg_idp = kmsg->ikm_header->msgh_id;
	if (msg_trailerp != NULL)
		memcpy(msg_trailerp,
		       (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
						  round_msg(kmsg->ikm_header->msgh_size)),
		       sizeof(mach_msg_max_trailer_t));
	if (kmsgp != NULL)
		*kmsgp = kmsg;

	res = 1;

out:
	return res;
}
/*
 * Routine:	ipc_mqueue_peek
 * Purpose:
 *	Locking wrapper around ipc_mqueue_peek_locked(); see there for
 *	the seqno-matching semantics and output parameters.
 * Returns:
 *	1 if a matching message was found, 0 otherwise.
 */
unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,
		mach_port_seqno_t * seqnop,
		mach_msg_size_t * msg_sizep,
		mach_msg_id_t * msg_idp,
		mach_msg_max_trailer_t * msg_trailerp,
		ipc_kmsg_t *kmsgp)
{
	unsigned found;

	imq_lock(mq);
	found = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
				       msg_trailerp, kmsgp);
	imq_unlock(mq);

	return found;
}
/*
 * Routine:	ipc_mqueue_release_peek_ref
 * Purpose:
 *	Drop the port reference handed out by ipc_mqueue_peek_on_thread()
 *	(via ith_peekq), clearing any prepost first.
 * Conditions:
 *	mq locked on entry; unlocked here before the reference drop
 *	(ip_release_mq may free the port).
 */
void ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
{
	assert(!imq_is_set(mq));
	assert(imq_held(mq));

	waitq_clear_prepost_locked(&mq->imq_wait_queue);
	imq_unlock(mq);
	ip_release_mq(mq);
}
/*
 * Iterator callback for waitq_set_iterate_preposts():
 * break out of the iteration as soon as any preposted member
 * queue holds a message; otherwise keep scanning.
 */
static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
				struct waitq_set *wqset)
{
	ipc_mqueue_t member_mq = (ipc_mqueue_t)waitq;

	(void)ctx;
	(void)wqset;

	return (ipc_kmsg_queue_first(&member_mq->imq_messages) != IKM_NULL)
	       ? WQ_ITERATE_BREAK
	       : WQ_ITERATE_CONTINUE;
}
/*
 * Routine:	ipc_mqueue_set_peek
 * Purpose:
 *	Peek at a message-queue set to see whether any member port
 *	has a message available.
 * Returns:
 *	1 if a member queue has a message (or the set has been
 *	invalidated — an end-of-life event still needs delivering),
 *	0 otherwise.
 */
unsigned
ipc_mqueue_set_peek(ipc_mqueue_t mq)
{
	int ret;

	imq_lock(mq);

	/*
	 * We may have raced with port-set destruction, in which case
	 * the mqueue is marked invalid; report "readable" so the
	 * end-of-life event gets delivered.
	 *
	 * BUG FIX: the original returned here while still holding the
	 * mqueue lock taken above, leaking the lock on the invalid-set
	 * path; unlock before returning, matching every other early
	 * return in this file.
	 */
	if (!imq_is_valid(mq)) {
		imq_unlock(mq);
		return 1;
	}

	ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
					 mqueue_peek_iterator);

	imq_unlock(mq);

	return (ret == WQ_ITERATE_BREAK);
}
/*
 * Routine:	ipc_mqueue_set_gather_member_names
 * Purpose:
 *	Collect the receive-right names (within 'space') of all ports
 *	whose mqueues are members of the given set.  At most 'maxnames'
 *	names are stored in 'names'; *actualp always receives the total
 *	member count found (which may exceed maxnames).
 * Conditions:
 *	Nothing locked; takes the space read lock.
 */
void
ipc_mqueue_set_gather_member_names(
	ipc_space_t space,
	ipc_mqueue_t set_mq,
	ipc_entry_num_t maxnames,
	mach_port_name_t *names,
	ipc_entry_num_t *actualp)
{
	ipc_entry_t table;
	ipc_entry_num_t tsize;
	struct waitq_set *wqset;
	ipc_entry_num_t actual = 0;

	assert(set_mq != IMQ_NULL);
	wqset = &set_mq->imq_set_queue;

	assert(space != IS_NULL);
	is_read_lock(space);
	if (!is_active(space)) {
		is_read_unlock(space);
		goto out;
	}

	if (!waitq_set_is_valid(wqset)) {
		is_read_unlock(space);
		goto out;
	}

	/* scan every entry in the space's table for receive rights */
	table = space->is_table;
	tsize = space->is_table_size;
	for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
		ipc_entry_t entry = &table[idx];

		if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
			__IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
			ipc_mqueue_t mq = &port->ip_messages;

			assert(IP_VALID(port));
			if (ip_active(port) &&
			    waitq_member(&mq->imq_wait_queue, wqset)) {
				/* record the name if the caller's buffer has room */
				if (actual < maxnames)
					names[actual] = mq->imq_receiver_name;
				actual++;
			}
		}
	}

	is_read_unlock(space);

out:
	*actualp = actual;
}
/*
 * Routine:	ipc_mqueue_destroy_locked
 * Purpose:
 *	Destroy a (port) message queue: wake all blocked senders with
 *	THREAD_RESTART, queue every pending message for delayed
 *	destruction, and invalidate the underlying waitq.
 * Conditions:
 *	The mqueue is locked by the caller (and stays locked).
 * Returns:
 *	TRUE if the caller should reap the delayed-destroy kmsg list
 *	(i.e. this call queued the first entry), FALSE otherwise.
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
{
	ipc_kmsg_queue_t kmqueue;
	ipc_kmsg_t kmsg;
	boolean_t reap = FALSE;

	assert(!imq_is_set(mqueue));

	/* rouse all blocked senders — the destination is going away */
	mqueue->imq_fullwaiters = FALSE;
	waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
				  IPC_MQUEUE_FULL,
				  THREAD_RESTART,
				  NULL,
				  WAITQ_ALL_PRIORITIES,
				  WAITQ_KEEP_LOCKED);

	/*
	 * Move every queued message onto the thread's delayed-destroy
	 * list; actual destruction happens later, outside this lock.
	 */
	kmqueue = &mqueue->imq_messages;
	while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
#if MACH_FLIPC
		if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport))
			flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
#endif
		boolean_t first;
		first = ipc_kmsg_delayed_destroy(kmsg);
		if (first)
			reap = first;
	}
	mqueue->imq_msgcount = 0;

	/* invalidate the waitq so future waiters see the destruction */
	waitq_invalidate_locked(&mqueue->imq_wait_queue);
	waitq_clear_prepost_locked(&mqueue->imq_wait_queue);

	assert(mqueue->imq_preposts == 0);
	assert(mqueue->imq_in_pset == 0);
	return reap;
}
/*
 * Routine:	ipc_mqueue_set_qlimit
 * Purpose:
 *	Change a message queue's limit.  When the limit grows, wake up
 *	one blocked sender per newly-available slot, handing each woken
 *	sender a reserved slot (imq_msgcount++).
 * Conditions:
 *	Nothing locked; takes and drops the mqueue lock.
 */
void
ipc_mqueue_set_qlimit(
	ipc_mqueue_t mqueue,
	mach_port_msgcount_t qlimit)
{
	assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	imq_lock(mqueue);
	if (qlimit > mqueue->imq_qlimit) {
		mach_port_msgcount_t i, wakeup;

		/* one wakeup per slot the new limit adds */
		wakeup = qlimit - mqueue->imq_qlimit;
		for (i = 0; i < wakeup; i++) {
			if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
						      IPC_MQUEUE_FULL,
						      THREAD_AWAKENED,
						      NULL,
						      WAITQ_PROMOTE_PRIORITY,
						      WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
				/* ran out of blocked senders */
				mqueue->imq_fullwaiters = FALSE;
				break;
			}
			/* the awakened sender owns this new slot */
			mqueue->imq_msgcount++;
		}
	}
	mqueue->imq_qlimit = qlimit;
	imq_unlock(mqueue);
}
/*
 * Routine:	ipc_mqueue_set_seqno
 * Purpose:
 *	Overwrite the queue's message sequence number, under the
 *	mqueue lock so concurrent receivers see a consistent value.
 */
void
ipc_mqueue_set_seqno(
	ipc_mqueue_t	mqueue,
	mach_port_seqno_t	seqno)
{
	imq_lock(mqueue);
	mqueue->imq_seqno = seqno;
	imq_unlock(mqueue);
}
/*
 * Routine:	ipc_mqueue_copyin
 * Purpose:
 *	Translate 'name' in 'space' to the message queue of the
 *	receive right or port set it denotes, taking a reference on
 *	the underlying object for the caller.
 * Conditions:
 *	Nothing locked; takes the space read lock and the port/pset
 *	lock (the object is returned LOCKED and referenced).
 * Returns:
 *	MACH_MSG_SUCCESS	*objectp referenced, *mqueuep filled in.
 *	MACH_RCV_INVALID_NAME	Space dead, name not found, or name
 *				denotes neither a receive right nor a set.
 */
mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t space,
	mach_port_name_t name,
	ipc_mqueue_t *mqueuep,
	ipc_object_t *objectp)
{
	ipc_entry_t entry;
	ipc_object_t object;
	ipc_mqueue_t mqueue;

	is_read_lock(space);
	if (!is_active(space)) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	entry = ipc_entry_lookup(space, name);
	if (entry == IE_NULL) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	object = entry->ie_object;

	if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
		ipc_port_t port;

		__IGNORE_WCASTALIGN(port = (ipc_port_t) object);
		assert(port != IP_NULL);

		/* lock the port before dropping the space lock */
		ip_lock(port);
		assert(ip_active(port));
		assert(port->ip_receiver_name == name);
		assert(port->ip_receiver == space);
		is_read_unlock(space);
		mqueue = &port->ip_messages;

	} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
		ipc_pset_t pset;

		__IGNORE_WCASTALIGN(pset = (ipc_pset_t) object);
		assert(pset != IPS_NULL);

		ips_lock(pset);
		assert(ips_active(pset));
		is_read_unlock(space);

		mqueue = &pset->ips_messages;
	} else {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	/* take a ref for the caller, then drop the object lock */
	io_reference(object);
	io_unlock(object);

	*objectp = object;
	*mqueuep = mqueue;
	return MACH_MSG_SUCCESS;
}