# weu / main.py
# Aqso's picture
# Rename app.py to main.py
# 683f72b verified
import os
import json
import asyncio
from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
import firebase_admin
from firebase_admin import credentials, firestore
# --- INIT FIREBASE ---
def init_db():
    """Return a Firestore client, initializing the default Firebase app if needed.

    Credentials are read from the ``FIREBASE_CONFIG`` environment variable,
    which must hold the service-account key as a JSON string.

    Returns:
        The client object produced by ``firestore.client()``.

    Raises:
        RuntimeError: If ``FIREBASE_CONFIG`` is unset/empty, or does not
            contain valid JSON.
    """
    try:
        # Public way to ask "is the default app already initialized?";
        # avoids depending on the private ``firebase_admin._apps`` registry.
        firebase_admin.get_app()
    except ValueError:
        cred_json = os.getenv("FIREBASE_CONFIG")
        if not cred_json:
            raise RuntimeError("ERROR: FIREBASE_CONFIG env is empty!") from None
        try:
            cred_dict = json.loads(cred_json)
        except json.JSONDecodeError as exc:
            raise RuntimeError(
                "ERROR: FIREBASE_CONFIG env is not valid JSON!"
            ) from exc
        cred = credentials.Certificate(cred_dict)
        firebase_admin.initialize_app(cred)
    return firestore.client()
# Firestore client shared by the scraper and the route handlers; created at import time.
db = init_db()
# FastAPI application object that the route decorators in this file register against.
app = FastAPI()
# Jinja2 template loader rooted at the "templates" directory.
templates = Jinja2Templates(directory="templates")
# --- SCRAPER ENGINE ---
class Scraper:
    """Headless-Chromium scraper that mirrors the donghua catalogue into Firestore."""

    def __init__(self):
        # Root URL of the site being scraped.
        self.base_url = "https://anichin.cafe"

    async def run_stealth(self, url):
        """Load ``url`` in a stealth-patched browser and return its rendered HTML.

        Args:
            url: Address of the page to load.

        Returns:
            ``(html, page)`` on success, ``(None, None)`` if navigation or
            content extraction fails. The browser is closed before this
            method returns, so ``page`` is a closed handle and must not be
            used for further interaction; rely on ``html``.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
                )
                page = await context.new_page()
                await stealth_async(page)
                try:
                    await page.goto(url, wait_until="networkidle", timeout=60000)
                    return await page.content(), page
                except Exception as e:
                    print(f"Error Scrape: {e}")
                    return None, None
            finally:
                # The caller cannot close the browser (it is never returned),
                # so release it here on every path.
                await browser.close()

    async def sync_all(self):
        """Scrape the donghua list page and upsert one Firestore document per entry.

        Each document is stored in the ``donghua`` collection under the
        entry title (with ``/`` replaced by ``-``) and receives the fields
        ``title``, ``url``, ``thumb`` and ``status``. Entries that lack a
        title or a link are skipped so a single malformed card does not
        abort the whole sync.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context()
                page = await context.new_page()
                await stealth_async(page)

                # Scrape the donghua list.
                await page.goto(
                    f"{self.base_url}/donghua-list/", wait_until="networkidle"
                )
                items = await page.query_selector_all(".listupd .bs")

                for item in items:
                    link_el = await item.query_selector("a")
                    title_el = await item.query_selector(".tt")
                    img_el = await item.query_selector("img")

                    # query_selector returns None when nothing matches.
                    if link_el is None or title_el is None:
                        continue

                    title = (await title_el.inner_text()).strip()
                    url = await link_el.get_attribute("href")
                    if not title or not url:
                        continue
                    thumb = await img_el.get_attribute("src") if img_el else None

                    # Save the basic metadata to Firebase; "/" is replaced
                    # before the title is used as the document id.
                    doc_ref = db.collection("donghua").document(
                        title.replace("/", "-")
                    )
                    doc_ref.set(
                        {
                            "title": title,
                            "url": url,
                            "thumb": thumb,
                            "status": "In Database",
                        },
                        merge=True,
                    )
            finally:
                await browser.close()
