# NOTE(review): the three lines below were stray VCS/web-page residue
# ("soft.engineer" / "add setting tab" / commit 58259d1) pasted above the
# imports; kept here as a comment so the module parses.
import os
import yaml
from typing import List, Dict, Any, Optional
from pathlib import Path
import PyPDF2
from .utils import Chunk, TextProcessor, generate_id
import logging as _logging
_logger = _logging.getLogger("rag_ingest")
import os as _os
# Optional OpenAI support: enabled only when the SDK imports cleanly AND an
# API key is present in the environment. When disabled, callers fall back to
# the keyword heuristics implemented below.
_OPENAI_ENABLED = False
try:
    from openai import OpenAI as _OpenAI
    # SDK import succeeded; still require a key before enabling API calls.
    _OPENAI_ENABLED = True if _os.getenv("OPENAI_API_KEY") else False
except Exception:
    # openai not installed (or import failed for any reason) — heuristics only.
    _OPENAI_ENABLED = False
class OpenAIMetadataDetector:
    """Use OpenAI to detect language, doc_type, and hierarchy levels for a chunk.

    Falls back to heuristics when OpenAI is not available: `detect` returns an
    empty dict whenever the client is missing or the API call/parse fails, and
    callers treat `{}` as "no AI guess".
    """

    def __init__(self, hierarchy_manager: 'HierarchyManager'):
        self.hierarchy_manager = hierarchy_manager
        # Client is only constructed when the SDK imported and a key was found
        # (see the module-level _OPENAI_ENABLED probe).
        self.client = _OpenAI() if _OPENAI_ENABLED else None
        self.model = _os.getenv("OPENAI_MODEL", "gpt-4o-mini")

    def detect(self, text: str) -> Dict[str, Any]:
        """Infer metadata for `text` via OpenAI.

        Returns a dict with keys language/document_type/hierarchy_name/
        level1/level2/level3 on success, or {} when OpenAI is disabled or
        the call/parse fails. `hierarchy_name` is forced to None when it is
        not one of the known hierarchies.
        """
        if not self.client:
            return {}
        hierarchies = self.hierarchy_manager.list_hierarchies()
        prompt = (
            "You are a metadata extractor. Given a text chunk, infer: language (en|ja), "
            "document_type (Policy|Manual|FAQ|Report|Note|Guideline), hierarchy_name, level1, level2, level3. "
            "CRITICAL: hierarchy_name MUST be exactly one of the following: "
            f"{hierarchies}. Do not invent other names. "
            "Respond as strict JSON with keys: language, document_type, hierarchy_name, level1, level2, level3. "
            "Be concise; if unsure, pick the closest.\n\nText:\n" + text[:2000]
        )
        try:
            _logger.debug("Calling OpenAI for chunk metadata detection (model=%s)", self.model)
            resp = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
            )
            # content may legitimately be None; normalize before parsing.
            content = (resp.choices[0].message.content or "").strip()
            # Models frequently wrap JSON in markdown fences (```json ... ```)
            # even when asked for strict JSON; strip them so json.loads does
            # not reject an otherwise-valid answer.
            if content.startswith("```"):
                content = content.strip("`").strip()
                if content[:4].lower() == "json":
                    content = content[4:]
            import json as _json
            data = _json.loads(content)
            # Enforce allowed hierarchy set
            if isinstance(data, dict) and data.get("hierarchy_name") not in hierarchies:
                data["hierarchy_name"] = None
            _logger.debug("OpenAI chunk metadata inferred: %s", data)
            return data if isinstance(data, dict) else {}
        except Exception:
            _logger.exception("OpenAI chunk metadata detection failed; using heuristics.")
            return {}
# Try to import pypdf (newer, more robust PDF library). PyPDF2 (imported at
# the top of the file) remains the fallback reader when pypdf is absent.
try:
    from pypdf import PdfReader as PyPdfReader
    PYPDF_AVAILABLE = True
except ImportError:
    PYPDF_AVAILABLE = False
