410 lines
15 KiB
Python
410 lines
15 KiB
Python
|
|
"""
|
|||
|
|
Mirror Evotor product catalog → VK Market.
|
|||
|
|
|
|||
|
|
Runs after both refresh_catalog and refresh_vk_catalog complete.
|
|||
|
|
Only processes stores and groups that have sync enabled (via SyncFilter).
|
|||
|
|
Updates VK products only when at least one synced field has changed.
|
|||
|
|
"""
|
|||
|
|
import logging
|
|||
|
|
import os
|
|||
|
|
from datetime import datetime, timezone
|
|||
|
|
from decimal import Decimal
|
|||
|
|
|
|||
|
|
import httpx
|
|||
|
|
from celery import shared_task
|
|||
|
|
|
|||
|
|
from web.config import settings
|
|||
|
|
from web.database import SessionLocal
|
|||
|
|
from web.models.connections import (
|
|||
|
|
CachedGroup, CachedProduct, CachedStore,
|
|||
|
|
SyncConfig, SyncFilter,
|
|||
|
|
VkCachedAlbum, VkConnection,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
# Base URL for VK API calls; _vk_post appends "/<method>" to it.
VK_API = "https://api.vk.com/method"
|
|||
|
|
|
|||
|
|
# Measure names (spellings/abbreviations of "gram") that mark a product as
# sold by weight. _is_weight strips and lower-cases its input before testing
# membership, so every entry here must already be lower-case.
WEIGHT_MEASURES = {"г", "г.", "грамм", "граммов", "гр", "гр."}
|
|||
|
|
|
|||
|
|
# user_id → uploaded photo_id for this run (cleared at the start of
# mirror_to_vk). The value is None when the upload attempt failed, because
# _get_photo_id caches whatever _upload_photo returned.
_PHOTO_CACHE: dict[int, str | None] = {}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _now() -> datetime:
    """Return the current UTC time as a naive datetime (tzinfo removed)."""
    aware_utc = datetime.now(tz=timezone.utc)
    return aware_utc.replace(tzinfo=None)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _is_weight(measure: str | None) -> bool:
    """Tell whether *measure* names a weight unit listed in WEIGHT_MEASURES.

    The comparison ignores surrounding whitespace and letter case; ``None``
    and the empty string are never weight measures.
    """
    normalized = measure.strip().lower() if measure else ""
    return normalized in WEIGHT_MEASURES
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _calc_price(price: Decimal | None, measure: str | None) -> int:
|
|||
|
|
"""Return price in VK kopecks (integer). Weight items are multiplied."""
|
|||
|
|
if price is None:
|
|||
|
|
return 0
|
|||
|
|
base = int(price)
|
|||
|
|
if _is_weight(measure):
|
|||
|
|
base *= settings.VK_WEIGHT_PRICE_MULTIPLIER
|
|||
|
|
return base * 100 # kopecks
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _build_description(name: str, measure: str | None, evo_description: str | None) -> str:
    """Compose the VK product description.

    The text starts with ``"<name> (цена за <unit>.)"`` followed by a blank
    line, where ``<unit>`` is the stripped measure, prefixed with
    ``settings.VK_WEIGHT_PRICE_MULTIPLIER`` for weight items. A non-empty
    ``evo_description`` is appended after the blank line.
    """
    unit = (measure or "").strip()
    if _is_weight(measure):
        unit = f"{settings.VK_WEIGHT_PRICE_MULTIPLIER}{unit}"
    header = f"{name} (цена за {unit}.)\n\n"
    return (header + evo_description) if evo_description else header
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _name_for_vk(name: str) -> str:
    """Return *name* with every semicolon replaced by a comma."""
    return ",".join(name.split(";"))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _vk_post(method: str, data: dict, token: str) -> dict:
    """POST a VK API method call and return the decoded JSON body.

    ``access_token`` and the API version (``settings.VK_API_VERSION``) are
    added to a copy of *data*; the caller's dict is not modified. Raises
    whatever ``raise_for_status`` raises on a non-success HTTP status.
    The returned JSON is not inspected here — callers check for an
    ``"error"`` key themselves.
    """
    payload = dict(data)
    payload["access_token"] = token
    payload["v"] = settings.VK_API_VERSION
    response = httpx.post(f"{VK_API}/{method}", data=payload, timeout=30)
    response.raise_for_status()
    return response.json()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _upload_photo(token: str, group_id: str) -> str | None:
    """Upload the default product photo to VK and return its photo id.

    The file at ``settings.VK_DEFAULT_PHOTO_PATH`` is sent in three steps:
    request an upload URL, POST the file to it, then register the upload
    with ``market.saveProductPhoto``. Every failure (missing file, VK error
    payload, any exception) is logged as a warning and yields ``None``.
    """
    default_photo = settings.VK_DEFAULT_PHOTO_PATH
    if not os.path.exists(default_photo):
        logger.warning("Default photo not found at %s", default_photo)
        return None

    try:
        # 1. Ask VK where the file should be uploaded.
        server = _vk_post("market.getProductPhotoUploadServer", {"group_id": group_id}, token)
        if "error" in server:
            logger.warning("getProductPhotoUploadServer error: %s", server["error"])
            return None
        target_url = server["response"]["upload_url"]

        # 2. Send the file to that URL.
        with open(default_photo, "rb") as photo_file:
            upload = httpx.post(target_url, files={"file": photo_file}, timeout=30)
        upload.raise_for_status()

        # 3. Register the raw upload response to obtain the photo id.
        saved = _vk_post("market.saveProductPhoto", {"upload_response": upload.text}, token)
        if "error" in saved:
            logger.warning("saveProductPhoto error: %s", saved["error"])
            return None
        return str(saved["response"]["photo_id"])
    except Exception as exc:
        logger.warning("Photo upload failed: %s", exc)
        return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _get_photo_id(user_id: int, token: str, group_id: str) -> str | None:
    """Return the cached photo id for *user_id*, uploading it on first use.

    The result of the first upload attempt — including ``None`` on failure —
    is stored in ``_PHOTO_CACHE``, so at most one upload is attempted per
    user until the cache is cleared.
    """
    try:
        return _PHOTO_CACHE[user_id]
    except KeyError:
        photo_id = _PHOTO_CACHE[user_id] = _upload_photo(token, group_id)
        return photo_id
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _enabled_store_ids(db, user_id: int) -> set[str] | None:
    """Return the store ids explicitly included for sync, or ``None``.

    ``None`` is returned when the user has no SyncConfig or when no
    "include" filter of entity type "store" exists; ``_sync_user`` treats
    ``None`` as "do not restrict by store".
    """
    config = db.query(SyncConfig).filter_by(user_id=user_id).first()
    if not config:
        return None

    include_rows = (
        db.query(SyncFilter)
        .filter_by(sync_config_id=config.id, entity_type="store", filter_mode="include")
        .all()
    )
    if not include_rows:
        return None
    return {row.entity_id for row in include_rows}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _enabled_group_ids(db, user_id: int, store_evotor_id: str) -> set[str] | None:
    """Return the group ids explicitly included for sync within one store.

    Only "include" filters of entity type "group" whose parent entity is
    *store_evotor_id* are considered. ``None`` is returned when the user has
    no SyncConfig or no such filter exists; ``_sync_user`` treats ``None``
    as "do not restrict by group".
    """
    config = db.query(SyncConfig).filter_by(user_id=user_id).first()
    if not config:
        return None

    include_rows = (
        db.query(SyncFilter)
        .filter_by(
            sync_config_id=config.id,
            entity_type="group",
            filter_mode="include",
            parent_entity_id=store_evotor_id,
        )
        .all()
    )
    if not include_rows:
        return None
    return {row.entity_id for row in include_rows}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ensure_album(db, user_id: int, vk_group_id: str, group_name: str, token: str) -> str | None:
    """Return the VK album id titled *group_name*, creating the album if needed.

    The local VkCachedAlbum table is consulted first. When no cached row
    matches, the album is created through ``market.addAlbum`` and a cache
    row is added and flushed (not committed). Returns ``None`` when VK
    answers with an error payload.
    """
    cached = (
        db.query(VkCachedAlbum)
        .filter_by(user_id=user_id, vk_group_id=vk_group_id)
        .filter(VkCachedAlbum.title == group_name)
        .first()
    )
    if cached:
        return cached.album_id

    # No cached album with this title: create it in VK.
    payload = {"owner_id": f"-{vk_group_id}", "title": group_name}
    created = _vk_post("market.addAlbum", payload, token)
    if "error" in created:
        logger.warning("market.addAlbum error for '%s': %s", group_name, created["error"])
        return None

    new_album_id = str(created["response"]["market_album_id"])
    record = VkCachedAlbum(
        user_id=user_id,
        vk_group_id=vk_group_id,
        album_id=new_album_id,
        title=group_name,
        fetched_at=_now(),
    )
    db.add(record)
    db.flush()
    logger.info("user=%s created VK album '%s' id=%s", user_id, group_name, new_album_id)
    return new_album_id
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _sync_product(
    db,
    user_id: int,
    product: CachedProduct,
    album_id: str,
    vk_group_id: str,
    token: str,
) -> None:
    """Create or update the VK Market item that mirrors one cached product.

    * No ``vk_product_id`` yet: the item is created (only when the product is
      allowed to be sold and a default photo id is available), the new id is
      stored on *product*, and the item is added to *album_id*.
    * ``vk_product_id`` present: the desired name, price, description and
      stock are compared with the cached VK copy; ``market.edit`` is called
      only when something differs or the cached copy is missing.

    ``product.synced_at`` is set only after a successful add/edit. VK error
    payloads are logged as warnings and leave *product* untouched.
    """
    name = _name_for_vk(product.name)
    price_kopecks = _calc_price(product.price, product.measure_name)
    desc = _build_description(product.name, product.measure_name, None)
    stock = settings.VK_STOCK_AMOUNT if product.allow_to_sell else 0
    owner_id = f"-{vk_group_id}"
    now = _now()

    if not product.vk_product_id:
        # Create path — only for products that may be sold.
        if not product.allow_to_sell:
            return

        photo_id = _get_photo_id(user_id, token, vk_group_id)
        if not photo_id:
            logger.warning("user=%s skipping create for '%s': no photo", user_id, name)
            return

        added = _vk_post("market.add", {
            "owner_id": owner_id,
            "name": name,
            "description": desc,
            "category_id": settings.VK_CATEGORY_ID,
            "price": price_kopecks,
            "main_photo_id": photo_id,
            "stock_amount": stock,
        }, token)
        if "error" in added:
            logger.warning("market.add error product=%s: %s", product.evotor_id, added["error"])
            return

        vk_item_id = str(added["response"]["market_item_id"])
        product.vk_product_id = vk_item_id
        product.synced_at = now

        # Attach the new item to its album; a failure here is only logged.
        attached = _vk_post("market.addToAlbum", {
            "owner_id": owner_id,
            "item_ids": vk_item_id,
            "album_ids": album_id,
        }, token)
        if "error" in attached:
            logger.warning("market.addToAlbum error product=%s: %s", product.evotor_id, attached["error"])

        logger.info("user=%s created VK product '%s' id=%s", user_id, name, vk_item_id)
        return

    # Update path — compare against the cached VK state first.
    from web.models.connections import VkCachedProduct

    cached = db.query(VkCachedProduct).filter_by(
        user_id=user_id, vk_group_id=vk_group_id, vk_product_id=product.vk_product_id,
    ).first()
    if cached:
        remote_state = (
            cached.name or "",
            int(cached.price * 100) if cached.price is not None else 0,
            (cached.description or "").strip(),
            settings.VK_STOCK_AMOUNT if cached.availability == 0 else 0,
        )
        local_state = (name, price_kopecks, desc.strip(), stock)
        if local_state == remote_state:
            return  # nothing changed, no API call needed
    # A missing cache row falls through: the update is pushed regardless.

    edited = _vk_post("market.edit", {
        "owner_id": owner_id,
        "item_id": product.vk_product_id,
        "name": name,
        "description": desc.strip(),
        "category_id": settings.VK_CATEGORY_ID,
        "price": price_kopecks,
        "stock_amount": stock,
    }, token)
    if "error" in edited:
        logger.warning("market.edit error product=%s: %s", product.evotor_id, edited["error"])
        return
    product.synced_at = now
    logger.info("user=%s updated VK product '%s' id=%s", user_id, name, product.vk_product_id)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _sync_product_list(db, user_id, products, album_id, vk_group_id, token, results, owned_ids):
    """Sync each product to VK and tally the outcome in *results*.

    Args:
        db: Database session passed through to ``_sync_product``.
        user_id: Owner of the products.
        products: Iterable of cached products to mirror.
        album_id: VK album newly created items are added to.
        vk_group_id: VK group that owns the market.
        token: VK access token.
        results: Counter dict; the "created", "updated", "skipped" and
            "errors" keys are incremented in place.
        owned_ids: Set collecting the VK ids of every processed product
            that has one; mutated in place.

    A product counts as created/updated only when ``synced_at`` actually
    changed during this call. ``synced_at`` persists from earlier runs, so a
    bare truthiness check would report every unchanged, previously synced
    product as "updated".

    A product's VK id is recorded in *owned_ids* even when syncing it raised.
    Otherwise a transient API failure would leave the id unowned and the
    later orphan cleanup would delete a VK item that still has an Evotor
    counterpart.
    """
    for product in products:
        was_new = product.vk_product_id is None
        synced_before = product.synced_at
        try:
            _sync_product(db, user_id, product, album_id, vk_group_id, token)
            pushed = bool(product.synced_at) and product.synced_at != synced_before
            if pushed and not was_new:
                results["updated"] += 1
            elif pushed and product.vk_product_id:
                results["created"] += 1
            else:
                results["skipped"] += 1
        except Exception as e:
            logger.error("user=%s sync_product failed '%s': %s", user_id, product.name, e)
            results["errors"] += 1
        finally:
            # Runs on success and on failure: see the ownership note above.
            if product.vk_product_id:
                owned_ids.add(product.vk_product_id)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _delete_orphans(db, user_id, vk_group_id, owned_ids, token, results):
    """Delete VK items of this group that no synced Evotor product owns.

    Every cached VK product of (*user_id*, *vk_group_id*) whose id is not in
    *owned_ids* is removed through ``market.delete``. When *owned_ids* is
    empty, every cached VK product of the group is treated as an orphan.

    Args:
        db: Database session.
        user_id: Owner of the VK connection.
        vk_group_id: VK group that owns the market.
        owned_ids: VK product ids that must be kept.
        token: VK access token.
        results: Counter dict; "deleted" and "errors" are incremented.

    Only items whose deletion succeeded are removed from the local cache,
    and only their ids are cleared from ``CachedProduct.vk_product_id``. A
    failed deletion leaves both the cache row and the link intact, so the
    Evotor product still refers to an item that continues to exist in VK.
    """
    from web.models.connections import VkCachedProduct

    query = db.query(VkCachedProduct).filter_by(user_id=user_id, vk_group_id=vk_group_id)
    if owned_ids:
        query = query.filter(VkCachedProduct.vk_product_id.notin_(owned_ids))
    orphans = query.all()

    owner_id = f"-{vk_group_id}"
    deleted_vk_ids = set()
    for vk_p in orphans:
        try:
            resp = _vk_post("market.delete", {
                "owner_id": owner_id,
                "item_id": vk_p.vk_product_id,
            }, token)
            if "error" in resp:
                logger.warning("market.delete error id=%s: %s", vk_p.vk_product_id, resp["error"])
                results["errors"] += 1
            else:
                logger.info("user=%s deleted VK product id=%s '%s'", user_id, vk_p.vk_product_id, vk_p.name)
                db.delete(vk_p)
                deleted_vk_ids.add(vk_p.vk_product_id)
                results["deleted"] += 1
        except Exception as e:
            logger.error("user=%s delete failed id=%s: %s", user_id, vk_p.vk_product_id, e)
            results["errors"] += 1

    # Clear vk_product_id on cached products that pointed to items that were
    # actually deleted from VK.
    if deleted_vk_ids:
        stale = db.query(CachedProduct).filter(
            CachedProduct.user_id == user_id,
            CachedProduct.vk_product_id.in_(deleted_vk_ids),
        ).all()
        for p in stale:
            p.vk_product_id = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _sync_user(db, user_id: int, token: str, vk_group_id: str) -> dict:
    """Mirror one user's cached Evotor catalog into their VK market.

    For every store (restricted by store include-filters when any exist) and
    every group in it (restricted by group include-filters when any exist),
    the matching VK album is resolved and the group's products are synced.
    Products without a group go to the "Без категории" album. Afterwards VK
    items not owned by any synced product are deleted, and the session is
    committed.

    Orphan deletion is skipped for the whole run when any album could not be
    resolved. Products of such a group are never visited, so their existing
    VK items would be missing from ``owned_ids`` and a transient
    ``market.addAlbum`` failure would delete live items.

    Args:
        db: Database session; committed before returning.
        user_id: User whose catalog is mirrored.
        token: VK access token.
        vk_group_id: VK group that owns the market.

    Returns:
        Counters with the keys "created", "updated", "skipped", "deleted"
        and "errors".
    """
    results = {"created": 0, "updated": 0, "skipped": 0, "deleted": 0, "errors": 0}
    owned_ids: set[str] = set()  # VK product IDs that Evotor owns this run
    ownership_incomplete = False  # True once any album failed to resolve

    enabled_stores = _enabled_store_ids(db, user_id)

    stores = db.query(CachedStore).filter_by(user_id=user_id).all()
    for store in stores:
        if enabled_stores is not None and store.evotor_id not in enabled_stores:
            continue

        enabled_groups = _enabled_group_ids(db, user_id, store.evotor_id)

        groups = db.query(CachedGroup).filter_by(
            user_id=user_id, store_evotor_id=store.evotor_id,
        ).all()

        for group in groups:
            if enabled_groups is not None and group.evotor_id not in enabled_groups:
                continue

            try:
                album_id = _ensure_album(db, user_id, vk_group_id, group.name, token)
            except Exception as e:
                logger.error("user=%s ensure_album failed for '%s': %s", user_id, group.name, e)
                album_id = None
            if not album_id:
                results["errors"] += 1
                ownership_incomplete = True
                continue

            products = db.query(CachedProduct).filter_by(
                user_id=user_id, store_evotor_id=store.evotor_id, group_evotor_id=group.evotor_id,
            ).all()
            _sync_product_list(db, user_id, products, album_id, vk_group_id, token, results, owned_ids)

        # Ungrouped products → "Без категории" album
        ungrouped = db.query(CachedProduct).filter_by(
            user_id=user_id, store_evotor_id=store.evotor_id, group_evotor_id=None,
        ).all()
        if ungrouped:
            try:
                fallback_album_id = _ensure_album(db, user_id, vk_group_id, "Без категории", token)
            except Exception as e:
                logger.error("user=%s ensure_album failed for 'Без категории': %s", user_id, e)
                fallback_album_id = None
            if fallback_album_id:
                _sync_product_list(db, user_id, ungrouped, fallback_album_id, vk_group_id, token, results, owned_ids)
            else:
                # Counted like a per-group album failure.
                results["errors"] += 1
                ownership_incomplete = True

    if ownership_incomplete:
        logger.warning(
            "user=%s skipping orphan deletion: not every album could be resolved this run",
            user_id,
        )
    else:
        # Delete VK products not owned by any Evotor product
        _delete_orphans(db, user_id, vk_group_id, owned_ids, token, results)

    db.commit()
    return results
|
|||
|
|
|
|||
|
|
|
|||
|
|
@shared_task(
    name="web.tasks.vk_sync.mirror_to_vk",
    queue="sync",
    bind=True,
    max_retries=1,
    default_retry_delay=120,
)
def mirror_to_vk(self) -> dict:
    """Mirror the Evotor catalog to VK Market for every eligible user.

    A user is eligible when a VkConnection with a non-empty access token and
    ``vk_user_id`` exists and their SyncConfig is enabled. Users are
    processed one after another on a single session. A failure for one user
    is logged, rolled back and counted, and does not stop the others.

    Returns:
        ``{"ok": <users synced>, "failed": <users that raised>}``.
    """
    _PHOTO_CACHE.clear()
    db = SessionLocal()
    totals = {"ok": 0, "failed": 0}
    try:
        vk_connections = (
            db.query(VkConnection)
            .filter(
                VkConnection.user_id.isnot(None),
                VkConnection.access_token.isnot(None),
                VkConnection.access_token != "",
                VkConnection.vk_user_id.isnot(None),
                VkConnection.vk_user_id != "",
            )
            .all()
        )
        for conn in vk_connections:
            # Read the id up front so the error handler does not have to
            # touch the session to build its log message.
            user_id = conn.user_id
            cfg = db.query(SyncConfig).filter_by(user_id=user_id).first()
            if not cfg or not cfg.is_enabled:
                continue
            try:
                result = _sync_user(db, user_id, conn.access_token, conn.vk_user_id)
                logger.info("user=%s mirror_to_vk: %s", user_id, result)
                totals["ok"] += 1
            except Exception as exc:
                # Discard this user's uncommitted changes. Without the
                # rollback the shared session would carry them into the next
                # user's commit, or stay in a failed state.
                db.rollback()
                logger.error("mirror_to_vk failed for user=%s: %s", user_id, exc)
                totals["failed"] += 1
    finally:
        db.close()
    logger.info("mirror_to_vk done: %s", totals)
    return totals
|