| | |
| | import os |
| | import uuid |
| | import platform |
| | import psutil |
| | import time |
| | from datetime import datetime |
| | from typing import List, Optional |
| | from fastapi import FastAPI, Query, HTTPException, Body, BackgroundTasks, Header |
| | from contextlib import asynccontextmanager |
| | from pydantic import BaseModel |
| | from fastapi.responses import FileResponse, Response |
| |
|
# Feature services. Each module is imported and instantiated once, at import
# time, in the order listed; the resulting module-level singletons are shared
# by the lifespan hook and the route handlers in this file.

# Backs GET /api/v1/invites/discord.
from app.invite_logic import DiscordInviteLogic
invite_app = DiscordInviteLogic()

# Backs GET /api/v1/get/lqAov.
from app.cl_lq import ClLqApp
lq_app = ClLqApp()

# Backs POST /api/v1/genshin/daily.
from app.gs_daily import GSDailyApp
gs_app = GSDailyApp()

# Backs POST /api/v1/genshin/redeem.
from app.gs_code import GSCodeApp
gs_code_app = GSCodeApp()

# Backs GET /api/v1/hentai/newest and GET /api/v1/hentai/info.
from app.ht_noti import HentaiApp
hentai_app = HentaiApp()

# Backs GET /api/v1/hentai/random.
from app.ht_rd import HentaiRandomApp
ht_random_app = HentaiRandomApp()

# Backs GET /api/v1/tiktok/download.
from app.tik_dl import TikTokDownloader
tik_app = TikTokDownloader()

# Backs GET /api/v1/bypass.
from app.byps_uv import BypassUV
bypass_app = BypassUV()

# Backs GET /api/v1/ios/app.
from app.ios_app import IOSAppFetcher
ios_app = IOSAppFetcher()

