diff --git "a/FDAM_AI_Pipeline_Technical_Spec.md" "b/FDAM_AI_Pipeline_Technical_Spec.md" new file mode 100644--- /dev/null +++ "b/FDAM_AI_Pipeline_Technical_Spec.md" @@ -0,0 +1,3206 @@ +# FDAM AI Pipeline - Technical Specification +## Fire Damage Assessment Methodology Implementation Guide +### Version 1.0 | January 2026 + +--- + +## Table of Contents + +1. [Executive Summary](#1-executive-summary) +2. [System Overview](#2-system-overview) +3. [Model Stack Configuration](#3-model-stack-configuration) +4. [RAG Knowledge Base](#4-rag-knowledge-base) +5. [Input Schema](#5-input-schema) +6. [Processing Pipeline](#6-processing-pipeline) +7. [Vision Analysis Module](#7-vision-analysis-module) +8. [Calculation Engine](#8-calculation-engine) +9. [Output Generation](#9-output-generation) +10. [Gradio UI Specification](#10-gradio-ui-specification) +11. [Confidence Framework](#11-confidence-framework) +12. [Project Structure](#12-project-structure) +13. [Implementation Notes](#13-implementation-notes) + +--- + +## 1. Executive Summary + +### Purpose +Build an AI-powered fire damage assessment system that generates professional Cleaning Specifications / Scope of Work documents aligned with FDAM v4.0.1 methodology. + +### Scope +- **MVP Focus**: Phase 1 (PRE) and Phase 2 (PRA) — pre-lab assessment +- **Primary Output**: Cleaning Specification / Scope of Work document +- **Secondary Output**: Sampling plan recommendations for lab testing + +### Key Constraints +- 100% locally-owned models (no Claude/OpenAI API calls) +- HuggingFace Spaces deployment with Nvidia A100 80GB +- 60-90 second processing time acceptable +- Static RAG knowledge base (no user-uploaded documents) + +### What This System Does NOT Do +- Process lab results (future phase) +- Make pass/fail determinations (requires lab data) +- Replace professional industrial hygienist judgment +- Perform microscopy-level particle analysis + +--- + +## 2. System Overview + +### Architecture Diagram + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ FDAM AI Pipeline Architecture │ +└─────────────────────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────────────────────┐ +│ USER INTERFACE │ +│ (Multi-Tab Gradio) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Tab 1: Project Tab 2: Building Tab 3: Images Tab 4: Observations │ +│ - Facility name - Rooms/areas - Upload 1-20 - Qualitative │ +│ - Address - Dimensions - Per-image - Odor/soot/char │ +│ - Classification - Surface types metadata - Checklist │ +│ - Construction - Manual inventory │ +└───────────────────────────────┬─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ INPUT VALIDATION │ +│ - Schema validation - Image format check - Dimension ranges │ +└───────────────────────────────┬─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ VISION ANALYSIS MODULE │ +│ (Qwen3-VL-30B-A3B-Instruct) │ +├───────────────────────────────────────────────────────────────────────��─────┤ +│ Per Image: │ +│ ├── Zone Classification (Burn/Near-Field/Far-Field) + confidence │ +│ ├── Material Identification (steel, concrete, drywall, carpet...) 
│ +│ ├── Condition Assessment (Background/Light/Moderate/Heavy/Structural) │ +│ ├── Combustion Particle Patterns (visual soot/char/ash deposits) │ +│ └── Bounding Box Annotations for detected elements │ +└───────────────────────────────┬─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ RAG RETRIEVAL MODULE │ +│ (Qwen3-VL-Embedding-8B + Qwen3-VL-Reranker-8B) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Query Types: │ +│ ├── Disposition lookup: "steel near-field moderate" → clean protocol │ +│ ├── Threshold retrieval: "lead non-operational" → 22 µg/100cm² │ +│ ├── Method reference: "ceiling deck cleaning" → HEPA + wet wipe │ +│ └── Image similarity: (future) match to reference damage patterns │ +└───────────────────────────────┬─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ FDAM LOGIC ENGINE │ +│ (Deterministic Rules + Calculations) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ ├── Surface Area Aggregation (by type, by disposition) │ +│ ├── Disposition Matrix Application (FDAM §4.3) │ +│ ├── ACH Calculation (Volume × 4 / (CFM × 60)) │ +│ ├── Sample Density Recommendation (FDAM §2.3) │ +│ ├── Labor Estimation (hours by task) │ +│ └── Regulatory Flag Generation (LBP/ACM by construction date) │ +└───────────────────────────────┬─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ DOCUMENT GENERATION MODULE │ +│ (Qwen3-VL-30B-A3B-Instruct) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Outputs: │ +│ ├── Cleaning Specification / SOW (primary) │ +│ │ ├── Project identification │ +│ │ ├── Scope summary with zone classifications │ +│ │ ├── Surface inventory with dispositions │ +│ │ ├── Air filtration calculations (4 ACH) │ +│ │ ├── Surface-specific procedures │ +│ │ ├── Labor estimates │ +│ │ ├── Equipment requirements │ +│ │ └── Sampling plan recommendations │ +│ ├── Annotated Images (bounding boxes overlay) │ +│ └── Confidence Report (flagged items for review) │ +└───────────────────────────────┬─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ OUTPUT DELIVERY │ +│ ├── In-UI Preview (Markdown rendered) │ +│ ├── Downloadable Markdown (.md) │ +│ ├── Downloadable PDF (.pdf via pandoc) │ +│ └── Annotated Images Gallery │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +### Technology Stack + +| Component | Technology | Version | +|-----------|------------|---------| +| Platform | HuggingFace Spaces | - | +| GPU | Nvidia A100 | 80GB | +| Vision/Generation Model | Qwen3-VL-30B-A3B-Instruct | Latest | +| Embedding Model | Qwen3-VL-Embedding-8B | Latest | +| Reranker Model | Qwen3-VL-Reranker-8B | Latest | +| Vector Store | ChromaDB | 0.4.x | +| UI Framework | Gradio | 4.x | +| PDF Generation | Pandoc | 3.x | +| Image Processing | Pillow, OpenCV | Latest | + +--- + +## 3. 
Model Stack Configuration + +### Memory Budget (A100 80GB) + +| Component | VRAM | Status | +|-----------|------|--------| +| Qwen3-VL-30B-A3B-Instruct | ~24GB | Always loaded | +| Qwen3-VL-Embedding-8B | ~16GB | Always loaded | +| Qwen3-VL-Reranker-8B | ~16GB | Always loaded | +| ChromaDB + KV Cache | ~5GB | Always loaded | +| **Available Headroom** | ~19GB | Context expansion | +| **Total** | ~61GB | ✅ Fits | + +### Model Loading Configuration + +```python +# models/loader.py + +import torch +from transformers import ( + Qwen3VLMoeForConditionalGeneration, # Note: Qwen3-VL uses MoE architecture + AutoProcessor, + AutoModel, + AutoTokenizer +) + +class ModelStack: + """Manages all models with concurrent loading on A100 80GB.""" + + def __init__(self, device="cuda"): + self.device = device + self.models = {} + self.processors = {} + + def load_all(self): + """Load all models into VRAM.""" + print("Loading Qwen3-VL-30B-A3B-Instruct (Vision + Generation)...") + self.models["vision"] = Qwen3VLMoeForConditionalGeneration.from_pretrained( + "Qwen/Qwen3-VL-30B-A3B-Instruct", + torch_dtype=torch.bfloat16, + device_map="auto", + trust_remote_code=True + ) + self.processors["vision"] = AutoProcessor.from_pretrained( + "Qwen/Qwen3-VL-30B-A3B-Instruct", + trust_remote_code=True + ) + + print("Loading Qwen3-VL-Embedding-8B (Multimodal RAG)...") + self.models["embedding"] = AutoModel.from_pretrained( + "Qwen/Qwen3-VL-Embedding-8B", + torch_dtype=torch.bfloat16, + device_map="auto", + trust_remote_code=True + ) + self.processors["embedding"] = AutoProcessor.from_pretrained( + "Qwen/Qwen3-VL-Embedding-8B", + trust_remote_code=True + ) + + print("Loading Qwen3-VL-Reranker-8B (Retrieval Precision)...") + self.models["reranker"] = AutoModel.from_pretrained( + "Qwen/Qwen3-VL-Reranker-8B", + torch_dtype=torch.bfloat16, + device_map="auto", + trust_remote_code=True + ) + self.processors["reranker"] = AutoProcessor.from_pretrained( + "Qwen/Qwen3-VL-Reranker-8B", + trust_remote_code=True + ) + + print("All models loaded successfully.") + return self + +# Global singleton +model_stack = ModelStack() +``` + +### Inference Configuration + +```python +# config/inference.py + +VISION_CONFIG = { + "max_new_tokens": 4096, + "temperature": 0.1, # Low for consistency + "top_p": 0.9, + "do_sample": True, + "repetition_penalty": 1.1 +} + +GENERATION_CONFIG = { + "max_new_tokens": 8192, # Long documents + "temperature": 0.2, + "top_p": 0.95, + "do_sample": True, + "repetition_penalty": 1.05 +} + +RAG_CONFIG = { + "top_k_retrieval": 10, + "top_k_rerank": 5, + "similarity_threshold": 0.7, + "chunk_size": 500, # tokens + "chunk_overlap": 50 +} +``` + +--- + +## 4. 
RAG Knowledge Base + +### Directory Structure + +``` +rag_knowledge/ +├── README.md # Index and navigation guide +│ +├── methodology/ +│ ├── FDAM_v4.0.1/ +│ │ ├── 01_executive_summary.md +│ │ ├── 02_standards_basis.md +│ │ ├── 03_threshold_classification.md +│ │ ├── 04_metals_thresholds.md +│ │ ├── 05_combustion_definitions.md +│ │ ├── 06_particulate_thresholds.md +│ │ ├── 07_assessment_workflow.md +│ │ ├── 08_facility_classification.md +│ │ ├── 09_zone_classification.md +│ │ ├── 10_condition_scale.md +│ │ ├── 11_disposition_matrix_nonporous.md +│ │ ├── 12_disposition_matrix_porous.md +│ │ ├── 13_material_disposition_tiers.md +│ │ ├── 14_ceiling_deck_protocol.md +│ │ ├── 15_cleaning_sequence.md +│ │ ├── 16_surface_methods.md +│ │ ├── 17_air_filtration_ach.md +│ │ ├── 18_reclean_retest.md +│ │ ├── 19_sow_template.md +│ │ ├── 20_results_template.md +│ │ ├── 21_executive_summary_template.md +│ │ ├── 22_lab_format_quantitative.md +│ │ ├── 23_lab_format_semiquantitative.md +│ │ ├── 24_unit_conversions.md +│ │ └── 25_regulatory_justification_blocks.md +│ │ +│ └── sampling/ +│ ├── sample_density_guidelines.md +│ ├── tape_lift_protocol.md +│ └── surface_wipe_protocol.md +│ +├── lab_methods/ +│ ├── EAA_Method_Guide/ +│ │ ├── 01_particle_classification.md +│ │ ├── 02_biogenic_particles.md +│ │ ├── 03_fibrous_particles.md +│ │ ├── 04_inorganic_particles.md +│ │ ├── 05_combustion_categories.md +│ │ ├── 06_soot_morphology.md +│ │ ├── 07_char_morphology.md +│ │ ├── 08_ash_morphology.md +│ │ ├── 09_wildfire_thresholds.md +│ │ ├── 10_mold_background_levels.md +│ │ ├── 11_sem_spectral_patterns.md +│ │ └── 12_concentration_ranges.md +│ │ +│ └── Hayes_Reference/ +│ └── normal_ranges_astm_d6602.md +│ +├── standards/ +│ ├── BNL_SOP_IH75190/ +│ │ ├── operational_thresholds.md +│ │ ├── nonoperational_thresholds.md +│ │ └── eating_surfaces.md +│ │ +│ ├── EPA_HUD_Lead/ +│ │ ├── public_childcare_thresholds.md +│ │ └── october_2024_update.md +│ │ +│ ├── NADCA_ACR_2021/ +│ │ ├── ach_requirements.md +│ │ └── duct_cleaning_standards.md +│ │ +│ └── IICRC_RIA_CIRI/ +│ ├── zone_definitions.md +│ └── wildfire_restoration.md +│ +├── regulatory/ +│ ├── OSHA_1910.1025_lead.md +│ ├── OSHA_1910.1018_arsenic.md +│ ├── OSHA_1910.1027_cadmium.md +│ ├── OSHA_technical_manual.md +│ └── construction_date_flags.md +│ +└── reference_images/ # For multimodal RAG (future) + ├── soot_patterns/ + ├── char_deposits/ + ├── ash_residue/ + └── material_types/ +``` + +### Chunking Implementation + +```python +# rag/chunker.py + +import os +from pathlib import Path +from typing import List, Dict +import hashlib + +class KnowledgeChunker: + """Chunks FDAM knowledge base for RAG indexing.""" + + def __init__(self, knowledge_dir: str = "rag_knowledge"): + self.knowledge_dir = Path(knowledge_dir) + self.chunks: List[Dict] = [] + + def chunk_document(self, filepath: Path, chunk_size: int = 500, overlap: int = 50) -> List[Dict]: + """ + Chunk a markdown document while preserving structure. 
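+        (Token counts are approximate; _split_by_tokens uses a rough
+        4-characters-per-token heuristic.)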
+ + Rules: + - Preserve table integrity (don't split tables) + - Keep headers with their content + - Include metadata (source, section, category) + """ + with open(filepath, 'r', encoding='utf-8') as f: + content = f.read() + + # Extract metadata from path + parts = filepath.relative_to(self.knowledge_dir).parts + category = parts[0] if len(parts) > 0 else "general" + subcategory = parts[1] if len(parts) > 1 else "" + filename = filepath.stem + + # Split by headers first + sections = self._split_by_headers(content) + + chunks = [] + for section_title, section_content in sections: + # Check for tables - keep them intact + if self._contains_table(section_content): + chunks.append({ + "id": self._generate_id(filepath, section_title), + "content": section_content, + "metadata": { + "source": str(filepath), + "category": category, + "subcategory": subcategory, + "section": section_title, + "has_table": True, + "chunk_type": "table" + } + }) + else: + # Split long sections by approximate token count + sub_chunks = self._split_by_tokens(section_content, chunk_size, overlap) + for i, sub_chunk in enumerate(sub_chunks): + chunks.append({ + "id": self._generate_id(filepath, f"{section_title}_{i}"), + "content": f"## {section_title}\n\n{sub_chunk}", + "metadata": { + "source": str(filepath), + "category": category, + "subcategory": subcategory, + "section": section_title, + "has_table": False, + "chunk_type": "text", + "chunk_index": i + } + }) + + return chunks + + def _split_by_headers(self, content: str) -> List[tuple]: + """Split content by markdown headers.""" + import re + pattern = r'^(#{1,3})\s+(.+)$' + sections = [] + current_title = "Introduction" + current_content = [] + + for line in content.split('\n'): + match = re.match(pattern, line) + if match: + if current_content: + sections.append((current_title, '\n'.join(current_content))) + current_title = match.group(2) + current_content = [] + else: + current_content.append(line) + + if current_content: + sections.append((current_title, '\n'.join(current_content))) + + return sections + + def _contains_table(self, content: str) -> bool: + """Check if content contains a markdown table.""" + lines = content.split('\n') + for line in lines: + if '|' in line and line.count('|') >= 2: + return True + return False + + def _split_by_tokens(self, content: str, chunk_size: int, overlap: int) -> List[str]: + """Split content by approximate token count (4 chars ≈ 1 token).""" + char_size = chunk_size * 4 + char_overlap = overlap * 4 + + if len(content) <= char_size: + return [content] + + chunks = [] + start = 0 + while start < len(content): + end = start + char_size + + # Try to break at paragraph boundary + if end < len(content): + newline_pos = content.rfind('\n\n', start, end) + if newline_pos > start + char_size // 2: + end = newline_pos + + chunks.append(content[start:end].strip()) + start = end - char_overlap + + return chunks + + def _generate_id(self, filepath: Path, section: str) -> str: + """Generate unique chunk ID.""" + raw = f"{filepath}_{section}" + return hashlib.md5(raw.encode()).hexdigest()[:12] + + def process_all(self) -> List[Dict]: + """Process entire knowledge base.""" + self.chunks = [] + + for md_file in self.knowledge_dir.rglob("*.md"): + if md_file.name == "README.md": + continue + file_chunks = self.chunk_document(md_file) + self.chunks.extend(file_chunks) + print(f"Chunked {md_file}: {len(file_chunks)} chunks") + + print(f"Total chunks: {len(self.chunks)}") + return self.chunks +``` + +### ChromaDB Setup + +```python +# 
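Indexing sketch (illustrative): how KnowledgeChunker output is expected to
+# reach this store. embed_fn stands in for a wrapper around the Qwen3-VL
+# embedding model (see FDAMRetriever._embed_text below); it is not an API
+# defined in this spec.
+#
+#   chunker = KnowledgeChunker("rag_knowledge")
+#   chunks = chunker.process_all()
+#   embeddings = [embed_fn(c["content"]) for c in chunks]
+#   FDAMVectorStore().index_chunks(chunks, embeddings)
+
+# 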
rag/vectorstore.py
+
+import chromadb
+from chromadb.config import Settings
+from typing import List, Dict, Optional
+import numpy as np
+
+class FDAMVectorStore:
+    """ChromaDB vector store for FDAM knowledge base."""
+
+    def __init__(self, persist_dir: str = "./chroma_db"):
+        self.client = chromadb.PersistentClient(
+            path=persist_dir,
+            settings=Settings(anonymized_telemetry=False)
+        )
+
+        # Collections for different retrieval modes
+        self.text_collection = self.client.get_or_create_collection(
+            name="fdam_text",
+            metadata={"description": "FDAM methodology text chunks"}
+        )
+
+        self.image_collection = self.client.get_or_create_collection(
+            name="fdam_images",
+            metadata={"description": "Reference damage pattern images"}
+        )
+
+    def index_chunks(self, chunks: List[Dict], embeddings: List[np.ndarray]):
+        """Index text chunks with embeddings."""
+        self.text_collection.add(
+            ids=[c["id"] for c in chunks],
+            embeddings=[e.tolist() for e in embeddings],
+            documents=[c["content"] for c in chunks],
+            metadatas=[c["metadata"] for c in chunks]
+        )
+        print(f"Indexed {len(chunks)} chunks to text collection.")
+
+    def query_text(
+        self,
+        query_embedding: np.ndarray,
+        n_results: int = 10,
+        filter_category: Optional[str] = None
+    ) -> List[Dict]:
+        """Query text collection."""
+        where_filter = {"category": filter_category} if filter_category else None
+
+        results = self.text_collection.query(
+            query_embeddings=[query_embedding.tolist()],
+            n_results=n_results,
+            where=where_filter,
+            include=["documents", "metadatas", "distances"]
+        )
+
+        return self._format_results(results)
+
+    def query_by_metadata(
+        self,
+        metadata_filter: Dict,
+        n_results: int = 10
+    ) -> List[Dict]:
+        """Query by metadata only (e.g., get all threshold chunks)."""
+        # ChromaDB's query() requires query embeddings; metadata-only lookups
+        # use get(), which returns flat (non-nested) lists.
+        results = self.text_collection.get(
+            where=metadata_filter,
+            limit=n_results,
+            include=["documents", "metadatas"]
+        )
+        return [
+            {
+                "id": results["ids"][i],
+                "content": results["documents"][i],
+                "metadata": results["metadatas"][i],
+                "distance": None
+            }
+            for i in range(len(results["ids"]))
+        ]
+
+    def _format_results(self, results: Dict) -> List[Dict]:
+        """Format nested query() results for pipeline consumption."""
+        formatted = []
+        for i in range(len(results["ids"][0])):
+            formatted.append({
+                "id": results["ids"][0][i],
+                "content": results["documents"][0][i],
+                "metadata": results["metadatas"][0][i],
+                "distance": results.get("distances", [[]])[0][i] if results.get("distances") else None
+            })
+        return formatted
+```
+
+### RAG Query Types
+
+```python
+# rag/retriever.py
+
+import numpy as np
+import torch
+from typing import List, Dict
+from .vectorstore import FDAMVectorStore
+
+class FDAMRetriever:
+    """Retrieval strategies for different query types."""
+
+    def __init__(self, vectorstore: FDAMVectorStore, model_stack):
+        self.vectorstore = vectorstore
+        self.embedding_model = model_stack.models["embedding"]
+        self.embedding_processor = model_stack.processors["embedding"]
+        self.reranker = model_stack.models["reranker"]
+        self.reranker_processor = model_stack.processors["reranker"]
+
+    def retrieve_disposition(
+        self,
+        material: str,
+        zone: str,
+        condition: str
+    ) -> Dict:
+        """
+        Retrieve disposition for a specific material/zone/condition combination.
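+        Inputs use the lowercase, hyphenated strings from the schemas/input.py
+        enums (e.g. "near-field", "structural-damage").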
+ + Example: retrieve_disposition("steel", "near-field", "moderate") + Returns: {"disposition": "Clean", "protocol": "Aggressive protocol, multiple passes"} + """ + query = f"disposition {material} {zone} {condition}" + + # Embed query + embedding = self._embed_text(query) + + # Retrieve candidates + candidates = self.vectorstore.query_text( + query_embedding=embedding, + n_results=10, + filter_category="methodology" + ) + + # Rerank for precision + reranked = self._rerank(query, candidates, top_k=3) + + return reranked[0] if reranked else None + + def retrieve_threshold( + self, + analyte: str, + facility_class: str, + surface_type: str = None + ) -> Dict: + """ + Retrieve threshold for specific analyte and facility classification. + + Example: retrieve_threshold("lead", "non-operational") + Returns: {"threshold": 22, "unit": "µg/100cm²", "source": "BNL SOP IH75190"} + """ + query = f"threshold {analyte} {facility_class}" + if surface_type: + query += f" {surface_type}" + + embedding = self._embed_text(query) + + candidates = self.vectorstore.query_text( + query_embedding=embedding, + n_results=10, + filter_category="standards" + ) + + reranked = self._rerank(query, candidates, top_k=3) + return reranked[0] if reranked else None + + def retrieve_cleaning_method( + self, + surface_type: str + ) -> Dict: + """ + Retrieve cleaning method for surface type. + + Example: retrieve_cleaning_method("steel roof deck") + Returns: {"method": "HEPA vac → Wet wipe → Rinse", "sequence": [...]} + """ + query = f"cleaning method {surface_type}" + + embedding = self._embed_text(query) + + candidates = self.vectorstore.query_text( + query_embedding=embedding, + n_results=10, + filter_category="methodology" + ) + + reranked = self._rerank(query, candidates, top_k=3) + return reranked[0] if reranked else None + + def retrieve_regulatory_justification( + self, + facility_class: str + ) -> str: + """ + Retrieve regulatory justification block for facility classification. + + Example: retrieve_regulatory_justification("non-operational") + Returns: Full justification text block per FDAM §3.3 + """ + query = f"regulatory justification {facility_class}" + + embedding = self._embed_text(query) + + candidates = self.vectorstore.query_text( + query_embedding=embedding, + n_results=5, + filter_category="methodology" + ) + + # Return first match content directly + if candidates: + return candidates[0]["content"] + return "" + + def retrieve_sample_density( + self, + area_sf: float + ) -> Dict: + """ + Retrieve sample density guidelines for area size. 
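+        Size buckets mirror compute_sample_density in the calculation engine:
+        under 5,000 / 5,000-25,000 / 25,000-100,000 / over 100,000 SF.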
+ + Example: retrieve_sample_density(15000) + Returns: {"tape_lifts": "5-10 per surface type", "surface_wipes": "5-10 per surface type"} + """ + # Determine size category + if area_sf < 5000: + size_cat = "small under 5000" + elif area_sf < 25000: + size_cat = "medium 5000 to 25000" + elif area_sf < 100000: + size_cat = "large 25000 to 100000" + else: + size_cat = "very large over 100000" + + query = f"sample density {size_cat}" + + embedding = self._embed_text(query) + + candidates = self.vectorstore.query_text( + query_embedding=embedding, + n_results=5, + filter_category="methodology" + ) + + return candidates[0] if candidates else None + + def _embed_text(self, text: str) -> np.ndarray: + """Generate embedding for text query.""" + inputs = self.embedding_processor( + text=text, + return_tensors="pt" + ).to(self.embedding_model.device) + + with torch.no_grad(): + outputs = self.embedding_model(**inputs) + embedding = outputs.last_hidden_state.mean(dim=1).cpu().numpy()[0] + + return embedding + + def _rerank( + self, + query: str, + candidates: List[Dict], + top_k: int = 5 + ) -> List[Dict]: + """Rerank candidates for precision.""" + if not candidates: + return [] + + # Score each candidate + scores = [] + for candidate in candidates: + inputs = self.reranker_processor( + text=query, + text_pair=candidate["content"], + return_tensors="pt" + ).to(self.reranker.device) + + with torch.no_grad(): + outputs = self.reranker(**inputs) + score = outputs.logits[0].item() + + scores.append(score) + + # Sort by score descending + ranked_indices = np.argsort(scores)[::-1][:top_k] + + return [candidates[i] for i in ranked_indices] +``` + +--- + +## 5. Input Schema + +### Project Input + +```python +# schemas/input.py + +from pydantic import BaseModel, Field, validator +from typing import List, Optional, Literal +from datetime import date +from enum import Enum + +class FacilityClassification(str, Enum): + OPERATIONAL = "operational" + NON_OPERATIONAL = "non-operational" + PUBLIC_CHILDCARE = "public-childcare" + +class ConstructionEra(str, Enum): + PRE_1980 = "pre-1980" + ERA_1980_2000 = "1980-2000" + POST_2000 = "post-2000" + +class ZoneType(str, Enum): + BURN = "burn" + NEAR_FIELD = "near-field" + FAR_FIELD = "far-field" + +class ConditionLevel(str, Enum): + BACKGROUND = "background" + LIGHT = "light" + MODERATE = "moderate" + HEAVY = "heavy" + STRUCTURAL_DAMAGE = "structural-damage" + +class MaterialCategory(str, Enum): + # Non-porous + STEEL = "steel" + CONCRETE = "concrete" + GLASS = "glass" + METAL = "metal" + CMU = "cmu" + # Semi-porous + DRYWALL_PAINTED = "drywall-painted" + DRYWALL_UNPAINTED = "drywall-unpainted" + WOOD_SEALED = "wood-sealed" + WOOD_UNSEALED = "wood-unsealed" + # Porous + CARPET = "carpet" + CARPET_PAD = "carpet-pad" + INSULATION_FIBERGLASS = "insulation-fiberglass" + INSULATION_OTHER = "insulation-other" + ACOUSTIC_TILE = "acoustic-tile" + UPHOLSTERY = "upholstery" + # HVAC + DUCTWORK_RIGID = "ductwork-rigid" + DUCTWORK_FLEXIBLE = "ductwork-flexible" + HVAC_INTERIOR_INSULATION = "hvac-interior-insulation" + +class Disposition(str, Enum): + NO_ACTION = "no-action" + CLEAN = "clean" + EVALUATE = "evaluate" + REMOVE = "remove" + REMOVE_REPAIR = "remove-repair" + +# --- Project Level --- + +class ProjectInfo(BaseModel): + """Project-level information.""" + project_name: str = Field(..., description="Project or facility name") + address: str = Field(..., description="Full street address") + city: str + state: str + zip_code: str + + client_name: str + client_contact: 
Optional[str] = None + client_email: Optional[str] = None + client_phone: Optional[str] = None + + fire_date: date = Field(..., description="Date of fire incident") + assessment_date: date = Field(..., description="Date of assessment") + + facility_classification: FacilityClassification + construction_era: ConstructionEra + + assessor_name: str = Field(..., description="Industrial hygienist name") + assessor_credentials: Optional[str] = Field(None, description="CIH, CSP, etc.") + +# --- Room/Area Level --- + +class Dimensions(BaseModel): + """Room dimensions for calculations.""" + length_ft: float = Field(..., gt=0, le=10000) + width_ft: float = Field(..., gt=0, le=10000) + ceiling_height_ft: float = Field(..., gt=0, le=500) + + @property + def area_sf(self) -> float: + return self.length_ft * self.width_ft + + @property + def volume_cf(self) -> float: + return self.area_sf * self.ceiling_height_ft + +class Surface(BaseModel): + """Individual surface within a room.""" + id: str = Field(..., description="Unique surface identifier") + material: MaterialCategory + description: str = Field(..., description="e.g., 'North wall drywall'") + area_sf: float = Field(..., gt=0) + + zone: Optional[ZoneType] = None # Can be set by AI or user + condition: Optional[ConditionLevel] = None # Can be set by AI or user + disposition: Optional[Disposition] = None # Calculated by system + + ai_detected: bool = Field(False, description="Was this detected by AI from images?") + confidence: Optional[float] = Field(None, ge=0, le=1) + +class Room(BaseModel): + """Room or area within the building.""" + id: str = Field(..., description="Unique room identifier") + name: str = Field(..., description="e.g., 'Warehouse Bay A'") + floor: Optional[str] = Field(None, description="e.g., 'Ground Floor'") + + dimensions: Dimensions + + zone_classification: Optional[ZoneType] = None # AI-determined or user override + zone_confidence: Optional[float] = Field(None, ge=0, le=1) + zone_user_override: bool = Field(False) + + surfaces: List[Surface] = Field(default_factory=list) + image_ids: List[str] = Field(default_factory=list, description="Associated image IDs") + +# --- Image Level --- + +class ImageMetadata(BaseModel): + """Metadata for uploaded image.""" + id: str + filename: str + room_id: str = Field(..., description="Associated room ID") + description: Optional[str] = Field(None, description="User description of image") + + # AI-populated fields + detected_materials: List[MaterialCategory] = Field(default_factory=list) + detected_zone: Optional[ZoneType] = None + zone_confidence: Optional[float] = None + detected_condition: Optional[ConditionLevel] = None + condition_confidence: Optional[float] = None + + # Bounding box annotations (for UI overlay) + annotations: List[dict] = Field(default_factory=list) + + analysis_complete: bool = Field(False) + +# --- Qualitative Observations --- + +class QualitativeObservations(BaseModel): + """Qualitative observation checklist per FDAM §2.3.""" + smoke_fire_odor: bool = Field(..., description="Smoke/fire odor present?") + odor_intensity: Optional[Literal["none", "faint", "moderate", "strong"]] = None + + visible_soot_deposits: bool = Field(..., description="Visible soot deposits?") + soot_pattern_description: Optional[str] = None + + large_char_particles: bool = Field(..., description="Large char particles observed?") + char_density_estimate: Optional[Literal["sparse", "moderate", "dense"]] = None + + ash_like_residue: bool = Field(..., description="Ash-like residue present?") + 
ash_color_texture: Optional[str] = None
+
+    surface_discoloration: bool = Field(..., description="Surface discoloration?")
+    discoloration_description: Optional[str] = None
+
+    dust_loading_interference: bool = Field(..., description="Dust loading or interference?")
+    dust_notes: Optional[str] = None
+
+    wildfire_indicators: bool = Field(..., description="Burned soil/pollen/vegetation indicators?")
+    wildfire_notes: Optional[str] = None
+
+    additional_notes: Optional[str] = None
+
+# --- Complete Assessment Input ---
+
+class AssessmentInput(BaseModel):
+    """Complete input for FDAM AI assessment."""
+    project: ProjectInfo
+    rooms: List[Room] = Field(..., min_items=1)
+    images: List[ImageMetadata] = Field(default_factory=list, max_items=20)
+    observations: QualitativeObservations
+
+    @validator('rooms')
+    def validate_room_ids(cls, rooms):
+        ids = [r.id for r in rooms]
+        if len(ids) != len(set(ids)):
+            raise ValueError("Room IDs must be unique")
+        return rooms
+
+    @validator('images')
+    def validate_image_rooms(cls, images, values):
+        if 'rooms' not in values:
+            return images
+        room_ids = {r.id for r in values['rooms']}
+        for img in images:
+            if img.room_id not in room_ids:
+                raise ValueError(f"Image {img.id} references unknown room {img.room_id}")
+        return images
+```
+
+---
+
+## 6. Processing Pipeline
+
+### Main Pipeline
+
+```python
+# pipeline/main.py
+
+from typing import Dict, List
+from schemas.input import AssessmentInput, Room, Surface, ConditionLevel, Disposition
+from schemas.output import AssessmentOutput, CleaningSpecification
+from models.loader import model_stack
+from rag.retriever import FDAMRetriever
+from rag.vectorstore import FDAMVectorStore
+from .vision import VisionAnalyzer
+from .calculations import FDAMCalculator
+from .generator import DocumentGenerator
+
+class FDAMPipeline:
+    """Main processing pipeline for FDAM assessments."""
+
+    def __init__(self):
+        self.vision = VisionAnalyzer(model_stack)
+        # The retriever needs a concrete vector store instance
+        self.vectorstore = FDAMVectorStore()
+        self.retriever = FDAMRetriever(self.vectorstore, model_stack)
+        self.calculator = FDAMCalculator()
+        self.generator = DocumentGenerator(model_stack, self.retriever)
+
+    async def process(
+        self,
+        input_data: AssessmentInput,
+        images: Dict[str, bytes]  # image_id -> image bytes
+    ) -> AssessmentOutput:
+        """
+        Process complete FDAM assessment.
+
+        Pipeline stages:
+        1. Input validation (already done by Pydantic)
+        2. Vision analysis (per image)
+        3. RAG context retrieval
+        4. FDAM logic application
+        5. Calculations
+        6. 
Document generation
+        """
+
+        # Stage 2: Vision Analysis
+        print("Stage 2: Analyzing images...")
+        vision_results = await self._analyze_images(input_data, images)
+
+        # Update input with vision detections
+        input_data = self._merge_vision_results(input_data, vision_results)
+
+        # Stage 3: RAG Context Retrieval
+        print("Stage 3: Retrieving methodology context...")
+        rag_context = self._retrieve_context(input_data)
+
+        # Stage 4: Apply FDAM Logic
+        print("Stage 4: Applying FDAM disposition logic...")
+        input_data = self._apply_fdam_logic(input_data, rag_context)
+
+        # Stage 5: Calculations
+        print("Stage 5: Running calculations...")
+        calculations = self.calculator.compute_all(input_data)
+
+        # Stage 6: Document Generation
+        print("Stage 6: Generating documents...")
+        documents = await self.generator.generate(input_data, rag_context, calculations)
+
+        # Build output
+        output = AssessmentOutput(
+            input=input_data,
+            vision_results=vision_results,
+            calculations=calculations,
+            documents=documents,
+            confidence_report=self._build_confidence_report(input_data, vision_results)
+        )
+
+        return output
+
+    async def _analyze_images(
+        self,
+        input_data: AssessmentInput,
+        images: Dict[str, bytes]
+    ) -> Dict[str, dict]:
+        """Run vision analysis on all images."""
+        results = {}
+
+        for img_meta in input_data.images:
+            if img_meta.id not in images:
+                continue
+
+            image_bytes = images[img_meta.id]
+            room = next((r for r in input_data.rooms if r.id == img_meta.room_id), None)
+
+            result = await self.vision.analyze_image(
+                image_bytes=image_bytes,
+                room_context=room,
+                observations=input_data.observations
+            )
+
+            results[img_meta.id] = result
+
+        return results
+
+    def _merge_vision_results(
+        self,
+        input_data: AssessmentInput,
+        vision_results: Dict[str, dict]
+    ) -> AssessmentInput:
+        """Merge vision detections into input data."""
+
+        for img_id, result in vision_results.items():
+            # The vision module returns nested objects per VISION_OUTPUT_SCHEMA;
+            # flatten the fields this method needs.
+            zone_info = result.get("zone") or {}
+            cond_info = result.get("condition") or {}
+            zone_cls = zone_info.get("classification")
+            zone_conf = zone_info.get("confidence")
+
+            # Update image metadata
+            for img in input_data.images:
+                if img.id == img_id:
+                    # Material type strings per the vision schema
+                    img.detected_materials = [m.get("type") for m in result.get("materials", [])]
+                    img.detected_zone = zone_cls
+                    img.zone_confidence = zone_conf
+                    img.detected_condition = cond_info.get("level")
+                    img.condition_confidence = cond_info.get("confidence")
+                    img.annotations = result.get("annotations", [])
+                    img.analysis_complete = True
+                    break
+
+            # Add detected surfaces to room if not already present
+            room_id = next((img.room_id for img in input_data.images if img.id == img_id), None)
+            if room_id:
+                room = next((r for r in input_data.rooms if r.id == room_id), None)
+                if room:
+                    # Update room zone if higher confidence
+                    if (zone_conf or 0) > (room.zone_confidence or 0):
+                        if not room.zone_user_override:
+                            room.zone_classification = zone_cls
+                            room.zone_confidence = zone_conf
+
+                    # Add AI-detected surfaces
+                    for detected_surface in result.get("detected_surfaces", []):
+                        # Check if similar surface already exists
+                        existing = self._find_similar_surface(room.surfaces, detected_surface)
+                        if not existing:
+                            room.surfaces.append(Surface(
+                                id=f"ai_{img_id}_{detected_surface['material']}",
+                                material=detected_surface["material"],
+                                description=detected_surface.get("description", "AI-detected"),
+                                # Schema requires area_sf > 0; use a 1 SF placeholder
+                                # until the user supplies the real figure.
+                                area_sf=detected_surface.get("area_sf") or 1.0,
+                                zone=zone_cls,
+                                condition=cond_info.get("level"),
+                                ai_detected=True,
+                                confidence=detected_surface.get("confidence")
+                            ))
+
+        return input_data
+
+    def _retrieve_context(self, input_data: AssessmentInput) -> Dict:
+        
"""Retrieve all necessary RAG context.""" + + context = { + "regulatory_justification": self.retriever.retrieve_regulatory_justification( + input_data.project.facility_classification.value + ), + "thresholds": {}, + "methods": {}, + "sample_density": None + } + + # Get thresholds for facility class + for analyte in ["lead", "cadmium", "arsenic", "ash_char", "aciniform_soot"]: + context["thresholds"][analyte] = self.retriever.retrieve_threshold( + analyte, + input_data.project.facility_classification.value + ) + + # Get cleaning methods for each material type + materials = set() + for room in input_data.rooms: + for surface in room.surfaces: + materials.add(surface.material.value) + + for material in materials: + context["methods"][material] = self.retriever.retrieve_cleaning_method(material) + + # Get sample density for total area + total_sf = sum(room.dimensions.area_sf for room in input_data.rooms) + context["sample_density"] = self.retriever.retrieve_sample_density(total_sf) + + return context + + def _apply_fdam_logic( + self, + input_data: AssessmentInput, + rag_context: Dict + ) -> AssessmentInput: + """Apply FDAM disposition matrix logic.""" + + for room in input_data.rooms: + for surface in room.surfaces: + if surface.disposition is not None: + continue # Already set by user + + # Determine zone (room or surface level) + zone = surface.zone or room.zone_classification + + # Determine condition + condition = surface.condition or ConditionLevel.LIGHT # Default + + # Look up disposition + disposition_info = self.retriever.retrieve_disposition( + material=surface.material.value, + zone=zone.value if zone else "far-field", + condition=condition.value + ) + + if disposition_info: + surface.disposition = self._parse_disposition(disposition_info) + else: + # Default conservative disposition + surface.disposition = Disposition.CLEAN + + return input_data + + def _parse_disposition(self, info: Dict) -> Disposition: + """Parse disposition from RAG result.""" + content = info.get("content", "").lower() + + if "remove" in content and "repair" in content: + return Disposition.REMOVE_REPAIR + elif "remove" in content: + return Disposition.REMOVE + elif "evaluate" in content: + return Disposition.EVALUATE + elif "clean" in content: + return Disposition.CLEAN + elif "no action" in content or "document only" in content: + return Disposition.NO_ACTION + else: + return Disposition.CLEAN # Conservative default + + def _find_similar_surface(self, surfaces: List[Surface], detected: Dict) -> Surface: + """Find existing surface similar to detected one.""" + for s in surfaces: + if s.material.value == detected.get("material"): + return s + return None + + def _build_confidence_report( + self, + input_data: AssessmentInput, + vision_results: Dict[str, dict] + ) -> Dict: + """Build confidence report for flagged items.""" + + flagged_items = [] + + for room in input_data.rooms: + # Flag low confidence zone classifications + if room.zone_confidence and room.zone_confidence < 0.7: + flagged_items.append({ + "type": "zone_classification", + "room": room.name, + "confidence": room.zone_confidence, + "recommendation": "Professional review recommended for zone classification" + }) + + for surface in room.surfaces: + # Flag AI-detected surfaces with low confidence + if surface.ai_detected and surface.confidence and surface.confidence < 0.7: + flagged_items.append({ + "type": "material_detection", + "room": room.name, + "surface": surface.description, + "confidence": surface.confidence, + "recommendation": "Verify 
material identification on-site" + }) + + return { + "flagged_items": flagged_items, + "overall_confidence": self._calculate_overall_confidence(input_data, vision_results), + "review_required": len(flagged_items) > 0 + } + + def _calculate_overall_confidence( + self, + input_data: AssessmentInput, + vision_results: Dict[str, dict] + ) -> float: + """Calculate overall assessment confidence.""" + confidences = [] + + for room in input_data.rooms: + if room.zone_confidence: + confidences.append(room.zone_confidence) + for surface in room.surfaces: + if surface.confidence: + confidences.append(surface.confidence) + + if not confidences: + return 0.5 # No confidence data + + return sum(confidences) / len(confidences) +``` + +--- + +## 7. Vision Analysis Module + +### System Prompt + +```python +# pipeline/vision.py + +VISION_SYSTEM_PROMPT = """You are an expert industrial hygienist analyzing fire damage images for the FDAM (Fire Damage Assessment Methodology) framework. + +## Your Task +Analyze the provided image and extract structured information about fire damage, materials, and conditions. + +## Zone Classification Criteria +- **Burn Zone**: Direct fire involvement. Look for structural char, complete combustion, exposed/damaged structural elements. +- **Near-Field**: Adjacent to burn zone with heavy smoke/heat exposure. Look for heavy soot deposits, heat damage (warping, discoloration), strong visible contamination. +- **Far-Field**: Smoke migration without direct heat exposure. Look for light to moderate deposits, discoloration, no structural damage. + +## Condition Assessment Criteria +- **Background**: No visible contamination; surfaces appear normal/clean. +- **Light**: Faint discoloration; minimal visible deposits; would show faint marks on white wipe test. +- **Moderate**: Visible film or deposits; clear contamination; surface color noticeably altered. +- **Heavy**: Thick deposits; surface texture obscured; heavy coating visible. +- **Structural Damage**: Physical damage requiring repair before cleaning (charring, warping, holes, collapse). + +## Material Identification +Identify visible materials and categorize as: +- **Non-porous**: steel, concrete, glass, metal, CMU (concrete masonry unit) +- **Semi-porous**: painted drywall, sealed wood +- **Porous**: unpainted drywall, carpet, insulation, acoustic tile, upholstery +- **HVAC**: rigid ductwork, flexible ductwork + +## Combustion Particle Visual Indicators +- **Soot**: Black/dark gray coating with oily/sticky appearance; fine uniform texture; often creates "shadow" patterns +- **Char**: Black angular fragments; visible wood grain or fibrous structure; larger particles +- **Ash**: Gray/white powdery residue; crystalline appearance; often found with char + +## Output Format +Respond with a JSON object containing your analysis. Include confidence scores (0.0-1.0) for each determination. 
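+
+Example of the expected shape (abbreviated; values are illustrative only):
+
+{
+  "zone": {"classification": "near-field", "confidence": 0.82, "reasoning": "..."},
+  "condition": {"level": "moderate", "confidence": 0.75, "reasoning": "..."},
+  "materials": [{"type": "steel", "category": "non-porous", "confidence": 0.9, "location_description": "roof deck"}],
+  "combustion_indicators": {"soot_visible": true, "soot_pattern": "shadowing at ceiling joists", "char_visible": false, "ash_visible": false}
+}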
+ +## Important Notes +- This is VISUAL assessment only - definitive particle identification requires laboratory analysis +- When uncertain between two classifications, note both with relative confidence +- Flag any areas that require professional on-site verification +- Note any potential access issues visible in the image +""" + +VISION_OUTPUT_SCHEMA = { + "type": "object", + "properties": { + "zone": { + "type": "object", + "properties": { + "classification": {"type": "string", "enum": ["burn", "near-field", "far-field"]}, + "confidence": {"type": "number", "minimum": 0, "maximum": 1}, + "reasoning": {"type": "string"} + }, + "required": ["classification", "confidence", "reasoning"] + }, + "condition": { + "type": "object", + "properties": { + "level": {"type": "string", "enum": ["background", "light", "moderate", "heavy", "structural-damage"]}, + "confidence": {"type": "number", "minimum": 0, "maximum": 1}, + "reasoning": {"type": "string"} + }, + "required": ["level", "confidence", "reasoning"] + }, + "materials": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": {"type": "string"}, + "category": {"type": "string", "enum": ["non-porous", "semi-porous", "porous", "hvac"]}, + "confidence": {"type": "number", "minimum": 0, "maximum": 1}, + "location_description": {"type": "string"}, + "bounding_box": { + "type": "object", + "properties": { + "x": {"type": "number"}, + "y": {"type": "number"}, + "width": {"type": "number"}, + "height": {"type": "number"} + } + } + }, + "required": ["type", "category", "confidence"] + } + }, + "combustion_indicators": { + "type": "object", + "properties": { + "soot_visible": {"type": "boolean"}, + "soot_pattern": {"type": "string"}, + "char_visible": {"type": "boolean"}, + "char_description": {"type": "string"}, + "ash_visible": {"type": "boolean"}, + "ash_description": {"type": "string"} + } + }, + "structural_concerns": { + "type": "array", + "items": {"type": "string"} + }, + "access_issues": { + "type": "array", + "items": {"type": "string"} + }, + "recommended_sampling_locations": { + "type": "array", + "items": { + "type": "object", + "properties": { + "description": {"type": "string"}, + "sample_type": {"type": "string", "enum": ["tape_lift", "surface_wipe", "both"]}, + "priority": {"type": "string", "enum": ["high", "medium", "low"]} + } + } + }, + "flags_for_review": { + "type": "array", + "items": {"type": "string"} + } + }, + "required": ["zone", "condition", "materials", "combustion_indicators"] +} +``` + +### Vision Analyzer Implementation + +```python +# pipeline/vision.py (continued) + +import json +import base64 +from PIL import Image +import io +import torch + +class VisionAnalyzer: + """Analyzes fire damage images using Qwen3-VL.""" + + def __init__(self, model_stack): + self.model = model_stack.models["vision"] + self.processor = model_stack.processors["vision"] + + async def analyze_image( + self, + image_bytes: bytes, + room_context: Room = None, + observations: QualitativeObservations = None + ) -> dict: + """ + Analyze a single fire damage image. + + Returns structured analysis per FDAM methodology. 
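+
+        The returned dict follows VISION_OUTPUT_SCHEMA ("zone", "condition",
+        "materials", "combustion_indicators", ...), plus an "annotations"
+        list added here for the UI overlay.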
+ """ + + # Build context prompt + context_parts = [] + + if room_context: + context_parts.append(f"Room: {room_context.name}") + context_parts.append(f"Dimensions: {room_context.dimensions.length_ft}' x {room_context.dimensions.width_ft}' x {room_context.dimensions.ceiling_height_ft}' ceiling") + if room_context.zone_classification: + context_parts.append(f"Pre-assigned zone: {room_context.zone_classification.value} (user-provided)") + + if observations: + obs_parts = [] + if observations.smoke_fire_odor: + obs_parts.append(f"Smoke odor: {observations.odor_intensity or 'present'}") + if observations.visible_soot_deposits: + obs_parts.append("Visible soot deposits reported") + if observations.large_char_particles: + obs_parts.append("Large char particles observed") + if observations.ash_like_residue: + obs_parts.append("Ash-like residue present") + if observations.wildfire_indicators: + obs_parts.append("Wildfire indicators noted") + if obs_parts: + context_parts.append("Field observations: " + "; ".join(obs_parts)) + + context_text = "\n".join(context_parts) if context_parts else "No additional context provided." + + # Prepare image + image = Image.open(io.BytesIO(image_bytes)) + + # Build prompt + user_prompt = f"""Analyze this fire damage image and provide a structured assessment. + +## Context +{context_text} + +## Instructions +1. Classify the zone (burn/near-field/far-field) based on visible damage +2. Assess the condition level (background/light/moderate/heavy/structural-damage) +3. Identify all visible materials and their categories +4. Note any combustion indicators (soot patterns, char, ash) +5. Flag any structural concerns or access issues +6. Recommend sampling locations for laboratory analysis + +Respond with a JSON object following the specified schema. 
Include confidence scores for each determination."""
+
+        # Process with vision model
+        messages = [
+            {"role": "system", "content": VISION_SYSTEM_PROMPT},
+            {"role": "user", "content": [
+                {"type": "image", "image": image},
+                {"type": "text", "text": user_prompt}
+            ]}
+        ]
+
+        inputs = self.processor(
+            text=self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True),
+            images=[image],
+            return_tensors="pt"
+        ).to(self.model.device)
+
+        with torch.no_grad():
+            outputs = self.model.generate(
+                **inputs,
+                max_new_tokens=4096,
+                temperature=0.1,
+                top_p=0.9,
+                do_sample=True
+            )
+
+        # Decode only the newly generated tokens; decoding the full sequence
+        # would echo the prompt back into the text that _parse_json_response scans.
+        generated_tokens = outputs[0][inputs["input_ids"].shape[1]:]
+        response_text = self.processor.decode(generated_tokens, skip_special_tokens=True)
+
+        # Extract JSON from response
+        result = self._parse_json_response(response_text)
+
+        # Add bounding box annotations for UI
+        result["annotations"] = self._build_annotations(result, image.size)
+
+        return result
+
+    def _parse_json_response(self, response: str) -> dict:
+        """Extract and parse JSON from model response."""
+        try:
+            # Try to find JSON in a code block first
+            if "```json" in response:
+                start = response.find("```json") + 7
+                end = response.find("```", start)
+                json_str = response[start:end].strip()
+            elif "```" in response:
+                start = response.find("```") + 3
+                end = response.find("```", start)
+                json_str = response[start:end].strip()
+            else:
+                # Fall back to the outermost raw JSON object
+                start = response.find("{")
+                end = response.rfind("}") + 1
+                json_str = response[start:end]
+
+            return json.loads(json_str)
+        except (json.JSONDecodeError, ValueError):
+            # Return default structure on parse failure
+            return {
+                "zone": {"classification": "far-field", "confidence": 0.5, "reasoning": "Parse error - defaulting to far-field"},
+                "condition": {"level": "moderate", "confidence": 0.5, "reasoning": "Parse error - defaulting to moderate"},
+                "materials": [],
+                "combustion_indicators": {"soot_visible": False, "char_visible": False, "ash_visible": False},
+                "flags_for_review": ["Model response parsing failed - manual review required"]
+            }
+
+    def _build_annotations(self, result: dict, image_size: tuple) -> list:
+        """Build bounding box annotations for UI overlay."""
+        annotations = []
+        width, height = image_size
+
+        for material in result.get("materials", []):
+            if "bounding_box" in material:
+                # Bounding boxes are treated as fractions of image size
+                # (normalized 0-1), then scaled to pixels for the overlay.
+                bbox = material["bounding_box"]
+                annotations.append({
+                    "type": "material",
+                    "label": f"{material['type']} ({material['category']})",
+                    "confidence": material.get("confidence", 0),
+                    "bbox": {
+                        "x": int(bbox["x"] * width),
+                        "y": int(bbox["y"] * height),
+                        "width": int(bbox["width"] * width),
+                        "height": int(bbox["height"] * height)
+                    },
+                    "color": self._get_category_color(material["category"])
+                })
+
+        return annotations
+
+    def _get_category_color(self, category: str) -> str:
+        """Get annotation color by material category."""
+        colors = {
+            "non-porous": "#00FF00",   # Green - cleanable
+            "semi-porous": "#FFFF00",  # Yellow - evaluate
+            "porous": "#FF0000",       # Red - likely remove
+            "hvac": "#00FFFF"          # Cyan - special handling
+        }
+        return colors.get(category, "#FFFFFF")
+```
+
+---
+
+## 8. 
Calculation Engine + +```python +# pipeline/calculations.py + +from typing import Dict, List +from schemas.input import AssessmentInput, Room, Surface, Disposition +import math + +class FDAMCalculator: + """FDAM-aligned calculation engine.""" + + # Production rates (SF per labor hour) + PRODUCTION_RATES = { + "hepa_vacuum": 500, # SF/hr + "wet_wipe": 200, # SF/hr + "dry_sponge": 150, # SF/hr + "power_wash": 300, # SF/hr + "scrubber": 1000, # SF/hr + "removal_drywall": 100, # SF/hr + "removal_insulation": 150, # SF/hr + "removal_carpet": 200, # SF/hr + } + + # Air scrubber specs + AIR_SCRUBBER_CFM = 2000 # Standard unit + REQUIRED_ACH = 4 # Per NADCA ACR 2021 + + def compute_all(self, input_data: AssessmentInput) -> Dict: + """Compute all FDAM calculations.""" + + return { + "surface_areas": self.compute_surface_areas(input_data), + "air_filtration": self.compute_air_filtration(input_data), + "sample_density": self.compute_sample_density(input_data), + "labor_estimate": self.compute_labor_estimate(input_data), + "equipment": self.compute_equipment_requirements(input_data), + "regulatory_flags": self.compute_regulatory_flags(input_data) + } + + def compute_surface_areas(self, input_data: AssessmentInput) -> Dict: + """Aggregate surface areas by type and disposition.""" + + by_type = {} + by_disposition = {} + by_zone = {} + by_room = {} + + for room in input_data.rooms: + room_total = 0 + + for surface in room.surfaces: + # By material type + mat_key = surface.material.value + if mat_key not in by_type: + by_type[mat_key] = 0 + by_type[mat_key] += surface.area_sf + + # By disposition + disp_key = surface.disposition.value if surface.disposition else "undetermined" + if disp_key not in by_disposition: + by_disposition[disp_key] = 0 + by_disposition[disp_key] += surface.area_sf + + # By zone + zone_key = (surface.zone or room.zone_classification or "undetermined") + if hasattr(zone_key, 'value'): + zone_key = zone_key.value + if zone_key not in by_zone: + by_zone[zone_key] = 0 + by_zone[zone_key] += surface.area_sf + + room_total += surface.area_sf + + by_room[room.name] = { + "floor_area": room.dimensions.area_sf, + "surface_area": room_total, + "volume": room.dimensions.volume_cf + } + + return { + "by_type": by_type, + "by_disposition": by_disposition, + "by_zone": by_zone, + "by_room": by_room, + "total_floor_sf": sum(r.dimensions.area_sf for r in input_data.rooms), + "total_surface_sf": sum(by_type.values()), + "total_volume_cf": sum(r.dimensions.volume_cf for r in input_data.rooms) + } + + def compute_air_filtration(self, input_data: AssessmentInput) -> Dict: + """Calculate air scrubber requirements per NADCA ACR 2021.""" + + total_volume = sum(r.dimensions.volume_cf for r in input_data.rooms) + + # Formula: Units = (Volume × ACH) / (CFM × 60) + units_required = (total_volume * self.REQUIRED_ACH) / (self.AIR_SCRUBBER_CFM * 60) + units_required = math.ceil(units_required) # Round up + + return { + "total_volume_cf": total_volume, + "required_ach": self.REQUIRED_ACH, + "unit_cfm": self.AIR_SCRUBBER_CFM, + "units_required": units_required, + "calculation": f"({total_volume:,.0f} CF × {self.REQUIRED_ACH} ACH) / ({self.AIR_SCRUBBER_CFM} CFM × 60) = {units_required} units", + "standard_reference": "NADCA ACR 2021, Section 3.6" + } + + def compute_sample_density(self, input_data: AssessmentInput) -> Dict: + """Compute sampling recommendations per FDAM §2.3.""" + + total_sf = sum(r.dimensions.area_sf for r in input_data.rooms) + + # Determine size category and recommendations + if total_sf < 
5000:
+            tape_range = "3-5"
+            wipe_range = "3-5"
+            size_cat = "< 5,000 SF"
+        elif total_sf < 25000:
+            tape_range = "5-10"
+            wipe_range = "5-10"
+            size_cat = "5,000 - 25,000 SF"
+        elif total_sf < 100000:
+            tape_range = "10-20"
+            wipe_range = "10-15"
+            size_cat = "25,000 - 100,000 SF"
+        else:
+            tape_range = "20+"
+            wipe_range = "15-25"
+            size_cat = "> 100,000 SF"
+
+        # Count unique surface types
+        surface_types = set()
+        has_ceiling_deck = False
+
+        for room in input_data.rooms:
+            for surface in room.surfaces:
+                surface_types.add(surface.material.value)
+                if "ceiling" in surface.material.value.lower() or "deck" in surface.description.lower():
+                    has_ceiling_deck = True
+
+        # Planning counts use the upper bound of each range; open-ended
+        # ranges such as "20+" plan for the stated value plus 5.
+        def _planning_max(range_str: str) -> int:
+            if "-" in range_str:
+                return int(range_str.split("-")[1])
+            return int(range_str.replace("+", "")) + 5
+
+        recommended_tape_lifts = _planning_max(tape_range) * len(surface_types)
+        recommended_surface_wipes = _planning_max(wipe_range) * len(surface_types)
+
+        # Ceiling deck enhancement per FDAM §4.5
+        ceiling_deck_note = None
+        if has_ceiling_deck:
+            ceiling_deck_samples = math.ceil(total_sf / 2500)  # 1 per 2,500 SF
+            ceiling_deck_note = f"Ceiling deck surfaces require enhanced sampling: minimum {ceiling_deck_samples} samples (1 per 2,500 SF per FDAM §4.5)"
+
+        return {
+            "total_sf": total_sf,
+            "size_category": size_cat,
+            "surface_types_count": len(surface_types),
+            "surface_types": list(surface_types),
+            "tape_lifts_per_type": tape_range,
+            "surface_wipes_per_type": wipe_range,
+            "recommended_tape_lifts": recommended_tape_lifts,
+            "recommended_surface_wipes": recommended_surface_wipes,
+            "ceiling_deck_note": ceiling_deck_note,
+            "control_samples_recommended": True,
+            "control_sample_note": "Control samples from unaffected areas recommended for baseline comparison"
+        }
+
+    def compute_labor_estimate(self, input_data: AssessmentInput) -> Dict:
+        """Estimate labor hours by task."""
+
+        labor = {
+            "hepa_vacuum": 0,
+            "wet_wipe": 0,
+            "dry_sponge": 0,
+            "power_wash": 0,
+            "scrubber": 0,
+            "removal": 0,
+            "hvac_cleaning": 0
+        }
+
+        for room in input_data.rooms:
+            for surface in room.surfaces:
+                if surface.disposition == Disposition.NO_ACTION:
+                    continue
+
+                area = surface.area_sf
+                mat = surface.material.value
+
+                if surface.disposition in (Disposition.REMOVE, Disposition.REMOVE_REPAIR):
+                    if "insulation" in mat:
+                        labor["removal"] += area / self.PRODUCTION_RATES["removal_insulation"]
+                    elif "carpet" in mat:
+                        labor["removal"] += area / self.PRODUCTION_RATES["removal_carpet"]
+                    elif "drywall" in mat:
+                        labor["removal"] += area / self.PRODUCTION_RATES["removal_drywall"]
+                    else:
+                        labor["removal"] += area / self.PRODUCTION_RATES["removal_drywall"]  # Default
+
+                elif surface.disposition == Disposition.CLEAN:
+                    # Determine cleaning method by material
+                    if mat in ["steel", "metal", "glass"]:
+                        labor["hepa_vacuum"] += area / self.PRODUCTION_RATES["hepa_vacuum"]
+                        labor["wet_wipe"] += area / self.PRODUCTION_RATES["wet_wipe"]
+                    elif mat == "concrete":
+                        labor["scrubber"] += area / self.PRODUCTION_RATES["scrubber"]
+                    elif mat == "cmu":
+                        labor["hepa_vacuum"] += area / self.PRODUCTION_RATES["hepa_vacuum"]
+                        labor["power_wash"] += area / self.PRODUCTION_RATES["power_wash"]
+                    elif "ductwork" in mat:
+                        labor["hvac_cleaning"] += area / 100  # Rough estimate
+                    else:
+                        labor["hepa_vacuum"] 
+= area / self.PRODUCTION_RATES["hepa_vacuum"] + labor["wet_wipe"] += area / self.PRODUCTION_RATES["wet_wipe"] + + # Round up all values + labor = {k: math.ceil(v) for k, v in labor.items()} + + total_hours = sum(labor.values()) + + return { + "by_task": labor, + "total_hours": total_hours, + "crew_days_2_person": math.ceil(total_hours / 16), + "crew_days_4_person": math.ceil(total_hours / 32), + "note": "Estimates based on standard production rates. Adjust for site conditions, access constraints, and contamination severity." + } + + def compute_equipment_requirements(self, input_data: AssessmentInput) -> Dict: + """Compute equipment requirements.""" + + air_filt = self.compute_air_filtration(input_data) + surface_areas = self.compute_surface_areas(input_data) + + # Determine if lifts needed based on ceiling heights + max_ceiling = max(r.dimensions.ceiling_height_ft for r in input_data.rooms) + lift_type = None + if max_ceiling > 12: + lift_type = "scissor_lift" if max_ceiling <= 30 else "boom_lift" + + return { + "air_scrubbers": { + "quantity": air_filt["units_required"], + "cfm_each": self.AIR_SCRUBBER_CFM, + "filter_type": "HEPA" + }, + "hepa_vacuums": { + "quantity": max(2, math.ceil(surface_areas["total_surface_sf"] / 50000)), + "note": "Minimum 2 units for efficiency" + }, + "lift_equipment": { + "required": lift_type is not None, + "type": lift_type, + "max_ceiling_height": max_ceiling + }, + "ppe_sets": { + "quantity": 8, # Standard crew assumption + "includes": ["Tyvek suit", "N95/P100 respirator", "safety glasses", "gloves"] + }, + "cleaning_supplies": { + "alkaline_detergent_gallons": math.ceil(surface_areas["total_surface_sf"] / 500), + "degreaser_gallons": math.ceil(surface_areas["total_surface_sf"] / 2000), + "dry_sponges": math.ceil(surface_areas["total_surface_sf"] / 1000), + "microfiber_cloths": math.ceil(surface_areas["total_surface_sf"] / 200) + } + } + + def compute_regulatory_flags(self, input_data: AssessmentInput) -> Dict: + """Generate regulatory flags based on project characteristics.""" + + flags = [] + + # Construction era flags + era = input_data.project.construction_era.value + if era == "pre-1980": + flags.append({ + "type": "lbp", + "severity": "high", + "message": "Pre-1980 construction: Lead-based paint (LBP) presumed present. EPA RRP Rule compliance required.", + "reference": "40 CFR 745" + }) + flags.append({ + "type": "acm", + "severity": "high", + "message": "Pre-1980 construction: Asbestos-containing materials (ACM) presumed present. Survey recommended before disturbance.", + "reference": "40 CFR 61 Subpart M" + }) + elif era == "1980-2000": + flags.append({ + "type": "lbp", + "severity": "medium", + "message": "1980-2000 construction: LBP possible in some applications. Testing recommended for painted surfaces.", + "reference": "40 CFR 745" + }) + flags.append({ + "type": "acm", + "severity": "medium", + "message": "1980-2000 construction: ACM possible in specific applications (floor tile, roofing, insulation). 
Survey recommended.", + "reference": "40 CFR 61 Subpart M" + }) + + # Facility classification flags + if input_data.project.facility_classification.value == "public-childcare": + flags.append({ + "type": "childcare", + "severity": "high", + "message": "Public/Childcare facility: Enhanced lead clearance thresholds apply (0.54 µg/100cm² floors, 4.3 µg/100cm² window sills).", + "reference": "EPA/HUD October 2024" + }) + + # Observation-based flags + if input_data.observations.wildfire_indicators: + flags.append({ + "type": "wildfire", + "severity": "medium", + "message": "Wildfire indicators noted: Apply IICRC/RIA/CIRI Technical Guide zone framework. Consider outdoor air quality impacts.", + "reference": "IICRC/RIA/CIRI Technical Guide December 2025" + }) + + return { + "flags": flags, + "high_severity_count": len([f for f in flags if f["severity"] == "high"]), + "medium_severity_count": len([f for f in flags if f["severity"] == "medium"]) + } +``` + +--- + +## 9. Output Generation + +### Document Templates + +```python +# pipeline/generator.py + +from typing import Dict +from datetime import datetime + +class DocumentGenerator: + """Generates FDAM-aligned documents.""" + + def __init__(self, model_stack, retriever): + self.model = model_stack.models["vision"] # Same model for generation + self.processor = model_stack.processors["vision"] + self.retriever = retriever + + async def generate( + self, + input_data: AssessmentInput, + rag_context: Dict, + calculations: Dict + ) -> Dict: + """Generate all output documents.""" + + return { + "cleaning_specification": await self._generate_sow(input_data, rag_context, calculations), + "sampling_plan": self._generate_sampling_plan(input_data, calculations), + "confidence_summary": self._generate_confidence_summary(input_data) + } + + async def _generate_sow( + self, + input_data: AssessmentInput, + rag_context: Dict, + calculations: Dict + ) -> str: + """Generate Cleaning Specification / Scope of Work.""" + + # Build document sections + sections = [] + + # Header + sections.append(self._build_header(input_data)) + + # Scope Summary + sections.append(self._build_scope_summary(input_data, calculations)) + + # Zone Summary Table + sections.append(self._build_zone_summary_table(input_data, calculations)) + + # Surface Inventory + sections.append(self._build_surface_inventory(input_data)) + + # Regulatory Framework + sections.append(self._build_regulatory_framework(input_data, rag_context)) + + # Work Area Preparation + sections.append(self._build_work_preparation(calculations)) + + # Surface-Specific Procedures + sections.append(self._build_cleaning_procedures(input_data, rag_context)) + + # Removal Scope + sections.append(self._build_removal_scope(input_data, calculations)) + + # Labor Estimate + sections.append(self._build_labor_estimate(calculations)) + + # Equipment Requirements + sections.append(self._build_equipment_requirements(calculations)) + + # Sampling Plan Summary + sections.append(self._build_sampling_section(input_data, calculations)) + + # Acceptance Criteria + sections.append(self._build_acceptance_criteria(input_data, rag_context)) + + # Regulatory Flags + if calculations["regulatory_flags"]["flags"]: + sections.append(self._build_regulatory_flags(calculations)) + + # Confidence Notes + sections.append(self._build_confidence_notes(input_data)) + + return "\n\n---\n\n".join(sections) + + def _build_header(self, input_data: AssessmentInput) -> str: + """Build document header.""" + return f"""# Cleaning Specification / Scope of Work + +## 
Fire Damage Restoration + +**Project:** {input_data.project.project_name} +**Address:** {input_data.project.address}, {input_data.project.city}, {input_data.project.state} {input_data.project.zip_code} + +**Client:** {input_data.project.client_name} +**Fire Date:** {input_data.project.fire_date.strftime('%B %d, %Y')} +**Assessment Date:** {input_data.project.assessment_date.strftime('%B %d, %Y')} + +**Prepared By:** {input_data.project.assessor_name}{f', {input_data.project.assessor_credentials}' if input_data.project.assessor_credentials else ''} +**Generated:** {datetime.now().strftime('%B %d, %Y at %I:%M %p')} + +**Methodology:** FDAM v4.0.1 (Fire Damage Assessment Methodology) +**Facility Classification:** {input_data.project.facility_classification.value.replace('-', '/').title()} +""" + + def _build_scope_summary(self, input_data: AssessmentInput, calculations: Dict) -> str: + """Build scope summary section.""" + areas = calculations["surface_areas"] + + summary = f"""## Scope Summary + +{input_data.project.project_name} sustained fire damage on {input_data.project.fire_date.strftime('%B %d, %Y')}. Based on visual assessment and field observations, the following scope has been developed for fire residue restoration. + +### Project Metrics + +| Metric | Value | +|--------|-------| +| Total Floor Area | {areas['total_floor_sf']:,.0f} SF | +| Total Surface Area | {areas['total_surface_sf']:,.0f} SF | +| Total Volume | {areas['total_volume_cf']:,.0f} CF | +| Rooms/Areas Assessed | {len(input_data.rooms)} | +| Images Analyzed | {len(input_data.images)} | + +### Disposition Summary + +| Disposition | Area (SF) | +|-------------|-----------| +""" + for disp, area in areas["by_disposition"].items(): + summary += f"| {disp.replace('-', ' ').title()} | {area:,.0f} |\n" + + return summary + + def _build_zone_summary_table(self, input_data: AssessmentInput, calculations: Dict) -> str: + """Build zone summary table.""" + + table = """## Zone Classification Summary + +| Room/Area | Zone | Floor SF | Condition | Disposition Summary | +|-----------|------|----------|-----------|---------------------| +""" + for room in input_data.rooms: + zone = room.zone_classification.value if room.zone_classification else "TBD" + + # Summarize dispositions for room + disp_counts = {} + for s in room.surfaces: + d = s.disposition.value if s.disposition else "undetermined" + disp_counts[d] = disp_counts.get(d, 0) + 1 + disp_summary = ", ".join([f"{v} {k}" for k, v in disp_counts.items()]) + + # Get predominant condition + conditions = [s.condition.value for s in room.surfaces if s.condition] + condition = max(set(conditions), key=conditions.count) if conditions else "TBD" + + table += f"| {room.name} | {zone.title()} | {room.dimensions.area_sf:,.0f} | {condition.title()} | {disp_summary} |\n" + + return table + + def _build_surface_inventory(self, input_data: AssessmentInput) -> str: + """Build surface inventory section.""" + + inventory = """## Surface Inventory + +### By Material Type + +| Material | Category | Total SF | Disposition | +|----------|----------|----------|-------------| +""" + # Aggregate by material + by_material = {} + for room in input_data.rooms: + for surface in room.surfaces: + mat = surface.material.value + if mat not in by_material: + by_material[mat] = {"area": 0, "dispositions": set()} + by_material[mat]["area"] += surface.area_sf + if surface.disposition: + by_material[mat]["dispositions"].add(surface.disposition.value) + + for mat, data in sorted(by_material.items()): + category = 
self._get_material_category(mat)
+            disps = ", ".join(sorted(data["dispositions"])) if data["dispositions"] else "TBD"
+            inventory += f"| {mat.replace('-', ' ').title()} | {category} | {data['area']:,.0f} | {disps} |\n"
+
+        # Detailed room breakdown
+        inventory += "\n### Detailed Inventory by Room\n"
+
+        for room in input_data.rooms:
+            inventory += f"\n#### {room.name}\n\n"
+            inventory += "| Surface | Material | Area (SF) | Zone | Condition | Disposition |\n"
+            inventory += "|---------|----------|-----------|------|-----------|-------------|\n"
+
+            for surface in room.surfaces:
+                zone = surface.zone.value if surface.zone else (room.zone_classification.value if room.zone_classification else "TBD")
+                condition = surface.condition.value if surface.condition else "TBD"
+                disposition = surface.disposition.value if surface.disposition else "TBD"
+
+                inventory += f"| {surface.description} | {surface.material.value.replace('-', ' ').title()} | {surface.area_sf:,.0f} | {zone.title()} | {condition.title()} | {disposition.title()} |\n"
+
+        return inventory
+
+    def _get_material_category(self, material: str) -> str:
+        """Get material category."""
+        non_porous = ["steel", "concrete", "glass", "metal", "cmu"]
+        semi_porous = ["drywall-painted", "wood-sealed"]
+        porous = ["drywall-unpainted", "carpet", "insulation", "acoustic-tile", "upholstery", "wood-unsealed"]
+        hvac = ["ductwork-rigid", "ductwork-flexible", "hvac-interior-insulation"]
+
+        # Check HVAC first: "hvac-interior-insulation" would otherwise match the
+        # "insulation" substring and be misclassified as Porous.
+        if any(h in material for h in hvac):
+            return "HVAC"
+        elif any(np in material for np in non_porous):
+            return "Non-Porous"
+        elif any(sp in material for sp in semi_porous):
+            return "Semi-Porous"
+        elif any(p in material for p in porous):
+            return "Porous"
+        return "Other"
+
+    def _build_regulatory_framework(self, input_data: AssessmentInput, rag_context: Dict) -> str:
+        """Build regulatory framework section."""
+
+        classification = input_data.project.facility_classification.value
+
+        framework = f"""## Regulatory Framework
+
+### Facility Classification
+
+**Classification:** {classification.replace('-', '/').title()}
+
+{rag_context.get('regulatory_justification', '')}
+
+### Applicable Standards
+
+| Standard | Application |
+|----------|-------------|
+| BNL SOP IH75190 (Rev23) | Surface wipe clearance for metals |
+| NADCA ACR 2021 | Air filtration requirements (4 ACH minimum) |
+| IICRC/RIA/CIRI Technical Guide (Dec 2025) | Zone-based assessment framework |
+"""
+
+        if classification == "public-childcare":
+            framework += "| EPA/HUD Lead Standards (Oct 2024) | Public/Childcare lead thresholds |\n"
+
+        return framework
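+    # NOTE: Worked ACH example (hypothetical numbers, for orientation only):
+    # a 200,000 CF work area at the required 4 ACH needs 200,000 * 4 / 60,
+    # i.e. about 13,334 CFM of continuous filtration; with 2,000 CFM air
+    # scrubbers that is ceil(13,334 / 2,000) = 7 units. The figures rendered
+    # below come from calculations["air_filtration"], computed by the
+    # calculation engine (unit capacity: AIR_SCRUBBER_CFM).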
+    def _build_work_preparation(self, calculations: Dict) -> str:
+        """Build work preparation section."""
+
+        air_filt = calculations["air_filtration"]
+
+        return f"""## Work Area Preparation
+
+### Air Filtration Requirements
+
+Per NADCA ACR 2021 Section 3.6, a minimum of 4 air changes per hour (ACH) is required during restoration activities.
+
+**Calculation:**
+```
+Work area volume: {air_filt['total_volume_cf']:,.0f} CF
+Required ACH: {air_filt['required_ach']}
+Air scrubber capacity: {air_filt['unit_cfm']:,} CFM per unit
+
+Units required: {air_filt['calculation']}
+```
+
+**Requirement:** {air_filt['units_required']} HEPA air scrubbers @ {air_filt['unit_cfm']:,} CFM each
+
+### Containment
+
+- Establish work area boundaries with warning signage
+- Seal HVAC supply/return registers in work areas
+- Maintain negative pressure during active cleaning
+- Run air scrubbers continuously during work and for a minimum of 4 hours after completion
+"""
+
+    def _build_cleaning_procedures(self, input_data: AssessmentInput, rag_context: Dict) -> str:
+        """Build cleaning procedures section."""
+
+        procedures = """## Surface-Specific Cleaning Procedures
+
+### Standard Cleaning Sequence (per FDAM §5.1)
+
+1. **HEPA Vacuum** — Remove loose particulate from all surfaces
+2. **Dry Sponge** (if needed) — Chemical sponge for char/soot on non-porous surfaces
+3. **Wet Wipe - Alkaline Detergent** — pH 10-12 solution for chemical residue removal
+4. **Rinse Wipe** — Clean water to remove detergent residue
+5. **Degreaser** (if needed) — For stubborn residues not removed by standard protocol
+
+**Sequencing Rule:** Clean top-down (roof deck → structure → walls → floor) to prevent recontamination.
+
+### Procedures by Surface Type
+
+| Surface Type | Standard Method |
+|--------------|-----------------|
+| Steel roof deck | HEPA vac → Wet wipe → Rinse |
+| Steel joists/beams | HEPA vac → Wet wipe → Rinse |
+| Steel columns | HEPA vac → Wet wipe → Rinse |
+| Concrete floor | Scrubber machine + alkaline |
+| CMU walls | HEPA vac → Wet wipe OR power wash |
+| Metal doors | Wet wipe → Rinse |
+| Rigid ductwork | Per NADCA ACR |
+"""
+        return procedures
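+    # NOTE: The procedures table above is static in this MVP. A RAG-backed
+    # variant could derive the method per material from the knowledge base
+    # (e.g. "ceiling deck cleaning" -> HEPA + wet wipe). Hypothetical sketch;
+    # the retrieval call signature is an assumption, not the actual API:
+    #
+    #     docs = self.retriever.retrieve(f"{material} cleaning method", top_k=3)
+    #     method = docs[0].text if docs else "HEPA vac -> Wet wipe -> Rinse"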
+    def _build_removal_scope(self, input_data: AssessmentInput, calculations: Dict) -> str:
+        """Build removal scope section."""
+
+        removal_surfaces = []
+        for room in input_data.rooms:
+            for surface in room.surfaces:
+                if surface.disposition in [Disposition.REMOVE, Disposition.REMOVE_REPAIR]:
+                    removal_surfaces.append({
+                        "room": room.name,
+                        "surface": surface.description,
+                        "material": surface.material.value,
+                        "area": surface.area_sf,
+                        "disposition": surface.disposition.value
+                    })
+
+        if not removal_surfaces:
+            return """## Removal Scope
+
+No materials identified for removal at this time. All surfaces designated for cleaning."""
+
+        removal = """## Removal Scope
+
+The following materials require removal and are beyond the scope of cleaning:
+
+| Room | Surface | Material | Area (SF) | Rationale |
+|------|---------|----------|-----------|-----------|
+"""
+        for item in removal_surfaces:
+            # Exact category match: a substring test on "porous" would also
+            # match "Non-Porous" and mislabel the rationale.
+            category = self._get_material_category(item["material"])
+            rationale = "Porous material contamination" if category in ("Porous", "Semi-Porous") else "Structural damage"
+            removal += f"| {item['room']} | {item['surface']} | {item['material'].replace('-', ' ').title()} | {item['area']:,.0f} | {rationale} |\n"
+
+        total_removal = sum(s["area"] for s in removal_surfaces)
+        removal += f"\n**Total Removal Area:** {total_removal:,.0f} SF\n"
+
+        return removal
+
+    def _build_labor_estimate(self, calculations: Dict) -> str:
+        """Build labor estimate section."""
+
+        labor = calculations["labor_estimate"]
+
+        estimate = """## Labor Estimate
+
+### Hours by Task
+
+| Task | Estimated Hours |
+|------|-----------------|
+"""
+        for task, hours in labor["by_task"].items():
+            if hours > 0:
+                estimate += f"| {task.replace('_', ' ').title()} | {hours} |\n"
+
+        estimate += f"""
+**Total Labor Hours:** {labor['total_hours']}
+**Crew Days (2-person):** {labor['crew_days_2_person']}
+**Crew Days (4-person):** {labor['crew_days_4_person']}
+
+*Note: {labor['note']}*
+"""
+        return estimate
+
+    def _build_equipment_requirements(self, calculations: Dict) -> str:
+        """Build equipment requirements section."""
+
+        equip = calculations["equipment"]
+
+        requirements = f"""## Equipment Requirements
+
+### Air Filtration
+- **HEPA Air Scrubbers:** {equip['air_scrubbers']['quantity']} units @ {equip['air_scrubbers']['cfm_each']:,} CFM each
+
+### Cleaning Equipment
+- **HEPA Vacuums:** {equip['hepa_vacuums']['quantity']} units
+"""
+
+        if equip['lift_equipment']['required']:
+            requirements += f"- **Lift Equipment:** {equip['lift_equipment']['type'].replace('_', ' ').title()} (max ceiling height: {equip['lift_equipment']['max_ceiling_height']} ft)\n"
+
+        requirements += f"""
+### Supplies
+- Alkaline Detergent: {equip['cleaning_supplies']['alkaline_detergent_gallons']} gallons
+- Degreaser: {equip['cleaning_supplies']['degreaser_gallons']} gallons
+- Dry Sponges: {equip['cleaning_supplies']['dry_sponges']} units
+- Microfiber Cloths: {equip['cleaning_supplies']['microfiber_cloths']} units
+
+### Personal Protective Equipment
+- PPE Sets: {equip['ppe_sets']['quantity']}
+- Includes: {', '.join(equip['ppe_sets']['includes'])}
+"""
+        return requirements
+
+    def _build_sampling_section(self, input_data: AssessmentInput, calculations: Dict) -> str:
+        """Build sampling plan section."""
+
+        sampling = calculations["sample_density"]
+
+        section = f"""## Sampling Plan Recommendations
+
+### Pre-Restoration Assessment (PRA) Sampling
+
+Based on total area of {sampling['total_sf']:,.0f} SF ({sampling['size_category']}):
+
+| Sample Type | Per Surface Type | Surface Types | Total Recommended |
+|-------------|------------------|---------------|-------------------|
+| Tape Lift (PLM) | {sampling['tape_lifts_per_type']} | {sampling['surface_types_count']} | {sampling['recommended_tape_lifts']} |
+| Surface Wipe (ICP-MS) | {sampling['surface_wipes_per_type']} | {sampling['surface_types_count']} | {sampling['recommended_surface_wipes']} |
+
+### Surface Types Identified
+
+"""
+        for st in sampling["surface_types"]:
+            section += f"- {st.replace('-', ' ').title()}\n"
+
+        if sampling["ceiling_deck_note"]:
+            section += f"\n**Ceiling Deck Protocol:** 
{sampling['ceiling_deck_note']}\n" + + section += """ +### Control Samples + +Control samples from unaffected areas are recommended for baseline comparison. Minimum 2 control samples per sample type. + +### Laboratory Requirements + +- Tape Lift Analysis: Polarized light microscopy (PLM) at AIHA-accredited laboratory +- Surface Wipe Analysis: ICP-MS or ICP-OES for metals at AIHA-accredited laboratory +- Sample Media: Ghost Wipes or equivalent pre-moistened media +- Sample Area: 100 cm² (10cm × 10cm template) per NIOSH Method 9100 +""" + return section + + def _build_acceptance_criteria(self, input_data: AssessmentInput, rag_context: Dict) -> str: + """Build acceptance criteria section.""" + + classification = input_data.project.facility_classification.value + + # Get appropriate thresholds + if classification == "operational": + lead_threshold = "500 µg/100cm²" + lead_source = "BNL SOP IH75190 Operational" + elif classification == "public-childcare": + lead_threshold = "0.54 µg/100cm² (floors), 4.3 µg/100cm² (sills/troughs)" + lead_source = "EPA/HUD October 2024" + else: + lead_threshold = "22 µg/100cm²" + lead_source = "BNL SOP IH75190 Non-Operational" + + return f"""## Acceptance Criteria + +### Post-Restoration Verification (PRV) Thresholds + +Post-restoration verification sampling will be conducted per FDAM methodology. The following clearance thresholds apply: + +#### Metals Thresholds + +| Analyte | Threshold | Unit | Source | +|---------|-----------|------|--------| +| Lead (Pb) | {lead_threshold} | µg/100cm² | {lead_source} | +| Cadmium (Cd) | 3.3 (Non-Op) / 50 (Op) | µg/100cm² | BNL SOP IH75190 | +| Arsenic (As) | 6.7 (Non-Op) / 100 (Op) | µg/100cm² | BNL SOP IH75190 | + +#### Particulate Thresholds + +| Analyte | Threshold | Unit | Classification | +|---------|-----------|------|----------------| +| Ash and Char | < 150 | particles/cm² | Professional Judgment* | +| Aciniform Soot | < 500 | particles/cm² | Professional Judgment* | + +*Professional Judgment thresholds validated at 93.3% first-pass clearance rate (n=45, QVC dataset). See FDAM Appendix B. + +### Pass/Fail Criteria + +- All samples must pass applicable thresholds +- Visual inspection confirms dust-free surfaces +- No detectable fire/smoke odor + +### Reclean/Retest Protocol + +Surfaces exceeding thresholds require reclean and retest until passing per FDAM §5.4. 
+""" + + def _build_regulatory_flags(self, calculations: Dict) -> str: + """Build regulatory flags section.""" + + flags = calculations["regulatory_flags"]["flags"] + + section = """## Regulatory Considerations + +The following regulatory flags have been identified for this project: + +""" + for flag in flags: + severity_icon = "🔴" if flag["severity"] == "high" else "🟡" + section += f"""### {severity_icon} {flag['type'].upper()} + +{flag['message']} + +*Reference: {flag['reference']}* + +""" + return section + + def _build_confidence_notes(self, input_data: AssessmentInput) -> str: + """Build confidence notes section.""" + + notes = """## Assessment Confidence Notes + +### AI-Assisted Analysis + +This assessment utilized AI-powered image analysis to assist with: +- Zone classification +- Material identification +- Condition assessment +- Combustion particle pattern recognition + +### Items Flagged for Review + +""" + flagged = [] + for room in input_data.rooms: + if room.zone_confidence and room.zone_confidence < 0.7: + flagged.append(f"- **{room.name}**: Zone classification confidence {room.zone_confidence:.0%} - recommend verification") + + for surface in room.surfaces: + if surface.ai_detected and surface.confidence and surface.confidence < 0.7: + flagged.append(f"- **{room.name} - {surface.description}**: Material detection confidence {surface.confidence:.0%} - recommend verification") + + if flagged: + notes += "\n".join(flagged) + else: + notes += "No items flagged for additional review. All confidence scores above 70% threshold." + + notes += """ + +### Limitations + +1. **Visual Analysis Only** — Definitive particle identification requires laboratory microscopy (PLM/SEM) +2. **Surface Area Estimates** — Areas from images are approximations; user-provided dimensions used for calculations +3. **Odor Assessment** — Odor presence/intensity based on user-reported observations, not instrument measurement +4. **Professional Review Required** — This specification should be reviewed by a qualified industrial hygienist before execution + +### Standards Basis Statement + +Metals thresholds are standards-based per BNL SOP IH75190 (Rev23, 06/23/17). Particulate thresholds represent professional judgment with empirical validation (93.3% pass rate, n=45). See FDAM v4.0.1 for complete methodology documentation. 
+""" + return notes + + def _generate_sampling_plan(self, input_data: AssessmentInput, calculations: Dict) -> str: + """Generate standalone sampling plan document.""" + + sampling = calculations["sample_density"] + + plan = f"""# Sampling Plan + +## Project: {input_data.project.project_name} +## Date: {datetime.now().strftime('%B %d, %Y')} + +--- + +## Summary + +| Parameter | Value | +|-----------|-------| +| Total Area | {sampling['total_sf']:,.0f} SF | +| Size Category | {sampling['size_category']} | +| Surface Types | {sampling['surface_types_count']} | +| Recommended Tape Lifts | {sampling['recommended_tape_lifts']} | +| Recommended Surface Wipes | {sampling['recommended_surface_wipes']} | + +--- + +## Sample Locations by Room + +""" + for room in input_data.rooms: + zone = room.zone_classification.value if room.zone_classification else "TBD" + + plan += f"""### {room.name} ({zone.title()} Zone) + +**Dimensions:** {room.dimensions.length_ft}' × {room.dimensions.width_ft}' × {room.dimensions.ceiling_height_ft}' = {room.dimensions.area_sf:,.0f} SF + +**Recommended Sample Locations:** + +| Location | Surface Type | Sample Type | Priority | +|----------|--------------|-------------|----------| +""" + # Generate sample locations based on surfaces + for i, surface in enumerate(room.surfaces[:5]): # Top 5 surfaces per room + sample_type = "Both" if surface.material.value in ["steel", "concrete"] else "Tape Lift" + priority = "High" if zone == "near-field" else "Medium" + plan += f"| {surface.description} | {surface.material.value.replace('-', ' ').title()} | {sample_type} | {priority} |\n" + + plan += "\n" + + if sampling["ceiling_deck_note"]: + plan += f"""--- + +## Ceiling Deck Enhanced Sampling + +{sampling['ceiling_deck_note']} + +Per FDAM §4.5, ceiling deck surfaces exhibit higher post-cleaning contamination rates (82.4% vs 95%+ for other surfaces). Increase sample density by 50% for ceiling decks. +""" + + plan += """--- + +## Control Sample Locations + +Collect control samples from unaffected areas for baseline comparison: + +1. **Control Location 1:** [To be determined on-site - area with no visible contamination] +2. **Control Location 2:** [To be determined on-site - separate building or wing if available] + +Minimum 2 control samples per sample type (tape lift and surface wipe). 
+ +--- + +## Laboratory Instructions + +- **Laboratory:** AIHA-accredited laboratory +- **Tape Lift Analysis:** Polarized light microscopy (PLM) +- **Surface Wipe Analysis:** ICP-MS for metals (Pb, Cd, As) +- **Reporting Format:** Request particles/cm² format when available +- **Turnaround:** Standard (5-7 business days) unless expedited required +""" + + return plan + + def _generate_confidence_summary(self, input_data: AssessmentInput) -> str: + """Generate confidence summary for flagged items.""" + + summary = f"""# Confidence Summary Report + +## Project: {input_data.project.project_name} +## Generated: {datetime.now().strftime('%B %d, %Y at %I:%M %p')} + +--- + +## Overall Assessment Confidence + +""" + # Calculate overall confidence + confidences = [] + for room in input_data.rooms: + if room.zone_confidence: + confidences.append(("Zone", room.name, room.zone_confidence)) + for surface in room.surfaces: + if surface.confidence: + confidences.append(("Material", f"{room.name} - {surface.description}", surface.confidence)) + + if confidences: + avg_confidence = sum(c[2] for c in confidences) / len(confidences) + summary += f"**Average Confidence Score:** {avg_confidence:.0%}\n\n" + + # Confidence breakdown + high_conf = [c for c in confidences if c[2] >= 0.9] + med_conf = [c for c in confidences if 0.7 <= c[2] < 0.9] + low_conf = [c for c in confidences if c[2] < 0.7] + + summary += f"""### Confidence Distribution + +| Level | Count | Percentage | +|-------|-------|------------| +| High (≥90%) | {len(high_conf)} | {len(high_conf)/len(confidences)*100:.0f}% | +| Medium (70-89%) | {len(med_conf)} | {len(med_conf)/len(confidences)*100:.0f}% | +| Low (<70%) | {len(low_conf)} | {len(low_conf)/len(confidences)*100:.0f}% | + +""" + if low_conf: + summary += """--- + +## Items Requiring Review + +The following items have confidence scores below 70% and require professional verification: + +| Type | Location | Confidence | Recommendation | +|------|----------|------------|----------------| +""" + for item_type, location, conf in low_conf: + rec = "Verify on-site" if item_type == "Material" else "Confirm zone classification" + summary += f"| {item_type} | {location} | {conf:.0%} | {rec} |\n" + else: + summary += "No AI-generated confidence scores available. All determinations were user-provided.\n" + + return summary +``` + +--- + +## 10. Gradio UI Specification + +### Multi-Tab Interface + +```python +# ui/app.py + +import gradio as gr +from typing import Dict, List, Tuple +from PIL import Image +import io + +class FDAMUI: + """Multi-tab Gradio interface for FDAM AI Pipeline.""" + + def __init__(self, pipeline): + self.pipeline = pipeline + self.state = {} + + def build(self) -> gr.Blocks: + """Build the Gradio interface.""" + + with gr.Blocks( + title="FDAM AI Pipeline - Fire Damage Assessment", + theme=gr.themes.Soft(), + css=self._custom_css() + ) as app: + + gr.Markdown(""" + # 🔥 FDAM AI Pipeline + ## Fire Damage Assessment Methodology v4.0.1 + + Upload images and project information to generate a professional Cleaning Specification / Scope of Work. + """) + + with gr.Tabs() as tabs: + + # Tab 1: Project Information + with gr.Tab("1. 
Project Info", id="project"): + with gr.Row(): + with gr.Column(): + project_name = gr.Textbox(label="Project/Facility Name", placeholder="e.g., ABC Warehouse") + address = gr.Textbox(label="Street Address") + + with gr.Row(): + city = gr.Textbox(label="City") + state = gr.Textbox(label="State", max_lines=1) + zip_code = gr.Textbox(label="ZIP Code", max_lines=1) + + with gr.Column(): + client_name = gr.Textbox(label="Client Name") + client_contact = gr.Textbox(label="Client Contact (optional)") + client_email = gr.Textbox(label="Client Email (optional)") + client_phone = gr.Textbox(label="Client Phone (optional)") + + with gr.Row(): + fire_date = gr.Textbox(label="Fire Date", placeholder="YYYY-MM-DD") + assessment_date = gr.Textbox(label="Assessment Date", placeholder="YYYY-MM-DD") + + with gr.Row(): + facility_classification = gr.Radio( + choices=["Non-Operational", "Operational", "Public/Childcare"], + label="Facility Classification", + value="Non-Operational", + info="See FDAM §3.1 for classification criteria" + ) + + construction_era = gr.Radio( + choices=["Pre-1980", "1980-2000", "Post-2000"], + label="Construction Era", + value="Post-2000", + info="Affects LBP/ACM regulatory flags" + ) + + with gr.Row(): + assessor_name = gr.Textbox(label="Assessor Name") + assessor_credentials = gr.Textbox(label="Credentials (optional)", placeholder="CIH, CSP, etc.") + + # Tab 2: Building/Rooms + with gr.Tab("2. Building/Rooms", id="rooms"): + gr.Markdown("### Add rooms/areas to assess") + + rooms_data = gr.State([]) + + with gr.Row(): + with gr.Column(scale=2): + room_name = gr.Textbox(label="Room/Area Name", placeholder="e.g., Warehouse Bay A") + room_floor = gr.Textbox(label="Floor (optional)", placeholder="e.g., Ground Floor") + + with gr.Row(): + room_length = gr.Number(label="Length (ft)", minimum=1, maximum=10000) + room_width = gr.Number(label="Width (ft)", minimum=1, maximum=10000) + room_height = gr.Number(label="Ceiling Height (ft)", minimum=1, maximum=500) + + room_zone = gr.Radio( + choices=["Undetermined (AI will analyze)", "Burn Zone", "Near-Field", "Far-Field"], + label="Zone Classification (optional - can be AI-determined)", + value="Undetermined (AI will analyze)" + ) + + add_room_btn = gr.Button("➕ Add Room", variant="primary") + + with gr.Column(scale=3): + rooms_table = gr.Dataframe( + headers=["ID", "Name", "Floor", "L × W × H", "Area (SF)", "Zone"], + datatype=["str", "str", "str", "str", "number", "str"], + label="Rooms Added", + interactive=False + ) + + clear_rooms_btn = gr.Button("🗑️ Clear All Rooms", variant="secondary") + + gr.Markdown("### Manual Surface Entry (optional)") + gr.Markdown("*Surfaces can also be detected automatically from images*") + + with gr.Row(): + with gr.Column(): + surface_room_id = gr.Dropdown(label="Room", choices=[], interactive=True) + surface_material = gr.Dropdown( + label="Material", + choices=[ + "Steel", "Concrete", "Glass", "Metal", "CMU", + "Drywall (Painted)", "Drywall (Unpainted)", + "Wood (Sealed)", "Wood (Unsealed)", + "Carpet", "Carpet Pad", "Insulation (Fiberglass)", + "Acoustic Tile", "Upholstery", + "Ductwork (Rigid)", "Ductwork (Flexible)" + ] + ) + surface_description = gr.Textbox(label="Description", placeholder="e.g., North wall") + surface_area = gr.Number(label="Area (SF)", minimum=0) + add_surface_btn = gr.Button("➕ Add Surface") + + with gr.Column(): + surfaces_table = gr.Dataframe( + headers=["Room", "Material", "Description", "Area (SF)"], + datatype=["str", "str", "str", "number"], + label="Surfaces Added", + 
interactive=False + ) + + # Tab 3: Images + with gr.Tab("3. Images", id="images"): + gr.Markdown(""" + ### Upload Fire Damage Images + + Upload 1-20 images for AI analysis. The system will identify: + - Zone classification (Burn/Near-Field/Far-Field) + - Materials present + - Condition assessment + - Combustion particle indicators + """) + + images_data = gr.State([]) + + with gr.Row(): + with gr.Column(scale=2): + image_upload = gr.Image( + label="Upload Image", + type="pil", + sources=["upload"] + ) + + image_room = gr.Dropdown( + label="Associated Room", + choices=[], + interactive=True + ) + + image_description = gr.Textbox( + label="Image Description (optional)", + placeholder="e.g., View of ceiling deck from center aisle" + ) + + add_image_btn = gr.Button("➕ Add Image", variant="primary") + + with gr.Column(scale=3): + images_gallery = gr.Gallery( + label="Images Added", + columns=3, + height="auto", + object_fit="contain" + ) + + images_info = gr.Dataframe( + headers=["#", "Room", "Description", "Status"], + datatype=["number", "str", "str", "str"], + label="Image Details" + ) + + clear_images_btn = gr.Button("🗑️ Clear All Images", variant="secondary") + + # Tab 4: Observations + with gr.Tab("4. Observations", id="observations"): + gr.Markdown(""" + ### Qualitative Observation Checklist + + Per FDAM §2.3, document the following field observations: + """) + + with gr.Row(): + with gr.Column(): + gr.Markdown("#### Odor Assessment") + smoke_odor = gr.Checkbox(label="Smoke/fire odor present?") + odor_intensity = gr.Radio( + choices=["None", "Faint", "Moderate", "Strong"], + label="Odor Intensity", + visible=True + ) + + gr.Markdown("#### Visible Contamination") + visible_soot = gr.Checkbox(label="Visible soot deposits?") + soot_pattern = gr.Textbox(label="Soot pattern description (if present)", visible=True) + + large_char = gr.Checkbox(label="Large char particles observed?") + char_density = gr.Radio( + choices=["Sparse", "Moderate", "Dense"], + label="Char density estimate", + visible=True + ) + + with gr.Column(): + ash_residue = gr.Checkbox(label="Ash-like residue present?") + ash_description = gr.Textbox(label="Ash color/texture (if present)") + + surface_discoloration = gr.Checkbox(label="Surface discoloration?") + discoloration_description = gr.Textbox(label="Discoloration description") + + dust_interference = gr.Checkbox(label="Dust loading or interference?") + dust_notes = gr.Textbox(label="Dust notes") + + wildfire_indicators = gr.Checkbox(label="Wildfire indicators (burned soil/pollen/vegetation)?") + wildfire_notes = gr.Textbox(label="Wildfire notes") + + additional_notes = gr.Textbox( + label="Additional Observations", + lines=3, + placeholder="Any other relevant observations..." + ) + + # Tab 5: Results + with gr.Tab("5. Generate Results", id="results"): + gr.Markdown(""" + ### Generate Assessment Documents + + Click below to process all inputs and generate: + 1. **Cleaning Specification / Scope of Work** (primary output) + 2. **Sampling Plan Recommendations** + 3. 
**Confidence Report** + """) + + with gr.Row(): + generate_btn = gr.Button( + "🚀 Generate Assessment", + variant="primary", + scale=2 + ) + + processing_status = gr.Textbox( + label="Status", + value="Ready", + interactive=False + ) + + with gr.Row(): + with gr.Column(): + gr.Markdown("### Annotated Images") + annotated_gallery = gr.Gallery( + label="AI-Analyzed Images", + columns=2, + height="auto" + ) + + with gr.Column(): + gr.Markdown("### Quick Stats") + stats_output = gr.JSON(label="Assessment Statistics") + + gr.Markdown("### Cleaning Specification / Scope of Work") + sow_output = gr.Markdown(label="SOW Preview") + + with gr.Row(): + download_md = gr.File(label="Download Markdown (.md)") + download_pdf = gr.File(label="Download PDF (.pdf)") + + gr.Markdown("### Sampling Plan") + with gr.Accordion("View Sampling Plan", open=False): + sampling_output = gr.Markdown() + + gr.Markdown("### Confidence Report") + with gr.Accordion("View Confidence Report", open=False): + confidence_output = gr.Markdown() + + # Event handlers + add_room_btn.click( + fn=self._add_room, + inputs=[rooms_data, room_name, room_floor, room_length, room_width, room_height, room_zone], + outputs=[rooms_data, rooms_table, surface_room_id, image_room, room_name, room_floor, room_length, room_width, room_height] + ) + + clear_rooms_btn.click( + fn=self._clear_rooms, + inputs=[], + outputs=[rooms_data, rooms_table, surface_room_id, image_room] + ) + + add_image_btn.click( + fn=self._add_image, + inputs=[images_data, image_upload, image_room, image_description], + outputs=[images_data, images_gallery, images_info, image_upload, image_description] + ) + + clear_images_btn.click( + fn=self._clear_images, + inputs=[], + outputs=[images_data, images_gallery, images_info] + ) + + generate_btn.click( + fn=self._generate_assessment, + inputs=[ + # Project info + project_name, address, city, state, zip_code, + client_name, client_contact, client_email, client_phone, + fire_date, assessment_date, facility_classification, construction_era, + assessor_name, assessor_credentials, + # Rooms and images + rooms_data, images_data, + # Observations + smoke_odor, odor_intensity, visible_soot, soot_pattern, + large_char, char_density, ash_residue, ash_description, + surface_discoloration, discoloration_description, + dust_interference, dust_notes, wildfire_indicators, wildfire_notes, + additional_notes + ], + outputs=[ + processing_status, annotated_gallery, stats_output, + sow_output, download_md, download_pdf, + sampling_output, confidence_output + ] + ) + + return app + + def _custom_css(self) -> str: + """Custom CSS for styling.""" + return """ + .primary-btn { background-color: #ff6b35 !important; } + .tab-selected { border-bottom: 3px solid #ff6b35 !important; } + """ + + # Event handler implementations... + def _add_room(self, rooms_data, name, floor, length, width, height, zone): + # Implementation + pass + + def _clear_rooms(self): + # Implementation + pass + + def _add_image(self, images_data, image, room, description): + # Implementation + pass + + def _clear_images(self): + # Implementation + pass + + async def _generate_assessment(self, *args): + # Implementation - calls pipeline + pass +``` + +--- + +## 11. 
Confidence Framework + +### Confidence Thresholds + +| Level | Score Range | Action | +|-------|-------------|--------| +| Very High | 90-100% | Accept without review | +| High | 70-89% | Accept, note in report | +| Moderate | 50-69% | Flag for human review | +| Low | <50% | Require human verification | + +### Confidence Application Rules + +```python +# pipeline/confidence.py + +class ConfidenceFramework: + """FDAM confidence framework for AI determinations.""" + + THRESHOLDS = { + "very_high": 0.90, + "high": 0.70, + "moderate": 0.50, + "low": 0.0 + } + + @staticmethod + def get_level(confidence: float) -> str: + """Get confidence level from score.""" + if confidence >= 0.90: + return "very_high" + elif confidence >= 0.70: + return "high" + elif confidence >= 0.50: + return "moderate" + else: + return "low" + + @staticmethod + def requires_review(confidence: float) -> bool: + """Check if confidence requires human review.""" + return confidence < 0.70 + + @staticmethod + def format_confidence(confidence: float) -> str: + """Format confidence for display.""" + level = ConfidenceFramework.get_level(confidence) + emoji = { + "very_high": "🟢", + "high": "🟢", + "moderate": "🟡", + "low": "🔴" + } + return f"{emoji[level]} {confidence:.0%} ({level.replace('_', ' ').title()})" +``` + +--- + +## 12. Project Structure + +``` +fdam-ai-pipeline/ +├── README.md +├── requirements.txt +├── app.py # Main Gradio application entry point +│ +├── config/ +│ ├── __init__.py +│ ├── inference.py # Model inference configuration +│ └── settings.py # Application settings +│ +├── models/ +│ ├── __init__.py +│ └── loader.py # Model loading and management +│ +├── rag/ +│ ├── __init__.py +│ ├── chunker.py # Knowledge base chunking +│ ├── vectorstore.py # ChromaDB setup +│ └── retriever.py # RAG retrieval strategies +│ +├── schemas/ +│ ├── __init__.py +│ ├── input.py # Pydantic input models +│ └── output.py # Pydantic output models +│ +├── pipeline/ +│ ├── __init__.py +│ ├── main.py # Main processing pipeline +│ ├── vision.py # Vision analysis module +│ ├── calculations.py # FDAM calculation engine +│ ├── generator.py # Document generation +│ └── confidence.py # Confidence framework +│ +├── ui/ +│ ├── __init__.py +│ └── app.py # Gradio UI components +│ +├── rag_knowledge/ # RAG knowledge base +│ ├── README.md +│ ├── methodology/ +│ │ ├── FDAM_v4.0.1/ +│ │ └── sampling/ +│ ├── lab_methods/ +│ │ ├── EAA_Method_Guide/ +│ │ └── Hayes_Reference/ +│ ├── standards/ +│ │ ├── BNL_SOP_IH75190/ +│ │ ├── EPA_HUD_Lead/ +│ │ ├── NADCA_ACR_2021/ +│ │ └── IICRC_RIA_CIRI/ +│ ├── regulatory/ +│ └── reference_images/ +│ +├── chroma_db/ # ChromaDB persistence +│ +├── outputs/ # Generated documents +│ +└── tests/ + ├── __init__.py + ├── test_pipeline.py + ├── test_calculations.py + └── test_rag.py +``` + +--- + +## 13. Implementation Notes + +### Critical Implementation Details + +1. **Model Loading Order** + - Load all three models at startup (no swapping) + - Use `torch.bfloat16` for memory efficiency + - Use `device_map="auto"` for automatic GPU allocation + +2. **RAG Knowledge Base Setup** + - Pre-chunk and index at deployment time + - Store ChromaDB in persistent storage + - Rebuild index only when knowledge base changes + +3. **Image Processing** + - Accept JPEG, PNG, WebP formats + - Resize images > 2048px for processing efficiency + - Maintain original for annotation overlay + +4. 
**Output Generation** + - Generate Markdown as primary format + - Convert to PDF via pandoc for download + - Store temporary files in `/tmp` or designated output directory + +5. **Error Handling** + - Graceful degradation if vision analysis fails + - Default to conservative dispositions on uncertainty + - Log all errors for debugging + +### HuggingFace Spaces Requirements + +```txt +# requirements.txt + +torch>=2.0.0 +transformers>=4.40.0 +accelerate>=0.27.0 +gradio>=4.0.0 +chromadb>=0.4.0 +pydantic>=2.0.0 +pillow>=10.0.0 +opencv-python>=4.8.0 +pandas>=2.0.0 +numpy>=1.24.0 +``` + +### Deployment Configuration + +```yaml +# README.md (HuggingFace Spaces metadata) + +--- +title: FDAM AI Pipeline +emoji: 🔥 +colorFrom: orange +colorTo: red +sdk: gradio +sdk_version: 4.44.0 +app_file: app.py +pinned: true +license: apache-2.0 +suggested_hardware: a100-large +--- +``` + +--- + +## Document End + +**Version:** 1.0 +**Last Updated:** January 2026 +**Methodology Reference:** FDAM v4.0.1 + +This specification is intended for implementation by a Claude coding agent. All code examples are illustrative and should be adapted based on actual model API requirements and HuggingFace Spaces constraints.