import time from pathlib import Path import pytest from TerraFin.agent.definitions import ( DEFAULT_HOSTED_AGENT_NAME, TerraFinAgentDefinition, TerraFinAgentDefinitionRegistry, build_default_agent_definition_registry, ) from TerraFin.agent.hosted_runtime import ( TerraFinAgentApprovalRequiredError, TerraFinAgentPolicyError, TerraFinAgentSessionConflictError, TerraFinHostedAgentRuntime, ) from TerraFin.agent.model_runtime import TerraFinRuntimeModel from TerraFin.agent.runtime import build_default_capability_registry from TerraFin.agent.session_store import SQLiteHostedSessionStore from TerraFin.agent.transcript_store import HostedTranscriptStore def _processing() -> dict[str, object]: return { "requestedDepth": "auto", "resolvedDepth": "full", "loadedStart": "2024-01-01", "loadedEnd": "2024-12-31", "isComplete": True, "hasOlder": False, "sourceVersion": "test-source", "view": "daily", } class _FakeService: def resolve(self, query: str) -> dict[str, object]: return {"type": "stock", "name": query.upper(), "path": f"/stock/{query.upper()}", "processing": _processing()} def market_data(self, name: str, *, depth: str = "auto", view: str = "daily") -> dict[str, object]: return {"ticker": name, "seriesType": "candlestick", "count": 1, "data": [], "processing": {**_processing(), "requestedDepth": depth, "view": view}} def indicators( self, name: str, indicators: str, *, depth: str = "auto", view: str = "daily", ) -> dict[str, object]: return { "ticker": name, "indicators": {"rsi": {"name": "rsi", "offset": 0, "values": {"value": 55.0}}}, "unknown": [], "processing": {**_processing(), "requestedDepth": depth, "view": view, "indicatorQuery": indicators}, } def patterns(self, name: str, *, depth: str = "auto", view: str = "daily") -> dict[str, object]: return {"ticker": name, "signals": [], "total": 0, "processing": {**_processing(), "requestedDepth": depth, "view": view}} def market_snapshot(self, name: str, *, depth: str = "auto", view: str = "daily") -> dict[str, object]: return { "ticker": name, "price_action": {"current": 100.0}, "indicators": {"rsi": 55.0}, "market_breadth": [], "watchlist": [], "processing": {**_processing(), "requestedDepth": depth, "view": view}, } def lppl_analysis(self, name: str, *, depth: str = "auto", view: str = "daily") -> dict[str, object]: return {"name": name, "confidence": 0.2, "processing": {**_processing(), "requestedDepth": depth, "view": view}} def company_info(self, ticker: str) -> dict[str, object]: return {"ticker": ticker, "shortName": f"{ticker} Corp", "processing": _processing()} def earnings(self, ticker: str) -> dict[str, object]: return {"ticker": ticker, "earnings": [], "processing": _processing()} def financials(self, ticker: str, *, statement: str = "income", period: str = "annual") -> dict[str, object]: return {"ticker": ticker, "statement": statement, "period": period, "columns": [], "rows": [], "processing": _processing()} def portfolio(self, guru: str) -> dict[str, object]: return {"guru": guru, "info": {}, "holdings": [], "count": 0, "processing": _processing()} def economic(self, indicators: str) -> dict[str, object]: return {"indicators": {indicators: {"latest_value": 3.0}}, "processing": _processing()} def macro_focus(self, name: str, *, depth: str = "auto", view: str = "daily") -> dict[str, object]: return { "name": name, "info": {"name": name, "type": "index", "description": "Macro", "currentValue": 1.0, "change": 0.0, "changePercent": 0.0}, "seriesType": "line", "count": 1, "data": [], "processing": {**_processing(), "requestedDepth": depth, "view": view}, } def calendar_events( self, *, year: int, month: int, categories: str | None = None, limit: int | None = None, ) -> dict[str, object]: return {"events": [], "count": 0, "month": month, "year": year, "categories": categories, "limit": limit, "processing": _processing()} def fundamental_screen(self, ticker: str) -> dict[str, object]: return { "ticker": ticker, "moat": {"score": "wide"}, "earnings_quality": {}, "balance_sheet": {}, "capital_allocation": {}, "pricing_power": {}, "warnings": [], "processing": _processing(), } def risk_profile(self, name: str, *, depth: str = "auto") -> dict[str, object]: return { "ticker": name, "tail_risk": {}, "convexity": {}, "volatility": {"requestedDepth": depth}, "drawdown": {}, "warnings": [], "processing": _processing(), } def valuation(self, ticker: str) -> dict[str, object]: return { "ticker": ticker, "dcf": {"status": "ready", "intrinsic_value": 120.0}, "reverse_dcf": {"status": "ready", "implied_growth_pct": 8.0}, "relative": {"trailing_pe": 22.0}, "graham_number": 100.0, "margin_of_safety_pct": 12.0, "current_price": 107.0, "processing": _processing(), } def sec_filings(self, ticker: str) -> dict[str, object]: return {"ticker": ticker, "cik": 1, "forms": [], "filings": [], "processing": _processing()} def sec_filing_document( self, ticker: str, accession: str, primaryDocument: str, *, form: str = "10-Q" ) -> dict[str, object]: return {"ticker": ticker, "accession": accession, "primaryDocument": primaryDocument, "toc": [], "charCount": 0, "indexUrl": "", "documentUrl": "", "processing": _processing()} def sec_filing_section( self, ticker: str, accession: str, primaryDocument: str, sectionSlug: str, *, form: str = "10-Q" ) -> dict[str, object]: return {"ticker": ticker, "accession": accession, "sectionSlug": sectionSlug, "sectionTitle": "stub", "markdown": "", "charCount": 0, "documentUrl": "", "processing": _processing()} def fcf_history(self, ticker: str, years: int = 10) -> dict[str, object]: return { "ticker": ticker, "years": years, "rows": [], "candidates": {"threeYearAvg": None, "latestAnnual": None, "ttm": None}, "autoSelectedSource": "annual", "processing": _processing(), } def similarity_search( self, ticker: str, universe: str = "sp500+nasdaq100+kospi200", period: str = "1y", top_n: int = 20, ) -> dict[str, object]: return {"ticker": ticker, "period": period, "pool": {}, "results": [], "count": 0, "processing": _processing()} def fear_greed(self) -> dict[str, object]: return {"score": 50, "rating": "Neutral", "processing": _processing()} def sp500_dcf(self) -> dict[str, object]: return {"status": "ready", "currentIntrinsicValue": 5000.0, "processing": _processing()} def beta_estimate(self, ticker: str) -> dict[str, object]: return {"symbol": ticker, "beta": 1.0, "adjustedBeta": 1.0, "rSquared": 0.5, "processing": _processing()} def top_companies(self) -> dict[str, object]: return {"companies": [], "count": 0, "processing": _processing()} def market_regime(self) -> dict[str, object]: return {"summary": "stub", "confidence": "low", "signals": [], "processing": _processing()} def trailing_forward_pe(self) -> dict[str, object]: return {"date": "2026-04-01", "latestValue": 0.0, "history": [], "processing": _processing()} def market_breadth(self) -> dict[str, object]: return {"metrics": [], "processing": _processing()} def watchlist(self) -> dict[str, object]: return {"items": [], "count": 0, "processing": _processing()} def _fake_chart_opener( data_or_names, *, session_id: str | None = None, **kwargs, ) -> dict[str, object]: _ = kwargs return { "ok": True, "sessionId": session_id or "agent:chart", "chartUrl": f"http://127.0.0.1:8001/chart?sessionId={session_id or 'agent:chart'}", "processing": _processing(), "inputEcho": data_or_names, } def _runtime( agent_registry: TerraFinAgentDefinitionRegistry | None = None, *, transcript_root: Path | None = None, ) -> TerraFinHostedAgentRuntime: service = _FakeService() registry = build_default_capability_registry(service, chart_opener=_fake_chart_opener) return TerraFinHostedAgentRuntime( service=service, capability_registry=registry, agent_registry=agent_registry, transcript_store=None if transcript_root is None else HostedTranscriptStore(root_dir=transcript_root), ) def test_hosted_runtime_lists_default_agent_definitions() -> None: runtime = _runtime() assert tuple(definition.name for definition in runtime.list_agents()) == (DEFAULT_HOSTED_AGENT_NAME,) def test_create_session_records_agent_metadata(tmp_path) -> None: runtime = _runtime(transcript_root=tmp_path / "transcripts") context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:test", metadata={"thread": "alpha"}) assert context.session.session_id == "hosted:test" assert context.session.metadata["thread"] == "alpha" assert context.session.metadata["agentDefinition"] == DEFAULT_HOSTED_AGENT_NAME assert context.metadata["agentPolicy"]["chartAccess"] is True def test_create_session_rejects_internal_agent_definitions_by_default(tmp_path) -> None: runtime = _runtime( agent_registry=build_default_agent_definition_registry(include_gurus=True), transcript_root=tmp_path / "transcripts", ) with pytest.raises(TerraFinAgentPolicyError, match="internal-only"): runtime.create_session("warren-buffett", session_id="hosted:hidden") def test_create_internal_session_allows_hidden_guru_definitions(tmp_path) -> None: runtime = _runtime( agent_registry=build_default_agent_definition_registry(include_gurus=True), transcript_root=tmp_path / "transcripts", ) context = runtime.create_internal_session( "warren-buffett", session_id="hosted:hidden", metadata={"hiddenInternal": True}, ) assert context.session.session_id == "hosted:hidden" assert context.session.metadata["agentDefinition"] == "warren-buffett" def test_existing_session_runtime_model_tracks_current_default_model(tmp_path) -> None: runtime = _runtime(transcript_root=tmp_path / "transcripts") runtime.default_runtime_model = TerraFinRuntimeModel( model_ref="openai/gpt-4.1-mini", provider_id="openai", provider_label="OpenAI", model_id="gpt-4.1-mini", ) context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:model-sync") record = runtime.get_session_record(context.session.session_id) assert record.context.session.metadata["runtimeModel"]["modelRef"] == "openai/gpt-4.1-mini" runtime.default_runtime_model = TerraFinRuntimeModel( model_ref="github-copilot/gpt-4o", provider_id="github-copilot", provider_label="GitHub Copilot", model_id="gpt-4o", ) updated = runtime.get_session_record(context.session.session_id) assert updated.context.session.metadata["runtimeModel"]["modelRef"] == "github-copilot/gpt-4o" assert updated.metadata["runtimeModel"]["providerId"] == "github-copilot" def test_delete_session_cascades_hidden_child_sessions(tmp_path) -> None: runtime = _runtime( agent_registry=build_default_agent_definition_registry(include_gurus=True), transcript_root=tmp_path / "transcripts", ) parent = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:parent") child = runtime.create_internal_session( "warren-buffett", session_id="hosted:child", metadata={ "hiddenInternal": True, "parentSessionId": parent.session.session_id, }, ) runtime.delete_session(parent.session.session_id) assert runtime.transcript_store is not None assert runtime.transcript_store.session_exists(parent.session.session_id) is False assert runtime.transcript_store.session_exists(child.session.session_id) is False with pytest.raises(KeyError): runtime.get_session_record(child.session.session_id) def test_invoke_applies_agent_default_depth_and_view() -> None: registry = TerraFinAgentDefinitionRegistry( [ TerraFinAgentDefinition( name="macro-analyst", description="Macro-focused test agent.", allowed_capabilities=("macro_focus",), default_depth="auto", default_view="weekly", chart_access=False, allow_background_tasks=False, ) ] ) runtime = _runtime(agent_registry=registry) context = runtime.create_session("macro-analyst", session_id="hosted:macro") payload = runtime.invoke(context.session.session_id, "macro_focus", name="DXY") assert payload["processing"]["requestedDepth"] == "auto" assert payload["processing"]["view"] == "weekly" def test_invoke_rejects_disallowed_capability() -> None: registry = TerraFinAgentDefinitionRegistry( [ TerraFinAgentDefinition( name="portfolio-reader", description="Portfolio-only test agent.", allowed_capabilities=("portfolio", "company_info", "market_snapshot"), default_depth="auto", default_view="daily", chart_access=False, allow_background_tasks=True, ) ] ) runtime = _runtime(agent_registry=registry) context = runtime.create_session("portfolio-reader", session_id="hosted:portfolio") with pytest.raises(TerraFinAgentPolicyError, match="cannot use capability 'economic'"): runtime.invoke(context.session.session_id, "economic", indicators="UNRATE") def test_invoke_rejects_chart_access_when_policy_disallows_it() -> None: registry = TerraFinAgentDefinitionRegistry( [ TerraFinAgentDefinition( name="no-chart-operator", description="Cannot open charts.", allowed_capabilities=("market_snapshot", "open_chart"), default_depth="auto", default_view="daily", chart_access=False, allow_background_tasks=True, ) ] ) runtime = _runtime(agent_registry=registry) context = runtime.create_session("no-chart-operator", session_id="hosted:no-chart") with pytest.raises(TerraFinAgentPolicyError, match="chart"): runtime.invoke(context.session.session_id, "open_chart", data_or_names=["AAPL"]) def test_run_task_requires_background_policy_and_capability_support() -> None: runtime = _runtime() context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:foreground-only") with pytest.raises(TerraFinAgentPolicyError, match="backgroundable"): runtime.run_task(context.session.session_id, "company_info", ticker="AAPL") def test_run_task_rejects_agents_without_background_permission() -> None: registry = TerraFinAgentDefinitionRegistry( [ TerraFinAgentDefinition( name="foreground-only", description="Cannot launch background work.", allowed_capabilities=("market_snapshot",), default_depth="auto", default_view="daily", chart_access=False, allow_background_tasks=False, ) ] ) runtime = _runtime(agent_registry=registry) context = runtime.create_session("foreground-only", session_id="hosted:task-denied") with pytest.raises(TerraFinAgentPolicyError, match="background"): runtime.run_task(context.session.session_id, "market_snapshot", name="AAPL") def test_run_task_completes_for_backgroundable_allowed_capability() -> None: runtime = _runtime() context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:task") task, result = runtime.run_task(context.session.session_id, "market_snapshot", name="AAPL") assert task.status == "completed" assert result["ticker"] == "AAPL" def test_start_task_completes_async_and_is_visible_on_session() -> None: runtime = _runtime() context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:async-task") task = runtime.start_task(context.session.session_id, "market_snapshot", name="MSFT") assert any(item.task_id == task.task_id for item in runtime.list_session_tasks(context.session.session_id)) for _ in range(40): current = runtime.get_task(task.task_id) if current.status == "completed": break time.sleep(0.05) else: raise AssertionError("Timed out waiting for async task completion.") assert current.result is not None assert current.result["ticker"] == "MSFT" def test_invoke_requires_human_approval_for_side_effecting_capability() -> None: runtime = _runtime() runtime.default_require_human_approval_for_side_effects = True context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:approval") with pytest.raises(TerraFinAgentApprovalRequiredError) as exc: runtime.invoke(context.session.session_id, "open_chart", data_or_names=["AAPL"]) approval = exc.value.approval assert approval.status == "pending" assert runtime.list_session_approvals(context.session.session_id)[0].approval_id == approval.approval_id approved = runtime.approve_approval(approval.approval_id, note="looks good") assert approved.status == "approved" assert runtime.get_approval(approval.approval_id).status == "approved" def test_read_linked_view_context_returns_published_page_state() -> None: runtime = _runtime() context = runtime.create_session( DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:view", metadata={"viewContextId": "view:buffett"}, ) runtime.upsert_view_context( "view:buffett", route="/market-insights", page_type="market-insights", title="Buffett Portfolio", selection={"guru": "Buffett"}, metadata={"topHoldingTickers": ["AAPL", "BAC"]}, ) payload = runtime.read_linked_view_context(context.session.session_id) assert payload["available"] is True assert payload["route"] == "/market-insights" assert payload["selection"]["guru"] == "Buffett" def test_sqlite_session_store_reloads_cached_record_when_another_worker_persists(tmp_path) -> None: """Simulate two FastAPI workers sharing one SQLite DB. Worker A has the record cached in-process. Worker B mutates the session (e.g. relinks the viewContextId) and persists. The next `get()` on Worker A must return the fresh metadata — not the stale cached copy — or the pull-based `current_view_context` tool will read the wrong tab's view context. Regression for hostile-review finding C2: the get() path previously returned cached records unconditionally, and only the WRITE side of the relink fix (0a54535) was in place.""" service = _FakeService() registry = build_default_capability_registry(service, chart_opener=_fake_chart_opener) db_path = tmp_path / "coherency.sqlite3" store_a = SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry) store_b = SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry) runtime_a = TerraFinHostedAgentRuntime(service=service, capability_registry=registry, session_store=store_a) runtime_b = TerraFinHostedAgentRuntime(service=service, capability_registry=registry, session_store=store_b) runtime_a.create_session( DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:coherency", metadata={"viewContextId": "ctx-A"}, ) # Prime worker B's cache by doing a read first. record_before = runtime_b.get_session_record("hosted:coherency") assert record_before.context.session.metadata.get("viewContextId") == "ctx-A" # Worker A mutates through a relink — persist updates SQLite AND worker # A's cache, but leaves worker B's cache stale. import time as _time _time.sleep(0.01) # ensure strictly-later updated_at runtime_a.relink_session_view_context("hosted:coherency", "ctx-B") # Worker B's next read should detect the newer SQLite updated_at and # reload, picking up the new viewContextId. record_after = runtime_b.get_session_record("hosted:coherency") assert record_after.context.session.metadata.get("viewContextId") == "ctx-B" runtime_a.shutdown() runtime_b.shutdown() def test_relink_session_view_context_persists_across_runtime_instances(tmp_path) -> None: """The relink must survive a runtime restart — in-memory mutation alone is invisible to the SQLite store, so other workers / processes would keep reading the stale viewContextId. Regression for live-server bug where `relink_session_view_context` mutated the cached record but didn't save it, so `current_view_context` kept reading the creation-time snapshot.""" service = _FakeService() registry = build_default_capability_registry(service, chart_opener=_fake_chart_opener) db_path = tmp_path / "hosted-runtime.sqlite3" transcript_root = tmp_path / "transcripts" runtime_a = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) context = runtime_a.create_session( DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:relink", metadata={"viewContextId": "creation-time-id"}, ) sid = context.session.session_id runtime_a.relink_session_view_context(sid, "live-id") runtime_a.shutdown() runtime_b = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) reloaded = runtime_b.get_session_record(sid) assert reloaded.context.session.metadata.get("viewContextId") == "live-id" runtime_b.shutdown() def test_sqlite_session_store_restores_session_state_after_runtime_restart(tmp_path) -> None: service = _FakeService() registry = build_default_capability_registry(service, chart_opener=_fake_chart_opener) db_path = tmp_path / "hosted-runtime.sqlite3" transcript_root = tmp_path / "transcripts" runtime = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:sqlite-restore") runtime.invoke(context.session.session_id, "market_snapshot", name="AAPL") task, _ = runtime.run_task(context.session.session_id, "financials", ticker="AAPL") runtime.shutdown() reloaded = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) record = reloaded.get_session_record(context.session.session_id) snapshot = record.context.session.snapshot() assert "AAPL" in snapshot.focus_items assert len(snapshot.capability_calls) == 2 assert record.context.task_registry.get(task.task_id).status == "completed" reloaded.shutdown() def test_hosted_runtime_lists_and_deletes_sessions(tmp_path) -> None: runtime = _runtime(transcript_root=tmp_path / "transcripts") runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:first") runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:second") listed = tuple(record.session_id for record in runtime.list_sessions()) assert listed == ("hosted:second", "hosted:first") removed = runtime.delete_session("hosted:first") assert removed.session_id == "hosted:first" assert tuple(record.session_id for record in runtime.list_sessions()) == ("hosted:second",) assert list((tmp_path / "transcripts" / "sessions").glob("hosted:first.deleted.*.jsonl")) def test_hosted_runtime_rejects_delete_when_session_has_active_tasks() -> None: runtime = _runtime() context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:delete-blocked") record = runtime.get_session_record(context.session.session_id) task = record.context.task_registry.create( "market_snapshot", description="market snapshot", session_id=context.session.session_id, input_payload={"name": "AAPL"}, ) record.context.task_registry.mark_running(task.task_id) runtime.session_store.persist(record) with pytest.raises(TerraFinAgentSessionConflictError, match="active background tasks"): runtime.delete_session(context.session.session_id) def test_sqlite_session_store_restores_view_context_after_runtime_restart(tmp_path) -> None: service = _FakeService() registry = build_default_capability_registry(service, chart_opener=_fake_chart_opener) db_path = tmp_path / "hosted-view-context.sqlite3" transcript_root = tmp_path / "transcripts" runtime = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) runtime.upsert_view_context( "view:macro", route="/market-insights", page_type="market-insights", selection={"instrument": "DXY"}, metadata={"routeSource": "widget"}, ) runtime.shutdown() reloaded = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) context = reloaded.get_view_context("view:macro") assert context.route == "/market-insights" assert context.page_type == "market-insights" assert context.selection["instrument"] == "DXY" reloaded.shutdown() def test_sqlite_store_allows_another_runtime_to_execute_pending_task(tmp_path) -> None: service = _FakeService() registry = build_default_capability_registry(service, chart_opener=_fake_chart_opener) db_path = tmp_path / "hosted-shared.sqlite3" transcript_root = tmp_path / "transcripts" runtime_a = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) context = runtime_a.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="hosted:shared-task") task = runtime_a.start_task(context.session.session_id, "market_snapshot", name="QQQ") runtime_a.shutdown() runtime_b = TerraFinHostedAgentRuntime( service=service, capability_registry=registry, session_store=SQLiteHostedSessionStore(db_path=db_path, service=service, registry=registry), transcript_store=HostedTranscriptStore(root_dir=transcript_root), ) for _ in range(40): current = runtime_b.get_task(task.task_id) if current.status == "completed": break time.sleep(0.05) else: raise AssertionError("Timed out waiting for the second runtime to complete the shared task.") assert current.result is not None assert current.result["ticker"] == "QQQ" runtime_b.shutdown() def test_transcript_first_runtime_ignores_legacy_blob_only_sessions(tmp_path) -> None: runtime = _runtime() context = runtime.create_session(DEFAULT_HOSTED_AGENT_NAME, session_id="legacy:blob-only") runtime.transcript_store = HostedTranscriptStore(root_dir=tmp_path / "transcripts") assert runtime.list_sessions() == () with pytest.raises(KeyError): runtime.get_session_record(context.session.session_id)