diff options
| author | David Hurst | 2017-05-03 14:24:27 -0600 |
|---|---|---|
| committer | David Hurst | 2017-05-03 14:24:27 -0600 |
| commit | 30ae820f682e62e5522970832071a62837886a7d (patch) | |
| tree | ceebc81e56362ecbac51823b7ccf925a59c1ab93 | |
| parent | b7a137e52e0c5f5751dcbe351025f87df16fbd43 (diff) | |
| download | eventmq-30ae820f682e62e5522970832071a62837886a7d.tar.gz eventmq-30ae820f682e62e5522970832071a62837886a7d.zip | |
Implement Scheduler status command
| -rw-r--r-- | eventmq/constants.py | 5 | ||||
| -rw-r--r-- | eventmq/router.py | 30 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 37 | ||||
| -rw-r--r-- | eventmq/settings.py | 10 |
4 files changed, 65 insertions, 17 deletions
diff --git a/eventmq/constants.py b/eventmq/constants.py index 8cc5d7a..cd6fe8a 100644 --- a/eventmq/constants.py +++ b/eventmq/constants.py | |||
| @@ -22,9 +22,12 @@ PROTOCOL_VERSION = 'eMQP/1.0' | |||
| 22 | DISCONNECT = "DISCONNECT" | 22 | DISCONNECT = "DISCONNECT" |
| 23 | KBYE = "KBYE" | 23 | KBYE = "KBYE" |
| 24 | 24 | ||
| 25 | # ADMINISTRATIVE COMMANDS | 25 | # Router ADMINISTRATIVE COMMANDS |
| 26 | ROUTER_SHOW_WORKERS = 'ROUTER_SHOW_WORKERS' | 26 | ROUTER_SHOW_WORKERS = 'ROUTER_SHOW_WORKERS' |
| 27 | ROUTER_SHOW_SCHEDULERS = 'ROUTER_SHOW_SCHEDULERS' | 27 | ROUTER_SHOW_SCHEDULERS = 'ROUTER_SHOW_SCHEDULERS' |
| 28 | 28 | ||
| 29 | # Scheduler ADMINISTRATIVE COMMANDS | ||
| 30 | SCHEDULER_SHOW_SCHEDULED_JOBS = 'SCHEDULER_SHOW_SCHEDULED_JOBS' | ||
| 31 | |||
| 29 | # ENVIRONMENT VARIABLES | 32 | # ENVIRONMENT VARIABLES |
| 30 | ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' | 33 | ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' |
diff --git a/eventmq/router.py b/eventmq/router.py index 9971c4c..d2b442a 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -298,25 +298,25 @@ class Router(HeartbeatMixin): | |||
| 298 | queue_names = msg[0] | 298 | queue_names = msg[0] |
| 299 | client_type = msg[1] | 299 | client_type = msg[1] |
| 300 | 300 | ||
| 301 | if not queue_names: # Ideally, this matches some workers | ||
| 302 | logger.error('Recieved INFORM message with no defined ' | ||
| 303 | 'queues. Message was: {}'.format(msg)) | ||
| 304 | return | ||
| 305 | |||
| 306 | try: | ||
| 307 | queues = list(map(tuplify, json.loads(queue_names))) | ||
| 308 | except ValueError: | ||
| 309 | # this was invalid json | ||
| 310 | logger.error( | ||
| 311 | 'Received invalid queue names in INFORM. names:{} from:{} ' | ||
| 312 | 'type:{}'.format( | ||
| 313 | queue_names, sender, client_type)) | ||
| 314 | return | ||
| 315 | |||
| 316 | logger.info('Received INFORM request from {} (type: {})'.format( | 301 | logger.info('Received INFORM request from {} (type: {})'.format( |
| 317 | sender, client_type)) | 302 | sender, client_type)) |
| 318 | 303 | ||
| 319 | if client_type == CLIENT_TYPE.worker: | 304 | if client_type == CLIENT_TYPE.worker: |
| 305 | if not queue_names: # Ideally, this matches some workers | ||
| 306 | logger.error('Recieved INFORM message with no defined ' | ||
| 307 | 'queues. Message was: {}'.format(msg)) | ||
| 308 | return | ||
| 309 | |||
| 310 | try: | ||
| 311 | queues = list(map(tuplify, json.loads(queue_names))) | ||
| 312 | except ValueError: | ||
| 313 | # this was invalid json | ||
| 314 | logger.error( | ||
| 315 | 'Received invalid queue names in INFORM. names:{} from:{} ' | ||
| 316 | 'type:{}'.format( | ||
| 317 | queue_names, sender, client_type)) | ||
| 318 | return | ||
| 319 | |||
| 320 | self.add_worker(sender, queues) | 320 | self.add_worker(sender, queues) |
| 321 | self.send_ack(self.backend, sender, msgid) | 321 | self.send_ack(self.backend, sender, msgid) |
| 322 | elif client_type == CLIENT_TYPE.scheduler: | 322 | elif client_type == CLIENT_TYPE.scheduler: |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 3abe58a..09f8db2 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -31,11 +31,14 @@ from . import constants | |||
| 31 | from .client.messages import send_request | 31 | from .client.messages import send_request |
| 32 | from .constants import KBYE | 32 | from .constants import KBYE |
| 33 | from .poller import Poller, POLLIN | 33 | from .poller import Poller, POLLIN |
| 34 | from .receiver import Receiver | ||
| 34 | from .sender import Sender | 35 | from .sender import Sender |
| 35 | from .settings import conf, reload_settings | 36 | from .settings import conf, reload_settings |
| 36 | from .utils.classes import EMQPService, HeartbeatMixin | 37 | from .utils.classes import EMQPService, HeartbeatMixin |
| 37 | from .utils.devices import generate_device_name | 38 | from .utils.devices import generate_device_name |
| 39 | from .utils.jsonencoders import EventMQEncoder | ||
| 38 | from .utils.messages import send_emqp_message as sendmsg | 40 | from .utils.messages import send_emqp_message as sendmsg |
| 41 | from .utils.messages import send_emqp_router_message as send_router_msg | ||
| 39 | from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp | 42 | from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp |
| 40 | 43 | ||
| 41 | 44 | ||
| @@ -75,6 +78,12 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 75 | self.frontend = Sender() | 78 | self.frontend = Sender() |
| 76 | self._redis_server = None | 79 | self._redis_server = None |
| 77 | 80 | ||
| 81 | admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR | ||
| 82 | |||
| 83 | #: Port for administrative commands | ||
| 84 | self.administrative_socket = Receiver() | ||
| 85 | self.administrative_socket.listen(admin_addr) | ||
| 86 | |||
| 78 | # contains dict of 4-item lists representing cron jobs key of this | 87 | # contains dict of 4-item lists representing cron jobs key of this |
| 79 | # dictionary is a hash of arguments, path, and callable from the | 88 | # dictionary is a hash of arguments, path, and callable from the |
| 80 | # message of the SCHEDULE command received | 89 | # message of the SCHEDULE command received |
| @@ -100,6 +109,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 100 | self.poller = Poller() | 109 | self.poller = Poller() |
| 101 | self.load_jobs() | 110 | self.load_jobs() |
| 102 | 111 | ||
| 112 | self.poller.register(self.administrative_socket, POLLIN) | ||
| 113 | |||
| 103 | self._setup() | 114 | self._setup() |
| 104 | 115 | ||
| 105 | def load_jobs(self): | 116 | def load_jobs(self): |
| @@ -135,6 +146,18 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 135 | m_now = monotonic() | 146 | m_now = monotonic() |
| 136 | events = self.poller.poll() | 147 | events = self.poller.poll() |
| 137 | 148 | ||
| 149 | if events.get(self.administrative_socket) == POLLIN: | ||
| 150 | msg = self.administrative_socket.recv_multipart() | ||
| 151 | # ############## | ||
| 152 | # Admin Commands | ||
| 153 | # ############## | ||
| 154 | if len(msg) > 4: | ||
| 155 | if msg[3] == constants.SCHEDULER_SHOW_SCHEDULED_JOBS: | ||
| 156 | send_router_msg(self.administrative_socket, | ||
| 157 | msg[0], | ||
| 158 | 'REPLY', | ||
| 159 | (self.get_scheduled_jobs(),)) | ||
| 160 | |||
| 138 | if events.get(self.frontend) == POLLIN: | 161 | if events.get(self.frontend) == POLLIN: |
| 139 | msg = self.frontend.recv_multipart() | 162 | msg = self.frontend.recv_multipart() |
| 140 | self.process_message(msg) | 163 | self.process_message(msg) |
| @@ -417,11 +440,25 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 417 | run_count = int(header.split(':')[1]) | 440 | run_count = int(header.split(':')[1]) |
| 418 | return run_count | 441 | return run_count |
| 419 | 442 | ||
| 443 | def on_status(self, msgid, message): | ||
| 444 | |||
| 445 | sendmsg(self.frontend, message[0], 'REPLY', (self.interval_jobs, )) | ||
| 446 | |||
| 420 | def on_heartbeat(self, msgid, message): | 447 | def on_heartbeat(self, msgid, message): |
| 421 | """ | 448 | """ |
| 422 | Noop command. The logic for heartbeating is in the event loop. | 449 | Noop command. The logic for heartbeating is in the event loop. |
| 423 | """ | 450 | """ |
| 424 | 451 | ||
| 452 | def get_scheduled_jobs(self): | ||
| 453 | |||
| 454 | return json.dumps( | ||
| 455 | { | ||
| 456 | 'interval_jobs': self.interval_jobs, | ||
| 457 | 'cron_jobs': self.cron_jobs, | ||
| 458 | 'name': self.name, | ||
| 459 | }, | ||
| 460 | cls=EventMQEncoder) | ||
| 461 | |||
| 425 | @classmethod | 462 | @classmethod |
| 426 | def schedule_hash(cls, message): | 463 | def schedule_hash(cls, message): |
| 427 | """ | 464 | """ |
diff --git a/eventmq/settings.py b/eventmq/settings.py index 18ec306..c07f275 100644 --- a/eventmq/settings.py +++ b/eventmq/settings.py | |||
| @@ -270,7 +270,15 @@ _CONFIG_DEFS = { | |||
| 270 | 'long-arg': '--redis-password', | 270 | 'long-arg': '--redis-password', |
| 271 | 'type': str, | 271 | 'type': str, |
| 272 | 'help': 'Password to redis' | 272 | 'help': 'Password to redis' |
| 273 | } | 273 | }, |
| 274 | 'SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR': { | ||
| 275 | 'default': 'tcp://127.0.0.1:47294', | ||
| 276 | 'long-arg': '--scheduler-administrative-listen-addr', | ||
| 277 | 'short-arg': '-S', | ||
| 278 | 'type': str, | ||
| 279 | 'help': 'Address to listen for administrative commands for ' | ||
| 280 | 'schedulers', | ||
| 281 | }, | ||
| 274 | }, | 282 | }, |
| 275 | 'publisher': { | 283 | 'publisher': { |
| 276 | 'FRONTEND_LISTEN_ADDR': { | 284 | 'FRONTEND_LISTEN_ADDR': { |