Spaces:
Sleeping
Sleeping
| # AI Workflow Agent - Decision Agent (Core Brain) | |
| """ | |
| Decision Agent using LLM (Gemini 2.5 Flash or Ollama) | |
| Analyzes user queries and decides: | |
| - n8n automation | |
| - ComfyUI generative workflow | |
| - Hybrid (n8n + ComfyUI) | |
| - External repo project | |
| """ | |
| import httpx | |
| import json | |
| import logging | |
| from typing import Dict, Any, Optional, List | |
| from config import settings, ProjectType, CLASSIFICATION_KEYWORDS | |
| logger = logging.getLogger(__name__) | |
class DecisionAgent:
    """
    Core decision-making agent that analyzes user queries
    and determines the appropriate workflow type.
    """

    def __init__(self):
        # Snapshot LLM configuration from application settings at construction time.
        self.llm_provider = settings.LLM_PROVIDER
        self.gemini_api_key = settings.GEMINI_API_KEY
        # Ollama connection details (used when provider is "ollama" or as fallback).
        self.ollama_model = settings.OLLAMA_MODEL
        self.ollama_host = settings.OLLAMA_HOST
        # One shared async HTTP client; generous timeout because LLM backends can be slow.
        self.client = httpx.AsyncClient(timeout=120.0)
