Spaces:
Sleeping
Sleeping
| import os | |
| import httpx | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
class WebClient:
    """
    Communicates with the Google Custom Search API.

    The API key and search-engine (cx) id are read from the
    ``GOOGLE_SEARCH_API_KEY`` and ``GOOGLE_SEARCH_CX_ID`` environment
    variables at construction time.
    """

    # The Custom Search JSON API only accepts ``num`` values from 1 to 10.
    _MAX_RESULTS_PER_REQUEST = 10

    def __init__(self):
        self.api_key = os.getenv("GOOGLE_SEARCH_API_KEY")
        self.cx_id = os.getenv("GOOGLE_SEARCH_CX_ID")
        self.search_endpoint = "https://www.googleapis.com/customsearch/v1"

    async def search(self, query: str, *, num_results: int = 5):
        """
        Sends the query to Google Custom Search and returns search results.

        This is a best-effort call: every failure is reported on stdout with
        a "Web Client Error" message and an empty list is returned instead of
        raising.

        Args:
            query: The search string, sent as the ``q`` parameter.
            num_results: How many results to request. Values outside the
                1-10 range accepted by the API are clamped into it.

        Returns:
            A list of dicts with the keys ``"title"``, ``"link"`` and
            ``"snippet"`` (each value may be ``None`` when the API omits it),
            or ``[]`` when credentials are missing, the API answers with a
            non-200 status, or any exception occurs.
        """
        if not self.api_key or not self.cx_id:
            print("Web Client Error: Google Custom Search credentials not configured.")
            return []

        # Clamp rather than forward an out-of-range value that the API
        # would reject.
        num = max(1, min(num_results, self._MAX_RESULTS_PER_REQUEST))

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    self.search_endpoint,
                    params={
                        "key": self.api_key,
                        "cx": self.cx_id,
                        "q": query,
                        "num": num,
                    },
                )

            if response.status_code != 200:
                # Report the status so quota/auth failures are not silently
                # indistinguishable from "no results".
                print(
                    "Web Client Error: search request failed with status",
                    response.status_code,
                )
                return []

            data = response.json()
            # "items" is absent when there are no hits; also tolerate null.
            items = data.get("items") or []
            return [
                {
                    "title": item.get("title"),
                    "link": item.get("link"),
                    "snippet": item.get("snippet"),
                }
                for item in items
            ]
        except Exception as e:
            # Deliberate catch-all boundary: callers rely on search() never
            # raising and always returning a list.
            print("Web Client Error:", e)
            return []