import asyncio
import os
import tempfile
from enum import StrEnum
from typing import Any, List, Optional
from urllib.parse import urldefrag, urljoin, urlparse

import pymupdf
from bs4 import BeautifulSoup
from playwright.async_api import TimeoutError, async_playwright
from playwright_stealth import Stealth
from pydantic import BaseModel, Field


class Status(StrEnum):
    """Outcome of analyzing a single crawled page."""

    RELEVANT = "RELEVANT"
    IRRELEVANT = "IRRELEVANT"
    FAILED = "FAILED"


class LinkOverview(BaseModel):
    """Structured, LLM-extracted summary of one page's visa-related content."""

    summary: str = Field(..., description="A brief summary of the link's content, maximum 1 paragraph.")
    SLA: Optional[str] = Field(None, description="Service Level Agreement in days of visa, if mentioned.")
    required_docs: Optional[str] = Field(None, description="List of required documents as text, if mentioned.")
    price: Optional[str] = Field(None, description="Price or fee information, if mentioned.")
    details: Optional[str] = Field(None, description="Additional details about the link's content. 3-5 paragraphs max.")
    status: Status = Field(Status.FAILED, description="Overall status of the analysis.")


class LinkNode(BaseModel):
    """One node of the crawl graph: a URL, its analysis, and its outgoing links."""

    href: str = Field(..., description="The URL of the link")
    overview: LinkOverview = Field(..., description="Summary and details about the link's content")
    parent: Optional[str] = Field(None, description="The parent link, where this link was found (source).")
    child: List[str] = Field(..., description="List of links found on this page")
    depth: int = Field(..., description="Depth level in the link hierarchy (0=root, 1=child of root, etc.)")
    # Scraped text is held here between the crawl pass and the LLM analysis pass,
    # then cleared to save memory.
    raw_text: Optional[str] = None


