File size: 5,699 Bytes
826cc86 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | import asyncio
import os
import json
import shlex
import re
from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud
# Handle to the asyncio task created by start_worker(); None until the worker
# has been started for the first time.
worker_task = None
# Flag set to True by start_worker() and used by worker_loop() as its loop
# condition; is_worker_running() returns it unchanged.
worker_running = False
def is_worker_running():
    """Return the current value of the module-level ``worker_running`` flag."""
    return worker_running
async def start_worker():
    """Start the background worker loop unless one is already running.

    Schedules ``worker_loop()`` as an asyncio task, stores it in the
    module-level ``worker_task`` and sets ``worker_running`` to True.

    If ``worker_running`` is still True but the stored task has already
    finished (it raised or was cancelled without the flag being cleared),
    the worker is treated as stopped and a new task is started; otherwise
    the worker could never be restarted after such a failure.
    """
    global worker_task, worker_running
    logger.info(f"start_worker called: worker_running={worker_running}")

    # The flag alone cannot tell a live loop from one that died, so also
    # look at the task itself.
    task_finished = worker_task is not None and worker_task.done()
    if worker_running and task_finished:
        logger.error("Worker task is no longer running; restarting it")
        worker_running = False

    if not worker_running:
        worker_running = True
        worker_task = asyncio.create_task(worker_loop())
        logger.info("Worker task started")
    else:
        logger.info("Worker already running")
# Matches a percentage such as "42%" anywhere in a line of OCR output.
_PERCENT_RE = re.compile(r'(\d+)%')

# (lower-case marker, progress percent, status message). Only the first
# matching marker is applied, so the order of the entries matters.
_PROGRESS_MARKERS = (
    ('initializing paddleocr', 15, "Initializing engine..."),
    ('loading model', 25, "Loading OCR models..."),
    ('model loaded successfully', 40, "Models ready."),
    ('processing:', 50, "Analyzing image..."),
    ('ocr completed successfully', 90, "OCR completed."),
    ('json ocr saved', 95, "Saving data..."),
)


async def _report_line_progress(task_id, line_str):
    """Turn one non-empty line of OCR output into progress updates."""
    percent_match = _PERCENT_RE.search(line_str)
    if percent_match:
        try:
            percent = int(percent_match.group(1))
            # Cap at 90 so the later "reading results" steps can still advance.
            await crud.update_progress(task_id, min(percent, 90), "Processing...")
        except Exception as e:
            # Best-effort update: keep going, but do not swallow
            # asyncio.CancelledError the way a bare ``except`` would.
            logger.debug(f"Could not record progress for task {task_id}: {str(e)}")

    lowered = line_str.lower()
    for marker, percent, message in _PROGRESS_MARKERS:
        if marker in lowered:
            await crud.update_progress(task_id, percent, message)
            break


async def _run_ocr(task_id, filepath):
    """Run the OCR command on *filepath*, streaming its output as progress.

    Raises:
        Exception: if the command exits with a non-zero return code.
    """
    # Quote every interpolated path/value so spaces or shell metacharacters
    # cannot split or alter the command. PYTHON_PATH is left as-is because it
    # is followed directly by the arguments and may itself be several words.
    command = (
        f"cd {shlex.quote(str(settings.CWD))} && {settings.PYTHON_PATH} "
        f"--input {shlex.quote(os.path.abspath(filepath))} "
        f"--model {shlex.quote(str(settings.OCR_MODEL_NAME))}"
    )
    logger.debug(f"Executing command: {command}")
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # stderr is merged into stdout
        cwd=settings.CWD,
        env={
            **os.environ,
            'PYTHONUNBUFFERED': '1',
            'CUDA_LAUNCH_BLOCKING': '1',
            'USE_CPU_IF_POSSIBLE': 'true',
        },
    )
    try:
        while True:
            line = await process.stdout.readline()
            if not line:
                break  # EOF: the process closed its output
            line_str = line.decode('utf-8', errors='replace').strip()
            if line_str:
                logger.info(f"[OCR] {line_str}")
                await _report_line_progress(task_id, line_str)
        await process.wait()
    finally:
        # Reached with the process still alive only when streaming failed or
        # this task was cancelled; do not leave the spawned shell running.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # it exited in the meantime
            await process.wait()

    if process.returncode != 0:
        raise Exception(f"OCR process failed with return code {process.returncode}")


def _remove_image(filepath):
    """Delete the processed image; a failure is logged, never raised."""
    try:
        os.remove(filepath)
    except FileNotFoundError:
        pass  # already gone, nothing to do
    except OSError as e:
        logger.error(f"Could not delete image file {filepath}: {str(e)}")
    else:
        logger.debug(f"Deleted image file: {filepath}")


async def _process_task(row):
    """Run OCR for one queued row and record the outcome via ``crud``.

    *row* must provide the keys ``'id'``, ``'filepath'`` and ``'filename'``.
    Any failure while processing is logged and stored as status ``'failed'``.
    """
    task_id = row['id']
    filepath = row['filepath']
    filename = row['filename']
    logger.info(f"\n{'='*60}\nProcessing: {filename}\nID: {task_id}\n{'='*60}")
    await crud.update_status(task_id, 'processing')
    try:
        await crud.update_progress(task_id, 10, "Starting OCR...")
        await _run_ocr(task_id, filepath)

        await crud.update_progress(task_id, 98, "Reading results...")
        output_path = os.path.join(settings.CWD, settings.TEMP_DIR, 'output_ocr.json')
        with open(output_path, 'r') as file:
            result = json.loads(file.read().strip())

        # The preview is only for the log, so it must not be able to fail the
        # task when the JSON is not an object or 'text' is not a string.
        text = result.get('text') if isinstance(result, dict) else None
        result_data = str(text or result)
        logger.success(f"Successfully processed: {filename}")
        logger.info(f"Text preview: {result_data[:100]}...")

        await crud.update_status(task_id, 'completed', result=json.dumps(result))
        # The result is stored at this point; a cleanup problem must not flip
        # the task to 'failed'.
        _remove_image(filepath)
    except Exception as e:
        logger.error(f"Failed to process {filename}: {str(e)}")
        await crud.update_status(task_id, 'failed', error=str(e))


async def worker_loop():
    """Poll for queued OCR tasks and process them one at a time.

    Runs for as long as the module-level ``worker_running`` flag is true.
    Each iteration calls ``crud.cleanup_old_entries()``, then fetches the next
    row with ``crud.get_next_not_started()``. A row is processed immediately;
    when there is none, or when an error occurs, the loop sleeps for
    ``settings.POLL_INTERVAL`` before the next iteration.
    """
    logger.info("OCR Worker started. Monitoring for new image files...")
    while worker_running:
        logger.debug("Worker loop iteration, checking for files...")
        try:
            await crud.cleanup_old_entries()
        except Exception as e:
            # A failed cleanup must not end the worker or block processing.
            logger.error(f"Cleanup error: {str(e)}")

        try:
            row = await crud.get_next_not_started()
            if row:
                await _process_task(row)
            else:
                await asyncio.sleep(settings.POLL_INTERVAL)
        except Exception as e:
            logger.error(f"Worker error: {str(e)}")
            await asyncio.sleep(settings.POLL_INTERVAL)
|