#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Configure UTF-8 encoding for stdout/stderr on Windows, so Unicode log output
# does not raise UnicodeEncodeError on legacy code pages (e.g. cp1252).
import sys
import io
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
from nicegui import app, ui, events
import asyncio
import warnings
import logging
# Suppress Windows asyncio ConnectionResetError warnings (harmless cleanup warnings)
if sys.platform == 'win32':
    # SelectorEventLoop avoids the noisy Proactor transport teardown errors.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # Suppress specific asyncio warnings
    # NOTE(review): indentation was lost in this file; these two statements are
    # assumed to be Windows-only (inside this branch) — confirm original intent.
    warnings.filterwarnings('ignore', category=ResourceWarning)
    logging.getLogger('asyncio').setLevel(logging.ERROR)
import textwrap
import json
import os
import re
from io import StringIO
import glob
from types import SimpleNamespace, MethodType
from pathlib import Path
# --- Imports from the original script ---
from moviepy import VideoFileClip, concatenate_videoclips
import os
import json
import uuid
import glob
import re
from dotenv import load_dotenv
# Assuming these are in a 'src' directory relative to the script
from src.config.config import Config
from utils.litellm import LiteLLMWrapper
from src.core.video_planner import VideoPlanner
from src.core.code_generator import CodeGenerator
from src.core.video_renderer import VideoRenderer
from src.utils.utils import extract_xml
from src.utils.error_recovery import ErrorRecovery
from prompts import get_banned_reasonings
from prompts.prompts_raw import (
_code_font_size,
_code_disable,
_code_limit,
_prompt_manim_cheatsheet,
)
# Load allowed models list from JSON file
allowed_models_path = os.path.join(
    os.path.dirname(__file__), "src", "utils", "models.json"
)
with open(allowed_models_path, "r", encoding="utf-8") as f:
    allowed_models = json.load(f).get("allowed_models", [])
# Load .env, letting it override any pre-existing environment variables.
load_dotenv(override=True)
# --- App State (Replaces st.session_state) ---
# Module-level mutable state shared by all UI handlers (single-user app model).
app_state = {
    "log_stream": StringIO(),  # captured log output shown in the UI
    "selected_topic": None,
    "current_topic_inspector": None,  # For the inspector tab
    "latest_pause_time": 0.0,
    "current_tcm_entry": None,
    "chat_histories": {},  # one history per topic
    # Prefer a Gemini model when available; otherwise fall back to the first
    # allowed model.  NOTE(review): raises IndexError if models.json is empty.
    "planner_model_name": next(
        (m for m in allowed_models if "gemini" in m), allowed_models[0]
    ),
    "max_retries": 5,
    "max_scene_concurrency": 5,
}
# --- All Helper Functions from the original script (remain unchanged) ---
# --- (This section is collapsed for brevity, it's identical to your script) ---
def get_topic_folders(output_dir):
    """
    Return the sorted list of high-level topic names (e.g., "Bubble Sort").

    A topic folder is considered valid only when it contains at least one run
    sub-folder that in turn holds a ``*_scene_outline.txt`` file.
    """
    if not os.path.exists(output_dir):
        return []
    topics = []
    for topic_name in os.listdir(output_dir):
        topic_path = os.path.join(output_dir, topic_name)
        if not os.path.isdir(topic_path):
            continue
        # A topic counts as soon as any run sub-folder contains an outline.
        run_paths = (os.path.join(topic_path, entry) for entry in os.listdir(topic_path))
        has_valid_run = any(
            os.path.isdir(run) and glob.glob(os.path.join(run, "*_scene_outline.txt"))
            for run in run_paths
        )
        if has_valid_run:
            topics.append(topic_name)
    return sorted(topics)
def get_project_path(output_dir, topic_name):
    """
    Return the path to the topic's run folder (e.g., "output/Bubble Sort/bubble_sort").

    The run folder is the first sub-directory found inside the topic folder.
    If the topic folder does not exist yet, or contains no sub-directory, the
    topic folder path itself is returned as a fallback.
    """
    topic_path = os.path.join(output_dir, topic_name)
    if not os.path.isdir(topic_path):
        # Topic folder has not been created yet.
        return topic_path
    for entry in os.listdir(topic_path):
        candidate = os.path.join(topic_path, entry)
        if os.path.isdir(candidate):
            # First inner run folder wins.
            return candidate
    # No inner run folder found (shouldn't happen for valid topics).
    return topic_path
