Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import time | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import json | |
| import datetime | |
| import threading | |
| from typing import List, Dict, Optional | |
| import gradio as gr | |
| import numpy as np | |
# Optional dependencies: each backend is probed once at import time and its
# availability is recorded in a module-level flag that the rest of the file
# checks before using the backend.
# NOTE(review): the leading "β" in the status messages looks like a
# mis-decoded status symbol; the strings are kept unchanged here — confirm
# against the original file.
try:
    import whisper
    WHISPER_AVAILABLE = True
    print("β Whisper available")
except ImportError:
    WHISPER_AVAILABLE = False
    print("β Whisper not available")

# Bind ``nlp`` before probing spaCy so the name exists (as None) even when the
# spaCy package itself cannot be imported.
nlp = None
try:
    import spacy
    try:
        nlp = spacy.load("en_core_web_sm")
        SPACY_AVAILABLE = True
        print("β spaCy model available")
    except OSError:
        # The package is installed but the "en_core_web_sm" model is not.
        SPACY_AVAILABLE = False
        print("β spaCy model not available")
except ImportError:
    SPACY_AVAILABLE = False
    print("β spaCy not available")

try:
    from transformers import pipeline
    import torch
    TRANSFORMERS_AVAILABLE = True
    print("β Transformers available")
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("β Transformers not available")
def check_ffmpeg():
    """Report whether the ``ffmpeg`` executable can be run.

    Returns:
        True when ``ffmpeg -version`` exits with status 0; False when it exits
        with a non-zero status or cannot be started at all.
    """
    try:
        probe = subprocess.run(["ffmpeg", "-version"], capture_output=True)
    except (OSError, subprocess.SubprocessError):
        # OSError covers a missing or non-executable binary. Catching only
        # launch failures lets KeyboardInterrupt/SystemExit propagate.
        return False
    return probe.returncode == 0
def get_video_info(video_path: str) -> Dict:
    """Probe a media file with ``ffprobe`` and summarise its streams.

    Args:
        video_path: Path of the media file handed to ``ffprobe``.

    Returns:
        A dict with the keys ``duration`` (seconds, from the container
        format), ``has_video``, ``has_audio``, ``video_codec`` and
        ``audio_codec`` (codec name of the first stream of each type, or
        None). When ``ffprobe`` cannot be run, exits non-zero, or its output
        cannot be parsed, the same keys are returned with
        ``duration == 0``, both flags False and both codecs None.
    """
    # Returned on every failure path; has the same keys as a successful result
    # so callers can index either one uniformly.
    fallback = {
        'duration': 0,
        'has_video': False,
        'has_audio': False,
        'video_codec': None,
        'audio_codec': None,
    }
    try:
        cmd = [
            "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format",
            "-show_streams", video_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            return fallback

        info = json.loads(result.stdout)
        streams = info.get('streams', [])
        video_streams = [s for s in streams if s.get('codec_type') == 'video']
        audio_streams = [s for s in streams if s.get('codec_type') == 'audio']
        duration = float(info.get('format', {}).get('duration', 0))
        return {
            'duration': duration,
            'has_video': len(video_streams) > 0,
            'has_audio': len(audio_streams) > 0,
            'video_codec': video_streams[0].get('codec_name') if video_streams else None,
            'audio_codec': audio_streams[0].get('codec_name') if audio_streams else None
        }
    except Exception as e:
        # Best-effort probe: any failure (ffprobe missing, malformed JSON,
        # non-numeric duration) is logged and reported as "no media info".
        print(f"Error getting video info: {e}")
        return fallback
def extract_audio_simple(video_path: str, audio_path: str, start_time: float = 0, duration: float = 180) -> bool:
    """Cut a slice of a video's audio track into a WAV file via ffmpeg.

    The ffmpeg command seeks to ``start_time``, reads ``duration`` seconds,
    drops the video stream and writes 16 kHz mono ``pcm_s16le`` WAV to
    ``audio_path`` (overwriting any existing file).

    Returns:
        True only when ffmpeg exits with status 0 and the output file exists
        and is larger than 1000 bytes; False in every other case, including
        any exception raised while running the command.
    """
    try:
        ffmpeg_args = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", video_path,
            "-t", str(duration),
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "16000",
            "-ac", "1",
            "-f", "wav",
            audio_path,
        ]
        print(f"Extracting audio: {' '.join(ffmpeg_args)}")
        completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)

        # Guard clauses: bail out on the two failure modes first.
        if completed.returncode != 0:
            print(f"FFmpeg error: {completed.stderr}")
            return False

        produced_audio = os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000
        if not produced_audio:
            print("Audio file created but seems empty")
            return False

        print(f"Audio extracted successfully: {os.path.getsize(audio_path)} bytes")
        return True
    except Exception as e:
        print(f"Error extracting audio: {str(e)}")
        return False
