Airflow DAGs: Creation, Operators, and Dependencies
Creating Your First Airflow DAG in Google Cloud Composer
The previous video covered creating a Composer environment and a walkthrough of the Airflow UI, including the default Airflow monitoring DAG.
This video focuses on creating basic DAGs, which serves as a foundation for understanding more complex DAGs and data pipelines in future videos.
Key topics include understanding how to create a DAG, the role of operators in Airflow, and how to utilize these operators effectively.
Steps to Create a DAG in Google Cloud Composer
Creating a DAG involves a few simple steps:
Select Apache Airflow Operators: Choose operators based on your specific use case or requirements. These can be related to Google Cloud, AWS, GitHub, Python, or bash commands.
Define Tasks and Dependencies: Use operators to create individual tasks within your DAG. Clearly define the dependencies between these tasks to establish the order of execution.
Create a DAG Definition File: Write your DAG logic in a Python file (e.g., my_dag.py).
Upload to Cloud Storage Bucket: For Google Cloud Composer, upload the .py file to the designated dags folder within the Cloud Storage bucket that was created when the Composer environment was set up. If you are running Airflow on a VM instance or a Linux server instead, place the file in the local dags folder on the server; the exact path of this folder is defined in the airflow.cfg file.
DAG Processing: Once uploaded, the file is processed by the DAG processor. If the syntax is correct, the DAG will be parsed, scheduled, and then become available on the Airflow web server.
Cloud Composer handles DAG parsing, scheduling, execution, error handling, retries, monitoring, and logging automatically.
Understanding Airflow Operators
Definition: Airflow operators act as workers or middlemen that perform specific functions or tasks within an Airflow DAG.
Purpose: They instruct Airflow on what actions need to be carried out, such as running a Python script, executing an SQL query, or transferring data between cloud services.
Automation: Operators are essential for automating processes and abstracting complex code into reusable components.
Benefits of Operators:
They provide predefined code templates, allowing users to achieve complex operations by simply passing parameters (e.g., a SQL query for a MySqlOperator, or bucket and file names for a GoogleCloudStorageToBigQueryOperator). This significantly reduces the amount of boilerplate code compared to implementing the same logic in plain Python.
Task and Pipeline Creation: Each operator creates a task. Combining multiple tasks using operators allows you to build a comprehensive data pipeline.
Provider-Specific Operators: Airflow provides operators tailored for various platforms and services:
PythonOperator: Executes a standard Python callable (function).
BashOperator: Runs a bash command.
MySqlOperator: Executes SQL queries on a MySQL database.
Google Cloud Operators (GCP): A wide range of operators for Google Cloud services, such as GoogleCloudStorageToBigQueryOperator for moving data from Google Cloud Storage (GCS) to BigQuery.
AWS Operators: Operators for various Amazon Web Services (AWS).
HTTP Operator, Email Operator, etc.
Practical Demonstration: Creating Basic DAGs
1. Initial Setup and Basic DAG Creation
All DAGs are written in Python, so the file name must end with the .py extension (e.g., basicdags.py). A basic DAG structure was copied from the existing Airflow monitoring DAG to serve as a template.
DAG Definition Parameters:
dag_id: The unique identifier for the DAG (e.g., my_basic_DAG).
description: A brief explanation of the DAG's purpose.
schedule_interval: How frequently the DAG should run (e.g., timedelta(minutes=10) for every 10 minutes, or None for manual triggering only).
start_date: The specific date from which the DAG should start running (e.g., datetime(2023, 1, 1)).
catchup=False: Prevents the DAG from running for past missed schedules.
Creating Tasks:
BashOperator: A task named t1 (or bash_operator) was created to execute a simple echo "test command" bash command.
PythonOperator: First, the PythonOperator was imported: from airflow.operators.python import PythonOperator. A simple Python function, def hello_world(): print("Hello World"), was defined. Then a task named hello_world_task (or python_operator) was created, referencing the hello_world function as its python_callable argument.
Uploading the DAG: The .py file was uploaded to the designated dags folder within the Google Cloud Storage bucket associated with the Composer environment.
Monitoring and Triggering:
After uploading, it takes approximately 1 to 2 minutes for the DAG to appear in the Airflow UI.
The DAG's status and execution can be viewed in the Graph View or Grid View. DAGs can be manually triggered using the trigger (play) button in the Airflow UI.