/***********************************************************************
*                                                                      *
*               This software is part of the ast package               *
*           Copyright (c) 1985-2007 AT&T Knowledge Ventures            *
*                      and is licensed under the                       *
*                  Common Public License, Version 1.0                  *
*                      by AT&T Knowledge Ventures                      *
*                                                                      *
*                A copy of the License is available at                 *
*            http://www.opensource.org/licenses/cpl1.0.txt             *
*         (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9)         *
*                                                                      *
*              Information and Software Systems Research               *
*                            AT&T Research                             *
*                           Florham Park NJ                            *
*                                                                      *
*                             Glenn Fowler                             *
*                              David Korn                              *
*                               Phong Vo                               *
*                                                                      *
***********************************************************************/
#include	"sfdchdr.h"

/*	Discipline to invoke UNIX processes as data filters.
**	These processes must be able to fit in pipelines.
**
**	Written by Kiem-Phong Vo, kpv@research.att.com, 03/18/1998.
*/

/* Per-stream state of the filter discipline.
** The Sfdisc_t member comes first so that a Filter_t* and the Sfdisc_t*
** handed to the callbacks can be cast back and forth.
*/
typedef struct _filter_s
{	Sfdisc_t	disc;		/* discipline structure			*/
	Sfio_t*		filter;		/* stream opened on the filter command	*/
	char*		next;		/* first raw byte not yet written; NIL	*/
					/* once the input has reached eof	*/
	char*		endb;		/* one past the last raw byte		*/
	char		raw[4096];	/* raw data read from the stream	*/
} Filter_t;

/* Read discipline function.
** Raw data is read from stream f into the raw buffer, written to the
** filter, and whatever the filter produces is read into buf.
** Returns the number of bytes placed in buf (> 0), or 0 when a read or
** write on the filter fails to make progress with errno != EAGAIN.
*/
#if __STD_C
static ssize_t filterread(Sfio_t* f, Void_t* buf, size_t n, Sfdisc_t* disc)
#else
static ssize_t filterread(f, buf, n, disc)
Sfio_t*		f;	/* stream reading from */
Void_t*		buf;	/* buffer to read into */
size_t		n;	/* number of bytes requested */
Sfdisc_t*	disc;	/* discipline */
#endif
{
	Filter_t*	filt = (Filter_t*)disc;
	ssize_t		nbytes;		/* count returned by sfrd()/sfwr()	*/
	ssize_t		pending;	/* raw bytes still to be written	*/
	int		ready;		/* result of sfpoll()			*/

	for(;;)
	{
		/* raw buffer is used up: fetch more data to stuff down the pipe */
		if(filt->next && filt->next >= filt->endb)
		{	nbytes = sfrd(f, filt->raw, sizeof(filt->raw), disc);
			if(nbytes > 0)
			{	filt->next = filt->raw;
				filt->endb = filt->raw + nbytes;
			}
			else
			{	/* eof, close write end of pipes.
				** NOTE(review): the descriptor is closed with
				** close() directly rather than through sfio;
				** confirm that the sfclose() done later in
				** filterexcept() cannot close it a second time.
				*/
				sfset(filt->filter, SF_READ, 0);
				close(sffileno(filt->filter));
				sfset(filt->filter, SF_READ, 1);

				/* NIL marks "no more input" for the code below */
				filt->next = filt->endb = NIL(char*);
			}
		}

		/* push pending raw data to the filter if it can take some */
		if(filt->next)
		{	pending = filt->endb - filt->next;
			if(pending > 0)
			{	/* see if pipe is ready for write */
				sfset(filt->filter, SF_READ, 0);
				ready = sfpoll(&filt->filter, 1, 1);
				sfset(filt->filter, SF_READ, 1);

				if(ready == 1)
				{	/* non-blocking write */
					errno = 0;
					nbytes = sfwr(filt->filter, filt->next, pending, 0);
					if(nbytes > 0)
						filt->next += nbytes;
					else if(errno != EAGAIN)
						return 0;
				}
			}
		}

		/* see if pipe is ready for read; the timeout argument is 1
		** while there is still input to feed and -1 after eof
		*/
		sfset(filt->filter, SF_WRITE, 0);
		ready = sfpoll(&filt->filter, 1, filt->next ? 1 : -1);
		sfset(filt->filter, SF_WRITE, 1);

		/* input remains and nothing to read yet: go feed the filter */
		if(filt->next && ready != 1)
			continue;

		/* non-blocking read */
		errno = 0;
		nbytes = sfrd(filt->filter, buf, n, 0);
		if(nbytes > 0)
			return nbytes;
		if(errno != EAGAIN)
			return 0;
	}
}