# Backs POST /api/v1/gmail/create.
from app.gm_crate import GmailLogic
gm_app = GmailLogic()
| |
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background services before serving and stop them afterwards.

    Services are started in a fixed order and stopped in that same order.
    Shutdown runs inside ``finally`` so that services which did start are
    still stopped when a later ``start()`` fails or when an exception is
    thrown into the generator at the ``yield``.

    Args:
        app: The FastAPI application (required by the lifespan protocol;
            not used by this function).

    Raises:
        Exception: Whatever a service's ``start()`` raised, or, after all
            started services have been asked to stop, the first exception
            raised by a ``stop()`` call.
    """
    services = (
        invite_app,
        lq_app,
        gs_app,
        gs_code_app,
        hentai_app,
        ht_random_app,
        bypass_app,
        ios_app,
    )
    started = []
    try:
        for service in services:
            await service.start()
            # Only services whose start() completed are eligible for stop().
            started.append(service)
        yield
    finally:
        first_stop_error = None
        for service in started:
            try:
                await service.stop()
            except Exception as exc:
                # Keep going so one failing service cannot prevent the
                # remaining ones from shutting down; report the first failure.
                if first_stop_error is None:
                    first_stop_error = exc
        if first_stop_error is not None:
            raise first_stop_error
| |
|
# ASGI application object; service startup and shutdown are delegated to the
# `lifespan` context manager.
app = FastAPI(lifespan=lifespan)

# Wall-clock timestamp captured when this module is imported. The "/" handler
# subtracts it from the current time to report uptime.
START_TIME = time.time()
| |
|
@app.get("/api/v1/invites/discord")
async def get_new_invite():
    """Return the result of ``invite_app.get_invite()``.

    Raises:
        HTTPException: 500, carrying the result's ``"error"`` value, when
            the result contains an ``"error"`` key.
    """
    result = await invite_app.get_invite()
    if "error" not in result:
        return result
    raise HTTPException(status_code=500, detail=result["error"])
| |
|
@app.get("/api/v1/get/lqAov")
async def get_free_account():
    """Return the result of ``lq_app.fetch_acc()``.

    Raises:
        HTTPException: 500, carrying the result's ``"error"`` value, when
            the result's ``"ok"`` entry is falsy.
    """
    result = await lq_app.fetch_acc()
    if result["ok"]:
        return result
    raise HTTPException(status_code=500, detail=result["error"])
| |
|
# Shared secret that the protected endpoints compare against the key supplied
# by the caller. It is None when the API_KEY environment variable is unset.
API_KEY = os.getenv("API_KEY")
| |
|
class DailyRequest(BaseModel):
    """JSON body accepted by POST /api/v1/genshin/daily.

    All three fields are forwarded unchanged to
    ``gs_app.run_daily_and_capture``.
    """

    cookie: str
    discord_id: str
    server: str
| |
|
@app.post("/api/v1/genshin/daily")
async def genshin_daily(
    data: DailyRequest,
    x_api_key: str = Header(None)
):
    """Run the daily action through ``gs_app`` for the supplied account data.

    Args:
        data: Request body with ``cookie``, ``discord_id`` and ``server``.
        x_api_key: Caller's API key taken from the request headers.

    Raises:
        HTTPException: 500 when no ``API_KEY`` is configured on the server,
            403 when the supplied key differs from ``API_KEY``, and 400
            (carrying the result's ``"error"`` value) when the service
            result's ``"ok"`` entry is falsy.
    """
    # A server without a configured key refuses every request.
    if not API_KEY:
        raise HTTPException(status_code=500, detail="500: Yo mama called, said stop hacking or she'll whoop ur ass. Listen to her for once.")

    if x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    outcome = await gs_app.run_daily_and_capture(data.cookie, data.discord_id, data.server)
    if outcome["ok"]:
        return outcome
    raise HTTPException(status_code=400, detail=outcome["error"])
| |
|
class RedeemRequest(BaseModel):
    """JSON body accepted by POST /api/v1/genshin/redeem.

    All three fields are forwarded unchanged to ``gs_code_app.redeem_code``.
    """

    cookie: str
    server: str
    code: str
| |
|
@app.post("/api/v1/genshin/redeem")
async def genshin_redeem(
    data: RedeemRequest,
    x_api_key: str = Header(None)
):
    """Redeem a code through ``gs_code_app`` for the supplied account data.

    Args:
        data: Request body with ``cookie``, ``server`` and ``code``.
        x_api_key: Caller's API key taken from the request headers.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key differs from it; 400 (carrying the whole service result as
            ``detail``) when the result's ``"ok"`` entry is falsy.
    """
    # Refuse when no key is configured: otherwise an unset API_KEY (None)
    # compares equal to a missing header (None) and anonymous callers get in.
    # The detail text is Vietnamese for "invalid API key".
    if not API_KEY or x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key không hợp lệ")

    res = await gs_code_app.redeem_code(
        data.cookie,
        data.server,
        data.code,
    )

    if not res["ok"]:
        raise HTTPException(status_code=400, detail=res)

    return res
| |
|
@app.get("/api/v1/hentai/newest")
async def hentai_newest(
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """Return the result of ``hentai_app.get_new``.

    The caller's key (the ``X-API-Key`` header, falling back to the
    ``apikey`` query parameter) is passed to the service together with
    ``API_KEY``; this handler performs no key check of its own.
    """
    return await hentai_app.get_new(x_api_key or apikey, API_KEY)
| |
|
@app.get("/api/v1/hentai/info")
async def hentai_info(
    slug: str = Query(...),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """Return the result of ``hentai_app.fetch_info`` for ``slug``.

    Args:
        slug: Required query parameter forwarded to the service.
        apikey: API key supplied as a query parameter (fallback).
        x_api_key: API key supplied in the ``X-API-Key`` header (preferred).

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key differs from it.
    """
    supplied_key = x_api_key or apikey
    authorized = bool(API_KEY) and supplied_key == API_KEY
    if not authorized:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return await hentai_app.fetch_info(slug)
| |
|
@app.get("/api/v1/hentai/random")
async def hentai_random(
    apikey: Optional[str] = Query(None),
    limit: str = Query("1"),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """Return the result of ``ht_random_app.get_random``.

    The caller's key (the ``X-API-Key`` header, falling back to the
    ``apikey`` query parameter), ``API_KEY`` and the raw ``limit`` string
    are passed to the service; this handler performs no key check of its own.
    """
    return await ht_random_app.get_random(x_api_key or apikey, API_KEY, limit)
| |
|
@app.get("/api/v1/tiktok/download")
async def tiktok_download(
    url: str = Query(..., description="TikTok Video URL"),
    hd: int = Query(1, description="HD mode: 1 for ON, 0 for OFF"),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """Return the result of ``tik_app.fetch_video`` for ``url``.

    Args:
        url: Required query parameter forwarded to the service.
        hd: Integer flag forwarded to the service (defaults to 1).
        apikey: API key supplied as a query parameter (fallback).
        x_api_key: API key supplied in the ``X-API-Key`` header (preferred).

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key differs from it; 400 (carrying the result's ``"error"``
            value) when the result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    authorized = bool(API_KEY) and supplied_key == API_KEY
    if not authorized:
        raise HTTPException(status_code=403, detail="API key invalid. Stop sniffin' around.")

    video = await tik_app.fetch_video(url, hd)
    if video["ok"]:
        return video
    raise HTTPException(status_code=400, detail=video["error"])
