Spaces:
Sleeping
Sleeping
File size: 2,011 Bytes
c16e1c9 29116ed c16e1c9 b86e5c9 c16e1c9 29116ed b86e5c9 c16e1c9 29116ed c16e1c9 b86e5c9 c16e1c9 29116ed b86e5c9 c16e1c9 29116ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
import os
import httpx
from dotenv import load_dotenv
# Populate the process environment from a local .env file (when one exists) at
# import time, so the os.getenv() credential lookups in WebClient.search can
# find GOOGLE_SEARCH_API_KEY / GOOGLE_SEARCH_CX_ID.
load_dotenv()
class WebClient:
    """Async client for the Google Custom Search JSON API.

    Credentials are read from the ``GOOGLE_SEARCH_API_KEY`` and
    ``GOOGLE_SEARCH_CX_ID`` environment variables on every call to
    :meth:`search`, so they may be set after the client is constructed.
    """

    def __init__(self) -> None:
        """Store the Custom Search endpoint URL used by :meth:`search`."""
        self.search_endpoint = "https://www.googleapis.com/customsearch/v1"

    async def search(self, query: str, max_results: int = 5, region: str = "us"):
        """Send ``query`` to Google Custom Search and return the result items.

        Args:
            query: Search terms, sent as the ``q`` request parameter.
            max_results: Requested number of results; clamped to 1..10
                before being sent as ``num``.
            region: Region code such as ``"us"`` or ``"us-en"``. Only the
                part before the first hyphen is sent as ``gl``; anything
                that is not a two-letter code falls back to ``"us"``.

        Returns:
            A list of dicts with the keys ``"title"``, ``"link"`` and
            ``"snippet"`` (a value is ``None`` when the item lacks that
            field). The list is empty when the response has no items.

        Raises:
            RuntimeError: If ``max_results`` cannot be converted to an
                integer, the credentials are not configured, the request
                fails or returns an error status, or the response body is
                not a JSON object.
        """
        max_results_value = self._sanitize_max_results(max_results)
        api_key = os.getenv("GOOGLE_SEARCH_API_KEY")
        cx_id = os.getenv("GOOGLE_SEARCH_CX_ID")
        if not api_key or not cx_id:
            raise RuntimeError("Google Custom Search credentials not configured.")

        params = {
            "key": api_key,
            "cx": cx_id,
            "q": query,
            "num": max_results_value,
            "gl": self._sanitize_region(region),
        }

        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get(self.search_endpoint, params=params)
                response.raise_for_status()
                # Decode inside the try so a non-JSON body is reported as the
                # same RuntimeError as any other request failure.
                data = response.json()
        except Exception as exc:
            # The exception text can contain the full request URL, whose query
            # string carries the API key; keep the key out of the message that
            # callers are likely to log or display.
            # NOTE: the chained original exception (__cause__) is unchanged and
            # may still reference the URL in a full traceback.
            detail = str(exc).replace(api_key, "[REDACTED]")
            raise RuntimeError(
                f"Google Custom Search request failed: {detail}"
            ) from exc

        if not isinstance(data, dict):
            raise RuntimeError(
                "Google Custom Search request failed: unexpected response format."
            )

        # "items" may be absent (no hits) or explicitly null; treat both as empty.
        items = data.get("items") or []
        return [
            {
                "title": item.get("title"),
                "link": item.get("link"),
                "snippet": item.get("snippet"),
            }
            for item in items
        ]

    @staticmethod
    def _sanitize_max_results(value: int) -> int:
        """Convert ``value`` to an int and clamp it to the range 1..10.

        Raises:
            RuntimeError: If ``value`` cannot be converted to an integer
                (wrong type, non-numeric string, NaN or infinity).
        """
        try:
            requested = int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # OverflowError covers int(float("inf")).
            raise RuntimeError(
                "max_results must be an integer between 1 and 10."
            ) from exc
        return max(1, min(requested, 10))

    @staticmethod
    def _sanitize_region(region: str) -> str:
        """Reduce ``region`` to a lowercase two-letter code, defaulting to "us".

        Takes the part before the first hyphen (``"US-en"`` -> ``"us"``).
        Missing, non-string, or malformed values yield ``"us"``.
        """
        if not isinstance(region, str):
            return "us"
        region_value = region.strip().lower().split("-", 1)[0]
        is_two_letters = (
            len(region_value) == 2
            and region_value.isascii()
            and region_value.isalpha()
        )
        return region_value if is_two_letters else "us"
|