Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from google import genai | |
| import tempfile | |
| import os | |
| import time | |
| import json | |
| from typing import Optional | |
| import pandas as pd | |
| import logging | |
# Backend API key, read from the environment once at import time.
# NOTE(review): the variable name is spelled "GEMENI_KEY"; confirm it matches
# the secret name configured for the deployment before renaming it.
GEMINI_API_KEY = os.environ.get("GEMENI_KEY")

# Page configuration
st.set_page_config(
    layout="wide",
    initial_sidebar_state="expanded",
    page_title="Video Analyser and Script Generator",
    page_icon="🎥",
)

# Root logging setup: INFO and above, one stream handler, timestamped records.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
def configure_gemini():
    """Build a Gemini client authenticated with the backend API key.

    Returns:
        The ``genai.Client`` created from the module-level ``GEMINI_API_KEY``.
    """
    gemini_client = genai.Client(api_key=GEMINI_API_KEY)
    return gemini_client
# System prompt (with timestamp-based improvement instructions), supplied
# through the SYS_PROMPT environment variable.
# Falls back to an empty string when the variable is unset: formatting the
# missing value through an f-string would otherwise yield the literal text
# "None", which is then appended to the prompt sent to the model.
SYSTEM_PROMPT = os.getenv("SYS_PROMPT", "")
def _build_user_prompt(offer_details, target_audience, specific_hooks, additional_context):
    """Format the per-request instructions that accompany the uploaded video."""
    return f"""Analyze this reference video and generate 3 high-converting direct response video script variations with detailed timestamp-based improvements.
ADDITIONAL CONTEXT:
- Offer Details: {offer_details if offer_details else 'Extract from video'}
- Target Audience: {target_audience if target_audience else 'Determine from video content'}
- Specific Hooks to Consider: {specific_hooks if specific_hooks else 'Create based on video analysis'}
- Additional Context: {additional_context}
Please provide a comprehensive analysis including:
1. DETAILED VIDEO ANALYSIS with timestamp-based metrics:
- Break down the video into 5-10 second segments
- Rate each segment's effectiveness (1-10 scale)
- Identify specific elements (hook, transition, proof, CTA, etc.)
2. TIMESTAMP-BASED IMPROVEMENTS:
- Specific recommendations for each time segment
- Priority level for each improvement
- Expected impact of implementing changes
3. SCRIPT VARIATIONS:
- Create 2-3 complete script variations
- Each with timestamp-by-timestamp breakdown
- Different psychological triggers and approaches
IMPORTANT: Return only valid JSON in the exact format specified in the system prompt. Analyze the video second-by-second for maximum detail."""


def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (``` or ```json) if one is present."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned.startswith("json"):
            cleaned = cleaned[4:]
        # Only drop the closing fence when it is really there; slicing
        # unconditionally would cut the last characters of an unfenced tail.
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
    return cleaned.strip()


