Spaces:
Sleeping
Sleeping
| import os | |
| import yaml | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| import PyPDF2 | |
| from .utils import Chunk, TextProcessor, generate_id | |
| import logging as _logging | |
| _logger = _logging.getLogger("rag_ingest") | |
| import os as _os | |
# The OpenAI-backed detector is enabled only when the SDK imports cleanly AND
# an API key is present in the environment; otherwise the flag stays False.
try:
    from openai import OpenAI as _OpenAI
except Exception:
    _OPENAI_ENABLED = False
else:
    _OPENAI_ENABLED = bool(_os.getenv("OPENAI_API_KEY"))
class OpenAIMetadataDetector:
    """Use OpenAI to detect language, doc_type, and hierarchy levels for a chunk.

    ``detect`` returns an empty dict when no client is configured or when the
    request / response parsing fails, so callers can fall back to heuristics.
    """

    def __init__(self, hierarchy_manager: 'HierarchyManager'):
        """Store the hierarchy manager and create a client if OpenAI is enabled."""
        self.hierarchy_manager = hierarchy_manager
        self.client = _OpenAI() if _OPENAI_ENABLED else None
        self.model = _os.getenv("OPENAI_MODEL", "gpt-4o-mini")

    @staticmethod
    def _parse_json_payload(content: Any) -> Any:
        """Parse the model reply as JSON, tolerating common wrappers.

        Chat models frequently wrap JSON in Markdown code fences or add a
        sentence around it; a plain ``json.loads`` rejects both.  This helper
        strips a leading/trailing fence and, as a last resort, parses the text
        between the first ``{`` and the last ``}``.

        Returns:
            The decoded JSON value (not necessarily a dict).

        Raises:
            ValueError: if the reply is empty or contains no parseable JSON.
        """
        import json as _json

        if not content:
            raise ValueError("Empty response content")
        payload = str(content).strip()
        if payload.startswith("```"):
            # Drop the opening fence line (``` or ```json) and the closing fence.
            payload = payload.split("\n", 1)[1] if "\n" in payload else ""
            payload = payload.rsplit("```", 1)[0]
        try:
            return _json.loads(payload)
        except ValueError:
            start, end = payload.find("{"), payload.rfind("}")
            if start == -1 or end <= start:
                raise
            return _json.loads(payload[start:end + 1])

    def detect(self, text: str) -> Dict[str, Any]:
        """Ask the model for metadata describing ``text``.

        Only the first 2000 characters of ``text`` are sent.  A returned
        ``hierarchy_name`` that is not one of the manager's hierarchies is
        replaced by ``None``.

        Returns:
            The parsed metadata dict, or ``{}`` when there is no client, the
            reply is not a JSON object, or any error occurs (errors are logged).
        """
        if not self.client:
            return {}
        hierarchies = self.hierarchy_manager.list_hierarchies()
        prompt = (
            "You are a metadata extractor. Given a text chunk, infer: language (en|ja), "
            "document_type (Policy|Manual|FAQ|Report|Note|Guideline), hierarchy_name, level1, level2, level3. "
            "CRITICAL: hierarchy_name MUST be exactly one of the following: "
            f"{hierarchies}. Do not invent other names. "
            "Respond as strict JSON with keys: language, document_type, hierarchy_name, level1, level2, level3. "
            "Be concise; if unsure, pick the closest.\n\nText:\n" + text[:2000]
        )
        try:
            _logger.debug("Calling OpenAI for chunk metadata detection (model=%s)", self.model)
            resp = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
            )
            data = self._parse_json_payload(resp.choices[0].message.content)
            # Enforce allowed hierarchy set
            if isinstance(data, dict) and data.get("hierarchy_name") not in hierarchies:
                data["hierarchy_name"] = None
            _logger.debug("OpenAI chunk metadata inferred: %s", data)
            return data if isinstance(data, dict) else {}
        except Exception:
            _logger.exception("OpenAI chunk metadata detection failed; using heuristics.")
            return {}
