Files
evo-sync/web-python/sync_engine.py

486 lines
17 KiB
Python
Raw Normal View History

"""
Sync engine: syncs Evotor products to VK market for all enabled users.
Runs as a background asyncio loop inside the web app.
"""
import asyncio
import logging
from datetime import datetime
import httpx
from sqlalchemy.orm import Session
from web.database import SessionLocal
from web.models import CachedProduct, EvotorConnection, VkConnection, SyncConfig, SyncFilter
# Log through uvicorn's error logger so messages appear with the web app's output.
logger = logging.getLogger("uvicorn.error")
# Base URL that VK API method names are appended to.
VK_API_HOST = "https://api.vk.ru/method"
# Sent as the "v" parameter with every VK API call.
VK_API_VERSION = "5.199"
# Base URL of the Evotor REST API.
EVOTOR_API_BASE = "https://api.evotor.ru"
# category_id sent with every market.add / market.edit call.
VK_CATEGORY_ID = 40932
# stock_amount sent for products that are allowed to be sold (0 otherwise).
VK_STOCK_AMOUNT = 1000
# Products measured in grams are priced per this many units.
WEIGHT_PRICE_MULTIPLIER = 10
# Lower-cased measure names that are treated as gram-based units.
WEIGHT_MEASURES = {"г", "г.", "грамм", "граммов", "гр", "гр."}
def _is_weight_measure(measure: str | None) -> bool:
if not measure:
return False
return measure.strip().lower() in WEIGHT_MEASURES
def _normalize_name(name: str) -> str:
return name.strip().replace(";", ",")
def _calc_price(price_kopecks, measure: str | None) -> tuple[int, str]:
    """Compute the price to publish and the unit label shown next to it.

    Weight-measured products are priced per ``WEIGHT_PRICE_MULTIPLIER`` units
    and labelled accordingly; everything else keeps its price and uses the
    measure itself (or an empty string) as the label. A missing price counts
    as zero.
    """
    amount = int(price_kopecks or 0)
    if not _is_weight_measure(measure):
        return amount, measure or ""
    label = f"{WEIGHT_PRICE_MULTIPLIER}{measure}"
    return amount * WEIGHT_PRICE_MULTIPLIER, label
def _build_description(name: str, price_info: str, extra_desc: str | None) -> str:
desc = f"{name} (цена за {price_info}.)\n\n"
if extra_desc:
desc += extra_desc
return desc
def _get_included_store_ids(filters: list) -> list[str]:
return [f.entity_id for f in filters if f.entity_type == "store" and f.filter_mode == "include"]
def _is_group_included(group_id: str | None, filters: list) -> bool:
"""Returns True if the group should be synced based on filters."""
group_filters = {f.entity_id: f.filter_mode for f in filters if f.entity_type == "group"}
if not group_filters:
return True # no group filters → include all
mode = group_filters.get(group_id)
if mode == "exclude":
return False
if mode == "include":
return True
# Not mentioned — include if there are only excludes, exclude if there are only includes
has_includes = any(v == "include" for v in group_filters.values())
return not has_includes
def _is_product_included(product_id: str, filters: list) -> bool:
product_filters = {f.entity_id: f.filter_mode for f in filters if f.entity_type == "product"}
if not product_filters:
return True
mode = product_filters.get(product_id)
if mode == "exclude":
return False
if mode == "include":
return True
has_includes = any(v == "include" for v in product_filters.values())
return not has_includes
# ---------------------------------------------------------------------------
# Evotor API helpers
# ---------------------------------------------------------------------------
async def _evo_fetch_products(token: str, store_id: str) -> list[dict]:
    """Fetch the products of one Evotor store, following cursor pagination.

    Args:
        token: Evotor access token, sent as a ``Bearer`` authorization header.
        store_id: Identifier of the store whose products are requested.

    Returns:
        The product dicts from every page. An empty list when the first
        request answers 402 or 404.

    Raises:
        httpx.HTTPStatusError: any other error status (or 402/404 on a
            follow-up page, where a partial list must not be mistaken for
            the full catalogue).
        ValueError: the response is a JSON object without an ``items`` list.
    """
    url = f"{EVOTOR_API_BASE}/stores/{store_id}/products"
    headers = {"Authorization": f"Bearer {token}"}
    collected: list[dict] = []
    seen_cursors: set[str] = set()
    cursor: str | None = None
    async with httpx.AsyncClient() as client:
        while True:
            resp = await client.get(
                url,
                headers=headers,
                params={"cursor": cursor} if cursor else None,
                timeout=30,
            )
            if cursor is None and resp.status_code in (402, 404):
                return []
            resp.raise_for_status()
            data = resp.json()
            if not isinstance(data, dict):
                # Bare payload without a paging envelope: nothing more to follow.
                return data if cursor is None else collected + list(data)
            items = data.get("items")
            if not isinstance(items, list):
                raise ValueError(
                    f"Unexpected Evotor products payload for store {store_id}: "
                    "JSON object without an 'items' list"
                )
            collected.extend(items)
            # NOTE(review): assumes Evotor's cursor paging (body field
            # paging.next_cursor, query parameter "cursor") — confirm against
            # the API reference.
            paging = data.get("paging")
            next_cursor = paging.get("next_cursor") if isinstance(paging, dict) else None
            # A repeated cursor would loop forever; treat it as the end.
            if not isinstance(next_cursor, str) or not next_cursor or next_cursor in seen_cursors:
                return collected
            seen_cursors.add(next_cursor)
            cursor = next_cursor
