import json import subprocess import tempfile from pathlib import Path import httpx from fastapi import FastAPI, HTTPException from pydantic import BaseModel app = FastAPI(title="Media Processor") class ProcessRequest(BaseModel): asset_id: str video_url: str api_url: str api_key: str class ThumbnailRequest(BaseModel): asset_id: str file_url: str api_url: str api_key: str # ── Helpers ────────────────────────────────────────────── async def _download_file(url: str, dest: Path): """Stream-download a file to disk.""" async with httpx.AsyncClient(timeout=120) as client: async with client.stream("GET", url) as resp: if resp.status_code != 200: raise HTTPException( status_code=400, detail=f"Failed to download: {resp.status_code}", ) with open(dest, "wb") as f: async for chunk in resp.aiter_bytes(chunk_size=65536): f.write(chunk) def _resize_to_thumbnail(src: Path, dest: Path): """Resize image to width=300, height proportional.""" subprocess.run( [ "ffmpeg", "-i", str(src), "-vf", "scale=300:-1", "-q:v", "3", str(dest), ], capture_output=True, ) async def _upload_image(file_path: Path, filename: str, api_url: str, api_key: str) -> str | None: """Upload an image file to asset-service and return its file URL (with filename).""" if not file_path.exists(): return None async with httpx.AsyncClient(timeout=60) as client: with open(file_path, "rb") as f: resp = await client.post( f"{api_url}/api/assets/upload", headers={"Authorization": f"Bearer {api_key}"}, files={"file": (filename, f, "image/jpeg")}, data={"type": "image", "title": filename, "app": "media_processor"}, ) if resp.status_code in (200, 201): asset = resp.json().get("asset", {}) # Use the url from response directly (already has filename in path) return asset.get("url") or asset.get("asset_details", {}).get("url") return None # ── Endpoints ──────────────────────────────────────────── @app.get("/health") async def health(): ffmpeg_ok = subprocess.run( ["ffmpeg", "-version"], capture_output=True ).returncode == 0 return {"status": "ok", "ffmpeg": ffmpeg_ok} @app.post("/thumbnail") async def generate_thumbnail(req: ThumbnailRequest): """Generate a 300px-wide thumbnail for an image asset.""" tmp_dir = tempfile.mkdtemp() src_path = Path(tmp_dir) / f"{req.asset_id}_src" thumb_path = Path(tmp_dir) / f"{req.asset_id}_thumb.jpg" try: # 1. Download image await _download_file(req.file_url, src_path) # 2. Resize to 300px width _resize_to_thumbnail(src_path, thumb_path) # 3. Upload thumbnail thumbnail_url = await _upload_image( thumb_path, f"{req.asset_id}_thumb.jpg", req.api_url, req.api_key ) # 4. Update asset with thumbnail_url if thumbnail_url: async with httpx.AsyncClient(timeout=30) as client: await client.put( f"{req.api_url}/api/assets/{req.asset_id}", headers={ "Authorization": f"Bearer {req.api_key}", "Content-Type": "application/json", }, json={"asset_details": {"thumbnail_url": thumbnail_url}}, ) return {"success": True, "thumbnail_url": thumbnail_url} except HTTPException: raise except Exception as e: return {"success": False, "error": str(e)} finally: for p in (src_path, thumb_path): if p.exists(): p.unlink() if Path(tmp_dir).exists(): Path(tmp_dir).rmdir() @app.post("/process") async def process_video(req: ProcessRequest): """Extract cover + thumbnail + metadata from a video asset.""" tmp_dir = tempfile.mkdtemp() video_path = Path(tmp_dir) / f"{req.asset_id}.mp4" cover_path = Path(tmp_dir) / f"{req.asset_id}_cover.jpg" thumb_path = Path(tmp_dir) / f"{req.asset_id}_thumb.jpg" try: # 1. Download video await _download_file(req.video_url, video_path) # 2. Extract metadata with ffprobe probe_result = subprocess.run( [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(video_path), ], capture_output=True, text=True, ) if probe_result.returncode != 0: raise HTTPException(status_code=500, detail="ffprobe failed") probe_data = json.loads(probe_result.stdout) video_stream = next( (s for s in probe_data.get("streams", []) if s.get("codec_type") == "video"), None, ) audio_stream = next( (s for s in probe_data.get("streams", []) if s.get("codec_type") == "audio"), None, ) fmt = probe_data.get("format", {}) duration = float(fmt.get("duration", 0)) width = int(video_stream.get("width", 0)) if video_stream else 0 height = int(video_stream.get("height", 0)) if video_stream else 0 fps = 0.0 if video_stream and video_stream.get("r_frame_rate"): parts = video_stream["r_frame_rate"].split("/") if len(parts) == 2 and int(parts[1]) != 0: fps = round(int(parts[0]) / int(parts[1]), 2) media_info = { "format": fmt.get("format_name", ""), "video_codec": video_stream.get("codec_name", "") if video_stream else "", "audio_codec": audio_stream.get("codec_name", "") if audio_stream else "", "bitrate": int(fmt.get("bit_rate", 0)), "fps": fps, } # 3. Extract cover frame (full resolution) seek_time = "1" if duration > 1 else "0" result = subprocess.run( [ "ffmpeg", "-ss", seek_time, "-i", str(video_path), "-frames:v", "1", "-q:v", "2", str(cover_path), ], capture_output=True, ) if result.returncode != 0: subprocess.run( [ "ffmpeg", "-ss", "0", "-i", str(video_path), "-frames:v", "1", "-q:v", "2", str(cover_path), ], capture_output=True, ) # 4. Generate 300px thumbnail from cover if cover_path.exists(): _resize_to_thumbnail(cover_path, thumb_path) # 5. Upload cover + thumbnail cover_url = await _upload_image( cover_path, f"{req.asset_id}_cover.jpg", req.api_url, req.api_key ) thumbnail_url = await _upload_image( thumb_path, f"{req.asset_id}_thumb.jpg", req.api_url, req.api_key ) # 6. Update original asset async with httpx.AsyncClient(timeout=30) as client: await client.put( f"{req.api_url}/api/assets/{req.asset_id}", headers={ "Authorization": f"Bearer {req.api_key}", "Content-Type": "application/json", }, json={ "asset_details": { "cover_url": cover_url, "thumbnail_url": thumbnail_url, "duration": duration, "width": width, "height": height, "media_info": json.dumps(media_info), } }, ) return { "success": True, "cover_url": cover_url, "thumbnail_url": thumbnail_url, "duration": duration, "width": width, "height": height, "media_info": media_info, } except HTTPException: raise except Exception as e: return {"success": False, "error": str(e)} finally: for p in (video_path, cover_path, thumb_path): if p.exists(): p.unlink() if Path(tmp_dir).exists(): Path(tmp_dir).rmdir()