Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, HTTPException,Depends,File, UploadFile | |
| from fastapi.responses import JSONResponse | |
| from config.database import admin_collection, user_collection,notification_collection,pothole_image_collection | |
| from model.pothole_model import load_image_model | |
| from utils.auth import create_access_token, hash_password, verify_password, verify_token | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from schema.model import Admin, PoholeInfo, PotInfoById, PotholeFilters, PotholeModel, UpdatePotholeInfo, User, UserLogin, VerifyOtp | |
| from utils.email_validator import send_activation_email | |
| import requests | |
| import uuid | |
| from bson import ObjectId | |
| import random | |
# Bearer-credentials extractor; the handlers below receive it via Depends().
security_scheme = HTTPBearer()

# APIRouter instance for this module.
router = APIRouter()

# Module-level dict, created empty; not referenced in the visible part of
# this file.
activation_tokens = {}

import firebase_admin
from firebase_admin import credentials, storage

# Initialize the Firebase Admin SDK once, at import time. The service-account
# key is given as a relative path, so it is looked up relative to the
# process's working directory.
cred = credentials.Certificate(
    "pothole-detection-c31a0-firebase-adminsdk-xirxs-4ad5f554ca.json"
)
firebase_admin.initialize_app(
    cred,
    {"storageBucket": "pothole-detection-c31a0.appspot.com"},
)
def generate_otp():
    """Return a random six-digit one-time password as a string.

    The value is drawn uniformly from 100000..999999 (inclusive), so the
    result never has a leading zero and is always exactly six characters.

    Returns:
        str: The OTP digits.
    """
    # An OTP is a security credential, so use the OS CSPRNG via `secrets`
    # rather than the predictable Mersenne Twister behind `random`.
    import secrets

    return str(100000 + secrets.randbelow(900000))
| #----------------------------------------Admin Collections---------------------------------------- | |
| # Admin Registration Api | |
async def register_admin(admin: Admin):
    """Insert a new admin document, storing a hashed password.

    Args:
        admin: Registration payload; its ``password`` attribute is hashed
            with ``hash_password`` before the document is stored.

    Returns:
        dict: A confirmation message and the inserted document's id
        (as a string) under ``"user_id"``.
    """
    password_hash = hash_password(admin.password)

    # Same fields as the payload, with the plaintext password replaced.
    document = {**admin.dict(), "password": password_hash}

    inserted = admin_collection.insert_one(document)
    return {
        "message": "Admin created successfully",
        "user_id": str(inserted.inserted_id),
    }
| #Api to get all admins | |
| # @router.get("/api/admins/getAllAdmin", tags=["admin"]) | |
| # async def get_all_admins(): | |
| # admins = admin_collection.find({}) | |
| # admin_list = [] | |
| # for admin in admins: | |
| # admin["_id"] = str(admin["_id"]) | |
| # admin_list.append(admin) | |
| # return admin_list | |
| #-------------------------------------------User Collections------------------------------------------- | |
| # User Login API | |
async def user_login(userLogin: UserLogin):
    """Authenticate a user or the admin and issue a bearer token.

    Args:
        userLogin: Login payload carrying ``userName`` and ``password``.

    Returns:
        dict: ``access_token``, ``token_type`` ("bearer") and ``user_data``
        (email, username, id, role) for the authenticated account.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify; 403 when the account's ``isVerified`` flag is falsy.
    """
    # The admin account lives in its own collection. Route on the username
    # alone: the previous code also compared the submitted password against a
    # plaintext admin password embedded in the source, which disclosed the
    # credential and let a same-named record in user_collection obtain a
    # token whose subject is the admin username. The password is still
    # checked below against the stored hash.
    if userLogin.userName == "admin001":
        collection = admin_collection
    else:
        collection = user_collection

    user = collection.find_one({"userName": userLogin.userName})
    if not user or not verify_password(userLogin.password, user.get("password", "")):
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    # Accounts must be verified before they may log in.
    if not user.get("isVerified"):
        raise HTTPException(status_code=403, detail="Account is not verified")

    user_data = {
        "email": user.get("email"),
        "username": user.get("userName"),
        "id": str(user.get("_id")),
        "role": user.get("role"),
    }
    token = create_access_token({"sub": userLogin.userName})
    return {"access_token": token, "token_type": "bearer", "user_data": user_data}
| # User Registration Api | |
async def create_user(user: User):
    """Insert a new user document, storing a hashed password.

    OTP generation and the activation e-mail are not performed here; the
    original code for them was commented out.

    Args:
        user: Registration payload; its ``password`` attribute is hashed
            with ``hash_password`` before the document is stored.

    Returns:
        dict: A confirmation message and the inserted document's id
        (as a string) under ``"userId"``.
    """
    password_hash = hash_password(user.password)

    # Same fields as the payload, with the plaintext password replaced.
    document = {**user.dict(), "password": password_hash}

    inserted = user_collection.insert_one(document)
    return {
        "message": "User created successfully",
        "userId": str(inserted.inserted_id),
    }
