import os import subprocess import time import tempfile import shutil from pathlib import Path import json import datetime import threading from typing import List, Dict, Optional import gradio as gr import numpy as np # Try to import optional dependencies try: import whisper WHISPER_AVAILABLE = True print("✅ Whisper available") except ImportError: WHISPER_AVAILABLE = False print("❌ Whisper not available") try: import spacy nlp = None try: nlp = spacy.load("en_core_web_sm") SPACY_AVAILABLE = True print("✅ spaCy model available") except OSError: SPACY_AVAILABLE = False print("❌ spaCy model not available") except ImportError: SPACY_AVAILABLE = False print("❌ spaCy not available") try: from transformers import pipeline import torch TRANSFORMERS_AVAILABLE = True print("✅ Transformers available") except ImportError: TRANSFORMERS_AVAILABLE = False print("❌ Transformers not available") def check_ffmpeg(): """Check if ffmpeg is available""" try: result = subprocess.run(["ffmpeg", "-version"], capture_output=True) return result.returncode == 0 except: return False def get_video_info(video_path: str) -> Dict: """Get video information using ffprobe""" try: cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", video_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: info = json.loads(result.stdout) # Extract video stream info video_streams = [s for s in info.get('streams', []) if s.get('codec_type') == 'video'] audio_streams = [s for s in info.get('streams', []) if s.get('codec_type') == 'audio'] duration = float(info.get('format', {}).get('duration', 0)) return { 'duration': duration, 'has_video': len(video_streams) > 0, 'has_audio': len(audio_streams) > 0, 'video_codec': video_streams[0].get('codec_name') if video_streams else None, 'audio_codec': audio_streams[0].get('codec_name') if audio_streams else None } except Exception as e: print(f"Error getting video info: {e}") return {'duration': 0, 'has_video': False, 'has_audio': False} def extract_audio_simple(video_path: str, audio_path: str, start_time: float = 0, duration: float = 180) -> bool: """Extract audio with simpler approach and better error handling""" try: cmd = [ "ffmpeg", "-y", "-ss", str(start_time), "-i", video_path, "-t", str(duration), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-f", "wav", audio_path ] print(f"Extracting audio: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: if os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000: print(f"Audio extracted successfully: {os.path.getsize(audio_path)} bytes") return True else: print("Audio file created but seems empty") return False else: print(f"FFmpeg error: {result.stderr}") return False except Exception as e: print(f"Error extracting audio: {str(e)}") return False def extract_frame(video_path: str, timestamp: float, output_path: str) -> bool: """Extract frame from video at specific timestamp""" try: cmd = [ "ffmpeg", "-y", "-ss", str(timestamp), "-i", video_path, "-vframes", "1", "-q:v", "2", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0 and os.path.exists(output_path): return True return False except Exception as e: print(f"Error extracting frame: {e}") return False def transcribe_audio_whisper_simple(audio_path: str) -> str: """Simplified Whisper transcription that just returns text""" try: if not WHISPER_AVAILABLE: return "Whisper not available" print(f"Starting Whisper transcription of {audio_path}") # Load the smallest model model = whisper.load_model("tiny") # Use faster settings options = { "language": "en", "task": "transcribe", "fp16": False, "beam_size": 1 } # Transcribe result = model.transcribe(audio_path, **options) if result and "text" in result: return result["text"].strip() else: return "Transcription failed" except Exception as e: print(f"Whisper transcription error: {str(e)}") return f"Transcription error: {str(e)}" def transcribe_audio_transformers_simple(audio_path: str) -> str: """Simplified Transformers transcription that just returns text""" try: if not TRANSFORMERS_AVAILABLE: return "Transformers not available" print(f"Starting Transformers transcription of {audio_path}") # Use the smallest model with minimal settings asr = pipeline( "automatic-speech-recognition", model="openai/whisper-tiny", device=-1 # Force CPU ) # Simple transcription result = asr(audio_path) if isinstance(result, dict) and "text" in result: return result["text"].strip() elif isinstance(result, str): return result.strip() else: return str(result) except Exception as e: print(f"Transformers transcription error: {str(e)}") return f"Transcription error: {str(e)}" def transcribe_audio_simple(audio_path: str) -> str: """Main transcription function that returns simple text""" # Try Whisper first if WHISPER_AVAILABLE: try: return transcribe_audio_whisper_simple(audio_path) except Exception as e: print(f"Whisper failed: {e}") # Try Transformers as fallback if TRANSFORMERS_AVAILABLE: try: return transcribe_audio_transformers_simple(audio_path) except Exception as e: print(f"Transformers failed: {e}") # Use fallback return "Transcription not available - no speech recognition models loaded" def extract_key_phrases_simple(text: str, top_n: int = 5) -> List[str]: """Simple key phrase extraction""" if not text: return [] words = text.split() key_words = [ w.strip('.,!?";:()') for w in words if len(w) > 4 and w.isalpha() and w.lower() not in { 'this', 'that', 'with', 'have', 'will', 'from', 'they', 'been', 'were', 'said', 'each', 'which', 'their', 'time', 'would', 'there' } ] seen = set() unique_words = [w for w in key_words if not (w.lower() in seen or seen.add(w.lower()))] return unique_words[:top_n] def summarize_text_simple(text: str) -> str: """Simple text summarization""" if not text or len(text.split()) < 10: return text sentences = text.split('.') sentences = [s.strip() for s in sentences if s.strip()] if len(sentences) <= 2: return text elif len(sentences) <= 5: return '. '.join(sentences[:2]) + '.' else: # Take first, middle, and last sentences middle_idx = len(sentences) // 2 summary_sentences = [sentences[0], sentences[middle_idx], sentences[-1]] return '. '.join(summary_sentences) + '.' def format_timestamp(seconds: float) -> str: """Format seconds into MM:SS format""" minutes = int(seconds // 60) remaining_seconds = int(seconds % 60) return f"{minutes:02d}:{remaining_seconds:02d}" def process_video_segment(video_path: str, start_time: float, duration: float, segment_id: int, temp_dir: str) -> Dict: """Process a single video segment""" try: print(f"Processing segment {segment_id}: {start_time}s - {start_time + duration}s") # Create paths audio_path = os.path.join(temp_dir, f"segment_{segment_id:03d}.wav") frame_path = os.path.join(temp_dir, f"frame_{segment_id:03d}.jpg") # Extract audio for this segment if not extract_audio_simple(video_path, audio_path, start_time, duration): return { "segment": segment_id, "start_time": format_timestamp(start_time), "end_time": format_timestamp(start_time + duration), "start_seconds": start_time, "end_seconds": start_time + duration, "text": "Audio extraction failed", "summary": "Failed to process this segment", "key_phrases": [], "frame": None } # Extract a frame from the middle of the segment frame_time = start_time + (duration / 2) frame_extracted = extract_frame(video_path, frame_time, frame_path) # Transcribe audio text = transcribe_audio_simple(audio_path) # Clean up audio file try: os.remove(audio_path) except: pass if not text or text.startswith("Transcription"): return { "segment": segment_id, "start_time": format_timestamp(start_time), "end_time": format_timestamp(start_time + duration), "start_seconds": start_time, "end_seconds": start_time + duration, "text": text or "No speech detected", "summary": "No content in this segment", "key_phrases": [], "frame": frame_path if frame_extracted else None } # Generate summary and key phrases summary = summarize_text_simple(text) key_phrases = extract_key_phrases_simple(text) return { "segment": segment_id, "start_time": format_timestamp(start_time), "end_time": format_timestamp(start_time + duration), "start_seconds": start_time, "end_seconds": start_time + duration, "text": text, "summary": summary, "key_phrases": key_phrases, "frame": frame_path if frame_extracted else None } except Exception as e: print(f"Error processing segment {segment_id}: {str(e)}") return { "segment": segment_id, "start_time": format_timestamp(start_time), "end_time": format_timestamp(start_time + duration), "start_seconds": start_time, "end_seconds": start_time + duration, "text": f"Processing failed: {str(e)}", "summary": "Error occurred during processing", "key_phrases": [], "frame": None } def run_pipeline(video_file: str, progress=gr.Progress()) -> List[Dict]: """Main pipeline function""" if not video_file: return [], "No video file provided", None # Check if ffmpeg is available if not check_ffmpeg(): return [], "FFmpeg is not available in this environment", None print(f"Processing video: {video_file}") progress(0.1, desc="Analyzing video...") # Get video information video_info = get_video_info(video_file) print(f"Video info: {video_info}") if not video_info['has_audio']: return [], "Video has no audio track", None duration = video_info['duration'] if duration == 0: return [], "Could not determine video duration", None # Limit processing time max_duration = min(duration, 600) # Max 10 minutes segment_length = 120 # 2 minutes per segment progress(0.2, desc=f"Video duration: {duration:.1f}s, processing {max_duration:.1f}s...") # Create temporary directory temp_dir = tempfile.mkdtemp(prefix="lecture_capture_") try: # Calculate segments segments_to_process = [] current_time = 0 segment_id = 1 while current_time < max_duration: remaining_time = max_duration - current_time actual_duration = min(segment_length, remaining_time) segments_to_process.append({ 'start_time': current_time, 'duration': actual_duration, 'segment_id': segment_id }) current_time += actual_duration segment_id += 1 print(f"Will process {len(segments_to_process)} segments") # Process each segment timeline = [] for i, seg_info in enumerate(segments_to_process): progress( 0.3 + (0.6 * i / len(segments_to_process)), desc=f"Processing segment {i+1}/{len(segments_to_process)}..." ) try: result = process_video_segment( video_file, seg_info['start_time'], seg_info['duration'], seg_info['segment_id'], temp_dir ) timeline.append(result) except Exception as e: print(f"Error processing segment {i+1}: {str(e)}") timeline.append({ "segment": seg_info['segment_id'], "start_time": format_timestamp(seg_info['start_time']), "end_time": format_timestamp(seg_info['start_time'] + seg_info['duration']), "start_seconds": seg_info['start_time'], "end_seconds": seg_info['start_time'] + seg_info['duration'], "text": f"Error: {str(e)}", "summary": "Processing failed", "key_phrases": [], "frame": None }) progress(0.9, desc="Generating visual timeline...") if not timeline: return [], "No segments were successfully processed", None # Generate HTML for visual timeline html_timeline = generate_visual_timeline(timeline, video_file) # Generate summary of the entire video all_text = " ".join([segment["text"] for segment in timeline if not segment["text"].startswith("Error") and not segment["text"].startswith("Processing")]) video_summary = summarize_text_simple(all_text) if all_text else "No valid transcription available" progress(1.0, desc="Processing complete!") return timeline, html_timeline, video_summary except Exception as e: import traceback print(f"Pipeline error: {str(e)}") print(traceback.format_exc()) return [], f"Pipeline failed: {str(e)}", None finally: # Don't delete temp_dir as we need the frames for display # We'll clean it up at the end of the session pass def generate_visual_timeline(timeline: List[Dict], video_path: str) -> str: """Generate HTML for visual timeline""" if not timeline: return "

