# chatbot_agent.py
import os
import json
import re
from openai import OpenAI
import traceback
import logging

# Get logger
logger = logging.getLogger(__name__)

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


def format_message(role, content):
    """Format message for chatbot history."""
    return {"role": role, "content": content}


def initialize_graph_prompt(graph_data):
    """Initialize the conversation with detailed node, edge, linker, and text information.

    Args:
        graph_data: dict with a "summary" section (element counts) and a
            "detailed_results" section holding lists under "symbols",
            "edges", "linkers", and "texts".

    Returns:
        A system-prompt string that enumerates every graph element so the
        model can answer questions about specific nodes, edges, linkers,
        or text tags.
    """
    summary = graph_data["summary"]
    summary_info = (
        f"Symbols (represented as nodes): {summary['symbol_count']}, "
        f"Texts: {summary['text_count']}, "
        f"Lines: {summary['line_count']}, "
        # .get: callers built from raw detection JSON may not supply a
        # linker count (see get_assistant_response).
        f"Linkers: {summary.get('linker_count', 0)}, "
        f"Edges: {summary['edge_count']}."
    )

    detailed = graph_data["detailed_results"]

    # NOTE(review): the detection JSON schema is not guaranteed here
    # (sibling code reads a 'class' key that this function does not),
    # so every per-element key is read with .get and a fallback.

    # Prepare detailed node (symbol) data
    node_lines = ["Nodes (symbols) in the graph include the following details:\n"]
    for symbol in detailed.get("symbols", []):
        node_lines.append(
            f"Node ID: {symbol.get('symbol_id', 'unknown')}, "
            f"Class ID: {symbol.get('class_id', 'unknown')}, "
            f"Category: {symbol.get('category', 'unknown')}, "
            f"Type: {symbol.get('type', 'unknown')}, "
            f"Label: {symbol.get('label', 'unknown')}, "
            f"Confidence: {symbol.get('confidence', 'unknown')}\n"
        )
    node_details = "".join(node_lines)

    # Prepare edge data
    edge_lines = ["Edges in the graph showing connections between nodes are as follows:\n"]
    for edge in detailed.get("edges", []):
        edge_lines.append(
            f"Edge ID: {edge.get('edge_id', 'unknown')}, "
            f"From Node: {edge.get('symbol_1_id', 'unknown')}, "
            f"To Node: {edge.get('symbol_2_id', 'unknown')}, "
            f"Type: {edge.get('type', 'unknown')}\n"
        )
    edge_details = "".join(edge_lines)

    # Prepare linker data
    linker_lines = ["Linkers in the diagram are as follows:\n"]
    for linker in detailed.get("linkers", []):
        linker_lines.append(
            f"Symbol ID: {linker.get('symbol_id', 'unknown')}, "
            f"Associated Text IDs: {linker.get('text_ids', [])}, "
            f"Associated Edge IDs: {linker.get('edge_ids', [])}, "
            f"Position: {linker.get('bbox', 'unknown')}\n"
        )
    linker_details = "".join(linker_lines)

    # Prepare text (tag) data
    text_lines = ["Text elements with associated tags in the diagram are as follows:\n"]
    for text in detailed.get("texts", []):
        text_lines.append(
            f"Text ID: {text.get('text_id', 'unknown')}, "
            f"Content: {text.get('content', 'unknown')}, "
            f"Confidence: {text.get('confidence', 'unknown')}, "
            f"Position: {text.get('bbox', 'unknown')}\n"
        )
    text_details = "".join(text_lines)

    initial_prompt = (
        "You have access to a knowledge graph generated from a P&ID diagram. "
        f"The summary information includes:\n{summary_info}\n\n"
        "The detailed information about each node (symbol) in the graph is as follows:\n"
        f"{node_details}\n"
        "The edges connecting these nodes are as follows:\n"
        f"{edge_details}\n"
        "The linkers in the diagram are as follows:\n"
        f"{linker_details}\n"
        "The text elements and their tags in the diagram are as follows:\n"
        f"{text_details}\n"
        "Answer questions about specific nodes, edges, types, labels, categories, "
        "linkers, or text tags using this information."
    )
    return initial_prompt


