Adding a New Data Source
Step-by-step guide to adding a new disaster data source to the TerraGuard Event Processor using the pluggable adapter pattern.
Overview
The Event Processor is designed with a pluggable adapter pattern that makes adding new data sources straightforward. Each data source implements the SourceAdapter interface, and the pipeline handles everything else -- deduplication, correlation, persistence, and webhook notification.
Adding a new source requires zero changes to the pipeline code. You only need to:
- Create an adapter file
- Register it in main.go
- Add correlation thresholds
- Deploy
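Why the pipeline needs no changes can be sketched as follows. This is a minimal illustration, not the actual TerraGuard code: RawEvent and SourceAdapter are cut-down stand-ins for the real types, and Registry.PollAll and staticAdapter are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"fmt"
)

// RawEvent and SourceAdapter are simplified stand-ins for the real types
// in internal/model and internal/adapter (assumption: only the parts
// needed for this illustration are shown).
type RawEvent struct {
	ID     string
	Source string
}

type SourceAdapter interface {
	Name() string
	Poll(ctx context.Context) ([]RawEvent, error)
}

// Registry is a hypothetical registry; the real one may differ.
type Registry struct {
	adapters []SourceAdapter
}

// Register adds an adapter to the registry.
func (r *Registry) Register(a SourceAdapter) { r.adapters = append(r.adapters, a) }

// PollAll talks to every adapter through the interface only, so adding a
// source means registering one more adapter; this loop never changes.
func (r *Registry) PollAll(ctx context.Context) ([]RawEvent, error) {
	var all []RawEvent
	for _, a := range r.adapters {
		events, err := a.Poll(ctx)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", a.Name(), err)
		}
		all = append(all, events...)
	}
	return all, nil
}

// staticAdapter is a toy source that returns a fixed list of events.
type staticAdapter struct {
	name   string
	events []RawEvent
}

func (s staticAdapter) Name() string { return s.name }

func (s staticAdapter) Poll(ctx context.Context) ([]RawEvent, error) { return s.events, nil }
```

Registering a second staticAdapter and calling PollAll returns the events of both sources in registration order, without touching the loop.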
The SourceAdapter Interface
Every data source must implement this interface defined in internal/adapter/adapter.go:
type SourceAdapter interface {
	// Name returns the adapter identifier (e.g., "gdacs", "usgs", "nhc").
	Name() string
	// Poll fetches the latest events from the source API.
	Poll(ctx context.Context) ([]model.RawEvent, error)
	// Normalize converts a raw event into the common NormalizedEvent format.
	Normalize(raw model.RawEvent) (*model.NormalizedEvent, error)
	// ResolveIDs returns all known IDs for this event.
	// For sources with ID aliasing: return all aliases.
	// For simple sources: return a single-element slice.
	ResolveIDs(raw model.RawEvent) []string
	// Config returns the adapter's configuration.
	Config() AdapterConfig
}
The AdapterConfig struct controls polling behavior:
type AdapterConfig struct {
	PollInterval  time.Duration
	BatchSize     int
	Enabled       bool
	RetryAttempts int
	RetryBackoff  time.Duration
	SourceURL     string
	FetchTimeout  time.Duration
}
The NormalizedEvent Format
All adapters must produce NormalizedEvent structs. This is the common format that the pipeline processes:
type NormalizedEvent struct {
	Source             string                 // Source identifier (e.g., "COPERNICUS")
	SourceEventID      string                 // Unique ID from the source
	EventHash          string                 // MD5 hash of raw JSON (for dedup)
	EventType          string                 // EARTHQUAKE, TROPICAL_CYCLONE, FLOOD, etc.
	Title              string                 // Human-readable title
	Description        string                 // Brief description
	AlertLevel         string                 // RED, ORANGE, or GREEN
	EventTime          time.Time              // When the event occurred
	Location           GeoPoint               // Lat/Lon coordinates
	ImpactGeometry     json.RawMessage        // Optional polygon geometry
	Countries          []string               // Affected countries
	Regions            []string               // Affected regions
	Measurements       []Measurement          // Typed measurements
	SourceURLs         map[string]string      // Links to source pages
	AdditionalData     map[string]interface{} // Source-specific extras
	RawData            json.RawMessage        // Original raw JSON
	ResolvedIDs        []string               // All known IDs for this event
	SourceReportedAt   time.Time              // When the source first reported
	SourceLastUpdated  time.Time              // Last modification by the source
	PrimaryMeasurement *Measurement           // Most important measurement
}
NormalizedEvent Fields Reference
| Field | Required | Description |
|---|---|---|
| Source | Yes | Your source identifier (uppercase, e.g., "EMSC") |
| SourceEventID | Yes | Unique event ID from source (e.g., "emsc:20260328_001") |
| EventHash | Yes | MD5 of raw JSON -- use ComputeEventHash() |
| EventType | Yes | One of the predefined constants (see below) |
| Title | Yes | Human-readable event title |
| AlertLevel | Yes | RED, ORANGE, or GREEN |
| EventTime | Yes | When the event occurred |
| Location | Yes | Lat/lon coordinates as a GeoPoint |
| Measurements | Yes | At least one measurement, mark primary with IsPrimary: true |
| RawData | Yes | Original JSON payload for audit trail |
| ResolvedIDs | Yes | All known IDs for this event |
| SourceReportedAt | Yes | When the source first reported this event |
| SourceLastUpdated | Yes | Last modification timestamp from the source |
| ImpactGeometry | No | Optional polygon/multipolygon for impact area |
| Countries | No | Affected country names |
| SourceURLs | No | Map of link labels to URLs |
| AdditionalData | No | Source-specific metadata |
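The required-field rules in this table could be checked before an adapter hands an event to the pipeline. The sketch below is only an illustration under stated assumptions: ValidateRequired and exampleEvent are hypothetical helpers that do not appear in the codebase described here, the types are trimmed stand-ins for those in internal/model, and the "at least one primary measurement" check is one reading of the Measurements row.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// GeoPoint, Measurement, and NormalizedEvent are trimmed stand-ins for the
// types in internal/model, reduced to the required fields from the table.
type GeoPoint struct{ Lat, Lon float64 }

type Measurement struct {
	Type      string
	Value     float64
	Unit      string
	IsPrimary bool
}

type NormalizedEvent struct {
	Source, SourceEventID, EventHash string
	EventType, Title, AlertLevel     string
	EventTime                        time.Time
	Location                         GeoPoint
	Measurements                     []Measurement
	RawData                          []byte
	ResolvedIDs                      []string
	SourceReportedAt                 time.Time
	SourceLastUpdated                time.Time
}

// ValidateRequired (hypothetical helper) collects one error per violated rule
// and returns nil when the event satisfies all of them.
func ValidateRequired(e *NormalizedEvent) error {
	var errs []error
	missing := func(cond bool, field string) {
		if cond {
			errs = append(errs, fmt.Errorf("%s is required", field))
		}
	}
	missing(e.Source == "", "Source")
	missing(e.SourceEventID == "", "SourceEventID")
	missing(e.EventHash == "", "EventHash")
	missing(e.EventType == "", "EventType")
	missing(e.Title == "", "Title")
	missing(e.EventTime.IsZero(), "EventTime")
	missing(len(e.RawData) == 0, "RawData")
	missing(len(e.ResolvedIDs) == 0, "ResolvedIDs")
	missing(e.SourceReportedAt.IsZero(), "SourceReportedAt")
	missing(e.SourceLastUpdated.IsZero(), "SourceLastUpdated")

	switch e.AlertLevel {
	case "RED", "ORANGE", "GREEN":
	default:
		errs = append(errs, fmt.Errorf("alert level %q must be RED, ORANGE, or GREEN", e.AlertLevel))
	}
	if e.Location.Lat < -90 || e.Location.Lat > 90 || e.Location.Lon < -180 || e.Location.Lon > 180 {
		errs = append(errs, errors.New("location lat/lon out of range"))
	}
	hasPrimary := false
	for _, m := range e.Measurements {
		hasPrimary = hasPrimary || m.IsPrimary
	}
	if !hasPrimary {
		errs = append(errs, errors.New("measurements: need at least one entry with IsPrimary set"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

// exampleEvent returns an event that passes validation; all values are made up.
func exampleEvent() *NormalizedEvent {
	now := time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC)
	return &NormalizedEvent{
		Source: "EMSC", SourceEventID: "emsc:20260328_001", EventHash: "abc123",
		EventType: "EARTHQUAKE", Title: "M 5.5 test event", AlertLevel: "GREEN",
		EventTime: now, Location: GeoPoint{Lat: 38.2, Lon: 22.1},
		Measurements:     []Measurement{{Type: "magnitude", Value: 5.5, Unit: "M", IsPrimary: true}},
		RawData:          []byte(`{"test": true}`),
		ResolvedIDs:      []string{"emsc:20260328_001"},
		SourceReportedAt: now, SourceLastUpdated: now,
	}
}
```

A check like this fits naturally at the end of Normalize or in a unit test, so that a missing field surfaces in the adapter rather than further down the pipeline.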
Event Type Constants
Use the predefined constants in internal/model/event.go:
| Constant | Value |
|---|---|
| EventTypeEarthquake | "EARTHQUAKE" |
| EventTypeTropicalCyclone | "TROPICAL_CYCLONE" |
| EventTypeFlood | "FLOOD" |
| EventTypeDrought | "DROUGHT" |
| EventTypeWildfire | "WILDFIRE" |
| EventTypeTsunami | "TSUNAMI" |
| EventTypeVolcano | "VOLCANO" |
| EventTypeOther | "OTHER" |
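Most sources use their own type labels, so an adapter typically needs a small lookup from source labels to these constants. The following is a sketch under assumptions: the constants are redeclared locally so the example stands alone, and the source labels ("eq", "hurricane", and so on) and the MapEventType helper are invented for illustration.

```go
package main

import "strings"

// Local copies of the constants from internal/model/event.go,
// redeclared so this example compiles on its own.
const (
	EventTypeEarthquake      = "EARTHQUAKE"
	EventTypeTropicalCyclone = "TROPICAL_CYCLONE"
	EventTypeFlood           = "FLOOD"
	EventTypeDrought         = "DROUGHT"
	EventTypeWildfire        = "WILDFIRE"
	EventTypeTsunami         = "TSUNAMI"
	EventTypeVolcano         = "VOLCANO"
	EventTypeOther           = "OTHER"
)

// sourceTypes maps hypothetical source labels to the common constants.
// Replace the keys with whatever your source actually reports.
var sourceTypes = map[string]string{
	"eq":         EventTypeEarthquake,
	"earthquake": EventTypeEarthquake,
	"tc":         EventTypeTropicalCyclone,
	"hurricane":  EventTypeTropicalCyclone,
	"typhoon":    EventTypeTropicalCyclone,
	"flood":      EventTypeFlood,
	"drought":    EventTypeDrought,
	"wildfire":   EventTypeWildfire,
	"tsunami":    EventTypeTsunami,
	"volcano":    EventTypeVolcano,
}

// MapEventType (hypothetical helper) normalizes case and whitespace, then
// falls back to EventTypeOther for labels it does not recognize.
func MapEventType(label string) string {
	key := strings.ToLower(strings.TrimSpace(label))
	if t, ok := sourceTypes[key]; ok {
		return t
	}
	return EventTypeOther
}
```

Falling back to EventTypeOther keeps unknown labels flowing through the pipeline instead of failing normalization; whether that is the right trade-off depends on the source.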
Step-by-Step Guide
Step 1: Create the Adapter File
Create internal/adapter/your_source.go. Here is a complete skeleton:
package adapter

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"time"

	"github.com/TerraGuard/tg-event-processor/internal/model"
)

// YourSourceAdapter fetches and normalizes events from Your Source API.
type YourSourceAdapter struct {
	cfg    AdapterConfig
	client *http.Client
}

// NewYourSourceAdapter creates a new adapter instance.
func NewYourSourceAdapter(sourceURL string, interval time.Duration,
	fetchTimeout time.Duration, enabled bool) *YourSourceAdapter {
	return &YourSourceAdapter{
		cfg: AdapterConfig{
			SourceURL:     sourceURL,
			PollInterval:  interval,
			FetchTimeout:  fetchTimeout,
			Enabled:       enabled,
			RetryAttempts: 3,
			RetryBackoff:  5 * time.Second,
			BatchSize:     100,
		},
		client: &http.Client{Timeout: fetchTimeout},
	}
}

func (a *YourSourceAdapter) Name() string { return "YOUR_SOURCE" }

func (a *YourSourceAdapter) Config() AdapterConfig { return a.cfg }

// Poll fetches the latest events from the source API.
func (a *YourSourceAdapter) Poll(ctx context.Context) ([]model.RawEvent, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, a.cfg.SourceURL, nil)
	if err != nil {
		return nil, fmt.Errorf("your_source: creating request: %w", err)
	}
	resp, err := a.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("your_source: fetching events: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("your_source: unexpected status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("your_source: reading body: %w", err)
	}
	// Parse the API response into your source-specific format, then convert
	// each item to a model.RawEvent. This skeleton assumes the API returns a
	// JSON array of objects that each carry an "id" field; adjust as needed.
	var items []json.RawMessage
	if err := json.Unmarshal(body, &items); err != nil {
		return nil, fmt.Errorf("your_source: decoding response: %w", err)
	}
	events := make([]model.RawEvent, 0, len(items))
	for _, item := range items {
		var props map[string]interface{}
		if err := json.Unmarshal(item, &props); err != nil {
			slog.Warn("your_source: skipping malformed item", "error", err)
			continue
		}
		events = append(events, model.RawEvent{
			ID:         "your_source:" + SafeString(props, "id", ""),
			Source:     "YOUR_SOURCE",
			Properties: props,
			RawJSON:    item,
		})
	}
	slog.Info("your_source: poll complete", "count", len(events))
	return events, nil
}

// Normalize converts a raw event into the common NormalizedEvent format.
func (a *YourSourceAdapter) Normalize(raw model.RawEvent) (*model.NormalizedEvent, error) {
	props := raw.Properties
	if props == nil {
		return nil, fmt.Errorf("your_source: nil properties for %s", raw.ID)
	}
	// Extract fields using the safe helpers:
	// SafeString(props, "key", "fallback")
	// SafeFloat(props, "key", 0)
	// SafeInt(props, "key", 0)
	return &model.NormalizedEvent{
		Source:        "YOUR_SOURCE",
		SourceEventID: raw.ID,
		EventHash:     ComputeEventHash(raw.RawJSON),
		EventType:     model.EventTypeEarthquake, // pick the right type
		Title:         "Event title",
		AlertLevel:    model.AlertLevelGreen,
		EventTime:     time.Now().UTC(),
		Location:      model.GeoPoint{Lat: 0, Lon: 0},
		Measurements: []model.Measurement{
			{Type: "magnitude", Value: 5.0, Unit: "M", IsPrimary: true},
		},
		RawData:           raw.RawJSON,
		ResolvedIDs:       a.ResolveIDs(raw),
		SourceReportedAt:  time.Now().UTC(),
		SourceLastUpdated: time.Now().UTC(),
	}, nil
}

// ResolveIDs returns all known IDs for this event.
func (a *YourSourceAdapter) ResolveIDs(raw model.RawEvent) []string {
	return []string{raw.ID}
}
Step 2: Register in main.go
Add your adapter to the registry in cmd/server/main.go:
// Register adapters.
registry := adapter.NewRegistry()
registry.Register(adapter.NewGDACSAdapter(...))
registry.Register(adapter.NewUSGSAdapter(...))
registry.Register(adapter.NewNHCAdapter(...))
// Add your new source:
registry.Register(adapter.NewYourSourceAdapter(
	cfg.YourSourceURL,
	cfg.YourSourceInterval,
	cfg.FetchTimeout,
	cfg.YourSourceEnabled,
))
Add the configuration fields to internal/config/config.go:
YourSourceEnabled  bool          `envconfig:"YOUR_SOURCE_ENABLED" default:"false"`
YourSourceURL      string        `envconfig:"YOUR_SOURCE_API_URL" default:"https://..."`
YourSourceInterval time.Duration `envconfig:"YOUR_SOURCE_POLL_INTERVAL" default:"5m"`
Step 3: Add Correlation Thresholds
Edit config/correlation.yaml to define how your source's events should be matched against existing disaster events:
event_types:
  # ... existing types ...
  # Add your event type (if new) with appropriate thresholds:
  YOUR_TYPE:
    spatial_radius_km: 50        # How close (km) events must be spatially
    temporal_window_hours: 12    # How close (hours) events must be temporally
    magnitude_tolerance: null    # Set for earthquake-like events
    match_by_name: false         # Set true for named events (storms, volcanoes)
If your source reports event types that already exist in the config (e.g., earthquakes), the existing thresholds will be used automatically -- no entry needed.
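To make the spatial and temporal thresholds concrete, the sketch below shows one plausible way they could be applied to two events. It is an assumption, not the actual correlator: Thresholds, Candidate, and Correlates are hypothetical names, distance is computed with the haversine formula on a spherical Earth, both conditions are assumed to be required together, and magnitude_tolerance and match_by_name are left out.

```go
package main

import (
	"math"
	"time"
)

// Thresholds mirrors two of the YAML keys (hypothetical struct).
type Thresholds struct {
	SpatialRadiusKm     float64
	TemporalWindowHours float64
}

// Candidate carries just the fields needed for matching.
type Candidate struct {
	Lat, Lon float64
	Time     time.Time
}

// haversineKm returns the great-circle distance in kilometres between two
// lat/lon points, using a mean Earth radius of 6371 km.
func haversineKm(lat1, lon1, lat2, lon2 float64) float64 {
	const earthRadiusKm = 6371.0
	toRad := func(deg float64) float64 { return deg * math.Pi / 180 }
	dLat := toRad(lat2 - lat1)
	dLon := toRad(lon2 - lon1)
	h := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(h))
}

// Correlates reports whether two candidates fall within both the spatial
// radius and the temporal window.
func Correlates(a, b Candidate, th Thresholds) bool {
	gap := a.Time.Sub(b.Time)
	if gap < 0 {
		gap = -gap
	}
	window := time.Duration(th.TemporalWindowHours * float64(time.Hour))
	dist := haversineKm(a.Lat, a.Lon, b.Lat, b.Lon)
	return dist <= th.SpatialRadiusKm && gap <= window
}
```

With spatial_radius_km: 50 and temporal_window_hours: 12, two reports about 33 km and 6 hours apart would match under this sketch, while reports about 111 km apart or 13 hours apart would not.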
Step 4: Add Environment Variables
Add configuration to .env.example:
# Your Source
YOUR_SOURCE_ENABLED=true
YOUR_SOURCE_POLL_INTERVAL=5m
YOUR_SOURCE_API_URL=https://api.yoursource.org/events
Step 5: Deploy
Build and deploy. No pipeline changes, no database migrations, no downstream service changes.
make build
make run
Testing Your Adapter
Using Test Endpoints
The Event Processor exposes manual poll and /test endpoints for verifying adapter behavior without waiting for the next poll cycle:
Trigger a manual poll:
curl -X POST http://localhost:5606/poll/YOUR_SOURCE \
  -H "X-API-Key: your-api-key"
Save poll results for inspection:
curl -X POST http://localhost:5606/test/save-poll/YOUR_SOURCE \
  -H "X-API-Key: your-api-key"
Writing Unit Tests
Create internal/adapter/your_source_test.go:
package adapter

import (
	"encoding/json"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/TerraGuard/tg-event-processor/internal/model"
)

func TestYourSourceNormalize(t *testing.T) {
	adapter := NewYourSourceAdapter(
		"https://api.example.com", 5*time.Minute, 15*time.Second, true,
	)
	raw := model.RawEvent{
		ID:     "your_source:123",
		Source: "YOUR_SOURCE",
		Properties: map[string]interface{}{
			"magnitude": 5.5,
			"place":     "Test Location",
		},
		RawJSON: json.RawMessage(`{"test": true}`),
	}
	event, err := adapter.Normalize(raw)
	require.NoError(t, err)
	assert.Equal(t, "YOUR_SOURCE", event.Source)
	assert.Equal(t, "your_source:123", event.SourceEventID)
	assert.NotEmpty(t, event.EventHash)
}
Run tests with:
make test
Helper Functions
The adapter.go file provides utility functions available to all adapters:
| Function | Description |
|---|---|
| ComputeEventHash(rawJSON) | MD5 hash of raw JSON for deduplication |
| ParseISO8601(s) | Parses multiple ISO 8601 timestamp formats |
| ParseUnixMS(ms) | Converts Unix milliseconds to time.Time |
| SafeString(m, key, fallback) | Safe string extraction from property maps |
| SafeFloat(m, key, fallback) | Safe float64 extraction |
| SafeInt(m, key, fallback) | Safe int extraction |
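For readers who want a feel for what these helpers do, here is one way they could be written. This is a sketch only: the signatures are inferred from the call shapes in the table and in the skeleton, and the real implementations in adapter.go may handle more types and edge cases. ParseISO8601 is omitted because its supported formats are not listed here.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"time"
)

// ComputeEventHash returns the hex-encoded MD5 digest of the raw payload.
func ComputeEventHash(rawJSON []byte) string {
	sum := md5.Sum(rawJSON)
	return hex.EncodeToString(sum[:])
}

// ParseUnixMS converts Unix milliseconds to a UTC time.Time.
func ParseUnixMS(ms int64) time.Time {
	return time.UnixMilli(ms).UTC()
}

// SafeString returns m[key] if it is a string, otherwise fallback.
// Reading from a nil map is safe in Go, so no nil check is needed.
func SafeString(m map[string]interface{}, key, fallback string) string {
	if s, ok := m[key].(string); ok {
		return s
	}
	return fallback
}

// SafeFloat returns m[key] as float64 if it is numeric, otherwise fallback.
// encoding/json decodes JSON numbers into float64 by default.
func SafeFloat(m map[string]interface{}, key string, fallback float64) float64 {
	switch v := m[key].(type) {
	case float64:
		return v
	case int:
		return float64(v)
	}
	return fallback
}

// SafeInt returns m[key] as int if it is numeric, otherwise fallback.
// A float64 value is truncated toward zero.
func SafeInt(m map[string]interface{}, key string, fallback int) int {
	switch v := m[key].(type) {
	case int:
		return v
	case float64:
		return int(v)
	}
	return fallback
}
```

The type switches matter because property maps decoded from JSON hold float64 for every number, so a plain m[key].(int) assertion would always fail on decoded data.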
What Happens After Ingestion
Once your adapter produces NormalizedEvent structs, the pipeline automatically handles the rest: deduplication, correlation, persistence, and webhook notification.
The Backend API then triggers AI enrichment -- news search, content crawling, knowledge base indexing, and notification evaluation -- all without any changes to downstream services.
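As an illustration of the deduplication step, the sketch below shows how an EventHash could be used to tell new, updated, and unchanged events apart. It is an assumption about the general technique, not a description of the actual pipeline: Deduper, Status, and Check are hypothetical names, and state is kept in memory with no locking.

```go
package main

// Status describes how an incoming event relates to what was already seen.
type Status string

const (
	StatusNew       Status = "new"
	StatusUpdated   Status = "updated"
	StatusDuplicate Status = "duplicate"
)

// Deduper remembers the last EventHash seen per SourceEventID.
// It is an in-memory, single-goroutine illustration only.
type Deduper struct {
	lastHash map[string]string
}

// NewDeduper returns an empty Deduper.
func NewDeduper() *Deduper {
	return &Deduper{lastHash: make(map[string]string)}
}

// Check classifies an event and records its hash for the next call.
func (d *Deduper) Check(sourceEventID, eventHash string) Status {
	prev, seen := d.lastHash[sourceEventID]
	d.lastHash[sourceEventID] = eventHash
	switch {
	case !seen:
		return StatusNew
	case prev == eventHash:
		return StatusDuplicate
	default:
		return StatusUpdated
	}
}
```

Because the hash covers the raw JSON, a source that republishes an identical payload on every poll produces only duplicates, while any change to the payload shows up as an update.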
Potential Data Sources
Here are some data sources that could be added using this pattern:
| Source | Event Types | API Format |
|---|---|---|
| EMSC | Earthquakes (Europe-focused) | JSON |
| EONET | NASA Earth events (wildfires, storms) | GeoJSON |
| Copernicus EMS | Floods, fires, earthquakes (EU) | GeoJSON |
| VolcanoDiscovery | Volcanic activity | RSS/XML |
| ReliefWeb | Humanitarian crises | JSON API |