File size: 12,621 Bytes
c54dcef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
"""Document ingestion, chunking, and hierarchical classification."""

import re
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import PyPDF2
from core.utils import (
    load_hierarchy,
    generate_doc_id,
    generate_chunk_id,
    detect_language,
    chunk_by_tokens,
    mask_pii
)


class DocumentLoader:
    """Load documents from various file formats (.pdf and .txt)."""
    
    def __init__(self, mask_pii: bool = False):
        """
        Initialize document loader.
        
        Args:
            mask_pii: Whether to mask personally identifiable information
                in loaded content before returning it.
        """
        self.mask_pii_enabled = mask_pii
    
    def load_pdf(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load content from a PDF file.
        
        Args:
            filepath: Path to PDF file
            
        Returns:
            Tuple of (content, metadata). Page texts are joined with blank
            lines; pages with no extractable text are skipped. Metadata
            includes source_name, format, and num_pages.
        
        Raises:
            ValueError: If the PDF cannot be opened or parsed (original
                exception preserved via chaining).
        """
        content = []
        metadata = {"source_name": Path(filepath).name, "format": "pdf"}
        
        try:
            with open(filepath, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                metadata["num_pages"] = len(reader.pages)
                
                for page in reader.pages:
                    text = page.extract_text()
                    # extract_text() can return None or "" for image-only
                    # pages in some PyPDF2 versions; guard before strip().
                    if text and text.strip():
                        content.append(text)
        except Exception as e:
            # Chain the cause so the underlying parser error isn't lost.
            raise ValueError(f"Error loading PDF {filepath}: {str(e)}") from e
        
        full_content = "\n\n".join(content)
        
        if self.mask_pii_enabled:
            full_content = mask_pii(full_content)
        
        return full_content, metadata
    
    def load_txt(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load content from a plain-text file.
        
        Tries UTF-8 first and falls back to latin-1 (which accepts any
        byte sequence) on decode failure.
        
        Args:
            filepath: Path to text file
            
        Returns:
            Tuple of (content, metadata). Metadata includes source_name
            and format.
        """
        metadata = {"source_name": Path(filepath).name, "format": "txt"}
        
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
        except UnicodeDecodeError:
            # latin-1 maps every byte to a code point, so this cannot fail.
            with open(filepath, 'r', encoding='latin-1') as f:
                content = f.read()
        
        if self.mask_pii_enabled:
            content = mask_pii(content)
        
        return content, metadata
    
    def load(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load a document, dispatching on file extension.
        
        Args:
            filepath: Path to document file (.pdf or .txt)
            
        Returns:
            Tuple of (content, metadata)
        
        Raises:
            ValueError: If the extension is not .pdf or .txt.
        """
        ext = Path(filepath).suffix.lower()
        
        if ext == '.pdf':
            return self.load_pdf(filepath)
        elif ext == '.txt':
            return self.load_txt(filepath)
        else:
            raise ValueError(f"Unsupported file format: {ext}")


class HierarchicalClassifier:
    """Classify documents into hierarchical categories via keyword scoring."""
    
    def __init__(self, hierarchy_name: str):
        """
        Initialize classifier with hierarchy definition.
        
        Args:
            hierarchy_name: Name of hierarchy to load via load_hierarchy()
        """
        self.hierarchy = load_hierarchy(hierarchy_name)
        self.hierarchy_name = hierarchy_name
        self._build_keyword_maps()
    
    def _build_keyword_maps(self) -> None:
        """Build label -> lowercase-keyword mappings for all three levels.
        
        Keywords are simply the whitespace-split words of each label name.
        """
        self.level1_keywords: Dict[str, List[str]] = {}
        self.level2_keywords: Dict[str, List[str]] = {}
        self.level3_keywords: Dict[str, List[str]] = {}
        
        # Level 1: domain keywords come straight from the domain names.
        for domain in self.hierarchy['levels'][0]['values']:
            self.level1_keywords[domain] = domain.lower().split()
        
        # Level 2: section keywords (only when a mapping is defined).
        if 'mapping' in self.hierarchy['levels'][1]:
            for sections in self.hierarchy['levels'][1]['mapping'].values():
                for section in sections:
                    self.level2_keywords[section] = section.lower().split()
        
        # Level 3: topic keywords (only when a mapping is defined).
        if 'mapping' in self.hierarchy['levels'][2]:
            for topics in self.hierarchy['levels'][2]['mapping'].values():
                for topic in topics:
                    self.level3_keywords[topic] = topic.lower().split()
    
    def classify_text(self, text: str, doc_type: Optional[str] = None) -> Dict[str, str]:
        """
        Classify text into hierarchical categories.
        
        Each level is classified by counting keyword hits, constrained by
        the choice made at the level above.
        
        Args:
            text: Text to classify
            doc_type: Optional document type override; inferred when None
            
        Returns:
            Dictionary with level1, level2, level3, and doc_type classifications
        """
        text_lower = text.lower()
        
        # Classify top-down: each level narrows the candidates for the next.
        level1 = self._classify_level1(text_lower)
        level2 = self._classify_level2(text_lower, level1)
        level3 = self._classify_level3(text_lower, level2)
        
        if doc_type is None:
            doc_type = self._infer_doc_type(text_lower)
        
        return {
            "level1": level1,
            "level2": level2,
            "level3": level3,
            "doc_type": doc_type
        }
    
    def _classify_level1(self, text: str) -> str:
        """Classify domain (level 1) by keyword hit count.
        
        Falls back to the first domain when nothing matches.
        """
        scores = {
            domain: sum(1 for kw in keywords if kw in text)
            for domain, keywords in self.level1_keywords.items()
        }
        
        # default=0 keeps this safe for an empty keyword map, matching the
        # guarded max() calls in _classify_level2/_classify_level3.
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return self.hierarchy['levels'][0]['values'][0]
    
    def _classify_level2(self, text: str, level1: str) -> str:
        """Classify section (level 2) among the sections mapped to level1.
        
        Returns "Unknown" when no mapping exists; falls back to the first
        mapped section when no keyword matches.
        """
        if 'mapping' not in self.hierarchy['levels'][1]:
            return "Unknown"
        
        sections = self.hierarchy['levels'][1]['mapping'].get(level1, [])
        if not sections:
            return "Unknown"
        
        scores = {
            section: sum(1 for kw in self.level2_keywords.get(section, []) if kw in text)
            for section in sections
        }
        
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return sections[0]
    
    def _classify_level3(self, text: str, level2: str) -> str:
        """Classify topic (level 3) among the topics mapped to level2.
        
        Returns "Unknown" when no mapping exists; falls back to the first
        mapped topic when no keyword matches.
        """
        if 'mapping' not in self.hierarchy['levels'][2]:
            return "Unknown"
        
        topics = self.hierarchy['levels'][2]['mapping'].get(level2, [])
        if not topics:
            return "Unknown"
        
        scores = {
            topic: sum(1 for kw in self.level3_keywords.get(topic, []) if kw in text)
            for topic in topics
        }
        
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return topics[0]
    
    def _infer_doc_type(self, text: str) -> str:
        """Infer document type from content keywords.
        
        Uses a built-in keyword table per known type; unknown types fall
        back to matching the type name itself. Defaults to the first
        declared doc_type when nothing matches.
        """
        doc_types = self.hierarchy.get('doc_types', ['unknown'])
        
        type_keywords = {
            'policy': ['policy', 'regulation', 'rule', 'requirement'],
            'manual': ['manual', 'guide', 'instruction', 'procedure'],
            'report': ['report', 'analysis', 'findings', 'results'],
            'protocol': ['protocol', 'standard', 'specification'],
            'faq': ['faq', 'question', 'answer'],
            'agreement': ['agreement', 'contract', 'terms'],
            'guideline': ['guideline', 'recommendation', 'best practice'],
            'paper': ['abstract', 'introduction', 'methodology', 'conclusion'],
            'tutorial': ['tutorial', 'example', 'walkthrough', 'demo'],
            'specification': ['specification', 'requirement', 'definition'],
            'record': ['record', 'resume', 'cv', 'curriculum']
        }
        
        scores = {
            doc_type: sum(1 for kw in type_keywords.get(doc_type, [doc_type]) if kw in text)
            for doc_type in doc_types
        }
        
        # default=0 guards against an empty doc_types list (consistency
        # with the other _classify_* helpers).
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return doc_types[0]


class DocumentProcessor:
    """Process documents into classified, metadata-annotated chunks."""
    
    def __init__(
        self,
        hierarchy_name: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        mask_pii: bool = False,
        use_llm_classification: bool = False  # Default to False for backward compatibility
    ):
        """
        Initialize document processor.
        
        Args:
            hierarchy_name: Name of hierarchy to use for classification
            chunk_size: Target chunk size in tokens
            chunk_overlap: Number of overlapping tokens between chunks
            mask_pii: Whether to mask PII
            use_llm_classification: Whether to use LLM for classification
                (requires core/classification.py; silently falls back to
                the keyword classifier when that module is absent)
        """
        self.loader = DocumentLoader(mask_pii=mask_pii)
        
        # Prefer the LLM-backed classifier when requested and importable;
        # otherwise use the keyword-based HierarchicalClassifier.
        if use_llm_classification:
            try:
                from core.classification import ImprovedHierarchicalClassifier
                self.classifier = ImprovedHierarchicalClassifier(
                    hierarchy_name, 
                    use_llm=True
                )
            except ImportError:
                # core/classification.py not available -- fall back.
                self.classifier = HierarchicalClassifier(hierarchy_name)
        else:
            self.classifier = HierarchicalClassifier(hierarchy_name)
        
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def process_document(self, filepath: str) -> List[Dict[str, Any]]:
        """
        Process a single document into chunks with metadata.
        
        Args:
            filepath: Path to document file
            
        Returns:
            List of dicts, each with "text" (the chunk) and "metadata"
            (doc identifiers, source info, language, and the per-chunk
            hierarchical classification).
        
        Raises:
            ValueError: Propagated from the loader on unreadable files or
                unsupported formats.
        """
        content, base_metadata = self.loader.load(filepath)
        
        # Content-derived ID, so the same document always maps to one doc_id.
        doc_id = generate_doc_id(content)
        lang = detect_language(content)
        chunks = chunk_by_tokens(content, self.chunk_size, self.chunk_overlap)
        
        processed_chunks = []
        for i, chunk_text in enumerate(chunks):
            # Each chunk is classified independently: a long document may
            # span multiple sections/topics.
            classification = self.classifier.classify_text(chunk_text)
            
            # Spread base_metadata FIRST so the explicitly computed fields
            # below can never be clobbered by a loader-supplied key.
            metadata = {
                **base_metadata,
                "doc_id": doc_id,
                "chunk_id": generate_chunk_id(doc_id, i),
                "chunk_index": i,
                "lang": lang,
                "level1": classification["level1"],
                "level2": classification["level2"],
                "level3": classification["level3"],
                "doc_type": classification["doc_type"],
            }
            
            processed_chunks.append({
                "text": chunk_text,
                "metadata": metadata
            })
        
        return processed_chunks
    
    def process_documents(self, filepaths: List[str]) -> List[Dict[str, Any]]:
        """
        Process multiple documents, skipping any that fail.
        
        Args:
            filepaths: List of document file paths
            
        Returns:
            List of all chunks from all successfully processed documents.
            Failures are reported to stdout and do not abort the batch.
        """
        all_chunks = []
        
        for filepath in filepaths:
            try:
                chunks = self.process_document(filepath)
                all_chunks.extend(chunks)
            except Exception as e:
                # Best-effort batch processing: report and continue.
                print(f"Error processing {filepath}: {str(e)}")
                continue
        
        return all_chunks