| |
|
@app.get("/api/v1/bypass")
async def bypass_link(
    url: str = Query(...),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """Return the result of ``bypass_app.bypass`` for ``url``.

    Args:
        url: Required query parameter forwarded to the service.
        apikey: API key supplied as a query parameter (fallback).
        x_api_key: API key supplied in the ``X-API-Key`` header (preferred).

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key differs from it; 400 (carrying the result's ``"error"``
            value) when the result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    authorized = bool(API_KEY) and supplied_key == API_KEY
    if not authorized:
        raise HTTPException(status_code=403, detail="API key invalid")

    outcome = await bypass_app.bypass(url)
    if outcome["ok"]:
        return outcome
    raise HTTPException(status_code=400, detail=outcome["error"])
| |
|
@app.get("/api/v1/ios/app")
async def ios_app_fetch(
    n: str = Query(...),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """Return the result of ``ios_app.get_app_accounts`` for ``n``.

    Args:
        n: Required query parameter forwarded to the service.
        apikey: API key supplied as a query parameter (fallback).
        x_api_key: API key supplied in the ``X-API-Key`` header (preferred).

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key differs from it; 404 (carrying the result's ``"error"``
            value) when the result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    authorized = bool(API_KEY) and supplied_key == API_KEY
    if not authorized:
        raise HTTPException(status_code=403, detail="API key invalid")

    outcome = await ios_app.get_app_accounts(n)
    if outcome["ok"]:
        return outcome
    raise HTTPException(status_code=404, detail=outcome["error"])
| |
|
class ProductItem(BaseModel):
    """One entry of the ``products`` list in a ``GmailRequest``."""

    name: str
    price: str
| |
|
class GmailRequest(BaseModel):
    """JSON body accepted by POST /api/v1/gmail/create.

    Every field is forwarded to ``gm_app.send_order_email``; ``products`` is
    converted to a list of plain dicts first.
    """

    to_email: str
    subject: str
    customer_name: str
    order_id: str
    status: str
    total_price: str
    products: List[ProductItem]
    # Used when the caller does not supply a value.
    from_name: Optional[str] = "Celeste Store"
| |
|
@app.post("/api/v1/gmail/create")
async def create_gmail_notif(
    data: GmailRequest,
    x_api_key: str = Header(None)
):
    """Send an order e-mail through ``gm_app`` using the request body.

    Args:
        data: Request body describing the recipient, the order and its
            products.
        x_api_key: Caller's API key taken from the request headers.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key differs from it; 500 (carrying the result's ``"error"``
            value) when the service result's ``"ok"`` entry is falsy.
    """
    # Refuse when no key is configured: otherwise an unset API_KEY (None)
    # compares equal to a missing header (None) and anonymous callers get in.
    if not API_KEY or x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    res = await gm_app.send_order_email(
        to_email=data.to_email,
        subject=data.subject,
        customer_name=data.customer_name,
        order_id=data.order_id,
        status=data.status,
        # The service receives plain dicts rather than ProductItem models.
        products=[p.dict() for p in data.products],
        total_price=data.total_price,
        from_name=data.from_name,
    )

    if not res["ok"]:
        raise HTTPException(status_code=500, detail=res["error"])

    return res
| |
|
| |
|
@app.get("/")
def home():
    """Report that the service is online together with basic host metrics.

    Returns:
        A dict with the status string, uptime (seconds and ``HH:MM:SS``),
        process id, CPU and RAM usage figures from ``psutil``, and
        platform/Python/host identification from ``platform``.
    """
    # Clamp at zero so a wall clock stepped backwards cannot yield a negative
    # uptime.
    uptime_seconds = max(0, int(time.time() - START_TIME))

    # Build HH:MM:SS arithmetically. Formatting the seconds as a timestamp
    # with "%H" wraps the hour field back to 00 after 24 hours of uptime;
    # here the hour count simply keeps growing (e.g. "25:00:00").
    hours, remainder = divmod(uptime_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    uptime_string = f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    memory = psutil.virtual_memory()
    # With a positive interval this call blocks for that long while sampling.
    cpu_percent = psutil.cpu_percent(interval=0.5)

    return {
        "status": "online",
        "uptime_seconds": uptime_seconds,
        "uptime_hms": uptime_string,
        "process_id": os.getpid(),
        "cpu_usage_percent": cpu_percent,
        "total_ram_mb": round(memory.total / 1024 / 1024, 2),
        "used_ram_mb": round(memory.used / 1024 / 1024, 2),
        "ram_usage_percent": memory.percent,
        "platform": platform.system(),
        "platform_release": platform.release(),
        "python_version": platform.python_version(),
        "hostname": platform.node(),
    }
| | |