File size: 7,813 Bytes
437df61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import asyncio
import os
import json
import shlex
import re
from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud

# Handle of the asyncio task running worker_loop (None until start_worker runs).
worker_task = None
# Cooperative run flag: worker_loop keeps polling while this is True.
worker_running = False

def is_worker_running():
    """Return True only if the worker is flagged to run AND its task is still alive.

    The flag alone can be stale: nothing in this module resets ``worker_running``
    when the worker task ends unexpectedly (an exception escaping worker_loop, or
    cancellation), so reporting the flag directly would claim a dead worker runs.
    """
    return worker_running and worker_task is not None and not worker_task.done()

async def start_worker():
    """Start the background STT worker task unless a live one already exists.

    Must be awaited inside a running event loop (it calls ``asyncio.create_task``).

    - If the worker task is alive and flagged to run, this is a no-op.
    - If the task is alive but the flag was cleared (the loop is finishing its
      current item before stopping), the flag is re-armed so that same loop keeps
      going. A second loop would race it for queue rows and for the fixed output
      file the STT process writes.
    - If the previous task has finished (crashed or was cancelled) the stale flag
      is ignored and a fresh task is started.
    """
    global worker_task, worker_running

    logger.info(f"start_worker called: worker_running={worker_running}")

    if worker_task is not None and not worker_task.done():
        if worker_running:
            logger.info("Worker already running")
        else:
            worker_running = True
            logger.info("Worker task resumed")
        return

    if worker_task is not None:
        # The previous loop has ended. Report why; calling exception() also marks
        # the task's exception as retrieved so asyncio does not warn about it.
        if worker_task.cancelled():
            logger.info("Previous worker task was cancelled; starting a new one")
        else:
            error = worker_task.exception()
            if error is not None:
                logger.error(f"Previous worker task crashed: {error}")

    worker_running = True
    worker_task = asyncio.create_task(worker_loop())
    logger.info("Worker task started")

# Pre-compiled patterns for parsing STT output, hoisted out of the per-line loop.
_CHUNK_PATTERN = re.compile(r'Processing chunk (\d+)/(\d+)')
_PERCENT_PATTERN = re.compile(r'(\d+)%')
# Treat both '\n' and '\r' as line terminators: progress bars redraw in place
# with bare carriage returns, which a newline-only reader would only see as one
# giant line once the bar finishes.
_LINE_TERMINATOR_PATTERN = re.compile(rb'[\r\n]')


async def _iter_output_lines(stream, chunk_size=4096):
    """Yield decoded lines from *stream*, splitting on both '\\n' and '\\r'.

    Reads fixed-size chunks rather than calling ``readline()``. That call raises
    ``ValueError`` once a single line exceeds the stream's buffer limit
    (64 KiB by default), which a long-running in-place progress bar can reach.
    Undecodable bytes are replaced rather than raising.
    """
    pending = b''
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:
            break
        pending += chunk
        # Everything before the last terminator is complete; keep the tail.
        *complete, pending = _LINE_TERMINATOR_PATTERN.split(pending)
        for raw in complete:
            yield raw.decode('utf-8', errors='replace')
    if pending:
        yield pending.decode('utf-8', errors='replace')


