# NOTE: the lines below ("Spaces: Paused") are page-header residue from a
# web export of this file, commented out so the module parses as Python.
# Spaces: Paused / Paused
| import json | |
| import asyncio | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI | |
| from fastapi.responses import Response | |
| from playwright.async_api import async_playwright | |
| import httpx | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
# --- Module-level shared state (set in init_browser / used by handlers & jobs) ---
browser = None    # Playwright Browser, assigned by init_browser()
page = None       # Playwright Page, assigned by init_browser()
context = None    # Playwright BrowserContext, assigned by init_browser()
config = None     # dict parsed from config.json, assigned by init_browser()
scheduler = AsyncIOScheduler()  # started in lifespan(), shut down on exit
# WARP proxy address (SOCKS5) used by both the browser and httpx clients
PROXY_SERVER = "socks5://127.0.0.1:1080"
def load_config():
    """Read and parse ``config.json`` (UTF-8) from the working directory."""
    with open("config.json", encoding="utf-8") as fp:
        return json.load(fp)
def load_cookies():
    """Load ``cookies.json`` and convert browser-export cookies to Playwright format.

    Returns:
        list[dict]: cookies suitable for ``context.add_cookies()``; an empty
        list when the file is missing or contains no cookies.
    """
    try:
        with open("cookies.json", "r", encoding="utf-8") as f:
            raw_cookies = json.load(f)
    except FileNotFoundError:
        return []
    if not raw_cookies:
        return []
    converted_cookies = []
    for cookie in raw_cookies:
        new_cookie = {
            "name": cookie.get("name"),
            "value": cookie.get("value"),
            "domain": cookie.get("domain"),
            "path": cookie.get("path", "/"),
        }
        # Browser exports use "expirationDate"; Playwright expects "expires".
        if "expirationDate" in cookie:
            new_cookie["expires"] = cookie["expirationDate"]
        elif "expires" in cookie:
            new_cookie["expires"] = cookie["expires"]
        # BUG FIX: exports may contain "sameSite": null — the previous
        # cookie.get("sameSite", "").lower() raised AttributeError on None.
        same_site = (cookie.get("sameSite") or "").lower()
        if same_site in ("strict", "lax", "none"):
            # capitalize() yields the exact values Playwright accepts:
            # "Strict", "Lax", "None".
            new_cookie["sameSite"] = same_site.capitalize()
        elif same_site in ("unspecified", "no_restriction"):
            new_cookie["sameSite"] = "None"
        else:
            new_cookie["sameSite"] = "Lax"
        new_cookie["secure"] = cookie.get("secure", False)
        new_cookie["httpOnly"] = cookie.get("httpOnly", False)
        # SameSite=None cookies must also be Secure, per browser rules.
        if new_cookie["sameSite"] == "None":
            new_cookie["secure"] = True
        converted_cookies.append(new_cookie)
    return converted_cookies
async def activate_task():
    """Scheduled job: hit the configured activation link through the WARP proxy."""
    link = config.get("activateLink") if config else None
    if not link:
        return
    try:
        async with httpx.AsyncClient(proxy=PROXY_SERVER, timeout=30) as client:
            response = await client.get(link)
            print(f"[Activate] GET {link} - Status: {response.status_code}")
    except Exception as e:
        print(f"[Activate] Error: {e}")
async def auto_click_task():
    """Scheduled job: dismiss pop-up dialogs by clicking known confirmation buttons.

    Tries Playwright locators first (most-specific Angular Material dialog
    selectors down to a generic button match), then falls back to clicking
    directly in the DOM via ``page.evaluate``. Best-effort: any failure is
    swallowed so the scheduler keeps running.

    BUG FIX: the bare ``except:`` clauses also caught
    ``asyncio.CancelledError`` (a BaseException since Python 3.8), which
    could block clean task cancellation on shutdown — narrowed to
    ``except Exception``.
    """
    global page, config
    if not page or not config:
        return
    click_buttons = config.get(
        "autoClickButtons",
        ["Got it", "GOT IT", "Accept", "OK", "Dismiss", "Close"],
    )
    try:
        for text in click_buttons:
            selectors = [
                f'mat-dialog-actions button:has-text("{text}")',
                'mat-dialog-actions button.ms-button-primary',
                f'.cdk-overlay-container button:has-text("{text}")',
                '.cdk-overlay-container button.ms-button-primary',
                f'.cdk-overlay-pane button:has-text("{text}")',
                f'.mat-mdc-dialog-actions button:has-text("{text}")',
                f'button.ms-button-primary:has-text("{text}")',
                f'button:has-text("{text}")',
            ]
            for selector in selectors:
                try:
                    locator = page.locator(selector)
                    count = await locator.count()
                    if count > 0:
                        element = locator.first
                        if await element.is_visible(timeout=300):
                            await element.click(timeout=3000, force=True)
                            print(f'[AutoClick] Clicked "{text}"')
                            return
                except Exception:
                    continue
            # JavaScript fallback: click a matching button directly in the DOM.
            try:
                clicked = await page.evaluate('''(searchText) => {
                    const dialogActions = document.querySelector('mat-dialog-actions');
                    if (dialogActions) {
                        const buttons = dialogActions.querySelectorAll('button');
                        for (const btn of buttons) {
                            if (btn.textContent.trim().toLowerCase() === searchText.toLowerCase()) {
                                btn.click();
                                return true;
                            }
                        }
                    }
                    const overlay = document.querySelector('.cdk-overlay-container');
                    if (overlay) {
                        const buttons = overlay.querySelectorAll('button');
                        for (const btn of buttons) {
                            if (btn.textContent.trim().toLowerCase() === searchText.toLowerCase()) {
                                btn.click();
                                return true;
                            }
                        }
                    }
                    return false;
                }''', text)
                if clicked:
                    print(f'[AutoClick] Clicked "{text}" via JS')
                    return
            except Exception:
                pass
    except Exception:
        pass
