import os
import uuid
import shutil
import sqlite3
import json
import logging
import numpy as np
import chromadb
import cv2
from datetime import datetime
from typing import List, Optional
from contextlib import asynccontextmanager
# FastAPI & Utilities
from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
# AI Libraries
import torch
from PIL import Image
from sentence_transformers import SentenceTransformer
from transformers import BlipProcessor, BlipForConditionalGeneration
from insightface.app import FaceAnalysis
# --- CONFIGURATION ---
UPLOAD_DIR = "static/uploads"
DB_PATH = "photos.db"
CHROMA_PATH = "chroma_db"
device = "cuda" if torch.cuda.is_available() else "cpu"
# Ensure directories exist
os.makedirs(UPLOAD_DIR, exist_ok=True)
# --- LOGGING ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("CloudzyAI")
# --- GLOBAL MODELS (Loaded on Startup) ---
ai_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 1. Load CLIP for Semantic Search (Text <-> Image)
    logger.info("Loading CLIP model...")
    ai_models["clip"] = SentenceTransformer('clip-ViT-B-32', device=device)
    # 2. Load BLIP for Captioning
    logger.info("Loading BLIP model...")
    ai_models["blip_processor"] = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    ai_models["blip_model"] = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)
    # 3. Load InsightFace for Smart Analysis
    logger.info("Loading InsightFace model...")
    # 'buffalo_l' is a good default model pack; it downloads automatically on first run.
    app_face = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
    app_face.prepare(ctx_id=0, det_size=(640, 640))
    ai_models["face"] = app_face
    # 4. Initialize Database
    init_db()
    yield
    logger.info("Shutting down...")
app = FastAPI(lifespan=lifespan)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")  # expects a templates/ folder next to this file
# --- DATABASE SETUP (SQLite + ChromaDB) ---
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
collection = chroma_client.get_or_create_collection(name="photo_embeddings")
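# Note (an alternative, not what runs above): Chroma's HNSW index defaults to
# L2 distance, while CLIP retrieval is usually done with cosine similarity.
# If ranking quality matters, the collection could be created as:
#   chroma_client.get_or_create_collection(
#       name="photo_embeddings", metadata={"hnsw:space": "cosine"})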
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS photos (
            id TEXT PRIMARY KEY,
            filename TEXT,
            filepath TEXT,
            upload_date TEXT,
            caption TEXT,
            tags TEXT,
            smart_analysis TEXT,
            status TEXT
        )
    """)
    conn.commit()
    conn.close()
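# Optional convenience (a sketch, not wired into the handlers below): every
# endpoint opens its own sqlite3 connection, so a small factory would keep the
# row_factory setting in one place. `open_db` is an illustrative name only.
def open_db() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # rows then support dict-style access, e.g. row["id"]
    return conn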
# --- Pydantic Models ---
class PhotoResponse(BaseModel):
    id: str
    filename: str
    url: str
    caption: Optional[str] = None
    tags: List[str] = []
    smart_features: Optional[dict] = None
    upload_date: str
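# Shape of a serialized PhotoResponse (illustrative values only):
# {"id": "8f14e45f-...", "filename": "beach.jpg",
#  "url": "/static/uploads/8f14e45f-....jpg", "caption": "a sandy beach",
#  "tags": ["scenery", "beach"], "smart_features": null,
#  "upload_date": "2024-01-01T12:00:00"}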
# --- AI PROCESSING TASKS ---
def process_image_task(photo_id: str, file_path: str):
    """
    Background task that runs the AI pipeline:
    1. Generate a caption (BLIP)
    2. Analyze faces (InsightFace)
    3. Create an image embedding (CLIP)
    4. Update SQLite and ChromaDB
    """
    logger.info(f"Starting AI analysis for {photo_id}")
    try:
        # Load the image in both formats
        pil_image = Image.open(file_path).convert("RGB")
        cv_image = cv2.imread(file_path)  # InsightFace expects an OpenCV (BGR) array

        # A. Captioning (BLIP)
        inputs = ai_models["blip_processor"](pil_image, return_tensors="pt").to(device)
        out = ai_models["blip_model"].generate(**inputs)
        caption = ai_models["blip_processor"].decode(out[0], skip_special_tokens=True)

        # B. Smart feature: face analysis (InsightFace)
        faces = ai_models["face"].get(cv_image)
        face_data = []
        tags = ["ai-generated"]  # marks these tags as machine-generated
        if len(faces) > 0:
            avg_age = np.mean([face.age for face in faces])
            gender_counts = {"M": 0, "F": 0}
            for face in faces:
                gender = face.sex  # InsightFace reports sex as 'M' or 'F'
                gender_counts[gender] += 1
                face_data.append({
                    "age": int(face.age),
                    "gender": gender,
                    "confidence": float(face.det_score)
                })
            # Smart tagging based on the analysis
            tags.append("person")
            tags.append(f"{len(faces)} people")
            if gender_counts["M"] > gender_counts["F"]:
                tags.append("mostly_male")
            if gender_counts["F"] > gender_counts["M"]:
                tags.append("mostly_female")
            if avg_age < 18:
                tags.append("youth")
            elif avg_age > 60:
                tags.append("senior")
            else:
                tags.append("adult")
        else:
            tags.append("scenery")  # fallback tag
            face_data = {"message": "No faces detected"}

        # Fold longer caption words into the tags (simple approach)
        tags.extend([word for word in caption.split() if len(word) > 4])
        tags = list(set(tags))  # de-duplicate

        # C. Embedding (CLIP): embed the image itself for semantic search
        embedding = ai_models["clip"].encode(pil_image).tolist()

        # D. Save results to SQLite
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE photos
            SET caption = ?, tags = ?, smart_analysis = ?, status = 'completed'
            WHERE id = ?
        """, (caption, json.dumps(tags), json.dumps(face_data), photo_id))
        conn.commit()
        conn.close()

        # ...and the embedding to ChromaDB
        collection.add(
            ids=[photo_id],
            embeddings=[embedding],
            metadatas=[{"caption": caption}]
        )
        logger.info(f"AI processing completed for {photo_id}")
    except Exception as e:
        logger.error(f"Error processing {photo_id}: {e}")
        conn = sqlite3.connect(DB_PATH)
        conn.execute("UPDATE photos SET status = 'failed' WHERE id = ?", (photo_id,))
        conn.commit()
        conn.close()
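# Sanity-check sketch for the CLIP step above: CLIP places image and text
# vectors in a shared space, so a matching description should score higher
# than an unrelated one. Assumes ai_models has been populated by lifespan();
# `_clip_similarity_demo` is an illustrative helper, not called anywhere.
def _clip_similarity_demo(image_path: str) -> None:
    from sentence_transformers import util
    img_vec = ai_models["clip"].encode(Image.open(image_path).convert("RGB"))
    text_vecs = ai_models["clip"].encode(["a photo of a dog", "a photo of a city"])
    # cos_sim returns a 1x2 tensor of cosine similarities, one per text prompt
    logger.info("similarities: %s", util.cos_sim(img_vec, text_vecs))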
# --- API ENDPOINTS ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the UI"""
    return templates.TemplateResponse("index.html", {"request": request})
@app.post("/upload", response_model=PhotoResponse)
async def upload_photo(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """
    1. Validate the file
    2. Save it to disk
    3. Create the DB record
    4. Trigger the async AI task
    """
    if not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="File must be an image")
    file_id = str(uuid.uuid4())
    ext = os.path.splitext(file.filename)[1].lstrip(".") or "jpg"  # fall back if the name has no extension
    filename = f"{file_id}.{ext}"
    file_path = os.path.join(UPLOAD_DIR, filename)
    # Save the file
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    # Initial DB record
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO photos (id, filename, filepath, upload_date, status)
        VALUES (?, ?, ?, ?, 'processing')
    """, (file_id, file.filename, file_path, datetime.now().isoformat()))
    conn.commit()
    conn.close()
    # Trigger the AI pipeline
    background_tasks.add_task(process_image_task, file_id, file_path)
    return {
        "id": file_id,
        "filename": file.filename,
        "url": f"/static/uploads/{filename}",
        "upload_date": datetime.now().isoformat()
    }
@app.get("/photo/{photo_id}", response_model=PhotoResponse)
async def get_photo(photo_id: str):
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    row = cursor.execute("SELECT * FROM photos WHERE id = ?", (photo_id,)).fetchone()
    conn.close()
    if not row:
        raise HTTPException(status_code=404, detail="Photo not found")
    return {
        "id": row["id"],
        "filename": row["filename"],
        "url": f"/{row['filepath']}",
        "caption": row["caption"],
        "tags": json.loads(row["tags"]) if row["tags"] else [],
        "smart_features": json.loads(row["smart_analysis"]) if row["smart_analysis"] else None,
        "upload_date": row["upload_date"]
    }
@app.get("/search")
async def search_photos(q: str):
    """
    Semantic search:
    1. Embed the query text using CLIP.
    2. Search ChromaDB for the nearest image vectors.
    3. Retrieve metadata from SQLite.
    """
    # Embed the query text
    query_vec = ai_models["clip"].encode(q).tolist()
    # Query the vector DB
    results = collection.query(
        query_embeddings=[query_vec],
        n_results=5
    )
    ids = results["ids"][0]
    if not ids:
        return []
    # Fetch details from SQLite
    placeholders = ",".join("?" * len(ids))
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    rows = cursor.execute(f"SELECT * FROM photos WHERE id IN ({placeholders})", ids).fetchall()
    conn.close()
    # SQL's IN clause does not preserve order, so re-sort to match Chroma's ranking
    rows_by_id = {row["id"]: row for row in rows}
    response_data = []
    for photo_id in ids:
        row = rows_by_id.get(photo_id)
        if row is None:
            continue  # embedding exists but the SQLite record is gone
        response_data.append({
            "id": row["id"],
            "url": f"/{row['filepath']}",
            "caption": row["caption"],
            "tags": json.loads(row["tags"]) if row["tags"] else [],
            "smart_features": json.loads(row["smart_analysis"]) if row["smart_analysis"] else None,
        })
    return response_data
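# Example usage against the endpoints above (assuming the server is running
# on localhost:8000 as configured below):
#   curl -F "file=@photo.jpg" http://localhost:8000/upload
#   curl http://localhost:8000/photo/<id>
#   curl "http://localhost:8000/search?q=sunset%20over%20mountains"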
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)