Spaces:
Sleeping
Sleeping
import asyncio
import os
import tempfile
from enum import StrEnum
from typing import Any, List, Optional
from urllib.parse import urljoin, urlparse, urldefrag

import pymupdf
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright, TimeoutError
from playwright_stealth import Stealth
from pydantic import BaseModel, Field
class Status(StrEnum):
    """Outcome of analyzing one crawled page (see the LLM prompt for the
    exact decision rules the model is given)."""
    RELEVANT = "RELEVANT"      # page contains visa-related information
    IRRELEVANT = "IRRELEVANT"  # page loaded fully but is unrelated to visas
    FAILED = "FAILED"          # page could not be fully loaded or analyzed
class LinkOverview(BaseModel):
    """Structured result of the LLM's analysis of a single page.

    Fields other than ``summary``/``status`` are optional because the source
    page may simply not mention them. ``status`` defaults to FAILED and is
    upgraded only when analysis succeeds.
    """
    summary: str = Field(..., description="A brief summary of the link's content, maximum 1 paragraph.")
    SLA: Optional[str] = Field(None, description="Service Level Agreement in days of visa, if mentioned.")
    required_docs: Optional[str] = Field(None, description="List of required documents as text, if mentioned.")
    price: Optional[str] = Field(None, description="Price or fee information, if mentioned.")
    details: Optional[str] = Field(None, description="Additional details about the link's content. 3-5 paragraphs max.")
    status: Status = Field(Status.FAILED, description="Overall status of the analysis.")
class LinkNode(BaseModel):
    """One node of the crawl graph: a URL, its analysis, and its relations."""
    href: str = Field(..., description="The URL of the link")
    overview: LinkOverview = Field(..., description="Summary and details about the link's content")
    parent: Optional[str] = Field(None, description="The parent link, where this link was found (source).")
    child: List[str] = Field(..., description="List of links found on this page")
    depth: int = Field(..., description="Depth level in the link hierarchy (0=root, 1=child of root, etc.)")
    raw_text: Optional[str] = None  # scraped page text held until analysis, then cleared to save memory
