Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| from openai import OpenAI | |
| class AgentCore: | |
| def __init__(self): | |
| # Load API key | |
| api_key = os.getenv("OPENAI_API_KEY") | |
| if not api_key: | |
| raise ValueError("Missing OPENAI_API_KEY environment variable.") | |
| # Create OpenAI client | |
| self.client = OpenAI(api_key=api_key) | |
| # Initialize memory | |
| self.memory_file = "agent_memory.json" | |
| self.memory = self.load_memory() | |
| # Unique agent identity | |
| self.agent_id = self.create_agent_identity() | |
| def create_agent_identity(self): | |
| base = f"agent-{time.time()}" | |
| return base | |
| def load_memory(self): | |
| """Load previous conversation memory from disk.""" | |
| if os.path.exists(self.memory_file): | |
| try: | |
| with open(self.memory_file, "r") as f: | |
| return json.load(f) | |
| except Exception: | |
| return [] | |
| return [] | |
| def save_memory(self): | |
| """Save current memory to disk.""" | |
| try: | |
| with open(self.memory_file, "w") as f: | |
| json.dump(self.memory, f) | |
| except Exception as e: | |
| print("Error saving memory:", e) | |
| def chat(self, prompt): | |
| """Chat with the agent and automatically store memory.""" | |
| try: | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[ | |
| {"role": "system", "content": "You are AgentOS, an intelligent autonomous system."}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| ) | |
| message = response.choices[0].message.content | |
| self.memory.append({"user": prompt, "agent": message}) | |
| self.save_memory() # ✅ Auto-save every new message | |
| return message | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def log_telemetry(self, prompt, response, success=True): | |
| log = { | |
| "agent_id": self.agent_id, | |
| "timestamp": time.time(), | |
| "prompt": prompt, | |
| "response": response, | |
| "success": success | |
| } | |
| if not os.path.exists("telemetry.json"): | |
| with open("telemetry.json", "w") as f: | |
| json.dump([], f) | |
| with open("telemetry.json", "r+") as f: | |
| data = json.load(f) | |
| data.append(log) | |
| f.seek(0) | |
| json.dump(data, f) | |
| import hashlib | |
| import uuid | |
| def create_agent_identity(self): | |
| base = f"agent-{uuid.uuid4()}" | |
| timestamp = str(time.time()) | |
| # Create a verifiable hash (the “digital DNA”) | |
| identity_hash = hashlib.sha256(f"{base}-{timestamp}".encode()).hexdigest() | |
| identity = { | |
| "id": base, | |
| "created_at": timestamp, | |
| "hash": identity_hash, | |
| "reputation": 0, | |
| "version": "0.1" | |
| } | |
| # Save this fingerprint to file | |
| with open("agent_identity.json", "w") as f: | |
| json.dump(identity, f, indent=2) | |
| return base |