File size: 2,262 Bytes
2299bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""
System controller - main entry point for async batch processing.
Queues incoming messages, triggers batch runs on demand, reports system
status, and starts/stops the batch scheduler.
"""
from typing import Optional
from ..models import Message
from .queue import MessageQueue
from .batch import BatchProcessor
from .scheduler import BatchScheduler
from .monitor import SystemMonitor


class SystemController:
    """
    Main controller for async message processing.

    Owns one instance each of the message queue, batch processor, batch
    scheduler, and system monitor, and exposes thin coroutine wrappers
    around them.
    """

    def __init__(self) -> None:
        """Create the queue, batch processor, scheduler, and monitor."""
        self.queue = MessageQueue()
        self.batch_processor = BatchProcessor()
        self.scheduler = BatchScheduler()
        self.monitor = SystemMonitor()

    async def receive_message(self, message: Message) -> bool:
        """
        Receive a message and add it to the processing queue.

        Args:
            message: Message to process

        Returns:
            True if the message was queued, False if enqueueing raised.
            A failure while updating the monitor's queue-size counter is
            logged but does not change the result, because the message is
            already in the queue at that point.
        """
        try:
            await self.queue.enqueue(message)
        except Exception as e:
            self.monitor.log_error(f"Failed to queue message: {e}")
            return False

        # The message is queued from here on. Metrics bookkeeping is
        # best-effort: reporting failure now would be wrong and could make a
        # caller retry and enqueue the same message twice.
        try:
            self.monitor.increment_queue_size()
        except Exception as e:
            self.monitor.log_error(
                f"Message queued but queue size metric update failed: {e}"
            )
        return True

    async def trigger_batch(
        self,
        user_id: Optional[str] = None,
        group_id: Optional[str] = None
    ) -> dict:
        """
        Manually trigger batch processing.

        Delegates directly to the batch processor's ``process_batch``.

        Args:
            user_id: Optional specific user to process
            group_id: Optional specific group to process

        Returns:
            Whatever ``process_batch`` returns (annotated as a status dict
            with processing results)
        """
        return await self.batch_processor.process_batch(user_id, group_id)

    async def get_status(self) -> dict:
        """
        Get system status.

        Returns:
            Dict with keys ``queue_size`` (from the queue), ``monitor``
            (the monitor's stats) and ``scheduler_next`` (the scheduler's
            next run).
        """
        return {
            "queue_size": self.queue.size(),
            "monitor": self.monitor.get_stats(),
            "scheduler_next": self.scheduler.get_next_run()
        }

    async def start_scheduler(self) -> None:
        """Start the batch scheduler."""
        await self.scheduler.start()

    async def stop_scheduler(self) -> None:
        """Stop the batch scheduler."""
        await self.scheduler.stop()


# Global controller instance, constructed as a side effect of importing this
# module (which in turn constructs the queue, batch processor, scheduler and
# monitor). Presumably shared application-wide as a singleton -- TODO confirm
# against callers.
controller = SystemController()