Triggering Workflows – Comprehensive Study Notes

Overview of Triggering Workflows

  • Airflow can launch workflows (DAGs) in two fundamentally different ways:

    • Time-based schedules (chapter 3): convenience strings (e.g. "@daily"), timedelta objects (e.g. timedelta(days=3)), or cron strings (e.g. "30 14 * * *").

    • Event- or action-based triggers (chapter 6): start the workflow after something happens (file appears, code pushed, partition created, etc.).

  • Chapter focuses on three complementary techniques:

    • Sensors ➔ continuously poll for a condition inside the same DAG (pull-based).

    • TriggerDagRunOperator / ExternalTaskSensor ➔ coordinate dependencies between different DAGs (push or pull across DAG boundaries).

    • CLI & REST API ➔ kick off DAGs from outside of Airflow (CI/CD, serverless callback, etc.).


6.1 Polling Conditions With Sensors

Why sensors?
  • Real-world example: four supermarkets deliver daily promotion data anytime between 16:00 and 02:00 the next day.

    • Naïve schedule: start the workflow at 02:00 ➔ early deliveries sit idle for hours, e.g. 9.5 hours for supermarket 1 (Fig 6.3).

    • Better: start early and wait only for the individual supermarket that is late.

  • A sensor is an operator subclass that repeatedly checks (“pokes”) a criterion and succeeds only when true.

6.1.1 FileSensor (built-in)
  • Code skeleton:

  from airflow.sensors.filesystem import FileSensor
  wait_for_supermarket_1 = FileSensor(
      task_id="wait_for_supermarket_1",
      filepath="/data/supermarket1/data.csv",
  )
  • Behaviour

    • Pokes every 60 s by default (configurable via poke_interval).

    • Keeps the task running until either:

        • the condition succeeds → downstream tasks run, or

        • the timeout is reached → raises AirflowSensorTimeout and the task fails.
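To make the poke/timeout behaviour concrete, here is a minimal pure-Python sketch of a sensor's poke-mode loop. It is not Airflow's implementation: run_sensor, SensorTimeout (a stand-in for AirflowSensorTimeout) and FakeClock are hypothetical names, and the clock and sleep functions are injectable so the loop can be tried without real waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for Airflow's AirflowSensorTimeout."""


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 7 * 24 * 60 * 60,  # 7 days in seconds
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call poke() until it returns True and return how many pokes it took.

    Raises SensorTimeout once `timeout` seconds have elapsed without success.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes  # condition met: downstream tasks may run
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition still false after {pokes} pokes")
        sleep(poke_interval)  # poke mode: the task keeps its slot while sleeping


class FakeClock:
    """Deterministic clock so the loop can be exercised instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


# Data "arrives" 120 s after the sensor starts: pokes happen at t = 0, 60, 120.
clock = FakeClock()
n = run_sensor(lambda: clock.now >= 120, poke_interval=60, timeout=600,
               clock=clock.time, sleep=clock.sleep)
print(n)  # 3
```

With a poke that never succeeds and timeout=150, the same loop pokes at t = 0, 60, 120 and 180 and then raises SensorTimeout.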

6.1.2 PythonSensor (custom logic)
  • Needed when a single wildcard path isn't enough, e.g. requiring at least one data-*.csv file *and* a _SUCCESS marker file.

  from pathlib import Path

  from airflow.sensors.python import PythonSensor

  def _wait_for_supermarket(supermarket_id):
      # Ready only when at least one data file AND the _SUCCESS marker exist.
      path = Path("/data/" + supermarket_id)
      data_files = list(path.glob("data-*.csv"))
      success_file = path / "_SUCCESS"
      return bool(data_files) and success_file.exists()

  wait_for_supermarket_1 = PythonSensor(
      task_id="wait_for_supermarket_1",
      python_callable=_wait_for_supermarket,
      op_kwargs={"supermarket_id": "supermarket1"},
  )
  • Shown in the UI with the sensor colour; otherwise it behaves like FileSensor (same poke_interval, timeout and mode options).
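Because the python_callable is plain Python, it can be checked in isolation before it is wired into a PythonSensor. This sketch adds a hypothetical data_root parameter (defaulting to "/data") so the same check can be pointed at a temporary directory; in a DAG it could be supplied through op_kwargs alongside supermarket_id.

```python
import tempfile
from pathlib import Path


def _wait_for_supermarket(supermarket_id: str, data_root: str = "/data") -> bool:
    """True once at least one data-*.csv file and a _SUCCESS marker exist."""
    path = Path(data_root) / supermarket_id
    data_files = list(path.glob("data-*.csv"))
    success_file = path / "_SUCCESS"
    return bool(data_files) and success_file.exists()


with tempfile.TemporaryDirectory() as root:
    folder = Path(root) / "supermarket1"
    folder.mkdir()
    before = _wait_for_supermarket("supermarket1", data_root=root)   # nothing delivered
    (folder / "data-1.csv").write_text("promo_id,price\n")
    partial = _wait_for_supermarket("supermarket1", data_root=root)  # no marker yet
    (folder / "_SUCCESS").touch()
    ready = _wait_for_supermarket("supermarket1", data_root=root)
    print(before, partial, ready)  # False False True
```

Checking the marker file as well as the data files guards against picking up a delivery that is still being written.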

6.1.3 Timeouts, Sensor Deadlock & Concurrency
  • Default timeout = 7 days. With a daily schedule, a sensor whose condition is never met keeps its run alive for up to a week, so new runs stack up on top of the stuck ones (roughly seven overlapping runs).

  • Concurrency limits exist at several levels:

    • Per DAG (concurrency argument; renamed max_active_tasks in Airflow 2.2+):

      dag = DAG(..., concurrency=50)

    • Global Airflow config (see §12.6).

  • Sensor deadlock: all task slots occupied by stuck sensors → downstream tasks, even in other DAGs, can’t start (Fig 6.8).

  • Mitigation: set mode="reschedule" so the sensor releases the slot between pokes.

  PythonSensor(..., mode="reschedule")
  • New task state up_for_reschedule in UI (Fig 6.9).
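The resource difference between the two modes can be sketched with a simplified model (our assumption, not Airflow's scheduler logic): in poke mode a sensor holds a worker slot for the whole wait, while in reschedule mode it holds one only while each poke runs. The function name and the ~1 s per-poke duration are hypothetical.

```python
def slot_seconds(wait: float, poke_interval: float, poke_duration: float, mode: str) -> float:
    """Approximate seconds of worker-slot occupancy while a sensor waits `wait` seconds."""
    if mode == "poke":
        return wait  # slot held continuously, including the sleeps between pokes
    if mode == "reschedule":
        pokes = int(wait // poke_interval) + 1  # pokes at t = 0, interval, 2*interval, ...
        return pokes * poke_duration  # slot released between pokes
    raise ValueError(f"unknown mode: {mode!r}")


# One sensor waiting 9.5 hours with the default 60 s interval and ~1 s per poke.
wait = 9.5 * 3600
print(slot_seconds(wait, 60, 1, "poke"))        # 34200.0
print(slot_seconds(wait, 60, 1, "reschedule"))  # 571
```

Under this model a single waiting sensor in reschedule mode occupies a slot for under 10 minutes instead of 9.5 hours, which is why long waits are the main case for reschedule.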


6.2 Triggering Other DAGs

Motivation for DAG-to-DAG triggering
  • The create-metrics tasks originally sat downstream of every supermarket branch, duplicating the same logic once per supermarket (Fig 6.12).

  • Solution: split into two DAGs

    • DAG 1: ingest & process per supermarket.

    • DAG 2: create metrics.

6.2.1 TriggerDagRunOperator (push)
  • In DAG 1:

  from airflow.operators.trigger_dagrun import TriggerDagRunOperator

  TriggerDagRunOperator(
      task_id="trigger_create_metrics_dag_supermarket_1",
      trigger_dag_id="create_metrics",  # dag_id of the DAG to trigger
  )
  • trigger_dag_id must match the target DAG’s dag_id (listing 6.4).

  • Target DAG can have schedule_interval=None if it should only run when triggered.

  • UI cues:

    • Scheduled DAG runs have black border; triggered runs don’t (Fig 6.14).

    • run_id prefixes: scheduled__, backfill__, manual__ (manual__ covers runs started via the UI button, the CLI/REST API, or TriggerDagRunOperator) (Fig 6.15).
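Since the run type is encoded as a run_id prefix, a small helper can classify runs, e.g. when auditing why a DAG executed. This is a hypothetical helper that relies only on the three prefixes listed above; the example run_ids are made up.

```python
RUN_TYPE_PREFIXES = ("scheduled", "backfill", "manual")


def run_type(run_id: str) -> str:
    """Return 'scheduled', 'backfill' or 'manual' based on the run_id prefix."""
    prefix, sep, _ = run_id.partition("__")
    if sep and prefix in RUN_TYPE_PREFIXES:
        return prefix
    return "unknown"  # e.g. a custom run_id supplied when the run was triggered


for rid in [
    "scheduled__2019-01-01T00:00:00+00:00",
    "manual__2019-01-01T18:04:21.230442+00:00",
    "my_custom_run",
]:
    print(rid, "->", run_type(rid))
```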

6.2.2 Backfill & Clearing Caveat
  • Clearing a TriggerDagRunOperator task does not clear the tasks of the DAG it triggered; instead it starts a new run of that DAG (Fig 6.16). Plan your re-processing strategy accordingly.

6.2.3 ExternalTaskSensor (pull across DAGs)
  • Use when multiple upstream DAGs must finish before downstream DAG starts.

  • Mechanics:

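  from datetime import timedelta

  from airflow.sensors.external_task import ExternalTaskSensor

  ExternalTaskSensor(
      task_id="wait_for_process_supermarket",
      external_dag_id="ingest_supermarket_data",
      external_task_id="process_supermarket",
      execution_delta=timedelta(hours=4),  # optional offset: look 4 hours earlier
  )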
  • Alignment challenges:

    • Default: looks for same execution_date in external task.

    • If the DAG schedules differ, provide execution_delta (a positive delta looks earlier) or execution_date_fn (a callable returning the date, or list of dates, to look for) (Fig 6.22).
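The alignment rule can be checked with plain datetime arithmetic: with execution_delta, the sensor looks for the external task at the current execution date minus the delta; with execution_date_fn, it looks at whatever date(s) the callable returns. The dates and the candidate_dates callable are illustrative assumptions, and the (execution_date, **context) signature is our assumption about how Airflow invokes the function.

```python
from datetime import datetime, timedelta


def external_date(execution_date: datetime, execution_delta: timedelta) -> datetime:
    """Date the sensor looks for in the external DAG (positive delta = earlier)."""
    return execution_date - execution_delta


def candidate_dates(execution_date: datetime, **context):
    """Hypothetical execution_date_fn: accept a match 4 or 8 hours earlier."""
    return [execution_date - timedelta(hours=h) for h in (4, 8)]


current = datetime(2019, 1, 1, 20, 0)
print(external_date(current, timedelta(hours=4)))  # 2019-01-01 16:00:00
print(candidate_dates(current))
```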

6.2.4 Graph Patterns Enabled
  • One-to-one triggering (left diagram, Fig 6.17).

  • One-to-many (middle) or many-to-one (right) by combining TriggerDagRunOperator and/or ExternalTaskSensor.


6.3 Starting Workflows With REST API & CLI

Airflow CLI
  • Trigger a run with current timestamp:

  airflow dags trigger dag1
  • Pass arbitrary JSON config:

  airflow dags trigger -c '{"supermarket_id": 1}' dag1
  • Inside DAG tasks, access via context:

  context["dag_run"].conf  # => {'supermarket_id': 1}
  • Enables one generic DAG to serve multiple cases (Fig 6.23). Note: a DAG that relies on dag_run.conf typically sets schedule_interval=None, since scheduled runs carry no conf.

REST API (stable v1 endpoints)
  • Example request:

  curl -u admin:admin \
       -X POST \
       -H "Content-Type: application/json" \
       "http://localhost:8080/api/v1/dags/print_dag_run_conf/dagRuns" \
       -d '{"conf": {"supermarket": 1}}'
  • Response contains fields:

    • dag_run_id (prefixed manual__)

    • execution_date, state, external_trigger=true, etc.

  • Security: avoid plaintext basic auth in production; consult Airflow auth back-ends.
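The curl call above can be reproduced from Python with only the standard library. This sketch reuses the example's URL, admin:admin basic auth and JSON body purely for illustration (the security caveat above still applies); build_trigger_request is a hypothetical helper that only constructs the request, and the commented-out lines would send it to a running webserver.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url: str, dag_id: str, conf: dict,
                          user: str, password: str) -> urllib.request.Request:
    """Build a POST /api/v1/dags/{dag_id}/dagRuns request with basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": conf}).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
        method="POST",
    )


req = build_trigger_request("http://localhost:8080", "print_dag_run_conf",
                            {"supermarket": 1}, "admin", "admin")
print(req.get_method(), req.full_url)
# with urllib.request.urlopen(req) as resp:  # needs a running Airflow webserver
#     print(json.load(resp)["dag_run_id"])    # prefixed manual__
```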


Practical / Ethical / Operational Implications

  • Operational limits: always size DAG concurrency and global parallelism deliberately; sensors in poke mode can exhaust a cluster's task slots.

  • Design principle: Keep DAGs single-responsibility; trigger or sense across boundaries rather than duplicating tasks.

  • Error handling: provide realistic timeouts and fallback paths; missing data should fail fast rather than stall indefinitely.

  • Auditability: triggered runs are labelled (manual__, etc.) ➔ easy to trace why a DAG executed.

  • Cost & efficiency: mode="reschedule" saves worker slots (= compute cost) versus naive polling.

  • Security: External triggers via REST must use secure auth (token, OAuth, etc.) and should be rate-limited.


Key Equations / Numeric References

  • Idle wait in naïve schedule for supermarket 1: 9.5 hours (data ready by 16:30 waits until the 02:00 start).

  • Default sensor poke interval: 60 seconds.

  • Default sensor timeout: 7 days = 7 × 24 h = 168 h (604,800 s).

  • Example concurrency setting: concurrency=50, i.e. at most 50 concurrently running tasks per DAG.
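These numbers can be cross-checked with a few lines of datetime arithmetic. The calendar dates are arbitrary, the 16:30 arrival time is simply what the 9.5-hour figure implies, and the poke count and overlapping-run count are our own derivations from the listed defaults.

```python
from datetime import datetime, timedelta

# Idle wait: data ready at 16:30, naive DAG start at 02:00 the next day.
idle = datetime(2019, 1, 2, 2, 0) - datetime(2019, 1, 1, 16, 30)

poke_interval = timedelta(seconds=60)  # default poke interval
timeout = timedelta(days=7)            # default sensor timeout
schedule_interval = timedelta(days=1)  # daily schedule

idle_hours = idle.total_seconds() / 3600                # 9.5
timeout_hours = timeout.total_seconds() / 3600          # 168.0
pokes_before_timeout = timeout // poke_interval         # 10080 pokes in poke mode
overlapping_daily_runs = timeout // schedule_interval   # ~7 stuck runs at once

print(idle_hours, timeout_hours, pokes_before_timeout, overlapping_daily_runs)
```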


Chapter 6 Takeaways

  • Sensors allow in-DAG waiting; prefer mode="reschedule" for long waits so sensors don't hold worker slots.

  • TriggerDagRunOperator pushes execution to other DAGs; ExternalTaskSensor pulls information from other DAGs.

  • Splitting DAGs reduces duplication and clarifies logic, but complicates backfill & clearing semantics.

  • Workflows can be started programmatically via CLI or REST, passing dynamic config that propagates to tasks.

  • Always consider concurrency limits, timeout settings, and security implications when designing trigger mechanisms.