def extract_frame(video_path: str, timestamp: float, output_path: str) -> bool:
    """Grab a single video frame at ``timestamp`` seconds using ffmpeg.

    Returns:
        True when ffmpeg exits with status 0 and ``output_path`` exists
        afterwards; False otherwise, including when running ffmpeg raises.
    """
    try:
        ffmpeg_args = ["ffmpeg", "-y"]
        ffmpeg_args += ["-ss", str(timestamp)]
        ffmpeg_args += ["-i", video_path]
        ffmpeg_args += ["-vframes", "1", "-q:v", "2"]
        ffmpeg_args.append(output_path)

        completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
        return completed.returncode == 0 and os.path.exists(output_path)
    except Exception as e:
        print(f"Error extracting frame: {e}")
        return False
# Whisper model shared by all transcription calls; loaded on first use so the
# weights are not re-read from disk for every segment.
_WHISPER_MODEL = None


def _get_whisper_model():
    """Return the shared Whisper "tiny" model, loading it on first use."""
    global _WHISPER_MODEL
    if _WHISPER_MODEL is None:
        _WHISPER_MODEL = whisper.load_model("tiny")
    return _WHISPER_MODEL


def transcribe_audio_whisper_simple(audio_path: str) -> str:
    """Transcribe an audio file with the ``whisper`` package.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The stripped transcript text on success. Failures are reported as
        strings rather than exceptions: ``"Whisper not available"`` when the
        package was not importable, ``"Transcription failed"`` when the result
        carries no text, and ``"Transcription error: <message>"`` when loading
        the model or transcribing raises.
    """
    try:
        if not WHISPER_AVAILABLE:
            return "Whisper not available"

        print(f"Starting Whisper transcription of {audio_path}")
        model = _get_whisper_model()

        # Decoding options passed straight through to ``model.transcribe``.
        options = {
            "language": "en",
            "task": "transcribe",
            "fp16": False,
            "beam_size": 1
        }
        result = model.transcribe(audio_path, **options)
        if result and "text" in result:
            return result["text"].strip()
        return "Transcription failed"
    except Exception as e:
        print(f"Whisper transcription error: {str(e)}")
        return f"Transcription error: {str(e)}"
# ASR pipeline shared by all transcription calls; built on first use so the
# model is not re-instantiated for every segment.
_ASR_PIPELINE = None


def _get_asr_pipeline():
    """Return the shared ``openai/whisper-tiny`` ASR pipeline, building it once."""
    global _ASR_PIPELINE
    if _ASR_PIPELINE is None:
        _ASR_PIPELINE = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-tiny",
            device=-1  # Force CPU
        )
    return _ASR_PIPELINE