| # Otp Verification Api | |
async def verify_otp(verifyOtp: VerifyOtp):
    """Mark a user as verified when the submitted OTP matches the stored one.

    Args:
        verifyOtp: Payload carrying the account ``email`` and the ``otp``.

    Returns:
        dict: A confirmation message.

    Raises:
        HTTPException: 404 when no user has that e-mail; 400 when the
            submitted OTP differs from the document's ``otp`` field.
    """
    by_email = {"email": verifyOtp.email}

    account = user_collection.find_one(by_email)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found")

    # NOTE(review): a document without an "otp" field compares as None here.
    stored_otp = account.get("otp")
    if stored_otp != verifyOtp.otp:
        raise HTTPException(status_code=400, detail="Invalid OTP")

    user_collection.update_one(by_email, {"$set": {"isVerified": True}})
    return {"message": "OTP verified successfully and user is verified"}
| # Api to show the registered users list | |
async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Return every registered user, without credential fields.

    Args:
        token: Bearer credentials; passed to ``verify_token`` before any
            data is read.

    Returns:
        list[dict]: One dict per user document, with ``_id`` converted to a
        string and the ``password`` and ``otp`` fields removed.
    """
    verify_token(token.credentials)

    user_list = []
    for user in user_collection.find({}):
        user["_id"] = str(user["_id"])  # ObjectId is not JSON-serializable
        # Never ship password hashes or pending OTPs to API clients.
        user.pop("password", None)
        user.pop("otp", None)
        user_list.append(user)
    return user_list
| #-------------------------------------------Pothole Collections------------------------------------------- | |
| # Api to submit the information about pothole | |
async def upload_file(potholeInfo: PoholeInfo, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Store a submitted pothole report in the notification collection.

    Args:
        potholeInfo: The report payload; stored as-is via ``.dict()``.
        token: Bearer credentials; passed to ``verify_token`` first.

    Returns:
        dict: A confirmation message.
    """
    verify_token(token.credentials)
    notification_collection.insert_one(potholeInfo.dict())
    # Message text corrected: it previously read "sucesssuccessfully".
    return {"message": "information submitted successfully"}
