File size: 4,327 Bytes
523fd45
 
 
 
 
 
 
9d30c81
523fd45
 
9d30c81
523fd45
 
9d30c81
523fd45
 
 
 
 
 
520fa8d
523fd45
 
 
 
e17d070
9d30c81
 
265cf1f
9d30c81
 
 
265cf1f
9d30c81
265cf1f
 
9d30c81
 
 
265cf1f
9d30c81
 
 
 
87d3a01
265cf1f
523fd45
 
9d30c81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
523fd45
 
 
 
9e0c501
523fd45
 
 
265cf1f
523fd45
9d30c81
 
265cf1f
9d30c81
 
 
 
523fd45
9d30c81
 
523fd45
9d30c81
 
 
 
 
523fd45
 
9d30c81
 
265cf1f
9e0c501
9d30c81
 
 
523fd45
9d30c81
 
523fd45
9d30c81
 
523fd45
265cf1f
9d30c81
265cf1f
9d30c81
523fd45
9d30c81
 
 
 
523fd45
9d30c81
523fd45
265cf1f
 
 
 
 
 
 
 
 
 
 
 
 
 
dfa9d9d
59cf014
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import sys

import os
import base64
import json
import uuid
import cv2
import io
import numpy as np
import gradio as gr
from PIL import Image, ExifTags, UnidentifiedImageError
from time import gmtime, strftime
from pydantic import BaseModel
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.responses import JSONResponse
from typing import Dict

from engine.header import *

# Absolute path of this script and the directory that contains it.
file_path = os.path.abspath(__file__)
root_path = os.path.split(file_path)[0]

# Ask the engine for the hardware ID (decoded as UTF-8) and log it at startup.
device_id = get_deviceid().decode('utf-8')
print_info(f'\t <Hardware ID> \t\t {device_id}')

def activate_sdk():
    """Activate and initialise the ID LIVE SDK.

    The license key is read from the ``LICENSE_KEY`` environment variable and
    handed to ``set_activation``.  When activation returns 0, ``init_sdk`` is
    called with the ``engine/model`` directory located next to this file.

    Returns:
        int: ``-1`` when no license key is configured; the non-zero code from
        ``set_activation`` when activation fails; otherwise the code returned
        by ``init_sdk`` (0 means success).
    """
    try:
        id_live_key = os.environ.get("LICENSE_KEY")
        if id_live_key is None:
            print_warning("ID LIVE license key not found!")
            return -1

        ret = set_activation(id_live_key.encode('utf-8'))
        if ret != 0:
            print_error(f"Failed to activate ID LIVE SDK, Error code {ret}")
            return ret

        id_live_dict_path = os.path.join(root_path, "engine/model")
        ret = init_sdk(id_live_dict_path.encode('utf-8'))
        if ret == 0:
            print_log("Successfully init ID LIVE SDK!")
        else:
            # Report init failures on the error channel with their code,
            # the same way activation failures are reported.
            print_error(f"Failed to init ID LIVE SDK, Error code {ret}")
        return ret
    finally:
        # Flush on every exit path so the messages above are visible
        # immediately, even when stdout is buffered.
        sys.stdout.flush()


# Numeric id of the EXIF "Orientation" tag (0x0112 == 274).
_EXIF_ORIENTATION_TAG = 0x0112

# EXIF orientation value -> angle passed to ``image.rotate``.
_ROTATION_BY_ORIENTATION = {3: 180, 6: 270, 8: 90}


def apply_exif_rotation(image):
    """Return ``image`` rotated according to its EXIF orientation tag.

    Only orientations 3, 6 and 8 are handled; any other value (including the
    mirrored orientations) leaves the image untouched.

    Args:
        image: An object exposing ``_getexif()`` and
            ``rotate(angle, expand=True)``, e.g. a Pillow image.

    Returns:
        The rotated image, or ``image`` itself when it has no EXIF reader,
        no EXIF data, or an orientation that needs no rotation.
    """
    # Look the reader up explicitly instead of wrapping everything in
    # ``except AttributeError``: that way an AttributeError raised while
    # rotating is not misreported as "no EXIF data".
    read_exif = getattr(image, "_getexif", None)
    if read_exif is None:
        print("No EXIF data found")
        return image

    exif = read_exif()
    if exif is None:
        return image

    angle = _ROTATION_BY_ORIENTATION.get(exif.get(_EXIF_ORIENTATION_TAG))
    if angle is None:
        return image
    return image.rotate(angle, expand=True)

