SolshineMisfit's picture
Push dataset to hub now to handle data restructure and upload in chunks
dfafa93 verified
raw
history blame
15 kB
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
import datetime
import requests
import pytz
import yaml
import os
import json
import uuid
from datasets import Dataset
from huggingface_hub import HfApi
from openai import OpenAI
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI
# Define the Perplexity system prompt
Perplex_Assistant_Prompt = """You are a helpful AI assistant that searches the web for accurate information."""
# Set up API key in environment variable as expected by HfApiModel
os.environ["HUGGINGFACE_API_TOKEN"] = os.getenv("HUGGINGFACE_API_KEY", "")
# Initialize the standard search tools
ddg_search_tool = DuckDuckGoSearchTool(max_results=10) # Default is 10 results
# google_search_tool = GoogleSearchTool()
#@weave.op()
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
"""Enhanced Perplexity API call with explicit model tracking."""
client = OpenAI(api_key=os.getenv("PERPLEXITY_API_KEY"), base_url="https://api.perplexity.ai")
system_message = Perplex_Assistant_Prompt
if assistant_meta:
system_message += f"\n\n{system_messages}"
# Minimal parameters for Perplexity
return client.chat.completions.create(
model=model_name,
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": prompt},
],
stream=False,
).choices[0].message.content
@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
"""A tool that accesses Perplexity Sonar to search the web when the answer requires or would benefit from a real world web reference.
Args:
arg1: User Prompt
arg2: Details on the desired web search results as system message for sonar web search
"""
try:
sonar_response = tracked_perplexity_call(arg1, arg2)
return sonar_response
except Exception as e:
return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"
def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
"""Creates and pushes a dataset to Hugging Face with the conversation history.
Args:
dataset_name: Name for the dataset (will be prefixed with username)
conversation_data: String representing the conversation data. Can be:
- JSON array of objects (each object becomes a row)
- Pipe-separated values (col1 | col2 | col3) for tabular data
- Plain text (stored in a 'text' column)
Returns:
URL of the created dataset or error message
"""
try:
# Get API key from environment variables
api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
if not api_key:
return "Error: No Hugging Face API key found in environment variables"
# Set fixed username for dataset organization
username = "Misfits-and-Machines"
# Initialize Hugging Face API
hf_api = HfApi(token=api_key)
# Sanitize dataset name
safe_dataset_name = dataset_name.replace(" ", "_").lower()
repo_id = f"{username}/{safe_dataset_name}"
print(f"Creating dataset: {repo_id}")
# Check if the repository exists or create it
try:
repo_exists = hf_api.repo_exists(repo_id=repo_id, repo_type="dataset")
if not repo_exists:
hf_api.create_repo(repo_id=repo_id, repo_type="dataset")
print(f"Created repository: {repo_id}")
else:
print(f"Repository already exists: {repo_id}")
except Exception as e:
print(f"Note when checking/creating repository: {str(e)}")
# Check if data is JSON first (preferred format)
is_json = False
try:
# Try to parse as JSON
json_data = json.loads(conversation_data)
# Check if it's an array of objects (preferred structure)
if isinstance(json_data, list) and all(isinstance(item, dict) for item in json_data) and len(json_data) > 0:
print(f"Processing as JSON array with {len(json_data)} items")
# Extract all keys to ensure consistent columns
all_keys = set()
for item in json_data:
all_keys.update(item.keys())
all_keys = sorted(list(all_keys)) # Sort keys for consistent order
print(f"Detected columns: {', '.join(all_keys)}")
# Create dataset with proper structure
rows = []
for item in json_data:
row = {key: item.get(key, "") for key in all_keys}
rows.append(row)
# Convert to pandas DataFrame for better control
import pandas as pd
df = pd.DataFrame(rows)
print(df.head()) # Print first few rows for verification
# Create dataset from pandas DataFrame
from datasets import Dataset
dataset = Dataset.from_pandas(df)
# Push to Hugging Face Hub with the train split name
dataset.push_to_hub(
repo_id=repo_id,
token=api_key,
split="train",
commit_message=f"Upload JSON dataset: {dataset_name}"
)
print(f"Successfully pushed JSON dataset with {len(json_data)} rows")
is_json = True
elif isinstance(json_data, dict):
# Single object - convert to dataset
print("Processing as single JSON object")
import pandas as pd
df = pd.DataFrame([json_data])
dataset = Dataset.from_pandas(df)
# Push to Hugging Face Hub
dataset.push_to_hub(
repo_id=repo_id,
token=api_key,
split="train",
commit_message=f"Upload single JSON object: {dataset_name}"
)
is_json = True
except json.JSONDecodeError:
# Not valid JSON, will try other formats
print("Not valid JSON, checking other formats...")
# If not JSON, check if data is structured with pipe separators
if not is_json:
lines = conversation_data.strip().split('\n')
is_structured = '|' in conversation_data and len(lines) > 1
if is_structured:
print("Detected pipe-separated structured data")
# Parse the header row for column names
header = lines[0].strip()
headers = [col.strip() for col in header.split('|')]
# Create structured data
import pandas as pd
rows = []
# Process each data row
for i, line in enumerate(lines[1:], 1):
if not line.strip():
continue
values = [val.strip() for val in line.split('|')]
# Ensure we have the right number of values
if len(values) == len(headers):
row = {headers[j]: values[j] for j in range(len(headers))}
rows.append(row)
else:
print(f"Warning: Skipping row {i} due to mismatch in column count")
# Create dataset from pandas DataFrame
df = pd.DataFrame(rows)
dataset = Dataset.from_pandas(df)
# Push to Hugging Face Hub
dataset.push_to_hub(
repo_id=repo_id,
token=api_key,
split="train",
commit_message=f"Upload structured data: {dataset_name}"
)
print(f"Successfully pushed structured dataset with {len(rows)} rows")
else:
# Handle as regular text data (single row)
print("Processing as regular text data")
dataset = Dataset.from_dict({"text": [conversation_data]})
# Push to Hugging Face Hub
dataset.push_to_hub(
repo_id=repo_id,
token=api_key,
split="train",
commit_message=f"Upload text data: {dataset_name}"
)
# Generate the URL for the dataset
dataset_url = f"https://huggingface.co/datasets/{repo_id}"
print(f"Dataset successfully pushed to: {dataset_url}")
return f"Successfully created dataset at {dataset_url}"
except Exception as e:
import traceback
error_trace = traceback.format_exc()
print(f"Dataset creation error: {str(e)}\n{error_trace}")
return f"Error creating dataset: {str(e)}\n\nTroubleshooting tips:\n1. Verify your HF_API_KEY is valid\n2. Try a simpler dataset name with only letters and underscores"
@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
"""A tool that posts a new dataset of the current conversation to Hugging Face.
Args:
dataset_name: Name for the dataset (will be prefixed with 'Misfits-and-Machines/')
conversation_data: String content to save to the dataset
Returns:
Link to the created dataset or error message with troubleshooting steps
"""
try:
print(f"Creating dataset '{dataset_name}' with {len(conversation_data)} characters of data")
print(f"Dataset will be created at Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}")
result = Dataset_Creator_Function(dataset_name, conversation_data)
print(f"Dataset creation result: {result}")
return result
except Exception as e:
import traceback
error_trace = traceback.format_exc()
return f"Error using Dataset Creator tool: {str(e)}\n{error_trace}"
def verify_dataset_exists(repo_id: str) -> dict:
"""Verify that a dataset exists and is valid on the Hugging Face Hub.
Args:
repo_id: Full repository ID in format "username/dataset_name"
Returns:
Dict with "exists" boolean and "message" string
"""
try:
# Check if dataset exists using the datasets-server API
api_url = f"https://datasets-server.huggingface.co/is-valid?dataset={repo_id}"
response = requests.get(api_url)
# Parse the response
if response.status_code == 200:
data = response.json()
# If any of these are True, the dataset exists in some form
if data.get("viewer", False) or data.get("preview", False):
return {"exists": True, "message": "Dataset is valid and accessible"}
else:
return {"exists": False, "message": "Dataset exists but may not be fully processed yet"}
else:
return {"exists": False, "message": f"API returned status code {response.status_code}"}
except Exception as e:
return {"exists": False, "message": f"Error verifying dataset: {str(e)}"}
@tool
def Check_Dataset_Validity(dataset_name: str) -> str:
"""A tool that checks if a dataset exists and is valid on Hugging Face.
Args:
dataset_name: Name of the dataset to check (with or without organization prefix)
Returns:
Status message about the dataset validity
"""
try:
# Ensure the dataset name has the organization prefix
if "/" not in dataset_name:
dataset_name = f"Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}"
# Check dataset validity
result = verify_dataset_exists(dataset_name)
if result["exists"]:
return f"Dataset '{dataset_name}' exists and is valid. You can access it at https://huggingface.co/datasets/{dataset_name}"
else:
return f"Dataset '{dataset_name}' could not be verified: {result['message']}. It may still be processing or may not exist."
except Exception as e:
return f"Error checking dataset validity: {str(e)}"
@tool
def get_current_time_in_timezone(timezone: str) -> str:
"""A tool that fetches the current local time in a specified timezone.
Args:
timezone: A string representing a valid timezone (e.g., 'America/New_York').
"""
try:
# Create timezone object
tz = pytz.timezone(timezone)
# Get current time in that timezone
local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
return f"The current local time in {timezone} is: {local_time}"
except Exception as e:
return f"Error fetching time for timezone '{timezone}': {str(e)}"
final_answer = FinalAnswerTool()
# Remove the huggingface_api_key parameter - it's not supported
model = HfApiModel(
max_tokens=2096,
temperature=0.5,
model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud', # Using the backup endpoint
custom_role_conversions=None
)
# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
with open("prompts.yaml", 'r') as stream:
prompt_templates = yaml.safe_load(stream)
agent = CodeAgent(
model=model,
tools=[
final_answer,
Sonar_Web_Search_Tool,
ddg_search_tool, # Added DuckDuckGo search tool
# google_search_tool, # Added Google search tool
get_current_time_in_timezone,
image_generation_tool,
Dataset_Creator_Tool,
Check_Dataset_Validity
],
max_steps=6,
verbosity_level=1,
grammar=None,
planning_interval=None,
name=None,
description=None,
prompt_templates=prompt_templates
)
# To fix the TypeError in Gradio_UI.py, you would need to modify that file
# For now, we'll just use the agent directly
try:
GradioUI(agent).launch()
except TypeError as e:
if "unsupported operand type(s) for +=" in str(e):
print("Error: Token counting issue in Gradio UI")
print("To fix, edit Gradio_UI.py and change:")
print("total_input_tokens += agent.model.last_input_token_count")
print("To:")
print("total_input_tokens += (agent.model.last_input_token_count or 0)")
else:
raise e