Spaces:
Runtime error
Runtime error
| from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool | |
| import datetime | |
| import requests | |
| import pytz | |
| import yaml | |
| import os | |
| import json | |
| import uuid | |
| from datasets import Dataset | |
| from huggingface_hub import HfApi | |
| from openai import OpenAI | |
| from tools.final_answer import FinalAnswerTool | |
| from Gradio_UI import GradioUI | |
# Base system prompt sent with every Perplexity request.
Perplex_Assistant_Prompt = "You are a helpful AI assistant that searches the web for accurate information."

# Mirror HUGGINGFACE_API_KEY into HUGGINGFACE_API_TOKEN (empty string when the key is unset).
# NOTE(review): confirm that HfApiModel / huggingface_hub actually reads
# HUGGINGFACE_API_TOKEN; nothing in this file consumes it directly.
os.environ["HUGGINGFACE_API_TOKEN"] = os.environ.get("HUGGINGFACE_API_KEY", "")
#@weave.op()
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
    """Send one non-streaming chat request to Perplexity and return the reply text.

    Args:
        prompt: Text sent as the user message.
        system_messages: Extra instructions appended to the base system
            prompt; only used when ``assistant_meta`` is true.
        model_name: Perplexity model identifier passed to the API.
        assistant_meta: When true, ``system_messages`` is appended to
            ``Perplex_Assistant_Prompt`` after a blank line.

    Returns:
        The ``content`` of the first choice in the completion response.
    """
    # The OpenAI client is pointed at Perplexity's API base URL.
    perplexity = OpenAI(
        api_key=os.getenv("PERPLEXITY_API_KEY"),
        base_url="https://api.perplexity.ai",
    )

    if assistant_meta:
        system_content = Perplex_Assistant_Prompt + f"\n\n{system_messages}"
    else:
        system_content = Perplex_Assistant_Prompt

    chat_messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": prompt},
    ]

    # Only the minimal set of parameters is sent.
    completion = perplexity.chat.completions.create(
        model=model_name,
        messages=chat_messages,
        stream=False,
    )
    return completion.choices[0].message.content
# Registered with @tool because this function is handed to CodeAgent(tools=[...]).
@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
    """A tool that accesses Perplexity Sonar to search the web when the answer requires or would benefit from a real world web reference.

    Args:
        arg1: User Prompt
        arg2: Details on the desired web search results as system message for sonar web search
    """
    try:
        # tracked_perplexity_call only appends the extra system message when
        # assistant_meta is true; without it arg2 would be silently ignored.
        return tracked_perplexity_call(arg1, arg2, assistant_meta=bool(arg2))
    except Exception as e:
        # Failures are returned as text so the caller gets a readable message
        # instead of an exception.
        return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"
def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
    """Creates and pushes a dataset to Hugging Face with the conversation history.

    Args:
        dataset_name: Name for the dataset (will be prefixed with username)
        conversation_data: JSON string or formatted text of conversation to save

    Returns:
        URL of the created dataset or error message
    """
    try:
        # Resolve the token once and use it for both Hub calls. The previous
        # code authenticated whoami() with HUGGINGFACE_API_KEY but pushed with
        # HF_API_KEY, so the two calls could disagree. Either variable is
        # accepted; None lets huggingface_hub fall back to its default token.
        token = os.getenv("HUGGINGFACE_API_KEY") or os.getenv("HF_API_KEY") or None

        hf_api = HfApi(token=token)
        username = hf_api.whoami().get("name", "anonymous")
        repo_id = f"{username}/{dataset_name}"

        # Strings are parsed as JSON when possible, otherwise kept as raw text.
        if isinstance(conversation_data, str):
            try:
                parsed = json.loads(conversation_data)
            except json.JSONDecodeError:
                parsed = {"text": conversation_data}
        else:
            parsed = conversation_data

        # Metadata is stored as keys, so the record must be a dict. JSON that
        # decodes to a list or scalar is wrapped instead of failing on item
        # assignment; dicts are copied so the caller's object is not mutated.
        if isinstance(parsed, dict):
            conversation = dict(parsed)
        else:
            conversation = {"messages": parsed}

        conversation["timestamp"] = datetime.datetime.now().isoformat()
        conversation["dataset_id"] = str(uuid.uuid4())

        # Single-row dataset holding the whole conversation record.
        dataset = Dataset.from_dict({"conversations": [conversation]})
        dataset.push_to_hub(repo_id, token=token)
        return f"Successfully created dataset at https://huggingface.co/datasets/{repo_id}"
    except Exception as e:
        # Any failure is returned as text so the caller always receives a string.
        return f"Error creating dataset: {str(e)}"
# Registered with @tool because this function is handed to CodeAgent(tools=[...]).
@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
    """A tool that posts a new dataset of the current conversation to Hugging Face.

    Args:
        dataset_name: Name for the dataset (will be prefixed with username)
        conversation_data: Conversation history as JSON string or formatted text
    """
    try:
        # Delegates to Dataset_Creator_Function, which returns either the
        # dataset URL message or an error message.
        return Dataset_Creator_Function(dataset_name, conversation_data)
    except Exception as e:
        return f"Error using Dataset Creator tool: {str(e)}"
# Registered with @tool because this function is handed to CodeAgent(tools=[...]).
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        tz = pytz.timezone(timezone)
        # Current time in that timezone, formatted as YYYY-MM-DD HH:MM:SS.
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        # Unknown timezone names (and any other failure) are reported as text.
        return f"Error fetching time for timezone '{timezone}': {str(e)}"
final_answer = FinalAnswerTool()

# The API key is forwarded through HfApiModel's `token` parameter (there is no
# `huggingface_api_key` parameter). When HUGGINGFACE_API_KEY is unset, None is
# passed so the library falls back to its default token resolution.
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud',  # Using the backup endpoint
    custom_role_conversions=None,
    token=os.getenv("HUGGINGFACE_API_KEY") or None,
)

# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

# Prompt templates are read with an explicit encoding so loading does not
# depend on the platform's default locale.
with open("prompts.yaml", "r", encoding="utf-8") as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        Sonar_Web_Search_Tool,
        get_current_time_in_timezone,
        image_generation_tool,
        Dataset_Creator_Tool,
    ],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)
# Start the Gradio chat UI. A TypeError whose message mentions "+=" is treated
# as the token-counting problem in Gradio_UI.py: instead of crashing, print the
# edit that file needs. Any other TypeError is re-raised unchanged.
try:
    GradioUI(agent).launch()
except TypeError as e:
    if "unsupported operand type(s) for +=" not in str(e):
        raise e
    for hint_line in (
        "Error: Token counting issue in Gradio UI",
        "To fix, edit Gradio_UI.py and change:",
        "total_input_tokens += agent.model.last_input_token_count",
        "To:",
        "total_input_tokens += (agent.model.last_input_token_count or 0)",
    ):
        print(hint_line)