# api.3 / app / ht_rd.py
# Celeskry's picture
# Update app/ht_rd.py
# 3569927 verified
# app/ht_rd.py
import asyncio
import functools
import json
import random
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

import cloudscraper
from fastapi import HTTPException
class HentaiRandomApp:
    """Return random catalogue entries scraped from a SvelteKit ``__data.json`` endpoint.

    The endpoint answers with one JSON document per line.  Each document
    carries one or more flat ``data`` arrays ("pools") in which objects refer
    to their field values by integer index into the *same* array.  Parsed
    entries are cached in memory for ``cache_ttl`` seconds.
    """

    # Bounds applied to the number of entries a single request may ask for.
    _MIN_COUNT = 1
    _MAX_COUNT = 50
    # Seconds to wait for the upstream endpoint before giving up.
    _HTTP_TIMEOUT = 20

    def __init__(self):
        self.base_url = "https://hentaiz.dog/watch"
        self.api_url = (
            "https://hentaiz.dog/__data.json"
            "?x-sveltekit-trailing-slash=1"
            "&x-sveltekit-invalidated=001"
        )
        self.img_base = "https://storage.haiten.org"
        self.scraper = cloudscraper.create_scraper(
            browser={"browser": "chrome", "platform": "windows"}
        )
        # Parsed entries from the last successful fetch, and the naive-UTC
        # instant after which they must be refreshed.
        self.cache_data: Optional[List[Dict]] = None
        self.cache_expire: Optional[datetime] = None
        self.cache_ttl = 600
        # Created lazily inside a running event loop; serialises refreshes so
        # the shared scraper session is never used by two threads at once.
        self._refresh_lock: Optional[asyncio.Lock] = None

    async def start(self):
        """Lifecycle hook; nothing needs to be initialised."""
        return

    async def stop(self):
        """Close the underlying HTTP session, ignoring any shutdown error."""
        try:
            self.scraper.close()
        except Exception:
            # Best-effort cleanup: a failing close must not break shutdown.
            pass

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``; the value stays
        naive so it remains comparable with ``cache_expire``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _fix_mojibake(text: str) -> str:
        """Undo UTF-8 text that was wrongly decoded as latin-1, if applicable.

        Text that cannot be round-tripped is returned unchanged.
        """
        try:
            return text.encode("latin1").decode("utf-8")
        except UnicodeError:
            return text

    def _get_val(self, idx, pool):
        """Return ``pool[idx]`` when ``idx`` is a valid index, else ``idx`` itself."""
        # bool is a subclass of int but is never a pool reference.
        if isinstance(idx, int) and not isinstance(idx, bool) and 0 <= idx < len(pool):
            return pool[idx]
        return idx

    def _resolve_img(self, img_idx, pool):
        """Resolve an image reference to an absolute URL, or ``""`` if unavailable."""
        img_obj = self._get_val(img_idx, pool)
        if isinstance(img_obj, dict) and "filePath" in img_obj:
            path = self._get_val(img_obj["filePath"], pool)
            if isinstance(path, str):
                return f"{self.img_base}{path}"
        return ""

    def _resolve_episode(self, entry: Dict[str, Any], pool: List[Any]) -> int:
        """Return the entry's episode number, defaulting to 1."""
        # A missing key must not be looked up: the default 1 is a value, not
        # a reference to pool[1].
        if "episodeNumber" not in entry:
            return 1
        value = self._get_val(entry["episodeNumber"], pool)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return 1
        try:
            return int(value)
        except (ValueError, OverflowError):
            # json.loads accepts NaN/Infinity, which int() rejects.
            return 1

    def _resolve_studios(self, studios_ref, pool: List[Any]) -> List[str]:
        """Follow studios -> wrapper -> studio -> name and collect the names."""
        refs = self._get_val(studios_ref, pool)
        if not isinstance(refs, list):
            return []
        names: List[str] = []
        for wrapper_ref in refs:
            wrapper = self._get_val(wrapper_ref, pool)
            if not (isinstance(wrapper, dict) and "studio" in wrapper):
                continue
            info = self._get_val(wrapper["studio"], pool)
            if not (isinstance(info, dict) and "name" in info):
                continue
            name = self._get_val(info["name"], pool)
            if isinstance(name, str):
                names.append(self._fix_mojibake(name))
        return names

    def _build_item(self, entry: Dict[str, Any], pool: List[Any]) -> Optional[Dict[str, Any]]:
        """Turn one pool object with ``title`` and ``slug`` into an API item.

        Returns ``None`` when the slug does not resolve to a string.
        """
        slug = self._get_val(entry["slug"], pool)
        if not isinstance(slug, str):
            return None
        title = self._get_val(entry["title"], pool)
        if isinstance(title, str):
            title = self._fix_mojibake(title)
        return {
            "title": title,
            "slug": slug,
            "episode": self._resolve_episode(entry, pool),
            "poster_url": self._resolve_img(entry.get("posterImage"), pool),
            "backdrop_url": self._resolve_img(entry.get("backdropImage"), pool),
            "studios": self._resolve_studios(entry.get("studios"), pool),
            "link": f"{self.base_url}/{slug}",
        }

    @staticmethod
    def _collect_pools(text: str) -> List[List[Any]]:
        """Extract every ``data`` array from a multi-line JSON response body."""
        pools: List[List[Any]] = []
        for line in text.strip().split("\n"):
            try:
                chunk = json.loads(line)
            except ValueError:
                continue  # blank or non-JSON line
            if not isinstance(chunk, dict):
                continue
            if "nodes" in chunk:
                nodes = chunk["nodes"]
                if not isinstance(nodes, list):
                    continue
                for node in nodes:
                    if isinstance(node, dict) and isinstance(node.get("data"), list):
                        pools.append(node["data"])
            elif isinstance(chunk.get("data"), list):
                pools.append(chunk["data"])
        return pools

    def _parse_payload(self, text: str) -> List[Dict[str, Any]]:
        """Parse a response body into a list of items, unique by slug.

        Each pool is resolved on its own: the integer references inside a
        ``data`` array index into that array only, so merging pools would
        shift every reference of the later ones.
        """
        by_slug: Dict[str, Dict[str, Any]] = {}
        for pool in self._collect_pools(text):
            for entry in pool:
                if isinstance(entry, dict) and "title" in entry and "slug" in entry:
                    item = self._build_item(entry, pool)
                    if item is not None:
                        # A later duplicate replaces the earlier one.
                        by_slug[item["slug"]] = item
        return list(by_slug.values())

    def _cache_is_fresh(self) -> bool:
        """Return True when non-empty cached data exists and has not expired."""
        return bool(
            self.cache_data
            and self.cache_expire
            and self._utcnow() < self.cache_expire
        )

    async def _load_items(self) -> Tuple[List[Dict], bool]:
        """Return ``(items, from_cache)``, refreshing the cache when it is stale.

        If the upstream call does not return status 200, or yields no usable
        entry, the previous cache (possibly empty) is returned instead and is
        left untouched.
        """
        if self._cache_is_fresh():
            return self.cache_data, True

        if self._refresh_lock is None:
            self._refresh_lock = asyncio.Lock()
        async with self._refresh_lock:
            # Another request may have refreshed the cache while we waited.
            if self._cache_is_fresh():
                return self.cache_data, True

            # The scraper is synchronous; run it in a worker thread so the
            # event loop keeps serving other requests meanwhile.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                functools.partial(
                    self.scraper.get, self.api_url, timeout=self._HTTP_TIMEOUT
                ),
            )

            items: List[Dict] = []
            if response.status_code == 200:
                items = self._parse_payload(response.text)
            if not items:
                stale = self.cache_data or []
                return stale, bool(stale)

            self.cache_data = items
            self.cache_expire = self._utcnow() + timedelta(seconds=self.cache_ttl)
            return items, False

    async def fetch_data(self) -> List[Dict]:
        """Return the catalogue entries, using the in-memory cache when fresh.

        Returns:
            A list of item dicts with the keys ``title``, ``slug``,
            ``episode``, ``poster_url``, ``backdrop_url``, ``studios`` and
            ``link``; an empty list when nothing could be obtained.
        """
        items, _ = await self._load_items()
        return items

    @classmethod
    def _parse_limit(cls, limit: Any) -> Tuple[int, bool]:
        """Parse ``limit`` into ``(count, is_range)``.

        ``limit`` is either a number ("3") or an inclusive range ("2-5") from
        which a random count is drawn.  Bounds are clamped to
        ``[_MIN_COUNT, _MAX_COUNT]`` and a reversed range is reordered.
        Unparseable input yields a count of 1.
        """
        text = limit if isinstance(limit, str) else str(limit)
        is_range = "-" in text
        try:
            if not is_range:
                return max(cls._MIN_COUNT, min(cls._MAX_COUNT, int(text))), False
            low, high = (int(part) for part in text.split("-"))
        except ValueError:
            return 1, is_range
        low = max(cls._MIN_COUNT, min(cls._MAX_COUNT, low))
        high = max(cls._MIN_COUNT, min(cls._MAX_COUNT, high))
        if low > high:
            low, high = high, low
        return random.randint(low, high), True

    async def get_random(self, apikey: Optional[str], system_key: Optional[str], limit: str = "1"):
        """Return one or more randomly chosen catalogue entries.

        Args:
            apikey: Key supplied by the caller.
            system_key: Expected key; when falsy, no key check is performed.
            limit: A count ("3") or an inclusive range ("2-5"), capped at 50.

        Returns:
            ``{"ok": False, "msg": ...}`` when no data is available, otherwise
            a dict whose ``item`` is a single entry when exactly one was
            requested by a plain number, and a list in every other case.
            ``cached`` tells whether the entries came from the in-memory cache.

        Raises:
            HTTPException: 403 when ``system_key`` is set and ``apikey`` differs.
        """
        if system_key and apikey != system_key:
            raise HTTPException(status_code=403, detail="Invalid API key")

        count, is_range = self._parse_limit(limit)

        all_items, from_cache = await self._load_items()
        if not all_items:
            return {"ok": False, "msg": "No data found"}

        selected = random.sample(all_items, min(count, len(all_items)))
        return {
            "ok": True,
            "total_pool": len(all_items),
            "item": selected if (count > 1 or is_range) else selected[0],
            "source": "hentaiz.dog",
            "cached": from_cache,
            "made_by": "devily",
        }