aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Hurst2017-05-05 17:30:27 -0600
committerGitHub2017-05-05 17:30:27 -0600
commit7f705f2d21ae38e15e62a48a1813c5cf581d55f8 (patch)
treee399e1e6a684e89d752996eb22e29a6209740c72
parentecc2c55ee064a69ad4b2eff4f571733359c3e7f3 (diff)
parentc35585a437a456ef963a970d351f3236c0dd2e85 (diff)
downloadeventmq-7f705f2d21ae38e15e62a48a1813c5cf581d55f8.tar.gz
eventmq-7f705f2d21ae38e15e62a48a1813c5cf581d55f8.zip
Merge pull request #39 from com4/feature/status_subcommands
Feature/status subcommands
-rwxr-xr-xbin/emq-cli16
-rw-r--r--docs/protocol.rst19
-rw-r--r--eventmq/constants.py15
-rw-r--r--eventmq/router.py49
-rw-r--r--setup.py2
5 files changed, 64 insertions, 37 deletions
diff --git a/bin/emq-cli b/bin/emq-cli
index 52cf180..333f0f6 100755
--- a/bin/emq-cli
+++ b/bin/emq-cli
@@ -24,14 +24,14 @@ from pprint import pprint
24import sys 24import sys
25 25
26from eventmq import exceptions 26from eventmq import exceptions
27from eventmq.client.messages import send_emqp_message
28from eventmq.constants import ( 27from eventmq.constants import (
29 PROTOCOL_VERSION, ROUTER_SHOW_WORKERS, ROUTER_SHOW_SCHEDULERS 28 PROTOCOL_VERSION, STATUS_COMMANDS
30) 29)
31from eventmq.poller import Poller, POLLIN 30from eventmq.poller import Poller, POLLIN
32from eventmq.sender import Sender 31from eventmq.sender import Sender
33from eventmq.utils.messages import generate_msgid 32from eventmq.utils.messages import generate_msgid
34 33
34
35class Shell(cmd.Cmd): 35class Shell(cmd.Cmd):
36 """ 36 """
37 Interactive EventMQ Shell. 37 Interactive EventMQ Shell.
@@ -92,21 +92,21 @@ class Shell(cmd.Cmd):
92 """ 92 """
93 Request the status of the connected component 93 Request the status of the connected component
94 """ 94 """
95 self.send_message('STATUS') 95 self.send_message('STATUS', ('',))
96 pprint(self.recv_reply()) 96 pprint(self.recv_reply())
97 97
98 def do_show_workers(self, line): 98 def do_show_managers(self, line):
99 """ 99 """
100 Request the status of the connected workers and queues 100 Request the status of the connected job managers and queues
101 """ 101 """
102 self.send_message(ROUTER_SHOW_WORKERS) 102 self.send_message(('STATUS', (STATUS_COMMANDS.show_managers,)))
103 pprint(self.recv_reply()) 103 pprint(self.recv_reply())
104 104
105 def do_show_schedulers(self, line): 105 def do_show_schedulers(self, line):
106 """ 106 """
107 Request the status of the connected schedulers 107 Request the status of the connected schedulers
108 """ 108 """
109 self.send_message(ROUTER_SHOW_SCHEDULERS) 109 self.send_message('STATUS', (STATUS_COMMANDS.show_schedulers,))
110 pprint(self.recv_reply()) 110 pprint(self.recv_reply())
111 111
112 def do_shutdown(self, line): 112 def do_shutdown(self, line):
@@ -128,7 +128,7 @@ class Shell(cmd.Cmd):
128 return 128 return
129 129
130 message = (command, generate_msgid('admin:')) + message 130 message = (command, generate_msgid('admin:')) + message
131 131 print('sending: {}'.format(message))
132 self.socket.send_multipart(message, PROTOCOL_VERSION) 132 self.socket.send_multipart(message, PROTOCOL_VERSION)
133 133
134 def recv_reply(self, timeout_secs=1000): 134 def recv_reply(self, timeout_secs=1000):
diff --git a/docs/protocol.rst b/docs/protocol.rst
index 66b6829..d63d4b1 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -72,7 +72,7 @@ FRAME Value Description
724 _MSG_ The reply to respond with 724 _MSG_ The reply to respond with
73====== ============== =========== 73====== ============== ===========
74 74
75A **HEARTBEAT** frame consists of a 75A **HEARTBEAT** frame consists of a 5-frame multipart message formatted as follows.
76 76
77====== ============== =========== 77====== ============== ===========
78FRAME Value Description 78FRAME Value Description
@@ -84,7 +84,7 @@ FRAME Value Description
844 _UNIX_TS_ A unix timestamp 844 _UNIX_TS_ A unix timestamp
85====== ============== =========== 85====== ============== ===========
86 86
87A **DISCONNECT** frame consists of 87A **DISCONNECT** frame consists of a 4-frame multipart message formatted as follows.
88 88
89====== ============== =========== 89====== ============== ===========
90FRAME Value Description 90FRAME Value Description
@@ -95,7 +95,7 @@ FRAME Value Description
953 _MSGID_ A unique id for the msg 953 _MSGID_ A unique id for the msg
96====== ============== =========== 96====== ============== ===========
97 97
98A **KBAI** frame consists of 98A **KBYE** frame consists of a 4-frame multipart message formatted as follows.
99 99
100====== ============== =========== 100====== ============== ===========
101FRAME Value Description 101FRAME Value Description
@@ -106,6 +106,19 @@ FRAME Value Description
1063 _MSGID_ A unique id for the msg 1063 _MSGID_ A unique id for the msg
107====== ============== =========== 107====== ============== ===========
108 108
109A **STATUS** frame consists of a 5-frame multipart message formatted as follows.
110
111====== ============== ===========
112FRAME Value Description
113====== ============== ===========
1140 _EMPTY_ leave empty
1151 eMQP/1.0 Protocol version
1162 STATUS command
1173 _MSGID_ A unique id for the msg
1184 _SUB_COMMAND_ A sub command (or empty string). Defined on a per device basis.
119====== ============== ===========
120
121
109eMQP / Client 122eMQP / Client
110------------- 123-------------
111A **REQUEST** command consists of a 7-frame multipart message, formatted as follows. 124A **REQUEST** command consists of a 7-frame multipart message, formatted as follows.
diff --git a/eventmq/constants.py b/eventmq/constants.py
index 8cc5d7a..b668f1e 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -18,13 +18,20 @@ class CLIENT_TYPE(object):
18# See doc/protocol.rst 18# See doc/protocol.rst
19PROTOCOL_VERSION = 'eMQP/1.0' 19PROTOCOL_VERSION = 'eMQP/1.0'
20 20
21# PROTOCOL COMMANDS
22DISCONNECT = "DISCONNECT" 21DISCONNECT = "DISCONNECT"
23KBYE = "KBYE" 22KBYE = "KBYE"
23STATUS_CMD = "STATUS"
24
25
26class STATUS_COMMANDS(object):
27 """
28 Defines the STATUS sub commands
29 """
30 #: Router subommand to show connected job managbers
31 show_managers = 'show_managers'
32 #: Router subcommand to show connected schedulers
33 show_schedulers = 'show_schedulers'
24 34
25# ADMINISTRATIVE COMMANDS
26ROUTER_SHOW_WORKERS = 'ROUTER_SHOW_WORKERS'
27ROUTER_SHOW_SCHEDULERS = 'ROUTER_SHOW_SCHEDULERS'
28 35
29# ENVIRONMENT VARIABLES 36# ENVIRONMENT VARIABLES
30ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' 37ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR'
diff --git a/eventmq/router.py b/eventmq/router.py
index 9971c4c..2b27026 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -24,8 +24,7 @@ import signal
24 24
25from . import constants, exceptions, poller, receiver 25from . import constants, exceptions, poller, receiver
26from .constants import ( 26from .constants import (
27 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, 27 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, STATUS, STATUS_COMMANDS
28 ROUTER_SHOW_WORKERS, STATUS
29) 28)
30from .settings import conf, reload_settings 29from .settings import conf, reload_settings
31from .utils import tuplify 30from .utils import tuplify
@@ -176,25 +175,7 @@ class Router(HeartbeatMixin):
176 175
177 if events.get(self.administrative_socket) == poller.POLLIN: 176 if events.get(self.administrative_socket) == poller.POLLIN:
178 msg = self.administrative_socket.recv_multipart() 177 msg = self.administrative_socket.recv_multipart()
179 logger.debug('ADMIN: {}'.format(msg)) 178 self.process_admin_message(msg)
180 # ##############
181 # Admin Commands
182 # ##############
183 if len(msg) > 4:
184 if msg[3] == DISCONNECT:
185 logger.info('Received DISCONNECT from administrator')
186 self.send_ack(
187 self.administrative_socket, msg[0], msg[4])
188 self.on_disconnect(msg[4], msg)
189 elif msg[3] == 'STATUS':
190 sendmsg(self.administrative_socket, msg[0], 'REPLY',
191 (self.get_status(),))
192 elif msg[3] == ROUTER_SHOW_WORKERS:
193 sendmsg(self.administrative_socket, msg[0], 'REPLY',
194 (self.get_workers_status(),))
195 elif msg[3] == ROUTER_SHOW_SCHEDULERS:
196 sendmsg(self.administrative_socket, msg[0], 'REPLY',
197 (self.get_schedulers_status(),))
198 179
199 # TODO: Optimization: the calls to functions could be done in 180 # TODO: Optimization: the calls to functions could be done in
200 # another thread so they don't block the loop. synchronize 181 # another thread so they don't block the loop. synchronize
@@ -822,6 +803,32 @@ class Router(HeartbeatMixin):
822 func = getattr(self, "on_%s" % command.lower()) 803 func = getattr(self, "on_%s" % command.lower())
823 func(sender, msgid, message) 804 func(sender, msgid, message)
824 805
806 def process_admin_message(self, msg):
807 """
808 This method is called when a message comes in from the administrative
809 socket.
810
811 Args:
812 msg: The untouched admin message from zmq
813 """
814 logger.debug('ADMIN MSG: {}'.format(msg))
815 if len(msg) > 4:
816 if msg[3] == DISCONNECT:
817 logger.info('Received DISCONNECT from administrator')
818 self.send_ack(
819 self.administrative_socket, msg[0], msg[4])
820 self.on_disconnect(msg[4], msg)
821 elif msg[3] == 'STATUS':
822 if msg[5] == STATUS_COMMANDS.show_managers:
823 sendmsg(self.administrative_socket, msg[0], 'REPLY',
824 (self.get_workers_status(),))
825 elif msg[5] == STATUS_COMMANDS.show_schedulers:
826 sendmsg(self.administrative_socket, msg[0], 'REPLY',
827 (self.get_schedulers_status(),))
828 else:
829 sendmsg(self.administrative_socket, msg[0], 'REPLY',
830 (self.get_status(),))
831
825 def _remove_worker(self, worker_id): 832 def _remove_worker(self, worker_id):
826 """ 833 """
827 Remove worker with given id from any queues it belongs to. 834 Remove worker with given id from any queues it belongs to.
diff --git a/setup.py b/setup.py
index 72e8e35..3f4eb70 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ setup(
19 'psutil==5.0.0', 19 'psutil==5.0.0',
20 'python-dateutil>=2.1,<3.0.0'], 20 'python-dateutil>=2.1,<3.0.0'],
21 extras_require={ 21 extras_require={
22 'docs': ['Sphinx==1.5.2', ], 22 'docs': ['Sphinx==1.5.3', ],
23 'testing': [ 23 'testing': [
24 'flake8==3.2.1', 24 'flake8==3.2.1',
25 'flake8-import-order==0.11', 25 'flake8-import-order==0.11',