File size: 7,071 Bytes
e18a159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
import json
import threading
import time
from datetime import datetime
from collections import defaultdict, deque
import logging
import os

app = Flask(__name__)
# Read the session-signing key from the environment so deployments do not run
# with a secret committed to source control. The literal fallback keeps local
# and dev runs behaving exactly as before.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'sentiment-dashboard-secret')
# NOTE(review): "*" accepts Socket.IO connections from any origin; tighten for production.
socketio = SocketIO(app, cors_allowed_origins="*")

# In-memory storage for dashboard data. It is written by the Kafka consumer
# thread and read by the HTTP / Socket.IO handlers. Nothing is persisted, so a
# restart resets every counter.
sentiment_counts = {'positive': 0, 'negative': 0, 'neutral': 0}
recent_tweets = deque(maxlen=50)  # Keep last 50 tweets; the oldest is evicted first
# Per-hour tallies; a fresh zeroed entry is created the first time an hour key is used.
hourly_sentiment = defaultdict(lambda: {'positive': 0, 'negative': 0, 'neutral': 0})

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_kafka_consumer(max_retries=10, retry_delay=5, topic='sentiment-results',
                          bootstrap_servers=None):
    """Create a KafkaConsumer for the sentiment results topic, retrying on failure.

    Args:
        max_retries: Maximum number of connection attempts.
        retry_delay: Seconds to wait between failed attempts (no wait after the
            final attempt).
        topic: Topic to subscribe to.
        bootstrap_servers: List of ``host:port`` strings. When omitted, the
            comma-separated ``KAFKA_BOOTSTRAP_SERVERS`` environment variable is
            used, falling back to ``['kafka:9092']``.

    Returns:
        The constructed ``KafkaConsumer``.

    Raises:
        RuntimeError: if every attempt fails. The last underlying error is
            chained as ``__cause__``.
    """
    if bootstrap_servers is None:
        env_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '')
        bootstrap_servers = [s.strip() for s in env_servers.split(',') if s.strip()] or ['kafka:9092']

    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                consumer_timeout_ms=1000,
                # Start from the oldest retained offset when the group has no committed offset.
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                group_id='dashboard-group',
            )
            logger.info("Successfully connected to Kafka consumer!")
            return consumer
        except NoBrokersAvailable as e:
            last_error = e
            logger.warning(f"Kafka not ready, attempt {attempt}/{max_retries}. Retrying in {retry_delay}s...")
        except Exception as e:
            last_error = e
            logger.error(f"Unexpected error connecting to Kafka: {e}")

        # Sleeping after the last failed attempt would only delay the error.
        if attempt < max_retries:
            time.sleep(retry_delay)

    raise RuntimeError(
        f"Could not connect to Kafka consumer after {max_retries} attempts"
    ) from last_error

def _truncate_text(text, limit=100):
    """Return *text* cut to *limit* characters with '...' appended when it was longer."""
    return text[:limit] + '...' if len(text) > limit else text


def _record_tweet(tweet_data):
    """Fold one decoded sentiment result into the in-memory dashboard state.

    Updates ``sentiment_counts``, ``recent_tweets`` and ``hourly_sentiment``.

    Returns:
        The normalized sentiment label, or ``None`` when the record was skipped
        because it is not a JSON object or its sentiment label is not recognized.
    """
    if not isinstance(tweet_data, dict):
        logger.warning(f"Skipping message that is not a JSON object: {tweet_data!r}")
        return None

    # A missing label still defaults to neutral. Case and surrounding whitespace
    # are normalized so e.g. "Positive" is counted. An unknown or non-string label
    # is skipped with a specific warning instead of surfacing as a bare KeyError.
    raw_label = tweet_data.get('sentiment', 'neutral')
    sentiment = raw_label.strip().lower() if isinstance(raw_label, str) else None
    if sentiment not in sentiment_counts:
        logger.warning(f"Skipping tweet with unrecognized sentiment: {raw_label!r}")
        return None

    # A null or non-string tweet_text must not crash len() below.
    text = str(tweet_data.get('tweet_text') or '')
    # One clock reading, so the display timestamp and the hourly bucket always agree.
    now = datetime.now()

    sentiment_counts[sentiment] += 1
    recent_tweets.append({
        'text': _truncate_text(text),
        'sentiment': sentiment,
        'timestamp': now.strftime('%H:%M:%S'),
        'author_id': tweet_data.get('author_id', 'Unknown'),
    })
    hourly_sentiment[now.strftime('%H:00')][sentiment] += 1
    return sentiment


def _dashboard_snapshot():
    """Return a shallow copy of the dashboard state in the ``sentiment_update`` payload shape."""
    return {
        'sentiment_counts': dict(sentiment_counts),
        'recent_tweets': list(recent_tweets),
        'hourly_data': dict(hourly_sentiment),
    }