| async def check_ollama_health(self) -> str: | |
| """Check if Ollama is running and responsive.""" | |
| try: | |
| response = await self.client.get(f"{self.ollama_host}/api/tags") | |
| if response.status_code == 200: | |
| return "healthy" | |
| return "unhealthy" | |
| except Exception as e: | |
| logger.debug(f"Ollama health check failed: {e}") | |
| return "unreachable" | |
| async def ensure_model_available(self) -> bool: | |
| """Ensure the required model is available in Ollama.""" | |
| try: | |
| # Check if model exists | |
| response = await self.client.get(f"{self.ollama_host}/api/tags") | |
| if response.status_code == 200: | |
| models = response.json().get("models", []) | |
| model_names = [m.get("name", "") for m in models] | |
| if self.model not in model_names and f"{self.model}:latest" not in model_names: | |
| logger.info(f"Pulling model {self.model}...") | |
| # Pull the model | |
| pull_response = await self.client.post( | |
| f"{self.ollama_host}/api/pull", | |
| json={"name": self.model}, | |
| timeout=600.0 # 10 minutes for large models | |
| ) | |
| return pull_response.status_code == 200 | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.debug(f"Model check failed: {e}") | |
| return False | |
| async def analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| """ | |
| Analyze user query and determine project type. | |
| STRATEGY: Respect user's LLM toggle from context | |
| - If context says use_llm=True → Use Gemini (if available) | |
| - If context says use_llm=False → Use keywords only | |
| - If no context → Use Gemini (if available) | |
| Returns: | |
| Dict with project_type, confidence, explanation, suggested_tools, next_steps | |
| """ | |
| # Check if LLM is explicitly disabled in context | |
| use_llm = True # Default to LLM | |
| if context and isinstance(context, dict): | |
| use_llm = context.get('use_llm', True) # Respect the toggle | |
| # PRIMARY: Try LLM if enabled and available | |
| if use_llm and self.gemini_api_key: | |
| llm_result = await self._llm_analyze(query, context) | |
| # If Gemini succeeded, use it | |
| if llm_result["project_type"] != ProjectType.UNKNOWN: | |
| final_result = llm_result | |
| final_result["classification_method"] = "Gemini 2.5 Flash (LLM)" | |
| logger.info(f"Decision (Gemini): {final_result['project_type']} (confidence: {final_result['confidence']:.2f})") | |
| final_result["suggested_tools"] = self._get_suggested_tools(final_result["project_type"]) | |
| final_result["next_steps"] = self._get_next_steps(final_result["project_type"], query) | |
| return final_result | |
| # FALLBACK: Use keywords when LLM disabled or unavailable | |
| keyword_result = self._keyword_classify(query) | |
| keyword_result["classification_method"] = "Keyword Matching (Fallback)" | |
| logger.info(f"Decision (Keywords): {keyword_result['project_type']} (confidence: {keyword_result['confidence']:.2f})") | |
| keyword_result["suggested_tools"] = self._get_suggested_tools(keyword_result["project_type"]) | |
| keyword_result["next_steps"] = self._get_next_steps(keyword_result["project_type"], query) | |
| return keyword_result | |
| def _keyword_classify(self, query: str) -> Dict[str, Any]: | |
| """Fast keyword-based classification (FALLBACK ONLY). | |
| Logic: | |
| - If both COMFYUI + N8N keywords present → HYBRID | |
| - Otherwise pick highest scoring type | |
| """ | |
| query_lower = query.lower() | |
| scores = {} | |
| for project_type, keywords in CLASSIFICATION_KEYWORDS.items(): | |
| score = sum(1 for kw in keywords if kw in query_lower) | |
| scores[project_type] = score | |
| # Check for HYBRID: both image generation AND automation keywords present | |
| comfyui_score = scores.get(ProjectType.COMFYUI, 0) | |
| n8n_score = scores.get(ProjectType.N8N, 0) | |
| # If both types have keywords, it's HYBRID | |
| if comfyui_score >= 1 and n8n_score >= 1: | |
| confidence = min((comfyui_score + n8n_score) / 3, 1.0) | |
| return { | |
| "project_type": ProjectType.HYBRID, | |
| "confidence": confidence, | |
| "explanation": "Detected hybrid automation + AI task." | |
| } | |
| # Otherwise use highest score | |
| if not scores or max(scores.values()) == 0: | |
| return { | |
| "project_type": ProjectType.N8N, | |
| "confidence": 0.3, | |
| "explanation": "Default to n8n automation. Tell me more about what you want to automate." | |
| } | |
| best_type = max(scores, key=scores.get) | |
| max_score = scores[best_type] | |
| confidence = min(max_score / 3, 1.0) | |
| explanations = { | |
| ProjectType.N8N: "Detected automation task.", | |
| ProjectType.COMFYUI: "Detected image generation task.", | |
| ProjectType.HYBRID: "Detected hybrid automation + AI task.", | |
| ProjectType.EXTERNAL_REPO: "Detected GitHub repository request." | |
| } | |
| return { | |
| "project_type": best_type, | |
| "confidence": confidence, | |
| "explanation": explanations.get(best_type, f"Detected {best_type}.") | |
| } | |
| async def _llm_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| """LLM-based analysis using configured provider (Gemini or Ollama).""" | |
| if self.llm_provider == "gemini" and self.gemini_api_key: | |
| return await self._gemini_analyze(query, context) | |
| else: | |
| return await self._ollama_analyze(query, context) | |
    async def _gemini_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze using Google Gemini 2.5 Flash API.
        Use PRINCIPLE-BASED reasoning, not example-based.
        This way the LLM handles edge cases intelligently without constant prompt tweaking.

        Args:
            query: Raw user request to classify.
            context: Currently unused here — reserved for future prompt enrichment.

        Returns:
            Dict with project_type, confidence, explanation. On any failure
            (HTTP error, network error, unparseable response) project_type is
            ProjectType.UNKNOWN with confidence 0.0, which makes the caller
            fall back to keyword classification.
        """
        # Classification prompt; Gemini is instructed to answer with JSON only.
        system_prompt = """You are a project classification assistant. Classify user requests into project types based on PRINCIPLES, not examples.
