"""UNIX Process management.
Do NOT use this module directly - use reactor.spawnProcess() instead.
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
"""
import os, sys, traceback, select, errno, struct, cStringIO, types, signal
try:
import pty
except ImportError:
pty = None
try:
import fcntl, termios
except ImportError:
fcntl = None
from twisted.persisted import styles
from twisted.python import log, failure
from twisted.python.util import switchUID
from twisted.internet import protocol
import abstract, main, fdesc, error
from main import CONNECTION_LOST, CONNECTION_DONE
# Registry of child processes that still have to be reaped: maps a child's
# pid to the process object registered for it through
# registerReapProcessHandler().
reapProcessHandlers = {}
def reapAllProcesses():
    """Ask every registered process object to try to reap its child.

    The loop runs over a snapshot of the registry rather than the live
    dictionary, because a handler may unregister itself while being
    reaped.
    """
    snapshot = list(reapProcessHandlers.values())
    for handler in snapshot:
        handler.reapProcess()
def registerReapProcessHandler(pid, process):
    """Register C{process} as the object to notify when child C{pid} exits.

    The child is polled once, without blocking, before anything is stored.
    If it has already exited, C{process.processEnded(status)} is called
    right away and no registry entry is created.  A failure of the poll
    itself is logged and the process is registered anyway.

    @param pid: process id of the child.
    @param process: object providing C{processEnded(status)}.
    @raise RuntimeError: if a handler is already registered for C{pid}.
    """
    if pid in reapProcessHandlers:
        raise RuntimeError
    try:
        reapedPid, exitStatus = os.waitpid(pid, os.WNOHANG)
    except:
        log.msg('Failed to reap %d:' % pid)
        log.err()
        reapedPid = None
    if not reapedPid:
        # Still running (or could not be polled): remember it for later.
        reapProcessHandlers[pid] = process
        return
    process.processEnded(exitStatus)
def unregisterReapProcessHandler(pid, process):
    """Remove the registry entry that maps C{pid} to C{process}.

    @param pid: process id used when the handler was registered.
    @param process: the object expected to be registered under C{pid}.
    @raise RuntimeError: if C{pid} is not registered, or is registered
        to an object that does not compare equal to C{process}.
    """
    if pid not in reapProcessHandlers:
        raise RuntimeError
    if not (reapProcessHandlers[pid] == process):
        raise RuntimeError
    del reapProcessHandlers[pid]
