Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Advanced Data Structures for Efficient Search and Traversal | |
| Includes: | |
| - Bloom Filter: O(1) "definitely not in set" checks | |
| - Trie: O(k) prefix search and autocomplete | |
| - LRU Cache: O(1) cached query results | |
| - Graph algorithms: DFS, BFS for thread traversal | |
| """ | |
import hashlib
import math
import re
from collections import OrderedDict, defaultdict, deque
from functools import wraps
from typing import Any, Callable, Generator, Iterator, Optional
| # ============================================ | |
| # BLOOM FILTER | |
| # ============================================ | |
class BloomFilter:
    """
    Space-efficient probabilistic data structure for set membership testing.

    - O(k) insert and lookup where k is the number of hash functions
    - False positives possible, false negatives impossible
    - Use case: quick "message ID exists?" check before a DB query

    Example:
        bf = BloomFilter(expected_items=100000, fp_rate=0.01)
        bf.add("message_123")
        if "message_123" in bf:
            ...  # might exist, check DB
        else:
            ...  # definitely doesn't exist, skip DB
    """

    def __init__(self, expected_items: int = 100000, fp_rate: float = 0.01):
        """
        Initialize the Bloom filter.

        Args:
            expected_items: Expected number of items to store.
            fp_rate: Desired false positive rate (0.01 = 1%).
        """
        # Derive optimal bit-array size and hash count from n and p.
        self.size = self._optimal_size(expected_items, fp_rate)
        self.hash_count = self._optimal_hash_count(self.size, expected_items)
        # Bits are packed 8 per byte.
        self.bit_array = bytearray(math.ceil(self.size / 8))
        self.count = 0

    # BUG FIX: these two helpers were defined without `self`, so calling them
    # as instance methods from __init__ raised TypeError. They use no instance
    # state, so they are now static methods.
    @staticmethod
    def _optimal_size(n: int, p: float) -> int:
        """Calculate optimal bit array size: m = -n*ln(p) / (ln 2)^2."""
        return int(-n * math.log(p) / (math.log(2) ** 2))

    @staticmethod
    def _optimal_hash_count(m: int, n: int) -> int:
        """Calculate optimal hash count: k = (m/n) * ln 2 (at least 1)."""
        return max(1, int((m / n) * math.log(2)))

    def _get_hash_values(self, item: str) -> Generator[int, None, None]:
        """Generate k hash values using double hashing: (h1 + i*h2) mod m."""
        h1 = int(hashlib.md5(item.encode()).hexdigest(), 16)
        h2 = int(hashlib.sha1(item.encode()).hexdigest(), 16)
        for i in range(self.hash_count):
            yield (h1 + i * h2) % self.size

    def add(self, item: str) -> None:
        """Add an item to the filter. O(k) where k is the hash count."""
        for pos in self._get_hash_values(item):
            byte_idx, bit_idx = divmod(pos, 8)
            self.bit_array[byte_idx] |= (1 << bit_idx)
        self.count += 1

    def __contains__(self, item: str) -> bool:
        """Check whether item might be in the filter. O(k)."""
        for pos in self._get_hash_values(item):
            byte_idx, bit_idx = divmod(pos, 8)
            if not (self.bit_array[byte_idx] & (1 << bit_idx)):
                return False  # Definitely not in the set
        return True  # Might be in the set

    def __len__(self) -> int:
        # Number of add() calls, not distinct items.
        return self.count

    def memory_usage(self) -> int:
        """Return memory usage of the bit array in bytes."""
        return len(self.bit_array)
| # ============================================ | |
| # TRIE (PREFIX TREE) | |
| # ============================================ | |
class TrieNode:
    """One node of a Trie: child links, terminal flag, payload, frequency."""

    __slots__ = ['children', 'is_end', 'data', 'count']

    def __init__(self):
        self.children: dict[str, TrieNode] = {}  # char -> child node
        self.is_end: bool = False                # a complete key ends here
        self.data: Any = None                    # payload (e.g. message IDs)
        self.count: int = 0                      # insertions ending here


class Trie:
    """
    Prefix tree supporting exact lookup, prefix enumeration and autocomplete.

    Complexity:
    - insert/search: O(k), k = key length
    - prefix search: O(p + n), p = prefix length, n = number of matches

    Example:
        trie = Trie()
        trie.insert("@username1", data=[1, 2, 3])
        trie.insert("@username2", data=[4, 5])
        trie.search_prefix("@user")           # both keys
        trie.autocomplete("@user", limit=5)   # most frequent first
    """

    def __init__(self):
        self.root = TrieNode()
        self.size = 0  # number of distinct keys stored

    def insert(self, key: str, data: Any = None) -> None:
        """Insert a key (case-insensitive) with an optional payload. O(k)."""
        cursor = self.root
        for ch in key.lower():
            nxt = cursor.children.get(ch)
            if nxt is None:
                nxt = TrieNode()
                cursor.children[ch] = nxt
            cursor = nxt
        cursor.count += 1
        if not cursor.is_end:
            self.size += 1
        cursor.is_end = True
        if data is not None:
            # Payloads accumulate in a list across repeated inserts.
            if cursor.data is None:
                cursor.data = []
            if isinstance(data, list):
                cursor.data.extend(data)
            else:
                cursor.data.append(data)

    def search(self, key: str) -> Optional[Any]:
        """Exact-key lookup. O(k). Returns the stored payload or None."""
        hit = self._find_node(key.lower())
        if hit is not None and hit.is_end:
            return hit.data
        return None

    def __contains__(self, key: str) -> bool:
        """True when the exact key was inserted. O(k)."""
        hit = self._find_node(key.lower())
        return hit is not None and hit.is_end

    def _find_node(self, prefix: str) -> Optional[TrieNode]:
        """Walk down the tree; return the node for prefix, or None."""
        cursor = self.root
        for ch in prefix:
            cursor = cursor.children.get(ch)
            if cursor is None:
                return None
        return cursor

    def search_prefix(self, prefix: str) -> list[tuple[str, Any]]:
        """All (key, data) pairs whose key starts with prefix. O(p + n)."""
        found: list[tuple[str, Any]] = []
        start = self._find_node(prefix.lower())
        if start:
            self._collect_all(start, prefix.lower(), found)
        return found

    def _collect_all(
        self,
        node: TrieNode,
        prefix: str,
        results: list[tuple[str, Any]]
    ) -> None:
        """Depth-first collection of every stored key below `node`."""
        if node.is_end:
            results.append((prefix, node.data))
        for ch, child in node.children.items():
            self._collect_all(child, prefix + ch, results)

    def autocomplete(self, prefix: str, limit: int = 10) -> list[str]:
        """Top `limit` completions of prefix, most frequent first."""
        start = self._find_node(prefix.lower())
        if not start:
            return []
        scored: list[tuple[str, int]] = []
        self._collect_suggestions(start, prefix.lower(), scored)
        # Stable sort keeps DFS order among equal frequencies.
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
        return [key for key, _ in ranked[:limit]]

    def _collect_suggestions(
        self,
        node: TrieNode,
        prefix: str,
        suggestions: list[tuple[str, int]]
    ) -> None:
        """Depth-first collection of (key, frequency) pairs below `node`."""
        if node.is_end:
            suggestions.append((prefix, node.count))
        for ch, child in node.children.items():
            self._collect_suggestions(child, prefix + ch, suggestions)

    def __len__(self) -> int:
        return self.size
| # ============================================ | |
| # LRU CACHE | |
| # ============================================ | |
class LRUCache:
    """
    Least Recently Used (LRU) cache for query results.

    - O(1) get/put via OrderedDict (most-recent entries kept at the end)
    - Evicts the least recently used item when full
    - Use case: cache expensive query results

    Example:
        cache = LRUCache(maxsize=1000)
        cache.put("query:hello", results)
        results = cache.get("query:hello")  # O(1)
    """

    def __init__(self, maxsize: int = 1000):
        self.maxsize = maxsize
        self.cache: OrderedDict[str, Any] = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache. O(1). Returns None if not found."""
        if key in self.cache:
            # Refresh recency: move the entry to the MRU end.
            self.cache.move_to_end(key)
            self.hits += 1
            return self.cache[key]
        self.misses += 1
        return None

    def put(self, key: str, value: Any) -> None:
        """Put item in cache. O(1). Evicts the LRU item when full."""
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.maxsize:
            # popitem(last=False) removes the least recently used entry.
            self.cache.popitem(last=False)
        self.cache[key] = value

    def __contains__(self, key: str) -> bool:
        # Membership test does not refresh recency or touch hit counters.
        return key in self.cache

    def __len__(self) -> int:
        return len(self.cache)

    def clear(self) -> None:
        """Empty the cache and reset hit/miss counters."""
        self.cache.clear()
        self.hits = 0
        self.misses = 0

    def hit_rate(self) -> float:
        """Fraction of lookups served from cache (0.0 when never queried)."""
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    def stats(self) -> dict:
        """Return a snapshot of cache statistics."""
        return {
            'size': len(self.cache),
            'maxsize': self.maxsize,
            'hits': self.hits,
            'misses': self.misses,
            # BUG FIX: was `self.hit_rate` (a bound method); now the value.
            'hit_rate': self.hit_rate(),
        }
