|
|
import streamlit as st |
|
|
import google.generativeai as genai |
|
|
import tempfile |
|
|
import os |
|
|
import time |
|
|
import json |
|
|
from typing import Optional |
|
|
import pandas as pd |
|
|
import logging |
|
|
from database import insert_analysis_result |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load environment variables from a local .env file (GEMINI_KEY, SYS_PROMPT, ...).
load_dotenv()


# Gemini API key pulled from the environment; may be None here — presence is
# validated later in configure_gemini() before any API call.
GEMINI_API_KEY = os.getenv("GEMINI_KEY")
|
|
|
|
|
|
|
|
# Streamlit page chrome. st.set_page_config must be the first Streamlit call
# executed in the script, which is why it sits here at the top of the module.
st.set_page_config(
    page_title="Video Analyser and Script Generator",
    page_icon="🎥",
    layout="wide",
    initial_sidebar_state="expanded"
)
|
|
|
|
|
|
|
|
# Root logger configuration: DEBUG-level output mirrored to the console and to
# a persistent app.log file.
# NOTE(review): DEBUG is very chatty for a deployed app — consider INFO there.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        # Explicit UTF-8 so non-ASCII log text (emoji, model output previews)
        # doesn't raise/garble on platforms whose default locale encoding is
        # not UTF-8 (e.g. Windows cp1252). The original relied on the
        # platform default.
        logging.FileHandler('app.log', mode='a', encoding='utf-8')
    ]
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
def configure_gemini():
    """Configure the Gemini SDK with the backend API key.

    Validates that GEMINI_KEY was present in the environment, configures the
    SDK, and smoke-tests the credentials by listing available models.

    Returns:
        bool: True when the key is present and the API answered the
        list_models() probe; False otherwise (the error is surfaced both in
        the Streamlit UI and in the log).
    """
    logger.info("Starting Gemini API configuration...")

    if not GEMINI_API_KEY:
        error_msg = "GEMINI_KEY not found in environment variables"
        logger.error(error_msg)
        st.error(error_msg)
        return False

    # Log only the key length — never key material. The previous version
    # logged the first 10 characters of the secret, which leaks credential
    # prefixes into app.log and the console.
    logger.info(f"API Key found, length: {len(GEMINI_API_KEY)}")

    try:
        genai.configure(api_key=GEMINI_API_KEY)
        logger.info("Gemini API configured successfully")

        # Smoke-test the credentials: listing models fails fast on a bad key.
        logger.info("Testing API connection...")
        models = list(genai.list_models())
        logger.info(f"Available models: {[model.name for model in models]}")

        return True
    except Exception as e:
        error_msg = f"Failed to configure Gemini API: {str(e)}"
        logger.error(error_msg, exc_info=True)
        st.error(error_msg)
        return False
