"""Video script generator API.

Accepts a list of trending topics, asks a causal language model to distil
them into one video topic, generates a 60-second script for that topic and
optionally reports the result to a caller-supplied webhook.
"""

import asyncio
import json
import logging
import os
import re
import secrets
import threading
import uuid
from contextlib import asynccontextmanager
from typing import Dict, List, Optional, Tuple, Union

import httpx
import torch
import uvicorn
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from transformers import AutoModelForCausalLM, AutoTokenizer

# Configuration
MODEL_ID = "google/gemma-1.1-2b-it"
HF_TOKEN = os.getenv("HF_TOKEN", "")
# NOTE(review): the fallback value is a well-known default; set API_KEY in the
# environment for any real deployment.
API_KEY = os.getenv("API_KEY", "default-key-123")
MAX_TOKENS = 450
DEVICE = "cpu"
PORT = int(os.getenv("PORT", 7860))

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Security
security = HTTPBearer()

# Job storage: job_id -> job record (in-memory only, lost on restart)
jobs: Dict[str, dict] = {}

# Inference runs in worker threads (see process_job); this lock keeps it to
# one generation pipeline at a time, as it was when it ran on the event loop.
_inference_lock = threading.Lock()

# Prompt sections that the model sometimes echoes back into its answer.
_INSTRUCTION_PATTERNS = tuple(
    re.compile(pattern, re.DOTALL | re.IGNORECASE)
    for pattern in (
        r'CRITICAL REQUIREMENTS:.*?(\n\n|$)',
        r'SCRIPT STRUCTURE:.*?(\n\n|$)',
        r'VISUAL DESCRIPTION GUIDELINES:.*?(\n\n|$)',
        r'VOICEOVER GUIDELINES:.*?(\n\n|$)',
        r'EXAMPLE FORMAT:.*?(\n\n|$)',
        r'NOW CREATE SCRIPT FOR:.*?(\n\n|$)',
        r'ONLY RETURN THE SCRIPT CONTENT.*?(\n\n|$)',
        r'IMPORTANT: ONLY generate.*?(\n\n|$)',
        r'BEGIN SCRIPT:.*?(\n\n|$)',
        r'NO PROMOTIONAL CONTENT.*?(\n\n|$)',
        r'FOCUS ON EDUCATIONAL VALUE.*?(\n\n|$)',
        r'FOCUS ON INFORMATIVE CONTENT.*?(\n\n|$)',
        r'FOCUS ON MEANINGFUL VIDEO SCRIPT.*?(\n\n|$)',
    )
)

# Promotional phrases (apps, websites, products) stripped from scripts.
_PROMOTIONAL_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'visit our (website|app|page)',
        r'download (the|our) app',
        r'check out our (product|service)',
        r'buy now',
        r'sign up',
        r'click the link',
        r'in the description below',
        r'link in bio',
        r'use code.*?',
        r'promo code',
        r'discount code',
    )
)


class ScriptGenerator:
    """Lazily loaded tokenizer/model pair used for all text generation."""

    def __init__(self):
        self.tokenizer = None
        self.model = None
        self.loaded = False
        self.load_error = None
        # load_model may be reached from several worker threads at once.
        self._load_lock = threading.Lock()

    def load_model(self):
        """Load tokenizer and model once.

        Returns:
            True when the model is ready, False when loading failed (the
            failure text is then available in ``self.load_error``).
        """
        with self._load_lock:
            if self.loaded:
                return True

            logger.info("Loading model...")
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=HF_TOKEN)
                logger.info("✅ Tokenizer loaded")

                self.model = AutoModelForCausalLM.from_pretrained(
                    MODEL_ID,
                    torch_dtype=torch.float32,
                    token=HF_TOKEN,
                    device_map=None
                )
                self.model = self.model.to(DEVICE)
                self.model.eval()

                self.loaded = True
                logger.info("✅ Model loaded successfully")
                return True
            except Exception as e:
                self.load_error = str(e)
                logger.error(f"❌ Model loading failed: {str(e)}")
                return False


