| |
import asyncio
import hmac
import json
import logging
import random
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any

import cloudscraper
from fastapi import HTTPException
|
|
class HentaiRandomApp:
    """Random-pick service backed by the hentaiz.dog ``__data.json`` endpoint.

    The endpoint is downloaded through a cloudscraper session, decoded into a
    list of item dicts (title, slug, episode, image URLs, studios, link) and
    kept in memory for ``cache_ttl`` seconds.
    """

    # Seconds to wait for the upstream endpoint before giving up.
    REQUEST_TIMEOUT = 20
    # Upper bound on how many items one ``get_random`` call may return.
    MAX_LIMIT = 50

    _log = logging.getLogger(__name__)

    def __init__(self):
        self.base_url = "https://hentaiz.dog/watch"
        self.api_url = (
            "https://hentaiz.dog/__data.json"
            "?x-sveltekit-trailing-slash=1"
            "&x-sveltekit-invalidated=001"
        )
        self.img_base = "https://storage.haiten.org"
        self.scraper = cloudscraper.create_scraper(
            browser={"browser": "chrome", "platform": "windows"}
        )
        self.cache_data: Optional[List[Dict]] = None
        self.cache_expire: Optional[datetime] = None
        self.cache_ttl = 600
        # Created lazily inside the running event loop (see ``fetch_data``).
        self._refresh_lock: Optional[asyncio.Lock] = None

    async def start(self):
        """Lifecycle hook; nothing to do because the session is built in ``__init__``."""
        return

    async def stop(self):
        """Close the scraper session; errors during shutdown are ignored."""
        try:
            self.scraper.close()
        except Exception:
            # Best-effort cleanup: a failing close must not break app shutdown.
            self._log.debug("Ignoring error while closing scraper", exc_info=True)

    def _get_val(self, idx, pool):
        """Return ``pool[idx]`` when ``idx`` is a valid index, else ``idx`` itself."""
        if isinstance(idx, int) and 0 <= idx < len(pool):
            return pool[idx]
        return idx

    def _resolve_img(self, img_idx, pool):
        """Resolve a nested image reference to a full URL, or ``""`` if absent."""
        img_obj = self._get_val(img_idx, pool)
        if isinstance(img_obj, dict) and "filePath" in img_obj:
            path = self._get_val(img_obj["filePath"], pool)
            if isinstance(path, str):
                return f"{self.img_base}{path}"
        return ""

    @staticmethod
    def _fix_mojibake(text):
        """Undo UTF-8-read-as-Latin-1 garbling; return ``text`` unchanged if not garbled."""
        try:
            return text.encode("latin1").decode("utf-8")
        except UnicodeError:
            return text

    @staticmethod
    def _extract_pools(text):
        """Return every ``data`` list found in the newline-delimited JSON payload.

        Each line is parsed on its own; lines that are not JSON objects are
        skipped. A line either carries a ``nodes`` list (each node may hold a
        ``data`` list) or a ``data`` list directly.
        """
        pools = []
        for line in text.strip().split("\n"):
            try:
                chunk = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(chunk, dict):
                continue

            if "nodes" in chunk:
                nodes = chunk["nodes"]
                candidates = nodes if isinstance(nodes, list) else []
            else:
                candidates = [chunk]

            for node in candidates:
                if isinstance(node, dict) and isinstance(node.get("data"), list):
                    pools.append(node["data"])
        return pools

    def _resolve_studios(self, entry, pool):
        """Follow entry -> studios list -> wrapper -> studio -> name and collect names."""
        names = []
        wrappers = self._get_val(entry.get("studios"), pool)
        if not isinstance(wrappers, list):
            return names
        for wrapper_ref in wrappers:
            wrapper = self._get_val(wrapper_ref, pool)
            if not (isinstance(wrapper, dict) and "studio" in wrapper):
                continue
            studio = self._get_val(wrapper["studio"], pool)
            if not (isinstance(studio, dict) and "name" in studio):
                continue
            name = self._get_val(studio["name"], pool)
            if isinstance(name, str):
                names.append(self._fix_mojibake(name))
        return names

    def _build_item(self, entry, pool):
        """Turn one pool entry into an item dict, or ``None`` if it is not an item."""
        if not (isinstance(entry, dict) and "title" in entry and "slug" in entry):
            return None

        slug = self._get_val(entry["slug"], pool)
        if not isinstance(slug, str):
            return None

        title = self._get_val(entry["title"], pool)
        if isinstance(title, str):
            title = self._fix_mojibake(title)

        # Only resolve the reference when the key exists; a literal default
        # must not be looked up as an index into the pool.
        episode = 1
        if "episodeNumber" in entry:
            ep_val = self._get_val(entry["episodeNumber"], pool)
            if isinstance(ep_val, (int, float)):
                try:
                    episode = int(ep_val)
                except (ValueError, OverflowError):
                    # NaN / infinity cannot be converted; keep the default.
                    episode = 1

        return {
            "title": title,
            "slug": slug,
            "episode": episode,
            "poster_url": self._resolve_img(entry.get("posterImage"), pool),
            "backdrop_url": self._resolve_img(entry.get("backdropImage"), pool),
            "studios": self._resolve_studios(entry, pool),
            "link": f"{self.base_url}/{slug}",
        }

    def _parse_items(self, text):
        """Decode the raw payload into a list of items, de-duplicated by slug.

        NOTE: references inside a ``data`` list are indices into that same
        list (SvelteKit serialises each node/chunk separately), so every pool
        is resolved on its own instead of being merged into one index space.
        """
        items = []
        for pool in self._extract_pools(text):
            for entry in pool:
                item = self._build_item(entry, pool)
                if item is not None:
                    items.append(item)
        # Later duplicates replace earlier ones; first-seen order is kept.
        return list({item["slug"]: item for item in items}.values())

    def _cache_is_fresh(self):
        """Return True while a non-empty cache exists and has not expired."""
        return bool(
            self.cache_data
            and self.cache_expire
            and datetime.now(timezone.utc) < self.cache_expire
        )

    def _download(self):
        """Blocking GET of the data endpoint; meant to run in a worker thread."""
        return self.scraper.get(self.api_url, timeout=self.REQUEST_TIMEOUT)

    async def fetch_data(self) -> List[Dict]:
        """Return the item list, refreshing the cache when it has expired.

        The blocking HTTP request runs in the default executor so the event
        loop stays responsive. If the refresh fails (network error, non-200
        status, or a payload without usable items) the previous cache is
        returned instead, or ``[]`` when there is none.
        """
        if self._cache_is_fresh():
            return self.cache_data

        if self._refresh_lock is None:
            self._refresh_lock = asyncio.Lock()

        # Serialise refreshes: one request at a time on the shared session,
        # and waiters reuse the result instead of re-downloading.
        async with self._refresh_lock:
            if self._cache_is_fresh():
                return self.cache_data

            stale = self.cache_data or []
            loop = asyncio.get_running_loop()
            try:
                response = await loop.run_in_executor(None, self._download)
            except Exception:
                # I/O boundary: any transport failure degrades to stale data.
                self._log.warning("Upstream request failed", exc_info=True)
                return stale

            if response.status_code != 200:
                self._log.warning("Upstream returned HTTP %s", response.status_code)
                return stale

            items = self._parse_items(response.text)
            if not items:
                # Do not overwrite a good cache with an empty parse result.
                return stale

            self.cache_data = items
            self.cache_expire = datetime.now(timezone.utc) + timedelta(
                seconds=self.cache_ttl
            )
            return items

    @staticmethod
    def _keys_match(supplied, expected):
        """Compare API keys in constant time when both are strings."""
        if isinstance(supplied, str) and isinstance(expected, str):
            return hmac.compare_digest(
                supplied.encode("utf-8"), expected.encode("utf-8")
            )
        return supplied == expected

    def _clamp(self, value):
        """Clamp ``value`` into ``[1, MAX_LIMIT]``."""
        return max(1, min(self.MAX_LIMIT, value))

    def _parse_limit(self, limit):
        """Parse ``"N"`` or ``"LOW-HIGH"`` into ``(count, is_range)``.

        Bounds are clamped to ``[1, MAX_LIMIT]`` and reordered if reversed;
        a range picks a random count between them. Malformed input gives 1.
        """
        is_range = "-" in limit
        try:
            if is_range:
                low_text, high_text = limit.split("-")
                low, high = sorted(
                    (self._clamp(int(low_text)), self._clamp(int(high_text)))
                )
                return random.randint(low, high), True
            return self._clamp(int(limit)), False
        except ValueError:
            return 1, is_range

    async def get_random(self, apikey: Optional[str], system_key: Optional[str], limit: str = "1"):
        """Return one or more random items.

        Args:
            apikey: Key supplied by the caller.
            system_key: Expected key; when falsy no key check is performed.
            limit: ``"N"`` for a fixed count or ``"LOW-HIGH"`` for a random
                count in that range (capped at ``MAX_LIMIT``).

        Returns:
            A dict with ``ok``; on success also ``total_pool``, ``item`` (a
            single dict for a plain count of 1, otherwise a list), ``source``,
            ``cached`` (True when the cache was still fresh before the fetch,
            so no upstream request was needed) and ``made_by``.

        Raises:
            HTTPException: 403 when ``system_key`` is set and ``apikey`` differs.
        """
        if system_key and not self._keys_match(apikey, system_key):
            raise HTTPException(status_code=403, detail="Invalid API key")

        count, is_range = self._parse_limit(str(limit))

        served_from_cache = self._cache_is_fresh()
        all_items = await self.fetch_data()
        if not all_items:
            return {"ok": False, "msg": "No data found"}

        selected = random.sample(all_items, min(count, len(all_items)))

        return {
            "ok": True,
            "total_pool": len(all_items),
            "item": selected if (count > 1 or is_range) else selected[0],
            "source": "hentaiz.dog",
            "cached": served_from_cache,
            "made_by": "devily",
        }
|
|