"""Shared analysis helpers used by both live and worker paths."""
import time
from datetime import datetime, timezone
from typing import Any
from app.core.config import settings
from app.models.schemas import (
PredictionType,
SentimentType,
TopicSentiment,
UserCountPrediction,
)
def calculate_prediction(topics: list[TopicSentiment]) -> UserCountPrediction:
    """Compute the player-count trend prediction from aggregated topics.

    Signals are checked in priority order: retention first, then technical
    health (Bugs/Performance), then enjoyment (Gameplay/Fun); falls back to
    a stable prediction when no signal is strong enough.

    Args:
        topics: Aggregated per-topic sentiment results.

    Returns:
        A UserCountPrediction carrying the trend, a confidence in [0, 1],
        and an i18n reasoning key.
    """
    topic_map = {t.topic: t for t in topics}

    # Retention is the strongest direct signal, but only once it has enough
    # mentions (> 5) to be meaningful.
    retention = topic_map.get("Retention")
    if retention and retention.mention_count > 5:
        # Confidence grows with mention volume, capped at 0.95.
        confidence = min(0.95, 0.5 + (retention.mention_count / 100))
        if retention.score > settings.prediction_retention_threshold_pos:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=confidence,
                reasoning="PREDICTION_REASONING_RETENTION_HIGH",
            )
        if retention.score < settings.prediction_retention_threshold_neg:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=confidence,
                reasoning="PREDICTION_REASONING_RETENTION_LOW",
            )

    # Technical problems (bugs, performance) drive players away.
    tech_average = _mean_topic_score(topic_map, ("Bugs", "Performance"))
    if tech_average is not None and tech_average < -0.3:
        return UserCountPrediction(
            trend=PredictionType.DECREASING,
            confidence=0.75,
            reasoning="PREDICTION_REASONING_TECH_ISSUES",
        )

    # Enjoyment signals (gameplay, fun) can move the trend either way.
    gameplay_average = _mean_topic_score(topic_map, ("Gameplay", "Fun"))
    if gameplay_average is not None:
        if gameplay_average > 0.4:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=0.8,
                reasoning="PREDICTION_REASONING_GAMEPLAY_HIGH",
            )
        if gameplay_average < -0.2:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=0.6,
                reasoning="PREDICTION_REASONING_GAMEPLAY_LOW",
            )

    # No strong signal either way.
    return UserCountPrediction(
        trend=PredictionType.STABLE,
        confidence=0.5,
        reasoning="PREDICTION_REASONING_STABLE",
    )


def _mean_topic_score(
    topic_map: dict[str, TopicSentiment],
    names: tuple[str, ...],
) -> float | None:
    """Average the scores of the named topics that are present, or None if none are."""
    scores = [topic_map[name].score for name in names if topic_map.get(name)]
    if not scores:
        return None
    return sum(scores) / len(scores)
def aggregate_topics(
    existing: list[TopicSentiment],
    new_batch: list[TopicSentiment],
) -> list[TopicSentiment]:
    """Merge topic aggregates using weighted mention counts.

    Scores are combined as a mention-weighted average, clamped to [-1, 1].
    The retained example is the one with the largest absolute example score,
    but it is only surfaced when its polarity agrees with the final sentiment.

    Args:
        existing: Previously aggregated topic results.
        new_batch: Newly produced topic results to fold in.

    Returns:
        Combined topics, sorted by mention count (descending).
    """
    topic_data: dict[str, dict[str, Any]] = {}

    def better_example(
        current: tuple[str, float] | None,
        new: tuple[str, float] | None,
    ) -> tuple[str, float] | None:
        """Keep whichever example has the stronger absolute score."""
        if new is None:
            return current
        if current is None:
            return new
        return new if abs(new[1]) > abs(current[1]) else current

    # Both inputs are folded in identically, so process them in a single pass
    # (previously two copy-pasted loops).
    for topic in [*existing, *new_batch]:
        data = topic_data.setdefault(
            topic.topic, {"scores": [], "count": 0, "example": None}
        )
        # Weight each score by its mention count so larger batches dominate.
        data["scores"].append(topic.score * topic.mention_count)
        data["count"] += topic.mention_count
        candidate = (
            (topic.example, topic.example_score)
            if topic.example and topic.example_score is not None
            else None
        )
        data["example"] = better_example(data["example"], candidate)

    results: list[TopicSentiment] = []
    for topic_name, data in topic_data.items():
        count = data["count"]
        if count == 0:
            # Defensive: avoid division by zero on zero-mention aggregates.
            continue
        average_score = sum(data["scores"]) / count
        normalized_score = max(-1.0, min(1.0, average_score))
        if normalized_score > settings.sentiment_positive_threshold:
            sentiment = SentimentType.POSITIVE
        elif normalized_score < settings.sentiment_negative_threshold:
            sentiment = SentimentType.NEGATIVE
        else:
            sentiment = SentimentType.NEUTRAL

        best_example = None
        example_score = None
        example_data = data["example"]
        if example_data:
            example_text, candidate_score = example_data
            # Only surface an example whose polarity matches the overall
            # sentiment; neutral accepts any example.
            if (
                sentiment == SentimentType.NEUTRAL
                or (sentiment == SentimentType.POSITIVE and candidate_score > 0)
                or (sentiment == SentimentType.NEGATIVE and candidate_score < 0)
            ):
                best_example = example_text
                example_score = candidate_score

        results.append(
            TopicSentiment(
                topic=topic_name,
                sentiment=sentiment,
                score=round(normalized_score, 3),
                mention_count=count,
                example=best_example,
                example_score=example_score,
            )
        )

    results.sort(key=lambda item: item.mention_count, reverse=True)
    return results
