The code:
I am using Python 2.7.x and Airflow 1.5.1.
My script is:
    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'xyz',
        'depends_on_past': False,
        'start_date': datetime(2015, 10, 13),
        'email': ['xyz@email.in'],
        'schedule_interval': timedelta(minutes=5),
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('testing', default_args=default_args)

    run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

    for i in range(5):
        t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
        t.set_upstream(run_this_first)
As the script shows, I am creating a DAG with 6 tasks: the first task (Start1) runs first, and then the other five tasks start.
I have set a 5-minute interval between DAG runs.
The first run completed all six tasks successfully, but the DAG did not run again after five minutes.
More than an hour has passed, but the DAG has not been triggered again. I really don't know what I did wrong.
It would be great if someone could tell me what is wrong. I tried clearing with airflow testing clear, but the same thing happened: it ran the first instance and then just sat there.
The only thing the command line shows is: Getting all instance for DAG testing
When I move schedule_interval out of default_args and pass it to the DAG directly, the runs start one after another with no interval between them. Within 5 minutes, 300 or more task instances had completed. There is no 5-minute gap between runs.
Code 2:
    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'xyz',
        'depends_on_past': False,
        'start_date': datetime(2015, 10, 13),
        'email': ['xyz@email.in'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('testing', schedule_interval=timedelta(minutes=5), default_args=default_args)  # Schedule here

    run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

    for i in range(5):
        t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
        t.set_upstream(run_this_first)
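One thing that might be related to the burst of runs with Code 2: if the scheduler tries to run every 5-minute interval that has elapsed since start_date (an assumption on my part, I have not confirmed this for 1.5.1), then a start_date in the past leaves a large backlog to work through. A rough sketch of the arithmetic, using plain Python and no Airflow calls; count_intervals and example_now are hypothetical names and values chosen only for illustration:

```python
from datetime import datetime, timedelta


def count_intervals(start_date, now, interval):
    """Count the whole schedule intervals that fit between start_date and now."""
    if now <= start_date:
        return 0
    elapsed = (now - start_date).total_seconds()
    return int(elapsed // interval.total_seconds())


start_date = datetime(2015, 10, 13)          # same start_date as in the DAG
interval = timedelta(minutes=5)              # same schedule_interval as in Code 2
example_now = datetime(2015, 10, 14, 12, 0)  # hypothetical "current" time

pending_runs = count_intervals(start_date, example_now, interval)
tasks_per_run = 6                            # Start1 plus the five Orders1* tasks
print(pending_runs)                          # number of elapsed 5-minute intervals
print(pending_runs * tasks_per_run)          # task instances across those intervals
```

If that assumption holds, hundreds of task instances finishing within a few minutes would be the backlog being processed rather than the interval being ignored.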