File size: 16,324 Bytes
f5754e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
import asyncio
import json
import os
import time
import httpx
from typing import List, Dict, Any, Literal
from pydantic import BaseModel
import re
import string

def normalize_answer(s):
    """Lower text and remove punctuation, articles and extra whitespace."""
    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)
    def white_space_fix(text):
        return ' '.join(text.split())
    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)
    def lower(text):
        return text.lower()
    return white_space_fix(remove_articles(remove_punc(lower(s))))

def exact_match(prediction, ground_truth):
    return normalize_answer(prediction) == normalize_answer(ground_truth)

def token_f1(prediction, ground_truth):
    prediction_tokens = normalize_answer(prediction).split()
    ground_truth_tokens = normalize_answer(ground_truth).split()
    common = set(prediction_tokens) & set(ground_truth_tokens)
    num_same = len(common)
    if num_same == 0:
        return 0.0
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1

# Mock datasets for benchmarking fallback
HOTPOT_QA_SAMPLE = [
    {
        "question": "What is the capital of the country where the city of Lyon is located?",
        "ground_truth": "Paris",
        "context": "Lyon is a city in France. The capital of France is Paris.",
        "type": "multi-hop"
    },
    {
        "question": "Which company acquired the startup that developed the Siri virtual assistant?",
        "ground_truth": "Apple",
        "context": "Siri was originally developed by Siri Inc. Apple acquired Siri Inc. in 2010.",
        "type": "multi-hop"
    }
]

class BenchmarkConfig(BaseModel):
    base_url: str = "http://localhost:7860"
    modes: List[str] = ["naive", "hybrid", "hippo", "global_community"]
    dataset: str = "hotpot_qa"
    num_samples: int = 10
    # P1 fix: benchmark_mode controls ingestion strategy
    # "corpus"      β€” ingest ALL contexts first, then evaluate all questions (default)
    # "per_example" β€” per question: ingest context, query, then continue
    benchmark_mode: Literal["corpus", "per_example"] = "corpus"
    # P1 fix: allow overriding admin credentials via environment variables
    admin_user: str = os.environ.get("BENCHMARK_ADMIN_USER", "admin")
    admin_password: str = os.environ.get("BENCHMARK_ADMIN_PASSWORD", "admin")

def load_hf_dataset(config: BenchmarkConfig) -> List[Dict[str, str]]:
    try:
        from datasets import load_dataset  # type: ignore
        print(f"Loading {config.dataset} from Hugging Face datasets...")
        
        if config.dataset == "hotpot_qa":
            ds = load_dataset("hotpot_qa", "distractor", split="validation", streaming=True)
        elif config.dataset == "musique":
            ds = load_dataset("bdsaglam/musique", split="validation", streaming=True)
        else:
            ds = load_dataset(config.dataset, split="validation", streaming=True)
            
        samples = []
        for item in ds:
            if len(samples) >= config.num_samples:
                break
                
            # Extract context text
            context_text = ""
            if "context" in item:
                # HotpotQA format: lists of titles and sentences
                if isinstance(item["context"], dict) and "sentences" in item["context"]:
                    for sentences in item["context"]["sentences"]:
                        context_text += " ".join(sentences) + " "
                else:
                    context_text = str(item["context"])
                    
            samples.append({
                "question": item.get("question", ""),
                "ground_truth": item.get("answer", ""),
                "context": context_text,
                "type": item.get("level", "unknown")
            })
        return samples
    except ImportError:
        print("HF 'datasets' library not installed. Falling back to mock dataset.")
        return HOTPOT_QA_SAMPLE
    except Exception as e:
        print(f"Failed to load dataset from HF: {e}. Falling back to mock dataset.")
        return HOTPOT_QA_SAMPLE

async def create_benchmark_tenant(client: httpx.AsyncClient, config: BenchmarkConfig):
    """Returns (token, tenant_id) for the isolated benchmark tenant."""
    timestamp = int(time.time())
    username = f"benchmark_user_{timestamp}"
    tenant_id = f"benchmark_run_{timestamp}"
    password = "password123"

    register_url = f"{config.base_url}/api/auth/register"
    payload = {
        "username": username,
        "password": password,
        "email": f"{username}@example.com",
        "full_name": "Benchmark User",
        "scopes": ["read", "write"],
        "tenant_id": tenant_id
    }

    try:
        res = await client.post(register_url, json=payload, timeout=10.0)
        res.raise_for_status()

        login_url = f"{config.base_url}/api/auth/login"
        login_payload = {"username": username, "password": password}
        login_res = await client.post(login_url, json=login_payload, timeout=10.0)
        login_res.raise_for_status()
        token = login_res.json()["access_token"]
        print(f"  Created isolated tenant: {tenant_id}")
        return token, tenant_id
    except Exception as e:
        print(f"  Failed to create isolated tenant: {e}. Falling back to default admin.")
        token = await authenticate(client, config)
        return token, "admin"

