import asyncio
import os
import time

import requests
from playwright.async_api import async_playwright
|
|
# Upload credentials for freeimage.host.
# SECURITY: the key should not live in source control. It is read from the
# FREEIMAGE_API_KEY environment variable when set; the literal below is kept
# only as a backward-compatible fallback and should be rotated and removed.
FREEIMAGE_API_KEY = os.environ.get(
    "FREEIMAGE_API_KEY", "6d207e02198a847aa98d0a2a901485a5"
)
# Endpoint that fetch_gen posts the base64-encoded card image to.
FREEIMAGE_URL = "https://freeimage.host/api/1/upload"
|
|
class EnkaApp:
    """Scrape enka.network profiles through headless Chromium with a page cache.

    One browser context/page pair is kept per UID in ``self.cache`` and reused
    until its ``expires`` timestamp passes; a background task closes expired
    entries. Call ``start()`` before any fetch and ``stop()`` when done.
    """

    # Seconds a cached page stays alive after its last use.
    CACHE_TTL = 120
    # Seconds between two sweeps of the expiry task.
    CLEANUP_INTERVAL = 30

    # Runs in the page: collects player details, stat rows and the character
    # list from the profile DOM and returns them as a plain object.
    _INFO_SCRIPT = """() => {
        const arText = document.querySelector('.ar')?.innerText || "";
        const stats = Array.from(document.querySelectorAll('tr.stat')).map(tr => {
            const style = tr.querySelector('td.icon')?.getAttribute('style') || "";
            const iconMatch = style.match(/url\\(["']?(.*?)["']?\\)/);
            return {
                value: tr.querySelector('td:first-child')?.innerText.trim(),
                label: tr.querySelector('td:last-child')?.innerText.trim(),
                icon: iconMatch ? "https://enka.network" + iconMatch[1].replace(/\\\\/g, "") : null
            };
        });
        const ncBg = document.querySelector('.Loda .bg')?.style.backgroundImage;
        const ncMatch = ncBg ? ncBg.match(/url\\(["']?(.*?)["']?\\)/) : null;
        const characters = Array.from(document.querySelectorAll('.CharacterList .avatar')).map(el => {
            const style = el.querySelector('figure.chara')?.getAttribute('style') || "";
            const path = style.match(/url\\((.*)\\)/)?.[1].replace(/["']/g, "");
            return { name: path?.split('_').pop().split('.')[0], level: el.querySelector('.level')?.innerText, icon: "https://enka.network" + path };
        }).filter(i => i.name);
        return {
            player: {
                name: document.querySelector('.details h1')?.innerText,
                signature: document.querySelector('.signature')?.innerText,
                ar: arText.match(/AR\\s*(\\d+)/i)?.[1], wl: arText.match(/WL\\s*(\\d+)/i)?.[1],
                avatar: "https://enka.network" + document.querySelector('.avatar-icon img')?.getAttribute('src'),
                namecard: ncMatch ? "https://enka.network" + ncMatch[1] : null,
                stats: stats
            },
            characters: characters
        };
    }"""

    # Runs in the page: reads the generated card (a blob: URL) and returns its
    # bytes as base64. Uses the same selector the caller waited for, so a
    # non-blob <img> inside the dialog can never be picked up by mistake.
    _CARD_SCRIPT = """async () => {
        const img = document.querySelector('.Loda img[src^="blob:"]');
        const res = await fetch(img.src);
        const blob = await res.blob();
        return new Promise(r => { const rd = new FileReader(); rd.onloadend = () => r(rd.result.split(',')[1]); rd.readAsDataURL(blob); });
    }"""

    def __init__(self):
        self.playwright = None
        self.browser = None
        # uid -> {"page": Page, "context": BrowserContext, "expires": epoch seconds}
        self.cache = {}
        # Strong reference to the expiry task; asyncio itself only keeps a
        # weak one, so an unreferenced task may be garbage-collected mid-run.
        self._cleanup_task = None

    async def start(self):
        """Launch headless Chromium and start the background cache sweeper."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu']
        )
        self._cleanup_task = asyncio.create_task(self._cleanup_cache())
        print("🚀 [System] Browser logic with Page Cache started.")

    async def _evict(self, uid):
        """Drop ``uid`` from the cache and close its context.

        Returns True when an entry existed and closed cleanly, False otherwise.
        A failing close is reported but never raised, so callers can keep going.
        """
        item = self.cache.pop(uid, None)
        if item is None:
            return False
        try:
            await item["context"].close()
        except Exception as exc:
            print(f"⚠️ [Cache] Failed to close tab for UID {uid}: {exc}")
            return False
        return True

    async def _cleanup_cache(self):
        """Close tabs whose cache entry has expired; runs until cancelled."""
        while True:
            await asyncio.sleep(self.CLEANUP_INTERVAL)
            now = time.time()
            expired = [uid for uid, item in self.cache.items() if now > item["expires"]]
            for uid in expired:
                # Closing a previous context awaited, so this entry may have
                # been refreshed or removed in the meantime: check it again.
                item = self.cache.get(uid)
                if item is None or now <= item["expires"]:
                    continue
                if await self._evict(uid):
                    print(f"🧹 [Cache] Closed expired tab for UID: {uid}")

    async def get_page(self, uid: str):
        """Return ``(page, is_new)`` for ``uid``, reusing the cached page if any.

        A cache hit extends the entry's lifetime by ``CACHE_TTL`` seconds and
        returns ``is_new=False``. Otherwise a new context and page are created
        and cached, and ``is_new=True`` tells the caller it must navigate.
        """
        entry = self.cache.get(uid)
        if entry is not None:
            print(f"⚡ [Cache] Reusing existing tab for UID: {uid}")
            entry["expires"] = time.time() + self.CACHE_TTL
            return entry["page"], False

        print(f"🌐 [Network] Creating new tab for UID: {uid}")
        context = await self.browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        try:
            page = await context.new_page()
        except Exception:
            # Do not leak the context when the page cannot be created.
            await context.close()
            raise

        self.cache[uid] = {
            "page": page,
            "context": context,
            "expires": time.time() + self.CACHE_TTL
        }
        return page, True

    async def _load_profile(self, page, uid):
        """Navigate a freshly created page to the profile of ``uid``.

        If navigation fails the cache entry is evicted before re-raising, so a
        page that never loaded is not handed out later as a valid cached page.
        """
        try:
            await page.goto(f"https://enka.network/u/{uid}/", wait_until="domcontentloaded", timeout=30000)
        except Exception:
            await self._evict(uid)
            raise

    async def fetch_info(self, uid: str):
        """Scrape player info, stats and character list for ``uid``.

        Returns a dict with ``execution_time``, ``uid``, ``cached`` plus the
        ``player`` and ``characters`` keys produced in the page, or
        ``{"error": message}`` if anything after page acquisition fails.
        """
        start_time = time.time()
        page, is_new = await self.get_page(uid)
        try:
            if is_new:
                await self._load_profile(page, uid)

            # Best effort: open the cards dialog so the namecard background is
            # in the DOM. The scrape still works (namecard = null) without it.
            try:
                await page.click('button[data-icon="cards"]', timeout=1500)
                await page.wait_for_selector('.Loda .bg', timeout=1500)
            except Exception:
                pass

            data = await page.evaluate(self._INFO_SCRIPT)
            return {"execution_time": f"{round(time.time() - start_time, 2)}s", "uid": uid, "cached": not is_new, **data}
        except Exception as e:
            return {"error": str(e)}

    async def fetch_gen(self, uid: str, char: str = None):
        """Generate the card image for ``uid`` and upload it to freeimage.host.

        ``char`` optionally names the character to select first; it is matched
        against the ``style`` attribute of the avatar figures. Returns a dict
        with ``execution_time``, ``uid``, ``cached`` and ``card_url``, or
        ``{"error": message}`` if anything after page acquisition fails.
        """
        start_time = time.time()
        page, is_new = await self.get_page(uid)
        try:
            if is_new:
                await self._load_profile(page, uid)

            # Best effort: dismiss any dialog left open by a previous request.
            try:
                await page.keyboard.press("Escape")
            except Exception:
                pass

            if char:
                target = char.strip().capitalize()
                # The name is embedded in a single-quoted XPath literal, which
                # has no escape syntax: skip names that are empty or contain a
                # quote instead of building a broken or injected expression.
                if target and "'" not in target:
                    try:
                        await page.click(f"//div[contains(@class, 'avatar')]//figure[contains(@style, '{target}')]", timeout=3000)
                        await asyncio.sleep(0.4)
                    except Exception:
                        pass  # unknown character: fall back to the one shown

            await page.click('button[data-icon="image"]')
            await page.wait_for_selector('.Loda img[src^="blob:"]', timeout=15000)
            img_base64 = await page.evaluate(self._CARD_SCRIPT)

            # Best effort: close the image dialog so the page is reusable.
            try:
                await page.keyboard.press("Escape")
            except Exception:
                pass

            # requests is synchronous; run it in the default executor so the
            # event loop is not blocked for the duration of the upload.
            payload = {"key": FREEIMAGE_API_KEY, "action": "upload", "source": img_base64, "format": "json"}
            loop = asyncio.get_running_loop()
            res = await loop.run_in_executor(
                None, lambda: requests.post(FREEIMAGE_URL, data=payload, timeout=15)
            )
            # Surface HTTP failures as an error instead of a silent null URL.
            res.raise_for_status()
            return {"execution_time": f"{round(time.time() - start_time, 2)}s", "uid": uid, "cached": not is_new, "card_url": res.json().get('image', {}).get('url')}
        except Exception as e:
            return {"error": str(e)}

    async def stop(self):
        """Cancel the sweeper, close every cached context, then the browser."""
        task, self._cleanup_task = self._cleanup_task, None
        if task is not None:
            task.cancel()
            # Wait for the cancellation to land without re-raising it here.
            await asyncio.gather(task, return_exceptions=True)

        # _evict never raises, so one bad context cannot block the shutdown.
        for uid in list(self.cache):
            await self._evict(uid)

        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
|
|