# ai-workflow-agent / decision_agent.py
# (Hugging Face upload metadata: uploaded by Hamza4100, "Upload 22 files", commit 9c95c55 verified)
# AI Workflow Agent - Decision Agent (Core Brain)
"""
Decision Agent using LLM (Gemini 2.5 Flash or Ollama)
Analyzes user queries and decides:
- n8n automation
- ComfyUI generative workflow
- Hybrid (n8n + ComfyUI)
- External repo project
"""
import httpx
import json
import logging
from typing import Dict, Any, Optional, List
from config import settings, ProjectType, CLASSIFICATION_KEYWORDS
logger = logging.getLogger(__name__)
class DecisionAgent:
    """
    Core decision-making agent that analyzes user queries
    and determines the appropriate workflow type.

    Classification strategy:
      1. If the caller allows it (``use_llm`` in context) and an LLM is
         configured (Gemini API key, or provider set to Ollama), ask the
         LLM to classify the query.
      2. Otherwise — or whenever the LLM call fails — fall back to
         keyword matching.
    """

    def __init__(self):
        # Provider selection and credentials come from application settings.
        self.llm_provider = settings.LLM_PROVIDER
        self.gemini_api_key = settings.GEMINI_API_KEY
        self.ollama_host = settings.OLLAMA_HOST
        self.ollama_model = settings.OLLAMA_MODEL
        # Shared async HTTP client; generous timeout because LLM calls are slow.
        self.client = httpx.AsyncClient(timeout=120.0)

    async def check_ollama_health(self) -> str:
        """Check if Ollama is running and responsive.

        Returns:
            "healthy" if GET /api/tags answered 200, "unhealthy" for any
            other HTTP status, "unreachable" if the request itself failed.
        """
        try:
            response = await self.client.get(f"{self.ollama_host}/api/tags")
            if response.status_code == 200:
                return "healthy"
            return "unhealthy"
        except Exception as e:
            logger.debug(f"Ollama health check failed: {e}")
            return "unreachable"

    async def ensure_model_available(self) -> bool:
        """Ensure the required model is available in Ollama, pulling it if needed.

        Returns:
            True if the model is already present (or was pulled successfully),
            False on any failure.
        """
        try:
            # Check if model exists
            response = await self.client.get(f"{self.ollama_host}/api/tags")
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [m.get("name", "") for m in models]
                # BUGFIX: this method previously read ``self.model``, which is
                # never defined (``__init__`` sets ``self.ollama_model``) and
                # raised AttributeError on every call.
                if (self.ollama_model not in model_names
                        and f"{self.ollama_model}:latest" not in model_names):
                    logger.info(f"Pulling model {self.ollama_model}...")
                    # Pull the model
                    pull_response = await self.client.post(
                        f"{self.ollama_host}/api/pull",
                        json={"name": self.ollama_model},
                        timeout=600.0  # 10 minutes for large models
                    )
                    return pull_response.status_code == 200
                return True
            return False
        except Exception as e:
            logger.debug(f"Model check failed: {e}")
            return False

    async def analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Analyze user query and determine project type.

        STRATEGY: Respect user's LLM toggle from context
        - If context says use_llm=True -> Use the configured LLM (if available)
        - If context says use_llm=False -> Use keywords only
        - If no context -> Use the configured LLM (if available)

        Returns:
            Dict with project_type, confidence, explanation,
            classification_method, suggested_tools, next_steps.
        """
        # Check if LLM is explicitly disabled in context
        use_llm = True  # Default to LLM
        if context and isinstance(context, dict):
            use_llm = context.get('use_llm', True)  # Respect the toggle

        # PRIMARY: Try LLM if enabled and available.
        # BUGFIX: previously gated only on the Gemini key, which made the
        # Ollama provider unreachable even when explicitly configured.
        llm_available = bool(self.gemini_api_key) or self.llm_provider == "ollama"
        if use_llm and llm_available:
            llm_result = await self._llm_analyze(query, context)
            # If the LLM succeeded, use it
            if llm_result["project_type"] != ProjectType.UNKNOWN:
                final_result = llm_result
                # Label the result by the provider that actually ran
                # (previously always labelled "Gemini", even for Ollama).
                if self.llm_provider == "gemini" and self.gemini_api_key:
                    final_result["classification_method"] = "Gemini 2.5 Flash (LLM)"
                else:
                    final_result["classification_method"] = f"Ollama {self.ollama_model} (LLM)"
                logger.info(f"Decision (LLM): {final_result['project_type']} (confidence: {final_result['confidence']:.2f})")
                final_result["suggested_tools"] = self._get_suggested_tools(final_result["project_type"])
                final_result["next_steps"] = self._get_next_steps(final_result["project_type"], query)
                return final_result

        # FALLBACK: Use keywords when LLM disabled or unavailable
        keyword_result = self._keyword_classify(query)
        keyword_result["classification_method"] = "Keyword Matching (Fallback)"
        logger.info(f"Decision (Keywords): {keyword_result['project_type']} (confidence: {keyword_result['confidence']:.2f})")
        keyword_result["suggested_tools"] = self._get_suggested_tools(keyword_result["project_type"])
        keyword_result["next_steps"] = self._get_next_steps(keyword_result["project_type"], query)
        return keyword_result

    def _keyword_classify(self, query: str) -> Dict[str, Any]:
        """Fast keyword-based classification (FALLBACK ONLY).

        Logic:
        - If both COMFYUI + N8N keywords present -> HYBRID
        - Otherwise pick highest scoring type
        - No keyword hits at all -> default to N8N with low confidence
        """
        query_lower = query.lower()
        # Score each project type by how many of its keywords appear.
        scores = {}
        for project_type, keywords in CLASSIFICATION_KEYWORDS.items():
            score = sum(1 for kw in keywords if kw in query_lower)
            scores[project_type] = score

        # Check for HYBRID: both image generation AND automation keywords present
        comfyui_score = scores.get(ProjectType.COMFYUI, 0)
        n8n_score = scores.get(ProjectType.N8N, 0)
        # If both types have keywords, it's HYBRID
        if comfyui_score >= 1 and n8n_score >= 1:
            # Confidence scales with hit count, capped at 1.0 (3+ hits = certain).
            confidence = min((comfyui_score + n8n_score) / 3, 1.0)
            return {
                "project_type": ProjectType.HYBRID,
                "confidence": confidence,
                "explanation": "Detected hybrid automation + AI task."
            }

        # Otherwise use highest score
        if not scores or max(scores.values()) == 0:
            return {
                "project_type": ProjectType.N8N,
                "confidence": 0.3,
                "explanation": "Default to n8n automation. Tell me more about what you want to automate."
            }

        best_type = max(scores, key=scores.get)
        max_score = scores[best_type]
        confidence = min(max_score / 3, 1.0)
        explanations = {
            ProjectType.N8N: "Detected automation task.",
            ProjectType.COMFYUI: "Detected image generation task.",
            ProjectType.HYBRID: "Detected hybrid automation + AI task.",
            ProjectType.EXTERNAL_REPO: "Detected GitHub repository request."
        }
        return {
            "project_type": best_type,
            "confidence": confidence,
            "explanation": explanations.get(best_type, f"Detected {best_type}.")
        }

    async def _llm_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """LLM-based analysis using configured provider (Gemini or Ollama)."""
        if self.llm_provider == "gemini" and self.gemini_api_key:
            return await self._gemini_analyze(query, context)
        else:
            return await self._ollama_analyze(query, context)

    async def _gemini_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze using Google Gemini 2.5 Flash API.

        Use PRINCIPLE-BASED reasoning, not example-based.
        This way the LLM handles edge cases intelligently without constant prompt tweaking.

        Returns:
            Dict with project_type, confidence, explanation.
            project_type is ProjectType.UNKNOWN on any request/parse failure,
            which signals the caller to fall back to keyword matching.
        """
        system_prompt = """You are a project classification assistant. Classify user requests into project types based on PRINCIPLES, not examples.
PRINCIPLES:
- **n8n**: Automation, workflows, data processing. Handles logic/connections/scheduling. (CSV reports are DATA, not images)
- **comfyui**: Visual generation. Creates images/photos/artwork using AI. (Image generation ONLY)
- **hybrid**: Both automation AND image generation in one workflow. (Must involve creating images AND automating something)
- **external_repo**: User wants to download/use existing code from GitHub
KEY DISTINCTION:
- "Report" = data document (n8n)
- "Image/photo" = visual content (comfyui)
- Both = hybrid
Respond ONLY with valid JSON:
{"project_type": "n8n" | "comfyui" | "hybrid" | "external_repo" | "unknown", "confidence": 0.0-1.0, "explanation": "..."}"""
        user_prompt = f'Classify: "{query}"'
        try:
            response = await self.client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
                headers={"Content-Type": "application/json"},
                json={
                    "contents": [{
                        "parts": [{
                            "text": f"{system_prompt}\n\n{user_prompt}"
                        }]
                    }]
                },
                params={"key": self.gemini_api_key}
            )
            if response.status_code == 200:
                result = response.json()
                try:
                    # Extract text from Gemini response
                    text_content = result["candidates"][0]["content"]["parts"][0]["text"]
                    # Try to extract JSON from the response.
                    # Sometimes Gemini wraps JSON in markdown code blocks.
                    if "```json" in text_content:
                        json_str = text_content.split("```json")[1].split("```")[0].strip()
                    elif "```" in text_content:
                        json_str = text_content.split("```")[1].split("```")[0].strip()
                    else:
                        json_str = text_content
                    parsed = json.loads(json_str)
                    return {
                        "project_type": parsed.get("project_type", ProjectType.UNKNOWN),
                        "confidence": float(parsed.get("confidence", 0.5)),
                        "explanation": parsed.get("explanation", "Gemini analysis")
                    }
                # ValueError/TypeError cover a non-numeric "confidence" field,
                # which float() would otherwise propagate out of this method.
                except (json.JSONDecodeError, KeyError, IndexError, ValueError, TypeError) as e:
                    logger.warning(f"Failed to parse Gemini response: {e}")
                    return {
                        "project_type": ProjectType.UNKNOWN,
                        "confidence": 0.0,
                        "explanation": "Gemini response parsing failed"
                    }
            else:
                logger.debug(f"Gemini request failed: {response.status_code}")
                return {
                    "project_type": ProjectType.UNKNOWN,
                    "confidence": 0.0,
                    "explanation": "Gemini request failed"
                }
        except Exception as e:
            logger.debug(f"Gemini analysis error: {e}")
            return {
                "project_type": ProjectType.UNKNOWN,
                "confidence": 0.0,
                "explanation": f"Gemini error: {str(e)}"
            }

    async def _ollama_analyze(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze using Ollama (Qwen2.5 or other model).

        Use PRINCIPLE-BASED reasoning, not example-based.

        Returns:
            Dict with project_type, confidence, explanation.
            project_type is ProjectType.UNKNOWN on any request/parse failure.
        """
        system_prompt = """You are a project classification assistant. Classify user requests into project types based on PRINCIPLES, not examples.
PRINCIPLES:
- **n8n**: Automation, workflows, data processing. Handles logic/connections/scheduling. (CSV reports are DATA, not images)
- **comfyui**: Visual generation. Creates images/photos/artwork using AI. (Image generation ONLY)
- **hybrid**: Both automation AND image generation in one workflow. (Must involve creating images AND automating something)
- **external_repo**: User wants to download/use existing code from GitHub
KEY DISTINCTION:
- "Report" = data document (n8n)
- "Image/photo" = visual content (comfyui)
- Both = hybrid
Respond ONLY with valid JSON:
{"project_type": "n8n" | "comfyui" | "hybrid" | "external_repo" | "unknown", "confidence": 0.0-1.0, "explanation": "..."}"""
        user_prompt = f'Classify: "{query}"'
        try:
            response = await self.client.post(
                f"{self.ollama_host}/api/generate",
                json={
                    "model": self.ollama_model,
                    "prompt": f"{system_prompt}\n\n{user_prompt}",
                    "stream": False,
                    # Ask Ollama to constrain output to valid JSON.
                    "format": "json"
                }
            )
            if response.status_code == 200:
                result = response.json()
                llm_response = result.get("response", "{}")
                # Parse JSON response
                try:
                    parsed = json.loads(llm_response)
                    return {
                        "project_type": parsed.get("project_type", ProjectType.UNKNOWN),
                        "confidence": float(parsed.get("confidence", 0.5)),
                        "explanation": parsed.get("explanation", "Ollama analysis")
                    }
                # ValueError/TypeError cover a non-numeric "confidence" field.
                except (json.JSONDecodeError, ValueError, TypeError):
                    logger.warning(f"Failed to parse Ollama response: {llm_response}")
                    return {
                        "project_type": ProjectType.UNKNOWN,
                        "confidence": 0.0,
                        "explanation": "Ollama response parsing failed"
                    }
            else:
                logger.debug(f"Ollama request failed: {response.status_code}")
                return {
                    "project_type": ProjectType.UNKNOWN,
                    "confidence": 0.0,
                    "explanation": "Ollama request failed"
                }
        except Exception as e:
            logger.debug(f"Ollama analysis error: {e}")
            return {
                "project_type": ProjectType.UNKNOWN,
                "confidence": 0.0,
                "explanation": f"Ollama error: {str(e)}"
            }

    def _get_suggested_tools(self, project_type: str) -> List[str]:
        """Get suggested tools based on project type."""
        tools_map = {
            ProjectType.N8N: [
                "n8n_builder - Generate workflow JSON",
                "n8n_deploy - Deploy to n8n instance",
                "webhook_trigger - Setup webhook triggers"
            ],
            ProjectType.COMFYUI: [
                "comfyui_builder - Generate workflow graph",
                "comfyui_execute - Run workflow",
                "model_download - Download required models"
            ],
            ProjectType.HYBRID: [
                "n8n_builder - Generate automation workflow",
                "comfyui_builder - Generate AI workflow",
                "api_connector - Connect n8n to ComfyUI"
            ],
            ProjectType.EXTERNAL_REPO: [
                "github_search - Find relevant repositories",
                "docker_helper - Clone and build",
                "error_analyzer - Fix build issues"
            ]
        }
        return tools_map.get(project_type, ["unknown_tool"])

    def _get_next_steps(self, project_type: str, query: str) -> List[str]:
        """Get recommended next steps for the classified project type."""
        steps_map = {
            ProjectType.N8N: [
                "1. Generate workflow JSON template",
                "2. Customize nodes and connections",
                "3. Deploy to n8n instance",
                "4. Test with sample data"
            ],
            ProjectType.COMFYUI: [
                "1. Generate ComfyUI workflow graph",
                "2. Check required models are installed",
                "3. Execute workflow",
                "4. Review generated output"
            ],
            ProjectType.HYBRID: [
                "1. Create ComfyUI workflow for AI task",
                "2. Create n8n workflow for automation",
                "3. Connect n8n → ComfyUI via HTTP",
                "4. Test end-to-end pipeline"
            ],
            ProjectType.EXTERNAL_REPO: [
                "1. Search GitHub for relevant projects",
                "2. Select best matching repository",
                "3. Clone and configure with Docker",
                "4. Validate and fix any errors"
            ]
        }
        return steps_map.get(project_type, ["Please provide more details"])
# ============================================
# CrewAI Integration (Advanced Mode)
# ============================================
class CrewAIDecisionAgent:
    """
    Advanced decision agent built on the CrewAI framework.

    Intended for complex multi-step reasoning. The CrewAI wiring is
    reserved for Milestone 1; in Phase 0 every request is delegated to
    the lightweight ``DecisionAgent``.
    """

    def __init__(self):
        # Phase 0: wrap the simple agent; CrewAI setup lands in Milestone 1.
        self.simple_agent = DecisionAgent()

    async def analyze_complex(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Complex analysis using CrewAI agents.
        Reserved for Milestone 1 implementation.

        Currently forwards the query and context unchanged to the
        simple agent and returns its result.
        """
        return await self.simple_agent.analyze(query, context)