async def _evo_fetch_groups(token: str, store_id: str) -> list[dict]:
    """Fetch the product groups of one Evotor store, following cursor pagination.

    Args:
        token: Evotor access token, sent as a ``Bearer`` authorization header.
        store_id: Identifier of the store whose groups are requested.

    Returns:
        The group dicts from every page. An empty list when the first
        request answers 402 or 404.

    Raises:
        httpx.HTTPStatusError: any other error status (or 402/404 on a
            follow-up page).
        ValueError: the response is a JSON object without an ``items`` list.
    """
    url = f"{EVOTOR_API_BASE}/stores/{store_id}/product-groups"
    headers = {"Authorization": f"Bearer {token}"}
    collected: list[dict] = []
    seen_cursors: set[str] = set()
    cursor: str | None = None
    async with httpx.AsyncClient() as client:
        while True:
            resp = await client.get(
                url,
                headers=headers,
                params={"cursor": cursor} if cursor else None,
                timeout=30,
            )
            if cursor is None and resp.status_code in (402, 404):
                return []
            resp.raise_for_status()
            data = resp.json()
            if not isinstance(data, dict):
                # Bare payload without a paging envelope: nothing more to follow.
                return data if cursor is None else collected + list(data)
            items = data.get("items")
            if not isinstance(items, list):
                raise ValueError(
                    f"Unexpected Evotor product-groups payload for store {store_id}: "
                    "JSON object without an 'items' list"
                )
            collected.extend(items)
            # NOTE(review): assumes Evotor's cursor paging (body field
            # paging.next_cursor, query parameter "cursor") — confirm against
            # the API reference.
            paging = data.get("paging")
            next_cursor = paging.get("next_cursor") if isinstance(paging, dict) else None
            # A repeated cursor would loop forever; treat it as the end.
            if not isinstance(next_cursor, str) or not next_cursor or next_cursor in seen_cursors:
                return collected
            seen_cursors.add(next_cursor)
            cursor = next_cursor
# ---------------------------------------------------------------------------
# VK API helpers
# ---------------------------------------------------------------------------
def _vk_params(token: str, **extra) -> dict:
    """Build the parameter dict for a VK API call.

    Starts from the access token and API version, then layers *extra* on top
    (so a key given in *extra* overrides the base value).
    """
    params = dict(access_token=token, v=VK_API_VERSION)
    params.update(extra)
    return params
