import asyncio
import os
import tempfile
from enum import StrEnum
from typing import Any, Optional, List
from urllib.parse import urljoin, urlparse, urldefrag

import pymupdf
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright, TimeoutError
from playwright_stealth import Stealth
from pydantic import BaseModel, Field
class Status(StrEnum):
    """Outcome of analyzing a crawled link."""

    RELEVANT = "RELEVANT"      # page contains visa-related information
    IRRELEVANT = "IRRELEVANT"  # page loaded fully but is unrelated to visas
    FAILED = "FAILED"          # page could not be retrieved or analyzed
class LinkOverview(BaseModel):
    """Structured result of the LLM's analysis of one page's content.

    Produced by ``BrowserAgent._analyze_content``; the field descriptions below
    double as instructions to the LLM via the response schema.
    """

    summary: str = Field(..., description="A brief summary of the link's content, maximum 1 paragraph.")
    SLA: Optional[str] = Field(None, description="Service Level Agreement in days of visa, if mentioned.")
    required_docs: Optional[str] = Field(None, description="List of required documents as text, if mentioned.")
    price: Optional[str] = Field(None, description="Price or fee information, if mentioned.")
    details: Optional[str] = Field(None, description="Additional details about the link's content. 3-5 paragraphs max.")
    # Defaults to FAILED so a node is only considered successful once analysis completes.
    status: Status = Field(Status.FAILED, description="Overall status of the analysis.")
class LinkNode(BaseModel):
    """One crawled URL in the link graph built by ``BrowserAgent``."""

    href: str = Field(..., description="The URL of the link")
    overview: LinkOverview = Field(..., description="Summary and details about the link's content")
    parent: Optional[str] = Field(None, description="The parent link, where this link was found (source).")
    child: List[str] = Field(..., description="List of links found on this page")
    depth: int = Field(..., description="Depth level in the link hierarchy (0=root, 1=child of root, etc.)")
    # Scraped page text held between the crawl phase and the LLM analysis phase;
    # cleared after analysis to save memory.
    raw_text: Optional[str] = None
