diff options
| author | David Hurst | 2017-05-03 15:08:50 -0600 |
|---|---|---|
| committer | David Hurst | 2017-05-03 15:11:00 -0600 |
| commit | b71a744de644d1ae0121e9c139287fc29e710127 (patch) | |
| tree | 8621c068e1d1191e1df8eb534ddbea4bed0e25e2 | |
| parent | 30ae820f682e62e5522970832071a62837886a7d (diff) | |
| download | eventmq-b71a744de644d1ae0121e9c139287fc29e710127.tar.gz eventmq-b71a744de644d1ae0121e9c139287fc29e710127.zip | |
Add on_schedule tests
| -rw-r--r-- | eventmq/router.py | 3 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 9 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 16 | ||||
| -rw-r--r-- | eventmq/tests/test_scheduler.py | 71 |
4 files changed, 93 insertions, 6 deletions
diff --git a/eventmq/router.py b/eventmq/router.py index d2b442a..56d7ebe 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -322,6 +322,9 @@ class Router(HeartbeatMixin): | |||
| 322 | elif client_type == CLIENT_TYPE.scheduler: | 322 | elif client_type == CLIENT_TYPE.scheduler: |
| 323 | self.add_scheduler(sender) | 323 | self.add_scheduler(sender) |
| 324 | self.send_ack(self.frontend, sender, msgid) | 324 | self.send_ack(self.frontend, sender, msgid) |
| 325 | else: | ||
| 326 | logger.error('Received invalid client type on INFORM ({}), ' | ||
| 327 | 'ignoring'.format(client_type)) | ||
| 325 | 328 | ||
| 326 | def on_reply(self, sender, msgid, msg): | 329 | def on_reply(self, sender, msgid, msg): |
| 327 | """ | 330 | """ |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 09f8db2..3d3f856 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -372,6 +372,9 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 372 | 372 | ||
| 373 | schedule_hash = self.schedule_hash(message) | 373 | schedule_hash = self.schedule_hash(message) |
| 374 | 374 | ||
| 375 | # If interval is negative, cron MUST be populated | ||
| 376 | interval_job = interval >= 0 | ||
| 377 | |||
| 375 | # Notify if this is updating existing, or new | 378 | # Notify if this is updating existing, or new |
| 376 | if (schedule_hash in self.cron_jobs or | 379 | if (schedule_hash in self.cron_jobs or |
| 377 | schedule_hash in self.interval_jobs): | 380 | schedule_hash in self.interval_jobs): |
| @@ -381,8 +384,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 381 | logger.debug('Creating a new scheduled job with %s' | 384 | logger.debug('Creating a new scheduled job with %s' |
| 382 | % schedule_hash) | 385 | % schedule_hash) |
| 383 | 386 | ||
| 384 | # If interval is negative, cron MUST be populated | 387 | if interval_job: |
| 385 | if interval >= 0: | ||
| 386 | inter_iter = IntervalIter(monotonic(), interval) | 388 | inter_iter = IntervalIter(monotonic(), interval) |
| 387 | 389 | ||
| 388 | self.interval_jobs[schedule_hash] = [ | 390 | self.interval_jobs[schedule_hash] = [ |
| @@ -430,7 +432,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 430 | if run_count > 0 or run_count == INFINITE_RUN_COUNT: | 432 | if run_count > 0 or run_count == INFINITE_RUN_COUNT: |
| 431 | # Don't allow run_count to decrement below 0 | 433 | # Don't allow run_count to decrement below 0 |
| 432 | if run_count > 0: | 434 | if run_count > 0: |
| 433 | self.interval_jobs[schedule_hash][4] -= 1 | 435 | if interval_job: |
| 436 | self.interval_jobs[schedule_hash][4] -= 1 | ||
| 434 | self.send_request(message[3], queue=queue) | 437 | self.send_request(message[3], queue=queue) |
| 435 | 438 | ||
| 436 | def get_run_count_from_headers(self, headers): | 439 | def get_run_count_from_headers(self, headers): |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index 0986f92..a737f89 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -75,9 +75,14 @@ class TestCase(unittest.TestCase): | |||
| 75 | 75 | ||
| 76 | log_checker.check( | 76 | log_checker.check( |
| 77 | ('eventmq.router', | 77 | ('eventmq.router', |
| 78 | 'INFO', | ||
| 79 | 'Received INFORM request from {} (type: worker)'.format( | ||
| 80 | sender_id)), | ||
| 81 | |||
| 82 | ('eventmq.router', | ||
| 78 | 'ERROR', | 83 | 'ERROR', |
| 79 | 'Recieved INFORM message with no defined queues. Message ' | 84 | "Recieved INFORM message with no defined queues. Message " |
| 80 | 'was: {}'.format(msg)) | 85 | "was: {}".format(msg)), |
| 81 | ) | 86 | ) |
| 82 | 87 | ||
| 83 | def test_on_inform_invalid_queues(self): | 88 | def test_on_inform_invalid_queues(self): |
| @@ -93,9 +98,14 @@ class TestCase(unittest.TestCase): | |||
| 93 | 98 | ||
| 94 | log_checker.check( | 99 | log_checker.check( |
| 95 | ('eventmq.router', | 100 | ('eventmq.router', |
| 101 | 'INFO', | ||
| 102 | 'Received INFORM request from {} (type: worker)'.format( | ||
| 103 | sender_id)), | ||
| 104 | |||
| 105 | ('eventmq.router', | ||
| 96 | 'ERROR', | 106 | 'ERROR', |
| 97 | 'Received invalid queue names in INFORM. names:{} from:{} ' | 107 | 'Received invalid queue names in INFORM. names:{} from:{} ' |
| 98 | 'type:{}'.format(msg[0], sender_id, msg[1])) | 108 | 'type:{}'.format(msg[0], sender_id, msg[1])), |
| 99 | ) | 109 | ) |
| 100 | 110 | ||
| 101 | # @mock.patch('eventmq.router.Router.prioritize_queue_list') | 111 | # @mock.patch('eventmq.router.Router.prioritize_queue_list') |
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py index ab0b51f..9f21354 100644 --- a/eventmq/tests/test_scheduler.py +++ b/eventmq/tests/test_scheduler.py | |||
| @@ -66,6 +66,77 @@ class TestCase(unittest.TestCase): | |||
| 66 | h2 = scheduler.Scheduler.schedule_hash(msg2) | 66 | h2 = scheduler.Scheduler.schedule_hash(msg2) |
| 67 | self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2) | 67 | self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2) |
| 68 | 68 | ||
| 69 | def test_on_schedule(self): | ||
| 70 | override_settings = {} | ||
| 71 | sched = scheduler.Scheduler(override_settings=override_settings) | ||
| 72 | |||
| 73 | job_msg = json.dumps(['run', { | ||
| 74 | 'path': 'test', | ||
| 75 | 'args': [33, 'asdf'], | ||
| 76 | 'kwargs': {'zeta': 'Z', 'alpha': 'α'}, | ||
| 77 | 'class_args': [0], | ||
| 78 | 'class_kwargs': { | ||
| 79 | 'donkey': True, 'apple': False}, | ||
| 80 | 'callable': 'do_the_thing'}]) | ||
| 81 | |||
| 82 | msg = [ | ||
| 83 | 'default', | ||
| 84 | 'run_count:3,guarantee', | ||
| 85 | '3', | ||
| 86 | job_msg, | ||
| 87 | None | ||
| 88 | ] | ||
| 89 | |||
| 90 | cron_msg = [ | ||
| 91 | 'default', | ||
| 92 | 'run_count:3,guarantee', | ||
| 93 | '-1', | ||
| 94 | job_msg, | ||
| 95 | '* * * * *', | ||
| 96 | ] | ||
| 97 | |||
| 98 | sched.on_schedule('fake_msgid', msg) | ||
| 99 | self.assertEqual(1, len(sched.interval_jobs)) | ||
| 100 | self.assertEqual(0, len(sched.cron_jobs)) | ||
| 101 | |||
| 102 | self.assertEqual(1, len(json.loads( | ||
| 103 | sched.get_scheduled_jobs())['interval_jobs'])) | ||
| 104 | self.assertEqual(0, | ||
| 105 | len(json.loads( | ||
| 106 | sched.get_scheduled_jobs())['cron_jobs'])) | ||
| 107 | |||
| 108 | # Scheduling the same job as a cron should remove it from interval | ||
| 109 | sched.on_schedule('fake_msgid2', cron_msg) | ||
| 110 | self.assertEqual(0, len(sched.interval_jobs)) | ||
| 111 | self.assertEqual(1, len(sched.cron_jobs)) | ||
| 112 | |||
| 113 | self.assertEqual(0, len(json.loads( | ||
| 114 | sched.get_scheduled_jobs())['interval_jobs'])) | ||
| 115 | self.assertEqual(1, len(json.loads( | ||
| 116 | sched.get_scheduled_jobs())['cron_jobs'])) | ||
| 117 | |||
| 118 | # Change the job message and it should make new jobs | ||
| 119 | job_msg = json.dumps(['run', { | ||
| 120 | 'path': 'test', | ||
| 121 | 'args': [333, 'asdf'], | ||
| 122 | 'kwargs': {'zeta': 'Z', 'alpha': 'α'}, | ||
| 123 | 'class_args': [0], | ||
| 124 | 'class_kwargs': { | ||
| 125 | 'donkey': True, 'apple': False}, | ||
| 126 | 'callable': 'do_the_thing'}]) | ||
| 127 | |||
| 128 | msg[3] = job_msg | ||
| 129 | |||
| 130 | sched.on_schedule('fake_msgid3', msg) | ||
| 131 | self.assertEqual(1, len(sched.interval_jobs)) | ||
| 132 | self.assertEqual(1, len(sched.cron_jobs)) | ||
| 133 | |||
| 134 | self.assertEqual(1, len(json.loads( | ||
| 135 | sched.get_scheduled_jobs())['interval_jobs'])) | ||
| 136 | self.assertEqual(1, len(json.loads( | ||
| 137 | sched.get_scheduled_jobs())['cron_jobs'])) | ||
| 138 | |||
| 139 | |||
| 69 | # EMQP Tests | 140 | # EMQP Tests |
| 70 | def test_reset(self): | 141 | def test_reset(self): |
| 71 | sched = scheduler.Scheduler() | 142 | sched = scheduler.Scheduler() |