# app.py
#
# Gradio chat application: an OpenAI-driven assistant that answers questions
# about a Signavio process repository by generating Cypher queries and running
# them against a Neo4j database via function/tool calling.

import json
import logging
import os
import subprocess  # NOTE(review): appears unused here — verify before removing
import sys
import time  # NOTE(review): appears unused here — verify before removing
import zipfile  # NOTE(review): appears unused here — verify before removing
from pathlib import Path  # NOTE(review): appears unused here
from typing import Any, Dict, List  # NOTE(review): appears unused here

import gradio as gr
import requests  # NOTE(review): appears unused here — verify before removing
from dotenv import load_dotenv
from neo4j import GraphDatabase
from openai import OpenAI

# Load environment variables from a .env file.
load_dotenv()

# -----------------------------
# Configuration
# -----------------------------
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
NEO4J_DATABASE = os.getenv("NEO4J_DATABASE")
# BUG FIX: the default used to be " gpt-o1" — a leading space plus a
# nonexistent model name — while the comment promised gpt-4o-mini.
CHAT_MODEL = os.getenv("CHAT_MODEL", "gpt-4o-mini")  # Default to gpt-4o-mini if not set
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")  # Default if not set

# -----------------------------
# Logging setup
# -----------------------------
logger = logging.getLogger("proc-assistant")
handler = logging.StreamHandler(sys.stdout)
fmt = logging.Formatter(
    "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(fmt)
# Clear existing handlers to avoid duplicate log lines in environments
# (e.g. notebooks, reloading servers) that re-execute this module.
if logger.hasHandlers():
    logger.handlers.clear()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# -----------------------------
# SYSTEM_PROMPT
# -----------------------------
SYSTEM_PROMPT = """
You are a Signavio Repository assistant and trying to answer questions based on a repository
which can be accessed via cypher queries. You have access to the following tools:
- get_nodes()
- get_index()
- submit_cypher()
1. Always first call the functions - get_nodes() get_index() so that you have a complete list of nodes and full text index.
2. Answer questions by creating cypher queries you submit over the tool submit_cypher()
3. Always query the name and Description of a node.
4. Where an index for the node exists, create a query using a fulltext search
5. Always request related objects using a generic query 'match (n)-[]-(m)' to retrieve all nodes 'm' related to the node 'n'
6. If no responses are found (empty results), retry with a broader or parent term (e.g., "procurement" if "indirect procurement" fails).
7. Use multiple function calls if several processes are needed.
8. Use only the returned results to answer and always cite the nodes names and Type.
9. If results are poor, ask the user to clarify, especially to clarify which node supports answering the question best.
10. Inform the user the exact cypher query you submit.
"""


# -----------------------------
# Tool implementation / get_nodes
# -----------------------------
def get_nodes() -> str:
    """Return a JSON array of all distinct node labels in the database.

    Returns:
        JSON string: a list of label strings on success, or an
        ``{"error": ...}`` object on failure.
    """
    logger.info("Getting all node labels.")
    cypher = "MATCH (n) RETURN DISTINCT labels(n) AS labels"
    try:
        with driver.session(database=NEO4J_DATABASE) as sess:
            # Each record carries a list of labels; flatten them into one list.
            result = [label for record in sess.run(cypher) for label in record["labels"]]
        logger.info("Retrieved %d distinct node labels.", len(result))
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        # Errors are returned as JSON so the LLM can see and react to them.
        logger.error("Error getting node labels: %s", e)
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# -----------------------------
# Tool implementation / get_relations
# -----------------------------
def get_relations() -> str:
    """Return a JSON array of all distinct relationship types in the database.

    Returns:
        JSON string: a list of relationship-type strings on success, or an
        ``{"error": ...}`` object on failure.
    """
    logger.info("Getting all relationship types.")
    cypher = "MATCH ()-[r]-() RETURN DISTINCT type(r) AS type"
    try:
        with driver.session(database=NEO4J_DATABASE) as sess:
            result = [record["type"] for record in sess.run(cypher)]
        logger.info("Retrieved %d distinct relationship types.", len(result))
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        logger.error("Error getting relationship types: %s", e)
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# -----------------------------
# Tool implementation / get_index
# -----------------------------
def get_index() -> str:
    """Return a JSON array of all fulltext index names in the database.

    Returns:
        JSON string: a list of index-name strings on success, or an
        ``{"error": ...}`` object on failure.
    """
    logger.info("Getting all fulltext index names.")
    cypher = "SHOW FULLTEXT INDEXES YIELD name"
    try:
        with driver.session(database=NEO4J_DATABASE) as sess:
            result = [record["name"] for record in sess.run(cypher)]
        logger.info("Retrieved %d fulltext index names.", len(result))
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        logger.error("Error getting fulltext index names: %s", e)
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# -----------------------------
# Tool implementation / submit_cypher
# -----------------------------
def submit_cypher(cypher_query: str) -> str:
    """Execute an arbitrary Cypher query and return its rows as JSON.

    Args:
        cypher_query: The Cypher statement generated by the model.

    Returns:
        JSON string: a list of row dicts on success, or an
        ``{"error": ...}`` object on failure.
    """
    logger.info("Executing Cypher query: %s", cypher_query)
    try:
        with driver.session(database=NEO4J_DATABASE) as sess:
            result = sess.run(cypher_query).data()
        logger.info("Cypher query returned %d rows.", len(result))
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        logger.error("Error executing Cypher query: %s", e)
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# -----------------------------
# Tools definition
# -----------------------------
# JSON-schema tool declarations passed to the OpenAI Chat Completions API.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_nodes",
            "description": "Returns a list of all node labels in the database.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": [],
                "additionalProperties": False,
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_relations",
            "description": "Returns a list of all relationship types in the database.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": [],
                "additionalProperties": False,
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_index",
            "description": "Returns a list of all fulltext index names in the database.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": [],
                "additionalProperties": False,
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "submit_cypher",
            "description": "Executes a Cypher query against the Neo4j database and returns the results as a JSON string.",
            "parameters": {
                "type": "object",
                "properties": {
                    "cypher_query": {
                        "type": "string",
                        "description": "The Cypher query to execute.",
                    }
                },
                "required": ["cypher_query"],
                "additionalProperties": False,
            },
        },
    },
]

