Diseño procedimental¶
En este apartado se referencian los algoritmos implementados en el sistema que son considerados de mayor relevancia.
La aplicación utiliza las siguientes bibliotecas y frameworks:
- Tornado como servidor web.
- Snorky para la gestión de los WebSocket.
- Jade como motor de renderizado de plantillas (mediante pyjade).
- jQuery como biblioteca para la gestión de peticiones asíncronas y gestión de los diferentes elementos con el plugin jQuery Cookie.
- Bootstrap para la gestión de los elementos visuales.
- MarcoPolo para la detección de los diferentes nodos.
- PAM para la gestión de usuarios mediante la interfaz python-pam.
La autenticación en el cliente se realiza mediante la interfaz PAM disponible en Python. En caso de ser exitosa, el cliente recibe una cookie segura, firmada mediante HMAC y marcada temporalmente. Dicha cookie es enviada en todas las transmisiones WebSocket como token de autenticación (los nodos son capaces de descifrar el valor de la misma al conocer la clave criptográfica con la que se ha realizado el proceso de firmado).
Antes de realizarse el despliegue se crea el canal WebSocket. Todas las salidas se envían por el mismo canal. Los nodos mantienen una tabla clave-valor para identificar los WebSockets abiertos, siendo la clave el nombre de usuario en el sistema (mediante PAM dichos nombres son homogéneos en el sistema). Cada ejecución cuenta con un identificador aleatorio que el servidor comparte con el cliente.
Despliegue¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | class UploadAndDeployHandler(BaseHandler):
"""
Listens for POST requests and performs the deployment asynchronously.
"""
# The POST is asynchronous due to the potentially long deployment time
@asynchronous
@engine
def post(self):
    """
    Store the uploaded file and request its deployment on every node.

    Reads the ``file`` upload and the ``nodes``, ``command``, ``folder``,
    ``tomcat`` and ``overwrite`` arguments from the request, writes the
    file under ``__UPLOADS__`` and calls :meth:`deploy` once per node.
    The request is finished with a confirmation message, or with the
    list of nodes whose deployment failed.
    """
    upload = self.request.files['file'][0]  # Only one file at a time

    # The name is supplied by the client: keep only its last component so
    # the write below cannot leave the uploads directory.
    original_fname = os.path.basename(upload['filename'])
    with open(os.path.join(__UPLOADS__, original_fname), 'wb') as output_file:
        output_file.write(upload['body'])

    # The nodes arrive as a comma-separated string. Empty items (such as
    # the one after a trailing comma) are dropped instead of blindly
    # discarding the last element.
    nodes = [node for node in self.get_argument('nodes', '').split(',') if node]

    # These arguments are the same for every node, so read them once.
    command = self.get_argument('command', '')
    folder = self.get_argument('folder', '')
    tomcat = self.get_argument('tomcat', '')
    overwrite = self.get_argument('overwrite', 'false')

    # One deployment per node; each call returns a future.
    pending = [
        (self.deploy(node=node,
                     request=self,
                     filename=original_fname,
                     command=command,
                     user=self.current_user,
                     folder=folder,
                     tomcat=tomcat,
                     overwrite=overwrite),
         node)
        for node in nodes
    ]

    # NOTE(review): result() waits for each deployment in turn, so this
    # handler blocks until all nodes have answered -- confirm this is the
    # intended behaviour under @engine.
    error = []
    for future, node in pending:
        try:
            response = future.result()
        except Exception:
            error.append((node, "Could not connect to the node"))
        else:
            # Every 4xx/5xx status is a failure, including 400 itself.
            if response.status_code >= 400:
                error.append((node, response.reason))

    if error:
        self.finish("Errors occurred " +
                    " ".join(["Node:" + node + "." for node, reason in error]))
    else:
        self.finish("file" + original_fname + " is uploaded and on deploy")
