Spaces:
Sleeping
Sleeping
| # lesson_gen.py | |
| import os | |
| import io | |
| import json | |
| import logging | |
| import uuid | |
| import tempfile | |
| import re | |
| from pathlib import Path | |
| import numpy as np | |
| import requests | |
| import subprocess | |
| import shutil | |
| import cv2 | |
| # LangChain for data sourcing | |
| from langchain_community.document_loaders import ArxivLoader | |
| # Google Gemini | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| # Video, Audio, and Animation (using robust tools) | |
| from PIL import Image, ImageDraw, ImageFont | |
| import matplotlib | |
| matplotlib.use('Agg') # Use non-interactive backend | |
| import matplotlib.pyplot as plt | |
| from matplotlib.animation import FuncAnimation, FFMpegWriter | |
| # --- 1. CONFIGURATION --- | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s') | |
| FPS, WIDTH, HEIGHT = 24, 1280, 720 | |
| PEXELS_API_KEY = os.getenv("PEXELS_API_KEY") | |
| # --- 2. HELPER & UTILITY FUNCTIONS (Adapted from Sozo) --- | |
| def deepgram_tts(txt: str, voice_model: str = 'aura-2-andromeda-en'): | |
| DG_KEY = os.getenv("DEEPGRAM_API_KEY") | |
| if not DG_KEY or not txt: return None | |
| clean_txt = re.sub(r"[^\w\s.,!?;:-]", "", txt) | |
| try: | |
| r = requests.post("https://api.deepgram.com/v1/speak", params={"model": voice_model}, headers={"Authorization": f"Token {DG_KEY}"}, json={"text": clean_txt}, timeout=45) | |
| r.raise_for_status() | |
| return r.content | |
| except Exception as e: | |
| logging.error(f"Deepgram TTS failed: {e}") | |
| return None | |
| def audio_duration(path: str) -> float: | |
| try: | |
| res = subprocess.run(["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", path], text=True, capture_output=True, check=True) | |
| return float(res.stdout.strip()) | |
| except Exception: return 5.0 | |
| def generate_silence_mp3(duration: float, out: Path): | |
| subprocess.run(["ffmpeg", "-y", "-f", "lavfi", "-i", f"anullsrc=r=44100:cl=mono", "-t", f"{duration:.3f}", "-q:a", "9", str(out)], check=True, capture_output=True) | |
| def concat_media(file_paths: list, output_path: Path): | |
| valid_paths = [p for p in file_paths if Path(p).exists() and Path(p).stat().st_size > 100] | |
| if not valid_paths: raise ValueError("No valid media files to concatenate.") | |
| if len(valid_paths) == 1: | |
| shutil.copy2(valid_paths[0], str(output_path)) | |
| return | |
| list_file = output_path.with_suffix(".txt") | |
| with open(list_file, 'w') as f: | |
| for path in valid_paths: f.write(f"file '{Path(path).resolve()}'\n") | |
| cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(list_file), "-c", "copy", str(output_path)] | |
| try: | |
| subprocess.run(cmd, check=True, capture_output=True, text=True) | |
| finally: | |
| list_file.unlink(missing_ok=True) | |
| # --- 3. AI & CONTENT GENERATION --- | |
| def get_llm(): | |
| return ChatGoogleGenerativeAI(model="gemini-2.5-flash", google_api_key=os.getenv("GOOGLE_API_KEY"), temperature=0.5) | |
| def fetch_arxiv_papers(topic: str, count=3): | |
| logging.info(f"Fetching {count} arXiv papers for topic: '{topic}'") | |
| try: | |
| loader = ArxivLoader(query=topic, load_max_docs=count, load_all_available_meta=True) | |
| docs = loader.load() | |
| logging.info(f"Successfully fetched {len(docs)} documents from arXiv.") | |
| return docs | |
| except Exception as e: | |
| logging.error(f"Failed to fetch from arXiv: {e}") | |
| return [] | |
| def generate_knowledge_base(topic: str, level: str, goal: str, arxiv_docs: list): | |
| # This function remains solid, no changes needed. | |
| logging.info(f"Generating Knowledge Base for topic: {topic}") | |
| llm = get_llm() | |
| papers_context = "\n\n".join([f"Title: {doc.metadata.get('Title', 'N/A')}\nAbstract: {doc.page_content}" for doc in arxiv_docs]) | |
| prompt = f"""You are an expert curriculum designer. Create a structured Knowledge Base for a personalized course on "{topic}". The learner's level is {level} and their goal is {goal}. Synthesize the following research: --- {papers_context} ---. Generate a JSON object with "topic", "introduction", a "learning_path" array of 5-7 key concepts, and "detailed_concepts" dictionary. Return ONLY the valid JSON object.""" | |
| try: | |
| response = llm.invoke(prompt).content.strip().replace("```json", "").replace("```", "") | |
| return json.loads(response) | |
| except Exception as e: logging.error(f"Failed to generate Knowledge Base: {e}"); raise | |
| def generate_lesson_from_knowledge_base(knowledge_base: dict, concept_to_cover: str): | |
| logging.info(f"Generating lesson for concept: '{concept_to_cover}'") | |
| llm = get_llm() | |
| concept_details = knowledge_base.get("detailed_concepts", {}).get(concept_to_cover, "") | |
| available_animations = ["Linear Regression", "Neural Network"] | |
| animation_instruction = "" | |
| # Find a concept that is a substring of the concept_to_cover | |
| for anim_concept in available_animations: | |
| if anim_concept.lower() in concept_to_cover.lower(): | |
| animation_tag = anim_concept.lower().replace(" ", "_") | |
| animation_instruction = f'When explaining the core mechanism of {anim_concept}, you MUST insert the tag `<visual: "{animation_tag}">` in the script. This is crucial for visualization.' | |
| break | |
| prompt = f"""You are ProfAI, an engaging AI professor. Create a lesson on "{concept_to_cover}". Detailed info: --- {concept_details} ---. {animation_instruction} The script must begin with a short, engaging introduction (1-2 sentences). Generate a JSON object with "script" (a 60-90 second video script) and "quiz" (3 multiple-choice questions). Return ONLY valid JSON.""" | |
| try: | |
| response = llm.invoke(prompt).content.strip().replace("```json", "").replace("```", "") | |
| return json.loads(response) | |
| except Exception as e: logging.error(f"Failed to generate lesson content: {e}"); raise | |
| def generate_remedial_lesson(failed_concept: str): | |
| logging.info(f"Generating remedial lesson for concept: '{failed_concept}'") | |
| llm = get_llm() | |
| prompt = f"""You are ProfAI. A student struggled with "{failed_concept}". Create a short, remedial micro-lesson. Generate JSON with "script" (a simple, 30-45 second explanation with a new analogy) and "quiz" (ONE multiple-choice question). Return ONLY valid JSON.""" | |
| try: | |
| response = llm.invoke(prompt).content.strip().replace("```json", "").replace("```", "") | |
| return json.loads(response) | |
| except Exception as e: logging.error(f"Failed to generate remedial lesson: {e}"); raise | |
| # --- 4. ANIMATION & VIDEO GENERATION (NEW ENGINE) --- | |
| def animate_linear_regression(duration, output_path: Path): | |
| logging.info("Generating Matplotlib animation for Linear Regression.") | |
| fig, ax = plt.subplots(figsize=(WIDTH/100, HEIGHT/100), dpi=120) | |
| np.random.seed(42) | |
| X = 2 * np.random.rand(100, 1); y = 4 + 3 * X + np.random.randn(100, 1) | |
| ax.scatter(X, y, alpha=0.6, label='Data Points') | |
| line, = ax.plot([], [], 'r-', lw=3, label='Regression Line') | |
| ax.set_xlim(0, 2); ax.set_ylim(0, 15) | |
| ax.set_title("Linear Regression: Finding the Best Fit Line", fontsize=16) | |
| ax.legend(); plt.tight_layout() | |
| def init(): line.set_data([], []); return line, | |
| def update(frame): | |
| progress = frame / (duration * FPS) | |
| slope, intercept = 3 * progress, 4 | |
| x_vals = np.array([0, 2]); y_vals = intercept + slope * x_vals | |
| line.set_data(x_vals, y_vals); return line, | |
| anim = FuncAnimation(fig, update, frames=int(duration * FPS), init_func=init, blit=True) | |
| anim.save(str(output_path), writer=FFMpegWriter(fps=FPS)) | |
| plt.close(fig) | |
| def generate_matplotlib_animation(concept_tag: str, duration: float, temp_dir: Path) -> Path: | |
| output_path = temp_dir / f"anim_{concept_tag}.mp4" | |
| if concept_tag == "linear_regression": | |
| animate_linear_regression(duration, output_path) | |
| return output_path | |
| # Add more animation concepts here with 'elif concept_tag == "new_concept":' | |
| raise ValueError(f"Animation for '{concept_tag}' not implemented.") | |
| def search_and_download_pexels_video(query: str, duration: float, out_path: Path) -> str: | |
| if not PEXELS_API_KEY: | |
| logging.warning("PEXELS_API_KEY not set.") | |
| return None | |
| try: | |
| response = requests.get("https://api.pexels.com/videos/search", headers={"Authorization": PEXELS_API_KEY}, params={"query": query, "per_page": 5, "orientation": "landscape"}, timeout=20) | |
| response.raise_for_status() | |
| videos = [v for f in v.get('video_files', []) if f.get('quality') == 'hd' and f.get('width') >= 1280 for v in response.json().get('videos', [])] | |
| if not videos: return None | |
| with requests.get(videos[0]['video_files'][0]['link'], stream=True, timeout=60) as r, tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_dl: | |
| r.raise_for_status() | |
| for chunk in r.iter_content(chunk_size=8192): temp_dl.write(chunk) | |
| temp_dl_path = Path(temp_dl.name) | |
| cmd = ["ffmpeg", "-y", "-stream_loop", "-1", "-i", str(temp_dl_path), "-vf", f"scale={WIDTH}:{HEIGHT}:force_original_aspect_ratio=decrease,pad={WIDTH}:{HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1", "-t", f"{duration:.3f}", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-an", str(out_path)] | |
| subprocess.run(cmd, check=True, capture_output=True) | |
| temp_dl_path.unlink() | |
| return str(out_path) | |
| except Exception as e: | |
| logging.error(f"Pexels processing failed for query '{query}': {e}") | |
| if 'temp_dl_path' in locals() and temp_dl_path.exists(): temp_dl_path.unlink() | |
| return None | |
| def create_title_card(text: str, duration: float, output_path: Path): | |
| """Creates a simple video clip with centered text.""" | |
| frame = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8) | |
| frame[:] = (40, 20, 20) # Dark blue background | |
| try: | |
| font = ImageFont.truetype("arial.ttf", 60) | |
| except IOError: | |
| font = ImageFont.load_default() | |
| img = Image.fromarray(frame) | |
| draw = ImageDraw.Draw(img) | |
| text_bbox = draw.textbbox((0, 0), text, font=font) | |
| text_width = text_bbox[2] - text_bbox[0] | |
| text_height = text_bbox[3] - text_bbox[1] | |
| position = ((WIDTH - text_width) / 2, (HEIGHT - text_height) / 2) | |
| draw.text(position, text, font=font, fill=(255, 255, 255)) | |
| final_frame = np.array(img) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(str(output_path), fourcc, FPS, (WIDTH, HEIGHT)) | |
| for _ in range(int(FPS * duration)): | |
| out.write(final_frame) | |
| out.release() | |
| return str(output_path) | |
| def generate_profai_video_from_script(script: str, topic: str): | |
| logging.info("Starting new video generation process.") | |
| with tempfile.TemporaryDirectory() as temp_dir_str: | |
| temp_dir = Path(temp_dir_str) | |
| # 1. Parse Script into Scenes | |
| tag_pattern = r'(<visual: "([^"]+)">)' | |
| script_parts = re.split(tag_pattern, script) | |
| scenes = [] | |
| for i in range(0, len(script_parts), 3): | |
| text = script_parts[i].strip() | |
| tag = script_parts[i+2] if i+2 < len(script_parts) else None | |
| if text: scenes.append({"text": text, "tag": tag}) | |
| # 2. Generate Audio and Visuals for each scene | |
| video_parts, audio_parts = [], [] | |
| total_audio_duration = 0 | |
| for i, scene in enumerate(scenes): | |
| narration_audio_bytes = deepgram_tts(scene['text']) | |
| if not narration_audio_bytes: | |
| logging.warning(f"TTS failed for scene {i}. Skipping.") | |
| continue | |
| audio_path = temp_dir / f"audio_{i}.mp3" | |
| audio_path.write_bytes(narration_audio_bytes) | |
| scene_audio_dur = audio_duration(str(audio_path)) | |
| audio_parts.append(str(audio_path)) | |
| total_audio_duration += scene_audio_dur | |
| video_path = temp_dir / f"video_{i}.mp4" | |
| visual_generated = False | |
| # Try to generate specific visual from tag | |
| if scene['tag']: | |
| try: | |
| logging.info(f"Attempting to generate animation for tag: {scene['tag']}") | |
| generate_matplotlib_animation(scene['tag'], scene_audio_dur, video_path) | |
| visual_generated = True | |
| except Exception as e: | |
| logging.warning(f"Animation failed for tag '{scene['tag']}': {e}. Triggering Pexels fallback.") | |
| # Fallback or default visual generation | |
| if not visual_generated: | |
| query = scene['tag'] if scene['tag'] else f"{topic} abstract" | |
| logging.info(f"Searching Pexels with query: '{query}'") | |
| pexels_path = search_and_download_pexels_video(query, scene_audio_dur, video_path) | |
| if not pexels_path: | |
| logging.warning("Pexels failed. Creating a title card as final fallback.") | |
| create_title_card(scene['text'], scene_audio_dur, video_path) | |
| video_parts.append(str(video_path)) | |
| if not video_parts or not audio_parts: raise Exception("Failed to generate any video or audio parts.") | |
| # 3. Concatenate and Finalize Video | |
| silent_vid_path = temp_dir / "silent_video.mp4" | |
| audio_mix_path = temp_dir / "full_audio.mp3" | |
| final_vid_path = temp_dir / "final_video.mp4" | |
| concat_media(video_parts, silent_vid_path) | |
| concat_media(audio_parts, audio_mix_path) | |
| cmd = ["ffmpeg", "-y", "-i", str(silent_vid_path), "-i", str(audio_mix_path), "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0", "-shortest", str(final_vid_path)] | |
| subprocess.run(cmd, check=True, capture_output=True) | |
| return final_vid_path.read_bytes() |