Video_Virality / ui /streamlit_app.py
github-actions[bot]
Automated UV deployment
ad2cb5b
import hashlib
import html
import json
import re
import signal
import sys
import threading
import time
import traceback
import unicodedata
from pathlib import Path

import plotly.express as px
import streamlit as st
import yt_dlp
# -----------------------------
# Safe signal handling for non-main thread environments (yt_dlp)
# -----------------------------
def _install_safe_signal() -> None:
    """Make ``signal.signal`` silently ignore SIGTERM/SIGINT registrations.

    Python only allows signal handlers to be installed from the main thread,
    so a library that registers SIGTERM/SIGINT handlers fails when this script
    runs in a worker thread. The wrapper drops those two registrations and
    forwards every other signal to the previous ``signal.signal``.

    The patch is idempotent: the wrapper is tagged, and a second call while the
    tagged wrapper is installed does nothing. Without the tag, every script
    run would wrap the previous wrapper and the call chain would keep growing.
    """
    current = signal.signal
    if getattr(current, "_skips_termination_signals", False):
        return  # already patched by an earlier run of this script

    def _safe(sig, handler):
        if sig in (signal.SIGTERM, signal.SIGINT):
            return None
        return current(sig, handler)

    _safe._skips_termination_signals = True
    signal.signal = _safe


if threading.current_thread() is not threading.main_thread():
    _install_safe_signal()
# -----------------------------
# Project paths & imports
# -----------------------------
# ROOT is the grandparent of this file; it is put at the front of sys.path
# (once) so the project-local imports below resolve.
ROOT = Path(__file__).resolve().parent.parent  # project root
if not any(entry == str(ROOT) for entry in sys.path):
    sys.path.insert(0, str(ROOT))

# When debugging import problems, print sys.path and ROOT at this point.
from config import make_path
from files.pipeline.scene_detect import SceneDetector
from files.pipeline.frame_extract import FrameExtractor
# -----------------------------
# Storage layout
# -----------------------------
DATA_DIR = ROOT / "data"
RAW_DIR = DATA_DIR / "raw"
INTERIM_DIR = DATA_DIR / "interim"
PROCESSED_DIR = DATA_DIR / "processed"
REPORTS_DIR = DATA_DIR / "reports"

# Create the data root first, then each working directory beneath it.
for _directory in (DATA_DIR, RAW_DIR, INTERIM_DIR, PROCESSED_DIR, REPORTS_DIR):
    _directory.mkdir(parents=True, exist_ok=True)
# -----------------------------
# Utilities
# -----------------------------
def sanitize_title(title: str, max_length: int = 150) -> str:
    """Return a lowercase, ASCII-only, punctuation-free version of *title*.

    Steps, in order: decompose accented characters (NFKD) and drop anything
    non-ASCII, remove ``#hashtag`` tokens, remove remaining punctuation,
    collapse runs of whitespace to single spaces, lowercase, and truncate to
    *max_length* characters.

    Args:
        title: Raw title text.
        max_length: Maximum length of the returned string.

    Returns:
        The cleaned title; it never starts or ends with whitespace.
    """
    title = unicodedata.normalize("NFKD", title)
    title = title.encode("ascii", "ignore").decode("ascii")
    title = re.sub(r"#\w+", "", title)
    title = re.sub(r"[^\w\s]", "", title)
    title = re.sub(r"\s+", " ", title).strip()
    # Truncation can cut right after a word separator; strip again so the
    # result (and any filename derived from it) has no dangling space.
    return title.lower()[:max_length].rstrip()
