from dataclasses import replace from datetime import UTC, datetime from types import SimpleNamespace import pandas as pd from fastapi.testclient import TestClient import TerraFin.agent.service as agent_service import TerraFin.interface.agent.data_routes as agent_routes import TerraFin.interface.stock.data_routes as stock_routes import TerraFin.interface.stock.payloads as stock_payloads from TerraFin.agent.definitions import ( DEFAULT_HOSTED_AGENT_NAME, TerraFinAgentDefinition, is_internal_agent_definition, ) from TerraFin.agent.loop import TerraFinConversationMessage, TerraFinHostedConversation, TerraFinHostedRunResult from TerraFin.agent.runtime import TerraFinAgentSession, TerraFinTaskRegistry from TerraFin.agent.session_store import ( TerraFinHostedApprovalRequest, TerraFinHostedSessionRecord, TerraFinHostedViewContextRecord, ) from TerraFin.agent.tools import TerraFinToolDefinition, TerraFinToolInvocationResult from TerraFin.data.contracts import HistoryChunk, TimeSeriesDataFrame from TerraFin.data.providers.corporate.filings.sec_edgar.filing import SecEdgarConfigurationError from TerraFin.interface.server import create_app from TerraFin.interface.watchlist_service import reset_watchlist_service def _make_fake_tsdf(ticker: str = "TEST", n: int = 120) -> TimeSeriesDataFrame: dates = pd.date_range("2025-01-01", periods=n, freq="B") base = 100.0 data = { "time": dates, "open": [base + idx * 0.1 for idx in range(n)], "high": [base + idx * 0.1 + 1 for idx in range(n)], "low": [base + idx * 0.1 - 1 for idx in range(n)], "close": [base + idx * 0.2 for idx in range(n)], "volume": [1000 + idx for idx in range(n)], } df = TimeSeriesDataFrame(pd.DataFrame(data)) df.name = ticker return df class _FakeDataFactory: def __init__(self, *args, **kwargs) -> None: _ = args, kwargs def get(self, name): return _make_fake_tsdf(name, 180) def get_recent_history(self, name, period="3y", *, force_refresh=False): _ = period, force_refresh df = _make_fake_tsdf(name, 90) return HistoryChunk( frame=df, loaded_start="2025-01-01", loaded_end="2025-05-06", requested_period="3y", is_complete=False, has_older=True, source_version="recent-cache", ) def get_full_history_backfill(self, name, loaded_start=None): _ = loaded_start df = _make_fake_tsdf(name, 180) return HistoryChunk( frame=df, loaded_start="2024-07-03", loaded_end="2025-05-06", requested_period=None, is_complete=True, has_older=False, source_version="full-cache", ) def get_fred_data(self, name): df = _make_fake_tsdf(name, 12)[["time", "close"]] df = TimeSeriesDataFrame(df) df.name = name return df def get_corporate_data(self, ticker, statement_type="income", period="annual"): _ = ticker, statement_type, period return pd.DataFrame( { "date": ["2025-12-31", "2024-12-31"], "Revenue": [1000.0, 950.0], "Net Income": [210.0, 200.0], } ) def get_portfolio_data(self, guru): _ = guru return _FakePortfolioOutput() def get_panel_data(self, name): if name == "market_breadth": return [{"label": "Advancers", "value": "300", "tone": "#047857"}] raise ValueError(f"Unknown panel: {name}") def get_calendar_events(self, *, year, month, categories=None, limit=None): _ = categories, limit return [ { "id": f"{year}-{month}-1", "title": "CPI", "start": f"{year}-{month:02d}-12", "category": "macro", "importance": "high", "displayTime": "08:30", "description": "Inflation", "source": "FRED", } ] class _FakeWatchlistService: def get_watchlist_snapshot(self): return [{"symbol": "AAPL", "name": "Apple", "move": "+1.1%", "tags": []}] class _FakePortfolioOutput: def __init__(self) -> None: self.info = {"Period": "Q1 2026", "Source": "fixture"} self.df = pd.DataFrame( [ {"Stock": "AAA", "% of Portfolio": 10.5, "Recent Activity": "Add 2.00%", "Updated": 2.0}, {"Stock": "BBB", "% of Portfolio": 8.0, "Recent Activity": "Reduce 1.50%", "Updated": -1.5}, ] ) class _FakeHostedRuntime: def __init__( self, definition: TerraFinAgentDefinition, internal_definition: TerraFinAgentDefinition | None = None ) -> None: self._definition = definition self._internal_definition = internal_definition self._records: dict[str, TerraFinHostedSessionRecord] = {} self._task_index: dict[str, str] = {} self._view_contexts: dict[str, TerraFinHostedViewContextRecord] = {} def list_agents(self): definitions = [self._definition] if self._internal_definition is not None: definitions.append(self._internal_definition) return tuple(definitions) def get_agent_definition(self, agent_name: str) -> TerraFinAgentDefinition: if agent_name == self._definition.name: return self._definition if self._internal_definition is not None and agent_name == self._internal_definition.name: return self._internal_definition raise KeyError(agent_name) def create_record( self, session_id: str, *, metadata: dict | None = None, agent_name: str | None = None, ) -> TerraFinHostedSessionRecord: definition = self.get_agent_definition(agent_name or self._definition.name) session_metadata = { **dict(metadata or {}), "agentDefinition": definition.name, "agentPolicy": { "defaultDepth": definition.default_depth, "defaultView": definition.default_view, "chartAccess": definition.chart_access, "allowBackgroundTasks": definition.allow_background_tasks, }, } session = TerraFinAgentSession(session_id=session_id, metadata=session_metadata) context = SimpleNamespace(session=session, task_registry=TerraFinTaskRegistry()) record = TerraFinHostedSessionRecord( session_id=session_id, agent_name=definition.name, context=context, metadata=dict(session_metadata), ) self._records[session_id] = record return record def get_session_record(self, session_id: str) -> TerraFinHostedSessionRecord: return self._records[session_id] def get_public_session_record(self, session_id: str) -> TerraFinHostedSessionRecord: record = self.get_session_record(session_id) definition = self.get_agent_definition(record.agent_name) if is_internal_agent_definition(definition) or record.metadata.get("hiddenInternal"): raise KeyError(session_id) return record def list_sessions(self): return tuple( sorted( self._records.values(), key=lambda record: record.last_accessed_at, reverse=True, ) ) def delete_session(self, session_id: str): record = self._records[session_id] active_tasks = [ task for task in record.context.task_registry.list_for_session(session_id) if task.status not in {"completed", "failed", "cancelled"} ] if active_tasks: raise agent_routes.TerraFinAgentSessionConflictError( f"Session '{session_id}' still has active background tasks. Cancel them before deleting the session." ) self._records.pop(session_id, None) return record def list_session_tasks(self, session_id: str): return self._records[session_id].context.task_registry.list_for_session(session_id) def list_public_session_tasks(self, session_id: str): self.get_public_session_record(session_id) return self.list_session_tasks(session_id) def get_task(self, task_id: str): session_id = self._task_index[task_id] return self._records[session_id].context.task_registry.get(task_id) def get_public_task(self, task_id: str): task = self.get_task(task_id) self.get_public_session_record(task.session_id) return task def cancel_task(self, task_id: str): session_id = self._task_index[task_id] return self._records[session_id].context.task_registry.cancel(task_id, reason="Cancelled by test") def cancel_public_task(self, task_id: str): task = self.get_public_task(task_id) return self.cancel_task(task.task_id) def list_session_approvals(self, session_id: str): return tuple(self._records[session_id].approval_requests) def list_public_session_approvals(self, session_id: str): self.get_public_session_record(session_id) return self.list_session_approvals(session_id) def get_approval(self, approval_id: str): for record in self._records.values(): for approval in record.approval_requests: if approval.approval_id == approval_id: return approval raise KeyError(approval_id) def get_public_approval(self, approval_id: str): approval = self.get_approval(approval_id) self.get_public_session_record(approval.session_id) return approval def upsert_view_context( self, context_id: str, *, route: str, page_type: str, title: str | None = None, summary: str | None = None, selection: dict | None = None, entities: list[dict] | None = None, metadata: dict | None = None, ): existing = self._view_contexts.get(context_id) now = datetime.now(UTC) record = TerraFinHostedViewContextRecord( context_id=context_id, created_at=existing.created_at if existing is not None else now, updated_at=now, route=route, page_type=page_type, title=title, summary=summary, selection=dict(selection or {}), entities=[dict(entity) for entity in (entities or [])], metadata=dict(metadata or {}), ) self._view_contexts[context_id] = record return record def get_view_context(self, context_id: str): return self._view_contexts[context_id] def approve_approval(self, approval_id: str, *, note: str | None = None): for record in self._records.values(): for idx, approval in enumerate(record.approval_requests): if approval.approval_id != approval_id: continue updated = replace( approval, status="approved", updated_at=datetime.now(UTC), resolved_at=datetime.now(UTC), decision_note=note, ) record.approval_requests[idx] = updated return updated raise KeyError(approval_id) def approve_public_approval(self, approval_id: str, *, note: str | None = None): approval = self.get_public_approval(approval_id) return self.approve_approval(approval.approval_id, note=note) def deny_approval(self, approval_id: str, *, note: str | None = None): for record in self._records.values(): for idx, approval in enumerate(record.approval_requests): if approval.approval_id != approval_id: continue updated = replace( approval, status="denied", updated_at=datetime.now(UTC), resolved_at=datetime.now(UTC), decision_note=note, ) record.approval_requests[idx] = updated return updated raise KeyError(approval_id) def deny_public_approval(self, approval_id: str, *, note: str | None = None): approval = self.get_public_approval(approval_id) return self.deny_approval(approval.approval_id, note=note) def seed_task( self, session_id: str, *, capability_name: str = "market_snapshot", description: str = "Fetch market snapshot", status: str = "running", ): record = self._records[session_id] task = record.context.task_registry.create( capability_name, description=description, session_id=session_id, input_payload={"name": "AAPL"}, ) if status == "running": task = record.context.task_registry.mark_running(task.task_id, progress={"stage": "queued"}) elif status == "completed": record.context.task_registry.mark_running(task.task_id, progress={"stage": "queued"}) task = record.context.task_registry.complete( task.task_id, result={"ticker": "AAPL"}, progress={"stage": "done"}, ) self._task_index[task.task_id] = session_id return task def seed_approval( self, session_id: str, *, capability_name: str = "open_chart", tool_name: str = "open_chart", ): record = self._records[session_id] approval = TerraFinHostedApprovalRequest( approval_id="approval:test", created_at=datetime.now(UTC), updated_at=datetime.now(UTC), session_id=session_id, agent_name=self._definition.name, action="invoke", capability_name=capability_name, tool_name=tool_name, side_effecting=True, status="pending", reason="Human approval required before opening a chart.", fingerprint="fingerprint:test", input_payload={"data_or_names": ["AAPL"]}, ) record.approval_requests.append(approval) return approval class _FakeHostedToolAdapter: def __init__(self, tools: tuple[TerraFinToolDefinition, ...]) -> None: self._tools = tools def list_tools_for_agent(self, agent_name: str): _ = agent_name return self._tools def list_tools_for_session(self, session_id: str): _ = session_id return self._tools class _FakeHostedLoop: def __init__(self, *, runtime_configured: bool = True, runtime_setup_message: str | None = None) -> None: self.definition = TerraFinAgentDefinition( name=DEFAULT_HOSTED_AGENT_NAME, description="General market agent.", allowed_capabilities=("market_snapshot", "open_chart"), chart_access=True, allow_background_tasks=True, ) self.internal_definition = TerraFinAgentDefinition( name="warren-buffett", description="Internal guru role.", allowed_capabilities=("market_snapshot",), chart_access=False, allow_background_tasks=False, metadata={"visibility": "internal", "role": "guru"}, ) self.tools = ( TerraFinToolDefinition( name="market_snapshot", capability_name="market_snapshot", description="Fetch a compact market snapshot for a single asset.", input_schema={ "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], "additionalProperties": False, }, execution_mode="invoke", side_effecting=False, ), ) self.runtime = _FakeHostedRuntime(self.definition, self.internal_definition) self.tool_adapter = _FakeHostedToolAdapter(self.tools) self._conversations: dict[str, TerraFinHostedConversation] = {} runtime_model = { "modelRef": "openai/gpt-4.1-mini", "providerId": "openai", "providerLabel": "OpenAI", "modelId": "gpt-4.1-mini", "metadata": {}, } self.model_client = SimpleNamespace( describe_runtime_model=lambda session=None: runtime_model, describe_runtime_status=lambda session=None: { "runtimeModel": runtime_model, "configured": runtime_configured, "message": runtime_setup_message, }, ) def create_session( self, agent_name: str, *, session_id: str | None = None, metadata: dict | None = None, system_prompt: str | None = None, allow_internal: bool = False, ) -> TerraFinHostedConversation: if agent_name == self.internal_definition.name and not allow_internal: raise agent_routes.TerraFinAgentPolicyError("Internal-only agent.") record = self.runtime.create_record( session_id or "hosted:test-session", metadata=metadata, agent_name=agent_name, ) conversation = TerraFinHostedConversation( session_id=record.session_id, agent_name=agent_name, metadata=dict(metadata or {}), messages=[ TerraFinConversationMessage( role="system", content=system_prompt or "You are a hosted TerraFin agent.", ) ], ) self._conversations[conversation.session_id] = conversation record.conversation = conversation return conversation def get_conversation(self, session_id: str) -> TerraFinHostedConversation: return self._conversations[session_id] def submit_user_message(self, session_id: str, content: str) -> TerraFinHostedRunResult: conversation = self._conversations[session_id] user_message = TerraFinConversationMessage(role="user", content=content) assistant_message = TerraFinConversationMessage(role="assistant", content="AAPL") conversation.messages.extend([user_message, assistant_message]) tool_result = TerraFinToolInvocationResult( tool_name="market_snapshot", capability_name="market_snapshot", session_id=session_id, execution_mode="invoke", payload={"ticker": "AAPL"}, task=None, ) return TerraFinHostedRunResult( session_id=session_id, agent_name=conversation.agent_name, final_message=assistant_message, messages_added=(user_message, assistant_message), tool_results=(tool_result,), steps=2, ) def _configure_agent_fakes(monkeypatch) -> None: monkeypatch.setattr(agent_service, "get_data_factory", lambda: _FakeDataFactory()) monkeypatch.setattr(stock_payloads, "get_data_factory", lambda: _FakeDataFactory()) monkeypatch.setattr(agent_service, "get_watchlist_service", lambda: _FakeWatchlistService()) monkeypatch.setattr( stock_payloads, "get_ticker_info", lambda ticker: { "shortName": f"{ticker} Inc.", "sector": "Technology", "industry": "Software", "currentPrice": 150.0, "previousClose": 147.0, "exchange": "NASDAQ", }, ) monkeypatch.setattr( stock_payloads, "get_ticker_earnings", lambda ticker: [ { "date": "2025-12-31", "epsEstimate": "2.10", "epsReported": "2.25", "surprise": "0.15", "surprisePercent": "7.14", } ], ) def _client(monkeypatch, *, hosted_loop=None) -> TestClient: _configure_agent_fakes(monkeypatch) if hosted_loop is not None: monkeypatch.setattr(agent_routes, "get_hosted_agent_loop", lambda: hosted_loop) reset_watchlist_service() return TestClient(create_app()) def test_agent_market_data_contract(monkeypatch) -> None: client = _client(monkeypatch) resp = client.get("/agent/api/market-data?ticker=TEST&depth=auto&view=monthly") assert resp.status_code == 200 payload = resp.json() assert set(payload) == {"ticker", "seriesType", "count", "data", "processing"} assert payload["ticker"] == "TEST" assert payload["processing"]["requestedDepth"] == "auto" assert payload["processing"]["resolvedDepth"] == "recent" assert payload["processing"]["view"] == "monthly" def test_agent_indicators_contract(monkeypatch) -> None: client = _client(monkeypatch) resp = client.get("/agent/api/indicators?ticker=TEST&indicators=rsi,macd,bb,sma_20,realized_vol,range_vol") assert resp.status_code == 200 payload = resp.json() assert set(payload) == {"ticker", "indicators", "unknown", "processing"} assert set(payload["indicators"]) == {"rsi", "macd", "bb", "sma_20", "realized_vol", "range_vol"} assert payload["unknown"] == [] assert payload["processing"]["resolvedDepth"] == "recent" def test_agent_market_snapshot_contract(monkeypatch) -> None: client = _client(monkeypatch) resp = client.get("/agent/api/market-snapshot?ticker=TEST&depth=full") assert resp.status_code == 200 payload = resp.json() assert set(payload) == {"ticker", "price_action", "indicators", "asof", "processing"} assert payload["processing"]["resolvedDepth"] == "full" # `asof` mirrors processing.loadedEnd so the freshness gate can read # the last bar's date without digging through processing metadata. assert payload["asof"] == payload["processing"]["loadedEnd"] assert payload["asof"] is not None def test_agent_market_snapshot_force_refresh_threads_through_route(monkeypatch) -> None: """`force_refresh=true` on the query string must reach the service — that's the lever the worker uses for time-sensitive snapshots when the 24h `yfinance.full` cache TTL may be hiding a freshly-closed bar.""" _configure_agent_fakes(monkeypatch) seen: dict[str, object] = {} original = agent_service.TerraFinAgentService.market_snapshot def _spy(self, name, *, depth="auto", view="daily", force_refresh=False): seen["force_refresh"] = force_refresh return original(self, name, depth=depth, view=view, force_refresh=force_refresh) monkeypatch.setattr(agent_service.TerraFinAgentService, "market_snapshot", _spy) reset_watchlist_service() client = TestClient(create_app()) resp_default = client.get("/agent/api/market-snapshot?ticker=TEST") assert resp_default.status_code == 200 assert seen["force_refresh"] is False resp_force = client.get("/agent/api/market-snapshot?ticker=TEST&force_refresh=true") assert resp_force.status_code == 200 assert seen["force_refresh"] is True def test_agent_resolve_company_earnings_and_financials(monkeypatch) -> None: client = _client(monkeypatch) resolve_resp = client.get("/agent/api/resolve?q=AAPL") company_resp = client.get("/agent/api/company?ticker=AAPL") earnings_resp = client.get("/agent/api/earnings?ticker=AAPL") financials_resp = client.get("/agent/api/financials?ticker=AAPL&statement=income&period=annual") assert resolve_resp.status_code == 200 assert resolve_resp.json()["path"] == "/stock/AAPL" assert company_resp.status_code == 200 assert company_resp.json()["ticker"] == "AAPL" assert "processing" in company_resp.json() assert earnings_resp.status_code == 200 assert earnings_resp.json()["earnings"][0]["epsReported"] == "2.25" assert financials_resp.status_code == 200 assert financials_resp.json()["statement"] == "income" assert len(financials_resp.json()["rows"]) == 2 def test_agent_portfolio_economic_macro_and_calendar(monkeypatch) -> None: client = _client(monkeypatch) portfolio_resp = client.get("/agent/api/portfolio?guru=Test%20Guru") economic_resp = client.get("/agent/api/economic?indicators=UNRATE") macro_resp = client.get("/agent/api/macro-focus?name=S%26P%20500&view=weekly") china_macro_resp = client.get("/agent/api/macro-focus?name=Shanghai%20Composite&view=weekly") calendar_resp = client.get("/agent/api/calendar?year=2026&month=4&categories=macro") assert portfolio_resp.status_code == 200 assert portfolio_resp.json()["count"] == 2 assert economic_resp.status_code == 200 assert "UNRATE" in economic_resp.json()["indicators"] assert "processing" in economic_resp.json() assert macro_resp.status_code == 200 assert macro_resp.json()["info"]["type"] == "index" assert macro_resp.json()["info"]["description"] == "Benchmark U.S. large-cap equity index." assert macro_resp.json()["processing"]["view"] == "weekly" assert china_macro_resp.status_code == 200 assert china_macro_resp.json()["name"] == "Shanghai Composite" assert china_macro_resp.json()["info"]["type"] == "index" assert china_macro_resp.json()["info"]["description"] == ( "Broad mainland China equity benchmark tracking the Shanghai market." ) assert calendar_resp.status_code == 200 assert calendar_resp.json()["count"] == 1 assert calendar_resp.json()["processing"]["resolvedDepth"] == "full" def test_agent_openapi_includes_new_routes(monkeypatch) -> None: client = _client(monkeypatch, hosted_loop=_FakeHostedLoop()) resp = client.get("/openapi.json") assert resp.status_code == 200 paths = resp.json()["paths"] assert "/agent/api/resolve" in paths assert "/agent/api/company" in paths assert "/agent/api/earnings" in paths assert "/agent/api/financials" in paths assert "/agent/api/macro-focus" in paths assert "/agent/api/calendar" in paths assert "/agent/api/runtime/agents" in paths assert "/agent/api/runtime/sessions" in paths assert "/agent/api/runtime/sessions/{session_id}" in paths assert "/agent/api/runtime/view-contexts/{context_id}" in paths assert "/agent/api/runtime/sessions/{session_id}/messages" in paths assert "/agent/api/runtime/sessions/{session_id}/tasks" in paths assert "/agent/api/runtime/sessions/{session_id}/approvals" in paths assert "/agent/api/runtime/tasks/{task_id}" in paths assert "/agent/api/runtime/approvals/{approval_id}" in paths assert "/agent/api/runtime/approvals/{approval_id}/approve" in paths assert "/agent/api/runtime/approvals/{approval_id}/deny" in paths def test_agent_portfolio_returns_503_when_sec_edgar_is_not_configured(monkeypatch) -> None: _configure_agent_fakes(monkeypatch) def _raise_config(self, guru): raise SecEdgarConfigurationError( "SEC EDGAR access is unavailable until `TERRAFIN_SEC_USER_AGENT` is configured." ) monkeypatch.setattr(_FakeDataFactory, "get_portfolio_data", _raise_config) reset_watchlist_service() client = TestClient(create_app()) response = client.get("/agent/api/portfolio?guru=Warren%20Buffett") assert response.status_code == 503 payload = response.json() assert payload["error"]["code"] == "sec_edgar_not_configured" assert payload["error"]["details"]["feature"] == "agent_portfolio" def test_hosted_agent_runtime_routes(monkeypatch) -> None: loop = _FakeHostedLoop() client = _client(monkeypatch, hosted_loop=loop) catalog = client.get("/agent/api/runtime/agents") assert catalog.status_code == 200 catalog_payload = catalog.json() assert catalog_payload["agents"][0]["name"] == DEFAULT_HOSTED_AGENT_NAME assert catalog_payload["agents"][0]["tools"][0]["name"] == "market_snapshot" assert catalog_payload["agents"][0]["runtimeModel"]["modelRef"] == "openai/gpt-4.1-mini" assert catalog_payload["agents"][0]["runtimeConfigured"] is True assert catalog_payload["agents"][0]["runtimeSetupMessage"] is None create_resp = client.post( "/agent/api/runtime/sessions", json={ "agentName": DEFAULT_HOSTED_AGENT_NAME, "sessionId": "hosted:http-test", "metadata": {"thread": "demo"}, }, ) assert create_resp.status_code == 200 created = create_resp.json() assert created["sessionId"] == "hosted:http-test" assert created["agentName"] == DEFAULT_HOSTED_AGENT_NAME assert created["metadata"]["thread"] == "demo" assert created["runtimeModel"]["providerId"] == "openai" assert created["messages"][0]["role"] == "system" session_resp = client.get("/agent/api/runtime/sessions/hosted:http-test") assert session_resp.status_code == 200 assert session_resp.json()["sessionId"] == "hosted:http-test" assert session_resp.json()["runtimeModel"]["modelId"] == "gpt-4.1-mini" sessions_resp = client.get("/agent/api/runtime/sessions") assert sessions_resp.status_code == 200 sessions_payload = sessions_resp.json() assert sessions_payload["sessions"][0]["sessionId"] == "hosted:http-test" assert sessions_payload["sessions"][0]["messageCount"] == 0 view_context_resp = client.put( "/agent/api/runtime/view-contexts/view:buffett", json={ "route": "/market-insights", "pageType": "market-insights", "title": "Warren Buffett Portfolio View", "selection": {"selectedGuru": "Warren Buffett"}, "entities": [{"kind": "portfolio", "id": "Warren Buffett"}], }, ) assert view_context_resp.status_code == 200 assert view_context_resp.json()["contextId"] == "view:buffett" assert view_context_resp.json()["selection"]["selectedGuru"] == "Warren Buffett" get_view_context_resp = client.get("/agent/api/runtime/view-contexts/view:buffett") assert get_view_context_resp.status_code == 200 assert get_view_context_resp.json()["pageType"] == "market-insights" run_resp = client.post( "/agent/api/runtime/sessions/hosted:http-test/messages", json={"content": "Give me AAPL."}, ) assert run_resp.status_code == 200 run_payload = run_resp.json() assert run_payload["steps"] == 2 assert run_payload["finalMessage"]["content"] == "AAPL" assert run_payload["toolResults"][0]["toolName"] == "market_snapshot" assert run_payload["session"]["messages"][-1]["role"] == "assistant" task = loop.runtime.seed_task("hosted:http-test") task_list_resp = client.get("/agent/api/runtime/sessions/hosted:http-test/tasks") assert task_list_resp.status_code == 200 assert task_list_resp.json()["tasks"][0]["taskId"] == task.task_id task_resp = client.get(f"/agent/api/runtime/tasks/{task.task_id}") assert task_resp.status_code == 200 assert task_resp.json()["status"] == "running" cancel_resp = client.post(f"/agent/api/runtime/tasks/{task.task_id}/cancel") assert cancel_resp.status_code == 200 assert cancel_resp.json()["status"] == "cancelled" approval = loop.runtime.seed_approval("hosted:http-test") approval_list_resp = client.get("/agent/api/runtime/sessions/hosted:http-test/approvals") assert approval_list_resp.status_code == 200 assert approval_list_resp.json()["approvals"][0]["approvalId"] == approval.approval_id approval_resp = client.get(f"/agent/api/runtime/approvals/{approval.approval_id}") assert approval_resp.status_code == 200 assert approval_resp.json()["status"] == "pending" approve_resp = client.post( f"/agent/api/runtime/approvals/{approval.approval_id}/approve", json={"note": "Ship it"}, ) assert approve_resp.status_code == 200 assert approve_resp.json()["status"] == "approved" deny_resp = client.post( f"/agent/api/runtime/approvals/{approval.approval_id}/deny", json={"note": "Actually no"}, ) assert deny_resp.status_code == 200 assert deny_resp.json()["status"] == "denied" loop.runtime.seed_task("hosted:http-test") delete_conflict_resp = client.delete("/agent/api/runtime/sessions/hosted:http-test") assert delete_conflict_resp.status_code == 409 completed_task = loop.runtime.seed_task("hosted:http-test", status="completed") assert completed_task.status == "completed" for task_record in loop.runtime.get_session_record("hosted:http-test").context.task_registry.list_for_session( "hosted:http-test" ): if task_record.status not in {"completed", "failed", "cancelled"}: loop.runtime.get_session_record("hosted:http-test").context.task_registry.cancel( task_record.task_id, reason="Settled for delete test", ) delete_resp = client.delete("/agent/api/runtime/sessions/hosted:http-test") assert delete_resp.status_code == 200 assert delete_resp.json()["sessionId"] == "hosted:http-test" deleted_session_resp = client.get("/agent/api/runtime/sessions/hosted:http-test") assert deleted_session_resp.status_code == 404 def test_hosted_agent_runtime_rejects_internal_agent_creation(monkeypatch) -> None: loop = _FakeHostedLoop() client = _client(monkeypatch, hosted_loop=loop) create_resp = client.post( "/agent/api/runtime/sessions", json={ "agentName": "warren-buffett", "sessionId": "hosted:hidden-guru", }, ) assert create_resp.status_code == 404 assert create_resp.json()["error"]["code"] == "hosted_agent_not_found" def test_hosted_agent_runtime_hides_internal_sessions_from_direct_routes(monkeypatch) -> None: loop = _FakeHostedLoop() hidden = loop.create_session( "warren-buffett", session_id="hosted:hidden-guru", metadata={"hiddenInternal": True}, allow_internal=True, ) hidden_task = loop.runtime.seed_task(hidden.session_id) hidden_approval = loop.runtime.seed_approval(hidden.session_id) client = _client(monkeypatch, hosted_loop=loop) session_resp = client.get(f"/agent/api/runtime/sessions/{hidden.session_id}") task_list_resp = client.get(f"/agent/api/runtime/sessions/{hidden.session_id}/tasks") approval_list_resp = client.get(f"/agent/api/runtime/sessions/{hidden.session_id}/approvals") task_resp = client.get(f"/agent/api/runtime/tasks/{hidden_task.task_id}") approval_resp = client.get(f"/agent/api/runtime/approvals/{hidden_approval.approval_id}") assert session_resp.status_code == 404 assert task_list_resp.status_code == 404 assert approval_list_resp.status_code == 404 assert task_resp.status_code == 404 assert approval_resp.status_code == 404 def test_hosted_agent_runtime_routes_report_unconfigured_runtime(monkeypatch) -> None: loop = _FakeHostedLoop( runtime_configured=False, runtime_setup_message="OPENAI_API_KEY is required for the hosted OpenAI agent runtime.", ) client = _client(monkeypatch, hosted_loop=loop) catalog = client.get("/agent/api/runtime/agents") assert catalog.status_code == 200 payload = catalog.json() assert payload["agents"][0]["runtimeConfigured"] is False assert payload["agents"][0]["runtimeSetupMessage"] == ( "OPENAI_API_KEY is required for the hosted OpenAI agent runtime." ) create_resp = client.post( "/agent/api/runtime/sessions", json={ "agentName": DEFAULT_HOSTED_AGENT_NAME, "sessionId": "hosted:http-test", }, ) assert create_resp.status_code == 503 assert create_resp.json()["error"]["code"] == "hosted_agent_not_configured" def test_agent_page_route_is_not_registered(monkeypatch) -> None: client = _client(monkeypatch, hosted_loop=_FakeHostedLoop()) response = client.get("/agent") assert response.status_code == 404