# BuildBrowser / main.py
# StarrySkyWorld's picture
# Update main.py
# a935e3c verified
import json
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.responses import Response
from playwright.async_api import async_playwright
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# Shared module-level state; init_browser() rebinds these through `global`.
browser = page = context = config = None

# Address of the WARP proxy.
PROXY_SERVER = "socks5://127.0.0.1:1080"

# Scheduler that the application lifespan registers the periodic jobs on.
scheduler = AsyncIOScheduler()
def load_config():
    """Read ``config.json`` (UTF-8) from the working directory and return the parsed JSON."""
    with open("config.json", "r", encoding="utf-8") as config_file:
        raw_text = config_file.read()
    return json.loads(raw_text)
def load_cookies():
    """Load ``cookies.json`` and convert each entry to the cookie dict shape used here.

    Each input entry is read with ``dict.get`` for ``name``, ``value``,
    ``domain``, ``path`` (default ``"/"``), ``secure`` and ``httpOnly``
    (default ``False``).  ``expirationDate`` takes precedence over ``expires``
    and is stored under ``expires``; when neither is present the key is omitted.

    ``sameSite`` is normalised case-insensitively:

    * ``strict`` -> ``"Strict"``, ``lax`` -> ``"Lax"``
    * ``none`` / ``unspecified`` / ``no_restriction`` -> ``"None"``
    * anything else (missing, JSON ``null``, non-string, unknown) -> ``"Lax"``

    A cookie whose ``sameSite`` ends up as ``"None"`` is forced to
    ``secure=True``.

    Returns:
        A list of converted cookie dicts; an empty list when the file does
        not exist or contains no cookies.
    """
    try:
        with open("cookies.json", "r", encoding="utf-8") as f:
            raw_cookies = json.load(f)
    except FileNotFoundError:
        return []
    if not raw_cookies:
        return []

    same_site_map = {
        "strict": "Strict",
        "lax": "Lax",
        "none": "None",
        "unspecified": "None",
        "no_restriction": "None",
    }

    converted_cookies = []
    for cookie in raw_cookies:
        new_cookie = {
            "name": cookie.get("name"),
            "value": cookie.get("value"),
            "domain": cookie.get("domain"),
            "path": cookie.get("path", "/"),
        }
        if "expirationDate" in cookie:
            new_cookie["expires"] = cookie["expirationDate"]
        elif "expires" in cookie:
            new_cookie["expires"] = cookie["expires"]

        # The key may be present with a JSON null (or another non-string)
        # value; calling .lower() on it would raise, so treat it like a
        # missing key instead.
        raw_same_site = cookie.get("sameSite")
        same_site = raw_same_site.lower() if isinstance(raw_same_site, str) else ""
        new_cookie["sameSite"] = same_site_map.get(same_site, "Lax")

        new_cookie["secure"] = cookie.get("secure", False)
        new_cookie["httpOnly"] = cookie.get("httpOnly", False)
        if new_cookie["sameSite"] == "None":
            # SameSite=None is only kept together with the Secure flag.
            new_cookie["secure"] = True
        converted_cookies.append(new_cookie)
    return converted_cookies
async def activate_task():
    """Scheduled activation job: GET the configured ``activateLink`` via the proxy.

    Does nothing when no config is loaded or ``activateLink`` is unset.  Any
    request failure is printed and swallowed so the job never raises.
    """
    if not (config and config.get("activateLink")):
        return
    try:
        async with httpx.AsyncClient(proxy=PROXY_SERVER, timeout=30) as http:
            reply = await http.get(config["activateLink"])
            print(f"[Activate] GET {config['activateLink']} - Status: {reply.status_code}")
    except Exception as exc:
        print(f"[Activate] Error: {exc}")
