diff options
| author | David Hurst | 2017-05-05 17:30:27 -0600 |
|---|---|---|
| committer | GitHub | 2017-05-05 17:30:27 -0600 |
| commit | 7f705f2d21ae38e15e62a48a1813c5cf581d55f8 (patch) | |
| tree | e399e1e6a684e89d752996eb22e29a6209740c72 | |
| parent | ecc2c55ee064a69ad4b2eff4f571733359c3e7f3 (diff) | |
| parent | c35585a437a456ef963a970d351f3236c0dd2e85 (diff) | |
| download | eventmq-7f705f2d21ae38e15e62a48a1813c5cf581d55f8.tar.gz eventmq-7f705f2d21ae38e15e62a48a1813c5cf581d55f8.zip | |
Merge pull request #39 from com4/feature/status_subcommands
Feature/status subcommands
| -rwxr-xr-x | bin/emq-cli | 16 | ||||
| -rw-r--r-- | docs/protocol.rst | 19 | ||||
| -rw-r--r-- | eventmq/constants.py | 15 | ||||
| -rw-r--r-- | eventmq/router.py | 49 | ||||
| -rw-r--r-- | setup.py | 2 |
5 files changed, 64 insertions, 37 deletions
diff --git a/bin/emq-cli b/bin/emq-cli index 52cf180..333f0f6 100755 --- a/bin/emq-cli +++ b/bin/emq-cli | |||
| @@ -24,14 +24,14 @@ from pprint import pprint | |||
| 24 | import sys | 24 | import sys |
| 25 | 25 | ||
| 26 | from eventmq import exceptions | 26 | from eventmq import exceptions |
| 27 | from eventmq.client.messages import send_emqp_message | ||
| 28 | from eventmq.constants import ( | 27 | from eventmq.constants import ( |
| 29 | PROTOCOL_VERSION, ROUTER_SHOW_WORKERS, ROUTER_SHOW_SCHEDULERS | 28 | PROTOCOL_VERSION, STATUS_COMMANDS |
| 30 | ) | 29 | ) |
| 31 | from eventmq.poller import Poller, POLLIN | 30 | from eventmq.poller import Poller, POLLIN |
| 32 | from eventmq.sender import Sender | 31 | from eventmq.sender import Sender |
| 33 | from eventmq.utils.messages import generate_msgid | 32 | from eventmq.utils.messages import generate_msgid |
| 34 | 33 | ||
| 34 | |||
| 35 | class Shell(cmd.Cmd): | 35 | class Shell(cmd.Cmd): |
| 36 | """ | 36 | """ |
| 37 | Interactive EventMQ Shell. | 37 | Interactive EventMQ Shell. |
| @@ -92,21 +92,21 @@ class Shell(cmd.Cmd): | |||
| 92 | """ | 92 | """ |
| 93 | Request the status of the connected component | 93 | Request the status of the connected component |
| 94 | """ | 94 | """ |
| 95 | self.send_message('STATUS') | 95 | self.send_message('STATUS', ('',)) |
| 96 | pprint(self.recv_reply()) | 96 | pprint(self.recv_reply()) |
| 97 | 97 | ||
| 98 | def do_show_workers(self, line): | 98 | def do_show_managers(self, line): |
| 99 | """ | 99 | """ |
| 100 | Request the status of the connected workers and queues | 100 | Request the status of the connected job managers and queues |
| 101 | """ | 101 | """ |
| 102 | self.send_message(ROUTER_SHOW_WORKERS) | 102 | self.send_message(('STATUS', (STATUS_COMMANDS.show_managers,))) |
| 103 | pprint(self.recv_reply()) | 103 | pprint(self.recv_reply()) |
| 104 | 104 | ||
| 105 | def do_show_schedulers(self, line): | 105 | def do_show_schedulers(self, line): |
| 106 | """ | 106 | """ |
| 107 | Request the status of the connected schedulers | 107 | Request the status of the connected schedulers |
| 108 | """ | 108 | """ |
| 109 | self.send_message(ROUTER_SHOW_SCHEDULERS) | 109 | self.send_message('STATUS', (STATUS_COMMANDS.show_schedulers,)) |
| 110 | pprint(self.recv_reply()) | 110 | pprint(self.recv_reply()) |
| 111 | 111 | ||
| 112 | def do_shutdown(self, line): | 112 | def do_shutdown(self, line): |
| @@ -128,7 +128,7 @@ class Shell(cmd.Cmd): | |||
| 128 | return | 128 | return |
| 129 | 129 | ||
| 130 | message = (command, generate_msgid('admin:')) + message | 130 | message = (command, generate_msgid('admin:')) + message |
| 131 | 131 | print('sending: {}'.format(message)) | |
| 132 | self.socket.send_multipart(message, PROTOCOL_VERSION) | 132 | self.socket.send_multipart(message, PROTOCOL_VERSION) |
| 133 | 133 | ||
| 134 | def recv_reply(self, timeout_secs=1000): | 134 | def recv_reply(self, timeout_secs=1000): |
diff --git a/docs/protocol.rst b/docs/protocol.rst index 66b6829..d63d4b1 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -72,7 +72,7 @@ FRAME Value Description | |||
| 72 | 4 _MSG_ The reply to respond with | 72 | 4 _MSG_ The reply to respond with |
| 73 | ====== ============== =========== | 73 | ====== ============== =========== |
| 74 | 74 | ||
| 75 | A **HEARTBEAT** frame consists of a | 75 | A **HEARTBEAT** frame consists of a 5-frame multipart message formatted as follows. |
| 76 | 76 | ||
| 77 | ====== ============== =========== | 77 | ====== ============== =========== |
| 78 | FRAME Value Description | 78 | FRAME Value Description |
| @@ -84,7 +84,7 @@ FRAME Value Description | |||
| 84 | 4 _UNIX_TS_ A unix timestamp | 84 | 4 _UNIX_TS_ A unix timestamp |
| 85 | ====== ============== =========== | 85 | ====== ============== =========== |
| 86 | 86 | ||
| 87 | A **DISCONNECT** frame consists of | 87 | A **DISCONNECT** frame consists of a 4-frame multipart message formatted as follows. |
| 88 | 88 | ||
| 89 | ====== ============== =========== | 89 | ====== ============== =========== |
| 90 | FRAME Value Description | 90 | FRAME Value Description |
| @@ -95,7 +95,7 @@ FRAME Value Description | |||
| 95 | 3 _MSGID_ A unique id for the msg | 95 | 3 _MSGID_ A unique id for the msg |
| 96 | ====== ============== =========== | 96 | ====== ============== =========== |
| 97 | 97 | ||
| 98 | A **KBAI** frame consists of | 98 | A **KBYE** frame consists of a 4-frame multipart message formatted as follows. |
| 99 | 99 | ||
| 100 | ====== ============== =========== | 100 | ====== ============== =========== |
| 101 | FRAME Value Description | 101 | FRAME Value Description |
| @@ -106,6 +106,19 @@ FRAME Value Description | |||
| 106 | 3 _MSGID_ A unique id for the msg | 106 | 3 _MSGID_ A unique id for the msg |
| 107 | ====== ============== =========== | 107 | ====== ============== =========== |
| 108 | 108 | ||
| 109 | A **STATUS** frame consists of a 5-frame multipart message formatted as follows. | ||
| 110 | |||
| 111 | ====== ============== =========== | ||
| 112 | FRAME Value Description | ||
| 113 | ====== ============== =========== | ||
| 114 | 0 _EMPTY_ leave empty | ||
| 115 | 1 eMQP/1.0 Protocol version | ||
| 116 | 2 STATUS command | ||
| 117 | 3 _MSGID_ A unique id for the msg | ||
| 118 | 4 _SUB_COMMAND_ A sub command (or empty string). Defined on a per device basis. | ||
| 119 | ====== ============== =========== | ||
| 120 | |||
| 121 | |||
| 109 | eMQP / Client | 122 | eMQP / Client |
| 110 | ------------- | 123 | ------------- |
| 111 | A **REQUEST** command consists of a 7-frame multipart message, formatted as follows. | 124 | A **REQUEST** command consists of a 7-frame multipart message, formatted as follows. |
diff --git a/eventmq/constants.py b/eventmq/constants.py index 8cc5d7a..b668f1e 100644 --- a/eventmq/constants.py +++ b/eventmq/constants.py | |||
| @@ -18,13 +18,20 @@ class CLIENT_TYPE(object): | |||
| 18 | # See doc/protocol.rst | 18 | # See doc/protocol.rst |
| 19 | PROTOCOL_VERSION = 'eMQP/1.0' | 19 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 20 | 20 | ||
| 21 | # PROTOCOL COMMANDS | ||
| 22 | DISCONNECT = "DISCONNECT" | 21 | DISCONNECT = "DISCONNECT" |
| 23 | KBYE = "KBYE" | 22 | KBYE = "KBYE" |
| 23 | STATUS_CMD = "STATUS" | ||
| 24 | |||
| 25 | |||
| 26 | class STATUS_COMMANDS(object): | ||
| 27 | """ | ||
| 28 | Defines the STATUS sub commands | ||
| 29 | """ | ||
| 30 | #: Router subommand to show connected job managbers | ||
| 31 | show_managers = 'show_managers' | ||
| 32 | #: Router subcommand to show connected schedulers | ||
| 33 | show_schedulers = 'show_schedulers' | ||
| 24 | 34 | ||
| 25 | # ADMINISTRATIVE COMMANDS | ||
| 26 | ROUTER_SHOW_WORKERS = 'ROUTER_SHOW_WORKERS' | ||
| 27 | ROUTER_SHOW_SCHEDULERS = 'ROUTER_SHOW_SCHEDULERS' | ||
| 28 | 35 | ||
| 29 | # ENVIRONMENT VARIABLES | 36 | # ENVIRONMENT VARIABLES |
| 30 | ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' | 37 | ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' |
diff --git a/eventmq/router.py b/eventmq/router.py index 9971c4c..2b27026 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -24,8 +24,7 @@ import signal | |||
| 24 | 24 | ||
| 25 | from . import constants, exceptions, poller, receiver | 25 | from . import constants, exceptions, poller, receiver |
| 26 | from .constants import ( | 26 | from .constants import ( |
| 27 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, | 27 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, STATUS, STATUS_COMMANDS |
| 28 | ROUTER_SHOW_WORKERS, STATUS | ||
| 29 | ) | 28 | ) |
| 30 | from .settings import conf, reload_settings | 29 | from .settings import conf, reload_settings |
| 31 | from .utils import tuplify | 30 | from .utils import tuplify |
| @@ -176,25 +175,7 @@ class Router(HeartbeatMixin): | |||
| 176 | 175 | ||
| 177 | if events.get(self.administrative_socket) == poller.POLLIN: | 176 | if events.get(self.administrative_socket) == poller.POLLIN: |
| 178 | msg = self.administrative_socket.recv_multipart() | 177 | msg = self.administrative_socket.recv_multipart() |
| 179 | logger.debug('ADMIN: {}'.format(msg)) | 178 | self.process_admin_message(msg) |
| 180 | # ############## | ||
| 181 | # Admin Commands | ||
| 182 | # ############## | ||
| 183 | if len(msg) > 4: | ||
| 184 | if msg[3] == DISCONNECT: | ||
| 185 | logger.info('Received DISCONNECT from administrator') | ||
| 186 | self.send_ack( | ||
| 187 | self.administrative_socket, msg[0], msg[4]) | ||
| 188 | self.on_disconnect(msg[4], msg) | ||
| 189 | elif msg[3] == 'STATUS': | ||
| 190 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | ||
| 191 | (self.get_status(),)) | ||
| 192 | elif msg[3] == ROUTER_SHOW_WORKERS: | ||
| 193 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | ||
| 194 | (self.get_workers_status(),)) | ||
| 195 | elif msg[3] == ROUTER_SHOW_SCHEDULERS: | ||
| 196 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | ||
| 197 | (self.get_schedulers_status(),)) | ||
| 198 | 179 | ||
| 199 | # TODO: Optimization: the calls to functions could be done in | 180 | # TODO: Optimization: the calls to functions could be done in |
| 200 | # another thread so they don't block the loop. synchronize | 181 | # another thread so they don't block the loop. synchronize |
| @@ -822,6 +803,32 @@ class Router(HeartbeatMixin): | |||
| 822 | func = getattr(self, "on_%s" % command.lower()) | 803 | func = getattr(self, "on_%s" % command.lower()) |
| 823 | func(sender, msgid, message) | 804 | func(sender, msgid, message) |
| 824 | 805 | ||
| 806 | def process_admin_message(self, msg): | ||
| 807 | """ | ||
| 808 | This method is called when a message comes in from the administrative | ||
| 809 | socket. | ||
| 810 | |||
| 811 | Args: | ||
| 812 | msg: The untouched admin message from zmq | ||
| 813 | """ | ||
| 814 | logger.debug('ADMIN MSG: {}'.format(msg)) | ||
| 815 | if len(msg) > 4: | ||
| 816 | if msg[3] == DISCONNECT: | ||
| 817 | logger.info('Received DISCONNECT from administrator') | ||
| 818 | self.send_ack( | ||
| 819 | self.administrative_socket, msg[0], msg[4]) | ||
| 820 | self.on_disconnect(msg[4], msg) | ||
| 821 | elif msg[3] == 'STATUS': | ||
| 822 | if msg[5] == STATUS_COMMANDS.show_managers: | ||
| 823 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | ||
| 824 | (self.get_workers_status(),)) | ||
| 825 | elif msg[5] == STATUS_COMMANDS.show_schedulers: | ||
| 826 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | ||
| 827 | (self.get_schedulers_status(),)) | ||
| 828 | else: | ||
| 829 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | ||
| 830 | (self.get_status(),)) | ||
| 831 | |||
| 825 | def _remove_worker(self, worker_id): | 832 | def _remove_worker(self, worker_id): |
| 826 | """ | 833 | """ |
| 827 | Remove worker with given id from any queues it belongs to. | 834 | Remove worker with given id from any queues it belongs to. |
| @@ -19,7 +19,7 @@ setup( | |||
| 19 | 'psutil==5.0.0', | 19 | 'psutil==5.0.0', |
| 20 | 'python-dateutil>=2.1,<3.0.0'], | 20 | 'python-dateutil>=2.1,<3.0.0'], |
| 21 | extras_require={ | 21 | extras_require={ |
| 22 | 'docs': ['Sphinx==1.5.2', ], | 22 | 'docs': ['Sphinx==1.5.3', ], |
| 23 | 'testing': [ | 23 | 'testing': [ |
| 24 | 'flake8==3.2.1', | 24 | 'flake8==3.2.1', |
| 25 | 'flake8-import-order==0.11', | 25 | 'flake8-import-order==0.11', |