We delete old task logs by implementing our own FileTaskHandler subclass and pointing Airflow to it from airflow.cfg. This replaces the default task log handler so that only the logs of the last N runs of each task are kept, without having to schedule an additional cleanup DAG.
We use Airflow==1.10.1.
[core]
logging_config_class = log_config.LOGGING_CONFIG
log_config.py, which defines LOGGING_CONFIG (the module must be importable by Airflow, i.e. on the PYTHONPATH):
"""Airflow logging configuration that rotates per-task log folders.

Referenced from airflow.cfg via
``[core] logging_config_class = log_config.LOGGING_CONFIG``.
"""
import os
from copy import deepcopy

from airflow import configuration as conf
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

# Per-task folder whose sub-folders FileTaskRotationHandler prunes.
FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'

# Every run ({{ ts }}) gets its own folder directly under FOLDER_TASK_TEMPLATE,
# so the handler can count and delete whole runs (all tries of a run at once).
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

# Start from Airflow's complete default configuration ('version', formatters,
# the other handlers, loggers) and override only the 'task' handler. A
# hand-written partial dict is rejected by logging.config.dictConfig (the
# 'version' key is mandatory) and breaks as soon as a handler references a
# formatter that the dict does not define.
# NOTE(review): the default loggers are inherited unchanged; in Airflow 1.10
# the 'airflow.task' logger already uses the 'task' handler -- verify for your
# version and adjust LOGGING_CONFIG['loggers'] here if you need other levels.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['handlers']['task'].update({
    'class': 'file_task_handler.FileTaskRotationHandler',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    'filename_template': FILENAME_TEMPLATE,
    'folder_task_template': FOLDER_TASK_TEMPLATE,
    # Number of most recent run folders kept per task.
    'retention': 20,
})
file_task_handler.py, which defines FileTaskRotationHandler (it must also be importable by Airflow, i.e. on the PYTHONPATH):
"""Task log handler that keeps only the most recent run folders of each task."""
import os
import shutil

from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler


class FileTaskRotationHandler(FileTaskHandler):
    """FileTaskHandler that deletes old per-run log folders of a task.

    Before a task instance's log file is initialised, the immediate
    sub-folders of the task folder (``folder_task_template`` rendered under
    ``base_log_folder``) are listed, sorted by name, and all but the newest
    ``retention`` of them are removed.
    """

    def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
        """
        :param base_log_folder: Base log folder to place logs.
        :param filename_template: template filename string.
        :param folder_task_template: template folder task path; its
            immediate sub-folders are the units that get rotated.
        :param retention: Number of folder logs to keep. A value <= 0
            disables rotation.
        """
        super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
        self.retention = retention
        self.folder_task_template, self.folder_task_template_jinja_template = \
            parse_template_string(folder_task_template)

    @staticmethod
    def _get_directories(path='.'):
        """Return the names of the immediate sub-directories of ``path``.

        ``os.walk`` yields nothing when ``path`` does not exist or cannot be
        listed (e.g. before a task's first run); an empty list is returned
        then instead of letting ``next`` raise StopIteration.
        """
        return next(os.walk(path), (path, [], []))[1]

    def _render_folder_task_path(self, ti):
        """Render the task folder path, relative to the base log folder, for ``ti``."""
        if self.folder_task_template_jinja_template:
            jinja_context = ti.get_template_context()
            return self.folder_task_template_jinja_template.render(**jinja_context)
        return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)

    def _remove_stale_folders(self, ti):
        """Delete all but the newest ``self.retention`` sub-folders of ``ti``'s task folder."""
        folder_task_path = os.path.join(self.local_base, self._render_folder_task_path(ti))
        # os.walk returns names in arbitrary, filesystem-dependent order, so sort
        # to make "newest" well defined. Run folders named by ISO-8601 timestamps
        # (such as '{{ ts }}') sort chronologically by name.
        subfolders = sorted(self._get_directories(folder_task_path))
        # NOTE(review): if ti's own run folder is among the oldest (e.g. an old
        # execution date that was cleared and re-run), it is removed too, along
        # with the logs of its earlier tries.
        for dir_to_remove in subfolders[:-self.retention]:
            full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
            print('Removing', full_dir_to_remove)
            try:
                shutil.rmtree(full_dir_to_remove)
            except OSError as err:
                # Best-effort cleanup: a concurrent run of the same task may have
                # removed the folder already; log rotation must not fail the task.
                print('Could not remove {}: {}'.format(full_dir_to_remove, err))

    def _init_file(self, ti):
        """Prune old run folders of ``ti``'s task, then initialise its log file.

        Pruning runs before the parent creates the current log file, so up to
        ``retention + 1`` run folders can exist afterwards when the current
        run's folder is new.

        :return: the result of ``FileTaskHandler._init_file`` for ``ti``.
        """
        # Guard clause: for retention <= 0 the slice below would misbehave
        # ([:-0] keeps nothing, a negative value deletes from the oldest end).
        if self.retention > 0:
            self._remove_stale_folders(ti)
        return super(FileTaskRotationHandler, self)._init_file(ti)