Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from google import genai | |
| import tempfile | |
| import os | |
| import time | |
| import json | |
| from typing import Optional | |
| import pandas as pd | |
| import logging | |
# Backend API key configuration.
# "GEMENI_KEY" (sic) is the variable name this app has always read, so it is
# still consulted first; the correctly spelled "GEMINI_API_KEY" is accepted as
# a fallback so a conventionally named secret also works.
GEMINI_API_KEY = os.getenv("GEMENI_KEY") or os.getenv("GEMINI_API_KEY")

# Page configuration. This runs at import time, before any other Streamlit
# command in this file.
st.set_page_config(
    page_title="Video Analyser and Script Generator",
    page_icon="🎥",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Log to the process's standard stream with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def configure_gemini():
    """Build a Gemini client authenticated with the backend API key.

    Returns:
        The ``genai.Client`` created from the module-level ``GEMINI_API_KEY``.
    """
    gemini_client = genai.Client(api_key=GEMINI_API_KEY)
    return gemini_client
# The main prompt template with structured output requirements.
# It is read from the SYS_PROMPT environment variable. The variable name must
# be passed as a string (a bare SYS_PROMPT is an undefined name), and an unset
# variable yields an empty string rather than the literal text "None".
SYSTEM_PROMPT = os.getenv("SYS_PROMPT", "")
# Longest time (in seconds) to wait for Google AI to finish processing an
# uploaded video before giving up.
_VIDEO_PROCESSING_TIMEOUT_SECONDS = 600


def _strip_json_code_fence(text):
    """Return *text* without a surrounding Markdown code fence, if present."""
    cleaned = text.strip()
    if not cleaned.startswith("```"):
        return cleaned
    body = cleaned[3:]
    # Drop an optional language tag such as "json" right after the fence.
    if body[:4].lower() == "json":
        body = body[4:]
    body = body.rstrip()
    # Only remove a closing fence that is actually there, so a reply without
    # one does not lose its last three characters.
    if body.endswith("```"):
        body = body[:-3]
    return body.strip()


def analyze_video_and_generate_script(
    video_bytes,
    video_name,
    offer_details: str = "",
    target_audience: str = "",
    specific_hooks: str = "",
    additional_context: str = ""
):
    """Analyze a video with Gemini and generate direct response script variations.

    The video is written to a temporary file, uploaded through the Gemini
    client, polled until it leaves the "PROCESSING" state, and then sent to
    the model together with the user prompt and ``SYSTEM_PROMPT``. Progress
    and errors are reported through Streamlit widgets.

    Args:
        video_bytes: Raw bytes of the uploaded video.
        video_name: Original file name; only its extension is used, as the
            suffix of the temporary file.
        offer_details: Optional description of the offer.
        target_audience: Optional description of the audience.
        specific_hooks: Optional hooks to consider.
        additional_context: Optional free-form notes.

    Returns:
        The parsed JSON object (a ``dict``) returned by the model, or ``None``
        when upload, processing, generation or parsing fails.
    """
    tmp_file_path = None
    try:
        # Save uploaded video to temporary file. The path is recorded before
        # writing so the cleanup below also covers a failed write.
        suffix = os.path.splitext(video_name)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file_path = tmp_file.name
            tmp_file.write(video_bytes)

        # Configure Gemini
        client = configure_gemini()

        # Show upload progress
        upload_progress = st.progress(0)
        upload_status = st.empty()
        upload_status.text("Uploading video to Google AI...")
        upload_progress.progress(20)

        # Upload video to Gemini
        video_file_obj = client.files.upload(file=tmp_file_path)
        upload_progress.progress(40)
        upload_status.text("Processing video...")

        # Poll until processing finishes, but never wait forever.
        deadline = time.monotonic() + _VIDEO_PROCESSING_TIMEOUT_SECONDS
        while video_file_obj.state.name == "PROCESSING":
            if time.monotonic() >= deadline:
                upload_status.error("Google AI file processing timed out. Please try again.")
                return None
            time.sleep(2)
            video_file_obj = client.files.get(name=video_file_obj.name)
            upload_progress.progress(60)

        if video_file_obj.state.name == "FAILED":
            upload_status.error("Google AI file processing failed. Please try another video.")
            return None

        upload_progress.progress(80)
        upload_status.text("Generating script variations...")

        # Build the user prompt with additional context
        user_prompt = f"""Analyze this reference video and generate 3 high-converting direct response video script variations.
ADDITIONAL CONTEXT:
- Offer Details: {offer_details if offer_details else 'Extract from video'}
- Target Audience: {target_audience if target_audience else 'Determine from video content'}
- Specific Hooks to Consider: {specific_hooks if specific_hooks else 'Create based on video analysis'}
- Additional Context: {additional_context}
Please analyze the video's:
1. Hook strategy and opening seconds
2. Pacing and visual transitions
3. Claims and promises made
4. Authority elements used
5. Urgency/scarcity tactics
6. CTA approach
Then create 2-3 script variations that would perform 30-50% cheaper while maintaining conversion effectiveness.
IMPORTANT: Return only valid JSON in the exact format specified in the system prompt. Do not include any text before or after the JSON."""

        # Generate response
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[video_file_obj, user_prompt + "\n\n" + SYSTEM_PROMPT]
        )
        upload_progress.progress(100)
        upload_status.success("Analysis complete!")

        # Parse JSON response. A missing text payload is treated as an empty
        # string so it is reported as a parse error.
        try:
            json_response = json.loads(_strip_json_code_fence(response.text or ""))
        except json.JSONDecodeError as e:
            st.error(f"Error parsing AI response: {str(e)}")
            return None

        # Callers index the result by key, so anything but an object is unusable.
        if not isinstance(json_response, dict):
            st.error("Error parsing AI response: expected a JSON object.")
            return None
        return json_response
    except Exception as e:
        logger.exception("Video analysis failed.")
        st.error(f"Error processing video: {str(e)}")
        return None
    finally:
        # Clean up the temporary file on every path, not only on success.
        if tmp_file_path is not None:
            try:
                os.unlink(tmp_file_path)
            except OSError:
                logger.warning("Could not remove temporary file %s", tmp_file_path)
