aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eventmq/router.py3
-rw-r--r--eventmq/scheduler.py9
-rw-r--r--eventmq/tests/test_router.py16
-rw-r--r--eventmq/tests/test_scheduler.py71
4 files changed, 93 insertions, 6 deletions
diff --git a/eventmq/router.py b/eventmq/router.py
index d2b442a..56d7ebe 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -322,6 +322,9 @@ class Router(HeartbeatMixin):
322 elif client_type == CLIENT_TYPE.scheduler: 322 elif client_type == CLIENT_TYPE.scheduler:
323 self.add_scheduler(sender) 323 self.add_scheduler(sender)
324 self.send_ack(self.frontend, sender, msgid) 324 self.send_ack(self.frontend, sender, msgid)
325 else:
326 logger.error('Received invalid client type on INFORM ({}), '
327 'ignoring'.format(client_type))
325 328
326 def on_reply(self, sender, msgid, msg): 329 def on_reply(self, sender, msgid, msg):
327 """ 330 """
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 09f8db2..3d3f856 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -372,6 +372,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
372 372
373 schedule_hash = self.schedule_hash(message) 373 schedule_hash = self.schedule_hash(message)
374 374
375 # If interval is negative, cron MUST be populated
376 interval_job = interval >= 0
377
375 # Notify if this is updating existing, or new 378 # Notify if this is updating existing, or new
376 if (schedule_hash in self.cron_jobs or 379 if (schedule_hash in self.cron_jobs or
377 schedule_hash in self.interval_jobs): 380 schedule_hash in self.interval_jobs):
@@ -381,8 +384,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
381 logger.debug('Creating a new scheduled job with %s' 384 logger.debug('Creating a new scheduled job with %s'
382 % schedule_hash) 385 % schedule_hash)
383 386
384 # If interval is negative, cron MUST be populated 387 if interval_job:
385 if interval >= 0:
386 inter_iter = IntervalIter(monotonic(), interval) 388 inter_iter = IntervalIter(monotonic(), interval)
387 389
388 self.interval_jobs[schedule_hash] = [ 390 self.interval_jobs[schedule_hash] = [
@@ -430,7 +432,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
430 if run_count > 0 or run_count == INFINITE_RUN_COUNT: 432 if run_count > 0 or run_count == INFINITE_RUN_COUNT:
431 # Don't allow run_count to decrement below 0 433 # Don't allow run_count to decrement below 0
432 if run_count > 0: 434 if run_count > 0:
433 self.interval_jobs[schedule_hash][4] -= 1 435 if interval_job:
436 self.interval_jobs[schedule_hash][4] -= 1
434 self.send_request(message[3], queue=queue) 437 self.send_request(message[3], queue=queue)
435 438
436 def get_run_count_from_headers(self, headers): 439 def get_run_count_from_headers(self, headers):
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index 0986f92..a737f89 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -75,9 +75,14 @@ class TestCase(unittest.TestCase):
75 75
76 log_checker.check( 76 log_checker.check(
77 ('eventmq.router', 77 ('eventmq.router',
78 'INFO',
79 'Received INFORM request from {} (type: worker)'.format(
80 sender_id)),
81
82 ('eventmq.router',
78 'ERROR', 83 'ERROR',
79 'Recieved INFORM message with no defined queues. Message ' 84 "Recieved INFORM message with no defined queues. Message "
80 'was: {}'.format(msg)) 85 "was: {}".format(msg)),
81 ) 86 )
82 87
83 def test_on_inform_invalid_queues(self): 88 def test_on_inform_invalid_queues(self):
@@ -93,9 +98,14 @@ class TestCase(unittest.TestCase):
93 98
94 log_checker.check( 99 log_checker.check(
95 ('eventmq.router', 100 ('eventmq.router',
101 'INFO',
102 'Received INFORM request from {} (type: worker)'.format(
103 sender_id)),
104
105 ('eventmq.router',
96 'ERROR', 106 'ERROR',
97 'Received invalid queue names in INFORM. names:{} from:{} ' 107 'Received invalid queue names in INFORM. names:{} from:{} '
98 'type:{}'.format(msg[0], sender_id, msg[1])) 108 'type:{}'.format(msg[0], sender_id, msg[1])),
99 ) 109 )
100 110
101 # @mock.patch('eventmq.router.Router.prioritize_queue_list') 111 # @mock.patch('eventmq.router.Router.prioritize_queue_list')
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py
index ab0b51f..9f21354 100644
--- a/eventmq/tests/test_scheduler.py
+++ b/eventmq/tests/test_scheduler.py
@@ -66,6 +66,77 @@ class TestCase(unittest.TestCase):
66 h2 = scheduler.Scheduler.schedule_hash(msg2) 66 h2 = scheduler.Scheduler.schedule_hash(msg2)
67 self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2) 67 self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2)
68 68
69 def test_on_schedule(self):
70 override_settings = {}
71 sched = scheduler.Scheduler(override_settings=override_settings)
72
73 job_msg = json.dumps(['run', {
74 'path': 'test',
75 'args': [33, 'asdf'],
76 'kwargs': {'zeta': 'Z', 'alpha': 'α'},
77 'class_args': [0],
78 'class_kwargs': {
79 'donkey': True, 'apple': False},
80 'callable': 'do_the_thing'}])
81
82 msg = [
83 'default',
84 'run_count:3,guarantee',
85 '3',
86 job_msg,
87 None
88 ]
89
90 cron_msg = [
91 'default',
92 'run_count:3,guarantee',
93 '-1',
94 job_msg,
95 '* * * * *',
96 ]
97
98 sched.on_schedule('fake_msgid', msg)
99 self.assertEqual(1, len(sched.interval_jobs))
100 self.assertEqual(0, len(sched.cron_jobs))
101
102 self.assertEqual(1, len(json.loads(
103 sched.get_scheduled_jobs())['interval_jobs']))
104 self.assertEqual(0,
105 len(json.loads(
106 sched.get_scheduled_jobs())['cron_jobs']))
107
108 # Scheduling the same job as a cron should remove it from interval
109 sched.on_schedule('fake_msgid2', cron_msg)
110 self.assertEqual(0, len(sched.interval_jobs))
111 self.assertEqual(1, len(sched.cron_jobs))
112
113 self.assertEqual(0, len(json.loads(
114 sched.get_scheduled_jobs())['interval_jobs']))
115 self.assertEqual(1, len(json.loads(
116 sched.get_scheduled_jobs())['cron_jobs']))
117
118 # Change the job message and it should make new jobs
119 job_msg = json.dumps(['run', {
120 'path': 'test',
121 'args': [333, 'asdf'],
122 'kwargs': {'zeta': 'Z', 'alpha': 'α'},
123 'class_args': [0],
124 'class_kwargs': {
125 'donkey': True, 'apple': False},
126 'callable': 'do_the_thing'}])
127
128 msg[3] = job_msg
129
130 sched.on_schedule('fake_msgid3', msg)
131 self.assertEqual(1, len(sched.interval_jobs))
132 self.assertEqual(1, len(sched.cron_jobs))
133
134 self.assertEqual(1, len(json.loads(
135 sched.get_scheduled_jobs())['interval_jobs']))
136 self.assertEqual(1, len(json.loads(
137 sched.get_scheduled_jobs())['cron_jobs']))
138
139
69# EMQP Tests 140# EMQP Tests
70 def test_reset(self): 141 def test_reset(self):
71 sched = scheduler.Scheduler() 142 sched = scheduler.Scheduler()