Khanna, Videh Rakesh Rakesh
feat: add device control, app automation, tests, and other updates
8576f52 | """ | |
| JARVIS Device Control — Universal smart device management. | |
| Controls appliances and electronics via every available protocol: | |
| • Bluetooth — pair, connect, send data to BT/BLE devices | |
| • WiFi/LAN — HTTP REST, Wake-on-LAN, mDNS/Bonjour discovery | |
| • HomeKit — Apple Home via Shortcuts bridge | |
| • MQTT — IoT devices (smart plugs, sensors, ESP32, etc.) | |
| • IR — infrared blaster control (Broadlink, etc.) | |
| • Network — scan, discover, ping devices | |
| All discovered devices are persisted in the device registry. | |
| """ | |
| import subprocess | |
| import os | |
| import json | |
| import socket | |
| import struct | |
| import threading | |
| import time | |
| from tools import tool | |
| # ═══════════════════════════════════════════════════════════════ | |
| # BLUETOOTH CONTROL | |
| # ═══════════════════════════════════════════════════════════════ | |
| def bluetooth_scan(duration: int = 5) -> str: | |
| duration = max(1, min(15, duration)) | |
| try: | |
| # Use blueutil for classic BT | |
| result = subprocess.run( | |
| ["blueutil", "--inquiry", str(duration)], | |
| capture_output=True, text=True, timeout=duration + 10, | |
| ) | |
| devices = [] | |
| if result.stdout.strip(): | |
| for line in result.stdout.strip().split("\n"): | |
| if line.strip(): | |
| devices.append(line.strip()) | |
| # Also list paired devices | |
| paired = subprocess.run( | |
| ["blueutil", "--paired"], | |
| capture_output=True, text=True, timeout=5, | |
| ) | |
| paired_list = [] | |
| if paired.stdout.strip(): | |
| for line in paired.stdout.strip().split("\n"): | |
| if line.strip(): | |
| paired_list.append(line.strip()) | |
| output = [] | |
| if devices: | |
| output.append(f"Discovered ({len(devices)}):") | |
| for d in devices: | |
| output.append(f" • {d}") | |
| if paired_list: | |
| output.append(f"\nPaired ({len(paired_list)}):") | |
| for d in paired_list: | |
| output.append(f" • {d}") | |
| if not output: | |
| return "No Bluetooth devices found. Ensure Bluetooth is on." | |
| return "\n".join(output) | |
| except FileNotFoundError: | |
| return "Install blueutil: brew install blueutil" | |
| except Exception as e: | |
| return f"Bluetooth scan error: {e}" | |
| def bluetooth_pair(mac: str) -> str: | |
| try: | |
| result = subprocess.run( | |
| ["blueutil", "--pair", mac, "--wait-connect", "10"], | |
| capture_output=True, text=True, timeout=15, | |
| ) | |
| if result.returncode == 0: | |
| # Register in device registry | |
| from device_registry import DeviceRegistry | |
| reg = DeviceRegistry() | |
| reg.add_device( | |
| name=f"BT-{mac[-8:].replace(':', '')}", | |
| device_type="bluetooth", | |
| protocol="bluetooth", | |
| mac_address=mac, | |
| ) | |
| return f"Paired with {mac}" | |
| return f"Pairing failed: {result.stderr.strip()}" | |
| except FileNotFoundError: | |
| return "Install blueutil: brew install blueutil" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def bluetooth_connect(mac: str) -> str: | |
| try: | |
| result = subprocess.run( | |
| ["blueutil", "--connect", mac, "--wait-connect", "10"], | |
| capture_output=True, text=True, timeout=15, | |
| ) | |
| return f"Connected to {mac}" if result.returncode == 0 else f"Connect failed: {result.stderr.strip()}" | |
| except FileNotFoundError: | |
| return "Install blueutil: brew install blueutil" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def bluetooth_disconnect(mac: str) -> str: | |
| try: | |
| result = subprocess.run( | |
| ["blueutil", "--disconnect", mac], | |
| capture_output=True, text=True, timeout=10, | |
| ) | |
| return f"Disconnected {mac}" if result.returncode == 0 else f"Failed: {result.stderr.strip()}" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def bluetooth_info(mac: str) -> str: | |
| try: | |
| result = subprocess.run( | |
| ["blueutil", "--info", mac], | |
| capture_output=True, text=True, timeout=5, | |
| ) | |
| return result.stdout.strip() or "No info available" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| # ═══════════════════════════════════════════════════════════════ | |
| # NETWORK / WiFi DEVICE DISCOVERY | |
| # ═══════════════════════════════════════════════════════════════ | |
| def network_scan(subnet: str = "") -> str: | |
| try: | |
| if not subnet: | |
| # Auto-detect subnet from default route | |
| route = subprocess.run( | |
| ["route", "-n", "get", "default"], | |
| capture_output=True, text=True, timeout=5, | |
| ) | |
| gateway = "" | |
| for line in route.stdout.split("\n"): | |
| if "gateway" in line: | |
| gateway = line.split(":")[-1].strip() | |
| break | |
| if gateway: | |
| parts = gateway.rsplit(".", 1) | |
| subnet = f"{parts[0]}.0/24" | |
| else: | |
| subnet = "192.168.1.0/24" | |
| # ARP scan | |
| arp = subprocess.run( | |
| ["arp", "-a"], | |
| capture_output=True, text=True, timeout=10, | |
| ) | |
| devices = [] | |
| for line in arp.stdout.split("\n"): | |
| if line.strip() and "(" in line: | |
| parts = line.strip() | |
| # Extract name, IP, MAC | |
| try: | |
| name_part = parts.split("(")[0].strip() | |
| ip = parts.split("(")[1].split(")")[0] | |
| mac_match = parts.split("at ")[-1].split(" ")[0] if "at " in parts else "" | |
| if ip and mac_match and mac_match != "(incomplete)": | |
| name = name_part if name_part and name_part != "?" else "" | |
| devices.append({"name": name, "ip": ip, "mac": mac_match}) | |
| except (IndexError, ValueError): | |
| continue | |
| if not devices: | |
| return "No devices found on network. Try: network_scan with a specific subnet." | |
| lines = [f"Network devices ({len(devices)}):"] | |
| for d in devices: | |
| name = f" ({d['name']})" if d['name'] else "" | |
| lines.append(f" • {d['ip']}{name} — MAC: {d['mac']}") | |
| return "\n".join(lines) | |
| except Exception as e: | |
| return f"Network scan error: {e}" | |
| def bonjour_discover(service_type: str = "all", duration: int = 3) -> str: | |
| duration = max(1, min(10, duration)) | |
| service_map = { | |
| "http": "_http._tcp.", | |
| "airplay": "_airplay._tcp.", | |
| "printer": "_ipp._tcp.", | |
| "homekit": "_hap._tcp.", | |
| "googlecast": "_googlecast._tcp.", | |
| "hue": "_hue._tcp.", | |
| "mqtt": "_mqtt._tcp.", | |
| "spotify": "_spotify-connect._tcp.", | |
| "ssh": "_ssh._tcp.", | |
| "smb": "_smb._tcp.", | |
| "raop": "_raop._tcp.", | |
| } | |
| if service_type == "all": | |
| services_to_scan = list(service_map.values()) | |
| elif service_type in service_map: | |
| services_to_scan = [service_map[service_type]] | |
| else: | |
| services_to_scan = [f"_{service_type}._tcp."] | |
| all_results = [] | |
| for svc in services_to_scan: | |
| try: | |
| result = subprocess.run( | |
| ["dns-sd", "-B", svc, "local."], | |
| capture_output=True, text=True, timeout=duration + 2, | |
| ) | |
| for line in result.stdout.split("\n"): | |
| if line.strip() and not line.startswith("Browsing") and not line.startswith("DATE") and not line.startswith("Timestamp"): | |
| parts = line.strip().split() | |
| if len(parts) >= 7: | |
| name = " ".join(parts[6:]) | |
| svc_type = parts[5] if len(parts) > 5 else svc | |
| all_results.append({"name": name, "type": svc_type}) | |
| except subprocess.TimeoutExpired: | |
| continue | |
| except Exception: | |
| continue | |
| if not all_results: | |
| return "No Bonjour/mDNS services found." | |
| lines = [f"Discovered services ({len(all_results)}):"] | |
| for r in all_results: | |
| lines.append(f" • {r['name']} ({r['type']})") | |
| return "\n".join(lines) | |
| def wake_on_lan(mac: str, broadcast: str = "255.255.255.255") -> str: | |
| try: | |
| mac_clean = mac.replace(":", "").replace("-", "").replace(".", "") | |
| if len(mac_clean) != 12: | |
| return f"Invalid MAC address: {mac}" | |
| mac_bytes = bytes.fromhex(mac_clean) | |
| magic_packet = b'\xff' * 6 + mac_bytes * 16 | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
| sock.sendto(magic_packet, (broadcast, 9)) | |
| sock.close() | |
| return f"Wake-on-LAN packet sent to {mac}" | |
| except Exception as e: | |
| return f"WoL error: {e}" | |
| def ping_device(host: str, count: int = 3) -> str: | |
| count = max(1, min(10, count)) | |
| try: | |
| result = subprocess.run( | |
| ["ping", "-c", str(count), "-t", "5", host], | |
| capture_output=True, text=True, timeout=count * 6, | |
| ) | |
| if result.returncode == 0: | |
| # Extract stats | |
| lines = result.stdout.strip().split("\n") | |
| stats = [l for l in lines if "packets" in l or "avg" in l.lower() or "round-trip" in l] | |
| return f"{host} is ONLINE\n" + "\n".join(stats) | |
| return f"{host} is OFFLINE/unreachable" | |
| except Exception as e: | |
| return f"Ping error: {e}" | |
| # ═══════════════════════════════════════════════════════════════ | |
| # HTTP SMART DEVICE CONTROL (Hue, Tapo, Tuya, generic REST) | |
| # ═══════════════════════════════════════════════════════════════ | |
| def http_device_request(url: str, method: str = "GET", body: str = "", headers: str = "") -> str: | |
| import httpx | |
| # Validate URL is local network only (security) | |
| from urllib.parse import urlparse | |
| parsed = urlparse(url) | |
| hostname = parsed.hostname or "" | |
| local_prefixes = ("192.168.", "10.", "172.16.", "172.17.", "172.18.", "172.19.", | |
| "172.20.", "172.21.", "172.22.", "172.23.", "172.24.", "172.25.", | |
| "172.26.", "172.27.", "172.28.", "172.29.", "172.30.", "172.31.", | |
| "127.", "localhost", "0.0.0.0") | |
| is_local = any(hostname.startswith(p) for p in local_prefixes) | |
| is_mdns = hostname.endswith(".local") | |
| if not is_local and not is_mdns: | |
| return f"BLOCKED: http_device_request only works with local network devices ({hostname} is not local). Use web_search or run_command for external requests." | |
| try: | |
| extra_headers = json.loads(headers) if headers else {} | |
| body_data = json.loads(body) if body else None | |
| with httpx.Client(timeout=10, verify=False) as client: | |
| if method.upper() == "GET": | |
| resp = client.get(url, headers=extra_headers) | |
| elif method.upper() == "POST": | |
| resp = client.post(url, json=body_data, headers=extra_headers) | |
| elif method.upper() == "PUT": | |
| resp = client.put(url, json=body_data, headers=extra_headers) | |
| elif method.upper() == "DELETE": | |
| resp = client.delete(url, headers=extra_headers) | |
| else: | |
| return f"Unknown method: {method}" | |
| result = f"HTTP {resp.status_code}" | |
| try: | |
| result += f"\n{json.dumps(resp.json(), indent=2)[:2000]}" | |
| except Exception: | |
| result += f"\n{resp.text[:2000]}" | |
| return result | |
| except Exception as e: | |
| return f"HTTP request error: {e}" | |
| def hue_control(bridge_ip: str, action: str, api_key: str = "", | |
| light_id: str = "1", value: str = "") -> str: | |
| import httpx | |
| base = f"http://{bridge_ip}/api" | |
| try: | |
| if action == "discover": | |
| # Press the bridge button first, then call this | |
| resp = httpx.post(f"{base}", json={"devicetype": "jarvis#macbook"}, timeout=5) | |
| data = resp.json() | |
| if isinstance(data, list) and data: | |
| if "success" in data[0]: | |
| key = data[0]["success"]["username"] | |
| return f"Hue API key: {key}\nSave this! Use it for all future Hue commands." | |
| elif "error" in data[0]: | |
| return f"Error: {data[0]['error']['description']} (press the bridge button first!)" | |
| return f"Unexpected response: {data}" | |
| if not api_key: | |
| return "API key required. Use action='discover' first (press bridge button)." | |
| if action == "list": | |
| resp = httpx.get(f"{base}/{api_key}/lights", timeout=5) | |
| lights = resp.json() | |
| lines = [f"Hue Lights ({len(lights)}):"] | |
| for lid, info in lights.items(): | |
| state = "ON" if info["state"]["on"] else "OFF" | |
| bri = info["state"].get("bri", "?") | |
| lines.append(f" {lid}: {info['name']} — {state} (brightness: {bri})") | |
| return "\n".join(lines) | |
| elif action in ("on", "off"): | |
| state = {"on": action == "on"} | |
| if light_id == "all": | |
| resp = httpx.put(f"{base}/{api_key}/groups/0/action", json=state, timeout=5) | |
| else: | |
| resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5) | |
| return f"Light {light_id}: {action}" | |
| elif action == "brightness": | |
| bri = max(0, min(254, int(value))) | |
| state = {"on": True, "bri": bri} | |
| if light_id == "all": | |
| resp = httpx.put(f"{base}/{api_key}/groups/0/action", json=state, timeout=5) | |
| else: | |
| resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5) | |
| return f"Light {light_id} brightness: {bri}/254" | |
| elif action == "color": | |
| # Convert hex to Hue xy color space (simplified) | |
| r = int(value[0:2], 16) / 255.0 | |
| g = int(value[2:4], 16) / 255.0 | |
| b = int(value[4:6], 16) / 255.0 | |
| # sRGB -> CIE XY (simplified conversion) | |
| X = r * 0.664511 + g * 0.154324 + b * 0.162028 | |
| Y = r * 0.283881 + g * 0.668433 + b * 0.047685 | |
| Z = r * 0.000088 + g * 0.072310 + b * 0.986039 | |
| total = X + Y + Z | |
| if total == 0: | |
| x_val, y_val = 0.0, 0.0 | |
| else: | |
| x_val = X / total | |
| y_val = Y / total | |
| state = {"on": True, "xy": [x_val, y_val]} | |
| resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5) | |
| return f"Light {light_id} color set to #{value}" | |
| return f"Unknown action: {action}" | |
| except Exception as e: | |
| return f"Hue error: {e}" | |
| # ═══════════════════════════════════════════════════════════════ | |
| # HOMEKIT / APPLE HOME (via Shortcuts bridge) | |
| # ═══════════════════════════════════════════════════════════════ | |
| def homekit_control(shortcut_name: str, input: str = "") -> str: | |
| try: | |
| cmd = ["shortcuts", "run", shortcut_name] | |
| if input: | |
| cmd.extend(["--input-text", input]) | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=15) | |
| if result.returncode == 0: | |
| output = result.stdout.strip() | |
| return f"Shortcut '{shortcut_name}' executed." + (f"\nOutput: {output}" if output else "") | |
| return f"Shortcut error: {result.stderr.strip()}" | |
| except subprocess.TimeoutExpired: | |
| return f"Shortcut '{shortcut_name}' timed out (may still be running)" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def homekit_list_shortcuts() -> str: | |
| try: | |
| result = subprocess.run( | |
| ["shortcuts", "list"], | |
| capture_output=True, text=True, timeout=10, | |
| ) | |
| shortcuts = result.stdout.strip().split("\n") | |
| shortcuts = [s.strip() for s in shortcuts if s.strip()] | |
| if not shortcuts: | |
| return "No shortcuts found. Create them in Shortcuts.app." | |
| # Group by likely category | |
| homekit = [s for s in shortcuts if any(k in s.lower() for k in | |
| ["light", "lock", "thermostat", "scene", "home", "room", | |
| "plug", "switch", "fan", "door", "garage", "blind", | |
| "curtain", "ac", "heater", "alarm"])] | |
| other = [s for s in shortcuts if s not in homekit] | |
| lines = [f"Apple Shortcuts ({len(shortcuts)} total):"] | |
| if homekit: | |
| lines.append(f"\n Home/IoT ({len(homekit)}):") | |
| for s in homekit: | |
| lines.append(f" • {s}") | |
| if other: | |
| lines.append(f"\n Other ({len(other)}):") | |
| for s in other[:20]: | |
| lines.append(f" • {s}") | |
| if len(other) > 20: | |
| lines.append(f" ... and {len(other) - 20} more") | |
| return "\n".join(lines) | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def siri_command(command: str) -> str: | |
| """Route a command through Siri via AppleScript.""" | |
| safe_cmd = command.replace('"', '\\"') | |
| try: | |
| # Use AppleScript to invoke Siri | |
| result = subprocess.run([ | |
| "osascript", "-e", | |
| f''' | |
| tell application "System Events" | |
| -- Trigger Siri | |
| key code 49 using {{command down}} | |
| delay 1.5 | |
| keystroke "{safe_cmd}" | |
| delay 0.3 | |
| keystroke return | |
| end tell | |
| ''' | |
| ], capture_output=True, text=True, timeout=10) | |
| return f"Siri command sent: {command}" | |
| except Exception as e: | |
| return f"Siri error: {e}" | |
| # ═══════════════════════════════════════════════════════════════ | |
| # MQTT IoT CONTROL | |
| # ═══════════════════════════════════════════════════════════════ | |
| def mqtt_publish(broker: str, topic: str, message: str, port: int = 1883) -> str: | |
| # Try mosquitto_pub first (CLI) | |
| try: | |
| result = subprocess.run( | |
| ["mosquitto_pub", "-h", broker, "-p", str(port), "-t", topic, "-m", message], | |
| capture_output=True, text=True, timeout=10, | |
| ) | |
| if result.returncode == 0: | |
| return f"MQTT published: {topic} ← {message}" | |
| return f"MQTT error: {result.stderr.strip()}" | |
| except FileNotFoundError: | |
| pass | |
| # Fallback: try paho-mqtt Python library | |
| try: | |
| import paho.mqtt.publish as publish | |
| publish.single(topic, payload=message, hostname=broker, port=port) | |
| return f"MQTT published: {topic} ← {message}" | |
| except ImportError: | |
| return "MQTT not available. Install: brew install mosquitto OR pip install paho-mqtt" | |
| except Exception as e: | |
| return f"MQTT error: {e}" | |
| def mqtt_subscribe(broker: str, topic: str, port: int = 1883, timeout: int = 5) -> str: | |
| timeout = max(1, min(30, timeout)) | |
| try: | |
| result = subprocess.run( | |
| ["mosquitto_sub", "-h", broker, "-p", str(port), "-t", topic, | |
| "-C", "1", "-W", str(timeout)], | |
| capture_output=True, text=True, timeout=timeout + 5, | |
| ) | |
| if result.stdout.strip(): | |
| return f"MQTT [{topic}]: {result.stdout.strip()}" | |
| return f"No message received on {topic} within {timeout}s" | |
| except FileNotFoundError: | |
| return "Install mosquitto: brew install mosquitto" | |
| except subprocess.TimeoutExpired: | |
| return f"No message on {topic} within {timeout}s" | |
| except Exception as e: | |
| return f"MQTT error: {e}" | |
| # ═══════════════════════════════════════════════════════════════ | |
| # DEVICE REGISTRY TOOLS (persistent storage) | |
| # ═══════════════════════════════════════════════════════════════ | |
| def register_device(name: str, device_type: str, protocol: str, | |
| address: str = "", mac_address: str = "", | |
| port: int = 0, room: str = "", config: str = "") -> str: | |
| from device_registry import DeviceRegistry | |
| reg = DeviceRegistry() | |
| cfg = json.loads(config) if config else {} | |
| device_id = reg.add_device(name, device_type, protocol, address, | |
| port, mac_address, cfg, room) | |
| return f"Device registered: #{device_id} '{name}' ({device_type}/{protocol}) in {room or 'unassigned'}" | |
| def list_devices(device_type: str = "", room: str = "", protocol: str = "") -> str: | |
| from device_registry import DeviceRegistry | |
| reg = DeviceRegistry() | |
| devices = reg.get_devices( | |
| device_type=device_type or None, | |
| room=room or None, | |
| protocol=protocol or None, | |
| ) | |
| if not devices: | |
| return "No registered devices." + (" Try different filters." if any([device_type, room, protocol]) else " Use register_device to add one.") | |
| lines = [f"Registered devices ({len(devices)}):"] | |
| current_room = None | |
| for d in devices: | |
| r = d["room"] or "Unassigned" | |
| if r != current_room: | |
| current_room = r | |
| lines.append(f"\n 📍 {current_room}:") | |
| status = d.get("status", "unknown") | |
| status_icon = {"online": "🟢", "offline": "🔴", "registered": "⚪", "unknown": "⚪"}.get(status, "⚪") | |
| addr = d.get("address") or d.get("mac_address") or "" | |
| addr_str = f" @ {addr}" if addr else "" | |
| lines.append(f" {status_icon} #{d['id']}: {d['name']} ({d['device_type']}/{d['protocol']}){addr_str}") | |
| return "\n".join(lines) | |
| def find_device(query: str) -> str: | |
| from device_registry import DeviceRegistry | |
| reg = DeviceRegistry() | |
| devices = reg.search_devices(query) | |
| if not devices: | |
| return f"No devices found matching '{query}'" | |
| lines = [f"Found {len(devices)} device(s):"] | |
| for d in devices: | |
| room = f" in {d['room']}" if d.get("room") else "" | |
| addr = d.get("address") or d.get("mac_address") or "" | |
| lines.append(f" #{d['id']}: {d['name']} ({d['device_type']}/{d['protocol']}){room}") | |
| if addr: | |
| lines.append(f" Address: {addr}") | |
| if d.get("config"): | |
| lines.append(f" Config: {json.dumps(d['config'])[:100]}") | |
| return "\n".join(lines) | |
| def remove_device(device_id: int) -> str: | |
| from device_registry import DeviceRegistry | |
| reg = DeviceRegistry() | |
| device = reg.get_device(device_id) | |
| if not device: | |
| return f"Device #{device_id} not found" | |
| reg.remove_device(device_id) | |
| return f"Removed device #{device_id}: {device['name']}" | |
| def control_device(device_name: str, action: str, value: str = "") -> str: | |
| from device_registry import DeviceRegistry | |
| reg = DeviceRegistry() | |
| # Find device by name or ID | |
| device = None | |
| try: | |
| device_id = int(device_name) | |
| device = reg.get_device(device_id) | |
| except ValueError: | |
| device = reg.get_device_by_name(device_name) | |
| if not device: | |
| matches = reg.search_devices(device_name) | |
| if len(matches) == 1: | |
| device = matches[0] | |
| elif len(matches) > 1: | |
| names = ", ".join(f"#{m['id']}: {m['name']}" for m in matches) | |
| return f"Multiple devices match '{device_name}': {names}. Be more specific or use device ID." | |
| if not device: | |
| return f"Device '{device_name}' not found. Use list_devices to see registered devices." | |
| protocol = device["protocol"] | |
| address = device.get("address", "") | |
| mac = device.get("mac_address", "") | |
| config = device.get("config", {}) | |
| # ── Bluetooth devices ── | |
| if protocol == "bluetooth": | |
| if not mac: | |
| return f"No MAC address for {device['name']}. Update device config." | |
| if action in ("connect", "on"): | |
| return bluetooth_connect(mac) | |
| elif action in ("disconnect", "off"): | |
| return bluetooth_disconnect(mac) | |
| elif action == "status": | |
| return bluetooth_info(mac) | |
| else: | |
| return f"Bluetooth action '{action}' not supported. Use: connect, disconnect, status" | |
| # ── WiFi/HTTP devices ── | |
| elif protocol in ("wifi", "http"): | |
| if not address: | |
| return f"No IP address for {device['name']}. Update device config." | |
| port = device.get("port", 80) | |
| base_url = f"http://{address}:{port}" if port != 80 else f"http://{address}" | |
| # Check for custom API endpoints in config | |
| endpoints = config.get("endpoints", {}) | |
| if action in endpoints: | |
| url = f"{base_url}{endpoints[action]}" | |
| return http_device_request(url, method=endpoints.get(f"{action}_method", "POST")) | |
| # Generic REST control | |
| if action in ("on", "off", "toggle"): | |
| # Try common smart plug/switch APIs | |
| for path in ["/relay/0", "/switch", "/api/relay", f"/cm?cmnd=Power%20{action.upper()}"]: | |
| try: | |
| result = http_device_request(f"{base_url}{path}", method="POST", | |
| body=json.dumps({"state": action})) | |
| if "200" in result or "ok" in result.lower(): | |
| reg.update_device_status(device["id"], "online") | |
| return f"{device['name']}: {action} — {result[:100]}" | |
| except Exception: | |
| continue | |
| return f"Could not find working API for {device['name']}. Register endpoints in config." | |
| elif action == "status": | |
| return http_device_request(f"{base_url}/status", method="GET") | |
| else: | |
| # Pass through as API call | |
| return http_device_request(f"{base_url}/{action}", method="POST", | |
| body=json.dumps({"value": value}) if value else "") | |
| # ── MQTT devices ── | |
| elif protocol == "mqtt": | |
| broker = config.get("broker", address or "localhost") | |
| mqtt_port = config.get("mqtt_port", 1883) | |
| topic_prefix = config.get("topic_prefix", f"cmnd/{device['name'].lower().replace(' ', '_')}") | |
| if action in ("on", "off"): | |
| topic = config.get("power_topic", f"{topic_prefix}/POWER") | |
| return mqtt_publish(broker, topic, action.upper(), mqtt_port) | |
| elif action == "toggle": | |
| topic = config.get("power_topic", f"{topic_prefix}/POWER") | |
| return mqtt_publish(broker, topic, "TOGGLE", mqtt_port) | |
| elif action == "status": | |
| topic = config.get("status_topic", f"stat/{device['name'].lower().replace(' ', '_')}/POWER") | |
| return mqtt_subscribe(broker, topic, mqtt_port, timeout=3) | |
| elif action == "brightness" and value: | |
| topic = config.get("dimmer_topic", f"{topic_prefix}/Dimmer") | |
| return mqtt_publish(broker, topic, value, mqtt_port) | |
| elif action == "color" and value: | |
| topic = config.get("color_topic", f"{topic_prefix}/Color") | |
| return mqtt_publish(broker, topic, value, mqtt_port) | |
| else: | |
| # Custom command | |
| topic = f"{topic_prefix}/{action}" | |
| return mqtt_publish(broker, topic, value or action.upper(), mqtt_port) | |
| # ── HomeKit devices ── | |
| elif protocol == "homekit": | |
| shortcut = config.get("shortcut", f"{device['name']} {action.title()}") | |
| return homekit_control(shortcut, input=value) | |
| # ── IR devices ── | |
| elif protocol == "ir": | |
| # Broadlink or other IR blaster | |
| ir_device = config.get("ir_device", address) | |
| code = config.get("codes", {}).get(action, "") | |
| if not code: | |
| return f"No IR code for '{action}' on {device['name']}. Add codes to device config." | |
| if ir_device: | |
| # Try broadlink_cli | |
| try: | |
| result = subprocess.run( | |
| ["broadlink_cli", "--device", ir_device, "--send", code], | |
| capture_output=True, text=True, timeout=10, | |
| ) | |
| return f"IR sent: {action} to {device['name']}" | |
| except FileNotFoundError: | |
| return "Install broadlink_cli: pip install broadlink" | |
| return f"No IR blaster configured for {device['name']}" | |
| else: | |
| return f"Protocol '{protocol}' not yet supported for direct control. Use http_device_request or mqtt_publish directly." | |
| # ═══════════════════════════════════════════════════════════════ | |
| # SMART HOME SCENES & ROUTINES | |
| # ═══════════════════════════════════════════════════════════════ | |
| def smart_home_scene(scene: str) -> str: | |
| built_in = { | |
| "movie": [ | |
| ("set_volume", {"level": 40}), | |
| ("toggle_dark_mode", {"enable": True}), | |
| ("set_brightness", {"level": 0.2}), | |
| ("do_not_disturb", {"enable": True}), | |
| ], | |
| "sleep": [ | |
| ("set_volume", {"level": 0}), | |
| ("do_not_disturb", {"enable": True}), | |
| ("set_brightness", {"level": 0.0}), | |
| ], | |
| "morning": [ | |
| ("set_brightness", {"level": 0.8}), | |
| ("toggle_dark_mode", {"enable": False}), | |
| ("do_not_disturb", {"enable": False}), | |
| ("set_volume", {"level": 50}), | |
| ], | |
| "work": [ | |
| ("do_not_disturb", {"enable": True}), | |
| ("set_brightness", {"level": 0.7}), | |
| ("toggle_dark_mode", {"enable": True}), | |
| ], | |
| "party": [ | |
| ("set_volume", {"level": 80}), | |
| ("set_brightness", {"level": 1.0}), | |
| ("do_not_disturb", {"enable": False}), | |
| ], | |
| "away": [ | |
| ("lock_screen", {}), | |
| ("do_not_disturb", {"enable": True}), | |
| ], | |
| } | |
| if scene.lower() in built_in: | |
| from tools import TOOL_REGISTRY | |
| results = [] | |
| for tool_name, args in built_in[scene.lower()]: | |
| if tool_name in TOOL_REGISTRY: | |
| try: | |
| r = TOOL_REGISTRY[tool_name]["function"](**args) | |
| results.append(f" ✓ {tool_name}: {r}") | |
| except Exception as e: | |
| results.append(f" ✗ {tool_name}: {e}") | |
| return f"Scene '{scene}' activated:\n" + "\n".join(results) | |
| else: | |
| # Try as Apple Shortcut | |
| return homekit_control(scene) | |
| def airdrop_file(file_path: str) -> str: | |
| path = os.path.expanduser(file_path) | |
| if not os.path.exists(path): | |
| return f"File not found: {path}" | |
| try: | |
| subprocess.run([ | |
| "osascript", "-e", | |
| f''' | |
| tell application "Finder" | |
| set theFile to POSIX file "{path}" as alias | |
| set theWindow to make new Finder window | |
| set target of theWindow to theFile | |
| end tell | |
| -- Open sharing menu | |
| tell application "System Events" | |
| keystroke "r" using {{command down, shift down}} | |
| delay 1 | |
| end tell | |
| ''' | |
| ], capture_output=True, text=True, timeout=10) | |
| return f"AirDrop sharing opened for: {path}" | |
| except Exception as e: | |
| return f"AirDrop error: {e}" | |
| def find_my_devices() -> str: | |
| try: | |
| # Find My data is cached locally | |
| findmy_dir = os.path.expanduser("~/Library/Caches/com.apple.findmy.fmipcore/Items.data") | |
| devices_dir = os.path.expanduser("~/Library/Caches/com.apple.findmy.fmipcore/Devices.data") | |
| results = [] | |
| for path, label in [(devices_dir, "Devices"), (findmy_dir, "Items")]: | |
| if os.path.exists(path): | |
| try: | |
| with open(path, "r") as f: | |
| data = json.load(f) | |
| for item in data: | |
| name = item.get("name", item.get("serialNumber", "Unknown")) | |
| loc = item.get("location", {}) | |
| battery = item.get("batteryLevel", "?") | |
| lat = loc.get("latitude", "?") | |
| lon = loc.get("longitude", "?") | |
| results.append(f" • {name} — Battery: {battery} — Location: {lat}, {lon}") | |
| except Exception: | |
| continue | |
| if results: | |
| return "Find My Devices:\n" + "\n".join(results) | |
| return "No Find My data found. Ensure Find My is enabled and signed in." | |
| except Exception as e: | |
| return f"Find My error: {e}" | |