# -*- coding: utf-8 -*-
"""
OpenResearcher DeepSearch Agent - Hugging Face Space

Uses ZeroGPU for efficient inference with the Nemotron model.
Aligned with app_local.py frontend and logic.
"""

import os
import gradio as gr
import httpx
import json
import json5
import re
import time
import html
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple, Generator
import traceback
import base64

from transformers import AutoTokenizer

try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# ============================================================
# Configuration
# ============================================================

MODEL_NAME = os.getenv("MODEL_NAME", "OpenResearcher/Nemotron-3-Nano-30B-A3B")
REMOTE_API_BASE = os.getenv("REMOTE_API_BASE", "")
SERPER_API_KEY = os.getenv("SERPER_API_KEY", "")
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "4096"))  # Safe limit for ZeroGPU

# ============================================================
# System Prompt & Tools
# ============================================================

DEVELOPER_CONTENT = """
You are a helpful and harmless assistant. You will be able to use a set of browsing tools to answer user queries.

Tool for browsing.
The `cursor` appears in brackets before each browsing display: `[{cursor}]`.
Cite information from the tool using the following format: `【{cursor}†L{line_start}(-L{line_end})?】`, for example: `【6†L9-L11】` or `【8†L3】`.
Do not quote more than 10 words directly from the tool output.

sources=web
""".strip()

TOOL_CONTENT = """
[
  {
    "type": "function",
    "function": {
      "name": "browser.search",
      "description": "Searches for information related to a query and displays top N results. Returns a list of search results with titles, URLs, and summaries.",
      "parameters": {
        "type": "object",
        "properties": {
          "query": {"type": "string", "description": "The search query string"},
          "topn": {"type": "integer", "description": "Number of results to display", "default": 10}
        },
        "required": ["query"]
      }
    }
  },
  {
    "type": "function",
    "function": {
      "name": "browser.open",
      "description": "Opens a link from the current page or a fully qualified URL. Can scroll to a specific location and display a specific number of lines. Valid link ids are displayed with the formatting: 【{id}†.*】.",
      "parameters": {
        "type": "object",
        "properties": {
          "id": {"type": ["integer", "string"], "description": "Link id from current page (integer) or fully qualified URL (string). Default is -1 (most recent page)", "default": -1},
          "cursor": {"type": "integer", "description": "Page cursor to operate on. If not provided, the most recent page is implied", "default": -1},
          "loc": {"type": "integer", "description": "Starting line number. If not provided, viewport will be positioned at the beginning or centered on the relevant passage", "default": -1},
          "num_lines": {"type": "integer", "description": "Number of lines to display", "default": -1},
          "view_source": {"type": "boolean", "description": "Whether to view page source", "default": false},
          "source": {"type": "string", "description": "The source identifier (e.g., 'web')"}
        },
        "required": []
      }
    }
  },
  {
    "type": "function",
    "function": {
      "name": "browser.find",
      "description": "Finds exact matches of a pattern in the current page or a specified page by cursor.",
      "parameters": {
        "type": "object",
        "properties": {
          "pattern": {"type": "string", "description": "The exact text pattern to search for"},
          "cursor": {"type": "integer", "description": "Page cursor to search in. If not provided, searches in the current page", "default": -1}
        },
        "required": ["pattern"]
      }
    }
  }
]
""".strip()
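

# Illustrative sketch (not part of the original app): TOOL_CONTENT is plain
# JSON, so the declared tool schema can be sanity-checked at startup. The
# helper name below is hypothetical.
def _declared_tool_names() -> List[str]:
    """Return the function names declared in TOOL_CONTENT."""
    return [entry["function"]["name"] for entry in json.loads(TOOL_CONTENT)]
# Expected: ['browser.search', 'browser.open', 'browser.find']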

# ============================================================
# Browser Tool Implementation
# ============================================================

class SimpleBrowser:
    """Browser tool using the Serper API."""

    def __init__(self, serper_key: str):
        self.serper_key = serper_key
        self.pages: Dict[str, Dict] = {}
        self.page_stack: List[str] = []
        self.link_map: Dict[int, Dict] = {}  # Map from cursor ID (int) to {url, title}
        self.used_citations = []  # List of cursor IDs (int) in order of first appearance

    @property
    def current_cursor(self) -> int:
        return len(self.page_stack) - 1

    def add_link(self, cursor: int, url: str, title: str = ""):
        self.link_map[cursor] = {'url': url, 'title': title}

    def get_link_info(self, cursor: int) -> Optional[dict]:
        return self.link_map.get(cursor)

    def get_citation_index(self, cursor: int) -> int:
        if cursor not in self.used_citations:
            self.used_citations.append(cursor)
        return self.used_citations.index(cursor) + 1  # Start from 1 instead of 0

    def get_page_info(self, cursor: int) -> Optional[Dict[str, str]]:
        # Prioritize link_map as it stores search result metadata
        if cursor in self.link_map:
            return self.link_map[cursor]
        # Fall back to page_stack for opened pages
        if 0 <= cursor < len(self.page_stack):
            url = self.page_stack[cursor]
            page = self.pages.get(url)
            if page:
                return {'url': url, 'title': page.get('title', ''), 'snippet': ''}
        return None

    def _format_line_numbers(self, text: str, offset: int = 0) -> str:
        lines = text.split('\n')
        return '\n'.join(f"L{i + offset}: {line}" for i, line in enumerate(lines))

    def _clean_links(self, results: List[Dict], query: str) -> Tuple[str, Dict[int, Dict]]:
        link_map = {}
        lines = []
        for i, r in enumerate(results):
            title = html.escape(r.get('title', 'No Title'))
            url = r.get('link', r.get('url', ''))
            snippet = html.escape(r.get('snippet', r.get('summary', '')))
            try:
                domain = url.split('/')[2] if url else ''
            except IndexError:
                domain = ''
            # Store snippet information as well
            self.link_map[i] = {'url': url, 'title': title, 'snippet': snippet}
            link_map[i] = {'url': url, 'title': title, 'snippet': snippet}
            link_text = f"【{i}†{title}†{domain}】" if domain else f"【{i}†{title}】"
            lines.append(f"{link_text}")
            lines.append(f"  {snippet}")
            lines.append("")
        return '\n'.join(lines), link_map

    async def search(self, query: str, topn: int = 10) -> str:
        url = "https://google.serper.dev/search"
        headers = {'X-API-KEY': self.serper_key, 'Content-Type': 'application/json'}
        payload = json.dumps({"q": query, "num": topn})
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, headers=headers, data=payload, timeout=20.0)
                if response.status_code != 200:
                    return f"Error: Search failed with status {response.status_code}"
                data = response.json()
                results = data.get("organic", [])
                if not results:
                    return f"No results found for: '{query}'"
                content, new_link_map = self._clean_links(results, query)
                self.link_map.update(new_link_map)  # Merge new links
                pseudo_url = f"web-search://q={query}&ts={int(time.time())}"
                cursor = self.current_cursor + 1
                page_data = {
                    'url': pseudo_url,
                    'title': f"Search Results: {query}",
                    'text': content,
                    'urls': {str(k): v['url'] for k, v in new_link_map.items()}
                }
                self.pages[pseudo_url] = page_data
                self.page_stack.append(pseudo_url)
                header = f"{page_data['title']} ({pseudo_url})\n**viewing lines [0 - {len(content.split(chr(10))) - 1}]**\n\n"
                body = self._format_line_numbers(content)
                return f"[{cursor}] {header}{body}"
            except Exception as e:
                return f"Error during search: {str(e)}"

    async def open(self, id: int | str = -1, cursor: int = -1, loc: int = -1,
                   num_lines: int = -1, **kwargs) -> str:
        target_url = None
        if isinstance(id, str) and id.startswith("http"):
            target_url = id
        elif isinstance(id, int) and id >= 0:
            info = self.link_map.get(id)
            target_url = info['url'] if info else None
            if not target_url:
                return f"Error: Invalid link id '{id}'. Available: {list(self.link_map.keys())}"
        elif 0 <= cursor < len(self.page_stack):
            # Re-display an already opened page at the requested viewport
            page_url = self.page_stack[cursor]
            page = self.pages.get(page_url)
            if page:
                text = page['text']
                lines = text.split('\n')
                start = max(0, loc) if loc >= 0 else 0
                end = min(len(lines), start + num_lines) if num_lines > 0 else len(lines)
                header = f"{page['title']} ({page['url']})\n**viewing lines [{start} - {end - 1}] of {len(lines) - 1}**\n\n"
                body = self._format_line_numbers('\n'.join(lines[start:end]), offset=start)
                return f"[{cursor}] {header}{body}"
        else:
            return "Error: No valid target specified"
        if not target_url:
            return "Error: Could not determine target URL"
        headers = {'X-API-KEY': self.serper_key, 'Content-Type': 'application/json'}
        payload = json.dumps({"url": target_url})
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post("https://scrape.serper.dev/", headers=headers,
                                             data=payload, timeout=30.0)
                if response.status_code != 200:
                    return f"Error fetching URL: {response.status_code}"
                data = response.json()
                text = data.get("text", "")
                title = data.get("metadata", {}).get("title", "") if isinstance(data.get("metadata"), dict) else ""
                if not text:
                    return "No content found at URL"
                lines = text.split('\n')
                content = '\n'.join(lines)
                max_lines = 150
                if len(lines) > max_lines:
                    content = '\n'.join(lines[:max_lines]) + "\n\n...(content truncated)..."
                new_cursor = self.current_cursor + 1
                page_data = {
                    'url': target_url,
                    'title': title or target_url,
                    'text': content,
                    'urls': {}
                }
                self.pages[target_url] = page_data
                self.page_stack.append(target_url)
                start = max(0, loc) if loc >= 0 else 0
                display_lines = content.split('\n')
                end = min(len(display_lines), start + num_lines) if num_lines > 0 else len(display_lines)
                header = f"{title or target_url} ({target_url})\n**viewing lines [{start} - {end - 1}] of {len(display_lines) - 1}**\n\n"
                body = self._format_line_numbers('\n'.join(display_lines[start:end]), offset=start)
                return f"[{new_cursor}] {header}{body}"
            except Exception as e:
                return f"Error fetching URL: {str(e)}"

    def find(self, pattern: str, cursor: int = -1) -> str:
        if not self.page_stack:
            return "Error: No page open"
        page_url = self.page_stack[cursor] if 0 <= cursor < len(self.page_stack) else self.page_stack[-1]
        page = self.pages.get(page_url)
        if not page:
            return "Error: Page not found"
        text = page['text']
        lines = text.split('\n')
        matches = []
        for i, line in enumerate(lines):
            if str(pattern).lower() in line.lower():
                start = max(0, i - 1)
                end = min(len(lines), i + 3)
                context = '\n'.join(f"L{j}: {lines[j]}" for j in range(start, end))
                matches.append(f"# 【{len(matches)}†match at L{i}】\n{context}")
                if len(matches) >= 10:
                    break
        if not matches:
            return f"No matches found for: '{pattern}'"
        result_url = f"{page_url}/find?pattern={pattern}"
        new_cursor = self.current_cursor + 1
        result_content = '\n\n'.join(matches)
        page_data = {
            'url': result_url,
            'title': f"Find results for: '{pattern}'",
            'text': result_content,
            'urls': {}
        }
        self.pages[result_url] = page_data
        self.page_stack.append(result_url)
        header = f"Find results for text: `{pattern}` in `{page['title']}`\n\n"
        return f"[{new_cursor}] {header}{result_content}"

    def get_cursor_url(self, cursor: int) -> Optional[str]:
        if 0 <= cursor < len(self.page_stack):
            return self.page_stack[cursor]
        return None
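

# Illustrative sketch (not part of the original app): the intended
# search -> open -> find flow, assuming a valid SERPER_API_KEY is set.
# The query and link id below are made up.
async def _demo_browser_flow() -> None:
    browser = SimpleBrowser(SERPER_API_KEY)
    print(await browser.search("open source deep research agents", topn=3))
    # Link ids come from the 【id†title†domain】 markers in the search output.
    print(await browser.open(id=0, num_lines=40))
    print(browser.find("agent"))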

# ============================================================
# Tokenizer Loading
# ============================================================

tokenizer = None

def load_tokenizer():
    global tokenizer
    if tokenizer is None:
        print(f"Loading tokenizer: {MODEL_NAME}")
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                MODEL_NAME,
                trust_remote_code=True
            )
            print("Tokenizer loaded successfully!")
        except Exception as e:
            print(f"Error loading tokenizer: {e}")
            traceback.print_exc()
            raise
    return tokenizer

# ============================================================
# Text Processing
# ============================================================

def extract_thinking(text: str) -> Tuple[Optional[str], str]:
    """Split a model response into (reasoning_content, visible content)."""
    reasoning_content = None
    content = text
    if '<think>' in content and '</think>' in content:
        match = re.search(r'<think>(.*?)</think>', content, re.DOTALL)
        if match:
            reasoning_content = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    elif '</think>' in content:
        # Opening tag was consumed by the chat template; everything up to the
        # closing tag is reasoning.
        match = re.search(r'^(.*?)</think>', content, re.DOTALL)
        if match:
            reasoning_content = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    return reasoning_content, content

def parse_tool_call(text: str) -> Tuple[Optional[Dict], str]:
    """Extract a tool call (if any) from a model response."""
    tool_call_text = None
    content = text
    if '<tool_call>' in content and '</tool_call>' in content:
        match = re.search(r'<tool_call>(.*?)</tool_call>', content, re.DOTALL)
        if match:
            tool_call_text = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    elif '</tool_call>' in content:
        match = re.search(r'^(.*?)</tool_call>', content, re.DOTALL)
        if match:
            tool_call_text = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    if tool_call_text:
        try:
            if "```json" in tool_call_text:
                tool_call_text = tool_call_text.split("```json")[1].split("```")[0].strip()
            elif "```" in tool_call_text:
                tool_call_text = tool_call_text.split("```")[1].split("```")[0].strip()
            parsed = json5.loads(tool_call_text)
            return parsed, content
        except Exception:
            pass
        # Fall back to the XML-style encoding:
        # <function=name><parameter=key>value</parameter>...
        func_match = re.search(r'<function=([^>]+)>', tool_call_text)
        if func_match:
            tool_name = func_match.group(1)
            tool_args = {}
            params = re.finditer(r'<parameter=([^>]+)>\s*(.*?)\s*</parameter>', tool_call_text, re.DOTALL)
            for p in params:
                param_name = p.group(1)
                param_value = p.group(2).strip()
                if param_value.startswith('"') and param_value.endswith('"'):
                    param_value = param_value[1:-1]
                if param_value.isdigit():
                    param_value = int(param_value)
                tool_args[param_name] = param_value
            return {"name": tool_name, "arguments": tool_args}, content
    return None, content

def is_final_answer(text: str) -> bool:
    t = text.lower()
    return (
        # Tag pair reconstructed; adjust if the model uses different markers
        ('<answer>' in t and '</answer>' in t)
        or 'final answer:' in t
        or ('exact answer:' in t and 'confidence:' in t)
    )
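

# Illustrative checks (not part of the original app): the response shapes the
# text-processing helpers accept. Argument values are made up.
def _demo_text_processing() -> None:
    # extract_thinking: full <think> block, or a bare closing tag.
    reasoning, answer = extract_thinking("<think>compare two sources</think>Paris.")
    assert (reasoning, answer) == ("compare two sources", "Paris.")
    reasoning, answer = extract_thinking("compare two sources</think>Paris.")
    assert (reasoning, answer) == ("compare two sources", "Paris.")
    # parse_tool_call: JSON inside <tool_call> tags, or the XML-style fallback.
    call, _ = parse_tool_call(
        '<tool_call>{"name": "browser.search", "arguments": {"query": "llm agents"}}</tool_call>')
    assert call == {"name": "browser.search", "arguments": {"query": "llm agents"}}
    call, _ = parse_tool_call(
        '<tool_call><function=browser.open><parameter=id>3</parameter></function></tool_call>')
    assert call == {"name": "browser.open", "arguments": {"id": 3}}
    assert is_final_answer("Final answer: Paris") and not is_final_answer("Still searching...")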

# ============================================================
# HTML Rendering Helpers (From app_local.py)
# ============================================================

def render_citations(text: str, browser: SimpleBrowser) -> str:
    """Convert citation markers to clickable HTML links with tooltips."""
    # Store citation HTML to protect it from the markdown conversion below
    citation_store = {}
    citation_counter = [0]

    def replace_citation(m):
        cursor_str = m.group(1)
        full_match = m.group(0)  # Full match, used to extract line info
        # Extract line information from the citation marker
        # Format: 【{cursor}†L{line_start}(-L{line_end})?】
        line_info = ""
        line_match = re.search(r'†(L\d+(?:-L\d+)?)', full_match)
        if line_match:
            line_info = line_match.group(1)
        try:
            cursor = int(cursor_str)
            index = browser.get_citation_index(cursor)
            # Check if we have URL info
            info = browser.get_page_info(cursor)
            if info and info.get('url'):
                url = info.get('url', '')
                title = info.get('title', 'No Title')
                snippet = info.get('snippet', '')
                # Unescape HTML entities and remove newlines to prevent rendering issues
                title_display = html.unescape(title).replace('\n', ' ').replace('\r', '').strip()
                snippet_display = html.unescape(snippet).replace('\n', ' ').replace('\r', '').strip() if snippet else 'No description available'
                # Extract domain from URL
                try:
                    domain = url.split('/')[2] if len(url.split('/')) > 2 else url
                except IndexError:
                    domain = url
                # Add line info if available (markup classes here are
                # reconstructed placeholders for the Space's CSS)
                line_html = ""
                if line_info:
                    line_html = f'<div class="citation-lines">📍 {line_info}</div>'
                # Create citation with tooltip (single line to avoid markdown conversion issues)
                tooltip_html = (
                    f'<div class="citation-tooltip">'
                    f'<div class="citation-title">{title_display}</div>'
                    f'{line_html}'
                    f'<div class="citation-snippet">{snippet_display}</div>'
                    f'<a href="{url}" target="_blank" class="citation-domain">🔗 {html.escape(domain)}</a>'
                    f'</div>'
                )
                citation_html = (
                    f'<span class="citation">'
                    f'<a href="{url}" target="_blank">[{index}]</a>'
                    f'{tooltip_html}'
                    f'</span>'
                )
                # Store citation HTML and return a placeholder
                placeholder = f'___CITATION_{citation_counter[0]}___'
                citation_store[placeholder] = citation_html
                citation_counter[0] += 1
                return placeholder
            # Fallback if no URL
            return f'[{index}]'
        except Exception:
            # Fall through and leave the marker unchanged
            pass
        return m.group(0)

    # First pass: replace citations with placeholders
    result = re.sub(r'[【\[](\d+)†.*?[】\]]', replace_citation, text)

    # Second pass: remove standalone URLs that appear after text (common pattern).
    # This removes URLs that directly follow sentences without a proper citation.
    result = re.sub(r'(?<=[.!?])\s+(https?://[^\s]+)', '', result)

    # Third pass: convert basic markdown to HTML
    result = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', result)
    result = re.sub(r'\*(.+?)\*', r'<em>\1</em>', result)
    result = re.sub(r'`(.+?)`', r'<code>\1</code>', result)
    result = result.replace('\n\n', '</p><p>').replace('\n', '<br>')
    if not result.startswith('<p>'):
        result = f'<p>{result}</p>'

    # Fourth pass: restore citation HTML from placeholders
    for placeholder, citation_html in citation_store.items():
        result = result.replace(placeholder, citation_html)

    # Fifth pass: deduplicate adjacent identical citations
    while True:
        new_result = re.sub(r'(<span class="citation">.*?</span>)(\s*)\1', r'\1', result)
        if new_result == result:
            break
        result = new_result

    return result
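

# Illustrative check (not part of the original app): a citation marker is
# rewritten to a numbered, linked citation. The browser state is minimal and
# hypothetical.
def _demo_render_citations() -> None:
    browser = SimpleBrowser(serper_key="")
    browser.add_link(0, "https://example.com/paper", "Example Paper")
    rendered = render_citations("Quantum error rates are falling 【0†L9-L11】.", browser)
    assert "[1]" in rendered and "example.com" in rendered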

def render_thinking_streaming(text: str) -> str:
    """Render thinking content in streaming mode (visible, with animation)."""
    escaped = html.escape(text)
    # Class name reconstructed; the animation itself lives in the page CSS.
    return f'<div class="thinking-streaming">{escaped}</div>'

def render_thinking_collapsed(text: str) -> str:
    """Render thinking content in collapsed mode after completion."""
    escaped = html.escape(text)
    preview = text[:100] + "..." if len(text) > 100 else text
    preview_escaped = html.escape(preview)
    # Markup reconstructed: a collapsible block with a one-line preview.
    return f'''<details class="thinking-collapsed">
<summary>Thought process: "{preview_escaped}"</summary>
<div>{escaped}</div>
</details>'''
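

# Illustrative check (not part of the original app): long reasoning is
# clipped to a 100-character preview in the collapsed summary.
def _demo_render_thinking_collapsed() -> None:
    rendered = render_thinking_collapsed("x" * 150)
    assert 'Thought process: "' + "x" * 100 + '..."' in rendered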

def render_tool_call(fn_name: str, args: dict, browser: SimpleBrowser = None) -> str:
    """Render a tool call card with unified format and subtle distinction."""
    border_colors = {
        "browser.search": "#667eea",
        "browser.open": "#4facfe",
        "browser.find": "#fa709a"
    }
    border_color = border_colors.get(fn_name, "#9ca3af")
    # Card markup reconstructed; class names are placeholders for the Space CSS.
    if fn_name == "browser.search":
        query = str(args.get('query', ''))
        return f'''<div class="tool-card" style="border-left: 3px solid {border_color};">
<div class="tool-card-title">Searching the web</div>
<div class="tool-card-detail">Query: "{html.escape(query)}"</div>
</div>'''
    elif fn_name == "browser.open":
        link_id = args.get('id', '')
        url_info = ""
        if browser and isinstance(link_id, int) and link_id >= 0:
            info = browser.link_map.get(link_id)
            url = info.get('url', "") if info else ""
            if url:
                try:
                    domain = url.split('/')[2]
                    url_info = f" → {domain}"
                except IndexError:
                    url_info = ""
        return f'''<div class="tool-card" style="border-left: 3px solid {border_color};">
<div class="tool-card-title">Opening page</div>
<div class="tool-card-detail">Link #{link_id}{url_info}</div>
</div>'''
    elif fn_name == "browser.find":
        pattern = str(args.get('pattern', ''))
        return f'''<div class="tool-card" style="border-left: 3px solid {border_color};">
<div class="tool-card-title">Finding in page</div>
<div class="tool-card-detail">Pattern: "{html.escape(pattern)}"</div>
</div>'''
    else:
        return f'''<div class="tool-card" style="border-left: 3px solid {border_color};">
<div class="tool-card-title">{html.escape(str(fn_name))}</div>
<div class="tool-card-detail">{html.escape(json.dumps(args))}</div>
</div>'''

def render_tool_result(result: str, fn_name: str) -> str:
    """Render tool result in an expanded card with direct HTML rendering."""
    import uuid
    tool_label = {
        "browser.search": "🔍 Search Results",
        "browser.open": "📄 Page Content",
        "browser.find": "🔎 Find Results"
    }.get(fn_name, "📋 Result")
    border_colors = {
        "browser.search": "#667eea",
        "browser.open": "#4facfe",
        "browser.find": "#86efac"
    }
    border_color = border_colors.get(fn_name, "#9ca3af")

    # ===== SEARCH RESULTS =====
    if fn_name == "browser.search" and "" in result and "