Files
evo-sync/web/tasks/vk_sync.py

431 lines
16 KiB
Python
Raw Normal View History

"""
Mirror the Evotor product catalog to VK Market.
Runs after both refresh_catalog and refresh_vk_catalog complete.
Only processes stores and groups that have sync enabled (via SyncFilter).
Updates VK products only when at least one synced field has changed.
"""
import logging
import os
from datetime import datetime, timezone
from decimal import Decimal
from celery import shared_task
import web.lib.api_logger as api_logger
from web.config import settings
from web.database import SessionLocal
from web.models.connections import (
CachedGroup, CachedProduct, CachedStore,
SyncConfig, SyncFilter,
VkCachedAlbum, VkConnection,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Base URL; VK API method names are appended as "<VK_API>/<method>".
VK_API = "https://api.vk.com/method"
# Measure names (matched after strip + lowercase) that mark a product as priced by weight.
WEIGHT_MEASURES = {"г", "г.", "грамм", "граммов", "гр", "гр."}
# user_id → uploaded photo_id for this run; the value is None when the upload failed.
_PHOTO_CACHE: dict[int, str | None] = {}
def _now() -> datetime:
    """Return the current UTC time as a naive ``datetime`` (tzinfo removed)."""
    aware_utc = datetime.now(tz=timezone.utc)
    return aware_utc.replace(tzinfo=None)
def _is_weight(measure: str | None) -> bool:
    """Tell whether *measure* is one of the names listed in ``WEIGHT_MEASURES``.

    A missing or empty measure never matches; surrounding whitespace and
    letter case are ignored.
    """
    normalized = measure.strip().lower() if measure else ""
    return normalized in WEIGHT_MEASURES
def _calc_price(price: Decimal | None, measure: str | None) -> float:
    """Convert a cached price to the number sent to the VK Market API.

    Args:
        price: Product price, or ``None`` when unknown.
        measure: Measure name of the product.

    Returns:
        ``0.0`` for a missing price; otherwise the price as a float, multiplied
        by ``settings.VK_WEIGHT_PRICE_MULTIPLIER`` for weight-measured items.
    """
    if price is None:
        return 0.0
    rubles = float(price)
    if not _is_weight(measure):
        return rubles
    return rubles * settings.VK_WEIGHT_PRICE_MULTIPLIER
def _build_description(name: str, measure: str | None, evo_description: str | None) -> str:
    """Build the VK item description text.

    The text starts with ``"<name> (цена за <unit>.)"`` followed by a blank
    line, where ``<unit>`` is the stripped measure name, prefixed with
    ``settings.VK_WEIGHT_PRICE_MULTIPLIER`` for weight-measured items.
    ``evo_description`` is appended after the blank line when it is non-empty.
    """
    unit = (measure or "").strip()
    if _is_weight(measure):
        unit = f"{settings.VK_WEIGHT_PRICE_MULTIPLIER}{unit}"
    header = f"{name} (цена за {unit}.)\n\n"
    return (header + evo_description) if evo_description else header
def _name_for_vk(name: str) -> str:
    """Return *name* with every semicolon replaced by a comma."""
    return ",".join(name.split(";"))
def _vk_post(method: str, data: dict, token: str, user_id: int | None = None) -> dict:
    """POST a VK API method call and return the decoded JSON body.

    Args:
        method: VK method name, appended to ``VK_API``.
        data: Method parameters; the caller's dict is not modified.
        token: Access token sent as ``access_token``.
        user_id: Forwarded to ``api_logger.post``.

    Returns:
        The parsed JSON response. Callers check it for an ``"error"`` key.

    Raises:
        Whatever ``raise_for_status()`` raises for a non-success HTTP status.
    """
    payload = dict(data)
    payload["access_token"] = token
    payload["v"] = settings.VK_API_VERSION
    response = api_logger.post(f"{VK_API}/{method}", user_id=user_id, data=payload, timeout=30)
    response.raise_for_status()
    return response.json()
def _upload_photo(token: str, group_id: str, user_id: int | None = None) -> str | None:
    """Upload the default product photo to VK and return its photo id.

    The upload has three steps: request an upload URL, POST the file found at
    ``settings.VK_DEFAULT_PHOTO_PATH`` to it, then save the upload result.

    Returns:
        The photo id as a string, or ``None`` when the file is missing, VK
        answers with an error, or any exception occurs (all are logged as
        warnings, none are propagated).
    """
    default_photo = settings.VK_DEFAULT_PHOTO_PATH
    if not os.path.exists(default_photo):
        logger.warning("Default photo not found at %s", default_photo)
        return None

    try:
        # 1. Ask VK where the file should be uploaded.
        server = _vk_post(
            "market.getProductPhotoUploadServer", {"group_id": group_id}, token, user_id=user_id,
        )
        if "error" in server:
            logger.warning("getProductPhotoUploadServer error: %s", server["error"])
            return None
        target_url = server["response"]["upload_url"]

        # 2. Send the file itself.
        with open(default_photo, "rb") as photo_file:
            uploaded = api_logger.post(
                target_url, user_id=user_id, files={"file": photo_file}, timeout=30,
            )
        uploaded.raise_for_status()

        # 3. Hand the raw upload response text back to VK to obtain the photo id.
        saved = _vk_post(
            "market.saveProductPhoto", {"upload_response": uploaded.text}, token, user_id=user_id,
        )
        if "error" in saved:
            logger.warning("saveProductPhoto error: %s", saved["error"])
            return None
        return str(saved["response"]["photo_id"])
    except Exception as e:
        logger.warning("Photo upload failed: %s", e)
        return None
def _get_photo_id(user_id: int, token: str, group_id: str) -> str | None:
    """Return the default photo id for *user_id*, uploading it on first use.

    The outcome of the first upload attempt is stored in ``_PHOTO_CACHE``,
    including a ``None`` result, so a failed upload is not retried until the
    cache is cleared.
    """
    try:
        return _PHOTO_CACHE[user_id]
    except KeyError:
        photo_id = _upload_photo(token, group_id, user_id=user_id)
        _PHOTO_CACHE[user_id] = photo_id
        return photo_id
def _enabled_store_ids(db, user_id: int) -> set[str] | None:
    """Return the store ids listed by the user's "include" store filters.

    Returns:
        A set of ``entity_id`` values, or ``None`` when the user has no
        ``SyncConfig`` or no such filters. ``_sync_user`` treats ``None`` as
        "no store restriction".
    """
    config = db.query(SyncConfig).filter_by(user_id=user_id).first()
    if not config:
        return None
    store_filters = (
        db.query(SyncFilter)
        .filter_by(sync_config_id=config.id, entity_type="store", filter_mode="include")
        .all()
    )
    if not store_filters:
        return None
    return {row.entity_id for row in store_filters}
def _enabled_group_ids(db, user_id: int, store_evotor_id: str) -> set[str] | None:
    """Return the group ids listed by the user's "include" group filters for one store.

    Only filters whose ``parent_entity_id`` equals *store_evotor_id* are used.

    Returns:
        A set of ``entity_id`` values, or ``None`` when the user has no
        ``SyncConfig`` or no matching filters. ``_sync_user`` treats ``None``
        as "no group restriction".
    """
    config = db.query(SyncConfig).filter_by(user_id=user_id).first()
    if not config:
        return None
    group_filters = (
        db.query(SyncFilter)
        .filter_by(
            sync_config_id=config.id,
            entity_type="group",
            filter_mode="include",
            parent_entity_id=store_evotor_id,
        )
        .all()
    )
    if not group_filters:
        return None
    return {row.entity_id for row in group_filters}
def _ensure_album(db, user_id: int, vk_group_id: str, group_name: str, token: str) -> str | None:
    """Look up, or create, the VK album titled *group_name* and return its id.

    A cached ``VkCachedAlbum`` row with the same user, VK group and title is
    reused. Otherwise the album is created through ``market.addAlbum`` and a
    new cache row is added and flushed (not committed).

    Returns:
        The album id, or ``None`` when VK answers ``market.addAlbum`` with an error.
    """
    cached = (
        db.query(VkCachedAlbum)
        .filter_by(user_id=user_id, vk_group_id=vk_group_id)
        .filter(VkCachedAlbum.title == group_name)
        .first()
    )
    if cached:
        return cached.album_id

    # No cached album with this title: ask VK to create one.
    created = _vk_post(
        "market.addAlbum",
        {"owner_id": f"-{vk_group_id}", "title": group_name},
        token,
        user_id=user_id,
    )
    if "error" in created:
        logger.warning("market.addAlbum error for '%s': %s", group_name, created["error"])
        return None

    new_album_id = str(created["response"]["market_album_id"])
    record = VkCachedAlbum(
        user_id=user_id,
        vk_group_id=vk_group_id,
        album_id=new_album_id,
        title=group_name,
        fetched_at=_now(),
    )
    db.add(record)
    db.flush()
    logger.info("user=%s created VK album '%s' id=%s", user_id, group_name, new_album_id)
    return new_album_id
def _sync_product(
    db,
    user_id: int,
    product: CachedProduct,
    album_id: str,
    vk_group_id: str,
    token: str,
) -> None:
    """Create or update the VK Market item that mirrors one cached Evotor product.

    Update path (``product.vk_product_id`` is set): the desired name, price,
    description, stock and album are compared with the cached VK product row.
    ``market.edit`` is called only when something differs or the cached row is
    missing. When the album differs the item is also moved to ``album_id``.

    Create path: an item is created only when ``product.allow_to_sell`` is
    true and a default photo id is available; the new item is then added to
    ``album_id``.

    On success ``product.synced_at`` (and, for new items,
    ``product.vk_product_id``) is set; nothing is committed here. VK errors
    returned in a response body are logged as warnings.

    Raises:
        Exception: anything raised by ``_vk_post`` or the database query.
    """
    name = _name_for_vk(product.name)
    price_rubles = _calc_price(product.price, product.measure_name)
    desc = _build_description(product.name, product.measure_name, None)
    stock = settings.VK_STOCK_AMOUNT if product.allow_to_sell else 0
    owner_id = f"-{vk_group_id}"
    now = _now()

    if product.vk_product_id:
        # Compare against the cached VK state to decide whether an edit is needed.
        from web.models.connections import VkCachedProduct

        vk_p = db.query(VkCachedProduct).filter_by(
            user_id=user_id, vk_group_id=vk_group_id, vk_product_id=product.vk_product_id,
        ).first()
        album_changed = False
        if vk_p:
            vk_price = float(vk_p.price) if vk_p.price is not None else 0.0
            # availability == 0 is treated as "in stock" on the VK side.
            vk_stock = settings.VK_STOCK_AMOUNT if vk_p.availability == 0 else 0
            vk_name = vk_p.name or ""
            vk_desc = (vk_p.description or "").strip()
            curr_desc = desc.strip()
            album_changed = str(vk_p.album_id) != str(album_id) if album_id else False
            changed = (
                name != vk_name
                or price_rubles != vk_price
                or curr_desc != vk_desc
                or stock != vk_stock
                or album_changed
            )
        else:
            changed = True  # cached VK product gone, push update
        if not changed:
            return

        resp = _vk_post("market.edit", {
            "owner_id": owner_id,
            "item_id": product.vk_product_id,
            "name": name,
            "description": desc.strip(),
            "category_id": settings.VK_CATEGORY_ID,
            "price": price_rubles,
            "stock_amount": stock,
        }, token, user_id=user_id)
        if "error" in resp:
            logger.warning("market.edit error product=%s: %s", product.evotor_id, resp["error"])
            return

        # album_changed can only be True when both vk_p and album_id are present.
        if album_changed:
            old_album_id = str(vk_p.album_id)
            # Only detach from a real previous album; a cached row without an
            # album must not produce a removeFromAlbum call for "None".
            if vk_p.album_id not in (None, ""):
                resp_remove = _vk_post("market.removeFromAlbum", {
                    "owner_id": owner_id,
                    "item_id": product.vk_product_id,
                    "album_ids": old_album_id,
                }, token, user_id=user_id)
                if "error" in resp_remove:
                    logger.warning(
                        "market.removeFromAlbum error product=%s: %s",
                        product.evotor_id, resp_remove["error"],
                    )
            resp_album = _vk_post("market.addToAlbum", {
                "owner_id": owner_id,
                "item_ids": product.vk_product_id,
                "album_ids": album_id,
            }, token, user_id=user_id)
            if "error" in resp_album:
                logger.warning("market.addToAlbum error product=%s: %s", product.evotor_id, resp_album["error"])
            else:
                logger.info(
                    "user=%s moved VK product '%s' album %s -> %s",
                    user_id, name, old_album_id, album_id,
                )
        product.synced_at = now
        logger.info("user=%s updated VK product '%s' id=%s", user_id, name, product.vk_product_id)
    else:
        # Create — only if allow_to_sell
        if not product.allow_to_sell:
            return
        photo_id = _get_photo_id(user_id, token, vk_group_id)
        if not photo_id:
            logger.warning("user=%s skipping create for '%s': no photo", user_id, name)
            return
        resp = _vk_post("market.add", {
            "owner_id": owner_id,
            "name": name,
            "description": desc,
            "category_id": settings.VK_CATEGORY_ID,
            "price": price_rubles,
            "main_photo_id": photo_id,
            "stock_amount": stock,
        }, token, user_id=user_id)
        if "error" in resp:
            logger.warning("market.add error product=%s: %s", product.evotor_id, resp["error"])
            return
        vk_item_id = str(resp["response"]["market_item_id"])
        # Record the link before the album call so it is kept even if that call fails.
        product.vk_product_id = vk_item_id
        product.synced_at = now
        # Add to album
        resp2 = _vk_post("market.addToAlbum", {
            "owner_id": owner_id,
            "item_ids": vk_item_id,
            "album_ids": album_id,
        }, token, user_id=user_id)
        if "error" in resp2:
            logger.warning("market.addToAlbum error product=%s: %s", product.evotor_id, resp2["error"])
        logger.info("user=%s created VK product '%s' id=%s", user_id, name, vk_item_id)
def _sync_product_list(db, user_id, products, album_id, vk_group_id, token, results, owned_ids):
    """Sync every product in *products* and record the outcome.

    Args:
        db: Database session passed through to ``_sync_product``.
        user_id: Owner of the products.
        products: Cached Evotor products to mirror.
        album_id: VK album the products belong to.
        vk_group_id: VK group that owns the market.
        token: VK access token.
        results: Counter dict; its ``created``, ``updated``, ``skipped`` and
            ``errors`` entries are incremented in place.
        owned_ids: Set that receives the VK item id of every product that has
            one after the attempt, so those items are not treated as orphans.

    A product counts as created or updated only when ``_sync_product`` changed
    its ``synced_at`` during this call; otherwise it counts as skipped.
    Exceptions from ``_sync_product`` are logged and counted, not propagated.
    """
    for product in products:
        # Same truthiness test _sync_product uses to choose the create path.
        was_new = not product.vk_product_id
        # A synced_at left over from an earlier run must not count as an update.
        synced_before = product.synced_at
        try:
            _sync_product(db, user_id, product, album_id, vk_group_id, token)
        except Exception as e:
            logger.error("user=%s sync_product failed '%s': %s", user_id, product.name, e)
            results["errors"] += 1
        else:
            if product.synced_at != synced_before:
                results["created" if was_new else "updated"] += 1
            else:
                results["skipped"] += 1
        # Register the VK item even after a failure: the product still owns it,
        # so a transient error must not get it deleted as an orphan.
        if product.vk_product_id:
            owned_ids.add(product.vk_product_id)
def _delete_orphans(db, user_id, vk_group_id, owned_ids, token, results):
    """Delete cached VK products of the group whose ids are not in *owned_ids*.

    When *owned_ids* is empty, every cached VK product of the user and group is
    treated as an orphan. Each orphan is removed with ``market.delete``; on
    success its cache row is deleted and ``results["deleted"]`` is incremented,
    otherwise ``results["errors"]`` is incremented. Afterwards the
    ``vk_product_id`` link is cleared on every cached Evotor product of the
    user that points at any orphan id, including orphans whose deletion failed.
    """
    from web.models.connections import VkCachedProduct

    orphan_query = db.query(VkCachedProduct).filter_by(user_id=user_id, vk_group_id=vk_group_id)
    if owned_ids:
        orphan_query = orphan_query.filter(VkCachedProduct.vk_product_id.notin_(owned_ids))
    orphans = orphan_query.all()

    owner_id = f"-{vk_group_id}"
    for cached in orphans:
        try:
            resp = _vk_post(
                "market.delete",
                {"owner_id": owner_id, "item_id": cached.vk_product_id},
                token,
                user_id=user_id,
            )
            if "error" not in resp:
                logger.info("user=%s deleted VK product id=%s '%s'", user_id, cached.vk_product_id, cached.name)
                db.delete(cached)
                results["deleted"] += 1
            else:
                logger.warning("market.delete error id=%s: %s", cached.vk_product_id, resp["error"])
                results["errors"] += 1
        except Exception as e:
            logger.error("user=%s delete failed id=%s: %s", user_id, cached.vk_product_id, e)
            results["errors"] += 1

    if not orphans:
        return

    # Drop the link from cached Evotor products that pointed at an orphan.
    orphan_ids = {cached.vk_product_id for cached in orphans}
    linked_products = db.query(CachedProduct).filter(
        CachedProduct.user_id == user_id,
        CachedProduct.vk_product_id.in_(orphan_ids),
    ).all()
    for linked in linked_products:
        linked.vk_product_id = None
def _sync_user(db, user_id: int, token: str, vk_group_id: str) -> dict:
    """Mirror one user's cached Evotor catalog into their VK Market.

    For every enabled store, each enabled group is mapped to a VK album (by
    group name) and its products are synced; products without a group go to
    the "Без категории" album. Finally, cached VK products that no processed
    Evotor product owns are deleted, and the session is committed.

    Args:
        db: Database session; committed once at the end.
        user_id: User whose catalog is mirrored.
        token: VK access token.
        vk_group_id: VK group that owns the market.

    Returns:
        Counters with keys ``created``, ``updated``, ``skipped``, ``deleted``
        and ``errors``.
    """
    results = {"created": 0, "updated": 0, "skipped": 0, "deleted": 0, "errors": 0}
    owned_ids: set[str] = set()  # VK product IDs that Evotor owns this run

    def keep_existing(products) -> None:
        # Products that could not be synced because their album is unavailable
        # still own their VK items; register them so they are not deleted as orphans.
        owned_ids.update(p.vk_product_id for p in products if p.vk_product_id)

    enabled_stores = _enabled_store_ids(db, user_id)
    stores = db.query(CachedStore).filter_by(user_id=user_id).all()
    for store in stores:
        if enabled_stores is not None and store.evotor_id not in enabled_stores:
            continue
        enabled_groups = _enabled_group_ids(db, user_id, store.evotor_id)
        groups = db.query(CachedGroup).filter_by(
            user_id=user_id, store_evotor_id=store.evotor_id,
        ).all()
        for group in groups:
            if enabled_groups is not None and group.evotor_id not in enabled_groups:
                continue
            # Load the products before touching the album so they can be
            # protected from orphan deletion when the album step fails.
            products = db.query(CachedProduct).filter_by(
                user_id=user_id, store_evotor_id=store.evotor_id, group_evotor_id=group.evotor_id,
            ).all()
            try:
                album_id = _ensure_album(db, user_id, vk_group_id, group.name, token)
            except Exception as e:
                logger.error("user=%s ensure_album failed for '%s': %s", user_id, group.name, e)
                album_id = None
            if not album_id:
                results["errors"] += 1
                keep_existing(products)
                continue
            _sync_product_list(db, user_id, products, album_id, vk_group_id, token, results, owned_ids)

        # Ungrouped products → "Без категории" album
        ungrouped = db.query(CachedProduct).filter_by(
            user_id=user_id, store_evotor_id=store.evotor_id, group_evotor_id=None,
        ).all()
        if ungrouped:
            try:
                fallback_album_id = _ensure_album(db, user_id, vk_group_id, "Без категории", token)
            except Exception as e:
                logger.error("user=%s ensure_album failed for 'Без категории': %s", user_id, e)
                fallback_album_id = None
            if fallback_album_id:
                _sync_product_list(db, user_id, ungrouped, fallback_album_id, vk_group_id, token, results, owned_ids)
            else:
                # Count the failure the same way as for regular groups.
                results["errors"] += 1
                keep_existing(ungrouped)

    # Delete VK products not owned by any Evotor product
    _delete_orphans(db, user_id, vk_group_id, owned_ids, token, results)
    db.commit()
    return results
@shared_task(
    name="web.tasks.vk_sync.mirror_to_vk",
    queue="sync",
    bind=True,
    max_retries=1,
    default_retry_delay=120,
)
def mirror_to_vk(self) -> dict:
    """Mirror Evotor catalog → VK Market for all users with both connections active.

    Every VK connection that has a user, a non-empty access token and a
    non-empty ``vk_user_id`` is processed, provided the user's ``SyncConfig``
    exists and is enabled. A failure for one user is logged and counted; it
    does not stop the remaining users.

    Returns:
        ``{"ok": <users synced>, "failed": <users that raised>}``.
    """
    _PHOTO_CACHE.clear()  # the photo cache is valid for a single run only
    db = SessionLocal()
    totals = {"ok": 0, "failed": 0}
    try:
        vk_connections = (
            db.query(VkConnection)
            .filter(
                VkConnection.user_id.isnot(None),
                VkConnection.access_token.isnot(None),
                VkConnection.access_token != "",
                VkConnection.vk_user_id.isnot(None),
                VkConnection.vk_user_id != "",
            )
            .all()
        )
        for conn in vk_connections:
            cfg = db.query(SyncConfig).filter_by(user_id=conn.user_id).first()
            if not cfg or not cfg.is_enabled:
                continue
            # Read the id up front: after a failed flush the session may have
            # expired ``conn``, and reading it in the handler could raise again.
            user_id = conn.user_id
            try:
                # NOTE(review): conn.vk_user_id is passed as the VK group id — confirm this is intended.
                result = _sync_user(db, user_id, conn.access_token, conn.vk_user_id)
                logger.info("user=%s mirror_to_vk: %s", user_id, result)
                totals["ok"] += 1
            except Exception as exc:
                # Discard this user's uncommitted changes and make the shared
                # session usable again before moving on to the next user.
                db.rollback()
                logger.error("mirror_to_vk failed for user=%s: %s", user_id, exc)
                totals["failed"] += 1
    finally:
        db.close()
    logger.info("mirror_to_vk done: %s", totals)
    return totals