# Source: SmokeScan/ui/state.py (KinetoLabs), 8.97 kB
# Commit 88bdcff - "Initial commit: FDAM AI Pipeline v4.0.1"
"""Session state management for FDAM AI Pipeline.
Provides Pydantic models for session state and localStorage persistence.
Images are stored separately (not in localStorage due to size limits).
"""
import json
import uuid
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
from schemas.input import (
ConstructionEra,
FacilityClassification,
OdorIntensity,
CharDensity,
)
# --- Form Data Models (for localStorage) ---
class ProjectFormData(BaseModel):
    """Form data backing Tab 1 (Project Info).

    Every free-text field defaults to an empty string so a freshly created
    model represents a blank form. Dates are kept as strings rather than
    ``date`` objects for form compatibility.
    """

    # Project identification and location
    project_name: str = ""
    address: str = ""
    city: str = ""
    state: str = ""
    zip_code: str = ""
    client_name: str = ""

    # Dates, stored as ISO format strings for form compatibility
    fire_date: str = ""
    assessment_date: str = ""

    # Facility characteristics. The defaults are written as plain strings;
    # NOTE(review): presumably valid values of the types imported from
    # schemas.input - confirm against that module.
    facility_classification: FacilityClassification = "non-operational"
    construction_era: ConstructionEra = "post-2000"

    # Assessor details
    assessor_name: str = ""
    assessor_credentials: str = ""
class RoomFormData(BaseModel):
    """Form data for a single room.

    Each room gets a generated identifier of the form ``room-<8 hex chars>``
    unless one is supplied. Dimensions are in feet, per the field names.
    """

    id: str = Field(default_factory=lambda: f"room-{uuid.uuid4().hex[:8]}")
    name: str = ""
    floor: str = ""

    # Defaults are float literals so a freshly created room holds the same
    # type (and serializes the same way) as one reloaded from JSON; Pydantic
    # does not coerce default values unless validate_default is enabled.
    length_ft: float = 0.0
    width_ft: float = 0.0
    ceiling_height_ft: float = 0.0
class ImageFormData(BaseModel):
    """Metadata describing one uploaded image.

    Only metadata lives here; the image bytes are stored separately and are
    looked up through ``id``.
    """

    # Generated as "img-" followed by the first 8 hex digits of a UUID4.
    id: str = Field(default_factory=lambda: "img-" + uuid.uuid4().hex[:8])
    filename: str = ""
    # Identifier of the room this image belongs to (empty until assigned).
    room_id: str = ""
    description: str = ""
class ObservationsFormData(BaseModel):
    """Form data backing Tab 4 (Observations).

    Each observation is a checkbox (defaulting to ``False``) paired with an
    optional detail field. Nothing here is mandatory.
    """

    # Odor
    smoke_fire_odor: bool = False
    odor_intensity: OdorIntensity = "none"

    # Soot
    visible_soot_deposits: bool = False
    soot_pattern_description: str = ""

    # Char particles
    large_char_particles: bool = False
    char_density_estimate: Optional[CharDensity] = None

    # Ash
    ash_like_residue: bool = False
    ash_color_texture: str = ""

    # Surface discoloration
    surface_discoloration: bool = False
    discoloration_description: str = ""

    # Dust loading
    dust_loading_interference: bool = False
    dust_notes: str = ""

    # Wildfire indicators
    wildfire_indicators: bool = False
    wildfire_notes: str = ""

    # Free-form notes
    additional_notes: str = ""
def _is_blank(value: object) -> bool:
    """Return True when a required form value is missing or only whitespace."""
    # Any falsy value counts as missing (same as a plain truthiness check);
    # whitespace-only text is rejected as well.
    return not value or not str(value).strip()


