
Data Pipeline

The grant data pipeline fetches, normalizes, deduplicates, embeds, and indexes grant data from multiple international sources.


Pipeline Flow

flowchart TB
    subgraph Sources["Data Sources"]
        S1[CORDIS<br/>EU Research]
        S2[EU Portal<br/>Funding & Tenders]
        S3[Cohesion<br/>ERDF/ESF]
        S4[Innovate UK<br/>UKRI]
        S5[GCF<br/>Green Climate Fund]
        S6[IATI<br/>Development Finance]
        S7[World Bank<br/>Projects]
    end

    subgraph Pipeline["Processing Pipeline"]
        F[Fetch<br/>Async HTTP + Rate Limiting]
        N[Normalize<br/>GrantNormalizer]
        D[Deduplicate<br/>Content Hashing]
        E[Embed<br/>all-mpnet-base-v2]
        I[Index<br/>Meilisearch]
    end

    subgraph Storage["Storage"]
        DB[(PostgreSQL<br/>+ pgvector)]
        MS[(Meilisearch<br/>Search Index)]
    end

    Sources --> F
    F --> N
    N --> D
    D --> DB
    DB --> E
    E --> DB
    DB --> I
    I --> MS

Data Sources

Source Inventory

| Source | Client Class | API Type | Base URL | Rate Limit | Auth |
|---|---|---|---|---|---|
| CORDIS | CordisScraper | REST/CSV | cordis.europa.eu/api | 1 req/sec | Optional API key |
| EU Portal | EUPortalClient | REST | api.tech.ec.europa.eu | 2 req/sec | SEDIA key (public) |
| Cohesion | CohesionClient | Socrata SODA | cohesiondata.ec.europa.eu | 2 req/sec | Optional app token |
| Innovate UK | InnovateUKClient | REST (JSON v7) | gtr.ukri.org/api | 2 req/sec | None |
| GCF | GCFClient | REST | api.gcfund.org/v1 | 2 req/sec | Optional API key |
| IATI | IATIClient | REST (Solr) | api.iatistandard.org | 2 req/sec | Optional subscription key |
| World Bank | WorldBankClient | REST | api.worldbank.org/v2 | 2 req/sec | None |

All clients are located in backend/app/services/scrapers/.

CORDIS (EU Research Funding)

The Community Research and Development Information Service provides access to EU-funded research projects.

from backend.app.services.scrapers import CordisScraper, CordisScraperConfig

config = CordisScraperConfig(
    base_url="https://cordis.europa.eu/api",
    page_size=100,
    requests_per_second=1.0,
    max_retries=3,
)

async with CordisScraper(config) as scraper:
    projects = await scraper.fetch_all_projects(
        programme="HORIZON.2.5",
        framework="HORIZON",
        max_pages=10,
    )

EU Funding & Tenders Portal

The client queries the Single Electronic Data Interchange Area (SEDIA) for EU funding opportunities. Requests are sent as multipart/form-data with JSON blobs.

from backend.app.services.scrapers import EUPortalClient, EUPortalConfig

config = EUPortalConfig(api_key="SEDIA", page_size=100)

async with EUPortalClient(config) as client:
    calls = await client.search_all_calls(
        status="OPEN",
        programme="HORIZON",
    )

Cohesion Open Data Portal (ERDF/ESF)

European Regional Development Fund (ERDF) and European Social Fund (ESF) data, served via the Socrata SODA API.

from backend.app.services.scrapers import CohesionClient, CohesionConfig

async with CohesionClient(CohesionConfig()) as client:
    projects = await client.fetch_projects(country="DE", fund="ERDF")
    all_projects = await client.fetch_all_projects(fund="ESF", max_pages=10)

Innovate UK (Gateway to Research)

UK Research and Innovation funding data. Requires a specific Accept header for JSON v7 responses.

from backend.app.services.scrapers import InnovateUKClient, InnovateUKConfig

async with InnovateUKClient(InnovateUKConfig()) as client:
    projects = await client.fetch_projects(
        funder="Innovate UK",
        status="Active",
    )

Green Climate Fund (GCF)

Climate finance projects with country and thematic filters.

from backend.app.services.scrapers import GCFClient, GCFConfig

async with GCFClient(GCFConfig()) as client:
    projects = await client.list_projects(
        country_code="NG",
        theme="Mitigation",
    )

IATI Datastore

International Aid Transparency Initiative data for development and climate finance activities.

from backend.app.services.scrapers import IATIClient, IATIConfig

async with IATIClient(IATIConfig()) as client:
    activities = await client.search_activities(
        country_code="NG",
        sector_code_prefix="230",  # Energy sector
    )

World Bank Projects

World Bank project data with country and sector filters.

from backend.app.services.scrapers import WorldBankClient, WorldBankConfig

async with WorldBankClient(WorldBankConfig()) as client:
    projects = await client.list_projects(country_code="NG")

Scraper Architecture

All scraper clients follow a consistent architectural pattern:

Common Features

  • Async HTTP using httpx.AsyncClient for non-blocking I/O
  • Rate limiting with configurable requests-per-second via time-based throttling
  • Exponential backoff retry logic with jitter for failed requests
  • Async context manager (async with) for proper resource cleanup
  • Dataclass-based configuration and response models
  • Structured logging via structlog
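
The features above can be combined in a small base class. What follows is a minimal, stdlib-only sketch, not the project's actual implementation: `ScraperConfig`, `BaseScraper`, and `FakeTransport` are hypothetical names, and `FakeTransport` stands in for `httpx.AsyncClient` so the example runs without network access.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ScraperConfig:
    """Hypothetical shared configuration for a scraper client."""
    base_url: str
    requests_per_second: float = 2.0
    max_retries: int = 3


class FakeTransport:
    """Stand-in for httpx.AsyncClient; records whether it was closed."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


class BaseScraper:
    """Owns the transport for the duration of an `async with` block."""

    def __init__(self, config):
        self.config = config
        self._client = None

    async def __aenter__(self):
        # Real code would create an httpx.AsyncClient here.
        self._client = FakeTransport()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Always release the transport, even if the body raised.
        if self._client is not None:
            await self._client.aclose()


async def demo():
    config = ScraperConfig(base_url="https://example.invalid")
    async with BaseScraper(config) as scraper:
        transport = scraper._client
    return transport.closed


closed_after_exit = asyncio.run(demo())
```

Creating the transport in `__aenter__` rather than `__init__` keeps construction cheap and ties the connection lifetime to the `async with` block.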

Rate Limiting Implementation

async def _rate_limit(self):
    """Enforce rate limiting between requests."""
    # get_running_loop() is the preferred call inside a coroutine.
    loop = asyncio.get_running_loop()
    elapsed = loop.time() - self._last_request_time
    if elapsed < self._request_interval:
        # Sleep off the remainder of the interval before the next request.
        await asyncio.sleep(self._request_interval - elapsed)
    self._last_request_time = loop.time()
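
The method above works when requests are issued one at a time. If several coroutines share one client, two of them could read `_last_request_time` before either updates it and both skip the sleep. One possible guard, sketched here with a hypothetical standalone `RateLimiter` class, is to serialize the check with an `asyncio.Lock`:

```python
import asyncio
import time


class RateLimiter:
    """Time-based throttle spacing calls at least 1 / rps seconds apart."""

    def __init__(self, requests_per_second):
        self._interval = 1.0 / requests_per_second
        self._last = float("-inf")  # the first call never waits
        self._lock = asyncio.Lock()

    async def wait(self):
        # The lock serializes callers, so concurrent coroutines cannot
        # observe the same timestamp and both skip the sleep.
        async with self._lock:
            elapsed = time.monotonic() - self._last
            if elapsed < self._interval:
                await asyncio.sleep(self._interval - elapsed)
            self._last = time.monotonic()


async def demo():
    limiter = RateLimiter(requests_per_second=20)  # 50 ms between calls
    start = time.monotonic()
    # Three concurrent callers: the second and third each wait about 50 ms.
    await asyncio.gather(*(limiter.wait() for _ in range(3)))
    return time.monotonic() - start


elapsed = asyncio.run(demo())
```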

Retry Logic

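for attempt in range(self.config.max_retries):
    try:
        await self._rate_limit()
        response = await self._client.get(url, params=params)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        retryable = e.response.status_code in (429, 500, 502, 503, 504)
        # Re-raise non-retryable errors immediately, and re-raise the last
        # retryable error instead of falling out of the loop with None.
        if not retryable or attempt == self.config.max_retries - 1:
            raise
        # Exponential backoff plus up to 1 s of jitter, capped at backoff_max.
        delay = min(
            self.config.backoff_base * (2 ** attempt) + random.uniform(0, 1),
            self.config.backoff_max,
        )
        await asyncio.sleep(delay)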
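
To see what the delay expression in the retry loop produces, it can be pulled out into a pure function. This is an illustrative sketch: the values `base=1.0` and `cap=30.0` are assumptions, since the defaults for `backoff_base` and `backoff_max` are not stated here, and the `jitter` parameter is added only so the schedule can be made deterministic.

```python
import random


def backoff_delay(attempt, base, cap, jitter=1.0):
    """Delay in seconds before retrying after failed attempt `attempt` (0-based)."""
    return min(base * (2 ** attempt) + random.uniform(0, jitter), cap)


# With jitter disabled the schedule is deterministic: the delay doubles
# on each attempt until it reaches the cap.
schedule = [backoff_delay(n, base=1.0, cap=30.0, jitter=0.0) for n in range(6)]

# With jitter enabled, each delay falls between the deterministic value
# and that value plus one second, and never exceeds the cap.
jittered = [backoff_delay(n, base=1.0, cap=30.0) for n in range(6)]
```

The jitter spreads out retries from clients that failed at the same moment, so they do not all hit the API again simultaneously.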

Normalization

Source: backend/app/services/grant_normalizer.py

The GrantNormalizer converts raw data from each source into a unified NormalizedGrant schema that maps directly to the database Grant model.

NormalizedGrant Schema

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class NormalizedGrant:
    external_id: str
    source: str          # cordis, eu_portal, cohesion, innovate_uk, gcf, iati, world_bank
    title: str
    description: str | None = None
    funding_amount_min: float | None = None
    funding_amount_max: float | None = None
    currency: str = "EUR"
    deadline: datetime | None = None
    countries: list[str] = field(default_factory=list)
    nace_codes: list[str] = field(default_factory=list)
    company_sizes: list[str] = field(default_factory=list)
    content_hash: str | None = None
    status: str = "active"
    # ... additional fields

Source-Specific Normalization

Each source has a dedicated normalization method:

  • normalize_cordis() -- Maps CORDIS project fields to NormalizedGrant
  • normalize_eu_portal() -- Handles SEDIA multipart response format
  • normalize_cohesion() -- Converts Socrata SODA records
  • normalize_innovate_uk() -- Maps GtR JSON v7 fields
  • normalize_gcf() -- Converts GCF project data
  • normalize_iati() -- Maps IATI activity fields
  • normalize_world_bank() -- Converts World Bank project data
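
Because the methods share the `normalize_<source>` naming pattern, a caller can route records by source name. The sketch below is one possible way to do that, not necessarily how `GrantNormalizer` is written: the `normalize()` entry point, the trimmed-down `NormalizedGrant`, and all raw field names are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class NormalizedGrant:
    """Trimmed-down stand-in for the full schema."""
    external_id: str
    source: str
    title: str


class GrantNormalizer:
    def normalize(self, source, raw):
        """Dispatch to normalize_<source>; hypothetical entry point."""
        method = getattr(self, f"normalize_{source}", None)
        if method is None:
            raise ValueError(f"unsupported source: {source!r}")
        return method(raw)

    def normalize_cordis(self, raw):
        # The raw field names "id" and "title" are assumed for illustration.
        return NormalizedGrant(str(raw["id"]), "cordis", raw["title"].strip())

    def normalize_world_bank(self, raw):
        # Likewise, "project_id" and "project_name" are assumptions.
        return NormalizedGrant(
            raw["project_id"], "world_bank", raw["project_name"].strip()
        )


grant = GrantNormalizer().normalize(
    "cordis", {"id": 101, "title": "  Grid storage pilot "}
)
```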

Carbon Classification

During normalization, grants are automatically classified for carbon relevance:

  • is_carbon_focused is set based on keyword analysis of title and description
  • carbon_categories are assigned from the 14 recognized categories
  • EU Taxonomy objectives are extracted where available
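
Keyword-based classification of this kind can be sketched in a few lines. The category names and keywords below are invented for illustration and are not the 14 categories the normalizer recognizes. `classify_carbon` is likewise a hypothetical helper.

```python
import re
from typing import Optional

# Illustrative keyword map; these categories and keywords are assumptions.
CATEGORY_KEYWORDS = {
    "renewable_energy": ("solar", "wind", "renewable"),
    "energy_efficiency": ("energy efficiency", "retrofit", "insulation"),
    "carbon_capture": ("carbon capture", "ccs", "sequestration"),
}


def classify_carbon(title: str, description: Optional[str]):
    """Return (is_carbon_focused, carbon_categories) from keyword matches."""
    text = f"{title} {description or ''}".lower()
    categories = [
        category
        for category, keywords in CATEGORY_KEYWORDS.items()
        # Word boundaries stop short keywords such as "ccs" from
        # matching inside unrelated words.
        if any(re.search(rf"\b{re.escape(kw)}\b", text) for kw in keywords)
    ]
    return bool(categories), categories


focused, categories = classify_carbon(
    "Solar retrofit scheme", "Funding for SME rooftops"
)
```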

Deduplication

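Deduplication uses content hashing (MD5 of title + description + source) to identify duplicate grants. Because the source is part of the hashed string, the same grant published by two different sources gets two different hashes, so only duplicates within a single source are detected: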

import hashlib

def compute_content_hash(grant: NormalizedGrant) -> str:
    content = f"{grant.title}|{grant.description}|{grant.source}"
    return hashlib.md5(content.encode("utf-8")).hexdigest()

The database-level deduplication task (deduplicate_grants_task) identifies groups of grants sharing the same content_hash, keeps the most recently synced record as canonical, and marks duplicates as inactive.
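
The grouping step can be illustrated in memory. This is a sketch of the logic described above, not the task itself, which works at the database level. `GrantRow` and its `last_synced_at` and `is_active` fields are hypothetical names.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass
class GrantRow:
    """Hypothetical minimal view of a stored grant."""
    id: int
    content_hash: str
    last_synced_at: datetime
    is_active: bool = True


def mark_duplicates(rows):
    """Deactivate all but the newest row per content_hash; return their ids."""
    groups = defaultdict(list)
    for row in rows:
        groups[row.content_hash].append(row)

    deactivated = []
    for group in groups.values():
        if len(group) < 2:
            continue
        # The most recently synced record is kept as canonical.
        canonical = max(group, key=lambda r: r.last_synced_at)
        for row in group:
            if row is not canonical:
                row.is_active = False
                deactivated.append(row.id)
    return deactivated


rows = [
    GrantRow(1, "abc", datetime(2024, 1, 1)),
    GrantRow(2, "abc", datetime(2024, 3, 1)),
    GrantRow(3, "def", datetime(2024, 2, 1)),
]
deactivated_ids = mark_duplicates(rows)
```

Marking rows inactive instead of deleting them keeps their history and makes the operation reversible.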


Embedding Generation

Source: backend/app/services/embedding_service.py

The EmbeddingService generates 768-dimensional vectors using the all-mpnet-base-v2 sentence transformer model.

Configuration

from dataclasses import dataclass

@dataclass
class EmbeddingConfig:
    model_name: str = "all-mpnet-base-v2"
    embedding_dimension: int = 768
    batch_size: int = 32
    max_seq_length: int = 384
    device: str | None = None  # Auto-detect
    normalize_embeddings: bool = True

Features

  • Lazy model loading -- model is only loaded on first use
  • In-memory caching -- embeddings are cached by MD5 hash of input text
  • Batch processing -- multiple texts are processed together for GPU efficiency
  • Grant-specific -- generate_grant_embeddings() produces both title and description embeddings
  • Async wrapper -- generate_grant_embeddings_async() runs in a thread pool for non-blocking use
  • Cosine similarity -- compute_similarity() for comparing embedding vectors
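
Lazy loading, MD5-keyed caching, and cosine similarity can be shown together in a stdlib-only sketch. `CachedEmbedder`, `cosine_similarity`, and the toy two-dimensional "model" are hypothetical stand-ins. The real service wraps a sentence transformer and returns 768-dimensional vectors.

```python
import hashlib
import math


class CachedEmbedder:
    """Lazy-loading, MD5-keyed cache around an arbitrary encode function."""

    def __init__(self, load_model):
        self._load_model = load_model  # called once, on first use
        self._model = None
        self._cache = {}

    def embed(self, text):
        key = hashlib.md5(text.encode("utf-8")).hexdigest()
        if key not in self._cache:
            if self._model is None:
                self._model = self._load_model()
            self._cache[key] = self._model(text)
        return self._cache[key]


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


calls = []


def load_toy_model():
    """Toy stand-in for a sentence transformer: counts vowels and other characters."""
    def encode(text):
        calls.append(text)
        vowels = sum(ch in "aeiou" for ch in text.lower())
        return [float(vowels), float(len(text) - vowels)]
    return encode


embedder = CachedEmbedder(load_toy_model)
first = embedder.embed("solar grant")
second = embedder.embed("solar grant")  # served from the cache
similarity = cosine_similarity(first, second)
```

When `normalize_embeddings` is enabled the vectors have unit length, so cosine similarity reduces to a plain dot product.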

Batch Processing

service = EmbeddingService(config)

# Single text
result = service.generate_embedding("Solar panel installation grant")

# Batch processing (efficient for multiple grants)
results = service.generate_batch_embeddings([
    "Energy efficiency grant",
    "Renewable energy funding",
    "Clean technology innovation",
])

# Grant-specific (title + description embeddings)
grant_result = service.generate_grant_embeddings(normalized_grant)
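
For inputs larger than `batch_size`, the texts need to be split into consecutive batches at some layer. The helper below is a hypothetical illustration of that split. It is not part of `EmbeddingService`, which may delegate batching to the underlying model.

```python
def iter_batches(texts, batch_size=32):
    """Yield consecutive slices of at most batch_size texts."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(texts), batch_size):
        yield texts[start:start + batch_size]


texts = [f"grant {i}" for i in range(70)]
# 70 texts with the default batch size give two full batches and a remainder.
batch_sizes = [len(batch) for batch in iter_batches(texts)]
```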

Celery Tasks

Pipeline tasks are defined in backend/app/tasks/grant_pipeline_tasks.py and routed to the sync queue:

| Task | Name | Description |
|---|---|---|
| sync_cordis_grants | grant_pipeline.sync_cordis_grants | Fetch from CORDIS |
| sync_eu_portal_grants | grant_pipeline.sync_eu_portal_grants | Fetch from EU Portal |
| sync_iati_grants | grant_pipeline.sync_iati_grants | Fetch from IATI |
| sync_gcf_projects | grant_pipeline.sync_gcf_projects | Fetch from GCF |
| sync_world_bank_projects | grant_pipeline.sync_world_bank_projects | Fetch from World Bank |
| sync_all_grants | grant_pipeline.sync_all_grants | Orchestrate all sources |
| generate_grant_embeddings_task | grant_pipeline.generate_embeddings | Generate missing embeddings |
| deduplicate_grants_task | grant_pipeline.deduplicate_grants | Remove duplicates |

All sync tasks use autoretry_for=(Exception,) with exponential backoff (max 600 seconds) and 3 retries.
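
The retry settings above map onto Celery's task options `autoretry_for`, `retry_backoff`, `retry_backoff_max`, and `max_retries`. The sketch below shows one way they might be written. Grouping them in a shared dict, the `celery_app` name in the comment, and the glob-based route are assumptions, not the project's confirmed configuration.

```python
# Retry options as described above, using Celery's task option names.
SYNC_TASK_OPTIONS = {
    "autoretry_for": (Exception,),
    "retry_backoff": True,      # exponential backoff between retries
    "retry_backoff_max": 600,   # cap each delay at 600 seconds
    "max_retries": 3,
}

# One possible way to route every pipeline task to the sync queue,
# expressed as a value for Celery's `task_routes` setting.
TASK_ROUTES = {
    "grant_pipeline.*": {"queue": "sync"},
}

# Hypothetical usage inside grant_pipeline_tasks.py:
#
#   @celery_app.task(name="grant_pipeline.sync_cordis_grants", **SYNC_TASK_OPTIONS)
#   def sync_cordis_grants():
#       ...
```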


Scheduling

Grant synchronization is automated via Celery Beat:

| Schedule | Task | Time (UTC) |
|---|---|---|
| Daily sync | daily_grant_sync | 2:00 AM |
| Weekly full sync | weekly_full_sync | Sunday 3:00 AM |
| Periodic sync | sync_all_grant_sources | Every 6 hours |
| Expired cleanup | cleanup_expired_grants | Daily 4:00 AM |

See Celery for full schedule details.
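
As a rough illustration, the schedule table could be expressed as a Celery Beat configuration like the one below. The entry keys are invented, and the `task` values are copied from the table as-is; the fully qualified task names registered in the project may differ. The periodic sync is assumed to fire on the hour.

```python
from celery.schedules import crontab

# Times are interpreted in Celery's configured timezone (UTC by default).
beat_schedule = {
    "daily-sync": {
        "task": "daily_grant_sync",
        "schedule": crontab(hour=2, minute=0),
    },
    "weekly-full-sync": {
        "task": "weekly_full_sync",
        "schedule": crontab(hour=3, minute=0, day_of_week="sun"),
    },
    "periodic-sync": {
        "task": "sync_all_grant_sources",
        "schedule": crontab(minute=0, hour="*/6"),  # 00:00, 06:00, 12:00, 18:00
    },
    "expired-cleanup": {
        "task": "cleanup_expired_grants",
        "schedule": crontab(hour=4, minute=0),
    },
}
```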