#include <sys/systm.h>
#include <sys/fcntl.h>
#include <sys/file_internal.h>
#include <sys/filedesc.h>
#include <sys/kernel.h>
#include <sys/vnode_internal.h>
#include <sys/malloc.h>
#include <sys/mount_internal.h>
#include <sys/param.h>
#include <sys/proc_internal.h>
#include <sys/sysctl.h>
#include <sys/unistd.h>
#include <sys/user.h>
#include <sys/aio_kern.h>
#include <sys/sysproto.h>
#include <machine/limits.h>
#include <mach/mach_types.h>
#include <kern/kern_types.h>
#include <kern/waitq.h>
#include <kern/zalloc.h>
#include <kern/task.h>
#include <kern/sched_prim.h>
#include <vm/vm_map.h>
#include <libkern/OSAtomic.h>
#include <sys/kdebug.h>
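/*
 * kdebug trace point codes for the AIO subsystem, used with
 * BSDDBG_CODE(DBG_BSD_AIO, ...) in the KERNEL_DEBUG calls below.
 * Enable the #if 0 block that follows to trace through
 * KERNEL_DEBUG_CONSTANT on all builds.
 */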
#define AIO_work_queued 1
#define AIO_worker_wake 2
#define AIO_completion_sig 3
#define AIO_completion_cleanup_wait 4
#define AIO_completion_cleanup_wake 5
#define AIO_completion_suspend_wake 6
#define AIO_fsync_delay 7
#define AIO_cancel 10
#define AIO_cancel_async_workq 11
#define AIO_cancel_sync_workq 12
#define AIO_cancel_activeq 13
#define AIO_cancel_doneq 14
#define AIO_fsync 20
#define AIO_read 30
#define AIO_write 40
#define AIO_listio 50
#define AIO_error 60
#define AIO_error_val 61
#define AIO_error_activeq 62
#define AIO_error_workq 63
#define AIO_return 70
#define AIO_return_val 71
#define AIO_return_activeq 72
#define AIO_return_workq 73
#define AIO_exec 80
#define AIO_exit 90
#define AIO_exit_sleep 91
#define AIO_close 100
#define AIO_close_sleep 101
#define AIO_suspend 110
#define AIO_suspend_sleep 111
#define AIO_worker_thread 120
#if 0
#undef KERNEL_DEBUG
#define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
#endif
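/*
 * An async IO work queue: a TAILQ of pending aio_workq_entry requests
 * guarded by its own mutex, plus a waitq that idle worker threads
 * block on until work arrives.
 */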
typedef struct aio_workq {
TAILQ_HEAD(, aio_workq_entry) aioq_entries;
int aioq_count;
lck_mtx_t aioq_mtx;
struct waitq aioq_waitq;
} *aio_workq_t;
#define AIO_NUM_WORK_QUEUES 1
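/*
 * The global anchor: system-wide in-flight, done, and total request
 * counts (maintained with OSAtomic operations) and the array of async
 * work queues. Only a single work queue is configured at present.
 */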
struct aio_anchor_cb
{
volatile int32_t aio_inflight_count;
volatile int32_t aio_done_count;
volatile int32_t aio_total_count;
int aio_num_workqs;
struct aio_workq aio_async_workqs[AIO_NUM_WORK_QUEUES];
};
typedef struct aio_anchor_cb aio_anchor_cb;
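/*
 * Per-lio_listio(2) bookkeeping: the number of requests issued for the
 * batch, the number completed so far, and whether a caller is blocked
 * in LIO_WAIT mode waiting on this context.
 */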
struct aio_lio_context
{
int io_waiter;
int io_issued;
int io_completed;
};
typedef struct aio_lio_context aio_lio_context;
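/*
 * Sleep channels: aio_suspend() sleeps on the address of the per-proc
 * active count; the close and exit cleanup paths sleep on the address
 * of the per-proc total count.
 */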
#define AIO_SUSPEND_SLEEP_CHAN p_aio_active_count
#define AIO_CLEANUP_SLEEP_CHAN p_aio_total_count
#define ASSERT_AIO_FROM_PROC(aiop, theproc) \
	do { \
		if ((aiop)->procp != (theproc)) { \
			panic("AIO on a proc list that does not belong to that proc.\n"); \
		} \
	} while (0)
static void aio_proc_lock(proc_t procp);
static void aio_proc_lock_spin(proc_t procp);
static void aio_proc_unlock(proc_t procp);
static lck_mtx_t* aio_proc_mutex(proc_t procp);
static void aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp);
static void aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp);
static int aio_get_process_count(proc_t procp );
static int aio_active_requests_for_process(proc_t procp );
static int aio_proc_active_requests_for_file(proc_t procp, int fd);
static boolean_t is_already_queued(proc_t procp, user_addr_t aiocbp );
static boolean_t should_cancel(aio_workq_entry *entryp, user_addr_t aiocbp, int fd);
static void aio_entry_lock(aio_workq_entry *entryp);
static void aio_entry_lock_spin(aio_workq_entry *entryp);
static void aio_entry_unlock(aio_workq_entry *entryp);
static aio_workq_t aio_entry_workq(aio_workq_entry *entryp);
static lck_mtx_t* aio_entry_mutex(__unused aio_workq_entry *entryp);
static void aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
static void aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
static void aio_entry_ref_locked(aio_workq_entry *entryp);
static void aio_entry_unref_locked(aio_workq_entry *entryp);
static void aio_entry_ref(aio_workq_entry *entryp);
static void aio_entry_unref(aio_workq_entry *entryp);
static void aio_entry_update_for_cancel(aio_workq_entry *entryp, boolean_t cancelled,
int wait_for_completion, boolean_t disable_notification);
static int aio_entry_try_workq_remove(aio_workq_entry *entryp);
static boolean_t aio_delay_fsync_request( aio_workq_entry *entryp );
static int aio_free_request(aio_workq_entry *entryp);
static void aio_workq_init(aio_workq_t wq);
static void aio_workq_lock_spin(aio_workq_t wq);
static void aio_workq_unlock(aio_workq_t wq);
static lck_mtx_t* aio_workq_mutex(aio_workq_t wq);
static void aio_work_thread( void );
static aio_workq_entry *aio_get_some_work( void );
static int aio_get_all_queues_count( void );
static int aio_queue_async_request(proc_t procp, user_addr_t aiocbp, int kindOfIO );
static int aio_validate( aio_workq_entry *entryp );
static int aio_increment_total_count(void);
static int aio_decrement_total_count(void);
static int do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, int wait_for_completion, boolean_t disable_notification );
static void do_aio_completion( aio_workq_entry *entryp );
static int do_aio_fsync( aio_workq_entry *entryp );
static int do_aio_read( aio_workq_entry *entryp );
static int do_aio_write( aio_workq_entry *entryp );
static void do_munge_aiocb_user32_to_user( struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp );
static void do_munge_aiocb_user64_to_user( struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp );
static int lio_create_entry(proc_t procp,
user_addr_t aiocbp,
void *group_tag,
aio_workq_entry **entrypp );
static aio_workq_entry *aio_create_queue_entry(proc_t procp,
user_addr_t aiocbp,
void *group_tag,
int kindOfIO);
static user_addr_t *aio_copy_in_list(proc_t procp, user_addr_t aiocblist, int nent);
static void free_lio_context(aio_lio_context* context);
static void aio_enqueue_work( proc_t procp, aio_workq_entry *entryp, int proc_locked);
#define ASSERT_AIO_PROC_LOCK_OWNED(p) lck_mtx_assert(aio_proc_mutex((p)), LCK_MTX_ASSERT_OWNED)
#define ASSERT_AIO_WORKQ_LOCK_OWNED(q) lck_mtx_assert(aio_workq_mutex((q)), LCK_MTX_ASSERT_OWNED)
#define ASSERT_AIO_ENTRY_LOCK_OWNED(e) lck_mtx_assert(aio_entry_mutex((e)), LCK_MTX_ASSERT_OWNED)
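/*
 * Locking: per-process AIO state is guarded by the proc's own p_mlock
 * (aio_proc_mutex), each work queue by its aioq_mtx, and all entries
 * share the single global aio_entry_mtx defined below. Where they
 * nest, the proc lock is taken before the work queue or entry lock.
 */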
extern int dofileread(vfs_context_t ctx, struct fileproc *fp,
user_addr_t bufp, user_size_t nbyte,
off_t offset, int flags, user_ssize_t *retval );
extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp,
user_addr_t bufp, user_size_t nbyte, off_t offset,
int flags, user_ssize_t *retval );
#if DEBUG
static uint32_t lio_contexts_alloced = 0;
#endif
extern int aio_max_requests;
extern int aio_max_requests_per_process;
extern int aio_worker_threads;
static aio_anchor_cb aio_anchor;
static lck_grp_t *aio_proc_lock_grp;
static lck_grp_t *aio_entry_lock_grp;
static lck_grp_t *aio_queue_lock_grp;
static lck_attr_t *aio_lock_attr;
static lck_grp_attr_t *aio_lock_grp_attr;
static struct zone *aio_workq_zonep;
static lck_mtx_t aio_entry_mtx;
static lck_mtx_t aio_proc_mtx;
static void
aio_entry_lock(__unused aio_workq_entry *entryp)
{
lck_mtx_lock(&aio_entry_mtx);
}
static void
aio_entry_lock_spin(__unused aio_workq_entry *entryp)
{
lck_mtx_lock_spin(&aio_entry_mtx);
}
static void
aio_entry_unlock(__unused aio_workq_entry *entryp)
{
lck_mtx_unlock(&aio_entry_mtx);
}
static aio_workq_t
aio_entry_workq(__unused aio_workq_entry *entryp)
{
return &aio_anchor.aio_async_workqs[0];
}
static lck_mtx_t*
aio_entry_mutex(__unused aio_workq_entry *entryp)
{
return &aio_entry_mtx;
}
static void
aio_workq_init(aio_workq_t wq)
{
TAILQ_INIT(&wq->aioq_entries);
wq->aioq_count = 0;
lck_mtx_init(&wq->aioq_mtx, aio_queue_lock_grp, aio_lock_attr);
waitq_init(&wq->aioq_waitq, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ);
}
static void
aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
ASSERT_AIO_WORKQ_LOCK_OWNED(queue);
if (entryp->aio_workq_link.tqe_prev == NULL) {
panic("Trying to remove an entry from a work queue, but it is not on a queue\n");
}
TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link);
queue->aioq_count--;
entryp->aio_workq_link.tqe_prev = NULL;
if (queue->aioq_count < 0) {
panic("Negative count on a queue.\n");
}
}
static void
aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
ASSERT_AIO_WORKQ_LOCK_OWNED(queue);
TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link);
if (queue->aioq_count < 0) {
panic("Negative count on a queue.\n");
}
queue->aioq_count++;
}
static void
aio_proc_lock(proc_t procp)
{
lck_mtx_lock(aio_proc_mutex(procp));
}
static void
aio_proc_lock_spin(proc_t procp)
{
lck_mtx_lock_spin(aio_proc_mutex(procp));
}
static void
aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp)
{
ASSERT_AIO_PROC_LOCK_OWNED(procp);
TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link );
TAILQ_INSERT_TAIL( &procp->p_aio_doneq, entryp, aio_proc_link);
procp->p_aio_active_count--;
OSIncrementAtomic(&aio_anchor.aio_done_count);
}
static void
aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp)
{
TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link);
OSDecrementAtomic(&aio_anchor.aio_done_count);
aio_decrement_total_count();
procp->p_aio_total_count--;
}
static void
aio_proc_unlock(proc_t procp)
{
lck_mtx_unlock(aio_proc_mutex(procp));
}
static lck_mtx_t*
aio_proc_mutex(proc_t procp)
{
return &procp->p_mlock;
}
static void
aio_entry_ref_locked(aio_workq_entry *entryp)
{
ASSERT_AIO_ENTRY_LOCK_OWNED(entryp);
if (entryp->aio_refcount < 0) {
panic("AIO workq entry with a negative refcount.\n");
}
entryp->aio_refcount++;
}
static void
aio_entry_unref_locked(aio_workq_entry *entryp)
{
ASSERT_AIO_ENTRY_LOCK_OWNED(entryp);
entryp->aio_refcount--;
if (entryp->aio_refcount < 0) {
panic("AIO workq entry with a negative refcount.\n");
}
}
static void
aio_entry_ref(aio_workq_entry *entryp)
{
aio_entry_lock_spin(entryp);
aio_entry_ref_locked(entryp);
aio_entry_unlock(entryp);
}
static void
aio_entry_unref(aio_workq_entry *entryp)
{
aio_entry_lock_spin(entryp);
aio_entry_unref_locked(entryp);
if ((entryp->aio_refcount == 0) && ((entryp->flags & AIO_DO_FREE) != 0)) {
aio_entry_unlock(entryp);
aio_free_request(entryp);
} else {
aio_entry_unlock(entryp);
}
return;
}
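/*
 * Note: wait_for_completion is not a boolean; it carries the
 * AIO_CLOSE_WAIT or AIO_EXIT_WAIT flag bit (or 0) and is OR'd
 * directly into the entry's flags.
 */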
static void
aio_entry_update_for_cancel(aio_workq_entry *entryp, boolean_t cancelled, int wait_for_completion, boolean_t disable_notification)
{
aio_entry_lock_spin(entryp);
if (cancelled) {
aio_entry_ref_locked(entryp);
entryp->errorval = ECANCELED;
entryp->returnval = -1;
}
if ( wait_for_completion ) {
entryp->flags |= wait_for_completion;
}
if ( disable_notification ) {
entryp->flags |= AIO_DISABLE;
}
aio_entry_unlock(entryp);
}
static int
aio_entry_try_workq_remove(aio_workq_entry *entryp)
{
if (entryp->aio_workq_link.tqe_prev != NULL) {
aio_workq_t queue;
queue = aio_entry_workq(entryp);
aio_workq_lock_spin(queue);
if (entryp->aio_workq_link.tqe_prev != NULL) {
aio_workq_remove_entry_locked(queue, entryp);
aio_workq_unlock(queue);
return 1;
} else {
aio_workq_unlock(queue);
}
}
return 0;
}
static void
aio_workq_lock_spin(aio_workq_t wq)
{
lck_mtx_lock_spin(aio_workq_mutex(wq));
}
static void
aio_workq_unlock(aio_workq_t wq)
{
lck_mtx_unlock(aio_workq_mutex(wq));
}
static lck_mtx_t*
aio_workq_mutex(aio_workq_t wq)
{
return &wq->aioq_mtx;
}
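/*
 * aio_cancel(2) - attempt to cancel one request (aiocbp != NULL, whose
 * aio_fildes must match uap->fd) or every request against uap->fd.
 * On success *retval is AIO_CANCELED, AIO_NOTCANCELED, or AIO_ALLDONE.
 *
 * Illustrative user-space usage (a sketch, not part of this file):
 *
 *	switch (aio_cancel(fd, &cb)) {
 *	case AIO_CANCELED:	// cancelled before it ran
 *		break;
 *	case AIO_NOTCANCELED:	// in flight; poll aio_error(&cb)
 *		break;
 *	case AIO_ALLDONE:	// had already completed
 *		break;
 *	default:		// -1: inspect errno
 *		break;
 *	}
 */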
int
aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval )
{
struct user_aiocb my_aiocb;
int result;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel)) | DBG_FUNC_START,
(int)p, (int)uap->aiocbp, 0, 0, 0 );
if (aio_get_all_queues_count() < 1) {
result = 0;
*retval = AIO_ALLDONE;
goto ExitRoutine;
}
*retval = -1;
if ( uap->aiocbp != USER_ADDR_NULL ) {
if ( proc_is64bit(p) ) {
struct user64_aiocb aiocb64;
result = copyin( uap->aiocbp, &aiocb64, sizeof(aiocb64) );
if (result == 0 )
do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb);
} else {
struct user32_aiocb aiocb32;
result = copyin( uap->aiocbp, &aiocb32, sizeof(aiocb32) );
if ( result == 0 )
do_munge_aiocb_user32_to_user( &aiocb32, &my_aiocb );
}
if ( result != 0 ) {
result = EAGAIN;
goto ExitRoutine;
}
if ( uap->fd != my_aiocb.aio_fildes ) {
result = EBADF;
goto ExitRoutine;
}
}
aio_proc_lock(p);
result = do_aio_cancel_locked( p, uap->fd, uap->aiocbp, 0, FALSE );
ASSERT_AIO_PROC_LOCK_OWNED(p);
aio_proc_unlock(p);
if ( result != -1 ) {
*retval = result;
result = 0;
goto ExitRoutine;
}
result = EBADF;
ExitRoutine:
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel)) | DBG_FUNC_END,
(int)p, (int)uap->aiocbp, result, 0, 0 );
return( result );
}
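/*
 * _aio_close - called when a file descriptor is closed. Cancels what
 * it can and, if a matching request is already being serviced
 * (AIO_NOTCANCELED), sleeps until every active request against this
 * fd has drained.
 */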
__private_extern__ void
_aio_close(proc_t p, int fd )
{
int error;
if (aio_get_all_queues_count() < 1) {
return;
}
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close)) | DBG_FUNC_START,
(int)p, fd, 0, 0, 0 );
aio_proc_lock(p);
error = do_aio_cancel_locked( p, fd, 0, AIO_CLOSE_WAIT, FALSE );
ASSERT_AIO_PROC_LOCK_OWNED(p);
if ( error == AIO_NOTCANCELED ) {
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep)) | DBG_FUNC_NONE,
(int)p, fd, 0, 0, 0 );
while (aio_proc_active_requests_for_file(p, fd) > 0) {
msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0 );
}
}
aio_proc_unlock(p);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close)) | DBG_FUNC_END,
(int)p, fd, 0, 0, 0 );
return;
}
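/*
 * aio_error(2) - report the status of an async request: the saved
 * errorval for an entry on the done queue, EINPROGRESS for one still
 * on the active queue, else EINVAL for an unknown aiocb.
 */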
int
aio_error(proc_t p, struct aio_error_args *uap, int *retval )
{
aio_workq_entry *entryp;
int error;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error)) | DBG_FUNC_START,
(int)p, (int)uap->aiocbp, 0, 0, 0 );
if (aio_get_all_queues_count() < 1) {
return EINVAL;
}
aio_proc_lock(p);
TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) {
if ( entryp->uaiocbp == uap->aiocbp ) {
ASSERT_AIO_FROM_PROC(entryp, p);
aio_entry_lock_spin(entryp);
*retval = entryp->errorval;
error = 0;
aio_entry_unlock(entryp);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val)) | DBG_FUNC_NONE,
(int)p, (int)uap->aiocbp, *retval, 0, 0 );
goto ExitRoutine;
}
}
TAILQ_FOREACH( entryp, &p->p_aio_activeq, aio_proc_link) {
if ( entryp->uaiocbp == uap->aiocbp ) {
ASSERT_AIO_FROM_PROC(entryp, p);
*retval = EINPROGRESS;
error = 0;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq)) | DBG_FUNC_NONE,
(int)p, (int)uap->aiocbp, *retval, 0, 0 );
goto ExitRoutine;
}
}
error = EINVAL;
ExitRoutine:
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error)) | DBG_FUNC_END,
(int)p, (int)uap->aiocbp, error, 0, 0 );
aio_proc_unlock(p);
return( error );
}
int
aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval )
{
int error;
int fsync_kind;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync)) | DBG_FUNC_START,
(int)p, (int)uap->aiocbp, uap->op, 0, 0 );
*retval = 0;
if (uap->op == O_SYNC || uap->op == 0)
fsync_kind = AIO_FSYNC;
else if ( uap->op == O_DSYNC )
fsync_kind = AIO_DSYNC;
else {
*retval = -1;
error = EINVAL;
goto ExitRoutine;
}
error = aio_queue_async_request( p, uap->aiocbp, fsync_kind );
if ( error != 0 )
*retval = -1;
ExitRoutine:
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync)) | DBG_FUNC_END,
(int)p, (int)uap->aiocbp, error, 0, 0 );
return( error );
}
int
aio_read(proc_t p, struct aio_read_args *uap, int *retval )
{
int error;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_read)) | DBG_FUNC_START,
(int)p, (int)uap->aiocbp, 0, 0, 0 );
*retval = 0;
error = aio_queue_async_request( p, uap->aiocbp, AIO_READ );
if ( error != 0 )
*retval = -1;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_read)) | DBG_FUNC_END,
(int)p, (int)uap->aiocbp, error, 0, 0 );
return( error );
}
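/*
 * aio_return(2) - retrieve the return status of a completed request
 * and retire it: the entry is removed from the done queue and freed
 * (or marked AIO_DO_FREE if a reference is still held). Calling this
 * on a still-active request yields EINPROGRESS without consuming it.
 */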
int
aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval )
{
aio_workq_entry *entryp;
int error;
boolean_t proc_lock_held = FALSE;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return)) | DBG_FUNC_START,
(int)p, (int)uap->aiocbp, 0, 0, 0 );
if (aio_get_all_queues_count() < 1) {
error = EINVAL;
goto ExitRoutine;
}
aio_proc_lock(p);
proc_lock_held = TRUE;
*retval = 0;
TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) {
ASSERT_AIO_FROM_PROC(entryp, p);
if ( entryp->uaiocbp == uap->aiocbp ) {
aio_proc_remove_done_locked(p, entryp);
aio_entry_lock(entryp);
aio_proc_unlock(p);
proc_lock_held = FALSE;
*retval = entryp->returnval;
error = 0;
if (entryp->aio_refcount == 0) {
aio_entry_unlock(entryp);
aio_free_request(entryp);
}
else {
entryp->flags |= AIO_DO_FREE;
aio_entry_unlock(entryp);
}
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val)) | DBG_FUNC_NONE,
(int)p, (int)uap->aiocbp, *retval, 0, 0 );
goto ExitRoutine;
}
}
TAILQ_FOREACH( entryp, &p->p_aio_activeq, aio_proc_link) {
ASSERT_AIO_FROM_PROC(entryp, p);
if ( entryp->uaiocbp == uap->aiocbp ) {
error = EINPROGRESS;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq)) | DBG_FUNC_NONE,
(int)p, (int)uap->aiocbp, *retval, 0, 0 );
goto ExitRoutine;
}
}
error = EINVAL;
ExitRoutine:
if (proc_lock_held)
aio_proc_unlock(p);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return)) | DBG_FUNC_END,
(int)p, (int)uap->aiocbp, error, 0, 0 );
return( error );
}
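/*
 * _aio_exec / _aio_exit - teardown on exec(2) and exit(2): cancel or
 * wait out all active requests, then free everything left on the
 * done queue.
 */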
__private_extern__ void
_aio_exec(proc_t p )
{
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exec)) | DBG_FUNC_START,
(int)p, 0, 0, 0, 0 );
_aio_exit( p );
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exec)) | DBG_FUNC_END,
(int)p, 0, 0, 0, 0 );
return;
}
__private_extern__ void
_aio_exit(proc_t p )
{
int error;
aio_workq_entry *entryp;
if (aio_get_all_queues_count() < 1) {
return;
}
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit)) | DBG_FUNC_START,
(int)p, 0, 0, 0, 0 );
aio_proc_lock(p);
error = do_aio_cancel_locked( p, 0, 0, AIO_EXIT_WAIT, TRUE );
ASSERT_AIO_PROC_LOCK_OWNED(p);
if ( error == AIO_NOTCANCELED ) {
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep)) | DBG_FUNC_NONE,
(int)p, 0, 0, 0, 0 );
while (p->p_aio_active_count != 0) {
msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0 );
}
}
if (p->p_aio_active_count != 0) {
panic("Exiting process has %d active AIOs after cancellation has completed.\n", p->p_aio_active_count);
}
entryp = TAILQ_FIRST( &p->p_aio_doneq );
while ( entryp != NULL ) {
ASSERT_AIO_FROM_PROC(entryp, p);
aio_workq_entry *next_entryp;
next_entryp = TAILQ_NEXT( entryp, aio_proc_link);
aio_proc_remove_done_locked(p, entryp);
aio_entry_lock_spin(entryp);
if (entryp->aio_refcount == 0) {
aio_proc_unlock(p);
aio_entry_unlock(entryp);
aio_free_request(entryp);
aio_proc_lock(p);
entryp = TAILQ_FIRST( &p->p_aio_doneq );
continue;
}
else {
entryp->flags |= AIO_DO_FREE;
}
aio_entry_unlock(entryp);
entryp = next_entryp;
}
aio_proc_unlock(p);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit)) | DBG_FUNC_END,
(int)p, 0, 0, 0, 0 );
return;
}
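/*
 * should_cancel - matching rule shared by the cancellation paths: a
 * NULL aiocbp with fd 0 matches every request (the exit path), a
 * non-NULL aiocbp matches exactly that request, and a NULL aiocbp
 * with a non-zero fd matches every request against that descriptor
 * (the close path).
 */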
static boolean_t
should_cancel(aio_workq_entry *entryp, user_addr_t aiocbp, int fd)
{
if ( (aiocbp == USER_ADDR_NULL && fd == 0) ||
(aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) ||
(aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) {
return TRUE;
}
return FALSE;
}
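/*
 * do_aio_cancel_locked - cancel matching requests; called and returns
 * with the proc AIO lock held, though it may drop and retake the lock
 * around completion processing. Returns AIO_CANCELED, AIO_NOTCANCELED
 * if a match was already being serviced, AIO_ALLDONE if a match had
 * already completed, or -1 if nothing matched.
 */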
static int
do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp,
int wait_for_completion, boolean_t disable_notification )
{
ASSERT_AIO_PROC_LOCK_OWNED(p);
aio_workq_entry *entryp;
int result;
result = -1;
entryp = TAILQ_FIRST(&p->p_aio_activeq);
while ( entryp != NULL ) {
ASSERT_AIO_FROM_PROC(entryp, p);
aio_workq_entry *next_entryp;
next_entryp = TAILQ_NEXT( entryp, aio_proc_link);
if (!should_cancel(entryp, aiocbp, fd)) {
entryp = next_entryp;
continue;
}
if (aio_entry_try_workq_remove(entryp) != 0) {
aio_entry_update_for_cancel(entryp, TRUE, 0, disable_notification);
aio_proc_move_done_locked(p, entryp);
aio_proc_unlock(p);
result = AIO_CANCELED;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
do_aio_completion(entryp);
aio_entry_unref(entryp);
aio_proc_lock(p);
if ( aiocbp != USER_ADDR_NULL ) {
return( result );
}
entryp = TAILQ_FIRST(&p->p_aio_activeq);
result = -1;
} else {
result = AIO_NOTCANCELED;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
aio_entry_update_for_cancel(entryp, FALSE, wait_for_completion, disable_notification);
if ( aiocbp != USER_ADDR_NULL ) {
return( result );
}
entryp = next_entryp;
}
}
if ( result == -1 ) {
TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
ASSERT_AIO_FROM_PROC(entryp, p);
if (should_cancel(entryp, aiocbp, fd)) {
result = AIO_ALLDONE;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
if ( aiocbp != USER_ADDR_NULL ) {
return( result );
}
}
}
}
return( result );
}
int
aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval )
{
__pthread_testcancel(1);
return(aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval));
}
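/*
 * aio_suspend(2) - wait until at least one of the listed requests has
 * completed, the optional relative timeout expires (EAGAIN), or a
 * signal arrives (EINTR). Each wakeup loops back to rescan the done
 * queue for one of the caller's aiocbs.
 */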
int
aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval )
{
int error;
int i, count;
uint64_t abstime;
struct user_timespec ts;
aio_workq_entry *entryp;
user_addr_t *aiocbpp;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend)) | DBG_FUNC_START,
(int)p, uap->nent, 0, 0, 0 );
*retval = -1;
abstime = 0;
aiocbpp = NULL;
count = aio_get_all_queues_count( );
if ( count < 1 ) {
error = EINVAL;
goto ExitThisRoutine;
}
if ( uap->nent < 1 || uap->nent > aio_max_requests_per_process ) {
error = EINVAL;
goto ExitThisRoutine;
}
if ( uap->timeoutp != USER_ADDR_NULL ) {
if ( proc_is64bit(p) ) {
struct user64_timespec temp;
error = copyin( uap->timeoutp, &temp, sizeof(temp) );
if ( error == 0 ) {
ts.tv_sec = temp.tv_sec;
ts.tv_nsec = temp.tv_nsec;
}
}
else {
struct user32_timespec temp;
error = copyin( uap->timeoutp, &temp, sizeof(temp) );
if ( error == 0 ) {
ts.tv_sec = temp.tv_sec;
ts.tv_nsec = temp.tv_nsec;
}
}
if ( error != 0 ) {
error = EAGAIN;
goto ExitThisRoutine;
}
if ( ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000 ) {
error = EINVAL;
goto ExitThisRoutine;
}
nanoseconds_to_absolutetime( (uint64_t)ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec,
&abstime );
clock_absolutetime_interval_to_deadline( abstime, &abstime );
}
aiocbpp = aio_copy_in_list(p, uap->aiocblist, uap->nent);
if ( aiocbpp == NULL ) {
error = EAGAIN;
goto ExitThisRoutine;
}
check_for_our_aiocbp:
aio_proc_lock_spin(p);
for ( i = 0; i < uap->nent; i++ ) {
user_addr_t aiocbp;
aiocbp = *(aiocbpp + i);
if ( aiocbp == USER_ADDR_NULL )
continue;
TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) {
ASSERT_AIO_FROM_PROC(entryp, p);
if ( entryp->uaiocbp == aiocbp ) {
aio_proc_unlock(p);
*retval = 0;
error = 0;
goto ExitThisRoutine;
}
}
}
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend_sleep)) | DBG_FUNC_NONE,
(int)p, uap->nent, 0, 0, 0 );
error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p), PCATCH | PWAIT | PDROP, "aio_suspend", abstime);
if ( error == 0 ) {
goto check_for_our_aiocbp;
}
else if ( error == EWOULDBLOCK ) {
error = EAGAIN;
}
else {
error = EINTR;
}
ExitThisRoutine:
if ( aiocbpp != NULL )
FREE( aiocbpp, M_TEMP );
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend)) | DBG_FUNC_END,
(int)p, uap->nent, error, 0, 0 );
return( error );
}
int
aio_write(proc_t p, struct aio_write_args *uap, int *retval )
{
int error;
*retval = 0;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_write)) | DBG_FUNC_START,
(int)p, (int)uap->aiocbp, 0, 0, 0 );
error = aio_queue_async_request( p, uap->aiocbp, AIO_WRITE );
if ( error != 0 )
*retval = -1;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_write)) | DBG_FUNC_END,
(int)p, (int)uap->aiocbp, error, 0, 0 );
return( error );
}
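/*
 * aio_copy_in_list - copy in the caller's array of aiocb pointers,
 * sized per the process ABI. For a 32-bit process the pointers are
 * widened to user_addr_t in place, walking from the end of the array
 * backwards so the expansion never overwrites unread 32-bit entries.
 */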
static user_addr_t *
aio_copy_in_list(proc_t procp, user_addr_t aiocblist, int nent)
{
user_addr_t *aiocbpp;
int i, result;
MALLOC( aiocbpp, user_addr_t *, (nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK );
if ( aiocbpp == NULL )
goto err;
result = copyin( aiocblist, aiocbpp,
proc_is64bit(procp) ? (nent * sizeof(user64_addr_t))
: (nent * sizeof(user32_addr_t)) );
if ( result) {
FREE( aiocbpp, M_TEMP );
aiocbpp = NULL;
goto err;
}
if ( !proc_is64bit(procp) ) {
user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1);
user_addr_t *my_addrp = aiocbpp + (nent - 1);
for (i = 0; i < nent; i++, my_ptrp--, my_addrp--) {
*my_addrp = (user_addr_t) (*my_ptrp);
}
}
err:
return (aiocbpp);
}
static int
aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev)
{
int result = 0;
if (sigp == USER_ADDR_NULL)
goto out;
if ( proc_is64bit(procp) ) {
struct user64_sigevent sigevent64;
result = copyin( sigp, &sigevent64, sizeof(sigevent64) );
if ( result == 0 ) {
sigev->sigev_notify = sigevent64.sigev_notify;
sigev->sigev_signo = sigevent64.sigev_signo;
sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int;
sigev->sigev_notify_function = sigevent64.sigev_notify_function;
sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes;
}
} else {
struct user32_sigevent sigevent32;
result = copyin( sigp, &sigevent32, sizeof(sigevent32) );
if ( result == 0 ) {
sigev->sigev_notify = sigevent32.sigev_notify;
sigev->sigev_signo = sigevent32.sigev_signo;
sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int;
sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
}
}
if ( result != 0 ) {
result = EAGAIN;
}
out:
return (result);
}
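/*
 * aio_enqueue_work - add an entry to the proc's active queue and to
 * the async work queue, then wake one idle worker thread. The #if 0
 * block below is dormant priority queueing based on the proc's nice
 * value and the aiocb's aio_reqprio.
 */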
static void
aio_enqueue_work( proc_t procp, aio_workq_entry *entryp, int proc_locked)
{
#if 0
aio_workq_entry *my_entryp;
#endif
aio_workq_t queue = aio_entry_workq(entryp);
if (proc_locked == 0) {
aio_proc_lock(procp);
}
ASSERT_AIO_PROC_LOCK_OWNED(procp);
TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp, aio_proc_link);
procp->p_aio_active_count++;
procp->p_aio_total_count++;
aio_workq_lock_spin(queue);
aio_workq_add_entry_locked(queue, entryp);
waitq_wakeup64_one(&queue->aioq_waitq, CAST_EVENT64_T(queue),
THREAD_AWAKENED, WAITQ_ALL_PRIORITIES);
aio_workq_unlock(queue);
if (proc_locked == 0) {
aio_proc_unlock(procp);
}
#if 0
entryp->priority = (((2 * NZERO) - 1) - procp->p_nice);
if (entryp->aiocb.aio_reqprio < 0)
entryp->aiocb.aio_reqprio = 0;
if (entryp->aiocb.aio_reqprio > 0) {
entryp->priority -= entryp->aiocb.aio_reqprio;
if (entryp->priority < 0)
entryp->priority = 0;
}
TAILQ_FOREACH(my_entryp, &aio_anchor.aio_async_workq, aio_workq_link) {
if ( entryp->priority <= my_entryp->priority) {
TAILQ_INSERT_BEFORE(my_entryp, entryp, aio_workq_link);
break;
}
}
if (my_entryp == NULL)
TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link );
#endif
}
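/*
 * lio_listio(2) - submit a batch of requests. LIO_WAIT blocks until
 * the whole batch has completed (or a signal interrupts the wait);
 * LIO_NOWAIT returns immediately and delivers the batch sigevent, if
 * one was supplied, when the last request finishes.
 *
 * Illustrative user-space usage (a sketch, not part of this file):
 *
 *	struct aiocb *list[2] = { &cb0, &cb1 };
 *	if (lio_listio(LIO_WAIT, list, 2, NULL) != 0)
 *		err(1, "lio_listio");
 */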
int
lio_listio(proc_t p, struct lio_listio_args *uap, int *retval )
{
int i;
int call_result;
int result;
int old_count;
aio_workq_entry **entryp_listp;
user_addr_t *aiocbpp;
struct user_sigevent aiosigev;
aio_lio_context *lio_context;
boolean_t free_context = FALSE;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_listio)) | DBG_FUNC_START,
(int)p, uap->nent, uap->mode, 0, 0 );
entryp_listp = NULL;
lio_context = NULL;
aiocbpp = NULL;
call_result = -1;
*retval = -1;
if ( !(uap->mode == LIO_NOWAIT || uap->mode == LIO_WAIT) ) {
call_result = EINVAL;
goto ExitRoutine;
}
if ( uap->nent < 1 || uap->nent > AIO_LISTIO_MAX ) {
call_result = EINVAL;
goto ExitRoutine;
}
MALLOC( entryp_listp, void *, (uap->nent * sizeof(aio_workq_entry *)), M_TEMP, M_WAITOK );
if ( entryp_listp == NULL ) {
call_result = EAGAIN;
goto ExitRoutine;
}
MALLOC( lio_context, aio_lio_context*, sizeof(aio_lio_context), M_TEMP, M_WAITOK );
if ( lio_context == NULL ) {
call_result = EAGAIN;
goto ExitRoutine;
}
#if DEBUG
OSIncrementAtomic(&lio_contexts_alloced);
#endif
bzero(lio_context, sizeof(aio_lio_context));
aiocbpp = aio_copy_in_list(p, uap->aiocblist, uap->nent);
if ( aiocbpp == NULL ) {
call_result = EAGAIN;
goto ExitRoutine;
}
bzero(&aiosigev, sizeof(aiosigev));
if (uap->sigp != USER_ADDR_NULL) {
call_result = aio_copy_in_sigev(p, uap->sigp, &aiosigev);
if ( call_result)
goto ExitRoutine;
}
lio_context->io_issued = uap->nent;
lio_context->io_waiter = uap->mode == LIO_WAIT ? 1 : 0;
for ( i = 0; i < uap->nent; i++ ) {
user_addr_t my_aiocbp;
aio_workq_entry *entryp;
*(entryp_listp + i) = NULL;
my_aiocbp = *(aiocbpp + i);
if ( my_aiocbp == USER_ADDR_NULL ) {
aio_proc_lock_spin(p);
lio_context->io_issued--;
aio_proc_unlock(p);
continue;
}
result = lio_create_entry( p, my_aiocbp, lio_context, (entryp_listp + i) );
if ( result != 0 && call_result == -1 )
call_result = result;
entryp = *(entryp_listp + i);
if ( entryp == NULL ) {
aio_proc_lock_spin(p);
lio_context->io_issued--;
aio_proc_unlock(p);
continue;
}
if ( uap->mode == LIO_NOWAIT ) {
entryp->aiocb.aio_sigevent = aiosigev;
} else {
entryp->flags |= AIO_LIO_NOTIFY;
}
old_count = aio_increment_total_count();
aio_proc_lock_spin(p);
if ( old_count >= aio_max_requests ||
aio_get_process_count( entryp->procp ) >= aio_max_requests_per_process ||
is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) {
lio_context->io_issued--;
aio_proc_unlock(p);
aio_decrement_total_count();
if ( call_result == -1 )
call_result = EAGAIN;
aio_free_request(entryp);
entryp_listp[i] = NULL;
continue;
}
lck_mtx_convert_spin(aio_proc_mutex(p));
aio_enqueue_work(p, entryp, 1);
aio_proc_unlock(p);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE,
(int)p, (int)entryp->uaiocbp, 0, 0, 0 );
}
switch(uap->mode) {
case LIO_WAIT:
aio_proc_lock_spin(p);
while (lio_context->io_completed < lio_context->io_issued) {
result = msleep(lio_context, aio_proc_mutex(p), PCATCH | PRIBIO | PSPIN, "lio_listio", 0);
if (result != 0) {
call_result = EINTR;
lio_context->io_waiter = 0;
break;
}
}
if (lio_context->io_completed == lio_context->io_issued) {
free_context = TRUE;
}
aio_proc_unlock(p);
break;
case LIO_NOWAIT:
break;
}
if ( call_result == -1 ) {
call_result = 0;
*retval = 0;
}
ExitRoutine:
if ( entryp_listp != NULL )
FREE( entryp_listp, M_TEMP );
if ( aiocbpp != NULL )
FREE( aiocbpp, M_TEMP );
if ((lio_context != NULL) && ((lio_context->io_issued == 0) || (free_context == TRUE))) {
free_lio_context(lio_context);
}
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_listio)) | DBG_FUNC_END,
(int)p, call_result, 0, 0, 0 );
return( call_result );
}
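/*
 * aio_work_thread - the worker loop. Each pass picks up an entry,
 * switches to the issuing task's vm_map if necessary (so the user
 * buffer addresses in the aiocb resolve correctly), performs the IO,
 * then records the error, moves the entry to the proc's done queue,
 * drops the map reference, and runs completion processing.
 */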
static void
aio_work_thread( void )
{
aio_workq_entry *entryp;
int error;
vm_map_t currentmap;
vm_map_t oldmap = VM_MAP_NULL;
task_t oldaiotask = TASK_NULL;
struct uthread *uthreadp = NULL;
for( ;; ) {
entryp = aio_get_some_work();
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_START,
(int)entryp->procp, (int)entryp->uaiocbp, entryp->flags, 0, 0 );
currentmap = get_task_map( (current_proc())->task );
if ( currentmap != entryp->aio_map ) {
uthreadp = (struct uthread *) get_bsdthread_info(current_thread());
oldaiotask = uthreadp->uu_aio_task;
uthreadp->uu_aio_task = entryp->procp->task;
oldmap = vm_map_switch( entryp->aio_map );
}
if ( (entryp->flags & AIO_READ) != 0 ) {
error = do_aio_read( entryp );
}
else if ( (entryp->flags & AIO_WRITE) != 0 ) {
error = do_aio_write( entryp );
}
else if ( (entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0 ) {
error = do_aio_fsync( entryp );
}
else {
printf( "%s - unknown aio request - flags 0x%02X \n",
__FUNCTION__, entryp->flags );
error = EINVAL;
}
if ( currentmap != entryp->aio_map ) {
(void) vm_map_switch( oldmap );
uthreadp->uu_aio_task = oldaiotask;
}
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_END,
(int)entryp->procp, (int)entryp->uaiocbp, entryp->errorval,
entryp->returnval, 0 );
aio_entry_lock_spin(entryp);
entryp->errorval = error;
aio_entry_unlock(entryp);
aio_proc_lock(entryp->procp);
aio_proc_move_done_locked(entryp->procp, entryp);
aio_proc_unlock(entryp->procp);
OSDecrementAtomic(&aio_anchor.aio_inflight_count);
if ( VM_MAP_NULL != entryp->aio_map ) {
vm_map_t my_map;
my_map = entryp->aio_map;
entryp->aio_map = VM_MAP_NULL;
vm_map_deallocate( my_map );
}
do_aio_completion( entryp );
aio_entry_unref(entryp);
}
}
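/*
 * aio_get_some_work - dequeue the next runnable request. An fsync
 * request is requeued at the tail (see aio_delay_fsync_request) until
 * it is the oldest active request for its process, so queued reads and
 * writes issued before the fsync complete first. With no work
 * available, the thread blocks on the queue's waitq and restarts
 * through aio_work_thread when woken.
 */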
static aio_workq_entry *
aio_get_some_work( void )
{
aio_workq_entry *entryp = NULL;
aio_workq_t queue = NULL;
queue = &aio_anchor.aio_async_workqs[0];
aio_workq_lock_spin(queue);
if (queue->aioq_count == 0) {
goto nowork;
}
for(;;) {
entryp = TAILQ_FIRST(&queue->aioq_entries);
if (entryp == NULL) {
goto nowork;
}
aio_workq_remove_entry_locked(queue, entryp);
aio_workq_unlock(queue);
if ( (entryp->flags & AIO_FSYNC) != 0 ) {
aio_proc_lock_spin(entryp->procp);
if ( aio_delay_fsync_request( entryp ) ) {
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
aio_proc_unlock(entryp->procp);
aio_workq_lock_spin(queue);
aio_workq_add_entry_locked(queue, entryp);
continue;
}
aio_proc_unlock(entryp->procp);
}
break;
}
aio_entry_ref(entryp);
OSIncrementAtomic(&aio_anchor.aio_inflight_count);
return( entryp );
nowork:
waitq_assert_wait64(&queue->aioq_waitq, CAST_EVENT64_T(queue), THREAD_UNINT, 0);
aio_workq_unlock(queue);
thread_block( (thread_continue_t)aio_work_thread );
return NULL;
}
static boolean_t
aio_delay_fsync_request( aio_workq_entry *entryp )
{
if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) {
return FALSE;
}
return TRUE;
}
static aio_workq_entry *
aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, void *group_tag, int kindOfIO)
{
aio_workq_entry *entryp;
int result = 0;
entryp = (aio_workq_entry *) zalloc( aio_workq_zonep );
if ( entryp == NULL ) {
result = EAGAIN;
goto error_exit;
}
bzero( entryp, sizeof(*entryp) );
entryp->procp = procp;
entryp->uaiocbp = aiocbp;
entryp->flags |= kindOfIO;
entryp->group_tag = group_tag;
entryp->aio_map = VM_MAP_NULL;
entryp->aio_refcount = 0;
if ( proc_is64bit(procp) ) {
struct user64_aiocb aiocb64;
result = copyin( aiocbp, &aiocb64, sizeof(aiocb64) );
if (result == 0 )
do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb);
} else {
struct user32_aiocb aiocb32;
result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) );
if ( result == 0 )
do_munge_aiocb_user32_to_user( &aiocb32, &entryp->aiocb );
}
if ( result != 0 ) {
result = EAGAIN;
goto error_exit;
}
entryp->aio_map = get_task_map( procp->task );
vm_map_reference( entryp->aio_map );
result = aio_validate( entryp );
if ( result != 0 )
goto error_exit_with_ref;
entryp->thread = current_thread();
thread_reference( entryp->thread );
return ( entryp );
error_exit_with_ref:
if ( VM_MAP_NULL != entryp->aio_map ) {
vm_map_deallocate( entryp->aio_map );
}
error_exit:
if ( result && entryp != NULL ) {
zfree( aio_workq_zonep, entryp );
entryp = NULL;
}
return ( entryp );
}
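/*
 * aio_queue_async_request - common submission path for aio_read,
 * aio_write, and aio_fsync: enforce the global and per-process request
 * limits, reject an aiocb that is already queued, then hand the new
 * entry to aio_enqueue_work.
 */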
static int
aio_queue_async_request(proc_t procp, user_addr_t aiocbp, int kindOfIO )
{
aio_workq_entry *entryp;
int result;
int old_count;
old_count = aio_increment_total_count();
if (old_count >= aio_max_requests) {
result = EAGAIN;
goto error_noalloc;
}
entryp = aio_create_queue_entry( procp, aiocbp, 0, kindOfIO);
if ( entryp == NULL ) {
result = EAGAIN;
goto error_noalloc;
}
aio_proc_lock_spin(procp);
if ( is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) {
result = EAGAIN;
goto error_exit;
}
if (aio_get_process_count( procp ) >= aio_max_requests_per_process) {
printf("aio_queue_async_request(): too many in flight for proc: %d.\n", procp->p_aio_total_count);
result = EAGAIN;
goto error_exit;
}
lck_mtx_convert_spin(aio_proc_mutex(procp));
aio_enqueue_work(procp, entryp, 1);
aio_proc_unlock(procp);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE,
(int)procp, (int)aiocbp, 0, 0, 0 );
return( 0 );
error_exit:
aio_proc_unlock(procp);
aio_free_request(entryp);
error_noalloc:
aio_decrement_total_count();
return( result );
}
static int
lio_create_entry(proc_t procp, user_addr_t aiocbp, void *group_tag,
aio_workq_entry **entrypp )
{
aio_workq_entry *entryp;
int result;
entryp = aio_create_queue_entry( procp, aiocbp, group_tag, AIO_LIO);
if ( entryp == NULL ) {
result = EAGAIN;
goto error_exit;
}
if ( entryp->aiocb.aio_lio_opcode == LIO_NOP ) {
result = 0;
goto error_exit;
}
*entrypp = entryp;
return( 0 );
error_exit:
if ( entryp != NULL ) {
aio_free_request(entryp);
}
return( result );
}
static int
aio_free_request(aio_workq_entry *entryp)
{
if ( VM_MAP_NULL != entryp->aio_map) {
vm_map_deallocate(entryp->aio_map);
}
if ( NULL != entryp->thread ) {
thread_deallocate( entryp->thread );
}
entryp->aio_refcount = -1;
zfree( aio_workq_zonep, entryp );
return( 0 );
}
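/*
 * aio_validate - sanity-check a request: resolve LIO opcodes into
 * read/write flags, bound the transfer size, restrict the sigevent to
 * SIGEV_NONE or a catchable SIGEV_SIGNAL, and verify the descriptor
 * refers to a vnode opened for the required access.
 */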
static int
aio_validate( aio_workq_entry *entryp )
{
struct fileproc *fp;
int flag;
int result;
result = 0;
if ( (entryp->flags & AIO_LIO) != 0 ) {
if ( entryp->aiocb.aio_lio_opcode == LIO_READ )
entryp->flags |= AIO_READ;
else if ( entryp->aiocb.aio_lio_opcode == LIO_WRITE )
entryp->flags |= AIO_WRITE;
else if ( entryp->aiocb.aio_lio_opcode == LIO_NOP )
return( 0 );
else
return( EINVAL );
}
flag = FREAD;
if ( (entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0 ) {
flag = FWRITE;
}
if ( (entryp->flags & (AIO_READ | AIO_WRITE)) != 0 ) {
if ( entryp->aiocb.aio_nbytes > INT_MAX ||
entryp->aiocb.aio_buf == USER_ADDR_NULL ||
entryp->aiocb.aio_offset < 0 )
return( EINVAL );
}
switch ( entryp->aiocb.aio_sigevent.sigev_notify ) {
case SIGEV_SIGNAL:
{
int signum;
signum = entryp->aiocb.aio_sigevent.sigev_signo;
if ( signum <= 0 || signum >= NSIG ||
signum == SIGKILL || signum == SIGSTOP )
return (EINVAL);
}
break;
case SIGEV_NONE:
break;
case SIGEV_THREAD:
default:
return (EINVAL);
}
proc_fdlock(entryp->procp);
result = fp_lookup( entryp->procp, entryp->aiocb.aio_fildes, &fp , 1);
if ( result == 0 ) {
if ( (fp->f_fglob->fg_flag & flag) == 0 ) {
result = EBADF;
}
else if ( FILEGLOB_DTYPE(fp->f_fglob) != DTYPE_VNODE ) {
result = ESPIPE;
} else
fp->f_flags |= FP_AIOISSUED;
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp , 1);
}
else {
result = EBADF;
}
proc_fdunlock(entryp->procp);
return( result );
}
static int
aio_increment_total_count()
{
return OSIncrementAtomic(&aio_anchor.aio_total_count);
}
static int
aio_decrement_total_count()
{
int old = OSDecrementAtomic(&aio_anchor.aio_total_count);
if (old <= 0) {
panic("Negative total AIO count!\n");
}
return old;
}
static int
aio_get_process_count(proc_t procp )
{
return procp->p_aio_total_count;
}
static int
aio_get_all_queues_count( void )
{
return aio_anchor.aio_total_count;
}
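/*
 * do_aio_completion - post-IO notification: update the lio context
 * and wake any LIO_WAIT caller, deliver the completion signal unless
 * notification was disabled by cancellation, wake the close/exit
 * cleanup paths once their requests have drained, and always wake any
 * aio_suspend() sleepers. Frees the lio context when the last request
 * of a batch completes with no waiter left.
 */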
static void
do_aio_completion( aio_workq_entry *entryp )
{
boolean_t lastLioCompleted = FALSE;
aio_lio_context *lio_context = NULL;
int waiter = 0;
lio_context = (aio_lio_context *)entryp->group_tag;
if (lio_context != NULL) {
aio_proc_lock_spin(entryp->procp);
lio_context->io_completed++;
if (lio_context->io_issued == lio_context->io_completed) {
lastLioCompleted = TRUE;
}
waiter = lio_context->io_waiter;
if ((entryp->flags & AIO_LIO_NOTIFY) && (lastLioCompleted) && (waiter != 0)) {
wakeup(lio_context);
}
aio_proc_unlock(entryp->procp);
}
if ( entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL &&
(entryp->flags & AIO_DISABLE) == 0 ) {
boolean_t performSignal = FALSE;
if (lio_context == NULL) {
performSignal = TRUE;
}
else {
performSignal = lastLioCompleted;
}
if (performSignal) {
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp,
entryp->aiocb.aio_sigevent.sigev_signo, 0, 0 );
psignal( entryp->procp, entryp->aiocb.aio_sigevent.sigev_signo );
}
}
if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) {
panic("Close and exit flags set at the same time\n");
}
if ( (entryp->flags & AIO_EXIT_WAIT) != 0 ) {
int active_requests;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
aio_proc_lock_spin(entryp->procp);
active_requests = aio_active_requests_for_process( entryp->procp );
if ( active_requests < 1 ) {
wakeup_one((caddr_t)&entryp->procp->AIO_CLEANUP_SLEEP_CHAN);
aio_proc_unlock(entryp->procp);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
} else {
aio_proc_unlock(entryp->procp);
}
}
if ( (entryp->flags & AIO_CLOSE_WAIT) != 0 ) {
int active_requests;
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
aio_proc_lock_spin(entryp->procp);
active_requests = aio_proc_active_requests_for_file( entryp->procp, entryp->aiocb.aio_fildes);
if ( active_requests < 1 ) {
wakeup(&entryp->procp->AIO_CLEANUP_SLEEP_CHAN);
aio_proc_unlock(entryp->procp);
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
} else {
aio_proc_unlock(entryp->procp);
}
}
wakeup( (caddr_t) &entryp->procp->AIO_SUSPEND_SLEEP_CHAN );
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake)) | DBG_FUNC_NONE,
(int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
if (lastLioCompleted && (waiter == 0))
free_lio_context (lio_context);
}
static int
do_aio_read( aio_workq_entry *entryp )
{
struct fileproc *fp;
int error;
struct vfs_context context;
if ( (error = fp_lookup(entryp->procp, entryp->aiocb.aio_fildes, &fp , 0)) )
return(error);
if ( (fp->f_fglob->fg_flag & FREAD) == 0 ) {
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
return(EBADF);
}
context.vc_thread = entryp->thread;
context.vc_ucred = fp->f_fglob->fg_cred;
error = dofileread(&context, fp,
entryp->aiocb.aio_buf,
entryp->aiocb.aio_nbytes,
entryp->aiocb.aio_offset, FOF_OFFSET,
&entryp->returnval);
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
return( error );
}
static int
do_aio_write( aio_workq_entry *entryp )
{
struct fileproc *fp;
int error, flags;
struct vfs_context context;
if ( (error = fp_lookup(entryp->procp, entryp->aiocb.aio_fildes, &fp , 0)) )
return(error);
if ( (fp->f_fglob->fg_flag & FWRITE) == 0 ) {
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
return(EBADF);
}
flags = FOF_PCRED;
if ( (fp->f_fglob->fg_flag & O_APPEND) == 0 ) {
flags |= FOF_OFFSET;
}
context.vc_thread = entryp->thread;
context.vc_ucred = fp->f_fglob->fg_cred;
error = dofilewrite(&context,
fp,
entryp->aiocb.aio_buf,
entryp->aiocb.aio_nbytes,
entryp->aiocb.aio_offset,
flags,
&entryp->returnval);
if (entryp->returnval)
fp_drop_written(entryp->procp, entryp->aiocb.aio_fildes, fp);
else
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
return( error );
}
static int
aio_active_requests_for_process(proc_t procp )
{
return( procp->p_aio_active_count );
}
static int
aio_proc_active_requests_for_file(proc_t procp, int fd)
{
int count = 0;
aio_workq_entry *entryp;
TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
if (entryp->aiocb.aio_fildes == fd) {
count++;
}
}
return count;
}
static int
do_aio_fsync( aio_workq_entry *entryp )
{
struct vfs_context context;
struct vnode *vp;
struct fileproc *fp;
int sync_flag;
int error;
if (entryp->flags & AIO_FSYNC)
sync_flag = MNT_WAIT;
else
sync_flag = MNT_DWAIT;
error = fp_getfvp( entryp->procp, entryp->aiocb.aio_fildes, &fp, &vp);
if ( error == 0 ) {
if ( (error = vnode_getwithref(vp)) ) {
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
entryp->returnval = -1;
return(error);
}
context.vc_thread = current_thread();
context.vc_ucred = fp->f_fglob->fg_cred;
error = VNOP_FSYNC( vp, sync_flag, &context);
(void)vnode_put(vp);
fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
}
if ( error != 0 )
entryp->returnval = -1;
return( error );
}
static boolean_t
is_already_queued(proc_t procp,
user_addr_t aiocbp )
{
aio_workq_entry *entryp;
boolean_t result;
result = FALSE;
TAILQ_FOREACH( entryp, &procp->p_aio_doneq, aio_proc_link ) {
if ( aiocbp == entryp->uaiocbp ) {
result = TRUE;
goto ExitThisRoutine;
}
}
TAILQ_FOREACH( entryp, &procp->p_aio_activeq, aio_proc_link ) {
if ( aiocbp == entryp->uaiocbp ) {
result = TRUE;
goto ExitThisRoutine;
}
}
ExitThisRoutine:
return( result );
}
static void
free_lio_context(aio_lio_context* context)
{
#if DEBUG
OSDecrementAtomic(&lio_contexts_alloced);
#endif
FREE( context, M_TEMP );
}
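/*
 * aio_init - called at boot: set up the lock groups and mutexes, the
 * anchor counts, the work queues, the zone used for work queue
 * entries, and the pool of worker threads (sized by the
 * aio_worker_threads tunable).
 */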
__private_extern__ void
aio_init( void )
{
int i;
aio_lock_grp_attr = lck_grp_attr_alloc_init();
aio_proc_lock_grp = lck_grp_alloc_init("aio_proc", aio_lock_grp_attr);
aio_entry_lock_grp = lck_grp_alloc_init("aio_entry", aio_lock_grp_attr);
aio_queue_lock_grp = lck_grp_alloc_init("aio_queue", aio_lock_grp_attr);
aio_lock_attr = lck_attr_alloc_init();
lck_mtx_init(&aio_entry_mtx, aio_entry_lock_grp, aio_lock_attr);
lck_mtx_init(&aio_proc_mtx, aio_proc_lock_grp, aio_lock_attr);
aio_anchor.aio_inflight_count = 0;
aio_anchor.aio_done_count = 0;
aio_anchor.aio_total_count = 0;
aio_anchor.aio_num_workqs = AIO_NUM_WORK_QUEUES;
for (i = 0; i < AIO_NUM_WORK_QUEUES; i++) {
aio_workq_init(&aio_anchor.aio_async_workqs[i]);
}
i = sizeof( aio_workq_entry );
aio_workq_zonep = zinit( i, i * aio_max_requests, i * aio_max_requests, "aiowq" );
_aio_create_worker_threads( aio_worker_threads );
}
__private_extern__ void
_aio_create_worker_threads( int num )
{
int i;
for ( i = 0; i < num; i++ ) {
thread_t myThread;
if ( KERN_SUCCESS != kernel_thread_start((thread_continue_t)aio_work_thread, NULL, &myThread) ) {
printf( "%s - failed to create a work thread \n", __FUNCTION__ );
}
else
thread_deallocate(myThread);
}
return;
}
task_t
get_aiotask(void)
{
return ((struct uthread *)get_bsdthread_info(current_thread()))->uu_aio_task;
}
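/*
 * The munge routines normalize a 32-bit or 64-bit user aiocb into the
 * kernel's common struct user_aiocb so the rest of this file can be
 * word-size agnostic.
 */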
static void
do_munge_aiocb_user32_to_user( struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp )
{
the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
the_user_aiocbp->aio_buf = CAST_USER_ADDR_T(my_aiocbp->aio_buf);
the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;
the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
the_user_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int =
my_aiocbp->aio_sigevent.sigev_value.sival_int;
the_user_aiocbp->aio_sigevent.sigev_notify_function =
CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_function);
the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes);
}
static void
do_munge_aiocb_user64_to_user( struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp )
{
the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
the_user_aiocbp->aio_buf = my_aiocbp->aio_buf;
the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;
the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
the_user_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int =
my_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int;
the_user_aiocbp->aio_sigevent.sigev_notify_function =
my_aiocbp->aio_sigevent.sigev_notify_function;
the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
my_aiocbp->aio_sigevent.sigev_notify_attributes;
}