class BrowserAgent:
    """Crawls a site with a stealth Playwright browser, then analyzes each page with an LLM.

    Usage::

        async with BrowserAgent(model, max_depth=2) as agent:
            link_map = await agent.run("example.com")
    """

    def __init__(self, model: Any, max_depth: int = 2):
        """
        Args:
            model: LLM client exposing an async ``formated_prompt(prompt=..., response_schema=...)``
                method that returns a dict with a "parsed" key.
            max_depth: Maximum crawl depth (0 = only the start URL).
        """
        self.model = model
        self.max_depth = max_depth
        self.link_map: dict[str, LinkNode] = {}
        self.browser = None
        self.context = None
        self.stealth_manager = None

    async def __aenter__(self):
        """Initializes the browser using the new Stealth context manager pattern."""
        # NOTE(review): Stealth().use_async(...) wraps the playwright context manager;
        # we drive its __aenter__/__aexit__ manually so the lifetime matches ours.
        self.stealth_manager = Stealth().use_async(async_playwright())
        self.p = await self.stealth_manager.__aenter__()
        self.browser = await self.p.chromium.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process'
            ]
        )
        self.context = await self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            viewport={'width': 1920, 'height': 1080},
            locale='en-US',
            timezone_id='America/New_York'
        )
        print("🚀 Browser agent initialized with Stealth API.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Closes the browser and cleans up all resources in reverse order."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.stealth_manager:
            await self.stealth_manager.__aexit__(exc_type, exc_val, exc_tb)
        print("✅ Browser agent shut down gracefully.")

    async def run(self, start_url: str) -> dict:
        """Public method to start the full process: crawl then analyze.

        Returns the completed ``{url: LinkNode}`` map.
        """
        if not start_url.startswith("http"):
            start_url = "http://" + start_url
        print(f"Starting crawl from: {start_url}")
        await self._explore(url=start_url, depth=0, parent_url=None)
        print("\n--- Crawl Complete. Starting AI Analysis ---")
        await self.analyze_map()
        return self.link_map

    async def _explore(self, url: str, depth: int, parent_url: Optional[str]):
        """Recursively scrapes text and finds links, without calling the LLM.

        The entry guard plus the synchronous ``link_map`` insertion (no await in
        between) keeps concurrent explorations from processing the same URL twice.
        """
        url = urldefrag(url).url
        if url in self.link_map or depth > self.max_depth:
            return
        print(f"Scraping URL: {url} (Depth: {depth})")
        # Initialize with a default 'FAILED' status, which will be updated upon success.
        overview = LinkOverview(summary="Pending analysis...", status=Status.FAILED)
        self.link_map[url] = LinkNode(href=url, overview=overview, parent=parent_url, child=[], depth=depth)
        page = await self.context.new_page()
        await page.add_init_script("""
            // Override the navigator.webdriver property
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            // Override chrome property
            window.chrome = { runtime: {} };
            // Override permissions
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        """)
        try:
            content, soup, is_pdf = await self._get_page_content(page, url)
            if content is not None:
                self.link_map[url].raw_text = content
                # The status stays FAILED until the AI pass succeeds; only pages
                # mentioning "visa" are worth crawling deeper (and PDFs have no links).
                is_relevant_for_crawl = "visa" in content.lower()
                if not is_pdf and is_relevant_for_crawl:
                    child_links = self._find_child_links(soup, url)
                    self.link_map[url].child = child_links
                    tasks = [
                        self._explore(link, depth + 1, url)
                        for link in child_links
                        if link not in self.link_map
                    ]
                    if tasks:
                        await asyncio.gather(*tasks)
            else:
                # Content retrieval failed, so we finalize the status as FAILED.
                self.link_map[url].overview.summary = "Failed to retrieve or process page content."
                self.link_map[url].overview.status = Status.FAILED
        finally:
            await page.close()

    async def analyze_map(self):
        """Iterates through the completed map and sends content to the LLM for analysis."""
        tasks = [
            self.analyze_node(url)
            for url, node in self.link_map.items()
            if node.raw_text and node.overview.summary == "Pending analysis..."
        ]
        if tasks:
            print(f"Found {len(tasks)} pages to analyze with the LLM...")
            await asyncio.gather(*tasks)

    async def analyze_node(self, url: str):
        """Helper function to analyze a single node."""
        print(f" Analyzing content for: {url}")
        node = self.link_map[url]
        overview = await self._analyze_content(node.raw_text)
        node.overview = overview
        node.raw_text = None  # Clear text after analysis to save memory.

    @staticmethod
    def _extract_pdf_text(pdf_bytes: bytes) -> str:
        """Extracts plain text from an in-memory PDF (one trailing newline per page)."""
        doc = pymupdf.open(stream=pdf_bytes, filetype="pdf")
        try:
            return "".join(p.get_text() + "\n" for p in doc)
        finally:
            doc.close()

    async def _extract_html_text(self, page, url: str):
        """Waits out JS/lazy rendering on an already-navigated page; returns (text, soup)."""
        # Wait for network to be idle
        try:
            await page.wait_for_load_state("networkidle", timeout=15000)
        except TimeoutError:
            print(f" Warning: Timed out waiting for 'networkidle' on {url}.")
        # Additional wait for JavaScript to execute
        try:
            await page.wait_for_load_state("load", timeout=10000)
        except TimeoutError:
            print(f" Warning: Timed out waiting for 'load' state on {url}.")
        # Give extra time for dynamic content to render
        await asyncio.sleep(2)
        # Try to wait for body content to be present
        try:
            await page.wait_for_selector('body', timeout=5000)
        except TimeoutError:
            print(f" Warning: Body selector not found on {url}.")
        # Check if there's a "JavaScript required" message
        content_check = await page.content()
        if "javascript" in content_check.lower() and "enable" in content_check.lower():
            print(f" Warning: Page may require JavaScript to be enabled: {url}")
        # Try scrolling to trigger lazy-loaded content
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(1)
        await page.evaluate("window.scrollTo(0, 0)")
        await asyncio.sleep(1)
        page_content = await page.content()
        soup = BeautifulSoup(page_content, 'html.parser')
        # Remove unwanted elements
        for el in soup(['script', 'style', 'header', 'footer', 'nav', 'aside']):
            el.decompose()
        page_text = soup.get_text(separator=' ', strip=True)
        # If page text is suspiciously short, it might be a JS-heavy page
        if len(page_text.strip()) < 100:
            print(f" Warning: Very little text content found ({len(page_text)} chars). Page may be JS-dependent.")
            # Try getting inner text via JavaScript
            try:
                js_text = await page.evaluate("document.body.innerText")
                if len(js_text) > len(page_text):
                    print(f" -> Using JavaScript-extracted text instead ({len(js_text)} chars)")
                    page_text = js_text
            except Exception as e:
                print(f" -> Could not extract text via JavaScript: {e}")
        return page_text, soup

    async def _capture_forced_download(self, page, timeout_ms: int) -> str:
        """Captures a navigation that forced a file download and extracts it as PDF text."""
        download = await page.wait_for_event("download", timeout=timeout_ms)
        # Close the temp-file handle before Playwright writes to it (required on
        # Windows, where an open NamedTemporaryFile cannot be reopened by name).
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_file_path = tmp_file.name
        try:
            await download.save_as(tmp_file_path)
            with open(tmp_file_path, "rb") as f:
                pdf_bytes = f.read()
        finally:
            # Always remove the temp file, even if save/read fails.
            os.unlink(tmp_file_path)
        return self._extract_pdf_text(pdf_bytes)

    async def _get_page_content(self, page, url: str):
        """Navigates to a URL and extracts its text content, handling various scenarios.

        Returns:
            (page_text, soup, is_pdf) — ``soup`` is None for PDFs;
            ``(None, None, False)`` when retrieval fails entirely.
        """
        NAVIGATION_TIMEOUT_MS = 60000  # Increased timeout
        try:
            # Navigate with a longer timeout and wait for domcontentloaded first
            response = await page.goto(url, wait_until="domcontentloaded", timeout=NAVIGATION_TIMEOUT_MS)
            if not response:
                raise Exception("No response from server.")
            content_type = response.headers.get("content-type", "").lower()
            if "application/pdf" in content_type:
                print("-> PDF detected (in-browser viewer)...")
                pdf_bytes = await response.body()
                return self._extract_pdf_text(pdf_bytes), None, True
            print("-> HTML detected...")
            page_text, soup = await self._extract_html_text(page, url)
            return page_text, soup, False
        except Exception as e:
            # Playwright raises this message when navigation turned into a download.
            if "Download is starting" in str(e):
                print(f"-> Page forced a download for {url}. Capturing the file...")
                page_text = await self._capture_forced_download(page, NAVIGATION_TIMEOUT_MS)
                return page_text, None, True
            print(f" Error: Failed to get content for {url}. Reason: {e}")
            return None, None, False

    def _find_child_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Finds, filters, and resolves all valid child links on a page across any domain."""
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href'].strip()
            if href.lower().startswith(('mailto:', 'tel:')):
                continue
            absolute_url = urljoin(base_url, href)
            absolute_url = urldefrag(absolute_url).url
            if absolute_url.startswith(('http://', 'https://')):
                links.append(absolute_url)
        # dict.fromkeys dedupes while keeping discovery order deterministic.
        return list(dict.fromkeys(links))

    async def _analyze_content(self, page_text: str) -> LinkOverview:
        """Sends page text to the LLM for structured analysis."""
        llm_prompt = f"""
You are an expert visa information analyst. Your task is to meticulously analyze the following web page content (`page_text`) and extract all visa-related information into a structured format based on the provided `LinkOverview` schema.

**Core Directives:**
1. **Comprehensive Analysis:** If the text describes multiple visa types (e.g., Tourist, Business, Student), you must identify and describe each one completely. Do not merge or generalize information.
2. **Self-Contained Output:** The generated response must be exhaustive and self-contained. Extract all relevant details so the user does not need to visit the original webpage. Never instruct the user to "visit the link for more details."
3. **Complete Data Extraction:** For *every* visa type mentioned, you must extract its specific Service Level Agreement (SLA), price, and a full list of required documents. If information for a specific field is not mentioned for a visa type, explicitly state that.

**Field-Specific Formatting and Instructions:**

* **`summary`**: Provide a brief overview of all the visa services described on the page.
* **`details`**: If there are multiple visa types, use this field to provide a detailed breakdown for each one. Use clear headings for each visa type (e.g., "**Tourist Visa (Subclass 600)**").
* **`SLA`**: Clearly list the processing time for each visa type mentioned. For example: "Tourist Visa: 15-20 business days; Business Visa: 10-15 business days."
* **`price`**: Clearly list the fees for each visa type mentioned. For example: "Tourist Visa: $150; Business Visa: $180."
* **`required_docs`**: This is a critical field. The information must be concise, clear, and follow the exact format below. Synthesize conditions and exceptions into the bullet points.
    * Do not just copy-paste text. Summarize requirements intelligently (e.g., specify if documents need translation, if physical copies are needed, or note exceptions for minors).
    * Use this strict format:
    ```text
    **[Visa Type Name 1]**
    Required:
    - Passport with at least 6 months validity
    - Physical bank statement from the last 3 months (must be translated if not in English)
    - Signed consent letter from both parents (for applicants under 18 traveling alone)
    Optional:
    - Hotel booking confirmation
    - Travel insurance

    **[Visa Type Name 2]**
    Required:
    - [Document 1 with specific conditions]
    - [Document 2 with specific conditions]
    Optional:
    - [Optional Document 1]
    ```
* **`status`**: This is crucial. If unable to load the complete info from the page, including JS enabled or timeout issue, set to `'FAILED'`. Set to `'RELEVANT'` if the page contains any visa-related information. If the page is loaded completely and unrelated to visas (e.g., a privacy policy, a different product), set this to `'IRRELEVANT'`. If `'IRRELEVANT'`, you can leave other fields empty.

**Analyze the following web page content and generate the structured data:**

{page_text}"""
        try:
            print(f" Sending {len(page_text)} chars to GenAI for analysis...")
            # This is where you call your actual AI model client
            llm_result = await self.model.formated_prompt(
                prompt=llm_prompt,
                response_schema=LinkOverview
            )
            if llm_result and llm_result.get("parsed"):
                print(" GenAI analysis successful.")
                # Pydantic automatically validates and converts the string 'RELEVANT' to Status.RELEVANT
                overview = LinkOverview.model_validate(llm_result["parsed"])
            else:
                print(" Warning: GenAI call succeeded but returned no structured data.")
                overview = LinkOverview(
                    summary="Content analysis failed: The AI model returned an empty or unparsable response.",
                    status=Status.FAILED
                )
        except Exception as e:
            print(f"Error in GenAI structured extraction: {e}")
            overview = LinkOverview(
                summary=f"Content analysis failed with an error: {e}",
                status=Status.FAILED
            )
        return overview