# FastAPI application on which the HTTP endpoints in this file are registered.
app = FastAPI()

class ImageBase64Request(BaseModel):
    # Request body accepted by POST /process_image_base64.
    # `base64` holds the image file contents as a base64 string; the endpoint
    # decodes it with base64.b64decode before opening it as an image.
    base64: str

@app.get("/")
def read_root():
    # Fixed status payload so a caller can check that the service is up.
    status_payload = dict(status="API is running")
    return status_payload

@app.post("/process_image")
async def process_image(image: UploadFile = File(...)):
    """Run the engine on an uploaded image file.

    The upload is opened with Pillow, rotated according to its EXIF
    orientation, converted to RGB and passed to ``processImage`` together
    with its width and height.

    Returns:
        JSONResponse: 200 with ``{"resultCode": "Ok", "result": ...}`` on
        success; 400 when the upload cannot be opened as an image or the
        engine returns ``None``; 500 when the engine output cannot be decoded
        as UTF-8 JSON.
    """
    contents = await image.read()

    try:
        image_pil = apply_exif_rotation(Image.open(io.BytesIO(contents))).convert('RGB')
    except (UnidentifiedImageError, OSError, ValueError):
        # Besides unrecognised formats, truncated or corrupt files fail while
        # the pixel data is loaded; treat them all as a bad upload, not a 500.
        return JSONResponse(status_code=400, content={"resultCode": "Error", "result": "Failed to open file"})

    image_np = np.asarray(image_pil)
    # shape is (height, width, channels); the engine is given width first.
    result = processImage(image_np, image_np.shape[1], image_np.shape[0])

    if result is None:
        return JSONResponse(status_code=400, content={"resultCode": "Error", "result": "Failed to process image"})

    try:
        result_dict = json.loads(result.decode('utf-8'))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
        return JSONResponse(status_code=500, content={"resultCode": "Error", "result": "Failed to decode result"})

    return JSONResponse(status_code=200, content={"resultCode": "Ok", "result": result_dict})


@app.post("/process_image_base64")
async def process_image_base64(request: ImageBase64Request):
    """Run the engine on an image supplied as a base64 string.

    ``request.base64`` is decoded, opened with Pillow, rotated according to
    its EXIF orientation, converted to RGB and passed to ``processImage``
    together with its width and height.

    Returns:
        JSONResponse: 200 with ``{"resultCode": "Ok", "result": ...}``.

    Raises:
        HTTPException: 400 when the payload is not valid base64, cannot be
            opened as an image, or the engine returns ``None``; 500 when the
            engine output cannot be decoded as UTF-8 JSON.
    """
    try:
        image_data = base64.b64decode(request.base64)
        image = apply_exif_rotation(Image.open(io.BytesIO(image_data))).convert("RGB")
    except (UnidentifiedImageError, OSError, ValueError) as e:
        # binascii.Error (bad base64) is a ValueError subclass, so it is
        # covered here without reaching into base64's internal imports.
        # OSError covers truncated/corrupt image data.
        raise HTTPException(status_code=400, detail=f"Failed to parse base64: {str(e)}") from e

    image_np = np.asarray(image)
    # shape is (height, width, channels); the engine is given width first.
    result = processImage(image_np, image_np.shape[1], image_np.shape[0])

    if result is None:
        raise HTTPException(status_code=400, detail="Failed to process image")

    try:
        result_dict = json.loads(result.decode("utf-8"))
    except ValueError as e:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
        raise HTTPException(status_code=500, detail=f"Failed to decode result: {str(e)}") from e

    return JSONResponse(
        content={"resultCode": "Ok", "result": result_dict},
        status_code=200
    )

    
if __name__ == '__main__':
    # Refuse to start serving when the SDK could not be activated/initialised.
    ret = activate_sdk()
    if ret != 0:
        # sys.exit is the supported way to terminate a script; the bare
        # exit() helper is injected by the site module and may be absent.
        sys.exit(-1)

    # Placeholder Gradio UI mounted next to the REST endpoints.
    dummy_interface = gr.Interface(
        fn=lambda x: "API ready.",
        inputs=gr.Textbox(label="Info"),
        outputs=gr.Textbox(label="Response"),
        # "never" turns flagging off so nothing is written to `flagged/`;
        # "auto" would flag (and store) every submission.
        flagging_mode="never"
    )

    gr_app = gr.mount_gradio_app(app, dummy_interface, path="/gradio")

    import uvicorn
    uvicorn.run(gr_app, host="0.0.0.0", port=7860)