hassan526 commited on
Commit
9d30c81
·
verified ·
1 Parent(s): 9541bb5

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +70 -142
app.py CHANGED
@@ -5,12 +5,13 @@ import base64
5
  import json
6
  import uuid
7
  import cv2
 
8
  import numpy as np
9
  import gradio as gr
10
- import shutil
11
  from time import gmtime, strftime
12
  from pydantic import BaseModel
13
- from fastapi import FastAPI, File, UploadFile
14
  from fastapi.responses import JSONResponse
15
  from typing import Dict
16
 
@@ -18,67 +19,57 @@ from engine.header import *
18
 
19
  file_path = os.path.abspath(__file__)
20
  root_path = os.path.dirname(file_path)
21
- dump_path = os.path.join(root_path, "dump2/")
22
 
23
  device_id = get_deviceid().decode('utf-8')
24
  print_info('\t <Hardware ID> \t\t {}'.format(device_id))
25
 
26
- def activate_sdk():
27
- online_key = os.environ.get("LICENSE_KEY")
28
- offline_key_path = os.path.join(root_path, "license.txt")
29
-
30
- dict_path = os.path.join(root_path, "engine/bin")
31
-
32
  ret = -1
33
- if online_key is None:
34
- print_warning("Online license key not found!")
 
35
  else:
36
- activate_ret = set_activation(online_key.encode('utf-8')).decode('utf-8')
37
- ret = json.loads(activate_ret).get("errorCode", None)
38
 
39
  if ret == 0:
40
- print_log("Successfully online activation SDK!")
41
- else:
42
- print_error(f"Failed to online activation SDK, Error code {ret}\n Trying offline activation SDK...");
43
- if os.path.exists(offline_key_path) is False:
44
- print_warning("Offline license key file not found!")
45
- print_error(f"Falied to offline activation SDK, Error code {ret}")
46
- return ret
47
  else:
48
- file=open(offline_key_path,"r")
49
- offline_key = file.read()
50
- file.close()
51
- activate_ret = set_activation(offline_key.encode('utf-8')).decode('utf-8')
52
- ret = json.loads(activate_ret).get("errorCode", None)
53
- if ret == 0:
54
- print_log("Successfully offline activation SDK!")
55
- else:
56
- print_error(f"Falied to offline activation SDK, Error code {ret}")
57
- return ret
58
-
59
- init_ret = init_sdk(dict_path.encode('utf-8')).decode('utf-8')
60
- ret = json.loads(activate_ret).get("errorCode", None)
61
- print_log(f"Init SDK: {ret}")
62
  sys.stdout.flush()
63
  return ret
64
 
65
 
66
- async def save_upload_file(upload_file: UploadFile) -> str:
67
- file_name = uuid.uuid4().hex[:6] + "_" + upload_file.filename
68
- save_path = os.path.join(dump_path, file_name)
69
- with open(save_path, "wb") as buffer:
70
- shutil.copyfileobj(upload_file.file, buffer)
71
- return os.path.abspath(save_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
- async def save_base64_file(base64_string: str) -> str:
74
- file_name = uuid.uuid4().hex[:6] + ".jpg" # or ".png" depending on your use
75
- save_path = os.path.join(dump_path, file_name)
76
-
77
- with open(save_path, "wb") as buffer:
78
- buffer.write(base64.b64decode(base64_string))
79
-
80
- return os.path.abspath(save_path)
81
-
82
  app = FastAPI()
83
 
84
  class ImageBase64Request(BaseModel):
@@ -88,112 +79,49 @@ class ImageBase64Request(BaseModel):
88
  def read_root():
89
  return {"status": "API is running"}
90
 
91
- @app.post("/api/read_idcard")
92
- async def read_idcard(image: UploadFile = File(...), image2: UploadFile = File(None)):
93
- try:
94
- file_path1 = await save_upload_file(image)
95
-
96
- file_path2 = ""
97
- if image2 is not None:
98
- file_path2 = await save_upload_file(image2)
99
-
100
-
101
- ocrResult = ocr_id_card(file_path1.encode('utf-8'), file_path2.encode('utf-8'))
102
- ocrResDict = json.loads(ocrResult)
103
-
104
- os.remove(file_path1)
105
- if file_path2:
106
- os.remove(file_path2)
107
-
108
- return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200)
109
-
110
- except Exception as e:
111
- return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
112
-
113
-
114
- @app.post("/api/read_idcard_base64")
115
- async def read_idcard_base64(request_data: ImageBase64Request):
116
  try:
117
- file_path = await save_base64_file(request_data.image)
118
-
119
- # Your OCR function (pass bytes-encoded file path)
120
- ocr_result = ocr_id_card(file_path.encode('utf-8'), ''.encode('utf-8'))
121
- ocr_res_dict = json.loads(ocr_result)
122
-
123
- # Remove the temp file
124
- os.remove(file_path)
125
 
126
- return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200)
 
127
 
128
- except Exception as e:
129
- return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
130
-
131
- @app.post("/api/read_credit")
132
- async def read_credit(image: UploadFile = File(...)):
133
- try:
134
- file_path = await save_upload_file(image)
135
-
136
- ocrResult = ocr_credit_card(file_path.encode('utf-8'))
137
- ocrResDict = json.loads(ocrResult)
138
-
139
- os.remove(file_path)
140
-
141
- return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200)
142
-
143
- except Exception as e:
144
- return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
145
-
146
- @app.post("/api/read_credit_base64")
147
- async def read_credit_base64(request_data: ImageBase64Request):
148
- try:
149
- file_path = await save_base64_file(request_data.image)
150
-
151
- # Your OCR function (pass bytes-encoded file path)
152
- ocr_result = ocr_credit_card(file_path.encode('utf-8'))
153
- ocr_res_dict = json.loads(ocr_result)
154
-
155
- # Remove the temp file
156
- os.remove(file_path)
157
-
158
- return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200)
159
-
160
- except Exception as e:
161
- return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
162
 
163
 
164
- @app.post("/api/read_barcode")
165
- async def read_barcode(image: UploadFile = File(...)):
166
  try:
167
- file_path = await save_upload_file(image)
 
 
 
168
 
169
- ocrResult = ocr_barcode(file_path.encode('utf-8'))
170
- ocrResDict = json.loads(ocrResult)
171
-
172
- os.remove(file_path)
173
 
174
- return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200)
 
175
 
176
- except Exception as e:
177
- return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
178
-
179
- @app.post("/api/read_barcode_base64")
180
- async def read_barcode_base64(request_data: ImageBase64Request):
181
  try:
182
- file_path = await save_base64_file(request_data.image)
183
-
184
- # Your OCR function (pass bytes-encoded file path)
185
- ocr_result = ocr_barcode(file_path.encode('utf-8'))
186
- ocr_res_dict = json.loads(ocr_result)
187
-
188
- # Remove the temp file
189
- os.remove(file_path)
190
-
191
- return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200)
192
-
193
  except Exception as e:
194
- return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
195
 
 
 
 
 
196
 
 
197
  if __name__ == '__main__':
198
  ret = activate_sdk()
199
  if ret != 0:
 
5
  import json
6
  import uuid
7
  import cv2
8
+ import io
9
  import numpy as np
10
  import gradio as gr
11
+ from PIL import Image, ExifTags, UnidentifiedImageError
12
  from time import gmtime, strftime
13
  from pydantic import BaseModel
14
+ from fastapi import FastAPI, File, UploadFile, HTTPException, Request
15
  from fastapi.responses import JSONResponse
16
  from typing import Dict
17
 
 
19
 
20
  file_path = os.path.abspath(__file__)
21
  root_path = os.path.dirname(file_path)
 
22
 
23
  device_id = get_deviceid().decode('utf-8')
24
  print_info('\t <Hardware ID> \t\t {}'.format(device_id))
25
 
26
+ def activate_id_live_sdk():
27
+ id_live_key = os.environ.get("LICENSE_KEY")
28
+ id_live_dict_path = os.path.join(root_path, "engine/model")
 
 
 
