import subprocess, random, string, os, fcntl
[docs]class LineBuffer(object):
Processes each line in the desired buffer
From Juan Luis Boya:
def __init__(self):
self.buffer = b''
[docs] def read_lines(self, input):
Processes each line and appends it to the buffer
while b'\n' in input:
before, after = input.split(b'\n', 1)
yield self.buffer + before
self.buffer = b''
input = after
self.buffer += input
class ProcessReactor(object):
def __init__(self, user, directory, ioloop, ip, opensockets, *args, **kwargs):
Starts the command and sets the redirection of the desired buffers
:param pwd: user The pwd structure with the information of the user which issued the command
:param str directory: The directory to use as cwd
:param list: args A list of supplementary arguments
:param dict: kwargs A dictionary of keyword arguments
The function redirects STDOUT and STDERR to a pipe, and then executes the command using Popen.
The output pipe descriptor is made non-blocking and included in the instance of the IOLoop.
self.user = user #user which executes the command
self.command = ' '.join(*args) # The name of the command
self.ip = ip # The IP of the server = kwargs.get("shell", False)
self.ioloop = ioloop
def randomString():
Generates a random token
:returns: A random string which acts as a token
return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(3))
# Generates a random token to identify the execution
self.identifier = ''.join(random.choice(string.ascii_uppercase) for i in range(12))
self.opensockets = opensockets
#The buffers are redirected
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
def demote(uid, gid):
The UID and GID of the child process is changed to match those of the user
who issued the command. Otherwise the operation would be executed as root.
kwargs['shell'] = False
self.process = subprocess.Popen(preexec_fn=demote(user.pw_uid, user.pw_gid), #function executed before the call
cwd=directory, # The current working directory is changed
*args, **kwargs)
#The fileno of the stdout buffer is used to make it non-blocking
self.fd = self.process.stdout.fileno()
fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) #The file access mode is returned
fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # The flags of the file are modified, appending the non-blocking flag blogin
#The same for stderr
self.fd_err = self.process.stderr.fileno()
fl_err = fcntl.fcntl(self.fd_err, fcntl.F_GETFL)
fcntl.fcntl(self.fd_err, fcntl.F_SETFL, fl | os.O_NONBLOCK)
#Creation of the line buffer
self.line_buffer = LineBuffer()
#Two handlers are registered, each for one of the output buffers. can_read and can_read_stderr act as the callback for events related to both of them
self.ioloop.add_handler(self.process.stdout, self.can_read, self.ioloop.READ)
self.ioloop.add_handler(self.process.stderr, self.can_read_stderr, self.ioloop.READ)
def kill(self):
if self.process.returncode is None:
def stop(self):
if self.process.returncode is None:
print("Sending terminate signal")
self.ioloop.call_later(60, self.kill)
def can_read(self, fd, events):
Processes the stdout event
:param int fd: The file descriptor of the stdout buffer
:param int events: The event flags (bitwise OR of the constants IOLoop.READ, IOLoop.WRITE, and IOLoop.ERROR)
data =
if len(data) > 0:
"""If the length of the data is larger than zero, the information is sent to all
the listening sockets"""
self.on_data(data, "stdout")
print("Lost connection to subprocess")
def can_read_stderr(self, fd, events):
Processes the stderr event
:param int fd: The file descriptor of the stderr buffer
:param int events: The event flags (bitwise OR of the constants IOLoop.READ, IOLoop.WRITE, and IOLoop.ERROR)
data =
if len(data) > 0:
self.on_data(data, "stderr")
print("Lost connection to subprocess")
def on_data(self, data, stream_name):
Decodes the data and passes it to on_line
:param bytes data: an array of bytes with the message
:param str stream_name: The name of the stream
for line in self.line_buffer.read_lines(data):
self.on_line(line.decode('utf-8'), stream_name)
def on_line(self, line, stream_name):
Sends the line to the open websocket
:param str line: The message line
:param str stream_name: The name of the stream
for ws in self.opensockets[self.user.pw_name]:
ws.on_line(self.user.pw_name, self.command, line, self.ip, self.identifier, False, stream_name,
def stop_output(self, stream_name="unknown"):
Sends a special message to close the websocket connection
for ws in self.opensockets[self.user.pw_name]:
ws.on_line(self.user.pw_name, self.command, None, self.ip, self.identifier, True, stream_name,