| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| from langchain_openai import ChatOpenAI |
| from langchain_core.messages import AIMessage, HumanMessage, SystemMessage |
| from langchain_core.tools import tool |
| from langgraph.prebuilt import create_react_agent |
|
|
|
|
def _make_file_tools(workspace: Path):
    """Build the read/write/list/delete file tools confined to ``workspace``.

    Every tool resolves the requested path against the resolved workspace
    root and refuses anything that ends up outside of it. Expected failures
    (missing path, wrong path type, filesystem errors on write/delete) are
    returned as ``"Error: ..."`` strings instead of being raised, matching
    the way the containment and not-found cases are reported.

    Args:
        workspace: Directory the tools are allowed to operate in.

    Returns:
        A list holding the ``read_file``, ``write_file``, ``list_files`` and
        ``delete_file`` tools, in that order.
    """
    workspace = workspace.resolve()

    def _resolve(path: str) -> Optional[Path]:
        """Return the resolved target for ``path``, or None if it escapes the workspace."""
        candidate = (workspace / path).resolve()
        try:
            # Component-wise containment check. A plain string-prefix test
            # would also accept sibling directories such as
            # "<workspace>-other/secret.txt".
            candidate.relative_to(workspace)
        except ValueError:
            return None
        return candidate

    # NOTE: the tool docstrings below are kept verbatim; the @tool decorator
    # is documented to expose them to the model as the tool description, so
    # they are effectively runtime text.

    @tool
    def read_file(path: str) -> str:
        """Read the contents of a file from the workspace."""
        full = _resolve(path)
        if full is None:
            return "Error: path outside workspace."
        if not full.exists():
            return f"Error: file not found: {path}"
        if not full.is_file():
            # read_text() on a directory would raise instead of reporting.
            return f"Error: not a file: {path}"
        return full.read_text(encoding="utf-8", errors="replace")

    @tool
    def write_file(path: str, content: str) -> str:
        """Write content to a file in the workspace (creates parent dirs)."""
        full = _resolve(path)
        if full is None:
            return "Error: path outside workspace."
        if full.is_dir():
            return f"Error: path is a directory: {path}"
        try:
            full.parent.mkdir(parents=True, exist_ok=True)
            full.write_text(content, encoding="utf-8")
        except OSError as exc:
            # e.g. a parent component is a regular file, or permission denied.
            return f"Error: could not write {path}: {exc}"
        return f"Written: {path}"

    @tool
    def list_files(path: str = ".") -> str:
        """List files and directories under a path in the workspace."""
        full = _resolve(path)
        if full is None:
            return "Error: path outside workspace."
        if not full.exists():
            return f"Error: path not found: {path}"
        if not full.is_dir():
            # iterdir() on a regular file would raise instead of reporting.
            return f"Error: not a directory: {path}"
        items = []
        for entry in sorted(full.iterdir()):
            tag = "[DIR]" if entry.is_dir() else "[FILE]"
            items.append(f"{tag} {entry.relative_to(workspace)}")
        return "\n".join(items) if items else "(empty)"

    @tool
    def delete_file(path: str) -> str:
        """Delete a file or empty directory in the workspace."""
        full = _resolve(path)
        if full is None:
            return "Error: path outside workspace."
        if not full.exists():
            return f"Error: not found: {path}"
        try:
            if full.is_file():
                full.unlink()
                return f"Deleted file: {path}"
            if full.is_dir():
                # rmdir() only succeeds on empty directories; a non-empty
                # one raises OSError, which is reported below.
                full.rmdir()
                return f"Deleted empty directory: {path}"
        except OSError as exc:
            return f"Error: could not delete {path}: {exc}"
        return f"Error: {path} is neither a file nor empty directory."

    return [read_file, write_file, list_files, delete_file]
