Delete Airflow Task Logs

I am running 5 DAGs that, over a month, generated about 6 GB of log data in base_log_folder. I just added remote_base_log_folder, but that does not seem to stop logs from being written to base_log_folder.

Is there a way to automatically delete old log files, rotate them, or make Airflow skip writing to disk (base_log_folder) and log only to remote storage?

+18
airflow
5 answers

Please refer to https://github.com/teamclairvoyant/airflow-maintenance-dags

This repository contains DAGs that can kill halted tasks and clean up logs. You can borrow the ideas from them and write a new DAG that cleans up according to your own requirements.
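As a rough illustration of what such a cleanup DAG could run, here is a minimal sketch of an age-based cleanup function. It is not taken from the linked repository; the name delete_old_logs and its parameters are hypothetical, and it assumes log files end in .log and that file modification time is a good enough proxy for age.

```python
import os
import time


def delete_old_logs(base_log_folder, max_age_days, now=None):
    """Delete *.log files older than max_age_days and prune emptied folders.

    Hypothetical helper for illustration; returns the deleted file paths.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    deleted = []
    # Walk bottom-up so that subfolders are visited before their parents.
    for dirpath, _dirnames, filenames in os.walk(base_log_folder, topdown=False):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if name.endswith(".log") and os.path.getmtime(path) < cutoff:
                os.remove(path)
                deleted.append(path)
        # Remove the folder if it is now empty, but never the root itself.
        if dirpath != base_log_folder and not os.listdir(dirpath):
            os.rmdir(dirpath)
    return deleted
```

A function like this could be called once a day from a Python task in a small maintenance DAG, with base_log_folder read from airflow.cfg.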

+10

We delete task logs by implementing our own FileTaskHandler and then pointing to it in airflow.cfg. This way we override the default log handler so that it keeps only the last N task logs, without scheduling any additional DAGs.

We use Airflow==1.10.1.

    [core]
    logging_config_class = log_config.LOGGING_CONFIG

log_config.LOGGING_CONFIG

    import os

    from airflow import configuration as conf

    # LOG_LEVEL and JOB_LOG_LEVEL are not shown in the original snippet; they are
    # assumed to be defined elsewhere in this module. The '...' entries stand for
    # configuration that the answer left out.

    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

    FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'
    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

    LOGGING_CONFIG = {
        'formatters': {},
        'handlers': {
            '...': {},
            'task': {
                # Custom handler that prunes old run folders (defined below).
                'class': 'file_task_handler.FileTaskRotationHandler',
                'formatter': 'airflow.job',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                'filename_template': FILENAME_TEMPLATE,
                'folder_task_template': FOLDER_TASK_TEMPLATE,
                # Number of run folders to keep per task.
                'retention': 20
            },
            '...': {}
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['task'],
                'level': JOB_LOG_LEVEL,
                'propagate': False,
            },
            'airflow.task_runner': {
                'handlers': ['task'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
            '...': {}
        }
    }

file_task_handler.FileTaskRotationHandler

    import os
    import shutil

    from airflow.utils.helpers import parse_template_string
    from airflow.utils.log.file_task_handler import FileTaskHandler


    class FileTaskRotationHandler(FileTaskHandler):

        def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
            """
            :param base_log_folder: Base log folder to place logs.
            :param filename_template: template filename string.
            :param folder_task_template: template folder task path.
            :param retention: Number of folder logs to keep
            """
            super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
            self.retention = retention
            self.folder_task_template, self.folder_task_template_jinja_template = \
                parse_template_string(folder_task_template)

        @staticmethod
        def _get_directories(path='.'):
            # os.walk gives no ordering guarantee, so sort the names; the run
            # folders are named after {{ ts }}, so name order is time order.
            # The default avoids StopIteration when the folder does not exist yet.
            return sorted(next(os.walk(path), (path, [], []))[1])

        def _render_folder_task_path(self, ti):
            if self.folder_task_template_jinja_template:
                jinja_context = ti.get_template_context()
                return self.folder_task_template_jinja_template.render(**jinja_context)

            return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)

        def _init_file(self, ti):
            relative_path = self._render_folder_task_path(ti)
            folder_task_path = os.path.join(self.local_base, relative_path)
            subfolders = self._get_directories(folder_task_path)
            # Everything except the newest `retention` run folders is removed.
            to_remove = set(subfolders) - set(subfolders[-self.retention:])

            for dir_to_remove in to_remove:
                full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
                print('Removing', full_dir_to_remove)
                shutil.rmtree(full_dir_to_remove)

            return FileTaskHandler._init_file(self, ti)

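The retention step of this handler can be tried out on its own, without Airflow. The following is a minimal sketch, not part of the answer's code: prune_run_folders is a hypothetical name, and it assumes the run folders are named after ISO-8601 timestamps (as {{ ts }} produces), so that sorting by name also sorts by time.

```python
import os
import shutil


def prune_run_folders(task_log_dir, retention):
    """Keep the `retention` newest run folders, remove the rest.

    Hypothetical helper for illustration; returns the removed folder names.
    """
    if not os.path.isdir(task_log_dir):
        return []
    # Sort explicitly: directory listings come back in arbitrary order.
    runs = sorted(
        name for name in os.listdir(task_log_dir)
        if os.path.isdir(os.path.join(task_log_dir, name))
    )
    # A non-positive retention is treated here as "keep everything".
    to_remove = runs[:-retention] if retention > 0 else []
    for name in to_remove:
        shutil.rmtree(os.path.join(task_log_dir, name))
    return to_remove
```

Sorting before slicing is the important design choice: without it, which folders count as "the last N" depends on the order the filesystem happens to return them in.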
+2

The Airflow maintainers do not consider log truncation to be part of Airflow's core logic (see this). In this issue, the maintainers suggest changing LOG_LEVEL to avoid producing too much log data.

This PR shows how to change the log level in airflow.cfg.

Good luck.

+1

I don't think there is a rotation mechanism, but you can store them in S3 or Google Cloud Storage, as described here: https://airflow.incubator.apache.org/configuration.html#logs

0

France's answer for Airflow 1.10 is correct; I just can't add comments.

One caveat: because of the way logging, multiprocessing, and the default Airflow handlers interact, it is safer to override handler methods than to extend them by calling super() in the derived handler class, because Airflow handlers do not use locks by default.
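One practical consequence of working without locks, sketched here under the assumption that several task processes may run the same cleanup at the same moment: a folder can vanish between being listed and being deleted. This is not the fix described above, only an illustration of a removal step that tolerates that race; remove_tree_quietly is a hypothetical name.

```python
import os
import shutil


def remove_tree_quietly(path):
    """Remove a directory tree without failing if another process got there first.

    Hypothetical helper for illustration. Note that ignore_errors=True also
    hides other failures (for example permission errors), so the return value
    reports whether the path is actually gone afterwards.
    """
    shutil.rmtree(path, ignore_errors=True)
    return not os.path.exists(path)
```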

0
