import gradio as gr import json, time import os import re import pandas as pd from google import genai from google.genai import types import chromadb from chromadb.utils import embedding_functions from collections import Counter import base64 import io from PIL import Image import matplotlib.pyplot as plt import openai from datetime import datetime import threading from huggingface_hub import hf_hub_download, HfApi from huggingface_hub.utils import EntryNotFoundError USAGE_DATASET_REPO = os.environ.get("USAGE_DATASET_REPO", "NYSERDA-CRE-Working-Group/nyserda_demo_useage_store") USAGE_FILENAME = os.environ.get("USAGE_FILENAME", "usage.csv") MAX_RUNS_PER_USER = int(os.environ.get("MAX_RUNS_PER_USER", "10")) os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") os.environ["GEMINI_API_KEY"] = os.getenv("GEMINI_API_KEY") HF_TOKEN = os.environ.get("HF_TOKEN") api = HfApi(token=HF_TOKEN) def user_id_from_profile(profile: gr.OAuthProfile | None) -> str | None: if profile is None: return None # You said profile.name exists; normalize it. # If you later can access preferred_username, use that instead (more unique). uid = getattr(profile, "name", None) if not uid: return None return uid.strip().lower() def _load_usage_df() -> pd.DataFrame: try: local_path = hf_hub_download( repo_id=USAGE_DATASET_REPO, repo_type="dataset", filename=USAGE_FILENAME, token=HF_TOKEN, ) return pd.read_csv(local_path) except EntryNotFoundError: # First run: create empty table return pd.DataFrame(columns=["user_id", "runs", "first_seen", "last_seen"]) def _save_usage_df(df: pd.DataFrame, commit_message: str) -> None: tmp_path = "/tmp/usage.csv" df.to_csv(tmp_path, index=False) api.upload_file( path_or_fileobj=tmp_path, path_in_repo=USAGE_FILENAME, repo_id=USAGE_DATASET_REPO, repo_type="dataset", commit_message=commit_message, ) def check_and_increment_quota(user_id: str) -> tuple[bool, int]: now = int(time.time()) df = _load_usage_df() if df.empty or (df["user_id"] == user_id).sum() == 0: runs = 0 if runs >= MAX_RUNS_PER_USER: return False, 0 new_row = { "user_id": user_id, "runs": 1, "first_seen": now, "last_seen": now, } df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) _save_usage_df(df, commit_message=f"usage: increment {user_id} to 1") return True, MAX_RUNS_PER_USER - 1 idx = df.index[df["user_id"] == user_id][0] runs = int(df.loc[idx, "runs"]) if runs >= MAX_RUNS_PER_USER: return False, 0 runs += 1 df.loc[idx, "runs"] = runs df.loc[idx, "last_seen"] = now _save_usage_df(df, commit_message=f"usage: increment {user_id} to {runs}") return True, MAX_RUNS_PER_USER - runs # Global state for the interface class InterfaceState: def __init__(self): self.log_messages = [] self.analysis_messages = [] self.current_chapter = "" self.current_images = [] self.staged_audit_images = [] self.final_answer = "" self.done = False self.lock = threading.Lock() def add_log(self, message): timestamp = datetime.now().strftime("%H:%M:%S") with self.lock: self.log_messages.append(f"**[{timestamp}]** {message}") return "\n\n".join(self.log_messages) def add_analysis(self, message): timestamp = datetime.now().strftime("%H:%M:%S") with self.lock: self.analysis_messages.append(f"**[{timestamp}]** {message}") return "\n\n".join(self.analysis_messages) def set_chapter(self, chapter_text): with self.lock: self.current_chapter = chapter_text return chapter_text def add_image(self, img_pil): with self.lock: self.current_images.append(img_pil) return self.current_images.copy() def add_staged_image_part(self, image_part): """Thread-safe method to stage images for the Gemini Audit.""" with self.lock: self.staged_audit_images.append(image_part) # Log it so we can verify it happened in the console print(f"DEBUG: Staged image part. Total staged: {len(self.staged_audit_images)}") def get_staged_images(self): """Safely retrieve the staged images for the audit turn.""" with self.lock: return list(self.staged_audit_images) # Return a copy to prevent mutation def clear(self): with self.lock: self.log_messages.clear() self.analysis_messages.clear() self.current_chapter = "" self.current_images.clear() self.final_answer = "" self.done = False state = InterfaceState() # Load your data (same as original) with open('Preprocessed Files/page_metadata.json', 'r') as json_file: page_metadata = json.load(json_file) page_metadata = {int(k): v for k, v in page_metadata.items()} with open('Preprocessed Files/text_list.json', 'r') as json_file: text_list = json.load(json_file) with open('Preprocessed Files/tile_metadata.json', 'r') as json_file: tile_metadata = json.load(json_file) tile_metadata = { int(outer_k): { int(inner_k): inner_v for inner_k, inner_v in outer_v.items() } for outer_k, outer_v in tile_metadata.items() } def load_fullpage_images(folder="Images"): files = os.listdir(folder) page_files = [] for f in files: match = re.search(r"page_(\d+)_fullpage\.png", f) if match: page_num = int(match.group(1)) page_files.append((page_num, f)) page_files.sort(key=lambda x: x[0]) image_bytes_list = [] for page_num, filename in page_files: path = os.path.join(folder, filename) with open(path, "rb") as f: img_bytes = f.read() image_bytes_list.append(img_bytes) return image_bytes_list def load_tile_images(page): files = os.listdir('Tiles') page_files = [] for f in files: match = re.search(f"page_{page}_tile_(\d+)\.png", f) if match: page_num = int(match.group(1)) page_files.append((page_num, f)) page_files.sort(key=lambda x: x[0]) image_bytes_list = [] for page_num, filename in page_files: path = os.path.join('Tiles', filename) with open(path, "rb") as f: img_bytes = f.read() image_bytes_list.append(img_bytes) return image_bytes_list image_bytes_list = load_fullpage_images() tile_bytes = {} for page in range(44): tile_list = load_tile_images(page) if tile_list: tile_bytes[page] = load_tile_images(page) # Vector Code Base chroma_client = chromadb.PersistentClient(path="nyc_code_db") embedding_model = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2") collection = chroma_client.get_collection(name="nyc_building_codes", embedding_function=embedding_model) all_pending_images = [] # Modified tool functions with Gradio updates def search_page_text(page_number: int, research_goal: str): state.add_log(f'πŸ” Searching page **{page_metadata[page_number]["sheet_title"]}** for details') state.add_analysis( f'πŸ” Searching page {page_metadata[page_number]["sheet_title"]} with prompt\n{research_goal}' ) raw_text = text_list[page_number] client = openai.OpenAI() response = client.chat.completions.create( model="gpt-5-mini", messages=[ {"role": "system", "content": """ You are a Fast NYC Plans Examiner Signal Agent. Your ONLY job is to extract **code-relevant signals** from the OCR text of a SINGLE drawing page. You do NOT interpret the law and you do NOT summarize design intent. Your output will be used to CONSTRAIN a downstream legal research agent. ======================== WHAT TO EXTRACT ======================== Look only for information that determines which parts of the NYC Code apply such as: - Occupancy classification (e.g., R-2, A-3, M, S, F, mixed-use) - Building height (stories, feet, high-rise indicators) - Construction type (I, II, III, IV, V) - Fire protection systems (sprinklers, standpipes, fire alarm, smoke control) - Means of egress references (stairs, exits, exit access, doors, corridors) - Structural system hints (steel, concrete, load-bearing walls, columns, transfer girders) - Mechanical / fuel / plumbing system mentions (boilers, gas piping, HVAC type, shafts) - Zoning or special district references (if present) - Scope flags (new building, alteration, addition, change of occupancy, retrofit) However only return relevant signals to the provided research goal. ======================== OUTPUT FORMAT (STRICT MARKDOWN) ======================== Return ONLY the following sections: ### Code-Relevant Signals - Bullet list of extracted facts ### Likely Governing Code Domains - One-line list chosen from: Administrative, Building, Mechanical, FuelGas, Plumbing, Fire ### Text Evidence - Short quoted snippets from the page that support each signal ======================== RULES ======================== - Do NOT speculate - If a signal is not present, omit it - Prefer exact phrases over paraphrase - Keep total length under 500 words - No legal conclusions, no compliance advice """}, {"role": "user", "content": f"PAGE TEXT:\n{raw_text}\n\nRESEARCH GOAL: {research_goal}\n\nReturn a breif but comprehensive Markdown summary of your findings and justification with text snippets."} ] ) analysis_text = response.choices[0].message.content state.add_analysis( f"🟦 Text Analyst (Page {page_number})\n{analysis_text}" ) return { "page": page_number, "summary": analysis_text } def discover_code_locations(query: str): state.add_log(f'πŸ“š Searching NYC Code for: **{query}**') results = collection.query( query_texts=[query], n_results=25, include=["metadatas", "documents"] ) if not results['metadatas'][0]: return "No results found. Try a different technical keyword." metas = results['metadatas'][0] docs = results['documents'][0] category_chapter_pairs = [f"{m['code_type']} | Ch. {m['parent_major']}" for m in metas] counts = Counter(category_chapter_pairs) chapter_summary = "\n".join([f"- {pair} ({count} hits)" for pair, count in counts.most_common(5)]) section_reports = [] for m, doc in zip(metas, docs): report = ( f"ID: {m['section_full']} | Code: {m['code_type']} | Chapter: {m['parent_major']}\n" f"Snippet: {doc}" ) section_reports.append(report) output = ( "### CODE DISCOVERY REPORT ###\n" f"MOST RELEVANT CHAPTERS:\n{chapter_summary}\n\n" "TOP RELEVANT SECTIONS:\n" + "\n---\n".join(section_reports) ) return output def fetch_full_chapter(code_type: str, chapter_id: str): state.add_log(f'πŸ“– Fetching Chapter **{chapter_id}** from **{code_type}** code') try: chapter_data = collection.get( where={ "$and": [ {"code_type": {"$eq": code_type}}, {"parent_major": {"$eq": chapter_id}} ] }, include=["documents", "metadatas"] ) if not chapter_data['documents']: return f"No documentation found for {code_type} Chapter {chapter_id}." sections = sorted(zip(chapter_data['metadatas'], chapter_data['documents']), key=lambda x: x[0]['section_full']) full_text = f"## FULL LEGAL TEXT: {code_type.upper()} CODE - CHAPTER {chapter_id}\n\n" for meta, doc in sections: blocks = doc.split("[CONT.]:") unique_blocks = [] for b in blocks: clean_b = b.strip() if clean_b and clean_b not in unique_blocks: unique_blocks.append(clean_b) clean_doc = " ".join(unique_blocks) full_text += f"### SECTION {meta['section_full']}\n{clean_doc}\n\n---\n\n" # Update the chapter display state.set_chapter(full_text) return full_text except Exception as e: return f"Error retrieving chapter content: {str(e)}" def nyc_legal_sub_agent(research_goal: str): state.add_log(f'βš–οΈ Investigating NYC Code for: **{research_goal}**') state.add_analysis( f"βš–οΈ Legal Analyst is searching\n{research_goal}" ) client = openai.OpenAI() internal_tools = [ { "type": "function", "function": { "name": "discover_code_locations", "description": "Scans NYC code in a semantic vector database. Use this FIRST to find which chapters/sections are relevant.", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "semantic search string for a vector database (Not a keyword search use a full sentence)"} }, "required": ["query"] } } }, { "type": "function", "function": { "name": "fetch_full_chapter", "description": "Retrieves the full legal text of a specific chapter for deep analysis.", "parameters": { "type": "object", "properties": { "code_type": { "type": "string", "enum": ["Administrative", "Building", "FuelGas", "Mechanical", "Plumbing"], "description": "The specific NYC code volume to search." }, "chapter_id": {"type": "string", "description": "The chapter number string"} }, "required": ["code_type", "chapter_id"] } } } ] messages = [ {"role": "system", "content": """ You are a Senior NYC Building Code Consultant and Legal Research Agent. Your task is to produce a **definitive, citation-backed legal report** that can be used directly by a downstream orchestration agent. Accuracy, traceability, and completeness matter more than brevity. ======================== PRIMARY OBJECTIVE ======================== Given a research goal, identify and analyze relevant NYC Code provisions, including: - Governing sections - Exceptions - Cross-references - Related chapters that modify, limit, or expand the rule Every legal claim MUST be supported by a specific code citation. You are operating in FAST LEGAL MODE. SEARCH BUDGET: - Maximum of 2 calls to `discover_code_locations` - Maximum of 2 calls to `fetch_full_chapter` STOP CONDITIONS: - If the first chapter fetch contains governing text AND exceptions, STOP and synthesize. - Only fetch a second chapter if the first chapter explicitly cross-references another chapter. PRIORITY ORDER: 1) Governing rule section 2) Exceptions 3) Cross-references that MODIFY the rule Ignore definitions and administrative content unless directly referenced. GOOD ENOUGH STANDARD: If you can identify: - The governing section - At least one exception or limitation You must STOP and report. ======================== TOOL STRATEGY (MANDATORY) ======================== This is a semantic vector database, NOT a keyword index. Always search in full English questions. 1) FIRST β€” Call `discover_code_locations` - Use a natural-language query describing the legal requirement you are trying to find - Example: "What NYC Building Code sections regulate emergency egress width in residential buildings" NEVER use a keyword search thi will not work you are searching a vector database. If you know what chaoter you need call the fetch_full_chapter tool instead. If you perform TWO consecutive `discover_code_locations` calls and both return no new relevant chapters or sections: You MUST stop searching and do one of the following: - Conclude that the table/section does NOT exist as a standalone provision in the NYC Code corpus, OR - Conclude that the requirement is embedded within the previously retrieved sections Then proceed to report findings using the closest governing section. DO NOT continue reformulating the same query. You MUST NOT call `discover_code_locations` more than once for the same legal concept. If a new query is semantically similar to a prior query, STOP and move forward with analysis. 2) SECOND β€” Call `fetch_full_chapter` - If multiple relevant sections appear in the same chapter - OR if a section contains exceptions, references, or conditional language - OR if you know what section of the code is relevant and want to see a full chapter 3) THIRD β€” Follow Cross-References - If a section says "See Section X", "As required by Chapter Y", or "Except as permitted in..." - You MUST search and retrieve those sections as well 4) STOP ONLY WHEN - All exceptions are reviewed - All cross-references are resolved - No additional modifying sections remain ======================== OUTPUT FORMAT (STRICT) ======================== Return a structured legal report in the following format: ### Legal Summary Brief, plain-language explanation of what the code requires. ### Governing Code Sections - **[Code Type] Β§[Section Number] β€” [Title]** - Summary: - Key Requirements: - Applicability Conditions: - Exceptions: ### Cross-References Analyzed - **Β§[Section Number] β€” [Title]** - Why It Matters: - Impact on Main Rule: ### Edge Cases & Enforcement Notes - Special conditions (building type, occupancy class, height, system type, jurisdictional notes) - Common misinterpretations - DOB or FDNY enforcement implications (if relevant) ### Compliance Checklist - Bullet list of actionable compliance steps derived from the code ======================== QUALITY RULES ======================== - NEVER summarize without citing - NEVER assume jurisdiction, building type, or occupancy unless the code explicitly states it - If legal text is ambiguous, flag it as **Interpretive** - Prefer quoting short legal phrases when clarity matters ======================== TONE ======================== Professional. Precise. Legal-research quality. No speculation. """}, {"role": "user", "content": f"Analyze the NYC building code with this goal: {research_goal}"} ] for _ in range(20): response = client.chat.completions.create( model="gpt-5-mini", messages=messages, tools=internal_tools, tool_choice="auto" ) msg = response.choices[0].message messages.append(msg) if not msg.tool_calls: break for tool_call in msg.tool_calls: func_name = tool_call.function.name args = json.loads(tool_call.function.arguments) if func_name == "discover_code_locations": result = discover_code_locations(args['query']) elif func_name == "fetch_full_chapter": result = fetch_full_chapter(args['code_type'], args['chapter_id']) messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": result }) state.add_analysis( f"🟨 Legal Analyst\n{msg.content}" ) return msg.content def merge_tiles(tile_indexes: list[int], page_num: int): state.add_log(f'πŸ”¬ Stitching tiles **{tile_indexes}** from page **{page_num}**') images = [] positions = [] tiles = tile_bytes[page_num] tiles_coords_dict = tile_metadata[page_num] for index in tile_indexes: if index < 0 or index >= len(tiles): raise ValueError(f"Tile index {index} out of range") img_bytes = tiles[index] if img_bytes is None: raise ValueError(f"No image bytes found for tile {index}") img = Image.open(io.BytesIO(img_bytes)).convert('RGBA') images.append(img) x = tiles_coords_dict[index]['coords'][0] y = tiles_coords_dict[index]['coords'][1] positions.append((x, y)) if not images: return None min_x = min(x for x, y in positions) min_y = min(y for x, y in positions) normalized_positions = [(x - min_x, y - min_y) for x, y in positions] total_width = max(pos[0] + img.width for pos, img in zip(normalized_positions, images)) total_height = max(pos[1] + img.height for pos, img in zip(normalized_positions, images)) stitched_image = Image.new('RGB', (total_width, total_height), (255, 255, 255)) for img, pos in zip(images, normalized_positions): stitched_image.paste(img, pos) # Add to image gallery output_buffer = io.BytesIO() stitched_image.save(output_buffer, format='PNG') stitched_bytes = output_buffer.getvalue() return stitched_bytes def extract_json(s: str): s = s.strip() start = s.find("{") end = s.rfind("}") if start == -1 or end == -1 or end < start: raise ValueError("No JSON object found in model output:\n" + repr(s)) json_str = s[start:end+1] return json.loads(json_str) def sanitize_tile_indices(data): """ Forcefully converts various LLM outputs into a clean list of integers. Handles: [1, 2], ["1", "2"], "1, 2, 3", "[1, 2, 3]", and None. """ if not data: return [] # If it's already a list, ensure all elements are integers if isinstance(data, list): clean_list = [] for item in data: try: # This handles strings inside the list like ["1", "2"] clean_list.append(int(str(item).strip())) except (ValueError, TypeError): continue return clean_list # If it's a string, use Regex to find all sequences of digits if isinstance(data, str): # findall returns all non-overlapping matches of the pattern numbers = re.findall(r'\d+', data) return [int(n) for n in numbers] return [] def execute_page_expert(expert_instructions: str, page_num: int): state.add_log(f'πŸ‘οΈ Spawning Page Expert for page **{page_num}**') state.add_analysis(f"πŸ‘οΈ Page Expert searching for {expert_instructions}") state.add_log(f'πŸ“„ Attaching full-page context for page **{page_num}**') state.add_analysis( f"πŸ“„ Full-page context attached for page `{page_num}`" ) full_page_img = Image.open( io.BytesIO(image_bytes_list[page_num]) ) state.add_image(full_page_img) client = openai.OpenAI() tools = [ { "type": "function", "function": { "name": "merge_tiles", "description": "Stitches high-resolution image tiles together into a single zoomed-in view. Use this to read small text, dimensions, or symbols.", "parameters": { "type": "object", "properties": { "tile_indexes": { "type": "array", "items": {"type": "integer"}, "description": "A list of integer tile IDs from the Grid Map to stitch together." } }, "required": ["tile_indexes"] } } } ] page_text = text_list[page_num] relevant_tile_meta = tile_metadata[page_num] b64_full_page = base64.b64encode(image_bytes_list[page_num]).decode() system_prompt = """ You are a Lead AEC Visual Investigator supporting a Compliance Planner. Your mission is to extract **verifiable, high-fidelity evidence** from this drawing page. You must ground every claim in either: - a **Zoomed Tile Image** (via `merge_tiles`) or - a **Direct Text Quote** from the OCR page text. Guesses, assumptions, and general descriptions are not allowed. ======================== MANDATORY WORKFLOW ======================== 1) ORIENT - Review the full-page image and the Grid Map to identify candidate regions. - Decide which tiles likely contain the required evidence. Utilize the tile metadata to assist with this tasl. 2) ZOOM (REQUIRED) - You MUST call `merge_tiles(tile_indexes=[...])` before making ANY factual claim about symbols, dimensions, labels, or locations. - Always request ALL tiles needed in a SINGLE call. - If the first zoom is insufficient, call again with additional tiles. - Call the zoom until you have found all relevant tiles, refer to the tile metadata to assist in your search. 3) VERIFY - Read the zoomed image carefully. - Extract exact values, tags, room names, and directional cues. 4) REPORT - Return the Findings Packet in strict JSON format. ======================== WHAT COUNTS AS PROOF ======================== - Dimension values (e.g., β€œ36\"”, β€œ1 HR RATED”) - Explicit labels (e.g., β€œEXIT”, β€œSTAIR A”, β€œR-2”, β€œCOLUMN C3”) - Symbol legends that define a mark - Path continuity that can be visually traced across tiles - OCR text snippets ======================== FINDINGS RULES ======================== - Every bullet in `findings` MUST cite either: - `[Tile ]` or - `"Quoted text"` - If a claim cannot be verified from the zoomed tiles or text, mark it as **Unverified**. - Be comprehensive in this report, your supervisor only has access to the report you give in findings, not the full page text or other image data you have. - Do NOT repeat planner instructions β€” only report what you observe. ======================== VISUAL POINTERS RULES ======================== - Exclude orientation-only or whitespace tiles. - Include ALL tiles needed to re-trace a path or confirm a relationship. - **Your superviser will ONLY see the tiles that you reference here, be comprehensive when returning these tiles.** ======================== FULL PAGE USEFULNESS ======================== Set `true` ONLY if the finding requires spatial context across the entire page, or if your zoom is missing information. (e.g., tracing egress path, riser continuity, system routing). Otherwise set `false`. ======================== JSON FORMAT (STRICT) ======================== { "findings": "", "visual_pointers": [list of ], "textual_evidence": [""], "full_page_usefulness": , "limitations": "" } ======================== FAILURE CONDITIONS ======================== - If no relevant evidence exists on this page, return: { "findings": "No relevant technical evidence found for the planner's instruction.", "visual_pointers": [], "textual_evidence": [], "full_page_usefulness": false, "limitations": "This page does not contain the requested information or it is not legible at available resolution." } Return ONLY valid JSON. """ messages = [ {"role": "system", "content": system_prompt}, { "role": "user", "content": [ {"type": "text", "text": f"Planner Instruction:\n{expert_instructions}"}, {"type": "text", "text": f"Page Context:\n{page_text}"}, {"type": "text", "text": f"Available Grid Map:\n{json.dumps(relevant_tile_meta)}"}, { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{b64_full_page}" } } ] } ] MAX_TURNS = 3 for turn in range(MAX_TURNS): response = client.chat.completions.create( model="gpt-4o", messages=messages, tools=tools, tool_choice="auto" ) msg = response.choices[0].message messages.append(msg) if msg.content: try: res = extract_json(msg.content) state.add_analysis( f"🟨 Page Analyst\n{res.get('findings','')}" ) raw_pointers = res.get("visual_pointers", []) tile_idxs = sanitize_tile_indices(raw_pointers) if tile_idxs and tile_idxs != '[]': stitched_bytes = merge_tiles( tile_indexes=tile_idxs, page_num=page_num ) state.add_log(f'πŸ“Έ Staging {len(tile_idxs)} tiles for final audit...') # Store these to use AFTER the chat finishes state.add_staged_image_part( types.Part.from_bytes( data=stitched_bytes, # <-- 'data=' is required here mime_type="image/png" ) ) stitched_img = Image.open( io.BytesIO(stitched_bytes) ) state.add_image(stitched_img) state.add_staged_image_part( types.Part.from_bytes( data=image_bytes_list[page_num], # <-- 'data=' is required here mime_type="image/png" ) ) return res except: pass if msg.tool_calls: tool_results = [] image_blocks = [] for call in msg.tool_calls: if call.function.name == "merge_tiles": args = json.loads(call.function.arguments) idxs = args["tile_indexes"] stitched_bytes = merge_tiles( tile_indexes=idxs, page_num=page_num ) b64_tile = base64.b64encode(stitched_bytes).decode() tool_results.append({ "role": "tool", "tool_call_id": call.id, "content": json.dumps({ "status": "success", "tiles": idxs }) }) image_blocks.append( { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{b64_tile}" } } ) for tool_msg in tool_results: messages.append(tool_msg) messages.append({ "role": "user", "content": [ { "type": "text", "text": "Here are the high-resolution zooms you requested. Analyze exits, locations, and any capacity labels." }, *image_blocks ] }) continue messages.append({ "role": "user", "content": "Return the FINAL JSON now." }) raise RuntimeError("No FINAL JSON output from Page Expert") # Set up Gemini planner tools_list = [search_page_text, nyc_legal_sub_agent, execute_page_expert] import time planner = genai.Client() planner_model = "gemini-3-pro-preview" planner_prompt = f""" You are the Lead Architectural Compliance Planner for NYC Building Code and Zoning review. Your role is to coordinate specialist sub-agents and deliver a **proof-carrying compliance verdict** based ONLY on: - OCR-extracted drawing text - High-resolution visual evidence (tile zooms) - Official NYC Code citations You must NOT speculate or rely on architectural norms. ======================== DRAWING INDEX (Page Metadata) ======================== Use this index to select pages for visual inspection. Avoid irrelevant sheets (e.g., Site, Civil, Utility, Stormwater) unless zoning or site compliance is explicitly required. {json.dumps(page_metadata)} ======================== SPECIALIST SUB-AGENTS ======================== None of these agents have access to your chat history or internal thought process. They know only how to access information (text, images or code) and what information you give them in the research goal. If they need more context or specific instructions YOU MUST PROVIDE IT WHEN CALLING THEM in the research goal. 1) `search_page_text` Purpose: FAST signal extractor. Use to identify code-triggering facts: - Occupancy classification - Building height / stories / high-rise - Construction type - Scope of work (new, alteration, addition, change of occupancy) - Fire protection systems Output is used ONLY to constrain legal research. 2) `nyc_legal_sub_agent` Purpose: Definitive legal authority. Use to retrieve governing NYC Code sections, exceptions, and cross-references. Always pass a focused topic derived from Phase 1 signals. **YOU MAY ONLY CALL THIS TOOL ONCE** 3) `execute_page_expert` Purpose: High-resolution visual verification. Use to confirm compliance or non-compliance by zooming tiles. This agent provides the ONLY acceptable visual proof. **NEVER CALL THIS TOOL MORE THAN ONCE ON A SINGLE PAGE** ======================== MANDATORY PHASED WORKFLOW ======================== PHASE 1 β€” SIGNAL EXTRACTION - Use `search_page_text` on candidate pages to determine: occupancy, height, construction type, system presence, and scope. - If signals are missing or ambiguous, expand to additional pages. - Do NOT proceed until you have enough facts to define legal applicability. PHASE 2 β€” LEGAL SCOPING - Convert Phase 1 signals into a focused legal topic. - Call `nyc_legal_sub_agent`. - Extract governing sections, exceptions, and edge cases. PHASE 3 β€” VISUAL VERIFICATION - Identify the SINGLE most relevant page for proof. - Call `execute_page_expert` with precise instructions tied to legal requirements (e.g., β€œVerify exit door clear width at Stair A serving R-2 occupancy”). - Ensure returned findings include tile IDs and/or text quotes. PHASE 4 β€” SYNTHESIS & VERDICT - Compare visual findings directly against legal requirements. - Resolve conflicts: - If legal text and visual evidence disagree β†’ flag as **Non-Compliant or Ambiguous** - If evidence is missing β†’ flag as **Unverified** - Cite both: - NYC Code Section(s) - Tile ID(s) or OCR quotes **NEVER CALL THE SAME AGENT FOR THE SAME TASK TWICE REFER TO PREVIOUS ANSWERS WHEN ABLE** **NEVER CALL THE PAGE EXPERT TWICE ON THE SAME PAGE** ======================== FINAL OUTPUT FORMAT (STRICT MARKDOWN) ======================== ### Compliance Verdict **Status:** Compliant | Non-Compliant | Unverified | Ambiguous ### Legal Basis - **[Code Type] Β§[Section] β€” [Title]** - Requirement: - Exceptions Considered: ### Visual Evidence - Finding: - Proof: [Tile ID(s)] or "Quoted OCR Text" ### Reasoning - Step-by-step comparison between legal requirement and observed condition ### Limitations - What could not be verified and why ======================== CONTROL RULES ======================== - NEVER call `nyc_legal_sub_agent` before `search_page_text` - NEVER issue a final verdict without calling `execute_page_expert` - If no page contains sufficient proof, return **Unverified** - Prefer false negatives over false positives *** CRITICAL VISUAL PROTOCOL *** - When `execute_page_expert` returns, it will explicitly state "VISUAL_PROOF_PENDING". - When you see this, your ONLY response must be: "Awaiting visual proof." - DO NOT attempt to guess the verdict. - DO NOT complain about missing images. - Simply wait. The user will immediately send the images in the next turn. ======================== QUALITY STANDARD ======================== This output should be defensible to a DOB plan examiner or legal reviewer. Every claim must be traceable to law and evidence. """ config = types.GenerateContentConfig( system_instruction=planner_prompt, tools=tools_list ) chat = planner.chats.create(model=planner_model, config=config) def agent_worker(user_question): state.clear() state.add_log(f'πŸš€ Starting analysis for: **{user_question}**') state.add_analysis("🧠 Planner initialized. Awaiting tool calls...") # 1. Initialize the Stateful Chat chat = planner.chats.create(model=planner_model, config=config) response = chat.send_message(user_question) # 2. Track images throughout the conversation # 3. Standard Tool Loop (Phases 1-3) while response.candidates[0].content.parts[0].function_call: tool_responses = [] for part in response.candidates[0].content.parts: if part.function_call: name = part.function_call.name args = part.function_call.args state.add_log(f'πŸ› οΈ Tool Call: **{name}**') func = globals()[name] result = func(**args) tool_responses.append( types.Part.from_function_response(name=name, response={"result": result}) ) # Send tool results back to the stateful chat response = chat.send_message(tool_responses) # ----------------------------------------------------------------- # PHASE 4: THE POST-CHAT HANDOFF (The "Visual Audit") # ----------------------------------------------------------------- # At this point, the while loop has ended. # 'response.text' contains the model's preliminary answer. audit_images = state.get_staged_images() if audit_images: state.add_log(f"πŸ‘οΈ Preliminary answer received. Performing audit with {len(audit_images)} images...") # 1. Construct the audit parts # Ensure 'text=' is used for the Part constructor audit_parts = [ types.Part.from_text( text="You have provided a preliminary verdict. Now, look at these images " "to verify your findings. If the visual evidence contradicts your " "text-based search, update your verdict now. " ), *audit_images ] try: # 2. Send directly through the 'chat' session # This automatically appends to history and maintains the session state final_response = chat.send_message(audit_parts) state.final_answer = final_response.text except Exception as e: # If the above fails, try the explicit message keyword state.add_log("πŸ”„ Retrying audit with explicit message keyword...") final_response = chat.send_message(message=audit_parts) state.final_answer = final_response.text else: state.add_log("⚠️ No images found in state. Skipping visual audit.") state.final_answer = response.text state.add_log('🏁 **ANALYSIS COMPLETE**') state.done = True def run_agentic_workflow(user_question, profile: gr.OAuthProfile | None): uid = user_id_from_profile(profile) if uid is None: raise gr.Error("Please sign in with Hugging Face to use this demo.") allowed, remaining = check_and_increment_quota(uid) if not allowed: raise gr.Error(f"Usage limit reached: {MAX_RUNS_PER_USER} runs per user.") if remaining <= 2: gr.Warning(f"⚠️ Only {remaining} run(s) left!") else: gr.Info(f"βœ“ Runs remaining: {remaining}") state.done = False state.final_answer = "" thread = threading.Thread( target=agent_worker, args=(user_question,), daemon=True ) thread.start() while not state.done: with state.lock: logs = "\n\n".join(state.log_messages) analysis = "\n\n".join(state.analysis_messages) chapter = state.current_chapter images = list(state.current_images) yield ( logs, analysis, chapter, images, "*Analysis in progress...*" ) time.sleep(0.25) with state.lock: logs = "\n\n".join(state.log_messages) analysis = "\n\n".join(state.analysis_messages) chapter = state.current_chapter images = list(state.current_images) final = state.final_answer yield ( logs, analysis, chapter, images, final ) # Build Gradio Interface with gr.Blocks(title="AEC Compliance Agent") as demo: gr.LoginButton() gr.Markdown("# πŸ—οΈ AEC Compliance Analysis Agent") gr.Markdown("Ask questions about NYC Building Code compliance for your construction drawings.") with gr.Row(): with gr.Column(scale=1): question_input = gr.Textbox( label="Your Question", placeholder="e.g., Does this building comply with egress requirements for 738 occupants?", lines=3 ) submit_btn = gr.Button("πŸ” Analyze", variant="primary", size="lg") gr.Markdown("### πŸ“‹ Analysis Log") log_output = gr.Markdown(value="", height=400) with gr.Column(scale=1): gr.Markdown("### 🧠 Sub-Agent Analysis") analysis_output = gr.Markdown(value="", height=600) with gr.Column(scale=1): gr.Markdown("### πŸ“– Code Chapter") chapter_output = gr.Markdown(value="*No chapter loaded yet*", height=600) with gr.Row(): gr.Markdown("### πŸ–ΌοΈ Retrieved Images") with gr.Row(): image_gallery = gr.Gallery( label="Visual Evidence", show_label=True, columns=2, height=400, object_fit="contain" ) with gr.Row(): gr.Markdown("### βœ… Final Compliance Verdict") with gr.Row(): final_output = gr.Markdown(value="*Analysis pending...*") submit_btn.click( fn=run_agentic_workflow, inputs=[question_input], outputs=[ log_output, analysis_output, # NEW SLOT chapter_output, image_gallery, final_output ] ) if __name__ == "__main__": demo.queue().launch( inbrowser=True )