diff options
| -rwxr-xr-x | bin/emq-cli | 4 | ||||
| -rw-r--r-- | eventmq/client/jobs.py | 5 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 13 | ||||
| -rw-r--r-- | eventmq/constants.py | 5 | ||||
| -rw-r--r-- | eventmq/router.py | 38 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 49 | ||||
| -rw-r--r-- | eventmq/settings.py | 10 | ||||
| -rw-r--r-- | eventmq/tests/test_client_jobs.py | 7 | ||||
| -rw-r--r-- | eventmq/tests/test_client_messages.py | 7 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 16 | ||||
| -rw-r--r-- | eventmq/tests/test_scheduler.py | 77 | ||||
| -rw-r--r-- | eventmq/utils/jsonencoders.py | 9 | ||||
| -rw-r--r-- | eventmq/utils/messages.py | 21 |
13 files changed, 217 insertions, 44 deletions
diff --git a/bin/emq-cli b/bin/emq-cli index 333f0f6..d781984 100755 --- a/bin/emq-cli +++ b/bin/emq-cli | |||
| @@ -95,6 +95,10 @@ class Shell(cmd.Cmd): | |||
| 95 | self.send_message('STATUS', ('',)) | 95 | self.send_message('STATUS', ('',)) |
| 96 | pprint(self.recv_reply()) | 96 | pprint(self.recv_reply()) |
| 97 | 97 | ||
| 98 | def do_show_scheduled_jobs(self, line): | ||
| 99 | self.send_message(('STATUS', (STATUS_COMMANDS.show_scheduled_jobs,))) | ||
| 100 | pprint(self.recv_reply()) | ||
| 101 | |||
| 98 | def do_show_managers(self, line): | 102 | def do_show_managers(self, line): |
| 99 | """ | 103 | """ |
| 100 | Request the status of the connected job managers and queues | 104 | Request the status of the connected job managers and queues |
diff --git a/eventmq/client/jobs.py b/eventmq/client/jobs.py index b61c65b..b06370f 100644 --- a/eventmq/client/jobs.py +++ b/eventmq/client/jobs.py | |||
| @@ -86,8 +86,11 @@ class Job(object): | |||
| 86 | socket = Sender() | 86 | socket = Sender() |
| 87 | socket.connect(addr=self.broker_addr) | 87 | socket.connect(addr=self.broker_addr) |
| 88 | 88 | ||
| 89 | msgid = kwargs.get('msgid', None) | ||
| 90 | |||
| 89 | msgid = messages.defer_job( | 91 | msgid = messages.defer_job( |
| 90 | socket, f, args=args, kwargs=kwargs, queue=self.queue) | 92 | socket, f, args=args, kwargs=kwargs, queue=self.queue, |
| 93 | msgid=msgid) | ||
| 91 | 94 | ||
| 92 | return msgid | 95 | return msgid |
| 93 | else: | 96 | else: |
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index 45b323d..7188407 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -111,7 +111,8 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None, | |||
| 111 | def defer_job( | 111 | def defer_job( |
| 112 | socket, func, args=(), kwargs=None, class_args=(), | 112 | socket, func, args=(), kwargs=None, class_args=(), |
| 113 | class_kwargs=None, reply_requested=False, guarantee=False, | 113 | class_kwargs=None, reply_requested=False, guarantee=False, |
| 114 | retry_count=0, timeout=0, queue=conf.DEFAULT_QUEUE_NAME): | 114 | retry_count=0, timeout=0, queue=conf.DEFAULT_QUEUE_NAME, |
| 115 | msgid=None): | ||
| 115 | """ | 116 | """ |
| 116 | Used to send a job to a worker to execute via `socket`. | 117 | Used to send a job to a worker to execute via `socket`. |
| 117 | 118 | ||
| @@ -141,6 +142,7 @@ def defer_job( | |||
| 141 | queue (str): Name of queue to use when executing the job. If this value | 142 | queue (str): Name of queue to use when executing the job. If this value |
| 142 | evaluates to False, the default is used. Default: is configured | 143 | evaluates to False, the default is used. Default: is configured |
| 143 | default queue name | 144 | default queue name |
| 145 | msgid: (str): An optional msgid to use instead of a generated one | ||
| 144 | Raises: | 146 | Raises: |
| 145 | TypeError: When one or more parameters are not JSON serializable. | 147 | TypeError: When one or more parameters are not JSON serializable. |
| 146 | Returns: | 148 | Returns: |
| @@ -191,13 +193,14 @@ def defer_job( | |||
| 191 | guarantee=guarantee, | 193 | guarantee=guarantee, |
| 192 | retry_count=retry_count, | 194 | retry_count=retry_count, |
| 193 | timeout=timeout, | 195 | timeout=timeout, |
| 194 | queue=queue) | 196 | queue=queue, |
| 197 | msgid=msgid) | ||
| 195 | 198 | ||
| 196 | return msgid | 199 | return msgid |
| 197 | 200 | ||
| 198 | 201 | ||
| 199 | def send_request(socket, message, reply_requested=False, guarantee=False, | 202 | def send_request(socket, message, reply_requested=False, guarantee=False, |
| 200 | retry_count=0, timeout=0, queue=None): | 203 | retry_count=0, timeout=0, queue=None, msgid=None): |
| 201 | """ | 204 | """ |
| 202 | Send a REQUEST command. | 205 | Send a REQUEST command. |
| 203 | 206 | ||
| @@ -238,6 +241,7 @@ def send_request(socket, message, reply_requested=False, guarantee=False, | |||
| 238 | default: 0 which means infinite timeout | 241 | default: 0 which means infinite timeout |
| 239 | queue (str): Name of queue to use when executing the job. Default: is | 242 | queue (str): Name of queue to use when executing the job. Default: is |
| 240 | configured default queue name | 243 | configured default queue name |
| 244 | msgid: (str): An optional msgid to use instead of a generated one | ||
| 241 | 245 | ||
| 242 | Returns: | 246 | Returns: |
| 243 | str: ID of the message | 247 | str: ID of the message |
| @@ -259,7 +263,8 @@ def send_request(socket, message, reply_requested=False, guarantee=False, | |||
| 259 | msgid = send_emqp_message(socket, 'REQUEST', | 263 | msgid = send_emqp_message(socket, 'REQUEST', |
| 260 | (queue or conf.DEFAULT_QUEUE_NAME, | 264 | (queue or conf.DEFAULT_QUEUE_NAME, |
| 261 | ",".join(headers), | 265 | ",".join(headers), |
| 262 | serialize(message))) | 266 | serialize(message)), |
| 267 | msgid=msgid) | ||
| 263 | 268 | ||
| 264 | return msgid | 269 | return msgid |
| 265 | 270 | ||
diff --git a/eventmq/constants.py b/eventmq/constants.py index b668f1e..cf2d5d0 100644 --- a/eventmq/constants.py +++ b/eventmq/constants.py | |||
| @@ -27,11 +27,14 @@ class STATUS_COMMANDS(object): | |||
| 27 | """ | 27 | """ |
| 28 | Defines the STATUS sub commands | 28 | Defines the STATUS sub commands |
| 29 | """ | 29 | """ |
| 30 | #: Router subommand to show connected job managbers | 30 | #: Router subcommand to show connected job managbers |
| 31 | show_managers = 'show_managers' | 31 | show_managers = 'show_managers' |
| 32 | #: Router subcommand to show connected schedulers | 32 | #: Router subcommand to show connected schedulers |
| 33 | show_schedulers = 'show_schedulers' | 33 | show_schedulers = 'show_schedulers' |
| 34 | 34 | ||
| 35 | #: Scheduler subcommand to show scheduled jobs | ||
| 36 | show_scheduled_jobs = 'show_scheduled_jobs' | ||
| 37 | |||
| 35 | 38 | ||
| 36 | # ENVIRONMENT VARIABLES | 39 | # ENVIRONMENT VARIABLES |
| 37 | ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' | 40 | ENV_BROKER_ADDR = 'EMQ_CONNECT_ADDR' |
diff --git a/eventmq/router.py b/eventmq/router.py index eceb080..89f92e8 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -24,7 +24,8 @@ import signal | |||
| 24 | 24 | ||
| 25 | from . import constants, exceptions, poller, receiver | 25 | from . import constants, exceptions, poller, receiver |
| 26 | from .constants import ( | 26 | from .constants import ( |
| 27 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, STATUS, STATUS_COMMANDS | 27 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, STATUS, STATUS_CMD, |
| 28 | STATUS_COMMANDS | ||
| 28 | ) | 29 | ) |
| 29 | from .settings import conf, reload_settings | 30 | from .settings import conf, reload_settings |
| 30 | from .utils import tuplify | 31 | from .utils import tuplify |
| @@ -280,30 +281,33 @@ class Router(HeartbeatMixin): | |||
| 280 | queue_names = msg[0] | 281 | queue_names = msg[0] |
| 281 | client_type = msg[1] | 282 | client_type = msg[1] |
| 282 | 283 | ||
| 283 | if not queue_names: # Ideally, this matches some workers | ||
| 284 | logger.error('Recieved INFORM message with no defined ' | ||
| 285 | 'queues. Message was: {}'.format(msg)) | ||
| 286 | return | ||
| 287 | |||
| 288 | try: | ||
| 289 | queues = list(map(tuplify, json.loads(queue_names))) | ||
| 290 | except ValueError: | ||
| 291 | # this was invalid json | ||
| 292 | logger.error( | ||
| 293 | 'Received invalid queue names in INFORM. names:{} from:{} ' | ||
| 294 | 'type:{}'.format( | ||
| 295 | queue_names, sender, client_type)) | ||
| 296 | return | ||
| 297 | |||
| 298 | logger.info('Received INFORM request from {} (type: {})'.format( | 284 | logger.info('Received INFORM request from {} (type: {})'.format( |
| 299 | sender, client_type)) | 285 | sender, client_type)) |
| 300 | 286 | ||
| 301 | if client_type == CLIENT_TYPE.worker: | 287 | if client_type == CLIENT_TYPE.worker: |
| 288 | if not queue_names: # Ideally, this matches some workers | ||
| 289 | logger.error('Recieved INFORM message with no defined ' | ||
| 290 | 'queues. Message was: {}'.format(msg)) | ||
| 291 | return | ||
| 292 | |||
| 293 | try: | ||
| 294 | queues = list(map(tuplify, json.loads(queue_names))) | ||
| 295 | except ValueError: | ||
| 296 | # this was invalid json | ||
| 297 | logger.error( | ||
| 298 | 'Received invalid queue names in INFORM. names:{} from:{} ' | ||
| 299 | 'type:{}'.format( | ||
| 300 | queue_names, sender, client_type)) | ||
| 301 | return | ||
| 302 | |||
| 302 | self.add_worker(sender, queues) | 303 | self.add_worker(sender, queues) |
| 303 | self.send_ack(self.backend, sender, msgid) | 304 | self.send_ack(self.backend, sender, msgid) |
| 304 | elif client_type == CLIENT_TYPE.scheduler: | 305 | elif client_type == CLIENT_TYPE.scheduler: |
| 305 | self.add_scheduler(sender) | 306 | self.add_scheduler(sender) |
| 306 | self.send_ack(self.frontend, sender, msgid) | 307 | self.send_ack(self.frontend, sender, msgid) |
| 308 | else: | ||
| 309 | logger.error('Received invalid client type on INFORM ({}), ' | ||
| 310 | 'ignoring'.format(client_type)) | ||
| 307 | 311 | ||
| 308 | def on_reply(self, sender, msgid, msg): | 312 | def on_reply(self, sender, msgid, msg): |
| 309 | """ | 313 | """ |
| @@ -819,7 +823,7 @@ class Router(HeartbeatMixin): | |||
| 819 | self.send_ack( | 823 | self.send_ack( |
| 820 | self.administrative_socket, msg[0], msg[4]) | 824 | self.administrative_socket, msg[0], msg[4]) |
| 821 | self.on_disconnect(msg[4], msg) | 825 | self.on_disconnect(msg[4], msg) |
| 822 | elif msg[3] == 'STATUS': | 826 | elif msg[3] == STATUS_CMD: |
| 823 | if msg[5] == STATUS_COMMANDS.show_managers: | 827 | if msg[5] == STATUS_COMMANDS.show_managers: |
| 824 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | 828 | sendmsg(self.administrative_socket, msg[0], 'REPLY', |
| 825 | (self.get_workers_status(),)) | 829 | (self.get_workers_status(),)) |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 799ae20..db72d82 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -30,13 +30,16 @@ from six import next | |||
| 30 | 30 | ||
| 31 | from . import constants | 31 | from . import constants |
| 32 | from .client.messages import send_request | 32 | from .client.messages import send_request |
| 33 | from .constants import KBYE | 33 | from .constants import KBYE, STATUS_CMD, STATUS_COMMANDS |
| 34 | from .poller import Poller, POLLIN | 34 | from .poller import Poller, POLLIN |
| 35 | from .receiver import Receiver | ||
| 35 | from .sender import Sender | 36 | from .sender import Sender |
| 36 | from .settings import conf, reload_settings | 37 | from .settings import conf, reload_settings |
| 37 | from .utils.classes import EMQPService, HeartbeatMixin | 38 | from .utils.classes import EMQPService, HeartbeatMixin |
| 38 | from .utils.devices import generate_device_name | 39 | from .utils.devices import generate_device_name |
| 40 | from .utils.jsonencoders import EventMQEncoder | ||
| 39 | from .utils.messages import send_emqp_message as sendmsg | 41 | from .utils.messages import send_emqp_message as sendmsg |
| 42 | from .utils.messages import send_emqp_router_message as send_router_msg | ||
| 40 | from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp | 43 | from .utils.timeutils import IntervalIter, monotonic, seconds_until, timestamp |
| 41 | 44 | ||
| 42 | 45 | ||
| @@ -80,6 +83,12 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 80 | self.frontend = Sender(conf.NAME) | 83 | self.frontend = Sender(conf.NAME) |
| 81 | self._redis_server = None | 84 | self._redis_server = None |
| 82 | 85 | ||
| 86 | admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR | ||
| 87 | |||
| 88 | #: Port for administrative commands | ||
| 89 | self.administrative_socket = Receiver() | ||
| 90 | self.administrative_socket.listen(admin_addr) | ||
| 91 | |||
| 83 | # contains dict of 4-item lists representing cron jobs key of this | 92 | # contains dict of 4-item lists representing cron jobs key of this |
| 84 | # dictionary is a hash of arguments, path, and callable from the | 93 | # dictionary is a hash of arguments, path, and callable from the |
| 85 | # message of the SCHEDULE command received | 94 | # message of the SCHEDULE command received |
| @@ -105,6 +114,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 105 | self.poller = Poller() | 114 | self.poller = Poller() |
| 106 | self.load_jobs() | 115 | self.load_jobs() |
| 107 | 116 | ||
| 117 | self.poller.register(self.administrative_socket, POLLIN) | ||
| 118 | |||
| 108 | self._setup() | 119 | self._setup() |
| 109 | 120 | ||
| 110 | def load_jobs(self): | 121 | def load_jobs(self): |
| @@ -143,6 +154,19 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 143 | m_now = monotonic() | 154 | m_now = monotonic() |
| 144 | events = self.poller.poll() | 155 | events = self.poller.poll() |
| 145 | 156 | ||
| 157 | if events.get(self.administrative_socket) == POLLIN: | ||
| 158 | msg = self.administrative_socket.recv_multipart() | ||
| 159 | # ############## | ||
| 160 | # Admin Commands | ||
| 161 | # ############## | ||
| 162 | if len(msg) > 4: | ||
| 163 | if msg[3] == STATUS_CMD: | ||
| 164 | if msg[5] == STATUS_COMMANDS.show_scheduled_jobs: | ||
| 165 | send_router_msg(self.administrative_socket, | ||
| 166 | msg[0], | ||
| 167 | 'REPLY', | ||
| 168 | (self.get_scheduled_jobs(),)) | ||
| 169 | |||
| 146 | if events.get(self.frontend) == POLLIN: | 170 | if events.get(self.frontend) == POLLIN: |
| 147 | msg = self.frontend.recv_multipart() | 171 | msg = self.frontend.recv_multipart() |
| 148 | self.process_message(msg) | 172 | self.process_message(msg) |
| @@ -378,6 +402,9 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 378 | 402 | ||
| 379 | schedule_hash = self.schedule_hash(message) | 403 | schedule_hash = self.schedule_hash(message) |
| 380 | 404 | ||
| 405 | # If interval is negative, cron MUST be populated | ||
| 406 | interval_job = interval >= 0 | ||
| 407 | |||
| 381 | # Notify if this is updating existing, or new | 408 | # Notify if this is updating existing, or new |
| 382 | if (schedule_hash in self.cron_jobs or | 409 | if (schedule_hash in self.cron_jobs or |
| 383 | schedule_hash in self.interval_jobs): | 410 | schedule_hash in self.interval_jobs): |
| @@ -387,8 +414,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 387 | logger.debug('Creating a new scheduled job with %s' | 414 | logger.debug('Creating a new scheduled job with %s' |
| 388 | % schedule_hash) | 415 | % schedule_hash) |
| 389 | 416 | ||
| 390 | # If interval is negative, cron MUST be populated | 417 | if interval_job: |
| 391 | if interval >= 0: | ||
| 392 | inter_iter = IntervalIter(monotonic(), interval) | 418 | inter_iter = IntervalIter(monotonic(), interval) |
| 393 | 419 | ||
| 394 | self.interval_jobs[schedule_hash] = [ | 420 | self.interval_jobs[schedule_hash] = [ |
| @@ -436,7 +462,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 436 | if run_count > 0 or run_count == INFINITE_RUN_COUNT: | 462 | if run_count > 0 or run_count == INFINITE_RUN_COUNT: |
| 437 | # Don't allow run_count to decrement below 0 | 463 | # Don't allow run_count to decrement below 0 |
| 438 | if run_count > 0: | 464 | if run_count > 0: |
| 439 | self.interval_jobs[schedule_hash][4] -= 1 | 465 | if interval_job: |
| 466 | self.interval_jobs[schedule_hash][4] -= 1 | ||
| 440 | self.send_request(message[3], queue=queue) | 467 | self.send_request(message[3], queue=queue) |
| 441 | 468 | ||
| 442 | def get_run_count_from_headers(self, headers): | 469 | def get_run_count_from_headers(self, headers): |
| @@ -446,11 +473,25 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 446 | run_count = int(header.split(':')[1]) | 473 | run_count = int(header.split(':')[1]) |
| 447 | return run_count | 474 | return run_count |
| 448 | 475 | ||
| 476 | def on_status(self, msgid, message): | ||
| 477 | |||
| 478 | sendmsg(self.frontend, message[0], 'REPLY', (self.interval_jobs, )) | ||
| 479 | |||
| 449 | def on_heartbeat(self, msgid, message): | 480 | def on_heartbeat(self, msgid, message): |
| 450 | """ | 481 | """ |
| 451 | Noop command. The logic for heartbeating is in the event loop. | 482 | Noop command. The logic for heartbeating is in the event loop. |
| 452 | """ | 483 | """ |
| 453 | 484 | ||
| 485 | def get_scheduled_jobs(self): | ||
| 486 | |||
| 487 | return json.dumps( | ||
| 488 | { | ||
| 489 | 'interval_jobs': self.interval_jobs, | ||
| 490 | 'cron_jobs': self.cron_jobs, | ||
| 491 | 'name': self.name, | ||
| 492 | }, | ||
| 493 | cls=EventMQEncoder) | ||
| 494 | |||
| 454 | @classmethod | 495 | @classmethod |
| 455 | def schedule_hash(cls, message): | 496 | def schedule_hash(cls, message): |
| 456 | """ | 497 | """ |
diff --git a/eventmq/settings.py b/eventmq/settings.py index a25e953..826b3a1 100644 --- a/eventmq/settings.py +++ b/eventmq/settings.py | |||
| @@ -270,7 +270,15 @@ _CONFIG_DEFS = { | |||
| 270 | 'long-arg': '--redis-password', | 270 | 'long-arg': '--redis-password', |
| 271 | 'type': str, | 271 | 'type': str, |
| 272 | 'help': 'Password to redis' | 272 | 'help': 'Password to redis' |
| 273 | } | 273 | }, |
| 274 | 'ADMINISTRATIVE_LISTEN_ADDR': { | ||
| 275 | 'default': 'tcp://127.0.0.1:47294', | ||
| 276 | 'long-arg': '--administrative-listen-addr', | ||
| 277 | 'short-arg': '-S', | ||
| 278 | 'type': str, | ||
| 279 | 'help': 'Address to listen for administrative commands for ' | ||
| 280 | 'schedulers', | ||
| 281 | }, | ||
| 274 | }, | 282 | }, |
| 275 | 'publisher': { | 283 | 'publisher': { |
| 276 | 'FRONTEND_LISTEN_ADDR': { | 284 | 'FRONTEND_LISTEN_ADDR': { |
diff --git a/eventmq/tests/test_client_jobs.py b/eventmq/tests/test_client_jobs.py index c658187..c66875a 100644 --- a/eventmq/tests/test_client_jobs.py +++ b/eventmq/tests/test_client_jobs.py | |||
| @@ -34,13 +34,13 @@ class TestCase(unittest.TestCase): | |||
| 34 | self.assertEqual(d(), 12) | 34 | self.assertEqual(d(), 12) |
| 35 | self.assertEqual(d.delay(), '43e14eaa-2034-4c84-8fe7-5577c70b6a7c') | 35 | self.assertEqual(d.delay(), '43e14eaa-2034-4c84-8fe7-5577c70b6a7c') |
| 36 | defer_job_mock.assert_called_with(Sender_mock(), test_func, args=(), | 36 | defer_job_mock.assert_called_with(Sender_mock(), test_func, args=(), |
| 37 | kwargs={}, queue=None) | 37 | msgid=None, kwargs={}, queue=None) |
| 38 | 38 | ||
| 39 | d = jobs.job(test_func, broker_addr=self.BROKER_ADDR, queue='mojo') | 39 | d = jobs.job(test_func, broker_addr=self.BROKER_ADDR, queue='mojo') |
| 40 | d.delay(1, 2, three=3) | 40 | d.delay(1, 2, three=3) |
| 41 | defer_job_mock.assert_called_with(Sender_mock(), test_func, | 41 | defer_job_mock.assert_called_with(Sender_mock(), test_func, |
| 42 | args=(1, 2), kwargs={'three': 3}, | 42 | args=(1, 2), kwargs={'three': 3}, |
| 43 | queue='mojo') | 43 | msgid=None, queue='mojo') |
| 44 | 44 | ||
| 45 | @mock.patch('eventmq.client.messages.defer_job') | 45 | @mock.patch('eventmq.client.messages.defer_job') |
| 46 | @mock.patch('eventmq.client.jobs.Sender') | 46 | @mock.patch('eventmq.client.jobs.Sender') |
| @@ -85,7 +85,8 @@ class TestCase(unittest.TestCase): | |||
| 85 | sched_mock.assert_called_with( | 85 | sched_mock.assert_called_with( |
| 86 | Sender_mock(), test_func, args=(), class_args=(), | 86 | Sender_mock(), test_func, args=(), class_args=(), |
| 87 | class_kwargs=None, cron=None, headers=('guarantee',), | 87 | class_kwargs=None, cron=None, headers=('guarantee',), |
| 88 | interval_secs=None, kwargs=None, queue='default', unschedule=True) | 88 | interval_secs=None, kwargs=None, queue='default', |
| 89 | unschedule=True) | ||
| 89 | 90 | ||
| 90 | 91 | ||
| 91 | def test_func(): | 92 | def test_func(): |
diff --git a/eventmq/tests/test_client_messages.py b/eventmq/tests/test_client_messages.py index 68b847e..8939e9b 100644 --- a/eventmq/tests/test_client_messages.py +++ b/eventmq/tests/test_client_messages.py | |||
| @@ -86,6 +86,7 @@ class TestCase(unittest.TestCase): | |||
| 86 | guarantee=False, | 86 | guarantee=False, |
| 87 | retry_count=3, | 87 | retry_count=3, |
| 88 | timeout=0, | 88 | timeout=0, |
| 89 | msgid=None, | ||
| 89 | queue='test_queue') | 90 | queue='test_queue') |
| 90 | 91 | ||
| 91 | with LogCapture() as log_checker: | 92 | with LogCapture() as log_checker: |
| @@ -259,7 +260,8 @@ class TestCase(unittest.TestCase): | |||
| 259 | socket, 'REQUEST', | 260 | socket, 'REQUEST', |
| 260 | ('mozo', | 261 | ('mozo', |
| 261 | 'reply-requested,retry-count:2', | 262 | 'reply-requested,retry-count:2', |
| 262 | messages.serialize(msg))) | 263 | messages.serialize(msg)), |
| 264 | msgid=None) | ||
| 263 | 265 | ||
| 264 | @mock.patch('eventmq.client.messages.send_emqp_message') | 266 | @mock.patch('eventmq.client.messages.send_emqp_message') |
| 265 | def test_send_request_all_headers(self, snd_empq_msg_mock): | 267 | def test_send_request_all_headers(self, snd_empq_msg_mock): |
| @@ -282,7 +284,8 @@ class TestCase(unittest.TestCase): | |||
| 282 | socket, 'REQUEST', | 284 | socket, 'REQUEST', |
| 283 | ('default', | 285 | ('default', |
| 284 | 'reply-requested,guarantee,retry-count:2,timeout:3', | 286 | 'reply-requested,guarantee,retry-count:2,timeout:3', |
| 285 | messages.serialize(msg))) | 287 | messages.serialize(msg)), |
| 288 | msgid=None) | ||
| 286 | 289 | ||
| 287 | @mock.patch('eventmq.client.messages.send_emqp_message') | 290 | @mock.patch('eventmq.client.messages.send_emqp_message') |
| 288 | def test_send_schedule_request(self, snd_empq_msg_mock): | 291 | def test_send_schedule_request(self, snd_empq_msg_mock): |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index 0986f92..a737f89 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -75,9 +75,14 @@ class TestCase(unittest.TestCase): | |||
| 75 | 75 | ||
| 76 | log_checker.check( | 76 | log_checker.check( |
| 77 | ('eventmq.router', | 77 | ('eventmq.router', |
| 78 | 'INFO', | ||
| 79 | 'Received INFORM request from {} (type: worker)'.format( | ||
| 80 | sender_id)), | ||
| 81 | |||
| 82 | ('eventmq.router', | ||
| 78 | 'ERROR', | 83 | 'ERROR', |
| 79 | 'Recieved INFORM message with no defined queues. Message ' | 84 | "Recieved INFORM message with no defined queues. Message " |
| 80 | 'was: {}'.format(msg)) | 85 | "was: {}".format(msg)), |
| 81 | ) | 86 | ) |
| 82 | 87 | ||
| 83 | def test_on_inform_invalid_queues(self): | 88 | def test_on_inform_invalid_queues(self): |
| @@ -93,9 +98,14 @@ class TestCase(unittest.TestCase): | |||
| 93 | 98 | ||
| 94 | log_checker.check( | 99 | log_checker.check( |
| 95 | ('eventmq.router', | 100 | ('eventmq.router', |
| 101 | 'INFO', | ||
| 102 | 'Received INFORM request from {} (type: worker)'.format( | ||
| 103 | sender_id)), | ||
| 104 | |||
| 105 | ('eventmq.router', | ||
| 96 | 'ERROR', | 106 | 'ERROR', |
| 97 | 'Received invalid queue names in INFORM. names:{} from:{} ' | 107 | 'Received invalid queue names in INFORM. names:{} from:{} ' |
| 98 | 'type:{}'.format(msg[0], sender_id, msg[1])) | 108 | 'type:{}'.format(msg[0], sender_id, msg[1])), |
| 99 | ) | 109 | ) |
| 100 | 110 | ||
| 101 | # @mock.patch('eventmq.router.Router.prioritize_queue_list') | 111 | # @mock.patch('eventmq.router.Router.prioritize_queue_list') |
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py index dc96b09..82b459d 100644 --- a/eventmq/tests/test_scheduler.py +++ b/eventmq/tests/test_scheduler.py | |||
| @@ -18,7 +18,7 @@ import unittest | |||
| 18 | 18 | ||
| 19 | import mock | 19 | import mock |
| 20 | 20 | ||
| 21 | from .. import constants, scheduler | 21 | from .. import constants, scheduler, utils |
| 22 | 22 | ||
| 23 | 23 | ||
| 24 | class TestCase(unittest.TestCase): | 24 | class TestCase(unittest.TestCase): |
| @@ -70,6 +70,81 @@ class TestCase(unittest.TestCase): | |||
| 70 | h2 = scheduler.Scheduler.schedule_hash(msg2) | 70 | h2 = scheduler.Scheduler.schedule_hash(msg2) |
| 71 | self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2) | 71 | self.assertEqual('4658982cab9d32bf1ef9113a9d8bdec01775e2bc', h2) |
| 72 | 72 | ||
| 73 | @mock.patch.object(utils.classes.ZMQSendMixin, 'send_multipart') | ||
| 74 | def test_on_schedule(self, send_mock): | ||
| 75 | override_settings = {} | ||
| 76 | sched = scheduler.Scheduler(override_settings=override_settings) | ||
| 77 | |||
| 78 | job_msg = json.dumps(['run', { | ||
| 79 | 'path': 'test', | ||
| 80 | 'args': [33, 'asdf'], | ||
| 81 | 'kwargs': {'zeta': 'Z', 'alpha': 'α'}, | ||
| 82 | 'class_args': [0], | ||
| 83 | 'class_kwargs': { | ||
| 84 | 'donkey': True, 'apple': False}, | ||
| 85 | 'callable': 'do_the_thing'}]) | ||
| 86 | |||
| 87 | msg = [ | ||
| 88 | 'default', | ||
| 89 | 'run_count:3,guarantee', | ||
| 90 | '3', | ||
| 91 | job_msg, | ||
| 92 | None | ||
| 93 | ] | ||
| 94 | |||
| 95 | cron_msg = [ | ||
| 96 | 'default', | ||
| 97 | 'run_count:3,guarantee', | ||
| 98 | '-1', | ||
| 99 | job_msg, | ||
| 100 | '* * * * *', | ||
| 101 | ] | ||
| 102 | |||
| 103 | sched.on_schedule('fake_msgid', msg) | ||
| 104 | self.assertEqual(1, len(sched.interval_jobs)) | ||
| 105 | self.assertEqual(0, len(sched.cron_jobs)) | ||
| 106 | |||
| 107 | self.assertEqual(1, len(json.loads( | ||
| 108 | sched.get_scheduled_jobs())['interval_jobs'])) | ||
| 109 | self.assertEqual(0, | ||
| 110 | len(json.loads( | ||
| 111 | sched.get_scheduled_jobs())['cron_jobs'])) | ||
| 112 | |||
| 113 | # Scheduling the same job as a cron should remove it from interval | ||
| 114 | sched.on_schedule('fake_msgid2', cron_msg) | ||
| 115 | self.assertEqual(0, len(sched.interval_jobs)) | ||
| 116 | self.assertEqual(1, len(sched.cron_jobs)) | ||
| 117 | |||
| 118 | self.assertEqual(0, len(json.loads( | ||
| 119 | sched.get_scheduled_jobs())['interval_jobs'])) | ||
| 120 | self.assertEqual(1, len(json.loads( | ||
| 121 | sched.get_scheduled_jobs())['cron_jobs'])) | ||
| 122 | |||
| 123 | # Change the job message and it should make new jobs | ||
| 124 | job_msg = json.dumps(['run', { | ||
| 125 | 'path': 'test', | ||
| 126 | 'args': [333, 'asdf'], | ||
| 127 | 'kwargs': {'zeta': 'Z', 'alpha': 'α'}, | ||
| 128 | 'class_args': [0], | ||
| 129 | 'class_kwargs': { | ||
| 130 | 'donkey': True, 'apple': False}, | ||
| 131 | 'callable': 'do_the_thing'}]) | ||
| 132 | |||
| 133 | msg[3] = job_msg | ||
| 134 | |||
| 135 | sched.on_schedule('fake_msgid3', msg) | ||
| 136 | self.assertEqual(1, len(sched.interval_jobs)) | ||
| 137 | self.assertEqual(1, len(sched.cron_jobs)) | ||
| 138 | |||
| 139 | self.assertEqual(1, len(json.loads( | ||
| 140 | sched.get_scheduled_jobs())['interval_jobs'])) | ||
| 141 | self.assertEqual(1, len(json.loads( | ||
| 142 | sched.get_scheduled_jobs())['cron_jobs'])) | ||
| 143 | |||
| 144 | # Make sure the scheduler obeyed 3 schedule commands with 'haste' | ||
| 145 | self.assertEqual(3, send_mock.call_count) | ||
| 146 | |||
| 147 | |||
| 73 | # EMQP Tests | 148 | # EMQP Tests |
| 74 | def test_reset(self): | 149 | def test_reset(self): |
| 75 | sched = scheduler.Scheduler() | 150 | sched = scheduler.Scheduler() |
diff --git a/eventmq/utils/jsonencoders.py b/eventmq/utils/jsonencoders.py new file mode 100644 index 0000000..a4d48a1 --- /dev/null +++ b/eventmq/utils/jsonencoders.py | |||
| @@ -0,0 +1,9 @@ | |||
| 1 | import json | ||
| 2 | |||
| 3 | from ..utils.timeutils import IntervalIter | ||
| 4 | |||
| 5 | |||
| 6 | class EventMQEncoder(json.JSONEncoder): | ||
| 7 | def default(self, o): | ||
| 8 | if isinstance(o, IntervalIter): | ||
| 9 | return o.__dict__ | ||
diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py index 656d474..b664594 100644 --- a/eventmq/utils/messages.py +++ b/eventmq/utils/messages.py | |||
| @@ -89,20 +89,23 @@ def generate_msgid(prefix=None): | |||
| 89 | return id if not prefix else str(prefix) + id | 89 | return id if not prefix else str(prefix) + id |
| 90 | 90 | ||
| 91 | 91 | ||
| 92 | def send_emqp_message(socket, command, message=None): | 92 | def send_emqp_message(socket, command, message=None, msgid=None): |
| 93 | """ | 93 | """ |
| 94 | Formats and sends an eMQP message | 94 | Formats and sends an eMQP message |
| 95 | 95 | ||
| 96 | Args: | 96 | Args: |
| 97 | socket | 97 | socket (zmq.socket) Socket to send on |
| 98 | command | 98 | command (str) Command to send |
| 99 | message | 99 | msgid: (str): An optional msgid to use instead of a generated one |
| 100 | message (tuple, str) Message to send | ||
| 100 | Raises: | 101 | Raises: |
| 101 | 102 | ||
| 102 | Returns: | 103 | Returns: |
| 103 | str: Message id for this message | 104 | str: Message id for this message |
| 104 | """ | 105 | """ |
| 105 | msgid = generate_msgid() | 106 | if not msgid: |
| 107 | msgid = generate_msgid() | ||
| 108 | |||
| 106 | msg = (str(command).upper(), msgid) | 109 | msg = (str(command).upper(), msgid) |
| 107 | if message and isinstance(message, (tuple, list)): | 110 | if message and isinstance(message, (tuple, list)): |
| 108 | msg += tuple(message) | 111 | msg += tuple(message) |
| @@ -114,7 +117,8 @@ def send_emqp_message(socket, command, message=None): | |||
| 114 | return msgid | 117 | return msgid |
| 115 | 118 | ||
| 116 | 119 | ||
| 117 | def send_emqp_router_message(socket, recipient_id, command, message=None): | 120 | def send_emqp_router_message(socket, recipient_id, command, |
| 121 | message=None, msgid=None): | ||
| 118 | """ | 122 | """ |
| 119 | Formats and sends an eMQP message taking into account the recipient frame | 123 | Formats and sends an eMQP message taking into account the recipient frame |
| 120 | used by a :attr:`zmq.ROUTER` device. | 124 | used by a :attr:`zmq.ROUTER` device. |
| @@ -123,12 +127,15 @@ def send_emqp_router_message(socket, recipient_id, command, message=None): | |||
| 123 | socket: socket to send the message with | 127 | socket: socket to send the message with |
| 124 | recipient_id (str): the id of the connected device to reply to | 128 | recipient_id (str): the id of the connected device to reply to |
| 125 | command (str): the eMQP command to send | 129 | command (str): the eMQP command to send |
| 130 | msgid: (str): An optional msgid to use instead of a generated one | ||
| 126 | message: a msg tuple to send | 131 | message: a msg tuple to send |
| 127 | 132 | ||
| 128 | Returns | 133 | Returns |
| 129 | str: Message id for this message | 134 | str: Message id for this message |
| 130 | """ | 135 | """ |
| 131 | msgid = generate_msgid() | 136 | if not msgid: |
| 137 | msgid = generate_msgid() | ||
| 138 | |||
| 132 | msg = (str(command).upper(), msgid) | 139 | msg = (str(command).upper(), msgid) |
| 133 | if message and isinstance(message, (tuple, list)): | 140 | if message and isinstance(message, (tuple, list)): |
| 134 | msg += message | 141 | msg += message |