xqt committed
Commit e9aee4a · 1 Parent(s): 824383e
Dockerfile CHANGED
@@ -29,4 +29,4 @@ COPY --chown=user . /app
 
 RUN python3 -c "from paddleocr import PaddleOCR; PaddleOCR(use_angle_cls=True, lang='japan')"
 
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860", "--workers", "1"]
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]
api/endpoints.py CHANGED
@@ -1,45 +1,83 @@
 import uuid
-from fastapi import APIRouter, UploadFile, File, HTTPException
+from fastapi import APIRouter, UploadFile, File, HTTPException, Header
 from schemas.responses import StandardResponse, ResponseStatus
 from services.worker import task_queue, results_store
+from core.config import settings
+import os
+import shutil
+import json
 
 router = APIRouter()
 
-@router.get("/health")
+@router.get("/", include_in_schema=False, response_model=StandardResponse[dict])
+async def root():
+    return StandardResponse(
+        status_code=ResponseStatus.SUCCESS,
+        data={"message": "Polyglot OCR Service Online"}
+    )
+
+@router.get("/health", response_model=StandardResponse[dict])
 async def health():
-    return {"status": "healthy", "queue_depth": task_queue.qsize()}
+    return StandardResponse(
+        status_code=ResponseStatus.SUCCESS,
+        data={
+            "worker_alive": True,
+            "queue_depth": task_queue.qsize()
+        }
+    )
 
-# 1. Add this to satisfy the internal health check
-@router.get("/")
-async def root():
-    return JSONResponse(content={"status": "alive"}, status_code=200)
 
 @router.post("/upload", response_model=StandardResponse[dict])
-async def upload(file: UploadFile = File(...)):
-    if not file.filename.lower().endswith(('.pdf', '.png', '.jpg', '.jpeg')):
-        raise HTTPException(status_code=400, detail="Unsupported file format.")
-
+async def upload(file: UploadFile = File(...), content_length: int = Header(None)):
+    if task_queue.qsize() >= settings.MAX_QUEUE_SIZE:
+        return {"status_code": ResponseStatus.QUEUE_FULL, "data": {"message": "Server busy."}}
+
+    if (content_length or file.size) > settings.MAX_FILE_SIZE:
+        return {"status_code": ResponseStatus.FILE_SIZE_EXCEEDED, "data": {"limit": settings.MAX_FILE_SIZE}}
+
+    if not (ext := os.path.splitext(file.filename.lower())[1]) in {'.pdf', '.png', '.jpg', '.jpeg'}:
+        return {"status_code": ResponseStatus.INVALID_FILE_TYPE, "data": {"message": "Unsupported file."}}
+
     task_id = str(uuid.uuid4())
-    content = await file.read()
+    os.makedirs(settings.STORAGE_DIR, exist_ok=True)
+    file_path = os.path.join(settings.STORAGE_DIR, f"{task_id}{ext}")
 
-    results_store[task_id] = {"status": "pending"}
-    await task_queue.put((task_id, content, file.filename.lower().endswith('.pdf')))
-
-    return StandardResponse(
-        status_code=ResponseStatus.SUCCESS,
-        data={"task_id": task_id, "queue_position": task_queue.qsize()}
-    )
+    try:
+        with open(file_path, "wb") as buffer:
+            shutil.copyfileobj(file.file, buffer)
+    finally:
+        file.file.close()
+
+    results_store[task_id] = {"file_path": file_path}
+    await task_queue.put((task_id, file_path))
+
+    return {
+        "status_code": ResponseStatus.SUCCESS,
+        "data": {"task_id": task_id, "queue_position": task_queue.qsize()}
+    }
+
 
 @router.get("/result/{task_id}", response_model=StandardResponse[dict])
 async def get_result(task_id: str):
-    result = results_store.get(task_id)
-    if not result:
-        raise HTTPException(status_code=404, detail="Task not found or expired.")
+    task_info = results_store.get(task_id)
 
-    # Map internal status to our Enum
-    status_map = {"completed": ResponseStatus.SUCCESS, "pending": ResponseStatus.PENDING, "failed": ResponseStatus.ERROR}
-
-    return StandardResponse(
-        status_code=status_map.get(result["status"], ResponseStatus.PENDING),
-        data=result
-    )
+    if not task_info:
+        return {"status_code": ResponseStatus.TASK_NOT_FOUND, "data": {"message": "Expired or missing."}}
+
+    status_code = task_info.get("status_code")
+
+    # If successful, load from disk, return, and don't keep in RAM
+    if status_code == ResponseStatus.SUCCESS:
+        json_path = task_info.get("result_file")
+        if json_path and os.path.exists(json_path):
+            with open(json_path, 'r', encoding='utf-8') as f:
+                disk_data = json.load(f)
+            return {
+                "status_code": ResponseStatus.SUCCESS,
+                "data": disk_data
+            }
+
+    if "error" in task_info:
+        return {"status_code": status_code, "data": {"error": task_info["error"]}}
+
+    return {"status_code": ResponseStatus.PENDING, "data": {"message": "Processing..."}}
core/config.py CHANGED
@@ -2,9 +2,12 @@ from pydantic_settings import BaseSettings
 
 class Settings(BaseSettings):
     PROJECT_NAME: str = "Polyglot OCR Japanese"
+    LANGUAGE: str = "japan"
     VERSION: str = "1.0.0"
     MAX_FILE_SIZE: int = 500 * 1024 * 1024
     STORAGE_QUOTA: int = 20 * 1024 * 1024 * 1024
     TASK_EXPIRY_SECONDS: int = 86400
+    MAX_QUEUE_SIZE: int = 100
+    STORAGE_DIR: str = "/tmp/pglt"
 
 settings = Settings()
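Because Settings extends pydantic-settings' BaseSettings, each of these defaults can be overridden through an environment variable of the same name (matched case-insensitively) without touching the code. A small illustrative sketch, not part of the commit:

import os
from core.config import Settings

# Hypothetical overrides: shrink the queue and switch the OCR language.
os.environ["MAX_QUEUE_SIZE"] = "25"
os.environ["LANGUAGE"] = "en"

settings = Settings()
print(settings.MAX_QUEUE_SIZE, settings.LANGUAGE)  # 25 en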
main.py CHANGED
@@ -1,14 +1,16 @@
 import asyncio
 from fastapi import FastAPI
-from api.endpoints import router as api_v1_router
-from services.worker import ocr_worker, janitor_task
+from api.endpoints import router
+from services.worker import preprocessor_worker, ocr_worker, janitor_worker
 from core.config import settings
+import os
 
 app = FastAPI(title=settings.PROJECT_NAME)
-app.include_router(api.endpoints.router)
+app.include_router(router)
 
 @app.on_event("startup")
 async def startup_event():
+    os.makedirs(settings.STORAGE_DIR, exist_ok=True)
+    asyncio.create_task(preprocessor_worker())
     asyncio.create_task(ocr_worker())
-    asyncio.create_task(janitor_task())
-    asdasd
+    asyncio.create_task(janitor_worker())
requirements.txt CHANGED
@@ -1,5 +1,6 @@
 fastapi==0.128.8
 uvicorn==0.39.0
+pydantic_settings==2.11.0
 python-multipart==0.0.20
 paddlepaddle==3.3.0
 paddleocr==3.4.0
schemas/responses.py CHANGED
@@ -1,21 +1,41 @@
 from enum import Enum
 from typing import Generic, TypeVar, Optional, Any
 from datetime import datetime, timezone
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, model_validator
 from core.config import settings
 
 T = TypeVar("T")
 
 class ResponseStatus(str, Enum):
+    # --- Standard States ---
     SUCCESS = "success"
-    ERROR = "error"
     PENDING = "pending"
+
+    # --- Client-Side Errors (4xx range logic) ---
+    INVALID_FILE_TYPE = "invalid_file_type"      # Magic number mismatch
+    FILE_SIZE_EXCEEDED = "file_size_exceeded"    # Over MAX_FILE_SIZE
+    FILE_CORRUPTED = "file_corrupted"            # Fails integrity check (PIL/PDF)
+    TASK_NOT_FOUND = "task_not_found"            # Invalid or expired UUID
+
+    # --- Server-Side Errors (5xx range logic) ---
+    INTERNAL_ERROR = "internal_server_error"     # Unhandled exceptions/OCR crashes
+    QUEUE_FULL = "queue_full"                    # task_queue.qsize() > limit
+    STORAGE_FULL = "storage_full"                # Total quota on disk exceeded
+    WORKER_UNAVAILABLE = "worker_unavailable"    # Background loop crashed
 
 class MetaData(BaseModel):
     version: str = settings.VERSION
     timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
 
 class StandardResponse(BaseModel, Generic[T]):
+    status: bool
     status_code: ResponseStatus
     data: Optional[T] = None
-    metadata: MetaData = Field(default_factory=MetaData)
+    metadata: MetaData = Field(default_factory=MetaData)
+
+    @model_validator(mode='before')
+    @classmethod
+    def set_status(cls, data: Any) -> Any:
+        if isinstance(data, dict) and 'status_code' in data:
+            data['status'] = data['status_code'] == ResponseStatus.SUCCESS
+        return data
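With the before-mode validator, callers never set status themselves; it is derived from status_code whenever the model is built from keyword arguments or a dict. A quick sketch of the behaviour (illustrative only):

from schemas.responses import StandardResponse, ResponseStatus

ok = StandardResponse[dict](status_code=ResponseStatus.SUCCESS, data={"task_id": "abc"})
busy = StandardResponse[dict](status_code=ResponseStatus.QUEUE_FULL, data={"message": "Server busy."})

print(ok.status)    # True  -- filled in by set_status()
print(busy.status)  # False -- any non-SUCCESS code maps to False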
services/ocr_engine.py CHANGED
@@ -1,8 +1,9 @@
 from paddleocr import PaddleOCR
+from core.config import settings
 
 # Initialized once upon import
 engine = PaddleOCR(
     use_angle_cls=True,
-    lang='japan',
+    lang=settings.LANGUAGE,
     enable_mkldnn=False
 )
services/worker.py CHANGED
@@ -6,50 +6,151 @@ import numpy as np
 from PIL import Image
 from pdf2image import convert_from_bytes
 from services.ocr_engine import engine
-from utils.helpers import json_ready
+from utils.helpers import json_ready, process_and_save_image
 from core.config import settings
+import os
+import json
+from pdf2image import convert_from_path, pdfinfo_from_path
+from schemas.responses import ResponseStatus
 
 # Global State
 results_store = {}
 task_queue = asyncio.Queue()
+ocr_queue = asyncio.Queue()
 
-async def ocr_worker():
+async def preprocessor_worker():
     while True:
-        task_id, content, is_pdf = await task_queue.get()
+        task_id, file_path = await task_queue.get()
+        try:
+            ext = os.path.splitext(file_path.lower())[1]
+            page_paths = []
+
+            if ext == '.pdf':
+                info = pdfinfo_from_path(file_path)
+                max_pages = info["Pages"]
+
+                for i in range(1, max_pages + 1):
+                    page = convert_from_path(
+                        file_path,
+                        first_page=i,
+                        last_page=i,
+                        dpi=150
+                    )[0]
+
+                    chunk_path = process_and_save_image(page, file_path, i)
+                    page_paths.append(chunk_path)
+                    del page
+            else:
+                with Image.open(file_path) as img:
+                    chunk_path = process_and_save_image(img, file_path, 0)
+                    page_paths.append(chunk_path)
+
+            await ocr_queue.put((task_id, page_paths))
+
+        except Exception as e:
+            results_store[task_id] = {
+                "status": "failed",
+                "status_code": ResponseStatus.FILE_CORRUPTED,
+                "error": f"Memory-safe conversion failed: {str(e)}"
+            }
+        finally:
+            if os.path.exists(file_path):
+                os.remove(file_path)
+            task_queue.task_done()
+
+async def ocr_worker():
+    while True:
+        task_id, page_paths = await ocr_queue.get()
         try:
             loop = asyncio.get_event_loop()
-            pages_images = [np.array(p.convert('RGB')) for p in convert_from_bytes(content)] if is_pdf \
-                else [np.array(Image.open(io.BytesIO(content)).convert('RGB'))]
+            results_list = []
+
+            is_pdf = len(page_paths) > 1
 
-            output = []
-            for i, img_arr in enumerate(pages_images):
+            for i, p_path in enumerate(page_paths):
+                img = Image.open(p_path)
+                img_arr = np.array(img.convert('RGB'))
+
                 raw = await loop.run_in_executor(None, functools.partial(engine.ocr, img_arr))
 
                 page_results = []
-                if isinstance(raw, list) and len(raw) > 0:
+                if isinstance(raw, list) and len(raw) > 0 and raw[0] is not None:
                     data = raw[0]
                     if isinstance(data, dict) and 'rec_texts' in data:
                         for t, s, b in zip(data['rec_texts'], data['rec_scores'], data['dt_polys']):
-                            page_results.append({"text": str(t), "confidence": float(s), "box": json_ready(b)})
+                            page_results.append({
+                                "text": str(t),
+                                "confidence": round(float(s), 4),
+                                "box": json_ready(b)
+                            })
                     else:
                         page_results = json_ready(data)
 
-                output.append({"page": i + 1, "results": page_results})
+                results_list.append({"page": i + 1, "results": page_results})
+
+                if os.path.exists(p_path):
+                    os.remove(p_path)
+
+            final_data = {"pages": results_list}
+
+            json_filename = f"{task_id}_results.json"
+            json_path = os.path.join(settings.STORAGE_DIR, json_filename)
 
+            with open(json_path, 'w', encoding='utf-8') as f:
+                json.dump(final_data, f, ensure_ascii=False, indent=2)
+
             results_store[task_id] = {
-                "status": "completed",
-                "data": output,
+                "status_code": ResponseStatus.SUCCESS,
+                "result_file": json_path,
                 "expires_at": time.time() + settings.TASK_EXPIRY_SECONDS
             }
+
+            del results_list
+            del final_data
+
         except Exception as e:
-            results_store[task_id] = {"status": "failed", "error": str(e), "expires_at": time.time() + settings.TASK_EXPIRY_SECONDS}
+            results_store[task_id] = {
+                "status_code": ResponseStatus.INTERNAL_ERROR,
+                "error": str(e),
+                "expires_at": time.time() + settings.TASK_EXPIRY_SECONDS
+            }
         finally:
-            task_queue.task_done()
+            ocr_queue.task_done()
+
+
+async def janitor_worker():
+    """
+    Background task to purge files and memory records older than 3 hours.
+    Runs every 30 minutes to ensure the disk stays lean.
+    """
+    # 3 hours in seconds
+    EXPIRY_DURATION = 3 * 60 * 60
 
-async def janitor_task():
     while True:
-        await asyncio.sleep(3600)
+        # Check every 30 minutes (1800 seconds)
+        await asyncio.sleep(1800)
+
         now = time.time()
-        to_delete = [tid for tid, res in results_store.items() if now > res.get("expires_at", 0)]
-        for tid in to_delete:
-            del results_store[tid]
+        expired_tasks = []
+
+        # Iterate safely over a copy of the keys
+        for task_id in list(results_store.keys()):
+            task_info = results_store.get(task_id)
+            if not task_info:
+                continue
+
+            # Calculate age based on when the task was completed/created
+            # If 'expires_at' was set at task completion, we check against that
+            if now > task_info.get("expires_at", 0):
+
+                # 1. Physically remove the JSON results file
+                json_path = task_info.get("result_file")
+                if json_path and os.path.exists(json_path):
+                    try:
+                        os.remove(json_path)
+                    except Exception as e:
+                        print(f"Janitor failed to delete {json_path}: {e}")
+
+                # 2. Mark for memory cleanup
+                expired_tasks.append(task_id)
+
+        # 3. Remove from memory store
+        for task_id in expired_tasks:
+            del results_store[task_id]
utils/helpers.py CHANGED
@@ -1,4 +1,5 @@
 import numpy as np
+from PIL import Image
 
 def json_ready(obj):
     """Recursively converts NumPy types to basic Python types for JSON compatibility."""
@@ -12,4 +13,17 @@ def json_ready(obj):
         return float(obj)
     if isinstance(obj, (np.int32, np.int64)):
         return int(obj)
-    return obj
+    return obj
+
+def process_and_save_image(pil_img, original_path, page_num):
+    img = pil_img.convert('RGB')
+    width, height = img.size
+
+    scale = 960 / max(width, height)
+    if scale < 1.0:
+        new_size = (int(width * scale), int(height * scale))
+        img = img.resize(new_size, Image.Resampling.LANCZOS)
+
+    chunk_path = f"{original_path}_p{page_num}.jpg"
+    img.save(chunk_path, "JPEG", quality=85, optimize=True)
+    return chunk_path
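A small usage sketch of the new helper (illustrative only; the input image and path are made up): pages whose longest side exceeds 960 px are downscaled before OCR, smaller ones are only re-encoded as JPEG.

from PIL import Image
from utils.helpers import process_and_save_image

# Hypothetical blank A4 page at 300 DPI, as pdf2image might render it.
page = Image.new("RGB", (2480, 3508), "white")

chunk_path = process_and_save_image(page, "/tmp/demo.pdf", 1)
print(chunk_path)                   # /tmp/demo.pdf_p1.jpg
print(Image.open(chunk_path).size)  # longest side capped at 960 -> (678, 960)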