File size: 10,753 Bytes
c1c559f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import json
import threading
import time
from datetime import datetime
from collections import defaultdict, deque
import logging
import os
import random

app = Flask(__name__)
# Read the Flask secret key from the environment when provided so deployments
# do not have to ship the key in source. The previous hard-coded value is kept
# as the fallback so existing setups keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'sentiment-dashboard-secret')
# NOTE(review): "*" accepts Socket.IO connections from any origin - confirm
# this is intended outside of demo deployments.
socketio = SocketIO(app, cors_allowed_origins="*")

# In-memory storage for dashboard data (shared by the background consumer
# thread and the request/socket handlers; reset on every process restart)
sentiment_counts = {'positive': 0, 'negative': 0, 'neutral': 0}
recent_tweets = deque(maxlen=50)  # Keep last 50 tweets
# Keyed by the 'HH:00' hour label written by the consumer functions
hourly_sentiment = defaultdict(lambda: {'positive': 0, 'negative': 0, 'neutral': 0})

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Check environment for mock mode: mock data is used unless USE_MOCK is set
# to something other than "true" (case-insensitive)
USE_MOCK = os.environ.get("USE_MOCK", "true").lower() == "true"

def kafka_consumer_thread():
    """Entry point of the background feed thread.

    Dispatches to the real Kafka consumer when USE_MOCK is false, otherwise
    to the mock tweet generator.
    """
    if not USE_MOCK:
        logger.info("Running in KAFKA mode - connecting to real Kafka")
        real_kafka_consumer()
        return

    logger.info("Running in MOCK mode - generating demo data")
    mock_tweet_generator()

def mock_tweet_generator():
    """Generate mock tweets for demo purposes.

    Loops forever: each iteration picks a random sample text and a random
    sentiment label (chosen independently of the text), records it in the
    shared in-memory stores, broadcasts a 'sentiment_update' event to the
    connected clients and then sleeps for 1-3 seconds. Any exception raised
    inside an iteration is logged and followed by a 5 second pause.
    """
    sentiments = ["positive", "neutral", "negative"]

    # Sample mock tweets for demo (emoji restored from mis-decoded UTF-8)
    sample_tweets = [
        "I absolutely love this new Python framework! Amazing! 🐍✨",
        "Just finished my first machine learning project! So excited! 🚀",
        "Beautiful sunny day! Perfect for coding ☕️💻",
        "Finally understood how Kafka works! Awesome technology 🎉",
        "Ugh, spent 3 hours debugging this error. So frustrated 😀",
        "This API documentation is terrible. Nothing works 😑",
        "Why is deployment always so painful? 💔",
        "Working on a new feature. Should be ready next week.",
        "Attending a tech conference tomorrow. Looking forward to it.",
        "Updated the dependencies. Everything seems fine.",
        "Django vs Flask debate continues. Both are good.",
        "Love how clean Python code can be. Beautiful language!",
        "FastAPI is becoming my go-to for REST APIs. So fast!",
        "NumPy arrays are much faster than regular lists.",
        "Jupyter notebooks are perfect for data exploration.",
    ]

    tweet_count = 0

    while True:
        try:
            # Generate a mock tweet
            sentiment = random.choice(sentiments)
            tweet_text = random.choice(sample_tweets)

            # Read the clock once so the tweet timestamp and its hourly
            # bucket can never disagree across an hour boundary.
            now = datetime.now()

            tweet_data = {
                'text': tweet_text,
                'sentiment': sentiment,
                'timestamp': now.strftime('%H:%M:%S'),
                'author_id': f'user_{random.randint(1000, 9999)}'
            }

            # Update sentiment counts
            sentiment_counts[sentiment] += 1

            # Add to recent tweets
            recent_tweets.append(tweet_data)

            # Update hourly data
            hour = now.strftime('%H:00')
            hourly_sentiment[hour][sentiment] += 1

            # Emit real-time update to connected clients
            socketio.emit('sentiment_update', {
                'sentiment_counts': dict(sentiment_counts),
                'recent_tweets': list(recent_tweets),
                'hourly_data': dict(hourly_sentiment)
            })

            tweet_count += 1
            logger.info(f"Generated mock tweet #{tweet_count} with sentiment: {sentiment}")

            # Random delay between tweets (1-3 seconds for demo)
            time.sleep(random.uniform(1, 3))

        except Exception as e:
            # Keep the demo feed alive: log, back off, and try again
            logger.error(f"Error in mock tweet generator: {e}")
            time.sleep(5)

