Patterns
Multi-backend routing, sampling strategies, PII masking, logs-to-metrics, edge vs aggregator topology, buffering
The patterns that make pipelines pay for themselves and survive production traffic.
Topology: Edge + Aggregator
Two common deployment shapes:
Edge-only
```
[app] → [agent on host] → [vendor backend]
```
Vector or Fluent Bit runs on every node and ships directly to the backend. Simple, but every agent talks to the backend independently. Connection storms hit the backend during incidents, and there is no central place to apply enrichment.
Edge + aggregator (recommended at scale)
```
[app] → [agent on host] → [central aggregator] → [vendor backend]
                                               → [archive]
                                               → [SIEM]
```
Edge agents stay simple: they just collect and forward. Central aggregators handle transforms, sampling, fan-out, and buffering. Pros:
- One place to change routing or transforms
- Aggregator buffers during backend outages (edge agents are stateless)
- Vendor connections from few aggregators, not thousands of agents
- Easier secrets handling (API keys live in aggregator only)
Aggregators usually scale horizontally. A durable queue such as Kafka (or NATS JetStream) in front of them spreads load across instances and provides at-least-once delivery.
Routing by Content
A single source, many sinks, based on what's in the event:
```toml
# Vector
[sources.all_logs]
type = "kubernetes_logs"

# Errors → SIEM and Datadog
[transforms.errors]
type = "filter"
inputs = ["all_logs"]
# to_int(...) ?? 0 keeps the comparison valid when the field is missing or not numeric
condition = '.level == "error" || (to_int(."http.status_code") ?? 0) >= 500'

# Audit events → SIEM only
[transforms.audit]
type = "filter"
inputs = ["all_logs"]
condition = 'starts_with(string(.event) ?? "", "audit.")'

[sinks.siem]
type = "splunk_hec_logs"
inputs = ["errors", "audit"]
# ...

# Everything → S3 cheap archive
[sinks.archive]
type = "aws_s3"
inputs = ["all_logs"]
# ...

# Non-audit, non-debug → Datadog (events without an `event` field still pass)
[transforms.dd_filter]
type = "filter"
inputs = ["all_logs"]
condition = '!starts_with(string(.event) ?? "", "audit.") && .level != "debug"'

[sinks.datadog]
type = "datadog_logs"
inputs = ["dd_filter"]
```
The same data is cut differently for different consumers, so you can cost-optimize each downstream independently.
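The routing decisions can be checked outside Vector. Below is a minimal Python sketch of the same filters, assuming events are flat dicts. The returned strings are simply the sink IDs from the config, not a Vector API. One event can match several routes, and each sink receives its own cut.

```python
from typing import Any


def route(event: dict[str, Any]) -> list[str]:
    """Return the sink IDs an event reaches, mirroring the filters above."""
    sinks = ["archive"]  # everything goes to the cheap S3 archive

    level = event.get("level")
    status = event.get("http.status_code")
    name = event.get("event")
    is_audit = isinstance(name, str) and name.startswith("audit.")
    is_error = level == "error" or (isinstance(status, int) and status >= 500)

    if is_error or is_audit:
        sinks.append("siem")  # errors and audit events
    if not is_audit and level != "debug":
        sinks.append("datadog")  # non-audit, non-debug
    return sinks
```

For example, an audit login reaches only the archive and the SIEM, and a debug line reaches only the archive.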
PII Masking
Compliance often demands that PII never reaches certain backends. Mask at the pipeline:
```toml
# Vector VRL
[transforms.mask_pii]
type = "remap"
inputs = ["all_logs"]
source = '''
# Mask credit cards (any contiguous 13-19 digit run; spaced or dashed numbers slip through)
.message = replace(string!(.message), r'\b\d{13,19}\b', "[REDACTED:CC]")
# Mask emails
.message = replace(string!(.message), r'[\w.+-]+@[\w-]+\.[\w.-]+', "[REDACTED:EMAIL]")
# Mask SSNs
.message = replace(string!(.message), r'\b\d{3}-\d{2}-\d{4}\b', "[REDACTED:SSN]")
# Drop known-sensitive fields entirely
del(.user_password)
del(.ssn)
'''
```
The OTel Collector's transform processor, driven by OTTL (OpenTelemetry Transformation Language), can do the same. Test these transforms carefully: false negatives leak PII, and false positives obscure debugging.
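Masking patterns are easiest to trust when they are unit-tested against known samples before deployment. Below is a minimal Python sketch that uses the same three regexes. It assumes Python's `re` treats them the same way as VRL's regex engine, which should hold for patterns this simple, but your VRL tests are the real check.

```python
import re

# Same patterns as the VRL transform, applied in the same order.
CARD = re.compile(r"\b\d{13,19}\b")
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
SSN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")


def mask(message: str) -> str:
    """Redact card numbers, emails, and SSNs from a log message."""
    message = CARD.sub("[REDACTED:CC]", message)
    message = EMAIL.sub("[REDACTED:EMAIL]", message)
    return SSN.sub("[REDACTED:SSN]", message)
```

The useful tests pin down both kinds of mistake. A card number written with spaces is a false negative that leaks through. A long numeric order ID is a false positive that gets redacted.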
Logs-to-Metrics
Logs are expensive; metrics are cheap. Convert at the pipeline:
```toml
[transforms.checkout_metric]
type = "log_to_metric"
inputs = ["all_logs"]

[[transforms.checkout_metric.metrics]]
field = "event"
type = "counter"
name = "events_total"
tags = { event = "{{event}}", status = "{{status}}" }

[[transforms.checkout_metric.metrics]]
field = "duration_ms"
type = "histogram"
name = "request_duration_ms"
tags = { route = "{{route}}" }

# Histogram bucket bounds are set where metrics are exported, not in log_to_metric
[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["checkout_metric"]
address = "0.0.0.0:9598"
buckets = [10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0]
```
Result: a stream of structured logs becomes a small set of Prometheus-style counters and histograms. Alert on the metrics, and keep the logs only for debugging.
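Reduced to its core, the conversion is a counter keyed by `(event, status)` plus a per-route histogram with the bucket bounds above. Below is a Python sketch that assumes logs arrive as dicts. It stores per-bucket (non-cumulative) counts with a final `+Inf` slot, whereas Prometheus exposes cumulative `le` buckets.

```python
from bisect import bisect_left
from collections import Counter

BUCKETS = [10, 25, 50, 100, 250, 500, 1000, 2500]  # upper bounds in ms


def logs_to_metrics(logs):
    """Fold structured logs into an events counter and per-route duration histograms."""
    events_total = Counter()
    histograms = {}  # route -> per-bucket counts; the last slot is +Inf
    for log in logs:
        events_total[(log.get("event"), log.get("status"))] += 1
        if "duration_ms" in log:
            counts = histograms.setdefault(log.get("route"), [0] * (len(BUCKETS) + 1))
            # bisect_left picks the first bound >= the value, i.e. Prometheus "le" semantics
            counts[bisect_left(BUCKETS, log["duration_ms"])] += 1
    return events_total, histograms
```

Thousands of log lines collapse into a handful of numbers per label combination. That is where the cost saving comes from.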
Tail-Based Trace Sampling
Most trace sampling happens at trace start (head sampling), before you know which traces will turn out to be interesting. Tail sampling buffers spans and decides on the whole trace once it has had time to complete (`decision_wait`):
```yaml
# OpenTelemetry Collector
processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 10000
    policies:
      # Always keep errors
      - name: errors
        type: status_code
        status_code: { status_codes: [ERROR] }
      # Always keep slow
      - name: slow
        type: latency
        latency: { threshold_ms: 1000 }
      # Health checks at 0%: policies are OR-ed, so this one never keeps a trace,
      # and the baseline below still keeps ~1% of them. Dropping them entirely
      # needs an inverted-match policy.
      - name: healthchecks
        type: and
        and:
          and_sub_policy:
            - { name: hc-match, type: string_attribute, string_attribute: { key: http.route, values: ["/health"] } }
            - { name: hc-sample, type: probabilistic, probabilistic: { sampling_percentage: 0 } }
      # 1% of everything else
      - name: baseline
        type: probabilistic
        probabilistic: { sampling_percentage: 1 }
```
This keeps the diagnostic value (every error, every slow request) at a fraction of the cost.
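The policy logic can be modeled as a single decision over a buffered trace, where any matching policy keeps it. Below is a rough Python model, not the processor's code. It assumes each span carries a duration and an error flag, uses the longest span as a stand-in for trace latency, and hashes the trace ID so that every span of a trace gets the same baseline verdict.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Span:
    trace_id: str
    duration_ms: float
    is_error: bool = False


def baseline_bucket(trace_id: str) -> int:
    """Map a trace ID to [0, 10000) deterministically."""
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % 10_000


def keep_trace(spans: list[Span], slow_ms: float = 1000, baseline_pct: float = 1.0) -> bool:
    """Decide on a complete, non-empty trace; policies are OR-ed."""
    if any(span.is_error for span in spans):
        return True  # always keep errors
    if max(span.duration_ms for span in spans) >= slow_ms:
        return True  # always keep slow traces
    return baseline_bucket(spans[0].trace_id) < baseline_pct * 100  # ~1% of the rest
```

The function takes the whole trace as input, which is why traces must sit in memory until `decision_wait` expires.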
Caveats: tail sampling holds traces in memory until decision time, which limits throughput per collector. It also requires consistent routing, so every span of a trace must reach the same collector. Common ways to achieve this are Kafka partitioned by trace ID or the Collector's load-balancing exporter keyed on trace ID.
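Consistent routing only requires that the collector choice be a pure function of the trace ID. Below is a minimal Python sketch using modulo hashing. The collector names are hypothetical.

```python
import hashlib


def collector_for(trace_id: str, collectors: list[str]) -> str:
    """Pick a collector from the trace ID alone, so all spans of a trace meet in one place."""
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    return collectors[int.from_bytes(digest[:8], "big") % len(collectors)]
```

Plain modulo hashing remaps most traces whenever the pool size changes, which splits in-flight traces during scale-out. A consistent-hash ring keeps most assignments stable across such changes.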
Buffering for Backend Outages
A backend goes down. What happens to your data?
Without buffering: dropped. With on-disk buffering, the pipeline holds it until the backend recovers.
```toml
[sinks.datadog]
type = "datadog_logs"
inputs = ["all_logs"]
buffer.type = "disk"
buffer.max_size = 1073741824       # 1 GiB
buffer.when_full = "drop_newest"   # or "block"
```
Important: choose the failure mode explicitly. `block` backpressures into the rest of the pipeline. `drop_newest` discards incoming events and keeps the oldest buffered data, which is good for archives. Vector's buffers do not offer a drop-oldest mode, so "keep the newest data for live debugging during recovery" is not available from this setting.
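The three full-buffer policies named above differ only in what happens when the buffer is full. Below is a toy Python model that assumes an in-memory queue capped by event count. Vector's disk buffer is capped by bytes and is not implemented like this.

```python
from collections import deque


class BoundedBuffer:
    """Toy model of full-buffer policies; push() returning False means backpressure."""

    def __init__(self, max_events: int, when_full: str = "block"):
        if when_full not in ("block", "drop_newest", "drop_oldest"):
            raise ValueError(f"unknown policy: {when_full}")
        self.max_events = max_events
        self.when_full = when_full
        self.items = deque()
        self.dropped = 0

    def push(self, event) -> bool:
        if len(self.items) < self.max_events:
            self.items.append(event)
            return True
        if self.when_full == "block":
            return False  # upstream must slow down and retry later
        self.dropped += 1
        if self.when_full == "drop_newest":
            return True  # incoming event discarded; oldest data kept
        self.items.popleft()  # drop_oldest: evict the oldest to make room
        self.items.append(event)
        return True
```

Only `block` loses nothing, and it pushes the problem upstream. Both drop policies keep the pipeline moving, at the cost of a counted loss that you should export as a metric.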
Multi-Tenancy
A platform team running a pipeline for multiple product teams:
```toml
[sources.k8s_logs]
type = "kubernetes_logs"

# Tag each event with its owning team, taken from the pod's `team` label
[transforms.tag_team]
type = "remap"
inputs = ["k8s_logs"]
source = '''
.team = .kubernetes.pod_labels.team || "platform"
'''

# Route per team
[transforms.team_payments]
type = "filter"
inputs = ["tag_team"]
condition = '.team == "payments"'

[sinks.payments_datadog]
type = "datadog_logs"
inputs = ["team_payments"]
default_api_key = "${PAYMENTS_DD_KEY}"

[transforms.team_growth]
type = "filter"
inputs = ["tag_team"]
condition = '.team == "growth"'

[sinks.growth_datadog]
type = "datadog_logs"
inputs = ["team_growth"]
default_api_key = "${GROWTH_DD_KEY}"
```
Each team owns its backend account and API key, and the pipeline only routes. That makes per-team chargeback straightforward.
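Chargeback needs a per-team volume figure. Below is a minimal Python sketch that tags events the same way as the remap above (the `team` label, defaulting to `platform`). It uses serialized JSON size as the billing unit, which is a hypothetical cost proxy because real vendor billing varies.

```python
import json
from collections import Counter


def team_for(event: dict) -> str:
    """Owning team from the pod's `team` label, defaulting to 'platform'."""
    labels = (event.get("kubernetes") or {}).get("pod_labels") or {}
    return labels.get("team") or "platform"


def bytes_by_team(events: list[dict]) -> Counter:
    """Sum serialized event size per team as a simple chargeback basis."""
    usage = Counter()
    for event in events:
        usage[team_for(event)] += len(json.dumps(event).encode("utf-8"))
    return usage
```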
Replay and Reprocessing
A genuine superpower: archive everything to S3 (cheap), then replay subsets through the pipeline when needed. Cribl markets this heavily; OpenTelemetry + S3 source + reprocessing config achieves it.
Use cases:
- Forgot to capture a field, want to re-ingest with new parsing
- New analysis tool wants old data
- Incident reconstruction needs raw events
The archive is the source of truth; the pipeline is the query engine over time.
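The first use case, re-ingesting with new parsing, comes down to three steps: select a time window from the archive, run a new parser over the raw events, and send the results on. Below is a Python sketch that assumes archived records keep the raw line plus an ISO-8601 timestamp. The `order_id` field stands in for a hypothetical field the original parser missed.

```python
import re
from datetime import datetime

# Hypothetical new parsing rule for a field the original pipeline ignored.
ORDER_ID = re.compile(r"order_id=(\w+)")


def replay(archive, start: datetime, end: datetime):
    """Re-parse archived raw events whose timestamp falls in [start, end)."""
    for record in archive:
        ts = datetime.fromisoformat(record["ts"])
        if start <= ts < end:
            match = ORDER_ID.search(record["raw"])
            yield {**record, "order_id": match.group(1) if match else None}
```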
Schema Normalization
Different services emit different shapes. The pipeline normalizes:
```toml
[transforms.normalize]
type = "remap"
inputs = ["all_logs"]
source = '''
# Different services name latency differently; pick a canonical name
# (this assumes every source already reports milliseconds)
.duration_ms = .duration_ms || .response_time || .elapsed || .latency_ms
del(.response_time)
del(.elapsed)
del(.latency_ms)
# Standard severity field
.severity = downcase(string!(.level || .severity || .lvl || "info"))
del(.level)
del(.lvl)
'''
```
Downstream queries can rely on consistent field names regardless of which service emitted the log.
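The normalization is a "first present alias wins" coalesce followed by deleting the aliases. Below is a Python sketch of that idea, not a line-by-line port of the VRL. Unlike the VRL, it omits `duration_ms` instead of setting it to null when no alias is present.

```python
DURATION_ALIASES = ("duration_ms", "response_time", "elapsed", "latency_ms")
SEVERITY_ALIASES = ("level", "severity", "lvl")


def first_present(event: dict, keys, default=None):
    """Value of the first key that is present and not None."""
    for key in keys:
        if event.get(key) is not None:
            return event[key]
    return default


def normalize(event: dict) -> dict:
    out = dict(event)
    duration = first_present(out, DURATION_ALIASES)
    severity = str(first_present(out, SEVERITY_ALIASES, "info")).lower()
    for key in DURATION_ALIASES + SEVERITY_ALIASES:
        out.pop(key, None)  # drop every alias, then write the canonical names
    if duration is not None:
        out["duration_ms"] = duration
    out["severity"] = severity
    return out
```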
Edge Filtering for Cost
Sometimes the cheapest reduction is the one done before the data even leaves the source. Fluent Bit at each K8s node:
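```
# fluent-bit on each node, drops noise before the central aggregator
# (assumes the kubernetes filter has already added the $kubernetes map)
[FILTER]
    Name    grep
    Match   k8s.logs.*
    Exclude $kubernetes['container_name'] healthcheck-sidecar

[FILTER]
    Name    grep
    Match   k8s.logs.*
    Exclude log healthcheck

# Only forward what matters; the http output defaults to msgpack, so send JSON
[OUTPUT]
    Name    http
    Match   *
    Host    central-aggregator
    Port    8080
    Format  json
```
Every event dropped here is bandwidth and aggregator load you never pay for. If health-check noise makes up 30% of a node's log volume, filtering it at the edge saves that 30%.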
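To estimate the savings before rollout, run a sample of node logs through the same exclusion rules. Below is a Python sketch that assumes records are already decoded into dicts. It uses unanchored regex search, which is assumed to approximate how the grep filter matches.

```python
import re

# (key path, regex) pairs mirroring the two grep filters; any match drops the record.
EXCLUDE_RULES = [
    (("kubernetes", "container_name"), re.compile(r"healthcheck-sidecar")),
    (("log",), re.compile(r"healthcheck")),
]


def lookup(record: dict, path):
    """Follow a nested key path, returning None if any step is missing."""
    value = record
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def keep(record: dict) -> bool:
    for path, pattern in EXCLUDE_RULES:
        value = lookup(record, path)
        if isinstance(value, str) and pattern.search(value):
            return False
    return True
```

The fraction of sampled records where `keep` returns `False` is the share of bandwidth saved.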
Anti-Patterns
Massive transform graphs doing work that belongs at the source. If your pipeline has 50 transform steps to parse legacy log formats, fix the apps. Pipelines should focus on routing and sampling, not heroic salvage.
Pipeline as code instead of config. Custom plugins in a vendor pipeline mean lock-in. Stick to declarative config; if you need code, write a separate microservice.
No backpressure plan. Backend is slow → pipeline buffer fills → memory pressure → OOM → entire pipeline restarts → data lost. Define the queueing strategy upfront.
Schema-less downstream. "We'll just send everything as JSON." Then the same field is a string in some events and a number in others, queries are slow, and costs are unbounded. Define a schema and enforce it in the pipeline.
Pipeline not observing itself. The pipeline drops 30% of events during a memory-pressure incident, and no metric tells you. Always monitor the pipeline's own health.
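The core health check is arithmetic. Compare what each stage received with what it sent, after subtracting events you dropped on purpose (filters, sampling). Below is a Python sketch. Where the counters come from, such as the pipeline's own internal metrics, is left as an assumption about your setup.

```python
def drop_ratio(received: int, sent: int, filtered: int = 0) -> float:
    """Fraction of events lost unintentionally between input and output."""
    if received == 0:
        return 0.0
    lost = received - sent - filtered
    return max(0.0, lost / received)


def should_alert(received: int, sent: int, filtered: int = 0, threshold: float = 0.01) -> bool:
    return drop_ratio(received, sent, filtered) > threshold
```

Without the `filtered` term, every intentional filter would look like data loss. That is why the counters need to distinguish deliberate drops from accidental ones.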
What's Next
- Best Practices — reliability, capacity, monitoring the pipeline, pitfalls, scaling