import os
import math
import asyncio
import subprocess
import sys
from io import BytesIO

import requests
from bs4 import BeautifulSoup
from pydantic import Field

# ----- LlamaIndex & LangChain Imports -----
from llama_index.core.llms import ChatMessage, LLMMetadata, LLM, CompletionResponse
from llama_index.core.agent import ReActAgent
from llama_index.core.callbacks.llama_debug import LlamaDebugHandler
from llama_index.core.tools import FunctionTool
from llama_index.llms.huggingface import HuggingFaceInferenceAPI
from langchain_community.retrievers import TavilySearchAPIRetriever

# ---------- BASIC SETUP ----------
HEADERS = {"User-Agent": "Mozilla/5.0"}


def check_required_keys() -> None:
    """Warn (without aborting) when a required API key is absent from the environment."""
    missing = [k for k in ("TAVILY_API_KEY", "HUGGINGFACE_TOKEN") if not os.getenv(k)]
    if missing:
        print(f"⚠️ WARNING: Missing API keys: {', '.join(missing)}")
    else:
        print("✅ All required API keys are present.")


check_required_keys()

# Monkey-patch required by LlamaIndex: some call sites expect `msg.message`
# to return the message object itself.
ChatMessage.message = property(lambda self: self)


# ---------- HUGGING FACE LLM WRAPPER (Command R+) ----------
class HuggingFaceLLM(LLM):
    """Wrapper around the Hugging Face Inference API, tuned for Command R+."""

    model_name: str = Field(default="CohereForAI/c4ai-command-r-plus")
    temperature: float = Field(default=0.01)
    max_new_tokens: int = Field(default=2048)  # raised for longer answers
    # Lazily created in __init__; underscore keeps it out of the pydantic schema.
    _client: HuggingFaceInferenceAPI = None

    class Config:
        extra = "allow"

    def __init__(self, **kwargs):
        """Build the HF inference client; raises ValueError if the token is missing.

        Also guarantees a callback manager with at least one debug handler so
        the ReAct trace is observable.
        """
        super().__init__(**kwargs)
        api_key = os.getenv("HUGGINGFACE_TOKEN")
        if not api_key:
            raise ValueError("HUGGINGFACE_TOKEN no configurado en los secrets del Space")
        self._client = HuggingFaceInferenceAPI(model_name=self.model_name, token=api_key)
        if self.callback_manager is None:
            from llama_index.core.callbacks.base import CallbackManager
            self.callback_manager = CallbackManager([])
        if not self.callback_manager.handlers:
            self.callback_manager.add_handler(LlamaDebugHandler())

    @property
    def metadata(self) -> LLMMetadata:
        """Advertise model capabilities to LlamaIndex (chat + function calling)."""
        return LLMMetadata(
            context_window=128000,
            num_output=self.max_new_tokens,
            is_chat_model=True,
            is_function_calling_model=True,
            model_name=self.model_name,
        )

    def chat(self, messages: list[ChatMessage], **kwargs) -> ChatMessage:
        """Render the chat history through the model's chat template and generate.

        Returns an assistant ChatMessage; on API failure the error text is
        returned as the assistant content instead of raising.
        """
        # NOTE(review): assumes the inference client exposes a `tokenizer`
        # attribute with `apply_chat_template` — confirm against the installed
        # llama-index-llms-huggingface version.
        prompt = self._client.tokenizer.apply_chat_template(
            [{"role": msg.role.value, "content": msg.content} for msg in messages],
            tokenize=False,
            add_generation_prompt=True,
        )
        try:
            response = self._client.text_generation(
                prompt,
                max_new_tokens=self.max_new_tokens,
                temperature=self.temperature if self.temperature > 0 else 0.01,  # temperature cannot be 0
                do_sample=True,
                top_p=0.95,
            )
            return ChatMessage(role="assistant", content=response)
        except Exception as e:
            print(f"[ERROR] HuggingFace API call failed: {e}")
            return ChatMessage(role="assistant", content=f"Error: API call failed. Reason: {e}")

    async def achat(self, messages: list[ChatMessage], **kwargs) -> ChatMessage:
        """Async wrapper: run the blocking chat() call in a worker thread."""
        return await asyncio.to_thread(self.chat, messages, **kwargs)

    def complete(self, prompt: str, **kwargs) -> CompletionResponse:
        """Text-completion is intentionally unsupported for this chat model."""
        raise NotImplementedError("Use .chat() for this model.")


