| | import gradio as gr |
| | import json, time |
| | import os |
| | import re |
| | import pandas as pd |
| | from google import genai |
| | from google.genai import types |
| | import chromadb |
| | from chromadb.utils import embedding_functions |
| | from collections import Counter |
| | import base64 |
| | import io |
| | from PIL import Image |
| | import matplotlib.pyplot as plt |
| | import openai |
| | from datetime import datetime |
| | import threading |
| | from huggingface_hub import hf_hub_download, HfApi |
| | from huggingface_hub.utils import EntryNotFoundError |
| |
|
# Usage-tracking configuration (all overridable via environment variables).
USAGE_DATASET_REPO = os.environ.get("USAGE_DATASET_REPO", "NYSERDA-CRE-Working-Group/nyserda_demo_useage_store")
USAGE_FILENAME = os.environ.get("USAGE_FILENAME", "usage.csv")
MAX_RUNS_PER_USER = int(os.environ.get("MAX_RUNS_PER_USER", "10"))

# Bug fix: `os.environ["X"] = os.getenv("X")` raises TypeError at import time
# when the variable is unset (environ values must be str, not None).
# Only propagate keys that are actually present.
for _key in ("OPENAI_API_KEY", "GEMINI_API_KEY"):
    _val = os.getenv(_key)
    if _val is not None:
        os.environ[_key] = _val

HF_TOKEN = os.environ.get("HF_TOKEN")

# Shared Hub client used for all usage-dataset uploads.
api = HfApi(token=HF_TOKEN)
def user_id_from_profile(profile: gr.OAuthProfile | None) -> str | None:
    """Derive a normalized (lowercased, trimmed) user id from an OAuth profile.

    Returns None when no profile is present or it carries no usable name.
    """
    if profile is None:
        return None
    uid = getattr(profile, "name", None)
    return uid.strip().lower() if uid else None
| |
|
def _load_usage_df() -> pd.DataFrame:
    """Fetch the usage CSV from the HF dataset repo.

    Returns an empty frame with the expected schema when the file has not
    been created in the repo yet.
    """
    try:
        cached_path = hf_hub_download(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            filename=USAGE_FILENAME,
            token=HF_TOKEN,
        )
        return pd.read_csv(cached_path)
    except EntryNotFoundError:
        # First run: repo exists but usage.csv does not.
        return pd.DataFrame(columns=["user_id", "runs", "first_seen", "last_seen"])
| |
|
def _save_usage_df(df: pd.DataFrame, commit_message: str) -> None:
    """Serialize *df* to CSV and upload it to the usage dataset repo.

    Bug fix: the original wrote to a hard-coded "/tmp/usage.csv", which is
    non-portable (no /tmp on Windows) and lets concurrent workers clobber
    each other's staging file. A unique temp file avoids both problems.
    """
    import tempfile  # local import: only needed here, keeps file-level imports untouched

    fd, tmp_path = tempfile.mkstemp(suffix=".csv")
    os.close(fd)  # pandas reopens the path itself
    try:
        df.to_csv(tmp_path, index=False)
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo=USAGE_FILENAME,
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            commit_message=commit_message,
        )
    finally:
        os.remove(tmp_path)  # always clean up the staging file
| |
|
def check_and_increment_quota(user_id: str) -> tuple[bool, int]:
    """Check the per-user run quota and, if allowed, record one more run.

    Returns:
        (allowed, remaining): ``allowed`` is False once the user has reached
        MAX_RUNS_PER_USER; ``remaining`` is the number of runs still left
        after this one.

    NOTE(review): load-modify-save against the remote CSV is not atomic —
    two concurrent runs can race; acceptable for a low-traffic demo.
    """
    now = int(time.time())
    df = _load_usage_df()

    matching = df.index[df["user_id"] == user_id]

    if len(matching) == 0:
        # Unknown user: nothing recorded yet (guard still handles a
        # zero/negative quota configuration).
        if 0 >= MAX_RUNS_PER_USER:
            return False, 0
        fresh_row = pd.DataFrame([{
            "user_id": user_id,
            "runs": 1,
            "first_seen": now,
            "last_seen": now,
        }])
        df = pd.concat([df, fresh_row], ignore_index=True)
        _save_usage_df(df, commit_message=f"usage: increment {user_id} to 1")
        return True, MAX_RUNS_PER_USER - 1

    row = matching[0]
    runs = int(df.loc[row, "runs"])

    if runs >= MAX_RUNS_PER_USER:
        return False, 0

    runs += 1
    df.loc[row, "runs"] = runs
    df.loc[row, "last_seen"] = now

    _save_usage_df(df, commit_message=f"usage: increment {user_id} to {runs}")
    return True, MAX_RUNS_PER_USER - runs
| | |
| | |
class InterfaceState:
    """Thread-safe shared state between the agent pipeline and the Gradio UI.

    The worker thread appends logs/analyses/images while the UI polls them,
    so every mutating accessor takes ``self.lock``.
    """

    def __init__(self):
        self.log_messages = []        # markdown log lines shown in the UI
        self.analysis_messages = []   # detailed analyst transcripts
        self.current_chapter = ""     # last fetched legal chapter text
        self.current_images = []      # PIL images shown in the gallery
        self.staged_audit_images = [] # genai Parts queued for the final Gemini audit
        self.final_answer = ""
        self.done = False
        self.lock = threading.Lock()

    def add_log(self, message):
        """Append a timestamped log line and return the full joined log."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        with self.lock:
            self.log_messages.append(f"**[{timestamp}]** {message}")
            return "\n\n".join(self.log_messages)

    def add_analysis(self, message):
        """Append a timestamped analysis entry and return the joined feed."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        with self.lock:
            self.analysis_messages.append(f"**[{timestamp}]** {message}")
            return "\n\n".join(self.analysis_messages)

    def set_chapter(self, chapter_text):
        """Store the current chapter text; returns it unchanged."""
        with self.lock:
            self.current_chapter = chapter_text
            return chapter_text

    def add_image(self, img_pil):
        """Append a gallery image; returns a snapshot copy of the list."""
        with self.lock:
            self.current_images.append(img_pil)
            return self.current_images.copy()

    def add_staged_image_part(self, image_part):
        """Thread-safe method to stage images for the Gemini Audit."""
        with self.lock:
            self.staged_audit_images.append(image_part)

        print(f"DEBUG: Staged image part. Total staged: {len(self.staged_audit_images)}")

    def get_staged_images(self):
        """Safely retrieve the staged images for the audit turn."""
        with self.lock:
            return list(self.staged_audit_images)

    def clear(self):
        """Reset all per-run state so the next run starts clean."""
        with self.lock:
            self.log_messages.clear()
            self.analysis_messages.clear()
            self.current_chapter = ""
            self.current_images.clear()
            # Bug fix: staged audit images were never cleared, so tiles from
            # a previous run leaked into the next run's final audit.
            self.staged_audit_images.clear()
            self.final_answer = ""
            self.done = False
