Preventing duplicate tasks in Celery brokers

I want to implement the following flow using the Celery API / configuration:

  • Send TaskA(argB) only if the Celery queue does not already contain a waiting TaskA(argB).

Is this possible? If so, how?

+4
source share
4 answers

I can't think of a built-in way, but you could:

  • Get all ongoing and scheduled tasks through celery inspect

  • Iterate through them to see whether your task is already among them.

Check out this SO question for how to do the first step.

Good luck.

+2
source

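One option is to keep a sentinel in a store that all workers share (Redis, memcached, /tmp, a database, ...). Here is an example using a Redis counter.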

from redis import Redis

@app.task
def run_only_one_instance(params):
    """Run ``perform_task()`` unless another instance of this task is running.

    A Redis counter stored under ``"run_only_one_instance_sentinel"`` acts as
    a sentinel. Every invocation increments it, and only the invocation that
    observes the value 1 does the work. Every invocation decrements it exactly
    once when it finishes, whether or not the work raised.

    Errors raised by ``perform_task()`` are logged and then swallowed
    (best-effort, as before). Re-raise instead if callers must see them.

    NOTE(review): this prevents concurrent *execution*; it does not prevent
    duplicate messages waiting in the queue. No expiry is set on the key, so
    if a worker dies between the increment and the decrement, the counter
    stays raised and blocks later runs until it is reset manually.
    ``params`` is currently unused.
    """
    import logging

    sentinel_key = "run_only_one_instance_sentinel"
    redis = Redis()
    # Increment outside the try block: if this call itself fails, nothing was
    # incremented, so there must be no matching decrement.
    sentinel = redis.incr(sentinel_key)
    try:
        if sentinel == 1:
            # I am the legitimate running task.
            perform_task()
        # Otherwise another instance is already running; this duplicate is a no-op.
    except Exception:
        # Best-effort: record the failure instead of silently discarding it.
        logging.getLogger(__name__).exception(
            "run_only_one_instance: perform_task() failed")
    finally:
        # Release our share of the sentinel exactly once, on every path.
        redis.decr(sentinel_key)
+1

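Building on the answer by srj, the following function checks whether a task with a given ID is already active or scheduled on any worker.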

def is_task_active_or_registered(app, task_id):
    """Return True if *task_id* is currently active or scheduled on any worker.

    Queries all workers through ``app.control.inspect()`` and looks for
    *task_id* among the tasks reported by ``active()`` (currently executing)
    and ``scheduled()`` (tasks held by a worker until their ETA/countdown).

    NOTE(review): tasks that are reserved (prefetched) or still waiting in the
    broker are presumably not covered by these two calls; confirm against the
    Celery docs if that matters for your use.
    """
    inspector = app.control.inspect()
    # Inspect calls return None when no worker replies; treat that as "no tasks".
    active = inspector.active() or {}
    scheduled = inspector.scheduled() or {}

    for replies_by_worker in (active, scheduled):
        # Walk each reply's own workers: a worker may appear in one reply but
        # not in the other, so a shared key set would raise KeyError.
        for tasks in replies_by_worker.values():
            for task in tasks:
                # scheduled() entries nest the task info under 'request';
                # active() entries carry 'id' at the top level.
                info = task.get('request', task)
                if 'id' in info and info['id'] == task_id:
                    return True
    return False

Then use it as follows:

Bind it to your Celery app instance, for example:

def check_task_can_not_run(task_id):
    """Tell whether *task_id* must not be started again.

    Delegates to ``is_task_active_or_registered`` using this module's
    ``celery`` application object; a true result means a worker already
    reports the task as active or scheduled.
    """
    already_known = is_task_active_or_registered(task_id=task_id, app=celery)
    return already_known

Then do not send the task if check_task_can_not_run(...) returns True.

+1

I ran into a similar problem: Celery beat was creating duplicate tasks in my queue. I wanted to use the `expires` option, but it does not work properly (see https://github.com/celery/celery/issues/4300).

So here is a scheduler that checks whether a task is already queued (matching on the task name) before sending it again.

# -*- coding: UTF-8 -*-
from __future__ import unicode_literals

import json
from heapq import heappop, heappush

from celery.beat import event_t
from celery.schedules import schedstate
from django_celery_beat.schedulers import DatabaseScheduler
from typing import List, Optional
from typing import TYPE_CHECKING

from your_project import celery_app

if TYPE_CHECKING:
    from celery.beat import ScheduleEntry


def is_task_in_queue(task, queue_name=None):
    # type: (str, Optional[str]) -> bool
    """Report whether a message for task name *task* is waiting in a queue.

    If *queue_name* is given, only that queue is inspected. Otherwise every
    queue known to ``celery_app.amqp.queues`` is checked, stopping at the
    first queue that contains the task.
    """
    if queue_name:
        candidate_queues = [queue_name]
    else:
        candidate_queues = celery_app.amqp.queues.keys()
    return any(task in get_celery_queue_tasks(name) for name in candidate_queues)


def get_celery_queue_tasks(queue_name):
    # type: (str) -> List[str]
    """Return the distinct task names of the messages waiting in *queue_name*.

    Reads every raw message in the list ``queue_name`` through the client
    behind Celery's connection pool (``lrange(queue_name, 0, -1)``). Each
    message is decoded as JSON, and its ``headers['task']`` value is taken as
    the task name. Names are returned once each, in first-seen order.

    NOTE(review): this assumes a Redis broker and messages that carry the task
    name in ``headers['task']``. Messages without that header are skipped
    rather than crashing the caller (e.g. the beat scheduler).
    """
    with celery_app.pool.acquire(block=True) as conn:
        raw_messages = conn.default_channel.client.lrange(queue_name, 0, -1)

    task_names = []  # type: List[str]
    seen = set()  # O(1) membership instead of scanning task_names each time
    for raw in raw_messages:
        message = json.loads(raw)
        name = (message.get('headers') or {}).get('task')
        if name is None or name in seen:
            continue
        seen.add(name)
        task_names.append(name)

    return task_names


class SmartScheduler(DatabaseScheduler):
    """
    Beat scheduler that avoids queueing a periodic task twice.

    When an entry becomes due but a message with the same task *name* is
    still waiting in a broker queue (per ``is_task_in_queue``), this run is
    skipped: the entry is moved on to its next run time instead of being
    sent again. Matching is by task name only, not by arguments.
    """
    def is_due(self, entry):
        # type: (ScheduleEntry) -> schedstate
        """Decide whether *entry* should be sent now.

        Returns ``schedstate(is_due, next_time_to_run)``. If the entry is not
        due, or its task is not already queued, the entry's own answer is
        returned unchanged. Otherwise the entry is rescheduled on the heap
        and a not-due state is returned, so the duplicate is never sent.

        NOTE(review): the rescheduling branch assumes *entry* is the event at
        the head of ``self._heap`` (as when called from the base scheduler's
        tick). Confirm against the installed Celery version, since it relies
        on private beat internals (``_heap``, ``_when``, ``reserve``).
        """
        # Ask the entry's own schedule whether it is due and when it runs next.
        is_due, next_time_to_run = entry.is_due()

        if (
            not is_due or  # duplicate wouldn't be created
            not is_task_in_queue(entry.task)  # not in queue so let it run
        ):
            return schedstate(is_due, next_time_to_run)

        # Task should be run (is_due) and it is present in queue (is_task_in_queue)

        # Work directly on the base scheduler's (private) event heap.
        H = self._heap
        if not H:
            # Nothing scheduled: report not-due and re-check after max_interval.
            return schedstate(False, self.max_interval)

        event = H[0]
        # heappop returns the smallest item, i.e. H[0]; the identity check
        # below only fails if the heap was changed in between.
        verify = heappop(H)
        if verify is event:
            # Advance the entry to its next run WITHOUT sending it (nothing here
            # applies the task), then push it back keyed by its new run time,
            # keeping the event's second field (event[1]) unchanged.
            next_entry = self.reserve(entry)
            heappush(H, event_t(self._when(next_entry, next_time_to_run), event[1], next_entry))
        else:
            # Head changed under us: restore it and wake up no later than it.
            # NOTE(review): verify[0] is a heap time key produced by self._when,
            # while next_time_to_run comes from entry.is_due(); confirm both are
            # in the same units before comparing them.
            heappush(H, verify)
            next_time_to_run = min(verify[0], next_time_to_run)

        # Report not-due so the duplicate is not sent; sleep at most max_interval.
        return schedstate(False, min(next_time_to_run, self.max_interval))
0
source

All Articles