def real_kafka_consumer():
    """Consume processed tweets from Kafka and feed them to the dashboard.

    Waits 10 seconds, connects to the 'sentiment-results' topic on
    'kafka:9092' (retrying while the connection fails) and then polls
    forever. Each valid message updates the shared in-memory stores and is
    broadcast to connected clients as a 'sentiment_update' event.

    If kafka-python cannot be imported, or the consumer cannot be created
    after all retries, the function falls back to mock_tweet_generator().
    Errors raised while polling or while handling a single message are logged
    and do not stop the loop.
    """
    try:
        # Imported lazily so the module still loads when kafka-python is absent
        from kafka import KafkaConsumer
        from kafka.errors import NoBrokersAvailable

        def create_kafka_consumer(max_retries=10, retry_delay=5):
            """Create Kafka consumer with retry logic.

            Tries up to max_retries times, sleeping retry_delay seconds after
            every failed attempt, and raises Exception when all attempts fail.
            """
            for attempt in range(max_retries):
                try:
                    consumer = KafkaConsumer(
                        'sentiment-results',
                        bootstrap_servers=['kafka:9092'],
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                        consumer_timeout_ms=1000,
                        auto_offset_reset='earliest',
                        enable_auto_commit=True,
                        group_id='dashboard-group'
                    )
                    logger.info("Successfully connected to Kafka consumer!")
                    return consumer
                except NoBrokersAvailable:
                    logger.warning(f"Kafka not ready, attempt {attempt + 1}/{max_retries}. Retrying in {retry_delay}s...")
                    time.sleep(retry_delay)
                except Exception as e:
                    logger.error(f"Unexpected error connecting to Kafka: {e}")
                    time.sleep(retry_delay)

            raise Exception(f"Could not connect to Kafka consumer after {max_retries} attempts")

        # Wait for Kafka and Spark to be ready
        logger.info("Waiting for Kafka and Spark services to be ready...")
        time.sleep(10)

        consumer = create_kafka_consumer()
        logger.info("Connected to Kafka consumer for dashboard - waiting for processed tweets...")

        message_count = 0

        while True:
            try:
                # Poll for messages with timeout
                message_batch = consumer.poll(timeout_ms=1000)

                if message_batch:
                    logger.info(f"Received batch with {len(message_batch)} topic partitions")

                    for topic_partition, messages in message_batch.items():
                        logger.info(f"Processing {len(messages)} messages from {topic_partition}")

                        for message in messages:
                            try:
                                tweet_data = message.value
                                message_count += 1
                                logger.info(f"Message {message_count}: Received tweet data: {tweet_data}")

                                # Validate the label before touching shared state
                                sentiment = tweet_data.get('sentiment', 'neutral')
                                if sentiment not in sentiment_counts:
                                    logger.warning(f"Skipping tweet with unknown sentiment label: {sentiment!r}")
                                    continue

                                # Build the complete record first, so a malformed
                                # payload cannot leave the counters updated without
                                # a matching recent-tweet entry.
                                text = tweet_data.get('tweet_text') or ''
                                if not isinstance(text, str):
                                    text = str(text)
                                if len(text) > 100:
                                    # Truncate long tweets for display
                                    text = text[:100] + '...'

                                now = datetime.now()
                                record = {
                                    'text': text,
                                    'sentiment': sentiment,
                                    'timestamp': now.strftime('%H:%M:%S'),
                                    'author_id': tweet_data.get('author_id', 'Unknown')
                                }

                                # Update sentiment counts
                                sentiment_counts[sentiment] += 1

                                # Add to recent tweets
                                recent_tweets.append(record)

                                # Update hourly data
                                hour = now.strftime('%H:00')
                                hourly_sentiment[hour][sentiment] += 1

                                # Emit real-time update to connected clients
                                socketio.emit('sentiment_update', {
                                    'sentiment_counts': dict(sentiment_counts),
                                    'recent_tweets': list(recent_tweets),
                                    'hourly_data': dict(hourly_sentiment)
                                })

                                logger.info(f"Processed tweet with sentiment: {sentiment} - Total counts: {dict(sentiment_counts)}")

                            except Exception as e:
                                # One bad message must not stop the batch
                                logger.error(f"Error processing individual tweet data: {e}")
                else:
                    if message_count == 0:
                        logger.info("No messages received yet, continuing to poll...")
                    time.sleep(1)

            except Exception as e:
                logger.error(f"Error in polling loop: {e}")
                time.sleep(5)

    except ImportError:
        logger.warning("kafka-python not available, falling back to mock mode")
        mock_tweet_generator()
    except Exception as e:
        logger.error(f"Error in real Kafka consumer: {e}")
        logger.info("Falling back to mock mode")
        mock_tweet_generator()

@app.route('/')
def dashboard():
    """Serve the dashboard HTML page at the site root."""
    page = render_template('dashboard.html')
    return page

@app.route('/api/data')
def get_data():
    """Return a JSON snapshot of the current dashboard state.

    The payload carries the per-sentiment counters, the recent tweets, the
    hourly breakdown and the total number of tweets counted so far.
    """
    payload = dict(
        sentiment_counts=dict(sentiment_counts),
        recent_tweets=list(recent_tweets),
        hourly_data=dict(hourly_sentiment),
        total_tweets=sum(sentiment_counts.values()),
    )
    logger.info(f"API request - returning data: {payload}")
    response = jsonify(payload)
    return response

@socketio.on('connect')
def handle_connect():
    """Send the current dashboard state to a client on connect."""
    logger.info("Client connected to dashboard")
    snapshot = dict(
        sentiment_counts=dict(sentiment_counts),
        recent_tweets=list(recent_tweets),
        hourly_data=dict(hourly_sentiment),
    )
    emit('sentiment_update', snapshot)

if __name__ == '__main__':
    # Start consumer thread (either mock or real Kafka). It is a daemon
    # thread, so it does not keep the process alive once the server stops.
    consumer_thread = threading.Thread(target=kafka_consumer_thread, daemon=True)
    consumer_thread.start()

    # Get port from environment (the original note says Hugging Face Spaces
    # uses port 7860); default to 5000. Resolved before logging so the startup
    # message reports the port that is actually used.
    port = int(os.environ.get('PORT', 5000))

    mode = "MOCK" if USE_MOCK else "KAFKA"
    logger.info(f"Starting sentiment dashboard in {mode} mode on port {port}")

    if USE_MOCK:
        logger.info("Dashboard will display mock demo data")
    else:
        logger.info("Dashboard will display data once Spark processes tweets from Kafka")

    # Fix for Werkzeug warning - use allow_unsafe_werkzeug for development
    socketio.run(app, host='0.0.0.0', port=port, debug=False, allow_unsafe_werkzeug=True)