| import os |
| import json |
| import asyncio |
| from fastapi import FastAPI, Request, BackgroundTasks, HTTPException |
| from fastapi.responses import HTMLResponse |
| from fastapi.templating import Jinja2Templates |
| from playwright.async_api import async_playwright |
| from playwright_stealth import stealth_async |
| import firebase_admin |
| from firebase_admin import credentials, firestore |
|
|
| |
def init_db():
    """Initialize the Firebase Admin SDK (once) and return a Firestore client.

    Reads the service-account credentials as a JSON string from the
    FIREBASE_CONFIG environment variable.

    Returns:
        A `google.cloud.firestore.Client` bound to the default app.

    Raises:
        RuntimeError: if FIREBASE_CONFIG is unset or empty.
        json.JSONDecodeError: if FIREBASE_CONFIG is not valid JSON.
    """
    # Guard against double-initialization (firebase_admin raises if the
    # default app is created twice, e.g. on module re-import).
    if not firebase_admin._apps:
        cred_json = os.getenv("FIREBASE_CONFIG")
        if not cred_json:
            # RuntimeError is more specific than bare Exception and is still
            # caught by any existing `except Exception` handler upstream.
            raise RuntimeError("ERROR: FIREBASE_CONFIG env is empty!")
        cred_dict = json.loads(cred_json)
        cred = credentials.Certificate(cred_dict)
        firebase_admin.initialize_app(cred)
    return firestore.client()
|
|
# Module-level singletons created at import time:
#   db        -- Firestore client (init_db() fails fast if FIREBASE_CONFIG is missing)
#   app       -- the FastAPI application object served by uvicorn
#   templates -- Jinja2 loader; templates are expected under ./templates
db = init_db()
app = FastAPI()
templates = Jinja2Templates(directory="templates")
|
|
| |
class Scraper:
    """Scrapes donghua listings from anichin.cafe with a stealth-patched
    headless Chromium and mirrors them into the `donghua` Firestore
    collection (module-level `db`)."""

    def __init__(self):
        self.base_url = "https://anichin.cafe"

    async def run_stealth(self, url):
        """Fetch *url* and return its rendered HTML.

        Returns:
            (html, None) on success, or (None, None) on failure.

        NOTE(review): the original returned the live Page as the second tuple
        element, but the `async with async_playwright()` block tears the
        browser (and therefore the page) down before the caller can use it —
        we return None explicitly to make that contract honest.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
            )
            page = await context.new_page()
            await stealth_async(page)
            try:
                await page.goto(url, wait_until="networkidle", timeout=60000)
                return await page.content(), None
            except Exception as e:
                # Best-effort fetch: log and signal failure with Nones.
                print(f"Error Scrape: {e}")
                return None, None
            finally:
                # Close explicitly rather than relying only on the
                # context-manager teardown.
                await browser.close()

    async def sync_all(self):
        """Crawl the full /donghua-list/ page and upsert every entry into
        Firestore (merge=True, so re-runs update rather than overwrite)."""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context()
                page = await context.new_page()
                await stealth_async(page)

                await page.goto(f"{self.base_url}/donghua-list/", wait_until="networkidle")
                items = await page.query_selector_all(".listupd .bs")

                for item in items:
                    link_el = await item.query_selector("a")
                    title_el = await item.query_selector(".tt")
                    img_el = await item.query_selector("img")

                    # Skip malformed cards instead of crashing the whole sync
                    # with AttributeError on a missing element.
                    if link_el is None or title_el is None:
                        continue

                    title = (await title_el.inner_text()).strip()
                    if not title:
                        continue
                    url = await link_el.get_attribute("href")
                    thumb = await img_el.get_attribute("src") if img_el else None

                    # "/" is not allowed inside a Firestore document ID.
                    doc_ref = db.collection("donghua").document(title.replace("/", "-"))
                    doc_ref.set({
                        "title": title,
                        "url": url,
                        "thumb": thumb,
                        "status": "In Database"
                    }, merge=True)
            finally:
                # Guarantee browser shutdown even if navigation/scraping fails.
                await browser.close()
|
|
| |
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the homepage listing every donghua document currently stored."""
    snapshot = db.collection("donghua").stream()
    entries = [doc.to_dict() for doc in snapshot]
    return templates.TemplateResponse(
        "index.html",
        {"request": request, "data": entries},
    )
|
|
@app.get("/watch/{title}")
async def watch(request: Request, title: str):
    """Show the watch page for one donghua, lazily scraping its episode
    list on first visit and caching it in the Firestore document.

    Raises:
        HTTPException: 404 when *title* is not in the database.
    """
    doc = db.collection("donghua").document(title).get()
    if not doc.exists:
        raise HTTPException(status_code=404, detail="Donghua not found")

    data = doc.to_dict()

    # Episodes are scraped once, then served from Firestore on later visits.
    if "episodes" not in data:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                ctx = await browser.new_context()
                pg = await ctx.new_page()
                await stealth_async(pg)
                await pg.goto(data['url'], wait_until="networkidle")

                episodes = []
                for ep in await pg.query_selector_all(".eplister li a"):
                    url = await ep.get_attribute("href")
                    num_el = await ep.query_selector(".epl-num")
                    # Skip entries without a visible episode number instead of
                    # raising AttributeError on None.
                    if num_el is None:
                        continue
                    num = (await num_el.inner_text()).strip()
                    episodes.append({"num": num, "url": url})

                data['episodes'] = episodes
                db.collection("donghua").document(title).update({"episodes": episodes})
            finally:
                # Close the browser even when navigation/scraping raises;
                # the original skipped close() on any exception.
                await browser.close()

    return templates.TemplateResponse("watch.html", {"request": request, "donghua": data})
|
|
@app.get("/get_stream")
async def get_stream(url: str):
    """Resolve the embedded player iframe URL for an episode page.

    Returns:
        {"stream_url": src} where src is the iframe's src attribute, or
        None when no player iframe is found on the page.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            ctx = await browser.new_context()
            pg = await ctx.new_page()
            await stealth_async(pg)
            await pg.goto(url, wait_until="networkidle")

            # Primary player iframe, with a fallback selector for pages
            # using an alternate layout.
            iframe = await pg.query_selector("iframe#pembed")
            if not iframe:
                iframe = await pg.query_selector(".video-content iframe")

            stream_url = await iframe.get_attribute("src") if iframe else None
        finally:
            # Guarantee browser shutdown on navigation/scrape failure; the
            # original only closed it on the success path.
            await browser.close()
    return {"stream_url": stream_url}
|
|
@app.post("/sync")
async def start_sync(bt: BackgroundTasks):
    """Schedule a full catalogue sync in the background and return immediately."""
    bt.add_task(Scraper().sync_all)
    return {"message": "Sync started"}
|
|
if __name__ == "__main__":
    # Direct-run entry point (production deployments may invoke uvicorn
    # externally instead).
    import uvicorn
    # NOTE(review): port 7860 is commonly the Hugging Face Spaces default —
    # confirm the intended deployment target.
    uvicorn.run(app, host="0.0.0.0", port=7860)
|
|