File size: 3,222 Bytes
f26de06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Screen Service - Simple screenshot capture.
Just captures screen and returns base64 image.
"""

import base64
import io
import logging
import time
from typing import Optional, Dict, Any
from dataclasses import dataclass

try:
    import mss
    MSS_AVAILABLE = True
except ImportError:
    MSS_AVAILABLE = False

from PIL import Image

logger = logging.getLogger(__name__)


@dataclass
class ScreenCapture:
    """Represents a captured screen frame."""
    timestamp: float
    image_b64: str
    width: int
    height: int


class ScreenService:
    """Simple screen capture service."""
    
    def __init__(
        self,
        monitor: int = 1,
        max_width: int = 1920,
        max_height: int = 1080,
        compression_quality: int = 85,
    ):
        self.monitor = monitor
        self.max_width = max_width
        self.max_height = max_height
        self.compression_quality = compression_quality
        
        if not MSS_AVAILABLE:
            logger.warning("mss not available. Screen capture disabled.")

    def is_available(self) -> bool:
        """Check if screen capture is available."""
        return MSS_AVAILABLE

    def _process_image(self, img: Image.Image) -> Image.Image:
        """Process and resize image."""
        if img.mode != "RGB":
            img = img.convert("RGB")
        w, h = img.size
        ar = w / h
        if w > self.max_width or h > self.max_height:
            if ar > 1:
                new_w = min(w, self.max_width)
                new_h = int(new_w / ar)
            else:
                new_h = min(h, self.max_height)
                new_w = int(new_h * ar)
            img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
        return img

    def _image_to_base64(self, img: Image.Image) -> str:
        """Convert image to base64 string."""
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=self.compression_quality, optimize=True)
        return base64.b64encode(buf.getvalue()).decode("utf-8")

    def capture(self) -> Optional[ScreenCapture]:
        """
        Capture a screenshot and return base64.
        
        Returns:
            ScreenCapture object or None if failed
        """
        if not MSS_AVAILABLE:
            logger.error("Screen capture not available")
            return None
        
        try:
            with mss.mss() as sct:
                mon = sct.monitors[self.monitor]
                frame = sct.grab(mon)
                pil = Image.frombytes("RGB", frame.size, frame.bgra, "raw", "BGRX")
                pil = self._process_image(pil)
                b64 = self._image_to_base64(pil)
                
                return ScreenCapture(
                    timestamp=time.time(),
                    image_b64=b64,
                    width=pil.width,
                    height=pil.height
                )
        except Exception as e:
            logger.error(f"Screen capture error: {e}")
            return None


# Singleton
_instance: Optional[ScreenService] = None

def get_screen_service() -> ScreenService:
    """Get singleton screen service."""
    global _instance
    if _instance is None:
        _instance = ScreenService()
    return _instance