Agents Architecture Documentation
Table of Contents
- Overview
- Agent Architecture
- Existing Agents
- How Agents Work
- Adding New Agents
- Modifying Existing Agents
- Configuration System
- Best Practices
- Troubleshooting
Overview
The agent system in this project is built on a modular, extensible architecture that processes social media comments through a series of specialized agents. Each agent performs a specific task (language detection, translation, sentiment analysis) and is orchestrated through a LangGraph workflow.
Key Design Principles
- Modularity: Each agent handles a single responsibility
- Extensibility: Easy to add new agents without modifying existing code
- Consistency: All agents inherit from a common base class
- Configuration-Driven: Agent behavior controlled through JSON config files
- Error Resilience: Robust error handling at every level
Technology Stack
- LangChain: For LLM interactions and agent framework
- LangGraph: For workflow orchestration
- OpenAI API: LLM backend for NLP tasks
- Lingua: Fast language detection library
- Python 3.x: Core language
Agent Architecture
Directory Structure
agents/
├── __init__.py                    # Module exports
├── base_agent.py                  # Abstract base class
├── language_detection_agent.py    # Language detection agent
├── translation_agent.py           # Translation agent
├── sentiment_analysis_agent.py    # Sentiment analysis agent
└── README.md                      # This file
Base Agent Class
All agents inherit from BaseAgent (base_agent.py), which provides:
class BaseAgent(ABC):
    """Abstract base class for all agents"""

    # Common attributes
    name: str                  # Agent name
    config: Dict[str, Any]     # Configuration dictionary
    model: str                 # LLM model to use
    temperature: float         # LLM temperature
    max_retries: int           # Maximum retry attempts

    # Abstract methods (must be implemented by subclasses)
    @abstractmethod
    def process(self, input_data: Dict) -> Dict: ...

    @abstractmethod
    def validate_input(self, input_data: Dict) -> bool: ...

    # Common methods (inherited)
    def get_name(self) -> str: ...
    def get_config(self) -> Dict: ...
    def log_processing(self, message: str, level: str) -> None: ...
    def handle_error(self, error: Exception, context: str) -> Dict: ...
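A minimal runnable version of this interface can serve as a template for new agents. This is an illustrative sketch, not the project's actual `base_agent.py`; default values and logging details are assumptions:

```python
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

logger = logging.getLogger(__name__)


class BaseAgent(ABC):
    """Abstract base class for all agents (illustrative sketch)."""

    def __init__(self, name: str, config: Dict[str, Any]):
        self.name = name
        self.config = config
        # Defaults here are assumptions; the real class may read other keys.
        self.model = config.get("model", "gpt-5-nano")
        self.temperature = config.get("temperature", 0.0)
        self.max_retries = config.get("max_retries", 3)

    @abstractmethod
    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent on input_data and return a result dict."""

    @abstractmethod
    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """Return True if input_data has everything this agent needs."""

    def get_name(self) -> str:
        return self.name

    def get_config(self) -> Dict[str, Any]:
        return self.config

    def log_processing(self, message: str, level: str = "info") -> None:
        # Route to the matching logger method; fall back to info.
        getattr(logger, level, logger.info)(f"[{self.name}] {message}")

    def handle_error(self, error: Exception, context: str) -> Dict[str, Any]:
        # Structured failure result, so callers never need to catch here.
        self.log_processing(f"{context}: {error}", "error")
        return {"success": False, "error": str(error), "context": context}
```

A concrete agent then only implements `process()` and `validate_input()` and inherits the rest.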
Workflow Integration
Agents are orchestrated through workflow/comment_processor.py using LangGraph:
┌─────────────────────┐
│ Language Detection  │
│        Agent        │
└──────────┬──────────┘
           │
           ▼
      ┌────┴────┐
      │English? │
      └────┬────┘
           │
     ┌─────┴─────┐
     │           │
    Yes          No
     │           │
     │           ▼
     │   ┌─────────────┐
     │   │ Translation │
     │   │    Agent    │
     │   └──────┬──────┘
     │          │
     └────┬─────┘
          │
          ▼
 ┌──────────────────┐
 │    Sentiment     │
 │  Analysis Agent  │
 └──────────────────┘
Existing Agents
1. Language Detection Agent
File: language_detection_agent.py
Purpose: Detects the language of comment text using a hybrid approach.
Strategy:
- Uses Lingua library for fast English detection
- Falls back to LLM for non-English languages (higher accuracy)
- Returns language name, ISO code, and confidence level
Key Methods:
def detect_with_lingua(text: str) -> tuple[str, str, bool]
# Fast detection using lingua library
# Returns: (language_code, language_name, is_english)
def detect_with_llm(text: str) -> Dict[str, Any]
# LLM-based detection for nuanced analysis
# Returns: {language, language_code, confidence, has_text}
def process(input_data: Dict) -> Dict
# Main processing: lingua first, LLM if not English
Configuration (sentiment_config.json):
{
"language_detection": {
"model": "gpt-5-nano",
"temperature": 0.0,
"max_retries": 3
}
}
Input Requirements:
- comment_text: str
Output:
- language: str (e.g., "English", "Spanish")
- language_code: str (ISO 639-1, e.g., "en", "es")
- is_english: bool
- confidence: str ("high", "medium", "low")
- detection_method: str ("lingua", "llm", "default")
- has_text: bool
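The lingua-first, LLM-fallback strategy can be sketched as a small dispatch function. The two detector callables are stubs standing in for the real Lingua and OpenAI calls, so this is a sketch of the control flow only, not the agent's actual code:

```python
from typing import Any, Callable, Dict, Tuple


def detect_language(
    text: str,
    lingua_detect: Callable[[str], Tuple[str, str, bool]],
    llm_detect: Callable[[str], Dict[str, Any]],
) -> Dict[str, Any]:
    """Hybrid dispatch: fast lingua check first, LLM only for non-English."""
    if not text or not text.strip():
        # No detectable text: fall back to English defaults.
        return {"language": "English", "language_code": "en", "is_english": True,
                "confidence": "low", "detection_method": "default", "has_text": False}

    code, name, is_english = lingua_detect(text)
    if is_english:
        # Lingua is cheap and reliable for English; stop here.
        return {"language": name, "language_code": code, "is_english": True,
                "confidence": "high", "detection_method": "lingua", "has_text": True}

    # Non-English: pay for an LLM call to get a more nuanced answer.
    llm_result = llm_detect(text)
    return {"language": llm_result["language"],
            "language_code": llm_result["language_code"],
            "is_english": False,
            "confidence": llm_result.get("confidence", "medium"),
            "detection_method": "llm", "has_text": True}
```

The three `detection_method` values in the output spec above correspond to the three return paths.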
2. Translation Agent
File: translation_agent.py
Purpose: Translates non-English comments to English using LLM.
Strategy:
- Skips translation if already English
- Uses LLM for context-aware, high-quality translation
- Preserves tone, intent, emojis, and special characters
- Specialized for music/education social media content
Key Methods:
def translate_text(text: str, source_language: str) -> Dict
# LLM-based translation with context preservation
# Returns: {translated_text, translation_confidence, notes}
def process(input_data: Dict) -> Dict
# Main processing: checks is_english, translates if needed
Configuration:
{
"translation": {
"model": "gpt-5-nano",
"temperature": 0.3,
"max_retries": 3
}
}
Input Requirements:
- comment_text: str
- is_english: bool
- language: str (optional, for context)
Output:
- translated_text: str
- translation_performed: bool
- translation_confidence: str
- translation_notes: str
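The skip-if-English behavior amounts to a small guard before the LLM call. A sketch with the actual translation call stubbed out (the helper name and exact fields are illustrative, not the agent's real signature):

```python
from typing import Any, Callable, Dict


def translate_if_needed(
    input_data: Dict[str, Any],
    translate: Callable[[str, str], Dict[str, Any]],
) -> Dict[str, Any]:
    """Pass English text through unchanged; translate everything else."""
    text = input_data["comment_text"]
    if input_data.get("is_english", True):
        # Already English: no LLM call, the original text is the "translation".
        return {"translated_text": text, "translation_performed": False,
                "translation_confidence": "high", "translation_notes": ""}

    result = translate(text, input_data.get("language", "unknown"))
    return {"translated_text": result["translated_text"],
            "translation_performed": True,
            "translation_confidence": result.get("translation_confidence", "medium"),
            "translation_notes": result.get("notes", "")}
```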
3. Sentiment Analysis Agent
File: sentiment_analysis_agent.py
Purpose: Analyzes sentiment polarity, intent, and determines if reply is needed.
Strategy:
- Uses content description for context
- Supports parent comment context for reply analysis
- Multi-label intent classification
- Differentiates genuine vs rhetorical/sarcastic questions
- Platform-aware analysis (YouTube, Facebook, Instagram)
Key Features:
- Context-Aware: Uses content description and parent comment
- Multi-Label: Can assign multiple intents to a single comment
- Reply Policy: Flags comments requiring responses
- Rhetorical Detection: Identifies sarcastic/rhetorical questions
Key Methods:
def _build_context_string(
content_description: str,
parent_comment_text: str = None,
platform: str = None,
content_title: str = None
) -> str
# Builds context for LLM prompt
# Handles YouTube title+description vs other platforms
def analyze_sentiment(
comment_text: str,
content_description: str,
parent_comment_text: str = None,
platform: str = None,
content_title: str = None
) -> Dict
# Performs sentiment analysis with full context
# Returns: {sentiment_polarity, intent, requires_reply, confidence, analysis_notes}
def process(input_data: Dict) -> Dict
# Main processing: validates input, analyzes sentiment
Configuration: Uses two config files:
- Agent Config (sentiment_config.json):
{
"sentiment_analysis": {
"model": "gpt-5-nano",
"temperature": 0.2,
"max_retries": 3
}
}
- Categories Config (sentiment_analysis_config.json):
{
"sentiment_polarity": {
"categories": [
{"value": "very_positive", "label": "Very Positive", "description": "..."},
{"value": "positive", "label": "Positive", "description": "..."},
{"value": "neutral", "label": "Neutral", "description": "..."},
{"value": "negative", "label": "Negative", "description": "..."},
{"value": "very_negative", "label": "Very Negative", "description": "..."}
]
},
"intent": {
"categories": [
{"value": "praise", "description": "..."},
{"value": "question", "description": "..."},
{"value": "request", "description": "..."},
{"value": "feedback_negative", "description": "..."},
{"value": "suggestion", "description": "..."},
{"value": "humor_sarcasm", "description": "..."},
{"value": "off_topic", "description": "..."},
{"value": "spam_selfpromo", "description": "..."}
]
},
"reply_policy": {
"requires_reply_intents": ["question", "request"],
"not_include": ["humor_sarcasm"]
}
}
Input Requirements:
- comment_text: str
- content_description: str
- parent_comment_text: str (optional)
- platform: str (optional, e.g., "youtube", "facebook")
- content_title: str (optional, mainly for YouTube)
Output:
- sentiment_polarity: str (one of: very_positive, positive, neutral, negative, very_negative)
- intent: str (comma-separated list, e.g., "praise, question")
- requires_reply: bool
- sentiment_confidence: str ("high", "medium", "low")
- analysis_notes: str (1-2 sentence summary)
- success: bool (False if critical fields missing)
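The `reply_policy` section of the categories config drives `requires_reply`. A sketch of how the flag could be derived from the detected intents, assuming that any `not_include` intent (e.g. humor_sarcasm) vetoes the reply so rhetorical questions are not flagged; the real agent's logic may differ:

```python
from typing import Any, Dict, List


def requires_reply(intents: List[str], policy: Dict[str, Any]) -> bool:
    """Apply the reply policy to a comment's intent labels.

    Assumed semantics: a reply is needed when any intent is in
    requires_reply_intents, unless an intent in not_include is also
    present (a sarcastic "question" should not trigger a reply).
    """
    if any(i in policy.get("not_include", []) for i in intents):
        return False
    return any(i in policy.get("requires_reply_intents", []) for i in intents)
```

With the example policy above, `["praise", "question"]` requires a reply but `["question", "humor_sarcasm"]` does not.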
Common Patterns Across All Agents
- JSON Response Parsing: All agents have a _parse_llm_json_response() method to handle markdown-wrapped JSON
- Error Handling: All use handle_error() from the base class
- Logging: All use log_processing() for consistent logging
- Validation: All implement validate_input() before processing
- State Preservation: All preserve original input data in output
How Agents Work
Workflow Execution Flow
1. Initialization (CommentProcessingWorkflow.__init__):

   # Load configurations
   lang_detect_config = config["agents"]["language_detection"]
   translation_config = config["agents"]["translation"]
   sentiment_config = config["agents"]["sentiment_analysis"]

   # Initialize agents
   self.language_agent = LanguageDetectionAgent(lang_detect_config, api_key)
   self.translation_agent = TranslationAgent(translation_config, api_key)
   self.sentiment_agent = SentimentAnalysisAgent(sentiment_config, api_key, sentiment_categories)

   # Build workflow graph
   self.workflow = self._build_workflow()

2. Workflow Graph (_build_workflow()):

   workflow = StateGraph(CommentState)

   # Add nodes (agents)
   workflow.add_node("language_detection", self._language_detection_node)
   workflow.add_node("translation", self._translation_node)
   workflow.add_node("sentiment_analysis", self._sentiment_analysis_node)

   # Define edges (control flow)
   workflow.set_entry_point("language_detection")
   workflow.add_conditional_edges(
       "language_detection",
       self._should_translate,
       {"translate": "translation", "skip_translation": "sentiment_analysis"}
   )
   workflow.add_edge("translation", "sentiment_analysis")
   workflow.add_edge("sentiment_analysis", END)

   return workflow.compile()

3. Node Execution (example: _language_detection_node):

   def _language_detection_node(self, state: CommentState) -> CommentState:
       try:
           # Prepare input
           input_data = {"comment_text": state["comment_text"]}
           # Process with agent
           result = self.language_agent.process(input_data)
           # Update state
           if result.get("success", False):
               state["language"] = result.get("language", "English")
               state["language_code"] = result.get("language_code", "en")
               state["is_english"] = result.get("is_english", True)
               # ... more fields
           else:
               # Handle error, set defaults
               state["processing_errors"].append(result.get("error"))
           return state
       except Exception as e:
           # Error handling
           state["processing_errors"].append(str(e))
           return state

4. Decision Points (example: _should_translate):

   def _should_translate(self, state: CommentState) -> str:
       if state.get("is_english", True) or not state.get("has_text", True):
           # Set defaults for skipped translation
           state["translated_text"] = state["comment_text"]
           state["translation_performed"] = False
           return "skip_translation"
       else:
           return "translate"

5. Comment Processing (process_comment()):

   def process_comment(self, comment_data: Dict) -> Dict:
       # Initialize state
       initial_state = {
           "comment_sk": comment_data.get("comment_sk"),
           "comment_text": comment_data.get("comment_text"),
           # ... all fields
           "processing_errors": [],
           "success": True
       }
       # Run workflow
       final_state = self.workflow.invoke(initial_state)
       # Merge and return
       return dict(final_state)
State Management
The workflow uses a CommentState TypedDict to pass data between agents:
class CommentState(TypedDict):
# Input fields
comment_sk: int
comment_id: str
comment_text: str
# ... more fields
# Processing fields (populated by agents)
language: str
language_code: str
is_english: bool
translated_text: str
sentiment_polarity: str
intent: str
# ... more fields
# Metadata
processing_errors: Annotated[List[str], operator.add]
success: bool
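The `Annotated[List[str], operator.add]` annotation tells LangGraph to merge `processing_errors` updates from different nodes with that reducer instead of overwriting the field. The merge itself is plain list concatenation, as this small demonstration shows:

```python
import operator

# When two state updates both touch processing_errors, LangGraph applies
# the annotated reducer; operator.add on lists concatenates them, so no
# node's errors clobber another's.
errors_from_translation = ["Translation failed: timeout"]
errors_from_sentiment = ["Sentiment analysis failed: bad JSON"]

merged = operator.add(errors_from_translation, errors_from_sentiment)
# merged now holds both error messages, in node-execution order.
```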
Error Handling Strategy
- Agent Level: Each agent returns {"success": False, "error": "..."} on failure
- Node Level: Nodes catch exceptions, set defaults, and append to processing_errors
- Workflow Level: The workflow continues even if an agent fails (graceful degradation)
- Critical Failures: The sentiment agent marks success=False if critical fields are missing (the comment is not stored)
Adding New Agents
Step-by-Step Guide
Step 1: Create the Agent Class
Create a new file in the agents/ directory (e.g., topic_classification_agent.py):
"""
Topic Classification Agent
Extracts topics and themes from comments
"""
from typing import Dict, Any
import json
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from agents.base_agent import BaseAgent
import logging
logger = logging.getLogger(__name__)
class TopicClassificationAgent(BaseAgent):
"""
Agent that classifies comments into predefined topics/themes.
"""
def __init__(self, config: Dict[str, Any], api_key: str, topic_categories: Dict[str, Any]):
"""
Initialize the Topic Classification Agent.
Args:
config: Configuration dictionary
api_key: OpenAI API key
topic_categories: Dictionary with topic categories
"""
super().__init__("TopicClassificationAgent", config)
self.api_key = api_key
self.topic_categories = topic_categories
self.llm = ChatOpenAI(
model=self.model,
temperature=self.temperature,
api_key=self.api_key
)
def validate_input(self, input_data: Dict[str, Any]) -> bool:
"""
Validate that input contains required fields.
Args:
input_data: Input dictionary
Returns:
True if valid, False otherwise
"""
required_fields = ["comment_text"]
return all(field in input_data for field in required_fields)
def classify_topics(self, comment_text: str) -> Dict[str, Any]:
"""
Classify comment into topics using LLM.
Args:
comment_text: The comment text to analyze
Returns:
Dictionary with topic classification results
"""
# Build topic options from config
topic_options = "\n".join([
f"- {cat['value']}: {cat['description']}"
for cat in self.topic_categories["topics"]["categories"]
])
system_prompt = f"""You are an expert at classifying music-related comments into topics.
Available Topics:
{topic_options}
Return your response in JSON format with the following fields:
- topics: array of topic values (multi-label, can have multiple topics)
- confidence: your confidence level (high, medium, low)
- reasoning: brief explanation of your classification
"""
user_prompt = f"""Classify this comment into relevant topics:
Comment: "{comment_text}"
Return JSON only."""
try:
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
response = self.llm.invoke(messages)
result = self._parse_llm_json_response(response.content)
topics = result.get("topics", [])
if isinstance(topics, str):
topics = [topics]
topic_str = ", ".join(topics) if topics else None
return {
"success": True,
"topics": topic_str,
"topic_confidence": result.get("confidence", "medium"),
"topic_reasoning": result.get("reasoning", "")
}
except json.JSONDecodeError as e:
self.log_processing(f"JSON decode error: {str(e)}", "warning")
return {
"success": False,
"error": str(e)
}
except Exception as e:
self.log_processing(f"Topic classification failed: {str(e)}", "error")
return {
"success": False,
"error": str(e)
}
def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Process comment and extract topics.
Args:
input_data: Dictionary containing comment data
Returns:
Dictionary with topic classification results
"""
try:
# Validate input
if not self.validate_input(input_data):
return {
"success": False,
"error": "Invalid input: missing required fields"
}
comment_text = input_data["comment_text"]
self.log_processing("Classifying topics for comment", "debug")
# Perform classification
classification_result = self.classify_topics(comment_text)
result = {
"success": classification_result.get("success", False),
"topics": classification_result.get("topics"),
"topic_confidence": classification_result.get("topic_confidence"),
"topic_reasoning": classification_result.get("topic_reasoning", "")
}
if "error" in classification_result:
result["topic_error"] = classification_result["error"]
# Preserve all original data
for key, value in input_data.items():
if key not in result:
result[key] = value
return result
except Exception as e:
return self.handle_error(e, "topic_classification")
def _parse_llm_json_response(self, response_content: str) -> Dict[str, Any]:
"""
Parse LLM response that may contain JSON wrapped in markdown code blocks.
Args:
response_content: Raw response content from LLM
Returns:
Parsed JSON dictionary
Raises:
json.JSONDecodeError: If JSON cannot be parsed
"""
content = response_content.strip()
# Check if response is wrapped in markdown code block
if content.startswith("```json"):
content = content[7:]
if content.endswith("```"):
content = content[:-3]
content = content.strip()
elif content.startswith("```"):
content = content[3:]
if content.endswith("```"):
content = content[:-3]
content = content.strip()
return json.loads(content)
Step 2: Update __init__.py
Add your agent to agents/__init__.py:
"""
Agents module for the sentiment analysis workflow.
Provides modular, extensible agents for various NLP tasks.
"""
from agents.base_agent import BaseAgent
from agents.language_detection_agent import LanguageDetectionAgent
from agents.translation_agent import TranslationAgent
from agents.sentiment_analysis_agent import SentimentAnalysisAgent
from agents.topic_classification_agent import TopicClassificationAgent # ADD THIS
__all__ = [
"BaseAgent",
"LanguageDetectionAgent",
"TranslationAgent",
"SentimentAnalysisAgent",
"TopicClassificationAgent" # ADD THIS
]
Step 3: Update Configuration Files
Add agent configuration to config_files/sentiment_config.json:
{
"agents": {
"language_detection": { ... },
"translation": { ... },
"sentiment_analysis": { ... },
"topic_classification": {
"name": "TopicClassificationAgent",
"model": "gpt-5-nano",
"temperature": 0.2,
"max_retries": 3,
"description": "Classifies comments into topic categories"
}
}
}
Create topic categories config (or add to existing sentiment_analysis_config.json):
{
"topics": {
"categories": [
{
"value": "technique",
"label": "Technique",
"description": "Playing technique, finger positioning, hand coordination"
},
{
"value": "theory",
"label": "Music Theory",
"description": "Scales, chords, harmony, composition theory"
},
{
"value": "equipment",
"label": "Equipment",
"description": "Instruments, gear, accessories, software"
},
{
"value": "performance",
"label": "Performance",
"description": "Stage presence, live playing, performance anxiety"
},
{
"value": "practice",
"label": "Practice",
"description": "Practice routines, discipline, improvement tips"
}
]
}
}
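The categories file feeds the system prompt directly. A sketch of turning it into the `topic_options` bullet list used in `classify_topics()`, with an inline dict standing in for the JSON file:

```python
from typing import Any, Dict


def build_topic_options(topic_categories: Dict[str, Any]) -> str:
    """Render category values and descriptions as prompt bullet lines."""
    return "\n".join(
        f"- {cat['value']}: {cat['description']}"
        for cat in topic_categories["topics"]["categories"]
    )


# Inline stand-in for config_files/topic_categories.json
categories = {
    "topics": {
        "categories": [
            {"value": "technique", "description": "Playing technique"},
            {"value": "equipment", "description": "Instruments, gear"},
        ]
    }
}
options = build_topic_options(categories)
```

Because the prompt is built from config at runtime, adding a category to the JSON file changes the LLM's options without any code change.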
Step 4: Update Workflow State
Add fields to CommentState in workflow/comment_processor.py:
class CommentState(TypedDict):
# ... existing fields ...
# Topic classification fields
topics: str
topic_confidence: str
topic_reasoning: str
Step 5: Add Workflow Node
Add the node method to CommentProcessingWorkflow class:
def _topic_classification_node(self, state: CommentState) -> CommentState:
"""
Node for topic classification.
Args:
state: Current workflow state
Returns:
Updated state with topic classification results
"""
try:
# Prepare input
input_data = {
"comment_text": state.get("translated_text", state["comment_text"])
}
# Process with topic classification agent
result = self.topic_agent.process(input_data)
# Update state
if result.get("success", False):
state["topics"] = result.get("topics")
state["topic_confidence"] = result.get("topic_confidence")
state["topic_reasoning"] = result.get("topic_reasoning", "")
else:
error_msg = f"Topic classification failed: {result.get('error', 'Unknown error')}"
state["processing_errors"] = state.get("processing_errors", []) + [error_msg]
state["topics"] = None
state["topic_confidence"] = None
state["topic_reasoning"] = "Topic classification failed"
logger.debug(f"Topics: {state['topics']}")
return state
except Exception as e:
error_msg = f"Topic classification node error: {str(e)}"
logger.error(error_msg)
state["processing_errors"] = state.get("processing_errors", []) + [error_msg]
state["topics"] = None
state["topic_confidence"] = None
state["topic_reasoning"] = "Error during topic classification"
return state
Step 6: Initialize Agent in Workflow
Update __init__ method:
def __init__(self, config: Dict[str, Any], api_key: str):
# ... existing initialization ...
# Load topic categories
topic_categories_path = config.get("topic_categories_config", "config_files/topic_categories.json")
with open(topic_categories_path, 'r') as f:
topic_categories = json.load(f)
# Initialize topic agent
topic_config = config["agents"]["topic_classification"]
self.topic_agent = TopicClassificationAgent(topic_config, api_key, topic_categories)
Step 7: Update Workflow Graph
Modify _build_workflow():
def _build_workflow(self) -> StateGraph:
workflow = StateGraph(CommentState)
# Add nodes
workflow.add_node("language_detection", self._language_detection_node)
workflow.add_node("translation", self._translation_node)
workflow.add_node("sentiment_analysis", self._sentiment_analysis_node)
workflow.add_node("topic_classification", self._topic_classification_node) # ADD THIS
# Define edges
workflow.set_entry_point("language_detection")
workflow.add_conditional_edges(
"language_detection",
self._should_translate,
{"translate": "translation", "skip_translation": "sentiment_analysis"}
)
workflow.add_edge("translation", "sentiment_analysis")
workflow.add_edge("sentiment_analysis", "topic_classification") # ADD THIS
workflow.add_edge("topic_classification", END) # MODIFY THIS
return workflow.compile()
Step 8: Update Database Schema
Add columns to your Snowflake table:
ALTER TABLE COMMENT_SENTIMENT_FEATURES
ADD COLUMN TOPICS VARCHAR(500),
ADD COLUMN TOPIC_CONFIDENCE VARCHAR(20),
ADD COLUMN TOPIC_REASONING VARCHAR(1000);
Step 9: Test Your Agent
Test with a small batch first:
python main.py --limit 10 --sequential
Check logs for any errors and verify output in Snowflake.
Quick Checklist for Adding New Agents
- Create agent class inheriting from BaseAgent
- Implement validate_input() method
- Implement process() method
- Implement _parse_llm_json_response() if using LLM
- Add agent to agents/__init__.py
- Add configuration to sentiment_config.json
- Create/update category config file if needed
- Add fields to CommentState TypedDict
- Create node method in CommentProcessingWorkflow
- Initialize agent in __init__
- Add node to workflow graph
- Update edges in workflow
- Update database schema
- Test with small batch
Modifying Existing Agents
Common Modifications
1. Change LLM Model
Update config_files/sentiment_config.json:
{
"agents": {
"sentiment_analysis": {
"model": "gpt-4o", // Change from gpt-5-nano
"temperature": 0.2,
"max_retries": 3
}
}
}
No code changes needed! Configuration is loaded dynamically.
2. Add New Sentiment Category
Update config_files/sentiment_analysis_config.json:
{
"sentiment_polarity": {
"categories": [
// ... existing categories ...
{
"value": "mixed",
"label": "Mixed",
"description": "Contains both positive and negative elements"
}
]
}
}
The agent will automatically include this in prompts. No code changes needed.
3. Add New Intent Category
Update config_files/sentiment_analysis_config.json:
{
"intent": {
"categories": [
// ... existing categories ...
{
"value": "collaboration",
"label": "Collaboration",
"description": "Seeking or offering collaboration opportunities"
}
]
}
}
4. Modify Reply Policy
Update config_files/sentiment_analysis_config.json:
{
"reply_policy": {
"requires_reply_intents": ["question", "request", "feedback_negative"], // Added feedback_negative
"not_include": ["humor_sarcasm", "spam_selfpromo"] // Added spam_selfpromo
}
}
5. Adjust Temperature for Better Results
If getting inconsistent results, adjust temperature:
{
"agents": {
"sentiment_analysis": {
"model": "gpt-5-nano",
"temperature": 0.1, // Lower = more consistent, less creative
"max_retries": 3
}
}
}
6. Add Context to Sentiment Analysis
Modify _build_context_string() in sentiment_analysis_agent.py:
def _build_context_string(self, content_description: str, parent_comment_text: str = None,
platform: str = None, content_title: str = None,
channel_name: str = None) -> str: # ADD channel_name
"""Build context string for sentiment analysis."""
context_parts = []
# ... existing code ...
# ADD THIS
if channel_name:
context_parts.append(f"Channel: {channel_name}")
return "\n".join(context_parts)
Then update the analyze_sentiment() method to accept and pass channel_name.
7. Improve Language Detection Accuracy
Modify language_detection_agent.py to add more languages to LINGUA_TO_ISO:
LINGUA_TO_ISO = {
# ... existing mappings ...
Language.VIETNAMESE: "vi",
Language.THAI: "th",
Language.INDONESIAN: "id",
# Add more as needed
}
8. Customize Translation Prompt
Modify translate_text() in translation_agent.py:
system_prompt = """You are a professional translator specializing in social media content related to music and education.
Translate the given text from the source language to English. The text is a comment on a musical content.
Preserve the tone, intent, and any emojis or special characters.
For informal social media language, maintain the casual tone in translation.
// ADD THESE GUIDELINES:
Special Instructions:
- Preserve musical terminology (e.g., "legato", "staccato") untranslated
- Translate instrument names (e.g., "guitarra" → "guitar")
- Keep artist names and brand names in original language
- Maintain slang and colloquialisms when possible
Return your response in JSON format with the following fields:
- translated_text: The English translation
- translation_confidence: Your confidence level (high, medium, low)
- notes: Any important notes about the translation (optional)
"""
9. Add Retry Logic for Failed Analyses
Modify process() in sentiment_analysis_agent.py:
def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
try:
# ... existing validation code ...
# ADD RETRY LOGIC
max_attempts = self.max_retries
for attempt in range(max_attempts):
analysis_result = self.analyze_sentiment(
comment_text, content_description,
parent_comment_text, platform, content_title
)
if analysis_result.get("success"):
break
if attempt < max_attempts - 1:
self.log_processing(f"Attempt {attempt + 1} failed, retrying...", "warning")
# ... rest of existing code ...
10. Add Custom Validation Rules
Modify validate_input() in any agent:
def validate_input(self, input_data: Dict[str, Any]) -> bool:
"""Validate that input contains required fields."""
required_fields = ["comment_text", "content_description"]
# Check required fields exist
if not all(field in input_data for field in required_fields):
return False
# ADD CUSTOM VALIDATION
# Ensure comment_text is not empty or too short
comment_text = input_data.get("comment_text", "")
if not comment_text or len(comment_text.strip()) < 2:
self.log_processing("Comment text too short or empty", "warning")
return False
# Ensure content_description exists
content_desc = input_data.get("content_description", "")
if not content_desc or content_desc.strip() == "":
self.log_processing("Content description missing", "warning")
return False
return True
Testing Modified Agents
After making modifications, always test:
# Test with a small batch
python main.py --limit 10 --sequential
# Check specific data source
python main.py --limit 10 --sequential --data-source social_media
# Review logs for errors
tail -f logs/comment_processing_*.log
Configuration System
Configuration Files Overview
config_files/
├── sentiment_config.json            # Agent behavior config
├── sentiment_analysis_config.json   # Sentiment categories and intents
└── data_sources_config.json         # Data source configuration
Agent Configuration Structure
File: sentiment_config.json
{
"agents": {
"agent_name": {
"name": "AgentClassName",
"model": "gpt-5-nano", // LLM model to use
"temperature": 0.0, // Creativity (0.0 = deterministic, 1.0 = creative)
"max_retries": 3, // Max retry attempts
"description": "What this agent does"
}
},
"workflow": {
"parallel_processing": {
"enabled": true,
"worker_calculation": "CPU count - 2, max 5 workers",
"min_batch_size": 20,
"max_batch_size": 1000
}
}
}
Temperature Guidelines
- 0.0 - 0.1: Deterministic, consistent (good for classification)
- 0.2 - 0.4: Slight variation, mostly consistent (good for sentiment analysis)
- 0.5 - 0.7: Balanced creativity and consistency (good for translation)
- 0.8 - 1.0: Creative, varied (good for content generation)
Model Selection Guidelines
- gpt-5-nano: Fast, cheap, good for simple tasks
- gpt-4o-mini: Balanced speed/quality, good for most tasks
- gpt-4o: High quality, slower, good for complex analysis
Category Configuration Structure
File: sentiment_analysis_config.json
{
"category_type": {
"categories": [
{
"value": "machine_readable_value", // Used in code/DB
"label": "Human Readable Label", // Used in UI
"description": "Detailed description for LLM prompt"
}
]
}
}
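Because these files are edited by hand, a small validation pass at load time catches missing keys early. A hedged sketch, not part of the project; it requires only `value` and `description` per category, since `label` is optional for some category types (the intent categories above omit it):

```python
from typing import Any, Dict, List

# Keys every category entry must carry; `label` is treated as optional.
REQUIRED_KEYS = ("value", "description")


def validate_category_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks sane."""
    problems = []
    for category_type, section in config.items():
        categories = section.get("categories")
        if not isinstance(categories, list) or not categories:
            problems.append(f"{category_type}: missing or empty 'categories' list")
            continue
        for i, cat in enumerate(categories):
            for key in REQUIRED_KEYS:
                if not cat.get(key):
                    problems.append(f"{category_type}[{i}]: missing '{key}'")
    return problems
```

Calling this right after `json.load()` turns a silent prompt-building bug into an explicit startup error.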
Loading Configuration in Code
# In workflow/__init__ or agent __init__
import json
import os
# Load agent config
with open('config_files/sentiment_config.json', 'r') as f:
config = json.load(f)
agent_config = config["agents"]["agent_name"]
# Load category config
with open('config_files/sentiment_analysis_config.json', 'r') as f:
categories = json.load(f)
sentiment_categories = categories["sentiment_polarity"]["categories"]
Best Practices
Agent Development
- Single Responsibility: Each agent should do one thing well
- Fail Gracefully: Always return structured error responses
- Preserve Data: Never lose original input data - pass it through
- Log Everything: Use log_processing() for debugging
- Validate Early: Check inputs before processing
- Configuration Over Code: Use config files for behavior changes
- Test Incrementally: Test with --limit 10 --sequential first
Prompt Engineering
- Be Specific: Clearly define expected output format
- Use Examples: Include few-shot examples in prompts
- Request JSON: Always request JSON format for structured data
- Handle Edge Cases: Document edge cases in prompts
- Provide Context: Give LLM all relevant context
- Set Constraints: Clearly define boundaries and limitations
Example of good prompt structure:
system_prompt = """You are an expert at [TASK].
Your task is to:
1. [Step 1]
2. [Step 2]
3. [Step 3]
Context: [Explain the context]
Rules:
- Rule 1
- Rule 2
- Rule 3
Examples:
- Input: "..." → Output: {...}
- Input: "..." → Output: {...}
Return your response in JSON format with the following fields:
- field1: description
- field2: description
"""
Error Handling
- Try-Catch Everything: Wrap all processing in try-catch
- Specific Error Messages: Make errors actionable
- Graceful Degradation: Continue workflow even if one agent fails
- Error Accumulation: Collect errors in
processing_errorslist - Critical vs Non-Critical: Distinguish between recoverable and fatal errors
Example:
def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
try:
# Validate
if not self.validate_input(input_data):
return {
"success": False,
"error": "Invalid input: missing required fields",
**input_data # Preserve original data
}
# Process
result = self.do_processing(input_data)
# Check result
if not result.get("success"):
return {
"success": False,
"error": result.get("error", "Unknown error"),
**input_data
}
# Return success
return {
"success": True,
"output_field": result["output"],
**input_data
}
except Exception as e:
return self.handle_error(e, "process")
Testing
- Unit Test Agents: Test agents independently before integration
- Small Batches: Always test with
--limit 10first - Sequential Mode: Use
--sequentialfor debugging - Check Logs: Review logs after every test run
- Validate Output: Check Snowflake results
- Test Edge Cases: Empty text, emojis only, very long text, special characters
Test script example:
# test_agent.py
from agents.sentiment_analysis_agent import SentimentAnalysisAgent
import json
# Load config
with open('config_files/sentiment_config.json', 'r') as f:
config = json.load(f)
with open('config_files/sentiment_analysis_config.json', 'r') as f:
categories = json.load(f)
# Initialize agent
agent = SentimentAnalysisAgent(
config["agents"]["sentiment_analysis"],
"your-api-key",
categories
)
# Test cases
test_cases = [
{"comment_text": "This is amazing!", "content_description": "Guitar tutorial"},
{"comment_text": "πππ", "content_description": "Piano cover"},
{"comment_text": "What scale is this?", "content_description": "Blues solo"},
]
for test in test_cases:
result = agent.process(test)
print(f"Input: {test['comment_text']}")
print(f"Result: {result}")
print("---")
Performance Optimization
- Batch Processing: Process comments in batches (handled by workflow)
- Parallel Workers: Use multiprocessing for large batches
- Minimize LLM Calls: Cache results when possible
- Optimize Prompts: Shorter prompts = faster responses
- Choose Right Model: Use gpt-5-nano for simple tasks
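Result caching can be sketched with the standard library alone. The example below uses `functools.lru_cache` around a hypothetical `classify()` helper standing in for the LLM call; the function name and placeholder return value are assumptions, not part of this codebase:

```python
from functools import lru_cache

# Hypothetical wrapper around the LLM call; cached so repeated
# comments don't trigger duplicate API requests. Arguments must
# be hashable (plain strings are fine) for lru_cache to work.
@lru_cache(maxsize=4096)
def classify(comment_text: str, content_description: str) -> str:
    # The real agent would call the LLM here; a placeholder keeps
    # the sketch self-contained.
    return f"sentiment for: {comment_text!r}"

# The second call with identical arguments is served from the cache.
first = classify("Great lesson!", "Guitar tutorial")
second = classify("Great lesson!", "Guitar tutorial")
print(classify.cache_info().hits)  # 1
```

Note that `lru_cache` lives on the function object, so it is per-process; parallel workers each keep their own cache.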
Code Organization
- One Agent Per File: Don't combine multiple agents
- Helper Methods: Use private methods (_method_name) for internal logic
- Type Hints: Always use type hints for parameters and returns
- Docstrings: Document all public methods
- Constants: Define constants at class level
Example structure:

```python
class MyAgent(BaseAgent):
    # Constants
    DEFAULT_VALUE = "default"
    MAX_LENGTH = 1000

    def __init__(self, config, api_key):
        """Initialize agent."""
        super().__init__("MyAgent", config)
        # ... initialization

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """Validate input data."""
        # ... validation

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Main processing method."""
        # ... processing

    def _helper_method(self, data: str) -> str:
        """Private helper method."""
        # ... helper logic

    def _parse_llm_json_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        # ... parsing
```
Troubleshooting
Common Issues
Issue 1: Agent Returns Empty Results
Symptoms: Agent succeeds but returns None or empty strings for key fields
Causes:
- LLM not following JSON format
- JSON parsing failing silently
- Missing fields in LLM response
Solutions:
- Check logs for JSON parsing warnings
- Add validation after the LLM call:

```python
result = self._parse_llm_json_response(response.content)

# Validate result
if not result.get("sentiment_polarity"):
    return {
        "success": False,
        "error": "Missing sentiment_polarity in LLM response"
    }
```

- Improve prompt to be more specific about required fields
- Add examples to prompt showing exact JSON structure
Issue 2: JSON Parsing Errors
Symptoms: JSON decode error in logs
Causes:
- LLM returns markdown-wrapped JSON
- LLM includes explanatory text before/after JSON
- Malformed JSON from LLM
Solutions:
- Use the `_parse_llm_json_response()` helper (already handles markdown)
- Make the prompt more explicit:

```python
user_prompt = """...
Return ONLY valid JSON, no explanation or markdown.
Just the raw JSON object.
"""
```

- Add fallback parsing:

```python
try:
    result = json.loads(content)
except json.JSONDecodeError:
    # Try to extract JSON from surrounding text
    import re
    json_match = re.search(r'\{.*\}', content, re.DOTALL)
    if json_match:
        result = json.loads(json_match.group())
    else:
        raise
```
Issue 3: Inconsistent Results
Symptoms: Same comment gets different classifications on reruns
Causes:
- Temperature too high
- Prompt too vague
- Model inconsistency
Solutions:
- Lower temperature to 0.0 - 0.2 for classification tasks
- Make prompt more specific and rule-based
- Add examples to prompt
- Use a more consistent model (gpt-5-nano vs gpt-4o)
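In this architecture the temperature lives in the agent config rather than in code. A hedged sketch of what such a JSON fragment might look like, with field names inferred from the base-class attributes and the `config["agents"]["sentiment_analysis"]` access pattern shown earlier (exact keys in your config files may differ):

```json
{
  "agents": {
    "sentiment_analysis": {
      "model": "gpt-5-nano",
      "temperature": 0.1,
      "max_retries": 3
    }
  }
}
```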
Issue 4: Agent Too Slow
Symptoms: Processing takes very long
Causes:
- Large LLM model
- Complex prompts
- Sequential processing
- API rate limits
Solutions:
- Use faster model (gpt-5-nano instead of gpt-4o)
- Simplify prompt (shorter = faster)
- Enable parallel processing (already default)
- Increase batch size (if not hitting rate limits)
- Consider caching repeated analyses
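When parallel processing seems not to kick in, reproducing it in isolation can help. A minimal sketch using `concurrent.futures`, where `process_comment` is a hypothetical stand-in for the per-comment pipeline:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for one comment's processing; the real
# workflow would invoke the agent pipeline here.
def process_comment(comment: str) -> dict:
    return {"comment": comment, "success": True}

comments = ["Great solo!", "What tuning is this?", "Love it"]

# Threads suit I/O-bound LLM API calls; for CPU-bound work,
# ProcessPoolExecutor is the better fit.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process_comment, comments))

print(len(results))  # 3
```

Keep `max_workers` below your API rate limit's comfortable concurrency, or the parallelism just converts slow responses into 429 errors.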
Issue 5: Agent Failing Validation
Symptoms: validate_input() returns False, agent skips processing
Causes:
- Missing required fields in input
- Empty or None values
- Wrong data types
Solutions:
- Check the workflow node - ensure all required fields are passed:

```python
input_data = {
    "comment_text": state.get("translated_text", state["comment_text"]),
    "content_description": state["content_description"],
    # Add all required fields
}
```

- Add logging to validation:

```python
def validate_input(self, input_data: Dict[str, Any]) -> bool:
    for field in required_fields:
        if field not in input_data:
            self.log_processing(f"Missing field: {field}", "error")
            return False
    return True
```
Issue 6: Workflow Not Running New Agent
Symptoms: New agent not being called, no logs from new agent
Causes:
- Forgot to add node to workflow graph
- Forgot to initialize agent
- Workflow edges not connected
Solutions:
- Verify the agent is initialized in `__init__`:

```python
self.new_agent = NewAgent(config, api_key)
```

- Verify the node is added:

```python
workflow.add_node("new_agent", self._new_agent_node)
```

- Verify the edges:

```python
workflow.add_edge("previous_agent", "new_agent")
workflow.add_edge("new_agent", END)
```

- Check for exceptions in workflow compilation
Issue 7: Database Insert Fails
Symptoms: Processing succeeds but data not in Snowflake
Causes:
- Missing columns in database
- Data type mismatch
- Field name mismatch
Solutions:
- Check the column exists:

```sql
DESC TABLE COMMENT_SENTIMENT_FEATURES;
```

- Add the column if missing:

```sql
ALTER TABLE COMMENT_SENTIMENT_FEATURES ADD COLUMN NEW_FIELD VARCHAR(500);
```

- Check field names match exactly (case-sensitive)
- Check that the result_df construction in main.py includes the new fields
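A quick way to surface case-sensitive name mismatches is to diff the result fields against the table columns as sets. A minimal sketch; the column and field names below are illustrative, not taken from the actual schema:

```python
# Columns reported by the database (e.g. from DESC TABLE) and the
# fields your result rows actually contain -- both illustrative.
table_columns = {"COMMENT_ID", "SENTIMENT_POLARITY", "NEW_FIELD"}
result_fields = {"COMMENT_ID", "sentiment_polarity", "NEW_FIELD"}

# An exact, case-sensitive set difference exposes the mismatch.
missing_in_table = result_fields - table_columns
print(missing_in_table)  # {'sentiment_polarity'} -- wrong case
```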
Debugging Tips
- Enable Debug Logging: Set log level to DEBUG in main.py
- Print State: Add print statements in workflow nodes to see state
- Test Agent Directly: Test agent outside workflow first
- Use Sequential Mode: `--sequential` flag for clearer debugging
- Check API Logs: Review OpenAI API dashboard for errors
- Validate JSON: Use online JSON validator for config files
- Check Git Status: Ensure all files saved and changes committed
Getting Help
- Check Logs: Always check the `logs/` directory first
- Review This README: Answers to most questions are here
- Test Incrementally: Isolate the problem to one agent
- Use Small Batches: Test with `--limit 5` for faster iteration
- Document Issues: Keep notes on what you tried
Conclusion
This agent architecture provides a flexible, maintainable foundation for processing social media comments. Key takeaways:
- Base class pattern ensures consistency
- LangGraph workflow enables flexible orchestration
- Configuration-driven design minimizes code changes
- Error resilience at every level
- Extensible by design - easy to add new agents
For questions or issues, refer to the main project README or review the existing agent implementations for patterns and examples.
Last Updated: 2026-01-15
Version: 1.0
Maintainer: Musora Development Team