# BuildBrowser / main.py
# File-viewer header captured with the source (not part of the program):
#   StarrySkyWorld's picture · "Create main.py" · 3393f43 verified
#   raw · history blame · 3.89 kB
import json
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.responses import Response
from playwright.async_api import async_playwright
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# Global state shared by the startup/shutdown hooks and the route handlers.
# `browser`, `page` and `config` stay None until init_browser() assigns them.
browser = None
page = None
config = None
# Scheduler that lifespan() uses to run activate_task() at a fixed interval.
scheduler = AsyncIOScheduler()
def load_config():
    """Load the service configuration.

    Reads ``config.json`` (resolved against the current working directory)
    as UTF-8 and returns the decoded JSON value.

    Raises:
        FileNotFoundError: if ``config.json`` does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open("config.json", encoding="utf-8") as config_file:
        raw_text = config_file.read()
    return json.loads(raw_text)
def load_cookies():
    """Load previously saved browser cookies.

    Returns the decoded contents of ``cookies.json`` (read as UTF-8), or an
    empty list when that file does not exist. Invalid JSON is not handled
    here and propagates as ``json.JSONDecodeError``.
    """
    try:
        cookie_file = open("cookies.json", "r", encoding="utf-8")
    except FileNotFoundError:
        # No saved cookies: callers treat this as "nothing to import".
        return []
    with cookie_file:
        return json.load(cookie_file)
async def activate_task():
    """Scheduled keep-alive job: GET the configured ``activateLink``.

    Does nothing when the configuration is not loaded or has no
    ``activateLink``. The request uses a 30 second timeout. Any failure is
    printed and swallowed so an error in one run does not propagate to the
    caller.
    """
    # Guard clause: nothing to ping without a configured link.
    if not (config and config.get("activateLink")):
        return
    try:
        async with httpx.AsyncClient() as http_client:
            response = await http_client.get(config["activateLink"], timeout=30)
            print(f"[Activate] GET {config['activateLink']} - Status: {response.status_code}")
    except Exception as e:
        print(f"[Activate] Error: {e}")
# Handle to the running Playwright driver. It is kept at module level so that
# close_browser() can stop it; a local variable would be lost after startup.
_playwright = None


async def init_browser():
    """Start Playwright, launch headless Chromium and open the target page.

    Populates the module globals ``config``, ``browser`` and ``page``:
    loads ``config.json`` and ``cookies.json``, creates a 1920x1080 browser
    context, imports the cookies (if any) and navigates to
    ``config["targetLink"]`` (default ``https://www.example.com``), waiting
    until the network is idle.

    Raises:
        Exception: whatever configuration loading, browser launch or
            navigation raises. Anything already started is shut down
            before the exception is re-raised.
    """
    global browser, page, config, _playwright
    config = load_config()
    cookies = load_cookies()
    _playwright = await async_playwright().start()
    try:
        browser = await _playwright.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080}
        )
        # Import saved cookies before the first navigation so the page
        # loads with them applied.
        if cookies:
            await context.add_cookies(cookies)
            print(f"[Browser] Loaded {len(cookies)} cookies")
        page = await context.new_page()
        # Open the target page.
        target_link = config.get("targetLink", "https://www.example.com")
        await page.goto(target_link, wait_until="networkidle")
        print(f"[Browser] Opened: {target_link}")
    except Exception:
        # Startup failed part-way: do not leave a Chromium process or the
        # Playwright driver running behind a service that never came up.
        await close_browser()
        raise


async def close_browser():
    """Close the browser and stop the Playwright driver.

    Safe to call when nothing (or only part of the stack) was started.
    Resets ``browser`` and ``page`` to ``None`` so that route handlers
    report "not initialized" instead of using a closed page.
    """
    global browser, page, _playwright
    page = None
    try:
        if browser:
            await browser.close()
    finally:
        browser = None
        # Stop the driver even if closing the browser raised.
        if _playwright:
            driver, _playwright = _playwright, None
            await driver.stop()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the application.

    Startup: initialise the browser, then, if the configuration defines
    ``activateLink``, schedule ``activate_task`` every ``activateInterval``
    seconds (default 60) and start the scheduler.

    Shutdown: stop the scheduler if it is running and close the browser.
    Cleanup runs in ``finally`` blocks so the browser is released even when
    scheduler setup or shutdown raises.
    """
    # On startup.
    await init_browser()
    try:
        # Start the periodic activation job only when a link is configured.
        if config and config.get("activateLink"):
            interval = config.get("activateInterval", 60)
            scheduler.add_job(activate_task, "interval", seconds=interval)
            scheduler.start()
            print(f"[Scheduler] Started with interval: {interval}s")
        yield
    finally:
        # On shutdown.
        try:
            # The scheduler is never started when no activateLink is set, and
            # shutting down a stopped scheduler raises. Check first so the
            # browser cleanup below is never skipped.
            if scheduler.running:
                scheduler.shutdown(wait=False)
        finally:
            await close_browser()
# ASGI application; `lifespan` wraps serving with the startup and shutdown logic.
app = FastAPI(lifespan=lifespan)
@app.get("/screenshot")
async def get_screenshot():
    """Return a PNG screenshot of the current viewport.

    Responds with status 500 and a plain message when the page has not been
    initialised, or with the exception text when taking the screenshot
    fails.
    """
    if not page:
        return Response(content="Browser not initialized", status_code=500)
    try:
        image_bytes = await page.screenshot(type="png", full_page=False)
    except Exception as e:
        return Response(content=str(e), status_code=500)
    return Response(content=image_bytes, media_type="image/png")
@app.get("/screenshot/full")
async def get_full_screenshot():
    """Return a PNG screenshot of the full page (``full_page=True``).

    Responds with status 500 and a plain message when the page has not been
    initialised, or with the exception text when taking the screenshot
    fails.
    """
    if not page:
        return Response(content="Browser not initialized", status_code=500)
    try:
        image_bytes = await page.screenshot(type="png", full_page=True)
    except Exception as e:
        return Response(content=str(e), status_code=500)
    return Response(content=image_bytes, media_type="image/png")
@app.get("/refresh")
async def refresh_page():
    """Reload the open page and wait until the network is idle.

    Returns:
        ``{"status": "success", ...}`` after a successful reload.
        ``{"status": "error", ...}`` when the page has not been initialised
        (returned as a plain dict, so the status code stays 200 as before).
        A 500 response with a JSON ``{"status": "error", "message": ...}``
        body when the reload itself fails, e.g. on a navigation timeout.
    """
    if not page:
        return {"status": "error", "message": "Browser not initialized"}
    try:
        await page.reload(wait_until="networkidle")
    except Exception as e:
        # Match the screenshot handlers: report the failure to the caller
        # instead of letting the exception escape the handler. The status
        # code stays 500, which is what an unhandled error produced.
        print(f"[Refresh] Error: {e}")
        return Response(
            content=json.dumps({"status": "error", "message": str(e)}),
            status_code=500,
            media_type="application/json",
        )
    return {"status": "success", "message": "Page refreshed"}
@app.get("/health")
async def health_check():
    """Health probe: always answers with a fixed ``{"status": "ok"}`` payload."""
    payload = {"status": "ok"}
    return payload
if __name__ == "__main__":
    # Running this file directly serves the app with uvicorn on every
    # network interface, port 8000. The import is local to this branch.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)