"""
ClipCraft AI - FastAPI Application

AI-powered video clip generator for social media.

This module builds the FastAPI ``app``, registers the CORS and GZip
middleware, instantiates the processing services, and owns the in-memory
job store used by the endpoints defined below.
"""

import asyncio
import json
import logging
import os
import tempfile
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from uuid import uuid4

import numpy as np
import requests as _requests
from PIL import Image, ImageDraw, ImageFont
from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

from config import settings
from models.schemas import (
    Clip,
    JobResult,
    JobStatusEnum,
    ProcessRequest,
    TranscriptSegment,
)
from services.caption_styler import CaptionStyler
from services.clip_detector import ClipDetector
from services.downloader import VideoDownloader
from services.hook_rewriter import HookRewriter
from services.transcriber import Transcriber
from services.video_editor import VideoEditor

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="ClipCraft AI",
    description="AI-powered video clip generator for social media",
    version="1.0.0"
)

# Add middleware
# NOTE(review): wildcard origins are combined with allow_credentials=True, so
# any site may make credentialed requests — confirm this is intended before
# the API gains cookie/session based authentication.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Compress responses of at least 1000 bytes.
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Initialize services (module-level singletons shared by every request)
downloader = VideoDownloader()
transcriber = Transcriber()
clip_detector = ClipDetector()
video_editor = VideoEditor()
caption_styler = CaptionStyler()
hook_rewriter = HookRewriter()

# In-memory job store: job_id -> JobResult. Contents are lost on restart.
jobs_store: Dict[str, JobResult] = {}
# NOTE(review): not referenced by the code visible in this file (the
# ConnectionManager keeps its own mapping) — presumably legacy; confirm
# before removing.
websocket_connections: Dict[str, List[WebSocket]] = {}

# WebSocket Connection Manager
class ConnectionManager:
    """Manage WebSocket connections for real-time updates.

    Sockets are grouped by job id so a progress message can be sent to every
    client currently watching that job.
    """

    def __init__(self):
        # job_id -> sockets currently subscribed to that job
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, job_id: str, websocket: WebSocket):
        """Accept the WebSocket handshake and register it under ``job_id``."""
        await websocket.accept()
        self.active_connections.setdefault(job_id, []).append(websocket)

    async def disconnect(self, job_id: str, websocket: WebSocket):
        """Remove ``websocket`` from ``job_id``.

        Safe to call more than once for the same socket: ``broadcast`` may
        already have dropped a dead connection before the endpoint handler
        notices the disconnect and calls this again.
        """
        connections = self.active_connections.get(job_id)
        if connections is None:
            return
        if websocket in connections:
            connections.remove(websocket)
        if not connections:
            del self.active_connections[job_id]

    async def broadcast(self, job_id: str, message: dict):
        """Send ``message`` as JSON to every client subscribed to ``job_id``.

        Clients whose send fails are logged and unregistered.
        """
        connections = self.active_connections.get(job_id)
        if not connections:
            return

        disconnected = []
        # Iterate over a snapshot: each ``await`` yields control, and another
        # coroutine may connect or disconnect a client in the meantime.
        for connection in list(connections):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"Failed to send WebSocket message: {str(e)}")
                disconnected.append(connection)

        # Clean up disconnected clients
        for connection in disconnected:
            await self.disconnect(job_id, connection)


manager = ConnectionManager()

# Strong references to in-flight background jobs. The event loop only keeps
# weak references to tasks, so an unreferenced task can be garbage-collected
# before it finishes.
_background_tasks: set = set()

# Serve static frontend assets (JS, CSS, images)
frontend_dir = Path(__file__).parent / "frontend" / "dist"
# StaticFiles raises at construction time when its directory is missing, so
# only mount when the assets folder itself is present.
if (frontend_dir / "assets").is_dir():
    app.mount("/assets", StaticFiles(directory=frontend_dir / "assets"), name="assets")


@app.get("/")
async def root():
    """Serve frontend index.html, or a placeholder page when it is not built."""
    index_path = frontend_dir / "index.html"
    if index_path.exists():
        return FileResponse(index_path)
    return HTMLResponse("""
AI-powered video clip generator — frontend building...
""")


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "ok",
        "version": "1.0.0",
        "timestamp": datetime.utcnow().isoformat()
    }


def _safe_upload_name(filename: str, fallback: str = "upload") -> str:
    """Reduce a client-supplied filename to its final path component.

    Upload filenames are untrusted; joining them to a directory verbatim
    would let ``../../x`` escape the uploads folder.
    """
    # Normalise Windows separators so "..\\..\\x" is reduced as well.
    name = Path(filename.replace("\\", "/")).name
    return name if name not in ("", ".", "..") else fallback


def _parse_json_object(raw: Optional[str], field: str) -> dict:
    """Decode an optional JSON form field into a dict.

    Missing, malformed, or non-object JSON yields an empty dict so a bad
    optional field never fails the whole request.
    """
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning(f"Ignoring invalid JSON in form field '{field}'")
        return {}
    if not isinstance(parsed, dict):
        logger.warning(f"Ignoring non-object JSON in form field '{field}'")
        return {}
    return parsed


async def _save_upload(upload: UploadFile, destination: Path, chunk_size: int = 1024 * 1024) -> None:
    """Copy an uploaded file to ``destination`` in chunks.

    Chunked copying avoids holding an entire video in memory at once.
    """
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("wb") as fh:
        while True:
            chunk = await upload.read(chunk_size)
            if not chunk:
                break
            fh.write(chunk)


