| | """compliance_analyst node — multimodal fusion of images + code for compliance determination."""
|
| | from __future__ import annotations
|
| |
|
| | import json
|
| | import re
|
| | from datetime import datetime
|
| |
|
| | from google import genai
|
| | from google.genai import types
|
| |
|
| | from config import ANALYZER_MODEL, GOOGLE_API_KEY
|
| | from prompts.compliance_analyst import COMPLIANCE_ANALYST_SYSTEM_PROMPT
|
| | from state import AgentMessage, CodeQuery, ComplianceState, CropTask
|
| | from tools.image_store import ImageStore
|
| |
|
| |
|
| | def compliance_analyst(state: ComplianceState, image_store: ImageStore) -> dict:
|
| | """Review all cropped images AND code sections to produce compliance findings."""
|
| | question = state["question"]
|
| | image_refs = state.get("image_refs", [])
|
| | code_report = state.get("code_report", "")
|
| | legend_pages = set(state.get("legend_pages", []))
|
| | investigation_round = state.get("investigation_round", 0)
|
| | discussion_log = state.get("discussion_log", [])
|
| |
|
| | client = genai.Client(api_key=GOOGLE_API_KEY)
|
| |
|
| |
|
| | content_parts: list[types.Part] = []
|
| |
|
| |
|
| | content_parts.append(types.Part.from_text(text=f"USER COMPLIANCE QUESTION: {question}"))
|
| |
|
| |
|
| | if code_report:
|
| | content_parts.append(
|
| | types.Part.from_text(
|
| | text=f"\n=== LEGAL REQUIREMENTS FROM NYC CODE ===\n{code_report}"
|
| | )
|
| | )
|
| | else:
|
| | content_parts.append(
|
| | types.Part.from_text(text="\n=== NO CODE SECTIONS RETRIEVED ===\n")
|
| | )
|
| |
|
| |
|
| | if discussion_log:
|
| | log_summary = "\n".join(
|
| | f"[{m['timestamp']}] {m['agent']}: {m['summary']}"
|
| | for m in discussion_log[-10:]
|
| | )
|
| | content_parts.append(
|
| | types.Part.from_text(
|
| | text=f"\n=== AGENT DISCUSSION LOG ===\n{log_summary}"
|
| | )
|
| | )
|
| |
|
| |
|
| | legend_refs = [r for r in image_refs if r["page_num"] in legend_pages and r["crop_type"] == "crop"]
|
| | detail_crops = [r for r in image_refs if r["page_num"] not in legend_pages and r["crop_type"] == "crop"]
|
| | annotated_refs = [r for r in image_refs if r["crop_type"] == "annotated"]
|
| |
|
| | ordered_refs = legend_refs + detail_crops + annotated_refs
|
| |
|
| | if legend_refs:
|
| | content_parts.append(
|
| | types.Part.from_text(text="\n=== LEGEND / SCHEDULE CROPS (study these first) ===")
|
| | )
|
| |
|
| | first_detail_id = detail_crops[0]["id"] if detail_crops else None
|
| | first_annotated_id = annotated_refs[0]["id"] if annotated_refs else None
|
| |
|
| | for ref in ordered_refs:
|
| | if first_detail_id is not None and ref["id"] == first_detail_id:
|
| | content_parts.append(types.Part.from_text(text="\n=== DETAIL CROPS ==="))
|
| | if first_annotated_id is not None and ref["id"] == first_annotated_id:
|
| | content_parts.append(
|
| | types.Part.from_text(text="\n=== ANNOTATED CROPS (highlighted versions) ===")
|
| | )
|
| |
|
| | content_parts.append(types.Part.from_text(text=f"\nImage: {ref['label']}"))
|
| | try:
|
| | content_parts.append(image_store.to_gemini_part(ref))
|
| | except Exception as e:
|
| | content_parts.append(
|
| | types.Part.from_text(text=f"(Could not load image: {e})")
|
| | )
|
| |
|
| |
|
| | content_parts.append(
|
| | types.Part.from_text(
|
| | text=(
|
| | f"\nThis is investigation round {investigation_round + 1}. "
|
| | "Analyze the drawings against the code requirements. "
|
| | "If you need more evidence (crops or code lookups), include a JSON block at the end."
|
| | )
|
| | )
|
| | )
|
| |
|
| |
|
| | response = client.models.generate_content(
|
| | model=ANALYZER_MODEL,
|
| | contents=[types.Content(role="user", parts=content_parts)],
|
| | config=types.GenerateContentConfig(
|
| | system_instruction=COMPLIANCE_ANALYST_SYSTEM_PROMPT,
|
| | ),
|
| | )
|
| |
|
| | analysis_text = response.text
|
| |
|
| |
|
| | needs_more = False
|
| | additional_crops: list[CropTask] = []
|
| | additional_code_queries: list[CodeQuery] = []
|
| |
|
| | json_match = re.search(
|
| | r"```json\s*(\{.*?\"needs_more\"\s*:\s*true.*?\})\s*```",
|
| | analysis_text,
|
| | re.DOTALL,
|
| | )
|
| | if json_match:
|
| | try:
|
| | extra = json.loads(json_match.group(1))
|
| | if extra.get("needs_more"):
|
| | needs_more = True
|
| |
|
| | for t in extra.get("additional_crops", []):
|
| | raw_page = int(t.get("page_num", 1))
|
| | additional_crops.append(
|
| | CropTask(
|
| | page_num=raw_page - 1,
|
| | crop_instruction=t.get("crop_instruction", ""),
|
| | annotate=bool(t.get("annotate", False)),
|
| | annotation_prompt=t.get("annotation_prompt", ""),
|
| | label=t.get("label", "Additional crop"),
|
| | priority=int(t.get("priority", 1)),
|
| | )
|
| | )
|
| |
|
| | for q in extra.get("additional_code_queries", []):
|
| | additional_code_queries.append(
|
| | CodeQuery(
|
| | query=q.get("query", ""),
|
| | focus_area=q.get("focus_area", ""),
|
| | context=q.get("context", ""),
|
| | priority=int(q.get("priority", 0)),
|
| | )
|
| | )
|
| | except (json.JSONDecodeError, KeyError):
|
| | pass
|
| |
|
| |
|
| | analysis_text = analysis_text[: json_match.start()].strip()
|
| |
|
| |
|
| | if needs_more:
|
| | summary = (
|
| | f"Round {investigation_round + 1} analysis complete. "
|
| | f"Requesting {len(additional_crops)} more crops and "
|
| | f"{len(additional_code_queries)} more code lookups."
|
| | )
|
| | else:
|
| | summary = f"Round {investigation_round + 1} compliance analysis complete."
|
| |
|
| | discussion_msg = AgentMessage(
|
| | timestamp=datetime.now().strftime("%H:%M:%S"),
|
| | agent="compliance_analyst",
|
| | action="analyze" if not needs_more else "request_more",
|
| | summary=summary,
|
| | detail=analysis_text[:1500],
|
| | evidence_refs=[ref["id"] for ref in image_refs[:5]],
|
| | )
|
| |
|
| | result: dict = {
|
| | "compliance_analysis": analysis_text,
|
| | "investigation_round": investigation_round + 1,
|
| | "needs_more_investigation": needs_more,
|
| | "discussion_log": [discussion_msg],
|
| | "status_message": [summary],
|
| | }
|
| |
|
| | if additional_crops:
|
| | result["crop_tasks"] = additional_crops
|
| | result["additional_crop_tasks"] = additional_crops
|
| | if additional_code_queries:
|
| | result["additional_code_queries"] = additional_code_queries
|
| |
|
| | return result
|
| |
|