#!/usr/bin/env python3
"""
Advanced Data Structures for Efficient Search and Traversal
Includes:
- Bloom Filter: O(1) "definitely not in set" checks
- Trie: O(k) prefix search and autocomplete
- LRU Cache: O(1) cached query results
- Graph algorithms: DFS, BFS for thread traversal
"""
import hashlib
import math
import re
from collections import OrderedDict, defaultdict, deque
from functools import wraps
from typing import Any, Callable, Generator, Iterator, Optional
# ============================================
# BLOOM FILTER
# ============================================
class BloomFilter:
    """
    Space-efficient probabilistic data structure for set membership testing.

    - O(k) insert and lookup where k is the number of hash functions
    - False positives possible, false negatives impossible
    - Use case: quick "message ID exists?" check before a DB query

    Example:
        bf = BloomFilter(expected_items=100000, fp_rate=0.01)
        bf.add("message_123")
        if "message_123" in bf:
            ...  # might exist, check DB
        else:
            ...  # definitely doesn't exist, skip DB
    """

    def __init__(self, expected_items: int = 100000, fp_rate: float = 0.01):
        """
        Initialize the Bloom filter.

        Args:
            expected_items: Expected number of items to store (must be > 0).
            fp_rate: Desired false positive rate in (0, 1), e.g. 0.01 = 1%.

        Raises:
            ValueError: If expected_items is not positive or fp_rate is
                outside (0, 1) — previously these surfaced as confusing
                ZeroDivisionError / math domain errors.
        """
        if expected_items <= 0:
            raise ValueError("expected_items must be a positive integer")
        if not 0.0 < fp_rate < 1.0:
            raise ValueError("fp_rate must be strictly between 0 and 1")
        # Calculate optimal size and hash count for the requested capacity.
        self.size = self._optimal_size(expected_items, fp_rate)
        self.hash_count = self._optimal_hash_count(self.size, expected_items)
        self.bit_array = bytearray(math.ceil(self.size / 8))
        self.count = 0  # number of add() calls (not necessarily distinct items)

    @staticmethod
    def _optimal_size(n: int, p: float) -> int:
        """Calculate optimal bit array size: m = -n*ln(p) / (ln2)^2"""
        return int(-n * math.log(p) / (math.log(2) ** 2))

    @staticmethod
    def _optimal_hash_count(m: int, n: int) -> int:
        """Calculate optimal hash count: k = (m/n) * ln2 (at least 1)."""
        return max(1, int((m / n) * math.log(2)))

    def _get_hash_values(self, item: str) -> Generator[int, None, None]:
        """Yield k bit positions via double hashing: (h1 + i*h2) mod m."""
        encoded = item.encode()  # encode once, reuse for both digests
        h1 = int(hashlib.md5(encoded).hexdigest(), 16)
        h2 = int(hashlib.sha1(encoded).hexdigest(), 16)
        for i in range(self.hash_count):
            yield (h1 + i * h2) % self.size

    def add(self, item: str) -> None:
        """Add an item to the filter. O(k) where k is hash count."""
        for pos in self._get_hash_values(item):
            byte_idx, bit_idx = divmod(pos, 8)
            self.bit_array[byte_idx] |= (1 << bit_idx)
        self.count += 1

    def __contains__(self, item: str) -> bool:
        """Check if item might be in the filter. O(k)."""
        for pos in self._get_hash_values(item):
            byte_idx, bit_idx = divmod(pos, 8)
            if not (self.bit_array[byte_idx] & (1 << bit_idx)):
                return False  # Definitely not in set
        return True  # Might be in set

    def __len__(self) -> int:
        return self.count

    @property
    def memory_usage(self) -> int:
        """Return memory usage of the bit array in bytes."""
        return len(self.bit_array)