| # Api to update the information about pothole | |
async def update_pothole_information(update_data: UpdatePotholeInfo, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Update the status and assignee of one pothole report.

    Args:
        update_data: Carries ``infoID`` (the report's ObjectId as a string),
            ``status`` and ``assignee``.
        token: Bearer credentials; passed to ``verify_token`` first.

    Returns:
        dict | JSONResponse: A message saying whether the document was
        modified; a 400 JSON response for a malformed ``infoID``; a 500 JSON
        response if the database call raises.
    """
    verify_token(token.credentials)

    # A malformed ID is a client error. Check it up front so it is reported
    # as 400 instead of falling into the generic 500 handler below.
    if not ObjectId.is_valid(update_data.infoID):
        return JSONResponse(content={"message": "Invalid ID format"}, status_code=400)

    try:
        result = notification_collection.update_one(
            {"_id": ObjectId(update_data.infoID)},
            {"$set": {"status": update_data.status, "assignee": update_data.assignee}},
        )
    except Exception as e:
        return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500)

    if result.modified_count == 1:
        return {"message": "Pothole information updated successfully"}
    # Reached both when nothing matched and when the values were already set.
    return {"message": "No changes were made. Pothole information remains unchanged."}
| # @router.get("/api/information/getAllFiles", tags=["pothole"], dependencies=[Depends(security_scheme)]) | |
| # async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
| # verify_token(token.credentials) | |
| # files = pothole_image_collection.find({}) | |
| # file_list = [] | |
| # for file in files: | |
| # file["_id"] = str(file["_id"]) # Convert ObjectId to string | |
| # file_list.append(file) | |
| # return file_list | |
| # Function to upload file to Firebase Storage | |
| import os | |
async def upload_file_to_firebase(file: UploadFile, token: HTTPAuthorizationCredentials):
    """Upload a file to the Firebase Storage bucket and record its public URL.

    Args:
        file: The uploaded file; its ``filename`` and ``file`` stream are used.
        token: Bearer credentials; passed to ``verify_token`` first.

    Returns:
        JSONResponse: On success, a message plus the generated ``file_id``.
        Any exception raised during the upload or insert is reported as a
        500 response carrying the exception text.
    """
    verify_token(token.credentials)
    try:
        # Random hex id: identifies the image record and prefixes the blob name.
        file_id = uuid.uuid4().hex

        # Keep only the final path component of the client-supplied name.
        base_name = os.path.basename(file.filename)

        blob = storage.bucket().blob(f"{file_id}-{base_name}")
        blob.upload_from_file(file.file)
        blob.make_public()

        image_record = {
            "fileID": file_id,
            "url": blob.public_url,
            "filename": base_name,
        }
        pothole_image_collection.insert_one(image_record)

        return JSONResponse(
            content={"message": "file uploaded successfully", "file_id": file_id}
        )
    except Exception as exc:
        return JSONResponse(
            content={"message": f"Error occurred: {exc}"},
            status_code=500,
        )
async def upload_file_api(file: UploadFile = File(...), token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Accept an uploaded file plus bearer credentials and delegate the upload.

    Args:
        file: Required uploaded file (declared with ``File(...)``).
        token: Bearer credentials resolved through ``security_scheme``.

    Returns:
        Whatever ``upload_file_to_firebase`` returns for these arguments.
    """
    upload_response = await upload_file_to_firebase(file, token)
    return upload_response
| # Api to get all information about pothole | |
async def get_all_info_with_filters(filters: PotholeFilters, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """List pothole reports, optionally filtered by user and status.

    Args:
        filters: Carries optional ``userID`` and ``status`` filter values.
        token: Bearer credentials; passed to ``verify_token`` first.

    Returns:
        list[dict]: Matching report documents with ``_id`` as a string and,
        when an image record exists for the report's ``fileID``, its URL
        under ``"image"``.
    """
    verify_token(token.credentials)

    query = {}

    # One specific userID disables the per-user filter, so that caller sees
    # every report.
    # NOTE(review): this ID comes from the request payload, not the token —
    # confirm that this is the intended access control.
    requested_user = filters.userID
    if requested_user and requested_user != "67bca4ad53b08142c70eb0ee":
        query["userId"] = requested_user

    if filters.status:
        query["status"] = filters.status

    reports = []
    for report in notification_collection.find(query):
        report["_id"] = str(report["_id"])

        file_id = report.get("fileID")
        if file_id:
            # One image lookup per report that references a file.
            image_doc = pothole_image_collection.find_one({"fileID": file_id})
            if image_doc:
                report["image"] = image_doc.get("url")
            else:
                print("No image data found for file ID:", file_id)  # Debugging print

        reports.append(report)
    return reports
| # Api to get information about pothole by unique id | |
async def get_data_by_id(potHoleInfoById: PotInfoById, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Fetch one pothole report by its ObjectId, with its image URL if any.

    Args:
        potHoleInfoById: Carries ``infoID``, the report's ObjectId as a string.
        token: Bearer credentials; passed to ``verify_token`` first.

    Returns:
        dict: The report document with ``_id`` as a string and, when an
        image record exists for its ``fileID``, the URL under ``"image"``.

    Raises:
        HTTPException: 400 when ``infoID`` is not a valid ObjectId; 404 when
            no report has that id.
    """
    verify_token(token.credentials)

    # Validate only the ID here. The previous bare `except:` wrapped the whole
    # body, so it also caught the 404 raised below (and any KeyError) and
    # reported everything as 400 "Invalid ID format".
    if not ObjectId.is_valid(potHoleInfoById.infoID):
        raise HTTPException(status_code=400, detail="Invalid ID format")

    data = notification_collection.find_one({"_id": ObjectId(potHoleInfoById.infoID)})
    if not data:
        raise HTTPException(status_code=404, detail="Data not found")

    data["_id"] = str(data["_id"])

    # A report may have no attached file; skip the image lookup in that case.
    file_id = data.get("fileID")
    if file_id:
        image_data = pothole_image_collection.find_one({"fileID": file_id})
        if image_data:
            data["image"] = image_data.get("url")  # Append image to the response
    return data
| # Api to verify whether the given image contains a pothole | |
async def verify_pothole(potholeModel: PotholeModel, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Download an image by URL and run it through the pothole model.

    Args:
        potholeModel: Carries ``image``, the URL of the image to check.
        token: Bearer credentials; passed to ``verify_token`` first.

    Returns:
        JSONResponse: ``{"response": results[2:]}`` where ``results`` is what
        ``load_image_model`` returned for the image bytes. On any failure the
        exception text is returned under ``"response"`` (status 200, as
        before).
    """
    verify_token(token.credentials)
    try:
        # NOTE(review): the URL is client-supplied; consider restricting it to
        # the storage bucket's host to avoid server-side request forgery.
        # A timeout keeps an unresponsive host from hanging the request.
        response = requests.get(potholeModel.image, timeout=10)
        # Do not feed an HTTP error page to the model as if it were an image.
        response.raise_for_status()

        results = load_image_model(response.content)
        return JSONResponse(content={"response": results[2:]})
    except Exception as e:
        return JSONResponse(content={"response": f"{e}"})