import hmac
import os
import re
import shutil
import traceback
from pathlib import Path

import gradio as gr
from dotenv import load_dotenv

from histopath.agent import A1
|
|
| |
# Load settings from a .env file (if any) before the environment is read below.
load_dotenv()

# Passcode that unlocks the interface; None when GRADIO_PASSWORD is not set.
PASSCODE = os.environ.get("GRADIO_PASSWORD")

# Shared agent instance; remains None until validate_passcode() creates it.
agent = None
|
|
|
|
def check_for_output_files(output_dir="./output"):
    """Collect generated image and data files from the output directory.

    Args:
        output_dir: Directory to scan. Defaults to ``./output``.

    Returns:
        A ``(images, data_files)`` tuple of lists of path strings, each in
        sorted path order. Both lists are empty when ``output_dir`` is not an
        existing directory. Sub-directories and files whose extension is in
        neither set are ignored.
    """
    image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}
    data_extensions = {".csv", ".txt", ".json", ".npy"}

    directory = Path(output_dir)
    if not directory.is_dir():
        return [], []

    images = []
    data_files = []

    # iterdir() yields entries in arbitrary order; sorting keeps the returned
    # lists stable from one call to the next.
    for entry in sorted(directory.iterdir()):
        if not entry.is_file():
            continue
        suffix = entry.suffix.lower()
        if suffix in image_extensions:
            images.append(str(entry))
        elif suffix in data_extensions:
            data_files.append(str(entry))

    return images, data_files
|
|
|
|
def preview_uploaded_file(uploaded_file):
    """Produce the preview outputs for the currently selected upload.

    Args:
        uploaded_file: Object exposing the upload's path as ``.name``, or
            ``None`` when nothing is selected.

    Returns:
        A ``(image_value, file_value, status_text)`` tuple. Files with a
        recognised image extension fill the image slot; everything else fills
        the file slot and reports its size in KB.
    """
    if uploaded_file is None:
        return None, None, "No file uploaded"

    source = uploaded_file.name
    target = Path(source)
    previewable = (".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs")

    if target.suffix.lower() not in previewable:
        # Non-image upload: report name and on-disk size instead of a preview.
        size_kb = target.stat().st_size / 1024
        return None, source, f"π File: {target.name} ({size_kb:.1f} KB)"

    return source, None, f"π· Previewing: {target.name}"
|
|
|
|
def _strip_wrappers(text):
    """Unwrap ``<think>`` tags and remove single-line ``====…====`` banners."""
    text = re.sub(r'<think>(.*?)</think>', r'\1', text, flags=re.DOTALL)
    return re.sub(r'={30,}.*?={30,}', '', text).strip()


def parse_agent_output(output):
    """Parse agent output to extract code blocks, observations, and regular text.

    The first tag found, checked in the order ``<execute>``, ``<observation>``,
    ``<solution>``, decides the result type; text after the closing tag is
    discarded.

    Args:
        output: Raw text of one agent step. ``None`` is treated as empty.

    Returns:
        A dict with keys:
            ``type``: ``"code"``, ``"observation"``, ``"solution"`` or ``"text"``.
            ``content``: the solution body for ``"solution"``, the cleaned text
                for ``"text"``, otherwise the banner-stripped raw output.
            ``code`` / ``observation``: the tag body for the matching type,
                else ``None``.
            ``thinking``: cleaned text preceding the tag, or ``None`` when
                there is none (always ``None`` for ``"text"``).
    """
    # Drop the "==== Human/Ai Message ====" separators before anything else.
    output = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output or "")
    output = output.strip()

    parsed = {
        "type": "text",
        "content": output,
        "code": None,
        "observation": None,
        "thinking": None,
    }

    # (tag name, resulting type, dict key that receives the tag body)
    tagged_sections = (
        ("execute", "code", "code"),
        ("observation", "observation", "observation"),
        ("solution", "solution", "content"),
    )
    for tag, kind, field in tagged_sections:
        match = re.search(rf'<{tag}>(.*?)</{tag}>', output, re.DOTALL)
        if match is None:
            continue
        parsed["type"] = kind
        parsed[field] = match.group(1).strip()
        # Whatever precedes the tag is the agent's narration for this step.
        parsed["thinking"] = _strip_wrappers(output[:match.start()].strip()) or None
        return parsed

    # No recognised tag: the whole message is plain text.
    parsed["content"] = _strip_wrappers(output)
    return parsed
