api.3 / app /ios_app.py
Celeskry's picture
Update app/ios_app.py
755f254 verified
# app/ios_app.py
import aiohttp
import asyncio
import re
import json
import random
import time
# Each pattern captures the contents of the first ``value="..."`` attribute
# that follows the matching ``id="..."`` attribute.  DOTALL lets the lazy
# ``.*?`` cross line breaks between the two attributes; IGNORECASE makes the
# match case-insensitive.
EMAIL_RE = re.compile(
    r'id="email".*?value="(.*?)"',
    flags=re.IGNORECASE | re.DOTALL,
)
PASS_RE = re.compile(
    r'id="password".*?value="(.*?)"',
    flags=re.IGNORECASE | re.DOTALL,
)

# How long a cached per-app account list stays valid, in seconds.
CACHE_TIME = 36
class IOSAppFetcher:
    """Fetch account credentials for named apps by scraping configured pages.

    ``self.apps`` is loaded from ``data/ios_apps.json`` and maps an app name
    to the URLs scraped for it (presumably with lowercase keys, since lookups
    lowercase the requested name -- verify against the data file).  Scraped
    accounts are cached in memory per app for ``CACHE_TIME`` seconds.

    Call ``start()`` before fetching and ``stop()`` when done.
    """

    def __init__(self):
        """Initialise empty caches and load the app -> URLs mapping from disk."""
        self.session = None  # HTTP session; created by start()
        self.cache = {}  # app name -> account dicts from the last scrape
        self.cache_time = {}  # app name -> time.time() taken when that scrape began
        # Attached unchanged to every response under the "meta" key.
        self.meta_info = {
            "powered_by": "devily",
            "version": "0.6.0",
            "support": "https://t.me/Dev_Celeste"
        }
        with open("data/ios_apps.json", "r", encoding="utf-8") as f:
            self.apps = json.load(f)

    async def start(self):
        """Create the shared HTTP session used by all fetches.

        Calling this again while a session is still open is a no-op, so a
        repeated start does not leak the previous session and its connector.
        """
        if self.session is not None and not self.session.closed:
            return
        timeout = aiohttp.ClientTimeout(total=10)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=aiohttp.TCPConnector(
                limit=20,
                ttl_dns_cache=3600,
                # NOTE(review): ssl=False turns off TLS certificate
                # verification for every request -- confirm this is intended.
                ssl=False
            ),
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            }
        )

    async def stop(self):
        """Close the HTTP session if one is open."""
        if self.session and not self.session.closed:
            await self.session.close()

    async def fetch_account(self, url):
        """Scrape one page and return its credentials.

        Returns ``{"email": ..., "password": ...}`` on success.  Returns
        ``None`` when the response status is not 200, when either field is
        missing from the page, or when both attempts fail with an error.
        Task cancellation is never swallowed.
        """
        for _ in range(2):
            try:
                async with self.session.get(url) as resp:
                    if resp.status != 200:
                        return None
                    html = await resp.text()
            except asyncio.CancelledError:
                # Cancellation must propagate; retrying would keep a
                # cancelled task alive.
                raise
            except Exception:
                # Best-effort scrape: any request failure gets one retry
                # after a short pause.
                await asyncio.sleep(0.5)
                continue
            email_match = EMAIL_RE.search(html)
            pass_match = PASS_RE.search(html)
            if not email_match or not pass_match:
                return None
            return {
                "email": email_match.group(1).strip(),
                "password": pass_match.group(1).strip()
            }
        return None

    def _accounts_response(self, name, accounts, cached, started):
        """Build the success payload shared by the cached and fresh paths."""
        return {
            "ok": True,
            "cached": cached,
            "app": name,
            "total": len(accounts),
            "process_time": round(time.perf_counter() - started, 4),
            "accounts": accounts,
            "meta": self.meta_info
        }

    async def get_app_accounts(self, name):
        """Return all unique accounts for app ``name`` (case-insensitive).

        Serves the cached list when it is younger than ``CACHE_TIME`` seconds;
        otherwise scrapes every configured URL (at most 5 at a time), drops
        failed fetches and duplicate email/password pairs, and caches the
        result.  Accounts keep the order of the configured URLs.

        Returns an ``{"ok": False, "error": "app_not_found", ...}`` payload
        for an unknown app.
        """
        started = time.perf_counter()
        name = name.lower()
        if name not in self.apps:
            return {"ok": False, "error": "app_not_found", "meta": self.meta_info}

        now = time.time()
        if name in self.cache and now - self.cache_time[name] < CACHE_TIME:
            return self._accounts_response(name, self.cache[name], True, started)

        # dict.fromkeys drops duplicate URLs but, unlike set(), keeps the
        # configured order so the result is deterministic.
        urls = list(dict.fromkeys(self.apps[name]))
        sem = asyncio.Semaphore(5)

        async def worker(url):
            async with sem:
                return await self.fetch_account(url)

        results = await asyncio.gather(*(worker(u) for u in urls))

        accounts = []
        seen = set()
        for account in results:
            if not account:
                continue
            # A tuple key cannot conflate ("a:b", "c") with ("a", "b:c").
            key = (account["email"], account["password"])
            if key in seen:
                continue
            seen.add(key)
            accounts.append(account)

        self.cache[name] = accounts
        # The timestamp is taken before scraping, so the entry's age includes
        # the time spent fetching.
        self.cache_time[name] = now
        return self._accounts_response(name, accounts, False, started)

    async def get_random(self, name):
        """Return one randomly chosen account for app ``name``.

        Passes through the error payload from ``get_app_accounts`` and returns
        ``{"ok": False, "error": "no_accounts", ...}`` when the app has none.
        """
        res = await self.get_app_accounts(name)
        if not res["ok"]:
            return res
        if not res["accounts"]:
            return {"ok": False, "error": "no_accounts", "meta": self.meta_info}
        return {
            "ok": True,
            "app": name,
            "account": random.choice(res["accounts"]),
            "meta": self.meta_info
        }