""" JARVIS Device Control — Universal smart device management. Controls appliances and electronics via every available protocol: • Bluetooth — pair, connect, send data to BT/BLE devices • WiFi/LAN — HTTP REST, Wake-on-LAN, mDNS/Bonjour discovery • HomeKit — Apple Home via Shortcuts bridge • MQTT — IoT devices (smart plugs, sensors, ESP32, etc.) • IR — infrared blaster control (Broadlink, etc.) • Network — scan, discover, ping devices All discovered devices are persisted in the device registry. """ import subprocess import os import json import socket import struct import threading import time from tools import tool # ═══════════════════════════════════════════════════════════════ # BLUETOOTH CONTROL # ═══════════════════════════════════════════════════════════════ @tool( name="bluetooth_scan", description="Scan for nearby Bluetooth devices (classic + BLE). Returns list of discovered devices.", parameters={ "type": "object", "properties": { "duration": {"type": "integer", "description": "Scan duration in seconds (default 5, max 15)"}, }, }, ) def bluetooth_scan(duration: int = 5) -> str: duration = max(1, min(15, duration)) try: # Use blueutil for classic BT result = subprocess.run( ["blueutil", "--inquiry", str(duration)], capture_output=True, text=True, timeout=duration + 10, ) devices = [] if result.stdout.strip(): for line in result.stdout.strip().split("\n"): if line.strip(): devices.append(line.strip()) # Also list paired devices paired = subprocess.run( ["blueutil", "--paired"], capture_output=True, text=True, timeout=5, ) paired_list = [] if paired.stdout.strip(): for line in paired.stdout.strip().split("\n"): if line.strip(): paired_list.append(line.strip()) output = [] if devices: output.append(f"Discovered ({len(devices)}):") for d in devices: output.append(f" • {d}") if paired_list: output.append(f"\nPaired ({len(paired_list)}):") for d in paired_list: output.append(f" • {d}") if not output: return "No Bluetooth devices found. Ensure Bluetooth is on." 
return "\n".join(output) except FileNotFoundError: return "Install blueutil: brew install blueutil" except Exception as e: return f"Bluetooth scan error: {e}" @tool( name="bluetooth_pair", description="Pair with a Bluetooth device by MAC address", parameters={ "type": "object", "properties": { "mac": {"type": "string", "description": "Device MAC address (e.g. AA:BB:CC:DD:EE:FF)"}, }, "required": ["mac"], }, ) def bluetooth_pair(mac: str) -> str: try: result = subprocess.run( ["blueutil", "--pair", mac, "--wait-connect", "10"], capture_output=True, text=True, timeout=15, ) if result.returncode == 0: # Register in device registry from device_registry import DeviceRegistry reg = DeviceRegistry() reg.add_device( name=f"BT-{mac[-8:].replace(':', '')}", device_type="bluetooth", protocol="bluetooth", mac_address=mac, ) return f"Paired with {mac}" return f"Pairing failed: {result.stderr.strip()}" except FileNotFoundError: return "Install blueutil: brew install blueutil" except Exception as e: return f"Error: {e}" @tool( name="bluetooth_connect", description="Connect to a paired Bluetooth device", parameters={ "type": "object", "properties": { "mac": {"type": "string", "description": "Device MAC address"}, }, "required": ["mac"], }, ) def bluetooth_connect(mac: str) -> str: try: result = subprocess.run( ["blueutil", "--connect", mac, "--wait-connect", "10"], capture_output=True, text=True, timeout=15, ) return f"Connected to {mac}" if result.returncode == 0 else f"Connect failed: {result.stderr.strip()}" except FileNotFoundError: return "Install blueutil: brew install blueutil" except Exception as e: return f"Error: {e}" @tool( name="bluetooth_disconnect", description="Disconnect a Bluetooth device", parameters={ "type": "object", "properties": { "mac": {"type": "string", "description": "Device MAC address"}, }, "required": ["mac"], }, ) def bluetooth_disconnect(mac: str) -> str: try: result = subprocess.run( ["blueutil", "--disconnect", mac], capture_output=True, 
text=True, timeout=10, ) return f"Disconnected {mac}" if result.returncode == 0 else f"Failed: {result.stderr.strip()}" except Exception as e: return f"Error: {e}" @tool( name="bluetooth_info", description="Get info about a paired Bluetooth device", parameters={ "type": "object", "properties": { "mac": {"type": "string", "description": "Device MAC address"}, }, "required": ["mac"], }, ) def bluetooth_info(mac: str) -> str: try: result = subprocess.run( ["blueutil", "--info", mac], capture_output=True, text=True, timeout=5, ) return result.stdout.strip() or "No info available" except Exception as e: return f"Error: {e}" # ═══════════════════════════════════════════════════════════════ # NETWORK / WiFi DEVICE DISCOVERY # ═══════════════════════════════════════════════════════════════ @tool( name="network_scan", description="Scan local network for devices using ARP. Finds all devices on your WiFi.", parameters={ "type": "object", "properties": { "subnet": {"type": "string", "description": "Subnet to scan (default: auto-detect, e.g. '192.168.1.0/24')"}, }, }, ) def network_scan(subnet: str = "") -> str: try: if not subnet: # Auto-detect subnet from default route route = subprocess.run( ["route", "-n", "get", "default"], capture_output=True, text=True, timeout=5, ) gateway = "" for line in route.stdout.split("\n"): if "gateway" in line: gateway = line.split(":")[-1].strip() break if gateway: parts = gateway.rsplit(".", 1) subnet = f"{parts[0]}.0/24" else: subnet = "192.168.1.0/24" # ARP scan arp = subprocess.run( ["arp", "-a"], capture_output=True, text=True, timeout=10, ) devices = [] for line in arp.stdout.split("\n"): if line.strip() and "(" in line: parts = line.strip() # Extract name, IP, MAC try: name_part = parts.split("(")[0].strip() ip = parts.split("(")[1].split(")")[0] mac_match = parts.split("at ")[-1].split(" ")[0] if "at " in parts else "" if ip and mac_match and mac_match != "(incomplete)": name = name_part if name_part and name_part != "?" 
else "" devices.append({"name": name, "ip": ip, "mac": mac_match}) except (IndexError, ValueError): continue if not devices: return "No devices found on network. Try: network_scan with a specific subnet." lines = [f"Network devices ({len(devices)}):"] for d in devices: name = f" ({d['name']})" if d['name'] else "" lines.append(f" • {d['ip']}{name} — MAC: {d['mac']}") return "\n".join(lines) except Exception as e: return f"Network scan error: {e}" @tool( name="bonjour_discover", description="Discover local network services via Bonjour/mDNS (printers, smart TVs, AirPlay, smart home devices)", parameters={ "type": "object", "properties": { "service_type": {"type": "string", "description": "Service type: 'all', 'http', 'airplay', 'printer', 'homekit', 'googlecast', 'hue', 'mqtt' (default: all)"}, "duration": {"type": "integer", "description": "Discovery duration in seconds (default 3)"}, }, }, ) def bonjour_discover(service_type: str = "all", duration: int = 3) -> str: duration = max(1, min(10, duration)) service_map = { "http": "_http._tcp.", "airplay": "_airplay._tcp.", "printer": "_ipp._tcp.", "homekit": "_hap._tcp.", "googlecast": "_googlecast._tcp.", "hue": "_hue._tcp.", "mqtt": "_mqtt._tcp.", "spotify": "_spotify-connect._tcp.", "ssh": "_ssh._tcp.", "smb": "_smb._tcp.", "raop": "_raop._tcp.", } if service_type == "all": services_to_scan = list(service_map.values()) elif service_type in service_map: services_to_scan = [service_map[service_type]] else: services_to_scan = [f"_{service_type}._tcp."] all_results = [] for svc in services_to_scan: try: result = subprocess.run( ["dns-sd", "-B", svc, "local."], capture_output=True, text=True, timeout=duration + 2, ) for line in result.stdout.split("\n"): if line.strip() and not line.startswith("Browsing") and not line.startswith("DATE") and not line.startswith("Timestamp"): parts = line.strip().split() if len(parts) >= 7: name = " ".join(parts[6:]) svc_type = parts[5] if len(parts) > 5 else svc all_results.append({"name": 
name, "type": svc_type}) except subprocess.TimeoutExpired: continue except Exception: continue if not all_results: return "No Bonjour/mDNS services found." lines = [f"Discovered services ({len(all_results)}):"] for r in all_results: lines.append(f" • {r['name']} ({r['type']})") return "\n".join(lines) @tool( name="wake_on_lan", description="Wake a device using Wake-on-LAN magic packet (turn on a PC/Mac/NAS on your network)", parameters={ "type": "object", "properties": { "mac": {"type": "string", "description": "Target device MAC address (e.g. AA:BB:CC:DD:EE:FF)"}, "broadcast": {"type": "string", "description": "Broadcast address (default: 255.255.255.255)"}, }, "required": ["mac"], }, ) def wake_on_lan(mac: str, broadcast: str = "255.255.255.255") -> str: try: mac_clean = mac.replace(":", "").replace("-", "").replace(".", "") if len(mac_clean) != 12: return f"Invalid MAC address: {mac}" mac_bytes = bytes.fromhex(mac_clean) magic_packet = b'\xff' * 6 + mac_bytes * 16 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.sendto(magic_packet, (broadcast, 9)) sock.close() return f"Wake-on-LAN packet sent to {mac}" except Exception as e: return f"WoL error: {e}" @tool( name="ping_device", description="Ping a device to check if it's online", parameters={ "type": "object", "properties": { "host": {"type": "string", "description": "IP address or hostname"}, "count": {"type": "integer", "description": "Number of pings (default 3)"}, }, "required": ["host"], }, ) def ping_device(host: str, count: int = 3) -> str: count = max(1, min(10, count)) try: result = subprocess.run( ["ping", "-c", str(count), "-t", "5", host], capture_output=True, text=True, timeout=count * 6, ) if result.returncode == 0: # Extract stats lines = result.stdout.strip().split("\n") stats = [l for l in lines if "packets" in l or "avg" in l.lower() or "round-trip" in l] return f"{host} is ONLINE\n" + "\n".join(stats) return f"{host} is 
OFFLINE/unreachable" except Exception as e: return f"Ping error: {e}" # ═══════════════════════════════════════════════════════════════ # HTTP SMART DEVICE CONTROL (Hue, Tapo, Tuya, generic REST) # ═══════════════════════════════════════════════════════════════ @tool( name="http_device_request", description="Send HTTP request to a smart device on your network (REST API). Works with Philips Hue bridges, smart plugs, ESP32, Home Assistant, etc.", parameters={ "type": "object", "properties": { "url": {"type": "string", "description": "Full URL (e.g. http://192.168.1.50/api/lights)"}, "method": {"type": "string", "description": "HTTP method: GET, POST, PUT, DELETE (default: GET)"}, "body": {"type": "string", "description": "Request body as JSON string (optional)"}, "headers": {"type": "string", "description": "Extra headers as JSON object string (optional)"}, }, "required": ["url"], }, ) def http_device_request(url: str, method: str = "GET", body: str = "", headers: str = "") -> str: import httpx # Validate URL is local network only (security) from urllib.parse import urlparse parsed = urlparse(url) hostname = parsed.hostname or "" local_prefixes = ("192.168.", "10.", "172.16.", "172.17.", "172.18.", "172.19.", "172.20.", "172.21.", "172.22.", "172.23.", "172.24.", "172.25.", "172.26.", "172.27.", "172.28.", "172.29.", "172.30.", "172.31.", "127.", "localhost", "0.0.0.0") is_local = any(hostname.startswith(p) for p in local_prefixes) is_mdns = hostname.endswith(".local") if not is_local and not is_mdns: return f"BLOCKED: http_device_request only works with local network devices ({hostname} is not local). Use web_search or run_command for external requests." 
try: extra_headers = json.loads(headers) if headers else {} body_data = json.loads(body) if body else None with httpx.Client(timeout=10, verify=False) as client: if method.upper() == "GET": resp = client.get(url, headers=extra_headers) elif method.upper() == "POST": resp = client.post(url, json=body_data, headers=extra_headers) elif method.upper() == "PUT": resp = client.put(url, json=body_data, headers=extra_headers) elif method.upper() == "DELETE": resp = client.delete(url, headers=extra_headers) else: return f"Unknown method: {method}" result = f"HTTP {resp.status_code}" try: result += f"\n{json.dumps(resp.json(), indent=2)[:2000]}" except Exception: result += f"\n{resp.text[:2000]}" return result except Exception as e: return f"HTTP request error: {e}" @tool( name="hue_control", description="Control Philips Hue lights. Requires a Hue Bridge on your network.", parameters={ "type": "object", "properties": { "bridge_ip": {"type": "string", "description": "Hue Bridge IP address"}, "api_key": {"type": "string", "description": "Hue API key (press bridge button first, then use 'discover' action to get one)"}, "action": {"type": "string", "description": "'discover' to get API key, 'list' to list lights, 'on'/'off' to control, 'color' to set color, 'brightness' to set brightness"}, "light_id": {"type": "string", "description": "Light ID (from list), or 'all'"}, "value": {"type": "string", "description": "Value for color (hex like 'FF0000') or brightness (0-254)"}, }, "required": ["bridge_ip", "action"], }, ) def hue_control(bridge_ip: str, action: str, api_key: str = "", light_id: str = "1", value: str = "") -> str: import httpx base = f"http://{bridge_ip}/api" try: if action == "discover": # Press the bridge button first, then call this resp = httpx.post(f"{base}", json={"devicetype": "jarvis#macbook"}, timeout=5) data = resp.json() if isinstance(data, list) and data: if "success" in data[0]: key = data[0]["success"]["username"] return f"Hue API key: {key}\nSave this! 
Use it for all future Hue commands." elif "error" in data[0]: return f"Error: {data[0]['error']['description']} (press the bridge button first!)" return f"Unexpected response: {data}" if not api_key: return "API key required. Use action='discover' first (press bridge button)." if action == "list": resp = httpx.get(f"{base}/{api_key}/lights", timeout=5) lights = resp.json() lines = [f"Hue Lights ({len(lights)}):"] for lid, info in lights.items(): state = "ON" if info["state"]["on"] else "OFF" bri = info["state"].get("bri", "?") lines.append(f" {lid}: {info['name']} — {state} (brightness: {bri})") return "\n".join(lines) elif action in ("on", "off"): state = {"on": action == "on"} if light_id == "all": resp = httpx.put(f"{base}/{api_key}/groups/0/action", json=state, timeout=5) else: resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5) return f"Light {light_id}: {action}" elif action == "brightness": bri = max(0, min(254, int(value))) state = {"on": True, "bri": bri} if light_id == "all": resp = httpx.put(f"{base}/{api_key}/groups/0/action", json=state, timeout=5) else: resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5) return f"Light {light_id} brightness: {bri}/254" elif action == "color": # Convert hex to Hue xy color space (simplified) r = int(value[0:2], 16) / 255.0 g = int(value[2:4], 16) / 255.0 b = int(value[4:6], 16) / 255.0 # sRGB -> CIE XY (simplified conversion) X = r * 0.664511 + g * 0.154324 + b * 0.162028 Y = r * 0.283881 + g * 0.668433 + b * 0.047685 Z = r * 0.000088 + g * 0.072310 + b * 0.986039 total = X + Y + Z if total == 0: x_val, y_val = 0.0, 0.0 else: x_val = X / total y_val = Y / total state = {"on": True, "xy": [x_val, y_val]} resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5) return f"Light {light_id} color set to #{value}" return f"Unknown action: {action}" except Exception as e: return f"Hue error: {e}" # 
# ═══════════════════════════════════════════════════════════════
# HOMEKIT / APPLE HOME (via Shortcuts bridge)
# ═══════════════════════════════════════════════════════════════

def _applescript_quote(text: str) -> str:
    """Escape *text* for use inside a double-quoted AppleScript string.

    Backslashes are escaped before quotes; otherwise an input ending in a
    backslash followed by a quote could terminate the literal early.
    """
    return text.replace("\\", "\\\\").replace('"', '\\"')


@tool(
    name="homekit_control",
    description="Control Apple HomeKit devices via Siri Shortcuts. Can control any device in your Apple Home app — lights, locks, thermostats, cameras, plugs, scenes.",
    parameters={
        "type": "object",
        "properties": {
            "shortcut_name": {"type": "string", "description": "Name of the Apple Shortcut to run (create shortcuts in Shortcuts.app that control your HomeKit devices)"},
            "input": {"type": "string", "description": "Input to pass to the shortcut (optional)"},
        },
        "required": ["shortcut_name"],
    },
)
def homekit_control(shortcut_name: str, input: str = "") -> str:
    """Run an Apple Shortcut through the `shortcuts` CLI.

    Args:
        shortcut_name: Name of the shortcut to run.
        input: Optional text passed with `--input-text`.

    Returns:
        A confirmation (with the shortcut's output, if any) or an error string.
    """
    cmd = ["shortcuts", "run", shortcut_name]
    if input:
        cmd.extend(["--input-text", input])
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except subprocess.TimeoutExpired:
        return f"Shortcut '{shortcut_name}' timed out (may still be running)"
    except Exception as e:
        return f"Error: {e}"

    if result.returncode != 0:
        return f"Shortcut error: {result.stderr.strip()}"
    output = result.stdout.strip()
    return f"Shortcut '{shortcut_name}' executed." + (f"\nOutput: {output}" if output else "")


@tool(
    name="homekit_list_shortcuts",
    description="List all available Apple Shortcuts (includes HomeKit automations you've set up)",
    parameters={"type": "object", "properties": {}},
)
def homekit_list_shortcuts() -> str:
    """List Apple Shortcuts, grouping those whose names look home-related.

    The grouping is a keyword heuristic on the shortcut name only. At most
    20 of the remaining shortcuts are printed.
    """
    try:
        result = subprocess.run(
            ["shortcuts", "list"],
            capture_output=True, text=True, timeout=10,
        )
    except Exception as e:
        return f"Error: {e}"

    shortcuts = [s.strip() for s in result.stdout.strip().split("\n") if s.strip()]
    if not shortcuts:
        return "No shortcuts found. Create them in Shortcuts.app."

    # Group by likely category (substring match on the lower-cased name).
    keywords = ["light", "lock", "thermostat", "scene", "home", "room", "plug", "switch",
                "fan", "door", "garage", "blind", "curtain", "ac", "heater", "alarm"]
    homekit = []
    other = []
    for name in shortcuts:
        lowered = name.lower()
        if any(k in lowered for k in keywords):
            homekit.append(name)
        else:
            other.append(name)

    lines = [f"Apple Shortcuts ({len(shortcuts)} total):"]
    if homekit:
        lines.append(f"\n Home/IoT ({len(homekit)}):")
        lines.extend(f" • {s}" for s in homekit)
    if other:
        lines.append(f"\n Other ({len(other)}):")
        lines.extend(f" • {s}" for s in other[:20])
        if len(other) > 20:
            lines.append(f" ... and {len(other) - 20} more")
    return "\n".join(lines)


@tool(
    name="siri_command",
    description="Execute a natural language Siri command — controls any Siri-capable device or HomeKit accessory",
    parameters={
        "type": "object",
        "properties": {
            "command": {"type": "string", "description": "Natural language command (e.g. 'turn off living room lights', 'set thermostat to 72', 'lock the front door')"},
        },
        "required": ["command"],
    },
)
def siri_command(command: str) -> str:
    """Route a command through Siri via AppleScript.

    The script presses Cmd+Space, types *command* and presses Return using
    System Events keystrokes.

    Returns:
        A confirmation, or a "Siri error: ..." string when osascript fails.
    """
    safe_cmd = _applescript_quote(command)
    script = f'''
        tell application "System Events"
            -- Trigger Siri
            key code 49 using {{command down}}
            delay 1.5
            keystroke "{safe_cmd}"
            delay 0.3
            keystroke return
        end tell
    '''
    try:
        result = subprocess.run(
            ["osascript", "-e", script],
            capture_output=True, text=True, timeout=10,
        )
    except Exception as e:
        return f"Siri error: {e}"

    if result.returncode != 0:
        return f"Siri error: {result.stderr.strip()}"
    return f"Siri command sent: {command}"


# ═══════════════════════════════════════════════════════════════
# MQTT IoT CONTROL
# ═══════════════════════════════════════════════════════════════

@tool(
    name="mqtt_publish",
    description="Publish a message to an MQTT broker — controls IoT devices (smart plugs, ESP32, Tasmota, sensors). Requires mosquitto CLI or paho-mqtt.",
    parameters={
        "type": "object",
        "properties": {
            "broker": {"type": "string", "description": "MQTT broker address (e.g. '192.168.1.100' or 'localhost')"},
            "topic": {"type": "string", "description": "MQTT topic (e.g. 'cmnd/plug1/POWER', 'home/light/bedroom')"},
            "message": {"type": "string", "description": "Message payload (e.g. 'ON', 'OFF', '{\"brightness\": 50}')"},
            "port": {"type": "integer", "description": "Broker port (default 1883)"},
        },
        "required": ["broker", "topic", "message"],
    },
)
def mqtt_publish(broker: str, topic: str, message: str, port: int = 1883) -> str:
    """Publish *message* to *topic* on an MQTT broker.

    The `mosquitto_pub` CLI is tried first; when it is not installed the
    paho-mqtt library is used instead.

    Returns:
        A confirmation, or an "MQTT error: ..." / installation hint string.
    """
    # Try mosquitto_pub first (CLI)
    try:
        result = subprocess.run(
            ["mosquitto_pub", "-h", broker, "-p", str(port), "-t", topic, "-m", message],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            return f"MQTT published: {topic} ← {message}"
        return f"MQTT error: {result.stderr.strip()}"
    except FileNotFoundError:
        pass  # CLI not installed: fall through to the Python library
    except subprocess.TimeoutExpired:
        return f"MQTT error: timed out publishing to {broker}:{port}"

    # Fallback: try paho-mqtt Python library
    try:
        import paho.mqtt.publish as publish
        publish.single(topic, payload=message, hostname=broker, port=port)
        return f"MQTT published: {topic} ← {message}"
    except ImportError:
        return "MQTT not available. Install: brew install mosquitto OR pip install paho-mqtt"
    except Exception as e:
        return f"MQTT error: {e}"


@tool(
    name="mqtt_subscribe",
    description="Subscribe to an MQTT topic and read one message (check sensor value, device status, etc.)",
    parameters={
        "type": "object",
        "properties": {
            "broker": {"type": "string", "description": "MQTT broker address"},
            "topic": {"type": "string", "description": "MQTT topic to subscribe to (e.g. 'stat/plug1/POWER', 'home/sensor/#')"},
            "port": {"type": "integer", "description": "Broker port (default 1883)"},
            "timeout": {"type": "integer", "description": "Wait timeout in seconds (default 5)"},
        },
        "required": ["broker", "topic"],
    },
)
def mqtt_subscribe(broker: str, topic: str, port: int = 1883, timeout: int = 5) -> str:
    """Read a single message from *topic* using the `mosquitto_sub` CLI.

    Args:
        broker: Broker address.
        topic: Topic (wildcards allowed by mosquitto_sub).
        port: Broker port.
        timeout: Seconds to wait; clamped to the range 1..30.

    Returns:
        The received payload, or a message when nothing arrived.
    """
    timeout = max(1, min(30, timeout))
    try:
        # -C 1: exit after one message; -W: mosquitto_sub's own wait limit.
        result = subprocess.run(
            ["mosquitto_sub", "-h", broker, "-p", str(port), "-t", topic, "-C", "1", "-W", str(timeout)],
            capture_output=True, text=True, timeout=timeout + 5,
        )
    except FileNotFoundError:
        return "Install mosquitto: brew install mosquitto"
    except subprocess.TimeoutExpired:
        return f"No message on {topic} within {timeout}s"
    except Exception as e:
        return f"MQTT error: {e}"

    payload = result.stdout.strip()
    if payload:
        return f"MQTT [{topic}]: {payload}"
    return f"No message received on {topic} within {timeout}s"


# ═══════════════════════════════════════════════════════════════
# DEVICE REGISTRY TOOLS (persistent storage)
# ═══════════════════════════════════════════════════════════════

@tool(
    name="register_device",
    description="Register a smart device so JARVIS remembers it. Supports any protocol: bluetooth, wifi, mqtt, homekit, http, ir",
    parameters={
        "type": "object",
        "properties": {
            "name": {"type": "string", "description": "Friendly device name (e.g. 'Living Room Light', 'Bedroom Speaker')"},
            "device_type": {"type": "string", "description": "Type: 'light', 'plug', 'speaker', 'tv', 'thermostat', 'lock', 'sensor', 'camera', 'appliance', 'computer', 'phone', 'other'"},
            "protocol": {"type": "string", "description": "Protocol: 'bluetooth', 'wifi', 'mqtt', 'homekit', 'http', 'ir', 'zigbee', 'zwave'"},
            "address": {"type": "string", "description": "IP address or hostname (for wifi/http devices)"},
            "mac_address": {"type": "string", "description": "MAC address (for bluetooth/WoL)"},
            "port": {"type": "integer", "description": "Port number (optional)"},
            "room": {"type": "string", "description": "Room location (e.g. 'bedroom', 'living room', 'kitchen')"},
            "config": {"type": "string", "description": "Extra config as JSON string (API keys, MQTT topics, etc.)"},
        },
        "required": ["name", "device_type", "protocol"],
    },
)
def register_device(name: str, device_type: str, protocol: str,
                    address: str = "", mac_address: str = "",
                    port: int = 0, room: str = "", config: str = "") -> str:
    """Add a device to the persistent device registry.

    Args:
        name, device_type, protocol: Required identification fields.
        address, mac_address, port, room: Optional connection/location data.
        config: JSON object text with protocol-specific settings.

    Returns:
        A confirmation with the new device ID, or an error string when
        *config* is not a valid JSON object.
    """
    from device_registry import DeviceRegistry

    try:
        cfg = json.loads(config) if config else {}
    except ValueError as e:
        return f"Invalid config JSON: {e}"
    if not isinstance(cfg, dict):
        return "Invalid config JSON: expected a JSON object"

    reg = DeviceRegistry()
    device_id = reg.add_device(name, device_type, protocol, address, port, mac_address, cfg, room)
    return f"Device registered: #{device_id} '{name}' ({device_type}/{protocol}) in {room or 'unassigned'}"


@tool(
    name="list_devices",
    description="List all registered smart devices. Filter by type, room, or protocol.",
    parameters={
        "type": "object",
        "properties": {
            "device_type": {"type": "string", "description": "Filter by type (e.g. 'light', 'speaker')"},
            "room": {"type": "string", "description": "Filter by room (e.g. 'bedroom')"},
            "protocol": {"type": "string", "description": "Filter by protocol (e.g. 'bluetooth', 'mqtt')"},
        },
    },
)
def list_devices(device_type: str = "", room: str = "", protocol: str = "") -> str:
    """List registered devices, with a room heading whenever the room changes.

    Empty filter arguments are passed to the registry as None (no filter).
    """
    from device_registry import DeviceRegistry

    reg = DeviceRegistry()
    devices = reg.get_devices(
        device_type=device_type or None,
        room=room or None,
        protocol=protocol or None,
    )
    if not devices:
        hint = " Try different filters." if any([device_type, room, protocol]) else " Use register_device to add one."
        return "No registered devices." + hint

    status_icons = {"online": "🟢", "offline": "🔴", "registered": "⚪", "unknown": "⚪"}
    lines = [f"Registered devices ({len(devices)}):"]
    current_room = None
    for d in devices:
        # A heading is emitted on every change of room, so the grouping is
        # only contiguous if the registry returns devices ordered by room.
        r = d["room"] or "Unassigned"
        if r != current_room:
            current_room = r
            lines.append(f"\n 📍 {current_room}:")
        status_icon = status_icons.get(d.get("status", "unknown"), "⚪")
        addr = d.get("address") or d.get("mac_address") or ""
        addr_str = f" @ {addr}" if addr else ""
        lines.append(f" {status_icon} #{d['id']}: {d['name']} ({d['device_type']}/{d['protocol']}){addr_str}")
    return "\n".join(lines)


@tool(
    name="find_device",
    description="Search for a registered device by name, room, or type",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query (device name, room, or type)"},
        },
        "required": ["query"],
    },
)
def find_device(query: str) -> str:
    """Search the registry and describe each match (address and config included)."""
    from device_registry import DeviceRegistry

    reg = DeviceRegistry()
    devices = reg.search_devices(query)
    if not devices:
        return f"No devices found matching '{query}'"

    lines = [f"Found {len(devices)} device(s):"]
    for d in devices:
        room = f" in {d['room']}" if d.get("room") else ""
        addr = d.get("address") or d.get("mac_address") or ""
        lines.append(f" #{d['id']}: {d['name']} ({d['device_type']}/{d['protocol']}){room}")
        if addr:
            lines.append(f" Address: {addr}")
        if d.get("config"):
            # Config is truncated to 100 characters to keep the listing short.
            lines.append(f" Config: {json.dumps(d['config'])[:100]}")
    return "\n".join(lines)


