# Source: Hugging Face Space app.py (user "lochn", commit 46ec7f0, verified).
import os
import subprocess
import time
import tempfile
import shutil
from pathlib import Path
import json
import datetime
import threading
from typing import List, Dict, Optional
import gradio as gr
import numpy as np
# Optional dependency: OpenAI Whisper for local speech-to-text.
try:
    import whisper
except ImportError:
    WHISPER_AVAILABLE = False
    print("❌ Whisper not available")
else:
    WHISPER_AVAILABLE = True
    print("✅ Whisper available")
# Optional dependency: spaCy plus its small English model (both required).
try:
    import spacy

    nlp = None
    try:
        nlp = spacy.load("en_core_web_sm")
    except OSError:
        # Package installed but the language model is not downloaded.
        SPACY_AVAILABLE = False
        print("❌ spaCy model not available")
    else:
        SPACY_AVAILABLE = True
        print("✅ spaCy model available")
except ImportError:
    SPACY_AVAILABLE = False
    print("❌ spaCy not available")
# Optional dependency: HF Transformers ASR pipeline (with torch backend).
try:
    from transformers import pipeline
    import torch
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("❌ Transformers not available")
else:
    TRANSFORMERS_AVAILABLE = True
    print("✅ Transformers available")
def check_ffmpeg():
    """Return True if an ``ffmpeg`` binary on PATH can be launched.

    Fix: the previous bare ``except:`` also swallowed KeyboardInterrupt and
    SystemExit; only subprocess-launch failures are caught now.
    """
    try:
        result = subprocess.run(["ffmpeg", "-version"], capture_output=True)
        return result.returncode == 0
    except (OSError, subprocess.SubprocessError):
        # Binary missing (FileNotFoundError) or failed to start.
        return False
