#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Configure UTF-8 encoding for stdout/stderr on Windows import sys import io if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') from nicegui import app, ui, events import asyncio import warnings import logging # Suppress Windows asyncio ConnectionResetError warnings (harmless cleanup warnings) if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # Suppress specific asyncio warnings warnings.filterwarnings('ignore', category=ResourceWarning) logging.getLogger('asyncio').setLevel(logging.ERROR) import textwrap import json import os import re from io import StringIO import glob from types import SimpleNamespace, MethodType from pathlib import Path # --- Imports from the original script --- from moviepy import VideoFileClip, concatenate_videoclips import os import json import uuid import glob import re from dotenv import load_dotenv # Assuming these are in a 'src' directory relative to the script from src.config.config import Config from utils.litellm import LiteLLMWrapper from src.core.video_planner import VideoPlanner from src.core.code_generator import CodeGenerator from src.core.video_renderer import VideoRenderer from src.utils.utils import extract_xml from src.utils.error_recovery import ErrorRecovery from prompts import get_banned_reasonings from prompts.prompts_raw import ( _code_font_size, _code_disable, _code_limit, _prompt_manim_cheatsheet, ) # Load allowed models list from JSON file allowed_models_path = os.path.join( os.path.dirname(__file__), "src", "utils", "models.json" ) with open(allowed_models_path, "r", encoding="utf-8") as f: allowed_models = json.load(f).get("allowed_models", []) load_dotenv(override=True) # --- App State (Replaces st.session_state) --- app_state = { "log_stream": StringIO(), "selected_topic": 
None, "current_topic_inspector": None, # For the inspector tab "latest_pause_time": 0.0, "current_tcm_entry": None, "chat_histories": {}, # one history per topic "planner_model_name": next( (m for m in allowed_models if "gemini" in m), allowed_models[0] ), "max_retries": 5, "max_scene_concurrency": 5, } # --- All Helper Functions from the original script (remain unchanged) --- # --- (This section is collapsed for brevity, it's identical to your script) --- def get_topic_folders(output_dir): """ Finds and returns the list of high-level topic names (e.g., "Bubble Sort"). A valid topic folder must contain at least one run subfolder, which in turn must contain a scene outline. """ if not os.path.exists(output_dir): return [] valid_topics = set() for top_level_dir in os.listdir(output_dir): top_level_path = os.path.join(output_dir, top_level_dir) if os.path.isdir(top_level_path): # Check inside this folder for run folders (e.g., "bubble_sort") for inner_item in os.listdir(top_level_path): inner_path = os.path.join(top_level_path, inner_item) # If there's a sub-directory containing an outline, the top-level is a valid topic if os.path.isdir(inner_path) and glob.glob( os.path.join(inner_path, "*_scene_outline.txt") ): valid_topics.add( top_level_dir ) # Add the human-readable name, e.g., "Bubble Sort" break # Found a valid run, no need to check other runs in this topic folder return sorted(list(valid_topics)) def get_project_path(output_dir, topic_name): """ Gets the path to the specific run folder (e.g., "output/Bubble Sort/bubble_sort"). It finds the first sub-directory within the main topic folder. """ top_level_path = os.path.join(output_dir, topic_name) if not os.path.isdir(top_level_path): # This case handles when the topic folder itself hasn't been created yet. 
return top_level_path # Find the first subdirectory inside the topic folder (e.g., "bubble_sort") for item in os.listdir(top_level_path): potential_path = os.path.join(top_level_path, item) if os.path.isdir(potential_path): # Return the path to the inner run folder return potential_path # Fallback if no inner run folder is found (shouldn't happen for valid topics) return top_level_path def safe_read_file(path, clean=True): try: with open(path, "r", encoding="utf-8") as f: content = f.read() if not clean: return content patterns_to_remove = [ r"", r"", r"", r"# Scene \d+ Implementation Plan", r"\[SCENE_VISION\]", r"\[STORYBOARD\]", r"\[ANIMATION_STRATEGY\]", r"\[NARRATION\]", r"\[ANIMATION:.*?\]", ] for pattern in patterns_to_remove: content = re.sub(pattern, "", content) return content.strip() except FileNotFoundError: return f"File not found at: {path}" except Exception as e: return f"Error reading file: {e}" def get_scene_implementation_context(output_dir, topic, scene_number): """ Retrieves scene implementation details including code, visual plan, technical implementation, and animation narration for a specific scene. Returns a dictionary with all available implementation details. 
""" project_path = get_project_path(output_dir, topic) file_prefix = os.path.basename(project_path) scene_dir = os.path.join(project_path, f"scene{scene_number}") context = { "scene_number": scene_number, "implementation_plan": None, "code": None, "visual_plan": None, "technical_plan": None, "animation_narration": None, } # Load implementation plan plan_path = os.path.join(scene_dir, f"{file_prefix}_scene{scene_number}_implementation_plan.txt") if os.path.exists(plan_path): context["implementation_plan"] = safe_read_file(plan_path, clean=False) # Extract specific sections from implementation plan plan_content = context["implementation_plan"] # Extract visual storyboard visual_match = re.search( r'(.*?)', plan_content, re.DOTALL ) if visual_match: context["visual_plan"] = visual_match.group(1).strip() # Extract technical implementation tech_match = re.search( r'(.*?)', plan_content, re.DOTALL ) if tech_match: context["technical_plan"] = tech_match.group(1).strip() # Extract animation narration anim_match = re.search( r'(.*?)', plan_content, re.DOTALL ) if anim_match: context["animation_narration"] = anim_match.group(1).strip() # Load generated code code_dir = os.path.join(scene_dir, "code") if os.path.exists(code_dir): code_files = [f for f in os.listdir(code_dir) if f.endswith('.py')] if code_files: # Get the latest version code_files.sort(reverse=True) code_path = os.path.join(code_dir, code_files[0]) context["code"] = safe_read_file(code_path, clean=False) return context def extract_scene_number_from_concept_id(concept_id): """ Extracts scene number from concept ID. 
Example: "Bubble_Sort.scene_1.array_initialization" -> 1 """ match = re.search(r'scene_(\d+)', concept_id) if match: return int(match.group(1)) return None def check_status(self, theorem: dict): topic = theorem["theorem"] project_path = get_project_path(self.output_dir, topic) inner_folder_name = os.path.basename(project_path) file_prefix = inner_folder_name scene_outline_path = os.path.join(project_path, f"{file_prefix}_scene_outline.txt") has_scene_outline = os.path.exists(scene_outline_path) num_scenes = 0 if has_scene_outline: with open(scene_outline_path, "r") as f: scene_outline = f.read() scene_outline_content = extract_xml(scene_outline, "SCENE OUTLINE") num_scenes = len(re.findall(r"[^<]", scene_outline_content)) implementation_plans, code_files, rendered_scenes = 0, 0, 0 scene_status = [] for i in range(1, num_scenes + 1): scene_dir = os.path.join(project_path, f"scene{i}") plan_path = os.path.join( scene_dir, f"{file_prefix}_scene{i}_implementation_plan.txt" ) has_plan = os.path.exists(plan_path) if has_plan: implementation_plans += 1 code_dir = os.path.join(scene_dir, "code") has_code = os.path.exists(code_dir) and any( f.endswith(".py") for f in os.listdir(code_dir) ) if has_code: code_files += 1 has_render = os.path.exists(os.path.join(scene_dir, "succ_rendered.txt")) if has_render: rendered_scenes += 1 scene_status.append( { "scene_number": i, "has_plan": has_plan, "has_code": has_code, "has_render": has_render, } ) has_combined_video = os.path.exists( os.path.join(project_path, f"{file_prefix}_combined.mp4") ) return { "topic": topic, "has_scene_outline": has_scene_outline, "total_scenes": num_scenes, "implementation_plans": implementation_plans, "code_files": code_files, "rendered_scenes": rendered_scenes, "has_combined_video": has_combined_video, "scene_status": scene_status, } def set_active_output_dir(self, new_output_dir): self.output_dir = new_output_dir self.planner.output_dir = new_output_dir self.code_generator.output_dir = new_output_dir 
self.video_renderer.output_dir = new_output_dir def load_voices(file_path="src/tts/voices.json"): if not os.path.exists(file_path): return [] try: with open(file_path, "r", encoding="utf-8") as f: voices = json.load(f) return [v for v in voices if "id" in v and "name" in v] except Exception as e: ui.notify(f"Error loading src/tts/voices.json: {e}", type="negative") return [] # --- UPDATED/NEW HELPER FUNCTIONS --- def find_latest_video_for_scene(project_path: str, scene_num: int) -> str | None: """ Finds the latest rendered video for a given scene number in the project. (Implementation from Streamlit script) """ videos_dir = os.path.join(project_path, "media", "videos") search_pattern = os.path.join(videos_dir, f"*scene{scene_num}_v*") potential_folders = glob.glob(search_pattern) if not potential_folders: print(f" - No video folders found for scene {scene_num}.") return None def get_version(path): m = re.search(r"_v(\d+)", path) return int(m.group(1)) if m else -1 latest_folder = max(potential_folders, key=get_version) for res in ["1080p60", "720p30", "480p15"]: video_file = os.path.join(latest_folder, res, f"Scene{scene_num}.mp4") if os.path.exists(video_file): return video_file print(f" - No video file found for scene {scene_num} in {latest_folder}.") return None def split_narration_to_chunks(narration, max_chars=40, max_lines=2): """ Split narration into chunks suitable for SRT (by sentences, then by line length). (New function from Streamlit script) """ sentences = re.split(r"(?<=[.!?])\s+", narration) chunks = [] for sentence in sentences: if not sentence.strip(): continue wrapped = textwrap.wrap(sentence.strip(), width=max_chars) for i in range(0, len(wrapped), max_lines): block = wrapped[i : i + max_lines] chunks.append(" ".join(block)) return chunks def tcm_to_srt( tcm: list, max_line_length=40, max_lines=2, max_block_duration=4.0 ) -> str: """ Convert TCM events to SRT, splitting long narration and duration into multiple blocks. 
(Implementation from Streamlit script) """ def sec_to_srt_time(sec): h = int(sec // 3600) m = int((sec % 3600) // 60) s = int(sec % 60) ms = int(round((sec - int(sec)) * 1000)) return f"{h:02}:{m:02}:{s:02},{ms:003}" srt_lines = [] idx = 1 for event in tcm: narration = event.get("narrationText", "").replace("*", "").strip() if not narration or narration == "...": continue start = float(event["startTime"]) end = float(event["endTime"]) duration = end - start chunks = split_narration_to_chunks( narration, max_chars=max_line_length, max_lines=max_lines ) n_chunks = len(chunks) if n_chunks == 0: continue chunk_duration = min(duration / n_chunks, max_block_duration) chunk_start = start for chunk in chunks: chunk_end = min(chunk_start + chunk_duration, end) srt_lines.append(f"{idx}") srt_lines.append( f"{sec_to_srt_time(chunk_start)} --> {sec_to_srt_time(chunk_end)}" ) wrapped = textwrap.wrap(chunk, width=max_line_length) if len(wrapped) > max_lines: wrapped = wrapped[: max_lines - 1] + [ " ".join(wrapped[max_lines - 1 :]) ] srt_lines.extend(wrapped) srt_lines.append("") idx += 1 chunk_start = chunk_end if chunk_start >= end: break return "\n".join(srt_lines) def tcm_to_vtt( tcm: list, max_line_length=40, max_lines=2, max_block_duration=4.0 ) -> str: """ Convert TCM events directly to WebVTT format. More efficient than converting SRT → VTT. 
""" def sec_to_vtt_time(sec): h = int(sec // 3600) m = int((sec % 3600) // 60) s = int(sec % 60) ms = int(round((sec - int(sec)) * 1000)) return f"{h:02}:{m:02}:{s:02}.{ms:003}" vtt_lines = ['WEBVTT', ''] # Header with blank line idx = 1 for event in tcm: narration = event.get("narrationText", "").replace("*", "").strip() if not narration or narration == "...": continue start = float(event["startTime"]) end = float(event["endTime"]) duration = end - start chunks = split_narration_to_chunks( narration, max_chars=max_line_length, max_lines=max_lines ) n_chunks = len(chunks) if n_chunks == 0: continue chunk_duration = min(duration / n_chunks, max_block_duration) chunk_start = start for chunk in chunks: chunk_end = min(chunk_start + chunk_duration, end) vtt_lines.append(f"{idx}") vtt_lines.append( f"{sec_to_vtt_time(chunk_start)} --> {sec_to_vtt_time(chunk_end)}" ) wrapped = textwrap.wrap(chunk, width=max_line_length) if len(wrapped) > max_lines: wrapped = wrapped[: max_lines - 1] + [ " ".join(wrapped[max_lines - 1 :]) ] vtt_lines.extend(wrapped) vtt_lines.append("") idx += 1 chunk_start = chunk_end if chunk_start >= end: break return "\n".join(vtt_lines) def srt_to_vtt(srt_content: str) -> str: """ Convert SRT subtitle format to WebVTT format. Note: For better performance, use tcm_to_vtt() directly instead of tcm_to_srt() → srt_to_vtt(). """ lines = srt_content.strip().split('\n') vtt_lines = ['WEBVTT\n'] for line in lines: # Convert SRT timestamp format (00:00:00,000) to VTT format (00:00:00.000) if '-->' in line: line = line.replace(',', '.') vtt_lines.append(line) return '\n'.join(vtt_lines) def regenerate_subtitles_only(topic: str, output_dir: str = "output"): """ Regenerate ONLY subtitle files (SRT/VTT) without re-combining videos. Much faster than combine_videos() when video already exists. 
""" project_path = get_project_path(output_dir, topic) project_name = os.path.basename(os.path.dirname(project_path)) inner_folder_name = os.path.basename(project_path) output_video_path = os.path.join(project_path, f"{inner_folder_name}_combined.mp4") output_tcm_path = os.path.join(project_path, f"{inner_folder_name}_combined_tcm.json") output_srt_path = os.path.join(project_path, f"{inner_folder_name}_combined.srt") output_vtt_path = os.path.join(project_path, f"{inner_folder_name}_combined.vtt") # Check if combined video exists - REMOVED CHECK to allow rebuilding from scenes # if not os.path.exists(output_video_path): # print(f"[{topic}] ERROR: Combined video not found. Run full combine_videos() first.") # return "no_video" print(f"[{topic}] ==> Regenerating subtitles and Re-combining video scenes...") # Build TCM from scene proto_tcm files final_tcm = [] global_time_offset = 0.0 video_clips = [] # Collect scene clips for recombination try: scene_dirs = sorted( glob.glob(os.path.join(project_path, "scene*")), key=lambda x: int(re.search(r"scene(\d+)", x).group(1)), ) except (TypeError, ValueError): print(f" - ERROR: Could not sort scene directories in '{project_path}'.") return "error" for scene_dir in scene_dirs: scene_num = int(re.search(r"scene(\d+)", os.path.basename(scene_dir)).group(1)) video_path = find_latest_video_for_scene(project_path, scene_num) proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json") succ_rendered_path = os.path.join(scene_dir, "succ_rendered.txt") if not video_path or not os.path.exists(succ_rendered_path): continue # Collect video clip for recombination from moviepy import VideoFileClip video_clips.append(VideoFileClip(video_path)) if not os.path.exists(proto_tcm_path): continue try: # Get video duration from the clip we just loaded or the path # We already loaded it into video_clips, let's use that to be safe/efficient? 
# actually accessing .duration on the list item is fine actual_duration = video_clips[-1].duration with open(proto_tcm_path, "r", encoding="utf-8") as f: proto_tcm = json.load(f) # Load actual audio durations AND paths from voiceover cache actual_audio_durations = {} actual_audio_paths = {} # New: Store paths to rebuild audio track possible_cache_dirs = [ os.path.join(scene_dir, "code", "media", "voiceovers"), os.path.join(project_path, "media", "voiceovers"), os.path.join(os.path.dirname(video_path), "voiceovers"), ] # Find the active voiceover cache directory active_cache_dir = None for d in possible_cache_dirs: if os.path.exists(d): active_cache_dir = d break # If no cache dir exists, default to one if not active_cache_dir: active_cache_dir = possible_cache_dirs[0] os.makedirs(active_cache_dir, exist_ok=True) # Ensure all audio files exist (including "..." pauses) to fix sync # This is critical because if "..." files are missing, their duration is treated as 0 or estimated, # but having the real audio file (even if silent) locks the timing perfectly. from src.utils.kokoro_voiceover import KokoroService from pathlib import Path # We initialize a temporary service just for generation temp_speech_service = None for event in proto_tcm: narration_text = event.get("narrationText", "") if not narration_text: continue # User requested full regeneration of voiceovers to ensure sync. 
if narration_text.strip(): if temp_speech_service is None: temp_speech_service = KokoroService() # Force generation/retrieval for ALL text to ensure file exists and is consistent try: temp_speech_service.generate_from_text(narration_text, cache_dir=Path(active_cache_dir)) except Exception as e: print(f" - Warning: Failed to regenerate Voiceover experienced error: {e}") import traceback traceback.print_exc() # Now proceed to load the cache (durations and paths) for voiceover_cache_dir in possible_cache_dirs: if os.path.exists(voiceover_cache_dir): # Check for cache.json file (Manim voiceover format) cache_file = os.path.join(voiceover_cache_dir, "cache.json") if os.path.exists(cache_file): try: with open(cache_file, "r", encoding="utf-8") as f: cache_data = json.load(f) # cache_data is an array of audio entries for entry in cache_data: # Normalize text keys text_key = entry.get("input_text", "") if text_key and "original_audio" in entry: audio_file = os.path.join(voiceover_cache_dir, entry["original_audio"]) if os.path.exists(audio_file): from moviepy import AudioFileClip with AudioFileClip(audio_file) as audio: actual_audio_durations[text_key] = audio.duration actual_audio_paths[text_key] = audio_file except Exception as e: print(f" - Warning: Could not load cache.json: {e}") if actual_audio_durations: print(f" - Loaded {len(actual_audio_durations)} actual audio durations from cache") break # CRITICAL FIX: Only count estimated duration for events WITH narration # Empty narration events don't contribute to video duration # UPDATE: We MUST count all events, even silent ones, otherwise we drift early # Base calculation on max(audio, estimated) to account for visual padding total_ideal_duration = 0.0 event_base_durations = [] for event in proto_tcm: narration_text = event.get("narrationText", "") est_duration = event.get("estimatedDuration", 1.0) # Determine audio duration if exists audio_dur = 0.0 if narration_text and narration_text.strip(): # Check all text, 
including "..." if narration_text in actual_audio_durations: audio_dur = actual_audio_durations[narration_text] else: # Fallback try to find match for cached_text, cached_duration in actual_audio_durations.items(): if cached_text.strip() == narration_text.strip(): audio_dur = cached_duration break # Base Duration: Max of Audio and Estimate # This ensures we respect the visual time (estimate) if it's longer than audio # And respect audio time if it's longer than visual estimate base_dur = max(audio_dur, est_duration) event_base_durations.append(base_dur) total_ideal_duration += base_dur scaling_factor = actual_duration / total_ideal_duration if total_ideal_duration > 0 else 1.0 # Collect audio clips for reconstruction audio_clips_collection = [] # Build TCM with proper timing scene_time_offset = 0 for i, event in enumerate(proto_tcm): narration_text = event.get("narrationText", "") # Apply scaling to the ideal base duration # This distributes any global timing difference (transistions, rendering overhead) proportionally base_dur = event_base_durations[i] final_event_duration = base_dur * scaling_factor current_start_time = global_time_offset + scene_time_offset event["startTime"] = f"{current_start_time:.3f}" event["endTime"] = f"{current_start_time + final_event_duration:.3f}" event["conceptId"] = f"{project_name}.scene_{scene_num}.{event.get('conceptName', 'event').replace(' ', '_')}" event["_sync_fix_applied"] = True # Add audio clip to collection if available # This reconstructs the audio track with the exact same timing as the subtitles if narration_text and narration_text.strip(): audio_path = None if narration_text in actual_audio_paths: audio_path = actual_audio_paths[narration_text] else: # Fallback search for cached_text, cached_path in actual_audio_paths.items(): if cached_text.strip() == narration_text.strip(): audio_path = cached_path break if audio_path: try: from moviepy import AudioFileClip # Create Audio clip starting at the exact synchronized time 
# We limit duration to final_event_duration to avoid overlap if massive scaling happened (rare) ac = AudioFileClip(audio_path).set_start(current_start_time) audio_clips_collection.append(ac) except Exception as audio_err: print(f"Warning: Failed to load audio clip {audio_path}: {audio_err}") if "estimatedDuration" in event: del event["estimatedDuration"] final_tcm.append(event) scene_time_offset += final_event_duration global_time_offset += actual_duration except Exception as e: print(f" - ERROR processing Scene {scene_num}: {e}") if final_tcm: # Save updated TCM with open(output_tcm_path, "w", encoding="utf-8") as f: json.dump(final_tcm, f, indent=2, ensure_ascii=False) # Generate subtitles srt_content = tcm_to_srt(final_tcm) with open(output_srt_path, "w", encoding="utf-8") as f: f.write(srt_content) vtt_content = tcm_to_vtt(final_tcm) with open(output_vtt_path, "w", encoding="utf-8") as f: f.write(vtt_content) # RE-RENDER VIDEO WITH NEW AUDIO BY RE-COMBINING SCENES try: print(f"[{topic}] ==> Re-combining scene videos with synchronized audio...") from moviepy import VideoFileClip, CompositeAudioClip, concatenate_videoclips # Combine original scene clips if not video_clips: print(f"[{topic}] Error: No video clips found to combine.") return "error" original_video = concatenate_videoclips(video_clips) # Create new composite audio if audio_clips_collection: new_audio = CompositeAudioClip(audio_clips_collection) # Ensure audio doesn't exceed video duration new_audio = new_audio.set_duration(original_video.duration) # Set new audio to video final_video = original_video.set_audio(new_audio) # Write to temp file first then rename to avoid corruption temp_output = output_video_path.replace(".mp4", "_temp_sync.mp4") final_video.write_videofile( temp_output, codec="libx264", audio_codec="aac", logger="bar", threads=4 ) # Close clips original_video.close() new_audio.close() final_video.close() # Close individual scene clips for clip in video_clips: clip.close() # Replace old 
file if os.path.exists(temp_output): if os.path.exists(output_video_path): os.remove(output_video_path) os.rename(temp_output, output_video_path) print(f"[{topic}] ==> Video re-combined and re-rendered successfully with new audio!") else: # If no Audio, just save the combined video? # Or wait, if we have no audio clips, maybe we shouldn't save? # But the user might want a silent video. # For now let's assume we proceed but warn. print(f"[{topic}] Warning: No audio clips collected. Saving Combined Video without new audio track.") original_video.write_videofile( output_video_path, codec="libx264", audio_codec="aac", logger="bar", threads=4 ) original_video.close() for clip in video_clips: clip.close() except Exception as vid_err: print(f"[{topic}] ERROR re-rendering video: {vid_err}") import traceback traceback.print_exc() print(f"[{topic}] ==> Project finalized.") return "success" else: print(f"[{topic}] <== No TCM data found.") return "no_data" def combine_videos(topic: str, output_dir: str = "output"): """ Combines all videos for a topic and generates the final, fine-grained TCM and SRT subtitles. 
(Implementation from Streamlit script, adapted with NiceGUI notifications) """ project_path = get_project_path(output_dir, topic) project_name = os.path.basename(os.path.dirname(project_path)) inner_folder_name = os.path.basename(project_path) output_video_path = os.path.join(project_path, f"{inner_folder_name}_combined.mp4") output_tcm_path = os.path.join( project_path, f"{inner_folder_name}_combined_tcm.json" ) output_srt_path = os.path.join(project_path, f"{inner_folder_name}_combined.srt") output_vtt_path = os.path.join(project_path, f"{inner_folder_name}_combined.vtt") # Check if we need to regenerate due to subtitle sync improvements needs_regeneration = False if os.path.exists(output_tcm_path): # Check if TCM was created with actual audio durations (has a marker) try: with open(output_tcm_path, "r", encoding="utf-8") as f: tcm_data = json.load(f) # Check if this TCM has the sync fix marker has_sync_fix = any(event.get("_sync_fix_applied", False) for event in tcm_data) if not has_sync_fix: print(f"[{topic}] Detected old subtitle timing - will regenerate with improved sync") needs_regeneration = True except: needs_regeneration = True if ( os.path.exists(output_video_path) and os.path.exists(output_tcm_path) and os.path.exists(output_srt_path) and os.path.exists(output_vtt_path) and not needs_regeneration ): msg = f"[{topic}] Combined assets already exist with improved sync. Skipping." print(msg) return "already_exists" print( f"[{topic}] ==> Finalizing project: Combining videos and creating global TCM..." ) print( f"[{topic}] ==> Finalizing project: Combining videos and creating global TCM..." 
) final_tcm = [] video_clips = [] # Collect VideoFileClip objects audio_clips_collection = [] # Collect AudioFileClip objects for reconstruction global_time_offset = 0.0 try: scene_dirs = sorted( glob.glob(os.path.join(project_path, "scene*")), key=lambda x: int(re.search(r"scene(\d+)", x).group(1)), ) except (TypeError, ValueError): print( f" - ERROR: Could not sort scene directories in '{project_path}'. Check folder names." ) return # Initialize Kokoro service lazily if needed for regeneration from src.utils.kokoro_voiceover import KokoroService from pathlib import Path from moviepy import VideoFileClip, AudioFileClip, CompositeAudioClip, concatenate_videoclips temp_speech_service = None for scene_dir in scene_dirs: scene_num = int(re.search(r"scene(\d+)", os.path.basename(scene_dir)).group(1)) video_path = find_latest_video_for_scene(project_path, scene_num) proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json") succ_rendered_path = os.path.join(scene_dir, "succ_rendered.txt") if not video_path or not os.path.exists(succ_rendered_path): print( f" - Skipping Scene {scene_num}: Video or succ_rendered.txt missing." ) continue if not os.path.exists(proto_tcm_path): print( f" - WARNING: proto_tcm.json not found for Scene {scene_num}. Skipping fine-grained analysis." 
) continue try: # Load Video Clip for Recombination current_video_clip = VideoFileClip(video_path) video_clips.append(current_video_clip) actual_duration = current_video_clip.duration with open(proto_tcm_path, "r", encoding="utf-8") as f: proto_tcm = json.load(f) # --- AUDIO VERIFICATION & LOADING --- actual_audio_durations = {} actual_audio_paths = {} possible_cache_dirs = [ os.path.join(scene_dir, "code", "media", "voiceovers"), os.path.join(project_path, "media", "voiceovers"), os.path.join(os.path.dirname(video_path), "voiceovers"), ] active_cache_dir = None for d in possible_cache_dirs: if os.path.exists(d): active_cache_dir = d break if not active_cache_dir: active_cache_dir = possible_cache_dirs[0] os.makedirs(active_cache_dir, exist_ok=True) # 1. Verify/Regenerate all audio files for event in proto_tcm: narration_text = event.get("narrationText", "") if not narration_text: continue # Check consistency / regenerate if needed if narration_text.strip(): if temp_speech_service is None: temp_speech_service = KokoroService() try: temp_speech_service.generate_from_text(narration_text, cache_dir=Path(active_cache_dir)) except Exception as e: print(f" - Warning: Failed to regenerate Voiceover experienced error: {e}") # 2. 
Load Audio Paths from Cache for voiceover_cache_dir in possible_cache_dirs: if os.path.exists(voiceover_cache_dir): cache_file = os.path.join(voiceover_cache_dir, "cache.json") if os.path.exists(cache_file): try: with open(cache_file, "r", encoding="utf-8") as f: cache_data = json.load(f) for entry in cache_data: text_key = entry.get("input_text", "") if text_key and "original_audio" in entry: audio_file = os.path.join(voiceover_cache_dir, entry["original_audio"]) if os.path.exists(audio_file): with AudioFileClip(audio_file) as audio: actual_audio_durations[text_key] = audio.duration actual_audio_paths[text_key] = audio_file except Exception as e: print(f" - Warning: Could not load cache.json: {e}") if actual_audio_durations: break # --- DURATION CALCULATION --- total_ideal_duration = 0.0 event_base_durations = [] for event in proto_tcm: narration_text = event.get("narrationText", "") est_duration = event.get("estimatedDuration", 1.0) audio_dur = 0.0 if narration_text and narration_text.strip(): if narration_text in actual_audio_durations: audio_dur = actual_audio_durations[narration_text] else: for cached_text, cached_duration in actual_audio_durations.items(): if cached_text.strip() == narration_text.strip(): audio_dur = cached_duration break base_dur = max(audio_dur, est_duration) event_base_durations.append(base_dur) total_ideal_duration += base_dur scaling_factor = actual_duration / total_ideal_duration if total_ideal_duration > 0 else 1.0 if not actual_audio_durations: print(f" - Warning: No actual audio durations found, using proportional scaling based on estimates") # --- BUILD TCM & COLLECT AUDIO CLIPS --- scene_time_offset = 0 for i, event in enumerate(proto_tcm): narration_text = event.get("narrationText", "") base_dur = event_base_durations[i] final_event_duration = base_dur * scaling_factor current_start_time = global_time_offset + scene_time_offset event["startTime"] = f"{current_start_time:.3f}" event["endTime"] = f"{current_start_time + 
final_event_duration:.3f}" event["conceptId"] = f"{project_name}.scene_{scene_num}.{event.get('conceptName', 'event').replace(' ', '_')}" event["_sync_fix_applied"] = True # Collect Audio Clip if narration_text and narration_text.strip(): audio_path = None if narration_text in actual_audio_paths: audio_path = actual_audio_paths[narration_text] else: for cached_text, cached_path in actual_audio_paths.items(): if cached_text.strip() == narration_text.strip(): audio_path = cached_path break if audio_path: try: ac = AudioFileClip(audio_path).set_start(current_start_time) audio_clips_collection.append(ac) except Exception as audio_err: print(f"Warning: Failed to load audio clip {audio_path}: {audio_err}") if "estimatedDuration" in event: del event["estimatedDuration"] final_tcm.append(event) scene_time_offset += final_event_duration print( f" - Processed Scene {scene_num} (Duration: {actual_duration:.2f}s), {len(proto_tcm)} TCM entries." ) global_time_offset += actual_duration except Exception as e: print(f" - ERROR processing Scene {scene_num}: {e}") import traceback traceback.print_exc() if video_clips: # Concatenate Visuals final_video_clip = concatenate_videoclips(video_clips) # Apply Composite Audio if available if audio_clips_collection: new_audio = CompositeAudioClip(audio_clips_collection) new_audio = new_audio.set_duration(final_video_clip.duration) final_video_clip = final_video_clip.set_audio(new_audio) print(f"[{topic}] ==> Applied synchronized composite audio track.") final_video_clip.write_videofile( output_video_path, codec="libx264", audio_codec="aac", logger="bar", threads=4 ) with open(output_tcm_path, "w", encoding="utf-8") as f: json.dump(final_tcm, f, indent=2, ensure_ascii=False) # Generate subtitles in both SRT and VTT formats srt_content = tcm_to_srt(final_tcm) with open(output_srt_path, "w", encoding="utf-8") as f: f.write(srt_content) vtt_content = tcm_to_vtt(final_tcm) with open(output_vtt_path, "w", encoding="utf-8") as f: 
            f.write(vtt_content)
        # --- tail of the finalization routine (its `def` is above this chunk) ---
        print(f"[{topic}] ==> Project finalized.")
        # Close all clips.
        # Note: CompositeAudioClip closes its children when closed if we are careful,
        # but manual closing is safer in moviepy.
        if audio_clips_collection:
            new_audio.close()  # Close composite
        final_video_clip.close()
        for clip in video_clips:
            clip.close()
        return "success"
    else:
        print(f"[{topic}] <== No rendered scenes found to finalize.")
        return "no_scenes"


# --- VideoGenerator Class (from generate_video.py) ---
class VideoGenerator:
    """
    A class for generating manim videos using AI models.

    This class coordinates the video generation pipeline by managing scene
    planning, code generation, and video rendering. It supports concurrent
    scene processing, visual code fixing, and RAG (Retrieval Augmented
    Generation).
    """

    def __init__(
        self,
        planner_model,
        scene_model=None,
        output_dir="output",
        verbose=False,
        use_rag=False,
        use_context_learning=False,
        context_learning_path="data/context_learning",
        chroma_db_path="data/rag/chroma_db",
        manim_docs_path="data/rag/manim_docs",
        embedding_model="azure/text-embedding-3-large",
        use_visual_fix_code=False,
        use_langfuse=True,
        trace_id=None,  # NOTE(review): accepted but never read in this body — confirm callers rely on it
        max_scene_concurrency: int = 5,
    ):
        """Wire up the planner, code generator, and renderer sub-modules.

        Args:
            planner_model: LLM wrapper used for scene planning (and code
                generation when ``scene_model`` is None).
            scene_model: Optional separate LLM wrapper for code generation.
            output_dir: Root directory for all generated artifacts.
            verbose: When True, sub-modules print raw LLM responses.
            use_rag / use_context_learning: Feature toggles forwarded to the
                planner and code generator.
            max_scene_concurrency: Upper bound on scenes processed at once
                (enforced via an asyncio.Semaphore).
        """
        print("Initializing VideoGenerator...")
        self.output_dir = output_dir
        self.verbose = verbose
        self.use_visual_fix_code = use_visual_fix_code
        # Session ID persists across runs via a file in output_dir.
        self.session_id = self._load_or_create_session_id()
        # Semaphore caps how many scenes render concurrently.
        self.scene_semaphore = asyncio.Semaphore(max_scene_concurrency)
        print(f"Scene concurrency limit set to: {max_scene_concurrency}")
        self.banned_reasonings = get_banned_reasonings()
        self.failed_scenes = []  # collects {'topic', 'scene', 'last_error', 'total_attempts'} dicts
        self.error_recovery = ErrorRecovery(output_dir)
        self.rate_limit_detected = False
        self.last_rate_limit_time = 0
        print("Initializing sub-modules: VideoPlanner, CodeGenerator, VideoRenderer...")
        self.planner = VideoPlanner(
            planner_model=planner_model,
            output_dir=output_dir,
            print_response=verbose,
            use_context_learning=use_context_learning,
            context_learning_path=context_learning_path,
            use_rag=use_rag,
            session_id=self.session_id,
            chroma_db_path=chroma_db_path,
            manim_docs_path=manim_docs_path,
            embedding_model=embedding_model,
            use_langfuse=use_langfuse,
        )
        self.code_generator = CodeGenerator(
            # Fall back to the planner model when no dedicated scene model is given.
            scene_model=scene_model if scene_model is not None else planner_model,
            output_dir=output_dir,
            print_response=verbose,
            use_rag=use_rag,
            use_context_learning=use_context_learning,
            context_learning_path=context_learning_path,
            chroma_db_path=chroma_db_path,
            manim_docs_path=manim_docs_path,
            embedding_model=embedding_model,
            use_visual_fix_code=use_visual_fix_code,
            use_langfuse=use_langfuse,
            session_id=self.session_id,
        )
        self.video_renderer = VideoRenderer(
            output_dir=output_dir,
            print_response=verbose,
            use_visual_fix_code=self.use_visual_fix_code,
        )
        print("VideoGenerator initialized successfully.")

    def _load_or_create_session_id(self) -> str:
        """Load existing session ID from file or create a new one.

        The ID lives in ``<output_dir>/session_id.txt`` so repeated runs of
        the app reuse the same session.
        """
        session_file = os.path.join(self.output_dir, "session_id.txt")
        if os.path.exists(session_file):
            with open(session_file, "r") as f:
                session_id = f.read().strip()
            return session_id
        session_id = str(uuid.uuid4())
        print(f"No existing session ID found. Creating a new one: {session_id}")
        os.makedirs(self.output_dir, exist_ok=True)
        with open(session_file, "w", encoding='utf-8') as f:
            f.write(session_id)
        print(f"Saved new session ID to {session_file}")
        return session_id

    def generate_scene_outline(self, topic: str, description: str, session_id: str) -> str:
        """Generate scene outline using VideoPlanner."""
        print(f"[{topic}] ==> Generating scene outline...")
        outline = self.planner.generate_scene_outline(topic, description, session_id)
        print(f"[{topic}] ==> Scene outline generated successfully.")
        return outline

    async def generate_scene_implementation_concurrently(
        self, topic: str, description: str, plan: str, session_id: str
    ):
        """Generate scene implementations concurrently using VideoPlanner.

        Concurrency is bounded by ``self.scene_semaphore``.
        """
        print(f"[{topic}] ==> Generating scene implementations concurrently...")
        implementations = await self.planner.generate_scene_implementation_concurrently(
            topic, description, plan, session_id, self.scene_semaphore
        )
        print(f"[{topic}] ==> All concurrent scene implementations generated.")
        return implementations

    async def render_video_fix_code(
        self,
        topic: str,
        description: str,
        scene_outline: str,
        implementation_plans: list,
        max_retries=3,
        session_id: str = None,
    ):
        """Render the video for all scenes with code fixing capability.

        Args:
            implementation_plans: Iterable of ``(scene_num, plan)`` pairs.
            max_retries: Per-scene render/fix attempts before giving up.
        """
        print(f"[{topic}] ==> Preparing to render {len(implementation_plans)} scenes...")
        # Sanitize the topic into a filesystem-safe prefix.
        file_prefix = topic.lower()
        file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
        tasks = []
        for scene_num, implementation_plan in implementation_plans:
            scene_dir = os.path.join(self.output_dir, topic, file_prefix, f"scene{scene_num}")
            subplan_dir = os.path.join(scene_dir, "subplans")
            os.makedirs(subplan_dir, exist_ok=True)
            # Reuse a persisted per-scene trace ID, or mint and persist one.
            scene_trace_id_path = os.path.join(subplan_dir, "scene_trace_id.txt")
            try:
                with open(scene_trace_id_path, "r") as f:
                    scene_trace_id = f.read().strip()
            except FileNotFoundError:
                scene_trace_id = str(uuid.uuid4())
                with open(scene_trace_id_path, "w", encoding='utf-8') as f:
                    f.write(scene_trace_id)
            # Load the scene's Proto-TCM (timing/concept map) if one was saved.
            proto_tcm_str = ""
            proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
            if os.path.exists(proto_tcm_path):
                with open(proto_tcm_path, "r") as f:
                    proto_tcm_str = f.read()
            task = self.process_scene(
                scene_num - 1,  # process_scene takes a 0-based index
                scene_outline,
                implementation_plan,
                proto_tcm_str,
                topic,
                description,
                max_retries,
                file_prefix,
                session_id,
                scene_trace_id,
            )
            tasks.append(task)
        print(f"[{topic}] Starting concurrent processing of {len(tasks)} scenes...")
        await asyncio.gather(*tasks)
        print(f"[{topic}] <== All scene processing tasks completed.")

    def _save_topic_session_id(self, topic: str, session_id: str):
        """Save session ID for a specific topic."""
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
        topic_dir = os.path.join(self.output_dir, topic, file_prefix)
        os.makedirs(topic_dir, exist_ok=True)
        session_file = os.path.join(topic_dir, "session_id.txt")
        with open(session_file, 'w', encoding='utf-8') as f:
            f.write(session_id)

    def _load_topic_session_id(self, topic: str):
        """Load session ID for a specific topic if it exists; return None otherwise."""
        file_prefix = topic.lower()
        file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
        session_file = os.path.join(self.output_dir, topic, file_prefix, "session_id.txt")
        if os.path.exists(session_file):
            with open(session_file, "r") as f:
                return f.read().strip()
        return None

    def cleanup_invalid_success_markers(self, topic: str) -> int:
        """Remove succ_rendered.txt files for scenes that don't actually have rendered videos.

        Returns:
            Number of stale marker files removed.
        """
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', topic.lower())
        topic_dir = os.path.join(self.output_dir, topic, file_prefix)
        if not os.path.exists(topic_dir):
            return 0
        removed_count = 0
        scene_dirs = glob.glob(os.path.join(topic_dir, "scene*"))
        for scene_dir in scene_dirs:
            if not os.path.isdir(scene_dir):
                continue
            scene_match = re.search(r'scene(\d+)', os.path.basename(scene_dir))
            if not scene_match:
                continue
            scene_num = scene_match.group(1)
            succ_file = os.path.join(scene_dir, "succ_rendered.txt")
            if os.path.exists(succ_file):
                # A marker is only valid when a rendered .mp4 exists in one of
                # the known resolution sub-folders.
                media_dir = os.path.join(topic_dir, "media", "videos")
                video_pattern = os.path.join(media_dir, f"{file_prefix}_scene{scene_num}_v*")
                video_folders = glob.glob(video_pattern)
                has_video = False
                for video_folder in video_folders:
                    for res_dir in ["1080p60", "720p30", "480p15"]:
                        video_file = os.path.join(video_folder, res_dir, f"Scene{scene_num}.mp4")
                        if os.path.exists(video_file):
                            has_video = True
                            break
                    if has_video:
                        break
                if not has_video:
                    os.remove(succ_file)
                    removed_count += 1
        return removed_count

    def load_implementation_plans(self, topic: str):
        """Load implementation plans for each scene.

        Returns:
            Dict mapping scene number -> plan text (or None when missing);
            empty dict when no scene outline exists yet.
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
        scene_outline_path = os.path.join(
            self.output_dir, topic, file_prefix, f"{file_prefix}_scene_outline.txt"
        )
        if not os.path.exists(scene_outline_path):
            return {}
        with open(scene_outline_path, "r") as f:
            scene_outline = f.read()
        scene_outline_content = extract_xml(scene_outline, "SCENE_OUTLINE")
        scene_number = 0
        if scene_outline_content:
            # NOTE(review): r"[^<]" counts every non-'<' character, not scene
            # tags — this looks like a scene-tag pattern whose angle-bracket
            # contents were lost in transit; verify against the original file.
            scene_number = len(re.findall(r"[^<]", scene_outline_content))
        implementation_plans = {}
        for i in range(1, scene_number + 1):
            plan_path = os.path.join(
                self.output_dir,
                topic,
                file_prefix,
                f"scene{i}",
                f"{file_prefix}_scene{i}_implementation_plan.txt",
            )
            if os.path.exists(plan_path):
                with open(plan_path, "r") as f:
                    implementation_plans[i] = f.read()
            else:
                implementation_plans[i] = None
        return implementation_plans

    async def _generate_scene_implementation_single(
        self, topic: str, description: str, scene_outline_i: str, i: int,
        file_prefix: str, session_id: str, scene_trace_id: str
    ):
        """Orchestrates the generation of a detailed plan and Proto-TCM for a single scene.

        Returns:
            Dict with keys ``"plan"`` and ``"proto_tcm"`` (proto_tcm may be "").
        """
        full_llm_response_obj = await self.planner._generate_scene_implementation_single(
            topic, description, scene_outline_i, i, file_prefix, session_id, scene_trace_id
        )
        # Preferred path: the planner already returned a structured dict.
        if isinstance(full_llm_response_obj, dict) and "plan" in full_llm_response_obj and "proto_tcm" in full_llm_response_obj:
            plan = full_llm_response_obj["plan"]
            proto_tcm_str = full_llm_response_obj["proto_tcm"]
        else:
            # Fallback: coerce the raw LLM response into a string, handling
            # OpenAI-style {"choices": [{"message": {"content": ...}}]} shapes.
            full_llm_response = ""
            if isinstance(full_llm_response_obj, str):
                full_llm_response = full_llm_response_obj
            elif isinstance(full_llm_response_obj, dict):
                try:
                    full_llm_response = full_llm_response_obj["choices"][0]["message"]["content"]
                except (KeyError, IndexError, TypeError):
                    if "content" in full_llm_response_obj:
                        full_llm_response = full_llm_response_obj["content"]
                    else:
                        full_llm_response = str(full_llm_response_obj)
            else:
                full_llm_response = str(full_llm_response_obj)
            plan = extract_xml(full_llm_response, "SCENE_TECHNICAL_IMPLEMENTATION_PLAN")
            # NOTE(review): '"" not in plan' is always False — this reads like a
            # closing-tag check (e.g. "</SCENE_TECHNICAL_IMPLEMENTATION_PLAN>")
            # whose tag text was stripped in transit; confirm against original.
            if not plan or "" not in plan:
                plan = full_llm_response
            proto_tcm_str = extract_xml(full_llm_response, "SCENE_PROTO_TCM")
        scene_dir = os.path.join(self.output_dir, topic, file_prefix, f"scene{i}")
        os.makedirs(scene_dir, exist_ok=True)
        # NOTE(review): same always-False '"" not in' pattern here — as written,
        # the proto-TCM branch can never run; likely another stripped tag check.
        if proto_tcm_str and "" not in proto_tcm_str:
            try:
                proto_tcm_data = json.loads(proto_tcm_str)
                proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
                with open(proto_tcm_path, "w", encoding="utf-8") as f:
                    json.dump(proto_tcm_data, f, indent=2, ensure_ascii=False)
            except json.JSONDecodeError:
                proto_tcm_str = ""
        else:
            proto_tcm_str = ""
        plan_path = os.path.join(scene_dir, f"{file_prefix}_scene{i}_implementation_plan.txt")
        with open(plan_path, "w", encoding="utf-8") as f:
            f.write(plan)
        return {"plan": plan, "proto_tcm": proto_tcm_str}

    async def generate_video_pipeline(
        self, topic: str, description: str, max_retries: int,
        only_plan: bool = False, specific_scenes: list = None,
        progress_callback=None
    ):
        """Modified pipeline to handle partial scene completions.

        Args:
            only_plan: Stop after planning (no rendering).
            specific_scenes: If given, restrict work to these scene numbers.
            progress_callback: Optional ``fn(fraction, message)`` for UI updates.
        """
        # Create a simple args object for compatibility
        class Args:
            only_render = False
        args = Args()
        # --- Session bookkeeping: reuse or create a per-topic session ID ---
        topic_folder_session_file = os.path.join(self.output_dir, topic, "session_id.txt")
        if os.path.exists(topic_folder_session_file):
            with open(topic_folder_session_file, "r") as f:
                session_id = f.read().strip()
        else:
            session_id = self._load_or_create_session_id()
            os.makedirs(os.path.join(self.output_dir, topic), exist_ok=True)
            with open(topic_folder_session_file, "w", encoding='utf-8') as f:
                f.write(session_id)
        self._save_topic_session_id(topic, session_id)
        file_prefix = topic.lower()
        file_prefix = re.sub(r"[^a-z0-9_]+", "_", file_prefix)
        scene_outline_path = os.path.join(
            self.output_dir, topic, file_prefix, f"{file_prefix}_scene_outline.txt"
        )
        if progress_callback:
            progress_callback(0.05, "šŸ“ Planning your video structure...")
        # --- Outline: load from disk if present, otherwise generate & persist ---
        if os.path.exists(scene_outline_path):
            with open(scene_outline_path, "r") as f:
                scene_outline = f.read()
        else:
            scene_outline_obj = self.planner.generate_scene_outline(topic, description, session_id)
            scene_outline = ""
            if isinstance(scene_outline_obj, str):
                scene_outline = scene_outline_obj
            elif isinstance(scene_outline_obj, dict):
                try:
                    scene_outline = scene_outline_obj["choices"][0]["message"]["content"]
                except (KeyError, IndexError, TypeError):
                    if "content" in scene_outline_obj:
                        scene_outline = scene_outline_obj["content"]
                    else:
                        scene_outline = str(scene_outline_obj)
            else:
                scene_outline = str(scene_outline_obj)
            # NOTE(review): '"" not in scene_outline' is always False — likely a
            # stripped "<SCENE_OUTLINE>"-style tag check; confirm against original.
            if not scene_outline or "" not in scene_outline:
                raise ValueError(f"[{topic}] FAILED to generate a valid scene outline.")
            os.makedirs(os.path.join(self.output_dir, topic, file_prefix), exist_ok=True)
            with open(scene_outline_path, "w", encoding="utf-8") as f:
                f.write(scene_outline)
        if progress_callback:
            progress_callback(0.15, "āœ… Video structure ready")
        # Drop success markers that point at missing renders before planning.
        removed = self.cleanup_invalid_success_markers(topic)
        if progress_callback:
            progress_callback(0.20, "šŸŽØ Designing each scene...")
        implementation_plans_dict = self.load_implementation_plans(topic)
        scene_outline_content = extract_xml(scene_outline, "SCENE_OUTLINE")
        # NOTE(review): r"[^<]" counts characters, not scenes — see the matching
        # note in load_implementation_plans; presumed extraction damage.
        scene_numbers = len(re.findall(r"[^<]", scene_outline_content)) if scene_outline_content else 0
        # --- Plan any scenes that still lack an implementation plan ---
        missing_scenes = []
        for i in range(1, scene_numbers + 1):
            if implementation_plans_dict.get(i) is None and (specific_scenes is None or i in specific_scenes):
                missing_scenes.append(i)
        if missing_scenes:
            for idx, scene_num in enumerate(missing_scenes):
                if progress_callback:
                    plan_progress = 0.20 + (0.15 * (idx / len(missing_scenes)))
                    progress_callback(plan_progress, f"šŸŽØ Designing scene {scene_num} of {scene_numbers}...")
                # NOTE(review): f"(.*?)" carries no scene tag — presumably once
                # f"<SCENE_{scene_num}>(.*?)</SCENE_{scene_num}>"; verify.
                scene_match = re.search(f"(.*?)", scene_outline_content, re.DOTALL)
                if scene_match:
                    scene_outline_i = scene_match.group(1)
                    scene_trace_id = str(uuid.uuid4())
                    implementation_details = await self._generate_scene_implementation_single(
                        topic, description, scene_outline_i, scene_num, file_prefix, session_id, scene_trace_id
                    )
                    implementation_plans_dict[scene_num] = implementation_details["plan"]
        if progress_callback:
            progress_callback(0.35, "āœ… All scenes designed")
        if only_plan:
            return
        if progress_callback:
            progress_callback(0.40, "šŸŽ¬ Creating animations...")
        # --- Render each planned scene that isn't already rendered ---
        sorted_scene_numbers = sorted(implementation_plans_dict.keys())
        processed_count = 0
        total_scenes_to_process = len([s for s in sorted_scene_numbers if not specific_scenes or s in specific_scenes])
        for scene_num in sorted_scene_numbers:
            if specific_scenes and scene_num not in specific_scenes:
                continue
            scene_dir = os.path.join(self.output_dir, topic, file_prefix, f"scene{scene_num}")
            is_rendered = os.path.exists(os.path.join(scene_dir, "succ_rendered.txt"))
            if is_rendered and not args.only_render:
                continue  # already rendered — skip unless forced re-render
            implementation_plan = implementation_plans_dict.get(scene_num)
            if not implementation_plan:
                continue
            proto_tcm_str = ""
            proto_tcm_path = os.path.join(scene_dir, "proto_tcm.json")
            if os.path.exists(proto_tcm_path):
                with open(proto_tcm_path, "r") as f:
                    proto_tcm_str = f.read()
            # Reuse or mint the per-scene trace ID (mirrors render_video_fix_code).
            scene_trace_id_path = os.path.join(scene_dir, "subplans", "scene_trace_id.txt")
            if os.path.exists(scene_trace_id_path):
                with open(scene_trace_id_path, "r") as f:
                    scene_trace_id = f.read().strip()
            else:
                os.makedirs(os.path.dirname(scene_trace_id_path), exist_ok=True)
                scene_trace_id = str(uuid.uuid4())
                with open(scene_trace_id_path, "w", encoding='utf-8') as f:
                    f.write(scene_trace_id)
            if progress_callback:
                scene_progress = 0.40 + (0.50 * (processed_count / total_scenes_to_process))
                progress_callback(scene_progress, f"šŸŽ¬ Animating scene {scene_num} of {scene_numbers}...")
            await self.process_scene(
                i=scene_num - 1,
                scene_outline=scene_outline,
                scene_implementation=implementation_plan,
                proto_tcm=proto_tcm_str,
                topic=topic,
                description=description,
                max_retries=max_retries,
                file_prefix=file_prefix,
                session_id=session_id,
                scene_trace_id=scene_trace_id,
            )
            processed_count += 1
            if progress_callback:
                scene_progress = 0.40 + (0.50 * (processed_count / total_scenes_to_process))
                progress_callback(scene_progress, f"āœ… Scene {scene_num} done!")
        if progress_callback:
            progress_callback(0.90, "āœ… All animations complete!")

    async def process_scene(
        self,
        i: int,
        scene_outline: str,
        scene_implementation: str,
        proto_tcm: str,
        topic: str,
        description: str,
        max_retries: int,
        file_prefix: str,
        session_id: str,
        scene_trace_id: str,
    ):
        """Process a single scene using CodeGenerator and VideoRenderer.

        Generates Manim code, then enters a render/fix loop: each failed render
        attempt feeds the error back to the code generator until it succeeds or
        ``max_retries`` attempts are exhausted (failures are recorded in
        ``self.failed_scenes``).

        Args:
            i: 0-based scene index (scene number is ``i + 1``).
        """
        curr_scene = i + 1
        curr_version = 0  # bumps once per failed render attempt; versions code/log files
        rag_queries_cache = {}
        code_dir = os.path.join(
            self.output_dir, topic, file_prefix, f"scene{curr_scene}", "code"
        )
        os.makedirs(code_dir, exist_ok=True)
        media_dir = os.path.join(self.output_dir, topic, file_prefix, "media")
        # Bound concurrent scene work by the shared semaphore.
        async with self.scene_semaphore:
            print(f"Scene {curr_scene} ---> Generating animation code...")
            code, log = self.code_generator.generate_manim_code(
                topic=topic,
                description=description,
                scene_outline=scene_outline,
                scene_implementation=scene_implementation,
                proto_tcm=proto_tcm,
                scene_number=curr_scene,
                additional_context=[
                    _prompt_manim_cheatsheet,
                    _code_font_size,
                    _code_limit,
                    _code_disable,
                ],
                scene_trace_id=scene_trace_id,
                session_id=session_id,
                rag_queries_cache=rag_queries_cache,
            )
            # Persist the initial code and its generation log for debugging.
            log_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_init_log.txt")
            code_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
            with open(log_path, "w", encoding="utf-8") as f:
                f.write(log)
            with open(code_path, "w", encoding="utf-8") as f:
                f.write(code)
            print(f"Scene {curr_scene} āœ… Animation code generated.")
            print(f"Scene {curr_scene} ---> Rendering animation...")
            error_message = None
            while curr_version < max_retries:
                code, error_message = await self.video_renderer.render_scene(
                    code=code,
                    file_prefix=file_prefix,
                    curr_scene=curr_scene,
                    curr_version=curr_version,
                    code_dir=code_dir,
                    media_dir=media_dir,
                    max_retries=max_retries,
                    use_visual_fix_code=self.use_visual_fix_code,
                    visual_self_reflection_func=self.code_generator.visual_self_reflection,
                    banned_reasonings=self.banned_reasonings,
                    scene_trace_id=scene_trace_id,
                    topic=topic,
                    session_id=session_id,
                )
                if error_message is None:
                    print(f"Scene {curr_scene} āœ… Animation rendered successfully.")
                    break
                curr_version += 1
                if curr_version >= max_retries:
                    # Out of attempts — record the failure and stop this scene.
                    print(f"Scene {curr_scene} āš ļø Failed to render after {max_retries} attempts. Check logs for details.")
                    self.failed_scenes.append({
                        'topic': topic,
                        'scene': curr_scene,
                        'last_error': error_message,
                        'total_attempts': curr_version + 1,
                    })
                    break
                print(f"Scene {curr_scene} ---> Fixing code issues (attempt {curr_version + 1}/{max_retries})...")
                print(f"Scene {curr_scene} ---> Analyzing error and generating fix...")
                # Ask the code generator for a repaired version given the error.
                code, log = self.code_generator.fix_code_errors(
                    implementation_plan=scene_implementation,
                    proto_tcm=proto_tcm,
                    code=code,
                    error=error_message,
                    scene_trace_id=scene_trace_id,
                    topic=topic,
                    scene_number=curr_scene,
                    session_id=session_id,
                    rag_queries_cache=rag_queries_cache,
                )
                print(f"Scene {curr_scene} ---> Fixed code generated, saving...")
                log_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_fix_log.txt")
                code_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
                with open(log_path, "w", encoding="utf-8") as f:
                    f.write(log)
                with open(code_path, "w", encoding="utf-8") as f:
                    f.write(code)
                print(f"Scene {curr_scene} ---> Re-rendering with fixed code...")