@tool(
    name="remove_device",
    description="Remove a device from the registry",
    parameters={
        "type": "object",
        "properties": {
            "device_id": {"type": "integer", "description": "Device ID to remove"},
        },
        "required": ["device_id"],
    },
)
def remove_device(device_id: int) -> str:
    """Delete the device with *device_id* from the registry, if it exists."""
    from device_registry import DeviceRegistry

    reg = DeviceRegistry()
    device = reg.get_device(device_id)
    if not device:
        return f"Device #{device_id} not found"
    reg.remove_device(device_id)
    return f"Removed device #{device_id}: {device['name']}"


def _resolve_device(reg, device_name: str):
    """Look up a device by numeric ID, exact name, or unique search match.

    Returns:
        (device, None) on success, or (None, error_message) when nothing
        matches or the query is ambiguous.
    """
    device = None
    try:
        device = reg.get_device(int(device_name))
    except ValueError:
        pass  # not a numeric ID

    # Also reached when a numeric string matched no ID: it may be a name.
    if not device:
        device = reg.get_device_by_name(device_name)
    if not device:
        matches = reg.search_devices(device_name)
        if len(matches) == 1:
            device = matches[0]
        elif len(matches) > 1:
            names = ", ".join(f"#{m['id']}: {m['name']}" for m in matches)
            return None, f"Multiple devices match '{device_name}': {names}. Be more specific or use device ID."

    if not device:
        return None, f"Device '{device_name}' not found. Use list_devices to see registered devices."
    return device, None


