from pathlib import Path import re # Clean BOM for path in Path("app").rglob("*.py"): text = path.read_text(encoding="utf-8-sig") text = text.replace("\ufeff", "") path.write_text(text, encoding="utf-8") Path("app/product").mkdir(parents=True, exist_ok=True) # ----------------------------------------------------- # 1. Create source_viewer.py # ----------------------------------------------------- Path("app/product/source_viewer.py").write_text(r""" import json import csv from pathlib import Path from typing import Dict, Any, List, Optional from fastapi import HTTPException from fastapi.responses import HTMLResponse from app.core.config import settings def safe_str(value) -> str: if value is None: return "" return str(value) def html_escape(value: str) -> str: return ( safe_str(value) .replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) ) def normalize(value) -> str: return safe_str(value).strip().lower() def get_processed_document_dir(document_id: str) -> Path: return Path(settings.PROCESSED_DIR) / document_id def load_json_file(path: Path): try: return json.loads(path.read_text(encoding="utf-8")) except Exception: try: return json.loads(path.read_text(encoding="utf-8-sig")) except Exception: return None def load_jsonl_file(path: Path) -> List[Dict[str, Any]]: rows = [] try: lines = path.read_text(encoding="utf-8").splitlines() except Exception: try: lines = path.read_text(encoding="utf-8-sig").splitlines() except Exception: return rows for line in lines: line = line.strip() if not line: continue try: item = json.loads(line) if isinstance(item, dict): rows.append(item) except Exception: pass return rows def load_csv_file(path: Path) -> List[Dict[str, Any]]: rows = [] for enc in ["utf-8", "utf-8-sig"]: try: with path.open("r", encoding=enc, newline="") as f: reader = csv.DictReader(f) for row in reader: rows.append(dict(row)) return rows except Exception: rows = [] return rows def flatten_json_records(data) -> List[Dict[str, Any]]: records = [] if isinstance(data, dict): for key in ["chunks", "results", "pages", "items", "documents", "data"]: if isinstance(data.get(key), list): for item in data[key]: if isinstance(item, dict): records.append(item) if not records: records.append(data) elif isinstance(data, list): for item in data: if isinstance(item, dict): records.append(item) return records def collect_candidate_records(document_id: str) -> List[Dict[str, Any]]: doc_dir = get_processed_document_dir(document_id) processed_dir = Path(settings.PROCESSED_DIR) roots = [] if doc_dir.exists(): roots.append(doc_dir) if processed_dir.exists(): roots.append(processed_dir) records = [] seen_files = set() for root in roots: for path in root.rglob("*"): if not path.is_file(): continue if path in seen_files: continue seen_files.add(path) suffix = path.suffix.lower() file_records = [] if suffix == ".json": file_records = flatten_json_records(load_json_file(path)) elif suffix == ".jsonl": file_records = load_jsonl_file(path) elif suffix == ".csv": file_records = load_csv_file(path) for record in file_records: enriched = dict(record) enriched["_source_file_path"] = str(path) records.append(enriched) return records def value_from(record: Dict[str, Any], keys: List[str], default: str = "") -> str: for key in keys: if key in record and record[key] not in [None, ""]: return safe_str(record[key]) metadata = record.get("metadata") if isinstance(metadata, dict): for key in keys: if key in metadata and metadata[key] not in [None, ""]: return safe_str(metadata[key]) return default def record_text(record: Dict[str, Any]) -> str: return value_from( record, [ "text", "content", "chunk_text", "page_text", "cleaned_text", "raw_text", "body", "preview", "text_preview", "chunk_preview" ], "" ) def record_match_score( record: Dict[str, Any], source_id: str, page: Optional[str] = None, chunk_id: Optional[str] = None ) -> int: score = 0 source_id_norm = normalize(source_id) page_norm = normalize(page) chunk_id_norm = normalize(chunk_id) candidate_source_values = [ value_from(record, ["source_id", "citation_id", "id", "source"]), value_from(record, ["chunk_id", "chunk", "chunk_index", "chunk_number"]), value_from(record, ["page_id", "page_source_id"]) ] candidate_page_values = [ value_from(record, ["page", "page_number", "page_no", "page_index"]) ] candidate_chunk_values = [ value_from(record, ["chunk_id", "chunk", "chunk_index", "chunk_number", "id"]) ] if source_id_norm: for value in candidate_source_values: value_norm = normalize(value) if value_norm == source_id_norm: score += 10 elif source_id_norm in value_norm or value_norm in source_id_norm: score += 3 if page_norm: for value in candidate_page_values: if normalize(value) == page_norm: score += 5 if chunk_id_norm: for value in candidate_chunk_values: if normalize(value) == chunk_id_norm: score += 8 if record_text(record): score += 1 return score def find_best_source_record( document_id: str, source_id: str, page: Optional[str] = None, chunk_id: Optional[str] = None ) -> Dict[str, Any]: records = collect_candidate_records(document_id) if not records: raise HTTPException( status_code=404, detail="No processed records found. Upload/index the document first." ) scored = [] for record in records: score = record_match_score( record=record, source_id=source_id, page=page, chunk_id=chunk_id ) scored.append((score, record)) scored.sort(key=lambda item: item[0], reverse=True) best_score, best_record = scored[0] if best_score <= 0: raise HTTPException( status_code=404, detail="Source record not found." ) return best_record def get_source_details( document_id: str, source_id: str, page: Optional[str] = None, chunk_id: Optional[str] = None ) -> Dict[str, Any]: record = find_best_source_record( document_id=document_id, source_id=source_id, page=page, chunk_id=chunk_id ) document_name = value_from( record, ["document_name", "source_file_name", "file_name", "filename", "document_title"], "Selected document" ) page_number = value_from( record, ["page", "page_number", "page_no", "page_index"], page or "Not available" ) resolved_chunk_id = value_from( record, ["chunk_id", "chunk", "chunk_index", "chunk_number", "id"], chunk_id or source_id ) text = record_text(record) return { "document_id": document_id, "source_id": source_id, "document_name": document_name, "page": page_number, "chunk_id": resolved_chunk_id, "text": text, "text_preview": text[:1200], "metadata": record, "source_file_path": record.get("_source_file_path") } def get_source_html( document_id: str, source_id: str, page: Optional[str] = None, chunk_id: Optional[str] = None ) -> HTMLResponse: details = get_source_details( document_id=document_id, source_id=source_id, page=page, chunk_id=chunk_id ) document_name = html_escape(details.get("document_name", "Selected document")) page_value = html_escape(details.get("page", "Not available")) chunk_value = html_escape(details.get("chunk_id", source_id)) text_value = html_escape(details.get("text", "Source text not available.")) metadata_value = html_escape(json.dumps(details.get("metadata", {}), indent=2, ensure_ascii=False)) html = f'''
{metadata_value}