def display_script_variations(json_data):
    """Render each script variation as a titled Streamlit table.

    Shows an error and returns when ``json_data`` is empty or has no
    "script_variations" key. Otherwise every variation gets a subheader,
    either a dataframe built from its "script_table" rows or a warning when
    there are none, and a divider.
    """
    if not json_data or "script_variations" not in json_data:
        st.error("No script variations found in the response")
        return

    # Raw JSON field -> column heading shown to the user.
    headings = {
        'timestamp': 'Timestamp',
        'script_voiceover': 'Script / Voiceover',
        'visual_direction': 'Visual Direction',
        'psychological_trigger': 'Psychological Trigger',
        'cta_action': 'CTA / Action'
    }
    # Display width for each heading.
    widths = {
        "Timestamp": "small",
        "Script / Voiceover": "large",
        "Visual Direction": "large",
        "Psychological Trigger": "medium",
        "CTA / Action": "medium"
    }

    for position, variant in enumerate(json_data["script_variations"], start=1):
        title = variant.get("variation_name", f"Variation {position}")
        st.subheader(title)

        rows = variant.get("script_table", [])
        if not rows:
            st.warning(f"No script data available for {title}")
        else:
            table = pd.DataFrame(rows).rename(columns=headings)
            st.dataframe(
                table,
                use_container_width=True,
                hide_index=True,
                column_config={
                    heading: st.column_config.TextColumn(width=width)
                    for heading, width in widths.items()
                }
            )
        st.divider()
def display_video_analysis(json_data):
    """Render the "video_analysis" section of the response in two columns.

    Shows an error and returns when ``json_data`` is empty or has no
    "video_analysis" key. Missing fields are displayed as 'N/A'.
    """
    if not json_data or "video_analysis" not in json_data:
        st.error("No video analysis found in the response")
        return

    report = json_data["video_analysis"]
    left, right = st.columns(2)

    # (column, ((heading, JSON key), ...)) in the order they are drawn.
    layout = (
        (left, (
            ("Effectiveness Factors", 'effectiveness_factors'),
            ("Target Audience", 'target_audience'),
        )),
        (right, (
            ("Psychological Triggers", 'psychological_triggers'),
            ("Improvement Recommendations", 'improvement_recommendations'),
        )),
    )
    for column, sections in layout:
        with column:
            for heading, key in sections:
                st.subheader(heading)
                st.write(report.get(key, 'N/A'))
