File size: 5,678 Bytes
c2df2b9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | from pathlib import Path
from typing import Any, Dict, List, Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
def _make_file_tools(workspace: Path):
workspace = workspace.resolve()
@tool
def read_file(path: str) -> str:
"""Read the contents of a file from the workspace."""
full = (workspace / path).resolve()
if not str(full).startswith(str(workspace)):
return "Error: path outside workspace."
if not full.exists():
return f"Error: file not found: {path}"
return full.read_text(encoding="utf-8", errors="replace")
@tool
def write_file(path: str, content: str) -> str:
"""Write content to a file in the workspace (creates parent dirs)."""
full = (workspace / path).resolve()
if not str(full).startswith(str(workspace)):
return "Error: path outside workspace."
full.parent.mkdir(parents=True, exist_ok=True)
full.write_text(content, encoding="utf-8")
return f"Written: {path}"
@tool
def list_files(path: str = ".") -> str:
"""List files and directories under a path in the workspace."""
full = (workspace / path).resolve()
if not str(full).startswith(str(workspace)):
return "Error: path outside workspace."
if not full.exists():
return f"Error: path not found: {path}"
items = []
for p in sorted(full.iterdir()):
tag = "[DIR]" if p.is_dir() else "[FILE]"
items.append(f"{tag} {p.relative_to(workspace)}")
return "\n".join(items) if items else "(empty)"
@tool
def delete_file(path: str) -> str:
"""Delete a file or empty directory in the workspace."""
full = (workspace / path).resolve()
if not str(full).startswith(str(workspace)):
return "Error: path outside workspace."
if not full.exists():
return f"Error: not found: {path}"
if full.is_file():
full.unlink()
return f"Deleted file: {path}"
if full.is_dir():
full.rmdir()
return f"Deleted empty directory: {path}"
return f"Error: {path} is neither a file nor empty directory."
return [read_file, write_file, list_files, delete_file]
def _build_system_message(workspace: Path) -> SystemMessage:
return SystemMessage(
content=(
f"Your workspace is at {workspace}.\n"
"You have file tools to read, write, list, and delete files here.\n"
"You have built-in web search capabilities through your model.\n\n"
"=== LEAN WORKSPACE RULES ===\n"
"1. KEEP ONLY HIGH-VALUE FILES: Retain only final artifacts the user would want\n"
" (reports, generated scripts, datasets, or files the user explicitly asked for).\n"
"2. AUTO-DELETE JUNK: Before finishing your response, use 'delete_file' to remove\n"
" all intermediate scratch files, .tmp files, raw HTML dumps, logs, or any file\n"
" that is NOT a final deliverable.\n"
"3. OVERWRITE — NEVER DUPLICATE: If you need to update a file, overwrite it in place\n"
" using 'write_file' with the same path. Never create versioned copies (e.g.,\n"
" report_v1.txt, report_v2.txt) unless the user explicitly asks for a new version.\n\n"
"When the user asks about their files or folder contents, use the file tools."
)
)
async def run_agent(
api_key: str,
model: str,
provider: str,
workspace: Path,
user_message: str,
chat_history: List[Dict[str, str]],
) -> Dict[str, Any]:
tools = _make_file_tools(workspace)
if provider == "gemini":
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(
model=model or "gemini-2.0-flash",
google_api_key=api_key,
temperature=0.7,
)
else:
llm = ChatOpenAI(
model=model or "perplexity/llama-3-sonar-large-32k-online",
openai_api_key=api_key,
base_url="https://openrouter.ai/api/v1",
default_headers={
"HTTP-Referer": "https://huggingface.co/spaces/react-agent",
"X-Title": "React Agent",
},
temperature=0.7,
)
agent = create_react_agent(model=llm, tools=tools)
messages = [_build_system_message(workspace)]
for msg in chat_history:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "user":
messages.append(HumanMessage(content=content))
elif role == "assistant":
messages.append(AIMessage(content=content))
messages.append(HumanMessage(content=user_message))
result = await agent.ainvoke({"messages": messages})
agent_response = ""
for msg in reversed(result.get("messages", [])):
if isinstance(msg, AIMessage) and msg.content:
agent_response = msg.content
break
updated_chat_history = []
for msg in result.get("messages", []):
if isinstance(msg, HumanMessage):
updated_chat_history.append({"role": "user", "content": msg.content})
elif isinstance(msg, AIMessage):
updated_chat_history.append({"role": "assistant", "content": msg.content})
return {
"agent_response": agent_response,
"updated_chat_history": updated_chat_history,
}
|