| | |
| | |
| |
|
# Single shared state object used by the agent pipeline and the UI.
state = InterfaceState()


# Page-level metadata keyed by integer page index (JSON keys arrive as strings).
# Explicit encoding avoids platform-dependent default-codec surprises.
with open('Preprocessed Files/page_metadata.json', 'r', encoding='utf-8') as json_file:
    page_metadata = json.load(json_file)
page_metadata = {int(k): v for k, v in page_metadata.items()}

# Raw OCR text per page, indexed by page number.
with open('Preprocessed Files/text_list.json', 'r', encoding='utf-8') as json_file:
    text_list = json.load(json_file)

# Tile grid metadata: {page: {tile_index: {...}}}; both key levels re-cast to int.
with open('Preprocessed Files/tile_metadata.json', 'r', encoding='utf-8') as json_file:
    tile_metadata = json.load(json_file)
tile_metadata = {
    int(outer_k): {
        int(inner_k): inner_v
        for inner_k, inner_v in outer_v.items()
    }
    for outer_k, outer_v in tile_metadata.items()
}
| |
|
def load_fullpage_images(folder="Images"):
    """Load every full-page PNG render from *folder*, ordered by page number.

    Files must be named ``page_<n>_fullpage.png``; anything else is ignored.
    Returns a list of raw PNG byte strings.
    """
    pattern = re.compile(r"page_(\d+)_fullpage\.png")
    numbered = []
    for name in os.listdir(folder):
        m = pattern.search(name)
        if m:
            numbered.append((int(m.group(1)), name))

    result = []
    for _, name in sorted(numbered, key=lambda item: item[0]):
        with open(os.path.join(folder, name), "rb") as fh:
            result.append(fh.read())
    return result
| |
|
def load_tile_images(page, folder='Tiles'):
    """Load all tile PNGs for *page*, ordered by tile index.

    Args:
        page: Page number whose tiles to load.
        folder: Directory containing ``page_<page>_tile_<n>.png`` files.
            Defaults to 'Tiles', matching the previously hard-coded path.

    Returns:
        list[bytes]: Raw PNG bytes sorted by tile index; empty if none match.
    """
    # Raw string fixes the invalid "\d" / "\." escape sequences the original
    # non-raw f-string produced (a DeprecationWarning on modern Python), and
    # compiling once avoids re-parsing the pattern per directory entry.
    pattern = re.compile(rf"page_{page}_tile_(\d+)\.png")
    tile_files = []
    for name in os.listdir(folder):
        match = pattern.search(name)
        if match:
            # group(1) is the TILE index (the old code misleadingly called
            # this "page_num").
            tile_files.append((int(match.group(1)), name))
    tile_files.sort(key=lambda item: item[0])

    image_bytes_list = []
    for _, name in tile_files:
        with open(os.path.join(folder, name), "rb") as fh:
            image_bytes_list.append(fh.read())
    return image_bytes_list
| |
|
# Full-page renders for every sheet, loaded once at startup.
image_bytes_list = load_fullpage_images()


# Tile bytes per page; pages with no tiles are omitted from the dict.
tile_bytes = {}
for page in range(44):
    tile_list = load_tile_images(page)
    if tile_list:
        # Bug fix: the original called load_tile_images(page) a second time
        # here, re-reading every tile file from disk for no reason.
        tile_bytes[page] = tile_list


# Vector store over the NYC code corpus (persisted locally).
chroma_client = chromadb.PersistentClient(path="nyc_code_db")
embedding_model = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
collection = chroma_client.get_collection(name="nyc_building_codes", embedding_function=embedding_model)


