#include "pth_p.h"
int pth_usleep(unsigned int usec)
{
pth_time_t until;
pth_time_t offset;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
if (usec == 0)
return 0;
offset = pth_time((long)(usec / 1000000), (long)(usec % 1000000));
pth_time_set(&until, PTH_TIME_NOW);
pth_time_add(&until, &offset);
ev = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key, until);
pth_wait(ev);
return 0;
}
unsigned int pth_sleep(unsigned int sec)
{
pth_time_t until;
pth_time_t offset;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
if (sec == 0)
return 0;
offset = pth_time(sec, 0);
pth_time_set(&until, PTH_TIME_NOW);
pth_time_add(&until, &offset);
ev = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key, until);
pth_wait(ev);
return 0;
}
int pth_sigmask(int how, const sigset_t *set, sigset_t *oset)
{
int rv;
if (set != NULL)
pth_sc(sigprocmask)(how, &(pth_current->mctx.sigs), NULL);
rv = pth_sc(sigprocmask)(how, set, oset);
return rv;
}
int pth_sigwait(const sigset_t *set, int *sigp)
{
return pth_sigwait_ev(set, sigp, NULL);
}
int pth_sigwait_ev(const sigset_t *set, int *sigp, pth_event_t ev_extra)
{
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
sigset_t pending;
int sig;
if (set == NULL || sigp == NULL)
return_errno(EINVAL, EINVAL);
if (sigpending(&pending) < 0)
sigemptyset(&pending);
for (sig = 1; sig < PTH_NSIG; sig++) {
if (sigismember(set, sig) && sigismember(&pending, sig)) {
pth_util_sigdelete(sig);
*sigp = sig;
return 0;
}
}
ev = pth_event(PTH_EVENT_SIGS|PTH_MODE_STATIC, &ev_key, set, sigp);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev))
return_errno(EINTR, EINTR);
}
return 0;
}
pid_t pth_waitpid(pid_t wpid, int *status, int options)
{
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
pid_t pid;
pth_debug2("pth_waitpid: called from thread \"%s\"", pth_current->name);
for (;;) {
while ( (pid = pth_sc(waitpid)(wpid, status, options|WNOHANG)) < 0
&& errno == EINTR) ;
if (pid == -1 || pid > 0 || (pid == 0 && (options & WNOHANG)))
break;
ev = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key, pth_timeout(0,250000));
pth_wait(ev);
}
pth_debug2("pth_waitpid: leave to thread \"%s\"", pth_current->name);
return pid;
}
int pth_system(const char *cmd)
{
struct sigaction sa_ign, sa_int, sa_quit;
sigset_t ss_block, ss_old;
struct stat sb;
pid_t pid;
int pstat;
if (cmd == NULL) {
if (stat(PTH_PATH_BINSH, &sb) == -1)
return 0;
return 1;
}
sa_ign.sa_handler = SIG_IGN;
sigemptyset(&sa_ign.sa_mask);
sa_ign.sa_flags = 0;
sigaction(SIGINT, &sa_ign, &sa_int);
sigaction(SIGQUIT, &sa_ign, &sa_quit);
sigemptyset(&ss_block);
sigaddset(&ss_block, SIGCHLD);
sigprocmask(SIG_BLOCK, &ss_block, &ss_old);
pstat = -1;
switch (pid = pth_fork()) {
case -1:
break;
case 0:
sigaction(SIGINT, &sa_int, NULL);
sigaction(SIGQUIT, &sa_quit, NULL);
sigprocmask(SIG_SETMASK, &ss_old, NULL);
pth_scheduler_kill();
execl(PTH_PATH_BINSH, "sh", "-c", cmd, NULL);
exit(127);
default:
pid = pth_waitpid(pid, &pstat, 0);
break;
}
sigaction(SIGINT, &sa_int, NULL);
sigaction(SIGQUIT, &sa_quit, NULL);
sigprocmask(SIG_SETMASK, &ss_old, NULL);
return (pid == -1 ? -1 : pstat);
}
int pth_select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout)
{
return pth_select_ev(nfds, readfds, writefds, exceptfds, timeout, NULL);
}
int pth_select_ev(int nfd, fd_set *rfds, fd_set *wfds,
fd_set *efds, struct timeval *timeout, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
pth_event_t ev_select;
pth_event_t ev_timeout;
static pth_key_t ev_key_select = PTH_KEY_INIT;
static pth_key_t ev_key_timeout = PTH_KEY_INIT;
fd_set rspare, wspare, espare;
fd_set *rtmp, *wtmp, *etmp;
int selected;
int rc;
pth_implicit_init();
pth_debug2("pth_select_ev: called from thread \"%s\"", pth_current->name);
if (nfd == 0 && rfds == NULL && wfds == NULL && efds == NULL && timeout != NULL) {
if (timeout->tv_sec < 0 || timeout->tv_usec < 0)
return_errno(-1, EINVAL);
if (timeout->tv_sec == 0 && timeout->tv_usec < 500000) {
while ( pth_sc(select)(0, NULL, NULL, NULL, timeout) < 0
&& errno == EINTR) ;
}
else {
ev = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key_timeout,
pth_timeout(timeout->tv_sec, timeout->tv_usec));
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev))
return_errno(-1, EINTR);
}
}
if (rfds != NULL) FD_ZERO(rfds);
if (wfds != NULL) FD_ZERO(wfds);
if (efds != NULL) FD_ZERO(efds);
return 0;
}
delay.tv_sec = 0;
delay.tv_usec = 0;
rtmp = NULL;
if (rfds != NULL) {
rspare = *rfds;
rtmp = &rspare;
}
wtmp = NULL;
if (wfds != NULL) {
wspare = *wfds;
wtmp = &wspare;
}
etmp = NULL;
if (efds != NULL) {
espare = *efds;
etmp = &espare;
}
while ((rc = pth_sc(select)(nfd, rtmp, wtmp, etmp, &delay)) < 0 && errno == EINTR) ;
if (rc > 0) {
if (rfds != NULL)
*rfds = rspare;
if (wfds != NULL)
*wfds = wspare;
if (efds != NULL)
*efds = espare;
return rc;
}
if (rc == 0 && timeout != NULL) {
if (pth_time_cmp(timeout, PTH_TIME_ZERO) == 0) {
if (rfds != NULL) FD_ZERO(rfds);
if (wfds != NULL) FD_ZERO(wfds);
if (efds != NULL) FD_ZERO(efds);
return 0;
}
}
rc = -1;
ev = ev_select = pth_event(PTH_EVENT_SELECT|PTH_MODE_STATIC,
&ev_key_select, &rc, nfd, rfds, wfds, efds);
ev_timeout = NULL;
if (timeout != NULL) {
ev_timeout = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key_timeout,
pth_timeout(timeout->tv_sec, timeout->tv_usec));
pth_event_concat(ev, ev_timeout, NULL);
}
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
selected = FALSE;
pth_event_isolate(ev_select);
if (pth_event_occurred(ev_select))
selected = TRUE;
if (timeout != NULL) {
pth_event_isolate(ev_timeout);
if (pth_event_occurred(ev_timeout)) {
selected = TRUE;
if (rfds != NULL) FD_ZERO(rfds);
if (wfds != NULL) FD_ZERO(wfds);
if (efds != NULL) FD_ZERO(efds);
rc = 0;
}
}
if (ev_extra != NULL && !selected)
return_errno(-1, EINTR);
return rc;
}
int pth_poll(struct pollfd *pfd, nfds_t nfd, int timeout)
{
return pth_poll_ev(pfd, nfd, timeout, NULL);
}
int pth_poll_ev(struct pollfd *pfd, nfds_t nfd, int timeout, pth_event_t ev_extra)
{
fd_set rfds, wfds, efds;
struct timeval tv, *ptv;
int maxfd, rc, ok;
unsigned int i;
char data[64];
pth_implicit_init();
pth_debug2("pth_poll_ev: called from thread \"%s\"", pth_current->name);
if (pfd == NULL)
return_errno(-1, EFAULT);
ptv = &tv;
if (timeout == 0) {
ptv->tv_sec = 0;
ptv->tv_usec = 0;
}
else if (timeout == INFTIM ) {
ptv = NULL;
}
else if (timeout > 0) {
ptv->tv_sec = timeout / 1000;
ptv->tv_usec = (timeout % 1000) * 1000;
}
else
return_errno(-1, EINVAL);
maxfd = -1;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
for(i = 0; i < nfd; i++) {
if (pfd[i].fd < 0)
continue;
if (pfd[i].events & POLLIN)
FD_SET(pfd[i].fd, &rfds);
if (pfd[i].events & POLLOUT)
FD_SET(pfd[i].fd, &wfds);
if (pfd[i].events & POLLPRI)
FD_SET(pfd[i].fd, &efds);
if (pfd[i].fd >= maxfd && (pfd[i].events & (POLLIN|POLLOUT|POLLPRI)))
maxfd = pfd[i].fd;
}
if (maxfd == -1)
return_errno(-1, EINVAL);
rc = pth_select_ev(maxfd+1, &rfds, &wfds, &efds, ptv, ev_extra);
if (rc > 0) {
rc = 0;
for (i = 0; i < nfd; i++) {
ok = 0;
pfd[i].revents = 0;
if (pfd[i].fd < 0) {
pfd[i].revents |= POLLNVAL;
continue;
}
if (FD_ISSET(pfd[i].fd, &rfds)) {
pfd[i].revents |= POLLIN;
ok++;
if (recv(pfd[i].fd, data, 64, MSG_PEEK) == -1) {
if ( errno == ESHUTDOWN || errno == ECONNRESET
|| errno == ECONNABORTED || errno == ENETRESET) {
pfd[i].revents &= ~(POLLIN);
pfd[i].revents |= POLLHUP;
ok--;
}
}
}
if (FD_ISSET(pfd[i].fd, &wfds)) {
pfd[i].revents |= POLLOUT;
ok++;
}
if (FD_ISSET(pfd[i].fd, &efds)) {
pfd[i].revents |= POLLPRI;
ok++;
}
if (ok)
rc++;
}
}
return rc;
}
int pth_connect(int s, const struct sockaddr *addr, socklen_t addrlen)
{
return pth_connect_ev(s, addr, addrlen, NULL);
}
int pth_connect_ev(int s, const struct sockaddr *addr, socklen_t addrlen, pth_event_t ev_extra)
{
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
int rv, err;
socklen_t errlen;
int fdmode;
pth_implicit_init();
pth_debug2("pth_connect_ev: enter from thread \"%s\"", pth_current->name);
fdmode = pth_fdmode(s, PTH_FDMODE_NONBLOCK);
while ( (rv = pth_sc(connect)(s, (struct sockaddr *)addr, addrlen)) == -1
&& errno == EINTR)
;
errno_shield { pth_fdmode(s, fdmode); }
if (rv == -1 && errno == EINPROGRESS) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_WRITEABLE|PTH_MODE_STATIC, &ev_key, s);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev))
return_errno(-1, EINTR);
}
errlen = sizeof(err);
if (getsockopt(s, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen) == -1)
return -1;
if (err == 0)
return 0;
return_errno(rv, err);
}
pth_debug2("pth_connect_ev: leave to thread \"%s\"", pth_current->name);
return rv;
}
int pth_accept(int s, struct sockaddr *addr, socklen_t *addrlen)
{
return pth_accept_ev(s, addr, addrlen, NULL);
}
int pth_accept_ev(int s, struct sockaddr *addr, socklen_t *addrlen, pth_event_t ev_extra)
{
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
int fdmode;
int rv;
pth_implicit_init();
pth_debug2("pth_accept_ev: enter from thread \"%s\"", pth_current->name);
fdmode = pth_fdmode(s, PTH_FDMODE_NONBLOCK);
ev = NULL;
while ((rv = pth_sc(accept)(s, addr, addrlen)) == -1
&& (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (ev == NULL) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, s);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
}
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev)) {
pth_fdmode(s, fdmode);
return_errno(-1, EINTR);
}
}
}
errno_shield {
pth_fdmode(s, fdmode);
if (rv != -1)
pth_fdmode(rv, fdmode);
}
pth_debug2("pth_accept_ev: leave to thread \"%s\"", pth_current->name);
return rv;
}
ssize_t pth_read(int fd, void *buf, size_t nbytes)
{
return pth_read_ev(fd, buf, nbytes, NULL);
}
ssize_t pth_read_ev(int fd, void *buf, size_t nbytes, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
fd_set fds;
int n;
pth_implicit_init();
pth_debug2("pth_read_ev: enter from thread \"%s\"", pth_current->name);
if (nbytes == 0)
return 0;
if (pth_fdmode(fd, PTH_FDMODE_POLL) == PTH_FDMODE_BLOCK) {
FD_ZERO(&fds);
FD_SET(fd, &fds);
delay.tv_sec = 0;
delay.tv_usec = 0;
while ((n = pth_sc(select)(fd+1, &fds, NULL, NULL, &delay)) < 0
&& errno == EINTR) ;
if (n < 1) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, fd);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
n = pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev))
return_errno(-1, EINTR);
}
}
}
while ((n = pth_sc(read)(fd, buf, nbytes)) < 0
&& errno == EINTR) ;
pth_debug2("pth_read_ev: leave to thread \"%s\"", pth_current->name);
return n;
}
ssize_t pth_write(int fd, const void *buf, size_t nbytes)
{
return pth_write_ev(fd, buf, nbytes, NULL);
}
ssize_t pth_write_ev(int fd, const void *buf, size_t nbytes, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
fd_set fds;
int fdmode;
ssize_t rv;
ssize_t s;
int n;
pth_implicit_init();
pth_debug2("pth_write_ev: enter from thread \"%s\"", pth_current->name);
if (nbytes == 0)
return 0;
fdmode = pth_fdmode(fd, PTH_FDMODE_NONBLOCK);
if (fdmode != PTH_FDMODE_NONBLOCK) {
FD_ZERO(&fds);
FD_SET(fd, &fds);
delay.tv_sec = 0;
delay.tv_usec = 0;
while ((n = pth_sc(select)(fd+1, NULL, &fds, NULL, &delay)) < 0
&& errno == EINTR) ;
rv = 0;
for (;;) {
if (n < 1) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_WRITEABLE|PTH_MODE_STATIC, &ev_key, fd);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev)) {
pth_fdmode(fd, fdmode);
return_errno(-1, EINTR);
}
}
}
while ((s = pth_sc(write)(fd, buf, nbytes)) < 0
&& errno == EINTR) ;
if (s > 0)
rv += s;
if (s > 0 && s < (ssize_t)nbytes) {
nbytes -= s;
buf = (void *)((char *)buf + s);
n = 0;
continue;
}
if (s < 0 && rv == 0)
rv = -1;
break;
}
}
else {
while ((rv = pth_sc(write)(fd, buf, nbytes)) < 0
&& errno == EINTR) ;
}
errno_shield { pth_fdmode(fd, fdmode); }
pth_debug2("pth_write_ev: leave to thread \"%s\"", pth_current->name);
return rv;
}
ssize_t pth_readv(int fd, const struct iovec *iov, int iovcnt)
{
return pth_readv_ev(fd, iov, iovcnt, NULL);
}
ssize_t pth_readv_ev(int fd, const struct iovec *iov, int iovcnt, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
fd_set fds;
int n;
pth_implicit_init();
pth_debug2("pth_readv_ev: enter from thread \"%s\"", pth_current->name);
if (iovcnt <= 0 || iovcnt > UIO_MAXIOV)
return_errno(-1, EINVAL);
if (pth_fdmode(fd, PTH_FDMODE_POLL) == PTH_FDMODE_BLOCK) {
FD_ZERO(&fds);
FD_SET(fd, &fds);
delay.tv_sec = 0;
delay.tv_usec = 0;
while ((n = pth_sc(select)(fd+1, &fds, NULL, NULL, &delay)) < 0
&& errno == EINTR) ;
if (n < 1) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, fd);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
n = pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev))
return_errno(-1, EINTR);
}
}
}
#if PTH_FAKE_RWV
while ((n = pth_readv_faked(fd, iov, iovcnt)) < 0
&& errno == EINTR) ;
#else
while ((n = pth_sc(readv)(fd, iov, iovcnt)) < 0
&& errno == EINTR) ;
#endif
pth_debug2("pth_readv_ev: leave to thread \"%s\"", pth_current->name);
return n;
}
intern ssize_t pth_readv_faked(int fd, const struct iovec *iov, int iovcnt)
{
char *buffer;
size_t bytes, copy, rv;
int i;
bytes = 0;
for (i = 0; i < iovcnt; i++) {
if (iov[i].iov_len <= 0)
return_errno((ssize_t)(-1), EINVAL);
bytes += iov[i].iov_len;
}
if (bytes <= 0)
return_errno((ssize_t)(-1), EINVAL);
if ((buffer = (char *)malloc(bytes)) == NULL)
return (ssize_t)(-1);
rv = pth_sc(read)(fd, buffer, bytes);
if (rv > 0) {
bytes = rv;
for (i = 0; i < iovcnt; i++) {
copy = pth_util_min(iov[i].iov_len, bytes);
memcpy(iov[i].iov_base, buffer, copy);
buffer += copy;
bytes -= copy;
if (bytes <= 0)
break;
}
}
errno_shield { free(buffer); }
return(rv);
}
ssize_t pth_writev(int fd, const struct iovec *iov, int iovcnt)
{
return pth_writev_ev(fd, iov, iovcnt, NULL);
}
ssize_t pth_writev_ev(int fd, const struct iovec *iov, int iovcnt, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
fd_set fds;
int fdmode;
struct iovec *liov;
int liovcnt;
size_t nbytes;
ssize_t rv;
ssize_t s;
int n;
pth_implicit_init();
pth_debug2("pth_writev_ev: enter from thread \"%s\"", pth_current->name);
if (iovcnt <= 0 || iovcnt > UIO_MAXIOV)
return_errno(-1, EINVAL);
fdmode = pth_fdmode(fd, PTH_FDMODE_NONBLOCK);
if (fdmode != PTH_FDMODE_NONBLOCK) {
rv = 0;
nbytes = pth_writev_iov_bytes(iov, iovcnt);
liov = NULL;
liovcnt = 0;
pth_writev_iov_advance(iov, iovcnt, 0, &liov, &liovcnt);
FD_ZERO(&fds);
FD_SET(fd, &fds);
delay.tv_sec = 0;
delay.tv_usec = 0;
while ((n = pth_sc(select)(fd+1, NULL, &fds, NULL, &delay)) < 0
&& errno == EINTR) ;
for (;;) {
if (n < 1) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_WRITEABLE|PTH_MODE_STATIC, &ev_key, fd);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev)) {
pth_fdmode(fd, fdmode);
return_errno(-1, EINTR);
}
}
}
#if PTH_FAKE_RWV
while ((s = pth_writev_faked(fd, liov, liovcnt)) < 0
&& errno == EINTR) ;
#else
while ((s = pth_sc(writev)(fd, liov, liovcnt)) < 0
&& errno == EINTR) ;
#endif
if (s > 0)
rv += s;
if (s > 0 && s < (ssize_t)nbytes) {
nbytes -= s;
pth_writev_iov_advance(iov, iovcnt, n, &liov, &liovcnt);
n = 0;
continue;
}
if (s < 0 && rv == 0)
rv = -1;
break;
}
}
else {
#if PTH_FAKE_RWV
while ((rv = pth_writev_faked(fd, iov, iovcnt)) < 0
&& errno == EINTR) ;
#else
while ((rv = pth_sc(writev)(fd, iov, iovcnt)) < 0
&& errno == EINTR) ;
#endif
}
errno_shield { pth_fdmode(fd, fdmode); }
pth_debug2("pth_writev_ev: leave to thread \"%s\"", pth_current->name);
return rv;
}
intern ssize_t pth_writev_iov_bytes(const struct iovec *iov, int iovcnt)
{
ssize_t bytes;
int i;
bytes = 0;
for (i = 0; i < iovcnt; i++) {
if (iov[i].iov_len <= 0)
continue;
bytes += iov[i].iov_len;
}
return bytes;
}
intern void pth_writev_iov_advance(const struct iovec *riov, int riovcnt, size_t advance,
struct iovec **liov, int *liovcnt)
{
static struct iovec siov[UIO_MAXIOV];
int i;
if (*liov == NULL && *liovcnt == 0) {
*liov = (struct iovec *)riov;
*liovcnt = riovcnt;
}
if (advance > 0) {
if (*liov == riov && *liovcnt == riovcnt) {
*liov = &siov[0];
for (i = 0; i < riovcnt; i++) {
siov[i].iov_base = riov[i].iov_base;
siov[i].iov_len = riov[i].iov_len;
}
}
while (*liovcnt > 0 && advance > 0) {
if ((*liov)->iov_len > advance) {
(*liov)->iov_base = (char *)((*liov)->iov_base) + advance;
(*liov)->iov_len -= advance;
break;
}
else {
advance -= (*liov)->iov_len;
(*liovcnt)--;
(*liov)++;
}
}
}
return;
}
intern ssize_t pth_writev_faked(int fd, const struct iovec *iov, int iovcnt)
{
char *buffer, *cp;
size_t bytes, to_copy, copy, rv;
int i;
bytes = 0;
for (i = 0; i < iovcnt; i++) {
if (iov[i].iov_len <= 0)
return_errno((ssize_t)(-1), EINVAL);
bytes += iov[i].iov_len;
}
if (bytes <= 0)
return_errno((ssize_t)(-1), EINVAL);
if ((buffer = (char *)malloc(bytes)) == NULL)
return (ssize_t)(-1);
to_copy = bytes;
cp = buffer;
for (i = 0; i < iovcnt; i++) {
copy = pth_util_min(iov[i].iov_len, to_copy);
memcpy(cp, iov[i].iov_base, copy);
to_copy -= copy;
if (to_copy <= 0)
break;
}
rv = pth_sc(write)(fd, buffer, bytes);
errno_shield { free(buffer); }
return(rv);
}
ssize_t pth_pread(int fd, void *buf, size_t nbytes, off_t offset)
{
static pth_mutex_t mutex = PTH_MUTEX_INIT;
off_t old_offset;
ssize_t rc;
if (!pth_mutex_acquire(&mutex, FALSE, NULL))
return (-1);
if ((old_offset = lseek(fd, 0, SEEK_CUR)) == (off_t)(-1)) {
pth_mutex_release(&mutex);
return (-1);
}
if (lseek(fd, offset, SEEK_SET) == (off_t)(-1)) {
pth_mutex_release(&mutex);
return (-1);
}
rc = pth_read(fd, buf, nbytes);
errno_shield { lseek(fd, old_offset, SEEK_SET); }
pth_mutex_release(&mutex);
return rc;
}
ssize_t pth_pwrite(int fd, const void *buf, size_t nbytes, off_t offset)
{
static pth_mutex_t mutex = PTH_MUTEX_INIT;
off_t old_offset;
ssize_t rc;
if (!pth_mutex_acquire(&mutex, FALSE, NULL))
return (-1);
if ((old_offset = lseek(fd, 0, SEEK_CUR)) == (off_t)(-1)) {
pth_mutex_release(&mutex);
return (-1);
}
if (lseek(fd, offset, SEEK_SET) == (off_t)(-1)) {
pth_mutex_release(&mutex);
return (-1);
}
rc = pth_write(fd, buf, nbytes);
errno_shield { lseek(fd, old_offset, SEEK_SET); }
pth_mutex_release(&mutex);
return rc;
}
ssize_t pth_recv(int s, void *buf, size_t len, int flags)
{
return pth_recv_ev(s, buf, len, flags, NULL);
}
ssize_t pth_recv_ev(int s, void *buf, size_t len, int flags, pth_event_t ev)
{
return pth_recvfrom_ev(s, buf, len, flags, NULL, 0, ev);
}
ssize_t pth_recvfrom(int s, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen)
{
return pth_recvfrom_ev(s, buf, len, flags, from, fromlen, NULL);
}
ssize_t pth_recvfrom_ev(int fd, void *buf, size_t nbytes, int flags, struct sockaddr *from, socklen_t *fromlen, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
fd_set fds;
int n;
pth_implicit_init();
pth_debug2("pth_recvfrom_ev: enter from thread \"%s\"", pth_current->name);
if (nbytes == 0)
return 0;
if (pth_fdmode(fd, PTH_FDMODE_POLL) == PTH_FDMODE_BLOCK) {
FD_ZERO(&fds);
FD_SET(fd, &fds);
delay.tv_sec = 0;
delay.tv_usec = 0;
while ((n = pth_sc(select)(fd+1, &fds, NULL, NULL, &delay)) < 0
&& errno == EINTR) ;
if (n < 1) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, fd);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
n = pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev))
return_errno(-1, EINTR);
}
}
}
while ((n = pth_sc(recvfrom)(fd, buf, nbytes, flags, from, fromlen)) < 0
&& errno == EINTR) ;
pth_debug2("pth_recvfrom_ev: leave to thread \"%s\"", pth_current->name);
return n;
}
ssize_t pth_send(int s, const void *buf, size_t len, int flags)
{
return pth_send_ev(s, buf, len, flags, NULL);
}
ssize_t pth_send_ev(int s, const void *buf, size_t len, int flags, pth_event_t ev)
{
return pth_sendto_ev(s, buf, len, flags, NULL, 0, ev);
}
ssize_t pth_sendto(int s, const void *buf, size_t len, int flags, const struct sockaddr *to, socklen_t tolen)
{
return pth_sendto_ev(s, buf, len, flags, to, tolen, NULL);
}
ssize_t pth_sendto_ev(int fd, const void *buf, size_t nbytes, int flags, const struct sockaddr *to, socklen_t tolen, pth_event_t ev_extra)
{
struct timeval delay;
pth_event_t ev;
static pth_key_t ev_key = PTH_KEY_INIT;
fd_set fds;
int fdmode;
ssize_t rv;
ssize_t s;
int n;
pth_implicit_init();
pth_debug2("pth_sendto_ev: enter from thread \"%s\"", pth_current->name);
if (nbytes == 0)
return 0;
fdmode = pth_fdmode(fd, PTH_FDMODE_NONBLOCK);
if (fdmode != PTH_FDMODE_NONBLOCK) {
FD_ZERO(&fds);
FD_SET(fd, &fds);
delay.tv_sec = 0;
delay.tv_usec = 0;
while ((n = pth_sc(select)(fd+1, NULL, &fds, NULL, &delay)) < 0
&& errno == EINTR) ;
rv = 0;
for (;;) {
if (n < 1) {
ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_WRITEABLE|PTH_MODE_STATIC, &ev_key, fd);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (!pth_event_occurred(ev)) {
pth_fdmode(fd, fdmode);
return_errno(-1, EINTR);
}
}
}
while ((s = pth_sc(sendto)(fd, buf, nbytes, flags, to, tolen)) < 0
&& errno == EINTR) ;
if (s > 0)
rv += s;
if (s > 0 && s < (ssize_t)nbytes) {
nbytes -= s;
buf = (void *)((char *)buf + s);
n = 0;
continue;
}
if (s < 0 && rv == 0)
rv = -1;
break;
}
}
else {
while ((rv = pth_sc(sendto)(fd, buf, nbytes, flags, to, tolen)) < 0
&& errno == EINTR) ;
}
errno_shield { pth_fdmode(fd, fdmode); }
pth_debug2("pth_sendto_ev: leave to thread \"%s\"", pth_current->name);
return rv;
}