#!/usr/bin/env python3
"""
Telegram Chat Search Utilities (Optimized)

Features:
- Full-text search with BM25 ranking
- LRU caching for repeated queries
- Fuzzy search with trigram similarity
- Thread traversal with DFS/BFS
- Autocomplete suggestions

Usage:
    python search.py [options]
    python search.py "שלום" --db telegram.db
    python search.py "link" --user user123 --fuzzy
"""

import sqlite3
import argparse
from datetime import datetime
from typing import Optional
from functools import lru_cache

from data_structures import LRUCache, Trie, TrigramIndex, ReplyGraph, lru_cached


class TelegramSearch:
    """
    High-performance search interface for indexed Telegram messages.

    Features:
    - Full-text search with FTS5 and BM25 ranking
    - Query result caching (LRU)
    - Fuzzy/approximate search with trigrams
    - Thread reconstruction with graph traversal
    - Autocomplete for usernames and common terms
    """

    def __init__(self, db_path: str = 'telegram.db', cache_size: int = 1000):
        """Open the database and prepare lazily-built indexes.

        Args:
            db_path: Path to the SQLite database produced by the indexer.
            cache_size: Max number of entries held by the query LRU cache.
        """
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        # Row factory lets us turn rows into dicts by column name.
        self.conn.row_factory = sqlite3.Row

        # In-memory indexes are built on first use (None = not built yet).
        self.query_cache = LRUCache(maxsize=cache_size)
        self.user_trie: Optional[Trie] = None
        self.trigram_index: Optional[TrigramIndex] = None
        self.reply_graph: Optional[ReplyGraph] = None

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    # ==========================================
    # FULL-TEXT SEARCH
    # ==========================================

    def search(
        self,
        query: str,
        user_id: Optional[str] = None,
        from_date: Optional[int] = None,
        to_date: Optional[int] = None,
        has_links: Optional[bool] = None,
        has_mentions: Optional[bool] = None,
        has_media: Optional[bool] = None,
        limit: int = 100,
        offset: int = 0,
        use_cache: bool = True
    ) -> list[dict]:
        """
        Full-text search with BM25 ranking and optional filters.

        Args:
            query: FTS5 query (supports AND, OR, NOT, "phrase", prefix*)
            user_id: Filter by user ID
            from_date: Unix timestamp lower bound
            to_date: Unix timestamp upper bound
            has_links/has_mentions/has_media: Boolean filters
            limit: Max results
            offset: Pagination offset
            use_cache: Whether to use LRU cache

        Returns:
            List of message dicts with relevance scores
        """
        # Cache key covers every argument that affects the result set.
        cache_key = (
            f"search:{query}:{user_id}:{from_date}:{to_date}:"
            f"{has_links}:{has_mentions}:{has_media}:{limit}:{offset}"
        )
        if use_cache:
            cached = self.query_cache.get(cache_key)
            if cached is not None:
                return cached

        # Build parameterized WHERE conditions (never string-interpolate values).
        conditions = []
        params = []

        if user_id:
            conditions.append("m.from_id = ?")
            params.append(user_id)
        # Explicit None checks: a timestamp of 0 (the epoch) is falsy but valid.
        if from_date is not None:
            conditions.append("m.date_unixtime >= ?")
            params.append(from_date)
        if to_date is not None:
            conditions.append("m.date_unixtime <= ?")
            params.append(to_date)
        if has_links is not None:
            conditions.append("m.has_links = ?")
            params.append(1 if has_links else 0)
        if has_mentions is not None:
            conditions.append("m.has_mentions = ?")
            params.append(1 if has_mentions else 0)
        if has_media is not None:
            conditions.append("m.has_media = ?")
            params.append(1 if has_media else 0)

        where_clause = " AND ".join(conditions) if conditions else "1=1"

        # bm25() returns lower (more negative) scores for better matches,
        # so ascending ORDER BY relevance puts the best hits first.
        sql = f'''
            SELECT
                m.id, m.date, m.date_unixtime, m.from_name, m.from_id,
                m.text_plain, m.reply_to_message_id, m.forwarded_from,
                m.has_links, m.has_mentions, m.has_media,
                bm25(messages_fts, 1.0, 0.5) as relevance
            FROM messages_fts
            JOIN messages m ON messages_fts.rowid = m.id
            WHERE messages_fts MATCH ?
              AND {where_clause}
            ORDER BY relevance
            LIMIT ? OFFSET ?
        '''

        params = [query] + params + [limit, offset]
        cursor = self.conn.execute(sql, params)
        results = [dict(row) for row in cursor.fetchall()]

        if use_cache:
            self.query_cache.put(cache_key, results)

        return results

    def search_prefix(self, prefix: str, limit: int = 100) -> list[dict]:
        """
        Search using prefix matching (autocomplete-style).

        Uses FTS5 prefix index for fast prefix queries.
        """
        # FTS5 prefix search syntax
        query = f'{prefix}*'
        return self.search(query, limit=limit, use_cache=True)

    # ==========================================
    # FUZZY SEARCH
    # ==========================================

    def fuzzy_search(
        self,
        query: str,
        threshold: float = 0.3,
        limit: int = 50
    ) -> list[dict]:
        """
        Fuzzy search using trigram similarity.

        Finds messages even with typos or slight variations.

        Args:
            query: Search query
            threshold: Minimum similarity (0-1)
            limit: Max results

        Returns:
            List of message dicts, each annotated with a 'similarity' score
        """
        # Build trigram index if not exists
        if self.trigram_index is None:
            self._build_trigram_index()

        # Search trigram index
        matches = self.trigram_index.search(query, threshold=threshold, limit=limit)

        # Fetch full messages and attach the similarity score
        results = []
        for msg_id, similarity in matches:
            cursor = self.conn.execute(
                'SELECT * FROM messages WHERE id = ?', (msg_id,)
            )
            row = cursor.fetchone()
            if row:
                msg = dict(row)
                msg['similarity'] = similarity
                results.append(msg)

        return results

    def _build_trigram_index(self) -> None:
        """Build in-memory trigram index from database."""
        print("Building trigram index (first time only)...")
        self.trigram_index = TrigramIndex()

        cursor = self.conn.execute(
            'SELECT id, text_plain FROM messages WHERE text_plain IS NOT NULL'
        )
        for row in cursor.fetchall():
            self.trigram_index.add(row[0], row[1])

        print(f"Trigram index built: {len(self.trigram_index)} documents")

    # ==========================================
    # THREAD TRAVERSAL
    # ==========================================

    def get_thread_dfs(self, message_id: int) -> list[dict]:
        """
        Get full conversation thread using DFS traversal.

        Returns messages in depth-first order (follows reply chains deep).
        """
        if self.reply_graph is None:
            self._build_reply_graph()

        # Find thread root
        root_id = self.reply_graph.get_thread_root(message_id)

        # DFS traversal
        msg_ids = self.reply_graph.dfs_descendants(root_id)

        # Fetch messages in order
        return self._fetch_messages_ordered(msg_ids)

    def get_thread_bfs(self, message_id: int) -> list[dict]:
        """
        Get conversation thread using BFS traversal.

        Returns messages level by level.
        """
        if self.reply_graph is None:
            self._build_reply_graph()

        root_id = self.reply_graph.get_thread_root(message_id)
        msg_ids = self.reply_graph.bfs_descendants(root_id)

        return self._fetch_messages_ordered(msg_ids)

    def get_thread_with_depth(self, message_id: int) -> list[tuple[dict, int]]:
        """
        Get thread with depth information for each message.

        Returns list of (message, depth) tuples.
        """
        if self.reply_graph is None:
            self._build_reply_graph()

        root_id = self.reply_graph.get_thread_root(message_id)
        items = self.reply_graph.bfs_with_depth(root_id)

        results = []
        for msg_id, depth in items:
            cursor = self.conn.execute(
                'SELECT * FROM messages WHERE id = ?', (msg_id,)
            )
            row = cursor.fetchone()
            if row:
                results.append((dict(row), depth))

        return results

    def get_replies(self, message_id: int) -> list[dict]:
        """Get all direct replies to a message."""
        if self.reply_graph is None:
            self._build_reply_graph()

        child_ids = self.reply_graph.get_children(message_id)
        return self._fetch_messages_ordered(child_ids)

    def get_conversation_path(self, message_id: int) -> list[dict]:
        """Get the path from thread root to this message."""
        if self.reply_graph is None:
            self._build_reply_graph()

        path_ids = self.reply_graph.get_thread_path(message_id)
        return self._fetch_messages_ordered(path_ids)

    def _build_reply_graph(self) -> None:
        """Build in-memory reply graph from database."""
        print("Building reply graph (first time only)...")
        self.reply_graph = ReplyGraph()

        cursor = self.conn.execute(
            'SELECT id, reply_to_message_id FROM messages'
        )
        for row in cursor.fetchall():
            self.reply_graph.add_message(row[0], row[1])

        print(f"Reply graph built: {self.reply_graph.stats}")

    def _fetch_messages_ordered(self, msg_ids: list[int]) -> list[dict]:
        """Fetch messages preserving the order of IDs."""
        if not msg_ids:
            return []

        placeholders = ','.join('?' * len(msg_ids))
        cursor = self.conn.execute(
            f'SELECT * FROM messages WHERE id IN ({placeholders})',
            msg_ids
        )

        # Create lookup dict; SQL IN does not preserve input order.
        msg_map = {row['id']: dict(row) for row in cursor.fetchall()}

        # Return in original order, skipping ids missing from the table.
        return [msg_map[mid] for mid in msg_ids if mid in msg_map]

    # ==========================================
    # AUTOCOMPLETE
    # ==========================================

    def autocomplete_user(self, prefix: str, limit: int = 10) -> list[str]:
        """
        Autocomplete username suggestions.

        Uses Trie for O(p + k) lookup where p=prefix length, k=results.
        """
        if self.user_trie is None:
            self._build_user_trie()

        return self.user_trie.autocomplete(prefix, limit=limit)

    def _build_user_trie(self) -> None:
        """Build Trie index for usernames."""
        self.user_trie = Trie()

        cursor = self.conn.execute('SELECT user_id, display_name FROM users')
        for row in cursor.fetchall():
            # Index both display name and raw user id so either can be completed.
            if row['display_name']:
                self.user_trie.insert(row['display_name'], data=row['user_id'])
            if row['user_id']:
                self.user_trie.insert(row['user_id'], data=row['user_id'])

    # ==========================================
    # CONVENIENCE METHODS
    # ==========================================

    def search_by_user(self, user_id: str, limit: int = 100) -> list[dict]:
        """Get all messages from a specific user."""
        sql = '''
            SELECT * FROM messages
            WHERE from_id = ?
            ORDER BY date_unixtime DESC
            LIMIT ?
        '''
        cursor = self.conn.execute(sql, (user_id, limit))
        return [dict(row) for row in cursor.fetchall()]

    def search_by_date_range(
        self,
        from_date: int,
        to_date: int,
        limit: int = 1000
    ) -> list[dict]:
        """Get messages within a date range."""
        sql = '''
            SELECT * FROM messages
            WHERE date_unixtime BETWEEN ? AND ?
            ORDER BY date_unixtime ASC
            LIMIT ?
        '''
        cursor = self.conn.execute(sql, (from_date, to_date, limit))
        return [dict(row) for row in cursor.fetchall()]

    def get_links(self, limit: int = 100) -> list[dict]:
        """Get all extracted links."""
        sql = '''
            SELECT e.value as url, e.message_id, m.from_name, m.date
            FROM entities e
            JOIN messages m ON e.message_id = m.id
            WHERE e.type = 'link'
            ORDER BY m.date_unixtime DESC
            LIMIT ?
        '''
        cursor = self.conn.execute(sql, (limit,))
        return [dict(row) for row in cursor.fetchall()]

    def get_mentions(self, username: Optional[str] = None, limit: int = 100) -> list[dict]:
        """Get mentions, optionally filtered by username."""
        if username:
            sql = '''
                SELECT e.value as mention, e.message_id, m.from_name, m.text_plain, m.date
                FROM entities e
                JOIN messages m ON e.message_id = m.id
                WHERE e.type = 'mention' AND e.value LIKE ?
                ORDER BY m.date_unixtime DESC
                LIMIT ?
            '''
            cursor = self.conn.execute(sql, (f'%{username}%', limit))
        else:
            sql = '''
                SELECT e.value as mention, e.message_id, m.from_name, m.text_plain, m.date
                FROM entities e
                JOIN messages m ON e.message_id = m.id
                WHERE e.type = 'mention'
                ORDER BY m.date_unixtime DESC
                LIMIT ?
            '''
            cursor = self.conn.execute(sql, (limit,))

        return [dict(row) for row in cursor.fetchall()]

    @property
    def cache_stats(self) -> dict:
        """Get cache statistics."""
        return self.query_cache.stats


