"""Shared analysis helpers used by both live and worker paths."""

import time
from datetime import datetime, timezone
from typing import Any

from app.core.config import settings
from app.models.schemas import (
    PredictionType,
    SentimentType,
    TopicSentiment,
    UserCountPrediction,
)


def calculate_prediction(topics: list[TopicSentiment]) -> UserCountPrediction:
    """Compute the player-count trend prediction from aggregated topics.

    Rules are evaluated in priority order:
    1. "Retention" topic with > 5 mentions and a score past the configured
       positive/negative thresholds dominates the prediction.
    2. A poor average across "Bugs"/"Performance" (< -0.3) predicts decline.
    3. The average across "Gameplay"/"Fun" predicts growth (> 0.4) or
       decline (< -0.2).
    4. Otherwise the trend is STABLE at 0.5 confidence.

    The ``reasoning`` values are i18n keys, not display text.
    """
    topic_map = {t.topic: t for t in topics}

    # Rule 1: strong retention signal wins outright.
    retention = topic_map.get("Retention")
    if retention and retention.mention_count > 5:
        # Confidence scales with mention volume, capped at 0.95.
        confidence = min(0.95, 0.5 + (retention.mention_count / 100))
        if retention.score > settings.prediction_retention_threshold_pos:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=confidence,
                reasoning="PREDICTION_REASONING_RETENTION_HIGH",
            )
        if retention.score < settings.prediction_retention_threshold_neg:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=confidence,
                reasoning="PREDICTION_REASONING_RETENTION_LOW",
            )

    # Rule 2: technical health (bugs + performance) average.
    bugs = topic_map.get("Bugs")
    performance = topic_map.get("Performance")
    tech_score = 0.0
    tech_count = 0
    if bugs:
        tech_score += bugs.score
        tech_count += 1
    if performance:
        tech_score += performance.score
        tech_count += 1
    if tech_count > 0 and (tech_score / tech_count) < -0.3:
        return UserCountPrediction(
            trend=PredictionType.DECREASING,
            confidence=0.75,
            reasoning="PREDICTION_REASONING_TECH_ISSUES",
        )

    # Rule 3: enjoyment (gameplay + fun) average.
    gameplay = topic_map.get("Gameplay")
    fun = topic_map.get("Fun")
    gameplay_score = 0.0
    gameplay_count = 0
    if gameplay:
        gameplay_score += gameplay.score
        gameplay_count += 1
    if fun:
        gameplay_score += fun.score
        gameplay_count += 1
    if gameplay_count > 0:
        average_gameplay = gameplay_score / gameplay_count
        if average_gameplay > 0.4:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=0.8,
                reasoning="PREDICTION_REASONING_GAMEPLAY_HIGH",
            )
        if average_gameplay < -0.2:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=0.6,
                reasoning="PREDICTION_REASONING_GAMEPLAY_LOW",
            )

    # Rule 4: no strong signal either way.
    return UserCountPrediction(
        trend=PredictionType.STABLE,
        confidence=0.5,
        reasoning="PREDICTION_REASONING_STABLE",
    )


def aggregate_topics(
    existing: list[TopicSentiment],
    new_batch: list[TopicSentiment],
) -> list[TopicSentiment]:
    """Merge topic aggregates using weighted mention counts.

    Scores are weighted by ``mention_count`` so that merging aggregates of
    different sizes yields the correct combined mean. The representative
    example kept per topic is the one with the strongest absolute score,
    and it is only surfaced when its polarity matches the final sentiment
    (neutral topics accept any example).

    Returns a new list sorted by ``mention_count`` descending.
    """
    topic_data: dict[str, dict[str, Any]] = {}

    def better_example(
        current: tuple[str, float] | None,
        new: tuple[str, float] | None,
    ) -> tuple[str, float] | None:
        # Prefer the example with the larger absolute score; on a tie, the
        # earlier (current) candidate is kept.
        if new is None:
            return current
        if current is None:
            return new
        return new if abs(new[1]) > abs(current[1]) else current

    # Both input lists are folded in identically; iterating the concatenation
    # (existing first, matching the original order) replaces what was a
    # duplicated accumulation loop.
    for topic in existing + new_batch:
        data = topic_data.setdefault(
            topic.topic,
            {"scores": [], "count": 0, "example": None},
        )
        # Weight each score by its mention count so the later division by the
        # total count produces a weighted average.
        data["scores"].append(topic.score * topic.mention_count)
        data["count"] += topic.mention_count
        candidate = (
            (topic.example, topic.example_score)
            if topic.example and topic.example_score is not None
            else None
        )
        data["example"] = better_example(data["example"], candidate)

    results: list[TopicSentiment] = []
    for topic_name, data in topic_data.items():
        count = data["count"]
        if count == 0:
            # Guard against a zero-division from degenerate zero-count input.
            continue
        average_score = sum(data["scores"]) / count
        # Clamp to the valid sentiment score range.
        normalized_score = max(-1.0, min(1.0, average_score))

        if normalized_score > settings.sentiment_positive_threshold:
            sentiment = SentimentType.POSITIVE
        elif normalized_score < settings.sentiment_negative_threshold:
            sentiment = SentimentType.NEGATIVE
        else:
            sentiment = SentimentType.NEUTRAL

        best_example = None
        example_score = None
        example_data = data["example"]
        if example_data:
            example_text, candidate_score = example_data
            # Only surface an example whose polarity agrees with the final
            # aggregate sentiment; neutral topics accept any example.
            if sentiment == SentimentType.NEUTRAL or (
                sentiment == SentimentType.POSITIVE and candidate_score > 0
            ) or (
                sentiment == SentimentType.NEGATIVE and candidate_score < 0
            ):
                best_example = example_text
                example_score = candidate_score

        results.append(
            TopicSentiment(
                topic=topic_name,
                sentiment=sentiment,
                score=round(normalized_score, 3),
                mention_count=count,
                example=best_example,
                example_score=example_score,
            )
        )

    results.sort(key=lambda item: item.mention_count, reverse=True)
    return results


