# PinkSky / server/conductor_engine.py
# FreshPixels's picture
# Rename conductor_engine.py to server/conductor_engine.py
# 45865b9 verified
# Raw
# History Blame Contribute Delete
# 7.8 kB
"""Conductor Engine — orchestration of Build mode."""
import json
import re
import threading
import time
from typing import Any, Dict, List, Optional

from .models import Role, Conductor
from .universal_agent import UniversalAgent
from .state import STATE
from .process_manager import PROCESS_MANAGER
from .notification_system import NOTIFICATIONS
class ConductorEngine:
    """Plan a user request, run the planned agent tasks, and merge the results.

    The flow implemented by :meth:`orchestrate` is:

    1. ask a "conductor" agent for a JSON execution plan (:meth:`_get_plan`);
    2. run the plan's tasks sequentially or in threads (:meth:`_execute_plan`);
    3. if more than one result came back, ask a "synthesizer" agent to merge
       them (:meth:`_synthesize`).
    """

    # Registry keys used when a requested entry is not registered.
    _FALLBACK_MODEL = "deepseek-v4-pro"
    _FALLBACK_ROLE = "universal"
    _FALLBACK_CONDUCTOR = "default"

    # Overall time budget, in seconds, for waiting on parallel tasks.
    _PARALLEL_TIMEOUT = 120

    # Returned to the user when no executable plan is available.
    _NO_PLAN_MESSAGE = "❌ Conductor не смог создать план."

    # Roles whose tasks always get the interpreter.
    _CODE_ROLES = ("guru", "hacker", "sdet", "qa", "evangelist")

    # Lower-case substrings that mark a prompt as code/test related.
    _CODE_KEYWORDS = (
        "код", "напиши", "создай", "файл", "исполни", "запусти",
        "отладить", "исправить", "проверить", "тест", "проверк",
    )

    def __init__(self, state):
        """Bind the engine to ``state``.

        Args:
            state: Application state object. This class reads its
                ``build_context``, ``conductors``, ``current_conductor``,
                ``roles``, ``current_role``, ``models``, ``current_model``
                attributes and its ``get_best_model`` /
                ``get_model_for_role`` methods.
        """
        self.state = state
        # NOTE(review): stored here but not read by any method of this class;
        # per-task interpreter use is decided by _should_use_interpreter().
        self.use_interpreter = state.build_context.get("use_interpreter", True)
        self.notifications = NOTIFICATIONS

    def orchestrate(self, user_request: str, chat_id: Optional[str] = None, file_context: str = "",
                    build_params: Optional[Dict[str, Any]] = None) -> str:
        """Handle one user request end to end and return the final answer.

        Args:
            user_request: The user's request text.
            chat_id: Forwarded to the task and synthesis agents.
            file_context: Extra text appended to the request for planning.
            build_params: Accepted for interface compatibility; not used here.

        Returns:
            The single task result, the synthesized answer for several
            results, or a "no plan" message when nothing could be executed.
        """
        conductor = self._lookup(
            self.state.conductors, self.state.current_conductor, self._FALLBACK_CONDUCTOR
        )
        plan = self._get_plan(conductor, user_request, file_context, chat_id)
        if not plan:
            return self._NO_PLAN_MESSAGE

        results = self._execute_plan(plan, chat_id)
        if not results:
            # A plan without tasks is as useless as no plan; do not ask the
            # synthesizer to merge zero results.
            return self._NO_PLAN_MESSAGE
        if len(results) == 1:
            return results[0]
        return self._synthesize(conductor, user_request, results, chat_id)

    @staticmethod
    def _lookup(registry, key, fallback_key):
        """Return ``registry[key]``, or ``registry[fallback_key]`` if absent.

        The fallback entry is only read when it is actually needed, so a
        registry that lacks the fallback key still serves keys it does have.

        Raises:
            KeyError: If neither ``key`` nor ``fallback_key`` is registered.
        """
        found = registry.get(key)
        return registry[fallback_key] if found is None else found

    def _fallback_plan(self, full_prompt: str) -> Dict:
        """Build a single-task plan that sends ``full_prompt`` to the current role."""
        role_name = self.state.current_role
        return {
            "strategy": "single",
            "tasks": [{
                "role": role_name,
                "model": self.state.get_model_for_role(role_name),
                "prompt": full_prompt,
            }],
            "synthesis_prompt": "",
        }

    def _get_plan(self, conductor: "Conductor", user_request: str, file_context: str,
                  chat_id: Optional[str] = None) -> Optional[Dict]:
        """Ask the conductor agent for an execution plan.

        The agent's reply is searched for a ``{...}`` span which is parsed as
        JSON. If the reply contains no such span, or the agent call / parsing
        raises, a single-task fallback plan for the current role is returned.

        Args:
            conductor: Conductor configuration (``prompt``, ``auto_rank_by``,
                ``cost_aware`` are read).
            user_request: The user's request text.
            file_context: Extra context appended after the request.
            chat_id: Accepted but not forwarded to the planning agent.

        Returns:
            The parsed plan dict or the fallback plan.
        """
        full_prompt = f"{user_request}\n\n{file_context}".strip()

        # Cost-aware conductors rank by "balanced" where they would have
        # ranked by "coding".
        rank_by = conductor.auto_rank_by
        if conductor.cost_aware and rank_by == "coding":
            rank_by = "balanced"
        model_name = self.state.get_best_model(rank_by=rank_by, max_tier=2)
        model = self._lookup(self.state.models, model_name, self._FALLBACK_MODEL)

        conductor_role = Role(name="conductor", prompt=conductor.prompt, description="Internal conductor role")
        agent = UniversalAgent(conductor_role, model)

        roles_info = "\n".join(
            f"- {k}: {v.description} (complexity: {v.complexity}, preferred: {', '.join(v.preferred_models)})"
            for k, v in self.state.roles.items()
        )
        ranked_models = sorted(self.state.models.items(), key=lambda item: item[1].coding_rank)
        models_info = "\n".join(
            f"- {k}: coding_rank={v.coding_rank}, speed_rank={v.speed_rank}, reasoning_rank={v.reasoning_rank}, cost=${v.cost_per_1k_output}/1k"
            for k, v in ranked_models
            if k != "hf_fallback"
        )
        plan_prompt = f"""User request: {full_prompt}
AVAILABLE ROLES:
{roles_info}
AVAILABLE MODELS (sorted by coding rank):
{models_info}
Current selection criteria: {conductor.auto_rank_by}
Cost-aware: {conductor.cost_aware}
Create execution plan."""

        try:
            plan_text = agent.execute(plan_prompt)
            # Greedy match: from the first "{" to the last "}" of the reply.
            json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
            if json_match:
                return json.loads(json_match.group())
        except Exception as e:
            # Boundary around the agent call and JSON parsing: any failure
            # degrades to the fallback plan instead of aborting the request.
            print(f"Planning error: {e}")
        return self._fallback_plan(full_prompt)

    def _run_task(self, task: Dict, chat_id: Optional[str] = None) -> str:
        """Execute one plan task and return the agent's output.

        Missing ``role`` / ``model`` keys default to the state's current role
        and model; a model name that is not registered is replaced by the
        model the state associates with the role. Exceptions propagate to the
        caller, which turns them into a result string.
        """
        role_name = task.get("role", self.state.current_role)
        model_name = task.get("model", self.state.current_model)
        prompt = task.get("prompt", "")
        if model_name not in self.state.models:
            model_name = self.state.get_model_for_role(role_name)

        role = self._lookup(self.state.roles, role_name, self._FALLBACK_ROLE)
        model = self._lookup(self.state.models, model_name, self._FALLBACK_MODEL)
        use_interp = self._should_use_interpreter(role_name, prompt)
        agent = UniversalAgent(role, model, use_interpreter=use_interp)
        return agent.execute(prompt, chat_id=chat_id)

    def _execute_parallel(self, tasks: List[Dict], chat_id: Optional[str] = None) -> List[str]:
        """Run every task in its own thread and collect results in task order.

        All threads share one deadline of ``_PARALLEL_TIMEOUT`` seconds
        measured from the moment they have all been started; a task with no
        result by then is reported as ``"Timeout"``.
        """
        outcomes: Dict[int, str] = {}

        def worker(idx, task):
            if PROCESS_MANAGER.is_cancelled():
                outcomes[idx] = "Cancelled"
                return
            try:
                outcomes[idx] = self._run_task(task, chat_id)
            except Exception as e:
                outcomes[idx] = f"Task {idx} error: {e}"

        threads = []
        for idx, task in enumerate(tasks):
            t = threading.Thread(target=worker, args=(idx, task))
            threads.append(t)
            PROCESS_MANAGER.register_thread(t)
            t.start()

        # One shared deadline: joining each thread with its own full timeout
        # could otherwise wait up to timeout * len(tasks) in total.
        deadline = time.monotonic() + self._PARALLEL_TIMEOUT
        for t in threads:
            t.join(timeout=max(0.0, deadline - time.monotonic()))

        return [outcomes.get(idx, "Timeout") for idx in range(len(tasks))]

    def _execute_sequential(self, tasks: List[Dict], chat_id: Optional[str] = None) -> List[str]:
        """Run the tasks one after another, one result string per task."""
        results = []
        for task in tasks:
            if PROCESS_MANAGER.is_cancelled():
                results.append("Cancelled")
                continue
            try:
                results.append(self._run_task(task, chat_id))
            except Exception as e:
                results.append(f"Error: {e}")
        return results

    def _execute_plan(self, plan: Dict, chat_id: Optional[str] = None) -> List[str]:
        """Execute the plan's tasks and return one result string per task.

        Tasks run in threads only when the plan's ``strategy`` is
        ``"parallel"`` and there is more than one task; otherwise they run
        sequentially. A missing or null ``tasks`` entry yields an empty list.
        """
        tasks = plan.get("tasks") or []
        strategy = plan.get("strategy", "single")
        if strategy == "parallel" and len(tasks) > 1:
            return self._execute_parallel(tasks, chat_id)
        return self._execute_sequential(tasks, chat_id)

    def _should_use_interpreter(self, role_name: str, task: str) -> bool:
        """Decide whether a task's agent should be created with the interpreter.

        Returns True when ``role_name`` is one of the code roles or when the
        lower-cased task text contains any of the code/test keywords. A
        ``None`` or empty task text matches no keyword.
        """
        if role_name in self._CODE_ROLES:
            return True
        text = (task or "").lower()
        return any(keyword in text for keyword in self._CODE_KEYWORDS)

    def _synthesize(self, conductor: "Conductor", original_request: str, results: List[str],
                    chat_id: Optional[str] = None) -> str:
        """Merge several agent results into one answer via a synthesizer agent.

        If the synthesizer call fails, the raw results are returned joined by
        a ``---`` separator so the caller still gets every agent's output.
        """
        agent_sections = "".join(
            f"\n--- Agent {number} result ---\n{res}\n"
            for number, res in enumerate(results, start=1)
        )
        synthesis_prompt = conductor.prompt + f"""
Synthesize multiple agent results into a single answer.
Original request: {original_request}
Agent results:
""" + agent_sections

        synthesis_role = Role(name="synthesizer", prompt="You synthesize agent results.", description="Synthesizer")
        synth_model_name = self.state.get_best_model(rank_by="reasoning", max_tier=2)
        model = self._lookup(self.state.models, synth_model_name, self._FALLBACK_MODEL)
        agent = UniversalAgent(synthesis_role, model)
        try:
            return agent.execute(synthesis_prompt, chat_id=chat_id)
        except Exception as e:
            # Best effort: report the failure and fall back to the raw results.
            print(f"Synthesis error: {e}")
            return "\n\n---\n".join(map(str, results))