def get_assistant_response(user_message, json_path):
    """Generate response based on P&ID data and OpenAI.

    Simple counting questions (valves, pumps, equipment summary) are answered
    rule-based from the JSON; anything else is delegated to the OpenAI chat
    API with the full graph description as the system prompt.

    Args:
        user_message: The user's question as plain text.
        json_path: Path to the aggregated detections JSON file.

    Returns:
        The answer string, or a generic apology string if anything raises.
    """
    try:
        # Load the aggregated data
        with open(json_path, 'r') as f:
            data = json.load(f)

        # Process the user's question
        question = user_message.lower()

        # Use rule-based responses for specific questions
        if "valve" in question or "valves" in question:
            valve_count = sum(
                1 for symbol in data.get('symbols', [])
                if 'class' in symbol and 'valve' in symbol['class'].lower()
            )
            return f"I found {valve_count} valves in this P&ID."

        elif "pump" in question or "pumps" in question:
            pump_count = sum(
                1 for symbol in data.get('symbols', [])
                if 'class' in symbol and 'pump' in symbol['class'].lower()
            )
            return f"I found {pump_count} pumps in this P&ID."

        elif "equipment" in question or "components" in question:
            equipment_types = {}
            for symbol in data.get('symbols', []):
                if 'class' in symbol:
                    eq_type = symbol['class']
                    equipment_types[eq_type] = equipment_types.get(eq_type, 0) + 1
            response = "Here's a summary of the equipment I found:\n"
            for eq_type, count in equipment_types.items():
                response += f"- {eq_type}: {count}\n"
            return response

        # For other questions, use OpenAI
        else:
            # Prepare the conversation context.
            # BUGFIX: linker_count was missing, which made
            # initialize_graph_prompt raise KeyError for every
            # OpenAI-path question.
            graph_data = {
                "summary": {
                    "symbol_count": len(data.get('symbols', [])),
                    "text_count": len(data.get('texts', [])),
                    "line_count": len(data.get('lines', [])),
                    "linker_count": len(data.get('linkers', [])),
                    "edge_count": len(data.get('edges', [])),
                },
                "detailed_results": data
            }
            initial_prompt = initialize_graph_prompt(graph_data)
            conversation = [
                {"role": "system", "content": initial_prompt},
                {"role": "user", "content": user_message}
            ]
            response = client.chat.completions.create(
                model="gpt-4-turbo",
                messages=conversation
            )
            return response.choices[0].message.content

    except Exception as e:
        # Top-level boundary: log full traceback, return a safe fallback
        # so the chat UI never sees a raw exception.
        logger.error(f"Error in get_assistant_response: {str(e)}")
        logger.error(traceback.format_exc())
        return ("I apologize, but I encountered an error analyzing the P&ID data. "
                "Please try asking a different question.")


# Testing and Usage block
if __name__ == "__main__":
    # Load the knowledge graph data from JSON file
    json_file_path = "results/0_aggregated_detections.json"
    try:
        with open(json_file_path, 'r') as file:
            graph_data = json.load(file)
    except FileNotFoundError:
        print(f"Error: File not found at {json_file_path}")
        graph_data = None
    except json.JSONDecodeError:
        print("Error: Failed to decode JSON. Please check the file format.")
        graph_data = None

    # Initialize conversation history with assistant's welcome message
    history = [format_message(
        "assistant",
        "Hello! I am ready to answer your questions about the P&ID knowledge graph. "
        "The graph includes nodes (symbols), edges, linkers, and text tags, and I have "
        "detailed information available about each. Please ask any questions related to "
        "these elements and their connections."
    )]

    # Print the assistant's welcome message
    print("Assistant:", history[0]["content"])

    # Individual Testing Options
    if graph_data:
        # Option 1: Test the graph prompt initialization
        print("\n--- Test: Graph Prompt Initialization ---")
        initial_prompt = initialize_graph_prompt(graph_data)
        print(initial_prompt)

        # Option 2: Simulate a conversation with a test question
        print("\n--- Test: Simulate Conversation ---")
        test_question = "Can you tell me about the connections between the nodes?"
        history.append(format_message("user", test_question))
        print(f"\nUser: {test_question}")
        # BUGFIX: get_assistant_response returns a single string; iterating
        # over it printed one character per line. Call once instead.
        response = get_assistant_response(test_question, json_file_path)
        print("Assistant:", response)
        history.append(format_message("assistant", response))

        # Option 3: Manually input questions for interactive testing
        while True:
            user_question = input("\nYou: ")
            if user_question.lower() in ["exit", "quit"]:
                print("Exiting chat. Goodbye!")
                break
            history.append(format_message("user", user_question))
            # BUGFIX: same character-iteration bug as above.
            response = get_assistant_response(user_question, json_file_path)
            print("Assistant:", response)
            history.append(format_message("assistant", response))
    else:
        print("Unable to load graph data. Please check the file path and format.")