Spaces:
Running
Running
| """Shared analysis helpers used by both live and worker paths.""" | |
| import time | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from app.core.config import settings | |
| from app.models.schemas import ( | |
| PredictionType, | |
| SentimentType, | |
| TopicSentiment, | |
| UserCountPrediction, | |
| ) | |
def calculate_prediction(topics: list[TopicSentiment]) -> UserCountPrediction:
    """Derive the expected player-count trend from aggregated topic sentiment.

    Signals are checked in priority order: retention first, then technical
    health (Bugs/Performance), then gameplay enjoyment (Gameplay/Fun),
    falling back to a stable prediction when no signal is strong enough.
    """
    by_name = {entry.topic: entry for entry in topics}

    # Retention is the strongest signal, but only once it has enough mentions.
    retention = by_name.get("Retention")
    if retention and retention.mention_count > 5:
        # Confidence grows with mention volume, capped at 0.95.
        confidence = min(0.95, 0.5 + (retention.mention_count / 100))
        if retention.score > settings.prediction_retention_threshold_pos:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=confidence,
                reasoning="PREDICTION_REASONING_RETENTION_HIGH",
            )
        if retention.score < settings.prediction_retention_threshold_neg:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=confidence,
                reasoning="PREDICTION_REASONING_RETENTION_LOW",
            )

    # Average whichever technical-health topics are present.
    tech_scores = [
        entry.score
        for entry in (by_name.get("Bugs"), by_name.get("Performance"))
        if entry
    ]
    if tech_scores and (sum(tech_scores) / len(tech_scores)) < -0.3:
        return UserCountPrediction(
            trend=PredictionType.DECREASING,
            confidence=0.75,
            reasoning="PREDICTION_REASONING_TECH_ISSUES",
        )

    # Gameplay enjoyment can push the trend either way.
    gameplay_scores = [
        entry.score
        for entry in (by_name.get("Gameplay"), by_name.get("Fun"))
        if entry
    ]
    if gameplay_scores:
        average_gameplay = sum(gameplay_scores) / len(gameplay_scores)
        if average_gameplay > 0.4:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=0.8,
                reasoning="PREDICTION_REASONING_GAMEPLAY_HIGH",
            )
        if average_gameplay < -0.2:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=0.6,
                reasoning="PREDICTION_REASONING_GAMEPLAY_LOW",
            )

    # No strong signal either way: assume a stable player count.
    return UserCountPrediction(
        trend=PredictionType.STABLE,
        confidence=0.5,
        reasoning="PREDICTION_REASONING_STABLE",
    )
def aggregate_topics(
    existing: list[TopicSentiment],
    new_batch: list[TopicSentiment],
) -> list[TopicSentiment]:
    """Merge topic aggregates using weighted mention counts.

    Scores are combined as a mention-weighted average, clamped to [-1, 1].
    For each topic the single most polarized example quote is kept, and it
    is only surfaced when its polarity agrees with the merged sentiment.
    Results are sorted by mention count, descending.
    """
    topic_data: dict[str, dict[str, Any]] = {}

    def better_example(
        current: tuple[str, float] | None,
        new: tuple[str, float] | None,
    ) -> tuple[str, float] | None:
        # Prefer whichever example has the larger absolute score.
        if new is None:
            return current
        if current is None:
            return new
        return new if abs(new[1]) > abs(current[1]) else current

    # Fold both inputs into one accumulator; the previous version duplicated
    # this loop verbatim for `existing` and `new_batch`.
    for topic in [*existing, *new_batch]:
        data = topic_data.setdefault(
            topic.topic, {"scores": [], "count": 0, "example": None}
        )
        # Weight each score by its mention count for the weighted average.
        data["scores"].append(topic.score * topic.mention_count)
        data["count"] += topic.mention_count
        candidate = (
            (topic.example, topic.example_score)
            if topic.example and topic.example_score is not None
            else None
        )
        data["example"] = better_example(data["example"], candidate)

    results: list[TopicSentiment] = []
    for topic_name, data in topic_data.items():
        count = data["count"]
        if count == 0:
            # Guard against division by zero for degenerate aggregates.
            continue
        # Weighted average, clamped to the valid sentiment range.
        average_score = sum(data["scores"]) / count
        normalized_score = max(-1.0, min(1.0, average_score))
        if normalized_score > settings.sentiment_positive_threshold:
            sentiment = SentimentType.POSITIVE
        elif normalized_score < settings.sentiment_negative_threshold:
            sentiment = SentimentType.NEGATIVE
        else:
            sentiment = SentimentType.NEUTRAL
        best_example = None
        example_score = None
        example_data = data["example"]
        if example_data:
            example_text, candidate_score = example_data
            # Only surface an example whose polarity matches the merged
            # sentiment; neutral topics may show either polarity.
            if sentiment == SentimentType.NEUTRAL or (
                sentiment == SentimentType.POSITIVE and candidate_score > 0
            ) or (
                sentiment == SentimentType.NEGATIVE and candidate_score < 0
            ):
                best_example = example_text
                example_score = candidate_score
        results.append(
            TopicSentiment(
                topic=topic_name,
                sentiment=sentiment,
                score=round(normalized_score, 3),
                mention_count=count,
                example=best_example,
                example_score=example_score,
            )
        )
    results.sort(key=lambda item: item.mention_count, reverse=True)
    return results
