Add all remaining modules: index_store, query_engine, akinator, pipeline, app, requirements
fea499e verified | """ | |
| Video Intelligence Platform — Gradio UI | |
| Interactive video search with Akinator-style refinement. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import tempfile | |
| import numpy as np | |
| import gradio as gr | |
| from typing import Optional | |
| from .config import Config | |
| from .pipeline import IndexingPipeline | |
| from .query_engine import QueryEngine, QueryResult | |
| from .akinator import AkinatorRefiner | |
| from .gemini_client import GeminiClient | |
| from .visual_encoders import SigLIPEncoder | |
| from .index_store import VideoIndex | |
# -- Global state -------------------------------------------------------------
# Shared by every callback in this module. The author's rationale: Gradio runs
# in a single process, so module-level state is fine.
# NOTE(review): being module globals, these are shared by every browser
# session served by this process - confirm that is acceptable for deployment.
pipeline: Optional[IndexingPipeline] = None  # set by initialize_system
query_engine: Optional[QueryEngine] = None  # set by initialize_system
akinator: Optional[AkinatorRefiner] = None  # set by initialize_system
current_video_path: Optional[str] = None  # written by index_video
current_results: list = []  # written by search_video, read by generate_rag_answer
akinator_state: Optional[dict] = None  # written by search_video / refine_results
def initialize_system(api_key: str) -> str:
    """Build the indexing pipeline, query engine and Akinator refiner.

    Args:
        api_key: Gemini API key typed into the UI. ``None`` and
            whitespace-only values are treated as "no key entered".

    Returns:
        A status message for display in the UI. Failures are reported through
        the returned message rather than raised.

    Side effects:
        Exports the key as the ``GEMINI_API_KEY`` environment variable and, on
        success only, rebinds the module-level ``pipeline``, ``query_engine``
        and ``akinator``.
    """
    global pipeline, query_engine, akinator

    key = (api_key or "").strip()
    if not key:
        return "β Please enter your Gemini API key"

    try:
        os.environ["GEMINI_API_KEY"] = key
        config = Config(gemini_api_key=key)
        # Build everything into locals first so that a failure half-way
        # through cannot leave the globals in a mixed old/new state.
        new_pipeline = IndexingPipeline(config)
        new_query_engine = QueryEngine(
            index=new_pipeline.index,
            gemini=new_pipeline.gemini,
            siglip=new_pipeline.siglip,
            top_k=config.top_k,
        )
        new_akinator = AkinatorRefiner(
            index=new_pipeline.index,
            gemini=new_pipeline.gemini,
            threshold=config.akinator_threshold,
        )
    except Exception as e:  # UI boundary: report the failure instead of crashing
        return f"β Initialization failed: {str(e)}"

    pipeline, query_engine, akinator = new_pipeline, new_query_engine, new_akinator
    return "β System initialized! Models loaded. Upload a video to get started."
def index_video(video_file, caption_every_n: int = 3, progress=gr.Progress()):
    """Index an uploaded video and report indexing statistics.

    Args:
        video_file: Either a path string or an object exposing the path as
            ``.name``; ``None`` means nothing was uploaded.
        caption_every_n: Caption every N-th frame; coerced to an int >= 1.
        progress: Gradio progress tracker (the default instance is how Gradio
            injects it into event handlers).

    Returns:
        ``(status_markdown, video_value)``. ``video_value`` is the indexed
        video's path on success and ``None`` otherwise, so the video output
        component is cleared rather than handed an empty path.

    Side effects:
        Sets the module-level ``current_video_path``, but only after indexing
        succeeded, so it never points at a video that failed to index.
    """
    global current_video_path

    if pipeline is None:
        return "β System not initialized. Enter your Gemini API key first.", None
    if video_file is None:
        return "β No video uploaded", None

    video_path = video_file if isinstance(video_file, str) else video_file.name

    try:
        progress(0.1, desc="Extracting frames...")
        stats = pipeline.index_video(
            video_path,
            caption_every_n=max(1, int(caption_every_n)),
            detect_every_n=1,
        )
        stats_str = (
            f"β **Video Indexed Successfully!**\n\n"
            f"- **Frames extracted:** {stats['frames']}\n"
            f"- **Objects detected:** {stats['detections']}\n"
            f"- **Visual embeddings:** {stats['visual_vectors']}\n"
            f"- **Caption embeddings:** {stats['caption_vectors']}\n"
            f"- **Time elapsed:** {stats['elapsed_sec']:.1f}s\n\n"
            f"π Ready to search! Try queries like:\n"
            f'- "person wearing white clothes"\n'
            f'- "red car"\n'
            f'- "person AND car" (boolean)\n'
            f'- "outdoor scene at night"'
        )
    except Exception as e:  # UI boundary: surface the error as a status message
        return f"β Indexing failed: {str(e)}", None

    current_video_path = video_path
    return stats_str, video_path
def search_video(query: str) -> tuple:
    """Search the indexed video and, if needed, open a refinement session.

    Args:
        query: Free-text search query; ``None`` or blank is rejected with a
            message.

    Returns:
        A 4-tuple ``(results_markdown, rag_markdown, question_update,
        choices_update)``. ``rag_markdown`` is always ``""``. The two updates
        make the refinement question and its options visible only when the
        refiner reports status ``"refining"``; otherwise they hide them.

    Side effects:
        Stores the hits in the module-level ``current_results`` and resets
        ``akinator_state``; it is set again only when a refinement session
        actually starts.
    """
    global current_results, akinator_state

    # Every path except "refining" hides the refinement widgets, so any
    # session left over from an earlier search must not stay active.
    akinator_state = None

    if query_engine is None:
        return "β System not initialized", "", gr.update(visible=False), gr.update(visible=False)

    cleaned = (query or "").strip()
    if not cleaned:
        return "β Enter a search query", "", gr.update(visible=False), gr.update(visible=False)

    try:
        results = query_engine.search(cleaned)
        current_results = results
        if not results:
            return "No results found for this query.", "", gr.update(visible=False), gr.update(visible=False)

        # Format results
        parts = [f"## π Found {len(results)} matching moments\n\n"]
        for i, r in enumerate(results, 1):
            parts.append(f"### {i}. β±οΈ {r.time_str} (score: {r.score:.3f})\n")
            if r.caption:
                parts.append(f"> {r.caption[:200]}\n")
            if r.detections:
                parts.append(f"π·οΈ Objects: {', '.join(r.detections)}\n")
            parts.append(f"π‘ Source: {r.match_source}\n\n")
        results_md = "".join(parts)

        # Offer refinement only for large result sets (more than 10 hits).
        if len(results) > 10 and akinator is not None:
            akinator_result = akinator.start(results, query)
            if akinator_result["status"] == "refining":
                akinator_state = akinator_result
                question = akinator_result["question"]
                options = akinator_result["options"]
                option_lines = "".join(f"- {opt}\n" for opt in options)
                options_md = (
                    "### π³ Too many results! Let me help narrow them down.\n\n"
                    f"**{question}**\n\n"
                    f"{option_lines}"
                )
                return (
                    results_md,
                    "",
                    gr.update(visible=True, value=options_md),
                    gr.update(visible=True, choices=options, value=None),
                )

        return results_md, "", gr.update(visible=False), gr.update(visible=False)
    except Exception as e:  # UI boundary: surface the error as a status message
        return f"β Search failed: {str(e)}", "", gr.update(visible=False), gr.update(visible=False)
def refine_results(choice: str, query: str) -> tuple:
    """Feed one answer to the Akinator refiner and render the next step.

    Args:
        choice: The option picked for the current question. An empty or
            missing choice leaves the UI untouched.
        query: The search query, forwarded to ``akinator.answer``.

    Returns:
        A 3-tuple ``(markdown, question_update, choices_update)``:
        - status ``"done"``: the refined results, with both widgets hidden;
        - status ``"refining"``: the next question, with both widgets shown;
        - anything else, or an exception: an error message, widgets hidden.

    Side effects:
        Stores the refiner's reply in the module-level ``akinator_state``.
    """
    global akinator_state

    if akinator is None or akinator_state is None:
        return "No active refinement session", gr.update(visible=False), gr.update(visible=False)

    if not choice:
        # Nothing selected yet: keep the current question on screen.
        return gr.update(), gr.update(), gr.update()

    try:
        result = akinator.answer(choice, query)
        akinator_state = result
        status = result["status"]

        if status == "done":
            # Show final refined results
            refined = result.get("results", [])
            parts = [f"## β Refined to {len(refined)} results\n\n"]

            # Show refinement history
            history = result.get("history", [])
            if history:
                parts.append("**Refinement path:**\n")
                for h in history:
                    parts.append(
                        f"- Q: {h['question']} β A: {h['answer']} ({h['remaining']} remaining)\n"
                    )
                parts.append("\n")

            for i, r in enumerate(refined, 1):
                parts.append(f"### {i}. β±οΈ {r['time_str']} (score: {r['score']:.3f})\n")
                if r.get("caption"):
                    parts.append(f"> {r['caption'][:200]}\n")
                if r.get("detections"):
                    parts.append(f"π·οΈ Objects: {', '.join(r['detections'])}\n")
                # Always end an entry with a blank line, not only when it
                # has detections, so consecutive entries stay separated.
                parts.append("\n")

            return "".join(parts), gr.update(visible=False), gr.update(visible=False)

        if status == "refining":
            question = result["question"]
            options = result["options"]
            options_md = (
                f"### π³ Narrowing down... ({result['count']} remaining)\n\n"
                f"**{question}**\n"
            )
            return (
                options_md,
                gr.update(visible=True, value=options_md),
                gr.update(visible=True, choices=options, value=None),
            )

        # Unknown status: return a well-formed tuple instead of falling off
        # the end with None, which the three-output event cannot consume.
        return (
            f"β Refinement failed: unexpected status {status!r}",
            gr.update(visible=False),
            gr.update(visible=False),
        )
    except Exception as e:  # UI boundary: surface the error as a status message
        return f"β Refinement failed: {str(e)}", gr.update(visible=False), gr.update(visible=False)
def generate_rag_answer(query: str) -> str:
    """Answer ``query`` with Gemini, grounded in the latest search hits.

    Uses at most the first 15 entries of the module-level ``current_results``
    (each converted with ``to_dict()``) as context. Returns markdown: either
    the answer under a heading, or a message explaining why none was produced.
    """
    nothing_to_use = pipeline is None or not current_results
    if nothing_to_use:
        return "β No search results to generate answer from. Search first!"

    try:
        top_hits = current_results[:15]  # Top 15 as context
        contexts = [hit.to_dict() for hit in top_hits]
        answer = pipeline.gemini.generate_rag_answer(query, contexts)
        return f"## π€ RAG Answer\n\n{answer}"
    except Exception as e:
        return f"β RAG generation failed: {str(e)}"
def get_timestamp_link(video_path, timestamp_sec):
    """Return the label text ``Jump to <N>s`` for a timestamp.

    ``timestamp_sec`` is converted with ``int()``, so fractional seconds are
    truncated. ``video_path`` is accepted but not used; the result is plain
    text, not a hyperlink.
    """
    whole_seconds = int(timestamp_sec)
    return "Jump to " + str(whole_seconds) + "s"
# -- Build Gradio interface ---------------------------------------------------

# Markdown shown at the top of the page. Kept at column 0 so no indentation
# leaks into the rendered text. The blank line before `---` matters: without
# it Markdown reads the rule as a setext underline and turns the preceding
# paragraph into a heading.
_INTRO_MARKDOWN = """
# π¬ Video Intelligence Platform
### Akinator-style Video Search with RAG

