# nodes/compliance_analyst.py
"""compliance_analyst node — multimodal fusion of images + code for compliance determination."""
from __future__ import annotations
import json
import re
from datetime import datetime
from google import genai
from google.genai import types
from config import ANALYZER_MODEL, GOOGLE_API_KEY
from prompts.compliance_analyst import COMPLIANCE_ANALYST_SYSTEM_PROMPT
from state import AgentMessage, CodeQuery, ComplianceState, CropTask
from tools.image_store import ImageStore
def compliance_analyst(state: ComplianceState, image_store: ImageStore) -> dict:
    """Review all cropped images AND code sections to produce compliance findings.

    Assembles a multimodal prompt (user question, code report, recent
    discussion log, and evidence images ordered legends -> detail crops ->
    annotated crops), sends it to Gemini, then parses an optional trailing
    ```json block in which the model may request additional crops or code
    lookups for another investigation round.

    Args:
        state: Current pipeline state; reads ``question``, ``image_refs``,
            ``code_report``, ``legend_pages``, ``investigation_round`` and
            ``discussion_log``.
        image_store: Resolves image refs into Gemini content parts.

    Returns:
        Partial state update with the analysis text, the incremented
        investigation round, the needs-more flag, a discussion message,
        and any additional crop tasks / code queries the model requested.
    """
    question = state["question"]
    image_refs = state.get("image_refs", [])
    code_report = state.get("code_report", "")
    legend_pages = set(state.get("legend_pages", []))
    investigation_round = state.get("investigation_round", 0)
    discussion_log = state.get("discussion_log", [])

    client = genai.Client(api_key=GOOGLE_API_KEY)

    content_parts: list[types.Part] = []

    # 1. User question
    content_parts.append(
        types.Part.from_text(text=f"USER COMPLIANCE QUESTION: {question}")
    )

    # 2. Code report (legal requirements) — make absence explicit so the
    #    model does not hallucinate code sections.
    if code_report:
        content_parts.append(
            types.Part.from_text(
                text=f"\n=== LEGAL REQUIREMENTS FROM NYC CODE ===\n{code_report}"
            )
        )
    else:
        content_parts.append(
            types.Part.from_text(text="\n=== NO CODE SECTIONS RETRIEVED ===\n")
        )

    # 3. Discussion log summary — last 10 messages keeps the prompt bounded.
    if discussion_log:
        log_summary = "\n".join(
            f"[{m['timestamp']}] {m['agent']}: {m['summary']}"
            for m in discussion_log[-10:]
        )
        content_parts.append(
            types.Part.from_text(
                text=f"\n=== AGENT DISCUSSION LOG ===\n{log_summary}"
            )
        )

    # 4. Evidence images — legends first so the model reads keys/schedules
    #    before interpreting the detail crops.
    _append_image_parts(content_parts, image_refs, legend_pages, image_store)

    # 5. Investigation round context and the request-more-evidence protocol.
    content_parts.append(
        types.Part.from_text(
            text=(
                f"\nThis is investigation round {investigation_round + 1}. "
                "Analyze the drawings against the code requirements. "
                "If you need more evidence (crops or code lookups), include a JSON block at the end."
            )
        )
    )

    response = client.models.generate_content(
        model=ANALYZER_MODEL,
        contents=[types.Content(role="user", parts=content_parts)],
        config=types.GenerateContentConfig(
            system_instruction=COMPLIANCE_ANALYST_SYSTEM_PROMPT,
        ),
    )
    # FIX: response.text can be None (e.g. safety block / empty candidate);
    # normalize to "" so the regex parse below never raises TypeError.
    analysis_text = response.text or ""

    analysis_text, needs_more, additional_crops, additional_code_queries = (
        _parse_investigation_requests(analysis_text)
    )

    if needs_more:
        summary = (
            f"Round {investigation_round + 1} analysis complete. "
            f"Requesting {len(additional_crops)} more crops and "
            f"{len(additional_code_queries)} more code lookups."
        )
    else:
        summary = f"Round {investigation_round + 1} compliance analysis complete."

    discussion_msg = AgentMessage(
        timestamp=datetime.now().strftime("%H:%M:%S"),
        agent="compliance_analyst",
        action="request_more" if needs_more else "analyze",
        summary=summary,
        detail=analysis_text[:1500],
        evidence_refs=[ref["id"] for ref in image_refs[:5]],
    )

    result: dict = {
        "compliance_analysis": analysis_text,
        "investigation_round": investigation_round + 1,
        "needs_more_investigation": needs_more,
        "discussion_log": [discussion_msg],
        "status_message": [summary],
    }
    if additional_crops:
        # Both keys are populated: "crop_tasks" feeds the cropper node,
        # "additional_crop_tasks" preserves the round's request history.
        result["crop_tasks"] = additional_crops
        result["additional_crop_tasks"] = additional_crops
    if additional_code_queries:
        result["additional_code_queries"] = additional_code_queries
    return result


