"""compliance_analyst node — multimodal fusion of images + code for compliance determination.""" from __future__ import annotations import json import re from datetime import datetime from google import genai from google.genai import types from config import ANALYZER_MODEL, GOOGLE_API_KEY from prompts.compliance_analyst import COMPLIANCE_ANALYST_SYSTEM_PROMPT from state import AgentMessage, CodeQuery, ComplianceState, CropTask from tools.image_store import ImageStore def compliance_analyst(state: ComplianceState, image_store: ImageStore) -> dict: """Review all cropped images AND code sections to produce compliance findings.""" question = state["question"] image_refs = state.get("image_refs", []) code_report = state.get("code_report", "") legend_pages = set(state.get("legend_pages", [])) investigation_round = state.get("investigation_round", 0) discussion_log = state.get("discussion_log", []) client = genai.Client(api_key=GOOGLE_API_KEY) # Build multimodal content content_parts: list[types.Part] = [] # 1. User question content_parts.append(types.Part.from_text(text=f"USER COMPLIANCE QUESTION: {question}")) # 2. Code report (legal requirements) if code_report: content_parts.append( types.Part.from_text( text=f"\n=== LEGAL REQUIREMENTS FROM NYC CODE ===\n{code_report}" ) ) else: content_parts.append( types.Part.from_text(text="\n=== NO CODE SECTIONS RETRIEVED ===\n") ) # 3. Discussion log summary (what previous agents found) if discussion_log: log_summary = "\n".join( f"[{m['timestamp']}] {m['agent']}: {m['summary']}" for m in discussion_log[-10:] # Last 10 messages ) content_parts.append( types.Part.from_text( text=f"\n=== AGENT DISCUSSION LOG ===\n{log_summary}" ) ) # 4. Images — legends first, then detail crops, then annotated legend_refs = [r for r in image_refs if r["page_num"] in legend_pages and r["crop_type"] == "crop"] detail_crops = [r for r in image_refs if r["page_num"] not in legend_pages and r["crop_type"] == "crop"] annotated_refs = [r for r in image_refs if r["crop_type"] == "annotated"] ordered_refs = legend_refs + detail_crops + annotated_refs if legend_refs: content_parts.append( types.Part.from_text(text="\n=== LEGEND / SCHEDULE CROPS (study these first) ===") ) first_detail_id = detail_crops[0]["id"] if detail_crops else None first_annotated_id = annotated_refs[0]["id"] if annotated_refs else None for ref in ordered_refs: if first_detail_id is not None and ref["id"] == first_detail_id: content_parts.append(types.Part.from_text(text="\n=== DETAIL CROPS ===")) if first_annotated_id is not None and ref["id"] == first_annotated_id: content_parts.append( types.Part.from_text(text="\n=== ANNOTATED CROPS (highlighted versions) ===") ) content_parts.append(types.Part.from_text(text=f"\nImage: {ref['label']}")) try: content_parts.append(image_store.to_gemini_part(ref)) except Exception as e: content_parts.append( types.Part.from_text(text=f"(Could not load image: {e})") ) # 5. Investigation round context content_parts.append( types.Part.from_text( text=( f"\nThis is investigation round {investigation_round + 1}. " "Analyze the drawings against the code requirements. " "If you need more evidence (crops or code lookups), include a JSON block at the end." ) ) ) # Call Gemini response = client.models.generate_content( model=ANALYZER_MODEL, contents=[types.Content(role="user", parts=content_parts)], config=types.GenerateContentConfig( system_instruction=COMPLIANCE_ANALYST_SYSTEM_PROMPT, ), ) analysis_text = response.text # Parse additional investigation requests needs_more = False additional_crops: list[CropTask] = [] additional_code_queries: list[CodeQuery] = [] json_match = re.search( r"```json\s*(\{.*?\"needs_more\"\s*:\s*true.*?\})\s*```", analysis_text, re.DOTALL, ) if json_match: try: extra = json.loads(json_match.group(1)) if extra.get("needs_more"): needs_more = True for t in extra.get("additional_crops", []): raw_page = int(t.get("page_num", 1)) additional_crops.append( CropTask( page_num=raw_page - 1, crop_instruction=t.get("crop_instruction", ""), annotate=bool(t.get("annotate", False)), annotation_prompt=t.get("annotation_prompt", ""), label=t.get("label", "Additional crop"), priority=int(t.get("priority", 1)), ) ) for q in extra.get("additional_code_queries", []): additional_code_queries.append( CodeQuery( query=q.get("query", ""), focus_area=q.get("focus_area", ""), context=q.get("context", ""), priority=int(q.get("priority", 0)), ) ) except (json.JSONDecodeError, KeyError): pass # Clean the JSON block from the analysis text analysis_text = analysis_text[: json_match.start()].strip() # Build discussion message if needs_more: summary = ( f"Round {investigation_round + 1} analysis complete. " f"Requesting {len(additional_crops)} more crops and " f"{len(additional_code_queries)} more code lookups." ) else: summary = f"Round {investigation_round + 1} compliance analysis complete." discussion_msg = AgentMessage( timestamp=datetime.now().strftime("%H:%M:%S"), agent="compliance_analyst", action="analyze" if not needs_more else "request_more", summary=summary, detail=analysis_text[:1500], evidence_refs=[ref["id"] for ref in image_refs[:5]], ) result: dict = { "compliance_analysis": analysis_text, "investigation_round": investigation_round + 1, "needs_more_investigation": needs_more, "discussion_log": [discussion_msg], "status_message": [summary], } if additional_crops: result["crop_tasks"] = additional_crops result["additional_crop_tasks"] = additional_crops if additional_code_queries: result["additional_code_queries"] = additional_code_queries return result