# Provenance (header captured from the Hugging Face file page):
# uploaded by alishams21 ("Upload folder using huggingface_hub"),
# commit e00e744 (verified).
import os
import sys
import logging
from contextlib import AsyncExitStack
from agents import Agent, Tool, Runner, trace
from agents.mcp.server import MCPServerStdio
from typing import Dict, Any, Optional
from ...utils.tracers import log_trace_id
from ...plugins.java_lineage_agent.java_instructions import comprehensive_analysis_instructions
from ...plugins.java_lineage_agent.mcp_servers.mcp_params import java_mcp_server_params
from ...utils.file_utils import dump_json_record
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Turn limit handed to Runner.run as max_turns (see run_agent).
# Increased for comprehensive analysis.
MAX_TURNS = 30
class JavaLineageAgent:
    """Plugin agent for Java lineage analysis.

    A single agent, configured with ``comprehensive_analysis_instructions``,
    is run over the supplied source code. ``run`` is the top-level entry
    point; it delegates through three layers:

    ``run`` -> ``run_with_trace`` -> ``run_with_mcp_servers`` -> ``run_agent``
    """

    def __init__(self, agent_name: str, source_code: str, model_name: str = "gpt-4o-mini", get_model_func=None):
        """Store the configuration used by a later call to ``run``.

        Args:
            agent_name: Name given to the agent; also used for the trace
                name, the log messages and the dumped record.
            source_code: Text handed to the agent as its input by ``run``.
            model_name: Identifier passed to the model factory.
            get_model_func: Optional callable taking ``model_name`` and
                returning a model. When falsy, the package-level
                ``get_model`` is used instead.
        """
        self.agent_name = agent_name
        self.model_name = model_name
        self.source_code = source_code
        self.get_model_func = get_model_func

    async def create_agent(self, java_mcp_servers) -> Agent:
        """Build the ``Agent`` wired to the given MCP servers.

        Args:
            java_mcp_servers: Server objects forwarded to ``Agent`` as
                ``mcp_servers``.

        Returns:
            The configured ``Agent`` instance.
        """
        if self.get_model_func:
            model = self.get_model_func(self.model_name)
        else:
            # Fall back to the centralized factory; imported only when no
            # factory was injected.
            from ...utils import get_model

            model = get_model(self.model_name)

        return Agent(
            name=self.agent_name,
            instructions=comprehensive_analysis_instructions(self.agent_name),
            model=model,
            mcp_servers=java_mcp_servers,
        )

    async def run_agent(self, java_mcp_servers, source_code: str):
        """Run the agent once over ``source_code``.

        Args:
            java_mcp_servers: Server objects the agent may use.
            source_code: Input passed to ``Runner.run``.

        Returns:
            The value returned by ``dump_json_record`` for this agent's name
            and the run's ``final_output``.
        """
        comprehensive_agent = await self.create_agent(java_mcp_servers)
        # The whole analysis happens in one run, bounded by MAX_TURNS.
        result = await Runner.run(comprehensive_agent, source_code, max_turns=MAX_TURNS)
        return dump_json_record(self.agent_name, result.final_output)

    async def run_with_mcp_servers(self, source_code: str):
        """Start one ``MCPServerStdio`` per configured parameter set and run the agent.

        Every server is entered on a shared ``AsyncExitStack``, so all of
        them are closed when the block exits, whether the run succeeds or
        raises.

        Args:
            source_code: Input forwarded to ``run_agent``.

        Returns:
            The result of ``run_agent``.
        """
        async with AsyncExitStack() as stack:
            java_mcp_servers = [
                await stack.enter_async_context(
                    MCPServerStdio(params, client_session_timeout_seconds=120)
                )
                for params in java_mcp_server_params
            ]
            return await self.run_agent(java_mcp_servers, source_code=source_code)

    async def run_with_trace(self, source_code: str):
        """Run the analysis inside a ``trace`` context.

        The trace is named ``"<agent_name>-lineage-agent"`` and its id comes
        from ``log_trace_id`` called with the lower-cased agent name.

        Args:
            source_code: Input forwarded to ``run_with_mcp_servers``.

        Returns:
            The result of ``run_with_mcp_servers``.
        """
        trace_name = f"{self.agent_name}-lineage-agent"
        trace_id = log_trace_id(self.agent_name.lower())
        with trace(trace_name, trace_id=trace_id):
            return await self.run_with_mcp_servers(source_code=source_code)

    async def run(self):
        """Run the analysis over the stored ``source_code``.

        This is a top-level boundary: no ``Exception`` escapes it.

        Returns:
            The result of ``run_with_trace`` on success, or
            ``{"error": "<message>"}`` when any ``Exception`` is raised.
        """
        try:
            logger.info("Starting Java lineage analysis for %s", self.agent_name)
            result = await self.run_with_trace(self.source_code)
            logger.info("Completed Java lineage analysis for %s", self.agent_name)
            return result
        except Exception as e:
            # logger.exception logs at ERROR level and attaches the
            # traceback, which would otherwise be lost because the error is
            # reduced to a string for the caller.
            logger.exception("Error running %s: %s", self.agent_name, e)
            return {"error": str(e)}
# Plugin interface functions
def create_java_lineage_agent(agent_name: str, source_code: str, model_name: str = "gpt-4o-mini", get_model_func=None) -> JavaLineageAgent:
    """Build a ``JavaLineageAgent`` from the given settings.

    Every argument is forwarded unchanged, by keyword, to the
    ``JavaLineageAgent`` constructor.
    """
    agent = JavaLineageAgent(
        agent_name=agent_name,
        source_code=source_code,
        model_name=model_name,
        get_model_func=get_model_func,
    )
    return agent
def get_plugin_info() -> Dict[str, Any]:
    """Describe this plugin.

    Returns:
        A dict holding the descriptive fields ``name``, ``description``,
        ``version`` and ``author``, followed by ``agent_class`` (the
        ``JavaLineageAgent`` class) and ``factory_function``
        (``create_java_lineage_agent``).
    """
    # Descriptive fields first, then the entry points; key order matches
    # the original literal.
    plugin_info: Dict[str, Any] = {
        "name": "java-lineage-agent",
        "description": "Java lineage analysis agent for parsing and analyzing Java queries",
        "version": "1.0.0",
        "author": "Ali Shamsaddinlou",
    }
    plugin_info["agent_class"] = JavaLineageAgent
    plugin_info["factory_function"] = create_java_lineage_agent
    return plugin_info