clipon / app.py
yonagush
Fix YouTube download failures on datacenter IPs via cookies.txt support
31dce00
"""
ClipCraft AI - FastAPI Application
AI-powered video clip generator for social media
"""
import asyncio
import json
import logging
import os
import tempfile
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from uuid import uuid4
import numpy as np
import requests as _requests
from PIL import Image, ImageDraw, ImageFont
from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from config import settings
from models.schemas import (
Clip,
JobResult,
JobStatusEnum,
ProcessRequest,
TranscriptSegment,
)
from services.caption_styler import CaptionStyler
from services.clip_detector import ClipDetector
from services.downloader import VideoDownloader
from services.hook_rewriter import HookRewriter
from services.transcriber import Transcriber
from services.video_editor import VideoEditor
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(
title="ClipCraft AI",
description="AI-powered video clip generator for social media",
version="1.0.0"
)
# Add middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Initialize services
downloader = VideoDownloader()
transcriber = Transcriber()
clip_detector = ClipDetector()
video_editor = VideoEditor()
caption_styler = CaptionStyler()
hook_rewriter = HookRewriter()
# In-memory job store
jobs_store: Dict[str, JobResult] = {}
websocket_connections: Dict[str, List[WebSocket]] = {}
# WebSocket Connection Manager
class ConnectionManager:
"""Manage WebSocket connections for real-time updates"""
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, job_id: str, websocket: WebSocket):
"""Add WebSocket connection"""
await websocket.accept()
if job_id not in self.active_connections:
self.active_connections[job_id] = []
self.active_connections[job_id].append(websocket)
async def disconnect(self, job_id: str, websocket: WebSocket):
"""Remove WebSocket connection"""
if job_id in self.active_connections:
self.active_connections[job_id].remove(websocket)
if not self.active_connections[job_id]:
del self.active_connections[job_id]
async def broadcast(self, job_id: str, message: dict):
"""Broadcast message to all connected clients"""
if job_id not in self.active_connections:
return
disconnected = []
for connection in self.active_connections[job_id]:
try:
await connection.send_json(message)
except Exception as e:
logger.error(f"Failed to send WebSocket message: {str(e)}")
disconnected.append(connection)
# Clean up disconnected clients
for connection in disconnected:
await self.disconnect(job_id, connection)
manager = ConnectionManager()
# Serve static frontend assets (JS, CSS, images)
frontend_dir = Path(__file__).parent / "frontend" / "dist"
if frontend_dir.exists():
app.mount("/assets", StaticFiles(directory=frontend_dir / "assets"), name="assets")
@app.get("/")
async def root():
"""Serve frontend index.html"""
index_path = frontend_dir / "index.html"
if index_path.exists():
return FileResponse(index_path)
return HTMLResponse("""
<html>
<head><title>ClipCraft AI</title>
<style>body{background:#0A0A0F;color:#fff;font-family:Inter,sans-serif;display:flex;align-items:center;justify-content:center;height:100vh;flex-direction:column;gap:16px}</style>
</head>
<body>
<h1 style="background:linear-gradient(135deg,#6C63FF,#FF6B6B);-webkit-background-clip:text;-webkit-text-fill-color:transparent;font-size:2.5rem">ClipCraft AI</h1>
<p style="color:#888">AI-powered video clip generator β€” frontend building...</p>
</body>
</html>
""")
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "ok",
"version": "1.0.0",
"timestamp": datetime.utcnow().isoformat()
}
@app.post("/api/process")
async def process_video(
url: Optional[str] = Form(None),
file: Optional[UploadFile] = File(None),
language: str = Form("auto"),
min_clip_duration: int = Form(30),
max_clip_duration: int = Form(90),
num_clips: int = Form(10),
aspect_ratio: str = Form("9:16"),
rewrite_hooks: bool = Form(True),
caption_style: Optional[str] = Form(None), # BUG FIX: was caption_style_json
brand_kit: Optional[str] = Form(None), # BUG FIX: was brand_kit_json
logo_file: Optional[UploadFile] = File(None), # brand logo upload
cookies_file: Optional[UploadFile] = File(None), # YouTube cookies.txt for datacenter IP bypass
):
"""
Start a new video processing job
Returns:
Job ID and initial status
"""
try:
# Validate inputs
if not url and not file:
raise HTTPException(status_code=400, detail="Either URL or file must be provided")
# Generate job ID
job_id = str(uuid4())
logger.info(f"Created job {job_id}")
# Parse optional JSON configs
caption_style_dict = {}
brand_kit_dict = {}
if caption_style:
try:
caption_style_dict = json.loads(caption_style)
except json.JSONDecodeError:
pass
if brand_kit:
try:
brand_kit_dict = json.loads(brand_kit)
except json.JSONDecodeError:
pass
# Save uploaded logo if provided
if logo_file and logo_file.filename:
import shutil
logo_dir = settings.uploads_dir / "logos"
logo_dir.mkdir(parents=True, exist_ok=True)
logo_path = logo_dir / logo_file.filename
contents = await logo_file.read()
logo_path.write_bytes(contents)
brand_kit_dict["logo_url"] = str(logo_path)
# Create request
request = ProcessRequest(
url=url,
language=language,
min_clip_duration=min_clip_duration,
max_clip_duration=max_clip_duration,
num_clips=num_clips,
aspect_ratio=aspect_ratio,
rewrite_hooks=rewrite_hooks,
**caption_style_dict if caption_style_dict else {},
**brand_kit_dict if brand_kit_dict else {}
)
# Initialize job result
job_result = JobResult(
job_id=job_id,
status=JobStatusEnum.queued,
progress=0,
message="Job queued",
source_url=url,
)
jobs_store[job_id] = job_result
# BUG FIX: save uploaded file to disk NOW (before the HTTP request closes)
# Passing UploadFile to create_task causes "read of closed file" because
# FastAPI closes the file after the endpoint returns.
saved_file_path = None
if file and file.filename:
saved_file_path = settings.uploads_dir / file.filename
saved_file_path.parent.mkdir(parents=True, exist_ok=True)
saved_file_path.write_bytes(await file.read())
# Resolve cookies path for YouTube datacenter IP bypass.
# Priority: 1) uploaded cookies_file 2) YT_COOKIES_PATH env var (HF Secret)
resolved_cookies_path: Optional[str] = None
env_cookies = os.getenv("YT_COOKIES_PATH")
if cookies_file and cookies_file.filename:
cookies_dir = settings.uploads_dir / "cookies"
cookies_dir.mkdir(parents=True, exist_ok=True)
cookies_save_path = cookies_dir / f"{job_id}_cookies.txt"
cookies_save_path.write_bytes(await cookies_file.read())
resolved_cookies_path = str(cookies_save_path)
logger.info(f"Job {job_id}: Using uploaded cookies file")
elif env_cookies and os.path.isfile(env_cookies):
resolved_cookies_path = env_cookies
logger.info(f"Job {job_id}: Using YT_COOKIES_PATH env cookies")
# Start background processing
asyncio.create_task(process_video_job(job_id, request, saved_file_path, resolved_cookies_path))
return {
"job_id": job_id,
"status": JobStatusEnum.queued.value,
"progress": 0
}
except Exception as e:
logger.error(f"Error processing video: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# NOTE: /api/job/{job_id} is now defined further down (after article_jobs store)
# to handle both video-clip jobs AND article-reel jobs in one unified endpoint.
@app.get("/api/jobs")
async def get_recent_jobs(limit: int = 10):
"""Get list of recent jobs"""
jobs = list(jobs_store.values())
jobs_sorted = sorted(jobs, key=lambda x: x.updated_at, reverse=True)
return [job.to_dict() for job in jobs_sorted[:limit]]
@app.delete("/api/job/{job_id}")
async def delete_job(job_id: str):
"""Delete job and cleanup files"""
try:
if job_id not in jobs_store:
raise HTTPException(status_code=404, detail="Job not found")
# Delete job files
job_dir = settings.output_dir / job_id
if job_dir.exists():
import shutil
shutil.rmtree(job_dir)
# Remove from store
del jobs_store[job_id]
return {"status": "deleted", "job_id": job_id}
except Exception as e:
logger.error(f"Error deleting job: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/clips/{job_id}/{clip_id}/download")
async def download_clip(job_id: str, clip_id: str):
"""Download processed clip video"""
try:
if job_id not in jobs_store:
raise HTTPException(status_code=404, detail="Job not found")
job = jobs_store[job_id]
clip = next((c for c in job.clips if str(c.id) == clip_id), None)
if not clip or not clip.output_path:
raise HTTPException(status_code=404, detail="Clip not found")
clip_path = Path(clip.output_path)
if not clip_path.exists():
raise HTTPException(status_code=404, detail="Clip file not found")
return FileResponse(
path=clip_path,
filename=f"clip_{clip_id}.mp4",
media_type="video/mp4"
)
except Exception as e:
logger.error(f"Error downloading clip: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/clips/{job_id}/{clip_id}/thumbnail")
async def get_thumbnail(job_id: str, clip_id: str):
"""Download clip thumbnail"""
try:
if job_id not in jobs_store:
raise HTTPException(status_code=404, detail="Job not found")
job = jobs_store[job_id]
clip = next((c for c in job.clips if str(c.id) == clip_id), None)
if not clip or not clip.thumbnail_path:
raise HTTPException(status_code=404, detail="Thumbnail not found")
thumb_path = Path(clip.thumbnail_path)
if not thumb_path.exists():
raise HTTPException(status_code=404, detail="Thumbnail file not found")
return FileResponse(
path=thumb_path,
media_type="image/jpeg"
)
except Exception as e:
logger.error(f"Error downloading thumbnail: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.websocket("/api/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
"""WebSocket endpoint for real-time job progress"""
await manager.connect(job_id, websocket)
try:
while True:
# Keep connection alive
await websocket.receive_text()
except WebSocketDisconnect:
await manager.disconnect(job_id, websocket)
except Exception as e:
logger.error(f"WebSocket error: {str(e)}")
await manager.disconnect(job_id, websocket)
# Background task for video processing
async def process_video_job(
job_id: str,
request: ProcessRequest,
file_path: Optional[Path],
cookies_path: Optional[str] = None,
) -> None:
"""
Background task to process video and generate clips.
file_path is a Path (already saved to disk) or None for URL jobs.
cookies_path is an optional path to a Netscape-format cookies.txt for YouTube.
"""
job = jobs_store[job_id]
try:
# Step 1: Download or extract video
logger.info(f"Job {job_id}: Starting download phase")
job.status = JobStatusEnum.downloading
job.progress = 5
await manager.broadcast(job_id, job.to_dict())
if file_path:
# File already saved to disk by endpoint handler
video_path = file_path
job.source_title = file_path.name
else:
# Download from URL (pass cookies for YouTube datacenter IP bypass)
result = await downloader.download(request.url, settings.uploads_dir, cookies_path=cookies_path)
video_path = Path(result["path"])
job.source_title = result["title"]
job.duration = result["duration"]
logger.info(f"Job {job_id}: Video ready at {video_path}")
# Step 2: Extract audio and transcribe
logger.info(f"Job {job_id}: Starting transcription phase")
job.status = JobStatusEnum.transcribing
job.progress = 20
await manager.broadcast(job_id, job.to_dict())
audio_path = await downloader.extract_audio(video_path, settings.uploads_dir)
segments: List[TranscriptSegment] = await transcriber.transcribe(
audio_path,
language=request.language
)
job.language = request.language if request.language != "auto" else transcriber.detect_language(audio_path)
logger.info(f"Job {job_id}: Transcription complete with {len(segments)} segments")
# Step 3: Detect clips using AI
logger.info(f"Job {job_id}: Starting clip detection phase")
job.status = JobStatusEnum.analyzing
job.progress = 40
await manager.broadcast(job_id, job.to_dict())
clips = await clip_detector.detect_clips(segments, request)
for clip in clips:
clip.job_id = job_id
logger.info(f"Job {job_id}: Detected {len(clips)} clips")
# Step 4: Rewrite hooks
if request.rewrite_hooks:
logger.info(f"Job {job_id}: Rewriting hooks")
for clip in clips:
rewritten = await hook_rewriter.rewrite(clip)
if rewritten:
clip.rewritten_hook = rewritten
# Step 5: Edit videos
logger.info(f"Job {job_id}: Starting video editing phase")
job.status = JobStatusEnum.editing
job.progress = 60
job.clips = clips
await manager.broadcast(job_id, job.to_dict())
clips = await video_editor.process_clips(video_path, clips, request)
# Step 6: Add captions
logger.info(f"Job {job_id}: Starting caption phase")
job.status = JobStatusEnum.captioning
job.progress = 80
await manager.broadcast(job_id, job.to_dict())
clips = await caption_styler.add_captions(clips, request.caption_style, request.brand_kit)
# Mark successful clips
for clip in clips:
if clip.output_path:
clip.status = JobStatusEnum.completed
job.clips = clips
# Completion
logger.info(f"Job {job_id}: Processing complete")
job.status = JobStatusEnum.completed
job.progress = 100
job.message = f"Successfully generated {len(clips)} clips"
job.updated_at = datetime.utcnow()
await manager.broadcast(job_id, job.to_dict())
except Exception as e:
logger.error(f"Job {job_id} failed: {str(e)}")
job.status = JobStatusEnum.failed
job.progress = 0
job.message = ""
job.error = str(e)
job.updated_at = datetime.utcnow()
await manager.broadcast(job_id, job.to_dict())
# ─────────────────────────────────────────────────────────────────────────────
# ARTICLE β†’ REELS PIPELINE
# All heavy imports are lazy so server startup stays fast.
# ─────────────────────────────────────────────────────────────────────────────
FONT_BOLD_PATH = "/tmp/Montserrat-Bold.ttf"
FONT_LIGHT_PATH = "/tmp/Montserrat-Light.ttf"
FONT_URL = "https://github.com/google/fonts/raw/main/ofl/montserrat/Montserrat-Bold.ttf"
VIDEO_W, VIDEO_H, FPS = 720, 1280, 24
_reel_fonts: dict = {}
EDGE_VOICES_MAP = {
"Guy - News Anchor (Male)": "en-US-GuyNeural",
"Davis (Male)": "en-US-DavisNeural",
"Ryan (Male)": "en-GB-RyanNeural",
"William (Male)": "en-AU-WilliamNeural",
"Aria (Female)": "en-US-AriaNeural",
"Jenny (Female)": "en-US-JennyNeural",
"Sonia (Female)": "en-GB-SoniaNeural",
"Natasha (Female)": "en-AU-NatashaNeural",
}
REEL_PROMPTS = {
"Financial News Reel": (
"You are a Bloomberg/WSJ-style breaking-news anchor writer.\n"
"Create a fast-paced, professional financial-news reel script β€” urgent teaser style.\n"
"Rules:\n"
" 1. Line 1 MUST be a powerful shocking hook that stops the scroll.\n"
" 2. Give only the high-level gist. Withhold specifics so viewers click through.\n"
" 3. Tone: authoritative, slightly shocked, premium financial-media.\n"
" 4. Write exactly {n} lines. Each line: 13-16 words, ~6 seconds spoken.\n"
" 5. No bullet points, no numbering, no emojis β€” pure spoken sentences only.\n"
" 6. FINAL LINE MUST BE: 'Read the full story at chainstreet dot i o.'\n"
"Output ONLY the {n} lines, one per line, nothing else."
),
"Top 5": (
"Create a viral 'Top 5' reel script. Hook sentence, then 5 items (5. … 1. …).\n"
"Each item: one punchy sentence, 10-15 words.\n"
"FINAL LINE: 'Get the full breakdown at chainstreet dot i o.'\n"
"Output ONLY {n} lines, one per line, no extra text."
),
"Fact": (
"Create a FACTS reel script. Lead with the most shocking fact as a hook.\n"
"Each of {n} lines: one amazing fact, 10-15 words.\n"
"FINAL LINE: 'Read more at chainstreet dot i o.'\n"
"Output ONLY {n} lines, one per line."
),
"Custom Prompt": (
"You are a senior content writer. Create a punchy short-form video script (max 45s) for this content.\n"
"Structure: strong hook, conflict, key insight, stakes, rhetorical question, CTA.\n"
"Write exactly {n} lines, each 8-15 words. "
"FINAL LINE must end with 'chainstreet dot i o'.\n"
"Output ONLY {n} lines, one per line, no extra text."
),
}
for _rt in [
"Ranking", "Step by Step Guide", "Statistics", "Quiz", "Famous Quotes",
"Product Demo", "Joke", "Blog to Reel",
]:
REEL_PROMPTS[_rt] = (
f"Create a {_rt} reel script from the content below.\n"
"Each line: 10-15 words, punchy. FINAL LINE: 'Read the full story at chainstreet dot i o.'\n"
"Write exactly {n} lines, one per line, no extra text."
)
def _get_reel_font(size: int = 64):
key = f"s{size}"
if key not in _reel_fonts:
if not os.path.exists(FONT_BOLD_PATH):
try:
with urllib.request.urlopen(FONT_URL, timeout=10) as r:
Path(FONT_BOLD_PATH).write_bytes(r.read())
except Exception:
pass
# Try Montserrat first, then system fonts guaranteed by Dockerfile
for font_path in [
FONT_BOLD_PATH,
"/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
]:
try:
_reel_fonts[key] = ImageFont.truetype(font_path, size)
break
except Exception:
continue
else:
_reel_fonts[key] = ImageFont.load_default()
return _reel_fonts[key]
def _scrape_url(url: str) -> str:
try:
import trafilatura
dl = trafilatura.fetch_url(url)
text = trafilatura.extract(dl, include_tables=False, include_comments=False, favor_recall=True)
return (text or "").strip()
except Exception as e:
logger.warning(f"[scrape] {e}")
return ""
def _generate_reel_script(content: str, groq_key: str, reel_type: str, num_points: int) -> list:
import random
if not groq_key.strip():
groq_key = os.getenv("GROQ_API_KEY", "")
if not groq_key:
raise ValueError("Groq API key required for article reels.")
from groq import Groq
client = Groq(api_key=groq_key.strip())
template = REEL_PROMPTS.get(reel_type, REEL_PROMPTS["Custom Prompt"])
system = template.format(n=num_points)
resp = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Content:\n{content[:3500]}"},
],
temperature=0.78,
max_tokens=600,
)
raw = resp.choices[0].message.content.strip()
lines = [l.strip().lstrip("β€’-*0123456789. ") for l in raw.splitlines() if l.strip()]
lines = lines[:num_points]
# Replace last line with a random CTA variation (avoids monotony across reels)
if lines:
lines[-1] = random.choice(CTA_POOL)
return lines
async def _edge_tts_save(text: str, voice: str, path: str):
import edge_tts
await edge_tts.Communicate(text, voice).save(path)
def _generate_voice(text: str, voice_display: str) -> str:
text = text.replace("chainstreet.io", "chainstreet dot i o")
voice = EDGE_VOICES_MAP.get(voice_display, "en-US-GuyNeural")
out = tempfile.mktemp(suffix=".mp3")
for attempt in range(2):
try:
asyncio.run(_edge_tts_save(text, voice, out))
return out
except Exception as e:
logger.warning(f"[TTS] attempt {attempt+1}: {e}")
voice = "en-US-AriaNeural"
raise RuntimeError("TTS failed")
CTA_POOL = [
"Read the full story at chainstreet dot i o.",
"Get the complete breakdown at chainstreet dot i o.",
"All the details are live at chainstreet dot i o.",
"Don't miss the full report β€” chainstreet dot i o.",
"Follow the full story only at chainstreet dot i o.",
"Catch every detail at chainstreet dot i o.",
"Dive deeper at chainstreet dot i o.",
"Your source for this story: chainstreet dot i o.",
"More context and analysis at chainstreet dot i o.",
"Stay ahead β€” read the full piece at chainstreet dot i o.",
]
def _render_slide(text: str, W: int, H: int, accent_rgb: tuple,
slide_idx: int, total_slides: int) -> np.ndarray:
"""Render a complete full-frame RGB slide. All compositing done in PIL β€” MoviePy never sees RGBA."""
ar, ag, ab = accent_rgb
# ── 1. Dark gradient background via numpy (fast, no brown) ─────────────
rows = np.arange(H, dtype=np.float32) / H # 0 β†’ 1 top-to-bottom
bg = np.zeros((H, W, 4), dtype=np.uint8)
bg[:, :, 0] = np.clip(8 + (rows * min(ar // 3, 38))[:, None], 0, 25).astype(np.uint8)
bg[:, :, 1] = np.clip(5 + (rows * min(ag // 6, 18))[:, None], 0, 18).astype(np.uint8)
bg[:, :, 2] = np.clip(18 + (rows * min(ab // 4, 52))[:, None], 0, 60).astype(np.uint8)
bg[:, :, 3] = 255
canvas = Image.fromarray(bg, mode="RGBA")
# ── 2. Text layout (word-wrap) ──────────────────────────────────────────
font = _get_reel_font(62)
font_sm = _get_reel_font(26)
fsize = getattr(font, "size", 62)
tmp_d = ImageDraw.Draw(Image.new("RGBA", (10, 10)))
words, wrapped, cur = text.split(), [], []
for word in words:
test = " ".join(cur + [word])
try:
bb = tmp_d.textbbox((0, 0), test, font=font)
w = bb[2] - bb[0]
except Exception:
w = len(test) * (fsize // 2)
if w > W * 0.80 and cur:
wrapped.append(" ".join(cur)); cur = [word]
else:
cur.append(word)
if cur:
wrapped.append(" ".join(cur))
line_h = int(fsize * 1.40)
total_text_h = len(wrapped) * line_h
pad_y = 44
cy = H // 2 - 30 # slightly above true centre
card_top = cy - total_text_h // 2 - pad_y
card_bot = cy + total_text_h // 2 + pad_y
# ── 3. Frosted glass card (PIL alpha_composite β€” correct & reliable) ───
card = Image.new("RGBA", (W, H), (0, 0, 0, 0))
cd = ImageDraw.Draw(card)
cd.rounded_rectangle([30, card_top, W - 30, card_bot], radius=26, fill=(20, 14, 42, 210))
cd.rounded_rectangle([30, card_top, 42, card_bot], radius=5, fill=(*accent_rgb, 230))
cd.rounded_rectangle([30, card_top, W - 30, card_bot], radius=26,
outline=(255, 255, 255, 35), width=1)
canvas = Image.alpha_composite(canvas, card)
draw = ImageDraw.Draw(canvas)
# ── 4. Text with drop-shadow ────────────────────────────────────────────
ty = cy - total_text_h // 2
for line in wrapped:
try:
bb = draw.textbbox((0, 0), line, font=font)
tw = bb[2] - bb[0]
except Exception:
tw = len(line) * (fsize // 2)
tx = (W - tw) // 2
draw.text((tx + 2, ty + 2), line, font=font, fill=(0, 0, 0, 170)) # shadow
draw.text((tx, ty), line, font=font, fill=(255, 255, 255, 255))
ty += line_h
# ── 5. Progress dots ────────────────────────────────────────────────────
dot_r, dot_gap = 5, 20
total_dots_w = total_slides * dot_gap
dx = (W - total_dots_w) // 2 + dot_r
dy = H - 68
for i in range(total_slides):
r = dot_r if i == slide_idx else dot_r - 2
fill = (*accent_rgb, 255) if i == slide_idx else (255, 255, 255, 55)
draw.ellipse([dx - r, dy - r, dx + r, dy + r], fill=fill)
dx += dot_gap
# ── 6. Slide counter (top-right, subtle) ────────────────────────────────
ctr = f"{slide_idx + 1} / {total_slides}"
try:
cb = draw.textbbox((0, 0), ctr, font=font_sm)
draw.text((W - (cb[2] - cb[0]) - 26, 26), ctr, font=font_sm, fill=(255, 255, 255, 85))
except Exception:
pass
# ── 7. Bake to RGB (no RGBA handed to MoviePy) ──────────────────────────
return np.array(canvas.convert("RGB"))
def _render_endcard(W: int, H: int, accent_rgb: tuple) -> np.ndarray:
"""Premium chainstreet.io endcard slide."""
ar, ag, ab = accent_rgb
rows = np.arange(H, dtype=np.float32) / H
bg = np.zeros((H, W, 4), dtype=np.uint8)
bg[:, :, 0] = np.clip(10 + (rows * min(ar // 2, 50))[:, None], 0, 60).astype(np.uint8)
bg[:, :, 1] = np.clip(8 + (rows * min(ag // 4, 25))[:, None], 0, 30).astype(np.uint8)
bg[:, :, 2] = np.clip(24 + (rows * min(ab // 3, 65))[:, None], 0, 90).astype(np.uint8)
bg[:, :, 3] = 255
canvas = Image.fromarray(bg, mode="RGBA")
# Radial glow
glow = Image.new("RGBA", (W, H), (0, 0, 0, 0))
gd = ImageDraw.Draw(glow)
cx, cy = W // 2, H // 2 - 100
for radius in range(260, 0, -14):
alpha = int(35 * (260 - radius) / 260)
gd.ellipse([cx - radius, cy - radius, cx + radius, cy + radius],
fill=(*accent_rgb, alpha))
canvas = Image.alpha_composite(canvas, glow)
draw = ImageDraw.Draw(canvas)
font_big = _get_reel_font(86)
font_med = _get_reel_font(44)
font_sm = _get_reel_font(28)
fsize_b = getattr(font_big, "size", 86)
# "chainstreet" + ".io" (accent)
brand, tld = "chainstreet", ".io"
try:
bb1 = draw.textbbox((0, 0), brand, font=font_big)
bb2 = draw.textbbox((0, 0), tld, font=font_big)
bw, tiw = bb1[2] - bb1[0], bb2[2] - bb2[0]
except Exception:
bw, tiw = len(brand) * (fsize_b // 2), len(tld) * (fsize_b // 2)
total_w = bw + tiw
tx = (W - total_w) // 2
ty = H // 2 - fsize_b - 20
draw.text((tx + 2, ty + 2), brand, font=font_big, fill=(0, 0, 0, 150))
draw.text((tx, ty), brand, font=font_big, fill=(255, 255, 255, 255))
draw.text((tx + bw + 2, ty + 2), tld, font=font_big, fill=(0, 0, 0, 150))
draw.text((tx + bw, ty), tld, font=font_big, fill=(*accent_rgb, 255))
# Subtitle
sub = "Your daily crypto briefing"
try:
sb = draw.textbbox((0, 0), sub, font=font_sm)
draw.text(((W - (sb[2] - sb[0])) // 2, H // 2 + 30), sub, font=font_sm,
fill=(255, 255, 255, 160))
except Exception:
pass
# CTA pill button
cta = "Follow for more β†—"
try:
cb = draw.textbbox((0, 0), cta, font=font_med)
cw, ch = cb[2] - cb[0], cb[3] - cb[1]
except Exception:
cw, ch = len(cta) * 22, 44
bx1, by1 = (W - cw) // 2 - 32, H // 2 + 100
bx2, by2 = bx1 + cw + 64, by1 + ch + 28
btn = Image.new("RGBA", (W, H), (0, 0, 0, 0))
bd = ImageDraw.Draw(btn)
bd.rounded_rectangle([bx1, by1, bx2, by2], radius=50, fill=(*accent_rgb, 255))
canvas = Image.alpha_composite(canvas, btn)
draw = ImageDraw.Draw(canvas)
draw.text(((W - cw) // 2, by1 + 14), cta, font=font_med, fill=(255, 255, 255, 255))
return np.array(canvas.convert("RGB"))
def _build_article_reel(sentences: list, audio_path: str, accent_hex: str) -> str:
"""Build article reel using concatenated full-RGB slides β€” no MoviePy alpha compositing."""
import moviepy.editor as mpe
W, H = VIDEO_W, VIDEO_H
try:
accent_rgb = tuple(int(accent_hex.lstrip("#")[i:i+2], 16) for i in (0, 2, 4))
except Exception:
accent_rgb = (108, 99, 255)
audio = mpe.AudioFileClip(audio_path)
total_dur = audio.duration
n = len(sentences)
# Sentence slides get 92% of audio; endcard gets rest (min 2s)
endcard_dur = max(total_dur * 0.08, 2.0)
dur_each = (total_dur - endcard_dur) / n if n > 0 else total_dur
clips = []
for i, sentence in enumerate(sentences):
frame = _render_slide(sentence, W, H, accent_rgb, i, n)
clips.append(mpe.ImageClip(frame).set_duration(dur_each))
# Endcard
endcard_frame = _render_endcard(W, H, accent_rgb)
clips.append(mpe.ImageClip(endcard_frame).set_duration(endcard_dur))
# Concatenate β€” simple, reliable, no alpha issues
final = mpe.concatenate_videoclips(clips, method="chain").set_audio(audio)
out = tempfile.mktemp(suffix=".mp4")
final.write_videofile(
out, codec="libx264", audio_codec="aac",
fps=FPS, preset="ultrafast", threads=4, logger=None,
)
return out
# Article Reels job store
article_jobs: dict = {}
@app.post("/api/article-reels")
async def api_article_reels(
background_tasks: BackgroundTasks,
article_url: str = Form(...),
reel_type: str = Form("Financial News Reel"),
num_points: int = Form(7),
tts_voice: str = Form("Guy - News Anchor (Male)"),
accent_hex: str = Form("#6C63FF"),
groq_api_key: str = Form(""),
):
"""Generate a viral reel from a news article URL with chainstreet.io CTA."""
if not article_url.strip():
raise HTTPException(status_code=400, detail="article_url is required")
job_id = str(uuid4())
article_jobs[job_id] = {
"job_id": job_id,
"status": "processing",
"progress": 0,
"message": "Starting…",
"source_url": article_url,
"output_path": None,
"error": None,
}
background_tasks.add_task(
_run_article_reel_job,
job_id, article_url, groq_api_key, reel_type,
num_points, tts_voice, accent_hex,
)
return {"job_id": job_id, "status": "processing", "progress": 0}
@app.get("/api/job/{job_id}")
async def get_job_by_id(job_id: str):
"""Unified job status β€” checks both video-clip jobs and article-reel jobs."""
if job_id in jobs_store:
return jobs_store[job_id].to_dict()
if job_id in article_jobs:
return article_jobs[job_id]
raise HTTPException(status_code=404, detail="Job not found")
@app.get("/api/article-download/{job_id}")
async def download_article_reel(job_id: str):
if job_id not in article_jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = article_jobs[job_id]
if job["status"] != "completed" or not job.get("output_path"):
raise HTTPException(status_code=400, detail="Reel not ready")
path = Path(job["output_path"])
if not path.exists():
raise HTTPException(status_code=404, detail="Reel file missing")
return FileResponse(path, filename=f"article_reel_{job_id}.mp4", media_type="video/mp4")
def _run_article_reel_job(
job_id, article_url, groq_api_key, reel_type, num_points, tts_voice, accent_hex,
):
def upd(pct, msg):
article_jobs[job_id].update({"progress": int(pct * 100), "message": msg, "status": "processing"})
try:
upd(0.05, "Scraping article…")
content = _scrape_url(article_url)
if not content or len(content) < 40:
raise ValueError("Could not extract text from the URL. Try pasting content directly.")
upd(0.20, "Generating script with AI…")
sentences = _generate_reel_script(content, groq_api_key, reel_type, num_points)
if not sentences:
raise ValueError("Script generation failed β€” check your Groq API key.")
upd(0.45, "Generating AI voiceover…")
full_text = " ".join(sentences)
audio_path = _generate_voice(full_text, tts_voice)
upd(0.70, "Assembling reel…")
reel_path = _build_article_reel(sentences, audio_path, accent_hex)
article_jobs[job_id].update({
"status": "completed",
"progress": 100,
"message": "Reel ready!",
"output_path": reel_path,
"sentences": sentences,
})
except Exception as e:
import traceback
logger.error(traceback.format_exc())
article_jobs[job_id].update({"status": "failed", "error": str(e), "message": str(e)})
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
"""Global exception handler"""
logger.error(f"Unhandled exception: {str(exc)}")
return HTTPException(status_code=500, detail="Internal server error")
# ─────────────────────────────────────────────────────────────────────────────
# CATCH‑ALL SPA FALLBACK – must be last route
# ─────────────────────────────────────────────────────────────────────────────
@app.get("/{full_path:path}")
async def spa_fallback(full_path: str):
"""SPA catch‑all β€” serves index.html for React Router, but ignores API routes."""
# Do NOT interfere with API endpoints
if full_path.startswith("api/") or full_path.startswith("health") or full_path.startswith("docs"):
raise HTTPException(status_code=404, detail="Not found")
index_path = frontend_dir / "index.html"
if index_path.exists():
return FileResponse(index_path)
raise HTTPException(status_code=404, detail="Frontend not built")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)