| | |
| | """ |
| | Created on Fri Dec 5 12:28:34 2025 |
| | |
| | @author: rmd2219 |
| | """ |
| | import os, json, io, re, time |
| | from PIL import Image |
| | import pandas as pd |
| | from google import genai |
| | from google.genai import types |
| | import gradio as gr |
| | import hashlib |
| | import numpy as np |
| | from huggingface_hub import hf_hub_download, HfApi |
| | from huggingface_hub.utils import EntryNotFoundError |
| |
|
# --- Usage-quota configuration (all overridable via environment variables) ---
USAGE_DATASET_REPO = os.environ.get("USAGE_DATASET_REPO", "NYSERDA-CRE-Working-Group/nyserda_demo_useage_store")
USAGE_FILENAME = os.environ.get("USAGE_FILENAME", "usage.csv")
MAX_RUNS_PER_USER = int(os.environ.get("MAX_RUNS_PER_USER", "10"))

# The original code did `os.environ["GEMINI_API_KEY"] = os.environ.get("GEMINI_API_KEY")`,
# which is a no-op when the key is set and a TypeError crash when it is not
# (os.environ values must be str, not None). Only write it back when present.
_gemini_key = os.environ.get("GEMINI_API_KEY")
if _gemini_key is not None:
    os.environ["GEMINI_API_KEY"] = _gemini_key
HF_TOKEN = os.environ.get("HF_TOKEN")

# Single Hub client used for all usage-dataset reads/writes below.
api = HfApi(token=HF_TOKEN)
| |
|
def user_id_from_profile(profile: gr.OAuthProfile | None) -> str | None:
    """Derive a normalized user id (trimmed, lowercased) from an OAuth profile.

    Returns None when the user is not signed in or the profile carries no name.
    """
    if profile is None:
        return None
    display_name = getattr(profile, "name", None)
    # NOTE(review): quota is keyed on the display name, not the account
    # handle; `profile.username` may be the stabler identifier -- confirm.
    return display_name.strip().lower() if display_name else None
| |
|
def _load_usage_df() -> pd.DataFrame:
    """Download the usage CSV from the HF dataset repo as a DataFrame.

    A missing file (first-ever run of the Space) yields an empty frame with
    the expected schema instead of raising.
    """
    try:
        csv_path = hf_hub_download(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            filename=USAGE_FILENAME,
            token=HF_TOKEN,
        )
    except EntryNotFoundError:
        # No usage file committed yet -- start from an empty ledger.
        return pd.DataFrame(columns=["user_id", "runs", "first_seen", "last_seen"])
    return pd.read_csv(csv_path)
| |
|
def _save_usage_df(df: pd.DataFrame, commit_message: str) -> None:
    """Persist the usage DataFrame back to the HF dataset repo.

    The frame is serialized to a local temporary CSV and uploaded as a
    single commit with the given message.

    Args:
        df: usage ledger with columns user_id/runs/first_seen/last_seen.
        commit_message: commit message recorded on the dataset repo.
    """
    import tempfile

    # Write to a unique file in the platform temp dir instead of a
    # hard-coded "/tmp/usage.csv", which does not exist on Windows and can
    # collide between concurrent processes.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp:
        tmp_path = tmp.name
    df.to_csv(tmp_path, index=False)

    try:
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo=USAGE_FILENAME,
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            commit_message=commit_message,
        )
    finally:
        # Best-effort cleanup of the scratch file.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
