In-memory broker for Celery:

Typical Celery setups involve an additional running service (Redis, RabbitMQ, etc.) to receive incoming tasks. When run locally, these services need to bind to a port and stay running for as long as you want to run Celery tasks. I am converting a traditional Flask project, one that uses PostgreSQL, nginx, Redis, and uWSGI, to run locally as a single executable with no non-Python dependencies, while using the same codebase. I am hoping to allow both modes of operation (as a traditional server and as a local service) through configuration changes alone.

Here we look at removing Redis as a dependency for asynchronous task execution. Kombu, the messaging library used by Celery, has an option to use memory as the broker instead of Redis. This is not documented very well in the main Celery documentation. The main use for memory as the broker is unit testing tasks, where we might not want to set up a whole other service just to run a project's tests.


>>> from celery import Celery
>>> x = Celery('tasks', broker='memory://', backend='file:///tmp/celery')
>>> x.broker_connection()
<Connection: memory://localhost// at 0x7f613f70a940>
>>> x.backend
<celery.backends.filesystem.FilesystemBackend object at 0x7f613f76fdd8>

We can configure memory as the broker with no problem. Our Celery object will dump the tasks we run into some part of memory, where they will sit waiting for a consumer service that we never started. If we start a Celery worker from the command line, along the lines of 'celery -A app -b memory:// worker', we will see the usual startup logs, but it will not consume anything our producer puts into the queue. This is because arbitrary separate processes cannot read each other's memory: the worker process gets its own, empty in-memory queue. There are two solutions to this problem.

The first is a configuration setting, 'task_always_eager=True'. From the documentation:

"If this is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, that emulates the API and behavior of AsyncResult, except the result is already evaluated.
That is, tasks will be executed locally instead of being sent to the queue."

Great, we don't need to start a service to consume and execute tasks. The caveat: this is no longer asynchronous, since our main process is now responsible for executing the task functions. Because our main process must execute the functions, the functions must also be available to our process. This means the send_task method of submitting tasks will not work. If you have 'task_always_eager' enabled and call send_task, Celery will throw an error:

"RuntimeError: Cannot retrieve result with task_always_eager enabled"

The workaround is to switch to using apply_async, which may involve some code reorganization to allow the file that calls apply_async to import the task directly.

The second way to use an in-memory broker is to start up a Celery worker in the same process that submits the tasks. Celery offers a Python function to start a worker. If we call this function directly, it will block the execution of our script until the Celery worker exits, so we start it in another thread:

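import threading
from celery.bin import worker
from tasks import app  # the Celery app defined in tasks.py

# Build a worker bound to our app and run it in a background thread,
# because wrkr.run blocks until the worker exits.
wrkr = worker.worker(app)
wrkr_thread = threading.Thread(target=wrkr.run)
wrkr_thread.start()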

Now we have a Celery worker and a set of tasks that can both read from and write to the same memory, since they are in the same process, and code will execute asynchronously if "task_always_eager=False". With the test scripts I was using, I ran into the following issue when calling apply_async:

"RuntimeError: Never call result.get() within a task!"
http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

The not-recommended workaround was to pass "disable_sync_subtasks=False" to the result's 'get()' function. There might be a way to avoid this error while threading the Celery worker, but I don't know what it is.

None of this makes sense to do unless you have something like the constraints I outlined at the beginning. I have an existing codebase that needs to be able to use Celery in the traditional way but, in a different environment, won't have access to Redis, and we want the code to work in both environments with minimal changes.
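
For the two environments, one way to keep the switch in configuration is a small helper that returns Celery settings per mode. This is only a sketch: the function name celery_settings, the mode names, the TASK_MODE environment variable and the Redis URL are all made up for illustration. broker_url, result_backend and task_always_eager are standard Celery setting names.

```python
import os


def celery_settings(mode):
    """Return Celery settings for a (hypothetical) deployment mode."""
    if mode == "server":
        # Traditional deployment: Redis as broker and result backend.
        # The URL is a placeholder, not taken from the original project.
        return {
            "broker_url": "redis://localhost:6379/0",
            "result_backend": "redis://localhost:6379/0",
            "task_always_eager": False,
        }
    if mode == "local":
        # Single-executable deployment: no external services.
        return {
            "broker_url": "memory://",
            "result_backend": "file:///tmp/celery",
            "task_always_eager": True,
        }
    raise ValueError(f"unknown mode: {mode!r}")


# TASK_MODE is a made-up environment variable name.
settings = celery_settings(os.environ.get("TASK_MODE", "local"))
print(settings["broker_url"])
```

The resulting dict could be passed to app.conf.update(...). If the threaded worker is used instead of eager mode, task_always_eager would stay False in local mode.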