async def authenticate(client: httpx.AsyncClient, config: BenchmarkConfig) -> str:
    """Authenticate as admin using env-var overrideable credentials."""
    login_url = f"{config.base_url}/api/auth/login"
    try:
        response = await client.post(
            login_url,
            json={"username": config.admin_user, "password": config.admin_password},
            timeout=10.0
        )
        response.raise_for_status()
        return response.json().get("access_token", "")
    except Exception as e:
        print(f"Failed to authenticate with backend ({config.admin_user}): {e}. "
              f"Set BENCHMARK_ADMIN_USER / BENCHMARK_ADMIN_PASSWORD env vars if needed.")
        return "test-token"

async def ingest_context(client: httpx.AsyncClient, config: BenchmarkConfig, token: str, q: Dict[str, str]):
    if not q.get("context"):
        return
        
    update_url = f"{config.base_url}/api/graph/update"
    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "text": q["context"],
        "source_label": "benchmark_ingest"
    }
    
    try:
        response = await client.post(update_url, json=payload, headers=headers, timeout=30.0)
        response.raise_for_status()
    except Exception as e:
        print(f"    [Warning] Failed to ingest context: {e}")

async def evaluate_question(client: httpx.AsyncClient, config: BenchmarkConfig, token: str, mode: str, q: Dict[str, str]) -> Dict[str, Any]:
    query_url = f"{config.base_url}/api/query"
    payload = {
        "query": q["question"],
        "top_k": 5,
        "mode": mode,
        "streaming": False
    }
    
    start_time = time.time()
    try:
        headers = {"Authorization": f"Bearer {token}"}
        
        response = await client.post(query_url, json=payload, headers=headers, timeout=60.0)
        response.raise_for_status()
        data = response.json()
        
        duration = time.time() - start_time
        answer = data.get("answer", "")
        
        em = exact_match(answer, q["ground_truth"])
        f1 = token_f1(answer, q["ground_truth"])
        is_correct = em or f1 > 0.6  # Use relaxed F1 threshold for general correctness flag
        
        return {
            "question": q["question"],
            "mode": mode,
            "duration": duration,
            "is_correct": is_correct,
            "exact_match": em,
            "f1_score": f1,
            "generated_answer": answer,
            "confidence": data.get("confidence", 0.0),
            "sources_count": len(data.get("sources", []))
        }
    except Exception as e:
        return {
            "question": q["question"],
            "mode": mode,
            "duration": time.time() - start_time,
            "is_correct": False,
            "error": str(e)
        }

async def build_communities(client: httpx.AsyncClient, config: BenchmarkConfig, token: str):
    """Build community index once before evaluating global_community mode."""
    communities_url = f"{config.base_url}/api/graph/communities/assign"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        res = await client.post(communities_url, headers=headers, timeout=120.0)
        res.raise_for_status()
        print(f"  Community index built: {res.json()}")
    except Exception as e:
        print(f"  [Warning] Community indexing failed: {e}")

async def cleanup_benchmark_tenant(
    client: httpx.AsyncClient,
    config: BenchmarkConfig,
    benchmark_token: str,
    tenant_id: str
):
    """
    Delete all graph data for the benchmark tenant.

    P1 fix: First tries self-cleanup with the benchmark user's own token
    (purge-tenant now allows users to delete their own tenant). Falls back to
    an admin-authenticated token only if self-cleanup is rejected (403).
    Admin credentials are configurable via env vars BENCHMARK_ADMIN_USER /
    BENCHMARK_ADMIN_PASSWORD, removing the hardcoded admin/admin dependency.
    """
    cleanup_url = f"{config.base_url}/api/graph/purge-tenant"
    payload = {"tenant_id": tenant_id}

    # --- Attempt 1: self-cleanup with benchmark user token ---
    try:
        res = await client.request(
            "DELETE",
            cleanup_url,
            headers={"Authorization": f"Bearer {benchmark_token}"},
            json=payload,
            timeout=30.0
        )
        if res.status_code == 200:
            print(f"  Benchmark tenant {tenant_id} self-cleaned up.")
            return
        elif res.status_code == 403:
            print(f"  Self-cleanup not permitted; falling back to admin cleanup.")
        else:
            print(f"  [Warning] Self-cleanup returned {res.status_code}; trying admin.")
    except Exception as e:
        print(f"  [Warning] Self-cleanup request failed: {e}; trying admin.")

    # --- Attempt 2: admin-authenticated cleanup ---
    async with httpx.AsyncClient() as admin_client:
        try:
            admin_token = await authenticate(admin_client, config)
            res = await admin_client.request(
                "DELETE",
                cleanup_url,
                headers={"Authorization": f"Bearer {admin_token}"},
                json=payload,
                timeout=30.0
            )
            res.raise_for_status()
            print(f"  Benchmark tenant {tenant_id} admin-cleaned up.")
        except Exception as e:
            print(f"  [Warning] Admin cleanup failed for tenant {tenant_id}: {e}")
            print(f"  Set BENCHMARK_ADMIN_USER / BENCHMARK_ADMIN_PASSWORD env vars if admin credentials differ from defaults.")

