File size: 5,376 Bytes
8b2d981
 
 
de4c73e
8b2d981
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
de4c73e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7abecc6
 
8b2d981
 
 
 
 
 
 
 
 
b90e078
8b2d981
 
 
 
9f54298
8b2d981
 
 
9f54298
 
de4c73e
9f54298
7abecc6
de4c73e
8b2d981
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7abecc6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from pathlib import Path
from typing import Any, Dict, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

def _make_file_tools(workspace: Path):
    """Build the agent's file tools, each confined to ``workspace``.

    Args:
        workspace: Directory the tools may operate in. It is resolved once so
            that every containment check compares fully resolved paths.

    Returns:
        A list holding the ``read_file``, ``write_file``, ``list_files`` and
        ``delete_file`` tools, in that order.

    Anticipated problems (path outside the workspace, missing target, wrong
    kind of target) are reported to the agent as ``"Error: ..."`` strings
    rather than raised.
    """
    workspace = workspace.resolve()

    def _inside_workspace(path: str):
        """Resolve ``path`` against the workspace; return None if it escapes."""
        candidate = (workspace / path).resolve()
        try:
            # Component-wise containment check. A plain string-prefix test
            # would wrongly accept a sibling such as "<workspace>_backup",
            # because its textual path starts with the workspace path.
            candidate.relative_to(workspace)
        except ValueError:
            return None
        return candidate

    @tool
    def read_file(path: str) -> str:
        """Read the contents of a file from the workspace."""
        full = _inside_workspace(path)
        if full is None:
            return "Error: path outside workspace."
        if not full.exists():
            return f"Error: file not found: {path}"
        if not full.is_file():
            # read_text() on a directory would raise instead of informing
            # the agent.
            return f"Error: not a file: {path}"
        return full.read_text(encoding="utf-8", errors="replace")

    @tool
    def write_file(path: str, content: str) -> str:
        """Write content to a file in the workspace (creates parent dirs)."""
        full = _inside_workspace(path)
        if full is None:
            return "Error: path outside workspace."
        if full.is_dir():
            # write_text() cannot replace a directory; report it instead.
            return f"Error: path is a directory: {path}"
        full.parent.mkdir(parents=True, exist_ok=True)
        full.write_text(content, encoding="utf-8")
        return f"Written: {path}"

    @tool
    def list_files(path: str = ".") -> str:
        """List files and directories under a path in the workspace."""
        full = _inside_workspace(path)
        if full is None:
            return "Error: path outside workspace."
        if not full.exists():
            return f"Error: path not found: {path}"
        if not full.is_dir():
            # iterdir() only works on directories.
            return f"Error: not a directory: {path}"
        items = [
            f"{'[DIR]' if entry.is_dir() else '[FILE]'} "
            f"{entry.relative_to(workspace)}"
            for entry in sorted(full.iterdir())
        ]
        return "\n".join(items) if items else "(empty)"

    @tool
    def delete_file(path: str) -> str:
        """Delete a file or empty directory in the workspace."""
        full = _inside_workspace(path)
        if full is None:
            return "Error: path outside workspace."
        if not full.exists():
            return f"Error: not found: {path}"
        if full.is_file():
            full.unlink()
            return f"Deleted file: {path}"
        if full.is_dir():
            try:
                full.rmdir()
            except OSError:
                # rmdir() refuses non-empty directories; surface that as the
                # same error string used for other undeletable targets.
                return f"Error: {path} is neither a file nor empty directory."
            return f"Deleted empty directory: {path}"
        return f"Error: {path} is neither a file nor empty directory."

    return [read_file, write_file, list_files, delete_file]

