Spaces:
Runtime error
Runtime error
| from llm_engineering.domain.queries import Query | |
| from .topic_retriever import TopicAwareRetriever | |
| from .video_processor import VideoClipper | |
| from pathlib import Path | |
| import subprocess | |
| import os | |
| import traceback | |
| import time | |
class VideoQAEngine:
    """Answer questions about a video collection by cutting relevant clips.

    The engine retrieves video segments matching a question through a
    ``TopicAwareRetriever`` and materialises each segment as an ``.mp4`` clip
    through a ``VideoClipper``. Progress and errors are reported with
    ``print``.
    """

    def __init__(self, video_root: str, qdrant_storage_path="/Users/yufeizhen/Desktop/project/qdrant_storage") -> None:
        """Create the retriever and clipper, retrying on failure.

        Args:
            video_root: Directory containing one sub-directory per video id.
                A missing directory only produces a warning.
            qdrant_storage_path: Path handed to ``TopicAwareRetriever``. Its
                parent directory is created if it does not exist.

        Raises:
            Exception: Whatever the component constructors raised on the
                third consecutive failed attempt.
        """
        print("Initializing VideoQAEngine")
        print("Video root: {}".format(video_root))
        print("Qdrant storage path: {}".format(qdrant_storage_path))

        # A missing video root is reported but not fatal; lookups in ask()
        # will simply fail to find files.
        if not os.path.exists(video_root):
            print("WARNING: Video root directory not found: {}".format(video_root))

        # Make sure the directory that will hold the Qdrant storage exists.
        # dirname() is empty for a bare relative name such as "qdrant_storage",
        # and os.makedirs("") raises FileNotFoundError, so skip that case.
        storage_parent = os.path.dirname(qdrant_storage_path)
        if storage_parent:
            os.makedirs(storage_parent, exist_ok=True)

        # Initialize components with retry logic.
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                self.retriever = TopicAwareRetriever(qdrant_storage_path=qdrant_storage_path)
                self.clipper = VideoClipper()
                self.video_root = Path(video_root)
                print("VideoQAEngine initialized successfully")
                break
            except Exception as e:
                print("Error initializing components (attempt {}/{}): {}".format(
                    attempt, max_retries, e))
                if attempt >= max_retries:
                    print("Failed to initialize components after {} attempts".format(max_retries))
                    raise
                # Brief pause before the next attempt.
                time.sleep(2)

    @staticmethod
    def _clip_record(path, result) -> dict:
        """Build the dict returned to callers for one processed result."""
        return {
            "path": path,
            "timestamps": (result["start"], result["end"]),
            "score": result["score"],
            "text": result.get("text", ""),  # Include text for context
            "video_id": result["video_id"],
        }

    @staticmethod
    def _write_info_file(info_path, result, error) -> None:
        """Write a text placeholder describing a segment whose clip failed."""
        # Explicit UTF-8: the segment text may contain characters the
        # platform's default encoding cannot represent.
        with open(info_path, "w", encoding="utf-8") as f:
            f.write("Video: {}\n".format(result['video_id']))
            f.write("Time: {:.1f}s - {:.1f}s\n".format(result['start'], result['end']))
            f.write("Text: {}\n".format(result.get('text', '')))
            f.write("Score: {:.4f}\n".format(result['score']))
            f.write("Error: {}\n".format(str(error)))

    def ask(self, question: str, output_dir: str = "clips") -> list:
        """Retrieve segments relevant to ``question`` and cut a clip for each.

        Each retrieval result is read as a mapping with the keys
        ``"video_id"``, ``"start"``, ``"end"``, ``"score"`` and, optionally,
        ``"text"``.

        Args:
            question: Natural-language question.
            output_dir: Directory that receives the clips; created if needed.

        Returns:
            A list of dicts with keys ``"path"``, ``"timestamps"``,
            ``"score"``, ``"text"`` and ``"video_id"``. ``"path"`` points to
            the clip, or to a ``.txt`` placeholder when clip creation failed.
            Results whose source video cannot be found are skipped. An empty
            list is returned when nothing is retrieved or when an unexpected
            error occurs (the traceback is printed).
        """
        print("\n--- Processing query: '{}' ---".format(question))
        try:
            # Create a Query object
            query = Query.from_str(question)

            # Perform retrieval with diagnostics
            print("Retrieving relevant video segments...")
            start_time = time.time()
            results = self.retriever.retrieve(query.content)
            retrieval_time = time.time() - start_time
            print("Retrieval completed in {:.2f} seconds".format(retrieval_time))

            # Create output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)

            # Handle empty results
            if not results:
                print("No results found for query: '{}'".format(question))
                return []
            print("Found {} relevant video segments".format(len(results)))

            # Process each result to create clips
            clips = []
            for i, result in enumerate(results):
                print("\nProcessing result {}/{}:".format(i + 1, len(results)))
                print(" Video ID: {}".format(result["video_id"]))
                print(" Timestamps: {:.1f}s - {:.1f}s".format(result["start"], result["end"]))
                print(" Score: {:.4f}".format(result["score"]))

                # Check if video file exists
                video_path = self.video_root / result["video_id"] / "video.mp4"
                if not video_path.exists():
                    # Try alternative filename patterns. Sorting makes the
                    # choice deterministic; glob order is filesystem-dependent.
                    alt_paths = sorted(self.video_root.glob("{}/*.mp4".format(result["video_id"])))
                    if alt_paths:
                        video_path = alt_paths[0]
                        print(" Found alternative video path: {}".format(video_path))
                    else:
                        print(" ERROR: Video file not found at {}".format(video_path))
                        continue

                # Create unique output path
                output_path = Path(output_dir) / "clip_{}_{}_{:.3f}.mp4".format(
                    result['video_id'],
                    int(result["start"]),
                    result["score"]
                )
                try:
                    print(" Creating clip to: {}".format(output_path))
                    self.clipper.create_clip(video_path, result["start"], result["end"], output_path)
                    print(" Clip created successfully")
                    # If clip was created successfully, add to results
                    clips.append(self._clip_record(output_path, result))
                except (subprocess.SubprocessError, FileNotFoundError) as e:
                    print(" ERROR: Could not create video clip: {}".format(e))
                    # Create a placeholder info file instead of a video clip
                    info_path = output_path.with_suffix('.txt')
                    self._write_info_file(info_path, result, e)
                    print(" Created info file instead: {}".format(info_path))
                    # Add text-only result
                    clips.append(self._clip_record(info_path, result))

            print("\nProcessed {} clips successfully".format(len(clips)))
            return clips
        except Exception as e:
            # Top-level boundary: report the failure and return no clips
            # rather than propagating to the caller.
            print("Error in VideoQAEngine.ask: {}".format(e))
            traceback.print_exc()
            return []