# Shopping Floor Safety Monitor — Streamlit multi-agent demo application.
import streamlit as st
import base64
import io
import json
import requests
import os

# --- Configuration ---
# Read from environment variables with fallback defaults
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_API_BASE_URL = os.environ.get("OPENAI_API_BASE_URL", "https://api.openai.com/v1")
OPENAI_CHAT_COMPLETIONS_ENDPOINT = f"{OPENAI_API_BASE_URL}/chat/completions"
TEXT_MODEL = os.environ.get("TEXT_MODEL", "gpt-3.5-turbo")
# MCP Server's direct image query endpoint
MCP_IMAGE_QUERY_ENDPOINT = os.environ.get("MCP_IMAGE_QUERY_ENDPOINT", "https://gnr-demo.edgecollaborate.com/mcp/image_query")
# --- Tool Definitions for Supervisor Agent's LLM ---
# Each entry follows the OpenAI function-calling tool schema: the supervisor
# model chooses among these by name; arguments are a JSON object with a
# single required "query" string.
GENERAL_CHAT_TOOL = {
    "type": "function",
    "function": {
        "name": "general_chat",
        "description": "Engage in general conversation and answer questions that do not require specific document or image analysis.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The user's query for general conversation."
                }
            },
            "required": ["query"]
        }
    }
}
DOCUMENT_ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "document_analysis",
        "description": "Analyze and summarize uploaded text documents based on the user's query.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The query related to the document analysis."
                }
            },
            "required": ["query"]
        }
    }
}
IMAGE_ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "image_analysis",
        "description": "Process and analyze uploaded images based on the user's query. This tool delegates image processing to an external Model Context Protocol (MCP) that handles vision models.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The query related to the image analysis, to be sent to the MCP's vision capabilities."
                }
            },
            "required": ["query"]
        }
    }
}
# List of all tools available to the Supervisor Agent's LLM
SUPERVISOR_TOOLS = [GENERAL_CHAT_TOOL, DOCUMENT_ANALYSIS_TOOL, IMAGE_ANALYSIS_TOOL]
# --- Helper Functions for OpenAI Compatible API Calls ---
def call_openai_api(messages, model, tools=None, tool_choice=None, generation_config=None):
    """
    Make a chat-completions call to an OpenAI-compatible API.

    Args:
        messages: List of chat messages ({"role": ..., "content": ...}).
        model: Model identifier to request.
        tools: Optional list of tool definitions for function calling.
        tool_choice: Optional tool-choice directive, passed through verbatim.
        generation_config: Optional dict of extra payload fields; merged into
            the payload last, so it overrides the defaults (e.g. temperature).

    Returns:
        A (content, tool_calls) pair — at most one element is non-None.
        On transport/JSON errors: (None, None), with the error shown via
        st.error. If the model returns an empty message, returns
        ("No response from agent.", None) so the UI never renders 'None'.
    """
    if not OPENAI_API_KEY:
        st.error("OpenAI Compatible API Key is not set. Please provide it in your environment variables.")
        return None, None
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}"
    }
    payload = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,  # Default temperature
    }
    if tools:
        payload["tools"] = tools
    if tool_choice:
        payload["tool_choice"] = tool_choice
    if generation_config:
        payload.update(generation_config)
    try:
        with st.spinner(f"Agent ({model}) thinking..."):
            # timeout prevents the Streamlit script from hanging forever if
            # the endpoint is unreachable or stalls mid-response.
            response = requests.post(OPENAI_CHAT_COMPLETIONS_ENDPOINT, headers=headers, json=payload, timeout=120)
            response.raise_for_status()  # Raise an exception for HTTP errors
            result = response.json()
            if result and result.get("choices") and result["choices"][0].get("message"):
                message = result["choices"][0]["message"]
                if message.get("content"):
                    return message["content"], None
                elif message.get("tool_calls"):
                    return None, message["tool_calls"]
            # If the LLM returns an empty message object or no choices/message,
            # return a specific string instead of None to prevent 'None' in UI.
            return "No response from agent.", None
    except requests.exceptions.RequestException as e:
        st.error(f"Error communicating with OpenAI Compatible API: {e}")
        return None, None
    except json.JSONDecodeError:
        st.error("Failed to decode JSON response from OpenAI Compatible API.")
        return None, None
    except Exception as e:
        st.error(f"An unexpected error occurred: {e}")
        return None, None