def create_download_content(json_data):
    """Build the plain-text export of the script variations and analysis.

    Args:
        json_data: Mapping that may contain "script_variations" (a list of
            variation mappings) and "video_analysis" (a mapping).

    Returns:
        A single string: a banner, one section per variation with its script
        rows, and a closing analysis section. Missing row fields are left
        empty; missing analysis fields are shown as 'N/A'.
    """
    pieces = [
        "VIDEO ANALYSER AND SCRIPT GENERATOR OUTPUT\n",
        "=" * 60 + "\n\n",
    ]

    # Script variations
    for number, variant in enumerate(json_data.get("script_variations", []), start=1):
        heading = variant.get("variation_name", f"Variation {number}")
        pieces.append(f"{heading}\n")
        pieces.append("-" * 40 + "\n")
        for entry in variant.get("script_table", []):
            pieces.extend((
                f"Timestamp: {entry.get('timestamp', '')}\n",
                f"Script: {entry.get('script_voiceover', '')}\n",
                f"Visual: {entry.get('visual_direction', '')}\n",
                f"Trigger: {entry.get('psychological_trigger', '')}\n",
                f"CTA: {entry.get('cta_action', '')}\n\n",
            ))
        pieces.append("\n")

    # Analysis
    report = json_data.get("video_analysis", {})
    pieces.extend((
        "VIDEO ANALYSIS\n",
        "=" * 20 + "\n",
        f"Effectiveness Factors: {report.get('effectiveness_factors', 'N/A')}\n\n",
        f"Psychological Triggers: {report.get('psychological_triggers', 'N/A')}\n\n",
        f"Target Audience: {report.get('target_audience', 'N/A')}\n\n",
        f"Recommendations: {report.get('improvement_recommendations', 'N/A')}\n",
    ))
    return "".join(pieces)
