# Provenance: uploaded by Soumya79 ("Upload 4 files", commit 46a1e74, verified).
from typing import Dict, TypedDict, Annotated, Sequence, Optional
from langgraph.graph import Graph, StateGraph
from agents import MarketAnalysisAgent, RiskScoringAgent, ProjectStatusAgent, ReportingAgent
import logging
from datetime import datetime
import json
from functools import lru_cache
import os
logger = logging.getLogger(__name__)
class AgentState(TypedDict):
    """Shared state dictionary passed between the workflow nodes."""

    # Result returned by the market-analysis agent.
    market_analysis: Dict
    # Result returned by the risk-scoring agent.
    risk_scoring: Dict
    # Result returned by the project-status agent.
    project_status: Dict
    # Report produced by the reporting agent, or an error summary.
    final_report: Dict
    # Free-form messages carried along with the state.
    messages: Sequence[str]
    # Error text recorded by a failing node; None when nothing failed.
    error: Optional[str]
    # ISO-8601 timestamp written by the last node that updated the state.
    timestamp: str
    # Headline metrics extracted from the final report.
    metrics: Dict
def create_workflow() -> Graph:
    """Build and compile the analysis workflow graph.

    The graph runs four stages in sequence -- ``analyze_market``,
    ``score_risk``, ``track_project`` and ``generate_report`` -- and always
    finishes in ``handle_error``. A stage that records an error routes
    directly to ``handle_error`` and the remaining stages are skipped.

    Successful stage results are cached as JSON files under ``./cache``,
    keyed by a SHA-256 digest of the incoming state so that keys are stable
    across processes.

    Returns:
        The compiled workflow returned by ``StateGraph.compile()``.
    """
    # Imported locally because only the cache-key helper needs it.
    import hashlib

    # Initialize agents
    market_agent = MarketAnalysisAgent()
    risk_agent = RiskScoringAgent()
    project_agent = ProjectStatusAgent()
    reporting_agent = ReportingAgent()

    # Define the workflow
    workflow = StateGraph(AgentState)

    def has_error(state: AgentState) -> bool:
        """Return True when a previous node recorded an error.

        A missing key and an explicit ``None`` both mean "no error", so a
        caller may initialise the state with ``error=None``.
        """
        return state.get("error") is not None

    def make_state_key(prefix: str, state: AgentState) -> str:
        """Return a cache key for *state* that is stable across processes.

        The built-in ``hash()`` of a string is salted per interpreter run,
        which would make the on-disk cache useless after a restart.
        """
        payload = json.dumps(state, sort_keys=True)
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{prefix}_{digest}"

    def get_cached_state(state_key: str) -> Optional[AgentState]:
        """Load a cached state from disk, or return None on a miss.

        Deliberately not memoised: memoising would pin a miss (``None``)
        for keys that are saved later in the same process.
        """
        try:
            with open(f"./cache/{state_key}.json", "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # Missing, unreadable or corrupt cache file: treat as a miss.
            return None

    def save_state_to_cache(state_key: str, state: AgentState):
        """Persist *state* to disk; failures are logged, never raised."""
        try:
            # Serialise first so a non-serialisable state cannot leave a
            # truncated file behind.
            payload = json.dumps(state)
            os.makedirs("./cache", exist_ok=True)
            with open(f"./cache/{state_key}.json", "w", encoding="utf-8") as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Error saving state to cache: {str(e)}")

    def run_stage(prefix: str, label: str, state: AgentState, compute) -> AgentState:
        """Run one stage with the shared error/cache scaffolding.

        Args:
            prefix: Cache-key prefix identifying the stage.
            label: Human-readable stage name used in the error log message.
            state: Incoming workflow state.
            compute: Zero-argument callable returning the dict of state
                updates produced by the stage.

        Returns:
            The unchanged state if an error is already recorded, the cached
            state on a cache hit, the updated state on success, or the
            state with ``"error"`` set if the stage raised.
        """
        try:
            if has_error(state):
                return state
            state_key = make_state_key(prefix, state)
            cached_state = get_cached_state(state_key)
            if cached_state is not None:
                return cached_state
            new_state = {
                **state,
                **compute(),
                "timestamp": datetime.now().isoformat(),
            }
            save_state_to_cache(state_key, new_state)
            return new_state
        except Exception as e:
            # Node boundary: convert any failure into an error state so the
            # graph can route to handle_error instead of crashing.
            logger.error(f"Error in {label}: {str(e)}")
            return {**state, "error": str(e)}

    # Market Analysis Node
    def analyze_market(state: AgentState) -> AgentState:
        """Run the market-analysis agent and store its result."""

        def compute() -> Dict:
            market_data = {
                "market_conditions": "current_market_data",
                "trends": "market_trends",
                "indicators": "economic_indicators",
                "it_industry": {
                    "technology_trends": [],
                    "competitor_analysis": [],
                    "market_demand": [],
                },
            }
            return {"market_analysis": market_agent.analyze(market_data)}

        return run_stage("market", "market analysis", state, compute)

    # Risk Scoring Node
    def score_risk(state: AgentState) -> AgentState:
        """Run the risk-scoring agent on the market analysis."""

        def compute() -> Dict:
            risk_data = {
                "market_analysis": state["market_analysis"],
                "project_data": {
                    "technical_risks": [],
                    "project_risks": [],
                    "business_risks": [],
                    "operational_risks": [],
                },
            }
            return {"risk_scoring": risk_agent.score(risk_data)}

        return run_stage("risk", "risk scoring", state, compute)

    # Project Status Node
    def track_project(state: AgentState) -> AgentState:
        """Run the project-status agent and store its result."""

        def compute() -> Dict:
            project_data = {
                "milestones": "current_milestones",
                "resources": {
                    "team_composition": {},
                    "skill_gaps": [],
                    "knowledge_transfer": [],
                },
                "technical": {
                    "architecture_status": [],
                    "code_quality": [],
                    "security_status": [],
                },
                "schedule": "schedule_data",
            }
            return {"project_status": project_agent.track(project_data)}

        return run_stage("project", "project tracking", state, compute)

    # Reporting Node
    def generate_report(state: AgentState) -> AgentState:
        """Generate the final report and derive headline metrics from it."""

        def compute() -> Dict:
            report_data = {
                "market_analysis": state["market_analysis"],
                "risk_scoring": state["risk_scoring"],
                "project_status": state["project_status"],
            }
            report_result = reporting_agent.generate_report(report_data)
            # Calculate metrics
            metrics = {
                "risk_score": report_result.get("risk_assessment", {}).get("overall_risk_score", 0),
                "project_health": report_result.get("project_status", {}).get("overall_status", "UNKNOWN"),
                "market_sentiment": report_result.get("market_analysis", {}).get("market_sentiment", "NEUTRAL"),
            }
            return {"final_report": report_result, "metrics": metrics}

        return run_stage("report", "report generation", state, compute)

    # Error Handling Node
    def handle_error(state: AgentState) -> AgentState:
        """Replace the final report with an error summary if a node failed."""
        if not has_error(state):
            return state
        logger.error(f"Workflow error: {state['error']}")
        return {
            **state,
            "final_report": {
                "error": state["error"],
                "status": "ERROR",
                "message": "An error occurred during processing",
                "timestamp": datetime.now().isoformat(),
            },
        }

    # Add nodes to the workflow
    workflow.add_node("analyze_market", analyze_market)
    workflow.add_node("score_risk", score_risk)
    workflow.add_node("track_project", track_project)
    workflow.add_node("generate_report", generate_report)
    workflow.add_node("handle_error", handle_error)

    # Route each stage either to the next stage or to the error handler.
    # Only conditional edges are used here: an additional unconditional
    # edge from the same node would trigger the next stage even on error.
    workflow.add_conditional_edges(
        "analyze_market",
        has_error,
        {True: "handle_error", False: "score_risk"},
    )
    workflow.add_conditional_edges(
        "score_risk",
        has_error,
        {True: "handle_error", False: "track_project"},
    )
    workflow.add_conditional_edges(
        "track_project",
        has_error,
        {True: "handle_error", False: "generate_report"},
    )
    # The report stage always hands over to handle_error, so a plain edge
    # is sufficient.
    workflow.add_edge("generate_report", "handle_error")

    # Set entry point and mark handle_error as the terminal node
    workflow.set_entry_point("analyze_market")
    workflow.set_finish_point("handle_error")

    # Compile the workflow
    return workflow.compile()
# Module-level workflow instance, built once when this module is imported.
# Note that create_workflow() instantiates all four agents, so importing
# this module has that side effect.
workflow = create_workflow()