Airflow not scheduling correctly (Python)

Python version 2.7.x, Airflow version 1.5.1.

My DAG script is:

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'xyz',
        'depends_on_past': False,
        'start_date': datetime(2015, 10, 13),
        'email': ['xyz@email.in'],
        'schedule_interval': timedelta(minutes=5),
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('testing', default_args=default_args)

    run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

    for i in range(5):
        t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
        t.set_upstream(run_this_first)

As you can see, I am creating a DAG with 6 tasks: the first task (Start1) runs first, and the other five tasks run after it.

I have set a 5 minute schedule interval between DAG runs.

It completed all six tasks of the first run perfectly, but after five minutes the DAG did not start again.

More than an hour has passed, but the DAG has not been re-triggered. I really don't know where I went wrong.

It would be very nice if someone could tell me what is wrong. I tried clearing the DAG with airflow clear testing, and the same thing happened: it ran the first instance and then just sat there.

The only thing the command line shows is Getting all instance for DAG testing

When I move the schedule_interval into the DAG constructor instead (Code 2 below), it just runs back to back with no schedule interval at all: within 5 minutes, 300 or more task instances have completed. There is no 5 minute gap between runs.

Code 2:

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'xyz',
        'depends_on_past': False,
        'start_date': datetime(2015, 10, 13),
        'email': ['xyz@email.in'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('testing', schedule_interval=timedelta(minutes=5), default_args=default_args)  # Schedule here

    run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

    for i in range(5):
        t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
        t.set_upstream(run_this_first)
2 answers

For Code 2, I think the reason it starts a new run every minute is:

  • The start date is 2015-10-13 00:00

  • The schedule interval is 5 minutes

  • On every scheduler heartbeat (default 5 seconds), your DAG is checked:

    • First check: start date (there is no last run time yet) + schedule_interval < current time? If so, the DAG is run and the last run time is recorded. (e.g. is 2015-10-13 00:00 + 5 min < now?)
    • Second check, on the next heartbeat: last run time + schedule_interval < current time? If so, the DAG is run again.
    • ... and so on, every heartbeat (see the sketch below)
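
A minimal sketch of that check loop, as I understand it (my own illustration, not Airflow's actual code; names like last_run_time are assumptions):

    from datetime import datetime, timedelta

    start_date = datetime(2015, 10, 13)
    schedule_interval = timedelta(minutes=5)
    last_run_time = None  # nothing recorded before the first run

    def heartbeat():
        # Illustrative per-DAG check, executed every ~5 seconds.
        global last_run_time
        previous = last_run_time if last_run_time is not None else start_date
        if previous + schedule_interval < datetime.now():
            # A run is due; its execution date is one interval after the last.
            last_run_time = previous + schedule_interval
            print('trigger DAG run for execution_date %s' % last_run_time)

    # Because start_date is weeks in the past, the condition holds on every
    # heartbeat until the execution dates catch up to now, so runs fire
    # every few seconds instead of every 5 minutes.
    for _ in range(3):
        heartbeat()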

The solution is to set the DAG's start_date to datetime.now() - schedule_interval.
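
Applied to the DAG above, that looks roughly like this (a sketch; the remaining default_args are unchanged and omitted here):

    from datetime import datetime, timedelta

    from airflow import DAG

    schedule_interval = timedelta(minutes=5)

    default_args = {
        'owner': 'xyz',
        # One interval in the past, so exactly one run is due right now:
        'start_date': datetime.now() - schedule_interval,
    }

    dag = DAG('testing', schedule_interval=schedule_interval,
              default_args=default_args)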

Also, if you want to debug:

  • Set LOGGING_LEVEL to DEBUG in settings.py (see the sketch after this list)

  • Change the class method is_queueable() of airflow.models.TaskInstance to:

    def is_queueable(self, flag_upstream_failed=False):
        logging.debug('Checking whether task instance is queueable or not!')
        if self.execution_date > datetime.now() - self.task.schedule_interval:
            logging.debug('Too early to execute: execution_date {0} + '
                          'task.schedule_interval {1} > datetime.now() {2}'
                          .format(self.execution_date,
                                  self.task.schedule_interval,
                                  datetime.now()))
            return False
        ...
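
For the first bullet, the edit in airflow/settings.py would look roughly like this (assuming the constant is named LOGGING_LEVEL in your 1.5.x install; check the file):

    # airflow/settings.py
    import logging

    LOGGING_LEVEL = logging.DEBUG  # raise verbosity from the default INFO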

Since the start date (2015-10-13 00:00) is in the past, Airflow backfills: it creates a run for every schedule interval from the start date up to now. The scheduler picks these up on every heartbeat, but the execution dates themselves are 5 minutes apart (the schedule interval).
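
To make the backfill concrete, here is a small illustration (my own sketch, not Airflow code) of the execution dates it has to work through:

    from datetime import datetime, timedelta

    start_date = datetime(2015, 10, 13)
    schedule_interval = timedelta(minutes=5)

    # Backfill enumerates one execution_date per interval between the
    # start date and now; each one becomes a DAG run, processed as fast
    # as the scheduler can go rather than one every 5 minutes.
    due = []
    execution_date = start_date
    while execution_date < datetime.now():
        due.append(execution_date)
        execution_date += schedule_interval

    print(due[:4])   # 2015-10-13 00:00, 00:05, 00:10, 00:15, ...
    print(len(due))  # thousands of runs queued up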

Look at the log file names:

    $ tree airflow/logs/testing/
    testing/
    |-- Orders10
    |   |-- 2015-10-13T00:00:00
    |   |-- 2015-10-13T00:05:00
    |   `-- 2015-10-13T00:10:00
    |-- Orders11
    |   |-- 2015-10-13T00:00:00
    |   |-- 2015-10-13T00:05:00
    |   `-- 2015-10-13T00:10:00
    |-- Orders12
    |   |-- 2015-10-13T00:00:00
    |   |-- 2015-10-13T00:05:00
    |   `-- 2015-10-13T00:10:00
    |-- Orders13
    |   |-- 2015-10-13T00:00:00
    |   |-- 2015-10-13T00:05:00
    |   `-- 2015-10-13T00:10:00
    |-- Orders14
    |   |-- 2015-10-13T00:00:00
    |   |-- 2015-10-13T00:05:00
    |   `-- 2015-10-13T00:10:00
    `-- Start1
        |-- 2015-10-13T00:00:00
        |-- 2015-10-13T00:05:00
        |-- 2015-10-13T00:10:00
        `-- 2015-10-13T00:15:00

Look at the log creation times:

    $ ll airflow/logs/testing/Start1
    -rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:00:00
    -rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:05:00
    -rw-rw-r-- 1 admin admin 4192 Nov  9 14:51 2015-10-13T00:10:00
    -rw-rw-r-- 1 admin admin 4192 Nov  9 14:52 2015-10-13T00:15:00

In addition, you can see task instances in the web interface:

[Screenshot: Airflow task instances]

