Spaces:
Sleeping
Sleeping
| """Extractor agent for data extraction with selectors.""" | |
| import re | |
| from typing import Any | |
| from app.core.action import Action, ActionType | |
| from app.core.observation import Observation, PageElement | |
| from .base import BaseAgent | |
| class ExtractorAgent(BaseAgent): | |
| """ | |
| Agent responsible for extracting structured data from pages. | |
| The ExtractorAgent handles: | |
| - Identifying data elements using CSS/XPath selectors | |
| - Extracting text, attributes, and structured content | |
| - Handling tables and lists | |
| - Post-processing extracted values | |
| - Confidence scoring for extractions | |
| """ | |
def __init__(
    self,
    agent_id: str = "extractor",
    config: dict[str, Any] | None = None,
):
    """
    Set up the extractor's tunables, cache and selector table.

    Args:
        agent_id: Identifier passed through to the base agent.
        config: Optional settings passed through to the base agent.
            The keys read here are:
            - min_confidence: Lowest confidence at which a selector is
              accepted (default 0.5).
            - extraction_timeout: Timeout for extraction operations
              (default 5000).
            - enable_fuzzy_matching: Whether text-based matching may be
              used as a fallback (default True).
    """
    super().__init__(agent_id, config)

    # The base initializer is expected to expose the settings as self.config.
    settings = self.config
    self.min_confidence = settings.get("min_confidence", 0.5)
    self.extraction_timeout = settings.get("extraction_timeout", 5000)
    self.enable_fuzzy_matching = settings.get("enable_fuzzy_matching", True)

    # Per-instance state: cleared by reset() / built once up front.
    self._extraction_cache: dict[str, Any] = {}
    self._selector_patterns: dict[str, list[str]] = self._init_selector_patterns()
| def _init_selector_patterns(self) -> dict[str, list[str]]: | |
| """Initialize common selector patterns for different field types.""" | |
| return { | |
| "price": [ | |
| "[class*='price']", | |
| "[id*='price']", | |
| "[itemprop='price']", | |
| ".product-price", | |
| ".item-price", | |
| "span[data-price]", | |
| ], | |
| "title": [ | |
| "h1", | |
| "[class*='title']", | |
| "[itemprop='name']", | |
| ".product-title", | |
| ".item-title", | |
| ], | |
| "description": [ | |
| "[class*='description']", | |
| "[itemprop='description']", | |
| ".product-description", | |
| "article p", | |
| ".content p", | |
| ], | |
| "image": [ | |
| "[class*='product-image'] img", | |
| "[itemprop='image']", | |
| ".main-image img", | |
| "figure img", | |
| ], | |
| "date": [ | |
| "time", | |
| "[datetime]", | |
| "[class*='date']", | |
| "[itemprop='datePublished']", | |
| ], | |
| "author": [ | |
| "[class*='author']", | |
| "[itemprop='author']", | |
| "[rel='author']", | |
| ".byline", | |
| ], | |
| } | |
async def act(self, observation: Observation) -> Action:
    """
    Decide the next extraction step for the current observation.

    Only the first entry of ``observation.fields_remaining`` is handled.
    The strategies are tried in this order: a selector whose confidence
    reaches ``self.min_confidence``, then table/list extraction, then an
    LLM-based extraction as the last resort.

    Args:
        observation: The current state observation.

    Returns:
        A DONE action when no fields remain, an extraction action for the
        next field otherwise, or a FAIL action if anything raises.
    """
    try:
        pending = observation.fields_remaining
        if not pending:
            done_parameters = {"success": True, "message": "All fields extracted"}
            return Action(
                action_type=ActionType.DONE,
                parameters=done_parameters,
                reasoning="No more fields to extract",
                confidence=1.0,
                agent_id=self.agent_id,
            )

        target_field = pending[0]

        # Strategy 1: a selector that is confident enough.
        lookup = await self._find_selector_for_field(target_field, observation)
        selector, confidence = lookup
        if selector and confidence >= self.min_confidence:
            return self._create_extraction_action(target_field, selector, confidence)

        # Strategy 2: structured containers (tables, lists).
        fallback_action = await self._try_alternative_extraction(
            target_field,
            observation,
        )
        if fallback_action:
            return fallback_action

        # Strategy 3: no selector available, hand the field to the LLM.
        llm_parameters = {
            "field_name": target_field,
            "selector": None,
            "extraction_method": "llm",
        }
        return Action(
            action_type=ActionType.EXTRACT_FIELD,
            parameters=llm_parameters,
            reasoning=f"No selector found, using LLM extraction for {target_field}",
            confidence=0.4,
            agent_id=self.agent_id,
        )
    except Exception as e:
        # Agent boundary: report the failure as an action instead of raising.
        failure_parameters = {"success": False, "message": str(e)}
        return Action(
            action_type=ActionType.FAIL,
            parameters=failure_parameters,
            reasoning=f"Extraction error: {e}",
            confidence=1.0,
            agent_id=self.agent_id,
        )
