PinkSky / server /skill_orchestrator.py
FreshPixels's picture
Rename skill_orchestrator.py to server/skill_orchestrator.py
a79d545 verified
Raw
History Blame Contribute Delete
4.34 kB
"""Skill Orchestrator — многомодельный синтез в Skill Mode"""
import threading
from typing import Dict, List
from .state import STATE
from .universal_agent import UniversalAgent
from .process_manager import PROCESS_MANAGER
from .notification_system import NOTIFICATIONS
class SkillOrchestrator:
def __init__(self, state):
self.state = state
def execute_with_agents(self, chat_id: str, text: str, file_context: str = "", agents: List[str] = None) -> str:
if agents is None:
agents = self._auto_select_agents(text)
main_result = self._execute_main(text, file_context, chat_id)
if len(agents) <= 1:
return main_result
agent_results = self._run_agents_parallel(agents, text, main_result, chat_id)
return self._synthesize_results(text, main_result, agent_results, chat_id)
def _auto_select_agents(self, text: str) -> List[str]:
text_lower = text.lower()
selected = ["universal"]
if any(k in text_lower for k in ["код", "code", "python", "файл", "script"]):
selected.extend(["guru", "hacker"])
if any(k in text_lower for k in ["тест", "test", "bug", "bug"]):
selected.extend(["qa", "sdet"])
if any(k in text_lower for k in ["архитектур", "architect", "design", "систем"]):
selected.append("architect")
if any(k in text_lower for k in ["review", "ревью", "audit", "аудит"]):
selected.extend(["techlead", "critic"])
return list(dict.fromkeys(selected))[:4]
def _execute_main(self, text: str, file_context: str, chat_id: str) -> str:
role = self.state.roles.get(self.state.current_role, self.state.roles["universal"])
model_name = self.state.get_model_for_role(self.state.current_role)
model = self.state.models.get(model_name, self.state.models["deepseek-v4-pro"])
agent = UniversalAgent(role, model, use_interpreter=True)
full_task = f"{text}\n\n{file_context}".strip()
return agent.execute(full_task, chat_id=chat_id, mode="skill")
def _run_agents_parallel(self, agents: List[str], text: str, main_result: str, chat_id: str) -> Dict[str, str]:
results = {}
threads = []
def run_agent(agent_name):
if PROCESS_MANAGER.is_cancelled():
results[agent_name] = "Cancelled"
return
role = self.state.roles.get(agent_name, self.state.roles["universal"])
model_name = self.state.get_model_for_role(agent_name)
model = self.state.models.get(model_name, self.state.models["deepseek-v4-pro"])
agent = UniversalAgent(role, model)
prompt = f"Original task: {text}\n\nMain agent result: {main_result[:2000]}\n\nProvide your specialized perspective as {agent_name}."
try:
results[agent_name] = agent.execute(prompt, chat_id=chat_id, mode="skill")
except Exception as e:
results[agent_name] = f"Error: {e}"
for name in agents[1:]:
t = threading.Thread(target=run_agent, args=(name,))
threads.append(t)
PROCESS_MANAGER.register_thread(t)
t.start()
for t in threads:
t.join(timeout=90)
return results
def _synthesize_results(self, text: str, main_result: str, agent_results: Dict[str, str], chat_id: str) -> str:
synthesis = f"Synthesize results for: {text}\n\nMain result:\n{main_result[:1500]}\n\nAdditional perspectives:\n"
for name, result in agent_results.items():
synthesis += f"\n--- {name} ---\n{result[:1000]}\n"
synth_role = self.state.roles.get("universal", list(self.state.roles.values())[0])
model_name = self.state.get_best_model(rank_by="reasoning", max_tier=2)
model = self.state.models.get(model_name, self.state.models["deepseek-v4-pro"])
agent = UniversalAgent(synth_role, model)
try:
return agent.execute(synthesis, chat_id=chat_id, mode="skill")
except Exception as e:
return main_result + "\n\n--- Additional ---\n" + "\n".join(f"{k}: {v[:500]}" for k, v in agent_results.items())