File size: 7,819 Bytes
53cf0f3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
import os
import json
from dotenv import load_dotenv
from tavily import TavilyClient
from cerebras.cloud.sdk import Cerebras
load_dotenv()
# --- HELPER TOOLS ---
class WebSearchTool:
"""Search the web using Tavily"""
def __init__(self, api_key: str):
self.client = TavilyClient(api_key=api_key)
def search(self, query: str, max_results: int = 5) -> str:
"""Search and return formatted results"""
try:
response = self.client.search(
query=query,
search_depth="advanced",
max_results=max_results,
include_answer=True
)
# Format results
output = []
if response.get("answer"):
output.append(f"Quick Answer: {response['answer']}\n")
output.append("Search Results:")
for i, result in enumerate(response.get("results", []), 1):
output.append(f"\n{i}. {result['title']}")
output.append(f" URL: {result['url']}")
output.append(f" {result['content'][:300]}...")
return "\n".join(output)
except Exception as e:
return f"Search error: {str(e)}"
class FileReaderTool:
"""Read various file formats"""
def read(self, file_path: str) -> str:
"""Read file and return content as text"""
if not os.path.exists(file_path):
return f"Error: File not found at {file_path}"
ext = os.path.splitext(file_path)[1].lower()
try:
# DOCX files
if ext == '.docx':
try:
from docx import Document
doc = Document(file_path)
text = [para.text for para in doc.paragraphs if para.text.strip()]
for table in doc.tables:
for row in table.rows:
cells = [cell.text.strip() for cell in row.cells]
text.append(" | ".join(cells))
return "\n".join(text)
except ImportError:
return "Error: python-docx not installed."
# PDF files
elif ext == '.pdf':
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
text = [page.extract_text() for page in pdf.pages if page.extract_text()]
return "\n".join(text)
except ImportError:
return "Error: pdfplumber not installed."
# Excel/CSV files
elif ext in ['.xlsx', '.xls', '.csv']:
try:
import pandas as pd
if ext == '.csv':
df = pd.read_csv(file_path)
else:
df = pd.read_excel(file_path)
return df.to_string()
except ImportError:
return "Error: pandas or openpyxl not installed."
# Text files
elif ext in ['.txt', '.md', '.json']:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
else:
return f"Unsupported file type: {ext}"
except Exception as e:
return f"Error reading file: {str(e)}"
class ImageAnalysisTool:
"""Analyze images using OCR or vision models"""
def analyze(self, image_path: str, question: str = "Describe this image") -> str:
if not os.path.exists(image_path):
return f"Error: Image not found at {image_path}"
try:
# Try OCR first (fast and simple)
import pytesseract
from PIL import Image
img = Image.open(image_path)
text = pytesseract.image_to_string(img)
if text.strip():
return f"Text extracted from image:\n{text}"
else:
return "No text found in image (OCR returned empty)"
except ImportError:
return "Error: pytesseract or Pillow not installed."
except Exception as e:
return f"Error analyzing image: {str(e)}"
# --- MAIN AGENT CLASS ---
class BasicAgent:
"""
Renamed from SimpleResearchAgent to match app.py requirements.
"""
def __init__(self):
print("--- Initializing BasicAgent ---")
# 1. Load Keys internally
self.hf_token = os.getenv("HF_TOKEN")
self.cerebras_key = os.getenv("CEREBRAS_API_KEY")
self.tavily_key = os.getenv("TAVILY_API_KEY")
if not self.cerebras_key or not self.tavily_key:
raise ValueError("❌ Missing API Keys. Please check Space Settings.")
# 2. Initialize LLM
self.llm = Cerebras(api_key=self.cerebras_key)
self.model = "gpt-oss-120b" # Or "llama3.1-8b"
# 3. Initialize tools
self.web_search = WebSearchTool(self.tavily_key)
self.file_reader = FileReaderTool()
self.image_analyzer = ImageAnalysisTool()
print("✅ BasicAgent initialized successfully.")
def _call_llm(self, messages: list, temperature: float = 0.0) -> str:
"""Call LLM and return response"""
try:
response = self.llm.chat.completions.create(
model=self.model,
messages=messages,
temperature=temperature,
max_tokens=200 # Prevent long rambling
)
content = response.choices[0].message.content
return content.strip() if content else "Error: Empty response."
except Exception as e:
return f"LLM Error: {str(e)}"
def answer(self, question: str, mode="context") -> str:
"""
Main method called by app.py.
Note: app.py only passes 'question', not 'file_path'.
"""
print(f"Processing: {question[:50]}...")
# 1. Detect if this is a Logic/Trick question (GAIA style)
is_logic = any(keyword in question.lower() for keyword in [
'opposite', 'backwards', 'reversed', 'if you understand', 'python code'
])
context_parts = []
# 2. Search Web (Skip if it's purely a logic puzzle)
if not is_logic:
# Clean question for search (remove "Answer this..." etc)
search_results = self.web_search.search(question)
context_parts.append(f"Web Search Results:\n{search_results}")
else:
context_parts.append("Logic/Reasoning Task (No Search Performed)")
context = "\n\n".join(context_parts)
# 3. Construct System Prompt
# We use the GAIA-style prompt for strictness
messages = [
{
"role": "system",
"content": (
"You are a precise data extraction engine. "
"Answer with ONLY the exact value requested. "
"No explanations, no preambles, no conversational filler. "
"Examples: '42', 'John Smith', 'Paris', 'right'. "
)
},
{
"role": "user",
"content": f"Context:\n{context}\n\nQuestion: {question}\n\nExact Answer:"
}
]
return self._call_llm(messages)
def __call__(self, question: str) -> str:
return self.answer(question)
# For local testing
if __name__ == "__main__":
agent = BasicAgent()
print(agent("What is the capital of France?")) |