# verivid / app / api / routes.py
# bahaeddinmselmi
# fix(mobile): optimize frame extraction with scaling and allow direct video URLs
# 418f08d
# C:\Users\bahae\.gemini\antigravity\scratch\verivid-ai\backend\app\api\routes.py
import hmac
import json
import os
import shutil
import uuid
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse

from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, Request, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from app.core.licensing import check_rate_limit, verify_gumroad_license, FREE_DAILY_LIMIT, get_remaining_analyses
from app.services.pipeline import run_analysis_pipeline
# Router that every endpoint in this module is registered on.
router = APIRouter()

# In-memory job registry keyed by job ID (a plain dict, so it does not
# survive a process restart).
JOBS = {}

# Storage locations, all resolved two directory levels above this file.
_HERE = os.path.dirname(__file__)
TEMP_DIR = os.path.join(_HERE, '..', '..', 'temp')
REPORTS_DIR = os.path.join(_HERE, '..', '..', 'reports')
EMAILS_FILE = os.path.join(_HERE, '..', '..', 'emails.json')

# The reports directory is created at import time; TEMP_DIR is created
# lazily by the upload path instead.
os.makedirs(REPORTS_DIR, exist_ok=True)
class LicenseCheck(BaseModel):
    """Request body carrying the license key to verify."""

    license_key: str
@router.get("/usage")
async def get_usage(
    request: Request,
    x_license_key: str = Header(None),
    x_fingerprint: str = Header(None),
    authorization: str = Header(None),
):
    """Return the remaining analyses for this caller without incrementing usage.

    The lookup is delegated to ``get_remaining_analyses`` with the client IP
    and the optional license key, fingerprint and Authorization headers.
    """
    return get_remaining_analyses(
        request.client.host,
        x_license_key,
        x_fingerprint,
        authorization,
    )
@router.post("/verify-license")
async def check_license(data: LicenseCheck):
    """Verify a Gumroad license key.

    The verification payload is returned unchanged when its ``valid`` entry is
    truthy; otherwise the same payload is sent back with HTTP 400.
    """
    verification = verify_gumroad_license(data.license_key)
    if not verification["valid"]:
        return JSONResponse(status_code=400, content=verification)
    return verification
# Domains accepted for URL-based analysis; sub-domains are accepted as well.
_SUPPORTED_PLATFORMS = (
    'tiktok.com', 'youtube.com', 'youtu.be', 'instagram.com',
    'twitter.com', 'x.com', 'facebook.com', 'fb.watch',
    'vimeo.com', 'dailymotion.com', 'twitch.tv',
)
# Path suffixes that mark a URL as a direct video file.
_DIRECT_VIDEO_EXTENSIONS = ('.mp4', '.mov', '.webm', '.avi', '.mkv')
# Upload constraints.
_ALLOWED_UPLOAD_TYPES = (
    'video/mp4', 'video/webm', 'video/quicktime', 'video/x-msvideo', 'video/x-matroska',
)
_ALLOWED_UPLOAD_EXTENSIONS = ['.mp4', '.webm', '.mov', '.avi', '.mkv']
_MAX_UPLOAD_BYTES = 100 * 1024 * 1024  # 100MB
_UPLOAD_CHUNK_BYTES = 1024 * 1024


def _url_rejection(url: str) -> Optional[str]:
    """Return the error message for an unacceptable URL, or None when it is fine."""
    if len(url) > 2000:
        return "URL too long"
    if not url.startswith(("http://", "https://")):
        return "Invalid URL format. Must start with http:// or https://"

    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower().rstrip(".")
    except ValueError:
        # urlparse rejects malformed authorities such as an unclosed "[".
        return "Invalid URL format. Must start with http:// or https://"

    # Match on the parsed host rather than on the raw string, so a platform
    # name hidden in the path or query string cannot satisfy the check.
    is_supported = any(
        host == platform or host.endswith("." + platform)
        for platform in _SUPPORTED_PLATFORMS
    )
    is_direct_video = parsed.path.lower().endswith(_DIRECT_VIDEO_EXTENSIONS)
    if not is_supported and not is_direct_video:
        return "Unsupported platform. We support: TikTok, YouTube, Instagram, Twitter/X, Facebook and direct video files (.mp4, .mov, etc.)"
    return None


