Using Airflow FileSensor for Triggering ETL Process

ETL
Airflow
Author

Aleksei

Published

November 5, 2024

Abstract

In this article, we will discuss how to use the Airflow FileSensor to trigger an ETL process. We will walk through the process of setting up a FileSensor in Airflow and using it to monitor a directory for new files. Once a new file is detected, the ETL process will be triggered automatically. This can be a useful technique for automating data processing tasks that rely on the availability of new files.

Introduction

Airflow is a popular open-source platform for orchestrating complex data workflows. It allows users to define, schedule, and monitor workflows as directed acyclic graphs (DAGs). Airflow provides a wide range of operators that can be used to perform various tasks, such as executing SQL queries, transferring files, and sending emails.

Apache Airflow Logo

Apache Airflow Logo

One common use case for Airflow is to automate ETL (Extract, Transform, Load) processes. ETL processes involve extracting data from various sources, transforming it into a usable format, and loading it into a data warehouse or other storage system. Airflow provides operators that can be used to perform each step of the ETL process, making it easy to build and schedule complex data pipelines.

In this article, we will focus on the Extract step of the ETL process and discuss how to use the Airflow FileSensor to trigger an ETL process when new files become available.

Example Use Case

Suppose we have a source system that generates CSV files on a scheduled basis. We want to build an ETL process that reads these CSV files, transforms the data, and loads it into a database. To automate this process, we can use the Airflow FileSensor to monitor a directory for changes and trigger the ETL process.

To signal the availability of new files, we can program source system to add a new file to the directory after new CSV files are created. The FileSensor will detect the presence of the new file and trigger the ETL process automatically.

Setting up the DAG

Trigger DAG can be set by creating a new Python file in the dags directory of your Airflow installation. For example, you can create a new file called reports_trigger.py with the following content:

import pendulum

from Airflow.models.dag import DAG
from Airflow.operators.bash import BashOperator

from Airflow.sensors.filesystem import FileSensor
from Airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Where to look for the file
FILEPATH = "/mnt/Reports/READY"
# The DAG to trigger
DAG_ID = "reports_uploader"
# Central European Time
TZ = "CET"

with DAG(
    dag_id="reports_trigger",
    description="Monitors the appearance of a file and starts a DAG",
    # Every 2 hours
    schedule="5 */2 * * *",
    start_date=pendulum.datetime(2024, 11, 5, 0, 0, 0, tz=TZ),
    # Don't run backfill
    catchup=False,
    tags=["trigger"],
    # Only one run at a time
    max_active_runs=1,
) as dag:

    check_flag = FileSensor(
        task_id="check_flag",
        filepath=FILEPATH,
    )

    remove_flag = BashOperator(
        task_id="remove_flag",
        bash_command=f"rm -f {FILEPATH}",
    )

    trigger_dag = TriggerDagRunOperator(
        task_id="trigger_dag",
        trigger_dag_id=DAG_ID,
        logical_date=pendulum.now().add(seconds=5),
    )
    
    # Set the order of the tasks
    check_flag >> remove_flag >> trigger_dag

In this example, we define a new DAG called reports_trigger that monitors the appearance of a file READY in the /mnt/Reports directory. When a new file is detected, the next task remove_flag deletes it, and another DAG called reports_uploader is triggered for execution.

DAG in Graph View

DAG in Graph View

Setting up the source system

The source system should be programmed to create a new file named READY in the /mnt/Reports directory after new CSV files are created. There is a variety of ways to achieve this, depending on the source system and the tools available.

If the source system is a Windows machine, you can create a batch file that creates the READY file and schedule it to run after the CSV files are generated. For example, you can create a batch file called create_flag.cmd with the following content:

echo > C:\Users\admin\Reports\%1

The argument would be the name of the file to create, READY in this case. If you use the Windows Task Scheduler, add new action to run the batch file with the argument READY.

If the source system is a Linux machine, you can create file using the touch command:

touch /mnt/Reports/READY

Setting up the ETL Process

The ETL process is defined within a distinct DAG named reports_uploader, located in a separate Python file in the dags directory of the Airflow installation.

Conclusion

In this article, we discussed how to use the Airflow FileSensor to trigger an ETL process when new files become available and how to set up a DAG to monitor a directory for changes. We also discussed how to program the source system to create a new file to signal the availability of new data.

I hope this article has been helpful in understanding how to use the Airflow FileSensor for triggering ETL processes. If you have any further questions or comments, please feel free to leave them int the comments section.