Spaces:
Sleeping
Sleeping
| """Tests for multi-corpus construction at app startup.""" | |
| import pytest | |
| from agent_bench.core.config import ( | |
| AppConfig, | |
| CorpusConfig, | |
| EmbeddingConfig, | |
| ProviderConfig, | |
| RAGConfig, | |
| ) | |
| from agent_bench.serving.app import create_app | |
@pytest.fixture
def multi_corpus_config(tmp_path):
    """Build an ``AppConfig`` declaring two corpora rooted under ``tmp_path``.

    Registered as a pytest fixture because the tests in this module request
    it by parameter name; without the decorator pytest cannot resolve it.

    Args:
        tmp_path: pytest's per-test temporary directory; every store and
            cache path in the returned config is a child of it.

    Returns:
        An ``AppConfig`` using the ``mock`` provider, with corpora
        ``fastapi`` (the default corpus) and ``k8s``.
    """
    # Neither store exists on disk, so create_app falls back to empty stores
    return AppConfig(
        provider=ProviderConfig(default="mock"),
        rag=RAGConfig(store_path=str(tmp_path / "store_default")),
        embedding=EmbeddingConfig(cache_dir=str(tmp_path / "emb_cache")),
        corpora={
            "fastapi": CorpusConfig(
                label="FastAPI Docs",
                store_path=str(tmp_path / "store_fastapi"),
                data_path="data/tech_docs",
                refusal_threshold=0.35,
            ),
            "k8s": CorpusConfig(
                label="Kubernetes",
                store_path=str(tmp_path / "store_k8s"),
                data_path="data/k8s_docs",
                refusal_threshold=0.30,
            ),
        },
        default_corpus="fastapi",
    )
def test_corpus_map_keys_match_config(multi_corpus_config):
    """The configured corpus names are exactly the keys of app.state.corpus_map."""
    corpus_map = create_app(multi_corpus_config).state.corpus_map
    wired_names = set(corpus_map.keys())
    assert wired_names == {"fastapi", "k8s"}
def test_corpus_map_inner_dict_keyed_by_provider(multi_corpus_config):
    """Each corpus maps to a dict whose keys are provider names (nested composition)."""
    app = create_app(multi_corpus_config)
    # Mock provider is the only one registered (no API keys set)
    for name in ("fastapi", "k8s"):
        per_provider = app.state.corpus_map[name]
        assert isinstance(per_provider, dict)
        assert "mock" in per_provider
        # Provider keys must agree with those of the "fastapi" corpus
        reference_keys = set(app.state.corpus_map["fastapi"].keys())
        assert set(per_provider.keys()) == reference_keys
def test_default_orchestrator_points_at_default_corpus_and_provider(multi_corpus_config):
    """app.state.orchestrator is the same object as corpus_map["fastapi"]["mock"]."""
    state = create_app(multi_corpus_config).state
    attached = state.orchestrator
    expected = state.corpus_map["fastapi"]["mock"]
    # Identity, not equality: the default must alias the corpus_map entry
    assert attached is expected
def test_legacy_mode_has_empty_corpus_map():
    """Without any configured corpora, corpus_map is an empty dict."""
    legacy_config = AppConfig(provider=ProviderConfig(default="mock"))
    state = create_app(legacy_config).state
    assert state.corpus_map == {}
    # Legacy orchestrator still attached
    assert state.orchestrator is not None
def test_default_corpus_not_in_corpora_raises():
    """AppConfig raises a pydantic ValidationError when default_corpus is not a corpora key."""
    from pydantic import ValidationError

    only_fastapi = {
        "fastapi": CorpusConfig(
            label="FastAPI Docs",
            store_path=".cache/store",
            data_path="data/tech_docs",
        ),
    }
    with pytest.raises(ValidationError, match="default_corpus"):
        # "kubernetes" is deliberately absent from the corpora mapping above
        AppConfig(corpora=only_fastapi, default_corpus="kubernetes")
def test_legacy_rag_refusal_threshold_preserved_when_no_corpora(tmp_path):
    """With no corpora, the search tool carries rag.refusal_threshold unchanged."""
    from agent_bench.core.config import RAGConfig

    legacy_rag = RAGConfig(
        store_path=str(tmp_path / "store"),
        refusal_threshold=0.42,
    )
    config = AppConfig(
        provider=ProviderConfig(default="mock"),
        rag=legacy_rag,
        embedding=EmbeddingConfig(cache_dir=str(tmp_path / "emb")),
    )
    state = create_app(config).state

    # No corpora → empty corpus_map → legacy store attached
    assert state.corpus_map == {}

    # Reach into the legacy orchestrator's tool registry to check that the
    # registered search tool exposes the threshold configured above.
    tool = state.orchestrator.registry.get("search_documents")
    assert tool is not None
    assert tool.refusal_threshold == 0.42
