"""Celery application for the "evosync" project.

Defines the shared ``celery_app`` instance, its serialization, routing and
beat-schedule configuration, and the ``run_sync_pipeline`` launcher task that
beat fires periodically.
"""

from datetime import timedelta

from celery import Celery, chain

from web.config import settings

# The same Redis URL serves as both the message broker and the result backend.
celery_app = Celery(
    "evosync",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

celery_app.conf.update(
    # JSON only, for both messages and results.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Europe/Moscow",
    enable_utc=True,
    task_track_started=True,
    # Acknowledge messages after the task has run rather than on receipt, and
    # let each worker process reserve only one message at a time.
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    broker_connection_retry_on_startup=True,
    task_routes={
        "web.tasks.sync.*": {"queue": "sync"},
        "web.tasks.vk_sync.*": {"queue": "sync"},
        "web.tasks.health.*": {"queue": "health"},
        "web.tasks.catalog.*": {"queue": "default"},
        "web.tasks.vk_catalog.*": {"queue": "default"},
        "web.notifications.tasks.*": {"queue": "notifications"},
    },
    beat_schedule={
        # Chain: fetch Evotor → fetch VK catalog → mirror Evotor→VK
        # Beat fires the launcher task which chains all three sequentially.
        "sync-pipeline": {
            "task": "web.tasks.celery_app.run_sync_pipeline",
            "schedule": timedelta(
                seconds=settings.CATALOG_REFRESH_INTERVAL_SECONDS
            ),
        },
    },
)

# Register task modules so beat/worker can discover them.
# NOTE(review): autodiscover_tasks() is documented as taking *package* names
# and looking for a ``tasks`` submodule inside each; the entries below look
# like module paths — confirm they are actually imported as intended.
celery_app.autodiscover_tasks(
    [
        "web.tasks.catalog",
        "web.tasks.vk_catalog",
        "web.tasks.vk_sync",
        "web.tasks.celery_app",
    ]
)


@celery_app.task(name="web.tasks.celery_app.run_sync_pipeline", queue="default")
def run_sync_pipeline() -> str:
    """Dispatch the catalog sync pipeline (beat entry point).

    Chains refresh_catalog → refresh_vk_catalog → mirror_to_vk so that the
    mirror step is only started after both catalog fetches have run.

    Returns:
        The literal string ``"pipeline dispatched"``. The chain is only
        enqueued here; this task does not wait for its result.
    """
    # Imported inside the function rather than at module level — presumably
    # these task modules import ``celery_app`` from this module, which would
    # make top-level imports circular (TODO confirm).
    from web.tasks.catalog import refresh_catalog
    from web.tasks.vk_catalog import refresh_vk_catalog
    from web.tasks.vk_sync import mirror_to_vk

    # .si() builds immutable signatures: each step is called without the
    # previous step's return value being passed in as an argument.
    pipeline = chain(
        refresh_catalog.si(),
        refresh_vk_catalog.si(),
        mirror_to_vk.si(),
    )
    pipeline.apply_async()
    return "pipeline dispatched"