Python / rq - worker status monitoring

If this is an idiotic question, I apologize and will hang my head in shame, but:

I use rq for queueing in Python. I want it to work as follows:

  • Job A starts. Job A fetches data via a web API and stores it.
  • Job A runs for a while.
  • Job A completes.
  • When A completes, job B starts. Job B checks each record stored by job A and adds some additional response data.
  • When job B completes, the user gets a happy email saying their report is ready.

My code is:

 redis_conn = Redis()
 use_connection(redis_conn)
 q = Queue('normal', connection=redis_conn)  # this is terrible, I know - fixing later
 w = Worker(q)
 job = q.enqueue(getlinksmod.lsGet, theURL, total, domainid)
 w.work()

I figured my best solution would be to have two workers, one for job A and one for job B. The job B worker could monitor job A and, once job A finished, start job B.

What I can't figure out for the life of me is how to get one worker to monitor the status of another. I can grab the job id from job A with job.id. I can grab the worker's name with w.name. But I have no clue how to pass any of that information to the other worker.
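For reference, a minimal sketch of what's described here, assuming job A's id was stored somewhere both processes can read it (job_a_id and start_job_b are hypothetical names, not from the original code). In current rq versions, Job.fetch lets any process look a job up by its id:

 from redis import Redis
 from rq.job import Job

 redis_conn = Redis()

 # Any process can rehydrate job A from its id
 job_a = Job.fetch(job_a_id, connection=redis_conn)

 if job_a.is_finished:  # True once the job completed successfully
     start_job_b()      # hypothetical: whatever kicks off job B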

Or, is there a much simpler way to do this that I am completely missing?

+6
4 answers

You are probably too deep into your project to switch, but if not, check out Twisted. http://twistedmatrix.com/trac/ I am using it right now for a project that hits APIs, scrapes web content, etc. It runs several tasks in parallel, and can also sequence certain tasks, so that job B is not executed until job A is done.
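A rough illustration of that kind of sequencing using Twisted's Deferreds (job_a and job_b here are hypothetical stand-ins, not the asker's actual code):

 from twisted.internet import reactor
 from twisted.internet.threads import deferToThread

 def job_a():
     return "data fetched from the web API"  # placeholder for the real fetch

 def job_b(result_of_a):
     print("job B runs only after A, got:", result_of_a)

 d = deferToThread(job_a)                 # run job A off the main thread
 d.addCallback(job_b)                     # chain job B onto A's completion
 d.addCallback(lambda _: reactor.stop())  # shut down once B is done
 reactor.run()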

This is the best tutorial for learning Twisted if you want to try it. http://krondo.com/?page_id=1327

0

Update January 2015: this pull request has now been merged, and the parameter was renamed to depends_on, i.e.:

 second_job = q.enqueue(email_customer, depends_on=first_job) 
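Applied to the question's pipeline, a minimal sketch using depends_on might look like this (get_data, add_response_data, and notify_user are hypothetical stand-ins for job A, job B, and the email step):

 from redis import Redis
 from rq import Queue

 def get_data():
     pass  # job A: fetch data via the web API and store it

 def add_response_data():
     pass  # job B: enrich each record stored by job A

 def notify_user():
     pass  # email the user that the report is ready

 q = Queue('normal', connection=Redis())
 job_a = q.enqueue(get_data)
 job_b = q.enqueue(add_response_data, depends_on=job_a)
 q.enqueue(notify_user, depends_on=job_b)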

The original text below is left unchanged, for people running older versions and such:

I submitted a pull request ( https://github.com/nvie/rq/pull/207 ) to handle job dependencies in RQ. When this pull request gets merged, you'll be able to do:

 def generate_report():
     pass

 def email_customer():
     pass

 first_job = q.enqueue(generate_report)
 second_job = q.enqueue(email_customer, after=first_job)
 # In the second enqueue call, job is created,
 # but only moved into queue after first_job finishes

For now, I suggest writing a wrapper function to run your jobs sequentially. For example:

 def generate_report():
     pass

 def email_customer():
     pass

 def generate_report_and_email():
     generate_report()
     email_customer()  # You can also enqueue this function, if you really want to

 # Somewhere else
 q.enqueue(generate_report_and_email)
+6

From this page in the rq docs, it looks like each job object has a result attribute, accessible via job.result, which you can check. If the job is not finished yet, it will be None, but if you make sure your job returns some value (even just "Done"), you can have your other worker check the result of the first job, and start working only once job.result has a value, meaning the first job completed.
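A minimal sketch of that polling idea, assuming the second worker has a handle on job_a and that enqueue_job_b is a hypothetical stand-in for kicking off job B:

 import time

 # Block until job A produces a result
 while job_a.result is None:
     time.sleep(5)  # poll every few seconds

 enqueue_job_b()

One caveat: job.result also stays None if the job failed, so a real version would want a timeout or a status check as well.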

+2

Combine what job A and job B do into one function, and then use, for example, multiprocessing.Pool (its map_async method) to farm that out across different processes, as in the sketch below.

I am not familiar with rq, but multiprocessing is part of the standard library. By default it uses as many processes as your CPU has cores, which in my experience is usually enough to saturate the machine.
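A minimal sketch of that suggestion (fetch_and_enrich is a hypothetical function combining jobs A and B for a single record; the callback fires once, when every record is done):

 from multiprocessing import Pool

 def fetch_and_enrich(record_id):
     # job A + job B for one record: fetch from the API, then enrich
     return record_id

 def email_customer(results):
     print("report ready,", len(results), "records processed")

 if __name__ == "__main__":
     pool = Pool()  # defaults to one worker process per CPU core
     # map_async returns immediately; callback runs when all records finish
     pool.map_async(fetch_and_enrich, [1, 2, 3], callback=email_customer)
     pool.close()
     pool.join()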

0
