# AI Workflow Agent - Decision Agent (Core Brain)
"""Decision Agent using LLM (Gemini 2.5 Flash or Ollama).

Analyzes user queries and decides between:
- n8n automation
- ComfyUI generative workflow
- Hybrid (n8n + ComfyUI)
- External repo project
"""
import httpx
import json
import logging
from typing import Dict, Any, Optional, List

from config import settings, ProjectType, CLASSIFICATION_KEYWORDS

logger = logging.getLogger(__name__)

# Shared principle-based system prompt. Both providers (Gemini and Ollama)
# use the SAME prompt so their classification behavior cannot drift apart.
_CLASSIFICATION_SYSTEM_PROMPT = """You are a project classification assistant.
Classify user requests into project types based on PRINCIPLES, not examples.

PRINCIPLES:
- **n8n**: Automation, workflows, data processing. Handles logic/connections/scheduling. (CSV reports are DATA, not images)
- **comfyui**: Visual generation. Creates images/photos/artwork using AI. (Image generation ONLY)
- **hybrid**: Both automation AND image generation in one workflow. (Must involve creating images AND automating something)
- **external_repo**: User wants to download/use existing code from GitHub

KEY DISTINCTION:
- "Report" = data document (n8n)
- "Image/photo" = visual content (comfyui)
- Both = hybrid

Respond ONLY with valid JSON:
{"project_type": "n8n" | "comfyui" | "hybrid" | "external_repo" | "unknown", "confidence": 0.0-1.0, "explanation": "..."}"""


