From 1bf82adbfcec9cf23ecad8bb79b5fe4448d68c5c Mon Sep 17 00:00:00 2001 From: mguschin Date: Tue, 10 Mar 2026 17:05:37 +0300 Subject: [PATCH] Add sync engine and wire it into the web app - Add sync_engine.py: background asyncio loop syncing Evotor products to VK market - Wire sync_loop into lifespan alongside health_check_loop - Add SYNC_INTERVAL_SECONDS and VK_DEFAULT_PHOTO_PATH settings to config - Mount default product image in docker-compose - Add synced_at column to CachedProduct model + migration - Show synced_at status in catalog products template - Fix VK groups API response parsing (handle list vs dict) Co-Authored-By: Claude Sonnet 4.6 --- docker-compose.yml | 1 + web/config.py | 3 + web/main.py | 18 +- ...d0e1f2_add_synced_at_to_cached_products.py | 26 + web/models.py | 1 + web/routes/vk.py | 3 +- web/sync_engine.py | 485 ++++++++++++++++++ web/templates/catalog_products.html | 10 + 8 files changed, 540 insertions(+), 7 deletions(-) create mode 100644 web/migrations/versions/a7b8c9d0e1f2_add_synced_at_to_cached_products.py create mode 100644 web/sync_engine.py diff --git a/docker-compose.yml b/docker-compose.yml index 58fc589..c06883e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,7 @@ services: - ./web:/app/web - ./alembic.ini:/app/alembic.ini - ./docker-entrypoint.sh:/app/docker-entrypoint.sh + - ./5393364294319597854.png:/app/default_product.png:ro restart: unless-stopped extra_hosts: - "host.docker.internal:host-gateway" diff --git a/web/config.py b/web/config.py index 2e6d99d..fabba82 100644 --- a/web/config.py +++ b/web/config.py @@ -14,6 +14,9 @@ class Settings(BaseSettings): HEALTH_CHECK_INTERVAL_SECONDS: int = 600 CATALOG_REFRESH_INTERVAL_SECONDS: int = 3600 + SYNC_INTERVAL_SECONDS: int = 3600 + + VK_DEFAULT_PHOTO_PATH: str = "/app/default_product.png" VK_API_VERSION: str = "5.131" diff --git a/web/main.py b/web/main.py index 0e00f32..84bc7d7 100644 --- a/web/main.py +++ b/web/main.py @@ -9,6 +9,7 @@ from starlette.middleware.sessions import SessionMiddleware from web.auth import get_current_user from web.config import settings from web.health_checker import health_check_loop +from web.sync_engine import sync_loop from web.models import User from web.routes import auth, profile, reset, evotor, vk, sync, catalog from web.routes import connections @@ -16,13 +17,18 @@ from web.routes import connections @asynccontextmanager async def lifespan(app: FastAPI): - task = asyncio.create_task(health_check_loop(settings.HEALTH_CHECK_INTERVAL_SECONDS)) + tasks = [ + asyncio.create_task(health_check_loop(settings.HEALTH_CHECK_INTERVAL_SECONDS)), + asyncio.create_task(sync_loop(settings.SYNC_INTERVAL_SECONDS)), + ] yield - task.cancel() - try: - await task - except asyncio.CancelledError: - pass + for t in tasks: + t.cancel() + for t in tasks: + try: + await t + except asyncio.CancelledError: + pass app = FastAPI(title="ЭВОСИНК — Личный кабинет", lifespan=lifespan) diff --git a/web/migrations/versions/a7b8c9d0e1f2_add_synced_at_to_cached_products.py b/web/migrations/versions/a7b8c9d0e1f2_add_synced_at_to_cached_products.py new file mode 100644 index 0000000..a399d89 --- /dev/null +++ b/web/migrations/versions/a7b8c9d0e1f2_add_synced_at_to_cached_products.py @@ -0,0 +1,26 @@ +"""add synced_at to cached_products + +Revision ID: a7b8c9d0e1f2 +Revises: f6a7b8c9d0e1 +Branch Labels: None +Depends On: None + +""" +from alembic import op +import sqlalchemy as sa + +revision = "a7b8c9d0e1f2" +down_revision = "f6a7b8c9d0e1" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "cached_products", + sa.Column("synced_at", sa.DateTime(), nullable=True), + ) + + +def downgrade(): + op.drop_column("cached_products", "synced_at") diff --git a/web/models.py b/web/models.py index 390286f..fcd27b6 100644 --- a/web/models.py +++ b/web/models.py @@ -149,6 +149,7 @@ class CachedProduct(Base): article_number = Column(String(100), nullable=True) allow_to_sell = Column(Boolean, nullable=True) fetched_at = Column(DateTime, nullable=False) + synced_at = Column(DateTime, nullable=True) __table_args__ = ( UniqueConstraint("user_id", "evotor_id"), diff --git a/web/routes/vk.py b/web/routes/vk.py index 9d1c445..8927843 100644 --- a/web/routes/vk.py +++ b/web/routes/vk.py @@ -65,7 +65,8 @@ async def vk_token( data = resp.json() if "error" in data: return RedirectResponse("/vk?error=invalid_token", 303) - groups = data.get("response", {}).get("groups", []) + response = data.get("response", []) + groups = response if isinstance(response, list) else response.get("groups", []) if groups: group_id = str(groups[0].get("id", "")) group_name = groups[0].get("name") diff --git a/web/sync_engine.py b/web/sync_engine.py new file mode 100644 index 0000000..8b40f84 --- /dev/null +++ b/web/sync_engine.py @@ -0,0 +1,485 @@ +""" +Sync engine: syncs Evotor products to VK market for all enabled users. +Runs as a background asyncio loop inside the web app. +""" + +import asyncio +import logging +from datetime import datetime + +import httpx +from sqlalchemy.orm import Session + +from web.database import SessionLocal +from web.models import CachedProduct, EvotorConnection, VkConnection, SyncConfig, SyncFilter + +logger = logging.getLogger("uvicorn.error") + +VK_API_HOST = "https://api.vk.ru/method" +VK_API_VERSION = "5.199" +EVOTOR_API_BASE = "https://api.evotor.ru" +VK_CATEGORY_ID = 40932 +VK_STOCK_AMOUNT = 1000 +WEIGHT_PRICE_MULTIPLIER = 10 +WEIGHT_MEASURES = {"г", "г.", "грамм", "граммов", "гр", "гр."} + + +def _is_weight_measure(measure: str | None) -> bool: + if not measure: + return False + return measure.strip().lower() in WEIGHT_MEASURES + + +def _normalize_name(name: str) -> str: + return name.strip().replace(";", ",") + + +def _calc_price(price_kopecks, measure: str | None) -> tuple[int, str]: + """Returns (price_in_kopecks_for_vk, price_info_label).""" + base = int(price_kopecks or 0) + if _is_weight_measure(measure): + return base * WEIGHT_PRICE_MULTIPLIER, f"{WEIGHT_PRICE_MULTIPLIER}{measure}" + return base, measure or "" + + +def _build_description(name: str, price_info: str, extra_desc: str | None) -> str: + desc = f"{name} (цена за {price_info}.)\n\n" + if extra_desc: + desc += extra_desc + return desc + + +def _get_included_store_ids(filters: list) -> list[str]: + return [f.entity_id for f in filters if f.entity_type == "store" and f.filter_mode == "include"] + + +def _is_group_included(group_id: str | None, filters: list) -> bool: + """Returns True if the group should be synced based on filters.""" + group_filters = {f.entity_id: f.filter_mode for f in filters if f.entity_type == "group"} + if not group_filters: + return True # no group filters → include all + mode = group_filters.get(group_id) + if mode == "exclude": + return False + if mode == "include": + return True + # Not mentioned — include if there are only excludes, exclude if there are only includes + has_includes = any(v == "include" for v in group_filters.values()) + return not has_includes + + +def _is_product_included(product_id: str, filters: list) -> bool: + product_filters = {f.entity_id: f.filter_mode for f in filters if f.entity_type == "product"} + if not product_filters: + return True + mode = product_filters.get(product_id) + if mode == "exclude": + return False + if mode == "include": + return True + has_includes = any(v == "include" for v in product_filters.values()) + return not has_includes + + +# --------------------------------------------------------------------------- +# Evotor API helpers +# --------------------------------------------------------------------------- + +async def _evo_fetch_products(token: str, store_id: str) -> list[dict]: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{EVOTOR_API_BASE}/stores/{store_id}/products", + headers={"Authorization": f"Bearer {token}"}, + timeout=30, + ) + if resp.status_code in (402, 404): + return [] + resp.raise_for_status() + data = resp.json() + return data.get("items", data) if isinstance(data, dict) else data + + +async def _evo_fetch_groups(token: str, store_id: str) -> list[dict]: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{EVOTOR_API_BASE}/stores/{store_id}/product-groups", + headers={"Authorization": f"Bearer {token}"}, + timeout=30, + ) + if resp.status_code in (402, 404): + return [] + resp.raise_for_status() + data = resp.json() + return data.get("items", data) if isinstance(data, dict) else data + + +# --------------------------------------------------------------------------- +# VK API helpers +# --------------------------------------------------------------------------- + +def _vk_params(token: str, **extra) -> dict: + return {"access_token": token, "v": VK_API_VERSION, **extra} + + +async def _vk_get_albums(token: str, owner_id: str) -> list[dict]: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{VK_API_HOST}/market.getAlbums", + params=_vk_params(token, owner_id=owner_id, count=200), + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + return data.get("response", {}).get("items", []) + + +async def _vk_create_album(token: str, owner_id: str, title: str) -> int | None: + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{VK_API_HOST}/market.addAlbum", + data=_vk_params(token, owner_id=owner_id, title=title), + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + if "error" in data: + logger.error("VK create album error: %s", data["error"]) + return None + return data.get("response", {}).get("market_album_id") + + +async def _vk_get_products(token: str, owner_id: str) -> list[dict]: + """Fetch all VK market items (handles pagination).""" + items = [] + offset = 0 + count = 200 + async with httpx.AsyncClient() as client: + while True: + resp = await client.get( + f"{VK_API_HOST}/market.get", + params=_vk_params(token, owner_id=owner_id, extended=1, + with_disabled=1, count=count, offset=offset), + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + batch = data.get("response", {}).get("items", []) + items.extend(batch) + if len(batch) < count: + break + offset += count + return items + + +async def _vk_upload_photo(token: str, group_id: str, photo_path: str) -> str | None: + """Upload a photo and return photo_id.""" + async with httpx.AsyncClient() as client: + # Get upload URL + resp = await client.get( + f"{VK_API_HOST}/market.getProductPhotoUploadServer", + params=_vk_params(token, group_id=group_id), + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + if "error" in data: + logger.error("VK get upload URL error: %s", data["error"]) + return None + upload_url = data.get("response", {}).get("upload_url") + if not upload_url: + return None + + # Upload photo + with open(photo_path, "rb") as f: + upload_resp = await client.post(upload_url, files={"file": f}, timeout=60) + upload_resp.raise_for_status() + upload_obj = upload_resp.json() + + # Save photo + save_resp = await client.post( + f"{VK_API_HOST}/market.saveProductPhoto", + data=_vk_params(token, upload_response=upload_resp.text), + timeout=30, + ) + save_resp.raise_for_status() + save_data = save_resp.json() + if "error" in save_data: + logger.error("VK save photo error: %s", save_data["error"]) + return None + return save_data.get("response", {}).get("photo_id") + + +async def _vk_create_product( + token: str, owner_id: str, name: str, description: str, + price: int, stock_amount: int, photo_id: str, album_id: int | None, +) -> int | None: + params = _vk_params( + token, + owner_id=owner_id, + name=name, + description=description, + category_id=VK_CATEGORY_ID, + price=price, + main_photo_id=photo_id, + stock_amount=stock_amount, + ) + async with httpx.AsyncClient() as client: + resp = await client.post(f"{VK_API_HOST}/market.add", data=params, timeout=30) + resp.raise_for_status() + data = resp.json() + if "error" in data: + logger.error("VK create product error: %s", data["error"]) + return None + product_id = data.get("response", {}).get("market_item_id") + if product_id and album_id: + await client.get( + f"{VK_API_HOST}/market.addToAlbum", + params=_vk_params(token, owner_id=owner_id, + item_ids=product_id, album_ids=album_id), + timeout=30, + ) + return product_id + + +async def _vk_edit_product( + token: str, owner_id: str, item_id: int, name: str, + description: str, price: int, stock_amount: int, +) -> None: + params = _vk_params( + token, + owner_id=owner_id, + item_id=item_id, + name=name, + description=description, + category_id=VK_CATEGORY_ID, + price=price, + stock_amount=stock_amount, + ) + async with httpx.AsyncClient() as client: + resp = await client.post(f"{VK_API_HOST}/market.edit", data=params, timeout=30) + resp.raise_for_status() + data = resp.json() + if "error" in data: + logger.error("VK edit product error: %s", data["error"]) + + +async def _vk_delete_product(token: str, owner_id: str, item_id: int) -> None: + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{VK_API_HOST}/market.delete", + data=_vk_params(token, owner_id=owner_id, item_id=item_id), + timeout=30, + ) + resp.raise_for_status() + + +# --------------------------------------------------------------------------- +# Main sync logic per user +# --------------------------------------------------------------------------- + +def _stamp_synced(db: Session, user_id: int, evo_id: str, now: datetime) -> None: + db.query(CachedProduct).filter( + CachedProduct.user_id == user_id, + CachedProduct.evotor_id == evo_id, + ).update({"synced_at": now}) + db.commit() + + +async def sync_user( + user_id: int, + evo_token: str, + vk_token: str, + vk_group_id: str, + filters: list, + photo_path: str, + db: Session, +) -> None: + owner_id = f"-{vk_group_id}" + now = datetime.utcnow() + logger.info("Sync start: user_id=%d vk_group=%s", user_id, vk_group_id) + + store_ids = _get_included_store_ids(filters) + if not store_ids: + logger.info("Sync skip: user_id=%d — no stores included in filters", user_id) + return + + # Collect all Evotor products and groups across included stores + evo_products: list[dict] = [] + groups_by_id: dict[str, dict] = {} + + for store_id in store_ids: + raw_groups = await _evo_fetch_groups(evo_token, store_id) + for g in raw_groups: + gid = g.get("uuid") or g.get("id") + if gid: + groups_by_id[gid] = g + + raw_products = await _evo_fetch_products(evo_token, store_id) + for p in raw_products: + pid = p.get("uuid") or p.get("id") + gid = p.get("parentUuid") or p.get("parent_id") + if not _is_group_included(gid, filters): + continue + if not _is_product_included(pid, filters): + continue + evo_products.append(p) + + # Build evo product lookup by normalized name + # {normalized_name: product_dict} + evo_by_name: dict[str, dict] = {} + for p in evo_products: + raw_name = (p.get("name") or "").strip() + norm = _normalize_name(raw_name) + evo_by_name[norm] = p + + # Fetch VK state + vk_products = await _vk_get_products(vk_token, owner_id) + vk_albums = await _vk_get_albums(vk_token, owner_id) + vk_album_by_title: dict[str, dict] = {a["title"]: a for a in vk_albums} + + # Ensure albums exist for all included groups + for gid, group in groups_by_id.items(): + if not _is_group_included(gid, filters): + continue + title = group.get("name", "") + if title and title not in vk_album_by_title: + new_album_id = await _vk_create_album(vk_token, owner_id, title) + if new_album_id: + vk_album_by_title[title] = {"id": new_album_id, "title": title} + logger.info("Created VK album '%s' for user_id=%d", title, user_id) + + # Build VK product lookup by normalized name + # {normalized_name: [vk_item, ...]} + vk_by_name: dict[str, list[dict]] = {} + for item in vk_products: + norm = _normalize_name(item.get("title", "")) + vk_by_name.setdefault(norm, []).append(item) + + # --- UPDATE / CREATE --- + for norm_name, evo_p in evo_by_name.items(): + evo_id = evo_p.get("uuid") or evo_p.get("id") + raw_name = (evo_p.get("name") or "").strip() + name_for_vk = _normalize_name(raw_name) + measure = evo_p.get("measureName") or evo_p.get("measure_name") + raw_price = evo_p.get("price") or 0 + price, price_info = _calc_price(raw_price, measure) + allow_to_sell = evo_p.get("allowToSell") if evo_p.get("allowToSell") is not None else evo_p.get("allow_to_sell") + stock_amount = VK_STOCK_AMOUNT if allow_to_sell else 0 + extra_desc = evo_p.get("description") or "" + description = _build_description(raw_name, price_info, extra_desc).strip() + + gid = evo_p.get("parentUuid") or evo_p.get("parent_id") + group_name = groups_by_id.get(gid, {}).get("name") if gid else None + album = vk_album_by_title.get(group_name) if group_name else None + album_id = album["id"] if album else None + + if norm_name in vk_by_name: + # Update existing (use first match) + vk_item = vk_by_name[norm_name][0] + vk_id = vk_item["id"] + orig_price = vk_item.get("price", {}).get("amount", 0) + orig_price_int = int(orig_price) if orig_price else 0 + orig_desc = (vk_item.get("description") or "").strip() + orig_stock = vk_item.get("stock_amount", 0) + + price_changed = price != orig_price_int + desc_changed = description != orig_desc + stock_changed = stock_amount != orig_stock + + if price_changed or desc_changed or stock_changed: + logger.info( + "Updating VK product '%s' user_id=%d (price=%s desc=%s stock=%s)", + name_for_vk, user_id, price_changed, desc_changed, stock_changed, + ) + await _vk_edit_product( + vk_token, owner_id, vk_id, name_for_vk, description, price, stock_amount, + ) + _stamp_synced(db, user_id, evo_id, now) + else: + # Create new (only if allow_to_sell) + if not allow_to_sell: + continue + photo_id = await _vk_upload_photo(vk_token, vk_group_id, photo_path) + if not photo_id: + logger.error("Skipping product '%s' — photo upload failed", name_for_vk) + continue + logger.info("Creating VK product '%s' user_id=%d", name_for_vk, user_id) + created = await _vk_create_product( + vk_token, owner_id, name_for_vk, description, + price, stock_amount, photo_id, album_id, + ) + if created: + _stamp_synced(db, user_id, evo_id, now) + + # --- DELETE products in VK that are no longer in Evo --- + for norm_name, vk_items in vk_by_name.items(): + if norm_name in evo_by_name: + # Delete duplicates (keep first) + for dup in vk_items[1:]: + logger.info("Deleting duplicate VK product '%s' id=%d user_id=%d", + norm_name, dup["id"], user_id) + await _vk_delete_product(vk_token, owner_id, dup["id"]) + else: + # Delete all — product removed from Evo + for item in vk_items: + logger.info("Deleting removed product '%s' id=%d user_id=%d", + norm_name, item["id"], user_id) + await _vk_delete_product(vk_token, owner_id, item["id"]) + + logger.info("Sync complete: user_id=%d", user_id) + + +# --------------------------------------------------------------------------- +# Background loop +# --------------------------------------------------------------------------- + +async def run_sync() -> None: + from web.config import settings + + db = SessionLocal() + try: + configs = db.query(SyncConfig).filter( + SyncConfig.is_enabled == True, + SyncConfig.confirmed_at != None, + ).all() + + for config in configs: + user_id = config.user_id + evo = db.query(EvotorConnection).filter( + EvotorConnection.user_id == user_id + ).first() + vk = db.query(VkConnection).filter( + VkConnection.user_id == user_id + ).first() + + if not evo or not vk: + continue + if not evo.access_token or not vk.access_token: + continue + if not vk.vk_user_id: + continue + + try: + await sync_user( + user_id=user_id, + evo_token=evo.access_token, + vk_token=vk.access_token, + vk_group_id=vk.vk_user_id, + filters=config.filters, + photo_path=settings.VK_DEFAULT_PHOTO_PATH, + db=db, + ) + except Exception: + logger.exception("Sync failed for user_id=%d", user_id) + + except Exception: + logger.exception("Error in sync runner") + db.rollback() + finally: + db.close() + + +async def sync_loop(interval: int) -> None: + while True: + await run_sync() + await asyncio.sleep(interval) diff --git a/web/templates/catalog_products.html b/web/templates/catalog_products.html index b8dc34d..cf9df49 100644 --- a/web/templates/catalog_products.html +++ b/web/templates/catalog_products.html @@ -40,6 +40,7 @@ Кол-во Ед. изм. В продаже + Синхронизирован Фильтр @@ -62,6 +63,15 @@ {% endif %} + + {% if product.synced_at %} + + + + {% else %} + + {% endif %} + {% if mode == "include" %} ✓ Включено