Diseño procedimental¶
En este apartado se referencian los algoritmos implementados en el sistema que son considerados de mayor relevancia.
Mensaje de descubrimiento¶
Algoritmo que realiza el descubrimiento de nodos (operación marco). Un mecanismo similar se utiliza para el resto de operaciones (request_for, services...). Se omiten las comprobaciones de datos y la inicialización de las variables auxiliares.
# Gather the nodes that answer with a "Polo" message on self.socket_mcast and
# return a copy of the resulting set.
# NOTE(review): excerpt with its indentation stripped; the initialisation of
# attempts, stop, counter and nodes is not part of the lines shown.
# NOTE(review): exclude=[] and params={} are mutable default arguments; the
# lines shown only read them, but confirm the omitted code never mutates them.
def marco(self, max_nodes=None, exclude=[], timeout=None, params={}, retries=0, group=conf.MULTICAST_ADDR):
# Keep going while attempts remain (retries + 1 in total) and stop is unset.
while attempts < retries + 1 and not stop:
#Looping until timeout is raised or max_nodes is reached
while True:
try:
msg, address = self.socket_mcast.recvfrom(conf.FRAME_SIZE)
except socket.timeout:
# A receive timeout uses up one attempt and leaves the receive loop.
attempts += 1
break
# The decoding failure is recorded in a flag and the exception is raised
# after the handler has finished.
error = None
try:
json_data = json.loads(msg.decode('utf-8'))
except ValueError:
error = True
if error:
raise MarcoException("Malformed message")
# Only "Polo" replies whose sender is not listed in exclude are considered.
# NOTE(review): the membership test uses the whole value returned by recvfrom,
# while the node below stores only address[0]; if exclude is meant to hold bare
# IP strings this test never filters anything -- confirm against callers.
if json_data.get("Command", "") == "Polo" and address not in exclude:
n = utils.Node()
n.address = address[0] # IP address.
n.params = json_data.get("Params", {})
# Exact type comparison: subclasses of dict are rejected as well.
if type(params) != type({}):
raise MarcoException("params must be a dictionary")
# Compare every requested parameter with the value announced by the node.
for name, value in params.items():
if n.params.get(name, None) != value:
break
# NOTE(review): presumably this else belongs to the for loop, so the node is
# kept only when no requested parameter differs -- the flattened excerpt does
# not allow confirming the nesting of the remaining lines.
else:
nodes.add(n)
stop = True
# When max_nodes is set, counter is increased and reaching max_nodes sets stop
# and breaks out of the enclosing loop.
if max_nodes:
counter +=1
if counter >= max_nodes:
stop = True
break
# A copy is returned rather than the working set itself.
return copy(nodes)
Secuencia de arranque de Polo¶
Durante la secuencia de arranque se leen los ficheros de servicios estáticos y se añaden a la lista de servicios a publicar.
# Load the service description files found in the services directory and
# append the accepted ones to self.offered_services.
# NOTE(review): excerpt with its indentation stripped.
def startProtocol(self):
services_dir = os.path.join(conf.CONF_DIR, conf.SERVICES_DIR)
# Keep only the directory entries for which isfile() is true.
servicefiles = [f for f in os.listdir(services_dir) if isfile(os.path.join(services_dir,f))]
# Ids accepted so far, used to detect duplicates across files.
service_ids = set()
for service in servicefiles:
# NOTE(review): no except/finally clause for this try is visible in the excerpt.
try:
with open(os.path.join(os.path.join(conf.CONF_DIR, conf.SERVICES_DIR), service), 'r') as f:
s = json.load(f)
# Entries whose "disabled" value equals True are ignored.
if not s.get("disabled", False) == True:
# Flag the entry as permanent, default its params to {} and remember the
# name of the file it was read from.
s["permanent"] = True
s["params"] = s.get("params", {})
s["file"] = service
groups = s.get("groups",[])
# Accept the service when it names this instance's multicast group or names
# no group at all.
if self.multicast_group in groups or len(groups) == 0:
# Ids that match self.verify are refused.
# NOTE(review): presumably the final else (invalid id warning) pairs with this
# test -- confirm against the original indentation.
if not self.verify.match(s['id']):
# The first file that declares an id wins; later duplicates are only logged.
if s['id'] in service_ids:
logging.warning("Service %s already published. The service in the file %s will not be published" % (s['id'], service))
else:
service_ids.add(s['id'])
self.offered_services.append(s)
#if not self.verify.match(s['id']):
# self.offered_services.append(s)
else:
logging.warning("The service %s does not have a valid id", s['id'])
Recepción de un mensaje de descubrimiento¶
Se incluyen a modo de ejemplo las funciones implicadas en el procesamiento de un mensaje request-for. El resto de mensajes se tratan de forma similar.
def datagramReceived(self, datagram, address):
    """Route an incoming datagram to the handler of its command.

    The "Command" entry of ``message_dict`` selects the handler; the code
    that builds ``message_dict`` is not part of this excerpt. Commands that
    are not recognised are logged at info level together with the decoded
    datagram.

    :param datagram: bytes received; decoded as UTF-8 for the log message.
    :param address: sender, indexed as host (0) and port (1).
    """
    command = message_dict.get("Command", "")

    if command in ('Discover', 'Marco'):
        self.polo(command, address)
    elif command in ('Request-for', 'Request-For'):
        # Both spellings of the command are accepted.
        self.response_request_for(command, message_dict["Params"], address)
    elif command == 'Services':
        self.response_services(command, address)
    else:
        host, port = address[0], address[1]
        logging.info("Datagram received from [%s:%s]. Unknown command %s " % (host, port, datagram.decode('utf-8')))
