""" LangGraph Graph Definition Defines the review processing workflow with conditional routing """ from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import MemorySaver from typing import Literal from langgraph_state import ReviewState, BatchState, create_initial_state from langgraph_nodes import ( stage1_classification_node, stage2_sentiment_node, stage3_finalization_node ) from stage4_batch_analysis import Stage4BatchAnalysis from database_enhanced import EnhancedDatabase # ============================================================================ # DATABASE SYNC NODES # ============================================================================ def save_stage1_to_db_node(state: ReviewState) -> dict: """Save Stage 1 results to database""" db = EnhancedDatabase() db.connect() try: stage1_data = { 'llm1_type': state['llm1_result'].get('type'), 'llm1_department': state['llm1_result'].get('department'), 'llm1_priority': state['llm1_result'].get('priority'), 'llm1_confidence': state['llm1_result'].get('confidence'), 'llm1_reasoning': state['llm1_result'].get('reasoning'), 'llm2_user_type': state['llm2_result'].get('user_type'), 'llm2_emotion': state['llm2_result'].get('emotion'), 'llm2_context': state['llm2_result'].get('context'), 'llm2_confidence': state['llm2_result'].get('confidence'), 'llm2_reasoning': state['llm2_result'].get('reasoning'), 'manager_classification': str(state['manager_result']), 'manager_reasoning': state['manager_result'].get('reasoning'), } db.update_stage1(state['review_id'], stage1_data) db.close() return {"db_stage1_saved": True} except Exception as e: db.close() errors = state.get('errors', []) errors.append(f"DB Stage 1 save error: {str(e)}") return {"errors": errors} def save_stage2_to_db_node(state: ReviewState) -> dict: """Save Stage 2 results to database""" db = EnhancedDatabase() db.connect() try: stage2_data = { 'best_sentiment': state['best_sentiment_result']['sentiment'], 'best_confidence': state['best_sentiment_result']['confidence'], 'best_prob_positive': state['best_sentiment_result']['prob_positive'], 'best_prob_neutral': state['best_sentiment_result']['prob_neutral'], 'best_prob_negative': state['best_sentiment_result']['prob_negative'], 'alt_sentiment': state['alt_sentiment_result']['sentiment'], 'alt_confidence': state['alt_sentiment_result']['confidence'], 'alt_prob_positive': state['alt_sentiment_result']['prob_positive'], 'alt_prob_neutral': state['alt_sentiment_result']['prob_neutral'], 'alt_prob_negative': state['alt_sentiment_result']['prob_negative'], 'agreement': state['sentiment_agreement'], 'layer_sentiment': state['sentiment'], } db.update_stage2(state['review_id'], stage2_data) db.close() return {"db_stage2_saved": True} except Exception as e: db.close() errors = state.get('errors', []) errors.append(f"DB Stage 2 save error: {str(e)}") return {"errors": errors} def save_stage3_to_db_node(state: ReviewState) -> dict: """Save Stage 3 results to database""" db = EnhancedDatabase() db.connect() try: stage3_data = { 'final_sentiment': state['final_sentiment'], 'confidence': state['final_confidence'], 'reasoning': state['reasoning'], 'validation_notes': state['validation_notes'], 'conflicts_found': state['conflicts_found'], 'action_recommendation': state['action_recommendation'], 'needs_human_review': state['needs_human_review'], } db.update_stage3(state['review_id'], stage3_data) db.close() return {"db_stage3_saved": True} except Exception as e: db.close() errors = state.get('errors', []) errors.append(f"DB Stage 3 save error: {str(e)}") return {"errors": errors} # ============================================================================ # STAGE 4: BATCH ANALYSIS NODE # ============================================================================ def stage4_batch_analysis_node(state: BatchState) -> dict: """ Stage 4 Node: Batch analysis Runs after all reviews are processed """ print(f"\n{'='*70}") print(f"๐Ÿ“Š STAGE 4: BATCH ANALYSIS") print(f"{'='*70}") stage4 = Stage4BatchAnalysis() # Convert ReviewState list to dict format for Stage4 reviews_for_analysis = [] for review_state in state['all_reviews']: review_dict = { 'review_id': review_state['review_id'], 'review_text': review_state['review_text'], 'rating': review_state['rating'], 'stage1_llm1_type': review_state.get('classification_type'), 'stage1_llm1_department': review_state.get('department'), 'stage1_llm1_priority': review_state.get('priority'), 'stage1_llm2_user_type': review_state.get('user_type'), 'stage1_llm2_emotion': review_state.get('emotion'), 'stage2_agreement': review_state.get('sentiment_agreement'), 'stage3_final_sentiment': review_state.get('final_sentiment'), 'stage3_needs_human_review': review_state.get('needs_human_review'), 'stage3_reasoning': review_state.get('reasoning'), 'stage3_action_recommendation': review_state.get('action_recommendation'), } reviews_for_analysis.append(review_dict) # Analyze batch insights = stage4.analyze_batch(reviews_for_analysis) # Save to database db = EnhancedDatabase() db.connect() db.save_batch_insights(insights) db.close() return { 'sentiment_distribution': insights.get('sentiment_distribution'), 'priority_distribution': insights.get('priority_distribution'), 'department_distribution': insights.get('department_distribution'), 'emotion_distribution': insights.get('emotion_distribution'), 'critical_issues': insights.get('critical_issues'), 'quick_wins': insights.get('quick_wins'), 'churn_risk': insights.get('churn_risk'), 'model_agreement_rate': insights.get('model_agreement_rate'), 'recommendations': insights.get('recommendations'), 'batch_completed_at': insights.get('batch_completed_at') } # ============================================================================ # ROUTING FUNCTIONS # ============================================================================ def route_after_stage3(state: ReviewState) -> Literal["human_review", "complete"]: """ Conditional routing after Stage 3 Decides if human review is needed """ # Check if human review needed if state.get('needs_human_review', False): return "human_review" # Check confidence threshold if state.get('final_confidence', 1.0) < 0.5: return "human_review" # Check for conflicts if state.get('conflicts_found', 'none') != 'none': return "human_review" # Check priority if state.get('priority') == 'critical': return "human_review" return "complete" def human_review_queue_node(state: ReviewState) -> dict: """ Node for reviews flagged for human review Just marks them in the database """ print(f" ๐Ÿšจ FLAGGED for human review") # Could integrate with ticketing system, email alerts, etc. # For now, just mark in state return { "route_to": "human_review" } # ============================================================================ # BUILD REVIEW PROCESSING GRAPH # ============================================================================ def build_review_graph(): """ Build the complete review processing graph """ # Create graph workflow = StateGraph(ReviewState) # Add all nodes workflow.add_node("stage1_classify", stage1_classification_node) workflow.add_node("save_stage1", save_stage1_to_db_node) workflow.add_node("stage2_sentiment", stage2_sentiment_node) workflow.add_node("save_stage2", save_stage2_to_db_node) workflow.add_node("stage3_finalize", stage3_finalization_node) workflow.add_node("save_stage3", save_stage3_to_db_node) workflow.add_node("human_review_queue", human_review_queue_node) # Add edges (sequential flow through stages) workflow.add_edge("stage1_classify", "save_stage1") workflow.add_edge("save_stage1", "stage2_sentiment") workflow.add_edge("stage2_sentiment", "save_stage2") workflow.add_edge("save_stage2", "stage3_finalize") workflow.add_edge("stage3_finalize", "save_stage3") # Add conditional routing after Stage 3 workflow.add_conditional_edges( "save_stage3", route_after_stage3, { "human_review": "human_review_queue", "complete": END } ) # Human review goes to END workflow.add_edge("human_review_queue", END) # Set entry point workflow.set_entry_point("stage1_classify") # Compile with checkpointing memory = MemorySaver() graph = workflow.compile(checkpointer=memory) return graph # ============================================================================ # BUILD BATCH ANALYSIS GRAPH (Stage 4) # ============================================================================ def build_batch_graph(): """ Build the batch analysis graph (Stage 4) This runs after all reviews are processed """ workflow = StateGraph(BatchState) # Add batch analysis node workflow.add_node("stage4_batch", stage4_batch_analysis_node) # Simple linear flow workflow.set_entry_point("stage4_batch") workflow.add_edge("stage4_batch", END) # Compile graph = workflow.compile() return graph if __name__ == "__main__": print("\n" + "="*60) print("๐Ÿงช TESTING LANGGRAPH GRAPH BUILDER") print("="*60) # Build review graph print("\n๐Ÿ“Š Building review processing graph...") review_graph = build_review_graph() print(" โœ… Review graph built!") # Build batch graph print("\n๐Ÿ“Š Building batch analysis graph...") batch_graph = build_batch_graph() print(" โœ… Batch graph built!") print("\nโœ… Graph builder test complete!")