def deploy(self, node, request, filename, command, user, folder="", idpolo="", tomcat="", overwrite='false'):
:param str folder: The deployment folder
:param str idpolo: The id of the polo service to publish
:param str tomcat: Specifies whether the file should be deployed as a tomcat service
:param str overwrite: Specifies if the file can overwrite existing files
:returns: :class:`concurrent.future` A future that encapsulates the asynchronous execution
"""
def get_content_type(filename):
"""
Guesses the MIME type of the file so it can be sent with the POST request
:param str filename: The name of the file to process
"""
return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
url = "https://"+node+":"+str(conf.RECEIVER_PORT)+"/deploy/"
files = {'file': (filename,
open(os.path.join(__UPLOADS__, filename), 'rb'),
get_content_type(filename))
}
commands = {'command':command,
'user':user,
'folder': folder,
'idpolo': idpolo,
|
Secuencia de realización de un despliegue. Mediante el uso de futures
se consigue que el código sea asíncrono y de esta forma se optimiza el rendimiento del sistema de forma significativa.
Recepción de un despliegue¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | class DeployHandler(RequestHandler):
@tornado.web.asynchronous
def post(self):
    """
    POST handler received from ``deployer.py``.

    Resolves the target folder for the uploaded file, creates it if
    needed, and hands the file copy and the optional command over to
    :meth:`execute` on a worker thread. The request is answered with
    ``OK`` as soon as the work has been submitted, or with a 400 error
    when the target path exists but is not a directory.
    """
    file1 = self.request.files['file'][0]
    command = self.get_argument('command', '')
    tomcat = self.get_argument('tomcat', '')

    # Tomcat deployments always go to the configured Tomcat path.
    folder = conf.TOMCAT_PATH if tomcat else self.get_argument('folder', '')

    # The name is supplied by the sender: keep only its last component so
    # the file cannot be written outside the target folder.
    fname = os.path.basename(file1['filename'])

    user = self.get_argument('user', '')
    user_pwd = pwd.getpwnam(user)

    # Handling of relative paths: "~" and non-absolute paths (including
    # the empty string) are resolved against the user's home directory.
    folder = folder.replace('~', user_pwd.pw_dir, 1)
    if not folder.startswith('/'):
        folder = os.path.join(user_pwd.pw_dir, folder)

    if os.path.exists(folder):
        if not os.path.isdir(folder):
            # The target exists but is not a directory. Answer with an
            # error instead of returning silently, which would leave the
            # asynchronous request open.
            self.send_error(400)
            return
    else:
        # Create the missing folder and hand it over to the user. The
        # ownership comes from the pwd entry, not from the user name.
        os.makedirs(folder)
        os.chown(folder, user_pwd.pw_uid, user_pwd.pw_gid)

    final_directory = os.path.join(folder, fname)

    # Any value other than "false" (case-insensitive) enables overwriting.
    overwrite = self.get_argument('overwrite', 'false').lower() != 'false'

    thread_pool = futures.ThreadPoolExecutor(max_workers=1)
    thread_pool.submit(self.execute,
                       command=command,
                       file_desc=file1,
                       filename=final_directory,
                       directory=folder,
                       user=user_pwd,
                       tomcat=tomcat,
                       overwrite=overwrite)
    self.finish('OK')
@tornado.web.asynchronous
def execute(self, command, file_desc, filename, directory, user, tomcat=False, overwrite="false"):
if os.path.isfile(filename) and not overwrite:
return
def demote(user_uid, user_gid):
os.setgid(user_gid)
os.setuid(user_uid)
if os.path.exists(os.path.dirname(filename)):
output_file = open(filename, 'wb')
else:
return
output_file.write(file_desc['body'])
if not tomcat:
os.chown(filename, user.pw_uid, user.pw_gid)
else:
os.chown(filename, pwd.getpwnam('tomcat7').pw_uid, pwd.getpwnam('tomcat7').pw_gid)
output_file.close()
if len(command) > 0:
p = ProcessReactor(user, directory, io_loop, ip, opensockets, split(command), shell=False)
if processes.get(user.pw_name, None) is None:
processes[user.pw_name] = set()
|
Ejecución de un comando¶
class ProcessReactor(object):
    """
    Runs a command as a given user and forwards its output to that user's
    open websockets.

    The stdout and stderr pipes of the child process are made non-blocking
    and registered in the IOLoop, so the output is read from IOLoop
    callbacks.
    """

    def __init__(self, user, directory, ioloop, ip, opensockets, *args, **kwargs):
        """
        Starts the command and sets the redirection of the output buffers.

        :param pwd user: The pwd structure with the information of the user which issued the command
        :param str directory: The directory to use as cwd
        :param ioloop: The IOLoop instance where the output handlers are registered
        :param str ip: The IP of the server
        :param opensockets: Mapping from user name to the websockets opened by that user
        :param list args: Positional arguments for Popen; a single argv list is expected
        :param dict kwargs: Keyword arguments for Popen

        STDOUT and STDERR are redirected to pipes and the command is run with Popen.
        Both pipe descriptors are made non-blocking and added to the IOLoop.
        """
        self.user = user  # user which executes the command
        # join(*args) only works with exactly one positional argument:
        # the argv list, which is also what Popen receives below.
        self.command = ' '.join(*args)  # The name of the command
        self.ip = ip  # The IP of the server
        self.shell = kwargs.get("shell", False)
        self.ioloop = ioloop

        # Generates a random token to identify the execution
        self.identifier = ''.join(random.choice(string.ascii_uppercase) for _ in range(12))
        self.opensockets = opensockets

        # The buffers are redirected
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
        kwargs['shell'] = False

        def demote():
            """
            Changes the UID and GID of the child process to those of the
            user who issued the command. Otherwise the command would run
            with the credentials of this process.
            """
            os.setgid(user.pw_gid)
            os.setuid(user.pw_uid)

        # preexec_fn must be the callable itself: Popen runs it in the
        # child just before the command. Calling it here would change the
        # credentials of this process instead.
        self.process = subprocess.Popen(*args,
                                        preexec_fn=demote,
                                        cwd=directory,  # The current working directory is changed
                                        **kwargs)

        # Both output pipes are made non-blocking
        self.fd = self.process.stdout.fileno()
        self._set_nonblocking(self.fd)
        self.fd_err = self.process.stderr.fileno()
        self._set_nonblocking(self.fd_err)

        # Creation of the line buffer
        # NOTE(review): stdout and stderr share this buffer -- confirm that
        # LineBuffer copes with interleaved partial lines from both streams.
        self.line_buffer = LineBuffer()

        # Two handlers are registered, one per output buffer
        self.ioloop.add_handler(self.process.stdout, self.can_read, self.ioloop.READ)
        self.ioloop.add_handler(self.process.stderr, self.can_read_stderr, self.ioloop.READ)

    @staticmethod
    def _set_nonblocking(fd):
        """Adds O_NONBLOCK to the flags of *fd*, keeping its other flags."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def kill(self):
        """Kills the process if it is still running."""
        # poll() refreshes the exit status; returncode alone stays None
        # until someone polls or waits for the process.
        if self.process.poll() is None:
            self.process.kill()

    def stop(self):
        """Asks the process to terminate and kills it 60 seconds later if needed."""
        if self.process.poll() is None:
            print("Sending terminate signal")
            self.process.terminate()
            self.ioloop.call_later(60, self.kill)

    def _forward(self, stream, stream_name):
        """Reads pending data from *stream* and forwards it, or closes the stream on EOF."""
        data = stream.read(1024)
        if len(data) > 0:
            # The information is sent to all the listening sockets
            self.on_data(data, stream_name)
        else:
            # An empty read means that the pipe has been closed
            print("Lost connection to subprocess")
            self.ioloop.remove_handler(stream)
            self.stop_output(stream_name)

    def can_read(self, fd, events):
        """
        Processes the stdout event

        :param fd: The descriptor passed by the IOLoop for the stdout buffer
        :param int events: The event flags (bitwise OR of the constants IOLoop.READ, IOLoop.WRITE, and IOLoop.ERROR)
        """
        self._forward(self.process.stdout, "stdout")

    def can_read_stderr(self, fd, events):
        """
        Processes the stderr event

        :param fd: The descriptor passed by the IOLoop for the stderr buffer
        :param int events: The event flags (bitwise OR of the constants IOLoop.READ, IOLoop.WRITE, and IOLoop.ERROR)
        """
        self._forward(self.process.stderr, "stderr")

    def on_data(self, data, stream_name):
        """
        Splits the data into lines, decodes them and passes them to on_line

        :param bytes data: an array of bytes with the message
        :param str stream_name: The name of the stream
        """
        for line in self.line_buffer.read_lines(data):
            self.on_line(line.decode('utf-8'), stream_name)

    def _listeners(self):
        """Returns the websockets opened by the user, or an empty tuple if there are none."""
        return self.opensockets.get(self.user.pw_name, ())

    def on_line(self, line, stream_name):
        """
        Sends the line to the open websockets of the user

        :param str line: The message line
        :param str stream_name: The name of the stream
        """
        for ws in self._listeners():
            ws.on_line(self.user.pw_name, self.command, line, self.ip,
                       self.identifier, False, stream_name, shell=self.shell)

    def stop_output(self, stream_name="unknown"):
        """
        Sends a special message (no line, stop flag set) to the open
        websockets of the user to signal the end of the stream

        :param str stream_name: The name of the stream
        """
        for ws in self._listeners():
            ws.on_line(self.user.pw_name, self.command, None, self.ip,
                       self.identifier, True, stream_name, shell=self.shell)
|
La ejecución de un comando es completamente asíncrona, dado que se vincula el descriptor de fichero de cada uno de los streams de salida del comando al bucle de eventos de Tornado, añadiendo manejadoras que recojan los diferentes eventos emitidos por el bucle relativos a dichos descriptores.
Recolección de datos¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | def get_data():
response_dict = {}
response_dict["time"] = strftime("%a, %d %b %Y %H:%M:%S", localtime())
response_dict["hostname"] = subprocess.Popen("hostname", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
response_dict["ip"] = subprocess.Popen("/sbin/ifconfig eth0| grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
if len(response_dict["ip"]) == 0:
response_dict["ip"] = subprocess.Popen(" /sbin/ifconfig eth0 | grep 'inet\ ' | cut -d: -f2 | awk '{ print $2 }'", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
response_dict["kernel_name"] = subprocess.Popen("uname -r", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
response_dict["memtotal"] = subprocess.Popen("egrep --color 'MemTotal' /proc/meminfo | egrep '[0-9.]{4,}' -o",
shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
response_dict["memfree"] = subprocess.Popen("egrep --color 'MemFree' /proc/meminfo | egrep '[0-9.]{4,}' -o", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
|
La recolección de datos se realiza mediante llamadas al sistema.