# Global generator instance
generator = ScriptGenerator()


async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify API key

    Raises:
        HTTPException: 401 when the bearer token does not match ``API_KEY``.
    """
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    supplied = credentials.credentials.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return True


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook; only logs startup (the model loads lazily)."""
    logger.info("🚀 API Server starting up...")
    yield


app = FastAPI(lifespan=lifespan)


def extract_topics(topic_input: Union[str, List[str]]) -> List[str]:
    """Extract and validate topics from input

    A string is first tried as JSON (a JSON list yields one topic per
    element, any other JSON value yields a single topic); if it is not valid
    JSON it is split on commas. A list is used element by element. Every
    topic is stringified and stripped, and blank topics are dropped. Any
    other input type yields an empty list.
    """
    if isinstance(topic_input, str):
        try:
            parsed = json.loads(topic_input)
        except json.JSONDecodeError:
            # Not JSON: comma-separated (a string without commas is one topic)
            candidates = topic_input.split(",")
        else:
            candidates = parsed if isinstance(parsed, list) else [parsed]
    elif isinstance(topic_input, list):
        candidates = topic_input
    else:
        return []

    stripped = (str(candidate).strip() for candidate in candidates)
    return [topic for topic in stripped if topic]


def _ensure_model_loaded() -> None:
    """Load the model on first use; raise if it cannot be loaded."""
    if not generator.loaded and not generator.load_model():
        raise RuntimeError(f"Model failed to load: {generator.load_error}")


def _generate_text(prompt: str, max_new_tokens: int) -> str:
    """Sample a continuation of *prompt* and return only the new text."""
    inputs = generator.tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=512
    )
    inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
    prompt_token_count = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = generator.model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=0.9,
            temperature=0.8,
            pad_token_id=generator.tokenizer.eos_token_id,
            repetition_penalty=1.1
        )

    # The output sequence starts with the prompt tokens. Slicing them off is
    # reliable even when the prompt was truncated or does not decode back to
    # the exact input string, which a textual replace() cannot guarantee.
    new_tokens = outputs[0][prompt_token_count:]
    return generator.tokenizer.decode(new_tokens, skip_special_tokens=True)


def generate_topic_from_trends(trending_topics: List[str]) -> str:
    """Generate a viral topic based on trending topics

    Returns the first line/sentence of the model's answer with surrounding
    quotes removed.

    Raises:
        RuntimeError: when the model cannot be loaded.
    """
    _ensure_model_loaded()

    logger.info(f"🧠 Generating viral topic from trends: {trending_topics}")

    prompt = (
        f"Based on these {len(trending_topics)} trending topics: {', '.join(trending_topics)}\n\n"
        "Create ONE highly engaging, viral topic for a YouTube/TikTok short video that:\n"
        "1. Combines elements from these trends in a creative way\n"
        "2. Has high viral potential (emotional, surprising, or controversial)\n"
        "3. Is suitable for a 60-second video format\n"
        "4. Appeals to a broad audience\n"
        "5. Focus on informative video title (not promotion, not service, not event, not product, not sale, not challenging)\n"
        "6. Is specific enough to be interesting but broad enough to allow creative interpretation\n\n"
        "Respond with ONLY the topic (no explanations, no bullet points, no numbering).\n"
        "The topic should be 5-10 words maximum.\n\n"
        "VIRAL TOPIC:"
    )

    generated_text = _generate_text(prompt, max_new_tokens=100)
    topic = generated_text.replace(prompt, "").strip()

    # Clean up the topic: keep the first line/sentence only
    topic = re.split(r'[\n\.]', topic)[0].strip()
    topic = re.sub(r'^["\'](.*)["\']$', r'\1', topic)  # Remove surrounding quotes

    logger.info(f"🎯 Generated topic: '{topic}'")
    return topic


