Heavy / src /multi_web_combined.py
justinhew
Add plan execution flow and UI
a2d37bf
"""Combined Gradio interface with both Single Query and Chat modes."""
import asyncio
import os
import gradio as gr
from .multi_web import (
process_query_sync,
process_chat_message,
AVAILABLE_MODELS,
load_config,
generate_plan_mode
)
PREVIEW_CHAR_LIMIT = 2000
TEXT_EXTENSIONS = {
".txt",
".md",
".py",
".json",
".csv",
".tsv",
".yaml",
".yml",
".log",
".xml",
}
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"}
def _is_probably_binary(path: str) -> bool:
try:
with open(path, "rb") as f:
chunk = f.read(2048)
except OSError:
return False
if not chunk:
return False
printable = sum(32 <= b <= 126 or b in (9, 10, 13) for b in chunk)
return printable / len(chunk) < 0.85
def _extract_docx_text(path: str) -> str:
import zipfile
from xml.etree import ElementTree
try:
with zipfile.ZipFile(path) as zf:
xml_data = zf.read("word/document.xml")
except Exception:
return ""
namespace = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
try:
root = ElementTree.fromstring(xml_data)
except ElementTree.ParseError:
return ""
paragraphs = []
for para in root.iter(f"{namespace}p"):
texts = [node.text for node in para.iter(f"{namespace}t") if node.text]
if texts:
paragraphs.append("".join(texts))
return "\n\n".join(paragraphs)
def _extract_pdf_text(path: str) -> str:
try:
from pypdf import PdfReader
except ImportError:
return ""
try:
reader = PdfReader(path)
except Exception:
return ""
pages = []
for page in reader.pages[:10]: # cap for safety
try:
text = page.extract_text() or ""
except Exception:
text = ""
if text:
pages.append(text.strip())
return "\n\n".join(pages)
def _extract_pptx_text(path: str) -> str:
try:
from pptx import Presentation
except ImportError:
return ""
try:
presentation = Presentation(path)
except Exception:
return ""
slides = []
for index, slide in enumerate(presentation.slides, start=1):
texts = []
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text:
texts.append(shape.text.strip())
if texts:
slides.append(f"Slide {index}:\n" + "\n".join(texts))
return "\n\n".join(slides)
def _extract_excel_text(path: str, extension: str) -> str:
extension = extension.lower()
rows = []
if extension in {".xlsx", ".xlsm"}:
try:
from openpyxl import load_workbook
except ImportError:
return ""
try:
workbook = load_workbook(path, read_only=True, data_only=True)
except Exception:
return ""
sheet_limit = 5
row_limit = 40
for sheet_index, sheet in enumerate(workbook.worksheets):
if sheet_index >= sheet_limit:
rows.append("... (additional sheets not shown)")
break
rows.append(f"Sheet: {sheet.title}")
displayed = 0
for row in sheet.iter_rows(values_only=True):
if displayed >= row_limit:
rows.append("... (rows truncated)")
break
cells = [str(cell) if cell is not None else "" for cell in row]
rows.append(" | ".join(cells))
displayed += 1
elif extension == ".xls":
try:
import xlrd
except ImportError:
return ""
try:
workbook = xlrd.open_workbook(path)
except Exception:
return ""
sheet_limit = 5
row_limit = 40
for sheet_index, sheet in enumerate(workbook.sheets()):
if sheet_index >= sheet_limit:
rows.append("... (additional sheets not shown)")
break
rows.append(f"Sheet: {sheet.name}")
row_count = min(sheet.nrows, row_limit)
for ridx in range(row_count):
cells = [
str(sheet.cell_value(ridx, cidx))
for cidx in range(min(sheet.ncols, 20))
]
rows.append(" | ".join(cells))
if sheet.nrows > row_limit:
rows.append("... (rows truncated)")
return "\n".join(rows)
def _describe_image(path: str) -> str:
try:
from PIL import Image, ExifTags
except ImportError:
return "(Preview unavailable: Pillow is required for image metadata.)"
try:
with Image.open(path) as img:
width, height = img.size
mode = img.mode
info_lines = [f"Dimensions: {width}x{height}px", f"Color mode: {mode}"]
exif_data = {}
if hasattr(img, "_getexif") and img._getexif():
raw_exif = img._getexif() or {}
for tag, value in raw_exif.items():
decoded = ExifTags.TAGS.get(tag, tag)
if decoded in ("Make", "Model", "Software", "DateTimeOriginal"):
exif_data[decoded] = value
if exif_data:
info_lines.append("EXIF:")
for key, value in exif_data.items():
info_lines.append(f" - {key}: {value}")
return "\n".join(info_lines)
except Exception:
return "(Preview unavailable: could not read image metadata.)"
def _read_text_file(path: str) -> str:
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read(PREVIEW_CHAR_LIMIT)
except Exception:
return ""
def _generate_preview(path: str, extension: str) -> str:
extension = extension.lower()
preview_text = ""
if extension == ".docx":
preview_text = _extract_docx_text(path)
elif extension == ".pdf":
preview_text = _extract_pdf_text(path)
elif extension in (".pptx", ".ppt"):
preview_text = _extract_pptx_text(path)
if not preview_text and extension == ".ppt":
preview_text = "(Preview unavailable for legacy .ppt files. Convert to .pptx for text access.)"
elif extension in {".xlsx", ".xlsm", ".xls"}:
preview_text = _extract_excel_text(path, extension)
elif extension in TEXT_EXTENSIONS:
preview_text = _read_text_file(path)
elif extension in IMAGE_EXTENSIONS:
preview_text = _describe_image(path)
elif not _is_probably_binary(path):
preview_text = _read_text_file(path)
if preview_text:
if len(preview_text) > PREVIEW_CHAR_LIMIT:
preview_text = preview_text[:PREVIEW_CHAR_LIMIT] + "\n...\n(Preview truncated)"
return preview_text
return "(Preview unavailable. File may be binary or unsupported for inline preview.)"
# Create Combined Gradio interface with TABS
with gr.Blocks(
title="Heavy Multi-Model 2.0",
theme=gr.themes.Soft()
) as demo:
gr.Markdown("# 🤖 Heavy Multi-Model 2.0")
with gr.Tabs() as tabs:
# ============================================
# TAB 1: CHAT MODE
# ============================================
with gr.Tab("C", id="chat"):
# State for conversation history and file attachments
chat_state = gr.State([])
chat_uploaded_file_state = gr.State(value=None)
with gr.Row():
with gr.Column(scale=3):
# API Keys
with gr.Group():
chat_api_key = gr.Textbox(
label="O",
placeholder="sk-or-v1-...",
type="password"
)
chat_use_tavily = gr.Checkbox(label="T", value=False)
chat_tavily_key = gr.Textbox(
label="T Key",
placeholder="tvly-...",
type="password",
visible=False
)
# Model Config
with gr.Accordion("🎯 Model Configuration", open=True):
chat_mode = gr.Radio(
choices=[
"S",
"M",
"Original M"
],
value="S",
label="Mode"
)
with gr.Group(visible=True) as chat_single_group:
chat_single_model = gr.Dropdown(
choices=AVAILABLE_MODELS,
value="claude-4.5-sonnet",
label="Model"
)
with gr.Group(visible=False) as chat_multi_group:
chat_orch = gr.Dropdown(AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Orchestrator")
chat_agent = gr.Dropdown(AVAILABLE_MODELS, value="gpt-5.1", label="Agents")
chat_synth = gr.Dropdown(AVAILABLE_MODELS, value="gemini-3-pro-preview", label="Synthesizer")
with gr.Accordion("⚙️ Settings", open=False):
chat_num_agents = gr.Slider(2, 8, 4, step=1, label="Number of Agents")
chat_show_thoughts = gr.Checkbox(label="Show Agent Details", value=False)
# Chat UI
gr.Markdown("### 💬 Conversation")
chat_display = gr.Chatbot(
value=[],
label="Chat",
height=400,
type="messages"
)
with gr.Row():
chat_upload = gr.UploadButton(
"📎 Attach File",
size="sm",
scale=1,
variant="secondary",
file_types=["image", "video", "audio", "text", "document"],
file_count="single"
)
chat_input = gr.Textbox(
placeholder="Type your message...",
lines=2,
scale=4,
show_label=False
)
chat_send = gr.Button("Send 🚀", variant="primary", scale=1)
chat_clear = gr.Button("🗑️ Clear Chat", variant="secondary")
chat_file_info = gr.Markdown("No file uploaded yet.", visible=True)
chat_file_preview = gr.Textbox(
label="Attached File Preview (first 2000 characters)",
lines=6,
interactive=False
)
with gr.Column(scale=1):
pass
with gr.Accordion("📊 Analysis Details (Latest)", open=False):
chat_model_info = gr.Markdown()
chat_questions = gr.Textbox(label="Questions", lines=3, interactive=False)
chat_agents = gr.Markdown(label="Agent Analyses")
# ============================================
# TAB 2: SINGLE QUERY MODE
# ============================================
with gr.Tab("Q", id="single"):
with gr.Row():
with gr.Column(scale=3):
# API Keys
single_api_key = gr.Textbox(
label="O",
placeholder="sk-or-v1-...",
type="password"
)
single_use_tavily = gr.Checkbox(label="T", value=False)
single_tavily_key = gr.Textbox(
label="T Key",
placeholder="tvly-...",
type="password",
visible=False
)
single_query = gr.Textbox(
label="Your Query",
placeholder="What are the implications of quantum computing?",
lines=3
)
# Model Config
with gr.Accordion("🎯 Model Configuration", open=True):
single_mode = gr.Radio(
choices=[
"S",
"M",
"Original M"
],
value="S",
label="Mode"
)
with gr.Group(visible=True) as single_single_group:
single_single_model = gr.Dropdown(
choices=AVAILABLE_MODELS,
value="claude-4.5-sonnet",
label="Model"
)
with gr.Group(visible=False) as single_multi_group:
single_orch = gr.Dropdown(AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Orchestrator")
single_agent = gr.Dropdown(AVAILABLE_MODELS, value="gpt-5.1", label="Agents")
single_synth = gr.Dropdown(AVAILABLE_MODELS, value="gemini-3-pro-preview", label="Synthesizer")
with gr.Accordion("⚙️ Settings", open=False):
single_num_agents = gr.Slider(2, 8, 4, step=1, label="Number of Agents")
single_show_thoughts = gr.Checkbox(label="Show Agent Thoughts", value=True)
single_submit = gr.Button("🚀 Analyze", variant="primary", size="lg")
with gr.Column(scale=1):
pass
with gr.Accordion("🎯 Model Configuration", open=True):
single_model_info = gr.Markdown()
with gr.Accordion("📋 Generated Questions", open=True):
single_questions = gr.Textbox(label="Questions", lines=6, interactive=False)
with gr.Accordion("🔍 Agent Analyses", open=False):
single_agents = gr.Markdown()
with gr.Accordion("✨ Final Response", open=True):
single_response = gr.Markdown()
# ============================================
# TAB 3: PLAN MODE
# ============================================
with gr.Tab("Plan Mode", id="plan"):
gr.Markdown("### 🧭 Plan Mode")
with gr.Row():
with gr.Column(scale=3):
plan_api_key = gr.Textbox(
label="O",
placeholder="sk-or-v1-...",
type="password"
)
plan_task = gr.Textbox(
label="Task / Goal to Plan",
placeholder="Describe the backlog item or project you want planned...",
lines=4
)
with gr.Accordion("🧠 Planner Settings", open=True):
plan_model = gr.Dropdown(
choices=AVAILABLE_MODELS,
value="claude-4.5-sonnet",
label="Planner Model"
)
plan_num_agents = gr.Slider(
3,
8,
4,
step=1,
label="Parallel Agents / Workstreams"
)
with gr.Row():
plan_generate = gr.Button("🧭 Generate Plan", variant="primary")
plan_clear = gr.Button("🗑️ Clear", variant="secondary")
with gr.Column(scale=1):
pass
plan_state = gr.State("")
with gr.Accordion("📋 Plan Output", open=True):
plan_model_info = gr.Markdown()
plan_output = gr.Markdown()
with gr.Accordion("🚀 Execute with Heavy (uses plan as context)", open=False):
plan_exec_query = gr.Textbox(
label="Execution Task (Heavy will follow the plan context)",
placeholder="What should Heavy execute? e.g., \"Build auth UI per plan above\"",
lines=3
)
plan_exec_mode = gr.Radio(
choices=[
"S",
"M",
"Original M"
],
value="S",
label="Mode"
)
with gr.Group(visible=True) as plan_exec_single_group:
plan_exec_single_model = gr.Dropdown(
choices=AVAILABLE_MODELS,
value="claude-4.5-sonnet",
label="Model"
)
with gr.Group(visible=False) as plan_exec_multi_group:
plan_exec_orch = gr.Dropdown(AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Orchestrator")
plan_exec_agent = gr.Dropdown(AVAILABLE_MODELS, value="gpt-5.1", label="Agents")
plan_exec_synth = gr.Dropdown(AVAILABLE_MODELS, value="gemini-3-pro-preview", label="Synthesizer")
plan_exec_num_agents = gr.Slider(2, 8, 4, step=1, label="Number of Agents")
plan_exec_show_thoughts = gr.Checkbox(label="Show Agent Thoughts", value=True)
plan_exec_use_tavily = gr.Checkbox(label="Enable Web Search (Tavily)", value=False)
plan_exec_tavily_key = gr.Textbox(
label="T Key",
placeholder="tvly-...",
type="password",
visible=False
)
plan_execute = gr.Button("🚀 Run Heavy with Plan Context", variant="primary", size="lg")
with gr.Accordion("Execution Output", open=True):
plan_exec_model_info = gr.Markdown()
plan_exec_questions = gr.Textbox(label="Questions", lines=6, interactive=False)
plan_exec_agents = gr.Markdown()
plan_exec_response = gr.Markdown()
# ============================================
# TAB 4: FILE UPLOAD TEST MODE
# ============================================
with gr.Tab("Upload Test", id="upload_test"):
gr.Markdown("### 📁 Upload a file to quickly inspect it")
upload_file_input = gr.File(
label="Select a file to upload",
file_count="single",
type="filepath",
file_types=["image", "video", "audio", "text", "document"]
)
upload_process_btn = gr.Button("Process File", variant="primary")
upload_file_info = gr.Markdown("No file uploaded yet.")
upload_file_preview = gr.Textbox(
label="File Preview (first 2000 characters)",
lines=10,
interactive=False
)
# ============================================
# EVENT HANDLERS
# ============================================
# Toggle functions
def toggle_model_selection(mode):
if mode == "S":
return gr.update(visible=True), gr.update(visible=False)
elif mode == "M":
return gr.update(visible=False), gr.update(visible=True)
else:
return gr.update(visible=False), gr.update(visible=False)
def toggle_tavily(use_tavily):
return gr.update(visible=use_tavily)
# Plan mode handlers
def handle_plan_request(task, num_agents, model, api_key):
model_info, plan_text = generate_plan_mode(task, num_agents, model, api_key)
return model_info, plan_text, plan_text
def clear_plan():
return "", "", "", "", "", "", ""
def handle_plan_execute(execution_task, plan_text, num_agents, show_thoughts, mode,
single, orch, agent, synth, api_key, use_tavily, tavily_key):
if not plan_text.strip():
return "⚠️ Generate a plan first.", "", "", ""
if not execution_task.strip():
return "⚠️ Enter an execution task for Heavy to run with the plan context.", "", "", ""
execution_query = (
"Follow the plan below as context. Execute the task, using the plan to guide questions and steps.\n\n"
f"=== PLAN START ===\n{plan_text}\n=== PLAN END ===\n\n"
f"Execution task: {execution_task.strip()}"
)
return process_query_sync(
execution_query, num_agents, show_thoughts, mode,
single, orch, agent, synth, api_key, use_tavily, tavily_key
)
# Chat handlers
def handle_chat(msg, hist, num_agents, show_thoughts, mode, single, orch, agent, synth, api_key, use_tavily, tavily_key, uploaded_file):
attachment_note = ""
if uploaded_file and uploaded_file.get("preview"):
attachment_note = (
"\n\n---\nAttached file information:\n"
f"{uploaded_file.get('info', '')}\n\n"
"Attached file preview:\n"
f"{uploaded_file['preview']}"
)
elif uploaded_file and uploaded_file.get("info"):
attachment_note = (
"\n\n---\nAttached file information:\n"
f"{uploaded_file['info']}\n"
"(Preview unavailable.)"
)
msg_payload = f"{msg}{attachment_note}" if attachment_note else msg
updated_hist, model_info, questions, agents, _ = process_chat_message(
msg_payload, hist, num_agents, show_thoughts, mode,
single, orch, agent, synth, api_key, use_tavily, tavily_key
)
chat_display = [{"role": m["role"], "content": m["content"]} for m in updated_hist]
if uploaded_file:
reset_info = "No file uploaded yet."
reset_preview = ""
reset_attachment = None
else:
reset_info = gr.update()
reset_preview = gr.update()
reset_attachment = uploaded_file
return (
chat_display,
updated_hist,
"",
model_info,
questions,
agents,
reset_info,
reset_preview,
reset_attachment
)
def clear_chat():
return [], []
def handle_file_upload(file_path):
"""Return basic metadata and safe preview text for uploaded files."""
if not file_path:
return "⚠️ Please upload a file first.", ""
file_name = os.path.basename(file_path)
file_ext = os.path.splitext(file_name)[1].lower()
try:
size_bytes = os.path.getsize(file_path)
size_info = f"{size_bytes} bytes ({size_bytes / 1024:.1f} KB)"
except OSError:
size_bytes = None
size_info = "Unknown"
preview = _generate_preview(file_path, file_ext)
info = (
f"**File:** {file_name}\n"
f"- Type: {file_ext or 'unknown'}\n"
f"- Size: {size_info}\n"
f"- Location: `{file_path}`"
)
return info, preview
def handle_chat_file_upload(file_path):
"""Extend file upload handler to store attachment metadata for chat."""
info, preview = handle_file_upload(file_path)
payload = None
if file_path:
payload = {
"path": file_path,
"info": info,
"preview": preview
}
return info, preview, payload
# Chat mode events
chat_mode.change(
fn=toggle_model_selection,
inputs=[chat_mode],
outputs=[chat_single_group, chat_multi_group]
)
chat_use_tavily.change(
fn=toggle_tavily,
inputs=[chat_use_tavily],
outputs=[chat_tavily_key]
)
chat_send.click(
fn=handle_chat,
inputs=[chat_input, chat_state, chat_num_agents, chat_show_thoughts, chat_mode,
chat_single_model, chat_orch, chat_agent, chat_synth,
chat_api_key, chat_use_tavily, chat_tavily_key, chat_uploaded_file_state],
outputs=[
chat_display, chat_state, chat_input, chat_model_info,
chat_questions, chat_agents, chat_file_info, chat_file_preview,
chat_uploaded_file_state
]
)
chat_input.submit(
fn=handle_chat,
inputs=[chat_input, chat_state, chat_num_agents, chat_show_thoughts, chat_mode,
chat_single_model, chat_orch, chat_agent, chat_synth,
chat_api_key, chat_use_tavily, chat_tavily_key, chat_uploaded_file_state],
outputs=[
chat_display, chat_state, chat_input, chat_model_info,
chat_questions, chat_agents, chat_file_info, chat_file_preview,
chat_uploaded_file_state
]
)
chat_clear.click(fn=clear_chat, outputs=[chat_display, chat_state])
chat_upload.upload(
fn=handle_chat_file_upload,
inputs=[chat_upload],
outputs=[chat_file_info, chat_file_preview, chat_uploaded_file_state]
)
# Plan mode events
plan_generate.click(
fn=handle_plan_request,
inputs=[plan_task, plan_num_agents, plan_model, plan_api_key],
outputs=[plan_model_info, plan_output, plan_state]
)
plan_clear.click(
fn=clear_plan,
outputs=[
plan_model_info,
plan_output,
plan_state,
plan_exec_model_info,
plan_exec_questions,
plan_exec_agents,
plan_exec_response
]
)
plan_exec_mode.change(
fn=toggle_model_selection,
inputs=[plan_exec_mode],
outputs=[plan_exec_single_group, plan_exec_multi_group]
)
plan_exec_use_tavily.change(
fn=toggle_tavily,
inputs=[plan_exec_use_tavily],
outputs=[plan_exec_tavily_key]
)
plan_execute.click(
fn=handle_plan_execute,
inputs=[
plan_exec_query,
plan_state,
plan_exec_num_agents,
plan_exec_show_thoughts,
plan_exec_mode,
plan_exec_single_model,
plan_exec_orch,
plan_exec_agent,
plan_exec_synth,
plan_api_key,
plan_exec_use_tavily,
plan_exec_tavily_key
],
outputs=[
plan_exec_model_info,
plan_exec_questions,
plan_exec_agents,
plan_exec_response
]
)
# Single query mode events
single_mode.change(
fn=toggle_model_selection,
inputs=[single_mode],
outputs=[single_single_group, single_multi_group]
)
single_use_tavily.change(
fn=toggle_tavily,
inputs=[single_use_tavily],
outputs=[single_tavily_key]
)
single_submit.click(
fn=process_query_sync,
inputs=[single_query, single_num_agents, single_show_thoughts, single_mode,
single_single_model, single_orch, single_agent, single_synth,
single_api_key, single_use_tavily, single_tavily_key],
outputs=[single_model_info, single_questions, single_agents, single_response]
)
upload_process_btn.click(
fn=handle_file_upload,
inputs=[upload_file_input],
outputs=[upload_file_info, upload_file_preview]
)
def launch(share=True, server_port=7860):
"""Launch the combined interface."""
demo.launch(
share=share,
server_port=server_port,
server_name="0.0.0.0",
show_error=True,
inbrowser=True,
prevent_thread_lock=False
)
if __name__ == "__main__":
launch()