Diseño procedimental

En este apartado se referencian los algoritmos implementados en el sistema que son considerados de mayor relevancia.

Mensaje de descubrimiento

Algoritmo que realiza el descubrimiento de nodos. Un mecanismo similar se utiliza para el resto de operaciones (request_for, services...). Se omiten las comprobaciones de datos.

    def marco(self, max_nodes=None, exclude=[], timeout=None, params={}, retries=0, group=conf.MULTICAST_ADDR):
        """Collect the nodes that answer on the multicast socket with a "Polo" message.

        This is an excerpt: ``attempts``, ``stop``, ``nodes`` and ``counter``
        are used without being initialised here, and ``timeout`` and ``group``
        are never referenced, so the setup (and presumably the sending of the
        request itself) happens in code that is not shown.

        Args:
            max_nodes: when truthy, reception stops once this many well-formed
                messages have been processed.
            exclude: container of addresses whose replies are ignored.
            params: dictionary of name/value pairs; a node is kept only if
                every pair equals the entry of the same name in the node's
                advertised "Params".
            retries: number of additional receive rounds allowed after a
                timeout (the outer loop runs while ``attempts < retries + 1``).

        Returns:
            ``copy(nodes)``: a copy of the collection the accepted nodes were
            added to.

        Raises:
            MarcoException: if a received message cannot be decoded as UTF-8
                JSON, or if ``params`` is not a dictionary (checked only when
                a "Polo" reply is processed).
        """
        # Each outer iteration is one receive round; it ends when the socket
        # times out. No further round starts once `stop` has been set.
        while attempts < retries + 1  and not stop:

            # Keep receiving until the socket times out or max_nodes is reached.
            while True:
                try:
                    msg, address = self.socket_mcast.recvfrom(conf.FRAME_SIZE)
                except socket.timeout:
                    # Nothing more arrived in this round: count the attempt and
                    # let the outer loop decide whether to go on.
                    attempts += 1
                    break
                
                # The failure is recorded in a flag and raised after the
                # handler, rather than from inside the except block.
                error = None
                
                try:
                    json_data = json.loads(msg.decode('utf-8'))
                except ValueError:
                    error = True
                
                if error:
                    raise MarcoException("Malformed message")

                # NOTE(review): `exclude` is matched against the whole value
                # returned by recvfrom(), while the node below stores only
                # address[0] - confirm which form the entries of `exclude` take.
                if json_data.get("Command", "") == "Polo" and address not in exclude:

                    n = utils.Node()
                    n.address = address[0] # IP address.
                    n.params = json_data.get("Params", {})

                    if type(params) != type({}):
                        raise MarcoException("params must be a dictionary")

                    # for/else: the else branch runs only when the loop was not
                    # broken, i.e. every requested parameter matched.
                    for name, value in params.items():
                        if n.params.get(name, None) != value:
                            break
                    else:
                        nodes.add(n)

                    # A Polo reply was processed (even if the node was filtered
                    # out by `params`), so no further round will be started.
                    stop = True
                
                if max_nodes:
                    # NOTE(review): the counter advances for every parsed
                    # message, not only for nodes actually added - confirm
                    # this is intended.
                    counter +=1
                    if counter >= max_nodes:
                        stop = True
                        break
        return copy(nodes)

Secuencia de arranque de Polo