class DecisionAgent:
    """Core decision-making agent.

    Analyzes user queries and determines the appropriate workflow type
    (n8n / comfyui / hybrid / external_repo), using the configured LLM
    provider when enabled and keyword matching as a fallback.
    """

    def __init__(self):
        self.llm_provider = settings.LLM_PROVIDER
        self.gemini_api_key = settings.GEMINI_API_KEY
        self.ollama_host = settings.OLLAMA_HOST
        self.ollama_model = settings.OLLAMA_MODEL
        # Generous timeout: LLM generations can legitimately take a while.
        self.client = httpx.AsyncClient(timeout=120.0)

    async def aclose(self) -> None:
        """Close the underlying HTTP client. Call once on shutdown.

        Added so the AsyncClient created in __init__ is not leaked.
        """
        await self.client.aclose()

    async def check_ollama_health(self) -> str:
        """Check if Ollama is running and responsive.

        Returns:
            "healthy" on HTTP 200, "unhealthy" on any other status,
            "unreachable" when the request itself fails.
        """
        try:
            response = await self.client.get(f"{self.ollama_host}/api/tags")
            if response.status_code == 200:
                return "healthy"
            return "unhealthy"
        except Exception as e:
            logger.debug(f"Ollama health check failed: {e}")
            return "unreachable"

    async def ensure_model_available(self) -> bool:
        """Ensure the required model is available in Ollama, pulling it if missing.

        Returns:
            True when the model is present (or was pulled successfully),
            False on any failure.
        """
        try:
            # Check if model exists
            response = await self.client.get(f"{self.ollama_host}/api/tags")
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [m.get("name", "") for m in models]
                # BUG FIX: the original referenced the non-existent
                # ``self.model``; the attribute set in __init__ is
                # ``self.ollama_model`` (was an AttributeError at runtime).
                if (self.ollama_model not in model_names
                        and f"{self.ollama_model}:latest" not in model_names):
                    logger.info(f"Pulling model {self.ollama_model}...")
                    # Pull the model
                    pull_response = await self.client.post(
                        f"{self.ollama_host}/api/pull",
                        json={"name": self.ollama_model},
                        timeout=600.0  # 10 minutes for large models
                    )
                    return pull_response.status_code == 200
                return True
            return False
        except Exception as e:
            logger.debug(f"Model check failed: {e}")
            return False

    async def analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze user query and determine project type.

        STRATEGY: Respect user's LLM toggle from context
        - context use_llm=True  -> use the configured LLM (if available)
        - context use_llm=False -> use keywords only
        - no context            -> use the configured LLM (if available)

        Returns:
            Dict with project_type, confidence, explanation,
            classification_method, suggested_tools, next_steps.
        """
        # Check if LLM is explicitly disabled in context
        use_llm = True  # Default to LLM
        if context and isinstance(context, dict):
            use_llm = context.get('use_llm', True)  # Respect the toggle

        # BUG FIX: the original gated the LLM path on the Gemini API key
        # alone, which made the Ollama provider unreachable from analyze()
        # and mislabeled Ollama results as Gemini. Ollama needs no API key.
        gemini_ready = self.llm_provider == "gemini" and self.gemini_api_key
        llm_ready = gemini_ready or self.llm_provider == "ollama"

        # PRIMARY: Try LLM if enabled and available
        if use_llm and llm_ready:
            llm_result = await self._llm_analyze(query, context)
            # If the LLM succeeded, use its classification
            if llm_result["project_type"] != ProjectType.UNKNOWN:
                final_result = llm_result
                if gemini_ready:
                    final_result["classification_method"] = "Gemini 2.5 Flash (LLM)"
                    provider_label = "Gemini"
                else:
                    final_result["classification_method"] = f"Ollama {self.ollama_model} (LLM)"
                    provider_label = "Ollama"
                logger.info(f"Decision ({provider_label}): {final_result['project_type']} (confidence: {final_result['confidence']:.2f})")
                final_result["suggested_tools"] = self._get_suggested_tools(final_result["project_type"])
                final_result["next_steps"] = self._get_next_steps(final_result["project_type"], query)
                return final_result

        # FALLBACK: Use keywords when LLM disabled or unavailable
        keyword_result = self._keyword_classify(query)
        keyword_result["classification_method"] = "Keyword Matching (Fallback)"
        logger.info(f"Decision (Keywords): {keyword_result['project_type']} (confidence: {keyword_result['confidence']:.2f})")
        keyword_result["suggested_tools"] = self._get_suggested_tools(keyword_result["project_type"])
        keyword_result["next_steps"] = self._get_next_steps(keyword_result["project_type"], query)
        return keyword_result

    def _keyword_classify(self, query: str) -> Dict[str, Any]:
        """Fast keyword-based classification (FALLBACK ONLY).

        Logic:
        - If both COMFYUI + N8N keywords present -> HYBRID
        - Otherwise pick highest scoring type
        - No keyword hits at all -> default to N8N with low confidence
        """
        query_lower = query.lower()
        scores = {}
        for project_type, keywords in CLASSIFICATION_KEYWORDS.items():
            score = sum(1 for kw in keywords if kw in query_lower)
            scores[project_type] = score

        # Check for HYBRID: both image generation AND automation keywords present
        comfyui_score = scores.get(ProjectType.COMFYUI, 0)
        n8n_score = scores.get(ProjectType.N8N, 0)

        # If both types have keywords, it's HYBRID
        if comfyui_score >= 1 and n8n_score >= 1:
            confidence = min((comfyui_score + n8n_score) / 3, 1.0)
            return {
                "project_type": ProjectType.HYBRID,
                "confidence": confidence,
                "explanation": "Detected hybrid automation + AI task."
            }

        # Otherwise use highest score
        if not scores or max(scores.values()) == 0:
            return {
                "project_type": ProjectType.N8N,
                "confidence": 0.3,
                "explanation": "Default to n8n automation. Tell me more about what you want to automate."
            }

        best_type = max(scores, key=scores.get)
        max_score = scores[best_type]
        # 3+ keyword hits saturate confidence at 1.0
        confidence = min(max_score / 3, 1.0)

        explanations = {
            ProjectType.N8N: "Detected automation task.",
            ProjectType.COMFYUI: "Detected image generation task.",
            ProjectType.HYBRID: "Detected hybrid automation + AI task.",
            ProjectType.EXTERNAL_REPO: "Detected GitHub repository request."
        }

        return {
            "project_type": best_type,
            "confidence": confidence,
            "explanation": explanations.get(best_type, f"Detected {best_type}.")
        }

    async def _llm_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """LLM-based analysis using configured provider (Gemini or Ollama)."""
        if self.llm_provider == "gemini" and self.gemini_api_key:
            return await self._gemini_analyze(query, context)
        else:
            return await self._ollama_analyze(query, context)

    async def _gemini_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze using Google Gemini 2.5 Flash API.

        Uses the shared PRINCIPLE-BASED prompt so the LLM handles edge
        cases intelligently without constant prompt tweaking. Any failure
        (HTTP error, unparsable response) degrades to ProjectType.UNKNOWN
        so the caller can fall back to keywords.
        """
        user_prompt = f'Classify: "{query}"'

        try:
            response = await self.client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
                headers={"Content-Type": "application/json"},
                json={
                    "contents": [{
                        "parts": [{
                            "text": f"{_CLASSIFICATION_SYSTEM_PROMPT}\n\n{user_prompt}"
                        }]
                    }]
                },
                params={"key": self.gemini_api_key}
            )

            if response.status_code == 200:
                result = response.json()
                try:
                    # Extract text from Gemini response
                    text_content = result["candidates"][0]["content"]["parts"][0]["text"]

                    # Try to extract JSON from the response.
                    # Sometimes Gemini wraps JSON in markdown code blocks.
                    if "```json" in text_content:
                        json_str = text_content.split("```json")[1].split("```")[0].strip()
                    elif "```" in text_content:
                        json_str = text_content.split("```")[1].split("```")[0].strip()
                    else:
                        json_str = text_content

                    parsed = json.loads(json_str)
                    return {
                        "project_type": parsed.get("project_type", ProjectType.UNKNOWN),
                        "confidence": float(parsed.get("confidence", 0.5)),
                        "explanation": parsed.get("explanation", "Gemini analysis")
                    }
                except (json.JSONDecodeError, KeyError, IndexError) as e:
                    logger.warning(f"Failed to parse Gemini response: {e}")
                    return {
                        "project_type": ProjectType.UNKNOWN,
                        "confidence": 0.0,
                        "explanation": "Gemini response parsing failed"
                    }
            else:
                logger.debug(f"Gemini request failed: {response.status_code}")
                return {
                    "project_type": ProjectType.UNKNOWN,
                    "confidence": 0.0,
                    "explanation": "Gemini request failed"
                }
        except Exception as e:
            logger.debug(f"Gemini analysis error: {e}")
            return {
                "project_type": ProjectType.UNKNOWN,
                "confidence": 0.0,
                "explanation": f"Gemini error: {str(e)}"
            }

    async def _ollama_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze using Ollama (Qwen2.5 or other model).

        Uses the same shared PRINCIPLE-BASED prompt as the Gemini path.
        Any failure degrades to ProjectType.UNKNOWN so the caller can
        fall back to keywords.
        """
        user_prompt = f'Classify: "{query}"'

        try:
            response = await self.client.post(
                f"{self.ollama_host}/api/generate",
                json={
                    "model": self.ollama_model,
                    "prompt": f"{_CLASSIFICATION_SYSTEM_PROMPT}\n\n{user_prompt}",
                    "stream": False,
                    "format": "json"  # ask Ollama to constrain output to JSON
                }
            )

            if response.status_code == 200:
                result = response.json()
                llm_response = result.get("response", "{}")

                # Parse JSON response
                try:
                    parsed = json.loads(llm_response)
                    return {
                        "project_type": parsed.get("project_type", ProjectType.UNKNOWN),
                        "confidence": float(parsed.get("confidence", 0.5)),
                        "explanation": parsed.get("explanation", "Ollama analysis")
                    }
                except json.JSONDecodeError:
                    logger.warning(f"Failed to parse Ollama response: {llm_response}")
                    return {
                        "project_type": ProjectType.UNKNOWN,
                        "confidence": 0.0,
                        "explanation": "Ollama response parsing failed"
                    }
            else:
                logger.debug(f"Ollama request failed: {response.status_code}")
                return {
                    "project_type": ProjectType.UNKNOWN,
                    "confidence": 0.0,
                    "explanation": "Ollama request failed"
                }
        except Exception as e:
            logger.debug(f"Ollama analysis error: {e}")
            return {
                "project_type": ProjectType.UNKNOWN,
                "confidence": 0.0,
                "explanation": f"Ollama error: {str(e)}"
            }

    def _get_suggested_tools(self, project_type: str) -> List[str]:
        """Get suggested tools based on project type."""
        tools_map = {
            ProjectType.N8N: [
                "n8n_builder - Generate workflow JSON",
                "n8n_deploy - Deploy to n8n instance",
                "webhook_trigger - Setup webhook triggers"
            ],
            ProjectType.COMFYUI: [
                "comfyui_builder - Generate workflow graph",
                "comfyui_execute - Run workflow",
                "model_download - Download required models"
            ],
            ProjectType.HYBRID: [
                "n8n_builder - Generate automation workflow",
                "comfyui_builder - Generate AI workflow",
                "api_connector - Connect n8n to ComfyUI"
            ],
            ProjectType.EXTERNAL_REPO: [
                "github_search - Find relevant repositories",
                "docker_helper - Clone and build",
                "error_analyzer - Fix build issues"
            ]
        }
        return tools_map.get(project_type, ["unknown_tool"])

    def _get_next_steps(self, project_type: str, query: str) -> List[str]:
        """Get recommended next steps for the classified project type."""
        steps_map = {
            ProjectType.N8N: [
                "1. Generate workflow JSON template",
                "2. Customize nodes and connections",
                "3. Deploy to n8n instance",
                "4. Test with sample data"
            ],
            ProjectType.COMFYUI: [
                "1. Generate ComfyUI workflow graph",
                "2. Check required models are installed",
                "3. Execute workflow",
                "4. Review generated output"
            ],
            ProjectType.HYBRID: [
                "1. Create ComfyUI workflow for AI task",
                "2. Create n8n workflow for automation",
                "3. Connect n8n → ComfyUI via HTTP",
                "4. Test end-to-end pipeline"
            ],
            ProjectType.EXTERNAL_REPO: [
                "1. Search GitHub for relevant projects",
                "2. Select best matching repository",
                "3. Clone and configure with Docker",
                "4. Validate and fix any errors"
            ]
        }
        return steps_map.get(project_type, ["Please provide more details"])


# ============================================
# CrewAI Integration (Advanced Mode)
# ============================================

class CrewAIDecisionAgent:
    """
    Advanced decision agent using CrewAI framework.
    Used for more complex multi-step reasoning.
    """

    def __init__(self):
        self.simple_agent = DecisionAgent()
        # CrewAI setup would go here
        # We use simple agent for Phase 0, upgrade to CrewAI in Milestone 1

    async def analyze_complex(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Complex analysis using CrewAI agents.
        Reserved for Milestone 1 implementation.
        """
        # For now, delegate to simple agent
        return await self.simple_agent.analyze(query, context)