# pypdf is an optional dependency; record whether it could be imported so the
# PDF loader knows if it can be tried before PyPDF2.
try:
    from pypdf import PdfReader as PyPdfReader
except ImportError:
    PYPDF_AVAILABLE = False
else:
    PYPDF_AVAILABLE = True
class DocumentLoader:
    """Load documents from various formats"""

    def __init__(self):
        self.text_processor = TextProcessor()

    def load_pdf(self, file_path: str) -> str:
        """Load text from a PDF file with fallback readers, preserving paragraphs.

        pypdf is tried first when it is installed; if it fails or yields no
        text, PyPDF2 is used.  Pages PyPDF2 cannot extract are skipped.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
            ValueError: if the path is not a regular file or the file is empty.
            Exception: if PyPDF2 cannot read the file or no text is extracted;
                the original error is attached as ``__cause__``.
        """
        # Validate the path before handing it to any PDF library.
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"PDF file not found: {file_path}")
        if not os.path.isfile(file_path):
            raise ValueError(f"Path is not a file: {file_path}")
        if os.path.getsize(file_path) == 0:
            raise ValueError(f"PDF file is empty: {file_path}")

        # Try pypdf first (more robust)
        if PYPDF_AVAILABLE:
            try:
                with open(file_path, 'rb') as file:
                    reader = PyPdfReader(file)
                    page_texts = (page.extract_text() for page in reader.pages)
                    text = "".join(t + "\n" for t in page_texts if t)
                if text.strip():
                    return self.text_processor.clean_text_preserve_newlines(text)
            except Exception:
                # Best effort only: record why pypdf failed, then fall back.
                _logger.warning("pypdf failed on %s; falling back to PyPDF2",
                                file_path, exc_info=True)

        # Fallback to PyPDF2
        try:
            with open(file_path, 'rb') as file:
                try:
                    # strict=False lets PyPDF2 tolerate some malformed PDFs.
                    reader = PyPDF2.PdfReader(file, strict=False)
                except Exception:
                    # Rewind and retry with the default reader settings.
                    file.seek(0)
                    reader = PyPDF2.PdfReader(file)
                parts = []
                for page_number, page in enumerate(reader.pages, start=1):
                    try:
                        page_text = page.extract_text()
                    except Exception:
                        # Skip pages that can't be extracted, but leave a trace.
                        _logger.warning("Skipping unreadable page %d of %s",
                                        page_number, file_path, exc_info=True)
                        continue
                    if page_text:
                        parts.append(page_text + "\n")
                text = "".join(parts)
                if not text.strip():
                    raise ValueError(f"No text could be extracted from PDF: {file_path}")
                return self.text_processor.clean_text_preserve_newlines(text)
        except Exception as e:
            error_msg = str(e)
            if "EOF" in error_msg:
                raise Exception(
                    f"PDF file appears to be corrupted or incomplete: {file_path}. "
                    f"This may be due to an incomplete upload or corrupted file. "
                    f"Please try re-uploading the file or check if the PDF is valid."
                ) from e
            raise Exception(f"Error loading PDF {file_path}: {error_msg}") from e

    def load_txt(self, file_path: str) -> str:
        """Load text from a UTF-8 TXT file preserving paragraphs.

        Raises:
            Exception: on any read/decode/cleaning failure; the original error
                is attached as ``__cause__``.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
            return self.text_processor.clean_text_preserve_newlines(text)
        except Exception as e:
            raise Exception(f"Error loading TXT {file_path}: {str(e)}") from e

    def load_document(self, file_path: str) -> str:
        """Load a document, dispatching on the (case-insensitive) file extension.

        Raises:
            ValueError: if the extension is neither ``.pdf`` nor ``.txt``.
        """
        ext = Path(file_path).suffix.lower()
        if ext == '.pdf':
            return self.load_pdf(file_path)
        if ext == '.txt':
            return self.load_txt(file_path)
        raise ValueError(f"Unsupported file format: {ext}")
class HierarchyManager:
    """Manage hierarchical metadata definitions loaded from ``*.yaml`` files."""

    def __init__(self, hierarchies_dir: str = "hierarchies"):
        """Load every hierarchy definition found directly in ``hierarchies_dir``."""
        self.hierarchies_dir = Path(hierarchies_dir)
        self.hierarchies = {}
        self.load_hierarchies()

    def load_hierarchies(self):
        """Load all hierarchy definitions, keyed by file stem.

        Files are read in sorted path order so that the order returned by
        ``list_hierarchies`` does not depend on the filesystem.  A file whose
        top-level YAML value is not a mapping (for example an empty file, which
        ``yaml.safe_load`` turns into ``None``) is skipped with a warning
        instead of being registered as an unusable definition.
        """
        for yaml_file in sorted(self.hierarchies_dir.glob("*.yaml")):
            with open(yaml_file, 'r', encoding='utf-8') as file:
                definition = yaml.safe_load(file)
            if not isinstance(definition, dict):
                _logger.warning("Skipping hierarchy file %s: top-level value is not a mapping",
                                yaml_file)
                continue
            self.hierarchies[yaml_file.stem] = definition

    def get_hierarchy(self, name: str) -> Dict[str, Any]:
        """Get hierarchy definition by name.

        Raises:
            ValueError: if no hierarchy with that name was loaded.
        """
        if name not in self.hierarchies:
            raise ValueError(f"Hierarchy '{name}' not found")
        return self.hierarchies[name]

    def list_hierarchies(self) -> List[str]:
        """List the names of the loaded hierarchies in load order."""
        return list(self.hierarchies)
class DocumentChunker:
    """Chunk documents with hierarchical metadata.

    ``chunk_size`` / ``chunk_overlap`` are applied to whitespace-separated
    words when splitting oversized blocks, and ``chunk_size`` is compared with
    ``TextProcessor.count_tokens`` when merging neighbouring blocks.
    """

    # Document types assumed when a hierarchy definition lists none.
    _DEFAULT_DOC_TYPES = ["Policy", "Manual", "FAQ", "Report", "Note", "Guideline"]

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_processor = TextProcessor()
        self.hierarchy_manager = HierarchyManager()
        self.ai_detector = OpenAIMetadataDetector(self.hierarchy_manager)

    def chunk_document(self, file_path: str, hierarchy: Optional[str],
                       doc_type: Optional[str], language: Optional[str]) -> List["Chunk"]:
        """Chunk document with hierarchical metadata per chunk.

        - Auto-detects hierarchy/doc_type/language when None or 'Auto'.
        - Assigns metadata per chunk to support multi-topic documents.

        Raises:
            ValueError: if ``hierarchy`` names an unknown hierarchy, or if the
                hierarchy must be auto-detected but none are loaded.
        """
        content = DocumentLoader().load_document(file_path)

        if not language or str(language).lower() == 'auto':
            language = self._detect_language(content)

        hier_names = self.hierarchy_manager.list_hierarchies()
        fixed_name = hierarchy if hierarchy and hierarchy.lower() != 'auto' else None
        fixed_def = self.hierarchy_manager.get_hierarchy(fixed_name) if fixed_name else None
        if fixed_def is None and not hier_names:
            # Previously surfaced as an IndexError deep inside the scoring code.
            raise ValueError("No hierarchy definitions are available for auto-detection")

        blocks = self._split_blocks(content)
        provisional = self._label_blocks(blocks, file_path, fixed_name, fixed_def,
                                         hier_names, doc_type)

        chunks: List["Chunk"] = []
        for text_block, meta in self._merge_provisional(provisional):
            final_md = self._generate_metadata(
                file_path=file_path,
                hierarchy_def=meta['hdef'],
                doc_type=meta['dtype'],
                language=language,
                content=text_block
            )
            if meta['hname']:
                final_md['hierarchy'] = meta['hname']
            final_md['ai_detected'] = meta['ai']
            # NOTE(review): every chunk receives a fresh doc_id, so chunks of one
            # file do not share an id -- confirm this is what consumers expect.
            chunks.append(Chunk(
                doc_id=generate_id(),
                chunk_id=generate_id(),
                content=text_block,
                metadata=final_md
            ))
        return chunks

    def _detect_language(self, content: str) -> str:
        """Return 'en' or 'ja', preferring the AI guess over the script heuristic."""
        guess = self.ai_detector.detect(content)
        ai_language = guess.get('language') if isinstance(guess, dict) else None
        _logger.debug("Language auto-detect: ai_guess=%s", ai_language)
        if ai_language in ('en', 'ja'):
            return ai_language
        # Heuristic: any kana or CJK ideograph marks the text as Japanese.
        has_japanese = any(
            '\u3040' <= ch <= '\u30ff' or '\u4e00' <= ch <= '\u9faf' for ch in content
        )
        return 'ja' if has_japanese else 'en'

    def _split_blocks(self, content: str) -> List[str]:
        """Split on blank lines, then window blocks longer than ``chunk_size`` words."""
        raw_blocks = [b.strip() for b in content.split('\n\n') if b.strip()]
        if not raw_blocks:
            raw_blocks = [content]
        step = max(1, self.chunk_size - self.chunk_overlap)
        blocks: List[str] = []
        for block in raw_blocks:
            words = block.split()
            if len(words) <= self.chunk_size:
                blocks.append(block)
                continue
            for start in range(0, len(words), step):
                blocks.append(' '.join(words[start:start + self.chunk_size]))
                if start + self.chunk_size >= len(words):
                    # This window already reaches the end; later windows would
                    # only repeat its tail.
                    break
        return blocks

    @staticmethod
    def _find_label(label: str, text: str, ignore_case: bool = False) -> Optional[str]:
        """Return the stripped value of a ``<label>: value`` line, or None if absent."""
        import re
        flags = re.MULTILINE | (re.IGNORECASE if ignore_case else 0)
        match = re.search(rf"^\s*{label}\s*:\s*(.+)$", text, flags=flags)
        return match.group(1).strip() if match else None

    @staticmethod
    def _best_match(name: str, candidates: List[str]) -> str:
        """Return the first candidate equal to / containing / contained in ``name``.

        Falls back to the first candidate, or "General" when there are none.
        """
        name_l = name.lower()
        for candidate in candidates:
            cand_l = candidate.lower()
            if cand_l == name_l or name_l in cand_l or cand_l in name_l:
                return candidate
        return candidates[0] if candidates else "General"

    @staticmethod
    def _pick_by_keyword(content: str, options: List[str]) -> str:
        """Return the first option whose lowercase name occurs in ``content``.

        When no option occurs, the first option is returned; "General" is
        returned when there are no options at all.
        """
        if not options:
            return "General"
        for option in options:
            if option.lower() in content:
                return option
        return options[0]

    @staticmethod
    def _score_hierarchy(name: str, hdef: Dict[str, Any], block_lower: str,
                         filename_lower: str) -> int:
        """Weighted keyword score of one hierarchy against a block and the file name."""
        levels = hdef['levels']
        score = 2 * sum(1 for v in levels['level1']['values'] if v.lower() in block_lower)
        for l2_list in levels['level2']['values'].values():
            score += 2 * sum(1 for v in l2_list if v.lower() in block_lower)
        for l3_list in levels['level3']['values'].values():
            score += sum(1 for v in l3_list if v.lower() in block_lower)
        score += sum(1 for dt in (hdef.get('doc_types') or []) if dt.lower() in block_lower)
        if name.lower() in filename_lower:
            score += 3
        return score

    def _resolve_hierarchy(self, block: str, block_lower: str, filename_lower: str,
                           hier_names: List[str], ai_name: Any):
        """Pick a hierarchy for one block; returns ``(definition, name, ai_used)``.

        Precedence: explicit "Hierarchy: <name>" label, then the AI guess when
        it names a known hierarchy, then keyword scoring (first best wins).
        """
        explicit = self._find_label('hierarchy', block, ignore_case=True)
        chosen = None
        if explicit:  # a whitespace-only label would otherwise match every name
            explicit_l = explicit.lower()
            for name in hier_names:
                name_l = name.lower()
                if name_l in explicit_l or explicit_l in name_l:
                    chosen = name  # the last loose match wins
        if chosen is not None:
            return self.hierarchy_manager.get_hierarchy(chosen), chosen, False
        if ai_name in hier_names:
            return self.hierarchy_manager.get_hierarchy(ai_name), ai_name, True
        best_name = max(
            hier_names,
            key=lambda n: self._score_hierarchy(
                n, self.hierarchy_manager.get_hierarchy(n), block_lower, filename_lower),
        )
        return self.hierarchy_manager.get_hierarchy(best_name), best_name, False

    def _guess_doc_type(self, block_lower: str, hdef: Dict[str, Any]) -> str:
        """Heuristically choose a document type for a lower-cased block."""
        candidates = hdef.get('doc_types') or self._DEFAULT_DOC_TYPES
        best_dt, best_score = candidates[0], -1
        for dt in candidates:
            dt_l = dt.lower()
            score = 1 if dt_l in block_lower else 0
            if dt_l == 'faq' and ('faq' in block_lower or 'q:' in block_lower):
                score += 1
            if dt_l == 'report' and ('report' in block_lower or 'summary' in block_lower):
                score += 1
            if score > best_score:
                best_dt, best_score = dt, score
        return best_dt

    def _label_blocks(self, blocks: List[str], file_path: str, fixed_name: Optional[str],
                      fixed_def: Optional[Dict[str, Any]], hier_names: List[str],
                      doc_type: Optional[str]) -> List[Dict[str, Any]]:
        """Phase 1: attach provisional hierarchy / doc type / level labels to each block."""
        auto_doc_type = not doc_type or str(doc_type).lower() == 'auto'
        filename_lower = os.path.basename(file_path).lower()
        provisional: List[Dict[str, Any]] = []
        # Sticky explicit labels propagate until overridden by new explicit labels
        sticky_l1: Optional[str] = None
        sticky_l2: Optional[str] = None
        for block in blocks:
            block_lower = block.lower()
            ai_used = False

            # One detector call per block serves both hierarchy and doc-type lookup.
            ai_guess: Dict[str, Any] = {}
            if fixed_def is None or auto_doc_type:
                detected = self.ai_detector.detect(block)
                if isinstance(detected, dict):
                    ai_guess = detected

            hdef, hname = fixed_def, fixed_name
            if hdef is None:
                hdef, hname, ai_used = self._resolve_hierarchy(
                    block, block_lower, filename_lower, hier_names,
                    ai_guess.get('hierarchy_name'))

            dtype = doc_type
            if auto_doc_type:
                if ai_guess.get('document_type'):
                    dtype = ai_guess['document_type']
                    ai_used = True
                else:
                    dtype = self._guess_doc_type(block_lower, hdef)

            exp_l1 = self._find_label('domain', block_lower)
            exp_l2 = self._find_label('section', block_lower)
            l2_by_l1 = hdef['levels']['level2']['values']

            # Provisional levels
            l1 = self._classify_level1(block_lower, hdef)
            l2 = self._classify_level2(block_lower, hdef, l1)
            # Override with explicit labels when present
            if exp_l1:
                l1 = self._best_match(exp_l1, hdef['levels']['level1']['values'])
                sticky_l1 = l1
            if exp_l2:
                l2 = self._best_match(exp_l2, l2_by_l1.get(l1, []))
                sticky_l2 = l2
            # Apply sticky labels when no explicit labels in this block
            if not exp_l1 and sticky_l1:
                l1 = sticky_l1
            if not exp_l2 and sticky_l2 and l2_by_l1.get(l1):
                l2 = sticky_l2

            provisional.append({
                'text': block,
                'hdef': hdef,
                'hname': hname,
                'dtype': dtype,
                'l1': l1,
                'l2': l2,
                'ai': ai_used
            })
        return provisional

    def _merge_provisional(self, provisional: List[Dict[str, Any]]):
        """Phase 2: merge adjacent blocks with the same labels within the size limit.

        Two neighbours are merged only when hierarchy name, level1 and level2
        agree and the combined text stays within ``chunk_size`` tokens.

        Returns:
            A list of ``(text, meta)`` pairs; ``meta`` is the first block's
            entry, with ``'ai'`` set if any merged block used AI detection.
        """
        merged = []
        if not provisional:
            return merged
        count_tokens = self.text_processor.count_tokens
        current_meta = provisional[0]
        current_text = current_meta['text']
        for item in provisional[1:]:
            same_labels = all(item[k] == current_meta[k] for k in ('hname', 'l1', 'l2'))
            if same_labels:
                candidate = current_text + "\n\n" + item['text']
                if count_tokens(candidate) <= self.chunk_size:
                    current_text = candidate
                    current_meta['ai'] = current_meta['ai'] or item['ai']
                    continue
            merged.append((current_text, current_meta))
            current_text, current_meta = item['text'], item
        merged.append((current_text, current_meta))
        return merged

    def _generate_metadata(self, file_path: str, hierarchy_def: Dict[str, Any],
                           doc_type: str, language: str, content: str) -> Dict[str, Any]:
        """Generate hierarchical metadata for chunk.

        Explicit "Domain:", "Section:" and "Topic:" lines are honoured first;
        otherwise levels come from keyword matching.  A level whose candidate
        values never occur in the text is then reported as 'Other'.
        """
        content_lower = content.lower()
        levels = hierarchy_def['levels']

        explicit_l1 = self._find_label('domain', content_lower)
        explicit_l2 = self._find_label('section', content_lower)
        explicit_l3 = self._find_label('topic', content_lower)

        if explicit_l1:
            level1 = self._best_match(explicit_l1, levels['level1']['values'])
        else:
            level1 = self._classify_level1(content_lower, hierarchy_def)
        if explicit_l2:
            level2 = self._best_match(explicit_l2, levels['level2']['values'].get(level1, []))
        else:
            level2 = self._classify_level2(content_lower, hierarchy_def, level1)
        if explicit_l3:
            level3 = self._best_match(explicit_l3, levels['level3']['values'].get(level2, []))
        else:
            level3 = self._classify_level3(content_lower, hierarchy_def, level1, level2)

        # Fallback mapping to 'Other' when nothing matches this hierarchy
        def _any_present(values: List[str]) -> bool:
            return any(v.lower() in content_lower for v in values)

        if not _any_present(levels['level1']['values']):
            level1 = 'Other'
        l2_opts = levels['level2']['values'].get(level1, [])
        if l2_opts and not _any_present(l2_opts):
            level2 = 'Other'
        l3_opts = levels['level3']['values'].get(level2, [])
        if l3_opts and not _any_present(l3_opts):
            level3 = 'Other'

        return {
            "source_name": os.path.basename(file_path),
            "lang": language,
            "level1": level1,
            "level2": level2,
            "level3": level3,
            "doc_type": doc_type,
            "chunk_size": len(content),
            "token_count": self.text_processor.count_tokens(content)
        }

    def _classify_level1(self, content: str, hierarchy_def: Dict[str, Any]) -> str:
        """Classify level1 domain by keyword match on lower-cased ``content``."""
        return self._pick_by_keyword(content, hierarchy_def['levels']['level1']['values'])

    def _classify_level2(self, content: str, hierarchy_def: Dict[str, Any], level1: str) -> str:
        """Classify level2 section among the sections listed for ``level1``."""
        return self._pick_by_keyword(
            content, hierarchy_def['levels']['level2']['values'].get(level1, []))

    def _classify_level3(self, content: str, hierarchy_def: Dict[str, Any],
                         level1: str, level2: str) -> str:
        """Classify level3 topic among the topics listed for ``level2``."""
        return self._pick_by_keyword(
            content, hierarchy_def['levels']['level3']['values'].get(level2, []))