def safe_read_file(path, clean=True):
    """
    Read a UTF-8 text file, optionally stripping planner markup tags.

    Args:
        path: file to read.
        clean: when True, remove known plan/section markers and strip whitespace.

    Returns:
        The (optionally cleaned) file contents, or a human-readable error
        string if the file is missing or unreadable (never raises).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
        if not clean:
            return content
        # BUGFIX: the previous patterns began with a bare "?" (the "</" was
        # lost), which is an invalid regex ("nothing to repeat").  re.sub then
        # raised, the broad except swallowed it, and every cleaned read
        # returned an error string instead of the content.  "</?TAG>" matches
        # both the opening and closing form of each tag.
        patterns_to_remove = [
            r"</?SCENE_VISION_STORYBOARD_PLAN>",
            r"</?SCENE_TECHNICAL_IMPLEMENTATION_PLAN>",
            r"</?SCENE_ANIMATION_NARRATION_PLAN>",
            r"# Scene \d+ Implementation Plan",
            r"\[SCENE_VISION\]",
            r"\[STORYBOARD\]",
            r"\[ANIMATION_STRATEGY\]",
            r"\[NARRATION\]",
            r"\[ANIMATION:.*?\]",
        ]
        for pattern in patterns_to_remove:
            content = re.sub(pattern, "", content)
        return content.strip()
    except FileNotFoundError:
        return f"File not found at: {path}"
    except Exception as e:
        return f"Error reading file: {e}"
def get_scene_implementation_context(output_dir, topic, scene_number):
    """
    Retrieve scene implementation details including code, visual plan,
    technical implementation, and animation narration for a specific scene.

    Args:
        output_dir: root output directory.
        topic: human-readable topic name.
        scene_number: 1-based scene index.

    Returns:
        dict with keys scene_number, implementation_plan, code, visual_plan,
        technical_plan, animation_narration (values are None when missing).
    """
    project_path = get_project_path(output_dir, topic)
    file_prefix = os.path.basename(project_path)
    scene_dir = os.path.join(project_path, f"scene{scene_number}")
    context = {
        "scene_number": scene_number,
        "implementation_plan": None,
        "code": None,
        "visual_plan": None,
        "technical_plan": None,
        "animation_narration": None,
    }
    # Load implementation plan
    plan_path = os.path.join(
        scene_dir, f"{file_prefix}_scene{scene_number}_implementation_plan.txt"
    )
    if os.path.exists(plan_path):
        context["implementation_plan"] = safe_read_file(plan_path, clean=False)
        plan_content = context["implementation_plan"]
        # BUGFIX: the original expressions were bare r'(.*?)' (the delimiting
        # tags were lost), so a non-greedy match always captured the empty
        # string.  The tag names are restored from the marker list used by
        # safe_read_file() — confirm against actual plan files.
        visual_match = re.search(
            r"<SCENE_VISION_STORYBOARD_PLAN>(.*?)</SCENE_VISION_STORYBOARD_PLAN>",
            plan_content, re.DOTALL,
        )
        if visual_match:
            context["visual_plan"] = visual_match.group(1).strip()
        tech_match = re.search(
            r"<SCENE_TECHNICAL_IMPLEMENTATION_PLAN>(.*?)</SCENE_TECHNICAL_IMPLEMENTATION_PLAN>",
            plan_content, re.DOTALL,
        )
        if tech_match:
            context["technical_plan"] = tech_match.group(1).strip()
        anim_match = re.search(
            r"<SCENE_ANIMATION_NARRATION_PLAN>(.*?)</SCENE_ANIMATION_NARRATION_PLAN>",
            plan_content, re.DOTALL,
        )
        if anim_match:
            context["animation_narration"] = anim_match.group(1).strip()
    # Load generated code
    code_dir = os.path.join(scene_dir, "code")
    if os.path.exists(code_dir):
        code_files = [f for f in os.listdir(code_dir) if f.endswith('.py')]
        if code_files:
            # Get the latest version.
            # NOTE(review): lexicographic sort — version numbers >= 10 would
            # order incorrectly (e.g. "v9" > "v10"); confirm file naming.
            code_files.sort(reverse=True)
            code_path = os.path.join(code_dir, code_files[0])
            context["code"] = safe_read_file(code_path, clean=False)
    return context
def extract_scene_number_from_concept_id(concept_id):
    """
    Pull the scene number out of a concept ID, or None when absent.

    Example: "Bubble_Sort.scene_1.array_initialization" -> 1
    """
    found = re.search(r'scene_(\d+)', concept_id)
    return int(found.group(1)) if found else None
def check_status(self, theorem: dict):
    """
    Report pipeline progress for one topic: outline presence, per-scene
    plan/code/render status, and whether the combined video exists.

    Intended to be bound to a generator object (reads ``self.output_dir``).

    Args:
        theorem: dict with at least a "theorem" key holding the topic name.

    Returns:
        dict of aggregate counts plus a per-scene ``scene_status`` list.
    """
    topic = theorem["theorem"]
    project_path = get_project_path(self.output_dir, topic)
    inner_folder_name = os.path.basename(project_path)
    file_prefix = inner_folder_name
    scene_outline_path = os.path.join(project_path, f"{file_prefix}_scene_outline.txt")
    has_scene_outline = os.path.exists(scene_outline_path)
    num_scenes = 0
    if has_scene_outline:
        with open(scene_outline_path, "r") as f:
            scene_outline = f.read()
        scene_outline_content = extract_xml(scene_outline, "SCENE OUTLINE")
        # BUGFIX: count opening scene tags (e.g. "<SCENE_1>") instead of the
        # previous r"[^<]", which counted every non-'<' character and wildly
        # over-reported the scene count.  Tag form inferred from the outline
        # markup used elsewhere — confirm against real outline files.
        num_scenes = len(re.findall(r"<SCENE_(\d+)>", scene_outline_content))
    implementation_plans, code_files, rendered_scenes = 0, 0, 0
    scene_status = []
    for i in range(1, num_scenes + 1):
        scene_dir = os.path.join(project_path, f"scene{i}")
        plan_path = os.path.join(
            scene_dir, f"{file_prefix}_scene{i}_implementation_plan.txt"
        )
        has_plan = os.path.exists(plan_path)
        if has_plan:
            implementation_plans += 1
        code_dir = os.path.join(scene_dir, "code")
        has_code = os.path.exists(code_dir) and any(
            f.endswith(".py") for f in os.listdir(code_dir)
        )
        if has_code:
            code_files += 1
        # A scene counts as rendered when its success marker file exists.
        has_render = os.path.exists(os.path.join(scene_dir, "succ_rendered.txt"))
        if has_render:
            rendered_scenes += 1
        scene_status.append(
            {
                "scene_number": i,
                "has_plan": has_plan,
                "has_code": has_code,
                "has_render": has_render,
            }
        )
    has_combined_video = os.path.exists(
        os.path.join(project_path, f"{file_prefix}_combined.mp4")
    )
    return {
        "topic": topic,
        "has_scene_outline": has_scene_outline,
        "total_scenes": num_scenes,
        "implementation_plans": implementation_plans,
        "code_files": code_files,
        "rendered_scenes": rendered_scenes,
        "has_combined_video": has_combined_video,
        "scene_status": scene_status,
    }
def set_active_output_dir(self, new_output_dir):
    """Point the generator and all of its sub-components at a new output directory."""
    for component in (self, self.planner, self.code_generator, self.video_renderer):
        component.output_dir = new_output_dir
def load_voices(file_path="src/tts/voices.json"):
    """
    Load the TTS voice catalog from JSON.

    Returns only entries that carry both an "id" and a "name" key; returns an
    empty list when the file is missing or unreadable (a UI notification is
    shown on read/parse errors).
    """
    if not os.path.exists(file_path):
        return []
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            catalog = json.load(handle)
        return [voice for voice in catalog if "id" in voice and "name" in voice]
    except Exception as exc:
        ui.notify(f"Error loading src/tts/voices.json: {exc}", type="negative")
        return []
# --- UPDATED/NEW HELPER FUNCTIONS ---
def find_latest_video_for_scene(project_path: str, scene_num: int) -> str | None:
"""
Finds the latest rendered video for a given scene number in the project.
(Implementation from Streamlit script)
"""
videos_dir = os.path.join(project_path, "media", "videos")
search_pattern = os.path.join(videos_dir, f"*scene{scene_num}_v*")
potential_folders = glob.glob(search_pattern)
if not potential_folders:
print(f" - No video folders found for scene {scene_num}.")
return None
def get_version(path):
m = re.search(r"_v(\d+)", path)
return int(m.group(1)) if m else -1
latest_folder = max(potential_folders, key=get_version)
for res in ["1080p60", "720p30", "480p15"]:
video_file = os.path.join(latest_folder, res, f"Scene{scene_num}.mp4")
if os.path.exists(video_file):
return video_file
print(f" - No video file found for scene {scene_num} in {latest_folder}.")
return None
def split_narration_to_chunks(narration, max_chars=40, max_lines=2):
    """
    Split narration into subtitle-sized chunks.

    The text is first split on sentence boundaries, then each sentence is
    word-wrapped to ``max_chars`` and grouped into blocks of at most
    ``max_lines`` wrapped lines (joined back with spaces).
    """
    chunks = []
    for sentence in re.split(r"(?<=[.!?])\s+", narration):
        text = sentence.strip()
        if not text:
            continue
        lines = textwrap.wrap(text, width=max_chars)
        chunks.extend(
            " ".join(lines[start:start + max_lines])
            for start in range(0, len(lines), max_lines)
        )
    return chunks
def tcm_to_srt(
    tcm: list, max_line_length=40, max_lines=2, max_block_duration=4.0
) -> str:
    """
    Convert TCM events to SRT, splitting long narration and duration into
    multiple blocks.

    Args:
        tcm: event dicts with "narrationText", "startTime", "endTime" keys.
        max_line_length: maximum characters per subtitle line.
        max_lines: maximum lines per subtitle block.
        max_block_duration: cap (seconds) on one block's display time.

    Returns:
        The complete SRT document as a string.
    """
    def sec_to_srt_time(sec):
        # BUGFIX: work in integer milliseconds so rounding can never produce
        # an illegal component.  The old code computed h/m/s from int(sec) and
        # ms from the fraction separately, so sec=1.9995 rendered as
        # "00:00:01,1000" instead of "00:00:02,000".
        total_ms = int(round(sec * 1000))
        h, rem = divmod(total_ms, 3600000)
        m, rem = divmod(rem, 60000)
        s, ms = divmod(rem, 1000)
        return f"{h:02}:{m:02}:{s:02},{ms:03}"

    srt_lines = []
    idx = 1
    for event in tcm:
        # Strip markdown emphasis; skip empty or placeholder ("...") narration.
        narration = event.get("narrationText", "").replace("*", "").strip()
        if not narration or narration == "...":
            continue
        start = float(event["startTime"])
        end = float(event["endTime"])
        duration = end - start
        chunks = split_narration_to_chunks(
            narration, max_chars=max_line_length, max_lines=max_lines
        )
        n_chunks = len(chunks)
        if n_chunks == 0:
            continue
        # Spread the event's duration evenly across chunks, capped per block.
        chunk_duration = min(duration / n_chunks, max_block_duration)
        chunk_start = start
        for chunk in chunks:
            chunk_end = min(chunk_start + chunk_duration, end)
            srt_lines.append(f"{idx}")
            srt_lines.append(
                f"{sec_to_srt_time(chunk_start)} --> {sec_to_srt_time(chunk_end)}"
            )
            wrapped = textwrap.wrap(chunk, width=max_line_length)
            if len(wrapped) > max_lines:
                # Fold any overflow into the last permitted line.
                wrapped = wrapped[: max_lines - 1] + [
                    " ".join(wrapped[max_lines - 1 :])
                ]
            srt_lines.extend(wrapped)
            srt_lines.append("")
            idx += 1
            chunk_start = chunk_end
            if chunk_start >= end:
                break
    return "\n".join(srt_lines)
def tcm_to_vtt(
    tcm: list, max_line_length=40, max_lines=2, max_block_duration=4.0
) -> str:
    """
    Convert TCM events directly to WebVTT format.
    More efficient than converting SRT -> VTT.

    Args:
        tcm: event dicts with "narrationText", "startTime", "endTime" keys.
        max_line_length: maximum characters per subtitle line.
        max_lines: maximum lines per subtitle block.
        max_block_duration: cap (seconds) on one block's display time.

    Returns:
        The complete WebVTT document as a string.
    """
    def sec_to_vtt_time(sec):
        # BUGFIX: compute from total integer milliseconds so rounding can
        # never emit an illegal component (old code could produce
        # "00:00:01.1000" when the fractional part rounded up to 1000 ms).
        total_ms = int(round(sec * 1000))
        h, rem = divmod(total_ms, 3600000)
        m, rem = divmod(rem, 60000)
        s, ms = divmod(rem, 1000)
        return f"{h:02}:{m:02}:{s:02}.{ms:03}"

    vtt_lines = ['WEBVTT', '']  # Header with blank line
    idx = 1
    for event in tcm:
        # Strip markdown emphasis; skip empty or placeholder ("...") narration.
        narration = event.get("narrationText", "").replace("*", "").strip()
        if not narration or narration == "...":
            continue
        start = float(event["startTime"])
        end = float(event["endTime"])
        duration = end - start
        chunks = split_narration_to_chunks(
            narration, max_chars=max_line_length, max_lines=max_lines
        )
        n_chunks = len(chunks)
        if n_chunks == 0:
            continue
        # Spread the event's duration evenly across chunks, capped per block.
        chunk_duration = min(duration / n_chunks, max_block_duration)
        chunk_start = start
        for chunk in chunks:
            chunk_end = min(chunk_start + chunk_duration, end)
            vtt_lines.append(f"{idx}")
            vtt_lines.append(
                f"{sec_to_vtt_time(chunk_start)} --> {sec_to_vtt_time(chunk_end)}"
            )
            wrapped = textwrap.wrap(chunk, width=max_line_length)
            if len(wrapped) > max_lines:
                # Fold any overflow into the last permitted line.
                wrapped = wrapped[: max_lines - 1] + [
                    " ".join(wrapped[max_lines - 1 :])
                ]
            vtt_lines.extend(wrapped)
            vtt_lines.append("")
            idx += 1
            chunk_start = chunk_end
            if chunk_start >= end:
                break
    return "\n".join(vtt_lines)
def srt_to_vtt(srt_content: str) -> str:
    """
    Convert SRT subtitle text to WebVTT format.

    Note: for better performance, build VTT directly with tcm_to_vtt() rather
    than chaining tcm_to_srt() -> srt_to_vtt().
    """
    converted = ['WEBVTT\n']
    for raw_line in srt_content.strip().split('\n'):
        # SRT timestamps use a comma before milliseconds; VTT uses a period.
        if '-->' in raw_line:
            converted.append(raw_line.replace(',', '.'))
        else:
            converted.append(raw_line)
    return '\n'.join(converted)
def regenerate_subtitles_only(topic: str, output_dir: str = "output"):
    """
    Regenerate ONLY subtitle files (SRT/VTT) without re-combining videos.
    Much faster than combine_videos() when video already exists.

    Despite the name, this function now also re-concatenates the per-scene
    videos and rebuilds the audio track so that audio/subtitle timing match.

    Args:
        topic: human-readable topic name (used to locate the project folder).
        output_dir: root output directory.

    Returns:
        One of "success", "error", or "no_data".
    """
    project_path = get_project_path(output_dir, topic)
    # project_name is the human-readable topic folder (parent of the run folder).
    project_name = os.path.basename(os.path.dirname(project_path))
    inner_folder_name = os.path.basename(project_path)
    output_video_path = os.path.join(project_path, f"{inner_folder_name}_combined.mp4")
    output_tcm_path = os.path.join(project_path, f"{inner_folder_name}_combined_tcm.json")
    output_srt_path = os.path.join(project_path, f"{inner_folder_name}_combined.srt")
    output_vtt_path = os.path.join(project_path, f"{inner_folder_name}_combined.vtt")
    # Check if combined video exists - REMOVED CHECK to allow rebuilding from scenes
    # if not os.path.exists(output_video_path):
    #     print(f"[{topic}] ERROR: Combined video not found. Run full combine_videos() first.")
    #     return "no_video"
    print(f"[{topic}] ==> Regenerating subtitles and Re-combining video scenes...")
    # Build TCM from scene proto_tcm files
    final_tcm = []
    global_time_offset = 0.0
    video_clips = []  # Collect scene clips for recombination
    try:
        # Sort scene folders numerically by their "sceneN" suffix.
        scene_dirs = sorted(
            glob.glob(os.path.join(project_path, "scene*")),
            key=lambda x: int(re.search(r"scene(\d+)", x).group(1)),
        )
    except (TypeError, ValueError):
        print(f" - ERROR: Could not sort scene directories in '{project_path}'.")
        return "error"
    for scene_dir in scene_dirs:
        scene_num = int(re.search(r"scene(\d+)", os.path.basename(scene_dir)).group(1))
        video_path = find_latest_video_for_scene(project_path, scene_num)
        proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
        succ_rendered_path = os.path.join(scene_dir, "succ_rendered.txt")
        # Only scenes that rendered successfully AND have a video participate.
        if not video_path or not os.path.exists(succ_rendered_path):
            continue
        # Collect video clip for recombination
        from moviepy import VideoFileClip
        video_clips.append(VideoFileClip(video_path))
        if not os.path.exists(proto_tcm_path):
            continue
        try:
            # Get video duration from the clip we just loaded or the path
            # We already loaded it into video_clips, let's use that to be safe/efficient?
            # actually accessing .duration on the list item is fine
            actual_duration = video_clips[-1].duration
            with open(proto_tcm_path, "r", encoding="utf-8") as f:
                proto_tcm = json.load(f)
            # Load actual audio durations AND paths from voiceover cache
            actual_audio_durations = {}
            actual_audio_paths = {}  # New: Store paths to rebuild audio track
            possible_cache_dirs = [
                os.path.join(scene_dir, "code", "media", "voiceovers"),
                os.path.join(project_path, "media", "voiceovers"),
                os.path.join(os.path.dirname(video_path), "voiceovers"),
            ]
            # Find the active voiceover cache directory
            active_cache_dir = None
            for d in possible_cache_dirs:
                if os.path.exists(d):
                    active_cache_dir = d
                    break
            # If no cache dir exists, default to one
            if not active_cache_dir:
                active_cache_dir = possible_cache_dirs[0]
                os.makedirs(active_cache_dir, exist_ok=True)
            # Ensure all audio files exist (including "..." pauses) to fix sync
            # This is critical because if "..." files are missing, their duration is treated as 0 or estimated,
            # but having the real audio file (even if silent) locks the timing perfectly.
            from src.utils.kokoro_voiceover import KokoroService
            from pathlib import Path
            # We initialize a temporary service just for generation
            temp_speech_service = None
            for event in proto_tcm:
                narration_text = event.get("narrationText", "")
                if not narration_text:
                    continue
                # User requested full regeneration of voiceovers to ensure sync.
                if narration_text.strip():
                    if temp_speech_service is None:
                        temp_speech_service = KokoroService()
                    # Force generation/retrieval for ALL text to ensure file exists and is consistent
                    try:
                        temp_speech_service.generate_from_text(narration_text, cache_dir=Path(active_cache_dir))
                    except Exception as e:
                        print(f" - Warning: Failed to regenerate Voiceover experienced error: {e}")
                        import traceback
                        traceback.print_exc()
            # Now proceed to load the cache (durations and paths)
            for voiceover_cache_dir in possible_cache_dirs:
                if os.path.exists(voiceover_cache_dir):
                    # Check for cache.json file (Manim voiceover format)
                    cache_file = os.path.join(voiceover_cache_dir, "cache.json")
                    if os.path.exists(cache_file):
                        try:
                            with open(cache_file, "r", encoding="utf-8") as f:
                                cache_data = json.load(f)
                            # cache_data is an array of audio entries
                            for entry in cache_data:
                                # Normalize text keys
                                text_key = entry.get("input_text", "")
                                if text_key and "original_audio" in entry:
                                    audio_file = os.path.join(voiceover_cache_dir, entry["original_audio"])
                                    if os.path.exists(audio_file):
                                        from moviepy import AudioFileClip
                                        # Open briefly just to measure the real audio duration.
                                        with AudioFileClip(audio_file) as audio:
                                            actual_audio_durations[text_key] = audio.duration
                                        actual_audio_paths[text_key] = audio_file
                        except Exception as e:
                            print(f" - Warning: Could not load cache.json: {e}")
                    if actual_audio_durations:
                        print(f" - Loaded {len(actual_audio_durations)} actual audio durations from cache")
                        break
            # CRITICAL FIX: Only count estimated duration for events WITH narration
            # Empty narration events don't contribute to video duration
            # UPDATE: We MUST count all events, even silent ones, otherwise we drift early
            # Base calculation on max(audio, estimated) to account for visual padding
            total_ideal_duration = 0.0
            event_base_durations = []
            for event in proto_tcm:
                narration_text = event.get("narrationText", "")
                est_duration = event.get("estimatedDuration", 1.0)
                # Determine audio duration if exists
                audio_dur = 0.0
                if narration_text and narration_text.strip():  # Check all text, including "..."
                    if narration_text in actual_audio_durations:
                        audio_dur = actual_audio_durations[narration_text]
                    else:
                        # Fallback try to find match
                        for cached_text, cached_duration in actual_audio_durations.items():
                            if cached_text.strip() == narration_text.strip():
                                audio_dur = cached_duration
                                break
                # Base Duration: Max of Audio and Estimate
                # This ensures we respect the visual time (estimate) if it's longer than audio
                # And respect audio time if it's longer than visual estimate
                base_dur = max(audio_dur, est_duration)
                event_base_durations.append(base_dur)
                total_ideal_duration += base_dur
            # Stretch/compress ideal event timings so they sum to the real clip length.
            scaling_factor = actual_duration / total_ideal_duration if total_ideal_duration > 0 else 1.0
            # Collect audio clips for reconstruction
            # NOTE(review): this list is re-created for EVERY scene iteration, so
            # only the LAST scene's audio clips survive to the recombination step
            # below (combine_videos() initializes it once, outside the loop).
            # Looks like a bug — confirm intended behavior.
            audio_clips_collection = []
            # Build TCM with proper timing
            scene_time_offset = 0
            for i, event in enumerate(proto_tcm):
                narration_text = event.get("narrationText", "")
                # Apply scaling to the ideal base duration
                # This distributes any global timing difference (transistions, rendering overhead) proportionally
                base_dur = event_base_durations[i]
                final_event_duration = base_dur * scaling_factor
                current_start_time = global_time_offset + scene_time_offset
                event["startTime"] = f"{current_start_time:.3f}"
                event["endTime"] = f"{current_start_time + final_event_duration:.3f}"
                event["conceptId"] = f"{project_name}.scene_{scene_num}.{event.get('conceptName', 'event').replace(' ', '_')}"
                event["_sync_fix_applied"] = True
                # Add audio clip to collection if available
                # This reconstructs the audio track with the exact same timing as the subtitles
                if narration_text and narration_text.strip():
                    audio_path = None
                    if narration_text in actual_audio_paths:
                        audio_path = actual_audio_paths[narration_text]
                    else:
                        # Fallback search
                        for cached_text, cached_path in actual_audio_paths.items():
                            if cached_text.strip() == narration_text.strip():
                                audio_path = cached_path
                                break
                    if audio_path:
                        try:
                            from moviepy import AudioFileClip
                            # Create Audio clip starting at the exact synchronized time
                            # We limit duration to final_event_duration to avoid overlap if massive scaling happened (rare)
                            # NOTE(review): moviepy 2.x renamed set_start -> with_start;
                            # confirm the installed moviepy version supports set_start.
                            ac = AudioFileClip(audio_path).set_start(current_start_time)
                            audio_clips_collection.append(ac)
                        except Exception as audio_err:
                            print(f"Warning: Failed to load audio clip {audio_path}: {audio_err}")
                if "estimatedDuration" in event:
                    del event["estimatedDuration"]
                final_tcm.append(event)
                scene_time_offset += final_event_duration
            global_time_offset += actual_duration
        except Exception as e:
            print(f" - ERROR processing Scene {scene_num}: {e}")
    if final_tcm:
        # Save updated TCM
        with open(output_tcm_path, "w", encoding="utf-8") as f:
            json.dump(final_tcm, f, indent=2, ensure_ascii=False)
        # Generate subtitles
        srt_content = tcm_to_srt(final_tcm)
        with open(output_srt_path, "w", encoding="utf-8") as f:
            f.write(srt_content)
        vtt_content = tcm_to_vtt(final_tcm)
        with open(output_vtt_path, "w", encoding="utf-8") as f:
            f.write(vtt_content)
        # RE-RENDER VIDEO WITH NEW AUDIO BY RE-COMBINING SCENES
        try:
            print(f"[{topic}] ==> Re-combining scene videos with synchronized audio...")
            from moviepy import VideoFileClip, CompositeAudioClip, concatenate_videoclips
            # Combine original scene clips
            if not video_clips:
                print(f"[{topic}] Error: No video clips found to combine.")
                return "error"
            original_video = concatenate_videoclips(video_clips)
            # Create new composite audio
            if audio_clips_collection:
                new_audio = CompositeAudioClip(audio_clips_collection)
                # Ensure audio doesn't exceed video duration
                # NOTE(review): set_duration/set_audio are moviepy 1.x names
                # (2.x uses with_duration/with_audio) — confirm version.
                new_audio = new_audio.set_duration(original_video.duration)
                # Set new audio to video
                final_video = original_video.set_audio(new_audio)
                # Write to temp file first then rename to avoid corruption
                temp_output = output_video_path.replace(".mp4", "_temp_sync.mp4")
                final_video.write_videofile(
                    temp_output,
                    codec="libx264",
                    audio_codec="aac",
                    logger="bar",
                    threads=4
                )
                # Close clips
                original_video.close()
                new_audio.close()
                final_video.close()
                # Close individual scene clips
                for clip in video_clips:
                    clip.close()
                # Replace old file
                if os.path.exists(temp_output):
                    if os.path.exists(output_video_path):
                        os.remove(output_video_path)
                    os.rename(temp_output, output_video_path)
                print(f"[{topic}] ==> Video re-combined and re-rendered successfully with new audio!")
            else:
                # If no Audio, just save the combined video?
                # Or wait, if we have no audio clips, maybe we shouldn't save?
                # But the user might want a silent video.
                # For now let's assume we proceed but warn.
                print(f"[{topic}] Warning: No audio clips collected. Saving Combined Video without new audio track.")
                original_video.write_videofile(
                    output_video_path,
                    codec="libx264",
                    audio_codec="aac",
                    logger="bar",
                    threads=4
                )
                original_video.close()
                for clip in video_clips:
                    clip.close()
        except Exception as vid_err:
            print(f"[{topic}] ERROR re-rendering video: {vid_err}")
            import traceback
            traceback.print_exc()
        print(f"[{topic}] ==> Project finalized.")
        return "success"
    else:
        print(f"[{topic}] <== No TCM data found.")
        return "no_data"
def combine_videos(topic: str, output_dir: str = "output"):
    """
    Combines all videos for a topic and generates the final, fine-grained TCM
    and SRT subtitles.
    (Implementation from Streamlit script, adapted with NiceGUI notifications)

    Args:
        topic: human-readable topic name.
        output_dir: root output directory.

    Returns:
        "already_exists", "success", "no_scenes", or None (early sort failure).
    """
    project_path = get_project_path(output_dir, topic)
    # project_name is the human-readable topic folder (parent of the run folder).
    project_name = os.path.basename(os.path.dirname(project_path))
    inner_folder_name = os.path.basename(project_path)
    output_video_path = os.path.join(project_path, f"{inner_folder_name}_combined.mp4")
    output_tcm_path = os.path.join(
        project_path, f"{inner_folder_name}_combined_tcm.json"
    )
    output_srt_path = os.path.join(project_path, f"{inner_folder_name}_combined.srt")
    output_vtt_path = os.path.join(project_path, f"{inner_folder_name}_combined.vtt")
    # Check if we need to regenerate due to subtitle sync improvements
    needs_regeneration = False
    if os.path.exists(output_tcm_path):
        # Check if TCM was created with actual audio durations (has a marker)
        try:
            with open(output_tcm_path, "r", encoding="utf-8") as f:
                tcm_data = json.load(f)
            # Check if this TCM has the sync fix marker
            has_sync_fix = any(event.get("_sync_fix_applied", False) for event in tcm_data)
            if not has_sync_fix:
                print(f"[{topic}] Detected old subtitle timing - will regenerate with improved sync")
                needs_regeneration = True
        except:
            # Unreadable/corrupt TCM: rebuild everything.
            needs_regeneration = True
    if (
        os.path.exists(output_video_path)
        and os.path.exists(output_tcm_path)
        and os.path.exists(output_srt_path)
        and os.path.exists(output_vtt_path)
        and not needs_regeneration
    ):
        msg = f"[{topic}] Combined assets already exist with improved sync. Skipping."
        print(msg)
        return "already_exists"
    print(
        f"[{topic}] ==> Finalizing project: Combining videos and creating global TCM..."
    )
    # NOTE(review): duplicated status print below — likely a copy/paste slip.
    print(
        f"[{topic}] ==> Finalizing project: Combining videos and creating global TCM..."
    )
    final_tcm = []
    video_clips = []  # Collect VideoFileClip objects
    audio_clips_collection = []  # Collect AudioFileClip objects for reconstruction
    global_time_offset = 0.0
    try:
        # Sort scene folders numerically by their "sceneN" suffix.
        scene_dirs = sorted(
            glob.glob(os.path.join(project_path, "scene*")),
            key=lambda x: int(re.search(r"scene(\d+)", x).group(1)),
        )
    except (TypeError, ValueError):
        print(
            f" - ERROR: Could not sort scene directories in '{project_path}'. Check folder names."
        )
        # NOTE(review): bare return (None) here, while the sibling
        # regenerate_subtitles_only() returns "error" — confirm callers.
        return
    # Initialize Kokoro service lazily if needed for regeneration
    from src.utils.kokoro_voiceover import KokoroService
    from pathlib import Path
    # NOTE(review): set_start/set_duration/set_audio used below are the
    # moviepy 1.x API (2.x renamed them to with_*) — confirm installed version.
    from moviepy import VideoFileClip, AudioFileClip, CompositeAudioClip, concatenate_videoclips
    temp_speech_service = None
    for scene_dir in scene_dirs:
        scene_num = int(re.search(r"scene(\d+)", os.path.basename(scene_dir)).group(1))
        video_path = find_latest_video_for_scene(project_path, scene_num)
        proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
        succ_rendered_path = os.path.join(scene_dir, "succ_rendered.txt")
        # Only scenes that rendered successfully AND have a video participate.
        if not video_path or not os.path.exists(succ_rendered_path):
            print(
                f" - Skipping Scene {scene_num}: Video or succ_rendered.txt missing."
            )
            continue
        if not os.path.exists(proto_tcm_path):
            print(
                f" - WARNING: proto_tcm.json not found for Scene {scene_num}. Skipping fine-grained analysis."
            )
            continue
        try:
            # Load Video Clip for Recombination
            current_video_clip = VideoFileClip(video_path)
            video_clips.append(current_video_clip)
            actual_duration = current_video_clip.duration
            with open(proto_tcm_path, "r", encoding="utf-8") as f:
                proto_tcm = json.load(f)
            # --- AUDIO VERIFICATION & LOADING ---
            actual_audio_durations = {}
            actual_audio_paths = {}
            possible_cache_dirs = [
                os.path.join(scene_dir, "code", "media", "voiceovers"),
                os.path.join(project_path, "media", "voiceovers"),
                os.path.join(os.path.dirname(video_path), "voiceovers"),
            ]
            # Pick the first cache directory that already exists.
            active_cache_dir = None
            for d in possible_cache_dirs:
                if os.path.exists(d):
                    active_cache_dir = d
                    break
            if not active_cache_dir:
                active_cache_dir = possible_cache_dirs[0]
                os.makedirs(active_cache_dir, exist_ok=True)
            # 1. Verify/Regenerate all audio files
            for event in proto_tcm:
                narration_text = event.get("narrationText", "")
                if not narration_text: continue
                # Check consistency / regenerate if needed
                if narration_text.strip():
                    if temp_speech_service is None:
                        temp_speech_service = KokoroService()
                    try:
                        temp_speech_service.generate_from_text(narration_text, cache_dir=Path(active_cache_dir))
                    except Exception as e:
                        print(f" - Warning: Failed to regenerate Voiceover experienced error: {e}")
            # 2. Load Audio Paths from Cache
            for voiceover_cache_dir in possible_cache_dirs:
                if os.path.exists(voiceover_cache_dir):
                    cache_file = os.path.join(voiceover_cache_dir, "cache.json")
                    if os.path.exists(cache_file):
                        try:
                            with open(cache_file, "r", encoding="utf-8") as f:
                                cache_data = json.load(f)
                            for entry in cache_data:
                                text_key = entry.get("input_text", "")
                                if text_key and "original_audio" in entry:
                                    audio_file = os.path.join(voiceover_cache_dir, entry["original_audio"])
                                    if os.path.exists(audio_file):
                                        # Open briefly just to measure the real duration.
                                        with AudioFileClip(audio_file) as audio:
                                            actual_audio_durations[text_key] = audio.duration
                                        actual_audio_paths[text_key] = audio_file
                        except Exception as e:
                            print(f" - Warning: Could not load cache.json: {e}")
                    if actual_audio_durations:
                        break
            # --- DURATION CALCULATION ---
            # Ideal event length = max(actual audio length, planner estimate);
            # the sum is later scaled to the real rendered clip duration.
            total_ideal_duration = 0.0
            event_base_durations = []
            for event in proto_tcm:
                narration_text = event.get("narrationText", "")
                est_duration = event.get("estimatedDuration", 1.0)
                audio_dur = 0.0
                if narration_text and narration_text.strip():
                    if narration_text in actual_audio_durations:
                        audio_dur = actual_audio_durations[narration_text]
                    else:
                        # Fallback: match on whitespace-stripped text.
                        for cached_text, cached_duration in actual_audio_durations.items():
                            if cached_text.strip() == narration_text.strip():
                                audio_dur = cached_duration
                                break
                base_dur = max(audio_dur, est_duration)
                event_base_durations.append(base_dur)
                total_ideal_duration += base_dur
            scaling_factor = actual_duration / total_ideal_duration if total_ideal_duration > 0 else 1.0
            if not actual_audio_durations:
                print(f" - Warning: No actual audio durations found, using proportional scaling based on estimates")
            # --- BUILD TCM & COLLECT AUDIO CLIPS ---
            scene_time_offset = 0
            for i, event in enumerate(proto_tcm):
                narration_text = event.get("narrationText", "")
                base_dur = event_base_durations[i]
                final_event_duration = base_dur * scaling_factor
                current_start_time = global_time_offset + scene_time_offset
                event["startTime"] = f"{current_start_time:.3f}"
                event["endTime"] = f"{current_start_time + final_event_duration:.3f}"
                event["conceptId"] = f"{project_name}.scene_{scene_num}.{event.get('conceptName', 'event').replace(' ', '_')}"
                # Marker consumed by the needs_regeneration check above.
                event["_sync_fix_applied"] = True
                # Collect Audio Clip
                if narration_text and narration_text.strip():
                    audio_path = None
                    if narration_text in actual_audio_paths:
                        audio_path = actual_audio_paths[narration_text]
                    else:
                        # Fallback: match on whitespace-stripped text.
                        for cached_text, cached_path in actual_audio_paths.items():
                            if cached_text.strip() == narration_text.strip():
                                audio_path = cached_path
                                break
                    if audio_path:
                        try:
                            ac = AudioFileClip(audio_path).set_start(current_start_time)
                            audio_clips_collection.append(ac)
                        except Exception as audio_err:
                            print(f"Warning: Failed to load audio clip {audio_path}: {audio_err}")
                if "estimatedDuration" in event:
                    del event["estimatedDuration"]
                final_tcm.append(event)
                scene_time_offset += final_event_duration
            print(
                f" - Processed Scene {scene_num} (Duration: {actual_duration:.2f}s), {len(proto_tcm)} TCM entries."
            )
            global_time_offset += actual_duration
        except Exception as e:
            print(f" - ERROR processing Scene {scene_num}: {e}")
            import traceback
            traceback.print_exc()
    if video_clips:
        # Concatenate Visuals
        final_video_clip = concatenate_videoclips(video_clips)
        # Apply Composite Audio if available
        if audio_clips_collection:
            new_audio = CompositeAudioClip(audio_clips_collection)
            new_audio = new_audio.set_duration(final_video_clip.duration)
            final_video_clip = final_video_clip.set_audio(new_audio)
            print(f"[{topic}] ==> Applied synchronized composite audio track.")
        final_video_clip.write_videofile(
            output_video_path, codec="libx264", audio_codec="aac", logger="bar", threads=4
        )
        with open(output_tcm_path, "w", encoding="utf-8") as f:
            json.dump(final_tcm, f, indent=2, ensure_ascii=False)
        # Generate subtitles in both SRT and VTT formats
        srt_content = tcm_to_srt(final_tcm)
        with open(output_srt_path, "w", encoding="utf-8") as f:
            f.write(srt_content)
        vtt_content = tcm_to_vtt(final_tcm)
        with open(output_vtt_path, "w", encoding="utf-8") as f:
            f.write(vtt_content)
        print(f"[{topic}] ==> Project finalized.")
        # Close all clips
        # Note: CompositeAudioClip closes its children when closed if we are careful,
        # but manual closing is safer in moviepy
        if audio_clips_collection:
            new_audio.close()  # Close composite
        final_video_clip.close()
        for clip in video_clips:
            clip.close()
        return "success"
    else:
        print(f"[{topic}] <== No rendered scenes found to finalize.")
        return "no_scenes"
# --- VideoGenerator Class (from generate_video.py) ---
class VideoGenerator:
    """
    A class for generating manim videos using AI models.
    This class coordinates the video generation pipeline by managing scene planning,
    code generation, and video rendering. It supports concurrent scene processing,
    visual code fixing, and RAG (Retrieval Augmented Generation).
    """
    def __init__(
        self,
        planner_model,
        scene_model=None,
        output_dir="output",
        verbose=False,
        use_rag=False,
        use_context_learning=False,
        context_learning_path="data/context_learning",
        chroma_db_path="data/rag/chroma_db",
        manim_docs_path="data/rag/manim_docs",
        embedding_model="azure/text-embedding-3-large",
        use_visual_fix_code=False,
        use_langfuse=True,
        trace_id=None,
        max_scene_concurrency: int = 5,
    ):
        """Wire up the planner, code generator and renderer sub-modules.

        Args:
            planner_model: LLM wrapper used for planning; also used for scene
                code generation when ``scene_model`` is None.
            scene_model: Optional separate LLM wrapper for scene code.
            output_dir: Root directory for all generated artifacts.
            verbose: When True, sub-modules print full model responses.
            use_rag: Enable Retrieval Augmented Generation in sub-modules.
            use_context_learning: Enable in-context-learning examples.
            context_learning_path: Folder holding context-learning examples.
            chroma_db_path: ChromaDB location used for RAG.
            manim_docs_path: Manim documentation corpus used for RAG.
            embedding_model: Embedding model identifier for RAG.
            use_visual_fix_code: Enable visual self-reflection code fixing.
            use_langfuse: Enable Langfuse tracing in sub-modules.
            trace_id: Accepted for API compatibility; not used in this body.
            max_scene_concurrency: Max scenes processed concurrently.
        """
        print("Initializing VideoGenerator...")
        self.output_dir = output_dir
        self.verbose = verbose
        self.use_visual_fix_code = use_visual_fix_code
        # Session ID is persisted under output_dir so reruns stay correlated.
        self.session_id = self._load_or_create_session_id()
        # Bounds how many scenes run at once (acquired in process_scene and
        # forwarded to the planner).
        self.scene_semaphore = asyncio.Semaphore(max_scene_concurrency)
        print(f"Scene concurrency limit set to: {max_scene_concurrency}")
        self.banned_reasonings = get_banned_reasonings()
        # Scenes that exhausted every render/fix retry (filled by process_scene).
        self.failed_scenes = []
        self.error_recovery = ErrorRecovery(output_dir)
        # Rate-limit bookkeeping; written here, consumers not visible in this file.
        self.rate_limit_detected = False
        self.last_rate_limit_time = 0
        print("Initializing sub-modules: VideoPlanner, CodeGenerator, VideoRenderer...")
        self.planner = VideoPlanner(
            planner_model=planner_model,
            output_dir=output_dir,
            print_response=verbose,
            use_context_learning=use_context_learning,
            context_learning_path=context_learning_path,
            use_rag=use_rag,
            session_id=self.session_id,
            chroma_db_path=chroma_db_path,
            manim_docs_path=manim_docs_path,
            embedding_model=embedding_model,
            use_langfuse=use_langfuse,
        )
        self.code_generator = CodeGenerator(
            # Fall back to the planner model when no dedicated scene model is given.
            scene_model=scene_model if scene_model is not None else planner_model,
            output_dir=output_dir,
            print_response=verbose,
            use_rag=use_rag,
            use_context_learning=use_context_learning,
            context_learning_path=context_learning_path,
            chroma_db_path=chroma_db_path,
            manim_docs_path=manim_docs_path,
            embedding_model=embedding_model,
            use_visual_fix_code=use_visual_fix_code,
            use_langfuse=use_langfuse,
            session_id=self.session_id,
        )
        self.video_renderer = VideoRenderer(
            output_dir=output_dir,
            print_response=verbose,
            use_visual_fix_code=self.use_visual_fix_code,
        )
        print("VideoGenerator initialized successfully.")
def _load_or_create_session_id(self) -> str:
"""Load existing session ID from file or create a new one."""
session_file = os.path.join(self.output_dir, "session_id.txt")
if os.path.exists(session_file):
with open(session_file, "r") as f:
session_id = f.read().strip()
return session_id
session_id = str(uuid.uuid4())
print(f"No existing session ID found. Creating a new one: {session_id}")
os.makedirs(self.output_dir, exist_ok=True)
with open(session_file, "w", encoding='utf-8') as f:
f.write(session_id)
print(f"Saved new session ID to {session_file}")
return session_id
def generate_scene_outline(self, topic: str, description: str, session_id: str) -> str:
"""Generate scene outline using VideoPlanner."""
print(f"[{topic}] ==> Generating scene outline...")
outline = self.planner.generate_scene_outline(topic, description, session_id)
print(f"[{topic}] ==> Scene outline generated successfully.")
return outline
async def generate_scene_implementation_concurrently(
self, topic: str, description: str, plan: str, session_id: str
):
"""Generate scene implementations concurrently using VideoPlanner."""
print(f"[{topic}] ==> Generating scene implementations concurrently...")
implementations = await self.planner.generate_scene_implementation_concurrently(
topic, description, plan, session_id, self.scene_semaphore
)
print(f"[{topic}] ==> All concurrent scene implementations generated.")
return implementations
async def render_video_fix_code(
self,
topic: str,
description: str,
scene_outline: str,
implementation_plans: list,
max_retries=3,
session_id: str = None,
):
"""Render the video for all scenes with code fixing capability."""
print(f"[{topic}] ==> Preparing to render {len(implementation_plans)} scenes...")
file_prefix = topic.lower()
file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
tasks = []
for scene_num, implementation_plan in implementation_plans:
scene_dir = os.path.join(self.output_dir, topic, file_prefix, f"scene{scene_num}")
subplan_dir = os.path.join(scene_dir, "subplans")
os.makedirs(subplan_dir, exist_ok=True)
scene_trace_id_path = os.path.join(subplan_dir, "scene_trace_id.txt")
try:
with open(scene_trace_id_path, "r") as f:
scene_trace_id = f.read().strip()
except FileNotFoundError:
scene_trace_id = str(uuid.uuid4())
with open(scene_trace_id_path, "w", encoding='utf-8') as f:
f.write(scene_trace_id)
proto_tcm_str = ""
proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
if os.path.exists(proto_tcm_path):
with open(proto_tcm_path, "r") as f:
proto_tcm_str = f.read()
task = self.process_scene(
scene_num - 1,
scene_outline,
implementation_plan,
proto_tcm_str,
topic,
description,
max_retries,
file_prefix,
session_id,
scene_trace_id,
)
tasks.append(task)
print(f"[{topic}] Starting concurrent processing of {len(tasks)} scenes...")
await asyncio.gather(*tasks)
print(f"[{topic}] <== All scene processing tasks completed.")
def _save_topic_session_id(self, topic: str, session_id: str):
"""Save session ID for a specific topic."""
file_prefix = topic.lower()
file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
topic_dir = os.path.join(self.output_dir, topic, file_prefix)
os.makedirs(topic_dir, exist_ok=True)
session_file = os.path.join(topic_dir, "session_id.txt")
with open(session_file, 'w', encoding='utf-8') as f:
f.write(session_id)
def _load_topic_session_id(self, topic: str):
"""Load session ID for a specific topic if it exists."""
file_prefix = topic.lower()
file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
session_file = os.path.join(self.output_dir, topic, file_prefix, "session_id.txt")
if os.path.exists(session_file):
with open(session_file, "r") as f:
return f.read().strip()
return None
def cleanup_invalid_success_markers(self, topic: str) -> int:
"""Remove succ_rendered.txt files for scenes that don't actually have rendered videos."""
file_prefix = re.sub(r'[^a-z0-9_]+', '_', topic.lower())
topic_dir = os.path.join(self.output_dir, topic, file_prefix)
if not os.path.exists(topic_dir):
return 0
removed_count = 0
scene_dirs = glob.glob(os.path.join(topic_dir, "scene*"))
for scene_dir in scene_dirs:
if not os.path.isdir(scene_dir):
continue
scene_match = re.search(r'scene(\d+)', os.path.basename(scene_dir))
if not scene_match:
continue
scene_num = scene_match.group(1)
succ_file = os.path.join(scene_dir, "succ_rendered.txt")
if os.path.exists(succ_file):
media_dir = os.path.join(topic_dir, "media", "videos")
video_pattern = os.path.join(media_dir, f"{file_prefix}_scene{scene_num}_v*")
video_folders = glob.glob(video_pattern)
has_video = False
for video_folder in video_folders:
for res_dir in ["1080p60", "720p30", "480p15"]:
video_file = os.path.join(video_folder, res_dir, f"Scene{scene_num}.mp4")
if os.path.exists(video_file):
has_video = True
break
if has_video:
break
if not has_video:
os.remove(succ_file)
removed_count += 1
return removed_count
    def load_implementation_plans(self, topic: str):
        """Load implementation plans for each scene.

        Returns:
            dict: Maps scene number -> plan text, or None for scenes whose
            plan file is missing. Returns {} when no scene outline exists.
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
        scene_outline_path = os.path.join(
            self.output_dir, topic, file_prefix, f"{file_prefix}_scene_outline.txt"
        )
        if not os.path.exists(scene_outline_path):
            return {}
        with open(scene_outline_path, "r") as f:
            scene_outline = f.read()
        scene_outline_content = extract_xml(scene_outline, "SCENE_OUTLINE")
        scene_number = 0
        if scene_outline_content:
            # NOTE(review): r"[^<]" matches every non-'<' character, so this
            # "scene count" is actually the character count of the outline.
            # The pattern looks garbled (a literal scene tag was probably
            # stripped by a sanitizing pass) -- confirm against the original.
            scene_number = len(re.findall(r"[^<]", scene_outline_content))
        implementation_plans = {}
        for i in range(1, scene_number + 1):
            plan_path = os.path.join(
                self.output_dir, topic, file_prefix, f"scene{i}",
                f"{file_prefix}_scene{i}_implementation_plan.txt",
            )
            if os.path.exists(plan_path):
                with open(plan_path, "r") as f:
                    implementation_plans[i] = f.read()
            else:
                # Missing plan files are recorded as None so callers can
                # detect and regenerate them.
                implementation_plans[i] = None
        return implementation_plans
    async def _generate_scene_implementation_single(
        self, topic: str, description: str, scene_outline_i: str, i: int,
        file_prefix: str, session_id: str, scene_trace_id: str
    ):
        """Orchestrates the generation of a detailed plan and Proto-TCM for a single scene.

        Returns:
            dict: ``{"plan": str, "proto_tcm": str}``; ``proto_tcm`` is ""
            when no valid Proto-TCM JSON could be extracted.
        """
        full_llm_response_obj = await self.planner._generate_scene_implementation_single(
            topic, description, scene_outline_i, i, file_prefix, session_id, scene_trace_id
        )
        # Preferred path: the planner already returned a structured dict.
        if isinstance(full_llm_response_obj, dict) and "plan" in full_llm_response_obj and "proto_tcm" in full_llm_response_obj:
            plan = full_llm_response_obj["plan"]
            proto_tcm_str = full_llm_response_obj["proto_tcm"]
        else:
            # Fallback: coerce whatever came back (plain string, OpenAI-style
            # completion dict, or anything else) into text, then extract the
            # plan and Proto-TCM sections from it.
            full_llm_response = ""
            if isinstance(full_llm_response_obj, str):
                full_llm_response = full_llm_response_obj
            elif isinstance(full_llm_response_obj, dict):
                try:
                    full_llm_response = full_llm_response_obj["choices"][0]["message"]["content"]
                except (KeyError, IndexError, TypeError):
                    if "content" in full_llm_response_obj:
                        full_llm_response = full_llm_response_obj["content"]
                    else:
                        full_llm_response = str(full_llm_response_obj)
            else:
                full_llm_response = str(full_llm_response_obj)
            plan = extract_xml(full_llm_response, "SCENE_TECHNICAL_IMPLEMENTATION_PLAN")
            # NOTE(review): '"" not in plan' is always False, so this reduces
            # to 'if not plan'. The empty literal looks like a tag string that
            # was stripped by an encoding/sanitizing pass -- confirm against
            # the original source.
            if not plan or "" not in plan:
                plan = full_llm_response
            proto_tcm_str = extract_xml(full_llm_response, "SCENE_PROTO_TCM")
        scene_dir = os.path.join(self.output_dir, topic, file_prefix, f"scene{i}")
        os.makedirs(scene_dir, exist_ok=True)
        # NOTE(review): '"" not in proto_tcm_str' is always False for any
        # string, making this branch unreachable as written -- likely the same
        # stripped-tag garbling as above; verify against the original.
        if proto_tcm_str and "" not in proto_tcm_str:
            try:
                proto_tcm_data = json.loads(proto_tcm_str)
                proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
                with open(proto_tcm_path, "w", encoding="utf-8") as f:
                    json.dump(proto_tcm_data, f, indent=2, ensure_ascii=False)
            except json.JSONDecodeError:
                # Invalid JSON Proto-TCM is treated as absent.
                proto_tcm_str = ""
        else:
            proto_tcm_str = ""
        plan_path = os.path.join(scene_dir, f"{file_prefix}_scene{i}_implementation_plan.txt")
        with open(plan_path, "w", encoding="utf-8") as f:
            f.write(plan)
        return {"plan": plan, "proto_tcm": proto_tcm_str}
    async def generate_video_pipeline(
        self, topic: str, description: str, max_retries: int,
        only_plan: bool = False, specific_scenes: list = None, progress_callback=None
    ):
        """Modified pipeline to handle partial scene completions.

        Runs outline generation, per-scene implementation planning and scene
        rendering, skipping work whose artifacts already exist on disk.

        Args:
            topic: Topic name (also drives output paths).
            description: Topic description passed to the planner.
            max_retries: Max render/fix attempts per scene.
            only_plan: Stop after implementation plans are written.
            specific_scenes: When given, restrict planning/rendering to these
                scene numbers.
            progress_callback: Optional ``(fraction, message)`` callable for
                UI progress updates.
        """
        # Create a simple args object for compatibility
        class Args:
            only_render = False
        args = Args()
        # Reuse the topic-level session id when present; otherwise fall back
        # to the generator-wide id and persist it for this topic.
        topic_folder_session_file = os.path.join(self.output_dir, topic, "session_id.txt")
        if os.path.exists(topic_folder_session_file):
            with open(topic_folder_session_file, "r") as f:
                session_id = f.read().strip()
        else:
            session_id = self._load_or_create_session_id()
            os.makedirs(os.path.join(self.output_dir, topic), exist_ok=True)
            with open(topic_folder_session_file, "w", encoding='utf-8') as f:
                f.write(session_id)
        self._save_topic_session_id(topic, session_id)
        file_prefix = topic.lower()
        file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
        scene_outline_path = os.path.join(
            self.output_dir, topic, file_prefix, f"{file_prefix}_scene_outline.txt"
        )
        if progress_callback:
            # NOTE(review): several progress strings in this method are
            # mojibake'd (e.g. "š", "ā") and some are split mid-literal
            # across lines, which is not valid Python as shown -- restore the
            # original emoji text from source control.
            progress_callback(0.05, "š Planning your video structure...")
        # Reuse a cached outline when available; otherwise ask the planner and
        # coerce its response (string or completion dict) into text.
        if os.path.exists(scene_outline_path):
            with open(scene_outline_path, "r") as f:
                scene_outline = f.read()
        else:
            scene_outline_obj = self.planner.generate_scene_outline(topic, description, session_id)
            scene_outline = ""
            if isinstance(scene_outline_obj, str):
                scene_outline = scene_outline_obj
            elif isinstance(scene_outline_obj, dict):
                try:
                    scene_outline = scene_outline_obj["choices"][0]["message"]["content"]
                except (KeyError, IndexError, TypeError):
                    if "content" in scene_outline_obj:
                        scene_outline = scene_outline_obj["content"]
                    else:
                        scene_outline = str(scene_outline_obj)
            else:
                scene_outline = str(scene_outline_obj)
            # NOTE(review): '"" not in scene_outline' is always False; the
            # empty literal was probably a required tag stripped by a
            # sanitizing pass -- confirm against the original source.
            if not scene_outline or "" not in scene_outline:
                raise ValueError(f"[{topic}] FAILED to generate a valid scene outline.")
            os.makedirs(os.path.join(self.output_dir, topic, file_prefix), exist_ok=True)
            with open(scene_outline_path, "w", encoding="utf-8") as f:
                f.write(scene_outline)
        if progress_callback:
            progress_callback(0.15, "ā
            Video structure ready")
        # Drop success markers whose rendered video no longer exists.
        removed = self.cleanup_invalid_success_markers(topic)
        if progress_callback:
            progress_callback(0.20, "šØ Designing each scene...")
        implementation_plans_dict = self.load_implementation_plans(topic)
        scene_outline_content = extract_xml(scene_outline, "SCENE_OUTLINE")
        # NOTE(review): r"[^<]" counts every non-'<' character, not scenes --
        # same garbled pattern as in load_implementation_plans; verify.
        scene_numbers = len(re.findall(r"[^<]", scene_outline_content)) if scene_outline_content else 0
        # Collect scenes that still need an implementation plan (optionally
        # restricted to specific_scenes).
        missing_scenes = []
        for i in range(1, scene_numbers + 1):
            if implementation_plans_dict.get(i) is None and (specific_scenes is None or i in specific_scenes):
                missing_scenes.append(i)
        if missing_scenes:
            for idx, scene_num in enumerate(missing_scenes):
                if progress_callback:
                    plan_progress = 0.20 + (0.15 * (idx / len(missing_scenes)))
                    progress_callback(plan_progress, f"šØ Designing scene {scene_num} of {scene_numbers}...")
                # NOTE(review): f"(.*?)" contains no scene tag, so it matches
                # the empty string at position 0 and group(1) is always "".
                # Presumably the f-string originally contained per-scene tags
                # that were stripped -- confirm against the original source.
                scene_match = re.search(f"(.*?)", scene_outline_content, re.DOTALL)
                if scene_match:
                    scene_outline_i = scene_match.group(1)
                    scene_trace_id = str(uuid.uuid4())
                    implementation_details = await self._generate_scene_implementation_single(
                        topic, description, scene_outline_i, scene_num, file_prefix, session_id, scene_trace_id
                    )
                    implementation_plans_dict[scene_num] = implementation_details["plan"]
        if progress_callback:
            progress_callback(0.35, "ā
            All scenes designed")
        if only_plan:
            return
        if progress_callback:
            progress_callback(0.40, "š¬ Creating animations...")
        sorted_scene_numbers = sorted(implementation_plans_dict.keys())
        processed_count = 0
        total_scenes_to_process = len([s for s in sorted_scene_numbers if not specific_scenes or s in specific_scenes])
        for scene_num in sorted_scene_numbers:
            if specific_scenes and scene_num not in specific_scenes:
                continue
            scene_dir = os.path.join(self.output_dir, topic, file_prefix, f"scene{scene_num}")
            is_rendered = os.path.exists(os.path.join(scene_dir, "succ_rendered.txt"))
            # Skip scenes that already rendered, unless re-render was forced.
            if is_rendered and not args.only_render:
                continue
            implementation_plan = implementation_plans_dict.get(scene_num)
            if not implementation_plan:
                continue
            # Optional Proto-TCM written during planning.
            proto_tcm_str = ""
            proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
            if os.path.exists(proto_tcm_path):
                with open(proto_tcm_path, "r") as f:
                    proto_tcm_str = f.read()
            # Reuse or create the per-scene trace id so retries correlate.
            scene_trace_id_path = os.path.join(scene_dir, "subplans", "scene_trace_id.txt")
            if os.path.exists(scene_trace_id_path):
                with open(scene_trace_id_path, "r") as f:
                    scene_trace_id = f.read().strip()
            else:
                os.makedirs(os.path.dirname(scene_trace_id_path), exist_ok=True)
                scene_trace_id = str(uuid.uuid4())
                with open(scene_trace_id_path, "w", encoding='utf-8') as f:
                    f.write(scene_trace_id)
            if progress_callback:
                # Rendering occupies the 0.40 - 0.90 progress band.
                scene_progress = 0.40 + (0.50 * (processed_count / total_scenes_to_process))
                progress_callback(scene_progress, f"š¬ Animating scene {scene_num} of {scene_numbers}...")
            await self.process_scene(
                i=scene_num - 1, scene_outline=scene_outline, scene_implementation=implementation_plan,
                proto_tcm=proto_tcm_str, topic=topic, description=description, max_retries=max_retries,
                file_prefix=file_prefix, session_id=session_id, scene_trace_id=scene_trace_id,
            )
            processed_count += 1
            if progress_callback:
                scene_progress = 0.40 + (0.50 * (processed_count / total_scenes_to_process))
                progress_callback(scene_progress, f"ā
                Scene {scene_num} done!")
        if progress_callback:
            progress_callback(0.90, "ā
            All animations complete!")
    async def process_scene(
        self, i: int, scene_outline: str, scene_implementation: str, proto_tcm: str,
        topic: str, description: str, max_retries: int, file_prefix: str,
        session_id: str, scene_trace_id: str,
    ):
        """Process a single scene using CodeGenerator and VideoRenderer.

        Generates Manim code for scene ``i + 1``, renders it, and on render
        failure loops through up to ``max_retries`` fix-and-re-render
        attempts. Scenes that never render successfully are appended to
        ``self.failed_scenes``.
        """
        curr_scene = i + 1  # scene numbers are 1-based on disk
        curr_version = 0  # bumped once per fix attempt; appears in filenames
        rag_queries_cache = {}  # shared between generate and fix calls
        code_dir = os.path.join(
            self.output_dir, topic, file_prefix, f"scene{curr_scene}", "code"
        )
        os.makedirs(code_dir, exist_ok=True)
        media_dir = os.path.join(self.output_dir, topic, file_prefix, "media")
        # Bound overall concurrency: at most max_scene_concurrency scenes run
        # inside this block at once.
        async with self.scene_semaphore:
            print(f"Scene {curr_scene} ---> Generating animation code...")
            code, log = self.code_generator.generate_manim_code(
                topic=topic, description=description, scene_outline=scene_outline,
                scene_implementation=scene_implementation, proto_tcm=proto_tcm,
                scene_number=curr_scene, additional_context=[
                    _prompt_manim_cheatsheet, _code_font_size, _code_limit, _code_disable,
                ], scene_trace_id=scene_trace_id, session_id=session_id,
                rag_queries_cache=rag_queries_cache,
            )
            # Persist the initial code and its generation log (version 0).
            log_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_init_log.txt")
            code_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
            with open(log_path, "w", encoding="utf-8") as f:
                f.write(log)
            with open(code_path, "w", encoding="utf-8") as f:
                f.write(code)
            # NOTE(review): several status strings below are mojibake'd and
            # some are split mid-literal across lines (not valid Python as
            # shown) -- restore the original emoji text from source control.
            print(f"Scene {curr_scene} ā
            Animation code generated.")
            print(f"Scene {curr_scene} ---> Rendering animation...")
            error_message = None
            # Render/fix loop: each iteration renders the current code and, on
            # failure, asks the code generator for a fixed version.
            while curr_version < max_retries:
                code, error_message = await self.video_renderer.render_scene(
                    code=code, file_prefix=file_prefix, curr_scene=curr_scene,
                    curr_version=curr_version, code_dir=code_dir, media_dir=media_dir,
                    max_retries=max_retries, use_visual_fix_code=self.use_visual_fix_code,
                    visual_self_reflection_func=self.code_generator.visual_self_reflection,
                    banned_reasonings=self.banned_reasonings, scene_trace_id=scene_trace_id,
                    topic=topic, session_id=session_id,
                )
                if error_message is None:
                    print(f"Scene {curr_scene} ā
                    Animation rendered successfully.")
                    break
                curr_version += 1
                if curr_version >= max_retries:
                    # Out of retries: record the failure for later reporting.
                    print(f"Scene {curr_scene} ā ļø Failed to render after {max_retries} attempts. Check logs for details.")
                    self.failed_scenes.append({
                        'topic': topic, 'scene': curr_scene,
                        'last_error': error_message, 'total_attempts': curr_version + 1,
                    })
                    break
                print(f"Scene {curr_scene} ---> Fixing code issues (attempt {curr_version + 1}/{max_retries})...")
                print(f"Scene {curr_scene} ---> Analyzing error and generating fix...")
                code, log = self.code_generator.fix_code_errors(
                    implementation_plan=scene_implementation, proto_tcm=proto_tcm,
                    code=code, error=error_message, scene_trace_id=scene_trace_id,
                    topic=topic, scene_number=curr_scene, session_id=session_id,
                    rag_queries_cache=rag_queries_cache,
                )
                print(f"Scene {curr_scene} ---> Fixed code generated, saving...")
                # Persist the fixed code and fix log under the new version.
                log_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_fix_log.txt")
                code_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
                with open(log_path, "w", encoding="utf-8") as f:
                    f.write(log)
                with open(code_path, "w", encoding="utf-8") as f:
                    f.write(code)
                print(f"Scene {curr_scene} ---> Re-rendering with fixed code...")
# --- Initialize Video Generator ---
def get_video_generator(
    planner_name,
    scene_concurrency,
):
    """Build a VideoGenerator whose planner and scene models share one config.

    Two separate LiteLLMWrapper instances are created from the same model
    name: one for planning, one for scene code generation.
    """
    llm_kwargs = {
        "temperature": 0.7,
        "print_cost": True,
    }
    planner_model = LiteLLMWrapper(model_name=planner_name, **llm_kwargs)
    scene_model = LiteLLMWrapper(model_name=planner_name, **llm_kwargs)
    return VideoGenerator(
        planner_model=planner_model,
        scene_model=scene_model,
        output_dir="output",
        max_scene_concurrency=scene_concurrency,
    )
# Construct the shared generator instance at import time; UI handlers must
# check for None before using it.
try:
    video_generator = get_video_generator(
        app_state["planner_model_name"], app_state["max_scene_concurrency"]
    )
    # Bind helper functions (defined elsewhere in this file) onto the
    # instance so UI code can call them as methods.
    video_generator.check_theorem_status = MethodType(
        check_status, video_generator
    )
    video_generator.set_active_output_dir = MethodType(
        set_active_output_dir, video_generator
    )
except Exception as e:
    # Broad catch is deliberate at this boundary: the UI should still come up
    # (with generation disabled) if model initialization fails.
    print(f"Failed to initialize VideoGenerator: {e}")
    video_generator = None
# --- UI Definition ---
# Serve the generation output directory over HTTP so videos/subtitles can be
# loaded by the browser under /output/...
app.add_static_files("/output", "output")
# --- Streamlit-Inspired Color Scheme ---
# Central palette consumed by ui.colors() inside main_page().
THEME_COLORS = {
    "primary": "#FF4B4B",  # Streamlit's signature red
    "secondary": "#4A4A4A",  # Dark gray for text and secondary elements
    "accent": "#00A2FF",  # A bright blue for accents
    "positive": "#28A745",  # Success green
    "negative": "#DC3545",  # Error red
    "info": "#17A2B8",  # Informational teal
    "warning": "#FFC107",  # Warning yellow
}
@ui.page("/")
async def main_page():
# --- Page Configuration ---
ui.colors(
primary=THEME_COLORS["primary"],
secondary=THEME_COLORS["secondary"],
accent=THEME_COLORS["accent"],
positive=THEME_COLORS["positive"],
negative=THEME_COLORS["negative"],
info=THEME_COLORS["info"],
warning=THEME_COLORS["warning"],
)
ui.add_head_html(
''
)
ui.add_body_html('''
''')
# Add custom CSS for chat messages
ui.add_head_html('''
''')
dark_mode = ui.dark_mode(value=False)
# Add comprehensive JavaScript to enforce dark mode
ui.add_body_html('''
''')
# --- Modern Header with Enhanced Styling ---
with ui.header().classes(
"bg-white dark:bg-gray-900 text-gray-900 dark:text-white shadow-sm"
).style("height: 64px; padding: 0 24px; display: flex; align-items: center; backdrop-filter: blur(10px);"):
with ui.row().classes("w-full items-center justify-between").style("height: 100%;"):
with ui.row(align_items="center").classes("gap-3"):
# Enhanced menu button with hover effect
menu_btn = ui.button(
icon="menu",
on_click=lambda: left_drawer.toggle()
).props("flat round").classes(
"text-gray-600 dark:text-gray-300 hover:bg-gray-100 dark:hover:bg-gray-800"
).tooltip("Toggle sidebar")
# Animated brand section
with ui.row(align_items="center").classes("gap-2"):
ui.icon("movie", size="md").classes("text-primary")
ui.label("AlgoVision").classes("text-xl font-bold tracking-tight text-gray-900 dark:text-white")
with ui.row(align_items="center").classes("gap-4"):
ui.label("Educational Video Generator").classes(
"text-sm text-gray-500 dark:text-gray-400 hidden md:block font-medium"
)
# Enhanced dark mode toggle with icon transition
dark_mode_btn = ui.button(
icon="light_mode" if dark_mode.value else "dark_mode",
on_click=lambda: (dark_mode.toggle(), dark_mode_btn.props(f"icon={'dark_mode' if dark_mode.value else 'light_mode'}"))
).props("flat round").classes(
"text-gray-600 dark:text-gray-300 hover:bg-gray-100 dark:hover:bg-gray-800"
).tooltip("Toggle dark mode")
# --- Modern Sidebar (Left Drawer) with Enhanced UX ---
left_drawer = ui.left_drawer(value=True, fixed=True, bordered=True).classes(
"bg-gradient-to-b from-gray-50 to-white dark:from-gray-900 dark:to-gray-800"
)
with left_drawer:
# Sidebar Header with branding and primary accent
with ui.row().classes("w-full items-center justify-center px-4 py-3 border-b-2 border-primary/30 dark:border-primary/40").style("flex-shrink: 0; backdrop-filter: blur(10px);"):
with ui.column().classes("gap-1 items-center text-center"):
with ui.row().classes("items-center gap-2"):
ui.icon("settings", size="md").classes("text-primary")
ui.label("Settings").classes("text-2xl font-bold text-gray-900 dark:text-white")
ui.label("Configure your workspace").classes("text-xs text-primary/70 dark:text-primary/60 font-medium")
# Scrollable content area with custom scrollbar - fixed height to enable proper scrolling
with ui.column().classes("w-full gap-3 px-4 pt-4 pb-2 overflow-y-auto overflow-x-hidden custom-scrollbar").style("flex: 1 1 auto; min-height: 0; max-height: 100%;"):
# Model Configuration Section with primary accent
with ui.expansion(
"Model Configuration",
icon="psychology",
value=True
).classes(
"w-full bg-white dark:bg-gray-800 rounded-lg shadow-sm "
"hover:shadow-md border border-gray-200 dark:border-gray-700"
).props("dense duration=600"):
ui.separator().classes("bg-gradient-to-r from-primary/30 via-primary/10 to-transparent").style("margin: 0; height: 1px;")
with ui.column().classes("w-full gap-3 px-3 pt-2 pb-3"):
with ui.row().classes("items-center gap-2 mb-2"):
ui.icon("smart_toy", size="sm").classes("text-primary")
ui.label("AI Model").classes("text-sm font-semibold text-primary dark:text-primary")
def prettify_model_name(model_id: str) -> str:
name_part = model_id.split("/")[-1].replace("-", " ").replace("_", " ")
return " ".join(word.capitalize() for word in name_part.split())
model_display_map = {
model_id: prettify_model_name(model_id) for model_id in allowed_models
}
default_model = app_state.get("planner_model_name") or next(
iter(model_display_map)
)
model_select = ui.select(
model_display_map,
label="Select LLM Model",
value=default_model,
on_change=lambda e: app_state.update({"planner_model_name": e.value}),
).props("outlined dense").classes("w-full")
# Model info badge
with ui.row().classes("items-center gap-2 mt-1"):
ui.icon("info", size="xs").classes("text-blue-500")
ui.label("Powers system logic & output").classes("text-xs text-gray-500 dark:text-gray-400")
# TTS Configuration Section with primary accent on hover
with ui.expansion(
"Voice Configuration",
icon="record_voice_over",
value=True
).classes(
"w-full bg-white dark:bg-gray-800 rounded-lg shadow-sm "
"hover:shadow-md border border-gray-200 dark:border-gray-700"
).props("dense duration=600"):
ui.separator().classes("bg-gradient-to-r from-primary/30 via-primary/10 to-transparent").style("margin: 0; height: 1px;")
with ui.column().classes("w-full gap-3 px-3 pt-2 pb-3"):
with ui.row().classes("items-center gap-2 mb-2"):
ui.icon("mic", size="sm").classes("text-primary")
ui.label("Voice Selection").classes("text-sm font-semibold text-primary dark:text-primary")
voices = load_voices("src/tts/voices.json")
if voices:
voice_options = {v["id"]: v["name"] for v in voices}
first_voice_id = next(iter(voice_options))
ui.select(
voice_options,
label="Choose Voice",
value=first_voice_id
).props("outlined dense").classes("w-full")
with ui.row().classes("items-center gap-1 mt-1"):
ui.icon("info", size="xs").classes("text-blue-500")
ui.label(f"{len(voices)} voices available").classes("text-xs text-gray-500 dark:text-gray-400")
else:
with ui.card().classes("w-full bg-orange-50 dark:bg-orange-900/20 border border-orange-200 dark:border-orange-800"):
with ui.row().classes("items-center gap-2 p-2"):
ui.icon("warning", size="sm").classes("text-orange-500")
ui.label("No voices found").classes("text-sm text-orange-700 dark:text-orange-300")
# Pipeline Configuration Section with primary accent on hover
with ui.expansion(
"Pipeline Settings",
icon="tune",
value=True
).props("dense duration=600").classes(
"w-full bg-white dark:bg-gray-800 rounded-lg shadow-sm "
"hover:shadow-md border border-gray-200 dark:border-gray-700"
).props("dense"):
ui.separator().classes("bg-gradient-to-r from-primary/30 via-primary/10 to-transparent").style("margin: 0; height: 1px;")
# Max retries is now fixed at 5 (no UI control needed)
# Sidebar Footer with quick actions and primary accent - fixed at bottom
with ui.row().classes("w-full items-center justify-between px-4 py-2 border-t-2 border-primary/30 dark:border-primary/40").style("flex-shrink: 0; position: sticky; bottom: 0; z-index: 10; backdrop-filter: blur(10px); background: linear-gradient(0deg, rgba(255,75,75,0.03) 0%, transparent 100%);"):
ui.label("v1.0.0").classes("text-xs text-gray-400")
with ui.row().classes("gap-1"):
ui.button(icon="help_outline", on_click=lambda: ui.notify("Documentation coming soon!", type="info")).props("flat round dense").classes("text-primary hover:bg-primary/10").tooltip("Help")
ui.button(icon="bug_report", on_click=lambda: ui.notify("Report issues on GitHub", type="info")).props("flat round dense").classes("text-primary hover:bg-primary/10").tooltip("Report Bug")
# --- ALL HANDLER FUNCTIONS DEFINED FIRST (will be defined after UI elements) ---
def delete_project(topic_name):
"""Delete a project and all its files."""
import shutil
async def confirm_delete():
dialog.close()
try:
project_path = get_project_path("output", topic_name)
# Get the parent directory (the topic folder)
topic_folder = os.path.dirname(project_path)
# Delete the entire topic folder
if os.path.exists(topic_folder):
shutil.rmtree(topic_folder)
ui.notify(f"Project '{topic_name}' deleted successfully", color="positive", icon="delete")
# Clear chat history for this topic
if topic_name in app_state["chat_histories"]:
del app_state["chat_histories"][topic_name]
# Clear selected topic if it was deleted
if app_state.get("selected_topic") == topic_name:
app_state["selected_topic"] = None
app_state["current_topic_inspector"] = None
# Refresh dashboard and inspector
await update_dashboard()
await update_inspector(None)
else:
ui.notify(f"Project folder not found: {topic_folder}", color="warning")
except Exception as e:
ui.notify(f"Error deleting project: {e}", color="negative", multi_line=True)
# Create confirmation dialog
with ui.dialog() as dialog, ui.card().classes("p-6"):
with ui.column().classes("gap-4"):
with ui.row().classes("items-center gap-3"):
ui.icon("warning", size="lg").classes("text-orange-500")
ui.label("Delete Project?").classes("text-xl font-bold")
ui.label(f"Are you sure you want to delete '{topic_name}'?").classes("text-gray-700 dark:text-gray-300")
ui.label("This will permanently delete all files, videos, and data for this project.").classes("text-sm text-gray-600 dark:text-gray-400")
ui.label("This action cannot be undone.").classes("text-sm font-semibold text-red-600 dark:text-red-400")
with ui.row().classes("w-full justify-end gap-2 mt-2"):
ui.button("Cancel", on_click=dialog.close).props("flat")
ui.button("Delete", on_click=confirm_delete).props("unelevated").classes("bg-red-600 text-white")
dialog.open()
async def update_dashboard():
dashboard_content.clear()
if not video_generator:
with dashboard_content:
ui.label("Generator not initialized.").classes("text-negative")
return
topic_folders = get_topic_folders("output")
if not topic_folders:
with dashboard_content:
# Empty state
with ui.card().classes("w-full").style("padding: 60px; text-align: center;"):
ui.icon("folder_open", size="xl").classes("text-gray-400")
ui.label("No Projects Yet").classes("text-2xl font-semibold text-gray-700 dark:text-gray-300 mt-4")
ui.label("Create your first educational video to get started").classes("text-gray-500 dark:text-gray-400 mt-2")
ui.button("Create Project", icon="add", on_click=lambda: main_tabs.set_value("⨠Generate")).props("unelevated no-caps").classes("mt-4")
return
with dashboard_content:
all_statuses = [
video_generator.check_theorem_status({"theorem": th})
for th in topic_folders
]
# Stats Cards
with ui.grid(columns=3).classes("w-full gap-4 mb-6"):
# Total Projects
with ui.card().classes("w-full").style("padding: 24px;"):
with ui.row().classes("w-full items-center justify-between"):
with ui.column().classes("gap-1"):
ui.label("Total Projects").classes("text-sm font-medium text-gray-600 dark:text-gray-400")
ui.label(str(len(all_statuses))).classes("text-3xl font-bold text-gray-900 dark:text-white")
ui.icon("folder", size="lg").classes("text-primary opacity-80")
# Scenes Progress
with ui.card().classes("w-full").style("padding: 24px;"):
with ui.row().classes("w-full items-center justify-between"):
with ui.column().classes("gap-1"):
ui.label("Scenes Rendered").classes("text-sm font-medium text-gray-600 dark:text-gray-400")
total_scenes = sum(s["total_scenes"] for s in all_statuses)
total_renders = sum(s["rendered_scenes"] for s in all_statuses)
ui.label(f"{total_renders}/{total_scenes}").classes("text-3xl font-bold text-gray-900 dark:text-white")
ui.icon("movie_filter", size="lg").classes("text-blue-500 opacity-80")
# Completed Videos
with ui.card().classes("w-full").style("padding: 24px;"):
with ui.row().classes("w-full items-center justify-between"):
with ui.column().classes("gap-1"):
ui.label("Completed").classes("text-sm font-medium text-gray-600 dark:text-gray-400")
total_combined = sum(1 for s in all_statuses if s["has_combined_video"])
ui.label(f"{total_combined}/{len(all_statuses)}").classes("text-3xl font-bold text-gray-900 dark:text-white")
ui.icon("check_circle", size="lg").classes("text-green-500 opacity-80")
# Projects List Header with View Toggle
if "dashboard_view" not in app_state:
app_state["dashboard_view"] = "list"
view_mode = app_state["dashboard_view"]
with ui.row().classes("w-full items-center justify-between mb-3"):
ui.label("Your Projects").classes("text-lg font-semibold text-gray-900 dark:text-white")
async def toggle_view():
    """Flip the dashboard between list and grid layouts, then redraw.

    Reads the mode captured at render time (`view_mode`) and stores the
    opposite one before triggering a full dashboard refresh.
    """
    # "list" -> "grid"; anything else (i.e. "grid") -> "list" — identical
    # mapping to the original conditional expression.
    next_mode = {"list": "grid"}.get(view_mode, "list")
    app_state["dashboard_view"] = next_mode
    await update_dashboard()
ui.button(
icon="grid_view" if view_mode == "list" else "view_list",
on_click=toggle_view
).props("outline dense").tooltip("Toggle Grid/List View")
# Render projects based on view mode
if view_mode == "grid":
# Grid View
with ui.grid(columns=2).classes("w-full gap-4"):
for status in sorted(all_statuses, key=lambda x: x["topic"]):
with ui.card().classes("w-full hover:shadow-lg transition-shadow").style("padding: 20px;"):
with ui.column().classes("w-full gap-3"):
# Header
with ui.row().classes("w-full items-start justify-between"):
ui.icon("video_library", size="lg").classes("text-primary")
with ui.row().classes("items-center gap-1"):
ui.badge(
"ā" if status["has_combined_video"] else "...",
color="positive" if status["has_combined_video"] else "orange"
).props("rounded")
ui.button(
icon="delete",
on_click=lambda s=status["topic"]: delete_project(s)
).props("flat round dense").classes("text-red-500").tooltip("Delete project")
# Title
ui.label(status["topic"]).classes("text-lg font-semibold text-gray-900 dark:text-white")
# Stats
progress_pct = int((status["rendered_scenes"] / status["total_scenes"]) * 100) if status["total_scenes"] > 0 else 0
with ui.column().classes("w-full gap-1"):
ui.label(f"{status['rendered_scenes']}/{status['total_scenes']} scenes").classes("text-sm text-gray-600 dark:text-gray-400")
ui.linear_progress(progress_pct / 100, show_value=False).props('rounded color="primary"').style("height: 6px;")
# Action
ui.button(
"View Details",
icon="arrow_forward",
on_click=lambda s=status["topic"]: asyncio.create_task(inspect_project(s))
).props("flat no-caps dense").classes("w-full text-primary")
else:
# List View
for status in sorted(all_statuses, key=lambda x: x["topic"]):
with ui.card().classes("w-full hover:shadow-lg transition-shadow").style("padding: 20px;"):
with ui.row().classes("w-full items-center justify-between mb-3"):
with ui.row().classes("items-center gap-3"):
ui.icon("video_library", size="md").classes("text-primary")
ui.label(status["topic"]).classes("text-lg font-semibold text-gray-900 dark:text-white")
with ui.row().classes("items-center gap-2"):
ui.badge(
"Completed" if status["has_combined_video"] else "In Progress",
color="positive" if status["has_combined_video"] else "orange"
).props("rounded")
ui.button(
icon="delete",
on_click=lambda s=status["topic"]: delete_project(s)
).props("flat round dense").classes("text-red-500").tooltip("Delete project")
# Progress bar
progress_pct = int((status["rendered_scenes"] / status["total_scenes"]) * 100) if status["total_scenes"] > 0 else 0
with ui.column().classes("w-full gap-1"):
with ui.row().classes("w-full items-center justify-between"):
ui.label(f"{status['rendered_scenes']} of {status['total_scenes']} scenes").classes("text-sm text-gray-600 dark:text-gray-400")
ui.label(f"{progress_pct}%").classes("text-sm font-medium text-gray-700 dark:text-gray-300")
ui.linear_progress(progress_pct / 100, show_value=False).props('rounded color="primary"').style("height: 8px;")
# Action button
ui.button(
"View Details",
icon="arrow_forward",
on_click=lambda s=status: asyncio.create_task(inspect_project(s["topic"]))
).props("flat no-caps dense").classes("mt-2 text-primary")
async def update_inspector(topic):
inspector_content.clear()
if not topic:
with inspector_content:
# Empty state with better design
with ui.column().classes("w-full items-center justify-center py-20"):
ui.icon("video_library", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4")
ui.label("No Project Selected").classes("text-2xl font-semibold text-gray-400 dark:text-gray-500")
ui.label("Select a project from the dropdown above to view details").classes("text-gray-400 dark:text-gray-500 mt-2")
return
app_state["current_topic_inspector"] = topic
with inspector_content:
project_path = get_project_path("output", topic)
inner_folder_name = os.path.basename(project_path)
scene_dirs = sorted(
[
d
for d in os.listdir(project_path)
if d.startswith("scene")
and os.path.isdir(os.path.join(project_path, d))
],
key=lambda x: int(x.replace("scene", "")),
)
# Modern Project Header Card
with ui.card().classes("w-full mb-6 shadow-lg border-l-4 border-primary"):
with ui.row().classes("w-full items-center justify-between p-4"):
with ui.column().classes("gap-1"):
ui.label(topic).classes("text-2xl font-bold text-gray-900 dark:text-white")
with ui.row().classes("gap-4 items-center mt-2"):
with ui.row().classes("items-center gap-1"):
ui.icon("movie", size="sm").classes("text-gray-500")
ui.label(f"{len(scene_dirs)} Scenes").classes("text-sm text-gray-600 dark:text-gray-400")
# Check if video exists
video_path = os.path.join(project_path, f"{inner_folder_name}_combined.mp4")
if os.path.exists(video_path):
video_size = os.path.getsize(video_path) / (1024 * 1024) # MB
with ui.row().classes("items-center gap-1"):
ui.icon("check_circle", size="sm").classes("text-green-500")
ui.label(f"Video Ready ({video_size:.1f} MB)").classes("text-sm text-gray-600 dark:text-gray-400")
else:
with ui.row().classes("items-center gap-1"):
ui.icon("pending", size="sm").classes("text-orange-500")
ui.label("Video Pending").classes("text-sm text-gray-600 dark:text-gray-400")
# Quick actions
with ui.row().classes("gap-2"):
is_editing = app_state.get("inspector_edit_mode", False)
def handle_approve():
    """Approve the reviewed script and steer the user back to the
    Generate tab so the full render pipeline can be resumed.

    NOTE(review): mutates widgets from the enclosing UI scope
    (`main_tabs`, `topic_input`, `review_checkbox`) and `app_state`.
    """
    # Turn off edit mode
    app_state["inspector_edit_mode"] = False
    # Switch to generate tab (label looks mojibake-garbled — presumably an
    # emoji prefix; it must match the tab's actual label — TODO confirm)
    main_tabs.set_value("⨠Generate")
    # Pre-fill inputs to help user resume
    topic_input.value = topic
    # Disable review mode so it runs full pipeline
    review_checkbox.value = False
    ui.notify(f"Script approved for '{topic}'! Click 'Generate Video' to proceed with rendering.", color="positive", icon="check_circle", timeout=5000)
# Edit Toggle
ui.button(
icon="edit_off" if is_editing else "edit",
on_click=lambda: [app_state.update({"inspector_edit_mode": not is_editing}), asyncio.create_task(update_inspector(topic))]
).props(f"flat round color={'warning' if is_editing else 'primary'}").tooltip("Toggle Edit Mode" if not is_editing else "Exit Edit Mode")
# Approve & Resume (if video pending)
if not os.path.exists(video_path):
ui.button(
"Approve & Resume",
icon="play_arrow",
on_click=handle_approve
).props("unelevated color=green no-caps").tooltip("Approve plans and proceed to rendering")
ui.button(icon="folder_open", on_click=lambda: os.startfile(project_path)).props("flat round").tooltip("Open in Explorer")
ui.button(icon="refresh", on_click=lambda: asyncio.create_task(update_inspector(topic))).props("flat round").tooltip("Refresh")
with ui.tabs().classes("w-full bg-transparent").props("dense align=left") as inspector_tabs:
t_video = ui.tab("š¬ Player").props("no-caps")
t_plan = ui.tab("š Master Plan").props("no-caps")
scene_tabs_list = [
ui.tab(f"šļø Scene {i+1}").props("no-caps") for i in range(len(scene_dirs))
]
with ui.tab_panels(inspector_tabs, value=t_video).classes(
"w-full bg-transparent mt-6"
):
with ui.tab_panel(t_video):
video_local_path = os.path.join(
project_path, f"{inner_folder_name}_combined.mp4"
)
tcm_path = os.path.join(
project_path, f"{inner_folder_name}_combined_tcm.json"
)
srt_local_path = os.path.join(
project_path, f"{inner_folder_name}_combined.srt"
)
vtt_local_path = os.path.join(
project_path, f"{inner_folder_name}_combined.vtt"
)
# Convert SRT to VTT if needed
if os.path.exists(srt_local_path) and not os.path.exists(vtt_local_path):
with open(srt_local_path, 'r', encoding='utf-8') as f:
srt_content = f.read()
vtt_content = srt_to_vtt(srt_content)
with open(vtt_local_path, 'w', encoding='utf-8') as f:
f.write(vtt_content)
if os.path.exists(video_local_path) and os.path.exists(tcm_path):
video_url = (
f"/{os.path.relpath(video_local_path, '.')}".replace(
"\\", "/"
)
)
# Use VTT file (better browser support)
subtitle_url = None
if os.path.exists(vtt_local_path):
subtitle_url = f"/{os.path.relpath(vtt_local_path, '.')}".replace("\\", "/")
with open(tcm_path, "r", encoding="utf-8") as f:
tcm_data = json.load(f)
# Helper to append one Streamlit-style chat bubble to a container.
def render_chat_message(container, content, is_user=False):
    """Append a Streamlit-style chat message (avatar + markdown body).

    Args:
        container: NiceGUI container element the message is added to.
        content: Markdown text rendered inside the bubble.
        is_user: True for a user message, False for an assistant one.
    """
    # NOTE(review): the avatar strings appear mojibake-garbled in source;
    # reproduced byte-for-byte as found.
    role, avatar_text = ("user", "š¤") if is_user else ("assistant", "š¤")
    with container:
        with ui.element('div').classes(f"stchat-message {role}"):
            with ui.element('div').classes("stchat-avatar"):
                ui.label(avatar_text).classes("text-xl")
            with ui.element('div').classes("stchat-content"):
                ui.markdown(content)
with ui.row().classes(
"w-full no-wrap grid grid-cols-1 lg:grid-cols-5 gap-6"
):
with ui.column().classes("lg:col-span-3 gap-4").style("height: calc(100vh - 200px); min-height: 500px; max-height: 700px; display: flex; flex-direction: column;"):
# Video player card
with ui.card().classes("w-full shadow-2xl overflow-hidden p-0 border-0").style("flex-shrink: 0;"):
# Use HTML video element with SRT subtitles
video_player = None # Initialize for later reference
if subtitle_url and os.path.exists(vtt_local_path):
# Create a dummy element to attach events to
video_player = ui.element('div').classes('hidden')
# Simple video ID
video_id = "video_player"
ui.html(
f'''
''',
sanitize=False
)
# Enable subtitles automatically
def enable_subs():
    """Switch the player's first text track to 'showing' in the browser.

    A <track> element alone does not guarantee captions are visible in
    every browser, so the mode is forced from JavaScript. Called from
    one-shot timers because the element may not exist immediately.
    """
    # Doubled braces escape literal JS braces inside the f-string;
    # {video_id} interpolates the player's DOM id from the enclosing scope.
    ui.run_javascript(f'''
        const video = document.getElementById('{video_id}');
        if (video && video.textTracks.length > 0) {{
            video.textTracks[0].mode = 'showing';
        }}
    ''')
# Try multiple times to ensure subtitles are enabled
ui.timer(0.5, enable_subs, once=True)
ui.timer(1.5, enable_subs, once=True)
else:
video_player = ui.video(video_url).classes(
"w-full"
).style("display: block;")
video_player.props('id="video_player"')
# Quiz Section - matching AI Tutor styling
quiz_state = {
"questions": [],
"current_index": 0,
"show_answer": False,
"quiz_started": False
}
with ui.card().classes("w-full shadow-2xl border-0").style("flex: 1; min-height: 0; display: flex; flex-direction: column; overflow: hidden;"):
# Quiz header - matching AI Tutor header
with ui.row().classes("w-full items-center justify-between px-5 py-3 border-b dark:border-gray-700").style("flex-shrink: 0; background: linear-gradient(135deg, rgba(255, 75, 75, 0.05) 0%, rgba(255, 107, 107, 0.02) 100%);"):
with ui.row().classes("items-center gap-3"):
with ui.avatar(size="md").classes("shadow-md").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%);"):
ui.icon("quiz", size="sm").classes("text-white")
with ui.column().classes("gap-0"):
ui.label("Interactive Quiz").classes("text-lg font-bold text-gray-900 dark:text-white")
ui.label("Test your understanding").classes("text-xs text-gray-500 dark:text-gray-400")
# Quiz content container - matching AI Tutor scrollable area
with ui.column().style("flex: 1; min-height: 0; overflow-y: auto; overflow-x: hidden; width: 100% !important; box-sizing: border-box;").classes("p-4 gap-3"):
# Store quiz settings at this scope
quiz_settings = {"num_questions": 5, "question_type": "Multiple Choice", "difficulty": "Medium"}
# Create a container for dynamic content (this will be cleared)
quiz_content_container = ui.column().classes("w-full gap-3")
# Define helper functions at the proper scope
def show_question():
    """Render the current quiz question (or the completion card) into
    ``quiz_content_container``.

    Reads ``quiz_state`` from the enclosing scope:
    - past the last question -> celebration card with a restart button;
    - ``show_answer`` False  -> clickable option buttons (A/B/C/D);
    - ``show_answer`` True   -> correct answer + explanation + next/finish.
    """
    try:
        quiz_content_container.clear()
        if quiz_state["current_index"] >= len(quiz_state["questions"]):
            # Quiz completed - celebration card
            with quiz_content_container:
                with ui.card().classes("w-full border-0 shadow-2xl").style("background: linear-gradient(135deg, #d1fae5 0%, #a7f3d0 100%); border-left: 4px solid #10b981; padding: 40px;"):
                    with ui.column().classes("items-center gap-4 w-full"):
                        with ui.avatar(size="xl").classes("shadow-lg").style("background: linear-gradient(135deg, #10b981 0%, #059669 100%);"):
                            ui.icon("emoji_events", size="lg").classes("text-white")
                        ui.label("Quiz Completed!").classes("text-2xl font-bold text-green-900")
                        ui.label("Great job reviewing the content!").classes("text-base text-green-800")
                        ui.separator().classes("bg-green-300 w-24 my-2")
                        ui.label(f"You answered {len(quiz_state['questions'])} questions").classes("text-sm text-green-700")
                        # Restart resets state and falls back to the setup form.
                        ui.button("Start New Quiz", on_click=lambda: (
                            quiz_state.update({"quiz_started": False, "current_index": 0, "questions": []}),
                            show_quiz_setup()
                        ), icon="refresh").props("unelevated no-caps").classes("shadow-md mt-3").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 20px; font-size: 0.875rem;")
            return
        q = quiz_state["questions"][quiz_state["current_index"]]
        with quiz_content_container:
            # Progress indicator - modern design
            progress_percent = int(((quiz_state['current_index'] + 1) / len(quiz_state['questions'])) * 100)
            with ui.card().classes("w-full border-0 shadow-sm mb-4").style("background: linear-gradient(135deg, rgba(255, 75, 75, 0.05) 0%, rgba(255, 107, 107, 0.02) 100%); padding: 16px;"):
                with ui.row().classes("w-full items-center justify-between"):
                    with ui.row().classes("items-center gap-2"):
                        ui.icon("quiz", size="sm").classes("text-primary")
                        ui.label(f"Question {quiz_state['current_index'] + 1} of {len(quiz_state['questions'])}").classes("text-sm font-bold text-gray-700 dark:text-gray-300")
                    with ui.row().classes("items-center gap-2"):
                        ui.label(f"{progress_percent}%").classes("text-xs font-semibold text-primary")
                        ui.linear_progress(
                            value=(quiz_state['current_index'] + 1) / len(quiz_state['questions']),
                            show_value=False
                        ).props("color=primary size=8px").classes("w-32")
            # Question card - modern professional style
            with ui.card().classes("w-full border-0 shadow-xl").style("background: #ffffff; border: 1px solid #e6e9ef; padding: 24px;"):
                with ui.column().classes("gap-4 w-full"):
                    # Question text
                    ui.label(q["question"]).classes("text-base font-semibold text-gray-900 dark:text-white mb-2")
                    if not quiz_state["show_answer"]:
                        # Show options - matching AI Tutor suggestion button style
                        ui.separator().classes("my-2")
                        for idx, option in enumerate(q.get("options", [])):
                            # opt=option binds per-iteration value (avoids the
                            # late-binding closure pitfall in the lambda).
                            with ui.button(
                                on_click=lambda opt=option: handle_answer(opt)
                            ).props("outline no-caps").classes("w-full hover:bg-primary/5 mb-2").style("justify-content: flex-start; padding: 12px 16px; border-radius: 8px; border-color: rgba(255, 75, 75, 0.2);"):
                                with ui.row().classes("items-center gap-3 w-full").style("flex-wrap: nowrap;"):
                                    with ui.avatar(size="sm").classes("text-xs font-bold").style(f"background: linear-gradient(135deg, rgba(255, 75, 75, 0.1) 0%, rgba(255, 107, 107, 0.05) 100%); color: #FF4B4B;"):
                                        ui.label(chr(65 + idx)) # A, B, C, D
                                    ui.label(option).classes("text-sm text-gray-700 dark:text-gray-300 flex-grow text-left")
                    else:
                        # Show answer and explanation - modern professional style
                        ui.separator().classes("my-3")
                        with ui.card().classes("w-full border-0 shadow-lg").style("background: linear-gradient(135deg, #d1fae5 0%, #a7f3d0 100%); border-left: 4px solid #10b981; padding: 20px;"):
                            with ui.column().classes("gap-3 w-full"):
                                with ui.row().classes("items-center gap-2 mb-2"):
                                    ui.icon("check_circle", size="md").classes("text-green-700")
                                    ui.label("Correct Answer").classes("font-bold text-base text-green-900")
                                ui.label(q["correct_answer"]).classes("text-base font-semibold text-green-800 ml-8")
                                if q.get("explanation"):
                                    ui.separator().classes("bg-green-300 my-2")
                                    with ui.row().classes("items-start gap-2"):
                                        ui.icon("lightbulb", size="sm").classes("text-green-700 flex-shrink-0").style("margin-top: 2px;")
                                        with ui.column().classes("gap-1 flex-grow"):
                                            ui.label("Explanation").classes("font-bold text-sm text-green-800")
                                            ui.label(q["explanation"]).classes("text-sm text-green-700")
                        # Navigation buttons - modern style
                        # (shown only in the answer-reveal state — TODO confirm,
                        # original indentation was not recoverable)
                        with ui.row().classes("w-full justify-end gap-2 mt-3"):
                            ui.button(
                                "Next Question" if quiz_state["current_index"] < len(quiz_state["questions"]) - 1 else "Finish Quiz",
                                on_click=lambda: (
                                    quiz_state.update({"current_index": quiz_state["current_index"] + 1, "show_answer": False}),
                                    show_question()
                                ),
                                icon="arrow_forward"
                            ).props("unelevated no-caps").classes("shadow-md").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 20px; font-size: 0.875rem;")
    except Exception as e:
        # Best-effort UI callback: log to console rather than crash the page.
        print(f"ERROR in show_question: {e}")
        import traceback
        traceback.print_exc()
def handle_answer(selected):
    """Reveal the answer once the user picks any option.

    The chosen option (*selected*) is not graded — any click flips the
    quiz into its answer-reveal state and re-renders the question.
    """
    quiz_state.update(show_answer=True)
    show_question()
async def generate_quiz_async():
    """Generate quiz questions with the planner LLM and start the quiz.

    Shows a loading spinner, builds a prompt from the topic and the TCM
    (timed-concept-map) entries, asks the model for questions in a strict
    JSON format, parses them into ``quiz_state`` and renders the first
    question. On any failure the quiz area is replaced by an error card
    with a retry button.
    """
    try:
        # Replace whatever is in the quiz area with a loading indicator.
        quiz_content_container.clear()
        with quiz_content_container:
            with ui.card().classes("border-0 shadow-lg").style("width: 100% !important; height: 100%; min-height: 300px; display: flex; align-items: center; justify-content: center; background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%);"):
                with ui.column().classes("items-center justify-center gap-3"):
                    ui.spinner(size="lg", color="primary")
                    ui.label("Generating quiz questions...").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                    ui.label("This may take a moment").classes("text-xs text-gray-500 dark:text-gray-400")
        # Build context from TCM
        context_parts = [f"Video Topic: {topic}\n"]
        if tcm_data:
            context_parts.append("Video Content Summary:")
            for entry in tcm_data[:10]:  # Use first 10 entries for context
                concept = entry.get("conceptName", "")
                narration = entry.get("narrationText", "")
                if concept and narration:
                    # Truncate narration to keep the prompt compact.
                    context_parts.append(f"- {concept}: {narration[:200]}")
        context = "\n".join(context_parts)
        prompt = f"""Based on this educational video content, generate {quiz_settings["num_questions"]} {quiz_settings["question_type"]} quiz questions at {quiz_settings["difficulty"]} difficulty level.
{context}
Generate questions in this EXACT JSON format:
[
{{
"question": "Question text here?",
"type": "multiple_choice",
"options": ["Option A", "Option B", "Option C", "Option D"],
"correct_answer": "Option A",
"explanation": "Brief explanation of why this is correct"
}}
]
For True/False questions, use options: ["True", "False"]
Make questions relevant to the video content and educational."""
        quiz_llm = LiteLLMWrapper(model_name=app_state["planner_model_name"])
        # Run the blocking LLM call in the default executor so the event
        # loop stays responsive. asyncio.get_event_loop() is deprecated
        # inside coroutines; get_running_loop() is the supported form.
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: quiz_llm([{"type": "text", "content": prompt}])
        )
        # Extract the first JSON array from the (possibly chatty) response.
        # json/re are already imported at module level.
        json_match = re.search(r'\[.*\]', response, re.DOTALL)
        if json_match:
            quiz_state["questions"] = json.loads(json_match.group())
            quiz_state["current_index"] = 0
            quiz_state["show_answer"] = False
            quiz_state["quiz_started"] = True
            show_question()
        else:
            raise ValueError("Could not parse quiz questions")
    except Exception as e:
        # Best-effort error surface: swap the quiz area for an error card.
        quiz_content_container.clear()
        with quiz_content_container:
            with ui.card().classes("w-full border-0 shadow-lg").style("background: linear-gradient(135deg, #fee2e2 0%, #fecaca 100%); border-left: 4px solid #ef4444; padding: 24px;"):
                with ui.column().classes("gap-3 w-full"):
                    with ui.row().classes("items-center gap-2"):
                        ui.icon("error_outline", size="md").classes("text-red-600")
                        ui.label("Error Generating Quiz").classes("text-lg font-bold text-red-900")
                    ui.label(f"{str(e)}").classes("text-sm text-red-700 ml-8")
                    ui.button("Try Again", on_click=show_quiz_setup, icon="refresh").props("unelevated no-caps").classes("shadow-md mt-2").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 16px; font-size: 0.875rem;")
def show_quiz_setup():
    """Render the quiz configuration form into ``quiz_content_container``.

    Three inputs (question count, question type, difficulty) write their
    values into the shared ``quiz_settings`` dict on change; the Generate
    button kicks off ``generate_quiz_async``.
    """
    try:
        quiz_content_container.clear()
        with quiz_content_container:
            # Initial quiz setup - matching AI Tutor welcome card
            with ui.card().classes("w-full border-0 shadow-lg").style("background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%); padding: 28px; border-left: 4px solid #FF4B4B;"):
                # Instructions - Row 1
                with ui.row().classes("items-center gap-2 mb-3"):
                    ui.icon("info_outline", size="sm").classes("text-blue-500 flex-shrink-0")
                    ui.label("Configure your quiz settings below and click Generate Quiz to start:").classes("text-xs text-gray-700 dark:text-gray-300 flex-grow")
                # Settings - Row 2: three equal-width columns
                with ui.row().classes("gap-3 mb-3").style("width: 100% !important; display: flex; box-sizing: border-box;"):
                    with ui.column().style("flex: 1; min-width: 0;"):
                        with ui.row().classes("items-center gap-2 mb-1"):
                            ui.icon("numbers", size="sm").classes("text-primary")
                            ui.label("Questions").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                        # Each control writes straight into quiz_settings on change.
                        num_questions = ui.number(value=quiz_settings["num_questions"], min=1, max=10, step=1, on_change=lambda e: quiz_settings.update({"num_questions": int(e.value)})).props("outlined dense").style("width: 100%; border-radius: 8px;")
                    with ui.column().style("flex: 1; min-width: 0;"):
                        with ui.row().classes("items-center gap-2 mb-1"):
                            ui.icon("category", size="sm").classes("text-primary")
                            ui.label("Type").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                        question_type = ui.select(
                            options=["Multiple Choice", "True/False", "Mixed"],
                            value=quiz_settings["question_type"],
                            on_change=lambda e: quiz_settings.update({"question_type": e.value})
                        ).props("outlined dense").style("width: 100%; border-radius: 8px;")
                    with ui.column().style("flex: 1; min-width: 0;"):
                        with ui.row().classes("items-center gap-2 mb-1"):
                            ui.icon("speed", size="sm").classes("text-primary")
                            ui.label("Difficulty").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                        difficulty = ui.select(
                            options=["Easy", "Medium", "Hard"],
                            value=quiz_settings["difficulty"],
                            on_change=lambda e: quiz_settings.update({"difficulty": e.value})
                        ).props("outlined dense").style("width: 100%; border-radius: 8px;")
                # Separator
                ui.separator().classes("my-2")
                # Generate Button
                ui.button(
                    "Generate Quiz",
                    on_click=generate_quiz_async,
                    icon="auto_awesome"
                ).props("unelevated no-caps").classes("shadow-md mt-2 w-full").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 16px; font-size: 0.875rem;")
    except Exception as e:
        # Best-effort UI callback: log to console rather than crash the page.
        print(f"ERROR in show_quiz_setup: {e}")
        import traceback
        traceback.print_exc()
# Show initial setup
show_quiz_setup()
with ui.column().classes("lg:col-span-2"):
# Add Streamlit-style chat CSS
ui.add_head_html("""
""")
with ui.card().classes("w-full shadow-2xl border-0").style("height: calc(100vh - 200px); min-height: 500px; max-height: 700px; display: flex; flex-direction: column; overflow: hidden;"):
# ---------------- HEADER ----------------
with ui.row().classes(
"w-full items-center justify-between px-5 py-3 border-b dark:border-gray-700"
).style("flex-shrink: 0; background: linear-gradient(135deg, rgba(255, 75, 75, 0.05) 0%, rgba(255, 107, 107, 0.02) 100%);"):
with ui.row().classes("items-center gap-3"):
with ui.avatar(size="md").classes("shadow-md").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%);"):
ui.icon("psychology", size="sm").classes("text-white")
with ui.column().classes("gap-0"):
ui.label("AI Tutor").classes(
"text-lg font-bold text-gray-900 dark:text-white"
)
ui.label("Your intelligent learning companion").classes(
"text-xs text-gray-500 dark:text-gray-400"
)
def clear_chat():
    """Reset the AI-tutor conversation for the current topic.

    Empties both the persisted history list and the on-screen container,
    then rebuilds the welcome card with suggestion buttons.

    NOTE(review): this welcome-card markup duplicates the empty-history
    branch rendered when the chat is first loaded; keep the two in sync.
    """
    chat_history.clear()
    chat_container.clear()
    # Recreate the welcome card with suggestions
    with chat_container:
        with ui.card().classes(
            "w-full border-0 shadow-lg"
        ).style("background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%); padding: 28px; border-left: 4px solid #FF4B4B;"):
            # Instructions
            with ui.row().classes("items-start gap-2 mb-3").style("flex-wrap: nowrap;"):
                ui.icon("info_outline", size="sm").classes("text-blue-500 flex-shrink-0").style("margin-top: 2px;")
                ui.label(
                    "Pause the video at any moment to ask questions. Try these suggestions:"
                ).classes("text-xs text-gray-700 dark:text-gray-300 flex-grow")
            # Suggestion buttons
            suggestions = [
                ("help_outline", "Explain this in simpler terms"),
                ("lightbulb", "Give me a real-world example"),
                ("list", "Break down this step-by-step"),
                ("star", "Why is this concept important?"),
            ]
            for icon, suggestion in suggestions:
                # s=suggestion binds the per-iteration value (avoids the
                # late-binding closure pitfall); clicking copies the text
                # into the chat input and focuses it.
                with ui.button(
                    on_click=lambda s=suggestion: (
                        setattr(chat_input, 'value', s),
                        chat_input.run_method("focus")
                    )
                ).props("outline no-caps").classes(
                    "w-full mb-2 hover:bg-primary/5"
                ).style("justify-content: flex-start; padding: 12px 16px; border-radius: 8px; border-color: rgba(255, 75, 75, 0.2);"):
                    with ui.row().classes("items-center gap-3").style("flex-wrap: nowrap;"):
                        ui.icon(icon, size="sm").classes("text-primary flex-shrink-0")
                        ui.label(suggestion).classes("text-sm text-gray-700 dark:text-gray-300 flex-grow")
    ui.notify("Chat history cleared", type="positive", icon="check_circle")
ui.button(
icon="refresh",
on_click=clear_chat
).props("flat round dense").classes("text-gray-600 dark:text-gray-400 hover:text-primary hover:bg-primary/10").tooltip("Clear chat history")
# ---------------- CHAT CONTAINER ----------------
with ui.column().style("flex: 1; min-height: 0; position: relative; overflow: hidden;"):
chat_container = ui.column().classes(
"h-full p-4 overflow-y-auto overflow-x-hidden gap-3"
).props('id="chat_container"').style("scroll-behavior: smooth;")
# Scroll to bottom button (initially hidden)
scroll_btn = ui.button(
icon="arrow_downward",
on_click=lambda: ui.run_javascript(
'document.getElementById("chat_container").scrollTop = '
'document.getElementById("chat_container").scrollHeight'
)
).props("fab-mini color=primary").classes(
"absolute bottom-4 right-4 shadow-lg"
).style("display: none;")
scroll_btn.tooltip("Scroll to bottom")
# Show/hide scroll button based on scroll position
ui.add_body_html("""
""")
# ------------------------------------------------------
# LOAD HISTORY
# ------------------------------------------------------
chat_history = app_state["chat_histories"].setdefault(
topic, []
)
with chat_container:
if not chat_history:
# Professional welcome card
with ui.card().classes(
"w-full border-0 shadow-lg"
).style("background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%); padding: 28px; border-left: 4px solid #FF4B4B;"):
# Instructions
with ui.row().classes("items-start gap-2 mb-3").style("flex-wrap: nowrap;"):
ui.icon("info_outline", size="sm").classes("text-blue-500 flex-shrink-0").style("margin-top: 2px;")
ui.label(
"Pause the video at any moment to ask questions. Try these suggestions:"
).classes("text-xs text-gray-700 dark:text-gray-300 flex-grow")
# Suggestion buttons
suggestions = [
("help_outline", "Explain this in simpler terms"),
("lightbulb", "Give me a real-world example"),
("list", "Break down this step-by-step"),
("star", "Why is this concept important?"),
]
for icon, suggestion in suggestions:
with ui.button(
on_click=lambda s=suggestion: (
setattr(chat_input, 'value', s),
chat_input.run_method("focus")
)
).props("outline no-caps").classes(
"w-full mb-2 hover:bg-primary/5"
).style("justify-content: flex-start; padding: 12px 16px; border-radius: 8px; border-color: rgba(255, 75, 75, 0.2);"):
with ui.row().classes("items-center gap-3").style("flex-wrap: nowrap;"):
ui.icon(icon, size="sm").classes("text-primary flex-shrink-0")
ui.label(suggestion).classes("text-sm text-gray-700 dark:text-gray-300 flex-grow")
else:
# Load existing messages with custom styling
for msg in chat_history:
is_user = msg["role"] == "user"
render_chat_message(chat_container, msg["content"], is_user)
# ------------------------------------------------------
# INPUT BAR
# ------------------------------------------------------
with ui.column().classes("w-full").style("flex-shrink: 0; background: linear-gradient(to top, rgba(255,255,255,0.98), rgba(255,255,255,0.95)); backdrop-filter: blur(10px); border-top: 1px solid rgba(0,0,0,0.08); padding-top: 16px;"):
# Input row
with ui.row().classes("w-full items-end gap-3 px-5 pb-3"):
with ui.card().classes("flex-grow shadow-sm border-0").style("background: white; padding: 8px 16px; border-radius: 20px; border: 2px solid rgba(255, 75, 75, 0.15); overflow: hidden;"):
chat_input = (
ui.textarea(placeholder="Ask me anything about the video...")
.props("borderless dense autogrow")
.classes("w-full")
.style("max-height: 100px; min-height: 40px; font-size: 14px; overflow-y: auto; word-wrap: break-word; overflow-wrap: break-word;")
)
# Send button
send_button = ui.button(
icon="send"
).props("round unelevated").classes("shadow-lg").style(
"flex-shrink: 0; width: 48px; height: 48px; background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%);"
)
send_button.tooltip("Send message")
# JavaScript to handle Enter key properly
ui.add_body_html("""
""")
# Auto-focus input on load
ui.timer(0.1, lambda: chat_input.run_method("focus"), once=True)
# ------------------------------------------------------
# SEND MESSAGE FUNCTION (define before use)
# ------------------------------------------------------
async def send_message():
"""Handle one chat turn: read the input box, build a context-rich prompt
from the TCM (timed concept map) entry at the current pause point, call
the LLM off the event loop, and render the assistant reply.

