#!/usr/bin/env python
# -*- mode: python -*-
#
# This file is part of eventmq.
#
# eventmq is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# eventmq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with eventmq.  If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import argparse
import cmd
import json
from pprint import pprint
import sys
from eventmq import exceptions
from eventmq.client.messages import send_emqp_message
from eventmq.constants import (
PROTOCOL_VERSION, ROUTER_SHOW_WORKERS, ROUTER_SHOW_SCHEDULERS
)
from eventmq.poller import Poller, POLLIN
from eventmq.sender import Sender
from eventmq.utils.messages import generate_msgid
class Shell(cmd.Cmd):
"""
Interactive EventMQ Shell.
To add a command, add a method called "do_".
.. note::
Any replies from a component should be json encoded so it can be pretty
printed with this tool
"""
def __init__(self, addr=None):
cmd.Cmd.__init__(self)
self.prompt = "> "
self.socket = Sender()
self.poller = Poller()
self.poller.register(self.socket, POLLIN)
self.addr = addr
if self.addr:
self.do_connect(addr)
def do_connect(self, line):
"""
Connect to a component
"""
if not line:
sys.stderr.write('Error: No address provided\n')
return
try:
self.socket.connect(line)
except exceptions.EventMQError as e:
if "status=201" in e.message:
sys.stderr.write('Error: Already connected to {} Disconnect '
'first\n'.format(self.addr))
return
raise
self.addr = line
print("Connecting to {}...".format(line))
self.send_message('HELLO')
def do_disconnect(self, line):
"""
Disconnect from a service
"""
print('Closing connection to {}'.format(self.addr))
self.poller.unregister(self.socket)
self.socket.rebuild()
self.poller.register(self.socket)
self.addr = None
def do_status(self, line):
"""
Request the status of the connected component
"""
self.send_message('STATUS')
pprint(self.recv_reply())
def do_show_workers(self, line):
"""
Request the status of the connected workers and queues
"""
self.send_message(ROUTER_SHOW_WORKERS)
pprint(self.recv_reply())
def do_show_schedulers(self, line):
"""
Request the status of the connected schedulers
"""
self.send_message(ROUTER_SHOW_SCHEDULERS)
pprint(self.recv_reply())
def do_shutdown(self, line):
"""
Request the connected component shutdown
"""
self.send_message('DISCONNECT')
import time; time.sleep(1)
print(self.socket.recv_multipart())
def do_send_cmd(self, line):
"""
Send a raw command to the connected host.
"""
def send_message(self, command, message=()):
if not self.addr:
sys.stderr.write('Error: Not connected\n')
return
message = (command, generate_msgid('admin:')) + message
self.socket.send_multipart(message, PROTOCOL_VERSION)
def recv_reply(self, timeout_secs=1000):
if self.socket in self.poller.poll(timeout_secs):
msg = self.socket.recv_multipart()
return json.loads(msg[4])
def do_quit(self, line):
return True
def do_EOF(self, line):
return self.do_quit(line)
if __name__ == '__main__':
    # Command line: a single optional address. When supplied, the shell
    # connects to it on start-up; otherwise it starts disconnected.
    cli = argparse.ArgumentParser(
        description='Utility for interacting with an EventMQ cluster')
    cli.add_argument(
        '--addr', '-a',
        nargs='?',
        type=str,
        help='specify address to connect to')
    options = cli.parse_args()

    # Hand control to the interactive loop until the user quits.
    Shell(addr=options.addr).cmdloop()