# ============================================
# TRIE (PREFIX TREE)
# ============================================
class TrieNode:
    """A single node of a Trie; __slots__ keeps per-node memory low."""

    __slots__ = ('children', 'is_end', 'data', 'count')

    def __init__(self):
        # Next character -> child node.
        self.children: dict[str, TrieNode] = {}
        # True when a complete key terminates at this node.
        self.is_end: bool = False
        # Payload attached to the key ending here (e.g. message IDs).
        self.data: Any = None
        # How many inserted keys pass through this node.
        self.count: int = 0
class Trie:
    """
    Trie (prefix tree) for fast prefix search and autocomplete.

    - O(k) insert/search for a key of length k
    - O(p + n) prefix search for prefix length p and n results
    - Use case: autocomplete usernames, find all messages with a prefix

    Keys are normalized to lower case on every operation.

    Example:
        trie = Trie()
        trie.insert("@username1", message_ids=[1, 2, 3])
        trie.insert("@username2", message_ids=[4, 5])
        results = trie.search_prefix("@user")  # Returns both
        completions = trie.autocomplete("@user", limit=5)
    """

    def __init__(self):
        self.root = TrieNode()
        self.size = 0

    def insert(self, key: str, data: Any = None) -> None:
        """Insert a key, accumulating any associated data. O(k)."""
        node = self.root
        for ch in key.lower():
            node = node.children.setdefault(ch, TrieNode())
            node.count += 1
        if not node.is_end:
            node.is_end = True
            self.size += 1
        # Data accumulates across repeated inserts of the same key.
        if data is not None:
            if node.data is None:
                node.data = []
            if isinstance(data, list):
                node.data.extend(data)
            else:
                node.data.append(data)

    def search(self, key: str) -> Optional[Any]:
        """Return data stored under an exact key, or None. O(k)."""
        node = self._find_node(key.lower())
        if node is not None and node.is_end:
            return node.data
        return None

    def __contains__(self, key: str) -> bool:
        """True when the exact key was inserted. O(k)."""
        node = self._find_node(key.lower())
        return node is not None and node.is_end

    def _find_node(self, prefix: str) -> Optional[TrieNode]:
        """Walk the tree along prefix; None when the path breaks."""
        node = self.root
        for ch in prefix:
            node = node.children.get(ch)
            if node is None:
                return None
        return node

    def _walk(self, node: TrieNode, prefix: str) -> Iterator[tuple[str, TrieNode]]:
        """Preorder traversal yielding (key, end-node) for every stored key."""
        if node.is_end:
            yield prefix, node
        for ch, child in node.children.items():
            yield from self._walk(child, prefix + ch)

    def search_prefix(self, prefix: str) -> list[tuple[str, Any]]:
        """
        Find all keys with the given prefix. O(p + n).
        Returns a list of (key, data) tuples.
        """
        start = self._find_node(prefix.lower())
        if start is None:
            return []
        return [(key, end.data) for key, end in self._walk(start, prefix.lower())]

    def autocomplete(self, prefix: str, limit: int = 10) -> list[str]:
        """
        Autocomplete suggestions for prefix, most frequent first,
        at most `limit` entries.
        """
        start = self._find_node(prefix.lower())
        if start is None:
            return []
        # Stable sort keeps preorder for equal frequencies, matching
        # the original collect-then-sort behavior.
        ranked = sorted(
            ((key, end.count) for key, end in self._walk(start, prefix.lower())),
            key=lambda pair: pair[1],
            reverse=True,
        )
        return [key for key, _ in ranked[:limit]]

    def __len__(self) -> int:
        return self.size
