Spaces:
Paused
Paused
| """FDAM Pipeline Orchestrator. | |
| Coordinates the 6-stage processing pipeline: | |
| 1. Input Validation | |
| 2. Vision Analysis | |
| 3. RAG Retrieval | |
| 4. FDAM Logic (Dispositions) | |
| 5. Calculations | |
| 6. Document Generation | |
| """ | |
| import logging | |
| import time | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from typing import Callable, Optional, TYPE_CHECKING | |
| from PIL import Image | |
| import io | |
| from ui.state import SessionState | |
| from ui.components import image_store | |
| from models.loader import get_models | |
| logger = logging.getLogger(__name__) | |
| # Type hints only - actual import deferred to retriever property | |
| if TYPE_CHECKING: | |
| from rag import FDAMRetriever, ChromaVectorStore | |
| from .calculations import FDAMCalculator | |
| from .dispositions import DispositionEngine, SurfaceDisposition | |
| from .generator import DocumentGenerator, GeneratedDocument | |
@dataclass
class PipelineProgress:
    """Progress snapshot handed to a pipeline progress callback.

    Built with keyword arguments by ``FDAMPipeline.execute``, which is why
    this must be a dataclass (a plain class with bare annotations has no
    generated ``__init__``).
    """

    # 1-based stage number; FDAMPipeline.execute labels stage 0 as "Starting".
    stage: int
    # Total number of stages in the run.
    total_stages: int
    # Human-readable label of the current stage.
    stage_name: str
    # Completion fraction as produced by FDAMPipeline.execute
    # (stage / total_stages), i.e. 0.0-1.0 rather than 0-100.
    percent: float
    # Free-form status text for the UI.
    message: str
@dataclass
class VisionResult:
    """Result from vision analysis of a single image.

    Constructed with keyword arguments by the pipeline's vision stage, so it
    must be a dataclass.
    """

    # Id of the image in the session / image store.
    image_id: str
    # Original upload filename (used in captions and warnings).
    filename: str
    # Room id taken from the image metadata.
    room_id: str
    # "zone" section of the model response; the pipeline reads
    # "classification" and "confidence" from it when captioning.
    zone: dict
    # "condition" section of the model response (schema defined by the model).
    condition: dict
    # "materials" entries from the model response.
    materials: list[dict]
    # "bounding_boxes" entries from the model response.
    bounding_boxes: list[dict]
    # The complete, unmodified model response.
    raw_response: dict