def get_video_info(video_path: str) -> Dict:
    """Probe *video_path* with ffprobe and summarize its streams.

    Args:
        video_path: Path to the media file.

    Returns:
        A dict that ALWAYS contains the same keys: ``duration`` (float
        seconds, 0 on failure), ``has_video`` / ``has_audio`` (bool), and
        ``video_codec`` / ``audio_codec`` (codec name or None).

    Fix: the previous failure fallback omitted the codec keys, so callers
    indexing them after a probe failure would hit KeyError.
    """
    fallback = {
        'duration': 0,
        'has_video': False,
        'has_audio': False,
        'video_codec': None,
        'audio_codec': None
    }
    try:
        cmd = [
            "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format",
            "-show_streams", video_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            info = json.loads(result.stdout)
            # Partition streams by type; first stream of each kind wins.
            video_streams = [s for s in info.get('streams', []) if s.get('codec_type') == 'video']
            audio_streams = [s for s in info.get('streams', []) if s.get('codec_type') == 'audio']
            duration = float(info.get('format', {}).get('duration', 0))
            return {
                'duration': duration,
                'has_video': len(video_streams) > 0,
                'has_audio': len(audio_streams) > 0,
                'video_codec': video_streams[0].get('codec_name') if video_streams else None,
                'audio_codec': audio_streams[0].get('codec_name') if audio_streams else None
            }
    except Exception as e:
        print(f"Error getting video info: {e}")
    # ffprobe missing, non-zero exit, or unparseable output.
    return fallback
def extract_audio_simple(video_path: str, audio_path: str, start_time: float = 0, duration: float = 180) -> bool:
    """Extract a mono 16 kHz PCM WAV clip from *video_path* into *audio_path*.

    Args:
        video_path: Source media file.
        audio_path: Destination WAV path (overwritten if present).
        start_time: Clip start offset in seconds.
        duration: Clip length in seconds.

    Returns:
        True when ffmpeg succeeded and the output file is non-trivial
        (> 1000 bytes); False on any failure.
    """
    # 16 kHz mono PCM is the input format the speech models expect.
    ffmpeg_args = [
        "ffmpeg", "-y",
        "-ss", str(start_time),
        "-i", video_path,
        "-t", str(duration),
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-f", "wav",
        audio_path
    ]
    try:
        print(f"Extracting audio: {' '.join(ffmpeg_args)}")
        proc = subprocess.run(ffmpeg_args, capture_output=True, text=True)
        if proc.returncode != 0:
            print(f"FFmpeg error: {proc.stderr}")
            return False
        # Guard against ffmpeg "succeeding" but writing a header-only file.
        if os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
            print(f"Audio extracted successfully: {os.path.getsize(audio_path)} bytes")
            return True
        print("Audio file created but seems empty")
        return False
    except Exception as e:
        print(f"Error extracting audio: {str(e)}")
        return False
def extract_frame(video_path: str, timestamp: float, output_path: str) -> bool:
    """Grab one high-quality JPEG frame at *timestamp* seconds.

    Returns True only when ffmpeg exits cleanly AND the output file exists.
    """
    ffmpeg_args = [
        "ffmpeg", "-y",
        "-ss", str(timestamp),
        "-i", video_path,
        "-vframes", "1",
        "-q:v", "2",
        output_path
    ]
    try:
        proc = subprocess.run(ffmpeg_args, capture_output=True, text=True)
        return proc.returncode == 0 and os.path.exists(output_path)
    except Exception as e:
        print(f"Error extracting frame: {e}")
        return False
def transcribe_audio_whisper_simple(audio_path: str) -> str:
    """Transcribe a WAV file with local OpenAI Whisper (tiny model).

    Never raises: failures come back as human-readable strings so callers
    can treat any result as display text.
    """
    if not WHISPER_AVAILABLE:
        return "Whisper not available"
    try:
        print(f"Starting Whisper transcription of {audio_path}")
        # Smallest checkpoint + greedy decoding keeps CPU latency manageable.
        model = whisper.load_model("tiny")
        decode_options = {
            "language": "en",
            "task": "transcribe",
            "fp16": False,
            "beam_size": 1
        }
        result = model.transcribe(audio_path, **decode_options)
        if result and "text" in result:
            return result["text"].strip()
        return "Transcription failed"
    except Exception as e:
        print(f"Whisper transcription error: {str(e)}")
        return f"Transcription error: {str(e)}"
def transcribe_audio_transformers_simple(audio_path: str) -> str:
    """Transcribe a WAV file via the HF ``openai/whisper-tiny`` ASR pipeline.

    Never raises: failures come back as human-readable strings.
    """
    if not TRANSFORMERS_AVAILABLE:
        return "Transformers not available"
    try:
        print(f"Starting Transformers transcription of {audio_path}")
        # Smallest model, explicitly pinned to CPU.
        recognizer = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-tiny",
            device=-1  # Force CPU
        )
        output = recognizer(audio_path)
        # Pipeline output shape varies by version; normalize to plain text.
        if isinstance(output, dict) and "text" in output:
            return output["text"].strip()
        if isinstance(output, str):
            return output.strip()
        return str(output)
    except Exception as e:
        print(f"Transformers transcription error: {str(e)}")
        return f"Transcription error: {str(e)}"
def transcribe_audio_simple(audio_path: str) -> str:
    """Transcribe *audio_path*, preferring Whisper, then Transformers.

    Tries each available backend in order; an exception from one backend
    falls through to the next. Returns a fallback message when no backend
    is usable.
    """
    backends = [
        (WHISPER_AVAILABLE, "Whisper", transcribe_audio_whisper_simple),
        (TRANSFORMERS_AVAILABLE, "Transformers", transcribe_audio_transformers_simple),
    ]
    for available, label, backend in backends:
        if not available:
            continue
        try:
            return backend(audio_path)
        except Exception as e:
            print(f"{label} failed: {e}")
    return "Transcription not available - no speech recognition models loaded"
