Catalog, VK catalog, and VK sync tasks were querying all connections regardless of user role. Admin and system accounts with stored tokens were generating unnecessary Evotor and VK API calls. Now all three tasks join to the users table and filter role = 'user' only. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
191 lines
6.5 KiB
Python
191 lines
6.5 KiB
Python
"""
|
|
Periodic VK catalog sync: fetch albums and products from VK Market
|
|
for every connected user and upsert into vk_cached_* tables.
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from celery import shared_task
|
|
|
|
import web.lib.api_logger as api_logger
|
|
from web.config import settings
|
|
from web.database import SessionLocal
|
|
from web.models.connections import SyncConfig, VkCachedAlbum, VkCachedProduct, VkConnection
|
|
from web.models.user import User, UserRoleEnum
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
VK_API = "https://api.vk.com/method"
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.now(timezone.utc).replace(tzinfo=None)
|
|
|
|
|
|
def _vk_get(method: str, params: dict, token: str, user_id: int | None = None) -> dict:
|
|
params = {**params, "access_token": token, "v": settings.VK_API_VERSION}
|
|
r = api_logger.get(f"{VK_API}/{method}", user_id=user_id, params=params, timeout=20)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
|
|
def _sync_user(db, user_id: int, token: str, group_id: str) -> None:
|
|
now = _now()
|
|
owner_id = f"-{group_id}"
|
|
|
|
# ── albums ────────────────────────────────────────────────────────────────
|
|
try:
|
|
data = _vk_get("market.getAlbums", {"owner_id": owner_id, "count": 100}, token, user_id=user_id)
|
|
except Exception as e:
|
|
logger.warning("user=%s vk fetch albums failed: %s", user_id, e)
|
|
return
|
|
|
|
if "error" in data:
|
|
logger.warning("user=%s vk albums error: %s", user_id, data["error"])
|
|
return
|
|
|
|
albums = data.get("response", {}).get("items", [])
|
|
album_ids = []
|
|
for a in albums:
|
|
aid = str(a["id"])
|
|
album_ids.append(aid)
|
|
row = db.query(VkCachedAlbum).filter_by(user_id=user_id, vk_group_id=group_id, album_id=aid).first()
|
|
if row:
|
|
row.title = a.get("title", "")
|
|
row.count = a.get("count")
|
|
row.fetched_at = now
|
|
else:
|
|
db.add(VkCachedAlbum(
|
|
user_id=user_id,
|
|
vk_group_id=group_id,
|
|
album_id=aid,
|
|
title=a.get("title", ""),
|
|
count=a.get("count"),
|
|
fetched_at=now,
|
|
))
|
|
# Delete cached albums that no longer exist in VK
|
|
(
|
|
db.query(VkCachedAlbum)
|
|
.filter(
|
|
VkCachedAlbum.user_id == user_id,
|
|
VkCachedAlbum.vk_group_id == group_id,
|
|
VkCachedAlbum.album_id.notin_(album_ids),
|
|
)
|
|
.delete(synchronize_session=False)
|
|
)
|
|
db.flush()
|
|
|
|
# ── products (extended=1 gives albums_ids per product) ───────────────────
|
|
offset = 0
|
|
all_products = []
|
|
while True:
|
|
try:
|
|
data = _vk_get(
|
|
"market.get",
|
|
{"owner_id": owner_id, "count": 200, "offset": offset, "extended": 1},
|
|
token,
|
|
user_id=user_id,
|
|
)
|
|
except Exception as e:
|
|
logger.warning("user=%s vk fetch products (extended) failed: %s", user_id, e)
|
|
break
|
|
|
|
if "error" in data:
|
|
logger.warning("user=%s vk products (extended) error: %s", user_id, data["error"])
|
|
break
|
|
|
|
items = data.get("response", {}).get("items", [])
|
|
all_products.extend(items)
|
|
if len(items) < 200:
|
|
break
|
|
offset += 200
|
|
|
|
for p in all_products:
|
|
pid = str(p["id"])
|
|
album_id = str(p["albums_ids"][0]) if p.get("albums_ids") else None
|
|
price_field = p.get("price")
|
|
if isinstance(price_field, dict):
|
|
price = float(price_field.get("amount", 0)) / 100 if price_field.get("amount") is not None else None
|
|
else:
|
|
price = float(price_field) if price_field is not None else None
|
|
thumb = None
|
|
thumb_field = p.get("thumb_photo")
|
|
if isinstance(thumb_field, dict):
|
|
sizes = thumb_field.get("sizes", [])
|
|
if sizes:
|
|
thumb = sizes[-1].get("url")
|
|
elif isinstance(thumb_field, str):
|
|
thumb = thumb_field
|
|
|
|
row = db.query(VkCachedProduct).filter_by(
|
|
user_id=user_id, vk_group_id=group_id, vk_product_id=pid,
|
|
).first()
|
|
if row:
|
|
row.album_id = album_id
|
|
row.name = p.get("title", "")
|
|
row.description = p.get("description")
|
|
row.price = price
|
|
row.availability = p.get("availability")
|
|
row.thumb_url = thumb
|
|
row.fetched_at = now
|
|
else:
|
|
db.add(VkCachedProduct(
|
|
user_id=user_id,
|
|
vk_group_id=group_id,
|
|
vk_product_id=pid,
|
|
album_id=album_id,
|
|
name=p.get("title", ""),
|
|
description=p.get("description"),
|
|
price=price,
|
|
availability=p.get("availability"),
|
|
thumb_url=thumb,
|
|
fetched_at=now,
|
|
))
|
|
|
|
db.commit()
|
|
logger.info(
|
|
"user=%s vk catalog synced: group=%s albums=%d products=%d",
|
|
user_id, group_id, len(albums), len(all_products),
|
|
)
|
|
|
|
|
|
@shared_task(
|
|
name="web.tasks.vk_catalog.refresh_vk_catalog",
|
|
queue="default",
|
|
bind=True,
|
|
max_retries=2,
|
|
default_retry_delay=60,
|
|
)
|
|
def refresh_vk_catalog(self) -> dict:
|
|
"""Fetch and cache VK Market albums and products for all connected users."""
|
|
db = SessionLocal()
|
|
results = {"ok": 0, "failed": 0}
|
|
try:
|
|
connections = (
|
|
db.query(VkConnection)
|
|
.join(User, User.id == VkConnection.user_id)
|
|
.filter(
|
|
VkConnection.user_id.isnot(None),
|
|
VkConnection.access_token.isnot(None),
|
|
VkConnection.access_token != "",
|
|
VkConnection.vk_user_id.isnot(None),
|
|
VkConnection.vk_user_id != "",
|
|
User.role == UserRoleEnum.user,
|
|
)
|
|
.all()
|
|
)
|
|
for conn in connections:
|
|
cfg = db.query(SyncConfig).filter_by(user_id=conn.user_id).first()
|
|
if not cfg or not cfg.vk_mirror_enabled:
|
|
continue
|
|
try:
|
|
_sync_user(db, conn.user_id, conn.access_token, conn.vk_user_id)
|
|
results["ok"] += 1
|
|
except Exception as exc:
|
|
logger.error("vk catalog sync failed for user=%s: %s", conn.user_id, exc)
|
|
results["failed"] += 1
|
|
finally:
|
|
db.close()
|
|
logger.info("refresh_vk_catalog done: %s", results)
|
|
return results
|