# Extracted from a web file viewer ("Spaces: Sleeping"; file size: 4,353 bytes; revision f23deb1).
import json
import sys
import warnings
from typing import Dict
from pathlib import Path
import numpy as np
from server.constants import _CORPORA_DIR
# Module-level cache of corpus dicts keyed by domain name. _load_corpus
# returns the cached entry when present and stores every result here,
# including the synthetic fallback.
_corpus_cache: Dict[str, Dict] = {}
def _load_corpus(domain: str) -> Dict:
    """Load and cache corpus data for a domain. Falls back to synthetic data.

    Reads ``chunks.json``, ``queries.json``, ``ground_truth.json`` and
    ``corpus_stats.json`` from ``_CORPORA_DIR / domain``, plus whichever
    ``S_true_<model>.npy`` files exist there. JSON files are decoded
    explicitly as UTF-8 so the result does not depend on the platform's
    locale encoding.

    Args:
        domain: Name of the corpus sub-directory under ``_CORPORA_DIR``.

    Returns:
        A dict with keys ``"chunks"``, ``"queries"``, ``"ground_truth"``,
        ``"corpus_stats"`` and ``"s_true"`` (model name -> array converted
        to float32). The same object is returned on later calls for the
        same domain.

    Warns:
        RuntimeWarning: if anything in the loading step raises. The same
            message is also printed to stderr, and the result of
            ``_make_synthetic_corpus(domain)`` is cached and returned instead.
    """
    if domain in _corpus_cache:
        return _corpus_cache[domain]

    domain_dir = _CORPORA_DIR / domain
    required_files = ["chunks.json", "queries.json", "ground_truth.json", "corpus_stats.json"]

    def _read_json(filename: str):
        # JSON interchange text is UTF-8; never rely on the locale default.
        return json.loads((domain_dir / filename).read_text(encoding="utf-8"))

    try:
        chunks = _read_json("chunks.json")
        queries = _read_json("queries.json")
        ground_truth = _read_json("ground_truth.json")
        corpus_stats = _read_json("corpus_stats.json")

        # Score matrices are optional per model; only load the ones present.
        s_true: Dict[str, np.ndarray] = {}
        for model_name in ["general", "medical", "legal", "code"]:
            path = domain_dir / f"S_true_{model_name}.npy"
            if path.exists():
                s_true[model_name] = np.load(path, mmap_mode="r").astype(np.float32)

        data = {
            "chunks": chunks,
            "queries": queries,
            "ground_truth": ground_truth,
            "corpus_stats": corpus_stats,
            "s_true": s_true,
        }
    except Exception as exc:
        # Deliberate best-effort boundary: any failure (missing file, bad
        # JSON, unreadable array) is reported loudly and replaced by the
        # synthetic corpus rather than crashing the caller.
        missing = [f for f in required_files if not (domain_dir / f).exists()]
        msg = (
            f"\n{'!'*60}\n"
            f"[RAGDebugEnv] WARNING: Real corpus unavailable for domain '{domain}'.\n"
            f" Reason : {exc}\n"
            + (f" Missing : {', '.join(missing)}\n" if missing else "")
            + f" Fix : run `python -m corpora.build_corpus --domain {domain}`\n"
            f" Action : falling back to SYNTHETIC corpus — do NOT use for training.\n"
            f"{'!'*60}\n"
        )
        print(msg, file=sys.stderr, flush=True)
        warnings.warn(msg, RuntimeWarning, stacklevel=2)
        data = _make_synthetic_corpus(domain)

    # The fallback is cached too, so the warning fires once per domain.
    _corpus_cache[domain] = data
    return data
def _make_synthetic_corpus(domain: str) -> Dict:
    """Build a small, deterministic placeholder corpus for ``domain``.

    Produces 50 chunks and 10 direct queries (plus 3 multi-hop queries when
    ``domain == "medical"``), a ground-truth mapping from query id (as a
    string) to relevant chunk ids, summary statistics, and one score matrix
    per model name. The random generator is seeded with 0, so repeated calls
    yield the same scores.

    Args:
        domain: Domain label embedded in the generated chunks and queries.

    Returns:
        A dict with keys ``"chunks"``, ``"queries"``, ``"ground_truth"``,
        ``"corpus_stats"`` and ``"s_true"``; every model name in ``"s_true"``
        maps to its own copy of the same float32 matrix of shape
        ``(number of queries, 50)``.
    """
    chunk_count = 50
    direct_query_count = 10
    model_names = ["general", "medical", "legal", "code"]
    generator = np.random.default_rng(0)

    chunks = []
    for chunk_id in range(chunk_count):
        chunks.append(
            {
                "chunk_id": chunk_id,
                "text": f"Synthetic chunk {chunk_id} for domain {domain}.",
                "n_tokens": 100,
                "source_doc": f"doc_{chunk_id // 5}",
                "domain": domain,
            }
        )

    queries = []
    for query_id in range(direct_query_count):
        queries.append(
            {
                "query_id": query_id,
                "text": f"Synthetic query {query_id}?",
                "type": "direct",
                "seed_chunk_id": query_id * 5,
                "is_multi_hop": False,
                "domain": domain,
                "difficulty": "easy",
            }
        )

    if domain == "medical":
        # The medical domain additionally gets a handful of multi-hop queries.
        queries.extend(
            {
                "query_id": direct_query_count + hop,
                "text": f"Synthetic multi-hop query {hop}?",
                "type": "multi_hop",
                "seed_chunk_ids": [hop * 4, hop * 4 + 2],
                "is_multi_hop": True,
                "domain": domain,
                "difficulty": "hard",
            }
            for hop in range(3)
        )

    # Multi-hop queries carry a list of seeds; direct ones a single seed id.
    ground_truth = {}
    for query in queries:
        relevant = query.get("seed_chunk_ids")
        if not relevant:
            relevant = [query["seed_chunk_id"]]
        ground_truth[str(query["query_id"])] = relevant

    # Background scores first, then one scalar draw per relevant chunk in
    # query order, so the generator is consumed in a fixed sequence.
    scores = generator.uniform(0.2, 0.6, (len(queries), chunk_count)).astype(np.float32)
    for key, relevant in ground_truth.items():
        row = int(key)
        for col in relevant:
            scores[row, col] = generator.uniform(0.75, 0.95)

    multi_hop_total = len([query for query in queries if query.get("is_multi_hop")])
    corpus_stats = {
        "domain": domain,
        "n_documents": 10,
        "n_chunks": chunk_count,
        "avg_chunk_tokens": 100,
        "has_near_duplicates": False,
        "n_queries": len(queries),
        "n_multi_hop_queries": multi_hop_total,
    }

    # Each model gets an independent copy of the same matrix.
    s_true = {}
    for model_name in model_names:
        s_true[model_name] = scores.copy()

    return {
        "chunks": chunks,
        "queries": queries,
        "ground_truth": ground_truth,
        "corpus_stats": corpus_stats,
        "s_true": s_true,
    }