author     jason   2017-03-28 15:06:52 -0600
committer  jason   2017-03-28 15:12:10 -0600
commit     d2a91b00a983d177a1c54069dd97b8b28af3f02e (patch)
tree       22180416b81563cce42701fdc6555b1d9e19491a
parent     f62b135cb4f7e57bc34a369259f41365452d0854 (diff)
download   eventmq-feature/config_refactor.tar.gz
           eventmq-feature/config_refactor.zip
Refactor eventmq configuration management (feature/config_refactor)
- Refactor each device's configuration loading. The global section is loaded first, then the section for the app overrides any of those settings.
- Define ambiguous settings that cannot be set globally. These settings are skipped and a warning is emitted if they appear in the global section.
- Update eventmq.conf-dist to include comments and more of the settings that can be defined.
- Refactor some option names:
  * Job Manager: `WORKER_ADDR` is now `CONNECT_ADDR` in the `[jobmanager]` section
  * Job Manager: `WORKER_ADDR_DEFAULT` is now `CONNECT_ADDR_DEFAULT` in the `[jobmanager]` section
  * Job Manager: `WORKER_ADDR_FAILOVER` is now `CONNECT_ADDR_FAILOVER` in the `[jobmanager]` section
  * Publisher: `PUBLISHER_FRONTEND_ADDR` is now `FRONTEND_LISTEN_ADDR` in the `[publisher]` section
  * Publisher: `PUBLISHER_BACKEND_ADDR` is now `BACKEND_LISTEN_ADDR` in the `[publisher]` section
  * Router: `FRONTEND_ADDR` is now `FRONTEND_LISTEN_ADDR` in the `[router]` section
  * Router: `BACKEND_ADDR` is now `BACKEND_LISTEN_ADDR` in the `[router]` section
  * Scheduler: `SCHEDULER_ADDR` is now `CONNECT_ADDR` in the `[scheduler]` section
  * `RQ_HOST` is now `REDIS_HOST`
  * `RQ_PORT` is now `REDIS_PORT`
  * `RQ_DB` is now `REDIS_DB`
  * `RQ_PASSWORD` is now `REDIS_PASSWORD`
  * `ADMINISTRATIVE_ADDR` is now `ADMINISTRATIVE_LISTEN_ADDR` in each respective section

See also: #20
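For illustration, a minimal config using the new names might look like this (a sketch assembled from the renames above and the eventmq.conf-dist shipped in this change; addresses and values are examples only):

    [global]
    super_debug=true

    [router]
    frontend_listen_addr=tcp://127.0.0.1:47291
    backend_listen_addr=tcp://127.0.0.1:47290

    [jobmanager]
    connect_addr=tcp://127.0.0.1:47290
    queues=[[10, "default"]]

    [scheduler]
    connect_addr=tcp://127.0.0.1:47291
    redis_host=localhost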
-rw-r--r--  CHANGELOG.rst                          24
-rw-r--r--  docs/changelog_link.rst                 1
-rw-r--r--  docs/index.rst                          2
-rw-r--r--  etc/eventmq.conf-dist                  70
-rw-r--r--  eventmq/conf.py                        41
-rw-r--r--  eventmq/jobmanager.py                  14
-rw-r--r--  eventmq/pub.py                          8
-rw-r--r--  eventmq/router.py                      30
-rw-r--r--  eventmq/scheduler.py                   20
-rw-r--r--  eventmq/subscriber.py                  21
-rw-r--r--  eventmq/tests/test_jobmanager.py        8
-rw-r--r--  eventmq/tests/test_router.py            5
-rw-r--r--  eventmq/tests/test_scheduler.py         2
-rw-r--r--  eventmq/tests/test_utils_settings.py  105
-rw-r--r--  eventmq/utils/settings.py              84
15 files changed, 265 insertions, 170 deletions
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
new file mode 100644
index 0000000..4f961b9
--- /dev/null
+++ b/CHANGELOG.rst
@@ -0,0 +1,24 @@
1#########
2CHANGELOG
3#########
4
50.4
6===
7* Update config file loading. Each device will load the ``global`` section followed by its own section of the provided config (router for emq-router, jobmanager for emq-jobmanager, etc.)
8
9Backwards incompatible changes
10------------------------------
11* Configuration: The name and section for the listening addresses have changed:
12 * Job Manager: ``WORKER_ADDR`` is now ``CONNECT_ADDR`` in the ``[jobmanager]`` section
13 * Job Manager: ``WORKER_ADDR_DEFAULT`` is now ``CONNECT_ADDR_DEFAULT`` in the ``[jobmanager]`` section
14 * Job Manager: ``WORKER_ADDR_FAILOVER`` is now ``CONNECT_ADDR_FAILOVER`` in the ``[jobmanager]`` section
15 * Publisher: ``PUBLISHER_FRONTEND_ADDR`` is now ``FRONTEND_LISTEN_ADDR`` in the ``[publisher]`` section
16 * Publisher: ``PUBLISHER_BACKEND_ADDR`` is now ``BACKEND_LISTEN_ADDR`` in the ``[publisher]`` section
17 * Router: ``FRONTEND_ADDR`` is now ``FRONTEND_LISTEN_ADDR`` in the ``[router]`` section
18 * Router: ``BACKEND_ADDR`` is now ``BACKEND_LISTEN_ADDR`` in the ``[router]`` section
19 * Scheduler: ``SCHEDULER_ADDR`` is now ``CONNECT_ADDR`` in the ``[scheduler]`` section
20 * ``RQ_HOST`` is now ``REDIS_HOST``
21 * ``RQ_PORT`` is now ``REDIS_PORT``
22 * ``RQ_DB`` is now ``REDIS_DB``
23 * ``RQ_PASSWORD`` is now ``REDIS_PASSWORD``
24 * ``ADMINISTRATIVE_ADDR`` is now ``ADMINISTRATIVE_LISTEN_ADDR`` in each respective section
diff --git a/docs/changelog_link.rst b/docs/changelog_link.rst
new file mode 100644
index 0000000..565b052
--- /dev/null
+++ b/docs/changelog_link.rst
@@ -0,0 +1 @@
.. include:: ../CHANGELOG.rst
diff --git a/docs/index.rst b/docs/index.rst
index 25fc86f..af6a758 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -8,7 +8,7 @@ EventMQ Documentation
8 using 8 using
9 api 9 api
10 contributing 10 contributing
11 11 changelog_link
12 12
13Indices and tables 13Indices and tables
14================== 14==================
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist
index d492269..2d42285 100644
--- a/etc/eventmq.conf-dist
+++ b/etc/eventmq.conf-dist
@@ -1,27 +1,61 @@
1# -*- mode: conf; encoding: utf-8 -*-
1[global] 2[global]
2# Enable message output at different stages in the app. 3# Don't bother with HEARTBEATS, both sending and paying attention to them
3super_debug = true 4disable_heartbeats=False
4 5# Assume the peer is dead after this many missed heartbeats
5# Hide the heartbeat logs when super_debug is enabled. Showing them will generate a lot of messages. 6heartbeat_liveness=3
6hide_heartbeat_logs = True 7# Assume a missed heartbeat after this many seconds
7 8heartbeat_timeout=5
8# Port to listen on for administrative commands sent via emq-cli 9# How often should a heartbeat be sent in seconds. This should be lower than
9administrative_addr=tcp://0.0.0.0:47293 10# HEARTBEAT_TIMEOUT
10 11heartbeat_interval = 3
11frontend_addr=tcp://0.0.0.0:47291 12# Enable raw message output at different stages in the app.
12backend_addr=tcp://0.0.0.0:47290 13super_debug=true
13worker_addr=tcp://eventmq:47290 14# Hide the heartbeat logs when super_debug is enabled. Showing them will
14scheduler_addr=tcp://eventmq:47291 15# generate a lot of messages.
16hide_heartbeat_logs=true
17# Default character encoding for strings in messages. See these URLs for
18# supported encodings:
19# https://docs.python.org/2/library/codecs.html#standard-encodings
20# https://docs.python.org/3/library/codecs.html#standard-encodings
21default_encoding=utf-8
15 22
16[router] 23[router]
24# Listen for clients and schedulers on this address
25frontend_listen_addr=tcp://127.0.0.1:47291
26# Listen for jobmanagers on this address
27backend_listen_addr=tcp://127.0.0.1:47290
28# Listen for administrative status and management commands on this address
29administrative_listen_addr=tcp://127.0.0.1:47293
17 30
18[scheduler] 31[scheduler]
32# The address of the router's frontend_listen_addr to connect to
33connect_addr=tcp://127.0.0.1:47291
34# Scheduled jobs are saved and loaded from Redis
35redis_host=localhost
36redis_port=6379
37redis_db=0
38# Leave password blank if there is no password
39redis_password=
19 40
20[jobmanager] 41[jobmanager]
21worker_addr=tcp://127.0.0.1:47290 42# The address of the router's backend_listen_addr to connect to
22queues=[[50,"google"], [40,"pushes"], [10,"default"]] 43connect_addr=tcp://127.0.0.1:47290
23concurrent_jobs=2 44# When no named queue is specified for a job use this queue
45default_queue_name=default
46# JSON Array of weights and named queues. We recommend you keep the default
47# queue unless you're 100% positive every job will specify a queue
48queues=[[10, "default"]]
49# Number of concurrent jobs to run
50concurrent_jobs=4
51# Number of messages to cache in RAM before discarding messages
52hwm=10000
53# Number of jobs to execute before restarting the Multiprocessing workers to
54# free up any leaked memory.
55max_job_count=1024
24 56
25[publisher] 57[publisher]
26publisher_frontend_addr=tcp://0.0.0.0:47298 58# Listen for publish messages on this address
27publisher_backend_addr=tcp://0.0.0.0:47299 \ No newline at end of file 59frontend_listen_addr=tcp://127.0.0.1:47298
60# Listen for subscribers on this address
61backend_listen_addr=tcp://127.0.0.1:47299 \ No newline at end of file
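As a rough usage sketch of how a device picks these values up (assuming the import_settings helper from eventmq/utils/settings.py shown further below; this snippet is illustrative and not part of the commit):

    from eventmq import conf
    from eventmq.utils.settings import import_settings

    # Loads the [global] section first, then lets [jobmanager] override it.
    # Listen/connect addresses placed in [global] are skipped with a warning.
    import_settings(section='jobmanager')
    print(conf.CONNECT_ADDR)  # e.g. tcp://127.0.0.1:47290 from the file above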
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 16b1f78..ef6cafb 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -17,6 +17,12 @@
17=================================== 17===================================
18""" 18"""
19 19
20# Settings that should not be honored if they are defined in the global
21# section. These must be lowercase.
22_AMBIGUOUS_SETTINGS = ('frontend_listen_addr', 'backend_listen_addr',
23 'administrative_listen_addr', 'connect_addr',
24 'reconnect_timeout')
25
20#: SUPER_DEBUG basically enables more debugging logs. Specifically the messages 26#: SUPER_DEBUG basically enables more debugging logs. Specifically the messages
21#: at different levels in the application. 27#: at different levels in the application.
22#: Default: False 28#: Default: False
@@ -30,11 +36,10 @@ HIDE_HEARTBEAT_LOGS = True
30# would be a good idea to have a handful of workers listening on this queue 36# would be a good idea to have a handful of workers listening on this queue
31# unless you're positive that everything specifies a queue with workers. 37# unless you're positive that everything specifies a queue with workers.
32DEFAULT_QUEUE_NAME = 'default' 38DEFAULT_QUEUE_NAME = 'default'
33DEFAULT_QUEUE_WEIGHT = 10
34 39
35# Default queues for the Job Manager to listen on. The values here should match 40# Default queues for the Job Manager to listen on. The values here should match
36# the values defined on the router. 41# the values defined on the router.
37QUEUES = [(DEFAULT_QUEUE_WEIGHT, DEFAULT_QUEUE_NAME), ] 42QUEUES = [(10, DEFAULT_QUEUE_NAME), ]
38 43
39# {{{Job Manager 44# {{{Job Manager
40# How long should we wait before retrying to connect to a broker? 45# How long should we wait before retrying to connect to a broker?
@@ -59,32 +64,30 @@ CONFIG_FILE = '/etc/eventmq.conf'
59# https://docs.python.org/3/library/codecs.html#standard-encodings 64# https://docs.python.org/3/library/codecs.html#standard-encodings
60DEFAULT_ENCODING = 'utf-8' 65DEFAULT_ENCODING = 'utf-8'
61 66
67
62# Default addresses to localhost 68# Default addresses to localhost
63# Router: 69FRONTEND_LISTEN_ADDR = 'tcp://127.0.0.1:47291'
64FRONTEND_ADDR = 'tcp://127.0.0.1:47291' 70BACKEND_LISTEN_ADDR = 'tcp://127.0.0.1:47290'
65BACKEND_ADDR = 'tcp://127.0.0.1:47290'
66# Where the Scheduler should connect.
67SCHEDULER_ADDR = 'tcp://127.0.0.1:47291'
68# Where the worker should connect
69WORKER_ADDR = 'tcp://127.0.0.1:47290'
70WORKER_ADDR_DEFAULT = 'tcp://127.0.0.1:47290'
71WORKER_ADDR_FAILOVER = 'tcp://127.0.0.1:47290'
72# Used to monitor and manage the devices 71# Used to monitor and manage the devices
73ADMINISTRATIVE_ADDR = 'tcp://127.0.0.1:47293' 72ADMINISTRATIVE_LISTEN_ADDR = 'tcp://127.0.0.1:47293'
73
74# Where the router is located
75CONNECT_ADDR = None
76
77# # Where the Scheduler should connect.
78CONNECT_ADDR_DEFAULT = 'tcp://127.0.0.1:47290'
79CONNECT_ADDR_FAILOVER = 'tcp://127.0.0.1:47290'
74 80
75# PubSub
76PUBLISHER_FRONTEND_ADDR = 'tcp://127.0.0.1:47298'
77PUBLISHER_BACKEND_ADDR = 'tcp://127.0.0.1:47299'
78 81
79# How many jobs should the job manager concurrently handle? 82# How many jobs should the job manager concurrently handle?
80CONCURRENT_JOBS = 4 83CONCURRENT_JOBS = 4
81HWM = 10000 84HWM = 10000
82 85
83# Redis settings 86# Redis settings
84RQ_HOST = 'localhost' 87REDIS_HOST = 'localhost'
85RQ_PORT = 6379 88REDIS_PORT = 6379
86RQ_DB = 0 89REDIS_DB = 0
87RQ_PASSWORD = '' 90REDIS_PASSWORD = ''
88 91
89MAX_JOB_COUNT = 1024 92MAX_JOB_COUNT = 1024
90 93
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 831cee5..6433861 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -170,10 +170,10 @@ class JobManager(HeartbeatMixin, EMQPService):
170 # False, so it should stay at the bottom of the loop. 170 # False, so it should stay at the bottom of the loop.
171 if not self.maybe_send_heartbeat(events): 171 if not self.maybe_send_heartbeat(events):
172 # Toggle default and failover worker_addr 172 # Toggle default and failover worker_addr
173 if (conf.WORKER_ADDR == conf.WORKER_ADDR_DEFAULT): 173 if (conf.CONNECT_ADDR == conf.CONNECT_ADDR_DEFAULT):
174 conf.WORKER_ADDR = conf.WORKER_ADDR_FAILOVER 174 conf.CONNECT_ADDR = conf.CONNECT_ADDR_FAILOVER
175 else: 175 else:
176 conf.WORKER_ADDR = conf.WORKER_ADDR_DEFAULT 176 conf.CONNECT_ADDR = conf.CONNECT_ADDR_DEFAULT
177 177
178 break 178 break
179 179
@@ -278,7 +278,7 @@ class JobManager(HeartbeatMixin, EMQPService):
278 278
279 def on_disconnect(self, msgid, msg): 279 def on_disconnect(self, msgid, msg):
280 sendmsg(self.frontend, KBYE) 280 sendmsg(self.frontend, KBYE)
281 self.frontend.unbind(conf.WORKER_ADDR) 281 self.frontend.unbind(conf.CONNECT_ADDR)
282 super(JobManager, self).on_disconnect(msgid, msg) 282 super(JobManager, self).on_disconnect(msgid, msg)
283 283
284 def on_kbye(self, msgid, msg): 284 def on_kbye(self, msgid, msg):
@@ -287,7 +287,6 @@ class JobManager(HeartbeatMixin, EMQPService):
287 287
288 def sighup_handler(self, signum, frame): 288 def sighup_handler(self, signum, frame):
289 logger.info('Caught signal %s' % signum) 289 logger.info('Caught signal %s' % signum)
290 import_settings()
291 import_settings(section='jobmanager') 290 import_settings(section='jobmanager')
292 291
293 self.should_reset = True 292 self.should_reset = True
@@ -308,7 +307,6 @@ class JobManager(HeartbeatMixin, EMQPService):
308 broker_addr (str): The address of the broker to connect to. 307 broker_addr (str): The address of the broker to connect to.
309 """ 308 """
310 setup_logger('') 309 setup_logger('')
311 import_settings()
312 import_settings(section='jobmanager') 310 import_settings(section='jobmanager')
313 311
314 # If this manager was passed explicit options, favor those 312 # If this manager was passed explicit options, favor those
@@ -316,9 +314,9 @@ class JobManager(HeartbeatMixin, EMQPService):
316 conf.QUEUES = self.queues 314 conf.QUEUES = self.queues
317 315
318 if broker_addr: 316 if broker_addr:
319 conf.WORKER_ADDR = broker_addr 317 conf.CONNECT_ADDR = broker_addr
320 318
321 self.start(addr=conf.WORKER_ADDR, queues=conf.QUEUES) 319 self.start(addr=conf.CONNECT_ADDR, queues=conf.QUEUES)
322 320
323 321
324def jobmanager_main(): 322def jobmanager_main():
diff --git a/eventmq/pub.py b/eventmq/pub.py
index 487e8c5..f668f05 100644
--- a/eventmq/pub.py
+++ b/eventmq/pub.py
@@ -41,8 +41,8 @@ class Pub(HeartbeatMixin):
41 return 41 return
42 42
43 def start(self, 43 def start(self,
44 frontend_addr=conf.PUBLISHER_FRONTEND_ADDR, 44 frontend_addr=conf.FRONTEND_LISTEN_ADDR,
45 backend_addr=conf.PUBLISHER_BACKEND_ADDR): 45 backend_addr=conf.BACKEND_LISTEN_ADDR):
46 46
47 self.status = STATUS.starting 47 self.status = STATUS.starting
48 48
@@ -87,8 +87,8 @@ class Pub(HeartbeatMixin):
87 """ 87 """
88 setup_logger('eventmq') 88 setup_logger('eventmq')
89 import_settings(section='publisher') 89 import_settings(section='publisher')
90 self.start(frontend_addr=conf.PUBLISHER_FRONTEND_ADDR, 90 self.start(frontend_addr=conf.FRONTEND_LISTEN_ADDR,
91 backend_addr=conf.PUBLISHER_BACKEND_ADDR) 91 backend_addr=conf.BACKEND_LISTEN_ADDR)
92 92
93 93
94# Entry point for pip console scripts 94# Entry point for pip console scripts
diff --git a/eventmq/router.py b/eventmq/router.py
index 4ff3732..a455ece 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -136,9 +136,9 @@ class Router(HeartbeatMixin):
136 pdb.Pdb().set_trace(frame) 136 pdb.Pdb().set_trace(frame)
137 137
138 def start(self, 138 def start(self,
139 frontend_addr=conf.FRONTEND_ADDR, 139 frontend_addr=conf.FRONTEND_LISTEN_ADDR,
140 backend_addr=conf.BACKEND_ADDR, 140 backend_addr=conf.BACKEND_LISTEN_ADDR,
141 administrative_addr=conf.ADMINISTRATIVE_ADDR): 141 administrative_addr=conf.ADMINISTRATIVE_LISTEN_ADDR):
142 """ 142 """
143 Begin listening for connections on the provided connection strings 143 Begin listening for connections on the provided connection strings
144 144
@@ -359,7 +359,7 @@ class Router(HeartbeatMixin):
359 self.send_kbye(self.frontend, scheduler) 359 self.send_kbye(self.frontend, scheduler)
360 360
361 self.schedulers.clear() 361 self.schedulers.clear()
362 self.frontend.unbind(conf.FRONTEND_ADDR) 362 self.frontend.unbind(conf.FRONTEND_LISTEN_ADDR)
363 363
364 if len(self.waiting_messages) > 0: 364 if len(self.waiting_messages) > 0:
365 logger.info("Router processing messages in queue.") 365 logger.info("Router processing messages in queue.")
@@ -372,7 +372,7 @@ class Router(HeartbeatMixin):
372 self.send_kbye(self.backend, worker) 372 self.send_kbye(self.backend, worker)
373 373
374 self.workers.clear() 374 self.workers.clear()
375 self.backend.unbind(conf.BACKEND_ADDR) 375 self.backend.unbind(conf.BACKEND_LISTEN_ADDR)
376 376
377 # Loops event loops should check for this and break out 377 # Loops event loops should check for this and break out
378 self.received_disconnect = True 378 self.received_disconnect = True
@@ -905,22 +905,22 @@ class Router(HeartbeatMixin):
905 process receives a SIGHUP from the system. 905 process receives a SIGHUP from the system.
906 """ 906 """
907 logger.info('Caught signame %s' % signum) 907 logger.info('Caught signame %s' % signum)
908 self.frontend.unbind(conf.FRONTEND_ADDR) 908 self.frontend.unbind(conf.FRONTEND_LISTEN_ADDR)
909 self.backend.unbind(conf.BACKEND_ADDR) 909 self.backend.unbind(conf.BACKEND_LISTEN_ADDR)
910 import_settings() 910 import_settings('router')
911 self.start(frontend_addr=conf.FRONTEND_ADDR, 911 self.start(frontend_addr=conf.FRONTEND_LISTEN_ADDR,
912 backend_addr=conf.BACKEND_ADDR, 912 backend_addr=conf.BACKEND_LISTEN_ADDR,
913 administrative_addr=conf.ADMINISTRATIVE_ADDR) 913 administrative_addr=conf.ADMINISTRATIVE_LISTEN_ADDR)
914 914
915 def router_main(self): 915 def router_main(self):
916 """ 916 """
917 Kick off router with logging and settings import 917 Kick off router with logging and settings import
918 """ 918 """
919 setup_logger('eventmq') 919 setup_logger('eventmq')
920 import_settings() 920 import_settings('router')
921 self.start(frontend_addr=conf.FRONTEND_ADDR, 921 self.start(frontend_addr=conf.FRONTEND_LISTEN_ADDR,
922 backend_addr=conf.BACKEND_ADDR, 922 backend_addr=conf.BACKEND_LISTEN_ADDR,
923 administrative_addr=conf.ADMINISTRATIVE_ADDR) 923 administrative_addr=conf.ADMINISTRATIVE_LISTEN_ADDR)
924 924
925 925
926def router_on_full(): 926def router_on_full():
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 0d953a6..e8188a0 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -52,9 +52,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
52 52
53 def __init__(self, *args, **kwargs): 53 def __init__(self, *args, **kwargs):
54 self.name = kwargs.get('name', None) 54 self.name = kwargs.get('name', None)
55
56 logger.info('Initializing Scheduler...') 55 logger.info('Initializing Scheduler...')
57 import_settings() 56
57 import_settings('scheduler')
58
58 super(Scheduler, self).__init__(*args, **kwargs) 59 super(Scheduler, self).__init__(*args, **kwargs)
59 self.frontend = Sender() 60 self.frontend = Sender()
60 self._redis_server = None 61 self._redis_server = None
@@ -82,7 +83,6 @@ class Scheduler(HeartbeatMixin, EMQPService):
82 self.interval_jobs = {} 83 self.interval_jobs = {}
83 84
84 self.poller = Poller() 85 self.poller = Poller()
85
86 self.load_jobs() 86 self.load_jobs()
87 87
88 self._setup() 88 self._setup()
@@ -189,10 +189,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
189 if self._redis_server is None: 189 if self._redis_server is None:
190 try: 190 try:
191 self._redis_server = \ 191 self._redis_server = \
192 redis.StrictRedis(host=conf.RQ_HOST, 192 redis.StrictRedis(host=conf.REDIS_HOST,
193 port=conf.RQ_PORT, 193 port=conf.REDIS_PORT,
194 db=conf.RQ_DB, 194 db=conf.REDIS_DB,
195 password=conf.RQ_PASSWORD) 195 password=conf.REDIS_PASSWORD)
196 return self._redis_server 196 return self._redis_server
197 197
198 except Exception as e: 198 except Exception as e:
@@ -222,7 +222,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
222 logger.info("Received DISCONNECT request: {}".format(message)) 222 logger.info("Received DISCONNECT request: {}".format(message))
223 self._redis_server.connection_pool.disconnect() 223 self._redis_server.connection_pool.disconnect()
224 sendmsg(self.frontend, KBYE) 224 sendmsg(self.frontend, KBYE)
225 self.frontend.unbind(conf.SCHEDULER_ADDR) 225 self.frontend.unbind(conf.CONNECT_ADDR)
226 super(Scheduler, self).on_disconnect(msgid, message) 226 super(Scheduler, self).on_disconnect(msgid, message)
227 227
228 def on_kbye(self, msgid, msg): 228 def on_kbye(self, msgid, msg):
@@ -425,9 +425,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
425 Kick off scheduler with logging and settings import 425 Kick off scheduler with logging and settings import
426 """ 426 """
427 setup_logger("eventmq") 427 setup_logger("eventmq")
428 import_settings() 428 import_settings('scheduler')
429 self.__init__() 429 self.__init__()
430 self.start(addr=conf.SCHEDULER_ADDR) 430 self.start(addr=conf.CONNECT_ADDR)
431 431
432 432
433# Entry point for pip console scripts 433# Entry point for pip console scripts
diff --git a/eventmq/subscriber.py b/eventmq/subscriber.py
deleted file mode 100644
index f585572..0000000
--- a/eventmq/subscriber.py
+++ /dev/null
@@ -1,21 +0,0 @@
1"""
2derp subscriber
3"""
4import zmq
5
6
7if __name__ == "__main__":
8 sockets = []
9 for i in xrange(100):
10 ctx = zmq.Context()
11 s = ctx.socket(zmq.SUB)
12 s.linger = 0
13 s.setsockopt(zmq.SUBSCRIBE, str(i))
14 s.connect('tcp://127.0.0.1:47299')
15 sockets.append(s)
16
17 while True:
18 # block until something comes in. normally you'd do something with
19 # this in another thread or something
20 for s in sockets:
21 print s.recv_multipart() # noqa
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 82fed3f..7b62eea 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -119,7 +119,7 @@ class TestCase(unittest.TestCase):
119 119
120 # called once for the default settings, once for the jobmanager 120 # called once for the default settings, once for the jobmanager
121 # settings 121 # settings
122 self.assertEqual(2, import_settings_mock.call_count) 122 self.assertEqual(1, import_settings_mock.call_count)
123 # check to see if the last call was called with the jobmanager section 123 # check to see if the last call was called with the jobmanager section
124 import_settings_mock.assert_called_with(section='jobmanager') 124 import_settings_mock.assert_called_with(section='jobmanager')
125 125
@@ -140,18 +140,18 @@ class TestCase(unittest.TestCase):
140 140
141 jm.jobmanager_main() 141 jm.jobmanager_main()
142 142
143 self.assertEqual(2, import_settings_mock.call_count) 143 self.assertEqual(1, import_settings_mock.call_count)
144 # Assert that the last call to import settings was for the jobmanager 144 # Assert that the last call to import settings was for the jobmanager
145 # section 145 # section
146 import_settings_mock.assert_called_with(section='jobmanager') 146 import_settings_mock.assert_called_with(section='jobmanager')
147 147
148 start_mock.assert_called_with(addr=conf.WORKER_ADDR, 148 start_mock.assert_called_with(addr=conf.CONNECT_ADDR,
149 queues=conf.QUEUES) 149 queues=conf.QUEUES)
150 150
151 jm.queues = ((10, 'derp'), (0, 'blurp')) 151 jm.queues = ((10, 'derp'), (0, 'blurp'))
152 jm.jobmanager_main() 152 jm.jobmanager_main()
153 153
154 start_mock.assert_called_with(addr=conf.WORKER_ADDR, 154 start_mock.assert_called_with(addr=conf.CONNECT_ADDR,
155 queues=jm.queues) 155 queues=jm.queues)
156 156
157 def cleanup(self): 157 def cleanup(self):
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index 10441eb..95c5e74 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -37,8 +37,9 @@ class TestCase(unittest.TestCase):
37 def test_start(self, event_loop_mock, zsocket_bind_mock): 37 def test_start(self, event_loop_mock, zsocket_bind_mock):
38 # Test default args 38 # Test default args
39 self.router.start() 39 self.router.start()
40 self.router.frontend.listen.assert_called_with(conf.FRONTEND_ADDR) 40 self.router.frontend.listen.assert_called_with(
41 self.router.backend.listen.assert_called_with(conf.BACKEND_ADDR) 41 conf.FRONTEND_LISTEN_ADDR)
42 self.router.backend.listen.assert_called_with(conf.BACKEND_LISTEN_ADDR)
42 self.assertEqual(self.router.status, constants.STATUS.listening) 43 self.assertEqual(self.router.status, constants.STATUS.listening)
43 44
44 # Test invalid args 45 # Test invalid args
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py
index ae7dd50..1d72146 100644
--- a/eventmq/tests/test_scheduler.py
+++ b/eventmq/tests/test_scheduler.py
@@ -18,8 +18,6 @@ import unittest
18 18
19from .. import constants, scheduler 19from .. import constants, scheduler
20 20
21ADDR = 'inproc://pour_the_rice_in_the_thing'
22
23 21
24class TestCase(unittest.TestCase): 22class TestCase(unittest.TestCase):
25 def test__setup(self): 23 def test__setup(self):
diff --git a/eventmq/tests/test_utils_settings.py b/eventmq/tests/test_utils_settings.py
index 2d94d4f..589d292 100644
--- a/eventmq/tests/test_utils_settings.py
+++ b/eventmq/tests/test_utils_settings.py
@@ -25,59 +25,73 @@ from ..utils import settings
25class TestCase(unittest.TestCase): 25class TestCase(unittest.TestCase):
26 settings_ini = "\n".join( 26 settings_ini = "\n".join(
27 ("[global]", 27 ("[global]",
28 "super_debug=TRuE", 28 "super_debug=FaLsE",
29 "frontend_addr=tcp://0.0.0.0:47291", 29 "hide_heartbeat_logs=false",
30 "frontend_listen_addr=tcp://1.2.3.4:1234",
30 "", 31 "",
31 "[jobmanager]", 32 "[jobmanager]",
32 "super_debug=FalSe", 33 "hide_heartbeat_logs=true",
34 "listen_addr=tcp://160.254.23.88:47290",
33 'queues=[[50,"google"], [40,"pushes"], [10,"default"]]', 35 'queues=[[50,"google"], [40,"pushes"], [10,"default"]]',
34 "worker_addr=tcp://160.254.23.88:47290", 36 "concurrent_jobs=9283",
35 "concurrent_jobs=9283",)) 37 "",
38 "[router]",
39 "# Listen addresses",
40 "frontend_listen_addr=tcp://123.211.1.1:47291",
41 "backend_listen_addr=tcp://123.211.1.1:47290",
42 "# Log all messages",
43 "super_dEbug=TrUe",
44 ""))
36 45
37 def test_import_settings_default(self): 46 def test_import_settings_default(self):
38 # sometimes the tests step on each other with this module. reloading 47 # Test loading the global section only
39 # ensures fresh test data 48 with LogCapture() as log_checker:
40 reload(conf) 49 with utils.mock_config_file(self.settings_ini):
41 50 settings.import_settings()
42 # Global section
43 # --------------
44 with utils.mock_config_file(self.settings_ini):
45 settings.import_settings()
46
47 # Changed. Default is 127.0.0.1:47291
48 self.assertEqual(conf.FRONTEND_ADDR, 'tcp://0.0.0.0:47291')
49
50 # Changed. Default is false
51 self.assertTrue(conf.SUPER_DEBUG, True)
52 51
53 # Default True 52 log_checker.check(
54 self.assertTrue(conf.HIDE_HEARTBEAT_LOGS) 53 ('eventmq.utils.settings',
54 'DEBUG',
55 'Setting conf.SUPER_DEBUG to False'),
56 ('eventmq.utils.settings',
57 'DEBUG',
58 'Setting conf.HIDE_HEARTBEAT_LOGS to False'),
59 ('eventmq.utils.settings',
60 'WARNING',
61 'Ignoring ambiguous setting defined in global section: '
62 'frontend_listen_addr=tcp://1.2.3.4:1234'))
63
64 # Defined in the global section
65 self.assertFalse(conf.SUPER_DEBUG)
66 self.assertFalse(conf.HIDE_HEARTBEAT_LOGS)
55 67
56 # Default is 4 68 # Defaults defined in conf.py
57 self.assertEqual(conf.CONCURRENT_JOBS, 4) 69 self.assertEqual(conf.CONCURRENT_JOBS, 4)
58 70 self.assertEqual(conf.QUEUES, [(10, 'default'), ])
59 # Default is (10, 'default')
60 self.assertEqual(conf.QUEUES, [(10, conf.DEFAULT_QUEUE_NAME), ])
61 71
62 def test_read_section(self): 72 def test_read_section(self):
73 # Test reading the router section
63 with utils.mock_config_file(self.settings_ini): 74 with utils.mock_config_file(self.settings_ini):
64 settings.import_settings('jobmanager') 75 settings.import_settings('router')
65 76
66 # Changed 77 # Changed in global section
67 self.assertFalse(conf.SUPER_DEBUG) 78 self.assertFalse(conf.HIDE_HEARTBEAT_LOGS)
68 # Changed
69 self.assertEqual(conf.CONCURRENT_JOBS, 9283)
70 79
71 # Changed 80 # Changed in router section
72 self.assertEqual(conf.QUEUES, 81 self.assertEqual(conf.FRONTEND_LISTEN_ADDR, 'tcp://123.211.1.1:47291')
73 [(50, 'google'), (40, 'pushes'), (10, 'default')]) 82 self.assertEqual(conf.BACKEND_LISTEN_ADDR, 'tcp://123.211.1.1:47290')
83 self.assertTrue(conf.SUPER_DEBUG)
84
85 # # Changed
86 # self.assertEqual(conf.QUEUES,
87 # [(50, 'google'), (40, 'pushes'), (10, 'default')])
74 88
75 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') 89 # self.assertEqual(conf.CONNECT_ADDR, 'tcp://160.254.23.88:47290')
76 90
77 def test_invalid_section(self): 91 def test_invalid_section(self):
78 conf.CONCURRENT_JOBS = 1234 92 conf.CONCURRENT_JOBS = 1234
79 conf.QUEUES = [(50, 'google'), (40, 'pushes'), (10, 'default')] 93 conf.QUEUES = [(50, 'google'), (40, 'pushes'), (10, 'default')]
80 conf.WORKER_ADDR = 'tcp://160.254.23.88:47290' 94 conf.CONNECT_ADDR = 'tcp://160.254.23.88:47290'
81 95
82 # Invalid section 96 # Invalid section
83 with utils.mock_config_file(self.settings_ini): 97 with utils.mock_config_file(self.settings_ini):
@@ -87,7 +101,7 @@ class TestCase(unittest.TestCase):
87 self.assertEqual(conf.CONCURRENT_JOBS, 1234) 101 self.assertEqual(conf.CONCURRENT_JOBS, 1234)
88 self.assertEqual(conf.QUEUES, 102 self.assertEqual(conf.QUEUES,
89 [(50, 'google'), (40, 'pushes'), (10, 'default')]) 103 [(50, 'google'), (40, 'pushes'), (10, 'default')])
90 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') 104 self.assertEqual(conf.CONNECT_ADDR, 'tcp://160.254.23.88:47290')
91 # Default value 105 # Default value
92 self.assertEqual(conf.DEFAULT_QUEUE_NAME, 'default') 106 self.assertEqual(conf.DEFAULT_QUEUE_NAME, 'default')
93 107
@@ -109,7 +123,7 @@ class TestCase(unittest.TestCase):
109 self.assertRaises(ValueError, 123 self.assertRaises(ValueError,
110 settings.import_settings, 'jobmanager') 124 settings.import_settings, 'jobmanager')
111 125
112 def test_parse_array(self): 126 def test_parse_string_array(self):
113 # Tests parsing a non-nested array (nested are tested via QUEUES 127 # Tests parsing a non-nested array (nested are tested via QUEUES
114 # setting) via FAKE_VALUE 128 # setting) via FAKE_VALUE
115 settings_ini = "\n".join( 129 settings_ini = "\n".join(
@@ -124,6 +138,20 @@ class TestCase(unittest.TestCase):
124 138
125 self.assertEqual(conf.FAKE_VALUE, [u'asdf', u'asdf2']) 139 self.assertEqual(conf.FAKE_VALUE, [u'asdf', u'asdf2'])
126 140
141 def test_parse_dict_array(self):
142 # Tests parsing an array of dictionaries
143 settings_ini = '\n'.join(
144 ("[jobmanager]",
145 'fake_value=[{"key1": "value1"}, {"key2": "value2"}]')
146 )
147 reload(conf)
148 conf.FAKE_VALUE = [{'default': 1}]
149 with utils.mock_config_file(settings_ini):
150 settings.import_settings('jobmanager')
151
152 self.assertEqual(conf.FAKE_VALUE,
153 [{u"key1": u"value1"}, {u"key2": u"value2"}])
154
127 def test_invalid_setting(self): 155 def test_invalid_setting(self):
128 settings_ini = "\n".join( 156 settings_ini = "\n".join(
129 ("[global]", 157 ("[global]",
@@ -138,4 +166,5 @@ class TestCase(unittest.TestCase):
138 log_checker.check( 166 log_checker.check(
139 ('eventmq.utils.settings', 167 ('eventmq.utils.settings',
140 'WARNING', 168 'WARNING',
141 'Tried to set invalid setting: nonexistent_setting')) 169 'Tried to set invalid setting: nonexistent_setting=rabbit '
170 'blood'))
diff --git a/eventmq/utils/settings.py b/eventmq/utils/settings.py
index 30c4dee..2e6fa5c 100644
--- a/eventmq/utils/settings.py
+++ b/eventmq/utils/settings.py
@@ -30,7 +30,8 @@ logger = logging.getLogger(__name__)
30 30
31def import_settings(section='global'): 31def import_settings(section='global'):
32 """ 32 """
33 Import settings and apply to configuration globals 33 Import settings from the defined config file. This will import all the
34 settings from the `global` section followed by the requested section.
34 35
35 Args: 36 Args:
36 section (str): Name of the INI section to import 37 section (str): Name of the INI section to import
@@ -40,39 +41,66 @@ def import_settings(section='global'):
40 if os.path.exists(conf.CONFIG_FILE): 41 if os.path.exists(conf.CONFIG_FILE):
41 config.read(conf.CONFIG_FILE) 42 config.read(conf.CONFIG_FILE)
42 43
44 if config.has_section('global'):
45 _load_section(config, 'global')
46
47 if section == 'global':
48 # If the requested section is the default then there is nothing
49 # left to do
50 return
51
43 if not config.has_section(section): 52 if not config.has_section(section):
44 logger.warning( 53 logger.warning(
45 'Tried to read nonexistent section {}'.format(section)) 54 'Tried to read nonexistent section {}'.format(section))
46 return 55 return
47 56
48 for name, value in config.items(section): 57 _load_section(config, section)
49 if hasattr(conf, name.upper()): 58 else:
50 default_value = getattr(conf, name.upper()) 59 logger.warning('Config file at {} not found. Continuing with '
51 t = type(default_value) 60 'defaults.'.format(conf.CONFIG_FILE))
52 if isinstance(default_value, (list, tuple)): 61
53 try: 62
54 value = t(json.loads(value)) 63def _load_section(config, section):
55 except ValueError: 64 """
56 raise ValueError( 65 Load the requested section into the configuration globals in
57 "Invalid JSON syntax for {} setting".format(name)) 66 :mod:`eventmq.conf`
58 # json.loads coverts all arrays to lists, but if the first 67
59 # element in the default is a tuple (like in QUEUES) then 68 Args:
60 # convert those elements, otherwise whatever it's type is 69 section (str): Name of the INI section to import
61 # correct 70 """
62 if isinstance(default_value[0], tuple): 71 for name, value in config.items(section):
63 setattr(conf, name.upper(), 72 if hasattr(conf, name.upper()):
64 t(map(tuplify, value))) 73
65 else: 74 if section == 'global' and \
66 setattr(conf, name.upper(), t(value)) 75 name.lower() in conf._AMBIGUOUS_SETTINGS:
67 elif isinstance(default_value, bool): 76 logger.warning('Ignoring ambiguous setting defined in {} '
77 'section: {}={}'.format(section, name, value))
78 continue
79
80 default_value = getattr(conf, name.upper())
81 t = type(default_value)
82 if isinstance(default_value, (list, tuple)):
83 try:
84 value = t(json.loads(value))
85 except ValueError:
86 raise ValueError(
87 "Invalid JSON syntax for {} setting".format(name))
88 # json.loads converts all arrays to lists, but if the first
89 # element in the default is a tuple (like in QUEUES) then
90 # convert those elements, otherwise whatever its type is
91 # correct
92 if isinstance(default_value[0], tuple):
68 setattr(conf, name.upper(), 93 setattr(conf, name.upper(),
69 True if 't' in value.lower() else False) 94 t(map(tuplify, value)))
70 else: 95 else:
71 setattr(conf, name.upper(), t(value)) 96 setattr(conf, name.upper(), t(value))
72 logger.debug("Setting conf.{} to {}".format( 97 elif isinstance(default_value, bool):
73 name.upper(), getattr(conf, name.upper()))) 98 setattr(conf, name.upper(),
99 True if 't' in value.lower() else False)
74 else: 100 else:
75 logger.warning('Tried to set invalid setting: %s' % name) 101 setattr(conf, name.upper(), t(value))
76 else: 102 logger.debug("Setting conf.{} to {}".format(
77 logger.warning('Config file at {} not found. Continuing with ' 103 name.upper(), getattr(conf, name.upper())))
78 'defaults.'.format(conf.CONFIG_FILE)) 104 else:
105 logger.warning('Tried to set invalid setting: {}={}'.format(
106 name, value))
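For readers skimming the new _load_section, the type coercion boils down to roughly the following (a standalone sketch of the behavior, not the project's code; the real implementation also converts nested lists back to tuples via tuplify when the default uses tuples):

    import json

    def coerce(default, raw):
        # list/tuple defaults: parse the raw string as JSON
        if isinstance(default, (list, tuple)):
            return type(default)(json.loads(raw))
        # bool defaults: any value containing 't' counts as True
        if isinstance(default, bool):
            return 't' in raw.lower()
        # everything else: cast to the default's type
        return type(default)(raw)

    coerce(4, '9283')                               # -> 9283
    coerce(False, 'TRuE')                           # -> True
    coerce('utf-8', 'latin-1')                      # -> 'latin-1'
    coerce([[10, 'default']], '[[50, "google"]]')   # -> [[50, 'google']]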