Spaces:
Sleeping
Sleeping
| import sys | |
| sys.path.append('../') | |
| import os | |
| import base64 | |
| import json | |
| import cv2 | |
| import numpy as np | |
| import gradio as gr | |
| from time import gmtime, strftime | |
| from pydantic import BaseModel | |
| from fastapi import FastAPI, File, UploadFile | |
| from fastapi.responses import JSONResponse | |
| from typing import Dict | |
| from engine.header import * | |
# Similarity threshold handed to compare_face() by the comparison endpoints.
MATCH_THRESHOLD = 0.67

# Locate this file, its directory, and the directory one level above it.
# root_path is the base used to find the license file and "engine/bin".
file_path = os.path.abspath(__file__)
dir_path = os.path.dirname(file_path)
root_path = os.path.dirname(dir_path)

# get_version() / get_deviceid() return bytes; decode them before printing.
version = get_version().decode('utf-8')
print_info('\t <Recognito Face Recognition> \t version {}'.format(version))

device_id = get_deviceid().decode('utf-8')
print_info('\t <Hardware ID> \t\t {}'.format(device_id))
def activate_sdk():
    """Initialise the SDK, trying the online license first, then the offline one.

    The online key is read from the ``FR_LICENSE_KEY`` environment variable;
    the offline key is expected in ``license.txt`` under ``root_path``.

    Returns:
        The status code of the last initialisation attempt (``0`` on success),
        or ``-1`` when no attempt could be made at all.
    """
    license_key = os.environ.get("FR_LICENSE_KEY")
    license_file = os.path.join(root_path, "license.txt")
    model_dir = os.path.join(root_path, "engine/bin").encode('utf-8')

    # -1 stands for "online init was never attempted".
    ret = -1
    if license_key is not None:
        ret = init_sdk(model_dir, license_key.encode('utf-8'))
    else:
        print_warning("Recognition online license key not found!")

    if ret == 0:
        print_log("Successfully online init SDK!")
        return ret

    # Online activation failed or was skipped: fall back to the license file.
    print_error(f"Failed to online init SDK, Error code {ret}\n Trying offline init SDK...")

    if not os.path.exists(license_file):
        print_warning("Recognition offline license key file not found!")
        print_error(f"Falied to offline init SDK, Error code {ret}")
        return ret

    ret = init_sdk_offline(model_dir, license_file.encode('utf-8'))
    if ret == 0:
        print_log("Successfully offline init SDK!")
    else:
        print_error(f"Falied to offline init SDK, Error code {ret}")
    return ret
def generate_response(result, similarity=None, face_bboxes=None, face_features=None):
    """Wrap a comparison outcome in the JSON envelope returned by the API.

    Args:
        result: Value stored under ``data.result``.
        similarity: Optional score; stored as a float under ``data.similarity``.
        face_bboxes: Optional iterable of boxes indexed as
            ``(x1, y1, x2, y2)``; the i-th box is attached to the i-th image.
        face_features: Optional iterable of objects exposing ``tolist()``;
            the i-th one is serialised to a compact JSON string.

    Returns:
        A ``JSONResponse`` with HTTP status 200 and ``"status": "ok"``.
    """
    payload = {"result": result}
    if similarity is not None:
        payload["similarity"] = float(similarity)

    # One slot per compared image; only two images are supported.
    per_image = [{}, {}]

    if face_bboxes is not None:
        for idx, rect in enumerate(face_bboxes):
            # Corner coordinates are converted to an inclusive width/height.
            detection = {
                "x" : int(rect[0]),
                "y" : int(rect[1]),
                "width" : int(rect[2] - rect[0] + 1),
                "height" : int(rect[3] - rect[1] + 1)
            }
            per_image[idx]["detection"] = detection

    if face_features is not None:
        for idx, feature in enumerate(face_features):
            # indent=0 puts every item on its own line; dropping the newlines
            # leaves a single-line JSON string.
            serialised = json.dumps(feature.tolist(), indent=0)
            per_image[idx]["feature"] = serialised.replace('\n','')

    payload["image1"] = per_image[0]
    payload["image2"] = per_image[1]

    envelope = {"status": "ok", "data": payload}
    return JSONResponse(content=envelope, status_code=200)
# ASGI application object; the Gradio UI is mounted onto it in the __main__ block.
app = FastAPI()
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm that it is actually registered on `app`.
def read_root():
    """Return a small static payload reporting that the API is up."""
    health = {"status": "API is running"}
    return health
