aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/scheduler2
-rwxr-xr-xbin/worker2
-rw-r--r--docs/protocol.rst36
-rw-r--r--eventmq/client/messages.py76
-rw-r--r--eventmq/jobmanager.py128
-rw-r--r--eventmq/router.py13
-rw-r--r--eventmq/scheduler.py26
-rw-r--r--eventmq/utils/classes.py168
8 files changed, 310 insertions, 141 deletions
diff --git a/bin/scheduler b/bin/scheduler
index 33f164e..6be810c 100755
--- a/bin/scheduler
+++ b/bin/scheduler
@@ -6,4 +6,4 @@ from eventmq.scheduler import Scheduler
6if __name__ == "__main__": 6if __name__ == "__main__":
7 setup_logger("eventmq") 7 setup_logger("eventmq")
8 s = Scheduler() 8 s = Scheduler()
9 s.start() 9 s.start(addr='tcp://127.0.0.1:47290')
diff --git a/bin/worker b/bin/worker
index 2dbf41b..a44f143 100755
--- a/bin/worker
+++ b/bin/worker
@@ -6,4 +6,4 @@ from eventmq.log import setup_logger
6if __name__ == "__main__": 6if __name__ == "__main__":
7 setup_logger('') 7 setup_logger('')
8 j = JobManager() 8 j = JobManager()
9 j.start() 9 j.start(addr='tcp://127.0.0.1:47291')
diff --git a/docs/protocol.rst b/docs/protocol.rst
index e468b06..a0ab6be 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -90,6 +90,34 @@ FRAME Value Description
906 _MSG_ The message to send 906 _MSG_ The message to send
91====== ============== =========== 91====== ============== ===========
92 92
93A **SCHEDULE** command consists of a 7-frame multipart message, formatted as follows.
94
95====== ============== ===========
96FRAME Value Description
97====== ============== ===========
980 _EMPTY_ leave empty
991 eMQP/1.0 Protocol version
1002 SCHEDULE command
1013 _MSGID_ A unique id for the msg
1024 _QUEUE_NAME_ csv seperated names of queue the worker belongs to
1035 _MSG_ The message to send
104====== ============== ===========
105
106eMQP / Scheduler
107----------------
108An **INFORM** command consists of a 6-frame multipart message, formatted as follows.
109
110====== ============== ===========
111FRAME Value Description
112====== ============== ===========
1130 _EMPTY_ leave empty
1141 eMQP/1.0 Protocol version
1152 INFORM command
1163 _MSGID_ A unique id for the msg
1174 _QUEUE_NAME_ csv seperated names of queue the worker belongs to
1185 scheduler type of peer connecting
119====== ============== ===========
120
93eMQP / Worker 121eMQP / Worker
94------------- 122-------------
95An **INFORM** command consists of a 5-frame multipart message, formatted as follows. 123An **INFORM** command consists of a 5-frame multipart message, formatted as follows.
@@ -102,6 +130,7 @@ FRAME Value Description
1022 INFORM command 1302 INFORM command
1033 _MSGID_ A unique id for the msg 1313 _MSGID_ A unique id for the msg
1044 _QUEUE_NAME_ csv seperated names of queue the worker belongs to 1324 _QUEUE_NAME_ csv seperated names of queue the worker belongs to
1335 worker type of peer connecting
105====== ============== =========== 134====== ============== ===========
106 135
107A **READY** frame consists of a 4-frame multipart message, formatted as follows. 136A **READY** frame consists of a 4-frame multipart message, formatted as follows.
@@ -154,11 +183,14 @@ Heartbeating
154------------ 183------------
155 * HEARTBEAT commands are valid at any time after an INFORM command 184 * HEARTBEAT commands are valid at any time after an INFORM command
156 * Any command except DISCONNECT act as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands. 185 * Any command except DISCONNECT act as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands.
157 * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. 186 * Worker and broker MUST send heartbeats at regular and agreed-upon intervals.
187 * Scheduler and broker MUST send heartbeats at regular and agreed-upon intervals.
158 * If the worker detects that the broker disconnected it SHOULD restart the conversation. 188 * If the worker detects that the broker disconnected it SHOULD restart the conversation.
159 * If the broker detects that a worker has disconnected it should stop sending it a message of any type. 189 * If the broker detects that a worker has disconnected it should stop sending it a message of any type.
190 * If the scheduler detects that the broker disconnects it SHOULD restart the conversation.
191 * If the broker detects that a scheduler has disconnected it should ??????????.
160 192
161Request Headers 193REQUEST Headers
162--------------- 194---------------
163Headers MUST be 0 to many comma seperated values inserted into the header field. If there are no headers requried, send an empty string MUST be sent where headers are required. 195Headers MUST be 0 to many comma seperated values inserted into the header field. If there are no headers requried, send an empty string MUST be sent where headers are required.
164 196
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index 66af8a9..801e861 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -26,6 +26,62 @@ from ..utils.messages import send_emqp_message
26logger = logging.getLogger(__name__) 26logger = logging.getLogger(__name__)
27 27
28 28
29def schedule(socket, func, interval_mins, args=(), kwargs=None, class_args=(),
30 class_kwargs=None, queue=conf.DEFAULT_QUEUE_NAME):
31 """
32 Execute a task on a defined interval.
33
34 Args:
35 socket (socket): eventmq socket to use for sending the message
36 func (callable): the callable to be scheduled on a worker
37 minutes (int): minutes to wait in between executions
38 args (list): list of *args to pass to the callable
39 kwargs (dict): dict of **kwargs to pass to the callable
40 class_args (list): list of *args to pass to the class (if applicable)
41 class_kwargs (dict): dict of **kwargs to pass to the class (if
42 applicable)
43 queue (str): name of the queue to use when executing the job. The
44 default value is the default queue.
45 """
46 if not class_kwargs:
47 class_kwargs = {}
48 if not kwargs:
49 kwargs = {}
50
51 if callable(func):
52 path, callable_name = build_module_path(func)
53 else:
54 logger.error('Encountered non-callable func: {}'.format(func))
55 return False
56
57 if not callable_name:
58 logger.error('Encountered callable with no name in {}'.format(
59 func.__module__
60 ))
61 return False
62
63 if not path:
64 logger.error('Encountered callable with no __module__ path {}'.format(
65 func.__name__
66 ))
67 return False
68
69 # TODO: convert all the times to seconds for the clock
70
71 # TODO: send the schedule request
72
73 msg = ['run', {
74 'callable': callable_name,
75 'path': path,
76 'args': args,
77 'kwargs': kwargs,
78 'class_args': class_args,
79 'class_kwargs': class_kwargs,
80 }]
81
82 send_schedule_request(socket, 300, msg, queue)
83
84
29def defer_job(socket, func, args=(), kwargs=None, class_args=(), 85def defer_job(socket, func, args=(), kwargs=None, class_args=(),
30 class_kwargs=None, reply_requested=False, guarantee=False, 86 class_kwargs=None, reply_requested=False, guarantee=False,
31 retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): 87 retry_count=0, queue=conf.DEFAULT_QUEUE_NAME):
@@ -94,7 +150,7 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(),
94 send_request(socket, msg, reply_requested=reply_requested, 150 send_request(socket, msg, reply_requested=reply_requested,
95 guarantee=guarantee, retry_count=retry_count, queue=queue) 151 guarantee=guarantee, retry_count=retry_count, queue=queue)
96 152
97 return True 153 return True # The message has successfully been queued for delivery
98 154
99 155
100def build_module_path(func): 156def build_module_path(func):
@@ -196,6 +252,24 @@ def send_request(socket, message, reply_requested=False, guarantee=False,
196 ) 252 )
197 253
198 254
255def send_schedule_request(socket, interval_secs, message, queue=None):
256 """
257 Send a SCHEDULE command.
258
259 Queues a message requesting that something happens on an
260 interval for the scheduler.
261
262 Args:
263 socket (socket):
264 interval_secs (int):
265 message: Message to send socket.
266 """
267 send_emqp_message(socket, 'SCHEDULE',
268 (queue or conf.DEFAULT_QUEUE_NAME,
269 str(interval_secs),
270 serialize(message)))
271
272
199def job(block=False): # Move to decorators.py 273def job(block=False): # Move to decorators.py
200 """ 274 """
201 run the decorated function on a worker 275 run the decorated function on a worker
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 9ea6800..6244c02 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -20,26 +20,26 @@ Ensures things about jobs and spawns the actual tasks
20import json 20import json
21import logging 21import logging
22 22
23from . import conf, constants, exceptions, utils 23from . import conf
24from .poller import Poller, POLLIN 24from .poller import Poller, POLLIN
25from .sender import Sender 25from .sender import Sender
26from .utils.classes import HeartbeatMixin 26from .utils.classes import EMQPService, HeartbeatMixin
27from .utils.devices import generate_device_name 27from .utils.devices import generate_device_name
28from .utils.messages import send_emqp_message as sendmsg 28from .utils.messages import send_emqp_message as sendmsg
29import utils.messages
30from .utils.timeutils import monotonic 29from .utils.timeutils import monotonic
31from .worker import MultiprocessWorker as Worker 30from .worker import MultiprocessWorker as Worker
32 31
33logger = logging.getLogger(__name__) 32logger = logging.getLogger(__name__)
34 33
35 34
36class JobManager(HeartbeatMixin): 35class JobManager(HeartbeatMixin, EMQPService):
37 """ 36 """
38 The exposed portion of the worker. The job manager's main responsibility is 37 The exposed portion of the worker. The job manager's main responsibility is
39 to manage the resources on the server it's running. 38 to manage the resources on the server it's running.
40 39
41 This job manager uses tornado's eventloop. 40 This job manager uses tornado's eventloop.
42 """ 41 """
42 SERVICE_TYPE = 'worker'
43 43
44 def __init__(self, *args, **kwargs): 44 def __init__(self, *args, **kwargs):
45 """ 45 """
@@ -64,7 +64,8 @@ class JobManager(HeartbeatMixin):
64 #: JobManager starts out by INFORMing the router of it's existance, 64 #: JobManager starts out by INFORMing the router of it's existance,
65 #: then telling the router that it is READY. The reply will be the unit 65 #: then telling the router that it is READY. The reply will be the unit
66 #: of work. 66 #: of work.
67 self.incoming = Sender() 67 # Despite the name, jobs are received on this socket
68 self.outgoing = Sender()
68 69
69 #: Jobs that are running should be stored in `active_jobs`. There 70 #: Jobs that are running should be stored in `active_jobs`. There
70 #: should always be at most `available_workers` count of active jobs. 71 #: should always be at most `available_workers` count of active jobs.
@@ -75,81 +76,21 @@ class JobManager(HeartbeatMixin):
75 76
76 self._setup() 77 self._setup()
77 78
78 def _setup(self):
79 """
80 Prepares JobManager ready to connect to a broker. Actions that must
81 also run on a reset are here.
82 """
83 # Look for incoming events
84 self.poller.register(self.incoming, POLLIN)
85 self.awaiting_startup_ack = False
86
87 self.status = constants.STATUS.ready
88
89 def start(self, addr='tcp://127.0.0.1:47291'):
90 """
91 Connect to `addr` and begin listening for job requests
92
93 Args:
94 addr (str): connection string to connect to
95 """
96 while True:
97 self.status = constants.STATUS.connecting
98 self.incoming.connect(addr)
99
100 self.awaiting_startup_ack = True
101 self.send_inform()
102
103 # We don't want to accidentally start processing jobs before our
104 # connection has been setup completely and acknowledged.
105 while self.awaiting_startup_ack:
106 # Poller timeout is in ms so the reconnect timeout is
107 # multiplied by 1000 to get seconds
108 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
109
110 if self.incoming in events: # A message from the Router!
111 msg = self.incoming.recv_multipart()
112 # TODO This will silently drop messages that aren't ACK
113 if msg[2] == "ACK":
114 # :meth:`on_ack` will set self.awaiting_startup_ack to
115 # False
116 self.process_message(msg)
117
118 # Acknowledgment has come
119 # Send a READY for each available worker
120 for i in range(0, self.available_workers):
121 self.send_ready()
122
123 self.status = constants.STATUS.connected
124 logger.info('Starting to listen for jobs')
125 self._start_event_loop()
126 # When we return, soemthing has gone wrong and we should try to
127 # reconnect
128 self.reset()
129
130 def reset(self):
131 """
132 Resets the current connection by closing and reopening the socket
133 """
134 # Unregister the old socket from the poller
135 self.poller.unregister(self.incoming)
136
137 # Polish up a new socket to use
138 self.incoming.rebuild()
139
140 # Prepare the device to connect again
141 self._setup()
142
143 def _start_event_loop(self): 79 def _start_event_loop(self):
144 """ 80 """
145 Starts the actual eventloop. Usually called by :meth:`JobManager.start` 81 Starts the actual eventloop. Usually called by :meth:`start`
146 """ 82 """
83 # Acknowledgment has come
84 # Send a READY for each available worker
85 for i in range(0, self.available_workers):
86 self.send_ready()
87
147 while True: 88 while True:
148 now = monotonic() 89 now = monotonic()
149 events = self.poller.poll() 90 events = self.poller.poll()
150 91
151 if events.get(self.incoming) == POLLIN: 92 if events.get(self.outgoing) == POLLIN:
152 msg = self.incoming.recv_multipart() 93 msg = self.outgoing.recv_multipart()
153 self.process_message(msg) 94 self.process_message(msg)
154 95
155 # Maintain the list of active jobs 96 # Maintain the list of active jobs
@@ -165,7 +106,7 @@ class JobManager(HeartbeatMixin):
165 # Send a HEARTBEAT if necessary 106 # Send a HEARTBEAT if necessary
166 if now - self._meta['last_sent_heartbeat'] >= \ 107 if now - self._meta['last_sent_heartbeat'] >= \
167 conf.HEARTBEAT_INTERVAL: 108 conf.HEARTBEAT_INTERVAL:
168 self.send_heartbeat(self.incoming) 109 self.send_heartbeat(self.outgoing)
169 110
170 # Do something about any missed HEARTBEAT, if we have nothing 111 # Do something about any missed HEARTBEAT, if we have nothing
171 # waiting on the socket 112 # waiting on the socket
@@ -215,49 +156,12 @@ class JobManager(HeartbeatMixin):
215 156
216 self.available_workers -= 1 157 self.available_workers -= 1
217 158
218 def process_message(self, msg):
219 """
220 Processes a message
221
222 Args:
223 msg: The message received from the socket to parse and process.
224 Processing takes form of calling an `on_COMMAND` method.
225 """
226 # Any received message should count as a heartbeat
227 self._meta['last_received_heartbeat'] = monotonic()
228 if self._meta['heartbeat_miss_count']:
229 self._meta['heartbeat_miss_count'] = 0 # Reset the miss count too
230
231 try:
232 message = utils.messages.parse_message(msg)
233 except exceptions.InvalidMessageError:
234 logger.error('Invalid message: %s' % str(msg))
235 return
236
237 command = message[0]
238 msgid = message[1]
239 message = message[2]
240
241 if hasattr(self, "on_%s" % command.lower()):
242 func = getattr(self, "on_%s" % command.lower())
243 func(msgid, message)
244 else:
245 logger.warning('No handler for %s found (tried: %s)' %
246 (command, ('on_%s' % command.lower())))
247
248 def send_ready(self): 159 def send_ready(self):
249 """ 160 """
250 send the READY command upstream to indicate that JobManager is ready 161 send the READY command upstream to indicate that JobManager is ready
251 for another REQUEST message. 162 for another REQUEST message.
252 """ 163 """
253 sendmsg(self.incoming, 'READY') 164 sendmsg(self.outgoing, 'READY')
254
255 def send_inform(self, queue=None):
256 """
257 Send an INFORM command
258 """
259 sendmsg(self.incoming, 'INFORM', queue or conf.DEFAULT_QUEUE_NAME)
260 self._meta['last_sent_heartbeat'] = monotonic()
261 165
262 def on_ack(self, msgid, ackd_msgid): 166 def on_ack(self, msgid, ackd_msgid):
263 """ 167 """
diff --git a/eventmq/router.py b/eventmq/router.py
index 23700bf..d896853 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -82,6 +82,11 @@ class Router(HeartbeatMixin):
82 #: workers available to take the job 82 #: workers available to take the job
83 self.waiting_messages = {} 83 self.waiting_messages = {}
84 84
85 #: Scheduler clients. Clients are able to send SCHEDULE commands that
86 #: need to be routed to a scheduler, which will keep track of time and
87 #: run the job.
88 self.schedulers = []
89
85 def start(self, 90 def start(self,
86 frontend_addr='tcp://127.0.0.1:47290', 91 frontend_addr='tcp://127.0.0.1:47290',
87 backend_addr='tcp://127.0.0.1:47291'): 92 backend_addr='tcp://127.0.0.1:47291'):
@@ -176,9 +181,9 @@ class Router(HeartbeatMixin):
176 Handles an INFORM message. This happens when new worker coming online 181 Handles an INFORM message. This happens when new worker coming online
177 and announces itself. 182 and announces itself.
178 """ 183 """
179 logger.info('Received INFORM request from %s' % sender)
180 queue_name = msg[0] 184 queue_name = msg[0]
181 185 client_type = msg[1]
186 logger.info('Received INFORM request from {} (type: {})'.format(sender, client_type))
182 self.add_worker(sender, queue_name) 187 self.add_worker(sender, queue_name)
183 188
184 self.send_ack(self.outgoing, sender, msgid) 189 self.send_ack(self.outgoing, sender, msgid)
@@ -305,8 +310,8 @@ class Router(HeartbeatMixin):
305 310
306 # If we have no workers for the queue TODO something about it 311 # If we have no workers for the queue TODO something about it
307 if queue_name not in self.queues: 312 if queue_name not in self.queues:
308 logger.warning("Received REQUEST with a queue I don't recognize: " 313 logger.warning("Received %s with a queue I don't recognize: "
309 "%s" % queue_name) 314 "%s" % (msg[3], queue_name))
310 logger.critical("Discarding message") 315 logger.critical("Discarding message")
311 # TODO: Don't discard the message 316 # TODO: Don't discard the message
312 return 317 return
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index b1b3fb6..24e95f5 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -24,7 +24,8 @@ from croniter import croniter
24from six import next 24from six import next
25 25
26from .sender import Sender 26from .sender import Sender
27from .utils.classes import HeartbeatMixin 27from .poller import Poller
28from .utils.classes import EMQPService, HeartbeatMixin
28from .utils.timeutils import seconds_until, timestamp 29from .utils.timeutils import seconds_until, timestamp
29from .client.messages import send_request 30from .client.messages import send_request
30 31
@@ -32,28 +33,27 @@ from .client.messages import send_request
32logger = logging.getLogger(__name__) 33logger = logging.getLogger(__name__)
33 34
34 35
35class Scheduler(HeartbeatMixin): 36class Scheduler(HeartbeatMixin, EMQPService):
36 """ 37 """
37 Keeper of time, master of schedules 38 Keeper of time, master of schedules
38 """ 39 """
39 40 SERVICE_TYPE = 'scheduler'
40 def __init__(self, *args, **kwargs): 41 def __init__(self, *args, **kwargs):
41 logger.info('Initializing Scheduler...') 42 logger.info('Initializing Scheduler...')
42 super(Scheduler, self).__init__(*args, **kwargs) 43 super(Scheduler, self).__init__(*args, **kwargs)
43 self.outgoing = Sender() 44 self.outgoing = Sender()
44 45
46 # IDX Description
45 # 0 = the next ts this job should be executed 47 # 0 = the next ts this job should be executed
46 # 1 = the function to be executed 48 # 1 = the function to be executed
47 # 2 = the croniter iterator for this job 49 # 2 = the croniter iterator for this job
48 self.jobs = [] 50 self.jobs = []
49 51
52 self.poller = Poller()
53
50 self.load_jobs() 54 self.load_jobs()
51 55
52 def connect(self, addr='tcp://127.0.0.1:47290'): 56 self._setup()
53 """
54 Connect the scheduler to worker/router at `addr`
55 """
56 self.outgoing.connect(addr)
57 57
58 def load_jobs(self): 58 def load_jobs(self):
59 """ 59 """
@@ -75,14 +75,6 @@ class Scheduler(HeartbeatMixin):
75 c_next = next(c) 75 c_next = next(c)
76 self.jobs.append([c_next, job[1], c]) 76 self.jobs.append([c_next, job[1], c])
77 77
78 def start(self, addr='tcp://127.0.0.1:47290'):
79 """
80 Begin sending messages to execute scheduled jobs
81 """
82 self.connect(addr)
83
84 self._start_event_loop()
85
86 def _start_event_loop(self): 78 def _start_event_loop(self):
87 """ 79 """
88 Starts the actual event loop. Usually called by :meth:`Scheduler.start` 80 Starts the actual event loop. Usually called by :meth:`Scheduler.start`
@@ -117,6 +109,4 @@ class Scheduler(HeartbeatMixin):
117def test_job(): 109def test_job():
118 print "hello!" 110 print "hello!"
119 print "hello!" 111 print "hello!"
120 print "hello!"
121 print "hello!"
122 time.sleep(4) 112 time.sleep(4)
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index e98b9b7..b6ae03c 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -21,13 +21,176 @@ import logging
21 21
22import zmq.error 22import zmq.error
23 23
24from .. import conf, exceptions 24from .. import conf, constants, exceptions, poller, utils
25from ..utils.messages import send_emqp_message as sendmsg 25from ..utils.messages import send_emqp_message as sendmsg
26from ..utils.timeutils import monotonic, timestamp 26from ..utils.timeutils import monotonic, timestamp
27 27
28logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
29 29
30 30
31class EMQPService(object):
32 """
33 Helper for devices that connect to brokers.
34
35 Implements utility methods for sending EMQP messages for the following
36 EMQP commands.
37 - INFORM
38
39 Also implements utlitiy methods for managing long-running processes.
40
41 To use you must define:
42 - `self.outgoing` - socket where messages can be sent to the Router
43 - `self.SERVICE_TYPE` - defines the service type for INFORM. See
44 :meth:`send_inform` for more information.
45 - `self.poller` - the poller that `self.outgoing` will be using.
46 Usually: `self.poller = eventmq.poller.Poller()`
47
48 When messages are received from the router, they are processed in
49 :meth:`process_message` which then calls `on_COMMAND`, so if you want to
50 respond to the SCHEDULE command, you would define the method `on_schedule`
51 in your service class.
52
53 See the code for :class:`Scheduler` and :class:`JobManager` for examples.
54 """
55 def send_inform(self, queue=None):
56 """
57 Queues an INFORM command to `self.outgoing`.
58
59 Args:
60 type_ (str): Either 'worker' or 'scheduler'
61 queue (list):
62 - For 'worker' type, the queues the worker is listening on
63 - Ignored for 'scheduler' type
64
65 Raises:
66 ValueError: When `type_` does not match a specified type
67 """
68 valid_types = ('worker', 'scheduler')
69
70 if self.SERVICE_TYPE not in valid_types:
71 raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE,
72 valid_types))
73
74 sendmsg(self.outgoing, 'INFORM', [
75 queue or conf.DEFAULT_QUEUE_NAME,
76 self.SERVICE_TYPE
77 ])
78
79 # If heartbeating is active, update the last heartbeat time
80 if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta:
81 self._meta['last_sent_heartbeat'] = monotonic()
82
83 def _setup(self):
84 """
85 Prepares the service to connect to a broker. Actions that must
86 also run on a reset are here.
87 """
88 # Look for incoming events
89 self.poller.register(self.outgoing, poller.POLLIN)
90 self.awaiting_startup_ack = False
91
92 self.status = constants.STATUS.ready
93
94 def start(self, addr, queues=conf.DEFAULT_QUEUE_NAME):
95 """
96 Connect to `addr` and begin listening for job requests
97
98 Args:
99 addr (str): connection string to connect to
100 """
101 while True:
102 self.status = constants.STATUS.connecting
103 self.outgoing.connect(addr)
104
105 # Setting this to false is how the loop is broken and the
106 # _event_loop is started.
107 self.awaiting_startup_ack = True
108
109 # If this is inside the loop, then many inform messages will stack
110 # up on the buffer until something is actually connected to.
111 self.send_inform(queues)
112
113 # We don't want to accidentally start processing jobs before our
114 # connection has been setup completely and acknowledged.
115 while self.awaiting_startup_ack:
116 # Poller timeout is in ms so the reconnect timeout is
117 # multiplied by 1000 to get seconds
118 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
119
120 if self.outgoing in events: # A message from the Router!
121 msg = self.outgoing.recv_multipart()
122 # TODO This will silently drop messages that aren't ACK
123 if msg[2] == "ACK":
124 # :meth:`on_ack` will set self.awaiting_startup_ack to
125 # False
126 self.process_message(msg)
127
128 self.status = constants.STATUS.connected
129 logger.info('Starting event loop...')
130 self._start_event_loop()
131 # When we return, soemthing has gone wrong and we should try to
132 # reconnect
133 self.reset()
134
135 def reset(self):
136 """
137 Resets the current connection by closing and reopening the socket
138 """
139 # Unregister the old socket from the poller
140 self.poller.unregister(self.incoming)
141
142 # Polish up a new socket to use
143 self.outgoing.rebuild()
144
145 # Prepare the device to connect again
146 self._setup()
147
148 def process_message(self, msg):
149 """
150 Processes a message. Processing takes form of calling an
151 `on_EMQP_COMMAND` method. The method must accept `msgid` and `message`
152 as the first arguments.
153
154 Args:
155 msg: The message received from the socket to parse and process.
156 """
157 if self.is_heartbeat_enabled:
158 # Any received message should count as a heartbeat
159 self._meta['last_received_heartbeat'] = monotonic()
160 if self._meta['heartbeat_miss_count']:
161 # Reset the miss count too
162 self._meta['heartbeat_miss_count'] = 0
163
164 try:
165 message = utils.messages.parse_message(msg)
166 except exceptions.InvalidMessageError:
167 logger.exception('Invalid message: %s' % str(msg))
168 return
169
170 command = message[0].lower()
171 msgid = message[1]
172 message = message[2]
173
174 if hasattr(self, "on_%s" % command):
175 func = getattr(self, "on_%s" % command)
176 func(msgid, message)
177 else:
178 logger.warning('No handler for %s found (tried: %s)' %
179 (command.upper(), ('on_%s' % command)))
180
181 @property
182 def is_heartbeat_enabled(self):
183 """
184 Property to check if heartbeating is enabled. Useful when certain
185 properties must be updated for heartbeating
186 Returns:
187 bool - True if heartbeating is enabled, False if it isn't
188 """
189 if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta:
190 return True
191 return False
192
193
31class HeartbeatMixin(object): 194class HeartbeatMixin(object):
32 """ 195 """
33 Provides methods for implementing heartbeats 196 Provides methods for implementing heartbeats
@@ -36,6 +199,7 @@ class HeartbeatMixin(object):
36 """ 199 """
37 Sets up some variables to track the state of heartbeaty things 200 Sets up some variables to track the state of heartbeaty things
38 """ 201 """
202 super(HeartbeatMixin, self).__init__()
39 if not hasattr(self, '_meta'): 203 if not hasattr(self, '_meta'):
40 self._meta = {} 204 self._meta = {}
41 205
@@ -162,7 +326,7 @@ class ZMQSendMixin(object):
162 if conf.SUPER_DEBUG: 326 if conf.SUPER_DEBUG:
163 # If it's not at least 4 frames long then most likely it isn't an 327 # If it's not at least 4 frames long then most likely it isn't an
164 # eventmq message 328 # eventmq message
165 if len(msg) == 4 and \ 329 if len(msg) > 4 and \
166 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ 330 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \
167 not conf.HIDE_HEARTBEAT_LOGS: 331 not conf.HIDE_HEARTBEAT_LOGS:
168 logger.debug('Sending message: %s' % str(msg)) 332 logger.debug('Sending message: %s' % str(msg))