def analyze_video_and_generate_script(
    video_bytes,
    video_name,
    offer_details: str = "",
    target_audience: str = "",
    specific_hooks: str = "",
    additional_context: str = ""
):
    """
    Analyze video and generate direct response script variations.

    The video bytes are written to a temporary file, uploaded through the
    Gemini client, polled until processing finishes, and then sent to the
    model together with the user prompt and ``SYSTEM_PROMPT``. Progress and
    errors are reported through Streamlit widgets.

    Args:
        video_bytes: Raw content of the uploaded video.
        video_name: Original file name; only its extension is used, as the
            suffix of the temporary file.
        offer_details: Optional description of the offer.
        target_audience: Optional description of the audience.
        specific_hooks: Optional hooks to consider.
        additional_context: Optional free-form notes.

    Returns:
        The object parsed from the model's JSON reply, or ``None`` when file
        processing fails, the reply is not valid JSON, or any other error
        occurs (an error message is shown in the UI in each case).
    """
    tmp_file_path = None
    try:
        # Save uploaded video to temporary file
        suffix = os.path.splitext(video_name)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            # Record the path before writing so a failed write is still cleaned up.
            tmp_file_path = tmp_file.name
            tmp_file.write(video_bytes)

        # Configure Gemini
        client = configure_gemini()

        # Show upload progress
        upload_progress = st.progress(0)
        upload_status = st.empty()
        upload_status.text("Uploading video to Google AI...")
        upload_progress.progress(20)

        # Upload video to Gemini
        video_file_obj = client.files.upload(file=tmp_file_path)
        upload_progress.progress(40)
        upload_status.text("Processing video...")

        # Poll until the service reports the file is no longer processing.
        while video_file_obj.state.name == "PROCESSING":
            time.sleep(2)
            video_file_obj = client.files.get(name=video_file_obj.name)
            upload_progress.progress(60)

        if video_file_obj.state.name == "FAILED":
            upload_status.error("Google AI file processing failed. Please try another video.")
            return None

        upload_progress.progress(80)
        upload_status.text("Generating script variations...")

        # Build the enhanced user prompt
        user_prompt = _build_user_prompt(
            offer_details, target_audience, specific_hooks, additional_context
        )

        # Generate response
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[video_file_obj, user_prompt + "\n\n" + SYSTEM_PROMPT]
        )
        upload_progress.progress(100)
        upload_status.success("Analysis complete!")

        # Parse JSON response
        try:
            # NOTE(review): response.text is treated as possibly None (no text
            # part in the reply); an empty reply is then reported as a parse
            # error instead of an AttributeError — confirm against the SDK.
            response_text = _strip_code_fence(response.text or "")
            return json.loads(response_text)
        except json.JSONDecodeError as e:
            st.error(f"Error parsing AI response: {str(e)}")
            return None
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")
        return None
    finally:
        # Clean up temporary file on every exit path, not only on success.
        if tmp_file_path is not None:
            try:
                os.unlink(tmp_file_path)
            except OSError:
                logger.warning("Could not remove temporary file %s", tmp_file_path)
def display_script_variations(json_data):
    """Render every script variation as a titled table.

    Shows an error and returns when ``json_data`` is empty or has no
    "script_variations" key. Each variation gets a subheader, then either its
    "script_table" rows as a dataframe or a warning when there are none,
    followed by a divider.
    """
    if not json_data or "script_variations" not in json_data:
        st.error("No script variations found in the response")
        return

    # (raw JSON key, display label, column width)
    column_specs = (
        ('timestamp', 'Timestamp', "small"),
        ('script_voiceover', 'Script / Voiceover', "large"),
        ('visual_direction', 'Visual Direction', "large"),
        ('psychological_trigger', 'Psychological Trigger', "medium"),
        ('cta_action', 'CTA / Action', "medium"),
    )
    display_names = {raw: label for raw, label, _ in column_specs}

    for position, variation in enumerate(json_data["script_variations"], 1):
        title = variation.get("variation_name", f"Variation {position}")
        st.subheader(title)

        rows = variation.get("script_table", [])
        if not rows:
            st.warning(f"No script data available for {title}")
        else:
            # Build the table with human-readable column labels.
            table = pd.DataFrame(rows).rename(columns=display_names)
            widths = {
                label: st.column_config.TextColumn(width=width)
                for _, label, width in column_specs
            }
            st.dataframe(
                table,
                use_container_width=True,
                hide_index=True,
                column_config=widths
            )
        st.divider()
