Celery Workers
Carbon Connect uses Celery for background task processing with Valkey (Redis-compatible) as the message broker and result backend.
Sources:
- backend/app/worker/celery_app.py -- Application configuration
- backend/app/worker/celery_beat.py -- Periodic task schedules
- backend/app/worker/async_utils.py -- Async bridge utility
- backend/app/tasks/grant_pipeline_tasks.py -- Pipeline tasks
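As a rough sketch, the application object in celery_app.py could be created along these lines; the app name and the include list below are assumptions, while the broker/backend choice (Valkey via REDIS_URL) comes from the description above:

```python
# Hypothetical sketch of the Celery application setup (celery_app.py).
# Valkey (Redis-compatible) serves as both the message broker and the
# result backend; the app name and include list are assumptions.
import os

from celery import Celery

redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

celery_app = Celery(
    "carbon_connect",        # assumed application name
    broker=redis_url,        # Valkey as the message broker
    backend=redis_url,       # Valkey as the result backend
    include=[
        "backend.app.worker.tasks.matching_tasks",
        "backend.app.worker.tasks.email_tasks",
        "backend.app.worker.tasks.sync_tasks",
        "backend.app.tasks.grant_pipeline_tasks",
    ],
)
```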
Queue Architecture
The system uses four dedicated queues with direct exchange routing:
| Queue | Exchange | Purpose | Rate Limit |
|-------|----------|---------|------------|
| default | default | Unrouted tasks, maintenance, health checks | None |
| matching | matching | Company-grant matching calculations | None |
| email | email | Email notifications, deadline checks | 30/minute |
| sync | sync | Grant data synchronization, embeddings | 10/minute |
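A sketch of how these queues and rate limits could be declared; the queue names, exchanges, and limits come from the table above, while the variable names and the specific annotated task names are assumptions:

```python
# Hypothetical sketch: queue declarations with direct exchanges matching
# the table above.
from kombu import Exchange, Queue

task_queues = (
    Queue("default", Exchange("default", type="direct"), routing_key="default"),
    Queue("matching", Exchange("matching", type="direct"), routing_key="matching"),
    Queue("email", Exchange("email", type="direct"), routing_key="email"),
    Queue("sync", Exchange("sync", type="direct"), routing_key="sync"),
)

# Celery applies rate limits per task, so the queue-level limits in the table
# would translate to annotations on individual tasks (an assumption; the task
# names below are taken from the Worker Tasks table further down).
task_annotations = {
    "backend.app.worker.tasks.email_tasks.send_email": {"rate_limit": "30/m"},
    "backend.app.worker.tasks.sync_tasks.sync_all_grant_sources": {"rate_limit": "10/m"},
}
```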
Task Routing
```python
task_routes = {
    # Matching tasks
    "backend.app.worker.tasks.matching_tasks.*": {"queue": "matching"},
    # Email tasks
    "backend.app.worker.tasks.email_tasks.*": {"queue": "email"},
    # Sync tasks
    "backend.app.worker.tasks.sync_tasks.*": {"queue": "sync"},
    "backend.app.tasks.grant_pipeline_tasks.*": {"queue": "sync"},
}
```
Worker Configuration
Key Settings
| Setting | Value | Description |
|---------|-------|-------------|
| task_serializer | json | Serialization format |
| result_expires | 3600 | Result TTL in seconds (1 hour) |
| task_time_limit | 1800 | Hard limit per task (30 minutes) |
| task_soft_time_limit | 1500 | Soft limit per task (25 minutes) |
| worker_prefetch_multiplier | 4 | Tasks prefetched per worker |
| worker_concurrency | 4 | Concurrent task execution |
| task_acks_late | True | Acknowledge after completion |
| task_reject_on_worker_lost | True | Requeue on worker crash |
| timezone | UTC | All schedules use UTC |
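These settings would typically be applied via conf.update(); a sketch based only on the table values (the extra serializer and UTC options are assumptions and marked as such):

```python
# Hypothetical sketch: applying the key settings from the table above.
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",          # assumption: results also use JSON
    accept_content=["json"],           # assumption: only JSON is accepted
    result_expires=3600,               # 1 hour
    task_time_limit=1800,              # 30 minutes (hard limit)
    task_soft_time_limit=1500,         # 25 minutes (soft limit)
    worker_prefetch_multiplier=4,
    worker_concurrency=4,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    timezone="UTC",
    enable_utc=True,                   # assumption: commonly paired with a UTC timezone
)
```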
Retry Policies
Each queue has a specific retry policy:
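The concrete values live in the task definitions and are not reproduced here. As an illustration only, a per-task retry policy in Celery might look like this; the task name, exception types, and numbers below are assumptions, not the project's actual values:

```python
# Illustrative only: one way a retry policy can be attached to a Celery task.
# The task name, exception types, and retry numbers are assumptions.
from celery import shared_task


@shared_task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),  # retry on transient failures
    retry_backoff=True,        # exponential backoff between attempts
    retry_backoff_max=600,     # cap the backoff at 10 minutes
    retry_jitter=True,         # add jitter to avoid thundering herds
    max_retries=5,
)
def sync_one_source(self, source_name: str) -> None:
    """Hypothetical task body; the real tasks live in the modules listed below."""
    ...
```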
Running Workers
Start All Queues
```bash
poetry run celery -A backend.app.worker.celery_app worker -l info
```
Start Specific Queue
```bash
# Matching only
poetry run celery -A backend.app.worker.celery_app worker -Q matching -l info

# Email only
poetry run celery -A backend.app.worker.celery_app worker -Q email -l info

# Sync only
poetry run celery -A backend.app.worker.celery_app worker -Q sync -l info
```
Start Beat Scheduler
```bash
poetry run celery -A backend.app.worker.celery_app beat -l info
```
Start Flower Monitoring
```bash
poetry run celery -A backend.app.worker.celery_app flower --port=5555
```
Celery Beat Schedule
All scheduled tasks are defined in backend/app/worker/celery_beat.py.
Grant Synchronization
| Task | Schedule | Queue | Description |
|------|----------|-------|-------------|
| daily-grant-sync | Daily 2:00 AM UTC | sync | Incremental sync from all enabled sources |
| weekly-full-sync | Sunday 3:00 AM UTC | sync | Full sync with deduplication |
| sync-grants-periodic | Every 6 hours | sync | Legacy periodic sync |
Grant Maintenance
| Task | Schedule | Queue | Description |
|------|----------|-------|-------------|
| daily-expired-cleanup | Daily 4:00 AM UTC | sync | Mark expired grants as inactive |
| weekly-reindex-search | Saturday 5:00 AM UTC | sync | Rebuild Meilisearch index |
Matching
| Task | Schedule | Queue | Description |
|------|----------|-------|-------------|
| weekly-match-recalculation | Saturday 2:30 AM UTC | matching | Recalculate all match scores |
| daily-match-refresh | Daily 2:30 AM UTC | matching | Refresh matches for active companies |
Notifications
| Task | Schedule | Queue | Description |
|------|----------|-------|-------------|
| hourly-deadline-check | Every hour at :00 | email | Check for approaching deadlines |
| check-deadline-notifications-legacy | Every hour at :30 | email | Legacy notification check |
Maintenance
| Task | Schedule | Queue | Description |
|------|----------|-------|-------------|
| daily-session-cleanup | Daily 3:00 AM UTC | default | Remove expired sessions and tokens |
| monthly-statistics | 1st of month, 5:00 AM UTC | default | Aggregate monthly statistics |
| weekly-database-vacuum | Sunday 6:00 AM UTC | default | Run ANALYZE on the database |
| health-check | Every 5 minutes | default | Verify all services are running |
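In celery_beat.py these entries would be expressed as beat schedule items; a sketch of a few of them, where the crontab values mirror the tables above but the dotted task paths are assumptions:

```python
# Hypothetical sketch of beat_schedule entries matching rows from the tables
# above. The dotted task paths are assumptions; schedules follow the docs.
from celery.schedules import crontab

beat_schedule = {
    "daily-grant-sync": {
        "task": "backend.app.worker.tasks.scheduled_tasks.daily_grant_sync",
        "schedule": crontab(hour=2, minute=0),   # daily at 02:00 UTC
        "options": {"queue": "sync"},
    },
    "hourly-deadline-check": {
        "task": "backend.app.worker.tasks.email_tasks.check_deadline_notifications",
        "schedule": crontab(minute=0),           # every hour at :00
        "options": {"queue": "email"},
    },
    "health-check": {
        "task": "backend.app.worker.tasks.maintenance_tasks.health_check",
        "schedule": 300.0,                       # every 5 minutes
        "options": {"queue": "default"},
    },
}
```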
Async Bridge
Source: backend/app/worker/async_utils.py
Celery tasks run synchronously, but the application code is asynchronous (SQLAlchemy AsyncSession, httpx clients). The run_async() utility bridges this gap:
```python
from backend.app.worker.async_utils import run_async

def run_async(coro_factory: Callable[[], Coroutine]) -> T:
    """
    Run an async coroutine from synchronous Celery task code.

    Handles two environments:
    - No running loop: calls asyncio.run() directly
    - Running loop exists: runs in a dedicated thread with its own loop
    """
```
This is used in all pipeline tasks:
```python
@shared_task(bind=True)
def sync_cordis_grants(self, ...):
    async def _fetch():
        async with CordisScraper(config) as scraper:
            return await scraper.fetch_all_projects(...)

    projects = run_async(_fetch)
```
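A minimal sketch of how such a bridge can be implemented, covering the two cases described in the docstring (an illustration only, not the actual contents of async_utils.py):

```python
# Illustrative implementation of the async bridge described above;
# the real async_utils.py may differ in its details.
import asyncio
import threading
from typing import Callable, Coroutine, TypeVar

T = TypeVar("T")


def run_async(coro_factory: Callable[[], Coroutine]) -> T:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: asyncio.run() is safe.
        return asyncio.run(coro_factory())

    # A loop is already running in this thread (for example when tasks run
    # eagerly inside async test code): execute the coroutine in a dedicated
    # thread with its own event loop and wait for the result.
    result: dict = {}

    def _target() -> None:
        try:
            result["value"] = asyncio.run(coro_factory())
        except BaseException as exc:  # propagate failures to the caller
            result["error"] = exc

    thread = threading.Thread(target=_target)
    thread.start()
    thread.join()
    if "error" in result:
        raise result["error"]
    return result["value"]
```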
Task Modules
Grant Pipeline Tasks
backend/app/tasks/grant_pipeline_tasks.py
| Task | Description |
|------|-------------|
| sync_cordis_grants | Fetch from CORDIS |
| sync_eu_portal_grants | Fetch from EU Portal |
| sync_iati_grants | Fetch from IATI Datastore |
| sync_gcf_projects | Fetch from Green Climate Fund |
| sync_world_bank_projects | Fetch from World Bank |
| sync_all_grants | Orchestrate all source syncs |
| generate_grant_embeddings_task | Generate missing embeddings |
| deduplicate_grants_task | Database-level deduplication |
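One way the sync_all_grants orchestration could be structured is to fan out to the per-source tasks and then run embedding generation and deduplication afterwards; this is a hedged sketch, and the actual chaining in grant_pipeline_tasks.py may differ:

```python
# Hypothetical sketch of orchestrating the pipeline tasks with a Celery chord.
# sync_cordis_grants, ..., deduplicate_grants_task are the tasks listed above,
# defined in this same module; the composition shown here is an assumption.
from celery import chain, chord, shared_task


@shared_task
def sync_all_grants() -> None:
    source_tasks = [
        sync_cordis_grants.s(),
        sync_eu_portal_grants.s(),
        sync_iati_grants.s(),
        sync_gcf_projects.s(),
        sync_world_bank_projects.s(),
    ]
    # Run all source syncs in parallel, then post-process once they finish.
    chord(source_tasks)(
        chain(generate_grant_embeddings_task.si(), deduplicate_grants_task.si())
    )
```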
Worker Tasks
| Module | Tasks |
|--------|-------|
| matching_tasks | refresh_all_matches, calculate_company_matches |
| email_tasks | send_email, check_deadline_notifications |
| sync_tasks | sync_all_grant_sources |
| scheduled_tasks | daily_grant_sync, weekly_full_sync, cleanup_expired_grants |
| maintenance_tasks | reindex_search, cleanup_old_sessions, health_check, vacuum_database |
Configuration
| Environment Variable | Description | Default |
|----------------------|-------------|---------|
| REDIS_URL | Valkey/Redis URL (broker and result backend) | redis://localhost:6379/0 |
| CELERY_TASK_ALWAYS_EAGER | Run tasks synchronously (testing) | false |
| CELERY_BEAT_SCHEDULE_ENABLED | Enable the Beat schedule | true |
| PIPELINE_SYNC_INTERVAL_HOURS | Periodic sync interval (hours) | 6 |
| SYNC_SCHEDULE_HOUR | Daily sync hour (UTC) | 2 |
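As a sketch, these variables might be parsed when the worker configuration is built; only the names and defaults come from the table above, the parsing shown here is an assumption:

```python
# Hypothetical sketch of reading the documented environment variables;
# only the names and defaults are taken from the table above.
import os

redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
always_eager = os.environ.get("CELERY_TASK_ALWAYS_EAGER", "false").lower() == "true"
beat_enabled = os.environ.get("CELERY_BEAT_SCHEDULE_ENABLED", "true").lower() == "true"
sync_interval_hours = int(os.environ.get("PIPELINE_SYNC_INTERVAL_HOURS", "6"))
sync_schedule_hour = int(os.environ.get("SYNC_SCHEDULE_HOUR", "2"))

# These values would then feed the Celery configuration (see the celery_app
# sketch near the top of this page).
```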