Spaces:
Running
Running
| """ | |
| Analysis Runner — programmatic (non-SSE) analysis for the Worker. | |
| Extracts the core full-analysis logic from analyze.py without SSE wrapping. | |
| Used by the pre-cache service to run analyses in the background. | |
| """ | |
| import asyncio | |
| import contextlib | |
| import json | |
| import logging | |
| import time | |
| from datetime import datetime, timezone | |
| from typing import Any, AsyncGenerator | |
| from app.core.config import settings | |
| from app.core.freshness import FreshnessStatus | |
| from app.core.sampling import create_sample_plan | |
| from app.core.ttl_tiers import get_ttl_hours | |
| from app.core.worker_logging import AsyncTimingContext, get_structured_logger, log_structured | |
| from app.db.mongodb import mongodb | |
| from app.models.schemas import ( | |
| AnalysisProgress, | |
| AnalysisResult, | |
| GameInfo, | |
| Highlight, | |
| TopicHighlights, | |
| TopicSentiment, | |
| ) | |
| from app.services.highlights_service import HighlightsCollector | |
| from app.services.analysis_utils import ( | |
| aggregate_topics, | |
| calculate_prediction, | |
| compute_preferred_context, | |
| datetime_from_timestamp, | |
| filter_topics_by_min_mentions, | |
| normalize_legacy_results, | |
| scale_topics, | |
| serialize_datetime, | |
| ) | |
| from app.services.nlp_service import NLPService | |
| from app.services.steam_service import SteamService | |
| logger = logging.getLogger(__name__) | |
async def iter_incremental_analysis_events(
    game: GameInfo,
    stale_doc: dict[str, Any],
    steam_svc: SteamService,
    nlp_svc: NLPService,
    patch_timestamp: int | None = None,
    *,
    source: str = "live",
) -> AsyncGenerator[dict[str, str], None]:
    """Yield incremental-analysis progress and final result events.

    Fetches the reviews that are not listed in
    ``stale_doc["analyzed_review_ids"]``, runs NLP on them, merges the
    resulting topic deltas into the aggregates cached in
    ``stale_doc["results"]``, saves the merged analysis through
    ``mongodb.save_analysis`` and yields the outcome.

    Args:
        game: Game the analysis belongs to; ``game.app_id`` keys the cache.
        stale_doc: Previously stored analysis document. The keys read here
            are ``results``, ``analyzed_review_ids`` and
            ``latest_review_timestamp``.
        steam_svc: Service used to fetch the not-yet-analyzed reviews.
        nlp_svc: Service used to extract topics from review texts.
        patch_timestamp: Timestamp of the current game patch, compared
            against each review's ``timestamp_created``. ``None`` disables
            current-patch tracking.
        source: Label forwarded to the structured completion log.

    Yields:
        ``{"event": "progress", "data": <AnalysisProgress JSON>}`` after each
        processed chunk, then exactly one
        ``{"event": "complete", "data": <result JSON>}``.
    """
    ttl_hours = await get_ttl_hours(game.app_id)
    old_results = normalize_legacy_results(stale_doc.get("results", {}))

    # ``or`` (rather than a .get default) also covers keys stored as null.
    old_review_ids: list[str] = stale_doc.get("analyzed_review_ids") or []
    old_review_ids_set = set(old_review_ids)
    stale_latest_ts = stale_doc.get("latest_review_timestamp") or 0
    nlp_cumulative_s: float = 0.0

    old_general = [
        TopicSentiment(**topic) for topic in old_results.get("general_topics") or []
    ]
    old_recent = [
        TopicSentiment(**topic) for topic in old_results.get("recent_topics") or []
    ]
    old_current_patch = [
        TopicSentiment(**topic)
        for topic in old_results.get("current_patch_topics") or []
    ]
    raw_last_patch = old_results.get("last_patch_topics")
    old_last_patch = (
        [TopicSentiment(**topic) for topic in raw_last_patch]
        if raw_last_patch
        else None
    )
    old_last_patch_count = old_results.get("last_patch_reviews_count", 0)
    old_patch_ts = old_results.get("current_patch_timestamp")

    new_items = await steam_svc.fetch_recent_reviews(
        game.app_id,
        exclude_ids=old_review_ids_set,
    )

    if not new_items:
        # Nothing new to analyze: re-stamp the cached results as fresh.
        refreshed_at = datetime.now(timezone.utc)
        refreshed_results = {
            **old_results,
            "cached_at": refreshed_at,
            "analysis_date": refreshed_at,
            "current_patch_date": datetime_from_timestamp(
                patch_timestamp
                if patch_timestamp is not None
                else old_results.get("current_patch_timestamp")
            ),
            "freshness_status": FreshnessStatus.FRESH.value,
            "staleness_reason": None,
            "is_refreshing": False,
        }
        await mongodb.save_analysis(
            game.app_id,
            refreshed_results,
            analyzed_review_ids=old_review_ids,
            latest_review_timestamp=stale_latest_ts,
            ttl_hours=ttl_hours,
            analyzed_at=refreshed_at,
        )
        yield {
            "event": "complete",
            "data": json.dumps(refreshed_results, default=serialize_datetime),
        }
        return

    new_texts = [item.text for item in new_items]
    new_review_ids = [item.recommendation_id for item in new_items]
    # Never let the stored timestamp move backwards: the newly fetched reviews
    # may all be older than the newest review already recorded.
    latest_timestamp = max(
        stale_latest_ts,
        max(item.timestamp_created for item in new_items),
    )

    batch_size = settings.review_batch_size
    delta_topics: list[TopicSentiment] = []
    delta_current_patch_topics: list[TopicSentiment] = []
    delta_current_patch_count = 0
    highlights_collector = HighlightsCollector()
    processed = 0
    total_skipped = 0

    for start in range(0, len(new_texts), batch_size):
        chunk_texts = new_texts[start:start + batch_size]
        chunk_items = new_items[start:start + batch_size]

        if patch_timestamp:
            # Reviews are analyzed one at a time so each can carry its own
            # highlight categories.
            batch_skipped = 0
            for review_item, text in zip(chunk_items, chunk_texts):
                in_current_patch = review_item.timestamp_created >= patch_timestamp
                categories = ["recent"]
                if in_current_patch:
                    categories.append("current_patch")

                nlp_start = time.monotonic()
                result_topics, skipped = await nlp_svc.analyze_batch(
                    [text],
                    highlights_collector=highlights_collector,
                    categories=categories,
                )
                nlp_cumulative_s += time.monotonic() - nlp_start
                batch_skipped += skipped

                if result_topics:
                    delta_topics = aggregate_topics(delta_topics, result_topics)
                    if in_current_patch:
                        delta_current_patch_topics = aggregate_topics(
                            delta_current_patch_topics,
                            result_topics,
                        )
                # NOTE(review): the original nesting of this counter was not
                # recoverable; every review inside the patch window is
                # counted, whether or not NLP produced topics — confirm.
                if in_current_patch:
                    delta_current_patch_count += 1
            total_skipped += batch_skipped
        else:
            nlp_start = time.monotonic()
            batch_results, batch_skipped = await nlp_svc.analyze_batch(
                chunk_texts,
                highlights_collector=highlights_collector,
                categories=["recent"],
            )
            nlp_cumulative_s += time.monotonic() - nlp_start
            if batch_results:
                delta_topics = aggregate_topics(delta_topics, batch_results)
            total_skipped += batch_skipped

        processed += len(chunk_texts)
        progress = AnalysisProgress(
            processed=processed,
            total=len(new_texts),
            current_topics=delta_topics,
            skipped_count=total_skipped,
        )
        yield {"event": "progress", "data": progress.model_dump_json()}

    new_general = aggregate_topics(old_general, delta_topics)

    # Recent window: once old + new exceeds the sample limit, scale the old
    # aggregate down (keeping at least 20%) before adding the new delta.
    old_recent_count = old_results.get("recent_reviews_count", 0)
    new_count = len(new_texts)
    combined_recent = old_recent_count + new_count
    if (
        combined_recent > settings.recent_sample_limit
        and old_recent
        and old_recent_count > 0
    ):
        overflow = combined_recent - settings.recent_sample_limit
        retain_ratio = max(0.2, 1.0 - overflow / old_recent_count)
        scaled_old = scale_topics(old_recent, retain_ratio)
        new_recent = aggregate_topics(scaled_old, delta_topics)
        recent_count = int(old_recent_count * retain_ratio) + new_count
    else:
        new_recent = (
            aggregate_topics(old_recent, delta_topics) if old_recent else delta_topics
        )
        recent_count = combined_recent

    # Patch rollover: when the patch timestamp changed, the previous
    # "current patch" data becomes "last patch" and the current-patch
    # aggregate restarts from the new delta only.
    patch_changed = bool(
        patch_timestamp and old_patch_ts and patch_timestamp != old_patch_ts
    )
    if patch_changed:
        last_patch_topics = old_current_patch if old_current_patch else None
        last_patch_count = old_results.get("current_patch_reviews_count", 0)
        old_current_patch = []
        base_current_patch_count = 0
    else:
        last_patch_topics = old_last_patch
        last_patch_count = old_last_patch_count
        base_current_patch_count = old_results.get("current_patch_reviews_count", 0)

    if old_current_patch:
        new_current_patch = aggregate_topics(
            old_current_patch, delta_current_patch_topics
        )
    else:
        new_current_patch = delta_current_patch_topics

    new_current_patch_count = base_current_patch_count + delta_current_patch_count
    has_current_patch = patch_timestamp is not None and (
        new_current_patch_count > 0 or bool(old_current_patch)
    )

    # Apply min-mentions filter on final aggregates (not per-review — see nlp_service.py).
    new_general = filter_topics_by_min_mentions(new_general)
    new_recent = filter_topics_by_min_mentions(new_recent)
    new_current_patch = filter_topics_by_min_mentions(new_current_patch)

    prediction = calculate_prediction(new_general)

    highlights_data = highlights_collector.compute_highlights()
    general_highlights = highlights_data["general"]
    recent_highlights = highlights_data["recent"]
    current_patch_highlights = highlights_data["current_patch"]
    topic_highlights_dict = highlights_data["topics"]

    # Restrict topic highlights to topics that survived the min-mentions filter,
    # so the topic_highlights set is always consistent with general_topics.
    surviving_topics = {t.topic for t in new_general}
    topic_highlights_list = [
        TopicHighlights(
            topic=topic,
            highlights=[Highlight(**highlight) for highlight in highlights],
        )
        for topic, highlights in topic_highlights_dict.items()
        if topic in surviving_topics
    ]

    merged_review_ids = old_review_ids + new_review_ids
    analysis_generated_at = datetime.now(timezone.utc)

    result = AnalysisResult(
        game=game,
        general_topics=new_general,
        recent_topics=new_recent,
        recent_reviews_count=recent_count,
        current_patch_topics=new_current_patch if has_current_patch else None,
        current_patch_reviews_count=new_current_patch_count if has_current_patch else 0,
        last_patch_topics=last_patch_topics,
        last_patch_reviews_count=last_patch_count,
        current_patch_timestamp=patch_timestamp,
        analysis_date=analysis_generated_at,
        current_patch_date=datetime_from_timestamp(patch_timestamp),
        prediction=prediction,
        analyzed_reviews=old_results.get("analyzed_reviews", 0) + processed,
        skipped_count=old_results.get("skipped_count", 0) + total_skipped,
        general_highlights=[Highlight(**highlight) for highlight in general_highlights],
        recent_highlights=(
            [Highlight(**highlight) for highlight in recent_highlights]
            if recent_highlights
            else None
        ),
        current_patch_highlights=(
            [Highlight(**highlight) for highlight in current_patch_highlights]
            if current_patch_highlights
            else None
        ),
        topic_highlights=topic_highlights_list,
        cached_at=analysis_generated_at,
        preferred_context=compute_preferred_context(patch_timestamp),
        freshness_status=FreshnessStatus.FRESH.value,
        is_refreshing=False,
    )

    await mongodb.save_analysis(
        game.app_id,
        result.model_dump(),
        analyzed_review_ids=merged_review_ids,
        latest_review_timestamp=latest_timestamp,
        ttl_hours=ttl_hours,
        analyzed_at=analysis_generated_at,
    )

    if get_structured_logger():
        log_structured(
            "incremental_analysis_complete",
            app_id=game.app_id,
            game_name=game.name if hasattr(game, "name") else str(game.app_id),
            source=source,
            reviews_processed=processed,
            topics_found=len(new_general),
            detail={"nlp_cumulative_s": round(nlp_cumulative_s, 3)},
        )

    yield {"event": "complete", "data": result.model_dump_json()}