def display_video_analysis(json_data):
    """Render the "video_analysis" section: summary text plus a metrics table.

    Shows an error and returns when ``json_data`` is empty or lacks the
    "video_analysis" key. Summary fields missing from the analysis are shown
    as 'N/A'; an empty "video_metrics" list produces a warning instead of a
    table.
    """
    if not json_data or "video_analysis" not in json_data:
        st.error("No video analysis found in the response")
        return

    analysis = json_data["video_analysis"]

    # General analysis: two columns, each listing (heading, analysis key) pairs.
    left, right = st.columns(2)
    summary_layout = (
        (left, (
            ("Effectiveness Factors", 'effectiveness_factors'),
            ("Target Audience", 'target_audience'),
        )),
        (right, (
            ("Psychological Triggers", 'psychological_triggers'),
        )),
    )
    for column, sections in summary_layout:
        with column:
            for heading, key in sections:
                st.subheader(heading)
                st.write(analysis.get(key, 'N/A'))

    # Per-timestamp metrics table.
    st.subheader("Detailed Video Metrics (Timestamp Analysis)")
    video_metrics = analysis.get('video_metrics', [])
    if not video_metrics:
        st.warning("No detailed video metrics available")
        return

    # (raw JSON key, display label, column width)
    column_specs = (
        ('timestamp', 'Timestamp', "small"),
        ('element', 'Element', "medium"),
        ('current_approach', 'Current Approach', "large"),
        ('effectiveness_score', 'Score', "small"),
        ('notes', 'Analysis Notes', "large"),
    )
    metrics_df = pd.DataFrame(video_metrics).rename(
        columns={raw: label for raw, label, _ in column_specs}
    )
    st.dataframe(
        metrics_df,
        use_container_width=True,
        hide_index=True,
        column_config={
            label: st.column_config.TextColumn(width=width)
            for _, label, width in column_specs
        }
    )
def display_timestamp_improvements(json_data):
    """Display timestamp-based improvements in tabular format.

    Shows an error and returns when ``json_data`` is empty or lacks the
    "timestamp_improvements" key, and a warning when that list is empty.
    Otherwise the rows are rendered as a dataframe; when a priority column is
    present its cells are colour-coded (High / Medium / Low).
    """
    if not json_data or "timestamp_improvements" not in json_data:
        st.error("No timestamp improvements found in the response")
        return

    st.subheader("Timestamp-by-Timestamp Improvement Recommendations")
    improvements = json_data["timestamp_improvements"]
    if not improvements:
        st.warning("No timestamp improvements available")
        return

    # Rename columns for better display
    column_mapping = {
        'timestamp': 'Timestamp',
        'current_element': 'Current Element',
        'improvement_type': 'Improvement Type',
        'recommended_change': 'Recommended Change',
        'expected_impact': 'Expected Impact',
        'priority': 'Priority'
    }
    improvements_df = pd.DataFrame(improvements).rename(columns=column_mapping)

    priority_colors = {
        'High': 'background-color: #ffcccb',
        'Medium': 'background-color: #ffffcc',
        'Low': 'background-color: #ccffcc',
    }

    def color_priority(val):
        """Return the CSS for one priority cell ('' when unrecognised)."""
        if not isinstance(val, str):
            return ''
        # Tolerate case/whitespace variations such as "high" or " HIGH ".
        return priority_colors.get(val.strip().capitalize(), '')

    table = improvements_df
    # The rows come from model output, so the priority field may be absent;
    # styling a missing column would raise KeyError.
    if 'Priority' in improvements_df.columns:
        styler = improvements_df.style
        # Styler.applymap is deprecated in newer pandas in favour of
        # Styler.map; use whichever this pandas version provides.
        apply_cellwise = getattr(styler, "map", None) or styler.applymap
        table = apply_cellwise(color_priority, subset=['Priority'])

    st.dataframe(
        table,
        use_container_width=True,
        hide_index=True,
        column_config={
            "Timestamp": st.column_config.TextColumn(width="small"),
            "Current Element": st.column_config.TextColumn(width="medium"),
            "Improvement Type": st.column_config.TextColumn(width="medium"),
            "Recommended Change": st.column_config.TextColumn(width="large"),
            "Expected Impact": st.column_config.TextColumn(width="medium"),
            "Priority": st.column_config.TextColumn(width="small")
        }
    )
