# Comment Processing with Agentic Workflow

A scalable, modular system for processing comments from multiple data sources using the OpenAI API, LangChain, and LangGraph. The system performs language detection, translation, and context-aware sentiment analysis using an agentic workflow architecture.

## Data Sources Supported

- **Social Media Comments**: External platforms (Facebook, Instagram, YouTube, etc.)
- **Musora Internal Comments**: Comments from Musora internal applications
- **Extensible Architecture**: Easily add new data sources via configuration

## Features

- **Multi-Source Support**: Process comments from multiple data sources with a single codebase
- **Configuration-Driven**: Add new data sources without code changes
- **Parent Comment Context**: Automatically includes parent comment text for reply analysis
- **Modular Agent Architecture**: Extensible base classes for easy addition of new agents
- **Language Detection**: Hybrid approach using the lingua library for fast English detection, with an LLM fallback for non-English languages
- **Translation**: High-quality translation of non-English comments using OpenAI models
- **Context-Aware Sentiment Analysis**:
  - Uses the content description for context
  - Includes parent comment text when analyzing replies
  - Multi-label intent classification
- **LangGraph Workflow**: Flexible graph-based orchestration of agent operations
- **Snowflake Integration**: Seamless data fetching and storage with source-specific tables
- **Parallel Processing**: Multiprocessing support for high-performance batch processing
- **Dynamic Batch Sizing**: Intelligent batch size calculation based on workload and available resources
- **Independent Batch Execution**: Each batch processes and stores results independently
- **Comprehensive Logging**: Detailed logging for monitoring and debugging
- **Scalable Configuration**: Easy-to-modify sentiment categories and intents via JSON config

## Project Structure

```
musora-sentiment-analysis/
├── agents/
│   ├── __init__.py
│   ├── base_agent.py                        # Base class for all agents
│   ├── language_detection_agent.py          # Language detection agent
│   ├── translation_agent.py                 # Translation agent
│   └── sentiment_analysis_agent.py          # Sentiment analysis agent (parent context support)
├── workflow/
│   ├── __init__.py
│   └── comment_processor.py                 # LangGraph workflow orchestrator
├── sql/
│   ├── fetch_comments.sql                   # Query for social media comments (with parent join)
│   ├── fetch_musora_comments.sql            # Query for Musora internal comments (with parent join)
│   ├── create_ml_features_table.sql         # Schema for social media table (with parent fields)
│   ├── init_musora_table.sql                # Initialize empty Musora table (run first!)
│   └── create_musora_ml_features_table.sql  # Full Musora schema with views (optional)
├── config_files/
│   ├── data_sources_config.json             # Data source configuration (NEW)
│   ├── sentiment_config.json                # Configuration for agents and workflow
│   └── sentiment_analysis_config.json       # Sentiment categories and intents
├── logs/                                    # Processing logs (auto-created)
├── LLM.py                                   # LLM utility class
├── SnowFlakeConnection.py                   # Snowflake connection handler
├── main.py                                  # Main execution script (multi-source support)
├── requirements.txt                         # Python dependencies
├── .env                                     # Environment variables (not in git)
├── README.md                                # This file
└── CLAUDE.md                                # Detailed technical documentation
```

## Setup

### 1. Install Dependencies

```bash
pip install -r requirements.txt
```

### 2. Configure Environment Variables

Ensure your `.env` file contains the required credentials:

```env
# Snowflake
SNOWFLAKE_USER=your_user
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_ROLE=your_role
SNOWFLAKE_DATABASE=SOCIAL_MEDIA_DB
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_SCHEMA=ML_FEATURES

# OpenAI
OPENAI_API_KEY=your_openai_key
```

### 3. Create Snowflake Tables

Run the SQL scripts to create the output tables:

```bash
# Execute the SQL files in Snowflake

# For social media comments (if the table does not already exist)
sql/create_ml_features_table.sql

# For Musora internal comments - INITIAL SETUP (first time only)
# This creates the empty table structure
sql/init_musora_table.sql
```

**Note**: Run `init_musora_table.sql` before the first Musora comments processing run. After that, you can optionally run `create_musora_ml_features_table.sql` to create the additional views if needed.

## Usage

### Basic Usage (Process All Data Sources)

Process unprocessed comments from all enabled data sources:

```bash
python main.py
```

This will:

- Process all enabled data sources (social media and Musora comments)
- Fetch only comments that haven't been processed yet
- Process them through the workflow using parallel workers (CPU count - 2, max 5)
- Process and store each batch to Snowflake independently
- Append new results to the existing tables (no overwrite)

### Process a Specific Data Source

Process only social media comments:

```bash
python main.py --data-source social_media
```

Process only Musora internal comments:

```bash
python main.py --data-source musora_comments
```

### Process a Limited Number of Comments

The limit applies per data source:

```bash
# Process 100 comments from each enabled data source
python main.py --limit 100

# Process 100 comments from only the Musora source
python main.py --limit 100 --data-source musora_comments
```

### Sequential Processing (Debug Mode)

For debugging, use sequential processing:

```bash
python main.py --limit 100 --sequential
```

This processes all comments in a single batch, making it easier to debug issues.

### First Run for a New Data Source

For the first run of Musora comments:

1. **First**: Run the initialization SQL script in Snowflake:

   ```sql
   -- Execute in Snowflake
   sql/init_musora_table.sql
   ```

2. **Then**: Run the processing with the overwrite flag:

   ```bash
   python main.py --overwrite --data-source musora_comments --limit 100
   ```

**Why two steps?**

- The fetch query checks for already-processed comments by querying the output table
- On the first run, that table doesn't exist, causing an error
- The init script creates the empty table structure first
- Then processing can run normally

**Warning**: Overwrite replaces all existing data in the output table. Use it only for initial table creation or when reprocessing from scratch.

### Custom Configuration File

```bash
python main.py --config path/to/custom_config.json
```

### Command-Line Arguments

- `--limit N`: Process only N comments per data source (default: 10000)
- `--overwrite`: Overwrite the existing Snowflake table (default: append mode)
- `--config PATH`: Custom configuration file path
- `--sequential`: Use sequential processing instead of parallel (for debugging)
- `--data-source SOURCE`: Process only a specific data source (e.g., social_media, musora_comments)

### Parallel Processing

The system uses multiprocessing to process comments in parallel.

**Worker Calculation**:

- Number of workers: `CPU count - 2` (max 5 workers)
- Leaves CPU cores available for system operations
- Example: 8-core system → 5 workers (capped at the maximum)

**Dynamic Batch Sizing**:

- Batch size calculated as `total_comments / num_workers`
- Minimum batch size: 20 comments
- Maximum batch size: 1000 comments
- Workloads of 20 or fewer comments are not split

**Independent Execution**:

- Each batch runs in a separate process
- Batches store to Snowflake immediately upon completion
- No waiting for all batches to complete
- Failed batches don't affect successful ones

**Performance**:

- Expected speedup: ~1.8-4.5x depending on the number of workers
- Real-time progress reporting as batches complete
- Processing time and average time per comment displayed in the summary

### Incremental Processing

The pipeline is designed for incremental processing:

- **Automatic deduplication**: The SQL query excludes comments already in `COMMENT_SENTIMENT_FEATURES`
- **Append-only by default**: New results are added without overwriting existing data
- **Failed comment retry**: Comments with `success=False` are not stored and will be retried in future runs
- **Run regularly**: Safe to run daily or weekly to process new comments

## Configuration

### Data Sources Configuration

The `config_files/data_sources_config.json` file defines the available data sources:

```json
{
  "data_sources": {
    "social_media": {
      "name": "Social Media Comments",
      "enabled": true,
      "sql_query_file": "sql/fetch_comments.sql",
      "output_config": {
        "table_name": "COMMENT_SENTIMENT_FEATURES",
        "database": "SOCIAL_MEDIA_DB",
        "schema": "ML_FEATURES"
      }
    },
    "musora_comments": {
      "name": "Musora Internal Comments",
      "enabled": true,
      "sql_query_file": "sql/fetch_musora_comments.sql",
      "output_config": {
        "table_name": "MUSORA_COMMENT_SENTIMENT_FEATURES",
        "database": "SOCIAL_MEDIA_DB",
        "schema": "ML_FEATURES"
      },
      "additional_fields": ["PERMALINK_URL", "THUMBNAIL_URL"]
    }
  }
}
```

**To add a new data source**: Add a new entry to this config file and create the corresponding SQL query file.
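As a minimal sketch of how a config like this can drive source selection — the function and variable names here are illustrative, not necessarily what `main.py` uses:

```python
import json
from typing import Optional

# Hypothetical sketch: selecting enabled data sources from a config shaped
# like data_sources_config.json. Names here are ours, not main.py's.
SAMPLE_CONFIG = json.loads("""
{
  "data_sources": {
    "social_media":    {"name": "Social Media Comments",    "enabled": true},
    "musora_comments": {"name": "Musora Internal Comments", "enabled": false}
  }
}
""")

def enabled_sources(config: dict, only: Optional[str] = None) -> list:
    """Return keys of enabled sources, optionally restricted to a single
    source (mirroring the --data-source flag)."""
    keys = [k for k, cfg in config["data_sources"].items() if cfg.get("enabled")]
    if only is not None:
        keys = [k for k in keys if k == only]
    return keys

print(enabled_sources(SAMPLE_CONFIG))  # ['social_media']
```

Because source selection is pure configuration lookup, a new source only needs a config entry and a SQL file, never a code change.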
### Agent Configuration

The `config_files/sentiment_config.json` file controls agent behavior:

```json
{
  "agents": {
    "language_detection": {
      "model": "gpt-5-nano",
      "temperature": 0.0,
      "max_retries": 3
    },
    "translation": {
      "model": "gpt-5-nano",
      "temperature": 0.3,
      "max_retries": 3
    },
    "sentiment_analysis": {
      "model": "gpt-5-nano",
      "temperature": 0.2,
      "max_retries": 3
    }
  },
  "workflow": {
    "description": "Batch size is calculated dynamically based on number of workers (min: 20, max: 1000)",
    "parallel_processing": {
      "enabled": true,
      "worker_calculation": "CPU count - 2, max 5 workers",
      "min_batch_size": 20,
      "max_batch_size": 1000
    }
  },
  "snowflake": {
    "output_table": "COMMENT_SENTIMENT_FEATURES",
    "database": "SOCIAL_MEDIA_DB",
    "schema": "ML_FEATURES"
  }
}
```

**Note**: Batch size is now calculated dynamically and no longer needs to be configured manually.

### Sentiment Categories Configuration

The `config_files/sentiment_analysis_config.json` file defines sentiment categories and intents (easily extensible):

```json
{
  "sentiment_polarity": {
    "categories": [
      {"value": "very_positive", "label": "Very Positive", "description": "..."},
      {"value": "positive", "label": "Positive", "description": "..."},
      {"value": "neutral", "label": "Neutral", "description": "..."},
      {"value": "negative", "label": "Negative", "description": "..."},
      {"value": "very_negative", "label": "Very Negative", "description": "..."}
    ]
  },
  "intent": {
    "categories": [
      {"value": "praise", "label": "Praise", "description": "..."},
      {"value": "question", "label": "Question", "description": "..."},
      {"value": "request", "label": "Request", "description": "..."},
      {"value": "feedback_negative", "label": "Negative Feedback", "description": "..."},
      {"value": "suggestion", "label": "Suggestion", "description": "..."},
      {"value": "humor_sarcasm", "label": "Humor/Sarcasm", "description": "..."},
      {"value": "off_topic", "label": "Off Topic", "description": "..."},
      {"value": "spam_selfpromo", "label": "Spam/Self-Promotion", "description": "..."}
    ]
  },
  "reply_policy": {
    "requires_reply_intents": ["question", "request"],
    "description": "Comments with these intents should be flagged for reply"
  },
  "intent_settings": {
    "multi_label": true,
    "description": "Intent can have multiple labels as a comment can express multiple intents"
  }
}
```

## Adding New Agents

The system is designed for easy extensibility. To add a new agent:

### 1. Create Agent Class

```python
from typing import Any, Dict

from agents.base_agent import BaseAgent


class MyNewAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any], api_key: str):
        super().__init__("MyNewAgent", config)
        # Initialize your agent-specific components

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        # Validate input data
        return True

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implement your agent logic
        return {"success": True, "result": "..."}
```

### 2. Update Workflow

Add the agent to `workflow/comment_processor.py`:

```python
# Add to the CommentState TypedDict
new_agent_result: str

# Add the node
workflow.add_node("my_new_agent", self._my_new_agent_node)

# Add the edges
workflow.add_edge("translation", "my_new_agent")
workflow.add_edge("my_new_agent", END)
```

### 3. Update Configuration

Add the agent's config to `sentiment_config.json`:

```json
{
  "agents": {
    "my_new_agent": {
      "name": "MyNewAgent",
      "model": "gpt-4o-mini",
      "temperature": 0.5,
      "max_retries": 3
    }
  }
}
```

## Output Schema

### Social Media Comments Table

Stored in `SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES`.

### Musora Comments Table

Stored in `SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES`.

### Common Columns (Both Tables)

| Column | Type | Description |
|--------|------|-------------|
| COMMENT_SK | NUMBER | Surrogate key from source |
| COMMENT_ID | VARCHAR | Platform comment ID |
| ORIGINAL_TEXT | VARCHAR | Original comment text |
| **PARENT_COMMENT_ID** | **VARCHAR** | **ID of the parent comment if this is a reply** |
| **PARENT_COMMENT_TEXT** | **VARCHAR** | **Text of the parent comment, for context** |
| DETECTED_LANGUAGE | VARCHAR | Detected language name |
| LANGUAGE_CODE | VARCHAR | ISO 639-1 code |
| IS_ENGLISH | BOOLEAN | Whether the comment is in English |
| TRANSLATED_TEXT | VARCHAR | English translation |
| TRANSLATION_PERFORMED | BOOLEAN | Whether translation was performed |
| SENTIMENT_POLARITY | VARCHAR | Sentiment (very_positive, positive, neutral, negative, very_negative) |
| INTENT | VARCHAR | Multi-label intents (comma-separated) |
| REQUIRES_REPLY | BOOLEAN | Whether the comment needs a response |
| SENTIMENT_CONFIDENCE | VARCHAR | Analysis confidence (high, medium, low) |
| PROCESSING_SUCCESS | BOOLEAN | Processing status |
| PROCESSED_AT | TIMESTAMP | Processing timestamp |

### Musora-Specific Additional Columns

| Column | Type | Description |
|--------|------|-------------|
| PERMALINK_URL | VARCHAR | Web URL path of the content |
| THUMBNAIL_URL | VARCHAR | Thumbnail URL of the content |

### Available Views

**Social Media:**

- `VW_COMMENTS_REQUIRING_REPLY`: Comments that need responses (includes parent comment info)
- `VW_SENTIMENT_DISTRIBUTION`: Sentiment and intent statistics by channel (includes reply comment count)
- `VW_NON_ENGLISH_COMMENTS`: Filtered view of non-English comments

**Musora:**

- `VW_MUSORA_COMMENTS_REQUIRING_REPLY`: Musora comments needing responses
- `VW_MUSORA_SENTIMENT_DISTRIBUTION`: Musora sentiment and intent statistics
- `VW_MUSORA_NON_ENGLISH_COMMENTS`: Non-English Musora comments

## Workflow Architecture

The system uses LangGraph to create a flexible, state-based workflow:

```
┌─────────────────────┐
│   Fetch Comments    │
│   from Snowflake    │
│ (Unprocessed Only)  │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ Language Detection  │
│       Agent         │
└──────────┬──────────┘
           │
           ▼
      ┌────┴────┐
      │ English?│
      └────┬────┘
           │
     ┌─────┴─────┐
     │           │
     No         Yes
     │           │
     ▼           │
┌─────────────┐  │
│ Translation │  │
│    Agent    │  │
└──────┬──────┘  │
       │         │
       └────┬────┘
            │
            ▼
 ┌──────────────────┐
 │    Sentiment     │
 │  Analysis Agent  │
 └─────────┬────────┘
           │
           ▼
   ┌──────────────┐
   │Store Results │
   │to Snowflake  │
   │(Append Mode) │
   └──────────────┘
```

**Note**: The fetch step automatically excludes comments already present in `COMMENT_SENTIMENT_FEATURES`, enabling incremental processing.

## Logging

Logs are automatically created in the `logs/` directory with timestamps:

```
logs/comment_processing_20251001_143022.log
```

## Adding New Data Sources

The system is designed to make adding new data sources easy.

### Steps to Add a New Source

1. **Update the configuration** (`config_files/data_sources_config.json`):

   ```json
   "your_new_source": {
     "name": "Your New Source Name",
     "enabled": true,
     "sql_query_file": "sql/fetch_your_source.sql",
     "output_config": {
       "table_name": "YOUR_SOURCE_SENTIMENT_FEATURES",
       "database": "SOCIAL_MEDIA_DB",
       "schema": "ML_FEATURES"
     },
     "additional_fields": ["FIELD1", "FIELD2"]
   }
   ```

   (`additional_fields` is optional.)

2. **Create the SQL query file** (`sql/fetch_your_source.sql`):
   - Fetch comments with consistent column names
   - Include a self-join for parent comments if available
   - Exclude already-processed comments (LEFT JOIN with the output table)

3. **Create a table initialization script** (`sql/init_your_source_table.sql`):
   - Creates the empty table structure
   - Base the schema on `init_musora_table.sql`
   - Add source-specific fields as needed
   - **Run this in Snowflake FIRST, before processing**

4. **Create the full schema** (optional):
   - Base the schema on `create_musora_ml_features_table.sql`
   - Include views and indexes

5. **Run for the first time**:

   ```bash
   # Step 1: Run the init script in Snowflake
   sql/init_your_source_table.sql

   # Step 2: Process the first batch
   python main.py --overwrite --data-source your_new_source --limit 100
   ```

**No code changes required!**

## Best Practices

1. **Testing**: Always test with the `--limit` flag first (e.g., `--limit 100`)
2. **New Data Sources**: Test new sources with `--sequential --limit 100` first
3. **Debugging**: Use the `--sequential` flag for easier debugging of processing issues
4. **Incremental Processing**: Run regularly without `--overwrite` to process only new comments
5. **Monitoring**: Check the logs for processing errors and batch completion
6. **Performance**: Use the default parallel mode for production workloads
7. **Extensibility**: Follow the base agent pattern for consistency
8. **Error Handling**: All agents include robust error handling
9. **Failed Comments**: Review the logs for failed comments; they'll be retried automatically in future runs
10. **Resource Management**: The system automatically adapts to available CPU resources
11. **Parent Comments**: Ensure SQL queries include parent comment joins for best accuracy

## Sentiment Analysis Features

### Multi-Label Intent Classification

The system supports **multi-label intent classification**, meaning a single comment can have multiple intents:

- **Example**: "This is amazing! What scale are you using?" → `["praise", "question"]`
- **Example**: "Love this but can you make a tutorial on it?" → `["praise", "request"]`

### Context-Aware Analysis with Parent Comment Support

The sentiment analysis agent provides rich context understanding:

1. **Content Context**: Uses the `content_description` field to understand what the comment is about
2. **Parent Comment Context** (NEW): When analyzing reply comments, the system:
   - Automatically detects when a comment is a reply
   - Fetches the parent comment text from the database
   - Includes the parent comment in the LLM prompt
   - Explicitly instructs the LLM that this is a reply comment
   - Produces more accurate sentiment and intent classification

**Example**:

- Parent comment: "Does anyone know how to play this riff?"
- Reply comment: "Yes!"
- Without parent context: might be classified as unclear/off-topic
- With parent context: correctly classified as answering a question

This dramatically improves accuracy for:

- Short reply comments ("Yes", "Thanks!", "Agreed")
- Sarcastic replies (context is crucial for understanding)
- Continuations of discussions
- Agreement/disagreement comments

### Failure Handling & Reprocessing

Comments that fail sentiment analysis (missing critical fields such as sentiment_polarity or intents) are:

- Marked as `success=False` in the workflow
- **NOT stored in Snowflake**
- **Automatically available for reprocessing** in future runs

This ensures only successfully processed comments are stored, while failed comments remain available for retry.

### Incremental Processing & Deduplication

The pipeline automatically handles incremental processing:

- **SQL-level deduplication**: The query excludes comments already in `COMMENT_SENTIMENT_FEATURES` using a `LEFT JOIN`
- **Automatic retry**: Failed comments (not stored) are automatically retried on the next run
- **Append-only mode**: The default behavior appends new records without overwriting
- **Production-ready**: Safe to run daily, weekly, or monthly to process new comments

### Scalable Configuration

To add or modify sentiment categories or intents:

1. Edit `config_files/sentiment_analysis_config.json`
2. Add or modify categories in the `sentiment_polarity` or `intent` sections
3. Update `reply_policy.requires_reply_intents` if needed
4. No code changes required!

## Future Extensions

The modular architecture supports easy addition of:

- Topic classification agent
- Entity extraction agent
- Engagement score prediction agent
- Named entity recognition agent

Simply create a new agent inheriting from `BaseAgent` and add it to the workflow graph.

## Troubleshooting

### Issue: "Object does not exist or not authorized" on First Run

**Error**: `Object 'SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES' does not exist or not authorized`

**Cause**: The fetch query tries to check for already-processed comments, but the output table doesn't exist yet on the first run.

**Solution**:

1. Run the initialization script first:

   ```sql
   -- Execute in Snowflake
   sql/init_musora_table.sql
   ```

2. Then run the processing:

   ```bash
   python main.py --overwrite --data-source musora_comments --limit 100
   ```

### Issue: API Rate Limits

If you hit API rate limits, reduce the number of parallel workers or process fewer comments:

```bash
# Process fewer comments at a time
python main.py --limit 500

# Or use sequential mode
python main.py --sequential --limit 100
```

### Issue: Memory Issues

Process in smaller batches using `--limit`:

```bash
python main.py --limit 500
```

### Issue: Debugging Processing Errors

Use sequential mode to debug issues more easily:

```bash
python main.py --sequential --limit 50
```

This processes all comments in a single batch with clearer error messages.

### Issue: Connection Timeouts

Check the Snowflake credentials in `.env` and network connectivity.
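Related to the API rate-limit issue above: a client-side retry wrapper with exponential backoff can smooth over transient 429 errors instead of failing a whole batch. This is an illustrative sketch only, not part of the current codebase:

```python
import random
import time

# Illustrative sketch (not in the current codebase): wrap an LLM call with
# exponential backoff so transient rate-limit errors are retried rather
# than failing the whole batch.
def call_with_backoff(fn, max_retries: int = 3, base_delay: float = 1.0):
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries; surface the error
            # Exponential backoff with jitter: base, 2*base, 4*base, ...
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))

# Stand-in for an API call that fails twice with a 429, then succeeds
attempts = {"n": 0}

def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("429 Too Many Requests")
    return "ok"

print(call_with_backoff(flaky, base_delay=0.01))  # ok
```

In production you would typically catch only rate-limit exceptions (not bare `Exception`), and libraries such as tenacity provide this pattern off the shelf.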
### Issue: Parallel Processing Not Working

If multiprocessing issues occur, use sequential mode:

```bash
python main.py --sequential
```

## Performance

### Expected Speedup

Parallel processing provides significant performance improvements:

- **Sequential**: 1x (baseline)
- **2 workers**: ~1.8-1.9x faster
- **5 workers**: ~4-4.5x faster

Speedup isn't perfectly linear due to:

- Snowflake connection overhead
- LLM API rate limits (shared across workers)
- I/O operations

### Monitoring Performance

The processing summary includes:

- Total processing time
- Average time per comment
- Number of workers used
- Batch size calculations
- Failed batches (if any)

## License

Internal use only - Musora sentiment analysis project.
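As a reference for the figures above, the worker-count and batch-size rules described under "Parallel Processing" can be sketched as follows. The function names are ours; the actual logic lives in `main.py` and may differ in detail:

```python
# Illustrative sketch of the worker-count and batch-size rules described
# in this README (names are ours, not necessarily main.py's).

def num_workers(cpu_count: int) -> int:
    """CPU count - 2, capped at 5 workers, with a floor of 1."""
    return max(1, min(5, cpu_count - 2))

def batch_size(total_comments: int, workers: int,
               min_size: int = 20, max_size: int = 1000) -> int:
    """total / workers (rounded up), clamped to [20, 1000]; workloads of
    20 or fewer comments are kept as a single batch."""
    if total_comments <= min_size:
        return total_comments
    per_worker = -(-total_comments // workers)  # ceiling division
    return max(min_size, min(max_size, per_worker))

print(num_workers(8))         # 5 (capped at the maximum)
print(batch_size(10_000, 5))  # 1000 (hits the maximum batch size)
print(batch_size(50, 5))      # 20 (floor of 20 comments)
```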