"""Document ingestion pipeline for RAG: loaders, hierarchy metadata, chunking.

Loads PDF/TXT documents, classifies each chunk against YAML-defined
hierarchies (optionally with OpenAI assistance), and emits ``Chunk`` objects
carrying hierarchical metadata (level1/level2/level3, doc_type, language).
"""

import json
import logging as _logging
import os
import os as _os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

import PyPDF2
import yaml

from .utils import Chunk, TextProcessor, generate_id

_logger = _logging.getLogger("rag_ingest")

# OpenAI support is optional: enabled only when the package imports cleanly
# AND an API key is present in the environment.
_OPENAI_ENABLED = False
try:
    from openai import OpenAI as _OpenAI
    _OPENAI_ENABLED = bool(_os.getenv("OPENAI_API_KEY"))
except Exception:
    _OPENAI_ENABLED = False

# Try to import pypdf (newer, more robust PDF library); PyPDF2 is the fallback.
try:
    from pypdf import PdfReader as PyPdfReader
    PYPDF_AVAILABLE = True
except ImportError:
    PYPDF_AVAILABLE = False


class OpenAIMetadataDetector:
    """Use OpenAI to detect language, doc_type, and hierarchy levels for a chunk.

    Falls back to heuristics (i.e. returns ``{}``) when OpenAI is not available
    or the call fails for any reason.
    """

    def __init__(self, hierarchy_manager: 'HierarchyManager'):
        self.hierarchy_manager = hierarchy_manager
        # Client is constructed only when the package imported and a key exists.
        self.client = _OpenAI() if _OPENAI_ENABLED else None
        self.model = _os.getenv("OPENAI_MODEL", "gpt-4o-mini")

    def detect(self, text: str) -> Dict[str, Any]:
        """Return inferred metadata for *text*, or ``{}`` on any failure.

        Keys (when present): language, document_type, hierarchy_name, level1,
        level2, level3. ``hierarchy_name`` is forced to ``None`` unless it is
        one of the hierarchies known to the HierarchyManager.
        """
        if not self.client:
            return {}
        hierarchies = self.hierarchy_manager.list_hierarchies()
        # Only the first 2000 characters are sent to keep the call cheap.
        prompt = (
            "You are a metadata extractor. Given a text chunk, infer: language (en|ja), "
            "document_type (Policy|Manual|FAQ|Report|Note|Guideline), hierarchy_name, level1, level2, level3. "
            "CRITICAL: hierarchy_name MUST be exactly one of the following: "
            f"{hierarchies}. Do not invent other names. "
            "Respond as strict JSON with keys: language, document_type, hierarchy_name, level1, level2, level3. "
            "Be concise; if unsure, pick the closest.\n\nText:\n" + text[:2000]
        )
        try:
            _logger.debug("Calling OpenAI for chunk metadata detection (model=%s)", self.model)
            resp = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
            )
            content = resp.choices[0].message.content
            data = json.loads(content)
            # Enforce the allowed hierarchy set: never let the model invent one.
            if isinstance(data, dict) and data.get("hierarchy_name") not in hierarchies:
                data["hierarchy_name"] = None
            _logger.debug("OpenAI chunk metadata inferred: %s", data)
            return data if isinstance(data, dict) else {}
        except Exception:
            _logger.exception("OpenAI chunk metadata detection failed; using heuristics.")
            return {}