Closure dependencies: chat_input, send_button, chat_history, chat_container,
render_chat_message, app_state, topic, tcm_data,
extract_scene_number_from_concept_id, get_scene_implementation_context.
Controls are always re-enabled in the finally block.
"""
# Retrieve text from input box
question = chat_input.value or ""
if not question.strip():
return
# Clear input and disable controls
chat_input.value = ""
chat_input.disable()
send_button.disable()
try:
# Store in history
chat_history.append(
{"role": "user", "content": question}
)
# Display user message
render_chat_message(chat_container, question, is_user=True)
# Auto-scroll to bottom
await ui.run_javascript(
'document.getElementById("chat_container").scrollTop = '
'document.getElementById("chat_container").scrollHeight'
)
# Show Streamlit-style typing indicator
with chat_container:
with ui.element('div').classes("stchat-message assistant") as typing_indicator:
with ui.element('div').classes("stchat-avatar"):
ui.label("š¤").classes("text-xl")
with ui.element('div').classes("stchat-content"):
with ui.row().classes("items-center gap-2"):
ui.spinner(size="sm", color="primary")
ui.label("Thinking...").classes("text-sm")
# Auto-scroll to show typing indicator
await ui.run_javascript(
'document.getElementById("chat_container").scrollTop = '
'document.getElementById("chat_container").scrollHeight'
)
# Build LLM prompt with rich TCM context
# current_entry is {} when nothing is paused (entry is None).
current_entry = app_state["current_tcm_entry"] or {}
timestamp = app_state.get("latest_pause_time", 0.0)
# Load system prompt from file
try:
with open("prompts/ai_tutor_system_prompt.txt", "r", encoding="utf-8") as f:
system_prompt_template = f.read()
# Replace placeholders ({topic} and {time}) in the template
system_prompt = system_prompt_template.format(
topic=topic,
time=f"{timestamp:.1f}"
)
except FileNotFoundError:
# Fallback if file doesn't exist
system_prompt = (
f"You are an AI tutor helping students understand educational video content about {topic}. "
f"The student paused at {timestamp:.1f}s. "
"Provide clear, concise explanations based on the context provided."
)
# Build rich context from TCM
context_parts = [f"=== VIDEO: {topic} ===\n"]
if current_entry:
concept = current_entry.get("conceptName", "Unknown")
narration = current_entry.get("narrationText", "")
visual = current_entry.get("visualDescription", "")
concept_id = current_entry.get("conceptId", "")
context_parts.append(f"STUDENT PAUSED AT: {timestamp:.1f} seconds")
context_parts.append(f"\n--- WHAT'S HAPPENING RIGHT NOW ---")
context_parts.append(f"Concept Being Explained: {concept}")
if narration:
context_parts.append(f"\nNarration (what's being said):\n\"{narration}\"")
if visual:
context_parts.append(f"\nVisual (what's on screen):\n{visual}")
if concept_id:
context_parts.append(f"\nConcept ID: {concept_id}")
# Extract scene number and load implementation context
scene_num = extract_scene_number_from_concept_id(concept_id)
if scene_num:
try:
scene_context = get_scene_implementation_context(
"output", topic, scene_num
)
context_parts.append(f"\n--- SCENE {scene_num} IMPLEMENTATION DETAILS ---")
# Add visual storyboard plan (truncated to 800 chars)
if scene_context.get("visual_plan"):
context_parts.append(f"\nVisual Storyboard Plan:")
context_parts.append(scene_context["visual_plan"][:800] + "..." if len(scene_context["visual_plan"]) > 800 else scene_context["visual_plan"])
# Add technical implementation plan
if scene_context.get("technical_plan"):
context_parts.append(f"\nTechnical Implementation:")
context_parts.append(scene_context["technical_plan"][:800] + "..." if len(scene_context["technical_plan"]) > 800 else scene_context["technical_plan"])
# Add animation narration plan
if scene_context.get("animation_narration"):
context_parts.append(f"\nAnimation & Narration Plan:")
context_parts.append(scene_context["animation_narration"][:800] + "..." if len(scene_context["animation_narration"]) > 800 else scene_context["animation_narration"])
# Add code implementation (truncated for context window)
if scene_context.get("code"):
# NOTE(review): "Maim" looks like a typo for "Manim" — this string
# is sent to the LLM as context; confirm and fix.
context_parts.append(f"\nMaim Code Implementation:")
code_preview = scene_context["code"][:1500] + "..." if len(scene_context["code"]) > 1500 else scene_context["code"]
context_parts.append(f"```python\n{code_preview}\n```")
except Exception as e:
# Silently fail if scene context can't be loaded
print(f"Could not load scene context: {e}")
else:
context_parts.append("(Video is currently playing - student needs to pause to get specific context)")
# Add surrounding context for better understanding
# Locate current_entry by equality scan over the TCM list (-1 if absent).
current_idx = next((i for i, e in enumerate(tcm_data) if e == current_entry), -1)
if current_idx > 0:
prev_entry = tcm_data[current_idx - 1]
context_parts.append(f"\n--- WHAT CAME BEFORE ---")
context_parts.append(f"Previous Concept: {prev_entry.get('conceptName', 'Unknown')}")
prev_narration = prev_entry.get('narrationText', '')
if len(prev_narration) > 150:
prev_narration = prev_narration[:150] + "..."
context_parts.append(f"Previous Narration: \"{prev_narration}\"")
if current_idx < len(tcm_data) - 1:
next_entry = tcm_data[current_idx + 1]
context_parts.append(f"\n--- WHAT COMES NEXT ---")
context_parts.append(f"Next Concept: {next_entry.get('conceptName', 'Unknown')}")
next_narration = next_entry.get('narrationText', '')
if len(next_narration) > 150:
next_narration = next_narration[:150] + "..."
context_parts.append(f"Next Narration: \"{next_narration}\"")
else:
context_parts.append("(Video is currently playing - student needs to pause to get specific context)")
context_info = "\n".join(context_parts)
# Build messages in the format expected by LiteLLMWrapper
messages_for_llm = [
{"type": "system", "content": system_prompt},
{"type": "text", "content": f"VIDEO CONTEXT:\n{context_info}"},
{"type": "text", "content": f"STUDENT QUESTION: {question}"}
]
# Call model wrapper
chat_llm = LiteLLMWrapper(
model_name=app_state["planner_model_name"]
)
# Run in executor to avoid blocking the UI event loop during the LLM call.
# NOTE(review): asyncio.get_event_loop() is deprecated inside coroutines;
# prefer asyncio.get_running_loop() (as handle_finalize already does).
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: chat_llm(messages_for_llm)
)
# Remove typing indicator
chat_container.remove(typing_indicator)
# Add assistant response to history
chat_history.append(
{"role": "assistant", "content": response}
)
# Display assistant message
render_chat_message(chat_container, response, is_user=False)
# Auto-scroll to bottom
await ui.run_javascript(
'document.getElementById("chat_container").scrollTop = '
'document.getElementById("chat_container").scrollHeight'
)
except Exception as e:
# Remove typing indicator if it exists (it may never have been created
# if the failure happened before the indicator block).
# NOTE(review): bare `except:` below also swallows KeyboardInterrupt /
# SystemExit — narrow to `except Exception:`.
try:
chat_container.remove(typing_indicator)
except:
pass
# Show error message
error_msg = str(e) if str(e) else repr(e)
with chat_container:
with ui.element('div').classes("stchat-message assistant").style("background-color: #fee2e2; border-color: #fca5a5;"):
with ui.element('div').classes("stchat-avatar").style("background: linear-gradient(135deg, #ef4444 0%, #dc2626 100%);"):
ui.label("ā ļø").classes("text-xl")
with ui.element('div').classes("stchat-content").style("color: #991b1b;"):
ui.markdown(f"**Error:** Unable to get response.\n\n`{error_msg}`")
ui.notify(
f"Failed to get AI response: {error_msg}",
type="negative"
)
finally:
# Re-enable controls regardless of success or failure
chat_input.enable()
send_button.enable()
chat_input.run_method("focus")
# Connect button to function
send_button.on_click(send_message)
# ------------------------------------------------------
# UPDATE VIDEO CONTEXT
# ------------------------------------------------------
async def update_context(
e: events.GenericEventArguments,
):
"""Sync app_state with the video player's current time.

