import os import base64 import gradio as gr import json from datetime import datetime from symbol_detection import run_detection_with_optimal_threshold from line_detection_ai import DiagramDetectionPipeline, LineDetector, LineConfig, ImageConfig, DebugHandler, PointConfig, JunctionConfig, PointDetector, JunctionDetector, SymbolConfig, SymbolDetector, TagConfig, TagDetector from data_aggregation_ai import DataAggregator from chatbot_agent import get_assistant_response from storage import StorageFactory, LocalStorage import traceback from text_detection_combined import process_drawing from pathlib import Path from pdf_processor import DocumentProcessor import networkx as nx import logging import matplotlib.pyplot as plt from dotenv import load_dotenv import torch from graph_visualization import create_graph_visualization import shutil # Load environment variables from .env file load_dotenv() # Configure logging at the start of the file logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Get logger for this module logger = logging.getLogger(__name__) # Disable duplicate logs from other modules logging.getLogger('PIL').setLevel(logging.WARNING) logging.getLogger('matplotlib').setLevel(logging.WARNING) logging.getLogger('gradio').setLevel(logging.WARNING) logging.getLogger('networkx').setLevel(logging.WARNING) logging.getLogger('line_detection_ai').setLevel(logging.WARNING) logging.getLogger('symbol_detection').setLevel(logging.WARNING) # Only log important messages def log_process_step(message, level=logging.INFO): """Log processing steps with appropriate level""" if level >= logging.WARNING: logger.log(level, message) elif "completed" in message.lower() or "generated" in message.lower(): logger.info(message) # Helper function to format timestamps def get_timestamp(): return datetime.now().strftime('%Y-%m-%d %H:%M:%S') def format_message(role, content): """Format message for chatbot history.""" return {"role": role, "content": content} # Load avatar images for agents localStorage = LocalStorage() agent_avatar = base64.b64encode(localStorage.load_file("assets/AiAgent.png")).decode() llm_avatar = base64.b64encode(localStorage.load_file("assets/llm.png")).decode() user_avatar = base64.b64encode(localStorage.load_file("assets/user.png")).decode() # Chat message formatting with avatars and enhanced HTML for readability def chat_message(role, message, avatar, timestamp): # Convert Markdown-style formatting to HTML formatted_message = ( message.replace("**", "").replace("**", "") .replace("###", "

").replace("##", "

") .replace("#", "