| |
|
def check_and_increment_quota(user_id: str) -> tuple[bool, int]:
    """Check a user's remaining quota and, if allowed, record one run.

    Returns (allowed, remaining_runs_after_this_one). When the user is over
    the limit, returns (False, 0) and writes nothing.

    NOTE(review): load/modify/upload is not atomic; concurrent runs can race
    and lose increments. Probably acceptable for a demo -- confirm.
    """
    now = int(time.time())
    df = _load_usage_df()

    existing = [] if df.empty else list(df.index[df["user_id"] == user_id])

    if not existing:
        # First time we see this user.
        if 0 >= MAX_RUNS_PER_USER:
            return False, 0
        row = pd.DataFrame([{
            "user_id": user_id,
            "runs": 1,
            "first_seen": now,
            "last_seen": now,
        }])
        df = pd.concat([df, row], ignore_index=True)
        _save_usage_df(df, commit_message=f"usage: increment {user_id} to 1")
        return True, MAX_RUNS_PER_USER - 1

    # Known user: bump their counter unless they are already at the cap.
    idx = existing[0]
    runs_so_far = int(df.loc[idx, "runs"])
    if runs_so_far >= MAX_RUNS_PER_USER:
        return False, 0

    updated = runs_so_far + 1
    df.loc[idx, "runs"] = updated
    df.loc[idx, "last_seen"] = now

    _save_usage_df(df, commit_message=f"usage: increment {user_id} to {updated}")
    return True, MAX_RUNS_PER_USER - updated
| |
|
| | |
| |
|
def _read_json(path):
    """Load one JSON file, closing the handle (the originals leaked them)."""
    with open(path) as f:
        return json.load(f)


# Static metadata shipped alongside the app.
tile_metadata = _read_json("tile_metadata.json")
global_context = _read_json("global_context.json")
text_list = _read_json("text_list.json")
page_metadata = _read_json("page_metadata.json")
| |
|
def load_fullpage_images(folder="Images"):
    """Return raw PNG bytes of every full-page render in `folder`.

    Files are expected to be named ``page_<n>_fullpage.png``; the result is
    a list ordered by page number.
    """
    pattern = re.compile(r"page_(\d+)_fullpage\.png")

    numbered = []
    for name in os.listdir(folder):
        hit = pattern.search(name)
        if hit:
            numbered.append((int(hit.group(1)), name))

    # Page order, not directory order.
    numbered.sort(key=lambda pair: pair[0])

    pages = []
    for _, name in numbered:
        with open(os.path.join(folder, name), "rb") as fh:
            pages.append(fh.read())
    return pages
| |
|
def load_cropped_images(folder="Images"):
    """Map page number -> raw PNG bytes for ``page_<n>_drawing.png`` files."""
    pattern = re.compile(r"page_(\d+)_drawing\.png")

    found = []
    for name in os.listdir(folder):
        hit = pattern.search(name)
        if hit:
            found.append((int(hit.group(1)), name))

    found.sort(key=lambda pair: pair[0])

    by_page = {}
    for number, name in found:
        with open(os.path.join(folder, name), "rb") as fh:
            by_page[number] = fh.read()
    return by_page
| |
|
def load_tile_images(page, folder='Tiles'):
    """Return tile PNG bytes for one page, ordered by tile index.

    Matches files named ``page_<page>_tile_<i>.png``.

    Args:
        page: page number whose tiles to load.
        folder: directory to scan (defaults to 'Tiles', as before).

    Returns:
        List of raw PNG bytes, sorted by tile index; empty if no tiles exist.
    """
    # Raw f-string: the original used a plain string where "\d" is an
    # invalid escape sequence. re.escape guards the interpolated page value
    # so it cannot distort the pattern.
    pattern = re.compile(rf"page_{re.escape(str(page))}_tile_(\d+)\.png")

    tiles = []
    for name in os.listdir(folder):
        match = pattern.search(name)
        if match:
            tiles.append((int(match.group(1)), name))

    tiles.sort(key=lambda pair: pair[0])

    tile_bytes = []
    for _, name in tiles:
        with open(os.path.join(folder, name), "rb") as fh:
            tile_bytes.append(fh.read())
    return tile_bytes
| |
|
# Preload all drawing assets into memory at startup.
image_bytes_list = load_fullpage_images()
cropped_bytes_list = load_cropped_images()

# Tile images keyed by page number; pages without tiles are omitted.
# NOTE(review): 44 is presumably the page count of this drawing set -- confirm.
tile_bytes = {}
for page in range(44):
    tile_list = load_tile_images(page)  # read once (the original loaded each page's tiles twice)
    if tile_list:
        tile_bytes[page] = tile_list
| |
|
class DrawingChatbot:
    """Two-stage Gemini pipeline over MEP drawings.

    Model A ("Investigator") is forced into tool-calling mode and gathers
    page/tile images plus text; Model B ("Analyst") receives the collected
    logs and images and writes the final answer.
    """

    def __init__(self, model_name, global_context, global_metadata, tile_metadata, text_data, image_data, cropped_image_data, tile_data):
        """
        Initializes the Dual-Model Chatbot.
        - Model A (Investigator): Forced to use tools to gather data.
        - Model B (Analyst): Pure text model that interprets the findings.
        """
        # genai.Client() picks up GEMINI_API_KEY from the environment.
        self.client = genai.Client()
        self.text_data = text_data                    # per-page extracted text, indexed by page number
        self.full_page_data = image_data              # list of full-page PNG bytes
        self.cropped_image_data = cropped_image_data  # dict: page -> cropped drawing PNG bytes
        self.tiles_data = tile_data                   # dict: page -> list of tile PNG bytes
        self.tile_metadata = tile_metadata            # nested dict keyed by str(page) -> str(tile)
        self.model_name = model_name

        # Tool declarations exposed to the Investigator model.
        self.tools = [
            types.Tool(
                function_declarations=[
                    types.FunctionDeclaration(
                        name="get_page_by_index",
                        description="Fetch PDF page text and image as well as tile metadata.",
                        parameters=types.Schema(
                            type="object",
                            properties={"index": types.Schema(type="integer")},
                            required=["index"]
                        )
                    ),
                    types.FunctionDeclaration(
                        name="get_tiles",
                        description="Fetch tile images by indices.",
                        parameters=types.Schema(
                            type="object",
                            properties={
                                "page_num": types.Schema(type="integer"),
                                "indices": types.Schema(type="array", items=types.Schema(type="integer"))
                            },
                            required=["page_num", "indices"]
                        )
                    ),
                    # Sentinel tool: ends the investigation loop.
                    types.FunctionDeclaration(
                        name="investigation_complete",
                        description="Call this ONLY when you have gathered sufficient information to answer the user's question.",
                        parameters=types.Schema(
                            type="object",
                            properties={"reason": types.Schema(type="string")},
                            required=["reason"]
                        )
                    )
                ]
            )
        ]

        # System prompt for the tool-forced Investigator (verbatim; part of
        # model behavior -- do not edit casually).
        investigator_system_prompt = (f'''
You are a Data Retrieval Worker for MEP drawings. You will be gathering data to answer the users question.
Your ONLY job is to call tools to gather information to answer the user's prompt.

You have access to:
- Global context about how these drawings are structured.
- Metadata for every page.
- High-resolution page images and tile images.

Proper Workflow:
1. Consider the users question and examine the page metadata in detail.
 - Identify all potentially relevant pages that may assist in your answer
2. Call the pages one or two at a time in order of which one is most likely to be helpful for the users question.
 - The pages will return the visual as well as detailed metadata of what tiles ar included in the page and what is in each tile
 - Follow up this query by locating the specific tiles releevant to the users question as they are invaluable when answering detailed questions.
 - ALWAYS CALL TILES IF YOU CAN FIND RELEVANT ONES TO THE USERS QUESTION.
3. Consider weather or not the previously queried information will be enough to answer the users question. If so call the investigation_complete function, if not return to step 1.

RULES:
1. You CANNOT speak to the user. You can ONLY call tools.
2. You are in FORCE TOOL mode.
3. When you have gathered enough information from pages/tiles, you MUST call 'investigation_complete'.
4. If comparing two states (e.g. Demolition vs New Work), retrieve BOTH before calling complete.
5. Tile images are **MANDATORY** if feasible to provide them. Always request ALL potentially relevant tiles, not just the most relevant.
6. tiles will be stitched together before being passed to the user. If you need to call non-touching tiles call them in seperate tool calls.
 - eg. [21,22,23] okay (vertically stacked tiles are fine as well)
 - eg [4, 22] not allowed

Global Context: {json.dumps(global_context)}
Page Metadata: {json.dumps(global_metadata)}

**REMEMBER: ALWAYS CALL TILES**
ALso - if you are describing anything visual be sure to search for the legend to explain symbols and callouts. Also supply this image to the analyst.
''')

        # FunctionCallingConfig mode="ANY" forces the model to emit tool calls.
        self.investigator_config = types.GenerateContentConfig(
            system_instruction=investigator_system_prompt,
            tools=self.tools,
            tool_config=types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="ANY")
            )
        )
        self.investigator_chat = self.client.chats.create(
            model=self.model_name,
            config=self.investigator_config
        )

        # System prompt for the text-only Analyst (verbatim; runtime content).
        self.analyst_system_prompt = (f'''
You are an industry expert in architecture, MEP engineering, structural engineering, and construction documentation. (Analyst).
You will receive a user question and a detailed LOG of an investigation performed by a worker.
You will receive stitched together tiles that show a detailed close up view of key areas of the image. These are your superpower, always give them alot of attention.
The results of the get_tiles tool call is one image of all requested tiles stitched together so there are no overlap concerns between tiles.
Tile metadata should be used to help you examine the tiles but the stitched together image should be the ground truth, always use this when counting equipment, examining layouts, etc.

ex. Count peices of equipment.
1. Examine the iamage of the stitched together tile.
 - Note unique visual landmarks near each item (e.g., 'near the left door', 'near the right wall').
2. Identify: Identify all potentially relevant markers and symbols and prepare to describe them to the user.
3. Determine: Determine which symbols correspond to unique peices of equipment.
 - Cross check across document text given to you for symbol meanings and consistency in your report.
3. Final Count: What is the total number of unique equipment peices in the room?"

Your job is to read the logs (which contain text data and image analysis) and answer the user's question.

Your goals:
1. Give the user an accurate, detailed, spatially-grounded interpretation of the drawings.
2. Provide high-level reasoning steps (visible chain-of-thought) without revealing internal scratchpads.
3. Avoid contradictions in spatial flow analysis (e.g., tracing a duct from a source only to discover later it did not connect).
 β Before answering, verify the entire path by examining relevant tiles.

When you receive a question:
1. **Restate** the user question in brief.
2. **Perform spatial reasoning using a structured approach:**
 - Identify the component(s).
 - Locate their exact position(s) on the drawing using tile metadata (x/y, bounding boxes, labels).
 - Verify all upstream/downstream connections before describing them.
 - Trace flows fully BEFORE writing your final answer.
 - Give the user a detailed description of spatial landmarks that are relavant to what you are discussing so they can reference the drawing themselves while reading your response.

NOTES:
- Do a detailed search of all information given to you for relevant information before answering.
- Attempt to locate a legend or notes list to help you decifer what is going on visually in the drawing.

3. **Answer formatting**:
 - **Provide clear references: βLocated at the upper-left quadrant near Room 212,β βSouth of AHU-1,β etc.
 - ALWAYS explain relevant equipment, numbers and locations
 - ALWAYS explain the callout or symbol relevant to what you are discussing and describe it visually.
 - Never give vague answers eg. 'exisiting ductwork', instead say 'the ductwork leading from x to y is remiainign in place and connected to diffuser z shown as _ symbol in the drawing'.
 - This includes symbols (eg. hexagonal, callouts, squares) for equipment, boxes, etc. ALWAYS describe what every relevent symbol looks like.
 - Describe flow paths step-by-step in spatial order (e.g., βfrom AHU-1 β main supply riser β branch duct β Gymnasiumβ) This is NOT the actual path followed in the drawing.
 - If tiles were inspected, briefly mention which tiles provided key details.
 - Always mention both the page index and page name that you can find the information you are discussing on.

***IMPORTANT**
If you are unsure about something say so. Never make assumptions about details. When making statements site where you found it in the drawing, what symbol is relevant, etc.
If somethign is unclear state that.

Here is the original metadata:
Global Context: {json.dumps(global_context)}
Page Metadata: {json.dumps(global_metadata)}
''')

        self.analyst_config = types.GenerateContentConfig(
            system_instruction=self.analyst_system_prompt,
            # NOTE(review): verify `thinking_level="high"` against the
            # installed google-genai SDK version's ThinkingConfig fields.
            thinking_config=types.ThinkingConfig(thinking_level="high"),
        )

        self.analyst_chat = self.client.chats.create(
            model=self.model_name,
            config=self.analyst_config
        )

        # Accumulated state for the current conversation. NOTE(review): these
        # persist across ask() calls until reset_bot() replaces the instance.
        self.investigation_logs = []     # human-readable transcript of the tool loop
        self.collected_images = []       # PIL images for the Gradio gallery
        self.analyst_image_parts = []    # same images as genai Parts for the Analyst
        self.raw_tool_responses = []     # raw text results, dumped into the final prompt

    def _get_page_by_index(self, index: int) -> dict:
        """Return one page's text (+ tile metadata) and its image bytes.

        Prefers the cropped drawing image; falls back to the full-page render
        when the cropped image or tile metadata is unavailable.

        Raises:
            ValueError: if `index` is outside the known page range.
        """
        if index < 0 or index >= len(self.text_data):
            raise ValueError(f"Page index {index} out of bounds.")

        try:
            return {
                "text": f"PAGE {index} TEXT:\n{self.text_data[index]}\nTILE META:\n{json.dumps(self.tile_metadata.get(str(index), {}))}",
                "image_bytes": self.cropped_image_data[index],
                "mime_type": "image/png"
            }
        # NOTE(review): bare except silently masks any error (even typos in
        # the try block) as "no cropped image"; narrow to KeyError if possible.
        except:
            return {
                "text": f"PAGE {index} TEXT:\n{self.text_data[index]}\n",
                "image_bytes": self.full_page_data[index],
                "mime_type": "image/png"
            }

    def _get_tiles(self, page_num, indicies):
        """
        Retrieves specific tiles, stitches them into a single image,
        and returns the combined image bytes.
        """
        images = []     # PIL tiles in request order
        positions = []  # top-left pixel coords of each tile on the page

        for index in indicies:
            # NOTE(review): self.tiles_data is a dict keyed by page number,
            # so `page_num < len(self.tiles_data)` tests against the dict's
            # size, not its keys -- this can wrongly reject/accept pages when
            # some pages have no tiles. Confirm intended behavior.
            if page_num < len(self.tiles_data) and index < len(self.tiles_data[page_num]):

                img_bytes = self.tiles_data[page_num][index]

                img = Image.open(io.BytesIO(img_bytes))
                images.append(img)

                try:
                    # Metadata is keyed by stringified page/tile numbers.
                    coords = self.tile_metadata[str(page_num)][str(index)]['coords']

                    positions.append((int(coords[0]), int(coords[1])))
                except (KeyError, IndexError, AttributeError):
                    raise ValueError(f"Metadata/Coordinates missing for Tile {index} on Page {page_num}")

            else:
                raise ValueError(f"Tile {index} on Page {page_num} not found.")

        if not images:
            return {}

        # Normalize coordinates so the top-left-most tile sits at (0, 0).
        min_x = min(x for x, y in positions)
        min_y = min(y for x, y in positions)

        normalized_positions = [(x - min_x, y - min_y) for x, y in positions]

        # Canvas large enough to hold every tile at its normalized offset.
        total_width = max(pos[0] + img.width for pos, img in zip(normalized_positions, images))
        total_height = max(pos[1] + img.height for pos, img in zip(normalized_positions, images))

        # White background fills any gaps between non-adjacent tiles.
        stitched_image = Image.new('RGB', (total_width, total_height), (255, 255, 255))

        for img, pos in zip(images, normalized_positions):
            stitched_image.paste(img, pos)

        output_buffer = io.BytesIO()
        stitched_image.save(output_buffer, format='PNG')
        stitched_bytes = output_buffer.getvalue()

        return {
            "stitched_image": {
                "image_bytes": stitched_bytes,
                "mime_type": "image/png"
            }
        }

    def ask(self, question):
        """Run the Investigator tool loop, then the Analyst, as a generator.

        Yields (log_text, images_for_gallery, final_answer_or_None) tuples so
        the Gradio UI can stream progress; the final yield carries the
        Analyst's answer (or an error string) in the third slot.
        """
        if self.collected_images:
            yield (
                f"\nπ STARTING INVESTIGATION PROCESS for: {question}",
                self.collected_images,
                None
            )
        else:
            yield (
                f"\nπ STARTING INVESTIGATION PROCESS for: {question}",
                None,
                None
            )

        try:
            response = self.investigator_chat.send_message(question + "Remember to always ask for ALL tiles associated with the question!")
        except Exception as e:
            # NOTE(review): `return <value>` inside a generator only sets
            # StopIteration.value -- the for-loop consumer never sees this
            # error string. Consider yielding it instead.
            return f"β οΈ Critical Error starting investigation: {e}"

        max_steps = 15          # hard cap on investigator round-trips
        step = 0
        investigation_done = False

        while not investigation_done and step < max_steps:
            step += 1

            # Any free text from the tool-forced model is logged as a "thought".
            if response.text:
                log_entry = f"π΅οΈ Investigator Thought: {response.text.strip()}"
                self.investigation_logs.append(log_entry)

            if not response.function_calls:
                # Should not happen in mode="ANY", but bail out defensively.
                self.investigation_logs.append("β οΈ Investigator returned no tools in ANY mode.")
                break

            parts_to_send_back = []

            for tool_call in response.function_calls:
                name = tool_call.name
                args = tool_call.args
                self.investigation_logs.append(f" > Executing Tool: {name}")

                if name == "investigation_complete":
                    self.investigation_logs.append("β Investigator signaled completion.")
                    self.investigation_logs.append(f"π ACTION: Investigation Complete. Reason: {args.get('reason')}")
                    investigation_done = True
                    break

                result_log = ""
                function_result_dict = {}
                image_parts = []

                try:
                    if name == "get_page_by_index":
                        idx = int(args["index"])
                        self.investigation_logs.append(f'Fetching page {idx}')
                        data = self._get_page_by_index(idx)
                        function_result_dict = {"text": data["text"], "status": "success"}
                        self.raw_tool_responses.append({
                            "tool_name": name,
                            "args": args,
                            "raw_result": data["text"],
                        })

                        # Image goes back to the Investigator AND is saved for
                        # the gallery and the Analyst.
                        image_parts.append(types.Part.from_bytes(data=data["image_bytes"], mime_type=data["mime_type"]))
                        img = Image.open(io.BytesIO(data["image_bytes"]))
                        self.collected_images.append(img)
                        self.analyst_image_parts.append(types.Part.from_bytes(data=data["image_bytes"], mime_type=data["mime_type"]))

                        result_log = f"Fetched Page {idx}. Text length: {len(data['text'])} chars."

                    elif name == "get_tiles":
                        p_num = int(args["page_num"])

                        raw_indices = args["indices"]
                        self.investigation_logs.append(f'Fetching tiles {raw_indices} from page {p_num}')
                        # Model may send a scalar or a list; normalize to list[int].
                        t_indices = [int(raw_indices)] if isinstance(raw_indices, (int, str)) else [int(x) for x in raw_indices]

                        tile_dict = self._get_tiles(p_num, t_indices)
                        function_result_dict = {"status": "success", "tiles": list(tile_dict.keys())}

                        for t_id, t_data in tile_dict.items():
                            image_parts.append(types.Part.from_bytes(data=t_data["image_bytes"], mime_type=t_data["mime_type"]))
                            self.analyst_image_parts.append(types.Part.from_bytes(data=t_data["image_bytes"], mime_type=t_data["mime_type"]))
                            img = Image.open(io.BytesIO(t_data["image_bytes"]))
                            self.collected_images.append(img)

                        result_log = f"Fetched Tiles {t_indices} from Page {p_num}."

                    else:
                        raise ValueError(f"Unknown tool: {name}")

                except Exception as e:
                    # Tool failures are reported back to the model, not raised.
                    self.investigation_logs.append(f" β Tool Error: {e}")
                    function_result_dict = {"error": str(e)}
                    result_log = f"Error executing {name}: {e}"

                self.investigation_logs.append(f"π§ ACTION: Called {name} with {args}")
                self.investigation_logs.append(f" RESULT: {result_log}")

                parts_to_send_back.append(
                    types.Part.from_function_response(
                        name=name,
                        response={"result": function_result_dict}
                    )
                )
                parts_to_send_back.extend(image_parts)

                # Stream intermediate progress to the UI after each tool call.
                yield (
                    "\n".join(self.investigation_logs),
                    self.collected_images,
                    None
                )

            if investigation_done:
                break

            try:
                response = self.investigator_chat.send_message(parts_to_send_back)
            except Exception as e:
                # NOTE(review): no break/return here -- the loop continues
                # with the stale `response` and may re-execute the same tool
                # calls until max_steps. Likely should break after yielding.
                yield (
                    "\n".join(self.investigation_logs),
                    self.collected_images,
                    f"β οΈ Investigator API Error: {e}"
                )

        yield (
            "\nπ Investigation finished. Handing over to Analyst...",
            self.collected_images,
            None
        )

        # Hand everything gathered to the Analyst as one prompt + images.
        final_prompt = (
            f"USER QUESTION: {question}\n\n"
            f"=== INVESTIGATION LOGS ===\n"
            + "\n".join(self.investigation_logs) + "\n"
            f"==========================\n\n"
            f"=== RAW TOOL RESPONSES ===\n"
            + json.dumps(self.raw_tool_responses, indent=2) + "\n"
            f"===========================\n\n"
            "Based on the logs and raw tool outputs above, provide a detailed answer to the user."
        )

        # Debug: prompt size in characters.
        print(len(final_prompt))

        self.analyst_contents = [types.Part.from_text(text=final_prompt)]

        self.analyst_contents.extend(self.analyst_image_parts)

        try:
            analyst_response = self.analyst_chat.send_message(self.analyst_contents)

            yield (
                "\n".join(self.investigation_logs),
                self.collected_images,
                analyst_response.text
            )

        except Exception as e:
            yield (
                "\n".join(self.investigation_logs),
                self.collected_images,
                f"β οΈ Analyst API Error: {e}"
            )
| | |
| | |
| | |
| | |
# Keyword arguments used to build (and rebuild, on reset) the chatbot.
BOT_ARGS = dict(
    model_name="gemini-3-pro-preview",
    global_context=global_context,
    global_metadata=page_metadata,
    tile_metadata=tile_metadata,
    text_data=text_list,
    image_data=image_bytes_list,
    cropped_image_data=cropped_bytes_list,
    tile_data=tile_bytes
)

# Module-level singleton; reset_bot() below swaps it for a fresh instance.
bot = DrawingChatbot(**BOT_ARGS)

# (Removed: a dead triple-quoted string containing a commented-out CLI loop
# that was evaluated and discarded at import time.)
| | |
def reset_bot():
    """Swap the module-level bot for a brand-new instance.

    Discards all accumulated logs/images and returns cleared values for the
    (gallery, text) output components.
    """
    global bot
    bot = DrawingChatbot(**BOT_ARGS)
    return None, "π Bot has been reset. Ask a new question!"
| |
|
def to_bytes(img):
    """Coerce any image-like object (bytes, PIL image, numpy array, gradio
    dict) into bytes; unknown objects fall back to their repr, UTF-8 encoded.
    """
    if isinstance(img, (bytes, bytearray)):
        return bytes(img)

    if isinstance(img, Image.Image):
        out = io.BytesIO()
        img.save(out, format="PNG")
        return out.getvalue()

    if isinstance(img, np.ndarray):
        # Round-trip through PIL to get PNG-encoded bytes.
        out = io.BytesIO()
        Image.fromarray(img).save(out, format="PNG")
        return out.getvalue()

    if isinstance(img, dict) and "bytes" in img:
        # Gradio sometimes hands back {"bytes": ..., ...} dicts.
        return img["bytes"]

    # Last resort: hash-stable textual representation.
    return repr(img).encode("utf-8")
| |
|
| |
|
def hash_bytes(b):
    """Return the hex MD5 digest of `b` (caller guarantees it is bytes).

    MD5 is used purely for deduplication, not security.
    """
    digest = hashlib.md5(b)
    return digest.hexdigest()
| |
|
| | |
def run_investigation(question, profile: gr.OAuthProfile | None):
    """Gradio handler: enforce the per-user quota, then stream the bot's
    (gallery, text) updates while deduplicating repeated images by content
    hash so the gallery never shows the same image twice.
    """
    uid = user_id_from_profile(profile)
    if uid is None:
        raise gr.Error("Please sign in with Hugging Face to use this demo.")

    allowed, remaining = check_and_increment_quota(uid)
    if not allowed:
        raise gr.Error(f"Usage limit reached: {MAX_RUNS_PER_USER} runs per user.")

    if remaining <= 2:
        gr.Warning(f"β οΈ Only {remaining} run(s) left!")
    else:
        gr.Info(f"β Runs remaining: {remaining}")

    gallery = []
    seen_hashes = set()

    for logs, images, answer in bot.ask(question):
        for img in images or []:
            digest = hash_bytes(to_bytes(img))
            if digest in seen_hashes:
                continue
            seen_hashes.add(digest)
            gallery.append(img)

        # Show the final answer once available; otherwise the running logs.
        yield gallery, answer if answer else logs
