| """ | |
| Contextual compression module for reducing token usage in RAG. | |
| Contextual compression uses an LLM to extract only the most relevant sentences | |
| from retrieved documents, significantly reducing token count and cost. | |
| """ | |
| import os | |
| from typing import List, Optional | |
| from langchain.schema import Document | |
| from openai import OpenAI | |


class ContextCompressor:
    """
    LLM-based contextual compressor for RAG optimization.

    Takes retrieved documents and extracts only the sentences that are
    directly relevant to the user's query, reducing tokens by 40-60%.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "gpt-3.5-turbo",
        max_tokens: int = 500
    ):
        """
        Initialize context compressor.

        Args:
            api_key: OpenAI API key (falls back to the OPENAI_API_KEY
                environment variable if not provided)
            model: Model to use for compression
            max_tokens: Maximum tokens per compressed chunk
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.model = model
        self.max_tokens = max_tokens
        self.client = None

        if self.api_key:
            self.client = OpenAI(api_key=self.api_key)
            print(f"✅ Context compressor initialized (model: {model})")
        else:
            print("❌ OPENAI_API_KEY not set. Compression disabled.")

    def compress(
        self,
        query: str,
        documents: List[Document]
    ) -> List[Document]:
        """
        Compress documents by extracting only relevant content.

        Args:
            query: Original search query
            documents: List of documents to compress

        Returns:
            Compressed documents
        """
        if not self.client or not documents:
            return documents

        compressed_docs = []
        total_original_tokens = 0
        total_compressed_tokens = 0

        for doc in documents:
            # Estimate original token count (rough: 1 token ≈ 4 chars)
            original_tokens = len(doc.page_content) // 4
            total_original_tokens += original_tokens

            # Skip compression for very short documents
            if original_tokens < 100:
                compressed_docs.append(doc)
                total_compressed_tokens += original_tokens
                continue

            try:
                # Compress the document
                compressed_content = self._compress_single(query, doc.page_content)

                # Create new document with compressed content
                compressed_doc = Document(
                    page_content=compressed_content,
                    metadata={
                        **doc.metadata,
                        'compressed': True,
                        'original_length': len(doc.page_content),
                        'compressed_length': len(compressed_content)
                    }
                )
                compressed_docs.append(compressed_doc)

                # Estimate compressed token count
                compressed_tokens = len(compressed_content) // 4
                total_compressed_tokens += compressed_tokens

            except Exception as e:
                print(f"⚠️ Compression failed for document: {e}")
                # Keep original if compression fails
                compressed_docs.append(doc)
                total_compressed_tokens += original_tokens

        # Calculate savings
        if total_original_tokens > 0:
            savings_pct = ((total_original_tokens - total_compressed_tokens) / total_original_tokens) * 100
            print(f"📉 Compressed {total_original_tokens} → {total_compressed_tokens} tokens ({savings_pct:.1f}% reduction)")

        return compressed_docs

    def _compress_single(self, query: str, content: str) -> str:
        """
        Compress a single document.

        Args:
            query: Search query
            content: Document content

        Returns:
            Compressed content
        """
        prompt = f"""You are a text compression expert. Extract only the sentences from the following text that are directly relevant to answering this query:

Query: "{query}"

Text:
{content}

Instructions:
1. Extract ONLY sentences that directly answer or relate to the query
2. Preserve the original wording - do not paraphrase
3. Remove redundant or tangential information
4. Keep the extracted sentences in their original order
5. If multiple sentences are relevant, separate them with a space

Relevant sentences:"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that extracts relevant information."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1,  # Low temperature for consistency
                max_tokens=self.max_tokens
            )

            compressed = response.choices[0].message.content.strip()

            # If compression resulted in empty or very short text, keep original
            if len(compressed) < 50:
                return content

            return compressed

        except Exception as e:
            print(f"⚠️ Single document compression failed: {e}")
            return content

    def compress_batch(
        self,
        query: str,
        documents: List[Document],
        batch_size: int = 3
    ) -> List[Document]:
        """
        Compress documents in batches for efficiency.

        Args:
            query: Search query
            documents: Documents to compress
            batch_size: Number of documents to compress per API call

        Returns:
            Compressed documents
        """
        # For now, process individually
        # TODO: Implement true batching for better efficiency
        return self.compress(query, documents)
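

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the module above).
# Assumes OPENAI_API_KEY is set in the environment; the query and sample
# document below are hypothetical. Note that documents under ~400 characters
# (< 100 estimated tokens) are passed through uncompressed by compress().
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_docs = [
        Document(
            page_content=(
                "Contextual compression trims retrieved chunks before they are "
                "passed to the generator, which can substantially reduce prompt "
                "size and cost. Retrieval pipelines often fetch whole chunks "
                "even when only one or two sentences matter for the query. "
                "Tangential sentences, like background history of the library, "
                "should be dropped by the compressor rather than sent along."
            ),
            metadata={"source": "example.md"}
        )
    ]

    compressor = ContextCompressor()
    compressed = compressor.compress(
        query="Why does contextual compression reduce cost?",
        documents=sample_docs
    )
    for doc in compressed:
        print(doc.metadata, "->", doc.page_content[:120])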