Triggering Workflows – Comprehensive Study Notes
Overview of Triggering Workflows
Airflow can launch workflows (DAGs) in two fundamentally different ways:
Time-based schedules (chapter 3): convenience strings (e.g. "@daily"), timedelta objects (e.g. timedelta(days=3)), or cron strings (e.g. "30 14 * * *").
Event- or action-based triggers (chapter 6): start the workflow after something happens (file appears, code pushed, partition created, etc.).
The chapter focuses on three complementary techniques:
Sensors ➔ continuously poll for a condition inside the same DAG (pull-based).
TriggerDagRunOperator / ExternalTaskSensor ➔ coordinate dependencies between different DAGs (push or pull across DAG boundaries).
CLI & REST API ➔ kick off DAGs from outside of Airflow (CI/CD, serverless callback, etc.).
6.1 Polling Conditions With Sensors
Why sensors?
Real-world example: Four supermarkets deliver daily promotion data at unpredictable times, within a delivery window that runs into the next day.
Naïve schedule: start the workflow only once the delivery window has closed ➔ data that arrived early sits idle for hours (Fig 6.3).
Better: start early and wait only for the individual supermarket that is late.
A sensor is an operator subclass that repeatedly checks (“pokes”) a criterion and succeeds only when true.
6.1.1 FileSensor (built-in)
Code skeleton:
from airflow.sensors.filesystem import FileSensor

wait_for_supermarket_1 = FileSensor(
    task_id="wait_for_supermarket_1",
    filepath="/data/supermarket1/data.csv",
)
Behaviour
Pokes every 60 s by default (poke_interval is configurable).
Keeps the task running until either:
Condition succeeds → downstream tasks run.
timeout reached → raises AirflowSensorTimeout, task fails.
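The poke-until-timeout behaviour can be sketched in plain Python, without Airflow installed. This is a simplified model, not Airflow's implementation: the names poke_until and SensorTimeout and the injectable sleep and clock parameters are assumptions made for illustration, and the default values are meant to mirror Airflow's defaults of 60 s between pokes and a 7-day timeout.

```python
import time


class SensorTimeout(Exception):
    """Stand-in for Airflow's AirflowSensorTimeout (hypothetical name)."""


def poke_until(condition, poke_interval=60.0, timeout=7 * 24 * 3600.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call condition() repeatedly until it is truthy or the timeout elapses.

    Returns the number of pokes that were needed.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)


# Usage with a fake clock so the example finishes instantly.
fake_now = [0.0]


def fake_sleep(seconds):
    fake_now[0] += seconds


answers = iter([False, False, True])
pokes_needed = poke_until(lambda: next(answers), poke_interval=60,
                          sleep=fake_sleep, clock=lambda: fake_now[0])
print(pokes_needed)  # 3
```

Injecting sleep and clock keeps the sketch testable; a real sensor simply blocks (poke mode) or is rescheduled between pokes.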
6.1.2 PythonSensor (custom logic)
Needed when a simple wildcard isn’t enough; e.g. require all data-*.csv files and a _SUCCESS marker.
from pathlib import Path

from airflow.sensors.python import PythonSensor


def _wait_for_supermarket(supermarket_id):
    # Succeed only when at least one data file and the _SUCCESS marker exist.
    path = Path("/data") / supermarket_id
    data_files = list(path.glob("data-*.csv"))
    success_file = path / "_SUCCESS"
    return bool(data_files) and success_file.exists()


wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    op_kwargs={"supermarket_id": "supermarket1"},
)
Appears as a sensor (different colour) in the UI; otherwise behaves identically.
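To see how such a callable behaves while a delivery is still in progress, the following sketch runs the same check against a temporary directory instead of /data. The base_dir parameter and the file name data-01.csv are assumptions added so the example can run anywhere; no Airflow installation is needed.

```python
import tempfile
from pathlib import Path


def wait_for_supermarket(base_dir, supermarket_id):
    """Return True once data files and the _SUCCESS marker are both present."""
    path = Path(base_dir) / supermarket_id
    data_files = list(path.glob("data-*.csv"))
    success_file = path / "_SUCCESS"
    return bool(data_files) and success_file.exists()


with tempfile.TemporaryDirectory() as base_dir:
    delivery = Path(base_dir) / "supermarket1"
    delivery.mkdir()

    # Nothing delivered yet.
    results = [wait_for_supermarket(base_dir, "supermarket1")]

    # Data file present, but the upload is not marked as finished.
    (delivery / "data-01.csv").write_text("promo,discount\n")
    results.append(wait_for_supermarket(base_dir, "supermarket1"))

    # Marker written: the delivery is complete.
    (delivery / "_SUCCESS").touch()
    results.append(wait_for_supermarket(base_dir, "supermarket1"))

print(results)  # [False, False, True]
```

The middle result is the case a plain wildcard check would get wrong: data exists, but the supplier has not yet signalled completion.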
6.1.3 Timeouts, Sensor Deadlock & Concurrency
Default timeout is 7 days; if the schedule is daily, a sensor whose condition never becomes true snowballs ➡ many overlapping runs, each with its own waiting sensor.
Concurrency limits exist at several levels:
Per DAG (concurrency argument; renamed max_active_tasks in Airflow 2.2+):
    dag = DAG(..., concurrency=50)
Global Airflow config (see §12.6).
Sensor deadlock: all task slots occupied by stuck sensors → downstream tasks, even in other DAGs, can’t start (Fig 6.8).
Mitigation: set mode="reschedule" so the sensor releases the slot between pokes.
    PythonSensor(..., mode="reschedule")
New task state up_for_reschedule in UI (Fig 6.9).
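The snowball effect and the benefit of reschedule mode can be estimated with a toy calculation. This is a rough model under stated assumptions, not Airflow's scheduler logic: it assumes a daily DAG, sensors whose condition never becomes true, a 7-day timeout, and an illustrative limit of 16 slots. The function name and all parameters are hypothetical.

```python
def slots_held_between_pokes(day, stuck_sensors_per_run, mode="poke",
                             timeout_days=7, slot_limit=16):
    """Estimate task slots held by never-satisfied sensors on a given day.

    day counts daily runs from 1. All numbers are a toy model, not Airflow's
    scheduler logic.
    """
    if mode == "reschedule":
        # A rescheduled sensor gives its slot back while it waits.
        return 0
    if mode != "poke":
        raise ValueError(f"unknown mode: {mode!r}")
    # Runs started within the last `timeout_days` still have live sensors.
    live_runs = min(day, timeout_days)
    return min(live_runs * stuck_sensors_per_run, slot_limit)


poke_usage = [slots_held_between_pokes(day, 4) for day in range(1, 6)]
reschedule_usage = [slots_held_between_pokes(day, 4, mode="reschedule")
                    for day in range(1, 6)]
print(poke_usage)        # [4, 8, 12, 16, 16]
print(reschedule_usage)  # [0, 0, 0, 0, 0]
```

With four stuck sensors per run, the modelled slot limit is reached on day 4 in poke mode, after which no other task can start. In reschedule mode a slot is only needed for the brief moment of each poke.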
6.2 Triggering Other DAGs
Motivation for DAG-to-DAG triggering
Create-metrics logic originally downstream of each supermarket branch ➔ code duplication (Fig 6.12).
Solution: split into two DAGs
DAG 1: ingest & process per supermarket.
DAG 2: create metrics.
6.2.1 TriggerDagRunOperator (push)
In DAG 1:
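from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# trigger_dag_id names the DAG to start once this task runs.
TriggerDagRunOperator(
    task_id="trigger_create_metrics_dag_supermarket_1",
    trigger_dag_id="create_metrics",
)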
trigger_dag_id must match the target DAG’s dag_id (listing 6.4).
Target DAG can have schedule_interval=None if it should only run when triggered.
UI cues:
Scheduled DAG runs have black border; triggered runs don’t (Fig 6.14).
run_id prefixes: scheduled__, backfill__, manual__ (triggered by button or operator) (Fig 6.15).
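Because the prefix encodes how a run was started, a small helper can classify runs when auditing. The sketch below is illustrative only: the function name run_type_from_run_id and the sample run_id values are made up, and only the three prefixes come from the notes above.

```python
def run_type_from_run_id(run_id):
    """Return 'scheduled', 'backfill' or 'manual' based on the run_id prefix."""
    for run_type in ("scheduled", "backfill", "manual"):
        if run_id.startswith(run_type + "__"):
            return run_type
    return "unknown"


# Hypothetical run_id values, shaped like prefix + timestamp.
sample_run_ids = [
    "scheduled__2019-01-01T00:00:00+00:00",
    "backfill__2019-01-02T00:00:00+00:00",
    "manual__2019-01-03T12:34:56+00:00",
]
run_types = [run_type_from_run_id(run_id) for run_id in sample_run_ids]
print(run_types)  # ['scheduled', 'backfill', 'manual']
```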
6.2.2 Backfill & Clearing Caveat
Clearing a TriggerDagRunOperator does not clear the triggered DAG’s tasks ➔ instead it fires a new run (Fig 6.16). Plan re-processing strategy accordingly.
6.2.3 ExternalTaskSensor (pull across DAGs)
Use when multiple upstream DAGs must finish before downstream DAG starts.
Mechanics:
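from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ExternalTaskSensor(
    task_id="wait_for_process_supermarket",
    external_dag_id="ingest_supermarket_data",
    external_task_id="process_supermarket",
    execution_delta=timedelta(hours=4),  # optional offset: look 4 hours earlier
)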
Alignment challenges:
Default: looks for same execution_date in external task.
If DAG schedules differ, provide execution_delta (positive = look earlier) or execution_date_fn (returns list of candidate dates) (Fig 6.22).
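The offset arithmetic is a plain subtraction: the sensor looks for the external task at its own execution date minus execution_delta. The sketch below shows this with standard-library datetimes; the helper name and the example dates (an upstream DAG at 16:00, the sensing DAG at 20:00) are assumptions chosen to match a four-hour offset.

```python
from datetime import datetime, timedelta


def external_execution_date(execution_date, execution_delta=timedelta(0)):
    """Date at which the sensor looks for the external task.

    A positive execution_delta looks earlier in time.
    """
    return execution_date - execution_delta


sensing_run = datetime(2019, 1, 2, 20, 0)  # downstream DAG runs at 20:00
looked_up = external_execution_date(sensing_run, timedelta(hours=4))
print(looked_up)  # 2019-01-02 16:00:00
```

With the default of no offset, the function returns the same date, which is why two DAGs on different schedules never line up unless an offset is supplied.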
6.2.4 Graph Patterns Enabled
One-to-one triggering (left diagram, Fig 6.17).
One-to-many (middle) or many-to-one (right) by combining TriggerDagRunOperator and/or ExternalTaskSensor.
6.3 Starting Workflows With REST API & CLI
Airflow CLI
Trigger a run with current timestamp:
airflow dags trigger dag1
Pass arbitrary JSON config:
airflow dags trigger -c '{"supermarket_id": 1}' dag1
Inside DAG tasks, access via context:
context["dag_run"].conf # => {'supermarket_id': 1}
Enables one generic DAG to serve multiple cases (Fig 6.23), but note: if a DAG relies on dag_run.conf, you typically set schedule_interval=None, because scheduled runs carry no user-supplied conf.
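A task callable that depends on dag_run.conf can be exercised outside Airflow by passing a stand-in context. In this sketch the function name process_from_conf and the SimpleNamespace stand-in for the DagRun object are assumptions; only the context["dag_run"].conf access pattern comes from the notes.

```python
from types import SimpleNamespace


def process_from_conf(**context):
    """Read the supermarket to process from the triggering run's conf."""
    conf = context["dag_run"].conf or {}
    supermarket_id = conf.get("supermarket_id")
    if supermarket_id is None:
        # Likely a run without conf, e.g. one started by the scheduler.
        raise ValueError("expected 'supermarket_id' in dag_run.conf")
    return f"processing supermarket {supermarket_id}"


# Stand-in for the context Airflow would pass to the callable.
fake_context = {"dag_run": SimpleNamespace(conf={"supermarket_id": 1})}
message = process_from_conf(**fake_context)
print(message)  # processing supermarket 1
```

Failing loudly on a missing key makes it obvious when such a DAG is started without the configuration it needs.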
REST API (stable v1 endpoints)
Example request:
curl -u admin:admin \
  -X POST \
  -H "Content-Type: application/json" \
  "http://localhost:8080/api/v1/dags/print_dag_run_conf/dagRuns" \
  -d '{"conf": {"supermarket": 1}}'
Response contains fields: dag_run_id (prefixed manual__), execution_date, state, external_trigger=true, etc.
Security: avoid plaintext basic auth in production; consult Airflow auth back-ends.
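The same request can be prepared from Python with the standard library, which is convenient in a CI job or serverless callback. The sketch below only builds the request and does not send it; the helper name build_trigger_request is hypothetical, and the URL and admin:admin credentials simply mirror the curl example and are not suitable for production.

```python
import base64
import json
from urllib.request import Request


def build_trigger_request(base_url, dag_id, conf, username, password):
    """Build (but do not send) a POST request that triggers a DAG run."""
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": conf}).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


request = build_trigger_request(
    "http://localhost:8080", "print_dag_run_conf", {"supermarket": 1},
    username="admin", password="admin",
)
print(request.get_method(), request.full_url)
# To send it against a running webserver: urllib.request.urlopen(request)
```

Keeping request construction separate from sending makes the payload easy to inspect and test before it touches a live Airflow instance.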
Practical / Ethical / Operational Implications
Operational limits: Always dimension concurrency & global parallelism; sensors in poke mode can cripple clusters.
Design principle: Keep DAGs single-responsibility; trigger or sense across boundaries rather than duplicating tasks.
Error handling: Provide realistic timeout and fallback paths; missing data should fail fast, not stall indefinitely.
Auditability: triggered runs are labelled (manual__, etc.) ➔ easy to trace why a DAG executed.
Cost & efficiency: mode="reschedule" saves worker slots (= compute cost) versus naive polling.
Security: External triggers via REST must use secure auth (token, OAuth, etc.) and should be rate-limited.
Key Equations / Numeric References
Idle wait in naïve schedule for supermarket 1: see Fig 6.3.
Default sensor poke interval: 60 s.
Default sensor timeout: 7 days.
Example concurrency setting: 50 tasks per DAG (concurrency=50).
Chapter 6 Takeaways
Sensors allow in-DAG waiting; choose poke vs reschedule wisely.
TriggerDagRunOperator pushes execution to other DAGs; ExternalTaskSensor pulls information from other DAGs.
Splitting DAGs reduces duplication and clarifies logic, but complicates backfill & clearing semantics.
Workflows can be started programmatically via CLI or REST, passing dynamic config that propagates to tasks.
Always consider concurrency limits, timeout settings, and security implications when designing trigger mechanisms.