# -*- coding: utf-8 -*-
"""
OpenResearcher DeepSearch Agent - Hugging Face Space
Uses ZeroGPU for efficient inference with the Nemotron model
Aligned with app_local.py frontend and logic
"""
import os
import gradio as gr
import httpx
import json
import json5
import re
import time
import html
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple, Generator
import traceback
import base64
from transformers import AutoTokenizer
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# ============================================================
# Configuration
# ============================================================
# Hugging Face model id used for the tokenizer and remote inference endpoint.
MODEL_NAME = os.getenv("MODEL_NAME", "OpenResearcher/Nemotron-3-Nano-30B-A3B")
# Base URL of a remote OpenAI-compatible inference API; empty string means unset.
REMOTE_API_BASE = os.getenv("REMOTE_API_BASE", "")
# API key for the Serper search/scrape service (required for web browsing tools).
SERPER_API_KEY = os.getenv("SERPER_API_KEY", "")
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "4096")) # Safe limit for ZeroGPU
# ============================================================
# System Prompt & Tools
# ============================================================
# System prompt sent to the model. It instructs the model to use the browsing
# tools and defines the citation syntax (【{cursor}†L{start}-L{end}】) that the
# rendering code is expected to parse. NOTE(review): the prompt text itself is
# runtime behavior (including its typos, e.g. "browsering") and is kept
# byte-identical; prompt wording may be tuned to the model — do not "fix" it
# without re-evaluating model output.
DEVELOPER_CONTENT = """
You are a helpful assistant and harmless assistant.
You will be able to use a set of browsering tools to answer user queries.
Tool for browsing.
The `cursor` appears in brackets before each browsing display: `[{cursor}]`.
Cite information from the tool using the following format:
`【{cursor}†L{line_start}(-L{line_end})?】`, for example: `【6†L9-L11】` or `【8†L3】`.
Do not quote more than 10 words directly from the tool output.
sources=web
""".strip()
TOOL_CONTENT = """
[
{
"type": "function",
"function": {
"name": "browser.search",
"description": "Searches for information related to a query and displays top N results. Returns a list of search results with titles, URLs, and summaries.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query string"
},
"topn": {
"type": "integer",
"description": "Number of results to display",
"default": 10
}
},
"required": [
"query"
]
}
}
},
{
"type": "function",
"function": {
"name": "browser.open",
"description": "Opens a link from the current page or a fully qualified URL. Can scroll to a specific location and display a specific number of lines. Valid link ids are displayed with the formatting: 【{id}†.*】.",
"parameters": {
"type": "object",
"properties": {
"id": {
"type": [
"integer",
"string"
],
"description": "Link id from current page (integer) or fully qualified URL (string). Default is -1 (most recent page)",
"default": -1
},
"cursor": {
"type": "integer",
"description": "Page cursor to operate on. If not provided, the most recent page is implied",
"default": -1
},
"loc": {
"type": "integer",
"description": "Starting line number. If not provided, viewport will be positioned at the beginning or centered on relevant passage",
"default": -1
},
"num_lines": {
"type": "integer",
"description": "Number of lines to display",
"default": -1
},
"view_source": {
"type": "boolean",
"description": "Whether to view page source",
"default": false
},
"source": {
"type": "string",
"description": "The source identifier (e.g., 'web')"
}
},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "browser.find",
"description": "Finds exact matches of a pattern in the current page or a specified page by cursor.",
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "The exact text pattern to search for"
},
"cursor": {
"type": "integer",
"description": "Page cursor to search in. If not provided, searches in the current page",
"default": -1
}
},
"required": [
"pattern"
]
}
}
}
]
""".strip()
# ============================================================
# Browser Tool Implementation
# ============================================================
class SimpleBrowser:
"""Browser tool using Serper API."""
def __init__(self, serper_key: str):
self.serper_key = serper_key
self.pages: Dict[str, Dict] = {}
self.page_stack: List[str] = []
self.link_map: Dict[int, Dict] = {} # Map from cursor ID (int) to {url, title}
self.used_citations = [] # List of cursor IDs (int) in order of first appearance
@property
def current_cursor(self) -> int:
return len(self.page_stack) - 1
def add_link(self, cursor: int, url: str, title: str = ""):
self.link_map[cursor] = {'url': url, 'title': title}
def get_link_info(self, cursor: int) -> Optional[dict]:
return self.link_map.get(cursor)
def get_citation_index(self, cursor: int) -> int:
if cursor not in self.used_citations:
self.used_citations.append(cursor)
return self.used_citations.index(cursor) + 1 # Start from 1 instead of 0
def get_page_info(self, cursor: int) -> Optional[Dict[str, str]]:
# Prioritize link_map as it stores search result metadata
if cursor in self.link_map:
return self.link_map[cursor]
# Fallback to page_stack for opened pages
if 0 <= cursor < len(self.page_stack):
url = self.page_stack[cursor]
page = self.pages.get(url)
if page:
return {'url': url, 'title': page.get('title', ''), 'snippet': ''}
return None
def _format_line_numbers(self, text: str, offset: int = 0) -> str:
lines = text.split('\n')
return '\n'.join(f"L{i + offset}: {line}" for i, line in enumerate(lines))
def _clean_links(self, results: List[Dict], query: str) -> Tuple[str, Dict[int, str]]:
link_map = {}
lines = []
for i, r in enumerate(results):
title = html.escape(r.get('title', 'No Title'))
url = r.get('link', r.get('url', ''))
snippet = html.escape(r.get('snippet', r.get('summary', '')))
try:
domain = url.split('/')[2] if url else ''
except:
domain = ''
try:
domain = url.split('/')[2] if url else ''
except:
domain = ''
# Store snippet information as well
self.link_map[i] = {'url': url, 'title': title, 'snippet': snippet}
link_map[i] = {'url': url, 'title': title, 'snippet': snippet}
link_text = f"【{i}†{title}†{domain}】" if domain else f"【{i}†{title}】"
lines.append(f"{link_text}")
lines.append(f" {snippet}")
lines.append("")
return '\n'.join(lines), link_map
async def search(self, query: str, topn: int = 10) -> str:
url = "https://google.serper.dev/search"
headers = {'X-API-KEY': self.serper_key, 'Content-Type': 'application/json'}
payload = json.dumps({"q": query, "num": topn})
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, headers=headers, data=payload, timeout=20.0)
if response.status_code != 200:
return f"Error: Search failed with status {response.status_code}"
data = response.json()
results = data.get("organic", [])
if not results:
return f"No results found for: '{query}'"
content, new_link_map = self._clean_links(results, query)
self.link_map.update(new_link_map) # Merge new links
pseudo_url = f"web-search://q={query}&ts={int(time.time())}"
cursor = self.current_cursor + 1
page_data = {
'url': pseudo_url,
'title': f"Search Results: {query}",
'text': content,
'urls': {str(k): v['url'] for k, v in new_link_map.items()}
}
self.pages[pseudo_url] = page_data
self.page_stack.append(pseudo_url)
header = f"{page_data['title']} ({pseudo_url})\n**viewing lines [0 - {len(content.split(chr(10)))-1}]**\n\n"
body = self._format_line_numbers(content)
return f"[{cursor}] {header}{body}"
except Exception as e:
return f"Error during search: {str(e)}"
async def open(self, id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, **kwargs) -> str:
target_url = None
if isinstance(id, str) and id.startswith("http"):
target_url = id
elif isinstance(id, int) and id >= 0:
info = self.link_map.get(id)
target_url = info['url'] if info else None
if not target_url:
return f"Error: Invalid link id '{id}'. Available: {list(self.link_map.keys())}"
elif cursor >= 0 and cursor < len(self.page_stack):
page_url = self.page_stack[cursor]
page = self.pages.get(page_url)
if page:
text = page['text']
lines = text.split('\n')
start = max(0, loc) if loc >= 0 else 0
end = min(len(lines), start + num_lines) if num_lines > 0 else len(lines)
header = f"{page['title']} ({page['url']})\n**viewing lines [{start} - {end-1}] of {len(lines)-1}**\n\n"
body = self._format_line_numbers('\n'.join(lines[start:end]), offset=start)
return f"[{cursor}] {header}{body}"
else:
return "Error: No valid target specified"
if not target_url:
return "Error: Could not determine target URL"
headers = {'X-API-KEY': self.serper_key, 'Content-Type': 'application/json'}
payload = json.dumps({"url": target_url})
async with httpx.AsyncClient() as client:
try:
response = await client.post("https://scrape.serper.dev/", headers=headers, data=payload, timeout=30.0)
if response.status_code != 200:
return f"Error fetching URL: {response.status_code}"
data = response.json()
text = data.get("text", "")
title = data.get("metadata", {}).get("title", "") if isinstance(data.get("metadata"), dict) else ""
if not text:
return f"No content found at URL"
lines = text.split('\n')
content = '\n'.join(lines)
max_lines = 150
if len(lines) > max_lines:
content = '\n'.join(lines[:max_lines]) + "\n\n...(content truncated)..."
new_cursor = self.current_cursor + 1
page_data = {
'url': target_url,
'title': title or target_url,
'text': content,
'urls': {}
}
self.pages[target_url] = page_data
self.page_stack.append(target_url)
start = max(0, loc) if loc >= 0 else 0
display_lines = content.split('\n')
end = min(len(display_lines), start + num_lines) if num_lines > 0 else len(display_lines)
header = f"{title or target_url} ({target_url})\n**viewing lines [{start} - {end-1}] of {len(display_lines)-1}**\n\n"
body = self._format_line_numbers('\n'.join(display_lines[start:end]), offset=start)
return f"[{new_cursor}] {header}{body}"
except Exception as e:
return f"Error fetching URL: {str(e)}"
def find(self, pattern: str, cursor: int = -1) -> str:
if not self.page_stack:
return "Error: No page open"
page_url = self.page_stack[cursor] if cursor >= 0 and cursor < len(self.page_stack) else self.page_stack[-1]
page = self.pages.get(page_url)
if not page:
return "Error: Page not found"
text = page['text']
lines = text.split('\n')
matches = []
for i, line in enumerate(lines):
if str(pattern).lower() in line.lower():
start = max(0, i - 1)
end = min(len(lines), i + 3)
context = '\n'.join(f"L{j}: {lines[j]}" for j in range(start, end))
matches.append(f"# 【{len(matches)}†match at L{i}】\n{context}")
if len(matches) >= 10:
break
if not matches:
return f"No matches found for: '{pattern}'"
result_url = f"{page_url}/find?pattern={pattern}"
new_cursor = self.current_cursor + 1
result_content = '\n\n'.join(matches)
page_data = {
'url': result_url,
'title': f"Find results for: '{pattern}'",
'text': result_content,
'urls': {}
}
self.pages[result_url] = page_data
self.page_stack.append(result_url)
header = f"Find results for text: `{pattern}` in `{page['title']}`\n\n"
return f"[{new_cursor}] {header}{result_content}"
def get_cursor_url(self, cursor: int) -> Optional[str]:
if cursor >= 0 and cursor < len(self.page_stack):
return self.page_stack[cursor]
return None
# ============================================================
# Tokenizer Loading
# ============================================================
tokenizer = None  # Lazily-initialized module-level tokenizer cache.