Reads the player's currentTime via JavaScript, stores it as
app_state["latest_pause_time"], finds the TCM entry whose
[startTime, endTime) interval contains it, and updates the
context label's text and color accordingly.
"""
ts = await ui.run_javascript(
"document.getElementById('video_player')?.currentTime || 0"
)
# run_javascript may return a string/None; normalize to float, default 0.0.
try:
ts = float(ts)
except Exception:
ts = 0.0
app_state["latest_pause_time"] = ts
def valid_entry(entry):
# True when ts falls in [startTime, endTime); malformed entries are skipped.
try:
return (
float(entry["startTime"])
<= ts
< float(entry["endTime"])
)
except Exception:
return False
found_entry = next(
(x for x in tcm_data if valid_entry(x)), None
)
app_state["current_tcm_entry"] = found_entry
if found_entry:
concept_name = found_entry.get('conceptName', 'Unknown')
# Preview the first 80 chars of narration, with ellipsis if truncated.
narration_preview = found_entry.get('narrationText', '')[:80]
if len(found_entry.get('narrationText', '')) > 80:
narration_preview += "..."
context_label.set_content(
f"āøļø **Current Topic:** {concept_name} (`{ts:.1f}s`)\n\n"
f"_{narration_preview}_"
)
# Switch label styling from blue (idle) to green (context found).
context_label.classes(
"p-3 text-sm bg-green-50 dark:bg-green-900/20 "
"border-l-4 border-green-500 rounded-r",
remove="bg-blue-50 dark:bg-gray-800 border-blue-500"
)
else:
context_label.set_content(
"ā¶ļø **Pause the video** to ask a question about what you're watching."
)
# Switch label styling back to blue (no matching TCM entry).
context_label.classes(
"p-3 text-sm bg-blue-50 dark:bg-gray-800 "
"border-l-4 border-blue-500 rounded-r",
remove="bg-green-50 dark:bg-green-900/20 border-green-500"
)
video_player.on("seeked", update_context)
video_player.on("pause", update_context)
video_player.on("play", lambda e: context_label.set_content(
"ā¶ļø **Video Playing** - Pause to ask questions."
))
else:
# Better empty state for missing video
with ui.card().classes("w-full p-12 text-center"):
ui.icon("video_camera_back", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4")
ui.label("Video Not Generated Yet").classes("text-xl font-semibold text-gray-600 dark:text-gray-400 mb-2")
ui.label(
"Use the Utilities tab to continue and complete the video generation for this project."
).classes("text-gray-500 dark:text-gray-500 mb-4")
ui.button("Go to Utilities", icon="build", on_click=lambda: main_tabs.set_value("š§ Utilities")).props("unelevated no-caps").classes("mt-2")
with ui.tab_panel(t_plan):
outline_path = os.path.join(
project_path, f"{inner_folder_name}_scene_outline.txt"
)
if os.path.exists(outline_path):
# Parse the scene outline
outline_content = safe_read_file(outline_path, clean=False)
# Extract scenes using regex
import re
scene_pattern = r'(.*?)'
scenes = re.findall(scene_pattern, outline_content, re.DOTALL)
if scenes:
# Header card
with ui.card().classes("w-full shadow-lg border-l-4 border-primary mb-6"):
with ui.row().classes("w-full items-center gap-3 p-6"):
ui.icon("movie_creation", size="lg").classes("text-primary")
with ui.column().classes("gap-1"):
ui.label(topic).classes("text-2xl font-bold text-gray-900 dark:text-white")
ui.label(f"{len(scenes)} scenes planned").classes("text-sm text-gray-600 dark:text-gray-400")
# Scene cards
for scene_num, scene_content in scenes:
# Extract scene details
title_match = re.search(r'Scene Title:\s*(.+?)(?:\n|$)', scene_content)
purpose_match = re.search(r'Scene Purpose:\s*(.+?)(?:\n|Scene Description)', scene_content, re.DOTALL)
desc_match = re.search(r'Scene Description:\s*(.+?)(?:\n|Scene Layout)', scene_content, re.DOTALL)
title = title_match.group(1).strip() if title_match else f"Scene {scene_num}"
purpose = purpose_match.group(1).strip() if purpose_match else ""
description = desc_match.group(1).strip() if desc_match else ""
with ui.card().classes("w-full shadow-md hover:shadow-lg transition-shadow mb-4"):
# Scene header
with ui.row().classes("w-full items-start gap-3 p-4 bg-gradient-to-r from-primary/5 to-transparent border-b dark:border-gray-700"):
with ui.column().classes("gap-2 flex-grow"):
# Title with scene number
with ui.row().classes("items-center gap-2"):
ui.label(f"Scene {scene_num}:").classes("text-lg font-bold text-primary")
ui.label(title).classes("text-lg font-semibold text-gray-900 dark:text-white")
# Purpose with inline icon
if purpose:
with ui.row().classes("items-center gap-1.5").style("flex-wrap: nowrap;"):
ui.icon("lightbulb", size="xs").classes("text-amber-500 flex-shrink-0")
ui.label(purpose).classes("text-sm text-gray-600 dark:text-gray-400")
# Scene content with inline icon
if description:
with ui.row().classes("items-start gap-2 p-4").style("flex-wrap: nowrap;"):
ui.icon("description", size="sm").classes("text-blue-500 flex-shrink-0").style("margin-top: 2px;")
ui.label(description).classes("text-sm text-gray-700 dark:text-gray-300 leading-relaxed flex-grow")
else:
# Fallback to showing formatted text if parsing fails
with ui.card().classes("w-full shadow-lg"):
with ui.row().classes("w-full items-center gap-2 p-4 border-b dark:border-gray-700 bg-gray-50 dark:bg-gray-800"):
ui.icon("description", size="sm").classes("text-primary")
ui.label("Video Plan").classes("text-lg font-semibold")
with ui.column().classes("p-6"):
ui.markdown(outline_content).classes("prose dark:prose-invert max-w-none")
else:
with ui.card().classes("w-full p-12 text-center"):
ui.icon("description", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4")
ui.label("Plan Not Available").classes("text-xl font-semibold text-gray-600 dark:text-gray-400")
ui.label("The video plan hasn't been generated yet").classes("text-sm text-gray-500 dark:text-gray-500 mt-2")
for i, scene_dir_name in enumerate(scene_dirs):
with ui.tab_panel(scene_tabs_list[i]):
scene_num = i + 1
scene_path = os.path.join(project_path, scene_dir_name)
# Scene header
with ui.card().classes("w-full mb-4 shadow-md border-l-4 border-secondary"):
with ui.row().classes("w-full items-center p-3"):
ui.icon("movie_filter", size="md").classes("text-secondary")
ui.label(f"Scene {scene_num}").classes("text-xl font-bold ml-2")
ui.space()
with ui.tabs().props("dense align=left").classes("w-full") as asset_tabs:
at_vision = ui.tab("šŗļø Vision").props("no-caps")
at_tech = ui.tab("š ļø Technical").props("no-caps")
at_narr = ui.tab("šļø Animation").props("no-caps")
at_code = ui.tab("š» Code").props("no-caps")
at_vid = ui.tab("š¬ Output").props("no-caps")
with ui.tab_panels(asset_tabs, value=at_vision).classes(
"w-full bg-transparent mt-4"
):
def display_plan(file_pattern, default_text, icon_name="description", is_editing=False):
"""Render (or edit) the first plan file in scene_path matching file_pattern.