PRINCIPLES:
- **n8n**: Automation, workflows, data processing. Handles logic/connections/scheduling. (CSV reports are DATA, not images)
- **comfyui**: Visual generation. Creates images/photos/artwork using AI. (Image generation ONLY)
- **hybrid**: Both automation AND image generation in one workflow. (Must involve creating images AND automating something)
- **external_repo**: User wants to download/use existing code from GitHub
KEY DISTINCTION:
- "Report" = data document (n8n)
- "Image/photo" = visual content (comfyui)
- Both = hybrid
Respond ONLY with valid JSON:
{"project_type": "n8n" | "comfyui" | "hybrid" | "external_repo" | "unknown", "confidence": 0.0-1.0, "explanation": "..."}"""
        user_prompt = f'Classify: "{query}"'
        try:
            # Gemini REST API: system + user text sent as a single part; the
            # API key goes in the `key` query parameter.
            response = await self.client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
                headers={"Content-Type": "application/json"},
                json={
                    "contents": [{
                        "parts": [{
                            "text": f"{system_prompt}\n\n{user_prompt}"
                        }]
                    }]
                },
                params={"key": self.gemini_api_key}
            )
            if response.status_code == 200:
                result = response.json()
                try:
                    # Extract text from Gemini response
                    text_content = result["candidates"][0]["content"]["parts"][0]["text"]
                    # Try to extract JSON from the response
                    # Sometimes Gemini wraps JSON in markdown code blocks
                    if "```json" in text_content:
                        json_str = text_content.split("```json")[1].split("```")[0].strip()
                    elif "```" in text_content:
                        json_str = text_content.split("```")[1].split("```")[0].strip()
                    else:
                        json_str = text_content
                    parsed = json.loads(json_str)
                    # NOTE(review): project_type is passed through as the raw
                    # model string — values outside the documented set would
                    # propagate; confirm ProjectType comparison semantics.
                    return {
                        "project_type": parsed.get("project_type", ProjectType.UNKNOWN),
                        "confidence": float(parsed.get("confidence", 0.5)),
                        "explanation": parsed.get("explanation", "Gemini analysis")
                    }
                except (json.JSONDecodeError, KeyError, IndexError) as e:
                    # Malformed JSON or unexpected response shape: degrade to UNKNOWN.
                    logger.warning(f"Failed to parse Gemini response: {e}")
                    return {
                        "project_type": ProjectType.UNKNOWN,
                        "confidence": 0.0,
                        "explanation": "Gemini response parsing failed"
                    }
            else:
                # Non-200 from the API (quota, auth, etc.): degrade to UNKNOWN.
                logger.debug(f"Gemini request failed: {response.status_code}")
                return {
                    "project_type": ProjectType.UNKNOWN,
                    "confidence": 0.0,
                    "explanation": "Gemini request failed"
                }
        except Exception as e:
            # Network/transport failure: degrade to UNKNOWN.
            logger.debug(f"Gemini analysis error: {e}")
            return {
                "project_type": ProjectType.UNKNOWN,
                "confidence": 0.0,
                "explanation": f"Gemini error: {str(e)}"
            }
    async def _ollama_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze using Ollama (Qwen2.5 or other model).
        Use PRINCIPLE-BASED reasoning, not example-based.

        Args:
            query: Raw user request to classify.
            context: Currently unused here — reserved for future prompt enrichment.

        Returns:
            Dict with project_type, confidence, explanation. On any failure
            project_type is ProjectType.UNKNOWN with confidence 0.0, which
            makes the caller fall back to keyword classification.
        """
        # Same classification prompt as the Gemini path.
        system_prompt = """You are a project classification assistant. Classify user requests into project types based on PRINCIPLES, not examples.
PRINCIPLES:
- **n8n**: Automation, workflows, data processing. Handles logic/connections/scheduling. (CSV reports are DATA, not images)
- **comfyui**: Visual generation. Creates images/photos/artwork using AI. (Image generation ONLY)
- **hybrid**: Both automation AND image generation in one workflow. (Must involve creating images AND automating something)
- **external_repo**: User wants to download/use existing code from GitHub
KEY DISTINCTION:
- "Report" = data document (n8n)
- "Image/photo" = visual content (comfyui)
- Both = hybrid
Respond ONLY with valid JSON:
{"project_type": "n8n" | "comfyui" | "hybrid" | "external_repo" | "unknown", "confidence": 0.0-1.0, "explanation": "..."}"""
        user_prompt = f'Classify: "{query}"'
        try:
            # Non-streaming generate call; format="json" asks Ollama to
            # constrain the model output to valid JSON.
            response = await self.client.post(
                f"{self.ollama_host}/api/generate",
                json={
                    "model": self.ollama_model,
                    "prompt": f"{system_prompt}\n\n{user_prompt}",
                    "stream": False,
                    "format": "json"
                }
            )
            if response.status_code == 200:
                result = response.json()
                llm_response = result.get("response", "{}")
                # Parse JSON response
                try:
                    parsed = json.loads(llm_response)
                    return {
                        "project_type": parsed.get("project_type", ProjectType.UNKNOWN),
                        "confidence": float(parsed.get("confidence", 0.5)),
                        "explanation": parsed.get("explanation", "Ollama analysis")
                    }
                except json.JSONDecodeError:
                    # Model emitted invalid JSON despite format="json": degrade to UNKNOWN.
                    logger.warning(f"Failed to parse Ollama response: {llm_response}")
                    return {
                        "project_type": ProjectType.UNKNOWN,
                        "confidence": 0.0,
                        "explanation": "Ollama response parsing failed"
                    }
            else:
                # Non-200 from Ollama: degrade to UNKNOWN.
                logger.debug(f"Ollama request failed: {response.status_code}")
                return {
                    "project_type": ProjectType.UNKNOWN,
                    "confidence": 0.0,
                    "explanation": "Ollama request failed"
                }
        except Exception as e:
            # Network/transport failure: degrade to UNKNOWN.
            logger.debug(f"Ollama analysis error: {e}")
            return {
                "project_type": ProjectType.UNKNOWN,
                "confidence": 0.0,
                "explanation": f"Ollama error: {str(e)}"
            }