No timeline data available

" html = """
""" for segment in timeline: # Skip if this is the info segment if "info" in segment: continue segment_id = segment.get("segment", "") start_time = segment.get("start_time", "") end_time = segment.get("end_time", "") text = segment.get("text", "") summary = segment.get("summary", "") key_phrases = segment.get("key_phrases", []) frame_path = segment.get("frame") # Check if this segment has an error has_error = text.startswith("Error") or text.startswith("Processing failed") or text.startswith("Transcription error") html += f"""
""" if frame_path and os.path.exists(frame_path): # Use base64 encoding for the image import base64 try: with open(frame_path, "rb") as img_file: img_data = base64.b64encode(img_file.read()).decode('utf-8') html += f'Frame at {start_time}' except: html += f'
No thumbnail
' else: html += f'
No thumbnail
' html += """
""" html += f'
Segment {segment_id}: {start_time} - {end_time}
' html += """
""" if has_error: html += f'
{text}
' else: html += f'
{summary}
' html += f'''
View Full Transcription
{text}
''' html += """
""" html += "
" return html def create_interface(): with gr.Blocks(title="Lecture Capture AI Pipeline", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # NeverMiss.AI Upload a lecture video to automatically generate: - Transcription with timestamps - Summaries for each segment - Key phrases extraction - Visual timeline with thumbnails """) with gr.Row(): with gr.Column(scale=1): video_input = gr.Video( label="Upload Lecture Video", height=300 ) process_btn = gr.Button( "Process Video", variant="primary", size="lg" ) video_summary = gr.Textbox( label="Video Summary", placeholder="Video summary will appear here after processing", lines=4 ) with gr.Column(scale=2): with gr.Tabs(): with gr.TabItem("Visual Timeline"): timeline_html = gr.HTML( label="Visual Timeline", value="

Timeline will appear here after processing

" ) with gr.TabItem("Raw Data"): timeline_json = gr.JSON( label="Timeline Data" ) process_btn.click( fn=run_pipeline, inputs=[video_input], outputs=[timeline_json, timeline_html, video_summary], show_progress=True ) return demo if __name__ == "__main__": # Check if ffmpeg is available if check_ffmpeg(): print("FFmpeg available") else: print("FFmpeg not available") demo = create_interface() demo.launch(debug=True)