Spaces:
Running
Running
| """ | |
| Orchestration service for the complete prompt compression pipeline. | |
| This module coordinates all compression stages: | |
| 1. Prompt shielding (entity protection, restriction extraction) | |
| 2. Semantic segmentation and embedding generation | |
| 3. Clustering-based sentence compression | |
| 4. Reconstruction with token optimization | |
| 5. Safety validation and intent preservation | |
| Mathematical Foundations | |
| ------------------------ | |
| 1. Pipeline Composition: | |
| Given input prompt P and configuration C: | |
| Output = Safety(Reconstruct(Compress(Segment(Shield(P, C))))) | |
| Each stage is a function fᵢ: Xᵢ → Xᵢ₊₁ with well-defined contracts. | |
| 2. Batch Processing Parallelism: | |
| For batch of N prompts with M workers: | |
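| Time ≈ O(⌈N/M⌉ · T_stage) where T_stage = slowest stage latency | |
| Amdahl's Law: Speedup ≤ 1 / (S + P/M) where S = serial fraction, P = 1 − S = parallel fraction, M = workers | |
| (With threads, CPU-bound Python stages only overlap where native code releases the GIL.) | |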
| 3. Embedding Cache Efficiency: | |
| Hit rate H = |cached| / |total| ∈ [0, 1] | |
| Expected latency: E[T] = H·T_cache + (1-H)·T_compute | |
| Where T_cache ≈ O(1) lookup, T_compute = embedding inference time | |
| 4. Aggressiveness Adaptation: | |
| effective_agg = min(1, base_agg × (1 + α·nli_active)) where α = 0.3 and nli_active ∈ {0, 1} | |
| Enabling NLI-based validation raises aggressiveness, on the premise that NLI checks catch lost intent. | |
| References | |
| ---------- | |
| [1] Amdahl, G. M. (1967). Validity of the single processor approach to | |
| achieving large scale computing capabilities. AFIPS Conference. | |
| [2] Reimers, N., & Gurevych, I. (2019). Sentence-BERT: Sentence embeddings | |
| using Siamese BERT-networks. EMNLP-IJCNLP 2019. | |
| https://github.com/UKPLab/sentence-transformers | |
| [3] Sennrich, R., Haddow, B., & Birch, A. (2016). Neural Machine Translation | |
| of Rare Words with Subword Units. ACL 2016. | |
| https://github.com/openai/tiktoken | |
| Performance Characteristics | |
| --------------------------- | |
| - compress_batch(): O(N · T_pipeline / M) with M workers, N prompts | |
| - Typical per-prompt latency: 100-500ms (CPU), 50-200ms (GPU) | |
| - Memory: O(B · d) for batch embeddings, B=batch size, d=embedding_dim | |
| - Cache hit rate: 60-90% typical for repetitive prompts | |
| Author: IntelliDeep Labs Team | |
| License: BSL 1.1 | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| # Import core components from sibling modules | |
| from nlproxy.core.restriction import Restriction | |
| from nlproxy.core.shield import PromptShield, ShieldResult, DomainMode | |
| from nlproxy.core.segmenter import SemanticSegmenter | |
| from nlproxy.core.compressor import SemanticCompressor | |
| from nlproxy.core.reconstructor import PromptReconstructor, ReconstructionResult | |
| from nlproxy.core.safety import SafetyChecker, SafetyReport | |
| from nlproxy.cache.semantic_cache import SemanticLLMCache | |
| from nlproxy.utils.constants import AGGRESSIVENESS_MAP | |
| # Optional Redis import for distributed caching | |
| try: | |
| from redis import Redis | |
| _REDIS_AVAILABLE = True | |
| except ImportError: | |
| _REDIS_AVAILABLE = False | |
| Redis = None # type: ignore | |
| logger = logging.getLogger(__name__) | |
class CompressionService:
    """
    Orchestrate the complete prompt compression pipeline.

    This service coordinates all stages of prompt processing:
    1. Shielding: Protect entities, extract restrictions
    2. Segmentation: Split text into sentences, generate embeddings
    3. Compression: Cluster similar sentences, select representatives
    4. Reconstruction: Re-inject entities, optimize tokens, compute metrics
    5. Safety: Validate intent preservation, enforce constraints

    Key Features
    ------------
    - Batch processing with thread pool parallelism
    - Caching: shield results (Redis when connected, otherwise in-memory),
      sentence embeddings (in-memory), and an optional Redis-backed
      semantic response cache
    - Adaptive aggressiveness based on domain mode and whether NLI
      validation is active
    - Async wrapper for non-blocking use from event-loop servers
    - Configurable privacy mode for PII handling

    Pipeline Architecture
    ---------------------
    Input: List[str] prompts + configuration
        ↓
    [Shield] → Protect entities, extract restrictions
        ↓
    [Segment] → Split sentences, compute embeddings (cached)
        ↓
    [Compress] → Cluster sentences, select centroids
        ↓
    [Reconstruct] → Re-inject entities, compute token metrics
        ↓
    [Safety] → Validate constraints, re-insert if needed
        ↓
    Output: List[Dict] with compressed text and metrics

    Usage Example
    -------------
    >>> service = CompressionService(
    ...     use_cache=True,
    ...     redis_url="redis://localhost:6379",
    ...     privacy_mode=True
    ... )
    >>> results = service.compress_batch(
    ...     texts=["Hello world", "Another prompt"],
    ...     mode="code",
    ...     aggressiveness=0.3
    ... )
    >>> for r in results:
    ...     print(f"Saved {r['tokens_saved']} tokens")
    """
    # Aggressiveness presets are centralized in shared constants for consistent tuning.
    # (Per-mode presets live in AGGRESSIVENESS_MAP; see _compute_effective_aggressiveness.)
    # Configuration defaults
    # Embedding model name handed to SemanticSegmenter.
    _DEFAULT_MODEL_NAME: str = "all-MiniLM-L6-v2"
    # Vector dimension handed to SemanticLLMCache; should match the embedding
    # model's output size.
    _DEFAULT_EMBEDDING_DIM: int = 384
    # Encoding batch size handed to SemanticSegmenter.
    _DEFAULT_BATCH_SIZE: int = 128
    # Maximum sequence length handed to SemanticSegmenter.
    _DEFAULT_MAX_SEQ_LENGTH: int = 256
    # Compressor's initial aggressiveness, and the fallback when a mode has
    # no preset in AGGRESSIVENESS_MAP.
    _DEFAULT_BASE_AGGRESSIVENESS: float = 0.2
    # Multiplier α: effective = base × (1 + α) when NLI validation is active.
    _DEFAULT_NLI_ADAPTATION_FACTOR: float = 0.3
    # Worker count used when neither the constructor argument nor the
    # NLPROXY_COMPRESSION_WORKERS environment variable gives a positive int.
    _DEFAULT_THREAD_POOL_WORKERS: int = 8
| def __init__( | |
| self, | |
| use_cache: bool = True, | |
| device: Optional[str] = None, | |
| redis_url: Optional[str] = None, | |
| nli_refinement_fn: Optional = None, | |
| privacy_mode: bool = False, | |
| models_dir: Optional[Path] = None, | |
| llm_default_model: Optional[str] = None, | |
| thread_pool_workers: Optional[int] = None, | |
| ) -> None: | |
| """ | |
| Initialize the CompressionService with all pipeline components. | |
| Parameters | |
| ---------- | |
| use_cache : bool, optional | |
| Enable in-memory caching for shield results and embeddings. | |
| device : Optional[str], optional | |
| Device for embedding model ("cuda", "cpu", or None for auto-detect). | |
| redis_url : Optional[str], optional | |
| Redis connection URL for distributed semantic caching. | |
| If None, semantic cache is disabled. | |
| nli_refinement_fn : Optional, optional | |
| NLI inference function for restriction refinement. | |
| Typically obtained from PostLLMVerifier.get_nli_check_function(). | |
| privacy_mode : bool, optional | |
| Enable strict PII handling: never expose protected entities. | |
| models_dir : Optional[Path], optional | |
| Directory containing pre-downloaded models (default: "models"). | |
| Required for embedding and NLI models. | |
| Raises | |
| ------ | |
| ImportError | |
| If redis_url is provided but redis-py is not installed. | |
| FileNotFoundError | |
| If required models are not found in models_dir. | |
| Complexity | |
| ---------- | |
| Time: O(1) initialization + O(T_load) for model loading | |
| Space: O(C) for cache storage if enabled, C = cache capacity | |
| """ | |
| # Validate Redis availability if requested | |
| if redis_url and not _REDIS_AVAILABLE: | |
| raise ImportError( | |
| "Redis URL provided but redis-py not installed. " | |
| "Install with: pip install redis" | |
| ) | |
| # Resolve models directory | |
| self.models_dir = models_dir or Path("nlproxy") / "models" | |
| # Thread pool sizing can be overridden via environment variable | |
| if thread_pool_workers is not None: | |
| self.thread_pool_workers = thread_pool_workers | |
| else: | |
| env_workers = os.getenv("NLPROXY_COMPRESSION_WORKERS", "") | |
| try: | |
| self.thread_pool_workers = int(env_workers) | |
| except ValueError: | |
| self.thread_pool_workers = self._DEFAULT_THREAD_POOL_WORKERS | |
| if self.thread_pool_workers <= 0: | |
| self.thread_pool_workers = self._DEFAULT_THREAD_POOL_WORKERS | |
| self.executor = ThreadPoolExecutor(max_workers=self.thread_pool_workers) | |
| self.privacy_mode = privacy_mode | |
| # Initialize pipeline components | |
| self.shield = PromptShield(mode=DomainMode.GENERAL) | |
| self.segmenter = SemanticSegmenter( | |
| model_name=self._DEFAULT_MODEL_NAME, | |
| device=device, | |
| batch_size=self._DEFAULT_BATCH_SIZE, | |
| max_seq_length=self._DEFAULT_MAX_SEQ_LENGTH, | |
| models_dir=self.models_dir, | |
| ) | |
| self.compressor = SemanticCompressor( | |
| aggressiveness=self._DEFAULT_BASE_AGGRESSIVENESS | |
| ) | |
| # Use provided default LLM model for token counting/pricing when available | |
| model_for_reconstructor = llm_default_model or "gpt-4" | |
| self.reconstructor = PromptReconstructor(model_name=model_for_reconstructor) | |
| self.safety = SafetyChecker( | |
| mode="general", | |
| models_dir=self.models_dir, | |
| ) | |
| # Caching configuration | |
| self.use_emb_cache = use_cache | |
| self.emb_cache: Optional[Dict[str, bytes]] = {} if use_cache else None | |
| self.shield_cache: Optional[Dict[str, str]] = {} if use_cache else None | |
| # Redis client for distributed caching (optional) | |
| self.redis: Optional[Redis] = None | |
| if redis_url and _REDIS_AVAILABLE: | |
| self.redis = Redis.from_url(redis_url, decode_responses=True) | |
| try: | |
| self.redis.ping() # Test connection | |
| logger.info(f"Connected to Redis at {redis_url}") | |
| except Exception as e: | |
| logger.warning(f"Redis connection failed: {e}") | |
| self.redis = None | |
| # Semantic cache for response deduplication (optional) | |
| if redis_url and self.redis: | |
| self.semantic_cache = SemanticLLMCache( | |
| redis_url=redis_url, | |
| dimension=self._DEFAULT_EMBEDDING_DIM, | |
| #models_dir=self.models_dir, | |
| ) | |
| logger.info("Semantic cache initialized") | |
| else: | |
| self.semantic_cache = None | |
| logger.debug("Semantic cache disabled (Redis not configured)") | |
| # NLI refinement function for semantic restriction validation | |
| self.nli_refinement_fn = nli_refinement_fn | |
| # Privacy mode: controls entity re-injection behavior | |
| self.privacy_mode = privacy_mode | |
| # Optional post-processing components (set externally) | |
| self.post_verifier:Optional = None | |
| self.response_corrector:Optional = None | |
| logger.info( | |
| f"CompressionService initialized: cache={use_cache}, " | |
| f"redis={redis_url is not None}, privacy={privacy_mode}" | |
| ) | |
| def _hash_text(self, text: str) -> str: | |
| """ | |
| Compute SHA-256 hash of text for cache key generation. | |
| Parameters | |
| ---------- | |
| text : str | |
| Input text to hash. | |
| Returns | |
| ------- | |
| str | |
| Hexadecimal SHA-256 hash (64 characters). | |
| Note | |
| ---- | |
| SHA-256 provides collision probability < 2⁻¹²⁸ for practical inputs, | |
| sufficient for cache key uniqueness. | |
| """ | |
| return hashlib.sha256(text.encode("utf-8")).hexdigest() | |
| def _cache_shield( | |
| self, | |
| text: str, | |
| shield_result: ShieldResult, | |
| mode: str = "general", | |
| privacy_mode: bool = False, | |
| ) -> None: | |
| """ | |
| Store shield result in cache (Redis or in-memory). | |
| Parameters | |
| ---------- | |
| text : str | |
| Original prompt text (used as cache key). | |
| shield_result : ShieldResult | |
| Result from PromptShield to cache. | |
| mode : str, optional | |
| Domain mode for pattern selection. | |
| privacy_mode : bool, optional | |
| Whether privacy mode is active. | |
| Complexity | |
| ---------- | |
| Time: O(1) for in-memory, O(L) for Redis where L = serialized size | |
| Space: O(L) for cached entry | |
| """ | |
| key = f"shield:{mode}:{privacy_mode}:{self._hash_text(text)}" | |
| data = json.dumps(shield_result.to_cache_dict()) | |
| if self.redis: | |
| # Redis: set with optional TTL (default 1 hour) | |
| self.redis.setex(key, 3600, data) | |
| elif self.shield_cache is not None: | |
| # In-memory fallback | |
| self.shield_cache[key] = data | |
| def _get_cached_shield( | |
| self, | |
| text: str, | |
| mode: str = "general", | |
| privacy_mode: bool = False, | |
| ) -> Optional[ShieldResult]: | |
| """ | |
| Retrieve shield result from cache if available. | |
| Parameters | |
| ---------- | |
| text : str | |
| Original prompt text to look up. | |
| mode : str, optional | |
| Domain mode for pattern selection. | |
| privacy_mode : bool, optional | |
| Whether privacy mode is active. | |
| Returns | |
| ------- | |
| Optional[ShieldResult] | |
| Cached result if found, None otherwise. | |
| Complexity | |
| ---------- | |
| Time: O(1) for in-memory, O(L) for Redis deserialization | |
| Space: O(1) additional | |
| """ | |
| key = f"shield:{mode}:{privacy_mode}:{self._hash_text(text)}" | |
| data: Optional[str] = None | |
| if self.redis: | |
| data = self.redis.get(key) | |
| elif self.shield_cache is not None: | |
| data = self.shield_cache.get(key) | |
| if data: | |
| # Handle bytes from Redis | |
| if isinstance(data, bytes): | |
| data = data.decode("utf-8") | |
| return ShieldResult.from_cache_dict(json.loads(data)) | |
| return None | |
| def _shield_with_cache( | |
| self, | |
| text: str, | |
| manual_restrictions: Optional[List[Restriction]] = None, | |
| mode: str = "general", | |
| privacy_mode: Optional[bool] = None, | |
| ) -> ShieldResult: | |
| """ | |
| Shield prompt with cache lookup fallback. | |
| Parameters | |
| ---------- | |
| text : str | |
| Prompt to shield. | |
| manual_restrictions : Optional[List[Restriction]], optional | |
| Additional restrictions to enforce. | |
| mode : str, optional | |
| Domain mode for pattern selection. | |
| privacy_mode : Optional[bool], optional | |
| Override for PII handling. | |
| Returns | |
| ------- | |
| ShieldResult | |
| Shielding result (from cache or freshly computed). | |
| Complexity | |
| ---------- | |
| Time: O(1) cache hit, O(T_shield) cache miss | |
| where T_shield = shielding pipeline latency (~10-50ms) | |
| """ | |
| effective_privacy = ( | |
| privacy_mode if privacy_mode is not None else self.privacy_mode | |
| ) | |
| # Try cache first | |
| cached = self._get_cached_shield(text, mode, effective_privacy) | |
| if cached: | |
| logger.debug(f"Shield cache hit for text hash {self._hash_text(text)[:16]}...") | |
| return cached | |
| # Compute fresh result | |
| result = self.shield.shield( | |
| text, | |
| manual_restrictions=manual_restrictions, | |
| nli_refinement_fn=self.nli_refinement_fn, | |
| privacy_mode=effective_privacy, | |
| mode_override=mode, | |
| ) | |
| # Store in cache for future requests | |
| self._cache_shield(text, result, mode, effective_privacy) | |
| return result | |
| def _compute_effective_aggressiveness( | |
| self, | |
| aggressiveness: Optional[float], | |
| mode: str, | |
| nli_active: bool, | |
| ) -> float: | |
| """ | |
| Compute effective aggressiveness considering mode and NLI status. | |
| Formula: | |
| effective = (explicit or mode_preset) × (1 + α·nli_active) | |
| where α = 0.3 (adaptation factor) | |
| Parameters | |
| ---------- | |
| aggressiveness : Optional[float] | |
| Explicit aggressiveness value, or None for mode default. | |
| mode : str | |
| Domain mode for preset lookup. | |
| nli_active : bool | |
| Whether NLI-based validation is enabled. | |
| Returns | |
| ------- | |
| float | |
| Effective aggressiveness ∈ [0, 1]. | |
| """ | |
| # Resolve base aggressiveness: explicit > mode preset > default | |
| if aggressiveness is not None: | |
| base_agg = aggressiveness | |
| else: | |
| base_agg = AGGRESSIVENESS_MAP.get(mode, self._DEFAULT_BASE_AGGRESSIVENESS) | |
| # Adapt for NLI: higher confidence allows more aggressive compression | |
| if nli_active: | |
| effective_agg = min(1.0, base_agg * (1 + self._DEFAULT_NLI_ADAPTATION_FACTOR)) | |
| logger.debug(f"NLI adaptation: {base_agg:.2f} → {effective_agg:.2f}") | |
| else: | |
| effective_agg = base_agg | |
| return effective_agg | |
| def _collect_all_sentences( | |
| self, | |
| shield_results: List[ShieldResult], | |
| language: Optional[str], | |
| ) -> tuple[List[str], List[tuple[int, List[str]]]]: | |
| """ | |
| Extract all sentences from shielded texts for batch embedding. | |
| Parameters | |
| ---------- | |
| shield_results : List[ShieldResult] | |
| Results from shielding stage. | |
| language : Optional[str] | |
| Language code for segmentation (auto-detect if None). | |
| Returns | |
| ------- | |
| Tuple[List[str], List[Tuple[int, List[str]]]] | |
| - Flat list of all sentences for embedding | |
| - Mapping: (original_index, sentences_for_that_prompt) | |
| """ | |
| all_sents: List[str] = [] | |
| sent_map: List[tuple[int, List[str]]] = [] | |
| for i, shielded in enumerate(shield_results): | |
| sents = self.segmenter.split_sentences( | |
| shielded.shielded_text, language=language | |
| ) | |
| sent_map.append((i, sents)) | |
| all_sents.extend(sents) | |
| return all_sents, sent_map | |
| def _encode_with_cache( | |
| self, | |
| sentences: List[str], | |
| ) -> Dict[int, np.ndarray]: | |
| """ | |
| Generate embeddings with in-memory caching. | |
| Parameters | |
| ---------- | |
| sentences : List[str] | |
| Sentences to encode. | |
| Returns | |
| ------- | |
| Dict[int, np.ndarray] | |
| Mapping: sentence_index → embedding vector. | |
| Complexity | |
| ---------- | |
| Time: O(U · T_encode + C · T_lookup) | |
| where U = uncached count, C = cached count | |
| Space: O(U · d) for new embeddings, d = embedding_dim | |
| """ | |
| sent_emb: Dict[int, np.ndarray] = {} | |
| if not self.use_emb_cache or self.emb_cache is None: | |
| # No caching: encode all at once | |
| all_embs = self.segmenter.encode_batch(sentences) | |
| return {i: emb for i, emb in enumerate(all_embs)} | |
| # Separate cached vs uncached sentences | |
| uncached: List[str] = [] | |
| uncached_idx: List[int] = [] | |
| for idx, sent in enumerate(sentences): | |
| key = self._hash_text(sent) | |
| cached_bytes = self.emb_cache.get(key) | |
| if cached_bytes is not None: | |
| # Load from cache | |
| sent_emb[idx] = np.frombuffer(cached_bytes, dtype=np.float32) | |
| else: | |
| uncached.append(sent) | |
| uncached_idx.append(idx) | |
| # Encode uncached sentences | |
| if uncached: | |
| new_embs = self.segmenter.encode_batch(uncached) | |
| for i, idx in enumerate(uncached_idx): | |
| emb = new_embs[i] | |
| sent_emb[idx] = emb | |
| # Store in cache as bytes for memory efficiency | |
| cache_key = self._hash_text(uncached[i]) | |
| self.emb_cache[cache_key] = emb.astype(np.float32).tobytes() | |
| return sent_emb | |
| def _regroup_embeddings( | |
| self, | |
| sent_emb: Dict[int, np.ndarray], | |
| sent_map: List[tuple[int, List[str]]], | |
| ) -> tuple[List[np.ndarray], List[List[str]]]: | |
| """ | |
| Regroup flat embeddings back to per-prompt structure. | |
| Parameters | |
| ---------- | |
| sent_emb : Dict[int, np.ndarray] | |
| Flat mapping: global_sentence_index → embedding. | |
| sent_map : List[Tuple[int, List[str]]] | |
| Mapping: (prompt_index, sentences_for_prompt). | |
| Returns | |
| ------- | |
| Tuple[List[np.ndarray], List[List[str]]] | |
| - List of embedding arrays (one per prompt) | |
| - List of sentence lists (one per prompt) | |
| """ | |
| emb_per_text: List[np.ndarray] = [] | |
| sents_per_text: List[List[str]] = [] | |
| offset = 0 | |
| for prompt_idx, sents in sent_map: | |
| count = len(sents) | |
| # Gather embeddings for this prompt's sentences | |
| emb = np.array( | |
| [sent_emb[offset + j] for j in range(count)] | |
| ) | |
| emb_per_text.append(emb) | |
| sents_per_text.append(sents) | |
| offset += count | |
| return emb_per_text, sents_per_text | |
| def _process_single( | |
| self, | |
| original: str, | |
| shielded: ShieldResult, | |
| sentences: List[str], | |
| embeddings: np.ndarray, | |
| aggressiveness: float, | |
| privacy_mode: bool, | |
| mode: str, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Process a single prompt through compression pipeline. | |
| This method executes stages 3-5 of the pipeline for one prompt: | |
| 3. Compression: cluster sentences, select representatives | |
| 4. Reconstruction: re-inject entities, compute metrics | |
| 5. Safety: validate constraints, re-insert if needed | |
| Parameters | |
| ---------- | |
| original : str | |
| Original prompt text (for token comparison). | |
| shielded : ShieldResult | |
| Result from shielding stage. | |
| sentences : List[str] | |
| Segmented sentences from shielded text. | |
| embeddings : np.ndarray | |
| Pre-computed embeddings for sentences. | |
| aggressiveness : float | |
| Compression intensity ∈ [0, 1]. | |
| privacy_mode : bool | |
| Whether to suppress entity re-injection. | |
| mode : str | |
| Domain mode for safety validation. | |
| Returns | |
| ------- | |
| Dict[str, Any] | |
| Dictionary with compressed text and metrics: | |
| - compressed_text: final output ready for LLM | |
| - original_tokens, compressed_tokens, tokens_saved | |
| - compression_ratio, cost_saved_usd | |
| - safety_score, alerts | |
| """ | |
| # Stage 3: Semantic compression | |
| comp_sents, comp_stats = self.compressor.compress( | |
| sentences, | |
| embeddings, | |
| aggressiveness=aggressiveness, | |
| mode=mode, | |
| ) | |
| comp_indices = comp_stats.get( | |
| "compressed_indices", list(range(len(comp_sents))) | |
| ) | |
| # Stage 4: Reconstruction with token metrics | |
| recon: ReconstructionResult = self.reconstructor.reconstruct( | |
| original_prompt=original, | |
| compressed_sentences=comp_sents, | |
| shield_result=shielded, | |
| apply_stopwords=False, # Stopwords handled in safety stage | |
| compressed_indices=comp_indices, | |
| privacy_mode=privacy_mode, | |
| ) | |
| # Stage 5: Safety validation (may re-insert sentences) | |
| report: SafetyReport = self.safety.validate( | |
| original_text=original, | |
| compressed_text=recon.compressed_text, | |
| shield_result=shielded, | |
| original_sentences=sentences, | |
| compressed_indices=recon.compressed_indices, | |
| mode=mode, | |
| ) | |
| # Start with safety-validated text | |
| final = report.final_text | |
| # Re-inject protected entities unless privacy mode is enabled | |
| if not privacy_mode: | |
| final = self.reconstructor._reinject_entities( | |
| final, shielded.placeholder_map | |
| ) | |
| # Compute final token metrics | |
| final_tokens = len(self.reconstructor.tokenizer.encode(final)) | |
| saved = recon.original_tokens - final_tokens | |
| # Deduplicate consecutive identical lines (post-processing) | |
| lines = final.splitlines() | |
| unique_lines: List[str] = [] | |
| seen: set = set() | |
| for line in lines: | |
| stripped = line.strip() | |
| if stripped and stripped not in seen: | |
| unique_lines.append(stripped) | |
| seen.add(stripped) | |
| final = "\n".join(unique_lines) if unique_lines else final | |
| # Assemble result dictionary | |
| return { | |
| "compressed_text": final, | |
| "original_tokens": recon.original_tokens, | |
| "compressed_tokens": final_tokens, | |
| "tokens_saved": saved, | |
| "compression_ratio": ( | |
| saved / recon.original_tokens if recon.original_tokens > 0 else 0.0 | |
| ), | |
| "cost_saved_usd": saved * self.reconstructor.pricing["input"], | |
| "safety_score": report.safety_score, | |
| "alerts": recon.alerts + report.alerts, | |
| # Optional metadata for observability | |
| "_comp_stats": comp_stats, | |
| "_safety_report": { | |
| "missing_intents": report.missing_intents, | |
| "forced_added": report.forced_sentences_added, | |
| "perplexity": report.perplexity, | |
| }, | |
| } | |
| def compress_batch( | |
| self, | |
| texts: List[str], | |
| aggressiveness: Optional[float] = None, | |
| mode: str = "general", | |
| nli_active: bool = False, | |
| language: Optional[str] = None, | |
| privacy_mode: Optional[bool] = None, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Compress a batch of prompts through the full pipeline. | |
| This method orchestrates parallel execution of all pipeline stages | |
| for multiple prompts, maximizing throughput via thread pool. | |
| Parameters | |
| ---------- | |
| texts : List[str] | |
| List of prompts to compress. | |
| aggressiveness : Optional[float], optional | |
| Compression intensity ∈ [0, 1]. If None, uses mode preset. | |
| mode : str, optional | |
| Domain mode: "legal", "finance", "code", or "general". | |
| nli_active : bool, optional | |
| Whether NLI-based validation is enabled (affects aggressiveness). | |
| language : Optional[str], optional | |
| Language code for sentence segmentation (auto-detect if None). | |
| privacy_mode : Optional[bool], optional | |
| Override instance default for PII handling. | |
| Returns | |
| ------- | |
| List[Dict[str, Any]] | |
| List of result dictionaries, one per input prompt. | |
| Each contains compressed text and metrics (see _process_single). | |
| Pipeline Execution | |
| ------------------ | |
| Stage 1: Shielding (parallel) | |
| - Check cache first, compute fresh if miss | |
| - Extract entities, restrictions, placeholders | |
| Stage 2: Segmentation + Embedding (batched) | |
| - Collect all sentences across prompts | |
| - Encode with caching to avoid redundant computation | |
| - Regroup embeddings by original prompt | |
| Stage 3-5: Compression + Reconstruction + Safety (parallel) | |
| - Process each prompt independently in thread pool | |
| - Apply domain-specific aggressiveness and constraints | |
| Complexity | |
| ---------- | |
| Time: O(N · T_pipeline / M) amortized | |
| where N = prompt count, M = worker count, | |
| T_pipeline = latency for single prompt through all stages | |
| Space: O(N · L · d) for embeddings, L = avg sentences/prompt, | |
| d = embedding dimension (384 for MiniLM) | |
| Example | |
| ------- | |
| >>> service = CompressionService(use_cache=True) | |
| >>> results = service.compress_batch( | |
| ... texts=["Prompt 1", "Prompt 2"], | |
| ... mode="code", | |
| ... aggressiveness=0.3 | |
| ... ) | |
| >>> for i, r in enumerate(results): | |
| ... print(f"Prompt {i}: saved {r['tokens_saved']} tokens") | |
| """ | |
| if not texts: | |
| return [] | |
| # Resolve effective aggressiveness | |
| effective_agg = self._compute_effective_aggressiveness( | |
| aggressiveness, mode, nli_active | |
| ) | |
| logger.debug(f"Effective aggressiveness: {effective_agg:.2f} (mode={mode})") | |
| # Use instance privacy_mode if not overridden | |
| effective_privacy = ( | |
| privacy_mode if privacy_mode is not None else self.privacy_mode | |
| ) | |
| start_time = time.time() | |
| # Stage 1: Shielding (parallel with cache) | |
| shield_results: List[ShieldResult] = [None] * len(texts) # type: ignore | |
| future_to_idx = { | |
| self.executor.submit( | |
| self._shield_with_cache, text, None, mode, effective_privacy | |
| ): i | |
| for i, text in enumerate(texts) | |
| } | |
| # Collect results as they complete | |
| for future in as_completed(future_to_idx): | |
| idx = future_to_idx[future] | |
| shield_results[idx] = future.result() | |
| # Stage 2: Segmentation + Embedding (batched for efficiency) | |
| all_sents, sent_map = self._collect_all_sentences(shield_results, language) | |
| if not all_sents: | |
| # Edge case: all prompts empty after shielding | |
| return [ | |
| { | |
| "compressed_text": "", | |
| "original_tokens": 0, | |
| "compressed_tokens": 0, | |
| "tokens_saved": 0, | |
| "compression_ratio": 0.0, | |
| "cost_saved_usd": 0.0, | |
| "safety_score": 1.0, | |
| "alerts": ["Empty prompt after shielding"], | |
| } | |
| for _ in texts | |
| ] | |
| sent_emb = self._encode_with_cache(all_sents) | |
| emb_per_text, sents_per_text = self._regroup_embeddings(sent_emb, sent_map) | |
| # Stage 3-5: Compression + Reconstruction + Safety (parallel) | |
| results: List[Dict[str, Any]] = [None] * len(texts) # type: ignore | |
| future_to_idx = { | |
| self.executor.submit( | |
| self._process_single, | |
| texts[i], | |
| shield_results[i], | |
| sents_per_text[i], | |
| emb_per_text[i], | |
| effective_agg, | |
| effective_privacy, | |
| mode, | |
| ): i | |
| for i in range(len(texts)) | |
| } | |
| # Collect results as they complete | |
| for future in as_completed(future_to_idx): | |
| idx = future_to_idx[future] | |
| results[idx] = future.result() | |
| elapsed = time.time() - start_time | |
| logger.info( | |
| f"Batch compression complete: {len(texts)} prompts in {elapsed:.2f}s " | |
| f"({elapsed / len(texts) * 1000:.1f}ms/prompt avg)" | |
| ) | |
| return results | |
| async def compress_batch_async( | |
| self, | |
| texts: List[str], | |
| aggressiveness: Optional[float] = None, | |
| mode: str = "general", | |
| nli_active: bool = False, | |
| language: Optional[str] = None, | |
| privacy_mode: Optional[bool] = None, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Async wrapper for compress_batch (non-blocking event loop). | |
| Offloads CPU-bound compression to worker threads via asyncio.to_thread, | |
| preventing event loop starvation in async applications (FastAPI, etc.). | |
| Parameters | |
| ---------- | |
| texts : List[str] | |
| List of prompts to compress. | |
| aggressiveness : Optional[float], optional | |
| Compression intensity ∈ [0, 1]. | |
| mode : str, optional | |
| Domain mode for pattern selection. | |
| nli_active : bool, optional | |
| Whether NLI-based validation is enabled. | |
| language : Optional[str], optional | |
| Language code for segmentation. | |
| privacy_mode : Optional[bool], optional | |
| Override for PII handling. | |
| Returns | |
| ------- | |
| List[Dict[str, Any]] | |
| Same as compress_batch(). | |
| Note | |
| ---- | |
| This does not provide true parallelism; it uses a thread pool | |
| to avoid blocking the async event loop. For true parallelism, | |
| consider multiprocessing or distributed processing. | |
| """ | |
| return await asyncio.to_thread( | |
| self.compress_batch, | |
| texts, | |
| aggressiveness, | |
| mode, | |
| nli_active, | |
| language, | |
| privacy_mode, | |
| ) | |
| def clear_caches(self, semantic_only: bool = False) -> Dict[str, int]: | |
| """ | |
| Clear internal caches to free memory or force refresh. | |
| Parameters | |
| ---------- | |
| semantic_only : bool, optional | |
| If True, only clear semantic cache (Redis). | |
| If False, clear all caches: shield, embeddings, semantic. | |
| Returns | |
| ------- | |
| Dict[str, int] | |
| Count of cleared entries per cache type. | |
| """ | |
| cleared: Dict[str, int] = {} | |
| if not semantic_only: | |
| # Clear in-memory shield cache | |
| if self.shield_cache is not None: | |
| cleared["shield_cache"] = len(self.shield_cache) | |
| self.shield_cache.clear() | |
| # Clear in-memory embedding cache | |
| if self.emb_cache is not None: | |
| cleared["emb_cache"] = len(self.emb_cache) | |
| self.emb_cache.clear() | |
| # Clear semantic cache (Redis) | |
| if self.semantic_cache: | |
| cleared["semantic_cache"] = self.semantic_cache.clear() | |
| logger.info(f"Caches cleared: {cleared}") | |
| return cleared | |
| def get_stats(self) -> Dict[str, Any]: | |
| """ | |
| Return operational statistics for monitoring. | |
| Returns | |
| ------- | |
| Dict[str, Any] | |
| Statistics including: | |
| - cache_sizes: counts for shield/emb caches | |
| - semantic_cache: stats from RedisVL index (if available) | |
| - configuration: current settings for observability | |
| """ | |
| stats: Dict[str, Any] = { | |
| "cache_sizes": { | |
| "shield": len(self.shield_cache) if self.shield_cache else 0, | |
| "embeddings": len(self.emb_cache) if self.emb_cache else 0, | |
| }, | |
| "configuration": { | |
| "use_cache": self.use_emb_cache, | |
| "privacy_mode": self.privacy_mode, | |
| "nli_enabled": self.nli_refinement_fn is not None, | |
| }, | |
| } | |
| if self.semantic_cache: | |
| stats["semantic_cache"] = self.semantic_cache.get_stats() | |
| return stats | |