from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
import datetime
import requests
import pytz
import yaml
import os
import json
import uuid
import traceback
import pandas as pd
from datasets import Dataset
from huggingface_hub import HfApi
from openai import OpenAI
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI

# Define the Perplexity system prompt
Perplex_Assistant_Prompt = """You are a helpful AI assistant that searches the web for accurate information."""

# Expose the Hugging Face key as HF_TOKEN, the environment variable that
# huggingface_hub (and therefore HfApiModel) reads by default
os.environ["HF_TOKEN"] = os.getenv("HUGGINGFACE_API_KEY", "")

# Initialize the standard search tools
ddg_search_tool = DuckDuckGoSearchTool(max_results=10)  # default is 10 results
# google_search_tool = GoogleSearchTool()

# @weave.op()
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
    """Enhanced Perplexity API call with explicit model tracking."""
    client = OpenAI(api_key=os.getenv("PERPLEXITY_API_KEY"), base_url="https://api.perplexity.ai")
    system_message = Perplex_Assistant_Prompt
    if assistant_meta:
        system_message += f"\n\n{system_messages}"
    # Minimal parameters for Perplexity
    return client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
        stream=False,
    ).choices[0].message.content
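
# A minimal usage sketch for tracked_perplexity_call, assuming PERPLEXITY_API_KEY
# is set and "sonar-pro" is enabled on the account (the prompt strings below are
# hypothetical):
#
#   answer = tracked_perplexity_call(
#       "What is the population of Tokyo?",
#       "Prefer official statistics and include a source URL",
#       assistant_meta=True,  # append the extra instructions to the system message
#   )
#   print(answer)
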
@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
    """A tool that queries Perplexity Sonar to search the web when the answer requires, or would benefit from, a real-world web reference.
    Args:
        arg1: The user prompt to search for
        arg2: Details on the desired web search results, passed to Sonar as an additional system message
    """
    try:
        # assistant_meta=True so that arg2 is actually appended to the system message
        sonar_response = tracked_perplexity_call(arg1, arg2, assistant_meta=True)
        return sonar_response
    except Exception as e:
        return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"
def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
    """Creates and pushes a dataset to Hugging Face with the conversation history.
    Args:
        dataset_name: Name for the dataset (will be prefixed with the username)
        conversation_data: String representing the conversation data. Can be:
            - JSON array of objects (each object becomes a row)
            - Pipe-separated values (col1 | col2 | col3) for tabular data
            - Plain text (stored in a 'text' column)
    Returns:
        URL of the created dataset or an error message
    """
    try:
        # Get the API key from environment variables
        api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
        if not api_key:
            return "Error: No Hugging Face API key found in environment variables"
        # Fixed username for dataset organization
        username = "Misfits-and-Machines"
        # Initialize the Hugging Face API
        hf_api = HfApi(token=api_key)
        # Sanitize the dataset name
        safe_dataset_name = dataset_name.replace(" ", "_").lower()
        repo_id = f"{username}/{safe_dataset_name}"
        print(f"Creating dataset: {repo_id}")
        # Check whether the repository exists; create it if not
        try:
            repo_exists = hf_api.repo_exists(repo_id=repo_id, repo_type="dataset")
            if not repo_exists:
                hf_api.create_repo(repo_id=repo_id, repo_type="dataset")
                print(f"Created repository: {repo_id}")
            else:
                print(f"Repository already exists: {repo_id}")
        except Exception as e:
            print(f"Note when checking/creating repository: {str(e)}")
        # Try JSON first (preferred format)
        is_json = False
        try:
            json_data = json.loads(conversation_data)
            # An array of objects is the preferred structure
            if isinstance(json_data, list) and len(json_data) > 0 and all(isinstance(item, dict) for item in json_data):
                print(f"Processing as JSON array with {len(json_data)} items")
                # Collect all keys so every row has a consistent set of columns
                all_keys = set()
                for item in json_data:
                    all_keys.update(item.keys())
                all_keys = sorted(all_keys)  # sort keys for a stable column order
                print(f"Detected columns: {', '.join(all_keys)}")
                # Build rows, filling missing keys with empty strings
                rows = [{key: item.get(key, "") for key in all_keys} for item in json_data]
                df = pd.DataFrame(rows)
                print(df.head())  # print the first few rows for verification
                dataset = Dataset.from_pandas(df)
                # Push to the Hugging Face Hub under the train split
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload JSON dataset: {dataset_name}",
                )
                print(f"Successfully pushed JSON dataset with {len(json_data)} rows")
                is_json = True
            elif isinstance(json_data, dict):
                # A single object becomes a one-row dataset
                print("Processing as single JSON object")
                df = pd.DataFrame([json_data])
                dataset = Dataset.from_pandas(df)
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload single JSON object: {dataset_name}",
                )
                is_json = True
        except json.JSONDecodeError:
            # Not valid JSON; fall through to the other formats
            print("Not valid JSON, checking other formats...")
        # If not JSON, check whether the data is pipe-separated tabular text
        if not is_json:
            lines = conversation_data.strip().split('\n')
            is_structured = '|' in conversation_data and len(lines) > 1
            if is_structured:
                print("Detected pipe-separated structured data")
                # Parse the header row for column names
                headers = [col.strip() for col in lines[0].strip().split('|')]
                rows = []
                # Process each data row, keeping only rows whose column count matches the header
                for i, line in enumerate(lines[1:], 1):
                    if not line.strip():
                        continue
                    values = [val.strip() for val in line.split('|')]
                    if len(values) == len(headers):
                        rows.append(dict(zip(headers, values)))
                    else:
                        print(f"Warning: Skipping row {i} due to mismatch in column count")
                df = pd.DataFrame(rows)
                dataset = Dataset.from_pandas(df)
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload structured data: {dataset_name}",
                )
                print(f"Successfully pushed structured dataset with {len(rows)} rows")
            else:
                # Otherwise store the whole string as a single-row text dataset
                print("Processing as regular text data")
                dataset = Dataset.from_dict({"text": [conversation_data]})
                dataset.push_to_hub(
                    repo_id=repo_id,
                    token=api_key,
                    split="train",
                    commit_message=f"Upload text data: {dataset_name}",
                )
        # Report the URL of the new dataset
        dataset_url = f"https://huggingface.co/datasets/{repo_id}"
        print(f"Dataset successfully pushed to: {dataset_url}")
        return f"Successfully created dataset at {dataset_url}"
    except Exception as e:
        error_trace = traceback.format_exc()
        print(f"Dataset creation error: {str(e)}\n{error_trace}")
        return (
            f"Error creating dataset: {str(e)}\n\n"
            "Troubleshooting tips:\n"
            "1. Verify your HF_API_KEY is valid\n"
            "2. Try a simpler dataset name with only letters and underscores"
        )
@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
    """A tool that posts a new dataset of the current conversation to Hugging Face.
    Args:
        dataset_name: Name for the dataset (will be prefixed with 'Misfits-and-Machines/')
        conversation_data: String content to save to the dataset
    Returns:
        Link to the created dataset, or an error message with troubleshooting steps
    """
    try:
        print(f"Creating dataset '{dataset_name}' with {len(conversation_data)} characters of data")
        print(f"Dataset will be created at Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}")
        result = Dataset_Creator_Function(dataset_name, conversation_data)
        print(f"Dataset creation result: {result}")
        return result
    except Exception as e:
        error_trace = traceback.format_exc()
        return f"Error using Dataset Creator tool: {str(e)}\n{error_trace}"

def verify_dataset_exists(repo_id: str) -> dict:
    """Verify that a dataset exists and is valid on the Hugging Face Hub.
    Args:
        repo_id: Full repository ID in the format "username/dataset_name"
    Returns:
        Dict with an "exists" boolean and a "message" string
    """
    try:
        # Check validity via the datasets-server API
        api_url = f"https://datasets-server.huggingface.co/is-valid?dataset={repo_id}"
        response = requests.get(api_url)
        # Parse the response
        if response.status_code == 200:
            data = response.json()
            # If either the viewer or the preview works, the dataset exists in some form
            if data.get("viewer", False) or data.get("preview", False):
                return {"exists": True, "message": "Dataset is valid and accessible"}
            else:
                return {"exists": False, "message": "Dataset exists but may not be fully processed yet"}
        else:
            return {"exists": False, "message": f"API returned status code {response.status_code}"}
    except Exception as e:
        return {"exists": False, "message": f"Error verifying dataset: {str(e)}"}
@tool
def Check_Dataset_Validity(dataset_name: str) -> str:
    """A tool that checks whether a dataset exists and is valid on Hugging Face.
    Args:
        dataset_name: Name of the dataset to check (with or without the organization prefix)
    Returns:
        Status message about the dataset validity
    """
    try:
        # Ensure the dataset name has the organization prefix
        if "/" not in dataset_name:
            dataset_name = f"Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}"
        # Check dataset validity
        result = verify_dataset_exists(dataset_name)
        if result["exists"]:
            return f"Dataset '{dataset_name}' exists and is valid. You can access it at https://huggingface.co/datasets/{dataset_name}"
        else:
            return f"Dataset '{dataset_name}' could not be verified: {result['message']}. It may still be processing or may not exist."
    except Exception as e:
        return f"Error checking dataset validity: {str(e)}"

@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.
    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        # Create the timezone object
        tz = pytz.timezone(timezone)
        # Get the current time in that timezone
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"
final_answer = FinalAnswerTool()

# HfApiModel does not accept an API-key parameter; it reads the token from the
# environment (HF_TOKEN, set above)
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud',  # using the backup endpoint
    custom_role_conversions=None,
)
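
# If the dedicated endpoint above is unavailable, a plain Hub model id can be
# swapped in instead (a sketch; any text-generation model served by the
# Inference API should work):
#
#   model = HfApiModel(
#       max_tokens=2096,
#       temperature=0.5,
#       model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
#       custom_role_conversions=None,
#   )
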
# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        Sonar_Web_Search_Tool,
        ddg_search_tool,  # DuckDuckGo search tool
        # google_search_tool,  # Google search tool (disabled)
        get_current_time_in_timezone,
        image_generation_tool,
        Dataset_Creator_Tool,
        Check_Dataset_Validity,
    ],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

# Launch the Gradio UI. Gradio_UI.py has a known token-counting TypeError when
# last_input_token_count is None; if it surfaces, print the one-line fix instead
# of crashing.
try:
    GradioUI(agent).launch()
except TypeError as e:
    if "unsupported operand type(s) for +=" in str(e):
        print("Error: Token counting issue in Gradio UI")
        print("To fix, edit Gradio_UI.py and change:")
        print("total_input_tokens += agent.model.last_input_token_count")
        print("To:")
        print("total_input_tokens += (agent.model.last_input_token_count or 0)")
    else:
        raise
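
# To exercise the agent without the UI, CodeAgent.run can be called directly
# (a sketch; the task string is hypothetical):
#
#   result = agent.run("What is the current local time in Europe/Paris?")
#   print(result)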