Spaces:
Sleeping
Sleeping
| """ | |
| AI-Powered Search for Telegram Analytics | |
| Supports: Ollama (local), Groq (free API), Google Gemini (free API) | |
| """ | |
| import sqlite3 | |
| import json | |
| import re | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional | |
| import os | |
# Optional AI backends: each HAS_* flag records whether the corresponding
# client library could be imported. Providers are gated on these flags later.
try:
    import requests
except ImportError:
    HAS_REQUESTS = False
else:
    HAS_REQUESTS = True

try:
    from groq import Groq
except ImportError:
    HAS_GROQ = False
else:
    HAS_GROQ = True

try:
    import google.generativeai as genai
except ImportError:
    HAS_GEMINI = False
else:
    HAS_GEMINI = True
class AISearchEngine:
    """AI-powered natural language search for Telegram messages."""

    def __init__(self, db_path: str, provider: str = "ollama", api_key: Optional[str] = None):
        """
        Initialize AI search engine.

        Args:
            db_path: Path to SQLite database
            provider: "ollama", "groq", or "gemini"
            api_key: API key for Groq or Gemini (not needed for Ollama)

        Raises:
            ValueError: If the provider is unknown or its library is not installed.
        """
        self.db_path = db_path
        self.provider = provider
        # Fall back to GROQ_API_KEY / GEMINI_API_KEY / OLLAMA_API_KEY from the environment.
        self.api_key = api_key or os.getenv(f"{provider.upper()}_API_KEY")
        # Initialize provider
        if provider == "groq" and HAS_GROQ:
            self.client = Groq(api_key=self.api_key)
            self.model = "llama-3.1-70b-versatile"
        elif provider == "gemini" and HAS_GEMINI:
            genai.configure(api_key=self.api_key)
            # Using 2.5 Flash - free tier, fast, good for SQL
            # NOTE(review): gemini path sets no self.model; _call_gemini does not need it.
            self.client = genai.GenerativeModel("gemini-2.5-flash")
        elif provider == "ollama":
            self.ollama_url = os.getenv("OLLAMA_URL", "http://localhost:11434")
            self.model = os.getenv("OLLAMA_MODEL", "llama3.1")
        else:
            raise ValueError(f"Provider {provider} not available. Install required packages.")

    def _get_db_schema(self) -> str:
        """Dynamically read schema from the actual database to stay in sync.

        Returns a human-readable schema summary (tables, columns, FTS5 tables,
        plus hand-written usage notes) that is embedded into the LLM prompt.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Get all tables and their columns
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")
        tables = [row[0] for row in cursor.fetchall()]
        schema_parts = ["Database Schema:"]
        for table in tables:
            # Table names come from sqlite_master (not user input); PRAGMA
            # does not accept bound parameters, hence the f-string.
            cursor.execute(f"PRAGMA table_info({table})")
            cols = cursor.fetchall()
            # c[1] = column name, c[2] = declared type (PRAGMA table_info layout).
            col_names = [f"{c[1]} ({c[2]})" for c in cols]
            schema_parts.append(f" - {table}: {', '.join(col_names)}")
        # Note virtual tables (FTS5) separately
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND sql LIKE '%fts5%'")
        fts_tables = [row[0] for row in cursor.fetchall()]
        if fts_tables:
            schema_parts.append(f"\n FTS5 tables (use MATCH for search): {', '.join(fts_tables)}")
        conn.close()
        # NOTE(review): this note uses "SELECT rowid FROM messages_fts" while the
        # prompt rule in _build_prompt uses "SELECT id" - confirm which matches
        # the actual FTS table definition.
        schema_parts.append("""