def test_only_one_store_built_per_corpus(multi_corpus_config, monkeypatch):
    """In multi-corpus mode, one HybridStore is constructed per corpus and no more.

    HybridStore.__init__ is wrapped so every construction is recorded. The
    count must equal len(config.corpora), not len(config.corpora) + 1 (the
    extra one being the legacy single store, which must not be built).
    """
    from agent_bench.rag import store as store_mod

    store_cls = store_mod.HybridStore
    real_init = store_cls.__init__
    built: list = []

    def recording_init(self, *args, **kwargs):
        # Record the instance, then defer to the genuine initializer
        built.append(self)
        return real_init(self, *args, **kwargs)

    monkeypatch.setattr(store_cls, "__init__", recording_init)
    create_app(multi_corpus_config)

    # Exactly one store per configured corpus. The legacy store is not built.
    expected_count = len(multi_corpus_config.corpora)
    assert len(built) == expected_count
def test_corpus_map_has_all_providers(multi_corpus_config, monkeypatch):
    """With two providers available, every corpus entry exposes both of them.

    Checks the structural invariant that all corpora share one set of
    provider keys — the contract that Task 3's routing depends on.
    """
    from agent_bench.core import provider as provider_mod
    from agent_bench.core.provider import MockProvider

    class FakeOpenAI(MockProvider):
        """MockProvider subclass standing in for the OpenAI provider."""

    def build_fake_openai(_cfg):
        return FakeOpenAI()

    monkeypatch.setattr(provider_mod, "OpenAIProvider", build_fake_openai)
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")

    corpus_map = create_app(multi_corpus_config).state.corpus_map

    for name in ("fastapi", "k8s"):
        assert set(corpus_map[name].keys()) == {"mock", "openai"}

    # Structural invariant: every corpus has identical provider keys
    provider_sets = [set(entry.keys()) for entry in corpus_map.values()]
    reference = provider_sets[0]
    assert all(candidate == reference for candidate in provider_sets)

    fastapi_entry = corpus_map["fastapi"]
    # Provider orchestrators within a corpus are distinct instances
    assert fastapi_entry["mock"] is not fastapi_entry["openai"]
    # Same provider across corpora is also distinct (different registries)
    assert fastapi_entry["mock"] is not corpus_map["k8s"]["mock"]
def test_unavailable_corpus_is_skipped(tmp_path):
    """A corpus flagged available=False stays in config.corpora but is left
    out of corpus_map when the app is created."""
    corpora = {
        "fastapi": CorpusConfig(
            label="FastAPI",
            store_path=str(tmp_path / "store_fastapi"),
            data_path="data/tech_docs",
        ),
        "k8s": CorpusConfig(
            label="Kubernetes",
            store_path=str(tmp_path / "store_k8s"),
            data_path="data/k8s_docs",
            available=False,
        ),
    }
    config = AppConfig(
        provider=ProviderConfig(default="mock"),
        rag=RAGConfig(store_path=str(tmp_path / "store_default")),
        embedding=EmbeddingConfig(cache_dir=str(tmp_path / "emb_cache")),
        corpora=corpora,
        default_corpus="fastapi",
    )

    wired = create_app(config).state.corpus_map

    # Only fastapi wired in corpus_map
    assert set(wired.keys()) == {"fastapi"}
    # But k8s is still in config.corpora for dashboard/introspection
    assert "k8s" in config.corpora
    k8s_cfg = config.corpora["k8s"]
    assert k8s_cfg.available is False
async def test_unavailable_k8s_corpus_returns_400_at_request_time(tmp_path):
    """End-to-end: POSTing to /ask for the unavailable corpus yields HTTP 400."""
    # NOTE(review): no asyncio marker is visible on this coroutine test;
    # presumably the project runs an async pytest plugin in auto mode — confirm.
    from httpx import ASGITransport, AsyncClient

    corpora = {
        "fastapi": CorpusConfig(
            label="FastAPI",
            store_path=str(tmp_path / "store_fastapi"),
            data_path="data/tech_docs",
        ),
        "k8s": CorpusConfig(
            label="Kubernetes",
            store_path=str(tmp_path / "store_k8s"),
            data_path="data/k8s_docs",
            available=False,
        ),
    }
    config = AppConfig(
        provider=ProviderConfig(default="mock"),
        rag=RAGConfig(store_path=str(tmp_path / "store_default")),
        embedding=EmbeddingConfig(cache_dir=str(tmp_path / "emb_cache")),
        corpora=corpora,
        default_corpus="fastapi",
    )
    app = create_app(config)

    # Drive the ASGI app in-process; no real network socket is opened
    transport = ASGITransport(app=app)
    payload = {"question": "hi", "corpus": "k8s"}
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post("/ask", json=payload)

    assert resp.status_code == 400
    # The error detail must name the corpus that was asked for
    assert "k8s" in resp.json()["detail"]