async def _save_upload(file: UploadFile, destination: str) -> bool:
    """Stream an upload to ``destination``; return False if it exceeds the size cap.

    The partial file is removed when the cap is exceeded or when reading or
    writing fails, so nothing is left behind in the temp directory.
    """
    written = 0
    complete = False
    try:
        with open(destination, 'wb') as out:
            while True:
                chunk = await file.read(_UPLOAD_CHUNK_BYTES)
                if not chunk:
                    complete = True
                    break
                written += len(chunk)
                if written > _MAX_UPLOAD_BYTES:
                    break
                out.write(chunk)
    finally:
        if not complete and os.path.exists(destination):
            os.remove(destination)
    return complete


@router.post("/analyze")
async def start_analysis(
    request: Request,
    background_tasks: BackgroundTasks,
    url: str = Form(None),
    file: UploadFile = File(None),
    x_license_key: str = Header(None),
    x_fingerprint: str = Header(None),
    authorization: str = Header(None),
):
    """
    Start a video analysis job from a URL and/or an uploaded file.

    The request is first passed to ``check_rate_limit`` together with the
    client IP, license key, browser fingerprint and Authorization header; a
    refusal yields HTTP 429 carrying ``FREE_DAILY_LIMIT``. The limit policy
    itself (free vs. Pro users) lives in ``check_rate_limit``.

    Returns:
        A dict with ``job_id``, ``status`` ("queued") and ``remaining_free``
        on success, or a ``JSONResponse`` with status 400 (invalid input) or
        429 (rate limited).
    """
    # 1. Check Rate Limit / License
    client_ip = request.client.host
    limit_check = check_rate_limit(client_ip, x_license_key, x_fingerprint, authorization)
    if not limit_check["allowed"]:
        return JSONResponse(
            status_code=429,
            content={
                "error": limit_check["message"],
                "is_rate_limit": True,
                "limit": FREE_DAILY_LIMIT,
            },
        )

    # 2. Validate input before any job state or file is created.
    has_upload = bool(file and file.filename)
    if not url and not has_upload:
        return JSONResponse(
            status_code=400,
            content={"error": "Please provide either a URL or upload a file"},
        )

    if url:
        url_error = _url_rejection(url)
        if url_error:
            return JSONResponse(status_code=400, content={"error": url_error})

    file_ext = None
    if has_upload:
        # Check content type
        if file.content_type and file.content_type not in _ALLOWED_UPLOAD_TYPES:
            return JSONResponse(
                status_code=400,
                content={"error": "Invalid file type. Allowed: MP4, WebM, MOV, AVI, MKV"},
            )
        # Check file extension as backup
        file_ext = os.path.splitext(file.filename)[1].lower()
        if file_ext not in _ALLOWED_UPLOAD_EXTENSIONS:
            return JSONResponse(
                status_code=400,
                content={"error": f"Invalid file extension. Allowed: {', '.join(_ALLOWED_UPLOAD_EXTENSIONS)}"},
            )

    # 3. Persist the upload, streaming it so memory use stays bounded.
    job_id = str(uuid.uuid4())
    file_path = None
    if has_upload:
        os.makedirs(TEMP_DIR, exist_ok=True)
        file_path = os.path.join(TEMP_DIR, f"{job_id}_upload{file_ext}")
        if not await _save_upload(file, file_path):
            return JSONResponse(status_code=400, content={"error": "File too large (max 100MB)"})

    # 4. Register the job only once every check has passed, so rejected
    #    requests never leave an orphan "queued" entry behind.
    JOBS[job_id] = {"status": "queued"}

    # Start async processing
    background_tasks.add_task(
        run_analysis_pipeline,
        job_id,
        url,
        file_path,
        JOBS,
    )

    return {
        "job_id": job_id,
        "status": "queued",
        "remaining_free": limit_check.get("remaining", "unlimited"),
    }
@router.get("/analyze/{job_id}")
async def get_result(request: Request, job_id: str):
    """Look up the stored state of an analysis job.

    Responds with HTTP 400 when ``job_id`` does not parse as a UUID and with
    HTTP 404 when ``JOBS`` holds no (non-empty) entry for it; otherwise the
    stored job entry is returned as-is.
    """
    try:
        uuid.UUID(job_id)
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid job ID"})

    job_state = JOBS.get(job_id)
    if job_state:
        return job_state
    return JSONResponse(status_code=404, content={"status": "not_found"})
