# intelligent-pid / gradioChatApp.py
# msIntui
# Fix merge conflict in config.py
# 690b5e4
import os
import base64
import gradio as gr
import json
from datetime import datetime
from symbol_detection import run_detection_with_optimal_threshold
from line_detection_ai import DiagramDetectionPipeline, LineDetector, LineConfig, ImageConfig, DebugHandler, PointConfig, JunctionConfig, PointDetector, JunctionDetector, SymbolConfig, SymbolDetector, TagConfig, TagDetector
from data_aggregation_ai import DataAggregator
from chatbot_agent import get_assistant_response
from storage import StorageFactory, LocalStorage
import traceback
from text_detection_combined import process_drawing
from pathlib import Path
from pdf_processor import DocumentProcessor
import networkx as nx
import logging
import matplotlib.pyplot as plt
from dotenv import load_dotenv
import torch
from graph_visualization import create_graph_visualization
import shutil
# Load environment variables from .env file.
# Done at import time, ahead of the logging setup and of the os.getenv()
# lookups performed later in this module.
load_dotenv()
# Configure logging at the start of the file
# Root logger: INFO and above, with a second-resolution timestamp prefix.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Logger used by everything defined in this module.
logger = logging.getLogger(__name__)
# Cap chatty library and pipeline loggers at WARNING so their routine output
# does not drown out this module's own messages.
for _noisy_logger_name in (
    'PIL',
    'matplotlib',
    'gradio',
    'networkx',
    'line_detection_ai',
    'symbol_detection',
):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)
del _noisy_logger_name  # keep the module namespace free of the loop variable
# Only log important messages
def log_process_step(message, level=logging.INFO):
    """Log a processing step only if it is considered important.

    Messages at WARNING level or above are always logged at their own level.
    Lower-level messages are logged at INFO only when they mention
    "completed" or "generated" (case-insensitive); everything else is
    silently dropped.
    """
    if level >= logging.WARNING:
        logger.log(level, message)
        return
    lowered = message.lower()
    if any(keyword in lowered for keyword in ("completed", "generated")):
        logger.info(message)
# Helper function to format timestamps
def get_timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"
def format_message(role, content):
    """Build a chatbot-history entry: a dict with 'role' and 'content' keys."""
    entry = dict(role=role, content=content)
    return entry
# Load avatar images for agents
localStorage = LocalStorage()


def _load_avatar_b64(asset_path):
    """Read an asset through localStorage and return it as base64 text."""
    raw_bytes = localStorage.load_file(asset_path)
    return base64.b64encode(raw_bytes).decode()


