| | import os |
| | import uuid |
| | import httpx |
| | import torch |
| | import logging |
| | import re |
| | import json |
| | import asyncio |
| | from typing import Dict, Optional, List, Union |
| | from fastapi import FastAPI, Request, BackgroundTasks, HTTPException, Depends |
| | from fastapi.responses import JSONResponse |
| | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| | from transformers import AutoTokenizer, AutoModelForCausalLM |
| | import uvicorn |
| | from contextlib import asynccontextmanager |
| |
|
| | |
| | MODEL_ID = "google/gemma-1.1-2b-it" |
| | HF_TOKEN = os.getenv("HF_TOKEN", "") |
| | API_KEY = os.getenv("API_KEY", "default-key-123") |
| | MAX_TOKENS = 450 |
| | DEVICE = "cpu" |
| | PORT = int(os.getenv("PORT", 7860)) |
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | security = HTTPBearer() |
| |
|
| | |
| | jobs: Dict[str, dict] = {} |
| |
|
class ScriptGenerator:
    """Lazy holder for the Hugging Face tokenizer/model pair.

    The model is loaded on first use (load_model) rather than at import
    time, so the API can start quickly and report load failures through
    `load_error` instead of crashing on startup.
    """

    def __init__(self):
        self.tokenizer = None   # AutoTokenizer once loaded
        self.model = None       # AutoModelForCausalLM once loaded
        self.loaded = False     # True after a successful load_model()
        self.load_error = None  # str(exception) from the last failed load, if any

    def load_model(self) -> bool:
        """Load tokenizer and model onto DEVICE.

        Idempotent: returns True immediately if already loaded. On
        failure it returns False and records `load_error` instead of
        raising, so callers can surface the error in API responses.
        """
        if self.loaded:
            return True

        logger.info("Loading model...")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=HF_TOKEN)
            # Fix: the original log string contained a mojibake emoji that was
            # split across two physical lines (a syntax error as pasted).
            logger.info("Tokenizer loaded")

            self.model = AutoModelForCausalLM.from_pretrained(
                MODEL_ID,
                torch_dtype=torch.float32,  # full precision; fp16 gives no benefit on CPU
                token=HF_TOKEN,
                device_map=None,  # move explicitly below instead of accelerate dispatch
            )

            self.model = self.model.to(DEVICE)
            self.model.eval()  # inference mode: disable dropout etc.

            self.loaded = True
            logger.info("Model loaded successfully")
            return True

        except Exception as e:
            # Record the failure so /health and job errors can report it.
            self.load_error = str(e)
            logger.error(f"Model loading failed: {str(e)}")
            return False
| |
|
| | |
# Module-level singleton shared by all request handlers; model is loaded lazily.
generator = ScriptGenerator()
| |
|
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Allow the request only when the bearer token equals API_KEY; 401 otherwise."""
    if credentials.credentials == API_KEY:
        return True
    raise HTTPException(status_code=401, detail="Invalid API key")
| |
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: log startup; no shutdown cleanup is needed."""
    # Fix: the original message started with a mojibake emoji character.
    logger.info("API Server starting up...")
    yield
| |
|
# Application instance; the lifespan hook above handles startup logging.
app = FastAPI(lifespan=lifespan)
| |
|
def extract_topics(topic_input: Union[str, List[str]]) -> List[str]:
    """Normalize a topics payload into a clean list of topic strings.

    Accepts:
      - a JSON-encoded list (or scalar) passed as a string,
      - a plain comma-separated string,
      - a single plain string,
      - an actual list of values.

    Whitespace is stripped and empty entries are dropped. Any other
    input type — or a blank string — yields [].
    """
    if isinstance(topic_input, str):
        try:
            parsed = json.loads(topic_input)
            if isinstance(parsed, list):
                return [str(topic).strip() for topic in parsed if str(topic).strip()]
            # Scalar JSON value (number, bool, quoted string, ...).
            single = str(parsed).strip()
            # Fix: previously an empty scalar produced [""] instead of [].
            return [single] if single else []
        except json.JSONDecodeError:
            # Not JSON: treat as comma-separated, or as one bare topic.
            if "," in topic_input:
                return [topic.strip() for topic in topic_input.split(",") if topic.strip()]
            # Fix: previously a blank/whitespace string produced [""] instead of [].
            single = topic_input.strip()
            return [single] if single else []
    elif isinstance(topic_input, list):
        return [str(topic).strip() for topic in topic_input if str(topic).strip()]

    return []