def scale_topics(topics: list[TopicSentiment], factor: float) -> list[TopicSentiment]:
    """Scale mention counts for the approximate recent sliding window."""
    scaled: list[TopicSentiment] = []
    for topic in topics:
        # Truncate toward zero, but never drop an observed topic below 1 mention.
        new_count = max(1, int(topic.mention_count * factor))
        scaled.append(topic.model_copy(update={"mention_count": new_count}))
    return scaled
def filter_topics_by_min_mentions(
    topics: list[TopicSentiment],
    min_mentions: int | None = None,
) -> list[TopicSentiment]:
    """Filter topics below the minimum mention threshold.

    Preserves existing sort order. Only filters — does not modify score or sentiment.
    Applied at the final aggregate level, never at the per-review level.
    """
    # Fall back to the configured default when no explicit threshold is given.
    if min_mentions is None:
        min_mentions = settings.topic_min_mentions
    kept: list[TopicSentiment] = []
    for topic in topics:
        if topic.mention_count >= min_mentions:
            kept.append(topic)
    return kept
| def compute_preferred_context(patch_timestamp: int | None) -> str: | |
| """Choose the default user-facing context tab. | |
| Returns 'current_patch' only when a recent major patch exists; otherwise | |
| returns 'general' so the UI defaults to the full-picture view. | |
| """ | |
| if patch_timestamp is None: | |
| return "general" | |
| patch_age_days = (time.time() - patch_timestamp) / 86400 | |
| if patch_age_days > settings.patch_context_max_age_days: | |
| return "general" | |
| return "current_patch" | |
# Mapping of legacy persisted result field names to their current schema
# names. Several legacy keys collapse onto one modern key (e.g. both
# "topics" and "historical_topics" map to "general_topics");
# normalize_legacy_results keeps the first value encountered.
_LEGACY_FIELD_MAP = {
    "topics": "general_topics",
    "historical_topics": "general_topics",
    "post_update_topics": "current_patch_topics",
    "post_update_reviews_count": "current_patch_reviews_count",
    "post_update_highlights": "current_patch_highlights",
    "previous_update_topics": "last_patch_topics",
    "previous_update_reviews_count": "last_patch_reviews_count",
    "last_update_timestamp": "current_patch_timestamp",
}
def normalize_legacy_results(results: dict[str, Any]) -> dict[str, Any]:
    """Map legacy persisted result fields to the current schema."""
    normalized: dict[str, Any] = {}
    for key, value in results.items():
        # The legacy incremental flag has no modern equivalent; drop it.
        if key == "is_incremental":
            continue
        new_key = _LEGACY_FIELD_MAP.get(key, key)
        # Several legacy keys collapse onto one modern key; first one wins.
        normalized.setdefault(new_key, value)
    return normalized
| def serialize_datetime(value: Any) -> str | Any: | |
| """Serialize datetimes in SSE payloads and persisted compatibility helpers.""" | |
| if isinstance(value, datetime): | |
| return value.isoformat() | |
| return value | |
| def coerce_utc_datetime(value: Any) -> datetime | None: | |
| """Coerce persisted datetime values into timezone-aware UTC datetimes.""" | |
| if isinstance(value, datetime): | |
| return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc) | |
| if isinstance(value, str): | |
| parsed = datetime.fromisoformat(value) | |
| return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc) | |
| return None | |
| def datetime_from_timestamp(timestamp: int | None) -> datetime | None: | |
| """Convert a unix timestamp into UTC datetime.""" | |
| if timestamp is None: | |
| return None | |
| return datetime.fromtimestamp(timestamp, tz=timezone.utc) | |