# ============================================
# LRU CACHE
# ============================================
class LRUCache:
    """
    Least Recently Used (LRU) cache for query results.

    - O(1) get/put operations
    - Evicts the least recently used entry when capacity is exceeded
    - Tracks hit/miss counters for monitoring

    Example:
        cache = LRUCache(maxsize=1000)
        cache.put("query:hello", results)
        results = cache.get("query:hello")  # O(1)
    """

    def __init__(self, maxsize: int = 1000):
        self.maxsize = maxsize
        # OrderedDict keeps recency order: oldest first, newest last.
        self.cache: OrderedDict[str, Any] = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, refreshing recency, or None on a miss."""
        if key not in self.cache:
            self.misses += 1
            return None
        self.hits += 1
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key: str, value: Any) -> None:
        """Store a value; evict the least recently used entry when full."""
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.maxsize:
            # Drop the entry at the front, i.e. the stalest one.
            self.cache.popitem(last=False)
        self.cache[key] = value

    def __contains__(self, key: str) -> bool:
        return key in self.cache

    def __len__(self) -> int:
        return len(self.cache)

    def clear(self) -> None:
        """Empty the cache and reset the hit/miss counters."""
        self.cache.clear()
        self.hits = 0
        self.misses = 0

    @property
    def hit_rate(self) -> float:
        """Fraction of lookups served from cache (0.0 when never queried)."""
        attempts = self.hits + self.misses
        return self.hits / attempts if attempts else 0.0

    @property
    def stats(self) -> dict:
        """Snapshot of cache metrics."""
        return {
            'size': len(self.cache),
            'maxsize': self.maxsize,
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': self.hit_rate
        }
def lru_cached(cache: "LRUCache", key_func: Optional[Callable[..., str]] = None):
    """
    Decorator to cache function results using an LRUCache.

    Args:
        cache: The LRUCache (or any object with get/put/__contains__)
            that stores results.
        key_func: Optional callable building the cache key from the
            wrapped function's arguments; defaults to a key derived
            from the function name and its args/kwargs reprs.

    BUG FIX: the previous implementation treated `get(...) is None` as
    a miss, so functions that legitimately return None were never
    cached and re-executed on every call. We now use a membership
    check, which caches None results correctly.

    Example:
        cache = LRUCache(1000)
        @lru_cached(cache, key_func=lambda q, **kw: f"search:{q}")
        def search(query, limit=100):
            return expensive_search(query, limit)
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            if key_func:
                key = key_func(*args, **kwargs)
            else:
                key = f"{func.__name__}:{args}:{kwargs}"
            if key in cache:
                # get() also refreshes recency and counts the hit.
                return cache.get(key)
            result = func(*args, **kwargs)
            cache.put(key, result)
            return result
        return wrapper
    return decorator
# ============================================
# GRAPH ALGORITHMS FOR REPLY THREADS
# ============================================
class ReplyGraph:
    """
    Graph structure for message reply relationships.

    Supports:
    - DFS: depth-first traversal for finding all descendants
    - BFS: breadth-first traversal for level-order exploration
    - Connected components: find isolated conversation threads
    - Thread reconstruction: roots, ancestor chains, root-to-leaf paths

    Time complexity: O(V + E) for traversals
    Space complexity: O(V) for visited sets
    """

    def __init__(self):
        # Adjacency lists
        self.children: dict[int, list[int]] = defaultdict(list)  # parent -> [children]
        self.parents: dict[int, int] = {}  # child -> parent (one parent per message)
        self.nodes: set[int] = set()  # every message id ever seen

    def add_edge(self, parent_id: int, child_id: int) -> None:
        """Record that child_id is a reply to parent_id. O(1)."""
        self.children[parent_id].append(child_id)
        self.parents[child_id] = parent_id
        self.nodes.add(parent_id)
        self.nodes.add(child_id)

    def add_message(self, message_id: int, reply_to: Optional[int] = None) -> None:
        """Add a message, optionally with its reply relationship."""
        self.nodes.add(message_id)
        if reply_to is not None:
            self.add_edge(reply_to, message_id)

    def get_children(self, message_id: int) -> list[int]:
        """Direct replies to a message. O(n) in the number of replies.

        Returns a defensive copy so callers cannot mutate the internal
        adjacency list by accident (the old version aliased it).
        """
        return list(self.children.get(message_id, []))

    def get_parent(self, message_id: int) -> Optional[int]:
        """Get the message this is a reply to, or None for a root. O(1)."""
        return self.parents.get(message_id)

    # ==================
    # DFS - Depth First Search
    # ==================
    def dfs_descendants(self, start_id: int) -> list[int]:
        """
        DFS: get all descendants of a message (entire sub-thread).
        Time: O(V + E)
        Space: O(V)
        Returns messages in DFS preorder (deep before wide).

        Implemented on top of dfs_iterative so very deep reply chains
        cannot hit Python's recursion limit; the visit order is
        identical to the former recursive implementation.
        """
        return list(self.dfs_iterative(start_id))

    def dfs_iterative(self, start_id: int) -> Iterator[int]:
        """
        Iterative DFS using an explicit stack (avoids recursion limit).
        Yields message IDs in DFS preorder.
        """
        stack = [start_id]
        visited = set()
        while stack:
            node_id = stack.pop()
            if node_id in visited:
                continue
            visited.add(node_id)
            yield node_id
            # Push children reversed so the first child is popped first.
            for child_id in reversed(self.children.get(node_id, [])):
                if child_id not in visited:
                    stack.append(child_id)

    # ==================
    # BFS - Breadth First Search
    # ==================
    def bfs_descendants(self, start_id: int) -> list[int]:
        """
        BFS: get all descendants level by level.
        Time: O(V + E)
        Space: O(V)
        Returns messages in BFS order (level by level).
        """
        result = []
        visited = set()
        queue = deque([start_id])
        while queue:
            node_id = queue.popleft()
            if node_id in visited:
                continue
            visited.add(node_id)
            result.append(node_id)
            for child_id in self.children.get(node_id, []):
                if child_id not in visited:
                    queue.append(child_id)
        return result

    def bfs_with_depth(self, start_id: int) -> list[tuple[int, int]]:
        """
        BFS with depth information.
        Returns list of (message_id, depth) tuples.
        """
        result = []
        visited = set()
        queue = deque([(start_id, 0)])
        while queue:
            node_id, depth = queue.popleft()
            if node_id in visited:
                continue
            visited.add(node_id)
            result.append((node_id, depth))
            for child_id in self.children.get(node_id, []):
                if child_id not in visited:
                    queue.append((child_id, depth + 1))
        return result

    # ==================
    # THREAD RECONSTRUCTION
    # ==================
    def get_thread_root(self, message_id: int) -> int:
        """Find the root message of a thread. O(d) where d is depth."""
        current = message_id
        while current in self.parents:
            current = self.parents[current]
        return current

    def get_full_thread(self, message_id: int) -> list[int]:
        """
        Get the complete thread containing a message:
        1. Find the root via parent traversal
        2. BFS from the root to get all descendants
        """
        return self.bfs_descendants(self.get_thread_root(message_id))

    def get_ancestors(self, message_id: int) -> list[int]:
        """
        Get all ancestors (path to root). O(d).
        Returns in order from immediate parent up to the root.
        """
        ancestors = []
        current = message_id
        while current in self.parents:
            current = self.parents[current]
            ancestors.append(current)
        return ancestors

    def get_thread_path(self, message_id: int) -> list[int]:
        """Get the path from the thread root down to message_id. O(d)."""
        path = [message_id]
        current = message_id
        while current in self.parents:
            current = self.parents[current]
            path.append(current)
        return list(reversed(path))

    # ==================
    # CONNECTED COMPONENTS
    # ==================
    def find_connected_components(self) -> list[set[int]]:
        """
        Find all isolated conversation threads (undirected connectivity).
        Time: O(V + E)
        Returns list of sets, each set is a connected thread.
        """
        visited = set()
        components = []
        for node in self.nodes:
            if node in visited:
                continue
            component = set()
            # BFS over both parent and child links from this seed node.
            queue = deque([node])
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                visited.add(current)
                component.add(current)
                # Explore upwards (parent) ...
                if current in self.parents:
                    parent = self.parents[current]
                    if parent not in visited:
                        queue.append(parent)
                # ... and downwards (children).
                for child in self.children.get(current, []):
                    if child not in visited:
                        queue.append(child)
            components.append(component)
        return components

    def get_thread_roots(self) -> list[int]:
        """Get all thread root messages (messages with no parent)."""
        return [node for node in self.nodes if node not in self.parents]

    # ==================
    # STATISTICS
    # ==================
    def get_thread_depth(self, root_id: int) -> int:
        """Get maximum depth of a thread from root."""
        max_depth = 0
        for _, depth in self.bfs_with_depth(root_id):
            max_depth = max(max_depth, depth)
        return max_depth

    def get_subtree_size(self, message_id: int) -> int:
        """Get number of messages in subtree including the root itself."""
        return len(self.dfs_descendants(message_id))

    @property
    def stats(self) -> dict:
        """Get graph statistics."""
        return {
            'total_nodes': len(self.nodes),
            'total_edges': sum(len(children) for children in self.children.values()),
            'root_messages': len(self.get_thread_roots()),
            'connected_components': len(self.find_connected_components())
        }
