import os import ctypes import subprocess from typing import Dict, Any, Optional, List from openspace.utils.logging import Logger from PIL import Image, ImageGrab try: from pywinauto import Desktop import win32ui import win32gui import win32con import pygetwindow as gw WINDOWS_LIBS_AVAILABLE = True except ImportError: WINDOWS_LIBS_AVAILABLE = False logger = Logger.get_logger(__name__) class WindowsAdapter: """Windows platform-specific functionality adapter""" def __init__(self): if not WINDOWS_LIBS_AVAILABLE: logger.warning("Windows libraries are not fully installed, some features may not be available") self.available = WINDOWS_LIBS_AVAILABLE def capture_screenshot_with_cursor(self, output_path: str) -> bool: """ Capture screenshot using ImageGrab (including cursor) Args: output_path: Output file path Returns: Whether successful """ try: # Use ImageGrab to capture screenshot img = ImageGrab.grab(bbox=None, include_layered_windows=True) # Try to add cursor try: if WINDOWS_LIBS_AVAILABLE: cursor, hotspot = self._get_cursor() if cursor: # Get scaling ratio ratio = ctypes.windll.shcore.GetScaleFactorForDevice(0) / 100 pos_win = win32gui.GetCursorPos() pos = ( round(pos_win[0] * ratio - hotspot[0]), round(pos_win[1] * ratio - hotspot[1]) ) img.paste(cursor, pos, cursor) logger.info("Windows screenshot successfully (with cursor)") else: logger.info("Windows screenshot successfully (without cursor)") except Exception as e: logger.warning(f"Cannot add cursor to screenshot: {e}") logger.info("Windows screenshot successfully (without cursor)") img.save(output_path) return True except Exception as e: logger.error(f"Windows screenshot failed: {e}") return False def _get_cursor(self) -> tuple: """ Get current cursor image and hotspot Returns: (cursor_image, (hotspot_x, hotspot_y)) """ try: hcursor = win32gui.GetCursorInfo()[1] hdc = win32ui.CreateDCFromHandle(win32gui.GetDC(0)) hbmp = win32ui.CreateBitmap() hbmp.CreateCompatibleBitmap(hdc, 36, 36) hdc_compatible = hdc.CreateCompatibleDC() hdc_compatible.SelectObject(hbmp) hdc_compatible.DrawIcon((0, 0), hcursor) bmpinfo = hbmp.GetInfo() bmpstr = hbmp.GetBitmapBits(True) cursor = Image.frombuffer( 'RGB', (bmpinfo['bmWidth'], bmpinfo['bmHeight']), bmpstr, 'raw', 'BGRX', 0, 1 ).convert("RGBA") win32gui.DestroyIcon(hcursor) win32gui.DeleteObject(hbmp.GetHandle()) hdc_compatible.DeleteDC() # Make black pixels transparent pixdata = cursor.load() width, height = cursor.size for y in range(height): for x in range(width): if pixdata[x, y] == (0, 0, 0, 255): pixdata[x, y] = (0, 0, 0, 0) hotspot = win32gui.GetIconInfo(hcursor)[1:3] return (cursor, hotspot) except Exception as e: logger.debug(f"Failed to get cursor image: {e}") return (None, (0, 0)) def activate_window(self, window_name: str, strict: bool = False) -> Dict[str, Any]: """ Activate window (Windows uses pygetwindow) Args: window_name: Window title strict: Whether to strictly match Returns: Result dictionary """ if not WINDOWS_LIBS_AVAILABLE: return {'status': 'error', 'message': 'Windows libraries not available'} try: windows = gw.getWindowsWithTitle(window_name) if not windows: logger.warning(f"Window not found: {window_name}") return {'status': 'error', 'message': f'Window {window_name} not found'} window = None if strict: # Strict match for wnd in windows: if wnd.title == window_name: window = wnd break if not window: return {'status': 'error', 'message': f'Window {window_name} not found (strict mode)'} else: window = windows[0] window.activate() logger.info(f"Windows window activated successfully: {window_name}") return {'status': 'success', 'message': 'Window activated'} except Exception as e: logger.error(f"Windows window activation failed: {e}") return {'status': 'error', 'message': str(e)} def close_window(self, window_name: str, strict: bool = False) -> Dict[str, Any]: """ Close window (Windows uses pygetwindow) Args: window_name: Window title strict: Whether to strictly match Returns: Result dictionary """ if not WINDOWS_LIBS_AVAILABLE: return {'status': 'error', 'message': 'Windows libraries not available'} try: windows = gw.getWindowsWithTitle(window_name) if not windows: logger.warning(f"Window not found: {window_name}") return {'status': 'error', 'message': f'Window {window_name} not found'} window = None if strict: for wnd in windows: if wnd.title == window_name: window = wnd break if not window: return {'status': 'error', 'message': f'Window {window_name} not found (strict mode)'} else: window = windows[0] window.close() logger.info(f"Windows window closed successfully: {window_name}") return {'status': 'success', 'message': 'Window closed'} except Exception as e: logger.error(f"Windows window close failed: {e}") return {'status': 'error', 'message': str(e)} def get_accessibility_tree(self, max_depth: int = 10, max_width: int = 50) -> Dict[str, Any]: """ Get Windows accessibility tree (using pywinauto) Args: max_depth: Maximum depth max_width: Maximum number of child elements per level Returns: Accessibility tree data """ if not WINDOWS_LIBS_AVAILABLE: return {'error': 'Windows accessibility libraries not available'} try: # Get desktop desktop = Desktop(backend="uia") # Serialize accessibility tree tree = self._serialize_uia_element( desktop, depth=0, max_depth=max_depth, max_width=max_width, visited=set() ) return { 'tree': tree, 'platform': 'Windows' } except Exception as e: logger.error(f"Windows get accessibility tree failed: {e}") return {'error': str(e)} def _serialize_uia_element( self, element, depth: int = 0, max_depth: int = 10, max_width: int = 50, visited: set = None ) -> Optional[Dict[str, Any]]: """ Serialize Windows UIA element to dictionary Args: element: UIA element depth: Current depth max_depth: Maximum depth max_width: Maximum width visited: Set of visited elements Returns: Serialized dictionary """ if visited is None: visited = set() if depth > max_depth or element in visited: return None visited.add(element) try: result = { 'depth': depth } # Get basic attributes try: result['class_name'] = element.class_name() except: result['class_name'] = 'unknown' try: result['name'] = element.window_text() except: result['name'] = '' # Get states states = {} state_methods = [ 'is_enabled', 'is_visible', 'is_minimized', 'is_maximized', 'is_focused', 'is_checked', 'is_selected' ] for method_name in state_methods: if hasattr(element, method_name): try: method = getattr(element, method_name) states[method_name] = method() except: pass if states: result['states'] = states # Get position and size try: rectangle = element.rectangle() result['position'] = { 'left': rectangle.left, 'top': rectangle.top } result['size'] = { 'width': rectangle.width(), 'height': rectangle.height() } except: pass # Recursively get child elements result['children'] = [] try: children = element.children() for i, child in enumerate(children[:max_width]): try: child_data = self._serialize_uia_element( child, depth + 1, max_depth, max_width, visited ) if child_data: result['children'].append(child_data) except Exception as e: logger.debug(f"Cannot serialize child element {i}: {e}") continue except Exception as e: logger.debug(f"Cannot get child elements: {e}") return result except Exception as e: logger.debug(f"Failed to serialize element (depth={depth}): {e}") return None def list_windows(self) -> List[Dict[str, Any]]: """ List all windows Returns: Window list """ if not WINDOWS_LIBS_AVAILABLE: return [] try: windows = gw.getAllWindows() return [ { 'title': win.title, 'left': win.left, 'top': win.top, 'width': win.width, 'height': win.height, 'visible': win.visible, 'active': win.isActive } for win in windows if win.title # Only return windows with titles ] except Exception as e: logger.error(f"List windows failed: {e}") return [] def set_wallpaper(self, image_path: str) -> Dict[str, Any]: """ Set desktop wallpaper Args: image_path: Image path Returns: Result dictionary """ try: image_path = os.path.expanduser(image_path) image_path = os.path.abspath(image_path) if not os.path.exists(image_path): return {'status': 'error', 'message': f'Image not found: {image_path}'} # Use Windows API to set wallpaper SPI_SETDESKWALLPAPER = 20 ctypes.windll.user32.SystemParametersInfoW( SPI_SETDESKWALLPAPER, 0, image_path, 3 # SPIF_UPDATEINIFILE | SPIF_SENDCHANGE ) logger.info(f"Windows wallpaper set successfully: {image_path}") return {'status': 'success', 'message': 'Wallpaper set successfully'} except Exception as e: logger.error(f"Windows set wallpaper failed: {e}") return {'status': 'error', 'message': str(e)} def get_system_info(self) -> Dict[str, Any]: """ Get Windows system information Returns: System information dictionary """ try: import platform as plat return { 'platform': 'Windows', 'version': plat.version(), 'release': plat.release(), 'edition': plat.win32_edition() if hasattr(plat, 'win32_edition') else 'Unknown', 'available': self.available } except Exception as e: logger.error(f"Failed to get system information: {e}") return { 'platform': 'Windows', 'error': str(e) } def start_recording(self, output_path: str) -> Dict[str, Any]: try: try: result = subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True, timeout=5, creationflags=subprocess.CREATE_NO_WINDOW) except (subprocess.CalledProcessError, FileNotFoundError): return { 'status': 'error', 'message': 'ffmpeg not installed. Download from: https://ffmpeg.org/download.html' } try: user32 = ctypes.windll.user32 width = user32.GetSystemMetrics(0) # SM_CXSCREEN height = user32.GetSystemMetrics(1) # SM_CYSCREEN except: width, height = 1920, 1080 command = [ 'ffmpeg', '-y', '-f', 'gdigrab', '-draw_mouse', '1', '-framerate', '30', '-video_size', f'{width}x{height}', '-i', 'desktop', '-c:v', 'libx264', '-preset', 'ultrafast', '-pix_fmt', 'yuv420p', '-r', '30', output_path ] process = subprocess.Popen( command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, creationflags=subprocess.CREATE_NO_WINDOW ) import time time.sleep(1) if process.poll() is not None: error_output = process.stderr.read() if process.stderr else "Unknown error" return { 'status': 'error', 'message': f'Failed to start recording: {error_output}' } logger.info(f"Windows recording started: {output_path}") return { 'status': 'success', 'message': 'Recording started', 'process': process } except Exception as e: logger.error(f"Windows start recording failed: {e}") return { 'status': 'error', 'message': str(e) } def stop_recording(self, process) -> Dict[str, Any]: try: if not process or process.poll() is not None: return { 'status': 'error', 'message': 'No recording in progress' } import signal try: process.send_signal(signal.CTRL_C_EVENT) except: process.terminate() try: process.wait(timeout=15) except subprocess.TimeoutExpired: logger.warning("ffmpeg did not respond, killing process") process.kill() process.wait() logger.info("Windows recording stopped successfully") return { 'status': 'success', 'message': 'Recording stopped' } except Exception as e: logger.error(f"Windows stop recording failed: {e}") return { 'status': 'error', 'message': str(e) } def get_running_applications(self) -> List[Dict[str, str]]: """ Get list of all running applications Returns: Application list """ if not WINDOWS_LIBS_AVAILABLE: return [] try: import psutil apps = [] seen_names = set() for proc in psutil.process_iter(['pid', 'name', 'exe']): try: pinfo = proc.info name = pinfo['name'] exe = pinfo['exe'] # Skip system processes if not exe or name in ['System', 'Registry', 'svchost.exe', 'csrss.exe']: continue # Skip duplicates if name in seen_names: continue seen_names.add(name) apps.append({ 'name': name, 'pid': pinfo['pid'], 'path': exe or '' }) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass return apps except ImportError: logger.warning("psutil not installed, cannot get running applications") return [] except Exception as e: logger.error(f"Failed to get running applications list: {e}") return [] def get_screen_size(self) -> Dict[str, int]: """ Get screen size Returns: Screen size dictionary """ try: user32 = ctypes.windll.user32 width = user32.GetSystemMetrics(0) # SM_CXSCREEN height = user32.GetSystemMetrics(1) # SM_CYSCREEN return {'width': width, 'height': height} except Exception as e: logger.error(f"Failed to get screen size: {e}") return {'width': 1920, 'height': 1080} # Default value def get_terminal_output(self) -> Optional[str]: """ Get terminal output (Windows Command Prompt, PowerShell, or Windows Terminal) Note: Due to Windows architecture, getting terminal output is complex. This method attempts to find active console windows. Returns: Terminal output content (limited functionality on Windows) """ try: # Windows doesn't provide easy access to terminal content like Linux/macOS # This is a limitation of the Windows platform # We can try to use PowerShell to get recent command history # Try to get PowerShell history try: history_path = os.path.expanduser( '~\\AppData\\Roaming\\Microsoft\\Windows\\PowerShell\\PSReadLine\\ConsoleHost_history.txt' ) if os.path.exists(history_path): with open(history_path, 'r', encoding='utf-8', errors='ignore') as f: # Get last 50 lines lines = f.readlines() recent_history = ''.join(lines[-50:]) if recent_history: return f"PowerShell History (last 50 commands):\n{recent_history}" except Exception as e: logger.debug(f"Cannot read PowerShell history: {e}") # Try to get Command Prompt history using doskey try: result = subprocess.run( ['doskey', '/history'], capture_output=True, text=True, timeout=2, creationflags=subprocess.CREATE_NO_WINDOW ) if result.returncode == 0 and result.stdout: return f"Command Prompt History:\n{result.stdout}" except Exception as e: logger.debug(f"Cannot get Command Prompt history: {e}") logger.warning("Windows terminal output is limited - only command history available") return None except Exception as e: logger.error(f"Failed to get terminal output: {e}") return None