TerraGuard

Adding a New Data Source

Step-by-step guide to adding a new disaster data source to the TerraGuard Event Processor using the pluggable adapter pattern.

Overview

The Event Processor is designed with a pluggable adapter pattern that makes adding new data sources straightforward. Each data source implements the SourceAdapter interface, and the pipeline handles everything else -- deduplication, correlation, persistence, and webhook notification.

Adding a new source requires zero changes to the pipeline code. You only need to:

  1. Create an adapter file
  2. Register it in main.go
  3. Add correlation thresholds
  4. Add environment variables
  5. Deploy

The SourceAdapter Interface

Every data source must implement this interface defined in internal/adapter/adapter.go:

type SourceAdapter interface {
    // Name returns the adapter identifier (e.g., "gdacs", "usgs", "nhc").
    Name() string

    // Poll fetches the latest events from the source API.
    Poll(ctx context.Context) ([]model.RawEvent, error)

    // Normalize converts a raw event into the common NormalizedEvent format.
    Normalize(raw model.RawEvent) (*model.NormalizedEvent, error)

    // ResolveIDs returns all known IDs for this event.
    // For sources with ID aliasing: return all aliases.
    // For simple sources: return a single-element slice.
    ResolveIDs(raw model.RawEvent) []string

    // Config returns the adapter's configuration.
    Config() AdapterConfig
}

The AdapterConfig struct controls polling behavior:

type AdapterConfig struct {
    PollInterval  time.Duration
    BatchSize     int
    Enabled       bool
    RetryAttempts int
    RetryBackoff  time.Duration
    SourceURL     string
    FetchTimeout  time.Duration
}

The NormalizedEvent Format

All adapters must produce NormalizedEvent structs. This is the common format that the pipeline processes:

type NormalizedEvent struct {
    Source             string                 // Source identifier (e.g., "COPERNICUS")
    SourceEventID      string                 // Unique ID from the source
    EventHash          string                 // MD5 hash of raw JSON (for dedup)
    EventType          string                 // EARTHQUAKE, TROPICAL_CYCLONE, FLOOD, etc.
    Title              string                 // Human-readable title
    Description        string                 // Brief description
    AlertLevel         string                 // RED, ORANGE, or GREEN
    EventTime          time.Time              // When the event occurred
    Location           GeoPoint               // Lat/Lon coordinates
    ImpactGeometry     json.RawMessage        // Optional polygon geometry
    Countries          []string               // Affected countries
    Regions            []string               // Affected regions
    Measurements       []Measurement          // Typed measurements
    SourceURLs         map[string]string      // Links to source pages
    AdditionalData     map[string]interface{} // Source-specific extras
    RawData            json.RawMessage        // Original raw JSON
    ResolvedIDs        []string               // All known IDs for this event
    SourceReportedAt   time.Time              // When the source first reported
    SourceLastUpdated  time.Time              // Last modification by the source
    PrimaryMeasurement *Measurement           // Most important measurement
}

NormalizedEvent Fields Reference

Field               Required  Description
Source              Yes       Your source identifier (uppercase, e.g., "EMSC")
SourceEventID       Yes       Unique event ID from the source (e.g., "emsc:20260328_001")
EventHash           Yes       MD5 of the raw JSON; use ComputeEventHash()
EventType           Yes       One of the predefined constants (see below)
Title               Yes       Human-readable event title
AlertLevel          Yes       RED, ORANGE, or GREEN
EventTime           Yes       When the event occurred
Location            Yes       Lat/lon coordinates as a GeoPoint
Measurements        Yes       At least one measurement; mark the primary one with IsPrimary: true
RawData             Yes       Original JSON payload for the audit trail
ResolvedIDs         Yes       All known IDs for this event
SourceReportedAt    Yes       When the source first reported this event
SourceLastUpdated   Yes       Last modification timestamp from the source
ImpactGeometry      No        Optional polygon/multipolygon for the impact area
Countries           No        Affected country names
SourceURLs          No        Map of link labels to URLs
AdditionalData      No        Source-specific metadata

Event Type Constants

Use the predefined constants in internal/model/event.go:

Constant                  Value
EventTypeEarthquake       "EARTHQUAKE"
EventTypeTropicalCyclone  "TROPICAL_CYCLONE"
EventTypeFlood            "FLOOD"
EventTypeDrought          "DROUGHT"
EventTypeWildfire         "WILDFIRE"
EventTypeTsunami          "TSUNAMI"
EventTypeVolcano          "VOLCANO"
EventTypeOther            "OTHER"
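Most sources use their own category vocabulary, so Normalize usually needs a lookup from source codes to these constants. The codes below (EQ, TC, FL, and so on) are hypothetical placeholders for your source's values. The constants are redeclared as plain strings only so that the sketch compiles on its own. Falling back to OTHER keeps unrecognized events flowing instead of dropping them.

```go
package main

import (
	"fmt"
	"strings"
)

// Values copied from the table above; real adapters should use model.EventType*.
const (
	EventTypeEarthquake      = "EARTHQUAKE"
	EventTypeTropicalCyclone = "TROPICAL_CYCLONE"
	EventTypeFlood           = "FLOOD"
	EventTypeDrought         = "DROUGHT"
	EventTypeWildfire        = "WILDFIRE"
	EventTypeTsunami         = "TSUNAMI"
	EventTypeVolcano         = "VOLCANO"
	EventTypeOther           = "OTHER"
)

// sourceCategories maps hypothetical source codes to pipeline event types.
var sourceCategories = map[string]string{
	"EQ": EventTypeEarthquake,
	"TC": EventTypeTropicalCyclone,
	"FL": EventTypeFlood,
	"DR": EventTypeDrought,
	"WF": EventTypeWildfire,
	"TS": EventTypeTsunami,
	"VO": EventTypeVolcano,
}

// mapEventType normalizes case and whitespace, falling back to OTHER.
func mapEventType(code string) string {
	if t, ok := sourceCategories[strings.ToUpper(strings.TrimSpace(code))]; ok {
		return t
	}
	return EventTypeOther
}

func main() {
	for _, c := range []string{"eq", " TC ", "landslide"} {
		fmt.Printf("%q -> %s\n", c, mapEventType(c))
	}
}
```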

Step-by-Step Guide

Step 1: Create the Adapter File

Create internal/adapter/your_source.go. Here is a complete skeleton:

package adapter

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "time"

    "github.com/TerraGuard/tg-event-processor/internal/model"
)

// YourSourceAdapter fetches and normalizes events from Your Source API.
type YourSourceAdapter struct {
    cfg    AdapterConfig
    client *http.Client
}

// NewYourSourceAdapter creates a new adapter instance.
func NewYourSourceAdapter(sourceURL string, interval time.Duration,
    fetchTimeout time.Duration, enabled bool) *YourSourceAdapter {
    return &YourSourceAdapter{
        cfg: AdapterConfig{
            SourceURL:     sourceURL,
            PollInterval:  interval,
            FetchTimeout:  fetchTimeout,
            Enabled:       enabled,
            RetryAttempts: 3,
            RetryBackoff:  5 * time.Second,
            BatchSize:     100,
        },
        client: &http.Client{Timeout: fetchTimeout},
    }
}

func (a *YourSourceAdapter) Name() string          { return "YOUR_SOURCE" }
func (a *YourSourceAdapter) Config() AdapterConfig { return a.cfg }

// Poll fetches the latest events from the source API.
func (a *YourSourceAdapter) Poll(ctx context.Context) ([]model.RawEvent, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, a.cfg.SourceURL, nil)
    if err != nil {
        return nil, fmt.Errorf("your_source: creating request: %w", err)
    }

    resp, err := a.client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("your_source: fetching events: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("your_source: unexpected status %d", resp.StatusCode)
    }

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("your_source: reading body: %w", err)
    }

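    // Parse the API response into your source-specific format, then
    // convert each item to a model.RawEvent. As a placeholder, this assumes
    // a top-level JSON array of objects that each carry a string "id";
    // replace it with your source's actual response schema.
    var items []json.RawMessage
    if err := json.Unmarshal(body, &items); err != nil {
        return nil, fmt.Errorf("your_source: decoding body: %w", err)
    }

    events := make([]model.RawEvent, 0, len(items))
    for _, item := range items {
        var props map[string]interface{}
        if err := json.Unmarshal(item, &props); err != nil {
            slog.Warn("your_source: skipping malformed item", "error", err)
            continue
        }
        itemID := SafeString(props, "id", "")
        if itemID == "" {
            slog.Warn("your_source: skipping item without id")
            continue
        }
        events = append(events, model.RawEvent{
            ID:         "your_source:" + itemID,
            Source:     "YOUR_SOURCE",
            Properties: props,
            RawJSON:    item,
        })
    }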

    slog.Info("your_source: poll complete", "count", len(events))
    return events, nil
}

// Normalize converts a raw event into the common NormalizedEvent format.
func (a *YourSourceAdapter) Normalize(raw model.RawEvent) (*model.NormalizedEvent, error) {
    props := raw.Properties
    if props == nil {
        return nil, fmt.Errorf("your_source: nil properties for %s", raw.ID)
    }

    // Extract fields using the safe helpers:
    //   SafeString(props, "key", "fallback")
    //   SafeFloat(props, "key", 0)
    //   SafeInt(props, "key", 0)

    return &model.NormalizedEvent{
        Source:        "YOUR_SOURCE",
        SourceEventID: raw.ID,
        EventHash:     ComputeEventHash(raw.RawJSON),
        EventType:     model.EventTypeEarthquake, // pick the right type
        Title:         "Event title",
        AlertLevel:    model.AlertLevelGreen,
        EventTime:     time.Now().UTC(),
        Location:      model.GeoPoint{Lat: 0, Lon: 0},
        Measurements:  []model.Measurement{
            {Type: "magnitude", Value: 5.0, Unit: "M", IsPrimary: true},
        },
        RawData:            raw.RawJSON,
        ResolvedIDs:        a.ResolveIDs(raw),
        SourceReportedAt:   time.Now().UTC(),
        SourceLastUpdated:  time.Now().UTC(),
    }, nil
}

// ResolveIDs returns all known IDs for this event.
func (a *YourSourceAdapter) ResolveIDs(raw model.RawEvent) []string {
    return []string{raw.ID}
}

Step 2: Register in main.go

Add your adapter to the registry in cmd/server/main.go:

// Register adapters.
registry := adapter.NewRegistry()
registry.Register(adapter.NewGDACSAdapter(...))
registry.Register(adapter.NewUSGSAdapter(...))
registry.Register(adapter.NewNHCAdapter(...))

// Add your new source:
registry.Register(adapter.NewYourSourceAdapter(
    cfg.YourSourceURL,
    cfg.YourSourceInterval,
    cfg.FetchTimeout,
    cfg.YourSourceEnabled,
))

Add the configuration fields to internal/config/config.go:

YourSourceEnabled  bool          `envconfig:"YOUR_SOURCE_ENABLED" default:"false"`
YourSourceURL      string        `envconfig:"YOUR_SOURCE_API_URL" default:"https://..."`
YourSourceInterval time.Duration `envconfig:"YOUR_SOURCE_POLL_INTERVAL" default:"5m"`

Step 3: Add Correlation Thresholds

Edit config/correlation.yaml to define how your source's events should be matched against existing disaster events:

event_types:
  # ... existing types ...

  # Add your event type (if new) with appropriate thresholds:
  YOUR_TYPE:
    spatial_radius_km: 50    # How close (km) events must be spatially
    temporal_window_hours: 12 # How close (hours) events must be temporally
    magnitude_tolerance: null # Set for earthquake-like events
    match_by_name: false      # Set true for named events (storms, volcanoes)

If your source reports event types that already exist in the config (e.g., earthquakes), the existing thresholds will be used automatically -- no entry needed.

Step 4: Add Environment Variables

Add configuration to .env.example:

# Your Source
YOUR_SOURCE_ENABLED=true
YOUR_SOURCE_POLL_INTERVAL=5m
YOUR_SOURCE_API_URL=https://api.yoursource.org/events

Step 5: Deploy

Build and deploy. No pipeline changes, no database migrations, no downstream service changes.

make build
make run

Testing Your Adapter

Using Test Endpoints

The Event Processor exposes HTTP endpoints for checking adapter behavior without waiting for the next poll cycle:

Trigger a manual poll:

curl -X POST http://localhost:5606/poll/YOUR_SOURCE \
  -H "X-API-Key: your-api-key"

Save poll results for inspection:

curl -X POST http://localhost:5606/test/save-poll/YOUR_SOURCE \
  -H "X-API-Key: your-api-key"

Writing Unit Tests

Create internal/adapter/your_source_test.go:

package adapter

import (
    "encoding/json"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "github.com/TerraGuard/tg-event-processor/internal/model"
)

func TestYourSourceNormalize(t *testing.T) {
    a := NewYourSourceAdapter(
        "https://api.example.com", 5*time.Minute, 15*time.Second, true,
    )

    raw := model.RawEvent{
        ID:     "your_source:123",
        Source: "YOUR_SOURCE",
        Properties: map[string]interface{}{
            "magnitude": 5.5,
            "place":     "Test Location",
        },
        RawJSON: json.RawMessage(`{"test": true}`),
    }

    event, err := a.Normalize(raw)
    require.NoError(t, err)
    require.NotNil(t, event)
    assert.Equal(t, "YOUR_SOURCE", event.Source)
    assert.Equal(t, "your_source:123", event.SourceEventID)
    assert.NotEmpty(t, event.EventHash)
    assert.Equal(t, []string{"your_source:123"}, event.ResolvedIDs)
}

Run tests with:

make test

Helper Functions

The internal/adapter/adapter.go file provides utility functions available to all adapters:

Function                        Description
ComputeEventHash(rawJSON)       MD5 hash of raw JSON for deduplication
ParseISO8601(s)                 Parses multiple ISO 8601 timestamp formats
ParseUnixMS(ms)                 Converts Unix milliseconds to time.Time
SafeString(m, key, fallback)    Safe string extraction from property maps
SafeFloat(m, key, fallback)     Safe float64 extraction
SafeInt(m, key, fallback)       Safe int extraction

What Happens After Ingestion

Once your adapter produces NormalizedEvent structs, the pipeline automatically handles the rest: deduplication, correlation with existing disaster events, persistence, and webhook notification.

The Backend API then triggers AI enrichment -- news search, content crawling, knowledge base indexing, and notification evaluation -- all without any changes to downstream services.
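Deduplication relies on the EventHash and ResolvedIDs that your adapter sets. This guide does not show the pipeline's actual logic. The sketch below is a hypothetical in-memory model. It classifies an event as new, updated, or unchanged by comparing its hash with the last hash stored under any of its resolved IDs, so an event first seen under one alias is still recognized under another.

```go
package main

import "fmt"

// Decision is what a pipeline might do with an incoming event.
type Decision string

const (
	New       Decision = "new"
	Updated   Decision = "updated"
	Unchanged Decision = "unchanged"
)

// dedupStore remembers the last EventHash seen per resolved ID.
type dedupStore struct {
	lastHash map[string]string
}

// Observe compares hash with the first known ID's stored hash, then records
// the new hash under every resolved ID so that later aliases also match.
func (s *dedupStore) Observe(resolvedIDs []string, hash string) Decision {
	decision := New
	for _, id := range resolvedIDs {
		if prev, ok := s.lastHash[id]; ok {
			if prev == hash {
				decision = Unchanged
			} else {
				decision = Updated
			}
			break
		}
	}
	for _, id := range resolvedIDs {
		s.lastHash[id] = hash
	}
	return decision
}

func main() {
	s := &dedupStore{lastHash: map[string]string{}}
	fmt.Println(s.Observe([]string{"usgs:us456"}, "h1"))               // new
	fmt.Println(s.Observe([]string{"usgs:us456"}, "h1"))               // unchanged
	fmt.Println(s.Observe([]string{"usgs:ci123", "usgs:us456"}, "h2")) // updated
}
```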

Potential Data Sources

Here are some data sources that could be added using this pattern:

Source              Event Types                              API Format
EMSC                Earthquakes (Europe-focused)             JSON
EONET               NASA Earth events (wildfires, storms)    GeoJSON
Copernicus EMS      Floods, fires, earthquakes (EU)          GeoJSON
VolcanoDiscovery    Volcanic activity                        RSS/XML
ReliefWeb           Humanitarian crises                      JSON API