@app.post("/api/process")
async def process_video(
    url: Optional[str] = Form(None),
    file: Optional[UploadFile] = File(None),
    language: str = Form("auto"),
    min_clip_duration: int = Form(30),
    max_clip_duration: int = Form(90),
    num_clips: int = Form(10),
    aspect_ratio: str = Form("9:16"),
    rewrite_hooks: bool = Form(True),
    caption_style: Optional[str] = Form(None),  # BUG FIX: was caption_style_json
    brand_kit: Optional[str] = Form(None),  # BUG FIX: was brand_kit_json
    logo_file: Optional[UploadFile] = File(None),  # brand logo upload
    cookies_file: Optional[UploadFile] = File(None),  # YouTube cookies.txt for datacenter IP bypass
):
    """
    Start a new video processing job

    Either ``url`` or an uploaded ``file`` is required. ``caption_style`` and
    ``brand_kit`` are optional JSON objects encoded as strings. Uploaded files
    are written to disk before the response is sent, because the background
    job runs after the request (and its upload handles) are closed.

    Returns:
        Job ID and initial status

    Raises:
        HTTPException: 400 when neither a URL nor a file is supplied,
            500 when the job could not be set up.
    """
    # Validate inputs (outside the try block so the 400 is not rewritten to 500)
    has_file = bool(file and file.filename)
    if not url and not has_file:
        raise HTTPException(status_code=400, detail="Either URL or file must be provided")

    # Generate job ID
    job_id = str(uuid4())
    logger.info(f"Created job {job_id}")

    try:
        # Parse optional JSON configs
        caption_style_dict = _parse_json_object(caption_style, "caption_style")
        brand_kit_dict = _parse_json_object(brand_kit, "brand_kit")

        # Save uploaded logo if provided (job-id prefix avoids name clashes)
        if logo_file and logo_file.filename:
            logo_dir = settings.uploads_dir / "logos"
            logo_name = _safe_upload_name(logo_file.filename, "logo")
            logo_path = logo_dir / f"{job_id}_{logo_name}"
            await _save_upload(logo_file, logo_path)
            brand_kit_dict["logo_url"] = str(logo_path)

        # Create request
        # NOTE(review): the parsed dicts are spread as top-level keyword
        # arguments, while the background job later reads
        # request.caption_style / request.brand_kit — confirm against the
        # ProcessRequest schema that this is the intended mapping.
        request = ProcessRequest(
            url=url,
            language=language,
            min_clip_duration=min_clip_duration,
            max_clip_duration=max_clip_duration,
            num_clips=num_clips,
            aspect_ratio=aspect_ratio,
            rewrite_hooks=rewrite_hooks,
            **caption_style_dict,
            **brand_kit_dict,
        )

        # Initialize job result
        job_result = JobResult(
            job_id=job_id,
            status=JobStatusEnum.queued,
            progress=0,
            message="Job queued",
            source_url=url,
        )
        jobs_store[job_id] = job_result

        # BUG FIX: save uploaded file to disk NOW (before the HTTP request closes)
        # Passing UploadFile to create_task causes "read of closed file" because
        # FastAPI closes the file after the endpoint returns.
        saved_file_path = None
        if has_file:
            # A per-job directory keeps two uploads with the same filename
            # from overwriting each other while still preserving the name.
            saved_file_path = settings.uploads_dir / job_id / _safe_upload_name(file.filename, "video")
            await _save_upload(file, saved_file_path)

        # Resolve cookies path for YouTube datacenter IP bypass.
        # Priority: 1) uploaded cookies_file 2) YT_COOKIES_PATH env var (HF Secret)
        resolved_cookies_path: Optional[str] = None
        env_cookies = os.getenv("YT_COOKIES_PATH")
        if cookies_file and cookies_file.filename:
            cookies_save_path = settings.uploads_dir / "cookies" / f"{job_id}_cookies.txt"
            await _save_upload(cookies_file, cookies_save_path)
            resolved_cookies_path = str(cookies_save_path)
            logger.info(f"Job {job_id}: Using uploaded cookies file")
        elif env_cookies and os.path.isfile(env_cookies):
            resolved_cookies_path = env_cookies
            logger.info(f"Job {job_id}: Using YT_COOKIES_PATH env cookies")

        # Start background processing, keeping a reference until it finishes
        task = asyncio.create_task(
            process_video_job(job_id, request, saved_file_path, resolved_cookies_path)
        )
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

        return {
            "job_id": job_id,
            "status": JobStatusEnum.queued.value,
            "progress": 0
        }

    except HTTPException:
        raise
    except Exception as e:
        # Do not leave a job that will never run sitting in the store as "queued".
        jobs_store.pop(job_id, None)
        logger.error(f"Error processing video: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# NOTE: /api/job/{job_id} is now defined further down (after article_jobs store)
# to handle both video-clip jobs AND article-reel jobs in one unified endpoint.
@app.get("/api/jobs")
async def get_recent_jobs(limit: int = 10):
    """Get list of recent jobs, most recently updated first.

    ``limit`` caps the number of jobs returned; negative values are treated
    as zero instead of slicing from the end of the list.
    """
    jobs_sorted = sorted(jobs_store.values(), key=lambda x: x.updated_at, reverse=True)
    return [job.to_dict() for job in jobs_sorted[:max(limit, 0)]]


def _find_clip(job_id: str, clip_id: str):
    """Return the clip ``clip_id`` of job ``job_id``, or None if the job has no such clip.

    Raises:
        HTTPException: 404 when the job itself does not exist.
    """
    job = jobs_store.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return next((c for c in job.clips if str(c.id) == clip_id), None)


@app.delete("/api/job/{job_id}")
async def delete_job(job_id: str):
    """Delete job and cleanup files.

    Raises:
        HTTPException: 404 when the job is unknown, 500 when cleanup fails.
    """
    try:
        if job_id not in jobs_store:
            raise HTTPException(status_code=404, detail="Job not found")

        # Delete job files
        job_dir = settings.output_dir / job_id
        if job_dir.exists():
            import shutil
            shutil.rmtree(job_dir)

        # Remove from store (pop tolerates a concurrent delete of the same job)
        jobs_store.pop(job_id, None)

        return {"status": "deleted", "job_id": job_id}

    except HTTPException:
        # Keep the intended status code (404) instead of rewriting it to 500.
        raise
    except Exception as e:
        logger.error(f"Error deleting job: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/clips/{job_id}/{clip_id}/download")
async def download_clip(job_id: str, clip_id: str):
    """Download processed clip video.

    Raises:
        HTTPException: 404 when the job, clip, or clip file is missing,
            500 on unexpected errors.
    """
    try:
        clip = _find_clip(job_id, clip_id)
        if not clip or not clip.output_path:
            raise HTTPException(status_code=404, detail="Clip not found")

        clip_path = Path(clip.output_path)
        if not clip_path.exists():
            raise HTTPException(status_code=404, detail="Clip file not found")

        return FileResponse(
            path=clip_path,
            filename=f"clip_{clip_id}.mp4",
            media_type="video/mp4"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error downloading clip: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/clips/{job_id}/{clip_id}/thumbnail")
async def get_thumbnail(job_id: str, clip_id: str):
    """Download clip thumbnail.

    Raises:
        HTTPException: 404 when the job, thumbnail, or thumbnail file is
            missing, 500 on unexpected errors.
    """
    try:
        clip = _find_clip(job_id, clip_id)
        if not clip or not clip.thumbnail_path:
            raise HTTPException(status_code=404, detail="Thumbnail not found")

        thumb_path = Path(clip.thumbnail_path)
        if not thumb_path.exists():
            raise HTTPException(status_code=404, detail="Thumbnail file not found")

        return FileResponse(
            path=thumb_path,
            media_type="image/jpeg"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error downloading thumbnail: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.websocket("/api/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
    """WebSocket endpoint for real-time job progress.

    Incoming messages are read and discarded; the loop exists only to detect
    when the client goes away so the socket can be unregistered.
    """
    await manager.connect(job_id, websocket)
    try:
        while True:
            # Keep connection alive
            await websocket.receive_text()
    except WebSocketDisconnect:
        await manager.disconnect(job_id, websocket)
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
        await manager.disconnect(job_id, websocket)


# Background task for video processing
async def process_video_job(
    job_id: str,
    request: ProcessRequest,
    file_path: Optional[Path],
    cookies_path: Optional[str] = None,
) -> None:
    """
    Background task to process video and generate clips.

    file_path is a Path (already saved to disk) or None for URL jobs.
    cookies_path is an optional path to a Netscape-format cookies.txt for YouTube.

    The job record in ``jobs_store`` is updated in place and broadcast to
    WebSocket subscribers at each stage. Any exception marks the job as
    failed and is recorded on the job rather than propagated.
    """
    job = jobs_store.get(job_id)
    if job is None:
        # The job was removed before this task got to run; nothing to update.
        logger.warning(f"Job {job_id}: not found in job store, skipping processing")
        return

    try:
        # Step 1: Download or extract video
        logger.info(f"Job {job_id}: Starting download phase")
        job.status = JobStatusEnum.downloading
        job.progress = 5
        await manager.broadcast(job_id, job.to_dict())

        if file_path:
            # File already saved to disk by endpoint handler
            video_path = file_path
            job.source_title = file_path.name
        else:
            # Download from URL (pass cookies for YouTube datacenter IP bypass)
            result = await downloader.download(request.url, settings.uploads_dir, cookies_path=cookies_path)
            video_path = Path(result["path"])
            job.source_title = result["title"]
            job.duration = result["duration"]

        logger.info(f"Job {job_id}: Video ready at {video_path}")

        # Step 2: Extract audio and transcribe
        logger.info(f"Job {job_id}: Starting transcription phase")
        job.status = JobStatusEnum.transcribing
        job.progress = 20
        await manager.broadcast(job_id, job.to_dict())

        audio_path = await downloader.extract_audio(video_path, settings.uploads_dir)
        segments: List[TranscriptSegment] = await transcriber.transcribe(
            audio_path,
            language=request.language
        )
        # Only run language detection when the caller asked for "auto".
        job.language = request.language if request.language != "auto" else transcriber.detect_language(audio_path)

        logger.info(f"Job {job_id}: Transcription complete with {len(segments)} segments")

        # Step 3: Detect clips using AI
        logger.info(f"Job {job_id}: Starting clip detection phase")
        job.status = JobStatusEnum.analyzing
        job.progress = 40
        await manager.broadcast(job_id, job.to_dict())

        clips = await clip_detector.detect_clips(segments, request)
        for clip in clips:
            clip.job_id = job_id

        logger.info(f"Job {job_id}: Detected {len(clips)} clips")

        # Step 4: Rewrite hooks
        if request.rewrite_hooks:
            logger.info(f"Job {job_id}: Rewriting hooks")
            for clip in clips:
                rewritten = await hook_rewriter.rewrite(clip)
                if rewritten:
                    clip.rewritten_hook = rewritten

        # Step 5: Edit videos
        logger.info(f"Job {job_id}: Starting video editing phase")
        job.status = JobStatusEnum.editing
        job.progress = 60
        # Publish the detected clips before editing so clients can list them early.
        job.clips = clips
        await manager.broadcast(job_id, job.to_dict())

        clips = await video_editor.process_clips(video_path, clips, request)

        # Step 6: Add captions
        logger.info(f"Job {job_id}: Starting caption phase")
        job.status = JobStatusEnum.captioning
        job.progress = 80
        await manager.broadcast(job_id, job.to_dict())

        clips = await caption_styler.add_captions(clips, request.caption_style, request.brand_kit)

        # Mark successful clips
        for clip in clips:
            if clip.output_path:
                clip.status = JobStatusEnum.completed

        job.clips = clips

        # Completion
        logger.info(f"Job {job_id}: Processing complete")
        job.status = JobStatusEnum.completed
        job.progress = 100
        job.message = f"Successfully generated {len(clips)} clips"
        job.updated_at = datetime.utcnow()
        await manager.broadcast(job_id, job.to_dict())

    except Exception as e:
        # logger.exception keeps the traceback, which str(e) alone loses.
        logger.exception(f"Job {job_id} failed: {str(e)}")
        job.status = JobStatusEnum.failed
        job.progress = 0
        job.message = ""
        job.error = str(e)
        job.updated_at = datetime.utcnow()
        await manager.broadcast(job_id, job.to_dict())


# ─────────────────────────────────────────────────────────────────────────────
# ARTICLE → REELS PIPELINE
# All heavy imports are lazy so server startup stays fast.
# ─────────────────────────────────────────────────────────────────────────────
FONT_BOLD_PATH = "/tmp/Montserrat-Bold.ttf"
FONT_LIGHT_PATH = "/tmp/Montserrat-Light.ttf"
FONT_URL = "https://github.com/google/fonts/raw/main/ofl/montserrat/Montserrat-Bold.ttf"
VIDEO_W, VIDEO_H, FPS = 720, 1280, 24
# Cache of loaded fonts keyed by "s<size>"
_reel_fonts: dict = {}

# UI voice label -> edge-tts voice id
EDGE_VOICES_MAP = {
    "Guy - News Anchor (Male)": "en-US-GuyNeural",
    "Davis (Male)": "en-US-DavisNeural",
    "Ryan (Male)": "en-GB-RyanNeural",
    "William (Male)": "en-AU-WilliamNeural",
    "Aria (Female)": "en-US-AriaNeural",
    "Jenny (Female)": "en-US-JennyNeural",
    "Sonia (Female)": "en-GB-SoniaNeural",
    "Natasha (Female)": "en-AU-NatashaNeural",
}

# System prompts per reel type. "{n}" is filled in with the requested number
# of lines via str.format in _generate_reel_script.
REEL_PROMPTS = {
    "Financial News Reel": (
        "You are a Bloomberg/WSJ-style breaking-news anchor writer.\n"
        "Create a fast-paced, professional financial-news reel script — urgent teaser style.\n"
        "Rules:\n"
        " 1. Line 1 MUST be a powerful shocking hook that stops the scroll.\n"
        " 2. Give only the high-level gist. Withhold specifics so viewers click through.\n"
        " 3. Tone: authoritative, slightly shocked, premium financial-media.\n"
        " 4. Write exactly {n} lines. Each line: 13-16 words, ~6 seconds spoken.\n"
        " 5. No bullet points, no numbering, no emojis — pure spoken sentences only.\n"
        " 6. FINAL LINE MUST BE: 'Read the full story at chainstreet dot i o.'\n"
        "Output ONLY the {n} lines, one per line, nothing else."
    ),
    "Top 5": (
        "Create a viral 'Top 5' reel script. Hook sentence, then 5 items (5. … 1. …).\n"
        "Each item: one punchy sentence, 10-15 words.\n"
        "FINAL LINE: 'Get the full breakdown at chainstreet dot i o.'\n"
        "Output ONLY {n} lines, one per line, no extra text."
    ),
    "Fact": (
        "Create a FACTS reel script. Lead with the most shocking fact as a hook.\n"
        "Each of {n} lines: one amazing fact, 10-15 words.\n"
        "FINAL LINE: 'Read more at chainstreet dot i o.'\n"
        "Output ONLY {n} lines, one per line."
    ),
    "Custom Prompt": (
        "You are a senior content writer. Create a punchy short-form video script (max 45s) for this content.\n"
        "Structure: strong hook, conflict, key insight, stakes, rhetorical question, CTA.\n"
        "Write exactly {n} lines, each 8-15 words. "
        "FINAL LINE must end with 'chainstreet dot i o'.\n"
        "Output ONLY {n} lines, one per line, no extra text."
    ),
}
# Remaining reel types share one generic template. Only the first fragment is
# an f-string, so "{n}" in the later fragments survives for str.format.
for _rt in [
    "Ranking", "Step by Step Guide", "Statistics", "Quiz",
    "Famous Quotes", "Product Demo", "Joke", "Blog to Reel",
]:
    REEL_PROMPTS[_rt] = (
        f"Create a {_rt} reel script from the content below.\n"
        "Each line: 10-15 words, punchy. FINAL LINE: 'Read the full story at chainstreet dot i o.'\n"
        "Write exactly {n} lines, one per line, no extra text."
    )


def _get_reel_font(size: int = 64):
    """Return a cached bold font of the given pixel ``size``.

    Tries the downloaded Montserrat file first, then two system fonts, and
    finally falls back to Pillow's built-in default font.
    """
    key = f"s{size}"
    if key not in _reel_fonts:
        if not os.path.exists(FONT_BOLD_PATH):
            try:
                with urllib.request.urlopen(FONT_URL, timeout=10) as r:
                    Path(FONT_BOLD_PATH).write_bytes(r.read())
            except Exception as e:
                # Best effort only: the system fonts below cover a failed download.
                logger.warning(f"[font] download failed: {e}")
        # Try Montserrat first, then system fonts guaranteed by Dockerfile
        for font_path in [
            FONT_BOLD_PATH,
            "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        ]:
            try:
                _reel_fonts[key] = ImageFont.truetype(font_path, size)
                break
            except Exception:
                continue
        else:
            _reel_fonts[key] = ImageFont.load_default()
    return _reel_fonts[key]


def _scrape_url(url: str) -> str:
    """Fetch ``url`` and return its extracted main text, or "" on any failure."""
    # NOTE(review): ``url`` comes straight from the client and is fetched
    # server-side; consider restricting schemes/hosts if this is exposed publicly.
    try:
        import trafilatura
        dl = trafilatura.fetch_url(url)
        if not dl:
            return ""
        text = trafilatura.extract(dl, include_tables=False, include_comments=False, favor_recall=True)
        return (text or "").strip()
    except Exception as e:
        logger.warning(f"[scrape] {e}")
        return ""


def _generate_reel_script(content: str, groq_key: str, reel_type: str, num_points: int) -> list:
    """Ask the Groq chat API for a reel script and return it as a list of lines.

    Args:
        content: Article text; only the first 3500 characters are sent.
        groq_key: API key; when blank, the GROQ_API_KEY environment variable is used.
        reel_type: Key into REEL_PROMPTS; unknown types use "Custom Prompt".
        num_points: Number of lines requested; the result is truncated to this.

    Returns:
        Up to ``num_points`` cleaned lines, the last one replaced by a random
        entry from CTA_POOL. Empty when the model returned nothing usable.

    Raises:
        ValueError: No API key was supplied or configured.
    """
    import random
    import re

    if not groq_key.strip():
        groq_key = os.getenv("GROQ_API_KEY", "")
    if not groq_key:
        raise ValueError("Groq API key required for article reels.")
    from groq import Groq
    client = Groq(api_key=groq_key.strip())
    template = REEL_PROMPTS.get(reel_type, REEL_PROMPTS["Custom Prompt"])
    system = template.format(n=num_points)
    resp = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": f"Content:\n{content[:3500]}"},
        ],
        temperature=0.78,
        max_tokens=600,
    )
    # ``content`` can be None on an empty completion.
    raw = (resp.choices[0].message.content or "").strip()
    # Strip only list markers ("1.", "2)", "-", "•", "*"). A character-set
    # lstrip would also eat leading numbers that belong to the sentence,
    # e.g. "2024 saw record inflows" -> "saw record inflows".
    marker = re.compile(r"^(?:[•\-*]+|\d+[.)])\s*")
    lines = [marker.sub("", ln.strip()) for ln in raw.splitlines() if ln.strip()]
    lines = [ln for ln in lines if ln]
    lines = lines[:num_points]
    # Replace last line with a random CTA variation (avoids monotony across reels)
    if lines:
        lines[-1] = random.choice(CTA_POOL)
    return lines


async def _edge_tts_save(text: str, voice: str, path: str):
    """Synthesize ``text`` with the edge-tts ``voice`` and write the audio to ``path``."""
    import edge_tts
    await edge_tts.Communicate(text, voice).save(path)


def _generate_voice(text: str, voice_display: str) -> str:
    """Generate an MP3 voiceover and return the path of the temporary file.

    The requested voice is tried first; if it fails, one retry is made with
    "en-US-AriaNeural". Must be called from a thread without a running event
    loop, because it drives the coroutine with ``asyncio.run``.

    Raises:
        RuntimeError: Both attempts failed.
    """
    text = text.replace("chainstreet.io", "chainstreet dot i o")
    voice = EDGE_VOICES_MAP.get(voice_display, "en-US-GuyNeural")
    # mkstemp creates the file atomically; mktemp only returns a name and is
    # open to a race between choosing the name and creating the file.
    fd, out = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)
    last_error = None
    for attempt in range(2):
        try:
            asyncio.run(_edge_tts_save(text, voice, out))
            return out
        except Exception as e:
            last_error = e
            logger.warning(f"[TTS] attempt {attempt+1}: {e}")
            voice = "en-US-AriaNeural"
    # Both attempts failed: do not leave the empty temp file behind.
    try:
        os.remove(out)
    except OSError:
        pass
    raise RuntimeError("TTS failed") from last_error


CTA_POOL = [
    "Read the full story at chainstreet dot i o.",
    "Get the complete breakdown at chainstreet dot i o.",
    "All the details are live at chainstreet dot i o.",
    "Don't miss the full report — chainstreet dot i o.",
    "Follow the full story only at chainstreet dot i o.",
    "Catch every detail at chainstreet dot i o.",
    "Dive deeper at chainstreet dot i o.",
    "Your source for this story: chainstreet dot i o.",
    "More context and analysis at chainstreet dot i o.",
    "Stay ahead — read the full piece at chainstreet dot i o.",
]


def _render_slide(text: str, W: int, H: int, accent_rgb: tuple,
                  slide_idx: int, total_slides: int) -> np.ndarray:
    """Render a complete full-frame RGB slide. All compositing done in PIL — MoviePy never sees RGBA.

    Args:
        text: Sentence to display, word-wrapped to about 80% of the width.
        W, H: Frame width and height in pixels.
        accent_rgb: (r, g, b) accent colour for the card edge and active dot.
        slide_idx: Zero-based index of this slide, used for dots and counter.
        total_slides: Total number of sentence slides.

    Returns:
        The RGB frame as an array produced from a W x H image.
    """
    ar, ag, ab = accent_rgb

    # ── 1. Dark gradient background via numpy (fast, no brown) ─────────────
    rows = np.arange(H, dtype=np.float32) / H  # 0 → 1 top-to-bottom
    bg = np.zeros((H, W, 4), dtype=np.uint8)
    bg[:, :, 0] = np.clip(8 + (rows * min(ar // 3, 38))[:, None], 0, 25).astype(np.uint8)
    bg[:, :, 1] = np.clip(5 + (rows * min(ag // 6, 18))[:, None], 0, 18).astype(np.uint8)
    bg[:, :, 2] = np.clip(18 + (rows * min(ab // 4, 52))[:, None], 0, 60).astype(np.uint8)
    bg[:, :, 3] = 255
    canvas = Image.fromarray(bg, mode="RGBA")

    # ── 2. Text layout (word-wrap) ──────────────────────────────────────────
    font = _get_reel_font(62)
    font_sm = _get_reel_font(26)
    fsize = getattr(font, "size", 62)
    # Scratch drawing surface used only to measure text.
    tmp_d = ImageDraw.Draw(Image.new("RGBA", (10, 10)))

    words, wrapped, cur = text.split(), [], []
    for word in words:
        test = " ".join(cur + [word])
        try:
            bb = tmp_d.textbbox((0, 0), test, font=font)
            w = bb[2] - bb[0]
        except Exception:
            # Rough estimate when the font cannot be measured.
            w = len(test) * (fsize // 2)
        if w > W * 0.80 and cur:
            wrapped.append(" ".join(cur))
            cur = [word]
        else:
            cur.append(word)
    if cur:
        wrapped.append(" ".join(cur))

    line_h = int(fsize * 1.40)
    total_text_h = len(wrapped) * line_h
    pad_y = 44
    cy = H // 2 - 30  # slightly above true centre
    card_top = cy - total_text_h // 2 - pad_y
    card_bot = cy + total_text_h // 2 + pad_y

    # ── 3. Frosted glass card (PIL alpha_composite — correct & reliable) ───
    card = Image.new("RGBA", (W, H), (0, 0, 0, 0))
    cd = ImageDraw.Draw(card)
    cd.rounded_rectangle([30, card_top, W - 30, card_bot], radius=26, fill=(20, 14, 42, 210))
    cd.rounded_rectangle([30, card_top, 42, card_bot], radius=5, fill=(*accent_rgb, 230))
    cd.rounded_rectangle([30, card_top, W - 30, card_bot], radius=26, outline=(255, 255, 255, 35), width=1)
    canvas = Image.alpha_composite(canvas, card)
    draw = ImageDraw.Draw(canvas)

    # ── 4. Text with drop-shadow ────────────────────────────────────────────
    ty = cy - total_text_h // 2
    for line in wrapped:
        try:
            bb = draw.textbbox((0, 0), line, font=font)
            tw = bb[2] - bb[0]
        except Exception:
            tw = len(line) * (fsize // 2)
        tx = (W - tw) // 2
        draw.text((tx + 2, ty + 2), line, font=font, fill=(0, 0, 0, 170))  # shadow
        draw.text((tx, ty), line, font=font, fill=(255, 255, 255, 255))
        ty += line_h

    # ── 5. Progress dots ────────────────────────────────────────────────────
    dot_r, dot_gap = 5, 20
    total_dots_w = total_slides * dot_gap
    dx = (W - total_dots_w) // 2 + dot_r
    dy = H - 68
    for i in range(total_slides):
        r = dot_r if i == slide_idx else dot_r - 2
        fill = (*accent_rgb, 255) if i == slide_idx else (255, 255, 255, 55)
        draw.ellipse([dx - r, dy - r, dx + r, dy + r], fill=fill)
        dx += dot_gap

    # ── 6. Slide counter (top-right, subtle) ────────────────────────────────
    ctr = f"{slide_idx + 1} / {total_slides}"
    try:
        cb = draw.textbbox((0, 0), ctr, font=font_sm)
        draw.text((W - (cb[2] - cb[0]) - 26, 26), ctr, font=font_sm, fill=(255, 255, 255, 85))
    except Exception:
        # The counter is decorative; skip it if the font cannot be measured.
        pass

    # ── 7. Bake to RGB (no RGBA handed to MoviePy) ──────────────────────────
    return np.array(canvas.convert("RGB"))


def _render_endcard(W: int, H: int, accent_rgb: tuple) -> np.ndarray:
    """Premium chainstreet.io endcard slide.

    Draws a gradient background with a glow, the "chainstreet.io" wordmark
    (".io" in the accent colour), a subtitle, and a CTA pill, and returns the
    frame as an RGB array.
    """
    ar, ag, ab = accent_rgb
    rows = np.arange(H, dtype=np.float32) / H
    bg = np.zeros((H, W, 4), dtype=np.uint8)
    bg[:, :, 0] = np.clip(10 + (rows * min(ar // 2, 50))[:, None], 0, 60).astype(np.uint8)
    bg[:, :, 1] = np.clip(8 + (rows * min(ag // 4, 25))[:, None], 0, 30).astype(np.uint8)
    bg[:, :, 2] = np.clip(24 + (rows * min(ab // 3, 65))[:, None], 0, 90).astype(np.uint8)
    bg[:, :, 3] = 255
    canvas = Image.fromarray(bg, mode="RGBA")

    # Radial glow
    glow = Image.new("RGBA", (W, H), (0, 0, 0, 0))
    gd = ImageDraw.Draw(glow)
    cx, cy = W // 2, H // 2 - 100
    # Largest circle first, so each smaller, more opaque one is drawn on top.
    for radius in range(260, 0, -14):
        alpha = int(35 * (260 - radius) / 260)
        gd.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], fill=(*accent_rgb, alpha))
    canvas = Image.alpha_composite(canvas, glow)
    draw = ImageDraw.Draw(canvas)

    font_big = _get_reel_font(86)
    font_med = _get_reel_font(44)
    font_sm = _get_reel_font(28)
    fsize_b = getattr(font_big, "size", 86)

    # "chainstreet" + ".io" (accent)
    brand, tld = "chainstreet", ".io"
    try:
        bb1 = draw.textbbox((0, 0), brand, font=font_big)
        bb2 = draw.textbbox((0, 0), tld, font=font_big)
        bw, tiw = bb1[2] - bb1[0], bb2[2] - bb2[0]
    except Exception:
        bw, tiw = len(brand) * (fsize_b // 2), len(tld) * (fsize_b // 2)
    total_w = bw + tiw
    tx = (W - total_w) // 2
    ty = H // 2 - fsize_b - 20
    draw.text((tx + 2, ty + 2), brand, font=font_big, fill=(0, 0, 0, 150))
    draw.text((tx, ty), brand, font=font_big, fill=(255, 255, 255, 255))
    draw.text((tx + bw + 2, ty + 2), tld, font=font_big, fill=(0, 0, 0, 150))
    draw.text((tx + bw, ty), tld, font=font_big, fill=(*accent_rgb, 255))

    # Subtitle
    sub = "Your daily crypto briefing"
    try:
        sb = draw.textbbox((0, 0), sub, font=font_sm)
        draw.text(((W - (sb[2] - sb[0])) // 2, H // 2 + 30), sub, font=font_sm, fill=(255, 255, 255, 160))
    except Exception:
        # The subtitle is optional decoration; skip it if measuring fails.
        pass

    # CTA pill button
    cta = "Follow for more ↗"
    try:
        cb = draw.textbbox((0, 0), cta, font=font_med)
        cw, ch = cb[2] - cb[0], cb[3] - cb[1]
    except Exception:
        cw, ch = len(cta) * 22, 44
    bx1, by1 = (W - cw) // 2 - 32, H // 2 + 100
    bx2, by2 = bx1 + cw + 64, by1 + ch + 28
    btn = Image.new("RGBA", (W, H), (0, 0, 0, 0))
    bd = ImageDraw.Draw(btn)
    bd.rounded_rectangle([bx1, by1, bx2, by2], radius=50, fill=(*accent_rgb, 255))
    canvas = Image.alpha_composite(canvas, btn)
    draw = ImageDraw.Draw(canvas)
    draw.text(((W - cw) // 2, by1 + 14), cta, font=font_med, fill=(255, 255, 255, 255))

    return np.array(canvas.convert("RGB"))


def _build_article_reel(sentences: list, audio_path: str, accent_hex: str) -> str:
    """Build article reel using concatenated full-RGB slides — no MoviePy alpha compositing.

    Args:
        sentences: One slide is rendered per sentence.
        audio_path: Voiceover file; its duration determines slide timing.
        accent_hex: "#RRGGBB" accent colour; unparsable values fall back to
            (108, 99, 255).

    Returns:
        Path of the rendered temporary MP4 file.

    Raises:
        ValueError: The voiceover has no duration.
    """
    import moviepy.editor as mpe

    W, H = VIDEO_W, VIDEO_H
    try:
        accent_rgb = tuple(int(accent_hex.lstrip("#")[i:i+2], 16) for i in (0, 2, 4))
    except Exception:
        accent_rgb = (108, 99, 255)

    audio = mpe.AudioFileClip(audio_path)
    clips = []
    final = None
    out = None
    try:
        total_dur = audio.duration
        if not total_dur or total_dur <= 0:
            raise ValueError("Voiceover audio is empty")
        n = len(sentences)

        # Sentence slides get 92% of audio; endcard gets rest (min 2s)
        endcard_dur = max(total_dur * 0.08, 2.0)
        if n > 0 and endcard_dur >= total_dur:
            # Very short narration: the 2s floor would leave zero or negative
            # time for the sentence slides, so split the audio evenly instead.
            endcard_dur = total_dur / 2
        dur_each = (total_dur - endcard_dur) / n if n > 0 else total_dur

        for i, sentence in enumerate(sentences):
            frame = _render_slide(sentence, W, H, accent_rgb, i, n)
            clips.append(mpe.ImageClip(frame).set_duration(dur_each))

        # Endcard
        endcard_frame = _render_endcard(W, H, accent_rgb)
        clips.append(mpe.ImageClip(endcard_frame).set_duration(endcard_dur))

        # Concatenate — simple, reliable, no alpha issues
        final = mpe.concatenate_videoclips(clips, method="chain").set_audio(audio)

        # mkstemp instead of the race-prone, deprecated mktemp.
        fd, out = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)
        final.write_videofile(
            out, codec="libx264", audio_codec="aac",
            fps=FPS, preset="ultrafast", threads=4, logger=None,
        )
        return out
    except Exception:
        # Do not leave a partial or empty output file behind on failure.
        if out and os.path.exists(out):
            try:
                os.remove(out)
            except OSError:
                pass
        raise
    finally:
        # Release readers so the audio temp file can be deleted by the caller.
        for clip in clips:
            clip.close()
        if final is not None:
            final.close()
        audio.close()


# Article Reels job store: job_id -> plain dict describing the job
article_jobs: dict = {}


@app.post("/api/article-reels")
async def api_article_reels(
    background_tasks: BackgroundTasks,
    article_url: str = Form(...),
    reel_type: str = Form("Financial News Reel"),
    num_points: int = Form(7),
    tts_voice: str = Form("Guy - News Anchor (Male)"),
    accent_hex: str = Form("#6C63FF"),
    groq_api_key: str = Form(""),
):
    """Generate a viral reel from a news article URL with chainstreet.io CTA.

    Registers the job and schedules the pipeline as a background task; poll
    /api/job/{job_id} for progress.

    Raises:
        HTTPException: 400 when ``article_url`` is blank or ``num_points`` < 1.
    """
    if not article_url.strip():
        raise HTTPException(status_code=400, detail="article_url is required")
    if num_points < 1:
        raise HTTPException(status_code=400, detail="num_points must be at least 1")
    job_id = str(uuid4())
    article_jobs[job_id] = {
        "job_id": job_id, "status": "processing", "progress": 0,
        "message": "Starting…", "source_url": article_url,
        "output_path": None, "error": None,
    }
    background_tasks.add_task(
        _run_article_reel_job,
        job_id, article_url, groq_api_key, reel_type, num_points, tts_voice, accent_hex,
    )
    return {"job_id": job_id, "status": "processing", "progress": 0}


@app.get("/api/job/{job_id}")
async def get_job_by_id(job_id: str):
    """Unified job status — checks both video-clip jobs and article-reel jobs."""
    if job_id in jobs_store:
        return jobs_store[job_id].to_dict()
    if job_id in article_jobs:
        return article_jobs[job_id]
    raise HTTPException(status_code=404, detail="Job not found")


@app.get("/api/article-download/{job_id}")
async def download_article_reel(job_id: str):
    """Download the finished article reel for ``job_id``.

    Raises:
        HTTPException: 404 when the job or its file is missing, 400 when the
            reel is not completed yet.
    """
    if job_id not in article_jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    job = article_jobs[job_id]
    if job["status"] != "completed" or not job.get("output_path"):
        raise HTTPException(status_code=400, detail="Reel not ready")
    path = Path(job["output_path"])
    if not path.exists():
        raise HTTPException(status_code=404, detail="Reel file missing")
    return FileResponse(path, filename=f"article_reel_{job_id}.mp4", media_type="video/mp4")


def _run_article_reel_job(
    job_id, article_url, groq_api_key, reel_type, num_points, tts_voice, accent_hex,
):
    """Run the article → reel pipeline and record progress in ``article_jobs``.

    Stages: scrape the article, generate the script, synthesize the
    voiceover, assemble the video. Failures are stored on the job record
    (status "failed") rather than raised.
    """
    def upd(pct, msg):
        # pct is a 0-1 fraction; the job record stores whole percentages.
        article_jobs[job_id].update({"progress": int(pct * 100), "message": msg, "status": "processing"})

    audio_path = None
    try:
        upd(0.05, "Scraping article…")
        content = _scrape_url(article_url)
        if not content or len(content) < 40:
            raise ValueError("Could not extract text from the URL. Try pasting content directly.")

        upd(0.20, "Generating script with AI…")
        sentences = _generate_reel_script(content, groq_api_key, reel_type, num_points)
        if not sentences:
            raise ValueError("Script generation failed — check your Groq API key.")

        upd(0.45, "Generating AI voiceover…")
        full_text = " ".join(sentences)
        audio_path = _generate_voice(full_text, tts_voice)

        upd(0.70, "Assembling reel…")
        reel_path = _build_article_reel(sentences, audio_path, accent_hex)

        article_jobs[job_id].update({
            "status": "completed", "progress": 100,
            "message": "Reel ready!", "output_path": reel_path,
            "sentences": sentences,
        })
    except Exception as e:
        import traceback
        logger.error(traceback.format_exc())
        article_jobs[job_id].update({"status": "failed", "error": str(e), "message": str(e)})
    finally:
        # The voiceover is only an intermediate artefact; remove it either way.
        if audio_path and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as e:
                logger.warning(f"[cleanup] could not remove {audio_path}: {e}")


@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Global exception handler: log the error and answer with a generic 500."""
    # Local import: JSONResponse is not among this module's top-level imports.
    from fastapi.responses import JSONResponse

    logger.error(f"Unhandled exception: {str(exc)}")
    # A handler must return a Response; an HTTPException instance is not one.
    return JSONResponse(status_code=500, content={"detail": "Internal server error"})


# ─────────────────────────────────────────────────────────────────────────────
# CATCH‑ALL SPA FALLBACK – must be last route
# ─────────────────────────────────────────────────────────────────────────────
@app.get("/{full_path:path}")
async def spa_fallback(full_path: str):
    """SPA catch‑all — serves index.html for React Router, but ignores API routes."""
    # Do NOT interfere with API endpoints
    if full_path.startswith(("api/", "health", "docs")):
        raise HTTPException(status_code=404, detail="Not found")
    index_path = frontend_dir / "index.html"
    if index_path.exists():
        return FileResponse(index_path)
    raise HTTPException(status_code=404, detail="Frontend not built")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)