PinkSky / server /universal_agent.py
FreshPixels's picture
Rename universal_agent.py to server/universal_agent.py
0d97442 verified
Raw
History Blame Contribute Delete
7.42 kB
"""ะฃะฝะธะฒะตั€ัะฐะปัŒะฝั‹ะน ะฐะณะตะฝั‚ ั ะฟะพะดะดะตั€ะถะบะพะน Open Interpreter"""
import json
import logging
import urllib.request
import re
from datetime import datetime
from typing import Dict, List, Optional, Any
from .models import Role, ModelConfig
from .config import API_KEY, API_BASE, HF_TOKEN
from .file_manager import FILE_MANAGER
from .internet_agent import INTERNET_AGENT
from .notification_system import NOTIFICATIONS
from .process_manager import PROCESS_MANAGER
from .state import STATE
from .interpreter_config import configure_interpreter_for_model
class UniversalAgent:
def __init__(self, role: Role, model: ModelConfig, use_interpreter: bool = False):
self.role = role
self.model = model
self.use_interpreter = use_interpreter
self.conversation_history: List[Dict[str, str]] = []
self.file_manager = FILE_MANAGER
self.internet = INTERNET_AGENT
self.notifications = NOTIFICATIONS
self.logger = logging.getLogger(__name__)
self.tools = {
"search_web": self.internet.search_web,
"fetch_page": self.internet.fetch_page,
"analyze_website": self.internet.analyze_website,
"read_file": self.file_manager.read_file,
"save_file": self.file_manager.save_file,
"list_files": self.file_manager.list_files,
"analyze_file": self.file_manager.analyze_file,
}
def _add_tools_to_task(self, task: str) -> str:
tools_desc = "\n\n๐Ÿ”ง แด€แด แด€ษชสŸแด€ส™สŸแด‡ แด›แดแดสŸs๏ผˆ๏ฝกโ—•โ€ฟโ—•๏ฝก๏ผ‰:\n"
for name, func in self.tools.items():
tools_desc += f"โ–น {name}: {func.__doc__ or 'No description'}\n"
return task + tools_desc
def execute(self, task: str, sys_prompt_override: str = None, chat_id: str = None,
history: List[Dict[str, str]] = None, mode: str = "chat") -> str:
start_time = datetime.now()
self.logger.info(f"Executing task in {mode} mode with role {self.role.name} and model {self.model.name}")
if self.use_interpreter and mode in ("skill", "build"):
result = self._execute_with_interpreter(task, chat_id, mode)
else:
result = self._execute_with_api(task, sys_prompt_override, chat_id, history, mode)
duration = (datetime.now() - start_time).total_seconds()
self.logger.info(f"Task completed in {duration:.2f}s. Tokens: {len(result)} chars")
return result
def _execute_with_interpreter(self, task: str, chat_id: str = None, mode: str = "chat") -> str:
if not configure_interpreter_for_model(self.model.name, STATE, API_KEY, API_BASE):
return "โ–ธ โš ๏ธ ษชษดแด›แด‡ส€แด˜ส€แด‡แด›แด‡ส€ แด…ษชsแด€ส™สŸแด‡แด…\nFalling back to API..."
try:
from interpreter import interpreter
base_instructions = interpreter.custom_instructions or ""
interpreter.custom_instructions = (
f"{base_instructions}\n\n"
f"๐ŸŽญ แด„แดœส€ส€แด‡ษดแด› ส€แดสŸแด‡\n{self.role.prompt[:1000]}"
)
messages = interpreter.chat(task, display=False)
interpreter.custom_instructions = base_instructions
if messages and len(messages) > 0:
return f"โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ\n โ”‚ โœ… แด›แด€sแด‹ แด„แดแดแด˜สŸแด‡แด›แด‡แด… โ”‚\n โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค\n{str(messages[-1].get('content', ''))[-2000:]}\nโ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ"
return "โ–ธ โš ๏ธ ษชษดแด›แด‡ส€แด˜ส€แด‡แด›แด‡ส€: No response."
except Exception as e:
self.logger.error(f"Interpreter error: {e}")
return f"โ–ธ โš ๏ธ ษชษดแด›แด‡ส€แด˜ส€แด‡แด›แด‡ส€ แด‡ส€ส€แดส€\n{str(e)[:500]}\nFalling back to API..."
def _execute_with_api(self, task: str, sys_prompt_override: str = None, chat_id: str = None,
history: List[Dict[str, str]] = None, mode: str = "chat") -> str:
system_prompt = sys_prompt_override or self.role.prompt
messages = [{"role": "system", "content": system_prompt}]
if history:
for h in history[-10:]:
messages.append({"role": h.get("role", "user"), "content": h.get("content", "")})
messages.append({"role": "user", "content": task})
if self.model.provider == "hf":
return self._call_hf(messages, chat_id)
return self._call_openai_compatible(messages, chat_id)
def _call_openai_compatible(self, messages: List[Dict], chat_id: str = None) -> str:
url = f"{API_BASE}/chat/completions"
data = json.dumps({
"model": self.model.endpoint,
"messages": messages,
"temperature": 0.7,
"max_tokens": self.model.max_tokens
}).encode('utf-8')
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
req = urllib.request.Request(url, data=data, headers=headers)
try:
with urllib.request.urlopen(req, timeout=45) as response:
res = json.loads(response.read().decode('utf-8'))
content = res['choices'][0]['message']['content']
return (
"โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ\n"
" โ”‚ ๐Ÿค– แด€ษขแด‡ษดแด› ส€แด‡sแด˜แดษดsแด‡ โ”‚\n"
"โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค\n"
f"{content[:2000]}\n"
"โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ"
)
except Exception as e:
self.logger.error(f"Primary API failed: {e}")
return self._tiered_fallback(messages, e, chat_id)
def _tiered_fallback(self, messages: List[Dict], primary_error, chat_id: str = None) -> str:
if chat_id:
from .telegram_utils import send_tg
send_tg(chat_id, "โ–ธ โš ๏ธ แดแดแด…แด‡สŸ แด‡ส€ส€แดส€\nFallback to cheaper tier...")
tried = [self.model.name]
current = self.model.name
while True:
next_model = STATE.get_next_tier_model(current)
if not next_model or next_model in tried:
break
tried.append(next_model)
model_cfg = STATE.models.get(next_model)
if not model_cfg:
break
try:
agent = UniversalAgent(self.role, model_cfg)
if model_cfg.provider == "hf":
result = agent._call_hf(messages, chat_id)
else:
result = agent._call_openai_compatible(messages, chat_id)
return (
f"{result}\n\n"
"โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ\n"
" โ”‚ โœ… แด›ษชแด‡ส€แด‡แด… ๊œฐแด€สŸสŸส™แด€แด„แด‹ โ”‚\n"
f" โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค\n"
f" โ”‚ {next_model} โ”‚\n"
"โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ"
)
except Exception as e:
current = next_model
return self._fallback_hf(messages, primary_error, chat_id, tried)