| | from fastapi import FastAPI |
| | from pydantic import BaseModel, HttpUrl |
| | import requests |
| | import face_recognition |
| | import pickle |
| | import cv2 |
| | import pyrebase |
| | import os |
| |
|
| |
|
| | |
| | config = { |
| | "apiKey": "AIzaSyClnRJAnrJgAgkYjuYnlvu-CJ6Cxyklebo", |
| | "databaseURL": "https://console.firebase.google.com/project/socioverse-2025/database/socioverse-2025-default-rtdb/data/~2F", |
| | "authDomain": "socioverse-2025.firebaseapp.com", |
| | "projectId": "socioverse-2025", |
| | "storageBucket": "socioverse-2025.appspot.com", |
| | "messagingSenderId": "689574504641", |
| | "appId": "1:689574504641:web:a22f6a2fa343e4221acc40", |
| | "serviceAccount":"socioverse-2025-firebase-adminsdk-gcc6m-6bfb53e6d9.json" |
| | } |
| |
|
| | firebase = pyrebase.initialize_app(config) |
| | storage = firebase.storage() |
| |
|
| | |
| | storage_folder = "Faces/" |
| |
|
| |
|
| | app = FastAPI() |
| |
|
| | class ImgSave(BaseModel): |
| | image_url: HttpUrl |
| | user_name: str |
| |
|
| | class ImgInput(BaseModel): |
| | image_url: HttpUrl |
| |
|
| | class ImgOutput(BaseModel): |
| | user_id: list |
| |
|
| | class UserSaved(BaseModel): |
| | status: str |
| |
|
| | class UserDelete(BaseModel): |
| | label: str |
| |
|
| | class Message(BaseModel): |
| | message: str |
| |
|
| | class CleanPickle(BaseModel): |
| | confirm: bool |
| |
|
| |
|
| | def recognize_face(image_url: HttpUrl) -> ImgOutput: |
| |
|
| | storage.child().download("Faces/pkl/face_encodings.pkl","face_encodings.pkl") |
| | |
| | response = requests.get(image_url) |
| | with open("examp.jpg", 'wb') as file: |
| | file.write(response.content) |
| |
|
| | |
| | with open("face_encodings.pkl", "rb") as file: |
| | data = pickle.load(file) |
| | face_encodings = data["encodings"] |
| | labels = data["labels"] |
| |
|
| |
|
| | |
| | new_image = cv2.imread("examp.jpg") |
| |
|
| | |
| | new_face_encodings = face_recognition.face_encodings(new_image) |
| |
|
| | if len(new_face_encodings) == 0: |
| | print("No faces found in the new image.") |
| | return ImgOutput(label=["unable to detect"]) |
| | else: |
| | output_labels = [] |
| |
|
| | for new_face_encoding in new_face_encodings: |
| | |
| | results = face_recognition.compare_faces(face_encodings, new_face_encoding) |
| |
|
| | for i, result in enumerate(results): |
| | if result: |
| | output_labels.append(labels[i]) |
| |
|
| | os.remove("examp.jpg") |
| |
|
| | if output_labels: |
| | return ImgOutput(user_id=output_labels) |
| | else: |
| | out = ["unable to detect"] |
| | return ImgOutput(user_id=out) |
| |
|
| |
|
| |
|
| | def add_face(image_url: HttpUrl,user_name : str): |
| | |
| | response = requests.get(image_url) |
| | with open("examp.jpg", 'wb') as file: |
| | file.write(response.content) |
| |
|
| | |
| | with open("face_encodings.pkl", "rb") as file: |
| | data = pickle.load(file) |
| | face_encodings = data["encodings"] |
| | labels = data["labels"] |
| |
|
| | |
| | new_image = cv2.imread("examp.jpg") |
| |
|
| | |
| | rgb_img = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB) |
| |
|
| | encode = face_recognition.face_encodings(new_image)[0] |
| | if len(encode) == 0: |
| | return "No face found" |
| | face_encodings.append(encode) |
| | labels.append(user_name) |
| |
|
| | |
| | os.remove("examp.jpg") |
| |
|
| | |
| | data = {"encodings": face_encodings, "labels": labels} |
| | with open("face_encodings.pkl", "wb") as file: |
| | pickle.dump(data, file) |
| |
|
| | |
| | pkl_blob = storage.child(f"{storage_folder}pkl/face_encodings.pkl") |
| | pkl_blob.put("face_encodings.pkl") |
| | return "User Saved" |
| |
|
| |
|
| | |
| | def delete_user(user_name: str): |
| | |
| | with open("face_encodings.pkl", "rb") as file: |
| | data = pickle.load(file) |
| | face_encodings = data["encodings"] |
| | labels = data["labels"] |
| |
|
| | if user_name in labels: |
| | index = labels.index(user_name) |
| | del labels[index] |
| | del face_encodings[index] |
| |
|
| | |
| | data = {"encodings": face_encodings, "labels": labels} |
| | with open("face_encodings.pkl", "wb") as file: |
| | pickle.dump(data, file) |
| |
|
| | |
| | pkl_blob = storage.child(f"{storage_folder}pkl/face_encodings.pkl") |
| | pkl_blob.put("face_encodings.pkl") |
| |
|
| | return {"message": f"User '{user_name}' deleted successfully."} |
| | else: |
| | return {"message": f"User '{user_name}' not found."} |
| |
|
| |
|
| | def clean_pickle(confirm: bool): |
| | if confirm: |
| | |
| | if os.path.exists("face_encodings.pkl"): |
| | os.remove("face_encodings.pkl") |
| |
|
| | |
| | with open("face_encodings.pkl", "wb") as file: |
| | data = {"encodings": [], "labels": []} |
| | pickle.dump(data, file) |
| |
|
| | |
| | pkl_blob = storage.child(f"{storage_folder}pkl/face_encodings.pkl") |
| | pkl_blob.put("face_encodings.pkl") |
| |
|
| | return {"message": "Pickle file cleaned and uploaded successfully."} |
| | else: |
| | return {"message": "Confirmation required to clean the pickle file."} |
| |
|
| |
|
| | @app.post('/') |
| | async def scoring_endpoint(item:ImgInput): |
| | result = recognize_face(item.image_url) |
| | return result |
| |
|
| |
|
| | @app.post('/user/') |
| | async def scoring_endpoint(item:ImgSave): |
| | results = add_face(item.image_url, item.user_name) |
| | return Message(message=results) |
| |
|
| |
|
| | @app.delete('/user_delete/') |
| | async def scoring_endpoint(item: UserDelete): |
| | result = delete_user(item.label) |
| | return Message(message=result["message"]) |
| |
|
| |
|
| | @app.delete('/clean/') |
| | async def clean_pickle_endpoint(item: CleanPickle): |
| | result = clean_pickle(item.confirm) |
| | return result |