def load_tokenizer():
    """Load (once) and return the tokenizer for MODEL_NAME.

    Caches the tokenizer in the module-level `tokenizer` global so repeat
    calls are cheap. On failure, prints the error and traceback, then
    re-raises.
    """
    global tokenizer
    if tokenizer is None:
        print(f"Loading tokenizer: {MODEL_NAME}")
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                MODEL_NAME,
                trust_remote_code=True
            )
            print("Tokenizer loaded successfully!")
        except Exception as e:
            print(f"Error loading tokenizer: {e}")
            # BUGFIX: removed redundant `import traceback` here — the module
            # is already imported at file top level.
            traceback.print_exc()
            raise
    return tokenizer
# ============================================================
# Text Processing
# ============================================================
def extract_thinking(text: str) -> Tuple[Optional[str], str]:
reasoning_content = None
content = text
if '\1', result)
result = result.replace('\n\n', '
').replace('\n', '
')
if not result.startswith('
'): result = f'
{result}
' # Fourth pass: Restore citation HTML from placeholders for placeholder, citation_html in citation_store.items(): result = result.replace(placeholder, citation_html) # Fifth pass: Deduplicate adjacent identical citations while True: new_result = re.sub(r'(.*?)(\s*)\1', r'\1', result) if new_result == result: break result = new_result return result def render_thinking_streaming(text: str) -> str: """Render thinking content in streaming mode (visible, with animation).""" escaped = html.escape(text) return f'')
formatted_result = formatted_result.replace('\n', '
')
if not formatted_result.startswith('
{formatted_result}
' max_length = 5000 if len(result) > max_length: formatted_result = formatted_result[:max_length] + 'Please enter a question to begin.
" return if not serper_key: yield """""" return # Load tokenizer for prompt formatting try: load_tokenizer() except Exception as e: yield f"Error loading tokenizer: {html.escape(str(e))}
" return browser = SimpleBrowser(serper_key) tools = json.loads(TOOL_CONTENT) system_prompt = DEVELOPER_CONTENT + f"\n\nToday's date: {datetime.now().strftime('%Y-%m-%d')}" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": question} ] stop_strings = ["\nGeneration Error: {html.escape(str(e))}
") yield ''.join(html_parts) return for stop_str in stop_strings: if stop_str in generated: generated = generated[:generated.find(stop_str)] reasoning, content = extract_thinking(generated) tool_call, clean_content = parse_tool_call(content) if reasoning: html_parts.append(render_thinking_collapsed(reasoning)) yield ''.join(html_parts) if tool_call: fn_name = tool_call.get("name", "unknown") args = tool_call.get("arguments", {}) html_parts.append(render_tool_call(fn_name, args, browser)) yield ''.join(html_parts) if clean_content.strip() and not tool_call: rendered = render_citations(clean_content, browser) html_parts.append(f'Error: {html.escape(str(e))}
{html.escape(tb)}I am OpenResearcher, a leading open-source Deep Research Agent, welcome to try!
Due to high traffic, if your submission has no response, please refresh the page and resubmit. Thank you!
Ask any question and I'll search the web to find answers