class BrowserAgent:
    """Depth-limited stealth crawler that maps visa-related pages, then
    analyzes each page's text with an LLM into ``LinkOverview`` records.

    Use as an async context manager::

        async with BrowserAgent(model) as agent:
            link_map = await agent.run("https://example.com")

    The crawl phase (`_explore`) only scrapes text and discovers links; the
    analysis phase (`analyze_map`) then sends each page's text to the model.
    """

    def __init__(self, model: Any, max_depth: int = 2):
        """
        Args:
            model: LLM client exposing an async ``formated_prompt(prompt=...,
                response_schema=...)`` method — presumably a project-specific
                wrapper; confirm its contract against the caller.
            max_depth: pages at depth <= max_depth are scraped (root is 0);
                links discovered deeper than that are ignored.
        """
        self.model = model
        self.max_depth = max_depth
        self.link_map: dict = {}  # defragmented URL -> LinkNode
        self.browser = None
        self.context = None
        self.stealth_manager = None

    async def __aenter__(self):
        """Initializes the browser using the new Stealth context manager pattern."""
        self.stealth_manager = Stealth().use_async(async_playwright())
        self.p = await self.stealth_manager.__aenter__()
        try:
            self.browser = await self.p.chromium.launch(
                headless=True,
                args=[
                    # Flags reduce automation fingerprinting and sandbox
                    # friction in containerized environments.
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-web-security',
                    '--disable-features=IsolateOrigins,site-per-process'
                ]
            )
            self.context = await self.browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                viewport={'width': 1920, 'height': 1080},
                locale='en-US',
                timezone_id='America/New_York'
            )
        except Exception:
            # Launch failed part-way: unwind what was created so the
            # playwright driver process does not leak.
            if self.browser:
                await self.browser.close()
            await self.stealth_manager.__aexit__(None, None, None)
            raise
        print("🚀 Browser agent initialized with Stealth API.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Closes the browser and cleans up all resources in reverse order."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.stealth_manager:
            await self.stealth_manager.__aexit__(exc_type, exc_val, exc_tb)
        print("✅ Browser agent shut down gracefully.")

    async def run(self, start_url: str):
        """Public method to start the full process: crawl then analyze.

        Returns:
            The populated ``self.link_map`` (url -> LinkNode).
        """
        if not start_url.startswith("http"):
            start_url = "http://" + start_url  # bare host: assume a scheme
        print(f"Starting crawl from: {start_url}")
        await self._explore(url=start_url, depth=0, parent_url=None)
        print("\n--- Crawl Complete. Starting AI Analysis ---")
        await self.analyze_map()
        return self.link_map

    async def _explore(self, url: str, depth: int, parent_url: Optional[str]):
        """Recursively scrapes text and finds links, without calling the LLM."""
        url = urldefrag(url).url  # fragment-only variants are the same page
        if url in self.link_map or depth > self.max_depth:
            return
        print(f"Scraping URL: {url} (Depth: {depth})")
        # Register the node *before* the first await so concurrent sibling
        # tasks cannot start a duplicate visit of this URL. Status starts as
        # FAILED and is only upgraded once the AI analysis succeeds.
        overview = LinkOverview(summary="Pending analysis...", status=Status.FAILED)
        self.link_map[url] = LinkNode(href=url, overview=overview, parent=parent_url, child=[], depth=depth)
        page = await self.context.new_page()
        # Extra anti-bot-detection shims on top of playwright-stealth.
        await page.add_init_script("""
            // Override the navigator.webdriver property
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            // Override chrome property
            window.chrome = {
                runtime: {}
            };
            // Override permissions
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                Promise.resolve({ state: Notification.permission }) :
                originalQuery(parameters)
            );
        """)
        try:
            content, soup, is_pdf = await self._get_page_content(page, url)
            if content is not None:
                self.link_map[url].raw_text = content
                # Cheap keyword gate: only follow links from pages that at
                # least mention "visa"; the LLM decides true relevance later.
                is_relevant_for_crawl = "visa" in content.lower()
                if not is_pdf and is_relevant_for_crawl:
                    child_links = self._find_child_links(soup, url)
                    self.link_map[url].child = child_links
                    tasks = [self._explore(link, depth + 1, url) for link in child_links if link not in self.link_map]
                    if tasks:
                        await asyncio.gather(*tasks)
            else:
                # Content retrieval failed, so we finalize the status as FAILED.
                self.link_map[url].overview.summary = "Failed to retrieve or process page content."
                self.link_map[url].overview.status = Status.FAILED
        finally:
            await page.close()

    async def analyze_map(self):
        """Iterates through the completed map and sends content to the LLM for analysis."""
        tasks = []
        for url, node in self.link_map.items():
            # raw_text is set only for successfully scraped pages and is
            # cleared after analysis, so it doubles as a "not yet analyzed"
            # marker; the sentinel summary is a second guard.
            if node.raw_text and node.overview.summary == "Pending analysis...":
                tasks.append(self.analyze_node(url))
        if tasks:
            print(f"Found {len(tasks)} pages to analyze with the LLM...")
            await asyncio.gather(*tasks)

    async def analyze_node(self, url: str):
        """Helper function to analyze a single node."""
        print(f"  Analyzing content for: {url}")
        node = self.link_map[url]
        overview = await self._analyze_content(node.raw_text)
        node.overview = overview
        node.raw_text = None  # clear text after analysis to save memory

    @staticmethod
    def _extract_pdf_text(pdf_bytes: bytes) -> str:
        """Extracts plain text from an in-memory PDF, one page per line group."""
        doc = pymupdf.open(stream=pdf_bytes, filetype="pdf")
        try:
            return "".join(p.get_text() + "\n" for p in doc)
        finally:
            doc.close()  # always release, even if a page fails to parse

    async def _settle_and_scrape_html(self, page, url: str):
        """Waits for a (possibly JS-heavy) page to settle, then returns
        ``(page_text, soup)`` extracted from the rendered DOM."""
        # Wait for network to be idle
        try:
            await page.wait_for_load_state("networkidle", timeout=15000)
        except TimeoutError:
            print(f"  Warning: Timed out waiting for 'networkidle' on {url}.")
        # Additional wait for JavaScript to execute
        try:
            await page.wait_for_load_state("load", timeout=10000)
        except TimeoutError:
            print(f"  Warning: Timed out waiting for 'load' state on {url}.")
        # Give extra time for dynamic content to render
        await asyncio.sleep(2)
        # Try to wait for body content to be present
        try:
            await page.wait_for_selector('body', timeout=5000)
        except TimeoutError:
            print(f"  Warning: Body selector not found on {url}.")
        # Check if there's a "JavaScript required" message
        content_check = await page.content()
        if "javascript" in content_check.lower() and "enable" in content_check.lower():
            print(f"  Warning: Page may require JavaScript to be enabled: {url}")
        # Try scrolling to trigger lazy-loaded content
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(1)
        await page.evaluate("window.scrollTo(0, 0)")
        await asyncio.sleep(1)
        soup = BeautifulSoup(await page.content(), 'html.parser')
        # Strip boilerplate elements before extracting visible text.
        for el in soup(['script', 'style', 'header', 'footer', 'nav', 'aside']):
            el.decompose()
        page_text = soup.get_text(separator=' ', strip=True)
        # If page text is suspiciously short, it might be a JS-heavy page
        if len(page_text.strip()) < 100:
            print(f"  Warning: Very little text content found ({len(page_text)} chars). Page may be JS-dependent.")
            # Try getting inner text via JavaScript
            try:
                js_text = await page.evaluate("document.body.innerText")
                if len(js_text) > len(page_text):
                    print(f"  -> Using JavaScript-extracted text instead ({len(js_text)} chars)")
                    page_text = js_text
            except Exception as e:
                print(f"  -> Could not extract text via JavaScript: {e}")
        return page_text, soup

    async def _capture_download(self, page, timeout_ms: int):
        """Captures a forced file download (assumed PDF) and extracts its text.

        Returns:
            The same ``(page_text, soup, is_pdf)`` triple as
            ``_get_page_content`` — soup is None, is_pdf is True.
        """
        download = await page.wait_for_event("download", timeout=timeout_ms)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            await download.save_as(tmp_file.name)
            tmp_file_path = tmp_file.name
        try:
            with open(tmp_file_path, "rb") as f:
                pdf_bytes = f.read()
        finally:
            os.unlink(tmp_file_path)  # never leave the temp file behind
        return self._extract_pdf_text(pdf_bytes), None, True

    async def _get_page_content(self, page, url: str):
        """Navigates to ``url`` and extracts its text content.

        Handles three scenarios: a PDF rendered in the in-browser viewer, a
        regular (possibly JS-heavy) HTML page, and a URL that aborts
        navigation by forcing a file download.

        Returns:
            ``(page_text, soup, is_pdf)``; soup is None for PDFs, and the
            triple is ``(None, None, False)`` on any failure.
        """
        NAVIGATION_TIMEOUT_MS = 60000  # generous: target sites can be slow
        try:
            page_text = ""
            soup = None
            is_pdf = False
            # Navigate with a longer timeout and wait for domcontentloaded first
            response = await page.goto(url, wait_until="domcontentloaded", timeout=NAVIGATION_TIMEOUT_MS)
            if not response:
                raise Exception("No response from server.")
            content_type = response.headers.get("content-type", "").lower()
            if "application/pdf" in content_type:
                is_pdf = True
                print("-> PDF detected (in-browser viewer)...")
                page_text = self._extract_pdf_text(await response.body())
            else:
                print("-> HTML detected...")
                page_text, soup = await self._settle_and_scrape_html(page, url)
            return page_text, soup, is_pdf
        except Exception as e:
            if "Download is starting" in str(e):
                # Navigation aborted because the URL forces a download;
                # capture the file instead. Guarded so that a download
                # timeout or PDF parse error cannot crash the whole crawl.
                print(f"-> Page forced a download for {url}. Capturing the file...")
                try:
                    return await self._capture_download(page, NAVIGATION_TIMEOUT_MS)
                except Exception as dl_err:
                    print(f"  Error: Failed to get content for {url}. Reason: {dl_err}")
                    return None, None, False
            print(f"  Error: Failed to get content for {url}. Reason: {e}")
            return None, None, False

    def _find_child_links(self, soup: "BeautifulSoup", base_url: str) -> List[str]:
        """Finds, filters, and resolves all valid child links on a page across any domain.

        Links are deduplicated while preserving document order, so the crawl
        is deterministic run-to-run.
        """
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href'].strip()
            if href.lower().startswith(('mailto:', 'tel:')):
                continue  # non-navigable schemes
            absolute_url = urljoin(base_url, href)
            absolute_url = urldefrag(absolute_url).url  # drop #fragments
            if absolute_url.startswith(('http://', 'https://')):
                links.append(absolute_url)
        # dict.fromkeys dedupes but, unlike set(), keeps insertion order.
        return list(dict.fromkeys(links))

    async def _analyze_content(self, page_text: str) -> "LinkOverview":
        """Sends page text to the LLM for structured analysis.

        Never raises: any model/parse failure is folded into a LinkOverview
        with status FAILED so the crawl result stays complete.
        """
        llm_prompt = f"""
You are an expert visa information analyst. Your task is to meticulously analyze the following web page content (`page_text`) and extract all visa-related information into a structured format based on the provided `LinkOverview` schema.
**Core Directives:**
1. **Comprehensive Analysis:** If the text describes multiple visa types (e.g., Tourist, Business, Student), you must identify and describe each one completely. Do not merge or generalize information.
2. **Self-Contained Output:** The generated response must be exhaustive and self-contained. Extract all relevant details so the user does not need to visit the original webpage. Never instruct the user to "visit the link for more details."
3. **Complete Data Extraction:** For *every* visa type mentioned, you must extract its specific Service Level Agreement (SLA), price, and a full list of required documents. If information for a specific field is not mentioned for a visa type, explicitly state that.
**Field-Specific Formatting and Instructions:**
* **`summary`**: Provide a brief overview of all the visa services described on the page.
* **`details`**: If there are multiple visa types, use this field to provide a detailed breakdown for each one. Use clear headings for each visa type (e.g., "**Tourist Visa (Subclass 600)**").
* **`SLA`**: Clearly list the processing time for each visa type mentioned. For example: "Tourist Visa: 15-20 business days; Business Visa: 10-15 business days."
* **`price`**: Clearly list the fees for each visa type mentioned. For example: "Tourist Visa: $150; Business Visa: $180."
* **`required_docs`**: This is a critical field. The information must be concise, clear, and follow the exact format below. Synthesize conditions and exceptions into the bullet points.
    * Do not just copy-paste text. Summarize requirements intelligently (e.g., specify if documents need translation, if physical copies are needed, or note exceptions for minors).
    * Use this strict format:
    ```text
    **[Visa Type Name 1]**
    Required:
    - Passport with at least 6 months validity
    - Physical bank statement from the last 3 months (must be translated if not in English)
    - Signed consent letter from both parents (for applicants under 18 traveling alone)
    Optional:
    - Hotel booking confirmation
    - Travel insurance
    **[Visa Type Name 2]**
    Required:
    - [Document 1 with specific conditions]
    - [Document 2 with specific conditions]
    Optional:
    - [Optional Document 1]
    ```
* **`status`**: This is crucial. If unable to load the complete info from the page, including JS enabled or timeout issue, set to `'FAILED'`.
Set to `'RELEVANT'` if the page contains any visa-related information.
If the page is loaded completely and unrelated to visas (e.g., a privacy policy, a different product), set this to `'IRRELEVANT'`. If `'IRRELEVANT'`, you can leave other fields empty.
**Analyze the following web page content and generate the structured data:**
{page_text}"""
        try:
            print(f"  Sending {len(page_text)} chars to GenAI for analysis...")
            # This is where you call your actual AI model client
            llm_result = await self.model.formated_prompt(
                prompt=llm_prompt,
                response_schema=LinkOverview
            )
            if llm_result and llm_result.get("parsed"):
                print("  GenAI analysis successful.")
                # Pydantic automatically validates and converts the string 'RELEVANT' to Status.RELEVANT
                overview = LinkOverview.model_validate(llm_result["parsed"])
            else:
                print("  Warning: GenAI call succeeded but returned no structured data.")
                overview = LinkOverview(
                    summary="Content analysis failed: The AI model returned an empty or unparsable response.",
                    status=Status.FAILED
                )
        except Exception as e:
            print(f"Error in GenAI structured extraction: {e}")
            overview = LinkOverview(
                summary=f"Content analysis failed with an error: {e}",
                status=Status.FAILED
            )
        return overview