Update app.py
Browse files
app.py
CHANGED
|
@@ -1,8 +1,8 @@
|
|
| 1 |
import os
|
| 2 |
import json
|
|
|
|
| 3 |
from pathlib import Path
|
| 4 |
|
| 5 |
-
import ffmpeg
|
| 6 |
import openai
|
| 7 |
import whisper
|
| 8 |
import spacy
|
|
@@ -16,40 +16,42 @@ try:
|
|
| 16 |
nlp = spacy.load("en_core_web_sm")
|
| 17 |
except OSError:
|
| 18 |
from spacy.cli import download
|
| 19 |
-
|
| 20 |
download("en_core_web_sm")
|
| 21 |
nlp = spacy.load("en_core_web_sm")
|
| 22 |
|
| 23 |
|
| 24 |
def chunk_video(input_path: str, chunk_length: int = 300, output_dir: str = "chunks") -> list[Path]:
|
| 25 |
"""
|
| 26 |
-
Split input video into fixed-length chunks.
|
| 27 |
"""
|
| 28 |
Path(output_dir).mkdir(exist_ok=True)
|
| 29 |
-
(
|
| 30 |
-
|
| 31 |
-
|
| 32 |
-
|
| 33 |
-
|
| 34 |
-
|
|
|
|
|
|
|
|
|
|
| 35 |
return sorted(Path(output_dir).glob("chunk_*.mp4"))
|
| 36 |
|
| 37 |
|
| 38 |
def extract_audio(video_path: str, audio_path: str) -> None:
|
| 39 |
"""
|
| 40 |
-
Extract mono 16kHz PCM audio from video.
|
| 41 |
"""
|
| 42 |
-
|
| 43 |
-
ffmpeg
|
| 44 |
-
|
| 45 |
-
|
| 46 |
-
|
| 47 |
-
)
|
| 48 |
|
| 49 |
|
| 50 |
def transcribe_audio(audio_path: str) -> list[dict]:
|
| 51 |
"""
|
| 52 |
-
Transcribe audio using
|
| 53 |
Returns list of segments with start, end, and text.
|
| 54 |
"""
|
| 55 |
model = whisper.load_model("base")
|
|
@@ -89,21 +91,21 @@ def extract_key_phrases(text: str, top_n: int = 5) -> list[str]:
|
|
| 89 |
|
| 90 |
def extract_frame(video_path: str, timestamp: str, output_path: str) -> None:
|
| 91 |
"""
|
| 92 |
-
Extract a single frame at given timestamp.
|
| 93 |
"""
|
| 94 |
-
|
| 95 |
-
ffmpeg
|
| 96 |
-
|
| 97 |
-
|
| 98 |
-
|
| 99 |
-
|
|
|
|
| 100 |
|
| 101 |
|
| 102 |
def run_pipeline(video_file: str) -> list[dict]:
|
| 103 |
"""
|
| 104 |
Execute the full pipeline and return timeline as a list of dicts.
|
| 105 |
"""
|
| 106 |
-
# Prepare directories
|
| 107 |
chunks = chunk_video(video_file)
|
| 108 |
all_segments = []
|
| 109 |
for chunk in chunks:
|
|
@@ -115,7 +117,8 @@ def run_pipeline(video_file: str) -> list[dict]:
|
|
| 115 |
summaries = [summarize_text(block) for block in transcript_blocks]
|
| 116 |
key_phrases = [extract_key_phrases(block) for block in transcript_blocks]
|
| 117 |
|
| 118 |
-
frame_dir = Path("frames")
|
|
|
|
| 119 |
frame_paths = []
|
| 120 |
for seg in all_segments:
|
| 121 |
ts = seg.get("start")
|
|
@@ -138,7 +141,7 @@ def run_pipeline(video_file: str) -> list[dict]:
|
|
| 138 |
|
| 139 |
def process_video(video_path: str) -> list[dict]:
|
| 140 |
"""
|
| 141 |
-
|
| 142 |
"""
|
| 143 |
return run_pipeline(video_path)
|
| 144 |
|
|
|
|
| 1 |
import os
|
| 2 |
import json
|
| 3 |
+
import subprocess
|
| 4 |
from pathlib import Path
|
| 5 |
|
|
|
|
| 6 |
import openai
|
| 7 |
import whisper
|
| 8 |
import spacy
|
|
|
|
| 16 |
nlp = spacy.load("en_core_web_sm")
|
| 17 |
except OSError:
|
| 18 |
from spacy.cli import download
|
|
|
|
| 19 |
download("en_core_web_sm")
|
| 20 |
nlp = spacy.load("en_core_web_sm")
|
| 21 |
|
| 22 |
|
| 23 |
def chunk_video(input_path: str, chunk_length: int = 300, output_dir: str = "chunks") -> list[Path]:
|
| 24 |
"""
|
| 25 |
+
Split input video into fixed-length chunks using ffmpeg CLI.
|
| 26 |
"""
|
| 27 |
Path(output_dir).mkdir(exist_ok=True)
|
| 28 |
+
output_pattern = os.path.join(output_dir, "chunk_%03d.mp4")
|
| 29 |
+
cmd = [
|
| 30 |
+
"ffmpeg", "-i", input_path,
|
| 31 |
+
"-f", "segment",
|
| 32 |
+
"-segment_time", str(chunk_length),
|
| 33 |
+
"-reset_timestamps", "1",
|
| 34 |
+
output_pattern
|
| 35 |
+
]
|
| 36 |
+
subprocess.run(cmd, check=True)
|
| 37 |
return sorted(Path(output_dir).glob("chunk_*.mp4"))
|
| 38 |
|
| 39 |
|
| 40 |
def extract_audio(video_path: str, audio_path: str) -> None:
|
| 41 |
"""
|
| 42 |
+
Extract mono 16kHz PCM audio from video using ffmpeg CLI.
|
| 43 |
"""
|
| 44 |
+
cmd = [
|
| 45 |
+
"ffmpeg", "-i", video_path,
|
| 46 |
+
"-acodec", "pcm_s16le", "-ac", "1", "-ar", "16k",
|
| 47 |
+
audio_path
|
| 48 |
+
]
|
| 49 |
+
subprocess.run(cmd, check=True)
|
| 50 |
|
| 51 |
|
| 52 |
def transcribe_audio(audio_path: str) -> list[dict]:
|
| 53 |
"""
|
| 54 |
+
Transcribe audio using Whisper model.
|
| 55 |
Returns list of segments with start, end, and text.
|
| 56 |
"""
|
| 57 |
model = whisper.load_model("base")
|
|
|
|
| 91 |
|
| 92 |
def extract_frame(video_path: str, timestamp: str, output_path: str) -> None:
|
| 93 |
"""
|
| 94 |
+
Extract a single frame at given timestamp using ffmpeg CLI.
|
| 95 |
"""
|
| 96 |
+
cmd = [
|
| 97 |
+
"ffmpeg", "-i", video_path,
|
| 98 |
+
"-ss", timestamp,
|
| 99 |
+
"-frames:v", "1",
|
| 100 |
+
output_path
|
| 101 |
+
]
|
| 102 |
+
subprocess.run(cmd, check=True)
|
| 103 |
|
| 104 |
|
| 105 |
def run_pipeline(video_file: str) -> list[dict]:
|
| 106 |
"""
|
| 107 |
Execute the full pipeline and return timeline as a list of dicts.
|
| 108 |
"""
|
|
|
|
| 109 |
chunks = chunk_video(video_file)
|
| 110 |
all_segments = []
|
| 111 |
for chunk in chunks:
|
|
|
|
| 117 |
summaries = [summarize_text(block) for block in transcript_blocks]
|
| 118 |
key_phrases = [extract_key_phrases(block) for block in transcript_blocks]
|
| 119 |
|
| 120 |
+
frame_dir = Path("frames")
|
| 121 |
+
frame_dir.mkdir(exist_ok=True)
|
| 122 |
frame_paths = []
|
| 123 |
for seg in all_segments:
|
| 124 |
ts = seg.get("start")
|
|
|
|
| 141 |
|
| 142 |
def process_video(video_path: str) -> list[dict]:
|
| 143 |
"""
|
| 144 |
+
Gradio wrapper: receives file path to uploaded video and runs pipeline.
|
| 145 |
"""
|
| 146 |
return run_pipeline(video_path)
|
| 147 |
|