File size: 3,705 Bytes
a66d4bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import asyncio
import logging
from pathlib import Path
from dotenv import load_dotenv, find_dotenv
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel
from agents.mcp import MCPServerStdio
from agents.model_settings import ModelSettings

logger = logging.getLogger(__name__)


class FileSystemAgent:
    """
    File System Agent using the OpenAI Agent SDK with an MCP Filesystem Server.

    Each call to :meth:`run` spawns the filesystem MCP server via ``npx``
    (stdio transport), wires an agent to it, executes the query, and tears
    the server down again. ``cache_tools_list=True`` means the tool list is
    fetched only once per *server lifetime* (i.e. per ``run`` call), not once
    per class instance.

    Requires ``OPENROUTER_API_KEY`` in the environment (load dotenv before
    constructing) and ``npx`` on PATH.
    """

    def __init__(
        self,
        allowed_dirs: list[str] | None = None,
        model: str = "arcee-ai/trinity-large-preview:free",
    ):
        """
        Args:
            allowed_dirs: Directories the MCP filesystem server may access.
                Defaults to the repository root (three levels up from this file).
            model: OpenRouter model identifier to use for the agent.
        """
        self.allowed_dirs = allowed_dirs or [str(Path(__file__).parent.parent.parent)]
        self.model = model
        # OpenRouter exposes an OpenAI-compatible API; the async client is
        # created once and reused across run() calls.
        self._client = AsyncOpenAI(
            api_key=os.getenv("OPENROUTER_API_KEY"),
            base_url="https://openrouter.ai/api/v1",
            timeout=30.0,
        )

    def _create_mcp_server(self) -> MCPServerStdio:
        """Create the Filesystem MCP server (stdio transport), not yet started."""
        return MCPServerStdio(
            name="Filesystem Server",
            params={
                "command": "npx",
                "args": [
                    "-y",
                    "@modelcontextprotocol/server-filesystem",
                    *self.allowed_dirs,
                ],
            },
            # Cache tools list so list_tools() is called only once per session
            cache_tools_list=True,
            # npx can be slow to download/start; increase from default 5s
            client_session_timeout_seconds=45,
        )

    def _create_agent(self, mcp_server: MCPServerStdio) -> Agent:
        """Create the AI agent wired to the given (connected) MCP server."""
        return Agent(
            name="File System Agent",
            instructions=(
                "You are a filesystem assistant. You can read, write, edit, delete, "
                "move, search, and list files and directories using the available MCP tools. "
                "When the user asks you to perform a file operation, use the appropriate tool. "
                "Always confirm what you did after completing an operation."
            ),
            mcp_servers=[mcp_server],
            model=OpenAIChatCompletionsModel(
                model=self.model,
                openai_client=self._client,
            ),
            # Let the model decide when to invoke filesystem tools.
            model_settings=ModelSettings(tool_choice="auto"),
        )

    async def run(self, query: str) -> str:
        """
        Run a single query against the filesystem agent.

        Spawns the MCP server, creates the agent, runs the query, then
        cleans up (the ``async with`` closes the server even on error).

        Args:
            query: Natural-language request for the agent.

        Returns:
            The agent's final textual output.
        """
        mcp_server = self._create_mcp_server()

        async with mcp_server:
            agent = self._create_agent(mcp_server)
            logger.info("Filesystem MCP server connected, agent ready")

            result = await Runner.run(agent, input=query)
            logger.info("Agent finished")
            return result.final_output


# --------------- CLI entry point ---------------

async def main():
    """CLI demo: build a filesystem agent and run one example query."""
    fs_agent = FileSystemAgent(
        allowed_dirs=[r"D:\deploy\chatbot"],
        model="arcee-ai/trinity-large-preview:free",
    )
    # Example query
    answer = await fs_agent.run(
        "read  the file in the src/orchestrator_agents directory with name rag_agent.py and summarize its content"
    )
    print("Agent Response:\n", answer)


if __name__ == "__main__":
    # Load environment variables (OPENROUTER_API_KEY) before the agent
    # constructor reads them, then drive the async entry point.
    load_dotenv(find_dotenv())
    asyncio.run(main())