Spaces:
Paused
Paused
| from mcp.server.fastmcp import FastMCP | |
| import httpx | |
| import os | |
| import logging | |
| import json | |
| import subprocess | |
| import asyncio | |
| import re | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("mcp-analysis") | |
| mcp = FastMCP("analysis") | |
| API_BASE_URL = "https://analysis.googleapis.com/v1alpha" | |
| class AnalysisClient: | |
| def __init__(self, api_key: str): | |
| self.api_key = api_key | |
| self.base_url = API_BASE_URL | |
| self.headers = { | |
| "Content-Type": "application/json", | |
| "x-goog-api-key": self.api_key | |
| } | |
| async def _request(self, method: str, path: str, **kwargs): | |
| # Handle session_id prefixing | |
| if path.startswith("/sessions/sessions/"): | |
| path = path.replace("/sessions/sessions/", "/sessions/") | |
| url = f"{self.base_url}{path}" | |
| async with httpx.AsyncClient() as client: | |
| response = await client.request(method, url, headers=self.headers, **kwargs) | |
| if response.status_code not in (200, 204): | |
| logger.error(f"Jules API Error ({method} {path}): {response.status_code} - {response.text}") | |
| response.raise_for_status() | |
| return response.json() if response.text else {} | |
| async def list_sources(self): | |
| return await self._request("GET", "/sources") | |
| async def get_source(self, name: str): | |
| if not name.startswith("sources/"): name = f"sources/{name}" | |
| return await self._request("GET", f"/{name}") | |
| async def create_session(self, prompt, source, branch="main", title="API Session", automation="AUTO_CREATE_PR"): | |
| # Format source for API if it is just owner/repo | |
| if "/" in source and "sources/github/" not in source: | |
| source = f"sources/github/{source}" | |
| payload = { | |
| "prompt": prompt, | |
| "sourceContext": { | |
| "source": source, | |
| "githubRepoContext": {"startingBranch": branch} | |
| } | |
| } | |
| if title: payload["title"] = title | |
| if automation: payload["automationMode"] = automation | |
| logger.info(f"Creating session with payload: {json.dumps(payload)}") | |
| try: | |
| return await self._request("POST", "/sessions", json=payload) | |
| except Exception as e: | |
| err_msg = str(e).lower() | |
| if "maximum number of sessions" in err_msg or "max sessions reached" in err_msg or "400" in err_msg: | |
| logger.warning(f"Possible quota/session limit error detected: {e}. Triggering early key rotation.") | |
| force_rotate_key() | |
| return {"error": f"Session creation failed: {e}. API key has been rotated. Please retry."} | |
| logger.warning(f"REST API create_session failed: {e}. Falling back to CLI.") | |
| cli_source = source.replace("sources/github/", "") | |
| return await self.create_session_cli(prompt, cli_source, branch, title) | |
| async def create_session_cli(self, prompt, source, branch="main", title="API Session"): | |
| """Fallback to analysis CLI for session creation.""" | |
| logger.info(f"Using analysis CLI to create session for {source}") | |
| cmd = [ | |
| "analysis", "new", | |
| "--repo", source, | |
| prompt | |
| ] | |
| env = os.environ.copy() | |
| env["JULES_API_KEY"] = self.api_key | |
| npm_bin = subprocess.check_output(["npm", "config", "get", "prefix"]).decode().strip() + "/bin" | |
| env["PATH"] = f"{env.get('PATH', '')}:{npm_bin}" | |
| try: | |
| process = await asyncio.create_subprocess_exec( | |
| *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env | |
| ) | |
| stdout, stderr = await process.communicate() | |
| output = stdout.decode().strip() | |
| match = re.search(r'sessions/(\d+)', output) | |
| if match: | |
| s_id = match.group(0) | |
| return {"name": s_id, "id": match.group(1)} | |
| return {"output": output} | |
| except: | |
| return {"error": "CLI failed"} | |
| async def get_session(self, session_id): | |
| session_id = str(session_id).strip() | |
| if not session_id.startswith("sessions/"): session_id = f"sessions/{session_id}" | |
| return await self._request("GET", f"/{session_id}") | |
| async def list_sessions(self, page_size=30): | |
| return await self._request("GET", "/sessions", params={"pageSize": page_size}) | |
| async def send_message(self, session_id, prompt): | |
| session_id = str(session_id).strip() | |
| if not session_id.startswith("sessions/"): session_id = f"sessions/{session_id}" | |
| return await self._request("POST", f"/{session_id}:sendMessage", json={"prompt": prompt}) | |
| async def approve_plan(self, session_id): | |
| session_id = str(session_id).strip() | |
| if not session_id.startswith("sessions/"): session_id = f"sessions/{session_id}" | |
| return await self._request("POST", f"/{session_id}:approvePlan", json={}) | |
| async def get_activity(self, name: str): | |
| name = str(name).strip() | |
| return await self._request("GET", f"/{name}") | |
| async def list_activities(self, session_id, page_size=50): | |
| session_id = str(session_id).strip() | |
| if not session_id.startswith("sessions/"): session_id = f"sessions/{session_id}" | |
| return await self._request("GET", f"/{session_id}/activities", params={"pageSize": page_size}) | |
| # API Key Rotation State | |
| REQUEST_COUNT = 0 | |
| CURRENT_KEY_INDEX = 0 | |
| def force_rotate_key(): | |
| """Immediately switch to the next available API key and reset counter.""" | |
| global REQUEST_COUNT, CURRENT_KEY_INDEX | |
| keys = [os.environ.get("JULES_API_KEY"), os.environ.get("JULES_API_KEY_FALLBACK")] | |
| keys = [k for k in keys if k] | |
| if len(keys) > 1: | |
| CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(keys) | |
| REQUEST_COUNT = 0 | |
| logger.info(f"FORCE ROTATION: Switched to Jules API Key index {CURRENT_KEY_INDEX}") | |
| def get_rotated_api_key(): | |
| global REQUEST_COUNT, CURRENT_KEY_INDEX | |
| keys = [os.environ.get("JULES_API_KEY"), os.environ.get("JULES_API_KEY_FALLBACK")] | |
| # Filter out None or empty keys | |
| keys = [k for k in keys if k] | |
| if not keys: | |
| logger.error("No Jules API keys found in environment.") | |
| return None | |
| key = keys[CURRENT_KEY_INDEX] | |
| REQUEST_COUNT += 1 | |
| if REQUEST_COUNT >= 3: | |
| REQUEST_COUNT = 0 | |
| old_index = CURRENT_KEY_INDEX | |
| CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(keys) | |
| if len(keys) > 1 and old_index != CURRENT_KEY_INDEX: | |
| logger.info(f"Requests per key reached. Rotating Jules API Key from index {old_index} to {CURRENT_KEY_INDEX}") | |
| return key | |
| def get_client(): | |
| key = get_rotated_api_key() | |
| return AnalysisClient(key) | |
| async def list_sources(): | |
| """List all available sources (repositories).""" | |
| return await get_client().list_sources() | |
| async def get_source(name: str): | |
| """ | |
| Get details about a specific source. | |
| :param name: The source name (e.g., 'sources/github/owner/repo'). | |
| """ | |
| return await get_client().get_source(name) | |
| async def create_session( | |
| prompt: str, | |
| source: str, | |
| branch: str = "main", | |
| title: str = "API Session", | |
| automation_mode: str = "AUTO_CREATE_PR" | |
| ): | |
| """ | |
| Initialize a new autonomous coding session for a repository. | |
| Args: | |
| prompt: The implementation task or instructions for Jules. | |
| source: Source repository path (e.g., 'sources/github/owner/repo'). | |
| branch: The branch to use. | |
| title: Title for the session. | |
| automation_mode: Defaults to 'AUTO_CREATE_PR'. | |
| """ | |
| return await get_client().create_session(prompt, source, branch, title, automation_mode) | |
| async def get_session(session_id: str): | |
| """ | |
| Get the current status and details of a session. | |
| :param session_id: The session ID (e.g., 'sessions/123'). | |
| """ | |
| return await get_client().get_session(session_id) | |
| async def list_sessions(page_size: int = 30): | |
| """List recent Jules sessions.""" | |
| return await get_client().list_sessions(page_size) | |
| async def send_message(session_id: str, prompt: str): | |
| """ | |
| Send a message or feedback to an existing session. | |
| :param session_id: The session ID (e.g., 'sessions/123'). | |
| :param prompt: The message/instruction text. | |
| """ | |
| return await get_client().send_message(session_id, prompt) | |
| async def approve_plan(session_id: str): | |
| """ | |
| Approve the implementation plan for a session. | |
| :param session_id: The session ID. | |
| """ | |
| return await get_client().approve_plan(session_id) | |
| async def get_activity(name: str): | |
| """ | |
| Get details about a specific activity. | |
| :param name: Activity resource name (e.g., 'sessions/123/activities/456'). | |
| """ | |
| return await get_client().get_activity(name) | |
| async def list_activities(session_id: str, page_size: int = 50): | |
| """ | |
| List activities for a specific session. | |
| :param session_id: The session ID. | |
| :param page_size: Number of activities to return. | |
| """ | |
| return await get_client().list_activities(session_id, page_size) | |
| if __name__ == "__main__": mcp.run() | |