"""
Periodic catalog sync: fetch stores / product-groups / products from Evotor
for every connected user and upsert into cached_* tables.

Beat schedule entry (set in celery_app.py):
    refresh_catalog — runs every CATALOG_REFRESH_INTERVAL_SECONDS seconds
"""
import logging
from datetime import datetime, timezone

import httpx
from celery import shared_task

from web.config import settings
from web.database import SessionLocal
from web.models.connections import (
    CachedGroup,
    CachedProduct,
    CachedStore,
    EvotorConnection,
    SyncConfig,
    SyncFilter,
)

logger = logging.getLogger(__name__)

EVO_API = "https://api.evotor.ru"


def _headers(token: str) -> dict:
    """Build the HTTP headers sent with every Evotor API request."""
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.evotor.v2+json",
        "Content-Type": "application/json",
    }


def _now() -> datetime:
    """Return the current UTC time as a naive datetime (tzinfo stripped)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _items(data):
    """Unwrap an ``{"items": [...]}`` envelope; pass any other payload through.

    NOTE(review): only this single response is read. If the API paginates its
    collections, later pages are never requested — confirm against the API docs.
    """
    return data.get("items", data) if isinstance(data, dict) else data


# ── per-user helpers ──────────────────────────────────────────────────────────

def _fetch_stores(token: str) -> list[dict]:
    """Fetch the stores visible to *token*.

    Raises:
        httpx.HTTPStatusError: on any non-2xx response.
    """
    r = httpx.get(f"{EVO_API}/stores", headers=_headers(token), timeout=15)
    r.raise_for_status()
    return _items(r.json())


def _fetch_store_collection(
    token: str, store_id: str, resource: str, timeout: float
) -> list[dict] | None:
    """Fetch ``/stores/{store_id}/{resource}``.

    Returns None if the store is not accessible (402/403), list otherwise.
    Any other error status raises ``httpx.HTTPStatusError``.
    """
    r = httpx.get(
        f"{EVO_API}/stores/{store_id}/{resource}",
        headers=_headers(token),
        timeout=timeout,
    )
    if r.status_code in (402, 403):
        return None
    r.raise_for_status()
    return _items(r.json())


def _fetch_groups(token: str, store_id: str) -> list[dict] | None:
    """Returns None if the store is not accessible (402/403), list otherwise."""
    return _fetch_store_collection(token, store_id, "product-groups", timeout=15)


def _fetch_products(token: str, store_id: str) -> list[dict] | None:
    """Returns None if the store is not accessible (402/403), list otherwise."""
    return _fetch_store_collection(token, store_id, "products", timeout=30)


def _entity_id(payload: dict):
    """Return the payload's ``id``, falling back to ``uuid``; falsy if neither is set."""
    return payload.get("id") or payload.get("uuid")


def _store_address(store: dict):
    """Return ``address["str"]`` when address is a dict, else the raw address value."""
    address = store.get("address")
    return address.get("str") if isinstance(address, dict) else address


def _product_fields(product: dict, store_evo_id: str, now: datetime) -> dict:
    """Map an API product payload onto CachedProduct column values."""
    price = product.get("price")
    quantity = product.get("quantity")
    allow_to_sell = product.get("allowToSell")
    if allow_to_sell is None:
        # An explicit False must be kept, so fall back only on a missing/None value.
        allow_to_sell = product.get("allow_to_sell")
    return {
        "store_evotor_id": store_evo_id,
        "group_evotor_id": product.get("group") or product.get("parentUuid"),
        "name": product.get("name", ""),
        "price": float(price) if price is not None else None,
        "quantity": float(quantity) if quantity is not None else None,
        "measure_name": product.get("measureName") or product.get("measure_name"),
        "article_number": product.get("code") or product.get("article_number"),
        "allow_to_sell": allow_to_sell,
        "fetched_at": now,
    }


def _upsert(db, model, user_id: int, evotor_id, fields: dict) -> None:
    """Update the (user_id, evotor_id) row of *model* with *fields*, or insert it."""
    row = db.query(model).filter_by(user_id=user_id, evotor_id=evotor_id).first()
    if row:
        for column, value in fields.items():
            setattr(row, column, value)
    else:
        db.add(model(user_id=user_id, evotor_id=evotor_id, **fields))


def _allowed_store_ids(db, user_id: int, store_ids: list) -> list:
    """Narrow *store_ids* to the user's "include" store filters, if any exist.

    With no SyncConfig, or no include-filters for stores, every store is kept.
    """
    cfg = db.query(SyncConfig).filter_by(user_id=user_id).first()
    if not cfg:
        return store_ids
    include_filters = db.query(SyncFilter).filter_by(
        sync_config_id=cfg.id, entity_type="store", filter_mode="include"
    ).all()
    if not include_filters:
        return store_ids
    allowed = {f.entity_id for f in include_filters}
    return [s for s in store_ids if s in allowed]


