# app/ios_app.py import aiohttp import asyncio import re import json import random import time EMAIL_RE = re.compile(r'id="email".*?value="(.*?)"', re.S | re.I) PASS_RE = re.compile(r'id="password".*?value="(.*?)"', re.S | re.I) CACHE_TIME = 36 class IOSAppFetcher: def __init__(self): self.session = None self.cache = {} self.cache_time = {} self.meta_info = { "powered_by": "devily", "version": "0.6.0", "support": "https://t.me/Dev_Celeste" } with open("data/ios_apps.json", "r", encoding="utf-8") as f: self.apps = json.load(f) async def start(self): timeout = aiohttp.ClientTimeout(total=10) self.session = aiohttp.ClientSession( timeout=timeout, connector=aiohttp.TCPConnector( limit=20, ttl_dns_cache=3600, ssl=False ), headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } ) async def stop(self): if self.session and not self.session.closed: await self.session.close() async def fetch_account(self, url): for _ in range(2): try: async with self.session.get(url) as resp: if resp.status != 200: return None html = await resp.text() e = EMAIL_RE.search(html) p = PASS_RE.search(html) if not e or not p: return None return { "email": e.group(1).strip(), "password": p.group(1).strip() } except: await asyncio.sleep(0.5) return None async def get_app_accounts(self, name): start = time.perf_counter() name = name.lower() if name not in self.apps: return {"ok": False, "error": "app_not_found", "meta": self.meta_info} now = time.time() if name in self.cache and now - self.cache_time[name] < CACHE_TIME: process_time = round(time.perf_counter() - start, 4) return { "ok": True, "cached": True, "app": name, "total": len(self.cache[name]), "process_time": process_time, "accounts": self.cache[name], "meta": self.meta_info } urls = list(set(self.apps[name])) sem = asyncio.Semaphore(5) async def worker(url): async with sem: return await self.fetch_account(url) tasks = [worker(u) for u in urls] results = await asyncio.gather(*tasks) accounts = [] seen = set() for r in results: if not r: continue key = f"{r['email']}:{r['password']}" if key in seen: continue seen.add(key) accounts.append(r) self.cache[name] = accounts self.cache_time[name] = now process_time = round(time.perf_counter() - start, 4) return { "ok": True, "cached": False, "app": name, "total": len(accounts), "process_time": process_time, "accounts": accounts, "meta": self.meta_info } async def get_random(self, name): res = await self.get_app_accounts(name) if not res["ok"]: return res if not res["accounts"]: return {"ok": False, "error": "no_accounts", "meta": self.meta_info} acc = random.choice(res["accounts"]) return { "ok": True, "app": name, "account": acc, "meta": self.meta_info }