| |
| import aiohttp |
| import asyncio |
| import re |
| import json |
| import random |
| import time |
|
|
| EMAIL_RE = re.compile(r'id="email".*?value="(.*?)"', re.S | re.I) |
| PASS_RE = re.compile(r'id="password".*?value="(.*?)"', re.S | re.I) |
| CACHE_TIME = 36 |
|
|
| class IOSAppFetcher: |
|
|
| def __init__(self): |
| self.session = None |
| self.cache = {} |
| self.cache_time = {} |
| self.meta_info = { |
| "powered_by": "devily", |
| "version": "0.6.0", |
| "support": "https://t.me/Dev_Celeste" |
| } |
|
|
| with open("data/ios_apps.json", "r", encoding="utf-8") as f: |
| self.apps = json.load(f) |
|
|
| async def start(self): |
| timeout = aiohttp.ClientTimeout(total=10) |
| self.session = aiohttp.ClientSession( |
| timeout=timeout, |
| connector=aiohttp.TCPConnector( |
| limit=20, |
| ttl_dns_cache=3600, |
| ssl=False |
| ), |
| headers={ |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
| } |
| ) |
|
|
| async def stop(self): |
| if self.session and not self.session.closed: |
| await self.session.close() |
|
|
| async def fetch_account(self, url): |
| for _ in range(2): |
| try: |
| async with self.session.get(url) as resp: |
| if resp.status != 200: |
| return None |
| html = await resp.text() |
| e = EMAIL_RE.search(html) |
| p = PASS_RE.search(html) |
| if not e or not p: |
| return None |
| return { |
| "email": e.group(1).strip(), |
| "password": p.group(1).strip() |
| } |
| except: |
| await asyncio.sleep(0.5) |
| return None |
|
|
| async def get_app_accounts(self, name): |
| start = time.perf_counter() |
| name = name.lower() |
| |
| if name not in self.apps: |
| return {"ok": False, "error": "app_not_found", "meta": self.meta_info} |
| |
| now = time.time() |
| |
| if name in self.cache and now - self.cache_time[name] < CACHE_TIME: |
| process_time = round(time.perf_counter() - start, 4) |
| return { |
| "ok": True, |
| "cached": True, |
| "app": name, |
| "total": len(self.cache[name]), |
| "process_time": process_time, |
| "accounts": self.cache[name], |
| "meta": self.meta_info |
| } |
| |
| urls = list(set(self.apps[name])) |
| sem = asyncio.Semaphore(5) |
| |
| async def worker(url): |
| async with sem: |
| return await self.fetch_account(url) |
| |
| tasks = [worker(u) for u in urls] |
| results = await asyncio.gather(*tasks) |
| |
| accounts = [] |
| seen = set() |
| |
| for r in results: |
| if not r: continue |
| key = f"{r['email']}:{r['password']}" |
| if key in seen: continue |
| seen.add(key) |
| accounts.append(r) |
| |
| self.cache[name] = accounts |
| self.cache_time[name] = now |
| process_time = round(time.perf_counter() - start, 4) |
| |
| return { |
| "ok": True, |
| "cached": False, |
| "app": name, |
| "total": len(accounts), |
| "process_time": process_time, |
| "accounts": accounts, |
| "meta": self.meta_info |
| } |
|
|
| async def get_random(self, name): |
| res = await self.get_app_accounts(name) |
| if not res["ok"]: |
| return res |
|
|
| if not res["accounts"]: |
| return {"ok": False, "error": "no_accounts", "meta": self.meta_info} |
|
|
| acc = random.choice(res["accounts"]) |
|
|
| return { |
| "ok": True, |
| "app": name, |
| "account": acc, |
| "meta": self.meta_info |
| } |