| """ |
| GLM-ATC Python SDK |
| Official interface for interacting with the GLM Conscious Model. |
| """ |
import asyncio
import base64
import inspect
import json
from typing import Any, Callable, Dict, Optional

import numpy as np
import websockets
|
|
class ConsciousModelClient:
    """Client for the GLM Conscious Model API.

    Attributes:
        base_url: Base URL exactly as supplied by the caller.
        ws_url: WebSocket URL of the ``/v1/stream`` endpoint, derived from
            ``base_url``.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """Store the base URL and derive the streaming WebSocket URL.

        Args:
            base_url: Root URL of the API server. An ``http`` scheme maps to
                ``ws`` and ``https`` to ``wss``; any other scheme (or a URL
                without one) is left as is. A trailing ``/`` is ignored when
                building ``ws_url``.
        """
        self.base_url = base_url
        self.ws_url = self._to_ws_url(base_url) + "/v1/stream"

    @staticmethod
    def _to_ws_url(base_url: str) -> str:
        """Return *base_url* with only its scheme swapped for the WebSocket one."""
        scheme, separator, remainder = base_url.partition("://")
        if not separator:
            # No scheme present: nothing to translate, just normalise the tail.
            return base_url.rstrip("/")
        # Translate the scheme alone; a blanket str.replace("http", "ws")
        # would also corrupt hosts or paths that contain "http".
        ws_scheme = {"http": "ws", "https": "wss"}.get(scheme.lower(), scheme)
        return f"{ws_scheme}://{remainder.rstrip('/')}"

    async def get_status(self) -> Dict[str, Any]:
        """Check the operational status and consciousness loop state.

        NOTE: not implemented yet. The body is a placeholder, so awaiting
        this currently yields ``None`` rather than a dictionary.
        """
        pass

    async def interact(self, user_input: str, speaker_id: str = "user") -> Dict[str, Any]:
        """Standard REST interaction.

        NOTE: not implemented yet. The body is a placeholder, so awaiting
        this currently yields ``None`` rather than a dictionary.
        """
        pass

    async def stream_consciousness(
        self,
        user_input: str,
        callback: Callable[[Dict[str, Any]], Any],
    ) -> None:
        """Connect to the consciousness stream and forward each update.

        Sends ``user_input`` once, then passes every decoded message to
        ``callback`` until a message carries a non-empty ``response_text``.

        The callback receives real-time dictionaries which, per the server
        contract, contain:
        - response_text
        - voice_audio (numpy float32 array, present only when the message
          carried a non-empty ``voice_audio_b64`` field)
        - acknowledgement_state
        - thermodynamic_sweat

        Args:
            user_input: Text sent to the model as the ``user_input`` field.
            callback: Invoked with each decoded message. May be a plain
                function or a coroutine function; an awaitable result is
                awaited before the next message is read.
        """
        async with websockets.connect(self.ws_url) as websocket:
            await websocket.send(json.dumps({"user_input": user_input}))

            while True:
                message = await websocket.recv()
                data = json.loads(message)

                # Audio travels base64-encoded; expose it as a float32 array
                # under "voice_audio" and drop the encoded field.
                audio_b64 = data.pop("voice_audio_b64", None)
                if audio_b64:
                    audio_bytes = base64.b64decode(audio_b64)
                    data["voice_audio"] = np.frombuffer(audio_bytes, dtype=np.float32)

                # Support both sync and async callbacks: awaiting the None
                # returned by a plain function would raise TypeError.
                outcome = callback(data)
                if inspect.isawaitable(outcome):
                    await outcome

                # A non-empty response_text marks the final message.
                if data.get("response_text"):
                    break
|
|
| |
async def main():
    """Demo: stream a single prompt and print every update as it arrives."""
    client = ConsciousModelClient()

    async def on_conscious_update(data):
        """Print one streamed update, tolerating fields the message omits."""
        # Declared as a coroutine function because stream_consciousness
        # awaits the callback's result.
        integrated = (data.get("acknowledgement_state") or {}).get("integrated")
        ack_label = f"{(integrated*100):.1f}%" if integrated is not None else "n/a"
        print(f"[{ack_label} Ack] {data.get('response_text', '')}")

        allostatic_load = data.get("allostatic_load")
        if allostatic_load is not None:
            print(f" -> Allostatic Load: {allostatic_load:.3f}")

        # "voice_audio" is only present when the message carried audio; use
        # len() because the truth value of a numpy array is ambiguous.
        voice_audio = data.get("voice_audio")
        if voice_audio is not None and len(voice_audio) > 0:
            print(f" -> Received {len(voice_audio)} audio samples")

    await client.stream_consciousness("Hello, are you truly conscious?", on_conscious_update)


if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())