ntp_worker.h   [plain text]


/*
 * ntp_worker.h
 */

#ifndef NTP_WORKER_H
#define NTP_WORKER_H

#include "ntp_workimpl.h"

#ifdef WORKER
# if defined(WORK_THREAD) && defined(WORK_PIPE)
#  ifdef HAVE_SEMAPHORE_H
#   include <semaphore.h>
#  endif
# endif
# if defined(WORK_DISPATCH)
#  include <dispatch/dispatch.h>
# endif
#include "ntp_stdlib.h"

/* #define TEST_BLOCKING_WORKER */	/* ntp_config.c ntp_intres.c */

typedef enum blocking_work_req_tag {
	BLOCKING_GETNAMEINFO,
	BLOCKING_GETADDRINFO,
} blocking_work_req;

typedef void (*blocking_work_callback)(blocking_work_req, void *, size_t, void *);

typedef enum blocking_magic_sig_e {
	BLOCKING_REQ_MAGIC  = 0x510c7ecf,
	BLOCKING_RESP_MAGIC = 0x510c7e54,
} blocking_magic_sig;

/*
 * The same header is used for both requests to and responses from
 * the child.  In the child, done_func and context are opaque.
 */
typedef struct blocking_pipe_header_tag {
	size_t			octets;
	blocking_magic_sig	magic_sig;
	blocking_work_req	rtype;
	u_int			child_idx;
	blocking_work_callback	done_func;
	void *			context;
} blocking_pipe_header;

# ifdef WORK_THREAD
#  ifdef SYS_WINNT
typedef struct { HANDLE thnd; } thread_type;
typedef struct { HANDLE shnd; } sema_type;
#  else
typedef pthread_t	thread_type;
typedef sem_t		sema_type;
#  endif
typedef thread_type	*thr_ref;
typedef sema_type	*sem_ref;
# endif

/*
 *
 */
#if defined(WORK_FORK)

typedef struct blocking_child_tag {
	int		reusable;
	int		pid;
	int		req_write_pipe;		/* parent */
	int		resp_read_pipe;
	void *		resp_read_ctx;
	int		req_read_pipe;		/* child */
	int		resp_write_pipe;
	int		ispipe;	
	volatile u_int	resp_ready_seen;	/* signal/scan */
	volatile u_int	resp_ready_done;	/* consumer/mainloop */
} blocking_child;

#elif defined(WORK_THREAD)

typedef struct blocking_child_tag {
	/*
	 * blocking workitems and blocking_responses are
	 * dynamically-sized one-dimensional arrays of pointers to
	 * blocking worker requests and responses.
	 *
	 * IMPORTANT: This structure is shared between threads, and all
	 * access that is not atomic (especially queue operations) must
	 * hold the 'accesslock' semaphore to avoid data races.
	 *
	 * The resource management (thread/semaphore
	 * creation/destruction) functions and functions just testing a
	 * handle are safe because these are only changed by the main
	 * thread when no worker is running on the same data structure.
	 */
	int			reusable;
	sem_ref			accesslock;	/* shared access lock */
	thr_ref			thread_ref;	/* thread 'handle' */

	/* the reuest queue */
	blocking_pipe_header ** volatile
				workitems;
	volatile size_t		workitems_alloc;
	size_t			head_workitem;		/* parent */
	size_t			tail_workitem;		/* child */
	sem_ref			workitems_pending;	/* signalling */

	/* the response queue */
	blocking_pipe_header ** volatile
				responses;
	volatile size_t		responses_alloc;
	size_t			head_response;		/* child */
	size_t			tail_response;		/* parent */

	/* event handles / sem_t pointers */
	sem_ref			wake_scheduled_sleep;

	/* some systems use a pipe for notification, others a semaphore.
	 * Both employ the queue above for the actual data transfer.
	 */
#ifdef WORK_PIPE
	int			resp_read_pipe;		/* parent */
	int			resp_write_pipe;	/* child */
	int			ispipe;
	void *			resp_read_ctx;		/* child */
#else
	sem_ref			responses_pending;	/* signalling */
#endif
	volatile u_int		resp_ready_seen;	/* signal/scan */
	volatile u_int		resp_ready_done;	/* consumer/mainloop */
	sema_type		sem_table[4];
	thread_type		thr_table[1];
} blocking_child;

#elif defined(WORK_DISPATCH)
typedef struct blocking_child_tag {
	int			reusable;
	int			req_write_pipe;		
	int			resp_read_pipe;
	int			req_read_pipe;		
	int			resp_write_pipe;
	int			ispipe;
	void *			resp_read_ctx;		
	volatile u_int		resp_ready_seen;	/* signal/scan */
	volatile u_int		resp_ready_done;	/* consumer/mainloop */
	dispatch_queue_t	queue;
	dispatch_semaphore_t	sleep_sem;
} blocking_child;
#endif

/* we need some global tag to indicate any blocking child may be ready: */
extern volatile u_int		blocking_child_ready_seen;/* signal/scan */
extern volatile u_int		blocking_child_ready_done;/* consumer/mainloop */

extern	blocking_child **	blocking_children;
extern	size_t			blocking_children_alloc;
extern	int			worker_per_query;	/* boolean */
extern	int			intres_req_pending;

extern	u_int	available_blocking_child_slot(void);
extern	int	queue_blocking_request(blocking_work_req, void *,
				       size_t, blocking_work_callback,
				       void *);
extern	int	queue_blocking_response(blocking_child *,
					blocking_pipe_header *, size_t,
					const blocking_pipe_header *);
extern	void	process_blocking_resp(blocking_child *);
extern	void	harvest_blocking_responses(void);
extern	int	send_blocking_req_internal(blocking_child *,
					   blocking_pipe_header *,
					   void *);
extern	int	send_blocking_resp_internal(blocking_child *,
					    blocking_pipe_header *);
extern	blocking_pipe_header *
		receive_blocking_req_internal(blocking_child *);
extern	blocking_pipe_header *
		receive_blocking_resp_internal(blocking_child *);
extern	int	blocking_child_common(blocking_child *);
extern	void	exit_worker(int)
			__attribute__ ((__noreturn__));
extern	int	worker_sleep(blocking_child *, time_t);
extern	void	worker_idle_timer_fired(void);
extern	void	interrupt_worker_sleep(void);
extern	int	req_child_exit(blocking_child *);
#ifndef HAVE_IO_COMPLETION_PORT
extern	int	pipe_socketpair(int fds[2], int *is_pipe);
extern	void	close_all_beyond(int);
extern	void	close_all_except(int);
extern	void	kill_asyncio	(int);
#endif

extern void worker_global_lock(int inOrOut);

# ifdef WORK_PIPE
typedef	void	(*addremove_io_fd_func)(int, int, int);
extern	addremove_io_fd_func		addremove_io_fd;
# elif defined(WORK_THREAD)
extern	void	handle_blocking_resp_sem(void *);
typedef	void	(*addremove_io_semaphore_func)(sem_ref, int);
extern	addremove_io_semaphore_func	addremove_io_semaphore;
# endif

# ifdef WORK_FORK
extern	int				worker_process;
# endif

#endif	/* WORKER */

#if defined(HAVE_DROPROOT) && defined(WORK_FORK)
extern void	fork_deferred_worker(void);
#else
# define	fork_deferred_worker()	do {} while (0)
#endif

#endif	/* !NTP_WORKER_H */