ForesightSphere / src /agent.py
syaikhipin's picture
Create agent.py
42c81e2 verified
import os
import sys
from typing import TypedDict, Annotated, List
import json
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import config
# Define the state for our graph
class AgentState(TypedDict):
narrative_content: str
initial_analysis: dict
scenarios: List[dict]
thematic_prediction: str
error: str
class LangGraphAgent:
def __init__(self):
self.llm = ChatGoogleGenerativeAI(model=config.GEMINI_MODEL, google_api_key=config.GEMINI_API_KEY, temperature=0.7)
self.graph = self._build_graph()
def _build_graph(self):
workflow = StateGraph(AgentState)
# Define the nodes
workflow.add_node("analyze_narrative", self.analyze_narrative)
workflow.add_node("generate_scenarios", self.generate_scenarios)
workflow.add_node("refine_theme", self.refine_theme)
workflow.add_node("handle_error", self.handle_error)
# Build the graph
workflow.set_entry_point("analyze_narrative")
workflow.add_edge("analyze_narrative", "generate_scenarios")
workflow.add_edge("generate_scenarios", "refine_theme")
workflow.add_edge("refine_theme", END)
# Add conditional edges for error handling
workflow.add_conditional_edges(
"analyze_narrative",
self.should_continue,
{
"continue": "generate_scenarios",
"error": "handle_error"
}
)
workflow.add_conditional_edges(
"generate_scenarios",
self.should_continue,
{
"continue": "refine_theme",
"error": "handle_error"
}
)
workflow.add_conditional_edges(
"refine_theme",
self.should_continue,
{
"continue": END,
"error": "handle_error"
}
)
workflow.add_edge("handle_error", END)
return workflow.compile()
def should_continue(self, state: AgentState) -> str:
return "error" if state.get("error") else "continue"
def analyze_narrative(self, state: AgentState):
try:
prompt = f"""
Analyze this content for disinformation indicators. Provide a comprehensive assessment:
Content: "{state['narrative_content']}"
Respond in JSON format with keys: "risk_score", "factual_accuracy", "emotional_manipulation", "historical_similarity", "likely_origin", "intent", "spread_prediction", "key_claims", "manipulation_tactics", "target_demographics", "geographic_focus", "confidence_level".
"""
response = self.llm.invoke([SystemMessage(content="You are a disinformation analyst."), HumanMessage(content=prompt)])
analysis = json.loads(response.content)
return {"initial_analysis": analysis}
except Exception as e:
return {"error": f"Failed to analyze narrative: {e}"}
def generate_scenarios(self, state: AgentState):
try:
prompt = f"""
Based on this analysis, generate 3-5 likely evolution scenarios for the narrative:
Analysis: {state['initial_analysis']}
Narrative: "{state['narrative_content']}"
For each scenario, provide a JSON object with keys: "scenario_name", "description", "probability", "timeline_hours", "predicted_reach", "key_events", "mitigation_strategies".
Return a JSON list of these scenario objects.
"""
response = self.llm.invoke([SystemMessage(content="You are a predictive modeler specializing in information dynamics."), HumanMessage(content=prompt)])
scenarios = json.loads(response.content)
return {"scenarios": scenarios}
except Exception as e:
return {"error": f"Failed to generate scenarios: {e}"}
def refine_theme(self, state: AgentState):
try:
prompt = f"""
Synthesize the following scenarios into a single, cohesive thematic prediction. Identify the overarching narrative theme and its likely trajectory.
Scenarios: {state['scenarios']}
Thematic Prediction should be a compelling, easy-to-understand paragraph that summarizes the core threat and its potential evolution. Focus on the 'story' that will emerge.
"""
response = self.llm.invoke([SystemMessage(content="You are a narrative strategist."), HumanMessage(content=prompt)])
return {"thematic_prediction": response.content}
except Exception as e:
return {"error": f"Failed to refine theme: {e}"}
def handle_error(self, state: AgentState):
print(f"Error in agent execution: {state['error']}")
return {}
def run(self, narrative_content: str) -> dict:
inputs = {"narrative_content": narrative_content}
final_state = self.graph.invoke(inputs)
return final_state