| | import os |
| | import datetime |
| | import logging |
| | import argparse |
| | import difflib |
| | from pathlib import Path |
| | from dotenv import load_dotenv |
| | from openai import OpenAI |
| | from expert_system import create_expert_system |
| | from agent import create_code_agent |
| | from enhanced_agent import create_enhanced_agent |
| |
|
| | |
# Load settings from a local .env file (python-dotenv) before any of them
# are read below.
load_dotenv()

# Console logging for the whole process.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# File and directory names used by the system.
AGENT_FILENAME = "agent.py"
VERSION_DIR = "agent_versions"
LOG_FILE = "agent_log.txt"
KNOWLEDGE_BASE = "knowledge_base"

# The API key must come from the environment, never from source control.
# SECURITY NOTE: a literal key was previously committed on this line; treat
# that key as compromised and rotate it.
TOGETHER_AI_KEY = os.getenv("TOGETHER_AI_API_KEY")
if not TOGETHER_AI_KEY:
    logger.error("TOGETHER_AI_API_KEY not found in .env file")
    print("ERROR: Please set your Together AI API key in the .env file")
    print("Create a .env file in the project root with:")
    print("TOGETHER_AI_API_KEY=your_api_key_here")
    # `exit()` is injected by the `site` module and is not always available;
    # raising SystemExit terminates with the same status code everywhere.
    raise SystemExit(1)

# NOTE(review): this sets an attribute on the OpenAI client *class*. Whether
# that configures clients created later depends on the installed openai
# version; confirm, or pass the key to the client constructor instead.
OpenAI.api_key = TOGETHER_AI_KEY
| |
|
| | |
def _together_model(context_window, max_tokens, temperature, suitable_for, priority):
    """Build one registry entry for a model served by the 'together' provider."""
    return {
        "provider": "together",
        "context_window": context_window,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "suitable_for": suitable_for,
        "priority": priority,
    }


# Registry of available models, keyed by model identifier. Each entry holds
# the generation limits, the task types the model is tagged for, and a
# priority used by the selection strategies (lower values are preferred).
MODELS = {
    "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free": _together_model(
        context_window=32768,
        max_tokens=4096,
        temperature=0.7,
        suitable_for=["general", "reasoning", "coding", "analysis"],
        priority=1,
    ),
    "mistralai/Mixtral-8x22B-Instruct-v0.1": _together_model(
        context_window=65536,
        max_tokens=8192,
        temperature=0.7,
        suitable_for=["long_context", "summarization", "analysis"],
        priority=2,
    ),
    "deepseek-ai/deepseek-coder-33b-instruct": _together_model(
        context_window=32768,
        max_tokens=4096,
        temperature=0.2,
        suitable_for=["coding", "debugging", "code_generation"],
        priority=1,
    ),
    "meta-llama/Llama-3.1-8B-Instruct": _together_model(
        context_window=8192,
        max_tokens=2048,
        temperature=0.7,
        suitable_for=["general", "fast_response"],
        priority=3,
    ),
}
| |
|
| | |
# Model returned whenever a strategy has no better candidate.
DEFAULT_MODEL = "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free"


def _lowest_priority_match(models, accepts):
    """Return the accepted model with the smallest 'priority' value.

    Args:
        models: Mapping of model name to configuration dict.
        accepts: Callable taking a configuration dict and returning True when
            the model is a candidate.

    Returns:
        str: Name of the winning model, or DEFAULT_MODEL when nothing is
        accepted. Ties go to the model that appears first in ``models``;
        entries without a 'priority' are treated as priority 10.
    """
    candidates = [entry for entry in models.items() if accepts(entry[1])]
    if not candidates:
        return DEFAULT_MODEL
    # min() returns the first minimal element, so ties keep registry order.
    return min(candidates, key=lambda entry: entry[1].get('priority', 10))[0]


def _select_default(task_type, models):
    """Always return DEFAULT_MODEL, ignoring the task type and registry."""
    return DEFAULT_MODEL


def _select_best_for_task(task_type, models):
    """Return the best-priority model whose 'suitable_for' lists ``task_type``."""
    return _lowest_priority_match(
        models,
        lambda config: task_type in config.get('suitable_for', []),
    )


def _select_fastest(task_type, models):
    """Return the best-priority model whose provider is 'together'.

    NOTE(review): despite the name, this ranks by 'priority' only and uses no
    speed-related field; confirm that this is the intended meaning of
    "fastest".
    """
    return _lowest_priority_match(
        models,
        lambda config: config.get('provider') == 'together',
    )


