Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, jsonify | |
| from flask_socketio import SocketIO, emit | |
| import json | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from collections import defaultdict, deque | |
| import logging | |
| import os | |
| import random | |
# Flask application and Socket.IO server shared by every handler in this module.
app = Flask(__name__)
# Prefer a secret supplied by the deployment environment; the literal is kept
# only as a fallback so existing setups behave exactly as before.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'sentiment-dashboard-secret')
# NOTE(review): "*" accepts Socket.IO connections from any origin - confirm
# this is intended outside of demo deployments.
socketio = SocketIO(app, cors_allowed_origins="*")

# In-memory storage for dashboard data (lost on restart, shared between the
# background consumer thread and the request handlers).
sentiment_counts = {'positive': 0, 'negative': 0, 'neutral': 0}
recent_tweets = deque(maxlen=50)  # Keep last 50 tweets
# Keyed by an "HH:00" string; each value holds per-sentiment counters.
hourly_sentiment = defaultdict(lambda: {'positive': 0, 'negative': 0, 'neutral': 0})

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Check environment for mock mode: anything other than "true"
# (case-insensitive) selects the real Kafka consumer. Defaults to mock mode.
USE_MOCK = os.environ.get("USE_MOCK", "true").lower() == "true"
def kafka_consumer_thread():
    """Background worker entry point.

    Dispatches to the real Kafka consumer when ``USE_MOCK`` is false and to
    the mock tweet generator otherwise, logging which mode was selected.
    """
    if not USE_MOCK:
        logger.info("Running in KAFKA mode - connecting to real Kafka")
        real_kafka_consumer()
        return

    logger.info("Running in MOCK mode - generating demo data")
    mock_tweet_generator()
def mock_tweet_generator():
    """Generate mock tweets for demo purposes.

    Runs forever. Each iteration picks a random sentiment label and a random
    sample text, records the tweet in the module-level ``sentiment_counts``,
    ``recent_tweets`` and ``hourly_sentiment`` stores, broadcasts a
    ``sentiment_update`` Socket.IO event with the full current state, and then
    sleeps for a random 1-3 seconds. Any exception is logged (with traceback)
    and the loop resumes after a 5 second pause.
    """
    sentiments = ["positive", "neutral", "negative"]
    # Sample mock tweets for demo.
    # NOTE(review): the trailing symbols on several entries look like
    # mis-decoded emoji; they are kept verbatim here - confirm against the
    # original file's encoding.
    sample_tweets = [
        "I absolutely love this new Python framework! Amazing! πβ¨",
        "Just finished my first machine learning project! So excited! π",
        "Beautiful sunny day! Perfect for coding βοΈπ»",
        "Finally understood how Kafka works! Awesome technology π",
        "Ugh, spent 3 hours debugging this error. So frustrated π€",
        "This API documentation is terrible. Nothing works π‘",
        "Why is deployment always so painful? π",
        "Working on a new feature. Should be ready next week.",
        "Attending a tech conference tomorrow. Looking forward to it.",
        "Updated the dependencies. Everything seems fine.",
        "Django vs Flask debate continues. Both are good.",
        "Love how clean Python code can be. Beautiful language!",
        "FastAPI is becoming my go-to for REST APIs. So fast!",
        "NumPy arrays are much faster than regular lists.",
        "Jupyter notebooks are perfect for data exploration.",
    ]
    tweet_count = 0
    while True:
        try:
            # Generate a mock tweet.
            # NOTE: the label is drawn independently of the text, so a
            # sample's wording and its sentiment may not match.
            sentiment = random.choice(sentiments)
            tweet_text = random.choice(sample_tweets)

            # Read the clock once so the display timestamp and the hourly
            # bucket can never disagree across an hour boundary.
            now = datetime.now()
            tweet_data = {
                'text': tweet_text,
                'sentiment': sentiment,
                'timestamp': now.strftime('%H:%M:%S'),
                'author_id': f'user_{random.randint(1000, 9999)}',
            }

            # Update sentiment counts
            sentiment_counts[sentiment] += 1
            # Add to recent tweets (the deque drops the oldest entry itself)
            recent_tweets.append(tweet_data)
            # Update hourly data
            hour = now.strftime('%H:00')
            hourly_sentiment[hour][sentiment] += 1

            # Emit real-time update to connected clients
            socketio.emit('sentiment_update', {
                'sentiment_counts': dict(sentiment_counts),
                'recent_tweets': list(recent_tweets),
                'hourly_data': dict(hourly_sentiment),
            })

            tweet_count += 1
            logger.info("Generated mock tweet #%d with sentiment: %s", tweet_count, sentiment)

            # Random delay between tweets (1-3 seconds for demo)
            time.sleep(random.uniform(1, 3))
        except Exception as e:
            # Keep the generator alive, but record the traceback for debugging.
            logger.exception("Error in mock tweet generator: %s", e)
            time.sleep(5)