| |
|
def generate_topic_from_trends(trending_topics: List[str]) -> str:
    """Ask the model for one short, viral video topic built from the trends.

    Lazily loads the model on first use and raises if loading fails.
    Returns the first line of the completion with wrapping quotes removed.
    """
    if not generator.loaded:
        if not generator.load_model():
            raise Exception(f"Model failed to load: {generator.load_error}")

    logger.info(f"Generating viral topic from trends: {trending_topics}")

    # Fix: the trend count was hard-coded as "5", but callers may pass any
    # number of topics (the API only enforces a minimum of 3).
    prompt = (
        f"Based on these {len(trending_topics)} trending topics: {', '.join(trending_topics)}\n\n"
        "Create ONE highly engaging, viral topic for a YouTube/TikTok short video that:\n"
        "1. Combines elements from these trends in a creative way\n"
        "2. Has high viral potential (emotional, surprising, or controversial)\n"
        "3. Is suitable for a 60-second video format\n"
        "4. Appeals to a broad audience\n"
        "5. Focus on informative video title (not promotion, not service, not event, not product, not sale, not challenging)\n"
        "6. Is specific enough to be interesting but broad enough to allow creative interpretation\n\n"
        "Respond with ONLY the topic (no explanations, no bullet points, no numbering).\n"
        "The topic should be 5-10 words maximum.\n\n"
        "VIRAL TOPIC:"
    )

    inputs = generator.tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=512
    )
    inputs = {k: v.to(DEVICE) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = generator.model.generate(
            **inputs,
            max_new_tokens=100,
            do_sample=True,
            top_p=0.9,
            temperature=0.8,
            pad_token_id=generator.tokenizer.eos_token_id,
            repetition_penalty=1.1
        )

    generated_text = generator.tokenizer.decode(outputs[0], skip_special_tokens=True)
    # The decoded text begins with the prompt; drop it to keep only the completion.
    topic = generated_text.replace(prompt, "").strip()

    # Keep only the first line/sentence, then strip any wrapping quotes.
    topic = re.split(r'[\n\.]', topic)[0].strip()
    topic = re.sub(r'^["\'](.*)["\']$', r'\1', topic)

    logger.info(f"Generated topic: '{topic}'")
    return topic
| |
|
def clean_generated_script(script: str, prompt: str) -> str:
    """Scrub a raw model completion for delivery.

    Removes the echoed prompt (if present), instruction-section headers
    the model sometimes copies back, and common promotional phrases,
    then collapses runs of blank lines and trims the result.
    """
    # Drop the prompt if the model repeated it verbatim.
    if prompt in script:
        script = script.replace(prompt, "")

    # Instruction headers: each match runs to the next blank line (or EOF).
    section_patterns = (
        r'CRITICAL REQUIREMENTS:.*?(\n\n|$)',
        r'SCRIPT STRUCTURE:.*?(\n\n|$)',
        r'VISUAL DESCRIPTION GUIDELINES:.*?(\n\n|$)',
        r'VOICEOVER GUIDELINES:.*?(\n\n|$)',
        r'EXAMPLE FORMAT:.*?(\n\n|$)',
        r'NOW CREATE SCRIPT FOR:.*?(\n\n|$)',
        r'ONLY RETURN THE SCRIPT CONTENT.*?(\n\n|$)',
        r'IMPORTANT: ONLY generate.*?(\n\n|$)',
        r'BEGIN SCRIPT:.*?(\n\n|$)',
        r'NO PROMOTIONAL CONTENT.*?(\n\n|$)',
        r'FOCUS ON EDUCATIONAL VALUE.*?(\n\n|$)',
        r'FOCUS ON INFORMATIVE CONTENT.*?(\n\n|$)',
        r'FOCUS ON MEANINGFUL VIDEO SCRIPT.*?(\n\n|$)',
    )
    for pat in section_patterns:
        script = re.sub(pat, '', script, flags=re.IGNORECASE | re.DOTALL)

    # Promotional phrases / calls to action the prompt forbids.
    promo_patterns = (
        r'visit our (website|app|page)',
        r'download (the|our) app',
        r'check out our (product|service)',
        r'buy now',
        r'sign up',
        r'click the link',
        r'in the description below',
        r'link in bio',
        r'use code.*?',
        r'promo code',
        r'discount code',
    )
    for pat in promo_patterns:
        script = re.sub(pat, '', script, flags=re.IGNORECASE)

    # Collapse repeated blank lines into a single blank line.
    script = re.sub(r'\n\s*\n', '\n\n', script)

    return script.strip()
| |
|
def generate_script(topic: str) -> str:
    """Generate a timestamped 60-second video script for `topic`.

    Lazily loads the model, builds a structured prompt, generates up to
    MAX_TOKENS new tokens, then post-processes the completion with
    clean_generated_script plus a final promo-scrubbing pass.

    Raises on model-load or generation failure.
    """
    try:
        if not generator.loaded:
            if not generator.load_model():
                raise Exception(f"Model failed to load: {generator.load_error}")

        # Strip stray list/quote characters from inputs like "['topic']".
        clean_topic = topic.strip().strip("['").strip("']").strip('"').strip("'")
        logger.info(f"Generating script for: '{clean_topic}'")

        prompt = (
            f"IMPORTANT: Create a purely informative 60-second YouTube/TikTok video script about: {clean_topic}\n\n"
            "CRITICAL REQUIREMENTS:\n"
            "- Total duration: 60 seconds exactly with clear timestamps\n"
            "- Each scene must have BOTH visual description AND voiceover text\n"
            "- Visual descriptions should be specific, searchable keywords for stock videos\n"
            "- Voiceover should be conversational, educational, and engaging\n"
            "- NO personal introductions ('I'm...', 'My name is...')\n"
            "- NO promotional content (no apps, websites, products, or services)\n"
            "- NO calls to action (no 'visit our', 'download', 'buy now', 'sign up')\n"
            "- Focus on educational value and useful information only\n"
            "- Provide practical tips, facts, or insights that viewers can use immediately\n\n"

            "SCRIPT STRUCTURE:\n"
            "[0:00-0:08] VISUAL: [Attention-grabbing visual - dramatic/curious imagery]\n"
            "VOICEOVER: [8-second hook that creates curiosity and grabs attention]\n\n"

            "[0:08-0:45] VISUAL: [Action-oriented visuals demonstrating the topic]\n"
            "VOICEOVER: [37-second valuable content with key insights, facts, and practical tips]\n\n"

            "[0:45-0:55] VISUAL: [Transformation/result visual showing benefits]\n"
            "VOICEOVER: [10-second summary of key benefits and value]\n\n"

            "[0:55-1:00] VISUAL: [Inspiring visual that reinforces the main message]\n"
            "VOICEOVER: [5-second inspiring closing thought]\n\n"

            "VOICEOVER GUIDELINES:\n"
            "- Focus on viewer benefits and valuable information\n"
            "- Include surprising facts, statistics, or insights\n"
            "- Use conversational, engaging tone\n"
            "- End with an inspiring thought, not a call to action\n\n"

            # Fix: this segment was missing its f-prefix, so the literal text
            # "{clean_topic}" was sent to the model instead of the topic itself.
            f"NOW CREATE A PURELY INFORMATIVE SCRIPT FOR: {clean_topic}\n\n"
            "SCRIPT:\n"
        )

        inputs = generator.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=512
        )
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = generator.model.generate(
                **inputs,
                max_new_tokens=MAX_TOKENS,
                do_sample=True,
                top_p=0.9,
                temperature=0.8,
                pad_token_id=generator.tokenizer.eos_token_id,
                repetition_penalty=1.1
            )

        full_output = generator.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Strip prompt remnants, instruction headers, and promo phrases.
        script_content = clean_generated_script(full_output, prompt)

        # Fallback: if cleaning removed nearly everything, take whatever
        # follows the "SCRIPT:" marker (or just drop the prompt).
        if not script_content or len(script_content) < 50:
            if "SCRIPT:" in full_output:
                script_content = full_output.split("SCRIPT:")[-1].strip()
            else:
                script_content = full_output.replace(prompt, "").strip()

        # Final scrub for any call-to-action sentences that slipped through.
        script_content = re.sub(r'(visit|download|buy|sign up|check out).*?\.', '', script_content, flags=re.IGNORECASE)
        script_content = re.sub(r'link (in|below).*?', '', script_content, flags=re.IGNORECASE)

        logger.info(f"Generated {len(script_content)} characters")
        return script_content

    except Exception as e:
        logger.error(f"Script generation failed: {str(e)}")
        raise