# --- ROUTES ---
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render ``index.html`` with every document of the ``donghua`` collection."""
    donghua_list = []
    for snapshot in db.collection("donghua").stream():
        donghua_list.append(snapshot.to_dict())

    context = {"request": request, "data": donghua_list}
    return templates.TemplateResponse("index.html", context)
@app.get("/watch/{title}")
async def watch(request: Request, title: str):
    """Render the watch page for one donghua, scraping its episode list on demand.

    Args:
        request: Incoming request, passed through to the template.
        title: Document id of the donghua inside the ``donghua`` collection.

    Returns:
        The rendered ``watch.html`` template with the document data under
        the ``donghua`` key.

    Raises:
        HTTPException: 404 when no document with this id exists.
    """
    doc_ref = db.collection("donghua").document(title)
    doc = doc_ref.get()
    if not doc.exists:
        raise HTTPException(status_code=404, detail="Donghua not found")

    data = doc.to_dict()

    # Fetch the episode list only if it is not stored in the DB yet.
    if "episodes" not in data:
        episodes = []
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                ctx = await browser.new_context()
                pg = await ctx.new_page()
                await stealth_async(pg)
                await pg.goto(data['url'], wait_until="networkidle")

                for ep in await pg.query_selector_all(".eplister li a"):
                    url = await ep.get_attribute("href")
                    num_el = await ep.query_selector(".epl-num")
                    # Skip entries without a link or an episode number
                    # instead of failing the whole request.
                    if url is None or num_el is None:
                        continue
                    num = (await num_el.inner_text()).strip()
                    episodes.append({"num": num, "url": url})
            finally:
                # Release the browser even when navigation or parsing fails.
                await browser.close()

        data['episodes'] = episodes
        # Persist only a non-empty result; storing [] would mark the entry
        # as scraped and prevent any later retry.
        if episodes:
            doc_ref.update({"episodes": episodes})

    return templates.TemplateResponse("watch.html", {"request": request, "donghua": data})
@app.get("/get_stream")
async def get_stream(url: str):
    """Open an episode page and return the ``src`` of its embedded player iframe.

    Args:
        url: Episode page URL. It is caller-supplied, so it must be an
            http(s) URL on the scraper's own host (or a subdomain of it).

    Returns:
        ``{"stream_url": <iframe src>}``; the value is ``None`` when no
        player iframe is found on the page.

    Raises:
        HTTPException: 400 when ``url`` is malformed, uses another scheme,
            or points at a different host.
    """
    from urllib.parse import urlsplit

    scr = Scraper()

    # The URL is untrusted input that gets loaded in a server-side browser;
    # restrict it to the scraped site to prevent SSRF and file:// access.
    try:
        target = urlsplit(url)
        target_host = (target.hostname or "").lower()
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid URL") from None
    allowed_host = (urlsplit(scr.base_url).hostname or "").lower()
    same_site = target_host == allowed_host or target_host.endswith(
        "." + allowed_host
    )
    if target.scheme not in ("http", "https") or not same_site:
        raise HTTPException(status_code=400, detail="URL is not allowed")

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            ctx = await browser.new_context()
            pg = await ctx.new_page()
            await stealth_async(pg)
            await pg.goto(url, wait_until="networkidle")

            # Look for the player iframe, falling back to the generic
            # video container when the primary id is absent.
            iframe = await pg.query_selector("iframe#pembed")
            if not iframe:
                iframe = await pg.query_selector(".video-content iframe")

            stream_url = await iframe.get_attribute("src") if iframe else None
        finally:
            # Release the browser even when navigation fails.
            await browser.close()

    return {"stream_url": stream_url}
@app.post("/sync")
async def start_sync(bt: BackgroundTasks):
    """Register a full catalogue sync as a background task and acknowledge it."""
    bt.add_task(Scraper().sync_all)
    return {"message": "Sync started"}
if __name__ == "__main__":
    # Serve the app with uvicorn when this file is executed as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)