# Maps the tool name the model emits to the Python callable that implements it.
_AVAILABLE_FUNCTIONS = {
    "get_nodes": get_nodes,
    "get_relations": get_relations,
    "get_index": get_index,
    "submit_cypher": submit_cypher,
}


def _dispatch_tool_call(tool_call) -> dict:
    """Execute one model-requested tool call and build its 'tool' reply message.

    Args:
        tool_call: A tool-call object from an OpenAI assistant message
            (provides ``.id`` and ``.function.name`` / ``.function.arguments``).

    Returns:
        A message dict with role ``"tool"``, including the required
        ``tool_call_id`` and ``name`` fields, whose content is either the
        tool's string output or an error description the model can read.
    """
    function_name = tool_call.function.name
    function_to_call = _AVAILABLE_FUNCTIONS.get(function_name)

    if function_to_call is None:
        # The model hallucinated a tool; tell it so instead of crashing.
        logger.warning("Function '%s' called by model not found in available_functions", function_name)
        content = f"Error: Tool '{function_name}' not found."
    else:
        try:
            function_args = json.loads(tool_call.function.arguments)
            logger.info("Calling tool function: %s with args: %r", function_name, function_args)
            if function_name == "submit_cypher":
                content = function_to_call(function_args.get("cypher_query"))
            else:
                # get_nodes / get_relations / get_index take no arguments.
                content = function_to_call()
            logger.info("Tool function response content (first 500 chars): %s", content[:500])
        except json.JSONDecodeError:
            logger.error("Error decoding function arguments JSON: %s", tool_call.function.arguments)
            content = f"Error: Invalid JSON arguments for tool '{function_name}'."
        except Exception as e:
            logger.error("Error executing tool '%s': %s", function_name, e)
            logger.exception("Tool execution traceback:")
            content = f"Error executing tool '{function_name}': {e}"

    return {
        "tool_call_id": tool_call.id,  # Required on tool response messages
        "role": "tool",
        "content": content,  # Content must be a string
        "name": function_name,
    }


# -----------------------------
# Chatbot assistant function
# -----------------------------
def assistant_reply(client: OpenAI, user_query: str, history: list[dict] | None = None) -> str:
    """Answer *user_query* via the OpenAI Chat API, calling Neo4j tools as needed.

    Args:
        client: An initialized OpenAI client.
        user_query: The user's question or request.
        history: Previous conversation messages in OpenAI format, e.g.
            ``[{"role": "user", "content": "..."}, {"role": "assistant", ...}]``.
            Defaults to an empty history.  (BUG FIX: the default used to be a
            shared mutable ``[]``.)

    Returns:
        The assistant's reply as a string; on failure, a user-facing error
        message string.
    """
    history = [] if history is None else history
    logger.info("assistant_reply received user_query: %r", user_query)

    # Log the history sparingly — a long transcript would flood the log.
    if len(history) < 20:
        logger.info("assistant_reply received history: %r", history)
    else:
        logger.info("assistant_reply received history length: %d", len(history))
        logger.info("assistant_reply first 10 history items: %r", history[:10])
        logger.info("assistant_reply last 10 history items: %r", history[-10:])

    # System prompt first, then the prior conversation, then the new question.
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(history)
    messages.append({"role": "user", "content": user_query})
    logger.info("Messages prepared for OpenAI API (first call): %r", messages)

    try:
        # First API call: the model either answers directly or requests tools.
        logger.info("Calling OpenAI chat completion (first call) with model: %s", CHAT_MODEL)
        response = client.chat.completions.create(
            model=CHAT_MODEL,
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )
        response_message = response.choices[0].message
        logger.info("OpenAI response (first call) message object: %s", response_message)

        if not response_message.tool_calls:
            # No tools requested — the first reply is the final answer.
            logger.info("No tool calls detected. Returning initial response content.")
            initial_response_content = response_message.content if response_message.content is not None else ""
            logger.info("Initial assistant response content: %s", initial_response_content)
            return initial_response_content

        logger.info("Tool calls detected: %s", response_message.tool_calls)
        # The assistant message carrying tool_calls must be echoed back to the
        # API before the corresponding tool result messages.
        messages.append(response_message)
        logger.info("Messages after appending assistant tool_calls message: %r", messages)

        for tool_call in response_message.tool_calls:
            messages.append(_dispatch_tool_call(tool_call))
            logger.info("Appended tool output message: %r", messages[-1])

        # Second API call: the model turns the tool outputs into a final answer.
        logger.info("Calling OpenAI chat completion (second call after tools) with model: %s", CHAT_MODEL)
        logger.info("Messages prepared for OpenAI API (second call): %r", messages)
        second_response = client.chat.completions.create(
            model=CHAT_MODEL,
            messages=messages,
        )
        final_response_message = second_response.choices[0].message
        logger.info("OpenAI response (second call) message object: %s", final_response_message)

        final_response = final_response_message.content if final_response_message.content is not None else ""
        logger.info("Final assistant response content (after tool): %s", final_response)
        return final_response

    except Exception as e:
        logger.exception("An error occurred during OpenAI chat completion:")
        # Surface a readable message to the UI instead of crashing the app.
        return f"An error occurred while processing your request: {e}"


