- Add VK OAuth implicit flow: /vk-auth redirect, /vk-callback JS page, /vk-callback/save endpoint with state validation - Add VK_CLIENT_ID/VK_CLIENT_SECRET to config - Add refresh_token/token_expires_at columns to vk_connections (migration 0006) - Fix vk_catalog task: handle price/thumb_photo as string or dict (VK API v5.199) - Fix connections/vk/test: use groups.getById instead of market.getAlbums (works with both user and group tokens) - Add orphan deletion to mirror_to_vk: VK products not in Evotor are removed - Handle ungrouped Evotor products: push to "Без категории" VK album - Respect SyncConfig.is_enabled in mirror_to_vk - Add product count column to catalog groups page - Add group name column to catalog products page - Expand test suite: 73 new tests covering connections routes, catalog routes, vk_sync task logic, and catalog task helpers (138 total, all passing) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
134 lines
5.2 KiB
Python
134 lines
5.2 KiB
Python
"""Unit tests for catalog task helpers and refresh_catalog logic."""
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from web.tasks.catalog import _fetch_groups, _fetch_products, _fetch_stores
|
|
|
|
|
|
# ── _fetch_stores ─────────────────────────────────────────────────────────────
|
|
|
|
def test_fetch_stores_list_response():
|
|
mock = MagicMock()
|
|
mock.raise_for_status = MagicMock()
|
|
mock.json.return_value = [{"id": "s1", "name": "Магазин"}]
|
|
with patch("web.tasks.catalog.httpx.get", return_value=mock):
|
|
result = _fetch_stores("tok")
|
|
assert result == [{"id": "s1", "name": "Магазин"}]
|
|
|
|
|
|
def test_fetch_stores_dict_with_items():
|
|
mock = MagicMock()
|
|
mock.raise_for_status = MagicMock()
|
|
mock.json.return_value = {"items": [{"id": "s1"}], "total": 1}
|
|
with patch("web.tasks.catalog.httpx.get", return_value=mock):
|
|
result = _fetch_stores("tok")
|
|
assert result == [{"id": "s1"}]
|
|
|
|
|
|
# ── _fetch_groups ─────────────────────────────────────────────────────────────
|
|
|
|
def test_fetch_groups_success():
|
|
mock = MagicMock()
|
|
mock.status_code = 200
|
|
mock.raise_for_status = MagicMock()
|
|
mock.json.return_value = {"items": [{"id": "g1", "name": "Чай"}]}
|
|
with patch("web.tasks.catalog.httpx.get", return_value=mock):
|
|
result = _fetch_groups("tok", "s1")
|
|
assert result == [{"id": "g1", "name": "Чай"}]
|
|
|
|
|
|
@pytest.mark.parametrize("status_code", [402, 403])
|
|
def test_fetch_groups_returns_none_on_restricted(status_code):
|
|
mock = MagicMock()
|
|
mock.status_code = status_code
|
|
with patch("web.tasks.catalog.httpx.get", return_value=mock):
|
|
result = _fetch_groups("tok", "s1")
|
|
assert result is None
|
|
|
|
|
|
# ── _fetch_products ───────────────────────────────────────────────────────────
|
|
|
|
def test_fetch_products_success():
|
|
mock = MagicMock()
|
|
mock.status_code = 200
|
|
mock.raise_for_status = MagicMock()
|
|
mock.json.return_value = [{"id": "p1", "name": "Пуэр", "price": {"sum": 15000}}]
|
|
with patch("web.tasks.catalog.httpx.get", return_value=mock):
|
|
result = _fetch_products("tok", "s1")
|
|
assert len(result) == 1
|
|
assert result[0]["name"] == "Пуэр"
|
|
|
|
|
|
@pytest.mark.parametrize("status_code", [402, 403])
|
|
def test_fetch_products_returns_none_on_restricted(status_code):
|
|
mock = MagicMock()
|
|
mock.status_code = status_code
|
|
with patch("web.tasks.catalog.httpx.get", return_value=mock):
|
|
result = _fetch_products("tok", "s1")
|
|
assert result is None
|
|
|
|
|
|
# ── refresh_catalog task (integration with mocked HTTP) ──────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_refresh_catalog_upserts_stores(override_db):
|
|
from web.database import SessionLocal
|
|
from web.models.connections import CachedStore, EvotorConnection
|
|
from web.tasks.catalog import _sync_user
|
|
|
|
user_id = 1
|
|
token = "test-tok"
|
|
|
|
stores_data = [{"id": "s-new", "name": "Новый магазин", "address": "ул. Ленина 1"}]
|
|
groups_data = []
|
|
products_data = []
|
|
|
|
with patch("web.tasks.catalog._fetch_stores", return_value=stores_data), \
|
|
patch("web.tasks.catalog._fetch_groups", return_value=groups_data), \
|
|
patch("web.tasks.catalog._fetch_products", return_value=products_data):
|
|
_sync_user(override_db, user_id, token)
|
|
|
|
store = override_db.query(CachedStore).filter_by(user_id=user_id, evotor_id="s-new").first()
|
|
assert store is not None
|
|
assert store.name == "Новый магазин"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_refresh_catalog_upserts_products(override_db):
|
|
from web.models.connections import CachedProduct
|
|
from web.tasks.catalog import _sync_user
|
|
|
|
user_id = 2
|
|
token = "tok"
|
|
|
|
stores_data = [{"id": "s1", "name": "Магазин"}]
|
|
groups_data = [{"id": "g1", "name": "Чай"}]
|
|
products_data = [{
|
|
"id": "p1", "name": "Пуэр", "price": 35000,
|
|
"quantity": 10, "measureName": "шт", "code": "001",
|
|
"allowToSell": True, "group": "g1",
|
|
}]
|
|
|
|
with patch("web.tasks.catalog._fetch_stores", return_value=stores_data), \
|
|
patch("web.tasks.catalog._fetch_groups", return_value=groups_data), \
|
|
patch("web.tasks.catalog._fetch_products", return_value=products_data):
|
|
_sync_user(override_db, user_id, token)
|
|
|
|
p = override_db.query(CachedProduct).filter_by(user_id=user_id, evotor_id="p1").first()
|
|
assert p is not None
|
|
assert p.name == "Пуэр"
|
|
assert p.group_evotor_id == "g1"
|
|
assert p.allow_to_sell is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_refresh_catalog_skips_fetch_stores_failure(override_db):
|
|
from web.models.connections import CachedStore
|
|
from web.tasks.catalog import _sync_user
|
|
|
|
with patch("web.tasks.catalog._fetch_stores", side_effect=Exception("network error")):
|
|
_sync_user(override_db, user_id=99, token="tok")
|
|
|
|
assert override_db.query(CachedStore).filter_by(user_id=99).count() == 0
|