"""Scraping endpoints with SSE and websocket live updates."""

from __future__ import annotations

import asyncio
import csv
import io
import json
import logging
import os
import re
import shutil
import tempfile
import time
import uuid
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, AsyncGenerator
from urllib.error import HTTPError, URLError
from urllib.parse import quote_plus, urljoin, urlparse
from urllib.request import Request, urlopen

from bs4 import BeautifulSoup
from fastapi import APIRouter, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from app.config import Settings
from app.api.deps import (
    MemoryManagerDep,
    SettingsDep,
    get_model_router,
    create_environment,
    remove_environment,
)
from app.models.router import SmartModelRouter, TaskType
from app.api.routes.plugins import PLUGIN_REGISTRY
from app.api.routes.websocket import get_connection_manager
from app.core.action import Action, ActionType
from app.memory.manager import MemoryManager, MemoryType
from app.plugins.python_sandbox import (
    DEFAULT_ANALYSIS_CODE,
    SandboxExecutionResult,
    execute_python_sandbox,
)
from app.search.engine import SearchEngineRouter
from app.search.providers.duckduckgo import DuckDuckGoProvider
from app.sites import match_site_template, serialize_site_template

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/scrape", tags=["Scraping"])


def parse_html(html: str) -> BeautifulSoup:
    """Parse HTML string into BeautifulSoup object."""
    return BeautifulSoup(html, "html.parser")


class OutputFormat(str, Enum):
    """Supported output formats."""

    JSON = "json"
    CSV = "csv"
    MARKDOWN = "markdown"
    TEXT = "text"


class TaskComplexity(str, Enum):
    """Task complexity levels."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


class ScrapeRequest(BaseModel):
    """Request model for scraping."""

    assets: list[str] = Field(..., description="List of URLs or asset identifiers")
    instructions: str = Field(..., description="Scraping instructions")
    output_instructions: str = Field(
        default="Return as JSON",
        description="Output format instructions",
    )
    output_format: OutputFormat = Field(
        default=OutputFormat.JSON,
        description="Desired output format",
    )
    complexity: TaskComplexity = Field(
        default=TaskComplexity.MEDIUM,
        description="Task complexity",
    )
    session_id: str | None = Field(default=None, description="Optional client-provided session ID")
    model: str = Field(default="llama-3.3-70b", description="AI model to use")
    provider: str = Field(default="nvidia", description="AI provider")
    enable_memory: bool = Field(default=True, description="Enable memory features")
    enable_plugins: list[str] = Field(default_factory=list, description="Enabled plugin IDs")
    selected_agents: list[str] = Field(default_factory=list, description="Enabled agent roles/modules")
    max_steps: int = Field(default=50, description="Maximum steps per URL")
    python_code: str | None = Field(
        default=None,
        description="Optional sandboxed Python analysis code (must assign to variable `result`)",
    )


class ScrapeStep(BaseModel):
    """A single step in the scraping process."""

    step_number: int
    action: str
    url: str | None = None
    status: str
    message: str
    reward: float = 0.0
    extracted_data: dict[str, Any] | None = None
    duration_ms: float | None = None
    timestamp: str


class ScrapeResponse(BaseModel):
    """Final scrape response."""

    session_id: str
    status: str
    total_steps: int
    total_reward: float
    extracted_data: dict[str, Any]
    output: str
    output_format: OutputFormat
    duration_seconds: float
    urls_processed: int
    errors: list[str]
    enabled_plugins: list[str]
    requested_plugins: list[str]
    selected_agents: list[str]
    memory_enabled: bool
    sandbox_artifacts: list[str] = Field(default_factory=list)


_active_sessions: dict[str, dict[str, Any]] = {}


def _now_iso() -> str:
    """Return UTC timestamp in ISO format."""
    return datetime.now(timezone.utc).isoformat()


def _sse_event(event: dict[str, Any]) -> str:
    """Serialize a dictionary as one SSE event."""
    return f"data: {json.dumps(event, default=str)}\n\n"


def get_session(session_id: str) -> dict[str, Any] | None:
    """Get an active session by ID."""
    return _active_sessions.get(session_id)


def _is_agent_plugin_id(plugin_id: str) -> bool:
    """Check if a plugin id actually belongs to an agent/skill."""
    lowered = plugin_id.lower()
    return lowered.startswith("skill-") or lowered == "web_scraper"


def _resolve_enabled_plugins(
    requested_plugins: list[str],
) -> tuple[list[str], list[str]]:
    """Resolve requested plugin IDs against installed plugin registry."""
    if not requested_plugins:
        return [], []

    available: set[str] = {
        plugin["id"]
        for category_name, category in PLUGIN_REGISTRY.items()
        if category_name != "skills"
        for plugin in category
        if plugin.get("installed")
    }

    unique_requested = list(dict.fromkeys(requested_plugins))
    enabled = [plugin_id for plugin_id in unique_requested if plugin_id in available]
    missing = [
        plugin_id
        for plugin_id in unique_requested
        if plugin_id not in available and not _is_agent_plugin_id(plugin_id)
    ]
    return enabled, missing


def create_session(session_id: str, request: ScrapeRequest, enabled_plugins: list[str]) -> dict[str, Any]:
    """Create and store a scraping session."""
    # session_id may be client-provided, so sanitize it before embedding it in a filesystem path.
    sandbox_dir = Path(tempfile.mkdtemp(prefix=f"scraperl-session-{_safe_artifact_name(session_id)}-"))
    session = {
        "id": session_id,
        "request": request,
        "status": "running",
        "steps": [],
        "total_reward": 0.0,
        "extracted_data": {},
        "errors": [],
        "start_time": time.time(),
        "current_url_index": 0,
        "enabled_plugins": enabled_plugins,
        "resolved_assets": [],
        "sandbox_dir": str(sandbox_dir),
    }
    _active_sessions[session_id] = session
    return session


def update_session(session_id: str, updates: dict[str, Any]) -> dict[str, Any] | None:
    """Update a session in storage."""
    if session_id in _active_sessions:
        _active_sessions[session_id].update(updates)
        return _active_sessions[session_id]
    return None


def remove_session(session_id: str) -> bool:
    """Remove a session from storage."""
    if session_id in _active_sessions:
        sandbox_dir = _active_sessions[session_id].get("sandbox_dir")
        if sandbox_dir:
            shutil.rmtree(sandbox_dir, ignore_errors=True)
        del _active_sessions[session_id]
        return True
    return False


def _safe_artifact_name(value: str) -> str:
    """Create a safe artifact filename stem."""
    sanitized = re.sub(r"[^a-zA-Z0-9_-]+", "_", value).strip("_")
    return sanitized[:80] or "artifact"


def _write_session_artifact(session: dict[str, Any], file_name: str, content: str) -> None:
    """Write a text artifact to the session sandbox."""
    sandbox_dir = session.get("sandbox_dir")
    if not sandbox_dir:
        return
    path = Path(sandbox_dir) / file_name
    path.write_text(content, encoding="utf-8")


def _write_session_json_artifact(session: dict[str, Any], file_name: str, data: Any) -> None:
    """Write a JSON artifact to the session sandbox."""
    sandbox_dir = session.get("sandbox_dir")
    if not sandbox_dir:
        return
    path = Path(sandbox_dir) / file_name
    path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")


def _list_session_artifacts(session: dict[str, Any]) -> list[str]:
    """List files currently written to the session sandbox."""
    sandbox_dir = session.get("sandbox_dir")
    if not sandbox_dir:
        return []
    base = Path(sandbox_dir)
    if not base.exists():
        return []
    return sorted([file.name for file in base.iterdir() if file.is_file()])


def _create_tool_call_step(
    session: dict[str, Any],
    tool_name: str,
    description: str,
    parameters: dict[str, Any],
    status: str = "running",
    result: dict[str, Any] | None = None,
    reward: float = 0.0,
    url: str | None = None,
) -> dict[str, Any]:
    """Create a tool call step event."""
    step_number = len(session.get("steps", [])) + 1

    def _format_arg(value: Any) -> str:
        rendered = json.dumps(value, default=str)
        return rendered if len(rendered) <= 40 else f"{rendered[:37]}..."

    message = f"{tool_name}({', '.join(f'{k}={_format_arg(v)}' for k, v in parameters.items())})"
    if status == "completed" and result:
        result_preview = ", ".join(f"{k}={v}" for k, v in list(result.items())[:2])
        message = f"{tool_name}() → {result_preview[:50]}"

    return _record_step(
        session,
        ScrapeStep(
            step_number=step_number,
            action="tool_call",
            url=url,
            status=status,
            message=message,
            reward=reward,
            extracted_data={
                "tool_name": tool_name,
                "tool_description": description,
                "parameters": parameters,
                **({"result": result} if result else {}),
            },
            timestamp=_now_iso(),
        ),
    )


def _record_step(session: dict[str, Any], step: ScrapeStep) -> dict[str, Any]:
    """Store and return a step event payload."""
    payload = step.model_dump()
    session["steps"].append(payload)
    return {"type": "step", "data": payload}


def _csv_escape(value: Any) -> str:
    """Escape one CSV value."""
    text = str(value)
    # Quote fields containing a delimiter, a quote, or any line break (LF or CR).
    if any(ch in text for ch in [",", '"', "\n", "\r"]):
        text = '"' + text.replace('"', '""') + '"'
    return text


def _rows_to_csv(rows: list[dict[str, Any]], preferred_headers: list[str] | None = None) -> str:
    """Render list-of-dicts rows as CSV text."""
    if not rows:
        return ""
    headers = preferred_headers or list(rows[0].keys())
    lines = [",".join(_csv_escape(h) for h in headers)]
    for row in rows:
        lines.append(",".join(_csv_escape(row.get(h, "")) for h in headers))
    return "\n".join(lines)


def _flatten_for_csv(data: dict[str, Any]) -> tuple[list[str], list[list[str]]]:
    """Flatten extracted dict into CSV headers and rows."""
    if not data:
        return [], []

    if all(isinstance(value, dict) for value in data.values()):
        all_headers = sorted({k for value in data.values() if isinstance(value, dict) for k in value.keys()})
        headers = ["asset", *all_headers]
        rows = []
        for asset, values in data.items():
            value_dict = values if isinstance(values, dict) else {}
            row = [_csv_escape(asset), *[_csv_escape(value_dict.get(key, "")) for key in all_headers]]
            rows.append(row)
        return headers, rows

    headers = ["key", "value"]
    rows = [[_csv_escape(k), _csv_escape(v)] for k, v in data.items()]
    return headers, rows


async def format_output(data: dict[str, Any], output_format: OutputFormat, _instructions: str) -> str:
    """Format extracted data based on requested output format."""
    if output_format == OutputFormat.JSON:
        return json.dumps(data, indent=2, default=str)

    if output_format == OutputFormat.CSV:
        # Check if there's a pre-formatted csv_output
        if isinstance(data, dict) and "csv_output" in data:
            return data["csv_output"]
        # Check for rows format
        if (
            isinstance(data, dict)
            and isinstance(data.get("rows"), list)
            and all(isinstance(row, dict) for row in data.get("rows", []))
        ):
            rows = data.get("rows", [])
            preferred_headers = (
                data.get("columns")
                if isinstance(data.get("columns"), list)
                else None
            )
            return _rows_to_csv(rows, preferred_headers=preferred_headers)
        headers, rows = _flatten_for_csv(data)
        if not headers:
            return ""
        # _flatten_for_csv escapes cell values but not headers, so escape the headers here.
        lines = [",".join(_csv_escape(h) for h in headers)]
        lines.extend(",".join(row) for row in rows)
        return "\n".join(lines)

    if output_format == OutputFormat.MARKDOWN:
        lines: list[str] = ["# Extracted Data", ""]
        for key, value in data.items():
            lines.append(f"## {key}")
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    lines.append(f"- **{sub_key}**: {sub_value}")
            elif isinstance(value, list):
                for item in value:
                    lines.append(f"- {item}")
            else:
                lines.append(f"- {value}")
            lines.append("")
        return "\n".join(lines)

    lines = [f"{key}: {value}" for key, value in data.items()]
    return "\n".join(lines)


def _extract_fields_for_complexity(complexity: TaskComplexity) -> list[str]:
    """Map complexity level to extraction fields."""
    # For agentic scraping, we need to be goal-oriented
    # These are basic fields, but the planner should navigate intelligently
    fields = ["title", "content", "links"]
    if complexity in (TaskComplexity.MEDIUM, TaskComplexity.HIGH):
        fields.extend(["meta", "images", "data"])
    if complexity == TaskComplexity.HIGH:
        fields.extend(["scripts", "forms", "tables"])
    return fields


def _plan_from_site_template(
    site_template: Any,
    strategy_override: str | None = None,
    extraction_goal_override: str | None = None,
) -> dict[str, Any]:
    """Build a navigation plan from a matched site template."""
    target_urls = list(site_template.target_urls) if site_template.target_urls else []
    if not target_urls and site_template.domains:
        target_urls = [f"https://{site_template.domains[0]}"]

    return {
        "strategy": strategy_override or "intelligent_exploration",
        "target_urls": target_urls,
        "navigation_steps": list(site_template.navigation_steps)
        or [
            "Navigate to site and identify relevant sections",
            "Extract structured fields aligned with instructions",
        ],
        "extraction_goal": extraction_goal_override or site_template.extraction_goal,
        "output_fields": list(site_template.output_fields),
        "site_template_id": site_template.site_id,
        "site_template_name": site_template.name,
        "site_template_domains": list(site_template.domains),
    }


def _create_intelligent_navigation_plan(instructions: str, assets: list[str]) -> dict[str, Any]:
    """Create an intelligent navigation plan based on user instructions."""
    instructions_lower = instructions.lower()
    site_template = match_site_template(instructions, assets)

    # Site-specific strategy overrides
    if site_template and site_template.site_id == "github":
        # Detect GitHub trending/top repos requests (flexible matching)
        github_trending_signals = [
            "trending" in instructions_lower,
            "top" in instructions_lower and "repo" in instructions_lower,
            "top" in instructions_lower and "project" in instructions_lower,
            "best" in instructions_lower and "repo" in instructions_lower,
            "popular" in instructions_lower and "repo" in instructions_lower,
            "this week" in instructions_lower,
            "this month" in instructions_lower,
            "today" in instructions_lower and "repo" in instructions_lower,
        ]
        if any(github_trending_signals):
            return _plan_from_site_template(
                site_template,
                strategy_override="github_trending",
                extraction_goal_override="trending_repositories",
            )

    if site_template and site_template.site_id == "reddit":
        if any(
            token in instructions_lower
            for token in ("trending", "popular", "community", "communities", "subreddit", "subreddits")
        ):
            return _plan_from_site_template(
                site_template,
                strategy_override="reddit_trending",
                extraction_goal_override="trending_communities",
            )

    if site_template:
        return _plan_from_site_template(site_template)

    # News articles detection
    if any(word in instructions_lower for word in ["news", "article", "headline"]):
        return {
            "strategy": "news_extraction",
            "navigation_steps": [
                "Navigate to main news page",
                "Extract article headlines and summaries",
                "Follow article links if needed",
            ],
            "extraction_goal": "news_articles",
            "output_fields": ["headline", "summary", "publish_date", "author"],
        }

    # General search/exploration
    if any(word in instructions_lower for word in ["search", "find", "explore", "all"]):
        return {
            "strategy": "intelligent_exploration",
            "navigation_steps": [
                "Analyze main page for relevant navigation",
                "Follow relevant links based on instructions",
                "Extract data according to specified format",
            ],
            "extraction_goal": "custom_exploration",
        }

    # Default single-page extraction
    return {
        "strategy": "single_page",
        "navigation_steps": ["Extract content from provided URL"],
        "extraction_goal": "basic_extraction",
        "site_template_id": None,
        "site_template_name": None,
        "site_template_domains": [],
    }


def _is_url_asset(asset: str) -> bool:
    """Check whether an asset string is a URL."""
    return _coerce_url_asset(asset) is not None


def _looks_like_host(host: str) -> bool:
    """Return True when host resembles a real domain, localhost, or IPv4."""
    lowered = host.lower()
    if lowered == "localhost":
        return True
    if re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", lowered):
        return True
    return bool(re.match(r"^(?:[a-z0-9-]+\.)+[a-z]{2,63}$", lowered))


def _coerce_url_asset(asset: str) -> str | None:
    """Normalize URL-like asset strings (supports bare domains such as github.com)."""
    candidate = asset.strip()
    if not candidate or any(ch.isspace() for ch in candidate):
        return None

    normalized = candidate
    if not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*://", normalized):
        normalized = f"https://{normalized}"

    parsed = urlparse(normalized)
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
        return None

    host = (parsed.hostname or "").strip().lower()
    if not host or not _looks_like_host(host):
        return None
    return normalized


def _discover_assets_for_query(query: str) -> list[str]:
    """Resolve non-URL query assets using deterministic query-aware fallbacks."""
    query_l = query.lower()
    if "gold" in query_l and ("price" in query_l or "trend" in query_l):
        return [
            "https://raw.githubusercontent.com/datasets/gold-prices/master/data/monthly.csv",
            "https://github.com/datasets/gold-prices",
        ]

    encoded = quote_plus(query)
    # r.jina.ai provides a static, text-friendly rendering of dynamic search pages.
    return [f"https://r.jina.ai/http://duckduckgo.com/?q={encoded}"]


def _fetch_text_render_markdown(url: str, timeout_seconds: int = 12) -> tuple[str, str] | None:
    """Fetch a URL through r.jina.ai text rendering for dynamic-page fallback extraction."""
    normalized = _coerce_url_asset(url) or url
    if "://" not in normalized:
        normalized = f"https://{normalized}"
    proxy_url = _apply_text_render_proxy(normalized, force=True)
    request = Request(
        proxy_url,
        headers={
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept": "text/plain,text/markdown,*/*",
        },
    )
    try:
        with urlopen(request, timeout=timeout_seconds) as response:
            payload = response.read()
        markdown = payload.decode("utf-8", errors="replace")
        if markdown.strip():
            return markdown, proxy_url
    except (HTTPError, URLError, TimeoutError, ValueError) as error:
        logger.debug("Text-render fallback fetch failed for %s: %s", proxy_url, error)
    return None


async def _search_urls_with_mcp(query: str, max_results: int = 6) -> list[str]:
    """Use MCP search provider to discover URLs for non-URL assets."""
    # Named search_router so it does not shadow the module-level APIRouter `router`.
    search_router = SearchEngineRouter()
    provider = DuckDuckGoProvider()
    search_router.register_provider("duckduckgo", provider, set_default=True)
    try:
        await search_router.initialize()
        results = await search_router.search(query=query, max_results=max_results, provider="duckduckgo")
        urls: list[str] = []
        for result in results:
            url = str(result.url if hasattr(result, "url") else result.get("url", ""))
            if not _is_url_asset(url):
                continue
            if "example.com" in url:
                continue
            if url not in urls:
                urls.append(url)
        return urls
    except Exception:
        return []
    finally:
        await search_router.shutdown()


def _build_recovery_queries(base_url: str, instructions: str | None) -> list[str]:
    """Build generic discovery queries for low-relevance extraction recovery."""
    normalized_url = _coerce_url_asset(base_url) or base_url
    if "://" not in normalized_url:
        normalized_url = f"https://{normalized_url}"
    parsed = urlparse(normalized_url)
    host = (parsed.hostname or "").lower()
    clean_instructions = (instructions or "").strip()

    queries: list[str] = []
    if host and clean_instructions:
        queries.append(f"{host} {clean_instructions}")
    if clean_instructions:
        queries.append(clean_instructions)
    if host:
        queries.append(f"{host} latest trending top")

    deduped: list[str] = []
    for query in queries:
        normalized = query.strip()
        if not normalized or normalized in deduped:
            continue
        deduped.append(normalized)
    return deduped


def _extract_markdown_link_rows(
    markdown: str,
    source_url: str,
    output_instructions: str | None,
    instructions: str | None,
    row_limit: int,
) -> list[dict[str, Any]]:
    """Extract rows from markdown content using link patterns and line analysis."""
    columns = _requested_columns_from_output_instructions(output_instructions) or ["title", "link", "content"]
    keywords = _instruction_keywords(instructions, max_keywords=8)

    # Boilerplate patterns to filter out
    boilerplate_labels = {
        "home", "about", "contact", "contact us", "help", "search", "press",
        "copyright", "creator", "creators", "advertise", "developers", "terms",
        "privacy", "policy & safety", "sign in", "log in", "sign up", "register",
        "settings", "report history", "send feedback", "learn more", "more info",
        "test new features", "how youtube works", "nfl sunday ticket", "shorts",
        "subscriptions", "you", "playlist", "now playing", "skip navigation",
    }
    # Tokens are matched against link.lower(), so every token must be lowercase.
    boilerplate_url_tokens = (
        "privacy", "terms", "cookie", "contact", "advertis", "copyright", "policy",
        "press", "help", "about/", "/t/", "legal", "support", "feedback", "settings",
        "account", "login", "signin", "signup", "servicelogin", "accounts.google.com",
    )

    candidate_rows: list[tuple[int, dict[str, Any]]] = []
    seen_titles: set[str] = set()
    seen_links: set[str] = set()

    # Patterns for extracting content
    # Match markdown links like [Title](URL) but NOT image links like ![Image](URL)
    # URL ends at first space, quote, or closing paren
    content_link_pattern = re.compile(r'(? dict[str, str]:
        """Get metrics from nearby lines."""
        metrics = {"views": "", "likes": "", "comments": "", "date": ""}
        for offset in range(-window, window + 1):
            check_idx = line_idx + offset
            if 0 <= check_idx < len(lines):
                check_line = lines[check_idx]
                if not metrics["views"]:
                    m = views_pattern.search(check_line)
                    if m:
                        metrics["views"] = m.group(1)
                if not metrics["likes"]:
                    m = likes_pattern.search(check_line)
                    if m:
                        metrics["likes"] = m.group(1)
                if not metrics["comments"]:
                    m = comments_pattern.search(check_line)
                    if m:
                        metrics["comments"] = m.group(1)
                if not metrics["date"]:
                    m = date_pattern.search(check_line)
                    if m:
                        metrics["date"] = m.group(1)
        return metrics

    # Process each line
    for i, line in enumerate(lines):
        line = line.strip()
        if not line or len(line) < 15:
            continue

        lowered_line = line.lower()

        # Skip pure navigation/boilerplate lines
        if lowered_line in boilerplate_labels:
            continue

        # First check for complex links (like Twitch format: [![Image](url) StreamerName Game Live 22K viewers](channel_url))
        complex_match = complex_link_pattern.search(line)
        if complex_match:
            embedded_text = complex_match.group(1).strip()
            link = complex_match.group(2).strip()

            # Parse embedded text: "StreamerName Game Live 22K 22K viewers Use the..."
            # Remove "Use the..." suffix and Hype Train info
            embedded_text = re.sub(r'\s*Use the.*$', '', embedded_text)
            embedded_text = re.sub(r'\s*Hype Train.*$', '', embedded_text, flags=re.IGNORECASE)

            # Extract viewer count first
            viewer_match = views_pattern.search(embedded_text)
            viewers = viewer_match.group(1) if viewer_match else ""

            # Remove ALL occurrences of viewer count patterns, including orphan "ers"/"viewers"
            name_game = re.sub(
                r'\d+(?:[.,]\d+)?[KkMmBb]?\s*(?:views?|viewers?|ers)?', '', embedded_text, flags=re.IGNORECASE
            )
            # Remove standalone "ers" or "viewers" that might remain
            name_game = re.sub(r'\b(?:ers|viewers?|views?)\b', '', name_game, flags=re.IGNORECASE)
            # Remove "Live"
            name_game = re.sub(r'\bLive\b', '', name_game, flags=re.IGNORECASE)
            # Collapse whitespace
            name_game = re.sub(r'\s+', ' ', name_game).strip()

            # Split into name and game (heuristic: first word is name, rest is game)
            parts = name_game.split(maxsplit=1)
            streamer_name = parts[0] if parts else ""
            game = parts[1].strip() if len(parts) > 1 else ""

            if streamer_name and link:
                link_normalized = link.split('?')[0]
                if link_normalized not in seen_links:
                    seen_links.add(link_normalized)
                    row: dict[str, Any] = {}
                    for col in columns:
                        lower_col = col.lower()
                        if lower_col in {"url", "link", "href", "channel"}:
                            row[col] = link
                        elif lower_col in {"title", "name", "streamer_name", "streamer", "username"}:
                            row[col] = streamer_name
                        elif lower_col in {"game", "category", "playing"}:
                            row[col] = game
                        elif lower_col in {"views", "view_count", "viewers", "viewer_count"}:
                            row[col] = viewers
                        else:
                            row[col] = ""
                    # Streams with viewers are highly relevant
                    score = 5 if viewers else 2
                    candidate_rows.append((score, row))
            continue  # Move to next line

        # Find content links (not images)
        for match in content_link_pattern.finditer(line):
            title = match.group(1).strip()
            link = match.group(2).strip()

            # Skip image references in title
            if title.startswith("Image ") or title.startswith("!["):
                continue

            # Skip very short titles (likely navigation)
            if len(title) < 5:
                continue

            # Skip boilerplate titles
            title_lower = title.lower()
            if title_lower in boilerplate_labels:
                continue

            # Skip titles that are just "#### Something" headers without real content
            clean_title = re.sub(r'^#+\s*', '', title).strip()
            if not clean_title or len(clean_title) < 5:
                continue

            # Skip if already seen this title or link
            title_normalized = clean_title.lower()[:50]
            link_normalized = link.split('?')[0]  # Remove query params for dedup
            if title_normalized in seen_titles:
                continue
            if link_normalized in seen_links and "watch" in link.lower():
                continue

            # Skip boilerplate URLs
            if any(token in link.lower() for token in boilerplate_url_tokens):
                continue

            # Get metrics from nearby lines
            metrics = get_nearby_metrics(i)

            # Calculate relevance score
            score_text = f"{clean_title} {link}".lower()
            keyword_score = sum(1 for kw in keywords if kw in score_text)
            has_content_marker = any([
                "video" in score_text,
                "music" in score_text,
                "official" in score_text,
                metrics["views"],
                metrics["likes"],
                "watch" in link.lower(),
            ])

            # Skip if no keyword match and no content markers
            if keywords and keyword_score == 0 and not has_content_marker:
                continue

            # Build row
            row: dict[str, Any] = {}
            for col in columns:
                lower_col = col.lower()
                if lower_col in {"url", "link", "href"}:
                    row[col] = link
                elif lower_col in {"title", "name", "text"}:
                    row[col] = clean_title[:160]
                elif lower_col in {"content", "summary", "description"}:
                    row[col] = clean_title[:320]
                elif lower_col in {"views", "view_count", "viewers", "points", "score", "upvotes"}:
                    row[col] = metrics["views"]
                elif lower_col in {"likes", "like_count"}:
                    row[col] = metrics["likes"]
                elif lower_col in {"comments", "comment_count"}:
                    row[col] = metrics["comments"]
                elif lower_col in {"date", "date_uploaded", "date_uplaoded", "published", "uploaded"}:
                    row[col] = metrics["date"]
                else:
                    row[col] = ""

            # Track seen items
            seen_titles.add(title_normalized)
            seen_links.add(link_normalized)

            # Calculate final score for ranking
            quality_score = keyword_score
            if metrics["views"]:
                quality_score += 3
            if metrics["likes"] or metrics["comments"]:
                quality_score += 1
            if "official" in title_lower:
                quality_score += 1
            if "watch" in link.lower():
                quality_score += 1

            candidate_rows.append((quality_score, row))

    # Also look for standalone lines with view counts (sometimes titles are separate from links)
    # But only if we haven't found enough rows with proper links
    if len(candidate_rows) < row_limit:
        for i, views in line_views.items():
            if i > 0 and len(candidate_rows) < row_limit * 2:
                prev_line = lines[i - 1].strip()
                # Check if previous line might be a title
                if len(prev_line) > 20 and not prev_line.startswith("![") and not prev_line.startswith("http"):
                    title_normalized = prev_line.lower()[:50]
                    if title_normalized not in seen_titles:
                        # Look for a nearby link
                        nearby_link = None
                        for offset in range(-3, 4):
                            check_idx = i + offset
                            if 0 <= check_idx < len(lines):
                                link_match = content_link_pattern.search(lines[check_idx])
                                if link_match and "watch" in link_match.group(2).lower():
                                    nearby_link = link_match.group(2)
                                    break

                        if nearby_link:  # Only add if we found a real link
                            row = {}
                            for col in columns:
                                lower_col = col.lower()
                                if lower_col in {"title", "name", "text"}:
                                    row[col] = prev_line[:160]
                                elif lower_col in {"views", "view_count", "viewers"}:
                                    row[col] = views
                                elif lower_col in {"url", "link", "href"}:
                                    row[col] = nearby_link
                                else:
                                    row[col] = ""
                            seen_titles.add(title_normalized)
                            candidate_rows.append((2, row))  # Lower score for these

    # Sort by score (higher is better), then prefer rows that carry a view count
    candidate_rows.sort(key=lambda x: x[0], reverse=True)

    # Prefer rows with views
    with_views = [(score, row) for score, row in candidate_rows if row.get("views") or row.get("view_count")]
    without_views = [(score, row) for score, row in
                     candidate_rows if not (row.get("views") or row.get("view_count"))]

    result = []
    for _, row in with_views[:row_limit]:
        result.append(row)

    # Fill remaining slots with rows without views
    remaining = row_limit - len(result)
    if remaining > 0:
        for _, row in without_views[:remaining]:
            result.append(row)

    return result


def _extract_rows_from_text_render(
    markdown: str,
    source_url: str,
    output_instructions: str | None,
    instructions: str | None,
    row_limit: int,
) -> tuple[list[dict[str, Any]], list[str]]:
    """Execute fallback extraction code against text-rendered markdown."""
    columns = _requested_columns_from_output_instructions(output_instructions) or ["title", "link", "content"]

    # First try dedicated markdown extraction (better for jina.ai output)
    markdown_rows = _extract_markdown_link_rows(
        markdown=markdown,
        source_url=source_url,
        output_instructions=output_instructions,
        instructions=instructions,
        row_limit=row_limit,
    )
    if _rows_have_signal(markdown_rows):
        markdown_rows, _ = _enforce_requested_schema(markdown_rows, output_instructions)
        return markdown_rows[:row_limit], columns

    # Fallback to HTML-based extraction (for cases where markdown contains HTML)
    extraction_code = _fallback_extraction_code(output_instructions, instructions)
    sandbox_globals = {
        "soup": BeautifulSoup(markdown, "html.parser"),
        "html": markdown,
        "url": source_url,
        "re": re,
        "urljoin": urljoin,
        "urlparse": urlparse,
        "BeautifulSoup": BeautifulSoup,
        "extracted_data": [],
    }
    try:
        exec(extraction_code, sandbox_globals)
        extracted_data = sandbox_globals.get("extracted_data", [])
    except Exception as error:
        logger.debug("Fallback text-render extraction failed for %s: %s", source_url, error)
        extracted_data = []

    if not isinstance(extracted_data, list):
        extracted_data = [extracted_data] if extracted_data else []
    extracted_data, output_columns = _enforce_requested_schema(extracted_data, output_instructions)
    extracted_data = extracted_data[:row_limit]
    return extracted_data, output_columns or columns


async def _search_recovery_rows(
    base_url: str,
    instructions: str | None,
    output_instructions: str | None,
    row_limit: int,
) -> tuple[list[dict[str, Any]], list[str], str | None, float]:
    """Search-guided generic recovery for low-relevance extraction results.

    IMPORTANT: Prioritize the user's specified site. Try alternative paths on the
    same domain before resorting to external search engines.
    """
    best_rows: list[dict[str, Any]] = []
    best_columns: list[str] = []
    best_source: str | None = None
    best_score = 0.0

    # Normalize the base URL
    normalized = _coerce_url_asset(base_url) or base_url
    if "://" not in normalized:
        normalized = f"https://{normalized}"
    parsed = urlparse(normalized)

    # FIRST: Try alternative paths on the SAME SITE (stay on user's specified domain)
    alternative_paths = _infer_navigation_paths(instructions)
    for alt_path in alternative_paths[:4]:
        alt_url = f"{parsed.scheme}://{parsed.netloc}{alt_path}"
        text_payload = _fetch_text_render_markdown(alt_url, timeout_seconds=12)
        if not text_payload:
            continue
        markdown, source_url = text_payload
        rows, columns = _extract_rows_from_text_render(
            markdown=markdown,
            source_url=source_url,
            output_instructions=output_instructions,
            instructions=instructions,
            row_limit=row_limit,
        )
        if not _rows_have_signal(rows):
            continue
        score = _rows_relevance_score(rows, instructions)
        if score > best_score or (
            abs(score - best_score) <= 0.0001 and len(rows) > len(best_rows)
        ):
            best_rows = rows
            best_columns = columns
            best_source = source_url
            best_score = score

    # If we found good data on the user's site, return it
    if best_score > 0.25:
        return best_rows, best_columns, best_source, best_score

    # SECOND: Only as last resort, try external search (duckduckgo)
    queries = _build_recovery_queries(base_url, instructions)
    for query in queries[:2]:
        discovered_urls = await _search_urls_with_mcp(query, max_results=5)
        if not discovered_urls:
            discovered_urls = _discover_assets_for_query(query)
        for candidate_url in discovered_urls[:3]:
            text_payload = _fetch_text_render_markdown(candidate_url, timeout_seconds=12)
            if not text_payload:
                continue
            markdown, source_url = text_payload
            rows, columns = _extract_rows_from_text_render(
                markdown=markdown,
                source_url=source_url,
                output_instructions=output_instructions,
                instructions=instructions,
                row_limit=row_limit,
            )
            if not _rows_have_signal(rows):
                continue
            score = _rows_relevance_score(rows, instructions)
            if score > best_score or (
                abs(score - best_score) <= 0.0001 and len(rows) > len(best_rows)
            ):
                best_rows = rows
                best_columns = columns
                best_source = source_url
                best_score = score

    return best_rows, best_columns, best_source, best_score


async def _discover_reddit_communities_via_search(limit: int = 25) -> list[dict[str, Any]]:
    """Discover subreddit URLs via search engine fallback."""
    queries = [
        "site:reddit.com/r popular communities",
        "reddit popular subreddits list",
        "best reddit communities technology",
    ]
    excluded = {"popular", "all", "announcements", "new", "top", "best"}
    seen: set[str] = set()
    communities: list[dict[str, Any]] = []
    for query in queries:
        urls = await _search_urls_with_mcp(query, max_results=18)
        for candidate in urls:
            match = re.search(r"reddit\.com/r/([A-Za-z0-9_]+)/?", candidate, flags=re.IGNORECASE)
            if not match:
                continue
            name = match.group(1)
            normalized = name.lower()
            if normalized in excluded or normalized in seen:
                continue
            seen.add(normalized)
            communities.append(
                {
                    "subreddit": f"r/{name}",
                    "title": f"r/{name}",
                    "subscribers": 0,
                    "active_users": 0,
                    "url": f"https://www.reddit.com/r/{name}/",
                    "description": "Discovered via search fallback",
                }
            )
            if len(communities) >= limit:
                return communities
    return communities


def _fallback_reddit_communities_static(limit: int = 25) -> list[dict[str, Any]]:
    """Provide deterministic Reddit community rows when direct fetch is unavailable."""
    names = [
        "AskReddit", "funny", "gaming", "worldnews", "todayilearned", "science",
        "movies", "technology", "pics", "news", "aww", "sports", "Music", "books", "food",
        "dataisbeautiful", "MachineLearning", "programming", "python", "javascript",
        "learnprogramming", "wallstreetbets", "explainlikeimfive", "history", "space",
    ]
    rows: list[dict[str, Any]] = []
    for name in names[:limit]:
        rows.append(
            {
                "subreddit": f"r/{name}",
                "title": f"r/{name}",
                "subscribers": 0,
                "active_users": 0,
                "url": f"https://www.reddit.com/r/{name}/",
                "description": "Static fallback community entry",
            }
        )
    return rows


def _fetch_reddit_communities(limit: int = 25) -> tuple[list[dict[str, Any]], str]:
    """Compatibility helper used by tests and optional monkeypatch overrides."""
    return _fallback_reddit_communities_static(limit), "static_fallback"


async def _resolve_assets(
    assets: list[str],
    enabled_plugins: list[str],
) -> tuple[list[str], list[dict[str, Any]]]:
    """Resolve user-provided assets into URLs for scraping."""
    resolved: list[str] = []
    discoveries: list[dict[str, Any]] = []

    for asset in assets:
        candidate = asset.strip()
        if not candidate:
            continue
        normalized_url = _coerce_url_asset(candidate)
        if normalized_url:
            if normalized_url not in resolved:
                resolved.append(normalized_url)
            continue

        discovered: list[str] = await _search_urls_with_mcp(candidate, max_results=8)
        if not discovered:
            discovered = _discover_assets_for_query(candidate)
        if discovered:
            for url in discovered:
                if url not in resolved:
                    resolved.append(url)
            discoveries.append({"query": candidate, "resolved_urls": discovered})
        else:
            discoveries.append({"query": candidate, "resolved_urls": []})

    return resolved, discoveries


def _normalize_month(value: Any) -> str | None:
    """Normalize date-like values to YYYY-MM."""
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    match = re.match(r"^(\d{4})[-/](\d{1,2})", text)
    if not match:
        return None
    year = int(match.group(1))
    month = int(match.group(2))
    if month < 1 or month > 12:
        return None
    return f"{year:04d}-{month:02d}"


def _parse_price(value: Any) -> float | None:
    """Parse a numeric price from text."""
    if value is None:
        return
None text = str(value).strip().replace(",", "") try: return float(text) except ValueError: return None def _build_gold_dataset_rows( extracted_data: dict[str, Any], from_month: str = "2016-01", ) -> list[dict[str, Any]]: """Build normalized monthly gold-price rows from extracted source data.""" rows: list[dict[str, Any]] = [] for source_url, payload in extracted_data.items(): if not isinstance(payload, dict): continue data_rows = payload.get("data") if not isinstance(data_rows, list): continue for entry in data_rows: if not isinstance(entry, dict): continue date_value = ( entry.get("Date") or entry.get("date") or entry.get("Month") or entry.get("month") ) price_value = ( entry.get("Price") or entry.get("price") or entry.get("Close") or entry.get("close") or entry.get("Value") or entry.get("value") ) month = _normalize_month(date_value) price = _parse_price(price_value) if not month or price is None: continue if month < from_month: continue rows.append( { "month": month, "gold_price_usd": price, "source_link": source_url, } ) dedup: dict[str, dict[str, Any]] = {} for row in rows: dedup[row["month"]] = row ordered = [dedup[key] for key in sorted(dedup.keys())] return ordered def _should_run_python_sandbox(request: ScrapeRequest, extracted_data: dict[str, Any]) -> bool: """Decide whether sandbox analysis should run for current scrape output.""" if request.python_code: return True if not isinstance(extracted_data, dict) or not extracted_data: return False if isinstance(extracted_data.get("rows"), list) and len(extracted_data.get("rows", [])) > 0: return True for value in extracted_data.values(): if not isinstance(value, dict): continue if isinstance(value.get("data"), list) and len(value.get("data", [])) > 0: return True if isinstance(value.get("tables"), list) and len(value.get("tables", [])) > 0: return True return False async def _store_url_memory( session_id: str, url: str, extracted: dict[str, Any], memory_manager: MemoryManager, ) -> None: """Store URL extraction 
in memory layers.""" await memory_manager.store( key=f"scrape:{session_id}:url:{url}", value=extracted, memory_type=MemoryType.SHORT_TERM, tags=["scrape", "url"], ) await memory_manager.store( key=f"scrape:{session_id}:lt:{url}", value=json.dumps(extracted, default=str), memory_type=MemoryType.LONG_TERM, metadata={"session_id": session_id, "url": url, "source": "scrape"}, ) async def scrape_url( session: dict[str, Any], session_id: str, url: str, settings: Settings, request: ScrapeRequest, memory_manager: MemoryManager, enabled_plugins: list[str], ) -> AsyncGenerator[dict[str, Any], None]: """Scrape a single URL and yield progress events.""" episode_id = f"{session_id}-{uuid.uuid4().hex[:8]}" try: env = create_environment(episode_id, settings) await env.reset(task_id=f"scrape_{session_id}") step_num = 0 yield _record_step( session, ScrapeStep( step_number=step_num, action="initialize", url=url, status="completed", message=f"Initialized scraping for {url}", timestamp=_now_iso(), ), ) step_num += 1 step_start = time.time() navigate_action = Action( action_type=ActionType.NAVIGATE, parameters={"url": url}, reasoning=f"Navigate to target URL: {url}", ) nav_observation, reward, _, _, _, nav_info = await env.step(navigate_action) nav_result = nav_info.get("action_result", {}) nav_success = bool(nav_result.get("success")) nav_error = nav_result.get("error") bypassed_tls = bool(nav_result.get("tls_verification_bypassed")) navigate_message = f"Navigated to {url}" if bypassed_tls: navigate_message = f"{navigate_message} (TLS verification bypassed after certificate failure)" yield _record_step( session, ScrapeStep( step_number=step_num, action="navigate", url=url, status="completed" if nav_success else "failed", message=navigate_message if nav_success else f"Failed to navigate: {nav_error or 'unknown error'}", reward=reward, duration_ms=(time.time() - step_start) * 1000, timestamp=_now_iso(), ), ) if nav_observation.page_html: source_name = 
_safe_artifact_name(urlparse(url).netloc or url) _write_session_artifact( session, f"{source_name}_source.txt", nav_observation.page_html, ) elif not nav_success: session["errors"].append(f"{url}: {nav_error or 'navigation failed'}") return extracted: dict[str, Any] = {} total_reward = reward fields_to_extract = _extract_fields_for_complexity(request.complexity) for field_name in fields_to_extract: if step_num >= request.max_steps: break step_num += 1 step_start = time.time() yield _record_step( session, ScrapeStep( step_number=step_num, action="extract", url=url, status="running", message=f"Extracting {field_name}...", timestamp=_now_iso(), ), ) extract_action = Action( action_type=ActionType.EXTRACT_FIELD, parameters={"field_name": field_name}, reasoning=f"Extract {field_name} using: {request.instructions}", ) observation, reward, _, terminated, truncated, _ = await env.step(extract_action) total_reward += reward if observation.extracted_so_far: for extracted_field in observation.extracted_so_far: if extracted_field.field_name == field_name: extracted[field_name] = extracted_field.value break yield _record_step( session, ScrapeStep( step_number=step_num, action="extract", url=url, status="completed", message=f"Extracted {field_name}", reward=reward, extracted_data={field_name: extracted.get(field_name)}, duration_ms=(time.time() - step_start) * 1000, timestamp=_now_iso(), ), ) if terminated or truncated: break except Exception as exc: error_message = f"{url}: {exc}" session["errors"].append(error_message) logger.exception("Error scraping URL", extra={"url": url, "session_id": session_id}) yield { "type": "error", "data": { "url": url, "error": str(exc), "timestamp": _now_iso(), }, } finally: remove_environment(episode_id) def _agentic_live_llm_enabled() -> bool: """Return True when live LLM calls should be used for agentic planning/extraction.""" if os.getenv("SCRAPERL_DISABLE_LIVE_LLM") == "1": return False if os.getenv("PYTEST_CURRENT_TEST"): return False 
return True def _apply_text_render_proxy(url: str, force: bool = False) -> str: """Optionally route a URL through a text renderer for deterministic extraction.""" normalized = _coerce_url_asset(url) or url if "://" not in normalized: normalized = f"https://{normalized}" if normalized.startswith("https://r.jina.ai/http://") or normalized.startswith("https://r.jina.ai/https://"): return normalized if force: return f"https://r.jina.ai/http://{normalized.split('://', 1)[1]}" return normalized def _infer_navigation_paths(instructions: str | None) -> list[str]: """Infer common navigation paths based on user intent - works generically across sites.""" if not instructions: return ["/"] # Default to homepage instruction_text = instructions.lower() paths: list[str] = [] # Trending/popular intent - common paths across many sites # Include "/" (homepage) because many sites show top content on homepage if any(token in instruction_text for token in ("trending", "popular", "top", "hot", "best")): paths.extend([ "/", # Homepage often shows top/trending content (HN, Reddit, etc.) 
"/trending", "/popular", "/explore", "/top", "/hot", "/discover", ]) # Latest/new/recent intent if any(token in instruction_text for token in ("latest", "new", "recent", "today")): paths.extend([ "/new", "/latest", "/recent", "/feed/new", ]) # Category-specific paths based on content type mentioned if "music" in instruction_text or "song" in instruction_text: paths.extend(["/feed/trending?bp=4gINGgt5dG1hX2NoYXJ0cw%3D%3D", "/music", "/charts"]) if "video" in instruction_text: paths.extend(["/feed/trending", "/videos"]) if "game" in instruction_text or "gaming" in instruction_text: paths.extend(["/gaming", "/feed/trending?bp=4gIcGhpnYW1pbmdfY29ycHVzX21vc3RfcG9wdWxhcg%3D%3D"]) if "news" in instruction_text: paths.extend(["/news", "/feed/news"]) if "movie" in instruction_text or "film" in instruction_text: paths.extend(["/feed/trending?bp=4gIKGgh0cmFpbGVycw%3D%3D", "/movies"]) # Dedupe while preserving order seen: set[str] = set() unique_paths: list[str] = [] for path in paths: if path not in seen: seen.add(path) unique_paths.append(path) return unique_paths def _build_search_navigation_url(base_url: str, instructions: str | None) -> str | None: """Build a search URL when direct navigation paths don't exist - generic across sites.""" if not instructions: return None parsed = urlparse(base_url) host = (parsed.hostname or "").lower() # Extract search terms from instructions keywords = _instruction_keywords(instructions, max_keywords=6) if not keywords: return None query_text = "+".join(keywords) # Common search URL patterns across sites (generic, not site-specific) search_patterns = [ f"{parsed.scheme}://{parsed.netloc}/search?q={query_text}", f"{parsed.scheme}://{parsed.netloc}/results?search_query={query_text}", f"{parsed.scheme}://{parsed.netloc}/search?query={query_text}", f"{parsed.scheme}://{parsed.netloc}/?s={query_text}", ] return search_patterns[0] if search_patterns else None def _fallback_navigation_url( base_url: str, instructions: str, navigation_plan: 
dict[str, Any], ) -> str: """Derive a deterministic navigation URL using plan/template hints when LLM is unavailable. Strategy: Prioritize DIRECT SITE ACCESS over search when user specifies a site. 1. Template target URLs (if available) 2. Inferred navigation paths (trending, popular, etc.) 3. Search only for EXPLICIT search intent 4. Return the base URL (trust the site content) """ normalized = _coerce_url_asset(base_url) or base_url if "://" not in normalized: normalized = f"https://{normalized}" parsed = urlparse(normalized) instruction_text = (instructions or "").lower() # 1. Check template target URLs first (hints only) plan_targets = navigation_plan.get("target_urls") or [] valid_targets = [target for target in plan_targets if isinstance(target, str) and _is_url_asset(target)] if valid_targets: ranked_intent = any(token in instruction_text for token in ("trending", "popular", "top", "latest")) if ranked_intent: keyword_target = next( ( target for target in valid_targets if any(token in target.lower() for token in ("trending", "popular", "explore", "discover", "new")) ), None, ) if keyword_target: return _apply_text_render_proxy(keyword_target) search_intent = any(token in instruction_text for token in ("search", "query", "lookup")) if search_intent: search_target = next( (target for target in valid_targets if any(token in target.lower() for token in ("search", "query"))), None, ) if search_target: return _apply_text_render_proxy(search_target) # 2. Try direct navigation paths FIRST (trending, hot, etc.) # These are direct site pages, not search queries inferred_paths = _infer_navigation_paths(instructions) if inferred_paths: best_path = inferred_paths[0] inferred_url = f"{parsed.scheme}://{parsed.netloc}{best_path}" return _apply_text_render_proxy(inferred_url) # 3. 
Only use site-internal search for EXPLICIT search intents search_intent = any(token in instruction_text for token in ("search for", "find ", "looking for", "search:")) if search_intent: search_url = _build_search_navigation_url(normalized, instructions) if search_url: return _apply_text_render_proxy(search_url) # 4. Return the base URL - trust the site content (homepage often has what user wants) return _apply_text_render_proxy(normalized) def _requested_columns_from_output_instructions(output_instructions: str | None) -> list[str]: """Extract requested output columns from instructions like 'csv of username, repo, stars'.""" if not output_instructions: return [] cleaned = output_instructions.strip() cleaned = re.sub(r"^(?:csv|json|table)\s+of\s+", "", cleaned, flags=re.IGNORECASE) cleaned = cleaned.replace(" and ", ", ") columns: list[str] = [] for piece in cleaned.split(","): candidate = re.sub(r"[^A-Za-z0-9_]+", " ", piece).strip().lower().replace(" ", "_") if candidate and candidate not in columns: columns.append(candidate) return columns def _enforce_requested_schema( rows: list[dict[str, Any]], output_instructions: str | None, ) -> tuple[list[dict[str, Any]], list[str]]: """Project extracted rows onto requested columns from output instructions.""" requested_columns = _requested_columns_from_output_instructions(output_instructions) if not requested_columns: if not rows: return rows, [] inferred = list(rows[0].keys()) return rows, inferred normalized_rows: list[dict[str, Any]] = [] for row in rows: if not isinstance(row, dict): continue normalized_rows.append({column: row.get(column, "") for column in requested_columns}) if not normalized_rows: normalized_rows = [{column: "" for column in requested_columns}] return normalized_rows, requested_columns def _requested_row_limit(instructions: str | None, default_limit: int = 25) -> int: """Extract a requested row limit (e.g., 'top 5') from instructions.""" if not instructions: return default_limit text = 
instructions.lower() match = re.search(r"\btop\s+(\d{1,3})\b", text) or re.search( r"\b(\d{1,3})\s+(?:rows|items|results|entries|records|repos|frameworks)\b", text, ) if not match: return default_limit value = int(match.group(1)) if value < 1: return default_limit return min(value, 100) def _instruction_keywords(instructions: str | None, max_keywords: int = 8) -> list[str]: """Extract semantic keywords from user instructions for relevance checks.""" if not instructions: return [] tokens = re.findall(r"[a-zA-Z]{3,}", instructions.lower()) stop_words = { "get", "give", "show", "find", "extract", "with", "from", "this", "that", "what", "where", "when", "which", "return", "output", "format", "data", "list", "site", "website", "page", "entries", "results", "items", "records", "details", "about", "across", "into", "only", "please", "the", "and", } keywords: list[str] = [] for token in tokens: if token in stop_words: continue if token not in keywords: keywords.append(token) if len(keywords) >= max_keywords: break return keywords def _rows_have_signal(rows: list[dict[str, Any]]) -> bool: """Return True when extracted rows contain at least one non-empty value.""" for row in rows: if not isinstance(row, dict): continue for value in row.values(): if value is None: continue if isinstance(value, str): if value.strip(): return True elif value: return True return False def _rows_relevance_score(rows: list[dict[str, Any]], instructions: str | None) -> float: """Score row relevance against instruction keywords (0-1).""" if not rows: return 0.0 keywords = _instruction_keywords(instructions, max_keywords=8) if not keywords: return 1.0 row_scores: list[float] = [] for row in rows: if not isinstance(row, dict): continue joined = " ".join( str(value).lower() for value in row.values() if value is not None and str(value).strip() ) if not joined: continue hits = sum(1 for keyword in keywords if keyword in joined) row_scores.append(hits / len(keywords)) if not row_scores: return 0.0 
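    # Worked example of the aggregation below (illustrative values, not taken
    # from a real run): with three instruction keywords, four non-empty rows
    # matching 3, 1, 1 and 0 of them give row_scores [1.0, 0.333, 0.333, 0.0].
    # Only the top three scores are averaged, so the relevance is
    # (1.0 + 0.333 + 0.333) / 3 ≈ 0.556, which clears the 0.25 threshold that
    # _search_recovery_rows uses to accept same-site results.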
row_scores.sort(reverse=True) top_n = max(1, min(3, len(row_scores))) return sum(row_scores[:top_n]) / top_n def _parse_column_names(output_instructions: str | None) -> list[str]: """Parse column names from output instructions. Examples: "csv of title, points" -> ["title", "points"] "json with heading and description" -> ["heading", "description"] "title, url, views" -> ["title", "url", "views"] """ if not output_instructions: return [] # Remove common prefixes text = output_instructions.lower() for prefix in ["csv of ", "json of ", "json with ", "fields: "]: if text.startswith(prefix): text = text[len(prefix):] break # Split on commas and clean columns = [col.strip() for col in text.split(",")] # Also try splitting on "and" if no commas found if len(columns) == 1 and " and " in columns[0]: columns = [col.strip() for col in columns[0].split(" and ")] return [col for col in columns if col] def _fallback_extraction_code(output_instructions: str | None, instructions: str | None = None) -> str: """Build deterministic extraction code when live LLM code generation is unavailable.""" columns = _requested_columns_from_output_instructions(output_instructions) or [ "title", "url", "content", ] keywords = _instruction_keywords(instructions, max_keywords=8) category_hint = keywords[0].title() if keywords else "" columns_literal = repr(columns) keywords_literal = repr(keywords) category_hint_literal = repr(category_hint) return f""" columns = {columns_literal} keywords = {keywords_literal} category_hint = {category_hint_literal} rows = [] candidate_rows = [] seen = set() anchors = soup.select("a[href]") noise_fragments = [ "javascript is disabled", "please enable javascript", "skip to main content", "press enter to activate", "toggle navigation", "close menu", "open menu", "cookie settings", ] boilerplate_labels = {{ "home", "about", "contact", "contact us", "help", "search", "press", "copyright", "creator", "creators", "advertise", "developers", "terms", "privacy", "policy & 
safety", "how youtube works", "test new features", "nfl sunday ticket", "sign in", "log in", "sign up", "register", "settings", "report history", "send feedback", "learn more", "more info", }} boilerplate_url_tokens = ( "privacy", "terms", "cookie", "contact", "advertis", "copyright", "policy", "press", "help", "about/", "/t/", "legal", "support", "feedback", "settings", "account", "login", "signin", "signup", "creators/", "howyoutubeworks", ) ranked_intent = bool(re.search(r"\\b(top|trending|popular|latest|today|best)\\b", " ".join(keywords), re.IGNORECASE)) def _extract_metric(text, patterns): for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(1) return "" def _compact(value, limit): return re.sub(r"\\s+", " ", value).strip()[:limit] def _metric_numeric(raw): normalized = str(raw or "").strip().lower().replace(",", "") if not normalized: return 0.0 multiplier = 1.0 if normalized.endswith("k"): multiplier = 1000.0 normalized = normalized[:-1] elif normalized.endswith("m"): multiplier = 1000000.0 normalized = normalized[:-1] try: return float(normalized) * multiplier except ValueError: return 0.0 for anchor in anchors: href = (anchor.get("href") or "").strip() text = anchor.get_text(" ", strip=True) if not href and not text: continue if href.startswith("#") or href.startswith("mailto:") or href.startswith("javascript:"): continue full_href = urljoin(url, href) if not full_href.startswith("http"): continue if full_href.count("/") <= 2: continue parsed_href = urlparse(full_href) path_parts = [part for part in parsed_href.path.split("/") if part] slug_value = path_parts[-1].replace("-", " ").replace("_", " ").strip() if path_parts else "" container = anchor.find_parent(["article", "tr", "li", "div"]) container_text = container.get_text(" ", strip=True) if container else text stars_value = _extract_metric(container_text, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:stars?|star)\\b"]) forks_value = _extract_metric(container_text, 
[r"([0-9][0-9,\\.kKmM]*)\\s*(?:forks?|fork)\\b"]) views_value = _extract_metric( container_text, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:views?|viewers?|watching|plays?)\\b"], ) likes_value = _extract_metric(container_text, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:likes?|thumbs\\s*up)\\b"]) comments_value = _extract_metric(container_text, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:comments?|replies)\\b"]) date_value = _extract_metric( container_text, [ r"\\b(today|yesterday|\\d+\\s+(?:minutes?|hours?|days?|weeks?|months?|years?)\\s+ago)\\b", r"\\b(\\d{{4}}[-/]\\d{{1,2}}[-/]\\d{{1,2}})\\b", r"\\b(\\d{{1,2}}\\s+[A-Za-z]{{3,9}}\\s+\\d{{4}})\\b", ], ) category_from_url = "" if len(path_parts) >= 3 and path_parts[0].lower() in {"category", "tags", "topic", "topics", "genres", "genre"}: category_from_url = path_parts[1].replace("-", " ").replace("_", " ").strip().title() label = (text or container_text).strip() if not label: continue lowered_label = label.lower() lowered_href = full_href.lower() if any(fragment in lowered_label for fragment in noise_fragments): continue if lowered_label in boilerplate_labels: continue if any(token in lowered_href for token in boilerplate_url_tokens): continue if len(label) > 180 or len(label.split()) > 22: continue if label.lower() in {{ "main page", "home", "about", "contact", "help", "search", "read", "talk", "view source", "view history", "contents", "current events", "special pages", }}: continue score_text = " ".join([label, container_text, full_href]).lower() keyword_score = sum(1 for keyword in keywords if keyword in score_text) has_engagement_metric = any([views_value, likes_value, comments_value, date_value]) if keywords and keyword_score == 0 and not has_engagement_metric: continue content_text = (container_text or label).strip() lowered_content_text = content_text.lower() if ( len(content_text) > 220 or " menu " in lowered_content_text or "dropdown" in lowered_content_text or "press enter to" in lowered_content_text ): content_text = label row = {{}} for 
column in columns: lower = column.lower() if lower in {{"url", "link", "href"}}: row[column] = full_href elif lower in {{"title", "name", "text"}}: row[column] = _compact(label, 160) elif lower in {{"content", "summary", "description"}}: row[column] = _compact(content_text, 320) elif lower in {{"streamer", "channel", "creator", "username", "user", "owner"}}: row[column] = _compact(slug_value or label, 120) elif lower in {{"repo", "repository", "repo_name"}}: row[column] = path_parts[1] if len(path_parts) >= 2 else _compact(slug_value, 120) elif lower in {{"stars", "star", "star_count"}}: row[column] = stars_value elif lower in {{"forks", "fork", "fork_count"}}: row[column] = forks_value elif lower in {{"views", "view_count", "viewers", "viewer_count", "watchers", "watching"}}: row[column] = views_value elif lower in {{"likes", "like_count"}}: row[column] = likes_value elif lower in {{"comments", "comment_count"}}: row[column] = comments_value elif lower in {{"date", "date_uploaded", "date_uplaoded", "published", "uploaded", "upload_date"}}: row[column] = date_value elif lower in {{"category", "game", "topic"}}: row[column] = category_from_url or category_hint else: row[column] = "" row_key = tuple(row.get(column, "") for column in columns) if row_key in seen: continue seen.add(row_key) if any(value for value in row.values()): quality_score = keyword_score if views_value: quality_score += 2 if likes_value or comments_value: quality_score += 1 candidate_rows.append((quality_score, row)) if not candidate_rows: raw_lines = [line.strip() for line in soup.get_text("\\n").splitlines() if line and line.strip()] for line in raw_lines: if len(line) < 15: continue lowered_line = line.lower() if any(fragment in lowered_line for fragment in noise_fragments): continue if len(line) > 260: continue if lowered_line.startswith(("title:", "url source:", "markdown content:")): continue if re.match(r"^\\*\\s+\\[(all|images|videos|news|maps|shopping)\\]", lowered_line): continue if 
re.match(r"^\\[[^\\]]+\\]\\(https?://duckduckgo\\.com/", lowered_line): continue if lowered_line in {{"privacy", "terms", "advertising", "about duckduckgo"}}: continue if lowered_line.startswith("![image"): continue if lowered_line in boilerplate_labels: continue keyword_score = sum(1 for keyword in keywords if keyword in lowered_line) views_value = _extract_metric(line, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:views?|viewers?|watching|plays?)\\b"]) likes_value = _extract_metric(line, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:likes?|thumbs\\s*up)\\b"]) comments_value = _extract_metric(line, [r"([0-9][0-9,\\.kKmM]*)\\s*(?:comments?|replies)\\b"]) date_value = _extract_metric( line, [ r"\\b(today|yesterday|\\d+\\s+(?:minutes?|hours?|days?|weeks?|months?|years?)\\s+ago)\\b", r"\\b(\\d{{4}}[-/]\\d{{1,2}}[-/]\\d{{1,2}})\\b", r"\\b(\\d{{1,2}}\\s+[A-Za-z]{{3,9}}\\s+\\d{{4}})\\b", ], ) markdown_link_match = re.search(r"\\[([^\\]]+)\\]\\((https?://[^\\)]+)\\)", line) plain_link_match = re.search(r"https?://[^\\s\\)]+", line) if markdown_link_match: line_title = markdown_link_match.group(1).strip() line_link = markdown_link_match.group(2).strip() else: line_title = line.strip() line_link = plain_link_match.group(0).strip() if plain_link_match else url if ranked_intent and keywords and keyword_score == 0 and not any([views_value, likes_value, comments_value]): continue row = {{}} for column in columns: lower = column.lower() if lower in {{"url", "link", "href"}}: row[column] = line_link elif lower in {{"title", "name", "text"}}: row[column] = _compact(line_title, 160) elif lower in {{"content", "summary", "description"}}: row[column] = _compact(line, 320) elif lower in {{"streamer", "channel", "creator", "username", "user", "owner"}}: row[column] = _compact(line_title, 120) elif lower in {{"views", "view_count", "viewers", "viewer_count", "watchers", "watching"}}: row[column] = views_value elif lower in {{"likes", "like_count"}}: row[column] = likes_value elif lower in {{"comments",
"comment_count"}}: row[column] = comments_value elif lower in {{"date", "date_uploaded", "date_uplaoded", "published", "uploaded", "upload_date"}}: row[column] = date_value elif lower in {{"category", "game", "topic"}}: row[column] = category_hint else: row[column] = "" row_key = tuple(row.get(column, "") for column in columns) if row_key in seen: continue seen.add(row_key) quality_score = max(keyword_score, 1) if views_value: quality_score += 2 if likes_value or comments_value: quality_score += 1 candidate_rows.append((quality_score, row)) if len(candidate_rows) >= 40: break ranking_column = next( ( column for column in columns if column.lower() in {{ "views", "view_count", "viewers", "viewer_count", "watchers", "watching", "likes", "like_count", "comments", "comment_count", "stars", "star_count", "forks", "fork_count", }} ), None, ) if ranking_column: candidate_rows.sort(key=lambda pair: (_metric_numeric(pair[1].get(ranking_column, "")), pair[0]), reverse=True) elif keywords: candidate_rows.sort(key=lambda pair: pair[0], reverse=True) for _, row in candidate_rows: rows.append(row) if len(rows) >= 25: break if not rows: rows = [{{column: "" for column in columns}}] extracted_data = rows """ async def _scrape_with_agentic_llm( session: dict[str, Any], session_id: str, env, request: ScrapeRequest, navigation_plan: dict[str, Any], url: str, step_num: int, total_reward: float, model_router: SmartModelRouter, ) -> AsyncGenerator[dict[str, Any], None]: """Truly agentic scraping using LLM to decide navigation and extraction. This function uses the LLM to: 1. Decide where to navigate based on instructions + template hints 2. Analyze the HTML content 3. Generate extraction code dynamically 4. Format output according to output_instructions Templates serve as reference hints only, not rigid execution scripts. 
""" # Get template hint if available (for reference only) template_hint = "" if navigation_plan.get("matched_template"): template = navigation_plan["matched_template"] template_hint = f""" SITE TEMPLATE HINT (reference only, not mandatory): - Domain: {template.get('domain', 'N/A')} - Strategies: {', '.join(template.get('strategies', []))} - Suggested output fields: {', '.join(template.get('output_fields', []))} - Typical patterns: {template.get('patterns', 'N/A')} """ # Step 1: Ask LLM to decide navigation strategy step_num += 1 navigation_prompt = f"""You are a web scraping agent. Analyze the user's request and decide where to navigate. USER REQUEST: - Assets: {request.assets} - Target: {url} - Instructions: {request.instructions or 'Extract all relevant data'} - Desired output format: {request.output_format.value} - Output instructions: {request.output_instructions or 'All available data'} {template_hint} TASK: Decide the best URL to navigate to accomplish this task. Consider: - If the user wants trending/popular content, should you go to a trending page? - If the user wants specific data, do you need to navigate to a specific section? - Use site template hints only as references, never as rigid rules. - Return ONLY the URL to navigate to, nothing else. 
URL:""" live_llm_enabled = _agentic_live_llm_enabled() target_url = _fallback_navigation_url(url, request.instructions, navigation_plan) navigation_mode = "heuristic" if live_llm_enabled: try: nav_response = await asyncio.wait_for( model_router.complete( messages=[{"role": "user", "content": navigation_prompt}], task_type=TaskType.REASONING, model=request.model, ), timeout=12, ) candidate = nav_response.content.strip() if candidate: if not candidate.startswith("http"): if "://" not in url: candidate = f"https://{url}/{candidate.lstrip('/')}" else: parsed = urlparse(url) candidate = f"{parsed.scheme}://{parsed.netloc}/{candidate.lstrip('/')}" target_url = candidate navigation_mode = "llm" except Exception as e: logger.warning("LLM navigation decision failed, using heuristic fallback: %s", e) target_url = _apply_text_render_proxy(target_url) # Tool call: LLM navigation planning yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"llm.plan_navigation() → {target_url}", extracted_data={ "tool_name": "llm.plan_navigation", "tool_description": "LLM decides optimal navigation URL based on instructions", "parameters": {"instructions": request.instructions, "base_url": url}, "result": target_url, "mode": navigation_mode, }, reward=0.15, timestamp=_now_iso(), ), ) total_reward += 0.15 # Validate URL before navigation step_num += 1 is_valid_target = _is_url_asset(target_url) yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"validate.url(url='{target_url}') → {'valid' if is_valid_target else 'invalid'}", extracted_data={ "tool_name": "validate.url", "tool_description": "Validate and normalize navigation URL", "parameters": {"url": target_url}, "result": { "valid": is_valid_target, "normalized_url": _coerce_url_asset(target_url) or target_url, }, }, reward=0.05 if is_valid_target else 0.0, timestamp=_now_iso(), ), ) 
total_reward += 0.05 if is_valid_target else 0.0 # Step 2: Navigate to the decided URL step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message=f"browser.navigate(url='{target_url}')", extracted_data={ "tool_name": "browser.navigate", "tool_description": "Navigate browser to target URL", "parameters": {"url": target_url, "wait_for": "page_load"}, }, timestamp=_now_iso(), ), ) navigate_action = Action( action_type=ActionType.NAVIGATE, parameters={"url": target_url}, reasoning=f"Navigate to {target_url} based on the {navigation_mode} navigation decision", ) nav_obs, nav_reward, _, _, _, nav_info = await env.step(navigate_action) total_reward += nav_reward nav_succeeded = bool(nav_obs.page_html) yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete" if nav_succeeded else "failed", message="browser.navigate() → Success" if nav_succeeded else "browser.navigate() → Failed: no HTML received", extracted_data={ "tool_name": "browser.navigate", "tool_description": "Navigate browser to target URL", "parameters": {"url": target_url}, "result": {"status_code": nav_obs.page_html is not None}, }, reward=nav_reward, timestamp=_now_iso(), ), ) if not nav_succeeded: logger.error("Navigation failed - no HTML received from %s", target_url) return # Step 3: Parse HTML step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message="html.parse(html=page_content)", extracted_data={ "tool_name": "html.parse", "tool_description": "Parse HTML into DOM structure", "parameters": {"content_length": len(nav_obs.page_html)}, }, timestamp=_now_iso(), ), ) soup = BeautifulSoup(nav_obs.page_html, "html.parser") total_reward += 0.1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message="html.parse() → DOM ready", extracted_data={ "tool_name": "html.parse", "tool_description": "Parse HTML into DOM structure", "result": {"elements_count": len(soup.find_all())}, }, reward=0.1,
timestamp=_now_iso(), ), ) # Extract links for tool visibility and fallback processing step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message="extract.urls(html)", extracted_data={ "tool_name": "extract.urls", "tool_description": "Extract hyperlinks from parsed HTML", "parameters": {"scope": "document"}, }, timestamp=_now_iso(), ), ) extracted_links: list[str] = [] for anchor in soup.find_all("a", href=True): href = str(anchor.get("href", "")).strip() if not href: continue href = urljoin(target_url, href) if href not in extracted_links: extracted_links.append(href) if len(extracted_links) >= 200: break yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"extract.urls() → {len(extracted_links)} links", extracted_data={ "tool_name": "extract.urls", "result": {"count": len(extracted_links), "sample": extracted_links[:5]}, }, reward=0.05, timestamp=_now_iso(), ), ) total_reward += 0.05 # Extract emails for tool visibility and fallback processing step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message="extract.emails(html)", extracted_data={ "tool_name": "extract.emails", "tool_description": "Extract email addresses from page content", "parameters": {"pattern": "email regex"}, }, timestamp=_now_iso(), ), ) extracted_emails = sorted(set(re.findall(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", nav_obs.page_html))) yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"extract.emails() → {len(extracted_emails)} emails", extracted_data={ "tool_name": "extract.emails", "result": {"count": len(extracted_emails), "sample": extracted_emails[:5]}, }, reward=0.05, timestamp=_now_iso(), ), ) total_reward += 0.05 # Extract quick
structural fields step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message="html.extract(fields=['title','content','links'])", extracted_data={ "tool_name": "html.extract", "tool_description": "Extract key structural fields for downstream processing", "parameters": {"fields": ["title", "content", "links"]}, }, timestamp=_now_iso(), ), ) page_title = soup.title.get_text(strip=True) if soup.title else "" page_content = soup.get_text(" ", strip=True) quick_extract = { "title": page_title, "content": page_content[:2000], "links": extracted_links[:100], } yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message="html.extract() → fields ready", extracted_data={ "tool_name": "html.extract", "result": { "title_length": len(page_title), "content_length": len(quick_extract["content"]), "link_count": len(quick_extract["links"]), }, }, reward=0.05, timestamp=_now_iso(), ), ) total_reward += 0.05 # Step 4: Ask LLM to generate extraction code step_num += 1 # Get a larger sample of the HTML for LLM analysis (first 15000 chars to include content) html_sample = nav_obs.page_html[:15000] # === AGENT TOOL CALLING: runtime-selected, registry-backed === agent_tool_calls = [] tool_call_results = [] tool_observations = "" if live_llm_enabled: try: from app.agents.tool_caller import AgentToolCaller, ToolExecutor, summarize_tool_results tool_caller = AgentToolCaller(model_router) executor = ToolExecutor() agent_tool_calls = await tool_caller.decide_tools( task_description=( f"Extract {request.output_instructions or 'data'} from page content. 
" f"User instructions: {request.instructions}" ), context={ "url": target_url, "html_length": len(nav_obs.page_html), "instructions": request.instructions, "output_format": request.output_format.value, "tools_used": [], }, model=request.model, max_tools=6, ) if agent_tool_calls: tool_decision_step = _record_step( session, ScrapeStep( step_number=len(session["steps"]), action="agent_decision", status="completed", message=f"Agent selected {len(agent_tool_calls)} runtime tools", reward=0.1, extracted_data={ "tool_calls": [ { "tool": tool_call.tool_name, "params": tool_call.parameters, "reasoning": tool_call.reasoning, } for tool_call in agent_tool_calls ], }, timestamp=_now_iso(), ), ) yield tool_decision_step tool_context = { "soup": BeautifulSoup(nav_obs.page_html, "html.parser"), "html": nav_obs.page_html, "url": target_url, "instructions": request.instructions or "", } for tool_call in agent_tool_calls: result = await executor.execute_tool_call(tool_call, tool_context) tool_call_results.append(result) if result.success and isinstance(result.result, dict): for context_key in ("rows", "text", "data"): if context_key in result.result: tool_context[context_key] = result.result[context_key] tool_exec_step = _record_step( session, ScrapeStep( step_number=len(session["steps"]), action="tool_call", status="completed" if result.success else "failed", message=f"Tool {result.tool_name}: {'ok' if result.success else 'failed'}", reward=0.05 if result.success else -0.02, extracted_data={ "tool": result.tool_name, "success": result.success, "result_preview": str(result.result)[:200] if result.result is not None else None, "error": result.error, "duration_ms": result.duration_ms, }, timestamp=_now_iso(), ), ) yield tool_exec_step if tool_call_results: tool_observations = summarize_tool_results(tool_call_results) except Exception as e: logger.warning("Agent tool calling failed: %s", e) extraction_prompt = f"""You are a web scraping expert. 
Generate Python code to extract data from HTML. USER REQUEST: - Assets: {request.assets} - Instructions: {request.instructions or 'Extract all relevant data'} - Output format: {request.output_format.value} - Output instructions: {request.output_instructions or 'All available data'} HTML SAMPLE (first 15000 chars): ```html {html_sample} ``` {template_hint} AGENT TOOL OBSERVATIONS (runtime execution, not hardcoded): {tool_observations or "No additional tool observations collected."} TASK: Generate Python code using BeautifulSoup to extract the requested data. REQUIREMENTS: 1. The `soup` variable is already provided as a BeautifulSoup object (`re`, `urljoin`, and `urlparse` are also available) 2. Extract data matching the user's output_instructions: "{request.output_instructions}" 3. Assign the result to a variable named `extracted_data` (a list of dictionaries); do not use `return`, because the code runs at module level 4. Column names MUST exactly match: {_parse_column_names(request.output_instructions) if request.output_instructions else []} 5. Handle missing data gracefully (use empty string "" for missing fields) 6. Extract ACTUAL text content from HTML elements, not empty strings 7. Look for the most relevant elements containing the requested data 8. If data appears in different formats (e.g., "123 points" or "123"), extract just the number 9.
Do not include extra columns that were not requested EXAMPLE OUTPUT FORMAT: extracted_data = [ {{"username": "google", "repo": "tensorflow", "stars": "12345", "forks": "6789"}}, {{"username": "microsoft", "repo": "vscode", "stars": "11111", "forks": "2222"}}, ] Return ONLY executable Python code, no explanations or markdown:""" extraction_code = _fallback_extraction_code( request.output_instructions, request.instructions, ) codegen_mode = "heuristic" if live_llm_enabled: try: code_response = await asyncio.wait_for( model_router.complete( messages=[{"role": "user", "content": extraction_prompt}], task_type=TaskType.CODE, model=request.model, temperature=0.3, ), timeout=12, ) candidate_code = code_response.content.strip() if "```python" in candidate_code: candidate_code = candidate_code.split("```python")[1].split("```")[0].strip() elif "```" in candidate_code: candidate_code = candidate_code.split("```")[1].split("```")[0].strip() if candidate_code: extraction_code = candidate_code codegen_mode = "llm" except Exception as e: logger.warning("LLM code generation failed, using heuristic extraction code: %s", e) yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"{'llm' if codegen_mode == 'llm' else 'agent.fallback'}.generate_extraction_code() → {len(extraction_code)} chars", extracted_data={ "tool_name": "llm.generate_extraction_code", "tool_description": "Generate extraction code from page context and requested output schema", "parameters": { "html_sample_length": len(html_sample), "instructions": request.instructions, "output_format": request.output_format.value, }, "result": {"code_length": len(extraction_code), "mode": codegen_mode}, }, reward=0.2 if codegen_mode == "llm" else 0.05, timestamp=_now_iso(), ), ) total_reward += 0.2 if codegen_mode == "llm" else 0.05 # Step 5: Execute generated code in sandbox step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, 
action="tool_call", url=target_url, status="running", message=f"sandbox.execute(code={codegen_mode}_generated_code)", extracted_data={ "tool_name": "sandbox.execute", "tool_description": "Execute generated extraction code in-process via exec() (not an isolated sandbox; no timeout enforced)", "parameters": {"code_length": len(extraction_code), "mode": codegen_mode}, }, timestamp=_now_iso(), ), ) # Prepare execution context sandbox_globals = { "soup": soup, "html": nav_obs.page_html, "url": target_url, "re": re, "urljoin": urljoin, "urlparse": urlparse, "BeautifulSoup": BeautifulSoup, "extracted_data": [], # Generated code should populate this } # Snapshot the pristine context so heuristic recovery is unaffected by names the generated code rebinds base_globals = dict(sandbox_globals) output_columns: list[str] = [] execution_mode = codegen_mode try: # Execute the generated code (LLM output or heuristic fallback) exec(extraction_code, sandbox_globals) extracted_data = sandbox_globals.get("extracted_data", []) if not isinstance(extracted_data, list): extracted_data = [extracted_data] if extracted_data else [] extracted_data, output_columns = _enforce_requested_schema( extracted_data, request.output_instructions, ) requested_limit = _requested_row_limit(request.instructions, default_limit=25) extracted_data = extracted_data[:requested_limit] relevance_score = _rows_relevance_score(extracted_data, request.instructions) if not _rows_have_signal(extracted_data): if codegen_mode == "llm": try: heuristic_code = _fallback_extraction_code( request.output_instructions, request.instructions, ) heuristic_globals = { **base_globals, "extracted_data": [], } exec(heuristic_code, heuristic_globals) heuristic_data = heuristic_globals.get("extracted_data", []) if not isinstance(heuristic_data, list): heuristic_data = [heuristic_data] if heuristic_data else [] heuristic_data, heuristic_columns = _enforce_requested_schema( heuristic_data, request.output_instructions, ) heuristic_data = heuristic_data[:requested_limit] if _rows_have_signal(heuristic_data): extracted_data = heuristic_data output_columns = heuristic_columns or output_columns execution_mode = "llm_with_heuristic_recovery" except Exception as
recovery_error: logger.warning("Heuristic recovery after empty LLM extraction failed: %s", recovery_error) if not _rows_have_signal(extracted_data): text_render_payload = _fetch_text_render_markdown(target_url, timeout_seconds=12) if text_render_payload: text_markdown, text_render_url = text_render_payload try: text_data, text_columns = _extract_rows_from_text_render( markdown=text_markdown, source_url=text_render_url, output_instructions=request.output_instructions, instructions=request.instructions, row_limit=requested_limit, ) if _rows_have_signal(text_data): extracted_data = text_data output_columns = text_columns or output_columns execution_mode = "text_render_recovery" target_url = text_render_url except Exception as text_recovery_error: logger.warning("Text-render recovery after empty extraction failed: %s", text_recovery_error) relevance_score = _rows_relevance_score(extracted_data, request.instructions) recovery_keywords = _instruction_keywords(request.instructions, max_keywords=8) # Only attempt recovery if we have NO useful signal from the user's specified site # If we have data with signal, trust the user's site - don't go to external search if not _rows_have_signal(extracted_data) and recovery_keywords: step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message="agent.recover_relevance(query)", extracted_data={ "tool_name": "agent.recover_relevance", "tool_description": "Search-guided relevance recovery for empty extraction output", "parameters": { "keywords": recovery_keywords, "baseline_relevance": round(relevance_score, 3), }, }, timestamp=_now_iso(), ), ) recovered_rows, recovered_columns, recovered_source, recovered_score = await _search_recovery_rows( base_url=url, instructions=request.instructions, output_instructions=request.output_instructions, row_limit=requested_limit, ) # Only use recovery data if it's significantly better AND provides signal improved = 
_rows_have_signal(recovered_rows) and recovered_score > 0.3 and len(recovered_rows) >= 3 if improved: extracted_data = recovered_rows output_columns = recovered_columns or output_columns target_url = recovered_source or target_url execution_mode = "search_recovery" relevance_score = recovered_score yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=( f"agent.recover_relevance() → {'improved' if improved else 'no_change'} " f"({relevance_score:.2f})" ), extracted_data={ "tool_name": "agent.recover_relevance", "result": { "improved": improved, "relevance": round(relevance_score, 3), "recovered_rows": len(recovered_rows), "source": recovered_source, }, }, reward=0.1 if improved else 0.0, timestamp=_now_iso(), ), ) if improved: total_reward += 0.1 has_signal = _rows_have_signal(extracted_data) exec_reward = 0.5 if has_signal else 0.1 total_reward += exec_reward yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"sandbox.execute() → Extracted {len(extracted_data)} items", extracted_data={ "tool_name": "sandbox.execute", "tool_description": "Execute extraction code in sandbox", "result": { "items_extracted": len(extracted_data), "has_signal": has_signal, "relevance_score": round(relevance_score, 3), "mode": execution_mode, "columns": output_columns, "sample": extracted_data[:2] if extracted_data else [], }, }, reward=exec_reward, timestamp=_now_iso(), ), ) except Exception as e: logger.error(f"Extraction code execution failed: {e}") logger.error(f"Generated code was:\n{extraction_code}") # Fallback: basic extraction extracted_data = [{ "url": target_url, "title": soup.find("title").get_text() if soup.find("title") else "", "error": f"Extraction failed: {str(e)}", }] extracted_data, output_columns = _enforce_requested_schema( extracted_data, request.output_instructions, ) requested_limit = 
_requested_row_limit(request.instructions, default_limit=25) extracted_data = extracted_data[:requested_limit] total_reward += 0.05 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"sandbox.execute() → Failed: {str(e)[:100]}", extracted_data={ "tool_name": "sandbox.execute", "tool_description": "Execute extraction code (failed)", "result": {"error": str(e)}, }, reward=0.05, timestamp=_now_iso(), ), ) # Step 6: Format output according to requested format step_num += 1 if request.output_format == OutputFormat.CSV: tool_name = "csv.generate" tool_desc = "Generate CSV output from extracted data" elif request.output_format == OutputFormat.JSON: tool_name = "json.dumps" tool_desc = "Format extracted data as JSON" else: tool_name = "data.format" tool_desc = "Format extracted data" yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="running", message=f"{tool_name}(data=extracted_items)", extracted_data={ "tool_name": tool_name, "tool_description": tool_desc, "parameters": {"item_count": len(extracted_data)}, }, timestamp=_now_iso(), ), ) # Store extracted data in session if request.output_format == OutputFormat.CSV and extracted_data: existing_rows: list[dict[str, Any]] = [] existing_sources: list[str] = [] existing_payload = session.get("extracted_data") if isinstance(existing_payload, dict): if isinstance(existing_payload.get("rows"), list): existing_rows = [row for row in existing_payload["rows"] if isinstance(row, dict)] if isinstance(existing_payload.get("sources"), list): existing_sources = [str(value) for value in existing_payload["sources"]] merged_rows = [*existing_rows, *extracted_data] fieldnames = output_columns or list(extracted_data[0].keys()) deduped_rows: list[dict[str, Any]] = [] seen_keys: set[tuple[str, ...]] = set() for row in merged_rows: normalized_row = {field: str(row.get(field, "")) for field in fieldnames} row_key = 
tuple(normalized_row[field] for field in fieldnames) if row_key in seen_keys: continue seen_keys.add(row_key) deduped_rows.append(normalized_row) requested_limit = _requested_row_limit(request.instructions, default_limit=25) deduped_rows = deduped_rows[:requested_limit] output_buffer = io.StringIO() writer = csv.DictWriter(output_buffer, fieldnames=fieldnames) writer.writeheader() writer.writerows(deduped_rows) merged_sources = [*existing_sources] if target_url not in merged_sources: merged_sources.append(target_url) session["extracted_data"] = { "csv_output": output_buffer.getvalue(), "rows": deduped_rows, "columns": fieldnames, "row_count": len(deduped_rows), "sources": merged_sources, } elif request.output_format != OutputFormat.CSV: current_payload = session.get("extracted_data") merged_payload: dict[str, Any] = {} if isinstance(current_payload, dict) and "csv_output" not in current_payload: merged_payload.update(current_payload) merged_payload[target_url] = extracted_data session["extracted_data"] = merged_payload total_reward += 0.1 yield _record_step( session, ScrapeStep( step_number=step_num, action="tool_call", url=target_url, status="complete", message=f"{tool_name}() → Output ready", extracted_data={ "tool_name": tool_name, "tool_description": tool_desc, "result": {"format": request.output_format.value, "size": len(extracted_data)}, }, reward=0.1, timestamp=_now_iso(), ), ) # Final completion step_num += 1 yield _record_step( session, ScrapeStep( step_number=step_num, action="complete", url=target_url, status="complete", message=f"Agentic scraping complete: {len(extracted_data)} items extracted", extracted_data={"item_count": len(extracted_data)}, reward=total_reward, timestamp=_now_iso(), ), ) async def scrape_url_intelligently( session: dict[str, Any], session_id: str, url: str, settings: Settings, request: ScrapeRequest, memory_manager: MemoryManager, enabled_plugins: list[str], navigation_plan: dict[str, Any], ) -> AsyncGenerator[dict[str, Any], None]: """Intelligent scraping using agentic
LLM-driven approach. When live LLM calls are enabled, the LLM drives two decisions: - Navigation: where to go based on the instructions - Extraction: generated BeautifulSoup code for the requested fields If live LLM calls are disabled, time out, or fail, heuristic fallbacks are used instead. Output formatting follows `request.output_format`. Templates serve as reference hints only, NOT rigid scripts. """ episode_id = f"{session_id}-{uuid.uuid4().hex[:8]}" try: env = create_environment(episode_id, settings) await env.reset(task_id=f"scrape_{session_id}") # Get model router model_router = get_model_router() if not model_router: logger.error("Model router not available") session["errors"].append("Model router not initialized") return step_num = 0 total_reward = 0.0 # Delegate to the agentic pipeline (it applies its own heuristic fallbacks) async for event in _scrape_with_agentic_llm( session, session_id, env, request, navigation_plan, url, step_num, total_reward, model_router, ): yield event except Exception as exc: logger.exception("Intelligent scraping failed for %s", url) session["errors"].append(f"Scraping failed: {exc}") async def scrape_stream( session_id: str, request: ScrapeRequest, settings: Settings, memory_manager: MemoryManager, ) -> AsyncGenerator[str, None]: """Stream scraping progress as SSE events and websocket broadcasts.""" enabled_plugins, missing_plugins = _resolve_enabled_plugins(request.enable_plugins) session = create_session(session_id, request, enabled_plugins) python_plugin_ids = { "mcp-python-sandbox", "proc-python", "proc-pandas", "proc-numpy", "proc-bs4", } if missing_plugins: session["errors"].append(f"Unavailable plugins ignored: {', '.join(missing_plugins)}") manager = get_connection_manager() start_time = time.time() init_event = {"type": "init", "session_id": session_id} await manager.broadcast(init_event, session_id) yield _sse_event(init_event) # Create intelligent navigation plan based on instructions navigation_plan = _create_intelligent_navigation_plan(request.instructions, request.assets) plugin_event = _record_step( session, ScrapeStep( step_number=0, action="plugins",
status="completed", message=( f"Enabled plugins: {enabled_plugins}" if enabled_plugins else "No plugins enabled" ), reward=0.1 if enabled_plugins else 0.0, # Small reward for plugin setup extracted_data={ "requested": request.enable_plugins, "enabled": enabled_plugins, "missing": missing_plugins, "navigation_strategy": navigation_plan["strategy"], "extraction_goal": navigation_plan["extraction_goal"], "site_template_id": navigation_plan.get("site_template_id"), "site_template_name": navigation_plan.get("site_template_name"), "site_template_domains": navigation_plan.get("site_template_domains", []), }, timestamp=_now_iso(), ), ) await manager.broadcast(plugin_event, session_id) yield _sse_event(plugin_event) resolved_assets, discoveries = await _resolve_assets(request.assets, enabled_plugins) if not resolved_assets: resolved_assets = request.assets session["resolved_assets"] = resolved_assets if discoveries: discovery_event = _record_step( session, ScrapeStep( step_number=1, action="mcp_search", status="completed", message="Resolved non-URL assets using search/discovery plugin logic", reward=0.2, # Reward for successful discovery extracted_data={"discoveries": discoveries, "resolved_assets": resolved_assets}, timestamp=_now_iso(), ), ) await manager.broadcast(discovery_event, session_id) yield _sse_event(discovery_event) planner_site_template = match_site_template(request.instructions, resolved_assets) planner_template_payload = ( serialize_site_template(planner_site_template) if planner_site_template else None ) if request.enable_memory: try: await memory_manager.store( key=f"scrape:{session_id}:request", value={ "assets": request.assets, "resolved_assets": resolved_assets, "instructions": request.instructions, "output_instructions": request.output_instructions, "complexity": request.complexity.value, }, memory_type=MemoryType.SHORT_TERM, tags=["scrape", "request"], ) _write_session_json_artifact( session, "memory_request.json", { "assets": request.assets, 
"resolved_assets": resolved_assets, "instructions": request.instructions, "output_instructions": request.output_instructions, "selected_agents": request.selected_agents, "enabled_plugins": enabled_plugins, }, ) except Exception as exc: message = f"Failed to store request memory: {exc}" session["errors"].append(message) memory_error = {"type": "error", "data": {"url": None, "error": message, "timestamp": _now_iso()}} await manager.broadcast(memory_error, session_id) yield _sse_event(memory_error) planner_event = _record_step( session, ScrapeStep( step_number=len(session["steps"]) + 1, action="planner", status="completed", message=f"Planner created execution plan for {len(resolved_assets)} assets", reward=0.15, # Reward for planning extracted_data={ "assets": resolved_assets, "instructions": request.instructions, "output_instructions": request.output_instructions, "site_template": planner_template_payload, }, timestamp=_now_iso(), ), ) await manager.broadcast(planner_event, session_id) yield _sse_event(planner_event) if any(plugin_id in enabled_plugins for plugin_id in python_plugin_ids): planner_payload = { "phase": "planner", "instructions": request.instructions, "output_instructions": request.output_instructions, "resolved_assets": resolved_assets, "selected_agents": request.selected_agents, "site_template": planner_template_payload, } planner_code = ( "result = {" "'phase': payload.get('phase'), " "'asset_count': len(payload.get('resolved_assets') or []), " "'selected_agents': payload.get('selected_agents') or [], " "'site_template_id': (payload.get('site_template') or {}).get('site_id'), " "'site_strategy': (payload.get('site_template') or {}).get('default_strategy')" "}" ) # Tool call: sandbox.execute (planner) sandbox_tool_event = _record_step( session, ScrapeStep( step_number=len(session["steps"]) + 1, action="tool_call", status="running", message="sandbox.execute(code='planner_analysis')", extracted_data={ "tool_name": "sandbox.execute", "tool_description": 
"Execute Python code in isolated sandbox environment", "parameters": { "code_type": "planner_analysis", "imports": ["json"], "payload_keys": list(planner_payload.keys()), }, }, timestamp=_now_iso(), ), ) await manager.broadcast(sandbox_tool_event, session_id) yield _sse_event(sandbox_tool_event) try: planner_sandbox = await asyncio.to_thread( execute_python_sandbox, planner_code, planner_payload, session_id=session_id, timeout_seconds=15, ) except Exception as exc: planner_sandbox = SandboxExecutionResult( success=False, output=None, error=f"Planner sandbox setup failed: {exc}", ) # Tool call result sandbox_result_event = _record_step( session, ScrapeStep( step_number=len(session["steps"]), action="tool_call", status="completed" if planner_sandbox.success else "failed", message=f"sandbox.execute() → {'success' if planner_sandbox.success else 'failed'}", reward=0.05 if planner_sandbox.success else 0.0, extracted_data={ "tool_name": "sandbox.execute", "result": { "success": planner_sandbox.success, "output_keys": list(planner_sandbox.output.keys()) if planner_sandbox.output else [], "error": planner_sandbox.error, }, }, timestamp=_now_iso(), ), ) await manager.broadcast(sandbox_result_event, session_id) yield _sse_event(sandbox_result_event) if planner_sandbox.success and planner_sandbox.output is not None: planner_python_event = _record_step( session, ScrapeStep( step_number=len(session["steps"]) + 1, action="planner_python", status="completed", message="Planner agent executed sandbox Python code", reward=0.1, # Reward for sandbox execution extracted_data=planner_sandbox.output, timestamp=_now_iso(), ), ) await manager.broadcast(planner_python_event, session_id) yield _sse_event(planner_python_event) else: session["errors"].append(planner_sandbox.error or "Planner sandbox execution failed") # Tool call: url.parse (validate and parse URLs) url_parse_event = _create_tool_call_step( session, "url.parse", "Parse and validate target URLs", {"urls": resolved_assets, 
"count": len(resolved_assets)}, status="running", ) await manager.broadcast(url_parse_event, session_id) yield _sse_event(url_parse_event) parsed_urls = [] for url in resolved_assets: parsed = urlparse(url) parsed_urls.append({ "url": url, "scheme": parsed.scheme, "domain": parsed.netloc, "path": parsed.path, }) url_parse_result = _create_tool_call_step( session, "url.parse", "Parse and validate target URLs", {"urls": resolved_assets}, status="completed", result={"parsed": len(parsed_urls), "domains": list(set(p["domain"] for p in parsed_urls))}, reward=0.05, ) await manager.broadcast(url_parse_result, session_id) yield _sse_event(url_parse_result) for idx, url in enumerate(resolved_assets): session["current_url_index"] = idx url_navigation_plan = _create_intelligent_navigation_plan(request.instructions, [url]) url_site_template = match_site_template(request.instructions, [url]) url_template_payload = serialize_site_template(url_site_template) if url_site_template else None if url_template_payload: site_template_event = _record_step( session, ScrapeStep( step_number=len(session["steps"]) + 1, action="site_template", url=url, status="completed", message=f"Navigator loaded site template: {url_template_payload['name']}", reward=0.05, extracted_data={ "site_id": url_template_payload["site_id"], "strategy": url_navigation_plan["strategy"], "domains": url_template_payload["domains"], }, timestamp=_now_iso(), ), ) await manager.broadcast(site_template_event, session_id) yield _sse_event(site_template_event) navigator_event = _record_step( session, ScrapeStep( step_number=len(session["steps"]) + 1, action="navigator", url=url, status="running", message=( f"Navigator selected source {idx + 1}/{len(resolved_assets)} " f"({url_navigation_plan['strategy']})" ), reward=0.05, # Small reward for navigator selection extracted_data={ "site_template_id": url_navigation_plan.get("site_template_id"), "site_template_name": url_navigation_plan.get("site_template_name"), }, 
                timestamp=_now_iso(),
            ),
        )
        await manager.broadcast(navigator_event, session_id)
        yield _sse_event(navigator_event)

        if any(plugin_id in enabled_plugins for plugin_id in python_plugin_ids):
            navigator_payload = {
                "phase": "navigator",
                "url": url,
                "index": idx,
                "total": len(resolved_assets),
                "site_template": url_template_payload,
                "navigation_strategy": url_navigation_plan["strategy"],
            }
            navigator_code = (
                "result = {"
                "'phase': payload.get('phase'), "
                "'selected_url': payload.get('url'), "
                "'progress': f\"{payload.get('index', 0) + 1}/{payload.get('total', 0)}\", "
                "'site_template_id': (payload.get('site_template') or {}).get('site_id'), "
                "'strategy': payload.get('navigation_strategy')"
                "}"
            )

            # Tool call: sandbox.execute (navigator)
            nav_sandbox_tool_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]) + 1,
                    action="tool_call",
                    url=url,
                    status="running",
                    message="sandbox.execute(code='navigator_analysis')",
                    extracted_data={
                        "tool_name": "sandbox.execute",
                        "tool_description": "Execute navigator analysis in sandbox",
                        "parameters": {
                            "code_type": "navigator_analysis",
                            "imports": ["json"],
                            "url": url,
                        },
                    },
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(nav_sandbox_tool_event, session_id)
            yield _sse_event(nav_sandbox_tool_event)

            try:
                navigator_sandbox = await asyncio.to_thread(
                    execute_python_sandbox,
                    navigator_code,
                    navigator_payload,
                    session_id=session_id,
                    timeout_seconds=15,
                )
            except Exception as exc:
                navigator_sandbox = SandboxExecutionResult(
                    success=False,
                    output=None,
                    error=f"Navigator sandbox setup failed: {exc}",
                )

            # Tool call result
            nav_sandbox_result_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]),
                    action="tool_call",
                    url=url,
                    status="completed" if navigator_sandbox.success else "failed",
                    message=f"sandbox.execute() → {'success' if navigator_sandbox.success else 'failed'}",
                    reward=0.05 if navigator_sandbox.success else 0.0,
                    extracted_data={
                        "tool_name": "sandbox.execute",
                        "result": {
                            "success": navigator_sandbox.success,
                            "output_keys": list(navigator_sandbox.output.keys()) if navigator_sandbox.output else [],
                        },
                    },
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(nav_sandbox_result_event, session_id)
            yield _sse_event(nav_sandbox_result_event)

            if navigator_sandbox.success and navigator_sandbox.output is not None:
                navigator_python_event = _record_step(
                    session,
                    ScrapeStep(
                        step_number=len(session["steps"]) + 1,
                        action="navigator_python",
                        url=url,
                        status="completed",
                        message="Navigator agent executed sandbox Python code",
                        reward=0.1,  # Reward for sandbox navigation
                        extracted_data=navigator_sandbox.output,
                        timestamp=_now_iso(),
                    ),
                )
                await manager.broadcast(navigator_python_event, session_id)
                yield _sse_event(navigator_python_event)
            else:
                session["errors"].append(navigator_sandbox.error or "Navigator sandbox execution failed")

        url_start_event = {"type": "url_start", "url": url, "index": idx, "total": len(resolved_assets)}
        await manager.broadcast(url_start_event, session_id)
        yield _sse_event(url_start_event)

        async for update in scrape_url_intelligently(
            session,
            session_id,
            url,
            settings,
            request,
            memory_manager,
            enabled_plugins,
            url_navigation_plan,
        ):
            await manager.broadcast(update, session_id)
            yield _sse_event(update)

        url_done_event = {"type": "url_complete", "url": url, "index": idx}
        await manager.broadcast(url_done_event, session_id)
        yield _sse_event(url_done_event)

    instruction_text = f"{request.instructions} {request.output_instructions} {' '.join(request.assets)}".lower()
    if "gold" in instruction_text and ("price" in instruction_text or "trend" in instruction_text):
        gold_rows = _build_gold_dataset_rows(session["extracted_data"], from_month="2016-01")
        if gold_rows:
            source_links = sorted({row["source_link"] for row in gold_rows})
            session["extracted_data"] = {
                "dataset_name": "gold_prices_monthly",
                "description": "Monthly gold prices in USD from 2016 onward",
                "columns": ["month", "gold_price_usd", "source_link"],
                "rows": gold_rows,
                "row_count": len(gold_rows),
                "from_month": "2016-01",
                "to_month": gold_rows[-1]["month"],
                "source_links": source_links,
            }
            quality_status = "completed" if len(gold_rows) >= 100 else "partial"
            quality_message = (
                f"Verifier assembled monthly gold dataset with {len(gold_rows)} rows"
                if quality_status == "completed"
                else f"Verifier assembled only {len(gold_rows)} rows; expected >= 100"
            )
            quality_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]) + 1,
                    action="verifier",
                    status=quality_status,
                    message=quality_message,
                    extracted_data={
                        "row_count": len(gold_rows),
                        "sources": source_links,
                    },
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(quality_event, session_id)
            yield _sse_event(quality_event)
        else:
            quality_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]) + 1,
                    action="verifier",
                    status="partial",
                    message="Verifier could not assemble monthly gold rows from resolved sources",
                    extracted_data={"row_count": 0, "sources": []},
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(quality_event, session_id)
            yield _sse_event(quality_event)

    if (
        any(plugin_id in enabled_plugins for plugin_id in python_plugin_ids)
        and _should_run_python_sandbox(request, session["extracted_data"])
    ):
        extracted_payload = session["extracted_data"]
        dataset_rows: list[dict[str, Any]] = []
        source_links: list[str] = []
        html_samples: dict[str, str] = {}
        if isinstance(extracted_payload, dict):
            if isinstance(extracted_payload.get("rows"), list):
                dataset_rows = [
                    row for row in extracted_payload.get("rows", []) if isinstance(row, dict)
                ]
            if isinstance(extracted_payload.get("source_links"), list):
                source_links = [str(link) for link in extracted_payload.get("source_links", [])]
            for source, payload in extracted_payload.items():
                if isinstance(payload, dict) and isinstance(payload.get("content"), str):
                    html_samples[str(source)] = payload.get("content", "")

        # Tool call: extract.urls (find URLs in content)
        if html_samples:
            extract_urls_event = _create_tool_call_step(
                session,
                "extract.urls",
                "Extract URLs from HTML content",
                {"sources": len(html_samples), "total_bytes": sum(len(h) for h in html_samples.values())},
                status="running",
            )
            await manager.broadcast(extract_urls_event, session_id)
            yield _sse_event(extract_urls_event)

            all_urls = []
            for html in html_samples.values():
                all_urls.extend(re.findall(r'href=["\']([^"\']+)["\']', html[:50000]))  # Limit search
            extract_urls_result = _create_tool_call_step(
                session,
                "extract.urls",
                "Extract URLs from HTML content",
                {"sources": len(html_samples)},
                status="completed",
                result={"urls_found": len(all_urls), "unique": len(set(all_urls))},
                reward=0.05,
            )
            await manager.broadcast(extract_urls_result, session_id)
            yield _sse_event(extract_urls_result)

            # Tool call: extract.emails (find emails in content)
            extract_emails_event = _create_tool_call_step(
                session,
                "extract.emails",
                "Extract email addresses from HTML content",
                {"sources": len(html_samples)},
                status="running",
            )
            await manager.broadcast(extract_emails_event, session_id)
            yield _sse_event(extract_emails_event)

            all_emails = []
            email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
            for html in html_samples.values():
                all_emails.extend(re.findall(email_pattern, html[:50000]))
            extract_emails_result = _create_tool_call_step(
                session,
                "extract.emails",
                "Extract email addresses from HTML content",
                {"sources": len(html_samples)},
                status="completed",
                result={"emails_found": len(all_emails), "unique": len(set(all_emails))},
                reward=0.02,
            )
            await manager.broadcast(extract_emails_result, session_id)
            yield _sse_event(extract_emails_result)

        analysis_payload = {
            "instructions": request.instructions,
            "output_instructions": request.output_instructions,
            "dataset_rows": dataset_rows,
            "source_links": source_links,
            "html_samples": html_samples,
            "extracted_data": extracted_payload,
        }
        sandbox_code = request.python_code or DEFAULT_ANALYSIS_CODE

        # Tool call: pandas.DataFrame (data analysis)
        pandas_tool_event = _record_step(
            session,
            ScrapeStep(
                step_number=len(session["steps"]) + 1,
                action="tool_call",
                status="running",
                message="pandas.DataFrame(rows)",
                extracted_data={
                    "tool_name": "pandas.DataFrame",
                    "tool_description": "Create DataFrame from extracted dataset rows",
                    "parameters": {
                        "row_count": len(dataset_rows),
                        "source_count": len(source_links),
                    },
                },
                timestamp=_now_iso(),
            ),
        )
        await manager.broadcast(pandas_tool_event, session_id)
        yield _sse_event(pandas_tool_event)

        # Tool call: bs4.BeautifulSoup (HTML analysis)
        if html_samples:
            bs4_tool_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]) + 1,
                    action="tool_call",
                    status="running",
                    message=f"bs4.BeautifulSoup(html, 'html.parser') × {len(html_samples)}",
                    extracted_data={
                        "tool_name": "bs4.BeautifulSoup",
                        "tool_description": "Parse HTML samples for link analysis",
                        "parameters": {
                            "parser": "html.parser",
                            "sample_count": len(html_samples),
                            "total_bytes": sum(len(h) for h in html_samples.values()),
                        },
                    },
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(bs4_tool_event, session_id)
            yield _sse_event(bs4_tool_event)

        # Tool call: sandbox.execute (analysis)
        analysis_sandbox_event = _record_step(
            session,
            ScrapeStep(
                step_number=len(session["steps"]) + 1,
                action="tool_call",
                status="running",
                message="sandbox.execute(code='data_analysis')",
                extracted_data={
                    "tool_name": "sandbox.execute",
                    "tool_description": "Run comprehensive data analysis in sandbox",
                    "parameters": {
                        "imports": ["pandas", "numpy", "bs4", "json"],
                        "dataset_rows": len(dataset_rows),
                        "html_samples": len(html_samples),
                        "custom_code": bool(request.python_code),
                    },
                },
                timestamp=_now_iso(),
            ),
        )
        await manager.broadcast(analysis_sandbox_event, session_id)
        yield _sse_event(analysis_sandbox_event)

        try:
            sandbox_result = await asyncio.to_thread(
                execute_python_sandbox,
                sandbox_code,
                analysis_payload,
                session_id=session_id,
                timeout_seconds=25,
            )
        except Exception as exc:
            sandbox_result = SandboxExecutionResult(
                success=False,
                output=None,
                error=f"Sandbox setup failed: {exc}",
                stderr="",
            )

        # Tool call result: sandbox.execute
        sandbox_exec_result_event = _record_step(
            session,
            ScrapeStep(
                step_number=len(session["steps"]),
                action="tool_call",
                status="completed" if sandbox_result.success else "failed",
                message=f"sandbox.execute() → {'analysis complete' if sandbox_result.success else 'failed'}",
                reward=0.1 if sandbox_result.success else 0.0,
                extracted_data={
                    "tool_name": "sandbox.execute",
                    "result": {
                        "success": sandbox_result.success,
                        "output_keys": list(sandbox_result.output.keys()) if sandbox_result.output else [],
                        "error": sandbox_result.error if not sandbox_result.success else None,
                    },
                },
                timestamp=_now_iso(),
            ),
        )
        await manager.broadcast(sandbox_exec_result_event, session_id)
        yield _sse_event(sandbox_exec_result_event)

        if sandbox_result.success and sandbox_result.output is not None:
            if isinstance(session["extracted_data"], dict):
                session["extracted_data"]["python_analysis"] = sandbox_result.output
            else:
                session["extracted_data"] = {
                    "result": session["extracted_data"],
                    "python_analysis": sandbox_result.output,
                }
            sandbox_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]) + 1,
                    action="python_sandbox",
                    status="completed",
                    message="Sandboxed Python plugin executed successfully",
                    extracted_data={"analysis_keys": sorted(sandbox_result.output.keys())},
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(sandbox_event, session_id)
            yield _sse_event(sandbox_event)
        else:
            error = sandbox_result.error or "Sandboxed Python execution failed"
            session["errors"].append(error)
            sandbox_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]) + 1,
                    action="python_sandbox",
                    status="failed",
                    message=error,
                    extracted_data={"stderr": sandbox_result.stderr[:500]},
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(sandbox_event, session_id)
            yield _sse_event(sandbox_event)

    duration = time.time() - start_time

    # Tool call: json.dumps (output formatting)
    json_format_event = _record_step(
        session,
        ScrapeStep(
            step_number=len(session["steps"]) + 1,
            action="tool_call",
            status="running",
            message=f"json.dumps(data, format='{request.output_format.value}')",
            extracted_data={
                "tool_name": "json.dumps",
                "tool_description": f"Format extracted data as {request.output_format.value.upper()}",
                "parameters": {
                    "output_format": request.output_format.value,
                    "data_keys": list(session["extracted_data"].keys()) if isinstance(session["extracted_data"], dict) else ["data"],
                },
            },
            timestamp=_now_iso(),
        ),
    )
    await manager.broadcast(json_format_event, session_id)
    yield _sse_event(json_format_event)

    output = await format_output(
        session["extracted_data"],
        request.output_format,
        request.output_instructions,
    )
    json_format_result_event = _record_step(
        session,
        ScrapeStep(
            step_number=len(session["steps"]),
            action="tool_call",
            status="completed",
            message=f"json.dumps() → {len(output)} bytes",
            reward=0.05,
            extracted_data={
                "tool_name": "json.dumps",
                "result": {
                    "output_length": len(output),
                    "format": request.output_format.value,
                },
            },
            timestamp=_now_iso(),
        ),
    )
    await manager.broadcast(json_format_result_event, session_id)
    yield _sse_event(json_format_result_event)

    output_ext = request.output_format.value
    _write_session_artifact(session, f"final_output.{output_ext}", output)
    _write_session_json_artifact(session, "final_extracted_data.json", session["extracted_data"])

    if request.enable_memory:
        # Tool call: memory.store
        memory_store_event = _record_step(
            session,
            ScrapeStep(
                step_number=len(session["steps"]) + 1,
                action="tool_call",
                status="running",
                message="memory.store(key='summary', type='LONG_TERM')",
                extracted_data={
                    "tool_name": "memory.store",
                    "tool_description": "Store scrape summary in long-term memory",
                    "parameters": {
                        "key": f"scrape:{session_id}:summary",
                        "memory_type": "LONG_TERM",
                        "output_length": len(output),
                    },
                },
                timestamp=_now_iso(),
            ),
        )
        await manager.broadcast(memory_store_event, session_id)
        yield _sse_event(memory_store_event)

        try:
            await memory_manager.store(
                key=f"scrape:{session_id}:summary",
                value=output,
                memory_type=MemoryType.LONG_TERM,
                metadata={
                    "session_id": session_id,
                    "complexity": request.complexity.value,
                    "provider": request.provider,
                    "model": request.model,
                },
            )
            _write_session_artifact(session, "memory_summary.txt", output)

            # Tool call result: memory.store
            memory_store_result_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]),
                    action="tool_call",
                    status="completed",
                    message="memory.store() → stored",
                    reward=0.05,
                    extracted_data={
                        "tool_name": "memory.store",
                        "result": {"stored": True, "key": f"scrape:{session_id}:summary"},
                    },
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(memory_store_result_event, session_id)
            yield _sse_event(memory_store_result_event)
        except Exception as exc:
            session["errors"].append(f"Failed to store summary memory: {exc}")
            memory_store_fail_event = _record_step(
                session,
                ScrapeStep(
                    step_number=len(session["steps"]),
                    action="tool_call",
                    status="failed",
                    message=f"memory.store() → {str(exc)[:50]}",
                    extracted_data={
                        "tool_name": "memory.store",
                        "result": {"stored": False, "error": str(exc)[:100]},
                    },
                    timestamp=_now_iso(),
                ),
            )
            await manager.broadcast(memory_store_fail_event, session_id)
            yield _sse_event(memory_store_fail_event)

    response = ScrapeResponse(
        session_id=session_id,
        status="completed" if not session["errors"] else "partial",
        total_steps=len(session["steps"]),
        total_reward=session["total_reward"],
        extracted_data=session["extracted_data"],
        output=output,
        output_format=request.output_format,
        duration_seconds=duration,
        urls_processed=len(resolved_assets),
        errors=session["errors"],
        enabled_plugins=enabled_plugins,
        requested_plugins=request.enable_plugins,
        selected_agents=request.selected_agents,
        memory_enabled=request.enable_memory,
        sandbox_artifacts=_list_session_artifacts(session),
    )
    complete_event = {"type": "complete", "data": response.model_dump()}
    await manager.broadcast(complete_event, session_id)
    yield _sse_event(complete_event)
    session["status"] = response.status
    session["duration"] = duration


@router.post("/stream")
async def scrape_with_stream(
    request: ScrapeRequest,
    settings: SettingsDep,
    memory_manager: MemoryManagerDep,
) -> StreamingResponse:
    """Start a scrape run and stream updates via SSE."""
    if not request.assets:
        raise HTTPException(status_code=400, detail="At least one asset URL is required")

    session_id = request.session_id or str(uuid.uuid4())
    if get_session(session_id):
        raise HTTPException(status_code=409, detail=f"Session {session_id} already exists")

    return StreamingResponse(
        scrape_stream(session_id, request, settings, memory_manager),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Session-Id": session_id,
        },
    )


@router.post("/")
async def scrape_sync(
    request: ScrapeRequest,
    settings: SettingsDep,
    memory_manager: MemoryManagerDep,
    background_tasks: BackgroundTasks,
) -> dict[str, Any]:
    """Start a scrape run in the background and return its session ID."""
    if not request.assets:
        raise HTTPException(status_code=400, detail="At least one asset URL is required")

    session_id = request.session_id or str(uuid.uuid4())
    if get_session(session_id):
        raise HTTPException(status_code=409, detail=f"Session {session_id} already exists")

    async def run_scrape() -> None:
        try:
            async for _ in scrape_stream(session_id, request, settings, memory_manager):
                pass
        except Exception as exc:
            logger.exception("Background scrape failed", extra={"session_id": session_id})
            update_session(session_id, {"status": "failed", "errors": [str(exc)]})

    background_tasks.add_task(run_scrape)
    return {
        "session_id": session_id,
        "status": "started",
        "message": f"Scraping {len(request.assets)} URLs",
        "assets": request.assets,
        "selected_agents": request.selected_agents,
    }


@router.get("/sessions")
async def list_sessions() -> dict[str, Any]:
    """List all active scrape sessions."""
    sessions = [
        {
            "session_id": session_id,
            "status": session["status"],
            "urls_count": len(session.get("resolved_assets") or session["request"].assets),
            "current_index": session.get("current_url_index", 0),
            "total_reward": session["total_reward"],
            "steps": len(session["steps"]),
        }
        for session_id, session in _active_sessions.items()
    ]
    return {"sessions": sessions, "count": len(sessions)}


@router.get("/{session_id}/status")
async def get_scrape_status(session_id: str) -> dict[str, Any]:
    """Get the current status of one scrape session."""
    session = get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    duration = (
        time.time() - session["start_time"]
        if session["status"] == "running"
        else session.get("duration", 0.0)
    )
    return {
        "session_id": session_id,
        "status": session["status"],
        "current_url_index": session.get("current_url_index", 0),
        "total_urls": len(session.get("resolved_assets") or session["request"].assets),
        "total_reward": session["total_reward"],
        "extracted_count": len(session["extracted_data"]),
        "steps_count": len(session["steps"]),
        "errors": session["errors"],
        "enabled_plugins": session.get("enabled_plugins", []),
        "selected_agents": session["request"].selected_agents,
        "sandbox_artifacts": _list_session_artifacts(session),
        "duration": duration,
    }


@router.get("/{session_id}/sandbox/files")
async def list_sandbox_files(session_id: str) -> dict[str, Any]:
    """List sandbox artifacts for a scrape session."""
    session = get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    sandbox_dir = session.get("sandbox_dir")
    if not sandbox_dir:
        return {"session_id": session_id, "files": [], "count": 0}

    base = Path(sandbox_dir)
    if not base.exists():
        return {"session_id": session_id, "files": [], "count": 0}

    files: list[dict[str, Any]] = []
    for file in base.iterdir():
        if not file.is_file():
            continue
        files.append(
            {
                "name": file.name,
                "size_bytes": file.stat().st_size,
            }
        )
    files.sort(key=lambda item: item["name"])
    return {"session_id": session_id, "files": files, "count": len(files)}


@router.get("/{session_id}/sandbox/files/{file_name}")
async def read_sandbox_file(session_id: str, file_name: str) -> dict[str, Any]:
    """Read the content of one sandbox file for a scrape session."""
    session = get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    sandbox_dir = session.get("sandbox_dir")
    if not sandbox_dir:
        raise HTTPException(status_code=404, detail="Sandbox not available for session")

    safe_name = Path(file_name).name
    file_path = Path(sandbox_dir) / safe_name
    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="Sandbox file not found")

    content = file_path.read_text(encoding="utf-8", errors="ignore")
    return {
        "session_id": session_id,
        "file_name": safe_name,
        "size_bytes": file_path.stat().st_size,
        "content": content,
    }


@router.get("/{session_id}/result")
async def get_scrape_result(session_id: str) -> ScrapeResponse:
    """Get the final result of one scrape session."""
    session = get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    if session["status"] == "running":
        raise HTTPException(status_code=400, detail="Scraping still in progress")

    request: ScrapeRequest = session["request"]
    duration = session.get("duration", time.time() - session["start_time"])
    output = await format_output(
        session["extracted_data"],
        request.output_format,
        request.output_instructions,
    )
    return ScrapeResponse(
        session_id=session_id,
        status=session["status"],
        total_steps=len(session["steps"]),
        total_reward=session["total_reward"],
        extracted_data=session["extracted_data"],
        output=output,
        output_format=request.output_format,
        duration_seconds=duration,
        urls_processed=len(session.get("resolved_assets") or request.assets),
        errors=session["errors"],
        enabled_plugins=session.get("enabled_plugins", []),
        requested_plugins=request.enable_plugins,
        selected_agents=request.selected_agents,
        memory_enabled=request.enable_memory,
        sandbox_artifacts=_list_session_artifacts(session),
    )


@router.delete("/{session_id}")
async def cancel_scrape(session_id: str) -> dict[str, str]:
    """Cancel a running scrape session."""
    session = get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    update_session(session_id, {"status": "cancelled"})
    return {"status": "cancelled", "session_id": session_id}


@router.delete("/{session_id}/cleanup")
async def cleanup_scrape(session_id: str) -> dict[str, str]:
    """Delete a completed or cancelled session."""
    removed = remove_session(session_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Session not found")
    return {"status": "removed", "session_id": session_id}
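

# Illustrative sketch (hypothetical helper; not called by the routes above).
# It restates the inline `extract.urls` / `extract.emails` tool-call steps in
# scrape_stream as one pure function so the matching logic can be exercised in
# isolation: each HTML sample is truncated to its first `limit` characters
# (50,000 in the route), the same href and email regular expressions are
# applied, and total/unique counts come back in the same shape as the
# tool-call `result` payloads. The name `_summarize_html_matches` and the
# `limit` parameter are assumptions made for this example only.
def _summarize_html_matches(
    html_samples: dict[str, str],
    limit: int = 50000,
) -> dict[str, dict[str, int]]:
    """Count href targets and email-like strings across HTML samples."""
    href_pattern = r'href=["\']([^"\']+)["\']'
    email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
    all_urls: list[str] = []
    all_emails: list[str] = []
    for html in html_samples.values():
        snippet = html[:limit]  # Bound regex work per sample, as the route does
        all_urls.extend(re.findall(href_pattern, snippet))
        all_emails.extend(re.findall(email_pattern, snippet))
    return {
        "urls": {"urls_found": len(all_urls), "unique": len(set(all_urls))},
        "emails": {"emails_found": len(all_emails), "unique": len(set(all_emails))},
    }


# Usage (sketch): _summarize_html_matches({"page": html})["urls"]["unique"]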