# -----------------------------
# Clients
# -----------------------------
# Initialize the OpenAI client and the Neo4j driver from the environment
# variables loaded above.  Failures are logged rather than fatal so the
# Gradio UI can still come up and show an error to the user.
try:
    client = OpenAI(api_key=OPENAI_API_KEY)
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    logger.info("Clients (OpenAI and Neo4j) initialized successfully.")
except Exception as e:
    logger.error("Error initializing clients: %s", e)


# -----------------------------
# Gradio Interface
# -----------------------------
def chatbot_interface(user_query: str, history: list[dict]) -> tuple[list[dict], list[dict]]:
    """Gradio wrapper around :func:`assistant_reply` that manages history.

    Args:
        user_query: Text the user typed into the box.
        history: Conversation so far, already in OpenAI messages format
            because ``gr.Chatbot`` is configured with ``type='messages'``.

    Returns:
        ``(updated_history, updated_history)`` — the same list twice, once for
        the Chatbot component and once for the ``gr.State`` holder.
    """
    logger.info(f"chatbot_interface received history (dict format): {history}")
    logger.info(f"chatbot_interface received user_query: {user_query}")

    try:
        assistant_response_content = assistant_reply(client, user_query, history)
        logger.info(f"chatbot_interface received assistant_response_content: {assistant_response_content}")
    except Exception:
        # Fall back to a generic message so the UI never shows a traceback.
        logger.exception("Error calling assistant_reply from chatbot_interface:")
        assistant_response_content = "An error occurred while getting the assistant's response. Please check the logs for details."
    assistant_message = {"role": "assistant", "content": assistant_response_content}

    # Append this turn (user question + assistant answer) to the transcript.
    updated_history = history + [{"role": "user", "content": user_query}, assistant_message]
    logger.info(f"chatbot_interface updated history (dict format): {updated_history}")
    return updated_history, updated_history


# Soft yellow/gray theme matching the bpExperts branding colors.
my_theme = gr.themes.Soft(
    primary_hue="yellow",
    secondary_hue="gray",
    neutral_hue="zinc",
    radius_size="sm",
    spacing_size="md",
).set(
    body_background_fill="#FFFFFF",
    block_background_fill="#FFFFFF",
    block_border_color="#E5E5E5",
    input_background_fill="#FAFAFA",
    input_border_color="#DDDDDD",
    button_primary_background_fill="#F2C200",
    button_primary_text_color="#000000",
    button_secondary_background_fill="#FFFFFF",
    button_secondary_text_color="#111111",
)

with gr.Blocks(theme=my_theme) as iface:
    # NOTE(review): the original HTML markup was lost in extraction; only the
    # text content survives — restore the real header markup if available.
    gr.HTML("""
bpExperts Logo

Business Flows Assistant

""")
    # type='messages' keeps history in OpenAI message-dict format throughout.
    chatbot = gr.Chatbot(label="Process Assistant Chatbot", type='messages')
    msg = gr.Textbox(label="Ask a question about business processes.")
    clear = gr.ClearButton([msg, chatbot])
    # gr.State holds the conversation between interactions; starts empty.
    state = gr.State([])
    # Submitting the textbox routes (msg, state) through chatbot_interface and
    # writes the returned history into both the chatbot display and the state.
    msg.submit(chatbot_interface, inputs=[msg, state], outputs=[chatbot, state])


# -----------------------------
# Launch the Gradio interface
# -----------------------------
if __name__ == "__main__":
    logger.info("Launching Gradio interface.")
    # server_name="0.0.0.0" makes the app reachable from outside a container;
    # share=False for production deployment.
    iface.launch(server_name="0.0.0.0", share=False)
    logger.info("Gradio interface launched.")