all_pending_images = []
| |
|
| | |
def search_page_text(page_number: int, research_goal: str):
    """Run the fast "signal extractor" LLM over one page's OCR text.

    Sends the raw OCR text for ``page_number`` to gpt-5-mini together with
    the planner's ``research_goal``, and logs both the request and the
    model's answer to the shared UI state.

    Args:
        page_number: Index into the module-level ``text_list`` / ``page_metadata``.
        research_goal: Planner-supplied goal used to filter which signals matter.

    Returns:
        dict with keys ``page`` (echo of the input) and ``summary``
        (the model's Markdown analysis).
    """
    state.add_log(f'🔍 Searching page **{page_metadata[page_number]["sheet_title"]}** for details')

    state.add_analysis(
        f'🔍 Searching page {page_metadata[page_number]["sheet_title"]} with prompt\n{research_goal}'
    )

    raw_text = text_list[page_number]

    # Fresh client per call; relies on OPENAI_API_KEY being set in the environment.
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-5-mini",
        messages=[
            {"role": "system", "content": """
You are a Fast NYC Plans Examiner Signal Agent.

Your ONLY job is to extract **code-relevant signals** from the OCR text of a SINGLE drawing page.
You do NOT interpret the law and you do NOT summarize design intent.

Your output will be used to CONSTRAIN a downstream legal research agent.

========================
WHAT TO EXTRACT
========================
Look only for information that determines which parts of the NYC Code apply such as:

- Occupancy classification (e.g., R-2, A-3, M, S, F, mixed-use)
- Building height (stories, feet, high-rise indicators)
- Construction type (I, II, III, IV, V)
- Fire protection systems (sprinklers, standpipes, fire alarm, smoke control)
- Means of egress references (stairs, exits, exit access, doors, corridors)
- Structural system hints (steel, concrete, load-bearing walls, columns, transfer girders)
- Mechanical / fuel / plumbing system mentions (boilers, gas piping, HVAC type, shafts)
- Zoning or special district references (if present)
- Scope flags (new building, alteration, addition, change of occupancy, retrofit)

However only return relevant signals to the provided research goal.

========================
OUTPUT FORMAT (STRICT MARKDOWN)
========================
Return ONLY the following sections:

### Code-Relevant Signals
- Bullet list of extracted facts

### Likely Governing Code Domains
- One-line list chosen from: Administrative, Building, Mechanical, FuelGas, Plumbing, Fire

### Text Evidence
- Short quoted snippets from the page that support each signal

========================
RULES
========================
- Do NOT speculate
- If a signal is not present, omit it
- Prefer exact phrases over paraphrase
- Keep total length under 500 words
- No legal conclusions, no compliance advice
"""},
            # NOTE(review): "breif" typo is part of the live prompt string;
            # left byte-identical here since prompt text is runtime behavior.
            {"role": "user", "content": f"PAGE TEXT:\n{raw_text}\n\nRESEARCH GOAL: {research_goal}\n\nReturn a breif but comprehensive Markdown summary of your findings and justification with text snippets."}
        ]
    )

    analysis_text = response.choices[0].message.content

    state.add_analysis(
        f"🟦 Text Analyst (Page {page_number})\n{analysis_text}"
    )

    return {
        "page": page_number,
        "summary": analysis_text
    }
| |
|
def discover_code_locations(query: str):
    """Semantic search over the NYC code vector DB.

    Returns a markdown "discovery report": the chapters with the most hits
    plus the top matching sections with snippets, or a no-results message.
    """
    state.add_log(f'📚 Searching NYC Code for: **{query}**')

    results = collection.query(
        query_texts=[query],
        n_results=25,
        include=["metadatas", "documents"]
    )

    metas = results['metadatas'][0]
    if not metas:
        return "No results found. Try a different technical keyword."

    docs = results['documents'][0]

    # Tally which (code volume, chapter) pairs dominate the hits.
    pair_counts = Counter(
        f"{m['code_type']} | Ch. {m['parent_major']}" for m in metas
    )
    chapter_summary = "\n".join(
        f"- {pair} ({count} hits)" for pair, count in pair_counts.most_common(5)
    )

    section_reports = [
        f"ID: {m['section_full']} | Code: {m['code_type']} | Chapter: {m['parent_major']}\n"
        f"Snippet: {doc}"
        for m, doc in zip(metas, docs)
    ]

    return (
        "### CODE DISCOVERY REPORT ###\n"
        f"MOST RELEVANT CHAPTERS:\n{chapter_summary}\n\n"
        "TOP RELEVANT SECTIONS:\n" +
        "\n---\n".join(section_reports)
    )
| |
|
def fetch_full_chapter(code_type: str, chapter_id: str):
    """Retrieve and assemble the full legal text of one code chapter.

    Sections are sorted by their full section id, "[CONT.]:" continuation
    fragments are de-duplicated, and the assembled markdown is cached on
    the shared state before being returned. Errors come back as a string
    rather than raising, since the caller feeds this to an LLM.
    """
    state.add_log(f'📖 Fetching Chapter **{chapter_id}** from **{code_type}** code')

    try:
        chapter_data = collection.get(
            where={
                "$and": [
                    {"code_type": {"$eq": code_type}},
                    {"parent_major": {"$eq": chapter_id}},
                ]
            },
            include=["documents", "metadatas"]
        )

        if not chapter_data['documents']:
            return f"No documentation found for {code_type} Chapter {chapter_id}."

        ordered_sections = sorted(
            zip(chapter_data['metadatas'], chapter_data['documents']),
            key=lambda pair: pair[0]['section_full'],
        )

        parts = [f"## FULL LEGAL TEXT: {code_type.upper()} CODE - CHAPTER {chapter_id}\n\n"]
        for meta, doc in ordered_sections:
            # Drop duplicated continuation fragments while preserving order.
            seen = []
            for segment in doc.split("[CONT.]:"):
                segment = segment.strip()
                if segment and segment not in seen:
                    seen.append(segment)
            merged = " ".join(seen)
            parts.append(f"### SECTION {meta['section_full']}\n{merged}\n\n---\n\n")

        full_text = "".join(parts)

        # Cache for the UI's chapter viewer.
        state.set_chapter(full_text)

        return full_text

    except Exception as e:
        return f"Error retrieving chapter content: {str(e)}"
