"""Question-answering agent built on Tavily web search and a Cerebras-hosted LLM.

Also provides small helper tools for reading documents and running OCR on
images.
"""

import os
import json

from dotenv import load_dotenv
from tavily import TavilyClient
from cerebras.cloud.sdk import Cerebras

# Load variables from a .env file (when present) into the process environment
# so the os.getenv() lookups in BasicAgent.__init__ can see them.
load_dotenv()


class WebSearchTool:
    """Search the web using Tavily and render the hits as plain text."""

    # Longest content excerpt (in characters) shown for a single result.
    SNIPPET_LENGTH = 300

    def __init__(self, api_key: str):
        """Create the Tavily client.

        Args:
            api_key: Tavily API key, passed straight to ``TavilyClient``.
        """
        self.client = TavilyClient(api_key=api_key)

    def search(self, query: str, max_results: int = 5) -> str:
        """Search and return formatted results.

        Args:
            query: Search query sent to Tavily.
            max_results: Maximum number of results requested.

        Returns:
            A newline-joined report: an optional ``Quick Answer`` line, the
            ``Search Results:`` header, then a numbered title / URL / excerpt
            entry per result. This method never raises; any failure is
            reported as ``"Search error: <message>"``.
        """
        # Reject blank queries up front instead of spending an API call on them.
        if not query or not query.strip():
            return "Search error: empty query"

        try:
            response = self.client.search(
                query=query,
                search_depth="advanced",
                max_results=max_results,
                include_answer=True,
            )

            lines = []
            answer = response.get("answer")
            if answer:
                lines.append(f"Quick Answer: {answer}\n")

            lines.append("Search Results:")
            for index, result in enumerate(response.get("results") or [], 1):
                lines.extend(self._format_result(index, result))

            return "\n".join(lines)
        except Exception as e:
            return f"Search error: {str(e)}"

    def _format_result(self, index: int, result: dict) -> list:
        """Return the three output lines (title, URL, excerpt) for one result.

        Missing or empty fields get a placeholder so that one incomplete
        result cannot discard the whole report.
        """
        title = result.get("title") or "(untitled)"
        url = result.get("url") or "(no url)"
        content = result.get("content") or ""

        snippet = content[:self.SNIPPET_LENGTH]
        # Only mark the excerpt as truncated when something was cut off.
        if len(content) > self.SNIPPET_LENGTH:
            snippet += "..."

        return [f"\n{index}. {title}", f" URL: {url}", f" {snippet}"]


class FileReaderTool:
    """Read various file formats (.docx, .pdf, .xlsx/.xls/.csv, .txt/.md/.json)."""

    # Message returned when the optional library for a format cannot be imported.
    _MISSING_DEPENDENCY = {
        '.docx': "Error: python-docx not installed.",
        '.pdf': "Error: pdfplumber not installed.",
        '.xlsx': "Error: pandas or openpyxl not installed.",
        '.xls': "Error: pandas or openpyxl not installed.",
        '.csv': "Error: pandas or openpyxl not installed.",
    }

    def read(self, file_path: str) -> str:
        """Read file and return content as text.

        The reader is chosen from the lower-cased file extension. Optional
        third-party libraries are imported lazily, only for the format that
        needs them.

        Args:
            file_path: Path of the file to read.

        Returns:
            The extracted text, or an error message string when the file is
            missing, the extension is unsupported, a required library is not
            installed, or reading fails. This method never raises.
        """
        if not os.path.exists(file_path):
            return f"Error: File not found at {file_path}"

        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext == '.docx':
                return self._read_docx(file_path)
            if ext == '.pdf':
                return self._read_pdf(file_path)
            if ext in ('.xlsx', '.xls', '.csv'):
                return self._read_table(file_path, ext)
            if ext in ('.txt', '.md', '.json'):
                with open(file_path, 'r', encoding='utf-8') as f:
                    return f.read()
            return f"Unsupported file type: {ext}"
        except ImportError as e:
            # Formats without an optional dependency fall back to the
            # generic read-error message.
            hint = self._MISSING_DEPENDENCY.get(ext)
            return hint if hint else f"Error reading file: {str(e)}"
        except Exception as e:
            return f"Error reading file: {str(e)}"

    @staticmethod
    def _read_docx(file_path: str) -> str:
        """Return non-blank paragraphs, then table rows joined with ' | '."""
        from docx import Document

        doc = Document(file_path)
        text = [para.text for para in doc.paragraphs if para.text.strip()]
        for table in doc.tables:
            for row in table.rows:
                text.append(" | ".join(cell.text.strip() for cell in row.cells))
        return "\n".join(text)

    @staticmethod
    def _read_pdf(file_path: str) -> str:
        """Return the text of every page that yields any, one page per line group."""
        import pdfplumber

        with pdfplumber.open(file_path) as pdf:
            # Extract each page once rather than once for the filter and
            # again for the value.
            extracted = (page.extract_text() for page in pdf.pages)
            text = [page_text for page_text in extracted if page_text]
        return "\n".join(text)

    @staticmethod
    def _read_table(file_path: str, ext: str) -> str:
        """Load a CSV or Excel file with pandas and return ``DataFrame.to_string()``."""
        import pandas as pd

        if ext == '.csv':
            df = pd.read_csv(file_path)
        else:
            df = pd.read_excel(file_path)
        return df.to_string()


