"""Document ingestion, chunking, and hierarchical classification."""

import re
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path

import PyPDF2

from core.utils import (
    load_hierarchy,
    generate_doc_id,
    generate_chunk_id,
    detect_language,
    chunk_by_tokens,
    mask_pii,
)


class DocumentLoader:
    """Load documents from various file formats (PDF and plain text)."""

    def __init__(self, mask_pii: bool = False):
        """
        Initialize document loader.

        Args:
            mask_pii: Whether to mask personally identifiable information
                in loaded content (applied via ``core.utils.mask_pii``).
        """
        self.mask_pii_enabled = mask_pii

    def load_pdf(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load content from a PDF file.

        Pages with only whitespace are skipped; remaining page texts are
        joined with blank lines.

        Args:
            filepath: Path to PDF file

        Returns:
            Tuple of (content, metadata). Metadata includes ``source_name``,
            ``format`` and ``num_pages``.

        Raises:
            ValueError: If the PDF cannot be read or parsed.
        """
        content = []
        metadata = {"source_name": Path(filepath).name, "format": "pdf"}

        try:
            with open(filepath, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                metadata["num_pages"] = len(reader.pages)

                for page in reader.pages:
                    text = page.extract_text()
                    # Skip pages that yield no extractable text.
                    if text.strip():
                        content.append(text)
        except Exception as e:
            # Normalize any PyPDF2/IO failure into a single error type
            # so callers have one exception to handle.
            raise ValueError(f"Error loading PDF {filepath}: {str(e)}")

        full_content = "\n\n".join(content)

        if self.mask_pii_enabled:
            full_content = mask_pii(full_content)

        return full_content, metadata

    def load_txt(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load content from a text file.

        Attempts UTF-8 first, then falls back to Latin-1 (which accepts
        any byte sequence) on decode failure.

        Args:
            filepath: Path to text file

        Returns:
            Tuple of (content, metadata). Metadata includes ``source_name``
            and ``format``.
        """
        metadata = {"source_name": Path(filepath).name, "format": "txt"}

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a code point, so this cannot fail.
            with open(filepath, 'r', encoding='latin-1') as f:
                content = f.read()

        if self.mask_pii_enabled:
            content = mask_pii(content)

        return content, metadata

    def load(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load a document, dispatching on its file extension.

        Args:
            filepath: Path to document file

        Returns:
            Tuple of (content, metadata)

        Raises:
            ValueError: If the extension is not ``.pdf`` or ``.txt``.
        """
        ext = Path(filepath).suffix.lower()

        if ext == '.pdf':
            return self.load_pdf(filepath)
        elif ext == '.txt':
            return self.load_txt(filepath)
        else:
            raise ValueError(f"Unsupported file format: {ext}")


class HierarchicalClassifier:
    """Classify documents into hierarchical categories via keyword matching."""

    def __init__(self, hierarchy_name: str):
        """
        Initialize classifier with a hierarchy definition.

        Args:
            hierarchy_name: Name of hierarchy to load via
                ``core.utils.load_hierarchy``.
        """
        self.hierarchy = load_hierarchy(hierarchy_name)
        self.hierarchy_name = hierarchy_name
        self._build_keyword_maps()

    def _build_keyword_maps(self) -> None:
        """Precompute per-category keyword lists for all three levels.

        Keywords are simply the lowercased, whitespace-split words of each
        category name.
        """
        levels = self.hierarchy['levels']

        # Level 1: domain keywords derived from the domain name itself.
        self.level1_keywords = {
            domain: domain.lower().split()
            for domain in levels[0]['values']
        }

        # Level 2: section keywords, flattened across all domains.
        self.level2_keywords = {}
        if 'mapping' in levels[1]:
            for sections in levels[1]['mapping'].values():
                for section in sections:
                    self.level2_keywords[section] = section.lower().split()

        # Level 3: topic keywords, flattened across all sections.
        self.level3_keywords = {}
        if 'mapping' in levels[2]:
            for topics in levels[2]['mapping'].values():
                for topic in topics:
                    self.level3_keywords[topic] = topic.lower().split()

    def classify_text(self, text: str, doc_type: Optional[str] = None) -> Dict[str, str]:
        """
        Classify text into hierarchical categories.

        Classification cascades: the level-2 choice is constrained by the
        level-1 result, and level 3 by level 2.

        Args:
            text: Text to classify
            doc_type: Optional document type override; inferred from the
                text when omitted.

        Returns:
            Dictionary with ``level1``, ``level2``, ``level3`` and
            ``doc_type`` keys.
        """
        text_lower = text.lower()

        # Classify level 1 (domain)
        level1 = self._classify_level1(text_lower)

        # Classify level 2 (section) based on level 1
        level2 = self._classify_level2(text_lower, level1)

        # Classify level 3 (topic) based on level 2
        level3 = self._classify_level3(text_lower, level2)

        # Infer doc_type if not provided
        if doc_type is None:
            doc_type = self._infer_doc_type(text_lower)

        return {
            "level1": level1,
            "level2": level2,
            "level3": level3,
            "doc_type": doc_type
        }

    def _classify_level1(self, text: str) -> str:
        """Classify domain (level 1) by keyword-hit count.

        Falls back to the first configured domain when nothing matches.
        """
        scores = {
            domain: sum(1 for kw in keywords if kw in text)
            for domain, keywords in self.level1_keywords.items()
        }

        # default=0 guards against an empty keyword map (consistent with
        # _classify_level2 / _classify_level3, which already do this).
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return self.hierarchy['levels'][0]['values'][0]

    def _classify_level2(self, text: str, level1: str) -> str:
        """Classify section (level 2), constrained to sections of *level1*."""
        if 'mapping' not in self.hierarchy['levels'][1]:
            return "Unknown"

        sections = self.hierarchy['levels'][1]['mapping'].get(level1, [])
        if not sections:
            return "Unknown"

        scores = {
            section: sum(
                1 for kw in self.level2_keywords.get(section, []) if kw in text
            )
            for section in sections
        }

        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return sections[0]

    def _classify_level3(self, text: str, level2: str) -> str:
        """Classify topic (level 3), constrained to topics of *level2*."""
        if 'mapping' not in self.hierarchy['levels'][2]:
            return "Unknown"

        topics = self.hierarchy['levels'][2]['mapping'].get(level2, [])
        if not topics:
            return "Unknown"

        scores = {
            topic: sum(
                1 for kw in self.level3_keywords.get(topic, []) if kw in text
            )
            for topic in topics
        }

        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return topics[0]

    def _infer_doc_type(self, text: str) -> str:
        """Infer document type from content via fixed keyword lists.

        Doc types without an entry in the keyword table fall back to
        matching their own name. Returns the first configured type when
        nothing matches.
        """
        doc_types = self.hierarchy.get('doc_types', ['unknown'])

        type_keywords = {
            'policy': ['policy', 'regulation', 'rule', 'requirement'],
            'manual': ['manual', 'guide', 'instruction', 'procedure'],
            'report': ['report', 'analysis', 'findings', 'results'],
            'protocol': ['protocol', 'standard', 'specification'],
            'faq': ['faq', 'question', 'answer'],
            'agreement': ['agreement', 'contract', 'terms'],
            'guideline': ['guideline', 'recommendation', 'best practice'],
            'paper': ['abstract', 'introduction', 'methodology', 'conclusion'],
            'tutorial': ['tutorial', 'example', 'walkthrough', 'demo'],
            'specification': ['specification', 'requirement', 'definition'],
            'record': ['record', 'resume', 'cv', 'curriculum']
        }

        scores = {
            doc_type: sum(
                1
                for kw in type_keywords.get(doc_type, [doc_type])
                if kw in text
            )
            for doc_type in doc_types
        }

        # default=0 for consistency with the level classifiers.
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return doc_types[0]


class DocumentProcessor:
    """Process documents into classified, token-sized chunks with metadata."""

    def __init__(
        self,
        hierarchy_name: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        mask_pii: bool = False,
        use_llm_classification: bool = False  # Default to False for backward compatibility
    ):
        """
        Initialize document processor.

        Args:
            hierarchy_name: Name of hierarchy to use for classification
            chunk_size: Target chunk size in tokens
            chunk_overlap: Number of overlapping tokens between chunks
            mask_pii: Whether to mask PII
            use_llm_classification: Whether to use LLM for classification
                (requires core/classification.py)
        """
        self.loader = DocumentLoader(mask_pii=mask_pii)

        # Try to use improved classifier if available and requested
        if use_llm_classification:
            try:
                from core.classification import ImprovedHierarchicalClassifier
                self.classifier = ImprovedHierarchicalClassifier(
                    hierarchy_name,
                    use_llm=True
                )
            except ImportError:
                # Fall back to basic classifier
                self.classifier = HierarchicalClassifier(hierarchy_name)
        else:
            self.classifier = HierarchicalClassifier(hierarchy_name)

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def process_document(self, filepath: str) -> List[Dict[str, Any]]:
        """
        Process a single document into chunks with metadata.

        Args:
            filepath: Path to document file

        Returns:
            List of chunk dictionaries, each with ``text`` and ``metadata``
            keys.
        """
        # Load document
        content, base_metadata = self.loader.load(filepath)

        # Generate document ID
        doc_id = generate_doc_id(content)

        # Detect language
        lang = detect_language(content)

        # Chunk the document
        chunks = chunk_by_tokens(content, self.chunk_size, self.chunk_overlap)

        # Process each chunk
        processed_chunks = []
        for i, chunk_text in enumerate(chunks):
            # Classify chunk
            classification = self.classifier.classify_text(chunk_text)

            # Spread base_metadata FIRST so the explicitly computed keys
            # below can never be clobbered by loader-supplied metadata.
            metadata = {
                **base_metadata,
                "doc_id": doc_id,
                "chunk_id": generate_chunk_id(doc_id, i),
                "chunk_index": i,
                "lang": lang,
                "level1": classification["level1"],
                "level2": classification["level2"],
                "level3": classification["level3"],
                "doc_type": classification["doc_type"],
            }

            processed_chunks.append({
                "text": chunk_text,
                "metadata": metadata
            })

        return processed_chunks

    def process_documents(self, filepaths: List[str]) -> List[Dict[str, Any]]:
        """
        Process multiple documents.

        Failures are reported to stdout and the remaining documents are
        still processed (best-effort batch semantics).

        Args:
            filepaths: List of document file paths

        Returns:
            List of all chunks from all documents
        """
        all_chunks = []

        for filepath in filepaths:
            try:
                chunks = self.process_document(filepath)
                all_chunks.extend(chunks)
            except Exception as e:
                print(f"Error processing {filepath}: {str(e)}")
                continue

        return all_chunks