@router.get("/health")
async def health_check():
    """Liveness probe that also reports how many jobs are held in memory."""
    job_count = len(JOBS)
    return {"status": "ok", "jobs_in_memory": job_count}
class ReportData(BaseModel):
    """Payload for saving a shareable analysis report."""

    # Required fields.
    report_id: str
    score: int
    recommendation: str
    confidence: str
    # Optional sections. Declared Optional so that both an omitted field and
    # an explicit JSON null validate (a bare `str = None` only covers omission
    # under pydantic v2).
    explanation: Optional[str] = None
    signals: Optional[dict] = None
    video_info: Optional[dict] = None
    disclaimer: Optional[str] = None
@router.post("/reports")
async def save_report(data: ReportData):
    """Save a report for sharing and return its shareable ID.

    ``report_id`` must parse as a UUID. It becomes part of the file name under
    ``REPORTS_DIR``, so anything else (for example a value containing path
    separators) is rejected with HTTP 400. This mirrors the check the report
    lookup endpoint performs before reading.

    Returns:
        ``{"success": True, "report_id": ...}`` on success, or a
        ``JSONResponse`` with status 400 (invalid ID) or 500 (write failure).
    """
    try:
        uuid.UUID(data.report_id)
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid report ID"})

    report = data.dict()
    report["created_at"] = datetime.utcnow().isoformat()

    report_path = os.path.join(REPORTS_DIR, f"{data.report_id}.json")
    try:
        with open(report_path, 'w') as f:
            json.dump(report, f)
    except (OSError, TypeError, ValueError):
        # Keep internal details (paths, OS errors) out of the response body.
        return JSONResponse(status_code=500, content={"error": "Failed to save report"})

    return {"success": True, "report_id": data.report_id}
@router.get("/reports/{report_id}")
async def get_report(report_id: str):
    """Return a previously saved report.

    Responds with HTTP 400 when ``report_id`` does not parse as a UUID, 404
    when no matching file exists under ``REPORTS_DIR``, and 500 when the file
    cannot be read or decoded.
    """
    try:
        uuid.UUID(report_id)
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid report ID"})

    stored_file = os.path.join(REPORTS_DIR, f"{report_id}.json")
    if not os.path.exists(stored_file):
        return JSONResponse(status_code=404, content={"error": "Report not found"})

    try:
        with open(stored_file, 'r') as handle:
            return json.load(handle)
    except Exception:
        return JSONResponse(status_code=500, content={"error": "Failed to load report"})
class EmailSubmission(BaseModel):
    """Request body holding a single submitted email address."""

    email: str
@router.post("/emails")
async def save_email(data: EmailSubmission):
    """Save an email address for lead generation.

    The address is trimmed and lower-cased, checked for a plausible
    ``local@domain.tld`` shape, and appended to ``EMAILS_FILE`` unless it is
    already stored.

    Returns:
        ``{"success": True, "message": "Email saved"}`` on success (also when
        the address was already present), or a ``JSONResponse`` with status
        400 (invalid address) or 500 (storage failure).
    """
    email = data.email.strip().lower()

    # Basic validation: exactly one "@", a non-empty local part, and a domain
    # that contains a dot but neither starts nor ends with one.
    local_part, _, domain = email.rpartition("@")
    looks_valid = (
        bool(local_part)
        and "@" not in local_part
        and "." in domain
        and not domain.startswith(".")
        and not domain.endswith(".")
        and not any(ch.isspace() for ch in email)
    )
    if not looks_valid:
        return JSONResponse(status_code=400, content={"error": "Invalid email"})

    try:
        # Load existing emails
        emails = []
        if os.path.exists(EMAILS_FILE):
            with open(EMAILS_FILE, 'r') as f:
                emails = json.load(f)

        already_stored = any(entry.get("email") == email for entry in emails)
        if not already_stored:
            emails.append({
                "email": email,
                "created_at": datetime.utcnow().isoformat(),
            })
            # Write to a sibling file and swap it in, so an interrupted write
            # cannot leave a truncated emails file behind.
            pending_file = EMAILS_FILE + ".tmp"
            with open(pending_file, 'w') as f:
                json.dump(emails, f, indent=2)
            os.replace(pending_file, EMAILS_FILE)
    except Exception:
        # Generic message: do not leak paths or parser errors to the client.
        return JSONResponse(status_code=500, content={"error": "Failed to save email"})

    return {"success": True, "message": "Email saved"}
