Spaces:
Sleeping
Sleeping
| import json | |
| import sys | |
| import warnings | |
| from typing import Dict | |
| from pathlib import Path | |
| import numpy as np | |
| from server.constants import _CORPORA_DIR | |
# Per-domain memo of loaded corpora, keyed by domain name. Populated lazily by
# _load_corpus with either the on-disk corpus or its synthetic fallback.
_corpus_cache: Dict[str, Dict] = {}


def _read_json(path):
    """Parse the JSON document stored at *path*, decoding it as UTF-8."""
    # An explicit encoding keeps loading independent of the host locale;
    # without it, non-ASCII corpus text can fail to decode on some systems.
    return json.loads(path.read_text(encoding="utf-8"))


def _load_corpus(domain: str) -> Dict:
    """Load and cache corpus data for a domain. Falls back to synthetic data.

    The corpus is read from ``_CORPORA_DIR / domain``. Four JSON files are
    required (``chunks.json``, ``queries.json``, ``ground_truth.json``,
    ``corpus_stats.json``); each ``S_true_<model>.npy`` file that exists is
    loaded as a float32 array.

    Args:
        domain: Name of the corpus sub-directory, also used as the cache key.

    Returns:
        A dict with the keys ``"chunks"``, ``"queries"``, ``"ground_truth"``,
        ``"corpus_stats"`` and ``"s_true"`` (model name -> array). The same
        dict object is returned on subsequent calls for the same domain.

    Warns:
        RuntimeWarning: when the real corpus cannot be loaded. The same
        message is also printed to stderr, and the result of
        ``_make_synthetic_corpus(domain)`` is cached and returned instead.
    """
    if domain in _corpus_cache:
        return _corpus_cache[domain]

    domain_dir = _CORPORA_DIR / domain
    required_files = ["chunks.json", "queries.json", "ground_truth.json", "corpus_stats.json"]
    model_names = ["general", "medical", "legal", "code"]

    try:
        chunks = _read_json(domain_dir / "chunks.json")
        queries = _read_json(domain_dir / "queries.json")
        ground_truth = _read_json(domain_dir / "ground_truth.json")
        corpus_stats = _read_json(domain_dir / "corpus_stats.json")

        s_true: Dict[str, np.ndarray] = {}
        for model_name in model_names:
            path = domain_dir / f"S_true_{model_name}.npy"
            if path.exists():
                # astype() materialises an in-memory float32 copy of the
                # memory-mapped file contents.
                s_true[model_name] = np.load(path, mmap_mode="r").astype(np.float32)

        data = {
            "chunks": chunks,
            "queries": queries,
            "ground_truth": ground_truth,
            "corpus_stats": corpus_stats,
            "s_true": s_true,
        }
    except Exception as exc:
        # Deliberate best-effort: any load failure (missing file, bad JSON,
        # unreadable array) degrades to the synthetic corpus, but loudly.
        missing = [f for f in required_files if not (domain_dir / f).exists()]
        msg = (
            f"\n{'!'*60}\n"
            f"[RAGDebugEnv] WARNING: Real corpus unavailable for domain '{domain}'.\n"
            f" Reason : {exc}\n"
            + (f" Missing : {', '.join(missing)}\n" if missing else "")
            + f" Fix : run `python -m corpora.build_corpus --domain {domain}`\n"
            f" Action : falling back to SYNTHETIC corpus — do NOT use for training.\n"
            f"{'!'*60}\n"
        )
        # Emitted on both channels so the notice survives warning filters.
        print(msg, file=sys.stderr, flush=True)
        # stacklevel=2 attributes the warning to the caller of _load_corpus.
        warnings.warn(msg, RuntimeWarning, stacklevel=2)
        data = _make_synthetic_corpus(domain)

    _corpus_cache[domain] = data
    return data
def _make_synthetic_corpus(domain: str) -> Dict:
    """Build a small, deterministic stand-in corpus for *domain*.

    Intended for smoke tests when no real corpus files are available. The
    result has the same top-level keys as a loaded corpus: 50 chunks, 10
    direct queries (plus 3 multi-hop queries when ``domain == "medical"``),
    a ground-truth mapping, corpus statistics, and one similarity matrix per
    model name. The random generator is seeded with 0, so repeated calls
    produce identical scores.
    """
    chunk_count = 50
    direct_query_count = 10
    model_names = ["general", "medical", "legal", "code"]
    generator = np.random.default_rng(0)

    chunks = []
    for idx in range(chunk_count):
        chunks.append(
            {
                "chunk_id": idx,
                "text": f"Synthetic chunk {idx} for domain {domain}.",
                "n_tokens": 100,
                # Five consecutive chunks share one source document.
                "source_doc": f"doc_{idx // 5}",
                "domain": domain,
            }
        )

    queries = []
    for idx in range(direct_query_count):
        queries.append(
            {
                "query_id": idx,
                "text": f"Synthetic query {idx}?",
                "type": "direct",
                "seed_chunk_id": idx * 5,
                "is_multi_hop": False,
                "domain": domain,
                "difficulty": "easy",
            }
        )

    if domain == "medical":
        # Only the medical domain gets a handful of two-chunk queries.
        queries.extend(
            {
                "query_id": direct_query_count + hop,
                "text": f"Synthetic multi-hop query {hop}?",
                "type": "multi_hop",
                "seed_chunk_ids": [hop * 4, hop * 4 + 2],
                "is_multi_hop": True,
                "domain": domain,
                "difficulty": "hard",
            }
            for hop in range(3)
        )

    # Relevant chunk ids per query: the multi-hop list when present,
    # otherwise the single seed chunk wrapped in a list.
    ground_truth = {}
    for query in queries:
        relevant = query.get("seed_chunk_ids")
        if not relevant:
            relevant = [query["seed_chunk_id"]]
        ground_truth[str(query["query_id"])] = relevant

    # Background scores in [0.2, 0.6); relevant cells are then raised into
    # [0.75, 0.95) so the ground truth stands out.
    scores = generator.uniform(0.2, 0.6, (len(queries), chunk_count)).astype(np.float32)
    for query in queries:
        row = query["query_id"]
        for col in ground_truth[str(row)]:
            scores[row, col] = generator.uniform(0.75, 0.95)

    multi_hop_total = len([q for q in queries if q.get("is_multi_hop")])
    corpus_stats = {
        "domain": domain,
        "n_documents": 10,
        "n_chunks": chunk_count,
        "avg_chunk_tokens": 100,
        "has_near_duplicates": False,
        "n_queries": len(queries),
        "n_multi_hop_queries": multi_hop_total,
    }

    # Every model gets its own independent copy of the same matrix.
    s_true = {}
    for name in model_names:
        s_true[name] = scores.copy()

    return {
        "chunks": chunks,
        "queries": queries,
        "ground_truth": ground_truth,
        "corpus_stats": corpus_stats,
        "s_true": s_true,
    }