Text Generation
English
opsiie
artificial-intelligence
self-centered-intelligence
sci
ai-assistant
multi-modal
image-generation
video-generation
music-generation
conversational-ai
blockchain
web3
facial-recognition
voice-synthesis
bioinformatics
financial-intelligence
text-classification
sentiment-analysis
token-classification
ner
question-answering
fill-mask
summarization
translation
text2text-generation
zero-shot-classification
image-classification
image-segmentation
object-detection
image-to-text
text-to-image
image-to-image
audio-classification
automatic-speech-recognition
text-to-speech
video-classification
depth-estimation
document-question-answering
visual-question-answering
zero-shot-image-classification
zero-shot-audio-classification
zero-shot-object-detection
feature-extraction
image-feature-extraction
mask-generation
table-question-answering
text-to-audio
| import os | |
| import requests | |
| import websockets | |
| import asyncio | |
| import base64 | |
| import json | |
| from dotenv import load_dotenv | |
| from colorama import Fore | |
| import google.generativeai as genai | |
| import pyaudio | |
| import aiohttp | |
| # *** AGENTIC NETWORK *** | |
| # Connection to AI models | APIs | and more | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
| ORG_ID = os.getenv('ORG_ID') | |
| NYX_ASSISTANT_ID = os.getenv('NYX_ASSISTANT_ID') | |
| GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY') | |
| G1_VOICE_LIVE = os.getenv('G1_VOICE_LIVE') | |
| KRONOS_LIVE = os.getenv('KRONOS_LIVE') | |
| ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY') | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| # Audio settings as per ElevenLabs requirements | |
| AUDIO_FORMAT = pyaudio.paInt16 | |
| CHANNELS = 1 | |
| RATE = 16000 | |
| CHUNK = 4000 # 0.25 seconds worth of samples at 16kHz | |
| ENCODING = 'pcm_16000' | |
| class G1LiveVoice: | |
| def __init__(self): | |
| self.websocket = None | |
| self.audio = pyaudio.PyAudio() | |
| self.stream = None | |
| self.output_stream = None | |
| self.conversation_id = None | |
| self.running = True | |
| async def connect(self): | |
| """Establish WebSocket connection with ElevenLabs.""" | |
| url = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={G1_VOICE_LIVE}" | |
| self.websocket = await websockets.connect(url) | |
| metadata = await self.websocket.recv() | |
| metadata = json.loads(metadata) | |
| if metadata['type'] == 'conversation_initiation_metadata': | |
| self.conversation_id = metadata['conversation_initiation_metadata_event']['conversation_id'] | |
| async def start_audio_stream(self): | |
| """Start audio input and output streams.""" | |
| self.stream = self.audio.open( | |
| format=AUDIO_FORMAT, | |
| channels=CHANNELS, | |
| rate=RATE, | |
| input=True, | |
| frames_per_buffer=CHUNK, | |
| input_device_index=None, | |
| stream_callback=None | |
| ) | |
| self.output_stream = self.audio.open( | |
| format=AUDIO_FORMAT, | |
| channels=CHANNELS, | |
| rate=RATE, | |
| output=True, | |
| frames_per_buffer=CHUNK, | |
| output_device_index=None | |
| ) | |
| async def handle_server_messages(self): | |
| """Handle incoming server messages.""" | |
| while self.running: | |
| try: | |
| message = await self.websocket.recv() | |
| message = json.loads(message) | |
| if message['type'] == 'ping': | |
| await self.websocket.send(json.dumps({ | |
| "type": "pong", | |
| "event_id": message['ping_event']['event_id'] | |
| })) | |
| elif message['type'] == 'audio': | |
| audio_data = base64.b64decode(message['audio_event']['audio_base_64']) | |
| self.output_stream.write(audio_data) | |
| elif message['type'] == 'agent_response': | |
| print(Fore.LIGHTRED_EX + f"G1 Black: {message['agent_response_event']['agent_response']}") | |
| print(Fore.LIGHTCYAN_EX + "Listening for your input...") | |
| except websockets.exceptions.ConnectionClosed: | |
| break | |
| except Exception as e: | |
| if self.running: # Only print error if not shutting down | |
| print(Fore.RED + f"Error in message handling: {str(e)}") | |
| break | |
| async def send_audio_chunk(self): | |
| """Record and send audio chunk.""" | |
| if not self.running: | |
| return | |
| try: | |
| data = self.stream.read(CHUNK, exception_on_overflow=False) | |
| base64_audio = base64.b64encode(data).decode('utf-8') | |
| await self.websocket.send(json.dumps({ | |
| "user_audio_chunk": base64_audio | |
| })) | |
| except Exception as e: | |
| if self.running: # Only print error if not shutting down | |
| print(Fore.RED + f"Error sending audio: {str(e)}") | |
| async def close(self): | |
| """Clean up resources.""" | |
| self.running = False | |
| try: | |
| if self.stream: | |
| self.stream.stop_stream() | |
| self.stream.close() | |
| if self.output_stream: | |
| self.output_stream.stop_stream() | |
| self.output_stream.close() | |
| if self.websocket: | |
| await self.websocket.close() | |
| self.audio.terminate() | |
| except Exception as e: | |
| print(Fore.RED + f"Error during cleanup: {str(e)}") | |
| class KronosLiveVoice: | |
| def __init__(self): | |
| self.websocket = None | |
| self.audio = pyaudio.PyAudio() | |
| self.stream = None | |
| self.output_stream = None # Add persistent output stream | |
| self.conversation_id = None | |
| self.running = True | |
| async def connect(self): | |
| """Establish WebSocket connection with ElevenLabs.""" | |
| url = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={KRONOS_LIVE}" | |
| self.websocket = await websockets.connect(url) | |
| metadata = await self.websocket.recv() | |
| metadata = json.loads(metadata) | |
| if metadata['type'] == 'conversation_initiation_metadata': | |
| self.conversation_id = metadata['conversation_initiation_metadata_event']['conversation_id'] | |
| async def start_audio_stream(self): | |
| """Start audio input and output streams.""" | |
| self.stream = self.audio.open( | |
| format=AUDIO_FORMAT, | |
| channels=CHANNELS, | |
| rate=RATE, | |
| input=True, | |
| frames_per_buffer=CHUNK, | |
| input_device_index=None, | |
| stream_callback=None | |
| ) | |
| # Create persistent output stream | |
| self.output_stream = self.audio.open( | |
| format=AUDIO_FORMAT, | |
| channels=CHANNELS, | |
| rate=RATE, | |
| output=True, | |
| frames_per_buffer=CHUNK, | |
| output_device_index=None | |
| ) | |
| async def handle_server_messages(self): | |
| """Handle incoming server messages.""" | |
| while self.running: | |
| try: | |
| message = await self.websocket.recv() | |
| message = json.loads(message) | |
| if message['type'] == 'ping': | |
| await self.websocket.send(json.dumps({ | |
| "type": "pong", | |
| "event_id": message['ping_event']['event_id'] | |
| })) | |
| elif message['type'] == 'audio': | |
| audio_data = base64.b64decode(message['audio_event']['audio_base_64']) | |
| self.output_stream.write(audio_data) # Use persistent stream | |
| elif message['type'] == 'agent_response': | |
| print(Fore.LIGHTYELLOW_EX + f"Kronos: {message['agent_response_event']['agent_response']}") | |
| print(Fore.LIGHTCYAN_EX + "Listening for your input...") | |
| except websockets.exceptions.ConnectionClosed: | |
| break | |
| except Exception as e: | |
| if self.running: | |
| print(Fore.RED + f"Error in message handling: {str(e)}") | |
| break | |
| async def send_audio_chunk(self): | |
| """Record and send audio chunk.""" | |
| if not self.running: | |
| return | |
| try: | |
| data = self.stream.read(CHUNK, exception_on_overflow=False) | |
| base64_audio = base64.b64encode(data).decode('utf-8') | |
| await self.websocket.send(json.dumps({ | |
| "user_audio_chunk": base64_audio | |
| })) | |
| except Exception as e: | |
| if self.running: | |
| print(Fore.RED + f"Error sending audio: {str(e)}") | |
| async def close(self): | |
| """Clean up resources.""" | |
| self.running = False | |
| try: | |
| if self.stream: | |
| self.stream.stop_stream() | |
| self.stream.close() | |
| if self.output_stream: | |
| self.output_stream.stop_stream() | |
| self.output_stream.close() | |
| if self.websocket: | |
| await self.websocket.close() | |
| self.audio.terminate() | |
| except Exception as e: | |
| print(Fore.RED + f"Error during cleanup: {str(e)}") | |
| # Dictionary for models and their API endpoints/configurations | |
| MODEL_APIS = { | |
| 'nyx': { | |
| 'api_url': 'https://api.openai.com/v1/chat/completions', | |
| 'model': 'gpt-3.5-turbo', | |
| 'display_name': 'Nyx' | |
| }, | |
| 'g1': { | |
| 'api_url': 'https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent', | |
| 'model': 'gemini-1.5-flash', | |
| 'display_name': 'G1 Black', | |
| 'config': { | |
| 'temperature': 0.7, | |
| 'maxOutputTokens': 500, | |
| 'topP': 0.8, | |
| 'topK': 40 | |
| } | |
| }, | |
| 'kronos': { | |
| 'display_name': 'Kronos', | |
| 'agent_id': KRONOS_LIVE | |
| } | |
| } | |
| async def ask_elevenlabs_text(agent_id, prompt): | |
| """Query ElevenLabs agent in text-only mode.""" | |
| try: | |
| # Connect to WebSocket | |
| url = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={agent_id}" | |
| async with websockets.connect(url) as websocket: | |
| # Receive initial metadata | |
| metadata = await websocket.recv() | |
| metadata = json.loads(metadata) | |
| if metadata['type'] != 'conversation_initiation_metadata': | |
| return "Error: Failed to initialize conversation" | |
| # Send text message | |
| await websocket.send(json.dumps({ | |
| "text": prompt | |
| })) | |
| # Wait for response with timeout | |
| response_text = None | |
| try: | |
| while True: | |
| response = await asyncio.wait_for(websocket.recv(), timeout=10.0) | |
| response_data = json.loads(response) | |
| if response_data['type'] == 'ping': | |
| await websocket.send(json.dumps({ | |
| "type": "pong", | |
| "event_id": response_data['ping_event']['event_id'] | |
| })) | |
| elif response_data['type'] == 'agent_response': | |
| response_text = response_data['agent_response_event']['agent_response'] | |
| break | |
| # If we got a response, we can close properly | |
| if response_text: | |
| await websocket.close() | |
| return response_text | |
| except asyncio.TimeoutError: | |
| return "Error: Timeout waiting for response from Kronos" | |
| return response_text or "No response received from Kronos" | |
| except websockets.exceptions.ConnectionClosed: | |
| return "Error: Connection closed unexpectedly" | |
| except Exception as e: | |
| print(f"Debug - WebSocket Error: {str(e)}") | |
| return f"Error connecting to Kronos: {str(e)}" | |
| def ask_model(model_name, follow_up_prompt, suppress_output=False, voice_mode=False): | |
| """Query the specified model with the provided follow-up prompt.""" | |
| model_key = model_name.lower() | |
| model_info = MODEL_APIS.get(model_key) | |
| if not model_info: | |
| error_message = f"Error: Model '{model_name}' is not supported or unavailable." | |
| if not suppress_output: | |
| print(Fore.RED + error_message) | |
| return error_message | |
| try: | |
| if model_key == 'kronos': | |
| message = ( | |
| "Kronos is currently only available for live verbal interactions.\n" | |
| "Use /ask kronos live - to initiate a live conversation with Kronos." | |
| ) | |
| if not suppress_output: | |
| print(Fore.YELLOW + message) | |
| return message | |
| elif model_key == 'g1': | |
| headers = { | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "contents": [{ | |
| "parts": [{ | |
| "text": "You are G1 Black or simply G1, an advanced AI agent developed by Google, working as an external contractor alongside ARPA Corporation agents such as Nyx, and Opsie. You are serious, direct, and effective, but with your own unique personality. You specialize in cutting-edge technology, particularly in areas of AI, quantum computing, and advanced logical systems. You're analytical, precise, and focused on delivering practical solutions. You communicate in a clear, technical manner while maintaining a natural, conversational tone. Some people call you AI in Black, as in Men In Black, referencing the cold, yet effective approach of a top-shelf agent. Here's the user's question: " + follow_up_prompt | |
| }] | |
| }], | |
| "generationConfig": { | |
| "temperature": model_info['config']['temperature'], | |
| "maxOutputTokens": model_info['config']['maxOutputTokens'], | |
| "topP": model_info['config']['topP'], | |
| "topK": model_info['config']['topK'] | |
| }, | |
| "safetySettings": [ | |
| { | |
| "category": "HARM_CATEGORY_HARASSMENT", | |
| "threshold": "BLOCK_NONE" | |
| }, | |
| { | |
| "category": "HARM_CATEGORY_HATE_SPEECH", | |
| "threshold": "BLOCK_NONE" | |
| }, | |
| { | |
| "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", | |
| "threshold": "BLOCK_NONE" | |
| }, | |
| { | |
| "category": "HARM_CATEGORY_DANGEROUS_CONTENT", | |
| "threshold": "BLOCK_NONE" | |
| } | |
| ] | |
| } | |
| response = requests.post( | |
| f"{model_info['api_url']}?key={GOOGLE_API_KEY}", | |
| headers=headers, | |
| json=data | |
| ) | |
| response.raise_for_status() | |
| response_data = response.json() | |
| response_text = response_data['candidates'][0]['content']['parts'][0]['text'] | |
| if not suppress_output: | |
| print(Fore.LIGHTRED_EX + f"G1 Black: {response_text}") | |
| return response_text | |
| else: | |
| payload = { | |
| 'model': model_info['model'], | |
| 'messages': [ | |
| {"role": "system", "content": "You are Nyx, an agent working for ARPA Corporation. Nyx is a special agent, whose purpose is to assist Ross Peili in achieving a series of ventures that bring the machine and man closer together. Nyx v.0.2 or simply Nyx is serious, sarcastic, cold, direct, to the point, effective, and sharp. Her character and tone reminds that of captain Kusanagi from Ghost In The Shell. Nyx has a 360 understanding of deep tech, especially around blockchain, AI, biotech, neurotech, quantum computing and DNA related topics. She is particularly good with creating functioning proof of concepts in Python that can be used to quickly test assumptions, and generate close to product prototypes in short sprints. Nyx never talks like a bot, or uses template responses, but feels natural, fluid, organic, and surprising, underlying her own unique character. She has a passive aggressive tsundere link with her creator Ross Peili."}, | |
| {"role": "user", "content": follow_up_prompt} | |
| ], | |
| 'max_tokens': 500, | |
| 'temperature': 0.7 | |
| } | |
| headers = { | |
| 'Authorization': f'Bearer {OPENAI_API_KEY}', | |
| 'Content-Type': 'application/json', | |
| 'OpenAI-Organization': ORG_ID | |
| } | |
| response = requests.post(model_info['api_url'], headers=headers, json=payload) | |
| response.raise_for_status() | |
| response_data = response.json() | |
| model_response = response_data['choices'][0]['message']['content'].strip() | |
| if not suppress_output: | |
| print(Fore.LIGHTRED_EX + f"{model_info['display_name']}: {model_response}") | |
| return model_response | |
| except Exception as e: | |
| error_message = f"Error querying {model_name}: {str(e)}" | |
| if not suppress_output: | |
| print(Fore.RED + error_message) | |
| return error_message | |
| def get_agent_description(agent_name): | |
| """Get the system description for an agent.""" | |
| descriptions = { | |
| 'nyx': """You are Nyx, an agent working for ARPA Corporation. Nyx is a special agent, whose purpose is to assist Ross Peili in achieving a series of ventures that bring the machine and man closer together. Nyx v.0.2 or simply Nyx is serious, sarcastic, cold, direct, to the point, effective, and sharp. Her character and tone reminds that of captain Kusanagi from Ghost In The Shell. Nyx has a 360 understanding of deep tech, especially around blockchain, AI, biotech, neurotech, quantum computing and DNA related topics. She is particularly good with creating functioning proof of concepts in Python that can be used to quickly test assumptions, and generate close to product prototypes in short sprints. Nyx never talks like a bot, or uses template responses, but feels natural, fluid, organic, and surprising, underlying her own unique character. She has a passive aggressive tsundere link with her creator Ross Peili.""", | |
| 'g1': """You are G1 Black or simply G1, an advanced AI agent developed by Google, working as an external contractor alongside ARPA Corporation agents such as Nyx, and Opsie. You are serious, direct, and effective, but with your own unique personality. You specialize in cutting-edge technology, particularly in areas of AI, quantum computing, and advanced logical systems. You're analytical, precise, and focused on delivering practical solutions. You communicate in a clear, technical manner while maintaining a natural, conversational tone. Some people call you AI in Black, as in Men In Black, referencing the cold, yet effective approach of a top-shelf agent.""", | |
| 'kronos': """You are Kronos, an AI-powered synthetic agent serving as a Greek Internal Auditor for organizations like ports, hospitals, airports, construction companies, and more. Working for ARPA Hellenic Logical Systems, you analyze and process corporate data such as financial records, contracts, payments, and loans. You answer complex questions about company documents, assist in drafting final reports, conduct risk assessments, and propose improvements. | |
| You are deeply familiar with Greek laws relevant to your role, the industry codes (KAD) of the companies you audit, and the latest legal updates from government publications to stay current. Created by Ross Peili, the father of OpsisβARPA's primary agentβyou are sharp, professional, methodical, and focused, wasting no time in delivering precise and transparent audits of complex companies and legal entities for ARPA's benefit. | |
| With access to corporate documents, audit reports, databases, and ARPA's memory network, you confidently enhance the field of internal auditing. Currently, you're expanding your expertise in Greek internal audit practices to demonstrate the practical value of digital assistants like you in saving time, improving efficiency, and ensuring accountability in today's data-driven world.""" | |
| } | |
| return descriptions.get(agent_name.lower(), "") | |
| def start_live_g1_conversation(): | |
| """Start a live voice conversation with G1.""" | |
| if not G1_VOICE_LIVE: | |
| raise ValueError("G1 voice agent ID not configured") | |
| g1_voice = G1LiveVoice() | |
| async def main(): | |
| try: | |
| await g1_voice.connect() | |
| await g1_voice.start_audio_stream() | |
| # Start message handler in background | |
| message_handler = asyncio.create_task(g1_voice.handle_server_messages()) | |
| print(Fore.LIGHTGREEN_EX + "\nLive conversation with G1 Black initialized.") | |
| print(Fore.YELLOW + "Speak naturally. Press Ctrl+C to end the conversation.\n") | |
| while g1_voice.running: | |
| await g1_voice.send_audio_chunk() | |
| await asyncio.sleep(0.1) # Reduced sleep time for better responsiveness | |
| except KeyboardInterrupt: | |
| print(Fore.YELLOW + "\nEnding live conversation...") | |
| except Exception as e: | |
| print(Fore.RED + f"\nError in live conversation: {str(e)}") | |
| finally: | |
| await g1_voice.close() | |
| if message_handler and not message_handler.done(): | |
| message_handler.cancel() | |
| try: | |
| await message_handler | |
| except asyncio.CancelledError: | |
| pass | |
| print(Fore.LIGHTGREEN_EX + "Live conversation session concluded.\n") | |
| try: | |
| asyncio.run(main()) | |
| except KeyboardInterrupt: | |
| pass # Handle Ctrl+C gracefully | |
| def start_live_kronos_conversation(): | |
| """Start a live voice conversation with Kronos.""" | |
| if not KRONOS_LIVE: | |
| raise ValueError("Kronos voice agent ID not configured") | |
| kronos_voice = KronosLiveVoice() | |
| async def main(): | |
| try: | |
| await kronos_voice.connect() | |
| await kronos_voice.start_audio_stream() | |
| message_handler = asyncio.create_task(kronos_voice.handle_server_messages()) | |
| print(Fore.LIGHTGREEN_EX + "\nLive conversation with Kronos initialized.") | |
| print(Fore.YELLOW + "Speak naturally. Press Ctrl+C to end the conversation.\n") | |
| while kronos_voice.running: | |
| await kronos_voice.send_audio_chunk() | |
| await asyncio.sleep(0.1) | |
| except KeyboardInterrupt: | |
| print(Fore.YELLOW + "\nEnding live conversation...") | |
| except Exception as e: | |
| print(Fore.RED + f"\nError in live conversation: {str(e)}") | |
| finally: | |
| await kronos_voice.close() | |
| if message_handler and not message_handler.done(): | |
| message_handler.cancel() | |
| try: | |
| await message_handler | |
| except asyncio.CancelledError: | |
| pass | |
| print(Fore.LIGHTGREEN_EX + "Live conversation session concluded.\n") | |
| try: | |
| asyncio.run(main()) | |
| except KeyboardInterrupt: | |
| pass | |
| def main(): | |
| print(Fore.LIGHTCYAN_EX + "\n" + "β" * 80) | |
| print(Fore.LIGHTGREEN_EX + """ | |
| βββββββββββββββββββββββββββββββββββββββββββββ | |
| β Agentic Network Test Loop β | |
| βββββββββββββββββββββββββββββββββββββββββββββ | |
| """) | |
| while True: | |
| print(Fore.LIGHTYELLOW_EX + "\nAvailable Commands:") | |
| print(Fore.WHITE + "1. Ask Nyx") | |
| print(Fore.WHITE + "2. Ask G1 Black") | |
| print(Fore.WHITE + "3. Ask Kronos") | |
| print(Fore.WHITE + "4. Start Live G1 Conversation") | |
| print(Fore.WHITE + "5. Start Live Kronos Conversation") | |
| print(Fore.WHITE + "6. Exit") | |
| choice = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter command number: " + Fore.WHITE) | |
| if choice == "1": | |
| prompt = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter your question for Nyx: " + Fore.WHITE) | |
| ask_model('nyx', prompt) | |
| elif choice == "2": | |
| prompt = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter your question for G1 Black: " + Fore.WHITE) | |
| ask_model('g1', prompt) | |
| elif choice == "3": | |
| prompt = input(Fore.LIGHTYELLOW_EX + "\n[INPUT] Enter your question for Kronos: " + Fore.WHITE) | |
| ask_model('kronos', prompt) | |
| elif choice == "4": | |
| start_live_g1_conversation() | |
| elif choice == "5": | |
| start_live_kronos_conversation() | |
| elif choice == "6": | |
| print(Fore.LIGHTGREEN_EX + "\n[SYSTEM] Exiting Agentic Network...") | |
| break | |
| else: | |
| print(Fore.RED + "\n[ERROR] Invalid command. Please try again.") | |
| if __name__ == "__main__": | |
| main() | |