class ProcessWriter(abstract.FileDescriptor):
    """(Internal) Selectable, asynchronous writer for a child's input pipe.

    One instance wraps the parent's end of a single pipe that the child
    reads from (stdin or any other descriptor).
    """

    connected = 1
    ic = 0

    def __init__(self, reactor, proc, name, fileno):
        """Attach to an already-open pipe descriptor.

        @param reactor: passed through to C{abstract.FileDescriptor}.
        @param proc: object told, via C{childConnectionLost(name)}, when
            this pipe has been closed.
        @param name: identifier handed back to C{proc}.
        @param fileno: the descriptor to write to.
        """
        abstract.FileDescriptor.__init__(self, reactor)
        self.fd = fileno
        self.name = name
        self.proc = proc

    def fileno(self):
        """Return the descriptor this writer writes to."""
        return self.fd

    def writeSomeData(self, data):
        """Write as much of C{data} as the pipe accepts right now.

        @return: the number of bytes written, C{0} when the pipe would
            block (EAGAIN), or C{CONNECTION_LOST} when the pipe is broken.
        """
        try:
            written = os.write(self.fd, data)
            if written == len(data):
                # Everything was flushed, so go back to watching for
                # readability (see doRead for what that signals).
                self.startReading()
            return written
        except IOError:
            # The exception is fetched through sys.exc_info() so that no
            # version-specific "except ... <name>" syntax is needed.
            failureInfo = sys.exc_info()[1]
            if failureInfo.args[0] == errno.EAGAIN:
                return 0
            return CONNECTION_LOST
        except OSError:
            failureInfo = sys.exc_info()[1]
            if failureInfo.errno == errno.EPIPE:
                return CONNECTION_LOST
            if failureInfo.errno == errno.EAGAIN:
                return 0
            raise

    def write(self, data):
        """Queue C{data} for the child; stop watching for readability
        until it has been flushed (see writeSomeData)."""
        self.stopReading()
        abstract.FileDescriptor.write(self, data)

    def doRead(self):
        """Detect that the child closed its end of the pipe.

        The only way this pipe can become readable is at EOF, because the
        child has closed it.

        @return: C{CONNECTION_LOST} when a zero-timeout select() reports
            the descriptor both readable and writable, otherwise C{None}.
        """
        pipeFD = self.fd
        readable, writable, _ = select.select([pipeFD], [pipeFD], [], 0)
        if readable and writable:
            return CONNECTION_LOST

    def connectionLost(self, reason):
        """Close the descriptor and notify the owning process object.

        See abstract.FileDescriptor.connectionLost.
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.proc.childConnectionLost(self.name)
class ProcessReader(abstract.FileDescriptor):
    """Selectable representation of one of a child's output pipes.

    One instance wraps the parent's end of a single pipe that the child
    writes to, such as stdout or stderr.  Received data is forwarded to
    C{self.proto}, an attribute that is not set by C{__init__} and must
    be assigned by whoever creates the reader.
    """

    def __init__(self, reactor, proc, name, fileno):
        """Attach to an already-open pipe descriptor.

        @param reactor: passed through to C{abstract.FileDescriptor}.
        @param proc: object told, via C{childConnectionLost(name)}, when
            this pipe has been closed.
        @param name: identifier reported together with received data.
        @param fileno: the descriptor to read from.
        """
        abstract.FileDescriptor.__init__(self, reactor)
        self.fd = fileno
        self.name = name
        self.proc = proc

    def fileno(self):
        """Return the descriptor this reader reads from."""
        return self.fd

    def writeSomeData(self, data):
        """A reader has nothing to send: only the empty string is accepted,
        and the pipe is then reported as lost."""
        assert data == ""
        return CONNECTION_LOST

    def doRead(self):
        """Called when the pipe becomes readable; hand whatever is
        available to dataReceived."""
        return fdesc.readFromFD(self.fd, self.dataReceived)

    def dataReceived(self, data):
        """Forward C{data} to the protocol, tagged with this pipe's name."""
        self.proto.childDataReceived(self.name, data)

    def connectionLost(self, reason):
        """Close my end of the pipe and signal the Process (which signals
        the ProcessProtocol)."""
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.proc.childConnectionLost(self.name)
class ProcessExitedAlready(Exception):
    """The process has already exited, and the operation requested can no
    longer be performed."""
