from fastapi import FastAPI, HTTPException, Query
import httpx
from bs4 import BeautifulSoup
import uvicorn
import os
from urllib.parse import unquote, urlparse, parse_qs
from contextlib import asynccontextmanager
import asyncio
import re
import logging
import sys


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger("PDALifeScraper")


BASE_DOMAIN = "https://pdalife.com"
CDN_DOMAIN = "https://mobdisc.com"

# Shared HTTPX client; created and closed by the FastAPI lifespan handler below.
client = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global client
    logger.info("Starting application lifespan...")

    # Browser-like request headers so the site serves its normal HTML.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Referer": "https://pdalife.com/",
        "Accept-Language": "en-US,en;q=0.9",
    }

    logger.info("Initializing HTTPX AsyncClient with custom headers.")
    # Redirects are followed automatically; TLS verification is disabled and no
    # request timeout is set.
    client = httpx.AsyncClient(
        headers=headers,
        verify=False,
        follow_redirects=True,
        timeout=None
    )

    logger.info("Application startup complete. Ready to accept requests.")
    yield

    logger.info("Shutting down application. Closing HTTPX client.")
    await client.aclose()
    logger.info("HTTPX client closed. Application stopped.")


app = FastAPI(title="PDALife Scraper", lifespan=lifespan)


def unwrap_google_url(url: str) -> str:
    """
    Strip the Google Translate wrapper from a URL and resolve relative paths
    against the correct origin (PDALife vs. MobDisc).
    """
    if not url:
        return ""
    clean = unquote(url)

    # Undo the translate.goog host rewriting.
    clean = clean.replace("-com.translate.goog", ".com")
    clean = clean.replace(".translate.goog", "")

    # Drop Google Translate query parameters.
    clean = clean.split("?_x_tr_")[0]
    clean = clean.split("&_x_tr_")[0]

    # Resolve relative paths against their original domain.
    if clean.startswith("/"):
        # Direct download paths live on the MobDisc CDN.
        if clean.startswith("/fdl/"):
            return CDN_DOMAIN + clean
        else:
            return BASE_DOMAIN + clean

    # If a second URL is embedded (e.g. as a query parameter), keep only the
    # leading URL up to the first '&'.
    if "https://" in clean and "http" in clean[8:]:
        match = re.search(r'(https?://[^&]+)', clean)
        if match:
            return match.group(1)

    return clean
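
# Illustrative behavior of unwrap_google_url (inputs are assumed examples, not
# taken from live pages):
#   "https://pdalife-com.translate.goog/en/minecraft.html?_x_tr_sl=ru"
#       -> "https://pdalife.com/en/minecraft.html"
#   "/fdl/12345"             -> "https://mobdisc.com/fdl/12345"
#   "/en/minecraft.html"     -> "https://pdalife.com/en/minecraft.html"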


async def fetch_until_success(url: str, validator_func) -> BeautifulSoup:
    """
    Fetch a URL and retry until the response passes validation.
    BeautifulSoup parsing is offloaded to a worker thread so it does not
    block the event loop.
    """
    target_url = url
    if target_url.startswith("/"):
        target_url = BASE_DOMAIN + target_url

    logger.info(f"Initiating fetch request for URL: {target_url}")

    attempt_count = 0
    while True:
        attempt_count += 1
        try:
            res = await client.get(target_url)

            # Retry on rate limiting or server errors.
            if res.status_code == 429 or res.status_code >= 500:
                logger.warning(f"Received status {res.status_code} for {target_url}. Retrying (Attempt {attempt_count})...")
                await asyncio.sleep(1)  # brief pause so retries do not hammer the server
                continue

            # Blocked or DMCA-removed content: give up immediately.
            if res.status_code in [403, 451]:
                logger.error(f"Access denied ({res.status_code}) for: {target_url}. Content likely blocked/DMCA.")
                return None

            if res.status_code == 404:
                logger.error(f"URL not found (404): {target_url}")
                return None

            # Parse in a separate thread to keep the event loop responsive.
            soup = await asyncio.to_thread(BeautifulSoup, res.content, 'html.parser')

            if validator_func(soup):
                logger.info(f"Successfully fetched and validated content from: {target_url}")
                return soup

            # A redirect to MobDisc (or a page carrying its download button) is also accepted.
            if "mobdisc" in str(res.url) or soup.select('a.b-download__button'):
                logger.info(f"MobDisc content detected for: {target_url}")
                return soup

            # 200 OK but the expected content is missing: stop retrying.
            if res.status_code == 200:
                logger.warning(f"Fetched {target_url} with status 200, but validation failed (content missing). Stopping retry.")
                return None

        except Exception as e:
            logger.error(f"Exception occurred while fetching {target_url}: {str(e)}. Retrying (Attempt {attempt_count})...")
            await asyncio.sleep(1)  # brief pause before retrying after a network error
            continue
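
# fetch_until_success accepts any callable that inspects the parsed soup, e.g.
# (hypothetical call; the selector comes from the search endpoint below):
#   soup = await fetch_until_success("/search/minecraft",
#                                    lambda s: bool(s.select('.catalog-item')))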


async def scan_cdn_page_loop(dwn_url: str) -> str:
    """
    Resolve a download link through its MobDisc page.
    Returns the direct MobDisc URL (dw...) and copes with 404-protected pages.
    """
    logger.info(f"Scanning CDN page loop for: {dwn_url}")

    def is_valid_mobdisc_page(soup):
        # A valid MobDisc page exposes a download button.
        return bool(soup.select('a.b-download__button'))

    # Confirm the page actually serves a download button before resolving it.
    soup = await fetch_until_success(dwn_url, is_valid_mobdisc_page)

    if not soup:
        logger.warning(f"Failed to retrieve valid CDN page content for: {dwn_url}")
        return unwrap_google_url(dwn_url)

    final_mobdisc_url = dwn_url

    # Resolve the final location with a HEAD request; because the client follows
    # redirects, the resolved URL is normally taken from head_res.url.
    try:
        head_res = await client.head(dwn_url)
        if head_res.status_code in [301, 302] and 'location' in head_res.headers:
            final_mobdisc_url = head_res.headers['location']
        elif head_res.status_code == 200:
            final_mobdisc_url = str(head_res.url)
    except Exception:
        pass

    return unwrap_google_url(final_mobdisc_url)


async def process_item_fully(name, detail_url, image):
    """
    Full processing pipeline for a single app.
    """
    logger.info(f"Processing item: {name} | URL: {detail_url}")
    try:
        def detail_page_valid(s):
            # A valid detail page has download buttons or an accordion of versions.
            return bool(s.select('a.game-versions__downloads-button')) or bool(s.select('.accordion-item'))

        app_soup = await fetch_until_success(detail_url, detail_page_valid)
        if not app_soup:
            logger.error(f"Failed to load detail page for: {name}")
            return None

        # Collect download buttons together with their file sizes.
        link_items = []

        download_list_items = app_soup.select('.game-versions__downloads-list li')

        if download_list_items:
            for item in download_list_items:
                btn = item.select_one('a.game-versions__downloads-button')
                size_tag = item.select_one('.game-versions__downloads-size')
                if btn:
                    link_items.append({
                        "tag": btn,
                        "size": size_tag.get_text(strip=True) if size_tag else ""
                    })
        else:
            # Fallback: buttons that are not wrapped in a downloads list.
            logger.info(f"No download list found for {name}, trying standalone buttons.")
            fallback_buttons = app_soup.select('a.game-versions__downloads-button')
            for btn in fallback_buttons:
                size_tag = btn.select_one('.game-versions__downloads-size')
                link_items.append({
                    "tag": btn,
                    "size": size_tag.get_text(strip=True) if size_tag else ""
                })

        if not link_items:
            logger.warning(f"No download link tags found for: {name}")
            return None

        logger.info(f"Found {len(link_items)} potential download links for {name}.")

        final_links = []
        final_sizes = []

        for index, item in enumerate(link_items):
            link_tag = item["tag"]
            item_size = item["size"]
            raw_link = link_tag.get('href')

            if not raw_link:
                continue

            # Magnet links need no further resolution.
            if raw_link.startswith("magnet:"):
                logger.info(f"Magnet link detected for {name} [{index}]. Adding directly.")
                final_links.append(raw_link)
                final_sizes.append(item_size)
                continue

            processed_link = None

            # External links (not PDALife and not a /dwn/ redirect) are scanned as-is.
            if "http" in raw_link and "pdalife.com" not in raw_link and "/dwn/" not in raw_link:
                logger.info(f"External link detected for {name} [{index}]. Attempting to scan target: {raw_link}")
                direct_link = await scan_cdn_page_loop(raw_link)
                if direct_link:
                    processed_link = direct_link
                else:
                    logger.info(f"CDN scan failed for external link. Using raw link for {name} [{index}].")
                    processed_link = raw_link
            else:
                # PDALife /dwn/ links: unwrap first, then resolve through the CDN page.
                dwn_link = unwrap_google_url(raw_link)
                logger.info(f"Unwrapped DWN link for {name} [{index}]: {dwn_link}")

                direct_link = await scan_cdn_page_loop(dwn_link)
                if direct_link:
                    processed_link = direct_link

            if processed_link:
                final_links.append(processed_link)
                final_sizes.append(item_size)

        if not final_links:
            logger.warning(f"Final data list empty for {name} after processing all links.")
            return None

        # Join multiple versions into comma-separated strings.
        joined_downloads = ", ".join(final_links)
        joined_sizes = ", ".join([s for s in final_sizes if s])

        logger.info(f"Successfully processed item: {name} with {len(final_links)} links.")

        return {
            "name": name,
            "link": unwrap_google_url(detail_url),
            "image": image,
            "download": joined_downloads,
            "size": joined_sizes
        }

    except Exception as e:
        logger.error(f"Exception processing item {name}: {str(e)}")
        return None
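
# Shape of a successfully processed item (all field values illustrative):
#   {"name": "...", "link": "https://pdalife.com/...", "image": "https://...",
#    "download": "https://mobdisc.com/..., https://mobdisc.com/...",
#    "size": "110 MB, 95 MB"}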


@app.get("/")
async def root():
    return {
        "message": "Search API for PDALife.com by Bowo",
        "github": "https://github.com/SaptaZ",
        "example_usage": "/search?query=minecraft&limit=5"
    }


@app.get("/search")
async def search_apps(
    query: str = Query(..., description="App name"),
    limit: int = Query(5, description="Limit results")
):
    logger.info(f"Search request received. Query: '{query}' | Limit: {limit}")

    def search_page_valid(s):
        # A valid results page either lists catalog items or explicitly reports zero hits.
        return bool(s.select('.catalog-item')) or "Found 0 responses" in s.get_text()

    collected_item_elements = []
    current_page = 1
    max_page = 1

    # Keep fetching result pages until we have enough items or run out of pages.
    while len(collected_item_elements) < limit:
        logger.info(f"Pagination: Fetching page {current_page} for query '{query}'")

        if current_page == 1:
            search_url = f"{BASE_DOMAIN}/search/{query}"
        else:
            search_url = f"{BASE_DOMAIN}/search/{query}/page-{current_page}/"

        soup = await fetch_until_success(search_url, search_page_valid)

        if not soup:
            logger.warning(f"Pagination stopped. Could not fetch page {current_page}.")
            break

        if "Found 0 responses" in soup.get_text():
            logger.info(f"Search found 0 responses on page {current_page}.")
            break

        page_items = soup.select('.catalog-item')
        if not page_items:
            logger.info(f"No catalog items found on page {current_page}.")
            break

        collected_item_elements.extend(page_items)
        logger.info(f"Collected {len(page_items)} items from page {current_page}. Total collected: {len(collected_item_elements)}")

        # The "load more" button advertises how many result pages exist.
        load_more_btn = soup.select_one('.js-load_more')
        if load_more_btn and load_more_btn.has_attr('data-max_page'):
            try:
                max_page = int(load_more_btn['data-max_page'])
                logger.info(f"Max page detected: {max_page}")
            except (ValueError, TypeError):
                pass

        if current_page >= max_page:
            logger.info("Reached last known page. Stopping pagination.")
            break

        current_page += 1

    if not collected_item_elements:
        logger.info(f"Search finished. No items found for query '{query}'.")
        return {"success": True, "count": 0, "results": []}

    # Respect the requested limit before the expensive per-item processing.
    items_to_process = collected_item_elements[:limit]
    logger.info(f"Starting concurrent processing for {len(items_to_process)} items.")

    tasks = []

    for item in items_to_process:
        title_el = item.select_one('.catalog-item__title a')
        if not title_el:
            continue

        name = title_el.get_text(strip=True)

        detail_href = title_el.get('href', '')
        detail_link = unwrap_google_url(detail_href)

        img_el = item.select_one('.catalog-item__poster img')
        image = unwrap_google_url(img_el.get('src', '')) if img_el else ""

        tasks.append(process_item_fully(name, detail_link, image))

    # Run the per-item pipelines concurrently.
    results = await asyncio.gather(*tasks)

    # Drop items that failed at any stage.
    valid_results = [r for r in results if r is not None]

    logger.info(f"Search request completed. Returning {len(valid_results)} valid results.")

    return {
        "success": True,
        "query": query,
        "limit": limit,
        "count": len(valid_results),
        "results": valid_results
    }


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"Starting Uvicorn server on port {port}...")
    uvicorn.run(app, host="0.0.0.0", port=port)
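
# Example local run (assuming this file is saved as main.py):
#   python main.py
#   curl "http://localhost:7860/search?query=minecraft&limit=5"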