/* ntp_iocompletionport.c */
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#if defined (HAVE_IO_COMPLETION_PORT)
#include <stddef.h>
#include <stdio.h>
#include <process.h>
#include "ntp_stdlib.h"
#include "syslog.h"
#include "ntp_machine.h"
#include "ntp_fp.h"
#include "ntp.h"
#include "ntpd.h"
#include "ntp_refclock.h"
#include "ntp_iocompletionport.h"
#include "transmitbuff.h"
/*
 * Completion port created in init_io_completion_port(); refclock handles
 * and sockets are associated with it and the service thread waits on it.
 */
static HANDLE hIoCompletionPort = NULL;
/*
 * Auto-reset event created in init_io_completion_port().  The receive
 * completion handlers set it after queueing a full receive buffer, and
 * get_io_event() hands it out to callers.
 */
static HANDLE WaitableIoEventHandle = NULL;
/*
 * get_io_event - return the event handle that the receive completion
 * handlers signal once a full receive buffer has been queued.
 *
 * The value is NULL until init_io_completion_port() has created the event.
 */
HANDLE
get_io_event()
{
	HANDLE io_event = WaitableIoEventHandle;

	return io_event;
}
/*
 * OnExitRequest - completion handler attached to the exit request packet.
 *
 * None of the arguments are used.  The handler always returns 0, which
 * the service thread takes as the instruction to stop.
 */
static int
OnExitRequest(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
{
	const int stop_thread = 0;

	(void) Key;
	(void) Info;
	(void) Bytes;

	return stop_thread;
}
static IoCompletionInfo ExitRequest;
static void
signal_io_completion_port_exit()
{
ExitRequest.iofunction = OnExitRequest;
if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, &ExitRequest.overlapped)) {
msyslog(LOG_ERR, "Can't request service thread to exit: %m");
exit(1);
}
}
/*
 * iocompletionthread - service thread that drains the I/O completion port.
 *
 * Raises its own priority, then blocks in GetQueuedCompletionStatus() and
 * hands every dequeued packet to the iofunction stored in the packet's
 * IoCompletionInfo.  The thread ends when a handler returns 0 (as
 * OnExitRequest does) or when the port itself can no longer be waited on.
 *
 * NotUsed exists only to satisfy the _beginthread() start-routine
 * signature.
 */
static void __cdecl
iocompletionthread(void *NotUsed)
{
	(void) NotUsed;

	if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) {
		msyslog(LOG_ERR, "Can't set thread priority: %m");
	}

	for (;;) {
		DWORD BytesTransferred = 0;
		/*
		 * NOTE(review): the key carries a pointer squeezed through a
		 * (DWORD) cast when handles are associated with the port; that
		 * is only wide enough on 32-bit builds - confirm the target.
		 */
		DWORD Key = 0;
		LPOVERLAPPED Completed = NULL;
		IoCompletionInfo *Info;
		DWORD Error = NO_ERROR;
		BOOL Dequeued;

		/*
		 * Receive into a real LPOVERLAPPED variable; taking the
		 * address of a cast expression is not valid C.
		 */
		Dequeued = GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, &Completed, INFINITE);
		if (!Dequeued)
			Error = GetLastError();

		if (Completed == NULL) {
			if (Dequeued)
				continue;	/* packet without payload: nothing to dispatch */
			/*
			 * Nothing was dequeued and the wait is infinite, so the
			 * port itself failed.  Looping again would only spin.
			 */
			msyslog(LOG_ERR, "Can't dequeue from I/O completion port: %m");
			return;
		}

		if (!Dequeued) {
			/*
			 * A packet for a failed operation.  Cancelled I/O is left
			 * alone, as before, because the object behind the key may
			 * be going away.
			 */
			if (Error == ERROR_OPERATION_ABORTED)
				continue;
			/*
			 * Report any other failure as a zero-byte completion: the
			 * receive handlers then release the buffer and queue the
			 * next read, and the send handlers release their buffer.
			 */
			BytesTransferred = 0;
		}

		/* Step back from the embedded OVERLAPPED to its container. */
		Info = (IoCompletionInfo *) ((char *) Completed - offsetof(IoCompletionInfo, overlapped));
		if (Info->iofunction == NULL)
			continue;

		if (!Info->iofunction(Key, Info, BytesTransferred)) {
#ifdef DEBUG
			msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m");
#endif
			return;
		}
	}
}
/*
 * init_io_completion_port - set up the completion port machinery.
 *
 * Creates the auto-reset, initially non-signalled I/O event, creates the
 * completion port, and starts the service thread that drains the port.
 * Any failure is logged and ends the process with status 1.
 */
