| import asyncio |
| import os |
| import json |
| import shlex |
| import re |
| from app.core.config import settings |
| from custom_logger import logger_config as logger |
| from app.db import crud |
|
|
| worker_task = None |
| worker_running = False |
|
|
| def is_worker_running(): |
| return worker_running |
|
|
| async def start_worker(): |
| global worker_task, worker_running |
| |
| logger.info(f"start_worker called: worker_running={worker_running}") |
| |
| if not worker_running: |
| worker_running = True |
| worker_task = asyncio.create_task(worker_loop()) |
| logger.info("Worker task started") |
| else: |
| logger.info("Worker already running") |
|
|
| async def worker_loop(): |
| global worker_running |
| logger.info("OCR Worker started. Monitoring for new image files...") |
| |
| while worker_running: |
| logger.debug("Worker loop iteration, checking for files...") |
| await crud.cleanup_old_entries() |
| |
| try: |
| row = await crud.get_next_not_started() |
| |
| if row: |
| task_id = row['id'] |
| filepath = row['filepath'] |
| filename = row['filename'] |
| |
| logger.info(f"\n{'='*60}\nProcessing: {filename}\nID: {task_id}\n{'='*60}") |
| |
| await crud.update_status(task_id, 'processing') |
| |
| try: |
| await crud.update_progress(task_id, 10, "Starting OCR...") |
| |
| command = f"cd {settings.CWD} && {settings.PYTHON_PATH} --input {shlex.quote(os.path.abspath(filepath))} --model {settings.OCR_MODEL_NAME}" |
| |
| logger.debug(f"Executing command: {command}") |
| |
| process = await asyncio.create_subprocess_shell( |
| command, |
| stdout=asyncio.subprocess.PIPE, |
| stderr=asyncio.subprocess.STDOUT, |
| cwd=settings.CWD, |
| env={ |
| **os.environ, |
| 'PYTHONUNBUFFERED': '1', |
| 'CUDA_LAUNCH_BLOCKING': '1', |
| 'USE_CPU_IF_POSSIBLE': 'true' |
| } |
| ) |
| |
| while True: |
| line = await process.stdout.readline() |
| if not line: |
| break |
| |
| line_str = line.decode('utf-8', errors='replace').strip() |
| if line_str: |
| logger.info(f"[OCR] {line_str}") |
| |
| percent_match = re.search(r'(\d+)%', line_str) |
| if percent_match: |
| try: |
| percent = int(percent_match.group(1)) |
| await crud.update_progress(task_id, min(percent, 90), "Processing...") |
| except: pass |
| |
| if 'initializing paddleocr' in line_str.lower(): |
| await crud.update_progress(task_id, 15, "Initializing engine...") |
| elif 'loading model' in line_str.lower(): |
| await crud.update_progress(task_id, 25, "Loading OCR models...") |
| elif 'model loaded successfully' in line_str.lower(): |
| await crud.update_progress(task_id, 40, "Models ready.") |
| elif 'processing:' in line_str.lower(): |
| await crud.update_progress(task_id, 50, "Analyzing image...") |
| elif 'ocr completed successfully' in line_str.lower(): |
| await crud.update_progress(task_id, 90, "OCR completed.") |
| elif 'json ocr saved' in line_str.lower(): |
| await crud.update_progress(task_id, 95, "Saving data...") |
| |
| await process.wait() |
| if process.returncode != 0: |
| raise Exception(f"OCR process failed with return code {process.returncode}") |
| |
| await crud.update_progress(task_id, 98, "Reading results...") |
| |
| output_path = os.path.join(settings.CWD, settings.TEMP_DIR, 'output_ocr.json') |
| with open(output_path, 'r') as file: |
| result = json.loads(file.read().strip()) |
| |
| result_data = result.get('text', '') or str(result) |
| |
| logger.success(f"Successfully processed: {filename}") |
| logger.info(f"Text preview: {result_data[:100]}...") |
| |
| await crud.update_status(task_id, 'completed', result=json.dumps(result)) |
| |
| if os.path.exists(filepath): |
| os.remove(filepath) |
| logger.debug(f"Deleted image file: {filepath}") |
| |
| except Exception as e: |
| logger.error(f"Failed to process {filename}: {str(e)}") |
| await crud.update_status(task_id, 'failed', error=str(e)) |
| |
| else: |
| await asyncio.sleep(settings.POLL_INTERVAL) |
| |
| except Exception as e: |
| logger.error(f"Worker error: {str(e)}") |
| await asyncio.sleep(settings.POLL_INTERVAL) |
|
|