Durante la secuencia de arranque se leen los ficheros de servicios estáticos y se añaden a la lista de servicios a publicar.

    def startProtocol(self):
        # Reads the static service definitions (JSON files) stored in
        # conf.CONF_DIR/conf.SERVICES_DIR and appends the accepted ones to
        # self.offered_services.
        # NOTE: this excerpt ends inside the try block below; its exception
        # handling is not shown.
        services_dir = os.path.join(conf.CONF_DIR, conf.SERVICES_DIR)
        
        # Only regular files are considered; subdirectories are skipped.
        servicefiles = [f for f in os.listdir(services_dir) if isfile(os.path.join(services_dir,f))]
        
        # Ids accepted so far, used to detect the same id defined in two files.
        service_ids = set()
        for service in servicefiles:
            try:
                with open(os.path.join(os.path.join(conf.CONF_DIR, conf.SERVICES_DIR), service), 'r') as f:
                    s = json.load(f)
                    # Skip definitions explicitly flagged with "disabled": true.
                    if not s.get("disabled", False) == True:
                        # Mark the entry as permanent, make sure it has a
                        # "params" entry and remember the file it came from.
                        s["permanent"] = True
                        s["params"] = s.get("params", {})
                        s["file"] = service
                        groups = s.get("groups",[])
                        # A service with no "groups" entry is accepted for any
                        # multicast group; otherwise this instance's group must
                        # be listed.
                        if self.multicast_group in groups or len(groups) == 0:
                            # Ids that match self.verify are rejected as invalid
                            # (see the warning in the else branch).
                            # NOTE(review): presumably self.verify is the pattern
                            # reserved for user-service ids - confirm.
                            if not self.verify.match(s['id']):
                                if s['id'] in service_ids:
                                    logging.warning("Service %s already published. The service in the file %s will not be published" % (s['id'], service))
                                else:
                                    service_ids.add(s['id'])
                                    self.offered_services.append(s)
                        #if not self.verify.match(s['id']):
                        #   self.offered_services.append(s)
                            else:
                                logging.warning("The service %s does not have a valid id", s['id'])

Recepción de un mensaje de descubrimiento

Se incluyen a modo de ejemplo las funciones implicadas en el procesamiento de un mensaje request-for. El resto de mensajes se tratan de forma similar.

    def datagramReceived(self, datagram, address):
        """Dispatch a received datagram to the handler of its "Command".

        ``message_dict`` is not built in this excerpt; it is expected to be
        the already-parsed message (the parsing code is not shown).

        - 'Discover' / 'Marco'            -> self.polo
        - 'Request-for' / 'Request-For'   -> self.response_request_for
        - 'Services'                      -> self.response_services
        - anything else                   -> logged at INFO level
        """
        command = message_dict.get("Command", "")

        if command in ('Discover', 'Marco'):
            self.polo(command, address)
        elif command in ('Request-for', 'Request-For'):
            # The requested service travels in the "Params" field.
            requested = message_dict["Params"]
            self.response_request_for(command, requested, address)
        elif command == 'Services':
            self.response_services(command, address)
        else:
            host, port = address[0], address[1]
            logging.info("Datagram received from [%s:%s]. Unknown command %s " % (host, port, datagram.decode('utf-8')))
            response_services.append(send_service)
        reload_user_services_iter(user) is called
        """
        
        self.reload_user_services_iter(user)
        
        match = next((s for s in self.user_services.get(user, []) if s['id'] == service), None)
        
        if match:
            command_msg = json.dumps({'Command':'OK', 'Params': match.get("params", {})})
            self.transport.write(command_msg.encode('utf-8'), address)
            return
            pass ## reload and retry!
        """
        
        if self.verify.match(service):
            try:
                user, service = self.verify.match(service).groups()
            except (IndexError, ValueError):
                return
            self.response_request_for_user(command, user, service, address)
            return

        match = next((s for s in self.offered_services if s['id'] == service), None)
        if match:
            command_msg = json.dumps({'Command':'OK', 'Params':match.get("params", {})})

Solicitud de un token de acceso