def clean_generated_script(script: str, prompt: str) -> str:
    """Clean up the generated script to remove prompt remnants and instructions"""
    # Remove the prompt if it's included
    if prompt in script:
        script = script.replace(prompt, "")

    # Remove common instruction patterns
    for pattern in _INSTRUCTION_PATTERNS:
        script = pattern.sub('', script)

    # Remove promotional content (apps, websites, products)
    for pattern in _PROMOTIONAL_PATTERNS:
        script = pattern.sub('', script)

    # Remove multiple empty lines
    script = re.sub(r'\n\s*\n', '\n\n', script)

    return script.strip()


def generate_script(topic: str) -> str:
    """Generate high-quality video script

    Raises:
        RuntimeError: when the model cannot be loaded. Any other generation
            error is logged and re-raised unchanged.
    """
    try:
        _ensure_model_loaded()

        # Topics sometimes arrive as a stringified list element, e.g. "['x']"
        clean_topic = topic.strip().strip("['").strip("']").strip('"').strip("'")

        logger.info(f"🎯 Generating script for: '{clean_topic}'")

        # IMPROVED PROMPT - Focus on informative content, no promotions
        prompt = (
            f"IMPORTANT: Create a purely informative 60-second YouTube/TikTok video script about: {clean_topic}\n\n"
            "CRITICAL REQUIREMENTS:\n"
            "- Total duration: 60 seconds exactly with clear timestamps\n"
            "- Each scene must have BOTH visual description AND voiceover text\n"
            "- Visual descriptions should be specific, searchable keywords for stock videos\n"
            "- Voiceover should be conversational, educational, and engaging\n"
            "- NO personal introductions ('I'm...', 'My name is...')\n"
            "- NO promotional content (no apps, websites, products, or services)\n"
            "- NO calls to action (no 'visit our', 'download', 'buy now', 'sign up')\n"
            "- Focus on educational value and useful information only\n"
            "- Provide practical tips, facts, or insights that viewers can use immediately\n\n"
            "SCRIPT STRUCTURE:\n"
            "[0:00-0:08] VISUAL: [Attention-grabbing visual - dramatic/curious imagery]\n"
            "VOICEOVER: [8-second hook that creates curiosity and grabs attention]\n\n"
            "[0:08-0:45] VISUAL: [Action-oriented visuals demonstrating the topic]\n"
            "VOICEOVER: [37-second valuable content with key insights, facts, and practical tips]\n\n"
            "[0:45-0:55] VISUAL: [Transformation/result visual showing benefits]\n"
            "VOICEOVER: [10-second summary of key benefits and value]\n\n"
            "[0:55-1:00] VISUAL: [Inspiring visual that reinforces the main message]\n"
            "VOICEOVER: [5-second inspiring closing thought]\n\n"
            "VOICEOVER GUIDELINES:\n"
            "- Focus on viewer benefits and valuable information\n"
            "- Include surprising facts, statistics, or insights\n"
            "- Use conversational, engaging tone\n"
            "- End with an inspiring thought, not a call to action\n\n"
            # This line needs the f-prefix so the topic is actually inserted.
            f"NOW CREATE A PURELY INFORMATIVE SCRIPT FOR: {clean_topic}\n\n"
            "SCRIPT:\n"
        )

        # Extract only the generated part
        generated = _generate_text(prompt, max_new_tokens=MAX_TOKENS)

        # Clean up - remove echoed instructions and promotional phrases
        script_content = clean_generated_script(generated, prompt)

        # If cleaning removed too much, fallback to basic extraction
        if not script_content or len(script_content) < 50:
            if "SCRIPT:" in generated:
                script_content = generated.split("SCRIPT:")[-1].strip()
            else:
                script_content = generated.replace(prompt, "").strip()

        # Final cleanup to ensure no promotional content
        script_content = re.sub(
            r'(visit|download|buy|sign up|check out).*?\.',
            '',
            script_content,
            flags=re.IGNORECASE
        )
        script_content = re.sub(r'link (in|below).*?', '', script_content, flags=re.IGNORECASE)

        logger.info(f"📝 Generated {len(script_content)} characters")
        return script_content

    except Exception as e:
        logger.error(f"❌ Script generation failed: {str(e)}")
        raise