def _call_text_model_for_content(messages, model):
    """
    Internal helper: run a plain completion call and return only the generated
    text, discarding any tool calls. Never touches the chat history — the
    individual tools use it just to obtain a response string.
    """
    text, _unused_tool_calls = call_openai_api(messages, model)
    return text
def get_openai_response(user_message, chat_history=None):
    """
    Get a general-chat reply from the OpenAI-compatible text model.

    Handles the 'General Chat Agent' mode: the caller is responsible for
    appending the returned content to the chat history; this function never
    mutates it.
    """
    if chat_history is None:
        chat_history = []
    # Keep only the last 4 messages as conversational context.
    context = [{"role": m["role"], "content": m["content"]} for m in chat_history[-4:]]
    context.append({"role": "user", "content": user_message})
    return _call_text_model_for_content(context, TEXT_MODEL)
def get_openai_document_analysis(prompt, document_content):
    """
    Analyze/summarize an uploaded document with the text model.

    Returns an error string (rather than raising) when no document content is
    available, so the supervisor can surface it as a normal tool output.
    """
    if not document_content:
        return "Error: Document analysis requested but no document uploaded."
    combined = (
        "Analyze the following document content and respond to the query:\n\n"
        f"Document:\n{document_content}\n\nQuery: {prompt}"
    )
    return _call_text_model_for_content([{"role": "user", "content": combined}], TEXT_MODEL)