extern void
init_io_completion_port(
	void
	)
{
	WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
	if (WaitableIoEventHandle == NULL) {
		msyslog(LOG_ERR, "Can't create I/O event handle: %m");
		exit(1);
	}

	hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if (hIoCompletionPort == NULL) {
		msyslog(LOG_ERR, "Can't create I/O completion port: %m");
		exit(1);
	}

	/*
	 * _beginthread() reports failure as -1L.  Without the service thread
	 * no completion would ever be processed, so treat it like the other
	 * set-up failures instead of carrying on silently.
	 */
	if (_beginthread(iocompletionthread, 0, NULL) == -1L) {
		msyslog(LOG_ERR, "Can't start I/O completion thread: %m");
		exit(1);
	}
}
/*
 * uninit_io_completion_port - ask the service thread to stop.
 *
 * Does nothing when the completion port was never created.
 */
extern void
uninit_io_completion_port(
	void
	)
{
	if (hIoCompletionPort == NULL)
		return;

	signal_io_completion_port_exit();
}
static int QueueIORead( struct refclockio *rio ) {
struct recvbuf *buff;
buff = get_free_recv_buffer();
if (buff == NULL)
return 0;
buff->fd = rio->fd;
buff->iocompletioninfo.iofunction = OnIoReadComplete;
if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) {
DWORD Result = GetLastError();
switch (Result) {
case NO_ERROR :
case ERROR_HANDLE_EOF :
case ERROR_IO_PENDING :
break ;
default:
msyslog(LOG_ERR, "Can't read from Refclock: %m");
freerecvbuf(buff);
return 0;
}
}
return 1;
}
/*
 * OnIoReadComplete - completion handler for refclock reads.
 *
 * i     completion key; it is the struct refclockio pointer that was
 *       registered for the handle.
 * Info  completion info embedded in the receive buffer that was read into.
 * Bytes number of bytes read.
 *
 * With data present the buffer is timestamped, filled in from the clock's
 * refclockio and queued as a full receive buffer, and the I/O event is
 * set.  An empty completion just releases the buffer.  Either way the next
 * read is queued.  Always returns 1 so the service thread keeps running.
 */
static int
OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
{
	struct refclockio *clock_io = (struct refclockio *) i;
	/* Step back from the embedded completion info to its recvbuf. */
	struct recvbuf *rbuf = (struct recvbuf *) ((char *) Info - offsetof(struct recvbuf, iocompletioninfo));

	if (Bytes == 0) {
		freerecvbuf(rbuf);
	} else {
		get_systime(&rbuf->recv_time);
		rbuf->recv_length = (int) Bytes;
		rbuf->receiver = clock_io->clock_recv;
		rbuf->dstadr = NULL;
		rbuf->recv_srcclock = clock_io->srcclock;
		add_full_recv_buffer(rbuf);

		if (!SetEvent(WaitableIoEventHandle)) {
#ifdef DEBUG
			if (debug > 3) {
				printf( "Error %d setting IoEventHandle\n", GetLastError() );
			}
#endif
		}
	}

	QueueIORead(clock_io);
	return 1;
}
int
io_completion_port_add_clock_io(
struct refclockio *rio
)
{
if (NULL == CreateIoCompletionPort((HANDLE) rio->fd, hIoCompletionPort, (DWORD) rio, 0)) {
msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m");
return 1;
}
QueueIORead( rio );
return 0;
}
/*
 * QueueSocketRecv - start an overlapped receive on a socket.
 *
 * Takes a free receive buffer, tags it with the socket and the
 * OnSocketRecv handler, and issues WSARecvFrom() using the buffer's own
 * wsabuff (which is not set up here).
 *
 * Returns 1 when the receive completed or is pending, 0 when no buffer
 * was available or the receive could not be started.  When it could not
 * be started no completion will arrive for the buffer, so it is released
 * here for every such error rather than only for some of them.
 */
