#!/usr/bin/env python
# -*- mode: python -*-
#
# This file is part of eventmq.
#
# eventmq is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# eventmq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with eventmq.  If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function

import argparse
import cmd
import json
from pprint import pprint
import sys
import time

from eventmq import exceptions
from eventmq.client.messages import send_emqp_message
from eventmq.constants import (
    PROTOCOL_VERSION, ROUTER_SHOW_WORKERS, ROUTER_SHOW_SCHEDULERS
)
from eventmq.poller import Poller, POLLIN
from eventmq.sender import Sender
from eventmq.utils.messages import generate_msgid


class Shell(cmd.Cmd):
    """
    Interactive EventMQ Shell.

    To add a command, add a method called "do_<command>".

    .. note::
       Any replies from a component should be json encoded so it can be
       pretty printed with this tool
    """

    def __init__(self, addr=None):
        # cmd.Cmd is initialised explicitly (rather than via super()) as in
        # the original code; this also works on Python 2.
        cmd.Cmd.__init__(self)
        self.prompt = "> "

        self.socket = Sender()
        self.poller = Poller()
        # Watch the socket for incoming data so recv_reply() can poll it.
        self.poller.register(self.socket, POLLIN)

        # Address of the component we are connected to; None when not
        # connected.
        self.addr = addr
        if self.addr:
            self.do_connect(addr)

    def do_connect(self, line):
        """
        Connect to a component
        """
        if not line:
            sys.stderr.write('Error: No address provided\n')
            return

        try:
            self.socket.connect(line)
        except exceptions.EventMQError as e:
            # Exceptions have no ``.message`` attribute on Python 3, so
            # prefer it when present and fall back to the string form.
            error_text = getattr(e, 'message', None) or str(e)
            if "status=201" in error_text:
                sys.stderr.write('Error: Already connected to {} Disconnect '
                                 'first\n'.format(self.addr))
                return
            raise

        self.addr = line
        print("Connecting to {}...".format(line))
        self.send_message('HELLO')

    def do_disconnect(self, line):
        """
        Disconnect from a service
        """
        print('Closing connection to {}'.format(self.addr))
        self.poller.unregister(self.socket)
        self.socket.rebuild()
        # Re-register with the same POLLIN flag used in __init__ so that
        # recv_reply() keeps seeing incoming data after a reconnect.
        self.poller.register(self.socket, POLLIN)
        self.addr = None

    def do_status(self, line):
        """
        Request the status of the connected component
        """
        # Skip waiting for a reply when nothing was sent.
        if self.send_message('STATUS'):
            pprint(self.recv_reply())

    def do_show_workers(self, line):
        """
        Request the status of the connected workers and queues
        """
        if self.send_message(ROUTER_SHOW_WORKERS):
            pprint(self.recv_reply())

    def do_show_schedulers(self, line):
        """
        Request the status of the connected schedulers
        """
        if self.send_message(ROUTER_SHOW_SCHEDULERS):
            pprint(self.recv_reply())

    def do_shutdown(self, line):
        """
        Request the connected component shutdown
        """
        # Without a connection nothing is sent, and the blocking receive
        # below would never return.
        if not self.send_message('DISCONNECT'):
            return

        # Pause for a second before reading the reply.
        time.sleep(1)
        # NOTE(review): this receive blocks until a reply arrives.
        print(self.socket.recv_multipart())

    def do_send_cmd(self, line):
        """
        Send a raw command to the connected host.
        """
        # NOTE(review): no implementation is present; this command is
        # currently a no-op.

    def send_message(self, command, message=()):
        """
        Send ``command`` to the connected component.

        Args:
            command: The command name placed in the first frame.
            message (tuple): Extra frames appended after the command and a
                freshly generated ``admin:`` message id.

        Returns:
            bool: True if the message was handed to the socket, False if
                the shell is not connected (an error is written to stderr).
        """
        if not self.addr:
            sys.stderr.write('Error: Not connected\n')
            return False

        # tuple() leaves tuples untouched and lets callers pass any sequence.
        frames = (command, generate_msgid('admin:')) + tuple(message)
        self.socket.send_multipart(frames, PROTOCOL_VERSION)
        return True

    def recv_reply(self, timeout_secs=1000):
        """
        Wait for a reply and return it decoded from JSON.

        Args:
            timeout_secs: Value passed straight through to
                ``Poller.poll``. NOTE(review): despite the name, a default
                of 1000 suggests the unit is milliseconds -- confirm
                against ``Poller.poll``.

        Returns:
            The JSON-decoded contents of the reply frame at index 4, or
            None if the socket had nothing to read before the timeout.
        """
        if self.socket in self.poller.poll(timeout_secs):
            msg = self.socket.recv_multipart()
            # Frame 4 is treated as the JSON payload of the reply.
            return json.loads(msg[4])
        return None

    def do_quit(self, line):
        """
        Exit the shell
        """
        # Returning a true value tells cmd.Cmd.cmdloop() to stop.
        return True

    def do_EOF(self, line):
        """
        Exit the shell on end-of-file (e.g. Ctrl-D)
        """
        return self.do_quit(line)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Utility for interacting with '
                                     'an EventMQ cluster')
    parser.add_argument('--addr', '-a', type=str, nargs='?',
                        help='specify address to connect to')
    args = parser.parse_args()

    app = Shell(addr=args.addr)
    app.cmdloop()