| |
|
async def process_job(job_id: str, topics_input: Union[str, List[str]], callback_url: str = None):
    """Background task: turn trending topics into a generated video script.

    Writes the outcome (success or failure) into the in-memory `jobs`
    store, and — when `callback_url` is given — POSTs the result or the
    error to it as a best-effort webhook.
    """
    try:
        topics = extract_topics(topics_input)
        # Fix: this was `raise HTTPException(400, ...)`. There is no HTTP
        # response to shape inside a background task; a plain exception is
        # recorded by the failure handler below exactly the same way.
        if len(topics) < 3:
            raise ValueError("At least 3 topics are required")

        logger.info(f"Processing {len(topics)} topics: {topics}")

        generated_topic = generate_topic_from_trends(topics)
        script = generate_script(generated_topic)

        jobs[job_id] = {
            "status": "complete",
            "result": script,
            "original_topics": topics,
            "generated_topic": generated_topic,
            "script_length": len(script),
            "formatted": True
        }

        # Fix: the original log string contained a mojibake emoji split
        # across two physical lines (a syntax error as pasted).
        logger.info(f"Completed job {job_id}")

        if callback_url:
            try:
                webhook_data = {
                    "job_id": job_id,
                    "status": "complete",
                    "result": script,
                    "topic": generated_topic,
                    "script_length": len(script),
                    "formatted": True,
                    "original_topics": topics
                }

                logger.info(f"Sending webhook to: {callback_url}")

                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.post(
                        callback_url,
                        json=webhook_data,
                        headers={"Content-Type": "application/json"}
                    )

                    if 200 <= response.status_code < 300:
                        logger.info(f"Webhook delivered successfully: {response.status_code}")
                    else:
                        logger.warning(f"Webhook returned non-2xx status: {response.status_code} - {response.text}")

            except Exception as e:
                # Webhook delivery is best-effort; the job result is already stored.
                logger.error(f"Webhook failed: {str(e)}")

    except Exception as e:
        error_msg = f"Job failed: {str(e)}"
        logger.error(f"Job {job_id} failed: {error_msg}")

        failed_topics = extract_topics(topics_input) if topics_input else []
        jobs[job_id] = {
            "status": "failed",
            "error": error_msg,
            "topics": failed_topics
        }

        # Best-effort failure notification.
        if callback_url:
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    await client.post(
                        callback_url,
                        json={
                            "job_id": job_id,
                            "status": "failed",
                            "error": error_msg,
                            "topics": failed_topics
                        },
                        headers={"Content-Type": "application/json"}
                    )
            except Exception as e:
                logger.error(f"Failed to send error webhook: {e}")
