import requests
from bs4 import BeautifulSoup
from smolagents.tools import Tool
import re
import json
import logging
import time
from io import StringIO
from urllib.parse import urlparse, urljoin
import pandas as pd

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WebBrowser(Tool):
    """
    Retrieves information from online sources by browsing web pages.
    Useful for extracting or summarizing web content, with special handling for structured data.
    Can extract tables, lists, and key information from web pages.
    """
    name = "web_browser"
    description = "Fetches content from web pages with improved structured data handling. Has specialized extraction for Wikipedia. Returns text content or structured data."
    inputs = {
        'url': {'type': 'string', 'description': 'The URL of the web page to browse.'},
        'extraction_mode': {'type': 'string', 'description': 'Mode for data extraction: "text" (default), "tables", "lists", or "structured".', 'nullable': True}
    }
    outputs = {'content': {'type': 'object', 'description': 'The extracted content from the web page, either as text or structured data.'}}
    output_type = "object"

    def __init__(self, user_agent="GAIA-Agent/1.0", *args, **kwargs):
        """
        Initializes the web browser with a user agent.

        Args:
            user_agent (str): The User-Agent string to use for requests.
        """
        super().__init__(*args, **kwargs)
        self.headers = {"User-Agent": user_agent}
        self.is_initialized = True
        # Add a session to maintain cookies
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def forward(self, url: str, extraction_mode: str = "text") -> dict:
        """
        Fetches the content of a web page and extracts information based on the specified mode.

        Args:
            url (str): The URL of the web page to browse.
            extraction_mode (str): The mode for data extraction - "text" (default), "tables", "lists", or "structured".

        Returns:
            dict: The extracted content or an error message.
        """
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            return {"error": f"Invalid URL format. URL must start with http:// or https://. Received: {url}"}
        try:
            # Check if it's Wikipedia and use special handling
            if 'wikipedia.org' in url:
                return self._handle_wikipedia(url, extraction_mode)
            # Process normal web pages
            return self._process_regular_webpage(url, extraction_mode)
        except requests.exceptions.HTTPError as http_err:
            return {"error": f"HTTP error occurred while fetching {url}: {http_err}"}
        except requests.exceptions.ConnectionError as conn_err:
            return {"error": f"Connection error occurred while fetching {url}: {conn_err}"}
        except requests.exceptions.Timeout as timeout_err:
            return {"error": f"Timeout occurred while fetching {url}: {timeout_err}"}
        except requests.exceptions.RequestException as req_err:
            return {"error": f"An unexpected error occurred while fetching {url}: {req_err}"}
        except Exception as e:
            return {"error": f"An unexpected error occurred during parsing of {url}: {e}"}

    def _process_regular_webpage(self, url, extraction_mode):
        """Process a regular (non-Wikipedia) webpage."""
        response = self.session.get(url, timeout=15)
        response.raise_for_status()
        # Use BeautifulSoup to parse the HTML content
        soup = BeautifulSoup(response.content, 'html.parser')
        # Remove script and style elements
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()
        if extraction_mode == "text":
            return self._extract_text(soup, url)
        elif extraction_mode == "tables":
            return self._extract_tables(soup, url)
        elif extraction_mode == "lists":
            return self._extract_lists(soup, url)
        elif extraction_mode == "structured":
            return self._extract_structured_data(soup, url)
        else:
            return {"error": f"Unknown extraction mode: {extraction_mode}"}

    def _handle_wikipedia(self, url, extraction_mode):
        """Special handling for Wikipedia pages."""
        # For Wikipedia, try to use the API instead of scraping the HTML
        parsed_url = urlparse(url)
        if not parsed_url.netloc.endswith('wikipedia.org'):
            return self._process_regular_webpage(url, extraction_mode)
        # Extract the title from the URL path
        path_parts = parsed_url.path.split('/')
        if len(path_parts) < 3 or path_parts[1] != 'wiki':
            # Not a standard Wikipedia article URL
            return self._process_regular_webpage(url, extraction_mode)
        title = path_parts[2]
        lang = parsed_url.netloc.split('.')[0]
        # Use Wikipedia API to get structured content
        api_url = f"https://{lang}.wikipedia.org/api/rest_v1/page/summary/{title}"
        try:
            logger.info(f"Fetching Wikipedia API data from {api_url}")
            api_response = self.session.get(api_url, timeout=15)
            api_response.raise_for_status()
            api_data = api_response.json()
            # Basic information from the API
            wiki_data = {
                "title": api_data.get("title", ""),
                "description": api_data.get("description", ""),
                "extract": api_data.get("extract", ""),
                "url": api_data.get("content_urls", {}).get("desktop", {}).get("page", url)
            }
            # If we need more detailed data beyond the summary
            if extraction_mode in ["tables", "structured"]:
                # Get the full HTML anyway for tables and other structured data
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')
                # Add tables to the response
                tables = self._extract_tables(soup, url, return_raw=False)
                wiki_data["tables"] = tables.get("tables", [])
                # For "structured" mode, add sections, infobox and other elements
                if extraction_mode == "structured":
                    wiki_data["infobox"] = self._extract_wikipedia_infobox(soup)
                    wiki_data["sections"] = self._extract_wikipedia_sections(soup)
                return {
                    "source": "wikipedia_api_enhanced",
                    "url": url,
                    "data": wiki_data
                }
            # For basic text, return the API data
            return {
                "source": "wikipedia_api",
                "url": url,
                "data": wiki_data
            }
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.warning(f"Wikipedia API request failed: {e}. Falling back to HTML scraping.")
            # Fallback to normal HTML processing
            return self._process_regular_webpage(url, extraction_mode)

    def _extract_text(self, soup, url):
        """Extract clean text from the page."""
        text_from_soup = soup.get_text(separator='\n', strip=True)
        # Convert multiple newlines to a single newline and clean spaces within lines
        cleaned_lines = []
        for line in text_from_soup.splitlines():
            line = line.strip()  # Strip leading/trailing whitespace
            if line:  # Only process non-empty lines
                # Replace multiple spaces with a single space
                cleaned_line = ' '.join(line.split())
                cleaned_lines.append(cleaned_line)
        text = '\n'.join(cleaned_lines)
        if not text:
            return {"error": f"No text content found at {url}."}
        return {
            "source": "web_page",
            "url": url,
            "content_type": "text",
            "text": text
        }

    def _extract_tables(self, soup, url, return_raw=True):
        """Extract tables from the page."""
        tables = []
        # Find all table elements
        html_tables = soup.find_all('table')
        for i, table in enumerate(html_tables):
            try:
                # Try to convert to pandas DataFrames (wrapping in StringIO avoids the
                # deprecation of passing literal HTML strings to read_html)
                dfs = pd.read_html(StringIO(str(table)))
                if dfs:
                    # Convert each DataFrame to a dict for JSON serialization
                    for j, df in enumerate(dfs):
                        # Clean column names
                        df.columns = [str(col).strip() for col in df.columns]
                        # Convert DataFrame to dict
                        table_dict = {
                            "table_id": f"table_{i}_{j}",
                            "headers": df.columns.tolist(),
                            "rows": df.values.tolist(),
                        }
                        tables.append(table_dict)
            except Exception as e:
                logger.warning(f"Failed to parse table {i}: {e}")
                # Try a manual extraction
                try:
                    headers = []
                    header_row = table.find('tr')
                    if header_row:
                        headers = [th.get_text(strip=True) for th in header_row.find_all(['th', 'td'])]
                    rows = []
                    for tr in table.find_all('tr'):
                        row = [td.get_text(strip=True) for td in tr.find_all(['td', 'th'])]
                        if row and row != headers:  # Skip header row in data
                            rows.append(row)
                    if headers or rows:
                        tables.append({
                            "table_id": f"table_{i}_manual",
                            "headers": headers,
                            "rows": rows
                        })
                except Exception:
                    continue  # Skip if manual extraction also fails
        if return_raw:
            return {
                "source": "web_page",
                "url": url,
                "content_type": "tables",
                "table_count": len(tables),
                "tables": tables
            }
        else:
            return {"tables": tables}

    def _extract_lists(self, soup, url):
        """Extract lists from the page."""
        lists = []
        # Find all ul and ol elements
        for list_type in ['ul', 'ol']:
            list_elements = soup.find_all(list_type, recursive=True)
            for i, list_elem in enumerate(list_elements):
                # Skip nested lists to avoid duplication
                if list_elem.parent.name in ['li', 'ul', 'ol']:
                    continue
                items = []
                for li in list_elem.find_all('li', recursive=False):
                    # Get text but exclude any nested lists
                    for nested_list in li.find_all(['ul', 'ol']):
                        nested_list.decompose()
                    item_text = li.get_text(strip=True)
                    if item_text:
                        items.append(item_text)
                if items:
                    lists.append({
                        "list_id": f"{list_type}_{i}",
                        "list_type": "ordered" if list_type == "ol" else "unordered",
                        "items": items
                    })
        return {
            "source": "web_page",
            "url": url,
            "content_type": "lists",
            "list_count": len(lists),
            "lists": lists
        }

    def _extract_structured_data(self, soup, url):
        """Extract various types of structured data from the page."""
        result = {
            "source": "web_page",
            "url": url,
            "content_type": "structured",
            "title": soup.title.string if soup.title else "",
            "meta_description": "",
        }
        # Extract meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc:
            result["meta_description"] = meta_desc.get('content', '')
        # Extract main text content
        text_result = self._extract_text(soup, url)
        if "text" in text_result:
            result["text"] = text_result["text"]
        # Extract tables
        tables_result = self._extract_tables(soup, url, return_raw=False)
        result["tables"] = tables_result.get("tables", [])
        # Extract lists
        lists_result = self._extract_lists(soup, url)
        result["lists"] = lists_result.get("lists", [])
        # Extract headings for document structure
        headings = []
        for i, heading in enumerate(soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])):
            headings.append({
                "id": f"heading_{i}",
                "level": int(heading.name[1]),
                "text": heading.get_text(strip=True)
            })
        result["headings"] = headings
        # Look for JSON-LD structured data
        json_ld_data = []
        for script in soup.find_all('script', type='application/ld+json'):
            try:
                json_data = json.loads(script.string)
                json_ld_data.append(json_data)
            except (json.JSONDecodeError, ValueError):
                continue
        if json_ld_data:
            result["structured_data"] = json_ld_data
        return result

    def _extract_wikipedia_infobox(self, soup):
        """Extract information from a Wikipedia infobox."""
        infobox = {}
        # Look for the infobox table
        infobox_table = soup.find('table', class_=['infobox', 'vcard'])
        if infobox_table:
            for row in infobox_table.find_all('tr'):
                # Look for th/td pairs
                header = row.find('th')
                value = row.find('td')
                if header and value:
                    key = header.get_text(strip=True)
                    # Clean up the value text
                    for sup in value.find_all('sup'):
                        sup.decompose()  # Remove reference superscripts
                    val = value.get_text(strip=True)
                    if key and val:
                        infobox[key] = val
        return infobox

    def _extract_wikipedia_sections(self, soup):
        """Extract sections and their content from Wikipedia."""
        sections = []
        current_section = None
        # Find all headings
        headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        for heading in headings:
            # Skip non-content headings
            if heading.get('id') in ['firstHeading', 'mw-toc-heading']:
                continue
            level = int(heading.name[1])
            title = heading.get_text(strip=True)
            # Start a new section
            current_section = {
                "level": level,
                "title": title,
                "content": ""
            }
            # Get content until next heading
            content_elements = []
            sibling = heading.next_sibling
            while sibling and not (sibling.name and sibling.name.startswith('h')):
                if sibling.name in ['p', 'ul', 'ol']:
                    content_elements.append(sibling.get_text(strip=True))
                sibling = sibling.next_sibling
            if content_elements:
                current_section["content"] = "\n".join(content_elements)
            sections.append(current_section)
        return sections


if __name__ == '__main__':
    browser = WebBrowser()  # Instantiation remains the same for testing
    # Example usage:
    # Note: For a real agent, the URL would come from the task or a search step.
    # This example uses a known Wikipedia page for demonstration.
    # For tasks like "How many studio albums were published by Mercedes Sosa...",
    # the agent would first need to find the relevant Wikipedia URL.
    test_url_wikipedia = "https://en.wikipedia.org/wiki/Mercedes_Sosa"
    print(f"--- Browsing: {test_url_wikipedia} ---")
    # For testing, call 'forward' directly
    content_wikipedia = browser.forward(test_url_wikipedia)
    if "error" in content_wikipedia:
        print(content_wikipedia["error"])
    else:
        # forward() returns a dict, so serialize it and print only the first 1000 characters for brevity
        rendered = json.dumps(content_wikipedia, ensure_ascii=False, indent=2)
        print(rendered[:1000] + "..." if len(rendered) > 1000 else rendered)
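
    # Illustrative add-on (not part of the original example): a quick look at the
    # "tables" extraction mode on the same page. This needs network access, and the
    # exact tables returned depend on the live Wikipedia markup.
    print(f"\n--- Browsing tables from: {test_url_wikipedia} ---")
    content_tables = browser.forward(test_url_wikipedia, extraction_mode="tables")
    if "error" in content_tables:
        print(content_tables["error"])
    else:
        tables = content_tables.get("data", content_tables).get("tables", [])
        print(f"Extracted {len(tables)} table(s)")
        if tables:
            # Headers of the first extracted table, as a quick sanity check
            print("First table headers:", tables[0].get("headers", []))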
| print("\n--- Example with a non-existent page ---") | |
| test_url_non_existent = "http://example.com/nonexistentpage12345.html" | |
| content_non_existent = browser.forward(test_url_non_existent) | |
| print(content_non_existent) | |
| print("\n--- Example with an invalid URL format ---") | |
| test_url_invalid_format = "www.google.com" | |
| content_invalid_format = browser.forward(test_url_invalid_format) | |
| print(content_invalid_format) |
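
    # A minimal, hedged sketch of registering this tool with a smolagents agent.
    # This is not part of the original example: the exact class names vary between
    # smolagents releases (e.g. the model wrapper has been called HfApiModel or
    # InferenceClientModel), so adjust the imports to the version you have installed.
    #
    # from smolagents import CodeAgent, HfApiModel
    #
    # model = HfApiModel()  # assumes a valid Hugging Face API token is configured
    # agent = CodeAgent(tools=[WebBrowser()], model=model)
    # agent.run("How many studio albums were published by Mercedes Sosa between 2000 and 2009?")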