Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| from typing import Dict, Optional, List | |
| from playwright.async_api import async_playwright | |
| import pandas as pd | |
| from constant import COUNTRY_CODES, REVERSE_COUNTRY_CODES | |
class PassportIndexVisaScraper:
    """Query passportindex.org visa requirements through a Playwright-driven browser.

    Intended to be used as an async context manager::

        async with PassportIndexVisaScraper(debug=False) as scraper:
            await scraper.initialize_session()
            info = await scraper.check_visa_requirement_browser('us', 'gb')

    Requests are issued from inside the loaded page (``fetch``) so that they
    carry the page's cookies; a UI-driven lookup is available as a fallback.
    """

    # NOTE(review): several non-ASCII glyphs in the log strings below look
    # mis-encoded in this copy of the file; they are left untouched because
    # they are runtime output.

    # Value sent as the ``cl`` form field when the page has no ``#cl`` input.
    _FALLBACK_SESSION_ID = 'bc2140a2d83928ce1112d01e610bad89'

    def __init__(self, debug: bool = True, headless: bool = False):
        """
        Initialize the Passport Index visa scraper using Playwright

        Args:
            debug: Enable debug output
            headless: Launch Chromium without a visible window. Defaults to
                False (a visible browser), which is what this class has
                always done.
        """
        self.base_url = "https://www.passportindex.org/travel-visa-checker/"
        self.api_url = "https://www.passportindex.org/core/visachecker.php"
        self.debug = debug
        self.headless = headless
        # All four handles start as None so cleanup is safe even when
        # __aenter__ never ran or failed part-way through.
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None

    async def __aenter__(self):
        """Start Playwright, launch Chromium and open a page with stealth tweaks.

        Returns:
            This scraper instance, ready for ``initialize_session``.

        Raises:
            Whatever Playwright raises during start-up; anything already
            opened is closed before the error propagates.
        """
        self.playwright = await async_playwright().start()
        try:
            # Launch browser with stealth settings
            self.browser = await self.playwright.chromium.launch(
                headless=self.headless,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-web-security',
                    '--disable-features=IsolateOrigins,site-per-process'
                ]
            )
            # Create context with realistic settings
            self.context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
                locale='en-US',
                timezone_id='America/New_York'
            )
            self.page = await self.context.new_page()
            # Add stealth JavaScript to avoid detection
            await self.page.add_init_script("""
                // Override the navigator.webdriver property
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
                // Override chrome property
                window.chrome = {
                    runtime: {}
                };
                // Override permissions
                const originalQuery = window.navigator.permissions.query;
                window.navigator.permissions.query = (parameters) => (
                    parameters.name === 'notifications' ?
                        Promise.resolve({ state: Notification.permission }) :
                        originalQuery(parameters)
                );
            """)
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so release what
            # was opened here. Cleanup errors are dropped on purpose so the
            # original start-up error is the one the caller sees.
            await self._close_resources()
            raise
        if self.debug:
            print("π Browser initialized with stealth mode")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up browser resources.

        Every resource gets a close attempt; the first cleanup error (if any)
        is raised afterwards. Safe to call on an instance that never entered.
        """
        first_error = await self._close_resources()
        if first_error is not None:
            raise first_error
        if self.debug:
            print("π Browser closed")

    async def _close_resources(self):
        """Close page, context, browser and driver (in that order).

        Returns:
            The first exception raised by a close call, or None if all
            succeeded. Handles are reset to None as they are released.
        """
        first_error = None
        for attr, method in (
            ('page', 'close'),
            ('context', 'close'),
            ('browser', 'close'),
            ('playwright', 'stop'),
        ):
            resource = getattr(self, attr, None)
            if not resource:
                continue
            setattr(self, attr, None)
            try:
                await getattr(resource, method)()
            except Exception as e:
                # Keep going so one failing close does not leak the rest.
                if first_error is None:
                    first_error = e
        return first_error

    async def _read_session_id(self) -> str:
        """Return the value of the page's ``#cl`` input, or the fallback id."""
        return await self.page.evaluate(
            """
            (fallback) => {
                const clInput = document.querySelector('#cl');
                return clInput ? clInput.value : fallback;
            }
            """,
            self._FALLBACK_SESSION_ID,
        )

    async def initialize_session(self) -> bool:
        """
        Navigate to the website and wait for it to load properly

        Returns:
            True when the page loaded, and also when loading raised: a page
            load problem is reported (in debug mode) but deliberately treated
            as non-fatal. False only if reporting itself fails.
        """
        try:
            if self.debug:
                print("π± Initializing session...")
            # Navigate to the page
            try:
                await self.page.goto(
                    self.base_url,
                    wait_until='domcontentloaded',
                    timeout=30000
                )
                # Fixed pause after DOMContentLoaded before reading the DOM.
                await self.page.wait_for_timeout(3000)
                # Get the cl value from the page
                cl_value = await self._read_session_id()
                if self.debug:
                    print(f"β Page loaded, session ID: {cl_value}")
            except Exception as e:
                # Best effort: later lookups are still attempted.
                if self.debug:
                    print(f"β οΈ Page load issue: {e}, continuing anyway...")
            return True
        except Exception as e:
            print(f"β Error initializing session: {e}")
            return False

    async def check_visa_requirement_browser(self, passport_country: str, destination_country: str) -> Optional[Dict]:
        """
        Check visa requirements using browser automation

        The request is a form-encoded POST to ``self.api_url`` issued with
        ``fetch`` from inside the page.

        Args:
            passport_country: Two-letter country code for passport
            destination_country: Two-letter country code for destination

        Returns:
            Dictionary with visa information or None if failed. When both
            codes are equal (case-insensitively) no request is made and
            ``{"text": "Same country"}`` is returned.
        """
        if destination_country.lower() == passport_country.lower():
            return {
                "text": "Same country"
            }
        try:
            if self.debug:
                print(f"π Checking {passport_country.upper()} β {destination_country.upper()}")
            # Get the current session ID from the page
            cl_value = await self._read_session_id()
            # Make the API request through the browser with proper argument passing
            result = await self.page.evaluate("""
                async (args) => {
                    const [apiUrl, passport, destination, sessionId] = args;
                    const formData = new URLSearchParams();
                    formData.append('d', destination);
                    formData.append('s', passport);
                    formData.append('cl', sessionId);
                    try {
                        const response = await fetch(apiUrl, {
                            method: 'POST',
                            headers: {
                                'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
                                'X-Requested-With': 'XMLHttpRequest',
                                'Accept': 'application/json, text/javascript, */*; q=0.01'
                            },
                            body: formData.toString(),
                            credentials: 'include'
                        });
                        if (!response.ok) {
                            throw new Error(`HTTP ${response.status}`);
                        }
                        const data = await response.json();
                        return data;
                    } catch (error) {
                        return { error: error.message };
                    }
                }
            """, [self.api_url, passport_country.lower(), destination_country.lower(), cl_value])
            # Callers use ``.get`` on the result, so anything that is not a
            # dict (e.g. a JSON list or string) is treated as a failure.
            if not isinstance(result, dict) or not result:
                return None
            if 'error' in result:
                print(f"β API Error: {result['error']}")
                return None
            if self.debug:
                print(f"β Got result: {result}")
            return result
        except Exception as e:
            print(f"β Error checking visa requirement: {e}")
            return None

    async def check_visa_interactive(self, passport_country: str, destination_country: str) -> Optional[Dict]:
        """
        Alternative method: Use the interactive UI to check visa requirements

        Args:
            passport_country: Two-letter country code for passport
            destination_country: Two-letter country code for destination

        Returns:
            Dict with ``text``, ``days``, ``pass`` (lower-cased passport code)
            and ``dest`` (upper-cased destination code), or None when the
            result element is missing or any UI step fails.
        """
        try:
            if self.debug:
                print(f"π±οΈ Using interactive method for {passport_country.upper()} β {destination_country.upper()}")
            # Click on the passport selector
            await self.page.click('.vch-select-pass')
            await self.page.wait_for_timeout(500)
            # Find and click the country in the list
            passport_selector = f'.vch-passports .s-div[data-ccode="{passport_country.lower()}"]'
            await self.page.wait_for_selector(passport_selector, timeout=5000)
            await self.page.click(passport_selector)
            await self.page.wait_for_timeout(500)
            # Click on the destination selector
            await self.page.click('.vch-select-des')
            await self.page.wait_for_timeout(500)
            # Find and click the destination country
            dest_selector = f'.vch-destinations .s-div[data-ccode="{destination_country.lower()}"]'
            await self.page.wait_for_selector(dest_selector, timeout=5000)
            await self.page.click(dest_selector)
            await self.page.wait_for_timeout(1000)
            # Get the result from the page. The codes are handed over as
            # evaluate() arguments instead of being spliced into the script
            # text, so a quote in the input cannot alter the JavaScript.
            result = await self.page.evaluate("""
                (args) => {
                    const [passportCode, destinationCode] = args;
                    const resultElement = document.querySelector('.vch-result');
                    if (resultElement) {
                        const text = resultElement.querySelector('.text');
                        const days = resultElement.querySelector('.days');
                        return {
                            text: text ? text.textContent : '',
                            days: days ? days.textContent : '',
                            pass: passportCode,
                            dest: destinationCode
                        };
                    }
                    return null;
                }
            """, [passport_country.lower(), destination_country.upper()])
            return result
        except Exception as e:
            if self.debug:
                print(f"β Interactive method failed: {e}")
            return None

    async def _check_pairs(self, pairs: List[tuple], delay: float) -> Dict:
        """Run a batch of lookups, API first and UI as fallback.

        Args:
            pairs: ``(result_key, passport_code, destination_code)`` tuples.
            delay: Seconds to sleep between consecutive lookups.

        Returns:
            Dict mapping each ``result_key`` to its result dict, or to None
            when both lookup methods failed.
        """
        results = {}
        total = len(pairs)
        for i, (key, passport, destination) in enumerate(pairs, 1):
            print(f"\n[{i}/{total}] Checking {passport.upper()} β {destination.upper()}...")
            # Try API method first
            result = await self.check_visa_requirement_browser(passport, destination)
            # If API fails, try interactive method
            if not result:
                result = await self.check_visa_interactive(passport, destination)
            if result:
                results[key] = result
                text = result.get('text', 'No text available')
                print(f" β Result: {text}")
            else:
                results[key] = None
                print(" β Failed to get result")
            # Rate limiting to avoid blocking; no wait after the last item
            if i < total:
                print(f" β³ Waiting {delay} seconds...")
                await asyncio.sleep(delay)
        return results

    async def check_multiple_destinations(self, passport_country: str, destinations: List[str], delay: float = 2.0) -> Dict:
        """
        Check visa requirements for multiple destinations

        Args:
            passport_country: Two-letter country code for passport
            destinations: List of two-letter country codes for destinations
            delay: Delay between requests in seconds

        Returns:
            Dictionary mapping destination codes to visa information
            (None for destinations whose lookup failed)
        """
        pairs = [(dest, passport_country, dest) for dest in destinations]
        return await self._check_pairs(pairs, delay)

    async def check_multiple_source(self, passport_countries: List[str], destination: str, delay: float = 2.0) -> Dict:
        """
        Check visa requirements for multiple passport countries to a single destination.

        Args:
            passport_countries: List of two-letter country codes for passports.
            destination: Two-letter country code for the destination.
            delay: Delay between requests in seconds.

        Returns:
            Dictionary mapping passport country codes to visa information
            (None for passports whose lookup failed).
        """
        pairs = [(passport, passport, destination) for passport in passport_countries]
        return await self._check_pairs(pairs, delay)

    def format_result(self, result: Dict) -> str:
        """Format a single result for display.

        Args:
            result: Result dict; the ``pass``, ``dest`` and ``text`` keys are
                read, each falling back to ``'N/A'`` when absent.

        Returns:
            One display line, or "No information available" for an empty or
            missing result.
        """
        if not result:
            return "No information available"

        def _code(key: str) -> str:
            # Values come from JSON and are not guaranteed to be strings.
            value = result.get(key)
            return 'N/A' if value is None else str(value)

        text = result.get('text', 'N/A')
        dest = _code('dest')
        passport = _code('pass')
        return f"{passport.upper()} β {dest.upper()}: {text}"