# ============================================
# TRIGRAM SIMILARITY
# ============================================
def generate_trigrams(text: str) -> set[str]:
    """
    Break text into its 3-character windows for fuzzy matching.

    Example: "hello" -> {"hel", "ell", "llo"}

    Input is lower-cased and stripped first. Strings shorter than three
    characters yield themselves as a single pseudo-trigram; blank input
    yields the empty set.
    """
    normalized = text.lower().strip()
    if len(normalized) >= 3:
        return {normalized[i:i + 3] for i in range(len(normalized) - 2)}
    return {normalized} if normalized else set()
def trigram_similarity(text1: str, text2: str) -> float:
    """
    Jaccard similarity of the two texts' trigram sets.

    Ranges from 0.0 (no overlap, or blank input) to 1.0 (identical
    trigram sets).
    """
    a = generate_trigrams(text1)
    b = generate_trigrams(text2)
    if not a or not b:
        return 0.0
    union_size = len(a | b)
    return len(a & b) / union_size if union_size > 0 else 0.0
class TrigramIndex:
    """
    Inverted index of trigrams for fuzzy search.

    Time complexity:
    - Insert: O(k) where k is text length
    - Search: O(t * m) where t is trigrams in query, m is avg matches

    Re-adding an existing doc_id replaces its text: stale trigram
    postings are removed first so match counts (and therefore the
    similarity scores) stay correct.

    Example:
        index = TrigramIndex()
        index.add(1, "ืฉืœื•ื ืขื•ืœื")
        index.add(2, "ืฉืœื•ื ืœื›ื•ืœื")
        results = index.search("ืฉืœื•ื", threshold=0.3)
    """

    def __init__(self):
        # trigram -> set of doc ids containing it
        self.index: dict[str, set[int]] = defaultdict(set)
        # doc id -> original text (needed to recompute exact similarity)
        self.texts: dict[int, str] = {}

    def add(self, doc_id: int, text: str) -> None:
        """Add or replace a document in the index."""
        # BUG FIX: previously, re-adding a doc_id left postings for the
        # old text in the index, inflating match_count for that doc and
        # producing similarities computed against the wrong trigrams.
        if doc_id in self.texts:
            self._remove_postings(doc_id)
        self.texts[doc_id] = text
        for trigram in generate_trigrams(text):
            self.index[trigram].add(doc_id)

    def _remove_postings(self, doc_id: int) -> None:
        """Drop every posting of doc_id's current text from the index."""
        for trigram in generate_trigrams(self.texts[doc_id]):
            bucket = self.index.get(trigram)
            if bucket is not None:
                bucket.discard(doc_id)
                if not bucket:
                    del self.index[trigram]  # keep the index compact

    def search(self, query: str, threshold: float = 0.3, limit: int = 100) -> list[tuple[int, float]]:
        """
        Search for documents similar to query.
        Returns list of (doc_id, similarity) tuples, sorted by similarity
        descending, keeping only candidates at or above threshold.
        """
        query_trigrams = generate_trigrams(query)
        if not query_trigrams:
            return []
        # Count how many query trigrams each candidate document shares.
        candidates: dict[int, int] = defaultdict(int)
        for trigram in query_trigrams:
            for doc_id in self.index.get(trigram, ()):
                candidates[doc_id] += 1
        # Calculate exact Jaccard similarity for the candidates.
        results = []
        query_len = len(query_trigrams)
        for doc_id, match_count in candidates.items():
            doc_len = len(generate_trigrams(self.texts[doc_id]))
            # Jaccard: |A & B| / |A | B| = matches / (|A| + |B| - matches)
            similarity = match_count / (query_len + doc_len - match_count)
            if similarity >= threshold:
                results.append((doc_id, similarity))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:limit]

    def __len__(self) -> int:
        return len(self.texts)