def _device_config(device) -> dict:
    """Return the device's config as a dict ({} when missing or unusable)."""
    config = device.get("config") or {}
    if isinstance(config, str):
        try:
            config = json.loads(config)
        except ValueError:
            return {}
    return config if isinstance(config, dict) else {}


def _control_bluetooth_device(device, action: str) -> str:
    """Map connect/disconnect/status actions onto the Bluetooth tools."""
    mac = device.get("mac_address", "")
    if not mac:
        return f"No MAC address for {device['name']}. Update device config."
    if action in ("connect", "on"):
        return bluetooth_connect(mac)
    if action in ("disconnect", "off"):
        return bluetooth_disconnect(mac)
    if action == "status":
        return bluetooth_info(mac)
    return f"Bluetooth action '{action}' not supported. Use: connect, disconnect, status"


def _control_http_device(reg, device, config: dict, action: str, value: str) -> str:
    """Drive a WiFi/HTTP device through configured or well-known REST paths."""
    address = device.get("address", "")
    if not address:
        return f"No IP address for {device['name']}. Update device config."

    # register_device stores 0 when no port was given; treat that as port 80.
    port = device.get("port") or 80
    base_url = f"http://{address}:{port}" if port != 80 else f"http://{address}"

    # Custom API endpoints from the device config take priority.
    endpoints = config.get("endpoints", {})
    if action in endpoints:
        url = f"{base_url}{endpoints[action]}"
        return http_device_request(url, method=endpoints.get(f"{action}_method", "POST"))

    if action in ("on", "off", "toggle"):
        # Probe common smart plug/switch APIs until one answers with 2xx.
        candidate_paths = ["/relay/0", "/switch", "/api/relay", f"/cm?cmnd=Power%20{action.upper()}"]
        for path in candidate_paths:
            try:
                result = http_device_request(f"{base_url}{path}", method="POST", body=json.dumps({"state": action}))
            except Exception:
                continue  # best-effort probe: try the next known path
            # Judge by the status line only; response bodies of failed
            # requests can contain "200" or "ok" by coincidence.
            if result.startswith("HTTP 2"):
                reg.update_device_status(device["id"], "online")
                return f"{device['name']}: {action} — {result[:100]}"
        return f"Could not find working API for {device['name']}. Register endpoints in config."

    if action == "status":
        return http_device_request(f"{base_url}/status", method="GET")

    # Anything else is passed through as an API call.
    return http_device_request(
        f"{base_url}/{action}", method="POST",
        body=json.dumps({"value": value}) if value else "",
    )


