# NOTE(review): removed non-code scrape residue ("Spaces:" / "Sleeping" /
# "Sleeping") captured from the hosting UI — it was never part of this module.
| #!/usr/bin/env python3 | |
| """ | |
| Telegram JSON Chat Indexer (Optimized) | |
| Features: | |
| - Batch processing for faster indexing | |
| - Graph building for reply threads | |
| - Trigram index for fuzzy search | |
| - Progress tracking | |
| - Memory-efficient streaming | |
| Usage: | |
| python indexer.py <json_file> [--db <database_file>] | |
| python indexer.py result.json --db telegram.db | |
| python indexer.py result.json --batch-size 5000 --build-trigrams | |
| """ | |
| import json | |
| import sqlite3 | |
| import argparse | |
| try: | |
| import ijson | |
| HAS_IJSON = True | |
| except ImportError: | |
| HAS_IJSON = False | |
| import os | |
| import time | |
| from pathlib import Path | |
| from typing import Any, Generator | |
| from collections import defaultdict | |
| from data_structures import BloomFilter, ReplyGraph, generate_trigrams | |
def flatten_text(text_field: Any) -> str:
    """
    Collapse a Telegram 'text' field into a single plain string.

    The export format stores text either as a plain string or as a list
    mixing strings with entity dicts (each dict carries its own 'text').
    Anything else yields the empty string.
    """
    if isinstance(text_field, str):
        return text_field
    if not isinstance(text_field, list):
        return ''
    pieces: list[str] = []
    for chunk in text_field:
        if isinstance(chunk, str):
            pieces.append(chunk)
        elif isinstance(chunk, dict) and 'text' in chunk:
            pieces.append(chunk['text'])
    return ''.join(pieces)
def extract_entities(text_entities: list) -> list[dict]:
    """Pull typed, non-plain entities (links, mentions, ...) out of a text_entities array."""
    result: list[dict] = []
    for raw in (text_entities or []):
        if not isinstance(raw, dict):
            continue
        kind = raw.get('type', 'plain')
        if kind == 'plain':
            # Plain text runs carry no searchable metadata; skip them.
            continue
        result.append({'type': kind, 'value': raw.get('text', '')})
    return result
def parse_message(msg: dict) -> dict | None:
    """
    Parse a single message from Telegram JSON export format.

    Returns a flat dict ready for DB insertion, or None for non-message
    records (service entries have type != 'message').
    """
    if msg.get('type') != 'message':
        return None
    text_plain = flatten_text(msg.get('text', ''))
    entities = extract_entities(msg.get('text_entities', []))
    # Telegram exports bare URLs with entity type 'link' and hyperlinked
    # text with 'text_link'; user references appear as 'mention'
    # (@username) or 'mention_name' (users without a username).
    # Checking only 'link'/'mention' missed the second variant of each.
    has_links = any(e['type'] in ('link', 'text_link') for e in entities)
    has_mentions = any(e['type'] in ('mention', 'mention_name') for e in entities)
    return {
        'id': msg.get('id'),
        'type': msg.get('type', 'message'),
        'date': msg.get('date'),
        # date_unixtime arrives as a string in exports; 0 when absent/falsy.
        'date_unixtime': int(msg.get('date_unixtime', 0)) if msg.get('date_unixtime') else 0,
        'from_name': msg.get('from', ''),
        'from_id': msg.get('from_id', ''),
        'reply_to_message_id': msg.get('reply_to_message_id'),
        'forwarded_from': msg.get('forwarded_from'),
        'forwarded_from_id': msg.get('forwarded_from_id'),
        'text_plain': text_plain,
        'text_length': len(text_plain),
        'has_media': 1 if msg.get('photo') or msg.get('file') or msg.get('media_type') else 0,
        'has_photo': 1 if msg.get('photo') else 0,
        'has_links': 1 if has_links else 0,
        'has_mentions': 1 if has_mentions else 0,
        'is_edited': 1 if msg.get('edited') else 0,
        # None (not 0) when the message was never edited.
        'edited_unixtime': int(msg.get('edited_unixtime', 0)) if msg.get('edited_unixtime') else None,
        'photo_file_size': msg.get('photo_file_size'),
        'photo_width': msg.get('width'),
        'photo_height': msg.get('height'),
        # Keep the raw record so nothing from the export is lost.
        'raw_json': json.dumps(msg, ensure_ascii=False),
        'entities': entities
    }
