from __future__ import with_statement
from __future__ import absolute_import
from twisted.internet.protocol import DatagramProtocol, Protocol, Factory
from io import open
import os
from os import makedirs, path
from os.path import isfile
import json, logging, re
import pwd, grp, stat
import socket
import six
from marcopolo.marco_conf import utils
from marcopolo.polo import conf, tokenprovider
def sanitize_path(path_str):
    """
    Confines a path to the directory it will later be joined to.

    The value is anchored at ``/`` before being normalised, so every ``..``
    component is resolved against the root and cannot climb above it. The
    leading slashes are removed afterwards, which leaves a relative path.

    :param str path_str: The path to be sanitized.
    :returns: The sanitized, relative path.
    :rtype: str
    """
    anchored = "/" + path_str
    collapsed = path.normpath(anchored)
    return collapsed.lstrip('/')
class PoloBindingSSLFactory(Factory):
    """
    Factory that builds one protocol instance per connection, handing each of
    them the same secret and the same service dictionaries.
    """

    def __init__(self, secret, offered_services, user_services, multicast_addrs):
        """
        Stores the values that are forwarded to every protocol instance.

        :param secret: Value passed to the protocol as its ``secret``.
        :param dict offered_services: Passed to the protocol as ``offered_services``.
        :param dict user_services: Passed to the protocol as ``user_services``.
        :param multicast_addrs: Passed to the protocol as its fourth positional argument.
        """
        self.secret, self.offered_services = secret, offered_services
        self.user_services, self.multicast_addrs = user_services, multicast_addrs

    def buildProtocol(self, addr):
        """
        Creates the protocol for a new connection.

        :param addr: The address of the connection (not used).
        :returns: An instance of ``self.protocol`` whose ``factory`` attribute is this object.
        """
        builder = self.protocol
        connection = builder(
            self.secret,
            self.offered_services,
            self.user_services,
            self.multicast_addrs,
        )
        connection.factory = self
        return connection
