| """ |
| ClipCraft AI - FastAPI Application |
| AI-powered video clip generator for social media |
| """ |
|
|
| import asyncio |
| import json |
| import logging |
| import os |
| import tempfile |
| import urllib.request |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Dict, List, Optional |
| from uuid import uuid4 |
|
|
| import numpy as np |
| import requests as _requests |
| from PIL import Image, ImageDraw, ImageFont |
|
|
| from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect |
| from fastapi.middleware.cors import CORSMiddleware |
| from starlette.middleware.gzip import GZipMiddleware |
| from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
|
|
| from config import settings |
| from models.schemas import ( |
| Clip, |
| JobResult, |
| JobStatusEnum, |
| ProcessRequest, |
| TranscriptSegment, |
| ) |
| from services.caption_styler import CaptionStyler |
| from services.clip_detector import ClipDetector |
| from services.downloader import VideoDownloader |
| from services.hook_rewriter import HookRewriter |
| from services.transcriber import Transcriber |
| from services.video_editor import VideoEditor |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
| app = FastAPI( |
| title="ClipCraft AI", |
| description="AI-powered video clip generator for social media", |
| version="1.0.0" |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
| app.add_middleware(GZipMiddleware, minimum_size=1000) |
|
|
| |
| downloader = VideoDownloader() |
| transcriber = Transcriber() |
| clip_detector = ClipDetector() |
| video_editor = VideoEditor() |
| caption_styler = CaptionStyler() |
| hook_rewriter = HookRewriter() |
|
|
| |
| jobs_store: Dict[str, JobResult] = {} |
| websocket_connections: Dict[str, List[WebSocket]] = {} |
|
|
|
|
| |
| class ConnectionManager: |
| """Manage WebSocket connections for real-time updates""" |
|
|
| def __init__(self): |
| self.active_connections: Dict[str, List[WebSocket]] = {} |
|
|
| async def connect(self, job_id: str, websocket: WebSocket): |
| """Add WebSocket connection""" |
| await websocket.accept() |
| if job_id not in self.active_connections: |
| self.active_connections[job_id] = [] |
| self.active_connections[job_id].append(websocket) |
|
|
| async def disconnect(self, job_id: str, websocket: WebSocket): |
| """Remove WebSocket connection""" |
| if job_id in self.active_connections: |
| self.active_connections[job_id].remove(websocket) |
| if not self.active_connections[job_id]: |
| del self.active_connections[job_id] |
|
|
| async def broadcast(self, job_id: str, message: dict): |
| """Broadcast message to all connected clients""" |
| if job_id not in self.active_connections: |
| return |
|
|
| disconnected = [] |
| for connection in self.active_connections[job_id]: |
| try: |
| await connection.send_json(message) |
| except Exception as e: |
| logger.error(f"Failed to send WebSocket message: {str(e)}") |
| disconnected.append(connection) |
|
|
| |
| for connection in disconnected: |
| await self.disconnect(job_id, connection) |
|
|
|
|
| manager = ConnectionManager() |
|
|
|
|
| |
| frontend_dir = Path(__file__).parent / "frontend" / "dist" |
| if frontend_dir.exists(): |
| app.mount("/assets", StaticFiles(directory=frontend_dir / "assets"), name="assets") |
|
|
|
|
| @app.get("/") |
| async def root(): |
| """Serve frontend index.html""" |
| index_path = frontend_dir / "index.html" |
| if index_path.exists(): |
| return FileResponse(index_path) |
| return HTMLResponse(""" |
| <html> |
| <head><title>ClipCraft AI</title> |
| <style>body{background:#0A0A0F;color:#fff;font-family:Inter,sans-serif;display:flex;align-items:center;justify-content:center;height:100vh;flex-direction:column;gap:16px}</style> |
| </head> |
| <body> |
| <h1 style="background:linear-gradient(135deg,#6C63FF,#FF6B6B);-webkit-background-clip:text;-webkit-text-fill-color:transparent;font-size:2.5rem">ClipCraft AI</h1> |
| <p style="color:#888">AI-powered video clip generator β frontend building...</p> |
| </body> |
| </html> |
| """) |
|
|
|
|
| @app.get("/health") |
| async def health_check(): |
| """Health check endpoint""" |
| return { |
| "status": "ok", |
| "version": "1.0.0", |
| "timestamp": datetime.utcnow().isoformat() |
| } |
|
|
|
|
| @app.post("/api/process") |
| async def process_video( |
| url: Optional[str] = Form(None), |
| file: Optional[UploadFile] = File(None), |
| language: str = Form("auto"), |
| min_clip_duration: int = Form(30), |
| max_clip_duration: int = Form(90), |
| num_clips: int = Form(10), |
| aspect_ratio: str = Form("9:16"), |
| rewrite_hooks: bool = Form(True), |
| caption_style: Optional[str] = Form(None), |
| brand_kit: Optional[str] = Form(None), |
| logo_file: Optional[UploadFile] = File(None), |
| cookies_file: Optional[UploadFile] = File(None), |
| ): |
| """ |
| Start a new video processing job |
| |
| Returns: |
| Job ID and initial status |
| """ |
| try: |
| |
| if not url and not file: |
| raise HTTPException(status_code=400, detail="Either URL or file must be provided") |
|
|
| |
| job_id = str(uuid4()) |
| logger.info(f"Created job {job_id}") |
|
|
| |
| caption_style_dict = {} |
| brand_kit_dict = {} |
| if caption_style: |
| try: |
| caption_style_dict = json.loads(caption_style) |
| except json.JSONDecodeError: |
| pass |
|
|
| if brand_kit: |
| try: |
| brand_kit_dict = json.loads(brand_kit) |
| except json.JSONDecodeError: |
| pass |
|
|
| |
| if logo_file and logo_file.filename: |
| import shutil |
| logo_dir = settings.uploads_dir / "logos" |
| logo_dir.mkdir(parents=True, exist_ok=True) |
| logo_path = logo_dir / logo_file.filename |
| contents = await logo_file.read() |
| logo_path.write_bytes(contents) |
| brand_kit_dict["logo_url"] = str(logo_path) |
|
|
| |
| request = ProcessRequest( |
| url=url, |
| language=language, |
| min_clip_duration=min_clip_duration, |
| max_clip_duration=max_clip_duration, |
| num_clips=num_clips, |
| aspect_ratio=aspect_ratio, |
| rewrite_hooks=rewrite_hooks, |
| **caption_style_dict if caption_style_dict else {}, |
| **brand_kit_dict if brand_kit_dict else {} |
| ) |
|
|
| |
| job_result = JobResult( |
| job_id=job_id, |
| status=JobStatusEnum.queued, |
| progress=0, |
| message="Job queued", |
| source_url=url, |
| ) |
| jobs_store[job_id] = job_result |
|
|
| |
| |
| |
| saved_file_path = None |
| if file and file.filename: |
| saved_file_path = settings.uploads_dir / file.filename |
| saved_file_path.parent.mkdir(parents=True, exist_ok=True) |
| saved_file_path.write_bytes(await file.read()) |
|
|
| |
| |
| resolved_cookies_path: Optional[str] = None |
| env_cookies = os.getenv("YT_COOKIES_PATH") |
| if cookies_file and cookies_file.filename: |
| cookies_dir = settings.uploads_dir / "cookies" |
| cookies_dir.mkdir(parents=True, exist_ok=True) |
| cookies_save_path = cookies_dir / f"{job_id}_cookies.txt" |
| cookies_save_path.write_bytes(await cookies_file.read()) |
| resolved_cookies_path = str(cookies_save_path) |
| logger.info(f"Job {job_id}: Using uploaded cookies file") |
| elif env_cookies and os.path.isfile(env_cookies): |
| resolved_cookies_path = env_cookies |
| logger.info(f"Job {job_id}: Using YT_COOKIES_PATH env cookies") |
|
|
| |
| asyncio.create_task(process_video_job(job_id, request, saved_file_path, resolved_cookies_path)) |
|
|
| return { |
| "job_id": job_id, |
| "status": JobStatusEnum.queued.value, |
| "progress": 0 |
| } |
|
|
| except Exception as e: |
| logger.error(f"Error processing video: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
| |
|
|
|
|
| @app.get("/api/jobs") |
| async def get_recent_jobs(limit: int = 10): |
| """Get list of recent jobs""" |
| jobs = list(jobs_store.values()) |
| jobs_sorted = sorted(jobs, key=lambda x: x.updated_at, reverse=True) |
| return [job.to_dict() for job in jobs_sorted[:limit]] |
|
|
|
|
| @app.delete("/api/job/{job_id}") |
| async def delete_job(job_id: str): |
| """Delete job and cleanup files""" |
| try: |
| if job_id not in jobs_store: |
| raise HTTPException(status_code=404, detail="Job not found") |
|
|
| |
| job_dir = settings.output_dir / job_id |
| if job_dir.exists(): |
| import shutil |
| shutil.rmtree(job_dir) |
|
|
| |
| del jobs_store[job_id] |
|
|
| return {"status": "deleted", "job_id": job_id} |
|
|
| except Exception as e: |
| logger.error(f"Error deleting job: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.get("/api/clips/{job_id}/{clip_id}/download") |
| async def download_clip(job_id: str, clip_id: str): |
| """Download processed clip video""" |
| try: |
| if job_id not in jobs_store: |
| raise HTTPException(status_code=404, detail="Job not found") |
|
|
| job = jobs_store[job_id] |
| clip = next((c for c in job.clips if str(c.id) == clip_id), None) |
|
|
| if not clip or not clip.output_path: |
| raise HTTPException(status_code=404, detail="Clip not found") |
|
|
| clip_path = Path(clip.output_path) |
| if not clip_path.exists(): |
| raise HTTPException(status_code=404, detail="Clip file not found") |
|
|
| return FileResponse( |
| path=clip_path, |
| filename=f"clip_{clip_id}.mp4", |
| media_type="video/mp4" |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error downloading clip: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.get("/api/clips/{job_id}/{clip_id}/thumbnail") |
| async def get_thumbnail(job_id: str, clip_id: str): |
| """Download clip thumbnail""" |
| try: |
| if job_id not in jobs_store: |
| raise HTTPException(status_code=404, detail="Job not found") |
|
|
| job = jobs_store[job_id] |
| clip = next((c for c in job.clips if str(c.id) == clip_id), None) |
|
|
| if not clip or not clip.thumbnail_path: |
| raise HTTPException(status_code=404, detail="Thumbnail not found") |
|
|
| thumb_path = Path(clip.thumbnail_path) |
| if not thumb_path.exists(): |
| raise HTTPException(status_code=404, detail="Thumbnail file not found") |
|
|
| return FileResponse( |
| path=thumb_path, |
| media_type="image/jpeg" |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error downloading thumbnail: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.websocket("/api/ws/{job_id}") |
| async def websocket_endpoint(websocket: WebSocket, job_id: str): |
| """WebSocket endpoint for real-time job progress""" |
| await manager.connect(job_id, websocket) |
|
|
| try: |
| while True: |
| |
| await websocket.receive_text() |
| except WebSocketDisconnect: |
| await manager.disconnect(job_id, websocket) |
| except Exception as e: |
| logger.error(f"WebSocket error: {str(e)}") |
| await manager.disconnect(job_id, websocket) |
|
|
|
|
| |
| async def process_video_job( |
| job_id: str, |
| request: ProcessRequest, |
| file_path: Optional[Path], |
| cookies_path: Optional[str] = None, |
| ) -> None: |
| """ |
| Background task to process video and generate clips. |
| file_path is a Path (already saved to disk) or None for URL jobs. |
| cookies_path is an optional path to a Netscape-format cookies.txt for YouTube. |
| """ |
| job = jobs_store[job_id] |
|
|
| try: |
| |
| logger.info(f"Job {job_id}: Starting download phase") |
| job.status = JobStatusEnum.downloading |
| job.progress = 5 |
| await manager.broadcast(job_id, job.to_dict()) |
|
|
| if file_path: |
| |
| video_path = file_path |
| job.source_title = file_path.name |
| else: |
| |
| result = await downloader.download(request.url, settings.uploads_dir, cookies_path=cookies_path) |
| video_path = Path(result["path"]) |
| job.source_title = result["title"] |
| job.duration = result["duration"] |
|
|
| logger.info(f"Job {job_id}: Video ready at {video_path}") |
|
|
| |
| logger.info(f"Job {job_id}: Starting transcription phase") |
| job.status = JobStatusEnum.transcribing |
| job.progress = 20 |
| await manager.broadcast(job_id, job.to_dict()) |
|
|
| audio_path = await downloader.extract_audio(video_path, settings.uploads_dir) |
| segments: List[TranscriptSegment] = await transcriber.transcribe( |
| audio_path, |
| language=request.language |
| ) |
| job.language = request.language if request.language != "auto" else transcriber.detect_language(audio_path) |
|
|
| logger.info(f"Job {job_id}: Transcription complete with {len(segments)} segments") |
|
|
| |
| logger.info(f"Job {job_id}: Starting clip detection phase") |
| job.status = JobStatusEnum.analyzing |
| job.progress = 40 |
| await manager.broadcast(job_id, job.to_dict()) |
|
|
| clips = await clip_detector.detect_clips(segments, request) |
| for clip in clips: |
| clip.job_id = job_id |
|
|
| logger.info(f"Job {job_id}: Detected {len(clips)} clips") |
|
|
| |
| if request.rewrite_hooks: |
| logger.info(f"Job {job_id}: Rewriting hooks") |
| for clip in clips: |
| rewritten = await hook_rewriter.rewrite(clip) |
| if rewritten: |
| clip.rewritten_hook = rewritten |
|
|
| |
| logger.info(f"Job {job_id}: Starting video editing phase") |
| job.status = JobStatusEnum.editing |
| job.progress = 60 |
| job.clips = clips |
| await manager.broadcast(job_id, job.to_dict()) |
|
|
| clips = await video_editor.process_clips(video_path, clips, request) |
|
|
| |
| logger.info(f"Job {job_id}: Starting caption phase") |
| job.status = JobStatusEnum.captioning |
| job.progress = 80 |
| await manager.broadcast(job_id, job.to_dict()) |
|
|
| clips = await caption_styler.add_captions(clips, request.caption_style, request.brand_kit) |
|
|
| |
| for clip in clips: |
| if clip.output_path: |
| clip.status = JobStatusEnum.completed |
|
|
| job.clips = clips |
|
|
| |
| logger.info(f"Job {job_id}: Processing complete") |
| job.status = JobStatusEnum.completed |
| job.progress = 100 |
| job.message = f"Successfully generated {len(clips)} clips" |
| job.updated_at = datetime.utcnow() |
|
|
| await manager.broadcast(job_id, job.to_dict()) |
|
|
| except Exception as e: |
| logger.error(f"Job {job_id} failed: {str(e)}") |
| job.status = JobStatusEnum.failed |
| job.progress = 0 |
| job.message = "" |
| job.error = str(e) |
| job.updated_at = datetime.utcnow() |
|
|
| await manager.broadcast(job_id, job.to_dict()) |
|
|
|
|
| |
| |
| |
| |
|
|
| FONT_BOLD_PATH = "/tmp/Montserrat-Bold.ttf" |
| FONT_LIGHT_PATH = "/tmp/Montserrat-Light.ttf" |
| FONT_URL = "https://github.com/google/fonts/raw/main/ofl/montserrat/Montserrat-Bold.ttf" |
| VIDEO_W, VIDEO_H, FPS = 720, 1280, 24 |
|
|
| _reel_fonts: dict = {} |
|
|
| EDGE_VOICES_MAP = { |
| "Guy - News Anchor (Male)": "en-US-GuyNeural", |
| "Davis (Male)": "en-US-DavisNeural", |
| "Ryan (Male)": "en-GB-RyanNeural", |
| "William (Male)": "en-AU-WilliamNeural", |
| "Aria (Female)": "en-US-AriaNeural", |
| "Jenny (Female)": "en-US-JennyNeural", |
| "Sonia (Female)": "en-GB-SoniaNeural", |
| "Natasha (Female)": "en-AU-NatashaNeural", |
| } |
|
|
| REEL_PROMPTS = { |
| "Financial News Reel": ( |
| "You are a Bloomberg/WSJ-style breaking-news anchor writer.\n" |
| "Create a fast-paced, professional financial-news reel script β urgent teaser style.\n" |
| "Rules:\n" |
| " 1. Line 1 MUST be a powerful shocking hook that stops the scroll.\n" |
| " 2. Give only the high-level gist. Withhold specifics so viewers click through.\n" |
| " 3. Tone: authoritative, slightly shocked, premium financial-media.\n" |
| " 4. Write exactly {n} lines. Each line: 13-16 words, ~6 seconds spoken.\n" |
| " 5. No bullet points, no numbering, no emojis β pure spoken sentences only.\n" |
| " 6. FINAL LINE MUST BE: 'Read the full story at chainstreet dot i o.'\n" |
| "Output ONLY the {n} lines, one per line, nothing else." |
| ), |
| "Top 5": ( |
| "Create a viral 'Top 5' reel script. Hook sentence, then 5 items (5. β¦ 1. β¦).\n" |
| "Each item: one punchy sentence, 10-15 words.\n" |
| "FINAL LINE: 'Get the full breakdown at chainstreet dot i o.'\n" |
| "Output ONLY {n} lines, one per line, no extra text." |
| ), |
| "Fact": ( |
| "Create a FACTS reel script. Lead with the most shocking fact as a hook.\n" |
| "Each of {n} lines: one amazing fact, 10-15 words.\n" |
| "FINAL LINE: 'Read more at chainstreet dot i o.'\n" |
| "Output ONLY {n} lines, one per line." |
| ), |
| "Custom Prompt": ( |
| "You are a senior content writer. Create a punchy short-form video script (max 45s) for this content.\n" |
| "Structure: strong hook, conflict, key insight, stakes, rhetorical question, CTA.\n" |
| "Write exactly {n} lines, each 8-15 words. " |
| "FINAL LINE must end with 'chainstreet dot i o'.\n" |
| "Output ONLY {n} lines, one per line, no extra text." |
| ), |
| } |
| for _rt in [ |
| "Ranking", "Step by Step Guide", "Statistics", "Quiz", "Famous Quotes", |
| "Product Demo", "Joke", "Blog to Reel", |
| ]: |
| REEL_PROMPTS[_rt] = ( |
| f"Create a {_rt} reel script from the content below.\n" |
| "Each line: 10-15 words, punchy. FINAL LINE: 'Read the full story at chainstreet dot i o.'\n" |
| "Write exactly {n} lines, one per line, no extra text." |
| ) |
|
|
|
|
| def _get_reel_font(size: int = 64): |
| key = f"s{size}" |
| if key not in _reel_fonts: |
| if not os.path.exists(FONT_BOLD_PATH): |
| try: |
| with urllib.request.urlopen(FONT_URL, timeout=10) as r: |
| Path(FONT_BOLD_PATH).write_bytes(r.read()) |
| except Exception: |
| pass |
| |
| for font_path in [ |
| FONT_BOLD_PATH, |
| "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", |
| "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", |
| ]: |
| try: |
| _reel_fonts[key] = ImageFont.truetype(font_path, size) |
| break |
| except Exception: |
| continue |
| else: |
| _reel_fonts[key] = ImageFont.load_default() |
| return _reel_fonts[key] |
|
|
|
|
| def _scrape_url(url: str) -> str: |
| try: |
| import trafilatura |
| dl = trafilatura.fetch_url(url) |
| text = trafilatura.extract(dl, include_tables=False, include_comments=False, favor_recall=True) |
| return (text or "").strip() |
| except Exception as e: |
| logger.warning(f"[scrape] {e}") |
| return "" |
|
|
|
|
| def _generate_reel_script(content: str, groq_key: str, reel_type: str, num_points: int) -> list: |
| import random |
| if not groq_key.strip(): |
| groq_key = os.getenv("GROQ_API_KEY", "") |
| if not groq_key: |
| raise ValueError("Groq API key required for article reels.") |
| from groq import Groq |
| client = Groq(api_key=groq_key.strip()) |
| template = REEL_PROMPTS.get(reel_type, REEL_PROMPTS["Custom Prompt"]) |
| system = template.format(n=num_points) |
| resp = client.chat.completions.create( |
| model="llama-3.3-70b-versatile", |
| messages=[ |
| {"role": "system", "content": system}, |
| {"role": "user", "content": f"Content:\n{content[:3500]}"}, |
| ], |
| temperature=0.78, |
| max_tokens=600, |
| ) |
| raw = resp.choices[0].message.content.strip() |
| lines = [l.strip().lstrip("β’-*0123456789. ") for l in raw.splitlines() if l.strip()] |
| lines = lines[:num_points] |
|
|
| |
| if lines: |
| lines[-1] = random.choice(CTA_POOL) |
|
|
| return lines |
|
|
|
|
| async def _edge_tts_save(text: str, voice: str, path: str): |
| import edge_tts |
| await edge_tts.Communicate(text, voice).save(path) |
|
|
|
|
| def _generate_voice(text: str, voice_display: str) -> str: |
| text = text.replace("chainstreet.io", "chainstreet dot i o") |
| voice = EDGE_VOICES_MAP.get(voice_display, "en-US-GuyNeural") |
| out = tempfile.mktemp(suffix=".mp3") |
| for attempt in range(2): |
| try: |
| asyncio.run(_edge_tts_save(text, voice, out)) |
| return out |
| except Exception as e: |
| logger.warning(f"[TTS] attempt {attempt+1}: {e}") |
| voice = "en-US-AriaNeural" |
| raise RuntimeError("TTS failed") |
|
|
|
|
| CTA_POOL = [ |
| "Read the full story at chainstreet dot i o.", |
| "Get the complete breakdown at chainstreet dot i o.", |
| "All the details are live at chainstreet dot i o.", |
| "Don't miss the full report β chainstreet dot i o.", |
| "Follow the full story only at chainstreet dot i o.", |
| "Catch every detail at chainstreet dot i o.", |
| "Dive deeper at chainstreet dot i o.", |
| "Your source for this story: chainstreet dot i o.", |
| "More context and analysis at chainstreet dot i o.", |
| "Stay ahead β read the full piece at chainstreet dot i o.", |
| ] |
|
|
|
|
| def _render_slide(text: str, W: int, H: int, accent_rgb: tuple, |
| slide_idx: int, total_slides: int) -> np.ndarray: |
| """Render a complete full-frame RGB slide. All compositing done in PIL β MoviePy never sees RGBA.""" |
| ar, ag, ab = accent_rgb |
|
|
| |
| rows = np.arange(H, dtype=np.float32) / H |
| bg = np.zeros((H, W, 4), dtype=np.uint8) |
| bg[:, :, 0] = np.clip(8 + (rows * min(ar // 3, 38))[:, None], 0, 25).astype(np.uint8) |
| bg[:, :, 1] = np.clip(5 + (rows * min(ag // 6, 18))[:, None], 0, 18).astype(np.uint8) |
| bg[:, :, 2] = np.clip(18 + (rows * min(ab // 4, 52))[:, None], 0, 60).astype(np.uint8) |
| bg[:, :, 3] = 255 |
| canvas = Image.fromarray(bg, mode="RGBA") |
|
|
| |
| font = _get_reel_font(62) |
| font_sm = _get_reel_font(26) |
| fsize = getattr(font, "size", 62) |
|
|
| tmp_d = ImageDraw.Draw(Image.new("RGBA", (10, 10))) |
| words, wrapped, cur = text.split(), [], [] |
| for word in words: |
| test = " ".join(cur + [word]) |
| try: |
| bb = tmp_d.textbbox((0, 0), test, font=font) |
| w = bb[2] - bb[0] |
| except Exception: |
| w = len(test) * (fsize // 2) |
| if w > W * 0.80 and cur: |
| wrapped.append(" ".join(cur)); cur = [word] |
| else: |
| cur.append(word) |
| if cur: |
| wrapped.append(" ".join(cur)) |
|
|
| line_h = int(fsize * 1.40) |
| total_text_h = len(wrapped) * line_h |
| pad_y = 44 |
|
|
| cy = H // 2 - 30 |
| card_top = cy - total_text_h // 2 - pad_y |
| card_bot = cy + total_text_h // 2 + pad_y |
|
|
| |
| card = Image.new("RGBA", (W, H), (0, 0, 0, 0)) |
| cd = ImageDraw.Draw(card) |
| cd.rounded_rectangle([30, card_top, W - 30, card_bot], radius=26, fill=(20, 14, 42, 210)) |
| cd.rounded_rectangle([30, card_top, 42, card_bot], radius=5, fill=(*accent_rgb, 230)) |
| cd.rounded_rectangle([30, card_top, W - 30, card_bot], radius=26, |
| outline=(255, 255, 255, 35), width=1) |
|
|
| canvas = Image.alpha_composite(canvas, card) |
| draw = ImageDraw.Draw(canvas) |
|
|
| |
| ty = cy - total_text_h // 2 |
| for line in wrapped: |
| try: |
| bb = draw.textbbox((0, 0), line, font=font) |
| tw = bb[2] - bb[0] |
| except Exception: |
| tw = len(line) * (fsize // 2) |
| tx = (W - tw) // 2 |
| draw.text((tx + 2, ty + 2), line, font=font, fill=(0, 0, 0, 170)) |
| draw.text((tx, ty), line, font=font, fill=(255, 255, 255, 255)) |
| ty += line_h |
|
|
| |
| dot_r, dot_gap = 5, 20 |
| total_dots_w = total_slides * dot_gap |
| dx = (W - total_dots_w) // 2 + dot_r |
| dy = H - 68 |
| for i in range(total_slides): |
| r = dot_r if i == slide_idx else dot_r - 2 |
| fill = (*accent_rgb, 255) if i == slide_idx else (255, 255, 255, 55) |
| draw.ellipse([dx - r, dy - r, dx + r, dy + r], fill=fill) |
| dx += dot_gap |
|
|
| |
| ctr = f"{slide_idx + 1} / {total_slides}" |
| try: |
| cb = draw.textbbox((0, 0), ctr, font=font_sm) |
| draw.text((W - (cb[2] - cb[0]) - 26, 26), ctr, font=font_sm, fill=(255, 255, 255, 85)) |
| except Exception: |
| pass |
|
|
| |
| return np.array(canvas.convert("RGB")) |
|
|
|
|
| def _render_endcard(W: int, H: int, accent_rgb: tuple) -> np.ndarray: |
| """Premium chainstreet.io endcard slide.""" |
| ar, ag, ab = accent_rgb |
| rows = np.arange(H, dtype=np.float32) / H |
| bg = np.zeros((H, W, 4), dtype=np.uint8) |
| bg[:, :, 0] = np.clip(10 + (rows * min(ar // 2, 50))[:, None], 0, 60).astype(np.uint8) |
| bg[:, :, 1] = np.clip(8 + (rows * min(ag // 4, 25))[:, None], 0, 30).astype(np.uint8) |
| bg[:, :, 2] = np.clip(24 + (rows * min(ab // 3, 65))[:, None], 0, 90).astype(np.uint8) |
| bg[:, :, 3] = 255 |
| canvas = Image.fromarray(bg, mode="RGBA") |
|
|
| |
| glow = Image.new("RGBA", (W, H), (0, 0, 0, 0)) |
| gd = ImageDraw.Draw(glow) |
| cx, cy = W // 2, H // 2 - 100 |
| for radius in range(260, 0, -14): |
| alpha = int(35 * (260 - radius) / 260) |
| gd.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], |
| fill=(*accent_rgb, alpha)) |
| canvas = Image.alpha_composite(canvas, glow) |
| draw = ImageDraw.Draw(canvas) |
|
|
| font_big = _get_reel_font(86) |
| font_med = _get_reel_font(44) |
| font_sm = _get_reel_font(28) |
| fsize_b = getattr(font_big, "size", 86) |
|
|
| |
| brand, tld = "chainstreet", ".io" |
| try: |
| bb1 = draw.textbbox((0, 0), brand, font=font_big) |
| bb2 = draw.textbbox((0, 0), tld, font=font_big) |
| bw, tiw = bb1[2] - bb1[0], bb2[2] - bb2[0] |
| except Exception: |
| bw, tiw = len(brand) * (fsize_b // 2), len(tld) * (fsize_b // 2) |
|
|
| total_w = bw + tiw |
| tx = (W - total_w) // 2 |
| ty = H // 2 - fsize_b - 20 |
| draw.text((tx + 2, ty + 2), brand, font=font_big, fill=(0, 0, 0, 150)) |
| draw.text((tx, ty), brand, font=font_big, fill=(255, 255, 255, 255)) |
| draw.text((tx + bw + 2, ty + 2), tld, font=font_big, fill=(0, 0, 0, 150)) |
| draw.text((tx + bw, ty), tld, font=font_big, fill=(*accent_rgb, 255)) |
|
|
| |
| sub = "Your daily crypto briefing" |
| try: |
| sb = draw.textbbox((0, 0), sub, font=font_sm) |
| draw.text(((W - (sb[2] - sb[0])) // 2, H // 2 + 30), sub, font=font_sm, |
| fill=(255, 255, 255, 160)) |
| except Exception: |
| pass |
|
|
| |
| cta = "Follow for more β" |
| try: |
| cb = draw.textbbox((0, 0), cta, font=font_med) |
| cw, ch = cb[2] - cb[0], cb[3] - cb[1] |
| except Exception: |
| cw, ch = len(cta) * 22, 44 |
| bx1, by1 = (W - cw) // 2 - 32, H // 2 + 100 |
| bx2, by2 = bx1 + cw + 64, by1 + ch + 28 |
|
|
| btn = Image.new("RGBA", (W, H), (0, 0, 0, 0)) |
| bd = ImageDraw.Draw(btn) |
| bd.rounded_rectangle([bx1, by1, bx2, by2], radius=50, fill=(*accent_rgb, 255)) |
| canvas = Image.alpha_composite(canvas, btn) |
| draw = ImageDraw.Draw(canvas) |
| draw.text(((W - cw) // 2, by1 + 14), cta, font=font_med, fill=(255, 255, 255, 255)) |
|
|
| return np.array(canvas.convert("RGB")) |
|
|
|
|
| def _build_article_reel(sentences: list, audio_path: str, accent_hex: str) -> str: |
| """Build article reel using concatenated full-RGB slides β no MoviePy alpha compositing.""" |
| import moviepy.editor as mpe |
| W, H = VIDEO_W, VIDEO_H |
| try: |
| accent_rgb = tuple(int(accent_hex.lstrip("#")[i:i+2], 16) for i in (0, 2, 4)) |
| except Exception: |
| accent_rgb = (108, 99, 255) |
|
|
| audio = mpe.AudioFileClip(audio_path) |
| total_dur = audio.duration |
| n = len(sentences) |
|
|
| |
| endcard_dur = max(total_dur * 0.08, 2.0) |
| dur_each = (total_dur - endcard_dur) / n if n > 0 else total_dur |
|
|
| clips = [] |
| for i, sentence in enumerate(sentences): |
| frame = _render_slide(sentence, W, H, accent_rgb, i, n) |
| clips.append(mpe.ImageClip(frame).set_duration(dur_each)) |
|
|
| |
| endcard_frame = _render_endcard(W, H, accent_rgb) |
| clips.append(mpe.ImageClip(endcard_frame).set_duration(endcard_dur)) |
|
|
| |
| final = mpe.concatenate_videoclips(clips, method="chain").set_audio(audio) |
| out = tempfile.mktemp(suffix=".mp4") |
| final.write_videofile( |
| out, codec="libx264", audio_codec="aac", |
| fps=FPS, preset="ultrafast", threads=4, logger=None, |
| ) |
| return out |
|
|
|
|
| |
| article_jobs: dict = {} |
|
|
|
|
| @app.post("/api/article-reels") |
| async def api_article_reels( |
| background_tasks: BackgroundTasks, |
| article_url: str = Form(...), |
| reel_type: str = Form("Financial News Reel"), |
| num_points: int = Form(7), |
| tts_voice: str = Form("Guy - News Anchor (Male)"), |
| accent_hex: str = Form("#6C63FF"), |
| groq_api_key: str = Form(""), |
| ): |
| """Generate a viral reel from a news article URL with chainstreet.io CTA.""" |
| if not article_url.strip(): |
| raise HTTPException(status_code=400, detail="article_url is required") |
|
|
| job_id = str(uuid4()) |
| article_jobs[job_id] = { |
| "job_id": job_id, |
| "status": "processing", |
| "progress": 0, |
| "message": "Startingβ¦", |
| "source_url": article_url, |
| "output_path": None, |
| "error": None, |
| } |
| background_tasks.add_task( |
| _run_article_reel_job, |
| job_id, article_url, groq_api_key, reel_type, |
| num_points, tts_voice, accent_hex, |
| ) |
| return {"job_id": job_id, "status": "processing", "progress": 0} |
|
|
|
|
| @app.get("/api/job/{job_id}") |
| async def get_job_by_id(job_id: str): |
| """Unified job status β checks both video-clip jobs and article-reel jobs.""" |
| if job_id in jobs_store: |
| return jobs_store[job_id].to_dict() |
| if job_id in article_jobs: |
| return article_jobs[job_id] |
| raise HTTPException(status_code=404, detail="Job not found") |
|
|
|
|
| @app.get("/api/article-download/{job_id}") |
| async def download_article_reel(job_id: str): |
| if job_id not in article_jobs: |
| raise HTTPException(status_code=404, detail="Job not found") |
| job = article_jobs[job_id] |
| if job["status"] != "completed" or not job.get("output_path"): |
| raise HTTPException(status_code=400, detail="Reel not ready") |
| path = Path(job["output_path"]) |
| if not path.exists(): |
| raise HTTPException(status_code=404, detail="Reel file missing") |
| return FileResponse(path, filename=f"article_reel_{job_id}.mp4", media_type="video/mp4") |
|
|
|
|
| def _run_article_reel_job( |
| job_id, article_url, groq_api_key, reel_type, num_points, tts_voice, accent_hex, |
| ): |
| def upd(pct, msg): |
| article_jobs[job_id].update({"progress": int(pct * 100), "message": msg, "status": "processing"}) |
|
|
| try: |
| upd(0.05, "Scraping articleβ¦") |
| content = _scrape_url(article_url) |
| if not content or len(content) < 40: |
| raise ValueError("Could not extract text from the URL. Try pasting content directly.") |
|
|
| upd(0.20, "Generating script with AIβ¦") |
| sentences = _generate_reel_script(content, groq_api_key, reel_type, num_points) |
| if not sentences: |
| raise ValueError("Script generation failed β check your Groq API key.") |
|
|
| upd(0.45, "Generating AI voiceoverβ¦") |
| full_text = " ".join(sentences) |
| audio_path = _generate_voice(full_text, tts_voice) |
|
|
| upd(0.70, "Assembling reelβ¦") |
| reel_path = _build_article_reel(sentences, audio_path, accent_hex) |
|
|
| article_jobs[job_id].update({ |
| "status": "completed", |
| "progress": 100, |
| "message": "Reel ready!", |
| "output_path": reel_path, |
| "sentences": sentences, |
| }) |
| except Exception as e: |
| import traceback |
| logger.error(traceback.format_exc()) |
| article_jobs[job_id].update({"status": "failed", "error": str(e), "message": str(e)}) |
|
|
|
|
| @app.exception_handler(Exception) |
| async def general_exception_handler(request, exc): |
| """Global exception handler""" |
| logger.error(f"Unhandled exception: {str(exc)}") |
| return HTTPException(status_code=500, detail="Internal server error") |
|
|
|
|
| |
| |
| |
| @app.get("/{full_path:path}") |
| async def spa_fallback(full_path: str): |
| """SPA catchβall β serves index.html for React Router, but ignores API routes.""" |
| |
| if full_path.startswith("api/") or full_path.startswith("health") or full_path.startswith("docs"): |
| raise HTTPException(status_code=404, detail="Not found") |
| index_path = frontend_dir / "index.html" |
| if index_path.exists(): |
| return FileResponse(index_path) |
| raise HTTPException(status_code=404, detail="Frontend not built") |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=7860, workers=1) |