SmokeScan / pipeline /dispositions.py
KinetoLabs's picture
Frontend simplification (4→2 tabs) + lazy imports for HF Spaces
78caafb
"""FDAM Dispositions Module.
Determines cleaning dispositions based on zone classification,
condition level, and RAG-retrieved methodology context.
"""
import logging
from dataclasses import dataclass, field
from typing import Literal, Optional, TYPE_CHECKING
# Type hints only - actual import deferred to retriever property
if TYPE_CHECKING:
from rag import FDAMRetriever, ChromaVectorStore
logger = logging.getLogger(__name__)
# Disposition matrix from FDAM §4.3.
#
# Maps a (zone, condition) pair to a (disposition, protocol) pair. Rows keyed
# with the zone "any" describe conditions whose outcome does not depend on the
# zone classification.
DISPOSITION_MATRIX: dict[tuple[str, str], tuple[str, str]] = {
    # --- No visible contamination (zone-independent) ---
    ("any", "background"): ("no-action", "Document only"),

    # --- Far-field: protocol escalates with condition level ---
    ("far-field", "light"): ("clean", "Standard protocol"),
    ("far-field", "moderate"): ("clean", "Full protocol"),
    ("far-field", "heavy"): ("clean", "Aggressive protocol"),

    # --- Near-field: starts one protocol tier above far-field ---
    ("near-field", "light"): ("clean", "Full protocol"),
    ("near-field", "moderate"): (
        "clean",
        "Aggressive protocol, multiple passes",
    ),
    ("near-field", "heavy"): (
        "clean",
        "Aggressive protocol with verification sampling",
    ),

    # --- Burn zone: cleaning follows structural repair ---
    ("burn-zone", "light"): (
        "clean",
        "Post-structural repair; full protocol",
    ),
    ("burn-zone", "moderate"): (
        "clean",
        "Post-structural repair; aggressive protocol",
    ),
    ("burn-zone", "heavy"): (
        "clean",
        "Post-structural repair; aggressive protocol",
    ),

    # --- Structural damage (zone-independent) ---
    ("any", "structural-damage"): ("remove-repair", "Beyond cleaning scope"),
}
# Protocol details.
#
# Each entry is keyed by a short protocol identifier and carries:
#   "name"   - display name of the protocol
#   "steps"  - ordered list of work steps
#   "passes" - integer pass count associated with the protocol
CLEANING_PROTOCOLS: dict[str, dict] = {
    "standard": {
        "name": "Standard Protocol",
        "steps": [
            "HEPA vacuum all surfaces",
            "Wet wipe with appropriate cleaner",
            "Allow to dry",
            "Visual inspection",
        ],
        "passes": 1,
    },
    "full": {
        "name": "Full Protocol",
        "steps": [
            "HEPA vacuum all surfaces (2 passes)",
            "Wet wipe with degreaser/cleaner",
            "Rinse wipe",
            "Allow to dry",
            "Visual inspection",
            "Verification sampling if required",
        ],
        "passes": 2,
    },
    "aggressive": {
        "name": "Aggressive Protocol",
        "steps": [
            "HEPA vacuum all surfaces (minimum 3 passes)",
            "Apply cleaning solution, allow dwell time",
            "Agitate with appropriate brush/pad",
            "Wet wipe extraction",
            "Rinse wipe",
            "Repeat cleaning cycle if needed",
            "Verification sampling required",
        ],
        "passes": 3,
    },
}
@dataclass
class DispositionResult:
    """Outcome of one zone/condition disposition lookup.

    Attributes:
        zone: Zone classification the result applies to.
        condition: Condition level the result applies to.
        disposition: One of "no-action", "clean", "evaluate", "remove",
            or "remove-repair".
        protocol: Human-readable protocol description.
        protocol_details: Optional dictionary with protocol specifics.
        confidence: Confidence score for the result; defaults to 1.0.
        rag_context: Optional text excerpt retrieved via RAG.
        notes: Free-form notes; each instance gets its own list.
    """

    # --- Required fields (positional order is part of the interface) ---
    zone: str
    condition: str
    disposition: Literal[
        "no-action", "clean", "evaluate", "remove", "remove-repair"
    ]
    protocol: str

    # --- Optional fields ---
    protocol_details: Optional[dict] = None
    confidence: float = 1.0
    rag_context: Optional[str] = None
    # default_factory so instances never share one notes list.
    notes: list[str] = field(default_factory=list)
