| Синхронизация | +Название | +ID | +Обновлено | ++ |
|---|---|---|---|---|
| + + | +{{ g.name }} | +{{ g.evotor_id }} | +{{ g.fetched_at | datefmt }} | ++ + Товары + + | +
Группы для этого магазина ещё не загружены.
+diff --git a/README.md b/README.md index 252c34f..078a59f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # EvoSync -Web service for syncing a product catalog from Evotor POS → VK Market. Users connect their Evotor account and a VK page; products from the cash register then appear automatically in the VK store. +Web service for syncing a product catalog from Evotor POS → VK Market. Users connect their Evotor account and a VK community; products from the cash register then appear automatically in the VK store. --- @@ -23,20 +23,20 @@ Web service for syncing a product catalog from Evotor POS → VK Market. Users c ### Services -| Service | Image / Dockerfile | Purpose | External port | -|----------|---------------------|----------------------------------------------|---------------| +| Service | Image / Dockerfile | Purpose | External port | +|----------|---------------------|-----------------------------------------------|---------------| | `web` | `Dockerfile.web` | FastAPI app, runs Alembic migrations on start | 8080 → 8000 | -| `worker` | `Dockerfile.web` | Celery worker (sync, health, notifications…) | — | -| `beat` | `Dockerfile.web` | Celery Beat — periodic task scheduler | — | -| `flower` | `Dockerfile.web` | Celery queue monitoring UI | 5555 | -| `db` | `mariadb:11.4` | Primary relational database | — | -| `redis` | `redis:7-alpine` | Celery broker and result backend | — | +| `worker` | `Dockerfile.web` | Celery worker (sync, health, notifications…) | — | +| `beat` | `Dockerfile.web` | Celery Beat — periodic task scheduler | — | +| `flower` | `Dockerfile.web` | Celery queue monitoring UI | 5555 | +| `db` | `mariadb:11.4` | Primary relational database | — | +| `redis` | `redis:7-alpine` | Celery broker and result backend | — | ### Stack - **Python 3.12**, FastAPI 0.115, Uvicorn - **SQLAlchemy 2** + Alembic, MariaDB (PyMySQL) -- **Celery 5** + Redis +- **Celery 5** + Redis — background tasks, periodic catalog sync - **Jinja2** — server-side HTML rendering (`web/templates/`) - **Pydantic Settings** — configuration from env vars / `.env` - bcrypt — password hashing @@ -46,25 +46,62 @@ Web service for syncing a product catalog from Evotor POS → VK Market. Users c ## Database Schema -| Table | Purpose | -|----------------------|--------------------------------------------------------------------------------| -| `users` | User accounts (roles: system / admin / user; statuses: pending / active / suspended) | -| `evotor_connections` | User ↔ Evotor link (access_token, api_token returned to Evotor webhooks) | -| `vk_connections` | User ↔ VK OAuth link | -| `sync_configs` | Per-user sync settings | -| `sync_filters` | Product / group inclusion/exclusion filters | -| `cached_stores` | Cached list of Evotor stores | -| `cached_groups` | Cached Evotor product groups | -| `cached_products` | Cached Evotor product catalog | -| `roles` | RBAC roles | -| `permissions` | RBAC permissions | -| `role_permissions` | M2M: role ↔ permission | -| `user_roles` | M2M: user ↔ role | +| Table | Purpose | +|-----------------------|--------------------------------------------------------------------------------------| +| `users` | User accounts (roles: system / admin / user; statuses: pending / active / suspended) | +| `evotor_connections` | User ↔ Evotor link (access_token, api_token returned to Evotor webhooks) | +| `vk_connections` | User ↔ VK link (user access token + VK community ID) | +| `sync_configs` | Per-user sync settings | +| `sync_filters` | Store / group inclusion filters (entity_type: store / group) | +| `cached_stores` | Cached list of Evotor stores | +| `cached_groups` | Cached Evotor product groups | +| `cached_products` | Cached Evotor product catalog | +| `vk_cached_albums` | Cached VK Market albums (product groups) | +| `vk_cached_products` | Cached VK Market products | +| `roles` | RBAC roles | +| `permissions` | RBAC permissions | +| `role_permissions` | M2M: role ↔ permission | +| `user_roles` | M2M: user ↔ role | + +--- + +## Background Tasks + +Periodic tasks run via **Celery Beat** and are executed by the **worker** service. + +| Task | Schedule | Description | +|------|----------|-------------| +| `web.tasks.catalog.refresh_catalog` | Every `CATALOG_REFRESH_INTERVAL_SECONDS` | Fetches stores, product groups, and products from the Evotor API for every connected user; upserts into `cached_stores`, `cached_groups`, `cached_products` | +| `web.tasks.vk_catalog.refresh_vk_catalog` | Every `CATALOG_REFRESH_INTERVAL_SECONDS` | Fetches Market albums and products from VK API for every connected user; upserts into `vk_cached_albums`, `vk_cached_products` | + +**Evotor sync sequence per user:** +1. `GET /stores` → upsert `cached_stores` +2. For each store: `GET /stores/{id}/product-groups` → upsert `cached_groups` +3. For each store: `GET /stores/{id}/products` → upsert `cached_products` + +**VK sync sequence per user:** +1. `market.getAlbums` → upsert `vk_cached_albums` +2. `market.get` (extended=1, paginated) → upsert `vk_cached_products` with album membership + +Per-user failures are logged and skipped — one broken token does not block other users. +Evotor stores that return `402 Payment Required` (subscription limit) are silently skipped at debug log level. --- ## Routes +### Connections + +| Method | Path | Description | +|--------|-----------------------------------|------------------------------------------| +| GET | `/connections` | View Evotor and VK connection status | +| POST | `/connections/evotor` | Save / update Evotor API token manually | +| POST | `/connections/evotor/disconnect` | Remove Evotor connection | +| POST | `/connections/evotor/test` | Test Evotor connection (JSON) | +| POST | `/connections/vk` | Save / update VK token and group ID | +| POST | `/connections/vk/disconnect` | Remove VK connection | +| POST | `/connections/vk/test` | Test VK connection (JSON) | + ### Public / Authentication | Method | Path | Description | @@ -106,11 +143,29 @@ Web service for syncing a product catalog from Evotor POS → VK Market. Users c ### Evotor Webhooks (Bearer `EVOTOR_WEBHOOK_SECRET`) -| Method | Path | Description | -|--------|----------------|------------------------------------------------------------------| -| POST | `/user/create` | Evotor creates/links a user and receives an api_token | -| POST | `/user/verify` | Evotor verifies user credentials and receives an api_token | -| POST | `/user/token` | Evotor delivers its own access_token for a user | +| Method | Path | Description | +|--------|----------------|-----------------------------------------------------------| +| POST | `/user/create` | Evotor creates/links a user; returns api_token | +| POST | `/user/verify` | Evotor verifies user credentials; returns api_token | +| POST | `/user/token` | Evotor delivers its own access_token for a user | + +### Evotor Catalog (requires session) + +| Method | Path | Description | +|--------|---------------------------------------------------|--------------------------------------------| +| GET | `/catalog` | Redirects to `/catalog/stores` | +| GET | `/catalog/stores` | Evotor stores with per-store sync toggle | +| GET | `/catalog/stores/{id}/groups` | Product groups with per-group sync toggle | +| GET | `/catalog/stores/{id}/products` | Products (filterable by group) | +| POST | `/catalog/stores/{id}/toggle` | Enable / disable store sync | +| POST | `/catalog/stores/{id}/groups/{gid}/toggle` | Enable / disable group sync | + +### VK Catalog (requires session) + +| Method | Path | Description | +|--------|---------------------------------------|------------------------------------| +| GET | `/vk-catalog/albums` | VK Market albums (product groups) | +| GET | `/vk-catalog/albums/{id}/products` | Products in a VK album | ### API Docs @@ -125,22 +180,22 @@ Web service for syncing a product catalog from Evotor POS → VK Market. Users c All settings are read from environment variables or a `.env` file: -| Variable | Default | Description | -|------------------------------------|-------------------------------------|--------------------------------------| -| `DATABASE_URL` | `mysql+pymysql://…@db:3306/evosync` | MariaDB connection string | -| `REDIS_URL` | `redis://redis:6379/0` | Redis connection string | -| `SECRET_KEY` | `change-me-in-production` | Session signing key | -| `BASE_URL` | `http://localhost:8000` | Public URL of the service | -| `EVOTOR_APP_ID` | — | Evotor application ID | -| `EVOTOR_WEBHOOK_SECRET` | — | Bearer secret for webhook endpoints | -| `JIVOSITE_WIDGET_ID` | — | JivoSite widget ID | -| `VK_DEFAULT_PHOTO_PATH` | `/app/default_product.png` | Fallback image path for VK products | -| `VK_API_VERSION` | `5.199` | VK API version | -| `CATALOG_REFRESH_INTERVAL_SECONDS` | `3600` | Catalog cache refresh interval | -| `INVITE_EXPIRE_HOURS` | `48` | Invite link TTL in hours | -| `EMAIL_PROVIDER` | `console` | Email provider (console / smtp / …) | -| `SMS_PROVIDER` | `console` | SMS provider | -| `FLOWER_USER` / `FLOWER_PASSWORD` | `admin` / `changeme` | Basic Auth credentials for Flower | +| Variable | Default | Description | +|------------------------------------|-------------------------------------|---------------------------------------| +| `DATABASE_URL` | `mysql+pymysql://…@db:3306/evosync` | MariaDB connection string | +| `REDIS_URL` | `redis://redis:6379/0` | Redis connection string | +| `SECRET_KEY` | `change-me-in-production` | Session signing key | +| `BASE_URL` | `http://localhost:8000` | Public URL of the service | +| `EVOTOR_APP_ID` | — | Evotor application ID | +| `EVOTOR_WEBHOOK_SECRET` | — | Bearer secret for webhook endpoints | +| `JIVOSITE_WIDGET_ID` | — | JivoSite widget ID | +| `VK_DEFAULT_PHOTO_PATH` | `/app/default_product.png` | Fallback image path for VK products | +| `VK_API_VERSION` | `5.199` | VK API version | +| `CATALOG_REFRESH_INTERVAL_SECONDS` | `3600` | Evotor + VK catalog sync interval (s) | +| `INVITE_EXPIRE_HOURS` | `48` | Invite link TTL in hours | +| `EMAIL_PROVIDER` | `console` | Email provider (console / smtp / …) | +| `SMS_PROVIDER` | `console` | SMS provider | +| `FLOWER_USER` / `FLOWER_PASSWORD` | `admin` / `changeme` | Basic Auth credentials for Flower | --- diff --git a/docker-compose.yml b/docker-compose.yml index 82116f9..48dc7cf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,7 +73,7 @@ services: condition: service_healthy db: condition: service_healthy - command: celery -A web.tasks.celery_app worker --loglevel=info --concurrency=2 --queues=default,sync,health,notifications + command: celery -A web.tasks.celery_app worker --loglevel=info --concurrency=2 --queues=default,sync,health,notifications -E beat: build: @@ -84,6 +84,7 @@ services: DATABASE_URL: mysql+pymysql://${DB_USER}:${DB_PASSWORD}@db:3306/${DB_NAME} REDIS_URL: redis://redis:6379/0 SECRET_KEY: ${SECRET_KEY:-change-me-in-production} + CATALOG_REFRESH_INTERVAL_SECONDS: ${CATALOG_REFRESH_INTERVAL_SECONDS:-3600} depends_on: redis: condition: service_healthy diff --git a/web/main.py b/web/main.py index 755c326..998c430 100644 --- a/web/main.py +++ b/web/main.py @@ -35,6 +35,9 @@ from web.routes.invite import router as invite_router # noqa: E402 from web.routes.profile import router as profile_router # noqa: E402 from web.routes.evotor_webhooks import router as evotor_webhooks_router # noqa: E402 from web.routes.admin import router as admin_router # noqa: E402 +from web.routes.catalog import router as catalog_router # noqa: E402 +from web.routes.connections import router as connections_router # noqa: E402 +from web.routes.vk_catalog import router as vk_catalog_router # noqa: E402 app.include_router(auth_router) app.include_router(reset_router) @@ -42,6 +45,16 @@ app.include_router(invite_router) app.include_router(profile_router) app.include_router(evotor_webhooks_router) app.include_router(admin_router) +app.include_router(catalog_router) +app.include_router(connections_router) +app.include_router(vk_catalog_router) + + +# ── Catalog redirect ───────────────────────────────────────────────────────── +@app.get("/catalog") +async def catalog_redirect(): + from fastapi.responses import RedirectResponse + return RedirectResponse("/catalog/stores", 302) # ── Health ──────────────────────────────────────────────────────────────────── diff --git a/web/migrations/versions/0004_vk_catalog_tables.py b/web/migrations/versions/0004_vk_catalog_tables.py new file mode 100644 index 0000000..c624976 --- /dev/null +++ b/web/migrations/versions/0004_vk_catalog_tables.py @@ -0,0 +1,56 @@ +"""VK catalog tables (albums + products) + +Revision ID: 0004 +Revises: 0003 +Create Date: 2026-05-01 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "0004" +down_revision: Union[str, None] = "0003" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "vk_cached_albums", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("user_id", sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False), + sa.Column("vk_group_id", sa.String(50), nullable=False), + sa.Column("album_id", sa.String(50), nullable=False), + sa.Column("title", sa.String(255), nullable=False), + sa.Column("count", sa.Integer, nullable=True), + sa.Column("fetched_at", sa.DateTime, nullable=False), + sa.UniqueConstraint("user_id", "vk_group_id", "album_id", name="uq_vk_cached_albums"), + ) + op.create_index("ix_vk_cached_albums_user_group", "vk_cached_albums", ["user_id", "vk_group_id"]) + + op.create_table( + "vk_cached_products", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("user_id", sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False), + sa.Column("vk_group_id", sa.String(50), nullable=False), + sa.Column("vk_product_id", sa.String(50), nullable=False), + sa.Column("album_id", sa.String(50), nullable=True), + sa.Column("name", sa.String(255), nullable=False), + sa.Column("description", sa.Text, nullable=True), + sa.Column("price", sa.Numeric(12, 2), nullable=True), + sa.Column("availability", sa.Integer, nullable=True), + sa.Column("thumb_url", sa.String(1024), nullable=True), + sa.Column("fetched_at", sa.DateTime, nullable=False), + sa.UniqueConstraint("user_id", "vk_group_id", "vk_product_id", name="uq_vk_cached_products"), + ) + op.create_index( + "ix_vk_cached_products_user_group_album", + "vk_cached_products", ["user_id", "vk_group_id", "album_id"], + ) + + +def downgrade() -> None: + op.drop_table("vk_cached_products") + op.drop_table("vk_cached_albums") diff --git a/web/models/connections.py b/web/models/connections.py index 0d9972a..108cbdd 100644 --- a/web/models/connections.py +++ b/web/models/connections.py @@ -81,6 +81,44 @@ class SyncFilter(Base): ) +class VkCachedAlbum(Base): + __tablename__ = "vk_cached_albums" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) + vk_group_id = Column(String(50), nullable=False) + album_id = Column(String(50), nullable=False) + title = Column(String(255), nullable=False) + count = Column(Integer, nullable=True) + fetched_at = Column(DateTime, nullable=False) + + __table_args__ = ( + UniqueConstraint("user_id", "vk_group_id", "album_id", name="uq_vk_cached_albums"), + Index("ix_vk_cached_albums_user_group", "user_id", "vk_group_id"), + ) + + +class VkCachedProduct(Base): + __tablename__ = "vk_cached_products" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) + vk_group_id = Column(String(50), nullable=False) + vk_product_id = Column(String(50), nullable=False) + album_id = Column(String(50), nullable=True) + name = Column(String(255), nullable=False) + description = Column(Text, nullable=True) + price = Column(Numeric(12, 2), nullable=True) + availability = Column(Integer, nullable=True) + thumb_url = Column(String(1024), nullable=True) + fetched_at = Column(DateTime, nullable=False) + + __table_args__ = ( + UniqueConstraint("user_id", "vk_group_id", "vk_product_id", name="uq_vk_cached_products"), + Index("ix_vk_cached_products_user_group_album", "user_id", "vk_group_id", "album_id"), + ) + + class CachedStore(Base): __tablename__ = "cached_stores" diff --git a/web/routes/catalog.py b/web/routes/catalog.py new file mode 100644 index 0000000..580aa77 --- /dev/null +++ b/web/routes/catalog.py @@ -0,0 +1,252 @@ +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, Request +from fastapi.responses import HTMLResponse, RedirectResponse +from sqlalchemy.orm import Session + +from web.auth.session import get_current_user +from web.config import settings +from web.database import get_db +from web.models.connections import CachedGroup, CachedProduct, CachedStore, SyncConfig, SyncFilter +from web.templates_env import templates + +router = APIRouter() + + +def _get_or_create_sync_config(db: Session, user_id: int) -> SyncConfig: + cfg = db.query(SyncConfig).filter_by(user_id=user_id).first() + if not cfg: + cfg = SyncConfig(user_id=user_id, is_enabled=True) + db.add(cfg) + db.flush() + return cfg + + +def _enabled_store_ids(db: Session, user_id: int) -> set[str] | None: + """Return set of enabled store evotor_ids, or None if no filters set (all enabled).""" + cfg = db.query(SyncConfig).filter_by(user_id=user_id).first() + if not cfg: + return None + filters = db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="store", filter_mode="include" + ).all() + if not filters: + return None + return {f.entity_id for f in filters} + + +def _enabled_group_ids(db: Session, user_id: int, store_evotor_id: str) -> set[str] | None: + """Return set of enabled group evotor_ids for a store, or None if all enabled.""" + cfg = db.query(SyncConfig).filter_by(user_id=user_id).first() + if not cfg: + return None + filters = db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="group", filter_mode="include", + parent_entity_id=store_evotor_id, + ).all() + if not filters: + return None + return {f.entity_id for f in filters} + + +def _render(request: Request, template: str, ctx: dict) -> HTMLResponse: + ctx["request"] = request + ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID) + return templates.TemplateResponse(ctx.pop("request"), template, ctx) + + +@router.get("/catalog/stores") +async def catalog_stores(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + stores = ( + db.query(CachedStore) + .filter(CachedStore.user_id == user.id) + .order_by(CachedStore.name) + .all() + ) + enabled_ids = _enabled_store_ids(db, user.id) + return _render(request, "catalog/stores.html", { + "user": user, + "stores": stores, + "enabled_ids": enabled_ids, # None = all enabled, set = explicit list + "refresh_interval": settings.CATALOG_REFRESH_INTERVAL_SECONDS, + }) + + +@router.get("/catalog/stores/{store_evotor_id}/groups") +async def catalog_groups(store_evotor_id: str, request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + store = ( + db.query(CachedStore) + .filter(CachedStore.user_id == user.id, CachedStore.evotor_id == store_evotor_id) + .first() + ) + if not store: + return RedirectResponse("/catalog/stores", 303) + + groups = ( + db.query(CachedGroup) + .filter(CachedGroup.user_id == user.id, CachedGroup.store_evotor_id == store_evotor_id) + .order_by(CachedGroup.name) + .all() + ) + enabled_ids = _enabled_group_ids(db, user.id, store_evotor_id) + return _render(request, "catalog/groups.html", { + "user": user, "store": store, "groups": groups, + "enabled_ids": enabled_ids, + }) + + +@router.get("/catalog/stores/{store_evotor_id}/products") +async def catalog_products(store_evotor_id: str, request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + store = ( + db.query(CachedStore) + .filter(CachedStore.user_id == user.id, CachedStore.evotor_id == store_evotor_id) + .first() + ) + if not store: + return RedirectResponse("/catalog/stores", 303) + + group_id = request.query_params.get("group") + q = db.query(CachedProduct).filter( + CachedProduct.user_id == user.id, + CachedProduct.store_evotor_id == store_evotor_id, + ) + if group_id: + q = q.filter(CachedProduct.group_evotor_id == group_id) + + products = q.order_by(CachedProduct.name).all() + groups = ( + db.query(CachedGroup) + .filter(CachedGroup.user_id == user.id, CachedGroup.store_evotor_id == store_evotor_id) + .order_by(CachedGroup.name) + .all() + ) + return _render(request, "catalog/products.html", { + "user": user, + "store": store, + "products": products, + "groups": groups, + "group_id": group_id, + }) + + +@router.post("/catalog/stores/{store_evotor_id}/toggle") +async def catalog_store_toggle(store_evotor_id: str, request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + cfg = _get_or_create_sync_config(db, user.id) + + # If no filters exist yet, that means all stores are implicitly enabled. + # Toggling one store OFF means we create include-filters for all OTHER stores. + existing = db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="store", filter_mode="include" + ).all() + existing_ids = {f.entity_id for f in existing} + + if store_evotor_id in existing_ids: + # Currently enabled → disable: remove its filter + db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="store", + entity_id=store_evotor_id, filter_mode="include", + ).delete() + else: + if not existing_ids: + # First toggle: seed include-filters for all OTHER stores + all_stores = db.query(CachedStore).filter_by(user_id=user.id).all() + now = datetime.now(timezone.utc).replace(tzinfo=None) + for s in all_stores: + if s.evotor_id == store_evotor_id: + continue + db.add(SyncFilter( + sync_config_id=cfg.id, + entity_type="store", + entity_id=s.evotor_id, + entity_name=s.name, + filter_mode="include", + created_at=now, + )) + else: + # Re-enable: add its filter back + db.add(SyncFilter( + sync_config_id=cfg.id, + entity_type="store", + entity_id=store_evotor_id, + filter_mode="include", + created_at=datetime.now(timezone.utc).replace(tzinfo=None), + )) + + db.commit() + return RedirectResponse("/catalog/stores", 303) + + +@router.post("/catalog/stores/{store_evotor_id}/groups/{group_evotor_id}/toggle") +async def catalog_group_toggle( + store_evotor_id: str, group_evotor_id: str, + request: Request, db: Session = Depends(get_db), +): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + cfg = _get_or_create_sync_config(db, user.id) + + existing = db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="group", filter_mode="include", + parent_entity_id=store_evotor_id, + ).all() + existing_ids = {f.entity_id for f in existing} + + if group_evotor_id in existing_ids: + db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="group", + entity_id=group_evotor_id, filter_mode="include", + ).delete() + else: + if not existing_ids: + # First toggle: seed include-filters for all OTHER groups in this store + all_groups = db.query(CachedGroup).filter_by( + user_id=user.id, store_evotor_id=store_evotor_id, + ).all() + now = datetime.now(timezone.utc).replace(tzinfo=None) + for g in all_groups: + if g.evotor_id == group_evotor_id: + continue + db.add(SyncFilter( + sync_config_id=cfg.id, + entity_type="group", + entity_id=g.evotor_id, + entity_name=g.name, + filter_mode="include", + parent_entity_id=store_evotor_id, + created_at=now, + )) + else: + db.add(SyncFilter( + sync_config_id=cfg.id, + entity_type="group", + entity_id=group_evotor_id, + filter_mode="include", + parent_entity_id=store_evotor_id, + created_at=datetime.now(timezone.utc).replace(tzinfo=None), + )) + + db.commit() + return RedirectResponse(f"/catalog/stores/{store_evotor_id}/groups", 303) diff --git a/web/routes/connections.py b/web/routes/connections.py new file mode 100644 index 0000000..df8644c --- /dev/null +++ b/web/routes/connections.py @@ -0,0 +1,229 @@ +import secrets +from datetime import datetime, timezone + +import httpx +from fastapi import APIRouter, Depends, Request +from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse +from sqlalchemy.orm import Session + +from web.auth.session import get_current_user +from web.config import settings +from web.database import get_db +from web.models.connections import EvotorConnection, VkConnection +from web.templates_env import templates + +router = APIRouter() + + +def _render(request: Request, template: str, ctx: dict) -> HTMLResponse: + ctx["request"] = request + ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID) + return templates.TemplateResponse(ctx.pop("request"), template, ctx) + + +def _now() -> datetime: + return datetime.now(timezone.utc).replace(tzinfo=None) + + +@router.get("/connections") +async def connections_get(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + evotor = db.query(EvotorConnection).filter_by(user_id=user.id).first() + vk = db.query(VkConnection).filter_by(user_id=user.id).first() + return _render(request, "connections.html", {"user": user, "evotor": evotor, "vk": vk}) + + +@router.post("/connections/evotor") +async def connections_evotor_post(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + form = await request.form() + access_token = str(form.get("access_token", "")).strip() + evotor_user_id = str(form.get("evotor_user_id", "")).strip() or None + + if not access_token: + evotor = db.query(EvotorConnection).filter_by(user_id=user.id).first() + return _render(request, "connections.html", { + "user": user, + "evotor": evotor, + "errors": ["API-токен обязателен"], + }) + + now = _now() + conn = db.query(EvotorConnection).filter_by(user_id=user.id).first() + if conn: + conn.access_token = access_token + if evotor_user_id: + conn.evotor_user_id = evotor_user_id + conn.updated_at = now + else: + conn = EvotorConnection( + user_id=user.id, + evotor_user_id=evotor_user_id, + access_token=access_token, + api_token=secrets.token_urlsafe(32), + connected_at=now, + updated_at=now, + ) + db.add(conn) + + if evotor_user_id and not user.evotor_user_id: + user.evotor_user_id = evotor_user_id + + db.commit() + return RedirectResponse("/connections?success=1", 303) + + +@router.post("/connections/evotor/disconnect") +async def connections_evotor_disconnect(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + conn = db.query(EvotorConnection).filter_by(user_id=user.id).first() + if conn: + db.delete(conn) + db.commit() + return RedirectResponse("/connections", 303) + + +@router.post("/connections/vk") +async def connections_vk_post(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + form = await request.form() + access_token = str(form.get("access_token", "")).strip() + vk_group_id = str(form.get("vk_group_id", "")).strip() or None + + if not access_token: + evotor = db.query(EvotorConnection).filter_by(user_id=user.id).first() + vk = db.query(VkConnection).filter_by(user_id=user.id).first() + return _render(request, "connections.html", { + "user": user, + "evotor": evotor, + "vk": vk, + "errors": ["Токен VK обязателен"], + }) + + now = _now() + conn = db.query(VkConnection).filter_by(user_id=user.id).first() + if conn: + conn.access_token = access_token + if vk_group_id: + conn.vk_user_id = vk_group_id + conn.updated_at = now + else: + conn = VkConnection( + user_id=user.id, + access_token=access_token, + vk_user_id=vk_group_id, + connected_at=now, + updated_at=now, + ) + db.add(conn) + + db.commit() + return RedirectResponse("/connections?success=1", 303) + + +@router.post("/connections/vk/disconnect") +async def connections_vk_disconnect(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + conn = db.query(VkConnection).filter_by(user_id=user.id).first() + if conn: + db.delete(conn) + db.commit() + return RedirectResponse("/connections", 303) + + +@router.post("/connections/evotor/test") +async def connections_evotor_test(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return JSONResponse({"ok": False, "message": "Не авторизован"}, status_code=401) + + conn = db.query(EvotorConnection).filter_by(user_id=user.id).first() + if not conn: + return JSONResponse({"ok": False, "message": "Подключение не настроено"}) + + try: + r = httpx.get( + "https://api.evotor.ru/stores", + headers={ + "Authorization": f"Bearer {conn.access_token}", + "Accept": "application/vnd.evotor.v2+json", + }, + timeout=10, + ) + if r.status_code == 200: + data = r.json() + items = data.get("items", data) if isinstance(data, dict) else data + count = len(items) if isinstance(items, list) else "?" + return JSONResponse({"ok": True, "message": f"Успешно. Найдено магазинов: {count}"}) + elif r.status_code == 401: + return JSONResponse({"ok": False, "message": "Токен недействителен (401)"}) + else: + return JSONResponse({"ok": False, "message": f"Ошибка API: HTTP {r.status_code}"}) + except httpx.TimeoutException: + return JSONResponse({"ok": False, "message": "Таймаут запроса к Эвотор"}) + except Exception as e: + return JSONResponse({"ok": False, "message": f"Ошибка: {e}"}) + + +@router.post("/connections/vk/test") +async def connections_vk_test(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return JSONResponse({"ok": False, "message": "Не авторизован"}, status_code=401) + + conn = db.query(VkConnection).filter_by(user_id=user.id).first() + if not conn: + return JSONResponse({"ok": False, "message": "Подключение не настроено"}) + + try: + params = { + "access_token": conn.access_token, + "v": settings.VK_API_VERSION, + } + if conn.vk_user_id: + params["group_ids"] = conn.vk_user_id + + r = httpx.get( + "https://api.vk.com/method/groups.getById", + params=params, + timeout=10, + ) + data = r.json() + if "error" in data: + code = data["error"].get("error_code") + msg = data["error"].get("error_msg", "Неизвестная ошибка") + return JSONResponse({"ok": False, "message": f"Ошибка VK API ({code}): {msg}"}) + + groups = data.get("response", {}).get("groups", []) + if groups: + name = groups[0].get("name", "—") + return JSONResponse({"ok": True, "message": f"Успешно. Сообщество: «{name}»"}) + else: + return JSONResponse({"ok": True, "message": "Токен действителен. Укажите ID сообщества для полной проверки."}) + except httpx.TimeoutException: + return JSONResponse({"ok": False, "message": "Таймаут запроса к VK"}) + except Exception as e: + return JSONResponse({"ok": False, "message": f"Ошибка: {e}"}) + diff --git a/web/routes/vk_catalog.py b/web/routes/vk_catalog.py new file mode 100644 index 0000000..46233f2 --- /dev/null +++ b/web/routes/vk_catalog.py @@ -0,0 +1,63 @@ +from fastapi import APIRouter, Depends, Request +from fastapi.responses import HTMLResponse, RedirectResponse +from sqlalchemy.orm import Session + +from web.auth.session import get_current_user +from web.config import settings +from web.database import get_db +from web.models.connections import VkCachedAlbum, VkCachedProduct, VkConnection +from web.templates_env import templates + +router = APIRouter() + + +def _render(request: Request, template: str, ctx: dict) -> HTMLResponse: + ctx["request"] = request + ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID) + return templates.TemplateResponse(ctx.pop("request"), template, ctx) + + +@router.get("/vk-catalog/albums") +async def vk_catalog_albums(request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + vk_conn = db.query(VkConnection).filter_by(user_id=user.id).first() + albums = ( + db.query(VkCachedAlbum) + .filter(VkCachedAlbum.user_id == user.id) + .order_by(VkCachedAlbum.title) + .all() + ) + return _render(request, "vk_catalog/albums.html", { + "user": user, + "albums": albums, + "vk_conn": vk_conn, + "refresh_interval": settings.CATALOG_REFRESH_INTERVAL_SECONDS, + }) + + +@router.get("/vk-catalog/albums/{album_id}/products") +async def vk_catalog_products(album_id: str, request: Request, db: Session = Depends(get_db)): + try: + user = get_current_user(request, db) + except Exception: + return RedirectResponse("/login", 303) + + album = db.query(VkCachedAlbum).filter_by(user_id=user.id, album_id=album_id).first() + if not album: + return RedirectResponse("/vk-catalog/albums", 303) + + products = ( + db.query(VkCachedProduct) + .filter(VkCachedProduct.user_id == user.id, VkCachedProduct.album_id == album_id) + .order_by(VkCachedProduct.name) + .all() + ) + return _render(request, "vk_catalog/products.html", { + "user": user, + "album": album, + "products": products, + }) diff --git a/web/tasks/catalog.py b/web/tasks/catalog.py new file mode 100644 index 0000000..5df5027 --- /dev/null +++ b/web/tasks/catalog.py @@ -0,0 +1,227 @@ +""" +Periodic catalog sync: fetch stores / product-groups / products from Evotor +for every connected user and upsert into cached_* tables. + +Beat schedule entry (set in celery_app.py): + refresh_catalog — runs every CATALOG_REFRESH_INTERVAL_SECONDS seconds +""" +import logging +from datetime import datetime, timezone + +import httpx +from celery import shared_task + +from web.config import settings +from web.database import SessionLocal +from web.models.connections import CachedGroup, CachedProduct, CachedStore, EvotorConnection, SyncConfig, SyncFilter + +logger = logging.getLogger(__name__) + +EVO_API = "https://api.evotor.ru" + + +def _headers(token: str) -> dict: + return { + "Authorization": f"Bearer {token}", + "Accept": "application/vnd.evotor.v2+json", + "Content-Type": "application/json", + } + + +def _now() -> datetime: + return datetime.now(timezone.utc).replace(tzinfo=None) + + +# ── per-user helpers ────────────────────────────────────────────────────────── + +def _fetch_stores(token: str) -> list[dict]: + r = httpx.get(f"{EVO_API}/stores", headers=_headers(token), timeout=15) + r.raise_for_status() + data = r.json() + return data.get("items", data) if isinstance(data, dict) else data + + +def _fetch_groups(token: str, store_id: str) -> list[dict] | None: + """Returns None if the store is not accessible (402/403), list otherwise.""" + r = httpx.get( + f"{EVO_API}/stores/{store_id}/product-groups", + headers=_headers(token), timeout=15, + ) + if r.status_code in (402, 403): + return None + r.raise_for_status() + data = r.json() + return data.get("items", data) if isinstance(data, dict) else data + + +def _fetch_products(token: str, store_id: str) -> list[dict] | None: + """Returns None if the store is not accessible (402/403), list otherwise.""" + r = httpx.get( + f"{EVO_API}/stores/{store_id}/products", + headers=_headers(token), timeout=30, + ) + if r.status_code in (402, 403): + return None + r.raise_for_status() + data = r.json() + return data.get("items", data) if isinstance(data, dict) else data + + +def _sync_user(db, user_id: int, token: str) -> None: + now = _now() + + # ── stores ──────────────────────────────────────────────────────────────── + try: + stores = _fetch_stores(token) + except Exception as e: + logger.warning("user=%s fetch stores failed: %s", user_id, e) + return + + store_ids = [] + for s in stores: + evo_id = s.get("id") or s.get("uuid") + if not evo_id: + continue + store_ids.append(evo_id) + row = db.query(CachedStore).filter_by(user_id=user_id, evotor_id=evo_id).first() + if row: + row.name = s.get("name", "") + row.address = s.get("address", {}).get("str") if isinstance(s.get("address"), dict) else s.get("address") + row.fetched_at = now + else: + db.add(CachedStore( + user_id=user_id, + evotor_id=evo_id, + name=s.get("name", ""), + address=s.get("address", {}).get("str") if isinstance(s.get("address"), dict) else s.get("address"), + fetched_at=now, + )) + + db.flush() + + # ── apply store filter ──────────────────────────────────────────────────── + cfg = db.query(SyncConfig).filter_by(user_id=user_id).first() + if cfg: + include_filters = db.query(SyncFilter).filter_by( + sync_config_id=cfg.id, entity_type="store", filter_mode="include" + ).all() + if include_filters: + allowed = {f.entity_id for f in include_filters} + store_ids = [s for s in store_ids if s in allowed] + + # ── groups & products per store ─────────────────────────────────────────── + for store_evo_id in store_ids: + # groups + try: + groups = _fetch_groups(token, store_evo_id) + except Exception as e: + logger.warning("user=%s store=%s fetch groups failed: %s", user_id, store_evo_id, e) + groups = [] + if groups is None: + logger.debug("user=%s store=%s groups not available (402/403), skipping", user_id, store_evo_id) + continue + + for g in groups: + evo_id = g.get("id") or g.get("uuid") + if not evo_id: + continue + row = db.query(CachedGroup).filter_by(user_id=user_id, evotor_id=evo_id).first() + if row: + row.name = g.get("name", "") + row.store_evotor_id = store_evo_id + row.fetched_at = now + else: + db.add(CachedGroup( + user_id=user_id, + evotor_id=evo_id, + store_evotor_id=store_evo_id, + name=g.get("name", ""), + fetched_at=now, + )) + + # products + try: + products = _fetch_products(token, store_evo_id) + except Exception as e: + logger.warning("user=%s store=%s fetch products failed: %s", user_id, store_evo_id, e) + products = [] + if products is None: + logger.debug("user=%s store=%s products not available (402/403), skipping", user_id, store_evo_id) + products = [] + + for p in products: + evo_id = p.get("id") or p.get("uuid") + if not evo_id: + continue + price = p.get("price") + quantity = p.get("quantity") + row = db.query(CachedProduct).filter_by(user_id=user_id, evotor_id=evo_id).first() + if row: + row.store_evotor_id = store_evo_id + row.group_evotor_id = p.get("group") or p.get("parentUuid") + row.name = p.get("name", "") + row.price = float(price) if price is not None else None + row.quantity = float(quantity) if quantity is not None else None + row.measure_name = p.get("measureName") or p.get("measure_name") + row.article_number = p.get("code") or p.get("article_number") + row.allow_to_sell = p.get("allowToSell") if p.get("allowToSell") is not None else p.get("allow_to_sell") + row.fetched_at = now + else: + db.add(CachedProduct( + user_id=user_id, + evotor_id=evo_id, + store_evotor_id=store_evo_id, + group_evotor_id=p.get("group") or p.get("parentUuid"), + name=p.get("name", ""), + price=float(price) if price is not None else None, + quantity=float(quantity) if quantity is not None else None, + measure_name=p.get("measureName") or p.get("measure_name"), + article_number=p.get("code") or p.get("article_number"), + allow_to_sell=p.get("allowToSell") if p.get("allowToSell") is not None else p.get("allow_to_sell"), + fetched_at=now, + )) + + db.commit() + logger.info( + "user=%s catalog synced: %d stores, %d groups, %d products", + user_id, + len(stores), + sum(1 for _ in db.query(CachedGroup).filter_by(user_id=user_id)), + sum(1 for _ in db.query(CachedProduct).filter_by(user_id=user_id)), + ) + + +# ── Celery task ─────────────────────────────────────────────────────────────── + +@shared_task( + name="web.tasks.catalog.refresh_catalog", + queue="default", + bind=True, + max_retries=2, + default_retry_delay=60, +) +def refresh_catalog(self) -> dict: + """Fetch and cache stores/groups/products for all connected Evotor users.""" + db = SessionLocal() + results = {"ok": 0, "failed": 0} + try: + connections = ( + db.query(EvotorConnection) + .filter( + EvotorConnection.user_id.isnot(None), + EvotorConnection.access_token.isnot(None), + EvotorConnection.access_token != "", + ) + .all() + ) + for conn in connections: + try: + _sync_user(db, conn.user_id, conn.access_token) + results["ok"] += 1 + except Exception as exc: + logger.error("catalog sync failed for user=%s: %s", conn.user_id, exc) + results["failed"] += 1 + finally: + db.close() + logger.info("refresh_catalog done: %s", results) + return results diff --git a/web/tasks/celery_app.py b/web/tasks/celery_app.py index eb5671a..ba340fc 100644 --- a/web/tasks/celery_app.py +++ b/web/tasks/celery_app.py @@ -1,4 +1,5 @@ from celery import Celery +from celery.schedules import timedelta from web.config import settings @@ -20,4 +21,17 @@ celery_app.conf.update( "web.tasks.catalog.*": {"queue": "default"}, "web.notifications.tasks.*": {"queue": "notifications"}, }, + beat_schedule={ + "refresh-catalog": { + "task": "web.tasks.catalog.refresh_catalog", + "schedule": timedelta(seconds=settings.CATALOG_REFRESH_INTERVAL_SECONDS), + }, + "refresh-vk-catalog": { + "task": "web.tasks.vk_catalog.refresh_vk_catalog", + "schedule": timedelta(seconds=settings.CATALOG_REFRESH_INTERVAL_SECONDS), + }, + }, ) + +# Register task modules so beat/worker can discover them +celery_app.autodiscover_tasks(["web.tasks.catalog", "web.tasks.vk_catalog"]) diff --git a/web/tasks/vk_catalog.py b/web/tasks/vk_catalog.py new file mode 100644 index 0000000..e274892 --- /dev/null +++ b/web/tasks/vk_catalog.py @@ -0,0 +1,167 @@ +""" +Periodic VK catalog sync: fetch albums and products from VK Market +for every connected user and upsert into vk_cached_* tables. +""" +import logging +from datetime import datetime, timezone + +import httpx +from celery import shared_task + +from web.config import settings +from web.database import SessionLocal +from web.models.connections import VkCachedAlbum, VkCachedProduct, VkConnection + +logger = logging.getLogger(__name__) + +VK_API = "https://api.vk.com/method" + + +def _now() -> datetime: + return datetime.now(timezone.utc).replace(tzinfo=None) + + +def _vk_get(method: str, params: dict, token: str) -> dict: + params = {**params, "access_token": token, "v": settings.VK_API_VERSION} + r = httpx.get(f"{VK_API}/{method}", params=params, timeout=20) + r.raise_for_status() + return r.json() + + +def _sync_user(db, user_id: int, token: str, group_id: str) -> None: + now = _now() + owner_id = f"-{group_id}" + + # ── albums ──────────────────────────────────────────────────────────────── + try: + data = _vk_get("market.getAlbums", {"owner_id": owner_id, "count": 100}, token) + except Exception as e: + logger.warning("user=%s vk fetch albums failed: %s", user_id, e) + return + + if "error" in data: + logger.warning("user=%s vk albums error: %s", user_id, data["error"]) + return + + albums = data.get("response", {}).get("items", []) + album_ids = [] + for a in albums: + aid = str(a["id"]) + album_ids.append(aid) + row = db.query(VkCachedAlbum).filter_by(user_id=user_id, vk_group_id=group_id, album_id=aid).first() + if row: + row.title = a.get("title", "") + row.count = a.get("count") + row.fetched_at = now + else: + db.add(VkCachedAlbum( + user_id=user_id, + vk_group_id=group_id, + album_id=aid, + title=a.get("title", ""), + count=a.get("count"), + fetched_at=now, + )) + db.flush() + + # ── products (extended=1 gives albums_ids per product) ─────────────────── + offset = 0 + all_products = [] + while True: + try: + data = _vk_get( + "market.get", + {"owner_id": owner_id, "count": 200, "offset": offset, "extended": 1}, + token, + ) + except Exception as e: + logger.warning("user=%s vk fetch products (extended) failed: %s", user_id, e) + break + + if "error" in data: + logger.warning("user=%s vk products (extended) error: %s", user_id, data["error"]) + break + + items = data.get("response", {}).get("items", []) + all_products.extend(items) + if len(items) < 200: + break + offset += 200 + + for p in all_products: + pid = str(p["id"]) + album_id = str(p["albums_ids"][0]) if p.get("albums_ids") else None + price_raw = p.get("price", {}).get("amount") + price = float(price_raw) / 100 if price_raw is not None else None + thumb = None + if p.get("thumb_photo"): + sizes = p["thumb_photo"].get("sizes", []) + if sizes: + thumb = sizes[-1].get("url") + + row = db.query(VkCachedProduct).filter_by( + user_id=user_id, vk_group_id=group_id, vk_product_id=pid, + ).first() + if row: + row.album_id = album_id + row.name = p.get("title", "") + row.description = p.get("description") + row.price = price + row.availability = p.get("availability") + row.thumb_url = thumb + row.fetched_at = now + else: + db.add(VkCachedProduct( + user_id=user_id, + vk_group_id=group_id, + vk_product_id=pid, + album_id=album_id, + name=p.get("title", ""), + description=p.get("description"), + price=price, + availability=p.get("availability"), + thumb_url=thumb, + fetched_at=now, + )) + + db.commit() + logger.info( + "user=%s vk catalog synced: group=%s albums=%d products=%d", + user_id, group_id, len(albums), len(all_products), + ) + + +@shared_task( + name="web.tasks.vk_catalog.refresh_vk_catalog", + queue="default", + bind=True, + max_retries=2, + default_retry_delay=60, +) +def refresh_vk_catalog(self) -> dict: + """Fetch and cache VK Market albums and products for all connected users.""" + db = SessionLocal() + results = {"ok": 0, "failed": 0} + try: + connections = ( + db.query(VkConnection) + .filter( + VkConnection.user_id.isnot(None), + VkConnection.access_token.isnot(None), + VkConnection.access_token != "", + VkConnection.vk_user_id.isnot(None), + VkConnection.vk_user_id != "", + ) + .all() + ) + for conn in connections: + try: + _sync_user(db, conn.user_id, conn.access_token, conn.vk_user_id) + results["ok"] += 1 + except Exception as exc: + logger.error("vk catalog sync failed for user=%s: %s", conn.user_id, exc) + results["failed"] += 1 + finally: + db.close() + logger.info("refresh_vk_catalog done: %s", results) + return results diff --git a/web/templates/base.html b/web/templates/base.html index 88f20e5..fb26005 100644 --- a/web/templates/base.html +++ b/web/templates/base.html @@ -17,7 +17,8 @@