TerraGuard

Event Processor

Go-based deterministic event ingestion service with pluggable source adapters, PostGIS spatial correlation, hash-based deduplication, and atomic upserts.

Overview

The tg-event-processor is a Go service that handles all disaster event ingestion into TerraGuard. It replaces a previous Lambda+SQS+DynamoDB architecture with a single deterministic binary that polls authoritative data sources, normalizes events into a common format, deduplicates them, correlates them with existing records using PostGIS spatial queries, and performs atomic upserts into PostgreSQL.

The service listens on port 5606 by default (configurable via PORT) and compiles to a static binary of roughly 16 MB.

Architecture


Key Features

Adapter Pattern

Every data source implements the SourceAdapter interface:

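```go
// SourceAdapter is implemented by every data source (GDACS, USGS, NHC, ...).
type SourceAdapter interface {
    // Name returns the adapter's identifier, e.g. "gdacs".
    Name() string
    // Poll fetches the current batch of raw events from the upstream source.
    Poll(ctx context.Context) ([]model.RawEvent, error)
    // Normalize converts one raw event into the common NormalizedEvent format.
    Normalize(raw model.RawEvent) (*model.NormalizedEvent, error)
    // ResolveIDs returns every known identifier for the event (e.g. USGS network aliases).
    ResolveIDs(raw model.RawEvent) []string
    // Config returns the adapter's configuration.
    Config() AdapterConfig
}
```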

Adding a new source requires implementing this interface and registering the adapter in cmd/server/main.go; no changes to the pipeline code are needed.
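To make the registration step concrete, here is a minimal, self-contained sketch of an adapter registry with case-insensitive lookup. Only the SourceAdapter method set comes from this page; the RawEvent, NormalizedEvent and AdapterConfig fields, the Registry type and stubAdapter are hypothetical stand-ins for what lives in internal/model and internal/adapter/registry.go.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// Hypothetical stand-ins for the model package types.
type RawEvent struct {
	Source string
	Data   map[string]any
}

type NormalizedEvent struct {
	SourceID string
	Type     string
}

// AdapterConfig is a hypothetical shape; the real struct is not documented here.
type AdapterConfig struct {
	Enabled      bool
	PollInterval time.Duration
}

// SourceAdapter has the same method set as the interface documented above.
type SourceAdapter interface {
	Name() string
	Poll(ctx context.Context) ([]RawEvent, error)
	Normalize(raw RawEvent) (*NormalizedEvent, error)
	ResolveIDs(raw RawEvent) []string
	Config() AdapterConfig
}

// Registry stores adapters under their lowercased name, so "GDACS" and "gdacs" resolve alike.
type Registry struct {
	mu       sync.RWMutex
	adapters map[string]SourceAdapter
}

func NewRegistry() *Registry {
	return &Registry{adapters: make(map[string]SourceAdapter)}
}

// Register adds an adapter and rejects duplicate names.
func (r *Registry) Register(a SourceAdapter) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := strings.ToLower(a.Name())
	if _, exists := r.adapters[key]; exists {
		return fmt.Errorf("adapter %q already registered", a.Name())
	}
	r.adapters[key] = a
	return nil
}

// Get looks an adapter up by name, ignoring case.
func (r *Registry) Get(name string) (SourceAdapter, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	a, ok := r.adapters[strings.ToLower(name)]
	return a, ok
}

// stubAdapter is a hypothetical adapter that always returns one fixed event.
type stubAdapter struct{ name string }

func (s stubAdapter) Name() string { return s.name }

func (s stubAdapter) Poll(ctx context.Context) ([]RawEvent, error) {
	return []RawEvent{{Source: s.name, Data: map[string]any{"id": "evt-1"}}}, nil
}

func (s stubAdapter) Normalize(raw RawEvent) (*NormalizedEvent, error) {
	id, _ := raw.Data["id"].(string)
	return &NormalizedEvent{SourceID: id, Type: "earthquake"}, nil
}

func (s stubAdapter) ResolveIDs(raw RawEvent) []string {
	id, _ := raw.Data["id"].(string)
	return []string{id}
}

func (s stubAdapter) Config() AdapterConfig {
	return AdapterConfig{Enabled: true, PollInterval: 2 * time.Minute}
}

func main() {
	reg := NewRegistry()
	if err := reg.Register(stubAdapter{name: "GDACS"}); err != nil {
		panic(err)
	}
	if a, ok := reg.Get("gdacs"); ok {
		fmt.Println("found adapter:", a.Name())
	}
}
```

Lowercasing the key at registration time is one simple way to get the case-insensitive adapter names described under Testing Endpoints.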

Deterministic Pipeline

Every event passes through the same six-stage pipeline:

  1. ID Resolution -- Resolves all known identifiers for an event. USGS earthquakes can have multiple IDs from different seismic networks (e.g., us7000abc, nn00001). The event_id_aliases table tracks these relationships.

  2. Hash Dedup -- Computes an MD5 hash of the raw event JSON. If the hash matches an existing ingestion record, the event is skipped because its payload has not changed since the last poll.

  3. Staleness Check -- Compares the source's lastUpdated timestamp against the stored value and skips events that have not been updated since the last ingestion.

  4. PostGIS Correlation -- Uses spatial and temporal queries to find matching DisasterEvent records. Thresholds are configurable per event type in config/correlation.yaml.

  5. Atomic Upsert -- Within a single database transaction, either creates a new DisasterEvent or updates an existing one, records the EventIngestion, and stores ID aliases.

  6. Webhook Notification -- POSTs to the Backend API (/api/v1/internal/event-processed) to trigger the enrichment pipeline via Inngest.
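Stages 2 and 3 are cheap checks that run before any spatial query. The sketch below illustrates them under stated assumptions: ingestionRecord is a hypothetical in-memory stand-in for the stored EventIngestion row (in the service these values come from PostgreSQL), and hashRaw and shouldSkip are illustrative names.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"time"
)

// ingestionRecord is a hypothetical stand-in for a stored EventIngestion row.
type ingestionRecord struct {
	Hash        string
	LastUpdated time.Time
}

// hashRaw returns the hex-encoded MD5 digest of the raw event JSON bytes.
func hashRaw(rawJSON []byte) string {
	sum := md5.Sum(rawJSON)
	return hex.EncodeToString(sum[:])
}

// shouldSkip applies stage 2 (hash dedup) and then stage 3 (staleness check).
// prev is nil when the event has never been ingested.
func shouldSkip(prev *ingestionRecord, rawJSON []byte, lastUpdated time.Time) (bool, string) {
	if prev == nil {
		return false, ""
	}
	if prev.Hash == hashRaw(rawJSON) {
		return true, "duplicate hash"
	}
	if !lastUpdated.After(prev.LastUpdated) {
		return true, "stale"
	}
	return false, ""
}

func main() {
	raw := []byte(`{"id":"us7000abc","mag":5.1}`)
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	prev := &ingestionRecord{Hash: hashRaw(raw), LastUpdated: t0}

	skip, reason := shouldSkip(prev, raw, t0)
	fmt.Println(skip, reason) // true duplicate hash

	changed := []byte(`{"id":"us7000abc","mag":5.3}`)
	skip, reason = shouldSkip(prev, changed, t0.Add(time.Hour))
	fmt.Println(skip, reason == "") // false true
}
```

Because the digest is taken over raw bytes, any change in upstream serialization (key order, whitespace) also registers as a change; in this sketch the staleness check still skips such events when lastUpdated has not advanced. MD5 is adequate here because it serves as change detection, not as a security control.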
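Stage 6 is governed by WEBHOOK_TIMEOUT_SECONDS and WEBHOOK_RETRIES from the Configuration section. A minimal notifier with retries might look like the following sketch. The payload fields, the X-API-Key header, the backoff policy and the reading of WEBHOOK_RETRIES as "extra attempts after the first" are all assumptions, not the behavior of internal/pipeline/webhook.go.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// payload is a hypothetical webhook body; the real schema is not documented here.
type payload struct {
	EventID string `json:"eventId"`
	Action  string `json:"action"`
}

type Notifier struct {
	URL     string
	APIKey  string
	Retries int // extra attempts after the first (assumed WEBHOOK_RETRIES semantics)
	Client  *http.Client
}

// Notify POSTs p as JSON, retrying on transport errors and 5xx responses.
// Other non-2xx responses are returned immediately because retrying will not fix them.
func (n *Notifier) Notify(ctx context.Context, p payload) error {
	body, err := json.Marshal(p)
	if err != nil {
		return err
	}
	var lastErr error
	for attempt := 0; attempt <= n.Retries; attempt++ {
		if attempt > 0 { // linear backoff between attempts (hypothetical policy)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Duration(attempt) * 50 * time.Millisecond):
			}
		}
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, n.URL, bytes.NewReader(body))
		if err != nil {
			return err
		}
		req.Header.Set("Content-Type", "application/json")
		if n.APIKey != "" {
			req.Header.Set("X-API-Key", n.APIKey) // hypothetical header name
		}
		resp, err := n.Client.Do(req)
		if err != nil {
			lastErr = err
			continue
		}
		resp.Body.Close()
		switch {
		case resp.StatusCode >= 500:
			lastErr = fmt.Errorf("backend returned %s", resp.Status)
		case resp.StatusCode >= 300:
			return fmt.Errorf("webhook rejected: %s", resp.Status)
		default:
			return nil
		}
	}
	return fmt.Errorf("webhook failed after %d attempts: %w", n.Retries+1, lastErr)
}

// demo runs Notify against a local test server that answers the first `failures`
// requests with 503, and returns how many requests the server received.
func demo(failures int32, retries int) (int32, error) {
	var calls int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt32(&calls, 1) <= failures {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()
	n := &Notifier{URL: srv.URL, Retries: retries, Client: &http.Client{Timeout: 10 * time.Second}}
	err := n.Notify(context.Background(), payload{EventID: "evt-1", Action: "created"})
	return atomic.LoadInt32(&calls), err
}

func main() {
	calls, err := demo(2, 3)
	fmt.Println("requests:", calls, "error:", err)
}
```

Retrying only on transport errors and 5xx keeps a misconfigured key or URL (a 4xx) from burning the whole retry budget.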

YAML-Configurable Correlation

Correlation thresholds are defined per event type in config/correlation.yaml, so they can be tuned without code changes:

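| Event Type | Spatial Radius | Temporal Window | Match by Name |
|---|---|---|---|
| Earthquake | 25 km | 6 hours | No |
| Tropical Cyclone | 150 km | 12 hours | Yes |
| Flood | 50 km | 24 hours | No |
| Drought | 100 km | 48 hours | No |
| Wildfire | 30 km | 24 hours | No |
| Tsunami | 100 km | 12 hours | No |
| Volcano | 15 km | 48 hours | Yes |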
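Each row combines a radius test, a time-window test and, for types marked "Match by Name", a name test. The service evaluates this with PostGIS queries; the in-memory sketch below only illustrates the threshold logic, using the haversine great-circle distance on a spherical Earth. The type keys, the Event struct, and treating name matching as an additional required condition are assumptions.

```go
package main

import (
	"fmt"
	"math"
	"strings"
	"time"
)

// Threshold mirrors one row of the correlation table.
type Threshold struct {
	RadiusKm    float64
	Window      time.Duration
	MatchByName bool
}

// Values copied from the table; the service loads them from config/correlation.yaml.
// The map keys are hypothetical type identifiers.
var thresholds = map[string]Threshold{
	"earthquake":       {25, 6 * time.Hour, false},
	"tropical_cyclone": {150, 12 * time.Hour, true},
	"flood":            {50, 24 * time.Hour, false},
	"drought":          {100, 48 * time.Hour, false},
	"wildfire":         {30, 24 * time.Hour, false},
	"tsunami":          {100, 12 * time.Hour, false},
	"volcano":          {15, 48 * time.Hour, true},
}

// Event is a hypothetical minimal view of a normalized or stored event.
type Event struct {
	Type     string
	Name     string
	Lat, Lon float64
	Time     time.Time
}

// haversineKm returns the great-circle distance between two points on a spherical Earth.
func haversineKm(lat1, lon1, lat2, lon2 float64) float64 {
	const earthRadiusKm = 6371.0
	rad := func(deg float64) float64 { return deg * math.Pi / 180 }
	dLat, dLon := rad(lat2-lat1), rad(lon2-lon1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(rad(lat1))*math.Cos(rad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Min(1, math.Sqrt(a)))
}

// matches reports whether candidate should be correlated with existing.
func matches(existing, candidate Event) bool {
	th, ok := thresholds[existing.Type]
	if !ok || existing.Type != candidate.Type {
		return false
	}
	dt := candidate.Time.Sub(existing.Time)
	if dt < 0 {
		dt = -dt
	}
	if dt > th.Window {
		return false
	}
	if haversineKm(existing.Lat, existing.Lon, candidate.Lat, candidate.Lon) > th.RadiusKm {
		return false
	}
	return !th.MatchByName || strings.EqualFold(existing.Name, candidate.Name)
}

func main() {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	a := Event{Type: "earthquake", Lat: 35.0, Lon: -118.0, Time: t0}
	b := Event{Type: "earthquake", Lat: 35.1, Lon: -118.0, Time: t0.Add(time.Hour)}
	fmt.Printf("distance %.1f km, match=%v\n", haversineKm(a.Lat, a.Lon, b.Lat, b.Lon), matches(a, b))
}
```

In SQL the radius test would typically be expressed with PostGIS distance predicates on geography values, which compute distances in metres; the sketch trades that precision for a dependency-free illustration.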

USGS ID Aliasing

USGS earthquakes present a unique challenge: the same physical earthquake can have different IDs from different seismic networks. The USGS ids property contains a comma-separated list like ",us7000abc,nn00001,ci12345,". The adapter's ResolveIDs() method parses all aliases, and the pipeline stores them in the event_id_aliases table so that future updates from any network ID are correctly correlated to the same disaster event.
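A minimal sketch of the parsing and alias lookup, assuming an in-memory map in place of the event_id_aliases table. parseUSGSIDs and findExisting are hypothetical names, not the adapter's actual methods.

```go
package main

import (
	"fmt"
	"strings"
)

// parseUSGSIDs splits the USGS "ids" property (e.g. ",us7000abc,nn00001,ci12345,")
// into individual IDs, dropping the empty strings produced by the leading and
// trailing commas as well as any duplicates.
func parseUSGSIDs(ids string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, id := range strings.Split(ids, ",") {
		id = strings.TrimSpace(id)
		if id == "" || seen[id] {
			continue
		}
		seen[id] = true
		out = append(out, id)
	}
	return out
}

// findExisting returns the disaster event already linked to any of the IDs.
// aliasTable maps alias -> disaster event ID and stands in for event_id_aliases.
func findExisting(aliasTable map[string]string, ids []string) (string, bool) {
	for _, id := range ids {
		if ev, ok := aliasTable[id]; ok {
			return ev, true
		}
	}
	return "", false
}

func main() {
	ids := parseUSGSIDs(",us7000abc,nn00001,ci12345,")
	fmt.Println(ids) // [us7000abc nn00001 ci12345]

	aliases := map[string]string{"nn00001": "disaster-42"}
	if ev, ok := findExisting(aliases, ids); ok {
		fmt.Println("correlates to", ev)
	}
}
```

Looking up every alias, not just the preferred ID, is what lets an update that arrives under a different network's ID land on the same disaster event.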

Testing Endpoints

The service exposes /test routes for manual event injection and data file processing, useful for replaying historical events or testing adapter changes without waiting for a poll cycle. Adapter names in test endpoint paths are case-insensitive (e.g., gdacs, GDACS, and Gdacs all work).

Available test endpoints:

| Method | Path | Description |
|---|---|---|
| GET | /test/files | List available data files for replay |
| POST | /test/ingest | Ingest events from saved data files |
| POST | /test/ingest/raw | Ingest raw event JSON directly |
| POST | /test/ingest/file | Ingest events from a specific file |
| POST | /test/ingest/batch | Batch ingest multiple events |
| POST | /test/save-poll/{adapter} | Save poll results to disk for replay |
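Case-insensitive adapter names can be handled by lowercasing the path segment before lookup. The sketch below shows this for /test/save-poll/{adapter} using only prefix trimming, so it does not depend on newer ServeMux pattern features; the known map and the adapterFromPath and savePollHandler names are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// known is a hypothetical set of registered adapter names, stored lowercase.
var known = map[string]bool{"gdacs": true, "usgs": true, "nhc": true}

const savePollPrefix = "/test/save-poll/"

// adapterFromPath extracts {adapter} from /test/save-poll/{adapter} and lowercases
// it, so "GDACS", "Gdacs" and "gdacs" all resolve to the same adapter.
func adapterFromPath(path string) (string, bool) {
	if !strings.HasPrefix(path, savePollPrefix) {
		return "", false
	}
	name := strings.ToLower(strings.Trim(strings.TrimPrefix(path, savePollPrefix), "/"))
	if name == "" || strings.Contains(name, "/") || !known[name] {
		return "", false
	}
	return name, true
}

func savePollHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	name, ok := adapterFromPath(r.URL.Path)
	if !ok {
		http.Error(w, "unknown adapter", http.StatusNotFound)
		return
	}
	// A real handler would poll the adapter and write the results to disk here.
	fmt.Fprintf(w, "saved poll for %s\n", name)
}

func main() {
	http.HandleFunc(savePollPrefix, savePollHandler) // server start omitted
	name, ok := adapterFromPath("/test/save-poll/GDACS")
	fmt.Println(name, ok) // gdacs true
}
```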

Configuration

| Variable | Description | Default |
|---|---|---|
| PORT | HTTP server port | 5606 |
| LOG_LEVEL | Logging level (debug, info, warn, error) | info |
| DATABASE_URL | PostgreSQL connection string | none (required) |
| DB_POOL_SIZE | Connection pool size | 25 |
| BACKEND_WEBHOOK_URL | Backend API webhook endpoint | http://localhost:5601/api/v1/internal/event-processed |
| WEBHOOK_API_KEY | API key for webhook authentication | none (optional) |
| WEBHOOK_TIMEOUT_SECONDS | Webhook request timeout in seconds | 10 |
| WEBHOOK_RETRIES | Number of webhook retry attempts | 3 |
| GDACS_ENABLED | Enable GDACS adapter | true |
| USGS_ENABLED | Enable USGS adapter | true |
| NHC_ENABLED | Enable NHC adapter | true |
| GDACS_POLL_INTERVAL | GDACS polling interval | 5m |
| USGS_POLL_INTERVAL | USGS polling interval | 2m |
| NHC_POLL_INTERVAL | NHC polling interval | 5m |
| GEOPOP_API_URL | GeoPop API URL for population enrichment | http://localhost:5605/api/v1 |
| CORRELATION_CONFIG_PATH | Path to correlation thresholds YAML | config/correlation.yaml |
| SCORING_CONFIG_PATH | Path to scoring configuration YAML | config/scoring.yaml |
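A minimal sketch of loading a subset of these variables with their documented defaults. The Config struct, the Load function and the lookup indirection are hypothetical (the real loader lives in internal/config/config.go); only USGS is shown among the adapters, empty values are treated as unset, and because the interval defaults look like Go duration strings they are parsed with time.ParseDuration.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// Config holds a hypothetical subset of the service settings.
type Config struct {
	Port             int
	LogLevel         string
	DatabaseURL      string
	DBPoolSize       int
	WebhookTimeout   time.Duration
	WebhookRetries   int
	USGSEnabled      bool
	USGSPollInterval time.Duration
}

// lookupFunc matches os.LookupEnv, so tests can pass a map-backed lookup instead.
type lookupFunc func(string) (string, bool)

func Load(lookup lookupFunc) (Config, error) {
	get := func(key, def string) string {
		if v, ok := lookup(key); ok && v != "" {
			return v
		}
		return def
	}
	var cfg Config
	var err error
	if cfg.Port, err = strconv.Atoi(get("PORT", "5606")); err != nil {
		return cfg, fmt.Errorf("PORT: %w", err)
	}
	cfg.LogLevel = get("LOG_LEVEL", "info")
	if cfg.DatabaseURL = get("DATABASE_URL", ""); cfg.DatabaseURL == "" {
		return cfg, fmt.Errorf("DATABASE_URL is required")
	}
	if cfg.DBPoolSize, err = strconv.Atoi(get("DB_POOL_SIZE", "25")); err != nil {
		return cfg, fmt.Errorf("DB_POOL_SIZE: %w", err)
	}
	secs, err := strconv.Atoi(get("WEBHOOK_TIMEOUT_SECONDS", "10"))
	if err != nil {
		return cfg, fmt.Errorf("WEBHOOK_TIMEOUT_SECONDS: %w", err)
	}
	cfg.WebhookTimeout = time.Duration(secs) * time.Second
	if cfg.WebhookRetries, err = strconv.Atoi(get("WEBHOOK_RETRIES", "3")); err != nil {
		return cfg, fmt.Errorf("WEBHOOK_RETRIES: %w", err)
	}
	if cfg.USGSEnabled, err = strconv.ParseBool(get("USGS_ENABLED", "true")); err != nil {
		return cfg, fmt.Errorf("USGS_ENABLED: %w", err)
	}
	if cfg.USGSPollInterval, err = time.ParseDuration(get("USGS_POLL_INTERVAL", "2m")); err != nil {
		return cfg, fmt.Errorf("USGS_POLL_INTERVAL: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg, err := Load(os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("listening on :%d, polling USGS every %s\n", cfg.Port, cfg.USGSPollInterval)
}
```

Failing fast on a missing DATABASE_URL or a malformed value surfaces misconfiguration at startup rather than on the first poll.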

API Endpoints

| Method | Path | Auth | Description |
|---|---|---|---|
| GET | /health | No | Service health with DB and adapter status |
| GET | /adapters | Yes | Status of all registered source adapters |
| GET | /metrics | Yes | Pipeline processing metrics (processed, skipped, errors) |
| POST | /poll/{adapter} | Yes | Trigger an immediate poll for a specific adapter |
| GET | /test/files | Yes | List available data files for replay |
| POST | /test/ingest | Yes | Ingest events from saved data files |
| POST | /test/ingest/raw | Yes | Ingest raw event JSON directly |
| POST | /test/ingest/file | Yes | Ingest events from a specific file |
| POST | /test/ingest/batch | Yes | Batch ingest multiple events |
| POST | /test/save-poll/{adapter} | Yes | Save poll results to disk for replay |
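The Auth column implies a middleware that protects every route except /health. The authentication scheme is not documented on this page, so the sketch below assumes a shared key sent in a hypothetical X-API-Key header and compared in constant time; requireAPIKey, newDemoHandler and statusFor are illustrative names.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// requireAPIKey lets /health through and requires a matching key everywhere else.
// If no key is configured it fails closed and rejects all protected routes.
func requireAPIKey(key string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/health" {
			next.ServeHTTP(w, r)
			return
		}
		got := r.Header.Get("X-API-Key") // hypothetical header name
		if key == "" || subtle.ConstantTimeCompare([]byte(got), []byte(key)) != 1 {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// newDemoHandler wraps a trivial 200 OK handler with the middleware.
func newDemoHandler(key string) http.Handler {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	return requireAPIKey(key, ok)
}

// statusFor sends one GET request through h and returns the response status code.
func statusFor(h http.Handler, path, key string) int {
	req := httptest.NewRequest(http.MethodGet, path, nil)
	if key != "" {
		req.Header.Set("X-API-Key", key)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := newDemoHandler("s3cret")
	fmt.Println(statusFor(h, "/health", ""))        // 200
	fmt.Println(statusFor(h, "/metrics", ""))       // 401
	fmt.Println(statusFor(h, "/metrics", "s3cret")) // 200
}
```

Failing closed when no key is configured avoids silently exposing the /test ingestion routes in a misconfigured deployment.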

Directory Structure

```
tg-event-processor/
├── cmd/
│   └── server/
│       └── main.go             # Entry point: config, DB, registry, scheduler, HTTP
├── internal/
│   ├── adapter/
│   │   ├── adapter.go          # SourceAdapter interface + helpers
│   │   ├── registry.go         # Adapter registry (register, get, list)
│   │   ├── gdacs.go            # GDACS GeoJSON adapter
│   │   ├── usgs.go             # USGS Earthquake adapter
│   │   └── nhc.go              # NHC ActiveStorms adapter
│   ├── api/
│   │   ├── handler.go          # HTTP handlers (health, adapters, metrics, poll)
│   │   ├── testing.go          # Test/simulation endpoints
│   │   └── middleware.go       # Auth, CORS, logging, recovery
│   ├── config/
│   │   └── config.go           # Environment configuration loading
│   ├── db/
│   │   └── db.go               # PostgreSQL connection pool + migrations
│   ├── model/
│   │   ├── event.go            # RawEvent, NormalizedEvent, enums
│   │   └── result.go           # ProcessResult
│   ├── pipeline/
│   │   ├── pipeline.go         # Core pipeline orchestration
│   │   ├── alias.go            # ID alias resolution
│   │   ├── dedup.go            # Hash-based deduplication
│   │   ├── staleness.go        # Staleness check
│   │   ├── correlation.go      # PostGIS spatial/temporal matching
│   │   ├── upsert.go           # Atomic upsert logic
│   │   └── webhook.go          # Backend webhook notifier
│   └── scheduler/
│       └── scheduler.go        # Per-adapter poll scheduling
├── config/
│   ├── correlation.yaml        # Correlation thresholds per event type
│   └── scoring.yaml            # Event scoring configuration
├── migrations/                 # SQL migration files
├── Makefile
└── go.mod
```

Running

```bash
# Build the binary
make build

# Build and run
make run

# Run tests
make test

# Run unit tests only (skip integration)
make test-short
```