def real_kafka_consumer():
    """Real Kafka consumer for production use.

    Connects to the ``sentiment-results`` topic (with retries), then polls
    forever. Every message is expected to deserialize from JSON into a dict
    from which ``sentiment``, ``tweet_text`` and ``author_id`` are read; each
    one updates the module-level stores and triggers a ``sentiment_update``
    Socket.IO broadcast.

    Brokers are taken from the ``KAFKA_BOOTSTRAP_SERVERS`` environment
    variable (comma separated), defaulting to ``kafka:9092``.

    If ``kafka-python`` is not importable, or the consumer cannot be created,
    the function falls back to ``mock_tweet_generator()``.
    """
    try:
        from kafka import KafkaConsumer
        from kafka.errors import NoBrokersAvailable

        # Configurable broker list; an empty/blank value keeps the default.
        configured = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092')
        bootstrap_servers = [
            server.strip() for server in configured.split(',') if server.strip()
        ] or ['kafka:9092']

        def create_kafka_consumer(max_retries=10, retry_delay=5):
            """Create Kafka consumer with retry logic.

            Tries up to ``max_retries`` times, sleeping ``retry_delay``
            seconds after each failure, and raises ``Exception`` when every
            attempt has failed.
            """
            for attempt in range(max_retries):
                try:
                    consumer = KafkaConsumer(
                        'sentiment-results',
                        bootstrap_servers=bootstrap_servers,
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                        consumer_timeout_ms=1000,
                        auto_offset_reset='earliest',
                        enable_auto_commit=True,
                        group_id='dashboard-group',
                    )
                    logger.info("Successfully connected to Kafka consumer!")
                    return consumer
                except NoBrokersAvailable:
                    logger.warning(
                        f"Kafka not ready, attempt {attempt + 1}/{max_retries}. "
                        f"Retrying in {retry_delay}s..."
                    )
                except Exception as e:
                    logger.error(f"Unexpected error connecting to Kafka: {e}")
                # Only reached after a failed attempt (success returns above).
                time.sleep(retry_delay)
            raise Exception(f"Could not connect to Kafka consumer after {max_retries} attempts")

        def normalize_sentiment(raw):
            """Map a raw label onto one of the known sentiment_counts keys."""
            label = 'neutral' if raw is None else str(raw).strip().lower()
            if label not in sentiment_counts:
                # Previously this surfaced as a KeyError and the tweet was
                # dropped; count it as neutral (the missing-key default).
                logger.warning("Unknown sentiment label %r - counting as neutral", raw)
                return 'neutral'
            return label

        def shorten(text, limit=100):
            """Return text cut to `limit` characters plus '...' when longer."""
            if text is None:
                return ''
            if not isinstance(text, str):
                text = str(text)
            return text[:limit] + '...' if len(text) > limit else text

        # Wait for Kafka and Spark to be ready
        logger.info("Waiting for Kafka and Spark services to be ready...")
        time.sleep(10)

        consumer = create_kafka_consumer()
        logger.info("Connected to Kafka consumer for dashboard - waiting for processed tweets...")

        message_count = 0
        while True:
            try:
                # Poll for messages with timeout
                message_batch = consumer.poll(timeout_ms=1000)
                if message_batch:
                    logger.info(f"Received batch with {len(message_batch)} topic partitions")
                    for topic_partition, messages in message_batch.items():
                        logger.info(f"Processing {len(messages)} messages from {topic_partition}")
                        for message in messages:
                            try:
                                tweet_data = message.value
                                message_count += 1
                                logger.info(
                                    "Message %s: Received tweet data: %s",
                                    message_count, tweet_data,
                                )

                                # Update sentiment counts
                                sentiment = normalize_sentiment(
                                    tweet_data.get('sentiment', 'neutral')
                                )
                                sentiment_counts[sentiment] += 1

                                # One clock read keeps timestamp and hourly
                                # bucket consistent with each other.
                                now = datetime.now()

                                # Add to recent tweets
                                recent_tweets.append({
                                    'text': shorten(tweet_data.get('tweet_text', '')),
                                    'sentiment': sentiment,
                                    'timestamp': now.strftime('%H:%M:%S'),
                                    'author_id': tweet_data.get('author_id', 'Unknown'),
                                })

                                # Update hourly data
                                hour = now.strftime('%H:00')
                                hourly_sentiment[hour][sentiment] += 1

                                # Emit real-time update to connected clients
                                socketio.emit('sentiment_update', {
                                    'sentiment_counts': dict(sentiment_counts),
                                    'recent_tweets': list(recent_tweets),
                                    'hourly_data': dict(hourly_sentiment),
                                })
                                logger.info(
                                    "Processed tweet with sentiment: %s - Total counts: %s",
                                    sentiment, dict(sentiment_counts),
                                )
                            except Exception as e:
                                # A single malformed message must not stop
                                # the rest of the batch.
                                logger.exception(
                                    "Error processing individual tweet data: %s", e
                                )
                else:
                    if message_count == 0:
                        logger.info("No messages received yet, continuing to poll...")
                    time.sleep(1)
            except Exception as e:
                logger.exception("Error in polling loop: %s", e)
                time.sleep(5)
    except ImportError:
        logger.warning("kafka-python not available, falling back to mock mode")
        mock_tweet_generator()
    except Exception as e:
        logger.error(f"Error in real Kafka consumer: {e}")
        logger.info("Falling back to mock mode")
        mock_tweet_generator()
