from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
import datetime
import requests
import pytz
import yaml
import os
import json
import pandas as pd
from datasets import Dataset
from huggingface_hub import HfApi
from openai import OpenAI
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI

# Define the Perplexity system prompt
Perplex_Assistant_Prompt = """You are a helpful AI assistant that searches the web for accurate information."""

# Set up the API key in the environment variable expected by HfApiModel
os.environ["HUGGINGFACE_API_TOKEN"] = os.getenv("HUGGINGFACE_API_KEY", "")

# Initialize the standard search tools
ddg_search_tool = DuckDuckGoSearchTool(max_results=10)  # Default is 10 results
# google_search_tool = GoogleSearchTool()


# @weave.op()
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
    """Enhanced Perplexity API call with explicit model tracking."""
    client = OpenAI(api_key=os.getenv("PERPLEXITY_API_KEY"), base_url="https://api.perplexity.ai")

    system_message = Perplex_Assistant_Prompt
    if assistant_meta:
        system_message += f"\n\n{system_messages}"

    # Minimal parameters for Perplexity
    return client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
        stream=False,
    ).choices[0].message.content


@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
    """A tool that accesses Perplexity Sonar to search the web when the answer requires or would benefit from a real-world web reference.

    Args:
        arg1: User prompt
        arg2: Details on the desired web search results, used as an extra system message for the Sonar web search
    """
    try:
        # assistant_meta=True is required so that arg2 is actually appended to the
        # system message; with the default of False it would be silently ignored.
        sonar_response = tracked_perplexity_call(arg1, arg2, assistant_meta=True)
        return sonar_response
    except Exception as e:
        return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"
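
# The tool above has no retry behavior, so one transient Perplexity/network
# error fails the whole tool call. A minimal retry sketch, assuming a linear
# backoff and that retrying on any Exception is acceptable (both are
# illustrative assumptions, not part of the original design):
def perplexity_call_with_retry(prompt: str, system_messages: str, retries: int = 2, backoff_seconds: float = 1.0) -> str:
    """Retry tracked_perplexity_call with simple linear backoff (illustrative helper)."""
    import time

    last_error = None
    for attempt in range(retries + 1):
        try:
            return tracked_perplexity_call(prompt, system_messages, assistant_meta=True)
        except Exception as e:  # real code may want to catch narrower exception types
            last_error = e
            time.sleep(backoff_seconds * (attempt + 1))
    raise last_error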
def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
    """Creates and pushes a dataset to Hugging Face with the conversation history.

    Args:
        dataset_name: Name for the dataset (will be prefixed with username)
        conversation_data: String representing the conversation data. Can be:
            - JSON array of objects (each object becomes a row)
            - Pipe-separated values (col1 | col2 | col3) for tabular data
            - Plain text (stored in a 'text' column)

    Returns:
        URL of the created dataset or error message
    """
    try:
        # Get API key from environment variables
        api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
        if not api_key:
            return "Error: No Hugging Face API key found in environment variables"

        # Set fixed username for dataset organization
        username = "Misfits-and-Machines"

        # Initialize Hugging Face API
        hf_api = HfApi(token=api_key)

        # Sanitize dataset name
        safe_dataset_name = dataset_name.replace(" ", "_").lower()
        repo_id = f"{username}/{safe_dataset_name}"
        print(f"Creating dataset: {repo_id}")

        # Check if the repository exists or create it
        try:
            repo_exists = hf_api.repo_exists(repo_id=repo_id, repo_type="dataset")
            if not repo_exists:
                hf_api.create_repo(repo_id=repo_id, repo_type="dataset")
                print(f"Created repository: {repo_id}")
            else:
                print(f"Repository already exists: {repo_id}")
        except Exception as e:
            print(f"Note when checking/creating repository: {str(e)}")

        # Check if data is JSON first (preferred format)
        is_json = False
        try:
            # Try to parse as JSON
            json_data = json.loads(conversation_data)

            # Check if it's an array of objects (preferred structure)
            if isinstance(json_data, list) and all(isinstance(item, dict) for item in json_data) and len(json_data) > 0:
                print(f"Processing as JSON array with {len(json_data)} items")

                # Extract all keys to ensure consistent columns
                all_keys = set()
                for item in json_data:
                    all_keys.update(item.keys())
                all_keys = sorted(list(all_keys))  # Sort keys for consistent order
                print(f"Detected columns: {', '.join(all_keys)}")

                # Create dataset with proper structure
                rows = []
                for item in json_data:
                    row = {key: item.get(key, "") for key in all_keys}
                    rows.append(row)

                # Convert to a pandas DataFrame for better control
                df = pd.DataFrame(rows)
                print(df.head())  # Print first few rows for verification

                # Create dataset from the pandas DataFrame
                dataset = Dataset.from_pandas(df)

                # Push to Hugging Face Hub with the train split name
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload JSON dataset: {dataset_name}"
                )
                print(f"Successfully pushed JSON dataset with {len(json_data)} rows")
                is_json = True
            elif isinstance(json_data, dict):
                # Single object - convert to dataset
                print("Processing as single JSON object")
                df = pd.DataFrame([json_data])
                dataset = Dataset.from_pandas(df)

                # Push to Hugging Face Hub
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload single JSON object: {dataset_name}"
                )
                is_json = True
        except json.JSONDecodeError:
            # Not valid JSON, will try other formats
            print("Not valid JSON, checking other formats...")

        # If not JSON, check if data is structured with pipe separators
        if not is_json:
            lines = conversation_data.strip().split('\n')
            is_structured = '|' in conversation_data and len(lines) > 1

            if is_structured:
                print("Detected pipe-separated structured data")

                # Parse the header row for column names
                header = lines[0].strip()
                headers = [col.strip() for col in header.split('|')]

                # Create structured data
                rows = []

                # Process each data row
                for i, line in enumerate(lines[1:], 1):
                    if not line.strip():
                        continue
                    values = [val.strip() for val in line.split('|')]
                    # Ensure we have the right number of values
                    if len(values) == len(headers):
                        row = {headers[j]: values[j] for j in range(len(headers))}
                        rows.append(row)
                    else:
                        print(f"Warning: Skipping row {i} due to mismatch in column count")

                # Create dataset from pandas DataFrame
                df = pd.DataFrame(rows)
                dataset = Dataset.from_pandas(df)

                # Push to Hugging Face Hub
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload structured data: {dataset_name}"
                )
                print(f"Successfully pushed structured dataset with {len(rows)} rows")
            else:
                # Handle as regular text data (single row)
                print("Processing as regular text data")
                dataset = Dataset.from_dict({"text": [conversation_data]})

                # Push to Hugging Face Hub
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload text data: {dataset_name}"
                )

        # Generate the URL for the dataset
        dataset_url = f"https://huggingface.co/datasets/{repo_id}"
        print(f"Dataset successfully pushed to: {dataset_url}")
        return f"Successfully created dataset at {dataset_url}"

    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"Dataset creation error: {str(e)}\n{error_trace}")
        return (
            f"Error creating dataset: {str(e)}\n\n"
            "Troubleshooting tips:\n"
            "1. Verify your HF_API_KEY is valid\n"
            "2. Try a simpler dataset name with only letters and underscores"
        )
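
# Illustrative payloads for the three input formats Dataset_Creator_Function
# accepts, taken from its docstring. The column names and values here are
# made-up examples for demonstration only:
EXAMPLE_JSON_PAYLOAD = json.dumps([
    {"question": "What is 2 + 2?", "answer": "4"},
    {"question": "What is the capital of France?", "answer": "Paris"},
])  # JSON array of objects -> two rows with columns "answer" and "question" (keys are sorted)

EXAMPLE_PIPE_PAYLOAD = "question | answer\nWhat is 2 + 2? | 4\nWhat is the capital of France? | Paris"
# Pipe-separated -> the first line is the header row; each later line becomes one row

EXAMPLE_TEXT_PAYLOAD = "A plain transcript of the conversation."
# Plain text -> stored as a single row in a 'text' column

# Hypothetical usage (the dataset name is an example, not an existing repo):
# Dataset_Creator_Function("demo_conversations", EXAMPLE_JSON_PAYLOAD)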
@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
    """A tool that posts a new dataset of the current conversation to Hugging Face.

    Args:
        dataset_name: Name for the dataset (will be prefixed with 'Misfits-and-Machines/')
        conversation_data: String content to save to the dataset

    Returns:
        Link to the created dataset or error message with troubleshooting steps
    """
    try:
        print(f"Creating dataset '{dataset_name}' with {len(conversation_data)} characters of data")
        print(f"Dataset will be created at Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}")
        result = Dataset_Creator_Function(dataset_name, conversation_data)
        print(f"Dataset creation result: {result}")
        return result
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        return f"Error using Dataset Creator tool: {str(e)}\n{error_trace}"


def verify_dataset_exists(repo_id: str) -> dict:
    """Verify that a dataset exists and is valid on the Hugging Face Hub.

    Args:
        repo_id: Full repository ID in format "username/dataset_name"

    Returns:
        Dict with "exists" boolean and "message" string
    """
    try:
        # Check if dataset exists using the datasets-server API
        api_url = f"https://datasets-server.huggingface.co/is-valid?dataset={repo_id}"
        response = requests.get(api_url)

        # Parse the response
        if response.status_code == 200:
            data = response.json()
            # If any of these are True, the dataset exists in some form
            if data.get("viewer", False) or data.get("preview", False):
                return {"exists": True, "message": "Dataset is valid and accessible"}
            else:
                return {"exists": False, "message": "Dataset exists but may not be fully processed yet"}
        else:
            return {"exists": False, "message": f"API returned status code {response.status_code}"}
    except Exception as e:
        return {"exists": False, "message": f"Error verifying dataset: {str(e)}"}


@tool
def Check_Dataset_Validity(dataset_name: str) -> str:
    """A tool that checks if a dataset exists and is valid on Hugging Face.

    Args:
        dataset_name: Name of the dataset to check (with or without organization prefix)

    Returns:
        Status message about the dataset validity
    """
    try:
        # Ensure the dataset name has the organization prefix
        if "/" not in dataset_name:
            dataset_name = f"Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}"

        # Check dataset validity
        result = verify_dataset_exists(dataset_name)

        if result["exists"]:
            return f"Dataset '{dataset_name}' exists and is valid. You can access it at https://huggingface.co/datasets/{dataset_name}"
        else:
            return f"Dataset '{dataset_name}' could not be verified: {result['message']}. It may still be processing or may not exist."
    except Exception as e:
        return f"Error checking dataset validity: {str(e)}"
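
# Freshly pushed datasets can take a while to be processed by the
# datasets-server, so a single call to verify_dataset_exists may return a
# false negative. A minimal polling sketch; the attempt count and interval
# are illustrative assumptions:
def wait_for_dataset(repo_id: str, attempts: int = 5, interval_seconds: float = 10.0) -> bool:
    """Poll verify_dataset_exists until the dataset validates or attempts run out."""
    import time

    for _ in range(attempts):
        if verify_dataset_exists(repo_id)["exists"]:
            return True
        time.sleep(interval_seconds)
    return False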
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        # Create timezone object
        tz = pytz.timezone(timezone)
        # Get current time in that timezone
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"


final_answer = FinalAnswerTool()

# Note: HfApiModel does not accept a huggingface_api_key parameter;
# it reads the token from the environment variable set above.
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud',  # Using the backup endpoint
    custom_role_conversions=None,
)

# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        Sonar_Web_Search_Tool,
        ddg_search_tool,  # Added DuckDuckGo search tool
        # google_search_tool,  # Added Google search tool
        get_current_time_in_timezone,
        image_generation_tool,
        Dataset_Creator_Tool,
        Check_Dataset_Validity,
    ],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

# To fix the TypeError in Gradio_UI.py, you would need to modify that file.
# For now, we just launch the agent directly and print guidance if the error occurs.
try:
    GradioUI(agent).launch()
except TypeError as e:
    if "unsupported operand type(s) for +=" in str(e):
        print("Error: Token counting issue in Gradio UI")
        print("To fix, edit Gradio_UI.py and change:")
        print("total_input_tokens += agent.model.last_input_token_count")
        print("To:")
        print("total_input_tokens += (agent.model.last_input_token_count or 0)")
    else:
        raise e
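
# A sketch of the one-line Gradio_UI.py edit that the except branch above
# prints, hedged because it depends on Gradio_UI internals not visible from
# this file. The `or 0` coerces a missing (None) token count to zero before
# accumulating, which is what avoids the "unsupported operand type(s) for +="
# TypeError when the model has not yet reported token usage:
#
#     # in Gradio_UI.py (surrounding context assumed, not verified):
#     total_input_tokens += (agent.model.last_input_token_count or 0)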