async def auto_click_task():
    """Click the first visible dialog/overlay button found for a configured label.

    Labels come from ``config["autoClickButtons"]`` (with a built-in default
    list).  For each label a fixed list of selectors is tried in order; the
    first selector whose first match is visible gets a forced click and the
    task returns.  If no selector works for a label, a JavaScript fallback
    looks for a button inside ``mat-dialog-actions`` or
    ``.cdk-overlay-container`` whose trimmed text equals the label
    (case-insensitive) and clicks it.

    Note that the two ``button.ms-button-primary`` selectors without
    ``:has-text`` match a primary button regardless of its label, while the
    log line still reports the label currently being iterated.

    Returns early without doing anything when no page is open.  At most one
    click is performed per invocation.
    """
    if not page:
        return
    click_buttons = config.get(
        "autoClickButtons",
        ["Got it", "GOT IT", "Accept", "OK", "Dismiss", "Close"],
    )
    try:
        for text in click_buttons:
            selectors = [
                f'mat-dialog-actions button:has-text("{text}")',
                'mat-dialog-actions button.ms-button-primary',
                f'.cdk-overlay-container button:has-text("{text}")',
                '.cdk-overlay-container button.ms-button-primary',
                f'.cdk-overlay-pane button:has-text("{text}")',
                f'.mat-mdc-dialog-actions button:has-text("{text}")',
                f'button.ms-button-primary:has-text("{text}")',
                f'button:has-text("{text}")',
            ]
            for selector in selectors:
                try:
                    locator = page.locator(selector)
                    if await locator.count() > 0:
                        element = locator.first
                        if await element.is_visible(timeout=300):
                            await element.click(timeout=3000, force=True)
                            print(f'[AutoClick] Clicked "{text}"')
                            return
                except Exception:
                    # A failing selector is expected (element vanished, click
                    # timed out, ...): fall through to the next one.  Only
                    # Exception is caught so task cancellation still propagates.
                    continue
            # JavaScript fallback
            try:
                clicked = await page.evaluate('''(searchText) => {
                    const dialogActions = document.querySelector('mat-dialog-actions');
                    if (dialogActions) {
                        const buttons = dialogActions.querySelectorAll('button');
                        for (const btn of buttons) {
                            if (btn.textContent.trim().toLowerCase() === searchText.toLowerCase()) {
                                btn.click();
                                return true;
                            }
                        }
                    }
                    const overlay = document.querySelector('.cdk-overlay-container');
                    if (overlay) {
                        const buttons = overlay.querySelectorAll('button');
                        for (const btn of buttons) {
                            if (btn.textContent.trim().toLowerCase() === searchText.toLowerCase()) {
                                btn.click();
                                return true;
                            }
                        }
                    }
                    return false;
                }''', text)
                if clicked:
                    print(f'[AutoClick] Clicked "{text}" via JS')
                    return
            except Exception:
                # Best effort: a failed evaluation just means nothing was
                # clicked for this label.
                pass
    except Exception as e:
        # Keep the periodic job alive, but do not hide the failure entirely.
        print(f"[AutoClick] Error: {e}")
# Handle of the Playwright driver started by init_browser(); kept at module
# level so close_browser() can stop it instead of leaking the driver.
_playwright = None


async def init_browser():
    """Initialise the browser (routed through the WARP proxy) and open the target page.

    Loads the config and cookies, starts Playwright, launches headless
    Chromium with ``PROXY_SERVER`` as its proxy, creates a 1920x1080 context
    with a fixed user agent, adds every cookie that has a non-empty ``name``,
    ``value`` and ``domain``, then opens ``config["targetLink"]`` (default
    ``https://www.example.com``).

    Failures while adding cookies or loading the page are printed and
    tolerated; failures while loading the config or launching the browser
    propagate to the caller.

    Side effects: rebinds the module globals ``config``, ``browser``,
    ``context``, ``page`` and ``_playwright``.
    """
    global browser, page, context, config, _playwright
    config = load_config()
    cookies = load_cookies()
    _playwright = await async_playwright().start()
    # Use the WARP SOCKS5 proxy.
    browser = await _playwright.chromium.launch(
        headless=True,
        proxy={"server": PROXY_SERVER},
    )
    context = await browser.new_context(
        viewport={"width": 1920, "height": 1080},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )

    # Only cookies carrying all three mandatory fields are handed to the context.
    valid_cookies = [
        cookie
        for cookie in cookies
        if cookie.get("name") and cookie.get("value") and cookie.get("domain")
    ]
    if valid_cookies:
        try:
            await context.add_cookies(valid_cookies)
            print(f"[Browser] Loaded {len(valid_cookies)} cookies")
        except Exception as e:
            print(f"[Browser] Error loading cookies: {e}")

    page = await context.new_page()
    target_link = config.get("targetLink", "https://www.example.com")
    try:
        await page.goto(target_link, wait_until="networkidle", timeout=60000)
        print(f"[Browser] Opened: {target_link}")
    except Exception as e:
        # The page object stays usable even if the initial load timed out.
        print(f"[Browser] Warning: Page load issue - {e}")


