Spaces:
Sleeping
Sleeping
| """Document ingestion, chunking, and hierarchical classification.""" | |
| import re | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from pathlib import Path | |
| import PyPDF2 | |
| from core.utils import ( | |
| load_hierarchy, | |
| generate_doc_id, | |
| generate_chunk_id, | |
| detect_language, | |
| chunk_by_tokens, | |
| mask_pii | |
| ) | |
class DocumentLoader:
    """Load document content from PDF and plain-text files.

    Optionally masks personally identifiable information (PII) in the
    loaded text via ``core.utils.mask_pii``.
    """

    def __init__(self, mask_pii: bool = False):
        """
        Initialize document loader.

        Args:
            mask_pii: Whether to mask personally identifiable information
                in loaded content.
        """
        self.mask_pii_enabled = mask_pii

    def load_pdf(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load content from a PDF file.

        Args:
            filepath: Path to PDF file

        Returns:
            Tuple of (content, metadata). Page texts are joined with blank
            lines; pages with no extractable text are skipped.

        Raises:
            ValueError: If the PDF cannot be opened or parsed.
        """
        content = []
        metadata = {"source_name": Path(filepath).name, "format": "pdf"}
        try:
            with open(filepath, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                metadata["num_pages"] = len(reader.pages)
                for page in reader.pages:
                    text = page.extract_text()
                    # extract_text() may return None (e.g. image-only pages)
                    # in some PyPDF2 versions; guard before calling strip().
                    if text and text.strip():
                        content.append(text)
        except Exception as e:
            # Chain the original exception so the root cause is preserved.
            raise ValueError(f"Error loading PDF {filepath}: {str(e)}") from e
        full_content = "\n\n".join(content)
        if self.mask_pii_enabled:
            full_content = mask_pii(full_content)
        return full_content, metadata

    def load_txt(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load content from a text file.

        Args:
            filepath: Path to text file

        Returns:
            Tuple of (content, metadata)
        """
        metadata = {"source_name": Path(filepath).name, "format": "txt"}
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
        except UnicodeDecodeError:
            # Fallback for non-UTF-8 files; latin-1 maps every byte, so this
            # read cannot fail with a decode error.
            with open(filepath, 'r', encoding='latin-1') as f:
                content = f.read()
        if self.mask_pii_enabled:
            content = mask_pii(content)
        return content, metadata

    def load(self, filepath: str) -> Tuple[str, Dict[str, Any]]:
        """
        Load a document, dispatching on its file extension.

        Args:
            filepath: Path to document file

        Returns:
            Tuple of (content, metadata)

        Raises:
            ValueError: If the file extension is not .pdf or .txt.
        """
        ext = Path(filepath).suffix.lower()
        if ext == '.pdf':
            return self.load_pdf(filepath)
        elif ext == '.txt':
            return self.load_txt(filepath)
        else:
            raise ValueError(f"Unsupported file format: {ext}")
class HierarchicalClassifier:
    """Classify text into a three-level hierarchy (domain/section/topic)
    plus a document type, using simple keyword matching against a
    hierarchy definition loaded by ``core.utils.load_hierarchy``.
    """

    def __init__(self, hierarchy_name: str):
        """
        Initialize classifier with hierarchy definition.

        Args:
            hierarchy_name: Name of hierarchy to use
        """
        self.hierarchy = load_hierarchy(hierarchy_name)
        self.hierarchy_name = hierarchy_name
        self._build_keyword_maps()

    def _build_keyword_maps(self) -> None:
        """Build per-label keyword lists for each hierarchy level.

        Keywords are simply the lowercased words of each label name.
        """
        self.level1_keywords: Dict[str, List[str]] = {}
        self.level2_keywords: Dict[str, List[str]] = {}
        self.level3_keywords: Dict[str, List[str]] = {}
        # Level 1: domain keywords
        for domain in self.hierarchy['levels'][0]['values']:
            self.level1_keywords[domain] = domain.lower().split()
        # Level 2: section keywords
        if 'mapping' in self.hierarchy['levels'][1]:
            for sections in self.hierarchy['levels'][1]['mapping'].values():
                for section in sections:
                    self.level2_keywords[section] = section.lower().split()
        # Level 3: topic keywords
        if 'mapping' in self.hierarchy['levels'][2]:
            for topics in self.hierarchy['levels'][2]['mapping'].values():
                for topic in topics:
                    self.level3_keywords[topic] = topic.lower().split()

    def classify_text(self, text: str, doc_type: Optional[str] = None) -> Dict[str, str]:
        """
        Classify text into hierarchical categories.

        Args:
            text: Text to classify
            doc_type: Optional document type override

        Returns:
            Dictionary with level1, level2, level3, and doc_type classifications
        """
        text_lower = text.lower()
        # Each level is conditioned on the previous level's result.
        level1 = self._classify_level1(text_lower)
        level2 = self._classify_level2(text_lower, level1)
        level3 = self._classify_level3(text_lower, level2)
        if doc_type is None:
            doc_type = self._infer_doc_type(text_lower)
        return {
            "level1": level1,
            "level2": level2,
            "level3": level3,
            "doc_type": doc_type
        }

    def _pick_best(self, candidates: List[str], keyword_map: Dict[str, List[str]], text: str) -> str:
        """Return the candidate whose keywords occur most often in *text*,
        falling back to the first candidate when nothing matches.

        Note: matching is by substring (``kw in text``), so keywords may
        match inside longer words.
        """
        scores = {
            c: sum(1 for kw in keyword_map.get(c, []) if kw in text)
            for c in candidates
        }
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return candidates[0]

    def _classify_level1(self, text: str) -> str:
        """Classify domain (level 1); first domain is the default."""
        scores = {
            domain: sum(1 for kw in keywords if kw in text)
            for domain, keywords in self.level1_keywords.items()
        }
        # default=0 guards against an empty keyword map, consistent with
        # the level-2/3 classifiers (the original raised ValueError here).
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return self.hierarchy['levels'][0]['values'][0]

    def _classify_level2(self, text: str, level1: str) -> str:
        """Classify section (level 2) among the sections mapped to *level1*."""
        if 'mapping' not in self.hierarchy['levels'][1]:
            return "Unknown"
        sections = self.hierarchy['levels'][1]['mapping'].get(level1, [])
        if not sections:
            return "Unknown"
        return self._pick_best(sections, self.level2_keywords, text)

    def _classify_level3(self, text: str, level2: str) -> str:
        """Classify topic (level 3) among the topics mapped to *level2*."""
        if 'mapping' not in self.hierarchy['levels'][2]:
            return "Unknown"
        topics = self.hierarchy['levels'][2]['mapping'].get(level2, [])
        if not topics:
            return "Unknown"
        return self._pick_best(topics, self.level3_keywords, text)

    def _infer_doc_type(self, text: str) -> str:
        """Infer document type from content via fixed keyword lists;
        first configured type is the default."""
        doc_types = self.hierarchy.get('doc_types', ['unknown'])
        # Guard an explicitly empty doc_types list (the original would
        # raise ValueError/IndexError here).
        if not doc_types:
            return 'unknown'
        type_keywords = {
            'policy': ['policy', 'regulation', 'rule', 'requirement'],
            'manual': ['manual', 'guide', 'instruction', 'procedure'],
            'report': ['report', 'analysis', 'findings', 'results'],
            'protocol': ['protocol', 'standard', 'specification'],
            'faq': ['faq', 'question', 'answer'],
            'agreement': ['agreement', 'contract', 'terms'],
            'guideline': ['guideline', 'recommendation', 'best practice'],
            'paper': ['abstract', 'introduction', 'methodology', 'conclusion'],
            'tutorial': ['tutorial', 'example', 'walkthrough', 'demo'],
            'specification': ['specification', 'requirement', 'definition'],
            'record': ['record', 'resume', 'cv', 'curriculum']
        }
        # Unlisted types fall back to matching their own name as keyword.
        scores = {
            dt: sum(1 for kw in type_keywords.get(dt, [dt]) if kw in text)
            for dt in doc_types
        }
        if max(scores.values(), default=0) > 0:
            return max(scores, key=scores.get)
        return doc_types[0]
class DocumentProcessor:
    """Process documents into classified, token-based chunks with metadata."""

    def __init__(
        self,
        hierarchy_name: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        mask_pii: bool = False,
        use_llm_classification: bool = False  # Default to False for backward compatibility
    ):
        """
        Initialize document processor.

        Args:
            hierarchy_name: Name of hierarchy to use for classification
            chunk_size: Target chunk size in tokens
            chunk_overlap: Number of overlapping tokens between chunks
            mask_pii: Whether to mask PII
            use_llm_classification: Whether to use LLM for classification
                (requires core/classification.py)
        """
        self.loader = DocumentLoader(mask_pii=mask_pii)
        if use_llm_classification:
            try:
                from core.classification import ImprovedHierarchicalClassifier
                self.classifier = ImprovedHierarchicalClassifier(
                    hierarchy_name,
                    use_llm=True
                )
            except ImportError:
                # Optional LLM classifier module unavailable; fall back to
                # the keyword-based classifier.
                self.classifier = HierarchicalClassifier(hierarchy_name)
        else:
            self.classifier = HierarchicalClassifier(hierarchy_name)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def process_document(self, filepath: str) -> List[Dict[str, Any]]:
        """
        Process a single document into chunks with metadata.

        Args:
            filepath: Path to document file

        Returns:
            List of chunk dictionaries, each with "text" and "metadata" keys.
        """
        content, base_metadata = self.loader.load(filepath)
        doc_id = generate_doc_id(content)
        lang = detect_language(content)
        chunks = chunk_by_tokens(content, self.chunk_size, self.chunk_overlap)
        processed_chunks = []
        for i, chunk_text in enumerate(chunks):
            classification = self.classifier.classify_text(chunk_text)
            # Spread base_metadata FIRST so loader-supplied keys can never
            # clobber the chunk-specific fields below (the original spread it
            # last, which also made its explicit "source_name" entry
            # redundant — base_metadata always carries source_name).
            metadata = {
                **base_metadata,
                "doc_id": doc_id,
                "chunk_id": generate_chunk_id(doc_id, i),
                "chunk_index": i,
                "lang": lang,
                "level1": classification["level1"],
                "level2": classification["level2"],
                "level3": classification["level3"],
                "doc_type": classification["doc_type"],
            }
            processed_chunks.append({
                "text": chunk_text,
                "metadata": metadata
            })
        return processed_chunks

    def process_documents(self, filepaths: List[str]) -> List[Dict[str, Any]]:
        """
        Process multiple documents, skipping any that fail.

        Args:
            filepaths: List of document file paths

        Returns:
            List of all chunks from all successfully processed documents.
        """
        all_chunks = []
        for filepath in filepaths:
            try:
                all_chunks.extend(self.process_document(filepath))
            except Exception as e:
                # Best-effort batch processing: report and continue rather
                # than aborting the whole batch on one bad file.
                print(f"Error processing {filepath}: {str(e)}")
                continue
        return all_chunks