Intelligent_PID / chatbot_agent.py
msIntui
feat: initial clean deployment
910e0d4
# chatbot_agent.py
import os
import json
import re
from openai import OpenAI
import traceback
import logging
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Get logger
logger = logging.getLogger(__name__)
# Initialize OpenAI client with error handling
def get_openai_client():
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OpenAI API key not found in environment variables")
return OpenAI(api_key=api_key)
def format_message(role, content):
"""Format message for chatbot history."""
return {"role": role, "content": content}
def initialize_graph_prompt(graph_data):
"""Initialize the conversation with available node and edge information."""
try:
# Get summary info with safe fallbacks
summary = graph_data.get('summary', {})
summary_parts = []
# Only include counts that exist
if 'symbol_count' in summary:
summary_parts.append(f"Symbols: {summary['symbol_count']}")
if 'text_count' in summary:
summary_parts.append(f"Texts: {summary['text_count']}")
if 'line_count' in summary:
summary_parts.append(f"Lines: {summary['line_count']}")
if 'edge_count' in summary:
summary_parts.append(f"Edges: {summary['edge_count']}")
summary_info = ", ".join(summary_parts) + "."
# Prepare node details only if they exist
node_details = ""
detailed_results = graph_data.get('detailed_results', {})
if 'symbols' in detailed_results:
node_details = "Nodes (symbols) in the graph include:\n"
for symbol in detailed_results['symbols']:
details = []
if 'symbol_id' in symbol:
details.append(f"ID: {symbol['symbol_id']}")
if 'class_id' in symbol:
details.append(f"Class: {symbol['class_id']}")
if 'category' in symbol:
details.append(f"Category: {symbol['category']}")
if 'type' in symbol:
details.append(f"Type: {symbol['type']}")
if 'label' in symbol:
details.append(f"Label: {symbol['label']}")
if details: # Only add if we have any details
node_details += ", ".join(details) + "\n"
initial_prompt = (
"You have access to a knowledge graph generated from a P&ID diagram. "
f"The summary information includes:\n{summary_info}\n\n"
f"{node_details}\n"
"Answer questions about the P&ID elements using this information."
)
return initial_prompt
except Exception as e:
logger.error(f"Error creating initial prompt: {str(e)}")
return ("I have access to a P&ID diagram knowledge graph. "
"I can help answer questions about the diagram elements.")
def get_assistant_response(user_message, json_path):
"""Generate response based on P&ID data and OpenAI."""
try:
client = get_openai_client()
# Load the aggregated data
with open(json_path, 'r') as f:
data = json.load(f)
# Process the user's question
question = user_message.lower()
# Use rule-based responses for specific questions
if "valve" in question or "valves" in question:
valve_count = sum(1 for symbol in data.get('symbols', [])
if 'class' in symbol and 'valve' in symbol['class'].lower())
return f"I found {valve_count} valves in this P&ID."
elif "pump" in question or "pumps" in question:
pump_count = sum(1 for symbol in data.get('symbols', [])
if 'class' in symbol and 'pump' in symbol['class'].lower())
return f"I found {pump_count} pumps in this P&ID."
elif "equipment" in question or "components" in question:
equipment_types = {}
for symbol in data.get('symbols', []):
if 'class' in symbol:
eq_type = symbol['class']
equipment_types[eq_type] = equipment_types.get(eq_type, 0) + 1
response = "Here's a summary of the equipment I found:\n"
for eq_type, count in equipment_types.items():
response += f"- {eq_type}: {count}\n"
return response
# For other questions, use OpenAI
else:
# Prepare the conversation context
graph_data = {
"summary": {
"symbol_count": len(data.get('symbols', [])),
"text_count": len(data.get('texts', [])),
"line_count": len(data.get('lines', [])),
"edge_count": len(data.get('edges', [])),
},
"detailed_results": data
}
initial_prompt = initialize_graph_prompt(graph_data)
conversation = [
{"role": "system", "content": initial_prompt},
{"role": "user", "content": user_message}
]
response = client.chat.completions.create(
model="gpt-4-turbo",
messages=conversation
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Error in get_assistant_response: {str(e)}")
logger.error(traceback.format_exc())
return "I apologize, but I encountered an error analyzing the P&ID data. Please try asking a different question."
# Testing and Usage block
if __name__ == "__main__":
# Load the knowledge graph data from JSON file
json_file_path = "results/0_aggregated_detections.json"
try:
with open(json_file_path, 'r') as file:
graph_data = json.load(file)
except FileNotFoundError:
print(f"Error: File not found at {json_file_path}")
graph_data = None
except json.JSONDecodeError:
print("Error: Failed to decode JSON. Please check the file format.")
graph_data = None
# Initialize conversation history with assistant's welcome message
history = [format_message("assistant", "Hello! I am ready to answer your questions about the P&ID knowledge graph. The graph includes nodes (symbols), edges, linkers, and text tags, and I have detailed information available about each. Please ask any questions related to these elements and their connections.")]
# Print the assistant's welcome message
print("Assistant:", history[0]["content"])
# Individual Testing Options
if graph_data:
# Option 1: Test the graph prompt initialization
print("\n--- Test: Graph Prompt Initialization ---")
initial_prompt = initialize_graph_prompt(graph_data)
print(initial_prompt)
# Option 2: Simulate a conversation with a test question
print("\n--- Test: Simulate Conversation ---")
test_question = "Can you tell me about the connections between the nodes?"
history.append(format_message("user", test_question))
print(f"\nUser: {test_question}")
for response in get_assistant_response(test_question, json_file_path):
print("Assistant:", response)
history.append(format_message("assistant", response))
# Option 3: Manually input questions for interactive testing
while True:
user_question = input("\nYou: ")
if user_question.lower() in ["exit", "quit"]:
print("Exiting chat. Goodbye!")
break
history.append(format_message("user", user_question))
for response in get_assistant_response(user_question, json_file_path):
print("Assistant:", response)
history.append(format_message("assistant", response))
else:
print("Unable to load graph data. Please check the file path and format.")