**Upload a video β Index it β Search with natural language β Get exact timestamps**

Supports: boolean queries ("red car AND person"), attribute search ("person in white clothes"),
and interactive tree-based refinement when too many results are found.

---
"""

# Markdown for the collapsible "Architecture Details" panel. Blank lines
# separate the sections so a heading line is not absorbed into the list item
# above it; the sub-bullets are indented so they nest under step 2.
_ARCHITECTURE_MARKDOWN = """
### How it works:

**Indexing Pipeline:**
1. **Frame Extraction** β Extract frames at 1 FPS using OpenCV
2. **Object Detection** β Grounding DINO detects objects with attributes (colors, clothing, sizes)
3. **Visual Embeddings** β SigLIP2 embeds each frame into a 1152-dim vector
4. **Captioning** β Gemini 2.0 Flash generates detailed captions per frame
5. **Caption Embeddings** β Gemini text-embedding-004 embeds captions into 768-dim vectors
6. **Storage** β SQLite (structured) + FAISS (vectors)

**Search Pipeline:**
1. **Query Decomposition** β Gemini splits boolean queries ("A AND B") into sub-queries
2. **Multi-Channel Search:**
   - Visual: SigLIP2 textβframe similarity (FAISS)
   - Caption: Gemini embedding textβcaption similarity (FAISS)
   - Detection: SQL structured search on object labels
