"""HTTP front-end for the ID LIVE SDK.

Exposes two FastAPI endpoints that accept an image (multipart upload or
base64 JSON), normalise its EXIF orientation, hand the RGB pixels to the
SDK's ``processImage`` and return the SDK's JSON result.  When run as a
script the SDK is activated first and a placeholder Gradio UI is mounted
under ``/gradio``.
"""
import sys
import os
import base64
import json
import uuid
import cv2
import io
import numpy as np
import gradio as gr

from PIL import Image, ExifTags, UnidentifiedImageError
from time import gmtime, strftime
from pydantic import BaseModel
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.responses import JSONResponse
from typing import Dict

from engine.header import *

file_path = os.path.abspath(__file__)
root_path = os.path.dirname(file_path)

# Standard EXIF tag id for "Orientation" (274); this is the key under which
# ExifTags.TAGS lists 'Orientation', so no per-call scan of TAGS is needed.
_EXIF_ORIENTATION_TAG = 0x0112

# EXIF orientation value -> counter-clockwise angle passed to Image.rotate.
# Only the three pure-rotation orientations are handled, as before.
_EXIF_ROTATIONS = {3: 180, 6: 270, 8: 90}

device_id = get_deviceid().decode('utf-8')
print_info('\t \t\t {}'.format(device_id))


def activate_sdk():
    """Activate and initialise the ID LIVE SDK.

    Reads the license key from the ``LICENSE_KEY`` environment variable,
    passes it to ``set_activation`` and, if that returns 0, calls
    ``init_sdk`` with the ``engine/model`` directory next to this file.

    Returns:
        ``-1`` when no license key is set; otherwise the status returned by
        ``set_activation`` (on activation failure) or by ``init_sdk``.
        ``0`` means both steps succeeded.
    """
    id_live_key = os.environ.get("LICENSE_KEY")
    id_live_dict_path = os.path.join(root_path, "engine/model")

    if id_live_key is None:
        print_warning("ID LIVE license key not found!")
        return -1

    ret = set_activation(id_live_key.encode('utf-8'))
    if ret == 0:
        ret = init_sdk(id_live_dict_path.encode('utf-8'))
        if ret == 0:
            print_log("Successfully init ID LIVE SDK!")
        else:
            print_log("Failed to init ID LIVE SDK!")
    else:
        print_error(f"Falied to activate ID LIVE SDK, Error code {ret}")

    sys.stdout.flush()
    return ret


def apply_exif_rotation(image):
    """Return *image* rotated upright according to its EXIF orientation.

    Orientations 3, 6 and 8 are rotated by 180, 270 and 90 degrees
    respectively (with ``expand=True`` so the canvas grows to fit).  Any
    other orientation value, or an image without EXIF data, is returned
    unchanged.

    Args:
        image: A PIL image.  Image types that do not provide ``_getexif``
            are passed through untouched.

    Returns:
        The rotated image, or the original object when no rotation applies.
    """
    try:
        # Only the attribute lookup/call can raise AttributeError, so keep
        # the guarded region to exactly that.
        exif = image._getexif()
    except AttributeError:
        print("No EXIF data found")
        return image

    if exif is None:
        return image

    angle = _EXIF_ROTATIONS.get(exif.get(_EXIF_ORIENTATION_TAG, None))
    if angle is not None:
        image = image.rotate(angle, expand=True)
    return image


app = FastAPI()


class ImageBase64Request(BaseModel):
    """Request body for ``/process_image_base64``."""

    # Base64-encoded bytes of an image file.
    base64: str


@app.get("/")
def read_root():
    """Liveness probe: report that the API is up."""
    return {"status": "API is running"}


@app.post("/process_image")
async def process_image(image: UploadFile = File(...)):
    """Run the SDK on an uploaded image file.

    Args:
        image: Multipart file upload containing an image.

    Returns:
        ``200`` with ``{"resultCode": "Ok", "result": <SDK JSON>}`` on
        success; ``400`` with ``resultCode: "Error"`` when the file cannot
        be opened as an image or the SDK returns nothing; ``500`` with
        ``resultCode: "Error"`` when the SDK output is not valid UTF-8 JSON.
    """
    try:
        contents = await image.read()
        image_pil = apply_exif_rotation(Image.open(io.BytesIO(contents))).convert('RGB')
    except (UnidentifiedImageError, OSError):
        # OSError also covers truncated/corrupt files that are recognised by
        # Image.open but fail while decoding; report them as a bad upload
        # instead of letting them surface as an unhandled 500.
        return JSONResponse(status_code=400, content={"resultCode": "Error", "result": "Failed to open file"})

    image_np = np.asarray(image_pil)
    # For an RGB array, shape is (height, width, 3): pass width then height.
    # NOTE(review): processImage is called synchronously inside an async
    # handler — confirm it is fast enough not to stall the event loop.
    result = processImage(image_np, image_np.shape[1], image_np.shape[0])

    if result is None:
        return JSONResponse(status_code=400, content={"resultCode": "Error", "result": "Failed to process image"})

    try:
        result_dict = json.loads(result.decode('utf-8'))
    except (UnicodeDecodeError, ValueError):
        # Mirrors the guard in process_image_base64, using this endpoint's
        # own response envelope.
        return JSONResponse(status_code=500, content={"resultCode": "Error", "result": "Failed to decode result"})

    return JSONResponse(status_code=200, content={"resultCode": "Ok", "result": result_dict})


@app.post("/process_image_base64")
async def process_image_base64(request: ImageBase64Request):
    """Run the SDK on a base64-encoded image.

    Args:
        request: JSON body whose ``base64`` field holds the encoded image.

    Returns:
        ``200`` with ``{"resultCode": "Ok", "result": <SDK JSON>}``.

    Raises:
        HTTPException: ``400`` when the payload cannot be decoded or opened
            as an image, or when the SDK returns nothing; ``500`` when the
            SDK output cannot be decoded as UTF-8 JSON.
    """
    try:
        image_data = base64.b64decode(request.base64)
        image = apply_exif_rotation(Image.open(io.BytesIO(image_data))).convert("RGB")
    except (base64.binascii.Error, UnidentifiedImageError, OSError, ValueError) as e:
        # OSError added so truncated/corrupt images yield 400, not 500.
        raise HTTPException(status_code=400, detail=f"Failed to parse base64: {str(e)}") from e

    image_np = np.asarray(image)
    # For an RGB array, shape is (height, width, 3): pass width then height.
    result = processImage(image_np, image_np.shape[1], image_np.shape[0])

    if result is None:
        raise HTTPException(status_code=400, detail="Failed to process image")

    try:
        result_dict = json.loads(result.decode("utf-8"))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to decode result: {str(e)}") from e

    return JSONResponse(
        content={"resultCode": "Ok", "result": result_dict},
        status_code=200
    )


if __name__ == '__main__':
    ret = activate_sdk()
    if ret != 0:
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit(-1)

    dummy_interface = gr.Interface(
        fn=lambda x: "API ready.",
        inputs=gr.Textbox(label="Info"),
        outputs=gr.Textbox(label="Response"),
        # "never" turns flagging off so no samples are written to disk;
        # "auto" would flag (and store) every submission.
        flagging_mode="never"
    )

    gr_app = gr.mount_gradio_app(app, dummy_interface, path="/gradio")

    import uvicorn
    uvicorn.run(gr_app, host="0.0.0.0", port=7860)