# app/ios_app.py
import aiohttp
import asyncio
import re
import json
import random
import time
# Capture the value="..." attribute that follows an element carrying
# id="email". DOTALL lets the lazy ".*?" span line breaks; IGNORECASE
# tolerates upper-case markup.
EMAIL_RE = re.compile(
    r'id="email".*?value="(.*?)"',
    re.DOTALL | re.IGNORECASE,
)

# Same shape as EMAIL_RE, for the element carrying id="password".
PASS_RE = re.compile(
    r'id="password".*?value="(.*?)"',
    re.DOTALL | re.IGNORECASE,
)

# Cache lifetime. It is compared against differences of time.time(),
# so the unit is seconds.
CACHE_TIME = 36
class IOSAppFetcher:
    """Fetch and cache account credentials scraped from per-app source pages.

    The app catalogue is loaded from ``data/ios_apps.json``. It is indexed by
    lower-cased app name, and each entry is iterated as a collection of URLs.
    Call :meth:`start` before fetching and :meth:`stop` when finished.
    """

    def __init__(self):
        """Initialise empty caches and load the app catalogue from disk.

        Raises:
            OSError: if ``data/ios_apps.json`` cannot be opened.
            json.JSONDecodeError: if the file does not contain valid JSON.
        """
        self.session = None  # created by start()
        self.cache = {}  # app name -> list of account dicts
        self.cache_time = {}  # app name -> time.time() stamp of that entry
        self.meta_info = {
            "powered_by": "devily",
            "version": "0.6.0",
            "support": "https://t.me/Dev_Celeste"
        }
        with open("data/ios_apps.json", "r", encoding="utf-8") as f:
            self.apps = json.load(f)

    async def start(self):
        """Create the shared HTTP session used by every fetch.

        Calling this while a session is already open is a no-op, so a
        repeated call does not leak the previous session.
        """
        if self.session is not None and not self.session.closed:
            return
        timeout = aiohttp.ClientTimeout(total=10)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=aiohttp.TCPConnector(
                limit=20,
                ttl_dns_cache=3600,
                # NOTE(review): ssl=False disables certificate verification
                # for every request. Kept as-is because the source sites may
                # depend on it; confirm this is intentional.
                ssl=False,
            ),
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            },
        )

    async def stop(self):
        """Close the HTTP session if one is open."""
        if self.session and not self.session.closed:
            await self.session.close()

    async def fetch_account(self, url) -> "dict | None":
        """Download ``url`` and extract one email/password pair from it.

        The request is attempted at most twice. A non-200 response or a page
        without both fields gives up immediately; only an exception raised
        while requesting or reading the page triggers the retry.

        Args:
            url: Page to download.

        Returns:
            ``{"email": ..., "password": ...}`` with surrounding whitespace
            stripped, or ``None`` when nothing could be extracted.
        """
        attempts = 2
        for attempt in range(attempts):
            try:
                async with self.session.get(url) as resp:
                    if resp.status != 200:
                        return None
                    html = await resp.text()
            except asyncio.CancelledError:
                # Never swallow cancellation: the caller asked us to stop.
                raise
            except Exception:
                # Best-effort scraping: any request/read failure is retried.
                # Only pause when another attempt will actually follow.
                if attempt + 1 < attempts:
                    await asyncio.sleep(0.5)
                continue

            email_match = EMAIL_RE.search(html)
            pass_match = PASS_RE.search(html)
            if not email_match or not pass_match:
                return None
            return {
                "email": email_match.group(1).strip(),
                "password": pass_match.group(1).strip()
            }
        return None

    async def get_app_accounts(self, name):
        """Return all distinct accounts for an app, using a short-lived cache.

        Args:
            name: App name; it is lower-cased before the catalogue lookup.

        Returns:
            A response dict. On success it has ``ok=True``, ``cached``,
            ``app``, ``total``, ``process_time`` (seconds, rounded to four
            decimals), ``accounts`` and ``meta``. For an unknown app it has
            ``ok=False`` and ``error="app_not_found"``.
        """
        start = time.perf_counter()
        name = name.lower()
        if name not in self.apps:
            return {"ok": False, "error": "app_not_found", "meta": self.meta_info}

        now = time.time()
        if name in self.cache and now - self.cache_time[name] < CACHE_TIME:
            process_time = round(time.perf_counter() - start, 4)
            return {
                "ok": True,
                "cached": True,
                "app": name,
                "total": len(self.cache[name]),
                "process_time": process_time,
                "accounts": self.cache[name],
                "meta": self.meta_info
            }

        # Drop duplicate URLs but keep catalogue order, so the resulting
        # account list does not depend on string hash randomisation.
        urls = list(dict.fromkeys(self.apps[name]))
        sem = asyncio.Semaphore(5)  # at most five pages in flight at once

        async def worker(url):
            async with sem:
                return await self.fetch_account(url)

        results = await asyncio.gather(*(worker(u) for u in urls))

        accounts = []
        seen = set()
        for account in results:
            if not account:
                continue
            # A tuple key cannot collide the way "email:password" can when
            # either field itself contains a colon.
            key = (account["email"], account["password"])
            if key in seen:
                continue
            seen.add(key)
            accounts.append(account)

        self.cache[name] = accounts
        self.cache_time[name] = now
        process_time = round(time.perf_counter() - start, 4)
        return {
            "ok": True,
            "cached": False,
            "app": name,
            "total": len(accounts),
            "process_time": process_time,
            "accounts": accounts,
            "meta": self.meta_info
        }

    async def get_random(self, name):
        """Return one randomly chosen account for an app.

        Args:
            name: App name, forwarded to :meth:`get_app_accounts`.

        Returns:
            ``{"ok": True, "app": name, "account": ..., "meta": ...}`` on
            success. A failed lookup result is passed through unchanged, and
            an empty account list yields ``ok=False`` with
            ``error="no_accounts"``.
        """
        res = await self.get_app_accounts(name)
        if not res["ok"]:
            return res
        if not res["accounts"]:
            return {"ok": False, "error": "no_accounts", "meta": self.meta_info}
        acc = random.choice(res["accounts"])
        return {
            "ok": True,
            "app": name,
            "account": acc,
            "meta": self.meta_info
        }