from smolagents.tools import Tool
from typing import Dict, Any, Optional
from io import StringIO
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class WebContentExtractor(Tool):
    """
    Specialized tool for extracting structured content from specific websites.
    Has optimized extractors for Wikipedia, tabular data, and common content patterns.
    """

    name = "web_content_extractor"
    description = "Extracts structured data from websites with specialized handlers for Wikipedia and other content types."
    inputs = {
        'url': {'type': 'string', 'description': 'The URL of the web page to extract content from.'},
        'target_type': {'type': 'string', 'description': 'Type of content to extract: "info", "table", "list", or "specific_data".'},
        'extraction_details': {'type': 'object', 'description': 'Additional details for extraction (e.g., table index, data label).', 'nullable': True}
    }
    outputs = {'result': {'type': 'object', 'description': 'The extracted content as structured data.'}}
    output_type = "object"

    def __init__(self, user_agent="GAIA-Agent/1.0", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.headers = {"User-Agent": user_agent}
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.is_initialized = True
    def forward(self, url: str, target_type: str, extraction_details: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Extract specific content from a web page.

        Args:
            url: URL of the web page
            target_type: Type of content to extract ("info", "table", "list", "specific_data")
            extraction_details: Additional details for extraction

        Returns:
            Dict with extracted content or error message
        """
        if not extraction_details:
            extraction_details = {}

        # Validate URL
        if not url.startswith(('http://', 'https://')):
            return {"error": f"Invalid URL format: {url}"}

        try:
            # For Wikipedia, use specialized extraction
            if 'wikipedia.org' in url:
                return self._extract_from_wikipedia(url, target_type, extraction_details)

            # For general websites
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Handle different extraction types
            if target_type == "info":
                return self._extract_general_info(soup, url)
            elif target_type == "table":
                return self._extract_table(soup, url, extraction_details)
            elif target_type == "list":
                return self._extract_list(soup, url, extraction_details)
            elif target_type == "specific_data":
                return self._extract_specific_data(soup, url, extraction_details)
            else:
                return {"error": f"Unknown extraction type: {target_type}"}
        except requests.exceptions.RequestException as e:
            return {"error": f"Request error: {str(e)}"}
        except Exception as e:
            return {"error": f"Extraction error: {str(e)}"}
    def _extract_general_info(self, soup, url):
        """Extract general information from a web page."""
        title = soup.title.string if soup.title else "No title found"

        # Try to get the meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        description = meta_desc.get('content', '') if meta_desc else "No description found"

        # Get main headings
        main_headings = [h1.get_text(strip=True) for h1 in soup.find_all('h1')]

        # Get key facts from definition lists (key-value pairs)
        key_facts = {}
        for dl in soup.find_all('dl'):
            for dt, dd in zip(dl.find_all('dt'), dl.find_all('dd')):
                key = dt.get_text(strip=True)
                value = dd.get_text(strip=True)
                if key and value:
                    key_facts[key] = value

        # Build a summary from the first few substantial paragraphs
        paragraphs = soup.find_all('p')
        summary = ""
        para_count = 0
        for p in paragraphs:
            text = p.get_text(strip=True)
            if len(text) > 50:  # Only include substantial paragraphs
                summary += text + "\n\n"
                para_count += 1
                if para_count >= 3:  # Limit to the first 3 substantial paragraphs
                    break

        return {
            "title": title,
            "url": url,
            "description": description,
            "main_headings": main_headings,
            "key_facts": key_facts,
            "summary": summary.strip()
        }
    def _extract_table(self, soup, url, details):
        """Extract table data from a web page."""
        table_index = details.get('table_index', 0)

        # Find all tables
        tables = soup.find_all('table')
        if not tables:
            return {"error": "No tables found on the page"}
        if table_index >= len(tables):
            return {"error": f"Table index {table_index} is out of range. Found {len(tables)} tables."}

        try:
            # Try to use pandas to extract the table; wrap the HTML in StringIO,
            # since passing literal HTML strings to read_html is deprecated
            table = tables[table_index]
            dfs = pd.read_html(StringIO(str(table)))
            if not dfs:
                return {"error": "Failed to parse table with pandas"}
            df = dfs[0]

            # Convert to dictionary format
            headers = df.columns.tolist()
            rows = df.values.tolist()
            return {
                "table_data": {
                    "headers": headers,
                    "rows": rows
                },
                "row_count": len(rows),
                "column_count": len(headers),
                "url": url
            }
        except Exception as e:
            # Fall back to manual extraction
            logger.warning(f"Pandas table extraction failed: {e}. Falling back to manual extraction.")
            table = tables[table_index]
            headers = []
            rows = []

            # Try to find headers in a thead element
            thead = table.find('thead')
            if thead:
                header_row = thead.find('tr')
                if header_row:
                    headers = [th.get_text(strip=True) for th in header_row.find_all(['th', 'td'])]

            # If there is no thead, use the first row as the header
            if not headers:
                first_row = table.find('tr')
                if first_row:
                    headers = [th.get_text(strip=True) for th in first_row.find_all(['th', 'td'])]

            # Extract rows
            for tr in table.find_all('tr'):
                row = [td.get_text(strip=True) for td in tr.find_all(['td', 'th'])]
                if row and row != headers:  # Skip the header row in the data
                    rows.append(row)

            return {
                "table_data": {
                    "headers": headers,
                    "rows": rows
                },
                "row_count": len(rows),
                "column_count": len(headers) if headers else (len(rows[0]) if rows else 0),
                "url": url,
                "extraction_method": "manual_fallback"
            }
    def _extract_list(self, soup, url, details):
        """Extract list data from a web page."""
        list_type = details.get('list_type', 'all')  # 'ul', 'ol', or 'all'
        position = details.get('position', 0)  # Which list to extract (0-based index)

        list_elements = []
        if list_type in ('ul', 'all'):
            list_elements.extend(soup.find_all('ul'))
        if list_type in ('ol', 'all'):
            list_elements.extend(soup.find_all('ol'))

        if not list_elements:
            return {"error": "No lists found on the page"}
        if position >= len(list_elements):
            return {"error": f"List position {position} is out of range. Found {len(list_elements)} lists."}

        target_list = list_elements[position]
        items = []
        for li in target_list.find_all('li', recursive=False):
            # Ignore nested lists
            for nested_list in li.find_all(['ul', 'ol']):
                nested_list.decompose()
            item_text = li.get_text(strip=True)
            if item_text:
                items.append(item_text)

        return {
            "list_type": target_list.name,  # 'ul' or 'ol'
            "items": items,
            "count": len(items),
            "url": url
        }
    def _extract_specific_data(self, soup, url, details):
        """Extract specific data based on given selectors or patterns."""
        data_label = details.get('data_label', '')
        selector = details.get('selector', '')
        attribute = details.get('attribute', '')
        regex_pattern = details.get('regex_pattern', '')

        result = {
            "url": url,
            "data_label": data_label,
            "found": False
        }

        # Try a CSS selector if provided
        if selector:
            elements = soup.select(selector)
            if elements:
                result["found"] = True
                if attribute:
                    # Extract the attribute value
                    values = [elem.get(attribute, '') for elem in elements]
                else:
                    # Extract text content
                    values = [elem.get_text(strip=True) for elem in elements]
                result["values"] = values
                # If there is only one value, simplify the result
                if len(values) == 1:
                    result["value"] = values[0]
                return result

        # Try a regex pattern if provided
        if regex_pattern:
            page_text = soup.get_text()
            matches = re.findall(regex_pattern, page_text)
            if matches:
                result["found"] = True
                result["matches"] = matches
                # If there is only one match, simplify the result
                if len(matches) == 1:
                    result["value"] = matches[0]
                return result

        # Try common patterns based on data_label
        if data_label:
            # Look for the label in the page text
            label_pattern = re.compile(rf'{re.escape(data_label)}\s*[:=-]?\s*([\w\s,.()-]+)', re.IGNORECASE)
            page_text = soup.get_text()
            match = label_pattern.search(page_text)
            if match:
                result["found"] = True
                result["value"] = match.group(1).strip()
                return result

            # Look for the label in headings followed by a paragraph
            for heading in soup.find_all(['h1', 'h2', 'h3', 'h4']):
                if data_label.lower() in heading.get_text().lower():
                    next_sibling = heading.find_next_sibling()
                    if next_sibling and next_sibling.name == 'p':
                        result["found"] = True
                        result["value"] = next_sibling.get_text(strip=True)
                        return result

        # Nothing found
        return result
    def _extract_from_wikipedia(self, url, target_type, details):
        """Specialized extraction for Wikipedia pages using APIs when possible."""
        # Extract the page title from the URL, dropping any query string or fragment
        title = url.split('/')[-1].split('?')[0].split('#')[0]
        # Determine the Wikipedia language subdomain
        domain = url.split('//')[1].split('.')[0]

        try:
            # First try the Wikipedia REST API
            api_url = f"https://{domain}.wikipedia.org/api/rest_v1/page/summary/{title}"
            response = self.session.get(api_url, timeout=15)
            response.raise_for_status()
            api_data = response.json()

            # For info requests, the API data alone is enough
            if target_type == "info":
                return {
                    "title": api_data.get("title", ""),
                    "description": api_data.get("description", ""),
                    "extract": api_data.get("extract", ""),
                    "url": url,
                    "source": "wikipedia_api"
                }

            # For other requests, fetch the HTML as well
            html_response = self.session.get(url, timeout=15)
            html_response.raise_for_status()
            soup = BeautifulSoup(html_response.content, 'html.parser')

            if target_type == "table":
                # Get the infobox if requested
                if details.get('infobox', False):
                    infobox = {}
                    infobox_div = soup.find('table', {'class': 'infobox'})
                    if infobox_div:
                        for row in infobox_div.find_all('tr'):
                            header = row.find('th')
                            data = row.find('td')
                            if header and data:
                                key = header.get_text(strip=True)
                                value = data.get_text(strip=True)
                                if key and value:
                                    infobox[key] = value
                    return {
                        "title": api_data.get("title", ""),
                        "infobox": infobox,
                        "url": url,
                        "source": "wikipedia_infobox"
                    }
                # Regular table extraction
                return self._extract_table(soup, url, details)
            elif target_type == "list":
                return self._extract_list(soup, url, details)
            elif target_type == "specific_data":
                # Enhanced extraction for Wikipedia-specific data
                data_label = details.get('data_label', '')

                # Try to find it in the infobox first
                infobox = soup.find('table', {'class': 'infobox'})
                if infobox and data_label:
                    for row in infobox.find_all('tr'):
                        header = row.find('th')
                        if header and data_label.lower() in header.get_text().lower():
                            data = row.find('td')
                            if data:
                                return {
                                    "found": True,
                                    "value": data.get_text(strip=True),
                                    "source": "wikipedia_infobox",
                                    "url": url
                                }
                # Fall back to regular specific data extraction
                return self._extract_specific_data(soup, url, details)
            else:
                return {"error": f"Unknown extraction type: {target_type}"}
        except Exception as e:
            logger.warning(f"Wikipedia API extraction failed: {e}. Falling back to HTML extraction.")
            # Fall back to regular HTML extraction
            try:
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')

                if target_type == "info":
                    return self._extract_general_info(soup, url)
                elif target_type == "table":
                    return self._extract_table(soup, url, details)
                elif target_type == "list":
                    return self._extract_list(soup, url, details)
                elif target_type == "specific_data":
                    return self._extract_specific_data(soup, url, details)
                else:
                    return {"error": f"Unknown extraction type: {target_type}"}
            except Exception as fallback_error:
                return {"error": f"Wikipedia extraction error: {fallback_error}"}