def read_image(file: UploadFile) -> np.ndarray:
    """Read an uploaded file and decode it into an OpenCV image.

    Args:
        file: Upload whose underlying stream contains the encoded image bytes.

    Returns:
        The image decoded by ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``.

    Raises:
        ValueError: If the upload is empty or cannot be decoded as an image.
    """
    image_bytes = file.file.read()
    if not image_bytes:
        raise ValueError("Uploaded image is empty")

    image_np = np.frombuffer(image_bytes, np.uint8)
    image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
    # cv2.imdecode reports undecodable data by returning None instead of
    # raising, so turn that into an exception the caller's try/except can see.
    if image is None:
        raise ValueError("Decoded image is None")
    return image
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm that it is actually registered on `app`.
async def compare_face_api(
    image1: UploadFile = File(...),
    image2: UploadFile = File(...)
) -> JSONResponse:
    """Compare the faces found in two uploaded image files.

    Args:
        image1: First uploaded image.
        image2: Second uploaded image.

    Returns:
        The response built by ``generate_response``: either the comparison
        outcome, or the result ``"Failed to open image"`` when an upload
        cannot be read or decoded.
    """
    try:
        image_mat1 = read_image(image1)
        image_mat2 = read_image(image2)
    except Exception:
        return generate_response("Failed to open image")

    # cv2.imdecode signals undecodable data by returning None rather than
    # raising, so a None image must be rejected here as well.
    if image_mat1 is None or image_mat2 is None:
        return generate_response("Failed to open image")

    result, score, face_bboxes, face_features = compare_face(image_mat1, image_mat2, MATCH_THRESHOLD)
    return generate_response(result, score, face_bboxes, face_features)
def decode_base64_image(base64_string: str) -> np.ndarray:
    """Decode a base64-encoded image into an OpenCV image.

    Both a bare base64 payload and a data URI
    (``data:image/png;base64,<payload>``) are accepted.

    Args:
        base64_string: Base64 text of the encoded image file.

    Returns:
        The image decoded by ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``.

    Raises:
        ValueError: If the text is not valid base64 or the decoded bytes are
            not a readable image. The underlying error, when there is one,
            is attached as ``__cause__``.
    """
    # A data-URI header is not base64; left in place it would be fed to the
    # decoder and corrupt the payload, so drop everything up to the comma.
    if isinstance(base64_string, str) and base64_string.startswith("data:"):
        _, separator, payload = base64_string.partition(",")
        if separator:
            base64_string = payload

    try:
        image_data = base64.b64decode(base64_string)
        image_np = np.frombuffer(image_data, np.uint8)
        image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
    except Exception as e:
        raise ValueError(f"Failed to decode base64 image: {str(e)}") from e

    # cv2.imdecode returns None (no exception) when the bytes are not an image.
    if image is None:
        raise ValueError("Failed to decode base64 image: Decoded image is None")
    return image
# Request body for the base64 comparison endpoint.
# (Kept as comments: a class docstring would change the generated schema.)
class CompareFaceRequest(BaseModel):
    # Base64 text of the first image; decoded by decode_base64_image().
    image1: str
    # Base64 text of the second image; decoded by decode_base64_image().
    image2: str
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm that it is actually registered on `app`.
async def compare_face_base64_api(request: CompareFaceRequest) -> JSONResponse:
    """Compare the faces found in two base64-encoded images.

    Args:
        request: Body carrying the two images as base64 strings.

    Returns:
        The response built by ``generate_response``: either the comparison
        outcome, or the result ``"Failed to open image"`` when either image
        cannot be decoded.
    """
    try:
        image_mat1 = decode_base64_image(request.image1)
        image_mat2 = decode_base64_image(request.image2)
    except Exception:
        # A bare `except:` would also swallow SystemExit, KeyboardInterrupt
        # and task cancellation; only genuine decoding errors belong here.
        return generate_response("Failed to open image")

    result, score, face_bboxes, face_features = compare_face(image_mat1, image_mat2, MATCH_THRESHOLD)
    return generate_response(result, score, face_bboxes, face_features)
if __name__ == '__main__':
    # Refuse to serve requests when the SDK could not be licensed.
    ret = activate_sdk()
    if ret != 0:
        # sys.exit is used instead of the `exit` helper, which is installed by
        # the `site` module for interactive use and is not always available.
        sys.exit(-1)

    # Minimal placeholder UI mounted next to the API.
    dummy_interface = gr.Interface(
        fn=lambda x: "API ready.",
        inputs=gr.Textbox(label="Info"),
        outputs=gr.Textbox(label="Response"),
        allow_flagging="never"  # disables writing to `flagged/`
    )

    gr_app = gr.mount_gradio_app(app, dummy_interface, path="/gradio")

    # Imported here so uvicorn is only required when run as a script.
    import uvicorn
    uvicorn.run(gr_app, host="0.0.0.0", port=7860)