"""ContextBrain MR-Guardian MCP server.

Exposes four tools over the Model Context Protocol: scanning a folder for
source files, reading a file, generating unit tests with Gemini, and running
a test command. Runs over stdio by default, or SSE when MCP_TRANSPORT=sse.
"""
import sys
import os
import subprocess
import glob

# Fix: ensure the project root is on sys.path so 'from src.providers' resolves
# correctly regardless of how the MCP host process launches this script.
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _project_root not in sys.path:
    sys.path.insert(0, _project_root)

from dotenv import load_dotenv
# Use standard MCP library
from mcp.server.fastmcp import FastMCP
# We reuse our existing GeminiBrain for consistency
from src.providers import GeminiBrain

load_dotenv(dotenv_path=os.path.join(_project_root, ".env"))

# Initialize the FastMCP server
mcp = FastMCP("ContextBrain MR-Guardian")

# Directory names pruned from folder scans (dot-prefixed dirs are pruned too).
_SKIPPED_DIRS = frozenset({"__pycache__", "venv", "node_modules"})

# Language names a model sometimes emits on a line of its own before the code.
_LANGUAGE_TAGS = ("python", "javascript")

# Number of characters of generated test code echoed back to the caller.
_PREVIEW_CHARS = 800

# Hard limit for a single test command, in seconds.
_TEST_TIMEOUT_SECONDS = 120


# ─────────────────────────────────────────────
# TOOL 1 — Scan a folder for testable files
# ─────────────────────────────────────────────
@mcp.tool()
def scan_folder(folder_path: str, extensions: str = ".py") -> str:
    """
    Scan a local folder and list all source files by extension (default: .py).
    Returns a structured list of files with their sizes — useful for deciding
    which files to generate tests for.

    Args:
        folder_path: Absolute path to the folder to scan.
        extensions: Comma-separated list of file extensions to include (e.g. '.py,.js').
    """
    if not os.path.isdir(folder_path):
        return f"❌ Folder not found: {folder_path}"

    # Drop blank tokens: str.endswith("") is always True, so a trailing comma
    # or an empty string would otherwise match every file in the tree.
    suffixes = tuple(ext.strip() for ext in extensions.split(",") if ext.strip())
    if not suffixes:
        return "❌ No file extensions given (expected e.g. '.py,.js')."

    found = []
    for root, dirs, files in os.walk(folder_path):
        # Prune in place so os.walk does not descend into hidden/cache dirs.
        dirs[:] = [
            d for d in dirs
            if not d.startswith(".") and d not in _SKIPPED_DIRS
        ]
        for fname in files:
            if not fname.endswith(suffixes):
                continue
            full = os.path.join(root, fname)
            try:
                size = os.path.getsize(full)
            except OSError:
                # Broken symlink or file removed mid-scan: skip it rather
                # than abort the whole listing.
                continue
            found.append((os.path.relpath(full, folder_path), full, size))

    if not found:
        return f"No files with extensions {extensions} found in {folder_path}"

    lines = [f"📁 Folder: {folder_path}", f"Found {len(found)} file(s):\n"]
    for rel, full, size in sorted(found):
        lines.append(f" • {rel} ({size} bytes)")
        lines.append(f" → Full path: {full}")
    return "\n".join(lines)


# ─────────────────────────────────────────────
# TOOL 2 — Read a file (return full content)
# ─────────────────────────────────────────────
@mcp.tool()
def read_file(filepath: str) -> str:
    """
    Read the full contents of any local file and return it as a string.
    Use this to inspect source files before generating tests for them.

    Args:
        filepath: Absolute path to the file to read.
    """
    if not os.path.isfile(filepath):
        return f"❌ File not found: {filepath}"

    try:
        # errors="replace" keeps undecodable bytes from raising.
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError as e:
        return f"❌ Error reading file: {e}"

    # Count newline-terminated lines, plus one for a final unterminated line.
    # (A plain count("\n") + 1 reports 1 for an empty file and over-counts by
    # one whenever the file ends with a newline.)
    line_count = content.count("\n")
    if content and not content.endswith("\n"):
        line_count += 1

    return (
        f"📄 File: {filepath}\n"
        f"Lines: {line_count} | Size: {len(content)} chars\n"
        f"{'─' * 60}\n"
        f"{content}"
    )


