Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import logging | |
| from contextlib import AsyncExitStack | |
| from agents import Agent, Tool, Runner, trace | |
| from agents.mcp.server import MCPServerStdio | |
| from typing import Dict, Any, Optional | |
| from ...utils.tracers import log_trace_id | |
| from ...plugins.java_lineage_agent.java_instructions import comprehensive_analysis_instructions | |
| from ...plugins.java_lineage_agent.mcp_servers.mcp_params import java_mcp_server_params | |
| from ...utils.file_utils import dump_json_record | |
| # Get logger for this module | |
| logger = logging.getLogger(__name__) | |
| MAX_TURNS = 30 # Increased for comprehensive analysis | |
| class JavaLineageAgent: | |
| """Plugin agent for Java lineage analysis""" | |
| def __init__(self, agent_name: str, source_code: str, model_name: str = "gpt-4o-mini", get_model_func=None): | |
| self.agent_name = agent_name | |
| self.model_name = model_name | |
| self.source_code = source_code | |
| self.get_model_func = get_model_func | |
| async def create_agent(self, java_mcp_servers) -> Agent: | |
| # Use the passed get_model_func or fall back to the centralized one | |
| if self.get_model_func: | |
| model = self.get_model_func(self.model_name) | |
| else: | |
| from ...utils import get_model | |
| model = get_model(self.model_name) | |
| agent = Agent( | |
| name=self.agent_name, | |
| instructions=comprehensive_analysis_instructions(self.agent_name), | |
| model=model, | |
| mcp_servers=java_mcp_servers, | |
| ) | |
| return agent | |
| async def run_agent(self, java_mcp_servers, source_code: str): | |
| # Create single agent for comprehensive analysis | |
| comprehensive_agent = await self.create_agent(java_mcp_servers) | |
| # Run the complete analysis in one go | |
| result = await Runner.run(comprehensive_agent, source_code, max_turns=MAX_TURNS) | |
| # Return the final output | |
| return dump_json_record(self.agent_name, result.final_output) | |
| async def run_with_mcp_servers(self, source_code: str): | |
| async with AsyncExitStack() as stack: | |
| java_mcp_servers = [ | |
| await stack.enter_async_context( | |
| MCPServerStdio(params, client_session_timeout_seconds=120) | |
| ) | |
| for params in java_mcp_server_params | |
| ] | |
| return await self.run_agent(java_mcp_servers, source_code=source_code) | |
| async def run_with_trace(self, source_code: str): | |
| trace_name = f"{self.agent_name}-lineage-agent" | |
| trace_id = log_trace_id(f"{self.agent_name.lower()}") | |
| with trace(trace_name, trace_id=trace_id): | |
| return await self.run_with_mcp_servers(source_code=source_code) | |
| async def run(self): | |
| try: | |
| logger.info(f"Starting Java lineage analysis for {self.agent_name}") | |
| result = await self.run_with_trace(self.source_code) | |
| logger.info(f"Completed Java lineage analysis for {self.agent_name}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error running {self.agent_name}: {e}") | |
| return {"error": str(e)} | |
| # Plugin interface functions | |
| def create_java_lineage_agent(agent_name: str, source_code: str, model_name: str = "gpt-4o-mini", get_model_func=None) -> JavaLineageAgent: | |
| """Factory function to create a JavaLineageAgent instance""" | |
| return JavaLineageAgent(agent_name=agent_name, source_code=source_code, model_name=model_name, get_model_func=get_model_func) | |
| def get_plugin_info() -> Dict[str, Any]: | |
| """Return plugin metadata""" | |
| return { | |
| "name": "java-lineage-agent", | |
| "description": "Java lineage analysis agent for parsing and analyzing Java queries", | |
| "version": "1.0.0", | |
| "author": "Ali Shamsaddinlou", | |
| "agent_class": JavaLineageAgent, | |
| "factory_function": create_java_lineage_agent, | |
| } |