class BrowserAgent:
    """Crawls a site with a stealth Playwright browser and analyzes pages with an LLM.

    Two-phase operation: ``run()`` first recursively scrapes pages into
    ``link_map`` (url -> LinkNode), then sends each page's text to the LLM to
    produce a structured ``LinkOverview``.

    Intended usage::

        async with BrowserAgent(model) as agent:
            link_map = await agent.run("https://example.com")
    """

    # Sentinel summary marking nodes that were scraped but not yet analyzed.
    _PENDING_SUMMARY = "Pending analysis..."

    def __init__(self, model: Any, max_depth: int = 2):
        """
        Args:
            model: LLM client exposing an async ``formated_prompt(prompt, response_schema)``
                method returning a dict with a "parsed" entry.
            max_depth: Maximum crawl depth (0 = only the start URL).
        """
        self.model = model
        self.max_depth = max_depth
        self.link_map: dict = {}  # url -> LinkNode
        self.browser = None
        self.context = None
        self.stealth_manager = None

    async def __aenter__(self):
        """Initializes the browser using the new Stealth context manager pattern."""
        self.stealth_manager = Stealth().use_async(async_playwright())
        self.p = await self.stealth_manager.__aenter__()
        # Flags chosen to reduce automation fingerprinting and to avoid sandbox
        # /dev/shm issues in containerized environments.
        self.browser = await self.p.chromium.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process'
            ]
        )
        self.context = await self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            viewport={'width': 1920, 'height': 1080},
            locale='en-US',
            timezone_id='America/New_York'
        )
        print("🚀 Browser agent initialized with Stealth API.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Closes the browser and cleans up all resources in reverse order.

        Each step runs even if an earlier one raises, so a failing
        ``context.close()`` cannot leak the browser process or the stealth
        manager (the original chained ``if`` statements could).
        """
        try:
            if self.context:
                await self.context.close()
        finally:
            try:
                if self.browser:
                    await self.browser.close()
            finally:
                if self.stealth_manager:
                    await self.stealth_manager.__aexit__(exc_type, exc_val, exc_tb)
        print("✅ Browser agent shut down gracefully.")

    async def run(self, start_url: str):
        """Public method to start the full process: crawl then analyze.

        Returns:
            The populated ``link_map`` (url -> LinkNode).
        """
        if not start_url.startswith("http"):
            start_url = "http://" + start_url
        print(f"Starting crawl from: {start_url}")
        await self._explore(url=start_url, depth=0, parent_url=None)
        print("\n--- Crawl Complete. Starting AI Analysis ---")
        await self.analyze_map()
        return self.link_map

    async def _explore(self, url: str, depth: int, parent_url: Optional[str]):
        """Recursively scrapes text and finds links, without calling the LLM.

        The node is registered in ``link_map`` before the first await, so
        concurrently scheduled sibling tasks cannot re-crawl the same URL.
        """
        url = urldefrag(url).url  # drop #fragment so anchors don't duplicate pages
        if url in self.link_map or depth > self.max_depth:
            return
        print(f"Scraping URL: {url} (Depth: {depth})")
        # Start in FAILED state with a pending-summary sentinel; analyze_map()
        # upgrades the overview once the LLM analysis succeeds.
        overview = LinkOverview(summary=self._PENDING_SUMMARY, status=Status.FAILED)
        self.link_map[url] = LinkNode(href=url, overview=overview, parent=parent_url, child=[], depth=depth)
        page = await self.context.new_page()
        # Extra anti-detection shims on top of playwright-stealth.
        await page.add_init_script("""
            // Override the navigator.webdriver property
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            // Override chrome property
            window.chrome = {
                runtime: {}
            };
            // Override permissions
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                Promise.resolve({ state: Notification.permission }) :
                originalQuery(parameters)
            );
        """)
        try:
            content, soup, is_pdf = await self._get_page_content(page, url)
            if content is not None:
                self.link_map[url].raw_text = content
                # Cheap keyword gate: only follow links from pages that mention
                # visas at all; the LLM makes the real relevance call later.
                is_relevant_for_crawl = "visa" in content.lower()
                if not is_pdf and is_relevant_for_crawl:
                    child_links = self._find_child_links(soup, url)
                    self.link_map[url].child = child_links
                    tasks = [self._explore(link, depth + 1, url) for link in child_links if link not in self.link_map]
                    if tasks:
                        await asyncio.gather(*tasks)
            else:
                # Content retrieval failed, so we finalize the status as FAILED.
                self.link_map[url].overview.summary = "Failed to retrieve or process page content."
                self.link_map[url].overview.status = Status.FAILED
        finally:
            await page.close()

    async def analyze_map(self):
        """Iterates through the completed map and sends content to the LLM for analysis."""
        tasks = [
            self.analyze_node(url)
            for url, node in self.link_map.items()
            if node.raw_text and node.overview.summary == self._PENDING_SUMMARY
        ]
        if tasks:
            print(f"Found {len(tasks)} pages to analyze with the LLM...")
            await asyncio.gather(*tasks)

    async def analyze_node(self, url: str):
        """Helper function to analyze a single node and store its overview."""
        print(f"  Analyzing content for: {url}")
        node = self.link_map[url]
        node.overview = await self._analyze_content(node.raw_text)
        node.raw_text = None  # clear text after analysis to save memory

    @staticmethod
    def _extract_pdf_text(pdf_bytes: bytes) -> str:
        """Extracts plain text from an in-memory PDF (shared by both PDF paths)."""
        text = ""
        doc = pymupdf.open(stream=pdf_bytes, filetype="pdf")
        try:
            for pdf_page in doc:
                text += pdf_page.get_text() + "\n"
        finally:
            doc.close()
        return text

    async def _get_page_content(self, page, url: str):
        """Navigates to a URL and extracts its text content.

        Handles three scenarios: HTML pages (including JS-heavy ones), PDFs
        rendered in the in-browser viewer, and PDFs served as forced downloads.

        Returns:
            Tuple ``(page_text, soup, is_pdf)``; ``soup`` is only non-None for
            HTML pages. On failure returns ``(None, None, False)``.
        """
        NAVIGATION_TIMEOUT_MS = 60000  # generous: some government sites are slow
        try:
            page_text = ""
            soup = None
            is_pdf = False
            # Navigate with a longer timeout and wait for domcontentloaded first;
            # stricter load states are waited for (best-effort) below.
            response = await page.goto(url, wait_until="domcontentloaded", timeout=NAVIGATION_TIMEOUT_MS)
            if not response:
                raise Exception("No response from server.")
            content_type = response.headers.get("content-type", "").lower()
            if "application/pdf" in content_type:
                is_pdf = True
                print("-> PDF detected (in-browser viewer)...")
                page_text = self._extract_pdf_text(await response.body())
            else:
                print("-> HTML detected...")
                try:
                    # Wait for network to be idle
                    await page.wait_for_load_state("networkidle", timeout=15000)
                except TimeoutError:
                    print(f"  Warning: Timed out waiting for 'networkidle' on {url}.")
                # Additional wait for JavaScript to execute
                try:
                    await page.wait_for_load_state("load", timeout=10000)
                except TimeoutError:
                    print(f"  Warning: Timed out waiting for 'load' state on {url}.")
                # Give extra time for dynamic content to render
                await asyncio.sleep(2)
                # Try to wait for body content to be present
                try:
                    await page.wait_for_selector('body', timeout=5000)
                except TimeoutError:
                    print(f"  Warning: Body selector not found on {url}.")
                # Check if there's a "JavaScript required" message
                content_check = await page.content()
                if "javascript" in content_check.lower() and "enable" in content_check.lower():
                    print(f"  Warning: Page may require JavaScript to be enabled: {url}")
                # Try scrolling to trigger lazy-loaded content
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(1)
                await page.evaluate("window.scrollTo(0, 0)")
                await asyncio.sleep(1)
                page_content = await page.content()
                soup = BeautifulSoup(page_content, 'html.parser')
                # Remove boilerplate elements before extracting visible text
                for el in soup(['script', 'style', 'header', 'footer', 'nav', 'aside']):
                    el.decompose()
                page_text = soup.get_text(separator=' ', strip=True)
                # If page text is suspiciously short, it might be a JS-heavy page
                if len(page_text.strip()) < 100:
                    print(f"  Warning: Very little text content found ({len(page_text)} chars). Page may be JS-dependent.")
                    # Try getting inner text via JavaScript
                    try:
                        js_text = await page.evaluate("document.body.innerText")
                        if len(js_text) > len(page_text):
                            print(f"  -> Using JavaScript-extracted text instead ({len(js_text)} chars)")
                            page_text = js_text
                    except Exception as e:
                        print(f"  -> Could not extract text via JavaScript: {e}")
            return page_text, soup, is_pdf
        except Exception as e:
            # Some servers force a download instead of rendering the PDF inline;
            # Playwright surfaces that as a navigation error we recognize by message.
            if "Download is starting" in str(e):
                print(f"-> Page forced a download for {url}. Capturing the file...")
                download = await page.wait_for_event("download", timeout=NAVIGATION_TIMEOUT_MS)
                with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                    await download.save_as(tmp_file.name)
                    tmp_file_path = tmp_file.name
                try:
                    with open(tmp_file_path, "rb") as f:
                        pdf_bytes = f.read()
                finally:
                    # Always remove the temp file, even if reading it fails.
                    os.unlink(tmp_file_path)
                return self._extract_pdf_text(pdf_bytes), None, True
            else:
                print(f"  Error: Failed to get content for {url}. Reason: {e}")
                # was (None, None, None): keep is_pdf a bool for type consistency
                return None, None, False

    def _find_child_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Finds, filters, and resolves all valid child links on a page across any domain.

        Returns absolute http(s) URLs, de-duplicated while preserving first-seen
        document order (deterministic, unlike ``list(set(...))``).
        """
        links = []
        for anchor in soup.find_all('a', href=True):
            href = anchor['href'].strip()
            if href.lower().startswith(('mailto:', 'tel:')):
                continue  # non-navigable schemes
            absolute_url = urldefrag(urljoin(base_url, href)).url
            if absolute_url.startswith(('http://', 'https://')):
                links.append(absolute_url)
        return list(dict.fromkeys(links))

    async def _analyze_content(self, page_text: str) -> LinkOverview:
        """Sends page text to the LLM for structured analysis.

        Always returns a LinkOverview: on any model/parse failure a FAILED
        overview is returned instead of raising.
        """
        llm_prompt = f"""
You are an expert visa information analyst. Your task is to meticulously analyze the following web page content (`page_text`) and extract all visa-related information into a structured format based on the provided `LinkOverview` schema.
**Core Directives:**
1. **Comprehensive Analysis:** If the text describes multiple visa types (e.g., Tourist, Business, Student), you must identify and describe each one completely. Do not merge or generalize information.
2. **Self-Contained Output:** The generated response must be exhaustive and self-contained. Extract all relevant details so the user does not need to visit the original webpage. Never instruct the user to "visit the link for more details."
3. **Complete Data Extraction:** For *every* visa type mentioned, you must extract its specific Service Level Agreement (SLA), price, and a full list of required documents. If information for a specific field is not mentioned for a visa type, explicitly state that.
**Field-Specific Formatting and Instructions:**
* **`summary`**: Provide a brief overview of all the visa services described on the page.
* **`details`**: If there are multiple visa types, use this field to provide a detailed breakdown for each one. Use clear headings for each visa type (e.g., "**Tourist Visa (Subclass 600)**").
* **`SLA`**: Clearly list the processing time for each visa type mentioned. For example: "Tourist Visa: 15-20 business days; Business Visa: 10-15 business days."
* **`price`**: Clearly list the fees for each visa type mentioned. For example: "Tourist Visa: $150; Business Visa: $180."
* **`required_docs`**: This is a critical field. The information must be concise, clear, and follow the exact format below. Synthesize conditions and exceptions into the bullet points.
* Do not just copy-paste text. Summarize requirements intelligently (e.g., specify if documents need translation, if physical copies are needed, or note exceptions for minors).
* Use this strict format:
```text
**[Visa Type Name 1]**
Required:
- Passport with at least 6 months validity
- Physical bank statement from the last 3 months (must be translated if not in English)
- Signed consent letter from both parents (for applicants under 18 traveling alone)
Optional:
- Hotel booking confirmation
- Travel insurance
**[Visa Type Name 2]**
Required:
- [Document 1 with specific conditions]
- [Document 2 with specific conditions]
Optional:
- [Optional Document 1]
```
* **`status`**: This is crucial. If unable to load the complete info from the page, including JS enabled or timeout issue, set to `'FAILED'`.
Set to `'RELEVANT'` if the page contains any visa-related information.
If the page is loaded completely and unrelated to visas (e.g., a privacy policy, a different product), set this to `'IRRELEVANT'`. If `'IRRELEVANT'`, you can leave other fields empty.
**Analyze the following web page content and generate the structured data:**
{page_text}"""
        try:
            print(f"  Sending {len(page_text)} chars to GenAI for analysis...")
            # This is where you call your actual AI model client
            llm_result = await self.model.formated_prompt(
                prompt=llm_prompt,
                response_schema=LinkOverview
            )
            if llm_result and llm_result.get("parsed"):
                print("  GenAI analysis successful.")
                # Pydantic automatically validates and converts the string 'RELEVANT' to Status.RELEVANT
                overview = LinkOverview.model_validate(llm_result["parsed"])
            else:
                print("  Warning: GenAI call succeeded but returned no structured data.")
                overview = LinkOverview(
                    summary="Content analysis failed: The AI model returned an empty or unparsable response.",
                    status=Status.FAILED
                )
        except Exception as e:
            print(f"Error in GenAI structured extraction: {e}")
            overview = LinkOverview(
                summary=f"Content analysis failed with an error: {e}",
                status=Status.FAILED
            )
        return overview