@dataclass
class SurfaceDisposition:
    """Disposition record for one surface in one room.

    Attributes:
        surface_type: Surface or material type the record describes.
        room_name: Name of the room the surface belongs to.
        zone: Zone classification applied to the surface.
        condition: Condition level applied to the surface.
        disposition: Disposition label for the surface.
        cleaning_method: Cleaning method (or protocol text) to apply.
        notes: Free-form notes; each instance gets its own list.
    """

    # --- What and where ---
    surface_type: str
    room_name: str

    # --- Classification ---
    zone: str
    condition: str

    # --- Decision ---
    disposition: str
    cleaning_method: str
    # default_factory so instances never share one notes list.
    notes: list[str] = field(default_factory=list)
class DispositionEngine:
    """Determines cleaning dispositions using FDAM methodology and RAG.

    The engine combines the static ``DISPOSITION_MATRIX`` /
    ``CLEANING_PROTOCOLS`` tables with optional context fetched from a RAG
    retriever. Retrieval is always best-effort: a failing retriever is logged
    and never prevents a disposition from being produced.
    """

    # Default cleaning methods by surface type (from FDAM §5.2).
    # Built once at class level instead of on every call.
    _DEFAULT_CLEANING_METHODS = {
        "drywall": "HEPA vacuum → Dry sponge OR wet wipe",
        "painted-drywall": "HEPA vacuum → Wet wipe with degreaser",
        "concrete": "Scrubber machine + alkaline cleaner",
        "concrete-floor": "Scrubber machine + alkaline cleaner",
        "cmu": "HEPA vacuum → Wet wipe OR power wash",
        "cmu-walls": "HEPA vacuum → Wet wipe OR power wash",
        "metal": "Wet wipe → Rinse",
        "metal-doors": "Wet wipe → Rinse",
        "wood": "HEPA vacuum → Appropriate wood cleaner",
        "glass": "Glass cleaner with lint-free cloth",
        "carpet": "HEPA vacuum → Hot water extraction",
        "hvac-ductwork": "Per NADCA ACR standards",
        "ceiling-deck": "HEPA vacuum → Wet wipe (enhanced sampling required)",
    }

    # Method used when no default entry matches the surface type.
    _FALLBACK_CLEANING_METHOD = (
        "HEPA vacuum → Wet wipe (consult IH professional)"
    )

    # Keywords searched (in this order) in the protocol text to pick the
    # matching CLEANING_PROTOCOLS entry.
    _PROTOCOL_KEYWORDS = ("standard", "aggressive", "full")

    def __init__(self, retriever: Optional["FDAMRetriever"] = None):
        """Initialize disposition engine.

        Args:
            retriever: Optional RAG retriever. If None, a default retriever
                is created lazily on first access of ``retriever``.
        """
        self._retriever = retriever

    @property
    def retriever(self) -> "FDAMRetriever":
        """Get or create RAG retriever.

        Returns:
            The injected retriever, or a lazily constructed default one.

        Raises:
            ImportError: If the ``rag`` package cannot be imported.
        """
        if self._retriever is None:
            # Lazy import to avoid chromadb dependency at module load
            from rag import FDAMRetriever, ChromaVectorStore

            try:
                vs = ChromaVectorStore(persist_directory="chroma_db")
                self._retriever = FDAMRetriever(vectorstore=vs)
            except Exception as e:
                # Fall back to a retriever built with no arguments when the
                # persistent store cannot be opened (presumably in-memory;
                # confirm against the rag package).
                logger.warning(
                    "ChromaDB init failed, using fallback retriever: %s", e
                )
                self._retriever = FDAMRetriever()
        return self._retriever

    def determine_disposition(
        self,
        zone: Literal["burn-zone", "near-field", "far-field"],
        condition: Literal[
            "background", "light", "moderate", "heavy", "structural-damage"
        ],
        surface_type: Optional[str] = None,
        use_rag: bool = True,
    ) -> "DispositionResult":
        """Determine disposition for a zone/condition combination.

        Args:
            zone: Zone classification.
            condition: Condition level.
            surface_type: Optional surface type for specific guidance; only
                forwarded to the RAG retriever.
            use_rag: Whether to retrieve additional context from RAG.

        Returns:
            DispositionResult with disposition and protocol. Combinations
            missing from the matrix yield disposition "evaluate" with a
            confidence of 0.6; matrix hits yield a confidence of 0.9.
        """
        # Zone-independent outcomes are resolved first and never hit RAG.
        if condition == "background":
            return DispositionResult(
                zone=zone,
                condition=condition,
                disposition="no-action",
                protocol="Document only",
                confidence=1.0,
                notes=["No visible contamination - document and proceed"],
            )

        if condition == "structural-damage":
            return DispositionResult(
                zone=zone,
                condition=condition,
                disposition="remove-repair",
                protocol="Beyond cleaning scope",
                confidence=1.0,
                notes=[
                    "Structural damage requires repair before cleaning assessment"
                ],
            )

        notes: list[str] = []

        # Look up in disposition matrix
        matrix_entry = DISPOSITION_MATRIX.get((zone, condition))
        if matrix_entry is not None:
            disposition, protocol = matrix_entry
        else:
            # Fallback for unexpected combinations
            disposition = "evaluate"
            protocol = "Professional judgment required"
            notes.append(
                "Combination not in standard matrix - requires evaluation"
            )

        # Pick protocol details by the first keyword found in the protocol
        # text; None when no keyword matches (e.g. the "evaluate" fallback).
        protocol_text = protocol.lower()
        protocol_details = next(
            (
                CLEANING_PROTOCOLS[keyword]
                for keyword in self._PROTOCOL_KEYWORDS
                if keyword in protocol_text
            ),
            None,
        )

        # Get RAG context if enabled (best-effort)
        rag_context = None
        if use_rag:
            try:
                results = self.retriever.retrieve_disposition(
                    zone=zone,
                    condition=condition,
                    material_type=surface_type,
                )
                if results:
                    top_hit = results[0]
                    rag_context = top_hit.text[:500]  # First result, truncated
                    notes.append(f"RAG context from: {top_hit.source}")
            except Exception as e:
                # Logged for parity with get_cleaning_method; the note keeps
                # the failure visible on the result itself.
                logger.warning("RAG retrieval failed for disposition: %s", e)
                notes.append(f"RAG lookup unavailable: {e}")

        return DispositionResult(
            zone=zone,
            condition=condition,
            disposition=disposition,
            protocol=protocol,
            protocol_details=protocol_details,
            confidence=0.9 if disposition != "evaluate" else 0.6,
            rag_context=rag_context,
            notes=notes,
        )

    @classmethod
    def _match_default_method(cls, surface_type) -> Optional[str]:
        """Return the default cleaning method for a surface type, or None.

        Matching order:
            1. Exact match on the normalized surface type.
            2. The longest table key contained in the surface type, so that
               "painted-drywall-ceiling" resolves to "painted-drywall"
               rather than the shorter "drywall".
            3. The first table key that contains the surface type (e.g.
               "ceiling" resolves to "ceiling-deck").

        An empty or missing surface type never matches.
        """
        normalized = (
            str(surface_type or "")
            .strip()
            .lower()
            .replace(" ", "-")
            .replace("_", "-")
        )
        if not normalized:
            # "" is a substring of every key; treat it as "no match" instead.
            return None

        methods = cls._DEFAULT_CLEANING_METHODS
        if normalized in methods:
            return methods[normalized]

        contained_keys = [key for key in methods if key in normalized]
        if contained_keys:
            # max() keeps the first key on length ties, i.e. table order.
            return methods[max(contained_keys, key=len)]

        for key, method in methods.items():
            if normalized in key:
                return method
        return None

    def get_cleaning_method(
        self,
        surface_type: str,
        condition: Literal["light", "moderate", "heavy"],
        use_rag: bool = True,
    ) -> dict:
        """Get recommended cleaning method for a surface type.

        Args:
            surface_type: Type of surface (e.g., "drywall", "concrete").
                Case, spaces and underscores are normalized before matching.
            condition: Contamination level.
            use_rag: Whether to retrieve from RAG.

        Returns:
            Dictionary with keys "surface_type", "condition", "method" and
            "source"; "rag_context" and "rag_source" are added when RAG
            returns at least one result.
        """
        method = self._match_default_method(surface_type)
        if method is None:
            method = self._FALLBACK_CLEANING_METHOD

        # Enhance method based on condition
        if condition == "heavy":
            method = f"{method} (multiple passes, verification sampling)"
        elif condition == "moderate":
            method = f"{method} (consider additional pass)"

        result = {
            "surface_type": surface_type,
            "condition": condition,
            "method": method,
            "source": "FDAM §5.2",
        }

        # Get RAG context for additional detail (best-effort)
        if use_rag:
            try:
                rag_results = self.retriever.retrieve_cleaning_method(
                    surface_type=surface_type,
                    condition=condition,
                )
                if rag_results:
                    top_hit = rag_results[0]
                    result["rag_context"] = top_hit.text[:300]
                    result["rag_source"] = top_hit.source
            except Exception as e:
                logger.warning(
                    "RAG retrieval failed for cleaning method: %s", e
                )

        return result

    def process_vision_results(
        self,
        vision_results: dict,
        room_mapping: dict,
        use_rag: bool = True,
    ) -> "list[SurfaceDisposition]":
        """Process vision analysis results into surface dispositions.

        Each vision result is read as a dictionary with optional "zone"
        (containing "classification"), "condition" (containing "level") and
        "materials" (list of dicts with "type") entries. Missing zone,
        condition or materials are replaced by "far-field", "light" and a
        single "general-surface" material respectively, and the vision
        result is flagged in place with ``"_fallback_used": True``.

        Args:
            vision_results: Dictionary of image_id -> vision result.
            room_mapping: Dictionary of image_id -> room info.
            use_rag: Whether to retrieve RAG context for each surface.

        Returns:
            List of SurfaceDisposition for each analyzed surface.
        """
        logger.debug("Processing %d vision results", len(vision_results))
        dispositions = []

        for image_id, result in vision_results.items():
            # `or {}` also covers an explicit None stored for the image.
            room_info = room_mapping.get(image_id) or {}
            room_name = room_info.get("name", "Unknown Room")

            # Extract zone and condition with fallback tracking
            zone_data = result.get("zone", {})
            zone = zone_data.get("classification") if zone_data else None
            condition_data = result.get("condition", {})
            condition = condition_data.get("level") if condition_data else None

            # Track if fallbacks were used (affects confidence scoring)
            fallback_used = False
            if zone is None:
                zone = "far-field"
                fallback_used = True
                logger.warning(
                    "Image %s: Using fallback zone 'far-field'", image_id
                )
            if condition is None:
                condition = "light"
                fallback_used = True
                logger.warning(
                    "Image %s: Using fallback condition 'light'", image_id
                )

            # Get materials detected
            materials = result.get("materials", [])
            if not materials:
                materials = [{"type": "general-surface", "confidence": 0.8}]
                fallback_used = True

            # Flag for confidence scoring (mutates the caller's dict)
            if fallback_used:
                result["_fallback_used"] = True

            for material in materials:
                # A missing or None type is reported as "unknown".
                material_type = material.get("type") or "unknown"

                disp_result = self.determine_disposition(
                    zone=zone,
                    condition=condition,
                    surface_type=material_type,
                    use_rag=use_rag,
                )

                # Only "clean" dispositions get a surface-specific method;
                # "background" never yields "clean", so no extra check is
                # needed. Everything else reports the protocol text.
                if disp_result.disposition == "clean":
                    method_info = self.get_cleaning_method(
                        surface_type=material_type,
                        condition=condition,
                        use_rag=use_rag,
                    )
                    cleaning_method = method_info["method"]
                else:
                    cleaning_method = disp_result.protocol

                dispositions.append(
                    SurfaceDisposition(
                        surface_type=material_type,
                        room_name=room_name,
                        zone=zone,
                        condition=condition,
                        disposition=disp_result.disposition,
                        cleaning_method=cleaning_method,
                        notes=disp_result.notes,
                    )
                )
                logger.debug(
                    " %s/%s: %s/%s -> %s",
                    room_name,
                    material_type,
                    zone,
                    condition,
                    disp_result.disposition,
                )

        # Log disposition summary
        disp_counts = {}
        for entry in dispositions:
            disp_counts[entry.disposition] = (
                disp_counts.get(entry.disposition, 0) + 1
            )
        logger.info("Dispositions generated: %s", disp_counts)

        return dispositions