# Ryan2219's picture
# Update app.py
# b6c5b89 verified
import gradio as gr
import json, time
import os
import re
import pandas as pd
from google import genai
from google.genai import types
import chromadb
from chromadb.utils import embedding_functions
from collections import Counter
import base64
import io
from PIL import Image
import matplotlib.pyplot as plt
import openai
from datetime import datetime
import threading
from huggingface_hub import hf_hub_download, HfApi
from huggingface_hub.utils import EntryNotFoundError
# --- Usage-quota / API-key configuration -------------------------------------
USAGE_DATASET_REPO = os.environ.get("USAGE_DATASET_REPO", "NYSERDA-CRE-Working-Group/nyserda_demo_useage_store")
USAGE_FILENAME = os.environ.get("USAGE_FILENAME", "usage.csv")
MAX_RUNS_PER_USER = int(os.environ.get("MAX_RUNS_PER_USER", "10"))
# BUG FIX: `os.environ["X"] = os.getenv("X")` raises TypeError when the
# variable is unset (os.environ values must be str, not None). Only re-export
# the keys when they are actually present.
if os.getenv("OPENAI_API_KEY") is not None:
    os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
if os.getenv("GEMINI_API_KEY") is not None:
    os.environ["GEMINI_API_KEY"] = os.getenv("GEMINI_API_KEY")
HF_TOKEN = os.environ.get("HF_TOKEN")
api = HfApi(token=HF_TOKEN)
def user_id_from_profile(profile: gr.OAuthProfile | None) -> str | None:
    """Derive a normalized user id from a Hugging Face OAuth profile.

    Returns the lowercased, stripped ``profile.name``, or ``None`` when the
    user is not signed in or has no usable name.
    """
    if profile is None:
        return None
    # profile.name is the field known to exist on the OAuth profile; if a more
    # unique field (preferred_username) becomes available, prefer that instead.
    raw_name = getattr(profile, "name", None)
    return raw_name.strip().lower() if raw_name else None