async def main():
    """Demo run: one US -> GB lookup, then a small batch for the US passport."""
    rule = "="*60
    print(rule)
    print(" Passport Index Visa Checker (Playwright)")
    print(rule)
    async with PassportIndexVisaScraper(debug=True) as scraper:
        # Nothing below is attempted without an initialized session.
        session_ok = await scraper.initialize_session()
        if not session_ok:
            print("β Failed to initialize session")
            return
        print("\n" + rule)
        print(" Testing visa requirements...")
        print(rule)

        # Single lookup: API path first, UI path only when that yields nothing.
        print("\nπ Single visa check: US β GB")
        print("-" * 40)
        single = await scraper.check_visa_requirement_browser('us', 'gb')
        if not single:
            print("Trying interactive method...")
            single = await scraper.check_visa_interactive('us', 'gb')
        if single:
            print(f"Result: {scraper.format_result(single)}")

        # Batch lookup for one passport.
        print("\nπ Multiple destinations for US passport:")
        print("-" * 40)
        targets = ['ca', 'mx', 'jp', 'au']  # Canada, Mexico, Japan, Australia
        batch = await scraper.check_multiple_destinations('us', targets, delay=2.0)
        print("\nπ Summary:")
        for code, info in batch.items():
            if not info:
                print(f" β US β {code.upper()}: Failed")
                continue
            print(f" β {scraper.format_result(info)}")