async def _vk_get_albums(token: str, owner_id: str) -> list[dict]:
    """Fetch the market albums of *owner_id* (a single request of up to 200).

    Returns:
        The ``items`` list of the VK response, or ``[]`` if it is absent.

    Raises:
        httpx.HTTPStatusError: the HTTP request failed.
        RuntimeError: VK answered with an ``error`` payload. Returning an
            empty list here would make the caller believe no albums exist
            and create all of them again.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{VK_API_HOST}/market.getAlbums",
            params=_vk_params(token, owner_id=owner_id, count=200),
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
    if "error" in data:
        raise RuntimeError(f"VK get albums error: {data['error']}")
    return data.get("response", {}).get("items", [])
async def _vk_create_album(token: str, owner_id: str, title: str) -> int | None:
    """Create a market album titled *title* for *owner_id*.

    Returns the ``market_album_id`` from the VK response, or ``None`` when VK
    reports an error (which is logged) or the id is absent.
    """
    payload = _vk_params(token, owner_id=owner_id, title=title)
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{VK_API_HOST}/market.addAlbum", data=payload, timeout=30
        )
        resp.raise_for_status()
        body = resp.json()
    if "error" not in body:
        return body.get("response", {}).get("market_album_id")
    logger.error("VK create album error: %s", body["error"])
    return None
async def _vk_get_products(token: str, owner_id: str) -> list[dict]:
    """Fetch all VK market items of *owner_id*, disabled ones included.

    Pages through ``market.get`` 200 items at a time until a short page is
    returned.

    Raises:
        httpx.HTTPStatusError: an HTTP request failed.
        RuntimeError: VK answered with an ``error`` payload. Treating that as
            an empty market would make the caller re-create every product as
            a duplicate.
    """
    items: list[dict] = []
    offset = 0
    count = 200
    async with httpx.AsyncClient() as client:
        while True:
            resp = await client.get(
                f"{VK_API_HOST}/market.get",
                params=_vk_params(token, owner_id=owner_id, extended=1,
                                  with_disabled=1, count=count, offset=offset),
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()
            if "error" in data:
                raise RuntimeError(f"VK get products error: {data['error']}")
            batch = data.get("response", {}).get("items", [])
            items.extend(batch)
            # A page shorter than requested is the last one.
            if len(batch) < count:
                break
            offset += count
    return items
async def _vk_upload_photo(token: str, group_id: str, photo_path: str) -> str | None:
    """Upload the file at *photo_path* as a market product photo.

    Three steps: ask VK for an upload URL, POST the file to it, then hand the
    upload server's raw response text to ``market.saveProductPhoto``.

    Returns:
        The ``photo_id`` from the save response, or ``None`` when VK reports
        an error (logged) or no upload URL / photo id is returned.

    Raises:
        httpx.HTTPStatusError: one of the HTTP requests failed.
        OSError: *photo_path* cannot be opened.
    """
    async with httpx.AsyncClient() as client:
        # Get upload URL
        resp = await client.get(
            f"{VK_API_HOST}/market.getProductPhotoUploadServer",
            params=_vk_params(token, group_id=group_id),
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        if "error" in data:
            logger.error("VK get upload URL error: %s", data["error"])
            return None
        upload_url = data.get("response", {}).get("upload_url")
        if not upload_url:
            logger.error("VK get upload URL error: no upload_url in response")
            return None

        # Upload photo
        with open(photo_path, "rb") as f:
            upload_resp = await client.post(upload_url, files={"file": f}, timeout=60)
        upload_resp.raise_for_status()

        # Save photo: VK expects the upload server's response verbatim, so the
        # body is forwarded as text without being parsed here.
        save_resp = await client.post(
            f"{VK_API_HOST}/market.saveProductPhoto",
            data=_vk_params(token, upload_response=upload_resp.text),
            timeout=30,
        )
        save_resp.raise_for_status()
        save_data = save_resp.json()
    if "error" in save_data:
        logger.error("VK save photo error: %s", save_data["error"])
        return None
    return save_data.get("response", {}).get("photo_id")
async def _vk_create_product(
    token: str, owner_id: str, name: str, description: str,
    price: int, stock_amount: int, photo_id: str, album_id: int | None,
) -> int | None:
    """Create a VK market item and, if *album_id* is given, add it to that album.

    Returns:
        The ``market_item_id`` of the new item, or ``None`` when VK reports an
        error for ``market.add`` (logged) or returns no id.

    Raises:
        httpx.HTTPStatusError: the ``market.add`` request failed.

    A failure of the follow-up ``market.addToAlbum`` call is only logged: the
    item already exists at that point, so its id is still returned.
    """
    params = _vk_params(
        token,
        owner_id=owner_id,
        name=name,
        description=description,
        category_id=VK_CATEGORY_ID,
        price=price,
        main_photo_id=photo_id,
        stock_amount=stock_amount,
    )
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{VK_API_HOST}/market.add", data=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if "error" in data:
            logger.error("VK create product error: %s", data["error"])
            return None
        product_id = data.get("response", {}).get("market_item_id")
        if product_id and album_id:
            try:
                album_resp = await client.get(
                    f"{VK_API_HOST}/market.addToAlbum",
                    params=_vk_params(token, owner_id=owner_id,
                                      item_ids=product_id, album_ids=album_id),
                    timeout=30,
                )
                album_resp.raise_for_status()
                album_data = album_resp.json()
            except (httpx.HTTPError, ValueError) as exc:
                # ValueError covers a non-JSON body.
                logger.error("VK add to album error: %s", exc)
            else:
                if "error" in album_data:
                    logger.error("VK add to album error: %s", album_data["error"])
    return product_id
async def _vk_edit_product(
    token: str, owner_id: str, item_id: int, name: str,
    description: str, price: int, stock_amount: int,
) -> None:
    """Overwrite name, description, price and stock of an existing VK market item.

    An ``error`` payload from VK is logged, not raised; a failing HTTP status
    raises ``httpx.HTTPStatusError``.
    """
    fields = {
        "owner_id": owner_id,
        "item_id": item_id,
        "name": name,
        "description": description,
        "category_id": VK_CATEGORY_ID,
        "price": price,
        "stock_amount": stock_amount,
    }
    payload = _vk_params(token, **fields)
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{VK_API_HOST}/market.edit", data=payload, timeout=30)
        resp.raise_for_status()
        body = resp.json()
    if "error" not in body:
        return
    logger.error("VK edit product error: %s", body["error"])
async def _vk_delete_product(token: str, owner_id: str, item_id: int) -> None:
    """Delete one VK market item.

    An ``error`` payload from VK is logged (as the create/edit helpers do)
    rather than raised, so one failed deletion does not abort the rest of a
    sync. A failing HTTP status raises ``httpx.HTTPStatusError``.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{VK_API_HOST}/market.delete",
            data=_vk_params(token, owner_id=owner_id, item_id=item_id),
            timeout=30,
        )
        resp.raise_for_status()
        try:
            data = resp.json()
        except ValueError:
            # Non-JSON body on a 2xx response: nothing to inspect.
            return
    if isinstance(data, dict) and "error" in data:
        logger.error("VK delete product error: %s", data["error"])