# ---------- TOOLING ----------
def _pd_safe_import():
    """Return the pandas module, or None when it is not installed."""
    try:
        import pandas as pd
        return pd
    except ModuleNotFoundError:
        return None


def web_search(query: str, num_results: int = 5) -> str:
    """Tavily search -> concatenated, citation‑ready snippet list (includes URL)."""
    try:
        retriever = TavilySearchAPIRetriever(api_key=os.getenv("TAVILY_API_KEY"), k=num_results)
        results = retriever.invoke(query)
        formatted = [
            f"Result {i}:\nTitle: {doc.metadata.get('title','')}\nURL: {doc.metadata.get('source','')}\nContent: {doc.page_content}\n"
            for i, doc in enumerate(results, 1)
        ]
        return "\n\n".join(formatted)
    except Exception as exc:
        return f"Error web_search: {exc}"


def scrape_url_text(url: str) -> str:
    """Downloads a webpage and returns cleaned visible text (capped at 8000 chars)."""
    try:
        resp = requests.get(url, headers=HEADERS, timeout=20)
        resp.raise_for_status()
        # Cloudflare challenge pages render as "Just a moment..." — scraping them is useless.
        if "Just a moment" in resp.text and "cloudflare" in resp.text.lower():
            return "Error: The site is protected by Cloudflare and cannot be scraped directly. Use information from web_search instead."
        soup = BeautifulSoup(resp.text, "html.parser")
        # Drop non-content elements before extracting text.
        for tag in soup(["script", "style", "noscript", "header", "footer", "nav"]):
            tag.decompose()
        text = "\n".join(t.strip() for t in soup.get_text("\n").splitlines() if t.strip())
        return text[:8000]
    except Exception as exc:
        return f"Error scrape_url_text: {exc}"


def analyze_markdown_table(table_md: str, question: str) -> str:
    """Check commutativity or return CSV. Requires pandas lazily.

    For questions mentioning commutativity ("conmut"), reads the table as an
    operation table (first column = row labels) and returns the elements
    involved in any non-commutative pair, or "Conmutativa" when none exist.
    Otherwise returns the table converted to CSV.
    """
    pd = _pd_safe_import()
    if pd is None:
        return "Error: pandas library is required for this tool but not installed."

    def _is_separator(ln: str) -> bool:
        # Markdown separator rows (e.g. "|---|", "| :--- |", "|:--:|") contain
        # only pipes, dashes, colons and spaces.  The previous check only
        # matched rows starting exactly with "|---".
        stripped = ln.strip()
        return bool(stripped) and set(stripped) <= set("|-: ")

    try:
        clean = [ln for ln in table_md.strip().splitlines() if ln.strip() and not _is_separator(ln)]
        rows = [[c.strip() for c in ln.strip("|").split("|")] for ln in clean]
        if len(rows) < 2:
            return "Error: malformed markdown table"
        df = pd.DataFrame(rows[1:], columns=rows[0])
        if "conmut" in question.lower():
            offenders: set[str] = set()
            header, cols = df.columns[0], df.columns[1:]
            for x in cols:
                for y in cols:
                    try:
                        val_xy = df.loc[df[header] == x, y].iat[0]
                        val_yx = df.loc[df[header] == y, x].iat[0]
                        if val_xy != val_yx:
                            offenders.update([x, y])
                    except (IndexError, KeyError):
                        continue
            return ", ".join(sorted(offenders)) or "Conmutativa"
        return df.to_csv(index=False)
    except Exception as exc:
        return f"Error analyze_markdown_table: {exc}"


