Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, File, UploadFile, Response, HTTPException | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from PIL import Image | |
| import io | |
| import sqlite3 | |
| from pydantic import BaseModel, EmailStr | |
| from pathlib import Path | |
| from model import YOLOModel | |
| import shutil | |
# Build the detector once at import time; upload_image() reuses this instance.
yolo = YOLOModel()
# Folder where uploaded images are written before being passed to the model.
UPLOAD_FOLDER = Path("./uploads")
UPLOAD_FOLDER.mkdir(exist_ok=True)  # no error if the folder already exists
app = FastAPI()
# NOTE(review): not referenced anywhere in the visible code -- presumably the
# folder the model writes crops to; confirm before relying on it.
cropped_images_dir = "cropped_images"
# Initialize SQLite database
def init_db():
    """Create the ``users`` table in ``users.db`` if it is not there yet.

    The DDL uses ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly
    is harmless.
    """
    conn = sqlite3.connect('users.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                firstName TEXT NOT NULL,
                lastName TEXT NOT NULL,
                country TEXT,
                number TEXT, -- Phone number stored as TEXT to allow various formats
                email TEXT UNIQUE NOT NULL, -- Email should be unique and non-null
                password TEXT NOT NULL -- Password will be stored as a string (hashed ideally)
            )
        ''')
        conn.commit()
    finally:
        # Close even when the DDL or commit fails so the handle is not leaked.
        conn.close()
# Runs at import time so the users table exists before any handler queries it.
init_db()
# Request payload consumed by signup(); each field maps to the users column
# of the same name.
class UserSignup(BaseModel):
    firstName: str
    lastName: str
    country: str
    number: str  # phone number; the users.number column is TEXT
    email: EmailStr  # validated as an e-mail address by pydantic
    password: str  # inserted into users.password exactly as received
# Credentials payload consumed by login().
class UserLogin(BaseModel):
    email: str  # plain str here, unlike UserSignup.email which is EmailStr
    password: str  # matched against users.password in the login query
async def signup(user_data: UserSignup):
    """Register a new user by inserting a row into the ``users`` table.

    Args:
        user_data: Validated sign-up payload.

    Returns:
        A dict with a success ``message`` and the registered ``email``.

    Raises:
        HTTPException: 400 when the email is already registered, 500 for any
            other failure (the error text is passed through as ``detail``).
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed source -- confirm it is registered with ``app``.
    conn = None
    try:
        conn = sqlite3.connect('users.db')
        c = conn.cursor()
        # Check if user already exists
        c.execute("SELECT 1 FROM users WHERE email = ?", (user_data.email,))
        if c.fetchone():
            raise HTTPException(status_code=400, detail="Email already registered")
        # NOTE(review): the password is stored exactly as received, and login()
        # compares against that stored value. Switching to a salted hash has to
        # be done in both places at once.
        c.execute(
            """
            INSERT INTO users (firstName, lastName, country, number, email, password)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                user_data.firstName,
                user_data.lastName,
                user_data.country,
                user_data.number,
                user_data.email,
                user_data.password,
            ),
        )
        conn.commit()
        return {"message": "User registered successfully", "email": user_data.email}
    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must reach the client as-is;
        # without this clause the generic handler below turns them into 500s.
        raise
    except sqlite3.IntegrityError:
        # The UNIQUE(email) constraint fired: another signup for the same
        # address won the race between our SELECT and INSERT.
        raise HTTPException(status_code=400, detail="Email already registered")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always release the connection, including on the error paths.
        if conn is not None:
            conn.close()
async def login(user_data: UserLogin):
    """Check the submitted credentials against the ``users`` table.

    Args:
        user_data: Email and password to verify.

    Returns:
        A dict with a success ``message`` and a ``user`` dict holding
        ``firstName``, ``lastName`` and ``email``.

    Raises:
        HTTPException: 401 when no row matches the email/password pair,
            500 when the database query itself fails.
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed source -- confirm it is registered with ``app``.
    conn = None
    try:
        conn = sqlite3.connect('users.db')
        c = conn.cursor()
        # Select the returned columns by name. Indexing a ``SELECT *`` row
        # positionally is what made the old code report the country
        # (column 3) as the user's email.
        # NOTE(review): the password is compared as stored, i.e. plaintext.
        c.execute(
            "SELECT firstName, lastName, email FROM users WHERE email = ? AND password = ?",
            (user_data.email, user_data.password),
        )
        user = c.fetchone()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if conn is not None:
            conn.close()

    # Raised outside the try block so the 401 is not rewritten into a 500.
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    first_name, last_name, email = user
    return {
        "message": "Login successful",
        "user": {
            "firstName": first_name,
            "lastName": last_name,
            "email": email,
        },
    }
async def get_users():
    """Return every row of the ``users`` table as a list of dicts.

    Returns:
        One dict per row, keyed by column name.

    Raises:
        HTTPException: 500 when the database cannot be read.
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed source -- confirm it is registered with ``app``.
    # NOTE(review): ``SELECT *`` includes the ``password`` column, so stored
    # passwords are part of the returned data. Left as-is to keep the response
    # shape stable for existing callers; consider dropping that column.
    conn = None
    try:
        conn = sqlite3.connect('users.db')
        conn.row_factory = sqlite3.Row  # rows become name-addressable
        rows = conn.execute("SELECT * FROM users").fetchall()
        # Convert rows to a list of dictionaries
        return [dict(row) for row in rows]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Close on the failure path too, so the handle is never leaked.
        if conn is not None:
            conn.close()
async def upload_image(image: UploadFile = File(...)):
    """Save an uploaded image, run YOLO inference on it and return the result.

    The upload is written into ``UPLOAD_FOLDER``, passed to ``yolo.predict``
    by path, and removed again whether or not inference succeeds.

    Args:
        image: The uploaded file.

    Returns:
        ``JSONResponse`` with ``{"items": predictions}`` on success,
        ``{"error": ...}`` with status 400 when the upload has no usable
        filename, or ``{"error": ...}`` with status 500 on any other failure.
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed source (a commented-out copy uses "/upload") -- confirm it is
    # registered with ``app``.

    # The filename is client-supplied. Keep only its last path component so a
    # name such as "../../app.py" cannot write outside UPLOAD_FOLDER.
    # Backslashes are normalised first so Windows-style paths are also cut.
    safe_name = Path((image.filename or "").replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        return JSONResponse(
            content={"error": "Uploaded file has no usable filename"},
            status_code=400,
        )

    file_path = UPLOAD_FOLDER / safe_name
    try:
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(image.file, buffer)
        # Perform YOLO inference
        predictions = yolo.predict(str(file_path))
        # Same output as before; the backslash is now escaped explicitly
        # where the old literal relied on the invalid escape "\ ".
        print(f'\n\n\n{predictions}\n\n\\ \n\t\t\t\tare predictions')
        return JSONResponse(content={"items": predictions})
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Best-effort cleanup that also runs when inference raises, so failed
        # requests do not leave files behind in the upload folder.
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            pass
def cleanup_images(directory: str):
    """Remove every file directly inside *directory*.

    Sub-directories are left in place (``unlink`` cannot remove them and
    would abort the sweep), and a missing *directory* is a no-op.

    Args:
        directory: Path of the folder to empty.
    """
    root = Path(directory)
    if not root.is_dir():
        return
    for entry in root.glob("*"):
        # is_symlink() keeps dangling links removable, as unlink() did before.
        if entry.is_file() or entry.is_symlink():
            entry.unlink()
| # @app.post("/upload") | |
| # async def upload_image(image: UploadFile = File(...)): | |
| # # print(f'\n\t\tUPLOADED!!!!') | |
| # try: | |
| # file_path = UPLOAD_FOLDER / image.filename | |
| # with file_path.open("wb") as buffer: | |
| # shutil.copyfileobj(image.file, buffer) | |
| # # print(f'Starting to pass into model, {file_path}') | |
| # # Perform YOLO inference | |
| # predictions = yolo.predict(str(file_path)) | |
| # print(f'\n\n\n{predictions}\n\n\ \n\t\t\t\tare predictions') | |
| # # Clean up uploaded file | |
| # file_path.unlink() # Remove file after processing | |
| # return JSONResponse(content={"items": predictions}) | |
| # except Exception as e: | |
| # return JSONResponse(content={"error": str(e)}, status_code=500) | |
# Allow browser front-ends served from other origins to call this API.
# NOTE(review): the "*" entry makes the middleware accept every origin, so the
# explicit entries currently have no effect. It is kept to avoid changing
# behaviour; remove it to restrict access to the listed front-ends.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://192.168.56.1:3000",
        "http://192.168.56.1:3001",
        # No trailing slash: browsers send Origin as scheme://host[:port],
        # so an entry ending in "/" can never match.
        "https://recognizethis.netlify.app",
        "*",
    ],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Start the server only when this file is executed directly.
if __name__ == "__main__":
    import uvicorn
    # "app:app" is an import string: module ``app``, attribute ``app``, so it
    # assumes this file is importable as ``app``.
    # NOTE(review): reload=True is a development setting; confirm it is
    # intended for the deployed service.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)