class PoloBindingSSL(Protocol):
    """
    Protocol that receives JSON commands and manipulates the services offered by Polo.

    Every request is a JSON object with a ``Command`` key and an optional ``Args``
    dictionary. Every answer is a JSON object with either an ``OK`` or an ``Error`` key.
    """

    def __init__(self, secret, offered_services, user_services, multicast_groups=conf.MULTICAST_ADDRS, verify_regexp=conf.VERIFY_REGEXP):
        """
        Creates the ``PoloBinding`` instance with the data structures to work with.
        The ``offered_services`` and ``user_services`` dictionaries are kept by reference
        (their contents are modified, the objects themselves are never replaced).

        :param secret: The secret handed to ``tokenprovider`` to create and decrypt tokens.
        :param dict offered_services: The root services, indexed by multicast group.
        :param dict user_services: The user services. The key is the multicast group and the
            value is a dictionary that maps a user name to its list of services.
        :param multicast_groups: The multicast groups of the instance. Only stored by this class.
        :param str verify_regexp: Regular expression used to recognise a user service
            identifier. ``unpublish_service`` expects it to define two groups: the user
            name and the service name.
        """
        self.secret = secret
        self.offered_services = offered_services
        self.user_services = user_services
        self.verify = re.compile(verify_regexp)
        self.multicast_groups = multicast_groups
        # Maps the value of the "Command" field to the method that handles it
        self.router = {
            "Register": self.publish_service_wrapper,
            "Publish": self.publish_service_wrapper,
            "Unpublish": self.unpublish_service_wrapper,
            "Request-token": self.request_token_service_wrapper,
        }

    def startProtocol(self):
        """
        Starts the binding and adds an entry in the log file
        """
        logging.info("Starting binding")

    def connectionMade(self):
        """
        Logs that a connection has been established.
        """
        logging.debug("A connection was made!")

    def dataReceived(self, datagram):
        """
        Receives data from a binding, verifies the `Command` field and
        dispatches the request to the matching handler.
        An error message is sent if the request cannot be understood.

        :param bytes datagram: The byte stream with the message
        """
        try:
            request_text = datagram.decode('utf-8')
        except UnicodeDecodeError:
            self.write_error("Malformed JSON")
            return

        logging.debug("Datagram received: %s" % request_text)

        try:
            request = json.loads(request_text)
        except ValueError:
            self.write_error("Malformed JSON")
            return

        # Valid JSON that is not an object (a list, a number...) cannot carry a command
        if not isinstance(request, dict) or request.get("Command", None) is None:
            self.write_error("Missing command")
            return

        command = request["Command"]
        args = request.get("Args", {})
        if not isinstance(args, dict):
            self.write_error("Malformed request. Args must be a dictionary")
            return

        # A non-string command may be unhashable, so it is never looked up in the router
        if isinstance(command, six.string_types):
            handler = self.router.get(command, self.unknown_command_handler)
        else:
            handler = self.unknown_command_handler

        handler(command, args)

    def request_token(self, uid):
        """
        Creates the token file of a user (``.polo/token`` in the home directory) if it
        does not exist yet. The file is owned by the user and only the user can read it.

        Answers ``OK`` with status 0 if the token was created and 1 if it already existed.

        :param int uid: The identifier of the user that requests the token
        """
        try:
            pw_user = pwd.getpwuid(uid)
        except (KeyError, TypeError, OverflowError):
            # getpwuid never returns None: unknown or invalid uids raise instead
            self.write_error("User not found")
            return

        polo_dir = os.path.join(pw_user.pw_dir, ".polo")
        token_path = os.path.join(polo_dir, "token")

        try:
            if not os.path.exists(polo_dir):
                os.mkdir(polo_dir)
                os.chown(polo_dir, pw_user.pw_uid, pw_user.pw_gid)
        except OSError as e:
            self.write_error(str(e))
            return

        if os.path.isfile(token_path):
            self.write_ok(1)
            return

        try:
            # The token is created first so that a failure does not leave an empty file behind
            token = tokenprovider.create_token(uid, self.secret)
            with open(token_path, 'wb') as f:
                os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IWUSR)
                os.fchown(f.fileno(), pw_user.pw_uid, pw_user.pw_gid)
                f.write(token)
        except Exception as e:
            # The failure modes of tokenprovider are not visible from this module,
            # so every error is reported to the client instead of dropping the connection
            self.write_error(str(e))
            return

        self.write_ok(0)

    def write_ok(self, status):
        """
        Creates and sends an OK message

        :param status: The value of the ``OK`` field (a status code or a service identifier)
        """
        json_str = json.dumps({"OK": status})
        self.transport.write(json_str.encode('utf-8'))

    def write_error(self, error):
        """
        Creates and sends an Error message. The reason is also logged.

        :param str error: The error reason
        """
        logging.debug(error)
        self.transport.write(json.dumps({"Error": error}).encode('utf-8'))

    def publish_service_wrapper(self, command, args):
        """
        A wrapper for the :method:publish_service method

        :param str command: The command that triggered the function
        :param dict args: The arguments to pass to :method:publish_service
        """
        logging.debug("Publish service")
        self.publish_service(args.get("service", ''),
                             args.get("token", ""),
                             multicast_groups=args.get("multicast_groups", conf.MULTICAST_ADDRS),
                             permanent=args.get("permanent", False),
                             root=args.get("root", False))

    def unpublish_service_wrapper(self, command, args):
        """
        A wrapper for the :method:unpublish_service method

        :param str command: The command that triggered the function
        :param dict args: The arguments to pass to :method:unpublish_service
        """
        logging.debug("Unpublish service")
        self.unpublish_service(args.get("service", ''),
                               args.get("token", ""),
                               multicast_groups=args.get("multicast_groups", set()),
                               delete_file=args.get("delete_file", False))

    def request_token_service_wrapper(self, command, args):
        """
        A wrapper for the :method:request_token method

        :param str command: The command that triggered the function
        :param dict args: The arguments to pass to :method:request_token
        """
        self.request_token(args.get("uid", -1))

    def unknown_command_handler(self, command, args):
        """
        Handles all unknown commands

        :param str command: The command that triggered the function
        :param dict args: The arguments that were passed
        """
        # write_error already sends the message (and returns None), so nothing else is written
        self.write_error("Malformed request. Commands missing")

    def _uid_from_token(self, token, empty_error, unknown_error):
        """
        Decrypts a token. An error message is sent if the token is not valid.

        :param token: The token sent by the client
        :param str empty_error: The error sent if no token was given
        :param str unknown_error: The error sent if the token does not yield a valid uid
        :returns: The uid stored in the token, or ``None`` if an error was sent
        """
        if token is None or len(str(token)) == 0:
            self.write_error(empty_error)
            return None

        uid = tokenprovider.decrypt_token(token, self.secret)
        try:
            if uid is None or int(uid) < 0:
                self.write_error(unknown_error)
                return None
        except (TypeError, ValueError):
            self.write_error("Bad token")
            return None

        return uid

    def _verify_multicast_groups(self, multicast_groups):
        """
        Verifies every multicast group with ``utils.verify_ip``.
        An error message is sent for the first address that is rejected.

        :param multicast_groups: The groups to verify
        :returns: ``True`` if all the groups were accepted
        :rtype: bool
        """
        try:
            candidates = iter(multicast_groups)
        except TypeError:
            self.write_error("multicast_groups must be a list")
            return False

        for ip in candidates:
            error, faulty_ip, reason = utils.verify_ip(ip, multicast_groups)
            if error:
                try:
                    message = "Invalid multicast group address '%s': %s" % (str(faulty_ip), reason)
                except UnicodeError:
                    # str() fails in Python 2 if the address has non-ASCII characters
                    message = "Invalid multicast group address"
                self.write_error(message)
                return False

        return True

    def _write_service_file(self, file_path, service_dict, user):
        """
        Dumps a service as JSON to a file owned by the user.

        :param str file_path: The path of the file to create
        :param dict service_dict: The service to store
        :param user: `pwd` structure of the owner of the file
        """
        with open(file_path, 'w') as f:
            # io.open in text mode only accepts unicode, which json.dumps does not return in Python 2
            f.write(six.text_type(json.dumps(service_dict)))
            os.fchown(f.fileno(), user.pw_uid, user.pw_gid)

    def publish_service(self, service, token, multicast_groups=conf.MULTICAST_ADDRS, permanent=False, root=False):
        """
        Registers a service during execution time.

        :param str service: Indicates the unique identifier of the service.
            If `root` is true, the published service will have the same identifier as the value of the parameter. Otherwise, the name of the user will be prepended (`<user>:<service>`).
        :param token: The token that identifies the requesting user
        :param set multicast_groups: Indicates the groups where the service shall be published.
            Note that the groups must be defined in the polo.conf file, or otherwise the method will throw an exception.
            If empty, the groups in ``conf.MULTICAST_ADDRS`` are used.
        :param bool permanent: If set to true a file will be created and the service will be permanently offered until the file is deleted.
        :param bool root: Stores the file in the marcopolo configuration directory.
            This feature is only available to privileged users, by default root and users in the marcopolo group.
        """
        uid = self._uid_from_token(token, "Need a token", "Bad token. Could not find user")
        if uid is None:
            return

        # Verification of the service: it must be a string with at least one character
        reason = None
        if service is None or (isinstance(service, six.string_types) and len(service) < 1):
            reason = "Must be larger than 1"
        elif not isinstance(service, six.string_types):
            reason = "Service must be a string"

        if reason is not None:
            self.write_error("The name of the service %s is invalid: %s" % (service, reason))
            return

        if not self._verify_multicast_groups(multicast_groups):
            return

        if not isinstance(permanent, bool):
            self.write_error("permanent must be boolean")
            return

        if not isinstance(root, bool):
            self.write_error("root must be boolean")
            return

        user = self.validate_user(uid)
        if user is None:
            self.write_error("wrong user")
            return

        # Entry stored in the file of a permanent service.
        # The groups are recorded as they were requested, before the default is applied
        service_dict = {"id": service, "groups": multicast_groups}

        if not multicast_groups:
            multicast_groups = conf.MULTICAST_ADDRS

        if root:
            # Only root or members of the `marcopolo` group can publish root services
            if not self.is_superuser(user):
                self.write_error("Permission denied")
                return
            self._publish_root_service(service, service_dict, multicast_groups, permanent, user)
        else:
            self._publish_user_service(service, service_dict, multicast_groups, permanent, user)

    def _publish_root_service(self, service, service_dict, multicast_groups, permanent, user):
        """
        Adds a root service to every group where it is not offered yet and sends the answer.

        :param str service: The identifier of the service
        :param dict service_dict: The entry to store if the service is permanent
        :param multicast_groups: The groups to publish the service in
        :param bool permanent: Whether the service file must be created
        :param user: `pwd` structure of the requesting user
        """
        services_dir = path.join(conf.CONF_DIR, conf.SERVICES_DIR)
        file_path = path.join(services_dir, sanitize_path(service))
        # There is a single file per service: it is written once per request and not once
        # per group (otherwise every group but the first one finds that it "already exists")
        needs_file = permanent
        error_reason = None

        for group in multicast_groups:
            offered = self.offered_services[group]
            if any(s['id'] == service for s in offered):
                error_reason = "Service %s already exists" % service
                continue

            if needs_file:
                if path.isfile(file_path):
                    error_reason = "Permanent service %s already exists" % service
                    continue
                try:
                    if not path.exists(services_dir):
                        makedirs(services_dir)
                        os.chown(services_dir, 0, grp.getgrnam(conf.GROUP_NAME).gr_gid)
                    self._write_service_file(file_path, service_dict, user)
                except (EnvironmentError, KeyError, TypeError, ValueError) as e:
                    logging.debug(e)
                    error_reason = "Could not write file"
                    continue
                needs_file = False

            offered.append({"id": service, "permanent": permanent})

        if error_reason is None:
            self.write_ok(service)
        else:
            self.write_error(error_reason)

    def _publish_user_service(self, service, service_dict, multicast_groups, permanent, user):
        """
        Adds a user service to every group where the user does not offer it yet and sends the answer.

        :param str service: The identifier of the service
        :param dict service_dict: The entry to store if the service is permanent
        :param multicast_groups: The groups to publish the service in
        :param bool permanent: Whether the service file must be created
        :param user: `pwd` structure of the owner of the service
        """
        owner = user.pw_name
        deploy_folder = path.join(user.pw_dir, conf.POLO_USER_DIR)
        file_path = path.join(deploy_folder, sanitize_path(service))
        # The file is written once per request (see _publish_root_service)
        needs_file = permanent
        already_exists = False

        for group in multicast_groups:
            group_services = self.user_services[group]
            entries = group_services.get(owner, None)
            if entries and any(s['id'] == service for s in entries):
                already_exists = True
                continue

            if needs_file:
                if path.isfile(file_path):
                    # TODO: if unpublished and not deleted, this will be true
                    already_exists = True
                    continue
                try:
                    if not path.exists(deploy_folder):
                        makedirs(deploy_folder)
                        os.chown(deploy_folder, user.pw_uid, user.pw_gid)
                    self._write_service_file(file_path, service_dict, user)
                except (EnvironmentError, TypeError, ValueError) as e:
                    self.write_error("Could not write service file %s" % str(e))
                    return
                needs_file = False

            if entries is None:
                entries = group_services[owner] = []
            entries.append({"id": service, "permanent": permanent})
            logging.debug("Publishing user service %s in group %s" % (service, group))

        if already_exists:
            self.write_error("Service already exists")
        else:
            self.write_ok(owner + ":" + service)

    def unpublish_service(self, service, token, multicast_groups=conf.MULTICAST_ADDRS, delete_file=False):
        """
        Removes a service from the offered services structures and all associated files, upon request.
        A single answer is sent: ``OK`` with status 0, or the first error that was found.

        :param str service: The id of the service to delete. If it matches the
            verification regular expression it is treated as a user service
            (`<user>:<service>`), otherwise as a root service.
        :param token: The token that identifies the requesting user
        :param list multicast_groups: The list of multicast_groups to delete the service from.
            If empty, the service is removed from all the groups in ``conf.MULTICAST_ADDRS``.
        :param bool delete_file: If set to ``True`` and the service is of type permanent,
            the service file is deleted. If the service is not permanent the parameter is ignored.
        """
        logging.debug("Unpublishing service")

        uid = self._uid_from_token(token, "Bad token", "Bad token")
        if uid is None:
            return

        if not multicast_groups:
            multicast_groups = conf.MULTICAST_ADDRS

        if not self._verify_multicast_groups(multicast_groups):
            return

        unavailable = [group for group in multicast_groups if group not in conf.MULTICAST_ADDRS]
        if unavailable:
            self.write_error("The group %s is not available" % unavailable[0])
            return

        requester = self.validate_user(uid)
        if requester is None:
            self.write_error("wrong user")
            return

        if not isinstance(service, six.string_types):
            self.write_error("The name of the service %s is invalid: %s" % (service, "Service must be a string"))
            return

        # NOTE(review): the requester is only checked to exist. Nothing verifies that it
        # owns the user service or that it is a superuser (as publish_service requires for
        # root services), so any user with a valid token can unpublish any service.
        # Confirm whether this is intended before enforcing it: it would change the contract.
        owner_match = self.verify.match(service)
        if owner_match:
            self._unpublish_user_service(owner_match, multicast_groups, delete_file)
        else:
            self._unpublish_root_service(service, multicast_groups, delete_file)

    def _unpublish_user_service(self, owner_match, multicast_groups, delete_file):
        """
        Removes a user service from the given groups and sends the answer.

        :param owner_match: The match of the verification regular expression on the identifier
        :param multicast_groups: The groups to remove the service from
        :param bool delete_file: Whether the file of a permanent service must be deleted
        """
        try:
            owner_name, service_name = owner_match.groups()
            owner = pwd.getpwnam(owner_name)
        except (IndexError, ValueError):
            self.write_error("Invalid formatting")
            return
        except KeyError:
            # getpwnam never returns None: unknown names raise instead
            self.write_error("Invalid user")
            return

        file_path = path.join(owner.pw_dir, conf.POLO_USER_DIR, sanitize_path(service_name))
        # The file is shared by all the groups, so it is only looked for once per request
        file_handled = False
        failure = None

        for group in multicast_groups:
            entries = self.user_services[group].get(owner_name, None)
            match = None
            if entries is not None:
                match = next((s for s in entries if s['id'] == service_name), None)

            if match is None:
                self.write_error("Could not find service")
                return

            if delete_file and match.get("permanent", False) and not file_handled:
                file_handled = True
                if isfile(file_path):
                    try:
                        os.remove(file_path)
                    except OSError as e:
                        # Best effort: the service is unpublished even if the file remains
                        logging.debug(e)
                else:
                    failure = "Could not find service file"

            entries.remove(match)

        if failure is None:
            self.write_ok(0)
        else:
            self.write_error(failure)

    def _unpublish_root_service(self, service, multicast_groups, delete_file):
        """
        Removes a root service from the given groups and sends the answer.
        If ``delete_file`` is set, the groups are also removed from the service file,
        even if the service is not currently offered.

        :param str service: The identifier of the service
        :param multicast_groups: The groups to remove the service from
        :param bool delete_file: Whether the file of a permanent service must be updated
        """
        # The name is sanitized as in publish_service, so that the file that
        # is modified here can never be outside of the services directory
        file_path = path.join(conf.CONF_DIR, conf.SERVICES_DIR, sanitize_path(service))
        file_deleted = False
        failure = None

        for group in multicast_groups:
            offered = self.offered_services[group]
            match = next((s for s in offered if s['id'] == service), None)

            if match is None:
                # The service is not offered: only a leftover file can still be cleaned
                if not (delete_file and (file_deleted or isfile(file_path))):
                    self.write_error("Could not find service")
                    return
                update_file = not file_deleted
            else:
                update_file = delete_file and match.get("permanent", False) and not file_deleted

            if update_file:
                if isfile(file_path):
                    try:
                        file_deleted = self._drop_group_from_service_file(file_path, group)
                    except (EnvironmentError, ValueError) as e:
                        logging.debug(e)
                        failure = failure or "Internal error during processing of file"
                else:
                    failure = failure or "Could not find service file"

            if match is not None:
                offered.remove(match)

        if failure is None:
            self.write_ok(0)
        else:
            self.write_error(failure)

    def _drop_group_from_service_file(self, file_path, group):
        """
        Removes a group from the file of a permanent service.
        The file is deleted if no groups are left (or if it did not list any group).

        :param str file_path: The path of the service file
        :param str group: The group to remove
        :returns: ``True`` if the file was deleted, ``False`` if it was rewritten
        :rtype: bool
        :raises ValueError: If the file is not a valid service description
        """
        with open(file_path, 'r+') as f:
            file_dict = json.load(f)
            if not isinstance(file_dict, dict):
                raise ValueError("The service file is not a JSON object")

            groups = file_dict.get("groups", [])
            if not isinstance(groups, list):
                raise ValueError("The groups of the service file are not a list")

            if group in groups:
                groups.remove(group)

            if groups:
                f.seek(0, 0)
                f.truncate()
                # See _write_service_file for the conversion to text
                f.write(six.text_type(json.dumps(file_dict)))
                return False

        # The file is removed once it has been closed
        os.remove(file_path)
        return True

    def validate_user(self, uid):
        """
        Returns a `pwd` structure if the uid is present in the passwd database.
        Otherwise `None` is returned

        :param int uid: The user identifier of the service
        """
        # bool is a subclass of int, but it is not a valid identifier
        if isinstance(uid, bool) or not isinstance(uid, six.integer_types):
            return None

        if uid < 0:
            return None

        try:
            return pwd.getpwuid(uid)
        except (KeyError, OverflowError):
            return None

    def is_superuser(self, user):
        """
        Returns `True` if the user is a 'superuser' (it is root or it a member of the `marcopolo` group)

        :param user: `pwd` structure with all the information from the user
        """
        if user.pw_uid == 0:
            return True

        # Supplementary groups of the user
        if any(g.gr_name == 'marcopolo' and user.pw_name in g.gr_mem for g in grp.getgrall()):
            return True

        # Primary group of the user (it might not be in the group database)
        try:
            return grp.getgrgid(user.pw_gid).gr_name == 'marcopolo'
        except KeyError:
            return False