import os
import sys
from typing import TypedDict, Annotated, List
import json

from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage

# Make the parent of this file's directory importable so `config` resolves.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import config


# Define the state for our graph
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Raw text supplied by the caller of `LangGraphAgent.run`.
    narrative_content: str
    # Parsed JSON produced by the `analyze_narrative` node.
    initial_analysis: dict
    # Parsed JSON list produced by the `generate_scenarios` node.
    scenarios: List[dict]
    # Free-text paragraph produced by the `refine_theme` node.
    thematic_prediction: str
    # Set by any node whose body raised; routes the graph to `handle_error`.
    error: str


class LangGraphAgent:
    """Three-stage LLM pipeline (analyze -> scenarios -> theme) built on LangGraph.

    Each stage catches its own exceptions and reports them through the
    ``error`` state key; the graph then routes to ``handle_error`` and stops.
    """

    def __init__(self):
        """Create the Gemini chat model from `config` and compile the graph."""
        self.llm = ChatGoogleGenerativeAI(
            model=config.GEMINI_MODEL,
            google_api_key=config.GEMINI_API_KEY,
            temperature=0.7,
        )
        self.graph = self._build_graph()

    def _build_graph(self):
        """Assemble and compile the workflow graph.

        Returns:
            The compiled graph returned by ``StateGraph.compile()``.
        """
        workflow = StateGraph(AgentState)

        # Define the nodes
        workflow.add_node("analyze_narrative", self.analyze_narrative)
        workflow.add_node("generate_scenarios", self.generate_scenarios)
        workflow.add_node("refine_theme", self.refine_theme)
        workflow.add_node("handle_error", self.handle_error)

        # Build the graph
        workflow.set_entry_point("analyze_narrative")

        # Route every stage through `should_continue` ONLY. Adding a plain
        # `add_edge` next to a conditional edge from the same node would make
        # the next stage run even when this one failed (both outgoing edges
        # are followed), which overwrites the original error message.
        workflow.add_conditional_edges(
            "analyze_narrative",
            self.should_continue,
            {
                "continue": "generate_scenarios",
                "error": "handle_error",
            },
        )
        workflow.add_conditional_edges(
            "generate_scenarios",
            self.should_continue,
            {
                "continue": "refine_theme",
                "error": "handle_error",
            },
        )
        workflow.add_conditional_edges(
            "refine_theme",
            self.should_continue,
            {
                "continue": END,
                "error": "handle_error",
            },
        )
        workflow.add_edge("handle_error", END)

        return workflow.compile()

    @staticmethod
    def _parse_json_response(raw):
        """Parse a model reply as JSON, tolerating a Markdown code fence.

        A reply wrapped as ```` ```json ... ``` ```` (or a bare ```` ``` ````
        fence) has the fence removed before parsing; any other reply is
        passed to ``json.loads`` after stripping surrounding whitespace.

        Raises:
            json.JSONDecodeError: If the remaining text is not valid JSON.
        """
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (with its optional language tag).
            _, _, text = text.partition("\n")
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]
        return json.loads(text)

    def should_continue(self, state: AgentState) -> str:
        """Return ``"error"`` if the state carries a truthy error, else ``"continue"``."""
        return "error" if state.get("error") else "continue"

    def analyze_narrative(self, state: AgentState):
        """Ask the model for a JSON disinformation assessment of the narrative.

        Returns:
            ``{"initial_analysis": <parsed JSON>}`` on success, or
            ``{"error": <message>}`` if the call or the parsing raised.
        """
        try:
            prompt = f"""
            Analyze this content for disinformation indicators.
            Provide a comprehensive assessment:

            Content: "{state['narrative_content']}"

            Respond in JSON format with keys: "risk_score", "factual_accuracy", "emotional_manipulation", "historical_similarity", "likely_origin", "intent", "spread_prediction", "key_claims", "manipulation_tactics", "target_demographics", "geographic_focus", "confidence_level".
            """
            response = self.llm.invoke(
                [
                    SystemMessage(content="You are a disinformation analyst."),
                    HumanMessage(content=prompt),
                ]
            )
            analysis = self._parse_json_response(response.content)
            return {"initial_analysis": analysis}
        except Exception as e:
            # Node boundary: report the failure through state instead of
            # raising, so the graph can route to `handle_error`.
            return {"error": f"Failed to analyze narrative: {e}"}

    def generate_scenarios(self, state: AgentState):
        """Ask the model for a JSON list of evolution scenarios.

        Reads ``initial_analysis`` and ``narrative_content`` from the state.

        Returns:
            ``{"scenarios": <parsed JSON>}`` on success, or
            ``{"error": <message>}`` if the call or the parsing raised.
        """
        try:
            prompt = f"""
            Based on this analysis, generate 3-5 likely evolution scenarios for the narrative:

            Analysis: {state['initial_analysis']}
            Narrative: "{state['narrative_content']}"

            For each scenario, provide a JSON object with keys: "scenario_name", "description", "probability", "timeline_hours", "predicted_reach", "key_events", "mitigation_strategies".
            Return a JSON list of these scenario objects.
            """
            response = self.llm.invoke(
                [
                    SystemMessage(
                        content="You are a predictive modeler specializing in information dynamics."
                    ),
                    HumanMessage(content=prompt),
                ]
            )
            scenarios = self._parse_json_response(response.content)
            return {"scenarios": scenarios}
        except Exception as e:
            return {"error": f"Failed to generate scenarios: {e}"}

    def refine_theme(self, state: AgentState):
        """Ask the model to condense the scenarios into one thematic paragraph.

        Returns:
            ``{"thematic_prediction": <model reply text>}`` on success, or
            ``{"error": <message>}`` if the call raised.
        """
        try:
            prompt = f"""
            Synthesize the following scenarios into a single, cohesive thematic prediction.
            Identify the overarching narrative theme and its likely trajectory.

            Scenarios: {state['scenarios']}

            Thematic Prediction should be a compelling, easy-to-understand paragraph that summarizes the core threat and its potential evolution. Focus on the 'story' that will emerge.
            """
            response = self.llm.invoke(
                [
                    SystemMessage(content="You are a narrative strategist."),
                    HumanMessage(content=prompt),
                ]
            )
            return {"thematic_prediction": response.content}
        except Exception as e:
            return {"error": f"Failed to refine theme: {e}"}

    def handle_error(self, state: AgentState):
        """Print the recorded error; returns no state updates."""
        print(f"Error in agent execution: {state['error']}")
        return {}

    def run(self, narrative_content: str) -> dict:
        """Run the compiled graph on ``narrative_content`` and return the final state.

        On failure the returned state contains an ``error`` key rather than
        an exception being raised by the node that failed.
        """
        inputs = {"narrative_content": narrative_content}
        final_state = self.graph.invoke(inputs)
        return final_state