def _select_most_capable(task_type, models):
    """Return the model with the largest 'context_window'.

    Falls back to DEFAULT_MODEL for an empty registry, matching the other
    strategies, instead of letting max() raise ValueError.
    """
    if not models:
        return DEFAULT_MODEL
    return max(
        models.items(),
        key=lambda entry: entry[1].get('context_window', 0),
    )[0]


# Strategy name -> callable(task_type, models) returning a model name.
MODEL_SELECTION_STRATEGIES = {
    "default": _select_default,
    "best_for_task": _select_best_for_task,
    "fastest": _select_fastest,
    "most_capable": _select_most_capable,
}
| |
|
| | |
# Make sure the working directories exist before anything writes to them.
for _runtime_dir in (VERSION_DIR, KNOWLEDGE_BASE):
    os.makedirs(_runtime_dir, exist_ok=True)
| |
|
class ModelManager:
    """Pick a model for a task and keep per-model usage counters."""

    def __init__(self, model_config: dict, strategy: str = "best_for_task"):
        """
        Set up the manager.

        Args:
            model_config: Mapping of model name to its configuration dict
            strategy: Key into MODEL_SELECTION_STRATEGIES used by select_model
        """
        self.models = model_config
        self.strategy = strategy
        # One counter record per configured model.
        self.usage_stats = {}
        for name in model_config:
            self.usage_stats[name] = {"calls": 0, "success": 0, "tokens": 0}

    def select_model(self, task_type: str = "general") -> str:
        """
        Choose a model for the given task type using the configured strategy.

        An unrecognised strategy name is logged as a warning and replaced by
        the "default" strategy.

        Args:
            task_type: Type of task (e.g., 'coding', 'analysis', 'general')

        Returns:
            str: Selected model name
        """
        chooser = MODEL_SELECTION_STRATEGIES.get(self.strategy)
        if not chooser:
            logger.warning(f"Unknown strategy '{self.strategy}', using default")
            chooser = MODEL_SELECTION_STRATEGIES["default"]
        return chooser(task_type, self.models)

    def get_model_config(self, model_name: str) -> dict:
        """
        Look up the configuration of one model.

        Args:
            model_name: Name of the model

        Returns:
            dict: The model's configuration, or an empty dict if unknown
        """
        return self.models.get(model_name, {})

    def update_usage(self, model_name: str, success: bool = True, tokens_used: int = 0):
        """
        Record one call against a model's counters.

        Calls for models that have no counter record are ignored.

        Args:
            model_name: Name of the model
            success: Whether the call was successful
            tokens_used: Number of tokens used
        """
        record = self.usage_stats.get(model_name)
        if record is None:
            return
        record["calls"] += 1
        if success:
            record["success"] += 1
        record["tokens"] += tokens_used

    def get_usage_stats(self) -> dict:
        """
        Return the usage counters for all models.

        Returns:
            dict: The live counters mapping (not a copy)
        """
        return self.usage_stats
| |
|
| | |
# Module-level manager over the full registry, using the manager's default
# "best_for_task" strategy.
model_manager = ModelManager(model_config=MODELS)
| |
|
def log(event: str, level: str = "info"):
    """
    Send a message to the module logger and append it to LOG_FILE.

    Args:
        event: The message to log
        level: Log level (info, warning, error, critical); matched
            case-insensitively, anything else is logged at info level
    """
    level_name = level.lower()
    if level_name in ('info', 'warning', 'error', 'critical'):
        emit = getattr(logger, level_name)
    else:
        emit = logger.info
    emit(event)

    # Also keep a plain-text trail; the level is written as given, upper-cased.
    with open(LOG_FILE, "a", encoding="utf-8") as sink:
        timestamp = datetime.datetime.now().isoformat()
        sink.write(f"[{timestamp}] [{level.upper()}] {event}\n")
| |
|
def process_problem(problem: str, agent) -> dict:
    """
    Run one problem through the agent and wrap the outcome in a result dict.

    Args:
        problem: The problem or query to process
        agent: Object exposing process_input(problem); its return value is
            read with ['response'] and .get(...)

    Returns:
        dict: On success, 'success' True, 'solution' and 'metadata'.
        On any exception, 'success' False, 'error' and a generic 'solution'
        message; the exception is logged rather than propagated.
    """
    try:
        log(f"Processing problem: {problem[:100]}...")
        result = agent.process_input(problem)
        log("Problem processed successfully")

        # A missing 'response' key is an error; the other fields are optional.
        solution = result['response']
        metadata = {
            'pipeline_result': result.get('pipeline_result', {}),
            'reasoning': result.get('reasoning_result', {}),
            'timestamp': result.get('timestamp'),
        }
        return {'success': True, 'solution': solution, 'metadata': metadata}
    except Exception as e:
        error_msg = f"Error processing problem: {str(e)}"
        log(error_msg, "error")
        failure = {'success': False, 'error': error_msg}
        failure['solution'] = "I encountered an error while processing your request. Please try again."
        return failure
