Airflow: long-running task in a SubDAG marked as failed after an hour

I have a SubDAG in Airflow with a long-running step (typically about 2 hours, though it varies depending on which unit is being run). Under 1.7.1.3, this step would consistently trigger AIRFLOW-736, and the SubDAG would stall in the running state even when all steps within it had succeeded. Because we had no steps after the SubDAG, we could work around this by manually marking the SubDagOperator as successful (rather than running) in the database.
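The manual workaround of marking the SubDagOperator as successful in the database boils down to a single UPDATE on the task_instance table. The sketch below is a minimal illustration run against an in-memory SQLite stand-in; the simplified four-column schema is an assumption (the real Airflow metadata table has more columns), and `mark_task_success` and `create_stand_in_table` are hypothetical helper names.

```python
import sqlite3


def create_stand_in_table(conn):
    """Create a simplified stand-in for Airflow's task_instance table.

    Assumption: only dag_id, task_id, execution_date and state are modeled.
    """
    conn.execute(
        "CREATE TABLE task_instance "
        "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
    )


def mark_task_success(conn, dag_id, task_id, execution_date):
    """Hypothetical helper: flip one stuck task instance from 'running' to 'success'.

    Only a row currently in the 'running' state is touched, so an instance that
    already finished or failed is left alone. Returns the number of rows updated.
    """
    cur = conn.execute(
        "UPDATE task_instance SET state = 'success' "
        "WHERE dag_id = ? AND task_id = ? AND execution_date = ? AND state = 'running'",
        (dag_id, task_id, execution_date),
    )
    conn.commit()
    return cur.rowcount
```

For the sample DAG in this question, the row to target would be the parent's SubDagOperator instance (dag_id `long_runner`, task_id `long_runner_sub`) for the stuck execution date.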

We are now testing Airflow 1.8.1, upgrading with the following steps:

  • Shutting down our scheduler and workers
  • Via pip, uninstalling airflow and installing apache-airflow (version 1.8.1)
  • Running airflow upgradedb
  • Starting the airflow scheduler and workers
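The middle upgrade steps can be scripted in order. This is a minimal sketch that prints the commands by default (dry run) and only executes them with `dry_run=False`; stopping and restarting the scheduler and workers is assumed to be handled by whatever supervises those processes, so it is left out, and `run_upgrade` is a hypothetical name.

```python
import subprocess

# Ordered commands for steps 2 and 3 of the upgrade. Steps 1 and 4 (stopping and
# starting `airflow scheduler` / `airflow worker`) are assumed to be done by the
# process supervisor and are not scripted here.
UPGRADE_COMMANDS = [
    ["pip", "uninstall", "-y", "airflow"],
    ["pip", "install", "apache-airflow==1.8.1"],
    ["airflow", "upgradedb"],
]


def run_upgrade(dry_run=True):
    """Print (dry run) or execute each command in order.

    With dry_run=False, check=True makes the first failing command raise
    CalledProcessError, so later steps never run after a failure.
    Returns the commands handled, as strings.
    """
    handled = []
    for cmd in UPGRADE_COMMANDS:
        if dry_run:
            print("would run:", " ".join(cmd))
        else:
            subprocess.run(cmd, check=True)
        handled.append(" ".join(cmd))
    return handled
```

Keeping the dry run as the default makes it easy to review the exact sequence before touching a production environment.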

With the system otherwise untouched, the same DAG now fails 100% of the time shortly after the long-running task reaches the 1-hour mark (oddly, not exactly 3600 seconds in; it can be anywhere from 30 to 90 seconds after the hour) with the message "Executor reports task instance finished (failed) although the task says its running. Was the task killed externally?". However, the task itself keeps running on the worker. Somehow the scheduler and the worker disagree: based on the database, the scheduler wrongly concludes that the task failed (see this line of jobs.py), even though the actual task is running fine.

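I have confirmed that the state is "failed" in the task_instance table of the Airflow database. So what could be setting the task state to failed while the task itself is still running?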
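To see which state was recorded for the parent DAG and its SubDAG, one can query the task_instance table directly. A minimal sketch against an in-memory SQLite stand-in (the simplified schema and the `task_states` helper are assumptions for illustration); it selects rows for the parent dag_id plus any sub-DAG whose dag_id starts with `<parent>.`, using `substr` rather than `LIKE` because `_` is a `LIKE` wildcard and appears in these DAG ids.

```python
import sqlite3


def create_stand_in_table(conn):
    """Simplified stand-in for Airflow's task_instance table (assumption)."""
    conn.execute(
        "CREATE TABLE task_instance "
        "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
    )


def task_states(conn, parent_dag_id):
    """Return (dag_id, task_id, execution_date, state) rows for a DAG and its sub-DAGs."""
    prefix = parent_dag_id + "."
    rows = conn.execute(
        "SELECT dag_id, task_id, execution_date, state FROM task_instance "
        "WHERE dag_id = ? OR substr(dag_id, 1, ?) = ? "
        "ORDER BY dag_id, task_id, execution_date",
        (parent_dag_id, len(prefix), prefix),
    )
    return rows.fetchall()
```

With the sample DAG, a row for `long_runner.long_runner_sub` showing `failed` while the worker is still sleeping would reproduce the disagreement described above.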

Here is a sample DAG that triggers the issue:

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

# Leading-zero integer literals such as 05 are a syntax error in Python 3.
DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}


def define_sub(dag, step_name, sleeptime):
    # A single long-running step that just sleeps, routed to the "model" queue.
    BashOperator(
        task_id=step_name,
        bash_command='sleep %i' % sleeptime,
        queue="model",
        dag=dag,
    )
    return dag


def gen_sub_dag(parent_name, step_name, sleeptime):
    # SubDagOperator expects the sub-DAG's dag_id to be "<parent_dag_id>.<task_id>".
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub


long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

# 7500 seconds (just over two hours) keeps the task running well past the one-hour mark.
long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500),
    task_id='long_runner_sub',
    dag=long_runner_parent,
)
```

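If you are using Celery with Redis as the broker, look at the visibility timeout setting and increase it beyond the expected runtime of your task.

Celery is set to ack late, so the message is not acknowledged until the task finishes. A task that runs past the visibility timeout, which defaults to one hour for Redis, is therefore redelivered by the broker, which matches the failure appearing at about the one-hour mark.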
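Celery documents one hour as the default visibility timeout for the Redis broker: a message that is not acknowledged within that window is redelivered to another worker. The sketch below checks whether a late-acked task could outlive that window and builds a transport-options dict with headroom. The helper names and the 2x margin are assumptions for illustration, not Airflow or Celery APIs.

```python
from datetime import timedelta

# Celery's documented default visibility timeout for the Redis broker, in seconds.
REDIS_DEFAULT_VISIBILITY_TIMEOUT = 3600


def needs_longer_visibility_timeout(expected_runtime,
                                    visibility_timeout=REDIS_DEFAULT_VISIBILITY_TIMEOUT):
    """True if a late-acked task may still be unacknowledged when the timeout expires."""
    return expected_runtime.total_seconds() >= visibility_timeout


def broker_transport_options(max_runtime, margin=2.0):
    """Hypothetical helper: visibility timeout = margin x longest runtime,
    never lower than the Redis default."""
    timeout = int(max_runtime.total_seconds() * margin)
    return {"visibility_timeout": max(timeout, REDIS_DEFAULT_VISIBILITY_TIMEOUT)}
```

For the sample DAG's `sleep 7500`, `needs_longer_visibility_timeout(timedelta(seconds=7500))` is `True`, and `broker_transport_options(timedelta(seconds=7500))` gives a visibility timeout of 15000 seconds. Such a dict would be passed to Celery as its `broker_transport_options` setting (`BROKER_TRANSPORT_OPTIONS` in the older uppercase configuration style); whether and how Airflow 1.8.1 exposes that setting is not covered here.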