def kafka_consumer_thread():
    """Consume processed tweets from Kafka and push dashboard updates to clients.

    Runs forever. Each poll batch is folded into the module-level counters, and
    then a single ``sentiment_update`` event carrying the full dashboard state is
    broadcast over Socket.IO. Errors while polling are logged and retried after a
    5-second pause. If the consumer cannot be created at all, the error is logged
    and the function returns.
    """
    try:
        logger.info("Waiting for Kafka and Spark services to be ready...")
        # Fixed head start before the first attempt; create_kafka_consumer also retries on its own.
        time.sleep(10)

        consumer = create_kafka_consumer()
        logger.info("Connected to Kafka consumer for dashboard - waiting for processed tweets...")
        logger.info("Starting to poll for messages from sentiment-results topic...")

        message_count = 0

        while True:
            try:
                # poll() already blocks for up to 1s when idle, so no extra sleep is needed.
                message_batch = consumer.poll(timeout_ms=1000)
            except Exception as e:
                logger.exception(f"Error in polling loop: {e}")
                time.sleep(5)
                continue

            if not message_batch:
                if message_count == 0:
                    logger.info("No messages received yet, continuing to poll...")
                continue

            logger.info(f"Received batch with {len(message_batch)} topic partitions")
            processed = 0

            for topic_partition, messages in message_batch.items():
                logger.info(f"Processing {len(messages)} messages from {topic_partition}")

                for message in messages:
                    message_count += 1
                    tweet_data = message.value
                    logger.info(f"Message {message_count}: Received tweet data: {tweet_data}")
                    try:
                        sentiment = _record_tweet(tweet_data)
                    except Exception as e:
                        logger.exception(f"Error processing individual tweet data: {e}")
                        continue
                    if sentiment is not None:
                        processed += 1
                        logger.info(f"Processed tweet with sentiment: {sentiment} - Total counts: {dict(sentiment_counts)}")

            if processed:
                # The payload is the complete state, so a single broadcast per batch
                # replaces re-sending the same 50 tweets once per message.
                try:
                    socketio.emit('sentiment_update', _dashboard_snapshot())
                except Exception as e:
                    logger.exception(f"Error emitting dashboard update: {e}")

    except Exception as e:
        logger.exception(f"Error in Kafka consumer thread: {e}")

@app.route('/')
def dashboard():
    """Render and return the ``dashboard.html`` template for the root URL."""
    page = render_template('dashboard.html')
    return page

@app.route('/api/data')
def get_data():
    """Return the current dashboard state as JSON.

    Response keys: ``sentiment_counts``, ``recent_tweets``, ``hourly_data`` and
    ``total_tweets`` (the sum of the returned ``sentiment_counts``).
    """
    counts = dict(sentiment_counts)
    data = {
        'sentiment_counts': counts,
        'recent_tweets': list(recent_tweets),
        'hourly_data': dict(hourly_sentiment),
        # Sum the same snapshot that is returned, so the total cannot disagree with
        # the per-label counts if the consumer thread updates them mid-request.
        'total_tweets': sum(counts.values()),
    }
    logger.info(f"API request - returning data for {data['total_tweets']} total tweets")
    # The full payload (up to 50 tweets) is only formatted when DEBUG is enabled.
    logger.debug("API request - returning data: %s", data)
    return jsonify(data)

@socketio.on('connect')
def handle_connect():
    """Send a freshly connected client a full snapshot of the dashboard state."""
    logger.info("Client connected to dashboard")
    snapshot = {}
    snapshot['sentiment_counts'] = dict(sentiment_counts)
    snapshot['recent_tweets'] = list(recent_tweets)
    snapshot['hourly_data'] = dict(hourly_sentiment)
    emit('sentiment_update', snapshot)

if __name__ == '__main__':
    # The Kafka consumer thread is started only when this module is executed
    # directly, not when it is imported by another process or server.
    # daemon=True so the thread does not keep the process alive on shutdown.
    consumer_thread = threading.Thread(target=kafka_consumer_thread, name='kafka-consumer', daemon=True)
    consumer_thread.start()

    # Listening port can be overridden with the PORT environment variable (default 5000).
    port = int(os.environ.get('PORT', '5000'))
    logger.info(f"Starting sentiment dashboard on port {port}")
    logger.info("Dashboard will display data once Spark processes tweets from Kafka")

    # allow_unsafe_werkzeug lets socketio.run serve on Werkzeug's development
    # server outside debug mode; that server is not meant for production traffic.
    socketio.run(app, host='0.0.0.0', port=port, debug=False, allow_unsafe_werkzeug=True)