Dynamic airflow tasks at runtime

Other questions about "dynamic tasks" seem to focus on dynamically building DAGs at schedule time or design time. I am interested in dynamically adding tasks to a DAG while it is running.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)

This naive implementation does not seem to work - dummy tasks never appear in the user interface.

What is the proper way to add new operators to the DAG at runtime? Is it possible at all?

+16
python airflow airflow-scheduler
4 answers

It is not possible to change the DAG during its execution (without a lot of work).

The scheduler picks up dag = DAG(... in a loop. It will have the task instance 'python_operator' in it. That task instance gets scheduled in a dag run and executed by a worker or executor. Since DAG models in the Airflow database are only updated by the scheduler, the dummy tasks added inside the callable will never be persisted to the DAG nor scheduled to run. They are forgotten when the worker exits. Unless you copy all of the scheduler's code for persisting and updating the model... but that would be undone the next time the scheduler visits the DAG file for parsing, which can happen once a minute, once a second or faster, depending on how many other DAG files there are to parse.

Airflow actually wants each DAG to keep roughly the same layout between runs. It also wants to reload and parse DAG files constantly. So although you could write a DAG file that on every parse determines the tasks dynamically from some external data (preferably cached in a file or pyc module, not network I/O such as a database lookup, or you will slow down the whole scheduling loop for all the DAGs), this is not a good plan: your graph and tree views will become confusing, and the scheduler's parsing will be more taxed by that lookup.
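For illustration, a minimal sketch of that parse-time pattern, assuming a local JSON file kept up to date by some other process (the path and file layout here are made up); it is re-read on every scheduler parse, so it should stay a cheap local read rather than a network call:

import json
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dynamic_from_cache', schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1), catchup=False)

# Hypothetical cache file maintained outside this DAG, e.g. ["dummy1", "dummy2"]
with open('/tmp/task_list.json') as f:
    task_names = json.load(f)

previous = None
for name in task_names:
    t = DummyOperator(task_id=name, dag=dag)
    if previous is not None:
        previous >> t  # chain the tasks in the listed order
    previous = t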

You could make the callable run each task itself...

def make_tasks(context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    provide_context=True,
    python_callable=make_tasks)

But this runs sequentially, and you have to work out how to make the calls parallel in python yourself (use futures?), and if any one of them raises an exception the whole task fails. It is also bound to a single executor or worker, so it does not use Airflow's task distribution (kubernetes, mesos, celery).
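If you do go that route, here is a minimal sketch of parallelising those execute() calls with a thread pool, reusing the dag and imports from the question's snippet; this is only an illustration, the parallelism still lives inside one worker slot and each exception fails the whole task:

from concurrent.futures import ThreadPoolExecutor

def make_tasks(**context):
    ops = [DummyOperator(task_id='dummy%d' % i, dag=dag) for i in range(1, 4)]
    # Run each operator's execute() in a thread; result() re-raises any
    # exception, which fails this whole PythonOperator task.
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(op.execute, context) for op in ops]
        for f in futures:
            f.result()

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    provide_context=True,
    python_callable=make_tasks)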

The other way to work with this is to add a fixed number of tasks (the maximal number) and use the callables to short-circuit the unneeded tasks, or to push arguments to each of them with XCom, changing their behaviour at runtime without changing the DAG.
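A minimal sketch of that fixed-shape pattern, reusing the dag from the snippet above; get_work_items() is a hypothetical lookup of the current work list, and slots without work are simply skipped:

from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

MAX_SLOTS = 5  # fixed upper bound on the number of work items

def has_work(slot):
    # get_work_items() is a hypothetical helper returning today's work list.
    return slot < len(get_work_items())

def process_slot(slot, **context):
    item = get_work_items()[slot]
    print('processing %s' % item)

for i in range(MAX_SLOTS):
    check = ShortCircuitOperator(
        task_id='check_%d' % i,
        python_callable=has_work,
        op_kwargs={'slot': i},
        dag=dag)
    work = PythonOperator(
        task_id='work_%d' % i,
        python_callable=process_slot,
        op_kwargs={'slot': i},
        provide_context=True,
        dag=dag)
    check >> work  # the work task is skipped when its slot has nothing to do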

+4

Regarding the sample code: you never call the function that registers your tasks in your DAG.
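In other words, for the tasks to show up at all, the function has to run when the file is parsed, roughly like this (reusing the dag and DummyOperator from the question):

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

# Called at module level, so the scheduler sees the tasks at parse time --
# which makes them parse-time dynamic, not runtime dynamic.
make_tasks()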

To get some kind of dynamic behaviour, you can have a single operator that does something different depending on some state, or you can have several operators that can be skipped depending on that state, using ShortCircuitOperator.
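A small sketch of the first option, where one task adapts its behaviour instead of the DAG changing shape; get_current_state() and the two load helpers are hypothetical:

def do_work(**context):
    # get_current_state() is a hypothetical lookup of external state.
    state = get_current_state()
    if state == 'full':
        run_full_load()          # hypothetical helper
    else:
        run_incremental_load()   # hypothetical helper

adaptive = PythonOperator(
    task_id='adaptive_work',
    python_callable=do_work,
    provide_context=True,
    dag=dag)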

+1

I appreciate all the work everybody has done here, as I face the same challenge of creating dynamically structured DAGs. I have made enough mistakes to know not to use software against its design. If I can't inspect the whole run in the UI and zoom in and out, I'm basically not using Airflow's features, which are the main reason I use it in the first place; I might as well just write multiprocessing code inside a function and be done with it.

All that said, my solution is to use a resource manager, such as a Redis lock, and have one DAG that writes to that resource manager with data about what to run and how to run it; and have another DAG, or set of DAGs, that run at a fixed interval, polling the resource manager, taking the lock before processing and releasing it at the end. That way I at least use Airflow as intended, even though its features do not exactly match my needs, and I have broken the problem down into more manageable pieces. The other solutions are creative, but they go against Airflow's design and are untested by its developers; it is explicitly meant for fixed, structured workflows. I can't commit to code that works against the design and isn't tested, unless I were to rewrite the core Airflow code and test it myself. I realise my solution brings its own complexity with locking and so on, but at least I know its limits.
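For what it is worth, a rough sketch of the consumer side of that pattern, assuming Redis (via the redis-py package) as the resource manager; the key names, payload shape and the producer DAG that fills the queue are all made up here:

import json
from datetime import datetime, timedelta

import redis
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Consumer DAG: runs on a fixed interval, asks the resource manager whether
# there is work, holds a lock while processing, then releases it.
dag = DAG('work_consumer', schedule_interval=timedelta(minutes=5),
          start_date=datetime(2018, 1, 1), catchup=False)

def consume(**context):
    r = redis.StrictRedis(host='localhost', port=6379)
    # Hypothetical lock and queue keys written by a separate producer DAG.
    with r.lock('work_lock', timeout=300):
        payload = r.lpop('work_queue')
        if payload is None:
            return  # nothing queued for this interval
        job = json.loads(payload)
        print('running job %s' % job)

PythonOperator(task_id='consume_work', python_callable=consume,
               provide_context=True, dag=dag)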

+1
