"""Tests for JARVIS device registry and device control tools.""" import json import os import sqlite3 import tempfile import unittest from unittest.mock import patch, MagicMock import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) class TestDeviceRegistry(unittest.TestCase): """Test persistent device registry.""" def setUp(self): self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) self.tmp.close() import device_registry self._orig_path = device_registry.DB_PATH device_registry.DB_PATH = self.tmp.name self.reg = device_registry.DeviceRegistry() def tearDown(self): import device_registry device_registry.DB_PATH = self._orig_path os.unlink(self.tmp.name) def test_add_device(self): dev_id = self.reg.add_device( name="Living Room Light", device_type="light", protocol="mqtt", address="192.168.1.50", room="living room", ) self.assertIsInstance(dev_id, int) self.assertGreater(dev_id, 0) def test_get_device(self): dev_id = self.reg.add_device( name="Bedroom Speaker", device_type="speaker", protocol="bluetooth", mac_address="AA:BB:CC:DD:EE:FF", room="bedroom", ) device = self.reg.get_device(dev_id) self.assertIsNotNone(device) self.assertEqual(device["name"], "Bedroom Speaker") self.assertEqual(device["device_type"], "speaker") self.assertEqual(device["protocol"], "bluetooth") self.assertEqual(device["mac_address"], "AA:BB:CC:DD:EE:FF") self.assertEqual(device["room"], "bedroom") def test_get_nonexistent_device(self): device = self.reg.get_device(9999) self.assertIsNone(device) def test_get_device_by_name(self): self.reg.add_device(name="Kitchen Plug", device_type="plug", protocol="wifi") device = self.reg.get_device_by_name("Kitchen Plug") self.assertIsNotNone(device) self.assertEqual(device["name"], "Kitchen Plug") def test_get_device_by_name_case_insensitive(self): self.reg.add_device(name="Kitchen Plug", device_type="plug", protocol="wifi") device = self.reg.get_device_by_name("kitchen plug") self.assertIsNotNone(device) def test_get_devices_by_type(self): self.reg.add_device(name="Light 1", device_type="light", protocol="mqtt") self.reg.add_device(name="Light 2", device_type="light", protocol="hue") self.reg.add_device(name="Plug 1", device_type="plug", protocol="mqtt") lights = self.reg.get_devices(device_type="light") self.assertEqual(len(lights), 2) def test_get_devices_by_room(self): self.reg.add_device(name="Light A", device_type="light", protocol="mqtt", room="bedroom") self.reg.add_device(name="Light B", device_type="light", protocol="mqtt", room="kitchen") bedroom = self.reg.get_devices(room="bedroom") self.assertEqual(len(bedroom), 1) self.assertEqual(bedroom[0]["name"], "Light A") def test_get_devices_by_protocol(self): self.reg.add_device(name="Dev 1", device_type="plug", protocol="mqtt") self.reg.add_device(name="Dev 2", device_type="plug", protocol="wifi") mqtt_devs = self.reg.get_devices(protocol="mqtt") self.assertEqual(len(mqtt_devs), 1) def test_update_device_status(self): dev_id = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi") self.reg.update_device_status(dev_id, "online") device = self.reg.get_device(dev_id) self.assertEqual(device["status"], "online") def test_remove_device(self): dev_id = self.reg.add_device(name="Delete Me", device_type="plug", protocol="wifi") self.reg.remove_device(dev_id) device = self.reg.get_device(dev_id) self.assertIsNone(device) def test_search_devices(self): self.reg.add_device(name="Living Room TV", device_type="tv", protocol="wifi") self.reg.add_device(name="Bedroom Light", device_type="light", protocol="mqtt") results = self.reg.search_devices("living") self.assertEqual(len(results), 1) self.assertEqual(results[0]["name"], "Living Room TV") def test_device_config(self): config = {"broker": "192.168.1.100", "topic_prefix": "cmnd/plug1"} dev_id = self.reg.add_device( name="Smart Plug", device_type="plug", protocol="mqtt", config=config, ) device = self.reg.get_device(dev_id) self.assertEqual(device["config"]["broker"], "192.168.1.100") self.assertEqual(device["config"]["topic_prefix"], "cmnd/plug1") def test_device_commands(self): dev_id = self.reg.add_device(name="IR TV", device_type="tv", protocol="ir") self.reg.add_device_command(dev_id, "power", "0x20DF10EF", "Power toggle") self.reg.add_device_command(dev_id, "vol_up", "0x20DF40BF", "Volume up") commands = self.reg.get_device_commands(dev_id) self.assertEqual(len(commands), 2) self.assertEqual(commands[0]["command_name"], "power") def test_upsert_device(self): """Adding device with same name+protocol should update, not duplicate.""" id1 = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi", address="1.1.1.1") id2 = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi", address="2.2.2.2") # Should be the same device, updated all_devices = self.reg.get_devices() plugs = [d for d in all_devices if d["name"] == "Plug"] self.assertEqual(len(plugs), 1) self.assertEqual(plugs[0]["address"], "2.2.2.2") class TestDeviceControlTools(unittest.TestCase): """Test device control tool functions.""" def test_tools_registered(self): from tools import TOOL_REGISTRY import tools.device_control expected = [ "bluetooth_scan", "bluetooth_pair", "bluetooth_connect", "bluetooth_disconnect", "bluetooth_info", "network_scan", "bonjour_discover", "wake_on_lan", "ping_device", "http_device_request", "hue_control", "homekit_control", "homekit_list_shortcuts", "siri_command", "mqtt_publish", "mqtt_subscribe", "register_device", "list_devices", "find_device", "remove_device", "control_device", "smart_home_scene", "airdrop_file", "find_my_devices", ] for name in expected: self.assertIn(name, TOOL_REGISTRY, f"Tool '{name}' not registered") def test_wake_on_lan_valid_mac(self): """WoL should construct and send magic packet without error.""" from tools.device_control import wake_on_lan # Mock socket to avoid actual network call with patch("tools.device_control.socket") as mock_socket: mock_sock = MagicMock() mock_socket.socket.return_value = mock_sock mock_socket.AF_INET = 2 mock_socket.SOCK_DGRAM = 2 mock_socket.SOL_SOCKET = 1 mock_socket.SO_BROADCAST = 6 result = wake_on_lan("AA:BB:CC:DD:EE:FF") self.assertIn("sent", result) mock_sock.sendto.assert_called_once() def test_wake_on_lan_invalid_mac(self): from tools.device_control import wake_on_lan result = wake_on_lan("invalid") self.assertIn("Invalid MAC", result) def test_http_device_request_blocks_external(self): """Should block requests to non-local addresses.""" from tools.device_control import http_device_request result = http_device_request("http://google.com/api") self.assertIn("BLOCKED", result) def test_http_device_request_allows_local(self): """Should allow local addresses (even if they fail to connect).""" from tools.device_control import http_device_request result = http_device_request("http://192.168.1.999/status") # Will fail to connect, but shouldn't be BLOCKED self.assertNotIn("BLOCKED", result) def test_http_device_request_allows_mdns(self): """Should allow .local addresses.""" from tools.device_control import http_device_request result = http_device_request("http://mydevice.local/status") self.assertNotIn("BLOCKED", result) def test_register_and_list_devices(self): import tempfile tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) tmp.close() import device_registry orig = device_registry.DB_PATH device_registry.DB_PATH = tmp.name try: from tools.device_control import register_device, list_devices result = register_device("Test Light", "light", "mqtt", room="bedroom") self.assertIn("registered", result) self.assertIn("Test Light", result) listing = list_devices() self.assertIn("Test Light", listing) self.assertIn("bedroom", listing.lower()) finally: device_registry.DB_PATH = orig os.unlink(tmp.name) def test_find_device_tool(self): import tempfile tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) tmp.close() import device_registry orig = device_registry.DB_PATH device_registry.DB_PATH = tmp.name try: from tools.device_control import register_device, find_device register_device("Living Room TV", "tv", "wifi", address="192.168.1.50") result = find_device("TV") self.assertIn("Living Room TV", result) finally: device_registry.DB_PATH = orig os.unlink(tmp.name) def test_remove_device_tool(self): import tempfile tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) tmp.close() import device_registry orig = device_registry.DB_PATH device_registry.DB_PATH = tmp.name try: from tools.device_control import register_device, remove_device, list_devices register_device("To Remove", "plug", "wifi") # Find the device ID reg = device_registry.DeviceRegistry() dev = reg.get_device_by_name("To Remove") result = remove_device(dev["id"]) self.assertIn("Removed", result) listing = list_devices() self.assertIn("No registered devices", listing) finally: device_registry.DB_PATH = orig os.unlink(tmp.name) def test_smart_home_scene_movie(self): """Test built-in scene execution (mocked system calls).""" from tools.device_control import smart_home_scene with patch("tools.system_control.subprocess") as mock: mock.run.return_value = MagicMock(returncode=0, stdout="", stderr="") result = smart_home_scene("movie") self.assertIn("Scene 'movie' activated", result) def test_smart_home_scene_unknown(self): """Unknown scene should try as Apple Shortcut.""" from tools.device_control import smart_home_scene with patch("subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") result = smart_home_scene("my_custom_scene") self.assertIn("executed", result.lower() if "executed" in result.lower() else result) def test_control_device_not_found(self): import tempfile tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) tmp.close() import device_registry orig = device_registry.DB_PATH device_registry.DB_PATH = tmp.name try: device_registry.DeviceRegistry() # init db from tools.device_control import control_device result = control_device("nonexistent", "on") self.assertIn("not found", result) finally: device_registry.DB_PATH = orig os.unlink(tmp.name) @patch("subprocess.run") def test_control_device_bluetooth(self, mock_run): import tempfile tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) tmp.close() import device_registry orig = device_registry.DB_PATH device_registry.DB_PATH = tmp.name try: reg = device_registry.DeviceRegistry() reg.add_device("Speaker", "speaker", "bluetooth", mac_address="AA:BB:CC:DD:EE:FF") mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") from tools.device_control import control_device result = control_device("Speaker", "connect") self.assertIn("Connected", result) finally: device_registry.DB_PATH = orig os.unlink(tmp.name) class TestToolCount(unittest.TestCase): """Verify total tool count after all tools loaded.""" def test_total_tools_count(self): from tools import TOOL_REGISTRY import tools.builtin import tools.system_control import tools.user_tools import tools.vscode_tools import tools.device_control # Should have a large number of tools self.assertGreaterEqual(len(TOOL_REGISTRY), 50, f"Expected 50+ tools, got {len(TOOL_REGISTRY)}: {sorted(TOOL_REGISTRY.keys())}") if __name__ == "__main__": unittest.main()