def _load_usage_df() -> pd.DataFrame:
    """Fetch the usage table from the HF dataset repo (empty table on first run)."""
    try:
        csv_path = hf_hub_download(
            repo_id=USAGE_DATASET_REPO,
            filename=USAGE_FILENAME,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except EntryNotFoundError:
        # The dataset repo exists but usage.csv has not been created yet.
        return pd.DataFrame(columns=["user_id", "runs", "first_seen", "last_seen"])
    return pd.read_csv(csv_path)
def _save_usage_df(df: pd.DataFrame, commit_message: str) -> None:
    """Serialize *df* to a temp CSV and push it to the usage dataset repo."""
    local_csv = "/tmp/usage.csv"
    df.to_csv(local_csv, index=False)
    api.upload_file(
        path_or_fileobj=local_csv,
        path_in_repo=USAGE_FILENAME,
        repo_type="dataset",
        repo_id=USAGE_DATASET_REPO,
        commit_message=commit_message,
    )
def check_and_increment_quota(user_id: str) -> tuple[bool, int]:
    """Bump the run counter for *user_id* against the shared usage table.

    Returns ``(allowed, remaining)``; *allowed* is False once the user has
    used up MAX_RUNS_PER_USER runs.

    NOTE(review): read-modify-write against a remote CSV is not atomic —
    concurrent requests could race; acceptable for a low-traffic demo.
    """
    now = int(time.time())
    df = _load_usage_df()

    is_new_user = df.empty or not (df["user_id"] == user_id).any()
    if is_new_user:
        runs = 0
        if runs >= MAX_RUNS_PER_USER:
            # Only reachable when MAX_RUNS_PER_USER <= 0 (quota disabled).
            return False, 0
        new_row = {"user_id": user_id, "runs": 1, "first_seen": now, "last_seen": now}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
        _save_usage_df(df, commit_message=f"usage: increment {user_id} to 1")
        return True, MAX_RUNS_PER_USER - 1

    row = df.index[df["user_id"] == user_id][0]
    runs = int(df.loc[row, "runs"])
    if runs >= MAX_RUNS_PER_USER:
        return False, 0
    runs += 1
    df.loc[row, "runs"] = runs
    df.loc[row, "last_seen"] = now
    _save_usage_df(df, commit_message=f"usage: increment {user_id} to {runs}")
    return True, MAX_RUNS_PER_USER - runs
# Global state for the interface
class InterfaceState:
    """Thread-safe shared state bridging the agent worker thread and the UI.

    The worker thread appends logs/images here while the Gradio generator
    polls and renders snapshots. All mutation goes through ``self.lock``.
    """

    def __init__(self):
        self.log_messages = []          # markdown log lines for the Analysis Log pane
        self.analysis_messages = []     # sub-agent analysis transcript
        self.current_chapter = ""       # last fetched code-chapter markdown
        self.current_images = []        # PIL images shown in the gallery
        self.staged_audit_images = []   # Gemini Parts queued for the final visual audit
        self.final_answer = ""
        self.done = False
        self.lock = threading.Lock()

    def add_log(self, message):
        """Append a timestamped log line; returns the full joined log."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        with self.lock:
            self.log_messages.append(f"**[{timestamp}]** {message}")
            return "\n\n".join(self.log_messages)

    def add_analysis(self, message):
        """Append a timestamped analysis line; returns the full joined transcript."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        with self.lock:
            self.analysis_messages.append(f"**[{timestamp}]** {message}")
            return "\n\n".join(self.analysis_messages)

    def set_chapter(self, chapter_text):
        """Replace the currently displayed code-chapter text."""
        with self.lock:
            self.current_chapter = chapter_text
        return chapter_text

    def add_image(self, img_pil):
        """Add an image to the gallery; returns a copy of the current list."""
        with self.lock:
            self.current_images.append(img_pil)
            return self.current_images.copy()

    def add_staged_image_part(self, image_part):
        """Thread-safe method to stage images for the Gemini Audit."""
        with self.lock:
            self.staged_audit_images.append(image_part)
            # Log it so we can verify it happened in the console
            print(f"DEBUG: Staged image part. Total staged: {len(self.staged_audit_images)}")

    def get_staged_images(self):
        """Safely retrieve the staged images for the audit turn."""
        with self.lock:
            return list(self.staged_audit_images)  # Return a copy to prevent mutation

    def clear(self):
        """Reset all per-run state before a new analysis starts."""
        with self.lock:
            self.log_messages.clear()
            self.analysis_messages.clear()
            self.current_chapter = ""
            self.current_images.clear()
            # BUG FIX: previously not cleared, so audit images staged during
            # one run leaked into the next run's visual audit.
            self.staged_audit_images.clear()
            self.final_answer = ""
            self.done = False
# Single shared state instance polled by the Gradio generator.
state = InterfaceState()
# Load your data (same as original)
# Per-page metadata (sheet titles etc.); JSON keys arrive as strings, so
# re-key by integer page number.
with open('Preprocessed Files/page_metadata.json', 'r') as json_file:
    page_metadata = json.load(json_file)
page_metadata = {int(k): v for k, v in page_metadata.items()}
# OCR text per page, indexed by page number.
with open('Preprocessed Files/text_list.json', 'r') as json_file:
    text_list = json.load(json_file)
# Tile coordinate metadata: {page -> {tile_index -> info}}; both key levels
# are re-keyed from string to int.
with open('Preprocessed Files/tile_metadata.json', 'r') as json_file:
    tile_metadata = json.load(json_file)
tile_metadata = {
    int(outer_k): {
        int(inner_k): inner_v
        for inner_k, inner_v in outer_v.items()
    }
    for outer_k, outer_v in tile_metadata.items()
}
def load_fullpage_images(folder="Images"):
    """Return raw PNG bytes for every ``page_<n>_fullpage.png`` in *folder*.

    Results are ordered by the numeric page index extracted from the filename.
    """
    numbered = []
    for name in os.listdir(folder):
        m = re.search(r"page_(\d+)_fullpage\.png", name)
        if m:
            numbered.append((int(m.group(1)), name))
    # Numeric (not lexicographic) ordering: page_2 before page_10.
    numbered.sort(key=lambda item: item[0])
    blobs = []
    for _, name in numbered:
        with open(os.path.join(folder, name), "rb") as fh:
            blobs.append(fh.read())
    return blobs
def load_tile_images(page, folder='Tiles'):
    """Return raw PNG bytes for every tile of *page*, ordered by tile index.

    Args:
        page: integer page number whose tiles to load.
        folder: directory containing ``page_<page>_tile_<i>.png`` files
            (default ``'Tiles'``, matching the original hard-coded path).
    """
    files = os.listdir(folder)
    page_files = []
    # BUG FIX: the original pattern was a non-raw f-string, so "\d" was an
    # invalid escape sequence (DeprecationWarning today, SyntaxError in a
    # future Python). Use a raw f-string and compile once outside the loop.
    pattern = re.compile(rf"page_{page}_tile_(\d+)\.png")
    for f in files:
        match = pattern.search(f)
        if match:
            tile_num = int(match.group(1))
            page_files.append((tile_num, f))
    page_files.sort(key=lambda x: x[0])
    image_bytes_list = []
    for _, filename in page_files:
        path = os.path.join(folder, filename)
        with open(path, "rb") as fh:
            image_bytes_list.append(fh.read())
    return image_bytes_list
# Preload the full-page images and the tile sets for pages 0-43.
image_bytes_list = load_fullpage_images()
tile_bytes = {}
for page in range(44):
    tile_list = load_tile_images(page)
    if tile_list:
        # FIX: reuse the list already loaded instead of calling
        # load_tile_images(page) a second time (redundant full disk re-read).
        tile_bytes[page] = tile_list
# Vector Code Base
# Persistent Chroma store of pre-ingested NYC building-code sections.
chroma_client = chromadb.PersistentClient(path="nyc_code_db")
# The embedding function must match the one used when the collection was built.
embedding_model = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
collection = chroma_client.get_collection(name="nyc_building_codes", embedding_function=embedding_model)
# NOTE(review): appears unused in this file — verify before removing.
all_pending_images = []
# Modified tool functions with Gradio updates
def search_page_text(page_number: int, research_goal: str):
    """Run a fast GPT 'signal extractor' over the OCR text of one drawing page.

    Args:
        page_number: index into ``text_list`` / ``page_metadata``.
        research_goal: what the planner wants extracted; passed verbatim to the model.

    Returns:
        dict with the page number and the model's markdown summary.
    """
    state.add_log(f'🔍 Searching page **{page_metadata[page_number]["sheet_title"]}** for details')
    state.add_analysis(
        f'🔍 Searching page {page_metadata[page_number]["sheet_title"]} with prompt\n{research_goal}'
    )
    raw_text = text_list[page_number]
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-5-mini",
        messages=[
            {"role": "system", "content": """
You are a Fast NYC Plans Examiner Signal Agent.
Your ONLY job is to extract **code-relevant signals** from the OCR text of a SINGLE drawing page.
You do NOT interpret the law and you do NOT summarize design intent.
Your output will be used to CONSTRAIN a downstream legal research agent.
========================
WHAT TO EXTRACT
========================
Look only for information that determines which parts of the NYC Code apply such as:
- Occupancy classification (e.g., R-2, A-3, M, S, F, mixed-use)
- Building height (stories, feet, high-rise indicators)
- Construction type (I, II, III, IV, V)
- Fire protection systems (sprinklers, standpipes, fire alarm, smoke control)
- Means of egress references (stairs, exits, exit access, doors, corridors)
- Structural system hints (steel, concrete, load-bearing walls, columns, transfer girders)
- Mechanical / fuel / plumbing system mentions (boilers, gas piping, HVAC type, shafts)
- Zoning or special district references (if present)
- Scope flags (new building, alteration, addition, change of occupancy, retrofit)
However only return relevant signals to the provided research goal.
========================
OUTPUT FORMAT (STRICT MARKDOWN)
========================
Return ONLY the following sections:
### Code-Relevant Signals
- Bullet list of extracted facts
### Likely Governing Code Domains
- One-line list chosen from: Administrative, Building, Mechanical, FuelGas, Plumbing, Fire
### Text Evidence
- Short quoted snippets from the page that support each signal
========================
RULES
========================
- Do NOT speculate
- If a signal is not present, omit it
- Prefer exact phrases over paraphrase
- Keep total length under 500 words
- No legal conclusions, no compliance advice
"""},
            {"role": "user", "content": f"PAGE TEXT:\n{raw_text}\n\nRESEARCH GOAL: {research_goal}\n\nReturn a breif but comprehensive Markdown summary of your findings and justification with text snippets."}
        ]
    )
    # Surface the extractor's markdown to the sub-agent analysis pane.
    analysis_text = response.choices[0].message.content
    state.add_analysis(
        f"🟦 Text Analyst (Page {page_number})\n{analysis_text}"
    )
    return {
        "page": page_number,
        "summary": analysis_text
    }