@router.get("/admin/emails")
async def get_emails(admin_key: str = ""):
    """
    Retrieve collected emails.

    Protected by a simple admin key passed as the ``admin_key`` query
    parameter. The expected key is read from the ``VERIVID_ADMIN_KEY``
    environment variable when that is set to a non-empty value, and falls
    back to the built-in key otherwise.

    Returns:
        The stored list of email entries (an empty list when nothing has been
        saved yet), or a ``JSONResponse`` with status 403 (wrong key) or 500
        (the file could not be read).
    """
    # NOTE(review): the built-in fallback key lives in source control; set
    # VERIVID_ADMIN_KEY in production. `or` (not a .get default) is used so an
    # empty variable can never match the empty default of `admin_key`.
    expected_key = os.environ.get("VERIVID_ADMIN_KEY") or "verivid_admin_secret_2024"

    # Constant-time comparison avoids leaking the key through response timing.
    # Comparing bytes also keeps non-ASCII input from raising.
    if not hmac.compare_digest(admin_key.encode("utf-8"), expected_key.encode("utf-8")):
        return JSONResponse(status_code=403, content={"error": "Unauthorized"})

    if not os.path.exists(EMAILS_FILE):
        return []

    try:
        with open(EMAILS_FILE, 'r') as f:
            return json.load(f)
    except Exception:
        return JSONResponse(status_code=500, content={"error": "Failed to load emails"})
class BatchAnalysisRequest(BaseModel):
    """Request body listing the video URLs to analyze in one batch."""

    urls: list[str]
@router.post("/analyze/batch")
async def batch_analysis(
    request: Request,
    background_tasks: BackgroundTasks,
    data: BatchAnalysisRequest,
    x_license_key: str = Header(None),
):
    """
    Start batch analysis (PRO feature).

    Requires a license key that ``verify_gumroad_license`` reports as valid.
    Accepts at most 5 non-blank URLs per batch; each must be an http(s) URL of
    at most 2000 characters. One background job is queued per URL.

    Returns:
        A dict with a fresh ``batch_id``, the list of ``{"url", "job_id"}``
        pairs and a summary ``message``; or a ``JSONResponse`` with status
        403 (missing/invalid license) or 400 (invalid URL list).
    """
    # 1. Enforce Pro License
    if not x_license_key:
        return JSONResponse(
            status_code=403,
            content={"error": "Batch analysis is a Pro feature. Please upgrade."},
        )

    license_status = verify_gumroad_license(x_license_key)
    if not license_status["valid"]:
        return JSONResponse(
            status_code=403,
            content={"error": "Invalid license key"},
        )

    # 2. Validate Requests
    # Strip surrounding whitespace so the pipeline receives the clean URL,
    # not just a filter decision based on it.
    urls = [u.strip() for u in data.urls if u.strip()]
    if not urls:
        return JSONResponse(status_code=400, content={"error": "No URLs provided"})

    if len(urls) > 5:
        return JSONResponse(
            status_code=400,
            content={"error": "Batch limit is 5 videos at a time."},
        )

    # Reject the whole batch before starting anything if any URL is malformed,
    # so non-HTTP schemes never reach the download pipeline.
    for url in urls:
        if len(url) > 2000:
            return JSONResponse(status_code=400, content={"error": "URL too long"})
        if not url.startswith(("http://", "https://")):
            return JSONResponse(
                status_code=400,
                content={"error": "Invalid URL format. Must start with http:// or https://"},
            )

    # 3. Start Jobs
    jobs_started = []
    for url in urls:
        job_id = str(uuid.uuid4())
        JOBS[job_id] = {"status": "processing", "url": url}

        # Add to background task
        background_tasks.add_task(
            run_analysis_pipeline,
            job_id,
            url,
            None,  # No file path for URL analysis
            JOBS,
        )
        jobs_started.append({"url": url, "job_id": job_id})

    return {
        "batch_id": str(uuid.uuid4()),
        "jobs": jobs_started,
        "message": f"Started {len(jobs_started)} analyses",
    }