| |
|
def _print_available_models(model_mgr):
    """Print every configured model with its priority, context size and task tags."""
    print("\nAvailable Models:")
    print("-"*40)
    for model_name, config in model_mgr.models.items():
        print(f"{model_name} (Priority: {config.get('priority', 'N/A')})")
        print(f" Context: {config.get('context_window', 'N/A')} tokens")
        print(f" Best for: {', '.join(config.get('suitable_for', ['general']))}")
        print()


def _answer_request(agent, model_mgr, current_model, user_input):
    """Send one request to the agent, record usage, show the reply, offer learning."""
    start_time = datetime.datetime.now()

    # Point the expert system at the active model before every request.
    agent.expert_system.model_name = current_model
    result = agent.process_input(user_input)

    tokens_used = result.get('tokens_used', 0)
    model_mgr.update_usage(
        model_name=current_model,
        success=result.get('success', False),
        tokens_used=tokens_used,
    )

    elapsed = (datetime.datetime.now() - start_time).total_seconds()
    display_response(result, elapsed, tokens_used)

    if input("\nWould you like to learn from this interaction? (y/n): ").lower() == 'y':
        agent.learn_from_experience()
        print("Learning complete!")


def interactive_mode(agent, model_mgr, initial_model=None):
    """
    Run the agent in an interactive prompt loop.

    Recognised commands: 'exit', 'learn', 'history', 'models',
    'switch <model>', 'clear'. Any other input is sent to the agent.

    Args:
        agent: Agent exposing process_input, learn_from_experience and an
            expert_system attribute whose model_name is set per request
        model_mgr: Manager providing models, select_model and update_usage
        initial_model: Optional model to start with. Used only when it is a
            key of model_mgr.models; otherwise the manager's strategy picks
            the starting model.
    """
    print("\n" + "="*80)
    print("Enhanced Autonomous Agent - Interactive Mode")
    print("Type 'exit' to quit")
    print("Type 'learn' to analyze past solutions")
    print("Type 'clear' to clear the screen")
    print("="*80 + "\n")

    if initial_model in model_mgr.models:
        current_model = initial_model
    else:
        current_model = model_mgr.select_model()

    while True:
        try:
            user_input = input("\nYou: ").strip()
            if not user_input:
                continue

            command = user_input.lower()

            if command == 'exit':
                print("Goodbye!")
                break

            if command == 'learn':
                print("Analyzing past solutions to learn patterns...")
                agent.learn_from_experience()
                print("Learning complete!")
                continue

            if command == 'history':
                # NOTE(review): show_solution_history is not defined or
                # imported in this file, so this raises NameError, which is
                # reported by the generic handler below. Provide the function
                # or remove the command.
                show_solution_history(agent)
                continue

            if command == 'models':
                _print_available_models(model_mgr)
                continue

            if command.startswith('switch '):
                # Use the original casing: model names are case-sensitive keys.
                model_name = user_input[7:].strip()
                if model_name in model_mgr.models:
                    current_model = model_name
                    print(f"Switched to model: {current_model}")
                else:
                    print(f"Unknown model: {model_name}")
                continue

            if command == 'clear':
                os.system('cls' if os.name == 'nt' else 'clear')
                continue

            _answer_request(agent, model_mgr, current_model, user_input)

        except KeyboardInterrupt:
            print("\nOperation cancelled by user.")
            break
        except EOFError:
            # stdin is closed: every further input() would raise again, so
            # leave the loop instead of reporting the same error forever.
            print("\nEnd of input reached. Exiting.")
            break
        except Exception as e:
            print(f"\nError: {str(e)}")
            log(f"Error in interactive mode: {str(e)}", "error")


def _run_batch(agent, problem):
    """Solve one problem non-interactively, print the outcome, return success."""
    outcome = process_problem(problem, agent)
    if outcome['success']:
        print(outcome['solution'])
    else:
        print(f"Error: {outcome['error']}")
    return outcome['success']


