# ── unbuffered stdout so container logs appear in real-time ────────────────── import sys sys.stdout.reconfigure(line_buffering=True) # ── stdlib ──────────────────────────────────────────────────────────────────── import asyncio import os import re import tempfile import textwrap import urllib.request from pathlib import Path # ── third-party ─────────────────────────────────────────────────────────────── import gradio as gr import numpy as np import requests import soundfile as sf from groq import Groq from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageEnhance # 🔧 FIX: make ANTIALIAS an alias for LANCZOS for compatibility with Pillow >= 10 if not hasattr(Image, 'ANTIALIAS'): Image.ANTIALIAS = Image.LANCZOS print("✓ all top-level imports done", flush=True) # ───────────────────────────────────────────────────────────────────────────── # CONSTANTS # ───────────────────────────────────────────────────────────────────────────── VIDEO_W = 720 VIDEO_H = 1280 FPS = 24 FONT_URL = "https://github.com/google/fonts/raw/main/ofl/montserrat/Montserrat-Bold.ttf" FONT_LIGHT_URL = "https://github.com/google/fonts/raw/main/ofl/montserrat/Montserrat-Regular.ttf" FONT_BOLD_PATH = "/tmp/Montserrat-Bold.ttf" FONT_LIGHT_PATH = "/tmp/Montserrat-Regular.ttf" # ───────────────────────────────────────────────────────────────────────────── # VOICE CONFIGS # ───────────────────────────────────────────────────────────────────────────── EDGE_VOICES = { "Guy - News Anchor (Male)": "en-US-GuyNeural", "Davis (Male)": "en-US-DavisNeural", "Ryan (Male)": "en-GB-RyanNeural", "William (Male)": "en-AU-WilliamNeural", "Aria (Female)": "en-US-AriaNeural", "Jenny (Female)": "en-US-JennyNeural", "Ana (Female)": "en-US-AnaNeural", "Sonia (Female)": "en-GB-SoniaNeural", "Natasha (Female)": "en-AU-NatashaNeural", "Neerja (Female)": "en-IN-NeerjaNeural", } KOKORO_VOICES = { "George (UK Male)": "bm_george", "Adam (US Male)": "am_adam", "Michael (US Male)": "am_michael", "Heart (US Female)": "af_heart", "Bella (US Female)": "af_bella", "Nicole (US Female)":"af_nicole", "Sarah (US Female)": "af_sarah", "Emma (UK Female)": "bf_emma", } # ───────────────────────────────────────────────────────────────────────────── # REEL TYPES & PROMPT TEMPLATES (factual, CTA‑enforced) # ───────────────────────────────────────────────────────────────────────────── REEL_TYPES = [ "Financial News Reel", "Top 5", "Fact", "Ranking", "Step by Step Guide", "Statistics", "Quiz", "Famous Quotes", "Product Demo", "Joke", "Blog to Reel", "Custom Prompt", ] REEL_PROMPTS = { "Financial News Reel": ( "You are a Bloomberg/WSJ-style breaking-news anchor writer.\n" "Create a fast-paced, professional financial-news reel script — urgent teaser style.\n" "Rules:\n" " 1. Line 1 MUST be a powerful shocking hook that stops the scroll.\n" " 2. Give only the high-level gist — withhold specific technical details, exact names,\n" " figures, and outcome so viewers MUST click through to the full story.\n" " 3. Tone: authoritative, slightly shocked, premium financial-media (Bloomberg/WSJ/CNBC).\n" " 4. Write exactly {n} lines. Each line: 13-16 words, spoken aloud in ~6 seconds.\n" " Together the {n} lines must total roughly 45 seconds of audio (~100 words total).\n" " 5. No bullet points, no numbering, no emojis, no hashtags — pure spoken sentences only.\n" " 6. End with one line that creates urgency or mystery. The final line MUST be: 'Read the full story at chainstreet dot i o.'\n" "Output ONLY the {n} lines, one per line, nothing else." ), "Top 5": ( "Create a viral 'Top 5' reel script based STRICTLY on the provided content.\n" "Do NOT invent facts. Only use information explicitly stated.\n" "Format: hook sentence, then exactly 5 items numbered 5 down to 1.\n" "Each line: 15-20 words. The LAST line must be a call-to-action with the source domain (e.g., 'Visit chainstreet.io for the full list').\n" "Output ONLY {n} lines total, one per line. No extra text." ), "Fact": ( "Create a FACTUAL reel script based STRICTLY on the provided news article or content.\n" "Do NOT invent statistics, quotes, or events. Only use information explicitly stated.\n" "Open with the most striking fact from the content as a hook.\n" "Each of the {n} lines must be a true, verifiable fact from the content (15-20 words per line).\n" "The LAST line MUST be a clear call-to-action that includes the source domain (e.g., 'Visit chainstreet.io for the full story' or 'Link in bio for more details').\n" "Output ONLY {n} lines, one per line. No extra text." ), "Ranking": ( "Create a RANKING reel script based STRICTLY on the provided content.\n" "Do not add opinions or rankings not present in the content.\n" "Start with a strong hook. Each item has a clear rank from best to worst.\n" "Every line: 15-20 words. The LAST line must be a call-to-action with the source domain.\n" "Output ONLY {n} lines, one per line. No extra text." ), "Step by Step Guide": ( "Create a STEP-BY-STEP GUIDE reel script using ONLY the information from the provided content.\n" "Hook first, then clear numbered steps (Step 1: …, Step 2: …).\n" "Every line: 15-20 words. The LAST line must be a call-to-action with the source domain.\n" "Output ONLY {n} lines, one per line. No extra text." ), "Statistics": ( "Create a STATISTICS reel script with data points from the provided content only.\n" "Do not invent numbers. Lead with the most striking statistic.\n" "Every line: 15-20 words, include a specific number or percentage from the content.\n" "The LAST line must be a call-to-action with the source domain.\n" "Output ONLY {n} lines, one per line. No extra text." ), "Quiz": ( "Create an interactive QUIZ reel script based on the provided content.\n" "Open with 'Can you answer these?' then pose {n} quiz questions based on facts from the content.\n" "Close the last line with a CTA that includes the source domain (e.g., 'Check your answers at chainstreet.io').\n" "Every line: 15-20 words. Output ONLY {n} lines, one per line. No extra text." ), "Famous Quotes": ( "Create a FAMOUS QUOTES reel script related to the provided content.\n" "Each line is an accurate quote followed by — Author Name. Only use quotes mentioned in the content.\n" "The LAST line must include a call-to-action with the source domain.\n" "Output ONLY {n} quote lines, one per line. No extra text." ), "Product Demo": ( "Create a PRODUCT/IDEA DEMO reel script using ONLY details from the provided content.\n" "Hook with the core problem, then explain the solution step by step.\n" "Close with a call-to-action that includes the source domain. Every line: 15-20 words.\n" "Output ONLY {n} lines, one per line. No extra text." ), "Joke": ( "Create a COMEDY reel script based on the provided content (if humorous) or general topic.\n" "Build-up plus punchline format. Each line: 15-18 words.\n" "The LAST line must be a call-to-action that includes the source domain.\n" "Output ONLY {n} lines, one per line. No extra text." ), "Blog to Reel": ( "Distill the provided news article or blog post into a factual, accurate reel script.\n" "Only use information from the article. Do not add opinions or fake data.\n" "Hook first, then key takeaways, and end with a CTA that includes the source domain.\n" "Each line: 15-20 words. Output ONLY {n} lines, one per line." ), "Custom Prompt": ( "You are a senior financial journalist. Write a FACTUAL, accurate short-form script based ONLY on the provided news content.\n" "Do NOT add speculation, invented numbers, or quotes not present in the content.\n" "Structure: strong hook → key conflict → stakes → rhetorical question → clear CTA.\n" "Each line: 15-20 words, authoritative and precise.\n" "The LAST line MUST be a call-to-action that includes the source domain (e.g., 'Full analysis at chainstreet.io – link in bio').\n" "Output exactly {n} lines, one per line. No extra text." ), } # ───────────────────────────────────────────────────────────────────────────── # FONT LOADER # ───────────────────────────────────────────────────────────────────────────── _fonts: dict[str, str] = {} def _dl_font(url: str, path: str) -> str | None: try: if not os.path.exists(path): with urllib.request.urlopen(url, timeout=10) as resp: data = resp.read() with open(path, "wb") as f: f.write(data) return path except Exception: pass fallbacks = [ "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", ] for f in fallbacks: if os.path.exists(f): return f return None def get_font(size: int = 72, bold: bool = True) -> ImageFont.FreeTypeFont: key = "bold" if bold else "light" if key not in _fonts: path = _dl_font( FONT_BOLD_PATH if bold else FONT_LIGHT_PATH, FONT_BOLD_PATH if bold else FONT_LIGHT_PATH, ) if path is None: # Download if needed path = _dl_font(FONT_URL if bold else FONT_LIGHT_URL, FONT_BOLD_PATH if bold else FONT_LIGHT_PATH) _fonts[key] = path path = _fonts.get(key) try: return ImageFont.truetype(path, size) if path else ImageFont.load_default() except Exception: return ImageFont.load_default() # ───────────────────────────────────────────────────────────────────────────── # URL SCRAPING # ───────────────────────────────────────────────────────────────────────────── def scrape_url(url: str) -> str: import trafilatura # lazy — avoid slow startup try: dl = trafilatura.fetch_url(url) text = trafilatura.extract(dl, include_tables=False, include_comments=False, favor_recall=True) return (text or "").strip() except Exception as e: print(f"[scrape] {e}") return "" # ───────────────────────────────────────────────────────────────────────────── # SCRIPT GENERATION (returns list of sentences) # ───────────────────────────────────────────────────────────────────────────── def generate_script( content: str, groq_key: str, reel_type: str = "Fact", num_points: int = 5, ) -> list[str]: # If no key provided, try environment variable if not groq_key.strip(): groq_key = os.getenv("GROQ_API_KEY", "") if not groq_key: raise ValueError("Groq API key not provided and not found in environment variable GROQ_API_KEY.") client = Groq(api_key=groq_key.strip()) template = REEL_PROMPTS.get(reel_type, REEL_PROMPTS["Custom Prompt"]) system = template.format(n=num_points) resp = client.chat.completions.create( model="llama-3.3-70b-versatile", messages=[ {"role": "system", "content": system}, {"role": "user", "content": f"Content / Topic:\n{content[:3500]}"}, ], temperature=0.78, max_tokens=600, ) raw = resp.choices[0].message.content.strip() lines = [l.strip().lstrip("•-*0123456789. ") for l in raw.splitlines() if l.strip()] lines = lines[:num_points] # Ensure last line is a CTA referencing chainstreet.io (if not already present) if not any('chainstreet.io' in line.lower() for line in lines): lines.append("Read the full story at chainstreet.io.") return lines # ───────────────────────────────────────────────────────────────────────────── # AUDIO — Edge TTS (improved with retry and fallback) # ──────────────────────────────────────────────────────────────────────────── async def _edge_save(text: str, voice: str, path: str) -> None: import edge_tts # lazy — avoid slow startup await edge_tts.Communicate(text, voice).save(path) def generate_audio_edge(text: str, voice_display: str) -> str: # Ensure text is not empty if not text or not text.strip(): raise ValueError("Text for TTS is empty.") # Replace domain with spoken form text = text.replace("chainstreet.io", "chainstreet dot i o") text = text.replace("chainstreet.com", "chainstreet dot com") # Map the friendly name to the actual Edge TTS voice ID voice = EDGE_VOICES.get(voice_display) or voice_display or "en-US-GuyNeural" print(f"[Edge TTS] Generating audio with voice: {voice}") print(f"[Edge TTS] Text: {text[:200]}...") # log first 200 chars out = tempfile.mktemp(suffix=".mp3") # Try once with the selected voice; if it fails, fall back to the default voice. for attempt in range(2): try: asyncio.run(_edge_save(text, voice, out)) print("[Edge TTS] Audio generated successfully.") return out except Exception as e: print(f"[Edge TTS] Attempt {attempt+1} failed: {e}") if attempt == 0: # Fallback to the most reliable voice (Aria) voice = "en-US-AriaNeural" print("[Edge TTS] Retrying with default voice: en-US-AriaNeural") else: # If both attempts fail, raise the original exception raise raise RuntimeError("Edge TTS failed to generate audio after retries.") # ───────────────────────────────────────────────────────────────────────────── # AUDIO — Kokoro TTS # ───────────────────────────────────────────────────────────────────────────── _kokoro = None _kokoro_available = None def _check_kokoro() -> bool: global _kokoro_available if _kokoro_available is None: try: import kokoro # noqa: F401 _kokoro_available = True except ImportError: _kokoro_available = False return _kokoro_available def _get_kokoro(): global _kokoro if _kokoro is None: from kokoro import KPipeline _kokoro = KPipeline(lang_code="a") return _kokoro def generate_audio_kokoro(text: str, voice_display: str) -> str: if not _check_kokoro(): raise ImportError("Kokoro TTS not installed") # Replace domain with spoken form text = text.replace("chainstreet.io", "chainstreet dot i o") text = text.replace("chainstreet.com", "chainstreet dot com") voice = KOKORO_VOICES.get(voice_display) or voice_display or "bm_george" chunks = [a for _, _, a in _get_kokoro()(text, voice=voice, speed=1.05)] out = tempfile.mktemp(suffix=".wav") sf.write(out, np.concatenate(chunks), 24000) return out # ───────────────────────────────────────────────────────────────────────────── # PEXELS VIDEO (SMART KEYWORD EXTRACTION with NLTK) # ───────────────────────────────────────────────────────────────────────────── _STOP = { "the","a","an","is","are","was","were","be","been","have","has","had","do","does","did", "will","would","could","should","may","might","can","just","it","its","this","that", "these","those","i","me","my","we","us","our","you","your","he","him","she","her", "they","them","what","which","who","how","all","both","each","many","more","most", "some","no","not","only","so","than","too","very","because","as","of","at","by", "for","with","about","into","through","before","after","to","from","in","out","on", "off","over","under","then","here","there","when","where","why","same", } def extract_keyword(sentences: list[str]) -> str: # Use simple heuristics; if NLTK not available, fallback to old method try: import nltk # Ensure resources are downloaded (they should be) nltk.data.find('tokenizers/punkt') nltk.data.find('taggers/averaged_perceptron_tagger_eng') except (LookupError, ImportError): # Fallback: use previous simple method text = " ".join(sentences[:3]) words = re.sub(r"[^a-zA-Z\s]", "", text).lower().split() kws = [w for w in words if w not in _STOP and len(w) > 3] return " ".join(kws[:3]) if kws else "technology finance" text = " ".join(sentences) # Tokenize and tag tokens = nltk.word_tokenize(text) tagged = nltk.pos_tag(tokens) # Keep nouns (NN, NNS, NNP, NNPS) with length > 3 nouns = [word for word, pos in tagged if pos.startswith('NN') and len(word) > 3] # Remove stopwords stopwords = _STOP filtered = [w for w in nouns if w.lower() not in stopwords] if not filtered: return "technology finance" # Get the most common noun from collections import Counter counter = Counter(filtered) top = counter.most_common(1)[0][0] # Return top + second if exists if len(filtered) > 1: second = counter.most_common(2)[1][0] return f"{top} {second}" return top def fetch_pexels_video(query: str, api_key: str) -> str | None: if not api_key: api_key = os.getenv("PEXELS_API_KEY", "") if not api_key: print("[pexels] No API key provided and not found in environment variable PEXELS_API_KEY.") return None try: resp = requests.get( "https://api.pexels.com/videos/search", headers={"Authorization": api_key.strip()}, params={"query": query, "per_page": 8, "orientation": "portrait"}, timeout=15, ) for vid in resp.json().get("videos", []): files = sorted(vid.get("video_files", []), key=lambda x: x.get("width", 0)) portrait = [f for f in files if f.get("width", 9999) < f.get("height", 0)] chosen = (portrait or files or [None])[0] if chosen: r = requests.get(chosen["link"], stream=True, timeout=30) out = tempfile.mktemp(suffix=".mp4") with open(out, "wb") as f: for chunk in r.iter_content(8192): f.write(chunk) return out except Exception as e: print(f"[pexels] {e}") return None # ───────────────────────────────────────────────────────────────────────────── # LOGO HELPER (with error handling) # ───────────────────────────────────────────────────────────────────────────── def load_logo(logo_path: str | None, target_w: int = 200) -> Image.Image | None: if not logo_path: return None try: # Check if file exists and is readable if not os.path.isfile(logo_path): print(f"[logo] File not found: {logo_path}") return None logo = Image.open(logo_path).convert("RGBA") ratio = target_w / logo.width logo = logo.resize((target_w, int(logo.height * ratio)), Image.LANCZOS) print(f"[logo] Loaded from {logo_path}, size: {logo.size}", flush=True) return logo except Exception as e: print(f"[logo] Error loading logo: {e}", flush=True) return None def paste_logo(base: Image.Image, logo: Image.Image | None, position: str = "top-right") -> Image.Image: if logo is None: return base W, H = base.size lw, lh = logo.size pad = 30 positions = { "top-right" : (W - lw - pad, pad), "top-left" : (pad, pad), "bottom-right": (W - lw - pad, H - lh - pad), "bottom-left" : (pad, H - lh - pad), } x, y = positions.get(position, positions["top-right"]) base = base.copy() base.paste(logo, (x, y), logo) return base # ───────────────────────────────────────────────────────────────────────────── # TEXT FRAME RENDERER (NO BACKGROUND, NO SHADOW) # ───────────────────────────────────────────────────────────────────────────── def render_text_frame( text: str, width: int, height: int, logo: Image.Image | None = None, logo_pos: str = "top-right", accent_color: tuple = (124, 58, 237), show_bg: bool = False, ) -> np.ndarray: img = Image.new("RGBA", (width, height), (0, 0, 0, 0)) draw = ImageDraw.Draw(img) font = get_font(72) # Word-wrap words, lines, cur = text.split(), [], [] for word in words: test = " ".join(cur + [word]) bbox = draw.textbbox((0, 0), test, font=font) if bbox[2] - bbox[0] > width * 0.84 and cur: lines.append(" ".join(cur)); cur = [word] else: cur.append(word) if cur: lines.append(" ".join(cur)) line_h = 88 total_text_h = len(lines) * line_h pad = 44 box_y1 = (height - total_text_h) // 2 - pad box_y2 = (height + total_text_h) // 2 + pad # Only draw background if explicitly requested (default is False) if show_bg: overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0)) od = ImageDraw.Draw(overlay) od.rounded_rectangle([20, box_y1, width - 20, box_y2], radius=22, fill=(0, 0, 0, 120)) od.rectangle([20, box_y1, 26, box_y2], fill=(*accent_color, 230)) img = Image.alpha_composite(img, overlay) draw = ImageDraw.Draw(img) y = (height - total_text_h) // 2 for line in lines: bbox = draw.textbbox((0, 0), line, font=font) text_w = bbox[2] - bbox[0] x = (width - text_w) // 2 # Draw text without shadow (pure white) draw.text((x, y), line, font=font, fill=(255, 255, 255, 255)) y += line_h if logo: img = paste_logo(img, logo, logo_pos) return np.array(img) # ───────────────────────────────────────────────────────────────────────────── # VIDEO ASSEMBLY (main reel) # ───────────────────────────────────────────────────────────────────────────── def _fit_bg(clip, W: int, H: int): if clip.w / clip.h > W / H: clip = clip.resize(height=H) else: clip = clip.resize(width=W) if clip.w > W: clip = clip.crop(x_center=clip.w / 2, width=W) if clip.h > H: clip = clip.crop(y_center=clip.h / 2, height=H) return clip def create_reel( sentences: list[str], audio_path: str, bg_video_path: str | None, logo_path: str | None = None, logo_pos: str = "top-right", accent_hex: str = "#7c3aed", show_caption_bg: bool = False, ) -> str: import moviepy.editor as mpe W, H = VIDEO_W, VIDEO_H try: accent_hex = accent_hex.lstrip("#") accent_color = tuple(int(accent_hex[i:i+2], 16) for i in (0, 2, 4)) except Exception: accent_color = (124, 58, 237) logo = load_logo(logo_path) print(f"[logo] Loaded: {logo is not None} from {logo_path}", flush=True) audio = mpe.AudioFileClip(audio_path) total_dur = audio.duration dur_each = total_dur / len(sentences) print(f"[create_reel] Audio duration: {total_dur:.2f}s | Sentences: {len(sentences)} | Each segment: {dur_each:.2f}s", flush=True) # Background handling (loop if needed) if bg_video_path: bg = mpe.VideoFileClip(bg_video_path, audio=False) bg = _fit_bg(bg, W, H) # Loop the background video if it's shorter than the total duration if bg.duration < total_dur: loops = int(np.ceil(total_dur / bg.duration)) + 1 bg = mpe.concatenate_videoclips([bg] * loops) bg = bg.subclip(0, total_dur) # Add a dark overlay to make text pop dark = mpe.ColorClip((W, H), color=[0,0,0]).set_opacity(0.45).set_duration(total_dur) bg_layer = mpe.CompositeVideoClip([bg, dark]) else: # Generate a gradient background frame = np.zeros((H, W, 3), dtype=np.uint8) r1, g1, b1 = (accent_color[0]//3, accent_color[1]//3, accent_color[2]//2) for i in range(H): t = i / H frame[i, :] = [int(r1 + t*20), int(g1 + t*5), int(b1 + t*60)] bg_layer = mpe.ImageClip(frame).set_duration(total_dur) # Create text overlays text_clips = [] for i, sentence in enumerate(sentences): arr = render_text_frame(sentence, W, H, logo, logo_pos, accent_color, show_bg=show_caption_bg) tc = ( mpe.ImageClip(arr) .set_start(i * dur_each) .set_duration(dur_each) .crossfadein(0.25) .crossfadeout(0.1) ) text_clips.append(tc) final = mpe.CompositeVideoClip([bg_layer] + text_clips).set_audio(audio) out = tempfile.mktemp(suffix=".mp4") final.write_videofile(out, codec="libx264", audio_codec="aac", fps=FPS, preset="ultrafast", threads=4, logger=None) return out # ───────────────────────────────────────────────────────────────────────────── # NEWS ANCHOR MODE (unchanged) # ───────────────────────────────────────────────────────────────────────────── def _draw_lower_third( draw: ImageDraw.Draw, font_bold: ImageFont.FreeTypeFont, font_light: ImageFont.FreeTypeFont, name: str, title: str, W: int, H: int, accent_color: tuple, ) -> None: bar_h = 90 bar_y = H - 240 draw.rectangle([0, bar_y, W, bar_y + bar_h], fill=(*accent_color, 220)) if name: draw.text((30, bar_y + 8), name.upper(), font=font_bold, fill=(255, 255, 255, 255)) if title: draw.text((30, bar_y + 50), title, font=font_light, fill=(220, 220, 220, 220)) def _draw_ticker( draw: ImageDraw.Draw, font: ImageFont.FreeTypeFont, ticker_text: str, W: int, H: int, frame_num: int, scroll_speed: int = 4, ) -> None: ticker_h = 52 bar_y = H - ticker_h draw.rectangle([0, bar_y, W, H], fill=(20, 20, 20, 230)) text_x = W - (frame_num * scroll_speed % (W + len(ticker_text) * 14)) draw.text((text_x, bar_y + 8), f" ● {ticker_text} ● {ticker_text} ● {ticker_text}", font=font, fill=(255, 215, 0, 255)) def process_anchor_video( anchor_video_path: str, bg_choice: str, pexels_key: str, news_topic: str, anchor_name: str, anchor_title: str, ticker_text: str, logo_path: str | None, accent_hex: str, progress, ) -> str: import moviepy.editor as mpe try: accent_hex = accent_hex.lstrip("#") accent_color = tuple(int(accent_hex[i:i+2], 16) for i in (0, 2, 4)) except Exception: accent_color = (5, 38, 120) progress(0.1, desc="📹 Loading anchor video…") clip = mpe.VideoFileClip(anchor_video_path) W, H = VIDEO_W, VIDEO_H dur = clip.duration font_bold = get_font(36, bold=True) font_light = get_font(28, bold=False) ticker_font = get_font(24, bold=False) logo = load_logo(logo_path) progress(0.2, desc="🎨 Preparing background…") if bg_choice == "Blur original": bg_clip = clip.fl_image(lambda frame: np.array( Image.fromarray(frame).filter(ImageFilter.GaussianBlur(radius=20)) )) bg_clip = _fit_bg(bg_clip, W, H) elif bg_choice == "News studio (dark)": studio_frame = _make_studio_bg(W, H, dark=True, accent_color=accent_color) bg_clip = mpe.ImageClip(studio_frame).set_duration(dur) elif bg_choice == "News studio (light)": studio_frame = _make_studio_bg(W, H, dark=False, accent_color=accent_color) bg_clip = mpe.ImageClip(studio_frame).set_duration(dur) elif bg_choice == "Pexels" and pexels_key.strip(): kw = news_topic or "news studio" pex_path = fetch_pexels_video(kw, pexels_key) if pex_path: pex_clip = mpe.VideoFileClip(pex_path, audio=False) pex_clip = _fit_bg(pex_clip, W, H) if pex_clip.duration < dur: loops = int(np.ceil(dur / pex_clip.duration)) + 1 pex_clip = mpe.concatenate_videoclips([pex_clip] * loops) bg_clip = pex_clip.subclip(0, dur) else: studio_frame = _make_studio_bg(W, H, dark=True, accent_color=accent_color) bg_clip = mpe.ImageClip(studio_frame).set_duration(dur) else: frame = np.zeros((H, W, 3), dtype=np.uint8) frame[:, :] = accent_color bg_clip = mpe.ImageClip(frame).set_duration(dur) progress(0.45, desc="✂️ Compositing anchor…") anchor_w = int(W * 0.72) anchor_h = int(anchor_w * clip.h / clip.w) if anchor_h > int(H * 0.72): anchor_h = int(H * 0.72) anchor_w = int(anchor_h * clip.w / clip.h) anchor_clip = clip.resize(width=anchor_w) ax = (W - anchor_w) // 2 ay = int(H * 0.08) anchor_clip = anchor_clip.set_position((ax, ay)) progress(0.6, desc="🖼️ Adding news graphics…") def add_news_overlay(get_frame, t): frame = get_frame(t) img = Image.fromarray(frame).convert("RGBA") if logo: img = paste_logo(img, logo, "top-left") draw = ImageDraw.Draw(img) _draw_lower_third(draw, font_bold, font_light, anchor_name, anchor_title, W, H, accent_color) if ticker_text.strip(): fn = int(t * FPS) _draw_ticker(draw, ticker_font, ticker_text, W, H, fn) import datetime ts = datetime.datetime.now().strftime("%I:%M %p") draw.text((W - 130, 20), ts, font=get_font(28), fill=(255,255,255,200)) return np.array(img.convert("RGB")) composite = mpe.CompositeVideoClip( [bg_clip.set_duration(dur), anchor_clip], size=(W, H) ) final = composite.fl(add_news_overlay, apply_to=["mask", "video"]) if clip.audio: final = final.set_audio(clip.audio) progress(0.8, desc="🎞️ Rendering final video…") out = tempfile.mktemp(suffix=".mp4") final.write_videofile(out, codec="libx264", audio_codec="aac", fps=min(FPS, clip.fps or FPS), preset="ultrafast", threads=4, logger=None) return out def _make_studio_bg(W: int, H: int, dark: bool = True, accent_color: tuple = (5, 38, 120)) -> np.ndarray: frame = np.zeros((H, W, 3), dtype=np.uint8) base = (12, 14, 32) if dark else (230, 235, 248) frame[:, :] = base img = Image.fromarray(frame, "RGB") draw = ImageDraw.Draw(img) for i in range(0, H, 40): draw.rectangle([0, i, W, i + 20], fill=(*accent_color[:3],)) ow, oh = 420, 420 ox, oy = W // 2 - ow // 2, H // 2 - oh // 2 draw.ellipse([ox, oy, ox + ow, oy + oh], outline=(*accent_color, 60), width=3) draw.ellipse([ox + 40, oy + 40, ox + ow - 40, oy + oh - 40], outline=(*accent_color, 30), width=2) draw.rectangle([0, H - 300, W, H - 250], fill=accent_color) return np.array(img) # ───────────────────────────────────────────────────────────────────────────── # MAIN PIPELINE — Step 1: Generate Script only # ───────────────────────────────────────────────────────────────────────────── def generate_script_only( url_or_text, groq_key, reel_type, num_points, progress=gr.Progress(), ): try: # Fallback to environment variable if needed if not groq_key.strip(): groq_key = os.getenv("GROQ_API_KEY", "") if not groq_key: return "", "❌ **Groq API key required.** Please provide it in the textbox or set the GROQ_API_KEY secret." progress(0.05, desc="🔍 Fetching content…") raw = url_or_text.strip() if raw.startswith("http"): content = scrape_url(raw) if not content or len(content) < 60: return "", "❌ Could not extract text from that URL. Try pasting the article text directly." else: content = raw if len(content) < 20: return "", "❌ Please enter a URL or a text topic." progress(0.2, desc=f"✍️ Writing '{reel_type}' script with Llama 3.3…") sentences = generate_script(content, groq_key, reel_type, int(num_points)) if not sentences: return "", "❌ Script generation failed. Check your Groq API key." script_text = "\n".join(sentences) script_md = "\n\n".join(f"**{i+1}.** {s}" for i, s in enumerate(sentences)) progress(1.0, desc="✅ Script generated!") return script_text, f"## ✅ Script Ready!\n\n**Type:** {reel_type}\n\n**Edit below, then click 'Create Video'.**\n\n{script_md}" except Exception as e: import traceback return "", f"❌ **Error:** {str(e)}\n\n```\n{traceback.format_exc()}\n```" # ───────────────────────────────────────────────────────────────────────────── # MAIN PIPELINE — Step 2: Create Video from edited script + optional outro # ───────────────────────────────────────────────────────────────────────────── def create_video_from_script( script_text, groq_key, pexels_key, tts_engine, edge_voice, kokoro_voice, logo_file, logo_pos, accent_hex, show_caption_bg, outro_video, progress=gr.Progress(), ): try: # Fallback to environment variable if needed if not groq_key.strip(): groq_key = os.getenv("GROQ_API_KEY", "") if not groq_key: return None, "❌ **Groq API key required.** Please provide it in the textbox or set the GROQ_API_KEY secret." if not pexels_key.strip(): pexels_key = os.getenv("PEXELS_API_KEY", "") # Split the script text into lines sentences = [line.strip() for line in script_text.strip().split("\n") if line.strip()] if not sentences: return None, "❌ Script is empty. Please generate a script first." full_script = " ".join(sentences) if not full_script.strip(): return None, "❌ Generated script is empty. Please check your content or try again." print(f"[create_video] Full script length: {len(full_script)} chars") # Log sentences for debugging print("[create_video] Sentences:") for idx, s in enumerate(sentences): print(f" {idx+1}. {s} ({len(s)} chars)") using_kokoro = tts_engine == "Kokoro TTS" if using_kokoro and not _check_kokoro(): using_kokoro = False progress(0.40, desc=f"🎙️ Generating voice — {'Kokoro' if using_kokoro else 'Edge TTS'}…") if using_kokoro: try: audio_path = generate_audio_kokoro(full_script, kokoro_voice) except Exception as e: print(f"[kokoro fallback] {e}") audio_path = generate_audio_edge(full_script, "en-US-AriaNeural") else: audio_path = generate_audio_edge(full_script, edge_voice) bg_video = None if pexels_key.strip(): progress(0.60, desc="🎬 Fetching background video…") kw = extract_keyword(sentences) print(f"[pexels] Searching for: {kw}", flush=True) bg_video = fetch_pexels_video(kw, pexels_key) progress(0.75, desc="🎞️ Assembling main reel…") logo_path = logo_file if isinstance(logo_file, str) else (logo_file.name if logo_file else None) # Auto‑detect logo from root directory if nothing was uploaded if not logo_path: for _n in ["logo.png", "logo.jpg", "logo.PNG", "logo.JPG"]: _p = _n if os.path.exists(_p): logo_path = _p print(f"[auto-logo] Using {_p}", flush=True) break main_reel_path = create_reel(sentences, audio_path, bg_video, logo_path, logo_pos, accent_hex, show_caption_bg) # ── Stitch exit video — use uploaded file OR auto‑detect from root ── _auto_outro = None for _n in ["exit.mp4", "exit_video.mp4", "outro.mp4", "exit.mov", "outro.mov"]: _p = _n if os.path.exists(_p): _auto_outro = _p print(f"[auto-outro] Using {_p}", flush=True) break effective_outro = (outro_video if isinstance(outro_video, str) else (outro_video.name if outro_video else None)) or _auto_outro outro_error = False if effective_outro: progress(0.85, desc="➕ Stitching exit video…") import moviepy.editor as mpe outro_path = effective_outro try: # Check if file exists and is readable if not os.path.isfile(outro_path): print(f"[outro] File not found: {outro_path}") raise FileNotFoundError(f"Outro file not found: {outro_path}") # Try to open with moviepy outro_clip = mpe.VideoFileClip(outro_path) print(f"[outro] Successfully loaded {outro_path}") # Resize outro to match main reel dimensions W, H = VIDEO_W, VIDEO_H if outro_clip.w / outro_clip.h > W / H: outro_clip = outro_clip.resize(height=H) else: outro_clip = outro_clip.resize(width=W) if outro_clip.w > W: outro_clip = outro_clip.crop(x_center=outro_clip.w / 2, width=W) if outro_clip.h > H: outro_clip = outro_clip.crop(y_center=outro_clip.h / 2, height=H) # Load main reel and concatenate main_clip = mpe.VideoFileClip(main_reel_path) final_clip = mpe.concatenate_videoclips([main_clip, outro_clip], method="compose") final_path = tempfile.mktemp(suffix=".mp4") final_clip.write_videofile(final_path, codec="libx264", audio_codec="aac", fps=FPS, preset="ultrafast", threads=4, logger=None) # Clean up intermediate file os.unlink(main_reel_path) main_reel_path = final_path progress(0.95, desc="✅ Outro added") except Exception as e: print(f"[outro] Error processing outro video: {e}") # Continue without outro, but show a warning in the output progress(0.95, desc="⚠️ Outro skipped (file error)") outro_error = True else: print("[outro] No outro file found, skipping", flush=True) script_md = "\n\n".join(f"**{i+1}.** {s}" for i, s in enumerate(sentences)) progress(1.0, desc="✅ Video ready!") status_msg = f"## ✅ Reel Ready!\n\n**Script:**\n\n{script_md}" if outro_error: status_msg += "\n\n⚠️ **Note:** Outro video was skipped because it could not be read (file may be corrupted). Please re‑upload the outro video if needed." return main_reel_path, status_msg except Exception as e: import traceback return None, f"❌ **Error:** {str(e)}\n\n```\n{traceback.format_exc()}\n```" # ───────────────────────────────────────────────────────────────────────────── # ANCHOR PIPELINE (unchanged) # ───────────────────────────────────────────────────────────────────────────── def anchor_pipeline( anchor_video, bg_choice, pexels_key, news_topic, anchor_name, anchor_title, ticker_text, logo_file, accent_hex, progress=gr.Progress(), ): try: if anchor_video is None: return None, "❌ Please upload your anchor video first." video_path = anchor_video if isinstance(anchor_video, str) else anchor_video.name logo_path = logo_file if isinstance(logo_file, str) else (logo_file.name if logo_file else None) out = process_anchor_video( video_path, bg_choice, pexels_key, news_topic, anchor_name, anchor_title, ticker_text, logo_path, accent_hex, progress, ) return out, "## ✅ News Anchor Reel Ready!" except Exception as e: import traceback return None, f"❌ **Error:** {str(e)}\n\n```\n{traceback.format_exc()}\n```" # ───────────────────────────────────────────────────────────────────────────── # GRADIO UI # ───────────────────────────────────────────────────────────────────────────── CSS = """ footer { display: none !important; } .gen-btn { background: linear-gradient(135deg,#7c3aed,#a855f7) !important; border:none!important; color:#fff!important; } .anchor-btn { background: linear-gradient(135deg,#0a2648,#1d6fa4) !important; border:none!important; color:#fff!important; } """ with gr.Blocks(title="🎬 AI Reels Maker", css=CSS, theme=gr.themes.Soft()) as demo: gr.Markdown( "# 🎬 AI Reels Maker\n" "Create viral video reels from any URL or topic — free, no watermark" ) # ════════════════════════════════════════════════════════════════════════ # TAB 1 — Reel Generator (now with editable script and outro) # ════════════════════════════════════════════════════════════════════════ with gr.Tab("🎬 Reel Generator"): with gr.Row(): # Left: inputs with gr.Column(scale=1): url_input = gr.Textbox( label="📎 URL or Topic", placeholder="Paste a news article URL, blog post URL, or type a topic…", lines=3, ) reel_type = gr.Dropdown( choices=REEL_TYPES, value="Financial News Reel", label="🎯 Reel Type", ) with gr.Accordion("⚙️ API Keys", open=True): with gr.Row(): groq_key = gr.Textbox(label="🔑 Groq API Key", type="password", placeholder="gsk_… (or leave blank if set as secret)") pexels_key = gr.Textbox(label="🎥 Pexels API Key", type="password", placeholder="Optional — leave blank if set as secret") with gr.Accordion("🎙️ Voice", open=True): tts_engine = gr.Radio( choices=["Edge TTS", "Kokoro TTS"], value="Edge TTS", label="Voice Engine", info="Edge TTS = always available, natural & fast | Kokoro = open-source expressive (auto-falls back to Edge TTS if not installed)", ) with gr.Row(): edge_voice = gr.Dropdown(choices=[(k,v) for k,v in EDGE_VOICES.items()], value="en-US-GuyNeural", label="Edge TTS Voice", visible=True) kokoro_voice = gr.Dropdown(choices=[(k,v) for k,v in KOKORO_VOICES.items()], value="bm_george", label="Kokoro Voice", visible=False) with gr.Accordion("🎨 Branding", open=False): # Auto‑load logo from root if it exists logo_default = "logo.png" if os.path.exists("logo.png") else None logo_file = gr.File( label="📤 Upload Logo (PNG/JPG)", file_types=["image"], type="filepath", value=logo_default ) logo_pos = gr.Dropdown( choices=["top-right","top-left","bottom-right","bottom-left"], value="top-right", label="Logo Position" ) accent_hex = gr.ColorPicker(value="#d4af37", label="Accent Color") # gold default # 🔧 CHANGE: default caption background to False (no card, no gold accent) show_caption_bg = gr.Checkbox(label="Show caption background", value=False, info="Toggle the semi-transparent card behind text (unchecked = clean text)") # Auto‑load outro from root if it exists outro_default = "exit.mp4" if os.path.exists("exit.mp4") else None outro_video = gr.File( label="🎬 Optional Outro Video (plays after the reel)", file_types=["video"], type="filepath", value=outro_default ) num_points = gr.Slider(3, 8, value=7, step=1, label="📊 Number of points (7 = ~45 sec)") gen_btn = gr.Button("📝 Generate Script", variant="primary", size="lg", elem_classes=["gen-btn"]) create_btn = gr.Button("🎬 Create Video", variant="secondary", size="lg") # Right: output with gr.Column(scale=1): script_editor = gr.TextArea( label="✏️ Edit Script (one sentence per line)", lines=8, interactive=True, placeholder="Generated script will appear here…\nYou can edit each line before creating the video." ) video_out = gr.Video(label="🎬 Your Reel", height=560) script_out = gr.Markdown() # Toggle voice dropdowns tts_engine.change( lambda e: (gr.update(visible=e=="Edge TTS"), gr.update(visible=e=="Kokoro TTS")), inputs=tts_engine, outputs=[edge_voice, kokoro_voice], ) # Generate script → fill the textarea gen_btn.click( generate_script_only, inputs=[url_input, groq_key, reel_type, num_points], outputs=[script_editor, script_out], ) # Create video → use the edited script and optional outro create_btn.click( create_video_from_script, inputs=[script_editor, groq_key, pexels_key, tts_engine, edge_voice, kokoro_voice, logo_file, logo_pos, accent_hex, show_caption_bg, outro_video], outputs=[video_out, script_out], ) # ════════════════════════════════════════════════════════════════════════ # TAB 2 — News Anchor Mode (unchanged) # ════════════════════════════════════════════════════════════════════════ with gr.Tab("📺 News Anchor Mode"): gr.Markdown( "**Upload your anchor video** and the app will:\n" "- Replace or style the background (blur / news studio / Pexels video / solid)\n" "- Add a professional **lower-third** with your name & title\n" "- Add a **scrolling news ticker** at the bottom\n" "- Overlay your **channel logo**" ) with gr.Row(): with gr.Column(scale=1): anchor_video = gr.Video(label="📹 Upload Anchor Video (MP4/MOV)") bg_choice = gr.Dropdown( choices=["Blur original","News studio (dark)","News studio (light)","Pexels","Solid color"], value="News studio (dark)", label="🎨 Background Style", ) a_pexels_key = gr.Textbox(label="🎥 Pexels API Key", type="password", placeholder="Required if Background = Pexels") news_topic = gr.Textbox(label="🔎 Pexels search keyword", placeholder="e.g. 'city news'", visible=False) with gr.Accordion("🪪 Name & Title (Lower Third)", open=True): anchor_name = gr.Textbox(label="Anchor Name", placeholder="Jane Doe") anchor_title = gr.Textbox(label="Anchor Title", placeholder="Senior Correspondent") ticker_text = gr.Textbox( label="📰 Ticker Text (scrolls at bottom)", placeholder="Breaking News: Enter your headline here… | More updates coming soon…", ) # Auto‑load logo from root for anchor mode too a_logo_default = "logo.png" if os.path.exists("logo.png") else None a_logo_file = gr.File(label="📤 Upload Channel Logo", file_types=["image"], type="filepath", value=a_logo_default) a_accent_hex = gr.ColorPicker(value="#052680", label="Accent / Brand Color") anchor_btn = gr.Button("📺 Process Anchor Video", variant="primary", size="lg", elem_classes=["anchor-btn"]) with gr.Column(scale=1): anchor_video_out = gr.Video(label="📺 Processed Reel", height=560) anchor_status = gr.Markdown() bg_choice.change( lambda c: gr.update(visible=c == "Pexels"), inputs=bg_choice, outputs=news_topic, ) anchor_btn.click( anchor_pipeline, inputs=[anchor_video, bg_choice, a_pexels_key, news_topic, anchor_name, anchor_title, ticker_text, a_logo_file, a_accent_hex], outputs=[anchor_video_out, anchor_status], ) # ════════════════════════════════════════════════════════════════════════ # FOOTER # ════════════════════════════════════════════════════════════════════════ gr.Markdown(""" --- ### 🔑 Free API keys | Service | Purpose | Link | |---------|---------|------| | **Groq** *(required)* | AI script generation — Llama 3.3-70B | [console.groq.com](https://console.groq.com) | | **Pexels** *(optional)* | Free HD stock video backgrounds | [pexels.com/api](https://www.pexels.com/api/) | **Secrets**: Set `GROQ_API_KEY` and `PEXELS_API_KEY` as Hugging Face secrets to avoid typing them each time. **Persistent files**: Upload your logo and outro video once to the Space's root folder, then name them `logo.png` and `exit.mp4` – they will be automatically loaded on restart. """) os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "False") print("✓ UI built, launching Gradio server…", flush=True) demo.launch(server_name="0.0.0.0", server_port=7860)