|
|
|
|
def format_message_for_display(parsed_output):
    """Format parsed output into a readable message for the chatbot.

    Args:
        parsed_output: Dict with a ``type`` key (``"code"``, ``"observation"``,
            ``"solution"`` or anything else for plain text) and the matching
            ``code`` / ``observation`` / ``content`` entry; ``thinking`` is
            optional.

    Returns:
        A Markdown string. Any thinking text comes first; for the three tagged
        types it is followed by a horizontal rule and the titled section. For
        plain text, ``content`` is shown only when there is no thinking text.
    """
    # NOTE(review): the non-ASCII glyphs in the headings below (other than the
    # check mark) look mis-encoded; confirm against the original file.
    thinking = parsed_output.get("thinking")
    kind = parsed_output["type"]

    if kind == "code":
        section = [
            "### π» Executing Code\n",
            f"```python\n{parsed_output['code']}\n```",
        ]
    elif kind == "observation":
        section = [
            "### π Observation\n",
            f"```\n{parsed_output['observation']}\n```",
        ]
    elif kind == "solution":
        section = ["### ✅ Solution\n", parsed_output['content']]
    else:
        section = None

    msg_parts = [thinking] if thinking else []

    if section is not None:
        if thinking:
            # Visually separate the narration from the titled section.
            msg_parts.append("\n---\n")
        msg_parts.extend(section)
    elif not thinking:
        msg_parts.append(parsed_output["content"])

    return "\n\n".join(msg_parts)
|
|
|
|
def _step_messages(parsed):
    """Return the chat message strings to display for one parsed agent step."""
    messages = []
    thinking = parsed.get("thinking")
    if thinking:
        messages.append(thinking)

    kind = parsed["type"]
    if kind == "code" and parsed["code"]:
        messages.append(f"### π» Executing Code\n\n```python\n{parsed['code']}\n```")
    elif kind == "observation" and parsed["observation"]:
        messages.append(f"### π Observation\n\n```\n{parsed['observation']}\n```")
    elif kind == "solution":
        messages.append(f"### ✅ Solution\n\n{parsed['content']}")
    elif kind == "text" and parsed["content"] and not thinking:
        messages.append(parsed["content"])
    return messages


def _download_summary(final_images, final_data):
    """Build the Markdown message listing the generated files by name."""
    pieces = ["\n\n---\n\n### π Generated Files Ready for Download\n\n"]

    if final_images:
        pieces.append(f"**πΌοΈ Images ({len(final_images)})** - Available in the **Images** tab β\n")
        pieces.extend(f"- `{Path(img_path).name}`\n" for img_path in final_images)
        pieces.append("\n")

    if final_data:
        pieces.append(f"**π Data Files ({len(final_data)})** - Available in the **Data** tab β\n")
        pieces.extend(f"- `{Path(data_path).name}`\n" for data_path in final_data)

    pieces.append("\n*Click the download button on each file in the respective tabs above.*")
    return "".join(pieces)


