Source code for marcopolo.marco.marcobinding

from __future__ import absolute_import
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

import socket, sys, json, logging, os
from os import path
from copy import copy
import six

from marcopolo.marco_conf import utils
from marcopolo.marco import conf

from marcopolo.marco.marco import Marco, MarcoException



[docs]class MarcoBinding(DatagramProtocol): """ Twisted class for an asynchronous socket server """ def __init__(self): self.marco = Marco() #Own instance of Marco def __del__(self): del self.marco def graceful_shutdown(self): logging.info('Stopping service marcod') def startProtocol(self): reactor.addSystemEventTrigger('before', 'shutdown', self.graceful_shutdown) def marcoInThread(self, command, address): nodes = [] nodes = self.marco.marco(max_nodes=command.get("max_nodes", None), exclude=command.get("exclude", []), timeout=command.get("timeout", None), params=command.get("params", {}), group=command.get("group", conf.MULTICAST_ADDR) ) self.transport.write(json.dumps([{"Address":n.address, "Params": n.params} for n in nodes]).encode('utf-8'), address) def requestForInThread(self, command, address): nodes = self.marco.request_for(command["Params"], max_nodes=command.get("max_nodes", None), exclude=command.get("exclude", []), params=command.get("params", {}), timeout=command.get("timeout", None)) if len(nodes) > 0: self.transport.write(json.dumps( [{"Address": n.address, "Params": n.params} for n in nodes]).encode('utf-8'), address) else: self.transport.write(json.dumps([]).encode('utf-8'), address) def servicesInThread(self, command, address): services = self.marco.services(addr=command.get("node", None), timeout=command.get("timeout", 0) ) self.transport.write(json.dumps([service for service in services]).encode('utf-8'), address) def datagramReceived(self, data, address): try: command = json.loads(data.decode('utf-8')) except ValueError: return print(command.get("Command")) if command.get("Command", None) == None: self.transport.write(json.dumps({"Error": True}).encode('utf-8'), address) else: if command["Command"] == "Marco": reactor.callInThread(self.marcoInThread, command, address) elif command["Command"] == "Request-for" or command["Command"] == "Request-For": reactor.callInThread(self.requestForInThread, command, address) elif command["Command"] == "Services": reactor.callInThread(self.servicesInThread, command, address) else: self.transport.write(json.dumps({"Error": True}).encode('utf-8'), address)