Event Processing Pipeline
The deterministic event processing pipeline at the core of TerraGuard -- how raw disaster data from GDACS, USGS, and NHC is normalized, deduplicated, correlated, and persisted.
Overview
The event processing pipeline is the foundation of TerraGuard. It transforms raw disaster feeds from multiple authoritative sources into a single, deduplicated stream of canonical disaster events. The pipeline is entirely deterministic -- no AI or heuristics involved -- ensuring that every event can be traced back to its original source data.
The Event Processor is a standalone Go binary that replaced an earlier architecture built on AWS Lambda, SQS, and DynamoDB. The single-binary approach eliminates cold starts, reduces operational complexity, and makes the entire pipeline testable with a single `make test` command.
Source Adapter Pattern
Each data source implements the SourceAdapter interface, which defines how to fetch, parse, and normalize events from that source. This pattern makes adding new sources straightforward -- implement the interface, register the adapter, and the rest of the pipeline handles it.
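The interface definition itself is not shown on this page, so the following is only a minimal Go sketch of the pattern. The method names (Name, PollInterval, Fetch, Parse, Normalize), the Register and RegisteredSources helpers, and the stubAdapter type are all hypothetical; they illustrate the fetch, parse, normalize split and the "implement, register, done" workflow, not TerraGuard's actual API.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// NormalizedEvent is trimmed to two fields for this sketch.
type NormalizedEvent struct {
	Source        string
	SourceEventID string
}

// SourceAdapter is a hypothetical rendering of the interface described above.
type SourceAdapter interface {
	Name() string
	PollInterval() time.Duration
	Fetch(ctx context.Context) ([]byte, error)         // download the raw feed
	Parse(raw []byte) ([][]byte, error)                // split the feed into per-event payloads
	Normalize(payload []byte) (NormalizedEvent, error) // map one payload to the common structure
}

// registry holds every adapter the pipeline will poll.
var registry = map[string]SourceAdapter{}

// Register adds an adapter; registering the same name twice is a programming error.
func Register(a SourceAdapter) {
	if _, dup := registry[a.Name()]; dup {
		panic("adapter already registered: " + a.Name())
	}
	registry[a.Name()] = a
}

// RegisteredSources returns adapter names in a stable order.
func RegisteredSources() []string {
	names := make([]string, 0, len(registry))
	for n := range registry {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// stubAdapter is a placeholder implementation used only to exercise the registry.
type stubAdapter struct {
	name  string
	every time.Duration
}

func (s stubAdapter) Name() string                              { return s.name }
func (s stubAdapter) PollInterval() time.Duration               { return s.every }
func (s stubAdapter) Fetch(ctx context.Context) ([]byte, error) { return []byte("{}"), nil }
func (s stubAdapter) Parse(raw []byte) ([][]byte, error)        { return [][]byte{raw}, nil }
func (s stubAdapter) Normalize(p []byte) (NormalizedEvent, error) {
	return NormalizedEvent{Source: s.name}, nil
}

func main() {
	Register(stubAdapter{name: "usgs", every: 2 * time.Minute})
	Register(stubAdapter{name: "gdacs", every: 5 * time.Minute})
	fmt.Println(RegisteredSources())
}
```

Keeping the interface narrow means the rest of the pipeline only ever sees NormalizedEvent values, regardless of whether the feed was RSS, GeoJSON, or KML.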
Each adapter runs on its own poll schedule:
| Source | Poll Interval | Event Types | Feed Format |
|---|---|---|---|
| GDACS | 5 minutes | EQ, TC, FL, VO, DR, WF | RSS + JSON API |
| USGS | 2 minutes | EQ only | GeoJSON |
| NHC | 10 minutes | TC only | RSS + KML advisories |
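The processor schedules polls with built-in Go tickers (see the comparison table at the end of this page). A hedged sketch of what per-source scheduling could look like: one goroutine and one time.Ticker per source, using the intervals from the table above. The startPoller function, the pollSchedule map, and the choice to poll once immediately at startup are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pollSchedule mirrors the poll intervals in the table above.
var pollSchedule = map[string]time.Duration{
	"gdacs": 5 * time.Minute,
	"usgs":  2 * time.Minute,
	"nhc":   10 * time.Minute,
}

// startPoller calls poll once immediately, then on every tick, until stop is called.
// The immediate first poll is an assumption so a restart does not wait a full interval.
func startPoller(source string, every time.Duration, poll func(source string)) (stop func()) {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		poll(source)
		for {
			select {
			case <-ticker.C:
				poll(source)
			case <-done:
				return
			}
		}
	}()
	var once sync.Once
	return func() { once.Do(func() { close(done) }) } // safe to call more than once
}

func main() {
	events := make(chan string, len(pollSchedule))
	var stops []func()
	for src, every := range pollSchedule {
		stops = append(stops, startPoller(src, every, func(s string) { events <- s }))
	}
	for range pollSchedule {
		fmt.Println("polled", <-events)
	}
	for _, stop := range stops {
		stop()
	}
}
```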
The NormalizedEvent Structure
Every adapter produces a NormalizedEvent -- the common currency of the pipeline. This structure captures the essential attributes of any disaster event regardless of source:
- source and source_event_id -- provenance tracking
- event_type -- enum: EQ, TC, FL, VO, DR, WF
- title and description -- human-readable summary
- location -- latitude/longitude as a PostGIS point
- event_onset -- when the event occurred or was first detected
- severity_level -- source-reported alert level (RED, ORANGE, GREEN)
- measurements -- type-specific data (magnitude, wind speed, rainfall, etc.)
- raw_payload_hash -- MD5 of the original payload for dedup
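One way to express this structure in Go is sketched below. The field list comes from the bullets above; the Go type names, the JSON tags, the split of location into Latitude/Longitude floats (assumed to be converted to a PostGIS point at write time), and the Validate method are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

type EventType string

const (
	EventTypeEQ EventType = "EQ" // earthquake
	EventTypeTC EventType = "TC" // tropical cyclone
	EventTypeFL EventType = "FL" // flood
	EventTypeVO EventType = "VO" // volcano
	EventTypeDR EventType = "DR" // drought
	EventTypeWF EventType = "WF" // wildfire
)

type SeverityLevel string

const (
	SeverityRed    SeverityLevel = "RED"
	SeverityOrange SeverityLevel = "ORANGE"
	SeverityGreen  SeverityLevel = "GREEN"
)

// NormalizedEvent is a sketch of the pipeline's common currency.
type NormalizedEvent struct {
	Source         string             `json:"source"`
	SourceEventID  string             `json:"source_event_id"`
	EventType      EventType          `json:"event_type"`
	Title          string             `json:"title"`
	Description    string             `json:"description"`
	Latitude       float64            `json:"latitude"` // becomes a PostGIS point on write
	Longitude      float64            `json:"longitude"`
	EventOnset     time.Time          `json:"event_onset"`
	SeverityLevel  SeverityLevel      `json:"severity_level"`
	Measurements   map[string]float64 `json:"measurements"` // e.g. "magnitude", "wind_speed"
	RawPayloadHash string             `json:"raw_payload_hash"`
}

// Validate rejects events that the later pipeline steps could not handle.
func (e NormalizedEvent) Validate() error {
	switch e.EventType {
	case EventTypeEQ, EventTypeTC, EventTypeFL, EventTypeVO, EventTypeDR, EventTypeWF:
	default:
		return fmt.Errorf("unknown event_type %q", e.EventType)
	}
	if e.Source == "" || e.SourceEventID == "" {
		return errors.New("source and source_event_id are required for provenance")
	}
	if e.Latitude < -90 || e.Latitude > 90 || e.Longitude < -180 || e.Longitude > 180 {
		return errors.New("location out of range")
	}
	return nil
}

func main() {
	e := NormalizedEvent{
		Source: "usgs", SourceEventID: "us-demo-1", EventType: EventTypeEQ,
		Title: "M 7.2 demo event", Latitude: 38.0, Longitude: 37.0,
		EventOnset: time.Now().UTC(), SeverityLevel: SeverityRed,
		Measurements: map[string]float64{"magnitude": 7.2},
	}
	out, _ := json.MarshalIndent(e, "", "  ")
	fmt.Println(string(out), e.Validate())
}
```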
Pipeline Steps
Step 1: ID Resolution
Some data sources change event identifiers over time. USGS is the primary offender -- a preliminary earthquake ID may be reassigned a new canonical ID as review progresses. Without ID resolution, TerraGuard would create duplicate disaster events for what is actually a single earthquake.
The event_id_aliases table maintains a mapping from any known ID to its canonical form. When a normalized event arrives:
- Check if source_event_id exists in the alias table
- If yes, rewrite to the canonical ID
- If no, register it as a new canonical ID
- When USGS sends an ID change notification, update the alias mapping
This ensures that all records for a single physical event are grouped under one disaster event, regardless of how the source's ID scheme evolves.
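The resolution rules above can be sketched with an in-memory map standing in for the event_id_aliases table. The AliasTable type, its methods, the per-source scoping of IDs, and the example IDs are hypothetical; a production version would read and write the table inside the pipeline's transaction.

```go
package main

import "fmt"

// aliasKey scopes IDs per source, since two feeds may reuse the same string.
type aliasKey struct{ source, id string }

// AliasTable maps any known ID to its canonical ID (not safe for concurrent use).
type AliasTable struct {
	canonical map[aliasKey]string
}

func NewAliasTable() *AliasTable { return &AliasTable{canonical: map[aliasKey]string{}} }

// Resolve rewrites a known ID to its canonical form; an unseen ID is
// registered as its own canonical ID.
func (t *AliasTable) Resolve(source, id string) string {
	k := aliasKey{source, id}
	if c, ok := t.canonical[k]; ok {
		return c
	}
	t.canonical[k] = id
	return id
}

// ApplyIDChange handles an ID change notification: every ID that pointed at
// oldID's canonical form is repointed to newID, and newID becomes canonical.
func (t *AliasTable) ApplyIDChange(source, oldID, newID string) {
	old := t.Resolve(source, oldID)
	for k, c := range t.canonical {
		if k.source == source && c == old {
			t.canonical[k] = newID
		}
	}
	t.canonical[aliasKey{source, newID}] = newID
}

func main() {
	ids := NewAliasTable()
	fmt.Println(ids.Resolve("usgs", "prelim-001")) // prelim-001 (registered as canonical)
	ids.ApplyIDChange("usgs", "prelim-001", "final-001")
	fmt.Println(ids.Resolve("usgs", "prelim-001")) // final-001
}
```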
Step 2: Hash Deduplication
Every raw payload is hashed with MD5. The hash is checked against raw_event_records.payload_hash. If an exact match exists, the record is silently dropped.
This handles the common case where a source poll returns the same data multiple times. GDACS in particular frequently returns identical payloads across consecutive polls when an event has not been updated.
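A minimal sketch of the hash check, assuming the payload is hashed exactly as received. The payloadHash and hashSet names are hypothetical, and the in-memory set only stands in for the lookup against raw_event_records.payload_hash.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sync"
)

// payloadHash returns the hex-encoded MD5 digest of the exact payload bytes.
// MD5 serves as a fast change detector here, not as a security control.
func payloadHash(raw []byte) string {
	sum := md5.Sum(raw)
	return hex.EncodeToString(sum[:])
}

// hashSet stands in for the raw_event_records.payload_hash lookup.
type hashSet struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newHashSet() *hashSet { return &hashSet{seen: map[string]struct{}{}} }

// IsDuplicate reports whether raw was seen before, and records it if not.
func (h *hashSet) IsDuplicate(raw []byte) bool {
	key := payloadHash(raw)
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.seen[key]; ok {
		return true
	}
	h.seen[key] = struct{}{}
	return false
}

func main() {
	set := newHashSet()
	poll := []byte(`{"eventid":1000001,"alertlevel":"Orange"}`)
	fmt.Println(set.IsDuplicate(poll)) // false: first time this payload is seen
	fmt.Println(set.IsDuplicate(poll)) // true: identical payload on the next poll, dropped
}
```

In PostgreSQL, a unique index on payload_hash combined with INSERT ... ON CONFLICT DO NOTHING would make the check and the insert a single atomic step. Because the hash covers exact bytes, a payload that differs only in key order or whitespace is not caught here; if its timestamp is unchanged, the staleness check in Step 3 rejects it instead.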
Step 3: Staleness Check
Even when a payload is not an exact duplicate, it may contain outdated information. The staleness check compares the incoming event's timestamp against the most recent normalized record for the same source event.
If the incoming timestamp is older than or equal to the existing record, it is rejected. This prevents race conditions where a delayed API response arrives after a fresher one has already been processed.
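The rule "reject if older than or equal" can be sketched as below. The Update type and stalenessTracker are hypothetical; the in-memory map stands in for looking up the most recent normalized record, and the key would be the canonical ID produced by Step 1.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Update carries just what the staleness check needs.
type Update struct {
	Source        string
	SourceEventID string
	Timestamp     time.Time // source-reported update time
}

type stalenessTracker struct {
	mu     sync.Mutex
	latest map[[2]string]time.Time
}

func newStalenessTracker() *stalenessTracker {
	return &stalenessTracker{latest: map[[2]string]time.Time{}}
}

// Accept reports whether u is strictly newer than the latest accepted record for
// the same source event, and records it if so. Equal timestamps are rejected.
func (s *stalenessTracker) Accept(u Update) bool {
	key := [2]string{u.Source, u.SourceEventID}
	s.mu.Lock()
	defer s.mu.Unlock()
	if prev, ok := s.latest[key]; ok && !u.Timestamp.After(prev) {
		return false
	}
	s.latest[key] = u.Timestamp
	return true
}

func main() {
	tr := newStalenessTracker()
	fresh := Update{"gdacs", "1000001", time.Date(2024, 1, 1, 12, 5, 0, 0, time.UTC)}
	delayed := Update{"gdacs", "1000001", time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)}
	fmt.Println(tr.Accept(fresh))   // true: first record for this event
	fmt.Println(tr.Accept(delayed)) // false: an older response arrived late
}
```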
Step 4: Spatial-Temporal Correlation
This is the most sophisticated step in the pipeline. When a normalized event passes dedup and staleness checks, the processor must determine whether it represents a new disaster event or an update to an existing one.
The correlation engine uses PostGIS spatial queries combined with temporal windows to match incoming events against existing disaster events. The parameters are configurable per event type:
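```yaml
# correlation_config.yaml
correlation:
  earthquake:
    spatial_radius_km: 25
    temporal_window_hours: 6
    cross_source: true
  tropical_cyclone:
    spatial_radius_km: 150
    temporal_window_hours: 12
    cross_source: true
  flood:
    spatial_radius_km: 50
    temporal_window_hours: 24
    cross_source: true
  volcano:
    spatial_radius_km: 15
    temporal_window_hours: 48
    cross_source: true
  drought:
    spatial_radius_km: 200
    temporal_window_hours: 168
    cross_source: false
  wildfire:
    spatial_radius_km: 30
    temporal_window_hours: 24
    cross_source: true
```

For each incoming event, the correlation query looks for an existing disaster event of the same type whose location lies within spatial_radius_km of the incoming event and whose onset falls within temporal_window_hours of it. When cross_source is false, only existing events reported by the same source are considered.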
Cross-source matching is what makes this powerful. When GDACS reports a magnitude 7.2 earthquake in Turkey and USGS reports the same earthquake 3 minutes later with a slightly different location, the correlation engine matches them to a single disaster_event because they fall within 25 km and 6 hours of each other.
Tropical cyclones use a much larger radius (150 km) because cyclone positions can vary significantly between reporting agencies, and the storm itself spans a wide area. Droughts disable cross-source matching entirely because drought boundaries are too imprecise for spatial correlation.
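The production pipeline runs this match as a PostGIS query; in SQL the distance filter would typically use ST_DWithin on geography values, with the radius in meters. To show the logic without a database, here is an in-memory Go sketch using the haversine great-circle distance and the parameters from correlation_config.yaml. The Event and CorrelationRule types, the Correlate function, and the "closest match wins" tie-break are assumptions.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

type CorrelationRule struct {
	SpatialRadiusKm     float64
	TemporalWindowHours float64
	CrossSource         bool
}

// rules copies the values from correlation_config.yaml, keyed by event type.
var rules = map[string]CorrelationRule{
	"EQ": {25, 6, true}, "TC": {150, 12, true}, "FL": {50, 24, true},
	"VO": {15, 48, true}, "DR": {200, 168, false}, "WF": {30, 24, true},
}

type Event struct {
	ID       string
	Source   string
	Type     string
	Lat, Lon float64
	Onset    time.Time
}

const earthRadiusKm = 6371.0

// haversineKm returns the great-circle distance between two points in kilometres.
func haversineKm(lat1, lon1, lat2, lon2 float64) float64 {
	rad := func(d float64) float64 { return d * math.Pi / 180 }
	dLat, dLon := rad(lat2-lat1), rad(lon2-lon1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(rad(lat1))*math.Cos(rad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(a))
}

// Correlate returns the ID of the closest matching existing event, or "" if the
// incoming event should create a new disaster event.
func Correlate(in Event, existing []Event) string {
	rule, ok := rules[in.Type]
	if !ok {
		return ""
	}
	window := time.Duration(rule.TemporalWindowHours * float64(time.Hour))
	best, bestDist := "", math.Inf(1)
	for _, e := range existing {
		if e.Type != in.Type || (!rule.CrossSource && e.Source != in.Source) {
			continue
		}
		dt := in.Onset.Sub(e.Onset)
		if dt < 0 {
			dt = -dt
		}
		if dt > window {
			continue
		}
		if d := haversineKm(in.Lat, in.Lon, e.Lat, e.Lon); d <= rule.SpatialRadiusKm && d < bestDist {
			best, bestDist = e.ID, d
		}
	}
	return best
}

func main() {
	now := time.Now().UTC()
	existing := []Event{{ID: "de-1", Source: "gdacs", Type: "EQ", Lat: 38.0, Lon: 37.0, Onset: now}}
	incoming := Event{ID: "us-demo-1", Source: "usgs", Type: "EQ", Lat: 38.1, Lon: 37.05, Onset: now.Add(3 * time.Minute)}
	fmt.Printf("distance %.1f km, matched %q\n", haversineKm(38.0, 37.0, 38.1, 37.05), Correlate(incoming, existing))
}
```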
Step 5: Atomic Upsert
Once correlation is resolved, the processor writes all three tiers (raw, normalized, and disaster event records), together with measurements and location data, in a single database transaction:
- Insert into raw_event_records (the verbatim payload)
- Insert into normalized_event_records (the normalized form)
- Insert or update disaster_events (create if new, update if correlated)
- Insert into event_measurements (type-specific data points)
- Insert into location_data (if GeoPop data is available)
The transaction ensures that either all records are written or none are. A crash between steps cannot leave the database in an inconsistent state.
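The all-or-nothing write can be sketched with Go's database/sql, where *sql.Tx already provides Exec, Commit, and Rollback. The txn interface below is the subset of *sql.Tx the sketch needs, which lets a fakeTx show commit and rollback behaviour without a database. The writeTiers and tierStatements helpers, the SQL column names, and the ON CONFLICT clause are all hypothetical.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// txn is the subset of *sql.Tx used by writeTiers.
type txn interface {
	Exec(query string, args ...any) (sql.Result, error)
	Commit() error
	Rollback() error
}

var _ txn = (*sql.Tx)(nil) // compile-time check that *sql.Tx satisfies txn

type statement struct {
	query string
	args  []any
}

// writeTiers runs every statement in one transaction: commit if all succeed,
// roll back on the first failure so no partial write is ever visible.
func writeTiers(tx txn, stmts []statement) error {
	for _, s := range stmts {
		if _, err := tx.Exec(s.query, s.args...); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				return errors.Join(err, rbErr)
			}
			return fmt.Errorf("write failed, transaction rolled back: %w", err)
		}
	}
	return tx.Commit()
}

// tierStatements lists the five writes with hypothetical columns.
// The location_data insert is conditional on GeoPop data in the real pipeline.
func tierStatements() []statement {
	return []statement{
		{"INSERT INTO raw_event_records (payload_hash, payload) VALUES ($1, $2)", []any{"d41d8cd98f00b204e9800998ecf8427e", "{}"}},
		{"INSERT INTO normalized_event_records (source, source_event_id) VALUES ($1, $2)", []any{"usgs", "us-demo-1"}},
		{"INSERT INTO disaster_events (id, event_type) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET updated_at = now()", []any{"de-1", "EQ"}},
		{"INSERT INTO event_measurements (disaster_event_id, name, value) VALUES ($1, $2, $3)", []any{"de-1", "magnitude", 7.2}},
		{"INSERT INTO location_data (disaster_event_id) VALUES ($1)", []any{"de-1"}},
	}
}

// fakeTx records calls so the behaviour can be observed without a database.
type fakeTx struct {
	execs, failAt         int
	committed, rolledBack bool
}

func (f *fakeTx) Exec(query string, args ...any) (sql.Result, error) {
	f.execs++
	if f.execs == f.failAt {
		return nil, errors.New("simulated failure on " + query)
	}
	return nil, nil
}
func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	// With a real database: tx, err := db.Begin(); then writeTiers(tx, stmts).
	demo := &fakeTx{failAt: 4}
	err := writeTiers(demo, tierStatements())
	fmt.Println(err, demo.committed, demo.rolledBack)
}
```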
Step 6: Webhook Dispatch
After the transaction commits, the Event Processor sends a fire-and-forget HTTP POST to the Backend API's webhook endpoint. The payload includes the disaster_event_id and a flag indicating whether the event is new or updated.
The webhook is intentionally non-blocking: if the Backend API is unavailable, the webhook fails silently. The Backend API can always catch up by querying the database directly -- the source of truth is always PostgreSQL, never the webhook. The webhook endpoint is protected by X-API-Key header authentication, configured via INTERNAL_API_KEY; authentication is skipped when the key is empty, for local development.
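A hedged sketch of the fire-and-forget dispatch using net/http. The disaster_event_id field comes from the description above; the is_new field name, the endpoint URL, the 5-second client timeout, and logging the failure before discarding it are assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"
)

type webhookPayload struct {
	DisasterEventID string `json:"disaster_event_id"`
	IsNew           bool   `json:"is_new"` // hypothetical name for the new/updated flag
}

// newWebhookRequest builds the POST; the X-API-Key header is omitted when the key is empty.
func newWebhookRequest(url, apiKey string, p webhookPayload) (*http.Request, error) {
	body, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	if apiKey != "" {
		req.Header.Set("X-API-Key", apiKey)
	}
	return req, nil
}

var webhookClient = &http.Client{Timeout: 5 * time.Second}

// dispatchWebhook sends the request in a goroutine; failures are logged and never
// returned, so a down Backend API cannot block or fail the ingestion path.
func dispatchWebhook(url, apiKey string, p webhookPayload) {
	go func() {
		req, err := newWebhookRequest(url, apiKey, p)
		if err != nil {
			log.Printf("webhook: build request: %v", err)
			return
		}
		resp, err := webhookClient.Do(req)
		if err != nil {
			log.Printf("webhook: %v", err)
			return
		}
		resp.Body.Close()
	}()
}

func main() {
	key := os.Getenv("INTERNAL_API_KEY")
	req, err := newWebhookRequest("http://localhost:8080/internal/events", key, webhookPayload{DisasterEventID: "de-123", IsNew: true})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("X-API-Key") != "")
}
```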
Step 7: Post-Processing (Inngest)
The Backend API receives the webhook and dispatches a chain of Inngest jobs:
Local-first scoring: The scoring step reads the raw event payload that the Go processor already stored in PostgreSQL during ingestion and combines it with real-time GeoPop population analysis. No external API calls to USGS or GDACS are made during scoring. For non-green alerts only, USGS PAGER products are fetched as optional enrichment.
These jobs run asynchronously via Inngest, which provides retry logic, concurrency control, and observability. The post-processing pipeline is covered in detail in the following sections:
- Scoring & Priority -- How severity_score and priority are computed
- Notification Engine -- How matrix matching determines who gets alerted
- Knowledge Discovery -- How web content is found, crawled, and indexed
What This Replaced
The original pipeline used an AWS-native event-driven architecture:
| Before | After |
|---|---|
| AWS Lambda functions (per source) | Single Go binary with adapter pattern |
| SQS queues between stages | In-process pipeline with channels |
| DynamoDB for dedup state | PostgreSQL with hash indexes |
| EventBridge for scheduling | Built-in Go tickers |
| CloudWatch for correlation logic | PostGIS spatial queries |
The Go binary reduced the end-to-end latency from event detection to database write from approximately 8-12 seconds (Lambda cold starts + SQS delays) to under 500 milliseconds. It also eliminated the operational burden of managing five Lambda functions, three SQS queues, and a DynamoDB table.
Pipeline Observability
The Event Processor exposes health and metrics endpoints:
- GET /v1/health -- liveness check with source poll status
- GET /v1/metrics -- Prometheus-compatible metrics (events processed, dedup hits, correlation matches, errors per source)
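As a rough illustration of a Prometheus-compatible metrics endpoint, here is a hand-rolled counter set rendered in the Prometheus text exposition format. The metric names, the source label, and the Metrics type are hypothetical, and the "GET /v1/metrics" route pattern requires Go 1.22 or later. A real service would more likely use the official Prometheus Go client library.

```go
package main

import (
	"fmt"
	"net/http"
	"sort"
	"strings"
	"sync"
)

type counterKey struct{ name, source string }

// Metrics holds per-source counters such as events processed or dedup hits.
type Metrics struct {
	mu       sync.Mutex
	counters map[counterKey]uint64
}

func NewMetrics() *Metrics { return &Metrics{counters: map[counterKey]uint64{}} }

func (m *Metrics) Inc(name, source string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.counters[counterKey{name, source}]++
}

// Render writes counters in the Prometheus text format, sorted for stable output.
// %q matches Prometheus label escaping for plain ASCII source names.
func (m *Metrics) Render() string {
	m.mu.Lock()
	defer m.mu.Unlock()
	keys := make([]counterKey, 0, len(m.counters))
	for k := range m.counters {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].name != keys[j].name {
			return keys[i].name < keys[j].name
		}
		return keys[i].source < keys[j].source
	})
	var b strings.Builder
	last := ""
	for _, k := range keys {
		if k.name != last {
			fmt.Fprintf(&b, "# TYPE %s counter\n", k.name)
			last = k.name
		}
		fmt.Fprintf(&b, "%s{source=%q} %d\n", k.name, k.source, m.counters[k])
	}
	return b.String()
}

func (m *Metrics) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
	fmt.Fprint(w, m.Render())
}

func main() {
	m := NewMetrics()
	m.Inc("terraguard_events_processed_total", "usgs")
	mux := http.NewServeMux()
	mux.Handle("GET /v1/metrics", m) // http.ListenAndServe(":8080", mux) would serve it
	fmt.Print(m.Render())
}
```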
Every pipeline decision is logged with structured fields, making it possible to trace exactly why an event was deduplicated, correlated, or created as new.
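One way to emit such decision records is Go's standard log/slog package with a JSON handler, sketched below. The Decision type, its outcome values, and the field names are hypothetical. The ReplaceAttr hook drops the timestamp only so the output is reproducible in tests; production logs would keep it.

```go
package main

import (
	"bytes"
	"io"
	"log/slog"
	"os"
)

// Decision describes why an event was deduplicated, rejected, correlated, or created.
type Decision struct {
	Outcome       string // hypothetical values: "deduplicated", "stale", "correlated", "created"
	Source        string
	SourceEventID string
	Reason        string
}

// newDecisionLogger builds a JSON logger; dropTime removes the top-level time attribute.
func newDecisionLogger(w io.Writer, dropTime bool) *slog.Logger {
	opts := &slog.HandlerOptions{}
	if dropTime {
		opts.ReplaceAttr = func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey && len(groups) == 0 {
				return slog.Attr{} // a zero Attr is discarded by the handler
			}
			return a
		}
	}
	return slog.New(slog.NewJSONHandler(w, opts))
}

func logDecision(l *slog.Logger, d Decision) {
	l.Info("pipeline decision",
		slog.String("outcome", d.Outcome),
		slog.String("source", d.Source),
		slog.String("source_event_id", d.SourceEventID),
		slog.String("reason", d.Reason),
	)
}

// renderDecision returns the JSON line for one decision, without a timestamp.
func renderDecision(d Decision) string {
	var buf bytes.Buffer
	logDecision(newDecisionLogger(&buf, true), d)
	return buf.String()
}

func main() {
	logDecision(newDecisionLogger(os.Stdout, false),
		Decision{"deduplicated", "gdacs", "1000001", "payload hash already seen"})
}
```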