async def run_incremental_analysis(
    app_id: str,
    game_name: str,
    steam_svc: SteamService,
    nlp_svc: NLPService,
) -> dict[str, Any] | None:
    """Run a non-SSE incremental analysis for worker jobs.

    Loads the cached analysis for ``app_id`` and drains
    ``iter_incremental_analysis_events`` to obtain the final payload. Falls
    back to ``run_full_analysis`` when there is no usable cached document, or
    when the newest analyzed review is older than
    ``settings.incremental_max_gap_days``.

    Args:
        app_id: Steam application id of the game.
        game_name: Display name, used for logging and as a fallback when game
            info cannot be fetched or recovered from the cache.
        steam_svc: Service used to fetch game info and reviews.
        nlp_svc: Service used to extract topics from review texts.

    Returns:
        The decoded payload of the ``complete`` event (or the full-analysis
        result on fallback), or ``None`` when an exception was raised or no
        ``complete`` event was produced.
    """
    slog = get_structured_logger()
    try:
        stale_doc = await mongodb.get_analysis(app_id)
        has_usable_cache = bool(
            stale_doc
            and stale_doc.get("results")
            and stale_doc.get("analyzed_review_ids")
        )
        if not has_usable_cache:
            return await run_full_analysis(
                app_id, game_name, steam_svc, nlp_svc, stale_doc=stale_doc
            )

        # Long gap guard: if the most recent review we have is too old, Steam's cursor-based
        # API may not reliably surface all reviews since then. Fall back to full analysis.
        # ``or 0`` also covers a timestamp stored as null, which would
        # otherwise make the comparison below raise TypeError.
        latest_ts = stale_doc.get("latest_review_timestamp") or 0
        if latest_ts > 0:
            gap_days = (time.time() - latest_ts) / 86400
            if gap_days > settings.incremental_max_gap_days:
                logger.info(
                    f"Incremental gap {gap_days:.0f}d > {settings.incremental_max_gap_days}d "
                    f"for {app_id} ({game_name}) — falling back to full analysis"
                )
                return await run_full_analysis(
                    app_id, game_name, steam_svc, nlp_svc, stale_doc=stale_doc
                )

        game = await steam_svc.get_game_info(app_id)
        if not game:
            # Game info unavailable: reuse the cached game info, or build a
            # minimal one from the arguments.
            cached_game = stale_doc.get("results", {}).get("game")
            if isinstance(cached_game, dict):
                game = GameInfo(**cached_game)
            else:
                game = GameInfo(app_id=app_id, name=game_name)

        patch_date = await mongodb.get_game_patch_date(app_id)
        patch_timestamp = int(patch_date.timestamp()) if patch_date else None
        if patch_timestamp:
            game = game.model_copy(update={"last_game_update_at": patch_timestamp})

        # Progress events are ignored here; only the final payload is kept.
        final_payload: dict[str, Any] | None = None
        async for event in iter_incremental_analysis_events(
            game,
            stale_doc,
            steam_svc,
            nlp_svc,
            patch_timestamp=patch_timestamp,
            source="worker",
        ):
            if event.get("event") == "complete":
                final_payload = json.loads(event["data"])
        return final_payload
    except Exception as e:
        # Worker boundary: log the failure and signal it with None.
        logger.error(
            f"Incremental analysis runner error for {app_id} ({game_name}): {e}",
            exc_info=True,
        )
        if slog:
            log_structured(
                "analysis_error",
                level=logging.ERROR,
                app_id=app_id,
                game_name=game_name,
                source="worker",
                error=str(e),
            )
        return None
