# Gamma_nope / app.py
# Hugging Face Space page header (author card, "Rename app3.py to app.py",
# commit 949409b verified) converted to comments so this module parses as
# valid Python.
import os
import re
import tempfile
import time
import gradio as gr
import numpy as np
import pandas as pd
from duckduckgo_search import DDGS
from google import genai
from google.genai import types
# 🎨 Mobile-First Responsive Glassmorphism CSS
# NOTE(review): glassy_css is only passed to app.launch() at the bottom of the
# file; on current Gradio releases css/theme belong on gr.Blocks(...) — confirm
# this stylesheet is actually wired in, otherwise it never applies.
glassy_css = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');
*, *::before, *::after { box-sizing: border-box; }
body, html {
background: linear-gradient(135deg, #0a0e1a 0%, #0f172a 50%, #1a1f35 100%) !important;
background-attachment: fixed !important;
color: #e2e8f0 !important;
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif !important;
-webkit-font-smoothing: antialiased !important;
}
.gradio-container {
background: transparent !important;
max-width: 1100px !important;
margin: 0 auto !important;
padding: 20px !important;
}
/* === GLASS PANELS === */
div[class*="panel"], .gr-box, .gr-form {
background: rgba(15, 23, 42, 0.6) !important;
border: 1px solid rgba(148, 163, 184, 0.1) !important;
backdrop-filter: blur(20px) !important;
-webkit-backdrop-filter: blur(20px) !important;
border-radius: 16px !important;
box-shadow: 0 4px 24px rgba(0, 0, 0, 0.3),
inset 0 1px 0 rgba(255, 255, 255, 0.05) !important;
}
/* === INPUTS === */
textarea, input[type="text"], input[type="password"] {
background: rgba(2, 6, 23, 0.5) !important;
border: 1px solid rgba(148, 163, 184, 0.15) !important;
color: #f1f5f9 !important;
border-radius: 12px !important;
font-family: 'Inter', sans-serif !important;
font-size: 14px !important;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
padding: 10px 14px !important;
}
textarea:focus, input:focus {
border-color: rgba(56, 189, 248, 0.5) !important;
box-shadow: 0 0 0 3px rgba(56, 189, 248, 0.1),
0 0 20px rgba(56, 189, 248, 0.05) !important;
outline: none !important;
}
/* === PRIMARY BUTTON === */
button.primary {
background: linear-gradient(135deg, #06b6d4 0%, #3b82f6 50%, #8b5cf6 100%) !important;
border: none !important;
color: #fff !important;
font-weight: 600 !important;
font-size: 14px !important;
border-radius: 12px !important;
padding: 12px 24px !important;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
box-shadow: 0 4px 15px rgba(59, 130, 246, 0.3) !important;
letter-spacing: 0.3px !important;
}
button.primary:hover {
transform: translateY(-2px) !important;
box-shadow: 0 8px 25px rgba(59, 130, 246, 0.4) !important;
filter: brightness(1.1) !important;
}
button.primary:active { transform: translateY(0) !important; }
/* === SECONDARY BUTTON === */
button.secondary {
background: rgba(30, 41, 59, 0.8) !important;
border: 1px solid rgba(148, 163, 184, 0.2) !important;
color: #cbd5e1 !important;
border-radius: 10px !important;
padding: 10px 18px !important;
font-weight: 500 !important;
transition: all 0.2s ease !important;
}
button.secondary:hover {
background: rgba(51, 65, 85, 0.9) !important;
border-color: rgba(148, 163, 184, 0.35) !important;
color: #f1f5f9 !important;
transform: translateY(-1px) !important;
}
/* === TYPOGRAPHY === */
h1 {
color: #ffffff !important;
font-weight: 700 !important;
letter-spacing: -0.5px !important;
background: linear-gradient(135deg, #06b6d4, #3b82f6, #8b5cf6) !important;
-webkit-background-clip: text !important;
-webkit-text-fill-color: transparent !important;
background-clip: text !important;
font-size: 1.8rem !important;
margin-bottom: 4px !important;
}
h2, h3, h4 { color: #e2e8f0 !important; font-weight: 600 !important; }
p, span, label { color: #94a3b8 !important; }
/* === SURVEYED LINKS === */
.surveyed-links a {
color: #38bdf8 !important;
text-decoration: underline !important;
text-underline-offset: 2px !important;
word-break: break-all !important;
transition: color 0.2s !important;
}
.surveyed-links a:hover { color: #7dd3fc !important; }
.surveyed-links p { margin-bottom: 8px !important; line-height: 1.7 !important; }
/* === GALLERY === */
.viz-gallery { min-height: 200px; }
.viz-gallery .gallery-item img {
border-radius: 12px !important;
border: 1px solid rgba(148, 163, 184, 0.1) !important;
cursor: pointer !important;
transition: transform 0.2s ease !important;
}
.viz-gallery .gallery-item img:hover { transform: scale(1.02) !important; }
/* === ACCORDION === */
.gr-accordion {
border-radius: 14px !important;
overflow: hidden !important;
border: 1px solid rgba(148, 163, 184, 0.08) !important;
transition: border-color 0.2s ease !important;
}
.gr-accordion:hover { border-color: rgba(148, 163, 184, 0.15) !important; }
/* === SCROLLABLE REPORT === */
.report-body {
max-height: 70vh;
overflow-y: auto;
padding-right: 8px;
scroll-behavior: smooth;
}
.report-body::-webkit-scrollbar { width: 5px; }
.report-body::-webkit-scrollbar-track { background: transparent; }
.report-body::-webkit-scrollbar-thumb {
background: rgba(148, 163, 184, 0.2);
border-radius: 10px;
}
/* === STATUS BAR === */
.status-pulse textarea {
border-left: 3px solid #3b82f6 !important;
font-weight: 500 !important;
}
/* === EXPORT BUTTONS ROW === */
.export-row { gap: 8px !important; }
.export-row button { flex: 1 !important; min-height: 44px !important; }
/* === RESPONSIVE β€” MOBILE FIRST === */
@media (max-width: 640px) {
.gradio-container { padding: 10px !important; }
h1 { font-size: 1.35rem !important; }
h3 { font-size: 0.95rem !important; }
textarea, input[type="text"], input[type="password"] {
font-size: 16px !important; /* Prevents iOS auto-zoom */
}
button { min-height: 44px !important; font-size: 14px !important; }
.gr-box, div[class*="panel"] { border-radius: 12px !important; }
}
@media (min-width: 641px) and (max-width: 1024px) {
.gradio-container { padding: 14px !important; }
}
"""
# 🎯 Constants
# UI labels for the two research modes; also compared against when routing.
QUICK_MODE = "Quick Research (Direct)"
DEEP_MODE = "Deep Research & Debate"
# Placeholder shown in the debate panel when Quick mode skips the debate.
DEBATE_SKIPPED = "*Debate skipped for Quick mode.*"
# Scratch directory that receives generated charts and exported reports.
VIZ_DIR = tempfile.mkdtemp(prefix="research_viz_")
# Candidate Gemini model ids, in fallback order (first entry is the default).
GEMINI_MODELS = [
    "gemini-2.5-flash",
    "gemini-flash-latest",
    "gemini-flash-lite-latest",
    "gemini-2.5-flash-lite",
    "gemini-2.0-flash",
]
# πŸ› οΈ Core Functions
def make_safe(text):
    """Coerce *text* to a pure-ASCII string.

    Every emoji and other non-ASCII character is silently dropped so the
    underlying network libraries on Windows can never raise
    UnicodeEncodeError on an outbound request. Falsy input yields "".
    """
    return str(text).encode("ascii", "ignore").decode("ascii") if text else ""
def search_web(
    api_key, query, time_limit, primary_model=GEMINI_MODELS[0], max_results=3
):
    """Hybrid grounding engine: try native Google Search grounding first,
    then fall back to DuckDuckGo text search.

    Args:
        api_key: Gemini API key for the native grounding attempt.
        query: Research query; sanitized to ASCII before any use.
        time_limit: One of "Today" / "Past week" / "Past month" /
            "Past year" / "All time"; narrows recency on both engines.
        primary_model: Gemini model id used for the grounded generation.
        max_results: Max DuckDuckGo hits (the native path ignores it).

    Returns:
        (data_text, url_text). On total failure data_text is "" and
        url_text carries a search-error message.
    """
    # Clean the query so we don't crash building the prompt
    safe_query = make_safe(query)
    # 1. ATTEMPT NATIVE GOOGLE AI SEARCH GROUNDING
    try:
        client = genai.Client(api_key=api_key)
        time_context = (
            f" Focus specifically on recent information from the {time_limit.lower()}."
            if time_limit != "All time"
            else ""
        )
        prompt = f"Conduct detailed, objective research on the following query: '{safe_query}'.{time_context} Provide comprehensive facts and statistics."
        # Strip the prompt of emojis just to be absolutely safe
        safe_prompt = make_safe(prompt)
        config = types.GenerateContentConfig(
            tools=[{"google_search": {}}], temperature=0.2
        )
        response = client.models.generate_content(
            model=primary_model, contents=safe_prompt, config=config
        )
        urls = []
        # Grounding metadata, when present, lists the web sources the model used.
        if response.candidates and response.candidates[0].grounding_metadata:
            gm = response.candidates[0].grounding_metadata
            chunks = getattr(gm, "grounding_chunks", [])
            for chunk in chunks:
                web = getattr(chunk, "web", None)
                if web:
                    uri = getattr(web, "uri", None)
                    title = getattr(web, "title", "Source")
                    if uri:
                        urls.append(f"πŸ”— **[{title}]({uri})**\n> {uri}")
        # dict.fromkeys keeps first-seen order while dropping duplicate links.
        unique_urls = list(dict.fromkeys(urls))
        if unique_urls:
            # Make sure the returned text from the API doesn't contain weird characters that might crash the next step
            return make_safe(response.text), "\n\n".join(unique_urls)
        # No grounded links: fall through to the DuckDuckGo path below.
    except Exception as e:
        # Any native failure (bad key, quota, missing metadata) is non-fatal.
        print(f"Native Grounding Info (Falling back to DDG): {e}")
    # 2. FALLBACK TO DUCKDUCKGO SCRAPING
    try:
        ddgs = DDGS()
        # Map UI time labels to DDG timelimit codes.
        timelimit_map = {
            "Today": "d",
            "Past week": "w",
            "Past month": "m",
            "Past year": "y",
            "All time": None,
        }
        t = timelimit_map.get(time_limit)
        results = list(ddgs.text(safe_query, timelimit=t, max_results=max_results))
        extracted = []
        urls = []
        for r in results:
            title = make_safe(r.get("title", "Untitled"))
            href = r.get("href", "")
            body = make_safe(r.get("body", ""))
            if href and href.startswith("http"):
                urls.append(f"πŸ”— **[{title}]({href})**\n> {href}")
            extracted.append(f"Title: {title}\nLink: {href}\nSnippet: {body}")
        url_text = "\n\n".join(urls) if urls else ""
        data_text = "\n\n".join(extracted) if extracted else ""
        return data_text, url_text
    except Exception as e:
        return "", f"⚠️ Search error: {e}"
def call_gemini(api_key, prompt, primary_model=GEMINI_MODELS[0], retries=2):
    """Call Gemini with retry/backoff and model fallback.

    The requested model is tried first with up to *retries* attempts; on a
    quota/429 error the remaining attempts are skipped and the next model
    in GEMINI_MODELS is tried instead. Returns the response text, or an
    error string when every attempt fails.
    """
    client = genai.Client(api_key=api_key)
    # Preferred model first, then the remaining fallbacks in listed order.
    candidates = [primary_model] + [m for m in GEMINI_MODELS if m != primary_model]
    # Strictly reduce the prompt to plain ASCII so the HTTP stack on
    # Windows cannot raise UnicodeEncodeError on the outbound request.
    safe_prompt = make_safe(prompt)
    last_error = None
    for model in candidates:
        for attempt in range(retries):
            try:
                response = client.models.generate_content(
                    model=model, contents=safe_prompt
                )
                # Return the raw text: only OUTBOUND requests risk encoding
                # crashes, and Gradio must display the full output.
                return response.text
            except Exception as exc:
                last_error = str(exc)
                quota_exhausted = "429" in last_error or "quota" in last_error.lower()
                if quota_exhausted or attempt == retries - 1:
                    # Give up on this model and move to the next fallback.
                    break
                time.sleep(2 * (attempt + 1))
    return f"⚠️ Error connecting to Gemini API. Details: {last_error}"
def execute_chart_code(code_str, output_filename="chart.png"):
    """Extract Python code from an LLM response and run it to render a chart.

    The code is executed with a forced non-interactive Agg backend, and any
    plt.savefig(...) target the model chose is rewritten to *output_filename*.

    SECURITY NOTE: this exec()s model-generated code with no sandboxing —
    arbitrary code execution by design. Acceptable only because the code
    comes from the user's own API key/session; do not feed untrusted input.

    Args:
        code_str: Raw LLM output, optionally wrapped in ```python ...``` fences.
        output_filename: Path the rendered figure must be saved to.

    Returns:
        *output_filename* if the file was produced, otherwise None.
    """
    match = re.search(r"```python(.*?)```", code_str, re.DOTALL)
    if match:
        code_str = match.group(1).strip()
    # Redirect whatever filename the model picked to our own output path.
    code_str = re.sub(
        r"plt\.savefig\(['\"].*?['\"]", f"plt.savefig('{output_filename}'", code_str
    )
    safe_code = (
        "import matplotlib\nmatplotlib.use('Agg')\nimport matplotlib.pyplot as plt\n"
        + code_str
    )
    namespace = {"pd": pd, "np": np}
    try:
        exec(safe_code, namespace)  # noqa: S102 — see security note above
        if os.path.exists(output_filename):
            return output_filename
    except Exception as e:
        # Log instead of silently swallowing; callers still treat a failed
        # render as "no chart" via the None return.
        print(f"Chart code execution failed: {e}")
    finally:
        # Close any figures the generated code left open so repeated chart
        # generations don't leak matplotlib state/memory.
        plt_mod = namespace.get("plt")
        if plt_mod is not None:
            try:
                plt_mod.close("all")
            except Exception:
                pass
    return None
def generate_visualizations(
    api_key, topic, research_data, num_charts=1, primary_model=GEMINI_MODELS[0]
):
    """Generate up to three research charts via LLM-written matplotlib code.

    Returns the list of PNG paths that rendered successfully (possibly empty).
    """
    chart_specs = (
        ("statistical chart (bar, pie, line, or scatter)", "viz_chart"),
        ("comparison table as an image using matplotlib", "viz_table"),
        ("flowchart or process diagram using matplotlib", "viz_flow"),
    )
    # Clamp the request to the available chart styles.
    requested = max(0, min(num_charts, 3))
    rendered = []
    for idx, (chart_desc, prefix) in enumerate(chart_specs[:requested]):
        out_path = os.path.join(VIZ_DIR, f"{prefix}_{int(time.time())}_{idx}.png")
        chart_prompt = f"""Write a Python script using matplotlib to create a {chart_desc} based on: '{topic}'.
Research context: {research_data[:1500]}
1. Import matplotlib.pyplot as plt
2. Apply a dark theme using plt.style.use('dark_background')
3. MUST save the figure as '{out_path}' using plt.savefig('{out_path}', bbox_inches='tight', dpi=150)
4. Output ONLY valid python code inside ```python ``` blocks."""
        llm_code = call_gemini(api_key, chart_prompt, primary_model=primary_model)
        saved = execute_chart_code(llm_code, output_filename=out_path)
        if saved:
            rendered.append(saved)
    return rendered
def generate_custom_viz(api_key, viz_prompt, primary_model=GEMINI_MODELS[0]):
    """Generate one standalone chart from the sidebar's free-form prompt.

    Returns a one-element list with the PNG path, or [] on any failure or
    missing input.
    """
    if not (api_key and viz_prompt):
        return []
    out_path = os.path.join(VIZ_DIR, f"custom_{int(time.time())}.png")
    chart_prompt = f"""Write a Python script using matplotlib to create a visualization for: '{viz_prompt}'.
1. Import matplotlib.pyplot as plt
2. Apply a dark theme using plt.style.use('dark_background')
3. Make it visually clear and professional.
4. MUST save the figure as '{out_path}' using plt.savefig('{out_path}', bbox_inches='tight', dpi=150)
5. Output ONLY valid python code inside ```python ``` blocks. No explanations."""
    llm_code = call_gemini(api_key, chart_prompt, primary_model=primary_model)
    saved = execute_chart_code(llm_code, output_filename=out_path)
    return [saved] if saved else []
# πŸ“€ Export Functions β€” MD / PDF / DOCX
# def _pdf_safe(text):
# """Strip markdown formatting and encode to Latin-1 for PDF built-in fonts."""
# if not text:
# return ""
# text = re.sub(r"\*\*(.*?)\*\*", r"\1", text)
# text = re.sub(r"\*(.*?)\*", r"\1", text)
# text = re.sub(r"\[(.*?)\]\(.*?\)", r"\1", text)
# return text.encode("latin-1", "replace").decode("latin-1")
def _parse_md_table_block(lines_subset):
"""Parse markdown table lines into list of row-cell lists, skipping separator rows."""
rows = []
for line in lines_subset:
stripped = line.strip()
if stripped.startswith("|") and stripped.endswith("|"):
cells = [c.strip() for c in stripped.split("|")[1:-1]]
if cells and not all(set(c.strip()) <= set("-: ") for c in cells):
rows.append(cells)
return rows
def _extract_viz_paths(viz_data):
"""Extract file paths from Gradio Gallery data (handles multiple formats)."""
if not viz_data:
return []
paths = []
for item in viz_data:
if isinstance(item, str):
paths.append(item)
elif isinstance(item, (list, tuple)) and item:
paths.append(str(item[0]))
elif isinstance(item, dict):
for key in ("image", "name", "path", "url"):
val = item.get(key)
if val:
if isinstance(val, dict):
val = val.get("path", val.get("url", ""))
paths.append(str(val))
break
return [p for p in paths if p and os.path.exists(p)]
def export_report_md(final_text, surveyed_urls, debate_text):
    """Write the full report to a timestamped Markdown file in VIZ_DIR.

    Returns the file path, or None when there is no real report yet
    (empty text, or the placeholder starting with "*The final").
    """
    if not final_text or final_text.startswith("*The final"):
        return None
    sections = (
        f"# Research Report\n\n## Final Intelligence Report\n\n{final_text}",
        f"## Surveyed Resources\n\n{surveyed_urls}",
        f"## Debate Transcript\n\n{debate_text}",
    )
    report = "\n\n\n\n".join(sections) + "\n"
    out_path = os.path.join(VIZ_DIR, f"report_{int(time.time())}.md")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(report)
    return out_path
# def export_report_pdf(final_text, surveyed_urls, debate_text, viz_data):
# """Export as PDF with embedded charts and properly formatted tables."""
# if not final_text or final_text.startswith("*The final"):
# return None
# try:
# from fpdf import FPDF
# except ImportError:
# return None
# pdf = FPDF()
# pdf.set_auto_page_break(auto=True, margin=15)
# def _add_section(title, text):
# pdf.add_page()
# pdf.set_font("Helvetica", "B", 18)
# pdf.cell(0, 12, _pdf_safe(title))
# pdf.ln(16)
# lines = text.split("\n")
# i = 0
# while i < len(lines):
# line = lines[i].strip()
# # β€” Table block β€”
# if line.startswith("|") and "|" in line[1:]:
# table_lines = []
# while i < len(lines) and lines[i].strip().startswith("|"):
# table_lines.append(lines[i])
# i += 1
# rows = _parse_md_table_block(table_lines)
# if rows:
# n_cols = max(len(r) for r in rows)
# col_w = (pdf.w - pdf.l_margin - pdf.r_margin) / max(n_cols, 1)
# for ri, row in enumerate(rows):
# pdf.set_font("Helvetica", "B" if ri == 0 else "", 8)
# for ci in range(n_cols):
# cell_t = row[ci] if ci < len(row) else ""
# pdf.cell(col_w, 6, _pdf_safe(cell_t)[:60], border=1)
# pdf.ln()
# pdf.ln(4)
# continue
# # β€” Headings β€”
# if line.startswith("### "):
# pdf.set_font("Helvetica", "B", 12)
# pdf.cell(0, 8, _pdf_safe(line[4:]))
# pdf.ln(10)
# elif line.startswith("## "):
# pdf.set_font("Helvetica", "B", 14)
# pdf.cell(0, 9, _pdf_safe(line[3:]))
# pdf.ln(11)
# elif line.startswith("# "):
# pdf.set_font("Helvetica", "B", 16)
# pdf.cell(0, 10, _pdf_safe(line[2:]))
# pdf.ln(12)
# elif line.startswith(("- ", "* ")):
# pdf.set_font("Helvetica", "", 10)
# pdf.cell(6, 6, "-")
# pdf.multi_cell(0, 6, _pdf_safe(line[2:]))
# elif line == "":
# pdf.ln(3)
# else:
# pdf.set_font("Helvetica", "", 10)
# pdf.multi_cell(0, 6, _pdf_safe(line))
# i += 1
# _add_section("Final Intelligence Report", final_text)
# _add_section("Surveyed Resources", surveyed_urls)
# if debate_text and debate_text != DEBATE_SKIPPED:
# _add_section("Debate Transcript", debate_text)
# # Embed chart images
# chart_paths = _extract_viz_paths(viz_data)
# if chart_paths:
# pdf.add_page()
# pdf.set_font("Helvetica", "B", 18)
# pdf.cell(0, 12, "Data Visualizations")
# pdf.ln(16)
# for path in chart_paths:
# try:
# img_w = pdf.w - pdf.l_margin - pdf.r_margin
# pdf.image(path, w=img_w)
# pdf.ln(10)
# except Exception:
# pass
# out_path = os.path.join(VIZ_DIR, f"report_{int(time.time())}.pdf")
# pdf.output(out_path)
# return out_path
def export_report_docx(final_text, surveyed_urls, debate_text, viz_data):
    """Export as DOCX with embedded charts and properly formatted tables.

    Returns the path of the written .docx, or None when there is nothing
    to export yet or python-docx is not installed.
    """
    # Placeholder text (starts with "*The final") means no report yet.
    if not final_text or final_text.startswith("*The final"):
        return None
    try:
        from docx import Document
        from docx.shared import Inches, Pt
    except ImportError:
        # python-docx is optional; export is silently unavailable without it.
        return None
    doc = Document()
    style = doc.styles["Normal"]
    style.font.name = "Calibri"
    style.font.size = Pt(11)
    def _clean_md(text):
        # Strip bold/italic markers and reduce [label](url) links to label.
        text = re.sub(r"\*\*(.*?)\*\*", r"\1", text)
        text = re.sub(r"\*(.*?)\*", r"\1", text)
        text = re.sub(r"\[(.*?)\]\(.*?\)", r"\1", text)
        return text
    def _add_section(title, text):
        # Render one markdown-ish section: tables, headings, bullets, text.
        doc.add_heading(title, level=1)
        lines = text.split("\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            stripped = line.strip()
            # β€” Table block β€”
            if stripped.startswith("|") and "|" in stripped[1:]:
                table_lines = []
                # Consume every consecutive pipe-delimited line as one table.
                while i < len(lines) and lines[i].strip().startswith("|"):
                    table_lines.append(lines[i])
                    i += 1
                rows = _parse_md_table_block(table_lines)
                if rows:
                    n_cols = max(len(r) for r in rows)
                    tbl = doc.add_table(
                        rows=len(rows), cols=n_cols, style="Light Shading Accent 1"
                    )
                    for ri, row_data in enumerate(rows):
                        for ci in range(n_cols):
                            # Rows may be ragged; pad short rows with "".
                            cell_t = row_data[ci] if ci < len(row_data) else ""
                            tbl.rows[ri].cells[ci].text = _clean_md(cell_t)
                doc.add_paragraph()
                # i already points past the table; skip the i += 1 below.
                continue
            # β€” Headings β€”
            if stripped.startswith("### "):
                doc.add_heading(_clean_md(stripped[4:]), level=3)
            elif stripped.startswith("## "):
                doc.add_heading(_clean_md(stripped[3:]), level=2)
            elif stripped.startswith("# "):
                doc.add_heading(_clean_md(stripped[2:]), level=1)
            elif stripped.startswith(("- ", "* ")):
                doc.add_paragraph(_clean_md(stripped[2:]), style="List Bullet")
            elif stripped == "":
                pass  # skip blank
            else:
                doc.add_paragraph(_clean_md(stripped))
            i += 1
    _add_section("Final Intelligence Report", final_text)
    _add_section("Surveyed Resources", surveyed_urls)
    if debate_text and debate_text != DEBATE_SKIPPED:
        _add_section("Debate Transcript", debate_text)
    # Embed chart images
    chart_paths = _extract_viz_paths(viz_data)
    if chart_paths:
        doc.add_heading("Data Visualizations", level=1)
        for path in chart_paths:
            try:
                doc.add_picture(path, width=Inches(6))
                doc.add_paragraph()
            except Exception:
                # A single unreadable image must not abort the export.
                pass
    out_path = os.path.join(VIZ_DIR, f"report_{int(time.time())}.docx")
    doc.save(out_path)
    return out_path
def clear_outputs():
    """Reset every output panel to its initial placeholder state.

    Order matches the clear button's outputs: progress log, surveyed
    sites, debate, final report, gallery, export file.
    """
    placeholder_urls = "*Web URLs will appear here...*"
    placeholder_debate = "*Debate transcript will stream here...*"
    placeholder_final = "*The final synthesis will appear here...*"
    return "", placeholder_urls, placeholder_debate, placeholder_final, [], None
# 🧠 Multi-Agent Orchestration Workflow
def orchestrate_agents(
    topic, mode, time_limit, num_viz, api_key, primary_model, history
):
    """Generator driving the end-to-end research workflow.

    Streams UI updates as 8-tuples in this fixed order: (workflow log,
    surveyed URLs markdown, debate markdown, final report markdown,
    gallery image paths, history state, history-dropdown update,
    status-bar text) — matching the submit handler's outputs list.
    """
    # Guard clauses: surface configuration problems immediately in the UI.
    if not api_key:
        yield (
            "❌ Error: Please provide a Gemini API Key in the sidebar.",
            "No sites",
            "No debate",
            "Error",
            [],
            history,
            gr.update(),
            "Error",
        )
        return
    if not topic.strip():
        yield (
            "❌ Error: Please enter a research topic.",
            "",
            "",
            "",
            [],
            history,
            gr.update(),
            "Error",
        )
        return
    log, live_debate = [], ""
    def update_log(msg):
        # Append a checkmarked step and return the whole log as one string.
        log.append(f"βœ… {msg}")
        return "\n".join(log)
    # 1. Determine Routing
    actual_mode = mode
    if mode == "Auto":
        yield (
            update_log("Auto-Routing: Deciding research depth..."),
            "",
            "",
            "Analyzing topic complexity...",
            [],
            history,
            gr.update(),
            "πŸ”„ Routing...",
        )
        # Let the model pick Quick vs Deep; default to Deep unless it says quick.
        decision = (
            call_gemini(
                api_key,
                f"Analyze: '{topic}'. Quick factual question or complex deep research? Reply 'Quick' or 'Deep'.",
                primary_model=primary_model,
            )
            .strip()
            .lower()
        )
        actual_mode = QUICK_MODE if "quick" in decision else DEEP_MODE
        yield (
            update_log(f"Auto-Routing decided: {actual_mode}"),
            "",
            "",
            "Routing chosen...",
            [],
            history,
            gr.update(),
            f"Mode: {actual_mode}",
        )
    # 2. Web Grounding Generation
    yield (
        update_log("Agents brainstorming search strategies..."),
        "πŸ’‘ Generating queries...",
        "",
        "Optimizing intents...",
        [],
        history,
        gr.update(),
        "🧠 Thinking...",
    )
    queries_raw = (
        call_gemini(
            api_key,
            f"Topic: '{topic}'. Generate exactly 2 highly effective search queries. Return ONLY queries, one per line.",
            primary_model=primary_model,
        )
        .strip()
        .split("\n")
    )
    # Keep at most 2 cleaned queries; fall back to the topic itself.
    search_queries = [
        q.strip(' "-*') for q in queries_raw if q.strip() and "Error" not in q
    ][:2] or [topic]
    yield (
        update_log("Triggering Google AI Search Grounding..."),
        "πŸ”Ž Extracting context...",
        "",
        "Gathering grounded data...",
        [],
        history,
        gr.update(),
        "🌐 Grounding...",
    )
    all_broad_data, all_surveyed_urls = "", ""
    for q in search_queries:
        b_data, s_urls = search_web(
            api_key, q, time_limit, primary_model, max_results=3
        )
        if b_data:
            all_broad_data += f"\n\nSource [{q}]:\n" + b_data
        if s_urls and "⚠️" not in s_urls:
            all_surveyed_urls += s_urls + "\n\n"
    all_surveyed_urls = all_surveyed_urls.strip() or "⚠️ No valid links retrieved."
    yield (
        update_log("Grounding complete."),
        all_surveyed_urls,
        "",
        "Synthesizing...",
        [],
        history,
        gr.update(),
        "πŸ“Š Analyzing...",
    )
    gallery_images, final_answer = [], ""
    # 3. Execution
    if actual_mode == QUICK_MODE:
        # Quick mode: one direct answer, no debate.
        yield (
            update_log("Executing Quick Direct Answer..."),
            all_surveyed_urls,
            DEBATE_SKIPPED,
            "Drafting final answer...",
            [],
            history,
            gr.update(),
            "✍️ Writing...",
        )
        prompt = f"You are a pragmatic expert. Based on this grounded data: {all_broad_data}. Answer: '{topic}'. Tone: Layman, simple. Provide verified resources."
        final_answer = call_gemini(api_key, prompt, primary_model=primary_model)
    else:
        # Deep mode: two research agents, then a two-round debate.
        yield (
            update_log("Deep Research: Agent 1 analyzing..."),
            all_surveyed_urls,
            live_debate,
            "Analyzing...",
            [],
            history,
            gr.update(),
            "πŸ”¬ Agent 1...",
        )
        ra1_findings = call_gemini(
            api_key,
            f"Analyze raw data for '{topic}': {all_broad_data}. Extract core facts.",
            primary_model=primary_model,
        )
        yield (
            update_log("Deep Research: Agent 2 cross-referencing..."),
            all_surveyed_urls,
            live_debate,
            "Cross-referencing...",
            [],
            history,
            gr.update(),
            "πŸ” Agent 2...",
        )
        deep_data, deep_urls = search_web(
            api_key,
            f"{topic} critical analysis",
            time_limit,
            primary_model,
            max_results=2,
        )
        if deep_urls and "⚠️" not in deep_urls:
            all_surveyed_urls += "\n\n\n\n**Deep Search Results:**\n\n" + deep_urls
        master_research = call_gemini(
            api_key,
            f"Review Agent 1: {ra1_findings}. Cross-reference with: {deep_data}. Output verified master summary.",
            primary_model=primary_model,
        )
        tone = "Tone: Use simple, layman terms. Be rational and constructive."
        yield (
            update_log("Debate Round 1..."),
            all_surveyed_urls,
            live_debate,
            "Debating...",
            [],
            history,
            gr.update(),
            "βš–οΈ Debate R1...",
        )
        da1_r1 = call_gemini(
            api_key,
            f"Debate AI 1: Propose an answer to '{topic}' using: {master_research}. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**πŸ€– AI 1 (Proposal):**\n{da1_r1}\n\n"
        da2_r1 = call_gemini(
            api_key,
            f"Debate AI 2: Review AI 1's draft: {da1_r1}. Point out missing context. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**🧐 AI 2 (Critique):**\n{da2_r1}\n\n"
        yield (
            update_log("Debate Round 2..."),
            all_surveyed_urls,
            live_debate,
            "Debating...",
            [],
            history,
            gr.update(),
            "βš–οΈ Debate R2...",
        )
        da1_r2 = call_gemini(
            api_key,
            f"Debate AI 1: Refine based on AI 2's review: {da2_r1}. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**πŸ€– AI 1 (Refinement):**\n{da1_r2}\n\n"
        da2_r2 = call_gemini(
            api_key,
            f"Debate AI 2: Final check on AI 1's revision: {da1_r2}. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**🧐 AI 2 (Final Check):**\n{da2_r2}\n\n"
        yield (
            update_log("Master Orchestrator drafting output..."),
            all_surveyed_urls,
            live_debate,
            "Drafting Final Report...",
            [],
            history,
            gr.update(),
            "πŸ“ Synthesizing...",
        )
        final_prompt = f"""You are the Final Orchestrator. Review this debate for topic '{topic}':
AI 1: {da1_r2}
AI 2: {da2_r2}
Create the final intelligence report.
RULES:
1. Tone: Simple, layman-friendly. Use examples and analogies.
2. Formatting: Beautiful Markdown (headers, bullet points, tables if applicable).
3. End with '### πŸ“š Verified Resources' with clickable markdown links."""
        final_answer = call_gemini(api_key, final_prompt, primary_model=primary_model)
    debate_display = live_debate if actual_mode != QUICK_MODE else DEBATE_SKIPPED
    yield (
        update_log("Final text generated."),
        all_surveyed_urls,
        debate_display,
        final_answer,
        [],
        history,
        gr.update(),
        "βœ… Report ready",
    )
    # 4. Visualizations
    if num_viz > 0:
        yield (
            update_log(f"Generating {num_viz} visualization(s)..."),
            all_surveyed_urls,
            debate_display,
            final_answer,
            [],
            history,
            gr.update(),
            "πŸ“Š Generating charts...",
        )
        gallery_images = generate_visualizations(
            api_key,
            topic,
            all_broad_data,
            num_charts=num_viz,
            primary_model=primary_model,
        )
        yield (
            update_log(f"{len(gallery_images)} visualization(s) generated!"),
            all_surveyed_urls,
            debate_display,
            final_answer,
            gallery_images,
            history,
            gr.update(),
            "βœ… Charts ready",
        )
    # 5. Complete
    yield (
        update_log("All Operations Completed Successfully!"),
        all_surveyed_urls,
        debate_display,
        final_answer,
        gallery_images,
        history,
        gr.update(),
        "βœ… Done!",
    )
    # Record the run, then refresh the history dropdown with the new topic.
    history.append(
        {
            "topic": topic,
            "log": "\n".join(log),
            "urls": all_surveyed_urls,
            "debate": debate_display,
            "final": final_answer,
            "charts": gallery_images,
        }
    )
    yield (
        "\n".join(log),
        all_surveyed_urls,
        debate_display,
        final_answer,
        gallery_images,
        history,
        gr.update(choices=[h["topic"] for h in history]),
        "βœ… Done!",
    )
def load_from_history(selected_topic, history):
    """Return stored outputs for a previously researched topic.

    Looks *selected_topic* up in the history list of dicts; on a miss,
    returns placeholder values with a "No history found." report.
    """
    entry = next((h for h in history if h["topic"] == selected_topic), None)
    if entry is None:
        return "", "", "", "No history found.", []
    return (
        entry["log"],
        entry["urls"],
        entry["debate"],
        entry["final"],
        entry.get("charts", []),
    )
# πŸ–₯️ Responsive Dashboard UI β€” No Hidden Sidebar
# Declarative UI definition. FIX: theme/css must be passed to the gr.Blocks
# constructor — Blocks.launch() does not accept them, so the stylesheet and
# theme previously never applied (launch(theme=..., css=...) raises TypeError).
with gr.Blocks(
    title="AI Research Hub", css=glassy_css, theme=gr.themes.Soft()
) as app:
    # Per-session list of completed runs (dicts produced by orchestrate_agents).
    history_state = gr.State([])
    # ── Header ──
    gr.Markdown("# πŸ” Multi-Agent Research Hub")
    gr.Markdown(
        "*Native Google AI Grounding Β· Auto-Routing Β· Live Debates Β· Multi-Viz Analytics*"
    )
    # ── Config Row: API Key + Model + Mode (always visible on all screens) ──
    with gr.Row():
        api_key = gr.Textbox(
            label="πŸ”‘ Gemini API Key",
            type="password",
            placeholder="AIzaSy...",
            scale=2,
        )
        model_select = gr.Dropdown(
            choices=GEMINI_MODELS,
            value=GEMINI_MODELS[0],
            label="πŸ€– Primary Model",
            scale=1,
        )
        mode = gr.Radio(
            ["Auto", QUICK_MODE, DEEP_MODE],
            value="Auto",
            label="🧠 Mode",
            scale=1,
        )
    # ── Topic Input ──
    topic = gr.Textbox(
        label="πŸ” Research Topic",
        placeholder="Enter any topic to research...",
        lines=2,
    )
    # ── Controls Row: Time + Viz Count + Submit ──
    with gr.Row():
        time_limit = gr.Dropdown(
            ["All time", "Past year", "Past month", "Past week", "Today"],
            value="All time",
            label="πŸ“… Time Cutoff",
            scale=1,
        )
        num_viz = gr.Slider(
            minimum=0,
            maximum=3,
            step=1,
            value=1,
            label="πŸ“Š Visualizations",
            scale=1,
        )
        submit_btn = gr.Button(
            "πŸš€ Start Research", variant="primary", size="lg", scale=1
        )
    # ── Status Bar ──
    status_bar = gr.Textbox(
        show_label=False,
        interactive=False,
        lines=1,
        placeholder="Ready to research...",
        elem_classes=["status-pulse"],
    )
    # ── Results: Workflow Logs + Grounded Resources ──
    with gr.Row():
        with gr.Column(scale=1, min_width=280):
            with gr.Accordion("πŸ€– Workflow Logs", open=True):
                progress_box = gr.Textbox(show_label=False, lines=8, interactive=False)
        with gr.Column(scale=1, min_width=280):
            with gr.Accordion("🌐 Grounded Resources", open=True):
                surveyed_sites = gr.Markdown(
                    "*Web URLs will appear here...*",
                    elem_classes=["surveyed-links"],
                )
    # ── Live Debate ──
    with gr.Accordion("βš–οΈ Live AI Debate", open=False):
        live_debate = gr.Markdown("*Debate transcript will stream here...*")
    # ── Final Report ──
    gr.Markdown("")
    gr.Markdown("### πŸ“‘ Final Intelligence Report")
    final_output = gr.Markdown(
        "*The final synthesis will appear here...*",
        elem_classes=["report-body"],
    )
    # ── Data Visualizations ──
    gr.Markdown("")
    gr.Markdown("### πŸ“Š Data Visualizations")
    viz_gallery = gr.Gallery(
        label="Generated Visualizations",
        columns=3,
        height=350,
        object_fit="contain",
        interactive=False,
        elem_classes=["viz-gallery"],
    )
    # ── Export Buttons (always visible, touch-friendly) ──
    gr.Markdown("### πŸ“₯ Export Report")
    with gr.Row(elem_classes=["export-row"]):
        export_md_btn = gr.Button("πŸ“„ Markdown", variant="secondary", size="sm")
        # export_pdf_btn = gr.Button("πŸ“• PDF", variant="secondary", size="sm")
        export_docx_btn = gr.Button("πŸ“˜ Word DOCX", variant="secondary", size="sm")
        clear_btn = gr.Button("πŸ—‘οΈ Clear All", variant="secondary", size="sm")
    export_file = gr.File(label="Download", visible=True, interactive=False)
    # ── Custom Visualization (bottom accordion) ──
    with gr.Accordion("🎨 Custom Visualization", open=False):
        custom_viz_prompt = gr.Textbox(
            label="Describe your chart",
            placeholder="e.g. Pie chart of global energy sources",
            lines=2,
        )
        custom_viz_btn = gr.Button("πŸ“Š Generate", variant="primary", size="sm")
        custom_viz_gallery = gr.Gallery(
            label="Custom Charts",
            columns=2,
            height=200,
            object_fit="contain",
            interactive=False,
        )
    # ── History (bottom accordion) ──
    with gr.Accordion("πŸ•°οΈ History", open=False):
        history_dropdown = gr.Dropdown(label="Past Queries", choices=[])
        load_history_btn = gr.Button("πŸ“‚ Load", variant="secondary", size="sm")
    # ── Event Handlers ──
    # orchestrate_agents is a generator; its 8-tuple yields map to these outputs.
    submit_btn.click(
        orchestrate_agents,
        inputs=[topic, mode, time_limit, num_viz, api_key, model_select, history_state],
        outputs=[
            progress_box,
            surveyed_sites,
            live_debate,
            final_output,
            viz_gallery,
            history_state,
            history_dropdown,
            status_bar,
        ],
    )
    load_history_btn.click(
        load_from_history,
        inputs=[history_dropdown, history_state],
        outputs=[progress_box, surveyed_sites, live_debate, final_output, viz_gallery],
    )
    export_md_btn.click(
        export_report_md,
        inputs=[final_output, surveyed_sites, live_debate],
        outputs=[export_file],
    )
    # export_pdf_btn.click(
    #     export_report_pdf,
    #     inputs=[final_output, surveyed_sites, live_debate, viz_gallery],
    #     outputs=[export_file],
    # )
    export_docx_btn.click(
        export_report_docx,
        inputs=[final_output, surveyed_sites, live_debate, viz_gallery],
        outputs=[export_file],
    )
    clear_btn.click(
        clear_outputs,
        outputs=[
            progress_box,
            surveyed_sites,
            live_debate,
            final_output,
            viz_gallery,
            export_file,
        ],
    )
    custom_viz_btn.click(
        generate_custom_viz,
        inputs=[api_key, custom_viz_prompt, model_select],
        outputs=[custom_viz_gallery],
    )
if __name__ == "__main__":
    # FIX: theme/css are gr.Blocks() constructor arguments, not launch()
    # arguments — passing them here raises TypeError on current Gradio
    # releases, so the app never started. Apply styling on gr.Blocks instead.
    app.launch()