async def indo():
    """Collect visa requirements for Indonesia in both directions and save them.

    Looks up the Indonesian passport ('id') against every code in
    ``COUNTRY_CODES``, then every one of those passports against Indonesia,
    and writes all successful lookups to 'visa_avaibility_playwright.csv'.
    Returns early, writing nothing, when the session cannot be initialized.
    """
    print("="*60)
    print(" Passport Index Visa Checker (Playwright)")
    print("="*60)
    async with PassportIndexVisaScraper(debug=True) as scraper:
        rows = []
        if not await scraper.initialize_session():
            print("β Failed to initialize session")
            return
        destinations = list(COUNTRY_CODES.values())

        # Outbound: Indonesian passport -> every destination.
        results = await scraper.check_multiple_destinations('id', destinations, delay=2.0)
        for dest, result in results.items():
            if result:
                rows.append({
                    'text': result.get('text', 'N/A'),
                    'days': result.get('days', 'N/A'),
                    'pass': result.get('pass', 'N/A'),
                    'dest': REVERSE_COUNTRY_CODES[dest],
                    'source': 'Indonesia',
                })
                print(f"ID -> {dest.upper()}: {result.get('text', 'N/A')}")
            else:
                # The passport here is 'id'; this message used to say "US".
                print(f" β ID β {dest.upper()}: Failed")

        # Inbound: every passport -> Indonesia.
        results = await scraper.check_multiple_source(destinations, 'id', delay=2.0)
        for passport, result in results.items():
            if result:
                rows.append({
                    'text': result.get('text', 'N/A'),
                    'days': result.get('days', 'N/A'),
                    'pass': result.get('pass', 'N/A'),
                    'dest': REVERSE_COUNTRY_CODES['id'],
                    'source': REVERSE_COUNTRY_CODES[passport],
                })
                print(f"{passport.upper()} -> ID: {result.get('text', 'N/A')}")
            else:
                print(f" β {passport.upper()} β ID: Failed")

        # save to csv
        df = pd.DataFrame(rows)
        df.to_csv('visa_avaibility_playwright.csv', index=False)