def _append_image_parts(
    content_parts: list,
    image_refs: list,
    legend_pages: set,
    image_store: ImageStore,
) -> None:
    """Append evidence images to content_parts: legends, then detail crops, then annotated.

    Section-header text parts are inserted immediately before the first image
    of each group. A failed image load is reported inline instead of aborting
    the whole analysis.
    """
    legend_refs = [
        r for r in image_refs
        if r["page_num"] in legend_pages and r["crop_type"] == "crop"
    ]
    detail_crops = [
        r for r in image_refs
        if r["page_num"] not in legend_pages and r["crop_type"] == "crop"
    ]
    annotated_refs = [r for r in image_refs if r["crop_type"] == "annotated"]

    if legend_refs:
        content_parts.append(
            types.Part.from_text(text="\n=== LEGEND / SCHEDULE CROPS (study these first) ===")
        )
    first_detail_id = detail_crops[0]["id"] if detail_crops else None
    first_annotated_id = annotated_refs[0]["id"] if annotated_refs else None

    for ref in legend_refs + detail_crops + annotated_refs:
        if first_detail_id is not None and ref["id"] == first_detail_id:
            content_parts.append(types.Part.from_text(text="\n=== DETAIL CROPS ==="))
        if first_annotated_id is not None and ref["id"] == first_annotated_id:
            content_parts.append(
                types.Part.from_text(text="\n=== ANNOTATED CROPS (highlighted versions) ===")
            )
        content_parts.append(types.Part.from_text(text=f"\nImage: {ref['label']}"))
        try:
            content_parts.append(image_store.to_gemini_part(ref))
        except Exception as e:  # best-effort: one broken image must not kill the node
            content_parts.append(
                types.Part.from_text(text=f"(Could not load image: {e})")
            )


def _parse_investigation_requests(
    analysis_text: str,
) -> tuple[str, bool, list[CropTask], list[CodeQuery]]:
    """Parse an optional trailing ```json block requesting more evidence.

    Returns ``(cleaned_text, needs_more, additional_crops, additional_code_queries)``.
    When a matching block is found it is stripped from the text regardless of
    whether it parsed cleanly, so raw JSON never leaks into the user-facing
    analysis.
    """
    needs_more = False
    additional_crops: list[CropTask] = []
    additional_code_queries: list[CodeQuery] = []

    json_match = re.search(
        r"```json\s*(\{.*?\"needs_more\"\s*:\s*true.*?\})\s*```",
        analysis_text,
        re.DOTALL,
    )
    if not json_match:
        return analysis_text, needs_more, additional_crops, additional_code_queries

    try:
        extra = json.loads(json_match.group(1))
        if extra.get("needs_more"):
            needs_more = True
            for t in extra.get("additional_crops", []):
                # The model reports 1-based page numbers; CropTask uses 0-based.
                raw_page = int(t.get("page_num", 1))
                additional_crops.append(
                    CropTask(
                        page_num=raw_page - 1,
                        crop_instruction=t.get("crop_instruction", ""),
                        annotate=bool(t.get("annotate", False)),
                        annotation_prompt=t.get("annotation_prompt", ""),
                        label=t.get("label", "Additional crop"),
                        priority=int(t.get("priority", 1)),
                    )
                )
            for q in extra.get("additional_code_queries", []):
                additional_code_queries.append(
                    CodeQuery(
                        query=q.get("query", ""),
                        focus_area=q.get("focus_area", ""),
                        context=q.get("context", ""),
                        priority=int(q.get("priority", 0)),
                    )
                )
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # FIX: int()/bool() on malformed model output raises ValueError/TypeError,
        # which the original did not catch. Discard any partially built request
        # rather than returning a half-parsed one.
        needs_more = False
        additional_crops = []
        additional_code_queries = []

    cleaned = analysis_text[: json_match.start()].strip()
    return cleaned, needs_more, additional_crops, additional_code_queries