class Process(styles.Ephemeral):
    """An operating-system Process.

    This represents an operating-system process with arbitrary input/output
    pipes connected to it. Those pipes may represent standard input,
    standard output, and standard error, or any other file descriptor.

    On UNIX, this is implemented using fork(), exec(), pipe()
    and fcntl(). These calls may not exist elsewhere so this
    code is not cross-platform. (also, windows can only select
    on sockets...)
    """

    # Set to True to trace descriptor bookkeeping in the parent (stdout)
    # and in the forked child (stderr) respectively.
    debug = False
    debug_child = False

    def __init__(self, reactor, command, args, environment, path, proto,
                 uid=None, gid=None, childFDs=None):
        """Spawn an operating-system process.

        This is where the hard work of disconnecting all currently open
        files / forking / executing the new process happens.  (This is
        executed automatically when a Process is instantiated.)

        This will also run the subprocess as a given user ID and group ID, if
        specified.  (Implementation Note: this doesn't support all the arcane
        nuances of setXXuid on UNIX: it will assume that either your effective
        or real UID is 0.)

        @param childFDs: a dictionary mapping
            fd_in_child -> current_fd_in_parent/'r'/'w'

            If the value is a number, it specifies one of the parent's fds
            that will be remapped to the child's fd. This is useful for
            things like inetd and shell-like file redirection.

            If it is the string 'r', a pipe will be created and attached to
            the child at that fd number, and the parent will be able to
            read from the pipe. This is useful for the child's stdout and
            stderr.

            If it is the string 'w', a pipe will be created and attached,
            and the parent will be able to write into that pipe. This is
            useful for the child's stdin.

            If childFDs is not passed, the default behaviour is to use a
            mapping that opens the usual stdin/stdout/stderr pipes.
        """
        # Resolve the default mapping before anything inspects it; the
        # sanity check below would otherwise call .values() on None.
        if childFDs is None:
            childFDs = {0: "w", 1: "r", 2: "r"}
        if not proto:
            # Without a protocol nothing could service parent-side pipes.
            assert 'r' not in childFDs.values() and 'w' not in childFDs.values()
        self.lostProcess = False

        settingUID = (uid is not None) or (gid is not None)
        if settingUID:
            curegid = os.getegid()
            currgid = os.getgid()
            cureuid = os.geteuid()
            curruid = os.getuid()
            if uid is None:
                uid = cureuid
            if gid is None:
                gid = curegid
            # Become root so the child is able to switch to uid/gid; the
            # parent's ids are restored below, after the fork.
            os.setuid(0)
            os.setgid(0)

        self.pipes = {}
        helpers = {}    # childFD -> parent's end of a freshly created pipe
        fdmap = {}      # childFD -> parent fd the child should end up with
        debug = self.debug
        if debug:
            sys.stdout.write("childFDs %s\n" % (childFDs,))

        createdFDs = []     # every pipe end opened here, for error cleanup
        try:
            for childFD, target in childFDs.items():
                if debug:
                    sys.stdout.write("[%d] %s\n" % (childFD, target))
                if target == "r":
                    # The child writes, the parent reads.
                    readFD, writeFD = os.pipe()
                    createdFDs.extend([readFD, writeFD])
                    if debug:
                        sys.stdout.write("readFD=%d, writeFD=%d\n"
                                         % (readFD, writeFD))
                    fdmap[childFD] = writeFD
                    helpers[childFD] = readFD
                    fdesc.setNonBlocking(readFD)
                elif target == "w":
                    # The parent writes, the child reads.
                    readFD, writeFD = os.pipe()
                    createdFDs.extend([readFD, writeFD])
                    if debug:
                        sys.stdout.write("readFD=%d, writeFD=%d\n"
                                         % (readFD, writeFD))
                    fdmap[childFD] = readFD
                    helpers[childFD] = writeFD
                    fdesc.setNonBlocking(writeFD)
                else:
                    assert isinstance(target, int), \
                        '%r should be an int' % (target,)
                    fdmap[childFD] = target
            if debug:
                sys.stdout.write("fdmap %s\n" % (fdmap,))
                sys.stdout.write("helpers %s\n" % (helpers,))

            self.pid = os.fork()
        except:
            # Only the parent can get here (a successful fork does not
            # raise).  Release the pipes and give back the original ids
            # before letting the error propagate.
            for fd in createdFDs:
                try:
                    os.close(fd)
                except OSError:
                    pass
            if settingUID:
                os.setregid(currgid, curegid)
                os.setreuid(curruid, cureuid)
            raise

        if self.pid == 0:
            # Child process.  Control must never leave this branch: either
            # exec replaces the process image or os._exit() terminates it.
            try:
                # Stop debugging; a pdb attached to the parent would
                # otherwise get confused by the child.
                sys.settrace(None)
                self._setupChild(fdmap)
                self._execChild(path, settingUID, uid, gid,
                                command, args, environment)
            except:
                # Best effort: describe the failure on fd 2.  Nothing that
                # goes wrong here may prevent the os._exit() below.
                try:
                    stderr = os.fdopen(2, 'w')
                    stderr.write("Upon execvpe %s %s in environment %s:\n" %
                                 (command, str(args),
                                  "id %s" % id(environment)))
                    traceback.print_exc(file=stderr)
                    stderr.flush()
                    for fd in range(3):
                        os.close(fd)
                except:
                    pass
            os._exit(1)

        # Parent process from here on.
        if settingUID:
            os.setregid(currgid, curegid)
            os.setreuid(curruid, cureuid)
        self.status = -1    # -1 means "no exit status collected yet"

        for childFD, parentFD in helpers.items():
            # The child's end of the pipe is of no use to the parent.
            os.close(fdmap[childFD])
            if childFDs[childFD] == "r":
                reader = ProcessReader(reactor, self, childFD, parentFD)
                reader.proto = proto
                self.pipes[childFD] = reader
                reader.startReading()
            if childFDs[childFD] == "w":
                writer = ProcessWriter(reactor, self, childFD, parentFD)
                writer.proto = proto
                self.pipes[childFD] = writer
                # A writer watches for readability to notice the child
                # closing its end (see ProcessWriter.doRead).
                writer.startReading()

        self.proto = proto
        try:
            if self.proto is not None:
                self.proto.makeConnection(self)
        except:
            log.err()
        registerReapProcessHandler(self.pid, self)

    def _setupChild(self, fdmap):
        """Rearrange the child's descriptors according to C{fdmap}.

        fdmap[childFD] = parentFD

        The child wants to end up with 'childFD' attached to what used to be
        the parent's parentFD. As an example, a bash command run like
        'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}.
        'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}.

        Step 1: close all file descriptors that aren't values of fdmap.
        This means 0 .. maxfds.

        Step 2: for each childFD:
         - if fdmap[childFD] == childFD, the descriptor is already in place.
           Make sure the CLOEXEC flag is not set.
         - if childFD is in fdmap.values(), then the target descriptor is
           busy. Use os.dup() to move it elsewhere, update all fdmap[childFD]
           items that point to it, then close the original. Then fall through
           to the next case.
         - now childFD is free. Use os.dup2() to copy fdmap[childFD] onto it.

        Step 3: close every descriptor that appears among the values of
        fdmap but is not itself one of the child's descriptors.
        """
        debug = self.debug_child
        errfd = None
        if debug:
            errfd = sys.stderr
            errfd.write("starting _setupChild\n")

        destList = list(fdmap.values())
        try:
            import resource
            maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1
            # OS-X reports 9223372036854775808. That's a lot of fds to close.
            if maxfds > 1024:
                maxfds = 1024
        except:
            maxfds = 256

        # Step 1: drop everything the child has no use for.
        for fd in range(maxfds):
            if fd in destList:
                continue
            if debug and fd == errfd.fileno():
                continue
            try:
                os.close(fd)
            except OSError:
                pass

        if debug:
            errfd.write("fdmap %s\n" % (fdmap,))

        # Step 2: move descriptors into place, lowest child fd first.
        childlist = list(fdmap.keys())
        childlist.sort()
        for child in childlist:
            target = fdmap[child]
            if target == child:
                if debug:
                    errfd.write("%d already in place\n" % target)
                if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
                    old = fcntl.fcntl(child, fcntl.F_GETFD)
                    fcntl.fcntl(child,
                                fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            else:
                if child in fdmap.values():
                    # Some other mapping still needs the descriptor that
                    # currently occupies 'child'; give it a new home first.
                    newtarget = os.dup(child)
                    if debug:
                        errfd.write("os.dup(%d) -> %d\n" % (child, newtarget))
                    os.close(child)
                    for c, p in list(fdmap.items()):
                        if p == child:
                            fdmap[c] = newtarget
                if debug:
                    errfd.write("os.dup2(%d,%d)\n" % (target, child))
                os.dup2(target, child)

        # Step 3: close the leftover source descriptors.  A descriptor may
        # appear several times among the values, so collect each only once.
        old = []
        for fd in fdmap.values():
            if fd not in old and fd not in fdmap:
                old.append(fd)
        if debug:
            errfd.write("old %s\n" % (old,))
        for fd in old:
            os.close(fd)

    def _execChild(self, path, settingUID, uid, gid,
                   command, args, environment):
        """Final step in the child: optionally chdir to C{path}, optionally
        switch to C{uid}/C{gid}, then exec C{command}."""
        if path:
            os.chdir(path)
        # Set the UID before actually exec'ing the process.
        if settingUID:
            switchUID(uid, gid)
        os.execvpe(command, args, environment)

    def reapProcess(self):
        """Try to reap a process (without blocking) via waitpid.

        This is called when sigchild is caught or a Process object loses its
        "connection" (stdout is closed) This ought to result in reaping all
        zombie processes, since it will be called twice as often as it needs
        to be.

        (Unfortunately, this is a slightly experimental approach, since
        UNIX has no way to be really sure that your process is going to
        go away w/o blocking.  I don't want to block.)
        """
        try:
            pid, status = os.waitpid(self.pid, os.WNOHANG)
        except:
            log.msg('Failed to reap %d:' % self.pid)
            log.err()
            pid = None
        if pid:
            # processEnded() clears self.pid, so the local copy is the one
            # used to unregister.
            self.processEnded(status)
            unregisterReapProcessHandler(pid, self)

    def writeToChild(self, childFD, data):
        """Write C{data} to the pipe attached to the child's C{childFD}.

        @raise KeyError: if no pipe is currently open for C{childFD}.
        """
        self.pipes[childFD].write(data)

    def closeChildFD(self, childFD):
        """Close the parent's pipe to the child's C{childFD}; a no-op when
        no such pipe is open (for example after the child closed it)."""
        if childFD in self.pipes:
            self.pipes[childFD].loseConnection()

    def pauseProducing(self):
        """Stop reading from every output pipe of the child."""
        for p in self.pipes.values():
            if isinstance(p, ProcessReader):
                p.stopReading()

    def resumeProducing(self):
        """Resume reading from every output pipe of the child."""
        for p in self.pipes.values():
            if isinstance(p, ProcessReader):
                p.startReading()

    def closeStdin(self):
        """Call this to close standard input on this process."""
        self.closeChildFD(0)

    def closeStdout(self):
        """Close the pipe attached to the child's standard output."""
        self.closeChildFD(1)

    def closeStderr(self):
        """Close the pipe attached to the child's standard error."""
        self.closeChildFD(2)

    def loseConnection(self):
        """Close the stdin, stderr and stdout pipes, in that order."""
        self.closeStdin()
        self.closeStderr()
        self.closeStdout()

    def write(self, data):
        """Call this to write to standard input on this process.

        @raise KeyError: if no pipe is currently open for descriptor 0.
        """
        self.pipes[0].write(data)

    def signalProcess(self, signalID):
        """Send a signal to the child.

        @param signalID: one of the names 'HUP', 'STOP', 'INT', 'KILL' or
            'TERM' (translated to the matching C{signal.SIG*} constant), or
            any other value, which is handed to os.kill() unchanged.
        @raise ProcessExitedAlready: if the child has already been reaped.
        """
        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
            signalID = getattr(signal, 'SIG' + signalID)
        if self.pid is None:
            raise ProcessExitedAlready
        os.kill(self.pid, signalID)

    def processEnded(self, status):
        """Record the exit C{status} of the reaped child and notify the
        protocol once every pipe has been closed as well."""
        self.status = status
        self.lostProcess = True
        self.pid = None
        self.maybeCallProcessEnded()

    def childConnectionLost(self, childFD):
        """Called by a pipe helper once it has closed its descriptor."""
        del self.pipes[childFD]
        try:
            self.proto.childConnectionLost(childFD)
        except:
            log.err()
        self.maybeCallProcessEnded()

    def maybeCallProcessEnded(self):
        """Notify the protocol that the process ended, if it really has.

        Nothing happens while any pipe is still open.  With all pipes
        closed but the child not yet reaped, a reap is attempted instead.
        Otherwise the protocol's processEnded() receives a Failure wrapping
        ProcessDone (no exit code or signal recorded, or exit code 0) or
        ProcessTerminated.
        """
        if self.pipes:
            return
        if not self.lostProcess:
            self.reapProcess()
            return
        try:
            exitCode = sig = None
            if self.status != -1:
                if os.WIFEXITED(self.status):
                    exitCode = os.WEXITSTATUS(self.status)
                else:
                    sig = os.WTERMSIG(self.status)
            if exitCode or sig:
                e = error.ProcessTerminated(exitCode, sig, self.status)
            else:
                e = error.ProcessDone(self.status)
            if self.proto is not None:
                self.proto.processEnded(failure.Failure(e))
                self.proto = None
        except:
            log.err()
class PTYProcess(abstract.FileDescriptor, styles.Ephemeral):
    """An operating-system Process that uses PTY support."""

    def __init__(self, reactor, command, args, environment, path, proto,
                 uid=None, gid=None, usePTY=None):
        """Spawn an operating-system process.

        This is where the hard work of disconnecting all currently open
        files / forking / executing the new process happens.  (This is
        executed automatically when a Process is instantiated.)

        This will also run the subprocess as a given user ID and group ID, if
        specified.  (Implementation Note: this doesn't support all the arcane
        nuances of setXXuid on UNIX: it will assume that either your effective
        or real UID is 0.)

        @param usePTY: either a C{(masterfd, slavefd, ttyname)} list or
            tuple describing an already-open pseudo-terminal, or any other
            value, in which case a new one is opened with pty.openpty().
        @raise NotImplementedError: if no pty was supplied and the pty
            module is unavailable.
        """
        ptySupplied = isinstance(usePTY, (list, tuple))
        if not pty and not ptySupplied:
            raise NotImplementedError(
                "cannot use PTYProcess on platforms without the pty module.")
        abstract.FileDescriptor.__init__(self, reactor)

        settingUID = (uid is not None) or (gid is not None)
        if settingUID:
            curegid = os.getegid()
            currgid = os.getgid()
            cureuid = os.geteuid()
            curruid = os.getuid()
            if uid is None:
                uid = cureuid
            if gid is None:
                gid = curegid
            # Become root so the child is able to switch to uid/gid; the
            # parent's ids are restored below, after the fork.
            os.setuid(0)
            os.setgid(0)

        openedHere = []     # pty descriptors this call is responsible for
        try:
            if ptySupplied:
                masterfd, slavefd, ttyname = usePTY
            else:
                masterfd, slavefd = pty.openpty()
                openedHere = [masterfd, slavefd]
                ttyname = os.ttyname(slavefd)
            pid = os.fork()
        except:
            # Only the parent can get here.  Release what was opened and
            # give back the original ids before propagating the error.
            for fd in openedHere:
                try:
                    os.close(fd)
                except OSError:
                    pass
            if settingUID:
                os.setregid(currgid, curegid)
                os.setreuid(curruid, cureuid)
            raise
        self.pid = pid

        if pid == 0:
            # Child process.  Control must never leave this branch: either
            # exec replaces the process image or os._exit() terminates it.
            try:
                sys.settrace(None)
                os.close(masterfd)
                os.setsid()
                # fcntl is None when either fcntl or termios failed to
                # import, in which case termios must not be touched.
                if fcntl is not None and hasattr(termios, 'TIOCSCTTY'):
                    fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
                else:
                    for fd in range(3):
                        if fd != slavefd:
                            os.close(fd)
                    fd = os.open(ttyname, os.O_RDWR)
                    os.close(fd)

                os.dup2(slavefd, 0)     # stdin
                os.dup2(slavefd, 1)     # stdout
                os.dup2(slavefd, 2)     # stderr

                if path:
                    os.chdir(path)
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass

                # Set the UID before actually exec'ing the process.
                if settingUID:
                    switchUID(uid, gid)
                os.execvpe(command, args, environment)
            except:
                # Best effort: describe the failure.  Nothing that goes
                # wrong here may prevent the os._exit() below, otherwise
                # the forked child would carry on running the parent's code.
                # NOTE(review): the report goes to fd 1 although the local
                # is called stderr; kept as found - confirm this is intended.
                try:
                    stderr = os.fdopen(1, 'w')
                    stderr.write("Upon execvpe %s %s in environment %s:\n" %
                                 (command, str(args),
                                  "id %s" % id(environment)))
                    traceback.print_exc(file=stderr)
                    stderr.flush()
                except:
                    pass
            os._exit(1)

        # Parent process from here on.
        assert pid != 0
        os.close(slavefd)
        if settingUID:
            # Drop back to the ids held before the fork, as Process does.
            os.setregid(currgid, curegid)
            os.setreuid(curruid, cureuid)
        fdesc.setNonBlocking(masterfd)
        self.fd = masterfd
        self.startReading()
        self.connected = 1
        self.proto = proto
        # Counts the two events that must both occur before the protocol
        # is told the process ended: child reaped, and pty connection lost.
        self.lostProcess = 0
        self.status = -1    # -1 means "no exit status collected yet"
        try:
            self.proto.makeConnection(self)
        except:
            log.err()
        registerReapProcessHandler(self.pid, self)

    def reapProcess(self):
        """Try to reap a process (without blocking) via waitpid.

        This is called when sigchild is caught or a Process object loses its
        "connection" (stdout is closed) This ought to result in reaping all
        zombie processes, since it will be called twice as often as it needs
        to be.

        (Unfortunately, this is a slightly experimental approach, since
        UNIX has no way to be really sure that your process is going to
        go away w/o blocking.  I don't want to block.)
        """
        try:
            pid, status = os.waitpid(self.pid, os.WNOHANG)
        except OSError:
            # The exception is fetched through sys.exc_info() so that no
            # version-specific "except ... <name>" syntax is needed.
            if sys.exc_info()[1].errno == errno.ECHILD:
                # No child processes.
                pid = None
            else:
                raise
        except:
            log.err()
            pid = None
        if pid:
            self.processEnded(status)
            unregisterReapProcessHandler(self.pid, self)

    # A PTY has a single descriptor for all three standard streams, so
    # they cannot be closed individually.
    def closeStdin(self):
        """Does nothing."""
        pass

    def closeStdout(self):
        """Does nothing."""
        pass

    def closeStderr(self):
        """Does nothing."""
        pass

    def signalProcess(self, signalID):
        """Send a signal to the child.

        @param signalID: one of the names 'HUP', 'STOP', 'INT', 'KILL' or
            'TERM' (translated to the matching C{signal.SIG*} constant), or
            any other value, which is handed to os.kill() unchanged.
        """
        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
            signalID = getattr(signal, 'SIG' + signalID)
        os.kill(self.pid, signalID)

    def processEnded(self, status):
        """Record the exit C{status} of the reaped child and notify the
        protocol if the pty connection has been lost as well."""
        self.status = status
        self.lostProcess += 1
        self.maybeCallProcessEnded()

    def doRead(self):
        """Called when my standard output stream is ready for reading."""
        try:
            return fdesc.readFromFD(self.fd, self.proto.outReceived)
        except OSError:
            return CONNECTION_LOST

    def fileno(self):
        """This returns the file number of standard output on this process."""
        return self.fd

    def maybeCallProcessEnded(self):
        """Notify the protocol once both the child has been reaped and the
        pty connection has been lost (lostProcess == 2).

        The protocol's processEnded() receives a Failure wrapping
        ProcessDone (no exit code or signal recorded, or exit code 0) or
        ProcessTerminated.
        """
        if self.lostProcess != 2:
            return
        try:
            exitCode = sig = None
            if self.status != -1:
                if os.WIFEXITED(self.status):
                    exitCode = os.WEXITSTATUS(self.status)
                else:
                    sig = os.WTERMSIG(self.status)
            if exitCode or sig:
                e = error.ProcessTerminated(exitCode, sig, self.status)
            else:
                e = error.ProcessDone(self.status)
            self.proto.processEnded(failure.Failure(e))
            self.proto = None
        except:
            log.err()

    def connectionLost(self, reason):
        """I call this to clean up when one or all of my connections has died."""
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.lostProcess += 1
        self.maybeCallProcessEnded()

    def writeSomeData(self, data):
        """Write some data to the open process.

        @return: the number of bytes written, C{0} when the descriptor
            would block (EAGAIN), or C{CONNECTION_LOST} when it is broken.
        """
        try:
            return os.write(self.fd, data)
        except IOError:
            if sys.exc_info()[1].args[0] == errno.EAGAIN:
                return 0
            return CONNECTION_LOST
        except OSError:
            # os.write() reports failures as OSError; treat them the same
            # way ProcessWriter.writeSomeData does.
            failureInfo = sys.exc_info()[1]
            if failureInfo.errno == errno.EPIPE:
                return CONNECTION_LOST
            if failureInfo.errno == errno.EAGAIN:
                return 0
            raise