Source code for marcodeployer.receiver

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from tornado.web import Application, RequestHandler, StaticFileHandler
from tornado import ioloop
from tornado.websocket import WebSocketHandler
import tornado.web
from tornado.httpserver import HTTPServer 
from tornado import ioloop
from tornado.gen import coroutine

import os, time
from shlex import split
from subprocess import Popen
import subprocess
import pwd
import sys


sys.path.append(os.path.realpath(__file__))


from tornado.ioloop import PeriodicCallback
import signal
from os import path, makedirs

import subprocess
import fcntl
import sys
import json

import socket
import string, random
import hashlib
from concurrent import futures
from tornado.web import decode_signed_value
import logging
import ssl

from marcopolo.bindings.polo import Polo, PoloInternalException, PoloException

import six
from six.moves.urllib import parse as urlparse

from marcodeployer.bufferprocessor import ProcessReactor
from marcodeployer.statusmonitor import get_data
from marcodeployer.utils import getip 
from marcodeployer import conf

#ip = ""
ip = getip(conf.INTERFACE)
opensockets={}
io_loop = ioloop.IOLoop.instance()
data_dict = {}
data_json = ""

response_dict = {}
statusmonitor_open_sockets =  []
getDataCallback = None
processes = {}

[docs]def shutdown(signal, frame): """ Stops the application gracefully, closing all socket connections and unpublishing the MarcoPolo services. """ logging.info("Stopping gracefully") for socket in statusmonitor_open_sockets: socket.close() try: Polo().unpublish_service(conf.RECEIVER_SERVICE_NAME, delete_file=True) Polo().unpublish_service(conf.STATUS_MONITOR_SERVICE_NAME, delete_file=True) except Exception as e: logging.warning(e) io_loop.stop()
[docs]def sigint_handler(signal, frame): io_loop.add_callback(shutdown)
signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler)
[docs]class DeployHandler(RequestHandler): @tornado.web.asynchronous
[docs] def post(self): """ POST handler received from ``deployer.py``. It handles the deployment of the file and the execution of the desired command. """ file1 = self.request.files['file'][0] command = self.get_argument('command', '') idpolo = self.get_argument('idpolo', '') tomcat = self.get_argument('tomcat', '') if not tomcat: folder = self.get_argument('folder', '') else: folder = conf.TOMCAT_PATH fname = file1['filename'] user = self.get_argument('user', '') user_pwd = pwd.getpwnam(user) #Handling of relative paths folder = folder.replace('~', user_pwd.pw_dir, 1) if len(folder) == 0 or folder[0] != '/': folder = os.path.join(user_pwd.pw_dir, folder) if folder == '': folder = user_pwd.pw_dir if not os.path.isdir(folder): return if not os.path.exists(folder): makedirs(folder) chown(folder, user.pw_uid, user.pw_gid) final_directory = os.path.join(folder, fname) overwrite = self.get_argument('overwrite', 'false') overwrite = False if overwrite.lower() == 'false' else True; thread_pool = futures.ThreadPoolExecutor(max_workers=1) thread_pool.submit(self.execute, command=command, file_desc=file1, filename=final_directory, directory=folder, user=user_pwd, tomcat=tomcat, overwrite=overwrite) self.finish('OK')
@tornado.web.asynchronous
[docs] def execute(self, command, file_desc, filename, directory, user, tomcat=False, overwrite="false"): if os.path.isfile(filename) and not overwrite: return def demote(user_uid, user_gid): os.setgid(user_gid) os.setuid(user_uid) if os.path.exists(os.path.dirname(filename)): output_file = open(filename, 'wb') else: return output_file.write(file_desc['body']) if not tomcat: os.chown(filename, user.pw_uid, user.pw_gid) else: os.chown(filename, pwd.getpwnam('tomcat7').pw_uid, pwd.getpwnam('tomcat7').pw_gid) output_file.close() if len(command) > 0: p = ProcessReactor(user, directory, io_loop, ip, opensockets, split(command), shell=False) if processes.get(user.pw_name, None) is None: processes[user.pw_name] = set() processes[user.pw_name].add(p)
secret = "" try: with open(conf.SECRET_FILE, 'r') as secret_file: secret = secret_file.read() except Exception as e: logging.error("Could not open secret file") #sys.exit(1) settings = { "debug": True, "static_path": conf.STATIC_PATH, #os.path.join(os.path.dirname(__file__), "static"), "cookie_secret": secret }
[docs]class LoggerHandler(WebSocketHandler): """ Processes the logging messages """
[docs] def check_origin(self, origin): """ Overrides the parent method to return True for any request, since we are working without names :returns: bool True """ return True
[docs] def open(self): """ Notifies the opening of a new Logger connection """ logging.debug("A new connection was open")
[docs] def on_message(self, message): """ A message is sent by the client after creating the connection. The method verifies the user secret cookie and appends the connection to the opensockets dictionary. :param str message: The received message """ user_id = decode_signed_value(settings["cookie_secret"], 'user', json.loads(message).get("register", "")).decode('utf-8') """ If the user_id is other than None the verification has succeded, and the connection is appended to the rest of the websockets related to the user. """ if user_id is not None: if opensockets.get(user_id) is None: opensockets[user_id] = [] opensockets[user_id].append(self) else: logging.debug("The user could not be found")
[docs] def on_line(self, user, command, message, ip, identifier, stop=False, stream_name="stdout", *args, **kwargs): """ The io_loop calls the function when a new message appears. :param str user: The name of the user :param str command: The command in execution :param str message: The message to deliver :param str ip: The ip of the server, so the client knows where the message comes from :param boolean stop: Determines if the connection must be closed or not *deprecated* :param str stream_name: The name of the stream """ msg = {} msg["user"] = user msg["command"] = command msg["message"] = message msg["ip"] = ip msg["identifier"] = identifier msg["stop"] = stop msg["stream_name"] = stream_name msg["shell"] = kwargs.get("shell", False) self.write_message(json.dumps(msg))
[docs] def on_close(self): """ Removes the connection from the opensockets dictionary """ for ws in opensockets: if self in opensockets[ws]: opensockets[ws].remove(self) break else: logging.debug("The websocket could not be found")
[docs]class ShellHandler(LoggerHandler):
[docs] def on_message(self, message): message_json = message#.decode('utf-8') try: message_dict = json.loads(message_json) except ValueError as v: return if message_dict.get("register", None) is not None: user_id = decode_signed_value(settings["cookie_secret"], 'user', message_dict["register"]).decode('utf-8') if not user_id is None: if opensockets.get(user_id) is None: opensockets[user_id] = [] opensockets[user_id].append(self) else: pass elif message_dict.get("command", None) is not None: user_id = decode_signed_value(settings["cookie_secret"], 'user', message_dict.get("user_id", "")) if user_id is not None: user_id = user_id.decode('utf-8') user_pwd = pwd.getpwnam(user_id) try: command = message_dict["command"] p = ProcessReactor(user_pwd, user_pwd.pw_dir, io_loop, ip, opensockets, split(command), shell=True) if processes.get(user_id, None) is None: processes[user_id] = set() processes[user_id].add(p) except Exception as e: logging.warning(e) elif message_dict.get("remove", None) is not None: logging.debug("remove") user_id = decode_signed_value(settings["cookie_secret"], 'user', message_dict.get("user_id", "")) if user_id is not None: identifier = message_dict.get("remove", None) if identifier is not None: print("identifier", identifier) process = next((x for x in processes.get(user_id, set()) if x.identifier == identifier), None) if process is not None: process.stop() processes[user_id].remove(process) elif message_dict.get("removeshell", None) is not None: logging.debug("removeshell") user_id = decode_signed_value(settings["cookie_secret"], 'user', message_dict.get("user_id", "")) if user_id is not None: identifiers = message_dict.get("removeshell", None) if identifiers is not None: try: identifiers_dict = message_dict["removeshell"] for identifier in identifiers: logging.debug(identifier) logging.debug(processes.get(user_id, set())) process = next((x for x in processes.get(user_id, set()) if x.identifier == identifier), None) if process is not None: logging.debug("Killing") process.stop() processes[user_id].remove(process) except ValueError as v: logging.warning(v) except KeyError as k: logging.warning(k)
[docs]class ProbeWSHandler(WebSocketHandler):
[docs] def check_origin(self, origin): """ Overrides the parent method to return True for any request, since we are working without names :returns: bool True """ return True
[docs] def open(self): """ Returns a confirmation message """ self.write_message("OK") self.close()
[docs]class ProbeHandler(RequestHandler):
[docs] def get(self): self.write("You should be able to create websocket connections now")
[docs]def start_callback(): """ Starts the collection callback """ global getDataCallback if getDataCallback is None: getDataCallback = PeriodicCallback(process_data, conf.REFRESH_FREQ) getDataCallback.start() elif not getDataCallback.is_running(): getDataCallback.start()
[docs]def stop_callback(): """ If the number of open connections is zero, stops the collection callback of data. """ global getDataCallback if getDataCallback is not None: if len(statusmonitor_open_sockets) == 0: getDataCallback.stop()
[docs]def process_data(): """ Processes the statusmonitor data """ global data_dict, data_json if len(statusmonitor_open_sockets) > 0: data_dict = get_data() data_json = json.dumps(data_dict,separators=(',',':'))
[docs]class SocketHandler(WebSocketHandler):
[docs] def check_origin(self, origin): """ Overrides the parent method to return True for any request, since we are working without names :returns: bool True """ return True
[docs] def open(self): logging.info("Connection open from " + self.request.remote_ip) if not self in statusmonitor_open_sockets: statusmonitor_open_sockets.append(self) #http://stackoverflow.com/a/19571205 self.callback = PeriodicCallback(self.send_data, 1000) self.callback.start() start_callback()
[docs] def send_data(self): self.write_message(data_json) return
[docs] def on_close(self): self.callback.stop() if self in statusmonitor_open_sockets: statusmonitor_open_sockets.remove(self) stop_callback()
[docs] def send_update(self): pass
routes = [ (r'/deploy/?', DeployHandler), ] routes_ws = [ (r'/ws/probe/', ProbeWSHandler), (r'/ws/status/', SocketHandler), (r'/ws/logger/', ShellHandler), (r'/probe/', ProbeHandler), (r'/', ProbeHandler), ] app = Application(routes, **settings) wsapp = Application(routes_ws, **settings);
[docs]def main(args=None): """ Creates the servers, initializes the logging facility, publishes the ``MarcoPolo`` services and starts the ``io_loop``. """ ip = getip(conf.INTERFACE) pid = os.getpid() #if not os.path.exists('/var/run/marcopolo'): # makedirs('/var/run/marcopolo') logging.basicConfig(filename=conf.RECEIVER_LOG_FILE, level=getattr(logging, conf.RECEIVER_LOGLEVEL.upper())) httpServer = HTTPServer(app, ssl_options={ "certfile": conf.RECEIVERCERT, "keyfile": conf.RECEIVERKEY, "cert_reqs": ssl.CERT_REQUIRED, "ca_certs": conf.APPCERT, }) httpServer.listen(conf.RECEIVER_PORT) wsapp.listen(conf.RECEIVER_WEBSOCKET_PORT, ssl_options={"certfile": conf.APPCERT, "keyfile": conf.APPKEY}) while True: try: Polo().publish_service(conf.RECEIVER_SERVICE_NAME, root=True) Polo().publish_service(conf.STATUS_MONITOR_SERVICE_NAME, root=True) break except PoloInternalException as e: logging.warning(e) time.sleep(1) except PoloException as i: logging.warning(i) break logging.info("Starting receiver on port %d. WebSockets on %d" % (conf.RECEIVER_PORT, conf.RECEIVER_WEBSOCKET_PORT)) io_loop.start()
if __name__ == "__main__": main(sys.argv[1:])