def _control_mqtt_device(device, config: dict, action: str, value: str) -> str:
    """Publish or read the MQTT topic that corresponds to *action*."""
    slug = device["name"].lower().replace(" ", "_")
    broker = config.get("broker", device.get("address", "") or "localhost")
    mqtt_port = config.get("mqtt_port", 1883)
    topic_prefix = config.get("topic_prefix", f"cmnd/{slug}")

    if action in ("on", "off"):
        return mqtt_publish(broker, config.get("power_topic", f"{topic_prefix}/POWER"), action.upper(), mqtt_port)
    if action == "toggle":
        return mqtt_publish(broker, config.get("power_topic", f"{topic_prefix}/POWER"), "TOGGLE", mqtt_port)
    if action == "status":
        return mqtt_subscribe(broker, config.get("status_topic", f"stat/{slug}/POWER"), mqtt_port, timeout=3)
    if action == "brightness" and value:
        return mqtt_publish(broker, config.get("dimmer_topic", f"{topic_prefix}/Dimmer"), value, mqtt_port)
    if action == "color" and value:
        return mqtt_publish(broker, config.get("color_topic", f"{topic_prefix}/Color"), value, mqtt_port)

    # Custom command: the action name becomes the last topic segment.
    return mqtt_publish(broker, f"{topic_prefix}/{action}", value or action.upper(), mqtt_port)


