File size: 5,699 Bytes
826cc86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import asyncio
import os
import json
import shlex
import re
from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud

# Module-level worker state, shared by the functions below.
# worker_task: handle returned by asyncio.create_task() in start_worker();
#              stays None until the worker is first started.
# worker_running: flag checked by worker_loop() on every iteration; the loop
#                 keeps polling for work while it is True.
worker_task, worker_running = None, False

def is_worker_running():
    """Report whether the OCR worker loop is active.

    Returns:
        True when the module-level ``worker_running`` flag is set and the
        loop's task (if one was created) has not finished. A task that has
        ended -- because it raised or was cancelled -- means nothing is
        polling for work any more, so this returns False even though the
        flag may still be set.
    """
    if not worker_running:
        return False
    # A finished task (crashed or cancelled) is not actually running.
    return worker_task is None or not worker_task.done()

async def start_worker():
    """Start the background OCR loop unless a live one already exists.

    Sets the module-level ``worker_running`` flag and schedules
    ``worker_loop()`` as an asyncio task stored in ``worker_task``.

    If the flag is set but the previous task has already finished (it
    raised or was cancelled), the worker is treated as stopped and a fresh
    task is started. Otherwise a dead worker could never be restarted,
    because nothing would ever clear the flag.
    """
    global worker_task, worker_running

    logger.info(f"start_worker called: worker_running={worker_running}")

    task_alive = worker_task is None or not worker_task.done()
    if worker_running and task_alive:
        logger.info("Worker already running")
        return

    worker_running = True
    worker_task = asyncio.create_task(worker_loop())
    logger.info("Worker task started")

# Progress milestones announced by the OCR script. Each output line is
# lower-cased and checked against these markers in order; the first match
# sets the task's progress and status message.
_OCR_STAGE_MARKERS = (
    ('initializing paddleocr', 15, "Initializing engine..."),
    ('loading model', 25, "Loading OCR models..."),
    ('model loaded successfully', 40, "Models ready."),
    ('processing:', 50, "Analyzing image..."),
    ('ocr completed successfully', 90, "OCR completed."),
    ('json ocr saved', 95, "Saving data..."),
)
# Matches a percentage such as "42%" anywhere in an output line.
_OCR_PERCENT_RE = re.compile(r'(\d+)%')


def _build_ocr_command(filepath):
    """Return the shell command that runs the OCR script on ``filepath``.

    The subprocess is launched with ``cwd=settings.CWD``, so no ``cd`` prefix
    is needed. ``filepath`` is made absolute against this process's working
    directory because the subprocess runs elsewhere. ``settings.PYTHON_PATH``
    is interpolated unquoted on purpose: it presumably holds the interpreter
    plus the script path and must be word-split by the shell (TODO confirm
    against the deployment config).
    """
    input_path = shlex.quote(os.path.abspath(filepath))
    model_name = shlex.quote(str(settings.OCR_MODEL_NAME))
    return f"{settings.PYTHON_PATH} --input {input_path} --model {model_name}"


async def _report_ocr_progress(task_id, line_str):
    """Translate one non-empty line of OCR output into progress updates."""
    percent_match = _OCR_PERCENT_RE.search(line_str)
    if percent_match:
        # Cap at 90: the remaining range is reserved for the result stages.
        percent = min(int(percent_match.group(1)), 90)
        # Best-effort: a failed progress write must not fail the OCR task.
        # `except Exception` (not a bare except) so asyncio cancellation
        # still propagates.
        try:
            await crud.update_progress(task_id, percent, "Processing...")
        except Exception as e:
            logger.warning(f"Progress update failed for task {task_id}: {e}")

    lowered = line_str.lower()
    for marker, stage_percent, message in _OCR_STAGE_MARKERS:
        if marker in lowered:
            await crud.update_progress(task_id, stage_percent, message)
            break


def _read_ocr_result(output_path):
    """Parse and return the JSON document the OCR script wrote to ``output_path``."""
    # Explicit UTF-8: recognised text may be non-ASCII, and the platform
    # default encoding is not guaranteed to be UTF-8.
    with open(output_path, 'r', encoding='utf-8') as file:
        return json.loads(file.read().strip())


