"""Универсальный агент с поддержкой Open Interpreter""" import json import logging import urllib.request import re from datetime import datetime from typing import Dict, List, Optional, Any from .models import Role, ModelConfig from .config import API_KEY, API_BASE, HF_TOKEN from .file_manager import FILE_MANAGER from .internet_agent import INTERNET_AGENT from .notification_system import NOTIFICATIONS from .process_manager import PROCESS_MANAGER from .state import STATE from .interpreter_config import configure_interpreter_for_model class UniversalAgent: def __init__(self, role: Role, model: ModelConfig, use_interpreter: bool = False): self.role = role self.model = model self.use_interpreter = use_interpreter self.conversation_history: List[Dict[str, str]] = [] self.file_manager = FILE_MANAGER self.internet = INTERNET_AGENT self.notifications = NOTIFICATIONS self.logger = logging.getLogger(__name__) self.tools = { "search_web": self.internet.search_web, "fetch_page": self.internet.fetch_page, "analyze_website": self.internet.analyze_website, "read_file": self.file_manager.read_file, "save_file": self.file_manager.save_file, "list_files": self.file_manager.list_files, "analyze_file": self.file_manager.analyze_file, } def _add_tools_to_task(self, task: str) -> str: tools_desc = "\n\n🔧 ᴀᴠᴀɪʟᴀʙʟᴇ ᴛᴏᴏʟs(。◕‿◕。):\n" for name, func in self.tools.items(): tools_desc += f"▹ {name}: {func.__doc__ or 'No description'}\n" return task + tools_desc def execute(self, task: str, sys_prompt_override: str = None, chat_id: str = None, history: List[Dict[str, str]] = None, mode: str = "chat") -> str: start_time = datetime.now() self.logger.info(f"Executing task in {mode} mode with role {self.role.name} and model {self.model.name}") if self.use_interpreter and mode in ("skill", "build"): result = self._execute_with_interpreter(task, chat_id, mode) else: result = self._execute_with_api(task, sys_prompt_override, chat_id, history, mode) duration = (datetime.now() - start_time).total_seconds() self.logger.info(f"Task completed in {duration:.2f}s. Tokens: {len(result)} chars") return result def _execute_with_interpreter(self, task: str, chat_id: str = None, mode: str = "chat") -> str: if not configure_interpreter_for_model(self.model.name, STATE, API_KEY, API_BASE): return "▸ ⚠️ ɪɴᴛᴇʀᴘʀᴇᴛᴇʀ ᴅɪsᴀʙʟᴇᴅ\nFalling back to API..." try: from interpreter import interpreter base_instructions = interpreter.custom_instructions or "" interpreter.custom_instructions = ( f"{base_instructions}\n\n" f"🎭 ᴄᴜʀʀᴇɴᴛ ʀᴏʟᴇ\n{self.role.prompt[:1000]}" ) messages = interpreter.chat(task, display=False) interpreter.custom_instructions = base_instructions if messages and len(messages) > 0: return f"╭───────────────────╮\n │ ✅ ᴛᴀsᴋ ᴄᴏᴍᴘʟᴇᴛᴇᴅ │\n ├───────────────────┤\n{str(messages[-1].get('content', ''))[-2000:]}\n╰───────────────────╯" return "▸ ⚠️ ɪɴᴛᴇʀᴘʀᴇᴛᴇʀ: No response." except Exception as e: self.logger.error(f"Interpreter error: {e}") return f"▸ ⚠️ ɪɴᴛᴇʀᴘʀᴇᴛᴇʀ ᴇʀʀᴏʀ\n{str(e)[:500]}\nFalling back to API..." def _execute_with_api(self, task: str, sys_prompt_override: str = None, chat_id: str = None, history: List[Dict[str, str]] = None, mode: str = "chat") -> str: system_prompt = sys_prompt_override or self.role.prompt messages = [{"role": "system", "content": system_prompt}] if history: for h in history[-10:]: messages.append({"role": h.get("role", "user"), "content": h.get("content", "")}) messages.append({"role": "user", "content": task}) if self.model.provider == "hf": return self._call_hf(messages, chat_id) return self._call_openai_compatible(messages, chat_id) def _call_openai_compatible(self, messages: List[Dict], chat_id: str = None) -> str: url = f"{API_BASE}/chat/completions" data = json.dumps({ "model": self.model.endpoint, "messages": messages, "temperature": 0.7, "max_tokens": self.model.max_tokens }).encode('utf-8') headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } req = urllib.request.Request(url, data=data, headers=headers) try: with urllib.request.urlopen(req, timeout=45) as response: res = json.loads(response.read().decode('utf-8')) content = res['choices'][0]['message']['content'] return ( "╭───────────────────╮\n" " │ 🤖 ᴀɢᴇɴᴛ ʀᴇsᴘᴏɴsᴇ │\n" "├───────────────────┤\n" f"{content[:2000]}\n" "╰───────────────────╯" ) except Exception as e: self.logger.error(f"Primary API failed: {e}") return self._tiered_fallback(messages, e, chat_id) def _tiered_fallback(self, messages: List[Dict], primary_error, chat_id: str = None) -> str: if chat_id: from .telegram_utils import send_tg send_tg(chat_id, "▸ ⚠️ ᴍᴏᴅᴇʟ ᴇʀʀᴏʀ\nFallback to cheaper tier...") tried = [self.model.name] current = self.model.name while True: next_model = STATE.get_next_tier_model(current) if not next_model or next_model in tried: break tried.append(next_model) model_cfg = STATE.models.get(next_model) if not model_cfg: break try: agent = UniversalAgent(self.role, model_cfg) if model_cfg.provider == "hf": result = agent._call_hf(messages, chat_id) else: result = agent._call_openai_compatible(messages, chat_id) return ( f"{result}\n\n" "╭───────────────────╮\n" " │ ✅ ᴛɪᴇʀᴇᴅ ꜰᴀʟʟʙᴀᴄᴋ │\n" f" ├───────────────────┤\n" f" │ {next_model} │\n" "╰───────────────────╯" ) except Exception as e: current = next_model return self._fallback_hf(messages, primary_error, chat_id, tried)