# ---------------------------------------------------------------------------
# Main sync logic per user
# ---------------------------------------------------------------------------
def _stamp_synced(db: Session, user_id: int, evo_id: str, now: datetime) -> None:
    """Set ``synced_at`` to *now* on the user's cached rows for one Evotor product, then commit."""
    rows = (
        db.query(CachedProduct)
        .filter(CachedProduct.user_id == user_id)
        .filter(CachedProduct.evotor_id == evo_id)
    )
    rows.update({"synced_at": now})
    db.commit()
async def sync_user(
    user_id: int,
    evo_token: str,
    vk_token: str,
    vk_group_id: str,
    filters: list,
    photo_path: str,
    db: Session,
) -> None:
    """Synchronise one user's Evotor catalogue into their VK market.

    Steps:
      1. Load groups and products from every store included by *filters*,
         keeping only products that pass the group and product filters.
      2. Load the current VK items and albums; create a VK album for each
         included Evotor group that has no album with the same title.
      3. Match Evotor products to VK items by normalised name. Existing items
         are edited when price, description or stock differ; missing ones are
         created (only if they may be sold) with the photo at *photo_path*.
      4. Delete VK items whose name is no longer in Evotor, and duplicate VK
         items sharing a name (the first one is kept).

    Args:
        user_id: Owner of the connections; used for logging and cache stamps.
        evo_token: Evotor access token.
        vk_token: VK access token.
        vk_group_id: VK community id; the market owner id is its negation.
        filters: Filter rows exposing ``entity_type``, ``entity_id`` and
            ``filter_mode``.
        photo_path: Image file uploaded as the main photo of new VK items.
        db: Session used to stamp ``CachedProduct.synced_at``.

    Raises:
        Whatever the Evotor/VK helpers or the database session raise; nothing
        is caught here.
    """
    owner_id = f"-{vk_group_id}"
    now = datetime.utcnow()
    logger.info("Sync start: user_id=%d vk_group=%s", user_id, vk_group_id)

    store_ids = _get_included_store_ids(filters)
    if not store_ids:
        logger.info("Sync skip: user_id=%d — no stores included in filters", user_id)
        return

    # Collect all Evotor products and groups across included stores
    evo_products: list[dict] = []
    groups_by_id: dict[str, dict] = {}
    for store_id in store_ids:
        for g in await _evo_fetch_groups(evo_token, store_id):
            gid = g.get("uuid") or g.get("id")
            if gid:
                groups_by_id[gid] = g
        for p in await _evo_fetch_products(evo_token, store_id):
            pid = p.get("uuid") or p.get("id")
            gid = p.get("parentUuid") or p.get("parent_id")
            if _is_group_included(gid, filters) and _is_product_included(pid, filters):
                evo_products.append(p)

    # Build evo product lookup by normalized name: {normalized_name: product_dict}.
    # Products sharing a name collapse to the last one seen.
    evo_by_name: dict[str, dict] = {}
    for p in evo_products:
        norm = _normalize_name((p.get("name") or "").strip())
        if not norm:
            # A nameless product cannot be matched by name and would be pushed
            # to VK with an empty title, so it is left out of the sync.
            logger.warning(
                "Skipping Evotor product without a name: id=%s user_id=%d",
                p.get("uuid") or p.get("id"), user_id,
            )
            continue
        evo_by_name[norm] = p

    # Fetch VK state
    vk_products = await _vk_get_products(vk_token, owner_id)
    vk_albums = await _vk_get_albums(vk_token, owner_id)
    vk_album_by_title: dict[str, dict] = {a["title"]: a for a in vk_albums}

    # Ensure albums exist for all included groups
    for gid, group in groups_by_id.items():
        if not _is_group_included(gid, filters):
            continue
        title = group.get("name", "")
        if title and title not in vk_album_by_title:
            new_album_id = await _vk_create_album(vk_token, owner_id, title)
            if new_album_id:
                vk_album_by_title[title] = {"id": new_album_id, "title": title}
                logger.info("Created VK album '%s' for user_id=%d", title, user_id)

    # Build VK product lookup by normalized name: {normalized_name: [vk_item, ...]}
    vk_by_name: dict[str, list[dict]] = {}
    for item in vk_products:
        norm = _normalize_name(item.get("title", ""))
        vk_by_name.setdefault(norm, []).append(item)

    # --- UPDATE / CREATE ---
    for norm_name, evo_p in evo_by_name.items():
        evo_id = evo_p.get("uuid") or evo_p.get("id")
        raw_name = (evo_p.get("name") or "").strip()
        # The lookup key is already the normalised name sent to VK.
        name_for_vk = norm_name
        measure = evo_p.get("measureName") or evo_p.get("measure_name")
        raw_price = evo_p.get("price") or 0
        price, price_info = _calc_price(raw_price, measure)
        # Prefer the camelCase key; fall back to snake_case only when it is absent/None.
        allow_to_sell = evo_p.get("allowToSell")
        if allow_to_sell is None:
            allow_to_sell = evo_p.get("allow_to_sell")
        stock_amount = VK_STOCK_AMOUNT if allow_to_sell else 0
        extra_desc = evo_p.get("description") or ""
        description = _build_description(raw_name, price_info, extra_desc).strip()
        gid = evo_p.get("parentUuid") or evo_p.get("parent_id")
        group_name = groups_by_id.get(gid, {}).get("name") if gid else None
        album = vk_album_by_title.get(group_name) if group_name else None
        album_id = album["id"] if album else None

        if norm_name in vk_by_name:
            # Update existing (use first match)
            vk_item = vk_by_name[norm_name][0]
            vk_id = vk_item["id"]
            orig_price = vk_item.get("price", {}).get("amount", 0)
            orig_price_int = int(orig_price) if orig_price else 0
            orig_desc = (vk_item.get("description") or "").strip()
            orig_stock = vk_item.get("stock_amount", 0)
            price_changed = price != orig_price_int
            desc_changed = description != orig_desc
            stock_changed = stock_amount != orig_stock
            if price_changed or desc_changed or stock_changed:
                logger.info(
                    "Updating VK product '%s' user_id=%d (price=%s desc=%s stock=%s)",
                    name_for_vk, user_id, price_changed, desc_changed, stock_changed,
                )
                await _vk_edit_product(
                    vk_token, owner_id, vk_id, name_for_vk, description, price, stock_amount,
                )
            _stamp_synced(db, user_id, evo_id, now)
        else:
            # Create new (only if allow_to_sell)
            if not allow_to_sell:
                continue
            photo_id = await _vk_upload_photo(vk_token, vk_group_id, photo_path)
            if not photo_id:
                logger.error("Skipping product '%s' — photo upload failed", name_for_vk)
                continue
            logger.info("Creating VK product '%s' user_id=%d", name_for_vk, user_id)
            created = await _vk_create_product(
                vk_token, owner_id, name_for_vk, description,
                price, stock_amount, photo_id, album_id,
            )
            if created:
                _stamp_synced(db, user_id, evo_id, now)

    # --- DELETE products in VK that are no longer in Evo ---
    # NOTE(review): when the Evotor side yields no products at all, this pass
    # removes every VK item — confirm that is the intended outcome.
    for norm_name, vk_items in vk_by_name.items():
        if norm_name in evo_by_name:
            # Delete duplicates (keep first)
            for dup in vk_items[1:]:
                logger.info("Deleting duplicate VK product '%s' id=%d user_id=%d",
                            norm_name, dup["id"], user_id)
                await _vk_delete_product(vk_token, owner_id, dup["id"])
        else:
            # Delete all — product removed from Evo
            for item in vk_items:
                logger.info("Deleting removed product '%s' id=%d user_id=%d",
                            norm_name, item["id"], user_id)
                await _vk_delete_product(vk_token, owner_id, item["id"])

    logger.info("Sync complete: user_id=%d", user_id)