async def plan(self, observation: Observation) -> list[Action]:
    """
    Create an extraction plan covering every remaining field.

    Each field in ``observation.fields_remaining`` gets exactly one action:
    a selector-based extraction when a selector is found whose confidence
    reaches ``self.min_confidence`` (the same acceptance rule ``act``
    applies), otherwise an LLM-based extraction with ``selector`` set to
    ``None``.

    Args:
        observation: The current state observation.

    Returns:
        The planned actions in field order. An observation with no
        remaining fields yields an empty list. If planning raises, a
        single FAIL action describing the error is returned instead.
    """
    try:
        actions: list[Action] = []
        # Treat a missing/None field list like an empty one, as act() does.
        remaining_fields = observation.fields_remaining or []

        for field_name in remaining_fields:
            selector, confidence = await self._find_selector_for_field(
                field_name,
                observation,
            )

            # Low-confidence selectors are rejected here exactly as in act(),
            # so a plan never contains a step act() itself would refuse.
            if selector and confidence >= self.min_confidence:
                actions.append(
                    self._create_extraction_action(
                        field_name,
                        selector,
                        confidence,
                    )
                )
                continue

            # Plan LLM-based extraction as fallback; the parameter keys
            # mirror the LLM fallback produced by act().
            actions.append(
                Action(
                    action_type=ActionType.EXTRACT_FIELD,
                    parameters={
                        "field_name": field_name,
                        "selector": None,
                        "extraction_method": "llm",
                    },
                    reasoning=f"Planning LLM extraction for {field_name}",
                    confidence=0.5,
                    agent_id=self.agent_id,
                )
            )

        return actions
    except Exception as e:
        # Agent boundary: report the failure as an action instead of raising.
        return [
            Action(
                action_type=ActionType.FAIL,
                parameters={
                    "success": False,
                    "message": f"Extraction planning failed: {e}",
                },
                reasoning=str(e),
                confidence=1.0,
                agent_id=self.agent_id,
            )
        ]