def check_token(user_token):
    """Validate a user-supplied access token against the ACCESS_TOKEN env var.

    Args:
        user_token: Token entered by the user.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is True only when the token matches;
        ``message`` is empty on success and a user-facing error otherwise.
    """
    import hmac  # local import: only this function needs it

    expected_token = os.getenv("ACCESS_TOKEN")
    if not expected_token:
        logger.critical("ACCESS_TOKEN not set in environment.")
        return False, "Server error: Access token not configured."

    # Compare as bytes with a constant-time routine so the check does not leak
    # how many leading characters matched. Encoding first also keeps
    # compare_digest from rejecting non-ASCII text. Anything that is not a
    # string is treated as an empty (and therefore invalid) token.
    supplied_token = user_token if isinstance(user_token, str) else ""
    if hmac.compare_digest(
        supplied_token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        logger.info("Access token validated successfully.")
        return True, ""

    logger.warning("Invalid access token attempt.")
    return False, "Invalid token."
# Session-state key under which the latest successful analysis is kept, so the
# results survive the rerun triggered by widgets such as the download button.
_RESULTS_STATE_KEY = "script_results"


def _render_access_gate():
    """Show the token prompt and mark the session authenticated on success."""
    st.markdown("## Access Required")
    token_input = st.text_input("Enter Access Token", type="password")
    if st.button("Unlock App"):
        ok, error_msg = check_token(token_input)
        if ok:
            st.session_state["authenticated"] = True
            st.rerun()
        else:
            st.error(error_msg)


def _render_sidebar_inputs():
    """Draw the sidebar widgets and return their current values as a tuple."""
    with st.sidebar:
        st.header("Input Configuration")
        # Video upload
        uploaded_video = st.file_uploader(
            "Upload Reference Video",
            type=['mp4', 'mov', 'avi', 'mkv'],
            help="Upload a profitable ad video to analyze and create variations from"
        )
        st.subheader("Additional Context (Optional)")
        offer_details = st.text_area(
            "Offer Details",
            placeholder="e.g., Solar installation with $0 down payment...",
            height=80,
            help="Describe the product/service and main promise"
        )
        target_audience = st.text_area(
            "Target Audience",
            placeholder="e.g., 40+ homeowners with high electricity bills...",
            height=80,
            help="Describe the ideal customer demographics and pain points"
        )
        specific_hooks = st.text_area(
            "Specific Hooks to Test",
            placeholder="e.g., Government rebate angle, celebrity endorsement...",
            height=80,
            help="Any specific angles or hooks you want to incorporate"
        )
        additional_context = st.text_area(
            "Additional Context",
            placeholder="Any other relevant information...",
            height=100,
            help="Compliance requirements, brand guidelines, or other notes"
        )
        # Generate button
        generate_button = st.button(
            "Generate Script Variations",
            type="primary",
            use_container_width=True
        )
    return (
        uploaded_video,
        offer_details,
        target_audience,
        specific_hooks,
        additional_context,
        generate_button,
    )


def _render_usage_guide():
    """Show the collapsible instructions displayed before a video is uploaded."""
    with st.expander("How to Use This Tool"):
        st.markdown("""
### Upload Guidelines:
- **Best videos to analyze**: Already profitable Facebook/TikTok ads in your niche
- **Video length**: 30-90 seconds work best for analysis
- **Quality**: Clear audio and visuals help with better analysis
### Context Tips:
- **Offer details**: Be specific about your main promise and mechanism
- **Audience**: Include demographics, pain points, and desires
- **Hooks**: Mention any specific angles that have worked for you
### Script Optimization:
- Generated scripts focus on stopping scroll and driving clicks
- Each variation tests different psychological triggers
- Use the timestamp format for precise video production
- Test multiple variations to find your best performer
""")


def _render_results(json_response):
    """Show the script and analysis tabs, including the download button."""
    tab1, tab2 = st.tabs(["Script Variations", "Video Analysis"])
    with tab1:
        display_script_variations(json_response)
        # Download button
        download_content = create_download_content(json_response)
        st.download_button(
            label="Download All Scripts",
            data=download_content,
            file_name="video_script_variations.txt",
            mime="text/plain",
            type="secondary",
            use_container_width=True
        )
    with tab2:
        display_video_analysis(json_response)


def main():
    """Main application function.

    Draws the header, gates the app behind the access token, collects the
    sidebar inputs and, when requested, runs the analysis and shows the
    results.
    """
    # Header
    st.title("Video Analyser and Script Generator")
    st.divider()
    # NOTE: st.set_page_config is deliberately not called here. The page is
    # already configured at module level, and calling it again after other
    # Streamlit commands is rejected by Streamlit releases that require it to
    # be the first (and only) such call.

    if "authenticated" not in st.session_state:
        st.session_state["authenticated"] = False
    if not st.session_state["authenticated"]:
        _render_access_gate()
        return

    (
        uploaded_video,
        offer_details,
        target_audience,
        specific_hooks,
        additional_context,
        generate_button,
    ) = _render_sidebar_inputs()

    # Main content area
    if uploaded_video is None:
        # No video selected: drop results that belong to a previous upload.
        st.session_state.pop(_RESULTS_STATE_KEY, None)
        st.info("Please upload a reference video to begin analysis.")
        _render_usage_guide()
        return

    if generate_button:
        if not GEMINI_API_KEY or GEMINI_API_KEY == "your-gemini-api-key-here":
            st.error("Please configure your Gemini API key in the backend.")
            return
        # Process video
        with st.spinner("Analyzing video and generating scripts..."):
            video_bytes = uploaded_video.read()
            # Reset file pointer for potential re-use
            uploaded_video.seek(0)
            json_response = analyze_video_and_generate_script(
                video_bytes,
                uploaded_video.name,
                offer_details,
                target_audience,
                specific_hooks,
                additional_context
            )
        if json_response:
            # Keep the results so a later rerun can redraw them.
            st.session_state[_RESULTS_STATE_KEY] = json_response
            st.success("Analysis complete! Here are your script variations:")
            _render_results(json_response)
        else:
            st.session_state.pop(_RESULTS_STATE_KEY, None)
            st.error("Failed to generate script variations. Please try again.")
        return

    # Rerun without a fresh "generate" click: redraw the last results if any.
    stored_results = st.session_state.get(_RESULTS_STATE_KEY)
    if stored_results:
        _render_results(stored_results)
    else:
        st.info("Configure your inputs in the sidebar and click 'Generate Script Variations' to begin.")
# Script entry point: run the app and make sure an unexpected failure is both
# logged (with traceback) and visible to the user instead of leaving a
# partially drawn page with no explanation.
if __name__ == "__main__":
    try:
        logger.info("Launching Streamlit app...")
        main()
    except Exception:
        logger.exception("Unhandled error during app launch.")
        st.error("An unexpected error occurred. Please reload the page and try again.")