"""Combined Gradio interface with both Single Query and Chat modes.""" import asyncio import os import gradio as gr from .multi_web import ( process_query_sync, process_chat_message, AVAILABLE_MODELS, load_config, generate_plan_mode ) PREVIEW_CHAR_LIMIT = 2000 TEXT_EXTENSIONS = { ".txt", ".md", ".py", ".json", ".csv", ".tsv", ".yaml", ".yml", ".log", ".xml", } IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"} def _is_probably_binary(path: str) -> bool: try: with open(path, "rb") as f: chunk = f.read(2048) except OSError: return False if not chunk: return False printable = sum(32 <= b <= 126 or b in (9, 10, 13) for b in chunk) return printable / len(chunk) < 0.85 def _extract_docx_text(path: str) -> str: import zipfile from xml.etree import ElementTree try: with zipfile.ZipFile(path) as zf: xml_data = zf.read("word/document.xml") except Exception: return "" namespace = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}" try: root = ElementTree.fromstring(xml_data) except ElementTree.ParseError: return "" paragraphs = [] for para in root.iter(f"{namespace}p"): texts = [node.text for node in para.iter(f"{namespace}t") if node.text] if texts: paragraphs.append("".join(texts)) return "\n\n".join(paragraphs) def _extract_pdf_text(path: str) -> str: try: from pypdf import PdfReader except ImportError: return "" try: reader = PdfReader(path) except Exception: return "" pages = [] for page in reader.pages[:10]: # cap for safety try: text = page.extract_text() or "" except Exception: text = "" if text: pages.append(text.strip()) return "\n\n".join(pages) def _extract_pptx_text(path: str) -> str: try: from pptx import Presentation except ImportError: return "" try: presentation = Presentation(path) except Exception: return "" slides = [] for index, slide in enumerate(presentation.slides, start=1): texts = [] for shape in slide.shapes: if hasattr(shape, "text") and shape.text: texts.append(shape.text.strip()) if texts: slides.append(f"Slide {index}:\n" + "\n".join(texts)) return "\n\n".join(slides) def _extract_excel_text(path: str, extension: str) -> str: extension = extension.lower() rows = [] if extension in {".xlsx", ".xlsm"}: try: from openpyxl import load_workbook except ImportError: return "" try: workbook = load_workbook(path, read_only=True, data_only=True) except Exception: return "" sheet_limit = 5 row_limit = 40 for sheet_index, sheet in enumerate(workbook.worksheets): if sheet_index >= sheet_limit: rows.append("... (additional sheets not shown)") break rows.append(f"Sheet: {sheet.title}") displayed = 0 for row in sheet.iter_rows(values_only=True): if displayed >= row_limit: rows.append("... (rows truncated)") break cells = [str(cell) if cell is not None else "" for cell in row] rows.append(" | ".join(cells)) displayed += 1 elif extension == ".xls": try: import xlrd except ImportError: return "" try: workbook = xlrd.open_workbook(path) except Exception: return "" sheet_limit = 5 row_limit = 40 for sheet_index, sheet in enumerate(workbook.sheets()): if sheet_index >= sheet_limit: rows.append("... (additional sheets not shown)") break rows.append(f"Sheet: {sheet.name}") row_count = min(sheet.nrows, row_limit) for ridx in range(row_count): cells = [ str(sheet.cell_value(ridx, cidx)) for cidx in range(min(sheet.ncols, 20)) ] rows.append(" | ".join(cells)) if sheet.nrows > row_limit: rows.append("... (rows truncated)") return "\n".join(rows) def _describe_image(path: str) -> str: try: from PIL import Image, ExifTags except ImportError: return "(Preview unavailable: Pillow is required for image metadata.)" try: with Image.open(path) as img: width, height = img.size mode = img.mode info_lines = [f"Dimensions: {width}x{height}px", f"Color mode: {mode}"] exif_data = {} if hasattr(img, "_getexif") and img._getexif(): raw_exif = img._getexif() or {} for tag, value in raw_exif.items(): decoded = ExifTags.TAGS.get(tag, tag) if decoded in ("Make", "Model", "Software", "DateTimeOriginal"): exif_data[decoded] = value if exif_data: info_lines.append("EXIF:") for key, value in exif_data.items(): info_lines.append(f" - {key}: {value}") return "\n".join(info_lines) except Exception: return "(Preview unavailable: could not read image metadata.)" def _read_text_file(path: str) -> str: try: with open(path, "r", encoding="utf-8", errors="ignore") as f: return f.read(PREVIEW_CHAR_LIMIT) except Exception: return "" def _generate_preview(path: str, extension: str) -> str: extension = extension.lower() preview_text = "" if extension == ".docx": preview_text = _extract_docx_text(path) elif extension == ".pdf": preview_text = _extract_pdf_text(path) elif extension in (".pptx", ".ppt"): preview_text = _extract_pptx_text(path) if not preview_text and extension == ".ppt": preview_text = "(Preview unavailable for legacy .ppt files. Convert to .pptx for text access.)" elif extension in {".xlsx", ".xlsm", ".xls"}: preview_text = _extract_excel_text(path, extension) elif extension in TEXT_EXTENSIONS: preview_text = _read_text_file(path) elif extension in IMAGE_EXTENSIONS: preview_text = _describe_image(path) elif not _is_probably_binary(path): preview_text = _read_text_file(path) if preview_text: if len(preview_text) > PREVIEW_CHAR_LIMIT: preview_text = preview_text[:PREVIEW_CHAR_LIMIT] + "\n...\n(Preview truncated)" return preview_text return "(Preview unavailable. File may be binary or unsupported for inline preview.)" # Create Combined Gradio interface with TABS with gr.Blocks( title="Heavy Multi-Model 2.0", theme=gr.themes.Soft() ) as demo: gr.Markdown("# 🤖 Heavy Multi-Model 2.0") with gr.Tabs() as tabs: # ============================================ # TAB 1: CHAT MODE # ============================================ with gr.Tab("C", id="chat"): # State for conversation history and file attachments chat_state = gr.State([]) chat_uploaded_file_state = gr.State(value=None) with gr.Row(): with gr.Column(scale=3): # API Keys with gr.Group(): chat_api_key = gr.Textbox( label="O", placeholder="sk-or-v1-...", type="password" ) chat_use_tavily = gr.Checkbox(label="T", value=False) chat_tavily_key = gr.Textbox( label="T Key", placeholder="tvly-...", type="password", visible=False ) # Model Config with gr.Accordion("🎯 Model Configuration", open=True): chat_mode = gr.Radio( choices=[ "S", "M", "Original M" ], value="S", label="Mode" ) with gr.Group(visible=True) as chat_single_group: chat_single_model = gr.Dropdown( choices=AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Model" ) with gr.Group(visible=False) as chat_multi_group: chat_orch = gr.Dropdown(AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Orchestrator") chat_agent = gr.Dropdown(AVAILABLE_MODELS, value="gpt-5.1", label="Agents") chat_synth = gr.Dropdown(AVAILABLE_MODELS, value="gemini-3-pro-preview", label="Synthesizer") with gr.Accordion("⚙️ Settings", open=False): chat_num_agents = gr.Slider(2, 8, 4, step=1, label="Number of Agents") chat_show_thoughts = gr.Checkbox(label="Show Agent Details", value=False) # Chat UI gr.Markdown("### 💬 Conversation") chat_display = gr.Chatbot( value=[], label="Chat", height=400, type="messages" ) with gr.Row(): chat_upload = gr.UploadButton( "📎 Attach File", size="sm", scale=1, variant="secondary", file_types=["image", "video", "audio", "text", "document"], file_count="single" ) chat_input = gr.Textbox( placeholder="Type your message...", lines=2, scale=4, show_label=False ) chat_send = gr.Button("Send 🚀", variant="primary", scale=1) chat_clear = gr.Button("🗑️ Clear Chat", variant="secondary") chat_file_info = gr.Markdown("No file uploaded yet.", visible=True) chat_file_preview = gr.Textbox( label="Attached File Preview (first 2000 characters)", lines=6, interactive=False ) with gr.Column(scale=1): pass with gr.Accordion("📊 Analysis Details (Latest)", open=False): chat_model_info = gr.Markdown() chat_questions = gr.Textbox(label="Questions", lines=3, interactive=False) chat_agents = gr.Markdown(label="Agent Analyses") # ============================================ # TAB 2: SINGLE QUERY MODE # ============================================ with gr.Tab("Q", id="single"): with gr.Row(): with gr.Column(scale=3): # API Keys single_api_key = gr.Textbox( label="O", placeholder="sk-or-v1-...", type="password" ) single_use_tavily = gr.Checkbox(label="T", value=False) single_tavily_key = gr.Textbox( label="T Key", placeholder="tvly-...", type="password", visible=False ) single_query = gr.Textbox( label="Your Query", placeholder="What are the implications of quantum computing?", lines=3 ) # Model Config with gr.Accordion("🎯 Model Configuration", open=True): single_mode = gr.Radio( choices=[ "S", "M", "Original M" ], value="S", label="Mode" ) with gr.Group(visible=True) as single_single_group: single_single_model = gr.Dropdown( choices=AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Model" ) with gr.Group(visible=False) as single_multi_group: single_orch = gr.Dropdown(AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Orchestrator") single_agent = gr.Dropdown(AVAILABLE_MODELS, value="gpt-5.1", label="Agents") single_synth = gr.Dropdown(AVAILABLE_MODELS, value="gemini-3-pro-preview", label="Synthesizer") with gr.Accordion("⚙️ Settings", open=False): single_num_agents = gr.Slider(2, 8, 4, step=1, label="Number of Agents") single_show_thoughts = gr.Checkbox(label="Show Agent Thoughts", value=True) single_submit = gr.Button("🚀 Analyze", variant="primary", size="lg") with gr.Column(scale=1): pass with gr.Accordion("🎯 Model Configuration", open=True): single_model_info = gr.Markdown() with gr.Accordion("📋 Generated Questions", open=True): single_questions = gr.Textbox(label="Questions", lines=6, interactive=False) with gr.Accordion("🔍 Agent Analyses", open=False): single_agents = gr.Markdown() with gr.Accordion("✨ Final Response", open=True): single_response = gr.Markdown() # ============================================ # TAB 3: PLAN MODE # ============================================ with gr.Tab("Plan Mode", id="plan"): gr.Markdown("### 🧭 Plan Mode") with gr.Row(): with gr.Column(scale=3): plan_api_key = gr.Textbox( label="O", placeholder="sk-or-v1-...", type="password" ) plan_task = gr.Textbox( label="Task / Goal to Plan", placeholder="Describe the backlog item or project you want planned...", lines=4 ) with gr.Accordion("🧠 Planner Settings", open=True): plan_model = gr.Dropdown( choices=AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Planner Model" ) plan_num_agents = gr.Slider( 3, 8, 4, step=1, label="Parallel Agents / Workstreams" ) with gr.Row(): plan_generate = gr.Button("🧭 Generate Plan", variant="primary") plan_clear = gr.Button("🗑️ Clear", variant="secondary") with gr.Column(scale=1): pass plan_state = gr.State("") with gr.Accordion("📋 Plan Output", open=True): plan_model_info = gr.Markdown() plan_output = gr.Markdown() with gr.Accordion("🚀 Execute with Heavy (uses plan as context)", open=False): plan_exec_query = gr.Textbox( label="Execution Task (Heavy will follow the plan context)", placeholder="What should Heavy execute? e.g., \"Build auth UI per plan above\"", lines=3 ) plan_exec_mode = gr.Radio( choices=[ "S", "M", "Original M" ], value="S", label="Mode" ) with gr.Group(visible=True) as plan_exec_single_group: plan_exec_single_model = gr.Dropdown( choices=AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Model" ) with gr.Group(visible=False) as plan_exec_multi_group: plan_exec_orch = gr.Dropdown(AVAILABLE_MODELS, value="claude-4.5-sonnet", label="Orchestrator") plan_exec_agent = gr.Dropdown(AVAILABLE_MODELS, value="gpt-5.1", label="Agents") plan_exec_synth = gr.Dropdown(AVAILABLE_MODELS, value="gemini-3-pro-preview", label="Synthesizer") plan_exec_num_agents = gr.Slider(2, 8, 4, step=1, label="Number of Agents") plan_exec_show_thoughts = gr.Checkbox(label="Show Agent Thoughts", value=True) plan_exec_use_tavily = gr.Checkbox(label="Enable Web Search (Tavily)", value=False) plan_exec_tavily_key = gr.Textbox( label="T Key", placeholder="tvly-...", type="password", visible=False ) plan_execute = gr.Button("🚀 Run Heavy with Plan Context", variant="primary", size="lg") with gr.Accordion("Execution Output", open=True): plan_exec_model_info = gr.Markdown() plan_exec_questions = gr.Textbox(label="Questions", lines=6, interactive=False) plan_exec_agents = gr.Markdown() plan_exec_response = gr.Markdown() # ============================================ # TAB 4: FILE UPLOAD TEST MODE # ============================================ with gr.Tab("Upload Test", id="upload_test"): gr.Markdown("### 📁 Upload a file to quickly inspect it") upload_file_input = gr.File( label="Select a file to upload", file_count="single", type="filepath", file_types=["image", "video", "audio", "text", "document"] ) upload_process_btn = gr.Button("Process File", variant="primary") upload_file_info = gr.Markdown("No file uploaded yet.") upload_file_preview = gr.Textbox( label="File Preview (first 2000 characters)", lines=10, interactive=False ) # ============================================ # EVENT HANDLERS # ============================================ # Toggle functions def toggle_model_selection(mode): if mode == "S": return gr.update(visible=True), gr.update(visible=False) elif mode == "M": return gr.update(visible=False), gr.update(visible=True) else: return gr.update(visible=False), gr.update(visible=False) def toggle_tavily(use_tavily): return gr.update(visible=use_tavily) # Plan mode handlers def handle_plan_request(task, num_agents, model, api_key): model_info, plan_text = generate_plan_mode(task, num_agents, model, api_key) return model_info, plan_text, plan_text def clear_plan(): return "", "", "", "", "", "", "" def handle_plan_execute(execution_task, plan_text, num_agents, show_thoughts, mode, single, orch, agent, synth, api_key, use_tavily, tavily_key): if not plan_text.strip(): return "⚠️ Generate a plan first.", "", "", "" if not execution_task.strip(): return "⚠️ Enter an execution task for Heavy to run with the plan context.", "", "", "" execution_query = ( "Follow the plan below as context. Execute the task, using the plan to guide questions and steps.\n\n" f"=== PLAN START ===\n{plan_text}\n=== PLAN END ===\n\n" f"Execution task: {execution_task.strip()}" ) return process_query_sync( execution_query, num_agents, show_thoughts, mode, single, orch, agent, synth, api_key, use_tavily, tavily_key ) # Chat handlers def handle_chat(msg, hist, num_agents, show_thoughts, mode, single, orch, agent, synth, api_key, use_tavily, tavily_key, uploaded_file): attachment_note = "" if uploaded_file and uploaded_file.get("preview"): attachment_note = ( "\n\n---\nAttached file information:\n" f"{uploaded_file.get('info', '')}\n\n" "Attached file preview:\n" f"{uploaded_file['preview']}" ) elif uploaded_file and uploaded_file.get("info"): attachment_note = ( "\n\n---\nAttached file information:\n" f"{uploaded_file['info']}\n" "(Preview unavailable.)" ) msg_payload = f"{msg}{attachment_note}" if attachment_note else msg updated_hist, model_info, questions, agents, _ = process_chat_message( msg_payload, hist, num_agents, show_thoughts, mode, single, orch, agent, synth, api_key, use_tavily, tavily_key ) chat_display = [{"role": m["role"], "content": m["content"]} for m in updated_hist] if uploaded_file: reset_info = "No file uploaded yet." reset_preview = "" reset_attachment = None else: reset_info = gr.update() reset_preview = gr.update() reset_attachment = uploaded_file return ( chat_display, updated_hist, "", model_info, questions, agents, reset_info, reset_preview, reset_attachment ) def clear_chat(): return [], [] def handle_file_upload(file_path): """Return basic metadata and safe preview text for uploaded files.""" if not file_path: return "⚠️ Please upload a file first.", "" file_name = os.path.basename(file_path) file_ext = os.path.splitext(file_name)[1].lower() try: size_bytes = os.path.getsize(file_path) size_info = f"{size_bytes} bytes ({size_bytes / 1024:.1f} KB)" except OSError: size_bytes = None size_info = "Unknown" preview = _generate_preview(file_path, file_ext) info = ( f"**File:** {file_name}\n" f"- Type: {file_ext or 'unknown'}\n" f"- Size: {size_info}\n" f"- Location: `{file_path}`" ) return info, preview def handle_chat_file_upload(file_path): """Extend file upload handler to store attachment metadata for chat.""" info, preview = handle_file_upload(file_path) payload = None if file_path: payload = { "path": file_path, "info": info, "preview": preview } return info, preview, payload # Chat mode events chat_mode.change( fn=toggle_model_selection, inputs=[chat_mode], outputs=[chat_single_group, chat_multi_group] ) chat_use_tavily.change( fn=toggle_tavily, inputs=[chat_use_tavily], outputs=[chat_tavily_key] ) chat_send.click( fn=handle_chat, inputs=[chat_input, chat_state, chat_num_agents, chat_show_thoughts, chat_mode, chat_single_model, chat_orch, chat_agent, chat_synth, chat_api_key, chat_use_tavily, chat_tavily_key, chat_uploaded_file_state], outputs=[ chat_display, chat_state, chat_input, chat_model_info, chat_questions, chat_agents, chat_file_info, chat_file_preview, chat_uploaded_file_state ] ) chat_input.submit( fn=handle_chat, inputs=[chat_input, chat_state, chat_num_agents, chat_show_thoughts, chat_mode, chat_single_model, chat_orch, chat_agent, chat_synth, chat_api_key, chat_use_tavily, chat_tavily_key, chat_uploaded_file_state], outputs=[ chat_display, chat_state, chat_input, chat_model_info, chat_questions, chat_agents, chat_file_info, chat_file_preview, chat_uploaded_file_state ] ) chat_clear.click(fn=clear_chat, outputs=[chat_display, chat_state]) chat_upload.upload( fn=handle_chat_file_upload, inputs=[chat_upload], outputs=[chat_file_info, chat_file_preview, chat_uploaded_file_state] ) # Plan mode events plan_generate.click( fn=handle_plan_request, inputs=[plan_task, plan_num_agents, plan_model, plan_api_key], outputs=[plan_model_info, plan_output, plan_state] ) plan_clear.click( fn=clear_plan, outputs=[ plan_model_info, plan_output, plan_state, plan_exec_model_info, plan_exec_questions, plan_exec_agents, plan_exec_response ] ) plan_exec_mode.change( fn=toggle_model_selection, inputs=[plan_exec_mode], outputs=[plan_exec_single_group, plan_exec_multi_group] ) plan_exec_use_tavily.change( fn=toggle_tavily, inputs=[plan_exec_use_tavily], outputs=[plan_exec_tavily_key] ) plan_execute.click( fn=handle_plan_execute, inputs=[ plan_exec_query, plan_state, plan_exec_num_agents, plan_exec_show_thoughts, plan_exec_mode, plan_exec_single_model, plan_exec_orch, plan_exec_agent, plan_exec_synth, plan_api_key, plan_exec_use_tavily, plan_exec_tavily_key ], outputs=[ plan_exec_model_info, plan_exec_questions, plan_exec_agents, plan_exec_response ] ) # Single query mode events single_mode.change( fn=toggle_model_selection, inputs=[single_mode], outputs=[single_single_group, single_multi_group] ) single_use_tavily.change( fn=toggle_tavily, inputs=[single_use_tavily], outputs=[single_tavily_key] ) single_submit.click( fn=process_query_sync, inputs=[single_query, single_num_agents, single_show_thoughts, single_mode, single_single_model, single_orch, single_agent, single_synth, single_api_key, single_use_tavily, single_tavily_key], outputs=[single_model_info, single_questions, single_agents, single_response] ) upload_process_btn.click( fn=handle_file_upload, inputs=[upload_file_input], outputs=[upload_file_info, upload_file_preview] ) def launch(share=True, server_port=7860): """Launch the combined interface.""" demo.launch( share=share, server_port=server_port, server_name="0.0.0.0", show_error=True, inbrowser=True, prevent_thread_lock=False ) if __name__ == "__main__": launch()