api.2 / app /gen_logic.py
Celeskry's picture
Update app/gen_logic.py
4235a64 verified
# app/gen_logic.py
import asyncio
import os
import uuid
import httpx
from playwright_stealth import stealth_async
class GenApeGen:
def __init__(self, context, base_url, semaphore):
self.context = context
self.base_url = base_url
self.semaphore = semaphore
async def _clean_popups_safe(self, page):
await page.evaluate('''() => {
const cssSelectors = [
'[data-testid="popup__gotit-btn"]',
'[data-testid="popup__close-btn"]'
];
cssSelectors.forEach(s => {
const el = document.querySelector(s);
if (el && typeof el.click === 'function') el.click();
});
}''')
async def generate_image(self, prompt: str, image_url: str = None):
async with self.semaphore:
page = await self.context.new_page()
await stealth_async(page)
final_img_url = None
task_finished_signal = False
temp_filename = f"temp_{uuid.uuid4().hex}.png"
temp_image = os.path.abspath(temp_filename)
async def handle_response(response):
nonlocal final_img_url, task_finished_signal
if "users/credit_without_excluded" in response.url:
task_finished_signal = True
if "api.genape.ai/generate/image" in response.url and response.status == 200:
try:
data = await response.json()
if data.get("result"):
final_img_url = data["result"][0].get("img_url")
except:
pass
page.on("response", handle_response)
try:
await page.goto(self.base_url, wait_until="domcontentloaded", timeout=60000)
await asyncio.sleep(8)
await self._clean_popups_safe(page)
await page.evaluate('''(t) => {
document.querySelectorAll('textarea').forEach(ta => {
ta.focus(); ta.value = t;
ta.dispatchEvent(new Event('input', { bubbles: true }));
});
}''', prompt)
if image_url:
async with httpx.AsyncClient(follow_redirects=True) as client:
resp = await client.get(image_url)
if resp.status_code == 200:
with open(temp_image, "wb") as f:
f.write(resp.content)
await page.locator('input[type="file"]').first.set_input_files(temp_image)
await asyncio.sleep(5)
await page.evaluate('''() => {
const btn = document.querySelector('button#generate') || document.querySelector('.gtm-generate');
if (btn) btn.click();
}''')
for _ in range(300):
if task_finished_signal and final_img_url:
break
await asyncio.sleep(1)
if final_img_url:
return {"ok": True, "img_url": final_img_url}
else:
await page.screenshot(path="timeout_gen.png")
return {"ok": False, "error": "Timeout"}
except Exception as e:
await page.screenshot(path="error_gen.png")
return {"ok": False, "error": str(e)}
finally:
if os.path.exists(temp_image):
try:
os.remove(temp_image)
except:
pass
await page.close()