"""Utility functions for RAG system."""
import re
import hashlib
from typing import List, Dict, Any, Optional
import json # Changed from yaml
from pathlib import Path
def load_hierarchy(hierarchy_name: str) -> Dict[str, Any]:
    """
    Load a hierarchy definition from its JSON file.

    The file is looked up under the sibling ``hierarchies/`` directory,
    one level above this module.

    Args:
        hierarchy_name: Name of the hierarchy (hospital, bank, fluid_simulation)

    Returns:
        Dictionary containing the hierarchy definition

    Raises:
        FileNotFoundError: If no JSON file exists for the given name
    """
    hierarchy_path = (
        Path(__file__).parent.parent / "hierarchies" / f"{hierarchy_name}.json"
    )
    if not hierarchy_path.exists():
        raise FileNotFoundError(f"Hierarchy file not found: {hierarchy_path}")
    return json.loads(hierarchy_path.read_text(encoding='utf-8'))
# --- ID generation, masking, and chunking helpers ---
def generate_doc_id(content: str) -> str:
    """
    Generate a unique document ID from content.

    The ID is the MD5 digest of the UTF-8 encoded content, so identical
    content always yields the same ID.

    Args:
        content: Document content

    Returns:
        Hexadecimal hash string
    """
    hasher = hashlib.md5()
    hasher.update(content.encode('utf-8'))
    return hasher.hexdigest()
def generate_chunk_id(doc_id: str, chunk_index: int) -> str:
    """
    Generate a unique chunk ID.

    Args:
        doc_id: Parent document ID
        chunk_index: Index of the chunk within the document

    Returns:
        Chunk ID of the form ``<doc_id>_chunk_<chunk_index>``
    """
    return doc_id + "_chunk_" + str(chunk_index)
def mask_pii(text: str) -> str:
    """
    Basic PII masking for sensitive data.

    Masks email addresses, US-style 10-digit phone numbers, and SSNs
    (applied in that order).

    Args:
        text: Input text potentially containing PII

    Returns:
        Text with masked PII
    """
    # Email addresses.
    # FIX: the TLD class was previously [A-Z|a-z], which wrongly included a
    # literal '|' character (the alternation bar has no meaning inside a
    # character class) and could mask non-emails like "a@b.|xy".
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
    # Phone numbers (simple pattern: 3-3-4 digits, optional '-' or '.' separators)
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
    # SSN pattern (NNN-NN-NNNN); runs after phones, which cannot match this shape
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
    return text
def detect_language(text: str) -> str:
    """
    Simple language detection (English vs Japanese).

    Classifies as Japanese when more than 10% of the characters fall in
    the hiragana, katakana, or CJK-ideograph Unicode ranges.

    Args:
        text: Input text

    Returns:
        Language code: 'ja' or 'en' (empty input yields 'en')
    """
    if not text:
        return "en"
    jp_pattern = r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]'
    jp_count = sum(1 for _ in re.finditer(jp_pattern, text))
    return "ja" if jp_count / len(text) > 0.1 else "en"
def chunk_by_tokens(text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
    """
    Split text into chunks by approximate token count.

    Uses the rough heuristic of 1 token ≈ 4 characters, and prefers to
    break chunks at a sentence or line boundary when one falls in the
    second half of the chunk.

    Args:
        text: Input text to chunk
        chunk_size: Target chunk size in tokens (approximate)
        overlap: Number of overlapping tokens between chunks

    Returns:
        List of non-empty, stripped text chunks
    """
    # Approximate: 1 token ≈ 4 characters
    chars_per_chunk = chunk_size * 4
    overlap_chars = overlap * 4
    chunks: List[str] = []
    start = 0
    while start < len(text):
        end = start + chars_per_chunk
        chunk = text[start:end]
        # Try to break at a sentence boundary, but only if it lies past
        # the halfway point so chunks stay reasonably sized.
        if end < len(text):
            break_point = max(chunk.rfind('.'), chunk.rfind('\n'))
            if break_point > chars_per_chunk * 0.5:
                chunk = chunk[:break_point + 1]
                end = start + break_point + 1
        chunks.append(chunk.strip())
        # FIX: step back by the overlap but always advance by at least one
        # character; previously overlap >= chunk_size made `start` stop
        # moving forward and the loop never terminated.
        start = max(end - overlap_chars, start + 1)
    return [c for c in chunks if c]  # Remove empty chunks
def save_json(data: Any, filepath: str) -> None:
    """
    Save data to a JSON file, creating parent directories as needed.

    Output is pretty-printed (indent=2) with non-ASCII characters
    written verbatim (ensure_ascii=False).

    Args:
        data: Data to save (must be JSON-serializable)
        filepath: Output file path
    """
    out_path = Path(filepath)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    out_path.write_text(serialized, encoding='utf-8')
def load_json(filepath: str) -> Any:
    """
    Load data from a JSON file.

    Args:
        filepath: Input file path

    Returns:
        The deserialized data
    """
    return json.loads(Path(filepath).read_text(encoding='utf-8'))
def format_metadata(metadata: Dict[str, Any]) -> str:
    """
    Format a metadata dictionary for display.

    The large fields 'embedding' and 'text' are omitted from the output.

    Args:
        metadata: Metadata dictionary

    Returns:
        One "key: value" pair per line, in the dictionary's iteration order
    """
    skipped = ('embedding', 'text')
    return "\n".join(
        f"{key}: {value}"
        for key, value in metadata.items()
        if key not in skipped
    )