Procedural design

This section describes the algorithms implemented in the system that are considered most relevant.

The application relies on the following libraries and frameworks:

  • Tornado as the web server.
  • Snorky for WebSocket management.
  • Jade as the template rendering engine (via pyjade).
  • jQuery for asynchronous requests and for manipulating page elements, together with the jQuery Cookie plugin.
  • Bootstrap for the visual components.
  • MarcoPolo for discovering the different nodes.
  • PAM for user management, through the python-pam interface.

Client authentication is performed through the PAM interface available in Python. On success, the client receives a secure cookie, signed with HMAC and timestamped. This cookie is sent in every WebSocket transmission as an authentication token (the nodes can verify its value because they know the cryptographic key used in the signing process).
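As an illustration of this mechanism, the sketch below issues and verifies a timestamped HMAC token. It is a minimal stand-in, not the system's actual implementation (which relies on Tornado's secure cookies); the key, token format, and expiry are assumptions.

```python
import hashlib
import hmac
import time

SECRET = b"shared-cluster-secret"  # hypothetical signing key known to all nodes

def sign_token(username, secret=SECRET):
    """Builds a token of the form 'user|timestamp|signature'."""
    payload = "%s|%d" % (username, int(time.time()))
    sig = hmac.new(secret, payload.encode(), hashlib.sha256).hexdigest()
    return "%s|%s" % (payload, sig)

def verify_token(token, secret=SECRET, max_age=3600):
    """Returns the username if the signature matches and the token
    has not expired; None otherwise."""
    try:
        user, ts, sig = token.rsplit("|", 2)
    except ValueError:
        return None
    payload = "%s|%s" % (user, ts)
    expected = hmac.new(secret, payload.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        return None
    if time.time() - int(ts) > max_age:
        return None
    return user
```

Using `hmac.compare_digest` for the comparison avoids timing side channels when validating the signature.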

The WebSocket channel is created before the deployment takes place, and all output is sent through that channel. The nodes keep a key-value table to identify the open WebSockets, keyed by the system username (PAM keeps these names homogeneous across the system). Each execution is assigned a random identifier that the server shares with the client.
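The registry described above can be sketched as follows. The names (`open_sockets`, `new_run_id`) are hypothetical; the real system keeps an equivalent structure inside its handlers.

```python
import random
import string
from collections import defaultdict

# username -> set of open WebSocket handlers on this node
open_sockets = defaultdict(set)

def register(username, ws):
    """Adds a WebSocket to the per-user table."""
    open_sockets[username].add(ws)

def unregister(username, ws):
    """Removes a WebSocket when the client disconnects."""
    open_sockets[username].discard(ws)

def new_run_id(length=12):
    """Random identifier the server shares with the client for each execution."""
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(length))
```

Keying by username means every socket a user has open receives the output of that user's executions, regardless of which browser tab started them.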

Deployment

class UploadAndDeployHandler(BaseHandler):
    """
    Listens for POST requests and performs the deployment asynchronously.
    """
    #The post is asynchronous due to the potentially long deploying time
    @asynchronous
    @engine
    def post(self):
        file1 = self.request.files['file'][0] #Only one file at a time

        original_fname = file1['filename']
       
        with open(os.path.join(__UPLOADS__, original_fname), 'wb') as output_file:
            output_file.write(file1['body'])
        
        # The nodes where to deploy are returned as a comma-separated string
        nodes = self.get_argument('nodes', '').split(',')[:-1]
        from concurrent import futures
        
        """The deployment process is performed asynchronously 
        using a ThreadPool, which will handle the request asynchronously"""

        futures_set = set()

        for node in nodes:
            future = self.deploy(node=node,
             request=self, 
             filename=original_fname, 
             command=self.get_argument('command', ''), 
             user=self.current_user, 
             folder=self.get_argument('folder', ''), 
             tomcat=self.get_argument('tomcat', ''), 
             overwrite=self.get_argument('overwrite', 'false'))
            
            futures_set.add((future, node))

        error = []
       
        for future, node in futures_set:
            try:
                response = future.result()
                
                if response.status_code >= 400:
                    error.append((node, response.reason))
            except Exception as e:
                error.append((node, "Could not connect to the node"))

        if len(error) > 0:
            self.finish("Errors occurred " + 
                " ".join(["Node:"+node+"." for node, reason in error]))
        else:
            self.finish("file " + original_fname + " is uploaded and on deploy")
    
    def deploy(self, node, request, filename, command, user, folder="", idpolo="", tomcat="", overwrite='false'):
        """
        Sends the file and the deployment parameters to a node.

        :param str node: The node where the file is to be deployed

        :param str request: The request handler which issued the deployment

        :param str filename: The name of the file to deploy

        :param str command: The command to execute after the deployment

        :param str user: The user who requested the deployment

        :param str folder: The deployment folder
        
        :param str idpolo: The id of the polo service to publish
        
        :param str tomcat: Specifies whether the file should be deployed as a tomcat service
        
        :param str overwrite: Specifies if the file can overwrite existing files

        :returns: :class:`concurrent.futures.Future` A future that encapsulates the asynchronous execution 
        """
        def get_content_type(filename):
            """
            Guesses the MIME type of the file so it can be sent with the POST request

            :param str filename: The name of the file to process
            """
            return mimetypes.guess_type(filename)[0] or 'application/octet-stream'

        url = "https://"+node+":"+str(conf.RECEIVER_PORT)+"/deploy/"
        
        files = {'file': (filename, 
                    open(os.path.join(__UPLOADS__, filename), 'rb'), 
                    get_content_type(filename))
                }
        
        commands = {'command': command, 
                    'user': user, 
                    'folder': folder, 
                    'idpolo': idpolo, 
                    'tomcat': tomcat, 
                    'overwrite': overwrite}

Sequence followed to perform a deployment. Using futures makes the code asynchronous, which significantly improves the performance of the system.
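The fan-out pattern used above can be reduced to a self-contained sketch. The `deploy` body and node names here are placeholders for the real HTTP POST to each node:

```python
from concurrent import futures

def deploy(node):
    # Stand-in for the real POST request sent to the node (hypothetical).
    return "deployed to %s" % node

nodes = ["node1", "node2", "node3"]

# Submit one task per node; the futures run concurrently in the pool,
# and result() gathers each outcome (raising if the task failed).
with futures.ThreadPoolExecutor(max_workers=4) as pool:
    pending = {pool.submit(deploy, n): n for n in nodes}
    results = {node: fut.result() for fut, node in pending.items()}
```

Collecting `(future, node)` pairs, as the handler does, lets error reporting name the node that failed instead of just the exception.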

Receiving a deployment

class DeployHandler(RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        """
        POST handler received from ``deployer.py``. 
        It handles the deployment of the file and the execution of the desired command.
        """

        file1 = self.request.files['file'][0]
        
        command = self.get_argument('command', '')
        idpolo = self.get_argument('idpolo', '')
        tomcat = self.get_argument('tomcat', '')
        
        if not tomcat:
            folder = self.get_argument('folder', '')
        else:
            folder = conf.TOMCAT_PATH

        fname = file1['filename']
        
        user = self.get_argument('user', '')
        
        user_pwd = pwd.getpwnam(user)
        
        #Handling of relative paths
        folder = folder.replace('~', user_pwd.pw_dir, 1)
        
        if folder == '':
            folder = user_pwd.pw_dir
        elif folder[0] != '/':
            folder = os.path.join(user_pwd.pw_dir, folder)

        if not os.path.exists(folder):
            os.makedirs(folder)
            os.chown(folder, user_pwd.pw_uid, user_pwd.pw_gid)
        elif not os.path.isdir(folder):
            return


        final_directory = os.path.join(folder, fname)
        
        overwrite = self.get_argument('overwrite', 'false')
        
        overwrite = overwrite.lower() != 'false'


        thread_pool = futures.ThreadPoolExecutor(max_workers=1)
        thread_pool.submit(self.execute, command=command, file_desc=file1, filename=final_directory, directory=folder, user=user_pwd, tomcat=tomcat, overwrite=overwrite)
        
        self.finish('OK')

    def execute(self, command, file_desc, filename, directory, user, tomcat=False, overwrite=False):
        
        if os.path.isfile(filename) and not overwrite:
            return

        def demote(user_uid, user_gid):
            os.setgid(user_gid)
            os.setuid(user_uid)
            
        if os.path.exists(os.path.dirname(filename)):
            output_file = open(filename, 'wb')
        else:
            return

        output_file.write(file_desc['body'])
        if not tomcat:
            os.chown(filename, user.pw_uid, user.pw_gid)
        else:
            os.chown(filename, pwd.getpwnam('tomcat7').pw_uid, pwd.getpwnam('tomcat7').pw_gid)
        output_file.close()
        if len(command) > 0:
            p = ProcessReactor(user, directory, io_loop, ip, opensockets, split(command), shell=False)
            if processes.get(user.pw_name, None) is None:
                processes[user.pw_name] = set()
            processes[user.pw_name].add(p)

Execution of a command

class ProcessReactor(object):
	def __init__(self, user, directory, ioloop, ip, opensockets, *args, **kwargs):
		"""
		Starts the command and sets the redirection of the desired buffers
		:param pwd user: The pwd structure with the information of the user who issued the command
		:param str directory: The directory to use as cwd
		:param list args: A list of supplementary arguments
		:param dict kwargs: A dictionary of keyword arguments

		The function redirects STDOUT and STDERR to a pipe, and then executes the command using Popen.
		The output pipe descriptor is made non-blocking and included in the instance of the IOLoop. 
		"""

		self.user = user #user which executes the command
		self.command = ' '.join(*args) # The name of the command
		self.ip = ip # The IP of the server
		self.shell =  kwargs.get("shell", False)
		self.ioloop = ioloop
		def randomString(length=12):
			"""
			Generates a random token

			:returns: A random string which acts as a token
			"""
			return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(length))

		# Generates a random token to identify the execution
		self.identifier = randomString()
		
		self.opensockets  = opensockets
		
		#The buffers are redirected
		kwargs['stdout'] = subprocess.PIPE
		kwargs['stderr'] = subprocess.PIPE
		
		def demote(uid, gid):
			"""
			The UID and GID of the child process is changed to match those of the user
			who issued the command. Otherwise the operation would be executed as root.
			"""
			os.setgid(gid)
			os.setuid(uid)

		kwargs['shell'] = False
		self.process = subprocess.Popen(preexec_fn=lambda: demote(user.pw_uid, user.pw_gid), #executed in the child right before the command
										cwd=directory, # The current working directory is changed
										*args, **kwargs)
		
		#The fileno of the stdout buffer is used to make it non-blocking
		self.fd = self.process.stdout.fileno()
		fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) #The file access mode is returned
		fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # The flags of the file are modified, appending the non-blocking flag

		#The same for stderr
		self.fd_err = self.process.stderr.fileno()
		fl_err = fcntl.fcntl(self.fd_err, fcntl.F_GETFL)
		fcntl.fcntl(self.fd_err, fcntl.F_SETFL, fl_err | os.O_NONBLOCK)

		#Creation of the line buffer
		self.line_buffer = LineBuffer()

		#Two handlers are registered, each for one of the output buffers. can_read and can_read_stderr act as the callback for events related to both of them
		self.ioloop.add_handler(self.process.stdout, self.can_read, self.ioloop.READ)
		self.ioloop.add_handler(self.process.stderr, self.can_read_stderr, self.ioloop.READ)
	
	def kill(self):
		if self.process.returncode is None:
			self.process.kill()

	def stop(self):
		if self.process.returncode is None:
			print("Sending terminate signal")
			self.process.terminate()
			self.ioloop.call_later(60, self.kill)

	def can_read(self, fd, events):
		"""
		Processes the stdout event
		:param int fd: The file descriptor of the stdout buffer
		:param int events: The event flags (bitwise OR of the constants IOLoop.READ, IOLoop.WRITE, and IOLoop.ERROR)
		"""
		data = self.process.stdout.read(1024)
		

		if len(data) > 0:
			"""If the length of the data is larger than zero, the information is sent to all
			the listening sockets"""
			self.on_data(data, "stdout")

		else:
			print("Lost connection to subprocess")
			self.ioloop.remove_handler(self.process.stdout)
			self.stop_output("stdout")
	
	def can_read_stderr(self, fd, events):
		"""
		Processes the stderr event
		:param int fd: The file descriptor of the stderr buffer
		:param int events: The event flags (bitwise OR of the constants IOLoop.READ, IOLoop.WRITE, and IOLoop.ERROR)
		"""
		data = self.process.stderr.read(1024)

		if len(data) > 0:
			self.on_data(data, "stderr")

		else:
			print("Lost connection to subprocess")
			self.ioloop.remove_handler(self.process.stderr)
			self.stop_output("stderr")

	def on_data(self, data, stream_name):
		"""
		Decodes the data and passes it to on_line
		:param bytes data: an array of bytes with the message
		:param str stream_name: The name of the stream
		"""
		for line in self.line_buffer.read_lines(data):
			self.on_line(line.decode('utf-8'), stream_name)

	def on_line(self, line, stream_name):
		"""
		Sends the line to the open websocket
		:param str line: The message line
		:param str stream_name: The name of the stream
		"""
		for ws in self.opensockets[self.user.pw_name]:
			ws.on_line(self.user.pw_name, self.command, line, self.ip, self.identifier, False, stream_name, shell=self.shell)


	def stop_output(self, stream_name="unknown"):
		"""
		Sends a special message to close the websocket connection
		"""
		for ws in self.opensockets[self.user.pw_name]:
			ws.on_line(self.user.pw_name, self.command, None, self.ip, self.identifier, True, stream_name, shell=self.shell)

The execution of a command is fully asynchronous: the file descriptor of each of the command's output streams is bound to the Tornado event loop, and handlers are registered to collect the events the loop emits for those descriptors.
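The core of this technique — marking a child process's output descriptor non-blocking before handing it to an event loop — can be shown in isolation. This is a minimal, POSIX-only sketch; the `echo` command is just a stand-in for the deployed command:

```python
import fcntl
import os
import subprocess

# Launch a child whose stdout is redirected to a pipe, as ProcessReactor does.
proc = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE)

# Retrieve the current flags of the pipe's descriptor and append O_NONBLOCK,
# so that reads return immediately instead of blocking the event loop.
fd = proc.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

proc.wait()                  # child has exited; its output sits in the pipe
data = proc.stdout.read()    # non-blocking read of the buffered output
```

In the real handler the descriptor is then registered with `IOLoop.add_handler`, so the loop invokes the read callback only when data is actually available.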

Data collection

def get_data():
    response_dict = {}
    response_dict["time"] = strftime("%a, %d %b %Y %H:%M:%S", localtime())

    response_dict["hostname"] = subprocess.Popen("hostname", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
    response_dict["ip"] = subprocess.Popen("/sbin/ifconfig eth0| grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
    if len(response_dict["ip"]) == 0:
        response_dict["ip"] = subprocess.Popen(" /sbin/ifconfig eth0 | grep 'inet\ ' | cut -d: -f2 | awk '{ print $2 }'", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')

    response_dict["kernel_name"] = subprocess.Popen("uname -r", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')
    
    response_dict["memtotal"] = subprocess.Popen("egrep --color 'MemTotal' /proc/meminfo | egrep '[0-9.]{4,}' -o",
                    shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')

    response_dict["memfree"] = subprocess.Popen("egrep --color 'MemFree' /proc/meminfo | egrep '[0-9.]{4,}' -o", shell=True, stdout=subprocess.PIPE).stdout.read().decode('utf-8')

    return response_dict

Data collection is performed through calls to system utilities.
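The same fields could be gathered without spawning shell pipelines. The following `get_data_portable()` is a hypothetical standard-library variant, shown for comparison; it is not part of the system:

```python
import os
import platform
import socket
import time

def get_data_portable():
    """Hypothetical alternative to get_data() that collects the same
    fields with the standard library instead of shelling out."""
    info = {
        "time": time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()),
        "hostname": socket.gethostname(),
        "kernel_name": platform.release(),
    }
    # /proc/meminfo is Linux-only, like the egrep pipelines in the original.
    if os.path.exists("/proc/meminfo"):
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemTotal"):
                    info["memtotal"] = line.split()[1]
                elif line.startswith("MemFree"):
                    info["memfree"] = line.split()[1]
    return info
```

Reading `/proc/meminfo` directly avoids one `Popen` per field and removes the dependency on the exact output format of `ifconfig` and `egrep`.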