def extract_key_phrases_simple(text: str, top_n: int = 5) -> List[str]:
    """Pick up to *top_n* distinctive words from *text*.

    Args:
        text: Free-form transcription text.
        top_n: Maximum number of phrases to return.

    Returns:
        First occurrences (original casing) of qualifying words,
        de-duplicated case-insensitively, in order of appearance.

    Fix: punctuation is now stripped BEFORE the alphabetic/length checks.
    Previously ``strip('.,!?";:()')`` ran after ``w.isalpha()`` had already
    rejected any punctuation-adjacent word (e.g. "algorithm,"), so the strip
    was dead code and such words were always dropped.
    """
    if not text:
        return []
    # Common filler words that carry no topical signal.
    stop_words = {
        'this', 'that', 'with', 'have', 'will', 'from', 'they', 'been',
        'were', 'said', 'each', 'which', 'their', 'time', 'would', 'there'
    }
    # Strip surrounding punctuation first, then filter.
    candidates = [w.strip('.,!?";:()') for w in text.split()]
    key_words = [
        w for w in candidates
        if len(w) > 4 and w.isalpha() and w.lower() not in stop_words
    ]
    # Case-insensitive de-duplication preserving first-seen order and casing.
    seen = set()
    unique_words = []
    for w in key_words:
        folded = w.lower()
        if folded not in seen:
            seen.add(folded)
            unique_words.append(w)
    return unique_words[:top_n]