def lru_cached(cache: "LRUCache", key_func: Optional[Callable[..., str]] = None):
    """
    Decorator that caches function results in an LRUCache-like object
    (anything exposing ``get(key) -> value | None`` and ``put(key, value)``).

    Args:
        cache: The cache instance shared by all calls to the wrapped function.
        key_func: Optional callable building the cache key from the call
            arguments; defaults to a repr-based key of name/args/kwargs.

    Note:
        Results equal to None are never cached, because a cached None is
        indistinguishable from a cache miss in ``cache.get``.

    Example:
        cache = LRUCache(1000)

        @lru_cached(cache, key_func=lambda q, **kw: f"search:{q}")
        def search(query, limit=100):
            return expensive_search(query, limit)
    """
    def decorator(func: Callable) -> Callable:
        # BUG FIX: `wraps` was imported at module level but never applied,
        # so wrapped functions lost their __name__/__doc__.
        @wraps(func)
        def wrapper(*args, **kwargs):
            if key_func:
                key = key_func(*args, **kwargs)
            else:
                # Fallback key relies on repr() of args/kwargs being stable.
                key = f"{func.__name__}:{args}:{kwargs}"
            result = cache.get(key)
            if result is not None:
                return result
            result = func(*args, **kwargs)
            cache.put(key, result)
            return result
        return wrapper
    return decorator