| def _get_suggested_tools(self, project_type: str) -> List[str]: | |
| """Get suggested tools based on project type.""" | |
| tools_map = { | |
| ProjectType.N8N: [ | |
| "n8n_builder - Generate workflow JSON", | |
| "n8n_deploy - Deploy to n8n instance", | |
| "webhook_trigger - Setup webhook triggers" | |
| ], | |
| ProjectType.COMFYUI: [ | |
| "comfyui_builder - Generate workflow graph", | |
| "comfyui_execute - Run workflow", | |
| "model_download - Download required models" | |
| ], | |
| ProjectType.HYBRID: [ | |
| "n8n_builder - Generate automation workflow", | |
| "comfyui_builder - Generate AI workflow", | |
| "api_connector - Connect n8n to ComfyUI" | |
| ], | |
| ProjectType.EXTERNAL_REPO: [ | |
| "github_search - Find relevant repositories", | |
| "docker_helper - Clone and build", | |
| "error_analyzer - Fix build issues" | |
| ] | |
| } | |
| return tools_map.get(project_type, ["unknown_tool"]) | |
| def _get_next_steps(self, project_type: str, query: str) -> List[str]: | |
| """Get recommended next steps.""" | |
| steps_map = { | |
| ProjectType.N8N: [ | |
| "1. Generate workflow JSON template", | |
| "2. Customize nodes and connections", | |
| "3. Deploy to n8n instance", | |
| "4. Test with sample data" | |
| ], | |
| ProjectType.COMFYUI: [ | |
| "1. Generate ComfyUI workflow graph", | |
| "2. Check required models are installed", | |
| "3. Execute workflow", | |
| "4. Review generated output" | |
| ], | |
| ProjectType.HYBRID: [ | |
| "1. Create ComfyUI workflow for AI task", | |
| "2. Create n8n workflow for automation", | |
| "3. Connect n8n → ComfyUI via HTTP", | |
| "4. Test end-to-end pipeline" | |
| ], | |
| ProjectType.EXTERNAL_REPO: [ | |
| "1. Search GitHub for relevant projects", | |
| "2. Select best matching repository", | |
| "3. Clone and configure with Docker", | |
| "4. Validate and fix any errors" | |
| ] | |
| } | |
| return steps_map.get(project_type, ["Please provide more details"]) | |
| # ============================================ | |
| # CrewAI Integration (Advanced Mode) | |
| # ============================================ | |
class CrewAIDecisionAgent:
    """
    Advanced decision agent using CrewAI framework.
    Used for more complex multi-step reasoning.
    """

    def __init__(self):
        # CrewAI setup would go here.
        # Phase 0 delegates everything to the simple agent; CrewAI lands in Milestone 1.
        self.simple_agent = DecisionAgent()

    async def analyze_complex(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Complex analysis using CrewAI agents.
        Reserved for Milestone 1 implementation.

        For now this simply delegates to the simple DecisionAgent.
        """
        return await self.simple_agent.analyze(query, context)