Abstract
In this article, we will discuss how to use the Airflow FileSensor to trigger an ETL process. We will walk through the process of setting up a FileSensor in Airflow and using it to monitor a directory for new files. Once a new file is detected, the ETL process will be triggered automatically. This can be a useful technique for automating data processing tasks that rely on the availability of new files.
Introduction
Airflow is a popular open-source platform for orchestrating complex data workflows. It allows users to define, schedule, and monitor workflows as directed acyclic graphs (DAGs). Airflow provides a wide range of operators that can be used to perform various tasks, such as executing SQL queries, transferring files, and sending emails.
One common use case for Airflow is to automate ETL (Extract, Transform, Load) processes. ETL processes involve extracting data from various sources, transforming it into a usable format, and loading it into a data warehouse or other storage system. Airflow provides operators that can be used to perform each step of the ETL process, making it easy to build and schedule complex data pipelines.
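To make this concrete, here is a minimal, self-contained sketch of an ETL-style DAG with three chained tasks. The DAG name and the empty callables are illustrative placeholders only, not the pipeline built later in this article:

import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def extract(): ...    # pull data from the source system
def transform(): ...  # clean and reshape the extracted data
def load(): ...       # write the result to the warehouse

with DAG(
    dag_id="example_etl",  # illustrative name only
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)
    extract_task >> transform_task >> load_task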
In this article, we will focus on the Extract step of the ETL process and discuss how to use the Airflow FileSensor to trigger an ETL process when new files become available.
Example Use Case
Suppose we have a source system that generates CSV files on a scheduled basis. We want to build an ETL process that reads these CSV files, transforms the data, and loads it into a database. To automate this process, we can use the Airflow FileSensor to monitor a directory for changes and trigger the ETL process.
To signal the availability of new files, we can program the source system to add a new file to the directory after the CSV files are created. The FileSensor will detect the presence of this file and trigger the ETL process automatically.
Setting up the DAG
The trigger DAG can be set up by creating a new Python file in the dags directory of your Airflow installation. For example, you can create a file called reports_trigger.py with the following content:
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# Where to look for the file
FILEPATH = "/mnt/Reports/READY"
# The DAG to trigger
DAG_ID = "reports_uploader"
# Central European Time
TZ = "CET"

with DAG(
    dag_id="reports_trigger",
    description="Monitors the appearance of a file and starts a DAG",
    # Every 2 hours
    schedule="5 */2 * * *",
    start_date=pendulum.datetime(2024, 11, 5, 0, 0, 0, tz=TZ),
    # Don't run backfill
    catchup=False,
    tags=["trigger"],
    # Only one run at a time
    max_active_runs=1,
) as dag:

    check_flag = FileSensor(
        task_id="check_flag",
        filepath=FILEPATH,
    )

    remove_flag = BashOperator(
        task_id="remove_flag",
        bash_command=f"rm -f {FILEPATH}",
    )

    trigger_dag = TriggerDagRunOperator(
        task_id="trigger_dag",
        trigger_dag_id=DAG_ID,
        logical_date=pendulum.now().add(seconds=5),
    )

    # Set the order of the tasks
    check_flag >> remove_flag >> trigger_dag
In this example, we define a new DAG called reports_trigger that monitors the appearance of a file named READY in the /mnt/Reports directory. When the file is detected, the next task, remove_flag, deletes it, and another DAG called reports_uploader is triggered for execution.
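By default, the sensor re-checks the path at a fixed interval and runs in "poke" mode, which keeps a worker slot occupied while it waits. If you need to tune this behavior, the FileSensor accepts the standard Airflow sensor arguments. Here is a minimal sketch of the same check_flag task with explicit settings; the particular values are illustrative assumptions, not requirements of the setup above:

check_flag = FileSensor(
    task_id="check_flag",
    filepath=FILEPATH,
    fs_conn_id="fs_default",  # default filesystem connection
    poke_interval=60,         # check for the file every 60 seconds (assumed value)
    timeout=60 * 60,          # give up after one hour (assumed value)
    mode="reschedule",        # free the worker slot between checks
)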
Setting up the source system
The source system should be programmed to create a new file named READY in the /mnt/Reports directory after new CSV files are created. There are a variety of ways to achieve this, depending on the source system and the tools available.
If the source system is a Windows machine, you can create a batch file that creates the READY file and schedule it to run after the CSV files are generated. For example, you can create a batch file called create_flag.cmd with the following content:
echo > C:\Users\admin\Reports\%1
The argument is the name of the file to create, READY in this case. If you use the Windows Task Scheduler, add a new action that runs the batch file with the argument READY.
If the source system is a Linux machine, you can create the file with the touch command:
touch /mnt/Reports/READY
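If the CSV files are produced by a Python script, the flag can also be created from the same script once the export finishes. A minimal sketch, assuming a hypothetical export_reports() function that stands in for the source system's own export logic:

from pathlib import Path

REPORTS_DIR = Path("/mnt/Reports")

def export_reports() -> None:
    # Hypothetical placeholder for the code that writes the CSV files
    # into REPORTS_DIR; replace with the source system's own export logic.
    ...

if __name__ == "__main__":
    export_reports()
    # Create the empty READY flag only after all CSV files are in place
    (REPORTS_DIR / "READY").touch()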
Setting up the ETL Process
The ETL process is defined within a distinct DAG named reports_uploader, located in a separate Python file in the dags directory of the Airflow installation.
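The contents of that DAG depend on the data and the target database, so they are outside the scope of this article. As a rough illustration only, a skeleton for reports_uploader could look like the following; the process_reports callable and the glob pattern are assumptions for this sketch, not part of the setup described above:

import glob
import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

REPORTS_DIR = "/mnt/Reports"

def process_reports() -> None:
    # Hypothetical ETL body: read every CSV in the directory,
    # transform the rows and load them into the target database.
    for path in glob.glob(f"{REPORTS_DIR}/*.csv"):
        # Replace with real transform and load logic for your warehouse.
        print(f"Processing {path}")

with DAG(
    dag_id="reports_uploader",
    schedule=None,  # started only by the trigger DAG
    start_date=pendulum.datetime(2024, 11, 5, tz="CET"),
    catchup=False,
    tags=["etl"],
) as dag:
    PythonOperator(
        task_id="process_reports",
        python_callable=process_reports,
    )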
Conclusion
In this article, we discussed how to use the Airflow FileSensor to trigger an ETL process when new files become available and how to set up a DAG to monitor a directory for changes. We also discussed how to program the source system to create a new file to signal the availability of new data.
I hope this article has been helpful in understanding how to use the Airflow FileSensor for triggering ETL processes. If you have any further questions or comments, please feel free to leave them in the comments section.