Spaces:
Sleeping
Sleeping
| """ | |
| LangGraph State Schema | |
| Defines the state that flows through the graph | |
| """ | |
| from typing import TypedDict, Optional, Dict, Any, List | |
| from datetime import datetime | |
| class ReviewState(TypedDict): | |
| """ | |
| State schema for review processing graph | |
| All stages add to this state as it flows through the graph | |
| """ | |
| # Input data | |
| review: Dict[str, Any] | |
| review_id: str | |
| review_text: str | |
| rating: int | |
| # Stage 1: Classification outputs | |
| llm1_result: Optional[Dict[str, Any]] | |
| llm2_result: Optional[Dict[str, Any]] | |
| manager_result: Optional[Dict[str, Any]] | |
| # Stage 1: Extracted fields for easy access | |
| classification_type: Optional[str] | |
| department: Optional[str] | |
| priority: Optional[str] | |
| user_type: Optional[str] | |
| emotion: Optional[str] | |
| # Stage 2: Sentiment outputs | |
| best_sentiment_result: Optional[Dict[str, Any]] | |
| alt_sentiment_result: Optional[Dict[str, Any]] | |
| sentiment_layer_result: Optional[Dict[str, Any]] | |
| # Stage 2: Extracted fields | |
| sentiment: Optional[str] # POSITIVE, NEGATIVE, NEUTRAL | |
| sentiment_confidence: Optional[float] | |
| sentiment_agreement: Optional[bool] | |
| # Stage 3: Finalization outputs | |
| final_result: Optional[Dict[str, Any]] | |
| # Stage 3: Extracted fields | |
| final_sentiment: Optional[str] | |
| final_confidence: Optional[float] | |
| reasoning: Optional[str] | |
| action_recommendation: Optional[str] | |
| conflicts_found: Optional[str] | |
| validation_notes: Optional[str] | |
| # Routing decisions | |
| needs_human_review: bool | |
| route_to: Optional[str] # 'human_review', 'complete', 'batch_analysis' | |
| # Processing metadata | |
| stage1_completed: bool | |
| stage2_completed: bool | |
| stage3_completed: bool | |
| processing_started_at: Optional[str] | |
| processing_completed_at: Optional[str] | |
| # Timing information | |
| stage1_time: Optional[float] | |
| stage2_time: Optional[float] | |
| stage3_time: Optional[float] | |
| total_time: Optional[float] | |
| # Error handling | |
| errors: List[str] | |
| retry_count: int | |
| # Database sync status | |
| db_stage1_saved: bool | |
| db_stage2_saved: bool | |
| db_stage3_saved: bool | |
| class BatchState(TypedDict): | |
| """ | |
| State for batch analysis (Stage 4) | |
| Aggregates results from multiple reviews | |
| """ | |
| # Input | |
| all_reviews: List[ReviewState] | |
| total_count: int | |
| # Aggregated metrics | |
| sentiment_distribution: Optional[Dict[str, int]] | |
| priority_distribution: Optional[Dict[str, int]] | |
| department_distribution: Optional[Dict[str, int]] | |
| emotion_distribution: Optional[Dict[str, int]] | |
| # Analysis outputs | |
| critical_issues: Optional[List[Dict[str, Any]]] | |
| quick_wins: Optional[List[Dict[str, Any]]] | |
| churn_risk: Optional[float] | |
| model_agreement_rate: Optional[float] | |
| # Recommendations | |
| recommendations: Optional[List[str]] | |
| # Processing metadata | |
| batch_started_at: Optional[str] | |
| batch_completed_at: Optional[str] | |
| batch_processing_time: Optional[float] | |
| def create_initial_state(review: Dict[str, Any]) -> ReviewState: | |
| """ | |
| Create initial state for a review | |
| """ | |
| return ReviewState( | |
| # Input | |
| review=review, | |
| review_id=review.get('review_id', 'unknown'), | |
| review_text=review.get('review_text', ''), | |
| rating=review.get('rating', 3), | |
| # Stage 1 | |
| llm1_result=None, | |
| llm2_result=None, | |
| manager_result=None, | |
| classification_type=None, | |
| department=None, | |
| priority=None, | |
| user_type=None, | |
| emotion=None, | |
| # Stage 2 | |
| best_sentiment_result=None, | |
| alt_sentiment_result=None, | |
| sentiment_layer_result=None, | |
| sentiment=None, | |
| sentiment_confidence=None, | |
| sentiment_agreement=None, | |
| # Stage 3 | |
| final_result=None, | |
| final_sentiment=None, | |
| final_confidence=None, | |
| reasoning=None, | |
| action_recommendation=None, | |
| conflicts_found=None, | |
| validation_notes=None, | |
| # Routing | |
| needs_human_review=False, | |
| route_to=None, | |
| # Processing metadata | |
| stage1_completed=False, | |
| stage2_completed=False, | |
| stage3_completed=False, | |
| processing_started_at=datetime.now().isoformat(), | |
| processing_completed_at=None, | |
| # Timing | |
| stage1_time=None, | |
| stage2_time=None, | |
| stage3_time=None, | |
| total_time=None, | |
| # Errors | |
| errors=[], | |
| retry_count=0, | |
| # Database | |
| db_stage1_saved=False, | |
| db_stage2_saved=False, | |
| db_stage3_saved=False | |
| ) | |
| def create_batch_state(reviews: List[ReviewState]) -> BatchState: | |
| """ | |
| Create batch state from processed reviews | |
| """ | |
| return BatchState( | |
| all_reviews=reviews, | |
| total_count=len(reviews), | |
| sentiment_distribution=None, | |
| priority_distribution=None, | |
| department_distribution=None, | |
| emotion_distribution=None, | |
| critical_issues=None, | |
| quick_wins=None, | |
| churn_risk=None, | |
| model_agreement_rate=None, | |
| recommendations=None, | |
| batch_started_at=datetime.now().isoformat(), | |
| batch_completed_at=None, | |
| batch_processing_time=None | |
| ) | |
| if __name__ == "__main__": | |
| # Test state creation | |
| print("\n" + "="*60) | |
| print("🧪 TESTING LANGGRAPH STATE") | |
| print("="*60) | |
| test_review = { | |
| 'review_id': 'test_001', | |
| 'review_text': 'App crashes!', | |
| 'rating': 1 | |
| } | |
| state = create_initial_state(test_review) | |
| print(f"\n✅ Initial state created for: {state['review_id']}") | |
| print(f" Review text: {state['review_text']}") | |
| print(f" Stage 1 completed: {state['stage1_completed']}") | |
| print("\n✅ State schema test complete!") | |