File size: 6,205 Bytes
7644eac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
Contextual compression module for reducing token usage in RAG.

Contextual compression uses an LLM to extract only the most relevant sentences
from retrieved documents, significantly reducing token count and cost.
"""
import os
from typing import List, Optional
from langchain.schema import Document
from openai import OpenAI


class ContextCompressor:
    """
    LLM-based contextual compressor for RAG optimization.

    Takes retrieved documents and asks a chat model to extract only the
    sentences that are directly relevant to the user's query, so that fewer
    tokens are forwarded to the downstream generation step.

    If no API key is available the compressor is disabled and every call
    returns its input unchanged.
    """

    # Rough heuristic used for every token estimate: 1 token ≈ 4 characters.
    _CHARS_PER_TOKEN = 4
    # Documents estimated below this many tokens are passed through as-is;
    # an API round trip would cost more than it could save.
    _MIN_TOKENS_TO_COMPRESS = 100
    # A model reply shorter than this many characters is treated as a failed
    # extraction and the original text is kept instead.
    _MIN_COMPRESSED_CHARS = 50

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "gpt-3.5-turbo",
        max_tokens: int = 500
    ):
        """
        Initialize context compressor.

        Args:
            api_key: OpenAI API key. Falls back to the ``OPENAI_API_KEY``
                environment variable when omitted.
            model: Model to use for compression
            max_tokens: Maximum tokens per compressed chunk (passed to the
                completion request as ``max_tokens``)
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.model = model
        self.max_tokens = max_tokens
        # Stays None when no key is available; compress() checks this and
        # degrades to a pass-through.
        self.client = None

        if self.api_key:
            self.client = OpenAI(api_key=self.api_key)
            print(f"✅ Context compressor initialized (model: {model})")
        else:
            print("❌ OPENAI_API_KEY not set. Compression disabled.")

    @classmethod
    def _estimate_tokens(cls, text: str) -> int:
        """Return a rough token estimate for *text* (character count // 4)."""
        return len(text) // cls._CHARS_PER_TOKEN

    def compress(
        self,
        query: str,
        documents: List[Document]
    ) -> List[Document]:
        """
        Compress documents by extracting only relevant content.

        Documents that are too short to be worth compressing, or for which
        compression fails or yields no reduction, are returned as the
        original objects. Only documents whose content actually changed are
        replaced by a new ``Document`` carrying ``compressed``,
        ``original_length`` and ``compressed_length`` metadata.

        Args:
            query: Original search query
            documents: List of documents to compress

        Returns:
            A list with one entry per input document, in the same order.
            When the compressor is disabled or ``documents`` is empty, the
            input list itself is returned.
        """
        if not self.client or not documents:
            return documents

        compressed_docs = []
        total_original_tokens = 0
        total_compressed_tokens = 0

        for doc in documents:
            original_content = doc.page_content
            original_tokens = self._estimate_tokens(original_content)
            total_original_tokens += original_tokens

            # Default to the untouched document; replaced only on a real win.
            result_doc = doc

            if original_tokens >= self._MIN_TOKENS_TO_COMPRESS:
                try:
                    compressed_content = self._compress_single(query, original_content)

                    # _compress_single returns the original text when it falls
                    # back, so only flag the document as compressed when the
                    # content really changed.
                    if compressed_content != original_content:
                        result_doc = Document(
                            page_content=compressed_content,
                            metadata={
                                **(doc.metadata or {}),
                                'compressed': True,
                                'original_length': len(original_content),
                                'compressed_length': len(compressed_content)
                            }
                        )
                except Exception as e:
                    # Best-effort: never let one bad document break retrieval.
                    print(f"⚠️  Compression failed for document: {e}")
                    result_doc = doc

            compressed_docs.append(result_doc)
            total_compressed_tokens += self._estimate_tokens(result_doc.page_content)

        # Report estimated savings
        if total_original_tokens > 0:
            savings_pct = ((total_original_tokens - total_compressed_tokens) / total_original_tokens) * 100
            print(f"📉 Compressed {total_original_tokens} → {total_compressed_tokens} tokens ({savings_pct:.1f}% reduction)")

        return compressed_docs

    def _compress_single(self, query: str, content: str) -> str:
        """
        Compress a single document.

        Args:
            query: Search query
            content: Document content

        Returns:
            The extracted text, or ``content`` unchanged when the request
            fails, the model returns nothing usable (empty / shorter than
            the minimum length), or the reply is not shorter than the input.
        """
        prompt = f"""You are a text compression expert. Extract only the sentences from the following text that are directly relevant to answering this query:

Query: "{query}"

Text:
{content}

Instructions:
1. Extract ONLY sentences that directly answer or relate to the query
2. Preserve the original wording - do not paraphrase
3. Remove redundant or tangential information
4. Keep the extracted sentences in their original order
5. If multiple sentences are relevant, separate them with a space

Relevant sentences:"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that extracts relevant information."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1,  # Low temperature for consistency
                max_tokens=self.max_tokens
            )
            raw_reply = response.choices[0].message.content
        except Exception as e:
            print(f"⚠️  Single document compression failed: {e}")
            return content

        # The API may return no text content at all; treat it as "nothing extracted".
        compressed = (raw_reply or "").strip()

        # If compression resulted in empty or very short text, keep original
        if len(compressed) < self._MIN_COMPRESSED_CHARS:
            return content

        # A reply that is not shorter than the input is not a compression
        # (the model paraphrased or added text); keep the original.
        if len(compressed) >= len(content):
            return content

        return compressed

    def compress_batch(
        self,
        query: str,
        documents: List[Document],
        batch_size: int = 3
    ) -> List[Document]:
        """
        Compress documents in batches for efficiency.

        Note: true batching is not implemented yet. ``batch_size`` is
        accepted for forward compatibility but currently ignored; each
        document is compressed with its own API call via ``compress``.

        Args:
            query: Search query
            documents: Documents to compress
            batch_size: Number of documents to compress per API call
                (currently unused)

        Returns:
            Compressed documents
        """
        # For now, process individually
        # TODO: Implement true batching for better efficiency
        return self.compress(query, documents)