def process_agent_response(prompt, uploaded_file, chatbot_history):
    """Process the agent response and update chatbot.

    Generator used as a streaming event handler. Each yielded value is a
    6-tuple ``(chat history, images, data files, None, None, status text)``;
    the fourth and fifth slots are always ``None``.

    Args:
        prompt: User query text. ``None`` is treated as an empty string.
        uploaded_file: Object exposing the upload's path as ``.name``, or
            ``None``. The file is copied into ``./data`` and its new path is
            appended to the prompt sent to the agent.
        chatbot_history: List of ``{"role", "content"}`` message dicts; it is
            extended in place. ``None`` is treated as an empty history.

    Yields:
        One tuple per UI update: an early warning when the agent is not
        initialised or no input was given, a "processing" update, one update
        per agent step that produced output, and a final completion (or error)
        update. Exceptions raised while streaming are reported in the chat
        together with their traceback rather than propagated.
    """
    global agent

    # NOTE(review): apart from the check mark, the non-ASCII glyphs in the
    # strings below look mis-encoded; confirm against the original file.

    # Guard against missing values so .strip() / .append() cannot fail.
    prompt = prompt or ""
    if chatbot_history is None:
        chatbot_history = []

    if agent is None:
        chatbot_history.append({
            "role": "assistant",
            "content": "β οΈ Please enter the passcode first to initialize the agent."
        })
        yield chatbot_history, None, None, None, None, "β οΈ Agent not initialized"
        return

    if not prompt.strip() and uploaded_file is None:
        chatbot_history.append({
            "role": "assistant",
            "content": "β οΈ Please provide a prompt or upload a file."
        })
        yield chatbot_history, None, None, None, None, "β οΈ No input provided"
        return

    file_path = None
    file_info = ""
    if uploaded_file is not None:
        try:
            data_dir = Path("./data")
            data_dir.mkdir(exist_ok=True)

            # Keep only the base name so the copy always lands inside ./data.
            file_name = Path(uploaded_file.name).name
            file_path = data_dir / file_name
            shutil.copy(uploaded_file.name, file_path)

            file_info = f"\n\nπ **Uploaded file:** `{file_path}`\n"

            # Tell the agent where the file lives.
            if prompt.strip():
                prompt = f"{prompt}\n\nUploaded file path: {file_path}"
            else:
                prompt = f"I have uploaded a file at: {file_path}. Please analyze it."

        except Exception as e:
            error_msg = f"β Error handling file upload: {str(e)}"
            chatbot_history.append({
                "role": "assistant",
                "content": error_msg
            })
            yield chatbot_history, None, None, None, None, error_msg
            return

    user_message = prompt if not file_info else f"{prompt}{file_info}"
    chatbot_history.append({"role": "user", "content": user_message})
    yield chatbot_history, None, None, None, None, "π Processing..."

    try:
        step_count = 0
        for step in agent.go_stream(prompt):
            step_count += 1
            output = step.get("output", "")

            # Steps without output produce no UI update.
            if not output:
                continue

            parsed = parse_agent_output(output)
            for content in _step_messages(parsed):
                chatbot_history.append({"role": "assistant", "content": content})

            # Pick up any files written so far so they appear while streaming.
            images, data_files = check_for_output_files()

            status = f"π Step {step_count}"
            if parsed["type"] == "code":
                status += " - Executing code..."
            elif parsed["type"] == "observation":
                status += " - Processing results..."
            elif parsed["type"] == "solution":
                status += " - Finalizing solution..."

            yield (
                chatbot_history,
                images or None,
                data_files or None,
                None,
                None,
                status,
            )

        final_images, final_data = check_for_output_files()

        if final_images or final_data:
            chatbot_history.append({
                "role": "assistant",
                "content": _download_summary(final_images, final_data)
            })

        status = "✅ Complete"
        if final_images:
            status += f" | {len(final_images)} image(s)"
        if final_data:
            status += f" | {len(final_data)} data file(s)"

        yield chatbot_history, final_images or None, final_data or None, None, None, status

    except Exception as e:
        error_msg = f"β Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```"
        chatbot_history.append({
            "role": "assistant",
            "content": error_msg
        })
        yield chatbot_history, None, None, None, None, "β Error occurred"
