Spaces:
Sleeping
Sleeping
| """ | |
| Gemini AI Client for Chat Search | |
| Uses Gemini 1.5 Flash to summarize search results and answer questions. | |
| """ | |
| import os | |
| from typing import List, Dict, Optional | |
# Try importing Google GenAI (new package).
# The import is optional: when the package is absent we record that fact
# instead of crashing at import time, and GeminiClient reports unavailability.
try:
    from google import genai
    # Flag consumed by GeminiClient._initialize() to gate API usage.
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False
class GeminiClient:
    """Lazily-initialized client for the Gemini AI API.

    The API key comes from the constructor argument or the
    ``GEMINI_API_KEY`` environment variable. Initialization is deferred
    until the first call that needs the API, so constructing this object
    never fails.
    """

    def __init__(self, api_key: Optional[str] = None):
        # Explicit key wins; otherwise fall back to the environment.
        self.api_key = api_key or os.environ.get('GEMINI_API_KEY')
        self.client = None          # set by _initialize() on success
        self._initialized = False   # guards against repeated init attempts

    def _initialize(self) -> bool:
        """Initialize the Gemini client once.

        Returns:
            True if the client is ready to use, False otherwise
            (package missing, key missing, or construction failed).
        """
        if self._initialized:
            return True
        if not HAS_GEMINI:
            print("google-genai not installed")
            return False
        if not self.api_key:
            print("GEMINI_API_KEY not set")
            return False
        try:
            self.client = genai.Client(api_key=self.api_key)
            self._initialized = True
            print("Gemini client initialized")
            return True
        except Exception as e:
            # Best-effort: report and degrade to "unavailable" rather than raise.
            print(f"Failed to initialize Gemini: {e}")
            return False

    def answer_from_context(self, query: str, search_results: List[Dict],
                            max_results: int = 5) -> Dict:
        """
        Generate an answer based on search results.

        Args:
            query: User's question
            search_results: List of search results with context
            max_results: Max results to include in context

        Returns:
            Dict with 'answer', 'sources', and 'success'
        """
        if not self._initialize():
            return {
                'success': False,
                'error': 'Gemini not available',
                'answer': None
            }

        # Build context from search results. Each result may contribute
        # several entries to context_parts (header + surrounding context),
        # but exactly one entry to sources.
        context_parts = []
        sources = []
        for i, result in enumerate(search_results[:max_results]):
            # Handle different result formats
            if 'message' in result:
                # search_with_context format
                msg = result['message']
                context_parts.append(f"""
--- 转讜爪讗讛 {i+1} (爪讬讜谉: {result.get('score', 0):.2f}) ---
诪讗转: {msg.get('from_name', '诇讗 讬讚讜注')}
转讗专讬讱: {msg.get('date', '诇讗 讬讚讜注')}
讛讜讚注讛: {msg.get('text', '')}
""")
                sources.append({
                    'from_name': msg.get('from_name'),
                    'date': msg.get('date'),
                    'message_id': result.get('message_id')
                })
                # Add surrounding conversation context if available
                # (messages are truncated to 100 chars to bound prompt size).
                if result.get('context_before'):
                    context_parts.append("讛拽砖专 诇驻谞讬:")
                    for ctx in result['context_before']:
                        context_parts.append(f" [{ctx.get('from_name', '?')}] {ctx.get('text_plain', '')[:100]}")
                if result.get('context_after'):
                    context_parts.append("讛拽砖专 讗讞专讬:")
                    for ctx in result['context_after']:
                        context_parts.append(f" [{ctx.get('from_name', '?')}] {ctx.get('text_plain', '')[:100]}")
            elif 'chunk_text' in result:
                # hybrid_search format
                context_parts.append(f"""
--- 转讜爪讗讛 {i+1} (爪讬讜谉: {result.get('score', 0):.2f}) ---
{result.get('chunk_text', '')}
""")
                sources.append({
                    'message_id': result.get('message_id'),
                    'score': result.get('score')
                })

        context = "\n".join(context_parts)

        # Build prompt (Hebrew instructions: answer briefly, cite sources,
        # do not invent information absent from the results).
        prompt = f"""讗转讛 注讜讝专 砖诪谞转讞 砖讬讞讜转 诪拽讘讜爪转 讟诇讙专诐 讜注讜谞讛 注诇 砖讗诇讜转.
讛砖讗诇讛: {query}
诇讛诇谉 转讜爪讗讜转 讞讬驻讜砖 专诇讜讜谞讟讬讜转 诪讛砖讬讞讜转:
{context}
讛谞讞讬讜转:
1. 注谞讛 讘注讘专讬转
2. 转谉 转砖讜讘讛 拽爪专讛 讜诪诪讜拽讚转 (1-3 诪砖驻讟讬诐)
3. 讗诐 讛诪讬讚注 诇讗 讘专讜专 讗讜 诇讗 拽讬讬诐 讘转讜爪讗讜转, 讗诪讜专 "诇讗 诪爪讗转讬 诪讬讚注 讘专讜专"
4. 爪讬讬谉 讗转 讛诪拽讜专 (砖诐 讛砖讜诇讞 讜讛转讗专讬讱) 讗诐 专诇讜讜谞讟讬
5. 讗诇 转诪爪讬讗 诪讬讚注 砖诇讗 诪讜驻讬注 讘转讜爪讗讜转
讛转砖讜讘讛:"""

        try:
            response = self.client.models.generate_content(
                model='gemini-2.5-flash',
                contents=prompt
            )
            # FIX: response.text can be None (e.g. blocked or empty
            # candidates); calling .strip() on None raised AttributeError
            # which was misreported as a generic failure below.
            answer = (response.text or '').strip()
            return {
                'success': True,
                'answer': answer,
                'sources': sources,
                'query': query,
                # FIX: was len(context_parts), which over-counts because a
                # single result appends multiple parts; len(sources) is the
                # actual number of results included in the prompt.
                'results_used': len(sources)
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'answer': None
            }

    def is_available(self) -> bool:
        """Check if Gemini is available (attempts initialization)."""
        return self._initialize()
