# Agents Architecture Documentation

## Table of Contents

- [Overview](#overview)
- [Agent Architecture](#agent-architecture)
- [Existing Agents](#existing-agents)
- [How Agents Work](#how-agents-work)
- [Adding New Agents](#adding-new-agents)
- [Modifying Existing Agents](#modifying-existing-agents)
- [Configuration System](#configuration-system)
- [Best Practices](#best-practices)
- [Troubleshooting](#troubleshooting)

## Overview

The agent system in this project is built on a modular, extensible architecture that processes social media comments through a series of specialized agents. Each agent performs a specific task (language detection, translation, sentiment analysis) and is orchestrated through a LangGraph workflow.

### Key Design Principles

1. **Modularity**: Each agent handles a single responsibility
2. **Extensibility**: New agents can be added without changing existing agent classes
3. **Consistency**: All agents inherit from a common base class
4. **Configuration-Driven**: Agent behavior controlled through JSON config files
5. **Error Resilience**: Robust error handling at every level

### Technology Stack

- **LangChain**: For LLM interactions and agent framework
- **LangGraph**: For workflow orchestration
- **OpenAI API**: LLM backend for NLP tasks
- **Lingua**: Fast language detection library
- **Python 3.x**: Core language

## Agent Architecture

### Directory Structure

```
agents/
├── __init__.py                     # Module exports
├── base_agent.py                   # Abstract base class
├── language_detection_agent.py     # Language detection agent
├── translation_agent.py            # Translation agent
├── sentiment_analysis_agent.py     # Sentiment analysis agent
└── README.md                       # This file
```

### Base Agent Class

All agents inherit from `BaseAgent` (`base_agent.py`), which provides:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


# Interface outline only; method bodies are omitted
class BaseAgent(ABC):
    """Abstract base class for all agents"""

    # Common attributes
    name: str                 # Agent name
    config: Dict[str, Any]    # Configuration dictionary
    model: str                # LLM model to use
    temperature: float        # LLM temperature
    max_retries: int          # Maximum retry attempts

    # Abstract methods (must be implemented)
    @abstractmethod
    def process(self, input_data: Dict) -> Dict: ...

    @abstractmethod
    def validate_input(self, input_data: Dict) -> bool: ...

    # Common methods (inherited)
    def get_name(self) -> str: ...
    def get_config(self) -> Dict: ...
    def log_processing(self, message: str, level: str): ...
    def handle_error(self, error: Exception, context: str) -> Dict: ...
```

### Workflow Integration

Agents are orchestrated through `workflow/comment_processor.py` using LangGraph:

```
┌─────────────────────┐
│ Language Detection  │
│        Agent        │
└──────────┬──────────┘
           │
           ▼
      ┌────┴────┐
      │ English?│
      └────┬────┘
           │
     ┌─────┴─────┐
     │           │
    Yes         No
     │           │
     │           ▼
     │    ┌─────────────┐
     │    │ Translation │
     │    │    Agent    │
     │    └──────┬──────┘
     │           │
     └─────┬─────┘
           │
           ▼
  ┌──────────────────┐
  │    Sentiment     │
  │  Analysis Agent  │
  └──────────────────┘
```

## Existing Agents

### 1. Language Detection Agent

**File**: `language_detection_agent.py`

**Purpose**: Detects the language of comment text using a hybrid approach.
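The hybrid approach can be sketched roughly as below. This is an illustration, not the agent's actual code: `fast_detect` and `llm_detect` are hypothetical callables standing in for the Lingua and LLM calls, and the defaults returned for empty text are an assumption.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical signatures; the real agent wraps Lingua and an LLM call.
FastDetect = Callable[[str], Optional[Tuple[str, str]]]  # -> (code, name) or None
LlmDetect = Callable[[str], Dict[str, Any]]


def detect_language(text: str, fast_detect: FastDetect, llm_detect: LlmDetect) -> Dict[str, Any]:
    """Try the fast detector first; fall back to the LLM for non-English text."""
    if not text or not text.strip():
        # Assumed behavior: report defaults without calling either detector.
        return {"language": "English", "language_code": "en", "is_english": True,
                "confidence": "low", "detection_method": "default", "has_text": False}

    fast = fast_detect(text)
    if fast is not None and fast[0] == "en":
        # Fast path: the cheap detector is trusted for English.
        return {"language": fast[1], "language_code": "en", "is_english": True,
                "confidence": "high", "detection_method": "lingua", "has_text": True}

    # Slow path: let the LLM identify the non-English language.
    llm = llm_detect(text)
    code = llm.get("language_code", "en")
    return {"language": llm.get("language", "English"), "language_code": code,
            "is_english": code == "en", "confidence": llm.get("confidence", "medium"),
            "detection_method": "llm", "has_text": llm.get("has_text", True)}
```

Passing the detectors in as callables keeps the routing logic testable without an API key.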

**Strategy**:

- Uses **Lingua library** for fast English detection
- Falls back to **LLM** for non-English languages (higher accuracy)
- Returns language name, ISO code, and confidence level

**Key Methods**:

```python
# Signature outline; bodies omitted
def detect_with_lingua(self, text: str) -> tuple[str, str, bool]:
    # Fast detection using lingua library
    # Returns: (language_code, language_name, is_english)
    ...

def detect_with_llm(self, text: str) -> Dict[str, Any]:
    # LLM-based detection for nuanced analysis
    # Returns: {language, language_code, confidence, has_text}
    ...

def process(self, input_data: Dict) -> Dict:
    # Main processing: lingua first, LLM if not English
    ...
```

**Configuration** (`sentiment_config.json`):

```json
{
  "language_detection": {
    "model": "gpt-5-nano",
    "temperature": 0.0,
    "max_retries": 3
  }
}
```

**Input Requirements**:

- `comment_text`: str

**Output**:

- `language`: str (e.g., "English", "Spanish")
- `language_code`: str (ISO 639-1, e.g., "en", "es")
- `is_english`: bool
- `confidence`: str ("high", "medium", "low")
- `detection_method`: str ("lingua", "llm", "default")
- `has_text`: bool

### 2. Translation Agent

**File**: `translation_agent.py`

**Purpose**: Translates non-English comments to English using an LLM.

**Strategy**:

- Skips translation if already English
- Uses LLM for context-aware, high-quality translation
- Preserves tone, intent, emojis, and special characters
- Specialized for music/education social media content

**Key Methods**:

```python
# Signature outline; bodies omitted
def translate_text(self, text: str, source_language: str) -> Dict:
    # LLM-based translation with context preservation
    # Returns: {translated_text, translation_confidence, notes}
    ...

def process(self, input_data: Dict) -> Dict:
    # Main processing: checks is_english, translates if needed
    ...
```

**Configuration**:

```json
{
  "translation": {
    "model": "gpt-5-nano",
    "temperature": 0.3,
    "max_retries": 3
  }
}
```

**Input Requirements**:

- `comment_text`: str
- `is_english`: bool
- `language`: str (optional, for context)

**Output**:

- `translated_text`: str
- `translation_performed`: bool
- `translation_confidence`: str
- `translation_notes`: str

### 3. Sentiment Analysis Agent

**File**: `sentiment_analysis_agent.py`

**Purpose**: Analyzes sentiment polarity and intent, and determines whether a reply is needed.
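As an illustration of the reply decision, here is one possible reading of the `reply_policy` settings in `sentiment_analysis_config.json`, applied to the comma-separated `intent` string. How the agent actually computes `requires_reply` is not shown in this document, so the function name `requires_reply` and the veto semantics of `not_include` are assumptions.

```python
from typing import Dict, List


def requires_reply(intent: str, reply_policy: Dict[str, List[str]]) -> bool:
    """Apply an assumed reading of reply_policy to a comma-separated intent string."""
    intents = {part.strip() for part in intent.split(",") if part.strip()}
    wanted = set(reply_policy.get("requires_reply_intents", []))
    excluded = set(reply_policy.get("not_include", []))
    # Assumption: any excluded intent vetoes the reply, even if a reply intent is present.
    return bool(intents & wanted) and not (intents & excluded)


policy = {"requires_reply_intents": ["question", "request"], "not_include": ["humor_sarcasm"]}
print(requires_reply("praise, question", policy))         # True
print(requires_reply("question, humor_sarcasm", policy))  # False
```

Under this reading, a sarcastic question is not flagged for a reply, which matches the stated goal of separating genuine from rhetorical questions.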

**Strategy**:

- Uses content description for context
- Supports parent comment context for reply analysis
- Multi-label intent classification
- Differentiates genuine vs rhetorical/sarcastic questions
- Platform-aware analysis (YouTube, Facebook, Instagram)

**Key Features**:

- **Context-Aware**: Uses content description and parent comment
- **Multi-Label**: Can assign multiple intents to a single comment
- **Reply Policy**: Flags comments requiring responses
- **Rhetorical Detection**: Identifies sarcastic/rhetorical questions

**Key Methods**:

```python
# Signature outline; bodies omitted
def _build_context_string(
    self,
    content_description: str,
    parent_comment_text: str = None,
    platform: str = None,
    content_title: str = None
) -> str:
    # Builds context for LLM prompt
    # Handles YouTube title+description vs other platforms
    ...

def analyze_sentiment(
    self,
    comment_text: str,
    content_description: str,
    parent_comment_text: str = None,
    platform: str = None,
    content_title: str = None
) -> Dict:
    # Performs sentiment analysis with full context
    # Returns: {sentiment_polarity, intent, requires_reply, confidence, analysis_notes}
    ...

def process(self, input_data: Dict) -> Dict:
    # Main processing: validates input, analyzes sentiment
    ...
```

**Configuration**:

Uses two config files:

1. **Agent Config** (`sentiment_config.json`):

   ```json
   {
     "sentiment_analysis": {
       "model": "gpt-5-nano",
       "temperature": 0.2,
       "max_retries": 3
     }
   }
   ```

2. **Categories Config** (`sentiment_analysis_config.json`):

   ```json
   {
     "sentiment_polarity": {
       "categories": [
         {"value": "very_positive", "label": "Very Positive", "description": "..."},
         {"value": "positive", "label": "Positive", "description": "..."},
         {"value": "neutral", "label": "Neutral", "description": "..."},
         {"value": "negative", "label": "Negative", "description": "..."},
         {"value": "very_negative", "label": "Very Negative", "description": "..."}
       ]
     },
     "intent": {
       "categories": [
         {"value": "praise", "description": "..."},
         {"value": "question", "description": "..."},
         {"value": "request", "description": "..."},
         {"value": "feedback_negative", "description": "..."},
         {"value": "suggestion", "description": "..."},
         {"value": "humor_sarcasm", "description": "..."},
         {"value": "off_topic", "description": "..."},
         {"value": "spam_selfpromo", "description": "..."}
       ]
     },
     "reply_policy": {
       "requires_reply_intents": ["question", "request"],
       "not_include": ["humor_sarcasm"]
     }
   }
   ```

**Input Requirements**:

- `comment_text`: str
- `content_description`: str
- `parent_comment_text`: str (optional)
- `platform`: str (optional, e.g., "youtube", "facebook")
- `content_title`: str (optional, mainly for YouTube)

**Output**:

- `sentiment_polarity`: str (one of: very_positive, positive, neutral, negative, very_negative)
- `intent`: str (comma-separated list, e.g., "praise, question")
- `requires_reply`: bool
- `sentiment_confidence`: str ("high", "medium", "low")
- `analysis_notes`: str (1-2 sentence summary)
- `success`: bool (False if critical fields missing)

### Common Patterns Across All Agents

1. **JSON Response Parsing**: All agents have a `_parse_llm_json_response()` method to handle markdown-wrapped JSON
2. **Error Handling**: All use `handle_error()` from base class
3. **Logging**: All use `log_processing()` for consistent logging
4. **Validation**: All implement `validate_input()` before processing
5. **State Preservation**: All preserve original input data in output

## How Agents Work

### Workflow Execution Flow

1. **Initialization** (`CommentProcessingWorkflow.__init__`):

   ```python
   # Load configurations
   lang_detect_config = config["agents"]["language_detection"]
   translation_config = config["agents"]["translation"]
   sentiment_config = config["agents"]["sentiment_analysis"]

   # Initialize agents
   self.language_agent = LanguageDetectionAgent(lang_detect_config, api_key)
   self.translation_agent = TranslationAgent(translation_config, api_key)
   self.sentiment_agent = SentimentAnalysisAgent(sentiment_config, api_key, sentiment_categories)

   # Build workflow graph
   self.workflow = self._build_workflow()
   ```

2. **Workflow Graph** (`_build_workflow()`):

   ```python
   workflow = StateGraph(CommentState)

   # Add nodes (agents)
   workflow.add_node("language_detection", self._language_detection_node)
   workflow.add_node("translation", self._translation_node)
   workflow.add_node("sentiment_analysis", self._sentiment_analysis_node)

   # Define edges (control flow)
   workflow.set_entry_point("language_detection")
   workflow.add_conditional_edges(
       "language_detection",
       self._should_translate,
       {"translate": "translation", "skip_translation": "sentiment_analysis"}
   )
   workflow.add_edge("translation", "sentiment_analysis")
   workflow.add_edge("sentiment_analysis", END)

   return workflow.compile()
   ```

3. **Node Execution** (Example: `_language_detection_node`):

   ```python
   def _language_detection_node(self, state: CommentState) -> CommentState:
       try:
           # Prepare input
           input_data = {"comment_text": state["comment_text"]}

           # Process with agent
           result = self.language_agent.process(input_data)

           # Update state
           if result.get("success", False):
               state["language"] = result.get("language", "English")
               state["language_code"] = result.get("language_code", "en")
               state["is_english"] = result.get("is_english", True)
               # ... more fields
           else:
               # Handle error, set defaults
               state["processing_errors"].append(result.get("error"))

           return state
       except Exception as e:
           # Error handling
           state["processing_errors"].append(str(e))
           return state
   ```

4. **Decision Points** (Example: `_should_translate`):

   ```python
   def _should_translate(self, state: CommentState) -> str:
       if state.get("is_english", True) or not state.get("has_text", True):
           # Set defaults for skipped translation
           state["translated_text"] = state["comment_text"]
           state["translation_performed"] = False
           return "skip_translation"
       else:
           return "translate"
   ```

5. **Comment Processing** (`process_comment()`):

   ```python
   def process_comment(self, comment_data: Dict) -> Dict:
       # Initialize state
       initial_state = {
           "comment_sk": comment_data.get("comment_sk"),
           "comment_text": comment_data.get("comment_text"),
           # ... all fields
           "processing_errors": [],
           "success": True
       }

       # Run workflow
       final_state = self.workflow.invoke(initial_state)

       # Merge and return
       return dict(final_state)
   ```

### State Management

The workflow uses a `CommentState` TypedDict to pass data between agents:

```python
class CommentState(TypedDict):
    # Input fields
    comment_sk: int
    comment_id: str
    comment_text: str
    # ... more fields

    # Processing fields (populated by agents)
    language: str
    language_code: str
    is_english: bool
    translated_text: str
    sentiment_polarity: str
    intent: str
    # ... more fields

    # Metadata
    processing_errors: Annotated[List[str], operator.add]
    success: bool
```

### Error Handling Strategy

1. **Agent Level**: Each agent returns `{"success": False, "error": "..."}` on failure
2. **Node Level**: Nodes catch exceptions, set defaults, append to `processing_errors`
3. **Workflow Level**: Workflow continues even if an agent fails (graceful degradation)
4. **Critical Failures**: Sentiment agent marks `success=False` if critical fields are missing (the comment is not stored)

## Adding New Agents

### Step-by-Step Guide

#### Step 1: Create the Agent Class

Create a new file in the `agents/` directory (e.g., `topic_classification_agent.py`):

```python
"""
Topic Classification Agent
Extracts topics and themes from comments
"""

from typing import Dict, Any
import json
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from agents.base_agent import BaseAgent
import logging

logger = logging.getLogger(__name__)


class TopicClassificationAgent(BaseAgent):
    """
    Agent that classifies comments into predefined topics/themes.
    """

    def __init__(self, config: Dict[str, Any], api_key: str, topic_categories: Dict[str, Any]):
        """
        Initialize the Topic Classification Agent.

        Args:
            config: Configuration dictionary
            api_key: OpenAI API key
            topic_categories: Dictionary with topic categories
        """
        super().__init__("TopicClassificationAgent", config)
        self.api_key = api_key
        self.topic_categories = topic_categories
        self.llm = ChatOpenAI(
            model=self.model,
            temperature=self.temperature,
            api_key=self.api_key
        )

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """
        Validate that input contains required fields.

        Args:
            input_data: Input dictionary

        Returns:
            True if valid, False otherwise
        """
        required_fields = ["comment_text"]
        return all(field in input_data for field in required_fields)

    def classify_topics(self, comment_text: str) -> Dict[str, Any]:
        """
        Classify comment into topics using LLM.

        Args:
            comment_text: The comment text to analyze

        Returns:
            Dictionary with topic classification results
        """
        # Build topic options from config
        topic_options = "\n".join([
            f"- {cat['value']}: {cat['description']}"
            for cat in self.topic_categories["topics"]["categories"]
        ])

        system_prompt = f"""You are an expert at classifying music-related comments into topics.

Available Topics:
{topic_options}

Return your response in JSON format with the following fields:
- topics: array of topic values (multi-label, can have multiple topics)
- confidence: your confidence level (high, medium, low)
- reasoning: brief explanation of your classification
"""

        user_prompt = f"""Classify this comment into relevant topics:

Comment: "{comment_text}"

Return JSON only."""

        try:
            messages = [
                SystemMessage(content=system_prompt),
                HumanMessage(content=user_prompt)
            ]

            response = self.llm.invoke(messages)
            result = self._parse_llm_json_response(response.content)

            # Normalize topics to a comma-separated string
            topics = result.get("topics", [])
            if isinstance(topics, str):
                topics = [topics]

            topic_str = ", ".join(topics) if topics else None

            return {
                "success": True,
                "topics": topic_str,
                "topic_confidence": result.get("confidence", "medium"),
                "topic_reasoning": result.get("reasoning", "")
            }

        except json.JSONDecodeError as e:
            self.log_processing(f"JSON decode error: {str(e)}", "warning")
            return {
                "success": False,
                "error": str(e)
            }
        except Exception as e:
            self.log_processing(f"Topic classification failed: {str(e)}", "error")
            return {
                "success": False,
                "error": str(e)
            }

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process comment and extract topics.

        Args:
            input_data: Dictionary containing comment data

        Returns:
            Dictionary with topic classification results
        """
        try:
            # Validate input
            if not self.validate_input(input_data):
                return {
                    "success": False,
                    "error": "Invalid input: missing required fields"
                }

            comment_text = input_data["comment_text"]

            self.log_processing("Classifying topics for comment", "debug")

            # Perform classification
            classification_result = self.classify_topics(comment_text)

            result = {
                "success": classification_result.get("success", False),
                "topics": classification_result.get("topics"),
                "topic_confidence": classification_result.get("topic_confidence"),
                "topic_reasoning": classification_result.get("topic_reasoning", "")
            }

            if "error" in classification_result:
                result["topic_error"] = classification_result["error"]

            # Preserve all original data
            for key, value in input_data.items():
                if key not in result:
                    result[key] = value

            return result

        except Exception as e:
            return self.handle_error(e, "topic_classification")

    def _parse_llm_json_response(self, response_content: str) -> Dict[str, Any]:
        """
        Parse LLM response that may contain JSON wrapped in markdown code blocks.

        Args:
            response_content: Raw response content from LLM

        Returns:
            Parsed JSON dictionary

        Raises:
            json.JSONDecodeError: If JSON cannot be parsed
        """
        content = response_content.strip()

        # Check if response is wrapped in markdown code block
        if content.startswith("```json"):
            content = content[7:]
            if content.endswith("```"):
                content = content[:-3]
            content = content.strip()
        elif content.startswith("```"):
            content = content[3:]
            if content.endswith("```"):
                content = content[:-3]
            content = content.strip()

        return json.loads(content)
```

#### Step 2: Update `__init__.py`

Add your agent to `agents/__init__.py`:

```python
"""
Agents module for the sentiment analysis workflow.
Provides modular, extensible agents for various NLP tasks.
"""

from agents.base_agent import BaseAgent
from agents.language_detection_agent import LanguageDetectionAgent
from agents.translation_agent import TranslationAgent
from agents.sentiment_analysis_agent import SentimentAnalysisAgent
from agents.topic_classification_agent import TopicClassificationAgent  # ADD THIS

__all__ = [
    "BaseAgent",
    "LanguageDetectionAgent",
    "TranslationAgent",
    "SentimentAnalysisAgent",
    "TopicClassificationAgent"  # ADD THIS
]
```

#### Step 3: Update Configuration Files

Add agent configuration to `config_files/sentiment_config.json`:

```json
{
  "agents": {
    "language_detection": { ... },
    "translation": { ... },
    "sentiment_analysis": { ... },
    "topic_classification": {
      "name": "TopicClassificationAgent",
      "model": "gpt-5-nano",
      "temperature": 0.2,
      "max_retries": 3,
      "description": "Classifies comments into topic categories"
    }
  }
}
```

Create topic categories config (or add to existing `sentiment_analysis_config.json`):

```json
{
  "topics": {
    "categories": [
      {
        "value": "technique",
        "label": "Technique",
        "description": "Playing technique, finger positioning, hand coordination"
      },
      {
        "value": "theory",
        "label": "Music Theory",
        "description": "Scales, chords, harmony, composition theory"
      },
      {
        "value": "equipment",
        "label": "Equipment",
        "description": "Instruments, gear, accessories, software"
      },
      {
        "value": "performance",
        "label": "Performance",
        "description": "Stage presence, live playing, performance anxiety"
      },
      {
        "value": "practice",
        "label": "Practice",
        "description": "Practice routines, discipline, improvement tips"
      }
    ]
  }
}
```

#### Step 4: Update Workflow State

Add fields to `CommentState` in `workflow/comment_processor.py`:

```python
class CommentState(TypedDict):
    # ... existing fields ...

    # Topic classification fields
    topics: str
    topic_confidence: str
    topic_reasoning: str
```

#### Step 5: Add Workflow Node

Add the node method to `CommentProcessingWorkflow` class:

```python
def _topic_classification_node(self, state: CommentState) -> CommentState:
    """
    Node for topic classification.

    Args:
        state: Current workflow state

    Returns:
        Updated state with topic classification results
    """
    try:
        # Prepare input
        input_data = {
            "comment_text": state.get("translated_text", state["comment_text"])
        }

        # Process with topic classification agent
        result = self.topic_agent.process(input_data)

        # Update state
        if result.get("success", False):
            state["topics"] = result.get("topics")
            state["topic_confidence"] = result.get("topic_confidence")
            state["topic_reasoning"] = result.get("topic_reasoning", "")
        else:
            error_msg = f"Topic classification failed: {result.get('error', 'Unknown error')}"
            state["processing_errors"] = state.get("processing_errors", []) + [error_msg]
            state["topics"] = None
            state["topic_confidence"] = None
            state["topic_reasoning"] = "Topic classification failed"

        logger.debug(f"Topics: {state['topics']}")
        return state

    except Exception as e:
        error_msg = f"Topic classification node error: {str(e)}"
        logger.error(error_msg)
        state["processing_errors"] = state.get("processing_errors", []) + [error_msg]
        state["topics"] = None
        state["topic_confidence"] = None
        state["topic_reasoning"] = "Error during topic classification"
        return state
```

#### Step 6: Initialize Agent in Workflow

Update `__init__` method:

```python
def __init__(self, config: Dict[str, Any], api_key: str):
    # ... existing initialization ...

    # Load topic categories
    topic_categories_path = config.get("topic_categories_config", "config_files/topic_categories.json")
    with open(topic_categories_path, 'r') as f:
        topic_categories = json.load(f)

    # Initialize topic agent
    topic_config = config["agents"]["topic_classification"]
    self.topic_agent = TopicClassificationAgent(topic_config, api_key, topic_categories)
```

#### Step 7: Update Workflow Graph

Modify `_build_workflow()`:

```python
def _build_workflow(self) -> StateGraph:
    workflow = StateGraph(CommentState)

    # Add nodes
    workflow.add_node("language_detection", self._language_detection_node)
    workflow.add_node("translation", self._translation_node)
    workflow.add_node("sentiment_analysis", self._sentiment_analysis_node)
    workflow.add_node("topic_classification", self._topic_classification_node)  # ADD THIS

    # Define edges
    workflow.set_entry_point("language_detection")
    workflow.add_conditional_edges(
        "language_detection",
        self._should_translate,
        {"translate": "translation", "skip_translation": "sentiment_analysis"}
    )
    workflow.add_edge("translation", "sentiment_analysis")
    workflow.add_edge("sentiment_analysis", "topic_classification")  # ADD THIS
    workflow.add_edge("topic_classification", END)  # MODIFY THIS

    return workflow.compile()
```

#### Step 8: Update Database Schema

Add columns to your Snowflake table:

```sql
-- Snowflake takes a single ADD COLUMN followed by a comma-separated column list
ALTER TABLE COMMENT_SENTIMENT_FEATURES ADD COLUMN
    TOPICS VARCHAR(500),
    TOPIC_CONFIDENCE VARCHAR(20),
    TOPIC_REASONING VARCHAR(1000);
```

#### Step 9: Test Your Agent

Test with a small batch first:

```bash
python main.py --limit 10 --sequential
```

Check logs for any errors and verify output in Snowflake.
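
Before the batch run, the JSON-parsing logic from Step 1 can also be checked offline, without an API key. The sketch below is a standalone function that mirrors `_parse_llm_json_response()`; the name `parse_llm_json` and the sample payloads are made up for illustration.

```python
import json
from typing import Any, Dict

FENCE = "`" * 3  # markdown code-fence marker, built here to keep this snippet readable


def parse_llm_json(response_content: str) -> Dict[str, Any]:
    """Strip an optional markdown code fence, then parse the remaining JSON."""
    content = response_content.strip()
    if content.startswith(FENCE + "json"):
        content = content[len(FENCE) + 4:]
    elif content.startswith(FENCE):
        content = content[len(FENCE):]
    if content.endswith(FENCE):
        content = content[:-len(FENCE)]
    return json.loads(content.strip())


if __name__ == "__main__":
    wrapped = FENCE + 'json\n{"topics": ["theory"], "confidence": "high"}\n' + FENCE
    assert parse_llm_json(wrapped) == {"topics": ["theory"], "confidence": "high"}
    assert parse_llm_json('{"topics": []}') == {"topics": []}
    try:
        parse_llm_json("not json at all")
    except json.JSONDecodeError:
        print("malformed input raises JSONDecodeError, as expected")
```

Checks like these catch parsing regressions cheaply, so the `--limit 10` run can focus on prompt and workflow behavior.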
### Quick Checklist for Adding New Agents - [ ] Create agent class inheriting from `BaseAgent` - [ ] Implement `validate_input()` method - [ ] Implement `process()` method - [ ] Implement `_parse_llm_json_response()` if using LLM - [ ] Add agent to `agents/__init__.py` - [ ] Add configuration to `sentiment_config.json` - [ ] Create/update category config file if needed - [ ] Add fields to `CommentState` TypedDict - [ ] Create node method in `CommentProcessingWorkflow` - [ ] Initialize agent in `__init__` - [ ] Add node to workflow graph - [ ] Update edges in workflow - [ ] Update database schema - [ ] Test with small batch ## Modifying Existing Agents ### Common Modifications #### 1. Change LLM Model Update `config_files/sentiment_config.json`: ```json { "agents": { "sentiment_analysis": { "model": "gpt-4o", // Change from gpt-5-nano "temperature": 0.2, "max_retries": 3 } } } ``` No code changes needed! Configuration is loaded dynamically. #### 2. Add New Sentiment Category Update `config_files/sentiment_analysis_config.json`: ```json { "sentiment_polarity": { "categories": [ // ... existing categories ... { "value": "mixed", "label": "Mixed", "description": "Contains both positive and negative elements" } ] } } ``` The agent will automatically include this in prompts. No code changes needed. #### 3. Add New Intent Category Update `config_files/sentiment_analysis_config.json`: ```json { "intent": { "categories": [ // ... existing categories ... { "value": "collaboration", "label": "Collaboration", "description": "Seeking or offering collaboration opportunities" } ] } } ``` #### 4. Modify Reply Policy Update `config_files/sentiment_analysis_config.json`: ```json { "reply_policy": { "requires_reply_intents": ["question", "request", "feedback_negative"], // Added feedback_negative "not_include": ["humor_sarcasm", "spam_selfpromo"] // Added spam_selfpromo } } ``` #### 5. 
Adjust Temperature for Better Results If getting inconsistent results, adjust temperature: ```json { "agents": { "sentiment_analysis": { "model": "gpt-5-nano", "temperature": 0.1, // Lower = more consistent, less creative "max_retries": 3 } } } ``` #### 6. Add Context to Sentiment Analysis Modify `_build_context_string()` in `sentiment_analysis_agent.py`: ```python def _build_context_string(self, content_description: str, parent_comment_text: str = None, platform: str = None, content_title: str = None, channel_name: str = None) -> str: # ADD channel_name """Build context string for sentiment analysis.""" context_parts = [] # ... existing code ... # ADD THIS if channel_name: context_parts.append(f"Channel: {channel_name}") return "\n".join(context_parts) ``` Then update the `analyze_sentiment()` method to accept and pass `channel_name`. #### 7. Improve Language Detection Accuracy Modify `language_detection_agent.py` to add more languages to LINGUA_TO_ISO: ```python LINGUA_TO_ISO = { # ... existing mappings ... Language.VIETNAMESE: "vi", Language.THAI: "th", Language.INDONESIAN: "id", # Add more as needed } ``` #### 8. Customize Translation Prompt Modify `translate_text()` in `translation_agent.py`: ```python system_prompt = """You are a professional translator specializing in social media content related to music and education. Translate the given text from the source language to English. The text is a comment on a musical content. Preserve the tone, intent, and any emojis or special characters. For informal social media language, maintain the casual tone in translation. 
// ADD THESE GUIDELINES: Special Instructions: - Preserve musical terminology (e.g., "legato", "staccato") untranslated - Translate instrument names (e.g., "guitarra" → "guitar") - Keep artist names and brand names in original language - Maintain slang and colloquialisms when possible Return your response in JSON format with the following fields: - translated_text: The English translation - translation_confidence: Your confidence level (high, medium, low) - notes: Any important notes about the translation (optional) """ ``` #### 9. Add Retry Logic for Failed Analyses Modify `process()` in `sentiment_analysis_agent.py`: ```python def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: try: # ... existing validation code ... # ADD RETRY LOGIC max_attempts = self.max_retries for attempt in range(max_attempts): analysis_result = self.analyze_sentiment( comment_text, content_description, parent_comment_text, platform, content_title ) if analysis_result.get("success"): break if attempt < max_attempts - 1: self.log_processing(f"Attempt {attempt + 1} failed, retrying...", "warning") # ... rest of existing code ... ``` #### 10. 
Add Custom Validation Rules Modify `validate_input()` in any agent: ```python def validate_input(self, input_data: Dict[str, Any]) -> bool: """Validate that input contains required fields.""" required_fields = ["comment_text", "content_description"] # Check required fields exist if not all(field in input_data for field in required_fields): return False # ADD CUSTOM VALIDATION # Ensure comment_text is not empty or too short comment_text = input_data.get("comment_text", "") if not comment_text or len(comment_text.strip()) < 2: self.log_processing("Comment text too short or empty", "warning") return False # Ensure content_description exists content_desc = input_data.get("content_description", "") if not content_desc or content_desc.strip() == "": self.log_processing("Content description missing", "warning") return False return True ``` ### Testing Modified Agents After making modifications, always test: ```bash # Test with a small batch python main.py --limit 10 --sequential # Check specific data source python main.py --limit 10 --sequential --data-source social_media # Review logs for errors tail -f logs/comment_processing_*.log ``` ## Configuration System ### Configuration Files Overview ``` config_files/ ├── sentiment_config.json # Agent behavior config ├── sentiment_analysis_config.json # Sentiment categories and intents └── data_sources_config.json # Data source configuration ``` ### Agent Configuration Structure **File**: `sentiment_config.json` ```json { "agents": { "agent_name": { "name": "AgentClassName", "model": "gpt-5-nano", // LLM model to use "temperature": 0.0, // Creativity (0.0 = deterministic, 1.0 = creative) "max_retries": 3, // Max retry attempts "description": "What this agent does" } }, "workflow": { "parallel_processing": { "enabled": true, "worker_calculation": "CPU count - 2, max 5 workers", "min_batch_size": 20, "max_batch_size": 1000 } } } ``` ### Temperature Guidelines - **0.0 - 0.1**: Deterministic, consistent (good for classification) - 
**0.2 - 0.4**: Slight variation, mostly consistent (good for sentiment analysis) - **0.5 - 0.7**: Balanced creativity and consistency (good for translation) - **0.8 - 1.0**: Creative, varied (good for content generation) ### Model Selection Guidelines - **gpt-5-nano**: Fast, cheap, good for simple tasks - **gpt-4o-mini**: Balanced speed/quality, good for most tasks - **gpt-4o**: High quality, slower, good for complex analysis ### Category Configuration Structure **File**: `sentiment_analysis_config.json` ```json { "category_type": { "categories": [ { "value": "machine_readable_value", // Used in code/DB "label": "Human Readable Label", // Used in UI "description": "Detailed description for LLM prompt" } ] } } ``` ### Loading Configuration in Code ```python # In workflow/__init__ or agent __init__ import json import os # Load agent config with open('config_files/sentiment_config.json', 'r') as f: config = json.load(f) agent_config = config["agents"]["agent_name"] # Load category config with open('config_files/sentiment_analysis_config.json', 'r') as f: categories = json.load(f) sentiment_categories = categories["sentiment_polarity"]["categories"] ``` ## Best Practices ### Agent Development 1. **Single Responsibility**: Each agent should do one thing well 2. **Fail Gracefully**: Always return structured error responses 3. **Preserve Data**: Never lose original input data - pass it through 4. **Log Everything**: Use `log_processing()` for debugging 5. **Validate Early**: Check inputs before processing 6. **Configuration Over Code**: Use config files for behavior changes 7. **Test Incrementally**: Test with `--limit 10 --sequential` first ### Prompt Engineering 1. **Be Specific**: Clearly define expected output format 2. **Use Examples**: Include few-shot examples in prompts 3. **Request JSON**: Always request JSON format for structured data 4. **Handle Edge Cases**: Document edge cases in prompts 5. **Provide Context**: Give LLM all relevant context 6. 
**Set Constraints**: Clearly define boundaries and limitations Example of good prompt structure: ```python system_prompt = """You are an expert at [TASK]. Your task is to: 1. [Step 1] 2. [Step 2] 3. [Step 3] Context: [Explain the context] Rules: - Rule 1 - Rule 2 - Rule 3 Examples: - Input: "..." → Output: {...} - Input: "..." → Output: {...} Return your response in JSON format with the following fields: - field1: description - field2: description """ ``` ### Error Handling 1. **Try-Catch Everything**: Wrap all processing in try-catch 2. **Specific Error Messages**: Make errors actionable 3. **Graceful Degradation**: Continue workflow even if one agent fails 4. **Error Accumulation**: Collect errors in `processing_errors` list 5. **Critical vs Non-Critical**: Distinguish between recoverable and fatal errors Example: ```python def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: try: # Validate if not self.validate_input(input_data): return { "success": False, "error": "Invalid input: missing required fields", **input_data # Preserve original data } # Process result = self.do_processing(input_data) # Check result if not result.get("success"): return { "success": False, "error": result.get("error", "Unknown error"), **input_data } # Return success return { "success": True, "output_field": result["output"], **input_data } except Exception as e: return self.handle_error(e, "process") ``` ### Testing 1. **Unit Test Agents**: Test agents independently before integration 2. **Small Batches**: Always test with `--limit 10` first 3. **Sequential Mode**: Use `--sequential` for debugging 4. **Check Logs**: Review logs after every test run 5. **Validate Output**: Check Snowflake results 6. 
   **Test Edge Cases**: Empty text, emojis only, very long text, special characters

Test script example:

```python
# test_agent.py
from agents.sentiment_analysis_agent import SentimentAnalysisAgent
import json

# Load config
with open('config_files/sentiment_config.json', 'r') as f:
    config = json.load(f)

with open('config_files/sentiment_analysis_config.json', 'r') as f:
    categories = json.load(f)

# Initialize agent
agent = SentimentAnalysisAgent(
    config["agents"]["sentiment_analysis"],
    "your-api-key",
    categories
)

# Test cases
test_cases = [
    {"comment_text": "This is amazing!", "content_description": "Guitar tutorial"},
    {"comment_text": "😊😊😊", "content_description": "Piano cover"},
    {"comment_text": "What scale is this?", "content_description": "Blues solo"},
]

for test in test_cases:
    result = agent.process(test)
    print(f"Input: {test['comment_text']}")
    print(f"Result: {result}")
    print("---")
```

### Performance Optimization

1. **Batch Processing**: Process comments in batches (handled by the workflow)
2. **Parallel Workers**: Use multiprocessing for large batches
3. **Minimize LLM Calls**: Cache results when possible
4. **Optimize Prompts**: Shorter prompts generally mean faster responses
5. **Choose the Right Model**: Use gpt-5-nano for simple tasks

### Code Organization

1. **One Agent Per File**: Don't combine multiple agents in one file
2. **Helper Methods**: Use private methods (`_method_name`) for internal logic
3. **Type Hints**: Always use type hints for parameters and return values
4. **Docstrings**: Document all public methods
5. **Constants**: Define constants at class level

Example structure:

```python
class MyAgent(BaseAgent):
    # Constants
    DEFAULT_VALUE = "default"
    MAX_LENGTH = 1000

    def __init__(self, config, api_key):
        """Initialize agent."""
        super().__init__("MyAgent", config)
        # ... initialization

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """Validate input data."""
        # ...
        # validation

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Main processing method."""
        # ... processing

    def _helper_method(self, data: str) -> str:
        """Private helper method."""
        # ... helper logic

    def _parse_llm_json_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        # ... parsing
```

## Troubleshooting

### Common Issues

#### Issue 1: Agent Returns Empty Results

**Symptoms**: Agent succeeds but returns `None` or empty strings for key fields

**Causes**:
- LLM not following the JSON format
- JSON parsing failing silently
- Missing fields in the LLM response

**Solutions**:
1. Check logs for JSON parsing warnings
2. Add validation after the LLM call:
   ```python
   result = self._parse_llm_json_response(response.content)

   # Validate result
   if not result.get("sentiment_polarity"):
       return {
           "success": False,
           "error": "Missing sentiment_polarity in LLM response"
       }
   ```
3. Improve the prompt to be more specific about required fields
4. Add examples to the prompt showing the exact JSON structure

#### Issue 2: JSON Parsing Errors

**Symptoms**: `JSON decode error` in logs

**Causes**:
- LLM returns markdown-wrapped JSON
- LLM includes explanatory text before/after the JSON
- Malformed JSON from the LLM

**Solutions**:
1. Use the `_parse_llm_json_response()` helper (already handles markdown)
2. Add a more explicit prompt:
   ```python
   user_prompt = """...
   Return ONLY valid JSON, no explanation or markdown.
   Just the raw JSON object.
   """
   ```
3. Add fallback parsing:
   ```python
   import json
   import re

   # `content` is the raw text of the LLM response
   try:
       result = json.loads(content)
   except json.JSONDecodeError:
       # Try to extract the outermost {...} span from the surrounding text
       json_match = re.search(r'\{.*\}', content, re.DOTALL)
       if json_match:
           result = json.loads(json_match.group())
       else:
           raise
   ```

#### Issue 3: Inconsistent Results

**Symptoms**: Same comment gets different classifications on reruns

**Causes**:
- Temperature too high
- Prompt too vague
- Model inconsistency

**Solutions**:
1. Lower temperature to 0.0 - 0.2 for classification tasks
2.
   Make the prompt more specific and rule-based
3. Add examples to the prompt
4. Compare models for consistency on your data (for example, gpt-5-nano vs gpt-4o) and use the more consistent one

#### Issue 4: Agent Too Slow

**Symptoms**: Processing takes very long

**Causes**:
- Large LLM model
- Complex prompts
- Sequential processing
- API rate limits

**Solutions**:
1. Use a faster model (gpt-5-nano instead of gpt-4o)
2. Simplify the prompt (shorter prompts are generally faster)
3. Enable parallel processing (already the default)
4. Increase batch size (if not hitting rate limits)
5. Consider caching repeated analyses

#### Issue 5: Agent Failing Validation

**Symptoms**: `validate_input()` returns `False`, agent skips processing

**Causes**:
- Missing required fields in input
- Empty or `None` values
- Wrong data types

**Solutions**:
1. Check the workflow node and ensure all required fields are passed:
   ```python
   input_data = {
       "comment_text": state.get("translated_text", state["comment_text"]),
       "content_description": state["content_description"],
       # Add all required fields
   }
   ```
2. Add logging to validation:
   ```python
   def validate_input(self, input_data: Dict[str, Any]) -> bool:
       # Example field names; adjust to the fields your agent requires
       required_fields = ["comment_text", "content_description"]
       for field in required_fields:
           if field not in input_data:
               self.log_processing(f"Missing field: {field}", "error")
               return False
       return True
   ```

#### Issue 6: Workflow Not Running New Agent

**Symptoms**: New agent not being called, no logs from new agent

**Causes**:
- Forgot to add the node to the workflow graph
- Forgot to initialize the agent
- Workflow edges not connected

**Solutions**:
1. Verify the agent is initialized in `__init__`:
   ```python
   self.new_agent = NewAgent(config, api_key)
   ```
2. Verify the node was added:
   ```python
   workflow.add_node("new_agent", self._new_agent_node)
   ```
3. Verify the edges:
   ```python
   workflow.add_edge("previous_agent", "new_agent")
   workflow.add_edge("new_agent", END)
   ```
4.
   Check for exceptions in workflow compilation

#### Issue 7: Database Insert Fails

**Symptoms**: Processing succeeds but data is not in Snowflake

**Causes**:
- Missing columns in the database
- Data type mismatch
- Field name mismatch

**Solutions**:
1. Check that the column exists:
   ```sql
   DESC TABLE COMMENT_SENTIMENT_FEATURES;
   ```
2. Add the column if it is missing:
   ```sql
   ALTER TABLE COMMENT_SENTIMENT_FEATURES ADD COLUMN NEW_FIELD VARCHAR(500);
   ```
3. Check that field names match exactly (case-sensitive)
4. Check that the `result_df` construction in `main.py` includes the new fields

### Debugging Tips

1. **Enable Debug Logging**: Set the log level to DEBUG in `main.py`
2. **Print State**: Add print statements in workflow nodes to inspect state
3. **Test Agent Directly**: Test the agent outside the workflow first
4. **Use Sequential Mode**: Use the `--sequential` flag for clearer debugging
5. **Check API Logs**: Review the OpenAI API dashboard for errors
6. **Validate JSON**: Run config files through a JSON validator
7. **Check Git Status**: Ensure all files are saved and changes are committed

### Getting Help

1. **Check Logs**: Always check the `logs/` directory first
2. **Review This README**: Answers to most questions are here
3. **Test Incrementally**: Isolate the problem to one agent
4. **Use Small Batches**: Test with `--limit 5` for faster iteration
5. **Document Issues**: Keep notes on what you tried

## Conclusion

This agent architecture provides a flexible, maintainable foundation for processing social media comments. Key takeaways:

- **Base class pattern** ensures consistency
- **LangGraph workflow** enables flexible orchestration
- **Configuration-driven** design minimizes code changes
- **Error resilience** at every level
- **Extensible by design** - easy to add new agents

For questions or issues, refer to the main project README or review the existing agent implementations for patterns and examples.

---

**Last Updated**: 2026-01-15
**Version**: 1.0
**Maintainer**: Musora Development Team