def _run_generation_pipeline(topics: List[str]) -> Tuple[str, str]:
    """Run topic generation then script generation; return (topic, script).

    Blocking and CPU-bound: call it from a worker thread, not the event loop.
    """
    with _inference_lock:
        generated_topic = generate_topic_from_trends(topics)
        script = generate_script(generated_topic)
    return generated_topic, script


async def _post_webhook(callback_url: str, payload: dict, timeout: float) -> httpx.Response:
    """POST *payload* as JSON to *callback_url* and return the response."""
    # NOTE(review): callback_url comes straight from the request body and is
    # not validated here; consider restricting allowed hosts (SSRF).
    async with httpx.AsyncClient(timeout=timeout) as client:
        return await client.post(
            callback_url,
            json=payload,
            headers={"Content-Type": "application/json"}
        )


async def process_job(job_id: str, topics_input: Union[str, List[str]], callback_url: Optional[str] = None):
    """Background task to process job

    Stores the outcome in ``jobs[job_id]`` and, when *callback_url* is given,
    posts the same outcome to it. Never raises: every failure is recorded on
    the job and reported to the webhook on a best-effort basis.
    """
    try:
        # Extract and validate topics
        topics = extract_topics(topics_input)

        if len(topics) < 3:
            # There is no HTTP response to attach an HTTPException to in a
            # background task, so a plain ValueError is used.
            raise ValueError("At least 3 topics are required")

        logger.info(f"🎯 Processing {len(topics)} topics: {topics}")

        # Step 1: generate a viral topic from the trends.
        # Step 2: generate a script for that topic.
        # Both run in a worker thread so the event loop stays free to serve
        # status and health requests while the model is busy.
        loop = asyncio.get_running_loop()
        generated_topic, script = await loop.run_in_executor(
            None, _run_generation_pipeline, topics
        )

        # Store job results
        jobs[job_id] = {
            "status": "complete",
            "result": script,
            "original_topics": topics,
            "generated_topic": generated_topic,
            "script_length": len(script),
            "formatted": True
        }

        logger.info(f"✅ Completed job {job_id}")

        # Send webhook callback if URL provided
        if callback_url:
            try:
                # Prepare the webhook data with proper structure
                webhook_data = {
                    "job_id": job_id,
                    "status": "complete",
                    "result": script,
                    "topic": generated_topic,
                    "script_length": len(script),
                    "formatted": True,
                    "original_topics": topics
                }

                # Log what we're sending
                logger.info(f"📨 Sending webhook to: {callback_url}")

                response = await _post_webhook(callback_url, webhook_data, timeout=30.0)

                if 200 <= response.status_code < 300:
                    logger.info(f"✅ Webhook delivered successfully: {response.status_code}")
                else:
                    logger.warning(
                        f"⚠️ Webhook returned non-2xx status: {response.status_code} - {response.text}"
                    )

            except Exception as e:
                # Delivery is best-effort; the job itself already succeeded.
                logger.error(f"❌ Webhook failed: {str(e)}")

    except Exception as e:
        error_msg = f"Job failed: {str(e)}"
        logger.error(f"❌ Job {job_id} failed: {error_msg}")

        failed_topics = extract_topics(topics_input) if topics_input else []

        # Store failure information
        jobs[job_id] = {
            "status": "failed",
            "error": error_msg,
            "topics": failed_topics
        }

        # Send failure webhook if callback URL exists
        if callback_url:
            try:
                await _post_webhook(
                    callback_url,
                    {
                        "job_id": job_id,
                        "status": "failed",
                        "error": error_msg,
                        "topics": failed_topics
                    },
                    timeout=10.0
                )
            except Exception as e:
                logger.error(f"Failed to send error webhook: {e}")