class SessionState(BaseModel):
    """Complete session state for an assessment.

    This model is serialized to localStorage for persistence.
    Images are stored separately and referenced by ID.
    """

    # Session metadata
    session_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    created_at: str = Field(default_factory=lambda: datetime.now().isoformat())
    updated_at: str = Field(default_factory=lambda: datetime.now().isoformat())
    name: str = ""  # Display name for history list

    # Tab completion status
    tab1_complete: bool = False
    tab2_complete: bool = False
    tab3_complete: bool = False
    tab4_complete: bool = False

    # Form data by tab
    project: ProjectFormData = Field(default_factory=ProjectFormData)
    rooms: list[RoomFormData] = Field(default_factory=list)
    images: list[ImageFormData] = Field(default_factory=list)
    observations: ObservationsFormData = Field(default_factory=ObservationsFormData)

    # Results (after generation)
    has_results: bool = False
    results_generated_at: Optional[str] = None

    def update_timestamp(self) -> None:
        """Set ``updated_at`` to the current local time in ISO format."""
        self.updated_at = datetime.now().isoformat()

    def get_display_name(self) -> str:
        """Return the label shown in the history list.

        Preference order: the explicit session name, then the project name,
        then a fallback built from the first 8 characters of the session id.
        """
        if self.name:
            return self.name
        if self.project.project_name:
            return self.project.project_name
        return f"Assessment {self.session_id[:8]}"

    def validate_tab1(self) -> tuple[bool, list[str]]:
        """Validate Tab 1 (Project Info).

        Returns:
            ``(is_valid, errors)`` where ``errors`` holds one message per
            required field that is empty or contains only whitespace, in
            form order.
        """
        required = (
            ("project_name", "Project name is required"),
            ("address", "Address is required"),
            ("city", "City is required"),
            ("state", "State is required"),
            ("zip_code", "ZIP code is required"),
            ("client_name", "Client name is required"),
            ("fire_date", "Fire date is required"),
            ("assessment_date", "Assessment date is required"),
            ("assessor_name", "Assessor name is required"),
        )
        errors = [
            message
            for field_name, message in required
            if _is_blank(getattr(self.project, field_name))
        ]
        return not errors, errors

    def validate_tab2(self) -> tuple[bool, list[str]]:
        """Validate Tab 2 (Building/Rooms).

        At least one room must exist, and every room needs a non-blank name
        and strictly positive length, width and ceiling height.

        Returns:
            ``(is_valid, errors)``.
        """
        errors = []
        if not self.rooms:
            errors.append("At least one room is required")
        for room in self.rooms:
            if _is_blank(room.name):
                errors.append("Room name is required")
            dimensions = (
                ("Length", room.length_ft),
                ("Width", room.width_ft),
                ("Ceiling height", room.ceiling_height_ft),
            )
            for label, value in dimensions:
                # Written as "not (value > 0)" rather than "value <= 0" so
                # that NaN, which compares False to everything, is rejected.
                if not value > 0:
                    errors.append(
                        f"Room '{room.name}': {label} must be greater than 0"
                    )
        return not errors, errors

    def validate_tab3(self) -> tuple[bool, list[str]]:
        """Validate Tab 3 (Images).

        At least one image must exist and every image must carry a
        non-blank ``room_id``.

        Returns:
            ``(is_valid, errors)``.
        """
        errors = []
        if not self.images:
            errors.append("At least one image is required")
        for img in self.images:
            if _is_blank(img.room_id):
                errors.append(
                    f"Image '{img.filename}': Must be associated with a room"
                )
        return not errors, errors

    def validate_tab4(self) -> tuple[bool, list[str]]:
        """Validate Tab 4 (Observations).

        Tab 4 has no required fields - all checkboxes default to False - so
        this always reports success.
        """
        return True, []

    def can_generate(self) -> tuple[bool, list[str]]:
        """Check whether the assessment can be generated.

        Runs the four tab validators in order and concatenates their errors.

        Returns:
            ``(is_valid, errors)`` where ``is_valid`` is True only when no
            tab reported an error.
        """
        all_errors = []
        validators = (
            self.validate_tab1,
            self.validate_tab2,
            self.validate_tab3,
            self.validate_tab4,
        )
        for validate in validators:
            # A tab is valid exactly when its error list is empty, so
            # extending unconditionally is sufficient.
            _, tab_errors = validate()
            all_errors.extend(tab_errors)
        return not all_errors, all_errors
class AssessmentHistory(BaseModel):
    """Most-recent-first collection of saved assessments for the history list."""

    assessments: list[SessionState] = Field(default_factory=list)
    current_session_id: Optional[str] = None

    def add_assessment(self, session: SessionState) -> None:
        """Insert ``session`` at the front of the history, replacing any
        existing entry with the same session id.

        The session's ``updated_at`` timestamp is refreshed first, and the
        history is capped at the 20 most recent entries.
        """
        session.update_timestamp()
        others = [
            entry
            for entry in self.assessments
            if entry.session_id != session.session_id
        ]
        # Newest first, then trim to the last 20 assessments.
        self.assessments = [session, *others][:20]

    def get_assessment(self, session_id: str) -> Optional[SessionState]:
        """Return the first assessment whose id matches, or ``None``."""
        matches = (
            entry for entry in self.assessments if entry.session_id == session_id
        )
        return next(matches, None)

    def remove_assessment(self, session_id: str) -> None:
        """Drop every assessment with the given id; a no-op if none match."""
        remaining = [
            entry for entry in self.assessments if entry.session_id != session_id
        ]
        self.assessments = remaining

    def get_history_items(self) -> list[dict]:
        """Return one summary dict per assessment for the history dropdown.

        Each dict has the keys ``id``, ``name``, ``updated`` and
        ``has_results``, in stored order.
        """
        items = []
        for entry in self.assessments:
            summary = {
                "id": entry.session_id,
                "name": entry.get_display_name(),
                "updated": entry.updated_at,
                "has_results": entry.has_results,
            }
            items.append(summary)
        return items
# --- Gradio State Helpers ---
def create_new_session() -> SessionState:
    """Return a fresh ``SessionState`` populated entirely with defaults."""
    fresh_session = SessionState()
    return fresh_session
def session_to_json(session: SessionState) -> str:
    """Return the JSON string for ``session``, for storing in localStorage."""
    payload = session.model_dump_json()
    return payload
def session_from_json(json_str: str) -> SessionState:
    """Rebuild a session from its JSON string.

    Best-effort: if parsing or validation fails for any reason, a new empty
    session is returned instead of raising.
    """
    try:
        restored = SessionState.model_validate_json(json_str)
    except Exception:
        # Deliberate fallback so bad stored data never blocks the caller.
        restored = create_new_session()
    return restored
def history_to_json(history: AssessmentHistory) -> str:
    """Return the JSON string for ``history``, for storing in localStorage."""
    payload = history.model_dump_json()
    return payload
def history_from_json(json_str: str) -> AssessmentHistory:
    """Rebuild the assessment history from its JSON string.

    Best-effort: if parsing or validation fails for any reason, an empty
    history is returned instead of raising.
    """
    try:
        restored = AssessmentHistory.model_validate_json(json_str)
    except Exception:
        # Deliberate fallback so bad stored data never blocks the caller.
        restored = AssessmentHistory()
    return restored