def _build_test_prompt(language: str, source_name: str, content: str) -> str:
    """Return the test-generation prompt for one source file."""
    return f"""You are an elite Staff Software Development Engineer in Test (SDET).
Your task is to write highly robust, production-ready unit tests for the following {language} code.
Use popular frameworks (`pytest` for Python, `jest` for JS/Node).

SOURCE CODE ({source_name}):
{content}

REQUIREMENTS:
1. Cover both happy paths and edge cases.
2. Include realistic mock data where external dependencies exist.
3. Ensure the test code is immediately runnable without syntax errors.
4. RETURN ONLY THE RAW RUNNABLE TEST CODE.
5. Do NOT wrap in markdown code blocks. Do NOT include explanatory text.

STRICT RULES:
- For Python: only use `import pytest` and direct imports of the function under test.
- NEVER use unittest.mock, @patch, or MagicMock.
- DO NOT test functions that use cloud SDKs, databases, or file I/O.
"""


def _strip_code_fences(text: str) -> str:
    """Remove a wrapping markdown fence and a stray language-tag line."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (the fence plus any tag on that line),
        # then everything from the last closing fence onwards.
        cleaned = cleaned.split("\n", 1)[-1]
        cleaned = cleaned.rsplit("```", 1)[0].strip()

    # Only drop a language tag when it is a whole line by itself. A bare prefix
    # match would truncate real code that merely starts with "python..." or
    # "javascript..." (e.g. an identifier such as `python_version`).
    first_line, _, remainder = cleaned.partition("\n")
    if first_line.strip() in _LANGUAGE_TAGS:
        cleaned = remainder
    return cleaned.strip()


def _test_path_for(filepath: str, language: str) -> str:
    """Return the path of the test file written next to *filepath*."""
    dir_name = os.path.dirname(filepath)
    base_name = os.path.basename(filepath)
    if language.lower() == "python":
        test_filename = f"test_{base_name}"
    else:
        test_filename = f"{os.path.splitext(base_name)[0]}.test.js"
    return os.path.join(dir_name, test_filename)


# ─────────────────────────────────────────────
# TOOL 3 — Generate tests for a file (AI)
# ─────────────────────────────────────────────
@mcp.tool()
def generate_local_tests(filepath: str, language: str = "python") -> str:
    """
    Use Context Brain (Gemini AI) to generate unit tests for a source file.
    Saves the test file adjacent to the source and returns the content.
    An existing test file with the same name is overwritten.

    Args:
        filepath: Absolute path to the source file to generate tests for.
        language: Programming language ('python' or 'javascript'). Default: python.
    """
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        return "❌ Missing GOOGLE_API_KEY in environment or .env file."

    if not os.path.isfile(filepath):
        return f"❌ File not found: {filepath}"

    try:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except Exception as e:
        return f"❌ Error reading source file: {e}"

    brain = GeminiBrain(api_key)
    prompt = _build_test_prompt(language, os.path.basename(filepath), content)

    # Broad catch is deliberate: this is the tool boundary, and both the model
    # call and the file write should surface as a readable error string.
    try:
        response = brain.model.generate_content(prompt)
        # The model may still add markdown fences despite the prompt.
        test_content = _strip_code_fences(response.text)
        if not test_content:
            # Do not write an empty test file and report it as a success.
            return "❌ AI Generation Failed: the model returned no test code."

        test_path = _test_path_for(filepath, language)
        with open(test_path, "w", encoding="utf-8") as f:
            f.write(test_content)

        preview = test_content[:_PREVIEW_CHARS]
        return (
            f"✅ Tests generated and saved to: {test_path}\n\n"
            f"{'─' * 60}\n"
            f"PREVIEW (first {_PREVIEW_CHARS} chars):\n{preview}\n"
            f"{'─' * 60}\n"
            f"Total generated: {len(test_content)} chars"
        )
    except Exception as e:
        return f"❌ AI Generation Failed: {e}"


# ─────────────────────────────────────────────
# TOOL 4 — Execute tests and return full logs
# ─────────────────────────────────────────────
@mcp.tool()
def execute_tests(test_command: str, working_dir: str = "") -> str:
    """
    Run a test command locally and return the full execution logs.
    Examples: 'pytest -v', 'pytest tests/test_calculator.py -v', 'npm test'.

    Args:
        test_command: The shell command to run (e.g. 'pytest -v').
        working_dir: Optional absolute path to run the command from. Defaults to CWD.
    """
    if not test_command.strip():
        # An empty command exits 0 under a shell and would be reported as PASS.
        return "❌ Execution Error: test_command is empty."

    if working_dir and not os.path.isdir(working_dir):
        # Falling back to the server's CWD here would silently run the tests
        # against the wrong project.
        return f"❌ Working directory not found: {working_dir}"
    cwd = working_dir or None

    try:
        # NOTE(review): shell=True executes test_command verbatim through the
        # system shell. That is this tool's contract (it accepts arbitrary
        # commands such as 'npm test'), so only expose it to trusted clients.
        result = subprocess.run(
            test_command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=_TEST_TIMEOUT_SECONDS,
            cwd=cwd,
        )
    except subprocess.TimeoutExpired:
        return (
            f"❌ Timeout: '{test_command}' ran for over "
            f"{_TEST_TIMEOUT_SECONDS} seconds and was killed."
        )
    except Exception as e:
        return f"❌ Execution Error: {e}"

    status = "✅ PASS" if result.returncode == 0 else "❌ FAIL"
    output = (
        f"--- Test Execution Complete ---\n"
        f"Command : {test_command}\n"
        f"Directory: {cwd or os.getcwd()}\n"
        f"Exit Code: {result.returncode} ({status})\n\n"
        f"STDOUT:\n{result.stdout or '(no output)'}\n"
    )
    if result.stderr:
        output += f"\nSTDERR:\n{result.stderr}\n"
    return output


def _run_sse_server(host: str, port: int) -> None:
    """Serve the MCP server over SSE with a minimal hand-rolled ASGI app."""
    # Imported lazily so stdio mode does not require uvicorn.
    from mcp.server.sse import SseServerTransport
    import uvicorn

    sse = SseServerTransport("/messages")

    async def app(scope, receive, send):
        # Non-HTTP scopes (e.g. lifespan) are intentionally left unhandled.
        if scope["type"] != "http":
            return

        path = scope["path"]
        if path == "/sse":
            async with sse.connect_sse(scope, receive, send) as (read_stream, write_stream):
                await mcp._mcp_server.run(
                    read_stream,
                    write_stream,
                    mcp._mcp_server.create_initialization_options(),
                )
        elif path.startswith("/messages"):
            await sse.handle_post_message(scope, receive, send)
        else:
            await send({
                "type": "http.response.start",
                "status": 404,
                "headers": [(b"content-type", b"text/plain")],
            })
            await send({
                "type": "http.response.body",
                "body": b"Not Found",
            })

    uvicorn.run(app, host=host, port=port)


def main() -> None:
    """Start the server on the transport selected by MCP_TRANSPORT."""
    # Support both stdio (local) and sse (cloud/remote) transports
    transport = os.getenv("MCP_TRANSPORT", "stdio").lower()

    if transport == "sse":
        port = int(os.getenv("PORT", "7860"))
        host = "0.0.0.0"
        # Status messages go to stderr so they never mix with protocol output.
        print("🚀 ContextBrain MR-Guardian MCP Server starting on SSE...", file=sys.stderr)
        print(f"🚀 Manual SSE Launch: Binding to {host}:{port}", file=sys.stderr)
        _run_sse_server(host, port)
    else:
        print("🚀 ContextBrain MR-Guardian MCP Server starting on stdio...", file=sys.stderr)
        mcp.run(transport="stdio")


if __name__ == "__main__":
    main()