"""Conductor Engine: orchestration of Build mode."""

import json
import re
import threading
from typing import Any, Dict, List, Optional

from .models import Role, Conductor
from .universal_agent import UniversalAgent
from .state import STATE
from .process_manager import PROCESS_MANAGER
from .notification_system import NOTIFICATIONS

# Sentinel used to tell "key absent" apart from a stored falsy value.
_MISSING = object()


class ConductorEngine:
    """Plan a user request with a conductor agent, run the plan, merge results.

    The engine asks a "conductor" agent for a JSON execution plan, executes
    the plan's tasks (sequentially or in parallel threads) with
    ``UniversalAgent`` instances, and, when there is more than one result,
    asks a synthesizer agent to merge them into a single answer.
    """

    # Keys used when a requested conductor / role / model is not registered.
    _FALLBACK_CONDUCTOR = "default"
    _FALLBACK_ROLE = "universal"
    _FALLBACK_MODEL = "deepseek-v4-pro"

    # Seconds to wait in ``join`` for each parallel worker thread.
    _TASK_JOIN_TIMEOUT = 120

    # Roles for which the interpreter is always enabled.
    _CODE_ROLES = ("guru", "hacker", "sdet", "qa", "evangelist")

    # Substrings (matched against the lower-cased task text) that enable the
    # interpreter. These are runtime data and must stay in the original
    # language of the prompts they are matched against.
    _CODE_MARKERS = (
        "код", "напиши", "создай", "файл", "исполни", "запусти",
        "отладить", "исправить", "проверить", "тест", "проверк",
    )

    def __init__(self, state):
        """Store the shared state object and read engine settings from it.

        Args:
            state: Application state. This class reads ``build_context``,
                ``conductors``, ``current_conductor``, ``roles``, ``models``,
                ``current_role`` and ``current_model`` from it and calls its
                ``get_best_model`` / ``get_model_for_role`` methods.
        """
        self.state = state
        self.use_interpreter = state.build_context.get("use_interpreter", True)
        self.notifications = NOTIFICATIONS

    @staticmethod
    def _lookup(table, key, fallback_key):
        """Return ``table[key]``, or ``table[fallback_key]`` if key is absent.

        The fallback entry is only accessed on a miss, so a registry that
        lacks the fallback key still serves every key it does contain.

        Raises:
            KeyError: If neither ``key`` nor ``fallback_key`` is present.
        """
        found = table.get(key, _MISSING)
        if found is _MISSING:
            return table[fallback_key]
        return found

    def orchestrate(self, user_request: str, chat_id: Optional[str] = None,
                    file_context: str = "",
                    build_params: Optional[Dict[str, Any]] = None) -> str:
        """Plan, execute and (if needed) synthesize an answer for a request.

        Args:
            user_request: The text of the user's request.
            chat_id: Optional chat identifier forwarded to task and
                synthesis agents.
            file_context: Extra text appended to the request when planning.
            build_params: Accepted for interface compatibility; not used here.

        Returns:
            The single task result, the synthesized answer for several
            results, or an error message when no usable plan was produced.
        """
        conductor = self._lookup(
            self.state.conductors,
            self.state.current_conductor,
            self._FALLBACK_CONDUCTOR,
        )
        plan = self._get_plan(conductor, user_request, file_context, chat_id)
        if not plan:
            return "❌ Conductor не смог создать план."

        results = self._execute_plan(plan, chat_id)
        if not results:
            # A plan without tasks is as unusable as no plan at all; do not
            # ask the synthesizer to merge an empty set of results.
            return "❌ Conductor не смог создать план."
        if len(results) == 1:
            return results[0]
        return self._synthesize(conductor, user_request, results, chat_id)

    def _fallback_plan(self, full_prompt: str) -> Dict:
        """Build a one-task plan that runs the prompt with the current role."""
        current_role = self.state.current_role
        return {
            "strategy": "single",
            "tasks": [{
                "role": current_role,
                "model": self.state.get_model_for_role(current_role),
                "prompt": full_prompt,
            }],
            "synthesis_prompt": "",
        }

    def _get_plan(self, conductor: Conductor, user_request: str,
                  file_context: str,
                  chat_id: Optional[str] = None) -> Optional[Dict]:
        """Ask the conductor agent for a JSON execution plan.

        The agent's reply is searched for the outermost ``{...}`` span, which
        is parsed as JSON. If the reply contains no such span, or the agent
        call / JSON parsing raises, a single-task fallback plan is returned.

        Args:
            conductor: Conductor configuration (``prompt``, ``auto_rank_by``
                and ``cost_aware`` are read).
            user_request: The text of the user's request.
            file_context: Extra text appended to the request.
            chat_id: Accepted for interface compatibility; not used here.

        Returns:
            The parsed plan, or the fallback plan.
        """
        full_prompt = f"{user_request}\n\n{file_context}".strip()

        # A cost-aware conductor ranks by "balanced" instead of "coding".
        rank_by = conductor.auto_rank_by
        if conductor.cost_aware and rank_by == "coding":
            rank_by = "balanced"

        model_name = self.state.get_best_model(rank_by=rank_by, max_tier=2)
        model = self._lookup(
            self.state.models, model_name, self._FALLBACK_MODEL
        )
        conductor_role = Role(
            name="conductor",
            prompt=conductor.prompt,
            description="Internal conductor role",
        )
        agent = UniversalAgent(conductor_role, model)

        roles_info = "\n".join(
            f"- {k}: {v.description} (complexity: {v.complexity}, "
            f"preferred: {', '.join(v.preferred_models)})"
            for k, v in self.state.roles.items()
        )
        models_by_coding_rank = sorted(
            self.state.models.items(), key=lambda item: item[1].coding_rank
        )
        models_info = "\n".join(
            f"- {k}: coding_rank={v.coding_rank}, "
            f"speed_rank={v.speed_rank}, "
            f"reasoning_rank={v.reasoning_rank}, "
            f"cost=${v.cost_per_1k_output}/1k"
            for k, v in models_by_coding_rank
            if k != "hf_fallback"
        )

        plan_prompt = (
            f"User request: {full_prompt}\n\n"
            f"AVAILABLE ROLES:\n{roles_info}\n\n"
            f"AVAILABLE MODELS (sorted by coding rank):\n{models_info}\n\n"
            f"Current selection criteria: {conductor.auto_rank_by}\n"
            f"Cost-aware: {conductor.cost_aware}\n\n"
            "Create execution plan."
        )

        try:
            plan_text = agent.execute(plan_prompt)
            # Greedy match with DOTALL: first "{" through the last "}".
            json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
            if json_match:
                return json.loads(json_match.group())
        except Exception as e:
            # Best effort: any planning failure degrades to the fallback plan.
            print(f"Planning error: {e}")
        return self._fallback_plan(full_prompt)

    def _run_task(self, task: Dict, chat_id: Optional[str],
                  error_label: str) -> str:
        """Run one plan task and return its result or an error string.

        Role/model resolution and the agent call all happen inside one
        ``try`` so that a malformed task is reported in its own result
        instead of aborting the whole plan (or killing a worker thread).

        Args:
            task: Task mapping; ``role``, ``model`` and ``prompt`` are read.
            chat_id: Optional chat identifier forwarded to the agent.
            error_label: Prefix placed before ``": <exception>"`` on failure.

        Returns:
            The agent's result, ``"Cancelled"`` if the process manager
            reports cancellation, or ``"<error_label>: <exception>"``.
        """
        if PROCESS_MANAGER.is_cancelled():
            return "Cancelled"
        try:
            role_name = task.get("role", self.state.current_role)
            model_name = task.get("model", self.state.current_model)
            prompt = task.get("prompt", "")

            # Unknown model names are replaced by the role's own model.
            if model_name not in self.state.models:
                model_name = self.state.get_model_for_role(role_name)

            role = self._lookup(
                self.state.roles, role_name, self._FALLBACK_ROLE
            )
            model = self._lookup(
                self.state.models, model_name, self._FALLBACK_MODEL
            )
            use_interp = self._should_use_interpreter(role_name, prompt)
            agent = UniversalAgent(role, model, use_interpreter=use_interp)
            return agent.execute(prompt, chat_id=chat_id)
        except Exception as e:
            return f"{error_label}: {e}"

    def _run_parallel(self, tasks: List[Dict],
                      chat_id: Optional[str]) -> List[str]:
        """Run every task in its own thread and collect results in order.

        Each thread is registered with ``PROCESS_MANAGER`` before it starts.
        Threads are joined one after another with a per-join timeout; a task
        whose result is not available after the joins yields ``"Timeout"``.
        """
        outcomes: Dict[int, str] = {}

        def worker(idx, task):
            outcomes[idx] = self._run_task(
                task, chat_id, f"Task {idx} error"
            )

        threads = []
        for idx, task in enumerate(tasks):
            t = threading.Thread(target=worker, args=(idx, task))
            threads.append(t)
            PROCESS_MANAGER.register_thread(t)
            t.start()

        for t in threads:
            t.join(timeout=self._TASK_JOIN_TIMEOUT)

        return [outcomes.get(i, "Timeout") for i in range(len(tasks))]

    def _execute_plan(self, plan: Dict,
                      chat_id: Optional[str] = None) -> List[str]:
        """Execute the plan's tasks and return one result string per task.

        Tasks run in parallel threads only when the plan's ``strategy`` is
        ``"parallel"`` and it has more than one task; otherwise they run
        sequentially in plan order.

        Args:
            plan: Plan mapping; ``tasks`` and ``strategy`` are read.
            chat_id: Optional chat identifier forwarded to the agents.

        Returns:
            Results in task order. Failed tasks contribute an error string,
            cancelled ones ``"Cancelled"``.
        """
        tasks = plan.get("tasks", [])
        strategy = plan.get("strategy", "single")

        if strategy == "parallel" and len(tasks) > 1:
            return self._run_parallel(tasks, chat_id)
        return [self._run_task(task, chat_id, "Error") for task in tasks]

    def _should_use_interpreter(self, role_name: str, task: str) -> bool:
        """Decide whether a task should run with the interpreter enabled.

        Returns ``True`` when the role is one of the code-oriented roles or
        when the lower-cased task text contains any of the code markers.
        """
        if role_name in self._CODE_ROLES:
            return True
        text = task.lower()
        return any(marker in text for marker in self._CODE_MARKERS)

    def _synthesize(self, conductor: Conductor, original_request: str,
                    results: List[str],
                    chat_id: Optional[str] = None) -> str:
        """Merge several agent results into one answer via a synthesizer.

        Args:
            conductor: Conductor whose ``prompt`` prefixes the synthesis
                prompt.
            original_request: The user's original request text.
            results: Result strings produced by the plan's tasks.
            chat_id: Optional chat identifier forwarded to the agent.

        Returns:
            The synthesizer's answer, or, if the agent call raises, the raw
            results joined with a ``---`` separator.
        """
        header = conductor.prompt + (
            "\n\nSynthesize multiple agent results into a single answer.\n\n"
            f"Original request: {original_request}\n\n"
            "Agent results:\n"
        )
        sections = "".join(
            f"\n--- Agent {i + 1} result ---\n{res}\n"
            for i, res in enumerate(results)
        )
        synthesis_prompt = header + sections

        synthesis_role = Role(
            name="synthesizer",
            prompt="You synthesize agent results.",
            description="Synthesizer",
        )
        synth_model_name = self.state.get_best_model(
            rank_by="reasoning", max_tier=2
        )
        model = self._lookup(
            self.state.models, synth_model_name, self._FALLBACK_MODEL
        )
        agent = UniversalAgent(synthesis_role, model)

        try:
            return agent.execute(synthesis_prompt, chat_id=chat_id)
        except Exception as e:
            # Best effort: surface the failure, then hand back the raw
            # results rather than losing the work already done.
            print(f"Synthesis error: {e}")
            return "\n\n---\n".join(results)