""" Periodic VK catalog sync: fetch albums and products from VK Market for every connected user and upsert into vk_cached_* tables. """ import logging from datetime import datetime, timezone from celery import shared_task import web.lib.api_logger as api_logger from web.config import settings from web.database import SessionLocal from web.models.connections import SyncConfig, VkCachedAlbum, VkCachedProduct, VkConnection logger = logging.getLogger(__name__) VK_API = "https://api.vk.com/method" def _now() -> datetime: return datetime.now(timezone.utc).replace(tzinfo=None) def _vk_get(method: str, params: dict, token: str, user_id: int | None = None) -> dict: params = {**params, "access_token": token, "v": settings.VK_API_VERSION} r = api_logger.get(f"{VK_API}/{method}", user_id=user_id, params=params, timeout=20) r.raise_for_status() return r.json() def _sync_user(db, user_id: int, token: str, group_id: str) -> None: now = _now() owner_id = f"-{group_id}" # ── albums ──────────────────────────────────────────────────────────────── try: data = _vk_get("market.getAlbums", {"owner_id": owner_id, "count": 100}, token, user_id=user_id) except Exception as e: logger.warning("user=%s vk fetch albums failed: %s", user_id, e) return if "error" in data: logger.warning("user=%s vk albums error: %s", user_id, data["error"]) return albums = data.get("response", {}).get("items", []) album_ids = [] for a in albums: aid = str(a["id"]) album_ids.append(aid) row = db.query(VkCachedAlbum).filter_by(user_id=user_id, vk_group_id=group_id, album_id=aid).first() if row: row.title = a.get("title", "") row.count = a.get("count") row.fetched_at = now else: db.add(VkCachedAlbum( user_id=user_id, vk_group_id=group_id, album_id=aid, title=a.get("title", ""), count=a.get("count"), fetched_at=now, )) # Delete cached albums that no longer exist in VK ( db.query(VkCachedAlbum) .filter( VkCachedAlbum.user_id == user_id, VkCachedAlbum.vk_group_id == group_id, VkCachedAlbum.album_id.notin_(album_ids), ) .delete(synchronize_session=False) ) db.flush() # ── products (extended=1 gives albums_ids per product) ─────────────────── offset = 0 all_products = [] while True: try: data = _vk_get( "market.get", {"owner_id": owner_id, "count": 200, "offset": offset, "extended": 1}, token, user_id=user_id, ) except Exception as e: logger.warning("user=%s vk fetch products (extended) failed: %s", user_id, e) break if "error" in data: logger.warning("user=%s vk products (extended) error: %s", user_id, data["error"]) break items = data.get("response", {}).get("items", []) all_products.extend(items) if len(items) < 200: break offset += 200 for p in all_products: pid = str(p["id"]) album_id = str(p["albums_ids"][0]) if p.get("albums_ids") else None price_field = p.get("price") if isinstance(price_field, dict): price = float(price_field.get("amount", 0)) / 100 if price_field.get("amount") is not None else None else: price = float(price_field) if price_field is not None else None thumb = None thumb_field = p.get("thumb_photo") if isinstance(thumb_field, dict): sizes = thumb_field.get("sizes", []) if sizes: thumb = sizes[-1].get("url") elif isinstance(thumb_field, str): thumb = thumb_field row = db.query(VkCachedProduct).filter_by( user_id=user_id, vk_group_id=group_id, vk_product_id=pid, ).first() if row: row.album_id = album_id row.name = p.get("title", "") row.description = p.get("description") row.price = price row.availability = p.get("availability") row.thumb_url = thumb row.fetched_at = now else: db.add(VkCachedProduct( user_id=user_id, vk_group_id=group_id, vk_product_id=pid, album_id=album_id, name=p.get("title", ""), description=p.get("description"), price=price, availability=p.get("availability"), thumb_url=thumb, fetched_at=now, )) db.commit() logger.info( "user=%s vk catalog synced: group=%s albums=%d products=%d", user_id, group_id, len(albums), len(all_products), ) @shared_task( name="web.tasks.vk_catalog.refresh_vk_catalog", queue="default", bind=True, max_retries=2, default_retry_delay=60, ) def refresh_vk_catalog(self) -> dict: """Fetch and cache VK Market albums and products for all connected users.""" db = SessionLocal() results = {"ok": 0, "failed": 0} try: connections = ( db.query(VkConnection) .filter( VkConnection.user_id.isnot(None), VkConnection.access_token.isnot(None), VkConnection.access_token != "", VkConnection.vk_user_id.isnot(None), VkConnection.vk_user_id != "", ) .all() ) for conn in connections: cfg = db.query(SyncConfig).filter_by(user_id=conn.user_id).first() if not cfg or not cfg.vk_mirror_enabled: continue try: _sync_user(db, conn.user_id, conn.access_token, conn.vk_user_id) results["ok"] += 1 except Exception as exc: logger.error("vk catalog sync failed for user=%s: %s", conn.user_id, exc) results["failed"] += 1 finally: db.close() logger.info("refresh_vk_catalog done: %s", results) return results