Args:
file_pattern: glob pattern relative to the enclosing scene directory.
default_text: human-readable name used in "empty"/"not found" placeholders.
icon_name: icon shown in placeholder cards.
is_editing: when True, show a save-able raw-text editor instead of the
formatted markdown view.
"""
files = glob.glob(
os.path.join(scene_path, file_pattern)
)
if files:
file_path = files[0]
plan_text = safe_read_file(
file_path, clean=True
) # cleaned (XML tags stripped) version used for the read-only display
# For editing we read the RAW file below: downstream consumers (e.g. the
# code generator's XML extraction) read these files verbatim, so saving
# back a tag-stripped version could break parsing. Display uses the
# cleaned text; the editor round-trips the raw text unchanged.
if is_editing:
raw_text = safe_read_file(file_path, clean=False)
if not plan_text.strip() and not is_editing:
with ui.card().classes("w-full p-8 text-center"):
ui.icon(icon_name, size="3rem").classes("text-gray-300 dark:text-gray-600 mb-2")
ui.label(f"{default_text} is empty").classes("text-gray-500")
return
if is_editing:
with ui.card().classes("w-full shadow-md"):
with ui.column().classes("p-4 w-full"):
editor = ui.textarea(value=raw_text).props("outlined autogrow").classes("w-full font-mono text-sm")
editor.style("min-height: 300px;")
def save_plan(path=file_path, content_getter=lambda: editor.value):
# Defaults bind path/editor at definition time (late-binding guard).
try:
with open(path, "w", encoding="utf-8") as f:
f.write(content_getter())
ui.notify("Plan saved successfully!", color="positive", icon="save")
except Exception as e:
ui.notify(f"Error saving plan: {e}", color="negative")
with ui.row().classes("w-full justify-end mt-2"):
ui.button("Save Changes", icon="save", on_click=lambda: save_plan()).props("unelevated color=primary")
else:
# Optional cleanup for readability: drop "Scene" headings, then map
# the plan's section markers to markdown headings.
plan_text = re.sub(
r"#+\s*Scene.*", "", plan_text
).strip()
plan_text = plan_text.replace(
"[SCENE_VISION]", "### š¬ Vision"
)
plan_text = plan_text.replace(
"[STORYBOARD]", "### šŗļø Storyboard"
)
plan_text = plan_text.replace(
"[ANIMATION_STRATEGY]",
"### šØ Animation Strategy",
)
plan_text = plan_text.replace(
"[NARRATION]", "### šļø Animation"
)
with ui.card().classes("w-full shadow-md"):
with ui.column().classes("p-6"):
ui.markdown(plan_text).classes("prose dark:prose-invert max-w-none")
else:
# No file matched the pattern at all.
with ui.card().classes("w-full p-8 text-center"):
ui.icon(icon_name, size="3rem").classes("text-gray-300 dark:text-gray-600 mb-2")
ui.label(f"{default_text} not found").classes("text-gray-500")
with ui.tab_panel(at_vision):
display_plan(
"subplans/*_vision_storyboard_plan.txt",
"Vision Plan",
"visibility",
is_editing=is_editing
)
with ui.tab_panel(at_tech):
display_plan(
"subplans/*_technical_implementation_plan.txt",
"Technical Plan",
"engineering",
is_editing=is_editing
)
with ui.tab_panel(at_narr):
display_plan(
"subplans/*_animation_narration_plan.txt",
"Narration Plan",
"mic",
is_editing=is_editing
)
with ui.tab_panel(at_code):
code_path = os.path.join(scene_path, "code")
if os.path.exists(code_path):
code_files = sorted(
glob.glob(os.path.join(code_path, "*.py"))
)
if not code_files:
with ui.card().classes("w-full p-8 text-center"):
ui.icon("code", size="3rem").classes("text-gray-300 dark:text-gray-600 mb-2")
ui.label("No code files found").classes("text-gray-500")
else:
with ui.card().classes("w-full shadow-md"):
code_versions = {
os.path.basename(f): f
for f in code_files
}
# Header with file selector
with ui.row().classes("w-full items-center gap-3 p-4 border-b dark:border-gray-700 bg-gray-50 dark:bg-gray-800"):
ui.icon("code", size="sm").classes("text-primary")
ui.label("Scene Code").classes("text-lg font-semibold")
ui.space()
code_display_area = ui.column().classes("w-full p-4")
def show_code(path):
    """Replace the code display area's contents with the file at *path*."""
    code_display_area.clear()
    source_text = safe_read_file(path, clean=False)
    with code_display_area:
        ui.code(source_text).classes("w-full")
