polyglot-ocr-jp / api /endpoints.py
xqt's picture
asdasd
e9aee4a
import uuid
from fastapi import APIRouter, UploadFile, File, HTTPException, Header
from schemas.responses import StandardResponse, ResponseStatus
from services.worker import task_queue, results_store
from core.config import settings
import os
import shutil
import json
# Router instance that the endpoint functions in this module register
# themselves on through the @router.get / @router.post decorators.
router = APIRouter()
@router.get("/", include_in_schema=False, response_model=StandardResponse[dict])
async def root():
    """Return a static success response carrying the service banner message."""
    banner = {"message": "Polyglot OCR Service Online"}
    return StandardResponse(status_code=ResponseStatus.SUCCESS, data=banner)
@router.get("/health", response_model=StandardResponse[dict])
async def health():
    """Return a success response with the worker flag and current queue size."""
    # NOTE: "worker_alive" is a constant here; it is not derived from any
    # check on the worker itself. Only "queue_depth" reflects live state.
    payload = {"worker_alive": True, "queue_depth": task_queue.qsize()}
    return StandardResponse(status_code=ResponseStatus.SUCCESS, data=payload)
@router.post("/upload", response_model=StandardResponse[dict])
async def upload(file: UploadFile = File(...), content_length: int = Header(None)):
    """Validate an uploaded file, store it on disk and enqueue it for processing.

    Args:
        file: The uploaded file taken from the multipart form.
        content_length: Value of the ``Content-Length`` request header, or
            ``None`` when the client did not send one.

    Returns:
        A dict with ``status_code`` and ``data``. On success ``data`` holds the
        new ``task_id`` and the ``queue_position``. Otherwise ``status_code``
        is ``QUEUE_FULL``, ``FILE_SIZE_EXCEEDED`` or ``INVALID_FILE_TYPE`` and
        ``data`` describes the rejection.
    """
    if task_queue.qsize() >= settings.MAX_QUEUE_SIZE:
        return {"status_code": ResponseStatus.QUEUE_FULL, "data": {"message": "Server busy."}}

    # Use the declared size when available. Both the header and
    # ``UploadFile.size`` can be None, so fall back to measuring the spooled
    # upload instead of comparing None against an int.
    size = content_length or file.size
    if size is None:
        file.file.seek(0, os.SEEK_END)
        size = file.file.tell()
        file.file.seek(0)
    if size > settings.MAX_FILE_SIZE:
        return {"status_code": ResponseStatus.FILE_SIZE_EXCEEDED, "data": {"limit": settings.MAX_FILE_SIZE}}

    # A part without a filename yields an empty extension and is rejected
    # below rather than raising AttributeError on ``None.lower()``.
    ext = os.path.splitext((file.filename or "").lower())[1]
    if ext not in {'.pdf', '.png', '.jpg', '.jpeg'}:
        return {"status_code": ResponseStatus.INVALID_FILE_TYPE, "data": {"message": "Unsupported file."}}

    task_id = str(uuid.uuid4())
    os.makedirs(settings.STORAGE_DIR, exist_ok=True)
    # The stored name is derived from the generated task id, never from the
    # client-supplied filename; only the validated extension is reused.
    file_path = os.path.join(settings.STORAGE_DIR, f"{task_id}{ext}")

    try:
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception:
        # Do not leave a truncated upload behind in the storage directory.
        try:
            os.remove(file_path)
        except OSError:
            # Best-effort cleanup: the file may never have been created, and
            # the original copy error is the one worth reporting.
            pass
        raise
    finally:
        file.file.close()

    # Register the task before queueing it so a result lookup can find it.
    results_store[task_id] = {"file_path": file_path}
    await task_queue.put((task_id, file_path))

    return {
        "status_code": ResponseStatus.SUCCESS,
        "data": {"task_id": task_id, "queue_position": task_queue.qsize()}
    }
@router.get("/result/{task_id}", response_model=StandardResponse[dict])
async def get_result(task_id: str):
    """Return the current state or final result of a previously uploaded task.

    Args:
        task_id: Identifier returned by the upload endpoint.

    Returns:
        A dict with ``status_code`` and ``data``:
        ``TASK_NOT_FOUND`` when ``results_store`` has no entry for the id,
        ``SUCCESS`` with the JSON loaded from the task's result file,
        the task's own status code with its ``error`` when one is recorded,
        and ``PENDING`` otherwise.
    """
    task_info = results_store.get(task_id)
    if not task_info:
        return {"status_code": ResponseStatus.TASK_NOT_FOUND, "data": {"message": "Expired or missing."}}

    status_code = task_info.get("status_code")

    # On success the payload is read from the result file on every request;
    # only the file's path is taken from results_store.
    if status_code == ResponseStatus.SUCCESS:
        json_path = task_info.get("result_file")
        if json_path:
            try:
                # Open directly instead of checking os.path.exists first, so a
                # file removed between the check and the open cannot raise.
                with open(json_path, 'r', encoding='utf-8') as f:
                    disk_data = json.load(f)
            except (FileNotFoundError, NotADirectoryError):
                # NOTE(review): a SUCCESS task with no readable result file
                # falls through and is reported as PENDING below; confirm
                # this is intended.
                pass
            else:
                return {
                    "status_code": ResponseStatus.SUCCESS,
                    "data": disk_data
                }

    if "error" in task_info:
        return {"status_code": status_code, "data": {"error": task_info["error"]}}

    return {"status_code": ResponseStatus.PENDING, "data": {"message": "Processing..."}}