|
|
|
|
def validate_passcode(passcode):
    """Validate the passcode and initialize the agent.

    Args:
        passcode: Text entered by the user. ``None`` is treated as empty.

    Returns:
        A 3-tuple ``(passcode section update, main interface update, status
        message)``. On success the passcode section is hidden and the main
        interface shown; on a wrong passcode or an agent start-up failure the
        visibility is left the other way round and the message explains why.
    """
    global agent

    # NOTE(review): the glyph at the start of the two error messages looks
    # mis-encoded; confirm against the original file.

    # Access is refused when no passcode is configured: otherwise a null
    # input would compare equal to the unset (None) PASSCODE.
    # compare_digest keeps the comparison time independent of how many
    # leading characters match.
    authorised = PASSCODE is not None and hmac.compare_digest(
        (passcode or "").encode("utf-8"),
        PASSCODE.encode("utf-8"),
    )
    if not authorised:
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            "β Invalid passcode. Please try again."
        )

    try:
        agent = A1()
    except Exception as e:
        # Keep the passcode form visible and surface the failure to the user.
        error_trace = traceback.format_exc()
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            f"β Error initializing agent:\n{str(e)}\n\n{error_trace}"
        )

    return (
        gr.update(visible=False),
        gr.update(visible=True),
        "✅ Access granted! Agent initialized and ready."
    )
|
|
|
|
def clear_chat():
    """Reset the session: empty ``./output``, delete files in ``./data``.

    Returns:
        A 6-tuple ``([], None, None, None, None, status text)``: an empty chat
        history, four cleared values, and the status message.
    """
    # Recreate the output folder from scratch.
    generated = Path("./output")
    if generated.exists():
        shutil.rmtree(generated)
    generated.mkdir(exist_ok=True)

    # Remove uploaded files but leave any sub-directories of ./data alone.
    uploads = Path("./data")
    if uploads.exists():
        stale = [entry for entry in uploads.iterdir() if entry.is_file()]
        for entry in stale:
            entry.unlink()

    return [], None, None, None, None, "ποΈ Chat cleared"
|
|
|
|
| |
# Constructor options for the Soft base theme.
_theme_options = {
    "primary_hue": "blue",
    "secondary_hue": "slate",
    "spacing_size": "sm",
    "radius_size": "md",
}

# Per-variable overrides applied on top of the base theme.
_theme_overrides = {
    "button_primary_background_fill": "*primary_500",
    "button_primary_background_fill_hover": "*primary_600",
    "block_label_text_weight": "600",
    "block_title_text_weight": "600",
}

# Theme passed to gr.Blocks when the interface is built.
custom_theme = gr.themes.Soft(**_theme_options).set(**_theme_overrides)
|
|
# Stylesheet handed to gr.Blocks: full-width container, gradient header,
# wrapping upload file names, and styling for code/observation blocks in chat.
_APP_CSS = """
.gradio-container {
    max-width: 100% !important;
}
.main-header {
    text-align: center;
    padding: 1.5rem 0;
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    border-radius: 8px;
    margin-bottom: 1.5rem;
}
.main-header h1 {
    margin: 0;
    font-size: 2.2rem;
    font-weight: 700;
}
.main-header p {
    margin: 0.5rem 0 0 0;
    opacity: 0.95;
    font-size: 1.1rem;
}
.file-upload-box .wrap {
    min-width: 0 !important;
}
.file-upload-box .file-name {
    word-break: break-word !important;
    white-space: normal !important;
    overflow-wrap: break-word !important;
}
.tab-nav {
    margin-bottom: 0.5rem;
}
/* Better styling for code and observation blocks */
.message.bot pre {
    background-color: #f6f8fa !important;
    border: 1px solid #d0d7de !important;
    border-radius: 6px !important;
    padding: 12px !important;
    margin: 8px 0 !important;
}
.message.bot h3 {
    margin-top: 12px !important;
    margin-bottom: 8px !important;
    font-weight: 600 !important;
}
.message.bot hr {
    border: none !important;
    border-top: 2px solid #e1e4e8 !important;
    margin: 16px 0 !important;
}
"""

# Banner markup shown at the top of the page.
_HEADER_HTML = """
<div class="main-header">
    <h1>π¬ HistoPath Agent</h1>
    <p>AI-Powered Histopathology Analysis Assistant</p>
</div>
"""

