File size: 3,005 Bytes
2299bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
Worker - process messages from queue (clean, group, compress).
"""
import asyncio
from datetime import datetime, timezone
from typing import Optional, List

from .queue import message_queue
from .batch import BatchProcessor
from .monitor import SystemMonitor


class Worker:
    """
    Background worker that processes messages from queue.

    Each cycle requests a batch of messages from ``message_queue``, cleans
    each one, hands it to a ``BatchProcessor`` and asks the processor to
    compress and save once it reports it is ready.
    """

    def __init__(self):
        self.batch_processor = BatchProcessor()
        self.monitor = SystemMonitor()
        # Loop flag; ``_run_loop`` re-checks it at the top of every cycle.
        self._running = False
        # Handle of the background loop task; None when not started/stopped.
        self._task: Optional[asyncio.Task] = None

    async def start(self, interval_seconds: float = 5):
        """
        Start the worker process.

        Calling ``start`` while the loop task is still alive is a no-op, so a
        repeated call can never spawn a second, orphaned polling loop that
        ``stop`` would be unable to cancel.

        Args:
            interval_seconds: Seconds to sleep between queue checks.
        """
        if self._task is not None and not self._task.done():
            return
        self._running = True
        self._task = asyncio.create_task(self._run_loop(interval_seconds))

    async def stop(self):
        """
        Stop the worker.

        Cancels the background loop task and waits for it to finish. Safe to
        call when the worker was never started or has already been stopped.
        Any non-cancellation exception raised by the loop task propagates.
        """
        self._running = False
        # Detach the handle first so the worker is considered stopped (and
        # may be restarted) regardless of how awaiting the task ends.
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _run_loop(self, interval: float):
        """
        Main worker loop.

        Runs ``_process_queue`` every ``interval`` seconds until ``_running``
        is cleared or the task is cancelled. Exceptions from a cycle are
        logged via the monitor so one bad cycle does not kill the loop.
        """
        while self._running:
            try:
                await self._process_queue()
            except Exception as e:
                self.monitor.log_error(f"Worker error: {e}")

            await asyncio.sleep(interval)

    async def _process_queue(self):
        """
        Process pending messages in queue.

        A failure on one message is logged and that message is skipped; the
        rest of the batch is still processed.
        """
        # Get batch of messages
        messages = await message_queue.get_batch(size=100)

        if not messages:
            return

        # Process each message
        for msg_data in messages:
            try:
                # Clean and normalize message
                cleaned = await self._clean_message(msg_data)

                # Add to batch processor
                await self.batch_processor.add_message(cleaned)

                self.monitor.increment_processed()
            except Exception as e:
                self.monitor.log_error(f"Message processing error: {e}")

        # Check if batch is ready to compress
        await self._check_batch_ready()

    async def _clean_message(self, msg_data: dict) -> dict:
        """
        Clean and normalize message data.

        Returns a new dict containing every key of ``msg_data``, with
        ``content`` stripped of surrounding whitespace (a missing or ``None``
        content becomes ``""``) and a ``processed_at`` ISO-8601 timestamp in
        UTC, written without a UTC offset suffix.
        """
        # In production, use formatter.clean_message
        content = msg_data.get("content")
        if content is None:
            content = ""

        # datetime.utcnow() is deprecated since Python 3.12. Derive the same
        # naive-UTC value from an aware "now" so the emitted string keeps its
        # previous format (no "+00:00" suffix) for downstream consumers.
        processed_at = datetime.now(timezone.utc).replace(tzinfo=None)

        return {
            **msg_data,
            "content": content.strip(),
            "processed_at": processed_at.isoformat(),
        }

    async def _check_batch_ready(self):
        """Compress and save the current batch if the processor says it is ready."""
        if self.batch_processor.is_ready():
            await self.batch_processor.compress_and_save()


# Shared module-level worker. It is constructed at import time, which also
# builds its BatchProcessor and SystemMonitor immediately.
worker: Worker = Worker()