File size: 4,353 Bytes
f23deb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import json
import sys
import warnings
from typing import Dict
from pathlib import Path
import numpy as np

from server.constants import _CORPORA_DIR

# Process-wide memo of loaded corpora, keyed by domain name. _load_corpus()
# consults it first and stores its result here (real or synthetic), so each
# domain is read or synthesized at most once while the entry remains.
_corpus_cache: Dict[str, Dict] = {}

def _load_corpus(domain: str) -> Dict:
    """Load and cache corpus data for a domain. Falls back to synthetic data.

    Reads ``chunks.json``, ``queries.json``, ``ground_truth.json`` and
    ``corpus_stats.json`` from ``_CORPORA_DIR / domain``, plus whichever
    ``S_true_<model>.npy`` files exist for the models ``general``,
    ``medical``, ``legal`` and ``code`` (absent ``.npy`` files are skipped
    without a warning).

    If anything in that process raises, a banner is printed to stderr, a
    ``RuntimeWarning`` is issued, and the result of
    ``_make_synthetic_corpus(domain)`` is used instead. Either way the result
    is stored in ``_corpus_cache`` and returned on later calls for the same
    domain.

    Args:
        domain: Name of the sub-directory under ``_CORPORA_DIR``; also the
            cache key.

    Returns:
        A dict with the keys ``chunks``, ``queries``, ``ground_truth``,
        ``corpus_stats`` and ``s_true`` (model name -> float32 array).
    """
    if domain in _corpus_cache:
        return _corpus_cache[domain]

    required_files = (
        "chunks.json",
        "queries.json",
        "ground_truth.json",
        "corpus_stats.json",
    )
    domain_dir = _CORPORA_DIR / domain
    try:
        # Pass raw bytes to json.loads so decoding follows the JSON rules
        # (UTF-8/16/32 auto-detection) instead of the platform's locale
        # encoding, which read_text() would use and which mis-decodes UTF-8
        # corpora on non-UTF-8 systems.
        chunks = json.loads((domain_dir / "chunks.json").read_bytes())
        queries = json.loads((domain_dir / "queries.json").read_bytes())
        ground_truth = json.loads((domain_dir / "ground_truth.json").read_bytes())
        corpus_stats = json.loads((domain_dir / "corpus_stats.json").read_bytes())

        s_true: Dict[str, np.ndarray] = {}
        for model_name in ["general", "medical", "legal", "code"]:
            path = domain_dir / f"S_true_{model_name}.npy"
            if path.exists():
                # astype() materialises an in-memory float32 copy of the
                # memory-mapped file contents.
                s_true[model_name] = np.load(path, mmap_mode="r").astype(np.float32)

        data = {
            "chunks": chunks,
            "queries": queries,
            "ground_truth": ground_truth,
            "corpus_stats": corpus_stats,
            "s_true": s_true,
        }
    except Exception as exc:
        # Deliberate best-effort: any load failure degrades to a synthetic
        # corpus, but loudly, so it is never mistaken for real data.
        missing = [f for f in required_files if not (domain_dir / f).exists()]
        msg = (
            f"\n{'!'*60}\n"
            f"[RAGDebugEnv] WARNING: Real corpus unavailable for domain '{domain}'.\n"
            f"  Reason : {exc}\n"
            + (f"  Missing : {', '.join(missing)}\n" if missing else "")
            + f"  Fix    : run `python -m corpora.build_corpus --domain {domain}`\n"
            f"  Action : falling back to SYNTHETIC corpus — do NOT use for training.\n"
            f"{'!'*60}\n"
        )
        # Emitted twice on purpose: the print is unconditional, while the
        # warning goes through the warnings machinery (filters, capture).
        print(msg, file=sys.stderr, flush=True)
        warnings.warn(msg, RuntimeWarning, stacklevel=2)
        data = _make_synthetic_corpus(domain)

    _corpus_cache[domain] = data
    return data

def _make_synthetic_corpus(domain: str) -> Dict:
    """Build a tiny, deterministic placeholder corpus for *domain*.

    Intended for smoke tests when the real corpus files are not available.
    The result has the same top-level keys as a loaded corpus: ``chunks``,
    ``queries``, ``ground_truth``, ``corpus_stats`` and ``s_true``.

    The corpus has 50 chunks (five per synthetic document) and 10 direct
    queries; the ``medical`` domain additionally gets 3 multi-hop queries.
    The generator is seeded with 0, so the score matrix is reproducible.
    """
    n_chunks = 50
    n_queries = 10
    model_names = ["general", "medical", "legal", "code"]
    rng = np.random.default_rng(0)

    chunks = []
    for chunk_idx in range(n_chunks):
        chunks.append(
            {
                "chunk_id": chunk_idx,
                "text": f"Synthetic chunk {chunk_idx} for domain {domain}.",
                "n_tokens": 100,
                "source_doc": f"doc_{chunk_idx // 5}",
                "domain": domain,
            }
        )

    queries = []
    for query_idx in range(n_queries):
        queries.append(
            {
                "query_id": query_idx,
                "text": f"Synthetic query {query_idx}?",
                "type": "direct",
                "seed_chunk_id": query_idx * 5,
                "is_multi_hop": False,
                "domain": domain,
                "difficulty": "easy",
            }
        )

    # Only the medical domain carries a handful of multi-hop queries.
    if domain == "medical":
        queries.extend(
            {
                "query_id": n_queries + hop,
                "text": f"Synthetic multi-hop query {hop}?",
                "type": "multi_hop",
                "seed_chunk_ids": [hop * 4, hop * 4 + 2],
                "is_multi_hop": True,
                "domain": domain,
                "difficulty": "hard",
            }
            for hop in range(3)
        )

    # Relevant chunk ids per query: the multi-hop list when present,
    # otherwise the single seed chunk wrapped in a list.
    ground_truth = {}
    for query in queries:
        relevant = query.get("seed_chunk_ids")
        if not relevant:
            relevant = [query["seed_chunk_id"]]
        ground_truth[str(query["query_id"])] = relevant

    # Background similarity noise first, then raise the relevant cells so
    # they stand out; the draw order is kept fixed for reproducibility.
    total_queries = len(queries)
    scores = rng.uniform(0.2, 0.6, (total_queries, n_chunks)).astype(np.float32)
    for query in queries:
        row = query["query_id"]
        for col in ground_truth[str(row)]:
            scores[row, col] = rng.uniform(0.75, 0.95)

    multi_hop_total = len([q for q in queries if q.get("is_multi_hop")])
    corpus_stats = {
        "domain": domain,
        "n_documents": 10,
        "n_chunks": n_chunks,
        "avg_chunk_tokens": 100,
        "has_near_duplicates": False,
        "n_queries": total_queries,
        "n_multi_hop_queries": multi_hop_total,
    }

    # Every model gets its own independent copy of the same score matrix.
    s_true = {}
    for model in model_names:
        s_true[model] = scores.copy()

    return {
        "chunks": chunks,
        "queries": queries,
        "ground_truth": ground_truth,
        "corpus_stats": corpus_stats,
        "s_true": s_true,
    }