# --- Initialize Video Generator ---
def get_video_generator(
    planner_name,
    scene_concurrency,
):
    """Build a VideoGenerator with planner and scene models from one model name.

    Both wrappers are constructed from ``planner_name`` with identical kwargs.
    """
    llm_kwargs = {
        "temperature": 0.7,
        "print_cost": True,
    }
    planner_model, scene_model = LiteLLMWrapper(
        model_name=planner_name, **llm_kwargs
    ), LiteLLMWrapper(model_name=planner_name, **llm_kwargs)
    return VideoGenerator(
        planner_model=planner_model,
        scene_model=scene_model,
        output_dir="output",
        max_scene_concurrency=scene_concurrency,
    )


# Best-effort construction at import time: the UI degrades gracefully
# (video_generator is None) when initialization fails.
try:
    video_generator = get_video_generator(
        app_state["planner_model_name"], app_state["max_scene_concurrency"]
    )
    # Bind helper functions defined elsewhere in this module as instance methods.
    video_generator.check_theorem_status = MethodType(
        check_status, video_generator
    )
    video_generator.set_active_output_dir = MethodType(
        set_active_output_dir, video_generator
    )
except Exception as e:
    print(f"Failed to initialize VideoGenerator: {e}")
    video_generator = None

# --- UI Definition ---
app.add_static_files("/output", "output")

