- Replace manual community token entry with OAuth button that redirects to VK authorization and auto-saves token via /vk/callback - Fix groups.get API call (was groups.getById) to correctly retrieve admin group id and name from user token response - Fix price comparison: VK price.amount is in roubles, not kopecks - Keep manual token input as fallback when VK_CLIENT_ID is not set Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
5.0 KiB
Python
169 lines
5.0 KiB
Python
from datetime import datetime
|
|
from urllib.parse import urlencode
|
|
|
|
import httpx
|
|
|
|
from fastapi import APIRouter, Request, Depends
|
|
from fastapi.responses import RedirectResponse
|
|
from web.templates_env import templates
|
|
from sqlalchemy.orm import Session
|
|
|
|
from web.auth import get_current_user
|
|
from web.config import settings
|
|
from web.database import get_db
|
|
from web.models import User, VkConnection
|
|
|
|
router = APIRouter(prefix="/vk")
|
|
|
|
VK_API_URL = "https://api.vk.com/method"
|
|
VK_OAUTH_URL = "https://oauth.vk.com/authorize"
|
|
|
|
|
|
async def _fetch_group_info(token: str) -> tuple[str | None, str | None]:
|
|
"""Returns (group_id, group_name) for the first admin group, or (None, None)."""
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.get(
|
|
f"{VK_API_URL}/groups.get",
|
|
params={
|
|
"access_token": token,
|
|
"v": settings.VK_API_VERSION,
|
|
"filter": "admin",
|
|
"extended": 1,
|
|
"count": 1,
|
|
},
|
|
timeout=15,
|
|
)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
if "error" not in data:
|
|
items = data.get("response", {}).get("items", [])
|
|
if items:
|
|
return str(items[0].get("id", "")), items[0].get("name")
|
|
except Exception:
|
|
pass
|
|
return None, None
|
|
|
|
|
|
def _save_connection(db: Session, user_id: int, token: str,
|
|
group_id: str | None, group_name: str | None) -> None:
|
|
now = datetime.utcnow()
|
|
connection = db.query(VkConnection).filter(VkConnection.user_id == user_id).first()
|
|
if connection:
|
|
connection.access_token = token
|
|
connection.vk_user_id = group_id
|
|
connection.first_name = group_name
|
|
connection.last_name = None
|
|
connection.is_online = True
|
|
connection.last_checked_at = now
|
|
else:
|
|
db.add(VkConnection(
|
|
user_id=user_id,
|
|
access_token=token,
|
|
vk_user_id=group_id,
|
|
first_name=group_name,
|
|
last_name=None,
|
|
is_online=True,
|
|
last_checked_at=now,
|
|
))
|
|
db.commit()
|
|
|
|
|
|
@router.get("")
|
|
def vk_page(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
user: User | None = Depends(get_current_user),
|
|
):
|
|
if not user:
|
|
return RedirectResponse("/login", 303)
|
|
|
|
connection = db.query(VkConnection).filter(VkConnection.user_id == user.id).first()
|
|
error = request.query_params.get("error")
|
|
return templates.TemplateResponse("vk.html", {
|
|
"request": request,
|
|
"user": user,
|
|
"connection": connection,
|
|
"error": error,
|
|
"vk_client_id": settings.VK_CLIENT_ID,
|
|
"callback_url": f"{settings.BASE_URL}/vk/callback",
|
|
})
|
|
|
|
|
|
@router.get("/connect")
|
|
def vk_connect(
|
|
request: Request,
|
|
user: User | None = Depends(get_current_user),
|
|
):
|
|
"""Redirect to VK OAuth authorization page."""
|
|
if not user:
|
|
return RedirectResponse("/login", 303)
|
|
|
|
if not settings.VK_CLIENT_ID:
|
|
return RedirectResponse("/vk?error=no_client_id", 303)
|
|
|
|
params = urlencode({
|
|
"client_id": settings.VK_CLIENT_ID,
|
|
"scope": "market,groups",
|
|
"redirect_uri": f"{settings.BASE_URL}/vk/callback",
|
|
"display": "page",
|
|
"response_type": "token",
|
|
"v": settings.VK_API_VERSION,
|
|
})
|
|
return RedirectResponse(f"{VK_OAUTH_URL}?{params}", 302)
|
|
|
|
|
|
@router.get("/callback")
|
|
def vk_callback(
|
|
request: Request,
|
|
user: User | None = Depends(get_current_user),
|
|
):
|
|
"""Landing page after VK OAuth. JS reads the token from the URL fragment and POSTs it."""
|
|
if not user:
|
|
return RedirectResponse("/login", 303)
|
|
|
|
return templates.TemplateResponse("vk_callback.html", {
|
|
"request": request,
|
|
"user": user,
|
|
})
|
|
|
|
|
|
@router.post("/token")
|
|
async def vk_token(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
user: User | None = Depends(get_current_user),
|
|
):
|
|
"""Save a VK user access token (from manual entry or OAuth callback)."""
|
|
if not user:
|
|
return RedirectResponse("/login", 303)
|
|
|
|
form = await request.form()
|
|
token = (form.get("token") or "").strip()
|
|
if not token:
|
|
return RedirectResponse("/vk?error=empty_token", 303)
|
|
|
|
group_id, group_name = await _fetch_group_info(token)
|
|
if not group_id:
|
|
return RedirectResponse("/vk?error=invalid_token", 303)
|
|
|
|
_save_connection(db, user.id, token, group_id, group_name)
|
|
return RedirectResponse("/connections", 303)
|
|
|
|
|
|
@router.post("/disconnect")
|
|
async def vk_disconnect(
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
user: User | None = Depends(get_current_user),
|
|
):
|
|
if not user:
|
|
return RedirectResponse("/login", 303)
|
|
|
|
connection = db.query(VkConnection).filter(VkConnection.user_id == user.id).first()
|
|
if connection:
|
|
db.delete(connection)
|
|
db.commit()
|
|
|
|
return RedirectResponse("/connections", 303)
|