# NOTE(review): indentation was not recoverable from the source, so the layout
# nesting below is reconstructed from the order of the components; verify it
# against the running app.
with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css=_APP_CSS) as demo:

    gr.HTML(_HEADER_HTML)

    # --- Passcode gate: visible until validate_passcode() succeeds ---
    with gr.Group(visible=True) as passcode_section:
        gr.Markdown("### π Authentication Required")

        with gr.Row():
            passcode_input = gr.Textbox(
                label="Passcode",
                type="password",
                placeholder="Enter your passcode...",
                scale=3,
            )
            passcode_btn = gr.Button("π Unlock", variant="primary", scale=1, size="lg")

        passcode_status = gr.Textbox(
            label="Status",
            interactive=False,
            lines=2,
        )

    # --- Main workspace: hidden until the passcode is accepted ---
    with gr.Group(visible=False) as main_interface:
        with gr.Row(equal_height=True):

            # Left side: conversation, query box, upload, and action buttons.
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="π¬ Conversation",
                    height=550,
                    show_label=True,
                    render_markdown=True,
                )

                with gr.Row():
                    with gr.Column(scale=7):
                        prompt_input = gr.Textbox(
                            label="Your Query",
                            placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'",
                            lines=2,
                            max_lines=5,
                            show_label=False,
                        )
                    with gr.Column(scale=3):
                        file_upload = gr.File(
                            label="π Upload File",
                            file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"],
                            height=75,
                            elem_classes="file-upload-box",
                        )

                with gr.Row():
                    submit_btn = gr.Button("π Submit", variant="primary", scale=3, size="lg")
                    clear_btn = gr.Button("ποΈ Clear", scale=1, size="lg", variant="secondary")

                status_text = gr.Textbox(
                    label="Status",
                    interactive=False,
                    value="Ready",
                    show_label=False,
                    container=False,
                )

            # Right side: tabs for the input preview and generated results.
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.Tab("π₯ Input"):
                        with gr.Column():
                            input_image_preview = gr.Image(
                                label="Input Image",
                                height=400,
                                show_label=False,
                                container=True,
                            )
                            input_file_preview = gr.File(
                                label="Input File",
                                interactive=False,
                                height=100,
                                show_label=False,
                                container=True,
                            )
                            input_status = gr.Textbox(
                                value="Upload a file to preview",
                                show_label=False,
                                interactive=False,
                                container=False,
                            )

                    with gr.Tab("πΌοΈ Images"):
                        output_gallery = gr.Gallery(
                            label="Generated Visualizations",
                            columns=1,
                            height=600,
                            object_fit="contain",
                            show_label=False,
                            show_download_button=True,
                        )

                    with gr.Tab("π Data"):
                        data_files = gr.File(
                            label="Generated Data Files",
                            file_count="multiple",
                            interactive=False,
                            height=600,
                            show_label=False,
                        )

    # --- Event wiring ---
    # The query handlers and the clear handler all write to the same six
    # components, in this order.
    query_inputs = [prompt_input, file_upload, chatbot]
    result_outputs = [
        chatbot,
        output_gallery,
        data_files,
        input_image_preview,
        input_file_preview,
        status_text,
    ]

    passcode_btn.click(
        fn=validate_passcode,
        inputs=[passcode_input],
        outputs=[passcode_section, main_interface, passcode_status],
    )

    file_upload.change(
        fn=preview_uploaded_file,
        inputs=[file_upload],
        outputs=[input_image_preview, input_file_preview, input_status],
    )

    submit_btn.click(
        fn=process_agent_response,
        inputs=query_inputs,
        outputs=result_outputs,
    )

    clear_btn.click(
        fn=clear_chat,
        outputs=result_outputs,
    )

    # Submitting from the query box runs the same handler as the Submit button.
    prompt_input.submit(
        fn=process_agent_response,
        inputs=query_inputs,
        outputs=result_outputs,
    )
|
|
|
|
if __name__ == "__main__":
    # Make sure the upload and result folders exist before serving.
    for folder in ("./data", "./output"):
        Path(folder).mkdir(exist_ok=True)

    rule = "=" * 60
    print(rule)
    print("π¬ HistoPath Agent - Gradio Interface")
    print(rule)

    print("Starting server...")
    print(rule)

    demo.launch()