jobmanager – Job Manager

Ensures things about jobs and spawns the actual tasks

class eventmq.jobmanager.JobManager(*args, **kwargs)

The exposed portion of the worker. The job manager’s main responsibility is to manage the resources on the server it’s running.

This job manager uses multiprocessing Queues.

__init__(*args, **kwargs)

Note

All args are optional unless otherwise noted.

Parameters:
  • name (str) – unique name of this instance. By default a uuid will be generated.
  • queues (tuple) – Names of the queues to listen on.
  • skip_signal (bool) – Don’t register the signal handlers. Useful for testing.
concurrent_jobs = None

Keeps track of workers.

jobmanager_main(broker_addr=None)

Kick off jobmanager with logging and settings import

Parameters:
  • broker_addr (str) – The address of the broker to connect to.
name = None

Define the name of this JobManager instance. Useful to know when referring to the logs.

on_heartbeat(msgid, message)

A placeholder for a no-op command. The actual ‘logic’ for HEARTBEAT is in self.process_message(), as every message is counted as a HEARTBEAT.

on_request(msgid, msg)

Handles a REQUEST command

Messages are formatted like this:

[subcmd(str), {...options...}]

Subcommands:
run - run some callable. Options:
{
  ‘callable’: func or method name (eg. walk),
  ‘path’: module path (eg. os.path),
  ‘args’: (optional) list of args,
  ‘kwargs’: (optional) dict of kwargs,
  ‘class_args’: (optional) list of args for class instantiation,
  ‘class_kwargs’: (optional) dict of kwargs for class,
}
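
To make the run options above concrete, here is a minimal sketch of how a message of this shape could be unpacked and dispatched. It is an illustration only, not the eventmq implementation: the helper name run_callable is hypothetical, and the sketch ignores ‘class_args’ and ‘class_kwargs’, since how they are applied is not described here.

```python
import importlib


def run_callable(options):
    """Hypothetical helper: resolve and call the function named in `options`."""
    # 'path' is a dotted module path, 'callable' an attribute of that module.
    module = importlib.import_module(options["path"])
    func = getattr(module, options["callable"])
    # 'args' and 'kwargs' are optional, so fall back to empty containers.
    args = options.get("args") or []
    kwargs = options.get("kwargs") or {}
    return func(*args, **kwargs)


# A message in the documented [subcmd, {options}] shape.
message = ["run", {
    "callable": "basename",
    "path": "os.path",
    "args": ["/var/jobs/task.py"],
}]

subcmd, options = message
if subcmd == "run":
    result = run_callable(options)
```

With the message shown, result is the value returned by os.path.basename for the given path.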

outgoing = None

JobManager starts out by INFORMing the router of its existence, then telling the router that it is READY. The reply will be the unit of work.
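
The command sequence described here can be pictured with a toy, in-memory simulation. This is only a sketch under assumptions: the function run_worker and the list standing in for the router are hypothetical, no sockets are involved, and sending REPLY before the next READY is an assumed ordering rather than something this page states.

```python
from collections import deque


def run_worker(router_inbox, work_items):
    """Toy flow: INFORM once, then READY before each unit of work."""
    pending = deque(work_items)
    router_inbox.append(("INFORM", None))  # announce this worker to the router
    router_inbox.append(("READY", None))   # ask for the first unit of work
    while pending:
        request = pending.popleft()        # the router's reply is the unit of work
        result = request()                 # do the work
        router_inbox.append(("REPLY", result))
        router_inbox.append(("READY", None))  # ready for another REQUEST
    return router_inbox


log = run_worker([], [lambda: 1 + 1, lambda: "done"])
commands = [command for command, _ in log]
```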

queues = None

List of queues that this job manager is listening on

send_ready()

Sends the READY command upstream to indicate that the JobManager is ready for another REQUEST message.

send_reply(res)

Sends a REPLY response.

Parameters:
  • socket (socket) – The socket to use for this ack
  • recipient (str) – The recipient id for the ack
  • msgid – The unique id that we are acknowledging
eventmq.jobmanager.mp_init()

The instance of Context is copied when Python multiprocessing fork()s the worker processes, so we need to terminate that Context so a new one can be rebuilt. Without doing this, messages sent from functions in those child processes will never be delivered.
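
The general pattern, dropping an inherited resource in each child so that it is rebuilt there, can be sketched with the initializer argument of multiprocessing.Pool. Everything below is a stand-in: FakeContext, get_context, reset_context and demo are hypothetical names used for illustration and are not part of eventmq or of any messaging library.

```python
import multiprocessing
import os


class FakeContext:
    """Hypothetical stand-in for a context that must not be shared across fork()."""

    def __init__(self):
        self.owner_pid = os.getpid()
        self.terminated = False

    def term(self):
        self.terminated = True


_context = FakeContext()  # created in the parent, copied into forked children


def get_context():
    """Return the process-local context, building one if none exists."""
    global _context
    if _context is None:
        _context = FakeContext()
    return _context


def reset_context():
    """Pool initializer: terminate the inherited copy so a new one is rebuilt."""
    global _context
    if _context is not None:
        _context.term()
    _context = None


def context_is_local(_):
    # True when the context in use was created by the current process.
    return get_context().owner_pid == os.getpid()


def demo():
    # Each worker runs reset_context() once, right after it starts.
    with multiprocessing.Pool(2, initializer=reset_context) as pool:
        return pool.map(context_is_local, range(4))
```

Calling demo() under an `if __name__ == "__main__":` guard runs the check in two worker processes; each worker builds its own context on first use instead of reusing the parent's copy.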