import asyncio
import os
from contextlib import asynccontextmanager
from urllib.parse import parse_qs, quote_plus, unquote, urlparse

import httpx
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Query
|
|
| |
# Shared httpx.AsyncClient, created in `lifespan` at startup and closed at
# shutdown; every request handler reuses this single connection pool.
client = None
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared httpx.AsyncClient at startup, close it at shutdown.

    The client mimics a mobile Chrome browser so an1.com (and the Google
    Translate proxy) serve the regular HTML pages.
    """
    global client
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    }
    # NOTE(security): verify=False disables TLS certificate verification.
    # This looks deliberate (requests go through translate.goog proxy hosts) —
    # confirm before tightening.
    # timeout=30s replaces the original timeout=None: with no timeout a
    # stalled connection hangs forever, whereas the retry loops in
    # fetch_until_success() recover cleanly from a timeout exception.
    client = httpx.AsyncClient(
        headers=headers,
        verify=False,
        follow_redirects=True,
        timeout=httpx.Timeout(30.0),
    )
    yield
    await client.aclose()
|
|
# FastAPI application; `lifespan` manages the shared HTTP client.
app = FastAPI(title="AN1.com Scraper", lifespan=lifespan)


# Canonical origin, used to absolutize relative links after unwrapping
# Google Translate proxy URLs.
BASE_DOMAIN = "https://an1.com"
|
|
def unwrap_google_url(url: str) -> str:
    """Strip Google Translate proxy wrappers from *url* and return a clean URL.

    Handles three proxy forms:
      * ``.../website?...u=<encoded-target>`` redirector links (unwrapped
        recursively, since the target may itself be wrapped),
      * ``*-translate.goog`` host rewrites, mapped back to the real hosts,
      * ``_x_tr_*`` query parameters appended by the proxy.

    A leading-slash relative path is absolutized against BASE_DOMAIN.
    Returns "" for a falsy input.
    """
    if not url:
        return ""
    clean = unquote(url)

    # Google redirector: the real destination is in the `u` query parameter.
    if "google" in clean and "/website" in clean and "u=" in clean:
        try:
            qs = parse_qs(urlparse(clean).query)
            if 'u' in qs:
                # The target may itself be proxied — unwrap recursively.
                return unwrap_google_url(qs['u'][0])
        except Exception:
            # Malformed URL — fall through to the textual cleanups below.
            # (Original used a bare `except:`, which also swallowed
            # SystemExit/KeyboardInterrupt.)
            pass

    # Map translate.goog proxy hosts back to the real hosts.
    clean = clean.replace("an1-com.translate.goog", "an1.com")
    clean = clean.replace("files-an1-net.translate.goog", "files.an1.net")
    clean = clean.replace("file-an1-co.translate.goog", "file.an1.co")
    # Drop translate-proxy query params (whether first or subsequent param).
    clean = clean.split("?_x_tr_")[0]
    clean = clean.split("&_x_tr_")[0]

    if clean.startswith("/"):
        clean = BASE_DOMAIN + clean
    return clean
|
|
async def fetch_until_success(url: str, validator_func) -> BeautifulSoup:
    """Fetch *url* repeatedly until ``validator_func(soup)`` is truthy.

    Retries indefinitely on network errors and on pages that fail validation.
    If the translate proxy answers HTTP 429 (Too Many Requests), the URL is
    unwrapped once and subsequent attempts go to the direct host instead.

    Args:
        url: Page to fetch (possibly a translate.goog proxy URL).
        validator_func: Callable taking a BeautifulSoup, returning True when
            the page content is usable.

    Returns:
        The parsed page that passed validation.
    """
    target_url = url
    while True:
        try:
            res = await client.get(target_url)

            # Proxy rate-limited us: switch to the direct (unwrapped) URL
            # and retry immediately.
            if res.status_code == 429:
                clean_url = unwrap_google_url(target_url)
                if clean_url != target_url:
                    target_url = clean_url
                    continue

            soup = BeautifulSoup(res.text, 'html.parser')
            if validator_func(soup):
                return soup
        except Exception:
            # Network/parse error — treated the same as a failed validation.
            pass
        # Brief pause so persistent failures don't busy-spin the CPU or
        # hammer the server (the original retried with no delay at all).
        await asyncio.sleep(1)
|
|
async def scan_intermediate_page_loop(intermediate_url: str) -> str:
    """Scrape an an1.com intermediate download page for the real file URL.

    The page is fetched through the Google Translate proxy first;
    `fetch_until_success` falls back to the direct URL on 429 and retries
    until a usable page is returned.

    Returns:
        The first direct file URL (host file.an1.* or files.an1.*, excluding
        an1store links), or "" if no candidate button resolves to one.
    """
    target_url = intermediate_url.replace("https://an1.com", "https://an1-com.translate.goog")
    if "?" not in target_url:
        target_url += "?_x_tr_sl=auto&_x_tr_tl=en&_x_tr_hl=en"

    def is_valid_intermediate(soup):
        # Usable page: at least one download button that carries an href.
        btn = soup.select_one('a#pre_download')
        if btn and btn.get('href'):
            return True
        return any(b.get('href') for b in soup.select('a.btn-green'))

    soup = await fetch_until_success(target_url, is_valid_intermediate)

    # Collect candidate hrefs. Each access is guarded with .get(): the
    # original indexed ['href'] directly, which raises KeyError on a button
    # lacking href (the validator only guarantees that *some* button has one).
    candidates = []
    pre_download_btn = soup.select_one('a#pre_download')
    if pre_download_btn and pre_download_btn.get('href'):
        candidates.append(pre_download_btn['href'])

    for btn in soup.select('a.btn-green'):
        href = btn.get('href')
        if href:
            candidates.append(href)

    # Return the first candidate that points at the real file host.
    for raw_link in candidates:
        real_link = unwrap_google_url(raw_link)
        if ("file.an1" in real_link or "files.an1" in real_link) and "an1store" not in real_link:
            return real_link

    return ""
|
|
async def process_item_fully(name, raw_link, image):
    """Resolve one search-result item into a complete record.

    Retries indefinitely until the detail page yields a file size AND every
    download button resolves to a direct file URL — a partial result is
    never returned.

    Args:
        name: App title from the search listing.
        raw_link: Detail-page URL (possibly translate-proxied).
        image: Thumbnail URL (already unwrapped).

    Returns:
        dict with name, clean detail link, image, comma-joined direct
        download URLs, and size.
    """
    while True:
        try:
            # The detail page is usable once the green download buttons exist.
            def detail_valid(s):
                return bool(s.select('a.download_line.green'))

            app_soup = await fetch_until_success(raw_link, detail_valid)

            size_el = app_soup.select_one('[itemprop="fileSize"]')
            size = size_el.get_text(strip=True) if size_el else "Unknown"

            if size == "Unknown":
                # Page rendered without the size — pause and refetch instead
                # of busy-looping against the server.
                await asyncio.sleep(1)
                continue

            # Resolve every download button; all of them must succeed.
            final_links = []
            buttons = app_soup.select('a.download_line.green')

            all_links_success = True
            for btn in buttons:
                rel_link = btn.get('href')
                if not rel_link:
                    continue

                intermediate_url = unwrap_google_url(rel_link)
                direct_link = await scan_intermediate_page_loop(intermediate_url)

                if direct_link:
                    final_links.append(direct_link)
                else:
                    all_links_success = False
                    break

            if not final_links or not all_links_success:
                await asyncio.sleep(1)
                continue

            return {
                "name": name,
                "link": unwrap_google_url(raw_link),
                "image": image,
                "download": ", ".join(final_links),
                "size": size
            }

        except Exception:
            # Transient scraping error — brief pause, then retry from scratch
            # (the original retried immediately, which can busy-spin).
            await asyncio.sleep(1)
            continue
|
|
| @app.get("/") |
| async def root(): |
| return { |
| "message": "Search API for An1.com by Bowo", |
| "github": "https://github.com/SaptaZ", |
| "example_usage": "/search?query=minecraft&limit=5" |
| } |
|
|
| @app.get("/search") |
| async def search_apps( |
| query: str = Query(..., description="App name"), |
| limit: int = Query(5) |
| ): |
| search_url = f"https://an1-com.translate.goog/index.php?do=search&subaction=search&story={query}&_x_tr_sl=auto&_x_tr_tl=en&_x_tr_hl=en" |
| |
| |
| |
| def search_page_valid(s): |
| has_items = bool(s.select('.search-result .item, .item')) |
| no_result_msg = "yielded no results" in s.get_text() or "did not match any documents" in s.get_text() |
| return has_items or no_result_msg |
|
|
| soup = await fetch_until_success(search_url, search_page_valid) |
| |
| |
| if "yielded no results" in soup.get_text() or "did not match any documents" in soup.get_text(): |
| return { |
| "success": True, |
| "query": query, |
| "limit": limit, |
| "count": 0, |
| "results": [] |
| } |
|
|
| |
| items = soup.select('.search-result .item, .item') |
| tasks = [] |
| |
| |
| for item in items[:limit]: |
| name_el = item.select_one('.title a') or item.select_one('a[href*=".html"]') |
| if not name_el: continue |
| |
| name = name_el.get_text(strip=True) |
| raw_link = name_el['href'] |
| img_el = item.select_one('img') |
| image = unwrap_google_url(img_el['src']) if img_el else "" |
| |
| |
| tasks.append(process_item_fully(name, raw_link, image)) |
| |
| |
| results = await asyncio.gather(*tasks) |
|
|
| return { |
| "success": True, |
| "query": query, |
| "limit": limit, |
| "count": len(results), |
| "results": results |
| } |
|
|
if __name__ == "__main__":
    # Honor a platform-provided PORT (e.g. hosting containers); default 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)