Spaces:
Running
Running
| """ | |
| Analytics Service | |
| Computes insights from preprocessed session data | |
| """ | |
| from collections import Counter | |
| from typing import Optional | |
| import json | |
| from pathlib import Path | |
def load_session_events(token: str) -> tuple[list[dict], dict]:
    """Load events and stats from a preprocessed session.

    The session is read from ``storage/preprocessed_<token>.json``, where
    ``storage`` is resolved relative to the parent of this module's directory.

    Args:
        token: Session token used to build the file name.

    Returns:
        ``(events, stats)``. Both are empty when the token is empty or contains
        a path separator, when the file does not exist, when its content is not
        valid UTF-8 JSON, or when the JSON document is not an object.
    """
    # The token is interpolated into a file name: refuse anything that could
    # make the resulting path point outside the storage directory.
    if not isinstance(token, str) or not token or "/" in token or "\\" in token:
        return [], {}

    storage_dir = Path(__file__).parent.parent / "storage"
    session_path = storage_dir / f"preprocessed_{token}.json"
    if not session_path.exists():
        return [], {}

    try:
        with open(session_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; an unreadable session is treated like a missing one.
        return [], {}

    if not isinstance(data, dict):
        return [], {}

    # "or" also covers an explicit JSON null stored under either key.
    return data.get("events") or [], data.get("stats") or {}
def get_session_summary(token: str) -> dict:
    """
    Build the headline numbers for a session.

    Returns a dict with:
    - total_events, total_watch, total_search, total_subscribe
      (taken from the stored stats, falling back to counts / 0)
    - unique_channels: distinct channel_clean values among watch events
    - date_range: smallest and largest timestamp_utc (both None if absent)
    - language_breakdown

    An {"error": ...} dict is returned when the session has no events.
    """
    events, stats = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    watch_events = []
    distinct_channels = set()
    stamps = []

    # Single pass: collect timestamps for every event, channels for watches only.
    for event in events:
        stamp = event.get("timestamp_utc")
        if stamp:
            stamps.append(stamp)

        if event.get("type") != "watch":
            continue
        watch_events.append(event)
        channel = event.get("channel_clean")
        if channel:
            distinct_channels.add(channel)

    ordered = sorted(stamps)
    if ordered:
        first_stamp, last_stamp = ordered[0], ordered[-1]
    else:
        first_stamp = last_stamp = None

    summary = {
        "total_events": stats.get("total_events", len(events)),
        "total_watch": stats.get("total_watch", len(watch_events)),
        "total_search": stats.get("total_search", 0),
        "total_subscribe": stats.get("total_subscribe", 0),
        "unique_channels": len(distinct_channels),
        "date_range": {"first": first_stamp, "last": last_stamp},
        "language_breakdown": stats.get("language_breakdown", {}),
    }
    return summary
def get_channel_analytics(token: str, top_n: int = 20, engagement_filter: str = "all") -> dict:
    """
    Rank channels by how often they appear in watch events.

    Args:
        token: Session token
        top_n: How many of the most frequent channels to list
        engagement_filter: "watch" keeps events whose engagement is "active",
            "view" keeps events whose engagement is "passive"; any other
            value keeps every watch event

    Returns:
        - total_unique_channels, total_count, engagement_filter
        - top_channels: dicts with channel, channel_clean, count, percentage
        - other_count: filtered events not covered by top_channels
        - view_distribution: number of channels per watch-count bucket
        - engagement_summary: active / passive / overall watch-event totals

        An {"error": ...} dict is returned when the session has no events.
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    all_watch_events = [e for e in events if e.get("type") == "watch"]

    # Narrow down to one engagement kind when asked to.
    if engagement_filter in ("watch", "view"):
        wanted = "active" if engagement_filter == "watch" else "passive"
        filtered_events = [e for e in all_watch_events if e.get("engagement") == wanted]
    else:
        filtered_events = all_watch_events

    channel_counts = Counter()
    for item in filtered_events:
        key = item.get("channel_clean")
        if key:
            channel_counts[key] += 1

    # First original spelling seen for each cleaned name; taken from all watch
    # events so a name is available even for events the filter removed.
    channel_display_names = {}
    for item in all_watch_events:
        key = item.get("channel_clean")
        shown = item.get("channel")
        if key and shown:
            channel_display_names.setdefault(key, shown)

    total_count = sum(channel_counts.values())

    top_channels_result = [
        {
            "channel": channel_display_names.get(key, key),
            "channel_clean": key,
            "count": hits,
            "percentage": round((hits / total_count) * 100, 2) if total_count > 0 else 0,
        }
        for key, hits in channel_counts.most_common(top_n)
    ]

    # Histogram of "how many channels were watched N times".
    # Each pair is (inclusive upper bound, bucket label); anything above the
    # last bound lands in "100+".
    bucket_bounds = (
        (1, "1"),
        (5, "2-5"),
        (10, "6-10"),
        (20, "11-20"),
        (50, "21-50"),
        (100, "51-100"),
    )
    view_buckets = {label: 0 for _bound, label in bucket_bounds}
    view_buckets["100+"] = 0
    for hits in channel_counts.values():
        for bound, label in bucket_bounds:
            if hits <= bound:
                view_buckets[label] += 1
                break
        else:
            view_buckets["100+"] += 1

    # List-of-dicts shape for the frontend.
    view_distribution = []
    for label, channels_in_bucket in view_buckets.items():
        view_distribution.append({"bucket": label, "count": channels_in_bucket})

    # Engagement totals are always computed over the unfiltered watch events.
    active_count = sum(1 for e in all_watch_events if e.get("engagement") == "active")
    passive_count = sum(1 for e in all_watch_events if e.get("engagement") == "passive")

    listed_total = sum(entry["count"] for entry in top_channels_result)

    return {
        "total_unique_channels": len(channel_counts),
        "total_count": total_count,
        "engagement_filter": engagement_filter,
        "top_channels": top_channels_result,
        "other_count": total_count - listed_total,
        "view_distribution": view_distribution,
        "engagement_summary": {
            "total_watch": active_count,
            "total_view": passive_count,
            "total_all": len(all_watch_events),
        },
    }
def get_watch_patterns(token: str) -> dict:
    """
    Get watch time patterns.

    Returns:
    - hourly_distribution: Watches per hour (0-23)
    - daily_distribution: Watches per day of week (0-6, 0 = Monday)
    - peak_hour, peak_hour_count, peak_day, peak_day_count
      (peak_hour / peak_day are None when no watch falls in any hour / day)
    - weekly_peak_days: For each ISO week, the day with most watches
      (only the last 12 weeks are returned)
    - overall_peak_day, overall_peak_wins: Day that is the weekly peak most often
    - total_weeks: Number of ISO weeks with usable data
    - time_intervals, peak_interval: Watches grouped into four 6-hour periods
      (peak_interval is None when every period is empty)
    - circular_activity: Count and share of watches per hour, with 12-hour labels

    An {"error": ...} dict is returned when the session has no events.
    """
    from datetime import datetime

    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    watch_events = [e for e in events if e.get("type") == "watch"]
    day_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

    def hour_label(h):
        """Format an hour of day (0-23) as a 12-hour clock label."""
        if h == 0:
            return "12 AM"
        if h < 12:
            return f"{h} AM"
        if h == 12:
            return "12 PM"
        return f"{h - 12} PM"

    def busiest(entries):
        """Return the first entry with the highest count, or None if all are 0."""
        best = max(entries, key=lambda x: x["count"])
        return best if best["count"] > 0 else None

    # Count by hour and by day of week
    hourly = Counter(e.get("hour_local") for e in watch_events if e.get("hour_local") is not None)
    daily = Counter(e.get("day_of_week") for e in watch_events if e.get("day_of_week") is not None)

    # Build full distributions (fill missing with 0)
    hourly_dist = [{"hour": h, "count": hourly.get(h, 0)} for h in range(24)]
    daily_dist = [{"day": d, "count": daily.get(d, 0)} for d in range(7)]

    # Without any data there is no peak; reporting hour 0 / Monday would be
    # indistinguishable from a real midnight / Monday peak.
    peak_hour = busiest(hourly_dist)
    peak_day = busiest(daily_dist)

    # === Weekly Peak Day Analysis ===
    # Group events by ISO (year, week) and count watches per day of week.
    weekly_data = {}  # {(year, week): Counter({day_of_week: count})}
    for e in watch_events:
        ts = e.get("timestamp_local")
        dow = e.get("day_of_week")
        if not ts or dow is None:
            continue
        # The day number later indexes day_names, so anything outside 0-6
        # would either raise or silently wrap around.
        if not isinstance(dow, int) or not 0 <= dow <= 6:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Non-string or unparseable timestamp: skip just this event.
            continue
        iso_year, iso_week = dt.isocalendar()[:2]
        weekly_data.setdefault((iso_year, iso_week), Counter())[dow] += 1

    # Find peak day for each week
    weekly_peak_days = []
    peak_day_counter = Counter()  # Count how many weeks each day "wins"
    for (year, week), day_counts in sorted(weekly_data.items()):
        peak_dow = max(day_counts, key=day_counts.get)
        weekly_peak_days.append({
            "year": year,
            "week": week,
            "peak_day": day_names[peak_dow],
            "peak_day_num": peak_dow,
            "count": day_counts[peak_dow]
        })
        peak_day_counter[peak_dow] += 1

    # Overall winner: which day wins the most weeks
    overall_peak_day = None
    overall_peak_wins = 0
    if peak_day_counter:
        winner_dow = max(peak_day_counter, key=peak_day_counter.get)
        overall_peak_day = day_names[winner_dow]
        overall_peak_wins = peak_day_counter[winner_dow]

    # === Time Intervals ===
    intervals = {
        "Night (12AM-6AM)": range(0, 6),
        "Morning (6AM-12PM)": range(6, 12),
        "Afternoon (12PM-6PM)": range(12, 18),
        "Evening (6PM-12AM)": range(18, 24)
    }
    time_intervals = [
        {
            "interval": name,
            "count": sum(hourly.get(h, 0) for h in hours),
            "hours": f"{hours.start}-{hours.stop}"
        }
        for name, hours in intervals.items()
    ]
    peak_interval = busiest(time_intervals)

    # === Circular Activity (share of watches per hour) ===
    total_watches = sum(hourly.values())
    circular_activity = []
    for h in range(24):
        count = hourly.get(h, 0)
        circular_activity.append({
            "hour": h,
            "label": hour_label(h),
            "count": count,
            "percentage": round((count / total_watches) * 100, 2) if total_watches > 0 else 0
        })

    return {
        "hourly_distribution": hourly_dist,
        "daily_distribution": daily_dist,
        "peak_hour": peak_hour["hour"] if peak_hour else None,
        "peak_hour_count": peak_hour["count"] if peak_hour else 0,
        "peak_day": day_names[peak_day["day"]] if peak_day else None,
        "peak_day_count": peak_day["count"] if peak_day else 0,
        # New fields
        "weekly_peak_days": weekly_peak_days[-12:],  # Last 12 weeks
        "overall_peak_day": overall_peak_day,
        "overall_peak_wins": overall_peak_wins,
        "total_weeks": len(weekly_data),
        "time_intervals": time_intervals,
        "peak_interval": peak_interval["interval"] if peak_interval else None,
        "circular_activity": circular_activity
    }
def get_temporal_trends(token: str) -> dict:
    """
    Analyze how watching patterns change month-to-month.

    Returns:
    - monthly_stats: Watch count, peak hour, peak day for each month with data
    - peak_hour_trend: How peak hour shifts over months
    - peak_day_trend: How peak day shifts over months
    - activity_trend: Total watches per month
    - pattern_shifts: Up to 10 significant changes between adjacent tracked
      months (peak hour moved by 4+ hours on the 24-hour clock, or peak day
      moved between weekday and weekend)
    - total_months, summary

    An {"error": ...} dict is returned when the session has no events.
    """
    from collections import defaultdict
    from datetime import datetime

    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    watch_events = [e for e in events if e.get("type") == "watch"]
    if not watch_events:
        # Same keys as the regular result so callers can read it uniformly.
        return {
            "monthly_stats": [],
            "peak_hour_trend": [],
            "peak_day_trend": [],
            "activity_trend": [],
            "pattern_shifts": [],
            "total_months": 0,
            "summary": "No watch events found"
        }

    day_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    month_names = ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                   "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

    def in_range(value, upper):
        """True for an int in [0, upper); such values are safe to index and label."""
        return isinstance(value, int) and 0 <= value < upper

    def hour_label(h):
        """Format an hour of day (0-23) as a 12-hour clock label, or "N/A"."""
        if h is None:
            return "N/A"
        if h == 0:
            return "12 AM"
        if h < 12:
            return f"{h} AM"
        if h == 12:
            return "12 PM"
        return f"{h - 12} PM"

    # Group by month
    monthly_data = defaultdict(lambda: {
        "watches": 0,
        "hourly": Counter(),
        "daily": Counter()
    })

    for event in watch_events:
        ts = event.get("timestamp_local") or event.get("timestamp_utc")
        if not ts:
            continue
        try:
            if "T" in ts:
                dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
            else:
                dt = datetime.strptime(ts[:10], "%Y-%m-%d")
        except (AttributeError, TypeError, ValueError):
            # Non-string or unparseable timestamp: skip just this event.
            continue

        bucket = monthly_data[f"{dt.year}-{dt.month:02d}"]
        bucket["watches"] += 1
        hour = event.get("hour_local")
        dow = event.get("day_of_week")
        if in_range(hour, 24):
            bucket["hourly"][hour] += 1
        if in_range(dow, 7):
            bucket["daily"][dow] += 1

    # Build monthly stats
    monthly_stats = []
    peak_hour_trend = []
    peak_day_trend = []
    activity_trend = []

    for month_key in sorted(monthly_data):
        data = monthly_data[month_key]

        # Find peak hour
        peak_hour = None
        peak_hour_count = 0
        if data["hourly"]:
            peak_hour = max(data["hourly"], key=data["hourly"].get)
            peak_hour_count = data["hourly"][peak_hour]

        # Find peak day
        peak_day = None
        peak_day_name = None
        peak_day_count = 0
        if data["daily"]:
            peak_day = max(data["daily"], key=data["daily"].get)
            peak_day_name = day_names[peak_day]
            peak_day_count = data["daily"][peak_day]

        peak_hour_label = hour_label(peak_hour)

        year, month = month_key.split("-")
        month_label = f"{month_names[int(month)]} {year}"

        monthly_stats.append({
            "month": month_key,
            "month_label": month_label,
            "total_watches": data["watches"],
            "peak_hour": peak_hour,
            "peak_hour_label": peak_hour_label,
            "peak_hour_count": peak_hour_count,
            "peak_day": peak_day,
            "peak_day_name": peak_day_name,
            "peak_day_count": peak_day_count
        })

        # Trend data for charts
        peak_hour_trend.append({
            "month": month_key,
            "month_label": month_label,
            "peak_hour": peak_hour,
            "peak_hour_label": peak_hour_label
        })
        peak_day_trend.append({
            "month": month_key,
            "month_label": month_label,
            "peak_day": peak_day,
            "peak_day_name": peak_day_name
        })
        activity_trend.append({
            "month": month_key,
            "month_label": month_label,
            "watches": data["watches"]
        })

    # Detect significant shifts between adjacent tracked months
    shifts = []
    for prev, curr in zip(monthly_stats, monthly_stats[1:]):
        # Hour shift
        if prev["peak_hour"] is not None and curr["peak_hour"] is not None:
            # Hours wrap around midnight: 23 -> 1 is a 2-hour move, not 22.
            linear_diff = abs(curr["peak_hour"] - prev["peak_hour"])
            hour_diff = min(linear_diff, 24 - linear_diff)
            if hour_diff >= 4:  # Significant if 4+ hours shift
                shifts.append({
                    "type": "peak_hour",
                    "from_month": prev["month_label"],
                    "to_month": curr["month_label"],
                    "from_value": prev["peak_hour_label"],
                    "to_value": curr["peak_hour_label"],
                    "description": f"Peak hour shifted from {prev['peak_hour_label']} to {curr['peak_hour_label']}"
                })

        # Day shift: only a weekday<->weekend change counts
        if prev["peak_day"] is not None and curr["peak_day"] is not None:
            prev_weekend = prev["peak_day"] >= 5
            curr_weekend = curr["peak_day"] >= 5
            if prev_weekend != curr_weekend:
                shifts.append({
                    "type": "peak_day",
                    "from_month": prev["month_label"],
                    "to_month": curr["month_label"],
                    "from_value": prev["peak_day_name"],
                    "to_value": curr["peak_day_name"],
                    "description": f"Shifted from {'weekend' if prev_weekend else 'weekday'} to {'weekend' if curr_weekend else 'weekday'}"
                })

    # Generate summary
    if monthly_stats:
        first = monthly_stats[0]
        last = monthly_stats[-1]
        summary = f"Tracked {len(monthly_stats)} months from {first['month_label']} to {last['month_label']}"
        if shifts:
            summary += f" | {len(shifts)} significant pattern shifts detected"
    else:
        summary = "No monthly data available"

    return {
        "monthly_stats": monthly_stats,
        "peak_hour_trend": peak_hour_trend,
        "peak_day_trend": peak_day_trend,
        "activity_trend": activity_trend,
        "pattern_shifts": shifts[:10],  # First 10 shifts in chronological order
        "total_months": len(monthly_stats),
        "summary": summary
    }
def get_search_analytics(token: str, top_n: int = 20) -> dict:
    """
    Summarize the search events of a session.

    Args:
        token: Session token
        top_n: How many of the most frequent search terms to list

    Returns:
    - total_searches: number of search events
    - unique_searches: number of distinct text_clean values
    - top_searches: dicts with term (first raw spelling seen), term_clean, count
    - language_breakdown: search events per language_type

    An {"error": ...} dict is returned when the session has no events.
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    search_events = [e for e in events if e.get("type") == "search"]

    search_counts = Counter()
    lang_counts = Counter()
    term_display = {}  # cleaned term -> first raw text seen for it

    for entry in search_events:
        cleaned = entry.get("text_clean")
        if cleaned:
            search_counts[cleaned] += 1
            raw_text = entry.get("text_raw")
            if raw_text:
                term_display.setdefault(cleaned, raw_text)

        language = entry.get("language_type")
        if language:
            lang_counts[language] += 1

    top_searches = [
        {
            "term": term_display.get(cleaned, cleaned),
            "term_clean": cleaned,
            "count": hits,
        }
        for cleaned, hits in search_counts.most_common(top_n)
    ]

    return {
        "total_searches": len(search_events),
        "unique_searches": len(search_counts),
        "top_searches": top_searches,
        "language_breakdown": dict(lang_counts),
    }
def get_subscription_overlap(token: str) -> dict:
    """
    Analyze overlap between subscriptions and watch history.

    Channels are compared by their channel_clean value.

    Returns:
    - total_subscriptions, total_watched_channels
    - subscribed_and_watched: Channels you're subscribed to AND watched
      (count, percentage of subscriptions, channel names)
    - watched_not_subscribed: Channels you watch but aren't subscribed to
    - subscribed_not_watched: Channels you're subscribed to but haven't watched

    Each "channels" list holds at most 20 display names, taken in sorted order
    of the cleaned channel name so repeated calls return the same sample.

    An {"error": ...} dict is returned when the session has no events.
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    max_listed = 20

    subscribed = {
        e.get("channel_clean") for e in events
        if e.get("type") == "subscribe" and e.get("channel_clean")
    }
    watched = {
        e.get("channel_clean") for e in events
        if e.get("type") == "watch" and e.get("channel_clean")
    }

    # Calculate overlaps
    subscribed_and_watched = subscribed & watched
    watched_not_subscribed = watched - subscribed
    subscribed_not_watched = subscribed - watched

    # Display names: the original spelling from the last event seen per channel
    channel_display = {}
    for e in events:
        clean = e.get("channel_clean")
        original = e.get("channel")
        if clean and original:
            channel_display[clean] = original

    def sample(names):
        """Return display names for the first `max_listed` channels, sorted."""
        # Set iteration order is not stable across runs; sort before slicing.
        return [channel_display.get(c, c) for c in sorted(names, key=str)[:max_listed]]

    return {
        "total_subscriptions": len(subscribed),
        "total_watched_channels": len(watched),
        "subscribed_and_watched": {
            "count": len(subscribed_and_watched),
            "percentage": round(len(subscribed_and_watched) / len(subscribed) * 100, 1) if subscribed else 0,
            "channels": sample(subscribed_and_watched)
        },
        "watched_not_subscribed": {
            "count": len(watched_not_subscribed),
            "channels": sample(watched_not_subscribed)
        },
        "subscribed_not_watched": {
            "count": len(subscribed_not_watched),
            "channels": sample(subscribed_not_watched)
        }
    }
def get_behavior_anomalies(token: str) -> dict:
    """
    Detect deviations from normal watching patterns.

    Identifies:
    - Late night sessions (days with 3+ watches in hours 0-4)
    - Binge days (10+ watches and above mean + 2 standard deviations)
    - Streaks of binge days / late night days (one missing day is tolerated)
    - Weeks whose late-night share exceeds 1.5x the overall baseline
    - General patterns: weekend share, chronotype, inactive periods

    Returns:
        {"error": ...} when the session has no events; a reduced dict with
        empty "anomalies", "late_night_sessions" and "binge_days" when it has
        no watch events; otherwise a dict with baseline, anomalies,
        late_night_sessions, binge_days, unusual_weeks,
        binge_watching_periods, late_night_moods and patterns.
    """
    from datetime import datetime

    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    watch_events = [e for e in events if e.get("type") == "watch"]
    if not watch_events:
        return {"anomalies": [], "late_night_sessions": [], "binge_days": []}

    # === Calculate baseline patterns ===
    hourly_counts = Counter(e.get("hour_local") for e in watch_events if e.get("hour_local") is not None)
    total_watches = sum(hourly_counts.values())

    # Define "late night" as 12AM-5AM (hours 0-4)
    late_night_hours = {0, 1, 2, 3, 4}
    late_night_baseline = sum(hourly_counts.get(h, 0) for h in late_night_hours)
    late_night_percentage = (late_night_baseline / total_watches * 100) if total_watches > 0 else 0

    # === Group by date and by ISO week ===
    # Both groupings need the same parsed timestamp, so build them in one pass.
    daily_data = {}       # {date_str: {"hours": Counter, "total": int}}
    weekly_patterns = {}  # {(year, week): Counter of hours}
    for e in watch_events:
        ts = e.get("timestamp_local")
        hour = e.get("hour_local")
        if not ts or hour is None:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Non-string or unparseable timestamp: skip just this event.
            continue

        date_str = dt.strftime("%Y-%m-%d")
        day_entry = daily_data.setdefault(date_str, {"hours": Counter(), "total": 0})
        day_entry["hours"][hour] += 1
        day_entry["total"] += 1

        iso_year, iso_week = dt.isocalendar()[:2]
        weekly_patterns.setdefault((iso_year, iso_week), Counter())[hour] += 1

    # Mean and (population) standard deviation of watches per active day
    if daily_data:
        day_totals = [d["total"] for d in daily_data.values()]
        avg_daily_watches = sum(day_totals) / len(day_totals)
        std_dev = (sum((t - avg_daily_watches) ** 2 for t in day_totals) / len(day_totals)) ** 0.5
    else:
        avg_daily_watches = 0
        std_dev = 0

    # === Detect Late Night Sessions ===
    late_night_sessions = []
    for date_str, data in sorted(daily_data.items()):
        late_night_count = sum(data["hours"].get(h, 0) for h in late_night_hours)
        if late_night_count >= 3:  # At least 3 videos in late night
            late_night_sessions.append({
                "date": date_str,
                "late_night_count": late_night_count,
                "total_count": data["total"],
                "peak_hour": max(data["hours"], key=data["hours"].get) if data["hours"] else None
            })

    # === Detect Binge Days ===
    # Days with watch count > mean + 2*std_dev (or 2x the mean when every day
    # has the same count and the deviation is therefore 0)
    binge_threshold = avg_daily_watches + 2 * std_dev if std_dev > 0 else avg_daily_watches * 2
    binge_days = []
    for date_str, data in sorted(daily_data.items()):
        if data["total"] > binge_threshold and data["total"] >= 10:  # At least 10 videos
            binge_days.append({
                "date": date_str,
                "count": data["total"],
                "above_average_by": round(data["total"] - avg_daily_watches, 1),
                "multiplier": round(data["total"] / avg_daily_watches, 2) if avg_daily_watches > 0 else 0
            })

    # === Detect Weekly Pattern Shifts ===
    # Weeks with unusually high late-night activity compared to the baseline
    unusual_weeks = []
    for (year, week), hour_counts in sorted(weekly_patterns.items()):
        week_total = sum(hour_counts.values())
        week_late_night = sum(hour_counts.get(h, 0) for h in late_night_hours)
        week_late_pct = (week_late_night / week_total * 100) if week_total > 0 else 0
        if week_late_pct > late_night_percentage * 1.5 and week_late_night >= 5:
            unusual_weeks.append({
                "year": year,
                "week": week,
                "late_night_count": week_late_night,
                "late_night_percentage": round(week_late_pct, 1),
                "baseline_percentage": round(late_night_percentage, 1),
                "total_watches": week_total
            })

    # === Detect Streaks (Consecutive Days) ===
    def find_streaks(date_list, max_gap=1):
        """Group dates into runs of 2+ where at most max_gap days are skipped between neighbours."""
        if not date_list:
            return []
        sorted_dates = sorted(datetime.strptime(d, "%Y-%m-%d") for d in date_list)
        streaks = []
        current_streak = [sorted_dates[0]]
        for previous, current in zip(sorted_dates, sorted_dates[1:]):
            # A difference of max_gap + 1 days means max_gap days were skipped.
            if (current - previous).days <= max_gap + 1:
                current_streak.append(current)
            else:
                if len(current_streak) >= 2:
                    streaks.append(current_streak)
                current_streak = [current]
        if len(current_streak) >= 2:
            streaks.append(current_streak)
        return streaks

    # Find binge streaks (consecutive binge days, allowing 1 day gap)
    binge_streaks = find_streaks([d["date"] for d in binge_days], max_gap=1)
    binge_watching_periods = []
    for streak in binge_streaks:
        start = streak[0].strftime("%Y-%m-%d")
        end = streak[-1].strftime("%Y-%m-%d")
        total_videos = sum(d["count"] for d in binge_days if start <= d["date"] <= end)
        binge_watching_periods.append({
            "start_date": start,
            "end_date": end,
            "duration_days": (streak[-1] - streak[0]).days + 1,
            "total_videos": total_videos,
            "avg_per_day": round(total_videos / len(streak), 1)
        })

    # Find late night mood (consecutive late night sessions, allowing 1 day gap)
    late_night_streaks = find_streaks([s["date"] for s in late_night_sessions], max_gap=1)
    late_night_moods = []
    for streak in late_night_streaks:
        start = streak[0].strftime("%Y-%m-%d")
        end = streak[-1].strftime("%Y-%m-%d")
        total_late = sum(s["late_night_count"] for s in late_night_sessions if start <= s["date"] <= end)
        late_night_moods.append({
            "start_date": start,
            "end_date": end,
            "duration_days": (streak[-1] - streak[0]).days + 1,
            "total_late_videos": total_late
        })

    # === Additional Pattern Analysis ===
    # 1. Weekend Warrior Detection
    weekend_watches = 0
    weekday_watches = 0
    for e in watch_events:
        dow = e.get("day_of_week")
        if dow is None:
            continue
        if dow >= 5:  # Saturday=5, Sunday=6
            weekend_watches += 1
        else:
            weekday_watches += 1
    total_dow = weekend_watches + weekday_watches
    weekend_pct = (weekend_watches / total_dow * 100) if total_dow > 0 else 0
    # If weekend (2 days) has > 35% of watches, that's weekend warrior territory
    is_weekend_warrior = weekend_pct > 35

    # 2. Night Owl vs Morning Person
    night_hours = {20, 21, 22, 23, 0, 1, 2, 3, 4}  # 8PM - 5AM
    morning_hours = {5, 6, 7, 8, 9, 10, 11}  # 5AM - 12PM
    night_count = sum(hourly_counts.get(h, 0) for h in night_hours)
    morning_count = sum(hourly_counts.get(h, 0) for h in morning_hours)
    if night_count > morning_count * 1.5:
        chronotype = "Night Owl"
    elif morning_count > night_count * 1.5:
        chronotype = "Early Bird"
    else:
        chronotype = "Balanced"

    # 3. Inactive Periods (gaps of 3+ days with no watching)
    active_dates = sorted(daily_data)
    inactive_periods = []
    for earlier, later in zip(active_dates, active_dates[1:]):
        gap = (datetime.strptime(later, "%Y-%m-%d") - datetime.strptime(earlier, "%Y-%m-%d")).days
        if gap >= 4:  # 3+ days with nothing in between
            inactive_periods.append({
                "start": earlier,
                "end": later,
                "gap_days": gap - 1
            })

    # === Build anomaly summary ===
    anomalies = []

    # Longest binge watching periods
    for period in sorted(binge_watching_periods, key=lambda x: x["duration_days"], reverse=True)[:3]:
        anomalies.append({
            "type": "binge_streak",
            "date": f"{period['start_date']} to {period['end_date']}",
            "description": f"Binge watching period: {period['duration_days']} days, {period['total_videos']} videos",
            "severity": "high" if period["duration_days"] >= 5 else "medium"
        })

    # Longest late night moods
    for mood in sorted(late_night_moods, key=lambda x: x["duration_days"], reverse=True)[:3]:
        anomalies.append({
            "type": "late_night_mood",
            "date": f"{mood['start_date']} to {mood['end_date']}",
            "description": f"Late night mood: {mood['duration_days']} consecutive nights",
            "severity": "high" if mood["duration_days"] >= 4 else "medium"
        })

    # Top single-day anomalies
    for session in sorted(late_night_sessions, key=lambda x: x["late_night_count"], reverse=True)[:3]:
        anomalies.append({
            "type": "late_night",
            "date": session["date"],
            "description": f"Watched {session['late_night_count']} videos after midnight",
            "severity": "high" if session["late_night_count"] >= 10 else "medium"
        })
    for day in sorted(binge_days, key=lambda x: x["count"], reverse=True)[:3]:
        anomalies.append({
            "type": "binge",
            "date": day["date"],
            "description": f"Watched {day['count']} videos ({day['multiplier']}x above average)",
            "severity": "high" if day["multiplier"] >= 3 else "medium"
        })

    return {
        "baseline": {
            "avg_daily_watches": round(avg_daily_watches, 1),
            "std_dev": round(std_dev, 1),
            "late_night_baseline_pct": round(late_night_percentage, 1),
            "total_days": len(daily_data)
        },
        "anomalies": anomalies[:12],  # At most 12 anomalies
        "late_night_sessions": late_night_sessions[-20:],
        "binge_days": binge_days[-20:],
        "unusual_weeks": unusual_weeks[-10:],
        # New streak data
        "binge_watching_periods": binge_watching_periods,
        "late_night_moods": late_night_moods,
        # Behavior patterns
        "patterns": {
            "weekend_warrior": is_weekend_warrior,
            "weekend_pct": round(weekend_pct, 1),
            "chronotype": chronotype,
            "night_watches": night_count,
            "morning_watches": morning_count,
            "inactive_periods": inactive_periods[-5:]  # Last 5
        }
    }
def get_habit_formation(token: str, min_streak_days: int = 3) -> dict:
    """
    Detect habit formation patterns.

    Identifies:
    - Channels watched daily for consecutive days
    - Videos watched multiple times on different days
    - Content patterns (micro topics) that recur on consecutive days

    Videos are identified by their title ("text_clean", falling back to
    "text_raw") because events carry no video id. Topics are only considered
    when they appear on at least 5 distinct days.

    Args:
        token: Session token
        min_streak_days: Minimum consecutive days to count as a habit (default: 3)

    Returns:
        - {"error": ...} when the session is missing or empty
        - otherwise a dict with:
            - channel_habits: Channels with daily watching streaks (all of them)
            - video_habits: Videos watched on multiple days (top 30)
            - content_habits: Topics watched on consecutive days (top 30)
            - habit_strength: Overall habit formation score (0-100)
            - total_channels_with_habits, total_videos_rewatched,
              total_topics_with_habits, max_streak_days, summary
    """
    from collections import defaultdict
    from datetime import datetime

    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    # Get watch events with dates
    watch_events = [e for e in events if e.get("type") == "watch"]
    if not watch_events:
        return {
            "channel_habits": [],
            "video_habits": [],
            "content_habits": [],
            "habit_strength": 0,
            "summary": "No watch events found"
        }

    date_by_channel = defaultdict(set)  # channel -> set of dates
    # Track videos by date (using the title as identifier since we don't have video_id)
    date_by_video = defaultdict(set)  # video title -> set of dates
    video_info = {}  # video title -> {channel, first_seen, total_watches}
    date_by_topic = defaultdict(set)  # topic -> set of dates

    for event in watch_events:
        ts = event.get("timestamp_local") or event.get("timestamp_utc")
        # Only string timestamps can be sliced into a date; anything else is skipped.
        if not ts or not isinstance(ts, str):
            continue

        # Extract the calendar date part ("YYYY-MM-DD")
        date_str = ts.split("T")[0] if "T" in ts else ts[:10]

        # Track channel
        channel = event.get("channel_clean")
        if channel:
            date_by_channel[channel].add(date_str)

        # Track video (using the title as identifier)
        video_title = event.get("text_clean") or event.get("text_raw")
        if video_title and len(video_title) > 5:  # Skip very short titles
            date_by_video[video_title].add(date_str)
            info = video_info.setdefault(video_title, {
                "channel": channel or "Unknown",
                "first_seen": date_str,  # date of the first event encountered
                "total_watches": 0
            })
            info["total_watches"] += 1

        # Track micro_topics if available; `or []` also covers an explicit null
        for topic in event.get("micro_topics") or []:
            date_by_topic[topic].add(date_str)

    def find_daily_streaks(dates_set: set, min_days: int) -> list:
        """Find runs of consecutive calendar days of length >= min_days.

        Date strings that do not parse as "%Y-%m-%d" are ignored, so one
        malformed value cannot discard an otherwise valid streak.
        """
        label_by_day = {}  # parsed date -> original date string
        for label in sorted(dates_set):
            try:
                day = datetime.strptime(label, "%Y-%m-%d").date()
            except (TypeError, ValueError):
                continue
            label_by_day.setdefault(day, label)

        if not label_by_day or len(label_by_day) < min_days:
            return []

        days = sorted(label_by_day)
        streaks = []
        run_start = 0
        # Walk one past the end so the final run is closed by the same code path.
        for i in range(1, len(days) + 1):
            if i < len(days) and (days[i] - days[i - 1]).days == 1:
                continue
            run_length = i - run_start
            if run_length >= min_days:
                streaks.append({
                    "start": label_by_day[days[run_start]],
                    "end": label_by_day[days[i - 1]],
                    "days": run_length
                })
            run_start = i
        return streaks

    # Find channel habits
    channel_habits = []
    for channel, dates in date_by_channel.items():
        streaks = find_daily_streaks(dates, min_streak_days)
        if not streaks:
            continue
        total_habit_days = sum(s["days"] for s in streaks)
        longest_streak = max(s["days"] for s in streaks)
        channel_habits.append({
            "channel": channel,
            "total_days_watched": len(dates),
            "habit_streaks": streaks,
            "longest_streak": longest_streak,
            "total_habit_days": total_habit_days,
            "habit_score": min(100, total_habit_days * 10)  # Score 0-100
        })

    # Sort by longest streak, then total habit days
    channel_habits.sort(key=lambda x: (x["longest_streak"], x["total_habit_days"]), reverse=True)

    # Find video habits (videos watched on at least 2 different days)
    video_habits = []
    for video_title, dates in date_by_video.items():
        if len(dates) < 2:
            continue
        streaks = find_daily_streaks(dates, min_streak_days)
        info = video_info.get(video_title, {})
        video_habits.append({
            "title": video_title[:80] + "..." if len(video_title) > 80 else video_title,
            "channel": info.get("channel", "Unknown"),
            "days_watched": len(dates),
            "total_watches": info.get("total_watches", 0),
            "first_seen": info.get("first_seen", ""),
            "has_streak": len(streaks) > 0,
            "longest_streak": max((s["days"] for s in streaks), default=0)
        })

    # Sort video habits by days watched, then total watches
    video_habits.sort(key=lambda x: (x["days_watched"], x["total_watches"]), reverse=True)

    # Find content/topic habits (only topics seen on >= 5 distinct days)
    content_habits = []
    for topic, dates in date_by_topic.items():
        if len(dates) < 5:  # Skip rare topics
            continue
        streaks = find_daily_streaks(dates, min_streak_days)
        if not streaks:
            continue
        content_habits.append({
            "topic": topic,
            "total_days": len(dates),
            "habit_streaks": streaks,
            "longest_streak": max(s["days"] for s in streaks),
            "total_habit_days": sum(s["days"] for s in streaks)
        })

    # Sort content habits
    content_habits.sort(key=lambda x: (x["longest_streak"], x["total_habit_days"]), reverse=True)

    # Calculate overall habit strength
    total_channels_with_habits = len(channel_habits)
    max_channel_streak = max((h["longest_streak"] for h in channel_habits), default=0)

    # Habit strength: 0-100 score
    habit_strength = 0
    if total_channels_with_habits > 0:
        # Factors: number of habitual channels, longest streak, habit days of the top 5
        habit_strength = min(100, (
            total_channels_with_habits * 10 +
            max_channel_streak * 5 +
            sum(h["total_habit_days"] for h in channel_habits[:5])
        ))

    # Generate habit summary
    summary_parts = []
    if channel_habits:
        top_habit = channel_habits[0]
        summary_parts.append(
            f"Strongest habit: {top_habit['channel']} watched {top_habit['longest_streak']} days in a row"
        )
    if total_channels_with_habits > 1:
        summary_parts.append(f"{total_channels_with_habits} channels with daily habits")
    if video_habits:
        summary_parts.append(f"{len(video_habits)} rewatched videos")
    if content_habits:
        summary_parts.append(f"{len(content_habits)} recurring topics")

    return {
        "channel_habits": channel_habits,  # Return ALL channel habits
        "video_habits": video_habits[:30],  # Top 30 rewatched videos
        "content_habits": content_habits[:30],  # Top 30 topics
        "habit_strength": habit_strength,
        "total_channels_with_habits": total_channels_with_habits,
        "total_videos_rewatched": len(video_habits),
        "total_topics_with_habits": len(content_habits),
        "max_streak_days": max_channel_streak,
        "summary": " | ".join(summary_parts) if summary_parts else "No strong habits detected"
    }
def get_time_spent(token: str, break_threshold_minutes: int = 60, last_video_minutes: int = 5) -> dict:
    """
    Calculate approximate time spent on YouTube.

    Uses session detection: groups continuous watching periods separated by
    significant breaks (>break_threshold_minutes). A session's duration is the
    span between its first and last watch event plus `last_video_minutes`,
    since the length of the final video is unknown.

    Timestamps are read from "timestamp_local", falling back to
    "timestamp_utc". Both ISO "YYYY-MM-DDTHH:MM:SS" (optionally with a "Z" or
    numeric offset) and "YYYY-MM-DD HH:MM:SS" are accepted; events whose
    timestamp cannot be parsed are skipped.

    Args:
        token: Session token
        break_threshold_minutes: Gap that ends a session (default 60 min)
        last_video_minutes: Estimate for last video duration (default 5 min)

    Returns:
        - {"error": ...} when the session is missing or empty
        - otherwise:
            - total_minutes, total_hours
            - average_daily_minutes, total_days
            - sessions stats (total_count, average_duration_minutes,
              longest_session_minutes)
            - summary: human-readable one-liner
    """
    from datetime import datetime, timedelta, timezone

    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    # Collect parsed timestamps of watch events
    timestamps = []
    for e in events:
        if e.get("type") != "watch":
            continue
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts or not isinstance(ts, str):
            continue
        try:
            if "T" in ts:
                # Older fromisoformat() rejects a trailing "Z"; spell it as an offset.
                dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
            else:
                dt = datetime.strptime(ts[:19], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            continue
        # Naive and aware datetimes cannot be compared or subtracted. Tagging
        # naive values as UTC keeps their wall-clock date and mutual gaps
        # unchanged while letting mixed inputs be sorted together.
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        timestamps.append(dt)

    if not timestamps:
        return {
            "total_minutes": 0,
            "total_hours": 0,
            "average_daily_minutes": 0,
            "total_days": 0,
            "sessions": {
                "total_count": 0,
                "average_duration_minutes": 0,
                "longest_session_minutes": 0
            },
            "summary": "No watch events with timestamps"
        }

    timestamps.sort()

    # Detect sessions: a gap larger than the threshold closes the current one
    break_threshold = timedelta(minutes=break_threshold_minutes)
    session_durations = []
    session_start = timestamps[0]
    for previous, current in zip(timestamps, timestamps[1:]):
        if current - previous > break_threshold:
            session_durations.append(
                (previous - session_start).total_seconds() / 60.0 + last_video_minutes
            )
            session_start = current
    # Don't forget the last session
    session_durations.append(
        (timestamps[-1] - session_start).total_seconds() / 60.0 + last_video_minutes
    )

    # Calculate totals
    total_minutes = sum(session_durations)
    total_hours = round(total_minutes / 60, 1)

    # Unique calendar days with at least one watch event
    total_days = len({dt.date() for dt in timestamps})
    average_daily = round(total_minutes / total_days, 1) if total_days > 0 else 0

    # Session stats
    session_count = len(session_durations)
    avg_session = round(total_minutes / session_count, 1)
    longest_session = round(max(session_durations), 1)

    # Generate summary
    if total_hours >= 24:
        time_str = f"{int(total_hours // 24)} days {int(total_hours % 24)} hours"
    else:
        time_str = f"{total_hours} hours"
    summary = f"Spent approximately {time_str} on YouTube across {total_days} days ({session_count} sessions)"

    return {
        "total_minutes": round(total_minutes, 1),
        "total_hours": total_hours,
        "average_daily_minutes": average_daily,
        "total_days": total_days,
        "sessions": {
            "total_count": session_count,
            "average_duration_minutes": avg_session,
            "longest_session_minutes": longest_session
        },
        "summary": summary
    }
def get_channel_distribution(token: str) -> dict:
    """
    Get channel distribution by view count bins.

    Each channel is placed in a bin according to how many watch events it has
    over the whole session; the monthly breakdown then counts watch events per
    month under the bin of the channel they belong to.

    Args:
        token: Session token

    Returns:
        - {"error": ...} when the session is missing or empty
        - otherwise:
            - bin_distribution: Channels grouped by view count
              [1, 2-5, 6-10, 11-20, 21-50, 51-100, 100+], each with
              channel_count and video_count
            - temporal_by_bin: Monthly breakdown of videos watched per bin,
              sorted by month ("YYYY-MM")
            - stats: total_channels, total_videos, single_view_channels,
              single_view_percentage
    """
    from collections import defaultdict
    from datetime import datetime

    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    # Get watch events
    watch_events = [e for e in events if e.get("type") == "watch"]

    # Count views per channel
    channel_counts = Counter(
        e.get("channel_clean") for e in watch_events
        if e.get("channel_clean")
    )

    # Define bins as (inclusive lower bound, inclusive upper bound, label)
    bins = [
        (1, 1, "1"),
        (2, 5, "2-5"),
        (6, 10, "6-10"),
        (11, 20, "11-20"),
        (21, 50, "21-50"),
        (51, 100, "51-100"),
        (101, float('inf'), "100+")
    ]

    # Count channels per bin
    bin_distribution = []
    channel_bin_map = {}  # Map channel -> bin label
    for min_val, max_val, label in bins:
        channels_in_bin = [
            ch for ch, count in channel_counts.items()
            if min_val <= count <= max_val
        ]
        bin_distribution.append({
            "bin": label,
            "channel_count": len(channels_in_bin),
            "video_count": sum(channel_counts[ch] for ch in channels_in_bin)
        })
        for ch in channels_in_bin:
            channel_bin_map[ch] = label

    # Temporal breakdown by bin and month
    monthly_data = defaultdict(lambda: defaultdict(int))  # {month: {bin: count}}
    for event in watch_events:
        channel = event.get("channel_clean")
        ts = event.get("timestamp_local") or event.get("timestamp_utc")
        if not channel or not ts or not isinstance(ts, str):
            continue
        try:
            # Older fromisoformat() rejects a trailing "Z"; spell it as an offset.
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except ValueError:
            # Unparseable timestamp: leave this event out of the monthly view.
            continue
        bin_label = channel_bin_map.get(channel, "1")  # Default to "1"
        monthly_data[f"{dt.year}-{dt.month:02d}"][bin_label] += 1

    # Convert to sorted list, emitting every bin for every month (0 when absent)
    temporal_by_bin = []
    for month in sorted(monthly_data):
        month_counts = monthly_data[month]
        temporal_by_bin.append({
            "month": month,
            "bins": {label: month_counts.get(label, 0) for _, _, label in bins}
        })

    # Summary stats
    total_channels = len(channel_counts)
    total_videos = sum(channel_counts.values())
    single_view_channels = sum(1 for count in channel_counts.values() if count == 1)

    return {
        "bin_distribution": bin_distribution,
        "temporal_by_bin": temporal_by_bin,
        "stats": {
            "total_channels": total_channels,
            "total_videos": total_videos,
            "single_view_channels": single_view_channels,
            "single_view_percentage": round(single_view_channels / total_channels * 100, 1) if total_channels > 0 else 0
        }
    }
def get_full_analytics(token: str) -> dict:
    """
    Bundle every analytics report for a session into a single dict.

    Each report is produced by its dedicated function, called with default
    options and the same token, in the order the keys appear below.

    Args:
        token: Session token

    Returns:
        Dict keyed by report name: summary, channels, watch_patterns,
        searches, subscription_overlap, behavior_anomalies, habit_formation,
        temporal_trends, time_spent, channel_distribution.
    """
    report = {}
    report["summary"] = get_session_summary(token)
    report["channels"] = get_channel_analytics(token)
    report["watch_patterns"] = get_watch_patterns(token)
    report["searches"] = get_search_analytics(token)
    report["subscription_overlap"] = get_subscription_overlap(token)
    report["behavior_anomalies"] = get_behavior_anomalies(token)
    report["habit_formation"] = get_habit_formation(token)
    report["temporal_trends"] = get_temporal_trends(token)
    report["time_spent"] = get_time_spent(token)
    report["channel_distribution"] = get_channel_distribution(token)
    return report