From 57329aba1e888b64d56ac2386466791304cdf86a Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 10 Apr 2018 16:25:27 -0600 Subject: Add job setup and cleanup functions This adds 2 new config options: `job_entry_func` and `job_exit_func`. These functions are executed before and after every single job execution. The rationale behind this is before and after each request Django cleans up stale database connections, so Django jobs need some way of running this same setup/cleanup functions. https://github.com/django/django/blob/master/django/db/__init__.py#L57 ``` # Register an event to reset transaction state and close connections past # their lifetime. def close_old_connections(**kwargs): for conn in connections.all(): conn.close_if_unusable_or_obsolete() signals.request_started.connect(close_old_connections) signals.request_finished.connect(close_old_connections) ``` fixes: #41 --- etc/eventmq.conf-dist | 15 ++++- eventmq/__init__.py | 2 +- eventmq/conf.py | 9 +++ eventmq/tests/test_worker.py | 142 ++++++++++++++++++++++++++++++++++++++++--- eventmq/worker.py | 38 ++++++++++-- setup.py | 2 +- 6 files changed, 192 insertions(+), 16 deletions(-) diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 43ec6f2..1ff09c4 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist @@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291 [jobmanager] worker_addr=tcp://127.0.0.1:47290 -queues=[[50,"google"], [40,"pushes"], [10,"default"]] + +# Defines the weight and name of queues this worker deals with. +queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]] + +# Specifies the number of of sub-processes to spawn to process jobs concurrently concurrent_jobs=2 +# This function is executed when EventMQ creates a new worker subprocess +# subprocess_setup_func = 'path.to.my_setup_function' + +# This function is executed before every job +# job_entry_func = 'path.to.my_job_setup_function' + +# This function is executed after every job +# job_exit_func = 'path.to.my_job_teardown_function' + [publisher] publisher_incoming_addr=tcp://0.0.0.0:47298 publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file diff --git a/eventmq/__init__.py b/eventmq/__init__.py index aefc3a2..2e180ed 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py @@ -1,5 +1,5 @@ __author__ = 'EventMQ Contributors' -__version__ = '0.3.5.6' +__version__ = '0.3.6' PROTOCOL_VERSION = 'eMQP/1.0' diff --git a/eventmq/conf.py b/eventmq/conf.py index 8609716..715bbb2 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py @@ -89,9 +89,18 @@ RQ_PASSWORD = '' MAX_JOB_COUNT = 1024 # Path/Callable to run on start of a worker process +# These options are deprecated for the more user-friendly +# SUBPROCESS_SETUP_FUNC which can be a full path to a function. SETUP_PATH = '' SETUP_CALLABLE = '' +# Function to run on the start of a new worker subprocess +SUBPROCESS_SETUP_FUNC = '' + +# function to be run before the execution of every job +JOB_ENTRY_FUNC = '' +# function to be run after the execution of every job +JOB_EXIT_FUNC = '' # Time to wait after receiving SIGTERM to kill the workers in the jobmanager # forecfully KILL_GRACE_PERIOD = 300 diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py index 5e72f39..b8e4f3e 100644 --- a/eventmq/tests/test_worker.py +++ b/eventmq/tests/test_worker.py @@ -15,11 +15,19 @@ import logging import time +import sys +import mock from .. import worker -ADDR = 'inproc://pour_the_rice_in_the_thing' +if sys.version[0] == '2': + import Queue +else: + import queue as Queue + +ADDR = 'inproc://pour_the_rice_in_the_thing' +SETUP_SUCCESS_RETVAL = 'job setup success' def test_run_with_timeout(): payload = { @@ -33,11 +41,124 @@ def test_run_with_timeout(): assert msgid +@mock.patch('eventmq.worker.callable_from_name') +def test_run_job_setup_hook(callable_from_name_mock): + from eventmq import conf + + setup_func_str = 'eventmq.tests.test_worker.job_setup_hook' + callable_from_name_mock.return_value = mock.Mock() + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.JOB_ENTRY_FUNC = setup_func_str + worker._run(q, res_q, logging.getLogger()) + finally: + conf.JOB_ENTRY_FUNC = '' + + callable_from_name_mock.assert_called_with(setup_func_str) + assert callable_from_name_mock.return_value.call_count == 1 + + +@mock.patch('eventmq.worker.callable_from_name') +def test_run_job_teardown_hook(callable_from_name_mock): + from eventmq import conf + + teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook' + callable_from_name_mock.return_value = mock.Mock() + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.JOB_EXIT_FUNC = teardown_func_str + worker._run(q, res_q, logging.getLogger()) + finally: + conf.JOB_EXIT_FUNC = '' + + callable_from_name_mock.assert_called_with(teardown_func_str) + assert callable_from_name_mock.return_value.call_count == 1 + + +@mock.patch('eventmq.worker.callable_from_name') +def test_run_subprocess_setup_func(callable_from_name_mock): + from eventmq import conf + + setup_func_str = 'eventmq.tests.test_worker.process_setup_hook' + callable_from_name_mock.return_value = mock.Mock() + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.SUBPROCESS_SETUP_FUNC = setup_func_str + worker._run(q, res_q, logging.getLogger()) + finally: + conf.SUBPROCESS_SETUP_FUNC = '' + + callable_from_name_mock.assert_called_with(setup_func_str) + assert callable_from_name_mock.return_value.call_count == 1 + + +@mock.patch('eventmq.worker.run_setup') +def test_run_run_setup_func(run_setup_mock): + from eventmq import conf + + setup_func_path = 'eventmq.tests.test_worker' + setup_func_callable = 'process_setup_hook' + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.SETUP_PATH = setup_func_path + conf.SETUP_CALLABLE = setup_func_callable + worker._run(q, res_q, logging.getLogger()) + finally: + conf.SETUP_PATH = '' + conf.SETUP_CALLABLE = '' + + run_setup_mock.assert_called_with(setup_func_path, setup_func_callable) + + def test_run_setup(): - setup_callable = 'pre_hook' + setup_callable = 'process_setup_hook' setup_path = 'eventmq.tests.test_worker' - worker.run_setup(setup_path, setup_callable) + assert worker.run_setup(setup_path, setup_callable) def job(sleep_time=0): @@ -46,9 +167,16 @@ def job(sleep_time=0): return True -def pre_hook(): - return 1 +def process_setup_hook(): + print 'process setup hook executed' + return True -def post_hook(): - return 1 +def job_setup_hook(): + print 'job setup hook executed' + return SETUP_SUCCESS_RETVAL + + +def job_teardown_hook(): + print 'job teardown hook executed' + return True diff --git a/eventmq/worker.py b/eventmq/worker.py index 1621578..aa03284 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py @@ -18,16 +18,13 @@ Defines different short-lived workers that execute jobs """ from importlib import import_module - import logging - from multiprocessing import Process - import os import sys - from threading import Thread +from .utils.functions import callable_from_name from . import conf if sys.version[0] == '2': @@ -180,7 +177,19 @@ def _run(queue, result_queue, logger): "class_kwargs": {"value": 2} } """ - if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): + if any(conf.SUBPROCESS_SETUP_FUNC): + try: + logger.debug("Running setup ({}) for worker id {}".format( + conf.SUBPROCESS_SETUP_FUNC, os.getpid())) + setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC) + setup_func() + except Exception as e: + logger.warning('Unable to do setup task ({}): {}' + .format(conf.SUBPROCESS_SETUP_FUNC, str(e))) + + elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): + logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for " + "SUBPROCESS_SETUP_FUNC") try: logger.debug("Running setup ({}.{}) for worker id {}" .format( @@ -193,6 +202,16 @@ def _run(queue, result_queue, logger): .format(conf.SETUP_PATH, conf.SETUP_CALLABLE, str(e))) + if conf.JOB_ENTRY_FUNC: + job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC) + else: + job_entry_func = None + + if conf.JOB_EXIT_FUNC: + job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC) + else: + job_exit_func = None + while True: # Blocking get so we don't spin cycles reading over and over try: @@ -204,7 +223,14 @@ def _run(queue, result_queue, logger): if payload == 'DONE': break + if job_entry_func: + job_entry_func() + return_val = _run_job(payload, logger) + + if job_exit_func: + job_exit_func() + # Signal that we're done with this job and put its return value on the # result queue result_queue.put(return_val) @@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable): setup_callable_ = getattr(setup_package, setup_callable) - setup_callable_() + return setup_callable_() diff --git a/setup.py b/setup.py index 64afbd9..9bacfbf 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ from setuptools import find_packages, setup setup( name='eventmq', - version='0.3.5.6', + version='0.3.6', description='EventMQ job execution and messaging system based on ZeroMQ', packages=find_packages(), install_requires=['pyzmq==15.4.0', -- cgit v1.2.1 From 54e304200a869a33db12e18e34f6952f06fe991f Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 10 Apr 2018 16:32:38 -0600 Subject: Display EventMQ version on daemon startup --- etc/eventmq.conf-dist | 6 +++--- eventmq/jobmanager.py | 2 ++ eventmq/router.py | 7 +++++-- eventmq/scheduler.py | 2 ++ 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 1ff09c4..9a59f2a 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist @@ -27,13 +27,13 @@ queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]] concurrent_jobs=2 # This function is executed when EventMQ creates a new worker subprocess -# subprocess_setup_func = 'path.to.my_setup_function' +# subprocess_setup_func = path.to.my_setup_function # This function is executed before every job -# job_entry_func = 'path.to.my_job_setup_function' +# job_entry_func = path.to.my_job_setup_function # This function is executed after every job -# job_exit_func = 'path.to.my_job_teardown_function' +# job_exit_func = path.to.my_job_teardown_function [publisher] publisher_incoming_addr=tcp://0.0.0.0:47298 diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 78a080b..f4a928c 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py @@ -29,6 +29,7 @@ import time import zmq from eventmq.log import setup_logger +from . import __version__ from . import conf from .constants import KBYE, STATUS from .poller import Poller, POLLIN @@ -80,6 +81,7 @@ class JobManager(HeartbeatMixin, EMQPService): #: Define the name of this JobManager instance. Useful to know when #: referring to the logs. self.name = kwargs.pop('name', None) or generate_device_name() + logger.info('EventMQ Version {}'.format(__version__)) logger.info('Initializing JobManager {}...'.format(self.name)) #: keep track of workers diff --git a/eventmq/router.py b/eventmq/router.py index 0b896b3..81ca257 100644 --- a/eventmq/router.py +++ b/eventmq/router.py @@ -23,6 +23,7 @@ import logging import signal from eventmq.log import setup_logger, setup_wal_logger +from . import __version__ from . import conf, constants, exceptions, poller, receiver from .constants import ( CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, @@ -51,8 +52,11 @@ class Router(HeartbeatMixin): def __init__(self, *args, **kwargs): super(Router, self).__init__(*args, **kwargs) # Creates _meta + setup_logger("eventmq") + self.name = generate_device_name() - logger.info('Initializing Router %s...' % self.name) + logger.info('EventMQ Version {}'.format(__version__)) + logger.info('Initializing Router {}...'.format(self.name)) self.poller = poller.Poller() @@ -943,7 +947,6 @@ class Router(HeartbeatMixin): """ Kick off router with logging and settings import """ - setup_logger('eventmq') import_settings() setup_wal_logger('eventmq-wal', conf.WAL) self.start(frontend_addr=conf.FRONTEND_ADDR, diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 32c9b46..2ff726a 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py @@ -29,6 +29,7 @@ from six import next from eventmq.log import setup_logger +from . import __version__ from . import conf, constants from .client.messages import send_request from .constants import KBYE @@ -53,6 +54,7 @@ class Scheduler(HeartbeatMixin, EMQPService): def __init__(self, *args, **kwargs): self.name = kwargs.get('name', None) + logger.info('EventMQ Version {}'.format(__version__)) logger.info('Initializing Scheduler...') import_settings() super(Scheduler, self).__init__(*args, **kwargs) -- cgit v1.2.1 From 777800c2acd2b760bbfaec47f66114320c47e4bb Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 10 Apr 2018 16:52:14 -0600 Subject: fix style errors --- eventmq/tests/test_worker.py | 7 +++---- eventmq/worker.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py index b8e4f3e..ec13be6 100644 --- a/eventmq/tests/test_worker.py +++ b/eventmq/tests/test_worker.py @@ -14,10 +14,11 @@ # along with eventmq. If not, see . import logging -import time import sys +import time import mock + from .. import worker if sys.version[0] == '2': @@ -29,6 +30,7 @@ else: ADDR = 'inproc://pour_the_rice_in_the_thing' SETUP_SUCCESS_RETVAL = 'job setup success' + def test_run_with_timeout(): payload = { 'path': 'eventmq.tests.test_worker', @@ -168,15 +170,12 @@ def job(sleep_time=0): def process_setup_hook(): - print 'process setup hook executed' return True def job_setup_hook(): - print 'job setup hook executed' return SETUP_SUCCESS_RETVAL def job_teardown_hook(): - print 'job teardown hook executed' return True diff --git a/eventmq/worker.py b/eventmq/worker.py index aa03284..1f3a6fa 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py @@ -24,8 +24,8 @@ import os import sys from threading import Thread -from .utils.functions import callable_from_name from . import conf +from .utils.functions import callable_from_name if sys.version[0] == '2': import Queue -- cgit v1.2.1