# app/ht_rd.py import cloudscraper import json import random from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from fastapi import HTTPException class HentaiRandomApp: def __init__(self): self.base_url = "https://hentaiz.dog/watch" self.api_url = ( "https://hentaiz.dog/__data.json" "?x-sveltekit-trailing-slash=1" "&x-sveltekit-invalidated=001" ) self.img_base = "https://storage.haiten.org" self.scraper = cloudscraper.create_scraper( browser={"browser": "chrome", "platform": "windows"} ) self.cache_data: Optional[List[Dict]] = None self.cache_expire: Optional[datetime] = None self.cache_ttl = 600 async def start(self): return async def stop(self): try: self.scraper.close() except: pass def _get_val(self, idx, pool): """Hàm lấy giá trị an toàn từ pool theo index""" if isinstance(idx, int) and 0 <= idx < len(pool): return pool[idx] return idx def _resolve_img(self, img_idx, pool): """Xử lý lấy URL ảnh từ cấu trúc lồng nhau""" img_obj = self._get_val(img_idx, pool) if isinstance(img_obj, dict) and "filePath" in img_obj: path = self._get_val(img_obj["filePath"], pool) if isinstance(path, str): return f"{self.img_base}{path}" return "" async def fetch_data(self) -> List[Dict]: if (self.cache_data and self.cache_expire and datetime.utcnow() < self.cache_expire): return self.cache_data response = self.scraper.get(self.api_url, timeout=20) if response.status_code != 200: return self.cache_data or [] # Xử lý SvelteKit multi-line JSON lines = response.text.strip().split("\n") full_pool = [] for line in lines: try: chunk = json.loads(line) # SvelteKit thường để data trong nodes -> data if "nodes" in chunk: for node in chunk["nodes"]: if node and "data" in node: full_pool.extend(node["data"]) elif "data" in chunk: full_pool.extend(chunk["data"]) except: continue if not full_pool: return [] items = [] # Duyệt qua pool để tìm các object là Anime (có title và slug) for entry in full_pool: if isinstance(entry, dict) and "title" in entry and "slug" in entry: try: # Lấy Title & Slug title = self._get_val(entry["title"], full_pool) slug = self._get_val(entry["slug"], full_pool) if not isinstance(slug, str): continue # Fix Encoding nếu bị lỗi latin1 if isinstance(title, str): try: title = title.encode("latin1").decode("utf-8") except: pass # Episode ep_val = self._get_val(entry.get("episodeNumber", 1), full_pool) # Ảnh poster = self._resolve_img(entry.get("posterImage"), full_pool) backdrop = self._resolve_img(entry.get("backdropImage"), full_pool) # TRUY VẾT STUDIO (Logic từ ht_noti.py) studios = [] st_ref = entry.get("studios") st_list = self._get_val(st_ref, full_pool) if isinstance(st_list, list): for st_idx in st_list: st_wrapper = self._get_val(st_idx, full_pool) if isinstance(st_wrapper, dict) and "studio" in st_wrapper: st_info = self._get_val(st_wrapper["studio"], full_pool) if isinstance(st_info, dict) and "name" in st_info: st_name = self._get_val(st_info["name"], full_pool) if isinstance(st_name, str): try: st_name = st_name.encode("latin1").decode("utf-8") except: pass studios.append(st_name) items.append({ "title": title, "slug": slug, "episode": int(ep_val) if isinstance(ep_val, (int, float)) else 1, "poster_url": poster, "backdrop_url": backdrop, "studios": studios, "link": f"{self.base_url}/{slug}" }) except: continue # Loại bỏ các item lỗi hoặc trùng lặp unique_items = list({it["slug"]: it for it in items}.values()) self.cache_data = unique_items self.cache_expire = datetime.utcnow() + timedelta(seconds=self.cache_ttl) return unique_items async def get_random(self, apikey: Optional[str], system_key: Optional[str], limit: str = "1"): if system_key and apikey != system_key: raise HTTPException(status_code=403, detail="Invalid API key") try: if "-" in limit: low, high = map(int, limit.split("-")) count = random.randint(max(1, low), min(50, high)) else: count = max(1, min(50, int(limit))) except: count = 1 all_items = await self.fetch_data() if not all_items: return {"ok": False, "msg": "No data found"} selected = random.sample(all_items, min(count, len(all_items))) return { "ok": True, "total_pool": len(all_items), "item": selected if (count > 1 or "-" in limit) else selected[0], "source": "hentaiz.dog", "cached": True, "made_by": "devily" }