Spaces:
Paused
Paused
| from datetime import datetime, timezone | |
| from dateutil import parser | |
| from fastapi import APIRouter, Request | |
| from pydantic import BaseModel | |
| from src.common.logger import logger | |
| from src.common.utils import get_client_ip, response_error, response_success, rsa_sign | |
| from src.db.supabase_client import ( | |
| get_first_activated_device, | |
| get_license, | |
| insert_activation, | |
| supabase, | |
| ) | |
| router = APIRouter() | |
class ActivationRequest(BaseModel):
    """Request body for a license verification/activation call."""

    # Compared against the license's ``user_id`` field by ``verify``.
    email: str
    # Key used to look the license up.
    license_key: str
    # Identifier of the device asking to be verified.
    device_id: str
def verify(req: ActivationRequest, request: Request):
    """Verify a license for a device and return a signed license payload.

    Checks, in order, that the license exists, that its ``user_id`` equals
    ``req.email``, that it is not already bound to a different device, that
    its status is ``"active"`` and that it has not expired. On success the
    tool and purchased features are loaded, the license object is signed
    with ``rsa_sign``, a first activation is recorded when no device was
    bound yet, and ``last_verified_at`` is updated on the license row.

    Args:
        req: Activation payload carrying email, license key and device id.
        request: Incoming request; passed to ``get_client_ip`` when a new
            activation is recorded.

    Returns:
        ``response_success`` with ``{"license": ..., "signature": ...}`` on
        success, otherwise ``response_error`` with one of the codes
        ``LICENSE_NOT_FOUND``, ``EMAIL_NOT_MATCH``, ``DEVICE_LIMIT``,
        ``LICENSE_INACTIVE``, ``LICENSE_EXPIRED`` or ``TOOL_NOT_FOUND``.
    """
    # NOTE(review): this writes the full license key to the log — confirm
    # that is acceptable for the log retention/access policy.
    logger.info(
        f"Verifying license for email: {req.email}, device_id: {req.device_id}, license_key: {req.license_key}"
    )

    lic = get_license(req.license_key)
    if not lic:
        return response_error("LICENSE_NOT_FOUND", "License not found", 404)

    if lic["user_id"] != req.email:
        return response_error("EMAIL_NOT_MATCH", "Email mismatch", 403)

    activated_device = get_first_activated_device(lic["id"])
    if activated_device and activated_device["device_id"] != req.device_id:
        return response_error("DEVICE_LIMIT", "Bound to another device", 403)

    if lic["status"] != "active":
        return response_error("LICENSE_INACTIVE", "Inactive license", 403)

    if lic["expires_at"]:
        exp = parser.isoparse(lic["expires_at"])
        if exp.tzinfo is None:
            # A timestamp stored without an offset parses as naive, and
            # comparing naive with aware datetimes raises TypeError.
            # Assumes such values are UTC — TODO confirm storage convention.
            exp = exp.replace(tzinfo=timezone.utc)
        if exp < datetime.now(timezone.utc):
            return response_error("LICENSE_EXPIRED", "License expired", 403)

    tool_res = (
        supabase.table("tools")
        .select("id, name")
        .eq("id", lic["tool_id"])
        .limit(1)
        .execute()
    )
    tool = tool_res.data[0] if tool_res.data else None
    if tool is None:
        # Without this guard the payload construction below would fail with
        # a TypeError on ``tool["name"]`` and surface as an unhandled error.
        logger.error(
            f"Tool {lic['tool_id']} referenced by license {lic['id']} not found"
        )
        return response_error("TOOL_NOT_FOUND", "Tool not found", 404)

    features_res = (
        supabase.table("license_features")
        .select("feature_id, tool_features ( id, name )")
        .eq("license_id", lic["id"])
        .execute()
    )
    # Keep only rows whose joined ``tool_features`` record is present.
    purchased_features = [
        row["tool_features"]
        for row in (features_res.data or [])
        if row.get("tool_features")
    ]

    # Reuse the timestamp stored on the existing activation when there is
    # one; otherwise stamp the payload with the current UTC time.
    verified_at = (
        activated_device.get("verified_at")
        if activated_device and activated_device.get("verified_at")
        else datetime.now(timezone.utc).isoformat()
    )

    license_obj = {
        "product": tool["name"],
        "product_id": tool["id"],
        "features": purchased_features,
        "license_key": req.license_key,
        "device_id": req.device_id,
        "user_id": lic["user_id"],
        "period_type": lic["period_type"],
        "verified_at": verified_at,
        "expires_at": lic["expires_at"],
    }
    signature = rsa_sign(license_obj)

    # First verification for this license: record the activation.
    if not activated_device:
        insert_activation(
            lic["id"], req.license_key, req.model_dump(), get_client_ip(request)
        )

    supabase.table("licenses").update(
        {"last_verified_at": datetime.now(timezone.utc).isoformat()}
    ).eq("id", lic["id"]).execute()

    return response_success({"license": license_obj, "signature": signature})