diff options
| author | jason | 2016-05-24 16:49:54 -0600 |
|---|---|---|
| committer | jason | 2016-05-24 16:49:54 -0600 |
| commit | a592ba06ea019bbb0051ffe5fc006d98560d038a (patch) | |
| tree | 588590a6423a19c59aa2cac9f57b7025f4f5f44b | |
| parent | 2c599d965493b3658ffadc0c3e9e1b5a209cf1ba (diff) | |
| download | eventmq-a592ba06ea019bbb0051ffe5fc006d98560d038a.tar.gz eventmq-a592ba06ea019bbb0051ffe5fc006d98560d038a.zip | |
More work for named queues
- Renamed the `WORKERS` setting to `CONCURRENT_JOBS` for clarity. Added the
setting to the command-line options, the INI config file, and the default
settings in conf.py.
- Added support for JSON-style arrays in the INI config.
- Added support for weighted named queues. The setting takes the form
[[weight, "name"], [weight, "name"]] (see the parsing sketch below) and is
configurable for the job manager via both the INI file and the command line.
Added documentation.
- Updated the spec for the INFORM message. Weights are now sent along with the
queue names; if no weight is specified, a queue is given the default value of 0.
- Updated Router.queues from a deque to a list so that it can be sorted by
priority more easily.
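
The weighted-queue value is a JSON-style array of [weight, "name"] pairs. As a rough, illustrative sketch (the helper name and sample queue names below are made up, not part of eventmq), such a value can be parsed into the (weight, name) tuples the router and job manager work with like this:

```python
import json

def parse_weighted_queues(raw):
    """Turn a JSON-style '[[weight, "name"], ...]' string into (weight, name)
    tuples, heaviest (highest-priority) queue first."""
    pairs = [tuple(pair) for pair in json.loads(raw)]
    return sorted(pairs, key=lambda pair: pair[0], reverse=True)

# Example value in the documented format
raw_setting = '[[50, "google"], [40, "pushes"], [10, "default"]]'
print(parse_weighted_queues(raw_setting))
# [(50, 'google'), (40, 'pushes'), (10, 'default')]
```
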
| -rwxr-xr-x | bin/emq-jobmanager | 18 | ||||
| -rw-r--r-- | docs/conf.py | 7 | ||||
| -rw-r--r-- | docs/protocol.rst | 13 | ||||
| -rw-r--r-- | docs/settings_file.rst | 34 | ||||
| -rw-r--r-- | etc/eventmq.docker.conf | 13 | ||||
| -rw-r--r-- | eventmq/conf.py | 42 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 20 | ||||
| -rw-r--r-- | eventmq/router.py | 93 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 11 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 336 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 70 | ||||
| -rw-r--r-- | eventmq/utils/__init__.py | 12 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 28 | ||||
| -rw-r--r-- | eventmq/utils/settings.py | 13 | ||||
| -rw-r--r-- | eventmq/worker.py | 2 |
15 files changed, 466 insertions, 246 deletions
diff --git a/bin/emq-jobmanager b/bin/emq-jobmanager index 82c2e9b..4071ada 100755 --- a/bin/emq-jobmanager +++ b/bin/emq-jobmanager | |||
| @@ -21,12 +21,19 @@ from eventmq.jobmanager import JobManager | |||
| 21 | from eventmq import conf | 21 | from eventmq import conf |
| 22 | 22 | ||
| 23 | if __name__ == "__main__": | 23 | if __name__ == "__main__": |
| 24 | parser = argparse.ArgumentParser(description='Listen for requests') | 24 | parser = argparse.ArgumentParser(description='Listen for job requests and ' |
| 25 | parser.add_argument('--config', '-c', type=str, nargs='?', | 25 | 'manage their execution') |
| 26 | parser.add_argument('--broker-addr', '-B', type=str, nargs='?', | ||
| 27 | help='manually specify the broker address to connect ' | ||
| 28 | 'to in order to receive jobs') | ||
| 29 | parser.add_argument('--config', '-C', type=str, nargs='?', | ||
| 26 | help='manually specify the location of eventmq.conf') | 30 | help='manually specify the location of eventmq.conf') |
| 27 | parser.add_argument('--queues', '-Q', type=str, nargs='+', | 31 | parser.add_argument('--queues', '-Q', type=str, nargs='+', |
| 28 | help='space separated list of queue names to listen ' | 32 | help='space separated list of queue names to listen ' |
| 29 | 'on') | 33 | 'on') |
| 34 | parser.add_argument('--jobs', '-J', type=int, nargs='?', | ||
| 35 | help='the max number of concurrent jobs to manage at ' | ||
| 36 | 'a time') | ||
| 30 | 37 | ||
| 31 | args = parser.parse_args() | 38 | args = parser.parse_args() |
| 32 | 39 | ||
| @@ -39,5 +46,8 @@ if __name__ == "__main__": | |||
| 39 | if queues: | 46 | if queues: |
| 40 | queues = ','.join(queues) | 47 | queues = ','.join(queues) |
| 41 | 48 | ||
| 42 | j = JobManager(queues=queues) | 49 | broker_addr = args.broker_addr |
| 43 | j.jobmanager_main() | 50 | concurrent_jobs = args.jobs |
| 51 | |||
| 52 | j = JobManager(queues=queues, concurrent_jobs=concurrent_jobs) | ||
| 53 | j.jobmanager_main(broker_addr=broker_addr) | ||
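
Taken together, the new options let a job manager be launched with an explicit broker address, queue list, and concurrency cap, e.g. `emq-jobmanager -B tcp://127.0.0.1:47290 -Q default email -J 8 -C /etc/eventmq.conf` (the address, queue names, and config path here are placeholders, not values from this commit).
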
diff --git a/docs/conf.py b/docs/conf.py index 864bbcf..aa23956 100644 --- a/docs/conf.py +++ b/docs/conf.py | |||
| @@ -16,6 +16,7 @@ import sys | |||
| 16 | import os | 16 | import os |
| 17 | import shlex | 17 | import shlex |
| 18 | 18 | ||
| 19 | import eventmq # for __version__ | ||
| 19 | # If extensions (or modules to document with autodoc) are in another directory, | 20 | # If extensions (or modules to document with autodoc) are in another directory, |
| 20 | # add these directories to sys.path here. If the directory is relative to the | 21 | # add these directories to sys.path here. If the directory is relative to the |
| 21 | # documentation root, use os.path.abspath to make it absolute, like shown here. | 22 | # documentation root, use os.path.abspath to make it absolute, like shown here. |
| @@ -52,15 +53,15 @@ master_doc = 'index' | |||
| 52 | 53 | ||
| 53 | # General information about the project. | 54 | # General information about the project. |
| 54 | project = u'EventMQ' | 55 | project = u'EventMQ' |
| 55 | copyright = u'2015, eventboard.io' | 56 | copyright = u'2016, eventboard.io' |
| 56 | author = u'eventboard.io' | 57 | author = u'EventMQ Contributors' |
| 57 | 58 | ||
| 58 | # The version info for the project you're documenting, acts as replacement for | 59 | # The version info for the project you're documenting, acts as replacement for |
| 59 | # |version| and |release|, also used in various other places throughout the | 60 | # |version| and |release|, also used in various other places throughout the |
| 60 | # built documents. | 61 | # built documents. |
| 61 | # | 62 | # |
| 62 | # The short X.Y version. | 63 | # The short X.Y version. |
| 63 | version = '0' | 64 | version = eventmq.__version__ |
| 64 | # The full version, including alpha/beta/rc tags. | 65 | # The full version, including alpha/beta/rc tags. |
| 65 | release = '0' | 66 | release = '0' |
| 66 | 67 | ||
diff --git a/docs/protocol.rst b/docs/protocol.rst index 4c2aff0..c341403 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -71,7 +71,7 @@ FRAME Value Description | |||
| 71 | 1 eMQP/1.0 Protocol version | 71 | 1 eMQP/1.0 Protocol version |
| 72 | 2 REQUEST command | 72 | 2 REQUEST command |
| 73 | 3 _MSGID_ A unique id for the msg | 73 | 3 _MSGID_ A unique id for the msg |
| 74 | 4 _QUEUE_NAME_ the name of the queue the worker belongs to | 74 | 4 _QUEUE_NAME_ the name of the queue the request should be sent to |
| 75 | 5 _HEADERS_ dictionary of headers. can be an empty set | 75 | 5 _HEADERS_ dictionary of headers. can be an empty set |
| 76 | 6 _MSG_ The message to send | 76 | 6 _MSG_ The message to send |
| 77 | ====== ============== =========== | 77 | ====== ============== =========== |
| @@ -85,7 +85,7 @@ FRAME Value Description | |||
| 85 | 1 eMQP/1.0 Protocol version | 85 | 1 eMQP/1.0 Protocol version |
| 86 | 2 PUBLISH command | 86 | 2 PUBLISH command |
| 87 | 3 _MSGID_ A unique id for the msg | 87 | 3 _MSGID_ A unique id for the msg |
| 88 | 4 _TOPIC_NAME_ the name of the queue the worker belongs to | 88 | 4 _TOPIC_NAME_ the name of the topic this message should be published across |
| 89 | 5 _HEADERS_ csv list of headers | 89 | 5 _HEADERS_ csv list of headers |
| 90 | 6 _MSG_ The message to send | 90 | 6 _MSG_ The message to send |
| 91 | ====== ============== =========== | 91 | ====== ============== =========== |
| @@ -99,7 +99,7 @@ FRAME Value Description | |||
| 99 | 1 eMQP/1.0 Protocol version | 99 | 1 eMQP/1.0 Protocol version |
| 100 | 2 SCHEDULE command | 100 | 2 SCHEDULE command |
| 101 | 3 _MSGID_ A unique id for the msg | 101 | 3 _MSGID_ A unique id for the msg |
| 102 | 4 _TOPIC_NAME_ name of queue that the job should run in | 102 | 4 _QUEUE_NAME_ name of queue that the job should run in |
| 103 | 5 _HEADERS_ csv list of headers for this message | 103 | 5 _HEADERS_ csv list of headers for this message |
| 104 | 6 _MSG_ The message to send | 104 | 6 _MSG_ The message to send |
| 105 | ====== ============== =========== | 105 | ====== ============== =========== |
| @@ -113,7 +113,7 @@ FRAME Value Description | |||
| 113 | 1 eMQP/1.0 Protocol version | 113 | 1 eMQP/1.0 Protocol version |
| 114 | 2 UNSCHEDULE command | 114 | 2 UNSCHEDULE command |
| 115 | 3 _MSGID_ A unique id for the msg | 115 | 3 _MSGID_ A unique id for the msg |
| 116 | 4 _TOPIC_NAME_ ignored for this command, broadcasted to all queues | 116 | 4 _QUEUE_NAME_ ignored for this command, broadcasted to all queues |
| 117 | 5 _HEADERS_ csv list of headers for this message | 117 | 5 _HEADERS_ csv list of headers for this message |
| 118 | 6 _MSG_ The message to send | 118 | 6 _MSG_ The message to send |
| 119 | ====== ============== =========== | 119 | ====== ============== =========== |
| @@ -129,7 +129,7 @@ FRAME Value Description | |||
| 129 | 1 eMQP/1.0 Protocol version | 129 | 1 eMQP/1.0 Protocol version |
| 130 | 2 INFORM command | 130 | 2 INFORM command |
| 131 | 3 _MSGID_ A unique id for the msg | 131 | 3 _MSGID_ A unique id for the msg |
| 132 | 4 _QUEUE_NAME_ csv seperated names of queue the worker belongs to | 132 | 4 Queues. Unused for scheduler |
| 133 | 5 scheduler type of peer connecting | 133 | 5 scheduler type of peer connecting |
| 134 | ====== ============== =========== | 134 | ====== ============== =========== |
| 135 | 135 | ||
| @@ -144,7 +144,7 @@ FRAME Value Description | |||
| 144 | 1 eMQP/1.0 Protocol version | 144 | 1 eMQP/1.0 Protocol version |
| 145 | 2 INFORM command | 145 | 2 INFORM command |
| 146 | 3 _MSGID_ A unique id for the msg | 146 | 3 _MSGID_ A unique id for the msg |
| 147 | 4 _QUEUE_NAME_ csv seperated names of queue the worker belongs to. | 147 | 4 _QUEUES_ csv separated arrays containing an int and a string for weight and name. e.g. [40, 'email'] |
| 148 | 5 worker type of peer connecting | 148 | 5 worker type of peer connecting |
| 149 | ====== ============== =========== | 149 | ====== ============== =========== |
| 150 | 150 | ||
| @@ -203,7 +203,6 @@ Heartbeating | |||
| 203 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. | 203 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. |
| 204 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. | 204 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. |
| 205 | * If the scheduler detects that the broker disconnects it SHOULD restart the conversation. | 205 | * If the scheduler detects that the broker disconnects it SHOULD restart the conversation. |
| 206 | * If the broker detects that a scheduler has disconnected it should ??????????. | ||
| 207 | 206 | ||
| 208 | REQUEST Headers | 207 | REQUEST Headers |
| 209 | --------------- | 208 | --------------- |
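
For illustration, a worker-type INFORM message under the updated spec could carry frames like the following. This is only a sketch of the frame contents from the INFORM table above; the message id and queue weights are invented, and real framing is handled by eventmq's messaging utilities.

```python
import json

# Hypothetical INFORM frames for a worker serving two weighted queues.
queues = [[40, "email"], [10, "default"]]
inform_frames = [
    "eMQP/1.0",          # protocol version
    "INFORM",            # command
    "msg-31a7",          # unique message id (made up)
    json.dumps(queues),  # '[[40, "email"], [10, "default"]]'
    "worker",            # type of peer connecting
]
print(inform_frames)
```
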
diff --git a/docs/settings_file.rst b/docs/settings_file.rst index 07d2338..b0bc36c 100644 --- a/docs/settings_file.rst +++ b/docs/settings_file.rst | |||
| @@ -15,16 +15,34 @@ Scheduler | |||
| 15 | Job Manager | 15 | Job Manager |
| 16 | *********** | 16 | *********** |
| 17 | 17 | ||
| 18 | concurrent_jobs | ||
| 19 | =============== | ||
| 20 | Default: 4 | ||
| 21 | |||
| 22 | This is the number of concurrent jobs the individual job manager should execute | ||
| 23 | at a time. If you are using the multiprocess or threading model this number | ||
| 24 | becomes important as you will want to control the load on your server. If the | ||
| 25 | load equals the number of cores on the server, processes will begin waiting for | ||
| 26 | cpu cycles and things will begin to slow down. | ||
| 27 | |||
| 28 | A safe number to choose if your jobs block a lot would be (2 * cores). If your | ||
| 29 | jobs are cpu intensive you will want to set this number to the number of cores | ||
| 30 | you have or (cores - 1) to leave cycles for the os and other processes. This is | ||
| 31 | something that will have to be tuned based on the jobs that are | ||
| 32 | running. Grouping similar jobs in named queues will help you tune this number. | ||
| 33 | |||
| 18 | queues | 34 | queues |
| 19 | ====== | 35 | ====== |
| 20 | Default: default | 36 | Default: (10, default) |
| 21 | 37 | ||
| 22 | Comma seperated list of queues to process jobs for. Example: | 38 | Semicolon-separated list of queues to process jobs for with their |
| 23 | ``queues=high,med,low,default``. The philosophy taken for this list is each job | 39 | weights. Example: ``queues=(10, data_process); (15, email)``. With these |
| 24 | manager should have a single primary queue. This queue is the first in the list | 40 | weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers |
| 25 | (in the case of the example ``high`` is the primary queue). Subsequent queues | 41 | running jobs locally pretty efficiently. If you have a larger box with a weight |
| 26 | are queues that this job manager should help out with should jobs be backed up, | 42 | of 50 on q1 and 8 concurrent jobs and a smaller box with a weight 30 and 4 |
| 27 | and there are no primary queue jobs to take care of. | 43 | concurrent jobs, the q1 jobs will be sent to the large box until it is no longer |
| 44 | accepting jobs. At this point jobs will start to be sent to the next highest | ||
| 45 | number until the large box is ready to accept another q1 job. | ||
| 28 | 46 | ||
| 29 | .. note:: | 47 | .. note:: |
| 30 | 48 | ||
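
The tuning guidance for concurrent_jobs above boils down to a small heuristic. This is a sketch only; the helper below is illustrative and is not shipped by eventmq:

```python
import multiprocessing

def suggested_concurrent_jobs(cpu_bound=False):
    """Rough starting point per the documentation above: about 2 * cores for
    jobs that block a lot, and cores - 1 (leaving a core for the OS and other
    processes) for CPU-intensive jobs."""
    cores = multiprocessing.cpu_count()
    return max(1, cores - 1) if cpu_bound else 2 * cores

print(suggested_concurrent_jobs())                # e.g. 8 on a 4-core box
print(suggested_concurrent_jobs(cpu_bound=True))  # e.g. 3 on a 4-core box
```
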
diff --git a/etc/eventmq.docker.conf b/etc/eventmq.docker.conf index a61364c..55fee92 100644 --- a/etc/eventmq.docker.conf +++ b/etc/eventmq.docker.conf | |||
| @@ -1,4 +1,10 @@ | |||
| 1 | [settings] | 1 | [global] |
| 2 | # Enable message output at different stages in the app. | ||
| 3 | super_debug = true | ||
| 4 | |||
| 5 | # Hide the heartbeat logs when super_debug is enabled. Showing them will generate a lot of messages. | ||
| 6 | hide_heartbeat_logs = True | ||
| 7 | |||
| 2 | frontend_addr=tcp://0.0.0.0:47291 | 8 | frontend_addr=tcp://0.0.0.0:47291 |
| 3 | backend_addr=tcp://0.0.0.0:47290 | 9 | backend_addr=tcp://0.0.0.0:47290 |
| 4 | worker_addr=tcp://eventmq:47290 | 10 | worker_addr=tcp://eventmq:47290 |
| @@ -9,5 +15,6 @@ scheduler_addr=tcp://eventmq:47291 | |||
| 9 | [scheduler] | 15 | [scheduler] |
| 10 | 16 | ||
| 11 | [jobmanager] | 17 | [jobmanager] |
| 12 | queues=one,two,three,default | 18 | worker_addr=tcp://127.0.0.1:47290 |
| 13 | worker_addr=tcp://127.0.0.1:47290 \ No newline at end of file | 19 | queues=[[50,"google"], [40,"pushes"], [10,"default"]] |
| 20 | concurrent_jobs=2 \ No newline at end of file | ||
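
eventmq's own settings loader lives in eventmq/utils/settings.py and is touched by this commit; purely as an illustration of the JSON-style arrays now allowed in the INI file, a config like the one above could be read with the standard library along these lines (the file path is a placeholder):

```python
import json

try:  # Python 2 module name, matching the project's era
    from ConfigParser import ConfigParser
except ImportError:
    from configparser import ConfigParser

config = ConfigParser()
config.read('/etc/eventmq/eventmq.conf')  # hypothetical location

raw_queues = config.get('jobmanager', 'queues')
queues = [tuple(pair) for pair in json.loads(raw_queues)]  # [(50, 'google'), ...]
concurrent_jobs = config.getint('jobmanager', 'concurrent_jobs')
```
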
diff --git a/eventmq/conf.py b/eventmq/conf.py index 68fe7de..0c4b7cc 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -1,13 +1,40 @@ | |||
| 1 | # SUPER_DEBUG basically enables more debugging logs. specifically of messages | 1 | # This file is part of eventmq. |
| 2 | # at different levels in the application | 2 | # |
| 3 | SUPER_DEBUG = True | 3 | # eventmq is free software: you can redistribute it and/or modify it under the |
| 4 | # Don't show HEARTBEAT message when debug logging is enabled | 4 | # terms of the GNU Lesser General Public License as published by the Free |
| 5 | # Software Foundation, either version 2.1 of the License, or (at your option) | ||
| 6 | # any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU Lesser General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU Lesser General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | """ | ||
| 16 | :mod:`conf` -- Settings Definitions | ||
| 17 | =================================== | ||
| 18 | """ | ||
| 19 | |||
| 20 | #: SUPER_DEBUG basically enables more debugging logs. Specifically the messages | ||
| 21 | #: at different levels in the application. | ||
| 22 | #: Default: False | ||
| 23 | SUPER_DEBUG = False | ||
| 24 | |||
| 25 | #: Don't show HEARTBEAT message when debug logging is enabled | ||
| 26 | #: Default: True | ||
| 5 | HIDE_HEARTBEAT_LOGS = True | 27 | HIDE_HEARTBEAT_LOGS = True |
| 6 | 28 | ||
| 7 | # When a queue name isn't specified use this queue name for the default. It | 29 | # When a queue name isn't specified use this queue name for the default. It |
| 8 | # would be a good idea to have a handful of workers listening on this queue | 30 | # would be a good idea to have a handful of workers listening on this queue |
| 9 | # unless you're positive that everything specifies a queue with workers. | 31 | # unless you're positive that everything specifies a queue with workers. |
| 10 | DEFAULT_QUEUE_NAME = 'default' | 32 | DEFAULT_QUEUE_NAME = 'default' |
| 33 | DEFAULT_QUEUE_WEIGHT = 10 | ||
| 34 | |||
| 35 | # Default queues for the Job Manager to listen on. The values here should match | ||
| 36 | # the values defined on the router. | ||
| 37 | QUEUES = [(DEFAULT_QUEUE_WEIGHT, DEFAULT_QUEUE_NAME), ] | ||
| 11 | 38 | ||
| 12 | # {{{Job Manager | 39 | # {{{Job Manager |
| 13 | # How long should we wait before retrying to connect to a broker? | 40 | # How long should we wait before retrying to connect to a broker? |
| @@ -30,13 +57,14 @@ BACKEND_ADDR = 'tcp://127.0.0.1:47290' | |||
| 30 | SCHEDULER_ADDR = 'tcp://127.0.0.1:47291' | 57 | SCHEDULER_ADDR = 'tcp://127.0.0.1:47291' |
| 31 | WORKER_ADDR = 'tcp://127.0.0.1:47290' | 58 | WORKER_ADDR = 'tcp://127.0.0.1:47290' |
| 32 | 59 | ||
| 60 | # How many jobs should the job manager concurrently handle? | ||
| 61 | CONCURRENT_JOBS = 4 | ||
| 62 | HWM = 10000 | ||
| 63 | |||
| 33 | # Redis settings | 64 | # Redis settings |
| 34 | RQ_HOST = 'localhost' | 65 | RQ_HOST = 'localhost' |
| 35 | RQ_PORT = 6379 | 66 | RQ_PORT = 6379 |
| 36 | RQ_DB = 0 | 67 | RQ_DB = 0 |
| 37 | RQ_PASSWORD = '' | 68 | RQ_PASSWORD = '' |
| 38 | WORKERS = 4 | ||
| 39 | HWM = 10000 | ||
| 40 | 69 | ||
| 41 | QUEUES = '{}'.format(DEFAULT_QUEUE_NAME) | ||
| 42 | # }}} | 70 | # }}} |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 60501fb..332b178 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -66,10 +66,11 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 66 | logger.info('Initializing JobManager {}...'.format(self.name)) | 66 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 67 | 67 | ||
| 68 | #: keep track of workers | 68 | #: keep track of workers |
| 69 | self.workers = Pool(processes=conf.WORKERS) | 69 | concurrent_jobs = kwargs.pop('concurrent_jobs', conf.CONCURRENT_JOBS) |
| 70 | self.workers = Pool(processes=concurrent_jobs) | ||
| 70 | 71 | ||
| 71 | #: List of queues that this job manager is listening on | 72 | #: List of queues that this job manager is listening on |
| 72 | self.queues = kwargs.pop('queues', None) | 73 | self.queues = kwargs.pop('queues', conf.QUEUES) |
| 73 | 74 | ||
| 74 | if not kwargs.pop('skip_signal', False): | 75 | if not kwargs.pop('skip_signal', False): |
| 75 | # handle any sighups by reloading config | 76 | # handle any sighups by reloading config |
| @@ -91,7 +92,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 91 | """ | 92 | """ |
| 92 | # Acknowledgment has come | 93 | # Acknowledgment has come |
| 93 | # Send a READY for each available worker | 94 | # Send a READY for each available worker |
| 94 | for i in range(0, conf.WORKERS): | 95 | for i in range(0, conf.CONCURRENT_JOBS): |
| 95 | self.send_ready() | 96 | self.send_ready() |
| 96 | 97 | ||
| 97 | while True: | 98 | while True: |
| @@ -173,19 +174,26 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 173 | import_settings(section='jobmanager') | 174 | import_settings(section='jobmanager') |
| 174 | self.start(addr=conf.WORKER_ADDR) | 175 | self.start(addr=conf.WORKER_ADDR) |
| 175 | 176 | ||
| 176 | def jobmanager_main(self): | 177 | def jobmanager_main(self, broker_addr=None): |
| 177 | """ | 178 | """ |
| 178 | Kick off jobmanager with logging and settings import | 179 | Kick off jobmanager with logging and settings import |
| 180 | |||
| 181 | Args: | ||
| 182 | broker_addr (str): The address of the broker to connect to. | ||
| 179 | """ | 183 | """ |
| 180 | setup_logger('') | 184 | setup_logger('') |
| 181 | import_settings() | 185 | import_settings() |
| 182 | import_settings(section='jobmanager') | 186 | import_settings(section='jobmanager') |
| 183 | 187 | ||
| 184 | # If this manager was passed explicit queues, favor those. | 188 | # If this manager was passed explicit options, favor those |
| 185 | if self.queues: | 189 | if self.queues: |
| 186 | conf.QUEUES = self.queues | 190 | conf.QUEUES = self.queues |
| 187 | 191 | ||
| 188 | self.start(addr=conf.WORKER_ADDR, queues=self.queues or conf.QUEUES) | 192 | if broker_addr: |
| 193 | conf.WORKER_ADDR = broker_addr | ||
| 194 | |||
| 195 | self.start(addr=conf.WORKER_ADDR, | ||
| 196 | queues=conf.QUEUES) | ||
| 189 | 197 | ||
| 190 | 198 | ||
| 191 | def jobmanager_main(): | 199 | def jobmanager_main(): |
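
The JobManager changes follow a simple precedence: values passed in explicitly (for example from the CLI) win, and the module-level defaults in conf fill in anything left unset. A stripped-down sketch of that idea, not the JobManager class itself (the names here are illustrative):

```python
DEFAULT_CONCURRENT_JOBS = 4         # stands in for conf.CONCURRENT_JOBS
DEFAULT_QUEUES = [(10, 'default')]  # stands in for conf.QUEUES

def build_manager(**kwargs):
    # Treat a missing or None value (e.g. an omitted argparse option) as
    # "use the default".
    concurrent_jobs = kwargs.pop('concurrent_jobs', None) or DEFAULT_CONCURRENT_JOBS
    queues = kwargs.pop('queues', None) or DEFAULT_QUEUES
    return {'concurrent_jobs': concurrent_jobs, 'queues': queues}

print(build_manager())                                           # all defaults
print(build_manager(concurrent_jobs=2, queues=[(50, 'google')]))
```
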
diff --git a/eventmq/router.py b/eventmq/router.py index e91d74f..7aa0d9b 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -18,6 +18,7 @@ | |||
| 18 | Routes messages to workers (that are in named queues). | 18 | Routes messages to workers (that are in named queues). |
| 19 | """ | 19 | """ |
| 20 | from copy import copy | 20 | from copy import copy |
| 21 | import json # deserialize queues in on_inform. should be refactored | ||
| 21 | import logging | 22 | import logging |
| 22 | import signal | 23 | import signal |
| 23 | 24 | ||
| @@ -29,7 +30,7 @@ from .utils.messages import ( | |||
| 29 | fwd_emqp_router_message as fwdmsg, | 30 | fwd_emqp_router_message as fwdmsg, |
| 30 | parse_router_message | 31 | parse_router_message |
| 31 | ) | 32 | ) |
| 32 | from .utils import zero_index_cmp | 33 | from .utils import tuplify, zero_index_cmp |
| 33 | from .utils.settings import import_settings | 34 | from .utils.settings import import_settings |
| 34 | from .utils.devices import generate_device_name | 35 | from .utils.devices import generate_device_name |
| 35 | from .utils.timeutils import monotonic, timestamp | 36 | from .utils.timeutils import monotonic, timestamp |
| @@ -76,8 +77,8 @@ class Router(HeartbeatMixin): | |||
| 76 | #: here. | 77 | #: here. |
| 77 | #: | 78 | #: |
| 78 | #: **Keys** | 79 | #: **Keys** |
| 79 | #: * ``queues``: list() of queues the worker belongs to. The highest | 80 | #: * ``queues``: list() of queue names and prioritiess the worker |
| 80 | # priority queue should come first. | 81 | #: belongs to. e.g. (10, 'default') |
| 81 | #: * ``hb``: monotonic timestamp of the last received message from | 82 | #: * ``hb``: monotonic timestamp of the last received message from |
| 82 | #: worker | 83 | #: worker |
| 83 | #: * ``available_slots``: int count of jobs this manager can still | 84 | #: * ``available_slots``: int count of jobs this manager can still |
| @@ -246,10 +247,10 @@ class Router(HeartbeatMixin): | |||
| 246 | queue_names = msg[0] | 247 | queue_names = msg[0] |
| 247 | client_type = msg[1] | 248 | client_type = msg[1] |
| 248 | 249 | ||
| 249 | if not queue_names: | 250 | if not queue_names: # Ideally, this matches some workers |
| 250 | queues = ('default', ) | 251 | queues = [(conf.DEFAULT_QUEUE_WEIGHT, conf.DEFAULT_QUEUE_NAME), ] |
| 251 | else: | 252 | else: |
| 252 | queues = queue_names.split(',') | 253 | queues = list(map(tuplify, json.loads(queue_names))) |
| 253 | 254 | ||
| 254 | logger.info('Received INFORM request from {} (type: {})'.format( | 255 | logger.info('Received INFORM request from {} (type: {})'.format( |
| 255 | sender, client_type)) | 256 | sender, client_type)) |
| @@ -274,16 +275,17 @@ class Router(HeartbeatMixin): | |||
| 274 | msgid (str): Unique identifier for this message | 275 | msgid (str): Unique identifier for this message |
| 275 | msg: The actual message that was sent | 276 | msg: The actual message that was sent |
| 276 | """ | 277 | """ |
| 278 | queue_names = self.workers[sender]['queues'] | ||
| 279 | |||
| 277 | # if there are waiting messages for the queues this worker is a member | 280 | # if there are waiting messages for the queues this worker is a member |
| 278 | # of, then reply back with the oldest waiting message, otherwise just | 281 | # of, then reply back with the oldest waiting message, otherwise just |
| 279 | # add the worker to the list of available workers. | 282 | # add the worker to the list of available workers. |
| 280 | # Note: This is only taking into account the queue the worker is | 283 | # Note: This is only taking into account the queue the worker is |
| 281 | # returning from, and not other queue_names that might have had | 284 | # returning from, and not other queue_names that might have had |
| 282 | # messages waiting even longer. | 285 | # messages waiting even longer. |
| 283 | queue_names = self.workers[sender]['queues'] | ||
| 284 | |||
| 285 | # Assumes the highest priority queue comes first | 286 | # Assumes the highest priority queue comes first |
| 286 | for queue_name in queue_names: | 287 | for queue in queue_names: |
| 288 | queue_name = queue[1] | ||
| 287 | if queue_name in self.waiting_messages.keys(): | 289 | if queue_name in self.waiting_messages.keys(): |
| 288 | logger.debug('Found waiting message in the %s waiting_messages' | 290 | logger.debug('Found waiting message in the %s waiting_messages' |
| 289 | ' queue' % queue_name) | 291 | ' queue' % queue_name) |
| @@ -292,12 +294,13 @@ class Router(HeartbeatMixin): | |||
| 292 | fwdmsg(self.outgoing, sender, msg) | 294 | fwdmsg(self.outgoing, sender, msg) |
| 293 | 295 | ||
| 294 | # It is easier to check if a key exists rather than the len of | 296 | # It is easier to check if a key exists rather than the len of |
| 295 | # a key if it exists elsewhere, so if that was the last message | 297 | # a key's value if it exists elsewhere, so if that was the last |
| 296 | # remove the queue | 298 | # message remove the queue |
| 297 | if len(self.waiting_messages[queue_name]) == 0: | 299 | if len(self.waiting_messages[queue_name]) == 0: |
| 298 | logger.debug('No more messages in waiting_messages queue ' | 300 | logger.debug('No more messages in waiting_messages queue ' |
| 299 | '%s. Removing from list...' % queue_name) | 301 | '%s. Removing from list...' % queue_name) |
| 300 | del self.waiting_messages[queue_name] | 302 | del self.waiting_messages[queue_name] |
| 303 | |||
| 301 | # the message has been forwarded so short circuit that way the | 304 | # the message has been forwarded so short circuit that way the |
| 302 | # manager isn't reslotted | 305 | # manager isn't reslotted |
| 303 | return | 306 | return |
| @@ -392,17 +395,9 @@ class Router(HeartbeatMixin): | |||
| 392 | conf.HEARTBEAT_TIMEOUT)) | 395 | conf.HEARTBEAT_TIMEOUT)) |
| 393 | 396 | ||
| 394 | # Remove the worker from the actual queues | 397 | # Remove the worker from the actual queues |
| 395 | for i in range(0, len(self.workers[worker_id]['queues'])): | 398 | for queue in self.workers[worker_id]['queues']: |
| 396 | if i == 0: | ||
| 397 | priority = 10 | ||
| 398 | else: | ||
| 399 | priority = 0 | ||
| 400 | |||
| 401 | queue = self.workers[worker_id]['queues'][i] | ||
| 402 | |||
| 403 | try: | 399 | try: |
| 404 | self.queues[queue].remove((priority, worker_id)) | 400 | self.queues[queue[1]].remove((queue[0], worker_id)) |
| 405 | break | ||
| 406 | except KeyError: | 401 | except KeyError: |
| 407 | # This queue disappeared for some reason | 402 | # This queue disappeared for some reason |
| 408 | continue | 403 | continue |
| @@ -426,6 +421,10 @@ class Router(HeartbeatMixin): | |||
| 426 | raise TypeError('type of `queue` parameter not one of (list, ' | 421 | raise TypeError('type of `queue` parameter not one of (list, ' |
| 427 | 'tuple). got {}'.format(type(queues))) | 422 | 'tuple). got {}'.format(type(queues))) |
| 428 | 423 | ||
| 424 | if worker_id in self.workers: | ||
| 425 | logger.warning('Worker id already found in `workers`. Overwriting ' | ||
| 426 | 'data') | ||
| 427 | |||
| 429 | # Add the worker to our worker dict | 428 | # Add the worker to our worker dict |
| 430 | self.workers[worker_id] = {} | 429 | self.workers[worker_id] = {} |
| 431 | self.workers[worker_id]['queues'] = tuple(queues) | 430 | self.workers[worker_id]['queues'] = tuple(queues) |
| @@ -433,21 +432,15 @@ class Router(HeartbeatMixin): | |||
| 433 | self.workers[worker_id]['available_slots'] = 0 | 432 | self.workers[worker_id]['available_slots'] = 0 |
| 434 | 433 | ||
| 435 | # Define priorities. First element is the highest priority | 434 | # Define priorities. First element is the highest priority |
| 436 | for i in range(len(queues)): | 435 | for q in queues: |
| 437 | if i == 0: | 436 | if q[1] not in self.queues: |
| 438 | priority = 10 | 437 | self.queues[q[1]] = list() |
| 439 | else: | ||
| 440 | priority = 0 | ||
| 441 | 438 | ||
| 442 | if queues[i] not in self.queues: | 439 | self.queues[q[1]].append((q[0], worker_id)) |
| 443 | self.queues[queues[i]] = EMQdeque() | 440 | self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) |
| 444 | 441 | ||
| 445 | self.queues[queues[i]].append((priority, worker_id)) | ||
| 446 | |||
| 447 | self.queues[queues[i]] = \ | ||
| 448 | self.prioritize_queue_list(self.queues[queues[i]]) | ||
| 449 | logger.debug('Added worker {} to the queues {}'.format( | 442 | logger.debug('Added worker {} to the queues {}'.format( |
| 450 | worker_id, queues)) | 443 | worker_id, queues)) |
| 451 | 444 | ||
| 452 | def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): | 445 | def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): |
| 453 | """ | 446 | """ |
| @@ -480,15 +473,19 @@ class Router(HeartbeatMixin): | |||
| 480 | # pop the next job manager id & check if it has a worker slot | 473 | # pop the next job manager id & check if it has a worker slot |
| 481 | # if it doesn't add it to popped_workers to be added back to | 474 | # if it doesn't add it to popped_workers to be added back to |
| 482 | # self.queues after the loop | 475 | # self.queues after the loop |
| 483 | worker = self.queues[queue_name].popleft() | 476 | worker = self.queues[queue_name].pop(0) |
| 477 | |||
| 484 | # LRU when sorted later by appending | 478 | # LRU when sorted later by appending |
| 485 | popped_workers.append(worker) | 479 | popped_workers.append(worker) |
| 480 | |||
| 486 | if self.workers[worker[1]]['available_slots'] > 0: | 481 | if self.workers[worker[1]]['available_slots'] > 0: |
| 487 | worker_addr = worker[1] | 482 | worker_addr = worker[1] |
| 488 | break | 483 | break |
| 484 | |||
| 489 | except KeyError: | 485 | except KeyError: |
| 490 | # This should only happen if worker[1] is missing: | 486 | # This should only happen if worker[1] is missing 1 from |
| 491 | # - available slots is pre set to 0 self.add_worker | 487 | # self.workers because: |
| 488 | # - available slots initialized to 0 self.add_worker() | ||
| 492 | # - we already checked that self.queues[queue_name] exists | 489 | # - we already checked that self.queues[queue_name] exists |
| 493 | logger.error("Worker {} not found for queue {}".format( | 490 | logger.error("Worker {} not found for queue {}".format( |
| 494 | worker, queue_name)) | 491 | worker, queue_name)) |
| @@ -497,14 +494,16 @@ class Router(HeartbeatMixin): | |||
| 497 | worker, queue_name | 494 | worker, queue_name |
| 498 | )) | 495 | )) |
| 499 | continue | 496 | continue |
| 497 | |||
| 500 | except IndexError: | 498 | except IndexError: |
| 501 | # worker[1] should exist if it follows the (priority, id) fmt | 499 | # worker[1] should exist if it follows the (priority, id) fmt |
| 502 | logger.error("Invalid worker format in self.queues {}".format( | 500 | logger.error("Invalid priority/worker format in self.queues " |
| 503 | worker | 501 | "{}".format(worker)) |
| 504 | )) | ||
| 505 | continue | 502 | continue |
| 503 | else: | ||
| 504 | # No more queues to try | ||
| 505 | pass | ||
| 506 | 506 | ||
| 507 | # Should always evaluate to true | ||
| 508 | if popped_workers: | 507 | if popped_workers: |
| 509 | self.queues[queue_name].extend(popped_workers) | 508 | self.queues[queue_name].extend(popped_workers) |
| 510 | self.queues[queue_name] = self.prioritize_queue_list( | 509 | self.queues[queue_name] = self.prioritize_queue_list( |
| @@ -553,12 +552,6 @@ class Router(HeartbeatMixin): | |||
| 553 | """ | 552 | """ |
| 554 | self.workers[worker_id]['available_slots'] += 1 | 553 | self.workers[worker_id]['available_slots'] += 1 |
| 555 | 554 | ||
| 556 | def queue_message(self, msg): | ||
| 557 | """ | ||
| 558 | Add a message to the queue for processing later | ||
| 559 | """ | ||
| 560 | raise NotImplementedError() | ||
| 561 | |||
| 562 | def process_client_message(self, original_msg, depth=0): | 555 | def process_client_message(self, original_msg, depth=0): |
| 563 | """ | 556 | """ |
| 564 | Args: | 557 | Args: |
| @@ -691,13 +684,9 @@ class Router(HeartbeatMixin): | |||
| 691 | IndexError - There was no 0-index element. | 684 | IndexError - There was no 0-index element. |
| 692 | 685 | ||
| 693 | Returns: | 686 | Returns: |
| 694 | sorted :class:`EMQdeque` with largest priorites being indexed | 687 | descending order list. E.g. ((20, 'a'), (14, 'b'), (12, 'c')) |
| 695 | smaller. E.g. ((20,'a' ), (14, 'b'), ('12', c)) | ||
| 696 | """ | 688 | """ |
| 697 | return EMQdeque( | 689 | return sorted(unprioritized_iterable, cmp=zero_index_cmp, reverse=True) |
| 698 | initial=sorted(unprioritized_iterable, | ||
| 699 | cmp=zero_index_cmp, | ||
| 700 | reverse=True)) | ||
| 701 | 690 | ||
| 702 | def sighup_handler(self, signum, frame): | 691 | def sighup_handler(self, signum, frame): |
| 703 | """ | 692 | """ |
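
Since Router.queues entries are now plain lists of (weight, worker_id) pairs, prioritization is just a descending sort on the weight, and re-appending a popped worker before re-sorting gives a rough LRU rotation among equally weighted workers. An illustrative sketch of that behavior (it uses a key= sort, whereas the Python 2 code above uses a cmp function):

```python
def prioritize(entries):
    """Sort (weight, worker_id) pairs heaviest-first. sorted() is stable, so
    equally weighted workers keep their relative order."""
    return sorted(entries, key=lambda pair: pair[0], reverse=True)

queue = prioritize([(0, 'w2'), (10, 'w1'), (10, 'w3')])
# [(10, 'w1'), (10, 'w3'), (0, 'w2')]

popped = [queue.pop(0)]   # try 'w1' first
queue.extend(popped)      # re-append it so 'w3' sorts ahead of it next time
queue = prioritize(queue)
# [(10, 'w3'), (10, 'w1'), (0, 'w2')]
```
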
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index 37014cb..6d3c663 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -23,8 +23,6 @@ ADDR = 'inproc://pour_the_rice_in_the_thing' | |||
| 23 | 23 | ||
| 24 | 24 | ||
| 25 | class TestCase(unittest.TestCase): | 25 | class TestCase(unittest.TestCase): |
| 26 | jm = None | ||
| 27 | |||
| 28 | def test__setup(self): | 26 | def test__setup(self): |
| 29 | jm = jobmanager.JobManager(name='RuckusBringer') | 27 | jm = jobmanager.JobManager(name='RuckusBringer') |
| 30 | self.assertEqual(jm.name, 'RuckusBringer') | 28 | self.assertEqual(jm.name, 'RuckusBringer') |
| @@ -62,8 +60,8 @@ class TestCase(unittest.TestCase): | |||
| 62 | 60 | ||
| 63 | jm._start_event_loop() | 61 | jm._start_event_loop() |
| 64 | 62 | ||
| 65 | # send conf.WORKERS ready messages | 63 | # send int(conf.CONCURRENT_JOBS) ready messages |
| 66 | self.assertEqual(conf.WORKERS, send_ready_mock.call_count) | 64 | self.assertEqual(conf.CONCURRENT_JOBS, send_ready_mock.call_count) |
| 67 | 65 | ||
| 68 | process_msg_mock.assert_called_with( | 66 | process_msg_mock.assert_called_with( |
| 69 | sender_mock.return_value) | 67 | sender_mock.return_value) |
| @@ -86,6 +84,7 @@ class TestCase(unittest.TestCase): | |||
| 86 | callback=jm.worker_done, | 84 | callback=jm.worker_done, |
| 87 | func=run_mock) | 85 | func=run_mock) |
| 88 | 86 | ||
| 87 | # Other Tests | ||
| 89 | @mock.patch('eventmq.jobmanager.JobManager.start') | 88 | @mock.patch('eventmq.jobmanager.JobManager.start') |
| 90 | @mock.patch('eventmq.jobmanager.import_settings') | 89 | @mock.patch('eventmq.jobmanager.import_settings') |
| 91 | @mock.patch('eventmq.jobmanager.Sender.rebuild') | 90 | @mock.patch('eventmq.jobmanager.Sender.rebuild') |
| @@ -111,7 +110,7 @@ class TestCase(unittest.TestCase): | |||
| 111 | @mock.patch('eventmq.jobmanager.import_settings') | 110 | @mock.patch('eventmq.jobmanager.import_settings') |
| 112 | @mock.patch('eventmq.jobmanager.setup_logger') | 111 | @mock.patch('eventmq.jobmanager.setup_logger') |
| 113 | def test_jobmanager_main(self, setup_logger_mock, import_settings_mock, | 112 | def test_jobmanager_main(self, setup_logger_mock, import_settings_mock, |
| 114 | start_mock): | 113 | start_mock): |
| 115 | jm = jobmanager.JobManager() | 114 | jm = jobmanager.JobManager() |
| 116 | 115 | ||
| 117 | jm.jobmanager_main() | 116 | jm.jobmanager_main() |
| @@ -125,7 +124,7 @@ class TestCase(unittest.TestCase): | |||
| 125 | start_mock.assert_called_with(addr=conf.WORKER_ADDR, | 124 | start_mock.assert_called_with(addr=conf.WORKER_ADDR, |
| 126 | queues=conf.QUEUES) | 125 | queues=conf.QUEUES) |
| 127 | 126 | ||
| 128 | jm.queues = ('derp', 'blurp') | 127 | jm.queues = ((10, 'derp'), (0, 'blurp')) |
| 129 | jm.jobmanager_main() | 128 | jm.jobmanager_main() |
| 130 | 129 | ||
| 131 | start_mock.assert_called_with(addr=conf.WORKER_ADDR, | 130 | start_mock.assert_called_with(addr=conf.WORKER_ADDR, |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index 6349159..82fe0b5 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -44,33 +44,77 @@ class TestCase(unittest.TestCase): | |||
| 44 | 44 | ||
| 45 | @mock.patch('eventmq.router.Router.send_ack') | 45 | @mock.patch('eventmq.router.Router.send_ack') |
| 46 | @mock.patch('eventmq.router.Router.add_worker') | 46 | @mock.patch('eventmq.router.Router.add_worker') |
| 47 | def test_on_inform_worker_defaut_queue(self, add_worker_mock, | 47 | def test_on_inform_worker(self, add_worker_mock, send_ack_mock): |
| 48 | send_ack_mock): | 48 | sender_id = 'omgsenderid19' |
| 49 | queues = '[[32, "top"], [23, "drop"], [12, "shop"]]' | ||
| 50 | inform_msgid = 'msg31' | ||
| 51 | |||
| 52 | self.router.on_inform( | ||
| 53 | sender_id, inform_msgid, [queues, 'worker']) | ||
| 54 | |||
| 55 | self.router.send_ack.assert_called_with( | ||
| 56 | self.router.outgoing, sender_id, inform_msgid) | ||
| 57 | |||
| 58 | self.router.add_worker.assert_called_with( | ||
| 59 | sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')]) | ||
| 60 | |||
| 61 | @mock.patch('eventmq.router.Router.send_ack') | ||
| 62 | @mock.patch('eventmq.router.Router.add_worker') | ||
| 63 | def test_on_inform_worker_default_queue(self, add_worker_mock, | ||
| 64 | send_ack_mock): | ||
| 65 | # Test on_inform when no queue is specified | ||
| 49 | sender_id = 'omgsender18' | 66 | sender_id = 'omgsender18' |
| 50 | queues = '' | 67 | queues = '' |
| 51 | inform_msgid = 'msg29' | 68 | inform_msgid = 'msg29' |
| 52 | 69 | ||
| 53 | self.router.on_inform(sender_id, inform_msgid, | 70 | self.router.on_inform( |
| 54 | [queues, 'worker']) | 71 | sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker]) |
| 55 | 72 | ||
| 56 | self.router.send_ack.assert_called_with(self.router.outgoing, | 73 | self.router.send_ack.assert_called_with( |
| 57 | sender_id, inform_msgid) | 74 | self.router.outgoing, sender_id, inform_msgid) |
| 58 | self.router.add_worker.assert_called_with(sender_id, ('default',)) | 75 | self.router.add_worker.assert_called_with( |
| 76 | sender_id, [(10, 'default'), ]) | ||
| 59 | 77 | ||
| 60 | @mock.patch('eventmq.router.Router.send_ack') | 78 | # @mock.patch('eventmq.router.Router.prioritize_queue_list') |
| 61 | @mock.patch('eventmq.router.Router.add_worker') | 79 | def test_add_worker(self): |
| 62 | def test_on_inform_worker(self, add_worker_mock, send_ack_mock): | 80 | worker1_id = 'w1' |
| 63 | sender_id = 'omgsenderid19' | 81 | worker2_id = 'w2' |
| 64 | queues = 'top,drop,shop' | ||
| 65 | inform_msgid = 'msg31' | ||
| 66 | 82 | ||
| 67 | self.router.on_inform(sender_id, inform_msgid, | 83 | queues1 = [(10, 'top'), (9, 'drop'), (8, 'shop')] |
| 68 | [queues, 'worker']) | 84 | queues2 = [(10, 'default'), (9, 'shop'), (8, 'top')] |
| 69 | 85 | ||
| 70 | self.router.send_ack.assert_called_with(self.router.outgoing, | 86 | self.router.add_worker(worker1_id, queues=queues1) |
| 71 | sender_id, inform_msgid) | 87 | self.router.add_worker(worker2_id, queues=queues2) |
| 72 | self.router.add_worker.assert_called_with(sender_id, | 88 | # added to the list of workers |
| 73 | queues.split(',')) | 89 | self.assertIn(worker1_id, self.router.workers) |
| 90 | self.assertIn(worker2_id, self.router.workers) | ||
| 91 | self.assertGreater(self.router.workers[worker1_id]['hb'], 0) | ||
| 92 | # no slots yet | ||
| 93 | self.assertEqual(self.router.workers[worker1_id]['available_slots'], 0) | ||
| 94 | |||
| 95 | # aware of the queues | ||
| 96 | self.assertEqual(3, len(self.router.workers[worker1_id]['queues'])) | ||
| 97 | self.assertIn((10, 'top'), self.router.workers[worker1_id]['queues']) | ||
| 98 | self.assertIn((9, 'drop'), self.router.workers[worker1_id]['queues']) | ||
| 99 | self.assertIn((8, 'shop'), self.router.workers[worker1_id]['queues']) | ||
| 100 | |||
| 101 | # Worker2 | ||
| 102 | self.assertIn((10, 'default'), | ||
| 103 | self.router.workers[worker2_id]['queues']) | ||
| 104 | self.assertIn((9, 'shop'), self.router.workers[worker2_id]['queues']) | ||
| 105 | self.assertIn((8, 'top'), self.router.workers[worker2_id]['queues']) | ||
| 106 | |||
| 107 | self.assertIn((10, worker1_id), self.router.queues['top']) | ||
| 108 | self.assertIn((9, worker1_id), self.router.queues['drop']) | ||
| 109 | self.assertIn((8, worker1_id), self.router.queues['shop']) | ||
| 110 | |||
| 111 | self.assertIn((10, worker2_id), self.router.queues['default']) | ||
| 112 | self.assertIn((9, worker2_id), self.router.queues['shop']) | ||
| 113 | self.assertIn((8, worker2_id), self.router.queues['top']) | ||
| 114 | |||
| 115 | def test_add_worker_invalid_queues(self): | ||
| 116 | with self.assertRaises(TypeError): | ||
| 117 | self.router.add_worker('83902', 8902) | ||
| 74 | 118 | ||
| 75 | @mock.patch('eventmq.utils.messages.generate_msgid') | 119 | @mock.patch('eventmq.utils.messages.generate_msgid') |
| 76 | def test_send_ack(self, generate_msgid_mock): | 120 | def test_send_ack(self, generate_msgid_mock): |
| @@ -115,12 +159,12 @@ class TestCase(unittest.TestCase): | |||
| 115 | self.assertEqual(self.router._meta['last_sent_heartbeat'], 0) | 159 | self.assertEqual(self.router._meta['last_sent_heartbeat'], 0) |
| 116 | self.router.workers = { | 160 | self.router.workers = { |
| 117 | 'w1': { | 161 | 'w1': { |
| 118 | 'queues': ['default', ], | 162 | 'queues': [(10, 'default'), ], |
| 119 | 'hb': 123.2, | 163 | 'hb': 123.2, |
| 120 | 'available_slots': 3, | 164 | 'available_slots': 3, |
| 121 | }, | 165 | }, |
| 122 | 'w2': { | 166 | 'w2': { |
| 123 | 'queues': ['default', ], | 167 | 'queues': [(10, 'not-default'), ], |
| 124 | 'hb': 123.2, | 168 | 'hb': 123.2, |
| 125 | 'available_slots': 2, | 169 | 'available_slots': 2, |
| 126 | } | 170 | } |
| @@ -131,10 +175,27 @@ class TestCase(unittest.TestCase): | |||
| 131 | # is very hard to mock) | 175 | # is very hard to mock) |
| 132 | self.assertGreater(self.router._meta['last_sent_heartbeat'], 0) | 176 | self.assertGreater(self.router._meta['last_sent_heartbeat'], 0) |
| 133 | 177 | ||
| 134 | self.router.send_heartbeat.assert_has_calls( | 178 | send_heartbeat_mock.assert_has_calls( |
| 135 | [mock.call(self.router.outgoing, 'w1'), | 179 | [mock.call(self.router.outgoing, 'w1'), |
| 136 | mock.call(self.router.outgoing, 'w2')], any_order=True) | 180 | mock.call(self.router.outgoing, 'w2')], any_order=True) |
| 137 | 181 | ||
| 182 | @mock.patch('eventmq.router.Router.send_heartbeat') | ||
| 183 | def test_send_schedulers_heartbeats(self, send_hb_mock): | ||
| 184 | scheduler_id = 's39' | ||
| 185 | self.assertEqual(self.router._meta['last_sent_scheduler_heartbeat'], 0) | ||
| 186 | |||
| 187 | self.router.schedulers = { | ||
| 188 | scheduler_id: { | ||
| 189 | 'hb': 0, | ||
| 190 | } | ||
| 191 | } | ||
| 192 | |||
| 193 | self.router.send_schedulers_heartbeats() | ||
| 194 | |||
| 195 | self.assertGreater( | ||
| 196 | self.router._meta['last_sent_scheduler_heartbeat'], 0) | ||
| 197 | send_hb_mock.assert_called_with(self.router.incoming, scheduler_id) | ||
| 198 | |||
| 138 | def test_on_disconnect(self): | 199 | def test_on_disconnect(self): |
| 139 | self.assertFalse(self.router.received_disconnect) | 200 | self.assertFalse(self.router.received_disconnect) |
| 140 | self.router.on_disconnect('msg1', 'derp') | 201 | self.router.on_disconnect('msg1', 'derp') |
| @@ -152,14 +213,14 @@ class TestCase(unittest.TestCase): | |||
| 152 | 213 | ||
| 153 | self.router.workers = { | 214 | self.router.workers = { |
| 154 | worker_id: { | 215 | worker_id: { |
| 155 | 'queues': ['default', ], | 216 | 'queues': [(10, 'default'), ], |
| 156 | 'hb': 123.2, | 217 | 'hb': 123.2, |
| 157 | 'available_slots': 3, | 218 | 'available_slots': 3, |
| 158 | }, | 219 | }, |
| 159 | } | 220 | } |
| 160 | 221 | ||
| 161 | self.router.waiting_messages['default'] = EMQdeque( | 222 | self.router.waiting_messages['default'] = EMQdeque( |
| 162 | initial=[waiting_msg]) | 223 | initial=[waiting_msg, ]) |
| 163 | 224 | ||
| 164 | self.router.on_ready(worker_id, msgid, msg) | 225 | self.router.on_ready(worker_id, msgid, msg) |
| 165 | 226 | ||
| @@ -171,8 +232,10 @@ class TestCase(unittest.TestCase): | |||
| 171 | 232 | ||
| 172 | @mock.patch('eventmq.router.fwdmsg') | 233 | @mock.patch('eventmq.router.fwdmsg') |
| 173 | @mock.patch('eventmq.router.Router.requeue_worker') | 234 | @mock.patch('eventmq.router.Router.requeue_worker') |
| 174 | def test_on_ready_prioritized_queue(self, requeue_worker_mock, | 235 | def test_on_ready_multpile_queues(self, requeue_worker_mock, |
| 175 | fwdmsg_mock): | 236 | fwdmsg_mock): |
| 237 | # Test that if messages are waiting on multiple queues, they are | ||
| 238 | # dispatched immediatly after a READY message. | ||
| 176 | worker1_id = 'w1' | 239 | worker1_id = 'w1' |
| 177 | worker2_id = 'w2' | 240 | worker2_id = 'w2' |
| 178 | 241 | ||
| @@ -188,53 +251,61 @@ class TestCase(unittest.TestCase): | |||
| 188 | 251 | ||
| 189 | self.router.workers = { | 252 | self.router.workers = { |
| 190 | worker1_id: { | 253 | worker1_id: { |
| 191 | 'queues': ['kun', 'blu'], | 254 | 'queues': [(10, 'kun'), (0, 'blu')], |
| 192 | 'hb': 123.2, | 255 | 'hb': 123.2, |
| 193 | 'available_slots': 0, | 256 | 'available_slots': 0, |
| 194 | }, | 257 | }, |
| 195 | worker2_id: { | 258 | worker2_id: { |
| 196 | 'queues': ['blu', 'kun'], | 259 | 'queues': [(10, 'blu'), (0, 'kun')], |
| 197 | 'hb': 123.2, | 260 | 'hb': 123.2, |
| 198 | 'available_slots': 0 | 261 | 'available_slots': 0 |
| 199 | } | 262 | } |
| 200 | } | 263 | } |
| 201 | 264 | ||
| 202 | self.router.queues = { | 265 | self.router.queues = { |
| 203 | 'kun': EMQdeque(initial=[(10, worker1_id), (0, worker2_id)]), | 266 | 'kun': [(10, worker1_id), (0, worker2_id)], |
| 204 | 'blu': EMQdeque(initial=[(10, worker2_id), (0, worker1_id)]) | 267 | 'blu': [(10, worker2_id), (0, worker1_id)], |
| 205 | |||
| 206 | } | 268 | } |
| 207 | 269 | ||
| 208 | self.router.waiting_messages = { | 270 | self.router.waiting_messages = { |
| 209 | 'kun': EMQdeque(initial=[waiting_msg1, waiting_msg2]), | 271 | 'kun': EMQdeque(initial=[waiting_msg1, waiting_msg2]), |
| 210 | 'blu': EMQdeque(initial=[waiting_msg3]) | 272 | 'blu': EMQdeque(initial=[waiting_msg3, ]), |
| 211 | } | 273 | } |
| 212 | 274 | ||
| 275 | # Forward waiting_msg1 | ||
| 213 | ready_msgid1 = 'ready23' | 276 | ready_msgid1 = 'ready23' |
| 214 | self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) | 277 | self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) |
| 215 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id, | 278 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id, |
| 216 | waiting_msg1) | 279 | waiting_msg1) |
| 217 | 280 | ||
| 218 | ready_msgid2 = 'ready19' | 281 | # Forward waiting_msg3 -- blu is a higher priority for worker2 |
| 219 | self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) | 282 | ready_msgid3 = 'ready19' |
| 283 | self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3]) | ||
| 220 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, | 284 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, |
| 221 | waiting_msg3) | 285 | waiting_msg3) |
| 222 | 286 | ||
| 223 | ready_msgid3 = 'ready5' | 287 | # Forward waiting_msg2 |
| 224 | self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3]) | 288 | ready_msgid2 = 'ready5' |
| 289 | self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) | ||
| 225 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, | 290 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, |
| 226 | waiting_msg2) | 291 | waiting_msg2) |
| 227 | 292 | ||
| 293 | # There should be no keys because the code checks for their existence | ||
| 294 | # to know if there is a waiting message | ||
| 295 | self.assertEqual(0, len(self.router.waiting_messages.keys())) | ||
| 296 | |||
| 297 | # No waiting messages | ||
| 228 | self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) | 298 | self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) |
| 229 | requeue_worker_mock.assert_called_with(worker1_id) | 299 | requeue_worker_mock.assert_called_with(worker1_id) |
| 230 | self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) | 300 | self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) |
| 231 | requeue_worker_mock.assert_called_with(worker2_id) | 301 | requeue_worker_mock.assert_called_with(worker2_id) |
| 232 | 302 | ||
| 303 | @mock.patch('eventmq.router.Router.clean_up_dead_workers') | ||
| 233 | @mock.patch('eventmq.router.Router.process_client_message') | 304 | @mock.patch('eventmq.router.Router.process_client_message') |
| 234 | @mock.patch('eventmq.router.Router.get_available_worker') | 305 | @mock.patch('eventmq.router.Router.get_available_worker') |
| 235 | @mock.patch('eventmq.router.fwdmsg') | 306 | @mock.patch('eventmq.router.fwdmsg') |
| 236 | def test_on_request(self, fwdmsg_mock, get_worker_mock, | 307 | def test_on_request(self, fwdmsg_mock, get_worker_mock, |
| 237 | process_client_msg_mock): | 308 | process_client_msg_mock, cleanupworkers_mock): |
| 238 | client_id = 'c1' | 309 | client_id = 'c1' |
| 239 | msgid = 'msg18' | 310 | msgid = 'msg18' |
| 240 | queue = 'default' | 311 | queue = 'default' |
| @@ -245,13 +316,13 @@ class TestCase(unittest.TestCase): | |||
| 245 | 316 | ||
| 246 | self.router.workers = { | 317 | self.router.workers = { |
| 247 | worker_id: { | 318 | worker_id: { |
| 248 | 'queues': EMQdeque(initial=(queue,)), | 319 | 'queues': [(10, queue)], |
| 249 | 'hb': 2903.34, | 320 | 'hb': 2903.34, |
| 250 | 'available_slots': 1, | 321 | 'available_slots': 1, |
| 251 | } | 322 | } |
| 252 | } | 323 | } |
| 253 | self.router.queues = { | 324 | self.router.queues = { |
| 254 | 'default': EMQdeque(initial=((10, worker_id))) | 325 | queue: [(10, worker_id),] |
| 255 | } | 326 | } |
| 256 | 327 | ||
| 257 | # Router accepts job for 1 available slot | 328 | # Router accepts job for 1 available slot |
| @@ -268,7 +339,7 @@ class TestCase(unittest.TestCase): | |||
| 268 | get_worker_mock.side_effect = raise_no_workers | 339 | get_worker_mock.side_effect = raise_no_workers |
| 269 | self.router.on_request(client_id, msgid, msg) | 340 | self.router.on_request(client_id, msgid, msg) |
| 270 | 341 | ||
| 271 | self.assertIn(msg[0], self.router.waiting_messages) | 342 | self.assertIn(queue, self.router.waiting_messages) |
| 272 | self.assertEqual(list(self.router.waiting_messages[queue])[0], | 343 | self.assertEqual(list(self.router.waiting_messages[queue])[0], |
| 273 | ['', constants.PROTOCOL_VERSION, 'REQUEST', | 344 | ['', constants.PROTOCOL_VERSION, 'REQUEST', |
| 274 | msgid] + msg) | 345 | msgid] + msg) |
| @@ -286,14 +357,85 @@ class TestCase(unittest.TestCase): | |||
| 286 | [client_id, '', constants.PROTOCOL_VERSION, 'REQUEST', msgid]+msg, | 357 | [client_id, '', constants.PROTOCOL_VERSION, 'REQUEST', msgid]+msg, |
| 287 | depth=2) | 358 | depth=2) |
| 288 | 359 | ||
| 289 | def test_cleanup_dead_workers(self): | 360 | def test_get_available_worker(self): |
| 361 | worker2_id = 'w2' | ||
| 362 | worker3_id = 'w3' | ||
| 363 | |||
| 364 | queue1_id = 'default' | ||
| 365 | queue2_id = 'jimjam' | ||
| 366 | |||
| 367 | self.router.queues = { | ||
| 368 | queue1_id: [(10, worker3_id), (0, worker2_id)], | ||
| 369 | queue2_id: [(10, worker2_id)], | ||
| 370 | } | ||
| 371 | |||
| 372 | self.router.workers = { | ||
| 373 | worker2_id: { | ||
| 374 | 'queues': [(10, queue2_id), (0, queue1_id)], | ||
| 375 | 'available_slots': 1, | ||
| 376 | }, | ||
| 377 | worker3_id: { | ||
| 378 | 'queues': [(10, queue1_id), ], | ||
| 379 | 'available_slots': 1, | ||
| 380 | }, | ||
| 381 | } | ||
| 382 | |||
| 383 | # Get the next available worker for queue2 | ||
| 384 | check1 = self.router.get_available_worker(queue_name=queue2_id) | ||
| 385 | self.assertEqual(worker2_id, check1) | ||
| 386 | |||
| 387 | # Get the next available worker for queue1 | ||
| 388 | check2 = self.router.get_available_worker(queue_name=queue1_id) | ||
| 389 | self.assertEqual(worker3_id, check2) | ||
| 390 | |||
| 391 | # Pretend worker 3 is doing something | ||
| 392 | self.router.workers[worker3_id]['available_slots'] = 0 | ||
| 393 | |||
| 394 | # Get the next available worker for queue1 | ||
| 395 | check3 = self.router.get_available_worker(queue_name=queue1_id) | ||
| 396 | self.assertEqual(worker2_id, check3) | ||
| 397 | |||
| 398 | def test_get_available_worker_dont_decrement_slots(self): | ||
| 399 | # Once upon a time get_available_worker() decremented the available | ||
| 400 | # slots counter and the townsfolk greived | ||
| 401 | queue1_id = 'q1' | ||
| 402 | worker1_id = 'w1' | ||
| 403 | |||
| 404 | self.router.queues = { | ||
| 405 | queue1_id: [(10, worker1_id, ), ] | ||
| 406 | } | ||
| 407 | |||
| 408 | self.router.workers = { | ||
| 409 | worker1_id: { | ||
| 410 | 'queues': [(10, queue1_id), ], | ||
| 411 | 'available_slots': 1, | ||
| 412 | } | ||
| 413 | } | ||
| 414 | |||
| 415 | self.router.get_available_worker(queue_name=queue1_id) | ||
| 416 | |||
| 417 | self.assertEqual(self.router.workers[worker1_id]['available_slots'], 1) | ||
| 418 | |||
| 419 | def test_requeue_worker(self): | ||
| 420 | worker_id = 'w1' | ||
| 421 | |||
| 422 | self.router.workers = { | ||
| 423 | worker_id: { | ||
| 424 | 'available_slots': 1 | ||
| 425 | } | ||
| 426 | } | ||
| 427 | |||
| 428 | self.router.requeue_worker(worker_id) | ||
| 429 | self.assertEqual(self.router.workers[worker_id]['available_slots'], 2) | ||
| 430 | |||
| 431 | def test_clean_up_dead_workers(self): | ||
| 290 | worker1_id = 'w1' | 432 | worker1_id = 'w1' |
| 291 | worker2_id = 'w2' | 433 | worker2_id = 'w2' |
| 292 | worker3_id = 'w3' | 434 | worker3_id = 'w3' |
| 293 | 435 | ||
| 294 | queue1_id = 'default' | 436 | queue1_id = 'default' |
| 295 | queue2_id = 'jimjam' | 437 | queue2_id = 'jimjam' |
| 296 | nonexistent_queue1 = 'pig' | 438 | nonexistent_queue1 = 'nonexistent' |
| 297 | 439 | ||
| 298 | # To ensure the value was changed later because monotonic() is hard to | 440 | # To ensure the value was changed later because monotonic() is hard to |
| 299 | # mock | 441 | # mock |
| @@ -307,21 +449,21 @@ class TestCase(unittest.TestCase): | |||
| 307 | } | 449 | } |
| 308 | 450 | ||
| 309 | self.router.workers = { | 451 | self.router.workers = { |
| 310 | # 1 second away from timeout | 452 | # 3 in the future |
| 311 | worker1_id: { | 453 | worker1_id: { |
| 312 | 'queues': (queue1_id,), | 454 | 'queues': [(10, queue1_id), ], |
| 313 | 'hb': monotonic() - conf.HEARTBEAT_TIMEOUT + 1, | 455 | 'hb': monotonic() + 3, |
| 314 | 'available_slots': 0, | 456 | 'available_slots': 0, |
| 315 | }, | 457 | }, |
| 316 | # below the timeout | 458 | # below the timeout |
| 317 | worker2_id: { | 459 | worker2_id: { |
| 318 | 'queues': (queue2_id, queue1_id), | 460 | 'queues': [(10, queue2_id), (0, queue1_id)], |
| 319 | 'hb': 0, | 461 | 'hb': 0, |
| 320 | 'available_slots': 2, | 462 | 'available_slots': 2, |
| 321 | }, | 463 | }, |
| 322 | # below the timeout and a queue missing from self.router.queues | 464 | # below the timeout and a queue missing from self.router.queues |
| 323 | worker3_id: { | 465 | worker3_id: { |
| 324 | 'queues': (queue2_id, nonexistent_queue1), | 466 | 'queues': [(10, queue2_id), (3, nonexistent_queue1)], |
| 325 | 'hb': 0, | 467 | 'hb': 0, |
| 326 | 'available_slots': 0, | 468 | 'available_slots': 0, |
| 327 | }, | 469 | }, |
| @@ -337,74 +479,6 @@ class TestCase(unittest.TestCase): | |||
| 337 | self.assertNotIn(queue2_id, self.router.queues) | 479 | self.assertNotIn(queue2_id, self.router.queues) |
| 338 | self.assertNotIn(nonexistent_queue1, self.router.queues) | 480 | self.assertNotIn(nonexistent_queue1, self.router.queues) |
| 339 | 481 | ||
| 340 | # @mock.patch('eventmq.router.Router.prioritize_queue_list') | ||
| 341 | def test_add_worker(self): | ||
| 342 | worker1_id = 'w1' | ||
| 343 | queues = ('top', 'drop', 'shop') | ||
| 344 | |||
| 345 | self.router.add_worker(worker1_id, queues=queues) | ||
| 346 | # added to the list of workers | ||
| 347 | self.assertIn(worker1_id, self.router.workers) | ||
| 348 | # got an inital heartbeat | ||
| 349 | self.assertGreater(self.router.workers[worker1_id]['hb'], 0) | ||
| 350 | # no slots yet | ||
| 351 | self.assertEqual(self.router.workers[worker1_id]['available_slots'], 0) | ||
| 352 | # aware of the queues | ||
| 353 | self.assertEqual(3, len(self.router.workers[worker1_id]['queues'])) | ||
| 354 | self.assertIn((10, worker1_id), list(self.router.queues['top'])) | ||
| 355 | self.assertIn((0, worker1_id), list(self.router.queues['drop'])) | ||
| 356 | self.assertIn((0, worker1_id), list(self.router.queues['shop'])) | ||
| 357 | |||
| 358 | def test_get_available_worker(self): | ||
| 359 | worker2_id = 'w2' | ||
| 360 | worker3_id = 'w3' | ||
| 361 | |||
| 362 | queue1_id = 'default' | ||
| 363 | queue2_id = 'jimjam' | ||
| 364 | |||
| 365 | self.router.queues = { | ||
| 366 | queue1_id: EMQdeque(initial=[(10, worker3_id), (0, worker2_id)]), | ||
| 367 | queue2_id: EMQdeque(initial=[(10, worker2_id)]), | ||
| 368 | } | ||
| 369 | |||
| 370 | self.router.workers = { | ||
| 371 | worker2_id: { | ||
| 372 | 'queues': (queue2_id, queue1_id), | ||
| 373 | 'available_slots': 1, | ||
| 374 | }, | ||
| 375 | worker3_id: { | ||
| 376 | 'queues': (queue1_id,), | ||
| 377 | 'available_slots': 1, | ||
| 378 | }, | ||
| 379 | } | ||
| 380 | |||
| 381 | # worker1 has no available slots. | ||
| 382 | check1 = self.router.get_available_worker(queue_name=queue2_id) | ||
| 383 | self.assertEqual(worker2_id, check1) | ||
| 384 | self.assertEqual(self.router.workers[worker2_id]['available_slots'], 1) | ||
| 385 | |||
| 386 | check2 = self.router.get_available_worker(queue_name=queue1_id) | ||
| 387 | self.assertEqual(worker3_id, check2) | ||
| 388 | self.assertEqual(self.router.workers[worker3_id]['available_slots'], 1) | ||
| 389 | |||
| 390 | self.router.workers[worker3_id]['available_slots'] = 0 | ||
| 391 | |||
| 392 | check3 = self.router.get_available_worker(queue_name=queue1_id) | ||
| 393 | self.assertEqual(worker2_id, check3) | ||
| 394 | self.assertEqual(self.router.workers[worker2_id]['available_slots'], 1) | ||
| 395 | |||
| 396 | def test_requeue_worker(self): | ||
| 397 | worker_id = 'w1' | ||
| 398 | |||
| 399 | self.router.workers = { | ||
| 400 | worker_id: { | ||
| 401 | 'available_slots': 1 | ||
| 402 | } | ||
| 403 | } | ||
| 404 | |||
| 405 | self.router.requeue_worker(worker_id) | ||
| 406 | self.assertEqual(self.router.workers[worker_id]['available_slots'], 2) | ||
| 407 | |||
| 408 | @mock.patch('eventmq.router.Router.on_inform') | 482 | @mock.patch('eventmq.router.Router.on_inform') |
| 409 | @mock.patch('eventmq.router.Router.on_request') | 483 | @mock.patch('eventmq.router.Router.on_request') |
| 410 | @mock.patch('eventmq.router.parse_router_message') | 484 | @mock.patch('eventmq.router.parse_router_message') |
| @@ -457,37 +531,35 @@ class TestCase(unittest.TestCase): | |||
| 457 | on_inform_mock.assert_called_with(sender_id, msgid, msg) | 531 | on_inform_mock.assert_called_with(sender_id, msgid, msg) |
| 458 | 532 | ||
| 459 | def test_prioritize_queue_list(self): | 533 | def test_prioritize_queue_list(self): |
| 460 | queue = EMQdeque(initial=[(0, 'd'), (10, 'b'), (0, 'e'), (10, 'a'), | 534 | queue = [(0, 'd'), (10, 'b'), (0, 'e'), (10, 'a'), (0, 'c')] |
| 461 | (0, 'c')]) | ||
| 462 | 535 | ||
| 463 | sorted1 = self.router.prioritize_queue_list(queue) | 536 | sorted1 = self.router.prioritize_queue_list(queue) |
| 464 | self.assertEqual([(10, 'b'), (10, 'a'), (0, 'd'), (0, 'e'), | 537 | self.assertEqual([(10, 'b'), (10, 'a'), (0, 'd'), (0, 'e'), |
| 465 | (0, 'c')], list(sorted1)) | 538 | (0, 'c')], sorted1) |
| 466 | 539 | ||
| 467 | pop1 = sorted1.popleft() | 540 | pop1 = sorted1.pop(0) |
| 468 | self.assertEqual(pop1, (10, 'b')) | 541 | self.assertEqual(pop1, (10, 'b')) |
| 469 | sorted1.append(pop1) | 542 | sorted1.append(pop1) |
| 470 | # a, b, d, e, c | 543 | # a, b, d, e, c |
| 471 | sorted2 = self.router.prioritize_queue_list(sorted1) | 544 | sorted2 = self.router.prioritize_queue_list(sorted1) |
| 472 | self.assertEqual([(10, 'a'), (10, 'b'), (0, 'd'), (0, 'e'), (0, 'c')], | 545 | self.assertEqual([(10, 'a'), (10, 'b'), (0, 'd'), (0, 'e'), (0, 'c')], |
| 473 | list(sorted2)) | 546 | sorted2) |
| 474 | pop2 = sorted2.popleft() # a | 547 | pop2 = sorted2.pop(0) # a |
| 475 | pop3 = sorted2.popleft() # b | 548 | pop3 = sorted2.pop(0) # b |
| 476 | pop4 = sorted2.popleft() # d | 549 | pop4 = sorted2.pop(0) # d |
| 477 | self.assertEqual(pop4, (0, 'd')) | 550 | self.assertEqual(pop4, (0, 'd')) |
| 478 | self.assertEqual(pop2, (10, 'a')) | 551 | self.assertEqual(pop2, (10, 'a')) |
| 479 | self.assertEqual(pop3, (10, 'b')) | 552 | self.assertEqual(pop3, (10, 'b')) |
| 480 | self.assertEqual([(0, 'e'), (0, 'c')], list(sorted2)) | 553 | self.assertEqual([(0, 'e'), (0, 'c')], list(sorted2)) |
| 481 | 554 | ||
| 482 | sorted2.appendleft(pop2) | 555 | sorted2.append(pop2) |
| 483 | sorted2.appendleft(pop4) | 556 | sorted2.append(pop4) |
| 484 | sorted2.appendleft(pop3) | 557 | sorted2.append(pop3) |
| 485 | 558 | ||
| 486 | # a, b, d, e, c | ||
| 487 | sorted3 = self.router.prioritize_queue_list(sorted2) | 559 | sorted3 = self.router.prioritize_queue_list(sorted2) |
| 488 | 560 | ||
| 489 | self.assertEqual(sorted3.popleft(), (10, 'b')) | 561 | self.assertEqual(sorted3.pop(0), (10, 'a')) |
| 490 | self.assertEqual(sorted3.popleft(), (10, 'a')) | 562 | self.assertEqual(sorted3.pop(0), (10, 'b')) |
| 491 | self.assertEqual(sorted3.popleft(), (0, 'd')) | 563 | self.assertEqual(sorted3.pop(0), (0, 'e')) |
| 492 | self.assertEqual(sorted3.popleft(), (0, 'e')) | 564 | self.assertEqual(sorted3.pop(0), (0, 'c')) |
| 493 | self.assertEqual(sorted3.popleft(), (0, 'c')) | 565 | self.assertEqual(sorted3.pop(0), (0, 'd')) |
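The reordering asserted above is consistent with a stable descending sort on the weight element of each `(weight, name)` pair. A minimal sketch of an implementation that would satisfy these tests (not necessarily the router's actual code):

    def prioritize_queue_list(queue_list):
        # sorted() is stable, so entries with equal weight keep their
        # current relative order, which is what the assertions rely on
        return sorted(queue_list, key=lambda pair: pair[0], reverse=True)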
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index ff3c238..f4669ea 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -18,6 +18,7 @@ import unittest | |||
| 18 | 18 | ||
| 19 | import mock | 19 | import mock |
| 20 | 20 | ||
| 21 | from .. import constants | ||
| 21 | from .. import exceptions | 22 | from .. import exceptions |
| 22 | from ..utils import messages | 23 | from ..utils import messages |
| 23 | from ..utils import classes | 24 | from ..utils import classes |
| @@ -95,6 +96,75 @@ class SettingsTestCase(unittest.TestCase): | |||
| 95 | self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') | 96 | self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') |
| 96 | 97 | ||
| 97 | 98 | ||
| 99 | class EMQPServiceTestCase(unittest.TestCase): | ||
| 100 | |||
| 101 | # pretend to be an emq socket | ||
| 102 | outgoing = 'some-outgoing-socket' | ||
| 103 | |||
| 104 | def get_worker(self): | ||
| 105 | """return an EMQPService mimicking a worker""" | ||
| 106 | obj = classes.EMQPService() | ||
| 107 | obj.SERVICE_TYPE = constants.CLIENT_TYPE.worker | ||
| 108 | obj.outgoing = self.outgoing | ||
| 109 | obj._meta = { | ||
| 110 | 'last_sent_heartbeat': 0 | ||
| 111 | } | ||
| 112 | |||
| 113 | return obj | ||
| 114 | |||
| 115 | @mock.patch('eventmq.utils.classes.sendmsg') | ||
| 116 | def test_send_inform_return_msgid(self, sendmsg_mock): | ||
| 117 | obj = self.get_worker() | ||
| 118 | sendmsg_mock.return_value = 'some-msgid' | ||
| 119 | |||
| 120 | retval = obj.send_inform(queues=[(10, 'default'), ]) | ||
| 121 | |||
| 122 | self.assertEqual(retval, sendmsg_mock.return_value) | ||
| 123 | |||
| 124 | @mock.patch('eventmq.utils.classes.sendmsg') | ||
| 125 | def test_send_inform_single_weightless_queue(self, sendmsg_mock): | ||
| 126 | # Test that the inform message is backward compatible with a change | ||
| 127 | # in v0.2.0 | ||
| 128 | obj = self.get_worker() | ||
| 129 | |||
| 130 | obj.send_inform(queues='derpfault') | ||
| 131 | |||
| 132 | sendmsg_mock.assert_called_with( | ||
| 133 | 'some-outgoing-socket', 'INFORM', | ||
| 134 | ['derpfault', constants.CLIENT_TYPE.worker] | ||
| 135 | ) | ||
| 136 | |||
| 137 | @mock.patch('eventmq.utils.classes.sendmsg') | ||
| 138 | def test_send_inform_empty_queue_name(self, sendmsg_mock): | ||
| 139 | obj = self.get_worker() | ||
| 140 | |||
| 141 | obj.send_inform() | ||
| 142 | |||
| 143 | sendmsg_mock.assert_called_with( | ||
| 144 | 'some-outgoing-socket', 'INFORM', | ||
| 145 | ['', constants.CLIENT_TYPE.worker]) | ||
| 146 | |||
| 147 | @mock.patch('eventmq.utils.classes.sendmsg') | ||
| 148 | def test_send_inform_specified_valid_queues(self, sendmsg_mock): | ||
| 149 | obj = self.get_worker() | ||
| 150 | |||
| 151 | obj.send_inform(queues=([10, 'push'], [7, 'email'], | ||
| 152 | [3, 'default'])) | ||
| 153 | sendmsg_mock.assert_called_with( | ||
| 154 | 'some-outgoing-socket', 'INFORM', | ||
| 155 | ["[10, 'push'],[7, 'email'],[3, 'default]", | ||
| 156 | constants.CLIENT_TYPE.worker] | ||
| 157 | ) | ||
| 158 | |||
| 159 | @mock.patch('eventmq.utils.classes.sendmsg') | ||
| 160 | def test_send_inform_update_last_sent_heartbeat(self, sendmsg_mock): | ||
| 161 | obj = self.get_worker() | ||
| 162 | |||
| 163 | obj.send_inform(queues=(['', constants.CLIENT_TYPE.worker])) | ||
| 164 | |||
| 165 | self.assertGreater(obj._meta['last_sent_heartbeat'], 0) | ||
| 166 | |||
| 167 | |||
| 98 | class TestCase(unittest.TestCase): | 168 | class TestCase(unittest.TestCase): |
| 99 | def test_generate_msgid(self): | 169 | def test_generate_msgid(self): |
| 100 | msgid = messages.generate_msgid() | 170 | msgid = messages.generate_msgid() |
diff --git a/eventmq/utils/__init__.py b/eventmq/utils/__init__.py index 5973c93..2a047a5 100644 --- a/eventmq/utils/__init__.py +++ b/eventmq/utils/__init__.py | |||
| @@ -46,3 +46,15 @@ def zero_index_cmp(a, b): | |||
| 46 | when sorting the values in :attr:`router.Router.queues`. | 46 | when sorting the values in :attr:`router.Router.queues`. |
| 47 | """ | 47 | """ |
| 48 | return cmp(a[0], b[0]) | 48 | return cmp(a[0], b[0]) |
| 49 | |||
| 50 | |||
| 51 | def tuplify(v): | ||
| 52 | """ | ||
| 53 | Recursively convert lists to tuples. | ||
| 54 | |||
| 55 | Args: | ||
| 56 | v (object): any value of interest | ||
| 57 | """ | ||
| 58 | if isinstance(v, list): | ||
| 59 | return tuple(map(tuplify, v)) | ||
| 60 | return v | ||
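`tuplify` simply walks nested lists and freezes them into tuples; non-list values pass through unchanged. For example:

    >>> tuplify([[10, 'push'], [7, 'email']])
    ((10, 'push'), (7, 'email'))
    >>> tuplify('default')
    'default'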
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index a0f9740..0e54f43 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -53,14 +53,20 @@ class EMQPService(object): | |||
| 53 | 53 | ||
| 54 | See the code for :class:`Scheduler` and :class:`JobManager` for examples. | 54 | See the code for :class:`Scheduler` and :class:`JobManager` for examples. |
| 55 | """ | 55 | """ |
| 56 | def send_inform(self, queue=None): | 56 | def send_inform(self, queues=()): |
| 57 | """ | 57 | """ |
| 59 | Queues an INFORM command to `self.outgoing`. | 58 | Notify the router that this service is online and ready for |
| 59 | work, including the list of queues the router should forward messages | ||
| 60 | for. | ||
| 59 | 61 | ||
| 60 | Args: | 62 | Args: |
| 61 | type_ (str): Either 'worker' or 'scheduler' | 63 | type_ (str): Either 'worker' or 'scheduler' |
| 62 | queue (list): | 64 | queues (list): |
| 63 | - For 'worker' type, the queues the worker is listening on | 65 | - For 'worker' type, the queues the worker is listening on and |
| 66 | their weights. | ||
| 67 | |||
| 68 | Example: | ||
| 69 | ([10, 'default'], [15, 'push_notifications']) | ||
| 64 | - Ignored for 'scheduler' type | 70 | - Ignored for 'scheduler' type |
| 65 | 71 | ||
| 66 | Raises: | 72 | Raises: |
| @@ -68,6 +74,11 @@ class EMQPService(object): | |||
| 68 | 74 | ||
| 69 | Returns: | 75 | Returns: |
| 70 | str: ID of the message | 76 | str: ID of the message |
| 77 | |||
| 78 | .. note:: | ||
| 79 | |||
| 80 | Passing a single string for queues is supported for backward | ||
| 81 | compatibility but is not recommended for new apps. | ||
| 71 | """ | 82 | """ |
| 72 | valid_types = (constants.CLIENT_TYPE.worker, | 83 | valid_types = (constants.CLIENT_TYPE.worker, |
| 73 | constants.CLIENT_TYPE.scheduler) | 84 | constants.CLIENT_TYPE.scheduler) |
| @@ -75,8 +86,15 @@ class EMQPService(object): | |||
| 75 | if self.SERVICE_TYPE not in valid_types: | 86 | if self.SERVICE_TYPE not in valid_types: |
| 76 | raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE, | 87 | raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE, |
| 77 | valid_types)) | 88 | valid_types)) |
| 89 | |||
| 90 | if isinstance(queues, (list, tuple)): | ||
| 91 | stringified_queues = '' | ||
| 92 | for pair in queues: | ||
| 93 | stringified_queues += '{},'.format(str(pair)) | ||
| 94 | queues = stringified_queues[:-1] # strip off the last comma | ||
| 95 | |||
| 78 | msgid = sendmsg(self.outgoing, 'INFORM', [ | 96 | msgid = sendmsg(self.outgoing, 'INFORM', [ |
| 79 | queue or conf.DEFAULT_QUEUE_NAME, | 97 | queues, |
| 80 | self.SERVICE_TYPE | 98 | self.SERVICE_TYPE |
| 81 | ]) | 99 | ]) |
| 82 | 100 | ||
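The loop above joins the `str()` form of each weighted pair with commas, so the first frame of the INFORM message ends up as a single comma-separated string. A quick illustration (`manager` here is a hypothetical `JobManager` instance):

    manager.send_inform(queues=([10, 'push'], [7, 'email'], [3, 'default']))
    # sendmsg receives: "[10, 'push'],[7, 'email'],[3, 'default']"
    # followed by the service type, e.g. CLIENT_TYPE.worker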
diff --git a/eventmq/utils/settings.py b/eventmq/utils/settings.py index 0621aaf..1c2671e 100644 --- a/eventmq/utils/settings.py +++ b/eventmq/utils/settings.py | |||
| @@ -21,6 +21,7 @@ import json | |||
| 21 | import logging | 21 | import logging |
| 22 | import os | 22 | import os |
| 23 | 23 | ||
| 24 | from . import tuplify | ||
| 24 | from .. import conf | 25 | from .. import conf |
| 25 | 26 | ||
| 26 | 27 | ||
| @@ -71,15 +72,3 @@ def import_settings(section='global'): | |||
| 71 | else: | 72 | else: |
| 72 | logger.warning('Config file at {} not found. Continuing with ' | 73 | logger.warning('Config file at {} not found. Continuing with ' |
| 73 | 'defaults.'.format(conf.CONFIG_FILE)) | 74 | 'defaults.'.format(conf.CONFIG_FILE)) |
| 74 | |||
| 75 | |||
| 76 | def tuplify(v): | ||
| 77 | """ | ||
| 78 | Recursively convert lists to tuples. | ||
| 79 | |||
| 80 | Args: | ||
| 81 | v (object): any value of interest | ||
| 82 | """ | ||
| 83 | if isinstance(v, list): | ||
| 84 | return tuple(map(tuplify, v)) | ||
| 85 | return v | ||
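With `tuplify` now imported from `eventmq.utils`, it is presumably still used by `import_settings` to convert JSON-style list values from the INI file into tuples. A rough sketch of that conversion, assuming a raw setting value as it might appear in the config (the `raw_value` name is only illustrative):

    import json
    raw_value = '[[10, "push"], [3, "default"]]'
    queues = tuplify(json.loads(raw_value))
    # queues == ((10, 'push'), (3, 'default'))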
diff --git a/eventmq/worker.py b/eventmq/worker.py index 0e931ff..23c9dfd 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -20,7 +20,7 @@ Defines different short-lived workers that execute jobs | |||
| 20 | from importlib import import_module | 20 | from importlib import import_module |
| 21 | import logging | 21 | import logging |
| 22 | 22 | ||
| 23 | logger = logging.getLogger('*') | 23 | logger = logging.getLogger(__name__) |
| 24 | 24 | ||
| 25 | 25 | ||
| 26 | def run(payload): | 26 | def run(payload): |