aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2016-06-30 15:24:57 -0600
committerGitHub2016-06-30 15:24:57 -0600
commitd687fab75c8ae61a9e4d602f1fbdf9003100cb22 (patch)
tree977e2febfc35172d418bf287d3937dedd44d5b21
parentc7acf27259f5145722c3d9f29d70e8e6af48b959 (diff)
parent6463dbf471e4806a1a0fb9e46b465cfa1bb6b22a (diff)
downloadeventmq-0.2.3.tar.gz
eventmq-0.2.3.zip
Merge pull request #35 from sideshowdave7/feature/replies_and_latency0.2.3
Feature: job latencies
-rwxr-xr-xbin/send_request24
-rw-r--r--eventmq/jobmanager.py60
-rw-r--r--eventmq/router.py26
-rw-r--r--eventmq/tests/test_jobmanager.py2
-rw-r--r--eventmq/worker.py13
5 files changed, 107 insertions, 18 deletions
diff --git a/bin/send_request b/bin/send_request
new file mode 100755
index 0000000..aa84639
--- /dev/null
+++ b/bin/send_request
@@ -0,0 +1,24 @@
1#!/usr/bin/env python
2"""
3Usage: ./send_msg <ipaddresss> <command> <frame w/ values space separated>
4"""
5import sys
6
7from eventmq.sender import Sender
8from eventmq.client.messages import send_request
9
10
11if __name__ == "__main__":
12 s = Sender()
13 s.connect(sys.argv[1])
14
15 msg = ['run', {
16 'path': 'eventmq.scheduler',
17 'callable': 'test_job',
18 'class_args': (),
19 'class_kwargs': {},
20 'args': (),
21 'kwargs': {}
22 }]
23
24 send_request(s, msg, guarantee=True, reply_requested=True)
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 52b3c7e..6e4cd5e 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -17,7 +17,8 @@
17================================ 17================================
18Ensures things about jobs and spawns the actual tasks 18Ensures things about jobs and spawns the actual tasks
19""" 19"""
20from json import loads as serializer 20from json import loads as deserializer
21from json import dumps as serializer
21import logging 22import logging
22import signal 23import signal
23 24
@@ -67,14 +68,13 @@ class JobManager(HeartbeatMixin, EMQPService):
67 68
68 #: keep track of workers 69 #: keep track of workers
69 concurrent_jobs = kwargs.pop('concurrent_jobs', None) 70 concurrent_jobs = kwargs.pop('concurrent_jobs', None)
70 if concurrent_jobs is None: 71 if concurrent_jobs is not None:
71 concurrent_jobs = conf.CONCURRENT_JOBS 72 conf.CONCURRENT_JOBS = concurrent_jobs
72 self.workers = Pool(processes=concurrent_jobs)
73 73
74 #: List of queues that this job manager is listening on 74 #: List of queues that this job manager is listening on
75 self.queues = kwargs.pop('queues', None) 75 self.queues = kwargs.pop('queues', None)
76 if self.queues is None: 76 if self.queues is None:
77 self.quques = conf.QUEUES 77 self.queues = conf.QUEUES
78 78
79 if not kwargs.pop('skip_signal', False): 79 if not kwargs.pop('skip_signal', False):
80 # handle any sighups by reloading config 80 # handle any sighups by reloading config
@@ -90,12 +90,23 @@ class JobManager(HeartbeatMixin, EMQPService):
90 90
91 self._setup() 91 self._setup()
92 92
93 @property
94 def workers(self):
95 if not hasattr(self, '_workers'):
96 self._workers = Pool(processes=conf.CONCURRENT_JOBS)
97 elif self._workers.processes != conf.CONCURRENT_JOBS:
98 self._workers.close()
99 self._workers = Pool(processes=conf.CONCURRENT_JOBS)
100
101 return self._workers
102
93 def _start_event_loop(self): 103 def _start_event_loop(self):
94 """ 104 """
95 Starts the actual event loop. Usually called by :meth:`start` 105 Starts the actual event loop. Usually called by :meth:`start`
96 """ 106 """
97 # Acknowledgment has come 107 # Acknowledgment has come
98 # Send a READY for each available worker 108 # Send a READY for each available worker
109
99 for i in range(0, conf.CONCURRENT_JOBS): 110 for i in range(0, conf.CONCURRENT_JOBS):
100 self.send_ready() 111 self.send_ready()
101 112
@@ -145,16 +156,26 @@ class JobManager(HeartbeatMixin, EMQPService):
145 # s_ indicates the string path vs the actual module and class 156 # s_ indicates the string path vs the actual module and class
146 # queue_name = msg[0] 157 # queue_name = msg[0]
147 158
148 # run callable 159 # Parse REQUEST message values
149 payload = serializer(msg[2]) 160 headers = msg[1]
150 # subcmd = payload[0] 161 payload = deserializer(msg[2])
151 params = payload[1] 162 params = payload[1]
152 163
164 if 'reply-requested' in headers:
165 callback = self.worker_done_with_reply
166 else:
167 callback = self.worker_done
168
169 # kick off the job asynchronously with an appropiate callback
153 self.workers.apply_async(func=worker.run, 170 self.workers.apply_async(func=worker.run,
154 args=(params,), 171 args=(params, msgid),
155 callback=self.worker_done) 172 callback=callback)
173
174 def worker_done_with_reply(self, msgid):
175 self.send_reply(msgid)
176 self.send_ready()
156 177
157 def worker_done(self, result): 178 def worker_done(self, msgid):
158 self.send_ready() 179 self.send_ready()
159 180
160 def send_ready(self): 181 def send_ready(self):
@@ -164,6 +185,23 @@ class JobManager(HeartbeatMixin, EMQPService):
164 """ 185 """
165 sendmsg(self.outgoing, 'READY') 186 sendmsg(self.outgoing, 'READY')
166 187
188 def send_reply(self, res):
189 """
190 Sends an REPLY response
191
192 Args:
193 socket (socket): The socket to use for this ack
194 recipient (str): The recipient id for the ack
195 msgid: The unique id that we are acknowledging
196 """
197 msgid = res[0]
198
199 reply = res[1]
200
201 reply = serializer(reply)
202
203 sendmsg(self.outgoing, 'REPLY', [reply, msgid])
204
167 def on_heartbeat(self, msgid, message): 205 def on_heartbeat(self, msgid, message):
168 """ 206 """
169 a placeholder for a noop command. The actual 'logic' for HEARTBEAT is 207 a placeholder for a noop command. The actual 'logic' for HEARTBEAT is
diff --git a/eventmq/router.py b/eventmq/router.py
index fbd8d80..7dd2471 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -105,6 +105,12 @@ class Router(HeartbeatMixin):
105 #: } 105 #: }
106 self.schedulers = {} 106 self.schedulers = {}
107 107
108
109 #: Latency tracking dictionary
110 #: Key: msgid of message each REQUEST received and forwarded to a worker
111 #: Value: (timestamp, queue_name)
112 self.job_latencies = {}
113
108 #: Set to True when the router should die. 114 #: Set to True when the router should die.
109 self.received_disconnect = False 115 self.received_disconnect = False
110 116
@@ -270,6 +276,25 @@ class Router(HeartbeatMixin):
270 self.add_scheduler(sender) 276 self.add_scheduler(sender)
271 self.send_ack(self.incoming, sender, msgid) 277 self.send_ack(self.incoming, sender, msgid)
272 278
279 def on_reply(self, sender, msgid, msg):
280 """
281 Handles an REPLY message. Replies are sent by the worker for latanecy
282 measurements
283 """
284
285 orig_msgid = msg[1]
286 logger.info('Received REPLY from {} (msgid: {}, ACK msgid: {})'.format(
287 sender, msgid, orig_msgid))
288
289 if orig_msgid in self.job_latencies:
290 logger.info("Completed {queue} job with msgid: {msgid} in "
291 "{time:.2f}ms".\
292 format(queue=self.job_latencies[orig_msgid][1],
293 msgid=orig_msgid,
294 time=(monotonic()-self.job_latencies[orig_msgid][0])*1000.0))
295 del self.job_latencies[orig_msgid]
296
297
273 def on_disconnect(self, msgid, msg): 298 def on_disconnect(self, msgid, msg):
274 # Loops event loops should check for this and break out 299 # Loops event loops should check for this and break out
275 self.received_disconnect = True 300 self.received_disconnect = True
@@ -364,6 +389,7 @@ class Router(HeartbeatMixin):
364 try: 389 try:
365 # Rebuild the message to be sent to the worker. fwdmsg will 390 # Rebuild the message to be sent to the worker. fwdmsg will
366 # properly address the message. 391 # properly address the message.
392 self.job_latencies[msgid] = (monotonic(), queue_name)
367 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, 393 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION,
368 'REQUEST', msgid, ] + msg) 394 'REQUEST', msgid, ] + msg)
369 self.workers[worker_addr]['available_slots'] -= 1 395 self.workers[worker_addr]['available_slots'] -= 1
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 6d3c663..483dde1 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -80,7 +80,7 @@ class TestCase(unittest.TestCase):
80 80
81 jm.on_request(_msgid, _msg) 81 jm.on_request(_msgid, _msg)
82 apply_async_mock.assert_called_with( 82 apply_async_mock.assert_called_with(
83 args=({'a': 1},), 83 args=({'a': 1}, _msgid),
84 callback=jm.worker_done, 84 callback=jm.worker_done,
85 func=run_mock) 85 func=run_mock)
86 86
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 78ea97a..284f478 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -26,7 +26,7 @@ from . import log
26logger = log.setup_logger(__name__) 26logger = log.setup_logger(__name__)
27 27
28 28
29def run(payload): 29def run(payload, msgid):
30 """ 30 """
31 process a run message and execute a job 31 process a run message and execute a job
32 32
@@ -47,7 +47,7 @@ def run(payload):
47 reload(package) 47 reload(package)
48 except Exception as e: 48 except Exception as e:
49 logger.exception('Error importing module: {}'.format(str(e))) 49 logger.exception('Error importing module: {}'.format(str(e)))
50 return 'DONE' 50 return (msgid, str(e))
51 51
52 if s_cls: 52 if s_cls:
53 cls = getattr(package, s_cls) 53 cls = getattr(package, s_cls)
@@ -70,7 +70,7 @@ def run(payload):
70 callable_ = getattr(obj, s_callable) 70 callable_ = getattr(obj, s_callable)
71 except AttributeError as e: 71 except AttributeError as e:
72 logger.exception('Error getting callable: {}'.format(str(e))) 72 logger.exception('Error getting callable: {}'.format(str(e)))
73 return 'DONE' 73 return (msgid, str(e))
74 74
75 if "args" in payload: 75 if "args" in payload:
76 args = payload["args"] 76 args = payload["args"]
@@ -83,10 +83,11 @@ def run(payload):
83 kwargs = {} 83 kwargs = {}
84 84
85 try: 85 try:
86 callable_(*args, **kwargs) 86 r = callable_(*args, **kwargs)
87 return (msgid, r)
87 except Exception as e: 88 except Exception as e:
88 logger.exception(e) 89 logger.exception(e)
89 return 'DONE' 90 return (msgid, str(e))
90 91
91 # Signal that we're done with this job 92 # Signal that we're done with this job
92 return 'DONE' 93 return (msgid, '')