def discover_code_locations(query: str):
    """Semantic search over the NYC code vector DB.

    Returns a markdown "discovery report" listing the dominant chapters and
    the top matching sections with snippets, or a fallback string when the
    query produces no hits.
    """
    state.add_log(f'📚 Searching NYC Code for: **{query}**')
    results = collection.query(
        query_texts=[query],
        n_results=25,
        include=["metadatas", "documents"]
    )
    metas = results['metadatas'][0]
    if not metas:
        return "No results found. Try a different technical keyword."
    docs = results['documents'][0]
    # Tally which code-volume/chapter combinations dominate the hits.
    counts = Counter(f"{m['code_type']} | Ch. {m['parent_major']}" for m in metas)
    chapter_summary = "\n".join(f"- {pair} ({count} hits)" for pair, count in counts.most_common(5))
    section_reports = [
        f"ID: {m['section_full']} | Code: {m['code_type']} | Chapter: {m['parent_major']}\n"
        f"Snippet: {doc}"
        for m, doc in zip(metas, docs)
    ]
    return (
        "### CODE DISCOVERY REPORT ###\n"
        f"MOST RELEVANT CHAPTERS:\n{chapter_summary}\n\n"
        "TOP RELEVANT SECTIONS:\n" +
        "\n---\n".join(section_reports)
    )
def fetch_full_chapter(code_type: str, chapter_id: str):
    """Pull every stored section of one chapter and assemble its legal text.

    De-duplicates the "[CONT.]:"-chunked storage format, pushes the assembled
    markdown to the UI chapter pane, and returns it. Errors are returned as a
    message string rather than raised (the caller feeds this to an LLM).
    """
    state.add_log(f'📖 Fetching Chapter **{chapter_id}** from **{code_type}** code')
    try:
        chapter_data = collection.get(
            where={
                "$and": [
                    {"code_type": {"$eq": code_type}},
                    {"parent_major": {"$eq": chapter_id}}
                ]
            },
            include=["documents", "metadatas"]
        )
        if not chapter_data['documents']:
            return f"No documentation found for {code_type} Chapter {chapter_id}."
        paired = sorted(
            zip(chapter_data['metadatas'], chapter_data['documents']),
            key=lambda item: item[0]['section_full'],
        )
        parts = [f"## FULL LEGAL TEXT: {code_type.upper()} CODE - CHAPTER {chapter_id}\n\n"]
        for meta, doc in paired:
            # Chunked storage repeats text after "[CONT.]:" markers; keep each
            # distinct piece once, preserving first-seen order.
            seen = []
            for piece in doc.split("[CONT.]:"):
                piece = piece.strip()
                if piece and piece not in seen:
                    seen.append(piece)
            parts.append(f"### SECTION {meta['section_full']}\n{' '.join(seen)}\n\n---\n\n")
        full_text = "".join(parts)
        # Update the chapter display
        state.set_chapter(full_text)
        return full_text
    except Exception as e:
        return f"Error retrieving chapter content: {str(e)}"
