# api/app/enka_logic.py
# Hana Celeste
# Update app/enka_logic.py
# eb53891 verified
import asyncio
import os
import time

import requests
from playwright.async_api import async_playwright
# Credentials and endpoint of the freeimage.host upload API used by EnkaApp.fetch_gen.
# The key is taken from the FREEIMAGE_API_KEY environment variable when it is set and
# non-empty; the literal below is only the fallback kept for backward compatibility.
# NOTE(review): the fallback key is committed in source -- consider rotating it.
FREEIMAGE_API_KEY = os.environ.get("FREEIMAGE_API_KEY") or "6d207e02198a847aa98d0a2a901485a5"
FREEIMAGE_URL = "https://freeimage.host/api/1/upload"
class EnkaApp:
    """Scrape enka.network profile pages through one shared headless Chromium.

    One browser context (holding a single page) is kept per UID in
    ``self.cache`` so that consecutive calls for the same UID reuse the tab
    that is already loaded. An entry expires ``CACHE_TTL_SECONDS`` after its
    last use and is closed by a background task created in :meth:`start`.

    NOTE(review): a cached page is handed to every caller asking for the same
    UID without any locking, so overlapping requests for one UID drive the
    same tab at once -- confirm whether callers serialize per UID.
    """

    # Seconds a cached tab stays alive after its most recent use.
    CACHE_TTL_SECONDS = 120
    # Seconds between two sweeps of the expired-tab cleanup loop.
    CLEANUP_INTERVAL_SECONDS = 30

    # Runs in the page: collects player details, stat rows and the character list.
    _INFO_SCRIPT = """() => {
        const arText = document.querySelector('.ar')?.innerText || "";
        const stats = Array.from(document.querySelectorAll('tr.stat')).map(tr => {
            const style = tr.querySelector('td.icon')?.getAttribute('style') || "";
            const iconMatch = style.match(/url\\(["']?(.*?)["']?\\)/);
            return {
                value: tr.querySelector('td:first-child')?.innerText.trim(),
                label: tr.querySelector('td:last-child')?.innerText.trim(),
                icon: iconMatch ? "https://enka.network" + iconMatch[1].replace(/\\\\/g, "") : null
            };
        });
        const ncBg = document.querySelector('.Loda .bg')?.style.backgroundImage;
        const ncMatch = ncBg ? ncBg.match(/url\\(["']?(.*?)["']?\\)/) : null;
        const characters = Array.from(document.querySelectorAll('.CharacterList .avatar')).map(el => {
            const style = el.querySelector('figure.chara')?.getAttribute('style') || "";
            const path = style.match(/url\\((.*)\\)/)?.[1].replace(/["']/g, "");
            return { name: path?.split('_').pop().split('.')[0], level: el.querySelector('.level')?.innerText, icon: "https://enka.network" + path };
        }).filter(i => i.name);
        const avatarSrc = document.querySelector('.avatar-icon img')?.getAttribute('src');
        return {
            player: {
                name: document.querySelector('.details h1')?.innerText,
                signature: document.querySelector('.signature')?.innerText,
                ar: arText.match(/AR\\s*(\\d+)/i)?.[1], wl: arText.match(/WL\\s*(\\d+)/i)?.[1],
                avatar: avatarSrc ? "https://enka.network" + avatarSrc : null,
                namecard: ncMatch ? "https://enka.network" + ncMatch[1] : null,
                stats: stats
            },
            characters: characters
        };
    }"""

    # Runs in the page: reads the generated card (a blob: image) and returns it as base64.
    # It targets the same selector fetch_gen waits for, so it cannot pick another image.
    _CARD_SCRIPT = """async () => {
        const img = document.querySelector('.Loda img[src^="blob:"]');
        const res = await fetch(img.src);
        const blob = await res.blob();
        return new Promise(r => { const rd = new FileReader(); rd.onloadend = () => r(rd.result.split(',')[1]); rd.readAsDataURL(blob); });
    }"""

    def __init__(self):
        self.playwright = None
        self.browser = None
        # Cache layout: { uid: {"page": page, "context": context, "expires": timestamp} }
        self.cache = {}
        # Strong reference to the cleanup task: the event loop only keeps weak
        # references to tasks, and stop() needs the handle to cancel it.
        self._cleanup_task = None

    async def start(self):
        """Launch headless Chromium and start the background cache cleanup task."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu']
        )
        # Run the cache cleanup loop in the background.
        self._cleanup_task = asyncio.create_task(self._cleanup_cache())
        print("🚀 [System] Browser logic with Page Cache started.")

    async def _cleanup_cache(self):
        """Periodically close cached tabs whose expiry time has passed."""
        while True:
            await asyncio.sleep(self.CLEANUP_INTERVAL_SECONDS)
            await self._evict_expired()

    async def _evict_expired(self):
        """Run one cleanup pass: drop and close every cache entry that has expired."""
        now = time.time()
        expired_uids = [uid for uid, item in self.cache.items() if now > item["expires"]]
        for uid in expired_uids:
            # The entry may have been removed while a previous close was awaited.
            item = self.cache.pop(uid, None)
            if item is None:
                continue
            if await self._close_context(item["context"]):
                print(f"🧹 [Cache] Closed expired tab for UID: {uid}")

    @staticmethod
    async def _close_context(context):
        """Close *context*, reporting instead of raising on failure.

        Returns True when the context was closed, False when closing failed.
        A failing close must not abort the cleanup loop or the shutdown path.
        """
        try:
            await context.close()
        except Exception as exc:
            print(f"⚠ [Cache] Failed to close browser context: {exc}")
            return False
        return True

    async def _evict(self, uid, page):
        """Remove the cache entry of *uid* if it still holds *page*, and close it."""
        item = self.cache.get(uid)
        if item is None or item["page"] is not page:
            return
        del self.cache[uid]
        await self._close_context(item["context"])

    async def _open_profile(self, uid, page):
        """Navigate a freshly created *page* to the profile of *uid*.

        When navigation fails the cache entry is evicted before re-raising;
        otherwise later calls would reuse a page that never loaded the profile.
        """
        try:
            await page.goto(f"https://enka.network/u/{uid}/", wait_until="domcontentloaded", timeout=30000)
        except Exception:
            await self._evict(uid, page)
            raise

    @staticmethod
    def _elapsed(start_time):
        """Format the seconds elapsed since *start_time*, e.g. ``"1.23s"``."""
        return f"{round(time.time() - start_time, 2)}s"

    async def get_page(self, uid: str):
        """Return the page for *uid*, taken from the cache or newly created.

        Returns:
            A ``(page, is_new)`` tuple. ``is_new`` is False when the page came
            from the cache (its expiry is pushed back by ``CACHE_TTL_SECONDS``)
            and True when a new context and page were created; a new page has
            not been navigated anywhere yet.
        """
        cached = self.cache.get(uid)
        if cached is not None:
            print(f"⚡ [Cache] Reusing existing tab for UID: {uid}")
            # Extend the expiry, counted from this most recent use.
            cached["expires"] = time.time() + self.CACHE_TTL_SECONDS
            return cached["page"], False  # False: not a brand-new page

        print(f"🌐 [Network] Creating new tab for UID: {uid}")
        context = await self.browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        try:
            page = await context.new_page()
        except Exception:
            # Do not leak the context when the page cannot be created.
            await self._close_context(context)
            raise

        # Store in the cache.
        self.cache[uid] = {
            "page": page,
            "context": context,
            "expires": time.time() + self.CACHE_TTL_SECONDS,
        }
        return page, True

    async def fetch_info(self, uid: str):
        """Scrape player details, stats and the character list for *uid*.

        Returns:
            On success, a dict with ``execution_time``, ``uid``, ``cached`` and
            the ``player`` / ``characters`` keys produced by the in-page
            script. On failure, ``{"error": <message>}``.
        """
        start_time = time.time()
        page, is_new = await self.get_page(uid)
        try:
            if is_new:
                await self._open_profile(uid, page)

            # Best effort: open the namecard panel so its background can be read.
            try:
                await page.click('button[data-icon="cards"]', timeout=1500)
                await page.wait_for_selector('.Loda .bg', timeout=1500)
            except Exception:
                pass  # the namecard is optional; the script returns null for it

            data = await page.evaluate(self._INFO_SCRIPT)
            return {"execution_time": self._elapsed(start_time), "uid": uid, "cached": not is_new, **data}
        except Exception as e:
            return {"error": str(e)}

    async def fetch_gen(self, uid: str, char: str = None):
        """Generate the character card image for *uid* and upload it.

        Args:
            uid: Profile UID on enka.network.
            char: Optional character name. It is stripped and capitalized, then
                matched against the avatar figures' ``style`` attribute; when
                no figure can be clicked the currently shown character is used.

        Returns:
            On success, a dict with ``execution_time``, ``uid``, ``cached`` and
            ``card_url`` (None when the upload response carries no image URL).
            On failure, ``{"error": <message>}``.
        """
        start_time = time.time()
        page, is_new = await self.get_page(uid)
        try:
            if is_new:
                await self._open_profile(uid, page)

            # Close the namecard panel (possibly left open by fetch_info) so it
            # does not cover the character buttons.
            await self._press_escape(page)

            if char:
                target = char.strip().capitalize()
                try:
                    await page.click(f"//div[contains(@class, 'avatar')]//figure[contains(@style, '{target}')]", timeout=3000)
                    await asyncio.sleep(0.4)
                except Exception:
                    pass  # best effort: fall back to the character already shown

            await page.click('button[data-icon="image"]')
            await page.wait_for_selector('.Loda img[src^="blob:"]', timeout=15000)
            img_base64 = await page.evaluate(self._CARD_SCRIPT)

            # Dismiss the image popup so the page is ready for the next call.
            await self._press_escape(page)

            # requests is blocking: run the upload in the default executor so
            # the event loop keeps serving other coroutines meanwhile.
            loop = asyncio.get_running_loop()
            res = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    FREEIMAGE_URL,
                    data={"key": FREEIMAGE_API_KEY, "action": "upload", "source": img_base64, "format": "json"},
                    timeout=15,
                ),
            )
            return {"execution_time": self._elapsed(start_time), "uid": uid, "cached": not is_new, "card_url": res.json().get('image', {}).get('url')}
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    async def _press_escape(page):
        """Press Escape on *page*, ignoring failures (nothing may be open)."""
        try:
            await page.keyboard.press("Escape")
        except Exception:
            pass

    async def stop(self):
        """Cancel the cleanup task, close every cached context and shut down."""
        # Stop the cleanup loop first so it cannot mutate the cache below.
        task, self._cleanup_task = self._cleanup_task, None
        if task is not None:
            task.cancel()
            # return_exceptions=True turns the task's cancellation into a result.
            await asyncio.gather(task, return_exceptions=True)

        # Snapshot and clear before awaiting: no stale entries are left behind.
        items = list(self.cache.values())
        self.cache.clear()
        for item in items:
            await self._close_context(item["context"])

        if self.browser:
            await self.browser.close()
            self.browser = None
        if self.playwright:
            await self.playwright.stop()
            self.playwright = None