aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/emq-cli4
-rw-r--r--eventmq/client/jobs.py5
-rw-r--r--eventmq/client/messages.py13
-rw-r--r--eventmq/constants.py5
-rw-r--r--eventmq/router.py38
-rw-r--r--eventmq/scheduler.py49
-rw-r--r--eventmq/settings.py10
-rw-r--r--eventmq/tests/test_client_jobs.py7
-rw-r--r--eventmq/tests/test_client_messages.py7
-rw-r--r--eventmq/tests/test_router.py16
-rw-r--r--eventmq/tests/test_scheduler.py77
-rw-r--r--eventmq/utils/jsonencoders.py9
-rw-r--r--eventmq/utils/messages.py21
13 files changed, 217 insertions, 44 deletions
diff --git a/bin/emq-cli b/bin/emq-cli
index 333f0f6..d781984 100755
--- a/bin/emq-cli
+++ b/bin/emq-cli
@@ -95,6 +95,10 @@ class Shell(cmd.Cmd):
95 self.send_message('STATUS', ('',)) 95 self.send_message('STATUS', ('',))
96 pprint(self.recv_reply()) 96 pprint(self.recv_reply())
97 97
98 def do_show_scheduled_jobs(self, line):
99 self.send_message(('STATUS', (STATUS_COMMANDS.show_scheduled_jobs,)))
100 pprint(self.recv_reply())
101
98 def do_show_managers(self, line): 102 def do_show_managers(self, line):
99 """ 103 """
100 Request the status of the connected job managers and queues 104 Request the status of the connected job managers and queues
diff --git a/eventmq/client/jobs.py b/eventmq/client/jobs.py
index b61c65b..b06370f 100644
--- a/eventmq/client/jobs.py
+++ b/eventmq/client/jobs.py
@@ -86,8 +86,11 @@ class Job(object):
86 socket = Sender() 86 socket = Sender()
87 socket.connect(addr=self.broker_addr) 87 socket.connect(addr=self.broker_addr)
88 88
89 msgid = kwargs.get('msgid', None)
90
89 msgid = messages.defer_job( 91 msgid = messages.defer_job(
90 socket, f, args=args, kwargs=kwargs, queue=self.queue) 92 socket, f, args=args, kwargs=kwargs, queue=self.queue,
93 msgid=msgid)
91 94
92 return msgid 95 return msgid
93 else: 96 else:
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index 45b323d..7188407 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -111,7 +111,8 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None,
111def defer_job( 111def defer_job(
112 socket, func, args=(), kwargs=None, class_args=(), 112 socket, func, args=(), kwargs=None, class_args=(),
113 class_kwargs=None, reply_requested=False, guarantee=False, 113 class_kwargs=None, reply_requested=False, guarantee=False,
114 retry_count=0, timeout=0, queue=conf.DEFAULT_QUEUE_NAME): 114 retry_count=0, timeout=0, queue=conf.DEFAULT_QUEUE_NAME,
115 msgid=None):
115 """ 116 """
116 Used to send a job to a worker to execute via `socket`. 117 Used to send a job to a worker to execute via `socket`.
117 118
@@ -141,6 +142,7 @@ def defer_job(
141 queue (str): Name of queue to use when executing the job. If this value 142 queue (str): Name of queue to use when executing the job. If this value
142 evaluates to False, the default is used. Default: is configured 143 evaluates to False, the default is used. Default: is configured
143 default queue name 144 default queue name
145 msgid: (str): An optional msgid to use instead of a generated one
144 Raises: 146 Raises:
145 TypeError: When one or more parameters are not JSON serializable. 147 TypeError: When one or more parameters are not JSON serializable.
146 Returns: 148 Returns:
@@ -191,13 +193,14 @@ def defer_job(
191 guarantee=guarantee, 193 guarantee=guarantee,
192 retry_count=retry_count, 194 retry_count=retry_count,
193 timeout=timeout, 195 timeout=timeout,
194 queue=queue) 196 queue=queue,
197 msgid=msgid)
195 198
196 return msgid 199 return msgid
197 200
198 201
199def send_request(socket, message, reply_requested=False, guarantee=False, 202def send_request(socket, message, reply_requested=False, guarantee=False,
200 retry_count=0, timeout=0, queue=None): 203 retry_count=0, timeout=0, queue=None, msgid=None):
201 """ 204 """
202 Send a REQUEST command. 205 Send a REQUEST command.
203 206
@@ -238,6 +241,7 @@ def send_request(socket, message, reply_requested=False, guarantee=False,
238 default: 0 which means infinite timeout 241 default: 0 which means infinite timeout
239 queue (str): Name of queue to use when executing the job. Default: is 242 queue (str): Name of queue to use when executing the job. Default: is
240 configured default queue name 243 configured default queue name
244 msgid: (str): An optional msgid to use instead of a generated one
241 245
242 Returns: 246 Returns:
243 str: ID of the message 247 str: ID of the message
@@ -259,7 +263,8 @@ def send_request(socket, message, reply_requested=False, guarantee=False,
259 msgid = send_emqp_message(socket, 'REQUEST', 263 msgid = send_emqp_message(socket, 'REQUEST',
260 (queue or conf.DEFAULT_QUEUE_NAME, 264 (queue or conf.DEFAULT_QUEUE_NAME,
261 ",".join(headers), 265 ",".join(headers),
262 serialize(message))) 266 serialize(message)),
267 msgid=msgid)
263 268
264 return msgid 269 return msgid
265 270
diff --git a/eventmq/constants.py b/eventmq/constants.py
index b668f1e..cf2d5d0 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -27,11 +27,14 @@ class STATUS_COMMANDS(object):
27 """ 27 """
28 Defines the STATUS sub commands 28 Defines the STATUS sub commands
29 """ 29 """
30 #: Router subommand to show connected job managbers 30 #: Router subcommand to show connected job managbers
31 show_managers = 'show_managers' 31 show_managers = 'show_managers'
32 #: Router subcommand to show connected schedulers 32 #: Router subcommand to show connected schedulers
33 show_schedulers = 'show_schedulers' 33 show_schedulers = 'show_schedulers'
34 34
35 #: Scheduler subcommand to show scheduled jobs
36 show_scheduled_jobs = 'show_scheduled_jobs'
37
35 38
36# ENVIRONMENT VARIABLES 39# ENVIRONMENT VARIABLES
37ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' 40ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR'
diff --git a/eventmq/router.py b/eventmq/router.py
index eceb080..89f92e8 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -24,7 +24,8 @@ import signal
24 24
25from . import constants, exceptions, poller, receiver 25from . import constants, exceptions, poller, receiver
26from .constants import ( 26from .constants import (
27 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, STATUS, STATUS_COMMANDS 27 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, STATUS, STATUS_CMD,
28 STATUS_COMMANDS
28) 29)
29from .settings import conf, reload_settings 30from .settings import conf, reload_settings
30from .utils import tuplify 31from .utils import tuplify
@@ -280,30 +281,33 @@ class Router(HeartbeatMixin):
280 queue_names = msg[0] 281 queue_names = msg[0]
281 client_type = msg[1] 282 client_type = msg[1]
282 283
283 if not queue_names: # Ideally, this matches some workers
284 logger.error('Recieved INFORM message with no defined '
285 'queues. Message was: {}'.format(msg))
286 return
287
288 try:
289 queues = list(map(tuplify, json.loads(queue_names)))
290 except ValueError:
291 # this was invalid json
292 logger.error(
293 'Received invalid queue names in INFORM. names:{} from:{} '
294 'type:{}'.format(
295 queue_names, sender, client_type))
296 return
297
298 logger.info('Received INFORM request from {} (type: {})'.format( 284 logger.info('Received INFORM request from {} (type: {})'.format(
299 sender, client_type)) 285 sender, client_type))
300 286
301 if client_type == CLIENT_TYPE.worker: 287 if client_type == CLIENT_TYPE.worker:
288 if not queue_names: # Ideally, this matches some workers
289 logger.error('Recieved INFORM message with no defined '
290 'queues. Message was: {}'.format(msg))
291 return
292
293 try:
294 queues = list(map(tuplify, json.loads(queue_names)))
295 except ValueError:
296 # this was invalid json
297 logger.error(
298 'Received invalid queue names in INFORM. names:{} from:{} '
299 'type:{}'.format(
300 queue_names, sender, client_type))
301 return
302
302 self.add_worker(sender, queues) 303 self.add_worker(sender, queues)
303 self.send_ack(self.backend, sender, msgid) 304 self.send_ack(self.backend, sender, msgid)
304 elif client_type == CLIENT_TYPE.scheduler: 305 elif client_type == CLIENT_TYPE.scheduler:
305 self.add_scheduler(sender) 306 self.add_scheduler(sender)
306 self.send_ack(self.frontend, sender, msgid) 307 self.send_ack(self.frontend, sender, msgid)
308 else:
309 logger.error('Received invalid client type on INFORM ({}), '
310 'ignoring'.format(client_type))
307 311
308 def on_reply(self, sender, msgid, msg): 312 def on_reply(self, sender, msgid, msg):
309 """ 313 """
@@ -819,7 +823,7 @@ class Router(HeartbeatMixin):
819 self.send_ack( 823 self.send_ack(
820 self.administrative_socket, msg[0], msg[4]) 824 self.administrative_socket, msg[0], msg[4])
821 self.on_disconnect(msg[4], msg) 825 self.on_disconnect(msg[4], msg)
822 elif msg[3] == 'STATUS': 826 elif msg[3] == STATUS_CMD:
823 if msg[5] == STATUS_COMMANDS.show_managers: 827 if msg[5] == STATUS_COMMANDS.show_managers:
824 sendmsg(self.administrative_socket, msg[0], 'REPLY', 828 sendmsg(self.administrative_socket, msg[0], 'REPLY',
825 (self.get_workers_status(),)) 829 (self.get_workers_status(),))
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 799ae20..db72d82 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -30,13 +30,16 @@ from six import next
30 30
31from . import constants 31from . import constants
32from .client.messages import send_request 32from .client.messages import send_request
33from .constants import KBYE 33from .constants import KBYE, STATUS_CMD, STATUS_COMMANDS
34from .poller import Poller, POLLIN 34from .poller import Poller, POLLIN
35from .receiver import Receiver
35from .sender import Sender 36from .sender import Sender
36from .settings import conf, reload_settings 37from .settings import conf, reload_settings
37from .utils.classes import EMQPService, HeartbeatMixin 38from .utils.classes import EMQPService, HeartbeatMixin
38from .utils.devices import generate_device_name 39from .utils.devices import generate_device_name
40from .utils.jsonencoders import EventMQEncoder
39from .utils.messages import send_emqp_message as sendmsg 41from .utils.messages import send_emqp_message as sendmsg
42from .utils.messages import send_emqp_router_message as send_router_msg
40from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp 43from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp
41 44
42 45
@@ -80,6 +83,12 @@ class Scheduler(HeartbeatMixin, EMQPService):
80 self.frontend = Sender(conf.NAME) 83 self.frontend = Sender(conf.NAME)
81 self._redis_server = None 84 self._redis_server = None
82 85
86 admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR
87
88 #: Port for administrative commands
89 self.administrative_socket = Receiver()
90 self.administrative_socket.listen(admin_addr)
91
83 # contains dict of 4-item lists representing cron jobs key of this 92 # contains dict of 4-item lists representing cron jobs key of this
84 # dictionary is a hash of arguments, path, and callable from the 93 # dictionary is a hash of arguments, path, and callable from the
85 # message of the SCHEDULE command received 94 # message of the SCHEDULE command received
@@ -105,6 +114,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
105 self.poller = Poller() 114 self.poller = Poller()
106 self.load_jobs() 115 self.load_jobs()
107 116
117 self.poller.register(self.administrative_socket, POLLIN)
118
108 self._setup() 119 self._setup()
109 120
110 def load_jobs(self): 121 def load_jobs(self):
@@ -143,6 +154,19 @@ class Scheduler(HeartbeatMixin, EMQPService):
143 m_now = monotonic() 154 m_now = monotonic()
144 events = self.poller.poll() 155 events = self.poller.poll()
145 156
157 if events.get(self.administrative_socket) == POLLIN:
158 msg = self.administrative_socket.recv_multipart()
159 # ##############
160 # Admin Commands
161 # ##############
162 if len(msg) > 4:
163 if msg[3] == STATUS_CMD:
164 if msg[5] == STATUS_COMMANDS.show_scheduled_jobs:
165 send_router_msg(self.administrative_socket,
166 msg[0],
167 'REPLY',
168 (self.get_scheduled_jobs(),))
169
146 if events.get(self.frontend) == POLLIN: 170 if events.get(self.frontend) == POLLIN:
147 msg = self.frontend.recv_multipart() 171 msg = self.frontend.recv_multipart()
148 self.process_message(msg) 172 self.process_message(msg)
@@ -378,6 +402,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
378 402
379 schedule_hash = self.schedule_hash(message) 403 schedule_hash = self.schedule_hash(message)
380 404
405 # If interval is negative, cron MUST be populated
406 interval_job = interval >= 0
407
381 # Notify if this is updating existing, or new 408 # Notify if this is updating existing, or new
382 if (schedule_hash in self.cron_jobs or 409 if (schedule_hash in self.cron_jobs or
383 schedule_hash in self.interval_jobs): 410 schedule_hash in self.interval_jobs):
@@ -387,8 +414,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
387 logger.debug('Creating a new scheduled job with %s' 414 logger.debug('Creating a new scheduled job with %s'
388 % schedule_hash) 415 % schedule_hash)
389 416
390 # If interval is negative, cron MUST be populated 417 if interval_job:
391 if interval >= 0:
392 inter_iter = IntervalIter(monotonic(), interval) 418 inter_iter = IntervalIter(monotonic(), interval)
393 419
394 self.interval_jobs[schedule_hash] = [ 420 self.interval_jobs[schedule_hash] = [
@@ -436,7 +462,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
436 if run_count > 0 or run_count == INFINITE_RUN_COUNT: 462 if run_count > 0 or run_count == INFINITE_RUN_COUNT:
437 # Don't allow run_count to decrement below 0 463 # Don't allow run_count to decrement below 0
438 if run_count > 0: 464 if run_count > 0:
439 self.interval_jobs[schedule_hash][4] -= 1 465 if interval_job:
466 self.interval_jobs[schedule_hash][4] -= 1
440 self.send_request(message[3], queue=queue) 467 self.send_request(message[3], queue=queue)
441 468
442 def get_run_count_from_headers(self, headers): 469 def get_run_count_from_headers(self, headers):
@@ -446,11 +473,25 @@ class Scheduler(HeartbeatMixin, EMQPService):
446 run_count = int(header.split(':')[1]) 473 run_count = int(header.split(':')[1])
447 return run_count 474 return run_count
448 475
476 def on_status(self, msgid, message):
477
478 sendmsg(self.frontend, message[0], 'REPLY', (self.interval_jobs, ))
479
449 def on_heartbeat(self, msgid, message): 480 def on_heartbeat(self, msgid, message):
450 """ 481 """
451 Noop command. The logic for heartbeating is in the event loop. 482 Noop command. The logic for heartbeating is in the event loop.
452 """ 483 """
453 484
485 def get_scheduled_jobs(self):
486
487 return json.dumps(
488 {
489 'interval_jobs': self.interval_jobs,
490 'cron_jobs': self.cron_jobs,
491 'name': self.name,
492 },
493 cls=EventMQEncoder)
494
454 @classmethod 495 @classmethod
455 def schedule_hash(cls, message): 496 def schedule_hash(cls, message):
456 """ 497 """
diff --git a/eventmq/settings.py b/eventmq/settings.py
index a25e953..826b3a1 100644
--- a/eventmq/settings.py
+++ b/eventmq/settings.py
@@ -270,7 +270,15 @@ _CONFIG_DEFS = {
270 'long-arg': '--redis-password', 270 'long-arg': '--redis-password',
271 'type': str, 271 'type': str,
272 'help': 'Password to redis' 272 'help': 'Password to redis'
273 } 273 },
274 'ADMINISTRATIVE_LISTEN_ADDR': {
275 'default': 'tcp://127.0.0.1:47294',
276 'long-arg': '--administrative-listen-addr',
277 'short-arg': '-S',
278 'type': str,
279 'help': 'Address to listen for administrative commands for '
280 'schedulers',
281 },
274 }, 282 },
275 'publisher': { 283 'publisher': {
276 'FRONTEND_LISTEN_ADDR': { 284 'FRONTEND_LISTEN_ADDR': {
diff --git a/eventmq/tests/test_client_jobs.py b/eventmq/tests/test_client_jobs.py
index c658187..c66875a 100644
--- a/eventmq/tests/test_client_jobs.py
+++ b/eventmq/tests/test_client_jobs.py
@@ -34,13 +34,13 @@ class TestCase(unittest.TestCase):
34 self.assertEqual(d(), 12) 34 self.assertEqual(d(), 12)
35 self.assertEqual(d.delay(), '43e14eaa-2034-4c84-8fe7-5577c70b6a7c') 35 self.assertEqual(d.delay(), '43e14eaa-2034-4c84-8fe7-5577c70b6a7c')
36 defer_job_mock.assert_called_with(Sender_mock(), test_func, args=(), 36 defer_job_mock.assert_called_with(Sender_mock(), test_func, args=(),
37 kwargs={}, queue=None) 37 msgid=None, kwargs={}, queue=None)
38 38
39 d = jobs.job(test_func, broker_addr=self.BROKER_ADDR, queue='mojo') 39 d = jobs.job(test_func, broker_addr=self.BROKER_ADDR, queue='mojo')
40 d.delay(1, 2, three=3) 40 d.delay(1, 2, three=3)
41 defer_job_mock.assert_called_with(Sender_mock(), test_func, 41 defer_job_mock.assert_called_with(Sender_mock(), test_func,
42 args=(1, 2), kwargs={'three': 3}, 42 args=(1, 2), kwargs={'three': 3},
43 queue='mojo') 43 msgid=None, queue='mojo')
44 44
45 @mock.patch('eventmq.client.messages.defer_job') 45 @mock.patch('eventmq.client.messages.defer_job')
46 @mock.patch('eventmq.client.jobs.Sender') 46 @mock.patch('eventmq.client.jobs.Sender')
@@ -85,7 +85,8 @@ class TestCase(unittest.TestCase):
85 sched_mock.assert_called_with( 85 sched_mock.assert_called_with(
86 Sender_mock(), test_func, args=(), class_args=(), 86 Sender_mock(), test_func, args=(), class_args=(),
87 class_kwargs=None, cron=None, headers=('guarantee',), 87 class_kwargs=None, cron=None, headers=('guarantee',),
88 interval_secs=None, kwargs=None, queue='default', unschedule=True) 88 interval_secs=None, kwargs=None, queue='default',
89 unschedule=True)
89 90
90 91
91def test_func(): 92def test_func():
diff --git a/eventmq/tests/test_client_messages.py b/eventmq/tests/test_client_messages.py
index 68b847e..8939e9b 100644
--- a/eventmq/tests/test_client_messages.py
+++ b/eventmq/tests/test_client_messages.py
@@ -86,6 +86,7 @@ class TestCase(unittest.TestCase):
86 guarantee=False, 86 guarantee=False,
87 retry_count=3, 87 retry_count=3,
88 timeout=0, 88 timeout=0,
89 msgid=None,
89 queue='test_queue') 90 queue='test_queue')
90 91
91 with LogCapture() as log_checker: 92 with LogCapture() as log_checker:
@@ -259,7 +260,8 @@ class TestCase(unittest.TestCase):
259 socket, 'REQUEST', 260 socket, 'REQUEST',
260 ('mozo', 261 ('mozo',
261 'reply-requested,retry-count:2', 262 'reply-requested,retry-count:2',
262 messages.serialize(msg))) 263 messages.serialize(msg)),
264 msgid=None)
263 265
264 @mock.patch('eventmq.client.messages.send_emqp_message') 266 @mock.patch('eventmq.client.messages.send_emqp_message')
265 def test_send_request_all_headers(self, snd_empq_msg_mock): 267 def test_send_request_all_headers(self, snd_empq_msg_mock):
@@ -282,7 +284,8 @@ class TestCase(unittest.TestCase):
282 socket, 'REQUEST', 284 socket, 'REQUEST',
283 ('default', 285 ('default',
284 'reply-requested,guarantee,retry-count:2,timeout:3', 286 'reply-requested,guarantee,retry-count:2,timeout:3',
285 messages.serialize(msg))) 287 messages.serialize(msg)),
288 msgid=None)
286 289
287 @mock.patch('eventmq.client.messages.send_emqp_message') 290 @mock.patch('eventmq.client.messages.send_emqp_message')
288 def test_send_schedule_request(self, snd_empq_msg_mock): 291 def test_send_schedule_request(self, snd_empq_msg_mock):
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index 0986f92..a737f89 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -75,9 +75,14 @@ class TestCase(unittest.TestCase):
75 75
76 log_checker.check( 76 log_checker.check(
77 ('eventmq.router', 77 ('eventmq.router',
78 'INFO',
79 'Received INFORM request from {} (type: worker)'.format(
80 sender_id)),
81
82 ('eventmq.router',
78 'ERROR', 83 'ERROR',
79 'Recieved INFORM message with no defined queues. Message ' 84 "Recieved INFORM message with no defined queues. Message "
80 'was: {}'.format(msg)) 85 "was: {}".format(msg)),
81 ) 86 )
82 87
83 def test_on_inform_invalid_queues(self): 88 def test_on_inform_invalid_queues(self):
@@ -93,9 +98,14 @@ class TestCase(unittest.TestCase):
93 98
94 log_checker.check( 99 log_checker.check(
95 ('eventmq.router', 100 ('eventmq.router',
101 'INFO',
102 'Received INFORM request from {} (type: worker)'.format(
103 sender_id)),
104
105 ('eventmq.router',
96 'ERROR', 106 'ERROR',
97 'Received invalid queue names in INFORM. names:{} from:{} ' 107 'Received invalid queue names in INFORM. names:{} from:{} '
98 'type:{}'.format(msg[0], sender_id, msg[1])) 108 'type:{}'.format(msg[0], sender_id, msg[1])),
99 ) 109 )
100 110
101 # @mock.patch('eventmq.router.Router.prioritize_queue_list') 111 # @mock.patch('eventmq.router.Router.prioritize_queue_list')
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py
index dc96b09..82b459d 100644
--- a/eventmq/tests/test_scheduler.py
+++ b/eventmq/tests/test_scheduler.py
@@ -18,7 +18,7 @@ import unittest
18 18
19import mock 19import mock
20 20
21from .. import constants, scheduler 21from .. import constants, scheduler, utils
22 22
23 23
24class TestCase(unittest.TestCase): 24class TestCase(unittest.TestCase):
@@ -70,6 +70,81 @@ class TestCase(unittest.TestCase):
70 h2 = scheduler.Scheduler.schedule_hash(msg2) 70 h2 = scheduler.Scheduler.schedule_hash(msg2)
71 self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2) 71 self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2)
72 72
73 @mock.patch.object(utils.classes.ZMQSendMixin, 'send_multipart')
74 def test_on_schedule(self, send_mock):
75 override_settings = {}
76 sched = scheduler.Scheduler(override_settings=override_settings)
77
78 job_msg = json.dumps(['run', {
79 'path': 'test',
80 'args': [33, 'asdf'],
81 'kwargs': {'zeta': 'Z', 'alpha': 'α'},
82 'class_args': [0],
83 'class_kwargs': {
84 'donkey': True, 'apple': False},
85 'callable': 'do_the_thing'}])
86
87 msg = [
88 'default',
89 'run_count:3,guarantee',
90 '3',
91 job_msg,
92 None
93 ]
94
95 cron_msg = [
96 'default',
97 'run_count:3,guarantee',
98 '-1',
99 job_msg,
100 '* * * * *',
101 ]
102
103 sched.on_schedule('fake_msgid', msg)
104 self.assertEqual(1, len(sched.interval_jobs))
105 self.assertEqual(0, len(sched.cron_jobs))
106
107 self.assertEqual(1, len(json.loads(
108 sched.get_scheduled_jobs())['interval_jobs']))
109 self.assertEqual(0,
110 len(json.loads(
111 sched.get_scheduled_jobs())['cron_jobs']))
112
113 # Scheduling the same job as a cron should remove it from interval
114 sched.on_schedule('fake_msgid2', cron_msg)
115 self.assertEqual(0, len(sched.interval_jobs))
116 self.assertEqual(1, len(sched.cron_jobs))
117
118 self.assertEqual(0, len(json.loads(
119 sched.get_scheduled_jobs())['interval_jobs']))
120 self.assertEqual(1, len(json.loads(
121 sched.get_scheduled_jobs())['cron_jobs']))
122
123 # Change the job message and it should make new jobs
124 job_msg = json.dumps(['run', {
125 'path': 'test',
126 'args': [333, 'asdf'],
127 'kwargs': {'zeta': 'Z', 'alpha': 'α'},
128 'class_args': [0],
129 'class_kwargs': {
130 'donkey': True, 'apple': False},
131 'callable': 'do_the_thing'}])
132
133 msg[3] = job_msg
134
135 sched.on_schedule('fake_msgid3', msg)
136 self.assertEqual(1, len(sched.interval_jobs))
137 self.assertEqual(1, len(sched.cron_jobs))
138
139 self.assertEqual(1, len(json.loads(
140 sched.get_scheduled_jobs())['interval_jobs']))
141 self.assertEqual(1, len(json.loads(
142 sched.get_scheduled_jobs())['cron_jobs']))
143
144 # Make sure the scheduler obeyed 3 schedule commands with 'haste'
145 self.assertEqual(3, send_mock.call_count)
146
147
73# EMQP Tests 148# EMQP Tests
74 def test_reset(self): 149 def test_reset(self):
75 sched = scheduler.Scheduler() 150 sched = scheduler.Scheduler()
diff --git a/eventmq/utils/jsonencoders.py b/eventmq/utils/jsonencoders.py
new file mode 100644
index 0000000..a4d48a1
--- /dev/null
+++ b/eventmq/utils/jsonencoders.py
@@ -0,0 +1,9 @@
1import json
2
3from ..utils.timeutils import IntervalIter
4
5
6class EventMQEncoder(json.JSONEncoder):
7 def default(self, o):
8 if isinstance(o, IntervalIter):
9 return o.__dict__
diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py
index 656d474..b664594 100644
--- a/eventmq/utils/messages.py
+++ b/eventmq/utils/messages.py
@@ -89,20 +89,23 @@ def generate_msgid(prefix=None):
89 return id if not prefix else str(prefix) + id 89 return id if not prefix else str(prefix) + id
90 90
91 91
92def send_emqp_message(socket, command, message=None): 92def send_emqp_message(socket, command, message=None, msgid=None):
93 """ 93 """
94 Formats and sends an eMQP message 94 Formats and sends an eMQP message
95 95
96 Args: 96 Args:
97 socket 97 socket (zmq.socket) Socket to send on
98 command 98 command (str) Command to send
99 message 99 msgid: (str): An optional msgid to use instead of a generated one
100 message (tuple, str) Message to send
100 Raises: 101 Raises:
101 102
102 Returns: 103 Returns:
103 str: Message id for this message 104 str: Message id for this message
104 """ 105 """
105 msgid = generate_msgid() 106 if not msgid:
107 msgid = generate_msgid()
108
106 msg = (str(command).upper(), msgid) 109 msg = (str(command).upper(), msgid)
107 if message and isinstance(message, (tuple, list)): 110 if message and isinstance(message, (tuple, list)):
108 msg += tuple(message) 111 msg += tuple(message)
@@ -114,7 +117,8 @@ def send_emqp_message(socket, command, message=None):
114 return msgid 117 return msgid
115 118
116 119
117def send_emqp_router_message(socket, recipient_id, command, message=None): 120def send_emqp_router_message(socket, recipient_id, command,
121 message=None, msgid=None):
118 """ 122 """
119 Formats and sends an eMQP message taking into account the recipient frame 123 Formats and sends an eMQP message taking into account the recipient frame
120 used by a :attr:`zmq.ROUTER` device. 124 used by a :attr:`zmq.ROUTER` device.
@@ -123,12 +127,15 @@ def send_emqp_router_message(socket, recipient_id, command, message=None):
123 socket: socket to send the message with 127 socket: socket to send the message with
124 recipient_id (str): the id of the connected device to reply to 128 recipient_id (str): the id of the connected device to reply to
125 command (str): the eMQP command to send 129 command (str): the eMQP command to send
130 msgid: (str): An optional msgid to use instead of a generated one
126 message: a msg tuple to send 131 message: a msg tuple to send
127 132
128 Returns 133 Returns
129 str: Message id for this message 134 str: Message id for this message
130 """ 135 """
131 msgid = generate_msgid() 136 if not msgid:
137 msgid = generate_msgid()
138
132 msg = (str(command).upper(), msgid) 139 msg = (str(command).upper(), msgid)
133 if message and isinstance(message, (tuple, list)): 140 if message and isinstance(message, (tuple, list)):
134 msg += message 141 msg += message