# sentiment-analysis / consumer.py
# Provenance: uploaded by xtinkarpiu via huggingface_hub (revision e18a159, verified)
import logging
import re
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, from_json, to_json, struct
from pyspark.sql.types import StringType, StructType, StructField, LongType, DoubleType
# Set up module-level logging at INFO so startup progress and stream status
# are visible in the container/driver logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Keyword lists for the rule-based classifier. Positive keywords are checked
# first, so a text containing words from both lists is labelled 'positive'
# (same priority as the original if/elif chain).
_POSITIVE_WORDS = frozenset(
    ['good', 'great', 'awesome', 'happy', 'love', 'excellent', 'amazing'])
_NEGATIVE_WORDS = frozenset(
    ['bad', 'terrible', 'awful', 'sad', 'hate', 'worst', 'horrible'])

# Tokenizer: runs of letters/apostrophes, applied to lowercased text.
_WORD_RE = re.compile(r"[a-z']+")


def simple_sentiment(text):
    """Classify *text* as 'positive', 'negative', or 'neutral'.

    Rule-based keyword lookup, case-insensitive. ``None`` input is treated
    as 'neutral'. Matching is done on whole words (fix: the previous
    substring check meant e.g. "badge" was labelled negative because it
    contains "bad").

    :param text: tweet text, or None.
    :returns: one of 'positive', 'negative', 'neutral'.
    """
    if text is None:
        return 'neutral'
    words = set(_WORD_RE.findall(text.lower()))
    if words & _POSITIVE_WORDS:
        return 'positive'
    if words & _NEGATIVE_WORDS:
        return 'negative'
    return 'neutral'
# Register the Python classifier as a Spark SQL UDF returning a string label,
# so it can be applied column-wise in the streaming query below.
sentiment_udf = udf(simple_sentiment, StringType())
logger.info("Waiting for services to start...")
# Crude fixed startup delay — assumes Kafka and the Spark master become ready
# within 15s (NOTE(review): consider an actual readiness probe instead).
time.sleep(15)  # Wait for services
try:
    # Build the Spark session. Adaptive query execution is disabled (streaming
    # micro-batch plans), and leftover temp checkpoints are force-deleted so a
    # restart of this consumer does not fail on stale state.
    spark = SparkSession.builder \
        .appName("KafkaSentimentConsumer") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .config("spark.sql.adaptive.enabled", "false") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    logger.info("Spark session created successfully")

    # Schema of the JSON tweet payload expected on the input topic.
    # All fields nullable; malformed JSON parses to null and is filtered below.
    tweet_schema = StructType([
        StructField("id", LongType(), True),
        StructField("text", StringType(), True),
        StructField("created_at", StringType(), True),
        StructField("author_id", LongType(), True),
        StructField("timestamp", DoubleType(), True)
    ])

    logger.info("Connecting to Kafka...")
    # Read from input topic. startingOffsets=latest: only new tweets are
    # processed; failOnDataLoss=false tolerates topic truncation/compaction.
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "sentiment-topic") \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .load()
    logger.info("Connected to Kafka, processing tweets...")

    # Parse the Kafka value bytes as JSON; rows whose payload fails to parse
    # (from_json returns null) are dropped.
    parsed_df = df.select(
        col("timestamp").alias("kafka_timestamp"),
        from_json(col("value").cast("string"), tweet_schema).alias("tweet_data")
    ).filter(col("tweet_data").isNotNull())

    # Flatten the struct and attach the sentiment label computed by the UDF.
    result_df = parsed_df.select(
        col("tweet_data.id").alias("tweet_id"),
        col("tweet_data.text").alias("tweet_text"),
        col("tweet_data.created_at").alias("created_at"),
        col("tweet_data.author_id").alias("author_id"),
        col("kafka_timestamp")
    ).withColumn("sentiment", sentiment_udf(col("tweet_text")))

    # Sink 1: console, for debugging/visibility (uses a temp checkpoint).
    console_query = result_df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime='5 seconds') \
        .start()
    logger.info("Console output stream started")

    # Sink 2: re-serialize selected fields to JSON and publish them to the
    # dashboard topic. Kafka sink requires an explicit checkpoint location.
    dashboard_df = result_df.select(
        to_json(struct(
            col("tweet_id"),
            col("tweet_text"),
            col("sentiment"),
            col("author_id"),
            col("created_at")
        )).alias("value")
    )
    dashboard_query = dashboard_df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "sentiment-results") \
        .option("checkpointLocation", "/tmp/checkpoint-dashboard") \
        .outputMode("append") \
        .trigger(processingTime='5 seconds') \
        .start()
    logger.info("Dashboard output stream started")

    logger.info("Starting sentiment analysis consumer...")
    logger.info("Processing tweets and sending results to dashboard...")
    logger.info("Topics: sentiment-topic (input) -> sentiment-results (output)")

    # Block until either stream terminates (error or external stop).
    spark.streams.awaitAnyTermination()
except Exception as e:
    # Fix: logger.exception records the full traceback (the previous
    # logger.error(f"...") discarded it) and uses lazy %-formatting.
    # Re-raise so the process exits non-zero and the supervisor can restart it.
    logger.exception("Error in consumer: %s", e)
    raise