# Lazily-created module-level singleton.
_gemini_client = None


def get_gemini_client() -> GeminiClient:
    """Return the shared GeminiClient, creating it on first access."""
    global _gemini_client
    if _gemini_client is not None:
        return _gemini_client
    _gemini_client = GeminiClient()
    return _gemini_client
def ai_search(query: str, limit: int = 5) -> Dict:
    """
    Perform AI-powered search: hybrid search + Gemini summarization.

    Args:
        query: Search query
        limit: Max results to use

    Returns:
        Dict with answer and metadata
    """
    from hybrid_search import get_hybrid_search

    # Step 1: retrieve candidate messages with surrounding context.
    searcher = get_hybrid_search()
    hits = searcher.search_with_context(query, limit=limit)

    # Guard clause: nothing retrieved means nothing to summarize.
    if not hits:
        return {
            'success': False,
            'error': 'No search results found',
            'answer': '诇讗 谞诪爪讗讜 转讜爪讗讜转 诇讞讬驻讜砖 讝讛',
            'query': query
        }

    # Step 2: let Gemini produce an answer grounded in the hits.
    answer_payload = get_gemini_client().answer_from_context(
        query, hits, max_results=limit
    )

    # Attach the raw hits so callers can audit the answer.
    answer_payload['search_results'] = hits
    return answer_payload
# CLI for testing
if __name__ == '__main__':
    import sys

    # No query given: print usage, report availability, and exit cleanly.
    if len(sys.argv) < 2:
        print("Usage: python gemini_client.py 'search query'")
        print("\nChecking Gemini availability...")
        status = (
            "Gemini is available!"
            if get_gemini_client().is_available()
            else "Gemini is NOT available. Set GEMINI_API_KEY environment variable."
        )
        print(status)
        sys.exit(0)

    query = ' '.join(sys.argv[1:])
    print(f"\n=== AI Search: {query} ===\n")

    result = ai_search(query)
    if not result['success']:
        print(f"Error: {result.get('error')}")
    else:
        print(f"Answer: {result['answer']}")
        print(f"\nSources: {len(result.get('sources', []))} results used")