def call_mcp_image_tool(question_for_image: str, image_data_base64: str):
    """
    Delegate an image question to the external MCP server's image-query endpoint.

    Args:
        question_for_image: Natural-language question about the image.
        image_data_base64: The image encoded as a base64 string.

    Returns:
        The MCP's "result" string on success, or a human-readable error string
        (never raises) so the supervisor can treat failures as tool output.
    """
    if not image_data_base64:
        return "Error: Image processing requested but no image uploaded."
    if not MCP_IMAGE_QUERY_ENDPOINT:
        return "Error: MCP_IMAGE_QUERY_ENDPOINT is not configured in your environment variables."
    payload = {
        "image_base64": image_data_base64,
        "question": question_for_image
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    try:
        # Log the question and the payload *size* only — never the raw base64
        # blob, which can be megabytes and floods the logs.
        print(f"DEBUG: question_for_image={question_for_image!r} ({type(question_for_image).__name__})")
        print(f"DEBUG: image_data_base64 length={len(image_data_base64)} ({type(image_data_base64).__name__})")
        with st.spinner("Delegating to MCP for image processing..."):
            response = requests.post(MCP_IMAGE_QUERY_ENDPOINT, json=payload, headers=headers, timeout=180)
            response.raise_for_status()
            mcp_result = response.json()
            print(f"Received response from MCP: {json.dumps(mcp_result, indent=2)}")
            if isinstance(mcp_result, dict) and "result" in mcp_result:
                return mcp_result["result"]
            else:
                return f"Unexpected response format from MCP. Raw response: {mcp_result}"
    except requests.exceptions.RequestException as e:
        if e.response is not None:
            # The error body may not be JSON (e.g. an HTML gateway error page),
            # so fall back to raw text instead of raising a second exception.
            try:
                error_details = e.response.json() if e.response.text else "No response body."
            except ValueError:
                error_details = e.response.text
            st.error(f"Error communicating with MCP: {e}. FastAPI validation details: {error_details}")
            print(f"DEBUG: Full FastAPI 422 response text: {e.response.text}")
        else:
            st.error(f"Error communicating with MCP: {e}.")
        return "Error: Failed to get response from MCP."
    except json.JSONDecodeError:
        st.error(f"Error: Failed to decode JSON response from MCP. Is the MCP server responding with valid JSON? Raw response: {response.text}")
        return "Error: Invalid JSON response from MCP."
    except Exception as e:
        st.error(f"An unexpected error occurred during MCP delegation: {e}")
        return f"An unexpected error occurred: {e}"
# --- Streamlit App Layout ---
# Page config must be the first Streamlit call in the script run.
st.set_page_config(
    page_title="Shopping Floor Safety Monitor",
    page_icon="π€",
    layout="wide"
)
st.title("π€ Shopping Floor Safety Monitor")
# Intro blurb describing the supervisor/worker agent architecture.
st.markdown("""
**AI-Powered Document and Image Analysis**
This application uses a **Supervisor Agent** that intelligently orchestrates specialized agents to analyze documents,
process images, and provide comprehensive answers to your queries.
""")
# Initialize session state: chat history, uploaded files, input widget state.
# Each key is created only if absent so values survive Streamlit reruns.
_SESSION_DEFAULTS = {
    "chat_history": [],
    "uploaded_doc_content": None,
    "uploaded_image_data": None,
    "uploaded_image_base64": None,
    "input_text_value": "",
    # Counter used in the text-area widget key to force its re-creation.
    "input_key_counter": 0,
    # Whether agent thinking traces are shown in the history panel.
    "show_traces": False,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# --- Callback to clear state when clear conversation button is clicked ---
def clear_state_on_change():
    """Reset all conversation-related session state and force a fresh input widget."""
    st.session_state.chat_history = []
    for _key in ("uploaded_doc_content", "uploaded_image_data", "uploaded_image_base64"):
        st.session_state[_key] = None
    st.session_state.input_text_value = ""
    # Bumping the counter changes the text_area's key, forcing re-creation.
    st.session_state.input_key_counter += 1
| # --- Callback for the "Show Traces" toggle --- | |
| def update_show_traces_state(): | |
| # This function is called when the toggle is clicked. | |
| # The value of the toggle is automatically updated in st.session_state | |
| # under the key specified by the 'key' argument of st.toggle. | |
| # We explicitly update our 'show_traces' state variable based on the toggle's new value. | |
| st.session_state.show_traces = st.session_state.show_traces_toggle_internal | |
# --- Sidebar for Agent Architecture and Settings ---
st.sidebar.header("π€ Multi-Agent System")
# Compact Agent Architecture Visualization (HTML card; f-string interpolates
# the configured model name into the footer).
st.sidebar.markdown(f"""
<div style='background-color: rgba(0, 102, 204, 0.1); padding: 10px; border-radius: 8px; margin-bottom: 15px; border: 1px solid rgba(0, 102, 204, 0.3);'>
<div style='text-align: center; margin-bottom: 8px;'>
<strong style='color: #0066cc;'>π― Supervisor</strong>
<div style='font-size: 10px; opacity: 0.7;'>orchestrates β</div>
</div>
<div style='font-size: 11px; line-height: 1.6;'>
π¬ <strong>Chat</strong> β’ π <strong>Document</strong> β’ πΌοΈ <strong>Image (MCP)</strong>
</div>
<div style='margin-top: 8px; padding-top: 8px; border-top: 1px solid rgba(0, 102, 204, 0.2); text-align: center;'>
<span style='font-size: 10px; opacity: 0.7;'>Model: <strong>{TEXT_MODEL}</strong></span>
</div>
</div>
""", unsafe_allow_html=True)
| st.sidebar.header("π€ Upload Your Data") | |
| uploaded_document = st.sidebar.file_uploader( | |
| "π Text Document", | |
| type=["txt", "md"], | |
| key="doc_uploader", | |
| help="Upload any text document for analysis (.txt or .md)" | |
| ) | |
| if uploaded_document is not None: | |
| try: | |
| string_data = uploaded_document.read().decode("utf-8") | |
| st.session_state.uploaded_doc_content = string_data | |
| st.sidebar.success("β Document uploaded successfully!") | |
| with st.sidebar.expander("π View Document Preview"): | |
| st.code(string_data[:500] + "..." if len(string_data) > 500 else string_data) | |
| except Exception as e: | |
| st.sidebar.error(f"β Error reading document: {e}") | |
| st.session_state.uploaded_doc_content = None | |
| else: | |
| st.session_state.uploaded_doc_content = None | |
uploaded_image = st.sidebar.file_uploader(
    "πΈ Image File",
    type=["jpg", "jpeg", "png"],
    key="image_uploader",
    help="Upload any image for visual analysis"
)
# Store both the raw bytes and a base64 encoding (the MCP endpoint expects
# base64). Clear both when no file is selected.
if uploaded_image is None:
    st.session_state.uploaded_image_data = None
    st.session_state.uploaded_image_base64 = None
else:
    try:
        raw_bytes = uploaded_image.read()
        st.session_state.uploaded_image_data = raw_bytes
        st.session_state.uploaded_image_base64 = base64.b64encode(raw_bytes).decode("utf-8")
        st.sidebar.success("β Image uploaded successfully!")
        st.sidebar.image(uploaded_image, caption="Uploaded Image", use_container_width=True)
    except Exception as e:
        st.sidebar.error(f"β Error reading image: {e}")
        st.session_state.uploaded_image_data = None
        st.session_state.uploaded_image_base64 = None
| st.sidebar.markdown("---") | |
| st.sidebar.markdown(""" | |
| <div style='background-color: rgba(33, 150, 243, 0.1); padding: 10px; border-radius: 8px; font-size: 11px; border: 1px solid rgba(33, 150, 243, 0.3);'> | |
| <strong style='color: inherit;'>βΉοΈ How it works:</strong><br/> | |
| Upload files β Ask question β Get AI analysis | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # --- Main Chat Interface --- | |
| st.subheader("π¬ Analysis Console") | |
| # --- Compact status indicator showing uploaded files --- | |
| status_parts = [] | |
| if st.session_state.uploaded_doc_content: | |
| status_parts.append("π Document") | |
| if st.session_state.uploaded_image_base64: | |
| status_parts.append("πΌοΈ Image") | |
| if status_parts: | |
| st.success(f"β Ready: {' β’ '.join(status_parts)}") | |
| else: | |
| st.info("βΉοΈ No files uploaded - You can still chat or upload files from the sidebar") | |
| st.markdown("---") | |
# Use a form so pressing Enter submits automatically.
with st.form(key="chat_form"):
    # Pre-fill with the saved draft, falling back to a demo safety prompt.
    prompt_default = st.session_state.input_text_value or "Analyze the given image for any safety hazard. If you find any, draft an email to John who is responsible for floor safety. Describe the exact safety issue."
    user_input_from_widget = st.text_area(
        "βοΈ Your Question or Request:",
        key=f"user_input_widget_{st.session_state.input_key_counter}",
        value=prompt_default,
        height=120,
        placeholder="Ask me anything about your uploaded documents or images, or just chat with me..."
    )
    analyze_col, clear_col = st.columns([0.85, 0.15])
    with analyze_col:
        send_button_clicked = st.form_submit_button("π Analyze", use_container_width=True, type="primary")
    with clear_col:
        st.form_submit_button("ποΈ Clear", on_click=clear_state_on_change, use_container_width=True)
# Handle a submitted query: run the Supervisor Agent's tool-orchestration loop.
if send_button_clicked:
    current_user_message = user_input_from_widget
    if current_user_message:
        st.session_state.chat_history.append({"role": "user", "content": current_user_message})
        # Prepare messages for supervisor, including context about uploaded files
        initial_system_prompt = (
            "You are a highly capable Supervisor Agent responsible for fulfilling user requests. "
            "You have access to specialized tools: 'general_chat' for conversational queries, "
            "'document_analysis' for extracting insights from text documents, and 'image_analysis' for processing images. "
            "Your task is to select and execute the most appropriate tool(s) based on the user's query "
            "and any available uploaded data. "
            "\n\n"
            "IMPORTANT INSTRUCTIONS:\n"
            "1. If the user's request requires analysis of uploaded files, call the appropriate tool(s) (document_analysis and/or image_analysis).\n"
            "2. After receiving tool outputs, you MUST synthesize the information into a comprehensive, direct answer to the user.\n"
            "3. DO NOT just describe what tools were called or what they returned.\n"
            "4. DO NOT generate meta-commentary about the process.\n"
            "5. Provide the actual final answer, report, email draft, or analysis that the user requested.\n"
            "6. If multiple tools were used, combine their outputs into one coherent response.\n"
            "7. Be direct and answer the user's question fully using the information from the tools.\n"
        )
        # Append availability hints so the LLM knows which tools can succeed.
        user_message_content = current_user_message
        if st.session_state.uploaded_doc_content:
            user_message_content += "\n\n[CONTEXT: A text document is available for analysis. Use the `document_analysis` tool if relevant to the query.]"
        if st.session_state.uploaded_image_base64:
            user_message_content += "\n\n[CONTEXT: An image file is available for analysis. Use the `image_analysis` tool if relevant to the query.]"
        messages_for_supervisor = [
            {"role": "system", "content": initial_system_prompt},
            {"role": "user", "content": user_message_content}  # Use the modified user message
        ]
        # final_agent_response will store the last, definitive content from the agent
        final_agent_response = ""
        # NOTE(review): orchestration_finished is written in every branch but
        # never read afterwards — candidate for removal; kept for fidelity.
        orchestration_finished = False  # Flag to indicate if orchestration resulted in a final answer
        # Loop for potential multi-turn tool use by Supervisor Agent
        MAX_TURNS = 5
        for turn_count in range(MAX_TURNS):
            # Call the Supervisor LLM to decide on actions (content or tool calls)
            content, tool_calls = call_openai_api(messages_for_supervisor, TEXT_MODEL, tools=SUPERVISOR_TOOLS)
            if tool_calls:
                # Log tool execution (always add to history, display controlled by toggle)
                st.session_state.chat_history.append({"role": "model", "content": f"*(Supervisor decided to use {len(tool_calls)} tool(s) - Turn {turn_count + 1})*"})
                tool_output_messages = []
                any_tool_error = False
                for tool_call in tool_calls:
                    function_name = tool_call['function']['name']
                    # Arguments arrive as a JSON-encoded string per the OpenAI
                    # tool-calling format.
                    function_args = json.loads(tool_call['function']['arguments'])
                    tool_call_id = tool_call['id']
                    st.session_state.chat_history.append({"role": "model", "content": f"*(Executing tool: {function_name})*"})
                    tool_output = ""
                    # Handle cases where tool is called but no relevant file is uploaded
                    if function_name == "image_analysis" and not st.session_state.uploaded_image_base64:
                        tool_output = "Error: Image analysis tool selected by Supervisor, but no image was uploaded. Please upload an image if you want image analysis."
                        st.warning(tool_output)
                        any_tool_error = True
                    elif function_name == "document_analysis" and not st.session_state.uploaded_doc_content:
                        tool_output = "Error: Document analysis tool selected by Supervisor, but no document was uploaded. Please upload a document if you want document analysis."
                        st.warning(tool_output)
                        any_tool_error = True
                    elif function_name == "general_chat":
                        # For general_chat tool, we directly call the text model for content
                        tool_output = _call_text_model_for_content([{"role": "user", "content": function_args["query"]}], TEXT_MODEL)
                    elif function_name == "document_analysis":
                        tool_output = get_openai_document_analysis(function_args["query"], st.session_state.uploaded_doc_content)
                    elif function_name == "image_analysis":
                        tool_output = call_mcp_image_tool(function_args["query"], st.session_state.uploaded_image_base64)
                    else:
                        tool_output = f"Error: Unknown tool '{function_name}' called by Supervisor."
                        any_tool_error = True
                    # Ensure tool_output is a string, even if the tool function returns None or empty
                    if tool_output is None:
                        tool_output = "No output from tool."
                    # Store the tool call (assistant message) and its output (tool message)
                    tool_output_messages.append({"role": "assistant", "tool_calls": [tool_call]})
                    tool_output_messages.append({"role": "tool", "tool_call_id": tool_call_id, "content": tool_output})
                    # Always add to history (display controlled by toggle)
                    st.session_state.chat_history.append({"role": "model", "content": f"*(Tool output for {function_name}: {tool_output[:100]}...)*" if len(tool_output) > 100 else f"*(Tool output for {function_name}: {tool_output})*"})
                # Add all tool outputs to the messages for the next LLM call
                messages_for_supervisor.extend(tool_output_messages)
                # Add explicit instruction to synthesize the final answer
                messages_for_supervisor.append({
                    "role": "system",
                    "content": "Now provide your final comprehensive answer to the user based on the tool outputs above. Do NOT describe what tools were used or provide meta-commentary. Directly answer the user's question with the actual content they requested (e.g., if they asked for an email draft, provide the complete email)."
                })
                # If there were errors in tool execution, break loop to prevent further LLM calls with bad context
                if any_tool_error:
                    final_agent_response = "Supervisor encountered issues executing some tools. Please review the warnings above."
                    orchestration_finished = True
                    break  # Exit the loop if errors occurred
                # If LLM returned tool calls, we continue the loop for the next turn
                # to allow it to process the tool outputs and generate a final answer.
                # No 'break' here - continue to next iteration
            elif content:
                # If LLM returns content directly (no tool call), this is the final answer
                final_agent_response = content
                orchestration_finished = True
                break  # Exit the loop as we have a final response
            else:
                # If no content and no tool calls, it's an ambiguous state, break.
                final_agent_response = "Supervisor did not provide a direct response or tool calls in this turn. Halting orchestration."
                orchestration_finished = True
                break
        # After the orchestration loop, ensure a final response is set
        # (e.g. MAX_TURNS exhausted while the model kept requesting tools).
        if not final_agent_response:  # Checks if it's still empty after the loop
            st.session_state.chat_history.append({"role": "model", "content": f"*(Supervisor synthesizing final response...)*"})
            synthesized_response, _ = call_openai_api(messages_for_supervisor, TEXT_MODEL)
            if synthesized_response:
                final_agent_response = synthesized_response
            else:
                final_agent_response = "Supervisor completed its process but could not generate a clear final response. Please rephrase your query or provide more context."
        # ONLY APPEND THE FINAL RESPONSE HERE
        st.session_state.chat_history.append({"role": "model", "content": final_agent_response})
        st.session_state.input_text_value = ""
        st.rerun()
# --- Display Chat History ---
st.markdown("---")
# --- Header with Toggle ---
header_title_col, header_clear_col, header_toggle_col = st.columns([0.6, 0.2, 0.2])
with header_title_col:
    st.subheader("π Conversation History")
with header_clear_col:
    st.button("ποΈ Clear All", on_click=clear_state_on_change, use_container_width=True, key="clear_history_btn")
with header_toggle_col:
    # Toggle button for showing/hiding agent thinking traces.
    st.toggle(
        "π Traces",
        value=st.session_state.show_traces,
        key="show_traces_toggle_internal",
        on_change=update_show_traces_state,
        help="Show/hide agent thinking traces"
    )
chat_container = st.container(height=500, border=True)
with chat_container:
    # Oldest message first. Trace messages are wrapped in *( ... )* by the
    # orchestration code; they are hidden unless the Traces toggle is on.
    for msg in st.session_state.chat_history:
        if msg["role"] == "user":
            st.markdown(f"**π€ You:** {msg['content']}")
            st.markdown("---")
            continue
        # role is "model"
        body = msg["content"]
        is_trace = isinstance(body, str) and body.startswith("*(") and body.endswith(")*")
        if is_trace and not st.session_state.show_traces:
            continue
        if is_trace:
            # Trace messages in a muted style (no divider after them).
            st.markdown(f"<div style='opacity: 0.6; font-style: italic; font-size: 0.9em;'>π {body}</div>", unsafe_allow_html=True)
        else:
            # Final answer in normal style.
            st.markdown(f"**π€ Agent:** {body}")
            st.markdown("---")