| from fastapi import FastAPI, UploadFile, File, Query, HTTPException |
| import shutil |
| from fastapi.middleware.cors import CORSMiddleware |
| import tempfile |
| import os |
|
|
| from utils import ( |
| check_image_nsfw, |
| check_video_nsfw, |
| download_file |
| ) |
|
|
| app = FastAPI() |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| MAX_SIZE_MB = 25 |
|
|
|
|
| @app.get("/health") |
| def health(): |
| return {"status": "ok", "api": "live", "creator": "JerryCoder"} |
|
|
|
|
| def save_upload(file: UploadFile): |
| tmp = tempfile.NamedTemporaryFile(delete=False) |
|
|
| size = 0 |
| with open(tmp.name, "wb") as buffer: |
| while chunk := file.file.read(1024 * 1024): |
| size += len(chunk) |
| if size > MAX_SIZE_MB * 1024 * 1024: |
| buffer.close() |
| os.remove(tmp.name) |
| raise HTTPException(status_code=413, detail="File too large (max 25MB)") |
| buffer.write(chunk) |
|
|
| return tmp.name |
|
|
|
|
| def is_video(filename): |
| return filename.lower().endswith((".mp4", ".mov", ".avi", ".mkv", ".webm")) |
|
|
|
|
| def is_image(filename): |
| return filename.lower().endswith((".jpg", ".jpeg", ".png", ".webp")) |
|
|
|
|
| @app.post("/detect") |
| async def detect(file: UploadFile = File(...)): |
| path = save_upload(file) |
|
|
| try: |
| if is_image(file.filename): |
| nsfw = check_image_nsfw(path) |
| return {"type": "image", "nsfw": nsfw} |
|
|
| elif is_video(file.filename): |
| nsfw = check_video_nsfw(path) |
| return {"type": "video", "nsfw": nsfw} |
|
|
| else: |
| raise HTTPException(status_code=400, detail="Unsupported file type") |
|
|
| finally: |
| if os.path.exists(path): |
| os.remove(path) |
|
|
|
|
| @app.get("/detect") |
| async def detect_url(url: str = Query(...)): |
| path = download_file(url) |
|
|
| try: |
| size_mb = os.path.getsize(path) / (1024 * 1024) |
| if size_mb > MAX_SIZE_MB: |
| os.remove(path) |
| raise HTTPException(status_code=413, detail="File too large (max 25MB)") |
|
|
| if is_image(url): |
| nsfw = check_image_nsfw(path) |
| return {"type": "image", "nsfw": nsfw} |
|
|
| elif is_video(url): |
| nsfw = check_video_nsfw(path) |
| return {"type": "video", "nsfw": nsfw} |
|
|
| else: |
| raise HTTPException(status_code=400, detail="Unsupported file type") |
|
|
| finally: |
| if os.path.exists(path): |
| os.remove(path) |