| # ============================================ | |
| # GRAPH ALGORITHMS FOR REPLY THREADS | |
| # ============================================ | |
class ReplyGraph:
    """
    Graph structure for message reply relationships.

    Supports:
    - DFS: depth-first traversal for finding all descendants
    - BFS: breadth-first traversal for level-order exploration
    - Connected components: find isolated conversation threads

    Time complexity: O(V + E) for traversals
    Space complexity: O(V) for visited sets
    """

    def __init__(self):
        # parent -> list of direct replies (insertion order preserved)
        self.children: dict[int, list[int]] = defaultdict(list)
        # child -> parent (each message replies to at most one message)
        self.parents: dict[int, int] = {}
        self.nodes: set[int] = set()

    def add_edge(self, parent_id: int, child_id: int) -> None:
        """Add a reply relationship. O(1)."""
        self.children[parent_id].append(child_id)
        self.parents[child_id] = parent_id
        self.nodes.add(parent_id)
        self.nodes.add(child_id)

    def add_message(self, message_id: int, reply_to: Optional[int] = None) -> None:
        """Add a message, optionally with its reply relationship."""
        self.nodes.add(message_id)
        if reply_to is not None:
            self.add_edge(reply_to, message_id)

    def get_children(self, message_id: int) -> list[int]:
        """Get direct replies to a message. O(1)."""
        # .get() (not []) so lookups never materialize defaultdict entries.
        return self.children.get(message_id, [])

    def get_parent(self, message_id: int) -> Optional[int]:
        """Get the message this is a reply to, or None. O(1)."""
        return self.parents.get(message_id)

    # ==================
    # DFS - Depth First Search
    # ==================
    def dfs_descendants(self, start_id: int) -> list[int]:
        """
        DFS: get all descendants of a message (entire sub-thread).

        Time: O(V + E), Space: O(V).
        Returns messages in preorder DFS order (deep before wide).
        """
        result = []
        visited = set()

        def dfs(node_id: int) -> None:
            if node_id in visited:
                return
            visited.add(node_id)
            result.append(node_id)
            for child_id in self.children.get(node_id, []):
                dfs(child_id)

        dfs(start_id)
        return result

    def dfs_iterative(self, start_id: int) -> Iterator[int]:
        """
        Iterative DFS using an explicit stack (avoids recursion limit).
        Yields message IDs in the same order as dfs_descendants.
        """
        stack = [start_id]
        visited = set()
        while stack:
            node_id = stack.pop()
            if node_id in visited:
                continue
            visited.add(node_id)
            yield node_id
            # Push children reversed so the first child is popped first.
            for child_id in reversed(self.children.get(node_id, [])):
                if child_id not in visited:
                    stack.append(child_id)

    # ==================
    # BFS - Breadth First Search
    # ==================
    def bfs_descendants(self, start_id: int) -> list[int]:
        """
        BFS: get all descendants level by level.

        Time: O(V + E), Space: O(V).
        """
        result = []
        visited = set()
        queue = deque([start_id])
        while queue:
            node_id = queue.popleft()
            if node_id in visited:
                continue
            visited.add(node_id)
            result.append(node_id)
            for child_id in self.children.get(node_id, []):
                if child_id not in visited:
                    queue.append(child_id)
        return result

    def bfs_with_depth(self, start_id: int) -> list[tuple[int, int]]:
        """
        BFS with depth information.
        Returns a list of (message_id, depth) tuples; start has depth 0.
        """
        result = []
        visited = set()
        queue = deque([(start_id, 0)])
        while queue:
            node_id, depth = queue.popleft()
            if node_id in visited:
                continue
            visited.add(node_id)
            result.append((node_id, depth))
            for child_id in self.children.get(node_id, []):
                if child_id not in visited:
                    queue.append((child_id, depth + 1))
        return result

    # ==================
    # THREAD RECONSTRUCTION
    # ==================
    def get_thread_root(self, message_id: int) -> int:
        """
        Find the root message of a thread. O(d) where d is depth.

        ROBUSTNESS FIX: add_edge() does not forbid cycles, and the original
        upward walk looped forever on a cyclic parent chain; a seen-set now
        stops the walk at the first repeated node.
        """
        current = message_id
        seen = {current}
        while current in self.parents:
            current = self.parents[current]
            if current in seen:
                break  # malformed cycle: stop rather than loop forever
            seen.add(current)
        return current

    def get_full_thread(self, message_id: int) -> list[int]:
        """
        Get the complete thread containing a message:
        1. find the root via parent traversal,
        2. BFS from the root to get all descendants.
        """
        root = self.get_thread_root(message_id)
        return self.bfs_descendants(root)

    def get_ancestors(self, message_id: int) -> list[int]:
        """
        Get all ancestors (path to root). O(d).
        Returns in order from message to root. Cycle-safe (see
        get_thread_root).
        """
        ancestors = []
        seen = {message_id}
        current = message_id
        while current in self.parents:
            parent = self.parents[current]
            if parent in seen:
                break  # malformed cycle guard
            ancestors.append(parent)
            seen.add(parent)
            current = parent
        return ancestors

    def get_thread_path(self, message_id: int) -> list[int]:
        """
        Get the path from root to message. O(d). Cycle-safe (see
        get_thread_root).
        """
        path = [message_id]
        seen = {message_id}
        current = message_id
        while current in self.parents:
            parent = self.parents[current]
            if parent in seen:
                break  # malformed cycle guard
            path.append(parent)
            seen.add(parent)
            current = parent
        return list(reversed(path))

    # ==================
    # CONNECTED COMPONENTS
    # ==================
    def find_connected_components(self) -> list[set[int]]:
        """
        Find all isolated conversation threads. Time: O(V + E).
        Returns a list of sets; each set is one connected thread.
        """
        visited = set()
        components = []
        for node in self.nodes:
            if node in visited:
                continue
            component = set()
            # BFS in both directions (parent and children links).
            queue = deque([node])
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                visited.add(current)
                component.add(current)
                if current in self.parents:
                    parent = self.parents[current]
                    if parent not in visited:
                        queue.append(parent)
                for child in self.children.get(current, []):
                    if child not in visited:
                        queue.append(child)
            components.append(component)
        return components

    def get_thread_roots(self) -> list[int]:
        """Get all thread root messages (messages with no parent)."""
        return [node for node in self.nodes if node not in self.parents]

    # ==================
    # STATISTICS
    # ==================
    def get_thread_depth(self, root_id: int) -> int:
        """Get the maximum depth of a thread from root (root is depth 0)."""
        max_depth = 0
        for _, depth in self.bfs_with_depth(root_id):
            max_depth = max(max_depth, depth)
        return max_depth

    def get_subtree_size(self, message_id: int) -> int:
        """Get the number of messages in a subtree, including its root."""
        return len(self.dfs_descendants(message_id))

    def stats(self) -> dict:
        """Get graph-wide statistics."""
        return {
            'total_nodes': len(self.nodes),
            'total_edges': sum(len(children) for children in self.children.values()),
            'root_messages': len(self.get_thread_roots()),
            'connected_components': len(self.find_connected_components())
        }