La creación de un token implica el uso del módulo auxiliar tokenprovider.py

    def request_token(self, uid):
        """Create the access token file of a user if it does not exist yet.

        The token is stored in ``<home>/.polo/token``, where ``<home>`` is the
        home directory of the user identified by ``uid``. The ``.polo``
        directory is created (and handed over to the user) when missing.

        The outcome is reported through the instance's reply helpers:
            - ``write_ok(0)``: a new token file was created.
            - ``write_ok(1)``: a token file already existed; nothing was done.
            - ``write_error(msg)``: the user does not exist or the token file
              could not be written.

        Args:
            uid: numeric user id of the user requesting the token.
        """
        # pwd.getpwuid() signals an unknown uid with KeyError; it never
        # returns None, so the lookup has to be guarded with try/except.
        try:
            pw_user = pwd.getpwuid(uid)
        except KeyError:
            self.write_error("User not found")
            return

        polo_dir = os.path.join(pw_user.pw_dir, ".polo")

        if not os.path.exists(polo_dir):
            os.mkdir(polo_dir)
            os.chown(polo_dir, pw_user.pw_uid, pw_user.pw_gid)

        token_path = os.path.join(polo_dir, "token")

        if os.path.isfile(token_path):
            self.write_ok(1)
            return

        try:
            # The context manager closes the file even if one of the calls
            # below fails.
            with open(token_path, 'wb') as f:
                # Restrict the file to its owner (read/write) before any
                # token data is written to it.
                os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IWUSR)
                os.fchown(f.fileno(), pw_user.pw_uid, pw_user.pw_gid)
                f.write(tokenprovider.create_token(uid, self.secret))
        except Exception as e:
            # Any failure is reported back to the requester instead of
            # propagating.
            self.write_error(str(e))
            return
        self.write_ok(0)
    uid = str(uid).encode('UTF-8')
    
    # create a cipher object using the random secret
    cipher = AES.new(secret)
    return six.u(EncodeAES(cipher, uid))
    

def decrypt_token(token, secret):

Conexión con un binding

A continuación se muestra un ejemplo de conexión con uno de los bindings; en concreto, la ejecución de una consulta request_for. Todos los bindings siguen la misma estrategia de serialización que se muestra en este ejemplo.

    def request_for(self, service, node=None, max_nodes=None, exclude=[], params={}, timeout=None):
        """Ask the local resolver for the nodes that offer `service`.

        A JSON "Request-for" message carrying all the arguments is sent through
        ``self.marco_socket`` to ('127.0.1.1', 1338). The reply is expected to
        be a JSON array of objects with "Address" and "Params" keys.

        Args:
            service: identifier of the requested service (sent as "Params").
            node, max_nodes, exclude, params: forwarded unchanged in the message.
            timeout: forwarded in the message; ``self.timeout`` is used when
                the given value is falsy.

        Returns:
            A set with one ``Node`` per entry of the reply.

        Raises:
            MarcoTimeOutException: if building or sending the message raises
                ValueError, or if no reply arrives before the socket times out.
            MarcoInternalError: if fewer than one byte was sent or the reply
                is not valid JSON.
        """
        if not timeout:
            timeout = self.timeout

        request = {"Command": "Request-for",
                   "Params": service,
                   "node": node,
                   "max_nodes": max_nodes,
                   "exclude": exclude,
                   "params": params,
                   "timeout": timeout}

        # Failures are noted in a flag and raised once the handler has
        # finished, so the new exception is not raised from inside `except`.
        # NOTE(review): json.dumps reports unserialisable objects with
        # TypeError, which is not caught here - confirm this is intended.
        bad_request = False
        sent = None
        try:
            payload = json.dumps(request).encode('utf-8')
            sent = self.marco_socket.sendto(payload, ('127.0.1.1', 1338))
        except ValueError:
            bad_request = True
        if bad_request:
            raise MarcoTimeOutException("Bad parameters")

        if sent < 1:
            raise MarcoInternalError("Error on communication")

        timed_out = False
        try:
            reply, _sender = self.marco_socket.recvfrom(4096)
        except socket.timeout:
            timed_out = True
        if timed_out:
            raise MarcoTimeOutException('No connection to the resolver')

        malformed = False
        try:
            entries = json.loads(reply.decode('utf-8'))
        except ValueError:
            malformed = True
        if malformed:
            raise MarcoInternalError("Internal parsing error")

        found = set()
        for entry in entries:
            item = Node()
            item.address = entry["Address"]
            item.params = entry["Params"]
            found.add(item)
        return found