| """ | |
| Llama + FAISS RAG System for Fire Evacuation with Advanced Reasoning | |
| This module implements a RAG (Retrieval-Augmented Generation) system for fire evacuation scenarios | |
| with advanced LLM reasoning techniques including: | |
| 1. Chain-of-Thought (CoT) Prompting: | |
| - Enables step-by-step reasoning through intermediate steps | |
| - Improves complex problem-solving capabilities | |
| - Reference: https://arxiv.org/pdf/2201.11903 | |
| 2. Tree-of-Thoughts (ToT): | |
| - Maintains multiple reasoning paths | |
| - Self-evaluates progress through intermediate thoughts | |
| - Enables deliberate reasoning process | |
| - Reference: https://arxiv.org/pdf/2305.10601 | |
| 3. Reflexion: | |
| - Reinforces language-based agents through linguistic feedback | |
| - Self-reflection and iterative improvement | |
| - Reference: https://arxiv.org/pdf/2303.11366 | |
| 4. CoT with Tools: | |
| - Combines CoT prompting with external tools | |
| - Interleaved reasoning and tool usage | |
| - Reference: https://arxiv.org/pdf/2303.09014 | |
| 5. Advanced Decoding Strategies: | |
| - Greedy: Deterministic highest probability | |
| - Sampling: Random sampling with temperature | |
| - Beam Search: Explores multiple paths | |
| - Nucleus (Top-p): Samples from top-p probability mass | |
| - Temperature: Temperature-based sampling | |
| Downloads Llama model, creates JSON dataset, builds FAISS index, and provides RAG querying | |
| """ | |
import unsloth  # imported before transformers so Unsloth's runtime patches can be applied
| import json | |
| import os | |
| import pickle | |
| import glob | |
| import re | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from pathlib import Path | |
| from enum import Enum | |
| import copy | |
| import numpy as np | |
| import faiss | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from sentence_transformers import SentenceTransformer | |
| import gradio as gr | |
| # Project imports (use helper_files package) | |
| from floor_plan import create_sample_floor_plan, FloorPlan | |
| from sensor_system import create_sample_fire_scenario, SensorSystem | |
| from pathfinding import PathFinder | |
| class FireEvacuationDataExporter: | |
| """Exports fire evacuation system data to JSON format""" | |
| def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem, pathfinder: PathFinder): | |
| self.floor_plan = floor_plan | |
| self.sensor_system = sensor_system | |
| self.pathfinder = pathfinder | |
| def export_room_data(self, room_id: str) -> Dict[str, Any]: | |
| """Export comprehensive room data to JSON""" | |
| room = self.floor_plan.get_room(room_id) | |
| sensor = self.sensor_system.get_sensor_reading(room_id) | |
| if not room or not sensor: | |
| return {} | |
| return { | |
| "room_id": room_id, | |
| "name": room.name, | |
| "room_type": room.room_type, | |
| "position": room.position, | |
| "size": room.size, | |
| "has_oxygen_cylinder": room.has_oxygen_cylinder, | |
| "has_fire_extinguisher": room.has_fire_extinguisher, | |
| "connected_to": [conn[0] for conn in room.connected_to], | |
| "sensor_data": { | |
| "fire_detected": sensor.fire_detected, | |
| "smoke_level": round(sensor.smoke_level, 2), | |
| "temperature_c": round(sensor.temperature, 1), | |
| "oxygen_pct": round(sensor.oxygen_level, 1), | |
| "visibility_pct": round(sensor.visibility, 1), | |
| "structural_integrity_pct": round(sensor.structural_integrity, 1), | |
| "fire_growth_rate": round(sensor.fire_growth_rate, 2), | |
| "flashover_risk": round(sensor.flashover_risk, 2), | |
| "backdraft_risk": round(sensor.backdraft_risk, 2), | |
| "heat_radiation": round(sensor.heat_radiation, 2), | |
| "fire_type": sensor.fire_type, | |
| "carbon_monoxide_ppm": round(sensor.carbon_monoxide, 1), | |
| "carbon_dioxide_ppm": round(sensor.carbon_dioxide, 1), | |
| "hydrogen_cyanide_ppm": round(sensor.hydrogen_cyanide, 2), | |
| "hydrogen_chloride_ppm": round(sensor.hydrogen_chloride, 2), | |
| "wind_direction": round(sensor.wind_direction, 1), | |
| "wind_speed": round(sensor.wind_speed, 2), | |
| "air_pressure": round(sensor.air_pressure, 2), | |
| "humidity": round(sensor.humidity, 1), | |
| "occupancy_density": round(sensor.occupancy_density, 2), | |
| "mobility_limitations": sensor.mobility_limitations, | |
| "panic_level": round(sensor.panic_level, 2), | |
| "evacuation_progress": round(sensor.evacuation_progress, 1), | |
| "sprinkler_active": sensor.sprinkler_active, | |
| "emergency_lighting": sensor.emergency_lighting, | |
| "elevator_available": sensor.elevator_available, | |
| "stairwell_clear": sensor.stairwell_clear, | |
| "exit_accessible": sensor.exit_accessible, | |
| "exit_capacity": sensor.exit_capacity, | |
| "ventilation_active": sensor.ventilation_active, | |
| "time_since_fire_start": sensor.time_since_fire_start, | |
| "estimated_time_to_exit": sensor.estimated_time_to_exit, | |
| "emergency_comm_working": sensor.emergency_comm_working, | |
| "wifi_signal_strength": round(sensor.wifi_signal_strength, 1), | |
| "danger_score": round(sensor.calculate_danger_score(), 1), | |
| "passable": sensor.is_passable() | |
| } | |
| } | |
| def export_route_data(self, start_location: str = "R1") -> Dict[str, Any]: | |
| """Export all evacuation routes with detailed information""" | |
| routes = self.pathfinder.find_all_evacuation_routes(start_location) | |
| route_data = { | |
| "timestamp_sec": 0, | |
| "start_location": start_location, | |
| "total_routes": len(routes), | |
| "routes": [] | |
| } | |
| for idx, (exit_id, path, risk) in enumerate(routes, 1): | |
| route_info = { | |
| "route_id": f"Route {idx}", | |
| "exit": exit_id, | |
| "path": path, | |
| "metrics": { | |
| "avg_danger": round(risk['avg_danger'], 2), | |
| "max_danger": round(risk['max_danger'], 2), | |
| "max_danger_location": risk['max_danger_location'], | |
| "total_danger": round(risk['total_danger'], 2), | |
| "path_length": risk['path_length'], | |
| "has_fire": risk['has_fire'], | |
| "has_oxygen_hazard": risk['has_oxygen_hazard'], | |
| "passable": risk['passable'], | |
| "risk_factors": risk['risk_factors'] | |
| }, | |
| "nodes": [] | |
| } | |
| # Add detailed node information | |
| for room_id in path: | |
| node_data = self.export_room_data(room_id) | |
| if node_data: | |
| route_info["nodes"].append(node_data) | |
| route_data["routes"].append(route_info) | |
| return route_data | |
| def export_all_rooms(self) -> List[Dict[str, Any]]: | |
| """Export all rooms as separate documents""" | |
| all_rooms = [] | |
| for room_id in self.floor_plan.rooms: | |
| room_data = self.export_room_data(room_id) | |
| if room_data: | |
| all_rooms.append(room_data) | |
| return all_rooms | |
| def export_to_json(self, output_path: str, start_location: str = "R1"): | |
| """Export complete dataset to JSON file""" | |
| data = { | |
| "floor_plan": { | |
| "floor_name": self.floor_plan.floor_name, | |
| "total_rooms": len(self.floor_plan.rooms), | |
| "exits": self.floor_plan.exits | |
| }, | |
| "all_rooms": self.export_all_rooms(), | |
| "evacuation_routes": self.export_route_data(start_location) | |
| } | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| print(f"[OK] Exported data to {output_path}") | |
| return data | |
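# Shape of the JSON produced by export_to_json():
#   {
#     "floor_plan":        {"floor_name", "total_rooms", "exits"},
#     "all_rooms":         [ <room dict with nested "sensor_data">, ... ],
#     "evacuation_routes": {"timestamp_sec", "start_location", "total_routes",
#                           "routes": [ <route with "metrics" and per-room "nodes">, ... ]}
#   }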
| class ReasoningMode(Enum): | |
| """Enumeration of reasoning modes""" | |
| STANDARD = "standard" | |
| CHAIN_OF_THOUGHT = "chain_of_thought" | |
| TREE_OF_THOUGHTS = "tree_of_thoughts" | |
| REFLEXION = "reflexion" | |
| COT_WITH_TOOLS = "cot_with_tools" | |
| class DecodingStrategy(Enum): | |
| """Enumeration of decoding strategies""" | |
| GREEDY = "greedy" | |
| SAMPLING = "sampling" | |
| BEAM_SEARCH = "beam_search" | |
| NUCLEUS = "nucleus" | |
| TEMPERATURE = "temperature" | |
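# How each strategy is translated into generation kwargs (see _generate_with_decoding_strategy):
#   GREEDY      -> do_sample=False, num_beams=1
#   SAMPLING    -> do_sample=True, temperature=T, top_k=50
#   BEAM_SEARCH -> do_sample=False, num_beams=N, early_stopping=True
#   NUCLEUS     -> do_sample=True, temperature=T, top_p=p, top_k=0
#   TEMPERATURE -> do_sample=True, temperature=T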
| class FireEvacuationRAG: | |
| """RAG system using FAISS for retrieval and Llama for generation with advanced reasoning""" | |
| def __init__(self, model_name: str = "nvidia/Llama-3.1-Minitron-4B-Width-Base", model_dir: str = "./models", | |
| use_8bit: bool = False, use_unsloth: bool = False, load_in_4bit: bool = True, max_seq_length: int = 2048, | |
| reasoning_mode: ReasoningMode = ReasoningMode.CHAIN_OF_THOUGHT, | |
| decoding_strategy: DecodingStrategy = DecodingStrategy.NUCLEUS): | |
| self.model_name = model_name | |
| self.model_dir = model_dir | |
| self.local_model_path = os.path.join(model_dir, model_name.replace("/", "_")) | |
| self.use_8bit = use_8bit | |
| self.use_unsloth = use_unsloth | |
| self.load_in_4bit = load_in_4bit | |
| self.max_seq_length = max_seq_length | |
| self.reasoning_mode = reasoning_mode | |
| self.decoding_strategy = decoding_strategy | |
| self.tokenizer = None | |
| self.model = None | |
| self.pipe = None | |
| self.embedder = None | |
| self.index = None | |
| self.documents = [] | |
| self.metadata = [] | |
| self.reflexion_history = [] # Store reflection history for Reflexion | |
| # Create model directory if it doesn't exist | |
| os.makedirs(self.model_dir, exist_ok=True) | |
| os.makedirs(self.local_model_path, exist_ok=True) | |
| print(f"Initializing RAG system with model: {model_name}") | |
| print(f"Model will be saved to: {self.local_model_path}") | |
| print(f"Reasoning mode: {reasoning_mode.value}") | |
| print(f"Decoding strategy: {decoding_strategy.value}") | |
| if use_unsloth: | |
| print("[*] Unsloth enabled (faster loading and inference)") | |
| if load_in_4bit: | |
| print(" - 4-bit quantization enabled (very fast, low memory)") | |
| elif use_8bit: | |
| print("[!] 8-bit quantization enabled (faster loading, lower memory, slight quality trade-off)") | |
| def _check_model_files_exist(self, model_path: str) -> bool: | |
| """Check if model files actually exist (not just config.json)""" | |
| required_files = [ | |
| "config.json", | |
| "model.safetensors.index.json" # Check for sharded model index | |
| ] | |
| # Check for at least one model file | |
| model_file_patterns = [ | |
| "model.safetensors", | |
| "pytorch_model.bin", | |
| "model-*.safetensors" # Sharded models | |
| ] | |
| config_exists = os.path.exists(os.path.join(model_path, "config.json")) | |
| if not config_exists: | |
| return False | |
| # Check for model weight files | |
| for pattern in model_file_patterns: | |
| if glob.glob(os.path.join(model_path, pattern)): | |
| return True | |
| # Check for sharded model index | |
| if os.path.exists(os.path.join(model_path, "model.safetensors.index.json")): | |
| return True | |
| return False | |
| def download_model(self): | |
| """Download and load the Llama model, saving weights to local directory""" | |
| print("Downloading Llama model (this may take a while)...") | |
| print(f"Model weights will be saved to: {self.local_model_path}") | |
| # Use Unsloth if enabled (much faster loading) - PRIMARY METHOD | |
| if self.use_unsloth: | |
| try: | |
| from unsloth import FastLanguageModel | |
| from transformers import TextStreamer | |
| print("[*] Using Unsloth for fast model loading...") | |
| # Check if model name indicates it's already quantized (contains "bnb-4bit" or "bnb-8bit") | |
| is_pre_quantized = "bnb-4bit" in self.model_name.lower() or "bnb-8bit" in self.model_name.lower() | |
| # For pre-quantized models, don't set load_in_4bit (model is already quantized) | |
| # For non-quantized models, check if bitsandbytes is available | |
| if self.load_in_4bit and not is_pre_quantized: | |
| try: | |
| import bitsandbytes | |
| print("[OK] bitsandbytes available for 4-bit quantization") | |
| except ImportError: | |
| print("[!] bitsandbytes not found. 4-bit quantization requires bitsandbytes.") | |
| print(" Install with: pip install bitsandbytes") | |
| print(" Falling back to full precision...") | |
| self.load_in_4bit = False | |
| # Check if model exists locally | |
| if self._check_model_files_exist(self.local_model_path): | |
| print(f"Loading from local path: {self.local_model_path}") | |
| model_path = self.local_model_path | |
| else: | |
| print(f"Downloading model: {self.model_name}") | |
| model_path = self.model_name | |
| # ==== Load Model with Unsloth (exact pattern from user) ==== | |
| dtype = None # Auto-detect dtype | |
| # Try loading with proper error handling for bitsandbytes | |
| # The model config might have quantization settings that trigger bitsandbytes check | |
| max_retries = 2 | |
| for attempt in range(max_retries): | |
| try: | |
| # For pre-quantized models, don't specify load_in_4bit (it's already quantized) | |
| if is_pre_quantized or attempt > 0: | |
| print("[OK] Loading model without quantization parameters...") | |
| # Don't pass any quantization parameters | |
| load_kwargs = { | |
| "model_name": model_path, | |
| "max_seq_length": self.max_seq_length, | |
| "dtype": dtype, | |
| } | |
| else: | |
| # For non-quantized models, try quantization if requested | |
| load_kwargs = { | |
| "model_name": model_path, | |
| "max_seq_length": self.max_seq_length, | |
| "dtype": dtype, | |
| } | |
| if self.load_in_4bit: | |
| load_kwargs["load_in_4bit"] = True | |
| self.model, self.tokenizer = FastLanguageModel.from_pretrained(**load_kwargs) | |
| break # Success, exit retry loop | |
except Exception as quant_error:  # includes the bitsandbytes ImportError cases handled below
| error_str = str(quant_error) | |
| is_bitsandbytes_error = ( | |
| "bitsandbytes" in error_str.lower() or | |
| "PackageNotFoundError" in error_str or | |
| "No package metadata" in error_str or | |
| "quantization_config" in error_str.lower() | |
| ) | |
| if is_bitsandbytes_error and attempt < max_retries - 1: | |
| print(f"[!] Attempt {attempt + 1}: bitsandbytes error detected.") | |
| print(f" Error: {error_str[:150]}...") | |
| print(" Retrying without quantization parameters...") | |
| continue # Retry without quantization | |
| elif is_bitsandbytes_error: | |
| print("[!] bitsandbytes required but not installed.") | |
| print(" Options:") | |
| print(" 1. Install bitsandbytes: pip install bitsandbytes") | |
| print(" 2. Use a non-quantized model") | |
| print(" 3. Set USE_UNSLOTH=False to use standard loading") | |
| raise ImportError( | |
| "bitsandbytes is required for this model. " | |
| "Install with: pip install bitsandbytes" | |
| ) from quant_error | |
| else: | |
| # Re-raise if it's a different error | |
| raise | |
| # Optimize for inference | |
| FastLanguageModel.for_inference(self.model) | |
| print("[OK] Model loaded successfully with Unsloth!") | |
| # Verify device | |
| if torch.cuda.is_available(): | |
| actual_device = next(self.model.parameters()).device | |
| print(f"[OK] Model loaded on {actual_device}!") | |
| allocated = torch.cuda.memory_allocated(0) / 1024**3 | |
| print(f"[OK] GPU Memory allocated: {allocated:.2f} GB") | |
| else: | |
| print("[OK] Model loaded on CPU!") | |
| # Set pipe to model for compatibility (we'll use model directly in generation) | |
| self.pipe = self.model # Store model reference for compatibility checks | |
| return # Exit early, Unsloth loading complete | |
| except ImportError: | |
| print("[!] Unsloth not installed. Falling back to standard loading.") | |
| print(" Install with: pip install unsloth") | |
| self.use_unsloth = False # Disable unsloth for this session | |
| except Exception as e: | |
| print(f"[!] Unsloth loading failed: {e}") | |
| print(" Falling back to standard loading...") | |
| self.use_unsloth = False | |
| # Standard loading (original code) | |
| # Check GPU availability and optimize settings | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| if device == "cuda": | |
| gpu_name = torch.cuda.get_device_name(0) | |
| gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3 | |
| print(f"[OK] GPU detected: {gpu_name}") | |
| print(f"[OK] GPU Memory: {gpu_memory:.2f} GB") | |
| # Use bfloat16 for faster loading and inference on GPU | |
| torch_dtype = torch.bfloat16 | |
| print("[OK] Using bfloat16 precision for faster loading") | |
| else: | |
| print("[!] No GPU detected, using CPU (will be slower)") | |
| torch_dtype = torch.float32 | |
| print("[OK] Using float32 precision for CPU") | |
| # Check for optimized attention implementation | |
| try: | |
| import flash_attn # noqa: F401 | |
| attn_impl = 'flash_attention_2' | |
| print("[OK] FlashAttention2 available - using for optimal performance") | |
| except ImportError: | |
| attn_impl = 'sdpa' # Scaled Dot Product Attention (built into PyTorch) | |
| print("[OK] Using SDPA (Scaled Dot Product Attention) for faster inference") | |
| # Check for 8-bit quantization support | |
| use_quantization = False | |
| if self.use_8bit and device == "cuda": | |
| try: | |
| import bitsandbytes | |
| use_quantization = True | |
| print("[OK] 8-bit quantization available - will use for faster loading") | |
| except ImportError: | |
| print("[!] 8-bit requested but bitsandbytes not installed, using full precision") | |
| print(" Install with: pip install bitsandbytes") | |
| try: | |
| # Check if model already exists locally with actual model files | |
| if self._check_model_files_exist(self.local_model_path): | |
| print(f"Found existing model at {self.local_model_path}, loading from local...") | |
| model_path = self.local_model_path | |
| load_from_local = True | |
| else: | |
| print("Downloading model from HuggingFace...") | |
| model_path = self.model_name | |
| load_from_local = False | |
| # Load tokenizer | |
| print("Loading tokenizer...") | |
| self.tokenizer = AutoTokenizer.from_pretrained( | |
| model_path, | |
| trust_remote_code=True | |
| ) | |
| # Save tokenizer locally if downloaded (wrap in try-except to avoid crashes) | |
| if not load_from_local: | |
| try: | |
| print("Saving tokenizer to local directory...") | |
| self.tokenizer.save_pretrained(self.local_model_path) | |
| print(f"[OK] Tokenizer saved to {self.local_model_path}") | |
| except Exception as save_err: | |
| print(f"[!] Warning: Could not save tokenizer locally: {save_err}") | |
| print("Continuing without local save...") | |
| # Load model with optimizations | |
| print("Loading model with optimizations...") | |
| load_kwargs = { | |
| "trust_remote_code": True, | |
| "low_cpu_mem_usage": True, # Reduces memory usage during loading | |
| "_attn_implementation": attn_impl, # Optimized attention | |
| } | |
| # Add quantization or dtype | |
| if use_quantization: | |
| from transformers import BitsAndBytesConfig | |
| load_kwargs["quantization_config"] = BitsAndBytesConfig( | |
| load_in_8bit=True, | |
| llm_int8_threshold=6.0 | |
| ) | |
| print("[OK] Using 8-bit quantization for faster loading and lower memory") | |
| else: | |
| load_kwargs["torch_dtype"] = torch_dtype | |
| # Use device_map="auto" for GPU, manual placement for CPU | |
| if device == "cuda": | |
| try: | |
| load_kwargs["device_map"] = "auto" | |
| print("[OK] Using device_map='auto' for optimal GPU memory management") | |
| except Exception as e: | |
| print(f"[!] device_map='auto' failed, using manual GPU placement: {e}") | |
| load_kwargs.pop("device_map", None) | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| model_path, | |
| **load_kwargs | |
| ) | |
| # Manual device placement if device_map wasn't used | |
| if device == "cuda" and "device_map" not in load_kwargs: | |
| self.model = self.model.cuda() | |
| print("[OK] Model moved to GPU") | |
| # Save model locally if downloaded (wrap in try-except to handle DTensor errors) | |
| if not load_from_local: | |
| try: | |
| print("Saving model weights to local directory (this may take a while)...") | |
| self.model.save_pretrained( | |
| self.local_model_path, | |
| safe_serialization=True # Use safetensors format | |
| ) | |
| print(f"[OK] Model saved to {self.local_model_path}") | |
| except ImportError as import_err: | |
| if "DTensor" in str(import_err): | |
| print(f"[!] Warning: Could not save model due to PyTorch/transformers compatibility issue: {import_err}") | |
| print("This is a known issue with certain versions. Model will work but won't be saved locally.") | |
| print("Continuing without local save...") | |
| else: | |
| raise | |
| except Exception as save_err: | |
| print(f"[!] Warning: Could not save model locally: {save_err}") | |
| print("Continuing without local save...") | |
| # Create pipeline with optimizations | |
| print("Creating pipeline...") | |
| pipeline_kwargs = { | |
| "model": self.model, | |
| "tokenizer": self.tokenizer, | |
| } | |
| if device == "cuda": | |
| pipeline_kwargs["device_map"] = "auto" | |
| self.pipe = pipeline("text-generation", **pipeline_kwargs) | |
| # Verify model device | |
| if device == "cuda": | |
| actual_device = next(self.model.parameters()).device | |
| print(f"[OK] Model loaded successfully on {actual_device}!") | |
| if torch.cuda.is_available(): | |
| allocated = torch.cuda.memory_allocated(0) / 1024**3 | |
| print(f"[OK] GPU Memory allocated: {allocated:.2f} GB") | |
| else: | |
| print("[OK] Model loaded successfully on CPU!") | |
| except Exception as e: | |
| print(f"Error loading model: {e}") | |
| print("Falling back to pipeline-only loading...") | |
| try: | |
| # Determine device and dtype for fallback | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| torch_dtype = torch.bfloat16 if device == "cuda" else torch.float32 | |
| # Try loading from local path first (only if model files actually exist) | |
| if self._check_model_files_exist(self.local_model_path): | |
| print(f"Attempting to load from local path: {self.local_model_path}") | |
| pipeline_kwargs = { | |
| "model": self.local_model_path, | |
| "trust_remote_code": True, | |
| "torch_dtype": torch_dtype, | |
| } | |
| if device == "cuda": | |
| pipeline_kwargs["device_map"] = "auto" | |
| self.pipe = pipeline("text-generation", **pipeline_kwargs) | |
| # Extract tokenizer from pipeline if available | |
| if hasattr(self.pipe, 'tokenizer'): | |
| self.tokenizer = self.pipe.tokenizer | |
| else: | |
| print(f"Downloading model: {self.model_name}") | |
| pipeline_kwargs = { | |
| "model": self.model_name, | |
| "trust_remote_code": True, | |
| "torch_dtype": torch_dtype, | |
| } | |
| if device == "cuda": | |
| pipeline_kwargs["device_map"] = "auto" | |
| self.pipe = pipeline("text-generation", **pipeline_kwargs) | |
| # Extract tokenizer from pipeline if available | |
| if hasattr(self.pipe, 'tokenizer'): | |
| self.tokenizer = self.pipe.tokenizer | |
| # Try to save after loading (but don't fail if it doesn't work) | |
| try: | |
| if hasattr(self.pipe, 'model') and hasattr(self.pipe.model, 'save_pretrained'): | |
| print("Attempting to save downloaded model to local directory...") | |
| self.pipe.model.save_pretrained(self.local_model_path, safe_serialization=True) | |
| if hasattr(self.pipe, 'tokenizer'): | |
| self.pipe.tokenizer.save_pretrained(self.local_model_path) | |
| print("[OK] Model saved successfully") | |
| except ImportError as import_err: | |
| if "DTensor" in str(import_err): | |
| print(f"[!] Warning: Could not save model due to compatibility issue. Model will work but won't be saved locally.") | |
| else: | |
| print(f"[!] Warning: Could not save model: {import_err}") | |
| except Exception as save_err: | |
| print(f"[!] Warning: Could not save model locally: {save_err}") | |
| except Exception as e2: | |
| print(f"Pipeline loading also failed: {e2}") | |
| raise | |
| def load_embedder(self, model_name: str = "all-MiniLM-L6-v2"): | |
| """Load sentence transformer for embeddings, saving to local directory""" | |
| embedder_dir = os.path.join(self.model_dir, "embedder", model_name.replace("/", "_")) | |
| os.makedirs(embedder_dir, exist_ok=True) | |
| print(f"Loading embedding model: {model_name}...") | |
| print(f"Embedder will be cached in: {embedder_dir}") | |
| # Check if embedder exists locally (check for actual model files, not just config) | |
| config_path = os.path.join(embedder_dir, "config.json") | |
| has_model_files = False | |
| if os.path.exists(config_path): | |
| # Check if model files exist | |
| model_files = glob.glob(os.path.join(embedder_dir, "*.safetensors")) + \ | |
| glob.glob(os.path.join(embedder_dir, "pytorch_model.bin")) | |
| if model_files or os.path.exists(os.path.join(embedder_dir, "model.safetensors.index.json")): | |
| has_model_files = True | |
| if has_model_files: | |
| print(f"Loading embedder from local cache: {embedder_dir}") | |
| self.embedder = SentenceTransformer(embedder_dir) | |
| else: | |
| print("Downloading embedder from HuggingFace...") | |
| self.embedder = SentenceTransformer(model_name, cache_folder=embedder_dir) | |
| # Try to save to local directory (but don't fail if it doesn't work) | |
| try: | |
| self.embedder.save(embedder_dir) | |
| print(f"[OK] Embedder saved to {embedder_dir}") | |
| except ImportError as import_err: | |
| if "DTensor" in str(import_err): | |
| print(f"[!] Warning: Could not save embedder due to PyTorch/transformers compatibility issue: {import_err}") | |
| print("This is a known issue with certain versions. Embedder will work but won't be saved locally.") | |
| print("Continuing without local save...") | |
| else: | |
| print(f"[!] Warning: Could not save embedder: {import_err}") | |
| except Exception as save_err: | |
| print(f"[!] Warning: Could not save embedder locally: {save_err}") | |
| print("Continuing without local save...") | |
| print("[OK] Embedding model loaded!") | |
| def build_faiss_index(self, documents: List[str], metadata: List[Dict] = None): | |
| """ | |
| Build FAISS index from documents | |
| Args: | |
| documents: List of text documents to index | |
| metadata: Optional metadata for each document | |
| """ | |
| if not self.embedder: | |
| self.load_embedder() | |
| print(f"Building FAISS index for {len(documents)} documents...") | |
| # Generate embeddings | |
| embeddings = self.embedder.encode(documents, show_progress_bar=True) | |
| embeddings = np.array(embeddings).astype('float32') | |
| # Get dimension | |
| dimension = embeddings.shape[1] | |
| # Create FAISS index (L2 distance) | |
| self.index = faiss.IndexFlatL2(dimension) | |
| # Add embeddings to index | |
| self.index.add(embeddings) | |
| # Store documents and metadata | |
| self.documents = documents | |
| self.metadata = metadata if metadata else [{}] * len(documents) | |
| print(f"[OK] FAISS index built with {self.index.ntotal} vectors") | |
| def build_index_from_json(self, json_data: Dict[str, Any]): | |
| """Build FAISS index from exported JSON data""" | |
| documents = [] | |
| metadata = [] | |
| # Add room documents | |
| for room in json_data.get("all_rooms", []): | |
| # Create text representation | |
| room_text = self._room_to_text(room) | |
| documents.append(room_text) | |
| metadata.append({ | |
| "type": "room", | |
| "room_id": room.get("room_id"), | |
| "data": room | |
| }) | |
| # Add route documents | |
| for route in json_data.get("evacuation_routes", {}).get("routes", []): | |
| route_text = self._route_to_text(route) | |
| documents.append(route_text) | |
| metadata.append({ | |
| "type": "route", | |
| "route_id": route.get("route_id"), | |
| "exit": route.get("exit"), | |
| "data": route | |
| }) | |
| # Build index | |
| self.build_faiss_index(documents, metadata) | |
| def _room_to_text(self, room: Dict[str, Any]) -> str: | |
| """Convert room data to searchable text""" | |
| sensor = room.get("sensor_data", {}) | |
| text_parts = [ | |
| f"Room {room.get('room_id')} ({room.get('name')})", | |
| f"Type: {room.get('room_type')}", | |
| ] | |
| if room.get("has_oxygen_cylinder"): | |
| text_parts.append("[!]️ OXYGEN CYLINDER PRESENT - EXPLOSION RISK") | |
| if sensor.get("fire_detected"): | |
| text_parts.append("[FIRE] FIRE DETECTED") | |
| text_parts.extend([ | |
| f"Temperature: {sensor.get('temperature_c')}°C", | |
| f"Smoke level: {sensor.get('smoke_level')}", | |
| f"Oxygen: {sensor.get('oxygen_pct')}%", | |
| f"Visibility: {sensor.get('visibility_pct')}%", | |
| f"Structural integrity: {sensor.get('structural_integrity_pct')}%", | |
| f"Danger score: {sensor.get('danger_score')}", | |
| f"Passable: {sensor.get('passable')}" | |
| ]) | |
| if sensor.get("carbon_monoxide_ppm", 0) > 50: | |
| text_parts.append(f"[!]️ HIGH CARBON MONOXIDE: {sensor.get('carbon_monoxide_ppm')} ppm") | |
| if sensor.get("flashover_risk", 0) > 0.5: | |
| text_parts.append(f"[!]️ FLASHOVER RISK: {sensor.get('flashover_risk')*100:.0f}%") | |
| if not sensor.get("exit_accessible", True): | |
| text_parts.append("[!]️ EXIT BLOCKED") | |
| if sensor.get("occupancy_density", 0) > 0.7: | |
| text_parts.append(f"[!]️ HIGH CROWD DENSITY: {sensor.get('occupancy_density')*100:.0f}%") | |
| return " | ".join(text_parts) | |
| def _route_to_text(self, route: Dict[str, Any]) -> str: | |
| """Convert route data to searchable text""" | |
| metrics = route.get("metrics", {}) | |
| text_parts = [ | |
| f"{route.get('route_id')} to {route.get('exit')}", | |
| f"Path: {' → '.join(route.get('path', []))}", | |
| f"Average danger: {metrics.get('avg_danger')}", | |
| f"Max danger: {metrics.get('max_danger')} at {metrics.get('max_danger_location')}", | |
| f"Passable: {metrics.get('passable')}", | |
| f"Has fire: {metrics.get('has_fire')}", | |
| f"Has oxygen hazard: {metrics.get('has_oxygen_hazard')}" | |
| ] | |
| risk_factors = metrics.get("risk_factors", []) | |
| if risk_factors: | |
| text_parts.append(f"Risks: {', '.join(risk_factors[:3])}") | |
| return " | ".join(text_parts) | |
| def search(self, query: str, k: int = 5) -> List[Dict[str, Any]]: | |
| """ | |
| Search FAISS index for relevant documents | |
| Args: | |
| query: Search query | |
| k: Number of results to return | |
| Returns: | |
| List of relevant documents with metadata | |
| """ | |
| if not self.index or not self.embedder: | |
| raise ValueError("Index not built. Call build_faiss_index() first.") | |
| # Encode query | |
| query_embedding = self.embedder.encode([query]) | |
| query_embedding = np.array(query_embedding).astype('float32') | |
| # Search | |
| distances, indices = self.index.search(query_embedding, k) | |
| # Return results | |
| results = [] | |
for i, idx in enumerate(indices[0]):
    # FAISS pads results with -1 when fewer than k vectors exist, so exclude those as well
    if 0 <= idx < len(self.documents):
        results.append({
            "document": self.documents[idx],
            "metadata": self.metadata[idx],
            "distance": float(distances[0][i])
        })
| return results | |
| def _build_cot_prompt(self, query: str, context: List[str]) -> str: | |
| """Build Chain-of-Thought prompt with step-by-step reasoning""" | |
| context_text = "\n".join([f"- {ctx}" for ctx in context]) | |
| prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to answer the question concisely. | |
| CONTEXT: | |
| {context_text} | |
| QUESTION: {query} | |
| Think step by step, then provide a brief answer: | |
| REASONING: | |
| 1. Analyze available information | |
| 2. Identify key safety factors | |
| 3. Evaluate risks and prioritize | |
| 4. Conclude with recommendation | |
| ANSWER:""" | |
| return prompt | |
| def _build_tot_prompt(self, query: str, context: List[str], thought: str = "") -> str: | |
| """Build Tree-of-Thoughts prompt for exploring multiple reasoning paths""" | |
| context_text = "\n".join([f"- {ctx}" for ctx in context]) | |
| if not thought: | |
| prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to explore different reasoning approaches. | |
| CONTEXT: | |
| {context_text} | |
| QUESTION: {query} | |
| Let's explore different reasoning approaches to solve this problem: | |
| APPROACH 1 - Safety-First Analysis: | |
| """ | |
| else: | |
| prompt = f"""CONTEXT: | |
| {context_text} | |
| QUESTION: {query} | |
| CURRENT THOUGHT: {thought} | |
| Evaluate this thought: | |
| - Is this reasoning sound? | |
| - What are the strengths and weaknesses? | |
| - What alternative approaches should we consider? | |
| EVALUATION: | |
| """ | |
| return prompt | |
| def _build_reflexion_prompt(self, query: str, context: List[str], previous_answer: str = "", | |
| reflection: str = "") -> str: | |
| """Build Reflexion prompt for self-reflection and improvement""" | |
| context_text = "\n".join([f"- {ctx}" for ctx in context]) | |
| if not previous_answer: | |
| # Initial answer | |
| prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to answer the question. | |
| CONTEXT: | |
| {context_text} | |
| QUESTION: {query} | |
| Provide a clear, safety-focused answer based on the context. | |
| ANSWER:""" | |
| else: | |
| # Reflection phase | |
| prompt = f"""You are an expert fire evacuation safety advisor. Review and improve your previous answer. | |
| CONTEXT: | |
| {context_text} | |
| QUESTION: {query} | |
| PREVIOUS ANSWER: | |
| {previous_answer} | |
| REFLECTION: | |
| {reflection} | |
| Now provide an improved answer based on your reflection: | |
| IMPROVED ANSWER:""" | |
| return prompt | |
| def _build_cot_with_tools_prompt(self, query: str, context: List[str], tool_results: List[str] = None) -> str: | |
| """Build Chain-of-Thought prompt with tool integration""" | |
| context_text = "\n".join([f"- {ctx}" for ctx in context]) | |
| tool_text = "" | |
| if tool_results: | |
| tool_text = "\nTOOL RESULTS:\n" + "\n".join([f"- {result}" for result in tool_results]) | |
| prompt = f"""You are an expert fire evacuation safety advisor. Use the following context and tool results to answer the question. | |
| CONTEXT: | |
| {context_text} | |
| {tool_text} | |
| QUESTION: {query} | |
| Let's solve this step by step, using both the context and tool results: | |
| STEP 1 - Understand the question and available data: | |
| """ | |
| return prompt | |
| def _generate_with_decoding_strategy(self, prompt: str, max_length: int = 500, | |
| temperature: float = 0.7, top_p: float = 0.9, | |
| num_beams: int = 3, stop_sequences: List[str] = None) -> str: | |
| """Generate response using specified decoding strategy""" | |
| if not self.pipe and not self.model: | |
| raise ValueError("Model not loaded. Call download_model() first.") | |
| try: | |
| if self.use_unsloth and self.model: | |
| inputs = self.tokenizer( | |
| prompt, | |
| return_tensors="pt", | |
| truncation=True, | |
| max_length=self.max_seq_length | |
| ).to(self.model.device) | |
| # Configure generation parameters based on decoding strategy | |
| gen_kwargs = { | |
| "max_new_tokens": max_length, | |
| "pad_token_id": self.tokenizer.eos_token_id, | |
| "eos_token_id": self.tokenizer.eos_token_id, | |
| } | |
| if self.decoding_strategy == DecodingStrategy.GREEDY: | |
| gen_kwargs.update({ | |
| "do_sample": False, | |
| "num_beams": 1 | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.SAMPLING: | |
| gen_kwargs.update({ | |
| "do_sample": True, | |
| "temperature": temperature, | |
| "top_k": 50 | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.BEAM_SEARCH: | |
| gen_kwargs.update({ | |
| "do_sample": False, | |
| "num_beams": num_beams, | |
| "early_stopping": True | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.NUCLEUS: | |
| gen_kwargs.update({ | |
| "do_sample": True, | |
| "temperature": temperature, | |
| "top_p": top_p, | |
| "top_k": 0 | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.TEMPERATURE: | |
| gen_kwargs.update({ | |
| "do_sample": True, | |
| "temperature": temperature | |
| }) | |
| with torch.no_grad(): | |
| outputs = self.model.generate(**inputs, **gen_kwargs) | |
| response = self.tokenizer.batch_decode( | |
| outputs, | |
| skip_special_tokens=True | |
| )[0] | |
| # Extract response after prompt | |
| if prompt in response: | |
| response = response.split(prompt)[-1].strip() | |
| # Post-process to stop at verbose endings | |
| stop_phrases = [ | |
| "\n\nHowever, please note", | |
| "\n\nAdditionally,", | |
| "\n\nLet me know", | |
| "\n\nIf you have", | |
| "\n\nHere's another", | |
| "\n\nQUESTION:", | |
| "\n\nLet's break", | |
| "\n\nHave a great day", | |
| "\n\nI'm here to help" | |
| ] | |
| for phrase in stop_phrases: | |
| if phrase in response: | |
| response = response.split(phrase)[0].strip() | |
| break | |
| return response | |
| else: | |
| # Use pipeline for standard models | |
| gen_kwargs = { | |
| "max_length": len(self.tokenizer.encode(prompt)) + max_length, | |
| "num_return_sequences": 1, | |
| } | |
| if self.decoding_strategy == DecodingStrategy.GREEDY: | |
| gen_kwargs.update({ | |
| "do_sample": False | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.SAMPLING: | |
| gen_kwargs.update({ | |
| "do_sample": True, | |
| "temperature": temperature, | |
| "top_k": 50 | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.BEAM_SEARCH: | |
| gen_kwargs.update({ | |
| "do_sample": False, | |
| "num_beams": num_beams, | |
| "early_stopping": True | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.NUCLEUS: | |
| gen_kwargs.update({ | |
| "do_sample": True, | |
| "temperature": temperature, | |
| "top_p": top_p, | |
| "top_k": 0 | |
| }) | |
| elif self.decoding_strategy == DecodingStrategy.TEMPERATURE: | |
| gen_kwargs.update({ | |
| "do_sample": True, | |
| "temperature": temperature | |
| }) | |
| gen_kwargs["pad_token_id"] = self.tokenizer.eos_token_id if self.tokenizer else None | |
| outputs = self.pipe(prompt, **gen_kwargs) | |
| response = outputs[0]['generated_text'] | |
| # Extract response after prompt | |
| if prompt in response: | |
| response = response.split(prompt)[-1].strip() | |
| # Post-process to stop at verbose endings | |
| stop_phrases = [ | |
| "\n\nHowever, please note", | |
| "\n\nAdditionally,", | |
| "\n\nLet me know", | |
| "\n\nIf you have", | |
| "\n\nHere's another", | |
| "\n\nQUESTION:", | |
| "\n\nLet's break", | |
| "\n\nHave a great day", | |
| "\n\nI'm here to help" | |
| ] | |
| for phrase in stop_phrases: | |
| if phrase in response: | |
| response = response.split(phrase)[0].strip() | |
| break | |
| return response | |
| except Exception as e: | |
| return f"Error generating response: {e}" | |
| def _chain_of_thought_reasoning(self, query: str, context: List[str], max_length: int = 500) -> Tuple[str, str]: | |
| """Generate response using Chain-of-Thought reasoning | |
| Returns: | |
| Tuple of (full_reasoning, final_answer) | |
| """ | |
| prompt = self._build_cot_prompt(query, context) | |
| # Use shorter max_length for CoT to prevent verbosity | |
| full_response = self._generate_with_decoding_strategy(prompt, max_length=min(max_length, 300)) | |
| # Extract reasoning steps (everything before ANSWER) | |
| reasoning = "" | |
| if "REASONING:" in full_response: | |
| reasoning_parts = full_response.split("REASONING:") | |
| if len(reasoning_parts) > 1: | |
| reasoning_section = reasoning_parts[1].split("ANSWER:")[0] if "ANSWER:" in reasoning_parts[1] else reasoning_parts[1] | |
| reasoning = reasoning_section.strip() | |
| elif "ANSWER:" in full_response: | |
| reasoning = full_response.split("ANSWER:")[0].strip() | |
| else: | |
| # Try to extract reasoning from numbered steps | |
| lines = full_response.split('\n') | |
| reasoning_lines = [] | |
| for line in lines: | |
| if line.strip().startswith(('1.', '2.', '3.', '4.', '5.', 'Step', 'STEP')): | |
| reasoning_lines.append(line.strip()) | |
| elif "ANSWER" in line.upper(): | |
| break | |
| elif reasoning_lines: # Continue collecting if we've started | |
| reasoning_lines.append(line.strip()) | |
| reasoning = '\n'.join(reasoning_lines) | |
| # Extract final answer (everything after ANSWER:) | |
| final_answer = full_response | |
| if "ANSWER:" in full_response: | |
| answer_parts = full_response.split("ANSWER:") | |
| if len(answer_parts) > 1: | |
| answer_text = answer_parts[-1].strip() | |
| # Stop at common continuation markers | |
| stop_markers = [ | |
| "\n\nHowever, please note", | |
| "\n\nAdditionally,", | |
| "\n\nLet me know", | |
| "\n\nIf you have", | |
| "\n\nHere's another", | |
| "\n\nQUESTION:", | |
| "\n\nLet's break", | |
| "\n\nHave a great day", | |
| "\n\nI'm here to help", | |
| "\n\nThese general guidelines", | |
| "\n\nIf you have any further" | |
| ] | |
| for marker in stop_markers: | |
| if marker in answer_text: | |
| answer_text = answer_text.split(marker)[0].strip() | |
| break | |
| # Also limit to first 2-3 sentences if it's still too long | |
| sentences = answer_text.split('. ') | |
| if len(sentences) > 3: | |
| answer_text = '. '.join(sentences[:3]) | |
| if not answer_text.endswith('.'): | |
| answer_text += '.' | |
| final_answer = answer_text | |
| # Clean up reasoning - remove verbose parts | |
| if reasoning: | |
| # Remove common verbose endings | |
| verbose_endings = [ | |
| "However, please note", | |
| "Additionally,", | |
| "Let me know", | |
| "If you have", | |
| "Here's another", | |
| "Have a great day", | |
| "I'm here to help" | |
| ] | |
| for ending in verbose_endings: | |
| if ending in reasoning: | |
| reasoning = reasoning.split(ending)[0].strip() | |
| break | |
| return reasoning or "Reasoning steps generated", final_answer | |
| def _tree_of_thoughts_reasoning(self, query: str, context: List[str], max_length: int = 500, | |
| max_thoughts: int = 3) -> Tuple[str, str]: | |
| """Generate response using Tree-of-Thoughts reasoning | |
| Returns: | |
| Tuple of (full_reasoning, final_answer) | |
| """ | |
| thoughts = [] | |
| reasoning_log = [] | |
| # Generate initial thoughts | |
| for i in range(max_thoughts): | |
| thought_prompt = self._build_tot_prompt(query, context, | |
| thought=f"Exploring approach {i+1}") | |
| thought = self._generate_with_decoding_strategy(thought_prompt, max_length // max_thoughts) | |
| thoughts.append(thought) | |
| reasoning_log.append(f"APPROACH {i+1}:\n{thought}\n") | |
| # Evaluate thoughts and select best | |
| evaluation_prompt = f"""Evaluate these different reasoning approaches for answering the question: | |
| QUESTION: {query} | |
| APPROACHES: | |
| """ | |
| for i, thought in enumerate(thoughts, 1): | |
| evaluation_prompt += f"\nAPPROACH {i}:\n{thought}\n" | |
| evaluation_prompt += "\nWhich approach is most sound and complete? Provide the best answer based on the evaluation.\n\nBEST ANSWER:" | |
| final_response = self._generate_with_decoding_strategy(evaluation_prompt, max_length) | |
| full_reasoning = "\n".join(reasoning_log) + f"\n\nEVALUATION:\n{final_response}" | |
| return full_reasoning, final_response | |
| def _reflexion_reasoning(self, query: str, context: List[str], max_length: int = 500, | |
| max_iterations: int = 2) -> Tuple[str, str]: | |
| """Generate response using Reflexion (self-reflection and improvement) | |
| Returns: | |
| Tuple of (full_reasoning, final_answer) | |
| """ | |
| reasoning_log = [] | |
| # Initial answer | |
| initial_prompt = self._build_reflexion_prompt(query, context) | |
| answer = self._generate_with_decoding_strategy(initial_prompt, max_length) | |
| reasoning_log.append(f"INITIAL ANSWER:\n{answer}\n") | |
| # Reflection and improvement iterations | |
| for iteration in range(max_iterations): | |
| # Generate reflection | |
| reflection_prompt = f"""Review this answer for a fire evacuation safety question: | |
| QUESTION: {query} | |
| CURRENT ANSWER: | |
| {answer} | |
| What could be improved? Consider: | |
| - Accuracy of safety information | |
| - Completeness of the response | |
| - Clarity and actionability | |
| - Missing critical safety factors | |
| REFLECTION:""" | |
| reflection = self._generate_with_decoding_strategy(reflection_prompt, max_length // 2) | |
| reasoning_log.append(f"ITERATION {iteration + 1} - REFLECTION:\n{reflection}\n") | |
| # Generate improved answer | |
| improved_prompt = self._build_reflexion_prompt(query, context, answer, reflection) | |
| improved_answer = self._generate_with_decoding_strategy(improved_prompt, max_length) | |
| reasoning_log.append(f"ITERATION {iteration + 1} - IMPROVED ANSWER:\n{improved_answer}\n") | |
| # Check if improvement is significant (simple heuristic) | |
| if len(improved_answer) > len(answer) * 0.8: # At least 80% of original length | |
| answer = improved_answer | |
| else: | |
| break # Stop if answer becomes too short | |
| self.reflexion_history.append({ | |
| "query": query, | |
| "final_answer": answer, | |
| "iterations": iteration + 1 | |
| }) | |
| full_reasoning = "\n".join(reasoning_log) | |
| return full_reasoning, answer | |
| def _cot_with_tools_reasoning(self, query: str, context: List[str], max_length: int = 500) -> Tuple[str, str]: | |
| """Generate response using Chain-of-Thought with tool integration | |
| Returns: | |
| Tuple of (full_reasoning, final_answer) | |
| """ | |
| reasoning_log = [] | |
| # Simulate tool calls (in real implementation, these would call actual tools) | |
| tool_results = [] | |
| # Tool 1: Route analysis | |
| if "route" in query.lower() or "path" in query.lower(): | |
| tool_result = "Tool: Route Analyzer - Found 3 evacuation routes with risk scores" | |
| tool_results.append(tool_result) | |
| reasoning_log.append(f"TOOL CALL: {tool_result}\n") | |
| # Tool 2: Risk calculator | |
| if "danger" in query.lower() or "risk" in query.lower(): | |
| tool_result = "Tool: Risk Calculator - Calculated danger scores for all rooms" | |
| tool_results.append(tool_result) | |
| reasoning_log.append(f"TOOL CALL: {tool_result}\n") | |
| # Tool 3: Sensor aggregator | |
| if "sensor" in query.lower() or "temperature" in query.lower() or "smoke" in query.lower(): | |
| tool_result = "Tool: Sensor Aggregator - Aggregated sensor data from all rooms" | |
| tool_results.append(tool_result) | |
| reasoning_log.append(f"TOOL CALL: {tool_result}\n") | |
| prompt = self._build_cot_with_tools_prompt(query, context, tool_results) | |
| response = self._generate_with_decoding_strategy(prompt, max_length) | |
| reasoning_log.append(f"REASONING WITH TOOLS:\n{response}\n") | |
| full_reasoning = "\n".join(reasoning_log) | |
| # Extract final answer | |
final_answer = response
# Extract the text after an "ANSWER:" heading, matching case-insensitively (handles "Answer:" too)
parts = re.split(r"answer:", response, flags=re.IGNORECASE)
if len(parts) > 1:
    final_answer = parts[-1].strip()
| return full_reasoning, final_answer | |
| def generate_response(self, query: str, context: List[str] = None, max_length: int = 500, | |
| return_reasoning: bool = False) -> str: | |
| """ | |
| Generate response using Llama model with context and advanced reasoning | |
| Args: | |
| query: User query | |
| context: Optional context strings (if None, will retrieve from FAISS) | |
| max_length: Maximum response length | |
| return_reasoning: If True, returns tuple of (reasoning, answer), else just answer | |
| Returns: | |
| If return_reasoning is True: Tuple of (reasoning_steps, final_answer) | |
| Otherwise: Just the final answer string | |
| """ | |
| if not self.pipe and not self.model: | |
| raise ValueError("Model not loaded. Call download_model() first.") | |
| # Retrieve context if not provided | |
| if context is None: | |
| search_results = self.search(query, k=3) | |
| context = [r["document"] for r in search_results] | |
| # Route to appropriate reasoning method based on mode | |
| if self.reasoning_mode == ReasoningMode.CHAIN_OF_THOUGHT: | |
| reasoning, answer = self._chain_of_thought_reasoning(query, context, max_length) | |
| elif self.reasoning_mode == ReasoningMode.TREE_OF_THOUGHTS: | |
| reasoning, answer = self._tree_of_thoughts_reasoning(query, context, max_length) | |
| elif self.reasoning_mode == ReasoningMode.REFLEXION: | |
| reasoning, answer = self._reflexion_reasoning(query, context, max_length) | |
| elif self.reasoning_mode == ReasoningMode.COT_WITH_TOOLS: | |
| reasoning, answer = self._cot_with_tools_reasoning(query, context, max_length) | |
| else: | |
| # Standard mode - use enhanced prompt with decoding strategy | |
| context_text = "\n".join([f"- {ctx}" for ctx in context]) | |
| prompt = f"""You are an expert fire evacuation safety advisor. Use the following context about the building's fire safety status to answer the question. | |
| CONTEXT: | |
| {context_text} | |
| QUESTION: {query} | |
| Provide a clear, safety-focused answer based on the context. If the context doesn't contain enough information, say so. | |
| ANSWER:""" | |
| answer = self._generate_with_decoding_strategy(prompt, max_length) | |
| reasoning = f"Standard reasoning mode - Direct answer generation.\n\n{answer}" | |
| if return_reasoning: | |
| return reasoning, answer | |
| return answer | |
| def set_reasoning_mode(self, mode: ReasoningMode): | |
| """Set the reasoning mode for future queries""" | |
| self.reasoning_mode = mode | |
| print(f"[OK] Reasoning mode set to: {mode.value}") | |
| def set_decoding_strategy(self, strategy: DecodingStrategy): | |
| """Set the decoding strategy for future queries""" | |
| self.decoding_strategy = strategy | |
| print(f"[OK] Decoding strategy set to: {strategy.value}") | |
| def query(self, question: str, k: int = 3, reasoning_mode: Optional[ReasoningMode] = None, | |
| show_reasoning: bool = True) -> Dict[str, Any]: | |
| """ | |
| Complete RAG query: retrieve context and generate response with advanced reasoning | |
| Args: | |
| question: User question | |
| k: Number of context documents to retrieve | |
| reasoning_mode: Optional override for reasoning mode (uses instance default if None) | |
| show_reasoning: If True, includes full reasoning steps in response | |
| Returns: | |
| Dictionary with answer, context, metadata, reasoning information, and reasoning steps | |
| """ | |
| # Retrieve relevant context | |
| search_results = self.search(question, k=k) | |
| # Generate response with reasoning | |
| context = [r["document"] for r in search_results] | |
| # Temporarily override reasoning mode if provided | |
| original_mode = self.reasoning_mode | |
| if reasoning_mode is not None: | |
| self.reasoning_mode = reasoning_mode | |
| try: | |
| reasoning, answer = self.generate_response(question, context, return_reasoning=True) | |
| finally: | |
| # Restore original mode | |
| self.reasoning_mode = original_mode | |
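# The relevance_score in "sources" below maps FAISS L2 distance d to 1 / (1 + d):
# 1.0 for an exact match, decreasing toward 0 as the retrieved document gets farther away.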
| result = { | |
| "question": question, | |
| "answer": answer, | |
| "context": context, | |
| "reasoning_mode": self.reasoning_mode.value, | |
| "decoding_strategy": self.decoding_strategy.value, | |
| "sources": [ | |
| { | |
| "type": r["metadata"].get("type"), | |
| "room_id": r["metadata"].get("room_id"), | |
| "route_id": r["metadata"].get("route_id"), | |
| "relevance_score": 1.0 / (1.0 + r["distance"]) | |
| } | |
| for r in search_results | |
| ] | |
| } | |
| if show_reasoning: | |
| result["reasoning_steps"] = reasoning | |
| return result | |
| def save_index(self, index_path: str, metadata_path: str): | |
| """Save FAISS index and metadata""" | |
| if self.index: | |
| faiss.write_index(self.index, index_path) | |
| with open(metadata_path, 'wb') as f: | |
| pickle.dump({ | |
| "documents": self.documents, | |
| "metadata": self.metadata | |
| }, f) | |
| print(f"[OK] Saved index to {index_path} and metadata to {metadata_path}") | |
| def load_index(self, index_path: str, metadata_path: str): | |
| """Load FAISS index and metadata""" | |
| self.index = faiss.read_index(index_path) | |
| with open(metadata_path, 'rb') as f: | |
| data = pickle.load(f) | |
| self.documents = data["documents"] | |
| self.metadata = data["metadata"] | |
| print(f"[OK] Loaded index with {self.index.ntotal} vectors") | |
| def compare_reasoning_modes(self, question: str, k: int = 3) -> Dict[str, Any]: | |
| """ | |
| Compare all reasoning modes for a given question | |
| Args: | |
| question: User question | |
| k: Number of context documents to retrieve | |
| Returns: | |
| Dictionary with answers from all reasoning modes | |
| """ | |
| # Retrieve context once | |
| search_results = self.search(question, k=k) | |
| context = [r["document"] for r in search_results] | |
| results = { | |
| "question": question, | |
| "context": context, | |
| "sources": [ | |
| { | |
| "type": r["metadata"].get("type"), | |
| "room_id": r["metadata"].get("room_id"), | |
| "route_id": r["metadata"].get("route_id"), | |
| "relevance_score": 1.0 / (1.0 + r["distance"]) | |
| } | |
| for r in search_results | |
| ], | |
| "answers": {} | |
| } | |
| # Save original mode | |
| original_mode = self.reasoning_mode | |
| # Test each reasoning mode | |
| for mode in ReasoningMode: | |
| try: | |
| self.reasoning_mode = mode | |
| reasoning, answer = self.generate_response(question, context, return_reasoning=True) | |
| results["answers"][mode.value] = { | |
| "answer": answer, | |
| "reasoning": reasoning, | |
| "length": len(answer) | |
| } | |
| except Exception as e: | |
| results["answers"][mode.value] = { | |
| "error": str(e) | |
| } | |
| # Restore original mode | |
| self.reasoning_mode = original_mode | |
| return results | |
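# Optional helper (illustrative sketch, not used by the Gradio app below): pretty-print the dict
# returned by FireEvacuationRAG.compare_reasoning_modes() so the modes can be compared side by side.
def print_mode_comparison(results: Dict[str, Any]) -> None:
    """Print a compact summary of compare_reasoning_modes() output."""
    print(f"Question: {results['question']}")
    for mode_name, entry in results.get("answers", {}).items():
        if "error" in entry:
            print(f"\n[{mode_name}] ERROR: {entry['error']}")
        else:
            print(f"\n[{mode_name}] ({entry['length']} chars)\n{entry['answer']}")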
| # === Gradio integration === | |
| _rag_instance: Optional[FireEvacuationRAG] = None | |
| def _init_rag() -> FireEvacuationRAG: | |
| """Initialize and cache the RAG system for Gradio use.""" | |
| global _rag_instance | |
| if _rag_instance is not None: | |
| return _rag_instance | |
| # Configuration (match original defaults, but without noisy prints) | |
| USE_UNSLOTH = True | |
| USE_8BIT = False | |
| UNSLOTH_MODEL = "unsloth/Meta-Llama-3.1-8B-Instruct" | |
# Model directory (defaults to ./models; can be overridden with the MODEL_DIR environment variable)
MODEL_DIR = os.environ.get("MODEL_DIR", "./models")
| # Create fire evacuation system | |
| floor_plan = create_sample_floor_plan() | |
| sensor_system = create_sample_fire_scenario(floor_plan) | |
| pathfinder = PathFinder(floor_plan, sensor_system) | |
| # Export data and build index | |
| exporter = FireEvacuationDataExporter(floor_plan, sensor_system, pathfinder) | |
| json_data = exporter.export_to_json("fire_evacuation_data.json", start_location="R1") | |
| # Initialize RAG | |
| if USE_UNSLOTH: | |
| rag = FireEvacuationRAG( | |
| model_name=UNSLOTH_MODEL, | |
| model_dir=MODEL_DIR, | |
| use_unsloth=True, | |
| load_in_4bit=False, | |
| max_seq_length=2048, | |
| reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT, | |
| decoding_strategy=DecodingStrategy.NUCLEUS, | |
| ) | |
| else: | |
| rag = FireEvacuationRAG( | |
| model_name="nvidia/Llama-3.1-Minitron-4B-Width-Base", | |
| model_dir=MODEL_DIR, | |
| use_8bit=USE_8BIT, | |
| reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT, | |
| decoding_strategy=DecodingStrategy.NUCLEUS, | |
| ) | |
| rag.download_model() | |
| rag.load_embedder() | |
| rag.build_index_from_json(json_data) | |
| _rag_instance = rag | |
| return rag | |
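# Because _init_rag() caches a module-level singleton, the model, embedder, and FAISS index are
# built once per process; the first Gradio request therefore bears the (potentially long) load time.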
| def gradio_answer(question: str) -> str: | |
| """Gradio callback: take a text question, return LLM/RAG answer.""" | |
| question = (question or "").strip() | |
| if not question: | |
| return "Please enter a question about fire evacuation or building safety." | |
| rag = _init_rag() | |
| result = rag.query(question, k=3, show_reasoning=False) | |
| return result.get("answer", "No answer generated.") | |
| if __name__ == "__main__": | |
| iface = gr.Interface( | |
| fn=gradio_answer, | |
| inputs=gr.Textbox(lines=3, label="Fire Evacuation Question"), | |
| outputs=gr.Textbox(lines=6, label="LLM Recommendation"), | |
| title="Fire Evacuation RAG Advisor", | |
| description="Ask about evacuation routes, dangers, and exits in the simulated building.", | |
| ) | |
| iface.launch() | |