import sys import os import base64 import json import uuid import cv2 import numpy as np import gradio as gr import shutil from time import gmtime, strftime from pydantic import BaseModel from fastapi import FastAPI, File, UploadFile from fastapi.responses import JSONResponse from typing import Dict from engine.header import * file_path = os.path.abspath(__file__) root_path = os.path.dirname(file_path) dump_path = os.path.join(root_path, "dump2/") device_id = get_deviceid().decode('utf-8') print_info('\t \t\t {}'.format(device_id)) def activate_sdk(): online_key = os.environ.get("LICENSE_KEY") offline_key_path = os.path.join(root_path, "license.txt") dict_path = os.path.join(root_path, "engine/bin") ret = -1 if online_key is None: print_warning("Online license key not found!") else: activate_ret = set_activation(online_key.encode('utf-8')).decode('utf-8') ret = json.loads(activate_ret).get("errorCode", None) if ret == 0: print_log("Successfully online activation SDK!") else: print_error(f"Failed to online activation SDK, Error code {ret}\n Trying offline activation SDK..."); if os.path.exists(offline_key_path) is False: print_warning("Offline license key file not found!") print_error(f"Falied to offline activation SDK, Error code {ret}") return ret else: file=open(offline_key_path,"r") offline_key = file.read() file.close() activate_ret = set_activation(offline_key.encode('utf-8')).decode('utf-8') ret = json.loads(activate_ret).get("errorCode", None) if ret == 0: print_log("Successfully offline activation SDK!") else: print_error(f"Falied to offline activation SDK, Error code {ret}") return ret init_ret = init_sdk(dict_path.encode('utf-8')).decode('utf-8') ret = json.loads(activate_ret).get("errorCode", None) print_log(f"Init SDK: {ret}") sys.stdout.flush() return ret async def save_upload_file(upload_file: UploadFile) -> str: file_name = uuid.uuid4().hex[:6] + "_" + upload_file.filename save_path = os.path.join(dump_path, file_name) with open(save_path, "wb") as buffer: shutil.copyfileobj(upload_file.file, buffer) return os.path.abspath(save_path) async def save_base64_file(base64_string: str) -> str: file_name = uuid.uuid4().hex[:6] + ".jpg" # or ".png" depending on your use save_path = os.path.join(dump_path, file_name) with open(save_path, "wb") as buffer: buffer.write(base64.b64decode(base64_string)) return os.path.abspath(save_path) app = FastAPI() class ImageBase64Request(BaseModel): image: str @app.get("/") def read_root(): return {"status": "API is running"} @app.post("/api/read_idcard") async def read_idcard(image: UploadFile = File(...), image2: UploadFile = File(None)): try: file_path1 = await save_upload_file(image) file_path2 = "" if image2 is not None: file_path2 = await save_upload_file(image2) ocrResult = ocr_id_card(file_path1.encode('utf-8'), file_path2.encode('utf-8')) ocrResDict = json.loads(ocrResult) os.remove(file_path1) if file_path2: os.remove(file_path2) return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200) except Exception as e: return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) @app.post("/api/read_idcard_base64") async def read_idcard_base64(request_data: ImageBase64Request): try: file_path = await save_base64_file(request_data.image) # Your OCR function (pass bytes-encoded file path) ocr_result = ocr_id_card(file_path.encode('utf-8'), ''.encode('utf-8')) ocr_res_dict = json.loads(ocr_result) # Remove the temp file os.remove(file_path) return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200) except Exception as e: return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) @app.post("/api/read_credit") async def read_credit(image: UploadFile = File(...)): try: file_path = await save_upload_file(image) ocrResult = ocr_credit_card(file_path.encode('utf-8')) ocrResDict = json.loads(ocrResult) os.remove(file_path) return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200) except Exception as e: return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) @app.post("/api/read_credit_base64") async def read_credit_base64(request_data: ImageBase64Request): try: file_path = await save_base64_file(request_data.image) # Your OCR function (pass bytes-encoded file path) ocr_result = ocr_credit_card(file_path.encode('utf-8')) ocr_res_dict = json.loads(ocr_result) # Remove the temp file os.remove(file_path) return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200) except Exception as e: return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) @app.post("/api/read_barcode") async def read_barcode(image: UploadFile = File(...)): try: file_path = await save_upload_file(image) ocrResult = ocr_barcode(file_path.encode('utf-8')) ocrResDict = json.loads(ocrResult) os.remove(file_path) return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200) except Exception as e: return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) @app.post("/api/read_barcode_base64") async def read_barcode_base64(request_data: ImageBase64Request): try: file_path = await save_base64_file(request_data.image) # Your OCR function (pass bytes-encoded file path) ocr_result = ocr_barcode(file_path.encode('utf-8')) ocr_res_dict = json.loads(ocr_result) # Remove the temp file os.remove(file_path) return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200) except Exception as e: return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) if __name__ == '__main__': ret = activate_sdk() if ret != 0: exit(-1) dummy_interface = gr.Interface( fn=lambda x: "API ready.", inputs=gr.Textbox(label="Info"), outputs=gr.Textbox(label="Response"), flagging_mode="auto" # 🚫 disables writing to `flagged/` ) gr_app = gr.mount_gradio_app(app, dummy_interface, path="/gradio") import uvicorn uvicorn.run(gr_app, host="0.0.0.0", port=7860)