aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsideshowdave72016-01-20 13:36:08 -0700
committersideshowdave72016-01-20 13:36:08 -0700
commitb52d4a45ed2bb37d293e7d3e9b85cf325511fe9e (patch)
tree2546ee14d8e26c88b08ea08f533c4008e0b721d0
parent9b1271dbdf05e95ee39e5aa6d349a245fc2cf285 (diff)
parent4c216077b12bd56b6d399beccf5a035c5966daa0 (diff)
downloadeventmq-b52d4a45ed2bb37d293e7d3e9b85cf325511fe9e.tar.gz
eventmq-b52d4a45ed2bb37d293e7d3e9b85cf325511fe9e.zip
Merge pull request #15 from enderlabs/feature/scheduler0.1.2
Feature/scheduler
-rw-r--r--docs/protocol.rst53
-rw-r--r--eventmq/client/messages.py91
-rw-r--r--eventmq/constants.py6
-rw-r--r--eventmq/jobmanager.py179
-rw-r--r--eventmq/router.py217
-rw-r--r--eventmq/scheduler.py228
-rw-r--r--eventmq/tests/test_jobmanager.py148
-rw-r--r--eventmq/tests/test_router.py42
-rw-r--r--eventmq/tests/test_utils.py3
-rw-r--r--eventmq/utils/classes.py209
-rw-r--r--eventmq/utils/timeutils.py42
-rw-r--r--requirements.txt4
12 files changed, 990 insertions, 232 deletions
diff --git a/docs/protocol.rst b/docs/protocol.rst
index e468b06..f7089ad 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -86,10 +86,53 @@ FRAME Value Description
862 PUBLISH command 862 PUBLISH command
873 _MSGID_ A unique id for the msg 873 _MSGID_ A unique id for the msg
884 _TOPIC_NAME_ the name of the queue the worker belongs to 884 _TOPIC_NAME_ the name of the queue the worker belongs to
895 _HEADERS_ dictionary of headers. can be an empty set 895 _HEADERS_ csv list of headers
906 _MSG_ The message to send
91====== ============== ===========
92
93A **SCHEDULE** command consists of a 7-frame multipart message, formatted as follows.
94
95====== ============== ===========
96FRAME Value Description
97====== ============== ===========
980 _EMPTY_ leave empty
991 eMQP/1.0 Protocol version
1002 SCHEDULE command
1013 _MSGID_ A unique id for the msg
1024 _TOPIC_NAME_ name of queue that the job should run in
1035 _HEADERS_ csv list of headers for this message
1046 _MSG_ The message to send
105====== ============== ===========
106
107An **UNSCHEDULE** command consists of a 7-frame multipart message, formatted as follows.
108
109====== ============== ===========
110FRAME Value Description
111====== ============== ===========
1120 _EMPTY_ leave empty
1131 eMQP/1.0 Protocol version
1142 UNSCHEDULE command
1153 _MSGID_ A unique id for the msg
1164 _TOPIC_NAME_ ignored for this command, broadcasted to all queues
1175 _HEADERS_ csv list of headers for this message
906 _MSG_ The message to send 1186 _MSG_ The message to send
91====== ============== =========== 119====== ============== ===========
92 120
121eMQP / Scheduler
122----------------
123An **INFORM** command consists of a 6-frame multipart message, formatted as follows.
124
125====== ============== ===========
126FRAME Value Description
127====== ============== ===========
1280 _EMPTY_ leave empty
1291 eMQP/1.0 Protocol version
1302 INFORM command
1313 _MSGID_ A unique id for the msg
1324 _QUEUE_NAME_ csv separated names of queue the worker belongs to
1335 scheduler type of peer connecting
134====== ============== ===========
135
93eMQP / Worker 136eMQP / Worker
94------------- 137-------------
95An **INFORM** command consists of a 5-frame multipart message, formatted as follows. 138An **INFORM** command consists of a 5-frame multipart message, formatted as follows.
@@ -102,6 +145,7 @@ FRAME Value Description
1022 INFORM command 1452 INFORM command
1033 _MSGID_ A unique id for the msg 1463 _MSGID_ A unique id for the msg
1044 _QUEUE_NAME_ csv separated names of queue the worker belongs to 1474 _QUEUE_NAME_ csv separated names of queue the worker belongs to
1485 worker type of peer connecting
105====== ============== =========== 149====== ============== ===========
106 150
107A **READY** frame consists of a 4-frame multipart message, formatted as follows. 151A **READY** frame consists of a 4-frame multipart message, formatted as follows.
@@ -154,11 +198,14 @@ Heartbeating
154------------ 198------------
155 * HEARTBEAT commands are valid at any time after an INFORM command 199 * HEARTBEAT commands are valid at any time after an INFORM command
156 * Any command except DISCONNECT acts as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands. 200 * Any command except DISCONNECT acts as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands.
157 * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. 201 * Worker and broker MUST send heartbeats at regular and agreed-upon intervals.
202 * Scheduler and broker MUST send heartbeats at regular and agreed-upon intervals.
158 * If the worker detects that the broker disconnected it SHOULD restart the conversation. 203 * If the worker detects that the broker disconnected it SHOULD restart the conversation.
159 * If the broker detects that a worker has disconnected it should stop sending it a message of any type. 204 * If the broker detects that a worker has disconnected it should stop sending it a message of any type.
205 * If the scheduler detects that the broker has disconnected it SHOULD restart the conversation.
206 * If the broker detects that a scheduler has disconnected it should remove the scheduler from its scheduler queue and stop forwarding SCHEDULE and UNSCHEDULE commands to it.
160 207
161Request Headers 208REQUEST Headers
162--------------- 209---------------
163Headers MUST be zero or more comma separated values inserted into the header field. If no headers are required, an empty string MUST be sent where headers are expected. 210Headers MUST be zero or more comma separated values inserted into the header field. If no headers are required, an empty string MUST be sent where headers are expected.
164 211
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index 66af8a9..03732f9 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -26,6 +26,67 @@ from ..utils.messages import send_emqp_message
26logger = logging.getLogger(__name__) 26logger = logging.getLogger(__name__)
27 27
28 28
29def schedule(socket, func, interval_secs, args=(), kwargs=None, class_args=(),
30 class_kwargs=None, headers=('guarantee',),
31 queue=conf.DEFAULT_QUEUE_NAME, unschedule=False):
32 """
33 Execute a task on a defined interval.
34
35 Args:
36 socket (socket): eventmq socket to use for sending the message
37 func (callable): the callable to be scheduled on a worker
38 interval_secs (int): seconds to wait in between executions
39 args (list): list of *args to pass to the callable
40 kwargs (dict): dict of **kwargs to pass to the callable
41 class_args (list): list of *args to pass to the class (if applicable)
42 class_kwargs (dict): dict of **kwargs to pass to the class (if
43 applicable)
44 headers (list): list of strings denoting enabled headers. Default:
45 guarantee is enabled to ensure the scheduler schedules the job.
46 queue (str): name of the queue to use when executing the job. The
47 default value is the default queue.
48 """
49 if not class_kwargs:
50 class_kwargs = {}
51 if not kwargs:
52 kwargs = {}
53
54 if callable(func):
55 path, callable_name = build_module_path(func)
56 else:
57 logger.error('Encountered non-callable func: {}'.format(func))
58 return False
59
60 if not callable_name:
61 logger.error('Encountered callable with no name in {}'.format(
62 func.__module__
63 ))
64 return False
65
66 if not path:
67 logger.error('Encountered callable with no __module__ path {}'.format(
68 func.__name__
69 ))
70 return False
71
72 # TODO: convert all the times to seconds for the clock
73
74 # TODO: send the schedule request
75
76 msg = ['run', {
77 'callable': callable_name,
78 'path': path,
79 'args': args,
80 'kwargs': kwargs,
81 'class_args': class_args,
82 'class_kwargs': class_kwargs,
83 }]
84
85 send_schedule_request(socket, interval_secs=interval_secs,
86 message=msg, headers=headers, queue=queue,
87 unschedule=unschedule)
88
89
29def defer_job(socket, func, args=(), kwargs=None, class_args=(), 90def defer_job(socket, func, args=(), kwargs=None, class_args=(),
30 class_kwargs=None, reply_requested=False, guarantee=False, 91 class_kwargs=None, reply_requested=False, guarantee=False,
31 retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): 92 retry_count=0, queue=conf.DEFAULT_QUEUE_NAME):
@@ -94,7 +155,7 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(),
94 send_request(socket, msg, reply_requested=reply_requested, 155 send_request(socket, msg, reply_requested=reply_requested,
95 guarantee=guarantee, retry_count=retry_count, queue=queue) 156 guarantee=guarantee, retry_count=retry_count, queue=queue)
96 157
97 return True 158 return True # The message has successfully been queued for delivery
98 159
99 160
100def build_module_path(func): 161def build_module_path(func):
@@ -196,6 +257,34 @@ def send_request(socket, message, reply_requested=False, guarantee=False,
196 ) 257 )
197 258
198 259
260def send_schedule_request(socket, interval_secs, message, headers=(),
261 queue=None, unschedule=False):
262 """
263 Send a SCHEDULE or UNSCHEDULE command.
264
265 Queues a message requesting that something happens on an
266 interval for the scheduler.
267
268 Args:
269 socket (socket):
270 interval_secs (int):
271 message: Message to send socket.
272 headers (list): List of headers for the message
273 queue (str): name of queue the job should be executed in
274 """
275
276 if unschedule:
277 command = 'UNSCHEDULE'
278 else:
279 command = 'SCHEDULE'
280
281 send_emqp_message(socket, command,
282 (queue or conf.DEFAULT_QUEUE_NAME,
283 ','.join(headers),
284 str(interval_secs),
285 serialize(message)))
286
287
199def job(block=False): # Move to decorators.py 288def job(block=False): # Move to decorators.py
200 """ 289 """
201 run the decorated function on a worker 290 run the decorated function on a worker
diff --git a/eventmq/constants.py b/eventmq/constants.py
index 621a6a5..c379ef5 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -8,5 +8,11 @@ class STATUS(object):
8 stopping = 300 8 stopping = 300
9 stopped = 301 9 stopped = 301
10 10
11
12class CLIENT_TYPE(object):
13 worker = 'worker'
14 scheduler = 'scheduler'
15
16
11# See doc/protocol.rst 17# See doc/protocol.rst
12PROTOCOL_VERSION = 'eMQP/1.0' 18PROTOCOL_VERSION = 'eMQP/1.0'
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index b52eebd..9499146 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -21,14 +21,13 @@ import json
21import logging 21import logging
22import signal 22import signal
23 23
24from . import conf, constants, exceptions, utils 24from . import conf
25from .poller import Poller, POLLIN 25from .poller import Poller, POLLIN
26from .sender import Sender 26from .sender import Sender
27from .utils.classes import HeartbeatMixin 27from .utils.classes import EMQPService, HeartbeatMixin
28from .utils.settings import import_settings 28from .utils.settings import import_settings
29from .utils.devices import generate_device_name 29from .utils.devices import generate_device_name
30from .utils.messages import send_emqp_message as sendmsg 30from .utils.messages import send_emqp_message as sendmsg
31import utils.messages
32from .utils.timeutils import monotonic 31from .utils.timeutils import monotonic
33from .worker import MultiprocessWorker as Worker 32from .worker import MultiprocessWorker as Worker
34from eventmq.log import setup_logger 33from eventmq.log import setup_logger
@@ -36,13 +35,14 @@ from eventmq.log import setup_logger
36logger = logging.getLogger(__name__) 35logger = logging.getLogger(__name__)
37 36
38 37
39class JobManager(HeartbeatMixin): 38class JobManager(HeartbeatMixin, EMQPService):
40 """ 39 """
41 The exposed portion of the worker. The job manager's main responsibility is 40 The exposed portion of the worker. The job manager's main responsibility is
42 to manage the resources on the server it's running. 41 to manage the resources on the server it's running.
43 42
44 This job manager uses tornado's eventloop. 43 This job manager uses tornado's eventloop.
45 """ 44 """
45 SERVICE_TYPE = 'worker'
46 46
47 def __init__(self, *args, **kwargs): 47 def __init__(self, *args, **kwargs):
48 """ 48 """
@@ -58,7 +58,7 @@ class JobManager(HeartbeatMixin):
58 #: Define the name of this JobManager instance. Useful to know when 58 #: Define the name of this JobManager instance. Useful to know when
59 #: referring to the logs. 59 #: referring to the logs.
60 self.name = kwargs.pop('name', generate_device_name()) 60 self.name = kwargs.pop('name', generate_device_name())
61 logger.info('Initializing JobManager %s...'.format(self.name)) 61 logger.info('Initializing JobManager {}...'.format(self.name))
62 62
63 #: Number of workers that are available to have a job executed. This 63 #: Number of workers that are available to have a job executed. This
64 #: number changes as workers become busy with jobs 64 #: number changes as workers become busy with jobs
@@ -67,7 +67,8 @@ class JobManager(HeartbeatMixin):
67 #: JobManager starts out by INFORMing the router of its existence, 67 #: JobManager starts out by INFORMing the router of its existence,
68 #: then telling the router that it is READY. The reply will be the unit 68 #: then telling the router that it is READY. The reply will be the unit
69 #: of work. 69 #: of work.
70 self.incoming = Sender() 70 # Despite the name, jobs are received on this socket
71 self.outgoing = Sender(name=self.name)
71 72
72 #: Jobs that are running should be stored in `active_jobs`. There 73 #: Jobs that are running should be stored in `active_jobs`. There
73 #: should always be at most `available_workers` count of active jobs. 74 #: should always be at most `available_workers` count of active jobs.
@@ -78,109 +79,39 @@ class JobManager(HeartbeatMixin):
78 79
79 self._setup() 80 self._setup()
80 81
81 def _setup(self): 82 def _start_event_loop(self):
82 """ 83 """
83 Prepares JobManager ready to connect to a broker. Actions that must 84 Starts the actual eventloop. Usually called by :meth:`start`
84 also run on a reset are here.
85 """ 85 """
86 # Look for incoming events 86 # Acknowledgment has come
87 self.poller.register(self.incoming, POLLIN) 87 # Send a READY for each available worker
88 self.awaiting_startup_ack = False 88 for i in range(0, self.available_workers):
89 89 self.send_ready()
90 self.status = constants.STATUS.ready
91 90
92 def start(self, addr='tcp://127.0.0.1:47291'):
93 """
94 Connect to `addr` and begin listening for job requests
95
96 Args:
97 addr (str): connection string to connect to
98 """
99 while True:
100 self.status = constants.STATUS.connecting
101 self.incoming.connect(addr)
102
103 self.awaiting_startup_ack = True
104 self.send_inform()
105
106 # We don't want to accidentally start processing jobs before our
107 # connection has been setup completely and acknowledged.
108 while self.awaiting_startup_ack:
109 # Poller timeout is in ms so the reconnect timeout is
110 # multiplied by 1000 to get seconds
111 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
112
113 if self.incoming in events: # A message from the Router!
114 msg = self.incoming.recv_multipart()
115 # TODO This will silently drop messages that aren't ACK
116 if msg[2] == "ACK":
117 # :meth:`on_ack` will set self.awaiting_startup_ack to
118 # False
119 self.process_message(msg)
120
121 # Acknowledgment has come
122 # Send a READY for each available worker
123 for i in range(0, self.available_workers):
124 self.send_ready()
125
126 self.status = constants.STATUS.connected
127 logger.info('Starting to listen for jobs')
128 91
129 # handle any sighups by reloading config 92 # handle any sighups by reloading config
130 signal.signal(signal.SIGHUP, self.sighup_handler) 93 signal.signal(signal.SIGHUP, self.sighup_handler)
131 94
132 self._start_event_loop()
133 # When we return, soemthing has gone wrong and we should try to
134 # reconnect
135 self.reset()
136
137 def reset(self):
138 """
139 Resets the current connection by closing and reopening the socket
140 """
141 # Unregister the old socket from the poller
142 self.poller.unregister(self.incoming)
143
144 # Polish up a new socket to use
145 self.incoming.rebuild()
146
147 # Prepare the device to connect again
148 self._setup()
149
150 def _start_event_loop(self):
151 """
152 Starts the actual eventloop. Usually called by :meth:`JobManager.start`
153 """
154 while True: 95 while True:
96 if self.received_disconnect:
97 # self.reset()
98 # Shut down if there are no active jobs waiting
99 if len(self.active_jobs) > 0:
100 self.prune_active_jobs()
101 continue
102 break
103
155 now = monotonic() 104 now = monotonic()
156 events = self.poller.poll() 105 events = self.poller.poll()
157 106
158 if events.get(self.incoming) == POLLIN: 107 if events.get(self.outgoing) == POLLIN:
159 msg = self.incoming.recv_multipart() 108 msg = self.outgoing.recv_multipart()
160 self.process_message(msg) 109 self.process_message(msg)
161 110
162 # Maintain the list of active jobs 111 self.prune_active_jobs()
163 for job in self.active_jobs:
164 if not job.is_alive():
165 self.active_jobs.remove(job)
166 self.available_workers += 1
167 self.send_ready()
168 112
169 # TODO: Optimization: Move the method calls into another thread so 113 if not self.maybe_send_heartbeat(events):
170 # they don't block the event loop 114 break
171 if not conf.DISABLE_HEARTBEATS:
172 # Send a HEARTBEAT if necessary
173 if now - self._meta['last_sent_heartbeat'] >= \
174 conf.HEARTBEAT_INTERVAL:
175 self.send_heartbeat(self.incoming)
176
177 # Do something about any missed HEARTBEAT, if we have nothing
178 # waiting on the socket
179 if self.is_dead() and not events:
180 logger.critical(
181 'The broker appears to have gone away. '
182 'Reconnecting...')
183 break
184 115
185 def on_request(self, msgid, msg): 116 def on_request(self, msgid, msg):
186 """ 117 """
@@ -222,58 +153,12 @@ class JobManager(HeartbeatMixin):
222 153
223 self.available_workers -= 1 154 self.available_workers -= 1
224 155
225 def process_message(self, msg):
226 """
227 Processes a message
228
229 Args:
230 msg: The message received from the socket to parse and process.
231 Processing takes form of calling an `on_COMMAND` method.
232 """
233 # Any received message should count as a heartbeat
234 self._meta['last_received_heartbeat'] = monotonic()
235 if self._meta['heartbeat_miss_count']:
236 self._meta['heartbeat_miss_count'] = 0 # Reset the miss count too
237
238 try:
239 message = utils.messages.parse_message(msg)
240 except exceptions.InvalidMessageError:
241 logger.error('Invalid message: %s' % str(msg))
242 return
243
244 command = message[0]
245 msgid = message[1]
246 message = message[2]
247
248 if hasattr(self, "on_%s" % command.lower()):
249 func = getattr(self, "on_%s" % command.lower())
250 func(msgid, message)
251 else:
252 logger.warning('No handler for %s found (tried: %s)' %
253 (command, ('on_%s' % command.lower())))
254
255 def send_ready(self): 156 def send_ready(self):
256 """ 157 """
257 send the READY command upstream to indicate that JobManager is ready 158 send the READY command upstream to indicate that JobManager is ready
258 for another REQUEST message. 159 for another REQUEST message.
259 """ 160 """
260 sendmsg(self.incoming, 'READY') 161 sendmsg(self.outgoing, 'READY')
261
262 def send_inform(self, queue=None):
263 """
264 Send an INFORM command
265 """
266 sendmsg(self.incoming, 'INFORM', queue or conf.DEFAULT_QUEUE_NAME)
267 self._meta['last_sent_heartbeat'] = monotonic()
268
269 def on_ack(self, msgid, ackd_msgid):
270 """
271 Sets :attr:`awaiting_ack` to False
272 """
273 # The msgid is the only frame in the message
274 ackd_msgid = ackd_msgid[0]
275 logger.info('Received ACK for router (or client) %s' % ackd_msgid)
276 self.awaiting_startup_ack = False
277 162
278 def on_heartbeat(self, msgid, message): 163 def on_heartbeat(self, msgid, message):
279 """ 164 """
@@ -282,6 +167,16 @@ class JobManager(HeartbeatMixin):
282 HEARTBEAT 167 HEARTBEAT
283 """ 168 """
284 169
170 def prune_active_jobs(self):
171 # Maintain the list of active jobs
172 for job in self.active_jobs:
173 if not job.is_alive():
174 self.active_jobs.remove(job)
175 self.available_workers += 1
176
177 if not self.received_disconnect:
178 self.send_ready()
179
285 def sighup_handler(self, signum, frame): 180 def sighup_handler(self, signum, frame):
286 logger.info('Caught signal %s' % signum) 181 logger.info('Caught signal %s' % signum)
287 self.incoming.unbind(conf.FRONTEND_ADDR) 182 self.incoming.unbind(conf.FRONTEND_ADDR)
diff --git a/eventmq/router.py b/eventmq/router.py
index ac230ff..ecbe46c 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -19,12 +19,11 @@ Routes messages to workers (that are in named queues).
19""" 19"""
20from copy import copy 20from copy import copy
21import logging 21import logging
22import threading
23import warnings 22import warnings
24import signal 23import signal
25 24
26from . import conf, exceptions, poller, receiver 25from . import conf, exceptions, poller, receiver
27from .constants import STATUS 26from .constants import STATUS, CLIENT_TYPE
28from .utils.classes import HeartbeatMixin 27from .utils.classes import HeartbeatMixin
29from .utils.messages import ( 28from .utils.messages import (
30 send_emqp_router_message as sendmsg, 29 send_emqp_router_message as sendmsg,
@@ -85,6 +84,25 @@ class Router(HeartbeatMixin):
85 #: workers available to take the job 84 #: workers available to take the job
86 self.waiting_messages = {} 85 self.waiting_messages = {}
87 86
87 #: Tracks the last time the scheduler queue was cleaned out of dead
88 #: schedulers
89 self._meta['last_scheduler_cleanup'] = 0
90
91 #: Queue for schedulers to use:
92 self.scheduler_queue = []
93
94 #: Scheduler clients. Clients are able to send SCHEDULE commands that
95 #: need to be routed to a scheduler, which will keep track of time and
96 #: run the job.
97 #: Contains dictionaries:
98 #: self.schedulers[<scheduler_zmq_id>] = {
99 #: 'hb': <last_recv_heartbeat>,
100 #: }
101 self.schedulers = {}
102
103 #: Set to True when the router should die.
104 self.received_disconnect = False
105
88 def start(self, 106 def start(self,
89 frontend_addr=conf.FRONTEND_ADDR, 107 frontend_addr=conf.FRONTEND_ADDR,
90 backend_addr=conf.BACKEND_ADDR): 108 backend_addr=conf.BACKEND_ADDR):
@@ -113,6 +131,10 @@ class Router(HeartbeatMixin):
113 Starts the actual eventloop. Usually called by :meth:`Router.start` 131 Starts the actual eventloop. Usually called by :meth:`Router.start`
114 """ 132 """
115 while True: 133 while True:
134
135 if self.received_disconnect:
136 break
137
116 now = monotonic() 138 now = monotonic()
117 events = self.poller.poll() 139 events = self.poller.poll()
118 140
@@ -137,6 +159,22 @@ class Router(HeartbeatMixin):
137 # ones so the next one is alive 159 # ones so the next one is alive
138 self.clean_up_dead_workers() 160 self.clean_up_dead_workers()
139 161
162 if now - self._meta['last_sent_scheduler_heartbeat'] >= \
163 conf.HEARTBEAT_INTERVAL:
164 self.send_schedulers_heartbeats()
165
166 if now - self._meta['last_scheduler_cleanup'] >= 10:
167 self.clean_up_dead_schedulers()
168
169 def reset_heartbeat_counters(self):
170 """
171 Reset all the counters for heartbeats back to 0
172 """
173 super(Router, self).reset_heartbeat_counters()
174
175 # track the last time the router sent a heartbeat to the schedulers
176 self._meta['last_sent_scheduler_heartbeat'] = 0
177
140 def send_ack(self, socket, recipient, msgid): 178 def send_ack(self, socket, recipient, msgid):
141 """ 179 """
142 Sends an ACK response 180 Sends an ACK response
@@ -162,13 +200,22 @@ class Router(HeartbeatMixin):
162 200
163 def send_workers_heartbeats(self): 201 def send_workers_heartbeats(self):
164 """ 202 """
165 Send heartbeats to all registered workers. 203 Send HEARTBEATs to all registered workers.
166 """ 204 """
167 self._meta['last_sent_heartbeat'] = monotonic() 205 self._meta['last_sent_heartbeat'] = monotonic()
168 206
169 for worker_id in self.workers: 207 for worker_id in self.workers:
170 self.send_heartbeat(self.outgoing, worker_id) 208 self.send_heartbeat(self.outgoing, worker_id)
171 209
210 def send_schedulers_heartbeats(self):
211 """
212 Send HEARTBEATs to all registered schedulers
213 """
214 self._meta['last_sent_scheduler_heartbeat'] = monotonic()
215
216 for scheduler_id in self.schedulers:
217 self.send_heartbeat(self.incoming, scheduler_id)
218
172 def on_heartbeat(self, sender, msgid, msg): 219 def on_heartbeat(self, sender, msgid, msg):
173 """ 220 """
174 a placeholder for a no-op command. The actual 'logic' for HEARTBEAT is 221 a placeholder for a no-op command. The actual 'logic' for HEARTBEAT is
@@ -181,12 +228,22 @@ class Router(HeartbeatMixin):
181 Handles an INFORM message. This happens when new worker coming online 228 Handles an INFORM message. This happens when new worker coming online
182 and announces itself. 229 and announces itself.
183 """ 230 """
184 logger.info('Received INFORM request from %s' % sender)
185 queue_name = msg[0] 231 queue_name = msg[0]
232 client_type = msg[1]
233
234 logger.info('Received INFORM request from {} (type: {})'.format(
235 sender, client_type))
186 236
187 self.add_worker(sender, queue_name) 237 if client_type == CLIENT_TYPE.worker:
238 self.add_worker(sender, queue_name)
239 self.send_ack(self.outgoing, sender, msgid)
240 elif client_type == CLIENT_TYPE.scheduler:
241 self.add_scheduler(sender)
242 self.send_ack(self.incoming, sender, msgid)
188 243
189 self.send_ack(self.outgoing, sender, msgid) 244 def on_disconnect(self, msgid, msg):
245 # Event loops should check for this and break out
246 self.received_disconnect = True
190 247
191 def on_ready(self, sender, msgid, msg): 248 def on_ready(self, sender, msgid, msg):
192 """ 249 """
@@ -252,7 +309,7 @@ class Router(HeartbeatMixin):
252 Adds a worker to worker queues 309 Adds a worker to worker queues
253 310
254 Args: 311 Args:
255 worker_id: unique id of the worker to add 312 worker_id (str): unique id of the worker to add
256 queues: queue or queues this worker should be a member of 313 queues: queue or queues this worker should be a member of
257 """ 314 """
258 # Add the worker to our worker dict 315 # Add the worker to our worker dict
@@ -263,6 +320,36 @@ class Router(HeartbeatMixin):
263 logger.debug('Adding {} to the self.workers for queues:{}'.format( 320 logger.debug('Adding {} to the self.workers for queues:{}'.format(
264 worker_id, str(queues))) 321 worker_id, str(queues)))
265 322
323 def clean_up_dead_schedulers(self):
324 """
325 Loops through the list of schedulers and remove any schedulers who
326 the router hasn't received a heartbeat in HEARTBEAT_TIMEOUT
327 """
328 now = monotonic()
329 self._meta['last_scheduler_cleanup'] = now
330 schedulers = copy(self.scheduler_queue)
331
332 for scheduler_id in schedulers:
333 last_hb_seconds = now - self.schedulers[scheduler_id]['hb']
334 if last_hb_seconds >= conf.HEARTBEAT_TIMEOUT:
335 logger.info("No HEARTBEAT from scheduler {} in {} Removing "
336 "from the queue".format(scheduler_id,
337 last_hb_seconds))
338 del self.schedulers[scheduler_id]
339 self.scheduler_queue.remove(scheduler_id)
340
341 def add_scheduler(self, scheduler_id):
342 """
343 Adds a scheduler to the queue to receive SCHEDULE commands
344
345 Args:
346 scheduler_id (str): unique id of the scheduler to add
347 """
348 self.scheduler_queue.append(scheduler_id)
349 self.schedulers[scheduler_id] = {}
350 self.schedulers[scheduler_id]['hb'] = monotonic()
351 logger.debug('Adding {} to self.schedulers'.format(scheduler_id))
352
266 def requeue_worker(self, worker_id): 353 def requeue_worker(self, worker_id):
267 """ 354 """
268 Add a worker back to the pools for which it is a member of. 355 Add a worker back to the pools for which it is a member of.
@@ -295,10 +382,6 @@ class Router(HeartbeatMixin):
295 382
296 def on_receive_request(self, msg): 383 def on_receive_request(self, msg):
297 """ 384 """
298 This function is called when a message comes in from the client socket.
299 It then calls `on_command`. If `on_command` isn't found, then a
300 warning is created.
301
302 Args: 385 Args:
303 msg: The untouched message from zmq 386 msg: The untouched message from zmq
304 """ 387 """
@@ -307,43 +390,83 @@ class Router(HeartbeatMixin):
307 except exceptions.InvalidMessageError: 390 except exceptions.InvalidMessageError:
308 logger.exception('Invalid message from clients: %s' % str(msg)) 391 logger.exception('Invalid message from clients: %s' % str(msg))
309 392
310 queue_name = message[3][0] 393 sender = message[0]
311 394 command = message[1]
312 # If we have no workers for the queue TODO something about it
313 if queue_name not in self.queues:
314 logger.warning("Received REQUEST with a queue I don't recognize: "
315 "%s" % queue_name)
316 logger.critical("Discarding message")
317 # TODO: Don't discard the message
318 return
319
320 try:
321 worker_addr = self.queues[queue_name].pop()
322 except KeyError:
323 logger.critical("REQUEST for an unknown queue caught in exception")
324 logger.critical("Discarding message")
325 return
326 except IndexError:
327 logger.warning('No available workers for queue "%s". Buffering '
328 'message to send later.' % queue_name)
329 if queue_name not in self.waiting_messages:
330 self.waiting_messages[queue_name] = []
331 self.waiting_messages[queue_name].append(msg)
332 logger.debug('%d waiting messages in queue "%s"' %
333 (len(self.waiting_messages[queue_name]), queue_name))
334 return
335 395
336 try: 396 # Count this message as a heart beat if it came from a scheduler that
337 # strip off the client id before forwarding because the worker 397 # the router is aware of.
338 # isn't expecting it, and the zmq socket is going to put our 398 if sender in self.schedulers and sender in self.scheduler_queue:
339 # id on it. 399 self.schedulers[sender]['hb'] = monotonic()
340 fwdmsg(self.outgoing, worker_addr, msg[1:]) 400
341 except exceptions.PeerGoneAwayError: 401 # If it is a heartbeat then there is nothing left to do
342 logger.debug("Worker {} has unexpectedly gone away. Trying " 402 if command == "HEARTBEAT":
343 "another worker".format(worker_addr)) 403 return
344 404
345 # TODO: Rewrite this logic as a loop 405 # REQUEST is the most common message so it goes at the top
346 self.on_receive_request(msg) 406 if command == "REQUEST":
407 queue_name = message[3][0]
408 # If we have no workers for the queue TODO something about it
409 if queue_name not in self.queues:
410 logger.warning("Received %s with a queue I don't recognize: "
411 "%s" % (msg[3], queue_name))
412 logger.critical("Discarding message")
413 # TODO: Don't discard the message
414 return
415
416 try:
417 worker_addr = self.queues[queue_name].pop()
418 except KeyError:
419 logger.critical("REQUEST for an unknown queue caught in "
420 "exception")
421 logger.critical("Discarding message")
422 return
423 except IndexError:
424 logger.warning('No available workers for queue "%s". '
425 'Buffering message to send later.' % queue_name)
426 if queue_name not in self.waiting_messages:
427 self.waiting_messages[queue_name] = []
428 self.waiting_messages[queue_name].append(msg)
429 logger.debug('%d waiting messages in queue "%s"' %
430 (len(self.waiting_messages[queue_name]),
431 queue_name))
432 return
433
434 try:
435 # strip off the client id before forwarding because the
436 # worker isn't expecting it, and the zmq socket is going
437 # to put this router's id on it.
438 fwdmsg(self.outgoing, worker_addr, msg[1:])
439 except exceptions.PeerGoneAwayError:
440 logger.debug("Worker {} has unexpectedly gone away. "
441 "Trying another worker".format(worker_addr))
442
443 # TODO: Rewrite this logic as a loop, so it can't recurse
444 # into oblivion
445 self.on_receive_request(msg)
446 # elif command == "HEARTBEAT":
447 # # The scheduler is heartbeating
448
449 elif command == "INFORM":
450 # This is a scheduler trying to join
451 self.on_inform(message[0], message[2], message[3])
452
453 elif command == "SCHEDULE" or command == "UNSCHEDULE":
454 # Forward the schedule message to the schedulers
455 scheduler_addr = self.scheduler_queue.pop()
456 self.scheduler_queue.append(scheduler_addr)
457 self.schedulers[scheduler_addr] = {
458 'hb': monotonic(),
459 }
460
461 try:
462 # Strips off the client id before forwarding because the
463 # scheduler isn't expecting it.
464 fwdmsg(self.incoming, scheduler_addr, msg[1:])
465 except exceptions.PeerGoneAwayError:
466 logger.debug("Scheduler {} has unexpectedly gone away. Trying "
467 "another scheduler.".format(scheduler_addr))
468 # TODO: rewrite this in a loop
469 self.on_receive_request(msg)
347 470
348 def process_worker_message(self, msg): 471 def process_worker_message(self, msg):
349 """ 472 """
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index c5c5754..16f6095 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -17,17 +17,23 @@
17============================= 17=============================
18Handles cron and other scheduled tasks 18Handles cron and other scheduled tasks
19""" 19"""
20import json
20import logging 21import logging
21import time 22import time
23import redis
22 24
23from croniter import croniter 25from croniter import croniter
24from six import next 26from six import next
25 27
26from . import conf 28from . import conf
27from .sender import Sender 29from .sender import Sender
28from .utils.classes import HeartbeatMixin 30from .poller import Poller, POLLIN
31from .utils.classes import EMQPService, HeartbeatMixin
32from json import loads as deserialize
33from json import dumps as serialize
29from .utils.settings import import_settings 34from .utils.settings import import_settings
30from .utils.timeutils import seconds_until, timestamp 35from .utils.timeutils import IntervalIter
36from .utils.timeutils import seconds_until, timestamp, monotonic
31from .client.messages import send_request 37from .client.messages import send_request
32 38
33from eventmq.log import setup_logger 39from eventmq.log import setup_logger
@@ -35,40 +41,65 @@ from eventmq.log import setup_logger
35logger = logging.getLogger(__name__) 41logger = logging.getLogger(__name__)
36 42
37 43
38class Scheduler(HeartbeatMixin): 44class Scheduler(HeartbeatMixin, EMQPService):
39 """ 45 """
40 Keeper of time, master of schedules 46 Keeper of time, master of schedules
41 """ 47 """
48 SERVICE_TYPE = 'scheduler'
42 49
43 def __init__(self, *args, **kwargs): 50 def __init__(self, *args, **kwargs):
44 logger.info('Initializing Scheduler...') 51 logger.info('Initializing Scheduler...')
45 super(Scheduler, self).__init__(*args, **kwargs) 52 super(Scheduler, self).__init__(*args, **kwargs)
46 self.outgoing = Sender() 53 self.outgoing = Sender()
47 54
48 # 0 = the next ts this job should be executed 55 # Open connection to redis server for persistance
56 self.redis_server = redis.StrictRedis(host='localhost',
57 port=6379,
58 db=0)
59
60 # contains 4-item lists representing cron jobs
61 # IDX Description
62 # 0 = the next ts this job should be executed in
49 # 1 = the function to be executed 63 # 1 = the function to be executed
50 # 2 = the croniter iterator for this job 64 # 2 = the croniter iterator for this job
51 self.jobs = [] 65 # 3 = the queue to execute the job in
66 self.cron_jobs = []
67
68 # contains dict of 4-item lists representing jobs based on an interval
69 # key of this dictionary is a hash of company_id, path, and callable
70 # from the message of the SCHEDULE command received
71 # values of this list follow this format:
72 # IDX Descriptions
73 # 0 = the next (monotonic) ts that this job should be executed in
74 # 1 = the function to be executed
75 # 2 = the interval iter for this job
76 # 3 = the queue to execute the job in
77 self.interval_jobs = {}
78
79 self.poller = Poller()
52 80
53 self.load_jobs() 81 self.load_jobs()
54 82
55 def connect(self, addr='tcp://127.0.0.1:47290'): 83 self._setup()
56 """
57 Connect the scheduler to worker/router at `addr`
58 """
59 self.outgoing.connect(addr)
60 84
61 def load_jobs(self): 85 def load_jobs(self):
62 """ 86 """
63 Loads the jobs that need to be scheduled 87 Loads the jobs that need to be scheduled
64 """ 88 """
65 raw_jobs = ( 89 raw_jobs = (
66 ('* * * * *', 'eventmq.scheduler.test_job'), 90 # ('* * * * *', 'eventmq.scheduler.test_job'),
67 ) 91 )
68 ts = int(timestamp()) 92 ts = int(timestamp())
69 for job in raw_jobs: 93 for job in raw_jobs:
70 # Create the croniter iterator 94 # Create the croniter iterator
71 c = croniter(job[0]) 95 c = croniter(job[0])
96 path = '.'.join(job[1].split('.')[:-1])
97 callable_ = job.split('.')[-1]
98
99 msg = ['run', {
100 'path': path,
101 'callable': callable_
102 }]
72 103
73 # Get the next time this job should be run 104 # Get the next time this job should be run
74 c_next = next(c) 105 c_next = next(c)
@@ -76,15 +107,24 @@ class Scheduler(HeartbeatMixin):
76 # If the next execution time has passed move the iterator to 107 # If the next execution time has passed move the iterator to
77 # the following time 108 # the following time
78 c_next = next(c) 109 c_next = next(c)
79 self.jobs.append([c_next, job[1], c]) 110 self.cron_jobs.append([c_next, msg, c, None])
80
81 def start(self, addr='tcp://127.0.0.1:47290'):
82 """
83 Begin sending messages to execute scheduled jobs
84 """
85 self.connect(addr)
86 111
87 self._start_event_loop() 112 # Restore persisted data if redis connection is alive and has jobs
113 if (self.redis_server):
114 interval_job_list = self.redis_server.lrange('interval_jobs',
115 0,
116 -1)
117 if interval_job_list is not None:
118 for i in interval_job_list:
119 logger.debug('Restoring job with hash %s' % i)
120 if (self.redis_server.get(i)):
121 self.load_job_from_redis(
122 message=deserialize(self.redis_server.get(i)))
123 else:
124 logger.warning('Expected scheduled job in redis,' +
125 'but none was found with hash %s' % i)
126 else:
127 logger.warning('Unabled to talk to redis server')
88 128
89 def _start_event_loop(self): 129 def _start_event_loop(self):
90 """ 130 """
@@ -92,29 +132,146 @@ class Scheduler(HeartbeatMixin):
92 """ 132 """
93 while True: 133 while True:
94 ts_now = int(timestamp()) 134 ts_now = int(timestamp())
135 m_now = monotonic()
136 events = self.poller.poll()
137
138 if events.get(self.outgoing) == POLLIN:
139 msg = self.outgoing.recv_multipart()
140 self.process_message(msg)
95 141
96 for i in range(0, len(self.jobs)): 142 # TODO: distribute me!
97 if self.jobs[i][0] <= ts_now: # If the time is now, or passed 143 for i in range(0, len(self.cron_jobs)):
98 job = self.jobs[i][1] 144 # If the time is now, or passed
99 path = '.'.join(job.split('.')[:-1]) 145 if self.cron_jobs[i][0] <= ts_now:
100 callable_ = job.split('.')[-1] 146 msg = self.cron_jobs[i][1]
147 queue = self.cron_jobs[i][3]
101 148
102 # Run the job 149 # Run the msg
103 logger.debug("Time is: %s; Schedule is: %s - Running %s" 150 logger.debug("Time is: %s; Schedule is: %s - Running %s"
104 % (ts_now, self.jobs[i][0], job)) 151 % (ts_now, self.cron_jobs[i][0], msg))
105 152
106 msg = ['run', { 153 self.send_request(self.outgoing, msg, queue=queue)
107 'path': path,
108 'callable': callable_
109 }]
110 send_request(self.outgoing, msg)
111 154
112 # Update the next time to run 155 # Update the next time to run
113 self.jobs[i][0] = next(self.jobs[i][2]) 156 self.cron_jobs[i][0] = next(self.cron_jobs[i][2])
114 logger.debug("Next execution will be in %ss" % 157 logger.debug("Next execution will be in %ss" %
115 seconds_until(self.jobs[i][0])) 158 seconds_until(self.cron_jobs[i][0]))
116 159
117 time.sleep(0.1) 160 for k, v in self.interval_jobs.iteritems():
161 if v[0] <= m_now:
162 msg = v[1]
163 queue = v[3]
164
165 logger.debug("Time is: %s; Schedule is: %s - Running %s"
166 % (ts_now, v[0], msg))
167
168 self.send_request(msg, queue=queue)
169 v[0] = next(v[2])
170
171 if not self.maybe_send_heartbeat(events):
172 break
173
174 def send_request(self, jobmsg, queue=None):
175 jobmsg = json.loads(jobmsg)
176 send_request(self.outgoing, jobmsg, queue=queue)
177
178 def on_unschedule(self, msgid, message):
179 """
180 Unschedule an existing schedule job, if it exists
181 """
182 logger.info("Received new UNSCHEDULE request: {}".format(message))
183
184 schedule_hash = self.schedule_hash(message)
185
186 if schedule_hash in self.interval_jobs:
187 # Remove scheduled job
188 self.interval_jobs.pop(schedule_hash)
189 else:
190 logger.debug("Couldn't find matching schedule for unschedule " +
191 "request")
192
193 # Double check the redis server even if we didn't find the hash
194 # in memory
195 if (self.redis_server):
196 if (self.redis_server.get(schedule_hash)):
197 self.redis_server.lrem('interval_jobs', 0, schedule_hash)
198 self.redis_server.save()
199
200 def load_job_from_redis(self, message):
201 """
202 """
203 from .utils.timeutils import IntervalIter
204
205 queue = message[0].encode('utf-8')
206 interval = int(message[2])
207 inter_iter = IntervalIter(monotonic(), interval)
208 schedule_hash = self.schedule_hash(message)
209
210 self.interval_jobs[schedule_hash] = [
211 next(inter_iter),
212 message[3],
213 inter_iter,
214 queue
215 ]
216
217 def on_schedule(self, msgid, message):
218 """
219 """
220 logger.info("Received new SCHEDULE request: {}".format(message))
221
222 queue = message[0]
223 interval = int(message[2])
224 inter_iter = IntervalIter(monotonic(), interval)
225 schedule_hash = self.schedule_hash(message)
226
227 # Notify if this is updating existing, or new
228 if (schedule_hash in self.interval_jobs):
229 logger.debug('Update existing scheduled job with %s'
230 % schedule_hash)
231 else:
232 logger.debug('Creating a new scheduled job with %s'
233 % schedule_hash)
234
235 self.interval_jobs[schedule_hash] = [
236 next(inter_iter),
237 message[3],
238 inter_iter,
239 queue
240 ]
241
242 # Persist the scheduled job
243 if (self.redis_server):
244 if schedule_hash not in self.redis_server.lrange('interval_jobs',
245 0,
246 -1):
247 self.redis_server.lpush('interval_jobs', schedule_hash)
248 self.redis_server.set(schedule_hash, serialize(message))
249 self.redis_server.save()
250
251 self.send_request(message[3], queue=queue)
252
253 def on_heartbeat(self, msgid, message):
254 """
255 Noop command. The logic for heartbeating is in the event loop.
256 """
257
258 def schedule_hash(self, message):
259 """
260 Create a unique identifier for this message for storing
261 and referencing later
262 """
263 # Items to use for uniquely identifying this scheduled job
264 # TODO: Pass company_id in a more rigid place
265 msg = deserialize(message[3])[1]
266 schedule_hash_items = {'company_id': msg['class_args'][0],
267 'path': msg['path'],
268 'callable': msg['callable']}
269
270 # Hash the sorted, immutable set of items in our identifying dict
271 schedule_hash = str(hash(tuple(frozenset(sorted(
272 schedule_hash_items.items())))))
273
274 return schedule_hash
118 275
119 def scheduler_main(self): 276 def scheduler_main(self):
120 """ 277 """
@@ -122,6 +279,7 @@ class Scheduler(HeartbeatMixin):
122 """ 279 """
123 setup_logger("eventmq") 280 setup_logger("eventmq")
124 import_settings() 281 import_settings()
282 self.__init__()
125 self.start(addr=conf.SCHEDULER_ADDR) 283 self.start(addr=conf.SCHEDULER_ADDR)
126 284
127 285
@@ -134,6 +292,4 @@ def scheduler_main():
134def test_job(): 292def test_job():
135 print "hello!" 293 print "hello!"
136 print "hello!" 294 print "hello!"
137 print "hello!"
138 print "hello!"
139 time.sleep(4) 295 time.sleep(4)
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
new file mode 100644
index 0000000..3c3941d
--- /dev/null
+++ b/eventmq/tests/test_jobmanager.py
@@ -0,0 +1,148 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15import json
16import threading
17import time
18import unittest
19
20import mock
21import zmq
22
23from .. import conf, constants, jobmanager
24from ..utils.classes import ZMQSendMixin, ZMQReceiveMixin
25from ..utils.messages import send_emqp_router_message
26
27ADDR = 'inproc://pour_the_rice_in_the_thing'
28
29
30class FakeDevice(ZMQReceiveMixin, ZMQSendMixin):
31 """
32 A fake router device so we can test with some of the nice utilities, but
33 still allowing manual control
34 """
35 def __init__(self, addr=ADDR):
36 super(FakeDevice, self).__init__()
37
38 self.zsocket = zmq.Context.instance().socket(zmq.ROUTER)
39
40
41class TestCase(unittest.TestCase):
42 def setUp(self):
43 self.jm = jobmanager.JobManager()
44
45 # Since JobManager runs as a process a thread is used to allow the loop
46 # to run
47 self.jm_thread = threading.Thread(target=start_jm,
48 args=(self.jm,))
49
50 self.addCleanup(self.cleanup)
51
52 def test__setup(self):
53 jm = jobmanager.JobManager(name='RuckusBringer')
54 self.assertEqual(jm.name, 'RuckusBringer')
55
56 self.assertFalse(jm.awaiting_startup_ack)
57 self.assertEqual(jm.status, constants.STATUS.ready)
58
59# EMQP Tests
60 def test_reset(self):
61 self.jm.reset()
62
63 self.assertFalse(self.jm.awaiting_startup_ack)
64 self.assertEqual(self.jm.status, constants.STATUS.ready)
65
66 @mock.patch('signal.signal')
67 def test_start(self, mock_signal_signal):
68 sock = FakeDevice()
69
70 self.jm_thread.start()
71 time.sleep(.1) # wait for the manager to warm up
72
73 self.assertTrue(self.jm.awaiting_startup_ack)
74 self.assertEqual(self.jm.status, constants.STATUS.connecting)
75
76 # Give JM something to connect to.
77 sock.zsocket.bind(ADDR)
78
79 jm_addr, _, _, cmd, msgid, queues, type_ = sock.recv_multipart()
80 self.assertEqual(self.jm.name, jm_addr)
81 self.assertEqual(cmd, "INFORM")
82 self.assertEqual(type_, constants.CLIENT_TYPE.worker)
83
84 self.send_ack(sock, jm_addr, msgid)
85
86 time.sleep(.1)
87 self.assertEqual(self.jm.status, constants.STATUS.connected)
88
89 def send_ack(self, sock, jm_addr, msgid):
90 send_emqp_router_message(sock, jm_addr, "ACK", msgid)
91
92 @mock.patch('signal.signal')
93 def test__start_event_loop(self, mock_signal_signal):
94 # Tests the first part of the event loop
95 sock = FakeDevice()
96 sock.zsocket.bind(ADDR)
97
98 self.jm_thread.start()
99
100 # Consume the INFORM command
101 jm_addr, _, _, cmd, msgid, queues, type_ = sock.recv_multipart()
102 self.send_ack(sock, jm_addr, msgid)
103
104 # Test the correct number of READY messages is sent for the broker
105 # to know how many jobs the JM can handle
106 ready_msg_count = 0
107 for i in range(0, self.jm.available_workers):
108 msg = sock.recv_multipart()
109 if len(msg) > 4 and msg[3] == "READY":
110 ready_msg_count += 1
111 # If this fails, less READY messages were sent than were supposed
112 # to be sent.
113 self.assertEqual(ready_msg_count, self.jm.available_workers)
114
115 @mock.patch('signal.signal')
116 def test_on_request(self, mock_signal_signal):
117 from ..client.messages import build_module_path
118 sock = FakeDevice()
119 sock.zsocket.bind(ADDR)
120 self.jm_thread.start()
121
122 jm_addr, _, _, _, msgid, _, _ = sock.recv_multipart()
123 self.send_ack(sock, jm_addr, msgid)
124 time.sleep(.1) # give time for the JM to process
125
126 path, callable_name = build_module_path(pretend_job)
127
128 run_msg = ['run', {
129 'path': path,
130 'callable': callable_name,
131 }]
132
133 msg = (conf.DEFAULT_QUEUE_NAME, '', json.dumps(run_msg))
134
135 send_emqp_router_message(sock, jm_addr, 'REQUEST', msg)
136 time.sleep(.1) # give time for the job to start up.
137 self.assertEqual(len(self.jm.active_jobs), 1)
138
139 def cleanup(self):
140 self.jm.on_disconnect(None, None)
141
142
143def start_jm(jm):
144 jm.start(ADDR)
145
146
147def pretend_job():
148 time.sleep(1)
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
new file mode 100644
index 0000000..11ca9ec
--- /dev/null
+++ b/eventmq/tests/test_router.py
@@ -0,0 +1,42 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15import threading
16import unittest
17
18import mock
19
20from .. import router
21
22ADDR = 'inproc://kodak_film_festival'
23
24
25class TestCase(unittest.TestCase):
26 def setUp(self):
27 self.router = router.Router()
28
29 self.thread = threading.Thread(target=start_router,
30 args=(self.router,))
31
32 self.addCleanup(self.cleanup)
33
34 @mock.patch('signal.signal')
35 def test_start(self, mock_signal_signal):
36 self.thread.start()
37
38 def cleanup(self):
39 self.router.on_disconnect(None, None)
40
41def start_router(router):
42 router.start(ADDR)
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index 0fc8042..7ed36d7 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -52,3 +52,6 @@ class TestCase(unittest.TestCase):
52 broken_message = ('dlkajfs', 'lkasdjf') 52 broken_message = ('dlkajfs', 'lkasdjf')
53 with self.assertRaises(exceptions.InvalidMessageError): 53 with self.assertRaises(exceptions.InvalidMessageError):
54 messages.parse_router_message(broken_message) 54 messages.parse_router_message(broken_message)
55
56 def test_parse_router_message(self):
57 ['aef451a0-5cef-4f03-818a-221061c8ab68', '', 'eMQP/1.0', 'INFORM', '5caeb5fd-15d4-4b08-89e8-4e536672eef3', 'default', 'worker']
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index 3459c65..cfd06d5 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -21,13 +21,198 @@ import logging
21 21
22import zmq.error 22import zmq.error
23 23
24from .. import conf, exceptions 24from .. import conf, constants, exceptions, poller, utils
25from ..utils.messages import send_emqp_message as sendmsg 25from ..utils.messages import send_emqp_message as sendmsg
26from ..utils.timeutils import monotonic, timestamp 26from ..utils.timeutils import monotonic, timestamp
27 27
28logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
29 29
30 30
31class EMQPService(object):
32 """
33 Helper for devices that connect to brokers.
34
35 Implements utility methods for sending EMQP messages for the following
36 EMQP commands.
37 - INFORM
38
39 Also implements utlitiy methods for managing long-running processes.
40
41 To use you must define:
42 - `self.outgoing` - socket where messages can be sent to the Router
43 - `self.SERVICE_TYPE` - defines the service type for INFORM. See
44 :meth:`send_inform` for more information.
45 - `self.poller` - the poller that `self.outgoing` will be using.
46 Usually: `self.poller = eventmq.poller.Poller()`
47
48 When messages are received from the router, they are processed in
49 :meth:`process_message` which then calls `on_COMMAND`. This should be used
50 in the event loop so if you want to respond to the SCHEDULE command, you
51 would define the method `on_schedule` in your service class.
52
53 See the code for :class:`Scheduler` and :class:`JobManager` for examples.
54 """
55 def send_inform(self, queue=None):
56 """
57 Queues an INFORM command to `self.outgoing`.
58
59 Args:
60 type_ (str): Either 'worker' or 'scheduler'
61 queue (list):
62 - For 'worker' type, the queues the worker is listening on
63 - Ignored for 'scheduler' type
64
65 Raises:
66 ValueError: When `type_` does not match a specified type
67 """
68 valid_types = (constants.CLIENT_TYPE.worker,
69 constants.CLIENT_TYPE.scheduler)
70
71 if self.SERVICE_TYPE not in valid_types:
72 raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE,
73 valid_types))
74
75 sendmsg(self.outgoing, 'INFORM', [
76 queue or conf.DEFAULT_QUEUE_NAME,
77 self.SERVICE_TYPE
78 ])
79
80 # If heartbeating is active, update the last heartbeat time
81 if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta:
82 self._meta['last_sent_heartbeat'] = monotonic()
83
84 def _setup(self):
85 """
86 Prepares the service to connect to a broker. Actions that must
87 also run on a reset are here.
88 """
89 # Look for incoming events
90 self.poller.register(self.outgoing, poller.POLLIN)
91 self.awaiting_startup_ack = False
92 self.received_disconnect = False
93
94 self.status = constants.STATUS.ready
95
96 def start(self, addr, queues=conf.DEFAULT_QUEUE_NAME):
97 """
98 Connect to `addr` and begin listening for job requests
99
100 Args:
101 addr (str): connection string to connect to
102 """
103 while not self.received_disconnect:
104 self.status = constants.STATUS.connecting
105 self.outgoing.connect(addr)
106
107 # Setting this to false is how the loop is broken and the
108 # _event_loop is started.
109 self.awaiting_startup_ack = True
110
111 # If this is inside the loop, then many inform messages will stack
112 # up on the buffer until something is actually connected to.
113 self.send_inform(queues)
114
115 # We don't want to accidentally start processing jobs before our
116 # connection has been setup completely and acknowledged.
117 while self.awaiting_startup_ack:
118 # Poller timeout is in ms so the reconnect timeout is
119 # multiplied by 1000 to get seconds
120 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
121
122 if self.outgoing in events: # A message from the Router!
123 msg = self.outgoing.recv_multipart()
124 # TODO This will silently drop messages that aren't
125 # ACK/DISCONNECT
126 if msg[2] == "ACK" or msg[2] == "DISCONNECT":
127 # :meth:`on_ack` will set self.awaiting_startup_ack to
128 # False
129 self.process_message(msg)
130
131 self.status = constants.STATUS.connected
132 logger.info('Starting event loop...')
133 self._start_event_loop()
134 # When we return, soemthing has gone wrong and try to reconnect
135 # unless self.received_disconnect is True
136 if not self.received_disconnect:
137 self.reset()
138
139 logger.info('Death.')
140
141 def reset(self):
142 """
143 Resets the current connection by closing and reopening the socket
144 """
145 # Unregister the old socket from the poller
146 self.poller.unregister(self.outgoing)
147
148 # Polish up a new socket to use
149 self.outgoing.rebuild()
150
151 # Prepare the device to connect again
152 self._setup()
153
154 def process_message(self, msg):
155 """
156 Processes a message. Processing takes form of calling an
157 `on_EMQP_COMMAND` method. The method must accept `msgid` and `message`
158 as the first arguments.
159
160 Args:
161 msg: The message received from the socket to parse and process.
162 """
163 if self.is_heartbeat_enabled:
164 # Any received message should count as a heartbeat
165 self._meta['last_received_heartbeat'] = monotonic()
166 if self._meta['heartbeat_miss_count']:
167 # Reset the miss count too
168 self._meta['heartbeat_miss_count'] = 0
169
170 try:
171 message = utils.messages.parse_message(msg)
172 except exceptions.InvalidMessageError:
173 logger.exception('Invalid message: %s' % str(msg))
174 return
175
176 command = message[0].lower()
177 msgid = message[1]
178 message = message[2]
179
180 if hasattr(self, "on_%s" % command):
181 func = getattr(self, "on_%s" % command)
182 func(msgid, message)
183 else:
184 logger.warning('No handler for %s found (tried: %s)' %
185 (command.upper(), ('on_%s' % command)))
186
187 def on_ack(self, msgid, ackd_msgid):
188 """
189 Sets :attr:`awaiting_ack` to False
190 """
191 # The msgid is the only frame in the message
192 ackd_msgid = ackd_msgid[0]
193 logger.info('Received ACK for router (or client) %s' % ackd_msgid)
194 self.awaiting_startup_ack = False
195
196 def on_disconnect(self, msgid, msg):
197 # To break out of the connecting loop if necessary
198 self.awaiting_startup_ack = False
199
200 # Loops event loops should check for this and break out
201 self.received_disconnect = True
202
203 @property
204 def is_heartbeat_enabled(self):
205 """
206 Property to check if heartbeating is enabled. Useful when certain
207 properties must be updated for heartbeating
208 Returns:
209 bool - True if heartbeating is enabled, False if it isn't
210 """
211 if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta:
212 return True
213 return False
214
215
31class HeartbeatMixin(object): 216class HeartbeatMixin(object):
32 """ 217 """
33 Provides methods for implementing heartbeats 218 Provides methods for implementing heartbeats
@@ -36,6 +221,7 @@ class HeartbeatMixin(object):
36 """ 221 """
37 Sets up some variables to track the state of heartbeaty things 222 Sets up some variables to track the state of heartbeaty things
38 """ 223 """
224 super(HeartbeatMixin, self).__init__()
39 if not hasattr(self, '_meta'): 225 if not hasattr(self, '_meta'):
40 self._meta = {} 226 self._meta = {}
41 227
@@ -91,6 +277,25 @@ class HeartbeatMixin(object):
91 277
92 return False 278 return False
93 279
280 def maybe_send_heartbeat(self, events):
281 # TODO: Optimization: Move the method calls into another thread so
282 # they don't block the event loop
283 if not conf.DISABLE_HEARTBEATS:
284 now = monotonic()
285 # Send a HEARTBEAT if necessary
286 if now - self._meta['last_sent_heartbeat'] >= \
287 conf.HEARTBEAT_INTERVAL:
288 self.send_heartbeat(self.outgoing)
289
290 # Do something about any missed HEARTBEAT, if we have nothing
291 # waiting on the socket
292 if self.is_dead() and not events:
293 logger.critical(
294 'The broker appears to have gone away. '
295 'Reconnecting...')
296 return False
297 return True
298
94 299
95class ZMQReceiveMixin(object): 300class ZMQReceiveMixin(object):
96 """ 301 """
@@ -162,7 +367,7 @@ class ZMQSendMixin(object):
162 if conf.SUPER_DEBUG: 367 if conf.SUPER_DEBUG:
163 # If it's not at least 4 frames long then most likely it isn't an 368 # If it's not at least 4 frames long then most likely it isn't an
164 # eventmq message 369 # eventmq message
165 if len(msg) == 4 and \ 370 if len(msg) > 4 and \
166 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ 371 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \
167 not conf.HIDE_HEARTBEAT_LOGS: 372 not conf.HIDE_HEARTBEAT_LOGS:
168 logger.debug('Sending message: %s' % str(msg)) 373 logger.debug('Sending message: %s' % str(msg))
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py
index 9d494eb..70fd6ce 100644
--- a/eventmq/utils/timeutils.py
+++ b/eventmq/utils/timeutils.py
@@ -44,3 +44,45 @@ def seconds_until(ts):
44 time.time() 44 time.time()
45 """ 45 """
46 return ts - timestamp() 46 return ts - timestamp()
47
48
49class IntervalIter(object):
50 """
51 represents an interval (in seconds) and it's `next()` execution time
52
53 Usage:
54 # interval of 5min using monotonic clock (assume it starts at 0 for the
55 # sake of the example)
56 interval = IntervalIter(monotonic, 300)
57 # Py2
58
59 interval.next() # 300
60 interval.next() # 600
61
62 # Py3
63 next(interval) # 300
64 next(interval) # 600
65 """
66 def __init__(self, start_value, interval_secs):
67 """
68 Args:
69 start_value (numeric) - the timestamp to begin with. usually gotten
70 via :func:`monotonic` or :func:`timestamp`
71 interval_secs (int) - the number of seconds between intervals
72 """
73 self.current = start_value
74 self.interval_secs = interval_secs
75
76 # iterate the first time so the first call to .next() is interval_secs
77 # + start_value
78 self.__next__()
79
80 def __iter__(self):
81 return self
82
83 def __next__(self): # Py3
84 self.current += self.interval_secs
85 return self.current - self.interval_secs
86
87 def next(self):
88 return self.__next__()
diff --git a/requirements.txt b/requirements.txt
index ada0351..8872619 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
1pyzmq==14.6.0 1pyzmq==15.1.0
2six==1.10.0 2six==1.10.0
3monotonic==0.4 # A clock who's time is not changed. used for scheduling 3monotonic==0.4 # A clock who's time is not changed. used for scheduling
4croniter==0.3.10 4croniter==0.3.10
5redis==2.10.3
5 6
6# Documentation 7# Documentation
7sphinxcontrib-napoleon==0.4.3 8sphinxcontrib-napoleon==0.4.3
@@ -11,3 +12,4 @@ Sphinx==1.3.1 # must come after napoleon to get the latest version
11nose==1.3.6 12nose==1.3.6
12coverage==4.0.3 13coverage==4.0.3
13testfixtures==4.7.0 # To test that logging exists 14testfixtures==4.7.0 # To test that logging exists
15mock==1.3.0