class DocumentLoader:
    """Load documents from various formats (currently PDF and TXT)."""

    def __init__(self):
        self.text_processor = TextProcessor()

    def _extract_with_pypdf(self, file_path: str) -> Optional[str]:
        """Best-effort extraction via pypdf; returns cleaned text or None on failure.

        Any failure (corrupt file, extraction error, empty text) yields None so
        the caller can fall back to PyPDF2.
        """
        try:
            with open(file_path, 'rb') as file:
                reader = PyPdfReader(file)
                text = ""
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            if text.strip():
                return self.text_processor.clean_text_preserve_newlines(text)
        except Exception:
            # pypdf failed; caller will retry with PyPDF2.
            pass
        return None

    def load_pdf(self, file_path: str) -> str:
        """Load text from PDF file with fallback readers, preserving paragraphs.

        Tries pypdf first (more robust), then PyPDF2 (with strict=False to
        tolerate mildly corrupted files).

        Raises:
            FileNotFoundError: path does not exist.
            ValueError: path is not a regular file, or the file is empty.
            Exception: extraction failed (message distinguishes EOF/corruption).
        """
        # Validate file exists and is readable
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"PDF file not found: {file_path}")
        if not os.path.isfile(file_path):
            raise ValueError(f"Path is not a file: {file_path}")
        # Check file size up front so an empty upload fails fast.
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            raise ValueError(f"PDF file is empty: {file_path}")
        # Try pypdf first (more robust)
        if PYPDF_AVAILABLE:
            text = self._extract_with_pypdf(file_path)
            if text is not None:
                return text
        # Fallback to PyPDF2
        try:
            with open(file_path, 'rb') as file:
                # Try strict=False first for corrupted PDFs.
                try:
                    reader = PyPDF2.PdfReader(file, strict=False)
                except Exception:
                    # If strict=False doesn't work, try the normal reader.
                    file.seek(0)
                    reader = PyPDF2.PdfReader(file)
                text = ""
                for page in reader.pages:
                    try:
                        page_text = page.extract_text()
                    except Exception:
                        # Skip pages that can't be extracted.
                        continue
                    if page_text:
                        text += page_text + "\n"
            if not text.strip():
                raise ValueError(f"No text could be extracted from PDF: {file_path}")
            return self.text_processor.clean_text_preserve_newlines(text)
        except Exception as e:
            error_msg = str(e)
            # "EOF marker not found" is PyPDF2's signature of a truncated file;
            # matching on "EOF" covers it and similar variants.
            if "EOF" in error_msg:
                raise Exception(
                    f"PDF file appears to be corrupted or incomplete: {file_path}. "
                    f"This may be due to an incomplete upload or corrupted file. "
                    f"Please try re-uploading the file or check if the PDF is valid."
                ) from e
            raise Exception(f"Error loading PDF {file_path}: {error_msg}") from e

    def load_txt(self, file_path: str) -> str:
        """Load text from a UTF-8 TXT file preserving paragraphs."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
            return self.text_processor.clean_text_preserve_newlines(text)
        except Exception as e:
            raise Exception(f"Error loading TXT {file_path}: {str(e)}") from e

    def load_document(self, file_path: str) -> str:
        """Load a document, dispatching on its (case-insensitive) extension.

        Raises:
            ValueError: extension is neither .pdf nor .txt.
        """
        ext = Path(file_path).suffix.lower()
        if ext == '.pdf':
            return self.load_pdf(file_path)
        elif ext == '.txt':
            return self.load_txt(file_path)
        else:
            raise ValueError(f"Unsupported file format: {ext}")
class HierarchyManager:
    """Manage hierarchical metadata definitions loaded from YAML files."""

    def __init__(self, hierarchies_dir: str = "hierarchies"):
        self.hierarchies_dir = Path(hierarchies_dir)
        # Maps hierarchy name (file stem) -> parsed YAML definition.
        self.hierarchies: Dict[str, Any] = {}
        self.load_hierarchies()

    def load_hierarchies(self):
        """Load all hierarchy definitions from the hierarchies directory.

        Accepts both common YAML extensions (.yaml and .yml). Files are
        processed in sorted order so the mapping (and therefore
        ``list_hierarchies()[0]``, used as a fallback by callers) is
        deterministic rather than filesystem-order dependent.
        """
        yaml_files = sorted(self.hierarchies_dir.glob("*.yaml")) + sorted(self.hierarchies_dir.glob("*.yml"))
        for yaml_file in yaml_files:
            with open(yaml_file, 'r', encoding='utf-8') as file:
                self.hierarchies[yaml_file.stem] = yaml.safe_load(file)

    def get_hierarchy(self, name: str) -> Dict[str, Any]:
        """Get hierarchy definition by name.

        Raises:
            ValueError: no hierarchy with that name was loaded.
        """
        if name not in self.hierarchies:
            raise ValueError(f"Hierarchy '{name}' not found")
        return self.hierarchies[name]

    def list_hierarchies(self) -> List[str]:
        """List available hierarchy names."""
        return list(self.hierarchies.keys())
class DocumentChunker:
    """Chunk documents with hierarchical metadata.

    Pipeline (see chunk_document):
      1. split text into paragraph blocks and size-bounded word windows,
      2. assign provisional hierarchy / doc-type / level labels per block,
      3. merge adjacent blocks that share labels,
      4. emit final Chunk objects with generated metadata.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        # NOTE(review): chunk_size/chunk_overlap are counted in
        # whitespace-separated WORDS in the splitting phase but compared
        # against text_processor.count_tokens in the merge phase — mixed
        # units; confirm this is intended.
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_processor = TextProcessor()
        self.hierarchy_manager = HierarchyManager()
        self.ai_detector = OpenAIMetadataDetector(self.hierarchy_manager)

    def chunk_document(self, file_path: str, hierarchy: Optional[str],
                       doc_type: Optional[str], language: Optional[str]) -> List[Chunk]:
        """Chunk document with hierarchical metadata per chunk.
        - Auto-detects hierarchy/doc_type/language when None or 'Auto'.
        - Assigns metadata per chunk to support multi-topic documents.
        """
        loader = DocumentLoader()
        content = loader.load_document(file_path)
        # Auto-detect language if needed
        if not language or str(language).lower() == 'auto':
            # Prefer OpenAI if available
            ai_guess = self.ai_detector.detect(content)
            _logger.debug("Language auto-detect: ai_guess=%s", ai_guess.get('language') if isinstance(ai_guess, dict) else None)
            # Fall back to a script heuristic: any Hiragana/Katakana (U+3040-
            # U+30FF) or CJK ideograph (U+4E00-U+9FAF) character => 'ja'.
            language = ai_guess.get('language') if isinstance(ai_guess, dict) and ai_guess.get('language') in ('en','ja') else (
                'ja' if any('\u3040' <= ch <= '\u30ff' or '\u4e00' <= ch <= '\u9faf' for ch in content) else 'en'
            )
        # Prepare list of hierarchy names and definitions
        hier_names = self.hierarchy_manager.list_hierarchies()
        # If hierarchy is auto, we'll pick best per-chunk later; else load the chosen one
        fixed_hierarchy_def = None
        if hierarchy and hierarchy.lower() != 'auto':
            fixed_hierarchy_def = self.hierarchy_manager.get_hierarchy(hierarchy)
        # Simple structural chunking: split on double newlines first, then fall back to token windows
        raw_blocks = [b.strip() for b in content.split('\n\n') if b.strip()]
        if not raw_blocks:
            raw_blocks = [content]
        # Further split large blocks into overlapping windows
        processed_blocks: List[str] = []
        for block in raw_blocks:
            words = block.split()
            if len(words) <= self.chunk_size:
                processed_blocks.append(block)
            else:
                # Overlapping word windows: each window starts
                # (chunk_size - chunk_overlap) words after the previous one.
                step = max(1, self.chunk_size - self.chunk_overlap)
                for i in range(0, len(words), step):
                    processed_blocks.append(' '.join(words[i:i + self.chunk_size]))
        # Phase 1: provisional labels for each block
        provisional: List[Dict[str, Any]] = []
        # Sticky explicit labels propagate until overridden by new explicit labels
        sticky_l1: Optional[str] = None
        sticky_l2: Optional[str] = None
        for block in processed_blocks:
            ai_used = False
            ph_hdef = fixed_hierarchy_def
            ph_hname = hierarchy if hierarchy and hierarchy.lower() != 'auto' else None
            if ph_hdef is None:
                # Per-block AI guess; may be {} when OpenAI is unavailable.
                ai_guess = self.ai_detector.detect(block)
                guess_name = ai_guess.get('hierarchy_name') if isinstance(ai_guess, dict) else None
                # 0) Explicit label "Hierarchy: <name>"
                import re
                mH = re.search(r"^\s*hierarchy\s*:\s*(.+)$", block, flags=re.IGNORECASE | re.MULTILINE)
                if mH:
                    explicit_h = mH.group(1).strip().lower()
                    # Substring match in either direction. NOTE(review): no
                    # break, so when several names match the LAST one wins.
                    for name in hier_names:
                        if name.lower() in explicit_h or explicit_h in name.lower():
                            ph_hdef = self.hierarchy_manager.get_hierarchy(name)
                            ph_hname = name
                            ai_used = ai_used or False  # no-op: explicit labels are not AI-derived
                # 1) If OpenAI guessed a known hierarchy
                if ph_hdef is None and guess_name in hier_names:
                    ph_hdef = self.hierarchy_manager.get_hierarchy(guess_name)
                    ph_hname = guess_name
                    ai_used = True
                # 2) Weighted keyword scoring across all hierarchies (level1/2/3 + doc_types + filename hints)
                if ph_hdef is None:
                    best_score = -1
                    best_name = None
                    best_def = None
                    block_lower = block.lower()
                    filename_lower = os.path.basename(file_path).lower()
                    for name in hier_names:
                        hdef = self.hierarchy_manager.get_hierarchy(name)
                        score = 0
                        # level1 value mentions weigh 2
                        for v in hdef['levels']['level1']['values']:
                            if v.lower() in block_lower:
                                score += 2
                        # level2 value mentions weigh 2
                        for l2_list in hdef['levels']['level2']['values'].values():
                            for v in l2_list:
                                if v.lower() in block_lower:
                                    score += 2
                        # level3 value mentions weigh 1
                        for l3_list in hdef['levels']['level3']['values'].values():
                            for v in l3_list:
                                if v.lower() in block_lower:
                                    score += 1
                        # doc_types mentions weigh 1
                        for dt in hdef.get('doc_types', []):
                            if dt.lower() in block_lower:
                                score += 1
                        # filename hint weighs 3
                        if name.lower() in filename_lower:
                            score += 3
                        if score > best_score:
                            best_score = score
                            best_name = name
                            best_def = hdef
                    # NOTE(review): hier_names[0] raises IndexError when no
                    # hierarchies are loaded — confirm directory is never empty.
                    ph_hdef = best_def if best_def is not None else self.hierarchy_manager.get_hierarchy(hier_names[0])
                    ph_hname = best_name or hier_names[0]
            ph_dtype = doc_type
            if not doc_type or str(doc_type).lower() == 'auto':
                # Second AI call per block (first was for hierarchy above).
                ai_guess = self.ai_detector.detect(block)
                if isinstance(ai_guess, dict) and ai_guess.get('document_type'):
                    ph_dtype = ai_guess['document_type']
                    ai_used = True
                else:
                    # Keyword fallback over the hierarchy's doc types (or a
                    # default list); best_dt defaults to the first candidate.
                    dt_candidates = ph_hdef.get('doc_types', ["Policy", "Manual", "FAQ", "Report", "Note", "Guideline"])
                    block_lower = block.lower()
                    best_dt = dt_candidates[0]
                    best_score = -1
                    for dt in dt_candidates:
                        s = 0
                        if dt.lower() in block_lower:
                            s += 1
                        if dt.lower() == 'faq' and ('faq' in block_lower or 'q:' in block_lower):
                            s += 1
                        if dt.lower() == 'report' and ('report' in block_lower or 'summary' in block_lower):
                            s += 1
                        if s > best_score:
                            best_score = s
                            best_dt = dt
                    ph_dtype = best_dt
            content_lower = block.lower()
            # Detect explicit labels in this block ("Domain:"/"Section:" lines)
            import re
            exp_l1 = exp_l2 = None
            m1 = re.search(r"^\s*domain\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
            m2 = re.search(r"^\s*section\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
            if m1:
                exp_l1 = m1.group(1).strip()
            if m2:
                exp_l2 = m2.group(1).strip()
            # Provisional levels
            ph_l1 = self._classify_level1(content_lower, ph_hdef)
            ph_l2 = self._classify_level2(content_lower, ph_hdef, ph_l1)
            # Override with explicit labels when present
            def _best_match(name: str, candidates: list[str]) -> str:
                # Case-insensitive exact/substring match; first candidate as fallback.
                name_l = name.lower()
                for c in candidates:
                    cl = c.lower()
                    if cl == name_l or name_l in cl or cl in name_l:
                        return c
                return candidates[0] if candidates else "General"
            if exp_l1:
                ph_l1 = _best_match(exp_l1, ph_hdef['levels']['level1']['values'])
                sticky_l1 = ph_l1
            if exp_l2:
                l2_candidates = ph_hdef['levels']['level2']['values'].get(ph_l1, [])
                ph_l2 = _best_match(exp_l2, l2_candidates)
                sticky_l2 = ph_l2
            # Apply sticky labels when no explicit labels in this block
            if not exp_l1 and sticky_l1:
                ph_l1 = sticky_l1
            if not exp_l2 and sticky_l2 and ph_hdef['levels']['level2']['values'].get(ph_l1):
                ph_l2 = sticky_l2
            provisional.append({
                'text': block,
                'hdef': ph_hdef,
                'hname': ph_hname,
                'dtype': ph_dtype,
                'l1': ph_l1,
                'l2': ph_l2,
                'ai': ai_used
            })
        # Phase 2: merge adjacent blocks with same labels within size limit
        merged_texts: List[str] = []
        merged_meta: List[Dict[str, Any]] = []
        if provisional:
            current_text = provisional[0]['text']
            # current_meta aliases the provisional dict and is mutated in place
            # below ('ai' flag OR-ed across merged blocks).
            current_meta = provisional[0]
            for p in provisional[1:]:
                same = (p['hname'] == current_meta['hname'] and p['l1'] == current_meta['l1'] and p['l2'] == current_meta['l2'])
                candidate = current_text + "\n\n" + p['text'] if same else current_text
                # Merge only while the combined text stays within chunk_size
                # tokens of the current text's token count.
                if same and self.text_processor.count_tokens(candidate) <= self.text_processor.count_tokens(current_text) + self.chunk_size:
                    current_text = candidate
                    current_meta['ai'] = current_meta['ai'] or p['ai']
                else:
                    merged_texts.append(current_text)
                    merged_meta.append(current_meta)
                    current_text = p['text']
                    current_meta = p
            # Flush the trailing run.
            merged_texts.append(current_text)
            merged_meta.append(current_meta)
        # Phase 3: finalize chunks
        chunks: List[Chunk] = []
        for text_block, meta in zip(merged_texts, merged_meta):
            final_md = self._generate_metadata(
                file_path=file_path,
                hierarchy_def=meta['hdef'],
                doc_type=meta['dtype'],
                language=language,
                content=text_block
            )
            if meta['hname']:
                final_md['hierarchy'] = meta['hname']
            final_md['ai_detected'] = meta['ai']
            chunks.append(Chunk(
                doc_id=generate_id(),
                chunk_id=generate_id(),
                content=text_block,
                metadata=final_md
            ))
        return chunks

    def _generate_metadata(self, file_path: str, hierarchy_def: Dict[str, Any],
                           doc_type: str, language: str, content: str) -> Dict[str, Any]:
        """Generate hierarchical metadata for chunk.

        Levels are chosen by: explicit "Domain:/Section:/Topic:" labels first,
        keyword classification otherwise, then an 'Other' fallback when no
        known value for a level appears in the text at all.
        """
        # Simple rule-based classification with explicit label override
        content_lower = content.lower()
        # 1) Try to honor explicit labels like "Domain:", "Section:", "Topic:"
        import re
        explicit_l1 = explicit_l2 = explicit_l3 = None
        m1 = re.search(r"^\s*domain\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
        m2 = re.search(r"^\s*section\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
        m3 = re.search(r"^\s*topic\s*:\s*(.+)$", content_lower, flags=re.MULTILINE)
        if m1:
            explicit_l1 = m1.group(1).strip()
        if m2:
            explicit_l2 = m2.group(1).strip()
        if m3:
            explicit_l3 = m3.group(1).strip()
        def _best_match(name: str, candidates: list[str]) -> str:
            # Case-insensitive exact/substring match; first candidate as fallback.
            name_l = name.lower()
            # exact contains
            for c in candidates:
                if c.lower() == name_l or name_l in c.lower() or c.lower() in name_l:
                    return c
            # fallback: first candidate
            return candidates[0] if candidates else "General"
        if explicit_l1:
            level1 = _best_match(explicit_l1, hierarchy_def['levels']['level1']['values'])
        else:
            level1 = self._classify_level1(content_lower, hierarchy_def)
        if explicit_l2:
            level2_candidates = hierarchy_def['levels']['level2']['values'].get(level1, [])
            level2 = _best_match(explicit_l2, level2_candidates)
        else:
            level2 = self._classify_level2(content_lower, hierarchy_def, level1)
        if explicit_l3:
            level3_candidates = hierarchy_def['levels']['level3']['values'].get(level2, [])
            level3 = _best_match(explicit_l3, level3_candidates)
        else:
            level3 = self._classify_level3(content_lower, hierarchy_def, level1, level2)
        # Fallback mapping to 'Other' when nothing matches this hierarchy.
        # NOTE(review): this fallback runs even when an explicit label was
        # honored above, so it can overwrite an explicit choice — confirm.
        def _any_present(values: list[str]) -> bool:
            return any(v.lower() in content_lower for v in values)
        # If no level1 value appears, set to 'Other'
        if not _any_present(hierarchy_def['levels']['level1']['values']):
            level1 = 'Other'
        # If level2 options for chosen level1 exist but none appear, set to 'Other'
        l2_opts = hierarchy_def['levels']['level2']['values'].get(level1, [])
        if l2_opts and not _any_present(l2_opts):
            level2 = 'Other'
        # If level3 options for chosen level2 exist but none appear, set to 'Other'
        l3_opts = hierarchy_def['levels']['level3']['values'].get(level2, [])
        if l3_opts and not _any_present(l3_opts):
            level3 = 'Other'
        return {
            "source_name": os.path.basename(file_path),
            "lang": language,
            "level1": level1,
            "level2": level2,
            "level3": level3,
            "doc_type": doc_type,
            # chunk_size here is the character length of the chunk, not the
            # word-window size configured on the chunker.
            "chunk_size": len(content),
            "token_count": self.text_processor.count_tokens(content)
        }

    def _classify_level1(self, content: str, hierarchy_def: Dict[str, Any]) -> str:
        """Classify level1 domain by keyword presence.

        `content` is expected to be lowercased by the caller. On ties, max()
        returns the first option, so earlier values take precedence.
        """
        level1_options = hierarchy_def['levels']['level1']['values']
        # Simple keyword matching (enhance with ML model)
        keyword_scores = {}
        for domain in level1_options:
            score = 0
            # Add domain-specific keyword matching logic
            if domain.lower() in content:
                score += 1
            keyword_scores[domain] = score
        # NOTE(review): when level1_options is empty both branches fail
        # (empty-max / IndexError) — confirm definitions always have values.
        return max(keyword_scores.items(), key=lambda x: x[1])[0] if keyword_scores else level1_options[0]

    def _classify_level2(self, content: str, hierarchy_def: Dict[str, Any], level1: str) -> str:
        """Classify level2 section under `level1`; "General" when no options."""
        level2_options = hierarchy_def['levels']['level2']['values'].get(level1, [])
        if not level2_options:
            return "General"
        keyword_scores = {}
        for section in level2_options:
            score = 0
            if section.lower() in content:
                score += 1
            keyword_scores[section] = score
        return max(keyword_scores.items(), key=lambda x: x[1])[0] if keyword_scores else level2_options[0]

    def _classify_level3(self, content: str, hierarchy_def: Dict[str, Any],
                         level1: str, level2: str) -> str:
        """Classify level3 topic under `level2`; "General" when no options.

        Note: `level1` is accepted for signature symmetry but not used —
        level3 options are keyed on level2 only.
        """
        level3_options = hierarchy_def['levels']['level3']['values'].get(level2, [])
        if not level3_options:
            return "General"
        keyword_scores = {}
        for topic in level3_options:
            score = 0
            if topic.lower() in content:
                score += 1
            keyword_scores[topic] = score
        return max(keyword_scores.items(), key=lambda x: x[1])[0] if keyword_scores else level3_options[0]