# ---------------------------------------------------------------------------
# Background loop
# ---------------------------------------------------------------------------
async def run_sync() -> None:
    """Run one sync pass over every enabled and confirmed ``SyncConfig``.

    Users lacking an Evotor or VK connection, an access token on either, or a
    ``vk_user_id`` are skipped. A failure while syncing one user is logged and
    does not stop the pass; the session is rolled back so that a failed
    flush/commit does not leave it unusable for the remaining users.
    """
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from web.config import settings

    db = SessionLocal()
    try:
        configs = db.query(SyncConfig).filter(
            SyncConfig.is_enabled == True,  # noqa: E712 - SQL expression, not a Python comparison
            SyncConfig.confirmed_at != None,  # noqa: E711 - SQL expression, not a Python comparison
        ).all()
        for config in configs:
            user_id = config.user_id
            evo = db.query(EvotorConnection).filter(
                EvotorConnection.user_id == user_id
            ).first()
            vk = db.query(VkConnection).filter(
                VkConnection.user_id == user_id
            ).first()
            if not evo or not vk:
                continue
            if not evo.access_token or not vk.access_token:
                continue
            if not vk.vk_user_id:
                continue
            try:
                await sync_user(
                    user_id=user_id,
                    evo_token=evo.access_token,
                    vk_token=vk.access_token,
                    # NOTE(review): vk_user_id is passed as the VK group id — confirm
                    # the column really holds the community id.
                    vk_group_id=vk.vk_user_id,
                    filters=config.filters,
                    photo_path=settings.VK_DEFAULT_PHOTO_PATH,
                    db=db,
                )
            except Exception:
                logger.exception("Sync failed for user_id=%d", user_id)
                # Reset the session: after a failed flush/commit it refuses
                # further work until rolled back.
                db.rollback()
    except Exception:
        logger.exception("Error in sync runner")
        db.rollback()
    finally:
        db.close()
async def sync_loop(interval: int) -> None:
    """Run ``run_sync`` forever, sleeping *interval* seconds between passes.

    An exception escaping ``run_sync`` is logged and the loop carries on, so a
    single bad pass cannot silently end the background task. Task cancellation
    (``asyncio.CancelledError``) is not an ``Exception`` subclass and still
    stops the loop.
    """
    while True:
        try:
            await run_sync()
        except Exception:
            logger.exception("Unexpected error in sync loop")
        await asyncio.sleep(interval)