static unsigned long QueueSocketRecv(SOCKET s) {
	struct recvbuf *buff;
	DWORD BytesReceived = 0;
	DWORD Flags = 0;
	DWORD Result;

	buff = get_free_recv_buffer();
	if (buff == NULL)
		return 0;

	buff->iocompletioninfo.iofunction = OnSocketRecv;
	buff->fd = s;
	buff->AddressLength = sizeof(struct sockaddr_in);

	if (SOCKET_ERROR != WSARecvFrom(buff->fd, &buff->wsabuff, 1,
					&BytesReceived, &Flags,
					(struct sockaddr *) &buff->recv_srcadr, &buff->AddressLength,
					&buff->iocompletioninfo.overlapped, NULL))
		return 1;

	Result = WSAGetLastError();
	switch (Result) {
	case NO_ERROR :
	case WSA_IO_INCOMPLETE :
	case WSA_WAIT_IO_COMPLETION :
	case WSA_IO_PENDING :
		/* The receive is outstanding; the completion handler owns the buffer. */
		return 1;

	case WSAENOTSOCK :
		msyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m");
		break;

	case WSAEFAULT :
		msyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
		break;

	default :
		msyslog(LOG_ERR, "Can't queue socket receive: %m");
		break;
	}

	/* The receive never started, so give the buffer back. */
	freerecvbuf(buff);
	return 0;
}
/*
 * OnSocketRecv - completion handler for socket receives.
 *
 * i     completion key; it is the struct interface pointer that was
 *       registered for the socket.
 * Info  completion info embedded in the receive buffer that was filled.
 * Bytes number of bytes received.
 *
 * The buffer is timestamped first.  With data present it is handed to
 * receive() via the full-buffer queue and the I/O event is set; an empty
 * completion releases the buffer.  The next receive on the interface's
 * socket is queued in both cases.  Always returns 1 so the service thread
 * keeps running.
 */
static int
OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
{
	struct interface *ifp = (struct interface *) i;
	/* Step back from the embedded completion info to its recvbuf. */
	struct recvbuf *rbuf = (struct recvbuf *) ((char *) Info - offsetof(struct recvbuf, iocompletioninfo));

	get_systime(&rbuf->recv_time);

	if (Bytes == 0) {
		freerecvbuf(rbuf);
	} else {
		rbuf->recv_length = (int) Bytes;
		rbuf->receiver = receive;
		rbuf->dstadr = ifp;
#ifdef DEBUG
		if (debug > 3) {
			char peer_text[256];
			DWORD peer_text_len = sizeof(peer_text);

			if (0 == WSAAddressToString((struct sockaddr *) &rbuf->recv_srcadr, rbuf->AddressLength, NULL, peer_text, &peer_text_len))
				printf("Received %d bytes from %s\n", Bytes, peer_text);
		}
#endif
		add_full_recv_buffer(rbuf);

		if (!SetEvent(WaitableIoEventHandle)) {
#ifdef DEBUG
			if (debug > 3) {
				printf( "Error %d setting IoEventHandle\n", GetLastError() );
			}
#endif
		}
	}

	QueueSocketRecv(ifp->fd);
	return 1;
}
extern void
io_completion_port_add_socket(struct interface *inter)
{
if (NULL == CreateIoCompletionPort((HANDLE) inter->fd, hIoCompletionPort, (DWORD) inter, 0)) {
msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
}
else QueueSocketRecv(inter->fd);
}
/*
 * OnSendToComplete - completion handler for socket sends.
 *
 * Recovers the transmit buffer that embeds Info and releases it.  Key and
 * Bytes are not used.  Always returns 1 so the service thread keeps
 * running.
 */
