| |
| import asyncio |
| import os |
| import uuid |
| import httpx |
| from playwright_stealth import stealth_async |
|
|
class GenApeGen:
    """Generate an image by driving a web UI in a browser page.

    Each call to :meth:`generate_image` opens a new page from ``context``,
    types the prompt, optionally uploads a reference image, clicks the
    generate button and watches network responses for the resulting image URL.

    Attributes:
        context: Object providing ``await new_page()`` (presumably a Playwright
            ``BrowserContext`` -- confirm against the caller).
        base_url: URL of the page that hosts the generation form.
        semaphore: Async context manager limiting how many generations run
            concurrently (e.g. ``asyncio.Semaphore``).
    """

    # Seconds to wait after navigation before interacting with the page.
    PAGE_SETTLE_SECONDS = 8
    # Seconds to wait after attaching a reference image.
    UPLOAD_SETTLE_SECONDS = 5
    # Maximum number of one-second polls while waiting for the result.
    RESULT_TIMEOUT_SECONDS = 300

    def __init__(self, context, base_url, semaphore):
        self.context = context
        self.base_url = base_url
        self.semaphore = semaphore

    @staticmethod
    def _extract_img_url(data):
        """Return ``data["result"][0]["img_url"]`` or ``None`` if absent/malformed."""
        if not isinstance(data, dict):
            return None
        result = data.get("result")
        if not isinstance(result, list) or not result:
            return None
        first = result[0]
        if not isinstance(first, dict):
            return None
        return first.get("img_url")

    @staticmethod
    async def _screenshot_safe(page, path):
        """Take a diagnostic screenshot without letting a failure mask the result."""
        try:
            await page.screenshot(path=path)
        except Exception:
            # Best effort: the page may already be closed or crashed.
            pass

    async def _clean_popups_safe(self, page):
        """Click the "got it" / "close" popup buttons if they are present."""
        await page.evaluate('''() => {
            const cssSelectors = [
                '[data-testid="popup__gotit-btn"]',
                '[data-testid="popup__close-btn"]'
            ];
            cssSelectors.forEach(s => {
                const el = document.querySelector(s);
                if (el && typeof el.click === 'function') el.click();
            });
        }''')

    async def _fill_prompt(self, page, prompt):
        """Write ``prompt`` into every ``<textarea>`` and fire an ``input`` event."""
        await page.evaluate('''(t) => {
            document.querySelectorAll('textarea').forEach(ta => {
                ta.focus(); ta.value = t;
                ta.dispatchEvent(new Event('input', { bubbles: true }));
            });
        }''', prompt)

    async def _attach_image(self, page, image_url, temp_image):
        """Download ``image_url`` to ``temp_image`` and attach it to the file input.

        Raises:
            RuntimeError: If the download does not return HTTP 200, so that a
                non-existent file is never handed to the file input.
        """
        async with httpx.AsyncClient(follow_redirects=True) as client:
            resp = await client.get(image_url)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"Failed to download reference image: HTTP {resp.status_code}"
                )
            with open(temp_image, "wb") as f:
                f.write(resp.content)

        await page.locator('input[type="file"]').first.set_input_files(temp_image)

    async def _click_generate(self, page):
        """Click the generate button (``button#generate`` or ``.gtm-generate``)."""
        await page.evaluate('''() => {
            const btn = document.querySelector('button#generate') || document.querySelector('.gtm-generate');
            if (btn) btn.click();
        }''')

    async def generate_image(self, prompt: str, image_url: str = None):
        """Submit ``prompt`` (and optionally a reference image) and await the result.

        Args:
            prompt: Text written into the page's textarea(s).
            image_url: Optional URL of an image to download and upload as a
                reference; skipped when falsy.

        Returns:
            ``{"ok": True, "img_url": <url>}`` on success, otherwise
            ``{"ok": False, "error": <message>}``. ``"Timeout"`` is reported
            when no image URL was observed within ``RESULT_TIMEOUT_SECONDS``.

        Errors raised while opening the page propagate to the caller. Any other
        ``Exception`` is converted into the error dictionary; cancellation is
        not swallowed and still runs the cleanup.
        """
        async with self.semaphore:
            page = await self.context.new_page()

            final_img_url = None
            task_finished_signal = False
            temp_image = os.path.abspath(f"temp_{uuid.uuid4().hex}.png")

            async def handle_response(response):
                nonlocal final_img_url, task_finished_signal

                # A response from this endpoint is treated as "task finished".
                if "users/credit_without_excluded" in response.url:
                    task_finished_signal = True

                if "api.genape.ai/generate/image" in response.url and response.status == 200:
                    try:
                        data = await response.json()
                    except Exception:
                        # Body unavailable or not JSON: ignore this response.
                        return
                    found = self._extract_img_url(data)
                    # Never overwrite an already captured URL with nothing.
                    if found:
                        final_img_url = found

            try:
                # Inside the try so the page is closed even if this fails.
                await stealth_async(page)
                page.on("response", handle_response)

                await page.goto(self.base_url, wait_until="domcontentloaded", timeout=60000)
                await asyncio.sleep(self.PAGE_SETTLE_SECONDS)

                await self._clean_popups_safe(page)
                await self._fill_prompt(page, prompt)

                if image_url:
                    await self._attach_image(page, image_url, temp_image)
                    # NOTE(review): the original indentation was lost; this delay
                    # is assumed to apply only when an image is attached -- confirm.
                    await asyncio.sleep(self.UPLOAD_SETTLE_SECONDS)

                await self._click_generate(page)

                # Poll once per second until both signals have been observed.
                for _ in range(self.RESULT_TIMEOUT_SECONDS):
                    if task_finished_signal and final_img_url:
                        break
                    await asyncio.sleep(1)

                if final_img_url:
                    return {"ok": True, "img_url": final_img_url}

                await self._screenshot_safe(page, "timeout_gen.png")
                return {"ok": False, "error": "Timeout"}

            except Exception as e:
                await self._screenshot_safe(page, "error_gen.png")
                return {"ok": False, "error": str(e)}

            finally:
                try:
                    os.remove(temp_image)
                except OSError:
                    # File was never created or is already gone; nothing to do.
                    pass

                try:
                    await page.close()
                except Exception:
                    # Best effort: do not let a close failure replace the result.
                    pass