Key notes:
- date_unixtime: Unix timestamp (INTEGER), use for date comparisons
- date: ISO format string (TEXT), use for display
- text_plain: Message text content
- text_length: Character count of the message
- has_links: 1 if message contains URL, 0 otherwise (note: plural)
- has_media: 1 if message has any media attachment
- has_photo: 1 if message has a photo specifically
- from_id: TEXT user ID (e.g., 'user356173100')
- For text search: SELECT * FROM messages WHERE id IN (SELECT rowid FROM messages_fts WHERE messages_fts MATCH 'term')
""")
        return '\n'.join(schema_parts)

    def _get_sample_data(self) -> str:
        """Get sample data for context.

        Returns a short summary (top-10 posters by message count plus the
        overall date range) that is embedded into the LLM prompt.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Get user list
        cursor.execute("""
            SELECT from_name, COUNT(*) as cnt
            FROM messages
            WHERE from_name IS NOT NULL
            GROUP BY from_name
            ORDER BY cnt DESC
            LIMIT 10
        """)
        users = cursor.fetchall()
        # Get date range
        cursor.execute("SELECT MIN(date), MAX(date) FROM messages")
        date_range = cursor.fetchone()
        conn.close()
        return f"""
Top users: {', '.join([u[0] for u in users])}
Date range: {date_range[0]} to {date_range[1]}
"""

    def _build_prompt(self, user_query: str) -> str:
        """Build the text-to-SQL prompt for the AI model.

        Combines live schema, sample data, and fixed generation rules with the
        user's natural-language question.
        """
        schema = self._get_db_schema()
        sample = self._get_sample_data()
        return f"""You are a SQL query generator for a Telegram chat database.
Your task is to convert natural language questions into SQLite queries.
{schema}
{sample}
IMPORTANT RULES:
1. Return ONLY valid SQLite query, no explanations
2. For text search, use: SELECT * FROM messages WHERE id IN (SELECT id FROM messages_fts WHERE messages_fts MATCH 'search_term')
3. For Hebrew text, the FTS5 will handle it correctly
4. Always include relevant columns like date, from_name, text_plain
5. Limit results to 50 unless specified
6. For "who" questions, GROUP BY from_name and COUNT(*)
7. For "when" questions, include date in SELECT
8. For threads/replies, JOIN messages m2 ON m1.reply_to_message_id = m2.id
User question: {user_query}
SQLite query:"""

    def _call_ollama(self, prompt: str) -> str:
        """Call the local Ollama HTTP API and return the raw completion text.

        Raises:
            ImportError: If the requests library is not installed.
            requests.HTTPError: On a non-2xx response (via raise_for_status).
        """
        if not HAS_REQUESTS:
            raise ImportError("requests library required for Ollama")
        response = requests.post(
            f"{self.ollama_url}/api/generate",
            json={
                "model": self.model,
                "prompt": prompt,
                # stream=False: get the whole completion in one JSON payload.
                "stream": False,
                "options": {
                    # Low temperature for deterministic SQL generation.
                    "temperature": 0.1,
                    "num_predict": 500
                }
            },
            timeout=60
        )
        response.raise_for_status()
        return response.json()["response"]

    def _call_groq(self, prompt: str) -> str:
        """Call the Groq chat-completions API and return the completion text."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=500
        )
        return response.choices[0].message.content

    def _call_gemini(self, prompt: str) -> str:
        """Call the Google Gemini API and return the completion text."""
        response = self.client.generate_content(prompt)
        return response.text

    def _generate_sql(self, user_query: str) -> str:
        """Generate a SQLite SELECT statement from a natural language query.

        Raises:
            ValueError: If the provider is unknown or the model returned
                something that does not start with SELECT (safety guard).
        """
        prompt = self._build_prompt(user_query)
        if self.provider == "ollama":
            response = self._call_ollama(prompt)
        elif self.provider == "groq":
            response = self._call_groq(prompt)
        elif self.provider == "gemini":
            response = self._call_gemini(prompt)
        else:
            raise ValueError(f"Unknown provider: {self.provider}")
        # Extract SQL from response
        sql = response.strip()
        # Clean up common issues - handle various code block formats
        sql = re.sub(r'^```\w*\s*', '', sql)  # Remove opening ```sql or ```
        sql = re.sub(r'\s*```$', '', sql)  # Remove closing ```
        sql = re.sub(r'^```', '', sql, flags=re.MULTILINE)  # Remove any remaining ```
        sql = sql.strip()
        # Try to extract SELECT statement if there's text before it
        select_match = re.search(r'(SELECT\s+.+?)(?:;|$)', sql, re.IGNORECASE | re.DOTALL)
        if select_match:
            sql = select_match.group(1).strip()
        # Ensure it's a SELECT query for safety
        if not sql.upper().startswith("SELECT"):
            raise ValueError(f"AI generated non-SELECT query: {sql[:100]}")
        return sql

    def _execute_sql(self, sql: str) -> List[Dict[str, Any]]:
        """Execute SQL and return results as list of dicts.

        On a sqlite3 error, returns a single-element list containing an
        {"error": ..., "sql": ...} dict instead of raising.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
            results = [dict(row) for row in rows]
        except sqlite3.Error as e:
            # Surface the failure as data so callers can show it to the user.
            results = [{"error": str(e), "sql": sql}]
        finally:
            conn.close()
        return results

    def _generate_answer(self, user_query: str, results: List[Dict], sql: str) -> str:
        """Generate a natural language (Hebrew) answer from query results.

        Args:
            user_query: The original question.
            results: Rows returned by _execute_sql (may carry an "error" entry).
            sql: The generated SQL (currently unused here, kept for context).
        """
        if not results:
            return "לא נמצאו תוצאות."
        if "error" in results[0]:
            return f"שגיאה בשאילתה: {results[0]['error']}"
        # Build answer prompt - only the first 20 rows to bound prompt size.
        results_str = json.dumps(results[:20], ensure_ascii=False, indent=2)
        answer_prompt = f"""Based on the following query results, provide a concise answer in Hebrew.
User question: {user_query}
Query results (JSON):
{results_str}
Total results: {len(results)}
Provide a helpful, concise answer in Hebrew. Include specific names, dates, and numbers from the results.
If showing a list, format it nicely. Keep it brief but informative."""
        # NOTE(review): no else-branch - if self.provider were somehow none of
        # these three, 'answer' would be unbound (NameError). __init__ currently
        # prevents that, but an explicit else would be safer.
        if self.provider == "ollama":
            answer = self._call_ollama(answer_prompt)
        elif self.provider == "groq":
            answer = self._call_groq(answer_prompt)
        elif self.provider == "gemini":
            answer = self._call_gemini(answer_prompt)
        return answer

    def context_search(self, query: str, user_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Hybrid context-aware search - combines FTS5 keyword search with AI reasoning.

        1. AI extracts user name and relevant keywords from query
        2. FTS5 finds messages matching keywords (fast, searches ALL messages)
        3. AI reads relevant messages and reasons to find the answer

        Example: "באיזה בית חולים האחות עובדת?"
        - Extracts: user="האחות", keywords=["בית חולים", "עבודה", "מחלקה", "סורוקה", ...]
        - FTS5 finds messages from האחות containing these keywords
        - AI reads and infers the answer

        Returns a dict with the answer and retrieval metadata; on any failure
        returns {"query", "error", "mode"} instead of raising.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            # Step 1: AI extracts user name AND relevant keywords
            extract_prompt = f"""Analyze this question and extract:
1. USER_NAME: The specific person being asked about (or NONE if not about a specific person)
2. KEYWORDS: Hebrew keywords to search for in their messages (related to the question topic)
Question: {query}
Return in this exact format (one per line):
USER_NAME: <name or NONE>
KEYWORDS: <comma-separated keywords in Hebrew>
Example for "באיזה בית חולים האחות עובדת?":
USER_NAME: האחות
KEYWORDS: בית חולים, עבודה, מחלקה, סורוקה, רמבם, איכילוב, שיבא, הדסה, טיפול נמרץ, אחות
Extract:"""
            if self.provider == "gemini":
                extraction = self._call_gemini(extract_prompt).strip()
            elif self.provider == "groq":
                extraction = self._call_groq(extract_prompt).strip()
            else:
                extraction = self._call_ollama(extract_prompt).strip()
            # Parse extraction
            # NOTE(review): the user_name parameter is effectively ignored - it
            # is unconditionally reset here and re-derived from the AI output.
            user_name = None
            keywords = []
            for line in extraction.split('\n'):
                if line.startswith('USER_NAME:'):
                    name = line.replace('USER_NAME:', '').strip()
                    # Length cap guards against the model echoing a whole sentence.
                    if name.upper() != 'NONE' and len(name) < 50:
                        user_name = name
                elif line.startswith('KEYWORDS:'):
                    kw_str = line.replace('KEYWORDS:', '').strip()
                    keywords = [k.strip() for k in kw_str.split(',') if k.strip()]
            messages = []
            # Step 2: Hybrid retrieval - FTS5 keyword search + recent messages
            if user_name and keywords:
                # Build FTS5 query for keywords
                fts_query = ' OR '.join(keywords[:10])  # Limit to 10 keywords
                # Search for messages from user containing keywords
                cursor = conn.execute("""
                    SELECT date, from_name, text
                    FROM messages
                    WHERE from_name LIKE ?
                    AND id IN (SELECT id FROM messages_fts WHERE messages_fts MATCH ?)
                    ORDER BY date DESC
                    LIMIT 100
                """, (f"%{user_name}%", fts_query))
                messages = [dict(row) for row in cursor.fetchall()]
                # Also add some recent messages for context (might contain relevant info without keywords)
                cursor = conn.execute("""
                    SELECT date, from_name, text
                    FROM messages
                    WHERE from_name LIKE ?
                    ORDER BY date DESC
                    LIMIT 50
                """, (f"%{user_name}%",))
                recent = [dict(row) for row in cursor.fetchall()]
                # Combine and deduplicate (dedup key is the message text).
                seen_texts = {m['text'] for m in messages if m['text']}
                for m in recent:
                    if m['text'] and m['text'] not in seen_texts:
                        messages.append(m)
                        seen_texts.add(m['text'])
            elif user_name:
                # No keywords, just get user's messages
                cursor = conn.execute("""
                    SELECT date, from_name, text
                    FROM messages
                    WHERE from_name LIKE ?
                    ORDER BY date DESC
                    LIMIT 200
                """, (f"%{user_name}%",))
                messages = [dict(row) for row in cursor.fetchall()]
            elif keywords:
                # No user, search all messages for keywords
                fts_query = ' OR '.join(keywords[:10])
                cursor = conn.execute("""
                    SELECT date, from_name, text
                    FROM messages
                    WHERE id IN (SELECT id FROM messages_fts WHERE messages_fts MATCH ?)
                    ORDER BY date DESC
                    LIMIT 100
                """, (fts_query,))
                messages = [dict(row) for row in cursor.fetchall()]
            else:
                # Fallback: recent messages
                cursor = conn.execute("""
                    SELECT date, from_name, text
                    FROM messages
                    WHERE text IS NOT NULL AND text != ''
                    ORDER BY date DESC
                    LIMIT 100
                """)
                messages = [dict(row) for row in cursor.fetchall()]
            conn.close()
            if not messages:
                return {
                    "query": query,
                    "answer": "לא נמצאו הודעות רלוונטיות",
                    "context_messages": 0,
                    "keywords_used": keywords,
                    "mode": "context_search"
                }
            # Step 3: AI reasons over the retrieved messages
            # Each message is truncated to 500 chars to bound prompt size.
            context_text = "\n".join([
                f"[{m['date']}] {m['from_name']}: {m['text'][:500]}"
                for m in messages if m['text']
            ])
            reason_prompt = f"""You are analyzing a Telegram chat history to answer a question.
Read the messages carefully and infer the answer from context clues.
The user may not have stated things directly - look for hints, mentions, and implications.
Question: {query}
Chat messages (sorted by relevance and date):
{context_text}
Based on these messages, answer the question in Hebrew.
If you can infer information (like workplace, location, profession) from context clues, do so.
Cite specific messages when possible.
If you truly cannot find any relevant information, say so.
Answer:"""
            if self.provider == "gemini":
                answer = self._call_gemini(reason_prompt)
            elif self.provider == "groq":
                answer = self._call_groq(reason_prompt)
            else:
                answer = self._call_ollama(reason_prompt)
            return {
                "query": query,
                "answer": answer,
                "context_user": user_name,
                "context_messages": len(messages),
                "keywords_used": keywords,
                "mode": "context_search"
            }
        except Exception as e:
            # NOTE(review): broad catch keeps the API total; the connection may
            # be leaked if the exception fires before conn.close() above.
            return {
                "query": query,
                "error": f"Context search error: {str(e)}",
                "mode": "context_search"
            }

    def search(self, query: str, generate_answer: bool = True) -> Dict[str, Any]:
        """
        Perform AI-powered search.

        Args:
            query: Natural language question in Hebrew or English
            generate_answer: Whether to generate natural language answer

        Returns:
            Dict with sql, results, and optionally answer
        """
        try:
            # Generate SQL
            sql = self._generate_sql(query)
            # Execute query
            results = self._execute_sql(sql)
            response = {
                "query": query,
                "sql": sql,
                "results": results,
                "count": len(results)
            }
            # Generate natural language answer (skipped if the SQL errored out).
            if generate_answer and results and "error" not in results[0]:
                response["answer"] = self._generate_answer(query, results, sql)
            return response
        except Exception as e:
            return {
                "query": query,
                "error": str(e),
                "results": []
            }

    def get_thread(self, message_id: int) -> List[Dict[str, Any]]:
        """Get full conversation thread for a message.

        Walks the reply chain upward to the root, then collects all replies
        downward; the result is sorted chronologically.

        Note: both walks are recursive; a pathologically deep thread could hit
        Python's recursion limit.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        thread = []
        visited = set()  # message_ids already collected, guards against cycles

        def get_parent(msg_id):
            """Recursively get parent messages."""
            if msg_id in visited:
                return
            visited.add(msg_id)
            cursor.execute("""
                SELECT message_id, date, from_name, text, reply_to_message_id
                FROM messages WHERE message_id = ?
            """, (msg_id,))
            row = cursor.fetchone()
            if row:
                if row['reply_to_message_id']:
                    get_parent(row['reply_to_message_id'])
                thread.append(dict(row))

        def get_children(msg_id):
            """Get all replies to a message."""
            cursor.execute("""
                SELECT message_id, date, from_name, text, reply_to_message_id
                FROM messages WHERE reply_to_message_id = ?
                ORDER BY date
            """, (msg_id,))
            for row in cursor.fetchall():
                if row['message_id'] not in visited:
                    visited.add(row['message_id'])
                    thread.append(dict(row))
                    get_children(row['message_id'])

        # Get the original message and its parents
        get_parent(message_id)
        # Get all replies
        get_children(message_id)
        conn.close()
        # Sort by date
        thread.sort(key=lambda x: x['date'])
        return thread

    def find_similar_messages(self, message_id: int, limit: int = 10) -> List[Dict[str, Any]]:
        """Find messages similar to the given message using trigrams.

        Implementation detail: similarity is approximated by an FTS5 OR-query
        over the first 5 words of the source message (not true trigrams).
        Returns [] if the message does not exist or has no text.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # Get the original message
        cursor.execute("SELECT text FROM messages WHERE message_id = ?", (message_id,))
        row = cursor.fetchone()
        if not row or not row['text']:
            return []
        # Use FTS5 to find similar messages
        words = row['text'].split()[:5]  # Use first 5 words
        search_term = ' OR '.join(words)
        cursor.execute("""
            SELECT m.message_id, m.date, m.from_name, m.text
            FROM messages m
            WHERE m.id IN (
                SELECT id FROM messages_fts
                WHERE messages_fts MATCH ?
            )
            AND m.message_id != ?
            LIMIT ?
        """, (search_term, message_id, limit))
        results = [dict(row) for row in cursor.fetchall()]
        conn.close()
        return results
class ChatViewer:
    """Read-only, Telegram-like viewer over the messages database.

    Fixes over the original:
    - get_messages built filter conditions with unqualified column names
      (e.g. "from_id = ?") but applied them to a self-join of
      ``messages m LEFT JOIN messages r`` - SQLite rejects that with
      "ambiguous column name" as soon as any filter is used. Conditions are
      now qualified with the ``m.`` alias (and the COUNT query aliased to
      match).
    - Every method now closes its SQLite connection in a ``finally`` block,
      so a failing query (e.g. a malformed FTS5 MATCH string) no longer
      leaks the connection.
    """

    def __init__(self, db_path: str):
        # Path to the SQLite database of exported Telegram messages.
        self.db_path = db_path

    def _connect(self) -> sqlite3.Connection:
        """Open a connection whose rows support dict-style access."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def get_messages(self,
                     offset: int = 0,
                     limit: int = 50,
                     user_id: Optional[str] = None,
                     search: Optional[str] = None,
                     date_from: Optional[str] = None,
                     date_to: Optional[str] = None,
                     has_media: Optional[bool] = None,
                     has_link: Optional[bool] = None) -> Dict[str, Any]:
        """
        Get messages with Telegram-like pagination.

        Returns messages in reverse chronological order (newest first).

        Args:
            offset: Number of rows to skip.
            limit: Page size.
            user_id: Filter by sender's from_id.
            search: FTS5 MATCH expression applied via messages_fts.
            date_from: Inclusive lower bound on the date column (string compare).
            date_to: Inclusive upper bound on the date column.
            has_media: True -> media_type NOT NULL; False -> media_type NULL.
            has_link: Filter on the has_link flag column (stored as 0/1).

        Returns:
            Dict with "messages", "total", "offset", "limit", "has_more".
        """
        # All conditions are qualified with the "m." alias because the main
        # query self-joins messages; unqualified names would be ambiguous.
        conditions: List[str] = []
        params: List[Any] = []
        if user_id:
            conditions.append("m.from_id = ?")
            params.append(user_id)
        if date_from:
            conditions.append("m.date >= ?")
            params.append(date_from)
        if date_to:
            conditions.append("m.date <= ?")
            params.append(date_to)
        if has_media is not None:
            # Media presence is encoded as NULL / NOT NULL, not a flag column.
            conditions.append("m.media_type IS NOT NULL" if has_media else "m.media_type IS NULL")
        if has_link is not None:
            conditions.append("m.has_link = ?")
            params.append(1 if has_link else 0)
        if search:
            conditions.append("""m.id IN (
                SELECT id FROM messages_fts WHERE messages_fts MATCH ?
            )""")
            params.append(search)
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Total matching rows (for pagination metadata). Aliased "m" so
            # the shared where_clause resolves here too.
            cursor.execute(f"SELECT COUNT(*) FROM messages m WHERE {where_clause}", params)
            total = cursor.fetchone()[0]
            # Self-join pulls the replied-to message's author and text.
            cursor.execute(f"""
                SELECT
                    m.message_id,
                    m.date,
                    m.from_id,
                    m.from_name,
                    m.text,
                    m.reply_to_message_id,
                    m.forwarded_from,
                    m.media_type,
                    m.has_link,
                    m.char_count,
                    r.from_name as reply_to_name,
                    r.text as reply_to_text
                FROM messages m
                LEFT JOIN messages r ON m.reply_to_message_id = r.message_id
                WHERE {where_clause}
                ORDER BY m.date DESC
                LIMIT ? OFFSET ?
            """, params + [limit, offset])
            messages = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()
        return {
            "messages": messages,
            "total": total,
            "offset": offset,
            "limit": limit,
            "has_more": offset + limit < total
        }

    def get_message_context(self, message_id: int, before: int = 10, after: int = 10) -> Dict[str, Any]:
        """Get messages around a specific message (for context view).

        Args:
            message_id: Anchor message.
            before: Neighbours to include before the anchor (by date).
            after: Neighbours to include after the anchor.

        Returns:
            Dict with "messages" (chronological, anchor included) and
            "target_id"; "messages" is empty if the anchor does not exist.
        """
        cols = """message_id, date, from_id, from_name, text,
                  reply_to_message_id, media_type, has_link"""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Resolve the anchor's date; bail out if the message is unknown.
            cursor.execute("SELECT date FROM messages WHERE message_id = ?", (message_id,))
            row = cursor.fetchone()
            if not row:
                return {"messages": [], "target_id": message_id}
            target_date = row['date']
            # Neighbours strictly before, fetched newest-first then reversed
            # back into chronological order.
            cursor.execute(f"""
                SELECT {cols}
                FROM messages
                WHERE date < ?
                ORDER BY date DESC
                LIMIT ?
            """, (target_date, before))
            before_msgs = list(reversed([dict(r) for r in cursor.fetchall()]))
            # The anchor itself.
            cursor.execute(f"""
                SELECT {cols}
                FROM messages
                WHERE message_id = ?
            """, (message_id,))
            target_msg = dict(cursor.fetchone())
            # Neighbours strictly after, already chronological.
            cursor.execute(f"""
                SELECT {cols}
                FROM messages
                WHERE date > ?
                ORDER BY date ASC
                LIMIT ?
            """, (target_date, after))
            after_msgs = [dict(r) for r in cursor.fetchall()]
        finally:
            conn.close()
        return {
            "messages": before_msgs + [target_msg] + after_msgs,
            "target_id": message_id
        }

    def get_user_conversation(self, user1_id: str, user2_id: str, limit: int = 100) -> List[Dict]:
        """Get conversation between two users (their direct replies to each other).

        A row qualifies when one user's message is a reply to the other's,
        in either direction. Newest first, capped at `limit`.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT m1.message_id, m1.date, m1.from_id, m1.from_name, m1.text,
                       m1.reply_to_message_id, m2.from_name as reply_to_name
                FROM messages m1
                LEFT JOIN messages m2 ON m1.reply_to_message_id = m2.message_id
                WHERE (m1.from_id = ? AND m2.from_id = ?)
                   OR (m1.from_id = ? AND m2.from_id = ?)
                ORDER BY m1.date DESC
                LIMIT ?
            """, (user1_id, user2_id, user2_id, user1_id, limit))
            return [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()
# Ad-hoc command-line entry point for trying out the search engine.
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser(description="AI-powered Telegram search")
    cli.add_argument("--db", required=True, help="Database path")
    cli.add_argument("--provider", default="ollama", choices=["ollama", "groq", "gemini"])
    cli.add_argument("--query", help="Search query")
    cli.add_argument("--api-key", help="API key for cloud providers")
    opts = cli.parse_args()

    if opts.query:
        # Run one search and dump the SQL, counts, answer, and a result sample.
        result = AISearchEngine(opts.db, opts.provider, opts.api_key).search(opts.query)
        print(f"\nQuery: {result['query']}")
        print(f"SQL: {result.get('sql', 'N/A')}")
        print(f"Results: {result.get('count', 0)}")
        if 'answer' in result:
            print(f"\nAnswer:\n{result['answer']}")
        if result.get('results'):
            print(f"\nFirst 3 results:")
            for entry in result['results'][:3]:
                print(json.dumps(entry, ensure_ascii=False, indent=2))