Comment Processing with Agentic Workflow
A scalable, modular system for processing comments from multiple data sources using OpenAI API, LangChain, and LangGraph. The system performs language detection, translation, and context-aware sentiment analysis using an agentic workflow architecture.
Data Sources Supported
- Social Media Comments: External platforms (Facebook, Instagram, YouTube, etc.)
- Musora Internal Comments: Comments from Musora internal applications
- Extensible Architecture: Easily add new data sources via configuration
Features
- Multi-Source Support: Process comments from multiple data sources with a single codebase
- Configuration-Driven: Add new data sources without code changes
- Parent Comment Context: Automatically includes parent comment text for reply analysis
- Modular Agent Architecture: Extensible base classes for easy addition of new agents
- Language Detection: Hybrid approach using lingua library for fast English detection, with LLM fallback for non-English languages
- Translation: High-quality translation for non-English comments using OpenAI models
- Context-Aware Sentiment Analysis:
- Uses content description for context
- Includes parent comment text when analyzing replies
- Multi-label intent classification
- LangGraph Workflow: Flexible graph-based orchestration of agent operations
- Snowflake Integration: Seamless data fetching and storage with source-specific tables
- Parallel Processing: Multiprocessing support for high-performance batch processing
- Dynamic Batch Sizing: Intelligent batch size calculation based on workload and available resources
- Independent Batch Execution: Each batch processes and stores results independently
- Comprehensive Logging: Detailed logging for monitoring and debugging
- Scalable Configuration: Easy-to-modify sentiment categories and intents via JSON config
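The hybrid language-detection flow described above can be sketched with stdlib-only stand-ins. Here `fast_detect` stands in for the lingua check and `llm_detect` for the LLM fallback; both names (and the toy detectors) are illustrative, not the project's actual API:

```python
from typing import Callable, Optional

def detect_language(
    text: str,
    fast_detect: Callable[[str], Optional[str]],
    llm_detect: Callable[[str], str],
) -> str:
    """Return a language code, trying the cheap detector first.

    fast_detect returns a code when confident (e.g. "en") and None
    otherwise; llm_detect is the expensive fallback for non-English
    or ambiguous text.
    """
    code = fast_detect(text)
    return code if code is not None else llm_detect(text)

# Toy detectors, for illustration only.
fast = lambda t: "en" if t.isascii() else None
llm = lambda t: "es"

print(detect_language("Great lesson!", fast, llm))        # fast path
print(detect_language("¡Qué buena lección!", fast, llm))  # LLM fallback
```

The design point is that most comments are English, so the cheap check handles the common case and the LLM is only paid for when needed.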
Project Structure
musora-sentiment-analysis/
├── agents/
│   ├── __init__.py
│   ├── base_agent.py                        # Base class for all agents
│   ├── language_detection_agent.py          # Language detection agent
│   ├── translation_agent.py                 # Translation agent
│   └── sentiment_analysis_agent.py          # Sentiment analysis agent (parent context support)
├── workflow/
│   ├── __init__.py
│   └── comment_processor.py                 # LangGraph workflow orchestrator
├── sql/
│   ├── fetch_comments.sql                   # Query for social media comments (with parent join)
│   ├── fetch_musora_comments.sql            # Query for Musora internal comments (with parent join)
│   ├── create_ml_features_table.sql         # Schema for social media table (with parent fields)
│   ├── init_musora_table.sql                # Initialize empty Musora table (run first!)
│   └── create_musora_ml_features_table.sql  # Full Musora schema with views (optional)
├── config_files/
│   ├── data_sources_config.json             # Data source configuration
│   ├── sentiment_config.json                # Configuration for agents and workflow
│   └── sentiment_analysis_config.json       # Sentiment categories and intents
├── logs/                                    # Processing logs (auto-created)
├── LLM.py                                   # LLM utility class
├── SnowFlakeConnection.py                   # Snowflake connection handler
├── main.py                                  # Main execution script (multi-source support)
├── requirements.txt                         # Python dependencies
├── .env                                     # Environment variables (not in git)
├── README.md                                # This file
└── CLAUDE.md                                # Detailed technical documentation
Setup
1. Install Dependencies
pip install -r requirements.txt
2. Configure Environment Variables
Ensure your .env file contains the required credentials:
# Snowflake
SNOWFLAKE_USER=your_user
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_ROLE=your_role
SNOWFLAKE_DATABASE=SOCIAL_MEDIA_DB
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_SCHEMA=ML_FEATURES
# OpenAI
OPENAI_API_KEY=your_openai_key
3. Create Snowflake Tables
Run the SQL scripts to create the output tables:
# Execute the SQL files in Snowflake
# For social media comments (if not already exists)
sql/create_ml_features_table.sql
# For Musora internal comments - INITIAL SETUP (First time only)
# This creates the empty table structure
sql/init_musora_table.sql
Note: Run init_musora_table.sql before the first Musora comments processing run. After that, you can optionally run create_musora_ml_features_table.sql to create the additional views if needed.
Usage
Basic Usage (Process All Data Sources)
Process unprocessed comments from all enabled data sources:
python main.py
This will:
- Process all enabled data sources (social media and Musora comments)
- Fetch only comments that haven't been processed yet
- Process them through the workflow using parallel workers (CPU count - 2, max 5)
- Each batch processes and stores to Snowflake independently
- Append new results to the existing tables (no overwrite)
Process Specific Data Source
Process only social media comments:
python main.py --data-source social_media
Process only Musora internal comments:
python main.py --data-source musora_comments
Process Limited Number of Comments
Limit applies per data source:
# Process 100 comments from each enabled data source
python main.py --limit 100
# Process 100 comments from only Musora source
python main.py --limit 100 --data-source musora_comments
Sequential Processing (Debug Mode)
For debugging purposes, use sequential processing:
python main.py --limit 100 --sequential
This processes all comments in a single batch, making it easier to debug issues.
First Run for New Data Source
For the first run of Musora comments:
First: Run the initialization SQL script in Snowflake:
sql/init_musora_table.sql
Then: Run the processing with the overwrite flag:
python main.py --overwrite --data-source musora_comments --limit 100
Why two steps?
- The fetch query checks for already-processed comments by querying the output table
- On first run, that table doesn't exist, causing an error
- The init script creates the empty table structure first
- Then processing can run normally
Warning: Overwrite will replace all existing data in the output table. Only use for initial table creation or when reprocessing from scratch.
Custom Configuration File
python main.py --config path/to/custom_config.json
Command-Line Arguments
- --limit N: Process only N comments per data source (default: 10000)
- --overwrite: Overwrite the existing Snowflake table (default: append mode)
- --config PATH: Custom configuration file path
- --sequential: Use sequential processing instead of parallel (for debugging)
- --data-source SOURCE: Process only a specific data source (e.g., social_media, musora_comments)
Parallel Processing
The system uses multiprocessing to process comments in parallel:
Worker Calculation:
- Number of workers: CPU count - 2 (max 5 workers)
- Leaves CPU cores available for system operations
- Example: 8-core system → 5 workers (capped at max)
Dynamic Batch Sizing:
- Batch size calculated as: total_comments / num_workers
- Minimum batch size: 20 comments
- Maximum batch size: 1000 comments
- Batches of ≤ 20 comments are not split
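The worker and batch-size rules above can be sketched as follows (function names are illustrative; the real implementation lives in main.py and may differ):

```python
import os

MIN_BATCH, MAX_BATCH, MAX_WORKERS = 20, 1000, 5

def num_workers() -> int:
    # Leave two cores for the system, cap at 5, and use at least one.
    return max(1, min((os.cpu_count() or 1) - 2, MAX_WORKERS))

def batch_size(total_comments: int, workers: int) -> int:
    # Spread comments evenly across workers, clamped to the bounds above.
    return max(MIN_BATCH, min(total_comments // max(workers, 1), MAX_BATCH))

print(batch_size(10000, 5))  # 1000 (capped at the maximum)
print(batch_size(50, 5))     # 20 (raised to the minimum)
```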
Independent Execution:
- Each batch runs in a separate process
- Batches store to Snowflake immediately upon completion
- No waiting for all batches to complete
- Failed batches don't affect successful ones
Performance:
- Expected speedup: ~1.8-4.5x depending on number of workers
- Real-time progress reporting as batches complete
- Processing time and average per comment displayed in summary
Incremental Processing
The pipeline is designed for incremental processing:
- Automatic deduplication: The SQL query excludes comments already in COMMENT_SENTIMENT_FEATURES
- Append-only by default: New results are added without overwriting existing data
- Failed comment retry: Comments with success=False are not stored and will be retried in future runs
- Run regularly: Safe to run daily/weekly to process new comments
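The SQL-level deduplication is typically implemented as an anti-join against the output table. A hedged sketch of the pattern (RAW_COMMENTS is a hypothetical source table; the real query lives in sql/fetch_comments.sql and will differ):

```sql
-- Keep only comments not yet present in the output table.
SELECT c.COMMENT_SK, c.COMMENT_ID, c.ORIGINAL_TEXT
FROM RAW_COMMENTS c
LEFT JOIN ML_FEATURES.COMMENT_SENTIMENT_FEATURES f
  ON c.COMMENT_SK = f.COMMENT_SK
WHERE f.COMMENT_SK IS NULL;
```

Because failed comments are never written to the output table, they fall out of this anti-join and are picked up again on the next run.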
Configuration
Data Sources Configuration
The config_files/data_sources_config.json file defines available data sources:
{
"data_sources": {
"social_media": {
"name": "Social Media Comments",
"enabled": true,
"sql_query_file": "sql/fetch_comments.sql",
"output_config": {
"table_name": "COMMENT_SENTIMENT_FEATURES",
"database": "SOCIAL_MEDIA_DB",
"schema": "ML_FEATURES"
}
},
"musora_comments": {
"name": "Musora Internal Comments",
"enabled": true,
"sql_query_file": "sql/fetch_musora_comments.sql",
"output_config": {
"table_name": "MUSORA_COMMENT_SENTIMENT_FEATURES",
"database": "SOCIAL_MEDIA_DB",
"schema": "ML_FEATURES"
},
"additional_fields": [
"PERMALINK_URL",
"THUMBNAIL_URL"
]
}
}
}
To add a new data source: Simply add a new entry to this config file and create the corresponding SQL query file.
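A minimal sketch of how a loader might iterate enabled sources (the function name and exact handling are assumptions, not the project's actual code):

```python
import json
from io import StringIO

# Trimmed-down sample config in the same shape as data_sources_config.json.
SAMPLE = """{
  "data_sources": {
    "social_media":    {"name": "Social Media Comments",    "enabled": true},
    "musora_comments": {"name": "Musora Internal Comments", "enabled": false}
  }
}"""

def enabled_sources(config: dict) -> list:
    # Return the keys of every data source flagged as enabled.
    return [key for key, src in config["data_sources"].items() if src.get("enabled")]

config = json.load(StringIO(SAMPLE))
print(enabled_sources(config))  # ['social_media']
```

Because sources are discovered from the config at runtime, a new entry in the JSON is all the pipeline needs to pick it up.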
Agent Configuration
The config_files/sentiment_config.json file controls agent behavior:
{
"agents": {
"language_detection": {
"model": "gpt-5-nano",
"temperature": 0.0,
"max_retries": 3
},
"translation": {
"model": "gpt-5-nano",
"temperature": 0.3,
"max_retries": 3
},
"sentiment_analysis": {
"model": "gpt-5-nano",
"temperature": 0.2,
"max_retries": 3
}
},
"workflow": {
"description": "Batch size is calculated dynamically based on number of workers (min: 20, max: 1000)",
"parallel_processing": {
"enabled": true,
"worker_calculation": "CPU count - 2, max 5 workers",
"min_batch_size": 20,
"max_batch_size": 1000
}
},
"snowflake": {
"output_table": "COMMENT_SENTIMENT_FEATURES",
"database": "SOCIAL_MEDIA_DB",
"schema": "ML_FEATURES"
}
}
Note: Batch size is now calculated dynamically and no longer needs to be configured manually.
Sentiment Categories Configuration
The config_files/sentiment_analysis_config.json file defines sentiment categories and intents (easily extensible):
{
"sentiment_polarity": {
"categories": [
{"value": "very_positive", "label": "Very Positive", "description": "..."},
{"value": "positive", "label": "Positive", "description": "..."},
{"value": "neutral", "label": "Neutral", "description": "..."},
{"value": "negative", "label": "Negative", "description": "..."},
{"value": "very_negative", "label": "Very Negative", "description": "..."}
]
},
"intent": {
"categories": [
{"value": "praise", "label": "Praise", "description": "..."},
{"value": "question", "label": "Question", "description": "..."},
{"value": "request", "label": "Request", "description": "..."},
{"value": "feedback_negative", "label": "Negative Feedback", "description": "..."},
{"value": "suggestion", "label": "Suggestion", "description": "..."},
{"value": "humor_sarcasm", "label": "Humor/Sarcasm", "description": "..."},
{"value": "off_topic", "label": "Off Topic", "description": "..."},
{"value": "spam_selfpromo", "label": "Spam/Self-Promotion", "description": "..."}
]
},
"reply_policy": {
"requires_reply_intents": ["question", "request"],
"description": "Comments with these intents should be flagged for reply"
},
"intent_settings": {
"multi_label": true,
"description": "Intent can have multiple labels as a comment can express multiple intents"
}
}
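One way such a config drives the analysis is by extracting each section's allowed values for the LLM prompt. A hedged sketch (the helper name is illustrative, not the project's actual code):

```python
def allowed_values(section: dict) -> list:
    # Collect the machine-readable values from a config section's categories.
    return [cat["value"] for cat in section["categories"]]

# Trimmed-down config in the same shape as sentiment_analysis_config.json.
config = {
    "sentiment_polarity": {"categories": [
        {"value": "positive", "label": "Positive", "description": "..."},
        {"value": "negative", "label": "Negative", "description": "..."},
    ]},
    "reply_policy": {"requires_reply_intents": ["question", "request"]},
}

print(allowed_values(config["sentiment_polarity"]))  # ['positive', 'negative']
```

Adding a category to the JSON then changes the prompt's allowed labels without touching code.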
Adding New Agents
The system is designed for easy extensibility. To add a new agent:
1. Create Agent Class
from agents.base_agent import BaseAgent
from typing import Dict, Any

class MyNewAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any], api_key: str):
        super().__init__("MyNewAgent", config)
        self.api_key = api_key
        # Initialize your agent-specific components

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        # Validate input data
        return True

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implement your agent logic
        return {"success": True, "result": "..."}
2. Update Workflow
Add the agent to workflow/comment_processor.py:
# Add to CommentState TypedDict
new_agent_result: str
# Add node
workflow.add_node("my_new_agent", self._my_new_agent_node)
# Add edges
workflow.add_edge("translation", "my_new_agent")
workflow.add_edge("my_new_agent", END)
3. Update Configuration
Add agent config to sentiment_config.json:
{
"agents": {
"my_new_agent": {
"name": "MyNewAgent",
"model": "gpt-4o-mini",
"temperature": 0.5,
"max_retries": 3
}
}
}
Output Schema
Social Media Comments Table
Stored in SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES
Musora Comments Table
Stored in SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES
Common Columns (Both Tables)
| Column | Type | Description |
|---|---|---|
| COMMENT_SK | NUMBER | Surrogate key from source |
| COMMENT_ID | VARCHAR | Platform comment ID |
| ORIGINAL_TEXT | VARCHAR | Original comment text |
| PARENT_COMMENT_ID | VARCHAR | ID of parent comment if this is a reply |
| PARENT_COMMENT_TEXT | VARCHAR | Text of parent comment for context |
| DETECTED_LANGUAGE | VARCHAR | Detected language name |
| LANGUAGE_CODE | VARCHAR | ISO 639-1 code |
| IS_ENGLISH | BOOLEAN | Is comment in English |
| TRANSLATED_TEXT | VARCHAR | English translation |
| TRANSLATION_PERFORMED | BOOLEAN | Was translation performed |
| SENTIMENT_POLARITY | VARCHAR | Sentiment (very_positive, positive, neutral, negative, very_negative) |
| INTENT | VARCHAR | Multi-label intents (comma-separated) |
| REQUIRES_REPLY | BOOLEAN | Does comment need a response |
| SENTIMENT_CONFIDENCE | VARCHAR | Analysis confidence (high, medium, low) |
| PROCESSING_SUCCESS | BOOLEAN | Processing status |
| PROCESSED_AT | TIMESTAMP | Processing timestamp |
Musora-Specific Additional Columns
| Column | Type | Description |
|---|---|---|
| PERMALINK_URL | VARCHAR | Web URL path of the content |
| THUMBNAIL_URL | VARCHAR | Thumbnail URL of the content |
Available Views
Social Media:
- VW_COMMENTS_REQUIRING_REPLY: Comments that need responses (includes parent comment info)
- VW_SENTIMENT_DISTRIBUTION: Sentiment and intent statistics by channel (includes reply comment count)
- VW_NON_ENGLISH_COMMENTS: Filtered view of non-English comments
Musora:
- VW_MUSORA_COMMENTS_REQUIRING_REPLY: Musora comments needing responses
- VW_MUSORA_SENTIMENT_DISTRIBUTION: Musora sentiment and intent statistics
- VW_MUSORA_NON_ENGLISH_COMMENTS: Non-English Musora comments
Workflow Architecture
The system uses LangGraph to create a flexible, state-based workflow:
┌─────────────────────┐
│   Fetch Comments    │
│   from Snowflake    │
│ (Unprocessed Only)  │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ Language Detection  │
│        Agent        │
└──────────┬──────────┘
           │
           ▼
      ┌────┴────┐
      │ English?│
      └────┬────┘
           │
     ┌─────┴─────┐
     │           │
    Yes          No
     │           │
     │           ▼
     │   ┌─────────────┐
     │   │ Translation │
     │   │    Agent    │
     │   └──────┬──────┘
     │          │
     └────┬─────┘
          │
          ▼
 ┌──────────────────┐
 │    Sentiment     │
 │  Analysis Agent  │
 └────────┬─────────┘
          │
          ▼
  ┌──────────────┐
  │Store Results │
  │to Snowflake  │
  │(Append Mode) │
  └──────────────┘
Note: The fetch step automatically excludes comments already present in COMMENT_SENTIMENT_FEATURES, enabling incremental processing.
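The English? branch corresponds to a conditional edge in the graph. A stdlib-only sketch of the routing function (names are illustrative; in LangGraph this would be wired to the language detection node via add_conditional_edges):

```python
from typing import TypedDict

class CommentState(TypedDict, total=False):
    original_text: str
    is_english: bool
    translated_text: str

def route_after_detection(state: CommentState) -> str:
    # English comments skip the translation node entirely.
    return "sentiment_analysis" if state.get("is_english") else "translation"

print(route_after_detection({"is_english": True}))   # sentiment_analysis
print(route_after_detection({"is_english": False}))  # translation
```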
Logging
Logs are automatically created in the logs/ directory with timestamps:
logs/comment_processing_20251001_143022.log
Adding New Data Sources
The system is designed to make adding new data sources easy:
Steps to Add a New Source:
1. Update Configuration (config_files/data_sources_config.json):

   "your_new_source": {
     "name": "Your New Source Name",
     "enabled": true,
     "sql_query_file": "sql/fetch_your_source.sql",
     "output_config": {
       "table_name": "YOUR_SOURCE_SENTIMENT_FEATURES",
       "database": "SOCIAL_MEDIA_DB",
       "schema": "ML_FEATURES"
     },
     "additional_fields": ["FIELD1", "FIELD2"]
   }

   (additional_fields is optional.)

2. Create SQL Query File (sql/fetch_your_source.sql):
   - Fetch comments with consistent column names
   - Include a self-join for parent comments if available
   - Exclude already-processed comments (LEFT JOIN with the output table)

3. Create Table Initialization Script (sql/init_your_source_table.sql):
   - Creates the empty table structure
   - Base the schema on init_musora_table.sql
   - Add source-specific fields as needed
   - Run this in Snowflake FIRST, before processing

4. Create Full Schema (optional):
   - Base the schema on create_musora_ml_features_table.sql
   - Include views and indexes

5. Run for the First Time:
   # Step 1: Run the init script in Snowflake
   sql/init_your_source_table.sql
   # Step 2: Process the first batch
   python main.py --overwrite --data-source your_new_source --limit 100
No code changes required!
Best Practices
- Testing: Always test with the --limit flag first (e.g., --limit 100)
- New Data Sources: Test new sources with --sequential --limit 100 first
- Debugging: Use the --sequential flag for easier debugging of processing issues
- Incremental Processing: Run regularly without --overwrite to process only new comments
- Monitoring: Check logs for processing errors and batch completion
- Performance: Use the default parallel mode for production workloads
- Extensibility: Follow the base agent pattern for consistency
- Error Handling: All agents include robust error handling
- Failed Comments: Review logs for failed comments; they will be automatically retried in future runs
- Resource Management: The system automatically adapts to available CPU resources
- Parent Comments: Ensure SQL queries include parent comment joins for best accuracy
Sentiment Analysis Features
Multi-Label Intent Classification
The system supports multi-label intent classification, meaning a single comment can have multiple intents:
- Example: "This is amazing! What scale are you using?" → ["praise", "question"]
- Example: "Love this but can you make a tutorial on it?" → ["praise", "request"]
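Multi-label intents are stored comma-separated in the INTENT column, and the reply policy flags certain intents. A sketch of that post-processing (the helper name and exact separator are assumptions):

```python
def summarize_intents(intents, requires_reply_intents):
    # Join the labels for storage and flag the comment for reply if any
    # intent matches the reply policy (e.g. question or request).
    joined = ", ".join(intents)
    needs_reply = any(i in requires_reply_intents for i in intents)
    return joined, needs_reply

print(summarize_intents(["praise", "question"], {"question", "request"}))
# ('praise, question', True)
```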
Context-Aware Analysis with Parent Comment Support
The sentiment analysis agent provides rich context understanding:
- Content Context: Uses the content_description field to understand what the comment is about
- Parent Comment Context: When analyzing reply comments, the system:
  - Automatically detects when a comment is a reply
  - Fetches the parent comment text from the database
  - Includes the parent comment in the LLM prompt
  - Explicitly instructs the LLM that this is a reply comment
  - Results in more accurate sentiment and intent classification
Example:
- Parent Comment: "Does anyone know how to play this riff?"
- Reply Comment: "Yes!"
- Without parent context: Might be classified as unclear/off-topic
- With parent context: Correctly classified as answering a question
This dramatically improves accuracy for:
- Short reply comments ("Yes", "Thanks!", "Agreed")
- Sarcastic replies (context crucial for understanding)
- Continuation of discussions
- Agreement/disagreement comments
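A sketch of how the prompt might be assembled with parent context (the function name and wording are assumptions; the real prompt lives in the sentiment analysis agent):

```python
from typing import Optional

def build_sentiment_prompt(
    comment: str,
    content_description: str,
    parent_text: Optional[str] = None,
) -> str:
    # Give the LLM the content context, and when this is a reply,
    # the parent comment plus an explicit note that it is a reply.
    lines = [f"Content context: {content_description}"]
    if parent_text is not None:
        lines.append("This comment is a REPLY to the following parent comment:")
        lines.append(f'Parent: "{parent_text}"')
    lines.append(f'Comment: "{comment}"')
    return "\n".join(lines)

print(build_sentiment_prompt(
    "Yes!", "guitar riff tutorial",
    "Does anyone know how to play this riff?",
))
```

With the parent included, a bare "Yes!" reads as an answer to a question rather than noise.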
Failure Handling & Reprocessing
Comments that fail sentiment analysis (missing critical fields like sentiment_polarity or intents) are:
- Marked as success=False in the workflow
- NOT stored in Snowflake
- Automatically available for reprocessing in future runs
This ensures only successfully processed comments are stored, while failed comments remain available for retry.
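The store step therefore filters on the success flag before writing; a minimal sketch (the helper name is illustrative):

```python
def storable(results):
    # Only rows that passed every agent are written to Snowflake; the rest
    # stay absent from the output table and are retried on the next run.
    return [r for r in results if r.get("success")]

rows = [
    {"comment_id": "1", "success": True},
    {"comment_id": "2", "success": False},
]
print(storable(rows))  # [{'comment_id': '1', 'success': True}]
```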
Incremental Processing & Deduplication
The pipeline automatically handles incremental processing:
- SQL-level deduplication: The query excludes comments already in COMMENT_SENTIMENT_FEATURES using a LEFT JOIN
- Automatic retry: Failed comments (not stored) are automatically retried on the next run
- Append-only mode: Default behavior appends new records without overwriting
- Production-ready: Safe to run daily/weekly/monthly to process new comments
Scalable Configuration
To add or modify sentiment categories or intents:
- Edit config_files/sentiment_analysis_config.json
- Add/modify categories in the sentiment_polarity or intent sections
- Update reply_policy.requires_reply_intents if needed
- No code changes required!
Future Extensions
The modular architecture supports easy addition of:
- Topic classification agent
- Entity extraction agent
- Engagement score prediction agent
- Named entity recognition agent
Simply create a new agent inheriting from BaseAgent and add it to the workflow graph.
Troubleshooting
Issue: "Object does not exist or not authorized" on First Run
Error: Object 'SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES' does not exist or not authorized
Cause: The fetch query tries to check for already-processed comments, but the output table doesn't exist yet on first run.
Solution:
- Run the initialization script first in Snowflake: sql/init_musora_table.sql
- Then run the processing:
python main.py --overwrite --data-source musora_comments --limit 100
Issue: API Rate Limits
If hitting API rate limits, reduce the number of parallel workers or process fewer comments:
# Process fewer comments at a time
python main.py --limit 500
# Or use sequential mode
python main.py --sequential --limit 100
Issue: Memory Issues
Process in smaller batches using --limit:
python main.py --limit 500
Issue: Debugging Processing Errors
Use sequential mode to debug issues more easily:
python main.py --sequential --limit 50
This processes all comments in a single batch with clearer error messages.
Issue: Connection Timeouts
Check Snowflake credentials in .env and network connectivity.
Issue: Parallel Processing Not Working
If multiprocessing issues occur, use sequential mode:
python main.py --sequential
Performance
Expected Speedup
Parallel processing provides significant performance improvements:
- Sequential: 1x (baseline)
- 2 workers: ~1.8-1.9x faster
- 5 workers: ~4-4.5x faster
Speedup isn't perfectly linear due to:
- Snowflake connection overhead
- LLM API rate limits (shared across workers)
- I/O operations
Monitoring Performance
The processing summary includes:
- Total processing time
- Average time per comment
- Number of workers used
- Batch size calculations
- Failed batches (if any)
License
Internal use only - Musora sentiment analysis project.