from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, from_json, to_json, struct
from pyspark.sql.types import StringType, StructType, StructField, LongType, DoubleType
import time
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def simple_sentiment(text):
    """Keyword-based sentiment classifier: returns 'positive', 'negative', or 'neutral'."""
    if text is None:
        return 'neutral'
    text = text.lower()
    if any(word in text for word in ['good', 'great', 'awesome', 'happy', 'love', 'excellent', 'amazing']):
        return 'positive'
    elif any(word in text for word in ['bad', 'terrible', 'awful', 'sad', 'hate', 'worst', 'horrible']):
        return 'negative'
    return 'neutral'


sentiment_udf = udf(simple_sentiment, StringType())

logger.info("Waiting for services to start...")
time.sleep(15)  # Give Kafka and the rest of the stack time to come up

try:
    spark = SparkSession.builder \
        .appName("KafkaSentimentConsumer") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .config("spark.sql.adaptive.enabled", "false") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    logger.info("Spark session created successfully")

    # Schema of the JSON tweet payload on the input topic
    tweet_schema = StructType([
        StructField("id", LongType(), True),
        StructField("text", StringType(), True),
        StructField("created_at", StringType(), True),
        StructField("author_id", LongType(), True),
        StructField("timestamp", DoubleType(), True)
    ])

    logger.info("Connecting to Kafka...")

    # Read from the input topic
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "sentiment-topic") \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .load()

    logger.info("Connected to Kafka, processing tweets...")

    # Parse the JSON payload; drop records that fail to parse (from_json yields null)
    parsed_df = df.select(
        col("timestamp").alias("kafka_timestamp"),
        from_json(col("value").cast("string"), tweet_schema).alias("tweet_data")
    ).filter(col("tweet_data").isNotNull())

    # Flatten the parsed struct and tag each tweet with a sentiment label
    result_df = parsed_df.select(
        col("tweet_data.id").alias("tweet_id"),
        col("tweet_data.text").alias("tweet_text"),
        col("tweet_data.created_at").alias("created_at"),
        col("tweet_data.author_id").alias("author_id"),
        col("kafka_timestamp")
    ).withColumn("sentiment", sentiment_udf(col("tweet_text")))

    # Console sink for debugging (a second sink on the same streaming DataFrame)
    console_query = result_df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime='5 seconds') \
        .start()

    logger.info("Console output stream started")

    # Serialize results back to JSON for the dashboard topic
    dashboard_df = result_df.select(
        to_json(struct(
            col("tweet_id"),
            col("tweet_text"),
            col("sentiment"),
            col("author_id"),
            col("created_at")
        )).alias("value")
    )

    dashboard_query = dashboard_df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "sentiment-results") \
        .option("checkpointLocation", "/tmp/checkpoint-dashboard") \
        .outputMode("append") \
        .trigger(processingTime='5 seconds') \
        .start()

    logger.info("Dashboard output stream started")
    logger.info("Sentiment analysis consumer running; sending results to dashboard")
    logger.info("Topics: sentiment-topic (input) -> sentiment-results (output)")

    # Block until either stream terminates
    spark.streams.awaitAnyTermination()

except Exception as e:
    logger.error(f"Error in consumer: {e}")
    raise
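
# ---------------------------------------------------------------------------
# Running this script (a sketch, not pinned by this file): the Kafka source is
# not bundled with Spark, so the spark-sql-kafka connector must be supplied at
# submit time. The Scala/Spark version coordinates and the filename below are
# assumptions; match them to your cluster.
#
#   spark-submit \
#     --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
#     consumer.py
# ---------------------------------------------------------------------------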