from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, from_json, to_json, struct
from pyspark.sql.types import StringType, StructType, StructField, LongType, DoubleType
import time
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def simple_sentiment(text):
    if text is None:
        return 'neutral'
    text = text.lower()
    if any(word in text for word in ['good', 'great', 'awesome', 'happy', 'love', 'excellent', 'amazing']):
        return 'positive'
    elif any(word in text for word in ['bad', 'terrible', 'awful', 'sad', 'hate', 'worst', 'horrible']):
        return 'negative'
    return 'neutral'


sentiment_udf = udf(simple_sentiment, StringType())
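# The keyword matcher is a stand-in for a real classifier: for example,
# simple_sentiment("I love this!") -> 'positive', simple_sentiment(None) -> 'neutral'.
# Swapping in a model would only change the function body, not the UDF wiring.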
| logger.info("Waiting for services to start...") | |
| time.sleep(15) # Wait for services | |
| try: | |
| spark = SparkSession.builder \ | |
| .appName("KafkaSentimentConsumer") \ | |
| .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \ | |
| .config("spark.sql.adaptive.enabled", "false") \ | |
| .config("spark.sql.adaptive.coalescePartitions.enabled", "false") \ | |
| .getOrCreate() | |
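    # Assumes the Kafka connector is on the classpath, e.g. launched as
    #   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 consumer.py
    # (the version and filename here are illustrative; match them to the installed
    # Spark/Scala build and this script's actual name).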
    spark.sparkContext.setLogLevel("WARN")
    logger.info("Spark session created successfully")

    tweet_schema = StructType([
        StructField("id", LongType(), True),
        StructField("text", StringType(), True),
        StructField("created_at", StringType(), True),
        StructField("author_id", LongType(), True),
        StructField("timestamp", DoubleType(), True)
    ])
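    # Illustrative payload this schema parses (the field names are the contract
    # with the producer; the sample values below are made up):
    #   {"id": 1, "text": "great demo", "created_at": "2024-01-01T00:00:00Z",
    #    "author_id": 42, "timestamp": 1704067200.0}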
| logger.info("Connecting to Kafka...") | |
| # Read from input topic | |
| df = spark.readStream \ | |
| .format("kafka") \ | |
| .option("kafka.bootstrap.servers", "kafka:9092") \ | |
| .option("subscribe", "sentiment-topic") \ | |
| .option("startingOffsets", "latest") \ | |
| .option("failOnDataLoss", "false") \ | |
| .load() | |
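    # The Kafka source exposes a fixed set of columns (key, value, topic, partition,
    # offset, timestamp, timestampType); the JSON payload arrives as binary in `value`,
    # hence the cast("string") before parsing below.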
| logger.info("Connected to Kafka, processing tweets...") | |
| # Parse and process tweets | |
| parsed_df = df.select( | |
| col("timestamp").alias("kafka_timestamp"), | |
| from_json(col("value").cast("string"), tweet_schema).alias("tweet_data") | |
| ).filter(col("tweet_data").isNotNull()) | |
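    # from_json yields null for rows that fail to parse, so this filter silently
    # drops malformed messages instead of failing the stream.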
    result_df = parsed_df.select(
        col("tweet_data.id").alias("tweet_id"),
        col("tweet_data.text").alias("tweet_text"),
        col("tweet_data.created_at").alias("created_at"),
        col("tweet_data.author_id").alias("author_id"),
        col("kafka_timestamp")
    ).withColumn("sentiment", sentiment_udf(col("tweet_text")))
    # Debug sink: print each micro-batch of scored tweets to the console
    console_query = result_df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime='5 seconds') \
        .start()
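    # No checkpointLocation is set for this query; Spark creates a temporary one,
    # which is why forceDeleteTempCheckpointLocation is enabled in the session config.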
| logger.info("Console output stream started") | |
| # Send results to dashboard topic | |
| dashboard_df = result_df.select( | |
| to_json(struct( | |
| col("tweet_id"), | |
| col("tweet_text"), | |
| col("sentiment"), | |
| col("author_id"), | |
| col("created_at") | |
| )).alias("value") | |
| ) | |
    dashboard_query = dashboard_df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "sentiment-results") \
        .option("checkpointLocation", "/tmp/checkpoint-dashboard") \
        .outputMode("append") \
        .trigger(processingTime='5 seconds') \
        .start()
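    # The Kafka sink writes whatever is in the `value` column (built with to_json
    # above) and, unlike the console sink, requires an explicit checkpointLocation.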
| logger.info("Dashboard output stream started") | |
| logger.info("Starting sentiment analysis consumer...") | |
| logger.info("Processing tweets and sending results to dashboard...") | |
| logger.info("Topics: sentiment-topic (input) -> sentiment-results (output)") | |
| # Wait for both streams | |
| spark.streams.awaitAnyTermination() | |
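    # awaitAnyTermination blocks until either query stops; if one fails, its
    # exception is rethrown here and handled below.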
except Exception as e:
    logger.error(f"Error in consumer: {e}")
    raise