"""HTTP client for FastAPI backend.""" import os import requests from typing import Any BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8080") def _url(path: str) -> str: return f"{BACKEND_URL}{path}" def get_health() -> dict: try: r = requests.get(_url("/api/v1/health"), timeout=5) r.raise_for_status() return r.json() except Exception as exc: return {"status": "unreachable", "error": str(exc)} def get_domains() -> list[dict]: try: r = requests.get(_url("/api/v1/domains"), timeout=10) r.raise_for_status() return r.json() except Exception: return [] def get_subdomains(domain_names: list[str]) -> list[dict]: try: r = requests.get( _url("/api/v1/subdomains"), params=[("domain_names", n) for n in domain_names], timeout=10, ) r.raise_for_status() return r.json() except Exception: return [] def get_subdomain_capabilities(subdomain_ids: list[str]) -> list[dict]: try: r = requests.get( _url("/api/v1/subdomain-capabilities"), params=[("subdomain_ids", sid) for sid in subdomain_ids], timeout=10, ) r.raise_for_status() return r.json() except Exception: return [] def analyze(payload: dict, timeout: int = 600) -> dict: r = requests.post(_url("/api/v1/analyze"), json=payload, timeout=timeout) r.raise_for_status() return r.json() def get_training_metrics() -> list[dict]: try: r = requests.get(_url("/api/v1/training/metrics"), timeout=10) r.raise_for_status() return r.json() except Exception: return [] def get_training_coverage() -> list[dict]: try: r = requests.get(_url("/api/v1/training/coverage"), timeout=10) r.raise_for_status() return r.json() except Exception: return [] def trigger_training(episodes_per_domain: int = 50, domain: str | None = None) -> dict: payload: dict = {"episodes_per_domain": episodes_per_domain} if domain: payload["domain"] = domain try: r = requests.post(_url("/api/v1/training/run"), json=payload, timeout=15) r.raise_for_status() return r.json() except Exception as exc: return {"status": "error", "message": str(exc)} # --------------------------------------------------------------------------- # Chat # --------------------------------------------------------------------------- def chat( message: str, history: list[dict], domain_filter: str | None = None, session_id: str | None = None, ) -> dict: """Non-streaming chat — returns {reply, sources, suggested_action, enrich_triggered}.""" try: payload: dict = {"message": message, "history": history} if domain_filter: payload["domain_filter"] = domain_filter if session_id: payload["session_id"] = session_id r = requests.post(_url("/api/v1/chat"), json=payload, timeout=120) r.raise_for_status() return r.json() except Exception as exc: return {"reply": f"Error: {exc}", "sources": [], "suggested_action": None, "enrich_triggered": False, "enrich_domains": []} def stream_chat(message: str, history: list[dict], session_id: str = ""): """ Generator: streams SSE tokens from /api/v1/chat/stream. Yields text chunks. Final SSE event stores sources + enrichment info in st.session_state['_last_chat_sources'] and ['_last_enrich_info']. """ import json as _json params = {"message": message, "history": _json.dumps(history), "session_id": session_id} try: with requests.get( _url("/api/v1/chat/stream"), params=params, stream=True, timeout=600 ) as r: r.raise_for_status() for raw in r.iter_lines(): if not raw: continue line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace") if not line.startswith("data: "): continue try: event = _json.loads(line[6:]) except Exception: continue if event.get("text"): yield event["text"] elif event.get("done"): try: import streamlit as _st _st.session_state["_last_chat_sources"] = event.get("sources", []) _st.session_state["_last_enrich_info"] = { "triggered": event.get("enrich_triggered", False), "domains": event.get("enrich_domains", []), "run_id": event.get("enrich_run_id"), } except Exception: pass except Exception as exc: yield f"\n\n*Stream error: {exc}*" # --------------------------------------------------------------------------- # Chat session management # --------------------------------------------------------------------------- def create_or_touch_session(session_id: str, title: str = "") -> dict: """POST /api/v1/chat/sessions — create or touch a session.""" try: r = requests.post( _url("/api/v1/chat/sessions"), params={"session_id": session_id, "title": title}, timeout=10, ) r.raise_for_status() return r.json() except Exception as exc: return {"session_id": session_id, "error": str(exc)} def get_recent_sessions() -> list[dict]: """GET /api/v1/chat/sessions — list recent sessions.""" try: r = requests.get(_url("/api/v1/chat/sessions"), timeout=10) r.raise_for_status() return r.json() except Exception: return [] def get_session_messages(session_id: str) -> list[dict]: """GET /api/v1/chat/sessions/{id}/messages — load full session history.""" try: r = requests.get(_url(f"/api/v1/chat/sessions/{session_id}/messages"), timeout=10) r.raise_for_status() return r.json().get("messages", []) except Exception: return [] def delete_session(session_id: str) -> bool: """DELETE /api/v1/chat/sessions/{id}.""" try: r = requests.delete(_url(f"/api/v1/chat/sessions/{session_id}"), timeout=10) return r.status_code == 200 except Exception: return False # --------------------------------------------------------------------------- # Graph network # --------------------------------------------------------------------------- def get_domain_detail(domain_name: str) -> dict: """Returns {domain, standard, trend, subdomain_groups} for the Graph Explorer detail panel.""" try: r = requests.get( _url("/api/v1/graph/domain-detail"), params={"domain_name": domain_name}, timeout=15, ) r.raise_for_status() return r.json() except Exception: return {"domain": {}, "standard": None, "trend": None, "subdomain_groups": []} def get_network_graph() -> dict: """Returns {nodes: [...], edges: [...]} for the knowledge graph explorer.""" try: r = requests.get(_url("/api/v1/graph/network"), timeout=15) r.raise_for_status() return r.json() except Exception: return {"nodes": [], "edges": []} # --------------------------------------------------------------------------- # Integrations # --------------------------------------------------------------------------- def export_to_jira(payload: dict) -> dict: """POST to /api/v1/integrations/jira/export — returns {created_epics, created_stories, errors}.""" r = requests.post(_url("/api/v1/integrations/jira/export"), json=payload, timeout=120) r.raise_for_status() return r.json() def connect_itsm(tool: str, instance_url: str, credentials: dict) -> dict: """POST to /api/v1/integrations/itsm/connect — mock integration test.""" payload = {"tool": tool, "instance_url": instance_url, "credentials": credentials} r = requests.post(_url("/api/v1/integrations/itsm/connect"), json=payload, timeout=15) r.raise_for_status() return r.json() def ingest_erp_csv(file_bytes: bytes, filename: str) -> dict: """POST multipart CSV to /api/v1/integrations/erp/ingest.""" files = {"file": (filename, file_bytes, "text/csv")} r = requests.post(_url("/api/v1/integrations/erp/ingest"), files=files, timeout=60) r.raise_for_status() return r.json() def get_archimate() -> dict: """GET /api/v1/integrations/archimate — returns {business, application, technology}.""" try: r = requests.get(_url("/api/v1/integrations/archimate"), timeout=20) r.raise_for_status() return r.json() except Exception: return {"business": [], "application": [], "technology": []}