Spaces:
Running
Running
| """ | |
| Prompt reconstruction and token optimization module. | |
| Reconstructs compressed prompts by re-injecting protected entities, | |
| applying semantic filtering, and computing token/cost metrics. | |
| Mathematical Foundations | |
| ------------------------ | |
| 1. Token Counting (Byte-Pair Encoding): | |
| tokens(T) = |BPE_encode(T, V)| | |
| Reference: Sennrich et al., "Neural Machine Translation of Rare Words", ACL 2016 | |
| https://github.com/openai/tiktoken | |
| 2. Compression Ratio: | |
| R = 1 - (T_compressed / T_original) ∈ [0, 1] | |
| Cost savings: C = (T_original - T_compressed) × price_per_token | |
| 3. Placeholder Substitution Order: | |
| Process placeholders by descending length to avoid partial matches. | |
| If |ph₁| > |ph₂| and ph₂ ⊂ ph₁, replacing ph₂ first corrupts ph₁. | |
| 4. Stopword Filtering: | |
| T_filtered = {w ∈ T : w ∉ S} where S = stopword set | |
| Implemented via regex with word boundaries: O(n) single-pass removal. | |
| References | |
| ---------- | |
| [1] Sennrich, R., Haddow, B., & Birch, A. (2016). Neural Machine Translation | |
| of Rare Words with Subword Units. ACL 2016. | |
| https://arxiv.org/abs/1508.07909 | |
| [2] Manning, C. D., & Schütze, H. (1999). Foundations of Statistical NLP. | |
| MIT Press. Chapter 2: Collocations. | |
| [3] OpenAI. (2024). tiktoken: BPE tokenizer for GPT models. | |
| https://github.com/openai/tiktoken | |
| Performance | |
| ----------- | |
| - _filter_stopwords(): O(n + |S|) via Aho-Corasick regex engine | |
| - _reinject_entities(): O(k·n·m) with k=placeholders, m=avg length | |
| - reconstruct(): O(n + k·n·m + t_tokenize) where t_tokenize = BPE time | |
| - Token counting: ~1M tokens/sec via tiktoken's C++ backend | |
| Author: IntelliDeep Labs Team | |
| License: BSL 1.1 | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional | |
| import tiktoken | |
| from nlproxy.core.shield import ShieldResult | |
| from nlproxy.utils.constants import MODEL_PRICING, SEMANTIC_STOPWORDS | |
| logger = logging.getLogger(__name__) | |
| class ReconstructionResult: | |
| """ | |
| Output container for the reconstruction pipeline. | |
| Attributes | |
| ---------- | |
| compressed_text : str | |
| Final reconstructed prompt ready for LLM consumption. | |
| original_tokens : int | |
| Token count of the original prompt (BPE-encoded). | |
| compressed_tokens : int | |
| Token count after compression + reconstruction. | |
| tokens_saved : int | |
| Absolute reduction: original_tokens - compressed_tokens. | |
| compression_ratio : float | |
| Relative savings: tokens_saved / original_tokens ∈ [-∞, 1]. | |
| cost_saved_usd : float | |
| Estimated USD savings: tokens_saved × price_per_input_token. | |
| safety_score : Optional[float] | |
| Safety validation score from SafetyChecker (0.0 to 1.0). | |
| audit_log : List[Dict] | |
| Step-by-step processing log from upstream ShieldResult. | |
| alerts : List[str] | |
| Warning messages for edge cases (e.g., negative compression). | |
| compressed_indices : List[int] | |
| Original sentence indices retained after compression. | |
| """ | |
| compressed_text: str | |
| original_tokens: int | |
| compressed_tokens: int | |
| tokens_saved: int | |
| compression_ratio: float | |
| cost_saved_usd: float | |
| safety_score: Optional[float] = None | |
| audit_log: List[Dict] = field(default_factory=list) | |
| alerts: List[str] = field(default_factory=list) | |
| compressed_indices: List[int] = field(default_factory=list) | |
| class PromptReconstructor: | |
| """ | |
| Reconstructs compressed prompts with entity re-injection and metrics. | |
| Pipeline: | |
| 1. Join compressed sentences → coherent text | |
| 2. Optional stopword filtering (reduce noise tokens) | |
| 3. Re-inject protected placeholders with original values | |
| 4. Inject constraint instructions (FORBID/MANDATE) | |
| 5. Normalize whitespace/punctuation | |
| 6. Compute token counts and cost savings via tiktoken | |
| Key Design Decisions | |
| -------------------- | |
| - Longest-first placeholder substitution prevents partial matches | |
| - Tolerant regex matching handles LLM output variations (case, underscores) | |
| - Stopword filtering applied BEFORE re-injection to avoid breaking code | |
| - Instruction injection only when stopwords enabled (reduces token overhead) | |
| """ | |
| # Pre-compiled stopword regex (shared across instances) | |
| # Pattern: \b(stopword1|stopword2|...)\b[.,]?\s* | |
| _STOPWORD_PATTERN: re.Pattern = re.compile( | |
| r'\b(?:' + '|'.join(re.escape(w) for w in SEMANTIC_STOPWORDS) + r')\b[.,]?\s*', | |
| flags=re.IGNORECASE | |
| ) | |
| # Line-start cleanup: removes leading punctuation/spaces after stopword removal | |
| _LINE_START_CLEANUP: re.Pattern = re.compile(r'^[ ,.;:!?]+', flags=re.MULTILINE) | |
| # Whitespace normalization patterns | |
| _MULTI_SPACE: re.Pattern = re.compile(r' +') | |
| _MULTI_NEWLINE: re.Pattern = re.compile(r'\n\s*\n+') | |
| def __init__( | |
| self, | |
| model_name: str = "gpt-4", | |
| custom_pricing: Optional[Dict[str, float]] = None, | |
| remove_stopwords: bool = True, | |
| ) -> None: | |
| """ | |
| Initialize the PromptReconstructor. | |
| Parameters | |
| ---------- | |
| model_name : str, optional | |
| Target LLM for token counting and pricing (default: "gpt-4"). | |
| Must be tiktoken-compatible or falls back to cl100k_base. | |
| custom_pricing : Optional[Dict[str, float]], optional | |
| Override pricing: {"input": price_per_token, "output": price_per_token}. | |
| remove_stopwords : bool, optional | |
| Enable semantic stopword filtering (default: True). | |
| Disable for domains where all words carry meaning (e.g., legal). | |
| """ | |
| self.model_name = model_name | |
| self.remove_stopwords = remove_stopwords | |
| # Resolve pricing: custom > model-specific > default | |
| self.pricing = ( | |
| custom_pricing | |
| or MODEL_PRICING.get(model_name) | |
| or MODEL_PRICING.get("default", {"input": 3e-5, "output": 6e-5}) | |
| ) | |
| # Initialize tokenizer with fallback to cl100k_base (GPT-4 encoding) | |
| try: | |
| self.tokenizer = tiktoken.encoding_for_model(model_name) | |
| except KeyError: | |
| self.tokenizer = tiktoken.get_encoding("cl100k_base") | |
| logger.warning(f"Model '{model_name}' not in tiktoken; using cl100k_base") | |
| logger.info(f"PromptReconstructor initialized for '{model_name}'") | |
| def _filter_stopwords(self, text: str) -> str: | |
| """ | |
| Remove semantic stopwords and clean resulting artifacts. | |
| Two-stage cleanup: | |
| 1. Remove stopwords with optional trailing punctuation/whitespace | |
| 2. Clean line-start artifacts (leading commas, periods, spaces) | |
| Complexity: O(n + |S|) via single-pass regex substitution. | |
| """ | |
| # Stage 1: Remove stopwords with trailing punctuation/whitespace | |
| cleaned = self._STOPWORD_PATTERN.sub('', text) | |
| # Stage 2: Clean line-start artifacts from stopword removal | |
| lines = cleaned.splitlines() | |
| cleaned_lines = [self._LINE_START_CLEANUP.sub('', line) for line in lines] | |
| return '\n'.join(cleaned_lines) | |
| def _get_placeholder_pattern(self, placeholder: str) -> re.Pattern: | |
| """ | |
| Build tolerant regex pattern for placeholder matching. | |
| Allows optional underscores and case-insensitive matching | |
| to handle LLM output variations (e.g., "__PROT_abc" vs "_PROT_abc"). | |
| Pattern: re.escape(placeholder).replace(r'\_', r'[_]?') | |
| Compiled with re.IGNORECASE for case-insensitive matching. | |
| """ | |
| escaped = re.escape(placeholder) | |
| # Make underscores optional: _ → [_]? | |
| tolerant_pattern = escaped.replace('_', '[_]?') | |
| return re.compile(tolerant_pattern, flags=re.IGNORECASE) | |
| def _reinject_entities(self, text: str, placeholder_map: Dict[str, str]) -> str: | |
| """ | |
| Re-inject protected entity values by replacing placeholders. | |
| Uses longest-first substitution to avoid partial matches: | |
| - Sort placeholders by length descending | |
| - Replace longer placeholders first to prevent corruption | |
| Example: | |
| placeholder_map = {"__PROT_ab": "A", "__PROT_abc": "B"} | |
| text = "__PROT_abc" | |
| If "__PROT_ab" replaced first: "__PROT_abc" → "Ac" (wrong!) | |
| If "__PROT_abc" replaced first: "__PROT_abc" → "B" (correct ✓) | |
| Complexity: O(k·n·m) where k=placeholders, n=text length, m=avg placeholder length. | |
| """ | |
| # Sort by length descending to avoid partial matches | |
| sorted_placeholders = sorted(placeholder_map.keys(), key=len, reverse=True) | |
| result = text | |
| for placeholder in sorted_placeholders: | |
| value = placeholder_map[placeholder] | |
| pattern = self._get_placeholder_pattern(placeholder) | |
| result = pattern.sub(value, result) | |
| return result | |
| def inject_instructions(self, compressed_text: str, shield_result: ShieldResult) -> str: | |
| """ | |
| Inject critical instructions based on extracted restrictions. | |
| Adds a prefixed instruction block guiding the LLM to respect | |
| FORBID/MANDATE constraints and preserve protected placeholders. | |
| Format: | |
| [CRITICAL INSTRUCTIONS] | |
| Do not use: Python, Java. | |
| Must use: Rust. | |
| Placeholders __PROT_xxx contain protected data; do not modify. | |
| [/CRITICAL INSTRUCTIONS] | |
| Original compressed text... | |
| """ | |
| instructions: List[str] = [] | |
| if shield_result.restrictions: | |
| forbidden = [r.entity for r in shield_result.restrictions if r.type == "FORBID"] | |
| mandated = [r.entity for r in shield_result.restrictions if r.type == "MANDATE"] | |
| if forbidden: | |
| instructions.append(f"Do not use: {', '.join(forbidden)}.") | |
| if mandated: | |
| instructions.append(f"Must use: {', '.join(mandated)}.") | |
| if shield_result.placeholder_map: | |
| instructions.append( | |
| "Placeholders __PROT_xxx contain protected data; do not modify." | |
| ) | |
| if instructions: | |
| instruction_block = ( | |
| "[CRITICAL INSTRUCTIONS]\n" | |
| + "\n".join(instructions) | |
| + "\n[/CRITICAL INSTRUCTIONS]\n\n" | |
| ) | |
| return instruction_block + compressed_text | |
| return compressed_text | |
| def reconstruct( | |
| self, | |
| original_prompt: str, | |
| compressed_sentences: List[str], | |
| shield_result: ShieldResult, | |
| apply_stopwords: Optional[bool] = None, | |
| compressed_indices: Optional[List[int]] = None, | |
| privacy_mode: bool = False, | |
| ) -> ReconstructionResult: | |
| """ | |
| Execute the complete reconstruction pipeline. | |
| Parameters | |
| ---------- | |
| original_prompt : str | |
| Original uncompressed prompt (for token comparison). | |
| compressed_sentences : List[str] | |
| Sentences after semantic compression. | |
| shield_result : ShieldResult | |
| Result from PromptShield with placeholders and restrictions. | |
| apply_stopwords : Optional[bool], optional | |
| Override instance default for stopword filtering. | |
| compressed_indices : Optional[List[int]], optional | |
| Original indices of compressed sentences for traceability. | |
| privacy_mode : bool, optional | |
| If True, suppresses entity re-injection to keep sensitive data masked. | |
| Returns | |
| ------- | |
| ReconstructionResult | |
| Container with reconstructed text, metrics, and audit info. | |
| Pipeline Stages | |
| --------------- | |
| 1. Join compressed sentences with newlines | |
| 2. Optional stopword filtering (reduces token count) | |
| 3. Re-inject protected entities via placeholder_map | |
| 4. Inject constraint instructions (if stopwords enabled) | |
| 5. Whitespace and punctuation normalization | |
| 6. Token metrics and cost calculations | |
| Complexity: O(n + k·n·m + t_tokenize) where t_tokenize = BPE encoding time. | |
| """ | |
| apply_stopwords = apply_stopwords if apply_stopwords is not None else self.remove_stopwords | |
| # Stage 1: Join compressed sentences | |
| compressed_text = "\n".join(compressed_sentences) | |
| # Stage 2: Stopword filtering (BEFORE re-injection to avoid breaking code) | |
| if apply_stopwords: | |
| compressed_text = self._filter_stopwords(compressed_text) | |
| # Stage 3: Re-inject protected entities (unless privacy mode is enabled) | |
| if not privacy_mode: | |
| final_text = self._reinject_entities(compressed_text, shield_result.placeholder_map) | |
| else: | |
| final_text = compressed_text | |
| # Stage 4: Inject instructions (only if stopwords enabled) | |
| if apply_stopwords: | |
| final_text = self.inject_instructions(final_text, shield_result) | |
| # Stage 5: Normalize whitespace and punctuation | |
| final_text = self._MULTI_SPACE.sub(' ', final_text) | |
| final_text = self._LINE_START_CLEANUP.sub('', final_text) | |
| final_text = self._MULTI_NEWLINE.sub('\n', final_text).strip() | |
| # Stage 6: Compute metrics | |
| final_indices = sorted(compressed_indices) if compressed_indices else list(range(len(compressed_sentences))) | |
| orig_tokens = len(self.tokenizer.encode(original_prompt)) | |
| comp_tokens = len(self.tokenizer.encode(final_text)) | |
| saved = orig_tokens - comp_tokens | |
| ratio = saved / orig_tokens if orig_tokens else 0.0 | |
| cost = saved * self.pricing["input"] | |
| alerts = [] | |
| if saved < 0: | |
| alerts.append("Negative compression detected.") | |
| if ratio > 0.9: | |
| alerts.append("Aggressive compression may affect quality.") | |
| return ReconstructionResult( | |
| compressed_text=final_text, | |
| original_tokens=orig_tokens, | |
| compressed_tokens=comp_tokens, | |
| tokens_saved=saved, | |
| compression_ratio=ratio, | |
| cost_saved_usd=cost, | |
| audit_log=shield_result.audit_log.copy(), | |
| alerts=alerts, | |
| compressed_indices=final_indices | |
| ) | |