# SmokeScan — pipeline/generator.py
# Commit 78caafb (KinetoLabs): "Frontend simplification (4→2 tabs) + lazy imports for HF Spaces"
"""FDAM Document Generator.
Generates Cleaning Specification / Scope of Work documents
with RAG-enhanced content from the FDAM knowledge base.
"""
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from ui.state import SessionState
logger = logging.getLogger(__name__)
# Type hints only - actual import deferred to retriever property
if TYPE_CHECKING:
from rag import FDAMRetriever, ChromaVectorStore
from .calculations import FDAMCalculator, AirFiltrationResult, SampleDensityResult, RegulatoryFlags
from .dispositions import DispositionEngine, SurfaceDisposition
@dataclass
class GeneratedDocument:
    """A generated assessment document."""

    markdown: str        # full document body, sections joined by "\n\n---\n\n"
    title: str           # document title, e.g. "SOW - <room name>"
    generated_at: str    # ISO-8601 timestamp of generation
    word_count: int      # whitespace-delimited word count of `markdown`
    sections: list[str]  # ordered labels of the sections included in the document
class DocumentGenerator:
    """Generates FDAM assessment documents with RAG enhancement.

    Renders a Scope of Work (SOW) as markdown from session state, vision
    analysis results, surface dispositions, and calculator output. The RAG
    retriever is constructed lazily so that importing this module does not
    pull in the chromadb dependency.
    """

    def __init__(
        self,
        calculator: Optional[FDAMCalculator] = None,
        disposition_engine: Optional[DispositionEngine] = None,
        retriever: Optional["FDAMRetriever"] = None,
    ):
        """Initialize document generator.

        Args:
            calculator: FDAM calculator instance (a default is created if omitted)
            disposition_engine: Disposition engine instance (default created if omitted)
            retriever: RAG retriever instance (lazily constructed on first access
                via the ``retriever`` property if omitted)
        """
        self.calculator = calculator or FDAMCalculator()
        self.disposition_engine = disposition_engine or DispositionEngine()
        self._retriever = retriever

    @property
    def retriever(self) -> "FDAMRetriever":
        """Get or create the RAG retriever (lazy, cached after first access)."""
        if self._retriever is None:
            # Lazy import to avoid chromadb dependency at module load
            from rag import FDAMRetriever, ChromaVectorStore
            try:
                vs = ChromaVectorStore(persist_directory="chroma_db")
                self._retriever = FDAMRetriever(vectorstore=vs)
            except Exception:
                # Deliberate best-effort fallback: run without a persistent
                # vector store, but log why so the degradation is visible.
                logger.warning(
                    "ChromaVectorStore unavailable; using default retriever",
                    exc_info=True,
                )
                self._retriever = FDAMRetriever()
        return self._retriever

    def generate_sow(
        self,
        session: SessionState,
        vision_results: dict,
        surface_dispositions: list[SurfaceDisposition],
        calculations: dict,
    ) -> GeneratedDocument:
        """Generate Scope of Work document.

        Args:
            session: Current session state
            vision_results: Vision analysis results by image ID
            surface_dispositions: List of surface dispositions
            calculations: Calculation results from FDAMCalculator

        Returns:
            GeneratedDocument with markdown content
        """
        logger.debug("Starting SOW document generation")
        # Pair each section label with its rendered markdown so the
        # `sections` metadata can never drift out of sync with the content
        # (previously the label list was maintained by hand, separately).
        built: list[tuple[str, str]] = [
            ("Header", self._generate_header(session)),
            ("Room Info", self._generate_project_info(session)),
            ("Scope Summary", self._generate_scope_summary(session, calculations)),
            ("Room Details", self._generate_room_inventory(session)),
            ("Vision Analysis", self._generate_vision_summary(session, vision_results)),
            ("Observations", self._generate_observations(session)),
            ("Dispositions", self._generate_disposition_summary(surface_dispositions)),
            ("Cleaning Specs", self._generate_cleaning_specs(surface_dispositions, calculations)),
            ("Air Filtration", self._generate_air_filtration(calculations)),
            ("Sampling Plan", self._generate_sampling_plan(calculations, session)),
            ("Regulatory", self._generate_regulatory_section(calculations)),
            ("Thresholds", self._generate_thresholds_section(calculations)),
            ("Footer", self._generate_footer()),
        ]
        markdown = "\n\n---\n\n".join(text for _, text in built)
        word_count = len(markdown.split())
        # Lazy %-args: formatting is skipped when INFO is disabled.
        logger.info(
            "Document generated: %d words, %d sections", word_count, len(built)
        )
        return GeneratedDocument(
            markdown=markdown,
            title=f"SOW - {session.room.name}",
            generated_at=datetime.now().isoformat(),
            word_count=word_count,
            sections=[label for label, _ in built],
        )

    def _generate_header(self, session: SessionState) -> str:
        """Generate document header (title, room, date, version)."""
        return f"""# Cleaning Specification / Scope of Work
**Room:** {session.room.name}
**Date:** {datetime.now().strftime('%B %d, %Y')}
**Document Version:** FDAM v4.0.1"""

    def _generate_project_info(self, session: SessionState) -> str:
        """Generate room information section (markdown table)."""
        r = session.room
        return f"""## Room Information
| Field | Value |
|-------|-------|
| **Room Name** | {r.name} |
| **Facility Classification** | {r.facility_classification or 'Not specified'} |
| **Construction Era** | {r.construction_era or 'Not specified'} |"""

    def _generate_scope_summary(self, session: SessionState, calculations: dict) -> str:
        """Generate scope summary section.

        Fix: the previous version evaluated ``sample.tape_lifts_min`` and
        ``sample.surface_wipes_min`` *before* the ``if sample`` guard, raising
        AttributeError whenever the sample-density calculation was missing.
        All sample/air fields are now guarded as a whole.
        """
        air = calculations.get("air_filtration")
        sample = calculations.get("sample_density")
        scrubbers = f"{air.units_required} units" if air else "N/A"
        tape_lifts = (
            f"{sample.tape_lifts_min}-{sample.tape_lifts_max}" if sample else "N/A"
        )
        surface_wipes = (
            f"{sample.surface_wipes_min}-{sample.surface_wipes_max}" if sample else "N/A"
        )
        return f"""## Scope Summary
| Metric | Value |
|--------|-------|
| **Room** | {session.room.name} |
| **Total Floor Area** | {calculations['total_area_sf']:,.0f} SF |
| **Total Volume** | {calculations['total_volume_cf']:,.0f} CF |
| **Images Analyzed** | {len(session.images)} |
| **Air Scrubbers Required** | {scrubbers} |
| **Est. Tape Lifts** | {tape_lifts} |
| **Est. Surface Wipes** | {surface_wipes} |"""

    def _generate_room_inventory(self, session: SessionState) -> str:
        """Generate room details section (dimensions, area, volume)."""
        r = session.room
        area = r.length_ft * r.width_ft
        volume = area * r.ceiling_height_ft
        return f"""## Room Details
| Property | Value |
|----------|-------|
| **Room Name** | {r.name} |
| **Dimensions** | {r.length_ft:.0f}' × {r.width_ft:.0f}' × {r.ceiling_height_ft:.0f}' |
| **Floor Area** | {area:,.0f} SF |
| **Volume** | {volume:,.0f} CF |
| **Facility Type** | {r.facility_classification or 'Not specified'} |
| **Construction Era** | {r.construction_era or 'Not specified'} |"""

    def _generate_vision_summary(self, session: SessionState, vision_results: dict) -> str:
        """Generate AI vision analysis summary table.

        Missing results for an image render as "N/A" with 0% confidence;
        the final column is the mean of zone and condition confidences.
        """
        lines = ["## AI Vision Analysis Summary", ""]
        if not vision_results:
            lines.append("*No images analyzed.*")
            return "\n".join(lines)
        lines.append("| Image | Zone | Condition | Confidence |")
        lines.append("|-------|------|-----------|------------|")
        for img_meta in session.images:
            result = vision_results.get(img_meta.id, {})
            zone = result.get("zone", {})
            condition = result.get("condition", {})
            zone_class = zone.get("classification", "N/A")
            zone_conf = zone.get("confidence", 0)
            cond_level = condition.get("level", "N/A")
            cond_conf = condition.get("confidence", 0)
            lines.append(
                f"| {img_meta.filename} | {zone_class} ({zone_conf:.0%}) | "
                f"{cond_level} ({cond_conf:.0%}) | {(zone_conf + cond_conf) / 2:.0%} |"
            )
        return "\n".join(lines)

    def _generate_observations(self, session: SessionState) -> str:
        """Generate field observations section.

        Each observation is a (flag attribute, label, detail attribute)
        triple; the detail text is shown when present, else "Present".
        """
        obs = session.observations
        lines = ["## Field Observations", ""]
        # Data-driven replacement for the previous repetitive if-chain;
        # order matches the original output exactly.
        checks = [
            ("smoke_fire_odor", "Smoke/Fire Odor", "odor_intensity"),
            ("visible_soot_deposits", "Visible Soot", "soot_pattern_description"),
            ("large_char_particles", "Char Particles", "char_density_estimate"),
            ("ash_like_residue", "Ash Residue", "ash_color_texture"),
            ("surface_discoloration", "Discoloration", "discoloration_description"),
            ("wildfire_indicators", "Wildfire Indicators", "wildfire_notes"),
            ("dust_loading_interference", "Dust/Debris", "dust_notes"),
        ]
        items = [
            f"- **{label}:** {getattr(obs, detail_attr) or 'Present'}"
            for flag_attr, label, detail_attr in checks
            if getattr(obs, flag_attr)
        ]
        if obs.additional_notes:
            items.append(f"- **Additional Notes:** {obs.additional_notes}")
        if items:
            lines.extend(items)
        else:
            lines.append("*No significant observations noted.*")
        return "\n".join(lines)

    def _generate_disposition_summary(self, dispositions: list[SurfaceDisposition]) -> str:
        """Generate disposition summary table (one row per surface)."""
        lines = ["## Disposition Summary", ""]
        if not dispositions:
            lines.append("*No dispositions determined.*")
            return "\n".join(lines)
        lines.append("| Room | Surface | Zone | Condition | Disposition |")
        lines.append("|------|---------|------|-----------|-------------|")
        for disp in dispositions:
            lines.append(
                f"| {disp.room_name} | {disp.surface_type} | {disp.zone} | "
                f"{disp.condition} | {disp.disposition.upper()} |"
            )
        return "\n".join(lines)

    def _generate_cleaning_specs(
        self,
        dispositions: list[SurfaceDisposition],
        calculations: dict,
    ) -> str:
        """Generate cleaning specifications section, grouped by disposition.

        Group order follows first appearance in `dispositions` (dict
        insertion order), matching the previous manual-grouping behavior.
        """
        lines = ["## Cleaning Specifications", ""]
        by_disposition: dict = {}
        for disp in dispositions:
            by_disposition.setdefault(disp.disposition, []).append(disp)
        for disposition, items in by_disposition.items():
            lines.append(f"### {disposition.upper().replace('-', ' ')} Surfaces")
            lines.append("")
            for item in items:
                lines.append(f"**{item.room_name} - {item.surface_type}:**")
                lines.append(f"- Method: {item.cleaning_method}")
                if item.notes:
                    lines.append(f"- Notes: {'; '.join(item.notes)}")
                lines.append("")
        return "\n".join(lines)

    def _generate_air_filtration(self, calculations: dict) -> str:
        """Generate air filtration requirements section (NADCA ACR 2021 §3.6)."""
        air: AirFiltrationResult = calculations.get("air_filtration")
        if not air:
            return "## Air Filtration Requirements\n\n*Calculation unavailable.*"
        return f"""## Air Filtration Requirements
Per NADCA ACR 2021, Section 3.6:
| Parameter | Value |
|-----------|-------|
| **Required ACH** | {air.required_ach} air changes per hour |
| **Total Volume** | {air.total_volume_cf:,.0f} CF |
| **Unit Capacity** | {air.unit_cfm:,} CFM |
| **Units Required** | {air.units_required} |
**Calculation:** {air.calculation_notes}
**Placement Notes:**
- Distribute units evenly throughout work area
- Ensure adequate negative air pressure
- Exhaust to exterior when possible"""

    def _generate_sampling_plan(self, calculations: dict, session: SessionState) -> str:
        """Generate sampling plan section.

        Fix: table rows previously lacked the closing ``|``, producing
        malformed markdown tables in some renderers.
        """
        sample: SampleDensityResult = calculations.get("sample_density")
        if not sample:
            return "## Sampling Plan\n\n*Calculation unavailable.*"
        lines = ["## Sampling Plan", ""]
        lines.append("### Pre-Cleaning Characterization")
        lines.append("")
        lines.append("| Sample Type | Quantity | Notes |")
        lines.append("|-------------|----------|-------|")
        lines.append(
            f"| Tape Lifts | {sample.tape_lifts_min}-{sample.tape_lifts_max} | "
            "Per surface type, per room |"
        )
        lines.append(
            f"| Surface Wipes | {sample.surface_wipes_min}-{sample.surface_wipes_max} | "
            "Metals analysis |"
        )
        if sample.ceiling_deck_samples > 0:
            lines.append(
                f"| Ceiling Deck | {sample.ceiling_deck_samples} | "
                "Enhanced per FDAM §4.5 |"
            )
        lines.append("")
        if sample.notes:
            lines.append("**Notes:**")
            for note in sample.notes:
                lines.append(f"- {note}")
            lines.append("")
        lines.append("### Post-Cleaning Verification (PRV)")
        lines.append("")
        lines.append("PRV sampling locations should mirror pre-cleaning characterization.")
        lines.append("Minimum 50% of original sample locations for initial clearance attempt.")
        return "\n".join(lines)

    def _generate_regulatory_section(self, calculations: dict) -> str:
        """Generate regulatory requirements section (LBP / ACM flags)."""
        flags: RegulatoryFlags = calculations.get("regulatory_flags")
        lines = ["## Regulatory Requirements", ""]
        if not flags or not flags.notes:
            lines.append("*No specific regulatory flags identified.*")
            return "\n".join(lines)
        for note in flags.notes:
            lines.append(f"- {note}")
        if flags.lbp_survey_required:
            lines.append("")
            lines.append(
                "**Lead-Based Paint:** Per 29 CFR 1926.62, LBP survey must be completed "
                "prior to disturbance of painted surfaces in pre-1978 construction."
            )
        if flags.acm_survey_required or flags.acm_survey_recommended:
            lines.append("")
            action = "required" if flags.acm_survey_required else "recommended"
            lines.append(
                f"**Asbestos:** ACM survey {action} per NESHAP regulations. "
                "No disturbance of suspect materials until survey complete."
            )
        return "\n".join(lines)

    def _generate_thresholds_section(self, calculations: dict) -> str:
        """Generate clearance thresholds section (metals and particulates).

        Both sub-tables are optional: each renders only when the respective
        calculation result is present in `calculations`.
        """
        thresholds = calculations.get("metals_thresholds")
        particulates = calculations.get("particulate_thresholds", {})
        lines = ["## Clearance Thresholds", ""]
        lines.append(f"**Facility Type:** {thresholds.facility_type if thresholds else 'N/A'}")
        lines.append("")
        if thresholds:
            lines.append("### Metals (Surface Wipe)")
            lines.append("")
            lines.append("| Metal | Threshold | Unit |")
            lines.append("|-------|-----------|------|")
            lines.append(f"| Lead (Pb) | {thresholds.lead_ug_100cm2} | µg/100cm² |")
            lines.append(f"| Cadmium (Cd) | {thresholds.cadmium_ug_100cm2} | µg/100cm² |")
            lines.append(f"| Arsenic (As) | {thresholds.arsenic_ug_100cm2} | µg/100cm² |")
            lines.append(f"| Chromium VI | {thresholds.chromium_vi_ug_100cm2} | µg/100cm² |")
            lines.append(f"| Beryllium (Be) | {thresholds.beryllium_ug_100cm2} | µg/100cm² |")
            lines.append("")
            lines.append(f"*Source: {thresholds.source}*")
            lines.append("")
        if particulates:
            lines.append("### Particulates (Tape Lift)")
            lines.append("")
            lines.append("| Particle Type | Threshold | Unit |")
            lines.append("|---------------|-----------|------|")
            ash_char = particulates.get("ash_char", {})
            soot = particulates.get("aciniform_soot", {})
            lines.append(
                f"| Ash/Char | <{ash_char.get('clearance', 150)} | "
                f"{ash_char.get('unit', 'cts/cm²')} |"
            )
            lines.append(
                f"| Aciniform Soot | <{soot.get('clearance', 500)} | "
                f"{soot.get('unit', 'cts/cm²')} |"
            )
            lines.append("")
            lines.append(f"*Source: {ash_char.get('source', 'FDAM §1.5')}*")
        return "\n".join(lines)

    def _generate_footer(self) -> str:
        """Generate document footer with disclaimer and generation timestamp."""
        return f"""## Disclaimer
This document was generated using AI-assisted analysis per the Fire Damage Assessment
Methodology (FDAM) v4.0.1. All recommendations should be reviewed by a qualified
industrial hygienist before implementation.
**Important Notes:**
- Visual assessments require laboratory confirmation for definitive particle identification
- Threshold values are subject to regulatory updates
- Site-specific conditions may require deviation from standard protocols
- Reclean/retest procedures apply per FDAM §4.7 if clearance is not achieved
---
*Generated by FDAM AI Pipeline v4.0.1*
*{datetime.now().strftime('%Y-%m-%d %H:%M')}*"""