def summarize_text_simple(text: str) -> str:
    """Produce a crude extractive summary of *text*.

    Texts under 10 words (or with at most 2 sentences) come back unchanged.
    3-5 sentences: the first two. Longer: first, middle, and last sentence.
    """
    if not text or len(text.split()) < 10:
        return text
    # Naive sentence split on '.'; empty fragments are discarded.
    pieces = [chunk.strip() for chunk in text.split('.') if chunk.strip()]
    count = len(pieces)
    if count <= 2:
        return text
    if count <= 5:
        return '. '.join(pieces[:2]) + '.'
    # Sample the start, midpoint, and end of the text.
    picks = [pieces[0], pieces[count // 2], pieces[-1]]
    return '. '.join(picks) + '.'
def format_timestamp(seconds: float) -> str:
    """Render a non-negative second offset as zero-padded ``MM:SS``."""
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes:02d}:{secs:02d}"
def _segment_result(segment_id: int, start_time: float, duration: float,
                    text: str, summary: str, key_phrases: List[str],
                    frame: Optional[str]) -> Dict:
    """Build the uniform result dict used for every segment outcome."""
    return {
        "segment": segment_id,
        "start_time": format_timestamp(start_time),
        "end_time": format_timestamp(start_time + duration),
        "start_seconds": start_time,
        "end_seconds": start_time + duration,
        "text": text,
        "summary": summary,
        "key_phrases": key_phrases,
        "frame": frame
    }
def process_video_segment(video_path: str, start_time: float, duration: float, segment_id: int, temp_dir: str) -> Dict:
    """Transcribe and summarize one time window of the video.

    Extracts the segment's audio and a mid-segment thumbnail into
    *temp_dir*, transcribes the audio, then derives a summary and key
    phrases. Never raises: every outcome (including failure) is reported
    through the same result-dict shape via ``_segment_result``.

    Args:
        video_path: Source video file.
        start_time: Segment start offset in seconds.
        duration: Segment length in seconds.
        segment_id: 1-based segment index (used in filenames and output).
        temp_dir: Directory for the intermediate WAV and JPEG files.

    Returns:
        Dict with keys segment, start_time, end_time, start_seconds,
        end_seconds, text, summary, key_phrases, frame.

    Fixes: the four near-identical result-dict literals are collapsed into
    ``_segment_result``, and the bare ``except: pass`` around the audio
    cleanup now catches only OSError.
    """
    try:
        print(f"Processing segment {segment_id}: {start_time}s - {start_time + duration}s")
        audio_path = os.path.join(temp_dir, f"segment_{segment_id:03d}.wav")
        frame_path = os.path.join(temp_dir, f"frame_{segment_id:03d}.jpg")
        if not extract_audio_simple(video_path, audio_path, start_time, duration):
            return _segment_result(segment_id, start_time, duration,
                                   "Audio extraction failed",
                                   "Failed to process this segment", [], None)
        # Thumbnail from the middle of the segment.
        frame_time = start_time + (duration / 2)
        frame_extracted = extract_frame(video_path, frame_time, frame_path)
        text = transcribe_audio_simple(audio_path)
        # Audio is no longer needed once transcribed; the frame is kept for display.
        try:
            os.remove(audio_path)
        except OSError:
            pass
        frame = frame_path if frame_extracted else None
        # Backend error strings all start with "Transcription..." by convention.
        if not text or text.startswith("Transcription"):
            return _segment_result(segment_id, start_time, duration,
                                   text or "No speech detected",
                                   "No content in this segment", [], frame)
        return _segment_result(segment_id, start_time, duration, text,
                               summarize_text_simple(text),
                               extract_key_phrases_simple(text), frame)
    except Exception as e:
        print(f"Error processing segment {segment_id}: {str(e)}")
        return _segment_result(segment_id, start_time, duration,
                               f"Processing failed: {str(e)}",
                               "Error occurred during processing", [], None)
def run_pipeline(video_file: str, progress=gr.Progress()) -> tuple:
    """Process an uploaded lecture video end to end.

    Splits the first (at most) 10 minutes of the video into 2-minute
    segments, transcribes and summarizes each, and renders an HTML timeline.

    Args:
        video_file: Path to the uploaded video file (from gr.Video).
        progress: Gradio progress tracker; the ``gr.Progress()`` default is
            a Gradio injection convention, not ordinary argument defaulting.

    Returns:
        ``(timeline, html_or_message, summary_or_None)``. On failure the
        first element is ``[]`` and the second is a plain error message,
        which lands in the HTML output slot of the UI.
        (Annotation corrected: this returns a 3-tuple, not ``List[Dict]``.)
    """
    if not video_file:
        return [], "No video file provided", None
    # Check if ffmpeg is available
    if not check_ffmpeg():
        return [], "FFmpeg is not available in this environment", None
    print(f"Processing video: {video_file}")
    progress(0.1, desc="Analyzing video...")
    # Get video information
    video_info = get_video_info(video_file)
    print(f"Video info: {video_info}")
    if not video_info['has_audio']:
        return [], "Video has no audio track", None
    duration = video_info['duration']
    if duration == 0:
        return [], "Could not determine video duration", None
    # Limit processing time
    max_duration = min(duration, 600)  # Max 10 minutes
    segment_length = 120  # 2 minutes per segment
    progress(0.2, desc=f"Video duration: {duration:.1f}s, processing {max_duration:.1f}s...")
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix="lecture_capture_")
    try:
        # Calculate segments
        segments_to_process = []
        current_time = 0
        segment_id = 1
        while current_time < max_duration:
            # The final segment may be shorter than segment_length.
            remaining_time = max_duration - current_time
            actual_duration = min(segment_length, remaining_time)
            segments_to_process.append({
                'start_time': current_time,
                'duration': actual_duration,
                'segment_id': segment_id
            })
            current_time += actual_duration
            segment_id += 1
        print(f"Will process {len(segments_to_process)} segments")
        # Process each segment
        timeline = []
        for i, seg_info in enumerate(segments_to_process):
            # Map per-segment progress into the 0.3-0.9 band of the bar.
            progress(
                0.3 + (0.6 * i / len(segments_to_process)),
                desc=f"Processing segment {i+1}/{len(segments_to_process)}..."
            )
            try:
                result = process_video_segment(
                    video_file,
                    seg_info['start_time'],
                    seg_info['duration'],
                    seg_info['segment_id'],
                    temp_dir
                )
                timeline.append(result)
            except Exception as e:
                # One failed segment must not abort the run; record a
                # placeholder entry and continue with the rest.
                print(f"Error processing segment {i+1}: {str(e)}")
                timeline.append({
                    "segment": seg_info['segment_id'],
                    "start_time": format_timestamp(seg_info['start_time']),
                    "end_time": format_timestamp(seg_info['start_time'] + seg_info['duration']),
                    "start_seconds": seg_info['start_time'],
                    "end_seconds": seg_info['start_time'] + seg_info['duration'],
                    "text": f"Error: {str(e)}",
                    "summary": "Processing failed",
                    "key_phrases": [],
                    "frame": None
                })
        progress(0.9, desc="Generating visual timeline...")
        if not timeline:
            return [], "No segments were successfully processed", None
        # Generate HTML for visual timeline
        html_timeline = generate_visual_timeline(timeline, video_file)
        # Generate summary of the entire video
        # Error/failed segments are excluded from the whole-video summary.
        all_text = " ".join([segment["text"] for segment in timeline if not segment["text"].startswith("Error") and not segment["text"].startswith("Processing")])
        video_summary = summarize_text_simple(all_text) if all_text else "No valid transcription available"
        progress(1.0, desc="Processing complete!")
        return timeline, html_timeline, video_summary
    except Exception as e:
        import traceback
        print(f"Pipeline error: {str(e)}")
        print(traceback.format_exc())
        return [], f"Pipeline failed: {str(e)}", None
    finally:
        # Don't delete temp_dir as we need the frames for display
        # We'll clean it up at the end of the session
        pass