|
|
|
|
|
|
|
|
# System prompt for the model, supplied via the SYS_PROMPT env var.
# Bug fix: the original used f"""{os.getenv("SYS_PROMPT")}""", which converts
# a *missing* variable into the literal, truthy string "None" — silently
# injecting the word "None" into every prompt and defeating the
# `SYSTEM_PROMPT or ""` guards used later in this file. Fall back to an empty
# string instead.
SYSTEM_PROMPT = os.getenv("SYS_PROMPT") or ""
logger.info(f"System prompt loaded, length: {len(SYSTEM_PROMPT) if SYSTEM_PROMPT else 0}")
|
|
|
|
|
def analyze_video_and_generate_script(
    video_bytes,
    video_name,
    offer_details: str = "",
    target_audience: str = "",
    specific_hooks: str = "",
    additional_context: str = ""
):
    """
    Analyze video and generate direct response script variations.

    Pipeline: persist the upload to a temp file -> configure Gemini ->
    upload the file to Google AI -> poll until server-side processing
    finishes -> prompt the model -> parse and return its JSON reply.

    Args:
        video_bytes: Raw bytes of the uploaded video file.
        video_name: Original filename; only its extension is reused for the
            temp file.
        offer_details: Offer context spliced into the user prompt.
        target_audience: Audience context spliced into the user prompt.
        specific_hooks: Hook ideas spliced into the user prompt.
        additional_context: Extra mandatory context for the model.

    Returns:
        The parsed JSON response as a dict, or None on any failure (errors
        are reported through Streamlit widgets and the module logger).
    """
    logger.info(f"Starting video analysis for: {video_name}")
    logger.info(f"Video size: {len(video_bytes)} bytes")

    try:
        # Persist the upload to disk: genai.upload_file needs a file path.
        logger.info("Creating temporary file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_name)[1]) as tmp_file:
            tmp_file.write(video_bytes)
            tmp_file_path = tmp_file.name

        logger.info(f"Temporary file created: {tmp_file_path}")
        logger.info(f"File size on disk: {os.path.getsize(tmp_file_path)} bytes")

        logger.info("Configuring Gemini API...")
        if not configure_gemini():
            # NOTE(review): the temp file is not removed on this or the other
            # early-return paths below — only the success path unlinks it.
            logger.error("Gemini configuration failed")
            return None

        # Streamlit widgets used to surface progress to the user.
        upload_progress = st.progress(0)
        upload_status = st.empty()

        upload_status.text("Uploading video to Google AI...")
        upload_progress.progress(20)
        logger.info("Starting file upload to Gemini...")

        try:
            video_file_obj = genai.upload_file(tmp_file_path)
            logger.info(f"File uploaded successfully. File URI: {video_file_obj.uri}")
            logger.info(f"File state: {video_file_obj.state.name}")
            upload_progress.progress(40)
        except Exception as upload_error:
            error_msg = f"File upload failed: {str(upload_error)}"
            logger.error(error_msg, exc_info=True)
            upload_status.error(error_msg)
            return None

        upload_status.text("Processing video...")
        logger.info("Waiting for video processing...")

        # Poll until Google AI finishes server-side processing.
        # 30 attempts x 2 s sleep matches the "1 minute" timeout message.
        processing_attempts = 0
        max_processing_attempts = 30

        while video_file_obj.state.name == "PROCESSING":
            processing_attempts += 1
            logger.debug(f"Processing attempt {processing_attempts}/{max_processing_attempts}")

            if processing_attempts > max_processing_attempts:
                error_msg = "Video processing timed out after 1 minute"
                logger.error(error_msg)
                upload_status.error(error_msg)
                return None

            time.sleep(2)
            try:
                # Refresh the file handle to observe the latest state.
                video_file_obj = genai.get_file(video_file_obj.name)
                logger.debug(f"Processing state: {video_file_obj.state.name}")
            except Exception as get_file_error:
                logger.error(f"Error checking file status: {str(get_file_error)}", exc_info=True)
                break

            # Advance the bar from 40% towards 60% as polling proceeds.
            upload_progress.progress(40 + (processing_attempts * 20 // max_processing_attempts))

        logger.info(f"Final file state: {video_file_obj.state.name}")

        if video_file_obj.state.name == "FAILED":
            error_msg = "Google AI file processing failed. Please try another video."
            logger.error(error_msg)
            upload_status.error(error_msg)
            return None

        if video_file_obj.state.name != "ACTIVE":
            error_msg = f"Unexpected file state: {video_file_obj.state.name}"
            logger.error(error_msg)
            upload_status.error(error_msg)
            return None

        upload_progress.progress(80)
        upload_status.text("Generating script variations...")
        logger.info("Starting content generation...")

        # User-facing prompt. The triple-quoted body is deliberately not
        # indented so no leading whitespace leaks into the model input.
        user_prompt = f"""Analyze this reference video and generate 3 high-converting direct response video script variations with detailed timestamp-based improvements.

IMPORTANT CONTEXT TO FOLLOW WHEN CREATING OUTPUT:
- Offer Details: {offer_details}
- Target Audience: {target_audience}
- Specific Hooks: {specific_hooks}

ADDITIONAL CONTEXT (MANDATORY TO FOLLOW):
{additional_context}

You must reflect this additional context in:
- The script tone, CTA, visuals
- Compliance or branding constraints
- Any assumptions about audience or product

Failure to include this will be considered incomplete.

Please provide a comprehensive analysis including:

1. DETAILED VIDEO ANALYSIS with timestamp-based metrics:
- Break down the video into 5-10 second segments
- Rate each segment's effectiveness (1-10 scale)
- Identify specific elements (hook, transition, proof, CTA, etc.)

2. TIMESTAMP-BASED IMPROVEMENTS:
- Specific recommendations for each time segment
- Priority level for each improvement
- Expected impact of implementing changes

3. SCRIPT VARIATIONS:
- Create 2-3 complete script variations
- Each with timestamp-by-timestamp breakdown
- Different psychological triggers and approaches

IMPORTANT: Return only valid JSON in the exact format specified in the system prompt. Analyze the video second-by-second for maximum detail."""

        logger.info(f"User prompt length: {len(user_prompt)}")
        logger.info(f"System prompt length: {len(SYSTEM_PROMPT) if SYSTEM_PROMPT else 0}")

        try:
            logger.info("Creating GenerativeModel instance...")
            model = genai.GenerativeModel("gemini-2.0-flash-exp")
            logger.info("Model created successfully")

            logger.info("Generating content with video and prompts...")
            # The system prompt is appended to the user prompt instead of being
            # passed as a system instruction — presumably intentional; confirm.
            full_prompt = user_prompt + "\n\n" + (SYSTEM_PROMPT or "")
            logger.debug(f"Full prompt length: {len(full_prompt)}")

            response = model.generate_content([video_file_obj, full_prompt])
            logger.info("Content generation completed successfully")
            logger.debug(f"Response text length: {len(response.text) if hasattr(response, 'text') else 'No text attribute'}")

        except Exception as generation_error:
            error_msg = f"Error generating content with Gemini: {str(generation_error)}"
            logger.error(error_msg, exc_info=True)
            upload_status.error(error_msg)
            return None

        upload_progress.progress(100)
        upload_status.success("Analysis complete!")
        logger.info("Video analysis completed successfully")

        # Best-effort cleanup of the local temp file (success path only).
        try:
            os.unlink(tmp_file_path)
            logger.info(f"Temporary file deleted: {tmp_file_path}")
        except Exception as cleanup_error:
            logger.warning(f"Failed to delete temporary file: {str(cleanup_error)}")

        logger.info("Parsing JSON response...")
        try:
            if not hasattr(response, 'text'):
                error_msg = "Response object has no text attribute"
                logger.error(error_msg)
                st.error(error_msg)
                return None

            response_text = response.text.strip()
            logger.debug(f"Raw response text preview: {response_text[:500]}...")

            # Strip the Markdown code fences the model sometimes wraps JSON in.
            # NOTE(review): the [7:-3] / [3:-3] slices assume a trailing ```
            # fence exists; a reply without one loses its last 3 characters.
            if response_text.startswith('```json'):
                response_text = response_text[7:-3]
                logger.debug("Removed json code block markers")
            elif response_text.startswith('```'):
                response_text = response_text[3:-3]
                logger.debug("Removed generic code block markers")

            logger.debug(f"Cleaned response text preview: {response_text[:500]}...")

            json_response = json.loads(response_text)
            logger.info("JSON parsing successful")
            logger.debug(f"JSON keys: {list(json_response.keys()) if isinstance(json_response, dict) else 'Not a dict'}")

            return json_response

        except json.JSONDecodeError as json_error:
            error_msg = f"Error parsing AI response as JSON: {str(json_error)}"
            logger.error(error_msg)
            logger.error(f"Response text that failed to parse: {response_text[:1000]}...")
            st.error(error_msg)
            # Dump the raw text so the operator can inspect the bad payload.
            st.text_area("Raw Response (for debugging):", response_text, height=200)
            return None

    except Exception as e:
        error_msg = f"Unexpected error processing video: {str(e)}"
        logger.error(error_msg, exc_info=True)
        st.error(error_msg)
        return None
|
|
|
|
|
def display_script_variations(json_data):
    """Render each generated script variation as a titled Streamlit table."""
    logger.info("Displaying script variations...")

    if not json_data or "script_variations" not in json_data:
        error_msg = "No script variations found in the response"
        logger.error(error_msg)
        logger.debug(f"JSON data keys: {list(json_data.keys()) if isinstance(json_data, dict) else 'Not a dict'}")
        st.error(error_msg)
        return

    try:
        variations = json_data["script_variations"]
        logger.info(f"Found {len(variations)} script variations")

        # Pretty column labels, hoisted once instead of rebuilt per variation.
        column_labels = {
            'timestamp': 'Timestamp',
            'script_voiceover': 'Script / Voiceover',
            'visual_direction': 'Visual Direction',
            'psychological_trigger': 'Psychological Trigger',
            'cta_action': 'CTA / Action'
        }

        for idx, entry in enumerate(variations, 1):
            label = entry.get("variation_name", f"Variation {idx}")
            logger.debug(f"Processing variation {idx}: {label}")

            st.markdown(f"### Variation {idx}: {label}")

            rows = entry.get("script_table")
            if not rows:
                warning_msg = f"No script data for {label}"
                logger.warning(warning_msg)
                st.warning(warning_msg)
                continue

            logger.debug(f"Script data for {label}: {len(rows)} rows")

            st.table(pd.DataFrame(rows).rename(columns=column_labels))
            st.markdown("---")

        logger.info("Script variations displayed successfully")

    except Exception as e:
        error_msg = f"Error displaying script variations: {str(e)}"
        logger.error(error_msg, exc_info=True)
        st.error(error_msg)
|
|
|
|
|
def display_video_analysis(json_data):
    """Show the model's video analysis: two overview columns plus a metrics grid."""
    logger.info("Displaying video analysis...")

    if not json_data or "video_analysis" not in json_data:
        error_msg = "No video analysis found in the response"
        logger.error(error_msg)
        st.error(error_msg)
        return

    try:
        analysis = json_data["video_analysis"]
        logger.debug(f"Video analysis type: {type(analysis)}")

        video_metrics = []
        if not isinstance(analysis, dict):
            # A bare list is tolerated as metrics-only; anything else is skipped.
            warning_msg = "Unexpected format in video_analysis. Skipping metadata."
            logger.warning(warning_msg)
            st.warning(warning_msg)
            if isinstance(analysis, list):
                video_metrics = analysis
        else:
            left, right = st.columns(2)

            with left:
                st.subheader("Effectiveness Factors")
                effectiveness = analysis.get('effectiveness_factors', 'N/A')
                st.write(effectiveness)
                logger.debug(f"Effectiveness factors: {effectiveness}")

                st.subheader("Target Audience")
                audience = analysis.get('target_audience', 'N/A')
                st.write(audience)
                logger.debug(f"Target audience: {audience}")

            with right:
                st.subheader("Psychological Triggers")
                triggers = analysis.get('psychological_triggers', 'N/A')
                st.write(triggers)
                logger.debug(f"Psychological triggers: {triggers}")

            video_metrics = analysis.get("video_metrics", [])
            logger.debug(f"Video metrics count: {len(video_metrics)}")

        if not video_metrics:
            warning_msg = "No detailed video metrics available"
            logger.warning(warning_msg)
            st.warning(warning_msg)
        else:
            logger.info(f"Processing {len(video_metrics)} video metrics")

            # Map raw JSON keys to reader-friendly column headers.
            renamed = pd.DataFrame(video_metrics).rename(columns={
                'timestamp': 'Timestamp',
                'element': 'Element',
                'current_approach': 'Current Approach',
                'effectiveness_score': 'Score',
                'notes': 'Analysis Notes'
            })
            logger.debug(f"Metrics dataframe columns: {list(renamed.columns)}")

            st.dataframe(
                renamed,
                use_container_width=True,
                hide_index=True,
                column_config={
                    "Timestamp": st.column_config.TextColumn(width="small"),
                    "Element": st.column_config.TextColumn(width="medium"),
                    "Current Approach": st.column_config.TextColumn(width="large"),
                    "Score": st.column_config.TextColumn(width="small"),
                    "Analysis Notes": st.column_config.TextColumn(width="large")
                }
            )

        logger.info("Video analysis displayed successfully")

    except Exception as e:
        error_msg = f"Error displaying video analysis: {str(e)}"
        logger.error(error_msg, exc_info=True)
        st.error(error_msg)
|
|
|
|
|
def display_timestamp_improvements(json_data): |
|
|
"""Display timestamp-based improvements in tabular format""" |
|
|
logger.info("Displaying timestamp improvements...") |
|
|
|
|
|
improvements = json_data.get("timestamp_improvements") |
|
|
|
|
|
if improvements is None: |
|
|
error_msg = "No timestamp improvements found in the response" |
|
|
logger.error(error_msg) |
|
|
st.error(error_msg) |
|
|
return |
|
|
|
|
|
if not improvements: |
|
|
warning_msg = "No timestamp improvements available" |
|
|
logger.warning(warning_msg) |
|
|
st.warning(warning_msg) |
|
|
return |
|
|
|
|
|
try: |
|
|
st.subheader("Timestamp-by-Timestamp Improvement Recommendations") |
|
|
logger.info(f"Processing {len(improvements)} improvement recommendations") |
|
|
|
|
|
improvements_df = pd.DataFrame(improvements) |
|
|
|
|
|
|
|
|
column_mapping = { |
|
|
'timestamp': 'Timestamp', |
|
|
'current_element': 'Current Element', |
|
|
'improvement_type': 'Improvement Type', |
|
|
'recommended_change': 'Recommended Change', |
|
|
'expected_impact': 'Expected Impact', |
|
|
'priority': 'Priority' |
|
|
} |
|
|
|
|
|
improvements_df = improvements_df.rename(columns=column_mapping) |
|
|
logger.debug(f"Improvements dataframe columns: {list(improvements_df.columns)}") |
|
|
|
|
|
|
|
|
def color_priority(val): |
|
|
if val == 'High': |
|
|
return 'background-color: #ffcccb' |
|
|
elif val == 'Medium': |
|
|
return 'background-color: #ffffcc' |
|
|
elif val == 'Low': |
|
|
return 'background-color: #ccffcc' |
|
|
return '' |
|
|
|
|
|
styled_df = improvements_df.style.applymap(color_priority, subset=['Priority']) |
|
|
|
|
|
st.dataframe( |
|
|
styled_df, |
|
|
use_container_width=True, |
|
|
hide_index=True, |
|
|
column_config={ |
|
|
"Timestamp": st.column_config.TextColumn(width="small"), |
|
|
"Current Element": st.column_config.TextColumn(width="medium"), |
|
|
"Improvement Type": st. |