def _sync_user(db, user_id: int, token: str) -> None:
    """Fetch one user's stores, groups and products and upsert them into the cache.

    Args:
        db: open database session; committed once at the end of a successful run.
        user_id: owner of the cached rows.
        token: Evotor access token used for every request.

    A failure to fetch the store list is logged and the function returns
    normally, so the caller cannot tell it apart from a successful sync.
    Failures to fetch groups or products of a single store are logged and
    treated as an empty list. Database errors propagate to the caller.
    """
    now = _now()

    # ── stores ────────────────────────────────────────────────────────────────
    try:
        stores = _fetch_stores(token)
    except Exception as e:
        logger.warning("user=%s fetch stores failed: %s", user_id, e)
        return

    store_ids = []
    for s in stores:
        evo_id = _entity_id(s)
        if not evo_id:
            continue
        store_ids.append(evo_id)
        _upsert(db, CachedStore, user_id, evo_id, {
            "name": s.get("name", ""),
            "address": _store_address(s),
            "fetched_at": now,
        })
    db.flush()

    # ── apply store filter ────────────────────────────────────────────────────
    store_ids = _allowed_store_ids(db, user_id, store_ids)

    # ── groups & products per store ───────────────────────────────────────────
    for store_evo_id in store_ids:
        # groups
        try:
            groups = _fetch_groups(token, store_evo_id)
        except Exception as e:
            logger.warning("user=%s store=%s fetch groups failed: %s", user_id, store_evo_id, e)
            groups = []
        if groups is None:
            # NOTE(review): a 402/403 on groups skips this store's products too,
            # while a 402/403 on products alone is treated as an empty list —
            # confirm the asymmetry is intended.
            logger.debug("user=%s store=%s groups not available (402/403), skipping", user_id, store_evo_id)
            continue

        for g in groups:
            evo_id = _entity_id(g)
            if not evo_id:
                continue
            _upsert(db, CachedGroup, user_id, evo_id, {
                "store_evotor_id": store_evo_id,
                "name": g.get("name", ""),
                "fetched_at": now,
            })

        # products
        try:
            products = _fetch_products(token, store_evo_id)
        except Exception as e:
            logger.warning("user=%s store=%s fetch products failed: %s", user_id, store_evo_id, e)
            products = []
        if products is None:
            logger.debug("user=%s store=%s products not available (402/403), skipping", user_id, store_evo_id)
            products = []

        for p in products:
            evo_id = _entity_id(p)
            if not evo_id:
                continue
            _upsert(db, CachedProduct, user_id, evo_id, _product_fields(p, store_evo_id, now))

    db.commit()
    # Count in the database instead of loading every cached row just to tally it.
    logger.info(
        "user=%s catalog synced: %d stores, %d groups, %d products",
        user_id,
        len(stores),
        db.query(CachedGroup).filter_by(user_id=user_id).count(),
        db.query(CachedProduct).filter_by(user_id=user_id).count(),
    )


# ── Celery task ───────────────────────────────────────────────────────────────

@shared_task(
    name="web.tasks.catalog.refresh_catalog",
    queue="default",
    bind=True,
    max_retries=2,
    default_retry_delay=60,
)
def refresh_catalog(self) -> dict:
    """Fetch and cache stores/groups/products for all connected Evotor users.

    Returns:
        ``{"ok": <n>, "failed": <n>}`` — the number of users whose sync returned
        normally and the number whose sync raised.
    """
    db = SessionLocal()
    results = {"ok": 0, "failed": 0}
    try:
        connections = (
            db.query(EvotorConnection)
            .filter(
                EvotorConnection.user_id.isnot(None),
                EvotorConnection.access_token.isnot(None),
                EvotorConnection.access_token != "",
            )
            .all()
        )
        # Snapshot plain values up front: the ORM objects may be expired by a
        # commit or rollback, and must not be touched while the session is in
        # a failed state.
        targets = [(conn.user_id, conn.access_token) for conn in connections]

        for user_id, token in targets:
            try:
                _sync_user(db, user_id, token)
                results["ok"] += 1
            except Exception as exc:
                # Discard this user's half-applied changes and reset the session
                # so the failure cannot leak into, or break, the next user's sync.
                db.rollback()
                logger.exception("catalog sync failed for user=%s: %s", user_id, exc)
                results["failed"] += 1
    finally:
        db.close()

    logger.info("refresh_catalog done: %s", results)
    return results