Spaces:
Sleeping
Sleeping
bahaeddinmselmi
fix(mobile): optimize frame extraction with scaling and allow direct video URLs
418f08d | # C:\Users\bahae\.gemini\antigravity\scratch\verivid-ai\backend\app\api\routes.py | |
import hmac
import json
import os
import shutil
import uuid
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, Request, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from app.core.licensing import check_rate_limit, verify_gumroad_license, FREE_DAILY_LIMIT, get_remaining_analyses
from app.services.pipeline import run_analysis_pipeline
router = APIRouter()

# In-memory job store: job_id -> status/result dict, mutated by the pipeline.
# NOTE(review): not persistent and not shared across worker processes — jobs
# vanish on restart; confirm this runs single-worker.
JOBS = {}

# Scratch/storage locations resolved relative to this file (app/api/ -> backend root).
TEMP_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'temp')
REPORTS_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'reports')
EMAILS_FILE = os.path.join(os.path.dirname(__file__), '..', '..', 'emails.json')

# Ensure reports directory exists
os.makedirs(REPORTS_DIR, exist_ok=True)
class LicenseCheck(BaseModel):
    """Request body for license verification: the Gumroad license key to check."""
    license_key: str
async def get_usage(
    request: Request,
    x_license_key: str = Header(None),
    x_fingerprint: str = Header(None),
    authorization: str = Header(None)
):
    """Report how many analyses this caller has left, without consuming one."""
    # Delegate entirely to the licensing service; identity is the client IP
    # plus optional license key / fingerprint / auth token headers.
    return get_remaining_analyses(
        request.client.host, x_license_key, x_fingerprint, authorization
    )
async def check_license(data: LicenseCheck):
    """Verify a Gumroad license key: 200 with details if valid, 400 otherwise."""
    result = verify_gumroad_license(data.license_key)
    # Invalid keys get the same payload back, just with a 400 status.
    if not result["valid"]:
        return JSONResponse(status_code=400, content=result)
    return result
# Extensions accepted both for direct-link videos and for uploads.
_VIDEO_EXTENSIONS = ('.mp4', '.mov', '.webm', '.avi', '.mkv')

# Platforms the download pipeline knows how to fetch from.
_SUPPORTED_PLATFORMS = (
    'tiktok.com', 'youtube.com', 'youtu.be', 'instagram.com',
    'twitter.com', 'x.com', 'facebook.com', 'fb.watch',
    'vimeo.com', 'dailymotion.com', 'twitch.tv'
)


def _validate_video_url(url):
    """Return an error message for a bad video URL, or None if acceptable.

    Accepts http(s) URLs that are either hosted on a supported platform or
    point at a direct video file (extension checked with the query string
    and fragment stripped).
    """
    if len(url) > 2000:
        return "URL too long"
    if not url.startswith(("http://", "https://")):
        return "Invalid URL format. Must start with http:// or https://"
    url_lower = url.lower()
    if any(platform in url_lower for platform in _SUPPORTED_PLATFORMS):
        return None
    # Strip query string AND fragment before looking for a file extension
    # (the original missed '#', so 'video.webm#t=10' style URLs were flaky).
    path = url_lower.split("?")[0].split("#")[0]
    if any(ext in path for ext in _VIDEO_EXTENSIONS):
        return None
    return "Unsupported platform. We support: TikTok, YouTube, Instagram, Twitter/X, Facebook and direct video files (.mp4, .mov, etc.)"


async def _save_upload(file, job_id, file_ext):
    """Stream an upload to TEMP_DIR in 1MB chunks, enforcing the 100MB cap.

    Returns (file_path, None) on success, or (None, JSONResponse) on failure.
    Streaming avoids buffering the entire (possibly oversize) upload in
    memory the way a single file.read() would; a partial file is removed
    if the limit is exceeded.
    """
    MAX_SIZE = 100 * 1024 * 1024  # 100MB
    CHUNK = 1024 * 1024
    os.makedirs(TEMP_DIR, exist_ok=True)
    file_path = os.path.join(TEMP_DIR, f"{job_id}_upload{file_ext}")
    written = 0
    too_large = False
    with open(file_path, 'wb') as f:
        while chunk := await file.read(CHUNK):
            written += len(chunk)
            if written > MAX_SIZE:
                too_large = True
                break
            f.write(chunk)
    if too_large:
        os.remove(file_path)  # don't leave a truncated temp file behind
        return None, JSONResponse(status_code=400, content={"error": "File too large (max 100MB)"})
    return file_path, None


async def start_analysis(
    request: Request,
    background_tasks: BackgroundTasks,
    url: str = Form(None),
    file: UploadFile = File(None),
    x_license_key: str = Header(None),
    x_fingerprint: str = Header(None),
    authorization: str = Header(None)
):
    """
    Start video analysis.
    Enforces the free daily limit for free users.
    Unlimited for Pro users (with valid x-license-key or Supabase Auth).
    Uses browser fingerprint for VPN-resistant tracking.
    Accepts either a platform / direct-video URL or an uploaded video file.
    """
    # 1. Check rate limit / license (fingerprint makes it VPN-resistant).
    limit_check = check_rate_limit(
        request.client.host, x_license_key, x_fingerprint, authorization
    )
    if not limit_check["allowed"]:
        return JSONResponse(
            status_code=429,
            content={
                "error": limit_check["message"],
                "is_rate_limit": True,
                "limit": FREE_DAILY_LIMIT
            }
        )

    # 2. At least one input source is required.
    if not url and not (file and file.filename):
        return JSONResponse(
            status_code=400,
            content={"error": "Please provide either a URL or upload a file"}
        )

    # 3. URL validation (platform allow-list or direct video link).
    if url:
        error = _validate_video_url(url)
        if error:
            return JSONResponse(status_code=400, content={"error": error})

    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "queued"}

    # 4. Uploaded file: validate MIME type and extension, then stream to disk.
    file_path = None
    if file and file.filename:
        ALLOWED_TYPES = ['video/mp4', 'video/webm', 'video/quicktime',
                         'video/x-msvideo', 'video/x-matroska']
        # Content type is browser-supplied; extension check below is the backup.
        if file.content_type and file.content_type not in ALLOWED_TYPES:
            return JSONResponse(
                status_code=400,
                content={"error": "Invalid file type. Allowed: MP4, WebM, MOV, AVI, MKV"}
            )
        file_ext = os.path.splitext(file.filename)[1].lower()
        if file_ext not in _VIDEO_EXTENSIONS:
            return JSONResponse(
                status_code=400,
                content={"error": f"Invalid file extension. Allowed: {', '.join(_VIDEO_EXTENSIONS)}"}
            )
        file_path, error_response = await _save_upload(file, job_id, file_ext)
        if error_response:
            return error_response

    # 5. Run the pipeline in the background; the client polls /result/{job_id}.
    background_tasks.add_task(
        run_analysis_pipeline,
        job_id,
        url,
        file_path,
        JOBS
    )
    return {
        "job_id": job_id,
        "status": "queued",
        "remaining_free": limit_check.get("remaining", "unlimited")
    }
async def get_result(request: Request, job_id: str):
    """Return the current status/result for a job; 404 if unknown."""
    # Only UUID-shaped IDs are valid — that's how jobs are issued.
    try:
        uuid.UUID(job_id)
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid job ID"})
    job = JOBS.get(job_id)
    if not job:
        return JSONResponse(status_code=404, content={"status": "not_found"})
    return job
async def health_check():
    """Liveness probe; also reports how many jobs are held in memory."""
    payload = {"status": "ok"}
    payload["jobs_in_memory"] = len(JOBS)
    return payload
class ReportData(BaseModel):
    """Model for saving a shareable analysis report."""
    # Required fields.
    report_id: str
    score: int
    recommendation: str
    confidence: str
    # Optional enrichment fields. The original annotated these as plain
    # str/dict with a None default — invalid typing that pydantic v2
    # rejects outright; Optional[...] restores the intended contract.
    explanation: Optional[str] = None
    signals: Optional[dict] = None
    video_info: Optional[dict] = None
    disclaimer: Optional[str] = None
async def save_report(data: ReportData):
    """Save a report for sharing. Returns the shareable ID.

    report_id must be a UUID: it is interpolated into a filename under
    REPORTS_DIR, so validating it here prevents path traversal (e.g. a
    report_id of '../../x') and mirrors the check get_report performs
    on the read side.
    """
    try:
        uuid.UUID(data.report_id)
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid report ID"})
    try:
        report = data.dict()
        report["created_at"] = datetime.utcnow().isoformat()
        # Persist one JSON file per report.
        report_path = os.path.join(REPORTS_DIR, f"{data.report_id}.json")
        with open(report_path, 'w') as f:
            json.dump(report, f)
        return {"success": True, "report_id": data.report_id}
    except Exception as e:
        # NOTE(review): str(e) may leak internal details to the client;
        # kept for API parity, but consider a generic message.
        return JSONResponse(status_code=500, content={"error": str(e)})