def create_csv_download(json_data):
    """Create CSV content with all scripts combined.

    Args:
        json_data: Parsed model response. Expected to be a dict whose
            "script_variations" entry is a list of dicts, each with an
            optional "variation_name" and a "script_table" list of row dicts.

    Returns:
        CSV text with one line per script row (columns: Variation, Timestamp,
        Script_Voiceover, Visual_Direction, Psychological_Trigger,
        CTA_Action), or the string "No script data available" when there are
        no rows.
    """
    # (CSV column, raw JSON key)
    field_map = (
        ('Timestamp', 'timestamp'),
        ('Script_Voiceover', 'script_voiceover'),
        ('Visual_Direction', 'visual_direction'),
        ('Psychological_Trigger', 'psychological_trigger'),
        ('CTA_Action', 'cta_action'),
    )

    # The payload is model-generated JSON: a key may be present with a null
    # value, so "or []" is used instead of relying on .get() defaults.
    payload = json_data if isinstance(json_data, dict) else {}
    variations = payload.get("script_variations") or []

    all_scripts_data = []
    # Combine all script variations into one dataset
    for i, variation in enumerate(variations, 1):
        if not isinstance(variation, dict):
            continue
        variation_name = variation.get("variation_name", f"Variation {i}")
        for row in variation.get("script_table") or []:
            if not isinstance(row, dict):
                continue
            script_row = {'Variation': variation_name}
            script_row.update(
                (column, row.get(key, '')) for column, key in field_map
            )
            all_scripts_data.append(script_row)

    # Convert to DataFrame and then to CSV
    if not all_scripts_data:
        return "No script data available"
    return pd.DataFrame(all_scripts_data).to_csv(index=False)
def check_token(user_token):
    """Validate a user-supplied access token against the ACCESS_TOKEN env var.

    Args:
        user_token: Token entered by the user.

    Returns:
        A ``(ok, message)`` tuple: ``(True, "")`` when the token matches,
        ``(False, "Invalid token.")`` when it does not, and
        ``(False, "Server error: Access token not configured.")`` when the
        environment variable is unset or empty.
    """
    # Function-scope import so the block does not depend on a file-level import.
    import hmac

    expected_token = os.getenv("ACCESS_TOKEN")
    if not expected_token:
        logger.critical("ACCESS_TOKEN not set in environment.")
        return False, "Server error: Access token not configured."

    supplied = user_token if isinstance(user_token, str) else ""
    # Constant-time comparison so response timing does not reveal how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    if hmac.compare_digest(supplied.encode("utf-8"), expected_token.encode("utf-8")):
        logger.info("Access token validated successfully.")
        return True, ""

    logger.warning("Invalid access token attempt.")
    return False, "Invalid token."