def _detect_json_structure(json_path: str) -> str:
    """
    Peek at the first non-whitespace character to classify the JSON root.

    Returns 'list' when the document opens with '[', otherwise 'object'
    (including the empty-file case).
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        while True:
            ch = f.read(1)
            if ch == '':
                # Reached EOF without any content.
                return 'object'
            if ch in ' \t\n\r':
                continue
            return 'list' if ch == '[' else 'object'
def load_json_messages(json_path: str) -> Generator[dict, None, None]:
    """
    Yield parsed messages from a Telegram export JSON file.

    Streams with ijson (constant memory) when that package is installed;
    otherwise falls back to loading the whole document via json.load().
    Non-message records are filtered out by parse_message().
    """
    if HAS_IJSON:
        root = _detect_json_structure(json_path)
        prefix = 'item' if root == 'list' else 'messages.item'
        with open(json_path, 'rb') as f:
            for raw in ijson.items(f, prefix):
                parsed = parse_message(raw)
                if parsed is not None:
                    yield parsed
        return
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    raw_messages = data if isinstance(data, list) else data.get('messages', [])
    for raw in raw_messages:
        parsed = parse_message(raw)
        if parsed is not None:
            yield parsed
def count_messages(json_path: str) -> int:
    """Count records of type 'message' in the export. Streams when ijson is available."""
    if HAS_IJSON:
        root = _detect_json_structure(json_path)
        prefix = 'item' if root == 'list' else 'messages.item'
        total = 0
        with open(json_path, 'rb') as f:
            for raw in ijson.items(f, prefix):
                if raw.get('type') == 'message':
                    total += 1
        return total
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    raw_messages = data if isinstance(data, list) else data.get('messages', [])
    return sum(1 for raw in raw_messages if raw.get('type') == 'message')
def init_database(db_path: str) -> sqlite3.Connection:
    """
    Create/open the SQLite database and apply the bundled schema.

    The schema is read from schema.sql next to this script.

    Raises:
        FileNotFoundError: if schema.sql is missing. The check runs
            *before* connecting, so no connection is leaked (the original
            opened the connection first and never closed it on this path).
    """
    schema_path = Path(__file__).parent / 'schema.sql'
    if not schema_path.exists():
        raise FileNotFoundError(f"Schema file not found: {schema_path}")
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    conn.executescript(schema_path.read_text())
    return conn
class OptimizedIndexer:
    """
    High-performance indexer with batch processing and graph building.
    Features:
    - Batch inserts (100x faster than individual inserts)
    - Bloom filter for duplicate detection
    - Reply graph construction
    - Trigram index building
    - Progress tracking
    """
    def __init__(
        self,
        db_path: str,
        batch_size: int = 1000,
        build_trigrams: bool = False,
        build_graph: bool = True
    ):
        self.db_path = db_path
        self.batch_size = batch_size
        self.build_trigrams = build_trigrams
        self.build_graph = build_graph
        # Applies schema.sql; raises FileNotFoundError if it is missing.
        self.conn = init_database(db_path)
        # Probabilistic dedup: a 1% false-positive rate means a small
        # fraction of genuinely new messages can be skipped as duplicates.
        self.bloom = BloomFilter(expected_items=1000000, fp_rate=0.01)
        self.graph = ReplyGraph() if build_graph else None
        # Batch buffers, flushed once they reach batch_size rows.
        self.message_batch: list[tuple] = []
        self.entity_batch: list[tuple] = []
        self.trigram_batch: list[tuple] = []
        # Stats returned by index_file(); 'users' maps from_id -> per-user stats.
        self.stats = {
            'messages': 0,
            'entities': 0,
            'trigrams': 0,
            'users': {},
            'skipped': 0,
            'duplicates': 0
        }
    def index_file(self, json_path: str, show_progress: bool = True) -> dict:
        """
        Index a JSON file into the database.
        Returns statistics dict.
        """
        start_time = time.time()
        # Count total for progress (costs one extra pass over the file).
        if show_progress:
            print(f"Counting messages in {json_path}...")
            total = count_messages(json_path)
            print(f"Found {total:,} messages to index")
        else:
            total = 0
        # Disable auto-commit for batch processing; everything below runs
        # in one transaction and is rolled back as a unit on failure.
        self.conn.execute('BEGIN TRANSACTION')
        try:
            for i, msg in enumerate(load_json_messages(json_path)):
                self._index_message(msg)
                # Progress update every 10k messages (total > 0 here since
                # this branch only runs with show_progress).
                if show_progress and (i + 1) % 10000 == 0:
                    elapsed = time.time() - start_time
                    rate = (i + 1) / elapsed
                    eta = (total - i - 1) / rate if rate > 0 else 0
                    print(f" Indexed {i+1:,}/{total:,} ({100*(i+1)/total:.1f}%) "
                          f"- {rate:.0f} msg/s - ETA: {eta:.0f}s")
            # Flush remaining partially-filled batches
            self._flush_batches()
            # Build reply graph in database (requires all messages inserted)
            if self.build_graph:
                self._build_graph_tables()
            # Update users table
            self._update_users()
            # Commit transaction
            self.conn.commit()
            # Optimize FTS index (merges FTS5 b-tree segments)
            print("Optimizing FTS index...")
            self.conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('optimize')")
            self.conn.commit()
        except Exception as e:
            # Undo the entire import so the database is never left half-indexed.
            self.conn.rollback()
            raise e
        elapsed = time.time() - start_time
        self.stats['elapsed_seconds'] = elapsed
        self.stats['messages_per_second'] = self.stats['messages'] / elapsed if elapsed > 0 else 0
        return self.stats
    def _index_message(self, msg: dict) -> None:
        """Index a single message into batch buffers."""
        msg_id = msg['id']
        # Duplicate check with Bloom filter (O(1); may rarely false-positive)
        msg_key = f"msg_{msg_id}"
        if msg_key in self.bloom:
            self.stats['duplicates'] += 1
            return
        self.bloom.add(msg_key)
        # Add to message batch; column order matches _flush_batches' INSERT.
        self.message_batch.append((
            msg['id'], msg['type'], msg['date'], msg['date_unixtime'],
            msg['from_name'], msg['from_id'], msg['reply_to_message_id'],
            msg['forwarded_from'], msg['forwarded_from_id'], msg['text_plain'],
            msg['text_length'], msg['has_media'], msg['has_photo'],
            msg['has_links'], msg['has_mentions'], msg['is_edited'],
            msg['edited_unixtime'], msg['photo_file_size'],
            msg['photo_width'], msg['photo_height'], msg['raw_json']
        ))
        # Add entities to batch
        for entity in msg['entities']:
            self.entity_batch.append((msg_id, entity['type'], entity['value']))
        # Add trigrams if enabled (position index supports fuzzy matching)
        if self.build_trigrams and msg['text_plain']:
            for i, trigram in enumerate(generate_trigrams(msg['text_plain'])):
                self.trigram_batch.append((trigram, msg_id, i))
        # Build graph (in memory; persisted later by _build_graph_tables)
        if self.graph:
            self.graph.add_message(msg_id, msg['reply_to_message_id'])
        # Track users: first/last seen window and message count per from_id
        user_id = msg['from_id']
        if user_id:
            if user_id not in self.stats['users']:
                self.stats['users'][user_id] = {
                    'display_name': msg['from_name'],
                    'first_seen': msg['date_unixtime'],
                    'last_seen': msg['date_unixtime'],
                    'count': 0
                }
            self.stats['users'][user_id]['count'] += 1
            ts = msg['date_unixtime']
            # A ts of 0 (missing date) never widens the seen window.
            if ts and ts < self.stats['users'][user_id]['first_seen']:
                self.stats['users'][user_id]['first_seen'] = ts
            if ts and ts > self.stats['users'][user_id]['last_seen']:
                self.stats['users'][user_id]['last_seen'] = ts
        self.stats['messages'] += 1
        # Flush if batch is full
        if len(self.message_batch) >= self.batch_size:
            self._flush_batches()
    def _flush_batches(self) -> None:
        """Flush all batch buffers to database."""
        cursor = self.conn.cursor()
        # Insert messages (OR REPLACE: re-imported ids overwrite prior rows)
        if self.message_batch:
            cursor.executemany('''
                INSERT OR REPLACE INTO messages (
                    id, type, date, date_unixtime, from_name, from_id,
                    reply_to_message_id, forwarded_from, forwarded_from_id,
                    text_plain, text_length, has_media, has_photo, has_links,
                    has_mentions, is_edited, edited_unixtime, photo_file_size,
                    photo_width, photo_height, raw_json
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', self.message_batch)
            self.message_batch = []
        # Insert entities
        if self.entity_batch:
            cursor.executemany('''
                INSERT INTO entities (message_id, type, value)
                VALUES (?, ?, ?)
            ''', self.entity_batch)
            self.stats['entities'] += len(self.entity_batch)
            self.entity_batch = []
        # Insert trigrams (OR IGNORE: repeated trigram/message/position rows dropped)
        if self.trigram_batch:
            cursor.executemany('''
                INSERT OR IGNORE INTO trigrams (trigram, message_id, position)
                VALUES (?, ?, ?)
            ''', self.trigram_batch)
            self.stats['trigrams'] += len(self.trigram_batch)
            self.trigram_batch = []
    def _build_graph_tables(self) -> None:
        """Build reply graph tables from in-memory graph."""
        if not self.graph:
            return
        print("Building reply graph tables...")
        cursor = self.conn.cursor()
        # Insert edges into reply_graph (direct parent->child edges, depth 1)
        edges = []
        for parent_id, children in self.graph.children.items():
            for child_id in children:
                edges.append((parent_id, child_id, 1))
        if edges:
            cursor.executemany('''
                INSERT OR IGNORE INTO reply_graph (parent_id, child_id, depth)
                VALUES (?, ?, ?)
            ''', edges)
        # Find connected components (threads)
        print("Finding conversation threads...")
        components = self.graph.find_connected_components()
        thread_data = []
        message_thread_data = []
        for thread_id, component in enumerate(components):
            if not component:
                continue
            # Find root (message with no parent in this component)
            root_id = None
            for msg_id in component:
                if msg_id not in self.graph.parents:
                    root_id = msg_id
                    break
            if root_id is None:
                # No parentless node (cycle / missing data): lowest id wins.
                root_id = min(component)
            # Get thread stats
            # NOTE(review): one '?' placeholder per component member; a very
            # large component could exceed SQLite's bound-variable limit
            # (999 in older builds) — confirm component sizes stay small.
            cursor.execute('''
                SELECT MIN(date_unixtime), MAX(date_unixtime), COUNT(DISTINCT from_id)
                FROM messages WHERE id IN ({})
            '''.format(','.join('?' * len(component))), list(component))
            row = cursor.fetchone()
            thread_data.append((
                root_id,
                len(component),
                row[0],  # first_message_time
                row[1],  # last_message_time
                row[2]   # participant_count
            ))
            # Map messages to threads with depth (ancestor count = reply depth)
            # NOTE(review): len(thread_data) is used as the thread id, which
            # assumes the threads table assigns sequential rowids starting at 1
            # into an empty table — verify against the schema.
            for msg_id in component:
                depth = len(self.graph.get_ancestors(msg_id))
                message_thread_data.append((msg_id, len(thread_data), depth))
        # Insert thread data
        cursor.executemany('''
            INSERT INTO threads (root_message_id, message_count, first_message_time,
                                 last_message_time, participant_count)
            VALUES (?, ?, ?, ?, ?)
        ''', thread_data)
        cursor.executemany('''
            INSERT OR REPLACE INTO message_threads (message_id, thread_id, depth)
            VALUES (?, ?, ?)
        ''', message_thread_data)
        print(f" Created {len(thread_data)} conversation threads")
    def _update_users(self) -> None:
        """Update users table from tracked data."""
        cursor = self.conn.cursor()
        user_data = [
            (user_id, data['display_name'], data['first_seen'],
             data['last_seen'], data['count'])
            for user_id, data in self.stats['users'].items()
        ]
        cursor.executemany('''
            INSERT OR REPLACE INTO users (user_id, display_name, first_seen, last_seen, message_count)
            VALUES (?, ?, ?, ?, ?)
        ''', user_data)
    def close(self) -> None:
        """Close database connection."""
        self.conn.close()
class IncrementalIndexer:
    """
    Incremental indexer for adding new JSON data to existing database.
    Features:
    - Loads existing message IDs into Bloom filter
    - Only processes new messages
    - Updates FTS index automatically
    - Fast duplicate detection O(1)
    """
    def __init__(self, db_path: str, batch_size: int = 1000):
        self.db_path = db_path
        self.batch_size = batch_size
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"Database not found: {db_path}. Use OptimizedIndexer for initial import.")
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        # Batch buffers, flushed once they reach batch_size rows.
        self.message_batch: list[tuple] = []
        self.entity_batch: list[tuple] = []
        # Stats (must be initialized before _load_existing_ids)
        self.stats = {
            'total_in_file': 0,
            'new_messages': 0,
            'duplicates': 0,
            'entities': 0,
            'users_updated': 0
        }
        # Load existing message IDs into Bloom filter. The tighter fp_rate
        # matters here: a false positive silently drops a genuinely new
        # message as a "duplicate".
        self.bloom = BloomFilter(expected_items=2000000, fp_rate=0.001)
        self._load_existing_ids()
    def _load_existing_ids(self) -> None:
        """Load existing message IDs into Bloom filter for O(1) duplicate detection."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT id FROM messages")
        count = 0
        for row in cursor:
            # Same "msg_<id>" key format used by _process_message lookups.
            self.bloom.add(f"msg_{row[0]}")
            count += 1
        print(f"Loaded {count:,} existing message IDs into Bloom filter")
        self.stats['existing_count'] = count
    def update_from_json(self, json_path: str, show_progress: bool = True) -> dict:
        """
        Add new messages from JSON file to existing database.
        Only messages that don't exist in the database will be added.
        FTS5 index is updated automatically.
        Uses streaming JSON parser (ijson) when available for constant memory usage.
        """
        start_time = time.time()
        # Count total for progress (streaming-aware; extra pass over the file)
        total_hint = 0
        if show_progress:
            total_hint = count_messages(json_path)
            print(f"Processing ~{total_hint:,} messages from {json_path}")
        self.stats['total_in_file'] = total_hint
        # Start transaction; the whole update commits or rolls back as a unit.
        self.conn.execute('BEGIN TRANSACTION')
        try:
            if HAS_IJSON:
                # Streaming path: constant memory regardless of file size.
                structure = _detect_json_structure(json_path)
                prefix = 'item' if structure == 'list' else 'messages.item'
                with open(json_path, 'rb') as f:
                    for i, msg in enumerate(ijson.items(f, prefix)):
                        if msg.get('type') != 'message':
                            continue
                        parsed = parse_message(msg)
                        if parsed:
                            self._process_message(parsed)
                        if show_progress and (i + 1) % 10000 == 0:
                            print(f" Processed {i+1:,} - "
                                  f"New: {self.stats['new_messages']:,}, "
                                  f"Duplicates: {self.stats['duplicates']:,}")
            else:
                # Fallback path: loads the entire document into memory.
                with open(json_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                messages = data if isinstance(data, list) else data.get('messages', [])
                self.stats['total_in_file'] = len(messages)
                for i, msg in enumerate(messages):
                    if msg.get('type') != 'message':
                        continue
                    parsed = parse_message(msg)
                    if parsed:
                        self._process_message(parsed)
                    if show_progress and (i + 1) % 10000 == 0:
                        print(f" Processed {i+1:,}/{len(messages):,} - "
                              f"New: {self.stats['new_messages']:,}, "
                              f"Duplicates: {self.stats['duplicates']:,}")
            # Flush remaining partially-filled batches
            self._flush_batches()
            # Update user stats
            self._update_user_stats()
            # Commit
            self.conn.commit()
            # Optimize FTS if we added new data
            if self.stats['new_messages'] > 0:
                print("Optimizing FTS index...")
                self.conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('optimize')")
                self.conn.commit()
        except Exception as e:
            # Undo the whole update so the database is never left half-applied.
            self.conn.rollback()
            raise e
        elapsed = time.time() - start_time
        self.stats['elapsed_seconds'] = elapsed
        return self.stats
    def update_from_json_data(self, json_data: dict | list, show_progress: bool = False) -> dict:
        """
        Add new messages from JSON data (already parsed, not from file).
        Useful for API uploads.

        NOTE(review): stats accumulate across calls on the same instance;
        repeated calls report cumulative new_messages/duplicates — confirm
        callers expect that.
        """
        start_time = time.time()
        messages = json_data if isinstance(json_data, list) else json_data.get('messages', [])
        self.stats['total_in_file'] = len(messages)
        # Start transaction
        self.conn.execute('BEGIN TRANSACTION')
        try:
            for msg in messages:
                if msg.get('type') != 'message':
                    continue
                parsed = parse_message(msg)
                if parsed:
                    self._process_message(parsed)
            # Flush remaining
            self._flush_batches()
            # Update user stats
            self._update_user_stats()
            # Commit
            self.conn.commit()
            # Optimize FTS if we added new data
            if self.stats['new_messages'] > 0:
                self.conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('optimize')")
                self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            raise e
        elapsed = time.time() - start_time
        self.stats['elapsed_seconds'] = elapsed
        return self.stats
    def _process_message(self, msg: dict) -> None:
        """Process a single message, adding to batch if new."""
        msg_id = msg['id']
        msg_key = f"msg_{msg_id}"
        # Bloom-filter membership test only (no DB fallback here); a rare
        # false positive skips a new message, and INSERT OR IGNORE below
        # guards the true-duplicate case.
        if msg_key in self.bloom:
            self.stats['duplicates'] += 1
            return
        # Add to Bloom filter so repeats within this file are caught too.
        self.bloom.add(msg_key)
        # Add to message batch; column order matches _flush_batches' INSERT.
        self.message_batch.append((
            msg['id'], msg['type'], msg['date'], msg['date_unixtime'],
            msg['from_name'], msg['from_id'], msg['reply_to_message_id'],
            msg['forwarded_from'], msg['forwarded_from_id'], msg['text_plain'],
            msg['text_length'], msg['has_media'], msg['has_photo'],
            msg['has_links'], msg['has_mentions'], msg['is_edited'],
            msg['edited_unixtime'], msg['photo_file_size'],
            msg['photo_width'], msg['photo_height'], msg['raw_json']
        ))
        # Add entities to batch
        for entity in msg['entities']:
            self.entity_batch.append((msg_id, entity['type'], entity['value']))
        self.stats['new_messages'] += 1
        # Flush if batch is full
        if len(self.message_batch) >= self.batch_size:
            self._flush_batches()
    def _flush_batches(self) -> None:
        """Flush batch buffers to database."""
        cursor = self.conn.cursor()
        # Insert messages (FTS5 trigger will update automatically).
        # OR IGNORE (not REPLACE): existing rows are never overwritten here.
        if self.message_batch:
            cursor.executemany('''
                INSERT OR IGNORE INTO messages (
                    id, type, date, date_unixtime, from_name, from_id,
                    reply_to_message_id, forwarded_from, forwarded_from_id,
                    text_plain, text_length, has_media, has_photo, has_links,
                    has_mentions, is_edited, edited_unixtime, photo_file_size,
                    photo_width, photo_height, raw_json
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', self.message_batch)
            self.message_batch = []
        # Insert entities
        if self.entity_batch:
            cursor.executemany('''
                INSERT OR IGNORE INTO entities (message_id, type, value)
                VALUES (?, ?, ?)
            ''', self.entity_batch)
            self.stats['entities'] += len(self.entity_batch)
            self.entity_batch = []
    def _update_user_stats(self) -> None:
        """Update users table with aggregated stats."""
        cursor = self.conn.cursor()
        # Upsert users from messages, recomputing aggregates from scratch
        # so counts stay correct after incremental additions.
        cursor.execute('''
            INSERT OR REPLACE INTO users (user_id, display_name, first_seen, last_seen, message_count)
            SELECT
                from_id,
                from_name,
                MIN(date_unixtime),
                MAX(date_unixtime),
                COUNT(*)
            FROM messages
            WHERE from_id IS NOT NULL AND from_id != ''
            GROUP BY from_id
        ''')
        self.stats['users_updated'] = cursor.rowcount
    def close(self) -> None:
        """Close database connection."""
        self.conn.close()
def update_database(db_path: str, json_path: str) -> dict:
    """
    Convenience helper: apply a new JSON export to an existing database.

    Args:
        db_path: Path to existing SQLite database
        json_path: Path to new JSON file

    Returns:
        Statistics dict
    """
    updater = IncrementalIndexer(db_path)
    try:
        # Connection is closed even if the update raises.
        return updater.update_from_json(json_path)
    finally:
        updater.close()
def main():
    """CLI entry point: initial import or incremental update of a Telegram export."""
    parser = argparse.ArgumentParser(description='Index Telegram JSON export to SQLite (Optimized)')
    parser.add_argument('json_file', help='Path to Telegram export JSON file')
    parser.add_argument('--db', default='telegram.db', help='SQLite database path')
    parser.add_argument('--batch-size', type=int, default=1000, help='Batch size for inserts')
    parser.add_argument('--build-trigrams', action='store_true', help='Build trigram index for fuzzy search')
    parser.add_argument('--no-graph', action='store_true', help='Skip building reply graph')
    parser.add_argument('--quiet', action='store_true', help='Suppress progress output')
    parser.add_argument('--update', action='store_true',
                        help='Update existing database (add only new messages)')
    args = parser.parse_args()

    # Guard: the input file must exist regardless of mode.
    if not os.path.exists(args.json_file):
        print(f"Error: JSON file not found: {args.json_file}")
        return 1

    if args.update:
        # Incremental mode: append only messages not already in the database.
        if not os.path.exists(args.db):
            print(f"Error: Database not found: {args.db}")
            print("Use without --update flag for initial import")
            return 1
        print(f"{'='*50}")
        print(f"INCREMENTAL UPDATE MODE")
        print(f"{'='*50}")
        print(f"Database: {args.db}")
        print(f"New JSON: {args.json_file}")
        print()
        updater = IncrementalIndexer(args.db, args.batch_size)
        stats = updater.update_from_json(args.json_file, show_progress=not args.quiet)
        print(f"\n{'='*50}")
        print(f"Update complete!")
        print(f"{'='*50}")
        print(f" Messages in file: {stats['total_in_file']:,}")
        print(f" Already existed: {stats['duplicates']:,}")
        print(f" New messages added: {stats['new_messages']:,}")
        print(f" New entities: {stats['entities']:,}")
        print(f" Time elapsed: {stats['elapsed_seconds']:.1f}s")
        updater.close()
        return 0

    # Initial import mode: build the database from scratch.
    print(f"Initializing database: {args.db}")
    importer = OptimizedIndexer(
        db_path=args.db,
        batch_size=args.batch_size,
        build_trigrams=args.build_trigrams,
        build_graph=not args.no_graph
    )
    print(f"Indexing: {args.json_file}")
    stats = importer.index_file(args.json_file, show_progress=not args.quiet)
    print(f"\n{'='*50}")
    print(f"Indexing complete!")
    print(f"{'='*50}")
    print(f" Messages indexed: {stats['messages']:,}")
    print(f" Entities extracted: {stats['entities']:,}")
    print(f" Unique users: {len(stats['users']):,}")
    print(f" Duplicates skipped: {stats['duplicates']:,}")
    if stats.get('trigrams'):
        print(f" Trigrams indexed: {stats['trigrams']:,}")
    print(f" Time elapsed: {stats['elapsed_seconds']:.1f}s")
    print(f" Speed: {stats['messages_per_second']:.0f} msg/s")
    print(f"\nDatabase saved to: {args.db}")
    importer.close()
    return 0
if __name__ == '__main__':
    # Propagate main()'s return code as the process exit status.
    # `raise SystemExit(...)` replaces the site-injected `exit()` builtin,
    # which is not guaranteed to exist (e.g. when run with `python -S`).
    raise SystemExit(main())