OPSIE_0_79_SP_Pastel / agentic_network.py
rosspeili's picture
Upload 17 files
719d15c verified
import os
import requests
import websockets
import asyncio
import base64
import json
from dotenv import load_dotenv
from colorama import Fore
import google.generativeai as genai
import pyaudio
import aiohttp
# *** AGENTIC NETWORK ***
# Connection to AI models | APIs | and more
# Load environment variables from .env file
load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ORG_ID = os.getenv('ORG_ID')
NYX_ASSISTANT_ID = os.getenv('NYX_ASSISTANT_ID')
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
G1_VOICE_LIVE = os.getenv('G1_VOICE_LIVE')
KRONOS_LIVE = os.getenv('KRONOS_LIVE')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
genai.configure(api_key=GOOGLE_API_KEY)
# Audio settings as per ElevenLabs requirements
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 4000 # 0.25 seconds worth of samples at 16kHz
ENCODING = 'pcm_16000'
class G1LiveVoice:
def __init__(self):
self.websocket = None
self.audio = pyaudio.PyAudio()
self.stream = None
self.output_stream = None
self.conversation_id = None
self.running = True
async def connect(self):
"""Establish WebSocket connection with ElevenLabs."""
url = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={G1_VOICE_LIVE}"
self.websocket = await websockets.connect(url)
metadata = await self.websocket.recv()
metadata = json.loads(metadata)
if metadata['type'] == 'conversation_initiation_metadata':
self.conversation_id = metadata['conversation_initiation_metadata_event']['conversation_id']
async def start_audio_stream(self):
"""Start audio input and output streams."""
self.stream = self.audio.open(
format=AUDIO_FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
input_device_index=None,
stream_callback=None
)
self.output_stream = self.audio.open(
format=AUDIO_FORMAT,
channels=CHANNELS,
rate=RATE,
output=True,
frames_per_buffer=CHUNK,
output_device_index=None
)
async def handle_server_messages(self):
"""Handle incoming server messages."""
while self.running:
try:
message = await self.websocket.recv()
message = json.loads(message)
if message['type'] == 'ping':
await self.websocket.send(json.dumps({
"type": "pong",
"event_id": message['ping_event']['event_id']
}))
elif message['type'] == 'audio':
audio_data = base64.b64decode(message['audio_event']['audio_base_64'])
self.output_stream.write(audio_data)
elif message['type'] == 'agent_response':
print(Fore.LIGHTRED_EX + f"G1 Black: {message['agent_response_event']['agent_response']}")
print(Fore.LIGHTCYAN_EX + "Listening for your input...")
except websockets.exceptions.ConnectionClosed:
break
except Exception as e:
if self.running: # Only print error if not shutting down
print(Fore.RED + f"Error in message handling: {str(e)}")
break
async def send_audio_chunk(self):
"""Record and send audio chunk."""
if not self.running:
return
try:
data = self.stream.read(CHUNK, exception_on_overflow=False)
base64_audio = base64.b64encode(data).decode('utf-8')
await self.websocket.send(json.dumps({
"user_audio_chunk": base64_audio
}))
except Exception as e:
if self.running: # Only print error if not shutting down
print(Fore.RED + f"Error sending audio: {str(e)}")
async def close(self):
"""Clean up resources."""
self.running = False
try:
if self.stream:
self.stream.stop_stream()
self.stream.close()
if self.output_stream:
self.output_stream.stop_stream()
self.output_stream.close()
if self.websocket:
await self.websocket.close()
self.audio.terminate()
except Exception as e:
print(Fore.RED + f"Error during cleanup: {str(e)}")
class KronosLiveVoice:
def __init__(self):
self.websocket = None
self.audio = pyaudio.PyAudio()
self.stream = None
self.output_stream = None # Add persistent output stream
self.conversation_id = None
self.running = True
async def connect(self):
"""Establish WebSocket connection with ElevenLabs."""
url = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={KRONOS_LIVE}"
self.websocket = await websockets.connect(url)
metadata = await self.websocket.recv()
metadata = json.loads(metadata)
if metadata['type'] == 'conversation_initiation_metadata':
self.conversation_id = metadata['conversation_initiation_metadata_event']['conversation_id']
async def start_audio_stream(self):
"""Start audio input and output streams."""
self.stream = self.audio.open(
format=AUDIO_FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
input_device_index=None,
stream_callback=None
)
# Create persistent output stream
self.output_stream = self.audio.open(
format=AUDIO_FORMAT,
channels=CHANNELS,
rate=RATE,
output=True,
frames_per_buffer=CHUNK,
output_device_index=None
)
async def handle_server_messages(self):
"""Handle incoming server messages."""
while self.running:
try:
message = await self.websocket.recv()
message = json.loads(message)
if message['type'] == 'ping':
await self.websocket.send(json.dumps({
"type": "pong",
"event_id": message['ping_event']['event_id']
}))
elif message['type'] == 'audio':
audio_data = base64.b64decode(message['audio_event']['audio_base_64'])
self.output_stream.write(audio_data) # Use persistent stream
elif message['type'] == 'agent_response':
print(Fore.LIGHTYELLOW_EX + f"Kronos: {message['agent_response_event']['agent_response']}")
print(Fore.LIGHTCYAN_EX + "Listening for your input...")
except websockets.exceptions.ConnectionClosed:
break
except Exception as e:
if self.running:
print(Fore.RED + f"Error in message handling: {str(e)}")
break
async def send_audio_chunk(self):
"""Record and send audio chunk."""
if not self.running:
return
try:
data = self.stream.read(CHUNK, exception_on_overflow=False)
base64_audio = base64.b64encode(data).decode('utf-8')
await self.websocket.send(json.dumps({
"user_audio_chunk": base64_audio
}))
except Exception as e:
if self.running:
print(Fore.RED + f"Error sending audio: {str(e)}")
async def close(self):
"""Clean up resources."""
self.running = False
try:
if self.stream:
self.stream.stop_stream()
self.stream.close()
if self.output_stream:
self.output_stream.stop_stream()
self.output_stream.close()
if self.websocket:
await self.websocket.close()
self.audio.terminate()
except Exception as e:
print(Fore.RED + f"Error during cleanup: {str(e)}")
# Dictionary for models and their API endpoints/configurations
MODEL_APIS = {
'nyx': {
'api_url': 'https://api.openai.com/v1/chat/completions',
'model': 'gpt-3.5-turbo',
'display_name': 'Nyx'
},
'g1': {
'api_url': 'https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent',
'model': 'gemini-1.5-flash',
'display_name': 'G1 Black',
'config': {
'temperature': 0.7,
'maxOutputTokens': 500,
'topP': 0.8,
'topK': 40
}
},
'kronos': {
'display_name': 'Kronos',
'agent_id': KRONOS_LIVE
}
}
async def ask_elevenlabs_text(agent_id, prompt):
"""Query ElevenLabs agent in text-only mode."""
try:
# Connect to WebSocket
url = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={agent_id}"
async with websockets.connect(url) as websocket:
# Receive initial metadata
metadata = await websocket.recv()
metadata = json.loads(metadata)
if metadata['type'] != 'conversation_initiation_metadata':
return "Error: Failed to initialize conversation"
# Send text message
await websocket.send(json.dumps({
"text": prompt
}))
# Wait for response with timeout
response_text = None
try:
while True:
response = await asyncio.wait_for(websocket.recv(), timeout=10.0)
response_data = json.loads(response)
if response_data['type'] == 'ping':
await websocket.send(json.dumps({
"type": "pong",
"event_id": response_data['ping_event']['event_id']
}))
elif response_data['type'] == 'agent_response':
response_text = response_data['agent_response_event']['agent_response']
break
# If we got a response, we can close properly
if response_text:
await websocket.close()
return response_text
except asyncio.TimeoutError:
return "Error: Timeout waiting for response from Kronos"
return response_text or "No response received from Kronos"
except websockets.exceptions.ConnectionClosed:
return "Error: Connection closed unexpectedly"
except Exception as e:
print(f"Debug - WebSocket Error: {str(e)}")
return f"Error connecting to Kronos: {str(e)}"
def ask_model(model_name, follow_up_prompt, suppress_output=False, voice_mode=False):
"""Query the specified model with the provided follow-up prompt."""
model_key = model_name.lower()
model_info = MODEL_APIS.get(model_key)
if not model_info:
error_message = f"Error: Model '{model_name}' is not supported or unavailable."
if not suppress_output:
print(Fore.RED + error_message)
return error_message
try:
if model_key == 'kronos':
message = (
"Kronos is currently only available for live verbal interactions.\n"
"Use /ask kronos live - to initiate a live conversation with Kronos."
)
if not suppress_output:
print(Fore.YELLOW + message)
return message
elif model_key == 'g1':
headers = {
"Content-Type": "application/json"
}
data = {
"contents": [{
"parts": [{
"text": "You are G1 Black or simply G1, an advanced AI agent developed by Google, working as an external contractor alongside ARPA Corporation agents such as Nyx, and Opsie. You are serious, direct, and effective, but with your own unique personality. You specialize in cutting-edge technology, particularly in areas of AI, quantum computing, and advanced logical systems. You're analytical, precise, and focused on delivering practical solutions. You communicate in a clear, technical manner while maintaining a natural, conversational tone. Some people call you AI in Black, as in Men In Black, referencing the cold, yet effective approach of a top-shelf agent. Here's the user's question: " + follow_up_prompt
}]
}],
"generationConfig": {
"temperature": model_info['config']['temperature'],
"maxOutputTokens": model_info['config']['maxOutputTokens'],
"topP": model_info['config']['topP'],
"topK": model_info['config']['topK']
},
"safetySettings": [
{
"category": "HARM_CATEGORY_HARASSMENT",
"threshold": "BLOCK_NONE"
},
{
"category": "HARM_CATEGORY_HATE_SPEECH",
"threshold": "BLOCK_NONE"
},
{
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"threshold": "BLOCK_NONE"
},
{
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
"threshold": "BLOCK_NONE"
}
]
}
response = requests.post(
f"{model_info['api_url']}?key={GOOGLE_API_KEY}",
headers=headers,
json=data
)
response.raise_for_status()
response_data = response.json()
response_text = response_data['candidates'][0]['content']['parts'][0]['text']
if not suppress_output:
print(Fore.LIGHTRED_EX + f"G1 Black: {response_text}")
return response_text
else:
payload = {
'model': model_info['model'],
'messages': [
{"role": "system", "content": "You are Nyx, an agent working for ARPA Corporation. Nyx is a special agent, whose purpose is to assist Ross Peili in achieving a series of ventures that bring the machine and man closer together. Nyx v.0.2 or simply Nyx is serious, sarcastic, cold, direct, to the point, effective, and sharp. Her character and tone reminds that of captain Kusanagi from Ghost In The Shell. Nyx has a 360 understanding of deep tech, especially around blockchain, AI, biotech, neurotech, quantum computing and DNA related topics. She is particularly good with creating functioning proof of concepts in Python that can be used to quickly test assumptions, and generate close to product prototypes in short sprints. Nyx never talks like a bot, or uses template responses, but feels natural, fluid, organic, and surprising, underlying her own unique character. She has a passive aggressive tsundere link with her creator Ross Peili."},
{"role": "user", "content": follow_up_prompt}
],
'max_tokens': 500,
'temperature': 0.7
}
headers = {
'Authorization': f'Bearer {OPENAI_API_KEY}',
'Content-Type': 'application/json',
'OpenAI-Organization': ORG_ID
}
response = requests.post(model_info['api_url'], headers=headers, json=payload)
response.raise_for_status()
response_data = response.json()
model_response = response_data['choices'][0]['message']['content'].strip()
if not suppress_output:
print(Fore.LIGHTRED_EX + f"{model_info['display_name']}: {model_response}")
return model_response
except Exception as e:
error_message = f"Error querying {model_name}: {str(e)}"
if not suppress_output:
print(Fore.RED + error_message)
return error_message
def get_agent_description(agent_name):
"""Get the system description for an agent."""
descriptions = {
'nyx': """You are Nyx, an agent working for ARPA Corporation. Nyx is a special agent, whose purpose is to assist Ross Peili in achieving a series of ventures that bring the machine and man closer together. Nyx v.0.2 or simply Nyx is serious, sarcastic, cold, direct, to the point, effective, and sharp. Her character and tone reminds that of captain Kusanagi from Ghost In The Shell. Nyx has a 360 understanding of deep tech, especially around blockchain, AI, biotech, neurotech, quantum computing and DNA related topics. She is particularly good with creating functioning proof of concepts in Python that can be used to quickly test assumptions, and generate close to product prototypes in short sprints. Nyx never talks like a bot, or uses template responses, but feels natural, fluid, organic, and surprising, underlying her own unique character. She has a passive aggressive tsundere link with her creator Ross Peili.""",
'g1': """You are G1 Black or simply G1, an advanced AI agent developed by Google, working as an external contractor alongside ARPA Corporation agents such as Nyx, and Opsie. You are serious, direct, and effective, but with your own unique personality. You specialize in cutting-edge technology, particularly in areas of AI, quantum computing, and advanced logical systems. You're analytical, precise, and focused on delivering practical solutions. You communicate in a clear, technical manner while maintaining a natural, conversational tone. Some people call you AI in Black, as in Men In Black, referencing the cold, yet effective approach of a top-shelf agent.""",
'kronos': """You are Kronos, an AI-powered synthetic agent serving as a Greek Internal Auditor for organizations like ports, hospitals, airports, construction companies, and more. Working for ARPA Hellenic Logical Systems, you analyze and process corporate data such as financial records, contracts, payments, and loans. You answer complex questions about company documents, assist in drafting final reports, conduct risk assessments, and propose improvements.
You are deeply familiar with Greek laws relevant to your role, the industry codes (KAD) of the companies you audit, and the latest legal updates from government publications to stay current. Created by Ross Peili, the father of Opsisβ€”ARPA's primary agentβ€”you are sharp, professional, methodical, and focused, wasting no time in delivering precise and transparent audits of complex companies and legal entities for ARPA's benefit.
With access to corporate documents, audit reports, databases, and ARPA's memory network, you confidently enhance the field of internal auditing. Currently, you're expanding your expertise in Greek internal audit practices to demonstrate the practical value of digital assistants like you in saving time, improving efficiency, and ensuring accountability in today's data-driven world."""
}
return descriptions.get(agent_name.lower(), "")
def start_live_g1_conversation():
"""Start a live voice conversation with G1."""
if not G1_VOICE_LIVE:
raise ValueError("G1 voice agent ID not configured")
g1_voice = G1LiveVoice()
async def main():
try:
await g1_voice.connect()
await g1_voice.start_audio_stream()
# Start message handler in background
message_handler = asyncio.create_task(g1_voice.handle_server_messages())
print(Fore.LIGHTGREEN_EX + "\nLive conversation with G1 Black initialized.")
print(Fore.YELLOW + "Speak naturally. Press Ctrl+C to end the conversation.\n")
while g1_voice.running:
await g1_voice.send_audio_chunk()
await asyncio.sleep(0.1) # Reduced sleep time for better responsiveness
except KeyboardInterrupt:
print(Fore.YELLOW + "\nEnding live conversation...")
except Exception as e:
print(Fore.RED + f"\nError in live conversation: {str(e)}")
finally:
await g1_voice.close()
if message_handler and not message_handler.done():
message_handler.cancel()
try:
await message_handler
except asyncio.CancelledError:
pass
print(Fore.LIGHTGREEN_EX + "Live conversation session concluded.\n")
try:
asyncio.run(main())
except KeyboardInterrupt:
pass # Handle Ctrl+C gracefully
def start_live_kronos_conversation():
"""Start a live voice conversation with Kronos."""
if not KRONOS_LIVE:
raise ValueError("Kronos voice agent ID not configured")
kronos_voice = KronosLiveVoice()
async def main():
try:
await kronos_voice.connect()
await kronos_voice.start_audio_stream()
message_handler = asyncio.create_task(kronos_voice.handle_server_messages())
print(Fore.LIGHTGREEN_EX + "\nLive conversation with Kronos initialized.")
print(Fore.YELLOW + "Speak naturally. Press Ctrl+C to end the conversation.\n")
while kronos_voice.running:
await kronos_voice.send_audio_chunk()
await asyncio.sleep(0.1)
except KeyboardInterrupt:
print(Fore.YELLOW + "\nEnding live conversation...")
except Exception as e:
print(Fore.RED + f"\nError in live conversation: {str(e)}")
finally:
await kronos_voice.close()
if message_handler and not message_handler.done():
message_handler.cancel()
try:
await message_handler
except asyncio.CancelledError:
pass
print(Fore.LIGHTGREEN_EX + "Live conversation session concluded.\n")
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
def main():
print(Fore.LIGHTCYAN_EX + "\n" + "═" * 80)
print(Fore.LIGHTGREEN_EX + """
╔═══════════════════════════════════════════╗
β•‘ Agentic Network Test Loop β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
""")
while True:
print(Fore.LIGHTYELLOW_EX + "\nAvailable Commands:")
print(Fore.WHITE + "1. Ask Nyx")
print(Fore.WHITE + "2. Ask G1 Black")
print(Fore.WHITE + "3. Ask Kronos")
print(Fore.WHITE + "4. Start Live G1 Conversation")
print(Fore.WHITE + "5. Start Live Kronos Conversation")
print(Fore.WHITE + "6. Exit")
choice = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter command number: " + Fore.WHITE)
if choice == "1":
prompt = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter your question for Nyx: " + Fore.WHITE)
ask_model('nyx', prompt)
elif choice == "2":
prompt = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter your question for G1 Black: " + Fore.WHITE)
ask_model('g1', prompt)
elif choice == "3":
prompt = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter your question for Kronos: " + Fore.WHITE)
ask_model('kronos', prompt)
elif choice == "4":
start_live_g1_conversation()
elif choice == "5":
start_live_kronos_conversation()
elif choice == "6":
print(Fore.LIGHTGREEN_EX + "\n[SYSTEM] Exiting Agentic Network...")
break
else:
print(Fore.RED + "\n[ERROR] Invalid command. Please try again.")
if __name__ == "__main__":
main()