def main():
    """Render the app: access gate first, then inputs, analysis and results.

    Until the session is marked authenticated only the token prompt is shown.
    Afterwards the sidebar collects the video and optional context, the
    analysis runs when the generate button is pressed, and any results held
    in session state are shown in three tabs.
    """
    # Header
    st.title("Video Analyser and Script Generator")
    st.divider()

    if "authenticated" not in st.session_state:
        st.session_state["authenticated"] = False

    # Access gate: nothing else is rendered until a valid token is entered.
    if not st.session_state["authenticated"]:
        st.markdown("## Access Required")
        entered_token = st.text_input("Enter Access Token", type="password")
        if st.button("Unlock App"):
            granted, failure_message = check_token(entered_token)
            if granted:
                st.session_state["authenticated"] = True
                st.rerun()
            else:
                st.error(failure_message)
        return

    # Sidebar for inputs
    with st.sidebar:
        st.header("Input Configuration")

        # Video upload
        video_upload = st.file_uploader(
            "Upload Reference Video",
            type=['mp4', 'mov', 'avi', 'mkv'],
            help="Upload a profitable ad video to analyze and create variations from"
        )

        st.subheader("Additional Context (Optional)")
        offer_text = st.text_area(
            "Offer Details",
            placeholder="e.g., Solar installation with $0 down payment...",
            height=80,
            help="Describe the product/service and main promise"
        )
        audience_text = st.text_area(
            "Target Audience",
            placeholder="e.g., 40+ homeowners with high electricity bills...",
            height=80,
            help="Describe the ideal customer demographics and pain points"
        )
        hooks_text = st.text_area(
            "Specific Hooks to Test",
            placeholder="e.g., Government rebate angle, celebrity endorsement...",
            height=80,
            help="Any specific angles or hooks you want to incorporate"
        )
        context_text = st.text_area(
            "Additional Context",
            placeholder="Any other relevant information...",
            height=100,
            help="Compliance requirements, brand guidelines, or other notes"
        )

        # Generate button
        generate_clicked = st.button(
            "Generate Script Variations",
            type="primary",
            use_container_width=True
        )

        # Clear results button (only show if results exist)
        if st.session_state.get("analysis_results"):
            clear_clicked = st.button(
                "Clear Results",
                type="secondary",
                use_container_width=True
            )
            if clear_clicked:
                del st.session_state["analysis_results"]
                st.rerun()

    # Main content area
    if video_upload is None:
        st.info("Please upload a reference video to begin analysis.")
        # Instructions
        with st.expander("How to Use This Tool"):
            st.markdown("""
### Upload Guidelines:
- **Best videos to analyze**: Already profitable Facebook/TikTok ads in your niche
- **Video length**: 30-90 seconds work best for analysis
- **Quality**: Clear audio and visuals help with better analysis
### Context Tips:
- **Offer details**: Be specific about your main promise and mechanism
- **Audience**: Include demographics, pain points, and desires
- **Hooks**: Mention any specific angles that have worked for you
### Script Optimization:
- Generated scripts focus on stopping scroll and driving clicks
- Each variation tests different psychological triggers
- Use the timestamp format for precise video production
- Test multiple variations to find your best performer
""")
    elif generate_clicked:
        key_missing = not GEMINI_API_KEY or GEMINI_API_KEY == "your-gemini-api-key-here"
        if key_missing:
            st.error("Please configure your Gemini API key in the backend.")
            return

        # Process video
        with st.spinner("Analyzing video and generating scripts..."):
            raw_video = video_upload.read()
            # Reset file pointer for potential re-use
            video_upload.seek(0)
            analysis = analyze_video_and_generate_script(
                raw_video,
                video_upload.name,
                offer_text,
                audience_text,
                hooks_text,
                context_text
            )

        if analysis:
            # Store results in session state
            st.session_state["analysis_results"] = analysis
            st.success("Analysis complete! Here are your script variations:")
        else:
            st.error("Failed to generate script variations. Please try again.")

    # Display results if they exist in session state
    if not st.session_state.get("analysis_results"):
        st.info("Configure your inputs in the sidebar and click 'Generate Script Variations' to begin.")
        return

    stored_results = st.session_state["analysis_results"]

    # Create tabs for different outputs
    scripts_tab, analysis_tab, improvements_tab = st.tabs(
        ["Script Variations", "Video Analysis", "Improvement Recommendations"]
    )
    with scripts_tab:
        display_script_variations(stored_results)
        # CSV Download button
        st.download_button(
            label="Download All Scripts (CSV)",
            data=create_csv_download(stored_results),
            file_name="video_script_variations.csv",
            mime="text/csv",
            type="secondary",
            use_container_width=True
        )
    with analysis_tab:
        display_video_analysis(stored_results)
    with improvements_tab:
        display_timestamp_improvements(stored_results)
# Script entry point: run the app and make sure an unexpected failure is both
# logged and visible to the user.
if __name__ == "__main__":
    try:
        logger.info("Launching Streamlit app...")
        main()
    except Exception:
        # logger.exception records the active traceback, so the exception
        # does not need to be bound to a name.
        logger.exception("Unhandled error during app launch.")
        # Tell the user something went wrong instead of leaving a partially
        # rendered page with no explanation.
        st.error("An unexpected error occurred. Please reload the page and try again.")