Event Processor
Go-based deterministic event ingestion service with pluggable source adapters, PostGIS spatial correlation, hash-based deduplication, and atomic upserts.
Overview
The tg-event-processor is a Go service that handles all disaster event ingestion into TerraGuard. It replaces a previous Lambda+SQS+DynamoDB architecture with a single deterministic binary that polls authoritative data sources, normalizes events into a common format, deduplicates them, correlates them with existing records using PostGIS spatial queries, and performs atomic upserts into PostgreSQL.
The service listens on port 5606 by default and compiles to a static binary of roughly 16 MB.
Architecture
Key Features
Adapter Pattern
Every data source implements the SourceAdapter interface:
type SourceAdapter interface {
	Name() string
	Poll(ctx context.Context) ([]model.RawEvent, error)
	Normalize(raw model.RawEvent) (*model.NormalizedEvent, error)
	ResolveIDs(raw model.RawEvent) []string
	Config() AdapterConfig
}
Adding a new source requires implementing this interface and registering the adapter in cmd/server/main.go. No pipeline code changes are needed.
Deterministic Pipeline
Every event passes through the same six-stage pipeline:
- ID Resolution -- Resolves all known identifiers for an event. USGS earthquakes can have multiple IDs from different seismic networks (e.g., us7000abc, nn00001). The event_id_aliases table tracks these relationships.
- Hash Dedup -- Computes an MD5 hash of the raw event JSON. If the hash matches an existing ingestion record, the event is skipped (no changes since last poll).
- Staleness Check -- Compares the source's lastUpdated timestamp against the stored value. Skips events that have not been updated since last ingestion.
- PostGIS Correlation -- Uses spatial and temporal queries to find matching DisasterEvent records. Thresholds are configurable per event type in config/correlation.yaml.
- Atomic Upsert -- Within a single database transaction, either creates a new DisasterEvent or updates an existing one, records the EventIngestion, and stores ID aliases.
- Webhook Notification -- POSTs to the Backend API (/api/v1/internal/event-processed) to trigger the enrichment pipeline via Inngest.
YAML-Configurable Correlation
Correlation thresholds are defined in config/correlation.yaml and can be tuned without code changes:
| Event Type | Spatial Radius | Temporal Window | Match by Name |
|---|---|---|---|
| Earthquake | 25 km | 6 hours | No |
| Tropical Cyclone | 150 km | 12 hours | Yes |
| Flood | 50 km | 24 hours | No |
| Drought | 100 km | 48 hours | No |
| Wildfire | 30 km | 24 hours | No |
| Tsunami | 100 km | 12 hours | No |
| Volcano | 15 km | 48 hours | Yes |
USGS ID Aliasing
USGS earthquakes present a unique challenge: the same physical earthquake can have different IDs from different seismic networks. The USGS ids property contains a comma-separated list like ",us7000abc,nn00001,ci12345,". The adapter's ResolveIDs() method parses all aliases, and the pipeline stores them in the event_id_aliases table so that future updates from any network ID are correctly correlated to the same disaster event.
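A minimal sketch of the alias parsing, assuming the ids value arrives as the comma-delimited string shown above. parseUSGSIDs is a hypothetical helper name; the real ResolveIDs() implementation is not shown in this document and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// parseUSGSIDs splits the USGS "ids" property into individual identifiers,
// dropping the empty entries produced by leading and trailing commas and
// removing duplicates while preserving order.
func parseUSGSIDs(ids string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, id := range strings.Split(ids, ",") {
		id = strings.TrimSpace(id)
		if id == "" || seen[id] {
			continue
		}
		seen[id] = true
		out = append(out, id)
	}
	return out
}

func main() {
	fmt.Println(parseUSGSIDs(",us7000abc,nn00001,ci12345,")) // prints [us7000abc nn00001 ci12345]
}
```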
Testing Endpoints
The service exposes /test routes for manual event injection and data file processing, useful for replaying historical events or testing adapter changes without waiting for a poll cycle. Adapter names in test endpoint paths are case-insensitive (e.g., gdacs, GDACS, and Gdacs all work).
Available test endpoints:
| Method | Path | Description |
|---|---|---|
| GET | /test/files | List available data files for replay |
| POST | /test/ingest | Ingest events from saved data files |
| POST | /test/ingest/raw | Ingest raw event JSON directly |
| POST | /test/ingest/file | Ingest events from a specific file |
| POST | /test/ingest/batch | Batch ingest multiple events |
| POST | /test/save-poll/{adapter} | Save poll results to disk for replay |
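One way the case-insensitive adapter names in these paths could be handled is to normalize names to lower case both at registration and at lookup. The sketch below illustrates that idea only; registry, namedAdapter, and stubAdapter are hypothetical and are not taken from internal/adapter/registry.go.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// namedAdapter is the only part of an adapter this sketch needs.
type namedAdapter interface{ Name() string }

// stubAdapter is a placeholder adapter identified by its name.
type stubAdapter string

func (s stubAdapter) Name() string { return string(s) }

// registry stores adapters keyed by their lower-cased name.
type registry struct {
	byName map[string]namedAdapter
}

func newRegistry() *registry {
	return &registry{byName: make(map[string]namedAdapter)}
}

// Register adds an adapter and rejects names that collide ignoring case.
func (r *registry) Register(a namedAdapter) error {
	key := strings.ToLower(a.Name())
	if _, exists := r.byName[key]; exists {
		return fmt.Errorf("adapter %q already registered", key)
	}
	r.byName[key] = a
	return nil
}

// Get looks up an adapter by name, ignoring case.
func (r *registry) Get(name string) (namedAdapter, bool) {
	a, ok := r.byName[strings.ToLower(name)]
	return a, ok
}

// List returns the registered keys in sorted order.
func (r *registry) List() []string {
	names := make([]string, 0, len(r.byName))
	for k := range r.byName {
		names = append(names, k)
	}
	sort.Strings(names)
	return names
}

func main() {
	r := newRegistry()
	for _, n := range []string{"GDACS", "USGS", "NHC"} {
		if err := r.Register(stubAdapter(n)); err != nil {
			panic(err)
		}
	}
	for _, n := range []string{"gdacs", "GDACS", "Gdacs"} {
		a, ok := r.Get(n)
		fmt.Println(n, ok, a.Name())
	}
	fmt.Println(r.List())
}
```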
Configuration
| Variable | Description | Default |
|---|---|---|
| PORT | HTTP server port | 5606 |
| LOG_LEVEL | Logging level (debug, info, warn, error) | info |
| DATABASE_URL | PostgreSQL connection string | required |
| DB_POOL_SIZE | Connection pool size | 25 |
| BACKEND_WEBHOOK_URL | Backend API webhook endpoint | http://localhost:5601/api/v1/internal/event-processed |
| WEBHOOK_API_KEY | API key for webhook authentication | optional |
| WEBHOOK_TIMEOUT_SECONDS | Webhook request timeout | 10 |
| WEBHOOK_RETRIES | Number of webhook retry attempts | 3 |
| GDACS_ENABLED | Enable GDACS adapter | true |
| USGS_ENABLED | Enable USGS adapter | true |
| NHC_ENABLED | Enable NHC adapter | true |
| GDACS_POLL_INTERVAL | GDACS polling interval | 5m |
| USGS_POLL_INTERVAL | USGS polling interval | 2m |
| NHC_POLL_INTERVAL | NHC polling interval | 5m |
| GEOPOP_API_URL | GeoPop API URL for population enrichment | http://localhost:5605/api/v1 |
| CORRELATION_CONFIG_PATH | Path to correlation thresholds YAML | config/correlation.yaml |
| SCORING_CONFIG_PATH | Path to scoring configuration YAML | config/scoring.yaml |
API Endpoints
| Method | Path | Auth | Description |
|---|---|---|---|
| GET | /health | No | Service health with DB and adapter status |
| GET | /adapters | Yes | Status of all registered source adapters |
| GET | /metrics | Yes | Pipeline processing metrics (processed, skipped, errors) |
| POST | /poll/{adapter} | Yes | Trigger an immediate poll for a specific adapter |
| GET | /test/files | Yes | List available data files for replay |
| POST | /test/ingest | Yes | Ingest events from saved data files |
| POST | /test/ingest/raw | Yes | Ingest raw event JSON directly |
| POST | /test/ingest/file | Yes | Ingest events from a specific file |
| POST | /test/ingest/batch | Yes | Batch ingest multiple events |
| POST | /test/save-poll/{adapter} | Yes | Save poll results to disk for replay |
Directory Structure
tg-event-processor/
├── cmd/
│   └── server/
│       └── main.go              # Entry point: config, DB, registry, scheduler, HTTP
├── internal/
│   ├── adapter/
│   │   ├── adapter.go           # SourceAdapter interface + helpers
│   │   ├── registry.go          # Adapter registry (register, get, list)
│   │   ├── gdacs.go             # GDACS GeoJSON adapter
│   │   ├── usgs.go              # USGS Earthquake adapter
│   │   └── nhc.go               # NHC ActiveStorms adapter
│   ├── api/
│   │   ├── handler.go           # HTTP handlers (health, adapters, metrics, poll)
│   │   ├── testing.go           # Test/simulation endpoints
│   │   └── middleware.go        # Auth, CORS, logging, recovery
│   ├── config/
│   │   └── config.go            # Environment configuration loading
│   ├── db/
│   │   └── db.go                # PostgreSQL connection pool + migrations
│   ├── model/
│   │   ├── event.go             # RawEvent, NormalizedEvent, enums
│   │   └── result.go            # ProcessResult
│   ├── pipeline/
│   │   ├── pipeline.go          # Core pipeline orchestration
│   │   ├── alias.go             # ID alias resolution
│   │   ├── dedup.go             # Hash-based deduplication
│   │   ├── staleness.go         # Staleness check
│   │   ├── correlation.go       # PostGIS spatial/temporal matching
│   │   ├── upsert.go            # Atomic upsert logic
│   │   └── webhook.go           # Backend webhook notifier
│   └── scheduler/
│       └── scheduler.go         # Per-adapter poll scheduling
├── config/
│   ├── correlation.yaml         # Correlation thresholds per event type
│   └── scoring.yaml             # Event scoring configuration
├── migrations/                  # SQL migration files
├── Makefile
└── go.mod
Running
# Build the binary
make build
# Build and run
make run
# Run tests
make test
# Run unit tests only (skip integration)
make test-short