1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
#!/usr/bin/env python
# -*- mode: python -*-
#
# This file is part of eventmq.
#
# eventmq is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# eventmq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import argparse
import cmd
import json
from pprint import pprint
import sys
from eventmq import exceptions
from eventmq.client.messages import send_emqp_message
from eventmq.constants import (
PROTOCOL_VERSION, ROUTER_SHOW_WORKERS, ROUTER_SHOW_SCHEDULERS
)
from eventmq.poller import Poller, POLLIN
from eventmq.sender import Sender
from eventmq.utils.messages import generate_msgid
class Shell(cmd.Cmd):
    """
    Interactive EventMQ Shell.

    To add a command, add a method called "do_<MYCOMMAND>". The docstring of
    each ``do_*`` method is what the built-in ``help`` command prints, so keep
    those docstrings short and user-facing.

    .. note::

       Any replies from a component should be json encoded so it can be pretty
       printed with this tool
    """
    def __init__(self, addr=None):
        """
        Build the socket and poller used by the shell.

        Args:
            addr (str): Optional address of a component. When given, the
                shell connects to it right away via :meth:`do_connect`.
        """
        cmd.Cmd.__init__(self)
        self.prompt = "> "

        self.socket = Sender()
        self.poller = Poller()
        self.poller.register(self.socket, POLLIN)

        self.addr = addr
        if self.addr:
            self.do_connect(addr)

    def do_connect(self, line):
        """
        Connect to a component
        """
        if not line:
            sys.stderr.write('Error: No address provided\n')
            return

        try:
            self.socket.connect(line)
        except exceptions.EventMQError as e:
            # ``e.message`` is not available on Python 3 exceptions, so fall
            # back to the string form of the exception when it is missing.
            details = getattr(e, 'message', None) or str(e)
            if "status=201" in details:
                sys.stderr.write('Error: Already connected to {} Disconnect '
                                 'first\n'.format(self.addr))
                return
            raise

        self.addr = line
        print("Connecting to {}...".format(line))
        self.send_message('HELLO')

    def do_disconnect(self, line):
        """
        Disconnect from a service
        """
        if not self.addr:
            sys.stderr.write('Error: Not connected\n')
            return

        print('Closing connection to {}'.format(self.addr))
        self.poller.unregister(self.socket)
        self.socket.rebuild()
        # Register with the same flag __init__ uses so that replies are still
        # polled for after a later reconnect.
        self.poller.register(self.socket, POLLIN)
        self.addr = None

    def do_status(self, line):
        """
        Request the status of the connected component
        """
        # Only wait for a reply when a request was actually sent.
        if self.send_message('STATUS'):
            pprint(self.recv_reply())

    def do_show_workers(self, line):
        """
        Request the status of the connected workers and queues
        """
        if self.send_message(ROUTER_SHOW_WORKERS):
            pprint(self.recv_reply())

    def do_show_schedulers(self, line):
        """
        Request the status of the connected schedulers
        """
        if self.send_message(ROUTER_SHOW_SCHEDULERS):
            pprint(self.recv_reply())

    def do_shutdown(self, line):
        """
        Request the connected component shutdown
        """
        import time

        if not self.send_message('DISCONNECT'):
            return

        # Keep the original one second pause before looking for a reply.
        time.sleep(1)
        # Poll before receiving: an unconditional recv_multipart() blocks the
        # shell forever when the component goes away without answering.
        if self.socket in self.poller.poll(1000):
            print(self.socket.recv_multipart())

    def do_send_cmd(self, line):
        """
        Send a raw command to the connected host.
        """
        # Not implemented yet. Say so instead of silently ignoring the input.
        sys.stderr.write('Error: send_cmd is not implemented\n')

    def send_message(self, command, message=()):
        """
        Send ``command`` to the connected component.

        The frames sent are ``command``, a message id generated with the
        ``admin:`` prefix, and then every item of ``message``.

        Args:
            command (str): The command to send.
            message (tuple): Extra frames appended after the message id.

        Returns:
            bool: True when the message was handed to the socket, False when
            the shell is not connected (an error is written to stderr).
        """
        if not self.addr:
            sys.stderr.write('Error: Not connected\n')
            return False

        frames = (command, generate_msgid('admin:')) + tuple(message)
        self.socket.send_multipart(frames, PROTOCOL_VERSION)
        return True

    def recv_reply(self, timeout_secs=1000):
        """
        Wait for a reply on the socket and decode it.

        Args:
            timeout_secs (int): Passed unchanged to ``self.poller.poll``.
                NOTE(review): the default of 1000 suggests the poller expects
                milliseconds despite the parameter name -- confirm.

        Returns:
            The JSON-decoded content of frame index 4 of the reply, or None
            when the socket had nothing to read before the poll returned.
        """
        if self.socket not in self.poller.poll(timeout_secs):
            return None

        msg = self.socket.recv_multipart()
        return json.loads(msg[4])

    def do_quit(self, line):
        """
        Exit the shell
        """
        # A true return value from a do_* method ends cmd.Cmd.cmdloop().
        return True

    def do_EOF(self, line):
        """
        Exit the shell on end-of-file
        """
        return self.do_quit(line)
if __name__ == '__main__':
    # Script entry point: read the optional component address from the
    # command line, then hand control to the interactive shell.
    arg_parser = argparse.ArgumentParser(
        description='Utility for interacting with an EventMQ cluster')
    arg_parser.add_argument(
        '--addr', '-a',
        nargs='?',
        type=str,
        help='specify address to connect to')
    cli_args = arg_parser.parse_args()

    Shell(addr=cli_args.addr).cmdloop()
|