Spaces:
Sleeping
Sleeping
File size: 2,602 Bytes
d2904be 312ed82 d2904be 312ed82 d2904be | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | from fastapi import FastAPI, UploadFile, File, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import tempfile
import shutil
import os
from utils import check_image_nsfw, check_video_nsfw, download_file
app = FastAPI()
# ✅ CORS CONFIG
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 🔥 allow all (change later if needed)
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
MAX_MB = 25
@app.get("/health")
def health():
return {"status": "ok", "api": "live", "creator": "JerryCoder"}
# -----------------------------
# Save upload with size limit
# -----------------------------
def save_upload(file: UploadFile):
tmp = tempfile.NamedTemporaryFile(delete=False)
size = 0
with open(tmp.name, "wb") as buffer:
while chunk := file.file.read(1024 * 1024):
size += len(chunk)
if size > MAX_MB * 1024 * 1024:
buffer.close()
os.remove(tmp.name)
raise HTTPException(413, "Max 25MB exceeded")
buffer.write(chunk)
return tmp.name
def is_video(name):
return name.lower().endswith((".mp4", ".mov", ".avi", ".mkv", ".webm"))
def is_image(name):
return name.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))
# -----------------------------
# POST upload
# -----------------------------
@app.post("/detect")
async def detect(file: UploadFile = File(...)):
path = save_upload(file)
try:
if is_image(file.filename):
return {"type": "image", "nsfw": check_image_nsfw(path)}
elif is_video(file.filename):
return {"type": "video", "nsfw": check_video_nsfw(path)}
else:
raise HTTPException(400, "Unsupported file")
finally:
if os.path.exists(path):
os.remove(path)
# -----------------------------
# GET URL
# -----------------------------
@app.get("/detect")
async def detect_url(url: str = Query(...)):
try:
path = download_file(url)
except Exception:
raise HTTPException(400, "Cannot fetch URL")
try:
size = os.path.getsize(path) / (1024 * 1024)
if size > MAX_MB:
raise HTTPException(413, "Max 25MB exceeded")
if is_image(url):
return {"type": "image", "nsfw": check_image_nsfw(path)}
elif is_video(url):
return {"type": "video", "nsfw": check_video_nsfw(path)}
else:
raise HTTPException(400, "Unsupported file")
finally:
if os.path.exists(path):
os.remove(path) |