def generate_visual_timeline(timeline: List[Dict], video_path: str) -> str:
    """Render processed segments as a self-contained HTML timeline.

    Args:
        timeline: Segment dicts as produced by process_video_segment.
        video_path: Source video path (currently unused; kept for interface
            stability).

    Returns:
        HTML with inline CSS: one card per segment with a base64-inlined
        thumbnail, summary, collapsible transcript, and key-phrase tags.

    Fixes: transcription-derived strings are now HTML-escaped before
    interpolation (they are untrusted model output and were injected
    verbatim); the bare ``except`` around the thumbnail read is narrowed to
    OSError; key phrases are actually rendered, using the previously unused
    ``.timeline-tags`` / ``.timeline-tag`` styles.
    """
    from html import escape  # local import, matching the file's inline-import style

    if not timeline:
        return "<p>No timeline data available</p>"
    html = """
    <style>
    .timeline-container {
    font-family: Arial, sans-serif;
    max-width: 100%;
    margin: 0 auto;
    }
    .timeline-segment {
    display: flex;
    margin-bottom: 20px;
    padding: 15px;
    border-radius: 8px;
    background-color: #f9f9f9;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .timeline-segment:nth-child(odd) {
    background-color: #f0f7ff;
    }
    .timeline-thumbnail {
    flex: 0 0 160px;
    margin-right: 15px;
    }
    .timeline-thumbnail img {
    width: 160px;
    height: 90px;
    object-fit: cover;
    border-radius: 4px;
    }
    .timeline-content {
    flex: 1;
    }
    .timeline-header {
    display: flex;
    justify-content: space-between;
    margin-bottom: 8px;
    }
    .timeline-timestamp {
    font-weight: bold;
    color: #555;
    }
    .timeline-summary {
    font-weight: bold;
    margin-bottom: 8px;
    }
    .timeline-text {
    margin-bottom: 8px;
    color: #333;
    }
    .timeline-tags {
    display: flex;
    flex-wrap: wrap;
    gap: 5px;
    }
    .timeline-tag {
    background-color: #e1ecf4;
    color: #39739d;
    padding: 2px 8px;
    border-radius: 12px;
    font-size: 12px;
    }
    .timeline-placeholder {
    background-color: #ddd;
    display: flex;
    align-items: center;
    justify-content: center;
    color: #666;
    font-size: 12px;
    }
    .timeline-error {
    color: #d32f2f;
    font-style: italic;
    }
    .timeline-transcript {
    margin: 8px 0;
    }
    .transcript-toggle {
    cursor: pointer;
    color: #39739d;
    font-weight: 500;
    padding: 4px 0;
    }
    .transcript-toggle:hover {
    color: #2c5aa0;
    }
    .timeline-transcript[open] .timeline-text {
    margin-top: 8px;
    padding: 10px;
    background-color: #f8f9fa;
    border-radius: 4px;
    border-left: 3px solid #39739d;
    }
    </style>
    <div class="timeline-container">
    """
    for segment in timeline:
        # Skip if this is the info segment
        if "info" in segment:
            continue
        segment_id = segment.get("segment", "")
        start_time = segment.get("start_time", "")
        end_time = segment.get("end_time", "")
        text = segment.get("text", "")
        summary = segment.get("summary", "")
        key_phrases = segment.get("key_phrases", [])
        frame_path = segment.get("frame")
        # Error segments carry their message in `text`.
        has_error = text.startswith("Error") or text.startswith("Processing failed") or text.startswith("Transcription error")
        # Escape before interpolating: transcript/summary are untrusted output.
        safe_text = escape(text)
        safe_summary = escape(summary)
        html += """
        <div class="timeline-segment">
            <div class="timeline-thumbnail">
        """
        if frame_path and os.path.exists(frame_path):
            # Inline the thumbnail as a data URI so no file serving is needed.
            import base64
            try:
                with open(frame_path, "rb") as img_file:
                    img_data = base64.b64encode(img_file.read()).decode('utf-8')
                html += f'<img src="data:image/jpeg;base64,{img_data}" alt="Frame at {start_time}">'
            except OSError:
                html += '<div class="timeline-placeholder" style="width:160px;height:90px;">No thumbnail</div>'
        else:
            html += '<div class="timeline-placeholder" style="width:160px;height:90px;">No thumbnail</div>'
        html += """
            </div>
            <div class="timeline-content">
                <div class="timeline-header">
        """
        html += f'<div class="timeline-timestamp">Segment {segment_id}: {start_time} - {end_time}</div>'
        html += """
                </div>
        """
        if has_error:
            html += f'<div class="timeline-error">{safe_text}</div>'
        else:
            html += f'<div class="timeline-summary">{safe_summary}</div>'
            html += f'''
            <details class="timeline-transcript">
                <summary class="transcript-toggle">View Full Transcription</summary>
                <div class="timeline-text">{safe_text}</div>
            </details>
            '''
            if key_phrases:
                tags = ''.join(
                    f'<span class="timeline-tag">{escape(str(p))}</span>'
                    for p in key_phrases
                )
                html += f'<div class="timeline-tags">{tags}</div>'
        html += """
            </div>
        </div>
        """
    html += "</div>"
    return html
