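"""Spark Structured Streaming consumer for a Kafka sentiment pipeline.

Reads tweet JSON from the Kafka topic `sentiment-topic`, labels each tweet
with a keyword-based sentiment UDF, echoes results to the console, and
publishes JSON results to `sentiment-results` for the dashboard.
"""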
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, from_json, to_json, struct
from pyspark.sql.types import StringType, StructType, StructField, LongType, DoubleType
import time
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def simple_sentiment(text):
    """Classify text as 'positive', 'negative', or 'neutral' by keyword matching."""
    if text is None:
        return 'neutral'
    text = text.lower()
    if any(word in text for word in ['good', 'great', 'awesome', 'happy', 'love', 'excellent', 'amazing']):
        return 'positive'
    elif any(word in text for word in ['bad', 'terrible', 'awful', 'sad', 'hate', 'worst', 'horrible']):
        return 'negative'
    return 'neutral'

# Register the classifier as a Spark SQL UDF returning a string label
sentiment_udf = udf(simple_sentiment, StringType())
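
# Quick sanity checks (illustrative, not executed):
#   simple_sentiment("I love this!")   -> 'positive'
#   simple_sentiment("worst day ever") -> 'negative'
#   simple_sentiment(None)             -> 'neutral'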
logger.info("Waiting for services to start...")
time.sleep(15)  # Wait for services
try:
    # Disable adaptive query execution for predictable micro-batch plans
    spark = SparkSession.builder \
        .appName("KafkaSentimentConsumer") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .config("spark.sql.adaptive.enabled", "false") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    logger.info("Spark session created successfully")
    # Schema of the tweet JSON produced by the upstream publisher
    tweet_schema = StructType([
        StructField("id", LongType(), True),
        StructField("text", StringType(), True),
        StructField("created_at", StringType(), True),
        StructField("author_id", LongType(), True),
        StructField("timestamp", DoubleType(), True)
    ])
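
    # A message on sentiment-topic is expected to look like this
    # (illustrative values only):
    # {"id": 1, "text": "I love this!", "created_at": "2024-01-01T00:00:00Z",
    #  "author_id": 42, "timestamp": 1704067200.0}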
logger.info("Connecting to Kafka...")
# Read from input topic
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "sentiment-topic") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load()
logger.info("Connected to Kafka, processing tweets...")
    # Parse the JSON payload and drop rows that fail to parse
    parsed_df = df.select(
        col("timestamp").alias("kafka_timestamp"),
        from_json(col("value").cast("string"), tweet_schema).alias("tweet_data")
    ).filter(col("tweet_data").isNotNull())

    # Flatten the struct and tag each tweet with a sentiment label
    result_df = parsed_df.select(
        col("tweet_data.id").alias("tweet_id"),
        col("tweet_data.text").alias("tweet_text"),
        col("tweet_data.created_at").alias("created_at"),
        col("tweet_data.author_id").alias("author_id"),
        col("kafka_timestamp")
    ).withColumn("sentiment", sentiment_udf(col("tweet_text")))
    # Echo annotated tweets to the console for debugging
    console_query = result_df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime='5 seconds') \
        .start()
    logger.info("Console output stream started")
    # Serialize results back to JSON for the dashboard topic
    dashboard_df = result_df.select(
        to_json(struct(
            col("tweet_id"),
            col("tweet_text"),
            col("sentiment"),
            col("author_id"),
            col("created_at")
        )).alias("value")
    )
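
    # Each record published to sentiment-results looks like
    # (illustrative values only):
    # {"tweet_id": 1, "tweet_text": "I love this!", "sentiment": "positive",
    #  "author_id": 42, "created_at": "2024-01-01T00:00:00Z"}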
    # Send results to the dashboard topic; unlike the console sink,
    # the Kafka sink requires an explicit checkpoint location
    dashboard_query = dashboard_df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "sentiment-results") \
        .option("checkpointLocation", "/tmp/checkpoint-dashboard") \
        .outputMode("append") \
        .trigger(processingTime='5 seconds') \
        .start()
    logger.info("Dashboard output stream started")
logger.info("Starting sentiment analysis consumer...")
logger.info("Processing tweets and sending results to dashboard...")
logger.info("Topics: sentiment-topic (input) -> sentiment-results (output)")
# Wait for both streams
spark.streams.awaitAnyTermination()
except Exception as e:
    logger.error(f"Error in consumer: {e}")
    raise
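
# Note: the Kafka source/sink requires the spark-sql-kafka connector on the
# classpath. A typical invocation (connector version and script name are
# assumptions; match the version to your Spark/Scala build):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 consumer.py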