| | """End-to-end smoke test: full journey with mocked services.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import pytest |
| |
|
| | from app.services.mock_data import ( |
| | MOCK_ELIGIBILITY_LEDGERS, |
| | MOCK_PATIENT_PROFILE, |
| | MOCK_TRIAL_CANDIDATES, |
| | ) |
| | from app.services.state_manager import JOURNEY_STATES |
| | from trialpath.models import ( |
| | EligibilityLedger, |
| | PatientProfile, |
| | SearchAnchors, |
| | TrialCandidate, |
| | ) |
| |
|
| |
|
| | class TestE2EJourney: |
| | """Simulate the full 5-state journey: INGEST β PRESCREEN β VALIDATE β GAP β SUMMARY.""" |
| |
|
| | def _build_session_state(self) -> dict: |
| | """Create a minimal session state dict simulating Streamlit.""" |
| | return { |
| | "journey_state": "INGEST", |
| | "parlant_session_id": None, |
| | "parlant_agent_id": None, |
| | "parlant_session_active": False, |
| | "patient_profile": None, |
| | "uploaded_files": [], |
| | "search_anchors": None, |
| | "trial_candidates": [], |
| | "eligibility_ledger": [], |
| | "last_event_offset": 0, |
| | } |
| |
|
| | def test_full_journey_state_transitions(self): |
| | """Verify all state transitions complete in correct order.""" |
| | state = self._build_session_state() |
| |
|
| | |
| | assert state["journey_state"] == "INGEST" |
| | state["patient_profile"] = MOCK_PATIENT_PROFILE |
| | state["journey_state"] = "PRESCREEN" |
| |
|
| | |
| | assert state["journey_state"] == "PRESCREEN" |
| | anchors = SearchAnchors( |
| | condition="Non-Small Cell Lung Cancer", |
| | biomarkers=["EGFR"], |
| | stage="IIIB", |
| | ) |
| | state["search_anchors"] = anchors |
| | state["trial_candidates"] = list(MOCK_TRIAL_CANDIDATES) |
| | state["journey_state"] = "VALIDATE_TRIALS" |
| |
|
| | |
| | assert state["journey_state"] == "VALIDATE_TRIALS" |
| | state["eligibility_ledger"] = list(MOCK_ELIGIBILITY_LEDGERS) |
| | state["journey_state"] = "GAP_FOLLOWUP" |
| |
|
| | |
| | assert state["journey_state"] == "GAP_FOLLOWUP" |
| | state["journey_state"] = "SUMMARY" |
| |
|
| | assert state["journey_state"] == "SUMMARY" |
| |
|
| | def test_journey_produces_exportable_data(self): |
| | """Verify end state has all data needed for doctor packet export.""" |
| | state = self._build_session_state() |
| | state["patient_profile"] = MOCK_PATIENT_PROFILE |
| | state["trial_candidates"] = list(MOCK_TRIAL_CANDIDATES) |
| | state["eligibility_ledger"] = list(MOCK_ELIGIBILITY_LEDGERS) |
| | state["journey_state"] = "SUMMARY" |
| |
|
| | |
| | profile = state["patient_profile"] |
| | ledgers = state["eligibility_ledger"] |
| | trials = state["trial_candidates"] |
| |
|
| | assert isinstance(profile, PatientProfile) |
| | assert len(trials) == 3 |
| | assert len(ledgers) == 3 |
| |
|
| | eligible = sum(1 for lg in ledgers if lg.traffic_light == "green") |
| | uncertain = sum(1 for lg in ledgers if lg.traffic_light == "yellow") |
| | ineligible = sum(1 for lg in ledgers if lg.traffic_light == "red") |
| |
|
| | assert eligible == 1 |
| | assert uncertain == 1 |
| | assert ineligible == 1 |
| |
|
| | def test_gap_loop_back_to_ingest(self): |
| | """Verify GAP_FOLLOWUP can loop back to INGEST for new docs.""" |
| | state = self._build_session_state() |
| | state["patient_profile"] = MOCK_PATIENT_PROFILE |
| | state["trial_candidates"] = list(MOCK_TRIAL_CANDIDATES) |
| | state["eligibility_ledger"] = list(MOCK_ELIGIBILITY_LEDGERS) |
| | state["journey_state"] = "GAP_FOLLOWUP" |
| |
|
| | |
| | state["journey_state"] = "INGEST" |
| | assert state["journey_state"] == "INGEST" |
| |
|
| | |
| | assert state["patient_profile"] is not None |
| | assert len(state["trial_candidates"]) == 3 |
| |
|
| | def test_all_journey_states_reachable(self): |
| | """Verify each of the 5 journey states can be reached.""" |
| | state = self._build_session_state() |
| | visited = [] |
| |
|
| | for target_state in JOURNEY_STATES: |
| | state["journey_state"] = target_state |
| | visited.append(state["journey_state"]) |
| |
|
| | assert visited == JOURNEY_STATES |
| | assert len(visited) == 5 |
| |
|
| |
|
class TestE2EWithMockedServices:
    """E2E test using mocked service calls to verify data flow."""

    @pytest.mark.asyncio
    async def test_extract_to_search_to_evaluate_chain(self, mock_medgemma, mock_gemini):
        """Full service chain: extraction -> search anchors -> evaluate."""
        from trialpath.services.gemini_planner import GeminiPlanner
        from trialpath.services.medgemma_extractor import MedGemmaExtractor

        # Step 1: extract a structured profile from the uploaded documents.
        extractor = MedGemmaExtractor()
        patient = await extractor.extract(["patient_notes.pdf"], {})
        assert isinstance(patient, PatientProfile)

        # Step 2: derive search anchors from the extracted profile.
        planner = GeminiPlanner()
        search = await planner.generate_search_anchors(patient)
        assert isinstance(search, SearchAnchors)

        # Step 3: slice the first mock trial into individual criteria.
        sliced = await planner.slice_criteria(MOCK_TRIAL_CANDIDATES[0].model_dump())
        assert len(sliced) >= 1

        # Step 4: route each criterion to the matching evaluator service.
        assessments = []
        for criterion in sliced:
            if criterion["type"] == "medical":
                outcome = await extractor.evaluate_medical_criterion(
                    criterion["text"], patient, []
                )
            else:
                outcome = await planner.evaluate_structural_criterion(
                    criterion["text"], patient
                )
            assessments.append(
                {
                    "criterion": criterion["text"],
                    "decision": outcome["decision"],
                    "confidence": outcome.get("confidence", 0.5),
                }
            )
        assert len(assessments) == len(sliced)

        # Step 5: aggregate everything into a single eligibility ledger.
        ledger = await planner.aggregate_assessments(
            profile=patient,
            trial=MOCK_TRIAL_CANDIDATES[0].model_dump(),
            assessments=assessments,
        )
        assert isinstance(ledger, EligibilityLedger)

    def test_data_contracts_survive_serialization(self):
        """Verify all data contracts survive a JSON roundtrip intact."""
        # Patient profile.
        profile_copy = PatientProfile.model_validate_json(
            MOCK_PATIENT_PROFILE.model_dump_json()
        )
        assert profile_copy.patient_id == MOCK_PATIENT_PROFILE.patient_id

        # Trial candidates.
        for candidate in MOCK_TRIAL_CANDIDATES:
            candidate_copy = TrialCandidate.model_validate_json(
                candidate.model_dump_json()
            )
            assert candidate_copy.nct_id == candidate.nct_id

        # Eligibility ledgers.
        for entry in MOCK_ELIGIBILITY_LEDGERS:
            entry_copy = EligibilityLedger.model_validate_json(entry.model_dump_json())
            assert entry_copy.nct_id == entry.nct_id

        # Search anchors.
        original = SearchAnchors(
            condition="NSCLC",
            biomarkers=["EGFR", "ALK"],
            stage="IV",
        )
        restored = SearchAnchors.model_validate_json(original.model_dump_json())
        assert restored.condition == "NSCLC"
|
| |
|
class TestE2ELatencyBudget:
    """Verify operations complete within latency budget (mocked)."""

    @pytest.mark.asyncio
    async def test_mock_operations_are_fast(self, mock_medgemma, mock_gemini):
        """With every service mocked, the full chain should be near-instant."""
        import time

        from trialpath.services.gemini_planner import GeminiPlanner
        from trialpath.services.medgemma_extractor import MedGemmaExtractor

        began = time.monotonic()

        # Extraction.
        extractor = MedGemmaExtractor()
        patient = await extractor.extract(["doc.pdf"], {})

        # Anchor generation + criteria slicing.
        planner = GeminiPlanner()
        await planner.generate_search_anchors(patient)
        sliced = await planner.slice_criteria(MOCK_TRIAL_CANDIDATES[0].model_dump())

        # Per-criterion evaluation, routed by criterion type.
        for criterion in sliced:
            if criterion["type"] == "medical":
                await extractor.evaluate_medical_criterion(
                    criterion["text"], patient, []
                )
            else:
                await planner.evaluate_structural_criterion(criterion["text"], patient)

        # Aggregation and gap analysis.
        await planner.aggregate_assessments(
            profile=patient,
            trial=MOCK_TRIAL_CANDIDATES[0].model_dump(),
            assessments=[],
        )
        await planner.analyze_gaps(patient, list(MOCK_ELIGIBILITY_LEDGERS))

        elapsed = time.monotonic() - began
        assert elapsed < 1.0, f"Mock pipeline took {elapsed:.2f}s, expected < 1s"
|