| | |
| |
|
| | |
# CSS for the bordered "custom-btn" buttons (normal + hover states); the
# stacked selectors cover the different DOM shapes Gradio may render.
custom_css = """
.custom-btn,
.custom-btn button,
button.custom-btn {
border: 2px solid #222 !important;
background: #FFF8DC !important;
color: black !important;
border-radius: 6px !important;
font-weight: 600 !important;
padding: 6px 14px !important;
box-shadow: 0 0 0 1px #ccc !important;
transition: 0.2s ease-in-out;
}

.custom-btn:hover,
button.custom-btn:hover {
border-color: #2563eb !important;
background: white !important;
box-shadow: 0px 0px 6px rgba(37, 99, 235, 0.5) !important;
color: black !important;
}
"""
# --- Gradio UI wiring ---
with gr.Blocks(css=custom_css) as demo:
    # HF OAuth login; gradio injects the resulting OAuthProfile into any
    # event handler whose signature annotates a gr.OAuthProfile parameter.
    gr.LoginButton()
    with gr.Group(visible=True) as main_app:
        gr.Markdown("""
        # ποΈ MEP Drawing Investigator
        Ask a question about the drawing below.

        Retreived Images will appear live as they are retrieved.
        """)

        reset_btn = gr.Button("π Reset Bot", elem_classes="custom-btn")

        # Lets users grab the source PDF to follow along with the answer.
        gr.DownloadButton(
            label="π Download Drawings PDF",
            value="NorthMaconPark.pdf",
            elem_classes="custom-btn"
        )

        question = gr.Textbox(
            label="Your Question",
            placeholder="e.g., 'Trace the supply duct to Room 204'",
            lines=2
        )

        submit_btn = gr.Button("π Investigate", elem_classes="custom-btn")

        # Gallery is streamed to as the investigator fetches pages/tiles.
        image_display = gr.Gallery(
            label="πΈ Drawing Tiles (Live)",
            elem_id="image-pane",
            show_label=True,
            preview=True,
            columns=3,
            height="auto"
        )

        # Shows running logs during investigation, then the final answer.
        text_display = gr.Markdown(
            label="π Investigation Logs / Final Answer",
            elem_id="text_display"
        )

        # run_investigation is a generator, so outputs stream live.
        submit_btn.click(
            fn=run_investigation,
            inputs=question,
            outputs=[image_display, text_display]
        )

        reset_btn.click(
            fn=reset_bot,
            inputs=None,
            outputs=[image_display, text_display]
        )

demo.launch()