").replace("\n", "
") .replace("```", "
").replace("`", "
") .replace("\n1. ", "
1. ") # For ordered lists starting with "1." .replace("\n2. ", "
2. ") .replace("\n3. ", "
3. ") .replace("\n4. ", "
4. ") .replace("\n5. ", "
5. ") ) return f"""
{formatted_message}
{timestamp}
""" # Main processing function for P&ID steps def process_pnid(image_file, progress_status, progress=gr.Progress()): """Process P&ID document with real-time progress updates.""" try: # Disable verbose logging for processing components logging.getLogger('line_detection_ai').setLevel(logging.WARNING) logging.getLogger('symbol_detection').setLevel(logging.WARNING) logging.getLogger('text_detection').setLevel(logging.WARNING) progress_text = [] outputs = [None] * 9 def update_progress(step, message): timestamp = get_timestamp() progress_text.append(f"{timestamp} - {message}") outputs[7] = "\n".join(progress_text[-20:]) # Keep last 20 lines progress(step, desc=f"Step {step}/7: {message}") return outputs # Update progress with smaller steps update_progress(0.1, "Starting processing...") yield outputs storage = StorageFactory.get_storage() results_dir = "results" outputs = [None] * 9 if image_file is None: raise ValueError("No file uploaded") os.makedirs(results_dir, exist_ok=True) current_progress = 0 progress_text = [] # Step 1: File Upload (10%) logger.info(f"Processing file: {os.path.basename(image_file)}") update_progress(0.1, "Step 1/7: File uploaded successfully") yield outputs # Step 2: Document Processing (25%) update_progress(0.25, "Step 2/7: Processing document...") yield outputs doc_processor = DocumentProcessor(storage) processed_pages = doc_processor.process_document( file_path=image_file, output_dir=results_dir ) if not processed_pages: raise ValueError("No pages processed from document") display_path = processed_pages[0] outputs[0] = display_path update_progress(0.25, "Document processed successfully") yield outputs # Step 3: Symbol Detection (45%) update_progress(0.45, "Step 3/7: Symbol Detection") yield outputs # Store detection results and diagram_bbox detection_results = run_detection_with_optimal_threshold( display_path, results_dir=results_dir, file_name=os.path.basename(display_path), resize_image=True, storage=storage ) detection_image_path, detection_json_path, _, diagram_bbox = detection_results if diagram_bbox is None: logger.warning("No diagram bounding box detected, using full image") # Provide a fallback bbox if needed diagram_bbox = [0, 0, 0, 0] # Or get image dimensions outputs[1] = detection_image_path update_progress(0.45, "Symbol detection completed") yield outputs # Step 4: Text Detection (65%) update_progress(0.65, "Step 4/7: Text Detection") yield outputs text_results, text_summary = process_drawing(display_path, results_dir, storage) outputs[2] = text_results['image_path'] update_progress(0.65, "Text detection completed") update_progress(0.65, f"Found {text_summary['total_detections']} text elements") yield outputs # Step 5: Line Detection (80%) update_progress(0.80, "Step 5/7: Line Detection") yield outputs try: # Initialize components debug_handler = DebugHandler(enabled=True, storage=storage) # Configure detectors line_config = LineConfig() point_config = PointConfig() junction_config = JunctionConfig() symbol_config = SymbolConfig() tag_config = TagConfig() # Create all required detectors symbol_detector = SymbolDetector( config=symbol_config, debug_handler=debug_handler ) tag_detector = TagDetector( config=tag_config, debug_handler=debug_handler ) line_detector = LineDetector( config=line_config, model_path="models/deeplsd_md.tar", model_config={"detect_lines": True}, device=torch.device("cpu"), debug_handler=debug_handler ) point_detector = PointDetector( config=point_config, debug_handler=debug_handler ) junction_detector = JunctionDetector( config=junction_config, debug_handler=debug_handler ) # Create and run pipeline with all detectors pipeline = DiagramDetectionPipeline( tag_detector=tag_detector, symbol_detector=symbol_detector, line_detector=line_detector, point_detector=point_detector, junction_detector=junction_detector, storage=storage, debug_handler=debug_handler ) # Run pipeline result = pipeline.run( image_path=display_path, output_dir=results_dir, config=ImageConfig() ) if result.success: line_image_path = result.image_path line_json_path = result.json_path outputs[3] = line_image_path update_progress(0.80, "Line detection completed") else: logger.error(f"Pipeline failed: {result.error}") raise Exception(result.error) except Exception as e: logger.error(f"Line detection error: {str(e)}") raise # Step 6: Data Aggregation (90%) update_progress(0.90, "Step 6/7: Data Aggregation") yield outputs data_aggregator = DataAggregator(storage=storage) aggregated_data = data_aggregator.aggregate_data( symbols_path=detection_json_path, texts_path=text_results['json_path'], lines_path=line_json_path ) # Add image path to aggregated data aggregated_data['image_path'] = display_path # Save aggregated data aggregated_json_path = os.path.join(results_dir, f"{Path(display_path).stem}_aggregated.json") with open(aggregated_json_path, 'w') as f: json.dump(aggregated_data, f, indent=2) # Use the detection image as the aggregated view for now # TODO: Implement visualization in DataAggregator if needed outputs[4] = detection_image_path # Changed from aggregated_image_path outputs[8] = aggregated_json_path update_progress(0.90, "Data aggregation completed") yield outputs # Step 7: Graph Generation (95%) update_progress(0.95, "Step 7/7: Generating knowledge graph...") yield outputs try: with open(aggregated_json_path, 'r') as f: aggregated_detection_data = json.load(f) logger.info("Creating knowledge graph...") # Create graph visualization - this will save the visualization file G, _ = create_graph_visualization(aggregated_json_path, save_plot=True) if G is not None: # Use the saved visualization file graph_image_path = os.path.join(os.path.dirname(aggregated_json_path), "graph_visualization.png") if os.path.exists(graph_image_path): outputs[5] = graph_image_path update_progress(0.95, "Knowledge graph generated") logger.info("Knowledge graph generated and saved successfully") # Final completion (100%) update_progress(1.0, "✅ Processing Complete") welcome_message = chat_message( "agent", "Processing complete! I can help answer questions about the P&ID contents.", agent_avatar, get_timestamp() ) outputs[6] = welcome_message update_progress(1.0, "✅ All processing steps completed successfully!") yield outputs else: logger.warning("Graph visualization file not found") update_progress(1.0, "⚠️ Warning: Graph visualization could not be generated") yield outputs else: logger.warning("No graph was generated") update_progress(1.0, "⚠️ Warning: No graph could be generated") yield outputs except Exception as e: logger.error(f"Error in graph generation: {str(e)}") logger.error(f"Traceback: {traceback.format_exc()}") raise except Exception as e: logger.error(f"Error in process_pnid: {str(e)}") logger.error(traceback.format_exc()) error_msg = f"❌ Error: {str(e)}" update_progress(1.0, error_msg) yield outputs # Separate function for Chat interaction def handle_user_message(user_input, chat_history, json_path_state): """Handle user messages and generate responses.""" try: if not user_input or not user_input.strip(): return chat_history # Add user message timestamp = get_timestamp() new_history = chat_history + chat_message("user", user_input, user_avatar, timestamp) # Check if json_path exists and is valid if not json_path_state or not os.path.exists(json_path_state): error_message = "Please upload and process a P&ID document first." return new_history + chat_message("assistant", error_message, agent_avatar, get_timestamp()) try: # Log for debugging logger.info(f"Sending question to assistant: {user_input}") logger.info(f"Using JSON path: {json_path_state}") # Generate response response = get_assistant_response(user_input, json_path_state) # Handle the response if isinstance(response, (str, dict)): response_text = str(response) else: try: # Try to get the first response from generator response_text = next(response) if hasattr(response, '__next__') else str(response) except StopIteration: response_text = "I apologize, but I couldn't generate a response." except Exception as e: logger.error(f"Error processing response: {str(e)}") response_text = "I apologize, but I encountered an error processing your request." logger.info(f"Generated response: {response_text}") if not response_text.strip(): response_text = "I apologize, but I couldn't generate a response. Please try asking your question differently." # Add response to chat history new_history += chat_message("assistant", response_text, agent_avatar, get_timestamp()) except Exception as e: logger.error(f"Error generating response: {str(e)}") logger.error(traceback.format_exc()) error_message = "I apologize, but I encountered an error processing your request. Please try again." new_history += chat_message("assistant", error_message, agent_avatar, get_timestamp()) return new_history except Exception as e: logger.error(f"Chat error: {str(e)}") logger.error(traceback.format_exc()) return chat_history + chat_message( "assistant", "I apologize, but something went wrong. Please try again.", agent_avatar, get_timestamp() ) # Update custom CSS custom_css = """ .full-height-row { height: calc(100vh - 150px); /* Adjusted height */ margin: 0; padding: 10px; } .upload-box { background: #2a2a2a; border-radius: 8px; padding: 15px; margin-bottom: 15px; border: 1px solid #3a3a3a; } .status-box-container { background: #2a2a2a; border-radius: 8px; padding: 15px; height: calc(100vh - 350px); /* Reduced height */ border: 1px solid #3a3a3a; margin-bottom: 15px; } .status-box { font-family: 'Courier New', monospace; font-size: 12px; line-height: 1.4; background-color: #1a1a1a; color: #00ff00; padding: 10px; border-radius: 5px; height: calc(100% - 40px); /* Adjust for header */ overflow-y: auto; white-space: pre-wrap; word-wrap: break-word; border: none; } .preview-tabs { height: calc(100vh - 350px); /* Reduced height */ background: #2a2a2a; border-radius: 8px; padding: 15px; border: 1px solid #3a3a3a; margin-bottom: 15px; } .chat-container { height: 100%; /* Take full height */ display: flex; flex-direction: column; background: #2a2a2a; border-radius: 8px; padding: 15px; border: 1px solid #3a3a3a; } .chatbox { flex: 1; /* Take remaining space */ overflow-y: auto; background: #1a1a1a; border-radius: 8px; padding: 15px; margin-bottom: 15px; color: #ffffff; min-height: 200px; /* Ensure minimum height */ } .chat-input-group { height: auto; /* Allow natural height */ min-height: 120px; /* Minimum height for input area */ background: #1a1a1a; border-radius: 8px; padding: 15px; margin-top: auto; /* Push to bottom */ } .chat-input { background: #2a2a2a; color: #ffffff; border: 1px solid #3a3a3a; border-radius: 5px; padding: 12px; min-height: 80px; width: 100%; margin-bottom: 10px; } .send-button { width: 100%; background: #4a4a4a; color: #ffffff; border-radius: 5px; border: none; padding: 12px; cursor: pointer; transition: background-color 0.3s; } .result-image { border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); margin: 10px 0; background: #ffffff; } .chat-message { display: flex; margin-bottom: 1rem; align-items: flex-start; } .chat-message .avatar { width: 40px; height: 40px; margin-right: 10px; border-radius: 50%; } .chat-message .speech-bubble { background: #2a2a2a; padding: 10px 15px; border-radius: 10px; max-width: 80%; margin-bottom: 5px; } .chat-message .timestamp { font-size: 0.8em; color: #666; } .logo-row { width: 100%; background-color: #1a1a1a; padding: 10px 0; margin: 0; border-bottom: 1px solid #3a3a3a; } """ def check_environment(): """Check required environment variables and model files.""" logger.info("Checking environment configuration...") try: from storage import StorageFactory storage = StorageFactory.get_storage() logger.info(f"Storage initialized successfully: {storage.__class__.__name__}") except Exception as e: logger.error(f"Storage initialization error: {str(e)}") logger.error(traceback.format_exc()) return False # Log environment variables (excluding sensitive data) env_vars = { 'STORAGE_TYPE': os.getenv('STORAGE_TYPE'), 'USE_TORCH': os.getenv('USE_TORCH'), 'LANGCHAIN_TRACING_V2': os.getenv('LANGCHAIN_TRACING_V2'), 'LANGCHAIN_PROJECT': os.getenv('LANGCHAIN_PROJECT') } logger.info(f"Environment variables: {env_vars}") return True def create_ui(): """Create the Gradio interface with error handling.""" try: # Check environment before creating UI if not check_environment(): raise EnvironmentError("Missing required configuration. Check logs for details.") # Create UI components with gr.Blocks(css=custom_css) as demo: # Logo row with gr.Row(elem_classes=["logo-row"]): try: logo_path = os.path.join(os.path.dirname(__file__), "assets", "intuigence.png") if os.path.exists(logo_path): with open(logo_path, "rb") as f: logo_base64 = base64.b64encode(f.read()).decode() gr.HTML(f"""
Intuigence Logo
""") else: logger.warning(f"Logo not found at {logo_path}") except Exception as e: logger.error(f"Error loading logo: {e}") # Main layout with gr.Row(equal_height=True, elem_classes=["full-height-row"]): # Left column with gr.Column(scale=2): # Upload area with gr.Column(elem_classes=["upload-box"]): image_input = gr.File( label="Upload P&ID Document", file_types=[".pdf", ".png", ".jpg", ".jpeg"], file_count="single", type="filepath" ) # Status area with gr.Column(elem_classes=["status-box-container"]): gr.Markdown("### Processing Status") progress_status = gr.Textbox( label="Status", show_label=False, elem_classes=["status-box"], lines=15, max_lines=20, interactive=False, autoscroll=True, value="" # Initialize with empty value ) json_path_state = gr.State() # Center column with gr.Column(scale=5): with gr.Tabs(elem_classes=["preview-tabs"]) as tabs: with gr.TabItem("P&ID"): original_image = gr.Image(label="Original P&ID", height=450) # Reduced height with gr.TabItem("Symbols"): symbol_image = gr.Image(label="Detected Symbols", height=450) with gr.TabItem("Tags"): text_image = gr.Image(label="Detected Tags", height=450) with gr.TabItem("Pipelines"): line_image = gr.Image(label="Detected Lines", height=450) with gr.TabItem("Aggregated"): aggregated_image = gr.Image(label="Aggregated Results", height=450) with gr.TabItem("Graph"): graph_image = gr.Image(label="Knowledge Graph", height=450) # Right column with gr.Column(scale=3): with gr.Column(elem_classes=["chat-container"]): gr.Markdown("### Chat Interface") # Initialize chat with a welcome message initial_chat = chat_message( "agent", "Ready to process P&ID documents and answer questions.", agent_avatar, get_timestamp() ) chat_output = gr.HTML( label="Chat", elem_classes=["chatbox"], value=initial_chat ) # Message input and send button in a fixed-height container with gr.Column(elem_classes=["chat-input-group"]): user_input = gr.Textbox( show_label=False, placeholder="Type your question here...", elem_classes=["chat-input"], lines=3 ) send_button = gr.Button( "Send", elem_classes=["send-button"] ) # Set up event handlers inside the Blocks context image_input.upload( fn=process_pnid, inputs=[image_input, progress_status], outputs=[ original_image, symbol_image, text_image, line_image, aggregated_image, graph_image, chat_output, progress_status, json_path_state ], show_progress="hidden" # Hide the default progress bar ) # Add input clearing and enable/disable logic for chat def clear_and_handle_message(user_message, chat_history, json_path): response = handle_user_message(user_message, chat_history, json_path) return "", response # Clear input after sending send_button.click( fn=clear_and_handle_message, inputs=[user_input, chat_output, json_path_state], outputs=[user_input, chat_output] ) # Also trigger on Enter key user_input.submit( fn=clear_and_handle_message, inputs=[user_input, chat_output, json_path_state], outputs=[user_input, chat_output] ) return demo except Exception as e: logger.error(f"Error creating UI: {str(e)}") logger.error(traceback.format_exc()) # Create a minimal UI showing the error with gr.Blocks() as error_demo: gr.Markdown("# ⚠️ Configuration Error") gr.Markdown(f"Error: {str(e)}") gr.Markdown("Please check the logs and configuration.") return error_demo def main(): demo = create_ui() # Local development settings demo.launch(server_name="0.0.0.0", server_port=7860, share=False) if __name__ == "__main__": main() else: # For Spaces deployment try: logger.info("Initializing Spaces deployment...") demo = create_ui() app = demo.app logger.info("Application initialized successfully") except Exception as e: logger.error(f"Failed to initialize app: {str(e)}") logger.error(traceback.format_exc()) with gr.Blocks() as error_demo: gr.Markdown("# ⚠️ Deployment Error") gr.Markdown(f"Error: {str(e)}") gr.Markdown("Please check the logs for details.") app = error_demo.app