def execute_code(code: str) -> str:
    """Runs short python code in a sandboxed subprocess (10 s timeout, no site packages)."""
    try:
        # sys.executable guarantees the same interpreter as the host process;
        # a bare "python" may be absent from PATH or point elsewhere.
        res = subprocess.run(
            [sys.executable, "-S", "-c", code],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if res.returncode == 0:
            output = res.stdout.strip()
            return f"Output: {output if output else '(No output)'}"
        return f"Error: {res.stderr.strip()}"
    except Exception as exc:
        return f"Error execute_code: {exc}"


# ... (other tools such as reverse_text, classify_botanical_foods, etc. go here, unchanged) ...
def reverse_text(text: str) -> str:
    """Return *text* reversed character by character."""
    return text[::-1]


# ---------- TOOL DEFINITIONS & PROMPT ----------
tool_defs = [
    (web_search, "web_search", "Searches the web via Tavily."),
    (scrape_url_text, "scrape_url_text", "Fetch any URL and return visible text."),
    (analyze_markdown_table, "analyze_markdown_table", "Analyze a markdown table."),
    (execute_code, "execute_code", "Run short python snippets securely."),
    (reverse_text, "reverse_text", "Reverse a text string."),
    (lambda _: "I cannot answer with the available tools.", "no_tool_solution", "Fallback answer when stuck."),
]
TOOLS = [FunctionTool.from_defaults(fn=fn, name=name, description=desc) for fn, name, desc in tool_defs]

# Plain string, not an f-string: the prompt has no placeholders, and a stray
# brace in future prompt edits would otherwise raise at import time.
SYSTEM_PROMPT = """
You are Alfred, a ReAct agent. Your goal is to answer questions accurately. Follow these rules STRICTLY.

**OPERATING PROCEDURE:**
1. **TRIAGE:** First, analyze the question. If it involves a local file (image, audio, Excel) or multimedia, IMMEDIATELY use `no_tool_solution`.
2. **INFORMATION GATHERING:** For all other questions, your FIRST step is ALWAYS `web_search`.
3. **ANALYZE SNIPPET:** After `web_search`, CAREFULLY read the `Content:` snippet of each result. If the answer is clearly present, answer immediately. DO NOT use another tool if you already have the information.
4. **DEEP DIVE:** Only if the snippet is incomplete, use `scrape_url_text` on the most promising URL. If `scrape_url_text` fails (e.g., Cloudflare error), go back to the text from `web_search` or give up.
5. **FINAL ANSWER:** Your final response MUST be ONLY the `Observation:` from your last successful tool call, or the phrase "I cannot answer with the available tools."
"""


# ---------- AGENT CREATION & EXECUTION ----------
def create_fresh_agent():
    """Create a new, clean agent instance to prevent state contamination between queries."""
    llm = HuggingFaceLLM()
    # NOTE(review): `system_prompt` / `handle_parsing_errors` acceptance depends
    # on the installed llama-index version — confirm against its ReActAgent API.
    return ReActAgent.from_tools(
        tools=TOOLS,
        llm=llm,
        system_prompt=SYSTEM_PROMPT,
        verbose=False,
        max_iterations=20,
        handle_parsing_errors=True,
    )


def _extract_observation(raw: str) -> str:
    """Extract the LAST observation from the ReAct agent's reasoning dump.

    Falls back to the stripped raw text when no "Observation:" marker exists.
    """
    if "Observation:" in raw:
        segment = raw.rsplit("Observation:", 1)[-1]
        # Trim any trailing "Final Answer:" section that follows the observation.
        if "Final Answer:" in segment:
            segment = segment.split("Final Answer:", 1)[0]
        return segment.strip()
    return raw.strip()


def basic_agent_response(question: str) -> str:
    """Public entry point: create a fresh agent and run one query.

    Never raises: any failure is logged and mapped to the fallback sentence.
    """
    try:
        print(f"[DEBUG] ➜ Question: {question}")
        agent = create_fresh_agent()
        raw_resp = agent.query(question)
        text_response = str(raw_resp.response if hasattr(raw_resp, "response") else raw_resp)
        cleaned = _extract_observation(text_response)
        return cleaned or "I cannot answer with the available tools."
    except Exception as exc:
        print(f"[ERROR] Agent execution failed: {exc}")
        return "I cannot answer with the available tools."