import json import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.responses import Response from playwright.async_api import async_playwright import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler # 全局变量 browser = None page = None context = None config = None scheduler = AsyncIOScheduler() # WARP 代理地址 PROXY_SERVER = "socks5://127.0.0.1:1080" def load_config(): """加载配置文件""" with open("config.json", "r", encoding="utf-8") as f: return json.load(f) def load_cookies(): """加载并转换cookies格式""" try: with open("cookies.json", "r", encoding="utf-8") as f: raw_cookies = json.load(f) except FileNotFoundError: return [] if not raw_cookies: return [] converted_cookies = [] for cookie in raw_cookies: new_cookie = { "name": cookie.get("name"), "value": cookie.get("value"), "domain": cookie.get("domain"), "path": cookie.get("path", "/"), } if "expirationDate" in cookie: new_cookie["expires"] = cookie["expirationDate"] elif "expires" in cookie: new_cookie["expires"] = cookie["expires"] same_site = cookie.get("sameSite", "").lower() if same_site in ["strict", "lax", "none"]: new_cookie["sameSite"] = same_site.capitalize() if same_site == "none": new_cookie["sameSite"] = "None" elif same_site == "unspecified" or same_site == "no_restriction": new_cookie["sameSite"] = "None" else: new_cookie["sameSite"] = "Lax" new_cookie["secure"] = cookie.get("secure", False) new_cookie["httpOnly"] = cookie.get("httpOnly", False) if new_cookie["sameSite"] == "None": new_cookie["secure"] = True converted_cookies.append(new_cookie) return converted_cookies async def activate_task(): """定时激活任务(通过代理)""" if config and config.get("activateLink"): try: async with httpx.AsyncClient(proxy=PROXY_SERVER, timeout=30) as client: response = await client.get(config["activateLink"]) print(f"[Activate] GET {config['activateLink']} - Status: {response.status_code}") except Exception as e: print(f"[Activate] Error: {e}") async def auto_click_task(): """自动点击指定按钮""" global page, config if not page: return click_buttons = config.get("autoClickButtons", ["Got it", "GOT IT", "Accept", "OK", "Dismiss", "Close"]) try: for text in click_buttons: selectors = [ f'mat-dialog-actions button:has-text("{text}")', f'mat-dialog-actions button.ms-button-primary', f'.cdk-overlay-container button:has-text("{text}")', f'.cdk-overlay-container button.ms-button-primary', f'.cdk-overlay-pane button:has-text("{text}")', f'.mat-mdc-dialog-actions button:has-text("{text}")', f'button.ms-button-primary:has-text("{text}")', f'button:has-text("{text}")', ] for selector in selectors: try: locator = page.locator(selector) count = await locator.count() if count > 0: element = locator.first if await element.is_visible(timeout=300): await element.click(timeout=3000, force=True) print(f'[AutoClick] Clicked "{text}"') return except: continue # JavaScript 备用 try: clicked = await page.evaluate('''(searchText) => { const dialogActions = document.querySelector('mat-dialog-actions'); if (dialogActions) { const buttons = dialogActions.querySelectorAll('button'); for (const btn of buttons) { if (btn.textContent.trim().toLowerCase() === searchText.toLowerCase()) { btn.click(); return true; } } } const overlay = document.querySelector('.cdk-overlay-container'); if (overlay) { const buttons = overlay.querySelectorAll('button'); for (const btn of buttons) { if (btn.textContent.trim().toLowerCase() === searchText.toLowerCase()) { btn.click(); return true; } } } return false; }''', text) if clicked: print(f'[AutoClick] Clicked "{text}" via JS') return except: pass except: pass async def init_browser(): """初始化浏览器(使用 WARP 代理)""" global browser, page, context, config config = load_config() cookies = load_cookies() playwright = await async_playwright().start() # 使用 WARP SOCKS5 代理 browser = await playwright.chromium.launch( headless=True, proxy={ "server": PROXY_SERVER } ) context = await browser.new_context( viewport={"width": 1920, "height": 1080}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) if cookies: valid_cookies = [] for cookie in cookies: try: if cookie.get("name") and cookie.get("value") and cookie.get("domain"): valid_cookies.append(cookie) except: pass if valid_cookies: try: await context.add_cookies(valid_cookies) print(f"[Browser] Loaded {len(valid_cookies)} cookies") except Exception as e: print(f"[Browser] Error loading cookies: {e}") page = await context.new_page() target_link = config.get("targetLink", "https://www.example.com") try: await page.goto(target_link, wait_until="networkidle", timeout=60000) print(f"[Browser] Opened: {target_link}") except Exception as e: print(f"[Browser] Warning: Page load issue - {e}") async def close_browser(): """关闭浏览器""" global browser if browser: await browser.close() @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" await init_browser() if config and config.get("activateLink"): interval = config.get("activateInterval", 60) scheduler.add_job( activate_task, "interval", seconds=interval, max_instances=1, coalesce=True, misfire_grace_time=30 ) print(f"[Scheduler] Activate task started with interval: {interval}s") click_interval = config.get("autoClickInterval", 5) scheduler.add_job( auto_click_task, "interval", seconds=click_interval, max_instances=1, coalesce=True, misfire_grace_time=10, replace_existing=True ) print(f"[Scheduler] Auto-click task started with interval: {click_interval}s") scheduler.start() yield scheduler.shutdown(wait=False) await close_browser() app = FastAPI(lifespan=lifespan) @app.get("/screenshot") async def get_screenshot(): """获取页面截图""" global page if not page: return Response(content="Browser not initialized", status_code=500) try: screenshot = await page.screenshot(type="png", full_page=False) return Response(content=screenshot, media_type="image/png") except Exception as e: return Response(content=str(e), status_code=500) @app.get("/screenshot/full") async def get_full_screenshot(): """获取完整页面截图""" global page if not page: return Response(content="Browser not initialized", status_code=500) try: screenshot = await page.screenshot(type="png", full_page=True) return Response(content=screenshot, media_type="image/png") except Exception as e: return Response(content=str(e), status_code=500) @app.get("/refresh") async def refresh_page(): """刷新页面""" global page if not page: return {"status": "error", "message": "Browser not initialized"} try: await page.reload(wait_until="networkidle", timeout=60000) return {"status": "success", "message": "Page refreshed"} except Exception as e: return {"status": "warning", "message": f"Refresh completed with issue: {e}"} @app.get("/navigate") async def navigate_to(url: str): """导航到指定URL""" global page if not page: return {"status": "error", "message": "Browser not initialized"} try: await page.goto(url, wait_until="networkidle", timeout=60000) return {"status": "success", "message": f"Navigated to {url}"} except Exception as e: return {"status": "warning", "message": f"Navigation completed with issue: {e}"} @app.get("/ip") async def get_ip(): """获取当前出口 IP(验证 WARP 是否生效)""" try: async with httpx.AsyncClient(proxy=PROXY_SERVER, timeout=10) as client: response = await client.get("https://api.ipify.org?format=json") return response.json() except Exception as e: return {"status": "error", "message": str(e)} @app.get("/health") async def health_check(): """健康检查""" return {"status": "ok"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)