@dataclass
class PipelineResult:
    """Complete result from pipeline execution.

    Must be a dataclass: the ``field(default_factory=list)`` defaults below
    only take effect under ``@dataclass``, giving every result its own
    fresh ``errors``/``warnings`` lists.
    """

    # Project types are quoted so this class does not require them at
    # definition time (some are imported only for type checking).
    success: bool
    session: "SessionState"
    # Keyed by image id.
    vision_results: dict[str, "VisionResult"]
    dispositions: list["SurfaceDisposition"]
    calculations: dict
    # None when the pipeline stopped before document generation.
    document: Optional["GeneratedDocument"]
    annotated_images: list[tuple]  # (PIL.Image, caption)
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    execution_time_seconds: float = 0.0
# Signature of the optional progress hook accepted by FDAMPipeline.execute.
# It is called with a PipelineProgress at the start of each stage, and again
# after each successfully analyzed image during stage 2. Its return value is
# ignored.
ProgressCallback = Callable[[PipelineProgress], None]
class FDAMPipeline:
    """Main FDAM processing pipeline.

    Runs the six stages listed in ``STAGES`` in order against a
    ``SessionState`` and bundles every intermediate output into a
    ``PipelineResult``.
    """

    STAGES = [
        "Validating inputs",
        "Analyzing images",
        "Retrieving context",
        "Applying FDAM logic",
        "Running calculations",
        "Generating documents",
    ]

    def __init__(
        self,
        calculator: Optional["FDAMCalculator"] = None,
        disposition_engine: Optional["DispositionEngine"] = None,
        generator: Optional["DocumentGenerator"] = None,
        retriever: Optional["FDAMRetriever"] = None,
    ):
        """Initialize pipeline with optional component overrides.

        Any component passed as ``None`` is built with defaults. The
        ``retriever`` argument (possibly ``None``) is what the default
        disposition engine and document generator receive. A retriever
        created later by the lazy ``retriever`` property is NOT pushed into
        them.

        Args:
            calculator: FDAM calculator instance.
            disposition_engine: Disposition engine instance.
            generator: Document generator instance.
            retriever: RAG retriever instance.
        """
        # `is None` instead of `or`: a valid component that happens to be
        # falsy (e.g. defines __len__ returning 0) must not be replaced.
        self.calculator = calculator if calculator is not None else FDAMCalculator()
        self._retriever = retriever
        if disposition_engine is None:
            disposition_engine = DispositionEngine(retriever=self._retriever)
        self.disposition_engine = disposition_engine
        if generator is None:
            generator = DocumentGenerator(
                calculator=self.calculator,
                disposition_engine=self.disposition_engine,
                retriever=self._retriever,
            )
        self.generator = generator

    @property
    def retriever(self) -> "FDAMRetriever":
        """Return the RAG retriever, creating it on first access.

        Tries a ChromaDB-backed vector store persisted under ``chroma_db``.
        If constructing it raises, logs a warning and falls back to
        ``FDAMRetriever()`` with no vector store. Must be a property: callers
        use ``self.retriever.retrieve(...)``.
        """
        if self._retriever is None:
            # Lazy import to avoid chromadb dependency at module load
            from rag import FDAMRetriever, ChromaVectorStore

            try:
                vs = ChromaVectorStore(persist_directory="chroma_db")
                self._retriever = FDAMRetriever(vectorstore=vs)
            except Exception as e:
                logger.warning("ChromaDB init failed, using fallback retriever: %s", e)
                self._retriever = FDAMRetriever()
        return self._retriever

    def execute(
        self,
        session: "SessionState",
        progress_callback: Optional["ProgressCallback"] = None,
    ) -> "PipelineResult":
        """Execute the full FDAM pipeline.

        Error handling by stage:

        - Stage 1 returns early with ``success=False`` when the session
          reports it cannot generate, or when images are missing from the
          image store.
        - Stages 2 and 3 downgrade failures to warnings: a missing or failed
          image is skipped, and an unreachable retriever is tolerated.
        - Stages 4-6 do not catch exceptions. Anything raised by the
          disposition engine, calculator or document generator propagates to
          the caller.

        Args:
            session: Session state with all input data. On success it is
                mutated: ``has_results`` and ``results_generated_at`` are set
                and ``update_timestamp()`` is called.
            progress_callback: Optional callback for progress updates.

        Returns:
            PipelineResult with all outputs.
        """
        # Monotonic clock for durations; wall-clock time can jump.
        pipeline_start = time.perf_counter()
        errors: list[str] = []
        warnings: list[str] = []
        total_stages = len(self.STAGES)

        logger.info("=" * 60)
        logger.info("FDAM PIPELINE EXECUTION STARTED")
        logger.info("=" * 60)
        logger.info("Room: %s", session.room.name)
        logger.info("Facility: %s", session.room.facility_classification)
        logger.info("Images: %d", len(session.images))

        def report_progress(stage: int, message: str = "") -> None:
            """Forward a PipelineProgress to the caller's callback, if any."""
            if not progress_callback:
                return
            progress_callback(
                PipelineProgress(
                    stage=stage,
                    total_stages=total_stages,
                    stage_name=self.STAGES[stage - 1] if stage > 0 else "Starting",
                    # Fraction of stages reached; stage 6 reports 1.0 as it begins.
                    percent=stage / total_stages,
                    message=message,
                )
            )

        # Stage 1: Input Validation
        stage_start = time.perf_counter()
        logger.info("Stage 1/6: Input Validation")
        report_progress(1, "Validating inputs...")
        can_generate, validation_errors = session.can_generate()
        # Every image referenced by the session must still be in the image store.
        missing_ids = image_store.get_missing_ids([img.id for img in session.images])
        if not can_generate or missing_ids:
            errors.extend(validation_errors)
            if missing_ids:
                errors.append(f"{len(missing_ids)} image(s) need to be re-uploaded")
            logger.error("Validation failed with %d error(s)", len(errors))
            for err in errors:
                logger.error(" - %s", err)
            return PipelineResult(
                success=False,
                session=session,
                vision_results={},
                dispositions=[],
                calculations={},
                document=None,
                annotated_images=[],
                errors=errors,
                warnings=warnings,
                execution_time_seconds=time.perf_counter() - pipeline_start,
            )
        logger.debug("Stage 1 completed in %.2fs", time.perf_counter() - stage_start)

        # Stage 2: Vision Analysis
        stage_start = time.perf_counter()
        logger.info("Stage 2/6: Vision Analysis (%d images)", len(session.images))
        report_progress(2, "Analyzing images with AI...")
        vision_results, annotated_images, room_mapping = self._analyze_images(
            session, warnings, report_progress
        )
        logger.info(
            "Stage 2 completed in %.2fs: %d images analyzed",
            time.perf_counter() - stage_start,
            len(vision_results),
        )

        # Stage 3: RAG Retrieval
        stage_start = time.perf_counter()
        logger.info("Stage 3/6: RAG Retrieval")
        report_progress(3, "Retrieving FDAM methodology context...")
        # RAG is integrated into the disposition engine; here we only probe it.
        self._verify_retriever(warnings)
        logger.debug("Stage 3 completed in %.2fs", time.perf_counter() - stage_start)

        # Stage 4: FDAM Logic (Dispositions)
        stage_start = time.perf_counter()
        logger.info("Stage 4/6: FDAM Logic (Dispositions)")
        report_progress(4, "Applying disposition logic...")
        # Plain-dict view of the vision results for the disposition engine/generator.
        vision_dict = {
            img_id: {
                "zone": vr.zone,
                "condition": vr.condition,
                "materials": vr.materials,
            }
            for img_id, vr in vision_results.items()
        }
        dispositions = self.disposition_engine.process_vision_results(
            vision_results=vision_dict,
            room_mapping=room_mapping,
        )
        logger.info(
            "Stage 4 completed in %.2fs: %d dispositions generated",
            time.perf_counter() - stage_start,
            len(dispositions),
        )

        # Stage 5: Calculations
        stage_start = time.perf_counter()
        logger.info("Stage 5/6: Calculations")
        report_progress(5, "Running FDAM calculations...")
        calculations = self.calculator.calculate_from_session(session)
        logger.debug(
            "Calculations: area=%.0f SF, volume=%.0f CF",
            calculations.get("total_area_sf", 0),
            calculations.get("total_volume_cf", 0),
        )
        logger.debug("Stage 5 completed in %.2fs", time.perf_counter() - stage_start)

        # Stage 6: Document Generation
        stage_start = time.perf_counter()
        logger.info("Stage 6/6: Document Generation")
        report_progress(6, "Generating documents...")
        document = self.generator.generate_sow(
            session=session,
            vision_results=vision_dict,
            surface_dispositions=dispositions,
            calculations=calculations,
        )
        logger.info(
            "Stage 6 completed in %.2fs: %d sections generated",
            time.perf_counter() - stage_start,
            len(document.sections),
        )

        # Update session
        session.has_results = True
        session.results_generated_at = datetime.now().isoformat()
        session.update_timestamp()

        execution_time = time.perf_counter() - pipeline_start

        # Log final summary
        logger.info("=" * 60)
        logger.info("PIPELINE EXECUTION SUMMARY")
        logger.info("=" * 60)
        logger.info("Success: True")
        logger.info("Total execution time: %.2fs", execution_time)
        logger.info("Images analyzed: %d", len(vision_results))
        logger.info("Dispositions generated: %d", len(dispositions))
        logger.info("Document sections: %d", len(document.sections))
        logger.info("Warnings: %d", len(warnings))
        for w in warnings:
            logger.warning(" - %s", w)
        logger.info("=" * 60)

        return PipelineResult(
            success=True,
            session=session,
            vision_results=vision_results,
            dispositions=dispositions,
            calculations=calculations,
            document=document,
            annotated_images=annotated_images,
            errors=errors,
            warnings=warnings,
            execution_time_seconds=execution_time,
        )

    def _analyze_images(
        self,
        session: "SessionState",
        warnings: list[str],
        report_progress: Callable[..., None],
    ) -> tuple[dict, list[tuple], dict]:
        """Run vision analysis on every session image (stage 2).

        Missing images and per-image failures are appended to ``warnings``
        instead of aborting the run. An image is recorded in all three
        outputs only after every piece for it has been built, so a failure
        never leaves it half-recorded.

        Returns:
            (vision_results keyed by image id, annotated (image, caption)
            pairs, room_mapping keyed by image id).
        """
        model_stack = get_models()
        vision_results: dict = {}
        annotated_images: list[tuple] = []
        room_mapping: dict = {}
        total = len(session.images)

        for position, img_meta in enumerate(session.images, start=1):
            logger.debug("Analyzing image %d/%d: %s", position, total, img_meta.filename)
            img_bytes = image_store.get(img_meta.id)
            if not img_bytes:
                warnings.append(f"Image {img_meta.filename} not found in store")
                continue
            try:
                pil_image = Image.open(io.BytesIO(img_bytes))
                # Run vision analysis
                result = model_stack.vision.analyze_image(
                    pil_image,
                    img_meta.description or "",
                )
                vision_result = VisionResult(
                    image_id=img_meta.id,
                    filename=img_meta.filename,
                    room_id=img_meta.room_id,
                    zone=result.get("zone", {}),
                    condition=result.get("condition", {}),
                    materials=result.get("materials", []),
                    bounding_boxes=result.get("bounding_boxes", []),
                    raw_response=result,
                )
                caption = self._format_caption(img_meta.filename, result.get("zone", {}))

                vision_results[img_meta.id] = vision_result
                # Single-room session: every image maps to the session room.
                room_mapping[img_meta.id] = {
                    "name": session.room.name,
                    "id": session.room.id,
                }
                annotated_images.append((pil_image, caption))
                report_progress(2, f"Analyzed {position}/{total}: {img_meta.filename}")
            except Exception as e:
                # Best-effort per image: keep going, but keep the traceback in the log.
                logger.warning("Error analyzing %s: %s", img_meta.filename, e, exc_info=True)
                warnings.append(f"Error analyzing {img_meta.filename}: {e}")

        return vision_results, annotated_images, room_mapping

    @staticmethod
    def _format_caption(filename: str, zone) -> str:
        """Build the gallery caption: filename, zone class and confidence.

        Uses "N/A" for a missing classification and for a confidence that
        cannot be formatted as a percentage (None, a string, ...), so a
        malformed model field does not discard a successful analysis.
        """
        if not isinstance(zone, dict):
            zone = {}
        zone_class = zone.get("classification", "N/A")
        try:
            conf_text = f"{zone.get('confidence', 0):.0%}"
        except (TypeError, ValueError):
            conf_text = "N/A"
        return f"{filename}\nZone: {zone_class} ({conf_text})"

    def _verify_retriever(self, warnings: list[str]) -> None:
        """Probe the RAG retriever with a one-result query (stage 3).

        Any failure, including failure to build the retriever, is logged
        and appended to ``warnings``. The pipeline continues without RAG.
        """
        try:
            test_results = self.retriever.retrieve("test connection", top_k=1)
            logger.debug("RAG connection verified: %d results", len(test_results))
        except Exception as e:
            logger.warning("RAG retrieval unavailable: %s", e)
            warnings.append(f"RAG retrieval unavailable: {e}")

    def generate_stats_dict(self, result: "PipelineResult") -> dict:
        """Generate statistics dictionary for UI display.

        Optional calculation entries (air filtration, sample density,
        regulatory flags, metals thresholds) that are absent or falsy render
        as 0 / "N/A" / [].

        Args:
            result: Pipeline execution result.

        Returns:
            Dictionary with stats for JSON display.
        """
        from collections import Counter

        calc = result.calculations
        air = calc.get("air_filtration")
        sample = calc.get("sample_density")
        reg = calc.get("regulatory_flags")
        thresholds = calc.get("metals_thresholds")
        room = result.session.room

        # Count dispositions by type. A plain dict, ordered by first
        # occurrence, keeps the JSON output unchanged.
        disp_counts = dict(Counter(d.disposition for d in result.dispositions))

        return {
            "room_name": room.name,
            "facility_classification": room.facility_classification,
            "construction_era": room.construction_era,
            "total_images": len(result.session.images),
            "images_analyzed": len(result.vision_results),
            "total_floor_area_sf": f"{calc.get('total_area_sf', 0):,.0f}",
            "total_volume_cf": f"{calc.get('total_volume_cf', 0):,.0f}",
            "air_scrubbers_required": air.units_required if air else 0,
            "tape_lifts_recommended": f"{sample.tape_lifts_min}-{sample.tape_lifts_max}" if sample else "N/A",
            "surface_wipes_recommended": f"{sample.surface_wipes_min}-{sample.surface_wipes_max}" if sample else "N/A",
            "disposition_counts": disp_counts,
            "regulatory_flags": reg.notes if reg else [],
            "lead_threshold": f"{thresholds.lead_ug_100cm2} µg/100cm²" if thresholds else "N/A",
            "execution_time": f"{result.execution_time_seconds:.1f}s",
            "warnings": result.warnings,
        }