From 57329aba1e888b64d56ac2386466791304cdf86a Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 10 Apr 2018 16:25:27 -0600 Subject: Add job setup and cleanup functions This adds 2 new config options: `job_entry_func` and `job_exit_func`. These functions are executed before and after every single job execution. The rationale behind this is before and after each request Django cleans up stale database connections, so Django jobs need some way of running this same setup/cleanup functions. https://github.com/django/django/blob/master/django/db/__init__.py#L57 ``` # Register an event to reset transaction state and close connections past # their lifetime. def close_old_connections(**kwargs): for conn in connections.all(): conn.close_if_unusable_or_obsolete() signals.request_started.connect(close_old_connections) signals.request_finished.connect(close_old_connections) ``` fixes: #41 --- etc/eventmq.conf-dist | 15 ++++- eventmq/__init__.py | 2 +- eventmq/conf.py | 9 +++ eventmq/tests/test_worker.py | 142 ++++++++++++++++++++++++++++++++++++++++--- eventmq/worker.py | 38 ++++++++++-- setup.py | 2 +- 6 files changed, 192 insertions(+), 16 deletions(-) diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 43ec6f2..1ff09c4 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist @@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291 [jobmanager] worker_addr=tcp://127.0.0.1:47290 -queues=[[50,"google"], [40,"pushes"], [10,"default"]] + +# Defines the weight and name of queues this worker deals with. +queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]] + +# Specifies the number of of sub-processes to spawn to process jobs concurrently concurrent_jobs=2 +# This function is executed when EventMQ creates a new worker subprocess +# subprocess_setup_func = 'path.to.my_setup_function' + +# This function is executed before every job +# job_entry_func = 'path.to.my_job_setup_function' + +# This function is executed after every job +# job_exit_func = 'path.to.my_job_teardown_function' + [publisher] publisher_incoming_addr=tcp://0.0.0.0:47298 publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file diff --git a/eventmq/__init__.py b/eventmq/__init__.py index aefc3a2..2e180ed 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py @@ -1,5 +1,5 @@ __author__ = 'EventMQ Contributors' -__version__ = '0.3.5.6' +__version__ = '0.3.6' PROTOCOL_VERSION = 'eMQP/1.0' diff --git a/eventmq/conf.py b/eventmq/conf.py index 8609716..715bbb2 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py @@ -89,9 +89,18 @@ RQ_PASSWORD = '' MAX_JOB_COUNT = 1024 # Path/Callable to run on start of a worker process +# These options are deprecated for the more user-friendly +# SUBPROCESS_SETUP_FUNC which can be a full path to a function. SETUP_PATH = '' SETUP_CALLABLE = '' +# Function to run on the start of a new worker subprocess +SUBPROCESS_SETUP_FUNC = '' + +# function to be run before the execution of every job +JOB_ENTRY_FUNC = '' +# function to be run after the execution of every job +JOB_EXIT_FUNC = '' # Time to wait after receiving SIGTERM to kill the workers in the jobmanager # forecfully KILL_GRACE_PERIOD = 300 diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py index 5e72f39..b8e4f3e 100644 --- a/eventmq/tests/test_worker.py +++ b/eventmq/tests/test_worker.py @@ -15,11 +15,19 @@ import logging import time +import sys +import mock from .. import worker -ADDR = 'inproc://pour_the_rice_in_the_thing' +if sys.version[0] == '2': + import Queue +else: + import queue as Queue + +ADDR = 'inproc://pour_the_rice_in_the_thing' +SETUP_SUCCESS_RETVAL = 'job setup success' def test_run_with_timeout(): payload = { @@ -33,11 +41,124 @@ def test_run_with_timeout(): assert msgid +@mock.patch('eventmq.worker.callable_from_name') +def test_run_job_setup_hook(callable_from_name_mock): + from eventmq import conf + + setup_func_str = 'eventmq.tests.test_worker.job_setup_hook' + callable_from_name_mock.return_value = mock.Mock() + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.JOB_ENTRY_FUNC = setup_func_str + worker._run(q, res_q, logging.getLogger()) + finally: + conf.JOB_ENTRY_FUNC = '' + + callable_from_name_mock.assert_called_with(setup_func_str) + assert callable_from_name_mock.return_value.call_count == 1 + + +@mock.patch('eventmq.worker.callable_from_name') +def test_run_job_teardown_hook(callable_from_name_mock): + from eventmq import conf + + teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook' + callable_from_name_mock.return_value = mock.Mock() + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.JOB_EXIT_FUNC = teardown_func_str + worker._run(q, res_q, logging.getLogger()) + finally: + conf.JOB_EXIT_FUNC = '' + + callable_from_name_mock.assert_called_with(teardown_func_str) + assert callable_from_name_mock.return_value.call_count == 1 + + +@mock.patch('eventmq.worker.callable_from_name') +def test_run_subprocess_setup_func(callable_from_name_mock): + from eventmq import conf + + setup_func_str = 'eventmq.tests.test_worker.process_setup_hook' + callable_from_name_mock.return_value = mock.Mock() + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.SUBPROCESS_SETUP_FUNC = setup_func_str + worker._run(q, res_q, logging.getLogger()) + finally: + conf.SUBPROCESS_SETUP_FUNC = '' + + callable_from_name_mock.assert_called_with(setup_func_str) + assert callable_from_name_mock.return_value.call_count == 1 + + +@mock.patch('eventmq.worker.run_setup') +def test_run_run_setup_func(run_setup_mock): + from eventmq import conf + + setup_func_path = 'eventmq.tests.test_worker' + setup_func_callable = 'process_setup_hook' + + payload = { + 'path': 'eventmq.tests.test_worker', + 'callable': 'job', + 'args': [2] + } + + q, res_q = Queue.Queue(), Queue.Queue() + + q.put(payload) + q.put('DONE') + + try: + conf.SETUP_PATH = setup_func_path + conf.SETUP_CALLABLE = setup_func_callable + worker._run(q, res_q, logging.getLogger()) + finally: + conf.SETUP_PATH = '' + conf.SETUP_CALLABLE = '' + + run_setup_mock.assert_called_with(setup_func_path, setup_func_callable) + + def test_run_setup(): - setup_callable = 'pre_hook' + setup_callable = 'process_setup_hook' setup_path = 'eventmq.tests.test_worker' - worker.run_setup(setup_path, setup_callable) + assert worker.run_setup(setup_path, setup_callable) def job(sleep_time=0): @@ -46,9 +167,16 @@ def job(sleep_time=0): return True -def pre_hook(): - return 1 +def process_setup_hook(): + print 'process setup hook executed' + return True -def post_hook(): - return 1 +def job_setup_hook(): + print 'job setup hook executed' + return SETUP_SUCCESS_RETVAL + + +def job_teardown_hook(): + print 'job teardown hook executed' + return True diff --git a/eventmq/worker.py b/eventmq/worker.py index 1621578..aa03284 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py @@ -18,16 +18,13 @@ Defines different short-lived workers that execute jobs """ from importlib import import_module - import logging - from multiprocessing import Process - import os import sys - from threading import Thread +from .utils.functions import callable_from_name from . import conf if sys.version[0] == '2': @@ -180,7 +177,19 @@ def _run(queue, result_queue, logger): "class_kwargs": {"value": 2} } """ - if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): + if any(conf.SUBPROCESS_SETUP_FUNC): + try: + logger.debug("Running setup ({}) for worker id {}".format( + conf.SUBPROCESS_SETUP_FUNC, os.getpid())) + setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC) + setup_func() + except Exception as e: + logger.warning('Unable to do setup task ({}): {}' + .format(conf.SUBPROCESS_SETUP_FUNC, str(e))) + + elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): + logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for " + "SUBPROCESS_SETUP_FUNC") try: logger.debug("Running setup ({}.{}) for worker id {}" .format( @@ -193,6 +202,16 @@ def _run(queue, result_queue, logger): .format(conf.SETUP_PATH, conf.SETUP_CALLABLE, str(e))) + if conf.JOB_ENTRY_FUNC: + job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC) + else: + job_entry_func = None + + if conf.JOB_EXIT_FUNC: + job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC) + else: + job_exit_func = None + while True: # Blocking get so we don't spin cycles reading over and over try: @@ -204,7 +223,14 @@ def _run(queue, result_queue, logger): if payload == 'DONE': break + if job_entry_func: + job_entry_func() + return_val = _run_job(payload, logger) + + if job_exit_func: + job_exit_func() + # Signal that we're done with this job and put its return value on the # result queue result_queue.put(return_val) @@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable): setup_callable_ = getattr(setup_package, setup_callable) - setup_callable_() + return setup_callable_() diff --git a/setup.py b/setup.py index 64afbd9..9bacfbf 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ from setuptools import find_packages, setup setup( name='eventmq', - version='0.3.5.6', + version='0.3.6', description='EventMQ job execution and messaging system based on ZeroMQ', packages=find_packages(), install_requires=['pyzmq==15.4.0', -- cgit v1.2.1