def format_result(msg: dict, show_depth: bool = False, depth: int = 0) -> str:
    """Format a message for display.

    Args:
        msg: Message dict as returned by the search methods.
        show_depth: If True, indent the line by `depth` levels.
        depth: Thread depth used for indentation.
    """
    date_str = msg.get('date', 'Unknown date')
    from_name = msg.get('from_name', 'Unknown')
    # text_plain may be stored as NULL: the key exists with value None, so
    # .get('text_plain', '') would return None and crash on slicing/len.
    full_text = msg.get('text_plain') or ''
    text = full_text[:200]
    if len(full_text) > 200:
        text += '...'

    flags = []
    if msg.get('has_links'):
        flags.append('[link]')
    if msg.get('has_mentions'):
        flags.append('[mention]')
    if msg.get('has_media'):
        flags.append('[media]')
    # Scores can legitimately be 0.0, so test for presence rather than truthiness.
    if msg.get('similarity') is not None:
        flags.append(f'[sim:{msg["similarity"]:.2f}]')
    if msg.get('relevance') is not None:
        flags.append(f'[rel:{abs(msg["relevance"]):.2f}]')

    flags_str = ' '.join(flags)
    indent = '  ' * depth if show_depth else ''

    return f"{indent}[{date_str}] {from_name}: {text} {flags_str}"


