# File size: 3,005 Bytes
# 2299bb4
"""
Worker - process messages from queue (clean, group, compress).
"""
import asyncio
from datetime import datetime, timezone
from typing import Optional, List

from .queue import message_queue
from .batch import BatchProcessor
from .monitor import SystemMonitor
class Worker:
    """
    Background worker that processes messages from queue.

    Handles cleaning, grouping, and preparing for compression.

    The worker runs a single polling loop as an asyncio task. Each pass
    pulls a batch from ``message_queue``, cleans every message, hands it to
    the ``BatchProcessor`` and then asks the processor whether the batch
    should be compressed. Errors are reported through ``SystemMonitor``
    and never terminate the loop.
    """

    def __init__(self):
        self.batch_processor = BatchProcessor()
        self.monitor = SystemMonitor()
        # Loop flag read by _run_loop; cleared by stop().
        self._running = False
        # Handle of the polling task, or None when no loop is active.
        self._task: Optional[asyncio.Task] = None

    async def start(self, interval_seconds: int = 5):
        """
        Start the worker process.

        Calling ``start`` while a loop is already active is a no-op, so the
        worker never runs two polling loops at once.

        Args:
            interval_seconds: How often to check queue
        """
        # A second create_task would overwrite self._task and leave the
        # first loop running where stop() can no longer reach it.
        if self._task is not None and not self._task.done():
            return

        self._running = True
        self._task = asyncio.create_task(self._run_loop(interval_seconds))

    async def stop(self):
        """Stop the worker and wait for the polling task to finish."""
        self._running = False

        # Detach the handle first so the worker can be started again even
        # if awaiting the task below is interrupted.
        task, self._task = self._task, None
        if task is None:
            return

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() issued just above.
            pass

    async def _run_loop(self, interval: int):
        """
        Main worker loop.

        Args:
            interval: Seconds to sleep between two queue checks
        """
        while self._running:
            try:
                await self._process_queue()
            except Exception as e:
                # Keep the loop alive; the failure is only reported.
                self.monitor.log_error(f"Worker error: {e}")
            await asyncio.sleep(interval)

    async def _process_queue(self):
        """Process pending messages in queue."""
        # Get batch of messages
        messages = await message_queue.get_batch(size=100)
        if not messages:
            return

        # Process each message; one bad message must not block the rest.
        for msg_data in messages:
            try:
                # Clean and normalize message
                cleaned = await self._clean_message(msg_data)
                # Add to batch processor
                await self.batch_processor.add_message(cleaned)
                self.monitor.increment_processed()
            except Exception as e:
                self.monitor.log_error(f"Message processing error: {e}")

        # Check if batch is ready to compress
        await self._check_batch_ready()

    async def _clean_message(self, msg_data: dict) -> dict:
        """
        Clean and normalize message data.

        Args:
            msg_data: Raw message mapping; ``"content"`` may be missing or
                ``None``, both of which are treated as an empty string.

        Returns:
            A new dict with every key of ``msg_data``, ``"content"``
            stripped of surrounding whitespace, and ``"processed_at"`` set
            to the current UTC time as a naive ISO-8601 string.
        """
        # In production, use formatter.clean_message
        content = msg_data.get("content")
        if content is None:
            # An explicit None is handled like a missing key instead of
            # failing on None.strip().
            content = ""

        # Basic cleaning
        cleaned_content = content.strip()

        # datetime.utcnow() is deprecated; dropping tzinfo from an aware
        # UTC timestamp yields the same naive string format as before.
        processed_at = datetime.now(timezone.utc).replace(tzinfo=None)

        return {
            **msg_data,
            "content": cleaned_content,
            "processed_at": processed_at.isoformat(),
        }

    async def _check_batch_ready(self):
        """Check if batch is ready for compression."""
        if self.batch_processor.is_ready():
            await self.batch_processor.compress_and_save()
# Global worker instance, created once when this module is imported.
worker = Worker()