3. **Score Fusion** β Weighted merge across channels
4. **Boolean Ops** β AND (timestamp intersection), OR (union)

**Akinator Refinement:**
- When too many results found, uses information-gain-based feature splitting
- Asks discriminative questions (indoor/outdoor? day/night? etc.)
- Each answer narrows results like a decision tree

**RAG Generation:**
- Retrieved contexts β Gemini 2.0 Flash β grounded answer with timestamp citations

**Models Used:**

| Component | Model |
|---|---|
| Frame Embeddings | SigLIP2 (google/siglip2-so400m-patch14-384) |
| Object Detection | Grounding DINO (IDEA-Research/grounding-dino-tiny) |
| Captioning | Gemini 2.0 Flash |
| Text Embeddings | Gemini text-embedding-004 |
| Query/RAG | Gemini 2.0 Flash |
"""


def _requests_visible(update) -> bool:
    """Return True when a ``gr.update(...)`` payload sets ``visible=True``."""
    return isinstance(update, dict) and bool(update.get("visible"))


def _search_and_toggle_refine(query):
    """Run ``search_video`` and append a visibility update for the Refine button."""
    results_md, rag_md, question_update, choices_update = search_video(query)
    # The button is useful exactly when the option list is on screen.
    refine_update = gr.update(visible=_requests_visible(choices_update))
    return results_md, rag_md, question_update, choices_update, refine_update


def _refine_and_toggle_refine(choice, query):
    """Run ``refine_results`` and append a visibility update for the Refine button."""
    if not choice:
        # Nothing selected yet: leave every widget exactly as it is.
        return gr.update(), gr.update(), gr.update(), gr.update()
    outcome = refine_results(choice, query)
    if outcome is None:
        # Defensive: the handler produced no result; do not break the event.
        return gr.update(), gr.update(), gr.update(), gr.update()
    text_md, question_update, choices_update = outcome
    refine_update = gr.update(visible=_requests_visible(choices_update))
    return text_md, question_update, choices_update, refine_update


def create_ui():
    """Create the full Gradio interface and return the ``gr.Blocks`` app.

    The page has five parts: API-key setup, video upload and indexing, search
    with an optional RAG answer, Akinator-style refinement, and static help
    (example queries plus an architecture accordion). The Refine button is
    wired to ``refine_results`` and shown whenever the refinement options are
    shown; without that wiring the refinement flow cannot be reached from the
    page.
    """
    with gr.Blocks(
        title="π¬ Video Intelligence Platform",
    ) as app:
        gr.Markdown(_INTRO_MARKDOWN)

        # -- Setup section ---------------------------------------------------
        with gr.Row():
            with gr.Column(scale=2):
                api_key_input = gr.Textbox(
                    label="π Gemini API Key",
                    type="password",
                    placeholder="Enter your Gemini API key...",
                    info="Get one free at https://aistudio.google.com/apikey",
                )
                init_btn = gr.Button("π Initialize System", variant="primary")
                init_status = gr.Markdown("")
        init_btn.click(initialize_system, inputs=[api_key_input], outputs=[init_status])
        gr.Markdown("---")

        # -- Video upload & indexing -----------------------------------------
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(label="πΉ Upload Video")
                caption_frequency = gr.Slider(
                    minimum=1, maximum=10, value=3, step=1,
                    label="Caption every Nth frame",
                    info="Lower = more detailed but slower (uses Gemini API calls)",
                )
                index_btn = gr.Button("π Index Video", variant="primary")
            with gr.Column(scale=1):
                index_status = gr.Markdown("Upload a video and click 'Index Video' to start.")
                video_display = gr.Video(label="π₯ Indexed Video", interactive=False, visible=True)
        index_btn.click(
            index_video,
            inputs=[video_input, caption_frequency],
            outputs=[index_status, video_display],
        )
        gr.Markdown("---")

        # -- Search section --------------------------------------------------
        with gr.Row():
            with gr.Column(scale=2):
                query_input = gr.Textbox(
                    label="π Search Query",
                    placeholder='Try: "person wearing white clothes", "red car AND bicycle", "outdoor night scene"',
                    lines=2,
                )
                with gr.Row():
                    search_btn = gr.Button("π Search", variant="primary")
                    rag_btn = gr.Button("π€ Generate RAG Answer", variant="secondary")

        # -- Results section -------------------------------------------------
        with gr.Row():
            with gr.Column(scale=2):
                results_display = gr.Markdown(
                    "Results will appear here after searching.",
                    elem_classes=["results-box"],
                )
                rag_answer = gr.Markdown("")

        # -- Akinator refinement section -------------------------------------
        akinator_question = gr.Markdown("", visible=False)
        akinator_choices = gr.Radio(
            choices=[], label="Select an option to narrow down results",
            visible=False,
        )
        refine_btn = gr.Button("π³ Refine", visible=False)

        search_btn.click(
            _search_and_toggle_refine,
            inputs=[query_input],
            outputs=[results_display, rag_answer, akinator_question, akinator_choices, refine_btn],
        )
        refine_btn.click(
            _refine_and_toggle_refine,
            inputs=[akinator_choices, query_input],
            outputs=[results_display, akinator_question, akinator_choices, refine_btn],
        )
        rag_btn.click(
            generate_rag_answer,
            inputs=[query_input],
            outputs=[rag_answer],
        )

        # -- Example queries -------------------------------------------------
        gr.Markdown("---")
        gr.Markdown("### π‘ Example Queries")
        gr.Examples(
            examples=[
                ["person wearing white clothes"],
                ["red car"],
                ["person AND car"],
                ["dog OR cat"],
                ["outdoor scene at night"],
                ["short girl with a bag"],
                ["crowd of people walking"],
            ],
            inputs=[query_input],
        )

        # -- Architecture info -----------------------------------------------
        with gr.Accordion("ποΈ Architecture Details", open=False):
            gr.Markdown(_ARCHITECTURE_MARKDOWN)

    return app
def main():
    """Build the UI and serve it on 0.0.0.0:7860 without a public share link."""
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    ui = create_ui()
    ui.launch(**launch_options)


if __name__ == "__main__":
    main()