# --- Streamlit-Inspired Color Scheme ---
THEME_COLORS = {
    "primary": "#FF4B4B",    # Streamlit's signature red
    "secondary": "#4A4A4A",  # Dark gray for text and secondary elements
    "accent": "#00A2FF",     # A bright blue for accents
    "positive": "#28A745",   # Success green
    "negative": "#DC3545",   # Error red
    "info": "#17A2B8",       # Informational teal
    "warning": "#FFC107",    # Warning yellow
}


@ui.page("/")
async def main_page():
    """Render the main NiceGUI page (continues beyond this chunk)."""
    # --- Page Configuration ---
    ui.colors(
        primary=THEME_COLORS["primary"],
        secondary=THEME_COLORS["secondary"],
        accent=THEME_COLORS["accent"],
        positive=THEME_COLORS["positive"],
        negative=THEME_COLORS["negative"],
        info=THEME_COLORS["info"],
        warning=THEME_COLORS["warning"],
    )
    # NOTE(review): these head/body HTML payloads are empty in this copy of the
    # source — their markup appears to have been stripped in transit; confirm
    # against the original file.
    ui.add_head_html(
        ''
    )
    ui.add_body_html(''' ''')
    # Add custom CSS for chat messages
    ui.add_head_html(''' ''')
    dark_mode = ui.dark_mode(value=False)
    # Add comprehensive JavaScript to enforce dark mode
    ui.add_body_html(''' ''')
    # --- Modern Header with Enhanced Styling ---
    with ui.header().classes(
        "bg-white dark:bg-gray-900 text-gray-900 dark:text-white shadow-sm"
    ).style("height: 64px; padding: 
0 24px; display: flex; align-items: center; backdrop-filter: blur(10px);"): with ui.row().classes("w-full items-center justify-between").style("height: 100%;"): with ui.row(align_items="center").classes("gap-3"): # Enhanced menu button with hover effect menu_btn = ui.button( icon="menu", on_click=lambda: left_drawer.toggle() ).props("flat round").classes( "text-gray-600 dark:text-gray-300 hover:bg-gray-100 dark:hover:bg-gray-800" ).tooltip("Toggle sidebar") # Animated brand section with ui.row(align_items="center").classes("gap-2"): ui.icon("movie", size="md").classes("text-primary") ui.label("AlgoVision").classes("text-xl font-bold tracking-tight text-gray-900 dark:text-white") with ui.row(align_items="center").classes("gap-4"): ui.label("Educational Video Generator").classes( "text-sm text-gray-500 dark:text-gray-400 hidden md:block font-medium" ) # Enhanced dark mode toggle with icon transition dark_mode_btn = ui.button( icon="light_mode" if dark_mode.value else "dark_mode", on_click=lambda: (dark_mode.toggle(), dark_mode_btn.props(f"icon={'dark_mode' if dark_mode.value else 'light_mode'}")) ).props("flat round").classes( "text-gray-600 dark:text-gray-300 hover:bg-gray-100 dark:hover:bg-gray-800" ).tooltip("Toggle dark mode") # --- Modern Sidebar (Left Drawer) with Enhanced UX --- left_drawer = ui.left_drawer(value=True, fixed=True, bordered=True).classes( "bg-gradient-to-b from-gray-50 to-white dark:from-gray-900 dark:to-gray-800" ) with left_drawer: # Sidebar Header with branding and primary accent with ui.row().classes("w-full items-center justify-center px-4 py-3 border-b-2 border-primary/30 dark:border-primary/40").style("flex-shrink: 0; backdrop-filter: blur(10px);"): with ui.column().classes("gap-1 items-center text-center"): with ui.row().classes("items-center gap-2"): ui.icon("settings", size="md").classes("text-primary") ui.label("Settings").classes("text-2xl font-bold text-gray-900 dark:text-white") ui.label("Configure your 
workspace").classes("text-xs text-primary/70 dark:text-primary/60 font-medium") # Scrollable content area with custom scrollbar - fixed height to enable proper scrolling with ui.column().classes("w-full gap-3 px-4 pt-4 pb-2 overflow-y-auto overflow-x-hidden custom-scrollbar").style("flex: 1 1 auto; min-height: 0; max-height: 100%;"): # Model Configuration Section with primary accent with ui.expansion( "Model Configuration", icon="psychology", value=True ).classes( "w-full bg-white dark:bg-gray-800 rounded-lg shadow-sm " "hover:shadow-md border border-gray-200 dark:border-gray-700" ).props("dense duration=600"): ui.separator().classes("bg-gradient-to-r from-primary/30 via-primary/10 to-transparent").style("margin: 0; height: 1px;") with ui.column().classes("w-full gap-3 px-3 pt-2 pb-3"): with ui.row().classes("items-center gap-2 mb-2"): ui.icon("smart_toy", size="sm").classes("text-primary") ui.label("AI Model").classes("text-sm font-semibold text-primary dark:text-primary") def prettify_model_name(model_id: str) -> str: name_part = model_id.split("/")[-1].replace("-", " ").replace("_", " ") return " ".join(word.capitalize() for word in name_part.split()) model_display_map = { model_id: prettify_model_name(model_id) for model_id in allowed_models } default_model = app_state.get("planner_model_name") or next( iter(model_display_map) ) model_select = ui.select( model_display_map, label="Select LLM Model", value=default_model, on_change=lambda e: app_state.update({"planner_model_name": e.value}), ).props("outlined dense").classes("w-full") # Model info badge with ui.row().classes("items-center gap-2 mt-1"): ui.icon("info", size="xs").classes("text-blue-500") ui.label("Powers system logic & output").classes("text-xs text-gray-500 dark:text-gray-400") # TTS Configuration Section with primary accent on hover with ui.expansion( "Voice Configuration", icon="record_voice_over", value=True ).classes( "w-full bg-white dark:bg-gray-800 rounded-lg shadow-sm " "hover:shadow-md 
border border-gray-200 dark:border-gray-700" ).props("dense duration=600"): ui.separator().classes("bg-gradient-to-r from-primary/30 via-primary/10 to-transparent").style("margin: 0; height: 1px;") with ui.column().classes("w-full gap-3 px-3 pt-2 pb-3"): with ui.row().classes("items-center gap-2 mb-2"): ui.icon("mic", size="sm").classes("text-primary") ui.label("Voice Selection").classes("text-sm font-semibold text-primary dark:text-primary") voices = load_voices("src/tts/voices.json") if voices: voice_options = {v["id"]: v["name"] for v in voices} first_voice_id = next(iter(voice_options)) ui.select( voice_options, label="Choose Voice", value=first_voice_id ).props("outlined dense").classes("w-full") with ui.row().classes("items-center gap-1 mt-1"): ui.icon("info", size="xs").classes("text-blue-500") ui.label(f"{len(voices)} voices available").classes("text-xs text-gray-500 dark:text-gray-400") else: with ui.card().classes("w-full bg-orange-50 dark:bg-orange-900/20 border border-orange-200 dark:border-orange-800"): with ui.row().classes("items-center gap-2 p-2"): ui.icon("warning", size="sm").classes("text-orange-500") ui.label("No voices found").classes("text-sm text-orange-700 dark:text-orange-300") # Pipeline Configuration Section with primary accent on hover with ui.expansion( "Pipeline Settings", icon="tune", value=True ).props("dense duration=600").classes( "w-full bg-white dark:bg-gray-800 rounded-lg shadow-sm " "hover:shadow-md border border-gray-200 dark:border-gray-700" ).props("dense"): ui.separator().classes("bg-gradient-to-r from-primary/30 via-primary/10 to-transparent").style("margin: 0; height: 1px;") # Max retries is now fixed at 5 (no UI control needed) # Sidebar Footer with quick actions and primary accent - fixed at bottom with ui.row().classes("w-full items-center justify-between px-4 py-2 border-t-2 border-primary/30 dark:border-primary/40").style("flex-shrink: 0; position: sticky; bottom: 0; z-index: 10; backdrop-filter: blur(10px); 
background: linear-gradient(0deg, rgba(255,75,75,0.03) 0%, transparent 100%);"): ui.label("v1.0.0").classes("text-xs text-gray-400") with ui.row().classes("gap-1"): ui.button(icon="help_outline", on_click=lambda: ui.notify("Documentation coming soon!", type="info")).props("flat round dense").classes("text-primary hover:bg-primary/10").tooltip("Help") ui.button(icon="bug_report", on_click=lambda: ui.notify("Report issues on GitHub", type="info")).props("flat round dense").classes("text-primary hover:bg-primary/10").tooltip("Report Bug") # --- ALL HANDLER FUNCTIONS DEFINED FIRST (will be defined after UI elements) --- def delete_project(topic_name): """Delete a project and all its files.""" import shutil async def confirm_delete(): dialog.close() try: project_path = get_project_path("output", topic_name) # Get the parent directory (the topic folder) topic_folder = os.path.dirname(project_path) # Delete the entire topic folder if os.path.exists(topic_folder): shutil.rmtree(topic_folder) ui.notify(f"Project '{topic_name}' deleted successfully", color="positive", icon="delete") # Clear chat history for this topic if topic_name in app_state["chat_histories"]: del app_state["chat_histories"][topic_name] # Clear selected topic if it was deleted if app_state.get("selected_topic") == topic_name: app_state["selected_topic"] = None app_state["current_topic_inspector"] = None # Refresh dashboard and inspector await update_dashboard() await update_inspector(None) else: ui.notify(f"Project folder not found: {topic_folder}", color="warning") except Exception as e: ui.notify(f"Error deleting project: {e}", color="negative", multi_line=True) # Create confirmation dialog with ui.dialog() as dialog, ui.card().classes("p-6"): with ui.column().classes("gap-4"): with ui.row().classes("items-center gap-3"): ui.icon("warning", size="lg").classes("text-orange-500") ui.label("Delete Project?").classes("text-xl font-bold") ui.label(f"Are you sure you want to delete 
'{topic_name}'?").classes("text-gray-700 dark:text-gray-300") ui.label("This will permanently delete all files, videos, and data for this project.").classes("text-sm text-gray-600 dark:text-gray-400") ui.label("This action cannot be undone.").classes("text-sm font-semibold text-red-600 dark:text-red-400") with ui.row().classes("w-full justify-end gap-2 mt-2"): ui.button("Cancel", on_click=dialog.close).props("flat") ui.button("Delete", on_click=confirm_delete).props("unelevated").classes("bg-red-600 text-white") dialog.open() async def update_dashboard(): dashboard_content.clear() if not video_generator: with dashboard_content: ui.label("Generator not initialized.").classes("text-negative") return topic_folders = get_topic_folders("output") if not topic_folders: with dashboard_content: # Empty state with ui.card().classes("w-full").style("padding: 60px; text-align: center;"): ui.icon("folder_open", size="xl").classes("text-gray-400") ui.label("No Projects Yet").classes("text-2xl font-semibold text-gray-700 dark:text-gray-300 mt-4") ui.label("Create your first educational video to get started").classes("text-gray-500 dark:text-gray-400 mt-2") ui.button("Create Project", icon="add", on_click=lambda: main_tabs.set_value("✨ Generate")).props("unelevated no-caps").classes("mt-4") return with dashboard_content: all_statuses = [ video_generator.check_theorem_status({"theorem": th}) for th in topic_folders ] # Stats Cards with ui.grid(columns=3).classes("w-full gap-4 mb-6"): # Total Projects with ui.card().classes("w-full").style("padding: 24px;"): with ui.row().classes("w-full items-center justify-between"): with ui.column().classes("gap-1"): ui.label("Total Projects").classes("text-sm font-medium text-gray-600 dark:text-gray-400") ui.label(str(len(all_statuses))).classes("text-3xl font-bold text-gray-900 dark:text-white") ui.icon("folder", size="lg").classes("text-primary opacity-80") # Scenes Progress with ui.card().classes("w-full").style("padding: 24px;"): with 
ui.row().classes("w-full items-center justify-between"): with ui.column().classes("gap-1"): ui.label("Scenes Rendered").classes("text-sm font-medium text-gray-600 dark:text-gray-400") total_scenes = sum(s["total_scenes"] for s in all_statuses) total_renders = sum(s["rendered_scenes"] for s in all_statuses) ui.label(f"{total_renders}/{total_scenes}").classes("text-3xl font-bold text-gray-900 dark:text-white") ui.icon("movie_filter", size="lg").classes("text-blue-500 opacity-80") # Completed Videos with ui.card().classes("w-full").style("padding: 24px;"): with ui.row().classes("w-full items-center justify-between"): with ui.column().classes("gap-1"): ui.label("Completed").classes("text-sm font-medium text-gray-600 dark:text-gray-400") total_combined = sum(1 for s in all_statuses if s["has_combined_video"]) ui.label(f"{total_combined}/{len(all_statuses)}").classes("text-3xl font-bold text-gray-900 dark:text-white") ui.icon("check_circle", size="lg").classes("text-green-500 opacity-80") # Projects List Header with View Toggle if "dashboard_view" not in app_state: app_state["dashboard_view"] = "list" view_mode = app_state["dashboard_view"] with ui.row().classes("w-full items-center justify-between mb-3"): ui.label("Your Projects").classes("text-lg font-semibold text-gray-900 dark:text-white") async def toggle_view(): app_state["dashboard_view"] = "grid" if view_mode == "list" else "list" await update_dashboard() ui.button( icon="grid_view" if view_mode == "list" else "view_list", on_click=toggle_view ).props("outline dense").tooltip("Toggle Grid/List View") # Render projects based on view mode if view_mode == "grid": # Grid View with ui.grid(columns=2).classes("w-full gap-4"): for status in sorted(all_statuses, key=lambda x: x["topic"]): with ui.card().classes("w-full hover:shadow-lg transition-shadow").style("padding: 20px;"): with ui.column().classes("w-full gap-3"): # Header with ui.row().classes("w-full items-start justify-between"): ui.icon("video_library", 
size="lg").classes("text-primary") with ui.row().classes("items-center gap-1"): ui.badge( "āœ“" if status["has_combined_video"] else "...", color="positive" if status["has_combined_video"] else "orange" ).props("rounded") ui.button( icon="delete", on_click=lambda s=status["topic"]: delete_project(s) ).props("flat round dense").classes("text-red-500").tooltip("Delete project") # Title ui.label(status["topic"]).classes("text-lg font-semibold text-gray-900 dark:text-white") # Stats progress_pct = int((status["rendered_scenes"] / status["total_scenes"]) * 100) if status["total_scenes"] > 0 else 0 with ui.column().classes("w-full gap-1"): ui.label(f"{status['rendered_scenes']}/{status['total_scenes']} scenes").classes("text-sm text-gray-600 dark:text-gray-400") ui.linear_progress(progress_pct / 100, show_value=False).props('rounded color="primary"').style("height: 6px;") # Action ui.button( "View Details", icon="arrow_forward", on_click=lambda s=status["topic"]: asyncio.create_task(inspect_project(s)) ).props("flat no-caps dense").classes("w-full text-primary") else: # List View for status in sorted(all_statuses, key=lambda x: x["topic"]): with ui.card().classes("w-full hover:shadow-lg transition-shadow").style("padding: 20px;"): with ui.row().classes("w-full items-center justify-between mb-3"): with ui.row().classes("items-center gap-3"): ui.icon("video_library", size="md").classes("text-primary") ui.label(status["topic"]).classes("text-lg font-semibold text-gray-900 dark:text-white") with ui.row().classes("items-center gap-2"): ui.badge( "Completed" if status["has_combined_video"] else "In Progress", color="positive" if status["has_combined_video"] else "orange" ).props("rounded") ui.button( icon="delete", on_click=lambda s=status["topic"]: delete_project(s) ).props("flat round dense").classes("text-red-500").tooltip("Delete project") # Progress bar progress_pct = int((status["rendered_scenes"] / status["total_scenes"]) * 100) if status["total_scenes"] > 0 else 0 with 
ui.column().classes("w-full gap-1"): with ui.row().classes("w-full items-center justify-between"): ui.label(f"{status['rendered_scenes']} of {status['total_scenes']} scenes").classes("text-sm text-gray-600 dark:text-gray-400") ui.label(f"{progress_pct}%").classes("text-sm font-medium text-gray-700 dark:text-gray-300") ui.linear_progress(progress_pct / 100, show_value=False).props('rounded color="primary"').style("height: 8px;") # Action button ui.button( "View Details", icon="arrow_forward", on_click=lambda s=status: asyncio.create_task(inspect_project(s["topic"])) ).props("flat no-caps dense").classes("mt-2 text-primary") async def update_inspector(topic): inspector_content.clear() if not topic: with inspector_content: # Empty state with better design with ui.column().classes("w-full items-center justify-center py-20"): ui.icon("video_library", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4") ui.label("No Project Selected").classes("text-2xl font-semibold text-gray-400 dark:text-gray-500") ui.label("Select a project from the dropdown above to view details").classes("text-gray-400 dark:text-gray-500 mt-2") return app_state["current_topic_inspector"] = topic with inspector_content: project_path = get_project_path("output", topic) inner_folder_name = os.path.basename(project_path) scene_dirs = sorted( [ d for d in os.listdir(project_path) if d.startswith("scene") and os.path.isdir(os.path.join(project_path, d)) ], key=lambda x: int(x.replace("scene", "")), ) # Modern Project Header Card with ui.card().classes("w-full mb-6 shadow-lg border-l-4 border-primary"): with ui.row().classes("w-full items-center justify-between p-4"): with ui.column().classes("gap-1"): ui.label(topic).classes("text-2xl font-bold text-gray-900 dark:text-white") with ui.row().classes("gap-4 items-center mt-2"): with ui.row().classes("items-center gap-1"): ui.icon("movie", size="sm").classes("text-gray-500") ui.label(f"{len(scene_dirs)} Scenes").classes("text-sm text-gray-600 
def handle_approve():
    """Approve the reviewed script and steer the user back to generation.

    Exits inspector edit mode, switches to the Generate tab, pre-fills the
    topic input with the current project, and unchecks review-only mode so
    the next run executes the full rendering pipeline.
    """
    # Leaving edit mode keeps the inspector read-only on its next render.
    app_state["inspector_edit_mode"] = False
    main_tabs.set_value("✨ Generate")
    # Pre-fill the topic so the user can resume with one click.
    topic_input.value = topic
    # Review mode off -> the pipeline proceeds past the planning stage.
    review_checkbox.value = False
    ui.notify(
        f"Script approved for '{topic}'! Click 'Generate Video' to proceed with rendering.",
        color="positive",
        icon="check_circle",
        timeout=5000,
    )
def render_chat_message(container, content, is_user=False):
    """Append one Streamlit-style chat bubble (avatar + markdown body).

    ``is_user`` selects the "user" styling and avatar; otherwise the
    "assistant" styling is used.  The bubble is added to *container*.
    """
    if is_user:
        css_role, avatar_text = "user", "šŸ‘¤"
    else:
        css_role, avatar_text = "assistant", "šŸ¤–"
    with container:
        with ui.element('div').classes(f"stchat-message {css_role}"):
            # Avatar column
            with ui.element('div').classes("stchat-avatar"):
                ui.label(avatar_text).classes("text-xl")
            # Message body rendered as markdown
            with ui.element('div').classes("stchat-content"):
                ui.markdown(content)
def show_question():
    """Render the current quiz question (or the completion card) into
    ``quiz_content_container``.

    Driven entirely by ``quiz_state``: ``current_index`` selects the
    question, ``show_answer`` switches between the option buttons and the
    answer/explanation panel.  Past the last question it shows a
    celebration card with a restart button.
    """
    try:
        quiz_content_container.clear()
        if quiz_state["current_index"] >= len(quiz_state["questions"]):
            # Quiz completed - celebration card
            with quiz_content_container:
                with ui.card().classes("w-full border-0 shadow-2xl").style("background: linear-gradient(135deg, #d1fae5 0%, #a7f3d0 100%); border-left: 4px solid #10b981; padding: 40px;"):
                    with ui.column().classes("items-center gap-4 w-full"):
                        with ui.avatar(size="xl").classes("shadow-lg").style("background: linear-gradient(135deg, #10b981 0%, #059669 100%);"):
                            ui.icon("emoji_events", size="lg").classes("text-white")
                        ui.label("Quiz Completed!").classes("text-2xl font-bold text-green-900")
                        ui.label("Great job reviewing the content!").classes("text-base text-green-800")
                        ui.separator().classes("bg-green-300 w-24 my-2")
                        ui.label(f"You answered {len(quiz_state['questions'])} questions").classes("text-sm text-green-700")
                        # Resetting quiz_state and re-showing the setup form
                        # lets the user configure and run a fresh quiz.
                        ui.button("Start New Quiz", on_click=lambda: (
                            quiz_state.update({"quiz_started": False, "current_index": 0, "questions": []}),
                            show_quiz_setup()
                        ), icon="refresh").props("unelevated no-caps").classes("shadow-md mt-3").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 20px; font-size: 0.875rem;")
            return
        q = quiz_state["questions"][quiz_state["current_index"]]
        with quiz_content_container:
            # Progress indicator - modern design
            progress_percent = int(((quiz_state['current_index'] + 1) / len(quiz_state['questions'])) * 100)
            with ui.card().classes("w-full border-0 shadow-sm mb-4").style("background: linear-gradient(135deg, rgba(255, 75, 75, 0.05) 0%, rgba(255, 107, 107, 0.02) 100%); padding: 16px;"):
                with ui.row().classes("w-full items-center justify-between"):
                    with ui.row().classes("items-center gap-2"):
                        ui.icon("quiz", size="sm").classes("text-primary")
                        ui.label(f"Question {quiz_state['current_index'] + 1} of {len(quiz_state['questions'])}").classes("text-sm font-bold text-gray-700 dark:text-gray-300")
                    with ui.row().classes("items-center gap-2"):
                        ui.label(f"{progress_percent}%").classes("text-xs font-semibold text-primary")
                        ui.linear_progress(
                            value=(quiz_state['current_index'] + 1) / len(quiz_state['questions']),
                            show_value=False
                        ).props("color=primary size=8px").classes("w-32")
            # Question card - modern professional style
            with ui.card().classes("w-full border-0 shadow-xl").style("background: #ffffff; border: 1px solid #e6e9ef; padding: 24px;"):
                with ui.column().classes("gap-4 w-full"):
                    # Question text
                    ui.label(q["question"]).classes("text-base font-semibold text-gray-900 dark:text-white mb-2")
                    if not quiz_state["show_answer"]:
                        # Show options - matching AI Tutor suggestion button style.
                        # Clicking any option calls handle_answer, which flips
                        # show_answer and re-renders this question.
                        ui.separator().classes("my-2")
                        for idx, option in enumerate(q.get("options", [])):
                            with ui.button(
                                on_click=lambda opt=option: handle_answer(opt)
                            ).props("outline no-caps").classes("w-full hover:bg-primary/5 mb-2").style("justify-content: flex-start; padding: 12px 16px; border-radius: 8px; border-color: rgba(255, 75, 75, 0.2);"):
                                with ui.row().classes("items-center gap-3 w-full").style("flex-wrap: nowrap;"):
                                    with ui.avatar(size="sm").classes("text-xs font-bold").style(f"background: linear-gradient(135deg, rgba(255, 75, 75, 0.1) 0%, rgba(255, 107, 107, 0.05) 100%); color: #FF4B4B;"):
                                        ui.label(chr(65 + idx))  # letter badge: A, B, C, D...
                                    ui.label(option).classes("text-sm text-gray-700 dark:text-gray-300 flex-grow text-left")
                    else:
                        # Show answer and explanation - modern professional style
                        ui.separator().classes("my-3")
                        with ui.card().classes("w-full border-0 shadow-lg").style("background: linear-gradient(135deg, #d1fae5 0%, #a7f3d0 100%); border-left: 4px solid #10b981; padding: 20px;"):
                            with ui.column().classes("gap-3 w-full"):
                                with ui.row().classes("items-center gap-2 mb-2"):
                                    ui.icon("check_circle", size="md").classes("text-green-700")
                                    ui.label("Correct Answer").classes("font-bold text-base text-green-900")
                                ui.label(q["correct_answer"]).classes("text-base font-semibold text-green-800 ml-8")
                                if q.get("explanation"):
                                    ui.separator().classes("bg-green-300 my-2")
                                    with ui.row().classes("items-start gap-2"):
                                        ui.icon("lightbulb", size="sm").classes("text-green-700 flex-shrink-0").style("margin-top: 2px;")
                                        with ui.column().classes("gap-1 flex-grow"):
                                            ui.label("Explanation").classes("font-bold text-sm text-green-800")
                                            ui.label(q["explanation"]).classes("text-sm text-green-700")
                        # Navigation buttons - advance index, hide answer, re-render
                        with ui.row().classes("w-full justify-end gap-2 mt-3"):
                            ui.button(
                                "Next Question" if quiz_state["current_index"] < len(quiz_state["questions"]) - 1 else "Finish Quiz",
                                on_click=lambda: (
                                    quiz_state.update({"current_index": quiz_state["current_index"] + 1, "show_answer": False}),
                                    show_question()
                                ),
                                icon="arrow_forward"
                            ).props("unelevated no-caps").classes("shadow-md").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 20px; font-size: 0.875rem;")
    except Exception as e:
        # UI rebuild failures are logged to the console rather than crashing
        # the event handler.
        print(f"ERROR in show_question: {e}")
        import traceback
        traceback.print_exc()
def handle_answer(selected):
    """Reveal the answer panel after the user picks an option.

    NOTE(review): ``selected`` is accepted but never compared against the
    correct answer -- the quiz currently just reveals the solution; no
    score is kept.
    """
    quiz_state["show_answer"] = True
    show_question()

async def generate_quiz_async():
    """Generate quiz questions with the planner LLM and start the quiz.

    Shows a spinner card, builds a content summary from the first ten TCM
    entries, asks the model for questions in a strict JSON array format,
    and on success resets ``quiz_state`` and renders the first question.
    Any failure (LLM error, unparseable JSON) is rendered as an error card
    with a "Try Again" button.
    """
    try:
        quiz_content_container.clear()
        with quiz_content_container:
            # Loading card shown while the model call is in flight.
            with ui.card().classes("border-0 shadow-lg").style("width: 100% !important; height: 100%; min-height: 300px; display: flex; align-items: center; justify-content: center; background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%);"):
                with ui.column().classes("items-center justify-center gap-3"):
                    ui.spinner(size="lg", color="primary")
                    ui.label("Generating quiz questions...").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                    ui.label("This may take a moment").classes("text-xs text-gray-500 dark:text-gray-400")

        # Build context from TCM
        context_parts = [f"Video Topic: {topic}\n"]
        if tcm_data:
            context_parts.append("Video Content Summary:")
            for entry in tcm_data[:10]:  # Use first 10 entries for context
                concept = entry.get("conceptName", "")
                narration = entry.get("narrationText", "")
                if concept and narration:
                    context_parts.append(f"- {concept}: {narration[:200]}")
        context = "\n".join(context_parts)

        prompt = f"""Based on this educational video content, generate {quiz_settings["num_questions"]} {quiz_settings["question_type"]} quiz questions at {quiz_settings["difficulty"]} difficulty level.

{context}

Generate questions in this EXACT JSON format:
[
  {{
    "question": "Question text here?",
    "type": "multiple_choice",
    "options": ["Option A", "Option B", "Option C", "Option D"],
    "correct_answer": "Option A",
    "explanation": "Brief explanation of why this is correct"
  }}
]

For True/False questions, use options: ["True", "False"]
Make questions relevant to the video content and educational."""

        quiz_llm = LiteLLMWrapper(model_name=app_state["planner_model_name"])
        # Run the blocking LLM call in a worker thread.  get_running_loop()
        # is the supported accessor inside a coroutine; get_event_loop()
        # is deprecated in this position since Python 3.10.
        response = await asyncio.get_running_loop().run_in_executor(
            None, lambda: quiz_llm([{"type": "text", "content": prompt}])
        )

        # Extract the first JSON array in the response (models often wrap
        # the payload in prose).  json/re come from the module-level
        # imports; the previous local re-imports were redundant.
        json_match = re.search(r'\[.*\]', response, re.DOTALL)
        if json_match:
            quiz_state["questions"] = json.loads(json_match.group())
            quiz_state["current_index"] = 0
            quiz_state["show_answer"] = False
            quiz_state["quiz_started"] = True
            show_question()
        else:
            raise ValueError("Could not parse quiz questions")
    except Exception as e:
        # Replace whatever is in the container with an error card.
        quiz_content_container.clear()
        with quiz_content_container:
            with ui.card().classes("w-full border-0 shadow-lg").style("background: linear-gradient(135deg, #fee2e2 0%, #fecaca 100%); border-left: 4px solid #ef4444; padding: 24px;"):
                with ui.column().classes("gap-3 w-full"):
                    with ui.row().classes("items-center gap-2"):
                        ui.icon("error_outline", size="md").classes("text-red-600")
                        ui.label("Error Generating Quiz").classes("text-lg font-bold text-red-900")
                    ui.label(f"{str(e)}").classes("text-sm text-red-700 ml-8")
                    ui.button("Try Again", on_click=show_quiz_setup, icon="refresh").props("unelevated no-caps").classes("shadow-md mt-2").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 16px; font-size: 0.875rem;")
def show_quiz_setup():
    """Display the initial quiz configuration form.

    Renders the number-of-questions / type / difficulty controls into
    ``quiz_content_container``; each control writes straight into the
    shared ``quiz_settings`` dict, and the Generate button kicks off
    ``generate_quiz_async``.
    """
    try:
        quiz_content_container.clear()
        with quiz_content_container:
            # Initial quiz setup - matching AI Tutor welcome card
            with ui.card().classes("w-full border-0 shadow-lg").style("background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%); padding: 28px; border-left: 4px solid #FF4B4B;"):
                # Instructions - Row 1
                with ui.row().classes("items-center gap-2 mb-3"):
                    ui.icon("info_outline", size="sm").classes("text-blue-500 flex-shrink-0")
                    ui.label("Configure your quiz settings below and click Generate Quiz to start:").classes("text-xs text-gray-700 dark:text-gray-300 flex-grow")
                # Settings - Row 2 (three equal-width columns)
                with ui.row().classes("gap-3 mb-3").style("width: 100% !important; display: flex; box-sizing: border-box;"):
                    with ui.column().style("flex: 1; min-width: 0;"):
                        with ui.row().classes("items-center gap-2 mb-1"):
                            ui.icon("numbers", size="sm").classes("text-primary")
                            ui.label("Questions").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                        # Writes through to quiz_settings on every change.
                        num_questions = ui.number(value=quiz_settings["num_questions"], min=1, max=10, step=1, on_change=lambda e: quiz_settings.update({"num_questions": int(e.value)})).props("outlined dense").style("width: 100%; border-radius: 8px;")
                    with ui.column().style("flex: 1; min-width: 0;"):
                        with ui.row().classes("items-center gap-2 mb-1"):
                            ui.icon("category", size="sm").classes("text-primary")
                            ui.label("Type").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                        question_type = ui.select(
                            options=["Multiple Choice", "True/False", "Mixed"],
                            value=quiz_settings["question_type"],
                            on_change=lambda e: quiz_settings.update({"question_type": e.value})
                        ).props("outlined dense").style("width: 100%; border-radius: 8px;")
                    with ui.column().style("flex: 1; min-width: 0;"):
                        with ui.row().classes("items-center gap-2 mb-1"):
                            ui.icon("speed", size="sm").classes("text-primary")
                            ui.label("Difficulty").classes("text-sm font-semibold text-gray-700 dark:text-gray-300")
                        difficulty = ui.select(
                            options=["Easy", "Medium", "Hard"],
                            value=quiz_settings["difficulty"],
                            on_change=lambda e: quiz_settings.update({"difficulty": e.value})
                        ).props("outlined dense").style("width: 100%; border-radius: 8px;")
                # Separator
                ui.separator().classes("my-2")
                # Generate Button
                ui.button(
                    "Generate Quiz",
                    on_click=generate_quiz_async,
                    icon="auto_awesome"
                ).props("unelevated no-caps").classes("shadow-md mt-2 w-full").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); border-radius: 8px; padding: 10px 16px; font-size: 0.875rem;")
    except Exception as e:
        # Log rather than crash the UI event loop.
        print(f"ERROR in show_quiz_setup: {e}")
        import traceback
        traceback.print_exc()
Try these suggestions:" ).classes("text-xs text-gray-700 dark:text-gray-300 flex-grow") # Suggestion buttons suggestions = [ ("help_outline", "Explain this in simpler terms"), ("lightbulb", "Give me a real-world example"), ("list", "Break down this step-by-step"), ("star", "Why is this concept important?"), ] for icon, suggestion in suggestions: with ui.button( on_click=lambda s=suggestion: ( setattr(chat_input, 'value', s), chat_input.run_method("focus") ) ).props("outline no-caps").classes( "w-full mb-2 hover:bg-primary/5" ).style("justify-content: flex-start; padding: 12px 16px; border-radius: 8px; border-color: rgba(255, 75, 75, 0.2);"): with ui.row().classes("items-center gap-3").style("flex-wrap: nowrap;"): ui.icon(icon, size="sm").classes("text-primary flex-shrink-0") ui.label(suggestion).classes("text-sm text-gray-700 dark:text-gray-300 flex-grow") ui.notify("Chat history cleared", type="positive", icon="check_circle") ui.button( icon="refresh", on_click=clear_chat ).props("flat round dense").classes("text-gray-600 dark:text-gray-400 hover:text-primary hover:bg-primary/10").tooltip("Clear chat history") # ---------------- CHAT CONTAINER ---------------- with ui.column().style("flex: 1; min-height: 0; position: relative; overflow: hidden;"): chat_container = ui.column().classes( "h-full p-4 overflow-y-auto overflow-x-hidden gap-3" ).props('id="chat_container"').style("scroll-behavior: smooth;") # Scroll to bottom button (initially hidden) scroll_btn = ui.button( icon="arrow_downward", on_click=lambda: ui.run_javascript( 'document.getElementById("chat_container").scrollTop = ' 'document.getElementById("chat_container").scrollHeight' ) ).props("fab-mini color=primary").classes( "absolute bottom-4 right-4 shadow-lg" ).style("display: none;") scroll_btn.tooltip("Scroll to bottom") # Show/hide scroll button based on scroll position ui.add_body_html(""" """) # ------------------------------------------------------ # LOAD HISTORY # 
------------------------------------------------------ chat_history = app_state["chat_histories"].setdefault( topic, [] ) with chat_container: if not chat_history: # Professional welcome card with ui.card().classes( "w-full border-0 shadow-lg" ).style("background: linear-gradient(135deg, #ffffff 0%, #fef2f2 100%); padding: 28px; border-left: 4px solid #FF4B4B;"): # Instructions with ui.row().classes("items-start gap-2 mb-3").style("flex-wrap: nowrap;"): ui.icon("info_outline", size="sm").classes("text-blue-500 flex-shrink-0").style("margin-top: 2px;") ui.label( "Pause the video at any moment to ask questions. Try these suggestions:" ).classes("text-xs text-gray-700 dark:text-gray-300 flex-grow") # Suggestion buttons suggestions = [ ("help_outline", "Explain this in simpler terms"), ("lightbulb", "Give me a real-world example"), ("list", "Break down this step-by-step"), ("star", "Why is this concept important?"), ] for icon, suggestion in suggestions: with ui.button( on_click=lambda s=suggestion: ( setattr(chat_input, 'value', s), chat_input.run_method("focus") ) ).props("outline no-caps").classes( "w-full mb-2 hover:bg-primary/5" ).style("justify-content: flex-start; padding: 12px 16px; border-radius: 8px; border-color: rgba(255, 75, 75, 0.2);"): with ui.row().classes("items-center gap-3").style("flex-wrap: nowrap;"): ui.icon(icon, size="sm").classes("text-primary flex-shrink-0") ui.label(suggestion).classes("text-sm text-gray-700 dark:text-gray-300 flex-grow") else: # Load existing messages with custom styling for msg in chat_history: is_user = msg["role"] == "user" render_chat_message(chat_container, msg["content"], is_user) # ------------------------------------------------------ # INPUT BAR # ------------------------------------------------------ with ui.column().classes("w-full").style("flex-shrink: 0; background: linear-gradient(to top, rgba(255,255,255,0.98), rgba(255,255,255,0.95)); backdrop-filter: blur(10px); border-top: 1px solid rgba(0,0,0,0.08); 
async def send_message():
    """Send the chat input to the tutor LLM and render the exchange.

    Flow: validate input -> echo the user message -> show a typing
    indicator -> assemble a context block from the TCM entry at the last
    pause point (plus neighbouring entries and on-disk scene plans) ->
    run the blocking LLM call in a worker thread -> replace the indicator
    with the answer.  Controls are re-enabled in ``finally`` either way.
    """
    # Retrieve text from input box
    question = chat_input.value or ""
    if not question.strip():
        return

    # Clear input and disable controls while the request is in flight
    chat_input.value = ""
    chat_input.disable()
    send_button.disable()

    try:
        # Store in history
        chat_history.append({"role": "user", "content": question})

        # Display user message
        render_chat_message(chat_container, question, is_user=True)

        # Auto-scroll to bottom
        await ui.run_javascript(
            'document.getElementById("chat_container").scrollTop = '
            'document.getElementById("chat_container").scrollHeight'
        )

        # Show Streamlit-style typing indicator
        with chat_container:
            with ui.element('div').classes("stchat-message assistant") as typing_indicator:
                with ui.element('div').classes("stchat-avatar"):
                    ui.label("šŸ¤–").classes("text-xl")
                with ui.element('div').classes("stchat-content"):
                    with ui.row().classes("items-center gap-2"):
                        ui.spinner(size="sm", color="primary")
                        ui.label("Thinking...").classes("text-sm")

        # Auto-scroll to show typing indicator
        await ui.run_javascript(
            'document.getElementById("chat_container").scrollTop = '
            'document.getElementById("chat_container").scrollHeight'
        )

        # Build LLM prompt with rich TCM context
        current_entry = app_state["current_tcm_entry"] or {}
        timestamp = app_state.get("latest_pause_time", 0.0)

        # Load system prompt from file, falling back to an inline default
        try:
            with open("prompts/ai_tutor_system_prompt.txt", "r", encoding="utf-8") as f:
                system_prompt_template = f.read()
            # Replace placeholders
            system_prompt = system_prompt_template.format(
                topic=topic, time=f"{timestamp:.1f}"
            )
        except FileNotFoundError:
            # Fallback if file doesn't exist
            system_prompt = (
                f"You are an AI tutor helping students understand educational video content about {topic}. "
                f"The student paused at {timestamp:.1f}s. "
                "Provide clear, concise explanations based on the context provided."
            )

        # Build rich context from TCM
        context_parts = [f"=== VIDEO: {topic} ===\n"]
        if current_entry:
            concept = current_entry.get("conceptName", "Unknown")
            narration = current_entry.get("narrationText", "")
            visual = current_entry.get("visualDescription", "")
            concept_id = current_entry.get("conceptId", "")

            context_parts.append(f"STUDENT PAUSED AT: {timestamp:.1f} seconds")
            context_parts.append(f"\n--- WHAT'S HAPPENING RIGHT NOW ---")
            context_parts.append(f"Concept Being Explained: {concept}")
            if narration:
                context_parts.append(f"\nNarration (what's being said):\n\"{narration}\"")
            if visual:
                context_parts.append(f"\nVisual (what's on screen):\n{visual}")
            if concept_id:
                context_parts.append(f"\nConcept ID: {concept_id}")

            # Extract scene number and load implementation context
            scene_num = extract_scene_number_from_concept_id(concept_id)
            if scene_num:
                try:
                    scene_context = get_scene_implementation_context(
                        "output", topic, scene_num
                    )
                    context_parts.append(f"\n--- SCENE {scene_num} IMPLEMENTATION DETAILS ---")
                    # Each plan section is truncated to keep the context window small.
                    if scene_context.get("visual_plan"):
                        context_parts.append(f"\nVisual Storyboard Plan:")
                        context_parts.append(scene_context["visual_plan"][:800] + "..." if len(scene_context["visual_plan"]) > 800 else scene_context["visual_plan"])
                    if scene_context.get("technical_plan"):
                        context_parts.append(f"\nTechnical Implementation:")
                        context_parts.append(scene_context["technical_plan"][:800] + "..." if len(scene_context["technical_plan"]) > 800 else scene_context["technical_plan"])
                    if scene_context.get("animation_narration"):
                        context_parts.append(f"\nAnimation & Narration Plan:")
                        context_parts.append(scene_context["animation_narration"][:800] + "..." if len(scene_context["animation_narration"]) > 800 else scene_context["animation_narration"])
                    if scene_context.get("code"):
                        # Label fixed: was misspelled "Maim Code Implementation".
                        context_parts.append(f"\nManim Code Implementation:")
                        code_preview = scene_context["code"][:1500] + "..." if len(scene_context["code"]) > 1500 else scene_context["code"]
                        context_parts.append(f"```python\n{code_preview}\n```")
                except Exception as e:
                    # Best effort: missing scene files must not block the chat.
                    print(f"Could not load scene context: {e}")
            else:
                context_parts.append("(Video is currently playing - student needs to pause to get specific context)")

            # Add surrounding context for better understanding
            current_idx = next((i for i, e in enumerate(tcm_data) if e == current_entry), -1)
            if current_idx > 0:
                prev_entry = tcm_data[current_idx - 1]
                context_parts.append(f"\n--- WHAT CAME BEFORE ---")
                context_parts.append(f"Previous Concept: {prev_entry.get('conceptName', 'Unknown')}")
                prev_narration = prev_entry.get('narrationText', '')
                if len(prev_narration) > 150:
                    prev_narration = prev_narration[:150] + "..."
                context_parts.append(f"Previous Narration: \"{prev_narration}\"")
            if current_idx < len(tcm_data) - 1:
                next_entry = tcm_data[current_idx + 1]
                context_parts.append(f"\n--- WHAT COMES NEXT ---")
                context_parts.append(f"Next Concept: {next_entry.get('conceptName', 'Unknown')}")
                next_narration = next_entry.get('narrationText', '')
                if len(next_narration) > 150:
                    next_narration = next_narration[:150] + "..."
                context_parts.append(f"Next Narration: \"{next_narration}\"")
        else:
            context_parts.append("(Video is currently playing - student needs to pause to get specific context)")

        context_info = "\n".join(context_parts)

        # Build messages in the format expected by LiteLLMWrapper
        messages_for_llm = [
            {"type": "system", "content": system_prompt},
            {"type": "text", "content": f"VIDEO CONTEXT:\n{context_info}"},
            {"type": "text", "content": f"STUDENT QUESTION: {question}"}
        ]

        # Call model wrapper
        chat_llm = LiteLLMWrapper(
            model_name=app_state["planner_model_name"]
        )

        # Run in executor to avoid blocking.  get_running_loop() is the
        # supported accessor inside a coroutine; get_event_loop() is
        # deprecated in this position since Python 3.10.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: chat_llm(messages_for_llm)
        )

        # Remove typing indicator and show the answer
        chat_container.remove(typing_indicator)
        chat_history.append({"role": "assistant", "content": response})
        render_chat_message(chat_container, response, is_user=False)

        # Auto-scroll to bottom
        await ui.run_javascript(
            'document.getElementById("chat_container").scrollTop = '
            'document.getElementById("chat_container").scrollHeight'
        )
    except Exception as e:
        # Remove typing indicator if it was created (the error may have
        # happened before that point); narrowed from a bare except.
        try:
            chat_container.remove(typing_indicator)
        except Exception:
            pass

        # Show error message in the chat plus a toast
        error_msg = str(e) if str(e) else repr(e)
        with chat_container:
            with ui.element('div').classes("stchat-message assistant").style("background-color: #fee2e2; border-color: #fca5a5;"):
                with ui.element('div').classes("stchat-avatar").style("background: linear-gradient(135deg, #ef4444 0%, #dc2626 100%);"):
                    ui.label("āš ļø").classes("text-xl")
                with ui.element('div').classes("stchat-content").style("color: #991b1b;"):
                    ui.markdown(f"**Error:** Unable to get response.\n\n`{error_msg}`")
        ui.notify(
            f"Failed to get AI response: {error_msg}",
            type="negative"
        )
    finally:
        # Re-enable controls
        chat_input.enable()
        send_button.enable()
        chat_input.run_method("focus")
async def update_context(
    e: events.GenericEventArguments,
):
    """Refresh the pause-context banner from the player's current position.

    Queries the browser for the <video> element's ``currentTime``, records it
    in ``app_state``, looks up the TCM entry whose time span covers it, and
    restyles ``context_label`` accordingly (green = paused on a concept,
    blue = prompt to pause).
    """
    # Ask the browser for the playback position; falls back to 0 when the
    # player element is missing or the returned value is not numeric.
    raw_time = await ui.run_javascript(
        "document.getElementById('video_player')?.currentTime || 0"
    )
    try:
        position = float(raw_time)
    except Exception:
        position = 0.0
    app_state["latest_pause_time"] = position

    def covers_position(entry):
        # An entry matches when [startTime, endTime) contains the position.
        # Malformed entries (missing keys, non-numeric times) never match.
        try:
            return float(entry["startTime"]) <= position < float(entry["endTime"])
        except Exception:
            return False

    match = None
    for candidate in tcm_data:
        if covers_position(candidate):
            match = candidate
            break
    app_state["current_tcm_entry"] = match

    if not match:
        # No entry covers this moment: show the "pause to ask" prompt.
        context_label.set_content(
            "ā–¶ļø **Pause the video** to ask a question about what you're watching."
        )
        context_label.classes(
            "p-3 text-sm bg-blue-50 dark:bg-gray-800 "
            "border-l-4 border-blue-500 rounded-r",
            remove="bg-green-50 dark:bg-green-900/20 border-green-500"
        )
        return

    # Show the concept name plus a short narration preview (80 chars max).
    concept_name = match.get('conceptName', 'Unknown')
    full_narration = match.get('narrationText', '')
    narration_preview = full_narration[:80]
    if len(full_narration) > 80:
        narration_preview += "..."
    context_label.set_content(
        f"āøļø **Current Topic:** {concept_name} (`{position:.1f}s`)\n\n"
        f"_{narration_preview}_"
    )
    context_label.classes(
        "p-3 text-sm bg-green-50 dark:bg-green-900/20 "
        "border-l-4 border-green-500 rounded-r",
        remove="bg-blue-50 dark:bg-gray-800 border-blue-500"
    )
)) else: # Better empty state for missing video with ui.card().classes("w-full p-12 text-center"): ui.icon("video_camera_back", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4") ui.label("Video Not Generated Yet").classes("text-xl font-semibold text-gray-600 dark:text-gray-400 mb-2") ui.label( "Use the Utilities tab to continue and complete the video generation for this project." ).classes("text-gray-500 dark:text-gray-500 mb-4") ui.button("Go to Utilities", icon="build", on_click=lambda: main_tabs.set_value("šŸ”§ Utilities")).props("unelevated no-caps").classes("mt-2") with ui.tab_panel(t_plan): outline_path = os.path.join( project_path, f"{inner_folder_name}_scene_outline.txt" ) if os.path.exists(outline_path): # Parse the scene outline outline_content = safe_read_file(outline_path, clean=False) # Extract scenes using regex import re scene_pattern = r'(.*?)' scenes = re.findall(scene_pattern, outline_content, re.DOTALL) if scenes: # Header card with ui.card().classes("w-full shadow-lg border-l-4 border-primary mb-6"): with ui.row().classes("w-full items-center gap-3 p-6"): ui.icon("movie_creation", size="lg").classes("text-primary") with ui.column().classes("gap-1"): ui.label(topic).classes("text-2xl font-bold text-gray-900 dark:text-white") ui.label(f"{len(scenes)} scenes planned").classes("text-sm text-gray-600 dark:text-gray-400") # Scene cards for scene_num, scene_content in scenes: # Extract scene details title_match = re.search(r'Scene Title:\s*(.+?)(?:\n|$)', scene_content) purpose_match = re.search(r'Scene Purpose:\s*(.+?)(?:\n|Scene Description)', scene_content, re.DOTALL) desc_match = re.search(r'Scene Description:\s*(.+?)(?:\n|Scene Layout)', scene_content, re.DOTALL) title = title_match.group(1).strip() if title_match else f"Scene {scene_num}" purpose = purpose_match.group(1).strip() if purpose_match else "" description = desc_match.group(1).strip() if desc_match else "" with ui.card().classes("w-full shadow-md hover:shadow-lg 
transition-shadow mb-4"): # Scene header with ui.row().classes("w-full items-start gap-3 p-4 bg-gradient-to-r from-primary/5 to-transparent border-b dark:border-gray-700"): with ui.column().classes("gap-2 flex-grow"): # Title with scene number with ui.row().classes("items-center gap-2"): ui.label(f"Scene {scene_num}:").classes("text-lg font-bold text-primary") ui.label(title).classes("text-lg font-semibold text-gray-900 dark:text-white") # Purpose with inline icon if purpose: with ui.row().classes("items-center gap-1.5").style("flex-wrap: nowrap;"): ui.icon("lightbulb", size="xs").classes("text-amber-500 flex-shrink-0") ui.label(purpose).classes("text-sm text-gray-600 dark:text-gray-400") # Scene content with inline icon if description: with ui.row().classes("items-start gap-2 p-4").style("flex-wrap: nowrap;"): ui.icon("description", size="sm").classes("text-blue-500 flex-shrink-0").style("margin-top: 2px;") ui.label(description).classes("text-sm text-gray-700 dark:text-gray-300 leading-relaxed flex-grow") else: # Fallback to showing formatted text if parsing fails with ui.card().classes("w-full shadow-lg"): with ui.row().classes("w-full items-center gap-2 p-4 border-b dark:border-gray-700 bg-gray-50 dark:bg-gray-800"): ui.icon("description", size="sm").classes("text-primary") ui.label("Video Plan").classes("text-lg font-semibold") with ui.column().classes("p-6"): ui.markdown(outline_content).classes("prose dark:prose-invert max-w-none") else: with ui.card().classes("w-full p-12 text-center"): ui.icon("description", size="4rem").classes("text-gray-300 dark:text-gray-600 mb-4") ui.label("Plan Not Available").classes("text-xl font-semibold text-gray-600 dark:text-gray-400") ui.label("The video plan hasn't been generated yet").classes("text-sm text-gray-500 dark:text-gray-500 mt-2") for i, scene_dir_name in enumerate(scene_dirs): with ui.tab_panel(scene_tabs_list[i]): scene_num = i + 1 scene_path = os.path.join(project_path, scene_dir_name) # Scene header with 
def display_plan(file_pattern, default_text, icon_name="description", is_editing=False):
    """Render (or make editable) a single scene plan file inside the current tab panel.

    file_pattern -- glob pattern, relative to the enclosing scene's directory,
        matching the plan file (the first match wins).
    default_text -- human-readable plan name used in empty/missing placeholders.
    icon_name    -- icon shown on the empty/missing placeholder cards.
    is_editing   -- when True, show a raw-text editor with a Save button
        instead of the formatted read-only view.
    """
    files = glob.glob(
        os.path.join(scene_path, file_pattern)
    )
    if files:
        file_path = files[0]
        plan_text = safe_read_file(
            file_path, clean=True
        )
        # clean=True strips the outer XML tags, which is nicer for display.
        # Downstream consumers (e.g. CodeGenerator) read these plan files raw,
        # so saving a tag-stripped version back could break their parsing.
        # For editing we therefore load the RAW file contents instead.
        if is_editing:
            raw_text = safe_read_file(file_path, clean=False)
        if not plan_text.strip() and not is_editing:
            # Read-only view of an empty plan: show a placeholder card.
            with ui.card().classes("w-full p-8 text-center"):
                ui.icon(icon_name, size="3rem").classes("text-gray-300 dark:text-gray-600 mb-2")
                ui.label(f"{default_text} is empty").classes("text-gray-500")
            return
        if is_editing:
            # Editable raw-text view with an explicit save action.
            with ui.card().classes("w-full shadow-md"):
                with ui.column().classes("p-4 w-full"):
                    editor = ui.textarea(value=raw_text).props("outlined autogrow").classes("w-full font-mono text-sm")
                    editor.style("min-height: 300px;")

                    def save_plan(path=file_path, content_getter=lambda: editor.value):
                        # Defaults bind the current file path / editor at definition
                        # time, so each tab's Save button writes its own file.
                        try:
                            with open(path, "w", encoding="utf-8") as f:
                                f.write(content_getter())
                            ui.notify("Plan saved successfully!", color="positive", icon="save")
                        except Exception as e:
                            ui.notify(f"Error saving plan: {e}", color="negative")

                    with ui.row().classes("w-full justify-end mt-2"):
                        ui.button("Save Changes", icon="save", on_click=lambda: save_plan()).props("unelevated color=primary")
        else:
            # Optional cleanup for readability: drop "Scene..." headings and
            # turn the plan's section markers into markdown headings.
            plan_text = re.sub(
                r"#+\s*Scene.*", "", plan_text
            ).strip()
            plan_text = plan_text.replace(
                "[SCENE_VISION]", "### šŸŽ¬ Vision"
            )
            plan_text = plan_text.replace(
                "[STORYBOARD]", "### šŸ—ŗļø Storyboard"
            )
            plan_text = plan_text.replace(
                "[ANIMATION_STRATEGY]",
                "### šŸŽØ Animation Strategy",
            )
            plan_text = plan_text.replace(
                "[NARRATION]", "### šŸŽžļø Animation"
            )
            with ui.card().classes("w-full shadow-md"):
                with ui.column().classes("p-6"):
                    ui.markdown(plan_text).classes("prose dark:prose-invert max-w-none")
    else:
        # No file matched the pattern: show a "not found" placeholder card.
        with ui.card().classes("w-full p-8 text-center"):
            ui.icon(icon_name, size="3rem").classes("text-gray-300 dark:text-gray-600 mb-2")
            ui.label(f"{default_text} not found").classes("text-gray-500")
def show_code(path):
    """Replace the code display area's contents with the file at *path*.

    Used as the version-selector callback: clears the previous listing and
    renders the selected code file verbatim (clean=False keeps raw contents).
    """
    code_display_area.clear()
    with code_display_area:
        ui.code(
            safe_read_file(
                path, clean=False
            )
        ).classes("w-full")
def update_util_tab():
    """Rebuild the Utilities tab: continue unfinished projects and re-assemble
    completed ones (audio/subtitle sync fix).

    Clears ``util_content`` and repopulates it based on the current project
    statuses reported by ``video_generator.check_theorem_status``.
    """
    util_content.clear()

    async def handle_finalize(topic):
        """Combine a project's rendered scenes into the final video.

        Runs ``combine_videos`` off the event loop, notifies per result code,
        and reveals the finalized video player when the combined file exists.
        """
        if not topic:
            ui.notify("Please select a project.", color="warning")
            return
        finalize_button.disable()
        finalize_button.set_text("ā³ Finalizing...")
        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, combine_videos, topic)
            # Handle result and show appropriate notification
            if result == "success":
                ui.notify(f"āœ… Project '{topic}' finalized successfully!", color="positive")
            elif result == "already_exists":
                ui.notify(f"ā„¹ļø Combined assets already exist for '{topic}'", color="info")
            elif result == "no_scenes":
                ui.notify(f"āš ļø No rendered scenes found for '{topic}'", color="warning")
            project_path = get_project_path("output", topic)
            inner_folder_name = os.path.basename(project_path)
            video_local_path = os.path.join(
                project_path, f"{inner_folder_name}_combined.mp4"
            )
            if os.path.exists(video_local_path):
                # Serve with forward slashes so the URL works on Windows too.
                video_url = f"/{os.path.relpath(video_local_path, '.')}".replace(
                    "\\", "/"
                )
                finalized_video_player.set_source(video_url)
                finalized_video_player.style("display: block;")
            await update_dashboard()
        except Exception as e:
            ui.notify(f"Error finalizing project: {e}", color="negative")
        finally:
            finalize_button.enable()
            finalize_button.set_text("šŸŽ¬ Finalize Project")

    async def handle_continue(topic):
        """Resume generation for an unfinished project.

        Runs the blocking generation pipeline in a worker thread (with its own
        event loop), captures and prettifies its stdout/stderr into the UI log,
        polls progress every 0.5s, then finalizes via ``combine_videos``.
        """
        if not topic:
            ui.notify("Please select a project.", color="warning")
            return
        # Get project description from outline
        project_path = get_project_path("output", topic)
        inner_folder_name = os.path.basename(project_path)
        scene_outline_path = os.path.join(project_path, f"{inner_folder_name}_scene_outline.txt")
        description = f"Continue generating the unfinished scenes for {topic}"
        if os.path.exists(scene_outline_path):
            with open(scene_outline_path, "r", encoding="utf-8") as f:
                outline_content = f.read()
            # Try to extract description from outline if available
            if "Description:" in outline_content:
                desc_start = outline_content.find("Description:") + len("Description:")
                desc_end = outline_content.find("\n\n", desc_start)
                if desc_end > desc_start:
                    description = outline_content[desc_start:desc_end].strip()
        continue_button.disable()
        continue_button.set_text("ā³ Resuming...")
        progress_container_continue.style("display: block;")
        progress_bar_continue.set_visibility(True)
        progress_label_continue.set_text("šŸš€ Starting continuation...")
        progress_bar_continue.value = 0
        log_output_continue.clear()
        log_output_continue.push(f"šŸš€ Starting continuation for '{topic}'")
        log_output_continue.push(f"šŸ“‹ Checking project status...")
        try:
            # Shared state for progress updates
            import concurrent.futures
            import threading
            import sys
            from io import StringIO  # NOTE(review): imported but not used in this handler
            generation_complete = threading.Event()
            # Single-element list so the worker thread can report an exception
            # back to this coroutine by mutation.
            generation_error = [None]
            current_progress = {"value": 0.1, "text": "Starting..."}
            log_buffer = []
            progress_lock = threading.Lock()

            def progress_callback(value, text):
                """Thread-safe progress update invoked by the pipeline."""
                with progress_lock:
                    current_progress["value"] = value
                    current_progress["text"] = text

            class LogCapture:
                """stdout/stderr proxy: forwards to the original stream and
                collects a filtered, user-friendly copy into ``buffer_list``.
                """

                def __init__(self, original_stream, buffer_list):
                    self.original_stream = original_stream
                    self.buffer_list = buffer_list
                    # Noise lines dropped entirely from the UI log.
                    self.skip_patterns = [
                        "Langfuse client is disabled",
                        "LANGFUSE_PUBLIC_KEY",
                        "No video folders found",
                        "langfuse.com/docs",
                        "See our docs:",
                        "==> Loading existing implementation plans",
                        "<== Finished loading plans",
                        "Found: 0, Missing:",
                        "Generating missing implementation plans for scenes:",
                        "Loaded existing topic session ID:",
                        "Saved topic session ID to",
                    ]

                def write(self, text):
                    # Always pass through to the real stream first.
                    self.original_stream.write(text)
                    if text.strip():
                        should_skip = any(pattern in text for pattern in self.skip_patterns)
                        if not should_skip:
                            cleaned_text = text.rstrip()
                            # Map raw pipeline phase markers to friendly messages.
                            replacements = {
                                "STARTING VIDEO PIPELINE FOR TOPIC:": "šŸš€ Continuation started for:",
                                "[PHASE 1: SCENE OUTLINE]": "šŸ“ Phase 1: Checking video structure",
                                "[PHASE 1 COMPLETE]": "āœ… Phase 1: Video structure ready",
                                "[PHASE 2: IMPLEMENTATION PLANS]": "šŸŽØ Phase 2: Planning remaining scenes",
                                "[PHASE 2 COMPLETE]": "āœ… Phase 2: All scenes planned",
                                "[PHASE 3: CODE GENERATION & RENDERING (SCENE-BY-SCENE)]": "šŸ’» Phase 3: Rendering remaining scenes",
                                "[PHASE 3 COMPLETE]": "āœ… Phase 3: All scenes rendered",
                                "PIPELINE FINISHED FOR TOPIC:": "āœ… Continuation complete for:",
                                "scene outline saved": "āœ… Video structure saved",
                                "Loaded existing scene outline": "āœ… Using existing video structure",
                                "Loaded existing topic session ID": "",
                                "Total Cost:": "šŸ’° Cost:",
                                "Already completed scenes:": "āœ… Already rendered:",
                                "Scenes with plans ready:": "āœ… Already planned:",
                                "Scenes to render:": "ā³ Remaining to render:",
                                "Already rendered, skipping": "āœ… Already done, skipping",
                                "Finished generating missing plans": "āœ… All scene plans ready",
                                "==> Generating scene outline": "šŸ“ Starting: Generating video structure",
                                "==> Scene outline generated": "āœ… Finished: Video structure generated",
                                "==> Generating scene implementations": "šŸŽØ Starting: Planning scene details",
                                "==> All concurrent scene implementations generated": "āœ… Finished: All scene details planned",
                                "==> Preparing to render": "šŸ’» Starting: Scene rendering",
                                "Starting concurrent processing": "šŸ’» Processing scenes concurrently",
                                "<== All scene processing tasks completed": "āœ… Finished: All scenes processed",
                            }
                            for old, new in replacements.items():
                                if old in cleaned_text:
                                    cleaned_text = cleaned_text.replace(old, new)
                            import re
                            # Rewrite "Found N scenes in outline" as a summary line.
                            scenes_match = re.search(r'Found (\d+) scenes? in outline', cleaned_text)
                            if scenes_match:
                                num_scenes = scenes_match.group(1)
                                scene_word = "scene" if num_scenes == "1" else "scenes"
                                cleaned_text = f"šŸ“Š Video has {num_scenes} {scene_word}"
                            # Collapse "[topic | Scene N]" prefixes to "Scene N:".
                            scene_prefix_match = re.search(r'\[([^\]]+) \| Scene (\d+)\]', cleaned_text)
                            if scene_prefix_match:
                                scene_num = scene_prefix_match.group(2)
                                cleaned_text = re.sub(r'\[([^\]]+) \| Scene (\d+)\]', f'Scene {scene_num}:', cleaned_text)
                            # Strip any remaining bracketed tags.
                            cleaned_text = re.sub(r'\[([^\]]+)\]', '', cleaned_text).strip()
                            # Drop separator rulers and lines emptied by cleaning.
                            if "======================================================" in cleaned_text or not cleaned_text.strip():
                                return
                            with progress_lock:
                                self.buffer_list.append(cleaned_text)

                def flush(self):
                    self.original_stream.flush()

            def run_generation_sync():
                """Worker-thread entry: run the async pipeline on a fresh event
                loop with stdout/stderr captured; always restores streams and
                signals completion."""
                old_stdout = sys.stdout
                old_stderr = sys.stderr
                try:
                    sys.stdout = LogCapture(old_stdout, log_buffer)
                    sys.stderr = LogCapture(old_stderr, log_buffer)
                    # A worker thread has no running loop, so create one.
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    loop.run_until_complete(
                        video_generator.generate_video_pipeline(
                            topic,
                            description,
                            max_retries=app_state["max_retries"],
                            progress_callback=progress_callback
                        )
                    )
                    loop.close()
                except Exception as e:
                    generation_error[0] = e
                finally:
                    sys.stdout = old_stdout
                    sys.stderr = old_stderr
                    generation_complete.set()

            # NOTE(review): executor is never explicitly shut down here;
            # presumably relying on GC / daemon cleanup — confirm.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            generation_future = executor.submit(run_generation_sync)
            heartbeat_counter = 0
            last_log_count = 0
            last_progress_text = ""  # NOTE(review): written below but never read
            # Poll the worker every 0.5s, mirroring progress + logs into the UI.
            while not generation_complete.is_set():
                await asyncio.sleep(0.5)
                heartbeat_counter += 1
                with progress_lock:
                    progress_bar_continue.value = current_progress["value"]
                    # Update progress label with elapsed time
                    elapsed = heartbeat_counter // 2
                    if current_progress["text"]:
                        progress_label_continue.set_text(f"{current_progress['text']} ({elapsed}s)")
                    else:
                        progress_label_continue.set_text(f"ā³ Working... ({elapsed}s)")
                    # Only push new log messages, not repeated status updates
                    if len(log_buffer) > last_log_count:
                        for log_line in log_buffer[last_log_count:]:
                            log_output_continue.push(log_line)
                        last_log_count = len(log_buffer)
                    last_progress_text = current_progress["text"]
            if generation_error[0]:
                raise generation_error[0]
            generation_future.result()
            # Flush any log lines that arrived after the final poll.
            with progress_lock:
                if len(log_buffer) > last_log_count:
                    for log_line in log_buffer[last_log_count:]:
                        log_output_continue.push(log_line)
            progress_label_continue.set_text("šŸŽžļø Finalizing project...")
            progress_bar_continue.value = 0.9
            # Check if any scenes failed
            has_failures = len(video_generator.failed_scenes) > 0
            if has_failures:
                log_output_continue.push(f"\nāš ļø Generation completed with {len(video_generator.failed_scenes)} failed scene(s). Finalizing...")
            else:
                log_output_continue.push("\nāœ… All scenes generated successfully! Finalizing...")
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, combine_videos, topic)
            progress_bar_continue.value = 1.0
            if has_failures:
                progress_label_continue.set_text(f"āš ļø Project completed with {len(video_generator.failed_scenes)} failed scene(s)")
                log_output_continue.push(f"āš ļø Project finalized with {len(video_generator.failed_scenes)} scene(s) failed. Check logs for details.")
                ui.notify(f"āš ļø Project '{topic}' completed with failures. Check logs.", color="warning", icon="warning")
            else:
                progress_label_continue.set_text("āœ… Project complete!")
                log_output_continue.push("āœ… Project finalized successfully!")
                if result == "success":
                    ui.notify(f"āœ… Project '{topic}' completed successfully!", color="positive", icon="check_circle")
                elif result == "already_exists":
                    ui.notify(f"ā„¹ļø Project '{topic}' already finalized", color="info", icon="info")
                elif result == "no_scenes":
                    ui.notify(f"āš ļø No rendered scenes found for '{topic}'", color="warning", icon="warning")
            # Leave the final state visible briefly, then hide the progress UI.
            await asyncio.sleep(2)
            progress_container_continue.style("display: none;")
            await update_dashboard()
        except Exception as e:
            progress_label_continue.set_text(f"āŒ Error: {str(e)[:50]}...")
            progress_bar_continue.value = 0
            log_output_continue.push(f"\nāŒ An error occurred: {e}")
            ui.notify(f"Error continuing project: {e}", color="negative", multi_line=True)
        finally:
            continue_button.enable()
            continue_button.set_text("ā–¶ļø Continue Project")

    with util_content:
        topic_folders_util = get_topic_folders("output")
        if not topic_folders_util:
            # Empty state: no projects exist yet.
            with ui.card().classes("w-full").style("padding: 60px; text-align: center;"):
                ui.icon("folder_open", size="xl").classes("text-gray-400")
                ui.label("No Projects Found").classes("text-2xl font-semibold text-gray-700 dark:text-gray-300 mt-4")
                ui.label("Create a project first to use utilities").classes("text-gray-500 dark:text-gray-400 mt-2")
            return
        # Get project statuses
        all_statuses = [video_generator.check_theorem_status({"theorem": th}) for th in topic_folders_util]
        incomplete_projects = [s for s in all_statuses if not s["has_combined_video"]]
        complete_projects = [s for s in all_statuses if s["has_combined_video"]]
        # Continue Unfinished Projects Section
        with ui.card().classes("w-full mb-4").style("padding: 24px;"):
            with ui.row().classes("w-full items-center gap-3 mb-4"):
                ui.icon("play_circle", size="lg").classes("text-primary")
                with ui.column().classes("gap-1"):
                    ui.label("Continue Unfinished Projects").classes("text-xl font-bold text-gray-900 dark:text-white")
                    ui.label("Resume generation for incomplete projects").classes("text-sm text-gray-600 dark:text-gray-400")
            if incomplete_projects:
                continue_select = ui.select(
                    [p["topic"] for p in incomplete_projects],
                    label="Select Project to Continue"
                ).props("outlined").classes("w-full")
                # Show project status
                with ui.row().classes("w-full items-center gap-2 mt-2 mb-3"):
                    ui.icon("info", size="sm").classes("text-blue-500")
                    status_label = ui.label("Select a project to see its status").classes("text-sm text-gray-600 dark:text-gray-400")

                def update_status():
                    """Show rendered/total scene progress for the selected project."""
                    if continue_select.value:
                        status = next((s for s in incomplete_projects if s["topic"] == continue_select.value), None)
                        if status:
                            rendered = status["rendered_scenes"]
                            total = status["total_scenes"]
                            # NOTE(review): raises ZeroDivisionError if
                            # total_scenes is 0 — confirm upstream guarantees.
                            status_label.set_text(f"Progress: {rendered}/{total} scenes rendered ({rendered/total*100:.0f}%)")

                continue_select.on_value_change(lambda: update_status())
                continue_button = ui.button(
                    "ā–¶ļø Continue Project",
                    on_click=lambda: handle_continue(continue_select.value),
                ).props("unelevated no-caps").classes("w-full mt-2")
                # Progress section for continue
                progress_container_continue = ui.column().classes("w-full gap-3 mt-4").style("display: none;")
                with progress_container_continue:
                    progress_label_continue = ui.label("Starting...").classes("text-sm font-medium text-gray-700 dark:text-gray-300")
                    progress_bar_continue = ui.linear_progress(value=0, show_value=False).props("color=primary size=8px").classes("w-full")
                    with ui.expansion("View Detailed Logs", icon="terminal").props("duration=600").classes("w-full mt-2"):
                        log_output_continue = ui.log().props("id=log_output_continue").classes(
                            "w-full h-64 bg-gray-900 dark:bg-black text-white font-mono rounded-lg"
                        )
            else:
                with ui.row().classes("w-full items-center gap-2 p-4 bg-green-50 dark:bg-green-900/20 rounded-lg"):
                    ui.icon("check_circle", size="md").classes("text-green-600")
                    ui.label("All projects are complete! šŸŽ‰").classes("text-green-700 dark:text-green-300 font-medium")
        # Regenerate Subtitles Section
        with ui.card().classes("w-full mb-4").style("padding: 24px;"):
            with ui.row().classes("w-full items-center gap-3 mb-4"):
                ui.icon("movie_edit", size="lg").classes("text-primary")
                with ui.column().classes("gap-1"):
                    ui.label("Re-Assemble Video & Subtitles (Fix Sync)").classes("text-xl font-bold text-gray-900 dark:text-white")
                    ui.label("Regenerate all audio, fix sync, and re-render full video.").classes("text-sm text-gray-600 dark:text-gray-400")
            # Info box explaining the feature
            with ui.expansion("ā„¹ļø What does this do?", icon="help_outline").props("dense").classes("w-full mb-3"):
                with ui.column().classes("gap-2 p-2"):
                    ui.label("This tool completely rebuilds the final video to ensure perfect synchronization.").classes("text-sm text-gray-700 dark:text-gray-300")
                    ui.separator()
                    ui.label("How it works:").classes("text-sm font-semibold text-gray-800 dark:text-gray-200")
                    ui.label("• Verifies and regenerates ALL voiceover audio files (including pauses)").classes("text-sm text-gray-600 dark:text-gray-400")
                    ui.label("• Reconstructs the entire audio track for millisecond-perfect subtitle match").classes("text-sm text-gray-600 dark:text-gray-400")
                    ui.label("• Stitches scene videos together with the new audio track").classes("text-sm text-gray-600 dark:text-gray-400")
                    ui.separator()
                    ui.label("Note: This completely rebuilds the final video file (visuals + audio).").classes("text-xs text-gray-500 dark:text-gray-400 italic")
            if complete_projects:
                regen_select = ui.select(
                    [p["topic"] for p in complete_projects],
                    label="Select Project to Fix"
                ).props("outlined").classes("w-full")
                # Status indicator
                with ui.row().classes("w-full items-center gap-2 mt-2 mb-3"):
                    ui.icon("info", size="sm").classes("text-blue-500")
                    regen_status_label = ui.label("Select a project to check subtitle status").classes("text-sm text-gray-600 dark:text-gray-400")

                async def check_subtitle_status():
                    """Inspect the selected project's TCM/voiceover cache and
                    color-code ``regen_status_label`` accordingly."""
                    if regen_select.value:
                        project_path = get_project_path("output", regen_select.value)
                        inner_folder_name = os.path.basename(project_path)
                        # Check if already has sync fix applied
                        tcm_path = os.path.join(project_path, f"{inner_folder_name}_combined_tcm.json")
                        has_sync_fix = False
                        if os.path.exists(tcm_path):
                            try:
                                with open(tcm_path, "r", encoding="utf-8") as f:
                                    tcm_data = json.load(f)
                                has_sync_fix = any(event.get("_sync_fix_applied", False) for event in tcm_data)
                            except:
                                # Best-effort: unreadable TCM treated as "no fix".
                                pass
                        if has_sync_fix:
                            regen_status_label.set_text("āœ“ Subtitles already use improved sync (you can still regenerate if needed)")
                            regen_status_label.classes(
                                remove="text-gray-600 dark:text-gray-400 text-orange-600 dark:text-orange-400 text-red-600 dark:text-red-400",
                                add="text-green-600 dark:text-green-400"
                            )
                            return
                        # Check for voiceover cache
                        has_cache = False
                        scene_dirs = sorted(
                            glob.glob(os.path.join(project_path, "scene*")),
                            key=lambda x: int(re.search(r"scene(\d+)", x).group(1)) if re.search(r"scene(\d+)", x) else 0
                        )
                        for scene_dir in scene_dirs:
                            cache_dirs = [
                                os.path.join(scene_dir, "code", "media", "voiceovers"),
                                os.path.join(project_path, "media", "voiceovers"),
                            ]
                            for cache_dir in cache_dirs:
                                if os.path.exists(cache_dir) and glob.glob(os.path.join(cache_dir, "*.json")):
                                    has_cache = True
                                    break
                            if has_cache:
                                break
                        if has_cache:
                            regen_status_label.set_text("⚠ Old subtitle timing detected - regeneration recommended")
                            regen_status_label.classes(
                                remove="text-gray-600 dark:text-gray-400 text-green-600 dark:text-green-400 text-red-600 dark:text-red-400",
                                add="text-orange-600 dark:text-orange-400"
                            )
                        else:
                            regen_status_label.set_text("⚠ No cache found - will use fallback scaling (less accurate)")
                            regen_status_label.classes(
                                remove="text-gray-600 dark:text-gray-400 text-green-600 dark:text-green-400 text-red-600 dark:text-red-400",
                                add="text-orange-600 dark:text-orange-400"
                            )

                regen_select.on_value_change(lambda: asyncio.create_task(check_subtitle_status()))

                async def handle_regenerate_subtitles(topic):
                    """Delete stale subtitle artifacts and fully re-assemble the
                    project's video/audio in a worker thread, updating the UI."""
                    if not topic:
                        ui.notify("Please select a project.", color="warning")
                        return
                    # Disable button and update UI
                    regen_button.disable()
                    regen_button.set_text("ā³ Re-Assembling...")
                    regen_status_container.style("display: block;")
                    # Beautiful processing status
                    regen_status_container.clear()
                    with regen_status_container:
                        with ui.card().classes("w-full bg-blue-50 dark:bg-blue-900/10 border-blue-200 dark:border-blue-800"):
                            with ui.row().classes("items-center gap-4"):
                                ui.spinner("dots", size="lg").classes("text-blue-600")
                                with ui.column().classes("gap-1"):
                                    ui.label("Re-Assembling Project Constraints").classes("text-lg font-bold text-blue-800 dark:text-blue-200")
                                    ui.label("Step 1: Verifying & Regenerating all audio assets...").classes("text-sm text-blue-600 dark:text-blue-400")
                                    ui.label("Step 2: Reconstructing millisecond-perfect audio track...").classes("text-sm text-blue-600 dark:text-blue-400")
                                    ui.label("Step 3: Stitching scenes & Re-rendering final video...").classes("text-sm text-blue-600 dark:text-blue-400")
                                    ui.label("Please wait, this will take a moment.").classes("text-xs text-gray-500 italic mt-1")
                    try:
                        import concurrent.futures

                        def regenerate_subtitles_task():
                            """Background task to regenerate subtitles AND re-render video"""
                            # Delete old subtitle files to force regeneration
                            project_path = get_project_path("output", topic)
                            inner_folder_name = os.path.basename(project_path)
                            files_to_delete = [
                                os.path.join(project_path, f"{inner_folder_name}_combined_tcm.json"),
                                os.path.join(project_path, f"{inner_folder_name}_combined.srt"),
                                os.path.join(project_path, f"{inner_folder_name}_combined.vtt"),
                                # We don't delete the video here, the function overwrites it safely
                            ]
                            for file_path in files_to_delete:
                                if os.path.exists(file_path):
                                    os.remove(file_path)
                            # Run the full re-assembly
                            result = regenerate_subtitles_only(topic)
                            return result

                        # Run in executor to avoid blocking UI
                        loop = asyncio.get_running_loop()
                        with concurrent.futures.ThreadPoolExecutor() as executor:
                            result = await loop.run_in_executor(executor, regenerate_subtitles_task)
                        # Update UI based on result (back in main event loop, context is restored)
                        regen_status_container.clear()
                        with regen_status_container:
                            if result == "success":
                                with ui.card().classes("w-full bg-green-50 dark:bg-green-900/10 border-green-200 dark:border-green-800"):
                                    with ui.row().classes("items-center gap-4"):
                                        ui.icon("check_circle", size="lg").classes("text-green-600")
                                        with ui.column().classes("gap-1"):
                                            ui.label("Re-Assembly Complete!").classes("text-lg font-bold text-green-800 dark:text-green-200")
                                            ui.label("Video and subtitles have been perfectly synchronized.").classes("text-sm text-green-600 dark:text-green-400")
                                ui.notify(f"āœ… Project '{topic}' successfully re-assembled!", color="positive", icon="check_circle")
                            elif result == "no_video":
                                ui.label("āš ļø Missing source video scenes").classes("text-orange-600")
                                ui.notify(f"āš ļø No source scenes found for '{topic}'.", color="warning")
                            else:
                                ui.label("āŒ Unknown error").classes("text-red-600")
                                ui.notify("Something went wrong.", color="negative")
                    except Exception as e:
                        regen_status_container.clear()
                        with regen_status_container:
                            ui.label(f"āŒ Error: {str(e)}").classes("text-red-600 font-bold")
                        ui.notify(f"Error re-assembling project: {e}", color="negative", multi_line=True)
                    finally:
                        # Re-enable button
                        # NOTE(review): button label reverts to "Re-Assemble Video"
                        # while it was created as "Regenerate Subtitles" — confirm
                        # which wording is intended.
                        regen_button.enable()
                        regen_button.set_text("šŸ”„ Re-Assemble Video")
                        with regen_status_container:
                            # Wrapping this too just in case update_dashboard needs context,
                            # though it likely has its own. But staying safe.
                            pass
                        await update_dashboard()
                        if app_state.get("current_topic_inspector") == topic:
                            await update_inspector(topic)

                regen_button = ui.button(
                    "šŸ”„ Regenerate Subtitles",
                    on_click=lambda: asyncio.create_task(handle_regenerate_subtitles(regen_select.value)),
                ).props("unelevated no-caps").classes("w-full mt-2")
                # Status container
                regen_status_container = ui.column().classes("w-full gap-2 mt-3").style("display: none;")
                with regen_status_container:
                    with ui.card().classes("w-full border-l-4 border-primary").style("padding: 16px; background: rgba(255, 75, 75, 0.05);"):
                        regen_status_label_progress = ui.label("Processing...").classes("text-sm font-medium text-gray-700 dark:text-gray-300")
            else:
                with ui.row().classes("w-full items-center gap-2 p-4 bg-orange-50 dark:bg-orange-900/20 rounded-lg"):
                    ui.icon("info", size="md").classes("text-orange-600")
                    ui.label("No completed projects found. Complete a project first to use this feature.").classes("text-orange-700 dark:text-orange-300 font-medium")
ui.card().classes("w-full").style("padding: 32px;"): with ui.column().classes("w-full gap-3"): # Project Name and Tips Row with ui.row().classes("w-full gap-4 items-end"): # Project Name Section (left side) with ui.column().classes("flex-grow gap-2"): with ui.row().classes("items-center gap-2"): ui.icon("label", size="sm").classes("text-primary") ui.label("Project Name").classes("text-sm font-semibold text-gray-900 dark:text-white") topic_input = ( ui.input(placeholder="e.g., Binary Search, Bubble Sort, Linked Lists...") .props("outlined dense") .classes("w-full") ) topic_input.value = "Binary Search" # Tips Card (right side) - aligned to bottom with ui.card().classes("flex-shrink-0").style("padding: 10px 14px; background-color: #fffbeb; border: 1px solid #fef3c7; width: 320px;"): with ui.row().classes("items-center gap-2"): ui.icon("tips_and_updates", size="sm").classes("text-primary") ui.label("More details = better video!").classes("text-xs font-medium") # Description Section with ui.column().classes("w-full").style("gap: 8px;"): with ui.row().classes("items-center gap-2 justify-between w-full"): with ui.row().classes("items-center gap-2"): ui.icon("description", size="sm").classes("text-primary") ui.label("Video Description").classes("text-sm font-semibold text-gray-900 dark:text-white") auto_gen_button = ( ui.button("Auto-Generate", icon="auto_awesome") .props("flat dense no-caps") .classes("text-primary") .style("font-size: 0.875rem;") .tooltip("Generate description from project name using AI") ) desc_input = ( ui.textarea( placeholder="""Tips for better videos: • Be specific about the algorithm and target audience level (beginner/intermediate/advanced) • Mention if you want code examples, analogies, or visualizations • Include the key concepts you want to emphasize • Specify the programming language if showing code • Mention any real-world applications or use cases""" ) .props("outlined rows=15") .classes("w-full") .style("margin-bottom: 12px;") ) 
desc_input.value = """Create a short video for beginners explaining Binary Search algorithm. Target Audience: Complete beginners to algorithms Key Concepts: Divide and conquer, sorted arrays, logarithmic time complexity Content: • Start with a sorted array example: [1, 3, 5, 7, 9, 11, 13] • Show step-by-step how to find a target number (e.g., 7) • Visualize checking the middle element • Demonstrate eliminating half the array each time • Compare with linear search to show efficiency • End with time complexity: O(log n) vs O(n) Style: Use simple animations, friendly tone, and a relatable analogy (like guessing a number between 1-100)""" # Review Mode Option with ui.row().classes("w-full items-center gap-2 mb-2"): review_checkbox = ui.checkbox("Interactive Script Review (Pause after planning)").props("dense") review_checkbox.tooltip("Generates plans and stops. You can review/edit the script in 'Project View' before rendering.") ui.icon("edit_note", size="sm").classes("text-gray-500") # Action Section - tight to textarea with ui.row().classes("w-full justify-end"): generate_button = ( ui.button("Generate Video", icon="play_circle") .props("unelevated no-caps") .classes("px-6") .style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); color: white;") ) # Progress Indicator (hidden by default) progress_container = ui.column().classes("w-full gap-3 mt-6").style("display: none;") with progress_container: with ui.card().classes("w-full shadow-lg border-l-4 border-primary").style("padding: 24px;"): with ui.column().classes("w-full gap-4"): with ui.row().classes("w-full items-center gap-3"): ui.spinner(size="lg", color="primary") progress_label = ui.label("Generating video...").classes("text-lg font-semibold text-gray-900 dark:text-white") progress_bar = ui.linear_progress(value=0, show_value=False).props('rounded color="primary"').style("height: 8px;") ui.label("This may take several minutes depending on the complexity of your project.").classes("text-sm 
text-gray-600 dark:text-gray-400") # --- TRIVIA CAROUSEL SECTION --- trivia_container = ui.column().classes("w-full mt-2 hidden transition-all duration-500") with trivia_container: with ui.card().classes("w-full bg-gradient-to-r from-blue-50 to-indigo-50 dark:from-gray-800 dark:to-gray-700 border-none shadow-inner p-4"): with ui.row().classes("w-full items-center gap-3"): with ui.avatar(size="md").classes("bg-white text-blue-600 shadow-sm"): #Icon background ui.icon("lightbulb", size="sm") with ui.column().classes("gap-1 flex-grow"): ui.label("Did You Know?").classes("text-xs font-bold text-blue-600 uppercase tracking-wide") trivia_label = ui.label("Fetching interesting facts...").classes("text-sm font-medium text-gray-800 dark:text-gray-200 italic transition-opacity duration-300") # ------------------------------- # Expandable log section with ui.expansion("View Detailed Logs", icon="terminal").props("duration=600").classes("w-full mt-2"): log_output = ( ui.log() .props("id=log_output") .classes( "w-full h-64 bg-gray-900 dark:bg-black text-white font-mono rounded-lg" ) ) # Define the auto-generate description handler async def handle_auto_generate_description(): topic = topic_input.value if not topic or not topic.strip(): ui.notify("Please enter a Project Name first.", color="warning") return # Create a dialog to ask for difficulty level difficulty_level = None with ui.dialog().props('persistent') as difficulty_dialog, ui.card().classes("w-full max-w-md"): # Header with icon with ui.row().classes("items-center gap-3 p-6 pb-4 border-b border-gray-200 dark:border-gray-700"): ui.icon("school", size="md").classes("text-primary") with ui.column().classes("gap-1"): ui.label("Select Difficulty Level").classes("text-xl font-bold text-gray-900 dark:text-white") ui.label("Choose the target audience for your video").classes("text-sm text-gray-600 dark:text-gray-400") difficulty_options = [ { "label": "Beginner", "value": "beginner", "icon": "lightbulb", "color": "green", 
"desc": "Perfect for newcomers with no prior knowledge" }, { "label": "Intermediate", "value": "intermediate", "icon": "trending_up", "color": "blue", "desc": "For learners with basic understanding" }, { "label": "Advanced", "value": "advanced", "icon": "psychology", "color": "purple", "desc": "Deep dive for experts and professionals" }, ] selected_difficulty = {"value": "intermediate"} # Default option_cards = [] # Options container with ui.column().classes("gap-3 p-6"): for option in difficulty_options: with ui.card().classes( "p-4 cursor-pointer transition-all duration-200 " "border-2 border-transparent hover:border-primary/50 " "hover:shadow-lg" ).style("border-radius: 12px") as option_card: with ui.row().classes("items-center gap-4 w-full"): # Icon with colored background with ui.element("div").classes( f"flex items-center justify-center w-12 h-12 rounded-full " f"bg-{option['color']}-100 dark:bg-{option['color']}-900/30" ): ui.icon(option["icon"], size="sm").classes(f"text-{option['color']}-600 dark:text-{option['color']}-400") # Text content with ui.column().classes("flex-1 gap-1"): ui.label(option["label"]).classes("text-base font-semibold text-gray-900 dark:text-white") ui.label(option["desc"]).classes("text-sm text-gray-600 dark:text-gray-400") # Radio indicator ui.icon("radio_button_unchecked", size="sm").classes("text-gray-400").bind_visibility_from( selected_difficulty, "value", backward=lambda v, opt=option: v != opt["value"] ) ui.icon("check_circle", size="sm").classes("text-primary").bind_visibility_from( selected_difficulty, "value", backward=lambda v, opt=option: v == opt["value"] ) # Store card reference and make clickable option_cards.append((option_card, option)) def make_click_handler(opt): def handler(): selected_difficulty["value"] = opt["value"] # Update card styles for card, card_opt in option_cards: if card_opt["value"] == opt["value"]: card.classes(remove="border-transparent", add="border-primary") else: 
card.classes(remove="border-primary", add="border-transparent") return handler option_card.on("click", make_click_handler(option)) # Set initial selected state if option["value"] == "intermediate": option_card.classes(remove="border-transparent", add="border-primary") # Action buttons with ui.row().classes("gap-3 p-6 pt-4 border-t border-gray-200 dark:border-gray-700 justify-end w-full"): ui.button("Cancel", icon="close", on_click=lambda: difficulty_dialog.close()).props("flat outline").classes("px-4") ui.button( "Generate Description", icon="auto_awesome", on_click=lambda: [ difficulty_dialog.submit(selected_difficulty["value"]) ] ).props("unelevated").classes("px-6 bg-primary text-white") # Show the dialog and wait for user selection difficulty_level = await difficulty_dialog if not difficulty_level: return # User cancelled # Disable button and show loading state auto_gen_button.disable() auto_gen_button.props("loading") ui.notify(f"Generating {difficulty_level} level description...", color="info") try: # Run LLM call in executor to keep UI responsive import concurrent.futures def generate_description(): # Use the planner model to generate description llm = LiteLLMWrapper( model_name=app_state["planner_model_name"], temperature=0.7, print_cost=False, verbose=False, use_langfuse=False ) # Map difficulty to audience description audience_map = { "beginner": "beginners with no prior knowledge, using simple language and basic concepts", "intermediate": "learners with basic understanding, building on foundational knowledge", "advanced": "experts and advanced learners, covering complex details and nuances" } audience_desc = audience_map.get(difficulty_level, audience_map["intermediate"]) # Create prompt that produces clean output without asterisks or preambles prompt = f"""Generate a detailed video description for an educational video about "{topic}". IMPORTANT INSTRUCTIONS: - Start directly with the content, NO preambles like "Here is..." or "Of course!" 
- Use plain text formatting with line breaks, NOT markdown asterisks or bold - Use bullet points with • symbol, not asterisks - Be specific, include concrete examples - Make it educational and engaging - Target audience level: {difficulty_level.upper()} - {audience_desc} REQUIRED FORMAT: Create a short video for {audience_desc} explaining {topic}. Target Audience: [Specify {difficulty_level} level audience and their background] Key Concepts: [List 2-4 main concepts appropriate for {difficulty_level} level] Content: • [First key point with specific example suitable for {difficulty_level} level] • [Second key point with visualization approach] • [Third key point showing step-by-step process] • [Fourth key point with comparison or application] • [Final point with complexity or summary appropriate for {difficulty_level} level] Style: [Describe the teaching approach and tone for {difficulty_level} learners] Now generate the description for "{topic}" following this exact format.""" messages = [{"type": "text", "content": prompt}] return llm(messages, metadata={}) # Run in thread pool to avoid blocking UI loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor() as executor: generated_desc = await loop.run_in_executor(executor, generate_description) if generated_desc and not generated_desc.startswith("Error"): # Clean up any remaining asterisks or unwanted formatting generated_desc = generated_desc.strip() # Remove common preambles preambles = [ "Of course! Here is", "Here is a detailed", "Here's a detailed", "Sure! Here is", "Certainly! 
Here is", ] for preamble in preambles: if generated_desc.startswith(preamble): # Find the first newline after preamble and start from there first_newline = generated_desc.find('\n') if first_newline > 0: generated_desc = generated_desc[first_newline:].strip() break desc_input.value = generated_desc ui.notify(f"{difficulty_level.capitalize()} level description generated successfully!", color="positive") else: ui.notify("Failed to generate description. Please try again.", color="negative") except Exception as e: print(f"Error generating description: {e}") ui.notify(f"Error: {str(e)}", color="negative") finally: # Re-enable button auto_gen_button.enable() auto_gen_button.props(remove="loading") # Connect the auto-generate button handler auto_gen_button.on_click(handle_auto_generate_description) # Define the generate handler now that UI elements exist async def handle_generate(): topic = topic_input.value description = desc_input.value review_mode = review_checkbox.value if not topic or not description: ui.notify( "Please provide both a Project Name and Description.", color="warning" ) return # Check if project already exists project_path = get_project_path("output", topic) is_existing = os.path.exists(project_path) # Disable button and show progress generate_button.disable() progress_container.style("display: block;") progress_bar.set_visibility(True) progress_label.set_text("šŸš€ Starting generation...") progress_bar.value = 0 log_output.clear() # Reset trivia state trivia_container.classes(remove="hidden") trivia_label.set_text("Thinking of interesting facts...") # Start background trivia generation async def generate_and_cycle_trivia(): facts = [ "Algorithm visualization helps in understanding complex logic by 70%.", "Python's Timsort is a hybrid sorting algorithm, derived from merge sort and insertion sort.", "Manim was originally created by 3Blue1Brown to explain math concepts.", "Big O notation frames the upper bound of an algorithm's running time.", "The first 
computer algorithm was written by Ada Lovelace in the mid-1800s." ] try: # Quick LLM call for custom trivia llm = LiteLLMWrapper(model_name=app_state["planner_model_name"], temperature=0.7) prompt = f"Generate 5 short, interesting, one-sentence facts or tips about {topic} or computer science. Return ONLY the facts, one per line." response = await asyncio.get_event_loop().run_in_executor(None, lambda: llm([{"type": "text", "content": prompt}])) if response: new_facts = [line.strip() for line in response.split('\n') if line.strip() and not line.strip().startswith(('Here', 'Sure'))] if len(new_facts) >= 3: facts = new_facts except Exception as e: print(f"Trivia gen failed, using defaults: {e}") i = 0 while progress_container.visible: # Keep cycling while visible trivia_label.set_text(facts[i % len(facts)]) # Animate transition trivia_label.classes(add="opacity-0") await asyncio.sleep(0.5) trivia_label.classes(remove="opacity-0") i += 1 # Wait before next fact for _ in range(80): # 8 seconds, checking visibility every 0.1s if not progress_container.visible: return await asyncio.sleep(0.1) asyncio.create_task(generate_and_cycle_trivia()) if is_existing: log_output.push(f"šŸ”„ Resuming existing project: '{topic}'") log_output.push(f"āœ… Checking project status and continuing from where it left off...") else: log_output.push(f"šŸš€ Starting new project: '{topic}'") log_output.push(f"šŸ“‹ Initializing video generation pipeline...") try: # Don't modify output_dir - generate_video.py handles folder structure # Update progress: Planning phase progress_label.set_text("šŸ“ Planning video structure...") progress_bar.value = 0.1 await asyncio.sleep(0.1) # Allow UI to update # Start generation in background thread to keep UI responsive import concurrent.futures import threading import sys from io import StringIO # Shared state for progress updates generation_complete = threading.Event() generation_error = [None] current_progress = {"value": 0.1, "text": "Starting..."} log_buffer 
= [] # Buffer for log messages progress_lock = threading.Lock() def progress_callback(value, text): """Thread-safe progress callback""" with progress_lock: current_progress["value"] = value current_progress["text"] = text class LogCapture: """Capture stdout/stderr and store in buffer""" def __init__(self, original_stream, buffer_list): self.original_stream = original_stream self.buffer_list = buffer_list self.skip_patterns = [ "Langfuse client is disabled", "LANGFUSE_PUBLIC_KEY", "No video folders found", "langfuse.com/docs", "See our docs:", "==> Loading existing implementation plans", "<== Finished loading plans", "Found: 0, Missing:", "Generating missing implementation plans for scenes:", "Loaded existing topic session ID:", "Saved topic session ID to", ] def write(self, text): self.original_stream.write(text) # Still print to console if text.strip(): # Only add non-empty lines # Filter out unnecessary technical messages should_skip = any(pattern in text for pattern in self.skip_patterns) if not should_skip: # Transform technical messages to user-friendly ones cleaned_text = text.rstrip() # Replace technical terms replacements = { "STARTING VIDEO PIPELINE FOR TOPIC:": "šŸš€ Starting video generation for:", "[PHASE 1: SCENE OUTLINE]": "šŸ“ Phase 1: Planning video structure", "[PHASE 1 COMPLETE]": "āœ… Phase 1: Video structure ready", "[PHASE 2: IMPLEMENTATION PLANS]": "šŸŽØ Phase 2: Designing all scenes", "[PHASE 2 COMPLETE]": "āœ… Phase 2: All scenes designed", "[PHASE 3: CODE GENERATION & RENDERING (SCENE-BY-SCENE)]": "šŸŽ¬ Phase 3: Rendering all scenes", "[PHASE 3 COMPLETE]": "āœ… Phase 3: All scenes rendered", "PIPELINE FINISHED FOR TOPIC:": "āœ… Video generation complete for:", "scene outline saved": "āœ… Video structure saved", "Loaded existing scene outline": "āœ… Using existing video structure", "Loaded existing topic session ID": "", "Total Cost:": "šŸ’° Cost:", "==> Generating scene outline": "šŸ“ Starting: Generating video structure", "==> Scene 
outline generated": "āœ… Finished: Video structure generated", "==> Generating scene implementations": "šŸŽØ Starting: Planning scene details", "==> All concurrent scene implementations generated": "āœ… Finished: All scene details planned", "==> Preparing to render": "šŸ’» Starting: Scene rendering", "Starting concurrent processing": "šŸ’» Processing scenes concurrently", "<== All scene processing tasks completed": "āœ… Finished: All scenes processed", } # Apply replacements for old, new in replacements.items(): if old in cleaned_text: cleaned_text = cleaned_text.replace(old, new) # Handle "Found X scenes in outline" dynamically import re scenes_match = re.search(r'Found (\d+) scenes? in outline', cleaned_text) if scenes_match: num_scenes = scenes_match.group(1) scene_word = "scene" if num_scenes == "1" else "scenes" cleaned_text = f"šŸ“Š Video has {num_scenes} {scene_word}" # Clean up scene prefixes like "[Binary Search | Scene 2]" scene_prefix_match = re.search(r'\[([^\]]+) \| Scene (\d+)\]', cleaned_text) if scene_prefix_match: scene_num = scene_prefix_match.group(2) cleaned_text = re.sub(r'\[([^\]]+) \| Scene (\d+)\]', f'Scene {scene_num}:', cleaned_text) # Clean up topic prefixes like "[Binary Search]" cleaned_text = re.sub(r'\[([^\]]+)\]', '', cleaned_text).strip() # Remove excessive equals signs and empty messages if "======================================================" in cleaned_text or not cleaned_text.strip(): return with progress_lock: self.buffer_list.append(cleaned_text) def flush(self): self.original_stream.flush() def run_generation_sync(): """Run generation in a separate thread""" # Capture stdout/stderr old_stdout = sys.stdout old_stderr = sys.stderr try: # Redirect output to our capture sys.stdout = LogCapture(old_stdout, log_buffer) sys.stderr = LogCapture(old_stderr, log_buffer) # Create a new event loop for this thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the generation pipeline with progress callback 
loop.run_until_complete( video_generator.generate_video_pipeline( topic, description, max_retries=app_state["max_retries"], only_plan=review_mode, progress_callback=progress_callback ) ) loop.close() except Exception as e: generation_error[0] = e finally: # Restore original stdout/stderr sys.stdout = old_stdout sys.stderr = old_stderr generation_complete.set() # Start generation in background thread executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) generation_future = executor.submit(run_generation_sync) heartbeat_counter = 0 last_log_count = 0 last_progress_text = "" # Keep UI responsive and update progress from callback while not generation_complete.is_set(): await asyncio.sleep(0.5) # Check twice per second for smoother updates heartbeat_counter += 1 # Update UI with actual progress from pipeline with progress_lock: progress_bar.value = current_progress["value"] # Update progress label with elapsed time elapsed = heartbeat_counter // 2 if current_progress["text"]: progress_label.set_text(f"{current_progress['text']} ({elapsed}s)") else: progress_label.set_text(f"ā³ Working... ({elapsed}s)") # Push new log messages to UI if len(log_buffer) > last_log_count: for log_line in log_buffer[last_log_count:]: log_output.push(log_line) last_log_count = len(log_buffer) last_progress_text = current_progress["text"] # Check for errors if generation_error[0]: raise generation_error[0] # Wait for thread to fully complete generation_future.result() # Push any remaining logs with progress_lock: if len(log_buffer) > last_log_count: for log_line in log_buffer[last_log_count:]: log_output.push(log_line) # Handle "Plan Only" / Review Mode completion if review_mode: progress_bar.value = 1.0 progress_label.set_text("āœ… Planning Complete! Ready for Review.") log_output.push("\nāœ… Video plans generated successfully!") log_output.push("ā„¹ļø Go to 'Project View' tab to review and edit the script before rendering.") ui.notify(f"Planning complete for '{topic}'. 
Please review in Project View.", color="positive", icon="edit_note", timeout=0) # Help user by enabling buttons generate_button.enable() # Add a button to go to review directly with progress_container: ui.button("Go to Review", icon="arrow_forward", on_click=lambda: [ main_tabs.set_value("šŸ“ŗ Project View"), # We can't easily auto-select the topic in the other tab without more state management, # but the user can select it. # Actually, let's try to set it: app_state.update({"current_topic_inspector": topic}), # Trigger update (this is async, so we wrap it) asyncio.create_task(update_inspector(topic)) ]).props("unelevated no-caps").classes("mt-4 bg-secondary text-white") return progress_label.set_text("šŸŽžļø Finalizing project...") progress_bar.value = 0.9 # Finalization phase progress_label.set_text("šŸŽžļø Combining videos and creating subtitles...") progress_bar.value = 0.9 # Check if any scenes failed has_failures = len(video_generator.failed_scenes) > 0 if has_failures: log_output.push(f"\nāš ļø Pipeline completed with {len(video_generator.failed_scenes)} failed scene(s). Finalizing project...") else: log_output.push("\nāœ… Pipeline completed! Finalizing project...") loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, combine_videos, topic) # Complete progress_bar.value = 1.0 if has_failures: progress_label.set_text(f"āš ļø Video generation completed with {len(video_generator.failed_scenes)} failed scene(s)") log_output.push(f"āš ļø Project finalized with {len(video_generator.failed_scenes)} scene(s) failed. Check logs for details.") ui.notify( f"āš ļø Video for '{topic}' completed with failures. 
Check logs.", color="warning", icon="warning", ) else: progress_label.set_text("āœ… Video generation complete!") log_output.push("āœ… Project finalized successfully!") if result == "success": ui.notify( f"Video for '{topic}' generated!", color="positive", icon="check_circle", ) elif result == "already_exists": ui.notify(f"ā„¹ļø Video for '{topic}' already finalized", color="info", icon="info") elif result == "no_scenes": ui.notify(f"āš ļø No rendered scenes found for '{topic}'", color="warning", icon="warning") # Hide progress after 2 seconds await asyncio.sleep(2) progress_container.style("display: none;") except Exception as e: progress_label.set_text(f"āŒ Error: {str(e)[:50]}...") progress_bar.value = 0 log_output.push(f"\nāŒ An error occurred: {e}") ui.notify(f"An error occurred: {e}", color="negative", multi_line=True) finally: generate_button.enable() await update_dashboard() # Connect the button to the handler generate_button.on_click(handle_generate) with ui.tab_panel(two): with ui.column().classes("w-full gap-6"): # Modern header with gradient background with ui.card().classes("w-full shadow-lg border-0").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 24px;"): with ui.row().classes("w-full items-center justify-between"): with ui.row().classes("items-center gap-3"): ui.icon("dashboard", size="xl").classes("text-white") with ui.column().classes("gap-1"): ui.label("Dashboard").classes("text-3xl font-bold text-white") ui.label("Monitor your projects and track progress").classes("text-white/80") ui.button("Refresh", on_click=update_dashboard, icon="refresh").props( "flat round" ).classes("text-white").tooltip("Refresh dashboard") dashboard_content = ui.column().classes("w-full gap-4 mt-4") with ui.tab_panel(three): with ui.column().classes("w-full gap-6"): # Modern header with gradient background with ui.card().classes("w-full shadow-lg border-0").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 
24px;"): with ui.row().classes("w-full items-center gap-3"): ui.icon("tv", size="xl").classes("text-white") with ui.column().classes("gap-1"): ui.label("Project View").classes("text-3xl font-bold text-white") ui.label("Watch videos, explore code, and learn with AI").classes("text-white/80") topic_folders = get_topic_folders("output") default_topic = app_state.get("selected_topic") or ( topic_folders[0] if topic_folders else None ) # Project selector with modern styling with ui.card().classes("w-full shadow-md"): with ui.row().classes("w-full items-center gap-3 p-4"): ui.icon("folder_open", size="lg").classes("text-primary") inspector_select = ( ui.select( topic_folders, label="Select a Project", value=default_topic, on_change=lambda e: update_inspector(e.value), ) .props("outlined") .classes("flex-grow text-lg") .style("font-size: 1.125rem; min-height: 56px;") ) inspector_content = ui.column().classes("w-full gap-6 mt-4") with ui.tab_panel(four): with ui.column().classes("w-full gap-6"): # Modern header with gradient background with ui.card().classes("w-full shadow-lg border-0").style("background: linear-gradient(135deg, #FF4B4B 0%, #FF6B6B 100%); padding: 24px;"): with ui.row().classes("w-full items-center gap-3"): ui.icon("build_circle", size="xl").classes("text-white") with ui.column().classes("gap-1"): ui.label("Utilities").classes("text-3xl font-bold text-white") ui.label("Continue and manage your video projects").classes("text-white/80") util_content = ui.column().classes("w-full mt-4") # --- Define inspect_project after all UI elements are created --- async def inspect_project(topic_name): app_state["selected_topic"] = topic_name inspector_select.set_value(topic_name) await update_inspector(topic_name) main_tabs.set_value("šŸ“ŗ Project View") # --- Initial UI State Population --- await update_dashboard() # Initialize inspector with default topic if available topic_folders = get_topic_folders("output") default_topic = app_state.get("selected_topic") or ( 
topic_folders[0] if topic_folders else None ) if default_topic: app_state["selected_topic"] = default_topic await update_inspector(default_topic) update_util_tab() # Run the app with increased WebSocket timeout to prevent disconnections during long operations ui.run( title='AlgoVision', port=int(os.environ.get('PORT', 8080)), reconnect_timeout=3600.0, # 5 minutes timeout instead of default 30 seconds reload=False, # Disable auto-reload to prevent interruption of Manim rendering )