| """Web tools — search and fetch.""" | |
| import os | |
| import re | |
| import json | |
| from typing import Optional | |


async def web_search(
    query: str,
    max_results: int = 5,
    topic: str = "general",
) -> dict:
    """Search the web using Tavily API.

    Args:
        query: Search query
        max_results: Number of results (default 5)
        topic: "general", "news", or "finance"
    """
    import httpx

    key = os.environ.get("TAVILY_API_KEY")
    if not key:
        return {"error": "TAVILY_API_KEY not configured", "results": []}
    async with httpx.AsyncClient(timeout=15) as client:
        res = await client.post(
            "https://api.tavily.com/search",
            json={
                "api_key": key,
                "query": query,
                "max_results": max_results,
                "search_depth": "basic",
                "topic": topic,
            },
        )
    if res.status_code != 200:
        return {"error": f"Tavily API error: {res.status_code}", "results": []}
    data = res.json()
    results = []
    for r in data.get("results", []):
        results.append({
            "title": r.get("title", ""),
            "url": r.get("url", ""),
            "content": r.get("content", "")[:500],
            "score": r.get("score"),
        })
    return {"query": query, "results": results}


async def web_fetch(url: str) -> dict:
    """Fetch a URL and return content as clean text.

    Args:
        url: The URL to fetch
    """
    import httpx

    try:
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            res = await client.get(
                url,
                headers={"User-Agent": "ShamiAgent/1.0 (AI research agent)"},
            )
        if res.status_code != 200:
            return {"error": f"HTTP {res.status_code}", "url": url}
        content_type = res.headers.get("content-type", "")
        text = res.text
        if "text/html" in content_type:
            text = _html_to_text(text)
        # Truncate very long pages so tool output stays bounded
        if len(text) > 15000:
            text = text[:15000] + "\n\n[truncated]"
        return {"url": str(res.url), "content": text, "length": len(text)}
    except Exception as e:
        return {"error": str(e), "url": url}


def _html_to_text(html: str) -> str:
    """Convert HTML to readable text preserving structure."""
    text = html
    # Remove noise elements wholesale
    for tag in ["script", "style", "nav", "footer", "header"]:
        text = re.sub(f"<{tag}[\\s\\S]*?</{tag}>", "", text, flags=re.IGNORECASE)
    # Convert structure to markdown
    text = re.sub(r"<h1[^>]*>([\s\S]*?)</h1>", r"\n# \1\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<h2[^>]*>([\s\S]*?)</h2>", r"\n## \1\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<h3[^>]*>([\s\S]*?)</h3>", r"\n### \1\n", text, flags=re.IGNORECASE)
    text = re.sub(r'<a[^>]*href="([^"]*)"[^>]*>([\s\S]*?)</a>', r"[\2](\1)", text, flags=re.IGNORECASE)
    text = re.sub(r"<(strong|b)[^>]*>([\s\S]*?)</\1>", r"**\2**", text, flags=re.IGNORECASE)
    text = re.sub(r"<li[^>]*>([\s\S]*?)</li>", r"\n- \1", text, flags=re.IGNORECASE)
    text = re.sub(r"<br[^>]*>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"</?(p|div|section|article)[^>]*>", "\n", text, flags=re.IGNORECASE)
    # Strip remaining tags
    text = re.sub(r"<[^>]+>", "", text)
    # Decode common entities; &amp; goes last to avoid double-decoding
    text = text.replace("&lt;", "<").replace("&gt;", ">")
    text = text.replace("&quot;", '"').replace("&#39;", "'").replace("&nbsp;", " ")
    text = text.replace("&amp;", "&")
    # Collapse whitespace
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
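

if __name__ == "__main__":
    # Offline smoke test for _html_to_text (a sketch; the sample HTML below is
    # a made-up illustration, not real fetched content).
    sample = (
        "<html><head><style>body{color:red}</style></head><body>"
        "<h1>Title</h1><p>Some <strong>bold</strong> text &amp; an "
        '<a href="https://example.com">example link</a>.</p>'
        "<ul><li>first</li><li>second</li></ul>"
        "</body></html>"
    )
    print(_html_to_text(sample))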