ui.select(
options=code_versions,
label="Version",
on_change=lambda e: show_code(e.value),
).props("outlined dense").style("min-width: 200px;")
show_code(code_files[-1])
else:
with ui.card().classes("w-full p-8 text-center"):
ui.icon("code_off", size="3rem").classes("text-gray-300 dark:text-gray-600 mb-2")
ui.label("Code not generated yet").classes("text-gray-500")
with ui.tab_panel(at_vid):
video_file = find_latest_video_for_scene(
project_path, scene_num
)
if video_file:
with ui.card().classes("w-full shadow-lg overflow-hidden p-0"):
video_url = (
f"/{os.path.relpath(video_file, '.')}".replace(
"\\", "/"
)
)
ui.video(video_url).classes("w-full").style("display: block;")
else:
with ui.card().classes("w-full p-12 text-center"):
ui.icon("videocam_off", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4")
ui.label("Scene Not Rendered Yet").classes("text-xl font-semibold text-gray-600 dark:text-gray-400 mb-2")
ui.label("This scene hasn't been rendered. Generate it from the main tab.").classes("text-gray-500")
def update_util_tab():
util_content.clear()
async def handle_finalize(topic):
"""Combine a project's rendered scenes into the final video.

Runs combine_videos(topic) in a worker thread, notifies the user based on
its result ("success" / "already_exists" / "no_scenes"), and, if the
combined mp4 exists, points the finalized video player at it. The button
is always re-enabled in the finally block.
"""
if not topic:
ui.notify("Please select a project.", color="warning")
return
finalize_button.disable()
finalize_button.set_text("ā³ Finalizing...")
try:
loop = asyncio.get_running_loop()
# combine_videos is blocking (moviepy), so run it off the event loop.
result = await loop.run_in_executor(None, combine_videos, topic)
# Handle result and show appropriate notification
# NOTE(review): the success notification text below appears garbled
# (mojibake emoji split across lines in this view) — verify encoding.
if result == "success":
ui.notify(f"ā
Project '{topic}' finalized successfully!", color="positive")
elif result == "already_exists":
ui.notify(f"ā¹ļø Combined assets already exist for '{topic}'", color="info")
elif result == "no_scenes":
ui.notify(f"ā ļø No rendered scenes found for '{topic}'", color="warning")
project_path = get_project_path("output", topic)
inner_folder_name = os.path.basename(project_path)
video_local_path = os.path.join(
project_path, f"{inner_folder_name}_combined.mp4"
)
if os.path.exists(video_local_path):
# Convert to a forward-slash URL path served from the app root.
video_url = f"/{os.path.relpath(video_local_path, '.')}".replace(
"\\", "/"
)
finalized_video_player.set_source(video_url)
finalized_video_player.style("display: block;")
await update_dashboard()
except Exception as e:
ui.notify(f"Error finalizing project: {e}", color="negative")
finally:
# Restore the button regardless of outcome.
finalize_button.enable()
finalize_button.set_text("š¬ Finalize Project")
async def handle_continue(topic):
"""Resume video generation for an unfinished project.

Runs video_generator.generate_video_pipeline in a dedicated worker thread
(with its own event loop), while this coroutine polls shared state every
0.5s to update the progress bar and stream captured stdout/stderr lines
into the log widget. On completion the scenes are combined into the final
video and the dashboard refreshed.
"""
if not topic:
ui.notify("Please select a project.", color="warning")
return
# Get project description from outline
project_path = get_project_path("output", topic)
inner_folder_name = os.path.basename(project_path)
scene_outline_path = os.path.join(project_path, f"{inner_folder_name}_scene_outline.txt")
description = f"Continue generating the unfinished scenes for {topic}"
if os.path.exists(scene_outline_path):
with open(scene_outline_path, "r", encoding="utf-8") as f:
outline_content = f.read()
# Try to extract description from outline if available
if "Description:" in outline_content:
desc_start = outline_content.find("Description:") + len("Description:")
desc_end = outline_content.find("\n\n", desc_start)
if desc_end > desc_start:
description = outline_content[desc_start:desc_end].strip()
continue_button.disable()
continue_button.set_text("ā³ Resuming...")
progress_container_continue.style("display: block;")
progress_bar_continue.set_visibility(True)
progress_label_continue.set_text("š Starting continuation...")
progress_bar_continue.value = 0
log_output_continue.clear()
log_output_continue.push(f"š Starting continuation for '{topic}'")
log_output_continue.push(f"š Checking project status...")
try:
# Shared state for progress updates between the worker thread and this
# coroutine; all access is guarded by progress_lock.
# NOTE(review): StringIO is imported but never used here.
import concurrent.futures
import threading
import sys
from io import StringIO
generation_complete = threading.Event()
generation_error = [None]
current_progress = {"value": 0.1, "text": "Starting..."}
log_buffer = []
progress_lock = threading.Lock()
def progress_callback(value, text):
# Called from the worker thread by the pipeline.
with progress_lock:
current_progress["value"] = value
current_progress["text"] = text
class LogCapture:
# File-like wrapper installed over stdout/stderr in the worker thread.
# Forwards everything to the original stream, and appends a filtered,
# user-friendly version of each line to buffer_list for the UI log.
def __init__(self, original_stream, buffer_list):
self.original_stream = original_stream
self.buffer_list = buffer_list
# Noise lines dropped entirely from the UI log.
self.skip_patterns = [
"Langfuse client is disabled",
"LANGFUSE_PUBLIC_KEY",
"No video folders found",
"langfuse.com/docs",
"See our docs:",
"==> Loading existing implementation plans",
"<== Finished loading plans",
"Found: 0, Missing:",
"Generating missing implementation plans for scenes:",
"Loaded existing topic session ID:",
"Saved topic session ID to",
]
def write(self, text):
self.original_stream.write(text)
if text.strip():
should_skip = any(pattern in text for pattern in self.skip_patterns)
if not should_skip:
cleaned_text = text.rstrip()
# Map raw pipeline phase markers to friendlier UI phrasing.
# NOTE(review): many replacement values here appear garbled
# (mojibake emoji split across lines in this view) — verify encoding.
replacements = {
"STARTING VIDEO PIPELINE FOR TOPIC:": "š Continuation started for:",
"[PHASE 1: SCENE OUTLINE]": "š Phase 1: Checking video structure",
"[PHASE 1 COMPLETE]": "ā
Phase 1: Video structure ready",
"[PHASE 2: IMPLEMENTATION PLANS]": "šØ Phase 2: Planning remaining scenes",
"[PHASE 2 COMPLETE]": "ā
Phase 2: All scenes planned",
"[PHASE 3: CODE GENERATION & RENDERING (SCENE-BY-SCENE)]": "š» Phase 3: Rendering remaining scenes",
"[PHASE 3 COMPLETE]": "ā
Phase 3: All scenes rendered",
"PIPELINE FINISHED FOR TOPIC:": "ā
Continuation complete for:",
"scene outline saved": "ā
Video structure saved",
"Loaded existing scene outline": "ā
Using existing video structure",
"Loaded existing topic session ID": "",
"Total Cost:": "š° Cost:",
"Already completed scenes:": "ā
Already rendered:",
"Scenes with plans ready:": "ā
Already planned:",
"Scenes to render:": "ā³ Remaining to render:",
"Already rendered, skipping": "ā
Already done, skipping",
"Finished generating missing plans": "ā
All scene plans ready",
"==> Generating scene outline": "š Starting: Generating video structure",
"==> Scene outline generated": "ā
Finished: Video structure generated",
"==> Generating scene implementations": "šØ Starting: Planning scene details",
"==> All concurrent scene implementations generated": "ā
Finished: All scene details planned",
"==> Preparing to render": "š» Starting: Scene rendering",
"Starting concurrent processing": "š» Processing scenes concurrently",
"<== All scene processing tasks completed": "ā
Finished: All scenes processed",
}
for old, new in replacements.items():
if old in cleaned_text:
cleaned_text = cleaned_text.replace(old, new)
import re
# Rewrite "Found N scenes in outline" into a friendly summary.
scenes_match = re.search(r'Found (\d+) scenes? in outline', cleaned_text)
if scenes_match:
num_scenes = scenes_match.group(1)
scene_word = "scene" if num_scenes == "1" else "scenes"
cleaned_text = f"š Video has {num_scenes} {scene_word}"
# Simplify "[topic | Scene N]" prefixes to "Scene N:"; strip other brackets.
scene_prefix_match = re.search(r'\[([^\]]+) \| Scene (\d+)\]', cleaned_text)
if scene_prefix_match:
scene_num = scene_prefix_match.group(2)
cleaned_text = re.sub(r'\[([^\]]+) \| Scene (\d+)\]', f'Scene {scene_num}:', cleaned_text)
cleaned_text = re.sub(r'\[([^\]]+)\]', '', cleaned_text).strip()
if "======================================================" in cleaned_text or not cleaned_text.strip():
return
with progress_lock:
self.buffer_list.append(cleaned_text)
def flush(self):
self.original_stream.flush()
def run_generation_sync():
# Worker-thread entry point: capture stdio, run the async pipeline on
# a fresh event loop (this thread has none), and record any error.
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
sys.stdout = LogCapture(old_stdout, log_buffer)
sys.stderr = LogCapture(old_stderr, log_buffer)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
video_generator.generate_video_pipeline(
topic,
description,
max_retries=app_state["max_retries"],
progress_callback=progress_callback
)
)
loop.close()
except Exception as e:
generation_error[0] = e
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
generation_complete.set()
# NOTE(review): executor is never shut down; consider
# `with ThreadPoolExecutor(...) as executor:` or executor.shutdown().
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
generation_future = executor.submit(run_generation_sync)
heartbeat_counter = 0
last_log_count = 0
# NOTE(review): last_progress_text is assigned but never read.
last_progress_text = ""
# Poll loop: refresh progress UI twice a second until the worker signals done.
while not generation_complete.is_set():
await asyncio.sleep(0.5)
heartbeat_counter += 1
with progress_lock:
progress_bar_continue.value = current_progress["value"]
# Update progress label with elapsed time
elapsed = heartbeat_counter // 2
if current_progress["text"]:
progress_label_continue.set_text(f"{current_progress['text']} ({elapsed}s)")
else:
progress_label_continue.set_text(f"ā³ Working... ({elapsed}s)")
# Only push new log messages, not repeated status updates
if len(log_buffer) > last_log_count:
for log_line in log_buffer[last_log_count:]:
log_output_continue.push(log_line)
last_log_count = len(log_buffer)
last_progress_text = current_progress["text"]
if generation_error[0]:
raise generation_error[0]
generation_future.result()
# Flush any log lines produced after the last poll iteration.
with progress_lock:
if len(log_buffer) > last_log_count:
for log_line in log_buffer[last_log_count:]:
log_output_continue.push(log_line)
progress_label_continue.set_text("šļø Finalizing project...")
progress_bar_continue.value = 0.9
# Check if any scenes failed
has_failures = len(video_generator.failed_scenes) > 0
if has_failures:
log_output_continue.push(f"\nā ļø Generation completed with {len(video_generator.failed_scenes)} failed scene(s). Finalizing...")
else:
log_output_continue.push("\nā
All scenes generated successfully! Finalizing...")
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, combine_videos, topic)
progress_bar_continue.value = 1.0
if has_failures:
progress_label_continue.set_text(f"ā ļø Project completed with {len(video_generator.failed_scenes)} failed scene(s)")
log_output_continue.push(f"ā ļø Project finalized with {len(video_generator.failed_scenes)} scene(s) failed. Check logs for details.")
ui.notify(f"ā ļø Project '{topic}' completed with failures. Check logs.", color="warning", icon="warning")
else:
progress_label_continue.set_text("ā
Project complete!")
log_output_continue.push("ā
Project finalized successfully!")
if result == "success":
ui.notify(f"ā
Project '{topic}' completed successfully!", color="positive", icon="check_circle")
elif result == "already_exists":
ui.notify(f"ā¹ļø Project '{topic}' already finalized", color="info", icon="info")
elif result == "no_scenes":
ui.notify(f"ā ļø No rendered scenes found for '{topic}'", color="warning", icon="warning")
# Keep the final status visible briefly before hiding the progress area.
await asyncio.sleep(2)
progress_container_continue.style("display: none;")
await update_dashboard()
except Exception as e:
progress_label_continue.set_text(f"ā Error: {str(e)[:50]}...")
progress_bar_continue.value = 0
log_output_continue.push(f"\nā An error occurred: {e}")
ui.notify(f"Error continuing project: {e}", color="negative", multi_line=True)
finally:
# Restore the button regardless of outcome.
continue_button.enable()
continue_button.set_text("ā¶ļø Continue Project")
with util_content:
topic_folders_util = get_topic_folders("output")
if not topic_folders_util:
with ui.card().classes("w-full").style("padding: 60px; text-align: center;"):
ui.icon("folder_open", size="xl").classes("text-gray-400")
ui.label("No Projects Found").classes("text-2xl font-semibold text-gray-700 dark:text-gray-300 mt-4")
ui.label("Create a project first to use utilities").classes("text-gray-500 dark:text-gray-400 mt-2")
return
# Get project statuses
all_statuses = [video_generator.check_theorem_status({"theorem": th}) for th in topic_folders_util]
incomplete_projects = [s for s in all_statuses if not s["has_combined_video"]]
complete_projects = [s for s in all_statuses if s["has_combined_video"]]
# Continue Unfinished Projects Section
with ui.card().classes("w-full mb-4").style("padding: 24px;"):
with ui.row().classes("w-full items-center gap-3 mb-4"):
ui.icon("play_circle", size="lg").classes("text-primary")
with ui.column().classes("gap-1"):
ui.label("Continue Unfinished Projects").classes("text-xl font-bold text-gray-900 dark:text-white")
ui.label("Resume generation for incomplete projects").classes("text-sm text-gray-600 dark:text-gray-400")
if incomplete_projects:
continue_select = ui.select(
[p["topic"] for p in incomplete_projects],
label="Select Project to Continue"
).props("outlined").classes("w-full")
# Show project status
with ui.row().classes("w-full items-center gap-2 mt-2 mb-3"):
ui.icon("info", size="sm").classes("text-blue-500")
status_label = ui.label("Select a project to see its status").classes("text-sm text-gray-600 dark:text-gray-400")
def update_status():
    """Refresh the status label for the currently selected incomplete project.

    Looks up the selected topic in incomplete_projects and shows its
    rendered/total scene progress. No-op when nothing is selected or the
    topic is not found.
    """
    if not continue_select.value:
        return
    status = next((s for s in incomplete_projects if s["topic"] == continue_select.value), None)
    if status is None:
        return
    rendered = status["rendered_scenes"]
    total = status["total_scenes"]
    # Guard against ZeroDivisionError when the outline has no scenes yet.
    pct = (rendered / total * 100) if total else 0
    status_label.set_text(f"Progress: {rendered}/{total} scenes rendered ({pct:.0f}%)")
continue_select.on_value_change(lambda: update_status())
continue_button = ui.button(
"ā¶ļø Continue Project",
on_click=lambda: handle_continue(continue_select.value),
).props("unelevated no-caps").classes("w-full mt-2")
# Progress section for continue
progress_container_continue = ui.column().classes("w-full gap-3 mt-4").style("display: none;")
with progress_container_continue:
progress_label_continue = ui.label("Starting...").classes("text-sm font-medium text-gray-700 dark:text-gray-300")
progress_bar_continue = ui.linear_progress(value=0, show_value=False).props("color=primary size=8px").classes("w-full")
with ui.expansion("View Detailed Logs", icon="terminal").props("duration=600").classes("w-full mt-2"):
log_output_continue = ui.log().props("id=log_output_continue").classes(
"w-full h-64 bg-gray-900 dark:bg-black text-white font-mono rounded-lg"
)
else:
with ui.row().classes("w-full items-center gap-2 p-4 bg-green-50 dark:bg-green-900/20 rounded-lg"):
ui.icon("check_circle", size="md").classes("text-green-600")
ui.label("All projects are complete! š").classes("text-green-700 dark:text-green-300 font-medium")
# Regenerate Subtitles Section
with ui.card().classes("w-full mb-4").style("padding: 24px;"):
with ui.row().classes("w-full items-center gap-3 mb-4"):
ui.icon("movie_edit", size="lg").classes("text-primary")
with ui.column().classes("gap-1"):
ui.label("Re-Assemble Video & Subtitles (Fix Sync)").classes("text-xl font-bold text-gray-900 dark:text-white")
ui.label("Regenerate all audio, fix sync, and re-render full video.").classes("text-sm text-gray-600 dark:text-gray-400")
# Info box explaining the feature
with ui.expansion("ā¹ļø What does this do?", icon="help_outline").props("dense").classes("w-full mb-3"):
with ui.column().classes("gap-2 p-2"):
ui.label("This tool completely rebuilds the final video to ensure perfect synchronization.").classes("text-sm text-gray-700 dark:text-gray-300")
ui.separator()
ui.label("How it works:").classes("text-sm font-semibold text-gray-800 dark:text-gray-200")
ui.label("⢠Verifies and regenerates ALL voiceover audio files (including pauses)").classes("text-sm text-gray-600 dark:text-gray-400")
ui.label("⢠Reconstructs the entire audio track for millisecond-perfect subtitle match").classes("text-sm text-gray-600 dark:text-gray-400")
ui.label("⢠Stitches scene videos together with the new audio track").classes("text-sm text-gray-600 dark:text-gray-400")
ui.separator()
ui.label("Note: This completely rebuilds the final video file (visuals + audio).").classes("text-xs text-gray-500 dark:text-gray-400 italic")
if complete_projects:
regen_select = ui.select(
[p["topic"] for p in complete_projects],
label="Select Project to Fix"
).props("outlined").classes("w-full")
# Status indicator
with ui.row().classes("w-full items-center gap-2 mt-2 mb-3"):
ui.icon("info", size="sm").classes("text-blue-500")
regen_status_label = ui.label("Select a project to check subtitle status").classes("text-sm text-gray-600 dark:text-gray-400")
async def check_subtitle_status():
    """Refresh the status label for the project currently selected in `regen_select`.

    Decision order:
      1. If any caption event in the project's combined TCM (timed caption map)
         file carries `_sync_fix_applied`, the improved sync already ran -> green.
      2. Otherwise, if cached voiceover JSON metadata exists in any scene or
         project-level media folder, regeneration is recommended -> orange.
      3. Otherwise only fallback scaling is possible -> orange.
    """
    if not regen_select.value:
        return
    project_path = get_project_path("output", regen_select.value)
    inner_folder_name = os.path.basename(project_path)

    # 1. Check if already has sync fix applied.
    tcm_path = os.path.join(project_path, f"{inner_folder_name}_combined_tcm.json")
    has_sync_fix = False
    if os.path.exists(tcm_path):
        try:
            with open(tcm_path, "r", encoding="utf-8") as f:
                tcm_data = json.load(f)
            has_sync_fix = any(event.get("_sync_fix_applied", False) for event in tcm_data)
        except (OSError, json.JSONDecodeError, AttributeError, TypeError):
            # Unreadable or malformed TCM file: treat as "no sync fix applied".
            pass
    if has_sync_fix:
        regen_status_label.set_text("ā Subtitles already use improved sync (you can still regenerate if needed)")
        regen_status_label.classes(
            remove="text-gray-600 dark:text-gray-400 text-orange-600 dark:text-orange-400 text-red-600 dark:text-red-400",
            add="text-green-600 dark:text-green-400"
        )
        return

    # 2./3. Check for voiceover cache in per-scene and project media folders.
    def _scene_number(path):
        # Numeric sort key so "scene2" precedes "scene10"; non-matching dirs sort first.
        match = re.search(r"scene(\d+)", path)
        return int(match.group(1)) if match else 0

    has_cache = False
    scene_dirs = sorted(glob.glob(os.path.join(project_path, "scene*")), key=_scene_number)
    for scene_dir in scene_dirs:
        cache_dirs = [
            os.path.join(scene_dir, "code", "media", "voiceovers"),
            os.path.join(project_path, "media", "voiceovers"),
        ]
        for cache_dir in cache_dirs:
            if os.path.exists(cache_dir) and glob.glob(os.path.join(cache_dir, "*.json")):
                has_cache = True
                break
        if has_cache:
            break

    if has_cache:
        regen_status_label.set_text("ā Old subtitle timing detected - regeneration recommended")
    else:
        regen_status_label.set_text("ā No cache found - will use fallback scaling (less accurate)")
    # Both non-fixed outcomes share the same orange "action recommended" styling.
    regen_status_label.classes(
        remove="text-gray-600 dark:text-gray-400 text-green-600 dark:text-green-400 text-red-600 dark:text-red-400",
        add="text-orange-600 dark:text-orange-400"
    )
regen_select.on_value_change(lambda: asyncio.create_task(check_subtitle_status()))
async def handle_regenerate_subtitles(topic):
    """Fully re-assemble a finished project's video and subtitles.

    Deletes the old combined subtitle artifacts (TCM/SRT/VTT) to force
    regeneration, then runs `regenerate_subtitles_only(topic)` in a worker
    thread so the UI stays responsive. On completion, updates the status
    container, dashboard, and (if open on this project) the inspector.
    """
    if not topic:
        ui.notify("Please select a project.", color="warning")
        return
    # Disable button and update UI
    regen_button.disable()
    regen_button.set_text("ā³ Re-Assembling...")
    regen_status_container.style("display: block;")
    # Beautiful processing status
    regen_status_container.clear()
    with regen_status_container:
        with ui.card().classes("w-full bg-blue-50 dark:bg-blue-900/10 border-blue-200 dark:border-blue-800"):
            with ui.row().classes("items-center gap-4"):
                ui.spinner("dots", size="lg").classes("text-blue-600")
                with ui.column().classes("gap-1"):
                    ui.label("Re-Assembling Project Constraints").classes("text-lg font-bold text-blue-800 dark:text-blue-200")
                    ui.label("Step 1: Verifying & Regenerating all audio assets...").classes("text-sm text-blue-600 dark:text-blue-400")
                    ui.label("Step 2: Reconstructing millisecond-perfect audio track...").classes("text-sm text-blue-600 dark:text-blue-400")
                    ui.label("Step 3: Stitching scenes & Re-rendering final video...").classes("text-sm text-blue-600 dark:text-blue-400")
                    ui.label("Please wait, this will take a moment.").classes("text-xs text-gray-500 italic mt-1")
    try:
        import concurrent.futures

        def regenerate_subtitles_task():
            """Background task to regenerate subtitles AND re-render video"""
            # Delete old subtitle files to force regeneration
            project_path = get_project_path("output", topic)
            inner_folder_name = os.path.basename(project_path)
            files_to_delete = [
                os.path.join(project_path, f"{inner_folder_name}_combined_tcm.json"),
                os.path.join(project_path, f"{inner_folder_name}_combined.srt"),
                os.path.join(project_path, f"{inner_folder_name}_combined.vtt"),
                # We don't delete the video here, the function overwrites it safely
            ]
            for file_path in files_to_delete:
                if os.path.exists(file_path):
                    os.remove(file_path)
            # Run the full re-assembly
            result = regenerate_subtitles_only(topic)
            return result

        # Run in executor to avoid blocking UI
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            result = await loop.run_in_executor(executor, regenerate_subtitles_task)
        # Update UI based on result (back in main event loop, context is restored)
        regen_status_container.clear()
        with regen_status_container:
            if result == "success":
                with ui.card().classes("w-full bg-green-50 dark:bg-green-900/10 border-green-200 dark:border-green-800"):
                    with ui.row().classes("items-center gap-4"):
                        ui.icon("check_circle", size="lg").classes("text-green-600")
                        with ui.column().classes("gap-1"):
                            ui.label("Re-Assembly Complete!").classes("text-lg font-bold text-green-800 dark:text-green-200")
                            ui.label("Video and subtitles have been perfectly synchronized.").classes("text-sm text-green-600 dark:text-green-400")
                ui.notify(f"ā\nProject '{topic}' successfully re-assembled!", color="positive", icon="check_circle")
            elif result == "no_video":
                ui.label("ā ļø Missing source video scenes").classes("text-orange-600")
                ui.notify(f"ā ļø No source scenes found for '{topic}'.", color="warning")
            else:
                ui.label("ā Unknown error").classes("text-red-600")
                ui.notify("Something went wrong.", color="negative")
    except Exception as e:
        regen_status_container.clear()
        with regen_status_container:
            ui.label(f"ā Error: {str(e)}").classes("text-red-600 font-bold")
        ui.notify(f"Error re-assembling project: {e}", color="negative", multi_line=True)
    finally:
        # Re-enable button
        regen_button.enable()
        regen_button.set_text("š Re-Assemble Video")
        with regen_status_container:
            # Wrapping this too just in case update_dashboard needs context,
            # though it likely has its own. But staying safe.
            pass
        await update_dashboard()
        # Refresh the inspector only if this project is the one currently shown there.
        if app_state.get("current_topic_inspector") == topic:
            await update_inspector(topic)
regen_button = ui.button(
"š Regenerate Subtitles",
on_click=lambda: asyncio.create_task(handle_regenerate_subtitles(regen_select.value)),
).props("unelevated no-caps").classes("w-full mt-2")
# Status container
regen_status_container = ui.column().classes("w-full gap-2 mt-3").style("display: none;")
with regen_status_container:
with ui.card().classes("w-full border-l-4 border-primary").style("padding: 16px; background: rgba(255, 75, 75, 0.05);"):
regen_status_label_progress = ui.label("Processing...").classes("text-sm font-medium text-gray-700 dark:text-gray-300")
else:
with ui.row().classes("w-full items-center gap-2 p-4 bg-orange-50 dark:bg-orange-900/20 rounded-lg"):
ui.icon("info", size="md").classes("text-orange-600")
ui.label("No completed projects found. Complete a project first to use this feature.").classes("text-orange-700 dark:text-orange-300 font-medium")
# --- Main Content Area (UI Definition) ---
with ui.column().classes("w-full max-w-6xl mx-auto gap-6").style("padding: 24px 16px;"):
with ui.tabs().classes("w-full mb-4") as main_tabs:
one = ui.tab("⨠Generate").props("no-caps")
two = ui.tab("š Dashboard").props("no-caps")
three = ui.tab("šŗ Project View").props("no-caps")
four = ui.tab("š§ Utilities").props("no-caps")
with ui.tab_panels(main_tabs, value=one).classes("w-full"):
with ui.tab_panel(one):
# Modern header with gradient background
with ui.card().classes("w-full shadow-lg border-0 mb-6").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 24px;"):
with ui.row().classes("w-full items-center gap-3"):
ui.icon("auto_awesome", size="xl").classes("text-white")
with ui.column().classes("gap-1"):
ui.label("Generate").classes("text-3xl font-bold text-white")
ui.label("Create educational videos with AI-powered animations").classes("text-white/80")
# Main Form
with ui.card().classes("w-full").style("padding: 32px;"):
with ui.column().classes("w-full gap-3"):
# Project Name and Tips Row
with ui.row().classes("w-full gap-4 items-end"):
# Project Name Section (left side)
with ui.column().classes("flex-grow gap-2"):
with ui.row().classes("items-center gap-2"):
ui.icon("label", size="sm").classes("text-primary")
ui.label("Project Name").classes("text-sm font-semibold text-gray-900 dark:text-white")
topic_input = (
ui.input(placeholder="e.g., Binary Search, Bubble Sort, Linked Lists...")
.props("outlined dense")
.classes("w-full")
)
topic_input.value = "Binary Search"
# Tips Card (right side) - aligned to bottom
with ui.card().classes("flex-shrink-0").style("padding: 10px 14px; background-color: #fffbeb; border: 1px solid #fef3c7; width: 320px;"):
with ui.row().classes("items-center gap-2"):
ui.icon("tips_and_updates", size="sm").classes("text-primary")
ui.label("More details = better video!").classes("text-xs font-medium")
# Description Section
with ui.column().classes("w-full").style("gap: 8px;"):
with ui.row().classes("items-center gap-2 justify-between w-full"):
with ui.row().classes("items-center gap-2"):
ui.icon("description", size="sm").classes("text-primary")
ui.label("Video Description").classes("text-sm font-semibold text-gray-900 dark:text-white")
auto_gen_button = (
ui.button("Auto-Generate", icon="auto_awesome")
.props("flat dense no-caps")
.classes("text-primary")
.style("font-size: 0.875rem;")
.tooltip("Generate description from project name using AI")
)
desc_input = (
ui.textarea(
placeholder="""Tips for better videos:
⢠Be specific about the algorithm and target audience level (beginner/intermediate/advanced)
⢠Mention if you want code examples, analogies, or visualizations
⢠Include the key concepts you want to emphasize
⢠Specify the programming language if showing code
⢠Mention any real-world applications or use cases"""
)
.props("outlined rows=15")
.classes("w-full")
.style("margin-bottom: 12px;")
)
desc_input.value = """Create a short video for beginners explaining Binary Search algorithm.
Target Audience: Complete beginners to algorithms
Key Concepts: Divide and conquer, sorted arrays, logarithmic time complexity
Content:
⢠Start with a sorted array example: [1, 3, 5, 7, 9, 11, 13]
⢠Show step-by-step how to find a target number (e.g., 7)
⢠Visualize checking the middle element
⢠Demonstrate eliminating half the array each time
⢠Compare with linear search to show efficiency
⢠End with time complexity: O(log n) vs O(n)
Style: Use simple animations, friendly tone, and a relatable analogy (like guessing a number between 1-100)"""
# Review Mode Option
with ui.row().classes("w-full items-center gap-2 mb-2"):
review_checkbox = ui.checkbox("Interactive Script Review (Pause after planning)").props("dense")
review_checkbox.tooltip("Generates plans and stops. You can review/edit the script in 'Project View' before rendering.")
ui.icon("edit_note", size="sm").classes("text-gray-500")
# Action Section - tight to textarea
with ui.row().classes("w-full justify-end"):
generate_button = (
ui.button("Generate Video", icon="play_circle")
.props("unelevated no-caps")
.classes("px-6")
.style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); color: white;")
)
# Progress Indicator (hidden by default)
progress_container = ui.column().classes("w-full gap-3 mt-6").style("display: none;")
with progress_container:
with ui.card().classes("w-full shadow-lg border-l-4 border-primary").style("padding: 24px;"):
with ui.column().classes("w-full gap-4"):
with ui.row().classes("w-full items-center gap-3"):
ui.spinner(size="lg", color="primary")
progress_label = ui.label("Generating video...").classes("text-lg font-semibold text-gray-900 dark:text-white")
progress_bar = ui.linear_progress(value=0, show_value=False).props('rounded color="primary"').style("height: 8px;")
ui.label("This may take several minutes depending on the complexity of your project.").classes("text-sm text-gray-600 dark:text-gray-400")
# --- TRIVIA CAROUSEL SECTION ---
trivia_container = ui.column().classes("w-full mt-2 hidden transition-all duration-500")
with trivia_container:
with ui.card().classes("w-full bg-gradient-to-r from-blue-50 to-indigo-50 dark:from-gray-800 dark:to-gray-700 border-none shadow-inner p-4"):
with ui.row().classes("w-full items-center gap-3"):
with ui.avatar(size="md").classes("bg-white text-blue-600 shadow-sm"): #Icon background
ui.icon("lightbulb", size="sm")
with ui.column().classes("gap-1 flex-grow"):
ui.label("Did You Know?").classes("text-xs font-bold text-blue-600 uppercase tracking-wide")
trivia_label = ui.label("Fetching interesting facts...").classes("text-sm font-medium text-gray-800 dark:text-gray-200 italic transition-opacity duration-300")
# -------------------------------
# Expandable log section
with ui.expansion("View Detailed Logs", icon="terminal").props("duration=600").classes("w-full mt-2"):
log_output = (
ui.log()
.props("id=log_output")
.classes(
"w-full h-64 bg-gray-900 dark:bg-black text-white font-mono rounded-lg"
)
)
# Define the auto-generate description handler
async def handle_auto_generate_description():
    """Generate a video description for the current project name via the planner LLM.

    Flow: validate the project name -> ask the user for a difficulty level in a
    modal dialog -> run the LLM call in a thread pool (UI stays responsive) ->
    clean up the response and write it into `desc_input`.
    """
    topic = topic_input.value
    if not topic or not topic.strip():
        ui.notify("Please enter a Project Name first.", color="warning")
        return
    # Create a dialog to ask for difficulty level
    difficulty_level = None
    with ui.dialog().props('persistent') as difficulty_dialog, ui.card().classes("w-full max-w-md"):
        # Header with icon
        with ui.row().classes("items-center gap-3 p-6 pb-4 border-b border-gray-200 dark:border-gray-700"):
            ui.icon("school", size="md").classes("text-primary")
            with ui.column().classes("gap-1"):
                ui.label("Select Difficulty Level").classes("text-xl font-bold text-gray-900 dark:text-white")
                ui.label("Choose the target audience for your video").classes("text-sm text-gray-600 dark:text-gray-400")
        difficulty_options = [
            {
                "label": "Beginner",
                "value": "beginner",
                "icon": "lightbulb",
                "color": "green",
                "desc": "Perfect for newcomers with no prior knowledge"
            },
            {
                "label": "Intermediate",
                "value": "intermediate",
                "icon": "trending_up",
                "color": "blue",
                "desc": "For learners with basic understanding"
            },
            {
                "label": "Advanced",
                "value": "advanced",
                "icon": "psychology",
                "color": "purple",
                "desc": "Deep dive for experts and professionals"
            },
        ]
        selected_difficulty = {"value": "intermediate"}  # Default
        option_cards = []
        # Options container
        with ui.column().classes("gap-3 p-6"):
            for option in difficulty_options:
                with ui.card().classes(
                    "p-4 cursor-pointer transition-all duration-200 "
                    "border-2 border-transparent hover:border-primary/50 "
                    "hover:shadow-lg"
                ).style("border-radius: 12px") as option_card:
                    with ui.row().classes("items-center gap-4 w-full"):
                        # Icon with colored background
                        with ui.element("div").classes(
                            f"flex items-center justify-center w-12 h-12 rounded-full "
                            f"bg-{option['color']}-100 dark:bg-{option['color']}-900/30"
                        ):
                            ui.icon(option["icon"], size="sm").classes(f"text-{option['color']}-600 dark:text-{option['color']}-400")
                        # Text content
                        with ui.column().classes("flex-1 gap-1"):
                            ui.label(option["label"]).classes("text-base font-semibold text-gray-900 dark:text-white")
                            ui.label(option["desc"]).classes("text-sm text-gray-600 dark:text-gray-400")
                        # Radio indicator (opt=option default binds the current option per iteration)
                        ui.icon("radio_button_unchecked", size="sm").classes("text-gray-400").bind_visibility_from(
                            selected_difficulty, "value",
                            backward=lambda v, opt=option: v != opt["value"]
                        )
                        ui.icon("check_circle", size="sm").classes("text-primary").bind_visibility_from(
                            selected_difficulty, "value",
                            backward=lambda v, opt=option: v == opt["value"]
                        )
                # Store card reference and make clickable
                option_cards.append((option_card, option))

                def make_click_handler(opt):
                    # Factory closure avoids the late-binding pitfall inside the loop.
                    def handler():
                        selected_difficulty["value"] = opt["value"]
                        # Update card styles
                        for card, card_opt in option_cards:
                            if card_opt["value"] == opt["value"]:
                                card.classes(remove="border-transparent", add="border-primary")
                            else:
                                card.classes(remove="border-primary", add="border-transparent")
                    return handler

                option_card.on("click", make_click_handler(option))
                # Set initial selected state
                if option["value"] == "intermediate":
                    option_card.classes(remove="border-transparent", add="border-primary")
        # Action buttons
        with ui.row().classes("gap-3 p-6 pt-4 border-t border-gray-200 dark:border-gray-700 justify-end w-full"):
            ui.button("Cancel", icon="close", on_click=lambda: difficulty_dialog.close()).props("flat outline").classes("px-4")
            ui.button(
                "Generate Description",
                icon="auto_awesome",
                on_click=lambda: [
                    difficulty_dialog.submit(selected_difficulty["value"])
                ]
            ).props("unelevated").classes("px-6 bg-primary text-white")
    # Show the dialog and wait for user selection
    difficulty_level = await difficulty_dialog
    if not difficulty_level:
        return  # User cancelled
    # Disable button and show loading state
    auto_gen_button.disable()
    auto_gen_button.props("loading")
    ui.notify(f"Generating {difficulty_level} level description...", color="info")
    try:
        # Run LLM call in executor to keep UI responsive
        import concurrent.futures

        def generate_description():
            # Use the planner model to generate description
            llm = LiteLLMWrapper(
                model_name=app_state["planner_model_name"],
                temperature=0.7,
                print_cost=False,
                verbose=False,
                use_langfuse=False
            )
            # Map difficulty to audience description
            audience_map = {
                "beginner": "beginners with no prior knowledge, using simple language and basic concepts",
                "intermediate": "learners with basic understanding, building on foundational knowledge",
                "advanced": "experts and advanced learners, covering complex details and nuances"
            }
            audience_desc = audience_map.get(difficulty_level, audience_map["intermediate"])
            # Create prompt that produces clean output without asterisks or preambles
            prompt = f"""Generate a detailed video description for an educational video about "{topic}".
IMPORTANT INSTRUCTIONS:
- Start directly with the content, NO preambles like "Here is..." or "Of course!"
- Use plain text formatting with line breaks, NOT markdown asterisks or bold
- Use bullet points with ⢠symbol, not asterisks
- Be specific, include concrete examples
- Make it educational and engaging
- Target audience level: {difficulty_level.upper()} - {audience_desc}
REQUIRED FORMAT:
Create a short video for {audience_desc} explaining {topic}.
Target Audience: [Specify {difficulty_level} level audience and their background]
Key Concepts: [List 2-4 main concepts appropriate for {difficulty_level} level]
Content:
⢠[First key point with specific example suitable for {difficulty_level} level]
⢠[Second key point with visualization approach]
⢠[Third key point showing step-by-step process]
⢠[Fourth key point with comparison or application]
⢠[Final point with complexity or summary appropriate for {difficulty_level} level]
Style: [Describe the teaching approach and tone for {difficulty_level} learners]
Now generate the description for "{topic}" following this exact format."""
            messages = [{"type": "text", "content": prompt}]
            return llm(messages, metadata={})

        # Run in thread pool to avoid blocking UI.
        # get_running_loop() replaces the deprecated get_event_loop() and
        # matches how the rest of this file obtains the loop inside coroutines.
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            generated_desc = await loop.run_in_executor(executor, generate_description)
        if generated_desc and not generated_desc.startswith("Error"):
            # Clean up any remaining asterisks or unwanted formatting
            generated_desc = generated_desc.strip()
            # Remove common preambles
            preambles = [
                "Of course! Here is",
                "Here is a detailed",
                "Here's a detailed",
                "Sure! Here is",
                "Certainly! Here is",
            ]
            for preamble in preambles:
                if generated_desc.startswith(preamble):
                    # Find the first newline after preamble and start from there
                    first_newline = generated_desc.find('\n')
                    if first_newline > 0:
                        generated_desc = generated_desc[first_newline:].strip()
                    break
            desc_input.value = generated_desc
            ui.notify(f"{difficulty_level.capitalize()} level description generated successfully!", color="positive")
        else:
            ui.notify("Failed to generate description. Please try again.", color="negative")
    except Exception as e:
        print(f"Error generating description: {e}")
        ui.notify(f"Error: {str(e)}", color="negative")
    finally:
        # Re-enable button
        auto_gen_button.enable()
        auto_gen_button.props(remove="loading")
# Connect the auto-generate button handler
auto_gen_button.on_click(handle_auto_generate_description)
# Define the generate handler now that UI elements exist
async def handle_generate():
topic = topic_input.value
description = desc_input.value
review_mode = review_checkbox.value
if not topic or not description:
ui.notify(
"Please provide both a Project Name and Description.", color="warning"
)
return
# Check if project already exists
project_path = get_project_path("output", topic)
is_existing = os.path.exists(project_path)
# Disable button and show progress
generate_button.disable()
progress_container.style("display: block;")
progress_bar.set_visibility(True)
progress_label.set_text("š Starting generation...")
progress_bar.value = 0
log_output.clear()
# Reset trivia state
trivia_container.classes(remove="hidden")
trivia_label.set_text("Thinking of interesting facts...")
# Start background trivia generation
async def generate_and_cycle_trivia():
    """Fetch topic trivia via the planner LLM and cycle the facts through
    `trivia_label` for as long as the progress container is visible.

    Falls back to a built-in fact list when the LLM call fails or returns
    fewer than three usable lines.
    """
    facts = [
        "Algorithm visualization helps in understanding complex logic by 70%.",
        "Python's Timsort is a hybrid sorting algorithm, derived from merge sort and insertion sort.",
        "Manim was originally created by 3Blue1Brown to explain math concepts.",
        "Big O notation frames the upper bound of an algorithm's running time.",
        "The first computer algorithm was written by Ada Lovelace in the mid-1800s."
    ]
    try:
        # Quick LLM call for custom trivia
        llm = LiteLLMWrapper(model_name=app_state["planner_model_name"], temperature=0.7)
        prompt = f"Generate 5 short, interesting, one-sentence facts or tips about {topic} or computer science. Return ONLY the facts, one per line."
        # get_running_loop() replaces the deprecated get_event_loop() call;
        # inside a coroutine a running loop is guaranteed.
        response = await asyncio.get_running_loop().run_in_executor(None, lambda: llm([{"type": "text", "content": prompt}]))
        if response:
            new_facts = [line.strip() for line in response.split('\n') if line.strip() and not line.strip().startswith(('Here', 'Sure'))]
            if len(new_facts) >= 3:
                facts = new_facts
    except Exception as e:
        # Best-effort: trivia is cosmetic, so fall back silently to the defaults.
        print(f"Trivia gen failed, using defaults: {e}")
    i = 0
    # NOTE(review): the progress container is hidden via style("display: none"),
    # which does not appear to toggle the element's .visible flag -- confirm
    # that this loop actually terminates once generation completes.
    while progress_container.visible:  # Keep cycling while visible
        trivia_label.set_text(facts[i % len(facts)])
        # Animate transition
        trivia_label.classes(add="opacity-0")
        await asyncio.sleep(0.5)
        trivia_label.classes(remove="opacity-0")
        i += 1
        # Wait before next fact
        for _ in range(80):  # 8 seconds, checking visibility every 0.1s
            if not progress_container.visible:
                return
            await asyncio.sleep(0.1)
asyncio.create_task(generate_and_cycle_trivia())
if is_existing:
log_output.push(f"š Resuming existing project: '{topic}'")
log_output.push(f"ā
Checking project status and continuing from where it left off...")
else:
log_output.push(f"š Starting new project: '{topic}'")
log_output.push(f"š Initializing video generation pipeline...")
try:
# Don't modify output_dir - generate_video.py handles folder structure
# Update progress: Planning phase
progress_label.set_text("š Planning video structure...")
progress_bar.value = 0.1
await asyncio.sleep(0.1) # Allow UI to update
# Start generation in background thread to keep UI responsive
import concurrent.futures
import threading
import sys
from io import StringIO
# Shared state for progress updates
generation_complete = threading.Event()
generation_error = [None]
current_progress = {"value": 0.1, "text": "Starting..."}
log_buffer = [] # Buffer for log messages
progress_lock = threading.Lock()
def progress_callback(value, text):
    """Record the pipeline's latest progress (thread-safe).

    Called from the worker thread; the UI coroutine polls `current_progress`
    under the same lock, so both fields are updated atomically together.
    """
    with progress_lock:
        current_progress.update({"value": value, "text": text})
class LogCapture:
    """File-like tee: forward writes to the original stream while collecting a
    cleaned, user-friendly copy of each line in a shared buffer for the UI log.

    Installed over sys.stdout/sys.stderr in the generation worker thread; the
    UI coroutine drains `buffer_list` under `progress_lock`.
    """

    # Pre-compiled patterns (class-level: compiled once, not per write() call).
    _SCENES_RE = re.compile(r'Found (\d+) scenes? in outline')
    _SCENE_PREFIX_RE = re.compile(r'\[([^\]]+) \| Scene (\d+)\]')
    _BRACKET_RE = re.compile(r'\[([^\]]+)\]')

    # Technical-to-friendly message substitutions, applied verbatim.
    _REPLACEMENTS = {
        "STARTING VIDEO PIPELINE FOR TOPIC:": "š Starting video generation for:",
        "[PHASE 1: SCENE OUTLINE]": "š Phase 1: Planning video structure",
        "[PHASE 1 COMPLETE]": "ā\nPhase 1: Video structure ready",
        "[PHASE 2: IMPLEMENTATION PLANS]": "šØ Phase 2: Designing all scenes",
        "[PHASE 2 COMPLETE]": "ā\nPhase 2: All scenes designed",
        "[PHASE 3: CODE GENERATION & RENDERING (SCENE-BY-SCENE)]": "š¬ Phase 3: Rendering all scenes",
        "[PHASE 3 COMPLETE]": "ā\nPhase 3: All scenes rendered",
        "PIPELINE FINISHED FOR TOPIC:": "ā\nVideo generation complete for:",
        "scene outline saved": "ā\nVideo structure saved",
        "Loaded existing scene outline": "ā\nUsing existing video structure",
        "Loaded existing topic session ID": "",
        "Total Cost:": "š° Cost:",
        "==> Generating scene outline": "š Starting: Generating video structure",
        "==> Scene outline generated": "ā\nFinished: Video structure generated",
        "==> Generating scene implementations": "šØ Starting: Planning scene details",
        "==> All concurrent scene implementations generated": "ā\nFinished: All scene details planned",
        "==> Preparing to render": "š» Starting: Scene rendering",
        "Starting concurrent processing": "š» Processing scenes concurrently",
        "<== All scene processing tasks completed": "ā\nFinished: All scenes processed",
    }

    def __init__(self, original_stream, buffer_list):
        self.original_stream = original_stream  # real console stream, always written
        self.buffer_list = buffer_list          # shared list drained by the UI loop
        # Noise that should never reach the UI log.
        self.skip_patterns = [
            "Langfuse client is disabled",
            "LANGFUSE_PUBLIC_KEY",
            "No video folders found",
            "langfuse.com/docs",
            "See our docs:",
            "==> Loading existing implementation plans",
            "<== Finished loading plans",
            "Found: 0, Missing:",
            "Generating missing implementation plans for scenes:",
            "Loaded existing topic session ID:",
            "Saved topic session ID to",
        ]

    def write(self, text):
        """Forward *text* to the console; append a cleaned version to the buffer."""
        self.original_stream.write(text)  # Still print to console
        if not text.strip():  # Only add non-empty lines
            return
        # Filter out unnecessary technical messages
        if any(pattern in text for pattern in self.skip_patterns):
            return
        # Transform technical messages to user-friendly ones
        cleaned_text = text.rstrip()
        for old, new in self._REPLACEMENTS.items():
            if old in cleaned_text:
                cleaned_text = cleaned_text.replace(old, new)
        # Handle "Found X scenes in outline" dynamically
        scenes_match = self._SCENES_RE.search(cleaned_text)
        if scenes_match:
            num_scenes = scenes_match.group(1)
            scene_word = "scene" if num_scenes == "1" else "scenes"
            cleaned_text = f"š Video has {num_scenes} {scene_word}"
        # Clean up scene prefixes like "[Binary Search | Scene 2]"
        scene_prefix_match = self._SCENE_PREFIX_RE.search(cleaned_text)
        if scene_prefix_match:
            scene_num = scene_prefix_match.group(2)
            cleaned_text = self._SCENE_PREFIX_RE.sub(f'Scene {scene_num}:', cleaned_text)
        # Clean up topic prefixes like "[Binary Search]"
        cleaned_text = self._BRACKET_RE.sub('', cleaned_text).strip()
        # Remove excessive equals signs and empty messages
        if "======================================================" in cleaned_text or not cleaned_text.strip():
            return
        with progress_lock:
            self.buffer_list.append(cleaned_text)

    def flush(self):
        """Delegate flushing to the wrapped stream (file-like protocol)."""
        self.original_stream.flush()
def run_generation_sync():
    """Run generation in a separate thread.

    Redirects stdout/stderr through LogCapture so pipeline output is mirrored
    into the UI log buffer, drives the async pipeline on a private event loop
    (worker threads have no running loop), and reports completion/errors back
    to the UI coroutine via `generation_error` and `generation_complete`.
    """
    # Capture stdout/stderr
    old_stdout = sys.stdout
    old_stderr = sys.stderr
    try:
        # Redirect output to our capture
        sys.stdout = LogCapture(old_stdout, log_buffer)
        sys.stderr = LogCapture(old_stderr, log_buffer)
        # Create a new event loop for this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Run the generation pipeline with progress callback
        loop.run_until_complete(
            video_generator.generate_video_pipeline(
                topic,
                description,
                max_retries=app_state["max_retries"],
                only_plan=review_mode,
                progress_callback=progress_callback
            )
        )
        loop.close()
    except Exception as e:
        # Stash the error for the UI polling loop to re-raise on the main thread.
        generation_error[0] = e
    finally:
        # Restore original stdout/stderr
        sys.stdout = old_stdout
        sys.stderr = old_stderr
        # Signal the UI coroutine's polling loop that this thread is done.
        generation_complete.set()
# Start generation in background thread
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
generation_future = executor.submit(run_generation_sync)
heartbeat_counter = 0
last_log_count = 0
last_progress_text = ""
# Keep UI responsive and update progress from callback
while not generation_complete.is_set():
await asyncio.sleep(0.5) # Check twice per second for smoother updates
heartbeat_counter += 1
# Update UI with actual progress from pipeline
with progress_lock:
progress_bar.value = current_progress["value"]
# Update progress label with elapsed time
elapsed = heartbeat_counter // 2
if current_progress["text"]:
progress_label.set_text(f"{current_progress['text']} ({elapsed}s)")
else:
progress_label.set_text(f"ā³ Working... ({elapsed}s)")
# Push new log messages to UI
if len(log_buffer) > last_log_count:
for log_line in log_buffer[last_log_count:]:
log_output.push(log_line)
last_log_count = len(log_buffer)
last_progress_text = current_progress["text"]
# Check for errors
if generation_error[0]:
raise generation_error[0]
# Wait for thread to fully complete
generation_future.result()
# Push any remaining logs
with progress_lock:
if len(log_buffer) > last_log_count:
for log_line in log_buffer[last_log_count:]:
log_output.push(log_line)
# Handle "Plan Only" / Review Mode completion
if review_mode:
progress_bar.value = 1.0
progress_label.set_text("ā
Planning Complete! Ready for Review.")
log_output.push("\nā
Video plans generated successfully!")
log_output.push("ā¹ļø Go to 'Project View' tab to review and edit the script before rendering.")
ui.notify(f"Planning complete for '{topic}'. Please review in Project View.", color="positive", icon="edit_note", timeout=0)
# Help user by enabling buttons
generate_button.enable()
# Add a button to go to review directly
with progress_container:
ui.button("Go to Review", icon="arrow_forward", on_click=lambda: [
main_tabs.set_value("šŗ Project View"),
# We can't easily auto-select the topic in the other tab without more state management,
# but the user can select it.
# Actually, let's try to set it:
app_state.update({"current_topic_inspector": topic}),
# Trigger update (this is async, so we wrap it)
asyncio.create_task(update_inspector(topic))
]).props("unelevated no-caps").classes("mt-4 bg-secondary text-white")
return
progress_label.set_text("šļø Finalizing project...")
progress_bar.value = 0.9 # Finalization phase
progress_label.set_text("šļø Combining videos and creating subtitles...")
progress_bar.value = 0.9
# Check if any scenes failed
has_failures = len(video_generator.failed_scenes) > 0
if has_failures:
log_output.push(f"\nā ļø Pipeline completed with {len(video_generator.failed_scenes)} failed scene(s). Finalizing project...")
else:
log_output.push("\nā
Pipeline completed! Finalizing project...")
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, combine_videos, topic)
# Complete
progress_bar.value = 1.0
if has_failures:
progress_label.set_text(f"ā ļø Video generation completed with {len(video_generator.failed_scenes)} failed scene(s)")
log_output.push(f"ā ļø Project finalized with {len(video_generator.failed_scenes)} scene(s) failed. Check logs for details.")
ui.notify(
f"ā ļø Video for '{topic}' completed with failures. Check logs.",
color="warning",
icon="warning",
)
else:
progress_label.set_text("ā
Video generation complete!")
log_output.push("ā
Project finalized successfully!")
if result == "success":
ui.notify(
f"Video for '{topic}' generated!",
color="positive",
icon="check_circle",
)
elif result == "already_exists":
ui.notify(f"ā¹ļø Video for '{topic}' already finalized", color="info", icon="info")
elif result == "no_scenes":
ui.notify(f"ā ļø No rendered scenes found for '{topic}'", color="warning", icon="warning")
# Hide progress after 2 seconds
await asyncio.sleep(2)
progress_container.style("display: none;")
except Exception as e:
progress_label.set_text(f"ā Error: {str(e)[:50]}...")
progress_bar.value = 0
log_output.push(f"\nā An error occurred: {e}")
ui.notify(f"An error occurred: {e}", color="negative", multi_line=True)
finally:
generate_button.enable()
await update_dashboard()
# Connect the button to the handler
generate_button.on_click(handle_generate)
with ui.tab_panel(two):
with ui.column().classes("w-full gap-6"):
# Modern header with gradient background
with ui.card().classes("w-full shadow-lg border-0").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 24px;"):
with ui.row().classes("w-full items-center justify-between"):
with ui.row().classes("items-center gap-3"):
ui.icon("dashboard", size="xl").classes("text-white")
with ui.column().classes("gap-1"):
ui.label("Dashboard").classes("text-3xl font-bold text-white")
ui.label("Monitor your projects and track progress").classes("text-white/80")
ui.button("Refresh", on_click=update_dashboard, icon="refresh").props(
"flat round"
).classes("text-white").tooltip("Refresh dashboard")
dashboard_content = ui.column().classes("w-full gap-4 mt-4")
with ui.tab_panel(three):
with ui.column().classes("w-full gap-6"):
# Modern header with gradient background
with ui.card().classes("w-full shadow-lg border-0").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 24px;"):
with ui.row().classes("w-full items-center gap-3"):
ui.icon("tv", size="xl").classes("text-white")
with ui.column().classes("gap-1"):
ui.label("Project View").classes("text-3xl font-bold text-white")
ui.label("Watch videos, explore code, and learn with AI").classes("text-white/80")
topic_folders = get_topic_folders("output")
default_topic = app_state.get("selected_topic") or (
topic_folders[0] if topic_folders else None
)
# Project selector with modern styling
with ui.card().classes("w-full shadow-md"):
with ui.row().classes("w-full items-center gap-3 p-4"):
ui.icon("folder_open", size="lg").classes("text-primary")
inspector_select = (
ui.select(
topic_folders,
label="Select a Project",
value=default_topic,
on_change=lambda e: update_inspector(e.value),
)
.props("outlined")
.classes("flex-grow text-lg")
.style("font-size: 1.125rem; min-height: 56px;")
)
inspector_content = ui.column().classes("w-full gap-6 mt-4")
with ui.tab_panel(four):
with ui.column().classes("w-full gap-6"):
# Modern header with gradient background
with ui.card().classes("w-full shadow-lg border-0").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 24px;"):
with ui.row().classes("w-full items-center gap-3"):
ui.icon("build_circle", size="xl").classes("text-white")
with ui.column().classes("gap-1"):
ui.label("Utilities").classes("text-3xl font-bold text-white")
ui.label("Continue and manage your video projects").classes("text-white/80")
util_content = ui.column().classes("w-full mt-4")
# --- Define inspect_project after all UI elements are created ---
async def inspect_project(topic_name):
    """Open the Project View tab with *topic_name* selected and its panel refreshed.

    Order matters: remember the selection, sync the dropdown widget, rebuild
    the inspector content, and only then switch tabs so the panel is ready.
    """
    app_state["selected_topic"] = topic_name
    inspector_select.set_value(topic_name)
    await update_inspector(topic_name)
    main_tabs.set_value("šŗ Project View")
# --- Initial UI State Population ---
await update_dashboard()
# Initialize inspector with default topic if available
topic_folders = get_topic_folders("output")
default_topic = app_state.get("selected_topic") or (
topic_folders[0] if topic_folders else None
)
if default_topic:
app_state["selected_topic"] = default_topic
await update_inspector(default_topic)
update_util_tab()
# Run the app with increased WebSocket timeout to prevent disconnections during long operations
ui.run(
title='AlgoVision',
port=int(os.environ.get('PORT', 8080)),
reconnect_timeout=3600.0, # 1 hour timeout instead of the default 30 seconds
reload=False, # Disable auto-reload to prevent interruption of Manim rendering
)