def scale_topics(topics: list[TopicSentiment], factor: float) -> list[TopicSentiment]:
    """Scale mention counts for the approximate recent sliding window."""
    scaled: list[TopicSentiment] = []
    for topic in topics:
        # Floor at 1 so no topic disappears just because the factor is small.
        adjusted_count = max(1, int(topic.mention_count * factor))
        scaled.append(topic.model_copy(update={"mention_count": adjusted_count}))
    return scaled
def filter_topics_by_min_mentions(
    topics: list[TopicSentiment],
    min_mentions: int | None = None,
) -> list[TopicSentiment]:
    """Filter topics below the minimum mention threshold.

    Preserves existing sort order. Only filters — does not modify score or
    sentiment. Applied at the final aggregate level, never at the per-review
    level.
    """
    if min_mentions is None:
        # Fall back to the configured default when no explicit threshold given.
        threshold = settings.topic_min_mentions
    else:
        threshold = min_mentions
    kept: list[TopicSentiment] = []
    for topic in topics:
        if topic.mention_count >= threshold:
            kept.append(topic)
    return kept
def compute_preferred_context(patch_timestamp: int | None) -> str:
"""Choose the default user-facing context tab.
Returns 'current_patch' only when a recent major patch exists; otherwise
returns 'general' so the UI defaults to the full-picture view.
"""
if patch_timestamp is None:
return "general"
patch_age_days = (time.time() - patch_timestamp) / 86400
if patch_age_days > settings.patch_context_max_age_days:
return "general"
return "current_patch"
# Maps legacy persisted field names to their current schema names. Several
# legacy keys may collapse onto one current key; during normalization the
# first value seen wins.
_LEGACY_FIELD_MAP = {
    "topics": "general_topics",
    "historical_topics": "general_topics",
    "post_update_topics": "current_patch_topics",
    "post_update_reviews_count": "current_patch_reviews_count",
    "post_update_highlights": "current_patch_highlights",
    "previous_update_topics": "last_patch_topics",
    "previous_update_reviews_count": "last_patch_reviews_count",
    "last_update_timestamp": "current_patch_timestamp",
}


def normalize_legacy_results(results: dict[str, Any]) -> dict[str, Any]:
    """Map legacy persisted result fields to the current schema.

    Drops the obsolete ``is_incremental`` flag, renames legacy keys via
    ``_LEGACY_FIELD_MAP``, and keeps the first value seen when multiple
    legacy keys map to the same current key.
    """
    normalized: dict[str, Any] = {}
    for key, value in results.items():
        # The incremental flag does not exist in the current schema at all.
        if key == "is_incremental":
            continue
        new_key = _LEGACY_FIELD_MAP.get(key, key)
        if new_key not in normalized:
            normalized[new_key] = value
    return normalized
def serialize_datetime(value: Any) -> str | Any:
"""Serialize datetimes in SSE payloads and persisted compatibility helpers."""
if isinstance(value, datetime):
return value.isoformat()
return value
def coerce_utc_datetime(value: Any) -> datetime | None:
"""Coerce persisted datetime values into timezone-aware UTC datetimes."""
if isinstance(value, datetime):
return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
if isinstance(value, str):
parsed = datetime.fromisoformat(value)
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
return None
def datetime_from_timestamp(timestamp: int | None) -> datetime | None:
"""Convert a unix timestamp into UTC datetime."""
if timestamp is None:
return None
return datetime.fromtimestamp(timestamp, tz=timezone.utc)