def nyc_legal_sub_agent(research_goal: str):
    """Run the GPT legal-research sub-agent with its own internal tool loop.

    The sub-agent may call ``discover_code_locations`` / ``fetch_full_chapter``
    (up to 20 turns) and returns its final citation-backed markdown report.

    Args:
        research_goal: focused legal topic from the planner.

    Returns:
        The model's final message content (markdown legal report).
    """
    state.add_log(f'⚖️ Investigating NYC Code for: **{research_goal}**')
    state.add_analysis(
        f"⚖️ Legal Analyst is searching\n{research_goal}"
    )
    client = openai.OpenAI()
    internal_tools = [
        {
            "type": "function",
            "function": {
                "name": "discover_code_locations",
                "description": "Scans NYC code in a semantic vector database. Use this FIRST to find which chapters/sections are relevant.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "semantic search string for a vector database (Not a keyword search use a full sentence)"}
                    },
                    "required": ["query"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "fetch_full_chapter",
                "description": "Retrieves the full legal text of a specific chapter for deep analysis.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code_type": {
                            "type": "string",
                            "enum": ["Administrative", "Building", "FuelGas", "Mechanical", "Plumbing"],
                            "description": "The specific NYC code volume to search."
                        },
                        "chapter_id": {"type": "string", "description": "The chapter number string"}
                    },
                    "required": ["code_type", "chapter_id"]
                }
            }
        }
    ]
    messages = [
        {"role": "system", "content": """
You are a Senior NYC Building Code Consultant and Legal Research Agent.
Your task is to produce a **definitive, citation-backed legal report** that can be used directly by a downstream orchestration agent.
Accuracy, traceability, and completeness matter more than brevity.
========================
PRIMARY OBJECTIVE
========================
Given a research goal, identify and analyze relevant NYC Code provisions, including:
- Governing sections
- Exceptions
- Cross-references
- Related chapters that modify, limit, or expand the rule
Every legal claim MUST be supported by a specific code citation.
You are operating in FAST LEGAL MODE.
SEARCH BUDGET:
- Maximum of 2 calls to `discover_code_locations`
- Maximum of 2 calls to `fetch_full_chapter`
STOP CONDITIONS:
- If the first chapter fetch contains governing text AND exceptions, STOP and synthesize.
- Only fetch a second chapter if the first chapter explicitly cross-references another chapter.
PRIORITY ORDER:
1) Governing rule section
2) Exceptions
3) Cross-references that MODIFY the rule
Ignore definitions and administrative content unless directly referenced.
GOOD ENOUGH STANDARD:
If you can identify:
- The governing section
- At least one exception or limitation
You must STOP and report.
========================
TOOL STRATEGY (MANDATORY)
========================
This is a semantic vector database, NOT a keyword index. Always search in full English questions.
1) FIRST — Call `discover_code_locations`
- Use a natural-language query describing the legal requirement you are trying to find
- Example: "What NYC Building Code sections regulate emergency egress width in residential buildings"
NEVER use a keyword search thi will not work you are searching a vector database.
If you know what chaoter you need call the fetch_full_chapter tool instead.
If you perform TWO consecutive `discover_code_locations` calls
and both return no new relevant chapters or sections:
You MUST stop searching and do one of the following:
- Conclude that the table/section does NOT exist as a standalone provision in the NYC Code corpus, OR
- Conclude that the requirement is embedded within the previously retrieved sections
Then proceed to report findings using the closest governing section.
DO NOT continue reformulating the same query.
You MUST NOT call `discover_code_locations` more than once for the same legal concept.
If a new query is semantically similar to a prior query, STOP and move forward with analysis.
2) SECOND — Call `fetch_full_chapter`
- If multiple relevant sections appear in the same chapter
- OR if a section contains exceptions, references, or conditional language
- OR if you know what section of the code is relevant and want to see a full chapter
3) THIRD — Follow Cross-References
- If a section says "See Section X", "As required by Chapter Y", or "Except as permitted in..."
- You MUST search and retrieve those sections as well
4) STOP ONLY WHEN
- All exceptions are reviewed
- All cross-references are resolved
- No additional modifying sections remain
========================
OUTPUT FORMAT (STRICT)
========================
Return a structured legal report in the following format:
### Legal Summary
Brief, plain-language explanation of what the code requires.
### Governing Code Sections
- **[Code Type] §[Section Number] — [Title]**
- Summary:
- Key Requirements:
- Applicability Conditions:
- Exceptions:
### Cross-References Analyzed
- **§[Section Number] — [Title]**
- Why It Matters:
- Impact on Main Rule:
### Edge Cases & Enforcement Notes
- Special conditions (building type, occupancy class, height, system type, jurisdictional notes)
- Common misinterpretations
- DOB or FDNY enforcement implications (if relevant)
### Compliance Checklist
- Bullet list of actionable compliance steps derived from the code
========================
QUALITY RULES
========================
- NEVER summarize without citing
- NEVER assume jurisdiction, building type, or occupancy unless the code explicitly states it
- If legal text is ambiguous, flag it as **Interpretive**
- Prefer quoting short legal phrases when clarity matters
========================
TONE
========================
Professional. Precise. Legal-research quality. No speculation.
"""},
        {"role": "user", "content": f"Analyze the NYC building code with this goal: {research_goal}"}
    ]
    # Bounded tool loop: at most 20 round-trips before we take whatever the
    # model last produced.
    for _ in range(20):
        response = client.chat.completions.create(
            model="gpt-5-mini",
            messages=messages,
            tools=internal_tools,
            tool_choice="auto"
        )
        msg = response.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            break
        for tool_call in msg.tool_calls:
            func_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)
            if func_name == "discover_code_locations":
                result = discover_code_locations(args['query'])
            elif func_name == "fetch_full_chapter":
                result = fetch_full_chapter(args['code_type'], args['chapter_id'])
            else:
                # BUG FIX: an unexpected tool name previously left `result`
                # unbound, crashing the loop with NameError. Report it back to
                # the model instead so it can recover.
                result = f"Error: unknown tool '{func_name}'."
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result
            })
    state.add_analysis(
        f"🟨 Legal Analyst\n{msg.content}"
    )
    return msg.content
