Spaces:
Runtime error
Runtime error
| import subprocess | |
| from pathlib import Path | |
| from typing import List | |
| import webvtt | |
| import json | |
| import torch | |
| from llm_engineering.domain.video_chunks import EmbeddedVideoChunk | |
| from llm_engineering.domain.queries import Query | |
| from .multimodal_dispatcher import MultimodalEmbeddingDispatcher, ImageEmbedder, TextEmbedder | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import PointStruct, VectorParams, Distance | |
| from qdrant_client.http.exceptions import UnexpectedResponse | |
| import uuid | |
| import hashlib | |
| import numpy as np | |
| from typing import Set | |
| import time | |
| import psutil | |
| from tqdm import tqdm | |
| from contextlib import nullcontext | |
| from PIL import Image, ImageDraw, ImageFont | |
| import ffmpeg | |
| # Make spacy optional | |
| try: | |
| import spacy | |
| SPACY_AVAILABLE = True | |
| except ImportError: | |
| SPACY_AVAILABLE = False | |
| print("Spacy not available, using simplified text processing") | |
| import hashlib | |
| import uuid | |
| # Remove bertopic dependency | |
| try: | |
| from bertopic import BERTopic | |
| BERTOPIC_AVAILABLE = True | |
| except ImportError: | |
| BERTOPIC_AVAILABLE = False | |
| print("BERTopic not available, using simplified topic modeling") | |
| from sentence_transformers import SentenceTransformer | |
| class VideoIngester: | |
def __init__(self, video_root: str):
    """Set up paths, checkpoints and the optional NLP / embedding helpers.

    Every heavy dependency (embedders, spaCy, BERTopic) is loaded on a
    best-effort basis: a failure is reported on stdout and the matching
    attribute stays ``None`` so later stages can fall back.

    Args:
        video_root: Directory that holds one sub-folder per video.
    """
    self.video_root = Path(video_root)
    # Checkpoint files are stored inside the video root itself.
    self.checkpoint_file = self.video_root / ".processed_videos.json"
    self.processed_frames_file = self.video_root / ".processed_frames.json"
    self.processed_frames = self._load_processed_frames()
    self.processed_videos = self._load_checkpoint()
    self.nlp = None
    self.text_embedder = None
    self.image_embedder = None
    self.topic_model = None

    # OpenCV thread tuning is optional. The module is imported here because
    # the visible module-level imports do not provide ``cv2``; previously the
    # resulting NameError was hidden by a bare ``except: pass``.
    try:
        import cv2
        cv2.setNumThreads(4)
    except Exception as e:
        print("OpenCV thread tuning skipped: {}".format(e))

    # Initialize the text and image embedders.
    try:
        from llm_engineering.application.rag.multimodal_dispatcher import TextEmbedder, ImageEmbedder
        self.text_embedder = TextEmbedder()
        self.image_embedder = ImageEmbedder()
        print("Initialized embedders")
    except Exception as e:
        print("Failed to load embedders: {}".format(e))

    # Load the small English spaCy pipeline when spaCy is installed.
    if SPACY_AVAILABLE:
        try:
            import spacy
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except OSError:
                # spaCy reports a missing model package as OSError; download
                # it once and retry.  Any other failure is handled below.
                spacy.cli.download("en_core_web_sm")
                self.nlp = spacy.load("en_core_web_sm")
            print("Loaded NLP model")
        except Exception as e:
            print("NLP model unavailable: {}. Using fallbacks.".format(e))

    # Optional topic model.
    if BERTOPIC_AVAILABLE:
        try:
            from bertopic import BERTopic
            self.topic_model = BERTopic(verbose=True)
            print("Loaded BERTopic")
        except Exception as e:
            print("BERTopic unavailable: {}".format(e))

    # The CLIP-based embedder doubles as the sentence model so text and
    # image vectors share one dimensionality (instead of using
    # sentence-transformers, whose vectors have a different size).
    self.sentence_model = self.image_embedder
