# AventraOSCore / agentos_core.py
# Tpayne101's picture
# Update agentos_core.py
# aa8969c verified
import os
import json
import time
from openai import OpenAI
class AgentCore:
    """OpenAI chat wrapper that persists conversation turns and telemetry as JSON.

    Attributes:
        client: OpenAI client built from the ``OPENAI_API_KEY`` environment
            variable.
        memory_file: Path of the JSON file holding the conversation history.
        memory: List of ``{"user": ..., "agent": ...}`` turns loaded from
            ``memory_file``.
        agent_id: Identifier string produced by ``create_agent_identity``.
        telemetry_file: Path of the JSON telemetry log.
    """

    # Class-level default so ``log_telemetry`` works on every instance; assign
    # ``instance.telemetry_file`` to redirect the log somewhere else.
    telemetry_file = "telemetry.json"

    def __init__(self):
        """Create the API client, load saved memory, and assign an agent id.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is unset or empty.
        """
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("Missing OPENAI_API_KEY environment variable.")
        self.client = OpenAI(api_key=api_key)

        # memory_file must be set before load_memory() reads it.
        self.memory_file = "agent_memory.json"
        self.memory = self.load_memory()

        self.agent_id = self.create_agent_identity()

    def create_agent_identity(self):
        """Return an identifier of the form ``agent-<time.time() value>``."""
        return f"agent-{time.time()}"

    @staticmethod
    def _read_json_list(path):
        """Return the JSON list stored at ``path``, or ``[]`` if unusable."""
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Missing or unreadable file, or malformed content
            # (json.JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses).
            return []
        # Callers append to the result, so anything but a list is unusable.
        return data if isinstance(data, list) else []

    @staticmethod
    def _write_json(path, payload):
        """Serialize ``payload`` to ``path`` without exposing a partial file.

        The data is written to ``<path>.tmp`` first and then moved over the
        target with ``os.replace``, so an interrupted dump cannot leave a
        truncated JSON document at ``path``.
        """
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f)
        os.replace(tmp_path, path)

    def load_memory(self):
        """Load previous conversation memory from disk.

        Returns:
            The list stored in ``self.memory_file``, or an empty list when the
            file is missing, unreadable, not valid JSON, or not a JSON list.
        """
        return self._read_json_list(self.memory_file)

    def save_memory(self):
        """Save current memory to disk.

        Saving is best-effort: any failure is printed and otherwise ignored,
        so a failed save never interrupts the caller.
        """
        try:
            self._write_json(self.memory_file, self.memory)
        except Exception as e:
            print("Error saving memory:", e)

    def chat(self, prompt):
        """Chat with the agent and automatically store memory.

        Each call sends only the fixed system prompt and ``prompt`` to the
        ``gpt-4o-mini`` model; earlier turns in ``self.memory`` are recorded
        but not sent back to the model.

        Args:
            prompt: The user message to send.

        Returns:
            The model's reply text, or a string of the form ``"Error: ..."``
            if anything raised while calling the API or recording the turn.
        """
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are AgentOS, an intelligent autonomous system."},
                    {"role": "user", "content": prompt},
                ],
            )
            message = response.choices[0].message.content
            self.memory.append({"user": prompt, "agent": message})
            self.save_memory()  # Auto-save after every new message
            return message
        except Exception as e:
            # Failures are reported to the caller as a string, not raised.
            return f"Error: {e}"

    def log_telemetry(self, prompt, response, success=True):
        """Append one telemetry record to ``self.telemetry_file``.

        The log is a JSON list. A missing, empty, corrupt, or non-list log is
        treated as empty and replaced, so one bad file cannot make every
        later call fail.

        Args:
            prompt: The prompt that was sent.
            response: The response that was produced.
            success: Whether the interaction succeeded.

        Raises:
            OSError: If the log file cannot be written.
            TypeError: If ``prompt`` or ``response`` is not JSON-serializable.
        """
        log = {
            "agent_id": self.agent_id,
            "timestamp": time.time(),
            "prompt": prompt,
            "response": response,
            "success": success,
        }
        entries = self._read_json_list(self.telemetry_file)
        entries.append(log)
        self._write_json(self.telemetry_file, entries)
import hashlib
import uuid
def create_agent_identity(self):
    """Generate a fresh agent identifier and write its fingerprint to disk.

    The identifier is ``agent-`` followed by a random UUID4. A record with the
    identifier, the creation time (``time.time()`` as a string), a SHA-256
    hex digest of ``"<id>-<created_at>"``, a starting reputation of 0, and a
    version string is written to ``agent_identity.json`` in the current
    working directory, replacing any existing file.

    Returns:
        The generated identifier string.
    """
    agent_name = f"agent-{uuid.uuid4()}"
    created = str(time.time())

    # The digest ties the identifier to its creation time.
    fingerprint = hashlib.sha256()
    fingerprint.update(f"{agent_name}-{created}".encode())

    record = {
        "id": agent_name,
        "created_at": created,
        "hash": fingerprint.hexdigest(),
        "reputation": 0,
        "version": "0.1"
    }

    serialized = json.dumps(record, indent=2)
    with open("agent_identity.json", "w") as out:
        out.write(serialized)

    return agent_name