static int
OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
{
	char *info_addr = (char *) Info;
	transmitbuf *sent;

	(void) Key;
	(void) Bytes;

	sent = (struct transmitbuf *) (info_addr - offsetof(struct transmitbuf, iocompletioninfo));
	free_transmit_buffer(sent);

	return 1;
}
/*
 * OnWriteComplete - completion handler for handle writes.
 *
 * Recovers the transmit buffer that embeds Info and releases it.  Key and
 * Bytes are not used.  Always returns 1 so the service thread keeps
 * running.
 */
static int
OnWriteComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
{
	char *info_addr = (char *) Info;
	transmitbuf *written;

	(void) Key;
	(void) Bytes;

	written = (struct transmitbuf *) (info_addr - offsetof(struct transmitbuf, iocompletioninfo));
	free_transmit_buffer(written);

	return 1;
}
DWORD
io_completion_port_sendto(
struct interface *inter,
struct pkt *pkt,
int len,
struct sockaddr_in* dest)
{
transmitbuf *buff = NULL;
DWORD Result = ERROR_SUCCESS;
if (len <= sizeof(buff->pkt)) {
buff = get_free_transmit_buffer();
if (buff != NULL) {
DWORD BytesSent = 0;
DWORD Flags = 0;
memcpy(&buff->pkt, pkt, len);
buff->wsabuf.buf = buff->pkt;
buff->wsabuf.len = len;
buff->iocompletioninfo.iofunction = OnSendToComplete;
Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, sizeof(struct sockaddr_in), &buff->iocompletioninfo.overlapped, NULL);
#ifdef DEBUG
if (debug > 2) {
char strbuffer[256];
DWORD strlength = sizeof(strbuffer);
if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength))
printf("SendTo - %d bytes to %s : %d\n", len, strbuffer, Result);
}
#endif
}
else {
#ifdef DEBUG
if (debug)
printf("No more transmit buffers left - data discarded\n");
#endif
return ~ERROR_SUCCESS;
}
}
else {
#ifdef DEBUG
if (debug) printf("Packet too large\n");
#endif
return ~ERROR_SUCCESS;
}
return Result;
}
/*
 * io_completion_port_write - write a block to a handle using overlapped
 * I/O.
 *
 * fd   handle to write to.
 * pkt  data; len bytes are copied into a transmit buffer.
 * len  number of bytes to write; must fit the transmit buffer.
 *
 * Returns len when the write completed or was queued, 0 when WriteFile()
 * failed outright (the Win32 error code is left in place for the caller),
 * and (DWORD) -1 when the data is too large or no transmit buffer is
 * free.
 *
 * The transmit buffer is released by OnWriteComplete once the write
 * completes.  If the write could not be started there will be no
 * completion, so the buffer is released here.
 */
DWORD
io_completion_port_write(
	HANDLE fd,
	char *pkt,
	int len)
{
	transmitbuf *buff = NULL;
	DWORD lpNumberOfBytesWritten;
	DWORD WriteError = NO_ERROR;
	BOOL Written;

	if (len < 0 || (size_t) len > sizeof(buff->pkt)) {
#ifdef DEBUG
		if (debug) printf("Packet too large\n");
#endif
		return (DWORD) -1;
	}

	buff = get_free_transmit_buffer();
	if (buff == NULL) {
#ifdef DEBUG
		if (debug)
			printf("No more transmit buffers left - data discarded\n");
#endif
		return (DWORD) -1;
	}

	memcpy(&buff->pkt, pkt, len);
	buff->iocompletioninfo.iofunction = OnWriteComplete;

	Written = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, &buff->iocompletioninfo.overlapped);
	/* Capture the error before the debug code below can overwrite it. */
	if (!Written)
		WriteError = GetLastError();
#ifdef DEBUG
	if (debug > 2) {
		printf("SendTo - %d bytes %d\n", len, Written);
	}
#endif

	/* An overlapped write that is merely queued still delivers the data. */
	if (Written || WriteError == ERROR_IO_PENDING)
		return len;

	free_transmit_buffer(buff);
	SetLastError(WriteError);	/* keep the code visible to the caller */
	return 0;
}
#else
/*
 * Only definition left when HAVE_IO_COMPLETION_PORT is not defined; as the
 * name says, it keeps this translation unit from being empty.
 */
static int NonEmptyCompilationUnit;
#endif