File size: 3,647 Bytes
3e805ab
 
 
 
 
 
 
eb23bfb
 
3e805ab
 
 
 
 
 
 
eb23bfb
3e805ab
 
 
 
 
 
 
eb23bfb
3e805ab
 
 
 
 
 
 
 
 
 
eb23bfb
 
 
 
 
3e805ab
 
 
 
 
 
 
 
 
eb23bfb
 
3e805ab
 
 
 
 
eb23bfb
3e805ab
eb23bfb
 
 
3e805ab
 
 
 
eb23bfb
3e805ab
 
 
 
 
 
eb23bfb
 
 
3e805ab
 
 
eb23bfb
3e805ab
eb23bfb
 
 
 
 
3e805ab
 
eb23bfb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e805ab
eb23bfb
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import List
import os
import shutil
import uuid
import re
import inflect 
import cloudinary.api
from src.models import AIModelManager
from src.cloud_db import CloudDB

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

print("Loading AI Models and Cloud DB...")
ai = AIModelManager()
db = CloudDB() 
p = inflect.engine() 
print("Ready!")

os.makedirs("temp_uploads", exist_ok=True)

def standardize_category_name(name: str) -> str:
    clean_name = name.strip().lower()
    clean_name = re.sub(r'\s+', '_', clean_name)
    clean_name = re.sub(r'[^\w\s]', '', clean_name)
    singular_name = p.singular_noun(clean_name)
    return singular_name if singular_name else clean_name

def sanitize_filename(filename: str) -> str:
    clean_name = re.sub(r'\s+', '_', filename)
    clean_name = re.sub(r'[^\w\.\-]', '', clean_name)
    return clean_name

@app.post("/api/upload")
async def upload_new_images(files: List[UploadFile] = File(...), folder_name: str = Form(...)):
    uploaded_urls = []
    standardized_folder = standardize_category_name(folder_name)
    
    try:
        for file in files:
            safe_filename = sanitize_filename(file.filename)
            temp_path = f"temp_uploads/{safe_filename}"
            with open(temp_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
                
            image_url = db.upload_image(temp_path, standardized_folder)
            
            vectors_to_save = ai.process_image(temp_path, is_query=False)
            
            for vec_dict in vectors_to_save:
                image_id = str(uuid.uuid4())
                db.add_vector(vec_dict, image_url, image_id)
            
            os.remove(temp_path)
            uploaded_urls.append(image_url)
            
        return {"message": "Success!", "urls": uploaded_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/search")
async def search_database(file: UploadFile = File(...)):
    try:
        safe_filename = sanitize_filename(file.filename)
        temp_path = f"temp_uploads/query_{safe_filename}"
        
        with open(temp_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
            
        vectors_to_search = ai.process_image(temp_path, is_query=True)
        
        all_results = []
        for vec_dict in vectors_to_search:
            results = db.search(vec_dict, top_k=10)
            all_results.extend(results)
            
        os.remove(temp_path)
        
        unique_results = {}
        for r in all_results:
            url = r["url"]
            if url not in unique_results or r["score"] > unique_results[url]["score"]:
                unique_results[url] = r
                
        final_results = sorted(unique_results.values(), key=lambda x: x["score"], reverse=True)
        
        return {"results": final_results[:10]}
    except Exception as e:
        print(f"Production Search Error: {str(e)}") 
        raise HTTPException(status_code=500, detail=str(e))
    
@app.get("/api/categories")
async def get_categories():
    try:
        result = cloudinary.api.root_folders()
        folders = [folder["name"] for folder in result.get("folders", [])]
        return {"categories": folders}
    except Exception as e:
        print(f"Error fetching categories from Cloudinary: {e}")
        return {"categories": []}