aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Hurst2017-05-03 14:24:27 -0600
committerDavid Hurst2017-05-03 14:24:27 -0600
commit30ae820f682e62e5522970832071a62837886a7d (patch)
treeceebc81e56362ecbac51823b7ccf925a59c1ab93
parentb7a137e52e0c5f5751dcbe351025f87df16fbd43 (diff)
downloadeventmq-30ae820f682e62e5522970832071a62837886a7d.tar.gz
eventmq-30ae820f682e62e5522970832071a62837886a7d.zip
Implement Scheduler status command
-rw-r--r--eventmq/constants.py5
-rw-r--r--eventmq/router.py30
-rw-r--r--eventmq/scheduler.py37
-rw-r--r--eventmq/settings.py10
4 files changed, 65 insertions, 17 deletions
diff --git a/eventmq/constants.py b/eventmq/constants.py
index 8cc5d7a..cd6fe8a 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -22,9 +22,12 @@ PROTOCOL_VERSION = 'eMQP/1.0'
22DISCONNECT = "DISCONNECT" 22DISCONNECT = "DISCONNECT"
23KBYE = "KBYE" 23KBYE = "KBYE"
24 24
25# ADMINISTRATIVE COMMANDS 25# Router ADMINISTRATIVE COMMANDS
26ROUTER_SHOW_WORKERS = 'ROUTER_SHOW_WORKERS' 26ROUTER_SHOW_WORKERS = 'ROUTER_SHOW_WORKERS'
27ROUTER_SHOW_SCHEDULERS = 'ROUTER_SHOW_SCHEDULERS' 27ROUTER_SHOW_SCHEDULERS = 'ROUTER_SHOW_SCHEDULERS'
28 28
29# Scheduler ADMINISTRATIVE COMMANDS
30SCHEDULER_SHOW_SCHEDULED_JOBS = 'SCHEDULER_SHOW_SCHEDULED_JOBS'
31
29# ENVIRONMENT VARIABLES 32# ENVIRONMENT VARIABLES
30ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' 33ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR'
diff --git a/eventmq/router.py b/eventmq/router.py
index 9971c4c..d2b442a 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -298,25 +298,25 @@ class Router(HeartbeatMixin):
298 queue_names = msg[0] 298 queue_names = msg[0]
299 client_type = msg[1] 299 client_type = msg[1]
300 300
301 if not queue_names: # Ideally, this matches some workers
302 logger.error('Recieved INFORM message with no defined '
303 'queues. Message was: {}'.format(msg))
304 return
305
306 try:
307 queues = list(map(tuplify, json.loads(queue_names)))
308 except ValueError:
309 # this was invalid json
310 logger.error(
311 'Received invalid queue names in INFORM. names:{} from:{} '
312 'type:{}'.format(
313 queue_names, sender, client_type))
314 return
315
316 logger.info('Received INFORM request from {} (type: {})'.format( 301 logger.info('Received INFORM request from {} (type: {})'.format(
317 sender, client_type)) 302 sender, client_type))
318 303
319 if client_type == CLIENT_TYPE.worker: 304 if client_type == CLIENT_TYPE.worker:
305 if not queue_names: # Ideally, this matches some workers
306 logger.error('Recieved INFORM message with no defined '
307 'queues. Message was: {}'.format(msg))
308 return
309
310 try:
311 queues = list(map(tuplify, json.loads(queue_names)))
312 except ValueError:
313 # this was invalid json
314 logger.error(
315 'Received invalid queue names in INFORM. names:{} from:{} '
316 'type:{}'.format(
317 queue_names, sender, client_type))
318 return
319
320 self.add_worker(sender, queues) 320 self.add_worker(sender, queues)
321 self.send_ack(self.backend, sender, msgid) 321 self.send_ack(self.backend, sender, msgid)
322 elif client_type == CLIENT_TYPE.scheduler: 322 elif client_type == CLIENT_TYPE.scheduler:
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 3abe58a..09f8db2 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -31,11 +31,14 @@ from . import constants
31from .client.messages import send_request 31from .client.messages import send_request
32from .constants import KBYE 32from .constants import KBYE
33from .poller import Poller, POLLIN 33from .poller import Poller, POLLIN
34from .receiver import Receiver
34from .sender import Sender 35from .sender import Sender
35from .settings import conf, reload_settings 36from .settings import conf, reload_settings
36from .utils.classes import EMQPService, HeartbeatMixin 37from .utils.classes import EMQPService, HeartbeatMixin
37from .utils.devices import generate_device_name 38from .utils.devices import generate_device_name
39from .utils.jsonencoders import EventMQEncoder
38from .utils.messages import send_emqp_message as sendmsg 40from .utils.messages import send_emqp_message as sendmsg
41from .utils.messages import send_emqp_router_message as send_router_msg
39from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp 42from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp
40 43
41 44
@@ -75,6 +78,12 @@ class Scheduler(HeartbeatMixin, EMQPService):
75 self.frontend = Sender() 78 self.frontend = Sender()
76 self._redis_server = None 79 self._redis_server = None
77 80
81 admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR
82
83 #: Port for administrative commands
84 self.administrative_socket = Receiver()
85 self.administrative_socket.listen(admin_addr)
86
78 # contains dict of 4-item lists representing cron jobs key of this 87 # contains dict of 4-item lists representing cron jobs key of this
79 # dictionary is a hash of arguments, path, and callable from the 88 # dictionary is a hash of arguments, path, and callable from the
80 # message of the SCHEDULE command received 89 # message of the SCHEDULE command received
@@ -100,6 +109,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
100 self.poller = Poller() 109 self.poller = Poller()
101 self.load_jobs() 110 self.load_jobs()
102 111
112 self.poller.register(self.administrative_socket, POLLIN)
113
103 self._setup() 114 self._setup()
104 115
105 def load_jobs(self): 116 def load_jobs(self):
@@ -135,6 +146,18 @@ class Scheduler(HeartbeatMixin, EMQPService):
135 m_now = monotonic() 146 m_now = monotonic()
136 events = self.poller.poll() 147 events = self.poller.poll()
137 148
149 if events.get(self.administrative_socket) == POLLIN:
150 msg = self.administrative_socket.recv_multipart()
151 # ##############
152 # Admin Commands
153 # ##############
154 if len(msg) > 4:
155 if msg[3] == constants.SCHEDULER_SHOW_SCHEDULED_JOBS:
156 send_router_msg(self.administrative_socket,
157 msg[0],
158 'REPLY',
159 (self.get_scheduled_jobs(),))
160
138 if events.get(self.frontend) == POLLIN: 161 if events.get(self.frontend) == POLLIN:
139 msg = self.frontend.recv_multipart() 162 msg = self.frontend.recv_multipart()
140 self.process_message(msg) 163 self.process_message(msg)
@@ -417,11 +440,25 @@ class Scheduler(HeartbeatMixin, EMQPService):
417 run_count = int(header.split(':')[1]) 440 run_count = int(header.split(':')[1])
418 return run_count 441 return run_count
419 442
443 def on_status(self, msgid, message):
444
445 sendmsg(self.frontend, message[0], 'REPLY', (self.interval_jobs, ))
446
420 def on_heartbeat(self, msgid, message): 447 def on_heartbeat(self, msgid, message):
421 """ 448 """
422 Noop command. The logic for heartbeating is in the event loop. 449 Noop command. The logic for heartbeating is in the event loop.
423 """ 450 """
424 451
452 def get_scheduled_jobs(self):
453
454 return json.dumps(
455 {
456 'interval_jobs': self.interval_jobs,
457 'cron_jobs': self.cron_jobs,
458 'name': self.name,
459 },
460 cls=EventMQEncoder)
461
425 @classmethod 462 @classmethod
426 def schedule_hash(cls, message): 463 def schedule_hash(cls, message):
427 """ 464 """
diff --git a/eventmq/settings.py b/eventmq/settings.py
index 18ec306..c07f275 100644
--- a/eventmq/settings.py
+++ b/eventmq/settings.py
@@ -270,7 +270,15 @@ _CONFIG_DEFS = {
270 'long-arg': '--redis-password', 270 'long-arg': '--redis-password',
271 'type': str, 271 'type': str,
272 'help': 'Password to redis' 272 'help': 'Password to redis'
273 } 273 },
274 'SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR': {
275 'default': 'tcp://127.0.0.1:47294',
276 'long-arg': '--scheduler-administrative-listen-addr',
277 'short-arg': '-S',
278 'type': str,
279 'help': 'Address to listen for administrative commands for '
280 'schedulers',
281 },
274 }, 282 },
275 'publisher': { 283 'publisher': {
276 'FRONTEND_LISTEN_ADDR': { 284 'FRONTEND_LISTEN_ADDR': {