"""Multi-Agent Research Hub.

A Gradio dashboard that orchestrates Gemini models to research a topic:
native Google Search grounding (with DuckDuckGo fallback), optional
multi-agent debate, matplotlib visualizations, and markdown report export.

NOTE(review): this file was recovered from a whitespace-mangled paste; line
structure has been reconstructed and two concrete bugs fixed (see
`execute_chart_code` and the `__main__` guard).
"""

import os
import re
import tempfile
import time

import gradio as gr
import numpy as np
import pandas as pd
from duckduckgo_search import DDGS
from google import genai
from google.genai import types

# 🎨 Responsive Glassmorphism CSS
glassy_css = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');

*, *::before, *::after { box-sizing: border-box; }

body, html {
    background: linear-gradient(135deg, #0a0f1a 0%, #111827 40%, #1a2332 100%) !important;
    background-attachment: fixed;
    color: #e0e0e0 !important;
    font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif !important;
}

.gradio-container {
    background: transparent !important;
    max-width: 1500px !important;
    margin: 0 auto !important;
    padding: 12px !important;
}

/* ===== RESPONSIVE STACKING ===== */
@media (max-width: 768px) {
    .gradio-container { padding: 6px !important; }
    .main-row { flex-direction: column !important; }
    .main-row > .gr-column { min-width: 100% !important; max-width: 100% !important; }
    .sidebar-col { display: none !important; }
    h1 { font-size: 1.4rem !important; }
    h3 { font-size: 1rem !important; }
}

@media (min-width: 769px) and (max-width: 1024px) {
    .main-row { flex-wrap: wrap !important; }
    .main-row > .gr-column { min-width: 48% !important; }
    .sidebar-col { min-width: 100% !important; }
}

/* ===== GLASS PANELS ===== */
div[class*="panel"] {
    background: rgba(255, 255, 255, 0.03) !important;
    border: 1px solid rgba(255, 255, 255, 0.08) !important;
    backdrop-filter: blur(20px) !important;
    -webkit-backdrop-filter: blur(20px) !important;
    border-radius: 16px !important;
    box-shadow: 0 8px 32px rgba(0, 0, 0, 0.4) !important;
    padding: 16px !important;
}

/* ===== SIDEBAR ===== */
.sidebar-col { border-right: 1px solid rgba(255,255,255,0.06) !important; }
.sidebar-col .gr-accordion { margin-bottom: 8px !important; }

/* ===== INPUTS ===== */
textarea, input[type="text"], input[type="password"] {
    background: rgba(0, 0, 0, 0.3) !important;
    border: 1px solid rgba(255, 255, 255, 0.12) !important;
    color: #fff !important;
    border-radius: 10px !important;
    transition: border-color 0.2s ease !important;
    font-family: 'Inter', sans-serif !important;
}
textarea:focus, input:focus {
    border-color: rgba(0, 200, 150, 0.5) !important;
    box-shadow: 0 0 12px rgba(0, 200, 150, 0.15) !important;
}

/* ===== PRIMARY BUTTON ===== */
button.primary {
    background: linear-gradient(135deg, #00c896 0%, #00b4d8 100%) !important;
    border: none !important;
    color: #fff !important;
    font-weight: 600 !important;
    border-radius: 10px !important;
    padding: 10px 20px !important;
    transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
    box-shadow: 0 4px 15px rgba(0, 200, 150, 0.3) !important;
}
button.primary:hover {
    transform: translateY(-2px) !important;
    box-shadow: 0 6px 20px rgba(0, 200, 150, 0.5) !important;
}

/* ===== SECONDARY BUTTON ===== */
button.secondary {
    background: rgba(255,255,255,0.06) !important;
    border: 1px solid rgba(255,255,255,0.15) !important;
    color: #c0c0c0 !important;
    border-radius: 8px !important;
    transition: all 0.2s ease !important;
}
button.secondary:hover {
    background: rgba(255,255,255,0.12) !important;
    color: #fff !important;
}

/* ===== TYPOGRAPHY ===== */
h1 {
    color: #ffffff !important;
    font-weight: 700 !important;
    letter-spacing: -0.5px !important;
    background: linear-gradient(135deg, #00c896, #00b4d8) !important;
    -webkit-background-clip: text !important;
    -webkit-text-fill-color: transparent !important;
    background-clip: text !important;
}
h2, h3, h4 { color: #e8e8e8 !important; font-weight: 600 !important; }
p, span, label { color: #c0c0c0 !important; }

/* ===== SURVEYED LINKS ===== */
.surveyed-links a {
    color: #60efff !important;
    text-decoration: underline !important;
    word-break: break-all !important;
}
.surveyed-links p { margin-bottom: 8px !important; line-height: 1.6 !important; }

/* ===== GALLERY ===== */
.viz-gallery { min-height: 200px; }
.viz-gallery .gallery-item img {
    border-radius: 12px !important;
    border: 1px solid rgba(255,255,255,0.08) !important;
    cursor: pointer !important;
}

/* ===== ACCORDION ===== */
.gr-accordion { border-radius: 12px !important; overflow: hidden !important; }

/* ===== SCROLLABLE MARKDOWN ===== */
.report-body { max-height: 70vh; overflow-y: auto; padding-right: 8px; }
.report-body::-webkit-scrollbar { width: 6px; }
.report-body::-webkit-scrollbar-thumb { background: rgba(255,255,255,0.15); border-radius: 3px; }
"""

# 🎯 Constants
QUICK_MODE = "Quick Research (Direct)"
DEEP_MODE = "Deep Research & Debate"
DEBATE_SKIPPED = "*Debate skipped for Quick mode.*"
# Scratch directory for generated charts and exported reports.
VIZ_DIR = tempfile.mkdtemp(prefix="research_viz_")
GEMINI_MODELS = [
    "gemini-2.5-flash",
    "gemini-flash-latest",
    "gemini-flash-lite-latest",
    "gemini-2.5-flash-lite",
    "gemini-2.0-flash",
]


# 🛠️ Core Functions
def make_safe(text):
    """
    STRICT SANITIZATION: Strips out ALL emojis and non-standard characters.
    This guarantees that underlying network libraries on Windows will NEVER
    crash with a 'UnicodeEncodeError'.
    """
    if not text:
        return ""
    # Round-trip through ASCII drops every non-ASCII code point.
    return str(text).encode("ascii", "ignore").decode("ascii")


def search_web(
    api_key, query, time_limit, primary_model=GEMINI_MODELS[0], max_results=3
):
    """Hybrid Grounding Engine: Tries Native Google Search first, falls back to DuckDuckGo.

    Returns a ``(research_text, urls_markdown)`` tuple; on total failure the
    first element is ``""`` and the second carries a warning message.
    """
    # Clean the query so we don't crash building the prompt
    safe_query = make_safe(query)

    # 1. ATTEMPT NATIVE GOOGLE AI SEARCH GROUNDING
    try:
        client = genai.Client(api_key=api_key)
        time_context = (
            f" Focus specifically on recent information from the {time_limit.lower()}."
            if time_limit != "All time"
            else ""
        )
        prompt = f"Conduct detailed, objective research on the following query: '{safe_query}'.{time_context} Provide comprehensive facts and statistics."
        # Strip the prompt of emojis just to be absolutely safe
        safe_prompt = make_safe(prompt)
        config = types.GenerateContentConfig(
            tools=[{"google_search": {}}], temperature=0.2
        )
        response = client.models.generate_content(
            model=primary_model, contents=safe_prompt, config=config
        )
        urls = []
        if response.candidates and response.candidates[0].grounding_metadata:
            gm = response.candidates[0].grounding_metadata
            chunks = getattr(gm, "grounding_chunks", [])
            for chunk in chunks:
                web = getattr(chunk, "web", None)
                if web:
                    uri = getattr(web, "uri", None)
                    title = getattr(web, "title", "Source")
                    if uri:
                        urls.append(f"🔗 **[{title}]({uri})**\n> {uri}")
        # Preserve order while de-duplicating grounding links.
        unique_urls = list(dict.fromkeys(urls))
        if unique_urls:
            # Make sure the returned text from the API doesn't contain weird
            # characters that might crash the next step
            return make_safe(response.text), "\n\n".join(unique_urls)
    except Exception as e:
        print(f"Native Grounding Info (Falling back to DDG): {e}")

    # 2. FALLBACK TO DUCKDUCKGO SCRAPING
    try:
        ddgs = DDGS()
        timelimit_map = {
            "Today": "d",
            "Past week": "w",
            "Past month": "m",
            "Past year": "y",
            "All time": None,
        }
        t = timelimit_map.get(time_limit)
        results = list(ddgs.text(safe_query, timelimit=t, max_results=max_results))
        extracted = []
        urls = []
        for r in results:
            title = make_safe(r.get("title", "Untitled"))
            href = r.get("href", "")
            body = make_safe(r.get("body", ""))
            if href and href.startswith("http"):
                urls.append(f"🔗 **[{title}]({href})**\n> {href}")
                extracted.append(f"Title: {title}\nLink: {href}\nSnippet: {body}")
        url_text = "\n\n".join(urls) if urls else ""
        data_text = "\n\n".join(extracted) if extracted else ""
        return data_text, url_text
    except Exception as e:
        return "", f"⚠️ Search error: {e}"


def call_gemini(api_key, prompt, primary_model=GEMINI_MODELS[0], retries=2):
    """Standard LLM execution with strict sanitization to prevent Windows encoding errors.

    Tries ``primary_model`` first, then every other entry of ``GEMINI_MODELS``,
    retrying transient failures with linear backoff. Quota/429 errors skip
    straight to the next model. Returns the model text, or a ``⚠️`` error
    string if every model fails.
    """
    client = genai.Client(api_key=api_key)
    models_to_try = [primary_model] + [m for m in GEMINI_MODELS if m != primary_model]
    # STRICTLY strip the prompt to plain ASCII to prevent the httpx library from crashing
    safe_prompt = make_safe(prompt)
    last_error = None
    for model in models_to_try:
        for attempt in range(retries):
            try:
                response = client.models.generate_content(
                    model=model, contents=safe_prompt
                )
                # Don't strip the output, Gradio needs to show it. Only the
                # OUTBOUND request causes crashes.
                return response.text
            except Exception as e:
                last_error = str(e)
                if "429" in last_error or "quota" in last_error.lower():
                    break  # rate-limited: retrying the same model is pointless
                if attempt < retries - 1:
                    time.sleep(2 * (attempt + 1))
                    continue
                break
    return f"⚠️ Error connecting to Gemini API. Details: {last_error}"


def execute_chart_code(code_str, output_filename="chart.png"):
    """Extract, patch, and execute LLM-generated matplotlib code.

    Rewrites any ``plt.savefig(...)`` target to ``output_filename``, forces the
    headless ``Agg`` backend, and returns the file path if the chart was
    written, else ``None``.

    SECURITY: this ``exec``s model-generated code in-process. Acceptable for a
    local tool driven by the user's own API key; do NOT expose to untrusted
    prompts without sandboxing.
    """
    match = re.search(r"```python(.*?)```", code_str, re.DOTALL)
    if match:
        code_str = match.group(1).strip()
    # BUG FIX: the replacement must be a callable — passing the path as a
    # replacement *string* makes re.sub interpret Windows backslashes as
    # escape sequences (re.error: bad escape / corrupted paths).
    code_str = re.sub(
        r"plt\.savefig\(['\"].*?['\"]",
        lambda m: f"plt.savefig('{output_filename}'",
        code_str,
    )
    safe_code = (
        "import matplotlib\nmatplotlib.use('Agg')\nimport matplotlib.pyplot as plt\n"
        + code_str
    )
    namespace = {"pd": pd, "np": np}
    try:
        exec(safe_code, namespace)
        if os.path.exists(output_filename):
            return output_filename
    except Exception:
        # Best-effort: a broken generated script simply yields no chart.
        pass
    return None


def generate_visualizations(
    api_key, topic, research_data, num_charts=1, primary_model=GEMINI_MODELS[0]
):
    """Ask the model for up to three chart scripts and render each to VIZ_DIR.

    Returns the list of successfully rendered image paths (may be empty).
    """
    chart_types = [
        ("statistical chart (bar, pie, line, or scatter)", "viz_chart"),
        ("comparison table as an image using matplotlib", "viz_table"),
        ("flowchart or process diagram using matplotlib", "viz_flow"),
    ]
    results = []
    for i in range(min(num_charts, 3)):
        chart_desc, prefix = chart_types[i]
        out_path = os.path.join(VIZ_DIR, f"{prefix}_{int(time.time())}_{i}.png")
        chart_prompt = f"""Write a Python script using matplotlib to create a {chart_desc} based on: '{topic}'.
Research context: {research_data[:1500]}
1. Import matplotlib.pyplot as plt
2. Apply a dark theme using plt.style.use('dark_background')
3. MUST save the figure as '{out_path}' using plt.savefig('{out_path}', bbox_inches='tight', dpi=150)
4. Output ONLY valid python code inside ```python ``` blocks."""
        code_response = call_gemini(api_key, chart_prompt, primary_model=primary_model)
        chart_path = execute_chart_code(code_response, output_filename=out_path)
        if chart_path:
            results.append(chart_path)
    return results


def generate_custom_viz(api_key, viz_prompt, primary_model=GEMINI_MODELS[0]):
    """Generate a standalone custom visualization from sidebar prompt."""
    if not api_key or not viz_prompt:
        return []
    out_path = os.path.join(VIZ_DIR, f"custom_{int(time.time())}.png")
    chart_prompt = f"""Write a Python script using matplotlib to create a visualization for: '{viz_prompt}'.
1. Import matplotlib.pyplot as plt
2. Apply a dark theme using plt.style.use('dark_background')
3. Make it visually clear and professional.
4. MUST save the figure as '{out_path}' using plt.savefig('{out_path}', bbox_inches='tight', dpi=150)
5. Output ONLY valid python code inside ```python ``` blocks. No explanations."""
    code_response = call_gemini(api_key, chart_prompt, primary_model=primary_model)
    chart_path = execute_chart_code(code_response, output_filename=out_path)
    if chart_path:
        return [chart_path]
    return []


def export_report(final_text, surveyed_urls, debate_text):
    """Write the current report/links/debate to a markdown file in VIZ_DIR.

    Returns the file path for the gr.File component, or ``None`` when no
    report has been generated yet (placeholder text still showing).
    """
    if not final_text or final_text.startswith("*The final"):
        return None
    report = f"# Research Report\n\n## Final Intelligence Report\n\n{final_text}\n\n\n\n## Surveyed Resources\n\n{surveyed_urls}\n\n\n\n## Debate Transcript\n\n{debate_text}\n"
    out_path = os.path.join(VIZ_DIR, f"report_{int(time.time())}.md")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(report)
    return out_path


def clear_outputs():
    """Reset all output widgets (log, links, debate, report, gallery, file)."""
    return (
        "",
        "*Web URLs will appear here...*",
        "*Debate transcript will stream here...*",
        "*The final synthesis will appear here...*",
        [],
        None,
    )


# 🧠 Multi-Agent Orchestration Workflow
def orchestrate_agents(
    topic, mode, time_limit, num_viz, api_key, primary_model, history
):
    """Generator driving the whole research workflow.

    Yields 8-tuples matching the submit handler's outputs: (progress log,
    surveyed URLs, debate transcript, final report, gallery images, history
    state, history-dropdown update, status-bar text). Streaming via repeated
    yields keeps the Gradio UI live while agents work.
    """
    if not api_key:
        yield (
            "❌ Error: Please provide a Gemini API Key in the sidebar.",
            "No sites",
            "No debate",
            "Error",
            [],
            history,
            gr.update(),
            "Error",
        )
        return
    if not topic.strip():
        yield (
            "❌ Error: Please enter a research topic.",
            "",
            "",
            "",
            [],
            history,
            gr.update(),
            "Error",
        )
        return

    log, live_debate = [], ""

    def update_log(msg):
        # Append a checkmarked step and return the full log for display.
        log.append(f"✅ {msg}")
        return "\n".join(log)

    # 1. Determine Routing
    actual_mode = mode
    if mode == "Auto":
        yield (
            update_log("Auto-Routing: Deciding research depth..."),
            "",
            "",
            "Analyzing topic complexity...",
            [],
            history,
            gr.update(),
            "🔄 Routing...",
        )
        decision = (
            call_gemini(
                api_key,
                f"Analyze: '{topic}'. Quick factual question or complex deep research? Reply 'Quick' or 'Deep'.",
                primary_model=primary_model,
            )
            .strip()
            .lower()
        )
        actual_mode = QUICK_MODE if "quick" in decision else DEEP_MODE
        yield (
            update_log(f"Auto-Routing decided: {actual_mode}"),
            "",
            "",
            "Routing chosen...",
            [],
            history,
            gr.update(),
            f"Mode: {actual_mode}",
        )

    # 2. Web Grounding Generation
    yield (
        update_log("Agents brainstorming search strategies..."),
        "💡 Generating queries...",
        "",
        "Optimizing intents...",
        [],
        history,
        gr.update(),
        "🧠 Thinking...",
    )
    queries_raw = (
        call_gemini(
            api_key,
            f"Topic: '{topic}'. Generate exactly 2 highly effective search queries. Return ONLY queries, one per line.",
            primary_model=primary_model,
        )
        .strip()
        .split("\n")
    )
    # Strip bullet/quote decoration; fall back to the raw topic if nothing usable.
    search_queries = [
        q.strip(' "-*') for q in queries_raw if q.strip() and "Error" not in q
    ][:2] or [topic]
    yield (
        update_log("Triggering Google AI Search Grounding..."),
        "🔎 Extracting context...",
        "",
        "Gathering grounded data...",
        [],
        history,
        gr.update(),
        "🌐 Grounding...",
    )
    all_broad_data, all_surveyed_urls = "", ""
    for q in search_queries:
        b_data, s_urls = search_web(
            api_key, q, time_limit, primary_model, max_results=3
        )
        if b_data:
            all_broad_data += f"\n\nSource [{q}]:\n" + b_data
        if s_urls and "⚠️" not in s_urls:
            all_surveyed_urls += s_urls + "\n\n"
    all_surveyed_urls = all_surveyed_urls.strip() or "⚠️ No valid links retrieved."
    yield (
        update_log("Grounding complete."),
        all_surveyed_urls,
        "",
        "Synthesizing...",
        [],
        history,
        gr.update(),
        "📊 Analyzing...",
    )

    gallery_images, final_answer = [], ""

    # 3. Execution
    if actual_mode == QUICK_MODE:
        yield (
            update_log("Executing Quick Direct Answer..."),
            all_surveyed_urls,
            DEBATE_SKIPPED,
            "Drafting final answer...",
            [],
            history,
            gr.update(),
            "✍️ Writing...",
        )
        prompt = f"You are a pragmatic expert. Based on this grounded data: {all_broad_data}. Answer: '{topic}'. Tone: Layman, simple. Provide verified resources."
        final_answer = call_gemini(api_key, prompt, primary_model=primary_model)
    else:
        yield (
            update_log("Deep Research: Agent 1 analyzing..."),
            all_surveyed_urls,
            live_debate,
            "Analyzing...",
            [],
            history,
            gr.update(),
            "🔬 Agent 1...",
        )
        ra1_findings = call_gemini(
            api_key,
            f"Analyze raw data for '{topic}': {all_broad_data}. Extract core facts.",
            primary_model=primary_model,
        )
        yield (
            update_log("Deep Research: Agent 2 cross-referencing..."),
            all_surveyed_urls,
            live_debate,
            "Cross-referencing...",
            [],
            history,
            gr.update(),
            "🔍 Agent 2...",
        )
        deep_data, deep_urls = search_web(
            api_key,
            f"{topic} critical analysis",
            time_limit,
            primary_model,
            max_results=2,
        )
        if deep_urls and "⚠️" not in deep_urls:
            all_surveyed_urls += "\n\n\n\n**Deep Search Results:**\n\n" + deep_urls
        master_research = call_gemini(
            api_key,
            f"Review Agent 1: {ra1_findings}. Cross-reference with: {deep_data}. Output verified master summary.",
            primary_model=primary_model,
        )
        tone = "Tone: Use simple, layman terms. Be rational and constructive."
        yield (
            update_log("Debate Round 1..."),
            all_surveyed_urls,
            live_debate,
            "Debating...",
            [],
            history,
            gr.update(),
            "⚖️ Debate R1...",
        )
        da1_r1 = call_gemini(
            api_key,
            f"Debate AI 1: Propose an answer to '{topic}' using: {master_research}. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**🤖 AI 1 (Proposal):**\n{da1_r1}\n\n"
        da2_r1 = call_gemini(
            api_key,
            f"Debate AI 2: Review AI 1's draft: {da1_r1}. Point out missing context. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**🧐 AI 2 (Critique):**\n{da2_r1}\n\n"
        yield (
            update_log("Debate Round 2..."),
            all_surveyed_urls,
            live_debate,
            "Debating...",
            [],
            history,
            gr.update(),
            "⚖️ Debate R2...",
        )
        da1_r2 = call_gemini(
            api_key,
            f"Debate AI 1: Refine based on AI 2's review: {da2_r1}. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**🤖 AI 1 (Refinement):**\n{da1_r2}\n\n"
        da2_r2 = call_gemini(
            api_key,
            f"Debate AI 2: Final check on AI 1's revision: {da1_r2}. Under 100 words. {tone}",
            primary_model=primary_model,
        )
        live_debate += f"**🧐 AI 2 (Final Check):**\n{da2_r2}\n\n"
        yield (
            update_log("Master Orchestrator drafting output..."),
            all_surveyed_urls,
            live_debate,
            "Drafting Final Report...",
            [],
            history,
            gr.update(),
            "📝 Synthesizing...",
        )
        final_prompt = f"""You are the Final Orchestrator. Review this debate for topic '{topic}':
AI 1: {da1_r2}
AI 2: {da2_r2}
Create the final intelligence report.
RULES:
1. Tone: Simple, layman-friendly. Use examples and analogies.
2. Formatting: Beautiful Markdown (headers, bullet points, tables if applicable).
3. End with '### 📚 Verified Resources' with clickable markdown links."""
        final_answer = call_gemini(api_key, final_prompt, primary_model=primary_model)

    debate_display = live_debate if actual_mode != QUICK_MODE else DEBATE_SKIPPED
    yield (
        update_log("Final text generated."),
        all_surveyed_urls,
        debate_display,
        final_answer,
        [],
        history,
        gr.update(),
        "✅ Report ready",
    )

    # 4. Visualizations
    if num_viz > 0:
        yield (
            update_log(f"Generating {num_viz} visualization(s)..."),
            all_surveyed_urls,
            debate_display,
            final_answer,
            [],
            history,
            gr.update(),
            "📊 Generating charts...",
        )
        gallery_images = generate_visualizations(
            api_key,
            topic,
            all_broad_data,
            num_charts=num_viz,
            primary_model=primary_model,
        )
        yield (
            update_log(f"{len(gallery_images)} visualization(s) generated!"),
            all_surveyed_urls,
            debate_display,
            final_answer,
            gallery_images,
            history,
            gr.update(),
            "✅ Charts ready",
        )

    # 5. Complete
    yield (
        update_log("All Operations Completed Successfully!"),
        all_surveyed_urls,
        debate_display,
        final_answer,
        gallery_images,
        history,
        gr.update(),
        "✅ Done!",
    )
    history.append(
        {
            "topic": topic,
            "log": "\n".join(log),
            "urls": all_surveyed_urls,
            "debate": debate_display,
            "final": final_answer,
            "charts": gallery_images,
        }
    )
    yield (
        "\n".join(log),
        all_surveyed_urls,
        debate_display,
        final_answer,
        gallery_images,
        history,
        gr.update(choices=[h["topic"] for h in history]),
        "✅ Done!",
    )


def load_from_history(selected_topic, history):
    """Restore a past run's outputs from the session history state."""
    for item in history:
        if item["topic"] == selected_topic:
            return (
                item["log"],
                item["urls"],
                item["debate"],
                item["final"],
                item.get("charts", []),
            )
    return "", "", "", "No history found.", []


# 🖥️ Responsive Dashboard UI
# BUG FIX: `theme` and `css` are gr.Blocks constructor parameters, not
# launch() arguments — previously the glassmorphism CSS was never applied.
with gr.Blocks(
    title="AI Research Hub", theme=gr.themes.Soft(), css=glassy_css
) as app:
    history_state = gr.State([])
    gr.Markdown("# 🔍 Multi-Agent Research Hub")
    gr.Markdown(
        "*Native Google AI Grounding · Auto-Routing · Live Debates · Multi-Viz Analytics*"
    )

    with gr.Row(elem_classes=["main-row"]):
        # --- Sidebar column ---
        with gr.Column(scale=1, min_width=220, elem_classes=["sidebar-col"]):
            gr.Markdown("### 🧭 Sidebar")
            with gr.Accordion("🔑 API Key", open=True):
                api_key = gr.Textbox(
                    label="Gemini API Key",
                    type="password",
                    placeholder="AIzaSy...",
                    show_label=False,
                )
            with gr.Accordion("📋 Quick Actions", open=True):
                export_btn = gr.Button(
                    "📥 Export Report", variant="secondary", size="sm"
                )
                export_file = gr.File(label="Download", visible=True, interactive=False)
                clear_btn = gr.Button("🗑️ Clear Outputs", variant="secondary", size="sm")
            with gr.Accordion("🎨 Custom Visualization", open=False):
                custom_viz_prompt = gr.Textbox(
                    label="Describe your chart",
                    placeholder="e.g. Pie chart of global energy sources",
                    lines=2,
                )
                custom_viz_btn = gr.Button("📊 Generate", variant="primary", size="sm")
                custom_viz_gallery = gr.Gallery(
                    label="Custom Charts",
                    columns=1,
                    height=200,
                    object_fit="contain",
                    interactive=False,
                )
            with gr.Accordion("🕰️ History", open=False):
                history_dropdown = gr.Dropdown(label="Past Queries", choices=[])
                load_history_btn = gr.Button("📂 Load", variant="secondary", size="sm")

        # --- Main input column ---
        with gr.Column(scale=5, min_width=400):
            with gr.Row():
                topic = gr.Textbox(
                    label="🔍 Research Topic",
                    placeholder="Enter any topic to research...",
                    lines=2,
                    scale=3,
                )
                with gr.Column(scale=1, min_width=180):
                    model_select = gr.Dropdown(
                        choices=GEMINI_MODELS,
                        value=GEMINI_MODELS[0],
                        label="🤖 Primary Model",
                    )
            mode = gr.Radio(
                ["Auto", QUICK_MODE, DEEP_MODE], value="Auto", label="🧠 Mode"
            )
            with gr.Row():
                time_limit = gr.Dropdown(
                    ["All time", "Past year", "Past month", "Past week", "Today"],
                    value="All time",
                    label="📅 Time Cutoff",
                    scale=1,
                )
                num_viz = gr.Slider(
                    minimum=0,
                    maximum=3,
                    step=1,
                    value=1,
                    label="📊 Visualizations",
                    scale=1,
                )
                submit_btn = gr.Button(
                    "🚀 Start Research", variant="primary", size="lg", scale=1
                )
            status_bar = gr.Textbox(
                show_label=False,
                interactive=False,
                lines=1,
                placeholder="Ready to research...",
            )

    with gr.Row(elem_classes=["main-row"]):
        with gr.Column(scale=1, min_width=280):
            with gr.Accordion("🤖 Workflow Logs", open=True):
                progress_box = gr.Textbox(
                    show_label=False, lines=8, interactive=False
                )
        with gr.Column(scale=1, min_width=280):
            with gr.Accordion("🌐 Grounded Resources", open=True):
                surveyed_sites = gr.Markdown(
                    "*Web URLs will appear here...*",
                    elem_classes=["surveyed-links"],
                )
            with gr.Accordion("⚖️ Live AI Debate", open=False):
                live_debate = gr.Markdown("*Debate transcript will stream here...*")

    gr.Markdown("")
    gr.Markdown("### 📑 Final Intelligence Report")
    final_output = gr.Markdown(
        "*The final synthesis will appear here...*",
        elem_classes=["report-body"],
    )
    gr.Markdown("")
    gr.Markdown("### 📊 Data Visualizations")
    viz_gallery = gr.Gallery(
        label="Generated Visualizations",
        columns=3,
        height=350,
        object_fit="contain",
        interactive=False,
        elem_classes=["viz-gallery"],
    )

    # --- Event wiring ---
    submit_btn.click(
        orchestrate_agents,
        inputs=[topic, mode, time_limit, num_viz, api_key, model_select, history_state],
        outputs=[
            progress_box,
            surveyed_sites,
            live_debate,
            final_output,
            viz_gallery,
            history_state,
            history_dropdown,
            status_bar,
        ],
    )
    load_history_btn.click(
        load_from_history,
        inputs=[history_dropdown, history_state],
        outputs=[progress_box, surveyed_sites, live_debate, final_output, viz_gallery],
    )
    export_btn.click(
        export_report,
        inputs=[final_output, surveyed_sites, live_debate],
        outputs=[export_file],
    )
    clear_btn.click(
        clear_outputs,
        outputs=[
            progress_box,
            surveyed_sites,
            live_debate,
            final_output,
            viz_gallery,
            export_file,
        ],
    )
    custom_viz_btn.click(
        generate_custom_viz,
        inputs=[api_key, custom_viz_prompt, model_select],
        outputs=[custom_viz_gallery],
    )


if __name__ == "__main__":
    # theme/css are set on gr.Blocks above; launch() takes neither.
    app.launch()