def main():
    """CLI entry point: parse arguments and dispatch to the search API."""
    parser = argparse.ArgumentParser(description='Search indexed Telegram messages')
    parser.add_argument('query', nargs='?', help='Search query')
    parser.add_argument('--db', default='telegram.db', help='Database path')
    parser.add_argument('--user', help='Filter by user ID')
    parser.add_argument('--from-date', help='From date (YYYY-MM-DD)')
    parser.add_argument('--to-date', help='To date (YYYY-MM-DD)')
    parser.add_argument('--links', action='store_true', help='Show only messages with links')
    parser.add_argument('--mentions', action='store_true', help='Show only messages with mentions')
    parser.add_argument('--media', action='store_true', help='Show only messages with media')
    parser.add_argument('--limit', type=int, default=50, help='Max results')
    parser.add_argument('--fuzzy', action='store_true', help='Use fuzzy search')
    parser.add_argument('--threshold', type=float, default=0.3, help='Fuzzy match threshold')
    parser.add_argument('--thread', type=int, help='Show thread for message ID')
    parser.add_argument('--list-links', action='store_true', help='List all extracted links')
    parser.add_argument('--list-mentions', action='store_true', help='List all mentions')
    parser.add_argument('--autocomplete', help='Autocomplete username')
    parser.add_argument('--cache-stats', action='store_true', help='Show cache statistics')

    args = parser.parse_args()

    with TelegramSearch(args.db) as search:
        # Show thread ('is not None' so message id 0 is still honored)
        if args.thread is not None:
            print(f"Thread containing message {args.thread}:\n")
            thread = search.get_thread_with_depth(args.thread)
            for msg, depth in thread:
                print(format_result(msg, show_depth=True, depth=depth))
            return

        # Autocomplete
        if args.autocomplete:
            suggestions = search.autocomplete_user(args.autocomplete)
            print(f"Suggestions for '{args.autocomplete}':")
            for s in suggestions:
                print(f"  {s}")
            return

        # List links
        if args.list_links:
            links = search.get_links(args.limit)
            print(f"Found {len(links)} links:\n")
            for link in links:
                print(f"  {link['url']}")
                print(f"    From: {link['from_name']} at {link['date']}")
            return

        # List mentions
        if args.list_mentions:
            mentions = search.get_mentions(limit=args.limit)
            print(f"Found {len(mentions)} mentions:\n")
            for m in mentions:
                print(f"  {m['mention']} by {m['from_name']}")
            return

        # Cache stats
        if args.cache_stats:
            print(f"Cache stats: {search.cache_stats}")
            return

        if not args.query:
            parser.print_help()
            return

        # Parse dates into Unix timestamps
        from_ts = None
        to_ts = None
        if args.from_date:
            from_ts = int(datetime.strptime(args.from_date, '%Y-%m-%d').timestamp())
        if args.to_date:
            to_ts = int(datetime.strptime(args.to_date, '%Y-%m-%d').timestamp())

        # Fuzzy or regular search
        if args.fuzzy:
            results = search.fuzzy_search(
                query=args.query,
                threshold=args.threshold,
                limit=args.limit
            )
            print(f"Found {len(results)} fuzzy matches for '{args.query}':\n")
        else:
            results = search.search(
                query=args.query,
                user_id=args.user,
                from_date=from_ts,
                to_date=to_ts,
                has_links=True if args.links else None,
                has_mentions=True if args.mentions else None,
                has_media=True if args.media else None,
                limit=args.limit
            )
            print(f"Found {len(results)} results for '{args.query}':\n")

        for msg in results:
            print(format_result(msg))
            print()

        # Show cache stats
        print(f"\nCache: {search.cache_stats}")


if __name__ == '__main__':
    main()