# Spaces: Sleeping  (hosting-page status text captured with this file; not part of the program)
import sys
import os
import subprocess
import glob

# Put the project root (the parent of this script's directory) on sys.path
# *before* importing from `src`, so that `from src.providers import ...`
# resolves no matter which working directory the MCP host launches us from.
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _project_root not in sys.path:
    sys.path.insert(0, _project_root)

from dotenv import load_dotenv

# Standard MCP library
from mcp.server.fastmcp import FastMCP

# Reuse the existing GeminiBrain provider for consistency
from src.providers import GeminiBrain

# Load environment variables from the .env file at the project root.
load_dotenv(dotenv_path=os.path.join(_project_root, ".env"))

# The FastMCP server instance used by the entry point at the bottom of the file.
# NOTE(review): no `@mcp.tool()` registrations are visible in this view of the
# file — confirm the tool functions below are actually registered on `mcp`.
mcp = FastMCP("ContextBrain MR-Guardian")
# ─────────────────────────────────────────────
# TOOL 1 — Scan a folder for testable files
# ─────────────────────────────────────────────
def scan_folder(folder_path: str, extensions: str = ".py") -> str:
    """
    Scan a local folder and list all source files by extension (default: .py).

    Returns a structured list of files with their sizes — useful for deciding
    which files to generate tests for.

    Args:
        folder_path: Absolute path to the folder to scan.
        extensions: Comma-separated list of file extensions to include (e.g. '.py,.js').
            Blank entries (such as the one produced by a trailing comma) are ignored.

    Returns:
        A human-readable report with one entry per matching file (relative path,
        size in bytes, full path), or a message saying that the folder does not
        exist or that nothing matched.
    """
    if not os.path.isdir(folder_path):
        return f"β Folder not found: {folder_path}"

    # Drop blank entries: str.endswith("") is always True, so an empty
    # extension (from "" or ".py,") would silently match every file.
    suffixes = tuple(ext.strip() for ext in extensions.split(",") if ext.strip())

    found = []
    for root, dirs, files in os.walk(folder_path):
        # Prune in place so os.walk never descends into hidden directories,
        # Python caches, virtualenvs or node_modules.
        dirs[:] = [
            d for d in dirs
            if not d.startswith(".") and d not in ("__pycache__", "venv", "node_modules")
        ]
        for fname in files:
            # An empty suffix tuple matches nothing, which is what we want.
            if not fname.endswith(suffixes):
                continue
            full = os.path.join(root, fname)
            try:
                size = os.path.getsize(full)
            except OSError:
                # Broken symlink or file removed mid-scan: leave it out rather
                # than aborting the whole listing.
                continue
            found.append((os.path.relpath(full, folder_path), full, size))

    if not found:
        return f"No files with extensions {extensions} found in {folder_path}"

    lines = [f"π Folder: {folder_path}", f"Found {len(found)} file(s):\n"]
    for rel, full, size in sorted(found):
        lines.append(f" β’ {rel} ({size} bytes)")
        lines.append(f" β Full path: {full}")
    return "\n".join(lines)
# ─────────────────────────────────────────────
# TOOL 2 — Read a file (return full content)
# ─────────────────────────────────────────────
def read_file(filepath: str) -> str:
    """
    Read the full contents of any local file and return it as a string.

    Use this to inspect source files before generating tests for them.

    Args:
        filepath: Absolute path to the file to read.

    Returns:
        A header (path, line count, character count, separator) followed by the
        file content decoded as UTF-8 with undecodable bytes replaced, or an
        error message if the file is missing or cannot be read.
    """
    if not os.path.isfile(filepath):
        return f"β File not found: {filepath}"

    # Keep the try body to the only statements that can fail with an I/O error.
    try:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError as e:
        return f"β Error reading file: {e}"

    # Count newline-terminated lines, plus one for a final line that has no
    # trailing newline. (A bare `count("\n") + 1` over-counts by one for
    # newline-terminated files and reports 1 line for an empty file.)
    line_count = content.count("\n")
    if content and not content.endswith("\n"):
        line_count += 1

    return (
        f"π File: {filepath}\n"
        f"Lines: {line_count} | Size: {len(content)} chars\n"
        f"{'β' * 60}\n"
        f"{content}"
    )