29
  ret = -1
30
+ if id_live_key is None:
31
+ print_warning("ID LIVE license key not found!")
32
+ return ret
33
  else:
34
+ ret = set_activation(id_live_key.encode('utf-8'))
 
35
 
36
  if ret == 0:
37
+ ret = init_sdk(id_live_dict_path.encode('utf-8'))
38
+ if ret == 0:
39
+ print_log("Successfully init ID LIVE SDK!")
 
 
 
 
40
  else:
41
+ print_log("Failed to init ID LIVE SDK!")
42
+ else:
43
+ print_error(f"Falied to activate ID LIVE SDK, Error code {ret}")
44
+
 
 
 
 
 
 
 
 
 
 
45
  sys.stdout.flush()
46
  return ret
47
 
48
 
49
+ def apply_exif_rotation(image):
50
+ try:
51
+ exif = image._getexif()
52
+ if exif is not None:
53
+ for orientation in ExifTags.TAGS.keys():
54
+ if ExifTags.TAGS[orientation] == 'Orientation':
55
+ break
56
+
57
+ # Get the orientation value
58
+ orientation = exif.get(orientation, None)
59
+
60
+ # Apply the appropriate rotation based on the orientation
61
+ if orientation == 3:
62
+ image = image.rotate(180, expand=True)
63
+ elif orientation == 6:
64
+ image = image.rotate(270, expand=True)
65
+ elif orientation == 8:
66
+ image = image.rotate(90, expand=True)
67
+
68
+ except AttributeError:
69
+ print("No EXIF data found")
70
+
71
+ return image
72
 
 
 
 
 
 
 
 
 
 
73
  app = FastAPI()
74
 
75
  class ImageBase64Request(BaseModel):
 
79
  def read_root():
80
  return {"status": "API is running"}
81
 
82
+ @app.post("/process_image")
83
+ async def process_image(image: UploadFile = File(...)):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  try:
85
+ contents = await image.read()
86
+ image_pil = apply_exif_rotation(Image.open(io.BytesIO(contents))).convert('RGB')
87
+ except UnidentifiedImageError:
88
+ return JSONResponse(status_code=400, content={"resultCode": "Error", "result": "Failed to open file"})
 
 
 
 
89
 
90
+ image_np = np.asarray(image_pil)
91
+ result = processImage(image_np, image_np.shape[1], image_np.shape[0])
92
 
93
+ if result is None:
94
+ return JSONResponse(status_code=400, content={"resultCode": "Error", "result": "Failed to process image"})
95
+
96
+ result_dict = json.loads(result.decode('utf-8'))
97
+ return JSONResponse(status_code=200, content={"resultCode": "Ok", "result": result_dict})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
 
99
 
100
+ @app.post("/process_image_base64")
101
+ async def process_image_base64(request: ImageBase64Request):
102
  try:
103
+ image_data = base64.b64decode(request.image)
104
+ image = apply_exif_rotation(Image.open(io.BytesIO(image_data))).convert("RGB")
105
+ except (base64.binascii.Error, UnidentifiedImageError, ValueError) as e:
106
+ raise HTTPException(status_code=400, detail=f"Failed to parse base64: {str(e)}")
107
 
108
+ image_np = np.asarray(image)
109
+ result = processImage(image_np, image_np.shape[1], image_np.shape[0])
 
 
110
 
111
+ if result is None:
112
+ raise HTTPException(status_code=400, detail="Failed to process image")
113
 
 
 
 
 
 
114
  try:
115
+ result_dict = json.loads(result.decode("utf-8"))
 
 
 
 
 
 
 
 
 
 
116
  except Exception as e:
117
+ raise HTTPException(status_code=500, detail=f"Failed to decode result: {str(e)}")
118
 
119
+ return JSONResponse(
120
+ content={"resultCode": "Ok", "result": result_dict},
121
+ status_code=200
122
+ )
123
 
124
+
125
  if __name__ == '__main__':
126
  ret = activate_sdk()
127
  if ret != 0: