""" Mirror Evotor product catalog → VK Market. Runs after both refresh_catalog and refresh_vk_catalog complete. Only processes stores and groups that have sync enabled (via SyncFilter). Updates VK products only when at least one synced field has changed. """ import logging import os from datetime import datetime, timezone from decimal import Decimal from celery import shared_task import web.lib.api_logger as api_logger from web.config import settings from web.database import SessionLocal from web.models.connections import ( CachedGroup, CachedProduct, CachedStore, SyncConfig, SyncFilter, VkCachedAlbum, VkConnection, ) logger = logging.getLogger(__name__) VK_API = "https://api.vk.com/method" _PHOTO_CACHE: dict[int, str] = {} # user_id → uploaded photo_id for this run def _now() -> datetime: return datetime.now(timezone.utc).replace(tzinfo=None) def _calc_price(price: Decimal | None) -> int: """Return price in kopecks for VK Market API.""" if price is None: return 0 return int(float(price) * 100) def _build_description(name: str, measure: str | None, evo_description: str | None) -> str: price_info = (measure or "").strip() desc = f"{name} (цена за {price_info}.)\n\n" if evo_description: desc += evo_description return desc def _name_for_vk(name: str) -> str: return name.replace(";", ",") def _vk_post(method: str, data: dict, token: str, user_id: int | None = None) -> dict: data = {**data, "access_token": token, "v": settings.VK_API_VERSION} r = api_logger.post(f"{VK_API}/{method}", user_id=user_id, data=data, timeout=30) r.raise_for_status() return r.json() def _upload_photo(token: str, group_id: str, user_id: int | None = None) -> str | None: """Upload the default product photo and return photo_id, or None on failure.""" photo_path = settings.VK_DEFAULT_PHOTO_PATH if not os.path.exists(photo_path): logger.warning("Default photo not found at %s", photo_path) return None try: # Step 1: get upload URL resp = _vk_post("market.getProductPhotoUploadServer", {"group_id": group_id}, token, user_id=user_id) if "error" in resp: logger.warning("getProductPhotoUploadServer error: %s", resp["error"]) return None upload_url = resp["response"]["upload_url"] # Step 2: upload file with open(photo_path, "rb") as f: up = api_logger.post(upload_url, user_id=user_id, files={"file": f}, timeout=30) up.raise_for_status() upload_obj = up.text # Step 3: save resp2 = _vk_post("market.saveProductPhoto", {"upload_response": upload_obj}, token, user_id=user_id) if "error" in resp2: logger.warning("saveProductPhoto error: %s", resp2["error"]) return None return str(resp2["response"]["photo_id"]) except Exception as e: logger.warning("Photo upload failed: %s", e) return None def _get_photo_id(user_id: int, token: str, group_id: str) -> str | None: """Upload photo once per sync run per user, cache the result.""" if user_id not in _PHOTO_CACHE: _PHOTO_CACHE[user_id] = _upload_photo(token, group_id, user_id=user_id) return _PHOTO_CACHE[user_id] def _enabled_store_ids(db, user_id: int) -> set[str] | None: cfg = db.query(SyncConfig).filter_by(user_id=user_id).first() if not cfg: return None filters = db.query(SyncFilter).filter_by( sync_config_id=cfg.id, entity_type="store", filter_mode="include", ).all() return None if not filters else {f.entity_id for f in filters} def _enabled_group_ids(db, user_id: int, store_evotor_id: str) -> set[str] | None: cfg = db.query(SyncConfig).filter_by(user_id=user_id).first() if not cfg: return None filters = db.query(SyncFilter).filter_by( sync_config_id=cfg.id, entity_type="group", filter_mode="include", parent_entity_id=store_evotor_id, ).all() return None if not filters else {f.entity_id for f in filters} def _ensure_album(db, user_id: int, vk_group_id: str, group_name: str, token: str) -> str | None: """Return VK album_id for the given group name, creating it if needed.""" album = db.query(VkCachedAlbum).filter_by( user_id=user_id, vk_group_id=vk_group_id, ).filter(VkCachedAlbum.title == group_name).first() if album: return album.album_id # Create album in VK resp = _vk_post("market.addAlbum", { "owner_id": f"-{vk_group_id}", "title": group_name, }, token, user_id=user_id) if "error" in resp: logger.warning("market.addAlbum error for '%s': %s", group_name, resp["error"]) return None album_id = str(resp["response"]["market_album_id"]) db.add(VkCachedAlbum( user_id=user_id, vk_group_id=vk_group_id, album_id=album_id, title=group_name, fetched_at=_now(), )) db.flush() logger.info("user=%s created VK album '%s' id=%s", user_id, group_name, album_id) return album_id def _sync_product( db, user_id: int, product: CachedProduct, album_id: str, vk_group_id: str, token: str, ) -> None: name = _name_for_vk(product.name) price_kopecks = _calc_price(product.price) desc = _build_description(product.name, product.measure_name, None) stock = settings.VK_STOCK_AMOUNT if product.allow_to_sell else 0 owner_id = f"-{vk_group_id}" now = _now() if product.vk_product_id: # Check if update needed changed = False # Re-read current VK state from cache from web.models.connections import VkCachedProduct vk_p = db.query(VkCachedProduct).filter_by( user_id=user_id, vk_group_id=vk_group_id, vk_product_id=product.vk_product_id, ).first() album_changed = False if vk_p: vk_price_kopecks = int(vk_p.price * 100) if vk_p.price is not None else 0 vk_stock = settings.VK_STOCK_AMOUNT if vk_p.availability == 0 else 0 vk_name = vk_p.name or "" vk_desc = (vk_p.description or "").strip() curr_desc = desc.strip() album_changed = str(vk_p.album_id) != str(album_id) if album_id else False changed = ( name != vk_name or price_kopecks != vk_price_kopecks or curr_desc != vk_desc or stock != vk_stock or album_changed ) else: changed = True # cached VK product gone, push update if not changed: return resp = _vk_post("market.edit", { "owner_id": owner_id, "item_id": product.vk_product_id, "name": name, "description": desc.strip(), "category_id": settings.VK_CATEGORY_ID, "price": price_kopecks, "stock_amount": stock, }, token, user_id=user_id) if "error" in resp: logger.warning("market.edit error product=%s: %s", product.evotor_id, resp["error"]) return if album_changed and album_id and vk_p: old_album_id = str(vk_p.album_id) _vk_post("market.removeFromAlbum", { "owner_id": owner_id, "item_id": product.vk_product_id, "album_ids": old_album_id, }, token, user_id=user_id) resp_album = _vk_post("market.addToAlbum", { "owner_id": owner_id, "item_ids": product.vk_product_id, "album_ids": album_id, }, token, user_id=user_id) if "error" in resp_album: logger.warning("market.addToAlbum error product=%s: %s", product.evotor_id, resp_album["error"]) else: logger.info("user=%s moved VK product '%s' album %s→%s", user_id, name, old_album_id, album_id) product.synced_at = now logger.info("user=%s updated VK product '%s' id=%s", user_id, name, product.vk_product_id) else: # Create — only if allow_to_sell if not product.allow_to_sell: return photo_id = _get_photo_id(user_id, token, vk_group_id) if not photo_id: logger.warning("user=%s skipping create for '%s': no photo", user_id, name) return resp = _vk_post("market.add", { "owner_id": owner_id, "name": name, "description": desc, "category_id": settings.VK_CATEGORY_ID, "price": price_kopecks, "main_photo_id": photo_id, "stock_amount": stock, }, token, user_id=user_id) if "error" in resp: logger.warning("market.add error product=%s: %s", product.evotor_id, resp["error"]) return vk_item_id = str(resp["response"]["market_item_id"]) product.vk_product_id = vk_item_id product.synced_at = now # Add to album resp2 = _vk_post("market.addToAlbum", { "owner_id": owner_id, "item_ids": vk_item_id, "album_ids": album_id, }, token, user_id=user_id) if "error" in resp2: logger.warning("market.addToAlbum error product=%s: %s", product.evotor_id, resp2["error"]) logger.info("user=%s created VK product '%s' id=%s", user_id, name, vk_item_id) def _sync_product_list(db, user_id, products, album_id, vk_group_id, token, results, owned_ids): for product in products: was_new = product.vk_product_id is None try: _sync_product(db, user_id, product, album_id, vk_group_id, token) if product.vk_product_id: owned_ids.add(product.vk_product_id) if product.synced_at: if was_new and product.vk_product_id: results["created"] += 1 elif not was_new: results["updated"] += 1 else: results["skipped"] += 1 else: results["skipped"] += 1 except Exception as e: logger.error("user=%s sync_product failed '%s': %s", user_id, product.name, e) results["errors"] += 1 def _delete_orphans(db, user_id, vk_group_id, owned_ids, token, results): from web.models.connections import VkCachedProduct orphans = db.query(VkCachedProduct).filter_by( user_id=user_id, vk_group_id=vk_group_id, ).filter(VkCachedProduct.vk_product_id.notin_(owned_ids)).all() if owned_ids else \ db.query(VkCachedProduct).filter_by(user_id=user_id, vk_group_id=vk_group_id).all() owner_id = f"-{vk_group_id}" for vk_p in orphans: try: resp = _vk_post("market.delete", { "owner_id": owner_id, "item_id": vk_p.vk_product_id, }, token, user_id=user_id) if "error" in resp: logger.warning("market.delete error id=%s: %s", vk_p.vk_product_id, resp["error"]) results["errors"] += 1 else: logger.info("user=%s deleted VK product id=%s '%s'", user_id, vk_p.vk_product_id, vk_p.name) db.delete(vk_p) results["deleted"] += 1 except Exception as e: logger.error("user=%s delete failed id=%s: %s", user_id, vk_p.vk_product_id, e) results["errors"] += 1 # Also clear vk_product_id on any cached_products that pointed to deleted VK items if orphans: deleted_vk_ids = {vk_p.vk_product_id for vk_p in orphans} stale = db.query(CachedProduct).filter( CachedProduct.user_id == user_id, CachedProduct.vk_product_id.in_(deleted_vk_ids), ).all() for p in stale: p.vk_product_id = None def _sync_user(db, user_id: int, token: str, vk_group_id: str) -> dict: results = {"created": 0, "updated": 0, "skipped": 0, "deleted": 0, "errors": 0} owned_ids: set[str] = set() # VK product IDs that Evotor owns this run enabled_stores = _enabled_store_ids(db, user_id) stores = db.query(CachedStore).filter_by(user_id=user_id).all() for store in stores: if enabled_stores is not None and store.evotor_id not in enabled_stores: continue enabled_groups = _enabled_group_ids(db, user_id, store.evotor_id) groups = db.query(CachedGroup).filter_by( user_id=user_id, store_evotor_id=store.evotor_id, ).all() for group in groups: if enabled_groups is not None and group.evotor_id not in enabled_groups: continue try: album_id = _ensure_album(db, user_id, vk_group_id, group.name, token) except Exception as e: logger.error("user=%s ensure_album failed for '%s': %s", user_id, group.name, e) results["errors"] += 1 continue if not album_id: results["errors"] += 1 continue products = db.query(CachedProduct).filter_by( user_id=user_id, store_evotor_id=store.evotor_id, group_evotor_id=group.evotor_id, ).all() _sync_product_list(db, user_id, products, album_id, vk_group_id, token, results, owned_ids) # Ungrouped products → "Без категории" album ungrouped = db.query(CachedProduct).filter_by( user_id=user_id, store_evotor_id=store.evotor_id, group_evotor_id=None, ).all() if ungrouped: try: fallback_album_id = _ensure_album(db, user_id, vk_group_id, "Без категории", token) except Exception as e: logger.error("user=%s ensure_album failed for 'Без категории': %s", user_id, e) fallback_album_id = None if fallback_album_id: _sync_product_list(db, user_id, ungrouped, fallback_album_id, vk_group_id, token, results, owned_ids) # Delete VK products not owned by any Evotor product _delete_orphans(db, user_id, vk_group_id, owned_ids, token, results) db.commit() return results @shared_task( name="web.tasks.vk_sync.mirror_to_vk", queue="sync", bind=True, max_retries=1, default_retry_delay=120, ) def mirror_to_vk(self) -> dict: """Mirror Evotor catalog → VK Market for all users with both connections active.""" _PHOTO_CACHE.clear() db = SessionLocal() totals = {"ok": 0, "failed": 0} try: vk_connections = ( db.query(VkConnection) .filter( VkConnection.user_id.isnot(None), VkConnection.access_token.isnot(None), VkConnection.access_token != "", VkConnection.vk_user_id.isnot(None), VkConnection.vk_user_id != "", ) .all() ) for conn in vk_connections: cfg = db.query(SyncConfig).filter_by(user_id=conn.user_id).first() if not cfg or not cfg.is_enabled: continue try: result = _sync_user(db, conn.user_id, conn.access_token, conn.vk_user_id) logger.info("user=%s mirror_to_vk: %s", conn.user_id, result) totals["ok"] += 1 except Exception as exc: logger.error("mirror_to_vk failed for user=%s: %s", conn.user_id, exc) totals["failed"] += 1 finally: db.close() logger.info("mirror_to_vk done: %s", totals) return totals