# Avatars are embedded into chat HTML as data URIs, so they are encoded once
# at import time.
agent_avatar = _load_avatar_b64("assets/AiAgent.png")
llm_avatar = _load_avatar_b64("assets/llm.png")
user_avatar = _load_avatar_b64("assets/user.png")
# Chat message formatting with avatars and enhanced HTML for readability
def chat_message(role, message, avatar, timestamp):
    """Render one chat entry as an HTML fragment.

    The message text is HTML-escaped first (it may come from the user or from
    the assistant), then a small Markdown subset is converted to HTML:

    * fenced code between paired triple backticks -> <pre><code>...</code></pre>
    * inline code between single backticks        -> <code>...</code>
    * paired ** markers                           -> <strong>...</strong>
    * lines starting with #, ## or ### and a space -> <h1>/<h2>/<h3>
    * remaining newlines outside code blocks      -> <br>

    Args:
        role: Speaker label; inserted verbatim into the CSS class names.
        message: Message text; non-string values are converted with str().
        avatar: Base64-encoded PNG data, embedded as a data URI.
        timestamp: Preformatted timestamp text shown under the bubble.

    Returns:
        The HTML string for the chat entry.
    """
    import html
    import re

    def render_heading(match):
        # The number of leading '#' characters selects the heading level.
        level = len(match.group(1))
        return f"<h{level}>{match.group(2)}</h{level}>"

    def render_prose(text):
        # Inline code first, so the later rules see it as already wrapped.
        text = re.sub(r"`([^`\n]+)`", r"<code>\1</code>", text)
        # Only *paired* markers produce an element; a stray ** stays literal.
        text = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", text)
        # Headings are recognised only at the start of a line.
        text = re.sub(
            r"^(#{1,3})[ \t]+(.+)$", render_heading, text, flags=re.MULTILINE
        )
        return text.replace("\n", "<br>")

    # Escape before adding any markup so message content cannot inject HTML.
    escaped = html.escape(str(message))

    # Splitting on a capturing pattern yields prose at even indices and the
    # contents of each closed code fence at odd indices; an unclosed fence
    # simply remains part of the prose.
    segments = re.split(r"```(.*?)```", escaped, flags=re.DOTALL)
    rendered = []
    for index, segment in enumerate(segments):
        if index % 2:
            # <pre> preserves newlines, so code is left untouched.
            rendered.append(f"<pre><code>{segment}</code></pre>")
        else:
            rendered.append(render_prose(segment))
    formatted_message = "".join(rendered)

    return f"""
<div class="chat-message {role}">
<img src="data:image/png;base64,{avatar}" class="avatar"/>
<div>
<div class="speech-bubble {role}-bubble">{formatted_message}</div>
<div class="timestamp">{timestamp}</div>
</div>
</div>
"""
# Main processing function for P&ID steps
def _run_line_detection(display_path, results_dir, storage):
    """Run the line-detection pipeline on one page image.

    Creates tag, symbol, line, point and junction detectors with their default
    configs, wires them into a DiagramDetectionPipeline and runs it on
    ``display_path`` with ``results_dir`` as the output directory.

    Returns:
        Tuple ``(image_path, json_path)`` taken from the pipeline result.

    Raises:
        Exception: carrying ``result.error`` when the pipeline reports failure.
    """
    debug_handler = DebugHandler(enabled=True, storage=storage)

    # Configure detectors
    line_config = LineConfig()
    point_config = PointConfig()
    junction_config = JunctionConfig()
    symbol_config = SymbolConfig()
    tag_config = TagConfig()

    # Create all required detectors
    symbol_detector = SymbolDetector(
        config=symbol_config,
        debug_handler=debug_handler
    )
    tag_detector = TagDetector(
        config=tag_config,
        debug_handler=debug_handler
    )
    line_detector = LineDetector(
        config=line_config,
        model_path="models/deeplsd_md.tar",
        model_config={"detect_lines": True},
        device=torch.device("cpu"),
        debug_handler=debug_handler
    )
    point_detector = PointDetector(
        config=point_config,
        debug_handler=debug_handler
    )
    junction_detector = JunctionDetector(
        config=junction_config,
        debug_handler=debug_handler
    )

    # Create and run pipeline with all detectors
    pipeline = DiagramDetectionPipeline(
        tag_detector=tag_detector,
        symbol_detector=symbol_detector,
        line_detector=line_detector,
        point_detector=point_detector,
        junction_detector=junction_detector,
        storage=storage,
        debug_handler=debug_handler
    )
    result = pipeline.run(
        image_path=display_path,
        output_dir=results_dir,
        config=ImageConfig()
    )
    if not result.success:
        logger.error(f"Pipeline failed: {result.error}")
        raise Exception(result.error)
    return result.image_path, result.json_path