def transcribe_audio_transformers_simple(audio_path: str) -> str:
    """Transcribe an audio file with a Transformers ASR pipeline.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The stripped transcript text on success (or ``str(result)`` when the
        pipeline returns neither a dict with ``"text"`` nor a string).
        Failures are reported as strings rather than exceptions:
        ``"Transformers not available"`` when the package was not importable,
        and ``"Transcription error: <message>"`` when building the pipeline or
        transcribing raises.
    """
    try:
        if not TRANSFORMERS_AVAILABLE:
            return "Transformers not available"

        print(f"Starting Transformers transcription of {audio_path}")
        asr = _get_asr_pipeline()
        result = asr(audio_path)

        if isinstance(result, dict) and "text" in result:
            return result["text"].strip()
        if isinstance(result, str):
            return result.strip()
        return str(result)
    except Exception as e:
        print(f"Transformers transcription error: {str(e)}")
        return f"Transcription error: {str(e)}"
def transcribe_audio_simple(audio_path: str) -> str:
    """Transcribe audio with the first available backend that succeeds.

    Whisper is tried first, then the Transformers pipeline. The backends
    report their own failures as strings starting with
    ``"Transcription error"`` or ``"Transcription failed"`` instead of
    raising, so those results are treated as failures here; otherwise the
    fallback backend would never be reached.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The transcript from the first backend that succeeds. If every
        available backend fails, the last backend's failure string is
        returned. If no backend is available, a fixed
        ``"Transcription not available - ..."`` message is returned.
    """
    backends = []
    if WHISPER_AVAILABLE:
        backends.append(("Whisper", transcribe_audio_whisper_simple))
    if TRANSFORMERS_AVAILABLE:
        backends.append(("Transformers", transcribe_audio_transformers_simple))

    last_failure = None
    for backend_name, backend in backends:
        try:
            text = backend(audio_path)
        except Exception as e:
            print(f"{backend_name} failed: {e}")
            continue
        if text.startswith(("Transcription error", "Transcription failed")):
            # The backend swallowed its own error; remember the message and
            # give the next backend a chance.
            print(f"{backend_name} failed: {text}")
            last_failure = text
            continue
        return text

    if last_failure is not None:
        return last_failure
    return "Transcription not available - no speech recognition models loaded"
# Punctuation trimmed from both ends of a word before it is evaluated.
_KEY_PHRASE_PUNCTUATION = '.,!?";:()'

# Common words that never count as key phrases (compared case-insensitively).
_KEY_PHRASE_STOPWORDS = frozenset({
    'this', 'that', 'with', 'have', 'will', 'from', 'they', 'been',
    'were', 'said', 'each', 'which', 'their', 'time', 'would', 'there',
})


def extract_key_phrases_simple(text: str, top_n: int = 5) -> List[str]:
    """Pick the first ``top_n`` distinctive words from ``text``.

    A word qualifies when, after surrounding punctuation is trimmed, it is
    longer than four characters, purely alphabetic and not a stop word.
    Duplicates are removed case-insensitively, keeping the spelling of the
    first occurrence, and the original word order is preserved.

    Args:
        text: Text to scan; empty or None yields an empty list.
        top_n: Maximum number of words to return.

    Returns:
        The qualifying words in order of first appearance, at most ``top_n``.
    """
    if not text:
        return []

    seen = set()
    phrases = []
    for raw_word in text.split():
        # Trim punctuation BEFORE validating, so "networks," or "learning."
        # are not rejected by isalpha() because of the attached punctuation.
        word = raw_word.strip(_KEY_PHRASE_PUNCTUATION)
        if len(word) <= 4 or not word.isalpha():
            continue
        folded = word.lower()
        if folded in _KEY_PHRASE_STOPWORDS or folded in seen:
            continue
        seen.add(folded)
        phrases.append(word)
    return phrases[:top_n]
