import os import datetime import logging import argparse import difflib from pathlib import Path from dotenv import load_dotenv from openai import OpenAI from expert_system import create_expert_system from agent import create_code_agent from enhanced_agent import create_enhanced_agent # Load environment variables from .env file load_dotenv() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuration AGENT_FILENAME = "agent.py" VERSION_DIR = "agent_versions" LOG_FILE = "agent_log.txt" KNOWLEDGE_BASE = "knowledge_base" # Get API key from environment TOGETHER_AI_KEY = "afc478b561fbd2629f96f8f69041d780f3b9a94adcfe276f30818ee84a5e0358" if not TOGETHER_AI_KEY: logger.error("TOGETHER_AI_API_KEY not found in .env file") print("ERROR: Please set your Together AI API key in the .env file") print("Create a .env file in the project root with:") print("TOGETHER_AI_API_KEY=your_api_key_here") exit(1) # Initialize OpenAI API OpenAI.api_key = TOGETHER_AI_KEY # Model Configuration MODELS = { "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free": { "provider": "together", "context_window": 32768, "max_tokens": 4096, "temperature": 0.7, "suitable_for": ["general", "reasoning", "coding", "analysis"], "priority": 1 }, "mistralai/Mixtral-8x22B-Instruct-v0.1": { "provider": "together", "context_window": 65536, "max_tokens": 8192, "temperature": 0.7, "suitable_for": ["long_context", "summarization", "analysis"], "priority": 2 }, "deepseek-ai/deepseek-coder-33b-instruct": { "provider": "together", "context_window": 32768, "max_tokens": 4096, "temperature": 0.2, "suitable_for": ["coding", "debugging", "code_generation"], "priority": 1 }, "meta-llama/Llama-3.1-8B-Instruct": { "provider": "together", "context_window": 8192, "max_tokens": 2048, "temperature": 0.7, "suitable_for": ["general", "fast_response"], "priority": 3 } } # Default model selection DEFAULT_MODEL = "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free" # Model selection strategies MODEL_SELECTION_STRATEGIES = { "default": lambda task_type, models: DEFAULT_MODEL, "best_for_task": lambda task_type, models: next( (model for model, config in sorted( models.items(), key=lambda x: x[1].get('priority', 10) ) if task_type in config.get('suitable_for', [])), DEFAULT_MODEL ), "fastest": lambda task_type, models: next( (model for model, config in sorted( models.items(), key=lambda x: x[1].get('priority', 10) ) if config.get('provider') == 'together'), DEFAULT_MODEL ), "most_capable": lambda task_type, models: max( models.items(), key=lambda x: x[1].get('context_window', 0) )[0] } # Ensure directories exist os.makedirs(VERSION_DIR, exist_ok=True) os.makedirs(KNOWLEDGE_BASE, exist_ok=True) class ModelManager: """Manages model selection and configuration""" def __init__(self, model_config: dict, strategy: str = "best_for_task"): """ Initialize the model manager Args: model_config: Dictionary of model configurations strategy: Strategy for model selection """ self.models = model_config self.strategy = strategy self.usage_stats = {model: {"calls": 0, "success": 0, "tokens": 0} for model in model_config} def select_model(self, task_type: str = "general") -> str: """ Select the most appropriate model for the given task type Args: task_type: Type of task (e.g., 'coding', 'analysis', 'general') Returns: str: Selected model name """ strategy = MODEL_SELECTION_STRATEGIES.get(self.strategy) if not strategy: logger.warning(f"Unknown strategy '{self.strategy}', using default") strategy = MODEL_SELECTION_STRATEGIES["default"] selected_model = strategy(task_type, self.models) return selected_model def get_model_config(self, model_name: str) -> dict: """ Get configuration for a specific model Args: model_name: Name of the model Returns: dict: Model configuration """ return self.models.get(model_name, {}) def update_usage(self, model_name: str, success: bool = True, tokens_used: int = 0): """ Update usage statistics for a model Args: model_name: Name of the model success: Whether the call was successful tokens_used: Number of tokens used """ if model_name in self.usage_stats: self.usage_stats[model_name]["calls"] += 1 self.usage_stats[model_name]["success"] += 1 if success else 0 self.usage_stats[model_name]["tokens"] += tokens_used def get_usage_stats(self) -> dict: """ Get usage statistics for all models Returns: dict: Usage statistics """ return self.usage_stats # Initialize model manager model_manager = ModelManager(MODELS) def log(event: str, level: str = "info"): """ Log an event with the specified log level. Args: event: The message to log level: Log level (info, warning, error, critical) """ log_levels = { 'info': logger.info, 'warning': logger.warning, 'error': logger.error, 'critical': logger.critical } log_func = log_levels.get(level.lower(), logger.info) log_func(event) # Also write to log file with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(f"[{datetime.datetime.now().isoformat()}] [{level.upper()}] {event}\n") def process_problem(problem: str, agent) -> dict: """ Process a problem using the enhanced autonomous agent. Args: problem: The problem or query to process agent: Initialized EnhancedAutonomousAgent instance Returns: dict: The generated response with metadata """ try: log(f"Processing problem: {problem[:100]}...") result = agent.process_input(problem) log("Problem processed successfully") return { 'success': True, 'solution': result['response'], 'metadata': { 'pipeline_result': result.get('pipeline_result', {}), 'reasoning': result.get('reasoning_result', {}), 'timestamp': result.get('timestamp') } } except Exception as e: error_msg = f"Error processing problem: {str(e)}" log(error_msg, "error") return { 'success': False, 'error': error_msg, 'solution': "I encountered an error while processing your request. Please try again." } def interactive_mode(agent, model_mgr): """Run the agent in interactive mode with model management and learning.""" print("\n" + "="*80) print("Enhanced Autonomous Agent - Interactive Mode") print("Type 'exit' to quit") print("Type 'learn' to analyze past solutions") print("Type 'clear' to clear the screen") print("="*80 + "\n") current_model = model_mgr.select_model() while True: try: user_input = input("\nYou: ").strip() if not user_input: continue if user_input.lower() == 'exit': print("Goodbye!") break if user_input.lower() == 'learn': print("Analyzing past solutions to learn patterns...") agent.learn_from_experience() print("Learning complete!") continue if user_input.lower() == 'history': show_solution_history(agent) continue if user_input.lower() == 'models': print("\nAvailable Models:") print("-"*40) for model_name, config in model_mgr.models.items(): print(f"{model_name} (Priority: {config.get('priority', 'N/A')})") print(f" Context: {config.get('context_window', 'N/A')} tokens") print(f" Best for: {', '.join(config.get('suitable_for', ['general']))}") print() continue if user_input.lower().startswith('switch '): model_name = user_input[7:].strip() if model_name in model_mgr.models: current_model = model_name print(f"Switched to model: {current_model}") else: print(f"Unknown model: {model_name}") continue if user_input.lower() == 'clear': os.system('cls' if os.name == 'nt' else 'clear') continue # Process the input with the enhanced agent start_time = datetime.datetime.now() # Update the expert system's model configuration model_config = model_mgr.get_model_config(current_model) agent.expert_system.model_name = current_model result = agent.process_input(user_input) # Update model usage stats tokens_used = result.get('tokens_used', 0) model_mgr.update_usage( model_name=current_model, success=result.get('success', False), tokens_used=tokens_used ) # Display the response display_response(result, (datetime.datetime.now() - start_time).total_seconds(), tokens_used) # Ask about learning if input("\nWould you like to learn from this interaction? (y/n): ").lower() == 'y': agent.learn_from_experience() print("Learning complete!") except KeyboardInterrupt: print("\nOperation cancelled by user.") break except Exception as e: print(f"\nError: {str(e)}") log(f"Error in interactive mode: {str(e)}", "error") def main(): """Main entry point for the AI system.""" parser = argparse.ArgumentParser(description="AI Expert System with Enhanced Autonomous Agent") parser.add_argument("--model", type=str, help="Specific model to use") parser.add_argument("--strategy", type=str, default="best_for_task", help="Model selection strategy (default: best_for_task)") parser.add_argument("--mode", type=str, default="interactive", choices=["interactive", "batch"], help="Operation mode") parser.add_argument("--problem", type=str, help="Problem to solve (batch mode only)") parser.add_argument("--learn", action="store_true", help="Learn from past experiences before starting") args = parser.parse_args() try: # Initialize model manager model_mgr = ModelManager(MODELS, strategy=args.strategy) # Initialize components log("Initializing system components...") expert_system = create_expert_system( together_ai_key=TOGETHER_AI_KEY, model_name=args.model if args.model else None ) code_agent = create_code_agent() # Initialize enhanced autonomous agent agent = create_enhanced_agent( expert_system=expert_system, code_agent=code_agent, knowledge_base_path=KNOWLEDGE_BASE ) # Learn from past experiences if requested if args.learn: log("Learning from past experiences...") agent.learn_from_experience() log("Learning complete!") # Set specific model if provided if args.model: if args.model in model_mgr.models: current_model = args.model log(f"Using specified model: {current_model}") else: log(f"Warning: Model '{args.model}' not found. Using default selection.", "warning") # Run in selected mode if args.mode == "interactive": interactive_mode(agent, model_mgr) elif args.mode == "batch" and args.problem: batch_mode(agent, model_mgr, args.problem) else: parser.print_help() except Exception as e: log(f"Fatal error: {str(e)}", "critical") return 1 return 0 def analyze_task_type(input_text: str) -> str: """ Analyze the input text to determine the most appropriate task type. Args: input_text: The user's input text Returns: str: The determined task type """ input_text = input_text.lower() # Check for coding-related tasks code_keywords = ['code', 'program', 'function', 'class', 'import', 'def '] if any(keyword in input_text for keyword in code_keywords): return 'coding' # Check for analysis tasks analysis_keywords = ['analyze', 'compare', 'explain', 'what is', 'how does'] if any(keyword in input_text for keyword in analysis_keywords): return 'analysis' # Check for long-form content if len(input_text.split()) > 50: return 'summarization' # Default to general return 'general' def display_response(response: dict, process_time: float, tokens_used: int): """ Display the response to the user. Args: response: The response dictionary from the agent process_time: Time taken to process the request tokens_used: Number of tokens used """ print("\n" + "="*80) if not response.get('success', False): print(f"Error: {response.get('error', '')}") return # Display main response print(response.get('response', 'No response generated')) # Display metadata if available if 'sources' in response or 'citations' in response: print("\nSources:") for source in response.get('sources', []): print(f"- {source}") # Display performance info print(f"\n[Processed in {process_time:.2f}s | Tokens: {tokens_used}]") # Display model info if available if 'model' in response: print(f"[Model: {response['model']}]") if __name__ == "__main__": import sys import time # Parse command line arguments parser = argparse.ArgumentParser(description='AI System Command Line Interface') parser.add_argument('--model', type=str, help='Default model to use') parser.add_argument('--strategy', type=str, default='best_for_task', help='Model selection strategy (default: best_for_task)') args = parser.parse_args() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('ai_system.log'), logging.StreamHandler() ] ) # Start the main application sys.exit(main())