async def close_browser():
    """Close the browser, stop the Playwright driver and clear the module state.

    Safe to call when nothing was started.  The driver is stopped even if
    closing the browser raises, and ``browser``/``context``/``page`` are reset
    to ``None`` so later callers see "not initialised" rather than a stale,
    closed page.
    """
    global browser, page, context, _playwright
    try:
        if browser:
            await browser.close()
    finally:
        driver, _playwright = _playwright, None
        browser = page = context = None
        if driver:
            await driver.stop()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start the browser and periodic jobs, clean up on exit.

    Startup: initialise the browser, register ``activate_task`` (only when
    ``activateLink`` is configured; interval ``activateInterval``, default
    60 s) and ``auto_click_task`` (interval ``autoClickInterval``, default
    5 s), then start the scheduler.

    Shutdown: stop the scheduler (if it was started) and close the browser.
    The cleanup sits in ``finally`` so it also runs when startup fails part
    way through or when an exception/cancellation is thrown in at ``yield``.
    """
    scheduler_started = False
    try:
        await init_browser()
        if config and config.get("activateLink"):
            interval = config.get("activateInterval", 60)
            scheduler.add_job(
                activate_task,
                "interval",
                seconds=interval,
                max_instances=1,
                coalesce=True,
                misfire_grace_time=30,
            )
            print(f"[Scheduler] Activate task started with interval: {interval}s")
        click_interval = config.get("autoClickInterval", 5)
        scheduler.add_job(
            auto_click_task,
            "interval",
            seconds=click_interval,
            max_instances=1,
            coalesce=True,
            misfire_grace_time=10,
            replace_existing=True,
        )
        print(f"[Scheduler] Auto-click task started with interval: {click_interval}s")
        scheduler.start()
        scheduler_started = True
        yield
    finally:
        # Shutting down a scheduler that never started would raise, so only
        # do it when start() actually succeeded.
        if scheduler_started:
            scheduler.shutdown(wait=False)
        await close_browser()


app = FastAPI(lifespan=lifespan)
@app.get("/screenshot")
async def get_screenshot():
    """Return a PNG screenshot of the page (``full_page=False``).

    Responds with status 500 and a plain-text message when no page is open or
    when taking the screenshot fails.
    """
    if page:
        try:
            image = await page.screenshot(type="png", full_page=False)
            return Response(content=image, media_type="image/png")
        except Exception as exc:
            error_text = str(exc)
    else:
        error_text = "Browser not initialized"
    return Response(content=error_text, status_code=500)
@app.get("/screenshot/full")
async def get_full_screenshot():
    """Return a PNG screenshot of the whole page (``full_page=True``).

    Responds with status 500 and a plain-text message when no page is open or
    when taking the screenshot fails.
    """
    if not page:
        return Response(status_code=500, content="Browser not initialized")
    try:
        png_bytes = await page.screenshot(full_page=True, type="png")
        reply = Response(content=png_bytes, media_type="image/png")
    except Exception as exc:
        reply = Response(status_code=500, content=str(exc))
    return reply
@app.get("/refresh")
async def refresh_page():
    """Reload the current page, waiting up to 60 s for network idle.

    Returns a dict with ``status`` (``success``, ``warning`` when the reload
    raised, ``error`` when no page is open) and a ``message``.
    """
    if not page:
        return {"status": "error", "message": "Browser not initialized"}
    outcome = {"status": "success", "message": "Page refreshed"}
    try:
        await page.reload(wait_until="networkidle", timeout=60000)
    except Exception as exc:
        outcome = {"status": "warning", "message": f"Refresh completed with issue: {exc}"}
    return outcome
@app.get("/navigate")
async def navigate_to(url: str):
    """Navigate the shared page to ``url``.

    ``url`` comes straight from the request's query string, so only ``http``
    and ``https`` targets are accepted; other schemes (``file:``, ``chrome:``,
    ``javascript:``, ...) would let a caller point the browser at local
    resources and read them back through the screenshot endpoints.

    Args:
        url: Absolute http(s) URL to open.

    Returns:
        A dict with ``status`` (``success``; ``warning`` when navigation
        raised, e.g. on timeout; ``error`` when no page is open or the URL is
        rejected) and a ``message``.
    """
    from urllib.parse import urlsplit

    if not page:
        return {"status": "error", "message": "Browser not initialized"}

    try:
        scheme = urlsplit(url).scheme.lower()
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. a broken IPv6 literal).
        return {"status": "error", "message": "Invalid URL"}
    if scheme not in ("http", "https"):
        return {"status": "error", "message": "Only http and https URLs are allowed"}

    try:
        await page.goto(url, wait_until="networkidle", timeout=60000)
        return {"status": "success", "message": f"Navigated to {url}"}
    except Exception as e:
        return {"status": "warning", "message": f"Navigation completed with issue: {e}"}
@app.get("/ip")
async def get_ip():
    """Report the current egress IP, to check that the WARP proxy is in effect.

    Queries ``api.ipify.org`` through ``PROXY_SERVER`` and returns its JSON
    body; on any failure returns ``{"status": "error", "message": ...}``.
    """
    try:
        async with httpx.AsyncClient(proxy=PROXY_SERVER, timeout=10) as http:
            reply = await http.get("https://api.ipify.org?format=json")
            payload = reply.json()
            return payload
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
@app.get("/health")
async def health_check():
    """Health check: always answers with ``{"status": "ok"}``."""
    return dict(status="ok")
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)