CRAYON-tokenizer / src /crayon /concurrency /thread_local.py
Phase-Technologies's picture
Upload folder using huggingface_hub
708f4a3 verified
import threading
from typing import List, Optional
from ..core.vocabulary import CrayonVocab
from ..memory.cache import LockFreeVocabCache
class ThreadLocalTokenizer:
"""
Thread-Local tokenization state to minimize cross-thread coordination.
Maintains separate caches and buffers for each thread to avoid
LOCK contention and False Sharing[cite: 639].
"""
def __init__(self, global_vocab: CrayonVocab):
self.global_vocab = global_vocab
self._local = threading.local()
@property
def local_state(self):
"""Lazy initialization of thread-local resources[cite: 647]."""
if not hasattr(self._local, 'initialized'):
# L1 Cache specific to this thread (2048 entries)
self._local.cache = LockFreeVocabCache(capacity=2048)
# Reusable buffer to prevent allocation churn
self._local.temp_buffer = bytearray(65536)
self._local.result_buffer = []
self._local.initialized = True
return self._local
def tokenize_thread_safe(self, text: str) -> List[int]:
"""
Thread-safe tokenization with minimal synchronization overhead.
Strategy:
1. Try thread-local L1 cache.
2. Fallback to global vocabulary (which releases GIL in C-ext).
"""
state = self.local_state
cache = state.cache
result = state.result_buffer
result.clear()
position = 0
text_len = len(text)
while position < text_len:
# Check cache for common tokens first (Optimistic read)
# Note: A real implementation might cache substrings at 'position'
# Here we simplify to illustrate the pattern
# Fallback to global with GIL release (simulated here via method call)
# In C-extension, this call releases the GIL [cite: 590]
token_id, match_len = self.global_vocab.longest_match(text, position)
if match_len > 0:
result.append(token_id)
# Update local cache for next time
# cache.put(substring, token_id)
position += match_len
else:
result.append(self.global_vocab.unk_token_id)
position += 1
# Return a copy, keeping the buffer for next run
return list(result)