def scale_topics(topics: list[TopicSentiment], factor: float) -> list[TopicSentiment]:
    """Scale mention counts for the approximate recent sliding window.

    Each scaled count is floored to at least 1 so topics never disappear
    purely due to scaling. Returns new model copies; inputs are untouched.
    """
    return [
        topic.model_copy(update={"mention_count": max(1, int(topic.mention_count * factor))})
        for topic in topics
    ]


def filter_topics_by_min_mentions(
    topics: list[TopicSentiment],
    min_mentions: int | None = None,
) -> list[TopicSentiment]:
    """Filter topics below the minimum mention threshold.

    Preserves existing sort order. Only filters — does not modify score or
    sentiment. Applied at the final aggregate level, never at the
    per-review level. Falls back to ``settings.topic_min_mentions`` when no
    explicit threshold is given.
    """
    threshold = min_mentions if min_mentions is not None else settings.topic_min_mentions
    return [t for t in topics if t.mention_count >= threshold]


def compute_preferred_context(patch_timestamp: int | None) -> str:
    """Choose the default user-facing context tab.

    Returns 'current_patch' only when a recent major patch exists; otherwise
    returns 'general' so the UI defaults to the full-picture view.
    """
    if patch_timestamp is None:
        return "general"
    # 86400 seconds per day; staleness cutoff comes from settings.
    patch_age_days = (time.time() - patch_timestamp) / 86400
    if patch_age_days > settings.patch_context_max_age_days:
        return "general"
    return "current_patch"


# Mapping from legacy persisted field names to their current schema names.
# Note: both "topics" and "historical_topics" map to "general_topics"; the
# first occurrence encountered wins (see normalize_legacy_results).
_LEGACY_FIELD_MAP = {
    "topics": "general_topics",
    "historical_topics": "general_topics",
    "post_update_topics": "current_patch_topics",
    "post_update_reviews_count": "current_patch_reviews_count",
    "post_update_highlights": "current_patch_highlights",
    "previous_update_topics": "last_patch_topics",
    "previous_update_reviews_count": "last_patch_reviews_count",
    "last_update_timestamp": "current_patch_timestamp",
}


def normalize_legacy_results(results: dict[str, Any]) -> dict[str, Any]:
    """Map legacy persisted result fields to the current schema.

    Unknown keys pass through unchanged; the obsolete "is_incremental" flag
    is dropped. When two legacy keys map to the same new key, the first one
    in iteration order is kept.
    """
    normalized: dict[str, Any] = {}
    for key, value in results.items():
        new_key = _LEGACY_FIELD_MAP.get(key, key)
        if key == "is_incremental":
            continue
        if new_key not in normalized:
            normalized[new_key] = value
    return normalized


def serialize_datetime(value: Any) -> str | Any:
    """Serialize datetimes in SSE payloads and persisted compatibility helpers.

    Non-datetime values are returned unchanged so this can be used as a
    generic JSON ``default`` hook.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    return value


def coerce_utc_datetime(value: Any) -> datetime | None:
    """Coerce persisted datetime values into timezone-aware UTC datetimes.

    Naive datetimes are assumed to already be UTC and are tagged as such.
    Returns None for unsupported types. NOTE: a malformed ISO string will
    raise ValueError from ``datetime.fromisoformat`` — callers are expected
    to persist valid values.
    """
    if isinstance(value, datetime):
        return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
    if isinstance(value, str):
        parsed = datetime.fromisoformat(value)
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    return None


def datetime_from_timestamp(timestamp: int | None) -> datetime | None:
    """Convert a unix timestamp into a timezone-aware UTC datetime (or None)."""
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)