Celery Workers

Carbon Connect uses Celery for background task processing with Valkey (Redis-compatible) as the message broker and result backend.

Sources:

  • backend/app/worker/celery_app.py -- Application configuration
  • backend/app/worker/celery_beat.py -- Periodic task schedules
  • backend/app/worker/async_utils.py -- Async bridge utility
  • backend/app/tasks/grant_pipeline_tasks.py -- Pipeline tasks

Queue Architecture

The system uses four dedicated queues with direct exchange routing:

Queue    | Exchange | Purpose                                    | Rate Limit
---------|----------|--------------------------------------------|-----------
default  | default  | Unrouted tasks, maintenance, health checks | None
matching | matching | Company-grant matching calculations        | None
email    | email    | Email notifications, deadline checks       | 30/minute
sync     | sync     | Grant data synchronization, embeddings     | 10/minute
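
This layout would typically be declared with kombu Queue/Exchange objects in celery_app.py. A minimal sketch, assuming direct exchanges as described above (the variable names are illustrative, not confirmed from the source):

from kombu import Exchange, Queue

task_queues = (
    Queue("default", Exchange("default", type="direct"), routing_key="default"),
    Queue("matching", Exchange("matching", type="direct"), routing_key="matching"),
    Queue("email", Exchange("email", type="direct"), routing_key="email"),
    Queue("sync", Exchange("sync", type="direct"), routing_key="sync"),
)

# Celery rate limits apply per task rather than per queue, so the
# queue-level limits above would translate to task options such as:
#   @shared_task(rate_limit="30/m")   # tasks routed to the email queue
#   @shared_task(rate_limit="10/m")   # tasks routed to the sync queue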

Task Routing

task_routes = {
    # Matching tasks
    "backend.app.worker.tasks.matching_tasks.*": {"queue": "matching"},
    # Email tasks
    "backend.app.worker.tasks.email_tasks.*": {"queue": "email"},
    # Sync tasks
    "backend.app.worker.tasks.sync_tasks.*": {"queue": "sync"},
    "backend.app.tasks.grant_pipeline_tasks.*": {"queue": "sync"},
}

Worker Configuration

Key Settings

Setting                    | Value | Description
---------------------------|-------|-------------------------------------
task_serializer            | json  | Serialization format
result_expires             | 3600  | Result TTL in seconds (1 hour)
task_time_limit            | 1800  | Hard limit per task (30 minutes)
task_soft_time_limit       | 1500  | Soft limit per task (25 minutes)
worker_prefetch_multiplier | 4     | Tasks prefetched per worker process
worker_concurrency         | 4     | Concurrent tasks per worker
task_acks_late             | True  | Acknowledge only after completion
task_reject_on_worker_lost | True  | Requeue on worker crash
timezone                   | UTC   | All schedules use UTC
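
In backend/app/worker/celery_app.py these settings would typically be applied in one place. A sketch using the values above (the app name is illustrative, and whether the project uses conf.update() or a dedicated config module is an assumption; the broker URL is the documented default from the Configuration section):

from celery import Celery

celery_app = Celery(
    "carbon_connect",  # illustrative app name
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

celery_app.conf.update(
    task_serializer="json",
    result_expires=3600,              # results expire after 1 hour
    task_time_limit=1800,             # hard kill at 30 minutes
    task_soft_time_limit=1500,        # SoftTimeLimitExceeded raised at 25 minutes
    worker_prefetch_multiplier=4,
    worker_concurrency=4,
    task_acks_late=True,              # ack only after the task finishes
    task_reject_on_worker_lost=True,  # requeue if the worker process dies
    timezone="UTC",
)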

Retry Policies

Each queue has its own retry policy. Three policies are defined, differing in the maximum number of retries and the backoff ceiling:

{
    "max_retries": 3,
    "retry_backoff": True,
    "retry_backoff_max": 300,  # 5 minutes
}

{
    "max_retries": 5,
    "retry_backoff": True,
    "retry_backoff_max": 1800,  # 30 minutes
}

{
    "max_retries": 3,
    "retry_backoff": True,
    "retry_backoff_max": 600,  # 10 minutes
}
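
These options map directly onto Celery's task-level retry settings. A sketch of how the 5-retry / 30-minute variant might be attached to a task; the task name and exception type are illustrative, not taken from the codebase:

from celery import shared_task

@shared_task(
    bind=True,
    autoretry_for=(ConnectionError,),  # illustrative: retry on transient network failures
    max_retries=5,                     # give up after 5 attempts
    retry_backoff=True,                # exponential backoff between attempts
    retry_backoff_max=1800,            # cap the backoff at 30 minutes
    retry_jitter=True,                 # add randomness to avoid thundering herds
)
def example_sync_task(self):
    ...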

Running Workers

Start All Queues

poetry run celery -A backend.app.worker.celery_app worker -l info

Start Specific Queue

# Matching only
poetry run celery -A backend.app.worker.celery_app worker -Q matching -l info

# Email only
poetry run celery -A backend.app.worker.celery_app worker -Q email -l info

# Sync only
poetry run celery -A backend.app.worker.celery_app worker -Q sync -l info

Start Beat Scheduler

poetry run celery -A backend.app.worker.celery_app beat -l info

Start Flower Monitoring

poetry run celery -A backend.app.worker.celery_app flower --port=5555

Celery Beat Schedule

All scheduled tasks are defined in backend/app/worker/celery_beat.py.

Grant Synchronization

Task                 | Schedule           | Queue | Description
---------------------|--------------------|-------|-------------------------------------------
daily-grant-sync     | Daily 2:00 AM UTC  | sync  | Incremental sync from all enabled sources
weekly-full-sync     | Sunday 3:00 AM UTC | sync  | Full sync with deduplication
sync-grants-periodic | Every 6 hours      | sync  | Legacy periodic sync
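
In celery_beat.py these entries would be expressed as a beat_schedule with crontab schedules. A sketch for the two cron-based entries above (the fully qualified task paths are assumptions based on the module names listed under Worker Tasks):

from celery.schedules import crontab

beat_schedule = {
    "daily-grant-sync": {
        "task": "backend.app.worker.tasks.scheduled_tasks.daily_grant_sync",
        "schedule": crontab(hour=2, minute=0),  # daily at 02:00 UTC
        "options": {"queue": "sync"},
    },
    "weekly-full-sync": {
        "task": "backend.app.worker.tasks.scheduled_tasks.weekly_full_sync",
        "schedule": crontab(hour=3, minute=0, day_of_week=0),  # Sundays at 03:00 UTC
        "options": {"queue": "sync"},
    },
}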

Grant Maintenance

Task                  | Schedule             | Queue | Description
----------------------|----------------------|-------|---------------------------------
daily-expired-cleanup | Daily 4:00 AM UTC    | sync  | Mark expired grants as inactive
weekly-reindex-search | Saturday 5:00 AM UTC | sync  | Rebuild the Meilisearch index

Matching

Task                       | Schedule             | Queue    | Description
---------------------------|----------------------|----------|--------------------------------------
weekly-match-recalculation | Saturday 2:30 AM UTC | matching | Recalculate all match scores
daily-match-refresh        | Daily 2:30 AM UTC    | matching | Refresh matches for active companies

Notifications

Task                                | Schedule          | Queue | Description
------------------------------------|-------------------|-------|---------------------------------
hourly-deadline-check               | Every hour at :00 | email | Check for approaching deadlines
check-deadline-notifications-legacy | Every hour at :30 | email | Legacy notification check

Maintenance

Task                   | Schedule                  | Queue   | Description
-----------------------|---------------------------|---------|------------------------------------
daily-session-cleanup  | Daily 3:00 AM UTC         | default | Remove expired sessions and tokens
monthly-statistics     | 1st of month 5:00 AM UTC  | default | Aggregate monthly statistics
weekly-database-vacuum | Sunday 6:00 AM UTC        | default | Run ANALYZE on the database
health-check           | Every 5 minutes           | default | Verify all services are running

Async Bridge

Source: backend/app/worker/async_utils.py

Celery tasks run synchronously, but the application code is async (SQLAlchemy AsyncSession, httpx). The run_async() utility bridges this gap:

from backend.app.worker.async_utils import run_async

def run_async(coro_factory: Callable[[], Coroutine]) -> T:
    """
    Run an async coroutine from synchronous Celery task code.

    Handles two environments:
    - No running loop: calls asyncio.run() directly
    - Running loop exists: runs in a dedicated thread with its own loop
    """

run_async() is used in all pipeline tasks:

from celery import shared_task

@shared_task(bind=True)
def sync_cordis_grants(self, ...):
    async def _fetch():
        async with CordisScraper(config) as scraper:
            return await scraper.fetch_all_projects(...)

    projects = run_async(_fetch)

Task Modules

Grant Pipeline Tasks

backend/app/tasks/grant_pipeline_tasks.py

Task                           | Description
-------------------------------|------------------------------
sync_cordis_grants             | Fetch from CORDIS
sync_eu_portal_grants          | Fetch from EU Portal
sync_iati_grants               | Fetch from IATI Datastore
sync_gcf_projects              | Fetch from Green Climate Fund
sync_world_bank_projects       | Fetch from World Bank
sync_all_grants                | Orchestrate all source syncs
generate_grant_embeddings_task | Generate missing embeddings
deduplicate_grants_task        | Database-level deduplication
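
These are ordinary Celery tasks, so they can also be triggered ad hoc from application code or a shell. A brief sketch using standard Celery calling APIs (arguments omitted for brevity):

from backend.app.tasks.grant_pipeline_tasks import sync_all_grants

# Fire-and-forget; routing sends it to the "sync" queue.
sync_all_grants.delay()

# Or with explicit options, e.g. delay execution by 60 seconds:
sync_all_grants.apply_async(countdown=60)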

Worker Tasks

Module            | Tasks
------------------|----------------------------------------------------------------------
matching_tasks    | refresh_all_matches, calculate_company_matches
email_tasks       | send_email, check_deadline_notifications
sync_tasks        | sync_all_grant_sources
scheduled_tasks   | daily_grant_sync, weekly_full_sync, cleanup_expired_grants
maintenance_tasks | reindex_search, cleanup_old_sessions, health_check, vacuum_database

Configuration

Environment Variable         | Description                         | Default
-----------------------------|-------------------------------------|-------------------------
REDIS_URL                    | Valkey/Redis URL (broker + backend) | redis://localhost:6379/0
CELERY_TASK_ALWAYS_EAGER     | Run tasks synchronously (testing)   | false
CELERY_BEAT_SCHEDULE_ENABLED | Enable Beat schedule                | true
PIPELINE_SYNC_INTERVAL_HOURS | Periodic sync interval (hours)      | 6
SYNC_SCHEDULE_HOUR           | Daily sync hour (UTC)               | 2
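
A sketch of how these variables might be consumed when configuring the app; the parsing helpers are illustrative, and the actual settings layer may differ:

import os

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
task_always_eager = os.getenv("CELERY_TASK_ALWAYS_EAGER", "false").lower() == "true"
beat_enabled = os.getenv("CELERY_BEAT_SCHEDULE_ENABLED", "true").lower() == "true"
sync_interval_hours = int(os.getenv("PIPELINE_SYNC_INTERVAL_HOURS", "6"))
sync_hour_utc = int(os.getenv("SYNC_SCHEDULE_HOUR", "2"))

# celery_app as created in backend/app/worker/celery_app.py
celery_app.conf.broker_url = redis_url
celery_app.conf.result_backend = redis_url
celery_app.conf.task_always_eager = task_always_eager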