def _build_system_message(workspace: Path) -> SystemMessage:
    """Compose the system prompt that introduces the workspace and its rules.

    Args:
        workspace: Workspace location interpolated into the prompt text.

    Returns:
        A ``SystemMessage`` whose content is the introduction, the
        "LEAN WORKSPACE RULES" section and a closing hint, concatenated.
    """
    # Introduction: who the assistant is and which tools it has.
    intro = (
        f"You are a helpful AI assistant. Your workspace is at {workspace}.\n"
        "You have file tools to read, write, list, and delete files in this workspace.\n"
        "You can search and process information safely using your integrated knowledge.\n\n"
    )

    # Housekeeping rules that keep the workspace free of leftover files.
    lean_rules = (
        "=== LEAN WORKSPACE RULES ===\n"
        "1. KEEP ONLY HIGH-VALUE FILES: Retain only final artifacts the user would want\n"
        "   (reports, generated scripts, datasets, or files the user explicitly asked for).\n"
        "2. AUTO-DELETE JUNK: Before finishing your response, use 'delete_file' to remove\n"
        "   all intermediate scratch files, .tmp files, raw HTML dumps, logs, or any file\n"
        "   that is NOT a final deliverable.\n"
        "3. OVERWRITE — NEVER DUPLICATE: If you need to update a file, overwrite it in place\n"
        "   using 'write_file' with the same path. Never create versioned copies (e.g.,\n"
        "   report_v1.txt, report_v2.txt) unless the user explicitly asks for a new version.\n\n"
    )

    closing = "When the user asks about their files or folder contents, use the file tools."

    prompt_text = intro + lean_rules + closing
    return SystemMessage(content=prompt_text)

async def run_agent(
    openrouter_key: str,
    workspace: Path,
    user_message: str,
    chat_history: List[Dict[str, str]],
    *,
    model: str = "openai/gpt-oss-20b:free",
) -> Dict[str, Any]:
    """Run one conversational turn of the file-tool ReAct agent.

    Args:
        openrouter_key: API key passed to the OpenRouter-hosted chat model.
        workspace: Directory the file tools are built for.
        user_message: The new user input for this turn.
        chat_history: Earlier turns as ``{"role": ..., "content": ...}``
            dicts. Only the ``"user"`` and ``"assistant"`` roles are
            replayed; entries with any other role are ignored.
        model: Model identifier sent to OpenRouter. Defaults to the value
            that was previously hard-coded.

    Returns:
        A dict with two keys:
        ``"agent_response"`` is the content of the last ``AIMessage`` with
        non-empty content, or ``""`` when there is none.
        ``"updated_chat_history"`` is the conversation as role/content dicts,
        built from the human and AI messages in the agent result.
    """
    tools = _make_file_tools(workspace)

    llm = ChatOpenAI(
        model=model,
        openai_api_key=openrouter_key,
        base_url="https://openrouter.ai/api/v1",
        default_headers={
            "HTTP-Referer": "https://huggingface.co/spaces/react-agent",
            "X-Title": "React Agent Console",
        },
    )

    agent = create_react_agent(
        model=llm,
        tools=tools,
    )

    messages = [_build_system_message(workspace)]

    for msg in chat_history:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if role == "user":
            messages.append(HumanMessage(content=content))
        elif role == "assistant":
            messages.append(AIMessage(content=content))

    messages.append(HumanMessage(content=user_message))

    result = await agent.ainvoke({"messages": messages})
    result_messages = result.get("messages", [])

    # The reply shown to the user is the most recent AI message that carries
    # content.
    agent_response = ""
    for msg in reversed(result_messages):
        if isinstance(msg, AIMessage) and msg.content:
            agent_response = msg.content
            break

    updated_chat_history = []
    for msg in result_messages:
        if isinstance(msg, HumanMessage):
            updated_chat_history.append({"role": "user", "content": msg.content})
        elif isinstance(msg, AIMessage) and msg.content:
            # AI messages with empty content (presumably intermediate
            # tool-call steps; confirm against the agent's output) are
            # skipped so they are not stored and replayed as blank
            # assistant turns on the next call.
            # NOTE(review): content is stored as-is; if a model returns
            # non-string content it will not match Dict[str, str] - confirm.
            updated_chat_history.append({"role": "assistant", "content": msg.content})

    return {
        "agent_response": agent_response,
        "updated_chat_history": updated_chat_history,
    }