class ImageAnalysisTool:
    """Analyze images using OCR.

    Only the OCR path (pytesseract + Pillow) is implemented in this class; no
    vision model is called.
    """

    def analyze(self, image_path: str, question: str = "Describe this image") -> str:
        """Extract text from an image with Tesseract OCR.

        Args:
            image_path: Path of the image file.
            question: Accepted for interface compatibility; this OCR-only
                implementation does not use it.

        Returns:
            The extracted text prefixed with ``"Text extracted from image:"``,
            a notice when OCR finds no text, or an error message string when
            the file is missing, the OCR libraries are not installed, or
            processing fails. This method never raises.
        """
        if not os.path.exists(image_path):
            return f"Error: Image not found at {image_path}"

        try:
            import pytesseract
            from PIL import Image

            # The context manager closes the image's file handle once OCR
            # has finished, instead of leaving it open.
            with Image.open(image_path) as img:
                text = pytesseract.image_to_string(img)

            if text.strip():
                return f"Text extracted from image:\n{text}"
            return "No text found in image (OCR returned empty)"
        except ImportError:
            return "Error: pytesseract or Pillow not installed."
        except Exception as e:
            return f"Error analyzing image: {str(e)}"


class BasicAgent:
    """Answer questions with an LLM, using web search results as context.

    Renamed from SimpleResearchAgent to match app.py requirements.
    """

    # Substrings (matched case-insensitively) that mark a question as a
    # logic/reasoning task, for which no web search is performed.
    LOGIC_KEYWORDS = (
        'opposite', 'backwards', 'reversed', 'if you understand', 'python code',
    )

    SYSTEM_PROMPT = (
        "You are a precise data extraction engine. "
        "Answer with ONLY the exact value requested. "
        "No explanations, no preambles, no conversational filler. "
        "Examples: '42', 'John Smith', 'Paris', 'right'. "
    )

    def __init__(self):
        """Read API keys from the environment and build the LLM client and tools.

        Raises:
            ValueError: If ``CEREBRAS_API_KEY`` or ``TAVILY_API_KEY`` is unset
                or empty.
        """
        print("--- Initializing BasicAgent ---")

        # HF_TOKEN is read here but not used anywhere else in this class.
        self.hf_token = os.getenv("HF_TOKEN")
        self.cerebras_key = os.getenv("CEREBRAS_API_KEY")
        self.tavily_key = os.getenv("TAVILY_API_KEY")

        if not self.cerebras_key or not self.tavily_key:
            raise ValueError("❌ Missing API Keys. Please check Space Settings.")

        self.llm = Cerebras(api_key=self.cerebras_key)
        self.model = "gpt-oss-120b"

        self.web_search = WebSearchTool(self.tavily_key)
        self.file_reader = FileReaderTool()
        self.image_analyzer = ImageAnalysisTool()

        print("✅ BasicAgent initialized successfully.")

    def _call_llm(self, messages: list, temperature: float = 0.0,
                  max_tokens: int = 200) -> str:
        """Call LLM and return response.

        Args:
            messages: Chat messages in the ``{"role": ..., "content": ...}``
                form passed to ``chat.completions.create``.
            temperature: Sampling temperature forwarded to the API.
            max_tokens: Completion token limit forwarded to the API.

        Returns:
            The stripped content of the first choice, ``"Error: Empty
            response."`` when that content is empty, or ``"LLM Error:
            <message>"`` when the call raises. This method never raises.
        """
        try:
            response = self.llm.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            content = response.choices[0].message.content
            return content.strip() if content else "Error: Empty response."
        except Exception as e:
            return f"LLM Error: {str(e)}"

    def answer(self, question: str, mode="context") -> str:
        """Answer a question; main method called by app.py.

        Note: app.py only passes 'question', not 'file_path'.

        Args:
            question: The question to answer.
            mode: Accepted for interface compatibility; not used by this
                implementation.

        Returns:
            The LLM's answer, or an error message string produced by
            ``_call_llm``.
        """
        print(f"Processing: {question[:50]}...")

        # Lower-case once instead of once per keyword.
        lowered = question.lower()
        is_logic = any(keyword in lowered for keyword in self.LOGIC_KEYWORDS)

        if is_logic:
            context = "Logic/Reasoning Task (No Search Performed)"
        else:
            search_results = self.web_search.search(question)
            context = f"Web Search Results:\n{search_results}"

        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}\n\nExact Answer:",
            },
        ]

        return self._call_llm(messages)

    def __call__(self, question: str) -> str:
        """Make the agent callable; equivalent to ``answer(question)``."""
        return self.answer(question)


if __name__ == "__main__":
    # Manual smoke test: BasicAgent() raises ValueError unless CEREBRAS_API_KEY
    # and TAVILY_API_KEY are set in the environment.
    agent = BasicAgent()
    print(agent("What is the capital of France?"))