def _control_ir_device(device, config: dict, action: str) -> str:
    """Send the stored IR code for *action* through broadlink_cli."""
    ir_device = config.get("ir_device", device.get("address", ""))
    code = config.get("codes", {}).get(action, "")
    if not code:
        return f"No IR code for '{action}' on {device['name']}. Add codes to device config."
    if not ir_device:
        return f"No IR blaster configured for {device['name']}"

    try:
        result = subprocess.run(
            ["broadlink_cli", "--device", ir_device, "--send", code],
            capture_output=True, text=True, timeout=10,
        )
    except FileNotFoundError:
        return "Install broadlink_cli: pip install broadlink"
    except subprocess.TimeoutExpired:
        return f"IR error: timed out sending {action} to {device['name']}"

    if result.returncode != 0:
        return f"IR error: {result.stderr.strip()}"
    return f"IR sent: {action} to {device['name']}"


@tool(
    name="control_device",
    description="Send a command to a registered device. Automatically uses the right protocol. Works with any registered device.",
    parameters={
        "type": "object",
        "properties": {
            "device_name": {"type": "string", "description": "Device name or ID"},
            "action": {"type": "string", "description": "Action: 'on', 'off', 'toggle', 'status', 'connect', 'disconnect', or custom command"},
            "value": {"type": "string", "description": "Value for the action (brightness, color, temperature, etc.)"},
        },
        "required": ["device_name", "action"],
    },
)
def control_device(device_name: str, action: str, value: str = "") -> str:
    """Send *action* to a registered device using its registered protocol.

    Args:
        device_name: Registry ID, exact name, or a search term with one match.
        action: Protocol-dependent action name.
        value: Optional argument for the action.

    Returns:
        The result string of the protocol-specific handler, or an error
        message when the device cannot be resolved or the protocol is not
        supported.
    """
    from device_registry import DeviceRegistry

    reg = DeviceRegistry()
    device, error = _resolve_device(reg, device_name)
    if error:
        return error

    protocol = device["protocol"]
    config = _device_config(device)

    if protocol == "bluetooth":
        return _control_bluetooth_device(device, action)
    if protocol in ("wifi", "http"):
        return _control_http_device(reg, device, config, action, value)
    if protocol == "mqtt":
        return _control_mqtt_device(device, config, action, value)
    if protocol == "homekit":
        # Default shortcut name is "<device name> <Action>", e.g. "Lamp On".
        shortcut = config.get("shortcut", f"{device['name']} {action.title()}")
        return homekit_control(shortcut, input=value)
    if protocol == "ir":
        return _control_ir_device(device, config, action)
    return f"Protocol '{protocol}' not yet supported for direct control. Use http_device_request or mqtt_publish directly."