def create_interface():
    """Build and return the Gradio Blocks UI.

    Layout: left column has the video upload, the process button, and the
    whole-video summary box; right column has tabbed outputs (rendered HTML
    timeline and raw JSON). The button wires run_pipeline's 3-tuple return
    to (timeline_json, timeline_html, video_summary), in that order.
    """
    with gr.Blocks(title="Lecture Capture AI Pipeline", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # NeverMiss.AI
        Upload a lecture video to automatically generate:
        - Transcription with timestamps
        - Summaries for each segment
        - Key phrases extraction
        - Visual timeline with thumbnails
        """)
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(
                    label="Upload Lecture Video",
                    height=300
                )
                process_btn = gr.Button(
                    "Process Video",
                    variant="primary",
                    size="lg"
                )
                video_summary = gr.Textbox(
                    label="Video Summary",
                    placeholder="Video summary will appear here after processing",
                    lines=4
                )
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.TabItem("Visual Timeline"):
                        timeline_html = gr.HTML(
                            label="Visual Timeline",
                            value="<p>Timeline will appear here after processing</p>"
                        )
                    with gr.TabItem("Raw Data"):
                        timeline_json = gr.JSON(
                            label="Timeline Data"
                        )
        # Output order must match run_pipeline's (timeline, html, summary) tuple.
        process_btn.click(
            fn=run_pipeline,
            inputs=[video_input],
            outputs=[timeline_json, timeline_html, video_summary],
            show_progress=True
        )
    return demo
if __name__ == "__main__":
    # Report FFmpeg availability up front; all processing depends on it.
    status = "FFmpeg available" if check_ffmpeg() else "FFmpeg not available"
    print(status)
    demo = create_interface()
    demo.launch(debug=True)