RobotPai / agent.py
panos-span's picture
Update agent.py
daba587 verified
"""
Modified agent.py - Fixed with Hugging Face models instead of OpenAI
Fixes LangSmith authentication and missing PostgreSQL function issues
"""
import os
import logging
import warnings
from typing import List, Dict, Any, Optional, Union
import pandas as pd
from supabase import create_client, Client
# Suppress LangSmith warnings to avoid authentication errors
warnings.filterwarnings("ignore", category=UserWarning, module="langsmith")
logging.getLogger("langsmith").setLevel(logging.ERROR)
# Disable LangSmith tracing to avoid 401 errors
os.environ["LANGCHAIN_TRACING_V2"] = "false"
try:
from langchain.agents import AgentType, AgentExecutor, create_react_agent
from langchain.tools import BaseTool, tool
from langchain.memory import ConversationBufferMemory
from langchain_community.llms import HuggingFacePipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import SupabaseVectorStore
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.prompts import ChatPromptTemplate
# Hugging Face specific imports
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sentence_transformers import SentenceTransformer
import torch
except ImportError as e:
print(f"Import error: {e}")
print("Please install required packages: pip install transformers sentence-transformers torch")
class RobotPaiAgent:
"""
RobotPai Agent using Hugging Face models instead of OpenAI
Fixes authentication and database function issues
"""
def __init__(self, model_name: str = "microsoft/DialoGPT-medium"):
print("πŸ€– Initializing RobotPai Agent with Hugging Face models...")
self.model_name = model_name
self.setup_environment()
self.setup_supabase()
self.setup_models()
self.setup_vectorstore()
self.setup_tools()
self.setup_agent()
def setup_environment(self):
"""Setup environment variables with error handling"""
# Disable LangSmith to avoid authentication errors
os.environ["LANGCHAIN_TRACING_V2"] = "false"
# Required environment variables
self.supabase_url = os.getenv("SUPABASE_URL")
self.supabase_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
if not all([self.supabase_url, self.supabase_key]):
raise ValueError("Missing required environment variables: SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY")
print("βœ… Environment configured")
def setup_supabase(self):
"""Setup Supabase client and ensure database setup"""
try:
self.supabase_client: Client = create_client(self.supabase_url, self.supabase_key)
self.ensure_database_setup()
print("βœ… Supabase client initialized")
except Exception as e:
print(f"⚠️ Supabase setup failed: {e}")
self.supabase_client = None
def ensure_database_setup(self):
"""Ensure the database has required tables and functions"""
try:
# Check if documents table exists
result = self.supabase_client.table('documents').select('id').limit(1).execute()
print("βœ… Documents table exists")
except Exception as e:
print(f"⚠️ Database setup needed: {e}")
print("Please run the SQL setup in your Supabase dashboard:")
print("""
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create documents table
CREATE TABLE IF NOT EXISTS documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
embedding VECTOR(384) -- Dimension for sentence-transformers
);
-- Create match_documents_langchain function
CREATE OR REPLACE FUNCTION match_documents_langchain(
query_embedding VECTOR(384),
match_count INT DEFAULT 10,
filter JSONB DEFAULT '{}'
)
RETURNS TABLE (
id UUID,
content TEXT,
metadata JSONB,
similarity FLOAT
)
LANGUAGE plpgsql
AS $$
BEGIN
RETURN QUERY
SELECT
documents.id,
documents.content,
documents.metadata,
1 - (documents.embedding <=> query_embedding) AS similarity
FROM documents
WHERE documents.metadata @> filter
ORDER BY documents.embedding <=> query_embedding
LIMIT match_count;
END;
$$;
""")
def setup_models(self):
"""Setup Hugging Face models for LLM and embeddings"""
try:
# Setup embeddings using sentence-transformers (faster and smaller)
print("πŸ”„ Loading embedding model...")
self.embeddings = HuggingFaceEmbeddings(
model_name="all-MiniLM-L6-v2", # 384 dimensions, fast and good quality
model_kwargs={'device': 'cpu'}, # Use CPU for compatibility
encode_kwargs={'normalize_embeddings': True}
)
print("βœ… Embeddings model loaded")
# Setup LLM using a lightweight model suitable for HF Spaces
print("πŸ”„ Loading language model...")
# Use a smaller, faster model for Hugging Face Spaces
model_id = "microsoft/DialoGPT-small" # Smaller model for faster inference
try:
# Create a text generation pipeline
self.llm_pipeline = pipeline(
"text-generation",
model=model_id,
tokenizer=model_id,
max_length=512,
temperature=0.7,
do_sample=True,
device_map="auto" if torch.cuda.is_available() else None,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
# Wrap in LangChain HuggingFacePipeline
self.llm = HuggingFacePipeline(
pipeline=self.llm_pipeline,
model_kwargs={"temperature": 0.7, "max_length": 512}
)
print(f"βœ… Language model loaded: {model_id}")
except Exception as e:
print(f"⚠️ Failed to load {model_id}: {e}")
# Fallback to a simple text completion
print("πŸ”„ Using fallback model...")
self.llm = self._create_fallback_llm()
except Exception as e:
print(f"❌ Model setup failed: {e}")
# Create minimal fallback
self.embeddings = None
self.llm = self._create_fallback_llm()
def _create_fallback_llm(self):
"""Create a simple fallback LLM for when models fail to load"""
class SimpleLLM:
def __call__(self, prompt: str) -> str:
return f"I'm a simple AI assistant. You asked: {prompt[:100]}... I would help you search documents and analyze data, but I need proper model setup."
def invoke(self, prompt: str) -> str:
return self.__call__(prompt)
return SimpleLLM()
def setup_vectorstore(self):
"""Setup vector store with proper error handling"""
if not self.supabase_client or not self.embeddings:
print("⚠️ Skipping vector store setup - missing dependencies")
self.vectorstore = None
return
try:
# Initialize vector store with correct function name
self.vectorstore = SupabaseVectorStore(
client=self.supabase_client,
embedding=self.embeddings,
table_name="documents",
query_name="match_documents_langchain" # Use the function we created
)
print("βœ… Vector store initialized")
except Exception as e:
print(f"⚠️ Vector store setup failed: {e}")
self.vectorstore = None
def setup_tools(self):
"""Setup tools for the agent"""
self.tools = []
# Document Search Tool
@tool
def search_documents(query: str) -> str:
"""Search for relevant documents in the knowledge base."""
if not self.vectorstore:
return "Vector store not available. Please check database setup."
try:
docs = self.vectorstore.similarity_search(query, k=3)
if docs:
results = []
for i, doc in enumerate(docs, 1):
content = doc.page_content[:300] + "..." if len(doc.page_content) > 300 else doc.page_content
results.append(f"Document {i}: {content}")
return "\n\n".join(results)
else:
return "No relevant documents found."
except Exception as e:
return f"Error searching documents: {str(e)}"
# CSV Analysis Tool
@tool
def analyze_csv_data(query: str) -> str:
"""Analyze CSV data and answer questions about it."""
try:
# Load the CSV file if it exists
if os.path.exists("supabase_docs.csv"):
df = pd.read_csv("supabase_docs.csv")
# Basic analysis based on query
if "rows" in query.lower() or "count" in query.lower():
return f"The CSV has {len(df)} rows and {len(df.columns)} columns."
elif "columns" in query.lower():
return f"Columns: {', '.join(df.columns.tolist())}"
elif "head" in query.lower() or "first" in query.lower():
return f"First 5 rows:\n{df.head().to_string()}"
else:
return f"CSV loaded with {len(df)} rows. Available columns: {', '.join(df.columns.tolist())}"
else:
return "CSV file not found. Please upload supabase_docs.csv"
except Exception as e:
return f"Error analyzing CSV: {str(e)}"
# General Q&A Tool
@tool
def answer_question(question: str) -> str:
"""Answer general questions using the language model."""
try:
# Simple prompt for the question
prompt = f"Question: {question}\nAnswer:"
response = self.llm.invoke(prompt)
return response if isinstance(response, str) else str(response)
except Exception as e:
return f"I'm unable to process that question right now. Error: {str(e)}"
self.tools = [search_documents, analyze_csv_data, answer_question]
print(f"βœ… {len(self.tools)} tools initialized")
def setup_agent(self):
"""Setup the agent with React framework"""
try:
# Create a simple prompt template
template = """Answer the following questions as best you can. You have access to the following tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought: {agent_scratchpad}"""
prompt = PromptTemplate.from_template(template)
# Create a simple agent using React pattern
if hasattr(self.llm, 'invoke'):
agent = create_react_agent(self.llm, self.tools, prompt)
self.agent_executor = AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True,
max_iterations=3,
handle_parsing_errors=True,
return_intermediate_steps=True
)
else:
# Fallback for simple LLM
self.agent_executor = self._create_simple_executor()
print("βœ… Agent initialized successfully")
except Exception as e:
print(f"⚠️ Agent setup failed: {e}")
self.agent_executor = self._create_simple_executor()
def _create_simple_executor(self):
"""Create a simple executor when full agent setup fails"""
class SimpleExecutor:
def __init__(self, tools, llm):
self.tools = {tool.name: tool for tool in tools}
self.llm = llm
def invoke(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
query = inputs.get("input", "")
# Simple routing logic
if "document" in query.lower() or "search" in query.lower():
if "search_documents" in self.tools:
result = self.tools["search_documents"].invoke(query)
return {"output": result}
elif "csv" in query.lower() or "data" in query.lower():
if "analyze_csv_data" in self.tools:
result = self.tools["analyze_csv_data"].invoke(query)
return {"output": result}
else:
if "answer_question" in self.tools:
result = self.tools["answer_question"].invoke(query)
return {"output": result}
return {"output": f"I can help you with document search, CSV analysis, or general questions. You asked: {query}"}
return SimpleExecutor(self.tools, self.llm)
def add_documents(self, texts: List[str], metadatas: List[Dict] = None):
"""Add documents to the vector store"""
if not self.vectorstore:
print("⚠️ Vector store not available")
return False
try:
# Split long texts into chunks
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # Smaller chunks for better performance
chunk_overlap=100
)
all_texts = []
all_metadatas = []
for i, text in enumerate(texts):
chunks = text_splitter.split_text(text)
all_texts.extend(chunks)
# Add metadata for each chunk
base_metadata = metadatas[i] if metadatas and i < len(metadatas) else {}
for j, chunk in enumerate(chunks):
chunk_metadata = base_metadata.copy()
chunk_metadata.update({"chunk_id": j, "source_doc": i})
all_metadatas.append(chunk_metadata)
# Add to vector store
ids = self.vectorstore.add_texts(all_texts, all_metadatas)
print(f"βœ… Added {len(ids)} document chunks to vector store")
return True
except Exception as e:
print(f"❌ Error adding documents: {e}")
return False
def process_query(self, query: str) -> str:
"""Process a user query through the agent"""
try:
if self.agent_executor:
response = self.agent_executor.invoke({"input": query})
return response.get("output", "Sorry, I couldn't process your query.")
else:
return "Agent not properly initialized. Please check your setup."
except Exception as e:
return f"Error processing query: {str(e)}"
def load_csv_for_analysis(self, file_path: str = "supabase_docs.csv") -> bool:
"""Load CSV data for analysis"""
try:
if not os.path.exists(file_path):
print(f"⚠️ CSV file not found: {file_path}")
return False
df = pd.read_csv(file_path)
print(f"βœ… Loaded CSV with {len(df)} rows and {len(df.columns)} columns")
# Optionally add CSV content to vector store for searching
if self.vectorstore:
documents = []
for _, row in df.head(100).iterrows(): # Limit to first 100 rows
content = " | ".join([f"{col}: {val}" for col, val in row.items() if pd.notna(val)])
documents.append(content)
metadatas = [{"source": "csv_data", "row_id": i} for i in range(len(documents))]
self.add_documents(documents, metadatas)
print("βœ… CSV data added to vector store for searching")
return True
except Exception as e:
print(f"❌ Error loading CSV: {e}")
return False
# Utility function for direct usage
def create_agent():
"""Create and return a RobotPai agent instance"""
try:
agent = RobotPaiAgent()
return agent
except Exception as e:
print(f"Failed to create agent: {e}")
return None
# For backward compatibility
def get_agent():
"""Get agent instance - for backward compatibility"""
return create_agent()