# ═══════════════════════════════════════════════════════════════
# SMART HOME SCENES & ROUTINES
# ═══════════════════════════════════════════════════════════════

@tool(
    name="smart_home_scene",
    description="Execute a smart home scene — controls multiple devices at once. Built-in scenes: 'movie', 'sleep', 'morning', 'party', 'work', 'away'. Or run a custom Apple Shortcut scene.",
    parameters={
        "type": "object",
        "properties": {
            "scene": {"type": "string", "description": "Scene name: 'movie', 'sleep', 'morning', 'party', 'work', 'away', or a custom Shortcut name"},
        },
        "required": ["scene"],
    },
)
def smart_home_scene(scene: str) -> str:
    """Run a built-in scene, or fall back to an Apple Shortcut of that name.

    Built-in scenes are lists of (tool name, keyword arguments) executed in
    order through TOOL_REGISTRY. A failing or unregistered step is reported
    and does not stop the remaining steps.
    """
    built_in = {
        "movie": [
            ("set_volume", {"level": 40}),
            ("toggle_dark_mode", {"enable": True}),
            ("set_brightness", {"level": 0.2}),
            ("do_not_disturb", {"enable": True}),
        ],
        "sleep": [
            ("set_volume", {"level": 0}),
            ("do_not_disturb", {"enable": True}),
            ("set_brightness", {"level": 0.0}),
        ],
        "morning": [
            ("set_brightness", {"level": 0.8}),
            ("toggle_dark_mode", {"enable": False}),
            ("do_not_disturb", {"enable": False}),
            ("set_volume", {"level": 50}),
        ],
        "work": [
            ("do_not_disturb", {"enable": True}),
            ("set_brightness", {"level": 0.7}),
            ("toggle_dark_mode", {"enable": True}),
        ],
        "party": [
            ("set_volume", {"level": 80}),
            ("set_brightness", {"level": 1.0}),
            ("do_not_disturb", {"enable": False}),
        ],
        "away": [
            ("lock_screen", {}),
            ("do_not_disturb", {"enable": True}),
        ],
    }

    steps = built_in.get(scene.lower())
    if steps is None:
        # Not a built-in scene: try it as an Apple Shortcut.
        return homekit_control(scene)

    from tools import TOOL_REGISTRY

    results = []
    for tool_name, args in steps:
        if tool_name not in TOOL_REGISTRY:
            results.append(f" ✗ {tool_name}: skipped (tool not registered)")
            continue
        try:
            r = TOOL_REGISTRY[tool_name]["function"](**args)
            results.append(f" ✓ {tool_name}: {r}")
        except Exception as e:
            results.append(f" ✗ {tool_name}: {e}")
    return f"Scene '{scene}' activated:\n" + "\n".join(results)