def main():
    """
    Main entry point for the AI system.

    Parses the command line, builds the expert system, code agent and
    enhanced agent, then runs interactive or batch mode.

    Returns:
        int: 0 on success; 1 when a batch problem fails or an unexpected
        error occurs.
    """
    parser = argparse.ArgumentParser(description="AI Expert System with Enhanced Autonomous Agent")
    parser.add_argument("--model", type=str, help="Specific model to use")
    parser.add_argument("--strategy", type=str, default="best_for_task",
                        help="Model selection strategy (default: best_for_task)")
    parser.add_argument("--mode", type=str, default="interactive",
                        choices=["interactive", "batch"],
                        help="Operation mode")
    parser.add_argument("--problem", type=str, help="Problem to solve (batch mode only)")
    parser.add_argument("--learn", action="store_true",
                        help="Learn from past experiences before starting")
    args = parser.parse_args()

    try:
        model_mgr = ModelManager(MODELS, strategy=args.strategy)

        log("Initializing system components...")
        expert_system = create_expert_system(
            together_ai_key=TOGETHER_AI_KEY,
            model_name=args.model if args.model else None
        )
        code_agent = create_code_agent()

        agent = create_enhanced_agent(
            expert_system=expert_system,
            code_agent=code_agent,
            knowledge_base_path=KNOWLEDGE_BASE
        )

        if args.learn:
            log("Learning from past experiences...")
            agent.learn_from_experience()
            log("Learning complete!")

        # A validated --model becomes the starting model of the interactive
        # session; previously it was stored in an unused local and the
        # strategy's choice silently replaced it.
        initial_model = None
        if args.model:
            if args.model in model_mgr.models:
                initial_model = args.model
                log(f"Using specified model: {initial_model}")
            else:
                log(f"Warning: Model '{args.model}' not found. Using default selection.", "warning")

        if args.mode == "interactive":
            interactive_mode(agent, model_mgr, initial_model=initial_model)
        elif args.mode == "batch" and args.problem:
            # The original called an undefined batch_mode(); batch requests
            # now go through process_problem.
            if not _run_batch(agent, args.problem):
                return 1
        else:
            parser.print_help()

    except Exception as e:
        log(f"Fatal error: {str(e)}", "critical")
        return 1

    return 0
| |
|
def analyze_task_type(input_text: str) -> str:
    """
    Classify the input text into a task type using keyword rules.

    Rules are tried in order: coding keywords, then analysis keywords, then
    a length check. Keywords are matched as case-insensitive substrings.

    Args:
        input_text: The user's input text

    Returns:
        str: 'coding', 'analysis', 'summarization' (more than 50
        whitespace-separated words) or 'general'
    """
    lowered = input_text.lower()

    # (task type, trigger substrings), checked in this order.
    keyword_rules = (
        ('coding', ('code', 'program', 'function', 'class', 'import', 'def ')),
        ('analysis', ('analyze', 'compare', 'explain', 'what is', 'how does')),
    )
    for task_type, triggers in keyword_rules:
        for trigger in triggers:
            if trigger in lowered:
                return task_type

    word_count = len(lowered.split())
    return 'summarization' if word_count > 50 else 'general'
| |
|
def display_response(response: dict, process_time: float, tokens_used: int):
    """
    Print an agent response to stdout.

    A separator line is always printed first. When 'success' is missing or
    falsy only the error text is shown. Otherwise the response text, any
    'sources' and 'citations' entries, the timing/token line and, if present,
    the model name are printed.

    Args:
        response: The response dictionary from the agent
        process_time: Time taken to process the request, in seconds
        tokens_used: Number of tokens used
    """
    print("\n" + "="*80)

    if not response.get('success', False):
        print(f"Error: {response.get('error', '')}")
        return

    print(response.get('response', 'No response generated'))

    # Both keys feed the same list. Previously 'citations' triggered the
    # header but its entries were never printed, and a None value crashed.
    references = list(response.get('sources') or [])
    references.extend(response.get('citations') or [])
    if references:
        print("\nSources:")
        for reference in references:
            print(f"- {reference}")

    print(f"\n[Processed in {process_time:.2f}s | Tokens: {tokens_used}]")

    if 'model' in response:
        print(f"[Model: {response['model']}]")
| |
|
if __name__ == "__main__":
    import sys
    import time

    # Command-line parsing is left entirely to main(). A second parser here
    # that knew only --model/--strategy rejected --mode, --problem and
    # --learn before main() could see them.

    # logging.basicConfig() does nothing once the root logger has handlers,
    # and the module-level call has already installed the console handler.
    # Attach the file handler directly so ai_system.log is actually written.
    file_handler = logging.FileHandler('ai_system.log', encoding='utf-8')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logging.getLogger().addHandler(file_handler)

    sys.exit(main())