It is not possible to change the DAG during its execution (without a lot of work).
The `dag = DAG(...` is picked up by the scheduler in a loop. It contains the task instance 'python_operator', and that task instance gets scheduled in a DAG run and executed by a worker or executor. Since the DAG models in the Airflow database are updated only by the scheduler, dummy tasks added on the fly like these will not be persisted to the DAG, nor scheduled to run; they are forgotten when the worker exits. Unless you copy all of the scheduler's code for persisting and updating the model... but even that would be undone the next time the scheduler visits the DAG file for parsing, which can happen once a minute, once a second, or faster, depending on how many other DAG files there are to parse.
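Presumably the DAG file in question looks roughly like the sketch below (hypothetical; the DAG name, dates, and callable body are assumptions). The point is that only what exists at parse time, here the single 'python_operator' task, ever makes it into the scheduler's model:

```python
# Hypothetical sketch of the kind of DAG file being discussed: the scheduler
# re-parses this module in a loop, and only the tasks that exist at parse time
# (here, 'python_operator') become task instances in the Airflow DB.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def my_callable(**context):
    # Anything "added" to the DAG in here runs on a worker, after the
    # scheduler has already persisted the DAG's structure, so it is lost.
    pass


dag = DAG('example_dag', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

python_operator = PythonOperator(
    task_id='python_operator',
    python_callable=my_callable,
    provide_context=True,
    dag=dag,
)
```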
Airflow really wants each DAG to keep roughly the same layout between runs. It also wants to reload/parse DAG files constantly. So although you could write a DAG file that, on each parse, defines its tasks dynamically from some external data (preferably cached in a file or pyc module rather than fetched over network I/O like a database lookup, which would slow the whole scheduling loop down for all DAGs), this is not a good plan: your graph and tree views will get confusing, and the scheduler's parsing will be further taxed by that lookup. A rough sketch of what that would look like follows.
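A minimal sketch of that pattern, assuming a locally cached JSON file (the path and file name are hypothetical) so the per-parse cost stays low:

```python
# Tasks are defined at parse time from cached external data. The scheduler
# re-reads this module constantly, so the read must be cheap -- a local file,
# not a database or network call.
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dynamic_from_cache', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

with open('/path/to/task_list.json') as f:  # hypothetical cached file, not a DB lookup
    task_names = json.load(f)

for name in task_names:
    DummyOperator(task_id='dummy_{}'.format(name), dag=dag)
```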
You could make the callable run each task ...
```python
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def make_tasks(context):
    # Build the operators and call their execute() methods directly,
    # one after another, inside this single task.
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)


p = PythonOperator(
    # task_id and the remaining arguments are filled in here for illustration
    task_id='make_tasks',
    python_callable=make_tasks,
    provide_context=True,
    dag=dag,
)
```
But this is sequential, and you would have to work out how to make the calls run in parallel in Python (futures?); and if any of them raises an exception, the whole task fails. It is also bound to a single executor or worker, so it does not take advantage of Airflow's task distribution (Kubernetes, Mesos, Celery).
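For example, a rough sketch of the futures idea (ThreadPoolExecutor is my assumption, and `dag` is the module-level DAG from the snippet above); note that a failure in any one `execute()` still fails the whole task:

```python
from concurrent.futures import ThreadPoolExecutor

from airflow.operators.dummy_operator import DummyOperator


def make_tasks(context):
    # assumes `dag` is the module-level DAG object from the snippet above
    dummies = [DummyOperator(task_id='dummy{}'.format(i), dag=dag) for i in range(1, 4)]

    # Run the execute() calls concurrently instead of one after another.
    with ThreadPoolExecutor(max_workers=len(dummies)) as pool:
        futures = [pool.submit(du.execute, context) for du in dummies]

    # .result() re-raises any exception, so one failure fails the whole task.
    for f in futures:
        f.result()
```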
The other way to work with this is to add a fixed number of tasks (the maximum you will need) and use the callables to short-circuit the unneeded tasks, or to push arguments to each of them with XCom, changing their behavior at run time but not changing the DAG.
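A minimal sketch of that fixed-maximum approach (the names `get_items`, `MAX_TASKS`, and the item list are assumptions for illustration): one ShortCircuitOperator per slot skips the slots that are not needed for a given run, and the item list is shared via XCom.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

MAX_TASKS = 10  # fixed upper bound; the DAG shape never changes


def get_items(**context):
    # Hypothetical: fetch the list of things to process for this run
    # and share it with the other tasks via XCom.
    items = ['a', 'b', 'c']
    context['ti'].xcom_push(key='items', value=items)


def needed(index, **context):
    items = context['ti'].xcom_pull(task_ids='get_items', key='items') or []
    # Returning False short-circuits (skips) the downstream task for this slot.
    return index < len(items)


def process(index, **context):
    items = context['ti'].xcom_pull(task_ids='get_items', key='items')
    print('processing', items[index])


dag = DAG('fixed_max_tasks', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

fetch = PythonOperator(task_id='get_items', python_callable=get_items,
                       provide_context=True, dag=dag)

for i in range(MAX_TASKS):
    check = ShortCircuitOperator(task_id='needed_{}'.format(i),
                                 python_callable=needed, op_args=[i],
                                 provide_context=True, dag=dag)
    work = PythonOperator(task_id='process_{}'.format(i),
                          python_callable=process, op_args=[i],
                          provide_context=True, dag=dag)
    fetch >> check >> work
```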
dlamblin