JARVIS / tools /device_control.py
Khanna, Videh Rakesh Rakesh
feat: add device control, app automation, tests, and other updates
8576f52
"""
JARVIS Device Control — Universal smart device management.
Controls appliances and electronics via every available protocol:
• Bluetooth — pair, connect, send data to BT/BLE devices
• WiFi/LAN — HTTP REST, Wake-on-LAN, mDNS/Bonjour discovery
• HomeKit — Apple Home via Shortcuts bridge
• MQTT — IoT devices (smart plugs, sensors, ESP32, etc.)
• IR — infrared blaster control (Broadlink, etc.)
• Network — scan, discover, ping devices
All discovered devices are persisted in the device registry.
"""
import subprocess
import os
import json
import socket
import struct
import threading
import time
from tools import tool
# ═══════════════════════════════════════════════════════════════
# BLUETOOTH CONTROL
# ═══════════════════════════════════════════════════════════════
@tool(
name="bluetooth_scan",
description="Scan for nearby Bluetooth devices (classic + BLE). Returns list of discovered devices.",
parameters={
"type": "object",
"properties": {
"duration": {"type": "integer", "description": "Scan duration in seconds (default 5, max 15)"},
},
},
)
def bluetooth_scan(duration: int = 5) -> str:
duration = max(1, min(15, duration))
try:
# Use blueutil for classic BT
result = subprocess.run(
["blueutil", "--inquiry", str(duration)],
capture_output=True, text=True, timeout=duration + 10,
)
devices = []
if result.stdout.strip():
for line in result.stdout.strip().split("\n"):
if line.strip():
devices.append(line.strip())
# Also list paired devices
paired = subprocess.run(
["blueutil", "--paired"],
capture_output=True, text=True, timeout=5,
)
paired_list = []
if paired.stdout.strip():
for line in paired.stdout.strip().split("\n"):
if line.strip():
paired_list.append(line.strip())
output = []
if devices:
output.append(f"Discovered ({len(devices)}):")
for d in devices:
output.append(f" • {d}")
if paired_list:
output.append(f"\nPaired ({len(paired_list)}):")
for d in paired_list:
output.append(f" • {d}")
if not output:
return "No Bluetooth devices found. Ensure Bluetooth is on."
return "\n".join(output)
except FileNotFoundError:
return "Install blueutil: brew install blueutil"
except Exception as e:
return f"Bluetooth scan error: {e}"
@tool(
name="bluetooth_pair",
description="Pair with a Bluetooth device by MAC address",
parameters={
"type": "object",
"properties": {
"mac": {"type": "string", "description": "Device MAC address (e.g. AA:BB:CC:DD:EE:FF)"},
},
"required": ["mac"],
},
)
def bluetooth_pair(mac: str) -> str:
try:
result = subprocess.run(
["blueutil", "--pair", mac, "--wait-connect", "10"],
capture_output=True, text=True, timeout=15,
)
if result.returncode == 0:
# Register in device registry
from device_registry import DeviceRegistry
reg = DeviceRegistry()
reg.add_device(
name=f"BT-{mac[-8:].replace(':', '')}",
device_type="bluetooth",
protocol="bluetooth",
mac_address=mac,
)
return f"Paired with {mac}"
return f"Pairing failed: {result.stderr.strip()}"
except FileNotFoundError:
return "Install blueutil: brew install blueutil"
except Exception as e:
return f"Error: {e}"
@tool(
name="bluetooth_connect",
description="Connect to a paired Bluetooth device",
parameters={
"type": "object",
"properties": {
"mac": {"type": "string", "description": "Device MAC address"},
},
"required": ["mac"],
},
)
def bluetooth_connect(mac: str) -> str:
try:
result = subprocess.run(
["blueutil", "--connect", mac, "--wait-connect", "10"],
capture_output=True, text=True, timeout=15,
)
return f"Connected to {mac}" if result.returncode == 0 else f"Connect failed: {result.stderr.strip()}"
except FileNotFoundError:
return "Install blueutil: brew install blueutil"
except Exception as e:
return f"Error: {e}"
@tool(
name="bluetooth_disconnect",
description="Disconnect a Bluetooth device",
parameters={
"type": "object",
"properties": {
"mac": {"type": "string", "description": "Device MAC address"},
},
"required": ["mac"],
},
)
def bluetooth_disconnect(mac: str) -> str:
try:
result = subprocess.run(
["blueutil", "--disconnect", mac],
capture_output=True, text=True, timeout=10,
)
return f"Disconnected {mac}" if result.returncode == 0 else f"Failed: {result.stderr.strip()}"
except Exception as e:
return f"Error: {e}"
@tool(
name="bluetooth_info",
description="Get info about a paired Bluetooth device",
parameters={
"type": "object",
"properties": {
"mac": {"type": "string", "description": "Device MAC address"},
},
"required": ["mac"],
},
)
def bluetooth_info(mac: str) -> str:
try:
result = subprocess.run(
["blueutil", "--info", mac],
capture_output=True, text=True, timeout=5,
)
return result.stdout.strip() or "No info available"
except Exception as e:
return f"Error: {e}"
# ═══════════════════════════════════════════════════════════════
# NETWORK / WiFi DEVICE DISCOVERY
# ═══════════════════════════════════════════════════════════════
@tool(
name="network_scan",
description="Scan local network for devices using ARP. Finds all devices on your WiFi.",
parameters={
"type": "object",
"properties": {
"subnet": {"type": "string", "description": "Subnet to scan (default: auto-detect, e.g. '192.168.1.0/24')"},
},
},
)
def network_scan(subnet: str = "") -> str:
try:
if not subnet:
# Auto-detect subnet from default route
route = subprocess.run(
["route", "-n", "get", "default"],
capture_output=True, text=True, timeout=5,
)
gateway = ""
for line in route.stdout.split("\n"):
if "gateway" in line:
gateway = line.split(":")[-1].strip()
break
if gateway:
parts = gateway.rsplit(".", 1)
subnet = f"{parts[0]}.0/24"
else:
subnet = "192.168.1.0/24"
# ARP scan
arp = subprocess.run(
["arp", "-a"],
capture_output=True, text=True, timeout=10,
)
devices = []
for line in arp.stdout.split("\n"):
if line.strip() and "(" in line:
parts = line.strip()
# Extract name, IP, MAC
try:
name_part = parts.split("(")[0].strip()
ip = parts.split("(")[1].split(")")[0]
mac_match = parts.split("at ")[-1].split(" ")[0] if "at " in parts else ""
if ip and mac_match and mac_match != "(incomplete)":
name = name_part if name_part and name_part != "?" else ""
devices.append({"name": name, "ip": ip, "mac": mac_match})
except (IndexError, ValueError):
continue
if not devices:
return "No devices found on network. Try: network_scan with a specific subnet."
lines = [f"Network devices ({len(devices)}):"]
for d in devices:
name = f" ({d['name']})" if d['name'] else ""
lines.append(f" • {d['ip']}{name} — MAC: {d['mac']}")
return "\n".join(lines)
except Exception as e:
return f"Network scan error: {e}"
@tool(
name="bonjour_discover",
description="Discover local network services via Bonjour/mDNS (printers, smart TVs, AirPlay, smart home devices)",
parameters={
"type": "object",
"properties": {
"service_type": {"type": "string", "description": "Service type: 'all', 'http', 'airplay', 'printer', 'homekit', 'googlecast', 'hue', 'mqtt' (default: all)"},
"duration": {"type": "integer", "description": "Discovery duration in seconds (default 3)"},
},
},
)
def bonjour_discover(service_type: str = "all", duration: int = 3) -> str:
duration = max(1, min(10, duration))
service_map = {
"http": "_http._tcp.",
"airplay": "_airplay._tcp.",
"printer": "_ipp._tcp.",
"homekit": "_hap._tcp.",
"googlecast": "_googlecast._tcp.",
"hue": "_hue._tcp.",
"mqtt": "_mqtt._tcp.",
"spotify": "_spotify-connect._tcp.",
"ssh": "_ssh._tcp.",
"smb": "_smb._tcp.",
"raop": "_raop._tcp.",
}
if service_type == "all":
services_to_scan = list(service_map.values())
elif service_type in service_map:
services_to_scan = [service_map[service_type]]
else:
services_to_scan = [f"_{service_type}._tcp."]
all_results = []
for svc in services_to_scan:
try:
result = subprocess.run(
["dns-sd", "-B", svc, "local."],
capture_output=True, text=True, timeout=duration + 2,
)
for line in result.stdout.split("\n"):
if line.strip() and not line.startswith("Browsing") and not line.startswith("DATE") and not line.startswith("Timestamp"):
parts = line.strip().split()
if len(parts) >= 7:
name = " ".join(parts[6:])
svc_type = parts[5] if len(parts) > 5 else svc
all_results.append({"name": name, "type": svc_type})
except subprocess.TimeoutExpired:
continue
except Exception:
continue
if not all_results:
return "No Bonjour/mDNS services found."
lines = [f"Discovered services ({len(all_results)}):"]
for r in all_results:
lines.append(f" • {r['name']} ({r['type']})")
return "\n".join(lines)
@tool(
name="wake_on_lan",
description="Wake a device using Wake-on-LAN magic packet (turn on a PC/Mac/NAS on your network)",
parameters={
"type": "object",
"properties": {
"mac": {"type": "string", "description": "Target device MAC address (e.g. AA:BB:CC:DD:EE:FF)"},
"broadcast": {"type": "string", "description": "Broadcast address (default: 255.255.255.255)"},
},
"required": ["mac"],
},
)
def wake_on_lan(mac: str, broadcast: str = "255.255.255.255") -> str:
try:
mac_clean = mac.replace(":", "").replace("-", "").replace(".", "")
if len(mac_clean) != 12:
return f"Invalid MAC address: {mac}"
mac_bytes = bytes.fromhex(mac_clean)
magic_packet = b'\xff' * 6 + mac_bytes * 16
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(magic_packet, (broadcast, 9))
sock.close()
return f"Wake-on-LAN packet sent to {mac}"
except Exception as e:
return f"WoL error: {e}"
@tool(
name="ping_device",
description="Ping a device to check if it's online",
parameters={
"type": "object",
"properties": {
"host": {"type": "string", "description": "IP address or hostname"},
"count": {"type": "integer", "description": "Number of pings (default 3)"},
},
"required": ["host"],
},
)
def ping_device(host: str, count: int = 3) -> str:
count = max(1, min(10, count))
try:
result = subprocess.run(
["ping", "-c", str(count), "-t", "5", host],
capture_output=True, text=True, timeout=count * 6,
)
if result.returncode == 0:
# Extract stats
lines = result.stdout.strip().split("\n")
stats = [l for l in lines if "packets" in l or "avg" in l.lower() or "round-trip" in l]
return f"{host} is ONLINE\n" + "\n".join(stats)
return f"{host} is OFFLINE/unreachable"
except Exception as e:
return f"Ping error: {e}"
# ═══════════════════════════════════════════════════════════════
# HTTP SMART DEVICE CONTROL (Hue, Tapo, Tuya, generic REST)
# ═══════════════════════════════════════════════════════════════
@tool(
name="http_device_request",
description="Send HTTP request to a smart device on your network (REST API). Works with Philips Hue bridges, smart plugs, ESP32, Home Assistant, etc.",
parameters={
"type": "object",
"properties": {
"url": {"type": "string", "description": "Full URL (e.g. http://192.168.1.50/api/lights)"},
"method": {"type": "string", "description": "HTTP method: GET, POST, PUT, DELETE (default: GET)"},
"body": {"type": "string", "description": "Request body as JSON string (optional)"},
"headers": {"type": "string", "description": "Extra headers as JSON object string (optional)"},
},
"required": ["url"],
},
)
def http_device_request(url: str, method: str = "GET", body: str = "", headers: str = "") -> str:
import httpx
# Validate URL is local network only (security)
from urllib.parse import urlparse
parsed = urlparse(url)
hostname = parsed.hostname or ""
local_prefixes = ("192.168.", "10.", "172.16.", "172.17.", "172.18.", "172.19.",
"172.20.", "172.21.", "172.22.", "172.23.", "172.24.", "172.25.",
"172.26.", "172.27.", "172.28.", "172.29.", "172.30.", "172.31.",
"127.", "localhost", "0.0.0.0")
is_local = any(hostname.startswith(p) for p in local_prefixes)
is_mdns = hostname.endswith(".local")
if not is_local and not is_mdns:
return f"BLOCKED: http_device_request only works with local network devices ({hostname} is not local). Use web_search or run_command for external requests."
try:
extra_headers = json.loads(headers) if headers else {}
body_data = json.loads(body) if body else None
with httpx.Client(timeout=10, verify=False) as client:
if method.upper() == "GET":
resp = client.get(url, headers=extra_headers)
elif method.upper() == "POST":
resp = client.post(url, json=body_data, headers=extra_headers)
elif method.upper() == "PUT":
resp = client.put(url, json=body_data, headers=extra_headers)
elif method.upper() == "DELETE":
resp = client.delete(url, headers=extra_headers)
else:
return f"Unknown method: {method}"
result = f"HTTP {resp.status_code}"
try:
result += f"\n{json.dumps(resp.json(), indent=2)[:2000]}"
except Exception:
result += f"\n{resp.text[:2000]}"
return result
except Exception as e:
return f"HTTP request error: {e}"
@tool(
name="hue_control",
description="Control Philips Hue lights. Requires a Hue Bridge on your network.",
parameters={
"type": "object",
"properties": {
"bridge_ip": {"type": "string", "description": "Hue Bridge IP address"},
"api_key": {"type": "string", "description": "Hue API key (press bridge button first, then use 'discover' action to get one)"},
"action": {"type": "string", "description": "'discover' to get API key, 'list' to list lights, 'on'/'off' to control, 'color' to set color, 'brightness' to set brightness"},
"light_id": {"type": "string", "description": "Light ID (from list), or 'all'"},
"value": {"type": "string", "description": "Value for color (hex like 'FF0000') or brightness (0-254)"},
},
"required": ["bridge_ip", "action"],
},
)
def hue_control(bridge_ip: str, action: str, api_key: str = "",
light_id: str = "1", value: str = "") -> str:
import httpx
base = f"http://{bridge_ip}/api"
try:
if action == "discover":
# Press the bridge button first, then call this
resp = httpx.post(f"{base}", json={"devicetype": "jarvis#macbook"}, timeout=5)
data = resp.json()
if isinstance(data, list) and data:
if "success" in data[0]:
key = data[0]["success"]["username"]
return f"Hue API key: {key}\nSave this! Use it for all future Hue commands."
elif "error" in data[0]:
return f"Error: {data[0]['error']['description']} (press the bridge button first!)"
return f"Unexpected response: {data}"
if not api_key:
return "API key required. Use action='discover' first (press bridge button)."
if action == "list":
resp = httpx.get(f"{base}/{api_key}/lights", timeout=5)
lights = resp.json()
lines = [f"Hue Lights ({len(lights)}):"]
for lid, info in lights.items():
state = "ON" if info["state"]["on"] else "OFF"
bri = info["state"].get("bri", "?")
lines.append(f" {lid}: {info['name']}{state} (brightness: {bri})")
return "\n".join(lines)
elif action in ("on", "off"):
state = {"on": action == "on"}
if light_id == "all":
resp = httpx.put(f"{base}/{api_key}/groups/0/action", json=state, timeout=5)
else:
resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5)
return f"Light {light_id}: {action}"
elif action == "brightness":
bri = max(0, min(254, int(value)))
state = {"on": True, "bri": bri}
if light_id == "all":
resp = httpx.put(f"{base}/{api_key}/groups/0/action", json=state, timeout=5)
else:
resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5)
return f"Light {light_id} brightness: {bri}/254"
elif action == "color":
# Convert hex to Hue xy color space (simplified)
r = int(value[0:2], 16) / 255.0
g = int(value[2:4], 16) / 255.0
b = int(value[4:6], 16) / 255.0
# sRGB -> CIE XY (simplified conversion)
X = r * 0.664511 + g * 0.154324 + b * 0.162028
Y = r * 0.283881 + g * 0.668433 + b * 0.047685
Z = r * 0.000088 + g * 0.072310 + b * 0.986039
total = X + Y + Z
if total == 0:
x_val, y_val = 0.0, 0.0
else:
x_val = X / total
y_val = Y / total
state = {"on": True, "xy": [x_val, y_val]}
resp = httpx.put(f"{base}/{api_key}/lights/{light_id}/state", json=state, timeout=5)
return f"Light {light_id} color set to #{value}"
return f"Unknown action: {action}"
except Exception as e:
return f"Hue error: {e}"
# ═══════════════════════════════════════════════════════════════
# HOMEKIT / APPLE HOME (via Shortcuts bridge)
# ═══════════════════════════════════════════════════════════════
@tool(
name="homekit_control",
description="Control Apple HomeKit devices via Siri Shortcuts. Can control any device in your Apple Home app — lights, locks, thermostats, cameras, plugs, scenes.",
parameters={
"type": "object",
"properties": {
"shortcut_name": {"type": "string", "description": "Name of the Apple Shortcut to run (create shortcuts in Shortcuts.app that control your HomeKit devices)"},
"input": {"type": "string", "description": "Input to pass to the shortcut (optional)"},
},
"required": ["shortcut_name"],
},
)
def homekit_control(shortcut_name: str, input: str = "") -> str:
try:
cmd = ["shortcuts", "run", shortcut_name]
if input:
cmd.extend(["--input-text", input])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode == 0:
output = result.stdout.strip()
return f"Shortcut '{shortcut_name}' executed." + (f"\nOutput: {output}" if output else "")
return f"Shortcut error: {result.stderr.strip()}"
except subprocess.TimeoutExpired:
return f"Shortcut '{shortcut_name}' timed out (may still be running)"
except Exception as e:
return f"Error: {e}"
@tool(
name="homekit_list_shortcuts",
description="List all available Apple Shortcuts (includes HomeKit automations you've set up)",
parameters={"type": "object", "properties": {}},
)
def homekit_list_shortcuts() -> str:
try:
result = subprocess.run(
["shortcuts", "list"],
capture_output=True, text=True, timeout=10,
)
shortcuts = result.stdout.strip().split("\n")
shortcuts = [s.strip() for s in shortcuts if s.strip()]
if not shortcuts:
return "No shortcuts found. Create them in Shortcuts.app."
# Group by likely category
homekit = [s for s in shortcuts if any(k in s.lower() for k in
["light", "lock", "thermostat", "scene", "home", "room",
"plug", "switch", "fan", "door", "garage", "blind",
"curtain", "ac", "heater", "alarm"])]
other = [s for s in shortcuts if s not in homekit]
lines = [f"Apple Shortcuts ({len(shortcuts)} total):"]
if homekit:
lines.append(f"\n Home/IoT ({len(homekit)}):")
for s in homekit:
lines.append(f" • {s}")
if other:
lines.append(f"\n Other ({len(other)}):")
for s in other[:20]:
lines.append(f" • {s}")
if len(other) > 20:
lines.append(f" ... and {len(other) - 20} more")
return "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@tool(
name="siri_command",
description="Execute a natural language Siri command — controls any Siri-capable device or HomeKit accessory",
parameters={
"type": "object",
"properties": {
"command": {"type": "string", "description": "Natural language command (e.g. 'turn off living room lights', 'set thermostat to 72', 'lock the front door')"},
},
"required": ["command"],
},
)
def siri_command(command: str) -> str:
"""Route a command through Siri via AppleScript."""
safe_cmd = command.replace('"', '\\"')
try:
# Use AppleScript to invoke Siri
result = subprocess.run([
"osascript", "-e",
f'''
tell application "System Events"
-- Trigger Siri
key code 49 using {{command down}}
delay 1.5
keystroke "{safe_cmd}"
delay 0.3
keystroke return
end tell
'''
], capture_output=True, text=True, timeout=10)
return f"Siri command sent: {command}"
except Exception as e:
return f"Siri error: {e}"
# ═══════════════════════════════════════════════════════════════
# MQTT IoT CONTROL
# ═══════════════════════════════════════════════════════════════
@tool(
name="mqtt_publish",
description="Publish a message to an MQTT broker — controls IoT devices (smart plugs, ESP32, Tasmota, sensors). Requires mosquitto CLI or paho-mqtt.",
parameters={
"type": "object",
"properties": {
"broker": {"type": "string", "description": "MQTT broker address (e.g. '192.168.1.100' or 'localhost')"},
"topic": {"type": "string", "description": "MQTT topic (e.g. 'cmnd/plug1/POWER', 'home/light/bedroom')"},
"message": {"type": "string", "description": "Message payload (e.g. 'ON', 'OFF', '{\"brightness\": 50}')"},
"port": {"type": "integer", "description": "Broker port (default 1883)"},
},
"required": ["broker", "topic", "message"],
},
)
def mqtt_publish(broker: str, topic: str, message: str, port: int = 1883) -> str:
# Try mosquitto_pub first (CLI)
try:
result = subprocess.run(
["mosquitto_pub", "-h", broker, "-p", str(port), "-t", topic, "-m", message],
capture_output=True, text=True, timeout=10,
)
if result.returncode == 0:
return f"MQTT published: {topic}{message}"
return f"MQTT error: {result.stderr.strip()}"
except FileNotFoundError:
pass
# Fallback: try paho-mqtt Python library
try:
import paho.mqtt.publish as publish
publish.single(topic, payload=message, hostname=broker, port=port)
return f"MQTT published: {topic}{message}"
except ImportError:
return "MQTT not available. Install: brew install mosquitto OR pip install paho-mqtt"
except Exception as e:
return f"MQTT error: {e}"
@tool(
name="mqtt_subscribe",
description="Subscribe to an MQTT topic and read one message (check sensor value, device status, etc.)",
parameters={
"type": "object",
"properties": {
"broker": {"type": "string", "description": "MQTT broker address"},
"topic": {"type": "string", "description": "MQTT topic to subscribe to (e.g. 'stat/plug1/POWER', 'home/sensor/#')"},
"port": {"type": "integer", "description": "Broker port (default 1883)"},
"timeout": {"type": "integer", "description": "Wait timeout in seconds (default 5)"},
},
"required": ["broker", "topic"],
},
)
def mqtt_subscribe(broker: str, topic: str, port: int = 1883, timeout: int = 5) -> str:
timeout = max(1, min(30, timeout))
try:
result = subprocess.run(
["mosquitto_sub", "-h", broker, "-p", str(port), "-t", topic,
"-C", "1", "-W", str(timeout)],
capture_output=True, text=True, timeout=timeout + 5,
)
if result.stdout.strip():
return f"MQTT [{topic}]: {result.stdout.strip()}"
return f"No message received on {topic} within {timeout}s"
except FileNotFoundError:
return "Install mosquitto: brew install mosquitto"
except subprocess.TimeoutExpired:
return f"No message on {topic} within {timeout}s"
except Exception as e:
return f"MQTT error: {e}"
# ═══════════════════════════════════════════════════════════════
# DEVICE REGISTRY TOOLS (persistent storage)
# ═══════════════════════════════════════════════════════════════
@tool(
name="register_device",
description="Register a smart device so JARVIS remembers it. Supports any protocol: bluetooth, wifi, mqtt, homekit, http, ir",
parameters={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Friendly device name (e.g. 'Living Room Light', 'Bedroom Speaker')"},
"device_type": {"type": "string", "description": "Type: 'light', 'plug', 'speaker', 'tv', 'thermostat', 'lock', 'sensor', 'camera', 'appliance', 'computer', 'phone', 'other'"},
"protocol": {"type": "string", "description": "Protocol: 'bluetooth', 'wifi', 'mqtt', 'homekit', 'http', 'ir', 'zigbee', 'zwave'"},
"address": {"type": "string", "description": "IP address or hostname (for wifi/http devices)"},
"mac_address": {"type": "string", "description": "MAC address (for bluetooth/WoL)"},
"port": {"type": "integer", "description": "Port number (optional)"},
"room": {"type": "string", "description": "Room location (e.g. 'bedroom', 'living room', 'kitchen')"},
"config": {"type": "string", "description": "Extra config as JSON string (API keys, MQTT topics, etc.)"},
},
"required": ["name", "device_type", "protocol"],
},
)
def register_device(name: str, device_type: str, protocol: str,
address: str = "", mac_address: str = "",
port: int = 0, room: str = "", config: str = "") -> str:
from device_registry import DeviceRegistry
reg = DeviceRegistry()
cfg = json.loads(config) if config else {}
device_id = reg.add_device(name, device_type, protocol, address,
port, mac_address, cfg, room)
return f"Device registered: #{device_id} '{name}' ({device_type}/{protocol}) in {room or 'unassigned'}"
@tool(
name="list_devices",
description="List all registered smart devices. Filter by type, room, or protocol.",
parameters={
"type": "object",
"properties": {
"device_type": {"type": "string", "description": "Filter by type (e.g. 'light', 'speaker')"},
"room": {"type": "string", "description": "Filter by room (e.g. 'bedroom')"},
"protocol": {"type": "string", "description": "Filter by protocol (e.g. 'bluetooth', 'mqtt')"},
},
},
)
def list_devices(device_type: str = "", room: str = "", protocol: str = "") -> str:
from device_registry import DeviceRegistry
reg = DeviceRegistry()
devices = reg.get_devices(
device_type=device_type or None,
room=room or None,
protocol=protocol or None,
)
if not devices:
return "No registered devices." + (" Try different filters." if any([device_type, room, protocol]) else " Use register_device to add one.")
lines = [f"Registered devices ({len(devices)}):"]
current_room = None
for d in devices:
r = d["room"] or "Unassigned"
if r != current_room:
current_room = r
lines.append(f"\n 📍 {current_room}:")
status = d.get("status", "unknown")
status_icon = {"online": "🟢", "offline": "🔴", "registered": "⚪", "unknown": "⚪"}.get(status, "⚪")
addr = d.get("address") or d.get("mac_address") or ""
addr_str = f" @ {addr}" if addr else ""
lines.append(f" {status_icon} #{d['id']}: {d['name']} ({d['device_type']}/{d['protocol']}){addr_str}")
return "\n".join(lines)
@tool(
name="find_device",
description="Search for a registered device by name, room, or type",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query (device name, room, or type)"},
},
"required": ["query"],
},
)
def find_device(query: str) -> str:
from device_registry import DeviceRegistry
reg = DeviceRegistry()
devices = reg.search_devices(query)
if not devices:
return f"No devices found matching '{query}'"
lines = [f"Found {len(devices)} device(s):"]
for d in devices:
room = f" in {d['room']}" if d.get("room") else ""
addr = d.get("address") or d.get("mac_address") or ""
lines.append(f" #{d['id']}: {d['name']} ({d['device_type']}/{d['protocol']}){room}")
if addr:
lines.append(f" Address: {addr}")
if d.get("config"):
lines.append(f" Config: {json.dumps(d['config'])[:100]}")
return "\n".join(lines)
@tool(
name="remove_device",
description="Remove a device from the registry",
parameters={
"type": "object",
"properties": {
"device_id": {"type": "integer", "description": "Device ID to remove"},
},
"required": ["device_id"],
},
)
def remove_device(device_id: int) -> str:
from device_registry import DeviceRegistry
reg = DeviceRegistry()
device = reg.get_device(device_id)
if not device:
return f"Device #{device_id} not found"
reg.remove_device(device_id)
return f"Removed device #{device_id}: {device['name']}"
@tool(
name="control_device",
description="Send a command to a registered device. Automatically uses the right protocol. Works with any registered device.",
parameters={
"type": "object",
"properties": {
"device_name": {"type": "string", "description": "Device name or ID"},
"action": {"type": "string", "description": "Action: 'on', 'off', 'toggle', 'status', 'connect', 'disconnect', or custom command"},
"value": {"type": "string", "description": "Value for the action (brightness, color, temperature, etc.)"},
},
"required": ["device_name", "action"],
},
)
def control_device(device_name: str, action: str, value: str = "") -> str:
from device_registry import DeviceRegistry
reg = DeviceRegistry()
# Find device by name or ID
device = None
try:
device_id = int(device_name)
device = reg.get_device(device_id)
except ValueError:
device = reg.get_device_by_name(device_name)
if not device:
matches = reg.search_devices(device_name)
if len(matches) == 1:
device = matches[0]
elif len(matches) > 1:
names = ", ".join(f"#{m['id']}: {m['name']}" for m in matches)
return f"Multiple devices match '{device_name}': {names}. Be more specific or use device ID."
if not device:
return f"Device '{device_name}' not found. Use list_devices to see registered devices."
protocol = device["protocol"]
address = device.get("address", "")
mac = device.get("mac_address", "")
config = device.get("config", {})
# ── Bluetooth devices ──
if protocol == "bluetooth":
if not mac:
return f"No MAC address for {device['name']}. Update device config."
if action in ("connect", "on"):
return bluetooth_connect(mac)
elif action in ("disconnect", "off"):
return bluetooth_disconnect(mac)
elif action == "status":
return bluetooth_info(mac)
else:
return f"Bluetooth action '{action}' not supported. Use: connect, disconnect, status"
# ── WiFi/HTTP devices ──
elif protocol in ("wifi", "http"):
if not address:
return f"No IP address for {device['name']}. Update device config."
port = device.get("port", 80)
base_url = f"http://{address}:{port}" if port != 80 else f"http://{address}"
# Check for custom API endpoints in config
endpoints = config.get("endpoints", {})
if action in endpoints:
url = f"{base_url}{endpoints[action]}"
return http_device_request(url, method=endpoints.get(f"{action}_method", "POST"))
# Generic REST control
if action in ("on", "off", "toggle"):
# Try common smart plug/switch APIs
for path in ["/relay/0", "/switch", "/api/relay", f"/cm?cmnd=Power%20{action.upper()}"]:
try:
result = http_device_request(f"{base_url}{path}", method="POST",
body=json.dumps({"state": action}))
if "200" in result or "ok" in result.lower():
reg.update_device_status(device["id"], "online")
return f"{device['name']}: {action}{result[:100]}"
except Exception:
continue
return f"Could not find working API for {device['name']}. Register endpoints in config."
elif action == "status":
return http_device_request(f"{base_url}/status", method="GET")
else:
# Pass through as API call
return http_device_request(f"{base_url}/{action}", method="POST",
body=json.dumps({"value": value}) if value else "")
# ── MQTT devices ──
elif protocol == "mqtt":
broker = config.get("broker", address or "localhost")
mqtt_port = config.get("mqtt_port", 1883)
topic_prefix = config.get("topic_prefix", f"cmnd/{device['name'].lower().replace(' ', '_')}")
if action in ("on", "off"):
topic = config.get("power_topic", f"{topic_prefix}/POWER")
return mqtt_publish(broker, topic, action.upper(), mqtt_port)
elif action == "toggle":
topic = config.get("power_topic", f"{topic_prefix}/POWER")
return mqtt_publish(broker, topic, "TOGGLE", mqtt_port)
elif action == "status":
topic = config.get("status_topic", f"stat/{device['name'].lower().replace(' ', '_')}/POWER")
return mqtt_subscribe(broker, topic, mqtt_port, timeout=3)
elif action == "brightness" and value:
topic = config.get("dimmer_topic", f"{topic_prefix}/Dimmer")
return mqtt_publish(broker, topic, value, mqtt_port)
elif action == "color" and value:
topic = config.get("color_topic", f"{topic_prefix}/Color")
return mqtt_publish(broker, topic, value, mqtt_port)
else:
# Custom command
topic = f"{topic_prefix}/{action}"
return mqtt_publish(broker, topic, value or action.upper(), mqtt_port)
# ── HomeKit devices ──
elif protocol == "homekit":
shortcut = config.get("shortcut", f"{device['name']} {action.title()}")
return homekit_control(shortcut, input=value)
# ── IR devices ──
elif protocol == "ir":
# Broadlink or other IR blaster
ir_device = config.get("ir_device", address)
code = config.get("codes", {}).get(action, "")
if not code:
return f"No IR code for '{action}' on {device['name']}. Add codes to device config."
if ir_device:
# Try broadlink_cli
try:
result = subprocess.run(
["broadlink_cli", "--device", ir_device, "--send", code],
capture_output=True, text=True, timeout=10,
)
return f"IR sent: {action} to {device['name']}"
except FileNotFoundError:
return "Install broadlink_cli: pip install broadlink"
return f"No IR blaster configured for {device['name']}"
else:
return f"Protocol '{protocol}' not yet supported for direct control. Use http_device_request or mqtt_publish directly."
# ═══════════════════════════════════════════════════════════════
# SMART HOME SCENES & ROUTINES
# ═══════════════════════════════════════════════════════════════
@tool(
name="smart_home_scene",
description="Execute a smart home scene — controls multiple devices at once. Built-in scenes: 'movie', 'sleep', 'morning', 'party', 'work', 'away'. Or run a custom Apple Shortcut scene.",
parameters={
"type": "object",
"properties": {
"scene": {"type": "string", "description": "Scene name: 'movie', 'sleep', 'morning', 'party', 'work', 'away', or a custom Shortcut name"},
},
"required": ["scene"],
},
)
def smart_home_scene(scene: str) -> str:
built_in = {
"movie": [
("set_volume", {"level": 40}),
("toggle_dark_mode", {"enable": True}),
("set_brightness", {"level": 0.2}),
("do_not_disturb", {"enable": True}),
],
"sleep": [
("set_volume", {"level": 0}),
("do_not_disturb", {"enable": True}),
("set_brightness", {"level": 0.0}),
],
"morning": [
("set_brightness", {"level": 0.8}),
("toggle_dark_mode", {"enable": False}),
("do_not_disturb", {"enable": False}),
("set_volume", {"level": 50}),
],
"work": [
("do_not_disturb", {"enable": True}),
("set_brightness", {"level": 0.7}),
("toggle_dark_mode", {"enable": True}),
],
"party": [
("set_volume", {"level": 80}),
("set_brightness", {"level": 1.0}),
("do_not_disturb", {"enable": False}),
],
"away": [
("lock_screen", {}),
("do_not_disturb", {"enable": True}),
],
}
if scene.lower() in built_in:
from tools import TOOL_REGISTRY
results = []
for tool_name, args in built_in[scene.lower()]:
if tool_name in TOOL_REGISTRY:
try:
r = TOOL_REGISTRY[tool_name]["function"](**args)
results.append(f" ✓ {tool_name}: {r}")
except Exception as e:
results.append(f" ✗ {tool_name}: {e}")
return f"Scene '{scene}' activated:\n" + "\n".join(results)
else:
# Try as Apple Shortcut
return homekit_control(scene)
@tool(
name="airdrop_file",
description="Share a file via AirDrop to nearby Apple devices",
parameters={
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "Path to the file to share"},
},
"required": ["file_path"],
},
)
def airdrop_file(file_path: str) -> str:
path = os.path.expanduser(file_path)
if not os.path.exists(path):
return f"File not found: {path}"
try:
subprocess.run([
"osascript", "-e",
f'''
tell application "Finder"
set theFile to POSIX file "{path}" as alias
set theWindow to make new Finder window
set target of theWindow to theFile
end tell
-- Open sharing menu
tell application "System Events"
keystroke "r" using {{command down, shift down}}
delay 1
end tell
'''
], capture_output=True, text=True, timeout=10)
return f"AirDrop sharing opened for: {path}"
except Exception as e:
return f"AirDrop error: {e}"
@tool(
name="find_my_devices",
description="List devices from Find My (Apple's device tracking). Shows your Apple devices and their last known locations.",
parameters={"type": "object", "properties": {}},
)
def find_my_devices() -> str:
try:
# Find My data is cached locally
findmy_dir = os.path.expanduser("~/Library/Caches/com.apple.findmy.fmipcore/Items.data")
devices_dir = os.path.expanduser("~/Library/Caches/com.apple.findmy.fmipcore/Devices.data")
results = []
for path, label in [(devices_dir, "Devices"), (findmy_dir, "Items")]:
if os.path.exists(path):
try:
with open(path, "r") as f:
data = json.load(f)
for item in data:
name = item.get("name", item.get("serialNumber", "Unknown"))
loc = item.get("location", {})
battery = item.get("batteryLevel", "?")
lat = loc.get("latitude", "?")
lon = loc.get("longitude", "?")
results.append(f" • {name} — Battery: {battery} — Location: {lat}, {lon}")
except Exception:
continue
if results:
return "Find My Devices:\n" + "\n".join(results)
return "No Find My data found. Ensure Find My is enabled and signed in."
except Exception as e:
return f"Find My error: {e}"