# Register the view with Flask; without a route decorator the function is
# never reachable over HTTP.
@app.route('/')
def dashboard():
    """Main dashboard page: render the ``dashboard.html`` template."""
    return render_template('dashboard.html')
# Register the view with Flask; without a route decorator the function is
# never reachable over HTTP.
# NOTE(review): the URL is assumed - confirm it matches the path the
# dashboard front end actually requests.
@app.route('/api/data')
def get_data():
    """API endpoint to get current dashboard data.

    Returns a JSON response holding snapshots of the sentiment counters, the
    recent tweets, the hourly breakdown, and ``total_tweets`` (the sum of all
    sentiment counters).
    """
    data = {
        'sentiment_counts': dict(sentiment_counts),
        'recent_tweets': list(recent_tweets),
        'hourly_data': dict(hourly_sentiment),
        'total_tweets': sum(sentiment_counts.values()),
    }
    logger.info(f"API request - returning data: {data}")
    return jsonify(data)
# Register as the Socket.IO "connect" handler; without this decorator the
# function is never invoked when a client connects.
@socketio.on('connect')
def handle_connect():
    """Handle client connection.

    Logs the connection and sends the connecting client a
    ``sentiment_update`` event carrying the current dashboard state, so the
    page is populated before the next broadcast arrives.
    """
    logger.info("Client connected to dashboard")
    emit('sentiment_update', {
        'sentiment_counts': dict(sentiment_counts),
        'recent_tweets': list(recent_tweets),
        'hourly_data': dict(hourly_sentiment),
    })
if __name__ == '__main__':
    # Start consumer thread (either mock or real Kafka). It is a daemon so it
    # does not keep the process alive once the server stops.
    consumer_thread = threading.Thread(target=kafka_consumer_thread, daemon=True)
    consumer_thread.start()

    # Get port from environment, defaulting to 5000 (the original note says
    # Hugging Face Spaces uses port 7860, which applies only when PORT is set).
    # Resolved before logging so the startup message reports the real port
    # instead of a hard-coded 5000.
    port = int(os.environ.get('PORT', 5000))

    mode = "MOCK" if USE_MOCK else "KAFKA"
    logger.info(f"Starting sentiment dashboard in {mode} mode on port {port}")
    if USE_MOCK:
        logger.info("Dashboard will display mock demo data")
    else:
        logger.info("Dashboard will display data once Spark processes tweets from Kafka")

    # Fix for Werkzeug warning - use allow_unsafe_werkzeug for development
    socketio.run(app, host='0.0.0.0', port=port, debug=False, allow_unsafe_werkzeug=True)