/* Write discipline function: always fails with -1. */
#if __STD_C
static ssize_t filterwrite(Sfio_t* f, const Void_t* buf, size_t n, Sfdisc_t* disc)
#else
static ssize_t filterwrite(f, buf, n, disc)
Sfio_t*		f;	/* stream writing to */
Void_t*		buf;	/* buffer to write into */
size_t		n;	/* number of bytes requested */
Sfdisc_t*	disc;	/* discipline */
#endif
{
	return -1;
}

/* Seek discipline function.
** For the duration of this discipline, the stream is unseekable,
** so every request is answered with -1.
*/
#if __STD_C
static Sfoff_t filterseek(Sfio_t* f, Sfoff_t addr, int offset, Sfdisc_t* disc)
#else
static Sfoff_t filterseek(f, addr, offset, disc)
Sfio_t*		f;
Sfoff_t		addr;
int		offset;
Sfdisc_t*	disc;
#endif
{
	/* the arguments are deliberately overwritten; none of them is used */
	f = NIL(Sfio_t*);
	addr = 0;
	offset = 0;
	disc = NIL(Sfdisc_t*);

	return (Sfoff_t)(-1);
}

/* Exception discipline function.
** On SF_FINAL or SF_DPOP the filter stream is closed and the discipline
** structure is freed; every other event is ignored. Always returns 0.
*/
#if __STD_C
static int filterexcept(Sfio_t* f, int type, Void_t* data, Sfdisc_t* disc)
#else
static int filterexcept(f,type,data,disc)
Sfio_t*		f;
int		type;
Void_t*		data;
Sfdisc_t*	disc;
#endif
{
	Filter_t*	filt = (Filter_t*)disc;

	if(type != SF_FINAL && type != SF_DPOP)
		return 0;

	sfclose(filt->filter);
	free(disc);
	return 0;
}

/* Install a discipline on stream f that passes its data through the
** command cmd. Returns 0 on success, or -1 if the command stream cannot
** be opened, memory cannot be allocated, or the discipline cannot be
** pushed onto f.
*/
#if __STD_C
int sfdcfilter(Sfio_t* f, const char* cmd)
#else
int sfdcfilter(f, cmd)
Sfio_t*	f;	/* stream to filter data	*/
char*	cmd;	/* program to run as a filter	*/
#endif
{
	Filter_t*	filt;
	Sfio_t*		proc;
	Sfdisc_t*	disc;

	/* open filter for read&write */
	proc = sfpopen(NIL(Sfio_t*), cmd, "r+");
	if(!proc)
		return -1;

	/* unbuffered stream */
	sfsetbuf(proc, NIL(Void_t*), 0);

	filt = (Filter_t*)malloc(sizeof(Filter_t));
	if(!filt)
	{	sfclose(proc);
		return -1;
	}

	filt->disc.readf = filterread;
	filt->disc.writef = filterwrite;
	filt->disc.seekf = filterseek;
	filt->disc.exceptf = filterexcept;
	filt->filter = proc;

	/* next == endb == raw: the first read has to fill the raw buffer */
	filt->next = filt->endb = filt->raw;

	disc = (Sfdisc_t*)filt;
	if(sfdisc(f, disc) == disc)
		return 0;

	/* the discipline was not installed: undo everything */
	sfclose(proc);
	free(filt);
	return -1;
}