Agents Architecture Documentation

Overview

The agent system in this project is built on a modular, extensible architecture that processes social media comments through a series of specialized agents. Each agent performs a specific task (language detection, translation, sentiment analysis) and is orchestrated through a LangGraph workflow.

Key Design Principles

  1. Modularity: Each agent handles a single responsibility
  2. Extensibility: Easy to add new agents without modifying existing code
  3. Consistency: All agents inherit from a common base class
  4. Configuration-Driven: Agent behavior controlled through JSON config files
  5. Error Resilience: Robust error handling at every level

Technology Stack

  • LangChain: For LLM interactions and agent framework
  • LangGraph: For workflow orchestration
  • OpenAI API: LLM backend for NLP tasks
  • Lingua: Fast language detection library
  • Python 3.x: Core language

Agent Architecture

Directory Structure

agents/
β”œβ”€β”€ __init__.py                      # Module exports
β”œβ”€β”€ base_agent.py                    # Abstract base class
β”œβ”€β”€ language_detection_agent.py      # Language detection agent
β”œβ”€β”€ translation_agent.py             # Translation agent
β”œβ”€β”€ sentiment_analysis_agent.py      # Sentiment analysis agent
└── README.md                        # This file

Base Agent Class

All agents inherit from BaseAgent (base_agent.py), which provides:

class BaseAgent(ABC):
    """Abstract base class for all agents"""

    # Common attributes
    name: str                        # Agent name
    config: Dict[str, Any]           # Configuration dictionary
    model: str                       # LLM model to use
    temperature: float               # LLM temperature
    max_retries: int                 # Maximum retry attempts

    # Abstract methods (must be implemented by subclasses)
    @abstractmethod
    def process(self, input_data: Dict) -> Dict: ...

    @abstractmethod
    def validate_input(self, input_data: Dict) -> bool: ...

    # Common methods (inherited by all agents)
    def get_name(self) -> str: ...
    def get_config(self) -> Dict: ...
    def log_processing(self, message: str, level: str) -> None: ...
    def handle_error(self, error: Exception, context: str) -> Dict: ...
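To make the contract concrete, a minimal subclass might look like the following (a self-contained sketch: the base class constructor and the `EchoAgent` example are simplified stand-ins, not the project's real code):

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseAgent(ABC):
    """Simplified stand-in for the real base class."""

    def __init__(self, name: str, config: Dict[str, Any]):
        self.name = name
        self.config = config
        self.model = config.get("model", "gpt-5-nano")
        self.temperature = config.get("temperature", 0.0)
        self.max_retries = config.get("max_retries", 3)

    @abstractmethod
    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: ...

    @abstractmethod
    def validate_input(self, input_data: Dict[str, Any]) -> bool: ...


class EchoAgent(BaseAgent):
    """Trivial agent that just echoes the comment text back."""

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        return "comment_text" in input_data

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        if not self.validate_input(input_data):
            return {"success": False, "error": "missing comment_text"}
        return {"success": True, "echo": input_data["comment_text"]}


agent = EchoAgent("EchoAgent", {"model": "gpt-5-nano", "temperature": 0.0})
```

Every real agent in `agents/` follows this pattern: implement the two abstract methods, and return a dict with a `success` flag.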

Workflow Integration

Agents are orchestrated through workflow/comment_processor.py using LangGraph:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Language Detection β”‚
β”‚  Agent              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
           β”‚
           β–Ό
      β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
      β”‚ English?β”‚
      β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜
           β”‚
     β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”
     β”‚           β”‚
   Yes          No
     β”‚           β”‚
     β”‚           β–Ό
     β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
     β”‚    β”‚ Translation β”‚
     β”‚    β”‚ Agent       β”‚
     β”‚    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
     β”‚           β”‚
     β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
           β”‚
           β–Ό
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚ Sentiment        β”‚
    β”‚ Analysis Agent   β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Existing Agents

1. Language Detection Agent

File: language_detection_agent.py

Purpose: Detects the language of comment text using a hybrid approach.

Strategy:

  • Uses Lingua library for fast English detection
  • Falls back to LLM for non-English languages (higher accuracy)
  • Returns language name, ISO code, and confidence level

Key Methods:

def detect_with_lingua(text: str) -> tuple[str, str, bool]
    # Fast detection using lingua library
    # Returns: (language_code, language_name, is_english)

def detect_with_llm(text: str) -> Dict[str, Any]
    # LLM-based detection for nuanced analysis
    # Returns: {language, language_code, confidence, has_text}

def process(input_data: Dict) -> Dict
    # Main processing: lingua first, LLM if not English

Configuration (sentiment_config.json):

{
    "language_detection": {
        "model": "gpt-5-nano",
        "temperature": 0.0,
        "max_retries": 3
    }
}

Input Requirements:

  • comment_text: str

Output:

  • language: str (e.g., "English", "Spanish")
  • language_code: str (ISO 639-1, e.g., "en", "es")
  • is_english: bool
  • confidence: str ("high", "medium", "low")
  • detection_method: str ("lingua", "llm", "default")
  • has_text: bool
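The hybrid lingua-first, LLM-fallback strategy can be sketched as follows. This is a simplified, self-contained illustration: `detect_with_lingua` and `detect_with_llm` are stubs standing in for the real Lingua and OpenAI calls, and the ASCII heuristic is an assumption for demonstration only.

```python
def detect_with_lingua(text):
    # Stub for the Lingua fast path: treat mostly-ASCII text as English.
    ascii_ratio = sum(ch.isascii() for ch in text) / max(len(text), 1)
    is_english = ascii_ratio > 0.9
    return ("en", "English", is_english)


def detect_with_llm(text):
    # Stub for the LLM fallback; the real agent asks the model for JSON.
    return {"language": "Spanish", "language_code": "es",
            "confidence": "high", "has_text": True}


def process(input_data):
    text = input_data.get("comment_text", "")
    if not text.strip():
        return {"has_text": False, "language": "English", "language_code": "en",
                "is_english": True, "confidence": "low",
                "detection_method": "default"}
    code, name, is_english = detect_with_lingua(text)
    if is_english:
        return {"has_text": True, "language": name, "language_code": code,
                "is_english": True, "confidence": "high",
                "detection_method": "lingua"}
    llm = detect_with_llm(text)
    return {"has_text": True, "language": llm["language"],
            "language_code": llm["language_code"], "is_english": False,
            "confidence": llm["confidence"], "detection_method": "llm"}
```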

2. Translation Agent

File: translation_agent.py

Purpose: Translates non-English comments to English using LLM.

Strategy:

  • Skips translation if already English
  • Uses LLM for context-aware, high-quality translation
  • Preserves tone, intent, emojis, and special characters
  • Specialized for music/education social media content

Key Methods:

def translate_text(text: str, source_language: str) -> Dict
    # LLM-based translation with context preservation
    # Returns: {translated_text, translation_confidence, notes}

def process(input_data: Dict) -> Dict
    # Main processing: checks is_english, translates if needed

Configuration:

{
    "translation": {
        "model": "gpt-5-nano",
        "temperature": 0.3,
        "max_retries": 3
    }
}

Input Requirements:

  • comment_text: str
  • is_english: bool
  • language: str (optional, for context)

Output:

  • translated_text: str
  • translation_performed: bool
  • translation_confidence: str
  • translation_notes: str
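The skip-if-English logic amounts to the following (a simplified sketch; `translate_text` is a stub in place of the real LLM call):

```python
def translate_text(text, source_language):
    # Stub for the LLM call; the real agent returns a context-aware translation.
    return {"translated_text": f"[translated from {source_language}] {text}",
            "translation_confidence": "medium", "notes": ""}


def process(input_data):
    # Pass English comments through unchanged.
    if input_data.get("is_english", True):
        return {**input_data,
                "translated_text": input_data["comment_text"],
                "translation_performed": False}
    result = translate_text(input_data["comment_text"],
                            input_data.get("language", "unknown"))
    return {**input_data,
            "translated_text": result["translated_text"],
            "translation_performed": True,
            "translation_confidence": result["translation_confidence"],
            "translation_notes": result["notes"]}
```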

3. Sentiment Analysis Agent

File: sentiment_analysis_agent.py

Purpose: Analyzes sentiment polarity, intent, and determines if reply is needed.

Strategy:

  • Uses content description for context
  • Supports parent comment context for reply analysis
  • Multi-label intent classification
  • Differentiates genuine vs rhetorical/sarcastic questions
  • Platform-aware analysis (YouTube, Facebook, Instagram)

Key Features:

  • Context-Aware: Uses content description and parent comment
  • Multi-Label: Can assign multiple intents to a single comment
  • Reply Policy: Flags comments requiring responses
  • Rhetorical Detection: Identifies sarcastic/rhetorical questions

Key Methods:

def _build_context_string(
    content_description: str,
    parent_comment_text: str = None,
    platform: str = None,
    content_title: str = None
) -> str
    # Builds context for LLM prompt
    # Handles YouTube title+description vs other platforms

def analyze_sentiment(
    comment_text: str,
    content_description: str,
    parent_comment_text: str = None,
    platform: str = None,
    content_title: str = None
) -> Dict
    # Performs sentiment analysis with full context
    # Returns: {sentiment_polarity, intent, requires_reply, confidence, analysis_notes}

def process(input_data: Dict) -> Dict
    # Main processing: validates input, analyzes sentiment

Configuration: Uses two config files:

  1. Agent Config (sentiment_config.json):
{
    "sentiment_analysis": {
        "model": "gpt-5-nano",
        "temperature": 0.2,
        "max_retries": 3
    }
}
  2. Categories Config (sentiment_analysis_config.json):
{
    "sentiment_polarity": {
        "categories": [
            {"value": "very_positive", "label": "Very Positive", "description": "..."},
            {"value": "positive", "label": "Positive", "description": "..."},
            {"value": "neutral", "label": "Neutral", "description": "..."},
            {"value": "negative", "label": "Negative", "description": "..."},
            {"value": "very_negative", "label": "Very Negative", "description": "..."}
        ]
    },
    "intent": {
        "categories": [
            {"value": "praise", "description": "..."},
            {"value": "question", "description": "..."},
            {"value": "request", "description": "..."},
            {"value": "feedback_negative", "description": "..."},
            {"value": "suggestion", "description": "..."},
            {"value": "humor_sarcasm", "description": "..."},
            {"value": "off_topic", "description": "..."},
            {"value": "spam_selfpromo", "description": "..."}
        ]
    },
    "reply_policy": {
        "requires_reply_intents": ["question", "request"],
        "not_include": ["humor_sarcasm"]
    }
}

Input Requirements:

  • comment_text: str
  • content_description: str
  • parent_comment_text: str (optional)
  • platform: str (optional, e.g., "youtube", "facebook")
  • content_title: str (optional, mainly for YouTube)

Output:

  • sentiment_polarity: str (one of: very_positive, positive, neutral, negative, very_negative)
  • intent: str (comma-separated list, e.g., "praise, question")
  • requires_reply: bool
  • sentiment_confidence: str ("high", "medium", "low")
  • analysis_notes: str (1-2 sentence summary)
  • success: bool (False if critical fields missing)

Common Patterns Across All Agents

  1. JSON Response Parsing: All agents have _parse_llm_json_response() method to handle markdown-wrapped JSON
  2. Error Handling: All use handle_error() from base class
  3. Logging: All use log_processing() for consistent logging
  4. Validation: All implement validate_input() before processing
  5. State Preservation: All preserve original input data in output

How Agents Work

Workflow Execution Flow

  1. Initialization (CommentProcessingWorkflow.__init__):

    # Load configurations
    lang_detect_config = config["agents"]["language_detection"]
    translation_config = config["agents"]["translation"]
    sentiment_config = config["agents"]["sentiment_analysis"]
    
    # Initialize agents
    self.language_agent = LanguageDetectionAgent(lang_detect_config, api_key)
    self.translation_agent = TranslationAgent(translation_config, api_key)
    self.sentiment_agent = SentimentAnalysisAgent(sentiment_config, api_key, sentiment_categories)
    
    # Build workflow graph
    self.workflow = self._build_workflow()
    
  2. Workflow Graph (_build_workflow()):

    workflow = StateGraph(CommentState)
    
    # Add nodes (agents)
    workflow.add_node("language_detection", self._language_detection_node)
    workflow.add_node("translation", self._translation_node)
    workflow.add_node("sentiment_analysis", self._sentiment_analysis_node)
    
    # Define edges (control flow)
    workflow.set_entry_point("language_detection")
    workflow.add_conditional_edges(
        "language_detection",
        self._should_translate,
        {"translate": "translation", "skip_translation": "sentiment_analysis"}
    )
    workflow.add_edge("translation", "sentiment_analysis")
    workflow.add_edge("sentiment_analysis", END)
    
    return workflow.compile()
    
  3. Node Execution (Example: _language_detection_node):

    def _language_detection_node(self, state: CommentState) -> CommentState:
        try:
            # Prepare input
            input_data = {"comment_text": state["comment_text"]}
    
            # Process with agent
            result = self.language_agent.process(input_data)
    
            # Update state
            if result.get("success", False):
                state["language"] = result.get("language", "English")
                state["language_code"] = result.get("language_code", "en")
                state["is_english"] = result.get("is_english", True)
                # ... more fields
            else:
                # Handle error, set defaults
                state["processing_errors"].append(result.get("error"))
    
            return state
        except Exception as e:
            # Error handling
            state["processing_errors"].append(str(e))
            return state
    
  4. Decision Points (Example: _should_translate):

    def _should_translate(self, state: CommentState) -> str:
        if state.get("is_english", True) or not state.get("has_text", True):
            # Set defaults for skipped translation
            state["translated_text"] = state["comment_text"]
            state["translation_performed"] = False
            return "skip_translation"
        else:
            return "translate"
    
  5. Comment Processing (process_comment()):

    def process_comment(self, comment_data: Dict) -> Dict:
        # Initialize state
        initial_state = {
            "comment_sk": comment_data.get("comment_sk"),
            "comment_text": comment_data.get("comment_text"),
            # ... all fields
            "processing_errors": [],
            "success": True
        }
    
        # Run workflow
        final_state = self.workflow.invoke(initial_state)
    
        # Merge and return
        return dict(final_state)
    

State Management

The workflow uses a CommentState TypedDict to pass data between agents:

class CommentState(TypedDict):
    # Input fields
    comment_sk: int
    comment_id: str
    comment_text: str
    # ... more fields

    # Processing fields (populated by agents)
    language: str
    language_code: str
    is_english: bool
    translated_text: str
    sentiment_polarity: str
    intent: str
    # ... more fields

    # Metadata
    processing_errors: Annotated[List[str], operator.add]
    success: bool
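The `Annotated[List[str], operator.add]` annotation tells LangGraph to merge `processing_errors` across nodes by list concatenation rather than overwriting; the merge behaves like:

```python
import operator

# How the reducer combines the existing state value with a node's update:
existing = ["Translation failed: timeout"]
update = ["Sentiment analysis failed: empty response"]
merged = operator.add(existing, update)
```

This is why errors from every node accumulate in the final state instead of only the last one surviving.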

Error Handling Strategy

  1. Agent Level: Each agent returns {"success": False, "error": "..."} on failure
  2. Node Level: Nodes catch exceptions, set defaults, append to processing_errors
  3. Workflow Level: Workflow continues even if an agent fails (graceful degradation)
  4. Critical Failures: Sentiment agent marks success=False if critical fields missing (comment not stored)
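A plausible shape for the base class's handle_error() helper, which produces the agent-level failure dict described in point 1 (an assumption for illustration; the real method may format or log differently):

```python
import logging

logger = logging.getLogger(__name__)


def handle_error(error, context):
    # Produce the uniform failure dict that workflow nodes check
    # via result.get("success", False).
    message = f"{context}: {error}"
    logger.error(message)
    return {"success": False, "error": message}
```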

Adding New Agents

Step-by-Step Guide

Step 1: Create the Agent Class

Create a new file in the agents/ directory (e.g., topic_classification_agent.py):

"""
Topic Classification Agent
Extracts topics and themes from comments
"""

from typing import Dict, Any
import json
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from agents.base_agent import BaseAgent
import logging

logger = logging.getLogger(__name__)


class TopicClassificationAgent(BaseAgent):
    """
    Agent that classifies comments into predefined topics/themes.
    """

    def __init__(self, config: Dict[str, Any], api_key: str, topic_categories: Dict[str, Any]):
        """
        Initialize the Topic Classification Agent.

        Args:
            config: Configuration dictionary
            api_key: OpenAI API key
            topic_categories: Dictionary with topic categories
        """
        super().__init__("TopicClassificationAgent", config)
        self.api_key = api_key
        self.topic_categories = topic_categories
        self.llm = ChatOpenAI(
            model=self.model,
            temperature=self.temperature,
            api_key=self.api_key
        )

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """
        Validate that input contains required fields.

        Args:
            input_data: Input dictionary

        Returns:
            True if valid, False otherwise
        """
        required_fields = ["comment_text"]
        return all(field in input_data for field in required_fields)

    def classify_topics(self, comment_text: str) -> Dict[str, Any]:
        """
        Classify comment into topics using LLM.

        Args:
            comment_text: The comment text to analyze

        Returns:
            Dictionary with topic classification results
        """
        # Build topic options from config
        topic_options = "\n".join([
            f"- {cat['value']}: {cat['description']}"
            for cat in self.topic_categories["topics"]["categories"]
        ])

        system_prompt = f"""You are an expert at classifying music-related comments into topics.

Available Topics:
{topic_options}

Return your response in JSON format with the following fields:
- topics: array of topic values (multi-label, can have multiple topics)
- confidence: your confidence level (high, medium, low)
- reasoning: brief explanation of your classification
"""

        user_prompt = f"""Classify this comment into relevant topics:

Comment: "{comment_text}"

Return JSON only."""

        try:
            messages = [
                SystemMessage(content=system_prompt),
                HumanMessage(content=user_prompt)
            ]

            response = self.llm.invoke(messages)
            result = self._parse_llm_json_response(response.content)

            topics = result.get("topics", [])
            if isinstance(topics, str):
                topics = [topics]

            topic_str = ", ".join(topics) if topics else None

            return {
                "success": True,
                "topics": topic_str,
                "topic_confidence": result.get("confidence", "medium"),
                "topic_reasoning": result.get("reasoning", "")
            }

        except json.JSONDecodeError as e:
            self.log_processing(f"JSON decode error: {str(e)}", "warning")
            return {
                "success": False,
                "error": str(e)
            }
        except Exception as e:
            self.log_processing(f"Topic classification failed: {str(e)}", "error")
            return {
                "success": False,
                "error": str(e)
            }

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process comment and extract topics.

        Args:
            input_data: Dictionary containing comment data

        Returns:
            Dictionary with topic classification results
        """
        try:
            # Validate input
            if not self.validate_input(input_data):
                return {
                    "success": False,
                    "error": "Invalid input: missing required fields"
                }

            comment_text = input_data["comment_text"]

        self.log_processing("Classifying topics for comment", "debug")

            # Perform classification
            classification_result = self.classify_topics(comment_text)

            result = {
                "success": classification_result.get("success", False),
                "topics": classification_result.get("topics"),
                "topic_confidence": classification_result.get("topic_confidence"),
                "topic_reasoning": classification_result.get("topic_reasoning", "")
            }

            if "error" in classification_result:
                result["topic_error"] = classification_result["error"]

            # Preserve all original data
            for key, value in input_data.items():
                if key not in result:
                    result[key] = value

            return result

        except Exception as e:
            return self.handle_error(e, "topic_classification")

    def _parse_llm_json_response(self, response_content: str) -> Dict[str, Any]:
        """
        Parse LLM response that may contain JSON wrapped in markdown code blocks.

        Args:
            response_content: Raw response content from LLM

        Returns:
            Parsed JSON dictionary

        Raises:
            json.JSONDecodeError: If JSON cannot be parsed
        """
        content = response_content.strip()

        # Check if response is wrapped in markdown code block
        if content.startswith("```json"):
            content = content[7:]
            if content.endswith("```"):
                content = content[:-3]
            content = content.strip()
        elif content.startswith("```"):
            content = content[3:]
            if content.endswith("```"):
                content = content[:-3]
            content = content.strip()

        return json.loads(content)

Step 2: Update __init__.py

Add your agent to agents/__init__.py:

"""
Agents module for the sentiment analysis workflow.
Provides modular, extensible agents for various NLP tasks.
"""

from agents.base_agent import BaseAgent
from agents.language_detection_agent import LanguageDetectionAgent
from agents.translation_agent import TranslationAgent
from agents.sentiment_analysis_agent import SentimentAnalysisAgent
from agents.topic_classification_agent import TopicClassificationAgent  # ADD THIS

__all__ = [
    "BaseAgent",
    "LanguageDetectionAgent",
    "TranslationAgent",
    "SentimentAnalysisAgent",
    "TopicClassificationAgent"  # ADD THIS
]

Step 3: Update Configuration Files

Add agent configuration to config_files/sentiment_config.json:

{
    "agents": {
        "language_detection": { ... },
        "translation": { ... },
        "sentiment_analysis": { ... },
        "topic_classification": {
            "name": "TopicClassificationAgent",
            "model": "gpt-5-nano",
            "temperature": 0.2,
            "max_retries": 3,
            "description": "Classifies comments into topic categories"
        }
    }
}

Create topic categories config (or add to existing sentiment_analysis_config.json):

{
    "topics": {
        "categories": [
            {
                "value": "technique",
                "label": "Technique",
                "description": "Playing technique, finger positioning, hand coordination"
            },
            {
                "value": "theory",
                "label": "Music Theory",
                "description": "Scales, chords, harmony, composition theory"
            },
            {
                "value": "equipment",
                "label": "Equipment",
                "description": "Instruments, gear, accessories, software"
            },
            {
                "value": "performance",
                "label": "Performance",
                "description": "Stage presence, live playing, performance anxiety"
            },
            {
                "value": "practice",
                "label": "Practice",
                "description": "Practice routines, discipline, improvement tips"
            }
        ]
    }
}

Step 4: Update Workflow State

Add fields to CommentState in workflow/comment_processor.py:

class CommentState(TypedDict):
    # ... existing fields ...

    # Topic classification fields
    topics: str
    topic_confidence: str
    topic_reasoning: str

Step 5: Add Workflow Node

Add the node method to CommentProcessingWorkflow class:

def _topic_classification_node(self, state: CommentState) -> CommentState:
    """
    Node for topic classification.

    Args:
        state: Current workflow state

    Returns:
        Updated state with topic classification results
    """
    try:
        # Prepare input
        input_data = {
            "comment_text": state.get("translated_text", state["comment_text"])
        }

        # Process with topic classification agent
        result = self.topic_agent.process(input_data)

        # Update state
        if result.get("success", False):
            state["topics"] = result.get("topics")
            state["topic_confidence"] = result.get("topic_confidence")
            state["topic_reasoning"] = result.get("topic_reasoning", "")
        else:
            error_msg = f"Topic classification failed: {result.get('error', 'Unknown error')}"
            state["processing_errors"] = state.get("processing_errors", []) + [error_msg]
            state["topics"] = None
            state["topic_confidence"] = None
            state["topic_reasoning"] = "Topic classification failed"

        logger.debug(f"Topics: {state['topics']}")
        return state

    except Exception as e:
        error_msg = f"Topic classification node error: {str(e)}"
        logger.error(error_msg)
        state["processing_errors"] = state.get("processing_errors", []) + [error_msg]
        state["topics"] = None
        state["topic_confidence"] = None
        state["topic_reasoning"] = "Error during topic classification"
        return state

Step 6: Initialize Agent in Workflow

Update __init__ method:

def __init__(self, config: Dict[str, Any], api_key: str):
    # ... existing initialization ...

    # Load topic categories
    topic_categories_path = config.get("topic_categories_config", "config_files/topic_categories.json")
    with open(topic_categories_path, 'r') as f:
        topic_categories = json.load(f)

    # Initialize topic agent
    topic_config = config["agents"]["topic_classification"]
    self.topic_agent = TopicClassificationAgent(topic_config, api_key, topic_categories)

Step 7: Update Workflow Graph

Modify _build_workflow():

def _build_workflow(self) -> StateGraph:
    workflow = StateGraph(CommentState)

    # Add nodes
    workflow.add_node("language_detection", self._language_detection_node)
    workflow.add_node("translation", self._translation_node)
    workflow.add_node("sentiment_analysis", self._sentiment_analysis_node)
    workflow.add_node("topic_classification", self._topic_classification_node)  # ADD THIS

    # Define edges
    workflow.set_entry_point("language_detection")
    workflow.add_conditional_edges(
        "language_detection",
        self._should_translate,
        {"translate": "translation", "skip_translation": "sentiment_analysis"}
    )
    workflow.add_edge("translation", "sentiment_analysis")
    workflow.add_edge("sentiment_analysis", "topic_classification")  # ADD THIS
    workflow.add_edge("topic_classification", END)  # MODIFY THIS

    return workflow.compile()

Step 8: Update Database Schema

Add columns to your Snowflake table:

ALTER TABLE COMMENT_SENTIMENT_FEATURES
ADD COLUMN TOPICS VARCHAR(500),
ADD COLUMN TOPIC_CONFIDENCE VARCHAR(20),
ADD COLUMN TOPIC_REASONING VARCHAR(1000);

Step 9: Test Your Agent

Test with a small batch first:

python main.py --limit 10 --sequential

Check logs for any errors and verify output in Snowflake.

Quick Checklist for Adding New Agents

  • Create agent class inheriting from BaseAgent
  • Implement validate_input() method
  • Implement process() method
  • Implement _parse_llm_json_response() if using LLM
  • Add agent to agents/__init__.py
  • Add configuration to sentiment_config.json
  • Create/update category config file if needed
  • Add fields to CommentState TypedDict
  • Create node method in CommentProcessingWorkflow
  • Initialize agent in __init__
  • Add node to workflow graph
  • Update edges in workflow
  • Update database schema
  • Test with small batch

Modifying Existing Agents

Common Modifications

1. Change LLM Model

Update config_files/sentiment_config.json:

{
    "agents": {
        "sentiment_analysis": {
            "model": "gpt-4o",  // Change from gpt-5-nano
            "temperature": 0.2,
            "max_retries": 3
        }
    }
}

No code changes needed! Configuration is loaded dynamically.

2. Add New Sentiment Category

Update config_files/sentiment_analysis_config.json:

{
    "sentiment_polarity": {
        "categories": [
            // ... existing categories ...
            {
                "value": "mixed",
                "label": "Mixed",
                "description": "Contains both positive and negative elements"
            }
        ]
    }
}

The agent will automatically include this in prompts. No code changes needed.

3. Add New Intent Category

Update config_files/sentiment_analysis_config.json:

{
    "intent": {
        "categories": [
            // ... existing categories ...
            {
                "value": "collaboration",
                "label": "Collaboration",
                "description": "Seeking or offering collaboration opportunities"
            }
        ]
    }
}

4. Modify Reply Policy

Update config_files/sentiment_analysis_config.json:

{
    "reply_policy": {
        "requires_reply_intents": ["question", "request", "feedback_negative"],  // Added feedback_negative
        "not_include": ["humor_sarcasm", "spam_selfpromo"]  // Added spam_selfpromo
    }
}
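For intuition, the policy plausibly resolves to logic like the following (a sketch under assumptions; the actual check lives inside the sentiment agent and may differ in detail):

```python
def requires_reply(intents, reply_policy):
    """Decide whether a comment needs a reply from its intent labels."""
    intents = set(intents)
    # Intents on the exclusion list veto a reply (e.g. sarcastic questions).
    if intents & set(reply_policy["not_include"]):
        return False
    return bool(intents & set(reply_policy["requires_reply_intents"]))


policy = {
    "requires_reply_intents": ["question", "request", "feedback_negative"],
    "not_include": ["humor_sarcasm", "spam_selfpromo"],
}
```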

5. Adjust Temperature for Better Results

If getting inconsistent results, adjust temperature:

{
    "agents": {
        "sentiment_analysis": {
            "model": "gpt-5-nano",
            "temperature": 0.1,  // Lower = more consistent, less creative
            "max_retries": 3
        }
    }
}

6. Add Context to Sentiment Analysis

Modify _build_context_string() in sentiment_analysis_agent.py:

def _build_context_string(self, content_description: str, parent_comment_text: str = None,
                          platform: str = None, content_title: str = None,
                          channel_name: str = None) -> str:  # ADD channel_name
    """Build context string for sentiment analysis."""
    context_parts = []

    # ... existing code ...

    # ADD THIS
    if channel_name:
        context_parts.append(f"Channel: {channel_name}")

    return "\n".join(context_parts)

Then update the analyze_sentiment() method to accept and pass channel_name.

7. Improve Language Detection Accuracy

Modify language_detection_agent.py to add more languages to LINGUA_TO_ISO:

LINGUA_TO_ISO = {
    # ... existing mappings ...
    Language.VIETNAMESE: "vi",
    Language.THAI: "th",
    Language.INDONESIAN: "id",
    # Add more as needed
}

8. Customize Translation Prompt

Modify translate_text() in translation_agent.py:

system_prompt = """You are a professional translator specializing in social media content related to music and education.
Translate the given text from the source language to English. The text is a comment on a musical content.
Preserve the tone, intent, and any emojis or special characters.
For informal social media language, maintain the casual tone in translation.

// ADD THESE GUIDELINES:
Special Instructions:
- Preserve musical terminology (e.g., "legato", "staccato") untranslated
- Translate instrument names (e.g., "guitarra" β†’ "guitar")
- Keep artist names and brand names in original language
- Maintain slang and colloquialisms when possible

Return your response in JSON format with the following fields:
- translated_text: The English translation
- translation_confidence: Your confidence level (high, medium, low)
- notes: Any important notes about the translation (optional)
"""

9. Add Retry Logic for Failed Analyses

Modify process() in sentiment_analysis_agent.py:

def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        # ... existing validation code ...

        # ADD RETRY LOGIC
        max_attempts = self.max_retries
        for attempt in range(max_attempts):
            analysis_result = self.analyze_sentiment(
                comment_text, content_description,
                parent_comment_text, platform, content_title
            )

            if analysis_result.get("success"):
                break

            if attempt < max_attempts - 1:
                self.log_processing(f"Attempt {attempt + 1} failed, retrying...", "warning")

        # ... rest of existing code ...

10. Add Custom Validation Rules

Modify validate_input() in any agent:

def validate_input(self, input_data: Dict[str, Any]) -> bool:
    """Validate that input contains required fields."""
    required_fields = ["comment_text", "content_description"]

    # Check required fields exist
    if not all(field in input_data for field in required_fields):
        return False

    # ADD CUSTOM VALIDATION
    # Ensure comment_text is not empty or too short
    comment_text = input_data.get("comment_text", "")
    if not comment_text or len(comment_text.strip()) < 2:
        self.log_processing("Comment text too short or empty", "warning")
        return False

    # Ensure content_description exists
    content_desc = input_data.get("content_description", "")
    if not content_desc or content_desc.strip() == "":
        self.log_processing("Content description missing", "warning")
        return False

    return True

Testing Modified Agents

After making modifications, always test:

# Test with a small batch
python main.py --limit 10 --sequential

# Check specific data source
python main.py --limit 10 --sequential --data-source social_media

# Review logs for errors
tail -f logs/comment_processing_*.log

Configuration System

Configuration Files Overview

config_files/
β”œβ”€β”€ sentiment_config.json              # Agent behavior config
β”œβ”€β”€ sentiment_analysis_config.json     # Sentiment categories and intents
└── data_sources_config.json           # Data source configuration

Agent Configuration Structure

File: sentiment_config.json

{
    "agents": {
        "agent_name": {
            "name": "AgentClassName",
            "model": "gpt-5-nano",        // LLM model to use
            "temperature": 0.0,            // Creativity (0.0 = deterministic, 1.0 = creative)
            "max_retries": 3,              // Max retry attempts
            "description": "What this agent does"
        }
    },
    "workflow": {
        "parallel_processing": {
            "enabled": true,
            "worker_calculation": "CPU count - 2, max 5 workers",
            "min_batch_size": 20,
            "max_batch_size": 1000
        }
    }
}

Note: the // comments in these JSON examples are for illustration only; JSON itself does not support comments, so omit them in the actual file.

Temperature Guidelines

  • 0.0 - 0.1: Deterministic, consistent (good for classification)
  • 0.2 - 0.4: Slight variation, mostly consistent (good for sentiment analysis)
  • 0.5 - 0.7: Balanced creativity and consistency (good for translation)
  • 0.8 - 1.0: Creative, varied (good for content generation)

Model Selection Guidelines

  • gpt-5-nano: Fast, cheap, good for simple tasks
  • gpt-4o-mini: Balanced speed/quality, good for most tasks
  • gpt-4o: High quality, slower, good for complex analysis

Category Configuration Structure

File: sentiment_analysis_config.json

{
    "category_type": {
        "categories": [
            {
                "value": "machine_readable_value",  // Used in code/DB
                "label": "Human Readable Label",    // Used in UI
                "description": "Detailed description for LLM prompt"
            }
        ]
    }
}
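
The description field is what the LLM ultimately sees, so the categories list is typically rendered into the prompt text. A minimal sketch of that rendering (the function name and bullet format are hypothetical, but the value/label/description keys match the structure above):

```python
from typing import Any, Dict, List


def format_categories_for_prompt(categories: List[Dict[str, Any]]) -> str:
    """Render category config entries as bullet lines for an LLM prompt.

    Each entry is expected to carry "value", "label", and "description",
    matching the structure in sentiment_analysis_config.json.
    """
    lines = []
    for cat in categories:
        # Machine-readable value first, so the LLM echoes it back verbatim
        lines.append(f'- "{cat["value"]}" ({cat["label"]}): {cat["description"]}')
    return "\n".join(lines)
```

Keeping this rendering in one helper means adding a category to the JSON file automatically updates every prompt that uses it.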

Loading Configuration in Code

# In workflow/__init__ or agent __init__
import json

# Load agent config
with open('config_files/sentiment_config.json', 'r') as f:
    config = json.load(f)

agent_config = config["agents"]["agent_name"]

# Load category config
with open('config_files/sentiment_analysis_config.json', 'r') as f:
    categories = json.load(f)

sentiment_categories = categories["sentiment_polarity"]["categories"]
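
Since several modules read the same files, the loads can be wrapped in a cached helper. A minimal sketch (the load_config name is illustrative; note that lru_cache returns the same dict object on every call, so treat the result as read-only):

```python
import json
from functools import lru_cache
from typing import Any, Dict


@lru_cache(maxsize=None)
def load_config(path: str) -> Dict[str, Any]:
    """Load a JSON config file once; repeated calls return the cached dict.

    Callers must not mutate the returned dict, since it is shared.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
```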

Best Practices

Agent Development

  1. Single Responsibility: Each agent should do one thing well
  2. Fail Gracefully: Always return structured error responses
  3. Preserve Data: Never lose original input data - pass it through
  4. Log Everything: Use log_processing() for debugging
  5. Validate Early: Check inputs before processing
  6. Configuration Over Code: Use config files for behavior changes
  7. Test Incrementally: Test with --limit 10 --sequential first

Prompt Engineering

  1. Be Specific: Clearly define expected output format
  2. Use Examples: Include few-shot examples in prompts
  3. Request JSON: Always request JSON format for structured data
  4. Handle Edge Cases: Document edge cases in prompts
  5. Provide Context: Give LLM all relevant context
  6. Set Constraints: Clearly define boundaries and limitations

Example of good prompt structure:

system_prompt = """You are an expert at [TASK].

Your task is to:
1. [Step 1]
2. [Step 2]
3. [Step 3]

Context: [Explain the context]

Rules:
- Rule 1
- Rule 2
- Rule 3

Examples:
- Input: "..." β†’ Output: {...}
- Input: "..." β†’ Output: {...}

Return your response in JSON format with the following fields:
- field1: description
- field2: description
"""

Error Handling

  1. Try-Except Everything: Wrap all processing in a try-except block
  2. Specific Error Messages: Make errors actionable
  3. Graceful Degradation: Continue workflow even if one agent fails
  4. Error Accumulation: Collect errors in processing_errors list
  5. Critical vs Non-Critical: Distinguish between recoverable and fatal errors

Example:

def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        # Validate
        if not self.validate_input(input_data):
            return {
                **input_data,  # Spread input first so the success/error
                               # keys set below cannot be overwritten by
                               # keys carried in from an earlier agent
                "success": False,
                "error": "Invalid input: missing required fields",
            }

        # Process
        result = self.do_processing(input_data)

        # Check result
        if not result.get("success"):
            return {
                **input_data,
                "success": False,
                "error": result.get("error", "Unknown error"),
            }

        # Return success
        return {
            **input_data,
            "success": True,
            "output_field": result["output"],
        }

    except Exception as e:
        return self.handle_error(e, "process")

Testing

  1. Unit Test Agents: Test agents independently before integration
  2. Small Batches: Always test with --limit 10 first
  3. Sequential Mode: Use --sequential for debugging
  4. Check Logs: Review logs after every test run
  5. Validate Output: Check Snowflake results
  6. Test Edge Cases: Empty text, emojis only, very long text, special characters

Test script example:

# test_agent.py
from agents.sentiment_analysis_agent import SentimentAnalysisAgent
import json

# Load config
with open('config_files/sentiment_config.json', 'r') as f:
    config = json.load(f)
with open('config_files/sentiment_analysis_config.json', 'r') as f:
    categories = json.load(f)

# Initialize agent
agent = SentimentAnalysisAgent(
    config["agents"]["sentiment_analysis"],
    "your-api-key",
    categories
)

# Test cases
test_cases = [
    {"comment_text": "This is amazing!", "content_description": "Guitar tutorial"},
    {"comment_text": "😊😊😊", "content_description": "Piano cover"},
    {"comment_text": "What scale is this?", "content_description": "Blues solo"},
]

for test in test_cases:
    result = agent.process(test)
    print(f"Input: {test['comment_text']}")
    print(f"Result: {result}")
    print("---")

Performance Optimization

  1. Batch Processing: Process comments in batches (handled by workflow)
  2. Parallel Workers: Use multiprocessing for large batches
  3. Minimize LLM Calls: Cache results when possible
  4. Optimize Prompts: Shorter prompts = faster responses
  5. Choose Right Model: Use gpt-5-nano for simple tasks
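
Point 3 (caching) can be sketched as a small memoizing wrapper keyed on normalized comment text; the class below is illustrative, not part of the project:

```python
import hashlib
from typing import Any, Callable, Dict


class CachedAnalyzer:
    """Memoize analysis results by comment text so identical comments
    trigger only one underlying LLM call."""

    def __init__(self, analyze: Callable[[str], Dict[str, Any]]):
        self.analyze = analyze
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.calls = 0  # number of underlying analyze() invocations

    def __call__(self, comment_text: str) -> Dict[str, Any]:
        # Normalize before hashing so "Great!" and "great!" share a key
        key = hashlib.sha256(
            comment_text.strip().lower().encode("utf-8")
        ).hexdigest()
        if key not in self.cache:
            self.calls += 1
            self.cache[key] = self.analyze(comment_text)
        return self.cache[key]
```

Whether case-insensitive normalization is appropriate depends on the task; for sentiment, near-duplicate comments are common enough on social media that even exact-match caching saves calls.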

Code Organization

  1. One Agent Per File: Don't combine multiple agents
  2. Helper Methods: Use private methods (_method_name) for internal logic
  3. Type Hints: Always use type hints for parameters and returns
  4. Docstrings: Document all public methods
  5. Constants: Define constants at class level

Example structure:

class MyAgent(BaseAgent):
    # Constants
    DEFAULT_VALUE = "default"
    MAX_LENGTH = 1000

    def __init__(self, config, api_key):
        """Initialize agent."""
        super().__init__("MyAgent", config)
        # ... initialization

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """Validate input data."""
        # ... validation

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Main processing method."""
        # ... processing

    def _helper_method(self, data: str) -> str:
        """Private helper method."""
        # ... helper logic

    def _parse_llm_json_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        # ... parsing

Troubleshooting

Common Issues

Issue 1: Agent Returns Empty Results

Symptoms: Agent succeeds but returns None or empty strings for key fields

Causes:

  • LLM not following JSON format
  • JSON parsing failing silently
  • Missing fields in LLM response

Solutions:

  1. Check logs for JSON parsing warnings
  2. Add validation after LLM call:
    result = self._parse_llm_json_response(response.content)
    
    # Validate result
    if not result.get("sentiment_polarity"):
        return {
            "success": False,
            "error": "Missing sentiment_polarity in LLM response"
        }
    
  3. Improve prompt to be more specific about required fields
  4. Add examples to prompt showing exact JSON structure

Issue 2: JSON Parsing Errors

Symptoms: JSON decode error in logs

Causes:

  • LLM returns markdown-wrapped JSON
  • LLM includes explanatory text before/after JSON
  • Malformed JSON from LLM

Solutions:

  1. Use _parse_llm_json_response() helper (already handles markdown)
  2. Add more explicit prompt:
    user_prompt = """...
    
    Return ONLY valid JSON, no explanation or markdown. Just the raw JSON object.
    """
    
  3. Add fallback parsing:
    try:
        result = json.loads(content)
    except json.JSONDecodeError:
        # Try to extract JSON from text
        import re
        json_match = re.search(r'\{.*\}', content, re.DOTALL)
        if json_match:
            result = json.loads(json_match.group())
        else:
            raise
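
The _parse_llm_json_response() helper is referenced throughout this README but not shown. A minimal sketch of what such a parser might look like, combining the fence-stripping and fallback ideas above (it assumes markdown fences and a single top-level JSON object; the real helper may differ):

```python
import json
import re
from typing import Any, Dict


def parse_llm_json_response(content: str) -> Dict[str, Any]:
    """Best-effort extraction of a JSON object from an LLM reply.

    Handles three common shapes: raw JSON, JSON inside ```json fences,
    and JSON embedded in explanatory text.
    """
    text = content.strip()
    # Strip markdown code fences such as ```json ... ```
    fence = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if fence:
        text = fence.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the first {...} span in the text
        match = re.search(r"\{.*\}", text, re.DOTALL)
        if match:
            return json.loads(match.group())
        raise
```

If every strategy fails, the JSONDecodeError propagates so the agent's error handling can record it rather than silently returning an empty result.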
    

Issue 3: Inconsistent Results

Symptoms: Same comment gets different classifications on reruns

Causes:

  • Temperature too high
  • Prompt too vague
  • Model inconsistency

Solutions:

  1. Lower temperature to 0.0 - 0.2 for classification tasks
  2. Make prompt more specific and rule-based
  3. Add examples to prompt
  4. Try a different model; some models classify more consistently than others (compare gpt-5-nano and gpt-4o on your own data)

Issue 4: Agent Too Slow

Symptoms: Processing takes very long

Causes:

  • Large LLM model
  • Complex prompts
  • Sequential processing
  • API rate limits

Solutions:

  1. Use faster model (gpt-5-nano instead of gpt-4o)
  2. Simplify prompt (shorter = faster)
  3. Enable parallel processing (already default)
  4. Increase batch size (if not hitting rate limits)
  5. Consider caching repeated analyses

Issue 5: Agent Failing Validation

Symptoms: validate_input() returns False, agent skips processing

Causes:

  • Missing required fields in input
  • Empty or None values
  • Wrong data types

Solutions:

  1. Check workflow node - ensure all required fields passed:
    input_data = {
        "comment_text": state.get("translated_text", state["comment_text"]),
        "content_description": state["content_description"],
        # Add all required fields
    }
    
  2. Add logging to validation:
    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        for field in required_fields:
            if field not in input_data:
                self.log_processing(f"Missing field: {field}", "error")
                return False
        return True
    

Issue 6: Workflow Not Running New Agent

Symptoms: New agent not being called, no logs from new agent

Causes:

  • Forgot to add node to workflow graph
  • Forgot to initialize agent
  • Workflow edges not connected

Solutions:

  1. Verify agent initialized in __init__:
    self.new_agent = NewAgent(config, api_key)
    
  2. Verify node added:
    workflow.add_node("new_agent", self._new_agent_node)
    
  3. Verify edges:
    workflow.add_edge("previous_agent", "new_agent")
    workflow.add_edge("new_agent", END)
    
  4. Check for exceptions in workflow compilation

Issue 7: Database Insert Fails

Symptoms: Processing succeeds but data not in Snowflake

Causes:

  • Missing columns in database
  • Data type mismatch
  • Field name mismatch

Solutions:

  1. Check column exists:
    DESC TABLE COMMENT_SENTIMENT_FEATURES;
    
  2. Add column if missing:
    ALTER TABLE COMMENT_SENTIMENT_FEATURES
    ADD COLUMN NEW_FIELD VARCHAR(500);
    
  3. Check field names match exactly (case-sensitive)
  4. Check main.py result_df construction includes new fields

Debugging Tips

  1. Enable Debug Logging: Set log level to DEBUG in main.py
  2. Print State: Add print statements in workflow nodes to see state
  3. Test Agent Directly: Test agent outside workflow first
  4. Use Sequential Mode: --sequential flag for clearer debugging
  5. Check API Logs: Review OpenAI API dashboard for errors
  6. Validate JSON: Use online JSON validator for config files
  7. Check Git Status: Ensure all files saved and changes committed

Getting Help

  1. Check Logs: Always check logs/ directory first
  2. Review This README: Answers to most questions are here
  3. Test Incrementally: Isolate the problem to one agent
  4. Use Small Batches: Test with --limit 5 for faster iteration
  5. Document Issues: Keep notes on what you tried

Conclusion

This agent architecture provides a flexible, maintainable foundation for processing social media comments. Key takeaways:

  • Base class pattern ensures consistency
  • LangGraph workflow enables flexible orchestration
  • Configuration-driven design minimizes code changes
  • Error resilience at every level
  • Extensible by design - easy to add new agents

For questions or issues, refer to the main project README or review the existing agent implementations for patterns and examples.


Last Updated: 2026-01-15
Version: 1.0
Maintainer: Musora Development Team