# ─────────────────────────────────────────────
# TOOL 3 — Generate tests for a file (AI)
# ─────────────────────────────────────────────
def generate_local_tests(filepath: str, language: str = "python") -> str:
    """
    Use Context Brain (Gemini AI) to generate unit tests for a source file.

    Saves the test file adjacent to the source and returns a short preview.

    Args:
        filepath: Absolute path to the source file to generate tests for.
        language: Programming language ('python' or 'javascript'). Default: python.

    Returns:
        On success, a message with the saved test file path, the first 800
        characters of the generated tests and the total length. Otherwise an
        error message (missing API key, missing/unreadable source file, or a
        failure during generation).
    """
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        return "β Missing GOOGLE_API_KEY in environment or .env file."
    if not os.path.isfile(filepath):
        return f"β File not found: {filepath}"
    try:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except Exception as e:
        return f"β Error reading source file: {e}"

    prompt = f"""You are an elite Staff Software Development Engineer in Test (SDET).
Your task is to write highly robust, production-ready unit tests for the following {language} code.
Use popular frameworks (`pytest` for Python, `jest` for JS/Node).
SOURCE CODE ({os.path.basename(filepath)}):
{content}
REQUIREMENTS:
1. Cover both happy paths and edge cases.
2. Include realistic mock data where external dependencies exist.
3. Ensure the test code is immediately runnable without syntax errors.
4. RETURN ONLY THE RAW RUNNABLE TEST CODE.
5. Do NOT wrap in markdown code blocks. Do NOT include explanatory text.
STRICT RULES:
- For Python: only use `import pytest` and direct imports of the function under test.
- NEVER use unittest.mock, @patch, or MagicMock.
- DO NOT test functions that use cloud SDKs, databases, or file I/O.
"""
    # Tool boundary: any failure while talking to the model or writing the
    # file is reported back as text instead of propagating to the MCP host.
    try:
        brain = GeminiBrain(api_key)
        response = brain.model.generate_content(prompt)
        # The model may still wrap its answer in a markdown fence.
        test_content = _strip_code_fences(response.text)
        if not test_content:
            # Do not leave an empty test file next to the source.
            return "β AI Generation Failed: model returned no test code"

        # Determine output file name: test_<name> for Python, <stem>.test.js otherwise.
        dir_name = os.path.dirname(filepath)
        base_name = os.path.basename(filepath)
        if language.lower() == "python":
            test_filename = f"test_{base_name}"
        else:
            test_filename = f"{os.path.splitext(base_name)[0]}.test.js"
        test_path = os.path.join(dir_name, test_filename)

        with open(test_path, "w", encoding="utf-8") as f:
            f.write(test_content)

        preview = test_content[:800]
        return (
            f"β Tests generated and saved to: {test_path}\n\n"
            f"{'β' * 60}\n"
            f"PREVIEW (first 800 chars):\n{preview}\n"
            f"{'β' * 60}\n"
            f"Total generated: {len(test_content)} chars"
        )
    except Exception as e:
        return f"β AI Generation Failed: {e}"


def _strip_code_fences(text: str) -> str:
    """Return *text* without a surrounding markdown code fence, if it has one."""
    text = text.strip()
    if not text.startswith("```"):
        # Unfenced output is returned untouched, so code that merely begins
        # with the word "python" or "javascript" is never truncated.
        return text
    # The opening fence line may carry a language tag (```python): drop the whole line.
    body = text.partition("\n")[2]
    # Keep only what precedes the last fence, discarding the closing fence and
    # anything the model appended after it.
    body = body.rsplit("```", 1)[0].strip()
    # Some replies put the language tag on its own line after a bare fence.
    first_line, _, rest = body.partition("\n")
    if first_line.strip().lower() in ("python", "javascript"):
        body = rest.strip()
    return body