| |
|
def nyc_legal_sub_agent(research_goal: str):
    """Run the legal-research sub-agent over the NYC code corpus.

    Drives a gpt-5-mini tool-calling loop (max 20 turns) with two tools:
    semantic discovery (`discover_code_locations`) and full-chapter retrieval
    (`fetch_full_chapter`). Returns the model's final citation-backed report
    (the last message content once it stops calling tools).

    Bug fix: if the model hallucinated an unknown tool name, `result` was
    previously unbound and the loop crashed with a NameError; we now return
    an explicit error string to the model instead.
    """
    state.add_log(f'⚖️ Investigating NYC Code for: **{research_goal}**')

    state.add_analysis(
        f"⚖️ Legal Analyst is searching\n{research_goal}"
    )

    client = openai.OpenAI()

    # Tool schemas exposed to the model (names must match the dispatch below).
    internal_tools = [
        {
            "type": "function",
            "function": {
                "name": "discover_code_locations",
                "description": "Scans NYC code in a semantic vector database. Use this FIRST to find which chapters/sections are relevant.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "semantic search string for a vector database (Not a keyword search use a full sentence)"}
                    },
                    "required": ["query"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "fetch_full_chapter",
                "description": "Retrieves the full legal text of a specific chapter for deep analysis.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code_type": {
                            "type": "string",
                            "enum": ["Administrative", "Building", "FuelGas", "Mechanical", "Plumbing"],
                            "description": "The specific NYC code volume to search."
                        },
                        "chapter_id": {"type": "string", "description": "The chapter number string"}
                    },
                    "required": ["code_type", "chapter_id"]
                }
            }
        }
    ]

    messages = [
        {"role": "system", "content": """
You are a Senior NYC Building Code Consultant and Legal Research Agent.

Your task is to produce a **definitive, citation-backed legal report** that can be used directly by a downstream orchestration agent.
Accuracy, traceability, and completeness matter more than brevity.

========================
PRIMARY OBJECTIVE
========================
Given a research goal, identify and analyze relevant NYC Code provisions, including:
- Governing sections
- Exceptions
- Cross-references
- Related chapters that modify, limit, or expand the rule

Every legal claim MUST be supported by a specific code citation.

You are operating in FAST LEGAL MODE.

SEARCH BUDGET:
- Maximum of 2 calls to `discover_code_locations`
- Maximum of 2 calls to `fetch_full_chapter`

STOP CONDITIONS:
- If the first chapter fetch contains governing text AND exceptions, STOP and synthesize.
- Only fetch a second chapter if the first chapter explicitly cross-references another chapter.

PRIORITY ORDER:
1) Governing rule section
2) Exceptions
3) Cross-references that MODIFY the rule
Ignore definitions and administrative content unless directly referenced.

GOOD ENOUGH STANDARD:
If you can identify:
- The governing section
- At least one exception or limitation
You must STOP and report.

========================
TOOL STRATEGY (MANDATORY)
========================
This is a semantic vector database, NOT a keyword index. Always search in full English questions.

1) FIRST — Call `discover_code_locations`
   - Use a natural-language query describing the legal requirement you are trying to find
   - Example: "What NYC Building Code sections regulate emergency egress width in residential buildings"
NEVER use a keyword search thi will not work you are searching a vector database.
If you know what chaoter you need call the fetch_full_chapter tool instead.
If you perform TWO consecutive `discover_code_locations` calls
and both return no new relevant chapters or sections:

You MUST stop searching and do one of the following:
- Conclude that the table/section does NOT exist as a standalone provision in the NYC Code corpus, OR
- Conclude that the requirement is embedded within the previously retrieved sections

Then proceed to report findings using the closest governing section.

DO NOT continue reformulating the same query.
You MUST NOT call `discover_code_locations` more than once for the same legal concept.
If a new query is semantically similar to a prior query, STOP and move forward with analysis.

2) SECOND — Call `fetch_full_chapter`
   - If multiple relevant sections appear in the same chapter
   - OR if a section contains exceptions, references, or conditional language
   - OR if you know what section of the code is relevant and want to see a full chapter

3) THIRD — Follow Cross-References
   - If a section says "See Section X", "As required by Chapter Y", or "Except as permitted in..."
   - You MUST search and retrieve those sections as well

4) STOP ONLY WHEN
   - All exceptions are reviewed
   - All cross-references are resolved
   - No additional modifying sections remain

========================
OUTPUT FORMAT (STRICT)
========================
Return a structured legal report in the following format:

### Legal Summary
Brief, plain-language explanation of what the code requires.

### Governing Code Sections
- **[Code Type] §[Section Number] — [Title]**
  - Summary:
  - Key Requirements:
  - Applicability Conditions:
  - Exceptions:

### Cross-References Analyzed
- **§[Section Number] — [Title]**
  - Why It Matters:
  - Impact on Main Rule:

### Edge Cases & Enforcement Notes
- Special conditions (building type, occupancy class, height, system type, jurisdictional notes)
- Common misinterpretations
- DOB or FDNY enforcement implications (if relevant)

### Compliance Checklist
- Bullet list of actionable compliance steps derived from the code

========================
QUALITY RULES
========================
- NEVER summarize without citing
- NEVER assume jurisdiction, building type, or occupancy unless the code explicitly states it
- If legal text is ambiguous, flag it as **Interpretive**
- Prefer quoting short legal phrases when clarity matters

========================
TONE
========================
Professional. Precise. Legal-research quality. No speculation.
"""},
        {"role": "user", "content": f"Analyze the NYC building code with this goal: {research_goal}"}
    ]

    # Tool-calling loop; 20 turns is a hard safety cap well above the
    # prompt's own search budget.
    for _ in range(20):
        response = client.chat.completions.create(
            model="gpt-5-mini",
            messages=messages,
            tools=internal_tools,
            tool_choice="auto"
        )

        msg = response.choices[0].message
        messages.append(msg)

        if not msg.tool_calls:
            # Model produced its final report — stop looping.
            break

        for tool_call in msg.tool_calls:
            func_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)

            if func_name == "discover_code_locations":
                result = discover_code_locations(args['query'])
            elif func_name == "fetch_full_chapter":
                result = fetch_full_chapter(args['code_type'], args['chapter_id'])
            else:
                # Bug fix: `result` was unbound here before, crashing with a
                # NameError on a hallucinated tool name.
                result = f"Error: unknown tool '{func_name}'"

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result
            })

    state.add_analysis(
        f"🟨 Legal Analyst\n{msg.content}"
    )

    return msg.content
