File size: 3,991 Bytes
e18a159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, from_json, to_json, struct
from pyspark.sql.types import StringType, StructType, StructField, LongType, DoubleType
import time
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def simple_sentiment(text):
    """Label *text* as 'positive', 'negative' or 'neutral' by keyword lookup.

    Matching is case-insensitive and substring-based: a keyword counts as
    present if it occurs anywhere in the lower-cased text. Positive keywords
    are checked before negative ones, so text containing both is 'positive'.
    ``None`` and text without any keyword are 'neutral'.
    """
    if text is None:
        return 'neutral'

    lowered = text.lower()

    # Ordered (label, keywords) pairs; the first group with a hit wins.
    keyword_groups = (
        ('positive', ('good', 'great', 'awesome', 'happy', 'love', 'excellent', 'amazing')),
        ('negative', ('bad', 'terrible', 'awful', 'sad', 'hate', 'worst', 'horrible')),
    )
    for label, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in lowered:
                return label

    return 'neutral'

# Expose the keyword classifier to Spark as a UDF producing a string column.
sentiment_udf = udf(simple_sentiment, StringType())

# Pipeline settings, named once so the Kafka reader and writer cannot drift
# apart and the values are easy to find and change.
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
INPUT_TOPIC = "sentiment-topic"
OUTPUT_TOPIC = "sentiment-results"
DASHBOARD_CHECKPOINT_DIR = "/tmp/checkpoint-dashboard"
STARTUP_DELAY_SECONDS = 15
TRIGGER_INTERVAL = "5 seconds"

# Fixed pause before touching Spark/Kafka so dependent services can come up.
logger.info("Waiting for services to start...")
time.sleep(STARTUP_DELAY_SECONDS)

# Bound before the try so the finally clause can tell whether a session exists.
spark = None
try:
    spark = (
        SparkSession.builder
        .appName("KafkaSentimentConsumer")
        # The console sink below has no explicit checkpoint location.
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
        .config("spark.sql.adaptive.enabled", "false")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("WARN")
    logger.info("Spark session created successfully")

    # Schema applied to the JSON payload carried in each Kafka record's value.
    tweet_schema = StructType([
        StructField("id", LongType(), True),
        StructField("text", StringType(), True),
        StructField("created_at", StringType(), True),
        StructField("author_id", LongType(), True),
        StructField("timestamp", DoubleType(), True),
    ])

    logger.info("Connecting to Kafka...")

    # Read from input topic
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", INPUT_TOPIC)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
    )

    logger.info("Connected to Kafka, processing tweets...")

    # Parse the record value as JSON; rows whose parsed struct is null are
    # dropped by the filter.
    parsed_df = df.select(
        col("timestamp").alias("kafka_timestamp"),
        from_json(col("value").cast("string"), tweet_schema).alias("tweet_data"),
    ).filter(col("tweet_data").isNotNull())

    # Flatten the parsed struct and attach the sentiment label.
    result_df = parsed_df.select(
        col("tweet_data.id").alias("tweet_id"),
        col("tweet_data.text").alias("tweet_text"),
        col("tweet_data.created_at").alias("created_at"),
        col("tweet_data.author_id").alias("author_id"),
        col("kafka_timestamp"),
    ).withColumn("sentiment", sentiment_udf(col("tweet_text")))

    # First sink: print every processed row to the console.
    console_query = (
        result_df.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", False)
        .trigger(processingTime=TRIGGER_INTERVAL)
        .start()
    )

    logger.info("Console output stream started")

    # Second sink: serialise each row to a JSON string in a "value" column
    # and publish it to the dashboard topic.
    dashboard_df = result_df.select(
        to_json(struct(
            col("tweet_id"),
            col("tweet_text"),
            col("sentiment"),
            col("author_id"),
            col("created_at"),
        )).alias("value")
    )

    dashboard_query = (
        dashboard_df.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("topic", OUTPUT_TOPIC)
        .option("checkpointLocation", DASHBOARD_CHECKPOINT_DIR)
        .outputMode("append")
        .trigger(processingTime=TRIGGER_INTERVAL)
        .start()
    )

    logger.info("Dashboard output stream started")
    logger.info("Starting sentiment analysis consumer...")
    logger.info("Processing tweets and sending results to dashboard...")
    logger.info("Topics: %s (input) -> %s (output)", INPUT_TOPIC, OUTPUT_TOPIC)

    # Block until either streaming query terminates (or fails).
    spark.streams.awaitAnyTermination()

except Exception as e:
    # logger.exception keeps the traceback, which logger.error dropped.
    logger.exception("Error in consumer: %s", e)
    raise
finally:
    # Always release the session; stopping it also ends whichever query is
    # still running once the other has terminated.
    if spark is not None:
        spark.stop()