async def run_benchmark():
    config = BenchmarkConfig()
    dataset = load_hf_dataset(config)
    
    print(f"Starting benchmark on {config.dataset} with {len(dataset)} questions...")
    print(f"Modes to test: {config.modes}")
    print(f"Benchmark mode: {config.benchmark_mode}")
    if config.benchmark_mode == "corpus":
        print("  [corpus mode] All contexts ingested first, then all questions evaluated.")
    else:
        print("  [per_example mode] Each question ingests its own context before evaluation.")
    
    results = []
    
    async with httpx.AsyncClient() as client:
        print("\nCreating isolated benchmark tenant...")
        token, benchmark_tenant_id = await create_benchmark_tenant(client, config)

        has_community_mode = any(m in config.modes for m in ["global_community", "hippo"])

        if config.benchmark_mode == "corpus":
            # ── Corpus mode: ingest all, build communities, then evaluate all ──
            print("\nIngesting all context documents...")
            for i, q in enumerate(dataset):
                print(f"  Ingesting context {i+1}/{len(dataset)}...")
                await ingest_context(client, config, token, q)

            if has_community_mode:
                print("\nBuilding community index (required for global_community mode)...")
                await asyncio.sleep(2)  # Allow Neo4j to settle
                await build_communities(client, config, token)

            for i, q in enumerate(dataset):
                print(f"\nEvaluating Question {i+1}/{len(dataset)}: {q['question']}")
                for mode in config.modes:
                    print(f"  Running mode: {mode}...")
                    res = await evaluate_question(client, config, token, mode, q)
                    results.append(res)
                    status_label = "PASS" if res.get("is_correct") else "FAIL"
                    f1 = res.get("f1_score", 0.0)
                    print(f"    [{status_label}] F1: {f1:.2f} | Time: {res['duration']:.2f}s | Sources: {res.get('sources_count', 0)}")

        else:
            # ── Per-example mode: ingest β†’ query β†’ continue for each question ──
            for i, q in enumerate(dataset):
                print(f"\nQuestion {i+1}/{len(dataset)}: {q['question']}")
                print("  Ingesting context...")
                await ingest_context(client, config, token, q)

                # Minimal settle time for per-example (graph writes are async)
                await asyncio.sleep(1)

                for mode in config.modes:
                    print(f"  Running mode: {mode}...")
                    res = await evaluate_question(client, config, token, mode, q)
                    results.append(res)
                    status_label = "PASS" if res.get("is_correct") else "FAIL"
                    f1 = res.get("f1_score", 0.0)
                    print(f"    [{status_label}] F1: {f1:.2f} | Time: {res['duration']:.2f}s | Sources: {res.get('sources_count', 0)}")
                    
                # P1 fix: true per-example isolation - purge ingested context after question evaluation
                print("  Cleaning up isolated context...")
                await cleanup_benchmark_tenant(client, config, token, benchmark_tenant_id)

    summary = {}
    for r in results:
        m = r["mode"]
        if m not in summary:
            summary[m] = {"correct": 0, "total": 0, "time": 0.0}
        
        summary[m]["total"] += 1
        if r.get("is_correct"):
            summary[m]["correct"] += 1
        summary[m]["time"] += r["duration"]
        summary[m]["f1"] = summary[m].get("f1", 0.0) + r.get("f1_score", 0.0)
        
    print("\n=== BENCHMARK RESULTS ===")
    print(f"Benchmark mode: {config.benchmark_mode}")
    for m, stats in summary.items():
        accuracy = (stats["correct"] / stats["total"]) * 100 if stats["total"] > 0 else 0
        avg_time = stats["time"] / stats["total"] if stats["total"] > 0 else 0
        avg_f1 = stats["f1"] / stats["total"] if stats["total"] > 0 else 0
        print(f"Mode: {m:<15} | Accuracy: {accuracy:>5.1f}% | Avg F1: {avg_f1:.3f} | Avg Time: {avg_time:>5.2f}s")

    # P1 fix: try self-cleanup first, then admin fallback
    if benchmark_tenant_id != "admin":
        async with httpx.AsyncClient() as cleanup_client:
            # Re-authenticate the benchmark user for self-cleanup
            try:
                login_res = await cleanup_client.post(
                    f"{config.base_url}/api/auth/login",
                    json={"username": f"benchmark_user_{benchmark_tenant_id.split('_')[-1]}", "password": "password123"},
                    timeout=10.0
                )
                if login_res.status_code == 200:
                    fresh_token = login_res.json().get("access_token", token)
                else:
                    fresh_token = token
            except Exception:
                fresh_token = token

            await cleanup_benchmark_tenant(cleanup_client, config, fresh_token, benchmark_tenant_id)

if __name__ == "__main__":
    asyncio.run(run_benchmark())