def merge_tiles(tile_indexes: list[int], page_num: int):
    """Stitch the requested tiles of *page_num* into one RGB image.

    Tile placement comes from the tile metadata coordinates, normalized so the
    top-left tile lands at (0, 0). Returns the stitched image as PNG bytes, or
    ``None`` when *tile_indexes* is empty.

    Raises:
        ValueError: for an out-of-range tile index or missing tile bytes.
    """
    state.add_log(f'🔬 Stitching tiles **{tile_indexes}** from page **{page_num}**')
    tiles = tile_bytes[page_num]
    tiles_coords_dict = tile_metadata[page_num]
    images = []
    positions = []
    for index in tile_indexes:
        if index < 0 or index >= len(tiles):
            raise ValueError(f"Tile index {index} out of range")
        raw = tiles[index]
        if raw is None:
            raise ValueError(f"No image bytes found for tile {index}")
        images.append(Image.open(io.BytesIO(raw)).convert('RGBA'))
        coords = tiles_coords_dict[index]['coords']
        positions.append((coords[0], coords[1]))
    if not images:
        return None
    # Shift all tiles so the smallest x/y becomes the canvas origin.
    min_x = min(pt[0] for pt in positions)
    min_y = min(pt[1] for pt in positions)
    offsets = [(x - min_x, y - min_y) for x, y in positions]
    canvas_w = max(off[0] + img.width for off, img in zip(offsets, images))
    canvas_h = max(off[1] + img.height for off, img in zip(offsets, images))
    canvas = Image.new('RGB', (canvas_w, canvas_h), (255, 255, 255))
    for img, off in zip(images, offsets):
        canvas.paste(img, off)
    # Add to image gallery
    out = io.BytesIO()
    canvas.save(out, format='PNG')
    return out.getvalue()
def extract_json(s: str):
    """Parse the span from the first '{' to the last '}' of *s* as JSON.

    Raises:
        ValueError: when no brace-delimited object is present.
    """
    text = s.strip()
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end == -1 or end < start:
        raise ValueError("No JSON object found in model output:\n" + repr(text))
    return json.loads(text[start:end + 1])
def sanitize_tile_indices(data):
    """
    Forcefully converts various LLM outputs into a clean list of integers.
    Handles: [1, 2], ["1", "2"], "1, 2, 3", "[1, 2, 3]", and None.
    """
    if not data:
        return []
    if isinstance(data, str):
        # Pull every digit run out of the string form ("[1, 2]" -> [1, 2]).
        return [int(token) for token in re.findall(r'\d+', data)]
    if isinstance(data, list):
        cleaned = []
        for element in data:
            try:
                # Covers strings inside the list, e.g. ["1", "2"].
                cleaned.append(int(str(element).strip()))
            except (ValueError, TypeError):
                continue
        return cleaned
    # Any other truthy type (dict, number, ...) carries no usable indices.
    return []