@app.post("/api/submit")
async def submit_job(
    request: Request,
    background_tasks: BackgroundTasks,
    auth: bool = Depends(verify_api_key)
):
    """Endpoint to submit new job

    Expects a JSON body with ``topics`` (at least 3 after parsing) and an
    optional ``callback_url``. Responds with the new job id.

    Raises:
        HTTPException: 400 for a malformed body or invalid topics.
    """
    try:
        data = await request.json()
        job_id = str(uuid.uuid4())

        # Validate input
        if not data.get("topics"):
            raise HTTPException(status_code=400, detail="Topics are required")

        callback_url = data.get("callback_url")
        topics_input = data["topics"]

        topics = extract_topics(topics_input)
        if len(topics) < 3:
            raise HTTPException(status_code=400, detail="At least 3 topics are required")

        logger.info(f"📥 Received job {job_id} with {len(topics)} topics: {topics}")

        # Store initial job data
        jobs[job_id] = {
            "status": "processing",
            "callback_url": callback_url,
            "topics": topics
        }

        # Process job in background
        background_tasks.add_task(
            process_job,
            job_id,
            topics_input,
            callback_url
        )

        return JSONResponse({
            "job_id": job_id,
            "status": "queued",
            "topics": topics,
            "estimated_time": "90-120 seconds",
            "message": "Topic generation and script creation started"
        })

    except HTTPException as e:
        # Validation errors raised above already carry the right status and
        # detail; re-wrapping them would mangle the detail text.
        logger.error(f"❌ Submission error: {e.detail}")
        raise
    except Exception as e:
        # Anything else (invalid JSON, non-object body, ...) is a bad request.
        logger.error(f"❌ Submission error: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e


@app.get("/api/status/{job_id}")
async def get_status(job_id: str, auth: bool = Depends(verify_api_key)):
    """Check job status

    Raises:
        HTTPException: 404 when *job_id* is unknown.
    """
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    return JSONResponse(jobs[job_id])


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    completed_jobs = [job for job in jobs.values() if job.get("status") == "complete"]
    # max(1, ...) avoids division by zero before any job has completed
    avg_length = sum(job.get("script_length", 0) for job in completed_jobs) / max(1, len(completed_jobs))

    return JSONResponse({
        "status": "healthy",
        "model_loaded": generator.loaded,
        "total_jobs": len(jobs),
        "completed_jobs": len(completed_jobs),
        "failed_jobs": sum(1 for job in jobs.values() if job.get("status") == "failed"),
        "average_script_length": round(avg_length, 2)
    })


@app.get("/test/generation")
async def test_generation(auth: bool = Depends(verify_api_key)):
    """Test script generation

    Runs the full pipeline on fixed sample topics. Errors are reported in the
    response body rather than raised.
    """
    try:
        loop = asyncio.get_running_loop()

        # Loading and inference both block, so they run in a worker thread.
        if not generator.loaded:
            if not await loop.run_in_executor(None, generator.load_model):
                return JSONResponse({"status": "error", "error": "Model failed to load"})

        test_topics = [
            "Home workout",
            "Healthy meal prep",
            "Yoga for beginners"
        ]

        logger.info(f"🧪 Testing topic generation with: {test_topics}")

        # Test topic generation, then script generation
        generated_topic, script = await loop.run_in_executor(
            None, _run_generation_pipeline, test_topics
        )

        return JSONResponse({
            "status": "success",
            "test_topics": test_topics,
            "generated_topic": generated_topic,
            "script_length": len(script),
            "script_preview": (script[:300] + "...") if len(script) > 300 else script,
            "estimated_duration": "60 seconds",
            "quality": "good" if len(script) >= 200 else "needs improvement"
        })

    except Exception as e:
        logger.error(f"❌ Test generation failed: {str(e)}")
        return JSONResponse({"status": "error", "error": str(e)})


@app.get("/")
async def root():
    """Root endpoint"""
    return JSONResponse({
        "message": "Video Script Generator API",
        "version": "2.0",
        "features": "Generates viral topics from trends and creates informative video scripts",
        "endpoints": {
            "submit_job": "POST /api/submit (with 'topics' array)",
            "check_status": "GET /api/status/{job_id}",
            "health": "GET /health",
            "test_generation": "GET /test/generation"
        },
        "status": "operational"
    })


if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=PORT,
        log_level="info"
    )