def sanitize_filename(filename: str) -> str:
    """Return *filename* reduced to lowercase ``a-z``, digits, ``.``, ``_`` and ``-``.

    Spaces become underscores, accented characters are decomposed (NFKD) and
    stripped to ASCII, and every remaining character outside the allowed set
    is removed. The result may be an empty string.
    """
    underscored = filename.lower().replace(" ", "_")
    ascii_only = (
        unicodedata.normalize("NFKD", underscored)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    return re.sub(r"[^a-z0-9._-]", "", ascii_only).strip()
def create_short_path(video_path: Path) -> str:
    """Return ``frames_`` plus the first 12 hex digits of the MD5 of the path.

    The hash is computed over ``str(video_path)``, so the same video referred
    to by a different path string gets a different identifier. The short name
    is meant to keep frame directories within Windows path-length limits.
    """
    digest = hashlib.md5(str(video_path).encode()).hexdigest()
    return "frames_" + digest[:12]
def get_frames_directory(video_path: Path) -> Path:
    """Return the short-named frames directory for *video_path*.

    The directory is ``INTERIM_DIR / "frames" / <short id>``, where the short
    id comes from ``create_short_path``. Nothing is created on disk.
    """
    frames_root = INTERIM_DIR / "frames"
    return frames_root / create_short_path(video_path)
def download_video(url: str) -> tuple[Path, str]:
    """Download the video at *url* into ``RAW_DIR`` unless it is already there.

    The metadata is probed first (no download) to derive a sanitized file name
    from the title and reported extension; the download is skipped when a file
    with that name already exists.

    Args:
        url: Video URL handed to yt-dlp.

    Returns:
        ``(file_path, clean_title)`` where ``file_path`` is the target path in
        ``RAW_DIR`` and ``clean_title`` is the sanitized title.

    Raises:
        Whatever yt-dlp raises for extraction or download failures.
    """
    # Probe with the same playlist policy as the download below so the title
    # and extension describe the single video that will actually be fetched.
    probe_opts = {"quiet": True, "noplaylist": True}
    with yt_dlp.YoutubeDL(probe_opts) as ydl:
        info = ydl.extract_info(url, download=False) or {}

    # ``or`` (not a .get default) so explicit null values also fall back.
    original_title = info.get("title") or "video"
    ext = info.get("ext") or "mp4"

    clean_title = sanitize_title(original_title)
    sanitized_name = sanitize_filename(clean_title) or "video"
    file_path = RAW_DIR / f"{sanitized_name}.{ext}"

    if not file_path.exists():
        ydl_opts = {
            "outtmpl": str(file_path),
            "restrictfilenames": True,
            "quiet": True,
            "noplaylist": True,
            "no_color": True,
            # NOTE(review): a merged bv*+ba download may end up in a container
            # other than the probed ``ext`` — confirm file_path matches.
            "format": "bv*+ba/b",
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    return file_path, clean_title
def get_paths(video_path: Path):
    """Return the JSON artifact paths that belong to *video_path*.

    Every path is built by the project's ``make_path`` helper from a storage
    folder, the video path string, a label and the ``json`` extension.

    Returns:
        A 5-tuple ``(scene, frame, audio, hook, report)``.
    """
    source = str(video_path)

    def artifact(folder: str, label: str):
        return make_path(folder, source, label, "json")

    audio_json = artifact("processed/audio-analysis", "audio_analysis")
    report_json = artifact("reports", "final_report")
    scene_json = artifact("processed/scene-detection", "scene")
    frame_json = artifact("processed/frame-analysis", "frame_analysis")
    hook_json = artifact("processed/hook-analysis", "hook_analysis")
    return scene_json, frame_json, audio_json, hook_json, report_json
def safe_load_json(path: Path | str):
p = Path(path)
if p.exists():
try:
with p.open(encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
return {}
def remove_artifacts(video_path: Path):
    """Delete the video file at *video_path*, best effort.

    A falsy *video_path* or an already-missing file is a no-op. Filesystem
    failures are reported on stdout and otherwise ignored, so cleanup never
    interrupts the caller.
    """
    if not video_path:
        return
    try:
        Path(video_path).unlink(missing_ok=True)
    except OSError as exc:
        # Cleanup is deliberately non-fatal; leave a trace instead of failing.
        print(f"[WARNING] Could not remove {video_path}: {exc}")
# -----------------------------
# Streamlit page config & styles
# -----------------------------
st.set_page_config(page_title="Virality Coach", layout="wide")

# Page-wide CSS: hides the footer, constrains the content width and styles the
# centered header, metric cards, embedded video and status messages.
_PAGE_CSS = """
<style>
footer{display:none}
.block-container{padding-top:1rem;padding-bottom:2rem;max-width:1100px}
.title-center{text-align:center;margin-bottom:0.2rem}
.desc-center{text-align:center;margin-bottom:1.2rem;color:#dbdbdb}
.metric-card{background:#1f2937;border-radius:12px;padding:1.25rem;text-align:center;box-shadow:0 2px 8px rgba(0,0,0,.08);height:100%}
.metric-card h4{margin:0;font-size:0.95rem;color:#d1d5db}
.metric-card p{margin:0;font-size:1.8rem;font-weight:700;color:#ffffff}
video{max-height:240px;border-radius:10px;margin-bottom:0.5rem}
.status-msg{font-size:0.9rem;margin:0}
</style>
"""
st.markdown(_PAGE_CSS, unsafe_allow_html=True)

# Centered page title followed by the one-paragraph product description.
_HEADER_SNIPPETS = (
    '<h1 class="title-center">Video Virality Coach</h1>',
    '<p class="desc-center">An AI-powered system that analyzes and scores the virality potential of short-form videos (TikTok, Reels, Shorts) and delivers clear, actionable feedback to creators and marketers.</p>',
)
for _snippet in _HEADER_SNIPPETS:
    st.markdown(_snippet, unsafe_allow_html=True)
# -----------------------------
# Session state
# -----------------------------
# Initial value of every session-state key this script manages.
DEFAULT_STATE = dict(
    mode=None,
    url="",
    uploaded_name=None,
    video_path=None,
    clean_title=None,
    stage=None,
    progress=0,
    status=[],
    cancel=False,
    error_msg=None,
    _ready_to_run=False,
)

# Seed only the keys that are missing so values from earlier runs survive.
for state_key, default_value in DEFAULT_STATE.items():
    if state_key in st.session_state:
        continue
    st.session_state[state_key] = default_value
def reset_state(clear_video: bool = True):
    """Restore every key in ``DEFAULT_STATE`` to its default value.

    Args:
        clear_video: When False, the current ``video_path`` is carried over
            instead of being reset to ``None``.
    """
    kept_video = None if clear_video else st.session_state.get("video_path")
    fresh_state = {**DEFAULT_STATE, "video_path": kept_video}
    st.session_state.update(fresh_state)
def push_status(msg: str):
    """Append *msg* to the session's list of status messages."""
    st.session_state["status"].append(msg)
# -----------------------------
# Pipeline step executor
# -----------------------------
# Pipeline stages in execution order.
STAGES = [
    "download video",
    "scene detection",
    "frames extraction",
    "frame analysis",
    "audio analysis",
    "hook analysis",
    "report",
]

# Progress percentage recorded once the matching stage has finished.
PROGRESS_MAP = dict(zip(STAGES, (10, 25, 40, 55, 70, 85, 100)))
# Lowercase substrings that mark an exception message as an API-key problem.
_OPENAI_KEY_MARKERS = ("invalid", "401", "authentication")
_GEMINI_KEY_MARKERS = ("invalid", "401", "403", "api_key", "authentication")


def _finish_stage_run() -> None:
    """Clear the run flag and rerun the script so the UI shows the new state."""
    st.session_state._ready_to_run = False
    st.rerun()


def _fail_stage(message: str) -> None:
    """Put the pipeline into the error state with *message* and rerun."""
    st.session_state.stage = "error"
    st.session_state.error_msg = message
    _finish_stage_run()


def _complete_stage(stage: str, status: str, next_stage: str, log: str | None = None) -> None:
    """Record *stage* as finished, queue *next_stage*, and rerun."""
    st.session_state.progress = PROGRESS_MAP[stage]
    push_status(status)
    if log:
        print(log)
    st.session_state.stage = next_stage
    _finish_stage_run()


def _require_api_key(state_key: str, missing_message: str) -> str:
    """Return the stripped key stored under *state_key*, or fail the stage."""
    key = (st.session_state.get(state_key) or "").strip()
    if not key:
        _fail_stage(missing_message)
    return key


def _fail_if_key_error(exc: Exception, markers: tuple[str, ...], message: str) -> None:
    """Fail the stage with *message* when *exc* looks like an API-key error."""
    lowered = str(exc).lower()
    if any(marker in lowered for marker in markers):
        _fail_stage(message)


def _ensure_scene_schema(scene_json) -> None:
    """Validate the scene file and rewrite it with ``start_time`` fields if needed."""
    if not Path(scene_json).exists():
        raise FileNotFoundError("Scene detection failed - no output file")
    scene_data = safe_load_json(scene_json)
    if not scene_data or 'scenes' not in scene_data:
        raise ValueError("Scene detection produced invalid results - no 'scenes' key")
    scenes = scene_data['scenes']
    if scenes and 'start_time' not in scenes[0]:
        print("[WARNING] Scene data missing 'start_time' field, adding compatible structure")
        # Map the detector's 'start'/'end'/'duration' keys (0 when absent)
        # onto the field names the later stages expect.
        scene_data['scenes'] = [
            {
                'start_time': scene.get('start', 0),
                'end_time': scene.get('end', 0),
                'duration': scene.get('duration', 0),
                'scene_number': index,
            }
            for index, scene in enumerate(scenes)
        ]
        with open(scene_json, 'w', encoding='utf-8') as f:
            json.dump(scene_data, f, indent=2)


def _write_fallback_scenes(scene_json) -> None:
    """Write a single 0-30 second placeholder scene to *scene_json*."""
    fallback_scene_data = {
        "scenes": [{
            "start_time": 0,
            "end_time": 30,  # the video length is not known here; 30 s is assumed
            "duration": 30,
            "scene_number": 0,
        }]
    }
    Path(scene_json).parent.mkdir(parents=True, exist_ok=True)
    with open(scene_json, 'w', encoding='utf-8') as f:
        json.dump(fallback_scene_data, f, indent=2)


def _run_current_stage():
    """
    Run the heavy work for the current stage.

    This is called only when _ready_to_run is True, so the UI has already
    rendered progress/cancel. Every path that changes the stage ends in
    ``st.rerun()``, which halts the current script run; the next run then
    renders the new state and schedules the following stage.

    Failures that look like a bad or missing API key switch to the error
    state with a provider-specific message; any other exception is reported
    with its type, message and innermost traceback frame.
    """
    stage = st.session_state.stage
    if not stage or stage in ("done", "error"):
        return

    if st.session_state.cancel:
        push_status("⚠️ Process canceled by user.")
        print("[INFO] Processing canceled by user.")
        st.session_state.stage = None
        st.session_state.progress = 0
        if st.session_state.video_path:
            remove_artifacts(Path(st.session_state.video_path))
        _finish_stage_run()
        return

    try:
        if stage == "download video":
            push_status("Starting download…")
            print(f"[INFO] Stage: Downloading video from {st.session_state.url}")
            path, title = download_video(st.session_state.url)
            st.session_state.video_path = str(path)
            st.session_state.clean_title = title
            # Skip full pipeline if a report already exists
            _, _, _, _, report_json = get_paths(path)
            if Path(report_json).exists():
                push_status("📄 Report already exists. Skipping analysis.")
                print("[INFO] Report already exists, skipping pipeline.")
                st.session_state.progress = 100
                st.session_state.stage = "done"
                _finish_stage_run()
            else:
                _complete_stage(stage, "✅ Download complete.", "scene detection",
                                log="[INFO] Download complete.")
            return

        # Every later stage works on an existing video; fail loudly instead of
        # handing the literal string "None" to the pipeline classes.
        if stage in PROGRESS_MAP and not st.session_state.video_path:
            raise ValueError(f"No video is available for the '{stage}' stage.")
        vp = Path(st.session_state.video_path) if st.session_state.video_path else None
        video = str(vp)

        if stage == "scene detection":
            push_status("Detecting scenes…")
            print("[INFO] Stage: Scene detection started.")
            scene_json, _, _, _, _ = get_paths(vp)
            try:
                SceneDetector(video).detect_and_save()
                _ensure_scene_schema(scene_json)
            except Exception as e:
                # Scene detection is not fatal: continue with placeholder data.
                print(f"[WARNING] Scene detection failed: {e}. Creating fallback scene data.")
                push_status("⚠️ Scene detection failed. Using fallback scene data.")
                _write_fallback_scenes(scene_json)
                _complete_stage(stage, "✅ Using fallback scene detection.", "frames extraction")
            else:
                _complete_stage(stage, "✅ Scene detection done.", "frames extraction",
                                log="[INFO] Scene detection complete.")

        elif stage == "frames extraction":
            push_status("Extracting frames…")
            print("[INFO] Stage: Frame extraction started.")
            FrameExtractor(video).extract()
            _complete_stage(stage, "✅ Frame extraction done.", "frame analysis",
                            log="[INFO] Frame extraction complete.")

        elif stage == "frame analysis":
            push_status("Analyzing frames…")
            openai_key = _require_api_key(
                "openai_key",
                "OPENAI API KEY FAILED: OpenAI API Key is required for frame analysis but was not provided.",
            )
            from files.pipeline.frame_analysis import FrameAnalyzer
            try:
                FrameAnalyzer(video, openai_api_key=openai_key).analyze()
            except Exception as api_error:
                _fail_if_key_error(
                    api_error,
                    _OPENAI_KEY_MARKERS,
                    "OPENAI API KEY FAILED: Invalid OpenAI API Key provided. Please verify your API key is correct.",
                )
                raise
            _complete_stage(stage, "✅ Frame analysis done.", "audio analysis")

        elif stage == "audio analysis":
            push_status("Analyzing audio…")
            gemini_key = _require_api_key(
                "gemini_key",
                "GEMINI API KEY FAILED: Gemini API Key is required for audio analysis but was not provided.",
            )
            from files.pipeline.audio_analysis import AudioAnalyzer
            try:
                AudioAnalyzer(video, gemini_api_key=gemini_key).analyze()
            except Exception as api_error:
                _fail_if_key_error(
                    api_error,
                    _GEMINI_KEY_MARKERS,
                    f"GEMINI API KEY FAILED: Invalid Gemini API Key provided. Error: {api_error}",
                )
                raise
            _complete_stage(stage, "✅ Audio analysis done.", "hook analysis")

        elif stage == "hook analysis":
            push_status("Evaluating hook…")
            gemini_key = _require_api_key(
                "gemini_key",
                "GEMINI API KEY FAILED: Gemini API Key is required for hook analysis but was not provided.",
            )
            # NOTE(review): HookAnalyzer is imported from frame_analysis, not a
            # dedicated hook module — confirm this is the intended location.
            from files.pipeline.frame_analysis import HookAnalyzer
            try:
                HookAnalyzer(video, gemini_api_key=gemini_key).analyze()
            except Exception as api_error:
                _fail_if_key_error(
                    api_error,
                    _GEMINI_KEY_MARKERS,
                    f"GEMINI API KEY FAILED: Invalid Gemini API Key provided. Error: {api_error}",
                )
                raise
            _complete_stage(stage, "✅ Hook analysis done.", "report")

        elif stage == "report":
            push_status("Generating final report…")
            openai_key = _require_api_key(
                "openai_key",
                "OPENAI API KEY FAILED: OpenAI API Key is required for report generation but was not provided.",
            )
            from files.pipeline.scoring import VideoReport
            try:
                VideoReport(video, openai_api_key=openai_key).generate()
            except Exception as api_error:
                _fail_if_key_error(
                    api_error,
                    _OPENAI_KEY_MARKERS,
                    f"OPENAI API KEY FAILED: Invalid OpenAI API Key provided. Error: {api_error}",
                )
                raise
            _complete_stage(stage, "🎉 Video report ready!", "done")

    except Exception as e:
        err_type = type(e).__name__
        err_msg = str(e).strip()
        # A negative limit keeps the innermost frame, i.e. where the error
        # was actually raised rather than the call site in this function.
        tb_last = traceback.format_exc(limit=-1).strip()
        st.session_state.stage = "error"
        st.session_state.error_msg = f"{err_type}: {err_msg}\n➡️ {tb_last}"
        st.session_state.progress = 0
        st.session_state._ready_to_run = False
        push_status(f"❌ {err_type}: {err_msg}")
        st.rerun()
def run_next_stage_if_needed():
    """Drive the pipeline one step per script run.

    With an active stage, the first pass only sets ``_ready_to_run`` and
    reruns, so the progress UI is drawn before any heavy work; the following
    pass executes the stage. Does nothing when no stage is active or the
    pipeline is already done or in error.
    """
    stage = st.session_state.stage
    if not stage or stage in ("done", "error"):
        return
    if st.session_state._ready_to_run:
        _run_current_stage()
        return
    st.session_state._ready_to_run = True
    time.sleep(0.01)
    st.rerun()
# -----------------------------
# Input section
# -----------------------------
# Make sure both API-key entries exist before any widget or pipeline stage
# reads them.
for _key_name in ("openai_key", "gemini_key"):
    if _key_name not in st.session_state:
        st.session_state[_key_name] = ""

upload_tab, api_tab = st.tabs(["Upload Video", "🔑 API Configuration"])

with api_tab:
    st.markdown("### Configure Your API Keys")
    st.markdown("Enter your API keys below. Keys will be validated during analysis. If a key is invalid, you'll see an error message during the analysis stage.")
    col1, col2 = st.columns(2)
    with col1:
        st.session_state.openai_key = st.text_input(
            "OpenAI API Key",
            type="password",
            placeholder="sk-...",
            help="Required for frame analysis and report generation",
            value=st.session_state.get("openai_key", ""),
        )
    with col2:
        st.session_state.gemini_key = st.text_input(
            "Gemini API Key",
            type="password",
            placeholder="AIza...",
            help="Required for audio analysis and hook analysis",
            value=st.session_state.get("gemini_key", ""),
        )
    st.markdown("---")
    st.info("💡 Add your API keys and return to the Upload Video tab to start analysis. Invalid keys will show error messages during the analysis process.")

with upload_tab:
    method = st.radio("Choose Upload Method", ["Paste Video URL", "Upload MP4 File"], horizontal=True)
    col_in_1, col_in_2 = st.columns([1, 1])

    if method == "Paste Video URL":
        st.session_state.mode = "url"
        url = st.text_input(
            "Paste direct video URL [insta / tiktok / yt-shorts]",
            placeholder="https://example.com/@username/video/123",
            value=st.session_state.url,
        )
        st.session_state.url = url
        run_from_url = col_in_1.button("Run Analysis", key="run_url")
        if run_from_url:
            # Whitespace-only input counts as "no URL".
            if not url or not url.strip():
                st.error("❌ Please enter a video URL.")
            else:
                # Start a fresh pipeline run at the download stage.
                st.session_state.cancel = False
                st.session_state.stage = "download video"
                st.session_state.status = []
                st.session_state.progress = 0
                st.session_state._ready_to_run = False
                st.rerun()
    else:
        st.session_state.mode = "file"
        uploaded = st.file_uploader("Upload MP4 File", type=["mp4"])
        run_from_file = col_in_1.button("Run Analysis", key="run_file")
        if run_from_file and not uploaded:
            st.error("❌ Please upload an MP4 file.")
        elif run_from_file:
            # A name made only of characters the sanitizer removes would
            # otherwise be stored as just ".mp4".
            clean_stem = sanitize_filename(Path(uploaded.name).stem) or "video"
            clean_name = clean_stem + ".mp4"
            dest = RAW_DIR / clean_name
            with dest.open("wb") as f:
                f.write(uploaded.getbuffer())
            st.session_state.video_path = str(dest)
            st.session_state.clean_title = Path(clean_name).stem

            # Skip if a report is already present
            _, _, _, _, report_json = get_paths(dest)
            if Path(report_json).exists():
                st.session_state.stage = "done"
                st.session_state.status = ["📄 Report already exists. Skipping analysis."]
                st.session_state.progress = 100
                st.rerun()
            else:
                # Uploaded files skip the download stage.
                st.session_state.cancel = False
                st.session_state.status = ["✅ Upload complete."]
                st.session_state.progress = 0
                st.session_state.stage = "scene detection"
                st.session_state._ready_to_run = False
                st.rerun()
# -----------------------------
# Progress & Status
# -----------------------------
# While a stage is active: show its name and percentage, offer a cancel
# button, then let the pipeline driver take its next step.
_active_stage = st.session_state.stage
if _active_stage and _active_stage not in ("done", "error"):
    _percent = st.session_state.progress
    _stage_label = _active_stage.replace("_", " ").title()
    st.markdown(f"##### {_stage_label}: {_percent}%")
    st.progress(_percent)
    _cancel_clicked = st.button("Cancel Processing")
    if _cancel_clicked:
        st.session_state.cancel = True
        st.rerun()
    run_next_stage_if_needed()
# -----------------------------
# Error state
# -----------------------------
if st.session_state.stage == "error":
    error_msg = st.session_state.error_msg or "An unknown error occurred."

    # The message is embedded in raw HTML below. Escape it so text such as a
    # traceback's "<module>" is shown instead of being parsed as a tag, and
    # turn newlines into explicit line breaks.
    safe_error = html.escape(str(error_msg)).replace("\n", "<br>")

    # Detect which API key failed and display prominently
    lowered_error = str(error_msg).lower()
    if "openai" in lowered_error:
        provider = "OpenAI"
    elif "gemini" in lowered_error:
        provider = "Gemini"
    else:
        provider = None

    if provider:
        st.error(f"🚨 **API KEY ERROR: {provider} Key Failed**")
        heading = f"{provider} API Key Invalid or Missing"
    else:
        # Generic error display
        st.error("🚨 **ANALYSIS FAILED**")
        heading = "Error Occurred"

    box_lines = [
        "<div style='background-color: #fee2e2; border-left: 4px solid #ef4444; padding: 1rem; margin: 1rem 0; border-radius: 4px;'>",
        f"<h4 style='color: #991b1b; margin-top: 0;'>❌ {heading}</h4>",
        f"<p style='color: #7f1d1d; margin-bottom: 0;'><strong>Error Details:</strong> {safe_error}</p>",
    ]
    if provider:
        box_lines.append(
            "<p style='color: #7f1d1d; margin-top: 0.5rem;'>Please go to the "
            f"<strong>🔑 API Configuration</strong> tab and update your {provider} API key.</p>"
        )
    box_lines.append("</div>")
    st.markdown("\n".join(box_lines), unsafe_allow_html=True)

    st.warning("⚠️ **Report Not Generated**: The analysis pipeline stopped due to the error above. No report was created.")
    if st.button("🔄 Reset & Try Again", type="primary", use_container_width=True):
        reset_state(clear_video=True)
        st.rerun()
# -----------------------------
# Results section
# -----------------------------
_GREEN, _AMBER, _RED, _GREY = "#10b981", "#fbbf24", "#ef4444", "#d1d5db"


def _esc(value) -> str:
    """Return *value* as text that is safe to embed in an HTML snippet."""
    return html.escape(str(value)).replace("\n", "<br>")


def _score_color(value) -> str:
    """Colour for a score: green from 70, amber from 50, red below, grey if not a number."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return _GREY
    if number >= 70:
        return _GREEN
    if number >= 50:
        return _AMBER
    return _RED


def _metric_card(title, value, color: str, *, box_style: str, title_size: str, value_size: str) -> str:
    """Build the HTML for one tinted card with a small title and a large value."""
    return (
        f'<div style="background:{color}22;border-radius:12px;text-align:center;{box_style}">'
        f'<h4 style="margin:0; font-size:{title_size}; color:#d1d5db">{_esc(title)}</h4>'
        f'<p style="margin:0; font-size:{value_size}; font-weight:700; color:{color}">{_esc(value)}</p>'
        "</div>"
    )


def _section_heading(title: str, margin_top: str) -> str:
    """Build the HTML for a centered section heading."""
    return (
        f"<h2 style='text-align: center; font-size:1.4rem; margin-top:{margin_top};'>"
        f"{_esc(title)}</h2>"
    )


def _note_box(text) -> str:
    """Build the HTML for a centered, dark comment box."""
    return (
        "<div style='background:#1e293b;border-radius:8px;padding:0.8rem;margin-top:0.5rem;"
        "text-align:center;font-size:0.95rem;color:#d1d5db;'>"
        f"{_esc(text)}</div>"
    )


if st.session_state.stage == "done" and st.session_state.video_path:
    vp = Path(st.session_state.video_path)
    scene_json, frame_json, audio_json, hook_json, report_json = get_paths(vp)
    st.success("Analysis complete.")
    with st.expander("Preview Video", expanded=False):
        if vp.exists():
            st.video(str(vp), format="video/mp4")

    report = safe_load_json(report_json)
    audio_data = safe_load_json(audio_json)
    hook_data = safe_load_json(hook_json)

    if not report:
        st.warning("No report found. You can rerun the analysis.")
    else:
        results_tab, json_tab = st.tabs(["Results", "JSON Reports"])

        with results_tab:
            st.markdown(
                "<h2 style='text-align: center;'>📝 Video Virality Report</h2>",
                unsafe_allow_html=True,
            )

            # --- Main Score Cards ---
            total = report.get("total_score", 0)
            st.markdown(
                '<div style="text-align:center; margin-bottom:1rem;">'
                f'<div style="font-size:2rem; font-weight:bold; color:#10b981;">Total Score: {_esc(total)}</div>'
                '<p style="color:#9ca3af;">Overall Virality Potential</p>'
                "</div>",
                unsafe_allow_html=True,
            )
            scores = report.get("scores") or {}
            if scores:
                for col, (cat, val) in zip(st.columns(len(scores)), scores.items()):
                    with col:
                        st.markdown(
                            _metric_card(
                                str(cat).title(), val, _score_color(val),
                                box_style="padding:1rem;box-shadow:0 2px 8px rgba(0,0,0,.08);height:100%",
                                title_size="0.9rem", value_size="1.5rem",
                            ),
                            unsafe_allow_html=True,
                        )

            # --- Matrices (tone, emotion, pace, facial_sync) ---
            st.markdown(
                '<div style="text-align:center; margin-bottom:1rem; margin-top:1rem;">'
                '<p style="color:#9ca3af;">Video Attributes</p>'
                "</div>",
                unsafe_allow_html=True,
            )
            matrices = report.get("matrices") or {}
            if matrices:
                for col, (k, v) in zip(st.columns(len(matrices)), matrices.items()):
                    level = str(v).lower()
                    if level in ("high", "positive", "fast", "good", "funny", "joy"):
                        color = _GREEN
                    elif level in ("medium", "neutral", "mixed"):
                        color = _AMBER
                    else:
                        color = _RED
                    with col:
                        st.markdown(
                            _metric_card(
                                str(k).title(), v, color,
                                box_style="padding:1rem;box-shadow:0 2px 6px rgba(0,0,0,0.1)",
                                title_size="0.9rem", value_size="1.3rem",
                            ),
                            unsafe_allow_html=True,
                        )

            # --- Summary ---
            if "summary" in report:
                st.markdown(_section_heading("Report Summary", "1.3rem"), unsafe_allow_html=True)
                st.markdown(
                    "<div style='background-color:#1e3a8a20;border-left: 0.25rem solid #3b82f6;"
                    "border-radius: 8px;padding: 1rem;text-align: center;color: #d1d5db;'>"
                    f"{_esc(report['summary'])}</div>",
                    unsafe_allow_html=True,
                )

            # --- Suggestions ---
            st.markdown(_section_heading("Suggestions", "1.3rem"), unsafe_allow_html=True)
            suggestions = report.get("suggestions") or []
            if suggestions:
                for suggestion in suggestions:
                    st.markdown(
                        f"<p style='text-align:center; font-size:1rem;'> {_esc(suggestion)}</p>",
                        unsafe_allow_html=True,
                    )
            else:
                st.markdown(
                    "<p style='text-align:center; color:gray;'>No improvement suggestions provided.</p>",
                    unsafe_allow_html=True,
                )

            # --- Audio Analysis ---
            if audio_data:
                st.markdown(_section_heading("Audio Analysis", "1.5rem"), unsafe_allow_html=True)

                # --- Audio Score Cards ---
                duration = audio_data.get("duration_seconds", 0)
                if isinstance(duration, (int, float)):
                    duration = round(duration, 2)  # leave non-numeric values as they are
                metrics = {
                    "Delivery Score": audio_data.get("delivery_score", ""),
                    "Duration (s)": duration,
                    "Words/Sec": audio_data.get("words_per_second", 0),
                    "Tone": audio_data.get("tone", ""),
                    "Emotion": audio_data.get("emotion", ""),
                    "Pace": audio_data.get("pace", ""),
                }
                for col, (title, value) in zip(st.columns(len(metrics)), metrics.items()):
                    color = _GREEN  # plain measurements keep the default colour
                    if title in ("Delivery Score", "Tone", "Emotion", "Pace"):
                        if title == "Delivery Score" and isinstance(value, (int, float)):
                            color = _score_color(value)
                        else:
                            level = str(value).lower()
                            if level in ("high", "positive", "fast", "good", "funny", "clear", "joy"):
                                color = _GREEN
                            elif level in ("medium", "neutral", "mixed", "average"):
                                color = _AMBER
                            elif level in ("low", "negative", "slow", "bad", "sad"):
                                color = _RED
                            else:
                                color = _GREY
                    with col:
                        st.markdown(
                            _metric_card(
                                title, value, color,
                                box_style="padding:1rem;box-shadow:0 2px 6px rgba(0,0,0,0.15);margin-bottom:0.8rem;",
                                title_size="0.85rem", value_size="1.3rem",
                            ),
                            unsafe_allow_html=True,
                        )

                # Transcript box
                st.markdown(
                    "<div style='background:#111827;border-left: 4px solid #3b82f6;padding:1rem;"
                    "margin-top:1rem;border-radius:8px;text-align:left;color:#e5e7eb;'>"
                    "<b>Transcript:</b><br>"
                    f"<i>{_esc(audio_data.get('full_transcript', ''))}</i>"
                    "</div>",
                    unsafe_allow_html=True,
                )
                # Comment box
                st.markdown(_note_box(audio_data.get("comment", "")), unsafe_allow_html=True)

            # --- Hook Analysis ---
            if hook_data:
                st.markdown(_section_heading("Hook Analysis", "1.5rem"), unsafe_allow_html=True)

                # --- Hook Score Card ---
                score = hook_data.get("hook_alignment_score", 0)
                st.markdown(
                    _metric_card(
                        "Hook Alignment Score", score, _score_color(score),
                        box_style="padding:1.2rem;box-shadow:0 2px 6px rgba(0,0,0,0.1);margin:0 auto;width:50%;",
                        title_size="1rem", value_size="2rem",
                    ),
                    unsafe_allow_html=True,
                )
                # --- Comment Box ---
                # Show the hook's own comment rather than repeating the audio one.
                # NOTE(review): assumes the hook JSON uses a "comment" key like
                # the audio JSON does — confirm against the hook analysis output.
                st.markdown(_note_box(hook_data.get("comment", "")), unsafe_allow_html=True)

            # --- Download Report ---
            st.markdown("<br>", unsafe_allow_html=True)
            st.download_button(
                "Download Final Report",
                json.dumps(report, indent=2),
                file_name="final_report.json",
            )

        with json_tab:
            with st.expander("Scene Detection", expanded=False):
                st.json(safe_load_json(scene_json))
            with st.expander("Extracted Frames", expanded=False):
                # NOTE(review): this looks for "<video stem>_" while
                # get_frames_directory() uses a hashed name — confirm which
                # directory the frame extractor actually writes to.
                frames_dir = INTERIM_DIR / "frames" / f"{vp.stem}_"
                if frames_dir.exists():
                    imgs = sorted(frames_dir.glob("*.jpg"))
                    if imgs:
                        cols = st.columns(4)
                        for i, img in enumerate(imgs):
                            with cols[i % 4]:
                                st.image(str(img), use_container_width=True)
                    else:
                        st.info("No frames found.")
                else:
                    st.info("No frames directory found.")
            with st.expander("Frame Analysis", expanded=False):
                st.json(safe_load_json(frame_json))
            with st.expander("Audio Analysis", expanded=False):
                st.json(audio_data)
            with st.expander("Hook Analysis", expanded=False):
                st.json(hook_data)
            with st.expander("Final Report", expanded=False):
                st.json(report)

    # Reset button only after analysis is done
    if st.button("Reset Session"):
        reset_state(clear_video=True)
        st.rerun()