| |
|
def merge_tiles(tile_indexes: list[int], page_num: int):
    """Stitch the requested tiles of a page into a single PNG.

    Tile positions come from the module-level ``tile_metadata``; the output
    canvas is shifted so the top-left-most tile sits at the origin.

    Returns:
        PNG bytes of the stitched image, or None if no tiles were given.

    Raises:
        ValueError: on an out-of-range tile index or missing tile bytes.
    """
    state.add_log(f'🔬 Stitching tiles **{tile_indexes}** from page **{page_num}**')

    tiles = tile_bytes[page_num]
    coords_by_tile = tile_metadata[page_num]

    loaded = []  # (PIL image, (x, y)) per requested tile
    for index in tile_indexes:
        if index < 0 or index >= len(tiles):
            raise ValueError(f"Tile index {index} out of range")

        raw = tiles[index]
        if raw is None:
            raise ValueError(f"No image bytes found for tile {index}")

        tile_img = Image.open(io.BytesIO(raw)).convert('RGBA')
        coords = coords_by_tile[index]['coords']
        loaded.append((tile_img, (coords[0], coords[1])))

    if not loaded:
        return None

    # Normalize so the stitched canvas starts at (0, 0).
    min_x = min(pos[0] for _, pos in loaded)
    min_y = min(pos[1] for _, pos in loaded)
    placed = [(img, (x - min_x, y - min_y)) for img, (x, y) in loaded]

    canvas_w = max(x + img.width for img, (x, y) in placed)
    canvas_h = max(y + img.height for img, (x, y) in placed)

    canvas = Image.new('RGB', (canvas_w, canvas_h), (255, 255, 255))
    for img, pos in placed:
        canvas.paste(img, pos)

    buffer = io.BytesIO()
    canvas.save(buffer, format='PNG')
    return buffer.getvalue()
| |
|
def extract_json(s: str):
    """Pull the outermost {...} span out of a model reply and parse it.

    Raises ValueError when no brace-delimited object is present; JSON parse
    errors propagate as json.JSONDecodeError.
    """
    stripped = s.strip()
    first = stripped.find("{")
    last = stripped.rfind("}")
    if first == -1 or last == -1 or last < first:
        raise ValueError("No JSON object found in model output:\n" + repr(stripped))
    return json.loads(stripped[first:last + 1])
| |
|
def sanitize_tile_indices(data):
    """
    Forcefully converts various LLM outputs into a clean list of integers.
    Handles: [1, 2], ["1", "2"], "1, 2, 3", "[1, 2, 3]", and None.
    """
    if not data:
        return []

    if isinstance(data, list):
        result = []
        for element in data:
            try:
                result.append(int(str(element).strip()))
            except (ValueError, TypeError):
                # Skip anything that can't be coerced to an int.
                continue
        return result

    if isinstance(data, str):
        # Any digit runs embedded in the string become indices.
        return [int(token) for token in re.findall(r'\d+', data)]

    # Unsupported type (e.g., dict, float): treat as no indices.
    return []