@tool(
    name="airdrop_file",
    description="Share a file via AirDrop to nearby Apple devices",
    parameters={
        "type": "object",
        "properties": {
            "file_path": {"type": "string", "description": "Path to the file to share"},
        },
        "required": ["file_path"],
    },
)
def airdrop_file(file_path: str) -> str:
    """Reveal *file_path* in Finder and press Cmd+Shift+R via System Events.

    Returns:
        A confirmation, a "File not found" message, or an
        "AirDrop error: ..." string when osascript fails.
    """
    path = os.path.expanduser(file_path)
    if not os.path.exists(path):
        return f"File not found: {path}"

    # The path is embedded in an AppleScript string literal, so escape it.
    script_path = _applescript_quote(path)
    script = f'''
        tell application "Finder"
            set theFile to POSIX file "{script_path}" as alias
            set theWindow to make new Finder window
            set target of theWindow to theFile
        end tell
        -- Open sharing menu
        tell application "System Events"
            keystroke "r" using {{command down, shift down}}
            delay 1
        end tell
    '''
    try:
        result = subprocess.run(
            ["osascript", "-e", script],
            capture_output=True, text=True, timeout=10,
        )
    except Exception as e:
        return f"AirDrop error: {e}"

    if result.returncode != 0:
        return f"AirDrop error: {result.stderr.strip()}"
    return f"AirDrop sharing opened for: {path}"


