""" EA Advisor β€” continuous streaming chat with Neo4j session persistence and automatic DRL graph enrichment. """ import re import uuid import streamlit as st from frontend.utils.api_client import ( stream_chat, chat as api_chat, create_or_touch_session, get_recent_sessions, get_session_messages, delete_session, ) _WELCOME = ( "Hello! I'm your **Enterprise Architecture Advisor**, powered by **AMD MI300X** and " "**Qwen-72B** with live access to a knowledge graph of **1,416 capabilities across 44 domains**.\n\n" "I can help you:\n" "- Explore governance standards and compliance requirements\n" "- Identify the right capabilities for your organisation\n" "- Understand cross-domain architecture patterns\n" "- Generate a strategic transformation roadmap\n\n" "What would you like to explore?" ) _ROADMAP_KEYWORDS = { "roadmap", "strategy", "strategic", "plan", "planning", "implement", "transform", "programme", "program", "initiative", } # --------------------------------------------------------------------------- # Think-block helpers # --------------------------------------------------------------------------- def _split_think(text: str) -> tuple[str, str]: """ Split a model response into (think_content, response_text). Returns: ("", text) β€” no block present (partial, "") β€” started but not yet seen (still streaming) (think, response) β€” complete block; response is everything outside the tags """ if not text: return "", "" start = text.find("") if start == -1: return "", text end = text.find("", start) if end == -1: return text[start + 7:].strip(), "" think_content = text[start + 7:end].strip() response = (text[:start] + text[end + 9:]).strip() return think_content, response def _think_html(think_text: str) -> str: """Render think content as a collapsible HTML details block.""" return ( '


def _think_html(think_text: str) -> str:
    """Render think content as a collapsible HTML details block."""
    return (
        '<details>'
        '<summary>'
        '💭 Reasoning (click to expand / collapse)'
        '</summary>'
        f'\n\n{think_text}\n\n'
        '</details>'
    )


def _render_message_content(content: str):
    """Render a stored message, collapsing any <think> block."""
    think, response = _split_think(content)
    if think:
        st.markdown(_think_html(think), unsafe_allow_html=True)
    st.markdown(response if response else (content if not think else ""))


def _stream_assistant_response(prompt: str, history_for_api: list, session_id: str) -> str:
    """
    Stream the response into two Streamlit placeholders:
        - think_slot → collapsed <details> block while the model reasons
        - reply_slot → the actual answer, streamed token-by-token

    Returns the clean response text (no think tags) for storing in history.
    """
    think_slot = st.empty()
    reply_slot = st.empty()
    full = ""
    reply_text = ""

    try:
        for token in stream_chat(prompt, history_for_api, session_id):
            full += token
            think, response = _split_think(full)

            if think and response:
                # Think block fully closed — show collapsed block + stream response
                think_slot.markdown(_think_html(think), unsafe_allow_html=True)
                reply_slot.markdown(response + "▌")
                reply_text = response
            elif think and not response:
                # Still inside think block — show live word count
                word_count = len(think.split())
                think_slot.markdown(
                    f'<div>💭 Reasoning… ({word_count} words)</div>',
                    unsafe_allow_html=True,
                )
            else:
                # No think block at all — plain response
                reply_slot.markdown(full + "▌")
                reply_text = full

        # Final render — remove cursor, ensure think block is collapsed
        think, response = _split_think(full)
        if think:
            think_slot.markdown(_think_html(think), unsafe_allow_html=True)
        final_reply = response if response else (full if not think else "")
        reply_slot.markdown(final_reply)
        reply_text = final_reply

    except Exception as exc:
        st.warning(f"Streaming unavailable — switching to standard mode: {exc}")
        try:
            resp = api_chat(prompt, history_for_api, session_id=session_id)
            raw = resp.get("reply", "")
            think, response = _split_think(raw)
            if think:
                think_slot.markdown(_think_html(think), unsafe_allow_html=True)
            reply_text = response if response else raw
            reply_slot.markdown(reply_text)
            st.session_state["_last_chat_sources"] = resp.get("sources", [])
            st.session_state["_last_enrich_info"] = {
                "triggered": resp.get("enrich_triggered", False),
                "domains": resp.get("enrich_domains", []),
            }
        except Exception as exc2:
            reply_text = f"Error contacting EA Advisor: {exc2}"
            reply_slot.error(reply_text)

    return reply_text
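
# Hedged sketch of the payload the non-streaming fallback above assumes from
# api_chat(); only the keys actually read in this module are listed, and any
# extra fields the backend returns are simply ignored. Example values are
# illustrative, not real data.
#
#   {
#       "reply": "...model answer, possibly wrapped in <think> tags...",
#       "sources": [{"name": "...", "domain": "...", "standard": "..."}],
#       "enrich_triggered": False,
#       "enrich_domains": [],
#   }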


# ---------------------------------------------------------------------------
# Session management helpers
# ---------------------------------------------------------------------------


def _new_session_id() -> str:
    return uuid.uuid4().hex


def _ensure_session() -> str:
    if "chat_session_id" not in st.session_state:
        sid = _new_session_id()
        st.session_state["chat_session_id"] = sid
        st.session_state["chat_history"] = [{"role": "assistant", "content": _WELCOME}]
        create_or_touch_session(sid, "New Conversation")
    return st.session_state["chat_session_id"]


def _load_session(session_id: str, title: str = ""):
    messages = get_session_messages(session_id)
    if messages:
        st.session_state["chat_session_id"] = session_id
        st.session_state["chat_session_title"] = title
        history = []
        for m in messages:
            history.append({
                "role": m["role"],
                "content": m["content"],
                "sources": m.get("sources", []),
            })
        st.session_state["chat_history"] = history
    else:
        st.session_state["chat_session_id"] = session_id
        st.session_state["chat_session_title"] = title
        st.session_state["chat_history"] = [{"role": "assistant", "content": _WELCOME}]


def _start_new_session():
    sid = _new_session_id()
    st.session_state["chat_session_id"] = sid
    st.session_state["chat_session_title"] = "New Conversation"
    st.session_state["chat_history"] = [{"role": "assistant", "content": _WELCOME}]
    st.session_state["_last_chat_sources"] = []
    st.session_state["_last_enrich_info"] = {}
    create_or_touch_session(sid, "New Conversation")
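
# Hedged sketch of the record shapes the helpers above and the panel below
# assume from the api_client; only the keys actually accessed are shown, and
# the field formats (e.g. an ISO-8601 last_active string) are inferred from
# how the values are sliced and displayed, not from the backend itself.
#
#   get_recent_sessions()          -> [{"session_id": "...", "title": "...",
#                                       "message_count": 7,
#                                       "last_active": "2024-01-01T12:00:00"}]
#   get_session_messages(session)  -> [{"role": "user" | "assistant",
#                                       "content": "...", "sources": [...]}]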
s.get("title", "Untitled")[:45] n_msgs = s.get("message_count", 0) last = (s.get("last_active", "") or "")[:16].replace("T", " ") is_current = sid == current_sid row_col, del_col = st.columns([5, 1]) with row_col: label = f"{'β–Ά ' if is_current else ''}{title}" if st.button( label, key=f"sess_{sid}", disabled=is_current, width='stretch', help=f"{n_msgs} messages Β· {last}", ): _load_session(sid, s.get("title", "")) st.session_state.pop("_cached_sessions", None) st.rerun() with del_col: if st.button("πŸ—‘", key=f"del_{sid}", help="Delete this conversation"): delete_session(sid) st.session_state.pop("_cached_sessions", None) if is_current: _start_new_session() st.rerun() # --------------------------------------------------------------------------- # Main render # --------------------------------------------------------------------------- def render_chat_tab(): st.subheader("EA Advisor") st.caption( "Conversational AI for enterprise architecture " "β€” AMD MI300X Β· Qwen-72B Β· Knowledge Graph RAG Β· 1,416 capabilities" ) session_id = _ensure_session() if "_last_chat_sources" not in st.session_state: st.session_state["_last_chat_sources"] = [] if "_last_enrich_info" not in st.session_state: st.session_state["_last_enrich_info"] = {} _render_session_panel() enrich_info = st.session_state.get("_last_enrich_info", {}) if enrich_info.get("triggered"): domains_str = ", ".join(enrich_info.get("domains", [])) st.toast(f"DRL enrichment started for: {domains_str}", icon="🧠") st.session_state["_last_enrich_info"] = {} # ── Render persisted messages ───────────────────────────────────────────── for msg in st.session_state.get("chat_history", []): with st.chat_message(msg["role"]): _render_message_content(msg["content"]) srcs = msg.get("sources") or [] if srcs: with st.expander(f"Knowledge Graph Sources ({len(srcs)})", expanded=False): for src in srcs: st.caption( f"β€’ **{src.get('name', '')}** β€” {src.get('domain', '')}" + (f" Β· {src.get('standard', '')}" if src.get("standard") else "") ) # ── Chat input ──────────────────────────────────────────────────────────── if prompt := st.chat_input("Ask about enterprise architecture, governance, or capabilities…"): st.session_state["chat_history"].append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) history_for_api = [ {"role": m["role"], "content": m["content"]} for m in st.session_state["chat_history"][:-1] ] st.session_state["_last_chat_sources"] = [] st.session_state["_last_enrich_info"] = {} with st.chat_message("assistant"): full_response = _stream_assistant_response(prompt, history_for_api, session_id) sources = list(st.session_state.get("_last_chat_sources", [])) # Store the clean response (think block stripped) so history replays cleanly st.session_state["chat_history"].append({ "role": "assistant", "content": full_response, "sources": sources, }) st.session_state.pop("_cached_sessions", None) if any(kw in prompt.lower() for kw in _ROADMAP_KEYWORDS): st.info( "Switch to the **Strategic Roadmap** tab to generate a full, " "Jira-ready roadmap with Epics, Features, User Stories, and Tasks.", icon="πŸ—ΊοΈ", ) st.rerun()