| def _merge_subtitles(self, subtitles: List[dict]) -> List[dict]: | |
| """Merge adjacent subtitles into larger chunks for better context""" | |
| merged = [] | |
| if not subtitles: | |
| return merged | |
| current_text = [subtitles[0]["text"]] | |
| current_start = subtitles[0]["start"] | |
| current_end = subtitles[0]["end"] | |
| # Configure max merge duration | |
| max_duration = 30.0 # Maximum duration for merged subtitles in seconds | |
| for i in range(1, len(subtitles)): | |
| sub = subtitles[i] | |
| # Check if this subtitle is within a reasonable time gap (2 seconds) of the previous one | |
| time_gap = sub["start"] - current_end | |
| duration_so_far = current_end - current_start | |
| if time_gap <= 2.0 and duration_so_far < max_duration: | |
| # Continue merging | |
| current_text.append(sub["text"]) | |
| current_end = sub["end"] | |
| else: | |
| # Merge complete, add to results and start a new segment | |
| merged.append({ | |
| "start": current_start, | |
| "end": current_end, | |
| "text": " ".join(current_text) | |
| }) | |
| current_text = [sub["text"]] | |
| current_start = sub["start"] | |
| current_end = sub["end"] | |
| # Don't forget the last segment | |
| if current_text: | |
| merged.append({ | |
| "start": current_start, | |
| "end": current_end, | |
| "text": " ".join(current_text) | |
| }) | |
| print("Merged {} subtitle entries into {} chunks".format(len(subtitles), len(merged))) | |
| return merged | |
def process_video_library(self, force_reprocess: bool = False):
    """Process every video folder found directly under ``self.video_root``.

    A "video folder" is an immediate sub-directory containing at least one
    ``*.mp4`` file.  Folders whose name is already in the checkpoint are
    skipped unless ``force_reprocess`` is true.  A failure in one folder is
    reported and does not stop the remaining folders; failed folders are not
    added to the checkpoint.

    Args:
        force_reprocess: Re-run folders that are already checkpointed.
    """
    if not self.video_root.exists():
        print("Error: Video root directory does not exist: {}".format(self.video_root))
        return
    print("Processing videos from: {}".format(self.video_root))

    # Re-read the checkpoint so a long-lived instance sees external updates.
    self.processed_videos = self._load_checkpoint()
    print("Already processed {} videos".format(len(self.processed_videos)))
    if self.processed_videos:
        print("Previously processed videos:")
        for vid in sorted(self.processed_videos):
            print(" - {}".format(vid))

    # Sorted so the processing order does not depend on the filesystem.
    folders = sorted(
        path for path in self.video_root.glob("*")
        if path.is_dir() and any(path.glob("*.mp4"))
    )
    print("Found {} video folders".format(len(folders)))

    to_process = [f for f in folders if force_reprocess or f.name not in self.processed_videos]
    print("Will process {} videos ({} skipped)".format(
        len(to_process), len(folders) - len(to_process)))
    if not force_reprocess:
        for folder in folders:
            if folder.name in self.processed_videos:
                print("Skipping {} (already processed)".format(folder.name))

    total = len(to_process)
    start_time = time.time()
    # Progress is counted over the folders actually attempted.  Counting over
    # all folders (skipped ones included) made the counter overshoot the
    # total and the ETA turn negative.
    for attempted, folder in enumerate(to_process, start=1):
        folder_id = folder.name
        try:
            print("\n[{}/{}] Processing {}".format(attempted, total, folder_id))
            self._log_resources()
            self._process_video_folder(folder)

            # Only successful folders are recorded as processed.
            self.processed_videos.add(folder_id)
            self._save_checkpoint()

            elapsed = time.time() - start_time
            avg_time_per_video = elapsed / attempted
            eta = avg_time_per_video * (total - attempted)
            print("\nProgress: {}/{} videos ({:.1f}%)".format(
                attempted, total, 100.0 * attempted / total
            ))
            print("Elapsed: {}, Avg: {:.1f}s/video, ETA: {}".format(
                self._format_eta(elapsed),
                avg_time_per_video,
                self._format_eta(eta)
            ))
        except Exception as e:
            print("Error processing {}: {}".format(folder_id, str(e)))
            # Persist what has succeeded so far before moving on.
            self._save_checkpoint()

    print("\nAll videos processed!")
    print("Total processed videos: {}".format(len(self.processed_videos)))
    return
