"""
Task Memory System for the Felix Framework.

Provides pattern recognition, success/failure tracking, and adaptive strategy
selection based on historical task execution data.
"""
import hashlib
import json
import re
import sqlite3
import time
import uuid
from collections import Counter
from contextlib import closing
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Set
class TaskOutcome(Enum):
    """Result categories that a recorded task execution can end with.

    The member values are plain strings so that a member can be rebuilt
    from its value with ``TaskOutcome(value)``.
    """

    # The task finished and met its goal.
    SUCCESS = "success"
    # The task finished but only part of the goal was met.
    PARTIAL_SUCCESS = "partial_success"
    # The task finished without meeting its goal.
    FAILURE = "failure"
    # The task did not finish within its time budget.
    TIMEOUT = "timeout"
    # The task aborted because of an error.
    ERROR = "error"
class TaskComplexity(Enum):
    """Coarse difficulty levels that can be assigned to a task.

    Members are listed from least to most complex. The values are plain
    strings so that a member can be rebuilt with ``TaskComplexity(value)``.
    """

    SIMPLE = "simple"
    MODERATE = "moderate"
    COMPLEX = "complex"
    VERY_COMPLEX = "very_complex"
@dataclass
class TaskPattern:
    """Pattern extracted from task execution history.

    The class must be a dataclass: it relies on ``field(default_factory=...)``
    defaults, on ``asdict(self)`` in :meth:`to_dict`, and on keyword
    construction (``cls(**data)``) in :meth:`from_dict`.
    """

    # Identifier of the pattern (unique key in storage).
    pattern_id: str
    # Task category this pattern applies to.
    task_type: str
    # Complexity level this pattern applies to.
    complexity: TaskComplexity
    # Keywords that characterise tasks belonging to this pattern.
    keywords: List[str]
    # Representative execution duration for the pattern.
    typical_duration: float
    # Fraction of executions considered successful, in [0.0, 1.0].
    success_rate: float
    # Error messages observed on unsuccessful executions.
    failure_modes: List[str]
    # Strategies observed on successful executions.
    optimal_strategies: List[str]
    # Agents observed on successful executions.
    required_agents: List[str]
    # Free-form, JSON-serialisable context information.
    context_requirements: Dict[str, Any]
    # Creation / last-update timestamps (seconds since the epoch).
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    # Number of times the pattern has been used.
    usage_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary for storage.

        Returns:
            A new dict of all fields, with ``complexity`` replaced by its
            string value so the result contains no enum members.
        """
        data = asdict(self)
        data['complexity'] = self.complexity.value
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskPattern':
        """Create an instance from a dictionary produced by :meth:`to_dict`.

        Args:
            data: Mapping of field names to values; ``complexity`` holds the
                enum value. The mapping is not modified.

        Returns:
            The reconstructed pattern.

        Raises:
            ValueError: If ``complexity`` is not a valid complexity value.
            KeyError: If ``complexity`` is missing.
        """
        # Work on a shallow copy so the caller's dict keeps its raw values.
        values = dict(data)
        values['complexity'] = TaskComplexity(values['complexity'])
        return cls(**values)
@dataclass
class TaskExecution:
    """Record of a task execution.

    The class must be a dataclass: it relies on ``field(default_factory=...)``
    defaults, on ``asdict(self)`` in :meth:`to_dict`, and on keyword
    construction (``cls(**data)``) in :meth:`from_dict`.
    """

    # Identifier of this execution (unique key in storage).
    execution_id: str
    # Free-text description of the task that was run.
    task_description: str
    # Task category.
    task_type: str
    # Assessed complexity of the task.
    complexity: TaskComplexity
    # How the execution ended.
    outcome: TaskOutcome
    # Execution duration.
    duration: float
    # Agents that took part in the execution.
    agents_used: List[str]
    # Strategies that were applied.
    strategies_used: List[str]
    # Size of the context used.
    context_size: int
    # Error messages collected during the execution (may be empty).
    error_messages: List[str]
    # Named success metrics (may be empty).
    success_metrics: Dict[str, float]
    # IDs of the patterns this execution is associated with.
    patterns_matched: List[str]
    # Creation timestamp (seconds since the epoch).
    created_at: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary for storage.

        Returns:
            A new dict of all fields, with ``complexity`` and ``outcome``
            replaced by their string values.
        """
        data = asdict(self)
        data['complexity'] = self.complexity.value
        data['outcome'] = self.outcome.value
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskExecution':
        """Create an instance from a dictionary produced by :meth:`to_dict`.

        Args:
            data: Mapping of field names to values; ``complexity`` and
                ``outcome`` hold enum values. The mapping is not modified.

        Returns:
            The reconstructed execution record.

        Raises:
            ValueError: If ``complexity`` or ``outcome`` is not a valid value.
            KeyError: If ``complexity`` or ``outcome`` is missing.
        """
        # Work on a shallow copy so the caller's dict keeps its raw values.
        values = dict(data)
        values['complexity'] = TaskComplexity(values['complexity'])
        values['outcome'] = TaskOutcome(values['outcome'])
        return cls(**values)
