EA_strat_optimizer / frontend /utils /api_client.py
TheQuantEd's picture
deploy: AMD EA Strategy Optimizer — Neo4j + FastAPI + Streamlit
6252f54
"""HTTP client for FastAPI backend."""
import os
import requests
from typing import Any
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8080")
def _url(path: str) -> str:
return f"{BACKEND_URL}{path}"
def get_health() -> dict:
try:
r = requests.get(_url("/api/v1/health"), timeout=5)
r.raise_for_status()
return r.json()
except Exception as exc:
return {"status": "unreachable", "error": str(exc)}
def get_domains() -> list[dict]:
try:
r = requests.get(_url("/api/v1/domains"), timeout=10)
r.raise_for_status()
return r.json()
except Exception:
return []
def get_subdomains(domain_names: list[str]) -> list[dict]:
try:
r = requests.get(
_url("/api/v1/subdomains"),
params=[("domain_names", n) for n in domain_names],
timeout=10,
)
r.raise_for_status()
return r.json()
except Exception:
return []
def get_subdomain_capabilities(subdomain_ids: list[str]) -> list[dict]:
try:
r = requests.get(
_url("/api/v1/subdomain-capabilities"),
params=[("subdomain_ids", sid) for sid in subdomain_ids],
timeout=10,
)
r.raise_for_status()
return r.json()
except Exception:
return []
def analyze(payload: dict, timeout: int = 600) -> dict:
r = requests.post(_url("/api/v1/analyze"), json=payload, timeout=timeout)
r.raise_for_status()
return r.json()
def get_training_metrics() -> list[dict]:
try:
r = requests.get(_url("/api/v1/training/metrics"), timeout=10)
r.raise_for_status()
return r.json()
except Exception:
return []
def get_training_coverage() -> list[dict]:
try:
r = requests.get(_url("/api/v1/training/coverage"), timeout=10)
r.raise_for_status()
return r.json()
except Exception:
return []
def trigger_training(episodes_per_domain: int = 50, domain: str | None = None) -> dict:
payload: dict = {"episodes_per_domain": episodes_per_domain}
if domain:
payload["domain"] = domain
try:
r = requests.post(_url("/api/v1/training/run"), json=payload, timeout=15)
r.raise_for_status()
return r.json()
except Exception as exc:
return {"status": "error", "message": str(exc)}
# ---------------------------------------------------------------------------
# Chat
# ---------------------------------------------------------------------------
def chat(
message: str,
history: list[dict],
domain_filter: str | None = None,
session_id: str | None = None,
) -> dict:
"""Non-streaming chat — returns {reply, sources, suggested_action, enrich_triggered}."""
try:
payload: dict = {"message": message, "history": history}
if domain_filter:
payload["domain_filter"] = domain_filter
if session_id:
payload["session_id"] = session_id
r = requests.post(_url("/api/v1/chat"), json=payload, timeout=120)
r.raise_for_status()
return r.json()
except Exception as exc:
return {"reply": f"Error: {exc}", "sources": [], "suggested_action": None,
"enrich_triggered": False, "enrich_domains": []}
def stream_chat(message: str, history: list[dict], session_id: str = ""):
"""
Generator: streams SSE tokens from /api/v1/chat/stream.
Yields text chunks. Final SSE event stores sources + enrichment info
in st.session_state['_last_chat_sources'] and ['_last_enrich_info'].
"""
import json as _json
params = {"message": message, "history": _json.dumps(history), "session_id": session_id}
try:
with requests.get(
_url("/api/v1/chat/stream"), params=params, stream=True, timeout=600
) as r:
r.raise_for_status()
for raw in r.iter_lines():
if not raw:
continue
line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
if not line.startswith("data: "):
continue
try:
event = _json.loads(line[6:])
except Exception:
continue
if event.get("text"):
yield event["text"]
elif event.get("done"):
try:
import streamlit as _st
_st.session_state["_last_chat_sources"] = event.get("sources", [])
_st.session_state["_last_enrich_info"] = {
"triggered": event.get("enrich_triggered", False),
"domains": event.get("enrich_domains", []),
"run_id": event.get("enrich_run_id"),
}
except Exception:
pass
except Exception as exc:
yield f"\n\n*Stream error: {exc}*"
# ---------------------------------------------------------------------------
# Chat session management
# ---------------------------------------------------------------------------
def create_or_touch_session(session_id: str, title: str = "") -> dict:
"""POST /api/v1/chat/sessions — create or touch a session."""
try:
r = requests.post(
_url("/api/v1/chat/sessions"),
params={"session_id": session_id, "title": title},
timeout=10,
)
r.raise_for_status()
return r.json()
except Exception as exc:
return {"session_id": session_id, "error": str(exc)}
def get_recent_sessions() -> list[dict]:
"""GET /api/v1/chat/sessions — list recent sessions."""
try:
r = requests.get(_url("/api/v1/chat/sessions"), timeout=10)
r.raise_for_status()
return r.json()
except Exception:
return []
def get_session_messages(session_id: str) -> list[dict]:
"""GET /api/v1/chat/sessions/{id}/messages — load full session history."""
try:
r = requests.get(_url(f"/api/v1/chat/sessions/{session_id}/messages"), timeout=10)
r.raise_for_status()
return r.json().get("messages", [])
except Exception:
return []
def delete_session(session_id: str) -> bool:
"""DELETE /api/v1/chat/sessions/{id}."""
try:
r = requests.delete(_url(f"/api/v1/chat/sessions/{session_id}"), timeout=10)
return r.status_code == 200
except Exception:
return False
# ---------------------------------------------------------------------------
# Graph network
# ---------------------------------------------------------------------------
def get_domain_detail(domain_name: str) -> dict:
"""Returns {domain, standard, trend, subdomain_groups} for the Graph Explorer detail panel."""
try:
r = requests.get(
_url("/api/v1/graph/domain-detail"),
params={"domain_name": domain_name},
timeout=15,
)
r.raise_for_status()
return r.json()
except Exception:
return {"domain": {}, "standard": None, "trend": None, "subdomain_groups": []}
def get_network_graph() -> dict:
"""Returns {nodes: [...], edges: [...]} for the knowledge graph explorer."""
try:
r = requests.get(_url("/api/v1/graph/network"), timeout=15)
r.raise_for_status()
return r.json()
except Exception:
return {"nodes": [], "edges": []}
# ---------------------------------------------------------------------------
# Integrations
# ---------------------------------------------------------------------------
def export_to_jira(payload: dict) -> dict:
"""POST to /api/v1/integrations/jira/export — returns {created_epics, created_stories, errors}."""
r = requests.post(_url("/api/v1/integrations/jira/export"), json=payload, timeout=120)
r.raise_for_status()
return r.json()
def connect_itsm(tool: str, instance_url: str, credentials: dict) -> dict:
"""POST to /api/v1/integrations/itsm/connect — mock integration test."""
payload = {"tool": tool, "instance_url": instance_url, "credentials": credentials}
r = requests.post(_url("/api/v1/integrations/itsm/connect"), json=payload, timeout=15)
r.raise_for_status()
return r.json()
def ingest_erp_csv(file_bytes: bytes, filename: str) -> dict:
"""POST multipart CSV to /api/v1/integrations/erp/ingest."""
files = {"file": (filename, file_bytes, "text/csv")}
r = requests.post(_url("/api/v1/integrations/erp/ingest"), files=files, timeout=60)
r.raise_for_status()
return r.json()
def get_archimate() -> dict:
"""GET /api/v1/integrations/archimate — returns {business, application, technology}."""
try:
r = requests.get(_url("/api/v1/integrations/archimate"), timeout=20)
r.raise_for_status()
return r.json()
except Exception:
return {"business": [], "application": [], "technology": []}