# ============================================
# INVERTED INDEX
# ============================================
class InvertedIndex:
    """
    Simple inverted index for fast word-to-document lookup.

    Time complexity:
    - Insert: O(w) where w is word count
    - Search: O(n) to copy the postings of one word
    - AND/OR queries: O(min(n1, n2)) for the set operations
    """

    # Matches runs of Hebrew or Latin letters; compiled once per process
    # instead of re-importing/re-scanning inside every _tokenize call.
    _TOKEN_RE = re.compile(r'[\u0590-\u05FFa-zA-Z]+')

    def __init__(self):
        # word -> set of doc ids containing it
        self.index: dict[str, set[int]] = defaultdict(set)
        self.doc_count = 0

    def add(self, doc_id: int, text: str) -> None:
        """Add a document's words to the index."""
        for word in self._tokenize(text):
            self.index[word].add(doc_id)
        self.doc_count += 1

    def _tokenize(self, text: str) -> list[str]:
        """Lower-case the text and extract letter runs (Hebrew + Latin)."""
        return self._TOKEN_RE.findall(text.lower())

    def search(self, word: str) -> set[int]:
        """
        Find all documents containing the word (case-insensitive).

        Returns a copy, never the internal postings set, so callers
        cannot mutate the index through the result.
        """
        return set(self.index.get(word.lower(), ()))

    def search_and(self, words: list[str]) -> set[int]:
        """Find documents containing ALL words (empty set for no words)."""
        if not words:
            return set()
        # BUG FIX: the old code intersected in place (&=) on the set
        # returned by search(), which aliased the internal postings set
        # and silently deleted doc ids from the index on every AND
        # query. search() now returns a copy, so this is safe.
        result = self.search(words[0])
        for word in words[1:]:
            result &= self.search(word)
        return result

    def search_or(self, words: list[str]) -> set[int]:
        """Find documents containing ANY word."""
        result: set[int] = set()
        for word in words:
            result |= self.search(word)
        return result
def _demo() -> None:
    """Run small smoke-test demos of each data structure."""
    print("=== Bloom Filter Demo ===")
    bf = BloomFilter(expected_items=1000, fp_rate=0.01)
    bf.add("message_1")
    bf.add("message_2")
    print(f"message_1 in filter: {'message_1' in bf}")
    print(f"message_999 in filter: {'message_999' in bf}")
    print(f"Memory usage: {bf.memory_usage} bytes")

    print("\n=== Trie Demo ===")
    trie = Trie()
    trie.insert("@username1", data=1)
    trie.insert("@username2", data=2)
    trie.insert("@user_test", data=3)
    print(f"Autocomplete '@user': {trie.autocomplete('@user')}")

    print("\n=== Reply Graph Demo ===")
    graph = ReplyGraph()
    graph.add_message(1)
    graph.add_message(2, reply_to=1)
    graph.add_message(3, reply_to=1)
    graph.add_message(4, reply_to=2)
    graph.add_message(5, reply_to=2)
    print(f"DFS from 1: {graph.dfs_descendants(1)}")
    print(f"BFS from 1: {graph.bfs_descendants(1)}")
    print(f"Thread path for 4: {graph.get_thread_path(4)}")
    print(f"Stats: {graph.stats}")


if __name__ == '__main__':
    _demo()