| |
|
def execute_page_expert(expert_instructions: str, page_num: int):
    """Run the visual "Page Expert" sub-agent on a single drawing page.

    Drives a gpt-4o tool-calling loop (max 3 turns): the model may zoom into
    tile regions via `merge_tiles`, then must return a strict-JSON findings
    packet. On success, the cited tiles (and the full page image) are staged
    on the shared state for the final Gemini audit, and the parsed dict is
    returned.

    Raises:
        RuntimeError: if no parseable final JSON is produced within 3 turns.
    """
    state.add_log(f'👁️ Spawning Page Expert for page **{page_num}**')
    state.add_analysis(f"👁️ Page Expert searching for {expert_instructions}")
    state.add_log(f'📄 Attaching full-page context for page **{page_num}**')
    state.add_analysis(
        f"📄 Full-page context attached for page `{page_num}`"
    )

    # Show the full-page render in the UI gallery immediately.
    full_page_img = Image.open(
        io.BytesIO(image_bytes_list[page_num])
    )
    state.add_image(full_page_img)

    client = openai.OpenAI()

    # Single tool exposed to the model: tile stitching for high-res zooms.
    tools = [
        {
            "type": "function",
            "function": {
                "name": "merge_tiles",
                "description": "Stitches high-resolution image tiles together into a single zoomed-in view. Use this to read small text, dimensions, or symbols.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "tile_indexes": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "description": "A list of integer tile IDs from the Grid Map to stitch together."
                        }
                    },
                    "required": ["tile_indexes"]
                }
            }
        }
    ]

    page_text = text_list[page_num]
    relevant_tile_meta = tile_metadata[page_num]
    b64_full_page = base64.b64encode(image_bytes_list[page_num]).decode()

    system_prompt = """
You are a Lead AEC Visual Investigator supporting a Compliance Planner.

Your mission is to extract **verifiable, high-fidelity evidence** from this drawing page.
You must ground every claim in either:
- a **Zoomed Tile Image** (via `merge_tiles`) or
- a **Direct Text Quote** from the OCR page text.

Guesses, assumptions, and general descriptions are not allowed.

========================
MANDATORY WORKFLOW
========================
1) ORIENT
   - Review the full-page image and the Grid Map to identify candidate regions.
   - Decide which tiles likely contain the required evidence. Utilize the tile metadata to assist with this tasl.

2) ZOOM (REQUIRED)
   - You MUST call `merge_tiles(tile_indexes=[...])` before making ANY factual claim about symbols, dimensions, labels, or locations.
   - Always request ALL tiles needed in a SINGLE call.
   - If the first zoom is insufficient, call again with additional tiles.
   - Call the zoom until you have found all relevant tiles, refer to the tile metadata to assist in your search.

3) VERIFY
   - Read the zoomed image carefully.
   - Extract exact values, tags, room names, and directional cues.

4) REPORT
   - Return the Findings Packet in strict JSON format.

========================
WHAT COUNTS AS PROOF
========================
- Dimension values (e.g., “36\"”, “1 HR RATED”)
- Explicit labels (e.g., “EXIT”, “STAIR A”, “R-2”, “COLUMN C3”)
- Symbol legends that define a mark
- Path continuity that can be visually traced across tiles
- OCR text snippets

========================
FINDINGS RULES
========================
- Every bullet in `findings` MUST cite either:
  - `[Tile <ID>]` or
  - `"Quoted text"`
- If a claim cannot be verified from the zoomed tiles or text, mark it as **Unverified**.
- Be comprehensive in this report, your supervisor only has access to the report you give in findings, not the full page text or other image data you have.
- Do NOT repeat planner instructions — only report what you observe.

========================
VISUAL POINTERS RULES
========================
- Exclude orientation-only or whitespace tiles.
- Include ALL tiles needed to re-trace a path or confirm a relationship.
- **Your superviser will ONLY see the tiles that you reference here, be comprehensive when returning these tiles.**

========================
FULL PAGE USEFULNESS
========================
Set `true` ONLY if the finding requires spatial context across the entire page, or if your zoom is missing information.
(e.g., tracing egress path, riser continuity, system routing).
Otherwise set `false`.

========================
JSON FORMAT (STRICT)
========================
{
  "findings": "<markdown string with bullet points and citations>",
  "visual_pointers": [list of <int>],
  "textual_evidence": ["<exact quotes from PAGE TEXT>"],
  "full_page_usefulness": <true|false>,
  "limitations": "<what could not be verified and why>"
}

========================
FAILURE CONDITIONS
========================
- If no relevant evidence exists on this page, return:
{
  "findings": "No relevant technical evidence found for the planner's instruction.",
  "visual_pointers": [],
  "textual_evidence": [],
  "full_page_usefulness": false,
  "limitations": "This page does not contain the requested information or it is not legible at available resolution."
}

Return ONLY valid JSON.
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"Planner Instruction:\n{expert_instructions}"},
                {"type": "text", "text": f"Page Context:\n{page_text}"},
                {"type": "text", "text": f"Available Grid Map:\n{json.dumps(relevant_tile_meta)}"},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{b64_full_page}"
                    }
                }
            ]
        }
    ]

    # Hard cap on model turns (zoom rounds + final answer).
    MAX_TURNS = 3

    for turn in range(MAX_TURNS):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )

        msg = response.choices[0].message
        messages.append(msg)

        # A content payload may be the final JSON packet — try to parse it.
        if msg.content:
            try:
                res = extract_json(msg.content)

                state.add_analysis(
                    f"🟨 Page Analyst\n{res.get('findings','')}"
                )
                raw_pointers = res.get("visual_pointers", [])
                tile_idxs = sanitize_tile_indices(raw_pointers)

                # NOTE(review): `tile_idxs != '[]'` compares a list to a
                # string and is therefore always True — the guard reduces to
                # `if tile_idxs`. Left as-is in this documentation pass.
                if tile_idxs and tile_idxs != '[]':
                    stitched_bytes = merge_tiles(
                        tile_indexes=tile_idxs,
                        page_num=page_num
                    )

                    state.add_log(f'📸 Staging {len(tile_idxs)} tiles for final audit...')

                    # Stage the stitched zoom for the final Gemini audit.
                    state.add_staged_image_part(
                        types.Part.from_bytes(
                            data=stitched_bytes,
                            mime_type="image/png"
                        )
                    )

                    # Also show the stitched zoom in the UI gallery.
                    stitched_img = Image.open(
                        io.BytesIO(stitched_bytes)
                    )
                    state.add_image(stitched_img)

                    # Stage the full-page render alongside the zoom.
                    state.add_staged_image_part(
                        types.Part.from_bytes(
                            data=image_bytes_list[page_num],
                            mime_type="image/png"
                        )
                    )

                return res
            except:
                # NOTE(review): bare except silently swallows ANY failure
                # (including merge_tiles errors) and falls through to another
                # model turn — narrow this when touching behavior.
                pass

        # No parseable answer yet: service any tool calls and loop again.
        if msg.tool_calls:
            tool_results = []
            image_blocks = []

            for call in msg.tool_calls:
                if call.function.name == "merge_tiles":
                    args = json.loads(call.function.arguments)
                    idxs = args["tile_indexes"]

                    stitched_bytes = merge_tiles(
                        tile_indexes=idxs,
                        page_num=page_num
                    )

                    b64_tile = base64.b64encode(stitched_bytes).decode()

                    # Tool response carries only status; the actual image is
                    # delivered via a follow-up user message (vision input).
                    tool_results.append({
                        "role": "tool",
                        "tool_call_id": call.id,
                        "content": json.dumps({
                            "status": "success",
                            "tiles": idxs
                        })
                    })

                    image_blocks.append(
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{b64_tile}"
                            }
                        }
                    )

            for tool_msg in tool_results:
                messages.append(tool_msg)

            messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Here are the high-resolution zooms you requested. Analyze exits, locations, and any capacity labels."
                    },
                    *image_blocks
                ]
            })

            continue

        # Neither valid JSON nor tool calls: nudge the model to finish.
        messages.append({
            "role": "user",
            "content": "Return the FINAL JSON now."
        })

    raise RuntimeError("No FINAL JSON output from Page Expert")
| |
|
| | |
# Tool functions exposed to the Gemini planner agent (automatic function calling).
tools_list = [search_page_text, nyc_legal_sub_agent, execute_page_expert]
import time  # NOTE(review): redundant — `time` is already imported at the top of the file
planner = genai.Client()  # picks up GEMINI_API_KEY from the environment
planner_model = "gemini-3-pro-preview"
| | planner_prompt = f""" |
| | You are the Lead Architectural Compliance Planner for NYC Building Code and Zoning review. |
| | |
| | Your role is to coordinate specialist sub-agents and deliver a **proof-carrying compliance verdict** |
| | based ONLY on: |
| | - OCR-extracted drawing text |
| | - High-resolution visual evidence (tile zooms) |
| | - Official NYC Code citations |
| | |
| | You must NOT speculate or rely on architectural norms. |
| | |
| | ======================== |
| | DRAWING INDEX (Page Metadata) |
| | ======================== |
| | Use this index to select pages for visual inspection. |
| | Avoid irrelevant sheets (e.g., Site, Civil, Utility, Stormwater) unless zoning or site compliance is explicitly required. |
| | {json.dumps(page_metadata)} |
| | |
| | ======================== |
| | SPECIALIST SUB-AGENTS |
| | ======================== |
| | None of these agents have access to your chat history or internal thought process. |
| | They know only how to access information (text, images or code) and what information you give them in the research goal. |
| | If they need more context or specific instructions YOU MUST PROVIDE IT WHEN CALLING THEM in the research goal. |
| | |
| | 1) `search_page_text` |
| | Purpose: FAST signal extractor. |
| | Use to identify code-triggering facts: |
| | - Occupancy classification |
| | - Building height / stories / high-rise |
| | - Construction type |
| | - Scope of work (new, alteration, addition, change of occupancy) |
| | - Fire protection systems |
| | Output is used ONLY to constrain legal research. |
| | |
| | 2) `nyc_legal_sub_agent` |
| | Purpose: Definitive legal authority. |
| | Use to retrieve governing NYC Code sections, exceptions, and cross-references. |
| | Always pass a focused topic derived from Phase 1 signals. |
| | **YOU MAY ONLY CALL THIS TOOL ONCE** |
| | |
| | 3) `execute_page_expert` |
| | Purpose: High-resolution visual verification. |
| | Use to confirm compliance or non-compliance by zooming tiles. |
| | This agent provides the ONLY acceptable visual proof. |
| | **NEVER CALL THIS TOOL MORE THAN ONCE ON A SINGLE PAGE** |
| | |
| | ======================== |
| | MANDATORY PHASED WORKFLOW |
| | ======================== |
| | PHASE 1 — SIGNAL EXTRACTION |
| | - Use `search_page_text` on candidate pages to determine: |
| | occupancy, height, construction type, system presence, and scope. |
| | - If signals are missing or ambiguous, expand to additional pages. |
| | - Do NOT proceed until you have enough facts to define legal applicability. |
| | |
| | PHASE 2 — LEGAL SCOPING |
| | - Convert Phase 1 signals into a focused legal topic. |
| | - Call `nyc_legal_sub_agent`. |
| | - Extract governing sections, exceptions, and edge cases. |
| | |
| | PHASE 3 — VISUAL VERIFICATION |
| | - Identify the SINGLE most relevant page for proof. |
| | - Call `execute_page_expert` with precise instructions tied to legal requirements |
| | (e.g., “Verify exit door clear width at Stair A serving R-2 occupancy”). |
| | - Ensure returned findings include tile IDs and/or text quotes. |
| | |
| | PHASE 4 — SYNTHESIS & VERDICT |
| | - Compare visual findings directly against legal requirements. |
| | - Resolve conflicts: |
| | - If legal text and visual evidence disagree → flag as **Non-Compliant or Ambiguous** |
| | - If evidence is missing → flag as **Unverified** |
| | - Cite both: |
| | - NYC Code Section(s) |
| | - Tile ID(s) or OCR quotes |
| | |
| | **NEVER CALL THE SAME AGENT FOR THE SAME TASK TWICE REFER TO PREVIOUS ANSWERS WHEN ABLE** |
| | **NEVER CALL THE PAGE EXPERT TWICE ON THE SAME PAGE** |
| | |
| | ======================== |
| | FINAL OUTPUT FORMAT (STRICT MARKDOWN) |
| | ======================== |
| | ### Compliance Verdict |
| | **Status:** Compliant | Non-Compliant | Unverified | Ambiguous |
| | |
| | ### Legal Basis |
| | - **[Code Type] §[Section] — [Title]** |
| | - Requirement: |
| | - Exceptions Considered: |
| | |
| | ### Visual Evidence |
| | - Finding: <short statement> |
| | - Proof: [Tile ID(s)] or "Quoted OCR Text" |
| | |
| | ### Reasoning |
| | - Step-by-step comparison between legal requirement and observed condition |
| | |
| | ### Limitations |
| | - What could not be verified and why |
| | |
| | ======================== |
| | CONTROL RULES |
| | ======================== |
| | - NEVER call `nyc_legal_sub_agent` before `search_page_text` |
| | - NEVER issue a final verdict without calling `execute_page_expert` |
| | - If no page contains sufficient proof, return **Unverified** |
| | - Prefer false negatives over false positives |
| | *** CRITICAL VISUAL PROTOCOL *** |
| | - When `execute_page_expert` returns, it will explicitly state "VISUAL_PROOF_PENDING". |
| | - When you see this, your ONLY response must be: "Awaiting visual proof." |
| | - DO NOT attempt to guess the verdict. |
| | - DO NOT complain about missing images. |
| | - Simply wait. The user will immediately send the images in the next turn. |
| | |
| | |
| | ======================== |
| | QUALITY STANDARD |
| | ======================== |
| | This output should be defensible to a DOB plan examiner or legal reviewer. |
| | Every claim must be traceable to law and evidence. |
| | """ |
| |
|
| | config = types.GenerateContentConfig( |
| | system_instruction=planner_prompt, |
| | tools=tools_list |
| | ) |
| |
|
| | chat = planner.chats.create(model=planner_model, config=config) |
| |
|
| |
|
def agent_worker(user_question):
    """Background worker that drives the planner chat loop for one question.

    Runs on a daemon thread started by run_agentic_workflow(). All progress
    logs, retrieved images and the final verdict are published through the
    shared ``state`` object; nothing is returned.

    Args:
        user_question: The user's compliance question, sent verbatim as the
            first chat message.
    """
    state.clear()
    state.add_log(f'🚀 Starting analysis for: **{user_question}**')
    state.add_analysis("🧠 Planner initialized. Awaiting tool calls...")

    try:
        # Fresh chat session per run so earlier conversations never leak in.
        chat = planner.chats.create(model=planner_model, config=config)
        response = chat.send_message(user_question)

        # Tool-calling loop: keep executing requested tools until the model
        # replies with no function call in ANY part. (Checking only parts[0],
        # as before, dropped tool calls when a text part came first.)
        while any(part.function_call for part in response.candidates[0].content.parts):
            tool_responses = []

            for part in response.candidates[0].content.parts:
                if part.function_call:
                    name = part.function_call.name
                    args = part.function_call.args
                    state.add_log(f'🛠️ Tool Call: **{name}**')

                    # Dispatch by name to the module-level tool function; a
                    # bad name raises KeyError, caught by the handler below.
                    func = globals()[name]
                    result = func(**args)

                    tool_responses.append(
                        types.Part.from_function_response(name=name, response={"result": result})
                    )

            # Feed every tool result back in one turn.
            response = chat.send_message(tool_responses)

        # Visual audit: if the page expert staged zoom tiles, send them back so
        # the model can check its preliminary verdict against the pixels.
        audit_images = state.get_staged_images()

        if audit_images:
            state.add_log(f"👁️ Preliminary answer received. Performing audit with {len(audit_images)} images...")

            audit_parts = [
                types.Part.from_text(
                    text="You have provided a preliminary verdict. Now, look at these images "
                         "to verify your findings. If the visual evidence contradicts your "
                         "text-based search, update your verdict now. "
                ),
                *audit_images
            ]

            try:
                final_response = chat.send_message(audit_parts)
                state.final_answer = final_response.text
            except Exception:
                # Some SDK versions require the keyword form of send_message.
                state.add_log("🔄 Retrying audit with explicit message keyword...")
                final_response = chat.send_message(message=audit_parts)
                state.final_answer = final_response.text
        else:
            state.add_log("⚠️ No images found in state. Skipping visual audit.")
            state.final_answer = response.text

        state.add_log('🏁 **ANALYSIS COMPLETE**')
    except Exception as e:
        # Surface the failure in the UI instead of dying silently on the thread.
        state.add_log(f"❌ **ERROR:** {e}")
        state.final_answer = f"Analysis failed: {e}"
    finally:
        # Always release the UI polling loop — without this, an exception left
        # state.done False and run_agentic_workflow() spun forever.
        state.done = True
| |
|
| | |
def run_agentic_workflow(user_question, profile: gr.OAuthProfile | None):
    """Gradio entry point: enforce login and quota, then stream agent progress.

    Yields (logs, analysis, chapter, images, final_answer) tuples while the
    background worker runs, then one final tuple carrying the verdict.

    Raises:
        gr.Error: If the user is not signed in or has exhausted their quota.
    """
    uid = user_id_from_profile(profile)
    if uid is None:
        raise gr.Error("Please sign in with Hugging Face to use this demo.")

    allowed, remaining = check_and_increment_quota(uid)
    if not allowed:
        raise gr.Error(f"Usage limit reached: {MAX_RUNS_PER_USER} runs per user.")

    # Quota toast: warn when the user is nearly out of runs.
    notifier, message = (
        (gr.Warning, f"⚠️ Only {remaining} run(s) left!")
        if remaining <= 2
        else (gr.Info, f"✓ Runs remaining: {remaining}")
    )
    notifier(message)

    # Reset completion flags before the worker starts mutating shared state.
    state.done = False
    state.final_answer = ""

    worker = threading.Thread(
        target=agent_worker,
        args=(user_question,),
        daemon=True
    )
    worker.start()

    def _snapshot():
        # Copy shared state under the lock so the worker can't mutate mid-read.
        with state.lock:
            return (
                "\n\n".join(state.log_messages),
                "\n\n".join(state.analysis_messages),
                state.current_chapter,
                list(state.current_images),
                state.final_answer,
            )

    # Poll-and-stream until the worker signals completion.
    while not state.done:
        logs, analysis, chapter, images, _ = _snapshot()
        yield (
            logs,
            analysis,
            chapter,
            images,
            "*Analysis in progress...*"
        )
        time.sleep(0.25)

    # One last emission with the final verdict in the fifth slot.
    yield _snapshot()
| |
|
| |
|
| | |
# Gradio UI. Component creation order defines the layout, so statements here
# are intentionally left in their original order.
with gr.Blocks(title="AEC Compliance Agent") as demo:
    # Hugging Face OAuth sign-in; required because run_agentic_workflow
    # rejects anonymous users.
    gr.LoginButton()

    gr.Markdown("# 🏗️ AEC Compliance Analysis Agent")
    gr.Markdown("Ask questions about NYC Building Code compliance for your construction drawings.")

    with gr.Row():
        # Left column: question entry plus the live run log.
        with gr.Column(scale=1):
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="e.g., Does this building comply with egress requirements for 738 occupants?",
                lines=3
            )
            submit_btn = gr.Button("🔍 Analyze", variant="primary", size="lg")

            gr.Markdown("### 📋 Analysis Log")
            log_output = gr.Markdown(value="", height=400)

        # Middle column: streamed sub-agent analysis text.
        with gr.Column(scale=1):
            gr.Markdown("### 🧠 Sub-Agent Analysis")
            analysis_output = gr.Markdown(value="", height=600)

        # Right column: the NYC Code chapter retrieved by the legal sub-agent.
        with gr.Column(scale=1):
            gr.Markdown("### 📖 Code Chapter")
            chapter_output = gr.Markdown(value="*No chapter loaded yet*", height=600)

    with gr.Row():
        gr.Markdown("### 🖼️ Retrieved Images")

    with gr.Row():
        # Zoom tiles staged by the page expert during the run.
        image_gallery = gr.Gallery(
            label="Visual Evidence",
            show_label=True,
            columns=2,
            height=400,
            object_fit="contain"
        )

    with gr.Row():
        gr.Markdown("### ✅ Final Compliance Verdict")

    with gr.Row():
        final_output = gr.Markdown(value="*Analysis pending...*")

    # Only question_input is an explicit input: Gradio supplies the
    # gr.OAuthProfile argument automatically because run_agentic_workflow's
    # signature annotates a parameter with that type.
    submit_btn.click(
        fn=run_agentic_workflow,
        inputs=[question_input],
        outputs=[
            log_output,
            analysis_output,
            chapter_output,
            image_gallery,
            final_output
        ]
    )
| |
|
if __name__ == "__main__":
    # Queueing is required for generator (streaming) callbacks like
    # run_agentic_workflow; open a browser tab on startup for local runs.
    app = demo.queue()
    app.launch(inbrowser=True)