async def run_full_analysis(
    app_id: str,
    game_name: str,
    steam_svc: SteamService,
    nlp_svc: NLPService,
    stale_doc: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """
    Run a full analysis for a game (no SSE, no streaming).

    Fetches a stratified review sample in a background task, analyzes each
    review as it arrives, aggregates topics and highlights, and saves the
    result through ``mongodb.save_analysis``.

    Args:
        app_id: Steam application id of the game.
        game_name: Display name, used for logging only.
        steam_svc: Service used to fetch game info, review stats and reviews.
        nlp_svc: Service used to extract topics from review texts.
        stale_doc: Previously stored analysis document, if any. Only used to
            carry forward or archive the "last patch" topics.

    Returns:
        Analysis result dict, or None on error (including when game info is
        not found or the game has no reviews).
    """
    slog = get_structured_logger()
    try:
        # Phase 1: Setup — game info + review stats + sample plan
        async with AsyncTimingContext() as t_setup:
            # 1. Get game info
            game = await steam_svc.get_game_info(app_id)
            if not game:
                logger.warning(f"Analysis runner: game info not found for {app_id}")
                return None

            # 2. Get review stats
            stats = await steam_svc.get_review_stats(app_id)
            if stats.total == 0:
                logger.warning(f"Analysis runner: no reviews for {app_id}")
                return None

            # 3. Create sample plan
            sample_plan = create_sample_plan(stats.total, stats.positive, stats.negative)
            ttl_hours = await get_ttl_hours(app_id)

            # 3b. Fetch game patch date for current_patch splitting
            patch_date = await mongodb.get_game_patch_date(app_id)
            patch_timestamp = int(patch_date.timestamp()) if patch_date else None
            if patch_timestamp and isinstance(game, GameInfo):
                game = game.model_copy(update={"last_game_update_at": patch_timestamp})

        # Phase 2: Fetch + Analyze — producer-consumer loop
        nlp_cumulative_s: float = 0.0
        async with AsyncTimingContext() as t_fetch_analyze:
            # 4. Producer-consumer fetch + analyze
            queue: asyncio.Queue = asyncio.Queue(maxsize=5)

            async def fetch_worker() -> None:
                """Feed review batches into the queue, ending with ``None``."""
                try:
                    async for batch in steam_svc.fetch_reviews_stratified(app_id, sample_plan):
                        await queue.put(batch)
                except Exception as e:
                    # The consumer re-raises this and stops reading, so no
                    # sentinel is needed after it.
                    await queue.put(e)
                else:
                    # Sentinel on normal completion only. Sending it from a
                    # ``finally`` would also run on cancellation, where a full
                    # queue with no consumer would block this task forever.
                    await queue.put(None)

            fetch_task = asyncio.create_task(fetch_worker())

            processed = 0
            total_skipped = 0
            aggregated_topics: list[TopicSentiment] = []
            recent_processed = 0
            recent_limit = settings.recent_sample_limit
            all_review_ids: list[str] = []
            latest_timestamp = 0
            highlights_collector = HighlightsCollector()
            current_patch_topics: list[TopicSentiment] = []
            current_patch_count = 0
            review_topic_results: list[tuple[int, list[TopicSentiment]]] = []

            try:
                while True:
                    item = await queue.get()
                    if item is None:
                        break
                    if isinstance(item, Exception):
                        raise item

                    batch = item
                    if not batch.reviews:
                        continue

                    review_items = batch.review_items or []
                    for ri in review_items:
                        all_review_ids.append(ri.recommendation_id)
                        if ri.timestamp_created > latest_timestamp:
                            latest_timestamp = ri.timestamp_created

                    # Pair every text with its creation timestamp; batches
                    # without review items fall back to timestamp 0.
                    if review_items:
                        timed_texts = [
                            (ri.timestamp_created, text)
                            for ri, text in zip(review_items, batch.reviews)
                        ]
                    else:
                        timed_texts = [(0, text) for text in batch.reviews]

                    # Current-patch tracking needs both a patch timestamp and
                    # per-review timestamps.
                    track_patch = bool(patch_timestamp) and bool(review_items)

                    batch_skipped = 0
                    for created_ts, text in timed_texts:
                        in_current_patch = track_patch and created_ts >= patch_timestamp
                        cat = []
                        if recent_processed < recent_limit:
                            cat.append("recent")
                        if in_current_patch:
                            cat.append("current_patch")

                        nlp_start = time.monotonic()
                        res, skipped = await nlp_svc.analyze_batch(
                            [text], highlights_collector=highlights_collector, categories=cat
                        )
                        nlp_cumulative_s += time.monotonic() - nlp_start
                        batch_skipped += skipped

                        if res:
                            aggregated_topics = aggregate_topics(aggregated_topics, res)
                            if in_current_patch:
                                current_patch_topics = aggregate_topics(current_patch_topics, res)
                            review_topic_results.append((created_ts, res))
                        # NOTE(review): the original nesting of the two
                        # counters below was not recoverable; every review in
                        # the patch window is counted, and recent_processed
                        # advances once per review — confirm.
                        if in_current_patch:
                            current_patch_count += 1
                        recent_processed += 1

                    total_skipped += batch_skipped
                    processed += len(batch.reviews)

                await fetch_task
            except BaseException:
                # Stop the producer before propagating, so it is not left
                # running in the background.
                fetch_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await fetch_task
                raise

        # Phase 3: Save — highlights + MongoDB save
        async with AsyncTimingContext() as t_save:
            # 5. Compute prediction + highlights
            # Build recent_topics from highest-timestamp reviews
            review_topic_results.sort(key=lambda x: x[0], reverse=True)
            recent_entries = review_topic_results[:recent_limit]
            recent_topics: list[TopicSentiment] = []
            for _, topics_batch in recent_entries:
                for topic_sentiment in topics_batch:
                    recent_topics = aggregate_topics(recent_topics, [topic_sentiment])
            recent_reviews_count = len(recent_entries)

            # Apply min-mentions filter on final aggregates (not per-review — see nlp_service.py).
            aggregated_topics = filter_topics_by_min_mentions(aggregated_topics)
            recent_topics = filter_topics_by_min_mentions(recent_topics)
            current_patch_topics = filter_topics_by_min_mentions(current_patch_topics)

            prediction = calculate_prediction(aggregated_topics)

            highlights_data = highlights_collector.compute_highlights()
            general_highlights = highlights_data["general"]
            recent_highlights = highlights_data["recent"]
            current_patch_highlights = highlights_data["current_patch"]
            topic_highlights_dict = highlights_data["topics"]

            # Restrict topic highlights to topics that survived the min-mentions filter,
            # so the topic_highlights set is always consistent with general_topics.
            surviving_topics = {t.topic for t in aggregated_topics}
            topic_highlights_list = [
                TopicHighlights(
                    topic=topic,
                    highlights=[Highlight(**h) for h in highlights],
                )
                for topic, highlights in topic_highlights_dict.items()
                if topic in surviving_topics
            ]

            has_recent_split = processed > recent_limit
            has_current_patch = patch_timestamp is not None and current_patch_count > 0
            analysis_generated_at = datetime.now(timezone.utc)
            current_patch_date = (
                datetime.fromtimestamp(patch_timestamp, tz=timezone.utc)
                if patch_timestamp is not None
                else None
            )

            # Archive last_patch_topics when full analysis replaces a doc with a different patch.
            last_patch_topics: list[TopicSentiment] | None = None
            last_patch_reviews_count = 0
            if stale_doc:
                old_r = normalize_legacy_results(stale_doc.get("results", {}))
                old_patch_ts = old_r.get("current_patch_timestamp")
                if patch_timestamp and old_patch_ts and patch_timestamp != old_patch_ts:
                    # Patch changed: the old current-patch data becomes last-patch.
                    raw_cp = old_r.get("current_patch_topics")
                    last_patch_topics = [TopicSentiment(**t) for t in raw_cp] if raw_cp else None
                    last_patch_reviews_count = old_r.get("current_patch_reviews_count", 0)
                else:
                    # Same patch (or none known): carry the old last-patch data forward.
                    raw_lp = old_r.get("last_patch_topics")
                    last_patch_topics = [TopicSentiment(**t) for t in raw_lp] if raw_lp else None
                    last_patch_reviews_count = old_r.get("last_patch_reviews_count", 0)

            result = AnalysisResult(
                game=game,
                general_topics=aggregated_topics,
                recent_topics=recent_topics if has_recent_split else None,
                recent_reviews_count=recent_reviews_count if has_recent_split else 0,
                current_patch_topics=current_patch_topics if has_current_patch else None,
                current_patch_reviews_count=current_patch_count if has_current_patch else 0,
                last_patch_topics=last_patch_topics,
                last_patch_reviews_count=last_patch_reviews_count,
                current_patch_timestamp=patch_timestamp,
                analysis_date=analysis_generated_at,
                current_patch_date=current_patch_date,
                prediction=prediction,
                analyzed_reviews=processed,
                skipped_count=total_skipped,
                general_highlights=[Highlight(**h) for h in general_highlights],
                recent_highlights=(
                    [Highlight(**h) for h in recent_highlights] if recent_highlights else None
                ),
                current_patch_highlights=(
                    [Highlight(**h) for h in current_patch_highlights]
                    if current_patch_highlights
                    else None
                ),
                topic_highlights=topic_highlights_list,
                cached_at=analysis_generated_at,
                preferred_context=compute_preferred_context(patch_timestamp),
                freshness_status=FreshnessStatus.FRESH.value,
                is_refreshing=False,
            )

            # 6. Save to cache
            await mongodb.save_analysis(
                game.app_id,
                result.model_dump(),
                analyzed_review_ids=all_review_ids,
                latest_review_timestamp=latest_timestamp,
                ttl_hours=ttl_hours,
                analyzed_at=analysis_generated_at,
            )

        total_elapsed = t_setup.elapsed_s + t_fetch_analyze.elapsed_s + t_save.elapsed_s
        logger.info(
            f"Analysis runner: completed {app_id} ({game_name}) — "
            f"{processed} reviews, {len(aggregated_topics)} topics"
        )
        if slog:
            log_structured(
                "analysis_complete",
                app_id=app_id,
                game_name=game_name,
                elapsed_s=round(total_elapsed, 3),
                source="worker",
                breakdown={
                    "setup_s": t_setup.elapsed_s,
                    "fetch_analyze_s": t_fetch_analyze.elapsed_s,
                    "nlp_cumulative_s": round(nlp_cumulative_s, 3),
                    "save_s": t_save.elapsed_s,
                },
                reviews_processed=processed,
                topics_found=len(aggregated_topics),
            )
        return result.model_dump()
    except Exception as e:
        # Worker boundary: log the failure and signal it with None.
        logger.error(f"Analysis runner error for {app_id} ({game_name}): {e}", exc_info=True)
        if slog:
            log_structured(
                "analysis_error",
                level=logging.ERROR,
                app_id=app_id,
                game_name=game_name,
                source="worker",
                error=str(e),
            )
        return None