# Comment Processing with Agentic Workflow
A scalable, modular system for processing comments from multiple data sources using the OpenAI API, LangChain, and LangGraph. The system performs language detection, translation, and context-aware sentiment analysis using an agentic workflow architecture.
## Data Sources Supported
- **Social Media Comments**: External platforms (Facebook, Instagram, YouTube, etc.)
- **Musora Internal Comments**: Comments from Musora internal applications
- **Extensible Architecture**: Easily add new data sources via configuration
## Features
- **Multi-Source Support**: Process comments from multiple data sources with a single codebase
- **Configuration-Driven**: Add new data sources without code changes
- **Parent Comment Context**: Automatically includes parent comment text for reply analysis
- **Modular Agent Architecture**: Extensible base classes for easy addition of new agents
- **Language Detection**: Hybrid approach using lingua library for fast English detection, with LLM fallback for non-English languages
- **Translation**: High-quality translation for non-English comments using OpenAI models
- **Context-Aware Sentiment Analysis**:
- Uses content description for context
- Includes parent comment text when analyzing replies
- Multi-label intent classification
- **LangGraph Workflow**: Flexible graph-based orchestration of agent operations
- **Snowflake Integration**: Seamless data fetching and storage with source-specific tables
- **Parallel Processing**: Multiprocessing support for high-performance batch processing
- **Dynamic Batch Sizing**: Intelligent batch size calculation based on workload and available resources
- **Independent Batch Execution**: Each batch processes and stores results independently
- **Comprehensive Logging**: Detailed logging for monitoring and debugging
- **Scalable Configuration**: Easy-to-modify sentiment categories and intents via JSON config
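The hybrid language detection described above can be sketched as follows. This is a minimal illustration, not the actual agent implementation: `fast_detect` and `llm_detect` are hypothetical callables standing in for the lingua library and the OpenAI-backed fallback, respectively.

```python
def detect_language(text, fast_detect, llm_detect):
    """Hybrid detection: try the fast local detector (e.g. lingua) first,
    and fall back to the LLM only for non-English or uncertain text."""
    code = fast_detect(text)
    if code == "en":
        # Fast path: confidently English, no LLM call needed
        return {"language_code": "en", "is_english": True, "used_llm": False}
    # Slow path: confirm the language with the LLM
    code = llm_detect(text)
    return {"language_code": code, "is_english": code == "en", "used_llm": True}
```

Keeping the detectors injectable makes the fast/slow split easy to unit-test without network calls.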
## Project Structure
```
musora-sentiment-analysis/
β”œβ”€β”€ agents/
β”‚   β”œβ”€β”€ __init__.py
β”‚   β”œβ”€β”€ base_agent.py                        # Base class for all agents
β”‚   β”œβ”€β”€ language_detection_agent.py          # Language detection agent
β”‚   β”œβ”€β”€ translation_agent.py                 # Translation agent
β”‚   └── sentiment_analysis_agent.py          # Sentiment analysis agent (parent context support)
β”œβ”€β”€ workflow/
β”‚   β”œβ”€β”€ __init__.py
β”‚   └── comment_processor.py                 # LangGraph workflow orchestrator
β”œβ”€β”€ sql/
β”‚   β”œβ”€β”€ fetch_comments.sql                   # Query for social media comments (with parent join)
β”‚   β”œβ”€β”€ fetch_musora_comments.sql            # Query for Musora internal comments (with parent join)
β”‚   β”œβ”€β”€ create_ml_features_table.sql         # Schema for social media table (with parent fields)
β”‚   β”œβ”€β”€ init_musora_table.sql                # Initialize empty Musora table (run first!)
β”‚   └── create_musora_ml_features_table.sql  # Full Musora schema with views (optional)
β”œβ”€β”€ config_files/
β”‚   β”œβ”€β”€ data_sources_config.json             # Data source configuration (NEW)
β”‚   β”œβ”€β”€ sentiment_config.json                # Configuration for agents and workflow
β”‚   └── sentiment_analysis_config.json       # Sentiment categories and intents
β”œβ”€β”€ logs/                                    # Processing logs (auto-created)
β”œβ”€β”€ LLM.py                                   # LLM utility class
β”œβ”€β”€ SnowFlakeConnection.py                   # Snowflake connection handler
β”œβ”€β”€ main.py                                  # Main execution script (multi-source support)
β”œβ”€β”€ requirements.txt                         # Python dependencies
β”œβ”€β”€ .env                                     # Environment variables (not in git)
β”œβ”€β”€ README.md                                # This file
└── CLAUDE.md                                # Detailed technical documentation
```
## Setup
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
### 2. Configure Environment Variables
Ensure your `.env` file contains the required credentials:
```env
# Snowflake
SNOWFLAKE_USER=your_user
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_ROLE=your_role
SNOWFLAKE_DATABASE=SOCIAL_MEDIA_DB
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_SCHEMA=ML_FEATURES
# OpenAI
OPENAI_API_KEY=your_openai_key
```
### 3. Create Snowflake Tables
Run the SQL scripts to create the output tables:
```bash
# Execute the SQL files in Snowflake
# For social media comments (if not already exists)
sql/create_ml_features_table.sql
# For Musora internal comments - INITIAL SETUP (First time only)
# This creates the empty table structure
sql/init_musora_table.sql
```
**Note**: Run `init_musora_table.sql` before the first Musora comments processing run. After that, you can optionally run `create_musora_ml_features_table.sql` to create the additional views if needed.
## Usage
### Basic Usage (Process All Data Sources)
Process unprocessed comments from all enabled data sources:
```bash
python main.py
```
This will:
- Process all enabled data sources (social media and Musora comments)
- Fetch only comments that haven't been processed yet
- Process them through the workflow using parallel workers (CPU count - 2, max 5)
- Each batch processes and stores to Snowflake independently
- Append new results to the existing tables (no overwrite)
### Process Specific Data Source
Process only social media comments:
```bash
python main.py --data-source social_media
```
Process only Musora internal comments:
```bash
python main.py --data-source musora_comments
```
### Process Limited Number of Comments
Limit applies per data source:
```bash
# Process 100 comments from each enabled data source
python main.py --limit 100
# Process 100 comments from only Musora source
python main.py --limit 100 --data-source musora_comments
```
### Sequential Processing (Debug Mode)
For debugging purposes, use sequential processing:
```bash
python main.py --limit 100 --sequential
```
This processes all comments in a single batch, making it easier to debug issues.
### First Run for New Data Source
For the first run of Musora comments:
1. **First**: Run the initialization SQL script in Snowflake:
```sql
-- Execute in Snowflake
sql/init_musora_table.sql
```
2. **Then**: Run the processing with overwrite flag:
```bash
python main.py --overwrite --data-source musora_comments --limit 100
```
**Why two steps?**
- The fetch query checks for already-processed comments by querying the output table
- On first run, that table doesn't exist, causing an error
- The init script creates the empty table structure first
- Then processing can run normally
**Warning**: Overwrite will replace all existing data in the output table. Only use for initial table creation or when reprocessing from scratch.
### Custom Configuration File
```bash
python main.py --config path/to/custom_config.json
```
### Command-Line Arguments
- `--limit N`: Process only N comments per data source (default: 10000)
- `--overwrite`: Overwrite existing Snowflake table (default: append mode)
- `--config PATH`: Custom configuration file path
- `--sequential`: Use sequential processing instead of parallel (for debugging)
- `--data-source SOURCE`: Process only specific data source (e.g., social_media, musora_comments)
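The flags above map naturally onto `argparse`. The following is a minimal sketch of how `main.py` might declare them; the real script's help strings and wiring may differ.

```python
import argparse

def build_parser():
    # Mirrors the command-line flags documented above
    parser = argparse.ArgumentParser(
        description="Process comments with the agentic workflow")
    parser.add_argument("--limit", type=int, default=10000,
                        help="Max comments to process per data source")
    parser.add_argument("--overwrite", action="store_true",
                        help="Overwrite the output table instead of appending")
    parser.add_argument("--config", default=None,
                        help="Custom configuration file path")
    parser.add_argument("--sequential", action="store_true",
                        help="Sequential processing instead of parallel (debugging)")
    parser.add_argument("--data-source", dest="data_source", default=None,
                        help="Process only this data source")
    return parser
```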
### Parallel Processing
The system uses multiprocessing to process comments in parallel:
**Worker Calculation**:
- Number of workers: `CPU count - 2` (max 5 workers)
- Leaves CPU cores available for system operations
- Example: 8-core system β†’ 5 workers (capped at max)
**Dynamic Batch Sizing**:
- Batch size calculated as: `total_comments / num_workers`
- Minimum batch size: 20 comments
- Maximum batch size: 1000 comments
- Workloads of 20 comments or fewer are processed as a single batch (not split)
**Independent Execution**:
- Each batch runs in a separate process
- Batches store to Snowflake immediately upon completion
- No waiting for all batches to complete
- Failed batches don't affect successful ones
**Performance**:
- Expected speedup: ~1.8-4.5x depending on number of workers
- Real-time progress reporting as batches complete
- Processing time and average per comment displayed in summary
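The worker and batch-size rules above can be expressed directly in Python. This is a sketch under the stated rules, not the project's exact implementation (function names are illustrative):

```python
import os

def num_workers(reserved=2, cap=5):
    # CPU count - 2, capped at 5, but always at least 1 worker
    return max(1, min(cap, (os.cpu_count() or 1) - reserved))

def batch_size(total_comments, workers, min_size=20, max_size=1000):
    # Small workloads (<= min_size) are kept as a single batch
    if total_comments <= min_size:
        return total_comments
    # Otherwise split evenly across workers, clamped to [min_size, max_size]
    return max(min_size, min(max_size, total_comments // workers))
```

For example, 10,000 comments across 5 workers gives 2,000 per batch, clamped down to the 1,000 maximum; 100 comments across 5 workers gives 20, the minimum.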
### Incremental Processing
The pipeline is designed for incremental processing:
- **Automatic deduplication**: SQL query excludes comments already in `COMMENT_SENTIMENT_FEATURES`
- **Append-only by default**: New results are added without overwriting existing data
- **Failed comment retry**: Comments with `success=False` are not stored and will be retried in future runs
- **Run regularly**: Safe to run daily/weekly to process new comments
## Configuration
### Data Sources Configuration
The `config_files/data_sources_config.json` file defines available data sources:
```json
{
  "data_sources": {
    "social_media": {
      "name": "Social Media Comments",
      "enabled": true,
      "sql_query_file": "sql/fetch_comments.sql",
      "output_config": {
        "table_name": "COMMENT_SENTIMENT_FEATURES",
        "database": "SOCIAL_MEDIA_DB",
        "schema": "ML_FEATURES"
      }
    },
    "musora_comments": {
      "name": "Musora Internal Comments",
      "enabled": true,
      "sql_query_file": "sql/fetch_musora_comments.sql",
      "output_config": {
        "table_name": "MUSORA_COMMENT_SENTIMENT_FEATURES",
        "database": "SOCIAL_MEDIA_DB",
        "schema": "ML_FEATURES"
      },
      "additional_fields": [
        "PERMALINK_URL",
        "THUMBNAIL_URL"
      ]
    }
  }
}
```
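Reading this file and filtering to enabled sources is a small amount of code. A minimal sketch (the actual loader in the project may differ):

```python
import json

def load_enabled_sources(path="config_files/data_sources_config.json"):
    # Return only the data sources flagged as enabled in the config
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    return {key: source for key, source in config["data_sources"].items()
            if source.get("enabled", False)}
```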
**To add a new data source**: Simply add a new entry to this config file and create the corresponding SQL query file.
### Agent Configuration
The `config_files/sentiment_config.json` file controls agent behavior:
```json
{
  "agents": {
    "language_detection": {
      "model": "gpt-5-nano",
      "temperature": 0.0,
      "max_retries": 3
    },
    "translation": {
      "model": "gpt-5-nano",
      "temperature": 0.3,
      "max_retries": 3
    },
    "sentiment_analysis": {
      "model": "gpt-5-nano",
      "temperature": 0.2,
      "max_retries": 3
    }
  },
  "workflow": {
    "description": "Batch size is calculated dynamically based on number of workers (min: 20, max: 1000)",
    "parallel_processing": {
      "enabled": true,
      "worker_calculation": "CPU count - 2, max 5 workers",
      "min_batch_size": 20,
      "max_batch_size": 1000
    }
  },
  "snowflake": {
    "output_table": "COMMENT_SENTIMENT_FEATURES",
    "database": "SOCIAL_MEDIA_DB",
    "schema": "ML_FEATURES"
  }
}
```
**Note**: Batch size is now calculated dynamically and no longer needs to be configured manually.
### Sentiment Categories Configuration
The `config_files/sentiment_analysis_config.json` file defines sentiment categories and intents (easily extensible):
```json
{
  "sentiment_polarity": {
    "categories": [
      {"value": "very_positive", "label": "Very Positive", "description": "..."},
      {"value": "positive", "label": "Positive", "description": "..."},
      {"value": "neutral", "label": "Neutral", "description": "..."},
      {"value": "negative", "label": "Negative", "description": "..."},
      {"value": "very_negative", "label": "Very Negative", "description": "..."}
    ]
  },
  "intent": {
    "categories": [
      {"value": "praise", "label": "Praise", "description": "..."},
      {"value": "question", "label": "Question", "description": "..."},
      {"value": "request", "label": "Request", "description": "..."},
      {"value": "feedback_negative", "label": "Negative Feedback", "description": "..."},
      {"value": "suggestion", "label": "Suggestion", "description": "..."},
      {"value": "humor_sarcasm", "label": "Humor/Sarcasm", "description": "..."},
      {"value": "off_topic", "label": "Off Topic", "description": "..."},
      {"value": "spam_selfpromo", "label": "Spam/Self-Promotion", "description": "..."}
    ]
  },
  "reply_policy": {
    "requires_reply_intents": ["question", "request"],
    "description": "Comments with these intents should be flagged for reply"
  },
  "intent_settings": {
    "multi_label": true,
    "description": "Intent can have multiple labels as a comment can express multiple intents"
  }
}
```
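Two pieces of the config translate into simple helper logic: deciding whether a comment should be flagged for reply, and serializing multi-label intents into the comma-separated `INTENT` column. A sketch (function names are illustrative):

```python
def requires_reply(intents, reply_policy):
    # Flag a comment for reply if any of its intents appears in the policy list
    flagged = set(reply_policy["requires_reply_intents"])
    return any(intent in flagged for intent in intents)

def intents_to_column(intents):
    # Multi-label intents are stored comma-separated in the INTENT column
    return ",".join(intents)
```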
## Adding New Agents
The system is designed for easy extensibility. To add a new agent:
### 1. Create Agent Class
```python
from agents.base_agent import BaseAgent
from typing import Dict, Any


class MyNewAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any], api_key: str):
        super().__init__("MyNewAgent", config)
        # Initialize your agent-specific components

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        # Validate input data
        return True

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implement your agent logic
        return {"success": True, "result": "..."}
```
### 2. Update Workflow
Add the agent to `workflow/comment_processor.py`:
```python
# Add to CommentState TypedDict
new_agent_result: str
# Add node
workflow.add_node("my_new_agent", self._my_new_agent_node)
# Add edges
workflow.add_edge("translation", "my_new_agent")
workflow.add_edge("my_new_agent", END)
```
### 3. Update Configuration
Add agent config to `sentiment_config.json`:
```json
{
  "agents": {
    "my_new_agent": {
      "name": "MyNewAgent",
      "model": "gpt-4o-mini",
      "temperature": 0.5,
      "max_retries": 3
    }
  }
}
```
## Output Schema
### Social Media Comments Table
Stored in `SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES`
### Musora Comments Table
Stored in `SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES`
### Common Columns (Both Tables)
| Column | Type | Description |
|--------|------|-------------|
| COMMENT_SK | NUMBER | Surrogate key from source |
| COMMENT_ID | VARCHAR | Platform comment ID |
| ORIGINAL_TEXT | VARCHAR | Original comment text |
| **PARENT_COMMENT_ID** | **VARCHAR** | **ID of parent comment if this is a reply** |
| **PARENT_COMMENT_TEXT** | **VARCHAR** | **Text of parent comment for context** |
| DETECTED_LANGUAGE | VARCHAR | Detected language name |
| LANGUAGE_CODE | VARCHAR | ISO 639-1 code |
| IS_ENGLISH | BOOLEAN | Is comment in English |
| TRANSLATED_TEXT | VARCHAR | English translation |
| TRANSLATION_PERFORMED | BOOLEAN | Was translation performed |
| SENTIMENT_POLARITY | VARCHAR | Sentiment (very_positive, positive, neutral, negative, very_negative) |
| INTENT | VARCHAR | Multi-label intents (comma-separated) |
| REQUIRES_REPLY | BOOLEAN | Does comment need a response |
| SENTIMENT_CONFIDENCE | VARCHAR | Analysis confidence (high, medium, low) |
| PROCESSING_SUCCESS | BOOLEAN | Processing status |
| PROCESSED_AT | TIMESTAMP | Processing timestamp |
### Musora-Specific Additional Columns
| Column | Type | Description |
|--------|------|-------------|
| PERMALINK_URL | VARCHAR | Web URL path of the content |
| THUMBNAIL_URL | VARCHAR | Thumbnail URL of the content |
### Available Views
**Social Media:**
- `VW_COMMENTS_REQUIRING_REPLY`: Comments that need responses (includes parent comment info)
- `VW_SENTIMENT_DISTRIBUTION`: Sentiment and intent statistics by channel (includes reply comment count)
- `VW_NON_ENGLISH_COMMENTS`: Filtered view of non-English comments
**Musora:**
- `VW_MUSORA_COMMENTS_REQUIRING_REPLY`: Musora comments needing responses
- `VW_MUSORA_SENTIMENT_DISTRIBUTION`: Musora sentiment and intent statistics
- `VW_MUSORA_NON_ENGLISH_COMMENTS`: Non-English Musora comments
## Workflow Architecture
The system uses LangGraph to create a flexible, state-based workflow:
```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Fetch Comments    β”‚
β”‚   from Snowflake    β”‚
β”‚ (Unprocessed Only)  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
           β”‚
           β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Language Detection  β”‚
β”‚       Agent         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
           β”‚
           β–Ό
      β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
      β”‚ English?β”‚
      β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜
           β”‚
     β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”
     β”‚           β”‚
    Yes          No
     β”‚           β”‚
     β”‚           β–Ό
     β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
     β”‚    β”‚ Translation  β”‚
     β”‚    β”‚    Agent     β”‚
     β”‚    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
     β”‚           β”‚
     β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
           β”‚
           β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚     Sentiment     β”‚
β”‚  Analysis Agent   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
          β”‚
          β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Store Results β”‚
β”‚ to Snowflake  β”‚
β”‚ (Append Mode) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```
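The `English?` branch in the diagram is a conditional edge. In LangGraph this is typically wired with `workflow.add_conditional_edges(...)` and a routing function over the state; a minimal sketch of such a router (node names here are illustrative, not necessarily the project's):

```python
def route_after_language_detection(state):
    # English comments skip the translation agent entirely
    return "sentiment_analysis" if state.get("is_english") else "translation"
```

The router only inspects the state dict, so it can be tested without constructing a graph.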
**Note**: The fetch step automatically excludes comments already present in `COMMENT_SENTIMENT_FEATURES`, enabling incremental processing.
## Logging
Logs are automatically created in the `logs/` directory with timestamps:
```
logs/comment_processing_20251001_143022.log
```
## Adding New Data Sources
The system is designed to make adding new data sources easy:
### Steps to Add a New Source:
1. **Update Configuration** (`config_files/data_sources_config.json`):
```json
"your_new_source": {
"name": "Your New Source Name",
"enabled": true,
"sql_query_file": "sql/fetch_your_source.sql",
"output_config": {
"table_name": "YOUR_SOURCE_SENTIMENT_FEATURES",
"database": "SOCIAL_MEDIA_DB",
"schema": "ML_FEATURES"
},
"additional_fields": ["FIELD1", "FIELD2"] // Optional
}
```
2. **Create SQL Query File** (`sql/fetch_your_source.sql`):
- Fetch comments with consistent column names
- Include self-join for parent comments if available
- Exclude already-processed comments (LEFT JOIN with output table)
3. **Create Table Initialization Script** (`sql/init_your_source_table.sql`):
- Creates empty table structure
- Base schema on `init_musora_table.sql`
- Add source-specific fields as needed
- **Run this in Snowflake FIRST before processing**
4. **Create Full Schema** (optional):
- Base schema on `create_musora_ml_features_table.sql`
- Include views and indexes
5. **Run First Time**:
```bash
# Step 1: Run init script in Snowflake
sql/init_your_source_table.sql
# Step 2: Process first batch
python main.py --overwrite --data-source your_new_source --limit 100
```
**No code changes required!**
## Best Practices
1. **Testing**: Always test with `--limit` flag first (e.g., `--limit 100`)
2. **New Data Sources**: Test new sources with `--sequential --limit 100` first
3. **Debugging**: Use `--sequential` flag for easier debugging of processing issues
4. **Incremental Processing**: Run regularly without `--overwrite` to process only new comments
5. **Monitoring**: Check logs for processing errors and batch completion
6. **Performance**: Use default parallel mode for production workloads
7. **Extensibility**: Follow the base agent pattern for consistency
8. **Error Handling**: All agents include robust error handling
9. **Failed Comments**: Review logs for failed comments - they'll be automatically retried in future runs
10. **Resource Management**: System automatically adapts to available CPU resources
11. **Parent Comments**: Ensure SQL queries include parent comment joins for best accuracy
## Sentiment Analysis Features
### Multi-Label Intent Classification
The system supports **multi-label intent classification**, meaning a single comment can have multiple intents:
- **Example**: "This is amazing! What scale are you using?" β†’ `["praise", "question"]`
- **Example**: "Love this but can you make a tutorial on it?" β†’ `["praise", "request"]`
### Context-Aware Analysis with Parent Comment Support
The sentiment analysis agent provides rich context understanding:
1. **Content Context**: Uses the `content_description` field to understand what the comment is about
2. **Parent Comment Context** (NEW): When analyzing reply comments, the system:
- Automatically detects when a comment is a reply
- Fetches the parent comment text from the database
- Includes parent comment in the LLM prompt
- Explicitly instructs the LLM that this is a reply comment
- Results in more accurate sentiment and intent classification
**Example**:
- Parent Comment: "Does anyone know how to play this riff?"
- Reply Comment: "Yes!"
- Without parent context: Might be classified as unclear/off-topic
- With parent context: Correctly classified as answering a question
This dramatically improves accuracy for:
- Short reply comments ("Yes", "Thanks!", "Agreed")
- Sarcastic replies (context crucial for understanding)
- Continuation of discussions
- Agreement/disagreement comments
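Assembling that context into the LLM prompt amounts to conditional string building. A simplified sketch of the idea (the prompt wording and field names here are illustrative, not the agent's actual template):

```python
def build_sentiment_prompt(comment_text, content_description, parent_text=None):
    # Assemble the context the LLM sees; parent text is included only for replies
    parts = [f"Content being discussed: {content_description}"]
    if parent_text:
        parts.append("This comment is a REPLY to the parent comment below. "
                     "Use the parent for context (e.g. short answers, sarcasm).")
        parts.append(f"Parent comment: {parent_text}")
    parts.append(f"Comment to analyze: {comment_text}")
    return "\n".join(parts)
```

With this structure, the reply "Yes!" arrives at the LLM alongside "Does anyone know how to play this riff?", making the answering-a-question intent recoverable.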
### Failure Handling & Reprocessing
Comments that fail sentiment analysis (missing critical fields like sentiment_polarity or intents) are:
- Marked as `success=False` in the workflow
- **NOT stored in Snowflake**
- **Automatically available for reprocessing** in future runs
This ensures only successfully processed comments are stored, while failed comments remain available for retry.
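The store-only-successes rule reduces to a filter applied before each batch write. A minimal sketch (the helper name is illustrative):

```python
def rows_to_store(results):
    # Only successfully processed comments are written to Snowflake;
    # failed ones are excluded so the next run's fetch query retries them
    return [row for row in results if row.get("success")]
```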
### Incremental Processing & Deduplication
The pipeline automatically handles incremental processing:
- **SQL-level deduplication**: Query excludes comments already in `COMMENT_SENTIMENT_FEATURES` using `LEFT JOIN`
- **Automatic retry**: Failed comments (not stored) are automatically retried on next run
- **Append-only mode**: Default behavior appends new records without overwriting
- **Production-ready**: Safe to run daily/weekly/monthly to process new comments
### Scalable Configuration
To add or modify sentiment categories or intents:
1. Edit `config_files/sentiment_analysis_config.json`
2. Add/modify categories in the `sentiment_polarity` or `intent` sections
3. Update `reply_policy.requires_reply_intents` if needed
4. No code changes required!
## Future Extensions
The modular architecture supports easy addition of:
- Topic classification agent
- Entity extraction agent
- Engagement score prediction agent
- Named entity recognition agent
Simply create a new agent inheriting from `BaseAgent` and add it to the workflow graph.
## Troubleshooting
### Issue: "Object does not exist or not authorized" on First Run
**Error**: `Object 'SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES' does not exist or not authorized`
**Cause**: The fetch query tries to check for already-processed comments, but the output table doesn't exist yet on first run.
**Solution**:
1. Run the initialization script first:
```sql
-- Execute in Snowflake
sql/init_musora_table.sql
```
2. Then run the processing:
```bash
python main.py --overwrite --data-source musora_comments --limit 100
```
### Issue: API Rate Limits
If you hit API rate limits, reduce the number of parallel workers or process fewer comments:
```bash
# Process fewer comments at a time
python main.py --limit 500
# Or use sequential mode
python main.py --sequential --limit 100
```
### Issue: Memory Issues
Process in smaller batches using `--limit`:
```bash
python main.py --limit 500
```
### Issue: Debugging Processing Errors
Use sequential mode to debug issues more easily:
```bash
python main.py --sequential --limit 50
```
This processes all comments in a single batch with clearer error messages.
### Issue: Connection Timeouts
Check Snowflake credentials in `.env` and network connectivity.
### Issue: Parallel Processing Not Working
If multiprocessing issues occur, use sequential mode:
```bash
python main.py --sequential
```
## Performance
### Expected Speedup
Parallel processing provides significant performance improvements:
- **Sequential**: 1x (baseline)
- **2 workers**: ~1.8-1.9x faster
- **5 workers**: ~4-4.5x faster
Speedup isn't perfectly linear due to:
- Snowflake connection overhead
- LLM API rate limits (shared across workers)
- I/O operations
### Monitoring Performance
The processing summary includes:
- Total processing time
- Average time per comment
- Number of workers used
- Batch size calculations
- Failed batches (if any)
## License
Internal use only - Musora sentiment analysis project.