async def _process_task(row):
    """Run OCR for one queued row and record the outcome in the database.

    Marks the task 'processing', streams the OCR subprocess output into
    progress updates, then stores the parsed JSON result as 'completed'
    (and deletes the source image) or records 'failed' with the error text.
    Errors are handled here; only cancellation propagates to the caller.
    """
    task_id = row['id']
    filepath = row['filepath']
    filename = row['filename']

    logger.info(f"\n{'='*60}\nProcessing: {filename}\nID: {task_id}\n{'='*60}")

    await crud.update_status(task_id, 'processing')

    process = None
    try:
        await crud.update_progress(task_id, 10, "Starting OCR...")

        output_path = os.path.join(settings.CWD, settings.TEMP_DIR, 'output_ocr.json')
        # The output path is shared by every task: drop any leftover file so
        # a run that fails to write output cannot report another image's text.
        try:
            os.remove(output_path)
        except FileNotFoundError:
            pass

        command = _build_ocr_command(filepath)
        logger.debug(f"Executing command: {command}")

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            cwd=settings.CWD,
            env={
                **os.environ,
                'PYTHONUNBUFFERED': '1',
                'CUDA_LAUNCH_BLOCKING': '1',
                'USE_CPU_IF_POSSIBLE': 'true'
            }
        )

        while True:
            line = await process.stdout.readline()
            if not line:
                break
            line_str = line.decode('utf-8', errors='replace').strip()
            if line_str:
                logger.info(f"[OCR] {line_str}")
                await _report_ocr_progress(task_id, line_str)

        await process.wait()
        if process.returncode != 0:
            raise Exception(f"OCR process failed with return code {process.returncode}")

        await crud.update_progress(task_id, 98, "Reading results...")

        result = _read_ocr_result(output_path)

        # The script is expected to emit a dict with a 'text' key; fall back
        # to the whole document for any other JSON shape instead of crashing.
        text = result.get('text', '') if isinstance(result, dict) else ''
        result_data = str(text) if text else str(result)

        logger.success(f"Successfully processed: {filename}")
        logger.info(f"Text preview: {result_data[:100]}...")

        await crud.update_status(task_id, 'completed', result=json.dumps(result))

        try:
            os.remove(filepath)
            logger.debug(f"Deleted image file: {filepath}")
        except FileNotFoundError:
            pass
        except OSError as e:
            # The task is already recorded as completed; a cleanup failure
            # must not overwrite that status with 'failed'.
            logger.warning(f"Could not delete image file {filepath}: {e}")

    except Exception as e:
        logger.error(f"Failed to process {filename}: {str(e)}")
        await crud.update_status(task_id, 'failed', error=str(e))
    finally:
        # returncode is still None only if we left before wait() finished
        # (error while reading output, or cancellation): don't orphan it.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass


async def worker_loop():
    """Poll for queued images and OCR them one at a time until stopped.

    Runs while the module-level ``worker_running`` flag is True. Each
    iteration purges old entries, fetches the next not-started row and
    processes it, or sleeps ``settings.POLL_INTERVAL`` seconds when the
    queue is empty or an error occurred. Errors are logged and the loop
    keeps going; only cancellation ends it early.
    """
    global worker_running
    logger.info("OCR Worker started. Monitoring for new image files...")

    try:
        while worker_running:
            logger.debug("Worker loop iteration, checking for files...")
            try:
                # Kept inside the try: a failing cleanup must not kill the
                # worker task.
                await crud.cleanup_old_entries()

                row = await crud.get_next_not_started()
                if row:
                    await _process_task(row)
                else:
                    await asyncio.sleep(settings.POLL_INTERVAL)

            except Exception as e:
                logger.error(f"Worker error: {str(e)}")
                await asyncio.sleep(settings.POLL_INTERVAL)
    finally:
        # However the loop ends, clear the flag so the worker can be started
        # again -- but only if this task is still the registered worker, so a
        # late-exiting old loop cannot stop a newly started one.
        if worker_task is asyncio.current_task():
            worker_running = False