import os import uuid import asyncio import tempfile import logging import requests from pathlib import Path from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from playwright.async_api import async_playwright # ── Logging ────────────────────────────────────────────────────────────────── logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger(__name__) # ── Cloudinary — unsigned upload (no API key needed) ───────────────────────── CLOUD_NAME = os.environ.get("CLOUDINARY_CLOUD_NAME", "doxoms9hd") UPLOAD_PRESET = os.environ.get("CLOUDINARY_UPLOAD_PRESET", "testing") # ── FastAPI ─────────────────────────────────────────────────────────────────── app = FastAPI(title="HTML → Video Recorder", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # ── Request / Response models ───────────────────────────────────────────────── class RecordRequest(BaseModel): html_url: str # URL of the HTML page to record duration_ms: int = 3000 # how long to record (ms) width: int = 1080 # viewport width (px) height: int = 1920 # viewport height (px) fps: int = 30 # not used by Playwright directly but kept for docs format: str = "mp4" # "mp4" or "webm" class RecordResponse(BaseModel): video_url: str duration_ms: int width: int height: int # ── Core recording logic ────────────────────────────────────────────────────── async def record_html( html_url: str, duration_ms: int, width: int, height: int, ) -> str: """ Opens html_url in a headless Chromium browser, records the viewport for duration_ms milliseconds, and returns the local path to the webm file. """ video_dir = Path(tempfile.mkdtemp(dir="/tmp/videos")) log.info(f"Recording {html_url} for {duration_ms}ms → {video_dir}") async with async_playwright() as p: browser = await p.chromium.launch( args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--disable-software-rasterizer", ] ) context = await browser.new_context( viewport={"width": width, "height": height}, record_video_dir=str(video_dir), record_video_size={"width": width, "height": height}, device_scale_factor=1, ) page = await context.new_page() try: # Navigate and wait for network + animations to settle await page.goto(html_url, wait_until="networkidle", timeout=30_000) log.info("Page loaded, recording animation…") except Exception as e: log.warning(f"networkidle timed out ({e}), continuing anyway") # Let the animation play await asyncio.sleep(duration_ms / 1000) # Grab the video path BEFORE closing the context (it finalises the file) video_path = await page.video.path() await context.close() await browser.close() log.info(f"Video saved: {video_path}") return str(video_path) async def convert_to_mp4(webm_path: str) -> str: """Converts a .webm file to .mp4 using ffmpeg for maximum compatibility.""" mp4_path = webm_path.replace(".webm", ".mp4") proc = await asyncio.create_subprocess_exec( "ffmpeg", "-y", "-i", webm_path, "-c:v", "libx264", "-preset", "ultrafast", # as fast as possible "-crf", "20", "-movflags", "+faststart", "-an", # no audio (pure animation) mp4_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) _, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"ffmpeg failed: {stderr.decode()}") log.info(f"Converted to MP4: {mp4_path}") return mp4_path def upload_to_cloudinary(file_path: str) -> str: """Unsigned upload to Cloudinary using upload preset — no API key needed.""" public_id = f"html_recordings/{uuid.uuid4().hex}" log.info(f"Uploading {file_path} → Cloudinary (unsigned) {public_id}") url = f"https://api.cloudinary.com/v1_1/{CLOUD_NAME}/video/upload" with open(file_path, "rb") as f: resp = requests.post(url, data={ "upload_preset": UPLOAD_PRESET, "public_id": public_id, }, files={"file": f}) resp.raise_for_status() return resp.json()["secure_url"] # ── Endpoints ───────────────────────────────────────────────────────────────── @app.get("/health") async def health(): return {"status": "ok"} @app.post("/record", response_model=RecordResponse) async def record(req: RecordRequest): if req.duration_ms < 100 or req.duration_ms > 60_000: raise HTTPException(400, "duration_ms must be between 100 and 60000") if req.width < 100 or req.width > 3840: raise HTTPException(400, "width must be between 100 and 3840") if req.height < 100 or req.height > 3840: raise HTTPException(400, "height must be between 100 and 3840") try: # 1. Record the page webm_path = await record_html(req.html_url, req.duration_ms, req.width, req.height) # 2. Convert to MP4 (unless user asked for webm) if req.format.lower() == "mp4": final_path = await convert_to_mp4(webm_path) else: final_path = webm_path # 3. Upload to Cloudinary video_url = upload_to_cloudinary(final_path) # 4. Cleanup try: os.remove(webm_path) if final_path != webm_path: os.remove(final_path) except Exception: pass return RecordResponse( video_url=video_url, duration_ms=req.duration_ms, width=req.width, height=req.height, ) except Exception as e: log.error(f"Recording failed: {e}", exc_info=True) raise HTTPException(500, f"Recording failed: {str(e)}")