async def init_browser():
    """Launch headless Chromium through the WARP proxy, restore cookies, open the target page.

    Populates the module globals ``config``, ``browser``, ``context`` and
    ``page``. Page-load failures are logged but non-fatal.
    """
    global browser, page, context, config
    config = load_config()
    cookies = load_cookies()
    playwright = await async_playwright().start()
    # NOTE(review): the playwright driver handle is never stopped on shutdown
    # (close_browser only closes the browser) — consider keeping a reference
    # so playwright.stop() can be called too.
    browser = await playwright.chromium.launch(
        headless=True,
        proxy={"server": PROXY_SERVER},  # route all browser traffic via WARP SOCKS5
    )
    context = await browser.new_context(
        viewport={"width": 1920, "height": 1080},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    if cookies:
        # Playwright rejects cookies missing name/value/domain; filter them out.
        # (The previous per-cookie try/except was dead code — dict.get cannot raise.)
        valid_cookies = [
            c for c in cookies
            if c.get("name") and c.get("value") and c.get("domain")
        ]
        if valid_cookies:
            try:
                await context.add_cookies(valid_cookies)
                print(f"[Browser] Loaded {len(valid_cookies)} cookies")
            except Exception as e:
                print(f"[Browser] Error loading cookies: {e}")
    page = await context.new_page()
    target_link = config.get("targetLink", "https://www.example.com")
    try:
        await page.goto(target_link, wait_until="networkidle", timeout=60000)
        print(f"[Browser] Opened: {target_link}")
    except Exception as e:
        # "networkidle" often times out on pages with long-polling; non-fatal.
        print(f"[Browser] Warning: Page load issue - {e}")
async def close_browser():
    """Shut down the Chromium instance, if one was started."""
    global browser
    if browser is not None:
        await browser.close()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: start the browser and scheduler, tear both down on exit.

    BUG FIX: the ``@asynccontextmanager`` decorator (imported at the top of
    the file but never applied) was missing — FastAPI's ``lifespan=``
    parameter expects a callable returning an async context manager.
    """
    await init_browser()
    # Optional keep-alive ping, enabled by "activateLink" in config.json.
    if config and config.get("activateLink"):
        interval = config.get("activateInterval", 60)
        scheduler.add_job(
            activate_task,
            "interval",
            seconds=interval,
            max_instances=1,   # never start a new run while one is in flight
            coalesce=True,     # collapse a backlog of missed runs into one
            misfire_grace_time=30,
        )
        print(f"[Scheduler] Activate task started with interval: {interval}s")
    click_interval = config.get("autoClickInterval", 5)
    scheduler.add_job(
        auto_click_task,
        "interval",
        seconds=click_interval,
        max_instances=1,
        coalesce=True,
        misfire_grace_time=10,
        replace_existing=True,
    )
    print(f"[Scheduler] Auto-click task started with interval: {click_interval}s")
    scheduler.start()
    yield
    # Shutdown: stop jobs first so nothing touches the browser while it closes.
    scheduler.shutdown(wait=False)
    await close_browser()
# Create the ASGI application; `lifespan` manages browser/scheduler start & stop.
app = FastAPI(lifespan=lifespan)
async def get_screenshot():
    """Return a PNG screenshot of the current viewport.

    NOTE(review): no @app.get route decorator is visible on this handler —
    possibly lost in extraction; confirm how it is registered with the app.
    """
    if page is None:
        return Response(content="Browser not initialized", status_code=500)
    try:
        png = await page.screenshot(type="png", full_page=False)
    except Exception as exc:
        return Response(content=str(exc), status_code=500)
    return Response(content=png, media_type="image/png")
async def get_full_screenshot():
    """Return a PNG screenshot of the entire scrollable page.

    NOTE(review): no @app.get route decorator is visible on this handler —
    possibly lost in extraction; confirm how it is registered with the app.
    """
    if page is None:
        return Response(content="Browser not initialized", status_code=500)
    try:
        png = await page.screenshot(type="png", full_page=True)
    except Exception as exc:
        return Response(content=str(exc), status_code=500)
    return Response(content=png, media_type="image/png")
async def refresh_page():
    """Reload the current page; report the outcome as a status dict."""
    if page is None:
        return {"status": "error", "message": "Browser not initialized"}
    try:
        await page.reload(wait_until="networkidle", timeout=60000)
    except Exception as exc:
        # "networkidle" can time out on busy pages; report but don't fail hard.
        return {"status": "warning", "message": f"Refresh completed with issue: {exc}"}
    return {"status": "success", "message": "Page refreshed"}
async def navigate_to(url: str):
    """Navigate the browser page to ``url``; report the outcome as a status dict."""
    if page is None:
        return {"status": "error", "message": "Browser not initialized"}
    try:
        await page.goto(url, wait_until="networkidle", timeout=60000)
    except Exception as exc:
        return {"status": "warning", "message": f"Navigation completed with issue: {exc}"}
    return {"status": "success", "message": f"Navigated to {url}"}
async def get_ip():
    """Fetch the current egress IP through the proxy to verify WARP is active."""
    try:
        async with httpx.AsyncClient(proxy=PROXY_SERVER, timeout=10) as client:
            resp = await client.get("https://api.ipify.org?format=json")
            return resp.json()
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
async def health_check():
    """Liveness probe: always reports OK."""
    return {"status": "ok"}
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |