TerraGuard

Event Processing Pipeline

The deterministic event processing pipeline at the core of TerraGuard -- how raw disaster data from GDACS, USGS, and NHC is normalized, deduplicated, correlated, and persisted.

Overview

The event processing pipeline is the foundation of TerraGuard. It transforms raw disaster feeds from multiple authoritative sources into a single, deduplicated stream of canonical disaster events. The pipeline is entirely deterministic -- no AI or heuristics involved -- ensuring that every event can be traced back to its original source data.

The Event Processor is a standalone Go binary that replaced an earlier architecture built on AWS Lambda, SQS, and DynamoDB. The single-binary approach eliminates cold starts, reduces operational complexity, and makes the entire pipeline testable with a single make test.

Loading diagram...

Source Adapter Pattern

Each data source implements the SourceAdapter interface, which defines how to fetch, parse, and normalize events from that source. This pattern makes adding new sources straightforward -- implement the interface, register the adapter, and the rest of the pipeline handles it.

Loading diagram...

Each adapter runs on its own poll schedule:

SourcePoll IntervalEvent TypesFeed Format
GDACS5 minutesEQ, TC, FL, VO, DR, WFRSS + JSON API
USGS2 minutesEQ onlyGeoJSON
NHC10 minutesTC onlyRSS + KML advisories

The NormalizedEvent Structure

Every adapter produces a NormalizedEvent -- the common currency of the pipeline. This structure captures the essential attributes of any disaster event regardless of source:

  • source and source_event_id -- provenance tracking
  • event_type -- enum: EQ, TC, FL, VO, DR, WF
  • title and description -- human-readable summary
  • location -- latitude/longitude as a PostGIS point
  • event_onset -- when the event occurred or was first detected
  • severity_level -- source-reported alert level (RED, ORANGE, GREEN)
  • measurements -- type-specific data (magnitude, wind speed, rainfall, etc.)
  • raw_payload_hash -- MD5 of the original payload for dedup

Pipeline Steps

Step 1: ID Resolution

Some data sources change event identifiers over time. USGS is the primary offender -- a preliminary earthquake ID may be reassigned a new canonical ID as review progresses. Without ID resolution, TerraGuard would create duplicate disaster events for what is actually a single earthquake.

The event_id_aliases table maintains a mapping from any known ID to its canonical form. When a normalized event arrives:

  1. Check if source_event_id exists in the alias table
  2. If yes, rewrite to the canonical ID
  3. If no, register it as a new canonical ID
  4. When USGS sends an ID change notification, update the alias mapping

This ensures that all records for a single physical event are grouped under one disaster event, regardless of how the source's ID scheme evolves.

Step 2: Hash Deduplication

Every raw payload is hashed with MD5. The hash is checked against raw_event_records.payload_hash. If an exact match exists, the record is silently dropped.

This handles the common case where a source poll returns the same data multiple times. GDACS in particular frequently returns identical payloads across consecutive polls when an event has not been updated.

Loading diagram...

Step 3: Staleness Check

Even when a payload is not an exact duplicate, it may contain outdated information. The staleness check compares the incoming event's timestamp against the most recent normalized record for the same source event.

If the incoming timestamp is older than or equal to the existing record, it is rejected. This prevents race conditions where a delayed API response arrives after a fresher one has already been processed.

Step 4: Spatial-Temporal Correlation

This is the most sophisticated step in the pipeline. When a normalized event passes dedup and staleness checks, the processor must determine whether it represents a new disaster event or an update to an existing one.

The correlation engine uses PostGIS spatial queries combined with temporal windows to match incoming events against existing disaster events. The parameters are configurable per event type:

# correlation_config.yaml
correlation:
  earthquake:
    spatial_radius_km: 25
    temporal_window_hours: 6
    cross_source: true

  tropical_cyclone:
    spatial_radius_km: 150
    temporal_window_hours: 12
    cross_source: true

  flood:
    spatial_radius_km: 50
    temporal_window_hours: 24
    cross_source: true

  volcano:
    spatial_radius_km: 15
    temporal_window_hours: 48
    cross_source: true

  drought:
    spatial_radius_km: 200
    temporal_window_hours: 168
    cross_source: false

  wildfire:
    spatial_radius_km: 30
    temporal_window_hours: 24
    cross_source: true

The correlation query works as follows:

Loading diagram...

Cross-source matching is what makes this powerful. When GDACS reports a magnitude 7.2 earthquake in Turkey and USGS reports the same earthquake 3 minutes later with a slightly different location, the correlation engine matches them to a single disaster_event because they fall within 25km and 6 hours of each other.

Tropical cyclones use a much larger radius (150km) because cyclone positions can vary significantly between reporting agencies, and the storm itself spans a wide area. Droughts disable cross-source matching entirely because drought boundaries are too imprecise for spatial correlation.

Step 5: Atomic Upsert

Once correlation is resolved, the processor writes all three tiers in a single database transaction:

  1. Insert into raw_event_records (the verbatim payload)
  2. Insert into normalized_event_records (the normalized form)
  3. Insert or update disaster_events (create if new, update if correlated)
  4. Insert into event_measurements (type-specific data points)
  5. Insert into location_data (if GeoPop data is available)

The transaction ensures that either all records are written or none are. A crash between steps cannot leave the database in an inconsistent state.

Loading diagram...

Step 6: Webhook Dispatch

After the transaction commits, the Event Processor sends a fire-and-forget HTTP POST to the Backend API's webhook endpoint. The payload includes the disaster_event_id and a flag indicating whether the event is new or updated.

The webhook is intentionally non-blocking. The webhook endpoint is protected by X-API-Key header authentication (configurable via INTERNAL_API_KEY; skipped when empty for local dev). If the Backend API is unavailable, the webhook fails silently. The Backend API can always catch up by querying the database directly -- the source of truth is always PostgreSQL, never the webhook.

Step 7: Post-Processing (Inngest)

The Backend API receives the webhook and dispatches a chain of Inngest jobs:

Loading diagram...

Local-first scoring: The scoring step reads the raw event payload already stored in PostgreSQL (by the Go processor during ingestion) and combines it with real-time GeoPop population analysis. No external API calls to USGS or GDACS are made during scoring. For non-green alerts only, USGS PAGER products are fetched as optional enrichment.

These jobs run asynchronously via Inngest, which provides retry logic, concurrency control, and observability. The post-processing pipeline is covered in detail in the following sections:

What This Replaced

The original pipeline used an AWS-native event-driven architecture:

BeforeAfter
AWS Lambda functions (per source)Single Go binary with adapter pattern
SQS queues between stagesIn-process pipeline with channels
DynamoDB for dedup statePostgreSQL with hash indexes
EventBridge for schedulingBuilt-in Go tickers
CloudWatch for correlation logicPostGIS spatial queries

The Go binary reduced the end-to-end latency from event detection to database write from approximately 8-12 seconds (Lambda cold starts + SQS delays) to under 500 milliseconds. It also eliminated the operational burden of managing five Lambda functions, three SQS queues, and a DynamoDB table.

Pipeline Observability

The Event Processor exposes health and metrics endpoints:

  • GET /v1/health -- liveness check with source poll status
  • GET /v1/metrics -- Prometheus-compatible metrics (events processed, dedup hits, correlation matches, errors per source)

Every pipeline decision is logged with structured fields, making it possible to trace exactly why an event was deduplicated, correlated, or created as new.

On this page