Airflow DAGs: Creation, Operators, and Dependencies

Creating Your First Airflow DAG in Google Cloud Composer

  • The previous video covered creating a Composer environment and a walkthrough of the Airflow UI, including the default Airflow monitoring DAG.

  • This video focuses on creating basic DAGs, which serves as a foundation for understanding more complex DAGs and data pipelines in future videos.

  • Key topics include understanding how to create a DAG, the role of operators in Airflow, and how to utilize these operators effectively.

Steps to Create a DAG in Google Cloud Composer

Creating a DAG involves a few simple steps:

  1. Select Apache Airflow Operators: Choose operators based on your specific use case or requirements. These can be related to Google Cloud, AWS, GitHub, Python, or bash commands.

  2. Define Tasks and Dependencies: Use operators to create individual tasks within your DAG. Clearly define the dependencies between these tasks to establish the order of execution.

  3. Create a DAG Definition File: Write your DAG logic in a Python file (e.g., my_dag.py).

  4. Upload to Cloud Storage Bucket: For Google Cloud Composer, upload the .py file to the designated dags folder within the Cloud Storage bucket that was created when the Composer environment was set up.

    • If using Airflow on a VM instance or Linux server, you would instead place the file in the local dags folder on the server. The exact path of this folder is set by the dags_folder option in the airflow.cfg configuration file.

  5. DAG Processing: Once uploaded, the file is processed by the DAG processor. If the syntax is correct, the DAG will be parsed, scheduled, and then become available on the Airflow web server.

    • Cloud Composer handles DAG parsing, scheduling, execution, error handling, retries, monitoring, and logging automatically.
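Steps 4 and 5 can be sketched as a pair of commands. This is a hedged example: the environment name, bucket name, and region are placeholders for your own setup (the actual bucket is shown on the Composer environment's details page).

```shell
# Option 1: copy the DAG file directly into the bucket's dags/ folder.
# The bucket name below is a placeholder for the one Composer created.
gsutil cp my_dag.py gs://us-central1-my-composer-env-bucket/dags/

# Option 2: let gcloud resolve the environment's bucket for you.
gcloud composer environments storage dags import \
    --environment my-composer-env \
    --location us-central1 \
    --source my_dag.py
```

Either way, the DAG processor picks the file up automatically; no restart of the scheduler is needed.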

Understanding Airflow Operators

  • Definition: Airflow operators act as workers or middlemen that perform specific functions or tasks within an Airflow DAG.

  • Purpose: They instruct Airflow on what actions need to be carried out, such as running a Python script, executing an SQL query, or transferring data between cloud services.

  • Automation: Operators are essential for automating processes and abstracting complex code into reusable components.

  • Benefits of Operators:

    • They provide predefined code templates, allowing users to achieve complex operations by simply passing parameters (e.g., a SQL query for a MySqlOperator, or bucket and file names for a GoogleCloudStorageToBigQueryOperator).

    • This significantly reduces the amount of boilerplate code compared to implementing the same logic with plain Python.

  • Task and Pipeline Creation: Each operator creates a task. Combining multiple tasks using operators allows you to build a comprehensive data pipeline.

  • Provider-Specific Operators: Airflow provides operators tailored for various platforms and services:

    • PythonOperator: Executes a standard Python callable (function).

    • BashOperator: Runs a bash command.

    • MySqlOperator: Executes SQL queries on a MySQL database.

    • Google Cloud Operators (GCP): A wide range of operators for Google Cloud services, such as GoogleCloudStorageToBigQueryOperator for moving data from Google Cloud Storage (GCS) to BigQuery.

    • AWS Operators: Operators for various Amazon Web Services (AWS).

    • Others: SimpleHttpOperator, EmailOperator, and more.
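The operators above live in different packages. As a sketch, assuming Airflow 2.x (where provider operators ship in separate "provider" packages that Cloud Composer preinstalls for Google Cloud), the import paths look like this:

```python
# Core operators ship with Airflow itself (Airflow 2.x paths assumed).
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Provider operators come from separate packages, e.g.
# apache-airflow-providers-mysql and apache-airflow-providers-google.
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,  # newer name for GoogleCloudStorageToBigQueryOperator
)
```

Note that in recent versions of the Google provider, GoogleCloudStorageToBigQueryOperator was renamed to GCSToBigQueryOperator; the older name may still appear in examples.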

Practical Demonstration: Creating Basic DAGs

1. Initial Setup and Basic DAG Creation
  • All DAGs are written in Python, so the file name must end with the .py extension (e.g., basicdags.py).

  • A basic DAG structure was copied from an existing Airflow monitoring DAG to serve as a template.

  • DAG Definition Parameters:

    • dag_id: The unique identifier for the DAG (e.g., my_basic_DAG).

    • description: A brief explanation of the DAG's purpose.

    • schedule_interval: How frequently the DAG should run (e.g., timedelta(minutes=10) for every 10 minutes, or None for manual triggering only).

    • start_date: The specific date from which the DAG should start running (e.g., datetime(2023, 1, 1)).

    • catchup=False: Prevents the DAG from running for past missed schedules.

  • Creating Tasks:

    • BashOperator: A task named t1 (or bash_operator) was created to execute a simple echo "test command" bash command.

    • PythonOperator:

      • First, the PythonOperator was imported: from airflow.operators.python import PythonOperator.

      • A simple Python function, def hello_world(): print("Hello World"), was defined.

      • A task named hello_world_task (or python_operator) was created, referencing the hello_world function as its python_callable argument.
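Putting the two tasks together, the DAG file from the demonstration might look roughly like this (a sketch, not a verbatim copy of the video's file; the `>>` dependency at the end is one natural way to order the two tasks):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def hello_world():
    print("Hello World")


with DAG(
    dag_id="my_basic_DAG",
    schedule_interval=None,  # manual triggering only
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    t1 = BashOperator(
        task_id="bash_operator",
        bash_command='echo "test command"',
    )

    hello_world_task = PythonOperator(
        task_id="python_operator",
        python_callable=hello_world,
    )

    # Run the bash task first, then the Python task.
    t1 >> hello_world_task
```

Each operator instantiation creates one task; the `>>` operator declares that the task on the left must complete before the one on the right starts.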

  • Uploading the DAG: The .py file was uploaded to the designated DAGs folder within the Google Cloud Storage bucket associated with the Composer environment.

  • Monitoring and Triggering:

    • After uploading, it takes approximately 1 to 2 minutes for the DAG to appear in the Airflow UI.

    • The DAG's status and execution can be viewed in the Graph View or Grid View.

    • DAGs can be manually triggered using the trigger (play) button in the Airflow UI.
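Besides the UI, a DAG in Composer can also be triggered from the command line. A hedged example, where the environment name, location, and DAG id are placeholders for your own values:

```shell
# Run the Airflow CLI's "dags trigger" command inside the Composer environment.
gcloud composer environments run my-composer-env \
    --location us-central1 \
    dags trigger -- my_basic_DAG
```

The arguments after `--` are passed through to the Airflow CLI running inside the environment.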