async def _track_progress(task_id, line_str, current_chunk, total_chunks):
    """Translate one non-empty line of STT output into progress updates.

    Returns the ``(current_chunk, total_chunks)`` pair, updated when the line
    announces a new chunk ("Processing chunk N/M").
    """
    lowered = line_str.lower()

    chunk_match = _CHUNK_PATTERN.search(line_str)
    if chunk_match:
        current_chunk = int(chunk_match.group(1))
        total_chunks = int(chunk_match.group(2))

    # Generic percentage matcher.
    percent_match = _PERCENT_PATTERN.search(line_str)
    if percent_match:
        percent = int(percent_match.group(1))
        try:
            if 'audio' in lowered or 'extract' in lowered:
                await crud.update_progress(task_id, percent // 2, "Extracting audio...")
            elif 'transcrib' in lowered or 'model' in lowered:
                # A "chunk x/0" announcement would divide by zero; skip the update.
                if total_chunks > 0:
                    # Overall transcription progress across chunks, 0-100.
                    chunk_base = ((current_chunk - 1) / total_chunks) * 100
                    chunk_progress = percent / total_chunks
                    overall_transcription_progress = chunk_base + chunk_progress
                    # Remap so 50-100% of the overall bar is transcription.
                    overall_progress = int(50 + (overall_transcription_progress / 2))
                    await crud.update_progress(task_id, overall_progress, f"Transcribing... (Chunk {current_chunk}/{total_chunks})")
            else:
                await crud.update_progress(task_id, percent, "Processing...")
        except Exception as exc:
            # Percentage updates are best-effort: a failed write must not abort the
            # transcription, but it should not vanish silently either.
            logger.error(f"Progress update failed for task {task_id}: {exc}")

    # Stage matchers (only the first matching stage applies).
    if 'initializing nemo asr' in lowered:
        await crud.update_progress(task_id, 10, "Initializing engine...")
    elif 'extracting audio' in lowered:
        await crud.update_progress(task_id, 15, "Extracting audio...")
    elif 'model loaded' in lowered:
        await crud.update_progress(task_id, 25, "Model loaded...")
    elif 'processing audio duration' in lowered:
        await crud.update_progress(task_id, 35, "Analyzing audio...")
    elif 'transcription started' in lowered and total_chunks == 1:
        await crud.update_progress(task_id, 50, "Transcribing started...")
    elif 'transcription completed successfully' in lowered:
        await crud.update_progress(task_id, 90, "Transcription finished.")
    elif 'json transcription saved' in lowered:
        await crud.update_progress(task_id, 95, "Saving data...")

    return current_chunk, total_chunks


async def _run_stt(task_id, filepath):
    """Run the STT command on *filepath*, stream its output into progress updates.

    Raises:
        Exception: if the STT process exits with a non-zero return code.
    """
    # cwd= below already starts the shell in settings.CWD, so no `cd` prefix is
    # needed (an unquoted `cd` broke on spaces and on a relative CWD, which it
    # resolved a second time). PYTHON_PATH stays unquoted on purpose: it is
    # presumably "interpreter + script", since options are appended directly.
    command = (
        f"{settings.PYTHON_PATH} --input {shlex.quote(os.path.abspath(filepath))} "
        f"--model {shlex.quote(str(settings.STT_MODEL_NAME))}"
    )

    logger.debug(f"Executing command: {command}")

    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
        cwd=settings.CWD,
        env={
            **os.environ,
            'PYTHONUNBUFFERED': '1',
            'CUDA_LAUNCH_BLOCKING': '1',
            'USE_CPU_IF_POSSIBLE': 'true'
        }
    )

    current_chunk = 1
    total_chunks = 1
    try:
        async for raw_line in _iter_output_lines(process.stdout):
            line_str = raw_line.strip()
            if not line_str:
                continue
            logger.info(f"[STT] {line_str}")
            current_chunk, total_chunks = await _track_progress(
                task_id, line_str, current_chunk, total_chunks
            )
        await process.wait()
    finally:
        # On an early exit (cancellation, DB error while reporting progress) do
        # not leave the STT process running with nobody draining its output.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()

    if process.returncode != 0:
        raise Exception(f"STT process failed with return code {process.returncode}")


async def _process_row(row):
    """Transcribe one queued row and record 'completed' or 'failed' on it."""
    task_id = row['id']
    filepath = row['filepath']
    filename = row['filename']

    logger.info(f"\n{'='*60}\nProcessing: {filename}\nID: {task_id}\n{'='*60}")

    await crud.update_status(task_id, 'processing')

    try:
        await crud.update_progress(task_id, 5, "Starting STT...")

        output_path = os.path.join(settings.CWD, settings.TEMP_DIR, 'output_transcription.json')
        # The result is read from this fixed path; remove any file left by a
        # previous run so it can never be reported as this task's transcript.
        try:
            os.remove(output_path)
        except FileNotFoundError:
            pass

        await _run_stt(task_id, filepath)

        await crud.update_progress(task_id, 98, "Reading results...")

        with open(output_path, 'r', encoding='utf-8') as file:
            result = json.loads(file.read().strip())

        # Extract result text (caption); tolerate a JSON payload that is not an object.
        if isinstance(result, dict):
            result_data = result.get('text', '') or result.get('transcription', '') or str(result)
        else:
            result_data = str(result)

        logger.success(f"Successfully processed: {filename}")
        logger.info(f"Text preview: {str(result_data)[:100]}...")

        await crud.update_status(task_id, 'completed', result=json.dumps(result))

        if os.path.exists(filepath):
            os.remove(filepath)
            logger.debug(f"Deleted audio file: {filepath}")

    except Exception as e:
        logger.error(f"Failed to process {filename}: {str(e)}")
        await crud.update_status(task_id, 'failed', error=str(e))


async def worker_loop():
    """Poll for not-started rows and transcribe them while ``worker_running`` is True.

    Every iteration first runs ``crud.cleanup_old_entries()``, then takes at most
    one pending row. When the queue is empty, or after an unexpected error, it
    sleeps ``settings.POLL_INTERVAL`` seconds.
    """
    logger.info("STT Worker started. Monitoring for new audio files...")

    while worker_running:
        logger.debug("Worker loop iteration, checking for files...")

        try:
            # Kept inside the try: an error here used to escape the loop and end
            # the worker task for good while worker_running stayed True.
            await crud.cleanup_old_entries()

            row = await crud.get_next_not_started()
            if row:
                await _process_row(row)
            else:
                await asyncio.sleep(settings.POLL_INTERVAL)

        except Exception as e:
            logger.error(f"Worker error: {str(e)}")
            await asyncio.sleep(settings.POLL_INTERVAL)