from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool, VisitWebpageTool
import datetime
import requests
import pytz
import yaml
import os
from datasets import Dataset
from huggingface_hub import HfApi, InferenceClient
from openai import OpenAI
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI

# Define the Perplexity system prompt
Perplex_Assistant_Prompt = """You are a helpful AI assistant that searches the web for accurate information."""

# Set up the API key in the environment variable expected by HfApiModel
os.environ["HUGGINGFACE_API_TOKEN"] = os.getenv("HUGGINGFACE_API_KEY", "")

# Initialize search tools with fallback capability
try:
    # Try DuckDuckGo first (default)
    print("Initializing DuckDuckGo search tool...")
    ddg_search_tool = DuckDuckGoSearchTool(max_results=10)
    # Test the tool with a simple query
    test_result = ddg_search_tool("test query")
    print("DuckDuckGo search tool initialized successfully.")
    # Use DuckDuckGo as the primary search tool
    primary_search_tool = ddg_search_tool
    search_tool_name = "DuckDuckGo"
except Exception as e:
    print(f"Error initializing DuckDuckGo search tool: {str(e)}")
    print("Falling back to Google search tool...")
    try:
        # Import GoogleSearchTool only if needed
        from smolagents import GoogleSearchTool
        google_search_tool = GoogleSearchTool()
        # Test the Google search tool
        test_result = google_search_tool("test query")
        print("Google search tool initialized successfully.")
        # Use Google as the fallback search tool
        primary_search_tool = google_search_tool
        search_tool_name = "Google"
    except Exception as google_error:
        print(f"Error initializing Google search tool: {str(google_error)}")
        print("WARNING: No working search tool available. Agent functionality will be limited.")

        # Create a minimal stub that returns an explanatory message. The @tool
        # decorator turns it into a smolagents Tool so it can be passed to the
        # agent like the real search tools.
        @tool
        def search_fallback(query: str) -> str:
            """Explains that search is unavailable instead of searching.

            Args:
                query: The search query that could not be executed.
            """
            return f"Search functionality unavailable. Both DuckDuckGo and Google search tools failed to initialize. Query was: {query}"

        primary_search_tool = search_fallback
        search_tool_name = "Unavailable"
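
# At this point, primary_search_tool is one of DuckDuckGoSearchTool,
# GoogleSearchTool, or the search_fallback stub above, and search_tool_name
# records which one for the log messages printed after agent setup.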

# Initialize the VisitWebpageTool
visit_webpage_tool = VisitWebpageTool()

# @weave.op()  # Weave tracing decorator, currently disabled
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
    """Enhanced Perplexity API call with explicit model tracking."""
    client = OpenAI(api_key=os.getenv("PERPLEXITY_API_KEY"), base_url="https://api.perplexity.ai")
    system_message = Perplex_Assistant_Prompt
    if assistant_meta:
        system_message += f"\n\n{system_messages}"
    # Minimal parameters for Perplexity
    return client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
        stream=False,
    ).choices[0].message.content
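
# Illustrative call (assumes PERPLEXITY_API_KEY is set; the query and system
# hint below are made up for the example):
#   answer = tracked_perplexity_call(
#       "Latest smolagents release notes",
#       "Prefer official documentation sources.",
#       assistant_meta=True,
#   )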

@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
    """A tool that queries Perplexity Sonar to search the web when the answer requires, or would benefit from, a real-world web reference.

    Args:
        arg1: The user prompt to search for.
        arg2: Details on the desired web search results, passed as the system message for the Sonar web search.
    """
    try:
        # assistant_meta=True so that arg2 is actually appended to the system message
        sonar_response = tracked_perplexity_call(arg1, arg2, assistant_meta=True)
        return sonar_response
    except Exception as e:
        return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"

def parse_json(text: str):
    """
    A safer JSON parser that never executes code.

    Tries json.loads first, then falls back to ast.literal_eval so that
    Python-style literals (single quotes, True/False/None) also parse.
    """
    import ast
    import json
    try:
        return json.loads(text)
    except ValueError:
        pass
    try:
        return ast.literal_eval(text)
    except (SyntaxError, ValueError) as e:
        raise ValueError(f"Failed to parse JSON: {str(e)}")
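
# Illustrative inputs and outputs for parse_json (hypothetical values):
#   parse_json('[{"name": "A", "ok": true}]')  ->  [{'name': 'A', 'ok': True}]
#   parse_json("{'name': 'A', 'ok': True}")    ->  {'name': 'A', 'ok': True}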

def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
    """Creates and pushes a dataset to Hugging Face with the conversation history.

    Args:
        dataset_name: Name for the dataset (will be prefixed with the username)
        conversation_data: String representing the conversation data. Can be:
            - JSON array of objects (each object becomes a row)
            - Pipe-separated values (first row as headers, subsequent rows as values)
            - Plain text (stored in a single 'text' column)

    Returns:
        URL of the created dataset, or an error message, along with the log output.
    """
    log_text = ""
    try:
        # Required imports
        from datasets import Dataset, DatasetDict
        from huggingface_hub import HfApi

        # Get the API key
        api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
        if not api_key:
            return "Error: No Hugging Face API key found in environment variables"

        # Set the fixed username
        username = "Misfits-and-Machines"
        safe_dataset_name = dataset_name.replace(" ", "_").lower()
        repo_id = f"{username}/{safe_dataset_name}"
        log_text += f"Creating dataset: {repo_id}\n"

        # Ensure the repository exists
        hf_api = HfApi(token=api_key)
        try:
            if not hf_api.repo_exists(repo_id=repo_id, repo_type="dataset"):
                hf_api.create_repo(repo_id=repo_id, repo_type="dataset")
                log_text += f"Created repository: {repo_id}\n"
            else:
                log_text += f"Repository already exists: {repo_id}\n"
        except Exception as e:
            log_text += f"Note when checking/creating repository: {str(e)}\n"
        # Process the input data
        created_ds = None
        try:
            # Try parsing as JSON using the safer parse_json function
            try:
                json_data = parse_json(conversation_data)
                # Process based on data structure
                if isinstance(json_data, list) and all(isinstance(item, dict) for item in json_data):
                    log_text += f"Processing JSON array with {len(json_data)} items\n"
                    # Build columns from the keys of the first item so the
                    # dataset structure is consistent across rows
                    first_item = json_data[0]
                    columns = list(first_item.keys())
                    log_text += f"Detected columns: {columns}\n"
                    # Initialize the data dictionary with empty lists for each column
                    data_dict = {col: [] for col in columns}
                    # Process each item
                    for item in json_data:
                        for col in columns:
                            # Get the value for this column, or empty string if missing
                            value = item.get(col, "")
                            data_dict[col].append(value)
                    # Debug output to verify the data structure
                    for col in columns:
                        log_text += f"Column '{col}' has {len(data_dict[col])} entries\n"
                    # Create the dataset from the dictionary
                    ds = Dataset.from_dict(data_dict)
                    log_text += f"Created dataset with {len(ds)} rows\n"
                    created_ds = DatasetDict({"train": ds})
                elif isinstance(json_data, dict):
                    log_text += "Processing single JSON object\n"
                    # For a single object, create a dataset with one row
                    data_dict = {k: [v] for k, v in json_data.items()}
                    ds = Dataset.from_dict(data_dict)
                    created_ds = DatasetDict({"train": ds})
                else:
                    raise ValueError("JSON not recognized as array or single object")
            except Exception as json_error:
                log_text += f"Not processing as JSON: {str(json_error)}\n"
                raise json_error  # Propagate to the fallback handlers below
        except Exception:
            # Try the pipe-separated format
            lines = conversation_data.strip().split('\n')
            if '|' in conversation_data and len(lines) > 1:
                log_text += "Processing as pipe-separated data\n"
                headers = [h.strip() for h in lines[0].split('|')]
                log_text += f"Detected headers: {headers}\n"
                # Initialize the data dictionary
                data_dict = {header: [] for header in headers}
                # Process each data row
                for i, line in enumerate(lines[1:], 1):
                    if not line.strip():
                        continue
                    values = [val.strip() for val in line.split('|')]
                    if len(values) == len(headers):
                        for j, header in enumerate(headers):
                            data_dict[header].append(values[j])
                    else:
                        log_text += f"Warning: Skipping row {i} (column count mismatch)\n"
                # Create the dataset from the dictionary
                if all(len(col_values) > 0 for col_values in data_dict.values()):
                    ds = Dataset.from_dict(data_dict)
                    log_text += f"Created dataset with {len(ds)} rows\n"
                    created_ds = DatasetDict({"train": ds})
                else:
                    log_text += "No valid rows found in pipe-separated data\n"
                    created_ds = DatasetDict({"train": Dataset.from_dict({"text": [conversation_data]})})
            else:
                # Fallback for plain text
                log_text += "Processing as plain text\n"
                created_ds = DatasetDict({"train": Dataset.from_dict({"text": [conversation_data]})})
        # Push using the DatasetDict push_to_hub method
        log_text += f"Pushing dataset to {repo_id}\n"
        created_ds.push_to_hub(
            repo_id=repo_id,
            token=api_key,
            commit_message=f"Upload dataset: {dataset_name}"
        )
        dataset_url = f"https://huggingface.co/datasets/{repo_id}"
        log_text += f"Dataset successfully pushed to: {dataset_url}\n"
        return f"Successfully created dataset at {dataset_url}\nLogs:\n{log_text}"
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        log_text += f"Dataset creation error: {str(e)}\n{error_trace}\n"
        return f"Error creating dataset: {str(e)}\nLogs:\n{log_text}"

@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
    """A tool that creates and pushes a dataset to Hugging Face.

    Args:
        dataset_name: Name for the dataset (will be prefixed with 'Misfits-and-Machines/')
        conversation_data: Data content to save in the dataset. Supported formats:
            1. JSON array of objects – each object becomes a row (keys as columns).
               Example: [{"name": "Product A", "brand": "Company X"}, {"name": "Product B", "brand": "Company Y"}]
            2. Pipe-separated values – first row as headers, remaining rows as values.
               Example: "name | brand\nProduct A | Company X\nProduct B | Company Y"
            3. Plain text – stored in a single 'text' column.

    Returns:
        A link to the created dataset on the Hugging Face Hub or an error message, along with log details.
    """
    try:
        log_text = f"Creating dataset '{dataset_name}' with {len(conversation_data)} characters of data\n"
        log_text += f"Dataset will be created at Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}\n"
        # Delegate the actual work to Dataset_Creator_Function
        result = Dataset_Creator_Function(dataset_name, conversation_data)
        log_text += f"Dataset creation result: {result}\n"
        return log_text
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        return f"Error using Dataset Creator tool: {str(e)}\n{error_trace}"

def verify_dataset_exists(repo_id: str) -> dict:
    """Verify that a dataset exists and is valid on the Hugging Face Hub.

    Args:
        repo_id: Full repository ID in the format "username/dataset_name"

    Returns:
        Dict with an "exists" boolean and a "message" string
    """
    try:
        # Check whether the dataset exists using the datasets-server API
        api_url = f"https://datasets-server.huggingface.co/is-valid?dataset={repo_id}"
        response = requests.get(api_url, timeout=30)
        # Parse the response
        if response.status_code == 200:
            data = response.json()
            # If either of these is True, the dataset exists in some form
            if data.get("viewer", False) or data.get("preview", False):
                return {"exists": True, "message": "Dataset is valid and accessible"}
            else:
                return {"exists": False, "message": "Dataset exists but may not be fully processed yet"}
        else:
            return {"exists": False, "message": f"API returned status code {response.status_code}"}
    except Exception as e:
        return {"exists": False, "message": f"Error verifying dataset: {str(e)}"}

@tool
def Check_Dataset_Validity(dataset_name: str) -> str:
    """A tool that checks whether a dataset exists and is valid on Hugging Face.

    Args:
        dataset_name: Name of the dataset to check (with or without the organization prefix)

    Returns:
        Status message about the dataset's validity
    """
    try:
        # Ensure the dataset name has the organization prefix
        if "/" not in dataset_name:
            dataset_name = f"Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}"
        # Check dataset validity
        result = verify_dataset_exists(dataset_name)
        if result["exists"]:
            return f"Dataset '{dataset_name}' exists and is valid. You can access it at https://huggingface.co/datasets/{dataset_name}"
        else:
            return f"Dataset '{dataset_name}' could not be verified: {result['message']}. It may still be processing or may not exist."
    except Exception as e:
        return f"Error checking dataset validity: {str(e)}"

@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        # Create the timezone object
        tz = pytz.timezone(timezone)
        # Get the current time in that timezone
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"

final_answer = FinalAnswerTool()

# Note: HfApiModel does not accept an API key parameter; it reads the token
# from the environment variable set above
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud',  # Backup endpoint
    custom_role_conversions=None
)

# Context management helper, plus fallback logic (below) that only activates
# if the primary model fails

def manage_context(prompt, max_allowed_tokens=30000):
    """Manages large contexts by summarizing or trimming when they get too big.

    This helps avoid the 'inputs tokens + max_new_tokens must be <= 32768'
    error by keeping the context size under control.

    Args:
        prompt: The full context/prompt that might be too large
        max_allowed_tokens: Maximum number of tokens to allow before trimming

    Returns:
        A potentially shortened/summarized version of the prompt
    """
    # Rough token estimation (splitting on spaces is a crude approximation)
    estimated_tokens = len(prompt.split())
    # If below the threshold, return as is
    if estimated_tokens <= max_allowed_tokens:
        return prompt
    print(f"WARNING: Context size ({estimated_tokens} estimated tokens) exceeds limit ({max_allowed_tokens})")

    # Extremely large prompts need more aggressive handling
    if estimated_tokens > max_allowed_tokens * 1.5:
        print("Performing aggressive context management")
        # Approach 1: keep only the most recent parts of the conversation
        lines = prompt.strip().split('\n')
        # Identify structural elements to keep (the last line that looks like
        # an instruction/system prompt)
        instruction_idx = -1
        for i, line in enumerate(lines):
            if "You are a" in line or "I want you to" in line:
                instruction_idx = i
        # Always keep the first part with instructions (system prompt)
        keep_beginning = lines[:instruction_idx + 20] if instruction_idx >= 0 else lines[:50]
        # Keep the most recent content (approximately half of the max tokens)
        keep_end = lines[-int(max_allowed_tokens / 15):]
        # Add a note about trimming
        middle_note = [
            "",
            "...",
            "[Context has been trimmed to fit token limits]",
            "...",
            ""
        ]
        # Combine the parts
        shortened_prompt = "\n".join(keep_beginning + middle_note + keep_end)
        print(f"Context reduced from ~{estimated_tokens} to ~{len(shortened_prompt.split())} estimated tokens")
        return shortened_prompt

    # Moderate size reduction for moderately oversized prompts
    else:
        print("Performing moderate context management")
        # Split into paragraph-sized sections for easier processing
        sections = prompt.split("\n\n")
        # Decide which sections to keep outright and which are candidates
        # for trimming
        keep_sections = []
        trim_sections = []
        for i, section in enumerate(sections):
            # Always keep the first few sections (likely instructions)
            if i < 3:
                keep_sections.append(section)
            # Keep the last several sections (most recent and relevant)
            elif i > len(sections) - 8:
                keep_sections.append(section)
            # Generally keep code blocks
            elif "```" in section:
                keep_sections.append(section)
            # Keep very short sections
            elif len(section.split()) < 30:
                keep_sections.append(section)
            # Keep sections with likely important content
            elif any(marker in section.lower() for marker in ["important", "key", "critical", "necessary", "must"]):
                keep_sections.append(section)
            # Otherwise, candidate for trimming
            else:
                trim_sections.append(section)
        # If everything together still exceeds the budget, keep only a
        # portion of the trimmable sections
        if len(" ".join(keep_sections + trim_sections).split()) > max_allowed_tokens * 0.8:
            trim_to_keep = int(len(trim_sections) * 0.3)  # Keep 30%
            trim_sections = trim_sections[:trim_to_keep]
        # Build the final prompt with a note about trimming
        final_sections = keep_sections + ["[Some context has been summarized to fit token limits]"] + trim_sections
        final_prompt = "\n\n".join(final_sections)
        print(f"Context reduced from ~{estimated_tokens} to ~{len(final_prompt.split())} estimated tokens")
        return final_prompt
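
# Illustrative effect (using the crude word-count estimate above): a prompt
# of ~50,000 words with max_allowed_tokens=30000 exceeds the 1.5x threshold,
# so it takes the aggressive branch and is rebuilt as the instruction head +
# a "[Context has been trimmed...]" note + the last ~2,000 lines.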

# try_model_call_with_fallbacks applies the context management above before
# every primary-model call
def try_model_call_with_fallbacks(prompt):
    """Try the primary model first, with aggressive context management."""
    # ALWAYS apply context management first
    try:
        # Get a rough token count estimate
        estimated_tokens = len(prompt.split())
        print(f"Estimated input tokens: {estimated_tokens}")
        # Start with 25000 as the maximum (leaving a ~6K token buffer under
        # the model's 32768 limit)
        managed_prompt = manage_context(prompt, max_allowed_tokens=25000)
        # If still potentially too large, reduce further
        if len(managed_prompt.split()) > 24000:
            print("First context reduction still too large, reducing further...")
            managed_prompt = manage_context(managed_prompt, max_allowed_tokens=22000)
        # Final emergency truncation if needed
        if len(managed_prompt.split()) > 22000:
            print("Emergency truncation required")
            words = managed_prompt.split()
            # Keep the first 5000 and last 15000 words with a note in between
            managed_prompt = " ".join(words[:5000]) + "\n\n[CONTEXT SEVERELY TRUNCATED]\n\n" + " ".join(words[-15000:])
        print(f"Final managed prompt size: {len(managed_prompt.split())} estimated tokens")
        # Temporarily reduce output tokens even further if the prompt is large
        temp_max_tokens = model.max_tokens
        if len(managed_prompt.split()) > 20000:
            print("Large prompt detected, temporarily reducing output tokens")
            model.max_tokens = 750  # Temporarily reduce to 750 for this call
        try:
            return original_call(managed_prompt)
        finally:
            model.max_tokens = temp_max_tokens  # Restore the original setting
    except Exception as primary_error:
        # If we still get a token limit error, try an even more aggressive reduction
        if "Input validation error: inputs tokens + max_new_tokens" in str(primary_error):
            try:
                print("Critical: Token limit exceeded despite context management. Implementing emergency measures...")
                # Drastic approach: keep only the system instructions and the last part
                lines = prompt.strip().split('\n')
                # Keep the first 50 lines and last 100 lines only
                emergency_prompt = "\n".join(lines[:50] + ["\n[MAJORITY OF CONTEXT REMOVED DUE TO TOKEN LIMITS]\n"] + lines[-100:])
                # Reduce output tokens drastically
                temp_max_tokens = model.max_tokens
                model.max_tokens = 500
                try:
                    return original_call(emergency_prompt)
                except Exception:
                    print("Emergency measures failed. Trying fallback models...")
                finally:
                    model.max_tokens = temp_max_tokens
            except Exception:
                print("Emergency context management failed. Proceeding to fallback models...")
        print(f"Primary model call failed: {str(primary_error)}")
        print("Trying fallback models...")
    # List of fallback models
    fallbacks = [
        {
            "provider": "sambanova",
            "model_name": "Qwen/Qwen2.5-Coder-32B-Instruct",
            "display_name": "Qwen 2.5 Coder 32B"
        },
        {
            "provider": "hf-inference",
            "model_name": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
            "display_name": "DeepSeek R1 Distill Qwen 32B"
        }
    ]
    # Get the API key
    api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
    if not api_key:
        raise ValueError("No Hugging Face API key found in environment variables")
    # Try each fallback model in sequence with highly aggressive context management
    for fallback in fallbacks:
        try:
            print(f"Trying fallback model: {fallback['display_name']}")
            client = InferenceClient(provider=fallback["provider"], api_key=api_key)
            # Apply even more aggressive context management for fallbacks
            emergency_prompt = manage_context(prompt, max_allowed_tokens=15000)
            messages = [{"role": "user", "content": emergency_prompt}]
            completion = client.chat.completions.create(
                model=fallback["model_name"],
                messages=messages,
                max_tokens=1000,  # Reduced output tokens
                temperature=0.5
            )
            print(f"Successfully used fallback model: {fallback['display_name']}")
            return completion.choices[0].message.content
        except Exception as e:
            print(f"Fallback model {fallback['display_name']} failed: {str(e)}")
            continue
    # If all fallbacks fail, return a useful error message
    return "ERROR: Unable to process request due to context size limitations. Please break your request into smaller parts or simplify your query."

# Monkey patch the model call to route through the fallback logic. Python
# resolves the model(...) call syntax via type(model).__call__, so the patch
# goes on the class rather than the instance; the original bound method is
# captured first so the wrapper can still reach the real model. (This assumes
# the model is invoked with a single string prompt, as the logic above does.)
original_call = model.__call__
type(model).__call__ = lambda self, prompt, *args, **kwargs: try_model_call_with_fallbacks(prompt)

# Reduce the model's output tokens immediately to improve the chances of success
model.max_tokens = 1000  # Down from 2096 to stay under token limits

# Import a tool from the Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)

# Override CodeAgent.run to automatically clear context periodically
original_code_agent_run = CodeAgent.run

def context_managed_run(self, task, *args, **kwargs):
    """Override that periodically clears conversation context between runs."""
    # Initialize the counter if not present
    if not hasattr(self, '_context_step_counter'):
        self._context_step_counter = 0
    # Increment the counter
    self._context_step_counter += 1
    # Every 5 runs, perform aggressive context management
    if self._context_step_counter >= 5:
        print("Performing periodic conversation cleanup")
        self._context_step_counter = 0
        # Clear most of the conversation history while keeping the essential parts
        if hasattr(self, 'conversation') and len(self.conversation) > 8:
            original_length = len(self.conversation)
            # Find system messages
            system_messages = [msg for msg in self.conversation if msg.get('role') == 'system']
            # Get the most recent exchanges (last 4 pairs of messages)
            recent_messages = self.conversation[-8:]
            # Rebuild the conversation: system messages + note + recent messages
            self.conversation = system_messages + [
                {'role': 'system', 'content': '[NOTICE: Previous conversation history has been trimmed to manage context size]'}
            ] + recent_messages
            print(f"Conversation history trimmed from {original_length} to {len(self.conversation)} messages")
    # Call the original run method, forwarding any extra arguments unchanged
    # (run signatures vary across smolagents versions)
    return original_code_agent_run(self, task, *args, **kwargs)

# Apply the monkey patch to the CodeAgent class
CodeAgent.run = context_managed_run
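
# Net effect of the patch above: every fifth agent.run(...) call trims
# self.conversation, when that attribute exists, down to its system messages,
# a trim notice, and the last 8 messages before delegating to the original run.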

# Initialize the agent using standard smolagents patterns
agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        Sonar_Web_Search_Tool,
        primary_search_tool,  # Set above to DuckDuckGo, Google, or the fallback stub
        get_current_time_in_timezone,
        image_generation_tool,
        Dataset_Creator_Tool,
        Check_Dataset_Validity,
        visit_webpage_tool,
    ],
    max_steps=12,
    verbosity_level=1,
    grammar=None,
    planning_interval=2,
    name="Research Assistant",
    description="An AI assistant that can search the web, create datasets, and answer questions.",
    prompt_templates=prompt_templates
)

# Informative message about which search tool is being used
print(f"Agent initialized with {search_tool_name} as primary search tool")
print(f"Available tools: final_answer, Sonar_Web_Search_Tool, {search_tool_name}, get_current_time_in_timezone, image_generation_tool, Dataset_Creator_Tool, Check_Dataset_Validity, visit_webpage_tool")

# Working within token limits: for queries that might exceed them, consider
# 1. Breaking tasks into smaller sub-tasks
# 2. Limiting the amount of data returned by search tools
# 3. Using the planning_interval to enable more effective reasoning

# Launch the Gradio UI. A known token-counting TypeError in Gradio_UI.py is
# caught below, with a suggested fix for that file.
try:
    GradioUI(agent).launch()
except TypeError as e:
    if "unsupported operand type(s) for +=" in str(e):
        print("Error: Token counting issue in Gradio UI")
        print("To fix, edit Gradio_UI.py and change:")
        print("total_input_tokens += agent.model.last_input_token_count")
        print("To:")
        print("total_input_tokens += (agent.model.last_input_token_count or 0)")
    else:
        raise