JARVIS / tests /test_device_control.py
Khanna, Videh Rakesh Rakesh
feat: add device control, app automation, tests, and other updates
8576f52
"""Tests for JARVIS device registry and device control tools."""
import json
import os
import sqlite3
import tempfile
import unittest
from unittest.mock import patch, MagicMock
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class TestDeviceRegistry(unittest.TestCase):
"""Test persistent device registry."""
def setUp(self):
self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
self.tmp.close()
import device_registry
self._orig_path = device_registry.DB_PATH
device_registry.DB_PATH = self.tmp.name
self.reg = device_registry.DeviceRegistry()
def tearDown(self):
import device_registry
device_registry.DB_PATH = self._orig_path
os.unlink(self.tmp.name)
def test_add_device(self):
dev_id = self.reg.add_device(
name="Living Room Light",
device_type="light",
protocol="mqtt",
address="192.168.1.50",
room="living room",
)
self.assertIsInstance(dev_id, int)
self.assertGreater(dev_id, 0)
def test_get_device(self):
dev_id = self.reg.add_device(
name="Bedroom Speaker",
device_type="speaker",
protocol="bluetooth",
mac_address="AA:BB:CC:DD:EE:FF",
room="bedroom",
)
device = self.reg.get_device(dev_id)
self.assertIsNotNone(device)
self.assertEqual(device["name"], "Bedroom Speaker")
self.assertEqual(device["device_type"], "speaker")
self.assertEqual(device["protocol"], "bluetooth")
self.assertEqual(device["mac_address"], "AA:BB:CC:DD:EE:FF")
self.assertEqual(device["room"], "bedroom")
def test_get_nonexistent_device(self):
device = self.reg.get_device(9999)
self.assertIsNone(device)
def test_get_device_by_name(self):
self.reg.add_device(name="Kitchen Plug", device_type="plug", protocol="wifi")
device = self.reg.get_device_by_name("Kitchen Plug")
self.assertIsNotNone(device)
self.assertEqual(device["name"], "Kitchen Plug")
def test_get_device_by_name_case_insensitive(self):
self.reg.add_device(name="Kitchen Plug", device_type="plug", protocol="wifi")
device = self.reg.get_device_by_name("kitchen plug")
self.assertIsNotNone(device)
def test_get_devices_by_type(self):
self.reg.add_device(name="Light 1", device_type="light", protocol="mqtt")
self.reg.add_device(name="Light 2", device_type="light", protocol="hue")
self.reg.add_device(name="Plug 1", device_type="plug", protocol="mqtt")
lights = self.reg.get_devices(device_type="light")
self.assertEqual(len(lights), 2)
def test_get_devices_by_room(self):
self.reg.add_device(name="Light A", device_type="light", protocol="mqtt", room="bedroom")
self.reg.add_device(name="Light B", device_type="light", protocol="mqtt", room="kitchen")
bedroom = self.reg.get_devices(room="bedroom")
self.assertEqual(len(bedroom), 1)
self.assertEqual(bedroom[0]["name"], "Light A")
def test_get_devices_by_protocol(self):
self.reg.add_device(name="Dev 1", device_type="plug", protocol="mqtt")
self.reg.add_device(name="Dev 2", device_type="plug", protocol="wifi")
mqtt_devs = self.reg.get_devices(protocol="mqtt")
self.assertEqual(len(mqtt_devs), 1)
def test_update_device_status(self):
dev_id = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi")
self.reg.update_device_status(dev_id, "online")
device = self.reg.get_device(dev_id)
self.assertEqual(device["status"], "online")
def test_remove_device(self):
dev_id = self.reg.add_device(name="Delete Me", device_type="plug", protocol="wifi")
self.reg.remove_device(dev_id)
device = self.reg.get_device(dev_id)
self.assertIsNone(device)
def test_search_devices(self):
self.reg.add_device(name="Living Room TV", device_type="tv", protocol="wifi")
self.reg.add_device(name="Bedroom Light", device_type="light", protocol="mqtt")
results = self.reg.search_devices("living")
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["name"], "Living Room TV")
def test_device_config(self):
config = {"broker": "192.168.1.100", "topic_prefix": "cmnd/plug1"}
dev_id = self.reg.add_device(
name="Smart Plug",
device_type="plug",
protocol="mqtt",
config=config,
)
device = self.reg.get_device(dev_id)
self.assertEqual(device["config"]["broker"], "192.168.1.100")
self.assertEqual(device["config"]["topic_prefix"], "cmnd/plug1")
def test_device_commands(self):
dev_id = self.reg.add_device(name="IR TV", device_type="tv", protocol="ir")
self.reg.add_device_command(dev_id, "power", "0x20DF10EF", "Power toggle")
self.reg.add_device_command(dev_id, "vol_up", "0x20DF40BF", "Volume up")
commands = self.reg.get_device_commands(dev_id)
self.assertEqual(len(commands), 2)
self.assertEqual(commands[0]["command_name"], "power")
def test_upsert_device(self):
"""Adding device with same name+protocol should update, not duplicate."""
id1 = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi", address="1.1.1.1")
id2 = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi", address="2.2.2.2")
# Should be the same device, updated
all_devices = self.reg.get_devices()
plugs = [d for d in all_devices if d["name"] == "Plug"]
self.assertEqual(len(plugs), 1)
self.assertEqual(plugs[0]["address"], "2.2.2.2")
class TestDeviceControlTools(unittest.TestCase):
"""Test device control tool functions."""
def test_tools_registered(self):
from tools import TOOL_REGISTRY
import tools.device_control
expected = [
"bluetooth_scan", "bluetooth_pair", "bluetooth_connect",
"bluetooth_disconnect", "bluetooth_info",
"network_scan", "bonjour_discover", "wake_on_lan", "ping_device",
"http_device_request", "hue_control",
"homekit_control", "homekit_list_shortcuts", "siri_command",
"mqtt_publish", "mqtt_subscribe",
"register_device", "list_devices", "find_device", "remove_device",
"control_device", "smart_home_scene",
"airdrop_file", "find_my_devices",
]
for name in expected:
self.assertIn(name, TOOL_REGISTRY, f"Tool '{name}' not registered")
def test_wake_on_lan_valid_mac(self):
"""WoL should construct and send magic packet without error."""
from tools.device_control import wake_on_lan
# Mock socket to avoid actual network call
with patch("tools.device_control.socket") as mock_socket:
mock_sock = MagicMock()
mock_socket.socket.return_value = mock_sock
mock_socket.AF_INET = 2
mock_socket.SOCK_DGRAM = 2
mock_socket.SOL_SOCKET = 1
mock_socket.SO_BROADCAST = 6
result = wake_on_lan("AA:BB:CC:DD:EE:FF")
self.assertIn("sent", result)
mock_sock.sendto.assert_called_once()
def test_wake_on_lan_invalid_mac(self):
from tools.device_control import wake_on_lan
result = wake_on_lan("invalid")
self.assertIn("Invalid MAC", result)
def test_http_device_request_blocks_external(self):
"""Should block requests to non-local addresses."""
from tools.device_control import http_device_request
result = http_device_request("http://google.com/api")
self.assertIn("BLOCKED", result)
def test_http_device_request_allows_local(self):
"""Should allow local addresses (even if they fail to connect)."""
from tools.device_control import http_device_request
result = http_device_request("http://192.168.1.999/status")
# Will fail to connect, but shouldn't be BLOCKED
self.assertNotIn("BLOCKED", result)
def test_http_device_request_allows_mdns(self):
"""Should allow .local addresses."""
from tools.device_control import http_device_request
result = http_device_request("http://mydevice.local/status")
self.assertNotIn("BLOCKED", result)
def test_register_and_list_devices(self):
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp.close()
import device_registry
orig = device_registry.DB_PATH
device_registry.DB_PATH = tmp.name
try:
from tools.device_control import register_device, list_devices
result = register_device("Test Light", "light", "mqtt", room="bedroom")
self.assertIn("registered", result)
self.assertIn("Test Light", result)
listing = list_devices()
self.assertIn("Test Light", listing)
self.assertIn("bedroom", listing.lower())
finally:
device_registry.DB_PATH = orig
os.unlink(tmp.name)
def test_find_device_tool(self):
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp.close()
import device_registry
orig = device_registry.DB_PATH
device_registry.DB_PATH = tmp.name
try:
from tools.device_control import register_device, find_device
register_device("Living Room TV", "tv", "wifi", address="192.168.1.50")
result = find_device("TV")
self.assertIn("Living Room TV", result)
finally:
device_registry.DB_PATH = orig
os.unlink(tmp.name)
def test_remove_device_tool(self):
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp.close()
import device_registry
orig = device_registry.DB_PATH
device_registry.DB_PATH = tmp.name
try:
from tools.device_control import register_device, remove_device, list_devices
register_device("To Remove", "plug", "wifi")
# Find the device ID
reg = device_registry.DeviceRegistry()
dev = reg.get_device_by_name("To Remove")
result = remove_device(dev["id"])
self.assertIn("Removed", result)
listing = list_devices()
self.assertIn("No registered devices", listing)
finally:
device_registry.DB_PATH = orig
os.unlink(tmp.name)
def test_smart_home_scene_movie(self):
"""Test built-in scene execution (mocked system calls)."""
from tools.device_control import smart_home_scene
with patch("tools.system_control.subprocess") as mock:
mock.run.return_value = MagicMock(returncode=0, stdout="", stderr="")
result = smart_home_scene("movie")
self.assertIn("Scene 'movie' activated", result)
def test_smart_home_scene_unknown(self):
"""Unknown scene should try as Apple Shortcut."""
from tools.device_control import smart_home_scene
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
result = smart_home_scene("my_custom_scene")
self.assertIn("executed", result.lower() if "executed" in result.lower()
else result)
def test_control_device_not_found(self):
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp.close()
import device_registry
orig = device_registry.DB_PATH
device_registry.DB_PATH = tmp.name
try:
device_registry.DeviceRegistry() # init db
from tools.device_control import control_device
result = control_device("nonexistent", "on")
self.assertIn("not found", result)
finally:
device_registry.DB_PATH = orig
os.unlink(tmp.name)
@patch("subprocess.run")
def test_control_device_bluetooth(self, mock_run):
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp.close()
import device_registry
orig = device_registry.DB_PATH
device_registry.DB_PATH = tmp.name
try:
reg = device_registry.DeviceRegistry()
reg.add_device("Speaker", "speaker", "bluetooth", mac_address="AA:BB:CC:DD:EE:FF")
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
from tools.device_control import control_device
result = control_device("Speaker", "connect")
self.assertIn("Connected", result)
finally:
device_registry.DB_PATH = orig
os.unlink(tmp.name)
class TestToolCount(unittest.TestCase):
"""Verify total tool count after all tools loaded."""
def test_total_tools_count(self):
from tools import TOOL_REGISTRY
import tools.builtin
import tools.system_control
import tools.user_tools
import tools.vscode_tools
import tools.device_control
# Should have a large number of tools
self.assertGreaterEqual(len(TOOL_REGISTRY), 50,
f"Expected 50+ tools, got {len(TOOL_REGISTRY)}: {sorted(TOOL_REGISTRY.keys())}")
if __name__ == "__main__":
unittest.main()