aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2016-05-24 16:49:54 -0600
committerjason2016-05-24 16:49:54 -0600
commita592ba06ea019bbb0051ffe5fc006d98560d038a (patch)
tree588590a6423a19c59aa2cac9f57b7025f4f5f44b
parent2c599d965493b3658ffadc0c3e9e1b5a209cf1ba (diff)
downloadeventmq-a592ba06ea019bbb0051ffe5fc006d98560d038a.tar.gz
eventmq-a592ba06ea019bbb0051ffe5fc006d98560d038a.zip
More work for named queues
- Rename `WORKERS` setting to `CONCURRENT_JOBS` for more clarity. Added this setting to the command line options, the ini .conf & default settings conf.py files. - Added support for JSON style arrays in INI config. - Added support for weighted named queues. The style for the setting is [[weight, "name"], [weight, "name"]]. Configured in both the INI and command line for job manager. Added documentation. - Updated the spec for the INFORM message. Weights are sent with the queue names. If there are no weights specified they will be given the default value of 0. - updated Router.queues to a list from a deque so that it can be sorted by priority more easily.
-rwxr-xr-xbin/emq-jobmanager18
-rw-r--r--docs/conf.py7
-rw-r--r--docs/protocol.rst13
-rw-r--r--docs/settings_file.rst34
-rw-r--r--etc/eventmq.docker.conf13
-rw-r--r--eventmq/conf.py42
-rw-r--r--eventmq/jobmanager.py20
-rw-r--r--eventmq/router.py93
-rw-r--r--eventmq/tests/test_jobmanager.py11
-rw-r--r--eventmq/tests/test_router.py336
-rw-r--r--eventmq/tests/test_utils.py70
-rw-r--r--eventmq/utils/__init__.py12
-rw-r--r--eventmq/utils/classes.py28
-rw-r--r--eventmq/utils/settings.py13
-rw-r--r--eventmq/worker.py2
15 files changed, 466 insertions, 246 deletions
diff --git a/bin/emq-jobmanager b/bin/emq-jobmanager
index 82c2e9b..4071ada 100755
--- a/bin/emq-jobmanager
+++ b/bin/emq-jobmanager
@@ -21,12 +21,19 @@ from eventmq.jobmanager import JobManager
21from eventmq import conf 21from eventmq import conf
22 22
23if __name__ == "__main__": 23if __name__ == "__main__":
24 parser = argparse.ArgumentParser(description='Listen for requests') 24 parser = argparse.ArgumentParser(description='Listen for job requests and '
25 parser.add_argument('--config', '-c', type=str, nargs='?', 25 'manage their execution')
26 parser.add_argument('--broker-addr', '-B', type=str, nargs='?',
27 help='manually specify the broker address to connect '
28 'to in order to receive jobs')
29 parser.add_argument('--config', '-C', type=str, nargs='?',
26 help='manually specify the location of eventmq.conf') 30 help='manually specify the location of eventmq.conf')
27 parser.add_argument('--queues', '-Q', type=str, nargs='+', 31 parser.add_argument('--queues', '-Q', type=str, nargs='+',
28 help='space separated list of queue names to listen ' 32 help='space separated list of queue names to listen '
29 'on') 33 'on')
34 parser.add_argument('--jobs', '-J', type=int, nargs='?',
35 help='the max number of concurrent jobs to manage at '
36 'a time')
30 37
31 args = parser.parse_args() 38 args = parser.parse_args()
32 39
@@ -39,5 +46,8 @@ if __name__ == "__main__":
39 if queues: 46 if queues:
40 queues = ','.join(queues) 47 queues = ','.join(queues)
41 48
42 j = JobManager(queues=queues) 49 broker_addr = args.broker_addr
43 j.jobmanager_main() 50 concurrent_jobs = args.jobs
51
52 j = JobManager(queues=queues, concurrent_jobs=concurrent_jobs)
53 j.jobmanager_main(broker_addr=broker_addr)
diff --git a/docs/conf.py b/docs/conf.py
index 864bbcf..aa23956 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -16,6 +16,7 @@ import sys
16import os 16import os
17import shlex 17import shlex
18 18
19import eventmq # for __version__
19# If extensions (or modules to document with autodoc) are in another directory, 20# If extensions (or modules to document with autodoc) are in another directory,
20# add these directories to sys.path here. If the directory is relative to the 21# add these directories to sys.path here. If the directory is relative to the
21# documentation root, use os.path.abspath to make it absolute, like shown here. 22# documentation root, use os.path.abspath to make it absolute, like shown here.
@@ -52,15 +53,15 @@ master_doc = 'index'
52 53
53# General information about the project. 54# General information about the project.
54project = u'EventMQ' 55project = u'EventMQ'
55copyright = u'2015, eventboard.io' 56copyright = u'2016, eventboard.io'
56author = u'eventboard.io' 57author = u'EventMQ Contributors'
57 58
58# The version info for the project you're documenting, acts as replacement for 59# The version info for the project you're documenting, acts as replacement for
59# |version| and |release|, also used in various other places throughout the 60# |version| and |release|, also used in various other places throughout the
60# built documents. 61# built documents.
61# 62#
62# The short X.Y version. 63# The short X.Y version.
63version = '0' 64version = eventmq.__version__
64# The full version, including alpha/beta/rc tags. 65# The full version, including alpha/beta/rc tags.
65release = '0' 66release = '0'
66 67
diff --git a/docs/protocol.rst b/docs/protocol.rst
index 4c2aff0..c341403 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -71,7 +71,7 @@ FRAME Value Description
711 eMQP/1.0 Protocol version 711 eMQP/1.0 Protocol version
722 REQUEST command 722 REQUEST command
733 _MSGID_ A unique id for the msg 733 _MSGID_ A unique id for the msg
744 _QUEUE_NAME_ the name of the queue the worker belongs to 744 _QUEUE_NAME_ the name of the queue the request should be sent to
755 _HEADERS_ dictionary of headers. can be an empty set 755 _HEADERS_ dictionary of headers. can be an empty set
766 _MSG_ The message to send 766 _MSG_ The message to send
77====== ============== =========== 77====== ============== ===========
@@ -85,7 +85,7 @@ FRAME Value Description
851 eMQP/1.0 Protocol version 851 eMQP/1.0 Protocol version
862 PUBLISH command 862 PUBLISH command
873 _MSGID_ A unique id for the msg 873 _MSGID_ A unique id for the msg
884 _TOPIC_NAME_ the name of the queue the worker belongs to 884 _TOPIC_NAME_ the name of the topic this message should be published across
895 _HEADERS_ csv list of headers 895 _HEADERS_ csv list of headers
906 _MSG_ The message to send 906 _MSG_ The message to send
91====== ============== =========== 91====== ============== ===========
@@ -99,7 +99,7 @@ FRAME Value Description
991 eMQP/1.0 Protocol version 991 eMQP/1.0 Protocol version
1002 SCHEDULE command 1002 SCHEDULE command
1013 _MSGID_ A unique id for the msg 1013 _MSGID_ A unique id for the msg
1024 _TOPIC_NAME_ name of queue that the job should run in 1024 _QUEUE_NAME_ name of queue that the job should run in
1035 _HEADERS_ csv list of headers for this message 1035 _HEADERS_ csv list of headers for this message
1046 _MSG_ The message to send 1046 _MSG_ The message to send
105====== ============== =========== 105====== ============== ===========
@@ -113,7 +113,7 @@ FRAME Value Description
1131 eMQP/1.0 Protocol version 1131 eMQP/1.0 Protocol version
1142 UNSCHEDULE command 1142 UNSCHEDULE command
1153 _MSGID_ A unique id for the msg 1153 _MSGID_ A unique id for the msg
1164 _TOPIC_NAME_ ignored for this command, broadcasted to all queues 1164 _QUEUE_NAME_ ignored for this command, broadcasted to all queues
1175 _HEADERS_ csv list of headers for this message 1175 _HEADERS_ csv list of headers for this message
1186 _MSG_ The message to send 1186 _MSG_ The message to send
119====== ============== =========== 119====== ============== ===========
@@ -129,7 +129,7 @@ FRAME Value Description
1291 eMQP/1.0 Protocol version 1291 eMQP/1.0 Protocol version
1302 INFORM command 1302 INFORM command
1313 _MSGID_ A unique id for the msg 1313 _MSGID_ A unique id for the msg
1324 _QUEUE_NAME_ csv seperated names of queue the worker belongs to 1324 Queues. Unused for scheduler
1335 scheduler type of peer connecting 1335 scheduler type of peer connecting
134====== ============== =========== 134====== ============== ===========
135 135
@@ -144,7 +144,7 @@ FRAME Value Description
1441 eMQP/1.0 Protocol version 1441 eMQP/1.0 Protocol version
1452 INFORM command 1452 INFORM command
1463 _MSGID_ A unique id for the msg 1463 _MSGID_ A unique id for the msg
1474 _QUEUE_NAME_ csv seperated names of queue the worker belongs to. 1474 _QUEUES_ csv seperated arrays containing an int and a string for weight and name. e.g. [40, 'email']
1485 worker type of peer connecting 1485 worker type of peer connecting
149====== ============== =========== 149====== ============== ===========
150 150
@@ -203,7 +203,6 @@ Heartbeating
203 * If the worker detects that the broker disconnected it SHOULD restart the conversation. 203 * If the worker detects that the broker disconnected it SHOULD restart the conversation.
204 * If the broker detects that a worker has disconnected it should stop sending it a message of any type. 204 * If the broker detects that a worker has disconnected it should stop sending it a message of any type.
205 * If the scheduler detects that the broker disconnects it SHOULD restart the conversation. 205 * If the scheduler detects that the broker disconnects it SHOULD restart the conversation.
206 * If the broker detects that a scheduler has disconnected it should ??????????.
207 206
208REQUEST Headers 207REQUEST Headers
209--------------- 208---------------
diff --git a/docs/settings_file.rst b/docs/settings_file.rst
index 07d2338..b0bc36c 100644
--- a/docs/settings_file.rst
+++ b/docs/settings_file.rst
@@ -15,16 +15,34 @@ Scheduler
15Job Manager 15Job Manager
16*********** 16***********
17 17
18concurrent_jobs
19===============
20Default: 4
21
22This is the number of concurrent jobs the indiviudal job manager should execute
23at a time. If you are using the multiprocess or threading model this number
24becomes important as you will want to control the load on your server. If the
25load equals the number of cores on the server, processes will begin waiting for
26cpu cycles and things will begin to slow down.
27
28A safe number to choose if your jobs block a lot would be (2 * cores). If your
29jobs are cpu intensive you will want to set this number to the number of cores
30you have or (cores - 1) to leave cycles for the os and other processes. This is
31something that will have to be tuned based on the jobs that are
32running. Grouping similar jobs in named queues will help you tune this number.
33
18queues 34queues
19====== 35======
20Default: default 36Default: (10, default)
21 37
22Comma seperated list of queues to process jobs for. Example: 38Semi-colon seperated list of queues to process jobs for with thier
23``queues=high,med,low,default``. The philosophy taken for this list is each job 39weights. Example: ``queues=(10, data_process); (15, email)``. With these
24manager should have a single primary queue. This queue is the first in the list 40weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers
25(in the case of the example ``high`` is the primary queue). Subsequent queues 41running jobs locally pretty efficiently. If you have a larger box with a weight
26are queues that this job manager should help out with should jobs be backed up, 42of 50 on q1 and 8 concurrent jobs and a smaller box with a weight 30 and 4
27and there are no primary queue jobs to take care of. 43concurrent jobs, the q1 jobs will be sent to the large box until it is no longer
44accepting jobs. At this point jobs will start to be sent to the next highest
45number until the large box is ready to accept another q1 job.
28 46
29.. note:: 47.. note::
30 48
diff --git a/etc/eventmq.docker.conf b/etc/eventmq.docker.conf
index a61364c..55fee92 100644
--- a/etc/eventmq.docker.conf
+++ b/etc/eventmq.docker.conf
@@ -1,4 +1,10 @@
1[settings] 1[global]
2# Enable message output at different stages in the app.
3super_debug = true
4
5# Hide the heartbeat logs when super_debug is enabled. Showing them will generate a lot of messages.
6hide_heartbeat_logs = True
7
2frontend_addr=tcp://0.0.0.0:47291 8frontend_addr=tcp://0.0.0.0:47291
3backend_addr=tcp://0.0.0.0:47290 9backend_addr=tcp://0.0.0.0:47290
4worker_addr=tcp://eventmq:47290 10worker_addr=tcp://eventmq:47290
@@ -9,5 +15,6 @@ scheduler_addr=tcp://eventmq:47291
9[scheduler] 15[scheduler]
10 16
11[jobmanager] 17[jobmanager]
12queues=one,two,three,default 18worker_addr=tcp://127.0.0.1:47290
13worker_addr=tcp://127.0.0.1:47290 \ No newline at end of file 19queues=[[50,"google"], [40,"pushes"], [10,"default"]]
20concurrent_jobs=2 \ No newline at end of file
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 68fe7de..0c4b7cc 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -1,13 +1,40 @@
1# SUPER_DEBUG basically enables more debugging logs. specifically of messages 1# This file is part of eventmq.
2# at different levels in the application 2#
3SUPER_DEBUG = True 3# eventmq is free software: you can redistribute it and/or modify it under the
4# Don't show HEARTBEAT message when debug logging is enabled 4# terms of the GNU Lesser General Public License as published by the Free
5# Software Foundation, either version 2.1 of the License, or (at your option)
6# any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU Lesser General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15"""
16:mod:`conf` -- Settings Definitions
17===================================
18"""
19
20#: SUPER_DEBUG basically enables more debugging logs. Specifically the messages
21#: at different levels in the application.
22#: Default: False
23SUPER_DEBUG = False
24
25#: Don't show HEARTBEAT message when debug logging is enabled
26#: Default: True
5HIDE_HEARTBEAT_LOGS = True 27HIDE_HEARTBEAT_LOGS = True
6 28
7# When a queue name isn't specified use this queue name for the default. It 29# When a queue name isn't specified use this queue name for the default. It
8# would be a good idea to have a handful of workers listening on this queue 30# would be a good idea to have a handful of workers listening on this queue
9# unless you're positive that everything specifies a queue with workers. 31# unless you're positive that everything specifies a queue with workers.
10DEFAULT_QUEUE_NAME = 'default' 32DEFAULT_QUEUE_NAME = 'default'
33DEFAULT_QUEUE_WEIGHT = 10
34
35# Default queues for the Job Manager to listen on. The values here should match
36# the values defined on the router.
37QUEUES = [(DEFAULT_QUEUE_WEIGHT, DEFAULT_QUEUE_NAME), ]
11 38
12# {{{Job Manager 39# {{{Job Manager
13# How long should we wait before retrying to connect to a broker? 40# How long should we wait before retrying to connect to a broker?
@@ -30,13 +57,14 @@ BACKEND_ADDR = 'tcp://127.0.0.1:47290'
30SCHEDULER_ADDR = 'tcp://127.0.0.1:47291' 57SCHEDULER_ADDR = 'tcp://127.0.0.1:47291'
31WORKER_ADDR = 'tcp://127.0.0.1:47290' 58WORKER_ADDR = 'tcp://127.0.0.1:47290'
32 59
60# How many jobs should the job manager concurrently handle?
61CONCURRENT_JOBS = 4
62HWM = 10000
63
33# Redis settings 64# Redis settings
34RQ_HOST = 'localhost' 65RQ_HOST = 'localhost'
35RQ_PORT = 6379 66RQ_PORT = 6379
36RQ_DB = 0 67RQ_DB = 0
37RQ_PASSWORD = '' 68RQ_PASSWORD = ''
38WORKERS = 4
39HWM = 10000
40 69
41QUEUES = '{}'.format(DEFAULT_QUEUE_NAME)
42# }}} 70# }}}
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 60501fb..332b178 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -66,10 +66,11 @@ class JobManager(HeartbeatMixin, EMQPService):
66 logger.info('Initializing JobManager {}...'.format(self.name)) 66 logger.info('Initializing JobManager {}...'.format(self.name))
67 67
68 #: keep track of workers 68 #: keep track of workers
69 self.workers = Pool(processes=conf.WORKERS) 69 concurrent_jobs = kwargs.pop('concurrent_jobs', conf.CONCURRENT_JOBS)
70 self.workers = Pool(processes=concurrent_jobs)
70 71
71 #: List of queues that this job manager is listening on 72 #: List of queues that this job manager is listening on
72 self.queues = kwargs.pop('queues', None) 73 self.queues = kwargs.pop('queues', conf.QUEUES)
73 74
74 if not kwargs.pop('skip_signal', False): 75 if not kwargs.pop('skip_signal', False):
75 # handle any sighups by reloading config 76 # handle any sighups by reloading config
@@ -91,7 +92,7 @@ class JobManager(HeartbeatMixin, EMQPService):
91 """ 92 """
92 # Acknowledgment has come 93 # Acknowledgment has come
93 # Send a READY for each available worker 94 # Send a READY for each available worker
94 for i in range(0, conf.WORKERS): 95 for i in range(0, conf.CONCURRENT_JOBS):
95 self.send_ready() 96 self.send_ready()
96 97
97 while True: 98 while True:
@@ -173,19 +174,26 @@ class JobManager(HeartbeatMixin, EMQPService):
173 import_settings(section='jobmanager') 174 import_settings(section='jobmanager')
174 self.start(addr=conf.WORKER_ADDR) 175 self.start(addr=conf.WORKER_ADDR)
175 176
176 def jobmanager_main(self): 177 def jobmanager_main(self, broker_addr=None):
177 """ 178 """
178 Kick off jobmanager with logging and settings import 179 Kick off jobmanager with logging and settings import
180
181 Args:
182 broker_addr (str): The address of the broker to connect to.
179 """ 183 """
180 setup_logger('') 184 setup_logger('')
181 import_settings() 185 import_settings()
182 import_settings(section='jobmanager') 186 import_settings(section='jobmanager')
183 187
184 # If this manager was passed explicit queues, favor those. 188 # If this manager was passed explicit options, favor those
185 if self.queues: 189 if self.queues:
186 conf.QUEUES = self.queues 190 conf.QUEUES = self.queues
187 191
188 self.start(addr=conf.WORKER_ADDR, queues=self.queues or conf.QUEUES) 192 if broker_addr:
193 conf.WORKER_ADDR = broker_addr
194
195 self.start(addr=conf.WORKER_ADDR,
196 queues=conf.QUEUES)
189 197
190 198
191def jobmanager_main(): 199def jobmanager_main():
diff --git a/eventmq/router.py b/eventmq/router.py
index e91d74f..7aa0d9b 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -18,6 +18,7 @@
18Routes messages to workers (that are in named queues). 18Routes messages to workers (that are in named queues).
19""" 19"""
20from copy import copy 20from copy import copy
21import json # deserialize queues in on_inform. should be refactored
21import logging 22import logging
22import signal 23import signal
23 24
@@ -29,7 +30,7 @@ from .utils.messages import (
29 fwd_emqp_router_message as fwdmsg, 30 fwd_emqp_router_message as fwdmsg,
30 parse_router_message 31 parse_router_message
31) 32)
32from .utils import zero_index_cmp 33from .utils import tuplify, zero_index_cmp
33from .utils.settings import import_settings 34from .utils.settings import import_settings
34from .utils.devices import generate_device_name 35from .utils.devices import generate_device_name
35from .utils.timeutils import monotonic, timestamp 36from .utils.timeutils import monotonic, timestamp
@@ -76,8 +77,8 @@ class Router(HeartbeatMixin):
76 #: here. 77 #: here.
77 #: 78 #:
78 #: **Keys** 79 #: **Keys**
79 #: * ``queues``: list() of queues the worker belongs to. The highest 80 #: * ``queues``: list() of queue names and prioritiess the worker
80 # priority queue should come first. 81 #: belongs to. e.g. (10, 'default')
81 #: * ``hb``: monotonic timestamp of the last received message from 82 #: * ``hb``: monotonic timestamp of the last received message from
82 #: worker 83 #: worker
83 #: * ``available_slots``: int count of jobs this manager can still 84 #: * ``available_slots``: int count of jobs this manager can still
@@ -246,10 +247,10 @@ class Router(HeartbeatMixin):
246 queue_names = msg[0] 247 queue_names = msg[0]
247 client_type = msg[1] 248 client_type = msg[1]
248 249
249 if not queue_names: 250 if not queue_names: # Ideally, this matches some workers
250 queues = ('default', ) 251 queues = [(conf.DEFAULT_QUEUE_WEIGHT, conf.DEFAULT_QUEUE_NAME), ]
251 else: 252 else:
252 queues = queue_names.split(',') 253 queues = list(map(tuplify, json.loads(queue_names)))
253 254
254 logger.info('Received INFORM request from {} (type: {})'.format( 255 logger.info('Received INFORM request from {} (type: {})'.format(
255 sender, client_type)) 256 sender, client_type))
@@ -274,16 +275,17 @@ class Router(HeartbeatMixin):
274 msgid (str): Unique identifier for this message 275 msgid (str): Unique identifier for this message
275 msg: The actual message that was sent 276 msg: The actual message that was sent
276 """ 277 """
278 queue_names = self.workers[sender]['queues']
279
277 # if there are waiting messages for the queues this worker is a member 280 # if there are waiting messages for the queues this worker is a member
278 # of, then reply back with the oldest waiting message, otherwise just 281 # of, then reply back with the oldest waiting message, otherwise just
279 # add the worker to the list of available workers. 282 # add the worker to the list of available workers.
280 # Note: This is only taking into account the queue the worker is 283 # Note: This is only taking into account the queue the worker is
281 # returning from, and not other queue_names that might have had 284 # returning from, and not other queue_names that might have had
282 # messages waiting even longer. 285 # messages waiting even longer.
283 queue_names = self.workers[sender]['queues']
284
285 # Assumes the highest priority queue comes first 286 # Assumes the highest priority queue comes first
286 for queue_name in queue_names: 287 for queue in queue_names:
288 queue_name = queue[1]
287 if queue_name in self.waiting_messages.keys(): 289 if queue_name in self.waiting_messages.keys():
288 logger.debug('Found waiting message in the %s waiting_messages' 290 logger.debug('Found waiting message in the %s waiting_messages'
289 ' queue' % queue_name) 291 ' queue' % queue_name)
@@ -292,12 +294,13 @@ class Router(HeartbeatMixin):
292 fwdmsg(self.outgoing, sender, msg) 294 fwdmsg(self.outgoing, sender, msg)
293 295
294 # It is easier to check if a key exists rather than the len of 296 # It is easier to check if a key exists rather than the len of
295 # a key if it exists elsewhere, so if that was the last message 297 # a key's value if it exists elsewhere, so if that was the last
296 # remove the queue 298 # message remove the queue
297 if len(self.waiting_messages[queue_name]) == 0: 299 if len(self.waiting_messages[queue_name]) == 0:
298 logger.debug('No more messages in waiting_messages queue ' 300 logger.debug('No more messages in waiting_messages queue '
299 '%s. Removing from list...' % queue_name) 301 '%s. Removing from list...' % queue_name)
300 del self.waiting_messages[queue_name] 302 del self.waiting_messages[queue_name]
303
301 # the message has been forwarded so short circuit that way the 304 # the message has been forwarded so short circuit that way the
302 # manager isn't reslotted 305 # manager isn't reslotted
303 return 306 return
@@ -392,17 +395,9 @@ class Router(HeartbeatMixin):
392 conf.HEARTBEAT_TIMEOUT)) 395 conf.HEARTBEAT_TIMEOUT))
393 396
394 # Remove the worker from the actual queues 397 # Remove the worker from the actual queues
395 for i in range(0, len(self.workers[worker_id]['queues'])): 398 for queue in self.workers[worker_id]['queues']:
396 if i == 0:
397 priority = 10
398 else:
399 priority = 0
400
401 queue = self.workers[worker_id]['queues'][i]
402
403 try: 399 try:
404 self.queues[queue].remove((priority, worker_id)) 400 self.queues[queue[1]].remove((queue[0], worker_id))
405 break
406 except KeyError: 401 except KeyError:
407 # This queue disappeared for some reason 402 # This queue disappeared for some reason
408 continue 403 continue
@@ -426,6 +421,10 @@ class Router(HeartbeatMixin):
426 raise TypeError('type of `queue` parameter not one of (list, ' 421 raise TypeError('type of `queue` parameter not one of (list, '
427 'tuple). got {}'.format(type(queues))) 422 'tuple). got {}'.format(type(queues)))
428 423
424 if worker_id in self.workers:
425 logger.warning('Worker id already found in `workers`. Overwriting '
426 'data')
427
429 # Add the worker to our worker dict 428 # Add the worker to our worker dict
430 self.workers[worker_id] = {} 429 self.workers[worker_id] = {}
431 self.workers[worker_id]['queues'] = tuple(queues) 430 self.workers[worker_id]['queues'] = tuple(queues)
@@ -433,21 +432,15 @@ class Router(HeartbeatMixin):
433 self.workers[worker_id]['available_slots'] = 0 432 self.workers[worker_id]['available_slots'] = 0
434 433
435 # Define priorities. First element is the highest priority 434 # Define priorities. First element is the highest priority
436 for i in range(len(queues)): 435 for q in queues:
437 if i == 0: 436 if q[1] not in self.queues:
438 priority = 10 437 self.queues[q[1]] = list()
439 else:
440 priority = 0
441 438
442 if queues[i] not in self.queues: 439 self.queues[q[1]].append((q[0], worker_id))
443 self.queues[queues[i]] = EMQdeque() 440 self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]])
444 441
445 self.queues[queues[i]].append((priority, worker_id))
446
447 self.queues[queues[i]] = \
448 self.prioritize_queue_list(self.queues[queues[i]])
449 logger.debug('Added worker {} to the queues {}'.format( 442 logger.debug('Added worker {} to the queues {}'.format(
450 worker_id, queues)) 443 worker_id, queues))
451 444
452 def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): 445 def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME):
453 """ 446 """
@@ -480,15 +473,19 @@ class Router(HeartbeatMixin):
480 # pop the next job manager id & check if it has a worker slot 473 # pop the next job manager id & check if it has a worker slot
481 # if it doesn't add it to popped_workers to be added back to 474 # if it doesn't add it to popped_workers to be added back to
482 # self.queues after the loop 475 # self.queues after the loop
483 worker = self.queues[queue_name].popleft() 476 worker = self.queues[queue_name].pop(0)
477
484 # LRU when sorted later by appending 478 # LRU when sorted later by appending
485 popped_workers.append(worker) 479 popped_workers.append(worker)
480
486 if self.workers[worker[1]]['available_slots'] > 0: 481 if self.workers[worker[1]]['available_slots'] > 0:
487 worker_addr = worker[1] 482 worker_addr = worker[1]
488 break 483 break
484
489 except KeyError: 485 except KeyError:
490 # This should only happen if worker[1] is missing: 486 # This should only happen if worker[1] is missing 1 from
491 # - available slots is pre set to 0 self.add_worker 487 # self.workers because:
488 # - available slots initialized to 0 self.add_worker()
492 # - we already checked that self.queues[queue_name] exists 489 # - we already checked that self.queues[queue_name] exists
493 logger.error("Worker {} not found for queue {}".format( 490 logger.error("Worker {} not found for queue {}".format(
494 worker, queue_name)) 491 worker, queue_name))
@@ -497,14 +494,16 @@ class Router(HeartbeatMixin):
497 worker, queue_name 494 worker, queue_name
498 )) 495 ))
499 continue 496 continue
497
500 except IndexError: 498 except IndexError:
501 # worker[1] should exist if it follows the (priority, id) fmt 499 # worker[1] should exist if it follows the (priority, id) fmt
502 logger.error("Invalid worker format in self.queues {}".format( 500 logger.error("Invalid priority/worker format in self.queues "
503 worker 501 "{}".format(worker))
504 ))
505 continue 502 continue
503 else:
504 # No more queues to try
505 pass
506 506
507 # Should always evaluate to true
508 if popped_workers: 507 if popped_workers:
509 self.queues[queue_name].extend(popped_workers) 508 self.queues[queue_name].extend(popped_workers)
510 self.queues[queue_name] = self.prioritize_queue_list( 509 self.queues[queue_name] = self.prioritize_queue_list(
@@ -553,12 +552,6 @@ class Router(HeartbeatMixin):
553 """ 552 """
554 self.workers[worker_id]['available_slots'] += 1 553 self.workers[worker_id]['available_slots'] += 1
555 554
556 def queue_message(self, msg):
557 """
558 Add a message to the queue for processing later
559 """
560 raise NotImplementedError()
561
562 def process_client_message(self, original_msg, depth=0): 555 def process_client_message(self, original_msg, depth=0):
563 """ 556 """
564 Args: 557 Args:
@@ -691,13 +684,9 @@ class Router(HeartbeatMixin):
691 IndexError - There was no 0-index element. 684 IndexError - There was no 0-index element.
692 685
693 Returns: 686 Returns:
694 sorted :class:`EMQdeque` with largest priorites being indexed 687 decsending order list. E.g. ((20, 'a'), (14, 'b'), (12, 'c'))
695 smaller. E.g. ((20,'a' ), (14, 'b'), ('12', c))
696 """ 688 """
697 return EMQdeque( 689 return sorted(unprioritized_iterable, cmp=zero_index_cmp, reverse=True)
698 initial=sorted(unprioritized_iterable,
699 cmp=zero_index_cmp,
700 reverse=True))
701 690
702 def sighup_handler(self, signum, frame): 691 def sighup_handler(self, signum, frame):
703 """ 692 """
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 37014cb..6d3c663 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -23,8 +23,6 @@ ADDR = 'inproc://pour_the_rice_in_the_thing'
23 23
24 24
25class TestCase(unittest.TestCase): 25class TestCase(unittest.TestCase):
26 jm = None
27
28 def test__setup(self): 26 def test__setup(self):
29 jm = jobmanager.JobManager(name='RuckusBringer') 27 jm = jobmanager.JobManager(name='RuckusBringer')
30 self.assertEqual(jm.name, 'RuckusBringer') 28 self.assertEqual(jm.name, 'RuckusBringer')
@@ -62,8 +60,8 @@ class TestCase(unittest.TestCase):
62 60
63 jm._start_event_loop() 61 jm._start_event_loop()
64 62
65 # send conf.WORKERS ready messages 63 # send int(conf.CONCURRENT_JOBS) ready messages
66 self.assertEqual(conf.WORKERS, send_ready_mock.call_count) 64 self.assertEqual(conf.CONCURRENT_JOBS, send_ready_mock.call_count)
67 65
68 process_msg_mock.assert_called_with( 66 process_msg_mock.assert_called_with(
69 sender_mock.return_value) 67 sender_mock.return_value)
@@ -86,6 +84,7 @@ class TestCase(unittest.TestCase):
86 callback=jm.worker_done, 84 callback=jm.worker_done,
87 func=run_mock) 85 func=run_mock)
88 86
87# Other Tests
89 @mock.patch('eventmq.jobmanager.JobManager.start') 88 @mock.patch('eventmq.jobmanager.JobManager.start')
90 @mock.patch('eventmq.jobmanager.import_settings') 89 @mock.patch('eventmq.jobmanager.import_settings')
91 @mock.patch('eventmq.jobmanager.Sender.rebuild') 90 @mock.patch('eventmq.jobmanager.Sender.rebuild')
@@ -111,7 +110,7 @@ class TestCase(unittest.TestCase):
111 @mock.patch('eventmq.jobmanager.import_settings') 110 @mock.patch('eventmq.jobmanager.import_settings')
112 @mock.patch('eventmq.jobmanager.setup_logger') 111 @mock.patch('eventmq.jobmanager.setup_logger')
113 def test_jobmanager_main(self, setup_logger_mock, import_settings_mock, 112 def test_jobmanager_main(self, setup_logger_mock, import_settings_mock,
114 start_mock): 113 start_mock):
115 jm = jobmanager.JobManager() 114 jm = jobmanager.JobManager()
116 115
117 jm.jobmanager_main() 116 jm.jobmanager_main()
@@ -125,7 +124,7 @@ class TestCase(unittest.TestCase):
125 start_mock.assert_called_with(addr=conf.WORKER_ADDR, 124 start_mock.assert_called_with(addr=conf.WORKER_ADDR,
126 queues=conf.QUEUES) 125 queues=conf.QUEUES)
127 126
128 jm.queues = ('derp', 'blurp') 127 jm.queues = ((10, 'derp'), (0, 'blurp'))
129 jm.jobmanager_main() 128 jm.jobmanager_main()
130 129
131 start_mock.assert_called_with(addr=conf.WORKER_ADDR, 130 start_mock.assert_called_with(addr=conf.WORKER_ADDR,
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index 6349159..82fe0b5 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -44,33 +44,77 @@ class TestCase(unittest.TestCase):
44 44
45 @mock.patch('eventmq.router.Router.send_ack') 45 @mock.patch('eventmq.router.Router.send_ack')
46 @mock.patch('eventmq.router.Router.add_worker') 46 @mock.patch('eventmq.router.Router.add_worker')
47 def test_on_inform_worker_defaut_queue(self, add_worker_mock, 47 def test_on_inform_worker(self, add_worker_mock, send_ack_mock):
48 send_ack_mock): 48 sender_id = 'omgsenderid19'
49 queues = '[[32, "top"], [23, "drop"], [12, "shop"]]'
50 inform_msgid = 'msg31'
51
52 self.router.on_inform(
53 sender_id, inform_msgid, [queues, 'worker'])
54
55 self.router.send_ack.assert_called_with(
56 self.router.outgoing, sender_id, inform_msgid)
57
58 self.router.add_worker.assert_called_with(
59 sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')])
60
61 @mock.patch('eventmq.router.Router.send_ack')
62 @mock.patch('eventmq.router.Router.add_worker')
63 def test_on_inform_worker_default_queue(self, add_worker_mock,
64 send_ack_mock):
65 # Test on_inform when no queue is specified
49 sender_id = 'omgsender18' 66 sender_id = 'omgsender18'
50 queues = '' 67 queues = ''
51 inform_msgid = 'msg29' 68 inform_msgid = 'msg29'
52 69
53 self.router.on_inform(sender_id, inform_msgid, 70 self.router.on_inform(
54 [queues, 'worker']) 71 sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker])
55 72
56 self.router.send_ack.assert_called_with(self.router.outgoing, 73 self.router.send_ack.assert_called_with(
57 sender_id, inform_msgid) 74 self.router.outgoing, sender_id, inform_msgid)
58 self.router.add_worker.assert_called_with(sender_id, ('default',)) 75 self.router.add_worker.assert_called_with(
76 sender_id, [(10, 'default'), ])
59 77
60 @mock.patch('eventmq.router.Router.send_ack') 78 # @mock.patch('eventmq.router.Router.prioritize_queue_list')
61 @mock.patch('eventmq.router.Router.add_worker') 79 def test_add_worker(self):
62 def test_on_inform_worker(self, add_worker_mock, send_ack_mock): 80 worker1_id = 'w1'
63 sender_id = 'omgsenderid19' 81 worker2_id = 'w2'
64 queues = 'top,drop,shop'
65 inform_msgid = 'msg31'
66 82
67 self.router.on_inform(sender_id, inform_msgid, 83 queues1 = [(10, 'top'), (9, 'drop'), (8, 'shop')]
68 [queues, 'worker']) 84 queues2 = [(10, 'default'), (9, 'shop'), (8, 'top')]
69 85
70 self.router.send_ack.assert_called_with(self.router.outgoing, 86 self.router.add_worker(worker1_id, queues=queues1)
71 sender_id, inform_msgid) 87 self.router.add_worker(worker2_id, queues=queues2)
72 self.router.add_worker.assert_called_with(sender_id, 88 # added to the list of workers
73 queues.split(',')) 89 self.assertIn(worker1_id, self.router.workers)
90 self.assertIn(worker2_id, self.router.workers)
91 self.assertGreater(self.router.workers[worker1_id]['hb'], 0)
92 # no slots yet
93 self.assertEqual(self.router.workers[worker1_id]['available_slots'], 0)
94
95 # aware of the queues
96 self.assertEqual(3, len(self.router.workers[worker1_id]['queues']))
97 self.assertIn((10, 'top'), self.router.workers[worker1_id]['queues'])
98 self.assertIn((9, 'drop'), self.router.workers[worker1_id]['queues'])
99 self.assertIn((8, 'shop'), self.router.workers[worker1_id]['queues'])
100
101 # Worker2
102 self.assertIn((10, 'default'),
103 self.router.workers[worker2_id]['queues'])
104 self.assertIn((9, 'shop'), self.router.workers[worker2_id]['queues'])
105 self.assertIn((8, 'top'), self.router.workers[worker2_id]['queues'])
106
107 self.assertIn((10, worker1_id), self.router.queues['top'])
108 self.assertIn((9, worker1_id), self.router.queues['drop'])
109 self.assertIn((8, worker1_id), self.router.queues['shop'])
110
111 self.assertIn((10, worker2_id), self.router.queues['default'])
112 self.assertIn((9, worker2_id), self.router.queues['shop'])
113 self.assertIn((8, worker2_id), self.router.queues['top'])
114
115 def test_add_worker_invalid_queues(self):
116 with self.assertRaises(TypeError):
117 self.router.add_worker('83902', 8902)
74 118
75 @mock.patch('eventmq.utils.messages.generate_msgid') 119 @mock.patch('eventmq.utils.messages.generate_msgid')
76 def test_send_ack(self, generate_msgid_mock): 120 def test_send_ack(self, generate_msgid_mock):
@@ -115,12 +159,12 @@ class TestCase(unittest.TestCase):
115 self.assertEqual(self.router._meta['last_sent_heartbeat'], 0) 159 self.assertEqual(self.router._meta['last_sent_heartbeat'], 0)
116 self.router.workers = { 160 self.router.workers = {
117 'w1': { 161 'w1': {
118 'queues': ['default', ], 162 'queues': [(10, 'default'), ],
119 'hb': 123.2, 163 'hb': 123.2,
120 'available_slots': 3, 164 'available_slots': 3,
121 }, 165 },
122 'w2': { 166 'w2': {
123 'queues': ['default', ], 167 'queues': [(10, 'not-default'), ],
124 'hb': 123.2, 168 'hb': 123.2,
125 'available_slots': 2, 169 'available_slots': 2,
126 } 170 }
@@ -131,10 +175,27 @@ class TestCase(unittest.TestCase):
131 # is very hard to mock) 175 # is very hard to mock)
132 self.assertGreater(self.router._meta['last_sent_heartbeat'], 0) 176 self.assertGreater(self.router._meta['last_sent_heartbeat'], 0)
133 177
134 self.router.send_heartbeat.assert_has_calls( 178 send_heartbeat_mock.assert_has_calls(
135 [mock.call(self.router.outgoing, 'w1'), 179 [mock.call(self.router.outgoing, 'w1'),
136 mock.call(self.router.outgoing, 'w2')], any_order=True) 180 mock.call(self.router.outgoing, 'w2')], any_order=True)
137 181
182 @mock.patch('eventmq.router.Router.send_heartbeat')
183 def test_send_schedulers_heartbeats(self, send_hb_mock):
184 scheduler_id = 's39'
185 self.assertEqual(self.router._meta['last_sent_scheduler_heartbeat'], 0)
186
187 self.router.schedulers = {
188 scheduler_id: {
189 'hb': 0,
190 }
191 }
192
193 self.router.send_schedulers_heartbeats()
194
195 self.assertGreater(
196 self.router._meta['last_sent_scheduler_heartbeat'], 0)
197 send_hb_mock.assert_called_with(self.router.incoming, scheduler_id)
198
138 def test_on_disconnect(self): 199 def test_on_disconnect(self):
139 self.assertFalse(self.router.received_disconnect) 200 self.assertFalse(self.router.received_disconnect)
140 self.router.on_disconnect('msg1', 'derp') 201 self.router.on_disconnect('msg1', 'derp')
@@ -152,14 +213,14 @@ class TestCase(unittest.TestCase):
152 213
153 self.router.workers = { 214 self.router.workers = {
154 worker_id: { 215 worker_id: {
155 'queues': ['default', ], 216 'queues': [(10, 'default'), ],
156 'hb': 123.2, 217 'hb': 123.2,
157 'available_slots': 3, 218 'available_slots': 3,
158 }, 219 },
159 } 220 }
160 221
161 self.router.waiting_messages['default'] = EMQdeque( 222 self.router.waiting_messages['default'] = EMQdeque(
162 initial=[waiting_msg]) 223 initial=[waiting_msg, ])
163 224
164 self.router.on_ready(worker_id, msgid, msg) 225 self.router.on_ready(worker_id, msgid, msg)
165 226
@@ -171,8 +232,10 @@ class TestCase(unittest.TestCase):
171 232
172 @mock.patch('eventmq.router.fwdmsg') 233 @mock.patch('eventmq.router.fwdmsg')
173 @mock.patch('eventmq.router.Router.requeue_worker') 234 @mock.patch('eventmq.router.Router.requeue_worker')
174 def test_on_ready_prioritized_queue(self, requeue_worker_mock, 235 def test_on_ready_multpile_queues(self, requeue_worker_mock,
175 fwdmsg_mock): 236 fwdmsg_mock):
237 # Test that if messages are waiting on multiple queues, they are
238 # dispatched immediatly after a READY message.
176 worker1_id = 'w1' 239 worker1_id = 'w1'
177 worker2_id = 'w2' 240 worker2_id = 'w2'
178 241
@@ -188,53 +251,61 @@ class TestCase(unittest.TestCase):
188 251
189 self.router.workers = { 252 self.router.workers = {
190 worker1_id: { 253 worker1_id: {
191 'queues': ['kun', 'blu'], 254 'queues': [(10, 'kun'), (0, 'blu')],
192 'hb': 123.2, 255 'hb': 123.2,
193 'available_slots': 0, 256 'available_slots': 0,
194 }, 257 },
195 worker2_id: { 258 worker2_id: {
196 'queues': ['blu', 'kun'], 259 'queues': [(10, 'blu'), (0, 'kun')],
197 'hb': 123.2, 260 'hb': 123.2,
198 'available_slots': 0 261 'available_slots': 0
199 } 262 }
200 } 263 }
201 264
202 self.router.queues = { 265 self.router.queues = {
203 'kun': EMQdeque(initial=[(10, worker1_id), (0, worker2_id)]), 266 'kun': [(10, worker1_id), (0, worker2_id)],
204 'blu': EMQdeque(initial=[(10, worker2_id), (0, worker1_id)]) 267 'blu': [(10, worker2_id), (0, worker1_id)],
205
206 } 268 }
207 269
208 self.router.waiting_messages = { 270 self.router.waiting_messages = {
209 'kun': EMQdeque(initial=[waiting_msg1, waiting_msg2]), 271 'kun': EMQdeque(initial=[waiting_msg1, waiting_msg2]),
210 'blu': EMQdeque(initial=[waiting_msg3]) 272 'blu': EMQdeque(initial=[waiting_msg3, ]),
211 } 273 }
212 274
275 # Forward waiting_msg1
213 ready_msgid1 = 'ready23' 276 ready_msgid1 = 'ready23'
214 self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) 277 self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1])
215 fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id, 278 fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id,
216 waiting_msg1) 279 waiting_msg1)
217 280
218 ready_msgid2 = 'ready19' 281 # Forward waiting_msg3 -- blu is a higher priority for worker2
219 self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) 282 ready_msgid3 = 'ready19'
283 self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3])
220 fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, 284 fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id,
221 waiting_msg3) 285 waiting_msg3)
222 286
223 ready_msgid3 = 'ready5' 287 # Forward waiting_msg2
224 self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3]) 288 ready_msgid2 = 'ready5'
289 self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2])
225 fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, 290 fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id,
226 waiting_msg2) 291 waiting_msg2)
227 292
293 # There should be no keys because the code checks for their existence
294 # to know if there is a waiting message
295 self.assertEqual(0, len(self.router.waiting_messages.keys()))
296
297 # No waiting messages
228 self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) 298 self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1])
229 requeue_worker_mock.assert_called_with(worker1_id) 299 requeue_worker_mock.assert_called_with(worker1_id)
230 self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) 300 self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2])
231 requeue_worker_mock.assert_called_with(worker2_id) 301 requeue_worker_mock.assert_called_with(worker2_id)
232 302
303 @mock.patch('eventmq.router.Router.clean_up_dead_workers')
233 @mock.patch('eventmq.router.Router.process_client_message') 304 @mock.patch('eventmq.router.Router.process_client_message')
234 @mock.patch('eventmq.router.Router.get_available_worker') 305 @mock.patch('eventmq.router.Router.get_available_worker')
235 @mock.patch('eventmq.router.fwdmsg') 306 @mock.patch('eventmq.router.fwdmsg')
236 def test_on_request(self, fwdmsg_mock, get_worker_mock, 307 def test_on_request(self, fwdmsg_mock, get_worker_mock,
237 process_client_msg_mock): 308 process_client_msg_mock, cleanupworkers_mock):
238 client_id = 'c1' 309 client_id = 'c1'
239 msgid = 'msg18' 310 msgid = 'msg18'
240 queue = 'default' 311 queue = 'default'
@@ -245,13 +316,13 @@ class TestCase(unittest.TestCase):
245 316
246 self.router.workers = { 317 self.router.workers = {
247 worker_id: { 318 worker_id: {
248 'queues': EMQdeque(initial=(queue,)), 319 'queues': [(10, queue)],
249 'hb': 2903.34, 320 'hb': 2903.34,
250 'available_slots': 1, 321 'available_slots': 1,
251 } 322 }
252 } 323 }
253 self.router.queues = { 324 self.router.queues = {
254 'default': EMQdeque(initial=((10, worker_id))) 325 queue: [(10, worker_id),]
255 } 326 }
256 327
257 # Router accepts job for 1 available slot 328 # Router accepts job for 1 available slot
@@ -268,7 +339,7 @@ class TestCase(unittest.TestCase):
268 get_worker_mock.side_effect = raise_no_workers 339 get_worker_mock.side_effect = raise_no_workers
269 self.router.on_request(client_id, msgid, msg) 340 self.router.on_request(client_id, msgid, msg)
270 341
271 self.assertIn(msg[0], self.router.waiting_messages) 342 self.assertIn(queue, self.router.waiting_messages)
272 self.assertEqual(list(self.router.waiting_messages[queue])[0], 343 self.assertEqual(list(self.router.waiting_messages[queue])[0],
273 ['', constants.PROTOCOL_VERSION, 'REQUEST', 344 ['', constants.PROTOCOL_VERSION, 'REQUEST',
274 msgid] + msg) 345 msgid] + msg)
@@ -286,14 +357,85 @@ class TestCase(unittest.TestCase):
286 [client_id, '', constants.PROTOCOL_VERSION, 'REQUEST', msgid]+msg, 357 [client_id, '', constants.PROTOCOL_VERSION, 'REQUEST', msgid]+msg,
287 depth=2) 358 depth=2)
288 359
289 def test_cleanup_dead_workers(self): 360 def test_get_available_worker(self):
361 worker2_id = 'w2'
362 worker3_id = 'w3'
363
364 queue1_id = 'default'
365 queue2_id = 'jimjam'
366
367 self.router.queues = {
368 queue1_id: [(10, worker3_id), (0, worker2_id)],
369 queue2_id: [(10, worker2_id)],
370 }
371
372 self.router.workers = {
373 worker2_id: {
374 'queues': [(10, queue2_id), (0, queue1_id)],
375 'available_slots': 1,
376 },
377 worker3_id: {
378 'queues': [(10, queue1_id), ],
379 'available_slots': 1,
380 },
381 }
382
383 # Get the next available worker for queue2
384 check1 = self.router.get_available_worker(queue_name=queue2_id)
385 self.assertEqual(worker2_id, check1)
386
387 # Get the next available worker for queue1
388 check2 = self.router.get_available_worker(queue_name=queue1_id)
389 self.assertEqual(worker3_id, check2)
390
391 # Pretend worker 3 is doing something
392 self.router.workers[worker3_id]['available_slots'] = 0
393
394 # Get the next available worker for queue1
395 check3 = self.router.get_available_worker(queue_name=queue1_id)
396 self.assertEqual(worker2_id, check3)
397
398 def test_get_available_worker_dont_decrement_slots(self):
399 # Once upon a time get_available_worker() decremented the available
400 # slots counter and the townsfolk greived
401 queue1_id = 'q1'
402 worker1_id = 'w1'
403
404 self.router.queues = {
405 queue1_id: [(10, worker1_id, ), ]
406 }
407
408 self.router.workers = {
409 worker1_id: {
410 'queues': [(10, queue1_id), ],
411 'available_slots': 1,
412 }
413 }
414
415 self.router.get_available_worker(queue_name=queue1_id)
416
417 self.assertEqual(self.router.workers[worker1_id]['available_slots'], 1)
418
419 def test_requeue_worker(self):
420 worker_id = 'w1'
421
422 self.router.workers = {
423 worker_id: {
424 'available_slots': 1
425 }
426 }
427
428 self.router.requeue_worker(worker_id)
429 self.assertEqual(self.router.workers[worker_id]['available_slots'], 2)
430
431 def test_clean_up_dead_workers(self):
290 worker1_id = 'w1' 432 worker1_id = 'w1'
291 worker2_id = 'w2' 433 worker2_id = 'w2'
292 worker3_id = 'w3' 434 worker3_id = 'w3'
293 435
294 queue1_id = 'default' 436 queue1_id = 'default'
295 queue2_id = 'jimjam' 437 queue2_id = 'jimjam'
296 nonexistent_queue1 = 'pig' 438 nonexistent_queue1 = 'nonexistent'
297 439
298 # To ensure the value was changed later because monotonic() is hard to 440 # To ensure the value was changed later because monotonic() is hard to
299 # mock 441 # mock
@@ -307,21 +449,21 @@ class TestCase(unittest.TestCase):
307 } 449 }
308 450
309 self.router.workers = { 451 self.router.workers = {
310 # 1 second away from timeout 452 # 3 in the future
311 worker1_id: { 453 worker1_id: {
312 'queues': (queue1_id,), 454 'queues': [(10, queue1_id), ],
313 'hb': monotonic() - conf.HEARTBEAT_TIMEOUT + 1, 455 'hb': monotonic() + 3,
314 'available_slots': 0, 456 'available_slots': 0,
315 }, 457 },
316 # below the timeout 458 # below the timeout
317 worker2_id: { 459 worker2_id: {
318 'queues': (queue2_id, queue1_id), 460 'queues': [(10, queue2_id), (0, queue1_id)],
319 'hb': 0, 461 'hb': 0,
320 'available_slots': 2, 462 'available_slots': 2,
321 }, 463 },
322 # below the timeout and a queue missing from self.router.queues 464 # below the timeout and a queue missing from self.router.queues
323 worker3_id: { 465 worker3_id: {
324 'queues': (queue2_id, nonexistent_queue1), 466 'queues': [(10, queue2_id), (3, nonexistent_queue1)],
325 'hb': 0, 467 'hb': 0,
326 'available_slots': 0, 468 'available_slots': 0,
327 }, 469 },
@@ -337,74 +479,6 @@ class TestCase(unittest.TestCase):
337 self.assertNotIn(queue2_id, self.router.queues) 479 self.assertNotIn(queue2_id, self.router.queues)
338 self.assertNotIn(nonexistent_queue1, self.router.queues) 480 self.assertNotIn(nonexistent_queue1, self.router.queues)
339 481
340 # @mock.patch('eventmq.router.Router.prioritize_queue_list')
341 def test_add_worker(self):
342 worker1_id = 'w1'
343 queues = ('top', 'drop', 'shop')
344
345 self.router.add_worker(worker1_id, queues=queues)
346 # added to the list of workers
347 self.assertIn(worker1_id, self.router.workers)
348 # got an inital heartbeat
349 self.assertGreater(self.router.workers[worker1_id]['hb'], 0)
350 # no slots yet
351 self.assertEqual(self.router.workers[worker1_id]['available_slots'], 0)
352 # aware of the queues
353 self.assertEqual(3, len(self.router.workers[worker1_id]['queues']))
354 self.assertIn((10, worker1_id), list(self.router.queues['top']))
355 self.assertIn((0, worker1_id), list(self.router.queues['drop']))
356 self.assertIn((0, worker1_id), list(self.router.queues['shop']))
357
358 def test_get_available_worker(self):
359 worker2_id = 'w2'
360 worker3_id = 'w3'
361
362 queue1_id = 'default'
363 queue2_id = 'jimjam'
364
365 self.router.queues = {
366 queue1_id: EMQdeque(initial=[(10, worker3_id), (0, worker2_id)]),
367 queue2_id: EMQdeque(initial=[(10, worker2_id)]),
368 }
369
370 self.router.workers = {
371 worker2_id: {
372 'queues': (queue2_id, queue1_id),
373 'available_slots': 1,
374 },
375 worker3_id: {
376 'queues': (queue1_id,),
377 'available_slots': 1,
378 },
379 }
380
381 # worker1 has no available slots.
382 check1 = self.router.get_available_worker(queue_name=queue2_id)
383 self.assertEqual(worker2_id, check1)
384 self.assertEqual(self.router.workers[worker2_id]['available_slots'], 1)
385
386 check2 = self.router.get_available_worker(queue_name=queue1_id)
387 self.assertEqual(worker3_id, check2)
388 self.assertEqual(self.router.workers[worker3_id]['available_slots'], 1)
389
390 self.router.workers[worker3_id]['available_slots'] = 0
391
392 check3 = self.router.get_available_worker(queue_name=queue1_id)
393 self.assertEqual(worker2_id, check3)
394 self.assertEqual(self.router.workers[worker2_id]['available_slots'], 1)
395
396 def test_requeue_worker(self):
397 worker_id = 'w1'
398
399 self.router.workers = {
400 worker_id: {
401 'available_slots': 1
402 }
403 }
404
405 self.router.requeue_worker(worker_id)
406 self.assertEqual(self.router.workers[worker_id]['available_slots'], 2)
407
408 @mock.patch('eventmq.router.Router.on_inform') 482 @mock.patch('eventmq.router.Router.on_inform')
409 @mock.patch('eventmq.router.Router.on_request') 483 @mock.patch('eventmq.router.Router.on_request')
410 @mock.patch('eventmq.router.parse_router_message') 484 @mock.patch('eventmq.router.parse_router_message')
@@ -457,37 +531,35 @@ class TestCase(unittest.TestCase):
457 on_inform_mock.assert_called_with(sender_id, msgid, msg) 531 on_inform_mock.assert_called_with(sender_id, msgid, msg)
458 532
459 def test_prioritize_queue_list(self): 533 def test_prioritize_queue_list(self):
460 queue = EMQdeque(initial=[(0, 'd'), (10, 'b'), (0, 'e'), (10, 'a'), 534 queue = [(0, 'd'), (10, 'b'), (0, 'e'), (10, 'a'), (0, 'c')]
461 (0, 'c')])
462 535
463 sorted1 = self.router.prioritize_queue_list(queue) 536 sorted1 = self.router.prioritize_queue_list(queue)
464 self.assertEqual([(10, 'b'), (10, 'a'), (0, 'd'), (0, 'e'), 537 self.assertEqual([(10, 'b'), (10, 'a'), (0, 'd'), (0, 'e'),
465 (0, 'c')], list(sorted1)) 538 (0, 'c')], sorted1)
466 539
467 pop1 = sorted1.popleft() 540 pop1 = sorted1.pop(0)
468 self.assertEqual(pop1, (10, 'b')) 541 self.assertEqual(pop1, (10, 'b'))
469 sorted1.append(pop1) 542 sorted1.append(pop1)
470 # a, b, d, e, c 543 # a, b, d, e, c
471 sorted2 = self.router.prioritize_queue_list(sorted1) 544 sorted2 = self.router.prioritize_queue_list(sorted1)
472 self.assertEqual([(10, 'a'), (10, 'b'), (0, 'd'), (0, 'e'), (0, 'c')], 545 self.assertEqual([(10, 'a'), (10, 'b'), (0, 'd'), (0, 'e'), (0, 'c')],
473 list(sorted2)) 546 sorted2)
474 pop2 = sorted2.popleft() # a 547 pop2 = sorted2.pop(0) # a
475 pop3 = sorted2.popleft() # b 548 pop3 = sorted2.pop(0) # b
476 pop4 = sorted2.popleft() # d 549 pop4 = sorted2.pop(0) # d
477 self.assertEqual(pop4, (0, 'd')) 550 self.assertEqual(pop4, (0, 'd'))
478 self.assertEqual(pop2, (10, 'a')) 551 self.assertEqual(pop2, (10, 'a'))
479 self.assertEqual(pop3, (10, 'b')) 552 self.assertEqual(pop3, (10, 'b'))
480 self.assertEqual([(0, 'e'), (0, 'c')], list(sorted2)) 553 self.assertEqual([(0, 'e'), (0, 'c')], list(sorted2))
481 554
482 sorted2.appendleft(pop2) 555 sorted2.append(pop2)
483 sorted2.appendleft(pop4) 556 sorted2.append(pop4)
484 sorted2.appendleft(pop3) 557 sorted2.append(pop3)
485 558
486 # a, b, d, e, c
487 sorted3 = self.router.prioritize_queue_list(sorted2) 559 sorted3 = self.router.prioritize_queue_list(sorted2)
488 560
489 self.assertEqual(sorted3.popleft(), (10, 'b')) 561 self.assertEqual(sorted3.pop(0), (10, 'a'))
490 self.assertEqual(sorted3.popleft(), (10, 'a')) 562 self.assertEqual(sorted3.pop(0), (10, 'b'))
491 self.assertEqual(sorted3.popleft(), (0, 'd')) 563 self.assertEqual(sorted3.pop(0), (0, 'e'))
492 self.assertEqual(sorted3.popleft(), (0, 'e')) 564 self.assertEqual(sorted3.pop(0), (0, 'c'))
493 self.assertEqual(sorted3.popleft(), (0, 'c')) 565 self.assertEqual(sorted3.pop(0), (0, 'd'))
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index ff3c238..f4669ea 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -18,6 +18,7 @@ import unittest
18 18
19import mock 19import mock
20 20
21from .. import constants
21from .. import exceptions 22from .. import exceptions
22from ..utils import messages 23from ..utils import messages
23from ..utils import classes 24from ..utils import classes
@@ -95,6 +96,75 @@ class SettingsTestCase(unittest.TestCase):
95 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') 96 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290')
96 97
97 98
99class EMQPServiceTestCase(unittest.TestCase):
100
101 # pretend to be an emq socket
102 outgoing = 'some-outgoing-socket'
103
104 def get_worker(self):
105 """return an EMQPService mimicking a worker"""
106 obj = classes.EMQPService()
107 obj.SERVICE_TYPE = constants.CLIENT_TYPE.worker
108 obj.outgoing = self.outgoing
109 obj._meta = {
110 'last_sent_heartbeat': 0
111 }
112
113 return obj
114
115 @mock.patch('eventmq.utils.classes.sendmsg')
116 def test_send_inform_return_msgid(self, sendmsg_mock):
117 obj = self.get_worker()
118 sendmsg_mock.return_value = 'some-msgid'
119
120 retval = obj.send_inform(queues=[(10, 'default'), ])
121
122 self.assertEqual(retval, sendmsg_mock.return_value)
123
124 @mock.patch('eventmq.utils.classes.sendmsg')
125 def test_send_inform_single_weightless_queue(self, sendmsg_mock):
126 # Test that the inform message is backward compatible with a change
127 # in v0.2.0
128 obj = self.get_worker()
129
130 obj.send_inform(queues='derpfault')
131
132 sendmsg_mock.assert_called_with(
133 'some-outgoing-socket', 'INFORM',
134 ['derpfault', constants.CLIENT_TYPE.worker]
135 )
136
137 @mock.patch('eventmq.utils.classes.sendmsg')
138 def test_send_inform_empty_queue_name(self, sendmsg_mock):
139 obj = self.get_worker()
140
141 obj.send_inform()
142
143 sendmsg_mock.assert_called_with(
144 'some-outgoing-socket', 'INFORM',
145 ['', constants.CLIENT_TYPE.worker])
146
147 @mock.patch('eventmq.utils.classes.sendmsg')
148 def test_send_inform_specified_valid_queues(self, sendmsg_mock):
149 obj = self.get_worker()
150
151 obj.send_inform(queues=([10, 'push'], [7, 'email'],
152 [3, 'default']))
153 sendmsg_mock.asert_called_with(
154 'some-outgoing-socket', 'INFORM',
155 ["[10, 'push'],[7, 'email'],[3, 'default]",
156 constants.CLIENT_TYPE.worker]
157 )
158
159 @mock.patch('eventmq.utils.classes.sendmsg')
160 def test_send_inform_update_last_sent_heartbeat(self, sendmsg_mock):
161 obj = self.get_worker()
162
163 obj.send_inform(queues=(['', constants.CLIENT_TYPE.worker]))
164
165 self.assertGreater(obj._meta['last_sent_heartbeat'], 0)
166
167
98class TestCase(unittest.TestCase): 168class TestCase(unittest.TestCase):
99 def test_generate_msgid(self): 169 def test_generate_msgid(self):
100 msgid = messages.generate_msgid() 170 msgid = messages.generate_msgid()
diff --git a/eventmq/utils/__init__.py b/eventmq/utils/__init__.py
index 5973c93..2a047a5 100644
--- a/eventmq/utils/__init__.py
+++ b/eventmq/utils/__init__.py
@@ -46,3 +46,15 @@ def zero_index_cmp(a, b):
46 when sorting the values in :attr:`router.Router.queues`. 46 when sorting the values in :attr:`router.Router.queues`.
47 """ 47 """
48 return cmp(a[0], b[0]) 48 return cmp(a[0], b[0])
49
50
51def tuplify(v):
52 """
53 Recursively convert lists to tuples.
54
55 Args:
56 v (object): any value of interest
57 """
58 if isinstance(v, list):
59 return tuple(map(tuplify, v))
60 return v
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index a0f9740..0e54f43 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -53,14 +53,20 @@ class EMQPService(object):
53 53
54 See the code for :class:`Scheduler` and :class:`JobManager` for examples. 54 See the code for :class:`Scheduler` and :class:`JobManager` for examples.
55 """ 55 """
56 def send_inform(self, queue=None): 56 def send_inform(self, queues=()):
57 """ 57 """
58 Queues an INFORM command to `self.outgoing`. 58 Notify the router that this job manager is online and and ready for
59 work. This includes a list of queues the router should forward messages
60 for.
59 61
60 Args: 62 Args:
61 type_ (str): Either 'worker' or 'scheduler' 63 type_ (str): Either 'worker' or 'scheduler'
62 queue (list): 64 queues (list):
63 - For 'worker' type, the queues the worker is listening on 65 - For 'worker' type, the queues the worker is listening on and
66 their weights.
67
68 Example:
69 ([10, 'default'], [15, 'push_notifications'])
64 - Ignored for 'scheduler' type 70 - Ignored for 'scheduler' type
65 71
66 Raises: 72 Raises:
@@ -68,6 +74,11 @@ class EMQPService(object):
68 74
69 Returns: 75 Returns:
70 str: ID of the message 76 str: ID of the message
77
78 .. note::
79
80 Passing a single string for queues is supported for backward
81 compatibility and not recommended for new apps.
71 """ 82 """
72 valid_types = (constants.CLIENT_TYPE.worker, 83 valid_types = (constants.CLIENT_TYPE.worker,
73 constants.CLIENT_TYPE.scheduler) 84 constants.CLIENT_TYPE.scheduler)
@@ -75,8 +86,15 @@ class EMQPService(object):
75 if self.SERVICE_TYPE not in valid_types: 86 if self.SERVICE_TYPE not in valid_types:
76 raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE, 87 raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE,
77 valid_types)) 88 valid_types))
89
90 if isinstance(queues, (list, tuple)):
91 stringified_queues = ''
92 for pair in queues:
93 stringified_queues += '{},'.format(str(pair))
94 queues = stringified_queues[:-1] # strip off the last comma
95
78 msgid = sendmsg(self.outgoing, 'INFORM', [ 96 msgid = sendmsg(self.outgoing, 'INFORM', [
79 queue or conf.DEFAULT_QUEUE_NAME, 97 queues,
80 self.SERVICE_TYPE 98 self.SERVICE_TYPE
81 ]) 99 ])
82 100
diff --git a/eventmq/utils/settings.py b/eventmq/utils/settings.py
index 0621aaf..1c2671e 100644
--- a/eventmq/utils/settings.py
+++ b/eventmq/utils/settings.py
@@ -21,6 +21,7 @@ import json
21import logging 21import logging
22import os 22import os
23 23
24from . import tuplify
24from .. import conf 25from .. import conf
25 26
26 27
@@ -71,15 +72,3 @@ def import_settings(section='global'):
71 else: 72 else:
72 logger.warning('Config file at {} not found. Continuing with ' 73 logger.warning('Config file at {} not found. Continuing with '
73 'defaults.'.format(conf.CONFIG_FILE)) 74 'defaults.'.format(conf.CONFIG_FILE))
74
75
76def tuplify(v):
77 """
78 Recursively convert lists to tuples.
79
80 Args:
81 v (object): any value of interest
82 """
83 if isinstance(v, list):
84 return tuple(map(tuplify, v))
85 return v
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 0e931ff..23c9dfd 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -20,7 +20,7 @@ Defines different short-lived workers that execute jobs
20from importlib import import_module 20from importlib import import_module
21import logging 21import logging
22 22
23logger = logging.getLogger('*') 23logger = logging.getLogger(__name__)
24 24
25 25
26def run(payload): 26def run(payload):