Integration testing multiple Celery workers and a DB-backed Django API

I work with a service-oriented architecture in which several Celery workers run (call them worker1 , worker2 and worker3 ). All three workers are separate entities (i.e., separate code bases, separate repositories, separate Celery instances, separate machines), and none of them is part of the Django application.

Communicating with each of these three workers is a Django-based RESTful API backed by MySQL.

In development, these services all live in a single Vagrant box, each acting as a separate machine running off a separate port. We have one RabbitMQ broker for all the Celery tasks.

A typical path through these services might look something like this: worker1 receives a message from a device and does some processing, then queues a task on worker2 , which does further processing and makes a POST to the API ; that writes a record to the MySQL database and kicks off a task on worker3 , which does some other processing and makes another POST to the API , resulting in another MySQL record.
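One way such a hand-off can be wired up, since the workers share a broker but not a code base, is to dispatch the next stage by task name with send_task instead of importing the task. A simplified sketch with placeholder names and a placeholder broker URL:

 from celery import Celery

 # Placeholder broker URL; this would point at the shared RabbitMQ broker.
 app = Celery("worker1", broker="amqp://guest@localhost//")

 @app.task
 def process_device_message(payload):
     # ... worker1's own processing would happen here ...
     processed = {"device_id": payload["device_id"], "stage": "worker1"}
     # Hand off to worker2 by task name, since the code bases are separate.
     app.send_task("worker2.tasks.process", args=[processed], queue="worker2")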

The services communicate with each other just fine, but it's a pain to test the whole flow every time we make a change to any of the services. I'd really like to have full integration tests (i.e., starting with a message sent to worker1 and following the chain all the way through), but I'm not sure where to start. These are the main issues I'm facing:

If I kick something off on worker1 , how can I tell when the whole flow has finished? How can I make reasonable assertions about the results when I don't know whether the results have even been produced yet?

How do I handle database setup/teardown? I want to delete all the entries created during a test at the end of each test, but if I'm driving the test from outside the Django application, I'm not sure how to clean up efficiently. Dropping and recreating the database manually after every test looks like too much overhead.

python django integration celery
2 answers

Celery allows you to run a task synchronously, so the first step is to split the whole flow into individual tasks, mock the requests, and assert on the results:

Original flow:

 device --- worker1 --- worker2 --- django --- worker3 --- django 

Level 1 integration tests:

 1. |- worker1 -| 2. |- worker2 -| 3. |- django -| 4. |- worker3 -| 5. |- django -| 

For each piece, write a test that issues a fake request or a synchronous call and verifies the results, and put that test in the corresponding repository. For example, in the test for worker1 you can mock worker2 and verify that it was called with the correct arguments, as in the sketch below. Then, in another test, you call worker2 with a mocked request and verify that it calls the right API. And so on.
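A minimal sketch of such a test for worker1, assuming worker1's task lives in a hypothetical worker1.tasks module and hands the work off via app.send_task (all names here are placeholders). The hand-off is mocked out and the task is executed synchronously in-process:

 from unittest import TestCase
 from unittest.mock import patch

 from worker1.tasks import process_device_message  # hypothetical module

 class Worker1Test(TestCase):
     @patch("worker1.tasks.app.send_task")
     def test_hands_off_to_worker2(self, send_task):
         # Calling the task function directly runs it synchronously,
         # with no broker and no running worker involved.
         process_device_message({"device_id": 42})

         # Verify worker2 would have been called with the right arguments.
         send_task.assert_called_once_with(
             "worker2.tasks.process",
             args=[{"device_id": 42, "stage": "worker1"}],
             queue="worker2",
         )

The same pattern applies to worker2 and worker3: call the task synchronously, mock whatever it dispatches next (or the HTTP call to the API), and assert on the arguments.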

Testing the entire flow will be harder, since all the tasks are separate entities. The only way I can see right now is to make one fake call to worker1, set a reasonable timeout, and wait for the final result to appear in the database, as sketched below. A test like that only tells you whether the flow works or not; it won't show you where the problem is.
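A sketch of that whole-flow smoke test, assuming the test process queries MySQL directly since it runs outside the Django application (task, queue, table, and connection details are all placeholders):

 import time
 import pymysql  # direct MySQL access, since the test runs outside Django

 from celery import Celery

 app = Celery(broker="amqp://guest@localhost//")

 def test_full_flow():
     # Fire one real message at worker1 by task name.
     app.send_task("worker1.tasks.process_device_message",
                   args=[{"device_id": 42}], queue="worker1")

     # autocommit=True so each poll sees a fresh snapshot of the table.
     conn = pymysql.connect(host="localhost", user="test", password="test",
                            database="apidb", autocommit=True)
     deadline = time.monotonic() + 60  # a "reasonable timeout"
     try:
         while time.monotonic() < deadline:
             with conn.cursor() as cur:
                 cur.execute("SELECT COUNT(*) FROM records WHERE device_id = %s",
                             (42,))
                 if cur.fetchone()[0] > 0:
                     return  # the chain completed end to end
             time.sleep(1)
         raise AssertionError("flow did not finish within the timeout")
     finally:
         conn.close()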


To handle the full end-to-end flow, you can configure Celery to use a result backend. See the "Next Steps" section of the Celery documentation.
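Enabling a result backend is a one-line configuration change; a minimal sketch, assuming RabbitMQ is already the broker:

 from celery import Celery

 app = Celery(
     "worker1",
     broker="amqp://guest@localhost//",  # the shared RabbitMQ broker
     backend="rpc://",                   # or e.g. "redis://localhost:6379/0"
 )

With a backend configured, every .delay() or send_task() call returns an AsyncResult whose .get() blocks until the task finishes.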

worker1 can then return the task id of the task it handed off to worker2 . The result returned by worker2 would be the task id of what it handed off to worker3 . And the result returned by worker3 would mean the whole sequence is complete, and you can check the outcome. Each task could also report interesting bits of its own results right away, to make verification easier.

It might look something like this in Celery:

 worker1_result = mytask.delay(someargs)  # executed by worker1
 worker2_result = worker1_result.get()    # waits for worker1 to finish
 worker3_result = worker2_result.get()    # waits for worker2 to finish
 outcome = worker3_result.get()           # waits for worker3 to finish

(The details would probably need to differ; I haven't used this myself yet. I'm not sure whether task results are serializable and therefore suitable as return values of a task function.)
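One way around the serializability question is to return plain string task ids, which serialize trivially, and rebuild an AsyncResult from each id on the caller's side. A sketch with placeholder names (a key/value backend such as Redis makes looking results up by id from another process straightforward):

 from celery import Celery
 from celery.result import AsyncResult

 app = Celery("worker1", broker="amqp://guest@localhost//",
              backend="redis://localhost:6379/0")

 @app.task
 def process_device_message(payload):
     # ... worker1's processing ...
     handoff = app.send_task("worker2.tasks.process",
                             args=[payload], queue="worker2")
     return handoff.id  # a plain string, safe to return from a task

 # On the test side, follow the chain by id:
 worker1_result = process_device_message.delay({"device_id": 42})
 worker2_id = worker1_result.get()  # waits for worker1 to finish
 worker2_result = AsyncResult(worker2_id, app=app)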