def summarize_text_simple(text: str) -> str:
    """Build a short extractive summary by picking whole sentences.

    Text that is empty or shorter than ten words is returned unchanged, as is
    text with at most two non-empty sentences (split on '.'). With three to
    five sentences the first two are kept; with more, the first, the middle
    and the last are kept. Picked sentences are joined with '. ' and
    terminated with '.'.
    """
    if not text or len(text.split()) < 10:
        return text

    stripped_parts = (piece.strip() for piece in text.split('.'))
    parts = [piece for piece in stripped_parts if piece]
    count = len(parts)

    if count <= 2:
        return text

    if count <= 5:
        picked = parts[:2]
    else:
        # First, middle and last sentence.
        picked = [parts[0], parts[count // 2], parts[-1]]
    return '. '.join(picked) + '.'
def format_timestamp(seconds: float) -> str:
    """Render a number of seconds as a zero-padded ``MM:SS`` string.

    Fractions of a second are truncated; minutes are not wrapped into hours,
    so 3600 seconds renders as ``60:00``.
    """
    whole_minutes, leftover = divmod(seconds, 60)
    return f"{int(whole_minutes):02d}:{int(leftover):02d}"
# Exact placeholder prefixes produced by the transcription functions when no
# usable transcript was obtained. Matching these instead of any text starting
# with "Transcription" keeps real transcripts that begin with that word.
_TRANSCRIPTION_FAILURE_PREFIXES = (
    "Transcription error",
    "Transcription failed",
    "Transcription not available",
)


def _build_segment_result(segment_id: int, start_time: float, duration: float,
                          text: str, summary: str,
                          key_phrases: Optional[List[str]] = None,
                          frame: Optional[str] = None) -> Dict:
    """Assemble the timeline entry dict shared by every outcome of a segment."""
    end_time = start_time + duration
    return {
        "segment": segment_id,
        "start_time": format_timestamp(start_time),
        "end_time": format_timestamp(end_time),
        "start_seconds": start_time,
        "end_seconds": end_time,
        "text": text,
        "summary": summary,
        "key_phrases": [] if key_phrases is None else key_phrases,
        "frame": frame,
    }


def process_video_segment(video_path: str, start_time: float, duration: float, segment_id: int, temp_dir: str) -> Dict:
    """Extract, transcribe and summarise one time slice of a video.

    Args:
        video_path: Path of the source video.
        start_time: Offset of the slice from the start of the video, in seconds.
        duration: Length of the slice in seconds.
        segment_id: Number used in the result and in the temporary file names.
        temp_dir: Directory receiving the temporary WAV file and the frame image.

    Returns:
        A dict with the keys ``segment``, ``start_time``/``end_time``
        (``MM:SS`` strings), ``start_seconds``/``end_seconds``, ``text``,
        ``summary``, ``key_phrases`` and ``frame`` (path of the extracted
        frame, or None). Failures never raise: they are described in ``text``
        and ``summary`` of the returned dict.
    """
    try:
        print(f"Processing segment {segment_id}: {start_time}s - {start_time + duration}s")

        audio_path = os.path.join(temp_dir, f"segment_{segment_id:03d}.wav")
        frame_path = os.path.join(temp_dir, f"frame_{segment_id:03d}.jpg")

        if not extract_audio_simple(video_path, audio_path, start_time, duration):
            return _build_segment_result(
                segment_id, start_time, duration,
                "Audio extraction failed", "Failed to process this segment",
            )

        # The thumbnail is taken from the middle of the segment.
        frame_time = start_time + (duration / 2)
        frame_extracted = extract_frame(video_path, frame_time, frame_path)
        frame = frame_path if frame_extracted else None

        try:
            text = transcribe_audio_simple(audio_path)
        finally:
            # Remove the temporary WAV even when transcription raises.
            try:
                os.remove(audio_path)
            except OSError:
                # Best-effort cleanup: a leftover file must not fail the segment.
                pass

        if not text or text.startswith(_TRANSCRIPTION_FAILURE_PREFIXES):
            return _build_segment_result(
                segment_id, start_time, duration,
                text or "No speech detected", "No content in this segment",
                frame=frame,
            )

        return _build_segment_result(
            segment_id, start_time, duration,
            text, summarize_text_simple(text),
            key_phrases=extract_key_phrases_simple(text),
            frame=frame,
        )
    except Exception as e:
        print(f"Error processing segment {segment_id}: {str(e)}")
        return _build_segment_result(
            segment_id, start_time, duration,
            f"Processing failed: {str(e)}", "Error occurred during processing",
        )
# Prefixes of the placeholder texts that segment processing stores in "text"
# when a segment produced no real transcript. Segments matching any of them
# are left out of the whole-video summary.
_NON_TRANSCRIPT_PREFIXES = (
    "Error:",
    "Processing failed",
    "Audio extraction failed",
    "Transcription error",
    "Transcription failed",
    "Transcription not available",
    "No speech detected",
)


def run_pipeline(video_file: str, progress=gr.Progress()) -> tuple:
    """Run the whole lecture pipeline on one uploaded video.

    The first ten minutes of the video (at most) are cut into segments of up
    to two minutes; each segment is processed with ``process_video_segment``
    and the results are rendered with ``generate_visual_timeline``.

    Args:
        video_file: Path of the uploaded video; a falsy value aborts early.
        progress: Gradio progress tracker, called with a fraction and a
            description as the pipeline advances.

    Returns:
        A 3-tuple ``(timeline, html, video_summary)``: the list of segment
        dicts, the timeline HTML, and a summary of all transcribed text. On
        failure the tuple is ``([], <error message>, None)``.
    """
    import atexit

    if not video_file:
        return [], "No video file provided", None

    if not check_ffmpeg():
        return [], "FFmpeg is not available in this environment", None

    print(f"Processing video: {video_file}")
    progress(0.1, desc="Analyzing video...")

    video_info = get_video_info(video_file)
    print(f"Video info: {video_info}")
    if not video_info['has_audio']:
        return [], "Video has no audio track", None

    duration = video_info['duration']
    if duration == 0:
        return [], "Could not determine video duration", None

    max_duration = min(duration, 600)  # Max 10 minutes
    segment_length = 120  # 2 minutes per segment
    progress(0.2, desc=f"Video duration: {duration:.1f}s, processing {max_duration:.1f}s...")

    temp_dir = tempfile.mkdtemp(prefix="lecture_capture_")
    # The extracted frames must outlive this call (their paths are part of
    # the returned timeline), so the directory is removed when the process
    # exits rather than here.
    atexit.register(shutil.rmtree, temp_dir, ignore_errors=True)

    try:
        # Plan the segments up front so progress can be reported as i/total.
        segments_to_process = []
        current_time = 0
        segment_id = 1
        while current_time < max_duration:
            actual_duration = min(segment_length, max_duration - current_time)
            segments_to_process.append({
                'start_time': current_time,
                'duration': actual_duration,
                'segment_id': segment_id
            })
            current_time += actual_duration
            segment_id += 1
        print(f"Will process {len(segments_to_process)} segments")

        timeline = []
        total = len(segments_to_process)
        for i, seg_info in enumerate(segments_to_process):
            progress(
                0.3 + (0.6 * i / total),
                desc=f"Processing segment {i+1}/{total}..."
            )
            try:
                result = process_video_segment(
                    video_file,
                    seg_info['start_time'],
                    seg_info['duration'],
                    seg_info['segment_id'],
                    temp_dir
                )
                timeline.append(result)
            except Exception as e:
                # Keep a placeholder entry so one bad segment does not drop
                # out of the timeline or abort the remaining segments.
                print(f"Error processing segment {i+1}: {str(e)}")
                seg_end = seg_info['start_time'] + seg_info['duration']
                timeline.append({
                    "segment": seg_info['segment_id'],
                    "start_time": format_timestamp(seg_info['start_time']),
                    "end_time": format_timestamp(seg_end),
                    "start_seconds": seg_info['start_time'],
                    "end_seconds": seg_end,
                    "text": f"Error: {str(e)}",
                    "summary": "Processing failed",
                    "key_phrases": [],
                    "frame": None
                })

        progress(0.9, desc="Generating visual timeline...")
        if not timeline:
            return [], "No segments were successfully processed", None

        html_timeline = generate_visual_timeline(timeline, video_file)

        # Summarise only real transcripts; placeholder/error texts would
        # otherwise end up quoted in the video summary.
        all_text = " ".join(
            segment["text"] for segment in timeline
            if not segment["text"].startswith(_NON_TRANSCRIPT_PREFIXES)
        )
        video_summary = summarize_text_simple(all_text) if all_text else "No valid transcription available"

        progress(1.0, desc="Processing complete!")
        return timeline, html_timeline, video_summary
    except Exception as e:
        import traceback
        print(f"Pipeline error: {str(e)}")
        print(traceback.format_exc())
        return [], f"Pipeline failed: {str(e)}", None
# Stylesheet emitted once at the top of the generated timeline markup.
_TIMELINE_STYLE = """
<style>
.timeline-container {
    font-family: Arial, sans-serif;
    max-width: 100%;
    margin: 0 auto;
}
.timeline-segment {
    display: flex;
    margin-bottom: 20px;
    padding: 15px;
    border-radius: 8px;
    background-color: #f9f9f9;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.timeline-segment:nth-child(odd) {
    background-color: #f0f7ff;
}
.timeline-thumbnail {
    flex: 0 0 160px;
    margin-right: 15px;
}
.timeline-thumbnail img {
    width: 160px;
    height: 90px;
    object-fit: cover;
    border-radius: 4px;
}
.timeline-content {
    flex: 1;
}
.timeline-header {
    display: flex;
    justify-content: space-between;
    margin-bottom: 8px;
}
.timeline-timestamp {
    font-weight: bold;
    color: #555;
}
.timeline-summary {
    font-weight: bold;
    margin-bottom: 8px;
}
.timeline-text {
    margin-bottom: 8px;
    color: #333;
}
.timeline-tags {
    display: flex;
    flex-wrap: wrap;
    gap: 5px;
}
.timeline-tag {
    background-color: #e1ecf4;
    color: #39739d;
    padding: 2px 8px;
    border-radius: 12px;
    font-size: 12px;
}
.timeline-placeholder {
    background-color: #ddd;
    display: flex;
    align-items: center;
    justify-content: center;
    color: #666;
    font-size: 12px;
}
.timeline-error {
    color: #d32f2f;
    font-style: italic;
}
.timeline-transcript {
    margin: 8px 0;
}
.transcript-toggle {
    cursor: pointer;
    color: #39739d;
    font-weight: 500;
    padding: 4px 0;
}
.transcript-toggle:hover {
    color: #2c5aa0;
}
.timeline-transcript[open] .timeline-text {
    margin-top: 8px;
    padding: 10px;
    background-color: #f8f9fa;
    border-radius: 4px;
    border-left: 3px solid #39739d;
}
</style>
"""


def generate_visual_timeline(timeline: List[Dict], video_path: str) -> str:
    """Render the processed segments as an HTML timeline.

    Each segment becomes a card with a thumbnail (the segment's frame image
    embedded as a base64 data URI, or a "No thumbnail" placeholder), its time
    range, and either an error line or the summary plus a collapsible full
    transcription. Entries containing an ``"info"`` key are skipped.

    Args:
        timeline: Segment dicts; the keys ``segment``, ``start_time``,
            ``end_time``, ``text``, ``summary`` and ``frame`` are read.
        video_path: Accepted for interface compatibility; not used.

    Returns:
        The HTML markup, or a short "no data" paragraph for an empty timeline.
    """
    import base64
    from html import escape

    if not timeline:
        return "<p>No timeline data available</p>"

    placeholder = '<div class="timeline-placeholder" style="width:160px;height:90px;">No thumbnail</div>'
    error_prefixes = ("Error", "Processing failed", "Transcription error")

    # Collect fragments and join once instead of repeated string +=.
    parts = [_TIMELINE_STYLE, '<div class="timeline-container">']
    for segment in timeline:
        # Skip if this is the info segment
        if "info" in segment:
            continue

        # Transcript-derived text comes from the uploaded media, so everything
        # interpolated into the markup is HTML-escaped.
        segment_id = escape(str(segment.get("segment", "")))
        start_time = escape(str(segment.get("start_time", "")))
        end_time = escape(str(segment.get("end_time", "")))
        text = str(segment.get("text") or "")
        summary = escape(str(segment.get("summary") or ""))
        frame_path = segment.get("frame")

        has_error = text.startswith(error_prefixes)
        text = escape(text)

        thumbnail = placeholder
        if frame_path and os.path.exists(frame_path):
            try:
                with open(frame_path, "rb") as img_file:
                    img_data = base64.b64encode(img_file.read()).decode('utf-8')
            except OSError:
                # Unreadable frame file: keep the placeholder.
                pass
            else:
                thumbnail = f'<img src="data:image/jpeg;base64,{img_data}" alt="Frame at {start_time}">'

        if has_error:
            body = f'<div class="timeline-error">{text}</div>'
        else:
            body = (
                f'<div class="timeline-summary">{summary}</div>\n'
                '<details class="timeline-transcript">\n'
                '<summary class="transcript-toggle">View Full Transcription</summary>\n'
                f'<div class="timeline-text">{text}</div>\n'
                '</details>'
            )

        parts.append(
            '<div class="timeline-segment">\n'
            '<div class="timeline-thumbnail">\n'
            f'{thumbnail}\n'
            '</div>\n'
            '<div class="timeline-content">\n'
            '<div class="timeline-header">\n'
            f'<div class="timeline-timestamp">Segment {segment_id}: {start_time} - {end_time}</div>\n'
            '</div>\n'
            f'{body}\n'
            '</div>\n'
            '</div>'
        )

    parts.append("</div>")
    return "\n".join(parts)
def create_interface():
    """Build the Gradio Blocks UI and wire the button to ``run_pipeline``.

    Layout: a left column with the video upload, the "Process Video" button
    and the summary textbox; a wider right column with two tabs, one showing
    the HTML timeline and one showing the raw timeline data as JSON.

    Returns:
        The ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(title="Lecture Capture AI Pipeline", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # NeverMiss.AI
        Upload a lecture video to automatically generate:
        - Transcription with timestamps
        - Summaries for each segment
        - Key phrases extraction
        - Visual timeline with thumbnails
        """)
        with gr.Row():
            # Left column: input and controls.
            with gr.Column(scale=1):
                video_input = gr.Video(
                    label="Upload Lecture Video",
                    height=300
                )
                process_btn = gr.Button(
                    "Process Video",
                    variant="primary",
                    size="lg"
                )
                video_summary = gr.Textbox(
                    label="Video Summary",
                    placeholder="Video summary will appear here after processing",
                    lines=4
                )
            # Right column: results, twice as wide as the left one.
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.TabItem("Visual Timeline"):
                        timeline_html = gr.HTML(
                            label="Visual Timeline",
                            value="<p>Timeline will appear here after processing</p>"
                        )
                    with gr.TabItem("Raw Data"):
                        timeline_json = gr.JSON(
                            label="Timeline Data"
                        )
        # The order of ``outputs`` must match the 3-tuple returned by
        # run_pipeline: (timeline data, timeline HTML, video summary).
        process_btn.click(
            fn=run_pipeline,
            inputs=[video_input],
            outputs=[timeline_json, timeline_html, video_summary],
            show_progress=True
        )
    return demo
if __name__ == "__main__":
    # Report up front whether ffmpeg can be run, then build and start the UI.
    ffmpeg_status = "FFmpeg available" if check_ffmpeg() else "FFmpeg not available"
    print(ffmpeg_status)

    demo = create_interface()
    demo.launch(debug=True)