@tool(
    name="find_my_devices",
    description="List devices from Find My (Apple's device tracking). Shows your Apple devices and their last known locations.",
    parameters={"type": "object", "properties": {}},
)
def find_my_devices() -> str:
    """Read the locally cached Find My JSON files and list their entries.

    Both Devices.data and Items.data are read when present. Each entry is
    reported with its name (or serial number), battery level and last
    latitude/longitude; missing values are shown as "?".
    """
    try:
        # Find My data is cached locally
        cache_dir = os.path.expanduser("~/Library/Caches/com.apple.findmy.fmipcore")
        cache_files = [
            os.path.join(cache_dir, "Devices.data"),
            os.path.join(cache_dir, "Items.data"),
        ]

        results = []
        for path in cache_files:
            if not os.path.exists(path):
                continue
            try:
                with open(path, "r") as f:
                    data = json.load(f)
            except (OSError, ValueError):
                # Unreadable or non-JSON cache file: skip it, keep the other.
                continue
            if not isinstance(data, list):
                continue

            for item in data:
                if not isinstance(item, dict):
                    continue
                name = item.get("name", item.get("serialNumber", "Unknown"))
                # "location" may be present but null; one such entry must
                # not discard the rest of the file.
                loc = item.get("location") or {}
                if not isinstance(loc, dict):
                    loc = {}
                battery = item.get("batteryLevel", "?")
                lat = loc.get("latitude", "?")
                lon = loc.get("longitude", "?")
                results.append(f" • {name} — Battery: {battery} — Location: {lat}, {lon}")

        if results:
            return "Find My Devices:\n" + "\n".join(results)
        return "No Find My data found. Ensure Find My is enabled and signed in."
    except Exception as e:
        return f"Find My error: {e}"