# Blame annotation: Khanna, Videh Rakesh Rakesh
# Commit 8576f52 -- feat: add device control, app automation, tests, and other updates
"""Tests for JARVIS device registry and device control tools."""
| import json | |
| import os | |
| import sqlite3 | |
| import tempfile | |
| import unittest | |
| from unittest.mock import patch, MagicMock | |
| import sys | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
class TestDeviceRegistry(unittest.TestCase):
    """Exercise the persistent device registry against a throwaway database.

    Each test gets its own temporary ``.db`` file. ``device_registry.DB_PATH``
    is pointed at that file for the duration of the test and restored
    afterwards.
    """

    def setUp(self):
        """Create a scratch database file and a registry bound to it."""
        # delete=False + close(): only the path is used here; it is handed to
        # the registry module through DB_PATH below.
        self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
        self.tmp.close()
        # Cleanups, unlike tearDown, still run when a later setUp step raises,
        # so the temp file cannot leak and DB_PATH cannot stay redirected.
        self.addCleanup(os.unlink, self.tmp.name)

        import device_registry

        self._orig_path = device_registry.DB_PATH
        path_patch = patch.object(device_registry, "DB_PATH", self.tmp.name)
        path_patch.start()
        # Registered last, so it runs first: the path is restored before the
        # file is unlinked.
        self.addCleanup(path_patch.stop)

        self.reg = device_registry.DeviceRegistry()

    def test_add_device(self):
        """add_device returns a positive integer id."""
        dev_id = self.reg.add_device(
            name="Living Room Light",
            device_type="light",
            protocol="mqtt",
            address="192.168.1.50",
            room="living room",
        )
        self.assertIsInstance(dev_id, int)
        self.assertGreater(dev_id, 0)

    def test_get_device(self):
        """A stored device is returned with the fields it was added with."""
        dev_id = self.reg.add_device(
            name="Bedroom Speaker",
            device_type="speaker",
            protocol="bluetooth",
            mac_address="AA:BB:CC:DD:EE:FF",
            room="bedroom",
        )
        device = self.reg.get_device(dev_id)
        self.assertIsNotNone(device)
        self.assertEqual(device["name"], "Bedroom Speaker")
        self.assertEqual(device["device_type"], "speaker")
        self.assertEqual(device["protocol"], "bluetooth")
        self.assertEqual(device["mac_address"], "AA:BB:CC:DD:EE:FF")
        self.assertEqual(device["room"], "bedroom")

    def test_get_nonexistent_device(self):
        """Looking up an unknown id yields None."""
        device = self.reg.get_device(9999)
        self.assertIsNone(device)

    def test_get_device_by_name(self):
        """A device can be fetched by its exact name."""
        self.reg.add_device(name="Kitchen Plug", device_type="plug", protocol="wifi")
        device = self.reg.get_device_by_name("Kitchen Plug")
        self.assertIsNotNone(device)
        self.assertEqual(device["name"], "Kitchen Plug")

    def test_get_device_by_name_case_insensitive(self):
        """Name lookup ignores letter case."""
        self.reg.add_device(name="Kitchen Plug", device_type="plug", protocol="wifi")
        device = self.reg.get_device_by_name("kitchen plug")
        self.assertIsNotNone(device)

    def test_get_devices_by_type(self):
        """Filtering by device_type returns only matching devices."""
        self.reg.add_device(name="Light 1", device_type="light", protocol="mqtt")
        self.reg.add_device(name="Light 2", device_type="light", protocol="hue")
        self.reg.add_device(name="Plug 1", device_type="plug", protocol="mqtt")
        lights = self.reg.get_devices(device_type="light")
        self.assertEqual(len(lights), 2)

    def test_get_devices_by_room(self):
        """Filtering by room returns only devices in that room."""
        self.reg.add_device(
            name="Light A", device_type="light", protocol="mqtt", room="bedroom"
        )
        self.reg.add_device(
            name="Light B", device_type="light", protocol="mqtt", room="kitchen"
        )
        bedroom = self.reg.get_devices(room="bedroom")
        self.assertEqual(len(bedroom), 1)
        self.assertEqual(bedroom[0]["name"], "Light A")

    def test_get_devices_by_protocol(self):
        """Filtering by protocol returns only matching devices."""
        self.reg.add_device(name="Dev 1", device_type="plug", protocol="mqtt")
        self.reg.add_device(name="Dev 2", device_type="plug", protocol="wifi")
        mqtt_devs = self.reg.get_devices(protocol="mqtt")
        self.assertEqual(len(mqtt_devs), 1)

    def test_update_device_status(self):
        """A status update is visible on the next read."""
        dev_id = self.reg.add_device(name="Plug", device_type="plug", protocol="wifi")
        self.reg.update_device_status(dev_id, "online")
        device = self.reg.get_device(dev_id)
        self.assertEqual(device["status"], "online")

    def test_remove_device(self):
        """A removed device can no longer be fetched."""
        dev_id = self.reg.add_device(
            name="Delete Me", device_type="plug", protocol="wifi"
        )
        self.reg.remove_device(dev_id)
        device = self.reg.get_device(dev_id)
        self.assertIsNone(device)

    def test_search_devices(self):
        """Searching with a lowercase fragment finds the matching device."""
        self.reg.add_device(name="Living Room TV", device_type="tv", protocol="wifi")
        self.reg.add_device(name="Bedroom Light", device_type="light", protocol="mqtt")
        results = self.reg.search_devices("living")
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["name"], "Living Room TV")

    def test_device_config(self):
        """A config dict passed to add_device is readable as a dict afterwards."""
        config = {"broker": "192.168.1.100", "topic_prefix": "cmnd/plug1"}
        dev_id = self.reg.add_device(
            name="Smart Plug",
            device_type="plug",
            protocol="mqtt",
            config=config,
        )
        device = self.reg.get_device(dev_id)
        self.assertEqual(device["config"]["broker"], "192.168.1.100")
        self.assertEqual(device["config"]["topic_prefix"], "cmnd/plug1")

    def test_device_commands(self):
        """Commands attached to a device are listed, first-added first."""
        dev_id = self.reg.add_device(name="IR TV", device_type="tv", protocol="ir")
        self.reg.add_device_command(dev_id, "power", "0x20DF10EF", "Power toggle")
        self.reg.add_device_command(dev_id, "vol_up", "0x20DF40BF", "Volume up")
        commands = self.reg.get_device_commands(dev_id)
        self.assertEqual(len(commands), 2)
        self.assertEqual(commands[0]["command_name"], "power")

    def test_upsert_device(self):
        """Adding device with same name+protocol should update, not duplicate."""
        self.reg.add_device(
            name="Plug", device_type="plug", protocol="wifi", address="1.1.1.1"
        )
        self.reg.add_device(
            name="Plug", device_type="plug", protocol="wifi", address="2.2.2.2"
        )
        # Should be the same device, updated
        all_devices = self.reg.get_devices()
        plugs = [d for d in all_devices if d["name"] == "Plug"]
        self.assertEqual(len(plugs), 1)
        self.assertEqual(plugs[0]["address"], "2.2.2.2")
