Source code for marcopolo.polo.polo

# -*- coding: utf-8 -*-
from __future__ import with_statement
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

import os
from os.path import isfile
import json, logging, re
import pwd

from marcopolo.polo import conf

[docs]class Polo(DatagramProtocol): """ Twisted-inherited class in charge of receiving Marco\ requests on the defined multicast groups """
[docs] def __init__(self, offered_services=None, user_services=None, multicast_group=None, verify_regexp=None): """ Creates the ``Polo`` instance with the data structures to work with. If defined, the ``offered_services`` and ``user_services`` variables will be treated as references to a list and dictionary respectively (i.e. the values will be modified, but the object reference will never be altered). :param list offered_services: A list of dictionaries which comprises all the root services. :param dict user_services: A dictionary of all the user services (the key is the user name and the value is a list of services like that of offered_services). :param str multicast_group: The IPv4 address of the multicast group to join to. **Important**: The multicast_addr is not validated until the reactor is started. :param str verify_regexp: Regular expression used to verify an user service. """ self.offered_services = offered_services if offered_services is not None else [] self.user_services = user_services if user_services is not None else {} self.verify = re.compile(verify_regexp or conf.VERIFY_REGEXP) self.multicast_group = multicast_group or conf.MULTICAST_ADDR_FALLBACK
[docs] def reload_services(self): """ Reloads both root and user services (calling :func:`reload_user_services`). The services stored in a file are loaded again, whereas dynamic services are kept intact. """ del self.offered_services[:] #http://stackoverflow.com/a/1400622/2628463 logging.info("Reloading services in polo instance for group %s" % self.multicast_group) #Load list of filenames servicefiles = [f for f in os.listdir(os.path.join(conf.CONF_DIR, conf.SERVICES_DIR)) if isfile(os.path.join('/etc/marcopolo/polo/services', f))] service_ids = set() for service_file in servicefiles: try: with open(os.path.join(os.path.join(conf.CONF_DIR, conf.SERVICES_DIR), service_file), 'r') as f: service = json.load(f) service["permanent"] = True service["params"] = service.get("params", {}) service["file"] = service_file groups = service.get("groups", []) if self.multicast_group in groups or len(groups) == 0: if not self.verify.match(service['id']): if service['id'] in service_ids: logging.warning("Service %s already published. The service in the file %s will not be added" % (service['id'], service_file)) else: service_ids.add(service['id']) self.offered_services.append(service) except ValueError as e: logging.debug(str.format("The file {0} does not have a valid JSON structure", os.path.join(conf.SERVICES_DIR, service_file))) except Exception as e: logging.warning("Unknown error: %s" % e) self.reload_user_services() logging.info("Polo instance for group " + self.multicast_group + ".Reloaded: Offering " + str(len(self.offered_services)) + " services")
[docs] def reload_user_services(self): """ Iterates through all the users who have services and calls reload_user_services_iter for each of them """ for user in self.user_services: self.reload_user_services_iter(user)
[docs] def reload_user_services_iter(self, user): """ Reloads the services for a given user :param str user: The name of the user """ logging.info("Reloading user services") #Loads the user pwd structure try: user = pwd.getpwnam(user) except KeyError: return #Check if the user home path exists if os.path.exists(user.pw_dir): #The services must be stored in $HOME/.polo/ polo_dir = os.path.join(user.pw_dir,conf.POLO_USER_DIR) username = user.pw_name self.user_services[username] = [service for service in self.user_services.get(username, []) if service[1] == False] servicefiles = [os.path.join(polo_dir, f) for f in os.listdir(polo_dir) if isfile(os.path.join(polo_dir, f))] fileservices = [] for service in servicefiles: try: with open(service, 'r') as f: s = json.load(f) s["permanent"] = True s["params"] = s.get("params", {}) s["file"] = service if not self.verify.match(s['id']): fileservices.append(s) except ValueError: logging.warning(str.format("The file {0} does not have a valid JSON structure", os.path.join(conf.SERVICES_DIR, service))) self.user_services[username] = self.user_services[username] + fileservices
[docs] def startProtocol(self): """ Operations to be performed before starting to listen """ logging.info("Starting service polod") logging.info("Loading services") #List all files in the service directory services_dir = os.path.join(conf.CONF_DIR, conf.SERVICES_DIR) servicefiles = [f for f in os.listdir(services_dir) if isfile(os.path.join(services_dir,f))] service_ids = set() for service in servicefiles: try: with open(os.path.join(os.path.join(conf.CONF_DIR, conf.SERVICES_DIR), service), 'r') as f: s = json.load(f) if not s["disabled"] == True: s["permanent"] = True s["params"] = s.get("params", {}) s["file"] = service groups = s.get("groups",[]) if self.multicast_group in groups or len(groups) == 0: if not self.verify.match(s['id']): if s['id'] in service_ids: logging.warning("Service %s already published. The service in the file %s will not be published" % (s['id'], service)) else: service_ids.add(s['id']) self.offered_services.append(s) #if not self.verify.match(s['id']): # self.offered_services.append(s) else: logging.warning("The service %s does not have a valid id", s['id']) except ValueError: logging.warning(str.format("The file {0} does not have a valid JSON structure", os.path.join(conf.SERVICES_DIR, service))) except Exception as e: logging.error("Unknown error %s", e) if conf.DEBUG: for s in self.offered_services: logging.debug("%s:%s"% (s['id'], s['params'])) logging.info("Offering " + str(len(self.offered_services)) + " services") self.attempts = 0 self.transport.joinGroup(self.multicast_group).addErrback(self.handler) self.transport.setTTL(conf.HOPS) #Go beyond the network. TODO
[docs] def handler(self, arg): """ An 'errback' that is called when the multicast subscription is unsuccessful. It schedules a retry and increments an attempt counter. :param object arg: The arg passed in the addErrback() call """ logging.error("Error on joining the multicast group %s. %d retries" % (self.multicast_group, self.attempts)) self.attempts += 1 reactor.callLater(3, self.retry)
[docs] def retry(self): """ Tries to join the multicast group if it unsuccessful """ if self.attempts < conf.RETRIES or conf.RETRIES < 0: self.transport.joinGroup(conf.MULTICAST_ADDR).addErrback(self.handler) else: logging.error("Could not joing the multicast group after %d attempts. Leaving" % (conf.RETRIES))
[docs] def datagramReceived(self, datagram, address): """ When a datagram is received the command is parsed and a response is generated :param bytes datagram: The byte stream with the message :param tuple address: A tuple with the requesting address and port """ try: message_dict = json.loads(datagram.decode('utf-8')) except ValueError: logging.info("Datagram received from [%s:%s]. Invalid JSON structure" % (address[0], address[1])) return command = message_dict.get("Command", "") if command == 'Discover' or command == 'Marco': self.polo(command, address) elif command == 'Request-for' or command == 'Request-For': self.response_request_for(command, message_dict["Params"], address) elif command == 'Services': self.response_services(command, address) else: logging.info("Datagram received from [%s:%s]. Unknown command %s " % (address[0], address[1], datagram.decode('utf-8')))
[docs] def polo(self, command, address): """ Replies to `Polo` requests :param str command: The command that triggered this action :param tuple address: A tuple with the requesting address and port """ response_dict = {} response_dict["Command"] = "Polo" response_dict["Params"] = conf.POLO_PARAMS json_msg = json.dumps(response_dict, separators=(',', ':')) msg = json_msg.encode('utf-8') self.transport.write(msg, address)
[docs] def response_services(self, command, param, address): """ Replies to `Services` requests """ response_services = [] for service in self.offered_services: send_service = {} send_service['id'] = service['id'] send_service['params'] = service['params'] response_services.append(send_service) self.transport.write(json.dumps({'Command': 'OK', 'Services': response_services}).encode('utf-8'), address)
[docs] def response_request_for_user(self, command, user, service, address): """ Handles user request-for requests :param str user: The name of the user :param str command: The command that triggered this action :param str service: The service name :param tuple address: A tuple with the requesting address and port If the user has not been added to the list of services before this request, reload_user_services_iter(user) is called """ self.reload_user_services_iter(user) match = next((s for s in self.user_services.get(user, []) if s['id'] == service), None) if match: command_msg = json.dumps({'Command':'OK', 'Params': match.get("params", {})}) self.transport.write(command_msg.encode('utf-8'), address) return else: pass ## reload and retry!
[docs] def response_request_for(self, command, service, address): """ Handles request-for requests :param str command: The command that triggered this action :param str service: The id of the service :param tuple address: A tuple with the requesting address and port """ if self.verify.match(service): try: user, service = self.verify.match(service).groups() except (IndexError, ValueError): return self.response_request_for_user(command, user, service, address) return match = next((s for s in self.offered_services if s['id'] == service), None) if match: command_msg = json.dumps({'Command':'OK', 'Params':match.get("params", {})}) self.transport.write(command_msg.encode('utf-8'), address) return