# ─────────────────────────────────────────────
# TOOL 4 — Execute tests and return full logs
# ─────────────────────────────────────────────
def execute_tests(test_command: str, working_dir: str = "", timeout: int = 120) -> str:
    """
    Run a test command locally and return the full execution logs.

    Examples: 'pytest -v', 'pytest tests/test_calculator.py -v', 'npm test'.

    Args:
        test_command: The shell command to run (e.g. 'pytest -v').
        working_dir: Optional absolute path to run the command from. Defaults to CWD.
        timeout: Maximum run time in seconds before the command is killed. Default: 120.

    Returns:
        A report with the command, directory, exit code (PASS when it is 0,
        FAIL otherwise), captured stdout and, if any, stderr. An error message
        is returned instead when the command is blank, the working directory
        does not exist, the command times out, or it cannot be started.
    """
    # A blank command would run an empty shell, exit 0 and be reported as PASS.
    if not test_command or not test_command.strip():
        return "β Execution Error: no test command provided."
    # Refuse an invalid directory instead of silently running in the CWD,
    # where the command could exercise an unrelated project.
    if working_dir and not os.path.isdir(working_dir):
        return f"β Working directory not found: {working_dir}"
    cwd = working_dir or None

    try:
        # NOTE(security): shell=True executes test_command verbatim through the
        # shell. That is this tool's purpose (commands like 'npm test' or ones
        # using shell operators), so only expose it to trusted callers.
        result = subprocess.run(
            test_command,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",  # undecodable bytes in test output must not abort the report
            timeout=timeout,
            cwd=cwd,
        )
    except subprocess.TimeoutExpired:
        return f"β Timeout: '{test_command}' ran for over {timeout} seconds and was killed."
    except Exception as e:
        return f"β Execution Error: {e}"

    status = "β PASS" if result.returncode == 0 else "β FAIL"
    output = (
        f"--- Test Execution Complete ---\n"
        f"Command : {test_command}\n"
        f"Directory: {cwd or os.getcwd()}\n"
        f"Exit Code: {result.returncode} ({status})\n\n"
        f"STDOUT:\n{result.stdout or '(no output)'}\n"
    )
    if result.stderr:
        output += f"\nSTDERR:\n{result.stderr}\n"
    return output
if __name__ == "__main__":
    # MCP_TRANSPORT selects the transport: "sse" serves over HTTP (cloud/remote),
    # any other value falls back to stdio (local).
    selected_transport = os.getenv("MCP_TRANSPORT", "stdio").lower()

    if selected_transport != "sse":
        print("π ContextBrain MR-Guardian MCP Server starting on stdio...", file=sys.stderr)
        mcp.run(transport="stdio")
    else:
        # Imported lazily so that stdio launches never load the SSE/uvicorn stack.
        from mcp.server.sse import SseServerTransport
        import uvicorn

        bind_host = "0.0.0.0"
        bind_port = int(os.getenv("PORT", "7860"))
        print("π ContextBrain MR-Guardian MCP Server starting on SSE...", file=sys.stderr)
        print(f"π Manual SSE Launch: Binding to {bind_host}:{bind_port}", file=sys.stderr)

        sse = SseServerTransport("/messages")

        async def asgi_app(scope, receive, send):
            """Minimal ASGI app: /sse opens the stream, /messages* takes posts, else 404."""
            # Non-HTTP scopes are ignored.
            if scope["type"] != "http":
                return

            request_path = scope["path"]
            if request_path == "/sse":
                # Run the underlying MCP server over the streams of this SSE connection.
                async with sse.connect_sse(scope, receive, send) as (reader, writer):
                    await mcp._mcp_server.run(
                        reader,
                        writer,
                        mcp._mcp_server.create_initialization_options(),
                    )
                return

            if request_path.startswith("/messages"):
                await sse.handle_post_message(scope, receive, send)
                return

            # Any other path: plain-text 404.
            await send({
                "type": "http.response.start",
                "status": 404,
                "headers": [(b"content-type", b"text/plain")],
            })
            await send({
                "type": "http.response.body",
                "body": b"Not Found",
            })

        uvicorn.run(asgi_app, host=bind_host, port=bind_port)