| async def _find_selector_for_field( | |
| self, | |
| field_name: str, | |
| observation: Observation, | |
| ) -> tuple[str | None, float]: | |
| """ | |
| Find the best selector for a field. | |
| Args: | |
| field_name: Name of the field to extract. | |
| observation: Current observation. | |
| Returns: | |
| Tuple of (selector, confidence). | |
| """ | |
| best_selector: str | None = None | |
| best_confidence = 0.0 | |
| # Check predefined patterns first | |
| patterns = self._get_patterns_for_field(field_name) | |
| for pattern in patterns: | |
| element = self._find_element_by_selector( | |
| pattern, | |
| observation.page_elements, | |
| ) | |
| if element: | |
| confidence = self._calculate_confidence(element, field_name) | |
| if confidence > best_confidence: | |
| best_selector = element.selector | |
| best_confidence = confidence | |
| # Search by text content if fuzzy matching enabled | |
| if self.enable_fuzzy_matching and best_confidence < 0.7: | |
| element, confidence = self._find_element_by_text( | |
| field_name, | |
| observation.page_elements, | |
| ) | |
| if element and confidence > best_confidence: | |
| best_selector = element.selector | |
| best_confidence = confidence | |
| return best_selector, best_confidence | |
| def _get_patterns_for_field(self, field_name: str) -> list[str]: | |
| """Get selector patterns for a field type.""" | |
| field_lower = field_name.lower() | |
| # Direct match | |
| if field_lower in self._selector_patterns: | |
| return self._selector_patterns[field_lower] | |
| # Partial match | |
| for key, patterns in self._selector_patterns.items(): | |
| if key in field_lower or field_lower in key: | |
| return patterns | |
| # Generate generic patterns | |
| return [ | |
| f"[class*='{field_lower}']", | |
| f"[id*='{field_lower}']", | |
| f"[data-{field_lower}]", | |
| f".{field_lower}", | |
| f"#{field_lower}", | |
| ] | |
| def _find_element_by_selector( | |
| self, | |
| selector: str, | |
| elements: list[PageElement], | |
| ) -> PageElement | None: | |
| """Find an element matching a selector pattern.""" | |
| selector_lower = selector.lower() | |
| for element in elements: | |
| element_selector = element.selector.lower() | |
| if selector_lower in element_selector: | |
| return element | |
| # Check class and id attributes | |
| classes = element.attributes.get("class", "").lower() | |
| element_id = element.attributes.get("id", "").lower() | |
| if selector_lower.strip(".[#]") in classes: | |
| return element | |
| if selector_lower.strip(".[#]") in element_id: | |
| return element | |
| return None | |
| def _find_element_by_text( | |
| self, | |
| field_name: str, | |
| elements: list[PageElement], | |
| ) -> tuple[PageElement | None, float]: | |
| """Find an element by text content matching.""" | |
| field_lower = field_name.lower().replace("_", " ") | |
| best_element: PageElement | None = None | |
| best_score = 0.0 | |
| for element in elements: | |
| if not element.text: | |
| continue | |
| text_lower = element.text.lower() | |
| # Check for label-like patterns | |
| if f"{field_lower}:" in text_lower or f"{field_lower} :" in text_lower: | |
| score = 0.9 | |
| elif field_lower in text_lower: | |
| # Calculate similarity score | |
| score = len(field_lower) / max(len(text_lower), 1) * 0.8 | |
| else: | |
| continue | |
| if score > best_score: | |
| best_element = element | |
| best_score = score | |
| return best_element, best_score | |
| def _calculate_confidence(self, element: PageElement, field_name: str) -> float: | |
| """Calculate extraction confidence for an element.""" | |
| confidence = 0.5 | |
| # Boost for visible elements | |
| if element.is_visible: | |
| confidence += 0.1 | |
| # Boost for semantic attributes | |
| if element.attributes.get("itemprop"): | |
| confidence += 0.2 | |
| if element.attributes.get("data-field"): | |
| confidence += 0.15 | |
| # Boost if text contains field name | |
| if element.text and field_name.lower() in element.text.lower(): | |
| confidence += 0.1 | |
| # Penalty for very long text (likely not a single field) | |
| if element.text and len(element.text) > 500: | |
| confidence -= 0.2 | |
| return min(1.0, max(0.0, confidence)) | |
async def _try_alternative_extraction(
    self,
    field_name: str,
    observation: Observation,
) -> Action | None:
    """
    Try structured-container extraction when no selector was usable.

    The first table-like element (``table`` / ``tbody``) anywhere on the
    page takes priority; only when the page has none is the first list-like
    element (``ul`` / ``ol`` / ``dl``) used.

    Args:
        field_name: Name of the field to extract.
        observation: Current observation; its ``page_elements`` are scanned.

    Returns:
        An EXTRACT_TABLE or EXTRACT_LIST action, or None when the page has
        neither a table nor a list.
    """
    first_list = None

    for element in observation.page_elements:
        # Check for table data: any table wins immediately.
        if element.tag in ("table", "tbody"):
            return Action(
                action_type=ActionType.EXTRACT_TABLE,
                parameters={
                    "table_selector": element.selector,
                    "target_field": field_name,
                },
                reasoning=f"Extracting {field_name} from table",
                confidence=0.6,
                agent_id=self.agent_id,
            )
        # Remember the first list in case the page has no table.
        if first_list is None and element.tag in ("ul", "ol", "dl"):
            first_list = element

    if first_list is None:
        return None

    # A definition list holds its entries in <dt>/<dd> elements, not <li>;
    # the values live in <dd>, so "li" would never match inside a <dl>.
    item_selector = "dd" if first_list.tag == "dl" else "li"

    return Action(
        action_type=ActionType.EXTRACT_LIST,
        parameters={
            "container_selector": first_list.selector,
            "item_selector": item_selector,
            "field_selectors": {field_name: "text"},
        },
        reasoning=f"Extracting {field_name} from list",
        confidence=0.55,
        agent_id=self.agent_id,
    )
