sentimentstream-worker / backend /app /services /analysis_utils.py
GitHub Action
deploy: worker release from GitHub
8ff1b66
"""Shared analysis helpers used by both live and worker paths."""
import time
from datetime import datetime, timezone
from typing import Any
from app.core.config import settings
from app.models.schemas import (
PredictionType,
SentimentType,
TopicSentiment,
UserCountPrediction,
)
def calculate_prediction(topics: list[TopicSentiment]) -> UserCountPrediction:
"""Compute the player-count trend prediction from aggregated topics."""
topic_map = {t.topic: t for t in topics}
retention = topic_map.get("Retention")
if retention and retention.mention_count > 5:
if retention.score > settings.prediction_retention_threshold_pos:
return UserCountPrediction(
trend=PredictionType.INCREASING,
confidence=min(0.95, 0.5 + (retention.mention_count / 100)),
reasoning="PREDICTION_REASONING_RETENTION_HIGH",
)
if retention.score < settings.prediction_retention_threshold_neg:
return UserCountPrediction(
trend=PredictionType.DECREASING,
confidence=min(0.95, 0.5 + (retention.mention_count / 100)),
reasoning="PREDICTION_REASONING_RETENTION_LOW",
)
bugs = topic_map.get("Bugs")
performance = topic_map.get("Performance")
tech_score = 0.0
tech_count = 0
if bugs:
tech_score += bugs.score
tech_count += 1
if performance:
tech_score += performance.score
tech_count += 1
if tech_count > 0 and (tech_score / tech_count) < -0.3:
return UserCountPrediction(
trend=PredictionType.DECREASING,
confidence=0.75,
reasoning="PREDICTION_REASONING_TECH_ISSUES",
)
gameplay = topic_map.get("Gameplay")
fun = topic_map.get("Fun")
gameplay_score = 0.0
gameplay_count = 0
if gameplay:
gameplay_score += gameplay.score
gameplay_count += 1
if fun:
gameplay_score += fun.score
gameplay_count += 1
if gameplay_count > 0:
average_gameplay = gameplay_score / gameplay_count
if average_gameplay > 0.4:
return UserCountPrediction(
trend=PredictionType.INCREASING,
confidence=0.8,
reasoning="PREDICTION_REASONING_GAMEPLAY_HIGH",
)
if average_gameplay < -0.2:
return UserCountPrediction(
trend=PredictionType.DECREASING,
confidence=0.6,
reasoning="PREDICTION_REASONING_GAMEPLAY_LOW",
)
return UserCountPrediction(
trend=PredictionType.STABLE,
confidence=0.5,
reasoning="PREDICTION_REASONING_STABLE",
)
def aggregate_topics(
existing: list[TopicSentiment],
new_batch: list[TopicSentiment],
) -> list[TopicSentiment]:
"""Merge topic aggregates using weighted mention counts."""
topic_data: dict[str, dict[str, Any]] = {}
def better_example(
current: tuple[str, float] | None,
new: tuple[str, float] | None,
) -> tuple[str, float] | None:
if new is None:
return current
if current is None:
return new
return new if abs(new[1]) > abs(current[1]) else current
for topic in existing:
if topic.topic not in topic_data:
topic_data[topic.topic] = {"scores": [], "count": 0, "example": None}
topic_data[topic.topic]["scores"].append(topic.score * topic.mention_count)
topic_data[topic.topic]["count"] += topic.mention_count
new_example = (
(topic.example, topic.example_score)
if topic.example and topic.example_score is not None
else None
)
topic_data[topic.topic]["example"] = better_example(
topic_data[topic.topic]["example"],
new_example,
)
for topic in new_batch:
if topic.topic not in topic_data:
topic_data[topic.topic] = {"scores": [], "count": 0, "example": None}
topic_data[topic.topic]["scores"].append(topic.score * topic.mention_count)
topic_data[topic.topic]["count"] += topic.mention_count
new_example = (
(topic.example, topic.example_score)
if topic.example and topic.example_score is not None
else None
)
topic_data[topic.topic]["example"] = better_example(
topic_data[topic.topic]["example"],
new_example,
)
results: list[TopicSentiment] = []
for topic_name, data in topic_data.items():
count = data["count"]
if count == 0:
continue
average_score = sum(data["scores"]) / count
normalized_score = max(-1.0, min(1.0, average_score))
if normalized_score > settings.sentiment_positive_threshold:
sentiment = SentimentType.POSITIVE
elif normalized_score < settings.sentiment_negative_threshold:
sentiment = SentimentType.NEGATIVE
else:
sentiment = SentimentType.NEUTRAL
best_example = None
example_score = None
example_data = data["example"]
if example_data:
example_text, candidate_score = example_data
if sentiment == SentimentType.NEUTRAL or (
sentiment == SentimentType.POSITIVE and candidate_score > 0
) or (
sentiment == SentimentType.NEGATIVE and candidate_score < 0
):
best_example = example_text
example_score = candidate_score
results.append(
TopicSentiment(
topic=topic_name,
sentiment=sentiment,
score=round(normalized_score, 3),
mention_count=count,
example=best_example,
example_score=example_score,
)
)
results.sort(key=lambda item: item.mention_count, reverse=True)
return results
def scale_topics(topics: list[TopicSentiment], factor: float) -> list[TopicSentiment]:
"""Scale mention counts for the approximate recent sliding window."""
return [
topic.model_copy(update={"mention_count": max(1, int(topic.mention_count * factor))})
for topic in topics
]
def filter_topics_by_min_mentions(
topics: list[TopicSentiment],
min_mentions: int | None = None,
) -> list[TopicSentiment]:
"""Filter topics below the minimum mention threshold.
Preserves existing sort order. Only filters — does not modify score or sentiment.
Applied at the final aggregate level, never at the per-review level.
"""
threshold = min_mentions if min_mentions is not None else settings.topic_min_mentions
return [t for t in topics if t.mention_count >= threshold]
def compute_preferred_context(patch_timestamp: int | None) -> str:
"""Choose the default user-facing context tab.
Returns 'current_patch' only when a recent major patch exists; otherwise
returns 'general' so the UI defaults to the full-picture view.
"""
if patch_timestamp is None:
return "general"
patch_age_days = (time.time() - patch_timestamp) / 86400
if patch_age_days > settings.patch_context_max_age_days:
return "general"
return "current_patch"
_LEGACY_FIELD_MAP = {
"topics": "general_topics",
"historical_topics": "general_topics",
"post_update_topics": "current_patch_topics",
"post_update_reviews_count": "current_patch_reviews_count",
"post_update_highlights": "current_patch_highlights",
"previous_update_topics": "last_patch_topics",
"previous_update_reviews_count": "last_patch_reviews_count",
"last_update_timestamp": "current_patch_timestamp",
}
def normalize_legacy_results(results: dict[str, Any]) -> dict[str, Any]:
"""Map legacy persisted result fields to the current schema."""
normalized: dict[str, Any] = {}
for key, value in results.items():
new_key = _LEGACY_FIELD_MAP.get(key, key)
if key == "is_incremental":
continue
if new_key not in normalized:
normalized[new_key] = value
return normalized
def serialize_datetime(value: Any) -> str | Any:
"""Serialize datetimes in SSE payloads and persisted compatibility helpers."""
if isinstance(value, datetime):
return value.isoformat()
return value
def coerce_utc_datetime(value: Any) -> datetime | None:
"""Coerce persisted datetime values into timezone-aware UTC datetimes."""
if isinstance(value, datetime):
return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
if isinstance(value, str):
parsed = datetime.fromisoformat(value)
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
return None
def datetime_from_timestamp(timestamp: int | None) -> datetime | None:
"""Convert a unix timestamp into UTC datetime."""
if timestamp is None:
return None
return datetime.fromtimestamp(timestamp, tz=timezone.utc)