Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, jsonify | |
| from flask_socketio import SocketIO, emit | |
| from kafka import KafkaConsumer | |
| from kafka.errors import NoBrokersAvailable | |
| import json | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from collections import defaultdict, deque | |
| import logging | |
| import os | |
app = Flask(__name__)
# Prefer a secret supplied through the environment (e.g. a Space secret); the
# literal fallback only keeps local/dev runs working and should not be relied
# on in a deployed instance.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'sentiment-dashboard-secret')
# NOTE(review): cors_allowed_origins="*" accepts Socket.IO connections from any
# origin — confirm this is intended outside development.
socketio = SocketIO(app, cors_allowed_origins="*")

# In-memory storage for dashboard data. It is process-local (lost on restart),
# written by the Kafka consumer thread and read by the HTTP/Socket.IO handlers.
sentiment_counts = {'positive': 0, 'negative': 0, 'neutral': 0}
recent_tweets = deque(maxlen=50)  # Keep last 50 tweets (oldest evicted first)
# Per-hour counts keyed by an 'HH:00' string; the key carries no date.
hourly_sentiment = defaultdict(lambda: {'positive': 0, 'negative': 0, 'neutral': 0})

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_kafka_consumer(max_retries=10, retry_delay=5, bootstrap_servers=None,
                          topic='sentiment-results'):
    """Create a Kafka consumer for processed tweets, retrying until a broker answers.

    Args:
        max_retries: Number of connection attempts before giving up.
        retry_delay: Seconds to wait between failed attempts (no wait after
            the last one).
        bootstrap_servers: Broker address list handed to KafkaConsumer;
            defaults to ['kafka:9092'].
        topic: Topic to subscribe to; defaults to 'sentiment-results'.

    Returns:
        A KafkaConsumer in consumer group 'dashboard-group' whose message
        values are decoded JSON, or None for payloads that are empty or not
        valid UTF-8 JSON.

    Raises:
        RuntimeError: if every attempt fails. The last underlying error, if
            any, is chained as ``__cause__``.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['kafka:9092']

    def _decode_json(raw):
        # Decode one record value; a malformed record yields None instead of
        # raising out of consumer.poll() and breaking the polling loop.
        if raw is None:
            return None
        try:
            return json.loads(raw.decode('utf-8'))
        except ValueError as err:  # covers JSONDecodeError and UnicodeDecodeError
            logger.warning(f"Dropping undecodable Kafka message: {err}")
            return None

    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                value_deserializer=_decode_json,
                consumer_timeout_ms=1000,
                # With no committed offset for the group yet, start from the
                # oldest retained record rather than only new ones.
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                # A fixed group id lets a restarted dashboard resume from the
                # last committed offset.
                group_id='dashboard-group',
            )
        except NoBrokersAvailable as err:
            last_error = err
            logger.warning(f"Kafka not ready, attempt {attempt}/{max_retries}.")
        except Exception as err:
            last_error = err
            logger.error(f"Unexpected error connecting to Kafka: {err}")
        else:
            logger.info("Successfully connected to Kafka consumer!")
            return consumer

        # Only wait when another attempt will follow.
        if attempt < max_retries:
            logger.info(f"Retrying in {retry_delay}s...")
            time.sleep(retry_delay)

    raise RuntimeError(
        f"Could not connect to Kafka consumer after {max_retries} attempts"
    ) from last_error
def _truncate_text(text, limit=100):
    """Return *text* unchanged if it has at most *limit* chars, else its first *limit* chars plus '...'."""
    return text[:limit] + '...' if len(text) > limit else text


def _record_tweet(tweet_data):
    """Fold one processed tweet from Kafka into the in-memory dashboard state.

    Updates sentiment_counts, recent_tweets and hourly_sentiment.

    Returns:
        The normalized sentiment label that was counted, or None when the
        message was skipped. A message is skipped when the payload is not a
        JSON object, or when its label is not one of the keys of
        sentiment_counts.
    """
    if not isinstance(tweet_data, dict):
        logger.warning(f"Skipping message that is not a JSON object: {tweet_data!r}")
        return None

    # Normalize case and whitespace so e.g. 'Positive' still matches the fixed
    # keys. A missing, null or empty label is counted as neutral.
    sentiment = str(tweet_data.get('sentiment') or 'neutral').strip().lower()
    if sentiment not in sentiment_counts:
        logger.warning(f"Skipping tweet with unknown sentiment label: {sentiment!r}")
        return None

    # Read the clock once so the displayed timestamp and the hourly bucket agree.
    now = datetime.now()
    text = str(tweet_data.get('tweet_text') or '')

    sentiment_counts[sentiment] += 1
    recent_tweets.append({
        'text': _truncate_text(text),
        'sentiment': sentiment,
        'timestamp': now.strftime('%H:%M:%S'),
        'author_id': tweet_data.get('author_id', 'Unknown'),
    })
    hourly_sentiment[now.strftime('%H:00')][sentiment] += 1
    return sentiment


def kafka_consumer_thread():
    """Background worker that consumes processed tweets from Kafka.

    Behaviour:
      - Waits 10 s for Kafka/Spark to come up, then connects via
        create_kafka_consumer().
      - Polls forever. Each accepted tweet updates the in-memory state and is
        broadcast to connected Socket.IO clients as a 'sentiment_update' event.
      - Errors inside the polling loop are logged, followed by a 5 s back-off.
      - A failure to connect is logged with its traceback and ends the thread.
    """
    consumer = None
    try:
        # Wait for Kafka and Spark to be ready
        logger.info("Waiting for Kafka and Spark services to be ready...")
        time.sleep(10)
        consumer = create_kafka_consumer()
        logger.info("Connected to Kafka consumer for dashboard - waiting for processed tweets...")
        logger.info("Starting to poll for messages from sentiment-results topic...")

        message_count = 0
        while True:
            try:
                # poll(timeout_ms=1000) already waits up to 1 s for records,
                # so an empty result needs no extra sleep.
                message_batch = consumer.poll(timeout_ms=1000)
                if not message_batch:
                    if message_count == 0:
                        logger.info("No messages received yet, continuing to poll...")
                    continue

                logger.info(f"Received batch with {len(message_batch)} topic partitions")
                for topic_partition, messages in message_batch.items():
                    logger.info(f"Processing {len(messages)} messages from {topic_partition}")
                    for message in messages:
                        message_count += 1
                        # Per-record isolation: one bad record must not drop
                        # the rest of the batch.
                        try:
                            logger.debug(f"Message {message_count}: Received tweet data: {message.value}")
                            sentiment = _record_tweet(message.value)
                            if sentiment is None:
                                continue
                            # Emit real-time update to connected clients
                            socketio.emit('sentiment_update', {
                                'sentiment_counts': dict(sentiment_counts),
                                'recent_tweets': list(recent_tweets),
                                'hourly_data': dict(hourly_sentiment),
                            })
                            logger.info(f"Processed tweet with sentiment: {sentiment} - Total counts: {dict(sentiment_counts)}")
                        except Exception as e:
                            logger.error(f"Error processing individual tweet data: {e}")
            except Exception as e:
                logger.error(f"Error in polling loop: {e}")
                time.sleep(5)
    except Exception as e:
        logger.exception(f"Error in Kafka consumer thread: {e}")
    finally:
        if consumer is not None:
            consumer.close()
def dashboard():
    """Serve the HTML page that hosts the live sentiment dashboard.

    NOTE(review): no route decorator (e.g. ``@app.route('/')``) is visible in
    this copy of the file; confirm the view is actually registered with Flask.
    """
    page_template = 'dashboard.html'
    return render_template(page_template)
def get_data():
    """Return a JSON snapshot of the current dashboard state.

    Response keys:
        sentiment_counts: per-label totals.
        recent_tweets: the retained recent tweets, oldest first.
        hourly_data: per-'HH:00' label counts.
        total_tweets: sum of sentiment_counts.

    NOTE(review): no route decorator is visible in this copy of the file;
    confirm the view is registered with Flask and at which URL.
    """
    # Copy the counters once and derive the total from that copy. This keeps
    # total_tweets equal to the sum of the counts in the same response, even
    # if the consumer thread increments a counter mid-request.
    counts = dict(sentiment_counts)
    data = {
        'sentiment_counts': counts,
        'recent_tweets': list(recent_tweets),
        'hourly_data': dict(hourly_sentiment),
        'total_tweets': sum(counts.values()),
    }
    # The full payload (up to 50 tweets) is large; keep it out of INFO logs.
    logger.debug(f"API request - returning data: {data}")
    return jsonify(data)
def handle_connect():
    """Send the current dashboard state to a client right after it connects.

    This lets a freshly opened page render immediately, without waiting for
    the next tweet. NOTE(review): no ``@socketio.on('connect')`` decorator is
    visible in this copy of the file; confirm the handler is registered.
    """
    logger.info("Client connected to dashboard")
    snapshot = dict(
        sentiment_counts=dict(sentiment_counts),
        recent_tweets=list(recent_tweets),
        hourly_data=dict(hourly_sentiment),
    )
    emit('sentiment_update', snapshot)
if __name__ == '__main__':
    # Start the Kafka consumer in a background daemon thread so it does not
    # keep the process alive on shutdown.
    consumer_thread = threading.Thread(target=kafka_consumer_thread, daemon=True)
    consumer_thread.start()

    # Listening port can be overridden with the PORT environment variable.
    port = int(os.environ.get('PORT', '5000'))
    logger.info(f"Starting sentiment dashboard on port {port}")
    logger.info("Dashboard will display data once Spark processes tweets from Kafka")
    # allow_unsafe_werkzeug=True opts in to the Werkzeug development server,
    # which Flask-SocketIO otherwise rejects outside debug mode. Use a
    # production server for real deployments.
    socketio.run(app, host='0.0.0.0', port=port, debug=False, allow_unsafe_werkzeug=True)