|
|
|
|
def _build_system_message(workspace: Path) -> SystemMessage:
    """Assemble the system prompt for the agent.

    The prompt names the workspace directory, mentions the file tools and
    web search, and lists the "lean workspace" housekeeping rules.

    Args:
        workspace: Path interpolated into the first line of the prompt.

    Returns:
        A ``SystemMessage`` whose content is the assembled prompt text.
    """
    # Opening lines: where the workspace is and what the agent can use.
    capabilities = (
        f"Your workspace is at {workspace}.\n"
        "You have file tools to read, write, list, and delete files here.\n"
        "You have built-in web search capabilities through your model.\n\n"
    )
    # Housekeeping rules the agent is asked to follow.
    housekeeping = (
        "=== LEAN WORKSPACE RULES ===\n"
        "1. KEEP ONLY HIGH-VALUE FILES: Retain only final artifacts the user would want\n"
        " (reports, generated scripts, datasets, or files the user explicitly asked for).\n"
        "2. AUTO-DELETE JUNK: Before finishing your response, use 'delete_file' to remove\n"
        " all intermediate scratch files, .tmp files, raw HTML dumps, logs, or any file\n"
        " that is NOT a final deliverable.\n"
        "3. OVERWRITE — NEVER DUPLICATE: If you need to update a file, overwrite it in place\n"
        " using 'write_file' with the same path. Never create versioned copies (e.g.,\n"
        " report_v1.txt, report_v2.txt) unless the user explicitly asks for a new version.\n\n"
    )
    closing = "When the user asks about their files or folder contents, use the file tools."

    prompt_text = capabilities + housekeeping + closing
    return SystemMessage(content=prompt_text)
|
|
|
|
async def run_agent(
    api_key: str,
    model: str,
    provider: str,
    workspace: Path,
    user_message: str,
    chat_history: List[Dict[str, str]],
) -> Dict[str, Any]:
    """Run one user turn through a ReAct agent equipped with the file tools.

    Args:
        api_key: Key handed to the selected chat-model client.
        model: Model name; when empty, a provider-specific default is used.
        provider: ``"gemini"`` selects ``ChatGoogleGenerativeAI``; any other
            value selects ``ChatOpenAI`` pointed at the OpenRouter base URL.
        workspace: Directory the file tools operate in; also named in the
            system prompt.
        user_message: The new user input, appended after the replayed history.
        chat_history: Earlier turns as ``{"role": ..., "content": ...}`` dicts.
            Entries whose role is neither ``"user"`` nor ``"assistant"`` are
            ignored. ``None`` is treated as an empty history.

    Returns:
        A dict with ``"agent_response"`` (text of the last assistant message
        that has any text, or ``""``) and ``"updated_chat_history"`` (the
        user/assistant turns of the whole run as role/content dicts with
        plain-string content).
    """
    tools = _make_file_tools(workspace)

    if provider == "gemini":
        # Imported inside the branch, so the Google integration is only
        # loaded when this provider is actually requested.
        from langchain_google_genai import ChatGoogleGenerativeAI

        llm = ChatGoogleGenerativeAI(
            model=model or "gemini-2.0-flash",
            google_api_key=api_key,
            temperature=0.7,
        )
    else:
        llm = ChatOpenAI(
            model=model or "perplexity/llama-3-sonar-large-32k-online",
            openai_api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
            default_headers={
                "HTTP-Referer": "https://huggingface.co/spaces/react-agent",
                "X-Title": "React Agent",
            },
            temperature=0.7,
        )

    agent = create_react_agent(model=llm, tools=tools)

    messages = [_build_system_message(workspace)]
    for entry in chat_history or []:
        role = entry.get("role", "")
        # Guard against an explicit None content, which the message classes
        # would not accept.
        content = entry.get("content") or ""
        if role == "user":
            messages.append(HumanMessage(content=content))
        elif role == "assistant":
            messages.append(AIMessage(content=content))
    messages.append(HumanMessage(content=user_message))

    result = await agent.ainvoke({"messages": messages})
    transcript = result.get("messages", [])

    # The reply shown to the user is the most recent assistant message that
    # actually carries text.
    agent_response = ""
    for msg in reversed(transcript):
        if isinstance(msg, AIMessage):
            text = _content_to_text(msg.content)
            if text:
                agent_response = text
                break

    updated_chat_history = []
    for msg in transcript:
        if isinstance(msg, HumanMessage):
            updated_chat_history.append(
                {"role": "user", "content": _content_to_text(msg.content)}
            )
        elif isinstance(msg, AIMessage):
            text = _content_to_text(msg.content)
            # Assistant messages without text (typically tool-call-only
            # steps) are dropped: stored as "" they would be replayed as
            # empty assistant turns on the next call.
            if text:
                updated_chat_history.append({"role": "assistant", "content": text})

    return {
        "agent_response": agent_response,
        "updated_chat_history": updated_chat_history,
    }


def _content_to_text(content: Any) -> str:
    """Flatten message content (a string or a list of content blocks) to text.

    Plain strings are returned unchanged. For lists, string items and the
    ``"text"`` field of ``{"type": "text", ...}`` dict blocks are
    concatenated; any other block is skipped. ``None`` becomes ``""``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for block in content:
            if isinstance(block, str):
                pieces.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                pieces.append(str(block.get("text", "")))
        return "".join(pieces)
    return "" if content is None else str(content)
|
|