Data Pipeline¶
The grant data pipeline is responsible for fetching, normalizing, deduplicating, and embedding grant data from multiple international sources.
Pipeline Flow¶
```mermaid
flowchart TB
    subgraph Sources["Data Sources"]
        S1[CORDIS<br/>EU Research]
        S2[EU Portal<br/>Funding & Tenders]
        S3[Cohesion<br/>ERDF/ESF]
        S4[Innovate UK<br/>UKRI]
        S5[GCF<br/>Green Climate Fund]
        S6[IATI<br/>Development Finance]
        S7[World Bank<br/>Projects]
    end
    subgraph Pipeline["Processing Pipeline"]
        F[Fetch<br/>Async HTTP + Rate Limiting]
        N[Normalize<br/>GrantNormalizer]
        D[Deduplicate<br/>Content Hashing]
        E[Embed<br/>all-mpnet-base-v2]
        I[Index<br/>Meilisearch]
    end
    subgraph Storage["Storage"]
        DB[(PostgreSQL<br/>+ pgvector)]
        MS[(Meilisearch<br/>Search Index)]
    end
    Sources --> F
    F --> N
    N --> D
    D --> DB
    DB --> E
    E --> DB
    DB --> I
    I --> MS
```

Data Sources¶
Source Inventory¶
| Source | Client Class | API Type | Base URL | Rate Limit | Auth |
|---|---|---|---|---|---|
| CORDIS | CordisScraper | REST/CSV | cordis.europa.eu/api | 1 req/sec | Optional API key |
| EU Portal | EUPortalClient | REST | api.tech.ec.europa.eu | 2 req/sec | SEDIA key (public) |
| Cohesion | CohesionClient | Socrata SODA | cohesiondata.ec.europa.eu | 2 req/sec | Optional app token |
| Innovate UK | InnovateUKClient | REST (JSON v7) | gtr.ukri.org/api | 2 req/sec | None |
| GCF | GCFClient | REST | api.gcfund.org/v1 | 2 req/sec | Optional API key |
| IATI | IATIClient | REST (Solr) | api.iatistandard.org | 2 req/sec | Optional subscription key |
| World Bank | WorldBankClient | REST | api.worldbank.org/v2 | 2 req/sec | None |
All clients are located in backend/app/services/scrapers/.
CORDIS (EU Research Funding)¶
The Community Research and Development Information Service provides access to EU-funded research projects.
```python
from backend.app.services.scrapers import CordisScraper, CordisScraperConfig

config = CordisScraperConfig(
    base_url="https://cordis.europa.eu/api",
    page_size=100,
    requests_per_second=1.0,
    max_retries=3,
)

async with CordisScraper(config) as scraper:
    projects = await scraper.fetch_all_projects(
        programme="HORIZON.2.5",
        framework="HORIZON",
        max_pages=10,
    )
```
EU Funding & Tenders Portal¶
The Single Electronic Data Interchange Area (SEDIA) for EU funding opportunities. Uses multipart/form-data with JSON blobs.
```python
from backend.app.services.scrapers import EUPortalClient, EUPortalConfig

config = EUPortalConfig(api_key="SEDIA", page_size=100)

async with EUPortalClient(config) as client:
    calls = await client.search_all_calls(
        status="OPEN",
        programme="HORIZON",
    )
```
Cohesion Open Data Portal (ERDF/ESF)¶
European Regional Development Fund and European Social Fund data via Socrata SODA API.
```python
from backend.app.services.scrapers import CohesionClient, CohesionConfig

async with CohesionClient(CohesionConfig()) as client:
    projects = await client.fetch_projects(country="DE", fund="ERDF")
    all_projects = await client.fetch_all_projects(fund="ESF", max_pages=10)
```
Innovate UK (Gateway to Research)¶
UK Research and Innovation funding data. Requires a specific Accept header for JSON v7 responses.
```python
from backend.app.services.scrapers import InnovateUKClient, InnovateUKConfig

async with InnovateUKClient(InnovateUKConfig()) as client:
    projects = await client.fetch_projects(
        funder="Innovate UK",
        status="Active",
    )
```
Green Climate Fund (GCF)¶
Climate finance projects with country and thematic filters.
```python
from backend.app.services.scrapers import GCFClient, GCFConfig

async with GCFClient(GCFConfig()) as client:
    projects = await client.list_projects(
        country_code="NG",
        theme="Mitigation",
    )
```
IATI Datastore¶
International Aid Transparency Initiative data for development and climate finance activities.
```python
from backend.app.services.scrapers import IATIClient, IATIConfig

async with IATIClient(IATIConfig()) as client:
    activities = await client.search_activities(
        country_code="NG",
        sector_code_prefix="230",  # Energy sector
    )
```
World Bank Projects¶
World Bank project data with country and sector filters.
```python
from backend.app.services.scrapers import WorldBankClient, WorldBankConfig

async with WorldBankClient(WorldBankConfig()) as client:
    projects = await client.list_projects(country_code="NG")
```
Scraper Architecture¶
All scraper clients follow a consistent architectural pattern:
Common Features¶
- Async HTTP using httpx.AsyncClient for non-blocking I/O
- Rate limiting with configurable requests-per-second via time-based throttling
- Exponential backoff retry logic with jitter for failed requests
- Async context manager (async with) for proper resource cleanup
- Dataclass-based configuration and response models
- Structured logging via structlog
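As a rough sketch of this shared skeleton (the class and field names below are illustrative, not the project's actual base class):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ScraperConfig:
    """Illustrative dataclass-based configuration; field names are assumptions."""
    base_url: str = "https://example.org/api"
    requests_per_second: float = 2.0
    max_retries: int = 3


class BaseScraper:
    """Illustrative sketch of the shared client pattern."""

    def __init__(self, config: ScraperConfig):
        self.config = config
        self._request_interval = 1.0 / config.requests_per_second
        self._last_request_time = 0.0
        self._client = None

    async def __aenter__(self):
        # The real clients open an httpx.AsyncClient here.
        self._client = object()  # stand-in for httpx.AsyncClient(...)
        return self

    async def __aexit__(self, *exc_info):
        # ...and close it here, so resources are released even on errors.
        self._client = None
        return False
```

Each concrete client layers source-specific fetch methods on top of this skeleton.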
Rate Limiting Implementation¶
```python
async def _rate_limit(self):
    """Enforce rate limiting between requests."""
    now = asyncio.get_event_loop().time()
    elapsed = now - self._last_request_time
    if elapsed < self._request_interval:
        await asyncio.sleep(self._request_interval - elapsed)
    self._last_request_time = asyncio.get_event_loop().time()
```
Retry Logic¶
```python
for attempt in range(self.config.max_retries):
    try:
        await self._rate_limit()
        response = await self._client.get(url, params=params)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code in (429, 500, 502, 503, 504):
            delay = min(
                self.config.backoff_base * (2 ** attempt) + random.uniform(0, 1),
                self.config.backoff_max,
            )
            await asyncio.sleep(delay)
        else:
            raise
```
Normalization¶
Source: backend/app/services/grant_normalizer.py
The GrantNormalizer converts raw data from each source into a unified NormalizedGrant schema that maps directly to the database Grant model.
NormalizedGrant Schema¶
```python
@dataclass
class NormalizedGrant:
    external_id: str
    source: str  # cordis, eu_portal, cohesion, innovate_uk, gcf, iati, world_bank
    title: str
    description: str | None = None
    funding_amount_min: float | None = None
    funding_amount_max: float | None = None
    currency: str = "EUR"
    deadline: datetime | None = None
    countries: list[str] = field(default_factory=list)
    nace_codes: list[str] = field(default_factory=list)
    company_sizes: list[str] = field(default_factory=list)
    content_hash: str | None = None
    status: str = "active"
    # ... additional fields
```
Source-Specific Normalization¶
Each source has a dedicated normalization method:
- normalize_cordis() -- Maps CORDIS project fields to NormalizedGrant
- normalize_eu_portal() -- Handles SEDIA multipart response format
- normalize_cohesion() -- Converts Socrata SODA records
- normalize_innovate_uk() -- Maps GtR JSON v7 fields
- normalize_gcf() -- Converts GCF project data
- normalize_iati() -- Maps IATI activity fields
- normalize_world_bank() -- Converts World Bank project data
Carbon Classification¶
During normalization, grants are automatically classified for carbon relevance:
- is_carbon_focused is set based on keyword analysis of the title and description
- carbon_categories are assigned from the 14 recognized categories
- EU Taxonomy objectives are extracted where available
Deduplication¶
Deduplication uses content hashing (MD5 of title + description + source) to identify duplicate grants:
```python
def compute_content_hash(grant: NormalizedGrant) -> str:
    content = f"{grant.title}|{grant.description}|{grant.source}"
    return hashlib.md5(content.encode("utf-8")).hexdigest()
```
The database-level deduplication task (deduplicate_grants_task) identifies groups of grants sharing the same content_hash, keeps the most recently synced record as canonical, and marks duplicates as inactive.
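The dedup pass can be sketched in plain Python. Here grant records are dicts for simplicity; the real task operates on ORM rows and persists the status change.

```python
import hashlib
from datetime import datetime


def content_hash(title: str, description: str, source: str) -> str:
    # MD5 of title + description + source, as above.
    return hashlib.md5(f"{title}|{description}|{source}".encode("utf-8")).hexdigest()


def deduplicate_sketch(grants: list[dict]) -> list[dict]:
    """Keep the most recently synced record per content_hash; mark the rest inactive."""
    groups: dict[str, list[dict]] = {}
    for grant in grants:
        groups.setdefault(grant["content_hash"], []).append(grant)
    for duplicates in groups.values():
        duplicates.sort(key=lambda g: g["last_synced"], reverse=True)
        for stale in duplicates[1:]:
            stale["status"] = "inactive"
    return grants
```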
Embedding Generation¶
Source: backend/app/services/embedding_service.py
The EmbeddingService generates 768-dimensional vectors using the all-mpnet-base-v2 sentence transformer model.
Configuration¶
```python
@dataclass
class EmbeddingConfig:
    model_name: str = "all-mpnet-base-v2"
    embedding_dimension: int = 768
    batch_size: int = 32
    max_seq_length: int = 384
    device: str | None = None  # Auto-detect
    normalize_embeddings: bool = True
```
Features¶
- Lazy model loading -- model is only loaded on first use
- In-memory caching -- embeddings are cached by MD5 hash of input text
- Batch processing -- multiple texts are processed together for GPU efficiency
- Grant-specific -- generate_grant_embeddings() produces both title and description embeddings
- Async wrapper -- generate_grant_embeddings_async() runs in a thread pool for non-blocking use
- Cosine similarity -- compute_similarity() for comparing embedding vectors
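The caching behaviour can be sketched as follows (EmbeddingCache is an illustrative name, not the actual class):

```python
import hashlib


class EmbeddingCache:
    """Illustrative MD5-keyed in-memory cache, as described above."""

    def __init__(self):
        self._cache: dict[str, list[float]] = {}

    def get_or_compute(self, text: str, compute) -> list[float]:
        key = hashlib.md5(text.encode("utf-8")).hexdigest()
        if key not in self._cache:
            self._cache[key] = compute(text)  # cache miss: run the model
        return self._cache[key]
```

Repeated texts (common across sync runs) then cost a dictionary lookup instead of a model forward pass.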
Batch Processing¶
```python
service = EmbeddingService(config)

# Single text
result = service.generate_embedding("Solar panel installation grant")

# Batch processing (efficient for multiple grants)
results = service.generate_batch_embeddings([
    "Energy efficiency grant",
    "Renewable energy funding",
    "Clean technology innovation",
])

# Grant-specific (title + description embeddings)
grant_result = service.generate_grant_embeddings(normalized_grant)
```
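compute_similarity() returns cosine similarity; with normalize_embeddings=True the vectors have unit length, so this reduces to a dot product. A dependency-free sketch of the metric:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)
```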
Celery Tasks¶
Pipeline tasks are defined in backend/app/tasks/grant_pipeline_tasks.py and routed to the sync queue:
| Task | Name | Description |
|---|---|---|
| sync_cordis_grants | grant_pipeline.sync_cordis_grants | Fetch from CORDIS |
| sync_eu_portal_grants | grant_pipeline.sync_eu_portal_grants | Fetch from EU Portal |
| sync_iati_grants | grant_pipeline.sync_iati_grants | Fetch from IATI |
| sync_gcf_projects | grant_pipeline.sync_gcf_projects | Fetch from GCF |
| sync_world_bank_projects | grant_pipeline.sync_world_bank_projects | Fetch from World Bank |
| sync_all_grants | grant_pipeline.sync_all_grants | Orchestrate all sources |
| generate_grant_embeddings_task | grant_pipeline.generate_embeddings | Generate missing embeddings |
| deduplicate_grants_task | grant_pipeline.deduplicate_grants | Remove duplicates |
All sync tasks use autoretry_for=(Exception,) with exponential backoff (capped at 600 seconds) and up to 3 retries.
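The resulting delay schedule can be sketched as follows. The 1-second base matches Celery's retry_backoff default; treat it as an assumption about this project's settings, with only the 600-second cap confirmed above.

```python
def retry_delay(attempt: int, base: float = 1.0, cap: float = 600.0) -> float:
    """Exponential backoff capped at 600 seconds (retry_backoff_max)."""
    return min(base * (2 ** attempt), cap)
```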
Scheduling¶
Grant synchronization is automated via Celery Beat:
| Schedule | Task | Time (UTC) |
|---|---|---|
| Daily sync | daily_grant_sync | 2:00 AM |
| Weekly full sync | weekly_full_sync | Sunday 3:00 AM |
| Periodic sync | sync_all_grant_sources | Every 6 hours |
| Expired cleanup | cleanup_expired_grants | Daily 4:00 AM |
See Celery for full schedule details.
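A beat_schedule entry for the first two rows might look like the following config fragment; the entry keys come from the table above, but the task routing to grant_pipeline.sync_all_grants is an assumption.

```python
from celery.schedules import crontab

# Illustrative Celery Beat configuration; task routing is an assumption.
beat_schedule = {
    "daily_grant_sync": {
        "task": "grant_pipeline.sync_all_grants",
        "schedule": crontab(hour=2, minute=0),  # daily at 2:00 AM UTC
    },
    "weekly_full_sync": {
        "task": "grant_pipeline.sync_all_grants",
        "schedule": crontab(hour=3, minute=0, day_of_week=0),  # Sunday 3:00 AM UTC
    },
}
```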