AI-Package-Doctor / src /combined_server.py
Yash030's picture
Solved OpenRouter Model Inavailability Problem
2e3671b
"""
Combined Server for AI-Package-Doctor.
Runs both the ADK Web UI and the MCP Server on the same FastAPI app.
"""
import os
import sys
import asyncio
# CRITICAL: Set event loop policy BEFORE any other imports
# Fix for Playwright on Windows with nest_asyncio
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Add project root to sys.path to allow imports from src
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import uvicorn
import nest_asyncio
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
import mcp.types as types
# ADK Imports
from google.adk.cli.adk_web_server import (
AdkWebServer, BaseAgentLoader, EvalSetsManager, EvalSetResultsManager,
BaseCredentialService
)
from google.adk.artifacts import FileArtifactService
# Import concrete implementations
from google.adk.evaluation.local_eval_sets_manager import LocalEvalSetsManager
from google.adk.evaluation.local_eval_set_results_manager import LocalEvalSetResultsManager
from src.config import get_session_service, get_memory_service
from src.agents import create_root_agent
from src.utils import logger
from typing import Optional, Any
# Apply nest_asyncio
nest_asyncio.apply()
# --- 1. ADK Setup Classes ---
class SingleAgentLoader(BaseAgentLoader):
"""Custom loader that serves our single root agent."""
def __init__(self, agent):
self.agent = agent
self.agent_name = "package_conflict_resolver"
def list_agents(self) -> list[str]:
return [self.agent_name]
def load_agent(self, agent_name: str):
if agent_name == self.agent_name:
return self.agent
raise ValueError(f"Agent {agent_name} not found")
class LocalCredentialService(BaseCredentialService):
"""Simple credential service implementation."""
def __init__(self, base_dir: str):
self.base_dir = base_dir
os.makedirs(base_dir, exist_ok=True)
def load_credential(self, auth_config: Any, callback_context: Any) -> Optional[Any]:
return None
def save_credential(self, auth_config: Any, callback_context: Any) -> None:
pass
# --- 2. Initialize Services & Agent ---
logger.info("🌐 Initializing Services...")
session_service = get_session_service()
memory_service = get_memory_service()
data_dir = os.path.abspath("data")
os.makedirs(data_dir, exist_ok=True)
artifact_service = FileArtifactService(root_dir=os.path.join(data_dir, "artifacts"))
credential_service = LocalCredentialService(base_dir=os.path.join(data_dir, "credentials"))
# Use concrete managers with correct arguments
eval_sets_manager = LocalEvalSetsManager(agents_dir=data_dir)
eval_set_results_manager = LocalEvalSetResultsManager(agents_dir=data_dir)
logger.info("πŸ€– Creating Root Agent...")
root_agent = create_root_agent()
agent_loader = SingleAgentLoader(root_agent)
# --- 3. Create ADK Web App ---
logger.info("πŸš€ Creating ADK Web Server...")
adk_server = AdkWebServer(
agent_loader=agent_loader,
session_service=session_service,
memory_service=memory_service,
artifact_service=artifact_service,
credential_service=credential_service,
eval_sets_manager=eval_sets_manager,
eval_set_results_manager=eval_set_results_manager,
agents_dir=os.path.abspath("src")
)
# Calculate web_assets_dir dynamically
import google.adk.cli
web_assets_dir = os.path.join(os.path.dirname(google.adk.cli.__file__), "browser")
logger.info(f"πŸ“‚ Serving Web UI from: {web_assets_dir}")
# This is the main FastAPI app
app = adk_server.get_fast_api_app(web_assets_dir=web_assets_dir)
# --- 4. Create MCP Server (Standard Implementation) ---
logger.info("πŸ”Œ Creating MCP Server...")
mcp_server = Server("AI-Package-Doctor")
@mcp_server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
return [
types.Tool(
name="solve_dependency_issue",
description="Analyzes and resolves Python dependency conflicts based on a description.",
inputSchema={
"type": "object",
"properties": {
"issue_description": {
"type": "string",
"description": "A detailed description of the dependency problem, error logs, or requirements.txt content."
}
},
"required": ["issue_description"]
}
)
]
@mcp_server.call_tool()
async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
logger.info(f"πŸ”§ Tool called: {name} with arguments: {arguments}")
try:
if name == "solve_dependency_issue":
issue_description = arguments.get("issue_description") if arguments else None
if not issue_description:
error_msg = "Missing issue_description parameter"
logger.error(f"❌ {error_msg}")
return [types.TextContent(type="text", text=f"Error: {error_msg}")]
from google.adk import Runner
from google.genai import types as genai_types
import uuid
session_id = f"mcp-session-{uuid.uuid4()}"
logger.info(f"βœ… Processing tool call (Session: {session_id})")
logger.info(f"πŸ“ Issue description: {issue_description[:100]}...")
try:
# Create session
await session_service.create_session(
session_id=session_id,
user_id="mcp_user",
app_name="package_conflict_resolver"
)
logger.info(f"βœ… Session created: {session_id}")
runner = Runner(
agent=root_agent,
app_name="package_conflict_resolver",
session_service=session_service
)
user_msg = genai_types.Content(
role="user",
parts=[genai_types.Part.from_text(text=issue_description)]
)
response_text = ""
# Run agent and collect response
logger.info("πŸ€– Running agent...")
response_generator = runner.run(
session_id=session_id,
user_id="mcp_user",
new_message=user_msg
)
for event in response_generator:
# Log event author for debugging
author = getattr(event, 'author', 'unknown')
logger.info(f"πŸ“¨ Event received from: {author}")
# FILTER: Only return output from the final agent (Code_Surgeon_Agent)
if author == "Code_Surgeon_Agent":
if hasattr(event, 'content') and event.content and hasattr(event.content, 'parts'):
if event.content.parts:
text = event.content.parts[0].text
if text and text != "None":
response_text += text
elif hasattr(event, 'text'):
response_text += event.text
elif isinstance(event, str):
response_text += event
logger.info(f"βœ… Agent completed. Response length: {len(response_text)} chars")
if not response_text:
response_text = "No response generated from agent. Please check server logs."
return [types.TextContent(type="text", text=response_text)]
except Exception as e:
error_msg = f"Error running agent: {str(e)}"
logger.error(f"❌ {error_msg}", exc_info=True)
return [types.TextContent(type="text", text=f"Error: {error_msg}")]
error_msg = f"Unknown tool: {name}"
logger.error(f"❌ {error_msg}")
return [types.TextContent(type="text", text=f"Error: {error_msg}")]
except Exception as e:
error_msg = f"Unexpected error in tool handler: {str(e)}"
logger.error(f"❌ {error_msg}", exc_info=True)
return [types.TextContent(type="text", text=f"Error: {error_msg}")]
# --- 5. Mount MCP SSE Endpoint ---
# We need to manage the SSE transport manually using raw ASGI routes
sse_transport = SseServerTransport("/mcp/messages")
async def handle_sse(request: Request):
"""
Handler for SSE endpoint.
Returns an ASGI app that manages the connection.
"""
async def sse_asgi_app(scope, receive, send):
async with sse_transport.connect_sse(scope, receive, send) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options()
)
return sse_asgi_app
async def handle_messages(request: Request):
"""
Handler for Messages endpoint.
Returns the ASGI app from sse_transport.
"""
return sse_transport.handle_post_message
# Add routes directly to the FastAPI app
app.add_route("/mcp/sse", handle_sse, methods=["GET"])
app.add_route("/mcp/messages", handle_messages, methods=["POST"])
from fastapi.responses import RedirectResponse
@app.get("/")
async def root():
return RedirectResponse(url="/dev-ui/")
# --- Add Builder Route (Fixes 404) ---
from fastapi.responses import PlainTextResponse, FileResponse
from pathlib import Path
@app.get(
"/builder/app/{app_name}",
response_model_exclude_none=True,
response_class=PlainTextResponse,
)
async def get_agent_builder(
app_name: str,
file_path: Optional[str] = None,
tmp: Optional[bool] = False,
):
# We use the same agents_dir as defined above
agents_path = Path(os.path.abspath("src"))
agent_dir = agents_path # In our case, src is the root for the agent code
# If app_name is "package_conflict_resolver", it might be looking for a subdir
# But our code is in src/agents.py.
# The standard ADK structure has agents_dir/app_name/root_agent.yaml
# We don't have that structure or YAML files.
# So we just return empty string to satisfy the UI, as we are code-first.
return ""
logger.info("βœ… Combined Server Configured")
logger.info("πŸ‘‰ Web UI: http://0.0.0.0:7860/dev-ui/")
logger.info("πŸ‘‰ MCP SSE: http://0.0.0.0:7860/mcp/sse")
if __name__ == "__main__":
# Run with uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)