# file: ui/streamlit_app.py import streamlit as st import requests import json from datetime import datetime import pandas as pd import time from collections import defaultdict import os st.set_page_config( page_title="Lucidya MCP Prototype", page_icon="๐ŸŽฏ", layout="wide" ) st.title("๐ŸŽฏ Lucidya Multi-Agent CX Platform") st.caption("Real-time agent orchestration with Ollama streaming and MCP integration") # Configure API base via environment; default to loopback API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000") # Initialize session state if "pipeline_logs" not in st.session_state: st.session_state.pipeline_logs = [] if "current_prospect" not in st.session_state: st.session_state.current_prospect = None if "company_outputs" not in st.session_state: st.session_state.company_outputs = {} if "handoff_packets" not in st.session_state: st.session_state.handoff_packets = {} # Sidebar with st.sidebar: st.header("System Status") # Health check try: resp = requests.get(f"{API_BASE}/health", timeout=8) health = resp.json() if health.get("status") == "healthy": st.success("โœ… System Healthy") with st.expander("System Components"): # Ollama status ollama_status = health.get("ollama", {}) if ollama_status.get("connected"): st.success(f"โœ… Ollama: {ollama_status.get('model', 'Unknown')}") else: st.error("โŒ Ollama: Disconnected") # MCP servers status mcp_status = health.get("mcp", {}) for server, status in mcp_status.items(): if status == "healthy": st.success(f"โœ… MCP {server.title()}: Running") else: st.error(f"โŒ MCP {server.title()}: {status}") # Vector store status if health.get("vector_store"): st.success("โœ… Vector Store: Initialized") else: st.warning("โš ๏ธ Vector Store: Not initialized") else: st.error("โŒ System Unhealthy") except Exception as e: st.error(f"โŒ API Offline at {API_BASE}: {e}") st.divider() # System controls st.header("System Controls") col1, col2 = st.columns(2) with col1: if st.button("๐Ÿ”„ Reset", help="Clear all data and reload"): with st.spinner("Resetting..."): try: result = requests.post(f"{API_BASE}/reset").json() st.success(f"โœ… Reset: {result['companies_loaded']} companies") st.session_state.company_outputs = {} st.rerun() except Exception as e: st.error(f"Reset failed: {e}") with col2: if st.button("๐Ÿ” Check", help="Verify system health"): st.rerun() # Main tabs tab1, tab2, tab3, tab4 = st.tabs(["๐Ÿš€ Pipeline", "๐Ÿ“Š Prospects", "๐Ÿ” Details", "๐Ÿงช Dev Tools"]) # Pipeline Tab with tab1: st.header("Pipeline Execution") st.markdown("Watch the complete agent orchestration workflow with MCP interactions in real-time") # Pipeline controls col1, col2, col3 = st.columns([3, 2, 1]) with col1: company_ids = st.text_input( "Company IDs", placeholder="acme,techcorp,retailplus (or leave empty for all)", help="Comma-separated list of company IDs to process" ) with col2: display_mode = st.selectbox( "Display Mode", ["Complete Workflow", "Summary Only", "Content Only"], help="Choose what information to display" ) with col3: st.write("") # Spacer st.write("") # Spacer if st.button("โ–ถ๏ธ Run Pipeline", type="primary", use_container_width=True): st.session_state.running = True st.session_state.pipeline_logs = [] st.session_state.company_outputs = {} # Pipeline execution display if st.session_state.get("running"): # Create display containers progress_container = st.container() with progress_container: progress_bar = st.progress(0, text="Initializing pipeline...") status_text = st.empty() # Main display area if display_mode == "Complete Workflow": # Create columns for workflow and content col1, col2 = st.columns([3, 2]) with col1: st.subheader("๐Ÿ”„ Agent Workflow & MCP Interactions") workflow_container = st.container() workflow_display = workflow_container.empty() with col2: st.subheader("๐Ÿ“ Generated Content by Company") # Single placeholder updated on each token content_area = st.empty() elif display_mode == "Content Only": st.subheader("๐Ÿ“ Generated Content by Company") content_area = st.empty() else: # Summary Only st.subheader("๐Ÿ“‹ Execution Summary") summary_container = st.empty() # Process the pipeline stream try: # Parse company IDs ids = None if company_ids: ids = [id.strip() for id in company_ids.split(",") if id.strip()] # Start streaming response = requests.post( f"{API_BASE}/run", json={"company_ids": ids}, stream=True, timeout=60 ) # Initialize tracking variables workflow_logs = [] current_agent = None current_company = None agents_completed = set() total_agents = 8 company_outputs = defaultdict(lambda: {"summary": "", "email": "", "status": "processing"}) mcp_interactions = [] # Helper to render the accumulated content once per update def render_content(): if display_mode == "Summary Only": return lines = [] for company in sorted(company_outputs.keys()): outputs = company_outputs[company] lines.append(f"### ๐Ÿข {company}\n") # Summary lines.append("**๐Ÿ“ Summary**") summary_text = outputs.get("final_summary") or outputs.get("summary") or "" lines.append(summary_text if summary_text else "_No summary yet_\n") # Email lines.append("**โœ‰๏ธ Email Draft**") email_val = outputs.get("final_email") or outputs.get("email") or "" if isinstance(email_val, dict): subj = email_val.get("subject", "") body = email_val.get("body", "") lines.append(f"Subject: {subj}\n\n{body}\n") elif email_val: lines.append(f"{email_val}\n") else: lines.append("_No email yet_\n") lines.append("\n---\n") # Overwrite the single placeholder with the assembled markdown content_area.markdown("\n".join(lines)) # Process stream for line in response.iter_lines(): if line: try: event = json.loads(line) # Track current company payload = event.get("payload", {}) if payload.get("company_name"): current_company = payload["company_name"] elif payload.get("company"): current_company = payload["company"] elif payload.get("prospect", {}).get("company", {}).get("name"): current_company = payload["prospect"]["company"]["name"] # Update progress if event.get("agent"): current_agent = event["agent"] if event["type"] == "agent_end": agents_completed.add(current_agent) progress = len(agents_completed) / total_agents progress_bar.progress(progress, text=f"Processing: {current_agent.title()} ({len(agents_completed)}/{total_agents})") # Handle different event types if event["type"] == "agent_start": workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": event["agent"].title(), "๐Ÿ“Œ Action": "โ–ถ๏ธ Started", "๐Ÿข Company": current_company or "All", "๐Ÿ’ฌ Details": event["message"] }) status_text.info(f"๐Ÿ”„ {event['agent'].title()}: {event['message']}") elif event["type"] == "mcp_call": mcp_server = event["payload"].get("mcp_server", "unknown") method = event["payload"].get("method", "unknown") workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": current_agent.title() if current_agent else "System", "๐Ÿ“Œ Action": f"๐Ÿ”Œ MCP Call", "๐Ÿข Company": current_company or "All", "๐Ÿ’ฌ Details": f"โ†’ {mcp_server.upper()}: {method}" }) elif event["type"] == "mcp_response": mcp_server = event["payload"].get("mcp_server", "unknown") workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": current_agent.title() if current_agent else "System", "๐Ÿ“Œ Action": f"๐Ÿ“ฅ MCP Response", "๐Ÿข Company": current_company or "All", "๐Ÿ’ฌ Details": f"โ† {mcp_server.upper()}: {event['message']}" }) elif event["type"] == "agent_end": details = event["message"] if event.get("payload"): payload = event["payload"] extra = [] if "facts_count" in payload: extra.append(f"Facts: {payload['facts_count']}") if "contacts_count" in payload: extra.append(f"Contacts: {payload['contacts_count']}") if "fit_score" in payload: extra.append(f"Score: {payload['fit_score']:.2f}") if "thread_id" in payload: extra.append(f"Thread: {payload['thread_id'][:8]}...") if extra: details += f" ({', '.join(extra)})" workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": event["agent"].title(), "๐Ÿ“Œ Action": "โœ… Completed", "๐Ÿข Company": current_company or "All", "๐Ÿ’ฌ Details": details }) elif event["type"] == "company_start": company = event["payload"]["company"] industry = event["payload"].get("industry", "Unknown") size = event["payload"].get("size", 0) workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": "Writer", "๐Ÿ“Œ Action": "๐Ÿข Company", "๐Ÿข Company": company, "๐Ÿ’ฌ Details": f"Starting: {company} ({industry}, {size} employees)" }) elif event["type"] == "llm_token": payload = event.get("payload", {}) token = payload.get("token", "") token_type = payload.get("type", "") company = payload.get("company_name") or payload.get("company") or current_company if company and display_mode != "Summary Only": if token_type == "summary": company_outputs[company]["summary"] += token elif token_type == "email": company_outputs[company]["email"] += token # Update the single content area render_content() elif event["type"] == "llm_done": payload = event.get("payload", {}) company = payload.get("company_name") or payload.get("company") or current_company if company: company_outputs[company]["status"] = "completed" if "summary" in payload: company_outputs[company]["final_summary"] = payload["summary"] if "email" in payload: company_outputs[company]["final_email"] = payload["email"] render_content() workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": "Writer", "๐Ÿ“Œ Action": "โœ… Generated", "๐Ÿข Company": company or "Unknown", "๐Ÿ’ฌ Details": "Content generation complete" }) elif event["type"] == "policy_block": workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": "Compliance", "๐Ÿ“Œ Action": "โŒ Blocked", "๐Ÿข Company": current_company or "Unknown", "๐Ÿ’ฌ Details": event["payload"].get("reason", "Policy violation") }) elif event["type"] == "policy_pass": workflow_logs.append({ "โฐ Time": datetime.now().strftime("%H:%M:%S"), "๐Ÿค– Agent": "Compliance", "๐Ÿ“Œ Action": "โœ… Passed", "๐Ÿข Company": current_company or "Unknown", "๐Ÿ’ฌ Details": "All compliance checks passed" }) # Update displays based on mode if display_mode == "Complete Workflow": # Update workflow display if workflow_logs: df = pd.DataFrame(workflow_logs[-50:]) # Show last 50 entries workflow_display.dataframe( df, use_container_width=True, hide_index=True, height=400 ) # Content display handled by render_content() elif display_mode == "Content Only": # Content display handled by render_content() pass else: # Summary Only # Show high-level statistics summary_stats = { "Total Events": len(workflow_logs), "Agents Run": len(agents_completed), "Companies Processed": len(set(log.get("๐Ÿข Company", "Unknown") for log in workflow_logs if log.get("๐Ÿข Company") != "All")), "MCP Calls": len([log for log in workflow_logs if "MCP Call" in log.get("๐Ÿ“Œ Action", "")]), "MCP Responses": len([log for log in workflow_logs if "MCP Response" in log.get("๐Ÿ“Œ Action", "")]), "Current Agent": current_agent.title() if current_agent else "None", "Current Company": current_company or "None" } summary_container.json(summary_stats) except json.JSONDecodeError: continue except Exception as e: st.error(f"Error processing event: {e}") # Pipeline complete progress_bar.progress(1.0, text="โœ… Pipeline Complete!") status_text.success("โœ… Pipeline execution completed successfully!") # Store outputs in session state st.session_state.pipeline_logs = workflow_logs st.session_state.company_outputs = dict(company_outputs) # Show final summary st.divider() st.subheader("๐Ÿ“Š Execution Summary") # Calculate statistics companies_processed = set(log.get("๐Ÿข Company", "Unknown") for log in workflow_logs if log.get("๐Ÿข Company") not in ["All", None]) mcp_calls = [log for log in workflow_logs if "MCP Call" in log.get("๐Ÿ“Œ Action", "")] mcp_responses = [log for log in workflow_logs if "MCP Response" in log.get("๐Ÿ“Œ Action", "")] col1, col2, col3, col4, col5 = st.columns(5) with col1: st.metric("Total Events", len(workflow_logs)) with col2: st.metric("Companies", len(companies_processed)) with col3: st.metric("Agents Run", len(agents_completed)) with col4: st.metric("MCP Calls", len(mcp_calls)) with col5: st.metric("MCP Responses", len(mcp_responses)) # Show MCP interaction summary if mcp_calls or mcp_responses: with st.expander("๐Ÿ”Œ MCP Server Interactions"): mcp_servers = defaultdict(int) for log in workflow_logs: if "MCP" in log.get("๐Ÿ“Œ Action", ""): details = log.get("๐Ÿ’ฌ Details", "") for server in ["STORE", "SEARCH", "EMAIL", "CALENDAR", "VECTOR", "OLLAMA"]: if server in details.upper(): mcp_servers[server] += 1 if mcp_servers: mcp_df = pd.DataFrame( [(server, count) for server, count in mcp_servers.items()], columns=["MCP Server", "Interactions"] ) st.dataframe(mcp_df, hide_index=True) except requests.exceptions.Timeout: st.error("โฑ๏ธ Pipeline timeout - please check if Ollama is running") except Exception as e: st.error(f"Pipeline error: {str(e)}") finally: st.session_state.running = False # Show stored outputs if available elif st.session_state.company_outputs: st.subheader("๐Ÿ“‹ Previous Execution Results") company_outputs = st.session_state.company_outputs if company_outputs: # Create tabs for each company company_names = list(company_outputs.keys()) if company_names: tabs = st.tabs([f"๐Ÿข {name}" for name in company_names]) for i, (company, outputs) in enumerate(company_outputs.items()): with tabs[i]: col1, col2 = st.columns(2) with col1: st.markdown("### ๐Ÿ“ Summary") if outputs.get("final_summary"): st.markdown(outputs["final_summary"]) elif outputs.get("summary"): st.markdown(outputs["summary"]) else: st.info("No summary available") with col2: st.markdown("### โœ‰๏ธ Email Draft") if outputs.get("final_email"): email = outputs["final_email"] if isinstance(email, dict): st.write(f"**Subject:** {email.get('subject', '')}") st.markdown(f"**Body:**\n{email.get('body', '')}") else: st.markdown(email) elif outputs.get("email"): st.markdown(outputs["email"]) else: st.info("No email available") # Prospects Tab with tab2: st.header("Prospects Overview") st.markdown("View all prospects and their current status in the pipeline") # Refresh controls col1, col2 = st.columns([6, 1]) with col2: if st.button("๐Ÿ”„ Refresh", use_container_width=True): st.rerun() try: prospects_data = requests.get(f"{API_BASE}/prospects").json() if prospects_data["count"] > 0: # Metrics row col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Total Prospects", prospects_data["count"]) with col2: ready = sum(1 for p in prospects_data["prospects"] if p["status"] == "ready_for_handoff") st.metric("Ready for Handoff", ready) with col3: blocked = sum(1 for p in prospects_data["prospects"] if p["status"] in ["blocked", "dropped"]) st.metric("Blocked/Dropped", blocked) with col4: scores = [p["fit_score"] for p in prospects_data["prospects"] if p["fit_score"] > 0] avg_score = sum(scores) / len(scores) if scores else 0 st.metric("Avg Fit Score", f"{avg_score:.2f}") st.divider() # Prospect table with enhanced status display prospects_df = pd.DataFrame(prospects_data["prospects"]) # Status mapping with colors and descriptions status_info = { "new": ("๐Ÿ†•", "New", "Just discovered"), "enriched": ("๐Ÿ“š", "Enriched", "Facts gathered"), "contacted": ("๐Ÿ‘ฅ", "Contacted", "Contacts identified"), "scored": ("๐Ÿ“Š", "Scored", "Fit score calculated"), "drafted": ("๐Ÿ“", "Drafted", "Content generated"), "compliant": ("โœ…", "Compliant", "Passed compliance"), "sequenced": ("๐Ÿ“ฎ", "Sequenced", "Email sent"), "ready_for_handoff": ("๐ŸŽฏ", "Ready", "Ready for sales"), "dropped": ("โ›”", "Dropped", "Low score"), "blocked": ("๐Ÿšซ", "Blocked", "Failed requirements") } # Format the dataframe display_data = [] for _, row in prospects_df.iterrows(): status = row["status"] icon, label, desc = status_info.get(status, ("โ“", status, "Unknown")) display_data.append({ "Company": row["company"], "Status": f"{icon} {label}", "Description": desc, "Fit Score": f"{row['fit_score']:.2f}" if row['fit_score'] > 0 else "N/A", "Contacts": row["contacts"], "Facts": row["facts"], "ID": row["id"] }) display_df = pd.DataFrame(display_data) # Show the table st.dataframe( display_df, use_container_width=True, hide_index=True, column_config={ "Fit Score": st.column_config.NumberColumn( format="%.2f", min_value=0, max_value=1 ), "Contacts": st.column_config.NumberColumn(format="%d"), "Facts": st.column_config.NumberColumn(format="%d") } ) else: st.info("No prospects found. Run the pipeline to generate prospects.") except Exception as e: st.error(f"Could not load prospects: {e}") # Details Tab (keeping existing implementation) with tab3: st.header("Prospect Details") st.markdown("Deep dive into individual prospect information") # Prospect selector col1, col2 = st.columns([3, 1]) with col1: prospect_id = st.text_input( "Prospect ID", placeholder="Enter prospect ID (e.g., acme, techcorp, retailplus)", value=st.session_state.current_prospect["id"] if st.session_state.current_prospect else "" ) with col2: st.write("") # Spacer search_btn = st.button("๐Ÿ” Load Details", use_container_width=True) if prospect_id and (search_btn or st.session_state.current_prospect): try: data = requests.get(f"{API_BASE}/prospects/{prospect_id}", timeout=10).json() if "error" not in data: prospect = data["prospect"] thread = data.get("thread") # Persist current prospect so subsequent button clicks don't clear the view st.session_state.current_prospect = prospect col1, col2 = st.columns(2) with col1: st.subheader("๐Ÿ“Š Prospect Info") st.json({ "Company": prospect["company"]["name"], "Status": prospect["status"], "Fit Score": prospect["fit_score"], "Contacts": len(prospect["contacts"]), "Facts": len(prospect["facts"]) }) if prospect.get("summary"): st.subheader("๐Ÿ“ Summary") st.markdown(prospect["summary"]) with col2: if prospect.get("email_draft"): st.subheader("โœ‰๏ธ Email Draft") st.write(f"**Subject:** {prospect['email_draft']['subject']}") st.markdown(prospect["email_draft"]["body"]) if thread: st.subheader("๐Ÿ’ฌ Thread") for msg in thread.get("messages", []): with st.expander(f"{msg['direction']}: {msg['subject']}"): st.write(msg["body"]) st.caption(f"Sent: {msg['sent_at']}") # Handoff section (persistent across reruns) st.subheader("๐Ÿ“ฆ Handoff") handoff = st.session_state.handoff_packets.get(prospect_id) if st.button("Get Handoff Packet", key=f"handoff_{prospect_id}"): try: resp_h = requests.get(f"{API_BASE}/handoff/{prospect_id}", timeout=15) if resp_h.status_code == 200: handoff = resp_h.json() st.session_state.handoff_packets[prospect_id] = handoff else: # Surface API error detail try: detail = resp_h.json().get("detail") except Exception: detail = resp_h.text st.warning(f"Handoff not available: {detail}") except Exception as e: st.error(f"Could not get handoff: {e}") # Render cached handoff if available if handoff: cols = st.columns(2) with cols[0]: st.markdown("**Calendar Slots**") for slot in handoff.get("calendar_slots", []): st.write(f"โ€ข {slot.get('start_iso','')[:16]}") with cols[1]: st.markdown("**Generated At**") st.write(handoff.get("generated_at", "Unknown")) st.markdown("**Full Packet**") st.json(handoff) except Exception as e: st.error(f"Could not load prospect: {e}") # Dev Tools Tab (keeping existing implementation) with tab4: st.header("Developer Tools") st.subheader("๐Ÿงช Writer Streaming Test") test_company_id = st.text_input("Test Company ID", value="acme") if st.button("Test Writer Stream"): with st.spinner("Streaming from Writer agent..."): output_container = st.empty() full_text = "" try: response = requests.post( f"{API_BASE}/writer/stream", json={"company_id": test_company_id}, stream=True ) for line in response.iter_lines(): if line: try: event = json.loads(line) if event.get("type") == "llm_token": token = event["payload"].get("token", "") full_text += token output_container.markdown(full_text) elif event.get("type") == "llm_done": st.success("โœ… Generation complete") # Show final artifacts if "summary" in event["payload"]: with st.expander("Final Summary"): st.markdown(event["payload"]["summary"]) if "email" in event["payload"]: with st.expander("Final Email"): email = event["payload"]["email"] st.write(f"**Subject:** {email.get('subject', '')}") st.markdown(email.get("body", "")) except json.JSONDecodeError: continue except Exception as e: st.error(f"Stream test failed: {e}") st.divider() st.subheader("๐Ÿ“ก API Endpoints") endpoints = [ ("GET /health", "System health check"), ("POST /run", "Run full pipeline (streaming)"), ("POST /writer/stream", "Test Writer streaming"), ("GET /prospects", "List all prospects"), ("GET /prospects/{id}", "Get prospect details"), ("GET /handoff/{id}", "Get handoff packet"), ("POST /reset", "Reset system") ] for endpoint, desc in endpoints: st.code(f"{endpoint} - {desc}")