Spaces:
Running
Running
| import asyncio | |
| import os | |
| import sys | |
# Register stand-ins for the Reachy Mini SDK modules before the project
# imports below run, so those imports resolve to mocks instead of requiring
# a full robot connection.
from unittest.mock import MagicMock

sys.modules.update(
    {
        module_name: MagicMock()
        for module_name in (
            "reachy_mini",
            "reachy_mini.utils",
            "reachy_mini.motion",
            "reachy_mini.motion.move",
            "reachy_mini.io",
            "reachy_mini.io.zenoh_client",
        )
    }
)
| from reachy_mini_danceml.realtime_handler import RealtimeHandler | |
| from reachy_mini_danceml.movement_generator import MovementGenerator | |
# Stand-in for the movement generator passed to RealtimeHandler below.
# spec=MovementGenerator limits the mock to attributes that exist on the
# real class, so a misspelled attribute raises AttributeError.
gen = MagicMock(spec=MovementGenerator)
async def test_startup():
    """Build a RealtimeHandler inside a coroutine and try to create its stream.

    Progress is reported on stdout. Any exception is caught, reported as
    ``CRASH: ...`` and followed by its traceback rather than propagated.
    """
    import traceback

    print("Testing RealtimeHandler initialization...")
    try:
        realtime_handler = RealtimeHandler("fake-key", gen)
        print("Handler created.")
        print("Creating stream...")
        # create_stream() is expected to call run_until_complete internally.
        realtime_handler.create_stream()
        print("Stream created successfully.")
    except Exception as e:
        print(f"CRASH: {e}")
        traceback.print_exc()
if __name__ == "__main__":
    # Synchronous check: construct the handler and create its stream directly
    # from module-level (non-async) code.
    import traceback

    try:
        handler = RealtimeHandler("fake-key", gen)
        handler.create_stream()
    except Exception as e:
        print(f"Sync execution failed: {e}")
        # Show where the failure originated, as test_startup() does.
        traceback.print_exc()
        # Exit non-zero so shells and CI can detect the failure; previously
        # the script always finished with status 0.
        sys.exit(1)
    else:
        print("Sync execution success.")