| def _accelerated_frame_extraction(self, mp4_path: Path, subtitles: List[dict]) -> List[Path]: | |
| """Hardware-optimized frame extraction""" | |
| frame_dir = mp4_path.parent / "frames" | |
| frame_dir.mkdir(exist_ok=True) | |
| # Check if there are already frames in the directory | |
| existing_frames = sorted(frame_dir.glob("*.jpg")) | |
| if existing_frames: | |
| print("Found {} existing frames, skipping extraction".format(len(existing_frames))) | |
| return existing_frames | |
| total_duration = sum(sub["end"] - sub["start"] for sub in subtitles) | |
| with tqdm(total=total_duration, desc="Extracting frames", leave=False) as pbar: | |
| # Try to find ffmpeg in common locations | |
| ffmpeg_cmd = None | |
| for cmd in ["/opt/homebrew/bin/ffmpeg", "ffmpeg", "/usr/local/bin/ffmpeg", "/usr/bin/ffmpeg"]: | |
| try: | |
| # Check if the command exists | |
| subprocess.run([cmd, "-version"], capture_output=True, check=True) | |
| ffmpeg_cmd = cmd | |
| print("Found ffmpeg at: {}".format(ffmpeg_cmd)) | |
| break | |
| except (subprocess.SubprocessError, FileNotFoundError): | |
| continue | |
| if not ffmpeg_cmd: | |
| print("WARNING: ffmpeg not found, using manual frame extraction") | |
| return self._manual_frame_extraction(mp4_path, subtitles) | |
| cmd = [ | |
| ffmpeg_cmd, | |
| "-y", # Overwrite output files without asking | |
| "-i", str(mp4_path), | |
| "-vf", "fps=1", | |
| "-vsync", "0", | |
| str(frame_dir / "frame_%04d.jpg") | |
| ] | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, text=True, check=True) | |
| # Process the output to update progress | |
| for line in result.stderr.split('\n'): | |
| if "frame=" in line: | |
| try: | |
| # Extract frame number and update progress | |
| frame_num = int(line.split("frame=")[1].split()[0]) | |
| pbar.update(1) | |
| except (ValueError, IndexError): | |
| pass | |
| except subprocess.CalledProcessError as e: | |
| print("FFmpeg error: {}".format(e.stderr)) | |
| print("Falling back to manual frame extraction") | |
| return self._manual_frame_extraction(mp4_path, subtitles) | |
| except FileNotFoundError: | |
| print("FFmpeg not found, falling back to manual frame extraction") | |
| return self._manual_frame_extraction(mp4_path, subtitles) | |
| return sorted(frame_dir.glob("*.jpg")) | |
def _manual_frame_extraction(self, mp4_path: Path, subtitles: List[dict]) -> List[Path]:
    """Create placeholder frame files when ffmpeg cannot be used.

    For each subtitle up to five seconds are picked from its time range and a
    ``frame_<second>.jpg`` file is written per second into the ``frames``
    directory next to the video.  With PIL available the file is a 224x224
    white image showing the timestamp and subtitle text; otherwise (or when
    drawing fails) it is a small text file carrying the ``.jpg`` name.

    Args:
        mp4_path: Path of the video; only its parent directory is used.
        subtitles: Subtitle dicts with ``"start"``, ``"end"`` and ``"text"``.
            An empty list is replaced by a single 0-1 s dummy entry.

    Returns:
        Sorted list of the placeholder paths (existing files are reused).
    """
    print("Using manual frame extraction (FALLBACK MODE)")
    out_dir = mp4_path.parent / "frames"
    out_dir.mkdir(exist_ok=True)

    # PIL decides whether real images or plain text stubs are produced.
    try:
        from PIL import Image, ImageDraw, ImageFont
    except ImportError:
        pil_ok = False
        print("PIL not available, will create empty placeholder files")
    else:
        pil_ok = True
        print("PIL is available for image creation")

    def write_stub(target, message):
        # Text stand-in used whenever no image can be written.
        with open(target, 'w') as handle:
            handle.write(message)

    collected = []

    # Guarantee at least one segment to work on.
    if not subtitles:
        print("No subtitles provided, creating a single frame")
        subtitles = [{"start": 0, "end": 1, "text": "No subtitle data"}]

    with tqdm(total=len(subtitles), desc="Creating placeholder frames", leave=False) as progress:
        for entry in subtitles:
            try:
                first_sec = int(entry["start"])
                last_sec = int(entry["end"])
                span = list(range(first_sec, last_sec + 1))

                # Short ranges are used in full; longer ones are sampled
                # evenly down to at most five seconds.
                if span and min(last_sec - first_sec + 1, 5) < 5:
                    picked = span
                else:
                    stride = max(1, len(span) // 5)
                    picked = span[::stride][:5]
                if not picked and first_sec <= last_sec:
                    picked = [first_sec]

                print("Creating {} placeholder frames for segment {}-{}".format(
                    len(picked), first_sec, last_sec))

                for second in picked:
                    target = out_dir / "frame_{:04d}.jpg".format(second)

                    if target.exists():
                        print("Frame already exists: {}".format(target))
                        collected.append(target)
                        continue

                    stub_message = "Placeholder for timestamp: {}s".format(second)
                    if not pil_ok:
                        write_stub(target, stub_message)
                    else:
                        try:
                            canvas = Image.new('RGB', (224, 224), color='white')
                            pen = ImageDraw.Draw(canvas)
                            pen.text((10, 10), "Timestamp: {}s".format(second), fill="black")

                            # Hard-wrap the subtitle every 30 characters.
                            caption = entry.get("text", "No text")
                            if len(caption) > 30:
                                caption = "".join(
                                    caption[pos:pos + 30] + "\n"
                                    for pos in range(0, len(caption), 30)
                                )
                            pen.text((10, 40), caption, fill="black")

                            canvas.save(str(target), quality=85)
                            print("Created image frame: {}".format(target))
                        except Exception as e:
                            print("Failed to create image: {}".format(e))
                            write_stub(target, stub_message)

                    collected.append(target)
            except Exception as e:
                print("Error in manual frame extraction: {}".format(e))
                # Still leave one file behind for this subtitle.
                timestamp = int(entry.get("start", 0))
                target = out_dir / "frame_{:04d}.jpg".format(timestamp)
                write_stub(target, "Error placeholder for timestamp: {}s".format(timestamp))
                collected.append(target)

            progress.update(1)

    if not collected:
        # Nothing at all was produced: fall back to a single stub.
        print("No frames created, adding emergency placeholder")
        target = out_dir / "frame_0000.jpg"
        write_stub(target, "Emergency placeholder frame")
        collected.append(target)

    print("Created {} placeholder frames".format(len(collected)))
    return sorted(collected)
| def _create_frame_subtitle_map(self, frame_paths: List[Path], subtitles: List[dict]) -> dict: | |
| frame_to_subtitle = {} | |
| for i, sub in enumerate(subtitles): | |
| start_frame = int(sub["start"]) | |
| end_frame = int(sub["end"]) | |
| for fn in range(start_frame, end_frame + 1): | |
| frame_name = "frame_{:04d}.jpg".format(fn) | |
| # Store the subtitle index instead of text | |
| frame_to_subtitle[frame_name] = i | |
| return frame_to_subtitle | |
| def _clean_text_for_embedding(self, text): | |
| """Clean and prepare text for embedding""" | |
| if not text: | |
| return "" | |
| try: | |
| # Remove excessive whitespace and newlines | |
| import re | |
| text = re.sub(r'\s+', ' ', text).strip() | |
| # Remove or replace problematic characters | |
| text = re.sub(r'[^\w\s.,!?\'"-]', '', text) | |
| return text | |
| except Exception as e: | |
| print("Error cleaning text: {}".format(e)) | |
| return text.strip() if text else "" | |
| def _optimized_embedding_processing(self, video_id: str, frame_paths: List[Path], | |
| subtitles: List[dict], metadata: dict): | |
| """Process embeddings in batches to optimize memory usage""" | |
| # Get sentences and embeddings using CLIP text encoder for consistency | |
| chunks = [] | |
| text_embedder = TextEmbedder() | |
| if len(subtitles) == 0: | |
| print("No subtitles found for {}".format(video_id)) | |
| return [] | |
| # Ensure embedders are initialized | |
| try: | |
| print("Initializing embedders") | |
| if self.text_embedder is None: | |
| print("Creating new text embedder") | |
| self.text_embedder = TextEmbedder() | |
| if self.image_embedder is None: | |
| print("Creating new image embedder") | |
| from llm_engineering.application.rag.multimodal_dispatcher import ImageEmbedder | |
| self.image_embedder = ImageEmbedder() | |
| except Exception as e: | |
| print("Error initializing embedders: {}. Using simple TextEmbedder.".format(e)) | |
| if self.text_embedder is None: | |
| self.text_embedder = TextEmbedder() | |
| # Reduce batch size for more stability | |
| batch_size = 64 | |
| # Print video processing status | |
| print("Processing video {} with {} subtitles segments".format(video_id, len(subtitles))) | |
| # Try to create a zero vector once for reuse | |
| try: | |
| zero_vector = [0.0] * 512 # CLIP uses 512 dimensions | |
| except Exception: | |
| zero_vector = None | |
| # Process in smaller batches | |
| for i in range(0, len(subtitles), batch_size): | |
| batch_subtitles = subtitles[i:i+batch_size] | |
| print("Processing subtitle batch {}/{} (segments {}-{})".format( | |
| i//batch_size + 1, | |
| (len(subtitles)-1)//batch_size + 1, | |
| i, min(i+batch_size, len(subtitles)) | |
| )) | |
| current_batch_chunks = [] | |
| for subtitle in batch_subtitles: | |
| try: | |
| # Extract frames for this segment - limit to max 3 frames per subtitle | |
| frame_paths_for_segment = self._extract_frames( | |
| Path(metadata["mp4_path"]), | |
| subtitle["start"], | |
| subtitle["end"] | |
| ) | |
| # Limit number of frames to process | |
| if frame_paths_for_segment and len(frame_paths_for_segment) > 3: | |
| print("Limiting frames from {} to 3 for subtitle at {}".format( | |
| len(frame_paths_for_segment), subtitle['start'])) | |
| # Take first, middle and last frame | |
| indices = [0, len(frame_paths_for_segment)//2, -1] | |
| frame_paths_for_segment = [frame_paths_for_segment[i] for i in indices if i < len(frame_paths_for_segment)] | |
| # Create chunk with cleaned text | |
| original_content = subtitle["text"] | |
| # Clean text for better embedding | |
| content = self._clean_text_for_embedding(original_content) | |
| # Skip empty content | |
| if not content or not content.strip(): | |
| print("Skipping empty content at time {}".format(subtitle["start"])) | |
| continue | |
| # Create a unique ID for this chunk | |
| chunk_id = "{}_{}".format(video_id, int(subtitle["start"])) | |
| # Create embeddings with better error handling | |
| text_embedding = None | |
| # First try with image embedder (CLIP) if text isn't too long | |
| clip_succeeded = False | |
| if self.image_embedder is not None and len(content) < 500: | |
| try: | |
| print("Encoding text with CLIP: {}...".format(content[:50])) | |
| text_embedding = self.image_embedder.encode_text(content) | |
| if text_embedding: | |
| print("Text embedding done, dimension: {}".format(len(text_embedding))) | |
| clip_succeeded = True | |
| else: | |
| print("Image embedder returned None, falling back") | |
| text_embedding = None | |
| except Exception as e: | |
| print("Failed to embed text with CLIP: {}".format(e)) | |
| text_embedding = None | |
| elif self.image_embedder is not None: | |
| print("Text too long for CLIP ({} chars), using fallback embedder".format(len(content))) | |
| # Fall back to text embedder if needed | |
| if text_embedding is None: | |
| try: | |
| print("Using sentence transformer for text") | |
| if self.text_embedder: | |
| text_embedding = self.text_embedder.encode(content) | |
| else: | |
| text_embedding = text_embedder.encode(content) | |
| # Ensure we have 512 dimensions for compatibility | |
| if text_embedding and len(text_embedding) != 512: | |
| print("Adjusting dimensions from {} to 512".format(len(text_embedding))) | |
| if len(text_embedding) < 512: | |
| text_embedding = text_embedding + [0.0] * (512 - len(text_embedding)) | |
| else: | |
| text_embedding = text_embedding[:512] | |
| print("Created text embedding with sentence transformer, dim: {}".format(len(text_embedding) if text_embedding else "None")) | |
| except Exception as e: | |
| print("Text embedding fallback failed: {}".format(e)) | |
| # Last resort fallback - zero embedding | |
| text_embedding = zero_vector or [0.0] * 512 | |
| # Ensure embedding is valid | |
| if not text_embedding or len(text_embedding) != 512: | |
| print("Invalid embedding, using zeros") | |
| text_embedding = zero_vector or [0.0] * 512 | |
| # Create frame embeddings if possible | |
| frame_embeddings = [] | |
| if clip_succeeded: # Only attempt frame embeddings if CLIP text worked | |
| for frame_idx, frame in enumerate(frame_paths_for_segment): | |
| try: | |
| if self.image_embedder is not None: | |
| print("Encoding frame {}/{}".format(frame_idx + 1, len(frame_paths_for_segment))) | |
| embedding = self.image_embedder.encode(str(frame)) | |
| if embedding is not None: | |
| frame_embeddings.append(embedding) | |
| except Exception as e: | |
| print("Error embedding frame {}: {}".format(frame, e)) | |
| else: | |
| print("Skipping frame embeddings since CLIP failed with text") | |
| # Create a chunk | |
| try: | |
| chunk = EmbeddedVideoChunk( | |
| video_id=video_id, | |
| document_id=chunk_id, | |
| start_time=subtitle["start"], | |
| end_time=subtitle["end"], | |
| content=content, | |
| embedding=text_embedding, | |
| frame_paths=[str(p) for p in frame_paths_for_segment] if frame_paths_for_segment else [], | |
| frame_embeddings=frame_embeddings if frame_embeddings else [[0.0] * 512], # Match CLIP dimension | |
| author_id=metadata.get("uploader", "unknown").replace(" ", "_").lower(), | |
| author_full_name=metadata.get("uploader", "unknown") | |
| ) | |
| current_batch_chunks.append(chunk) | |
| print("Created chunk for segment {}-{}, content: {}...".format( | |
| subtitle["start"], subtitle["end"], content[:50])) | |
| except Exception as e: | |
| print("Failed to create chunk object: {}".format(e)) | |
| except Exception as e: | |
| print("Error processing segment {}-{}: {}".format( | |
| subtitle["start"], subtitle["end"], e)) | |
| # Process current batch if we have chunks | |
| if current_batch_chunks: | |
| chunks.extend(current_batch_chunks) | |
| # Store chunks after each batch to avoid memory buildup | |
| try: | |
| print("Storing batch of {} chunks to Qdrant".format(len(current_batch_chunks))) | |
| self._store_chunks(current_batch_chunks) | |
| print("Memory cleared after storing batch") | |
| except Exception as e: | |
| print("Error storing chunks: {}".format(e)) | |
| # Clear batch to free memory | |
| current_batch_chunks = [] | |
| return chunks # Return any remaining chunks that weren't stored | |
def _log_resources(self):
    """Print a one-line snapshot of CPU load and memory usage.

    Values come from ``psutil``; the "GPU Memory" figure is whatever
    ``_get_gpu_memory`` returns.
    """
    memory = psutil.virtual_memory()
    cpu_load = psutil.cpu_percent()
    used_gb = memory.used / 1e9
    total_gb = memory.total / 1e9
    gpu_gb = self._get_gpu_memory()
    template = (
        "\nSystem Resources | CPU: {}% | "
        "Memory: {:.1f}/{:.1f}GB | "
        "GPU Memory: {:.1f}GB"
    )
    print(template.format(cpu_load, used_gb, total_gb, gpu_gb))
def _get_gpu_memory(self) -> float:
    """Return the system memory currently in use, in GB (bytes / 1e9).

    No GPU API is queried: the figure is ``psutil``'s "used" virtual memory,
    reported as GPU memory on the premise of a unified memory pool.
    """
    used_bytes = psutil.virtual_memory().used
    return used_bytes / 1e9
| def _format_eta(self, seconds: float) -> str: | |
| return time.strftime("%H:%M:%S", time.gmtime(seconds)) | |
| def _load_checkpoint(self) -> Set[str]: | |
| if self.checkpoint_file.exists(): | |
| try: | |
| with open(self.checkpoint_file) as f: | |
| return set(json.load(f)) | |
| except (json.JSONDecodeError, IOError): | |
| print("Corrupted checkpoint file, resetting...") | |
| return set() | |
| return set() | |
| def _save_checkpoint(self): | |
| """Save the set of processed video IDs to checkpoint file""" | |
| with open(self.checkpoint_file, "w") as f: | |
| # Don't reload the checkpoint, use the current processed_videos set | |
| json.dump(list(self.processed_videos), f) | |
| print("Saved checkpoint with {} processed videos".format(len(self.processed_videos))) | |
| def _process_video_folder(self, folder: Path): | |
| """Process a single video folder""" | |
| # Load video metadata | |
| video_id = folder.name | |
| print("Processing video folder: {}".format(video_id)) | |
| try: | |
| # Phase 1: Load metadata and subtitles | |
| print("Phase 1: Loading metadata and subtitles") | |
| metadata = self._load_metadata(folder) | |
| # Find VTT file | |
| vtt_files = list(folder.glob("*.vtt")) | |
| if not vtt_files: | |
| print("No VTT subtitle file found for {}".format(video_id)) | |
| return | |
| subtitles = self._parse_subtitles(vtt_files[0]) | |
| print("Loaded {} subtitle entries".format(len(subtitles))) | |
| # Merge adjacent subtitles for better context | |
| merged_subtitles = self._merge_subtitles(subtitles) | |
| print("Merged to {} subtitle entries".format(len(merged_subtitles))) | |
| # Phase 2: Find MP4 file | |
| mp4_files = list(folder.glob("*.mp4")) | |
| if not mp4_files: | |
| print("No MP4 file found for {}".format(video_id)) | |
| return | |
| mp4_path = mp4_files[0] | |
| metadata["mp4_path"] = str(mp4_path) # Store MP4 path in metadata | |
| print("Using video file: {}".format(mp4_path)) | |
| # Phase 3: Process video chunks directly with optimized method | |
| print("Phase 3: Processing video chunks with optimized method") | |
| remaining_chunks = self._optimized_embedding_processing(video_id, [], merged_subtitles, metadata) | |
| # Store any remaining chunks | |
| if remaining_chunks: | |
| print("Storing {} remaining chunks".format(len(remaining_chunks))) | |
| self._store_chunks(remaining_chunks) | |
| print("Successfully processed video {}".format(video_id)) | |
| except Exception as e: | |
| print("Error in _process_video_folder for {}: {}".format(video_id, e)) | |
| raise | |
| def _load_metadata(self, folder: Path) -> dict: | |
| info_json = next(folder.glob("*.info.json")) | |
| with open(info_json) as f: | |
| metadata = json.load(f) | |
| metadata.setdefault("uploader", "unknown_author") | |
| return metadata | |
def _parse_subtitles(self, vtt_path: Path) -> List[dict]:
    """Read a WebVTT file and return its captions as plain dicts.

    Every caption is echoed to stdout; only captions whose end time is
    strictly after their start time are kept.

    Returns:
        List of ``{"start": seconds, "end": seconds, "text": str}``.
    """
    parsed = webvtt.read(vtt_path)
    print("Raw subtitles found: {}".format(len(parsed)))

    kept = []
    for cue in parsed:
        print("Caption: {} -> {}: {}...".format(cue.start, cue.end, cue.text[:50]))
        begins = cue.start_in_seconds
        finishes = cue.end_in_seconds
        if finishes > begins:
            kept.append({"start": begins, "end": finishes, "text": cue.text})

    print("Valid subtitles: {}".format(len(kept)))
    return kept
| def _create_chunks(self, video_id: str, mp4_path: Path, subtitles: List[dict], metadata: dict): | |
| """Process subtitles and extract frames for each chunk""" | |
| if len(subtitles) == 0: | |
| print("No subtitles found for {}".format(video_id)) | |
| return [] | |
| # Process sentences with NLP for better chunking if available | |
| chunks = [] | |
| # Extract sentences with fallback for missing NLP | |
| sentences = [] | |
| if self.nlp is not None: | |
| try: | |
| # Join all subtitles and process as one document | |
| full_text = " ".join([s["text"] for s in subtitles]) | |
| doc = self.nlp(full_text) | |
| sentences = [str(sent) for sent in doc.sents] | |
| except Exception as e: | |
| print("Error in NLP processing: {}".format(e)) | |
| sentences = [s["text"] for s in subtitles] | |
| else: | |
| # Simple sentence splitting by punctuation | |
| sentences = [s["text"] for s in subtitles] | |
| # Create chunks | |
| for subtitle in subtitles: | |
| try: | |
| # Extract frames for this segment | |
| frame_paths = self._extract_frames(mp4_path, subtitle["start"], subtitle["end"]) | |
| # Create chunk | |
| content = subtitle["text"] | |
| # Skip empty content | |
| if not content.strip(): | |
| continue | |
| # Create a unique ID for this chunk | |
| chunk_id = "{}_{}".format(video_id, int(subtitle["start"])) | |
| # Create embeddings | |
| text_embedding = None | |
| if self.image_embedder is not None: | |
| try: | |
| text_embedding = self.image_embedder.encode_text(content) | |
| except Exception as e: | |
| print("Failed to embed text: {}".format(e)) | |
| if text_embedding is None: | |
| # Fallback to text embedder | |
| try: | |
| text_embedding = self.text_embedder.encode(content) | |
| except Exception: | |
| # Last resort fallback | |
| text_embedding = [0.0] * 384 | |
| # Create frame embeddings if possible | |
| frame_embeddings = [] | |
| for frame in frame_paths: | |
| try: | |
| if self.image_embedder is not None: | |
| embedding = self.image_embedder.encode(str(frame)) | |
| frame_embeddings.append(embedding) | |
| except Exception as e: | |
| print("Error embedding frame {}: {}".format(frame, e)) | |
| # Create a chunk | |
| chunk = EmbeddedVideoChunk( | |
| video_id=video_id, | |
| document_id=chunk_id, | |
| start_time=subtitle["start"], | |
| end_time=subtitle["end"], | |
| content=content, | |
| embedding=text_embedding, | |
| frame_paths=[str(p) for p in frame_paths], | |
| frame_embeddings=frame_embeddings if frame_embeddings else [[0.0] * 768], # Add fallback empty vector | |
| author_id=metadata.get("uploader", "unknown").replace(" ", "_").lower(), | |
| author_full_name=metadata.get("uploader", "unknown") | |
| ) | |
| chunks.append(chunk) | |
| except Exception as e: | |
| print("Error creating chunk for segment {}-{}: {}".format( | |
| subtitle["start"], subtitle["end"], e)) | |
| return chunks | |
| def _extract_frames(self, video_path: Path, start: float, end: float) -> List[Path]: | |
| frame_dir = video_path.parent / "frames" | |
| print("Extracting frames to: {}".format(frame_dir)) | |
| frame_dir.mkdir(exist_ok=True) | |
| # Try to find ffmpeg in common locations on macOS | |
| ffmpeg_locations = [ | |
| "ffmpeg", # if it's in PATH | |
| "/opt/homebrew/bin/ffmpeg", # Homebrew on Apple Silicon | |
| "/usr/local/bin/ffmpeg", # Homebrew on Intel Mac | |
| "/usr/bin/ffmpeg", # System-installed | |
| "/opt/local/bin/ffmpeg" # MacPorts | |
| ] | |
| ffmpeg_cmd = None | |
| for cmd in ffmpeg_locations: | |
| try: | |
| # Test if the command is available | |
| result = subprocess.run([cmd, "-version"], | |
| capture_output=True, | |
| text=True, | |
| check=False) | |
| if result.returncode == 0: | |
| ffmpeg_cmd = cmd | |
| print("Found ffmpeg at: {}".format(ffmpeg_cmd)) | |
| break | |
| except FileNotFoundError: | |
| continue | |
| if ffmpeg_cmd is None: | |
| print("WARNING: ffmpeg not found in any location. Using fallback method.") | |
| return self._manual_frame_extraction(video_path, [{"start": start, "end": end, "text": ""}]) | |
| # Continue with ffmpeg if found | |
| cmd = [ | |
| ffmpeg_cmd, | |
| "-y", # Overwrite output files without asking | |
| "-ss", str(start), | |
| "-to", str(end), | |
| "-i", str(video_path), | |
| "-vf", "fps=1", | |
| str(frame_dir / "frame_%04d.jpg") | |
| ] | |
| print("Running ffmpeg command: {}".format(" ".join(cmd))) | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, check=True, text=True) | |
| # Check for errors | |
| if result.stderr: | |
| print("FFmpeg output: {}".format(result.stderr)) | |
| except subprocess.CalledProcessError as e: | |
| print("FFmpeg error: {}".format(e.stderr)) | |
| print("Falling back to manual frame extraction") | |
| return self._manual_frame_extraction(video_path, [{"start": start, "end": end, "text": ""}]) | |
| frames = sorted(frame_dir.glob("*.jpg")) | |
| print("Extracted {} frames".format(len(frames))) | |
| return frames | |
def _store_chunks(self, chunks: List[EmbeddedVideoChunk]):
    """Convert embedded video chunks into Qdrant points and upsert them in batches.

    Each chunk becomes one point in the ``video_chunks`` collection. The point
    ID is derived from a SHA-256 hash of ``video_id`` and ``start_time``, so the
    same (video, timestamp) pair always maps to the same point ID. Embeddings
    that are not plain lists are converted with ``tolist()``; embeddings whose
    length is not 512 are zero-padded or truncated to 512.

    Args:
        chunks: Chunks to store. Each one is read for ``document_id``,
            ``video_id``, ``start_time``, ``end_time``, ``content`` and
            ``embedding``.

    Returns:
        None. Returns early (without opening a Qdrant connection) when
        ``chunks`` is empty or no chunk yields a valid point.

    Raises:
        Exception: Any error from collection setup or from an upsert that
            still fails after ``max_retries`` attempts is printed together
            with a traceback and re-raised.
    """
    collection_name = "video_chunks"
    vector_size = 512  # Vector width the collection is created with
    batch_size = 64
    max_retries = 3
    qdrant_storage_path = "/Users/yufeizhen/Desktop/project/qdrant_storage"

    if not chunks:
        print("Warning: No chunks to store")
        return

    # Build the point payloads first so no storage lock is taken when there
    # turns out to be nothing to write.
    points = []
    skipped_chunks = 0
    print("Processing {} chunks for storage".format(len(chunks)))
    for chunk in chunks:
        try:
            print("Processing chunk with ID: {}, video_id: {}, start_time: {}".format(
                chunk.document_id, chunk.video_id, chunk.start_time))

            embedding = chunk.embedding
            if embedding is None:
                print("Warning: Chunk has None embedding, skipping")
                skipped_chunks += 1
                continue
            if not isinstance(embedding, list):
                print("Warning: Embedding is not a list, converting")
                try:
                    embedding = embedding.tolist()
                except Exception:
                    print("Failed to convert embedding to list, skipping chunk")
                    skipped_chunks += 1
                    continue

            if len(embedding) != vector_size:
                print("Embedding dimension mismatch: {} (should be 512)".format(len(embedding)))
                if len(embedding) < vector_size:
                    print("Padding embedding from {} to 512 dimensions".format(len(embedding)))
                    embedding = embedding + [0.0] * (vector_size - len(embedding))
                else:
                    print("Truncating embedding from {} to 512 dimensions".format(len(embedding)))
                    embedding = embedding[:vector_size]

            # Deterministic ID: first 128 bits of SHA-256 over video id and timestamp.
            unique_str = "{}_{}".format(chunk.video_id, chunk.start_time)
            digest = hashlib.sha256(unique_str.encode()).hexdigest()
            point_uuid = uuid.UUID(digest[:32])

            if not chunk.content or not chunk.content.strip():
                print("Warning: Empty content in chunk, using placeholder")
                content = "Empty content at timestamp {}".format(chunk.start_time)
            else:
                content = chunk.content

            points.append(PointStruct(
                id=str(point_uuid),
                vector=embedding,
                payload={
                    "text": content,
                    "start": chunk.start_time,
                    "end": chunk.end_time,
                    "video_id": chunk.video_id,
                    "metadata": {
                        "topics": [],
                        "sentence_hash": hashlib.md5(content.encode()).hexdigest(),
                    },
                },
            ))
        except Exception as e:
            print("Error processing chunk: {}".format(e))
            skipped_chunks += 1

    if skipped_chunks > 0:
        print("Skipped {} chunks due to errors".format(skipped_chunks))
    if not points:
        print("No valid points to store after processing")
        return
    print("Prepared {} valid points for storage".format(len(points)))

    # Prefer a direct local-storage connection; directory creation is inside
    # the try so that a failure there also falls back to the shared singleton.
    client = None
    owns_client = False  # Only a client opened here may be closed here
    try:
        Path(qdrant_storage_path).parent.mkdir(parents=True, exist_ok=True)
        client = QdrantClient(path=qdrant_storage_path)
        owns_client = True
        print("Established direct connection to Qdrant storage at: {}".format(qdrant_storage_path))
    except Exception as e:
        print("Error connecting to Qdrant storage, falling back to connection singleton: {}".format(e))
        from llm_engineering.infrastructure.db.qdrant import connection
        client = connection

    try:
        if client is None:
            raise ValueError("Qdrant client is None, check connection setup")

        try:
            if not client.collection_exists(collection_name):
                print("Creating collection '{}' with 512-dimensional vectors".format(collection_name))
                # create_collection never drops existing data, unlike the
                # deprecated recreate_collection.
                client.create_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(
                        size=vector_size,
                        distance=Distance.COSINE,
                    ),
                )
            else:
                print("Collection '{}' already exists".format(collection_name))
        except Exception as e:
            print("Error checking/creating collection: {}".format(e))
            raise

        total_batches = (len(points) - 1) // batch_size + 1
        for batch_number, offset in enumerate(range(0, len(points), batch_size), start=1):
            batch = points[offset:offset + batch_size]
            for attempt in range(1, max_retries + 1):
                try:
                    print("Storing batch {} of {} ({} points)".format(
                        batch_number, total_batches, len(batch)))
                    client.upsert(
                        collection_name=collection_name,
                        points=batch,
                        wait=True,  # Block until the write has been applied
                    )
                    print("Successfully stored batch {} of {}".format(
                        batch_number, total_batches))
                    break
                except Exception as e:
                    is_api_error = isinstance(e, UnexpectedResponse)
                    retryable = is_api_error or "Connection reset by peer" in str(e)
                    # Re-raise once retries are exhausted so a failed batch
                    # is never reported as stored.
                    if not retryable or attempt == max_retries:
                        print("Fatal error storing batch: {}".format(str(e)))
                        raise
                    if is_api_error:
                        print("Qdrant API error: {} - retrying batch {} (attempt {}/{})...".format(
                            str(e), batch_number, attempt, max_retries))
                    else:
                        print("Connection reset, retrying batch {} (attempt {}/{})...".format(
                            batch_number, attempt, max_retries))
                    time.sleep(3 * attempt)  # Linear backoff: 3s, 6s, ...

        # Best-effort verification; a failed count does not undo the upserts.
        try:
            count = client.count(collection_name=collection_name)
            print("Successfully stored {} chunks. Collection now contains {} points".format(
                len(points), count.count))
        except Exception as e:
            print("Note: Stored points but couldn't verify count: {}".format(e))
    except Exception as e:
        print("Storage error: {}".format(str(e)))
        import traceback
        traceback.print_exc()
        raise
    finally:
        # Release the local storage lock; never close the shared singleton.
        if owns_client:
            try:
                client.close()
            except Exception as e:
                print("Warning: failed to close Qdrant client: {}".format(e))
| def _load_processed_frames(self) -> dict: | |
| if self.processed_frames_file.exists(): | |
| with open(self.processed_frames_file) as f: | |
| return json.load(f) | |
| return {} | |
| def _save_processed_frames(self): | |
| with open(self.processed_frames_file, "w") as f: | |
| json.dump(self.processed_frames, f) | |