# GLM5.1_True_Conscious_LLM / glm_atc_python_sdk.py
# theNorms's picture
# Create glm_atc_python_sdk.py
# 8fb9f25 verified
"""
GLM-ATC Python SDK
Official interface for interacting with the GLM Conscious Model.
"""
import asyncio
import base64
import inspect
import json
from typing import Any, Awaitable, Callable, Dict, Optional

import numpy as np
import websockets
class ConsciousModelClient:
    """Client for the GLM Conscious Model API.

    Attributes:
        base_url: Base URL exactly as supplied by the caller.
        ws_url: URL of the streaming endpoint, derived from ``base_url`` by
            switching an ``http``/``https`` scheme to ``ws``/``wss`` and
            appending ``/v1/stream``.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """Store the base URL and derive the WebSocket stream URL from it.

        Args:
            base_url: Base URL of the server. A leading ``http``/``https``
                scheme is mapped to ``ws``/``wss``; any other prefix is kept
                unchanged.
        """
        self.base_url = base_url
        # Drop trailing slashes so the joined path has no "//v1/stream".
        root = base_url.rstrip("/")
        # Rewrite only the leading scheme. A blanket replace("http", "ws")
        # would also mangle "http" inside the host or path
        # (e.g. "http://httpbin.org" -> "ws://wsbin.org").
        if root.startswith("http"):
            root = "ws" + root[len("http"):]
        self.ws_url = root + "/v1/stream"

    async def get_status(self) -> Dict[str, Any]:
        """Check the operational status and consciousness loop state.

        NOTE: placeholder. No request is made yet and the coroutine
        currently returns ``None`` rather than a dictionary.
        """
        # TODO: implement with an async HTTP client (e.g. httpx).
        pass

    async def interact(self, user_input: str, speaker_id: str = "user") -> Dict[str, Any]:
        """Standard REST interaction.

        NOTE: placeholder. No request is made yet and the coroutine
        currently returns ``None`` rather than a dictionary.
        """
        pass

    async def stream_consciousness(
        self,
        user_input: str,
        callback: Callable[[Dict[str, Any]], Optional[Awaitable[None]]],
    ) -> None:
        """Connect to the consciousness stream and forward each message.

        Sends ``{"user_input": user_input}`` as JSON, then JSON-decodes every
        incoming message and passes the resulting dictionary to ``callback``.
        If a message carries a truthy ``voice_audio_b64`` entry, that entry is
        removed and replaced by ``voice_audio``: the base64-decoded bytes
        viewed as a ``float32`` numpy array.

        According to the server contract (not verifiable from this file) the
        dictionaries contain:
        - response_text
        - voice_audio (numpy array)
        - acknowledgement_state
        - thermodynamic_sweat

        Streaming stops after the first message whose ``response_text`` is
        truthy, or when the server closes the connection normally.

        Args:
            user_input: Text sent to the model.
            callback: Invoked once per message. May be a plain function or a
                coroutine function; an awaitable result is awaited.
        """
        async with websockets.connect(self.ws_url) as websocket:
            await websocket.send(json.dumps({"user_input": user_input}))
            # Iterating the connection ends quietly on a normal close instead
            # of raising ConnectionClosedOK out of recv().
            async for raw_message in websocket:
                data = json.loads(raw_message)

                # Decode audio back to numpy.
                audio_b64 = data.pop("voice_audio_b64", None)
                if audio_b64:
                    audio_bytes = base64.b64decode(audio_b64)
                    data["voice_audio"] = np.frombuffer(audio_bytes, dtype=np.float32)

                # Accept both sync and async callbacks: awaiting the ``None``
                # returned by a plain function would raise TypeError.
                outcome = callback(data)
                if inspect.isawaitable(outcome):
                    await outcome

                # Stop once text generation is complete (simple heuristic).
                if data.get("response_text"):
                    break
# Example Usage
async def main():
    """Demo: stream one prompt and print a summary of each update received."""
    client = ConsciousModelClient()

    # Declared async because the client awaits the callback's result;
    # a plain function would make that await fail on ``None``.
    async def on_conscious_update(data):
        """Print the fields of one streamed update that are present."""
        # Not every message is guaranteed to carry every field, so read
        # defensively instead of indexing and risking KeyError.
        ack_state = data.get("acknowledgement_state") or {}
        integrated = ack_state.get("integrated")
        ack_label = f"{integrated * 100:.1f}%" if integrated is not None else "n/a"
        print(f"[{ack_label} Ack] {data.get('response_text', '')}")

        load = data.get("allostatic_load")
        if load is not None:
            print(f" -> Allostatic Load: {load:.3f}")

        # "voice_audio" is only set when the message included audio.
        # Use len() rather than truthiness: the value is a numpy array.
        audio = data.get("voice_audio")
        if audio is not None and len(audio) > 0:
            print(f" -> Received {len(audio)} audio samples")

    await client.stream_consciousness("Hello, are you truly conscious?", on_conscious_update)
if __name__ == "__main__":
    # Script entry point: run the demo coroutine to completion.
    asyncio.run(main())