aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-02-17 16:46:12 -0700
committerjason2017-02-17 16:46:12 -0700
commitda3538dcb8d189905116709b0d1bdb36910e36c6 (patch)
treedb94ade7bf967ff5fcd6b3e42a4e03361505629c
parent6237f10994f940bb83a276a4ea41f11e5daf7af7 (diff)
parent35101b668191aa084d1d115d4590e7d412e5282a (diff)
downloadeventmq-da3538dcb8d189905116709b0d1bdb36910e36c6.tar.gz
eventmq-da3538dcb8d189905116709b0d1bdb36910e36c6.zip
Merge branch 'feature/schedule_helpers' into 0.4
-rw-r--r--README.md12
-rwxr-xr-xbin/send_msg2
-rwxr-xr-xbin/send_publish12
-rw-r--r--docs/settings_file.rst11
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/constants.py1
-rw-r--r--eventmq/jobmanager.py14
-rw-r--r--eventmq/scheduler.py1
-rw-r--r--eventmq/tests/test_jobmanager.py11
-rw-r--r--eventmq/utils/classes.py3
-rw-r--r--eventmq/worker.py26
-rw-r--r--setup.py8
12 files changed, 60 insertions, 43 deletions
diff --git a/README.md b/README.md
index 908d7e1..7c1e84a 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
1EventMQ 1EventMQ
2======= 2=======
3[![Circle CI](https://circleci.com/gh/enderlabs/eventmq.svg?style=svg&circle-token=312da6ae260c2302baed268d2e052ce5e81cc71f)](https://circleci.com/gh/enderlabs/eventmq) 3[![CircleCI](https://circleci.com/gh/eventmq/eventmq.svg?style=svg)](https://circleci.com/gh/eventmq/eventmq)
4[![Coverage Status](https://coveralls.io/repos/github/enderlabs/eventmq/badge.svg?branch=master)](https://coveralls.io/github/enderlabs/eventmq?branch=master) 4[![Coverage Status](https://coveralls.io/repos/github/eventmq/eventmq/badge.svg)](https://coveralls.io/github/eventmq/eventmq)
5 5
6# Overview 6# Overview
7EventMQ is a message passing system built on [ZeroMQ](https://zeromq.org) 7EventMQ is a message passing system built on [ZeroMQ](https://zeromq.org)
@@ -17,14 +17,18 @@ pip install eventmq
17# Support 17# Support
18## Documenation 18## Documenation
19 19
20[Documentation](https://enderlabs.github.io/eventmq/) 20[Documentation](https://eventmq.github.io/eventmq/)
21 21
22## Mailing Lists 22## Mailing Lists
23User Support: http://lists.eventmq.io/listinfo.cgi/eventmq-users-eventmq.io 23User Support: http://lists.eventmq.io/listinfo.cgi/eventmq-users-eventmq.io
24 24
25Development Discussion: http://lists.eventmq.io/listinfo.cgi/eventmq-devel-eventmq.io 25Development Discussion: http://lists.eventmq.io/listinfo.cgi/eventmq-devel-eventmq.io
26 26
27## Quick Start 27## IRC
28
29 #eventmq on [irc.freenode.net](https://webchat.freenode.net/?channels=#eventmq)
30
31# Quick Start
28 32
29my_jerbs.py 33my_jerbs.py
30``` python 34``` python
diff --git a/bin/send_msg b/bin/send_msg
index b0ea160..19e88de 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -26,7 +26,7 @@ if __name__ == "__main__":
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
29 send_request(s, msg, guarantee=True, reply_requested=True, timeout=10) 29 send_request(s, msg, guarantee=True, reply_requested=True, timeout=1)
30 print zmq.POLLOUT 30 print zmq.POLLOUT
31 events = dict(poller.poll(500)) 31 events = dict(poller.poll(500))
32 print events 32 print events
diff --git a/bin/send_publish b/bin/send_publish
index 140be63..50f430e 100755
--- a/bin/send_publish
+++ b/bin/send_publish
@@ -12,13 +12,5 @@ if __name__ == "__main__":
12 s.connect(sys.argv[1]) 12 s.connect(sys.argv[1])
13 13
14 msg = ['update', '1'] 14 msg = ['update', '1']
15 j = 0 15 topic = str('topic')
16 while True: 16 send_publish_request(s, topic, msg)
17
18 for i in xrange(10):
19 topic = str(i)
20 send_publish_request(s, topic, msg)
21
22
23 print j
24 j += 1
diff --git a/docs/settings_file.rst b/docs/settings_file.rst
index cd37b6a..17d3b67 100644
--- a/docs/settings_file.rst
+++ b/docs/settings_file.rst
@@ -33,13 +33,14 @@ running. Grouping similar jobs in named queues will help you tune this number.
33 33
34queues 34queues
35====== 35======
36Default: (10, default) 36Default: [[10, "default"]]
37 37
38Semi-colon seperated list of queues to process jobs for with thier 38Comma seperated list of queues to process jobs for with thier weights. This list
39weights. Example: ``queues=(10, data_process); (15, email)``. With these 39must be valid JSON otherwise an error will be thrown.
40Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these
40weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers 41weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers
41running jobs locally pretty efficiently. If you have a larger box with a weight 42running jobs locally efficiently. If you have a larger server with a weight of
42of 50 on q1 and 8 concurrent jobs and a smaller box with a weight 30 and 4 4350 on q1 and 8 concurrent jobs and a smaller server with a weight 30 and 4
43concurrent jobs, the q1 jobs will be sent to the large box until it is no longer 44concurrent jobs, the q1 jobs will be sent to the large box until it is no longer
44accepting jobs. At this point jobs will start to be sent to the next highest 45accepting jobs. At this point jobs will start to be sent to the next highest
45number until the large box is ready to accept another q1 job. 46number until the large box is ready to accept another q1 job.
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 8ec8043..ca9b517 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3-rc10' 2__version__ = '0.3.1'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/constants.py b/eventmq/constants.py
index 8dbf530..17c6130 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -7,6 +7,7 @@ class STATUS(object):
7 connected = 201 7 connected = 201
8 stopping = 300 8 stopping = 300
9 stopped = 301 9 stopped = 301
10 running = 400 # Running and accepting jobs actively
10 11
11 12
12class CLIENT_TYPE(object): 13class CLIENT_TYPE(object):
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 12629ee..ea5af07 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -27,7 +27,7 @@ import zmq
27 27
28from eventmq.log import setup_logger 28from eventmq.log import setup_logger
29from . import conf 29from . import conf
30from .constants import KBYE 30from .constants import KBYE, STATUS
31from .poller import Poller, POLLIN 31from .poller import Poller, POLLIN
32from .sender import Sender 32from .sender import Sender
33from .utils.classes import EMQPService, HeartbeatMixin 33from .utils.classes import EMQPService, HeartbeatMixin
@@ -37,6 +37,7 @@ from .utils.messages import send_emqp_message as sendmsg
37from .utils.settings import import_settings 37from .utils.settings import import_settings
38from .worker import MultiprocessWorker as Worker 38from .worker import MultiprocessWorker as Worker
39 39
40
40if sys.version[0] == '2': 41if sys.version[0] == '2':
41 import Queue 42 import Queue
42else: 43else:
@@ -124,9 +125,12 @@ class JobManager(HeartbeatMixin, EMQPService):
124 for i in range(0, len(self.workers)): 125 for i in range(0, len(self.workers)):
125 self.send_ready() 126 self.send_ready()
126 127
128 self.status = STATUS.running
129
127 while True: 130 while True:
128 # Clear any workers if it's time to shut down 131 # Clear any workers if it's time to shut down
129 if self.received_disconnect: 132 if self.received_disconnect:
133 self.status = STATUS.stopping
130 for _ in range(0, len(self.workers)): 134 for _ in range(0, len(self.workers)):
131 logger.debug('Requesting worker death...') 135 logger.debug('Requesting worker death...')
132 self.request_queue.put_nowait('DONE') 136 self.request_queue.put_nowait('DONE')
@@ -253,7 +257,8 @@ class JobManager(HeartbeatMixin, EMQPService):
253 in :meth:`self.process_message` as every message is counted as a 257 in :meth:`self.process_message` as every message is counted as a
254 HEARTBEAT 258 HEARTBEAT
255 """ 259 """
256 self.check_worker_health() 260 if self.status == STATUS.running:
261 self.check_worker_health()
257 262
258 def check_worker_health(self): 263 def check_worker_health(self):
259 """ 264 """
@@ -282,10 +287,11 @@ class JobManager(HeartbeatMixin, EMQPService):
282 287
283 def sighup_handler(self, signum, frame): 288 def sighup_handler(self, signum, frame):
284 logger.info('Caught signal %s' % signum) 289 logger.info('Caught signal %s' % signum)
285 self.outgoing.rebuild()
286 import_settings() 290 import_settings()
287 import_settings(section='jobmanager') 291 import_settings(section='jobmanager')
288 self.start(addr=conf.WORKER_ADDR) 292
293 self.should_reset = True
294 self.received_disconnect = True
289 295
290 def sigterm_handler(self, signum, frame): 296 def sigterm_handler(self, signum, frame):
291 logger.info('Shutting down..') 297 logger.info('Shutting down..')
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 8e0f5ac..4bac781 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -401,7 +401,6 @@ class Scheduler(HeartbeatMixin, EMQPService):
401 Returns: 401 Returns:
402 int: unique hash for the job 402 int: unique hash for the job
403 """ 403 """
404
405 # Get the job portion of the message 404 # Get the job portion of the message
406 msg = deserialize(message[3])[1] 405 msg = deserialize(message[3])[1]
407 406
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index d0497bc..db82843 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -111,27 +111,18 @@ class TestCase(unittest.TestCase):
111 self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") 111 self.assertTrue(jm.received_disconnect, "Did not receive disconnect.")
112 112
113 # Other Tests 113 # Other Tests
114 @mock.patch('eventmq.jobmanager.JobManager.start')
115 @mock.patch('eventmq.jobmanager.import_settings') 114 @mock.patch('eventmq.jobmanager.import_settings')
116 @mock.patch('eventmq.jobmanager.Sender.rebuild') 115 def test_sighup_handler(self, import_settings_mock):
117 def test_sighup_handler(self, rebuild_mock, import_settings_mock,
118 start_mock):
119 jm = jobmanager.JobManager() 116 jm = jobmanager.JobManager()
120 117
121 jm.sighup_handler(982374, "FRAMEY the frame") 118 jm.sighup_handler(982374, "FRAMEY the frame")
122 119
123 self.assertTrue(rebuild_mock.called)
124
125 # called once for the default settings, once for the jobmanager 120 # called once for the default settings, once for the jobmanager
126 # settings 121 # settings
127 self.assertEqual(2, import_settings_mock.call_count) 122 self.assertEqual(2, import_settings_mock.call_count)
128 # check to see if the last call was called with the jobmanager section 123 # check to see if the last call was called with the jobmanager section
129 import_settings_mock.assert_called_with(section='jobmanager') 124 import_settings_mock.assert_called_with(section='jobmanager')
130 125
131 start_mock.assert_called_with(
132 addr=conf.WORKER_ADDR,
133 )
134
135 @mock.patch('eventmq.jobmanager.sendmsg') 126 @mock.patch('eventmq.jobmanager.sendmsg')
136 def test_sigterm_handler(self, sendmsg_mock): 127 def test_sigterm_handler(self, sendmsg_mock):
137 jm = jobmanager.JobManager() 128 jm = jobmanager.JobManager()
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index d1ca275..39c600b 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -119,6 +119,7 @@ class EMQPService(object):
119 self.poller.register(self.outgoing, poller.POLLIN) 119 self.poller.register(self.outgoing, poller.POLLIN)
120 self.awaiting_startup_ack = False 120 self.awaiting_startup_ack = False
121 self.received_disconnect = False 121 self.received_disconnect = False
122 self.should_reset = False
122 123
123 self.status = constants.STATUS.ready 124 self.status = constants.STATUS.ready
124 125
@@ -165,7 +166,7 @@ class EMQPService(object):
165 166
166 # When we return, soemthing has gone wrong and try to reconnect 167 # When we return, soemthing has gone wrong and try to reconnect
167 # unless self.received_disconnect is True 168 # unless self.received_disconnect is True
168 if not self.received_disconnect: 169 if not self.received_disconnect or self.should_reset:
169 self.reset() 170 self.reset()
170 171
171 logger.info('Death.') 172 logger.info('Death.')
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 7729120..8d25c63 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -21,13 +21,29 @@ from importlib import import_module
21import logging 21import logging
22from multiprocessing import Process 22from multiprocessing import Process
23import os 23import os
24from threading import Thread 24from threading import Event, Thread
25 25
26from . import conf 26from . import conf
27 27
28logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
29 29
30 30
31class StoppableThread(Thread):
32 """Thread class with a stop() method. The thread itself has to check
33 regularly for the stopped() condition."""
34
35 def __init__(self, target, name=None, args=()):
36 super(StoppableThread, self).__init__(name=name, target=target,
37 args=args)
38 self._stop = Event()
39
40 def stop(self):
41 self._stop.set()
42
43 def stopped(self):
44 return self._stop.isSet()
45
46
31class MultiprocessWorker(Process): 47class MultiprocessWorker(Process):
32 """ 48 """
33 Defines a worker that spans the job in a multiprocessing task 49 Defines a worker that spans the job in a multiprocessing task
@@ -60,12 +76,13 @@ class MultiprocessWorker(Process):
60 76
61 try: 77 try:
62 if timeout: 78 if timeout:
63 worker_thread = Thread(target=_run, 79 worker_thread = StoppableThread(target=_run,
64 args=(payload['params'], )) 80 args=(payload['params'], ))
65 worker_thread.start() 81 worker_thread.start()
66 worker_thread.join(timeout) 82 worker_thread.join(timeout)
67 83
68 if worker_thread.isAlive(): 84 if worker_thread.isAlive():
85 worker_thread.stop()
69 resp['return'] = 'TimeoutError' 86 resp['return'] = 'TimeoutError'
70 else: 87 else:
71 resp['return'] = 'DONE' 88 resp['return'] = 'DONE'
@@ -80,6 +97,9 @@ class MultiprocessWorker(Process):
80 if self.job_count > conf.MAX_JOB_COUNT: 97 if self.job_count > conf.MAX_JOB_COUNT:
81 break 98 break
82 99
100 if resp['return'] == 'TimeoutError':
101 break
102
83 logger.debug("Worker death, PID: {}".format(os.getpid())) 103 logger.debug("Worker death, PID: {}".format(os.getpid()))
84 104
85 105
diff --git a/setup.py b/setup.py
index c5315de..067b352 100644
--- a/setup.py
+++ b/setup.py
@@ -7,8 +7,8 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3-rc10', 10 version='0.3.1',
11 description='EventMQ messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==16.0.2', 13 install_requires=['pyzmq==16.0.2',
14 'six==1.10.0', 14 'six==1.10.0',
@@ -32,7 +32,7 @@ setup(
32 'mock==1.3.0'], 32 'mock==1.3.0'],
33 }, 33 },
34 author='EventMQ Contributors', 34 author='EventMQ Contributors',
35 url='https://github.com/enderlabs/eventmq/', 35 url='https://github.com/eventmq/eventmq/',
36 36
37 # See https://pypi.python.org/pypi?%3Aaction=list_classifiers 37 # See https://pypi.python.org/pypi?%3Aaction=list_classifiers
38 classifiers=[ 38 classifiers=[
@@ -53,6 +53,8 @@ setup(
53 # that you indicate whether you support Python 2, Python 3 or both. 53 # that you indicate whether you support Python 2, Python 3 or both.
54 'Programming Language :: Python :: 2.6', 54 'Programming Language :: Python :: 2.6',
55 'Programming Language :: Python :: 2.7', 55 'Programming Language :: Python :: 2.7',
56 'Programming Language :: Python :: 3.4',
57 'Programming Language :: Python :: 3.5',
56 ], 58 ],
57 scripts=[ 59 scripts=[
58 'bin/emq-cli', 60 'bin/emq-cli',