class DocumentLoader:
    """Load documents from various formats."""

    def __init__(self):
        self.text_processor = TextProcessor()

    def load_pdf(self, file_path: str) -> str:
        """Load text from a PDF file with fallback readers, preserving paragraphs.

        Tries pypdf first when available, then PyPDF2 (with ``strict=False``
        for mildly corrupted files).

        Raises:
            FileNotFoundError: if *file_path* does not exist.
            ValueError: if the path is not a file or the file is empty.
            Exception: if no reader could extract any text.
        """
        # Validate file exists and is readable.
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"PDF file not found: {file_path}")
        if not os.path.isfile(file_path):
            raise ValueError(f"Path is not a file: {file_path}")
        # Reject zero-byte uploads early.
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            raise ValueError(f"PDF file is empty: {file_path}")

        # Try pypdf first (more robust).
        if PYPDF_AVAILABLE:
            try:
                with open(file_path, 'rb') as file:
                    reader = PyPdfReader(file)
                    text = ""
                    for page in reader.pages:
                        page_text = page.extract_text()
                        if page_text:
                            text += page_text + "\n"
                    if text.strip():
                        return self.text_processor.clean_text_preserve_newlines(text)
            except Exception:
                # If pypdf fails, fall through to the PyPDF2 path below.
                pass

        # Fallback to PyPDF2.
        try:
            with open(file_path, 'rb') as file:
                try:
                    # strict=False tolerates some corrupted PDFs.
                    reader = PyPDF2.PdfReader(file, strict=False)
                except Exception:
                    # If strict=False doesn't work, retry with the normal reader.
                    file.seek(0)
                    reader = PyPDF2.PdfReader(file)
                text = ""
                for page in reader.pages:
                    try:
                        page_text = page.extract_text()
                        if page_text:
                            text += page_text + "\n"
                    except Exception:
                        # Skip pages that can't be extracted.
                        continue
                if not text.strip():
                    raise ValueError(f"No text could be extracted from PDF: {file_path}")
                return self.text_processor.clean_text_preserve_newlines(text)
        except Exception as e:
            error_msg = str(e)
            # A missing EOF marker almost always means a truncated upload.
            if "EOF marker not found" in error_msg or "EOF" in error_msg:
                raise Exception(
                    f"PDF file appears to be corrupted or incomplete: {file_path}. "
                    f"This may be due to an incomplete upload or corrupted file. "
                    f"Please try re-uploading the file or check if the PDF is valid."
                )
            else:
                raise Exception(f"Error loading PDF {file_path}: {error_msg}")

    def load_txt(self, file_path: str) -> str:
        """Load text from a TXT file (UTF-8), preserving paragraphs."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
            return self.text_processor.clean_text_preserve_newlines(text)
        except Exception as e:
            raise Exception(f"Error loading TXT {file_path}: {str(e)}")

    def load_document(self, file_path: str) -> str:
        """Load a document, dispatching on file extension (.pdf / .txt)."""
        ext = Path(file_path).suffix.lower()
        if ext == '.pdf':
            return self.load_pdf(file_path)
        elif ext == '.txt':
            return self.load_txt(file_path)
        else:
            raise ValueError(f"Unsupported file format: {ext}")


class HierarchyManager:
    """Manage hierarchical metadata definitions loaded from YAML files."""

    def __init__(self, hierarchies_dir: str = "hierarchies"):
        self.hierarchies_dir = Path(hierarchies_dir)
        # Maps hierarchy name (YAML file stem) -> parsed definition dict.
        self.hierarchies: Dict[str, Any] = {}
        self.load_hierarchies()

    def load_hierarchies(self) -> None:
        """Load every ``*.yaml`` definition under the hierarchies directory."""
        for yaml_file in self.hierarchies_dir.glob("*.yaml"):
            with open(yaml_file, 'r', encoding='utf-8') as file:
                hierarchy_name = yaml_file.stem
                self.hierarchies[hierarchy_name] = yaml.safe_load(file)

    def get_hierarchy(self, name: str) -> Dict[str, Any]:
        """Get a hierarchy definition by name.

        Raises:
            ValueError: if *name* is not a loaded hierarchy.
        """
        if name not in self.hierarchies:
            raise ValueError(f"Hierarchy '{name}' not found")
        return self.hierarchies[name]

    def list_hierarchies(self) -> List[str]:
        """List the names of all loaded hierarchies."""
        return list(self.hierarchies.keys())


class DocumentChunker:
    """Chunk documents and attach hierarchical metadata to each chunk."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        # chunk_size/chunk_overlap are in words for splitting and reused as a
        # token budget when merging adjacent same-label blocks.
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_processor = TextProcessor()
        self.hierarchy_manager = HierarchyManager()
        self.ai_detector = OpenAIMetadataDetector(self.hierarchy_manager)

    def chunk_document(self, file_path: str, hierarchy: Optional[str],
                       doc_type: Optional[str], language: Optional[str]) -> List[Chunk]:
        """Chunk document with hierarchical metadata per chunk.

        - Auto-detects hierarchy/doc_type/language when None or 'Auto'.
        - Assigns metadata per chunk to support multi-topic documents.
        """
        loader = DocumentLoader()
        content = loader.load_document(file_path)

        # Auto-detect language if needed: prefer OpenAI, fall back to a
        # kana/kanji codepoint scan.
        if not language or str(language).lower() == 'auto':
            ai_guess = self.ai_detector.detect(content)
            _logger.debug("Language auto-detect: ai_guess=%s",
                          ai_guess.get('language') if isinstance(ai_guess, dict) else None)
            if isinstance(ai_guess, dict) and ai_guess.get('language') in ('en', 'ja'):
                language = ai_guess.get('language')
            else:
                language = 'ja' if any('\u3040' <= ch <= '\u30ff' or '\u4e00' <= ch <= '\u9faf'
                                       for ch in content) else 'en'

        hier_names = self.hierarchy_manager.list_hierarchies()
        # If hierarchy is auto, pick the best per chunk later; else load the chosen one.
        fixed_hierarchy_def = None
        if hierarchy and hierarchy.lower() != 'auto':
            fixed_hierarchy_def = self.hierarchy_manager.get_hierarchy(hierarchy)

        processed_blocks = self._split_blocks(content)

        # Phase 1: provisional labels for each block.
        provisional: List[Dict[str, Any]] = []
        # Sticky explicit labels propagate until overridden by new explicit labels.
        sticky_l1: Optional[str] = None
        sticky_l2: Optional[str] = None
        for block in processed_blocks:
            ai_used = False
            # Call the (potentially expensive) AI detector at most once per
            # block and reuse the answer for both hierarchy and doc_type.
            need_ai = (fixed_hierarchy_def is None
                       or not doc_type or str(doc_type).lower() == 'auto')
            ai_guess = self.ai_detector.detect(block) if need_ai else {}

            ph_hdef = fixed_hierarchy_def
            ph_hname = hierarchy if hierarchy and hierarchy.lower() != 'auto' else None
            if ph_hdef is None:
                guess_name = ai_guess.get('hierarchy_name') if isinstance(ai_guess, dict) else None
                # 0) Explicit "Hierarchy: <name>" label in the block wins.
                mH = re.search(r"^\s*hierarchy\s*:\s*(.+)$", block,
                               flags=re.IGNORECASE | re.MULTILINE)
                if mH:
                    explicit_h = mH.group(1).strip().lower()
                    for name in hier_names:
                        if name.lower() in explicit_h or explicit_h in name.lower():
                            ph_hdef = self.hierarchy_manager.get_hierarchy(name)
                            ph_hname = name
                # 1) A known hierarchy guessed by OpenAI.
                if ph_hdef is None and guess_name in hier_names:
                    ph_hdef = self.hierarchy_manager.get_hierarchy(guess_name)
                    ph_hname = guess_name
                    ai_used = True
                # 2) Weighted keyword scoring across all hierarchies.
                if ph_hdef is None:
                    ph_hname, ph_hdef = self._score_hierarchies(block, file_path, hier_names)

            ph_dtype = doc_type
            if not doc_type or str(doc_type).lower() == 'auto':
                if isinstance(ai_guess, dict) and ai_guess.get('document_type'):
                    ph_dtype = ai_guess['document_type']
                    ai_used = True
                else:
                    ph_dtype = self._heuristic_doc_type(block, ph_hdef)

            content_lower = block.lower()
            # Detect explicit "Domain:"/"Section:" labels in this block.
            exp_l1 = exp_l2 = None
            m1 = re.search(r"^\s*domain\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
            m2 = re.search(r"^\s*section\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
            if m1:
                exp_l1 = m1.group(1).strip()
            if m2:
                exp_l2 = m2.group(1).strip()
            # Provisional levels from keyword classification.
            ph_l1 = self._classify_level1(content_lower, ph_hdef)
            ph_l2 = self._classify_level2(content_lower, ph_hdef, ph_l1)
            # Override with explicit labels when present; they become sticky.
            if exp_l1:
                ph_l1 = self._best_match(exp_l1, ph_hdef['levels']['level1']['values'])
                sticky_l1 = ph_l1
            if exp_l2:
                l2_candidates = ph_hdef['levels']['level2']['values'].get(ph_l1, [])
                ph_l2 = self._best_match(exp_l2, l2_candidates)
                sticky_l2 = ph_l2
            # Apply sticky labels when no explicit labels appear in this block.
            if not exp_l1 and sticky_l1:
                ph_l1 = sticky_l1
            if not exp_l2 and sticky_l2 and ph_hdef['levels']['level2']['values'].get(ph_l1):
                ph_l2 = sticky_l2

            provisional.append({
                'text': block,
                'hdef': ph_hdef,
                'hname': ph_hname,
                'dtype': ph_dtype,
                'l1': ph_l1,
                'l2': ph_l2,
                'ai': ai_used,
            })

        # Phase 2: merge adjacent blocks with the same labels within size limit.
        merged_texts: List[str] = []
        merged_meta: List[Dict[str, Any]] = []
        if provisional:
            current_text = provisional[0]['text']
            current_meta = provisional[0]
            for p in provisional[1:]:
                same = (p['hname'] == current_meta['hname']
                        and p['l1'] == current_meta['l1']
                        and p['l2'] == current_meta['l2'])
                candidate = current_text + "\n\n" + p['text'] if same else current_text
                # Merge only while the appended text stays within chunk_size tokens.
                if same and self.text_processor.count_tokens(candidate) <= \
                        self.text_processor.count_tokens(current_text) + self.chunk_size:
                    current_text = candidate
                    current_meta['ai'] = current_meta['ai'] or p['ai']
                else:
                    merged_texts.append(current_text)
                    merged_meta.append(current_meta)
                    current_text = p['text']
                    current_meta = p
            merged_texts.append(current_text)
            merged_meta.append(current_meta)

        # Phase 3: finalize chunks.
        chunks: List[Chunk] = []
        for text_block, meta in zip(merged_texts, merged_meta):
            final_md = self._generate_metadata(
                file_path=file_path,
                hierarchy_def=meta['hdef'],
                doc_type=meta['dtype'],
                language=language,
                content=text_block
            )
            if meta['hname']:
                final_md['hierarchy'] = meta['hname']
            final_md['ai_detected'] = meta['ai']
            chunks.append(Chunk(
                doc_id=generate_id(),
                chunk_id=generate_id(),
                content=text_block,
                metadata=final_md
            ))
        return chunks

    def _split_blocks(self, content: str) -> List[str]:
        """Split on blank lines, then window oversized blocks with overlap.

        Blocks longer than ``chunk_size`` words are re-split into overlapping
        windows of ``chunk_size`` words stepping ``chunk_size - chunk_overlap``.
        """
        raw_blocks = [b.strip() for b in content.split('\n\n') if b.strip()]
        if not raw_blocks:
            raw_blocks = [content]
        processed: List[str] = []
        for block in raw_blocks:
            words = block.split()
            if len(words) <= self.chunk_size:
                processed.append(block)
            else:
                step = max(1, self.chunk_size - self.chunk_overlap)
                for i in range(0, len(words), step):
                    processed.append(' '.join(words[i:i + self.chunk_size]))
        return processed

    def _score_hierarchies(self, block: str, file_path: str,
                           hier_names: List[str]) -> 'tuple[str, Dict[str, Any]]':
        """Pick the hierarchy whose values best match *block*.

        Weighted keyword scoring: level1/level2 hits count 2, level3 and
        doc_type hits count 1, and a hierarchy name appearing in the filename
        counts 3. Falls back to the first hierarchy when nothing scores.
        """
        best_score = -1
        best_name = None
        best_def = None
        block_lower = block.lower()
        filename_lower = os.path.basename(file_path).lower()
        for name in hier_names:
            hdef = self.hierarchy_manager.get_hierarchy(name)
            score = 0
            for v in hdef['levels']['level1']['values']:
                if v.lower() in block_lower:
                    score += 2
            for l2_list in hdef['levels']['level2']['values'].values():
                for v in l2_list:
                    if v.lower() in block_lower:
                        score += 2
            for l3_list in hdef['levels']['level3']['values'].values():
                for v in l3_list:
                    if v.lower() in block_lower:
                        score += 1
            for dt in hdef.get('doc_types', []):
                if dt.lower() in block_lower:
                    score += 1
            if name.lower() in filename_lower:
                score += 3
            if score > best_score:
                best_score = score
                best_name = name
                best_def = hdef
        if best_def is None:
            best_def = self.hierarchy_manager.get_hierarchy(hier_names[0])
        return (best_name or hier_names[0]), best_def

    def _heuristic_doc_type(self, block: str, hdef: Dict[str, Any]) -> str:
        """Keyword-score the hierarchy's doc_type candidates against *block*."""
        dt_candidates = hdef.get('doc_types',
                                 ["Policy", "Manual", "FAQ", "Report", "Note", "Guideline"])
        block_lower = block.lower()
        best_dt = dt_candidates[0]
        best_score = -1
        for dt in dt_candidates:
            s = 0
            if dt.lower() in block_lower:
                s += 1
            if dt.lower() == 'faq' and ('faq' in block_lower or 'q:' in block_lower):
                s += 1
            if dt.lower() == 'report' and ('report' in block_lower or 'summary' in block_lower):
                s += 1
            if s > best_score:
                best_score = s
                best_dt = dt
        return best_dt

    @staticmethod
    def _best_match(name: str, candidates: List[str]) -> str:
        """Return the first candidate equal to, containing, or contained in
        *name* (case-insensitive); fall back to the first candidate or "General"."""
        name_l = name.lower()
        for c in candidates:
            cl = c.lower()
            if cl == name_l or name_l in cl or cl in name_l:
                return c
        return candidates[0] if candidates else "General"

    def _generate_metadata(self, file_path: str, hierarchy_def: Dict[str, Any],
                           doc_type: str, language: str, content: str) -> Dict[str, Any]:
        """Generate hierarchical metadata for a chunk.

        Explicit "Domain:"/"Section:"/"Topic:" labels override the keyword
        classifiers; each level falls back to 'Other' when none of that
        level's values appear in the content.
        """
        content_lower = content.lower()

        # 1) Honor explicit labels like "Domain:", "Section:", "Topic:".
        explicit_l1 = explicit_l2 = explicit_l3 = None
        m1 = re.search(r"^\s*domain\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
        m2 = re.search(r"^\s*section\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
        m3 = re.search(r"^\s*topic\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
        if m1:
            explicit_l1 = m1.group(1).strip()
        if m2:
            explicit_l2 = m2.group(1).strip()
        if m3:
            explicit_l3 = m3.group(1).strip()

        if explicit_l1:
            level1 = self._best_match(explicit_l1, hierarchy_def['levels']['level1']['values'])
        else:
            level1 = self._classify_level1(content_lower, hierarchy_def)
        if explicit_l2:
            level2_candidates = hierarchy_def['levels']['level2']['values'].get(level1, [])
            level2 = self._best_match(explicit_l2, level2_candidates)
        else:
            level2 = self._classify_level2(content_lower, hierarchy_def, level1)
        if explicit_l3:
            level3_candidates = hierarchy_def['levels']['level3']['values'].get(level2, [])
            level3 = self._best_match(explicit_l3, level3_candidates)
        else:
            level3 = self._classify_level3(content_lower, hierarchy_def, level1, level2)

        # 2) Fallback mapping to 'Other' when nothing matches this hierarchy.
        def _any_present(values: List[str]) -> bool:
            return any(v.lower() in content_lower for v in values)

        if not _any_present(hierarchy_def['levels']['level1']['values']):
            level1 = 'Other'
        l2_opts = hierarchy_def['levels']['level2']['values'].get(level1, [])
        if l2_opts and not _any_present(l2_opts):
            level2 = 'Other'
        l3_opts = hierarchy_def['levels']['level3']['values'].get(level2, [])
        if l3_opts and not _any_present(l3_opts):
            level3 = 'Other'

        return {
            "source_name": os.path.basename(file_path),
            "lang": language,
            "level1": level1,
            "level2": level2,
            "level3": level3,
            "doc_type": doc_type,
            "chunk_size": len(content),
            "token_count": self.text_processor.count_tokens(content)
        }

    def _classify_level1(self, content: str, hierarchy_def: Dict[str, Any]) -> str:
        """Classify level1 domain by simple substring keyword matching."""
        level1_options = hierarchy_def['levels']['level1']['values']
        keyword_scores = {}
        for domain in level1_options:
            score = 0
            if domain.lower() in content:
                score += 1
            keyword_scores[domain] = score
        # max over insertion order => first option wins ties / all-zero scores.
        return max(keyword_scores.items(), key=lambda x: x[1])[0] if keyword_scores else level1_options[0]

    def _classify_level2(self, content: str, hierarchy_def: Dict[str, Any], level1: str) -> str:
        """Classify level2 section within the chosen level1 domain."""
        level2_options = hierarchy_def['levels']['level2']['values'].get(level1, [])
        if not level2_options:
            return "General"
        keyword_scores = {}
        for section in level2_options:
            score = 0
            if section.lower() in content:
                score += 1
            keyword_scores[section] = score
        return max(keyword_scores.items(), key=lambda x: x[1])[0] if keyword_scores else level2_options[0]

    def _classify_level3(self, content: str, hierarchy_def: Dict[str, Any],
                         level1: str, level2: str) -> str:
        """Classify level3 topic within the chosen level2 section."""
        level3_options = hierarchy_def['levels']['level3']['values'].get(level2, [])
        if not level3_options:
            return "General"
        keyword_scores = {}
        for topic in level3_options:
            score = 0
            if topic.lower() in content:
                score += 1
            keyword_scores[topic] = score
        return max(keyword_scores.items(), key=lambda x: x[1])[0] if keyword_scores else level3_options[0]