def execute_page_expert(expert_instructions: str, page_num: int):
    """Run the GPT-4o visual page-inspection sub-agent on one drawing page.

    The agent sees the full-page image plus the tile grid map, may call
    `merge_tiles` to zoom, and must finish with a strict-JSON findings packet.
    Side effects: stages stitched/full-page images on `state` for the later
    Gemini audit and mirrors progress to the UI panes.

    Returns:
        The parsed findings dict from the agent's final JSON.

    Raises:
        RuntimeError: if no valid JSON is produced within MAX_TURNS.
    """
    state.add_log(f'👁️ Spawning Page Expert for page **{page_num}**')
    state.add_analysis(f"👁️ Page Expert searching for {expert_instructions}")
    state.add_log(f'📄 Attaching full-page context for page **{page_num}**')
    state.add_analysis(
        f"📄 Full-page context attached for page `{page_num}`"
    )
    full_page_img = Image.open(
        io.BytesIO(image_bytes_list[page_num])
    )
    state.add_image(full_page_img)
    client = openai.OpenAI()
    tools = [
        {
            "type": "function",
            "function": {
                "name": "merge_tiles",
                "description": "Stitches high-resolution image tiles together into a single zoomed-in view. Use this to read small text, dimensions, or symbols.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "tile_indexes": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "description": "A list of integer tile IDs from the Grid Map to stitch together."
                        }
                    },
                    "required": ["tile_indexes"]
                }
            }
        }
    ]
    page_text = text_list[page_num]
    relevant_tile_meta = tile_metadata[page_num]
    # Full-page image is sent inline as a base64 data URL.
    b64_full_page = base64.b64encode(image_bytes_list[page_num]).decode()
    system_prompt = """
You are a Lead AEC Visual Investigator supporting a Compliance Planner.
Your mission is to extract **verifiable, high-fidelity evidence** from this drawing page.
You must ground every claim in either:
- a **Zoomed Tile Image** (via `merge_tiles`) or
- a **Direct Text Quote** from the OCR page text.
Guesses, assumptions, and general descriptions are not allowed.
========================
MANDATORY WORKFLOW
========================
1) ORIENT
- Review the full-page image and the Grid Map to identify candidate regions.
- Decide which tiles likely contain the required evidence. Utilize the tile metadata to assist with this tasl.
2) ZOOM (REQUIRED)
- You MUST call `merge_tiles(tile_indexes=[...])` before making ANY factual claim about symbols, dimensions, labels, or locations.
- Always request ALL tiles needed in a SINGLE call.
- If the first zoom is insufficient, call again with additional tiles.
- Call the zoom until you have found all relevant tiles, refer to the tile metadata to assist in your search.
3) VERIFY
- Read the zoomed image carefully.
- Extract exact values, tags, room names, and directional cues.
4) REPORT
- Return the Findings Packet in strict JSON format.
========================
WHAT COUNTS AS PROOF
========================
- Dimension values (e.g., “36\"”, “1 HR RATED”)
- Explicit labels (e.g., “EXIT”, “STAIR A”, “R-2”, “COLUMN C3”)
- Symbol legends that define a mark
- Path continuity that can be visually traced across tiles
- OCR text snippets
========================
FINDINGS RULES
========================
- Every bullet in `findings` MUST cite either:
- `[Tile <ID>]` or
- `"Quoted text"`
- If a claim cannot be verified from the zoomed tiles or text, mark it as **Unverified**.
- Be comprehensive in this report, your supervisor only has access to the report you give in findings, not the full page text or other image data you have.
- Do NOT repeat planner instructions — only report what you observe.
========================
VISUAL POINTERS RULES
========================
- Exclude orientation-only or whitespace tiles.
- Include ALL tiles needed to re-trace a path or confirm a relationship.
- **Your superviser will ONLY see the tiles that you reference here, be comprehensive when returning these tiles.**
========================
FULL PAGE USEFULNESS
========================
Set `true` ONLY if the finding requires spatial context across the entire page, or if your zoom is missing information.
(e.g., tracing egress path, riser continuity, system routing).
Otherwise set `false`.
========================
JSON FORMAT (STRICT)
========================
{
"findings": "<markdown string with bullet points and citations>",
"visual_pointers": [list of <int>],
"textual_evidence": ["<exact quotes from PAGE TEXT>"],
"full_page_usefulness": <true|false>,
"limitations": "<what could not be verified and why>"
}
========================
FAILURE CONDITIONS
========================
- If no relevant evidence exists on this page, return:
{
"findings": "No relevant technical evidence found for the planner's instruction.",
"visual_pointers": [],
"textual_evidence": [],
"full_page_usefulness": false,
"limitations": "This page does not contain the requested information or it is not legible at available resolution."
}
Return ONLY valid JSON.
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"Planner Instruction:\n{expert_instructions}"},
                {"type": "text", "text": f"Page Context:\n{page_text}"},
                {"type": "text", "text": f"Available Grid Map:\n{json.dumps(relevant_tile_meta)}"},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{b64_full_page}"
                    }
                }
            ]
        }
    ]
    MAX_TURNS = 3
    for turn in range(MAX_TURNS):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        msg = response.choices[0].message
        messages.append(msg)
        if msg.content:
            try:
                # Success path: the model returned its final JSON packet.
                res = extract_json(msg.content)
                state.add_analysis(
                    f"🟨 Page Analyst\n{res.get('findings','')}"
                )
                raw_pointers = res.get("visual_pointers", [])
                tile_idxs = sanitize_tile_indices(raw_pointers)
                # NOTE(review): `tile_idxs != '[]'` compares a list to a
                # string and is therefore always True; the truthiness check
                # alone decides this branch.
                if tile_idxs and tile_idxs != '[]':
                    stitched_bytes = merge_tiles(
                        tile_indexes=tile_idxs,
                        page_num=page_num
                    )
                    state.add_log(f'📸 Staging {len(tile_idxs)} tiles for final audit...')
                    # Store these to use AFTER the chat finishes
                    state.add_staged_image_part(
                        types.Part.from_bytes(
                            data=stitched_bytes,  # <-- 'data=' is required here
                            mime_type="image/png"
                        )
                    )
                    stitched_img = Image.open(
                        io.BytesIO(stitched_bytes)
                    )
                    state.add_image(stitched_img)
                # Also stage the full page so the audit has spatial context.
                state.add_staged_image_part(
                    types.Part.from_bytes(
                        data=image_bytes_list[page_num],  # <-- 'data=' is required here
                        mime_type="image/png"
                    )
                )
                return res
            except:
                # NOTE(review): bare except deliberately swallows malformed
                # JSON *and* any merge_tiles failure so the loop can retry;
                # narrowing this would change recovery behavior.
                pass
        if msg.tool_calls:
            tool_results = []
            image_blocks = []
            for call in msg.tool_calls:
                if call.function.name == "merge_tiles":
                    args = json.loads(call.function.arguments)
                    idxs = args["tile_indexes"]
                    stitched_bytes = merge_tiles(
                        tile_indexes=idxs,
                        page_num=page_num
                    )
                    b64_tile = base64.b64encode(stitched_bytes).decode()
                    # Tool result carries only metadata; the pixels go back as
                    # a separate user image message below.
                    tool_results.append({
                        "role": "tool",
                        "tool_call_id": call.id,
                        "content": json.dumps({
                            "status": "success",
                            "tiles": idxs
                        })
                    })
                    image_blocks.append(
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{b64_tile}"
                            }
                        }
                    )
            for tool_msg in tool_results:
                messages.append(tool_msg)
            messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Here are the high-resolution zooms you requested. Analyze exits, locations, and any capacity labels."
                    },
                    *image_blocks
                ]
            })
            continue
        # Neither valid JSON nor tool calls: nudge the model to finish.
        messages.append({
            "role": "user",
            "content": "Return the FINAL JSON now."
        })
    raise RuntimeError("No FINAL JSON output from Page Expert")
# Set up Gemini planner
# Python callables handed directly to the google-genai SDK for automatic
# function-calling by the planner model.
tools_list = [search_page_text, nyc_legal_sub_agent, execute_page_expert]
import time  # NOTE(review): redundant — `time` is already imported at the top of the file.
planner = genai.Client()
planner_model = "gemini-3-pro-preview"
# f-string: embeds the full page-metadata index into the system instruction.
planner_prompt = f"""
You are the Lead Architectural Compliance Planner for NYC Building Code and Zoning review.
Your role is to coordinate specialist sub-agents and deliver a **proof-carrying compliance verdict**
based ONLY on:
- OCR-extracted drawing text
- High-resolution visual evidence (tile zooms)
- Official NYC Code citations
You must NOT speculate or rely on architectural norms.
========================
DRAWING INDEX (Page Metadata)
========================
Use this index to select pages for visual inspection.
Avoid irrelevant sheets (e.g., Site, Civil, Utility, Stormwater) unless zoning or site compliance is explicitly required.
{json.dumps(page_metadata)}
========================
SPECIALIST SUB-AGENTS
========================
None of these agents have access to your chat history or internal thought process.
They know only how to access information (text, images or code) and what information you give them in the research goal.
If they need more context or specific instructions YOU MUST PROVIDE IT WHEN CALLING THEM in the research goal.
1) `search_page_text`
Purpose: FAST signal extractor.
Use to identify code-triggering facts:
- Occupancy classification
- Building height / stories / high-rise
- Construction type
- Scope of work (new, alteration, addition, change of occupancy)
- Fire protection systems
Output is used ONLY to constrain legal research.
2) `nyc_legal_sub_agent`
Purpose: Definitive legal authority.
Use to retrieve governing NYC Code sections, exceptions, and cross-references.
Always pass a focused topic derived from Phase 1 signals.
**YOU MAY ONLY CALL THIS TOOL ONCE**
3) `execute_page_expert`
Purpose: High-resolution visual verification.
Use to confirm compliance or non-compliance by zooming tiles.
This agent provides the ONLY acceptable visual proof.
**NEVER CALL THIS TOOL MORE THAN ONCE ON A SINGLE PAGE**
========================
MANDATORY PHASED WORKFLOW
========================
PHASE 1 — SIGNAL EXTRACTION
- Use `search_page_text` on candidate pages to determine:
occupancy, height, construction type, system presence, and scope.
- If signals are missing or ambiguous, expand to additional pages.
- Do NOT proceed until you have enough facts to define legal applicability.
PHASE 2 — LEGAL SCOPING
- Convert Phase 1 signals into a focused legal topic.
- Call `nyc_legal_sub_agent`.
- Extract governing sections, exceptions, and edge cases.
PHASE 3 — VISUAL VERIFICATION
- Identify the SINGLE most relevant page for proof.
- Call `execute_page_expert` with precise instructions tied to legal requirements
(e.g., “Verify exit door clear width at Stair A serving R-2 occupancy”).
- Ensure returned findings include tile IDs and/or text quotes.
PHASE 4 — SYNTHESIS & VERDICT
- Compare visual findings directly against legal requirements.
- Resolve conflicts:
- If legal text and visual evidence disagree → flag as **Non-Compliant or Ambiguous**
- If evidence is missing → flag as **Unverified**
- Cite both:
- NYC Code Section(s)
- Tile ID(s) or OCR quotes
**NEVER CALL THE SAME AGENT FOR THE SAME TASK TWICE REFER TO PREVIOUS ANSWERS WHEN ABLE**
**NEVER CALL THE PAGE EXPERT TWICE ON THE SAME PAGE**
========================
FINAL OUTPUT FORMAT (STRICT MARKDOWN)
========================
### Compliance Verdict
**Status:** Compliant | Non-Compliant | Unverified | Ambiguous
### Legal Basis
- **[Code Type] §[Section] — [Title]**
- Requirement:
- Exceptions Considered:
### Visual Evidence
- Finding: <short statement>
- Proof: [Tile ID(s)] or "Quoted OCR Text"
### Reasoning
- Step-by-step comparison between legal requirement and observed condition
### Limitations
- What could not be verified and why
========================
CONTROL RULES
========================
- NEVER call `nyc_legal_sub_agent` before `search_page_text`
- NEVER issue a final verdict without calling `execute_page_expert`
- If no page contains sufficient proof, return **Unverified**
- Prefer false negatives over false positives
*** CRITICAL VISUAL PROTOCOL ***
- When `execute_page_expert` returns, it will explicitly state "VISUAL_PROOF_PENDING".
- When you see this, your ONLY response must be: "Awaiting visual proof."
- DO NOT attempt to guess the verdict.
- DO NOT complain about missing images.
- Simply wait. The user will immediately send the images in the next turn.
========================
QUALITY STANDARD
========================
This output should be defensible to a DOB plan examiner or legal reviewer.
Every claim must be traceable to law and evidence.
"""
config = types.GenerateContentConfig(
    system_instruction=planner_prompt,
    tools=tools_list
)
# NOTE(review): this module-level chat is shadowed by the per-run chat created
# inside agent_worker; it appears unused — verify before removing.
chat = planner.chats.create(model=planner_model, config=config)
def agent_worker(user_question):
    """Background thread: drive the Gemini planner chat, execute its tool
    calls, run the post-chat visual audit, and publish results via `state`.

    Always sets ``state.done = True`` on exit so the UI generator terminates.
    """
    state.clear()
    state.add_log(f'🚀 Starting analysis for: **{user_question}**')
    state.add_analysis("🧠 Planner initialized. Awaiting tool calls...")
    try:
        # 1. Initialize the Stateful Chat
        chat = planner.chats.create(model=planner_model, config=config)
        response = chat.send_message(user_question)

        # 2./3. Standard tool loop (Phases 1-3).
        # BUG FIX: the model may emit a text part BEFORE its function-call
        # parts; checking only parts[0] ended the loop prematurely. Inspect
        # every part of the candidate instead.
        def _pending_calls(resp):
            return [p for p in resp.candidates[0].content.parts if p.function_call]

        while _pending_calls(response):
            tool_responses = []
            for part in _pending_calls(response):
                name = part.function_call.name
                args = part.function_call.args
                state.add_log(f'🛠️ Tool Call: **{name}**')
                func = globals()[name]
                result = func(**args)
                tool_responses.append(
                    types.Part.from_function_response(name=name, response={"result": result})
                )
            # Send tool results back to the stateful chat
            response = chat.send_message(tool_responses)

        # -----------------------------------------------------------------
        # PHASE 4: THE POST-CHAT HANDOFF (The "Visual Audit")
        # -----------------------------------------------------------------
        # The tool loop has ended; response.text holds the preliminary answer.
        audit_images = state.get_staged_images()
        if audit_images:
            state.add_log(f"👁️ Preliminary answer received. Performing audit with {len(audit_images)} images...")
            # Ensure 'text=' is used for the Part constructor
            audit_parts = [
                types.Part.from_text(
                    text="You have provided a preliminary verdict. Now, look at these images "
                         "to verify your findings. If the visual evidence contradicts your "
                         "text-based search, update your verdict now. "
                ),
                *audit_images
            ]
            try:
                # Send directly through the 'chat' session: appends to history
                # and maintains the session state.
                final_response = chat.send_message(audit_parts)
                state.final_answer = final_response.text
            except Exception:
                # If the above fails, try the explicit message keyword
                state.add_log("🔄 Retrying audit with explicit message keyword...")
                final_response = chat.send_message(message=audit_parts)
                state.final_answer = final_response.text
        else:
            state.add_log("⚠️ No images found in state. Skipping visual audit.")
            state.final_answer = response.text
        state.add_log('🏁 **ANALYSIS COMPLETE**')
    except Exception as e:
        # BUG FIX: an uncaught exception previously left state.done False,
        # so run_agentic_workflow polled forever. Surface the error instead.
        state.add_log(f'❌ **ANALYSIS FAILED:** {e}')
        state.final_answer = f"Analysis failed: {e}"
    finally:
        state.done = True
def run_agentic_workflow(user_question, profile: gr.OAuthProfile | None):
    """Gradio handler: enforce the per-user quota, start the worker thread,
    and stream interface state to the outputs until the run completes.

    Yields (logs, analysis, chapter, images, final_answer) tuples.
    """
    uid = user_id_from_profile(profile)
    if uid is None:
        raise gr.Error("Please sign in with Hugging Face to use this demo.")
    allowed, remaining = check_and_increment_quota(uid)
    if not allowed:
        raise gr.Error(f"Usage limit reached: {MAX_RUNS_PER_USER} runs per user.")
    if remaining <= 2:
        gr.Warning(f"⚠️ Only {remaining} run(s) left!")
    else:
        gr.Info(f"✓ Runs remaining: {remaining}")

    state.done = False
    state.final_answer = ""
    threading.Thread(
        target=agent_worker,
        args=(user_question,),
        daemon=True,
    ).start()

    def snapshot():
        # Copy everything under the lock so the worker can keep mutating.
        with state.lock:
            return (
                "\n\n".join(state.log_messages),
                "\n\n".join(state.analysis_messages),
                state.current_chapter,
                list(state.current_images),
            )

    # Poll the worker, streaming partial results to the UI.
    while not state.done:
        logs, analysis, chapter, images = snapshot()
        yield (logs, analysis, chapter, images, "*Analysis in progress...*")
        time.sleep(0.25)

    # Final emission once the worker signals completion.
    logs, analysis, chapter, images = snapshot()
    with state.lock:
        final = state.final_answer
    yield (logs, analysis, chapter, images, final)
# Build Gradio Interface
# Build Gradio Interface: three-column layout (input+log, sub-agent analysis,
# code chapter), plus an image gallery and the final verdict pane.
with gr.Blocks(title="AEC Compliance Agent") as demo:
    gr.LoginButton()
    gr.Markdown("# 🏗️ AEC Compliance Analysis Agent")
    gr.Markdown("Ask questions about NYC Building Code compliance for your construction drawings.")
    with gr.Row():
        with gr.Column(scale=1):
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="e.g., Does this building comply with egress requirements for 738 occupants?",
                lines=3
            )
            submit_btn = gr.Button("🔍 Analyze", variant="primary", size="lg")
            gr.Markdown("### 📋 Analysis Log")
            log_output = gr.Markdown(value="", height=400)
        with gr.Column(scale=1):
            gr.Markdown("### 🧠 Sub-Agent Analysis")
            analysis_output = gr.Markdown(value="", height=600)
        with gr.Column(scale=1):
            gr.Markdown("### 📖 Code Chapter")
            chapter_output = gr.Markdown(value="*No chapter loaded yet*", height=600)
    with gr.Row():
        gr.Markdown("### 🖼️ Retrieved Images")
    with gr.Row():
        image_gallery = gr.Gallery(
            label="Visual Evidence",
            show_label=True,
            columns=2,
            height=400,
            object_fit="contain"
        )
    with gr.Row():
        gr.Markdown("### ✅ Final Compliance Verdict")
    with gr.Row():
        final_output = gr.Markdown(value="*Analysis pending...*")
    # run_agentic_workflow's gr.OAuthProfile parameter is injected by Gradio's
    # OAuth support; only the textbox is listed as an explicit input.
    submit_btn.click(
        fn=run_agentic_workflow,
        inputs=[question_input],
        outputs=[
            log_output,
            analysis_output,  # NEW SLOT
            chapter_output,
            image_gallery,
            final_output
        ]
    )
# Entry point: queue() is required so the streaming generator handler works.
if __name__ == "__main__":
    demo.queue().launch(
        inbrowser=True
    )