async def asean():
    """Crawl visa requirements for a fixed list of passports, both directions.

    For each code in the hard-coded list, looks up that passport against every
    code in ``COUNTRY_CODES`` and every one of those passports against it,
    then writes all successful lookups to
    'asean_visa_avaibility_playwright.csv'.
    """

    def _record(info, dest_name, source_name):
        # Key order fixes the CSV column order.
        return {
            'text': info.get('text', 'N/A'),
            'days': info.get('days', 'N/A'),
            'pass': info.get('pass', 'N/A'),
            'dest': dest_name,
            'source': source_name,
        }

    print("="*60)
    print(" Passport Index Visa Checker (Playwright)")
    print("="*60)
    async with PassportIndexVisaScraper(debug=True) as scraper:
        records = []
        if not await scraper.initialize_session():
            print("β Failed to initialize session")
            return
        hubs = ['kh', 'th', 'vn', 'la', 'mm', 'tl', 'ph', 'bn', 'my']
        all_codes = list(COUNTRY_CODES.values())
        for hub in hubs:
            # Outbound: this passport -> every destination.
            outbound = await scraper.check_multiple_destinations(hub, all_codes, delay=2.0)
            for dest, info in outbound.items():
                if not info:
                    print(f" β {hub.upper()} β {dest.upper()}: Failed")
                    continue
                records.append(
                    _record(info, REVERSE_COUNTRY_CODES[dest], REVERSE_COUNTRY_CODES[hub])
                )
                print(f"{hub.upper()} -> {dest.upper()}: {info.get('text', 'N/A')}")

            # Inbound: every passport -> this country.
            inbound = await scraper.check_multiple_source(all_codes, hub, delay=2.0)
            for passport, info in inbound.items():
                if not info:
                    print(f" β {passport.upper()} β {hub.upper()}: Failed")
                    continue
                records.append(
                    _record(info, REVERSE_COUNTRY_CODES[hub], REVERSE_COUNTRY_CODES[passport])
                )
                print(f"{passport.upper()} -> {hub.upper()}: {info.get('text', 'N/A')}")

        # save to csv
        frame = pd.DataFrame(records)
        frame.to_csv('asean_visa_avaibility_playwright.csv', index=False)
# Script entry point: run the multi-country crawl when executed directly.
if __name__ == "__main__":
    asyncio.run(asean())