| |
|
| | @app.post("/api/submit") |
| | async def submit_job( |
| | request: Request, |
| | background_tasks: BackgroundTasks, |
| | auth: bool = Depends(verify_api_key) |
| | ): |
| | """Endpoint to submit new job""" |
| | try: |
| | data = await request.json() |
| | job_id = str(uuid.uuid4()) |
| | |
| | |
| | if not data.get("topics"): |
| | raise HTTPException(status_code=400, detail="Topics are required") |
| | |
| | callback_url = data.get("callback_url") |
| | topics_input = data["topics"] |
| | topics = extract_topics(topics_input) |
| | |
| | if len(topics) < 3: |
| | raise HTTPException(status_code=400, detail="At least 3 topics are required") |
| | |
| | logger.info(f"π₯ Received job {job_id} with {len(topics)} topics: {topics}") |
| | |
| | |
| | jobs[job_id] = { |
| | "status": "processing", |
| | "callback_url": callback_url, |
| | "topics": topics |
| | } |
| | |
| | |
| | background_tasks.add_task( |
| | process_job, |
| | job_id, |
| | topics_input, |
| | callback_url |
| | ) |
| | |
| | return JSONResponse({ |
| | "job_id": job_id, |
| | "status": "queued", |
| | "topics": topics, |
| | "estimated_time": "90-120 seconds", |
| | "message": "Topic generation and script creation started" |
| | }) |
| | |
| | except Exception as e: |
| | logger.error(f"β Submission error: {str(e)}") |
| | raise HTTPException(status_code=400, detail=str(e)) |
| |
|
| | @app.get("/api/status/{job_id}") |
| | async def get_status(job_id: str, auth: bool = Depends(verify_api_key)): |
| | """Check job status""" |
| | if job_id not in jobs: |
| | raise HTTPException(status_code=404, detail="Job not found") |
| | |
| | return JSONResponse(jobs[job_id]) |
| |
|
| | @app.get("/health") |
| | async def health_check(): |
| | """Health check endpoint""" |
| | completed_jobs = [job for job in jobs.values() if job.get("status") == "complete"] |
| | avg_length = sum(job.get("script_length", 0) for job in completed_jobs) / max(1, len(completed_jobs)) |
| | |
| | return JSONResponse({ |
| | "status": "healthy", |
| | "model_loaded": generator.loaded, |
| | "total_jobs": len(jobs), |
| | "completed_jobs": len(completed_jobs), |
| | "failed_jobs": sum(1 for job in jobs.values() if job.get("status") == "failed"), |
| | "average_script_length": round(avg_length, 2) |
| | }) |
| |
|
| | @app.get("/test/generation") |
| | async def test_generation(auth: bool = Depends(verify_api_key)): |
| | """Test script generation""" |
| | try: |
| | if not generator.loaded: |
| | if not generator.load_model(): |
| | return JSONResponse({"status": "error", "error": "Model failed to load"}) |
| | |
| | test_topics = [ |
| | "Home workout", |
| | "Healthy meal prep", |
| | "Yoga for beginners" |
| | ] |
| | |
| | logger.info(f"π§ͺ Testing topic generation with: {test_topics}") |
| | |
| | |
| | generated_topic = generate_topic_from_trends(test_topics) |
| | |
| | |
| | script = generate_script(generated_topic) |
| | |
| | return JSONResponse({ |
| | "status": "success", |
| | "test_topics": test_topics, |
| | "generated_topic": generated_topic, |
| | "script_length": len(script), |
| | "script_preview": script[:300] + "..." if len(script) > 300 else script, |
| | "estimated_duration": "60 seconds", |
| | "quality": "good" if len(script) >= 200 else "needs improvement" |
| | }) |
| | |
| | except Exception as e: |
| | logger.error(f"β Test generation failed: {str(e)}") |
| | return JSONResponse({"status": "error", "error": str(e)}) |
| |
|
| | @app.get("/") |
| | async def root(): |
| | """Root endpoint""" |
| | return JSONResponse({ |
| | "message": "Video Script Generator API", |
| | "version": "2.0", |
| | "features": "Generates viral topics from trends and creates informative video scripts", |
| | "endpoints": { |
| | "submit_job": "POST /api/submit (with 'topics' array)", |
| | "check_status": "GET /api/status/{job_id}", |
| | "health": "GET /health", |
| | "test_generation": "GET /test/generation" |
| | }, |
| | "status": "operational" |
| | }) |
| |
|
| | if __name__ == "__main__": |
| | uvicorn.run( |
| | app, |
| | host="0.0.0.0", |
| | port=PORT, |
| | log_level="info" |
| | ) |