| # ============================================ | |
| # TRIGRAM SIMILARITY | |
| # ============================================ | |
def generate_trigrams(text: str) -> set[str]:
    """
    Break text into its 3-character windows for fuzzy matching.

    Example: "hello" -> {"hel", "ell", "llo"}
    Texts shorter than 3 chars yield themselves; empty text yields set().
    """
    cleaned = text.lower().strip()
    if len(cleaned) < 3:
        return {cleaned} if cleaned else set()
    return {cleaned[i:i + 3] for i in range(len(cleaned) - 2)}


def trigram_similarity(text1: str, text2: str) -> float:
    """
    Jaccard similarity of the two texts' trigram sets.
    Returns a value in [0, 1]; 0 when either text has no trigrams.
    """
    a = generate_trigrams(text1)
    b = generate_trigrams(text2)
    if not a or not b:
        return 0.0
    shared = len(a & b)
    total = len(a | b)
    return shared / total if total > 0 else 0.0


class TrigramIndex:
    """
    Inverted trigram index for fuzzy text search.

    Complexity:
    - add: O(k), k = text length
    - search: O(t * m), t = query trigrams, m = average postings per trigram

    Example:
        index = TrigramIndex()
        index.add(1, "first document")
        index.add(2, "second document")
        results = index.search("document", threshold=0.3)
    """

    def __init__(self):
        self.index: dict[str, set[int]] = defaultdict(set)  # trigram -> doc ids
        self.texts: dict[int, str] = {}                     # doc id -> raw text

    def add(self, doc_id: int, text: str) -> None:
        """Register a document and index each of its trigrams."""
        self.texts[doc_id] = text
        for tri in generate_trigrams(text):
            self.index[tri].add(doc_id)

    def search(self, query: str, threshold: float = 0.3, limit: int = 100) -> list[tuple[int, float]]:
        """
        Return up to `limit` (doc_id, similarity) pairs whose similarity
        is at least `threshold`, best matches first.
        """
        q_tris = generate_trigrams(query)
        if not q_tris:
            return []
        # Count shared trigrams per candidate document.
        overlap: dict[int, int] = defaultdict(int)
        for tri in q_tris:
            for doc_id in self.index.get(tri, []):
                overlap[doc_id] += 1
        q_size = len(q_tris)
        scored = []
        for doc_id, shared in overlap.items():
            d_size = len(generate_trigrams(self.texts[doc_id]))
            # Jaccard: |A∩B| / |A∪B|, union via inclusion-exclusion.
            sim = shared / (q_size + d_size - shared)
            if sim >= threshold:
                scored.append((doc_id, sim))
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:limit]

    def __len__(self) -> int:
        return len(self.texts)
| # ============================================ | |
| # INVERTED INDEX | |
| # ============================================ | |
class InvertedIndex:
    """
    Simple inverted index for fast word-to-document lookup.

    Time complexity:
    - Insert: O(w) where w is word count
    - Search: O(1) average for a single word
    - AND/OR queries: set intersections/unions over posting sets
    """

    # PERF FIX: compiled once at class-creation time instead of doing
    # `import re` + pattern scan inside _tokenize on every call.
    # Matches runs of Hebrew (U+0590-U+05FF) or Latin letters.
    _WORD_RE = re.compile(r'[\u0590-\u05FFa-zA-Z]+')

    def __init__(self):
        self.index: dict[str, set[int]] = defaultdict(set)
        self.doc_count = 0

    def add(self, doc_id: int, text: str) -> None:
        """Add a document's words to the index."""
        for word in self._tokenize(text):
            self.index[word].add(doc_id)
        self.doc_count += 1

    def _tokenize(self, text: str) -> list[str]:
        """Lowercase and split text into Hebrew/Latin word tokens."""
        return self._WORD_RE.findall(text.lower())

    def search(self, word: str) -> set[int]:
        """Find all documents containing word. O(1) average."""
        return self.index.get(word.lower(), set())

    def search_and(self, words: list[str]) -> set[int]:
        """Find documents containing ALL words (empty input -> empty set)."""
        if not words:
            return set()
        # BUG FIX: the original did `result &= ...` directly on the set
        # returned by search(), which is the live posting set stored in the
        # index — the in-place intersection silently corrupted the index.
        # Copy first so the index is never mutated.
        result = set(self.search(words[0]))
        for word in words[1:]:
            result &= self.search(word)
            if not result:
                break  # early exit: an intersection can only shrink
        return result

    def search_or(self, words: list[str]) -> set[int]:
        """Find documents containing ANY word."""
        result = set()
        for word in words:
            result |= self.search(word)
        return result
if __name__ == '__main__':
    # Demo / smoke test for the structures defined above.
    print("=== Bloom Filter Demo ===")
    bf = BloomFilter(expected_items=1000, fp_rate=0.01)
    bf.add("message_1")
    bf.add("message_2")
    print(f"message_1 in filter: {'message_1' in bf}")
    print(f"message_999 in filter: {'message_999' in bf}")
    # BUG FIX: memory_usage is a method — without () this printed the bound
    # method repr instead of the byte count.
    print(f"Memory usage: {bf.memory_usage()} bytes")

    print("\n=== Trie Demo ===")
    trie = Trie()
    trie.insert("@username1", data=1)
    trie.insert("@username2", data=2)
    trie.insert("@user_test", data=3)
    print(f"Autocomplete '@user': {trie.autocomplete('@user')}")

    print("\n=== Reply Graph Demo ===")
    graph = ReplyGraph()
    graph.add_message(1)
    graph.add_message(2, reply_to=1)
    graph.add_message(3, reply_to=1)
    graph.add_message(4, reply_to=2)
    graph.add_message(5, reply_to=2)
    print(f"DFS from 1: {graph.dfs_descendants(1)}")
    print(f"BFS from 1: {graph.bfs_descendants(1)}")
    print(f"Thread path for 4: {graph.get_thread_path(4)}")
    # BUG FIX: stats is a method — call it to print the statistics dict.
    print(f"Stats: {graph.stats()}")