# deep-agent/tools/web.py
# Uploaded by Shami96 via huggingface_hub (commit 09674d0, verified)
"""Web tools — search and fetch."""
import os
import re
import json
from typing import Optional
async def web_search(
    query: str,
    max_results: int = 5,
    topic: str = "general",
) -> dict:
    """Search the web using the Tavily API.

    Args:
        query: Search query string.
        max_results: Maximum number of results to return (default 5).
        topic: Search topic — "general", "news", or "finance".

    Returns:
        On success, ``{"query": ..., "results": [...]}`` where each result
        has "title", "url", "content" (capped at 500 chars), and "score".
        On failure, ``{"error": <message>, "results": []}`` — errors are
        returned, never raised, matching web_fetch's contract.
    """
    key = os.environ.get("TAVILY_API_KEY")
    if not key:
        return {"error": "TAVILY_API_KEY not configured", "results": []}
    # Imported lazily, and only after the key check, so the unconfigured
    # path works even where httpx is not installed.
    import httpx
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            res = await client.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": key,
                    "query": query,
                    "max_results": max_results,
                    "search_depth": "basic",
                    "topic": topic,
                },
            )
    except Exception as e:
        # Consistent with web_fetch: network failures become error payloads.
        return {"error": str(e), "results": []}
    if res.status_code != 200:
        return {"error": f"Tavily API error: {res.status_code}", "results": []}
    data = res.json()
    results = [
        {
            "title": r.get("title", ""),
            "url": r.get("url", ""),
            "content": r.get("content", "")[:500],  # cap snippet length
            "score": r.get("score"),
        }
        for r in data.get("results", [])
    ]
    return {"query": query, "results": results}
async def web_fetch(url: str) -> dict:
    """Fetch a URL and return its content as clean text.

    Args:
        url: The URL to fetch.

    Returns:
        On success, ``{"url": ..., "content": ..., "length": ...}``
        (HTML is converted to markdown-ish text and content is truncated
        at 15000 chars). On any failure — non-200 status or a raised
        exception — ``{"error": <message>, "url": <original url>}``.
    """
    import httpx

    request_headers = {"User-Agent": "ShamiAgent/1.0 (AI research agent)"}
    try:
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            res = await client.get(url, headers=request_headers)
            if res.status_code != 200:
                return {"error": f"HTTP {res.status_code}", "url": url}
            body = res.text
            # Only HTML gets cleaned; other content types pass through as-is.
            if "text/html" in res.headers.get("content-type", ""):
                body = _html_to_text(body)
            # Keep responses bounded for downstream consumers.
            limit = 15000
            if len(body) > limit:
                body = body[:limit] + "\n\n[truncated]"
            # res.url reflects the final URL after redirects.
            return {"url": str(res.url), "content": body, "length": len(body)}
    except Exception as e:
        return {"error": str(e), "url": url}
def _html_to_text(html: str) -> str:
"""Convert HTML to readable text preserving structure."""
text = html
# Remove noise
for tag in ["script", "style", "nav", "footer", "header"]:
text = re.sub(f"<{tag}[\\s\\S]*?</{tag}>", "", text, flags=re.IGNORECASE)
# Convert structure to markdown
text = re.sub(r"<h1[^>]*>([\s\S]*?)</h1>", r"\n# \1\n", text, flags=re.IGNORECASE)
text = re.sub(r"<h2[^>]*>([\s\S]*?)</h2>", r"\n## \1\n", text, flags=re.IGNORECASE)
text = re.sub(r"<h3[^>]*>([\s\S]*?)</h3>", r"\n### \1\n", text, flags=re.IGNORECASE)
text = re.sub(r'<a[^>]*href="([^"]*)"[^>]*>([\s\S]*?)</a>', r"[\2](\1)", text, flags=re.IGNORECASE)
text = re.sub(r"<(strong|b)[^>]*>([\s\S]*?)</\1>", r"**\2**", text, flags=re.IGNORECASE)
text = re.sub(r"<li[^>]*>([\s\S]*?)</li>", r"\n- \1", text, flags=re.IGNORECASE)
text = re.sub(r"<br[^>]*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<\/?(p|div|section|article)[^>]*>", "\n", text, flags=re.IGNORECASE)
# Strip remaining tags
text = re.sub(r"<[^>]+>", "", text)
# Decode entities
text = text.replace("&amp;", "&").replace("&lt;", "<").replace("&gt;", ">")
text = text.replace("&quot;", '"').replace("&#39;", "'").replace("&nbsp;", " ")
# Collapse whitespace
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()