Patterns
DAG design, sensors, backfills, dynamic task generation, retries and idempotency, secrets, ML pipelines
The patterns that turn a working orchestrator into one teams can rely on.
DAG Design
Idempotency is the foundation. Every task should be safe to re-run any number of times:
- Inserts that detect duplicates (UPSERT, MERGE, dbt's incremental with unique_key)
- Files written with deterministic paths and overwrite semantics
- API calls with idempotency keys
- "Process all events between start_ts and end_ts" instead of "process the next batch"
The orchestrator will retry; non-idempotent tasks corrupt state.
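A minimal sketch of the UPSERT idea, using Python's built-in sqlite3 module. It needs SQLite 3.24+ for `ON CONFLICT ... DO UPDATE`. The `daily_sales` table and the `load_partition` function are hypothetical. The point is that re-running the same partition, as a retry would, leaves the table in the same state.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, day: str, rows: list[tuple[str, float]]) -> None:
    """Idempotently load one day's rows, keyed by (day, store_id)."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS daily_sales (
               day TEXT NOT NULL,
               store_id TEXT NOT NULL,
               amount REAL NOT NULL,
               PRIMARY KEY (day, store_id))"""
    )
    # ON CONFLICT turns a duplicate insert into an update instead of a second row.
    conn.executemany(
        """INSERT INTO daily_sales (day, store_id, amount) VALUES (?, ?, ?)
           ON CONFLICT (day, store_id) DO UPDATE SET amount = excluded.amount""",
        [(day, store_id, amount) for store_id, amount in rows],
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    batch = [("s1", 10.0), ("s2", 5.5)]
    load_partition(conn, "2026-05-21", batch)
    load_partition(conn, "2026-05-21", batch)  # simulated retry: no duplicate rows
    print(conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0])
```

The same shape works for MERGE in a warehouse: the natural key of the partition decides what "the same row" means.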
Pure functions of inputs: a task should produce the same output given the same date / partition. This makes backfills correct.
Small, focused tasks: one task = one concept. A 500-line script as a single task is hard to debug, retry, or parallelize. Split it.
Heavy work outside the orchestrator process: the orchestrator dispatches; the actual computation runs in a warehouse, on Spark, in a separate worker pod. The orchestrator's process stays small and fast.
Partitioned DAGs and Backfills
The killer feature of a real orchestrator is backfill: re-running a date range when something changes.
In Airflow:
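```python
from datetime import datetime

from airflow import DAG

with DAG(
    "daily_facts",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",  # the `schedule` argument needs Airflow 2.4+
    catchup=True,  # !! key: create a run for every missed interval
) as dag:
    ...
```

If you turn this DAG on with catchup=True and a start_date in the past, Airflow creates a run for every missed schedule interval. To re-run a specific date range (Airflow 2.x CLI):

```bash
airflow dags backfill daily_facts --start-date 2026-03-01 --end-date 2026-03-31
```

In Dagster (asset-centric):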
```python
from dagster import DailyPartitionsDefinition, asset

daily = DailyPartitionsDefinition(start_date="2026-01-01")


@asset(partitions_def=daily)
def daily_sales(context):
    day = context.partition_key  # e.g., "2026-05-21"
    fetch_data_for_date(day)  # placeholder for your own load logic
```

Backfill from the UI: select the asset, pick a date range, and materialize those partitions. Dagster shows a partition status grid (green = complete, red = failed, gray = missing).
Always design with partitions in mind. "Run this for the partition I'm given" serves both the daily run and any backfill. Tasks that don't know which time range they cover cause backfill chaos.
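The idea can be sketched in plain Python. If a task is a pure function of its partition key, a backfill is just the same function called once per key in the range. `partition_bounds` and `backfill_keys` are hypothetical helpers, not an orchestrator API. Daily partitions are assumed to be UTC days.

```python
from datetime import date, datetime, time, timedelta, timezone


def partition_bounds(partition_key: str) -> tuple[datetime, datetime]:
    """Map a daily partition key like '2026-05-21' to a half-open UTC interval."""
    day = date.fromisoformat(partition_key)
    start = datetime.combine(day, time.min, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def backfill_keys(start: str, end: str) -> list[str]:
    """Every daily partition key from start to end, inclusive."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days + 1)]


if __name__ == "__main__":
    # A backfill is the same code path as the daily run, once per partition.
    for key in backfill_keys("2026-03-01", "2026-03-03"):
        print(key, *partition_bounds(key))
```

The half-open interval `[start, end)` keeps adjacent partitions from double-counting events that fall exactly on midnight.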
Sensors and Event-Driven
Cron isn't always right. Sensors trigger when an external condition is met:
| Sensor | Triggers when |
|---|---|
| File sensor | A specific S3 / GCS / HDFS path appears |
| External task sensor | Another DAG task succeeds (cross-DAG dependencies) |
| Database row sensor | A query returns rows |
| API sensor | An external API endpoint returns a value |
| Time delta sensor | A fixed delay has passed after the run's data interval ends |
| Custom | Any condition your Python can express |
Airflow:
```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_data",
    bucket_name="data-lake",
    bucket_key="exports/{{ ds }}/data.parquet",  # templated with the run's date
    timeout=60 * 60 * 6,  # 6 hours max wait
    poke_interval=60,  # check every minute
    mode="reschedule",  # release the worker between checks
)
```

mode="reschedule" is critical: instead of holding a worker slot for 6 hours, the sensor releases it between polls. Always use it for long-waiting sensors.
Dagster equivalents are @sensor functions that wake regularly and decide whether to trigger.
Sensors replace "scheduled too early" cron anti-patterns: you don't need to schedule a job at 04:00 hoping the file arrives by then. Wait for the file.
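Under the hood, a poke-style sensor is a bounded polling loop. Below is a minimal stdlib sketch built around a hypothetical `wait_for` helper. Real orchestrators add persistence, rescheduling, and logging on top. The clock and sleep functions are injectable so the loop can be tested without actually waiting. In practice `condition` might be something like `lambda: os.path.exists(path)`.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition never became true within the timeout."""


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll `condition` every `poke_interval` seconds; return how many pokes it took."""
    deadline = clock() + timeout
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        # Give up if the next poke would land past the deadline.
        if clock() + poke_interval > deadline:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)
```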
Dynamic Task Generation
Sometimes the number of tasks isn't known until runtime:
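```python
# Airflow 2.3+ dynamic task mapping
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2026, 1, 1), schedule="@daily", catchup=False)
def regional_processing():
    @task
    def get_regions():
        return ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]

    @task
    def process_region(region: str):
        print(f"Processing {region}")

    process_region.expand(region=get_regions())  # one mapped instance per region

regional_processing()
```

The DAG now gets one mapped task instance per region: four here, or however many get_regions returns at runtime. They run in parallel, subject to the usual concurrency and pool limits. Fan-out is built in.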
In Dagster, dynamic outputs:
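```python
from dagster import DynamicOut, DynamicOutput, job, op

REGIONS = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]


@op(out=DynamicOut())
def fan_out():
    for region in REGIONS:
        # mapping_key allows only letters, digits and underscores, so no hyphens
        yield DynamicOutput(region, mapping_key=region.replace("-", "_"))


@op
def process(region: str):
    ...


@job
def regional_job():
    fan_out().map(process)
```

Use cases: per-region jobs, per-customer batches, per-file processing, per-shard.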
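Conceptually, mapping is a runtime fan-out followed by an optional fan-in. The plain-Python sketch below uses concurrent.futures. `process_region` is a hypothetical stand-in for the per-item task, and `max_workers` plays the role of the orchestrator's concurrency limit.

```python
from concurrent.futures import ThreadPoolExecutor


def get_regions() -> list[str]:
    # Decided at runtime: could come from a config table or an API call.
    return ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]


def process_region(region: str) -> dict:
    # Hypothetical per-item work; returns a small summary, not bulk data.
    return {"region": region, "status": "ok"}


def run_mapped(max_workers: int = 2) -> list[dict]:
    regions = get_regions()
    # Fan-out: one unit of work per runtime item, at most max_workers in flight.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(process_region, regions))
    # Fan-in: pool.map preserves input order, so results line up with regions.
    return results
```

An orchestrator adds what this sketch lacks: per-item retries, per-item logs, and a record of which items succeeded.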
Retries and Failure Strategy
Retries should be calibrated:
| Failure type | Strategy |
|---|---|
| Transient network glitch | Retry 3-5 times, exponential backoff |
| Upstream not ready | Retry with longer interval; or use a sensor |
| Auth failure | No retry — fail loud, page someone |
| Data quality failure | No retry (won't fix itself) — alert + manual investigation |
| Timeout / OOM | Retry once with more resources, then fail |
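The table's logic can be expressed as a small retry wrapper. It retries only errors classified as transient, backs off exponentially with a cap, and lets everything else fail on the first attempt. This is a sketch built on hypothetical `TransientError` and `PermanentError` classes. Deciding which exceptions belong in which class is the real work.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Network glitches, throttling: worth retrying."""


class PermanentError(Exception):
    """Auth or data-quality failures: retrying will not help."""


def with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying TransientError with capped exponential backoff plus jitter.

    Any other exception, including PermanentError, propagates immediately.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == retries:
                raise  # retries exhausted: surface the failure
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids synchronized retries
    raise AssertionError("unreachable: the loop always returns or raises")
```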
In Airflow:
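```python
from datetime import timedelta

from airflow.operators.python import PythonOperator

PythonOperator(
    task_id="...",
    python_callable=run_extract,  # hypothetical task function
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,  # each wait roughly doubles
    max_retry_delay=timedelta(minutes=30),  # cap on the backoff
    on_failure_callback=alert_pagerduty,  # hypothetical; fires when retries are exhausted
    on_retry_callback=log_retry,  # hypothetical; fires on each retry
)
```

For some failure types you want a clean dead-letter instead: failed records are archived for manual triage rather than blocking the whole DAG.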
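A minimal dead-letter sketch in plain Python: records that fail validation are set aside with the error message instead of failing the whole batch. The `parse_amount` rule and the record shape are hypothetical. In a real pipeline the dead-letter list would be written to a deterministic path or table for triage.

```python
from dataclasses import dataclass, field


@dataclass
class BatchResult:
    good: list[dict] = field(default_factory=list)
    dead_letter: list[dict] = field(default_factory=list)


def parse_amount(record: dict) -> dict:
    # Hypothetical per-record transform; raises on bad input.
    amount = float(record["amount"])
    if amount < 0:
        raise ValueError("negative amount")
    return {**record, "amount": amount}


def process_batch(records: list[dict]) -> BatchResult:
    result = BatchResult()
    for record in records:
        try:
            result.good.append(parse_amount(record))
        except (KeyError, ValueError) as exc:
            # Keep the original record plus the reason, for manual triage.
            result.dead_letter.append({"record": record, "error": repr(exc)})
    return result
```

A sensible addition is a threshold: if too large a share of records lands in the dead-letter list, fail the task instead, since that usually signals an upstream problem rather than a few bad rows.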
Secrets Management
Pipelines need credentials: warehouse passwords, API keys, OAuth tokens.
| Method | Pros / Cons |
|---|---|
| Environment variables | Easy; risk of leakage in logs |
| Airflow Connections / Variables | Built-in encryption; UI managed |
| External Secrets Operator + Vault | Best; rotation supported |
| Cloud-native (AWS Secrets Manager, GCP Secret Manager) | Good; native to your cloud |
In Airflow:
```python
import boto3
from airflow.models import Variable

# Airflow Variable (encrypted at rest in the metastore when a Fernet key is configured)
api_key = Variable.get("stripe_api_key")

# Or fetch directly from AWS Secrets Manager
response = boto3.client("secretsmanager").get_secret_value(SecretId="prod/stripe")
secret = response["SecretString"]  # plaintext payload; binary secrets use "SecretBinary"
```

Connect to Secrets. Never commit secrets to DAG source, and never print them in task logs.
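Airflow masks known secret values in task logs on its own. The underlying idea can be sketched with the standard logging module. This minimal filter assumes you know which values to mask, and the `SecretMaskingFilter` name is hypothetical.

```python
import logging


class SecretMaskingFilter(logging.Filter):
    """Replace known secret values in log messages with '***'."""

    def __init__(self, secrets: list[str]) -> None:
        super().__init__()
        # Mask longer secrets first so a shorter one can't leave parts of a longer one visible.
        self._secrets = sorted((s for s in secrets if s), key=len, reverse=True)

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # merge msg % args before masking
        for secret in self._secrets:
            message = message.replace(secret, "***")
        record.msg, record.args = message, None
        return True  # never drop the record, only redact it
```

Attach it to the handlers that write task output, for example `handler.addFilter(SecretMaskingFilter([api_key]))`. Masking is a safety net, not a licence to log credentials.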
ML Pipelines
ML adds its own shape on top of orchestration:
ingest_features → featurize → train → evaluate → register → deploy_if_better

Each step has unique needs:
- featurize: heavy compute, often Spark, idempotent per date range
- train: GPU; takes hours; produces a model artifact
- evaluate: compares to existing production model
- register: writes the model to MLflow / SageMaker / MLOps registry
- deploy_if_better: a conditional task; deploys only if metrics improve
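The deploy_if_better gate reduces to a comparison plus a margin, so that evaluation noise alone doesn't trigger a redeploy. This sketch assumes a single higher-is-better metric; the metric name `auc` and the `min_improvement` default are illustrative. In Airflow the returned boolean could drive a ShortCircuitOperator or a branch task.

```python
from typing import Optional


def should_deploy(
    candidate: dict[str, float],
    production: Optional[dict[str, float]],
    metric: str = "auc",
    min_improvement: float = 0.005,
) -> bool:
    """Return True if the candidate beats production by at least min_improvement."""
    if production is None:
        return True  # nothing deployed yet: ship the first model that passed evaluation
    return candidate[metric] >= production[metric] + min_improvement
```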
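In Airflow, these steps typically run through the KubernetesPodOperator (for example, GPU pods for training) or the EmrAddStepsOperator (Spark on EMR). Dagster is type-aware: inputs and outputs are typed, so a "model A expected but B given" mismatch is caught at the step boundary instead of deep inside downstream code.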
Dedicated ML pipeline tools (Kubeflow Pipelines, Flyte, Metaflow) handle this domain better than generic orchestrators. Many teams use both: data orchestrator (Airflow/Dagster) calls into ML pipeline (Kubeflow/Flyte).
Cross-DAG / Cross-Team
When pipelines span teams:
- Dataset triggers (Airflow 2.4+): a DAG can be scheduled on the materialization of a dataset (URI), regardless of which DAG produced it
- External task sensor: explicit wait on another DAG's task
- Asset-centric (Dagster): downstream assets automatically depend on upstream regardless of "job"
Cross-team dependencies are where an orchestrator's dependency model matters most. Dagster's "you depend on the asset, not the team's DAG" composes better than Airflow's task-level wiring.
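A toy model of dataset-style scheduling makes the decoupling concrete. It loosely follows Airflow's documented behavior: a consumer DAG runs once every dataset it lists has been updated since its last run, no matter which DAG produced the update. The class and URIs are hypothetical, and this is not Airflow's implementation.

```python
from collections import defaultdict


class DatasetScheduler:
    """Trigger a consumer once all of its datasets were updated since its last run."""

    def __init__(self) -> None:
        self._needs: dict[str, set[str]] = {}
        self._pending: dict[str, set[str]] = defaultdict(set)

    def register(self, consumer: str, datasets: set[str]) -> None:
        self._needs[consumer] = set(datasets)

    def dataset_updated(self, uri: str) -> list[str]:
        """Record an update from any producer; return consumers that should run now."""
        triggered = []
        for consumer, needs in self._needs.items():
            if uri in needs:
                self._pending[consumer].add(uri)
                if self._pending[consumer] >= needs:  # every required dataset seen
                    triggered.append(consumer)
                    self._pending[consumer].clear()  # wait for a fresh set of updates
        return triggered
```

The consumer only knows dataset URIs. The producing team can rename or split its DAGs without breaking anyone downstream.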
Observability
Each run is a story. Capture:
- Status: success / failed / running / skipped — for each task
- Duration: detect creep; if a job that takes 10m suddenly takes 2h, find out
- Inputs / outputs: what date / partition was processed
- Logs: stdout/stderr, ideally streamed live for long-running
- Lineage: which assets did this affect downstream
Wire into Monitoring:
Airflow → StatsD / Prometheus exporter → Grafana
Dagster → built-in metrics + OpenTelemetry export

Alerts on: failed runs (especially repeated), duration anomalies, missed schedules (DAG didn't start when expected), backlog (tasks queued beyond threshold).
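A duration-anomaly check can start very simply: compare the latest run against the median of recent runs. This sketch uses an arbitrary factor of 3 and a minimum history of 5 runs, both assumptions to tune. Many setups do this in the metrics and alerting layer rather than in Python.

```python
from statistics import median


def is_duration_anomaly(
    history_s: list[float],
    latest_s: float,
    factor: float = 3.0,
    min_history: int = 5,
) -> bool:
    """Flag the latest run if it took more than `factor` times the recent median."""
    if len(history_s) < min_history:
        return False  # not enough data to judge
    return latest_s > factor * median(history_s)
```

The median is used instead of the mean so that one earlier slow run doesn't raise the baseline and hide the next regression.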
SLAs and Freshness
For business-critical pipelines:
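```python
# Airflow 2.x (task-level SLAs were removed in Airflow 3)
from datetime import timedelta

from airflow.operators.python import PythonOperator

PythonOperator(
    task_id="critical_report",
    sla=timedelta(hours=2),  # alert if not done within 2h of the scheduled time
    # ... other arguments
)
```

```python
# Dagster
from dagster import FreshnessPolicy, asset


@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=120))
def daily_revenue():  # upstream inputs elided
    ...
```

The orchestrator alerts the on-call if the deadline is missed. The dashboard isn't fresh? You know before the executive does.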
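The freshness rule itself is small: an asset is stale when it has never been materialized, or when its last successful update is older than the allowed lag. This plain-Python sketch uses a hypothetical `is_stale` helper. The current time can be passed in so the rule is testable.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    last_success: Optional[datetime],
    max_lag: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """True if the asset has never materialized or is older than max_lag."""
    now = now or datetime.now(timezone.utc)
    return last_success is None or now - last_success > max_lag
```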
Data Quality Gates
A task can fail the run when data quality is off:
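```python
from airflow.decorators import task
from airflow.exceptions import AirflowFailException


@task
def quality_check(ds=None):  # Airflow injects the run's logical date as ds
    # check_row_count, EXPECTED_MIN and has_nulls_in_required_columns are hypothetical
    rows = check_row_count(ds)
    if rows < EXPECTED_MIN:
        # AirflowFailException fails the task immediately, skipping remaining retries
        raise AirflowFailException(f"Only {rows} rows; expected >= {EXPECTED_MIN}")
    if has_nulls_in_required_columns(ds):
        raise AirflowFailException("Required columns have NULLs")
```

The downstream publish_to_bi task waits on quality_check, so bad data doesn't reach dashboards.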
Wire to dbt tests, Great Expectations, Soda, or custom checks.
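Independent of any framework, a quality gate is a function that collects every violation instead of stopping at the first one, so the alert shows the whole picture. This plain-Python sketch works over a list of row dicts. The column names and the `min_rows` threshold are hypothetical. The calling task would raise (AirflowFailException in Airflow) if the returned list is non-empty.

```python
def quality_violations(
    rows: list[dict],
    required_columns: list[str],
    min_rows: int,
) -> list[str]:
    """Return human-readable violations; an empty list means the gate passes."""
    problems = []
    if len(rows) < min_rows:
        problems.append(f"only {len(rows)} rows; expected >= {min_rows}")
    for column in required_columns:
        nulls = sum(1 for row in rows if row.get(column) is None)
        if nulls:
            problems.append(f"{column}: {nulls} NULL value(s)")
    return problems
```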
Anti-Patterns
Cron-style thinking in a DAG world. Scheduling tasks to "start late enough that everything's done." Use sensors / explicit dependencies.
XCom as a data pipeline. Pushing dataframes through XCom kills the metadata DB. Pass references (S3 paths, warehouse tables), not data.
Long-running tasks holding workers. A task that waits 6 hours holds a worker the whole time. Use sensors in reschedule mode or break into smaller tasks.
The God DAG. One 500-task DAG that no one understands. Decompose by concern (extract DAG, transform DAG, publish DAG) — and trigger via dataset/sensor.
No backfill thinking. Tasks that assume "today" but can't be re-run for an arbitrary date. Pass the partition explicitly.
Orchestrator does the work. pandas.read_sql 50GB in the orchestrator worker → OOM. Push compute to the warehouse / Spark / dedicated workers.
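The difference is easiest to see side by side. This sketch uses sqlite3 as a stand-in warehouse; the table and column names are hypothetical. The first function pulls every row into the worker. The second asks the database to aggregate and receives only one row per group.

```python
import sqlite3


def totals_in_worker(conn: sqlite3.Connection) -> dict[str, float]:
    # Anti-pattern: every row crosses the wire and is summed in the worker.
    totals: dict[str, float] = {}
    for region, amount in conn.execute("SELECT region, amount FROM orders"):
        totals[region] = totals.get(region, 0.0) + amount
    return totals


def totals_in_warehouse(conn: sqlite3.Connection) -> dict[str, float]:
    # Pushdown: the database aggregates; the worker receives one row per region.
    return dict(conn.execute("SELECT region, SUM(amount) FROM orders GROUP BY region"))
```

Both return the same answer, but only the second keeps the worker's memory and network cost proportional to the result rather than the input.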
Cron-as-code orphans. People sneak cron jobs onto servers because the orchestrator is annoying. The orchestrator must be easier than crontab -e.
What's Next
- Best Practices — code organization, testing, observability, capacity, scaling, pitfalls