@dataclass
class TaskMemoryQuery:
    """Query structure for task memory retrieval.

    Every filter is optional; a filter left as ``None`` is not applied.
    The class must be a dataclass because queries are built with keyword
    arguments, e.g. ``TaskMemoryQuery(task_types=[...], limit=5)``.
    """

    # Restrict to these task categories.
    task_types: Optional[List[str]] = None
    # Restrict to these complexity levels.
    complexity_levels: Optional[List[TaskComplexity]] = None
    # Restrict to these outcomes.
    outcomes: Optional[List[TaskOutcome]] = None
    # Restrict to entries sharing at least one of these keywords.
    keywords: Optional[List[str]] = None
    # Lower bound on success rate.
    min_success_rate: Optional[float] = None
    # Upper bound on duration.
    max_duration: Optional[float] = None
    # (start, end) bounds on the creation timestamp.
    time_range: Optional[Tuple[float, float]] = None
    # Maximum number of results to return.
    limit: int = 10
class TaskMemory:
    """
    Task memory system for pattern recognition and adaptive strategy selection.

    Tracks task execution history, identifies patterns, and recommends
    optimal strategies based on past performance.

    All state lives in a SQLite database at ``storage_path``. Each operation
    opens its own short-lived connection, which is committed and closed
    before the method returns.
    """

    # Tokeniser used by keyword extraction: whole words of 3+ word characters.
    _WORD_RE = re.compile(r'\b\w{3,}\b')

    # Common words ignored by keyword extraction. Every entry is three
    # letters long, so the "longer than 3 characters" rule in
    # _extract_keywords already excludes them; the set is kept so that the
    # length rule can be relaxed without re-introducing these words.
    _STOPWORDS = frozenset({
        'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had',
        'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', 'his',
        'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two', 'who', 'boy',
        'did', 'man', 'she', 'use', 'way', 'oil', 'sit', 'set', 'run',
    })

    def __init__(self, storage_path: str = "felix_task_memory.db"):
        """
        Initialize task memory system.

        Args:
            storage_path: Path to SQLite database file
        """
        self.storage_path = Path(storage_path)
        self._init_database()

    def _connect(self):
        """Return a context manager yielding a connection that gets closed.

        ``with sqlite3.connect(...) as conn`` only commits or rolls back; it
        never closes the connection. Use this helper as
        ``with self._connect() as conn, conn:`` so the inner ``conn`` handles
        the transaction and the outer wrapper closes the connection.
        """
        return closing(sqlite3.connect(self.storage_path))

    def _init_database(self) -> None:
        """Initialize SQLite database with required tables and indices."""
        with self._connect() as conn, conn:
            # Task patterns table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS task_patterns (
                    pattern_id TEXT PRIMARY KEY,
                    task_type TEXT NOT NULL,
                    complexity TEXT NOT NULL,
                    keywords_json TEXT NOT NULL,
                    typical_duration REAL NOT NULL,
                    success_rate REAL NOT NULL,
                    failure_modes_json TEXT NOT NULL,
                    optimal_strategies_json TEXT NOT NULL,
                    required_agents_json TEXT NOT NULL,
                    context_requirements_json TEXT NOT NULL,
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    usage_count INTEGER DEFAULT 0
                )
            """)
            # Task executions table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS task_executions (
                    execution_id TEXT PRIMARY KEY,
                    task_description TEXT NOT NULL,
                    task_type TEXT NOT NULL,
                    complexity TEXT NOT NULL,
                    outcome TEXT NOT NULL,
                    duration REAL NOT NULL,
                    agents_used_json TEXT NOT NULL,
                    strategies_used_json TEXT NOT NULL,
                    context_size INTEGER NOT NULL,
                    error_messages_json TEXT NOT NULL,
                    success_metrics_json TEXT NOT NULL,
                    patterns_matched_json TEXT NOT NULL,
                    created_at REAL NOT NULL
                )
            """)
            # Create indices for better query performance
            for statement in (
                "CREATE INDEX IF NOT EXISTS idx_task_type ON task_patterns(task_type)",
                "CREATE INDEX IF NOT EXISTS idx_complexity ON task_patterns(complexity)",
                "CREATE INDEX IF NOT EXISTS idx_success_rate ON task_patterns(success_rate)",
                "CREATE INDEX IF NOT EXISTS idx_execution_type ON task_executions(task_type)",
                "CREATE INDEX IF NOT EXISTS idx_execution_outcome ON task_executions(outcome)",
                "CREATE INDEX IF NOT EXISTS idx_execution_created ON task_executions(created_at)",
            ):
                conn.execute(statement)

    def _generate_execution_id(self, task_description: str) -> str:
        """Generate unique ID (16 hex characters) for task execution."""
        # The random component prevents collisions when the same description
        # is recorded twice within one clock tick of time.time().
        hash_input = f"{task_description}:{time.time()}:{uuid.uuid4().hex}"
        return hashlib.sha256(hash_input.encode()).hexdigest()[:16]

    def _generate_pattern_id(self, task_type: str, complexity: TaskComplexity,
                             keywords: List[str]) -> str:
        """Generate a deterministic ID (16 hex characters) for a task pattern.

        The ID depends only on the task type, the complexity value and the
        set of keywords; keyword order does not matter.
        """
        keywords_str = ":".join(sorted(keywords))
        hash_input = f"{task_type}:{complexity.value}:{keywords_str}"
        return hashlib.sha256(hash_input.encode()).hexdigest()[:16]

    def record_task_execution(self, task_description: str, task_type: str,
                              complexity: TaskComplexity, outcome: TaskOutcome,
                              duration: float, agents_used: List[str],
                              strategies_used: List[str], context_size: int,
                              error_messages: Optional[List[str]] = None,
                              success_metrics: Optional[Dict[str, float]] = None) -> str:
        """
        Record a task execution for future pattern analysis.

        Args:
            task_description: Description of the task
            task_type: Type/category of the task
            complexity: Assessed complexity level
            outcome: Execution outcome
            duration: Execution duration in seconds
            agents_used: List of agent types used
            strategies_used: List of strategies employed
            context_size: Size of context used
            error_messages: List of error messages if any
            success_metrics: Success metrics if available

        Returns:
            Execution ID
        """
        error_messages = [] if error_messages is None else error_messages
        success_metrics = {} if success_metrics is None else success_metrics

        execution_id = self._generate_execution_id(task_description)
        execution = TaskExecution(
            execution_id=execution_id,
            task_description=task_description,
            task_type=task_type,
            complexity=complexity,
            outcome=outcome,
            duration=duration,
            agents_used=agents_used,
            strategies_used=strategies_used,
            context_size=context_size,
            error_messages=error_messages,
            success_metrics=success_metrics,
            patterns_matched=[]  # Filled in below
        )

        # Existing patterns that are similar enough to this execution.
        matched_ids = [p.pattern_id for p in self._find_matching_patterns(execution)]

        # Always link the execution to the pattern it creates or updates.
        # Otherwise the execution that creates a pattern is stored with no
        # link to it and is left out of every later metric recalculation.
        own_keywords = self._extract_keywords(task_description)
        if own_keywords:
            own_pattern_id = self._generate_pattern_id(task_type, complexity, own_keywords)
            if own_pattern_id not in matched_ids:
                matched_ids.append(own_pattern_id)
        execution.patterns_matched = matched_ids

        # Store execution
        with self._connect() as conn, conn:
            conn.execute("""
                INSERT INTO task_executions
                (execution_id, task_description, task_type, complexity, outcome,
                 duration, agents_used_json, strategies_used_json, context_size,
                 error_messages_json, success_metrics_json, patterns_matched_json, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                execution_id,
                task_description,
                task_type,
                complexity.value,
                outcome.value,
                duration,
                json.dumps(agents_used),
                json.dumps(strategies_used),
                context_size,
                json.dumps(error_messages),
                json.dumps(success_metrics),
                json.dumps(execution.patterns_matched),
                execution.created_at
            ))

        # Update or create patterns based on this execution
        self._update_patterns_from_execution(execution)
        return execution_id

    def _find_matching_patterns(self, execution: TaskExecution) -> List[TaskPattern]:
        """Find stored patterns that match the given execution.

        Candidates share the execution's task type and complexity. A
        candidate matches when at least half of its keywords occur in the
        execution's description.
        """
        candidates = self.get_patterns(TaskMemoryQuery(
            task_types=[execution.task_type],
            complexity_levels=[execution.complexity]
        ))
        task_keywords = set(self._extract_keywords(execution.task_description))

        matched = []
        for pattern in candidates:
            # A pattern without keywords would trivially pass the threshold
            # (0 >= 0) and match everything, so skip it.
            if not pattern.keywords:
                continue
            overlap = len(task_keywords & set(pattern.keywords))
            if overlap >= len(pattern.keywords) * 0.5:  # 50% overlap threshold
                matched.append(pattern)
        return matched

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from task description.

        Keywords are the distinct lower-cased words longer than three
        characters that are not stopwords, in order of first appearance.
        """
        # Simple keyword extraction - could be enhanced with NLP
        words = self._WORD_RE.findall(text.lower())
        kept = (w for w in words if w not in self._STOPWORDS and len(w) > 3)
        # dict.fromkeys removes duplicates while keeping a stable order.
        return list(dict.fromkeys(kept))

    def _update_patterns_from_execution(self, execution: TaskExecution) -> None:
        """Update or create the pattern derived from a task execution.

        Does nothing when the description yields no keywords.
        """
        task_keywords = self._extract_keywords(execution.task_description)
        if not task_keywords:
            return

        pattern_id = self._generate_pattern_id(
            execution.task_type, execution.complexity, task_keywords
        )
        existing_pattern = self._get_pattern_by_id(pattern_id)
        if existing_pattern:
            self._update_existing_pattern(existing_pattern, execution)
        else:
            self._create_new_pattern(pattern_id, execution, task_keywords)

    def _get_pattern_by_id(self, pattern_id: str) -> Optional[TaskPattern]:
        """Get pattern by ID, or None when no such pattern is stored."""
        with self._connect() as conn, conn:
            row = conn.execute(
                "SELECT * FROM task_patterns WHERE pattern_id = ?",
                (pattern_id,)
            ).fetchone()
        return self._row_to_pattern(row) if row else None

    def _update_existing_pattern(self, pattern: TaskPattern,
                                 execution: TaskExecution) -> None:
        """Recalculate an existing pattern's metrics and save it.

        Metrics are recomputed from every stored execution linked to the
        pattern plus ``execution``.
        """
        # ``execution`` may already be stored and linked to this pattern, so
        # key by execution ID to make sure it is counted exactly once.
        by_id = {e.execution_id: e
                 for e in self._get_executions_for_pattern(pattern.pattern_id)}
        by_id[execution.execution_id] = execution
        executions = list(by_id.values())

        # Success rate counts full and partial successes.
        succeeded = (TaskOutcome.SUCCESS, TaskOutcome.PARTIAL_SUCCESS)
        success_total = sum(1 for e in executions if e.outcome in succeeded)
        pattern.success_rate = success_total / len(executions)

        # Typical duration is the mean duration.
        pattern.typical_duration = sum(e.duration for e in executions) / len(executions)

        # Failure modes: distinct error messages from failed/errored runs.
        failed = (TaskOutcome.FAILURE, TaskOutcome.ERROR)
        pattern.failure_modes = list(dict.fromkeys(
            message
            for e in executions if e.outcome in failed
            for message in e.error_messages
        ))

        # Strategies and agents are ranked by how often they appear in fully
        # successful executions.
        full_successes = [e for e in executions if e.outcome == TaskOutcome.SUCCESS]
        strategy_counts = Counter(s for e in full_successes for s in e.strategies_used)
        pattern.optimal_strategies = [s for s, _ in strategy_counts.most_common(5)]
        agent_counts = Counter(a for e in full_successes for a in e.agents_used)
        pattern.required_agents = [a for a, _ in agent_counts.most_common(3)]

        pattern.updated_at = time.time()
        pattern.usage_count += 1
        self._save_pattern(pattern)

    def _create_new_pattern(self, pattern_id: str, execution: TaskExecution,
                            keywords: List[str]) -> None:
        """Create and save a new pattern seeded from a single execution."""
        is_success = execution.outcome == TaskOutcome.SUCCESS
        counts_as_success = execution.outcome in (
            TaskOutcome.SUCCESS, TaskOutcome.PARTIAL_SUCCESS)
        is_failure = execution.outcome in (TaskOutcome.FAILURE, TaskOutcome.ERROR)

        pattern = TaskPattern(
            pattern_id=pattern_id,
            task_type=execution.task_type,
            complexity=execution.complexity,
            keywords=list(keywords),
            typical_duration=execution.duration,
            success_rate=1.0 if counts_as_success else 0.0,
            failure_modes=list(execution.error_messages) if is_failure else [],
            optimal_strategies=list(execution.strategies_used) if is_success else [],
            required_agents=list(execution.agents_used) if is_success else [],
            context_requirements={
                "min_context_size": execution.context_size,
                "success_metrics": execution.success_metrics
            },
            usage_count=1
        )
        self._save_pattern(pattern)

    def _save_pattern(self, pattern: TaskPattern) -> None:
        """Insert the pattern, replacing any stored row with the same ID."""
        with self._connect() as conn, conn:
            conn.execute("""
                INSERT OR REPLACE INTO task_patterns
                (pattern_id, task_type, complexity, keywords_json, typical_duration,
                 success_rate, failure_modes_json, optimal_strategies_json,
                 required_agents_json, context_requirements_json, created_at,
                 updated_at, usage_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                pattern.pattern_id,
                pattern.task_type,
                pattern.complexity.value,
                json.dumps(pattern.keywords),
                pattern.typical_duration,
                pattern.success_rate,
                json.dumps(pattern.failure_modes),
                json.dumps(pattern.optimal_strategies),
                json.dumps(pattern.required_agents),
                json.dumps(pattern.context_requirements),
                pattern.created_at,
                pattern.updated_at,
                pattern.usage_count
            ))

    def _get_executions_for_pattern(self, pattern_id: str) -> List[TaskExecution]:
        """Get all stored executions whose matched-pattern list has this ID."""
        with self._connect() as conn, conn:
            # patterns_matched_json is a JSON list of strings, so the quoted
            # ID appears verbatim in the stored text.
            rows = conn.execute("""
                SELECT * FROM task_executions
                WHERE patterns_matched_json LIKE ?
            """, (f'%"{pattern_id}"%',)).fetchall()
        return [self._row_to_execution(row) for row in rows]

    def _row_to_pattern(self, row) -> TaskPattern:
        """Convert a ``task_patterns`` row (table column order) to TaskPattern."""
        (pattern_id, task_type, complexity, keywords_json, typical_duration,
         success_rate, failure_modes_json, optimal_strategies_json,
         required_agents_json, context_requirements_json, created_at,
         updated_at, usage_count) = row
        return TaskPattern(
            pattern_id=pattern_id,
            task_type=task_type,
            complexity=TaskComplexity(complexity),
            keywords=json.loads(keywords_json),
            typical_duration=typical_duration,
            success_rate=success_rate,
            failure_modes=json.loads(failure_modes_json),
            optimal_strategies=json.loads(optimal_strategies_json),
            required_agents=json.loads(required_agents_json),
            context_requirements=json.loads(context_requirements_json),
            created_at=created_at,
            updated_at=updated_at,
            usage_count=usage_count
        )

    def _row_to_execution(self, row) -> TaskExecution:
        """Convert a ``task_executions`` row (table column order) to TaskExecution."""
        (execution_id, task_description, task_type, complexity, outcome,
         duration, agents_used_json, strategies_used_json, context_size,
         error_messages_json, success_metrics_json, patterns_matched_json,
         created_at) = row
        return TaskExecution(
            execution_id=execution_id,
            task_description=task_description,
            task_type=task_type,
            complexity=TaskComplexity(complexity),
            outcome=TaskOutcome(outcome),
            duration=duration,
            agents_used=json.loads(agents_used_json),
            strategies_used=json.loads(strategies_used_json),
            context_size=context_size,
            error_messages=json.loads(error_messages_json),
            success_metrics=json.loads(success_metrics_json),
            patterns_matched=json.loads(patterns_matched_json),
            created_at=created_at
        )

    def get_patterns(self, query: TaskMemoryQuery) -> List[TaskPattern]:
        """
        Retrieve task patterns matching query criteria.

        Results are ordered by success rate, then usage count (both
        descending). The stored usage count of every returned pattern is
        incremented; the returned objects carry the count as read, before
        that increment.

        Args:
            query: Query parameters

        Returns:
            List of matching task patterns
        """
        clauses = ["SELECT * FROM task_patterns WHERE 1=1"]
        params: List[Any] = []

        if query.task_types:
            marks = ",".join("?" * len(query.task_types))
            clauses.append(f"AND task_type IN ({marks})")
            params.extend(query.task_types)
        if query.complexity_levels:
            marks = ",".join("?" * len(query.complexity_levels))
            clauses.append(f"AND complexity IN ({marks})")
            params.extend(level.value for level in query.complexity_levels)
        # Compare against None so that an explicit 0 is still applied.
        if query.min_success_rate is not None:
            clauses.append("AND success_rate >= ?")
            params.append(query.min_success_rate)
        if query.max_duration is not None:
            clauses.append("AND typical_duration <= ?")
            params.append(query.max_duration)
        if query.time_range:
            clauses.append("AND created_at BETWEEN ? AND ?")
            params.extend(query.time_range)

        # Order by success rate and usage count
        clauses.append("ORDER BY success_rate DESC, usage_count DESC")

        # Keyword filtering happens in Python. The SQL LIMIT is only used
        # when there is no keyword filter; otherwise rows that are filtered
        # out afterwards would eat into the limit.
        wanted = {k.lower() for k in query.keywords} if query.keywords else set()
        if not wanted:
            clauses.append("LIMIT ?")
            params.append(query.limit)

        patterns: List[TaskPattern] = []
        with self._connect() as conn, conn:
            rows = conn.execute(" ".join(clauses), params).fetchall()
            for row in rows:
                # A negative limit means "no limit", as in SQLite.
                if wanted and 0 <= query.limit <= len(patterns):
                    break
                pattern = self._row_to_pattern(row)
                if wanted and wanted.isdisjoint(k.lower() for k in pattern.keywords):
                    continue
                patterns.append(pattern)

            # Update usage counts on the same connection in one batch.
            conn.executemany("""
                UPDATE task_patterns
                SET usage_count = usage_count + 1
                WHERE pattern_id = ?
            """, [(p.pattern_id,) for p in patterns])
        return patterns

    def _increment_pattern_usage(self, pattern_id: str) -> None:
        """Increment usage count for pattern."""
        with self._connect() as conn, conn:
            conn.execute("""
                UPDATE task_patterns
                SET usage_count = usage_count + 1
                WHERE pattern_id = ?
            """, (pattern_id,))

    def recommend_strategy(self, task_description: str, task_type: str,
                           complexity: TaskComplexity) -> Dict[str, Any]:
        """
        Recommend optimal strategy for a task based on historical patterns.

        Considers up to five patterns of the same type and complexity with
        a success rate of at least 0.5 that share a keyword with the
        description.

        Args:
            task_description: Description of the task
            task_type: Type/category of the task
            complexity: Assessed complexity level

        Returns:
            Dictionary with keys ``strategies``, ``agents``,
            ``estimated_duration``, ``success_probability``,
            ``recommendations``, ``potential_issues`` and ``patterns_used``.
        """
        keywords = self._extract_keywords(task_description)
        patterns = self.get_patterns(TaskMemoryQuery(
            task_types=[task_type],
            complexity_levels=[complexity],
            keywords=keywords,
            min_success_rate=0.5,
            limit=5
        ))

        if not patterns:
            return {
                "strategies": [],
                "agents": [],
                "estimated_duration": None,
                "success_probability": 0.0,
                "recommendations": "No similar patterns found. Proceeding with default strategy.",
                "potential_issues": [],
                "patterns_used": 0
            }

        # Rank strategies and agents by the number of patterns naming them.
        strategy_counts = Counter(s for p in patterns for s in p.optimal_strategies)
        agent_counts = Counter(a for p in patterns for a in p.required_agents)
        recommended_strategies = [s for s, _ in strategy_counts.most_common(3)]
        recommended_agents = [a for a, _ in agent_counts.most_common(3)]

        # Distinct failure modes, best-ranked patterns first.
        potential_issues = list(dict.fromkeys(
            issue for p in patterns for issue in p.failure_modes
        ))

        avg_duration = sum(p.typical_duration for p in patterns) / len(patterns)
        avg_success_rate = sum(p.success_rate for p in patterns) / len(patterns)

        # Generate recommendations text
        recommendations = []
        if recommended_strategies:
            recommendations.append(
                f"Use proven strategies: {', '.join(recommended_strategies[:2])}")
        if recommended_agents:
            recommendations.append(
                f"Deploy agents: {', '.join(recommended_agents[:2])}")
        if avg_duration:
            recommendations.append(f"Expected duration: {avg_duration:.1f} seconds")

        return {
            "strategies": recommended_strategies,
            "agents": recommended_agents,
            "estimated_duration": avg_duration,
            "success_probability": avg_success_rate,
            "recommendations": ". ".join(recommendations),
            "potential_issues": potential_issues[:3],
            "patterns_used": len(patterns)
        }

    def get_memory_summary(self) -> Dict[str, Any]:
        """Get summary statistics of task memory.

        Returns:
            Dictionary with total pattern/execution counts, execution counts
            per outcome, the five task types with the most patterns, the
            average pattern success rate per complexity, and the storage path.
        """
        with self._connect() as conn, conn:
            total_patterns = conn.execute(
                "SELECT COUNT(*) FROM task_patterns").fetchone()[0]
            total_executions = conn.execute(
                "SELECT COUNT(*) FROM task_executions").fetchone()[0]

            # Success rate distribution
            outcome_distribution = dict(conn.execute("""
                SELECT outcome, COUNT(*)
                FROM task_executions
                GROUP BY outcome
            """).fetchall())

            # Most common task types
            top_task_types = dict(conn.execute("""
                SELECT task_type, COUNT(*)
                FROM task_patterns
                GROUP BY task_type
                ORDER BY COUNT(*) DESC
                LIMIT 5
            """).fetchall())

            # Average success rate by complexity
            success_by_complexity = dict(conn.execute("""
                SELECT complexity, AVG(success_rate)
                FROM task_patterns
                GROUP BY complexity
            """).fetchall())

        return {
            "total_patterns": total_patterns,
            "total_executions": total_executions,
            "outcome_distribution": outcome_distribution,
            "top_task_types": top_task_types,
            "success_by_complexity": success_by_complexity,
            "storage_path": str(self.storage_path)
        }

    def cleanup_old_patterns(self, max_age_days: int = 60,
                             min_usage_count: int = 2) -> int:
        """
        Clean up old or unused task patterns.

        A pattern is deleted when it is older than ``max_age_days`` and has
        been used fewer than ``min_usage_count`` times, or, regardless of
        age, when its success rate is 0.0 and its usage count is exactly 1.

        Args:
            max_age_days: Maximum age in days
            min_usage_count: Minimum usage count to keep

        Returns:
            Number of patterns deleted
        """
        cutoff_time = time.time() - max_age_days * 24 * 3600
        with self._connect() as conn, conn:
            cursor = conn.execute("""
                DELETE FROM task_patterns
                WHERE (created_at < ? AND usage_count < ?)
                   OR (success_rate = 0.0 AND usage_count = 1)
            """, (cutoff_time, min_usage_count))
            return cursor.rowcount