class TestDeviceControlTools(unittest.TestCase):
    """Test device control tool functions."""

    def _use_temp_registry(self):
        """Point ``device_registry.DB_PATH`` at a fresh temp file for this test.

        Restoring the original path and deleting the file are registered as
        cleanups, so they run whether the test passes, fails or errors.

        Returns:
            The imported ``device_registry`` module.
        """
        import device_registry

        tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
        tmp.close()
        self.addCleanup(os.unlink, tmp.name)
        path_patch = patch.object(device_registry, "DB_PATH", tmp.name)
        path_patch.start()
        # Registered last, so it runs first: restore the path, then unlink.
        self.addCleanup(path_patch.stop)
        return device_registry

    def test_tools_registered(self):
        """Every device-control tool name is present in TOOL_REGISTRY."""
        from tools import TOOL_REGISTRY
        import tools.device_control  # imported before the registry is checked

        expected = [
            "bluetooth_scan", "bluetooth_pair", "bluetooth_connect",
            "bluetooth_disconnect", "bluetooth_info",
            "network_scan", "bonjour_discover", "wake_on_lan", "ping_device",
            "http_device_request", "hue_control",
            "homekit_control", "homekit_list_shortcuts", "siri_command",
            "mqtt_publish", "mqtt_subscribe",
            "register_device", "list_devices", "find_device", "remove_device",
            "control_device", "smart_home_scene",
            "airdrop_file", "find_my_devices",
        ]
        for name in expected:
            self.assertIn(name, TOOL_REGISTRY, f"Tool '{name}' not registered")

    def test_wake_on_lan_valid_mac(self):
        """WoL should construct and send magic packet without error."""
        from tools.device_control import wake_on_lan

        # Mock socket to avoid actual network call
        with patch("tools.device_control.socket") as mock_socket:
            mock_sock = MagicMock()
            mock_socket.socket.return_value = mock_sock
            # Plain ints stand in for the socket constants replaced by the mock.
            mock_socket.AF_INET = 2
            mock_socket.SOCK_DGRAM = 2
            mock_socket.SOL_SOCKET = 1
            mock_socket.SO_BROADCAST = 6
            result = wake_on_lan("AA:BB:CC:DD:EE:FF")
            self.assertIn("sent", result)
            mock_sock.sendto.assert_called_once()

    def test_wake_on_lan_invalid_mac(self):
        """A malformed MAC address is reported rather than sent."""
        from tools.device_control import wake_on_lan

        result = wake_on_lan("invalid")
        self.assertIn("Invalid MAC", result)

    def test_http_device_request_blocks_external(self):
        """Should block requests to non-local addresses."""
        from tools.device_control import http_device_request

        result = http_device_request("http://google.com/api")
        self.assertIn("BLOCKED", result)

    def test_http_device_request_allows_local(self):
        """Should allow local addresses (even if they fail to connect)."""
        from tools.device_control import http_device_request

        # NOTE(review): 999 is not a valid IPv4 octet; presumably chosen so no
        # real host can answer. Confirm the allow-list check tolerates it.
        result = http_device_request("http://192.168.1.999/status")
        # Will fail to connect, but shouldn't be BLOCKED
        self.assertNotIn("BLOCKED", result)

    def test_http_device_request_allows_mdns(self):
        """Should allow .local addresses."""
        from tools.device_control import http_device_request

        result = http_device_request("http://mydevice.local/status")
        self.assertNotIn("BLOCKED", result)

    def test_register_and_list_devices(self):
        """A registered device shows up, with its room, in the listing."""
        self._use_temp_registry()
        from tools.device_control import register_device, list_devices

        result = register_device("Test Light", "light", "mqtt", room="bedroom")
        self.assertIn("registered", result)
        self.assertIn("Test Light", result)

        listing = list_devices()
        self.assertIn("Test Light", listing)
        self.assertIn("bedroom", listing.lower())

    def test_find_device_tool(self):
        """find_device matches a registered device by a name fragment."""
        self._use_temp_registry()
        from tools.device_control import register_device, find_device

        register_device("Living Room TV", "tv", "wifi", address="192.168.1.50")
        result = find_device("TV")
        self.assertIn("Living Room TV", result)

    def test_remove_device_tool(self):
        """Removing the only registered device leaves an empty listing."""
        device_registry = self._use_temp_registry()
        from tools.device_control import register_device, remove_device, list_devices

        register_device("To Remove", "plug", "wifi")
        # Find the device ID
        reg = device_registry.DeviceRegistry()
        dev = reg.get_device_by_name("To Remove")
        result = remove_device(dev["id"])
        self.assertIn("Removed", result)

        listing = list_devices()
        self.assertIn("No registered devices", listing)

    def test_smart_home_scene_movie(self):
        """Test built-in scene execution (mocked system calls)."""
        from tools.device_control import smart_home_scene

        with patch("tools.system_control.subprocess") as mock_subprocess:
            mock_subprocess.run.return_value = MagicMock(
                returncode=0, stdout="", stderr=""
            )
            result = smart_home_scene("movie")
            self.assertIn("Scene 'movie' activated", result)

    def test_smart_home_scene_unknown(self):
        """Unknown scene should try as Apple Shortcut."""
        from tools.device_control import smart_home_scene

        with patch("subprocess.run") as mock_run:
            mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
            result = smart_home_scene("my_custom_scene")
            # Case-insensitive match on the success wording.
            self.assertIn("executed", result.lower())

    def test_control_device_not_found(self):
        """Controlling an unregistered device reports that it was not found."""
        device_registry = self._use_temp_registry()
        device_registry.DeviceRegistry()  # init db
        from tools.device_control import control_device

        result = control_device("nonexistent", "on")
        self.assertIn("not found", result)

    def test_control_device_bluetooth(self):
        """A registered Bluetooth device accepts the 'connect' action."""
        device_registry = self._use_temp_registry()
        reg = device_registry.DeviceRegistry()
        reg.add_device(
            "Speaker", "speaker", "bluetooth", mac_address="AA:BB:CC:DD:EE:FF"
        )
        from tools.device_control import control_device

        # The mock is created here rather than taken as a method parameter:
        # unittest calls test methods with no extra arguments.
        # NOTE(review): the patch target mirrors test_smart_home_scene_unknown
        # and assumes the Bluetooth path shells out via subprocess.run; confirm.
        with patch("subprocess.run") as mock_run:
            mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
            result = control_device("Speaker", "connect")
        self.assertIn("Connected", result)
class TestToolCount(unittest.TestCase):
    """Check the overall size of the tool registry once every tool module is imported."""

    def test_total_tools_count(self):
        """The registry holds at least 50 entries after all tool modules load."""
        from tools import TOOL_REGISTRY

        # These modules are imported only so they are loaded before counting;
        # nothing from them is referenced directly.
        import tools.builtin
        import tools.system_control
        import tools.user_tools
        import tools.vscode_tools
        import tools.device_control

        # Should have a large number of tools
        registered = len(TOOL_REGISTRY)
        failure_msg = (
            f"Expected 50+ tools, got {registered}: {sorted(TOOL_REGISTRY.keys())}"
        )
        self.assertGreaterEqual(registered, 50, failure_msg)
# Run the whole suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()