def process_pnid(image_file, progress_status, progress=gr.Progress()):
    """Process P&ID document with real-time progress updates.

    Generator intended as a Gradio event handler. It runs the seven
    processing steps in order and, after each one, yields the same 9-slot
    list so the UI can refresh:

        [0] page image path            [5] knowledge-graph image path
        [1] symbol-detection image     [6] chat HTML (welcome message)
        [2] text-detection image       [7] status log (last 20 lines)
        [3] line-detection image       [8] aggregated JSON path
        [4] aggregated view (currently the symbol-detection image)

    Args:
        image_file: Path of the uploaded document; ``None`` is reported as an
            error.
        progress_status: Current status text from the UI. Accepted because
            it is wired as an input, but not read here.
        progress: Gradio progress tracker, called with a fraction and a
            description on every status update.

    Any exception is logged, appended to the status log as an error line and
    followed by one final yield; it is not propagated to the caller.
    """
    # Created before the try block so the error handler below can always
    # report through them, and never re-created so the log keeps every line.
    progress_text = []
    outputs = [None] * 9

    def update_progress(step, message):
        progress_text.append(f"{get_timestamp()} - {message}")
        outputs[7] = "\n".join(progress_text[-20:])  # Keep last 20 lines
        # `step` is a 0..1 fraction; the message already names the step.
        progress(step, desc=message)
        return outputs

    try:
        # Disable verbose logging for processing components
        logging.getLogger('line_detection_ai').setLevel(logging.WARNING)
        logging.getLogger('symbol_detection').setLevel(logging.WARNING)
        logging.getLogger('text_detection').setLevel(logging.WARNING)

        update_progress(0.1, "Starting processing...")
        yield outputs

        storage = StorageFactory.get_storage()
        results_dir = "results"
        if image_file is None:
            raise ValueError("No file uploaded")
        os.makedirs(results_dir, exist_ok=True)

        # Step 1: File Upload (10%)
        logger.info(f"Processing file: {os.path.basename(image_file)}")
        update_progress(0.1, "Step 1/7: File uploaded successfully")
        yield outputs

        # Step 2: Document Processing (25%)
        update_progress(0.25, "Step 2/7: Processing document...")
        yield outputs
        doc_processor = DocumentProcessor(storage)
        processed_pages = doc_processor.process_document(
            file_path=image_file,
            output_dir=results_dir
        )
        if not processed_pages:
            raise ValueError("No pages processed from document")
        # Only the first processed page is analysed and displayed.
        display_path = processed_pages[0]
        outputs[0] = display_path
        update_progress(0.25, "Document processed successfully")
        yield outputs

        # Step 3: Symbol Detection (45%)
        update_progress(0.45, "Step 3/7: Symbol Detection")
        yield outputs
        detection_results = run_detection_with_optimal_threshold(
            display_path,
            results_dir=results_dir,
            file_name=os.path.basename(display_path),
            resize_image=True,
            storage=storage
        )
        detection_image_path, detection_json_path, _, diagram_bbox = detection_results
        if diagram_bbox is None:
            # The bounding box is not consumed by later steps; only note it.
            logger.warning("No diagram bounding box detected, using full image")
        outputs[1] = detection_image_path
        update_progress(0.45, "Symbol detection completed")
        yield outputs

        # Step 4: Text Detection (65%)
        update_progress(0.65, "Step 4/7: Text Detection")
        yield outputs
        text_results, text_summary = process_drawing(display_path, results_dir, storage)
        outputs[2] = text_results['image_path']
        update_progress(0.65, "Text detection completed")
        update_progress(0.65, f"Found {text_summary['total_detections']} text elements")
        yield outputs

        # Step 5: Line Detection (80%)
        update_progress(0.80, "Step 5/7: Line Detection")
        yield outputs
        try:
            line_image_path, line_json_path = _run_line_detection(
                display_path, results_dir, storage
            )
            outputs[3] = line_image_path
            update_progress(0.80, "Line detection completed")
        except Exception as e:
            logger.error(f"Line detection error: {str(e)}")
            raise

        # Step 6: Data Aggregation (90%)
        update_progress(0.90, "Step 6/7: Data Aggregation")
        yield outputs
        data_aggregator = DataAggregator(storage=storage)
        aggregated_data = data_aggregator.aggregate_data(
            symbols_path=detection_json_path,
            texts_path=text_results['json_path'],
            lines_path=line_json_path
        )
        # Add image path to aggregated data
        aggregated_data['image_path'] = display_path
        # Save aggregated data
        aggregated_json_path = os.path.join(results_dir, f"{Path(display_path).stem}_aggregated.json")
        with open(aggregated_json_path, 'w') as f:
            json.dump(aggregated_data, f, indent=2)
        # Use the detection image as the aggregated view for now
        # TODO: Implement visualization in DataAggregator if needed
        outputs[4] = detection_image_path
        outputs[8] = aggregated_json_path
        update_progress(0.90, "Data aggregation completed")
        yield outputs

        # Step 7: Graph Generation (95%)
        update_progress(0.95, "Step 7/7: Generating knowledge graph...")
        yield outputs
        try:
            logger.info("Creating knowledge graph...")
            # Create graph visualization - this will save the visualization file
            G, _ = create_graph_visualization(aggregated_json_path, save_plot=True)
            # The saved plot is looked up next to the aggregated JSON.
            graph_image_path = os.path.join(
                os.path.dirname(aggregated_json_path), "graph_visualization.png"
            )
            if G is None:
                logger.warning("No graph was generated")
                update_progress(1.0, "⚠️ Warning: No graph could be generated")
            elif not os.path.exists(graph_image_path):
                logger.warning("Graph visualization file not found")
                update_progress(1.0, "⚠️ Warning: Graph visualization could not be generated")
            else:
                outputs[5] = graph_image_path
                update_progress(0.95, "Knowledge graph generated")
                logger.info("Knowledge graph generated and saved successfully")
                # Final completion (100%)
                update_progress(1.0, "✅ Processing Complete")
                outputs[6] = chat_message(
                    "agent",
                    "Processing complete! I can help answer questions about the P&ID contents.",
                    agent_avatar,
                    get_timestamp()
                )
                update_progress(1.0, "✅ All processing steps completed successfully!")
            yield outputs
        except Exception as e:
            logger.error(f"Error in graph generation: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise
    except Exception as e:
        logger.error(f"Error in process_pnid: {str(e)}")
        logger.error(traceback.format_exc())
        error_msg = f"❌ Error: {str(e)}"
        update_progress(1.0, error_msg)
        yield outputs
# Separate function for Chat interaction
def _response_to_text(response):
    """Reduce the assistant's return value to a single display string.

    Strings and dicts are converted with str(); ``None`` becomes an empty
    string; an object with ``__next__`` contributes its first item; anything
    else is converted with str(). Failures while reading from an iterator are
    turned into an apology message rather than raised.
    """
    if response is None:
        return ""
    if isinstance(response, (str, dict)):
        return str(response)
    try:
        # Try to get the first response from generator
        first_item = next(response) if hasattr(response, '__next__') else response
        return str(first_item)
    except StopIteration:
        return "I apologize, but I couldn't generate a response."
    except Exception as e:
        logger.error(f"Error processing response: {str(e)}")
        return "I apologize, but I encountered an error processing your request."


def handle_user_message(user_input, chat_history, json_path_state):
    """Handle user messages and generate responses.

    Args:
        user_input: Text typed by the user; blank input is ignored.
        chat_history: Chat HTML accumulated so far; ``None`` is treated as an
            empty history.
        json_path_state: Path of the aggregated JSON produced by processing;
            the assistant is only queried when this file exists.

    Returns:
        The chat HTML with the user's message and a reply (or an explanatory
        or apology message) appended. For blank input the history is returned
        without additions.
    """
    # The chat component may hand over None once its content has been
    # cleared; normalise so every concatenation below is safe, including the
    # one in the last-resort error handler.
    history = chat_history or ""
    if not user_input or not user_input.strip():
        return history
    try:
        # Add user message
        new_history = history + chat_message("user", user_input, user_avatar, get_timestamp())

        # Check if json_path exists and is valid
        if not json_path_state or not os.path.exists(json_path_state):
            error_message = "Please upload and process a P&ID document first."
            return new_history + chat_message("assistant", error_message, agent_avatar, get_timestamp())

        try:
            # Log for debugging
            logger.info(f"Sending question to assistant: {user_input}")
            logger.info(f"Using JSON path: {json_path_state}")
            # Generate response
            response = get_assistant_response(user_input, json_path_state)
            response_text = _response_to_text(response)
            logger.info(f"Generated response: {response_text}")
            if not response_text.strip():
                response_text = "I apologize, but I couldn't generate a response. Please try asking your question differently."
            # Add response to chat history
            new_history += chat_message("assistant", response_text, agent_avatar, get_timestamp())
        except Exception as e:
            logger.error(f"Error generating response: {str(e)}")
            logger.error(traceback.format_exc())
            error_message = "I apologize, but I encountered an error processing your request. Please try again."
            new_history += chat_message("assistant", error_message, agent_avatar, get_timestamp())
        return new_history
    except Exception as e:
        logger.error(f"Chat error: {str(e)}")
        logger.error(traceback.format_exc())
        return history + chat_message(
            "assistant",
            "I apologize, but something went wrong. Please try again.",
            agent_avatar,
            get_timestamp()
        )
# Update custom CSS
# Stylesheet handed to gr.Blocks(css=...) in create_ui(). The class names
# defined here are the ones referenced through `elem_classes` on the layout
# components and in the HTML emitted by chat_message().
custom_css = """
.full-height-row {
height: calc(100vh - 150px); /* Adjusted height */
margin: 0;
padding: 10px;
}
.upload-box {
background: #2a2a2a;
border-radius: 8px;
padding: 15px;
margin-bottom: 15px;
border: 1px solid #3a3a3a;
}
.status-box-container {
background: #2a2a2a;
border-radius: 8px;
padding: 15px;
height: calc(100vh - 350px); /* Reduced height */
border: 1px solid #3a3a3a;
margin-bottom: 15px;
}
.status-box {
font-family: 'Courier New', monospace;
font-size: 12px;
line-height: 1.4;
background-color: #1a1a1a;
color: #00ff00;
padding: 10px;
border-radius: 5px;
height: calc(100% - 40px); /* Adjust for header */
overflow-y: auto;
white-space: pre-wrap;
word-wrap: break-word;
border: none;
}
.preview-tabs {
height: calc(100vh - 350px); /* Reduced height */
background: #2a2a2a;
border-radius: 8px;
padding: 15px;
border: 1px solid #3a3a3a;
margin-bottom: 15px;
}
.chat-container {
height: 100%; /* Take full height */
display: flex;
flex-direction: column;
background: #2a2a2a;
border-radius: 8px;
padding: 15px;
border: 1px solid #3a3a3a;
}
.chatbox {
flex: 1; /* Take remaining space */
overflow-y: auto;
background: #1a1a1a;
border-radius: 8px;
padding: 15px;
margin-bottom: 15px;
color: #ffffff;
min-height: 200px; /* Ensure minimum height */
}
.chat-input-group {
height: auto; /* Allow natural height */
min-height: 120px; /* Minimum height for input area */
background: #1a1a1a;
border-radius: 8px;
padding: 15px;
margin-top: auto; /* Push to bottom */
}
.chat-input {
background: #2a2a2a;
color: #ffffff;
border: 1px solid #3a3a3a;
border-radius: 5px;
padding: 12px;
min-height: 80px;
width: 100%;
margin-bottom: 10px;
}
.send-button {
width: 100%;
background: #4a4a4a;
color: #ffffff;
border-radius: 5px;
border: none;
padding: 12px;
cursor: pointer;
transition: background-color 0.3s;
}
.result-image {
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin: 10px 0;
background: #ffffff;
}
.chat-message {
display: flex;
margin-bottom: 1rem;
align-items: flex-start;
}
.chat-message .avatar {
width: 40px;
height: 40px;
margin-right: 10px;
border-radius: 50%;
}
.chat-message .speech-bubble {
background: #2a2a2a;
padding: 10px 15px;
border-radius: 10px;
max-width: 80%;
margin-bottom: 5px;
}
.chat-message .timestamp {
font-size: 0.8em;
color: #666;
}
.logo-row {
width: 100%;
background-color: #1a1a1a;
padding: 10px 0;
margin: 0;
border-bottom: 1px solid #3a3a3a;
}
"""
def check_environment():
    """Confirm the storage backend can be obtained and log selected settings.

    Returns:
        True when StorageFactory.get_storage() succeeds; False when importing
        or initializing storage raises (the error and traceback are logged).
    """
    logger.info("Checking environment configuration...")
    try:
        from storage import StorageFactory
        backend = StorageFactory.get_storage()
        logger.info(f"Storage initialized successfully: {backend.__class__.__name__}")
    except Exception as exc:
        logger.error(f"Storage initialization error: {str(exc)}")
        logger.error(traceback.format_exc())
        return False

    # Log environment variables (excluding sensitive data)
    reported_names = (
        'STORAGE_TYPE',
        'USE_TORCH',
        'LANGCHAIN_TRACING_V2',
        'LANGCHAIN_PROJECT',
    )
    env_vars = {name: os.getenv(name) for name in reported_names}
    logger.info(f"Environment variables: {env_vars}")
    return True
def create_ui():
    """Create the Gradio interface with error handling.

    Builds a three-column layout (upload + status log, result tabs, chat) and
    wires the upload and chat events.

    Returns:
        The assembled gr.Blocks app. If the environment check or the UI
        construction raises, the error is logged and a minimal gr.Blocks page
        describing the error is returned instead.
    """
    try:
        # Check environment before creating UI
        if not check_environment():
            raise EnvironmentError("Missing required configuration. Check logs for details.")
        # Create UI components
        with gr.Blocks(css=custom_css) as demo:
            # Logo row
            with gr.Row(elem_classes=["logo-row"]):
                # A missing or unreadable logo is only logged; the rest of the
                # UI is still built.
                try:
                    logo_path = os.path.join(os.path.dirname(__file__), "assets", "intuigence.png")
                    if os.path.exists(logo_path):
                        with open(logo_path, "rb") as f:
                            logo_base64 = base64.b64encode(f.read()).decode()
                        gr.HTML(f"""
<div style="text-align: center; padding: 10px; background-color: #1a1a1a; width: 100%;">
<img src="data:image/png;base64,{logo_base64}"
alt="Intuigence Logo"
style="height: 60px; object-fit: contain;">
</div>
""")
                    else:
                        logger.warning(f"Logo not found at {logo_path}")
                except Exception as e:
                    logger.error(f"Error loading logo: {e}")
            # Main layout
            with gr.Row(equal_height=True, elem_classes=["full-height-row"]):
                # Left column
                with gr.Column(scale=2):
                    # Upload area
                    with gr.Column(elem_classes=["upload-box"]):
                        image_input = gr.File(
                            label="Upload P&ID Document",
                            file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                            file_count="single",
                            type="filepath"
                        )
                    # Status area
                    with gr.Column(elem_classes=["status-box-container"]):
                        gr.Markdown("### Processing Status")
                        progress_status = gr.Textbox(
                            label="Status",
                            show_label=False,
                            elem_classes=["status-box"],
                            lines=15,
                            max_lines=20,
                            interactive=False,
                            autoscroll=True,
                            value=""  # Initialize with empty value
                        )
                    # Holds the aggregated-JSON path written by process_pnid
                    # and read by the chat handlers.
                    json_path_state = gr.State()
                # Center column
                with gr.Column(scale=5):
                    # One tab per processing stage; each image is filled from
                    # the matching slot of process_pnid's outputs.
                    with gr.Tabs(elem_classes=["preview-tabs"]) as tabs:
                        with gr.TabItem("P&ID"):
                            original_image = gr.Image(label="Original P&ID", height=450)  # Reduced height
                        with gr.TabItem("Symbols"):
                            symbol_image = gr.Image(label="Detected Symbols", height=450)
                        with gr.TabItem("Tags"):
                            text_image = gr.Image(label="Detected Tags", height=450)
                        with gr.TabItem("Pipelines"):
                            line_image = gr.Image(label="Detected Lines", height=450)
                        with gr.TabItem("Aggregated"):
                            aggregated_image = gr.Image(label="Aggregated Results", height=450)
                        with gr.TabItem("Graph"):
                            graph_image = gr.Image(label="Knowledge Graph", height=450)
                # Right column
                with gr.Column(scale=3):
                    with gr.Column(elem_classes=["chat-container"]):
                        gr.Markdown("### Chat Interface")
                        # Initialize chat with a welcome message
                        initial_chat = chat_message(
                            "agent",
                            "Ready to process P&ID documents and answer questions.",
                            agent_avatar,
                            get_timestamp()
                        )
                        # The chat transcript is one HTML string; handlers
                        # append to it and write the whole string back.
                        chat_output = gr.HTML(
                            label="Chat",
                            elem_classes=["chatbox"],
                            value=initial_chat
                        )
                        # Message input and send button in a fixed-height container
                        with gr.Column(elem_classes=["chat-input-group"]):
                            user_input = gr.Textbox(
                                show_label=False,
                                placeholder="Type your question here...",
                                elem_classes=["chat-input"],
                                lines=3
                            )
                            send_button = gr.Button(
                                "Send",
                                elem_classes=["send-button"]
                            )
            # Set up event handlers inside the Blocks context
            # The order of `outputs` must match the 9-slot list that
            # process_pnid yields.
            image_input.upload(
                fn=process_pnid,
                inputs=[image_input, progress_status],
                outputs=[
                    original_image,
                    symbol_image,
                    text_image,
                    line_image,
                    aggregated_image,
                    graph_image,
                    chat_output,
                    progress_status,
                    json_path_state
                ],
                show_progress="hidden"  # Hide the default progress bar
            )

            # Add input clearing and enable/disable logic for chat
            def clear_and_handle_message(user_message, chat_history, json_path):
                # Returns (new textbox value, new chat HTML).
                response = handle_user_message(user_message, chat_history, json_path)
                return "", response  # Clear input after sending

            send_button.click(
                fn=clear_and_handle_message,
                inputs=[user_input, chat_output, json_path_state],
                outputs=[user_input, chat_output]
            )
            # Also trigger on Enter key
            user_input.submit(
                fn=clear_and_handle_message,
                inputs=[user_input, chat_output, json_path_state],
                outputs=[user_input, chat_output]
            )
        return demo
    except Exception as e:
        logger.error(f"Error creating UI: {str(e)}")
        logger.error(traceback.format_exc())
        # Create a minimal UI showing the error
        with gr.Blocks() as error_demo:
            gr.Markdown("# ⚠️ Configuration Error")
            gr.Markdown(f"Error: {str(e)}")
            gr.Markdown("Please check the logs and configuration.")
        return error_demo
def main():
    """Build the UI and serve it on all interfaces, port 7860, without a share link."""
    # Local development settings
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    create_ui().launch(**launch_options)
if __name__ == "__main__":
    # Executed as a script: build the UI and start the server.
    main()
else:
    # For Spaces deployment
    # Imported as a module: build the UI immediately and publish its `.app`
    # attribute as the module-level name `app`.
    # NOTE(review): this reads `.app` from a Blocks object that has not been
    # launched; confirm the attribute is available in the Gradio version used.
    try:
        logger.info("Initializing Spaces deployment...")
        demo = create_ui()
        app = demo.app
        logger.info("Application initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize app: {str(e)}")
        logger.error(traceback.format_exc())
        # Fall back to a minimal page that reports the failure.
        with gr.Blocks() as error_demo:
            gr.Markdown("# ⚠️ Deployment Error")
            gr.Markdown(f"Error: {str(e)}")
            gr.Markdown("Please check the logs for details.")
        app = error_demo.app