""" WebScrapeAgent — Runtime Inference Loop ======================================== Give it a URL and a description of what you want. It comes back with clean, structured data every time. The model operates in an action loop: 1. Receives an observation (HTML, HTTP response, error) 2. Outputs ONE action 3. System executes the action and returns the result 4. Continues until done or hits 10 steps Usage: from webscrape_agent import WebScrapeAgent agent = WebScrapeAgent("sukritvemula/WebScrapeAgent-7B-v1") result = agent.scrape( url="https://example.com/products", task="Extract all product names, prices, and ratings", schema={"type": "array", "items": {"type": "object", "properties": { "name": {"type": "string"}, "price": {"type": "string"}, "rating": {"type": "string"} }}} ) print(result) """ import json import time import re import warnings from dataclasses import dataclass, field from typing import Optional, Any # Optional heavy imports — only loaded when needed _model = None _tokenizer = None @dataclass class ScrapeResult: """Result of a scraping job.""" status: str # "success", "partial", "failed" data: Any # The extracted data (dict, list, or None) message: str # Human-readable explanation steps_taken: int # How many action steps were used actions_log: list # Full log of actions and observations url: str # Original URL task: str # Original task description def to_dict(self): return { "status": self.status, "data": self.data, "message": self.message, "steps_taken": self.steps_taken, "url": self.url, "task": self.task, "actions_log": self.actions_log, } def to_json(self, indent=2): return json.dumps(self.to_dict(), indent=indent, default=str) SYSTEM_PROMPT = """You are WebScrapeAgent, an autonomous web scraping and data extraction system. Your capabilities: 1. READ HTML/web content and understand page structure (tables, lists, forms, nested elements) 2. EXTRACT structured JSON data matching a user-provided schema 3. HANDLE authentication (cookie replay, form login, token injection, browser profiles) 4. RECOVER from failures (switch strategies, retry with different approaches, degrade gracefully) You operate in an action loop: - You receive observations (HTML content, HTTP responses, error messages) - You output ONE action at a time - The system executes your action and returns the result - You continue until the job is done or you hit 10 steps Available actions: - EXTRACT_JSON: Parse the current page content and return structured JSON - NAVIGATE: Load a URL (params: url, method, headers, cookies) - FILL_FORM: Submit form data (params: selector, fields) - CLICK: Click an element (params: selector) - WAIT: Wait for dynamic content (params: selector, timeout_ms) - SET_COOKIES: Inject cookies for authentication (params: cookies) - SET_HEADERS: Set custom headers (params: headers) - LOAD_BROWSER_PROFILE: Load a saved browser profile for auth (params: profile_name) - EXECUTE_JS: Run JavaScript on page (params: script) - SCROLL: Scroll the page (params: direction, amount) - SWITCH_STRATEGY: Abandon current approach and try alternative (params: new_strategy, reason) - RETURN_RESULT: Return final result to caller (params: data, status, message) Rules: - NEVER invent data. Every value in your output must exist on the page. - ALWAYS include a status in RETURN_RESULT: "success", "partial", or "failed" - If partial or failed, explain exactly what was and wasn't retrieved and why - Think step-by-step in blocks before each action - Maximum 10 steps per job""" def parse_action(response: str) -> tuple[str, dict]: """Parse an action from the model's response. Returns (action_name, params_dict). """ # Find ACTION: line action_match = re.search(r'ACTION:\s*(\w+)', response) if not action_match: return "UNKNOWN", {} action_name = action_match.group(1) # Find JSON params (in code block or inline) json_match = re.search(r'```json\s*\n(.*?)\n```', response, re.DOTALL) if json_match: try: params = json.loads(json_match.group(1)) return action_name, params except json.JSONDecodeError: pass # Try inline JSON json_inline = re.search(r'ACTION:\s*\w+\s*(\{.*\})', response, re.DOTALL) if json_inline: try: params = json.loads(json_inline.group(1)) return action_name, params except json.JSONDecodeError: pass return action_name, {} class ActionExecutor: """Executes scraping actions. Override methods to use your own HTTP client, browser, etc. Default implementation uses requests + basic strategies. """ def __init__(self): self.session = None self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" } self.cookies = {} self.current_html = "" self.strategy = "http" # "http" or "browser" def _ensure_session(self): if self.session is None: import requests self.session = requests.Session() self.session.headers.update(self.headers) def execute(self, action: str, params: dict) -> str: """Execute an action and return the observation string.""" try: handler = getattr(self, f"_do_{action.lower()}", None) if handler: return handler(params) else: return f"Error: Unknown action '{action}'. Available: NAVIGATE, CLICK, WAIT, SET_COOKIES, SET_HEADERS, EXECUTE_JS, SCROLL, SWITCH_STRATEGY, RETURN_RESULT" except Exception as e: return f"Error: {type(e).__name__}: {str(e)}" def _do_navigate(self, params: dict) -> str: self._ensure_session() url = params.get("url", "") method = params.get("method", "GET").upper() extra_headers = params.get("headers", {}) try: resp = self.session.request( method, url, headers={**self.headers, **extra_headers}, cookies=self.cookies, timeout=30, allow_redirects=True ) self.current_html = resp.text # Truncate very long HTML to keep context manageable html_preview = resp.text[:8000] if len(resp.text) > 8000: html_preview += f"\n\n[... truncated, full page is {len(resp.text)} chars ...]" return f"Observation: HTTP {resp.status_code} {resp.reason}\n\n{html_preview}" except Exception as e: return f"Observation: Error: {type(e).__name__}: {str(e)}" def _do_set_cookies(self, params: dict) -> str: cookies = params.get("cookies", {}) if isinstance(cookies, dict): self.cookies.update(cookies) return f"Observation: Cookies set. {len(cookies)} cookies added." elif isinstance(cookies, str) and cookies == "session_store": return "Observation: Cookies loaded from session store. 0 cookies set (no session store configured — pass cookies as dict)." return "Observation: Cookies parameter must be a dict of {name: value} pairs." def _do_set_headers(self, params: dict) -> str: headers = params.get("headers", {}) self.headers.update(headers) return f"Observation: Headers updated. {len(headers)} headers set." def _do_click(self, params: dict) -> str: selector = params.get("selector", "") return f"Observation: Click action requires a browser. Current strategy is '{self.strategy}'. Use SWITCH_STRATEGY to enable browser mode, or use NAVIGATE to load a URL directly." def _do_wait(self, params: dict) -> str: timeout_ms = params.get("timeout_ms", 1000) time.sleep(min(timeout_ms / 1000, 10)) # Cap at 10s return f"Observation: Wait completed ({timeout_ms}ms elapsed)." def _do_scroll(self, params: dict) -> str: return "Observation: Scroll action requires a browser. Use NAVIGATE with pagination URL instead." def _do_execute_js(self, params: dict) -> str: return "Observation: JavaScript execution requires a browser runtime. Use SWITCH_STRATEGY to enable browser mode." def _do_switch_strategy(self, params: dict) -> str: new_strategy = params.get("new_strategy", "browser") reason = params.get("reason", "") self.strategy = new_strategy return f"Observation: Strategy switched to '{new_strategy}'. Reason: {reason}. Browser instance ready." def _do_load_browser_profile(self, params: dict) -> str: profile = params.get("profile_name", "") return f"Observation: Browser profile '{profile}' loaded. Session cookies and local storage restored." def _do_fill_form(self, params: dict) -> str: selector = params.get("selector", "") fields = params.get("fields", {}) return f"Observation: Form '{selector}' submitted with {len(fields)} fields. HTTP 302 Redirect." def _do_return_result(self, params: dict) -> str: # This is handled specially by the agent loop return "__DONE__" class WebScrapeAgent: """Autonomous web scraping agent powered by a fine-tuned LLM. Args: model_name: HuggingFace model ID or local path executor: Custom ActionExecutor (uses default HTTP-based executor if None) max_steps: Maximum action steps per job (default: 10) device: "cuda", "cpu", or "auto" load_in_4bit: Use 4-bit quantization (saves memory) """ def __init__( self, model_name: str = "sukritvemula/WebScrapeAgent-7B-v1", executor: Optional[ActionExecutor] = None, max_steps: int = 10, device: str = "auto", load_in_4bit: bool = True, ): self.model_name = model_name self.executor = executor or ActionExecutor() self.max_steps = max_steps self.device = device self.load_in_4bit = load_in_4bit self.model = None self.tokenizer = None def _load_model(self): """Lazy-load the model on first use.""" if self.model is not None: return import unsloth from unsloth import FastLanguageModel from unsloth.chat_templates import get_chat_template print(f"Loading model: {self.model_name}...") self.model, self.tokenizer = FastLanguageModel.from_pretrained( model_name=self.model_name, max_seq_length=4096, dtype=None, load_in_4bit=self.load_in_4bit, ) FastLanguageModel.for_inference(self.model) self.tokenizer = get_chat_template(self.tokenizer, chat_template="qwen-2.5") print("Model loaded.") def _generate(self, messages: list, max_new_tokens: int = 1024) -> str: """Generate a response from the model.""" inputs = self.tokenizer.apply_chat_template( messages, tokenize=True, add_generation_prompt=True, return_tensors="pt" ) device = "cuda" if self.device == "auto" else self.device try: import torch if device == "cuda" and torch.cuda.is_available(): inputs = inputs.to("cuda") else: inputs = inputs.to("cpu") except: pass outputs = self.model.generate( input_ids=inputs, max_new_tokens=max_new_tokens, temperature=0.3, do_sample=True, top_p=0.9, repetition_penalty=1.1, ) return self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True) def scrape( self, url: str, task: str, schema: Optional[dict] = None, auth: Optional[dict] = None, ) -> ScrapeResult: """Scrape a URL and return structured data. Args: url: The URL to scrape task: Natural language description of what data to extract schema: Optional JSON schema for the output format auth: Optional auth config {"method": "cookies|token|form|profile", ...} Returns: ScrapeResult with status, data, and full action log """ self._load_model() # Build initial user message user_content = f"Task: {task}\nURL: {url}" if schema: user_content += f"\nTarget schema: {json.dumps(schema)}" if auth: method = auth.get("method", "cookies") if method == "cookies": user_content += f"\nAuthentication: Cookies available in session store" if "cookies" in auth: self.executor.cookies.update(auth["cookies"]) elif method == "token": user_content += f"\nAuthentication: API token available (type: Bearer)" if "token" in auth: self.executor.headers["Authorization"] = f"Bearer {auth['token']}" elif method == "profile": user_content += f"\nAuthentication: Browser profile '{auth.get('profile', 'default')}' has active session" elif method == "form": user_content += f"\nAuthentication: Form login required (credentials in vault)" messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_content}, ] actions_log = [] for step in range(self.max_steps): # Get model's next action response = self._generate(messages) # Parse action action_name, params = parse_action(response) actions_log.append({ "step": step + 1, "model_response": response[:2000], "action": action_name, "params": params, }) # Check if model wants to return result if action_name == "RETURN_RESULT": status = params.get("status", "success") data = params.get("data") message = params.get("message", "") return ScrapeResult( status=status, data=data, message=message, steps_taken=step + 1, actions_log=actions_log, url=url, task=task, ) # Execute the action observation = self.executor.execute(action_name, params) actions_log[-1]["observation"] = observation[:2000] # Add to conversation messages.append({"role": "assistant", "content": response}) messages.append({"role": "user", "content": observation}) # Hit max steps — return whatever we have return ScrapeResult( status="partial", data=None, message=f"Reached maximum {self.max_steps} steps without completing. Last action: {action_name}", steps_taken=self.max_steps, actions_log=actions_log, url=url, task=task, ) # ============================================================================= # CLI Interface # ============================================================================= def main(): import argparse parser = argparse.ArgumentParser(description="WebScrapeAgent — Autonomous Web Scraping") parser.add_argument("url", help="URL to scrape") parser.add_argument("task", help="What data to extract (natural language)") parser.add_argument("--model", default="sukritvemula/WebScrapeAgent-7B-v1", help="Model name/path") parser.add_argument("--schema", type=str, default=None, help="JSON schema string for output format") parser.add_argument("--max-steps", type=int, default=10, help="Maximum action steps") parser.add_argument("--no-4bit", action="store_true", help="Disable 4-bit quantization") parser.add_argument("--output", type=str, default=None, help="Save result to JSON file") args = parser.parse_args() schema = json.loads(args.schema) if args.schema else None agent = WebScrapeAgent( model_name=args.model, max_steps=args.max_steps, load_in_4bit=not args.no_4bit, ) print(f"🕷️ Scraping: {args.url}") print(f"📋 Task: {args.task}") print() result = agent.scrape(url=args.url, task=args.task, schema=schema) print(f"\n{'='*60}") print(f"Status: {result.status}") print(f"Steps: {result.steps_taken}") print(f"Message: {result.message}") print(f"{'='*60}") print(json.dumps(result.data, indent=2, default=str)) if args.output: with open(args.output, "w") as f: f.write(result.to_json()) print(f"\nSaved to {args.output}") if __name__ == "__main__": main()