|
|
import os |
|
|
from fastapi import FastAPI, Header, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import boto3 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DO_SPACES_REGION = "blr1" |
|
|
DO_SPACES_BUCKET = "milestone" |
|
|
DO_ACCESS_KEY = os.getenv("DO_SPACES_KEY") |
|
|
DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET") |
|
|
|
|
|
BASE_URL = f"https://{DO_SPACES_BUCKET}.{DO_SPACES_REGION}.digitaloceanspaces.com" |
|
|
|
|
|
AUTH_TOKEN = "logicgo_admin@123" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_spaces_client(): |
|
|
return boto3.client( |
|
|
"s3", |
|
|
region_name=DO_SPACES_REGION, |
|
|
endpoint_url=f"https://{DO_SPACES_REGION}.digitaloceanspaces.com", |
|
|
aws_access_key_id=DO_ACCESS_KEY, |
|
|
aws_secret_access_key=DO_SECRET_KEY, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_auth(authorization: str = Header(None)): |
|
|
if not authorization: |
|
|
raise HTTPException(status_code=401, detail="Missing authorization header") |
|
|
|
|
|
token = authorization.replace("Bearer ", "").strip() |
|
|
if token != AUTH_TOKEN: |
|
|
raise HTTPException(status_code=403, detail="Invalid authorization token") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/images") |
|
|
def list_images(authorization: str = Header(None)): |
|
|
validate_auth(authorization) |
|
|
|
|
|
client = get_spaces_client() |
|
|
|
|
|
|
|
|
prompt_list = client.list_objects_v2( |
|
|
Bucket=DO_SPACES_BUCKET, |
|
|
Prefix="KiddoSnap/prompts/" |
|
|
) |
|
|
|
|
|
contents = prompt_list.get("Contents", []) |
|
|
txt_files = [f for f in contents if f["Key"].endswith(".txt")] |
|
|
|
|
|
results = [] |
|
|
|
|
|
for txt_file in txt_files: |
|
|
file_key = txt_file["Key"] |
|
|
file_name = file_key.split("/")[-1] |
|
|
file_number = file_name.replace(".txt", "").split("-")[1] |
|
|
|
|
|
|
|
|
obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=file_key) |
|
|
prompt_text = obj["Body"].read().decode("utf-8") |
|
|
|
|
|
|
|
|
thumb_url = f"{BASE_URL}/KiddoSnap/thumbnails/thumb-image-{file_number}.png" |
|
|
|
|
|
results.append({ |
|
|
"file": file_name, |
|
|
"prompt": prompt_text, |
|
|
"thumbnail": thumb_url |
|
|
}) |
|
|
|
|
|
return {"images": results} |
|
|
|
|
|
|
|
|
@app.get("/") |
|
|
def home(): |
|
|
return {"message": "FastAPI KiddoSnap images API running on HuggingFace Docker Space"} |
|
|
|