async def get_report(report_id: str):
    """Get a saved report by ID for sharing."""
    # Only UUID-shaped IDs may be turned into a filesystem path.
    try:
        uuid.UUID(report_id)
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid report ID"})
    path = os.path.join(REPORTS_DIR, f"{report_id}.json")
    if not os.path.exists(path):
        return JSONResponse(status_code=404, content={"error": "Report not found"})
    try:
        with open(path, 'r') as f:
            return json.load(f)
    except Exception:
        # Corrupt/unreadable file on disk — don't leak details.
        return JSONResponse(status_code=500, content={"error": "Failed to load report"})
class EmailSubmission(BaseModel):
    """Request body for the lead-capture endpoint: the email address to store."""
    email: str
async def save_email(data: EmailSubmission):
    """Save email for lead generation."""
    email = data.email.strip().lower()
    # Deliberately loose sanity check — just enough to weed out obvious junk.
    if not email or "@" not in email or "." not in email:
        return JSONResponse(status_code=400, content={"error": "Invalid email"})
    try:
        # Load whatever has been collected so far (file may not exist yet).
        collected = []
        if os.path.exists(EMAILS_FILE):
            with open(EMAILS_FILE, 'r') as f:
                collected = json.load(f)
        # Append only if this address hasn't been captured before.
        if not any(entry.get("email") == email for entry in collected):
            collected.append({
                "email": email,
                "created_at": datetime.utcnow().isoformat()
            })
        # Persist the (possibly unchanged) list back to disk.
        with open(EMAILS_FILE, 'w') as f:
            json.dump(collected, f, indent=2)
        return {"success": True, "message": "Email saved"}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
async def get_emails(admin_key: str = ""):
    """
    Retrieve collected emails.

    Protected by a simple admin key. The key may be overridden via the
    VERIVID_ADMIN_KEY environment variable; it falls back to the legacy
    hardcoded value so existing callers keep working, but the secret
    should be moved out of source control.
    """
    expected = os.environ.get("VERIVID_ADMIN_KEY", "verivid_admin_secret_2024")
    # Constant-time comparison avoids leaking the key via timing differences.
    if not hmac.compare_digest(admin_key.encode("utf-8"), expected.encode("utf-8")):
        return JSONResponse(status_code=403, content={"error": "Unauthorized"})
    if not os.path.exists(EMAILS_FILE):
        return []
    try:
        with open(EMAILS_FILE, 'r') as f:
            return json.load(f)
    except Exception:
        return JSONResponse(status_code=500, content={"error": "Failed to load emails"})
class BatchAnalysisRequest(BaseModel):
    """Request body for batch analysis: the list of video URLs to process."""
    urls: list[str]
async def batch_analysis(
    request: Request,
    background_tasks: BackgroundTasks,
    data: BatchAnalysisRequest,
    x_license_key: str = Header(None)
):
    """
    Start batch analysis (PRO feature).
    Requires a valid license key.
    Limits to 5 URLs per batch. Returns one job_id per URL so the client
    can poll each result individually.
    """
    # 1. Enforce Pro license.
    if not x_license_key:
        return JSONResponse(
            status_code=403,
            content={"error": "Batch analysis is a Pro feature. Please upgrade."}
        )
    license_status = verify_gumroad_license(x_license_key)
    if not license_status["valid"]:
        return JSONResponse(
            status_code=403,
            content={"error": "Invalid license key"}
        )

    # 2. Validate the URL list. The original filtered blanks but forwarded
    # URLs with surrounding whitespace to the pipeline; strip them here.
    urls = [u.strip() for u in data.urls if u.strip()]
    if not urls:
        return JSONResponse(status_code=400, content={"error": "No URLs provided"})
    if len(urls) > 5:
        return JSONResponse(
            status_code=400,
            content={"error": "Batch limit is 5 videos at a time."}
        )
    # Reject obviously malformed URLs up front, mirroring start_analysis.
    for u in urls:
        if not u.startswith(("http://", "https://")):
            return JSONResponse(
                status_code=400,
                content={"error": f"Invalid URL: {u}"}
            )

    # 3. Queue one pipeline job per URL.
    jobs_started = []
    for u in urls:
        job_id = str(uuid.uuid4())
        # NOTE(review): set to "processing" here vs "queued" in
        # start_analysis — kept as-is in case the client keys on it,
        # but the two endpoints should probably agree.
        JOBS[job_id] = {"status": "processing", "url": u}
        background_tasks.add_task(
            run_analysis_pipeline,
            job_id,
            u,
            None,  # no file path for URL analysis
            JOBS
        )
        jobs_started.append({"url": u, "job_id": job_id})

    return {
        "batch_id": str(uuid.uuid4()),
        "jobs": jobs_started,
        "message": f"Started {len(jobs_started)} analyses"
    }