def _create_extraction_action(
    self,
    field_name: str,
    selector: str,
    confidence: float,
) -> Action:
    """
    Build the action that extracts a field's text through a selector.

    Args:
        field_name: Name of the field to extract.
        selector: Selector of the element holding the value.
        confidence: Confidence attached to the action.

    Returns:
        An EXTRACT_FIELD action using the "text" extraction method.
    """
    parameters = {
        "field_name": field_name,
        "selector": selector,
        "extraction_method": "text",
    }
    reasoning = f"Extracting {field_name} using selector: {selector}"
    return Action(
        action_type=ActionType.EXTRACT_FIELD,
        parameters=parameters,
        reasoning=reasoning,
        confidence=confidence,
        agent_id=self.agent_id,
    )
def extract_with_regex(
    self,
    text: str,
    pattern: str,
    group: int = 0,
) -> str | None:
    """
    Extract text using a regex pattern.

    Args:
        text: The text to search in.
        pattern: Regex pattern.
        group: Capture group to return (0 is the whole match).

    Returns:
        The text of the requested group from the first match, or None when
        the text is None, the pattern is invalid, nothing matches, the
        group does not exist in the pattern, or the group did not
        participate in the match.
    """
    if text is None:
        return None
    try:
        match = re.search(pattern, text)
        if match is None:
            return None
        # Match.group raises IndexError for a group the pattern does not
        # define; report that as "nothing extracted" like every other miss.
        return match.group(group)
    except (re.error, IndexError):
        return None
def post_process_value(
    self,
    value: Any,
    field_name: str,
) -> Any:
    """
    Post-process an extracted value based on field type.

    The field type is inferred from substrings of the field name, checked
    in this order:

    - "price": the first number in the value is returned as a float.
      Commas are treated as thousands separators and removed; a leading
      decimal point (".99") is honored.
    - "date": the stripped string is returned unchanged for the caller to
      parse.
    - "count" / "quantity" / "number": the first integer in the value is
      returned as an int; comma-grouped thousands ("1,234") are read as a
      single number.

    Args:
        value: The raw extracted value.
        field_name: Name of the field (used to infer type).

    Returns:
        None for a None value; the converted number when a typed field
        contains one; otherwise the value as a stripped string.
    """
    if value is None:
        return None

    value_str = str(value).strip()
    field_lower = field_name.lower()

    # Price processing
    if "price" in field_lower:
        # Drop thousands separators, then take digits with an optional
        # fraction, or a fraction written without a leading zero.
        price_match = re.search(
            r"\d+(?:\.\d*)?|\.\d+",
            value_str.replace(",", ""),
        )
        if price_match:
            return float(price_match.group())

    # Date processing
    if "date" in field_lower:
        return value_str  # Return as-is, let caller parse

    # Number processing
    if any(marker in field_lower for marker in ("count", "quantity", "number")):
        # Prefer a well-formed comma-grouped number ("1,234,567") so that
        # it is not truncated at the first comma; else plain digits.
        num_match = re.search(r"\d{1,3}(?:,\d{3})+(?!\d)|\d+", value_str)
        if num_match:
            return int(num_match.group().replace(",", ""))

    return value_str
def reset(self) -> None:
    """
    Return the extractor to a clean state.

    Runs the base agent's reset first, then empties the extraction cache
    in place (the same dict object is kept).
    """
    super().reset()
    cache = self._extraction_cache
    cache.clear()