import os
import sys
import math
import asyncio
import subprocess
import requests
from io import BytesIO
from bs4 import BeautifulSoup
from pydantic import Field
# ----- LlamaIndex & LangChain Imports -----
from llama_index.core.llms import ChatMessage, LLMMetadata, LLM, CompletionResponse
from llama_index.core.agent import ReActAgent
from llama_index.core.callbacks.llama_debug import LlamaDebugHandler
from llama_index.core.tools import FunctionTool
from llama_index.llms.huggingface import HuggingFaceInferenceAPI
from langchain_community.retrievers import TavilySearchAPIRetriever
# ---------- BASIC SETUP ----------
HEADERS = {"User-Agent": "Mozilla/5.0"}
def check_required_keys() -> None:
missing = [k for k in ("TAVILY_API_KEY", "HUGGINGFACE_TOKEN") if not os.getenv(k)]
if missing:
print(f"⚠️ WARNING: Missing API keys: {', '.join(missing)}")
else:
print("✅ All required API keys are present.")
check_required_keys()
# Monkey-patch required by LlamaIndex: exposing `.message` lets a ChatMessage
# stand in for a ChatResponse wherever the framework expects one.
ChatMessage.message = property(lambda self: self)
# ---------- HUGGING FACE LLM WRAPPER (Command R+) ----------
class HuggingFaceLLM(LLM):
"""Wrapper para la API de Inferencia de Hugging Face, optimizado para Command R+."""
model_name: str = Field(default="CohereForAI/c4ai-command-r-plus")
temperature: float = Field(default=0.01)
    max_new_tokens: int = Field(default=2048)  # Increased to allow longer answers
_client: HuggingFaceInferenceAPI = None
class Config:
extra = "allow"
def __init__(self, **kwargs):
super().__init__(**kwargs)
api_key = os.getenv("HUGGINGFACE_TOKEN")
if not api_key:
raise ValueError("HUGGINGFACE_TOKEN no configurado en los secrets del Space")
self._client = HuggingFaceInferenceAPI(model_name=self.model_name, token=api_key)
if self.callback_manager is None:
from llama_index.core.callbacks.base import CallbackManager
self.callback_manager = CallbackManager([])
if not self.callback_manager.handlers:
self.callback_manager.add_handler(LlamaDebugHandler())
@property
def metadata(self) -> LLMMetadata:
return LLMMetadata(
context_window=128000,
num_output=self.max_new_tokens,
is_chat_model=True,
is_function_calling_model=True,
model_name=self.model_name,
)
def chat(self, messages: list[ChatMessage], **kwargs) -> ChatMessage:
prompt = self._client.tokenizer.apply_chat_template(
[{"role": msg.role.value, "content": msg.content} for msg in messages],
tokenize=False, add_generation_prompt=True
)
try:
response = self._client.text_generation(
prompt, max_new_tokens=self.max_new_tokens,
                temperature=self.temperature if self.temperature > 0 else 0.01,  # temperature must be > 0
do_sample=True, top_p=0.95
)
return ChatMessage(role="assistant", content=response)
except Exception as e:
print(f"[ERROR] HuggingFace API call failed: {e}")
return ChatMessage(role="assistant", content=f"Error: API call failed. Reason: {e}")
async def achat(self, messages: list[ChatMessage], **kwargs) -> ChatMessage:
return await asyncio.to_thread(self.chat, messages, **kwargs)
def complete(self, prompt: str, **kwargs) -> CompletionResponse:
raise NotImplementedError("Use .chat() for this model.")
# ---------- TOOLING ----------
def _pd_safe_import():
try:
import pandas as pd
return pd
except ModuleNotFoundError:
return None
def web_search(query: str, num_results: int = 5) -> str:
"""Tavily search -> concatenated, citation‑ready snippet list (includes URL)."""
try:
retriever = TavilySearchAPIRetriever(api_key=os.getenv("TAVILY_API_KEY"), k=num_results)
results = retriever.invoke(query)
formatted = [f"Result {i}:\nTitle: {doc.metadata.get('title','')}\nURL: {doc.metadata.get('source','')}\nContent: {doc.page_content}\n" for i, doc in enumerate(results, 1)]
return "\n\n".join(formatted)
except Exception as exc:
return f"Error web_search: {exc}"
def scrape_url_text(url: str) -> str:
"""Downloads a webpage and returns cleaned visible text."""
try:
resp = requests.get(url, headers=HEADERS, timeout=20)
resp.raise_for_status()
if "Just a moment" in resp.text and "cloudflare" in resp.text.lower():
return "Error: The site is protected by Cloudflare and cannot be scraped directly. Use information from web_search instead."
soup = BeautifulSoup(resp.text, "html.parser")
for tag in soup(["script", "style", "noscript", "header", "footer", "nav"]):
tag.decompose()
text = "\n".join(t.strip() for t in soup.get_text("\n").splitlines() if t.strip())
return text[:8000]
except Exception as exc:
return f"Error scrape_url_text: {exc}"
def analyze_markdown_table(table_md: str, question: str) -> str:
"""Check commutativity or return CSV. Requires pandas lazily."""
pd = _pd_safe_import()
if pd is None: return "Error: pandas library is required for this tool but not installed."
try:
        # Drop blank lines and markdown separator rows such as |---|:---:|
        clean = [
            ln for ln in table_md.strip().splitlines()
            if ln.strip() and set(ln.strip()) - set("|-: ")
        ]
        rows = [[c.strip() for c in ln.strip("|").split("|")] for ln in clean]
        if len(rows) < 2:
            return "Error: malformed markdown table"
        df = pd.DataFrame(rows[1:], columns=rows[0])
        if "commut" in question.lower() or "conmut" in question.lower():  # "commutative" / "conmutativa"
            offenders: set[str] = set()
            header, cols = df.columns[0], df.columns[1:]
            for x in cols:
                for y in cols:
                    try:
                        val_xy = df.loc[df[header] == x, y].iat[0]
                        val_yx = df.loc[df[header] == y, x].iat[0]
                        if val_xy != val_yx:
                            offenders.update([x, y])
                    except (IndexError, KeyError):
                        continue
            return ", ".join(sorted(offenders)) or "Commutative"
        return df.to_csv(index=False)
except Exception as exc:
return f"Error analyze_markdown_table: {exc}"
def execute_code(code: str) -> str:
    """Runs short Python code in a subprocess with a timeout (lightweight isolation, not a full sandbox)."""
    try:
        res = subprocess.run([sys.executable, "-S", "-c", code], capture_output=True, text=True, timeout=10)
if res.returncode == 0:
output = res.stdout.strip()
return f"Output: {output if output else '(No output)'}"
return f"Error: {res.stderr.strip()}"
except Exception as exc:
return f"Error execute_code: {exc}"
# ... (other tools such as reverse_text, classify_botanical_foods, etc. go here, unchanged) ...
def reverse_text(text: str) -> str:
    """Reverse a text string."""
    return text[::-1]
# ---------- TOOL DEFINITIONS & PROMPT ----------
tool_defs = [
(web_search, "web_search", "Searches the web via Tavily."),
(scrape_url_text, "scrape_url_text", "Fetch any URL and return visible text."),
(analyze_markdown_table, "analyze_markdown_table", "Analyze a markdown table."),
(execute_code, "execute_code", "Run short python snippets securely."),
(reverse_text, "reverse_text", "Reverse a text string."),
(lambda _: "I cannot answer with the available tools.", "no_tool_solution", "Fallback answer when stuck."),
]
TOOLS = [FunctionTool.from_defaults(fn=fn, name=name, description=desc) for fn, name, desc in tool_defs]
SYSTEM_PROMPT = f"""
You are Alfred, a ReAct agent. Your goal is to answer questions accurately. Follow these rules STRICTLY.
**OPERATING PROCEDURE:**
1. **TRIAGE:** First, analyze the question. If it involves a local file (image, audio, Excel) or multimedia, IMMEDIATELY use `no_tool_solution`.
2. **INFORMATION GATHERING:** For all other questions, your FIRST step is ALWAYS `web_search`.
3. **ANALYZE SNIPPET:** After `web_search`, CAREFULLY read the `Content:` snippet of each result. If the answer is clearly present, answer immediately. DO NOT use another tool if you already have the information.
4. **DEEP DIVE:** Only if the snippet is incomplete, use `scrape_url_text` on the most promising URL. If `scrape_url_text` fails (e.g., Cloudflare error), go back to the text from `web_search` or give up.
5. **FINAL ANSWER:** Your final response MUST be ONLY the `Observation:` from your last successful tool call, or the phrase "I cannot answer with the available tools."
"""
# ---------- AGENT CREATION & EXECUTION ----------
def create_fresh_agent():
"""Creates a new, clean agent instance to prevent state contamination."""
llm = HuggingFaceLLM()
return ReActAgent.from_tools(
tools=TOOLS, llm=llm, system_prompt=SYSTEM_PROMPT, verbose=False,
max_iterations=20, handle_parsing_errors=True
)
def _extract_observation(raw: str) -> str:
"""Extracts the LAST observation from the ReAct agent's reasoning dump."""
if "Observation:" in raw:
segment = raw.rsplit("Observation:", 1)[-1]
if "Final Answer:" in segment:
segment = segment.split("Final Answer:", 1)[0]
return segment.strip()
return raw.strip()
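# Illustration of what _extract_observation pulls from a raw ReAct trace
# (hypothetical dump):
#
#   raw = "Thought: search first\nAction: web_search\nObservation: Paris\nFinal Answer: Paris"
#   _extract_observation(raw)  # -> "Paris"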
def basic_agent_response(question: str) -> str:
"""Public entry point: creates a fresh agent and runs one query."""
try:
print(f"[DEBUG] ➜ Question: {question}")
agent = create_fresh_agent()
raw_resp = agent.query(question)
text_response = str(raw_resp.response if hasattr(raw_resp, "response") else raw_resp)
cleaned = _extract_observation(text_response)
return cleaned or "I cannot answer with the available tools."
except Exception as exc:
print(f"[ERROR] Agent execution failed: {exc}")
return "I cannot answer with the available tools."