| # modules/web_tools.py | |
| import os | |
| import aiohttp | |
| from bs4 import BeautifulSoup | |
# API key for the Serper search service; None when the env var is unset.
SERPER_API_KEY = os.environ.get("SERPER_API_KEY")
async def search_web(query, language="en"):
    """Search Google via the Serper API and return up to 3 result links.

    Args:
        query: The search query string.
        language: Two-letter code applied to both the geolocation (``gl``)
            and interface-language (``hl``) Serper parameters.

    Returns:
        A list of result URLs (possibly empty) taken from the ``organic``
        section of the Serper response.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an HTTP error
            status (e.g. bad API key, rate limit).
        aiohttp.ClientError: On connection failure or timeout.
    """
    url = "https://google.serper.dev/search"
    headers = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}
    payload = {
        "q": query,
        "gl": language,
        "hl": language,
        "num": 3,
    }
    # Bound the whole request so a hung API call cannot stall the caller
    # forever (the original had no timeout at all).
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(url, headers=headers, json=payload) as resp:
            # Fail loudly on HTTP errors instead of parsing an error body
            # as JSON and silently returning [] as if there were no results.
            resp.raise_for_status()
            data = await resp.json()
    return [r["link"] for r in data.get("organic", []) if "link" in r]
async def summarize_url(url):
    """Fetch a page and return the text of its first five paragraphs.

    Best-effort: this coroutine never raises. On any failure (network error,
    timeout, HTTP error status, parse error) the summary field describes
    the error instead of containing page text.

    Args:
        url: The page URL to fetch.

    Returns:
        ``{"url": url, "summary": text}`` where ``text`` is at most
        1000 characters of concatenated paragraph text, or a
        ``"Failed to summarize due to: ..."`` message on error.
    """
    try:
        # ClientTimeout is the supported way to bound the request;
        # passing a bare int to session.get is the deprecated aiohttp form.
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                # Don't "summarize" 404/500 error pages as if they were
                # real content; the error surfaces via the except below.
                resp.raise_for_status()
                html = await resp.text()
        soup = BeautifulSoup(html, "html.parser")
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text() for p in paragraphs[:5])
        return {"url": url, "summary": text[:1000]}
    except Exception as e:
        # Deliberate broad catch: callers expect a dict, never an exception.
        return {"url": url, "summary": f"Failed to summarize due to: {e}"}