response_services.append(send_service)
reload_user_services_iter(user) is called
"""
self.reload_user_services_iter(user)
match = next((s for s in self.user_services.get(user, []) if s['id'] == service), None)
if match:
command_msg = json.dumps({'Command':'OK', 'Params': match.get("params", {})})
self.transport.write(command_msg.encode('utf-8'), address)
return
pass ## reload and retry!
"""
if self.verify.match(service):
try:
user, service = self.verify.match(service).groups()
except (IndexError, ValueError):
return
self.response_request_for_user(command, user, service, address)
return
match = next((s for s in self.offered_services if s['id'] == service), None)
if match:
command_msg = json.dumps({'Command':'OK', 'Params':match.get("params", {})})
Solicitud de un token de acceso¶
La creación de un token implica el uso del módulo auxiliar tokenprovider.py.
def request_token(self, uid):
    """Create the access token file of the user *uid* when it is missing.

    The token is written to ``<home>/.polo/token``, where ``<home>`` is the
    home directory reported by the password database. The ``.polo``
    directory is created on demand. Both directory and file are handed over
    to the user, and the file mode is restricted to owner read/write.

    The outcome is reported through ``self.write_ok`` (0 when a token was
    created, 1 when one already existed) or ``self.write_error``; nothing
    is returned.

    :param uid: numeric id of the user the token is created for.
    """
    # pwd.getpwuid() raises KeyError for an unknown uid instead of returning
    # None, so the failure has to be caught to reach the error reply.
    try:
        pw_user = pwd.getpwuid(uid)
    except KeyError:
        pw_user = None
    if pw_user is None:
        self.write_error("User not found")
        return

    polo_dir = os.path.join(pw_user.pw_dir, ".polo")
    if not os.path.exists(polo_dir):
        os.mkdir(polo_dir)
        os.chown(polo_dir, pw_user.pw_uid, pw_user.pw_gid)

    token_path = os.path.join(polo_dir, "token")
    if os.path.isfile(token_path):
        # An existing token is left untouched.
        self.write_ok(1)
        return

    try:
        # The context manager closes the file even when a step below fails.
        with open(token_path, 'wb') as f:
            # Permissions and ownership are set before any data is written.
            os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IWUSR)
            os.fchown(f.fileno(), pw_user.pw_uid, pw_user.pw_gid)
            f.write(tokenprovider.create_token(uid, self.secret))
    except Exception as e:
        # Any failure while creating the file is reported to the requester.
        self.write_error(str(e))
        return
    self.write_ok(0)
uid = str(uid).encode('UTF-8')
# create a cipher object using the random secret
cipher = AES.new(secret)
return six.u(EncodeAES(cipher, uid))
def decrypt_token(token, secret):
Conexión con un binding¶
Un ejemplo de conexión a uno de los bindings, en concreto, para la ejecución de una consulta request_for. Todos los bindings siguen la misma estrategia de serialización que este ejemplo representa.
def request_for(self, service, node=None, max_nodes=None, exclude=None, params=None, timeout=None):
    """Ask the local resolver which nodes offer *service*.

    The query is serialised as a JSON object and sent through
    ``self.marco_socket`` to 127.0.1.1:1338. The reply is expected to be a
    JSON array whose items carry an "Address" and a "Params" entry.

    :param service: value sent under the "Params" key of the request.
    :param node: forwarded to the resolver as "node".
    :param max_nodes: forwarded to the resolver as "max_nodes".
    :param exclude: forwarded as "exclude"; an empty list when omitted.
    :param params: forwarded as "params"; an empty dict when omitted.
    :param timeout: forwarded as "timeout"; ``self.timeout`` is used when
        the value is falsy.
    :returns: a set with one ``Node`` per item of the reply.
    :raises MarcoTimeOutException: when the request cannot be serialised
        ("Bad parameters") or no reply arrives before the socket times out.
    :raises MarcoInternalError: when nothing could be sent or the reply is
        not valid JSON.
    """
    # Fresh containers per call instead of shared mutable defaults.
    exclude = [] if exclude is None else exclude
    params = {} if params is None else params
    timeout = timeout if timeout else self.timeout

    request = {
        "Command": "Request-for",
        "Params": service,
        "node": node,
        "max_nodes": max_nodes,
        "exclude": exclude,
        "params": params,
        "timeout": timeout,
    }

    bad_request = False
    sent = None
    try:
        sent = self.marco_socket.sendto(json.dumps(request).encode('utf-8'), ('127.0.1.1', 1338))
    except (TypeError, ValueError):
        # json.dumps raises TypeError for values it cannot serialise and
        # ValueError for circular references.
        bad_request = True
    # Raised after the handler so the new exception is not chained to the
    # serialisation error. The exception type is kept for existing callers.
    if bad_request:
        raise MarcoTimeOutException("Bad parameters")
    if sent < 1:
        raise MarcoInternalError("Error on communication")

    timed_out = False
    try:
        data, address = self.marco_socket.recvfrom(4096)
    except socket.timeout:
        timed_out = True
    if timed_out:
        raise MarcoTimeOutException('No connection to the resolver')

    unparsable = False
    try:
        nodes_arr = json.loads(data.decode('utf-8'))
    except ValueError:
        unparsable = True
    if unparsable:
        raise MarcoInternalError("Internal parsing error")

    nodes = set()
    for node_arr in nodes_arr:
        # A separate name keeps the *node* argument intact.
        found = Node()
        found.address = node_arr["Address"]
        found.params = node_arr["Params"]
        nodes.add(found)
    return nodes