#!/usr/bin/env python3 """ Telegram JSON Chat Indexer (Optimized) Features: - Batch processing for faster indexing - Graph building for reply threads - Trigram index for fuzzy search - Progress tracking - Memory-efficient streaming Usage: python indexer.py [--db ] python indexer.py result.json --db telegram.db python indexer.py result.json --batch-size 5000 --build-trigrams """ import json import sqlite3 import argparse try: import ijson HAS_IJSON = True except ImportError: HAS_IJSON = False import os import time from pathlib import Path from typing import Any, Generator from collections import defaultdict from data_structures import BloomFilter, ReplyGraph, generate_trigrams def flatten_text(text_field: Any) -> str: """ Flatten the text field which can be either a string or array of mixed content. """ if isinstance(text_field, str): return text_field if isinstance(text_field, list): parts = [] for item in text_field: if isinstance(item, str): parts.append(item) elif isinstance(item, dict) and 'text' in item: parts.append(item['text']) return ''.join(parts) return '' def extract_entities(text_entities: list) -> list[dict]: """Extract typed entities (links, mentions, etc.) from text_entities array.""" entities = [] for entity in text_entities or []: if isinstance(entity, dict): entity_type = entity.get('type', 'plain') if entity_type != 'plain': entities.append({ 'type': entity_type, 'value': entity.get('text', '') }) return entities def parse_message(msg: dict) -> dict | None: """Parse a single message from Telegram JSON format.""" if msg.get('type') != 'message': return None text_plain = flatten_text(msg.get('text', '')) entities = extract_entities(msg.get('text_entities', [])) has_links = any(e['type'] == 'link' for e in entities) has_mentions = any(e['type'] == 'mention' for e in entities) return { 'id': msg.get('id'), 'type': msg.get('type', 'message'), 'date': msg.get('date'), 'date_unixtime': int(msg.get('date_unixtime', 0)) if msg.get('date_unixtime') else 0, 'from_name': msg.get('from', ''), 'from_id': msg.get('from_id', ''), 'reply_to_message_id': msg.get('reply_to_message_id'), 'forwarded_from': msg.get('forwarded_from'), 'forwarded_from_id': msg.get('forwarded_from_id'), 'text_plain': text_plain, 'text_length': len(text_plain), 'has_media': 1 if msg.get('photo') or msg.get('file') or msg.get('media_type') else 0, 'has_photo': 1 if msg.get('photo') else 0, 'has_links': 1 if has_links else 0, 'has_mentions': 1 if has_mentions else 0, 'is_edited': 1 if msg.get('edited') else 0, 'edited_unixtime': int(msg.get('edited_unixtime', 0)) if msg.get('edited_unixtime') else None, 'photo_file_size': msg.get('photo_file_size'), 'photo_width': msg.get('width'), 'photo_height': msg.get('height'), 'raw_json': json.dumps(msg, ensure_ascii=False), 'entities': entities } def _detect_json_structure(json_path: str) -> str: """Peek at JSON to determine if root is a list or object with 'messages' key.""" with open(json_path, 'r', encoding='utf-8') as f: for char in iter(lambda: f.read(1), ''): if char in ' \t\n\r': continue if char == '[': return 'list' return 'object' return 'object' def load_json_messages(json_path: str) -> Generator[dict, None, None]: """ Load messages from Telegram export JSON file. Uses ijson for streaming (constant memory) if available, otherwise falls back to full json.load(). """ if HAS_IJSON: structure = _detect_json_structure(json_path) prefix = 'item' if structure == 'list' else 'messages.item' with open(json_path, 'rb') as f: for msg in ijson.items(f, prefix): parsed = parse_message(msg) if parsed: yield parsed else: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) messages = data if isinstance(data, list) else data.get('messages', []) for msg in messages: parsed = parse_message(msg) if parsed: yield parsed def count_messages(json_path: str) -> int: """Count messages in JSON file. Uses streaming if ijson available.""" if HAS_IJSON: structure = _detect_json_structure(json_path) prefix = 'item' if structure == 'list' else 'messages.item' count = 0 with open(json_path, 'rb') as f: for msg in ijson.items(f, prefix): if msg.get('type') == 'message': count += 1 return count else: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) messages = data if isinstance(data, list) else data.get('messages', []) return sum(1 for msg in messages if msg.get('type') == 'message') def init_database(db_path: str) -> sqlite3.Connection: """Initialize SQLite database with optimized schema.""" conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row # Read and execute schema schema_path = Path(__file__).parent / 'schema.sql' if schema_path.exists(): with open(schema_path, 'r') as f: conn.executescript(f.read()) else: raise FileNotFoundError(f"Schema file not found: {schema_path}") return conn class OptimizedIndexer: """ High-performance indexer with batch processing and graph building. Features: - Batch inserts (100x faster than individual inserts) - Bloom filter for duplicate detection - Reply graph construction - Trigram index building - Progress tracking """ def __init__( self, db_path: str, batch_size: int = 1000, build_trigrams: bool = False, build_graph: bool = True ): self.db_path = db_path self.batch_size = batch_size self.build_trigrams = build_trigrams self.build_graph = build_graph self.conn = init_database(db_path) self.bloom = BloomFilter(expected_items=1000000, fp_rate=0.01) self.graph = ReplyGraph() if build_graph else None # Batch buffers self.message_batch: list[tuple] = [] self.entity_batch: list[tuple] = [] self.trigram_batch: list[tuple] = [] # Stats self.stats = { 'messages': 0, 'entities': 0, 'trigrams': 0, 'users': {}, 'skipped': 0, 'duplicates': 0 } def index_file(self, json_path: str, show_progress: bool = True) -> dict: """ Index a JSON file into the database. Returns statistics dict. """ start_time = time.time() # Count total for progress if show_progress: print(f"Counting messages in {json_path}...") total = count_messages(json_path) print(f"Found {total:,} messages to index") else: total = 0 # Disable auto-commit for batch processing self.conn.execute('BEGIN TRANSACTION') try: for i, msg in enumerate(load_json_messages(json_path)): self._index_message(msg) # Progress update if show_progress and (i + 1) % 10000 == 0: elapsed = time.time() - start_time rate = (i + 1) / elapsed eta = (total - i - 1) / rate if rate > 0 else 0 print(f" Indexed {i+1:,}/{total:,} ({100*(i+1)/total:.1f}%) " f"- {rate:.0f} msg/s - ETA: {eta:.0f}s") # Flush remaining batches self._flush_batches() # Build reply graph in database if self.build_graph: self._build_graph_tables() # Update users table self._update_users() # Commit transaction self.conn.commit() # Optimize FTS index print("Optimizing FTS index...") self.conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('optimize')") self.conn.commit() except Exception as e: self.conn.rollback() raise e elapsed = time.time() - start_time self.stats['elapsed_seconds'] = elapsed self.stats['messages_per_second'] = self.stats['messages'] / elapsed if elapsed > 0 else 0 return self.stats def _index_message(self, msg: dict) -> None: """Index a single message into batch buffers.""" msg_id = msg['id'] # Duplicate check with Bloom filter msg_key = f"msg_{msg_id}" if msg_key in self.bloom: self.stats['duplicates'] += 1 return self.bloom.add(msg_key) # Add to message batch self.message_batch.append(( msg['id'], msg['type'], msg['date'], msg['date_unixtime'], msg['from_name'], msg['from_id'], msg['reply_to_message_id'], msg['forwarded_from'], msg['forwarded_from_id'], msg['text_plain'], msg['text_length'], msg['has_media'], msg['has_photo'], msg['has_links'], msg['has_mentions'], msg['is_edited'], msg['edited_unixtime'], msg['photo_file_size'], msg['photo_width'], msg['photo_height'], msg['raw_json'] )) # Add entities to batch for entity in msg['entities']: self.entity_batch.append((msg_id, entity['type'], entity['value'])) # Add trigrams if enabled if self.build_trigrams and msg['text_plain']: for i, trigram in enumerate(generate_trigrams(msg['text_plain'])): self.trigram_batch.append((trigram, msg_id, i)) # Build graph if self.graph: self.graph.add_message(msg_id, msg['reply_to_message_id']) # Track users user_id = msg['from_id'] if user_id: if user_id not in self.stats['users']: self.stats['users'][user_id] = { 'display_name': msg['from_name'], 'first_seen': msg['date_unixtime'], 'last_seen': msg['date_unixtime'], 'count': 0 } self.stats['users'][user_id]['count'] += 1 ts = msg['date_unixtime'] if ts and ts < self.stats['users'][user_id]['first_seen']: self.stats['users'][user_id]['first_seen'] = ts if ts and ts > self.stats['users'][user_id]['last_seen']: self.stats['users'][user_id]['last_seen'] = ts self.stats['messages'] += 1 # Flush if batch is full if len(self.message_batch) >= self.batch_size: self._flush_batches() def _flush_batches(self) -> None: """Flush all batch buffers to database.""" cursor = self.conn.cursor() # Insert messages if self.message_batch: cursor.executemany(''' INSERT OR REPLACE INTO messages ( id, type, date, date_unixtime, from_name, from_id, reply_to_message_id, forwarded_from, forwarded_from_id, text_plain, text_length, has_media, has_photo, has_links, has_mentions, is_edited, edited_unixtime, photo_file_size, photo_width, photo_height, raw_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', self.message_batch) self.message_batch = [] # Insert entities if self.entity_batch: cursor.executemany(''' INSERT INTO entities (message_id, type, value) VALUES (?, ?, ?) ''', self.entity_batch) self.stats['entities'] += len(self.entity_batch) self.entity_batch = [] # Insert trigrams if self.trigram_batch: cursor.executemany(''' INSERT OR IGNORE INTO trigrams (trigram, message_id, position) VALUES (?, ?, ?) ''', self.trigram_batch) self.stats['trigrams'] += len(self.trigram_batch) self.trigram_batch = [] def _build_graph_tables(self) -> None: """Build reply graph tables from in-memory graph.""" if not self.graph: return print("Building reply graph tables...") cursor = self.conn.cursor() # Insert edges into reply_graph edges = [] for parent_id, children in self.graph.children.items(): for child_id in children: edges.append((parent_id, child_id, 1)) if edges: cursor.executemany(''' INSERT OR IGNORE INTO reply_graph (parent_id, child_id, depth) VALUES (?, ?, ?) ''', edges) # Find connected components (threads) print("Finding conversation threads...") components = self.graph.find_connected_components() thread_data = [] message_thread_data = [] for thread_id, component in enumerate(components): if not component: continue # Find root (message with no parent in this component) root_id = None for msg_id in component: if msg_id not in self.graph.parents: root_id = msg_id break if root_id is None: root_id = min(component) # Get thread stats cursor.execute(''' SELECT MIN(date_unixtime), MAX(date_unixtime), COUNT(DISTINCT from_id) FROM messages WHERE id IN ({}) '''.format(','.join('?' * len(component))), list(component)) row = cursor.fetchone() thread_data.append(( root_id, len(component), row[0], # first_message_time row[1], # last_message_time row[2] # participant_count )) # Map messages to threads with depth for msg_id in component: depth = len(self.graph.get_ancestors(msg_id)) message_thread_data.append((msg_id, len(thread_data), depth)) # Insert thread data cursor.executemany(''' INSERT INTO threads (root_message_id, message_count, first_message_time, last_message_time, participant_count) VALUES (?, ?, ?, ?, ?) ''', thread_data) cursor.executemany(''' INSERT OR REPLACE INTO message_threads (message_id, thread_id, depth) VALUES (?, ?, ?) ''', message_thread_data) print(f" Created {len(thread_data)} conversation threads") def _update_users(self) -> None: """Update users table from tracked data.""" cursor = self.conn.cursor() user_data = [ (user_id, data['display_name'], data['first_seen'], data['last_seen'], data['count']) for user_id, data in self.stats['users'].items() ] cursor.executemany(''' INSERT OR REPLACE INTO users (user_id, display_name, first_seen, last_seen, message_count) VALUES (?, ?, ?, ?, ?) ''', user_data) def close(self) -> None: """Close database connection.""" self.conn.close() class IncrementalIndexer: """ Incremental indexer for adding new JSON data to existing database. Features: - Loads existing message IDs into Bloom filter - Only processes new messages - Updates FTS index automatically - Fast duplicate detection O(1) """ def __init__(self, db_path: str, batch_size: int = 1000): self.db_path = db_path self.batch_size = batch_size if not os.path.exists(db_path): raise FileNotFoundError(f"Database not found: {db_path}. Use OptimizedIndexer for initial import.") self.conn = sqlite3.connect(db_path) self.conn.row_factory = sqlite3.Row # Batch buffers self.message_batch: list[tuple] = [] self.entity_batch: list[tuple] = [] # Stats (must be initialized before _load_existing_ids) self.stats = { 'total_in_file': 0, 'new_messages': 0, 'duplicates': 0, 'entities': 0, 'users_updated': 0 } # Load existing message IDs into Bloom filter self.bloom = BloomFilter(expected_items=2000000, fp_rate=0.001) self._load_existing_ids() def _load_existing_ids(self) -> None: """Load existing message IDs into Bloom filter for O(1) duplicate detection.""" cursor = self.conn.cursor() cursor.execute("SELECT id FROM messages") count = 0 for row in cursor: self.bloom.add(f"msg_{row[0]}") count += 1 print(f"Loaded {count:,} existing message IDs into Bloom filter") self.stats['existing_count'] = count def update_from_json(self, json_path: str, show_progress: bool = True) -> dict: """ Add new messages from JSON file to existing database. Only messages that don't exist in the database will be added. FTS5 index is updated automatically. Uses streaming JSON parser (ijson) when available for constant memory usage. """ start_time = time.time() # Count total for progress (streaming-aware) total_hint = 0 if show_progress: total_hint = count_messages(json_path) print(f"Processing ~{total_hint:,} messages from {json_path}") self.stats['total_in_file'] = total_hint # Start transaction self.conn.execute('BEGIN TRANSACTION') try: if HAS_IJSON: structure = _detect_json_structure(json_path) prefix = 'item' if structure == 'list' else 'messages.item' with open(json_path, 'rb') as f: for i, msg in enumerate(ijson.items(f, prefix)): if msg.get('type') != 'message': continue parsed = parse_message(msg) if parsed: self._process_message(parsed) if show_progress and (i + 1) % 10000 == 0: print(f" Processed {i+1:,} - " f"New: {self.stats['new_messages']:,}, " f"Duplicates: {self.stats['duplicates']:,}") else: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) messages = data if isinstance(data, list) else data.get('messages', []) self.stats['total_in_file'] = len(messages) for i, msg in enumerate(messages): if msg.get('type') != 'message': continue parsed = parse_message(msg) if parsed: self._process_message(parsed) if show_progress and (i + 1) % 10000 == 0: print(f" Processed {i+1:,}/{len(messages):,} - " f"New: {self.stats['new_messages']:,}, " f"Duplicates: {self.stats['duplicates']:,}") # Flush remaining self._flush_batches() # Update user stats self._update_user_stats() # Commit self.conn.commit() # Optimize FTS if we added new data if self.stats['new_messages'] > 0: print("Optimizing FTS index...") self.conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('optimize')") self.conn.commit() except Exception as e: self.conn.rollback() raise e elapsed = time.time() - start_time self.stats['elapsed_seconds'] = elapsed return self.stats def update_from_json_data(self, json_data: dict | list, show_progress: bool = False) -> dict: """ Add new messages from JSON data (already parsed, not from file). Useful for API uploads. """ start_time = time.time() messages = json_data if isinstance(json_data, list) else json_data.get('messages', []) self.stats['total_in_file'] = len(messages) # Start transaction self.conn.execute('BEGIN TRANSACTION') try: for msg in messages: if msg.get('type') != 'message': continue parsed = parse_message(msg) if parsed: self._process_message(parsed) # Flush remaining self._flush_batches() # Update user stats self._update_user_stats() # Commit self.conn.commit() # Optimize FTS if we added new data if self.stats['new_messages'] > 0: self.conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('optimize')") self.conn.commit() except Exception as e: self.conn.rollback() raise e elapsed = time.time() - start_time self.stats['elapsed_seconds'] = elapsed return self.stats def _process_message(self, msg: dict) -> None: """Process a single message, adding to batch if new.""" msg_id = msg['id'] msg_key = f"msg_{msg_id}" # Check if already exists (Bloom filter first, then DB if needed) if msg_key in self.bloom: self.stats['duplicates'] += 1 return # Add to Bloom filter self.bloom.add(msg_key) # Add to message batch self.message_batch.append(( msg['id'], msg['type'], msg['date'], msg['date_unixtime'], msg['from_name'], msg['from_id'], msg['reply_to_message_id'], msg['forwarded_from'], msg['forwarded_from_id'], msg['text_plain'], msg['text_length'], msg['has_media'], msg['has_photo'], msg['has_links'], msg['has_mentions'], msg['is_edited'], msg['edited_unixtime'], msg['photo_file_size'], msg['photo_width'], msg['photo_height'], msg['raw_json'] )) # Add entities to batch for entity in msg['entities']: self.entity_batch.append((msg_id, entity['type'], entity['value'])) self.stats['new_messages'] += 1 # Flush if batch is full if len(self.message_batch) >= self.batch_size: self._flush_batches() def _flush_batches(self) -> None: """Flush batch buffers to database.""" cursor = self.conn.cursor() # Insert messages (FTS5 trigger will update automatically) if self.message_batch: cursor.executemany(''' INSERT OR IGNORE INTO messages ( id, type, date, date_unixtime, from_name, from_id, reply_to_message_id, forwarded_from, forwarded_from_id, text_plain, text_length, has_media, has_photo, has_links, has_mentions, is_edited, edited_unixtime, photo_file_size, photo_width, photo_height, raw_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', self.message_batch) self.message_batch = [] # Insert entities if self.entity_batch: cursor.executemany(''' INSERT OR IGNORE INTO entities (message_id, type, value) VALUES (?, ?, ?) ''', self.entity_batch) self.stats['entities'] += len(self.entity_batch) self.entity_batch = [] def _update_user_stats(self) -> None: """Update users table with aggregated stats.""" cursor = self.conn.cursor() # Upsert users from messages cursor.execute(''' INSERT OR REPLACE INTO users (user_id, display_name, first_seen, last_seen, message_count) SELECT from_id, from_name, MIN(date_unixtime), MAX(date_unixtime), COUNT(*) FROM messages WHERE from_id IS NOT NULL AND from_id != '' GROUP BY from_id ''') self.stats['users_updated'] = cursor.rowcount def close(self) -> None: """Close database connection.""" self.conn.close() def update_database(db_path: str, json_path: str) -> dict: """ Convenience function to update database with new JSON file. Args: db_path: Path to existing SQLite database json_path: Path to new JSON file Returns: Statistics dict """ indexer = IncrementalIndexer(db_path) try: stats = indexer.update_from_json(json_path) return stats finally: indexer.close() def main(): parser = argparse.ArgumentParser(description='Index Telegram JSON export to SQLite (Optimized)') parser.add_argument('json_file', help='Path to Telegram export JSON file') parser.add_argument('--db', default='telegram.db', help='SQLite database path') parser.add_argument('--batch-size', type=int, default=1000, help='Batch size for inserts') parser.add_argument('--build-trigrams', action='store_true', help='Build trigram index for fuzzy search') parser.add_argument('--no-graph', action='store_true', help='Skip building reply graph') parser.add_argument('--quiet', action='store_true', help='Suppress progress output') parser.add_argument('--update', action='store_true', help='Update existing database (add only new messages)') args = parser.parse_args() if not os.path.exists(args.json_file): print(f"Error: JSON file not found: {args.json_file}") return 1 # Update mode: add new messages to existing database if args.update: if not os.path.exists(args.db): print(f"Error: Database not found: {args.db}") print("Use without --update flag for initial import") return 1 print(f"{'='*50}") print(f"INCREMENTAL UPDATE MODE") print(f"{'='*50}") print(f"Database: {args.db}") print(f"New JSON: {args.json_file}") print() indexer = IncrementalIndexer(args.db, args.batch_size) stats = indexer.update_from_json(args.json_file, show_progress=not args.quiet) print(f"\n{'='*50}") print(f"Update complete!") print(f"{'='*50}") print(f" Messages in file: {stats['total_in_file']:,}") print(f" Already existed: {stats['duplicates']:,}") print(f" New messages added: {stats['new_messages']:,}") print(f" New entities: {stats['entities']:,}") print(f" Time elapsed: {stats['elapsed_seconds']:.1f}s") indexer.close() return 0 # Initial import mode print(f"Initializing database: {args.db}") indexer = OptimizedIndexer( db_path=args.db, batch_size=args.batch_size, build_trigrams=args.build_trigrams, build_graph=not args.no_graph ) print(f"Indexing: {args.json_file}") stats = indexer.index_file(args.json_file, show_progress=not args.quiet) print(f"\n{'='*50}") print(f"Indexing complete!") print(f"{'='*50}") print(f" Messages indexed: {stats['messages']:,}") print(f" Entities extracted: {stats['entities']:,}") print(f" Unique users: {len(stats['users']):,}") print(f" Duplicates skipped: {stats['duplicates']:,}") if stats.get('trigrams'): print(f" Trigrams indexed: {stats['trigrams']:,}") print(f" Time elapsed: {stats['elapsed_seconds']:.1f}s") print(f" Speed: {stats['messages_per_second']:.0f} msg/s") print(f"\nDatabase saved to: {args.db}") indexer.close() return 0 if __name__ == '__main__': exit(main())