Comment Processing with Agentic Workflow

A scalable, modular system for processing comments from multiple data sources using OpenAI API, LangChain, and LangGraph. The system performs language detection, translation, and context-aware sentiment analysis using an agentic workflow architecture.

Data Sources Supported

  • Social Media Comments: External platforms (Facebook, Instagram, YouTube, etc.)
  • Musora Internal Comments: Comments from Musora internal applications
  • Extensible Architecture: Easily add new data sources via configuration

Features

  • Multi-Source Support: Process comments from multiple data sources with a single codebase
  • Configuration-Driven: Add new data sources without code changes
  • Parent Comment Context: Automatically includes parent comment text for reply analysis
  • Modular Agent Architecture: Extensible base classes for easy addition of new agents
  • Language Detection: Hybrid approach using lingua library for fast English detection, with LLM fallback for non-English languages
  • Translation: High-quality translation for non-English comments using OpenAI models
  • Context-Aware Sentiment Analysis:
    • Uses content description for context
    • Includes parent comment text when analyzing replies
    • Multi-label intent classification
  • LangGraph Workflow: Flexible graph-based orchestration of agent operations
  • Snowflake Integration: Seamless data fetching and storage with source-specific tables
  • Parallel Processing: Multiprocessing support for high-performance batch processing
  • Dynamic Batch Sizing: Intelligent batch size calculation based on workload and available resources
  • Independent Batch Execution: Each batch processes and stores results independently
  • Comprehensive Logging: Detailed logging for monitoring and debugging
  • Scalable Configuration: Easy-to-modify sentiment categories and intents via JSON config

Project Structure

musora-sentiment-analysis/
├── agents/
│   ├── __init__.py
│   ├── base_agent.py                    # Base class for all agents
│   ├── language_detection_agent.py      # Language detection agent
│   ├── translation_agent.py             # Translation agent
│   └── sentiment_analysis_agent.py      # Sentiment analysis agent (parent context support)
├── workflow/
│   ├── __init__.py
│   └── comment_processor.py             # LangGraph workflow orchestrator
├── sql/
│   ├── fetch_comments.sql               # Query for social media comments (with parent join)
│   ├── fetch_musora_comments.sql        # Query for Musora internal comments (with parent join)
│   ├── create_ml_features_table.sql     # Schema for social media table (with parent fields)
│   ├── init_musora_table.sql            # Initialize empty Musora table (run first!)
│   └── create_musora_ml_features_table.sql  # Full Musora schema with views (optional)
├── config_files/
│   ├── data_sources_config.json         # Data source configuration (NEW)
│   ├── sentiment_config.json            # Configuration for agents and workflow
│   └── sentiment_analysis_config.json   # Sentiment categories and intents
├── logs/                                # Processing logs (auto-created)
├── LLM.py                               # LLM utility class
├── SnowFlakeConnection.py               # Snowflake connection handler
├── main.py                              # Main execution script (multi-source support)
├── requirements.txt                     # Python dependencies
├── .env                                 # Environment variables (not in git)
├── README.md                            # This file
└── CLAUDE.md                            # Detailed technical documentation

Setup

1. Install Dependencies

pip install -r requirements.txt

2. Configure Environment Variables

Ensure your .env file contains the required credentials:

# Snowflake
SNOWFLAKE_USER=your_user
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_ROLE=your_role
SNOWFLAKE_DATABASE=SOCIAL_MEDIA_DB
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_SCHEMA=ML_FEATURES

# OpenAI
OPENAI_API_KEY=your_openai_key

3. Create Snowflake Tables

Run the SQL scripts to create the output tables:

# Execute the SQL files in Snowflake
# For social media comments (if not already exists)
sql/create_ml_features_table.sql

# For Musora internal comments - INITIAL SETUP (First time only)
# This creates the empty table structure
sql/init_musora_table.sql

Note: Run init_musora_table.sql before the first processing run for Musora comments. After that, you can optionally run create_musora_ml_features_table.sql to create the additional views if needed.

Usage

Basic Usage (Process All Data Sources)

Process unprocessed comments from all enabled data sources:

python main.py

This will:

  • Process all enabled data sources (social media and Musora comments)
  • Fetch only comments that haven't been processed yet
  • Process them through the workflow using parallel workers (CPU count - 2, max 5)
  • Each batch processes and stores to Snowflake independently
  • Append new results to the existing tables (no overwrite)

Process Specific Data Source

Process only social media comments:

python main.py --data-source social_media

Process only Musora internal comments:

python main.py --data-source musora_comments

Process Limited Number of Comments

Limit applies per data source:

# Process 100 comments from each enabled data source
python main.py --limit 100

# Process 100 comments from only Musora source
python main.py --limit 100 --data-source musora_comments

Sequential Processing (Debug Mode)

For debugging purposes, use sequential processing:

python main.py --limit 100 --sequential

This processes all comments in a single batch, making it easier to debug issues.

First Run for New Data Source

For the first run of Musora comments:

  1. First: Run the initialization SQL script in Snowflake:

    -- Execute in Snowflake
    sql/init_musora_table.sql
    
  2. Then: Run the processing with overwrite flag:

    python main.py --overwrite --data-source musora_comments --limit 100
    

Why two steps?

  • The fetch query checks for already-processed comments by querying the output table
  • On first run, that table doesn't exist, causing an error
  • The init script creates the empty table structure first
  • Then processing can run normally

Warning: Overwrite will replace all existing data in the output table. Only use for initial table creation or when reprocessing from scratch.

Custom Configuration File

python main.py --config path/to/custom_config.json

Command-Line Arguments

  • --limit N: Process only N comments per data source (default: 10000)
  • --overwrite: Overwrite existing Snowflake table (default: append mode)
  • --config PATH: Custom configuration file path
  • --sequential: Use sequential processing instead of parallel (for debugging)
  • --data-source SOURCE: Process only specific data source (e.g., social_media, musora_comments)

Parallel Processing

The system uses multiprocessing to process comments in parallel:

Worker Calculation:

  • Number of workers: CPU count - 2 (max 5 workers)
  • Leaves CPU cores available for system operations
  • Example: 8-core system → 5 workers (capped at max)

Dynamic Batch Sizing:

  • Batch size calculated as: total_comments / num_workers
  • Minimum batch size: 20 comments
  • Maximum batch size: 1000 comments
  • Batches of ≤ 20 comments are not split
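The worker and batch-size rules above can be sketched as follows. This is an illustration of the stated rules, not the actual main.py implementation; the function names and the floor of 1 worker are assumptions.

```python
import os
from typing import Optional


def compute_workers(cpu_count: Optional[int] = None) -> int:
    """Workers = CPU count - 2, capped at 5 (floor of 1 is an assumption)."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return max(1, min(5, cpus - 2))


def compute_batch_size(total_comments: int, num_workers: int) -> int:
    """total / workers, clamped to [20, 1000]; <= 20 comments stay in one batch."""
    if total_comments <= 20:
        return total_comments  # too small to split
    return max(20, min(1000, total_comments // num_workers))
```

For example, an 8-core machine yields 5 workers, and 10,000 comments across 5 workers produce batches capped at 1,000.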

Independent Execution:

  • Each batch runs in a separate process
  • Batches store to Snowflake immediately upon completion
  • No waiting for all batches to complete
  • Failed batches don't affect successful ones

Performance:

  • Expected speedup: ~1.8-4.5x depending on number of workers
  • Real-time progress reporting as batches complete
  • Processing time and average per comment displayed in summary

Incremental Processing

The pipeline is designed for incremental processing:

  • Automatic deduplication: SQL query excludes comments already in COMMENT_SENTIMENT_FEATURES
  • Append-only by default: New results are added without overwriting existing data
  • Failed comment retry: Comments with success=False are not stored and will be retried in future runs
  • Run regularly: Safe to run daily/weekly to process new comments

Configuration

Data Sources Configuration

The config_files/data_sources_config.json file defines available data sources:

{
    "data_sources": {
        "social_media": {
            "name": "Social Media Comments",
            "enabled": true,
            "sql_query_file": "sql/fetch_comments.sql",
            "output_config": {
                "table_name": "COMMENT_SENTIMENT_FEATURES",
                "database": "SOCIAL_MEDIA_DB",
                "schema": "ML_FEATURES"
            }
        },
        "musora_comments": {
            "name": "Musora Internal Comments",
            "enabled": true,
            "sql_query_file": "sql/fetch_musora_comments.sql",
            "output_config": {
                "table_name": "MUSORA_COMMENT_SENTIMENT_FEATURES",
                "database": "SOCIAL_MEDIA_DB",
                "schema": "ML_FEATURES"
            },
            "additional_fields": [
                "PERMALINK_URL",
                "THUMBNAIL_URL"
            ]
        }
    }
}

To add a new data source: Simply add a new entry to this config file and create the corresponding SQL query file.
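The configuration-driven dispatch might look something like the sketch below: load the config, keep only sources with `"enabled": true`, and process each one. The helper name is hypothetical; only the config structure comes from the file shown above.

```python
def enabled_sources(config: dict) -> dict:
    """Return only the data sources flagged "enabled": true in the config."""
    return {
        key: src
        for key, src in config["data_sources"].items()
        if src.get("enabled", False)
    }

# Typical usage (path from the repo layout):
# with open("config_files/data_sources_config.json") as f:
#     sources = enabled_sources(json.load(f))
# for key, src in sources.items():
#     process_source(key, src)  # hypothetical entry point
```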

Agent Configuration

The config_files/sentiment_config.json file controls agent behavior:

{
    "agents": {
        "language_detection": {
            "model": "gpt-5-nano",
            "temperature": 0.0,
            "max_retries": 3
        },
        "translation": {
            "model": "gpt-5-nano",
            "temperature": 0.3,
            "max_retries": 3
        },
        "sentiment_analysis": {
            "model": "gpt-5-nano",
            "temperature": 0.2,
            "max_retries": 3
        }
    },
    "workflow": {
        "description": "Batch size is calculated dynamically based on number of workers (min: 20, max: 1000)",
        "parallel_processing": {
            "enabled": true,
            "worker_calculation": "CPU count - 2, max 5 workers",
            "min_batch_size": 20,
            "max_batch_size": 1000
        }
    },
    "snowflake": {
        "output_table": "COMMENT_SENTIMENT_FEATURES",
        "database": "SOCIAL_MEDIA_DB",
        "schema": "ML_FEATURES"
    }
}

Note: Batch size is now calculated dynamically and no longer needs to be configured manually.

Sentiment Categories Configuration

The config_files/sentiment_analysis_config.json file defines sentiment categories and intents (easily extensible):

{
    "sentiment_polarity": {
        "categories": [
            {"value": "very_positive", "label": "Very Positive", "description": "..."},
            {"value": "positive", "label": "Positive", "description": "..."},
            {"value": "neutral", "label": "Neutral", "description": "..."},
            {"value": "negative", "label": "Negative", "description": "..."},
            {"value": "very_negative", "label": "Very Negative", "description": "..."}
        ]
    },
    "intent": {
        "categories": [
            {"value": "praise", "label": "Praise", "description": "..."},
            {"value": "question", "label": "Question", "description": "..."},
            {"value": "request", "label": "Request", "description": "..."},
            {"value": "feedback_negative", "label": "Negative Feedback", "description": "..."},
            {"value": "suggestion", "label": "Suggestion", "description": "..."},
            {"value": "humor_sarcasm", "label": "Humor/Sarcasm", "description": "..."},
            {"value": "off_topic", "label": "Off Topic", "description": "..."},
            {"value": "spam_selfpromo", "label": "Spam/Self-Promotion", "description": "..."}
        ]
    },
    "reply_policy": {
        "requires_reply_intents": ["question", "request"],
        "description": "Comments with these intents should be flagged for reply"
    },
    "intent_settings": {
        "multi_label": true,
        "description": "Intent can have multiple labels as a comment can express multiple intents"
    }
}
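One way a config like this can feed the sentiment agent's prompt is to render each category's value, label, and description as a line of text. This is a hedged sketch; `format_categories` is an illustrative name, not part of the codebase.

```python
def format_categories(section: dict) -> str:
    """Render a config section's categories as prompt-ready lines."""
    return "\n".join(
        f"- {c['value']} ({c['label']}): {c['description']}"
        for c in section["categories"]
    )
```

Because the prompt text is generated from the config, new categories added to the JSON are picked up without code changes.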

Adding New Agents

The system is designed for easy extensibility. To add a new agent:

1. Create Agent Class

from agents.base_agent import BaseAgent
from typing import Dict, Any

class MyNewAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any], api_key: str):
        super().__init__("MyNewAgent", config)
        # Initialize your agent-specific components

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        # Validate input data
        return True

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implement your agent logic
        return {"success": True, "result": "..."}

2. Update Workflow

Add the agent to workflow/comment_processor.py:

# Add to CommentState TypedDict
new_agent_result: str

# Add node
workflow.add_node("my_new_agent", self._my_new_agent_node)

# Add edges
workflow.add_edge("translation", "my_new_agent")
workflow.add_edge("my_new_agent", END)

3. Update Configuration

Add agent config to sentiment_config.json:

{
    "agents": {
        "my_new_agent": {
            "name": "MyNewAgent",
            "model": "gpt-4o-mini",
            "temperature": 0.5,
            "max_retries": 3
        }
    }
}

Output Schema

Social Media Comments Table

Stored in SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES

Musora Comments Table

Stored in SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES

Common Columns (Both Tables)

Column                  Type       Description
----------------------  ---------  -----------------------------------------------------------
COMMENT_SK              NUMBER     Surrogate key from source
COMMENT_ID              VARCHAR    Platform comment ID
ORIGINAL_TEXT           VARCHAR    Original comment text
PARENT_COMMENT_ID       VARCHAR    ID of parent comment if this is a reply
PARENT_COMMENT_TEXT     VARCHAR    Text of parent comment for context
DETECTED_LANGUAGE       VARCHAR    Detected language name
LANGUAGE_CODE           VARCHAR    ISO 639-1 code
IS_ENGLISH              BOOLEAN    Is comment in English
TRANSLATED_TEXT         VARCHAR    English translation
TRANSLATION_PERFORMED   BOOLEAN    Was translation performed
SENTIMENT_POLARITY      VARCHAR    Sentiment (very_positive, positive, neutral, negative, very_negative)
INTENT                  VARCHAR    Multi-label intents (comma-separated)
REQUIRES_REPLY          BOOLEAN    Does comment need a response
SENTIMENT_CONFIDENCE    VARCHAR    Analysis confidence (high, medium, low)
PROCESSING_SUCCESS      BOOLEAN    Processing status
PROCESSED_AT            TIMESTAMP  Processing timestamp

Musora-Specific Additional Columns

Column          Type     Description
--------------  -------  -----------------------------
PERMALINK_URL   VARCHAR  Web URL path of the content
THUMBNAIL_URL   VARCHAR  Thumbnail URL of the content

Available Views

Social Media:

  • VW_COMMENTS_REQUIRING_REPLY: Comments that need responses (includes parent comment info)
  • VW_SENTIMENT_DISTRIBUTION: Sentiment and intent statistics by channel (includes reply comment count)
  • VW_NON_ENGLISH_COMMENTS: Filtered view of non-English comments

Musora:

  • VW_MUSORA_COMMENTS_REQUIRING_REPLY: Musora comments needing responses
  • VW_MUSORA_SENTIMENT_DISTRIBUTION: Musora sentiment and intent statistics
  • VW_MUSORA_NON_ENGLISH_COMMENTS: Non-English Musora comments

Workflow Architecture

The system uses LangGraph to create a flexible, state-based workflow:

┌─────────────────────┐
│  Fetch Comments     │
│  from Snowflake     │
│  (Unprocessed Only) │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  Language Detection │
│  Agent              │
└──────────┬──────────┘
           │
           ▼
      ┌────┴────┐
      │ English?│
      └────┬────┘
           │
     ┌─────┴─────┐
     │           │
   Yes          No
     │           │
     │           ▼
     │    ┌─────────────┐
     │    │ Translation │
     │    │ Agent       │
     │    └──────┬──────┘
     │           │
     └─────┬─────┘
           │
           ▼
    ┌──────────────────┐
    │ Sentiment        │
    │ Analysis Agent   │
    └─────────┬────────┘
              │
              ▼
       ┌──────────────┐
       │Store Results │
       │to Snowflake  │
       │(Append Mode) │
       └──────────────┘

Note: The fetch step automatically excludes comments already present in COMMENT_SENTIMENT_FEATURES, enabling incremental processing.

Logging

Logs are automatically created in the logs/ directory with timestamps:

logs/comment_processing_20251001_143022.log

Adding New Data Sources

The system is designed to make adding new data sources easy:

Steps to Add a New Source:

  1. Update Configuration (config_files/data_sources_config.json):

    "your_new_source": {
        "name": "Your New Source Name",
        "enabled": true,
        "sql_query_file": "sql/fetch_your_source.sql",
        "output_config": {
            "table_name": "YOUR_SOURCE_SENTIMENT_FEATURES",
            "database": "SOCIAL_MEDIA_DB",
            "schema": "ML_FEATURES"
        },
        "additional_fields": ["FIELD1", "FIELD2"]  // Optional
    }
    
  2. Create SQL Query File (sql/fetch_your_source.sql):

    • Fetch comments with consistent column names
    • Include self-join for parent comments if available
    • Exclude already-processed comments (LEFT JOIN with output table)
  3. Create Table Initialization Script (sql/init_your_source_table.sql):

    • Creates empty table structure
    • Base schema on init_musora_table.sql
    • Add source-specific fields as needed
    • Run this in Snowflake FIRST before processing
  4. Create Full Schema (optional):

    • Base schema on create_musora_ml_features_table.sql
    • Include views and indexes
  5. Run First Time:

    # Step 1: Run init script in Snowflake
    sql/init_your_source_table.sql
    
    # Step 2: Process first batch
    python main.py --overwrite --data-source your_new_source --limit 100
    

No code changes required!
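A new source's fetch query typically combines the three SQL rules from step 2: consistent column names, a self-join for parent comment text, and an anti-join that excludes already-processed rows. The template below is a hypothetical sketch; the table and column placeholders are not the project's actual schema.

```python
# Hypothetical fetch-query template for a new data source. Placeholders
# (source_table, output_table, limit) are illustrative, not real objects.
FETCH_TEMPLATE = """
SELECT
    c.COMMENT_SK,
    c.COMMENT_ID,
    c.ORIGINAL_TEXT,
    c.PARENT_COMMENT_ID,
    p.ORIGINAL_TEXT AS PARENT_COMMENT_TEXT
FROM {source_table} c
LEFT JOIN {source_table} p
    ON c.PARENT_COMMENT_ID = p.COMMENT_ID   -- parent context for replies
LEFT JOIN {output_table} done
    ON c.COMMENT_ID = done.COMMENT_ID       -- anti-join against processed rows
WHERE done.COMMENT_ID IS NULL
LIMIT {limit}
"""


def build_fetch_query(source_table: str, output_table: str, limit: int = 10000) -> str:
    """Fill the template for one configured data source."""
    return FETCH_TEMPLATE.format(
        source_table=source_table, output_table=output_table, limit=limit
    )
```

The `WHERE done.COMMENT_ID IS NULL` anti-join is what makes incremental runs safe: only comments absent from the output table are fetched.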

Best Practices

  1. Testing: Always test with --limit flag first (e.g., --limit 100)
  2. New Data Sources: Test new sources with --sequential --limit 100 first
  3. Debugging: Use --sequential flag for easier debugging of processing issues
  4. Incremental Processing: Run regularly without --overwrite to process only new comments
  5. Monitoring: Check logs for processing errors and batch completion
  6. Performance: Use default parallel mode for production workloads
  7. Extensibility: Follow the base agent pattern for consistency
  8. Error Handling: All agents include robust error handling
  9. Failed Comments: Review logs for failed comments - they'll be automatically retried in future runs
  10. Resource Management: System automatically adapts to available CPU resources
  11. Parent Comments: Ensure SQL queries include parent comment joins for best accuracy

Sentiment Analysis Features

Multi-Label Intent Classification

The system supports multi-label intent classification, meaning a single comment can have multiple intents:

  • Example: "This is amazing! What scale are you using?" → ["praise", "question"]
  • Example: "Love this but can you make a tutorial on it?" → ["praise", "request"]
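Combined with the reply_policy section of the config, multi-label intents drive the REQUIRES_REPLY flag. A minimal sketch, assuming intents arrive as the comma-separated INTENT string described in the output schema (function names are illustrative):

```python
# Mirrors reply_policy.requires_reply_intents in sentiment_analysis_config.json
REQUIRES_REPLY_INTENTS = {"question", "request"}


def parse_intents(raw: str) -> list:
    """Split the comma-separated INTENT value into a list of labels."""
    return [label.strip() for label in raw.split(",") if label.strip()]


def needs_reply(intents) -> bool:
    """Flag a comment for reply if any of its intents is in the reply policy."""
    return any(intent in REQUIRES_REPLY_INTENTS for intent in intents)
```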

Context-Aware Analysis with Parent Comment Support

The sentiment analysis agent provides rich context understanding:

  1. Content Context: Uses the content_description field to understand what the comment is about
  2. Parent Comment Context (NEW): When analyzing reply comments, the system:
    • Automatically detects when a comment is a reply
    • Fetches the parent comment text from the database
    • Includes parent comment in the LLM prompt
    • Explicitly instructs the LLM that this is a reply comment
    • Results in more accurate sentiment and intent classification

Example:

  • Parent Comment: "Does anyone know how to play this riff?"
  • Reply Comment: "Yes!"
  • Without parent context: Might be classified as unclear/off-topic
  • With parent context: Correctly classified as answering a question

This dramatically improves accuracy for:

  • Short reply comments ("Yes", "Thanks!", "Agreed")
  • Sarcastic replies (context crucial for understanding)
  • Continuation of discussions
  • Agreement/disagreement comments
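The parent-context mechanism can be pictured as conditional prompt assembly: the parent text is injected only when the comment is a reply. This is a hedged sketch of the idea, not the agent's actual prompt wording.

```python
from typing import Optional


def build_sentiment_prompt(comment: str, content_description: str,
                           parent_text: Optional[str] = None) -> str:
    """Assemble prompt context; include the parent comment only for replies."""
    lines = [f"Content description: {content_description}"]
    if parent_text:
        # Explicitly tell the LLM this is a reply, per the workflow above
        lines.append("This comment is a REPLY to the following parent comment:")
        lines.append(f"Parent comment: {parent_text}")
    lines.append(f"Comment to analyze: {comment}")
    return "\n".join(lines)
```

With this shape, the short reply "Yes!" from the example is analyzed alongside its parent question instead of in isolation.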

Failure Handling & Reprocessing

Comments that fail sentiment analysis (missing critical fields like sentiment_polarity or intents) are:

  • Marked as success=False in the workflow
  • NOT stored in Snowflake
  • Automatically available for reprocessing in future runs

This ensures only successfully processed comments are stored, while failed comments remain available for retry.
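The store/retry split amounts to partitioning batch results on the success flag before writing to Snowflake. A minimal sketch, assuming each result is a dict with a boolean `success` key (the function name is illustrative):

```python
def split_results(results):
    """Partition workflow results: successful rows are stored, failures retried later."""
    stored = [r for r in results if r.get("success")]
    to_retry = [r for r in results if not r.get("success")]
    return stored, to_retry
```

Because failed rows never reach the output table, the dedup query naturally picks them up again on the next run.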

Incremental Processing & Deduplication

The pipeline automatically handles incremental processing:

  • SQL-level deduplication: Query excludes comments already in COMMENT_SENTIMENT_FEATURES using LEFT JOIN
  • Automatic retry: Failed comments (not stored) are automatically retried on next run
  • Append-only mode: Default behavior appends new records without overwriting
  • Production-ready: Safe to run daily/weekly/monthly to process new comments

Scalable Configuration

To add or modify sentiment categories or intents:

  1. Edit config_files/sentiment_analysis_config.json
  2. Add/modify categories in the sentiment_polarity or intent sections
  3. Update reply_policy.requires_reply_intents if needed
  4. No code changes required!

Future Extensions

The modular architecture supports easy addition of:

  • Topic classification agent
  • Entity extraction agent
  • Engagement score prediction agent
  • Named entity recognition agent

Simply create a new agent inheriting from BaseAgent and add it to the workflow graph.

Troubleshooting

Issue: "Object does not exist or not authorized" on First Run

Error: Object 'SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES' does not exist or not authorized

Cause: The fetch query tries to check for already-processed comments, but the output table doesn't exist yet on first run.

Solution:

  1. Run the initialization script first:
    -- Execute in Snowflake
    sql/init_musora_table.sql
    
  2. Then run the processing:
    python main.py --overwrite --data-source musora_comments --limit 100
    

Issue: API Rate Limits

If hitting API rate limits, reduce the number of parallel workers or process fewer comments:

# Process fewer comments at a time
python main.py --limit 500

# Or use sequential mode
python main.py --sequential --limit 100

Issue: Memory Issues

Process in smaller batches using --limit:

python main.py --limit 500

Issue: Debugging Processing Errors

Use sequential mode to debug issues more easily:

python main.py --sequential --limit 50

This processes all comments in a single batch with clearer error messages.

Issue: Connection Timeouts

Check Snowflake credentials in .env and network connectivity.

Issue: Parallel Processing Not Working

If multiprocessing issues occur, use sequential mode:

python main.py --sequential

Performance

Expected Speedup

Parallel processing provides significant performance improvements:

  • Sequential: 1x (baseline)
  • 2 workers: ~1.8-1.9x faster
  • 5 workers: ~4-4.5x faster

Speedup isn't perfectly linear due to:

  • Snowflake connection overhead
  • LLM API rate limits (shared across workers)
  • I/O operations

Monitoring Performance

The processing summary includes:

  • Total processing time
  • Average time per comment
  • Number of workers used
  • Batch size calculations
  • Failed batches (if any)

License

Internal use only - Musora sentiment analysis project.