Spaces:
Sleeping
Sleeping
| """ | |
| Wrapped Service | |
| Generates card JSON for YouTube Wrapped from events (in-memory, no storage). | |
| Based on cards_doc.md specification. | |
| """ | |
| from collections import Counter, defaultdict | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional, Tuple | |
| import re | |
def generate_wrapped_cards(events: List[Dict], stats: Dict) -> Dict:
    """
    Assemble every Wrapped card from the preprocessed event stream.

    Events are bucketed by their "type" field ("watch", "search",
    "subscribe") and each card generator receives the bucket(s) it needs.

    Args:
        events: List of preprocessed event dictionaries
        stats: Stats dictionary from preprocessing

    Returns:
        Dictionary keyed by card name plus a "metadata" entry, or
        {"error": "No watch events found"} when no watch events exist.
    """
    def of_type(kind: str) -> List[Dict]:
        return [event for event in events if event.get("type") == kind]

    watch_events = of_type("watch")
    search_events = of_type("search")
    subscribe_events = of_type("subscribe")

    # Without any watch history there is nothing to build.
    if not watch_events:
        return {"error": "No watch events found"}

    cards = {}
    cards["intro"] = generate_intro_card(stats)
    cards["stats_overview"] = generate_stats_overview(events, stats, watch_events)
    cards["time_spent"] = generate_time_spent_card(watch_events)
    cards["peak_month"] = generate_peak_month_card(watch_events)
    cards["top_channel"] = generate_top_channel_card(watch_events)
    cards["top_channels"] = generate_top_channels_card(watch_events)
    cards["watch_cycle"] = generate_watch_cycle_card(watch_events)
    cards["peak_day"] = generate_peak_day_card(watch_events)
    cards["longest_streak"] = generate_longest_streak_card(watch_events)
    cards["personality"] = generate_personality_card()  # Hardcoded for now
    cards["binge_sessions"] = generate_binge_sessions_card(watch_events)
    cards["late_night"] = generate_late_night_card(watch_events)
    cards["habits"] = generate_habits_card(watch_events)
    cards["patterns"] = generate_patterns_card(watch_events)  # Association rule patterns
    cards["rewatched"] = generate_rewatched_card(watch_events)
    cards["subscriptions"] = generate_subscriptions_card(watch_events, subscribe_events)
    cards["searches"] = generate_searches_card(search_events)
    cards["first_last"] = generate_first_last_card(watch_events)
    cards["metadata"] = {
        "generated_at": datetime.now().isoformat(),
        "version": "1.0",
        "total_events": len(events),
        "total_watch": len(watch_events),
    }
    return cards
| # ============================================ | |
| # INTRO CARD | |
| # ============================================ | |
def generate_intro_card(stats: Dict) -> Dict:
    """Build the intro card: a placeholder username and the current calendar year.

    ``stats`` is accepted for interface symmetry but is not read.
    """
    current_year = datetime.now().year
    # Username could be extracted from data if available.
    return {"username": "there", "year": current_year}
| # ============================================ | |
| # STATS OVERVIEW CARD | |
| # ============================================ | |
def generate_stats_overview(events: List[Dict], stats: Dict, watch_events: List[Dict]) -> Dict:
    """Generate stats overview card data.

    Args:
        events: All preprocessed events (not read by this function).
        stats: Stats dictionary from preprocessing (not read by this function).
        watch_events: Watch-type events to summarise.

    Returns:
        Dictionary with the number of videos watched, distinct non-empty
        ``channel_clean`` values, distinct calendar days with at least one
        parseable timestamp, and the session count (60 minute break threshold).
    """
    # Unique channels
    channels = {e.get("channel_clean") for e in watch_events if e.get("channel_clean")}

    # Active days
    active_days = set()
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: skip timestamps that are not ISO-8601 strings,
            # without hiding unrelated errors the way a bare except would.
            continue
        active_days.add(dt.date())

    # Sessions (using 60 min break threshold)
    sessions = count_sessions(watch_events, break_threshold_minutes=60)

    return {
        "videos_watched": len(watch_events),
        "channels_explored": len(channels),
        "active_days": len(active_days),
        "total_sessions": sessions,
    }
| # ============================================ | |
| # TIME SPENT CARD | |
| # ============================================ | |
def generate_time_spent_card(watch_events: List[Dict]) -> Dict:
    """Return the time-spent card: the hour, minute and daily-average figures
    picked out of ``compute_time_spent``'s result."""
    summary = compute_time_spent(watch_events)
    card_fields = ("total_hours", "total_minutes", "avg_daily_minutes")
    return {field: summary[field] for field in card_fields}
def compute_time_spent(watch_events: List[Dict],
                       break_threshold_minutes: int = 60,
                       last_video_minutes: int = 5) -> Dict:
    """Compute approximate time spent on YouTube.

    Watch timestamps are sorted and split into sessions wherever two
    consecutive events are more than ``break_threshold_minutes`` apart. Each
    session lasts from its first to its last event plus ``last_video_minutes``
    to account for the final video.

    Args:
        watch_events: Events carrying an ISO-8601 ``timestamp_local`` or
            ``timestamp_utc`` string; events without a parseable one are skipped.
        break_threshold_minutes: Gap (in minutes) that ends a session.
        last_video_minutes: Minutes credited for the last video of a session.

    Returns:
        Dictionary with ``total_hours``, ``total_minutes``,
        ``avg_daily_minutes``, ``total_days`` and ``session_count``. All are 0
        when no timestamp could be parsed.
    """
    # Parse timestamps
    timed_events = []
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            timed_events.append(datetime.fromisoformat(ts.replace('Z', '+00:00')))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue

    if not timed_events:
        # Same keys as the populated result so callers can rely on the shape.
        return {
            "total_hours": 0,
            "total_minutes": 0,
            "avg_daily_minutes": 0,
            "total_days": 0,
            "session_count": 0,
        }

    timed_events.sort()

    # Detect sessions
    break_threshold = timedelta(minutes=break_threshold_minutes)
    sessions = []
    session_start = timed_events[0]
    for previous, current in zip(timed_events, timed_events[1:]):
        if current - previous > break_threshold:
            # `previous` was the last event of the session that just ended.
            sessions.append((previous - session_start).total_seconds() / 60.0 + last_video_minutes)
            session_start = current
    # Last session
    sessions.append((timed_events[-1] - session_start).total_seconds() / 60.0 + last_video_minutes)

    total_minutes = sum(sessions)
    total_hours = round(total_minutes / 60, 1)

    # Unique days
    unique_days = {dt.date() for dt in timed_events}
    avg_daily = round(total_minutes / len(unique_days), 1)

    return {
        "total_hours": total_hours,
        "total_minutes": round(total_minutes, 0),
        "avg_daily_minutes": avg_daily,
        "total_days": len(unique_days),
        "session_count": len(sessions),
    }
def count_sessions(watch_events: List[Dict], break_threshold_minutes: int = 60) -> int:
    """Count number of watch sessions.

    A new session starts whenever two consecutive (time-sorted) events are
    more than ``break_threshold_minutes`` apart.

    Args:
        watch_events: Events carrying an ISO-8601 ``timestamp_local`` or
            ``timestamp_utc`` string; events without a parseable one are skipped.
        break_threshold_minutes: Gap (in minutes) that separates sessions.

    Returns:
        Number of sessions, or 0 when no timestamp could be parsed.
    """
    timed_events = []
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            timed_events.append(datetime.fromisoformat(ts.replace('Z', '+00:00')))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue

    if not timed_events:
        return 0

    timed_events.sort()
    break_threshold = timedelta(minutes=break_threshold_minutes)
    # One session to begin with, plus one for every gap above the threshold.
    return 1 + sum(
        1
        for previous, current in zip(timed_events, timed_events[1:])
        if current - previous > break_threshold
    )
| # ============================================ | |
| # PEAK MONTH CARD | |
| # ============================================ | |
def generate_peak_month_card(watch_events: List[Dict]) -> Dict:
    """Generate peak month card data.

    Watches are tallied by full month name (the year is not part of the key),
    and the month with the most watches is reported.

    Args:
        watch_events: Events carrying an ISO-8601 ``timestamp_local`` or
            ``timestamp_utc`` string; events without a parseable one are skipped.

    Returns:
        ``{"month": <name>, "watches": <count>}``, or
        ``{"month": "Unknown", "watches": 0}`` when nothing could be parsed.
    """
    monthly_counts = Counter()
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue
        monthly_counts[dt.strftime("%B")] += 1  # Full month name

    if not monthly_counts:
        return {"month": "Unknown", "watches": 0}

    peak_month, peak_count = monthly_counts.most_common(1)[0]
    return {"month": peak_month, "watches": peak_count}
| # ============================================ | |
| # TOP CHANNEL CARDS | |
| # ============================================ | |
def generate_top_channel_card(watch_events: List[Dict]) -> Dict:
    """Build the #1 channel spotlight: name, view count and share of all
    watches that carry a channel name."""
    tally = Counter()
    for event in watch_events:
        name = event.get("channel_clean")
        if name:
            tally[name] += 1

    if not tally:
        return {"name": "Unknown", "views": 0, "percentage": 0}

    (leader, leader_views), = tally.most_common(1)
    attributed = sum(tally.values())
    if attributed > 0:
        share = round(leader_views / attributed * 100, 1)
    else:
        share = 0

    return {"name": leader, "views": leader_views, "percentage": share}
def generate_top_channels_card(watch_events: List[Dict], top_n: int = 5) -> Dict:
    """Build the ranked list of the ``top_n`` most-watched channels."""
    tally = Counter()
    for event in watch_events:
        name = event.get("channel_clean")
        if name:
            tally[name] += 1

    ranked = tally.most_common(top_n)
    return {"channels": [{"name": name, "views": views} for name, views in ranked]}
| # ============================================ | |
| # WATCH CYCLE CARD | |
| # ============================================ | |
def generate_watch_cycle_card(watch_events: List[Dict]) -> Dict:
    """Build the 24-hour cycle card: the busiest ``hour_local`` value and a
    24-slot list of watch counts indexed by hour."""
    hours = (event.get("hour_local") for event in watch_events)
    tally = Counter(hour for hour in hours if hour is not None)

    if not tally:
        return {"peak_hour": 12, "hourly_data": [0] * 24}

    busiest_hour, _ = tally.most_common(1)[0]
    per_hour = []
    for slot in range(24):
        per_hour.append(tally.get(slot, 0))

    return {"peak_hour": busiest_hour, "hourly_data": per_hour}
| # ============================================ | |
| # PEAK DAY CARD | |
| # ============================================ | |
def generate_peak_day_card(watch_events: List[Dict]) -> Dict:
    """Build the day-of-week card: the busiest weekday name plus a per-day
    count table. ``day_of_week`` is used as an index where 0 is Monday."""
    day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

    tally = Counter()
    for event in watch_events:
        weekday = event.get("day_of_week")
        if weekday is None:
            continue
        tally[weekday] += 1

    if not tally:
        return {"day": "Saturday", "daily_data": {}}

    busiest = tally.most_common(1)[0][0]
    if 0 <= busiest < 7:
        busiest_name = day_names[busiest]
    else:
        # Out-of-range index: fall back to the same default as the empty case.
        busiest_name = "Saturday"

    per_day = {name: tally.get(index, 0) for index, name in enumerate(day_names)}
    return {"day": busiest_name, "daily_data": per_day}
| # ============================================ | |
| # LONGEST STREAK CARD | |
| # ============================================ | |
def generate_longest_streak_card(watch_events: List[Dict]) -> Dict:
    """Generate longest streak card data.

    A streak is a run of consecutive calendar days that each have at least
    one watch event with a parseable timestamp.

    Args:
        watch_events: Events carrying an ISO-8601 ``timestamp_local`` or
            ``timestamp_utc`` string; events without a parseable one are skipped.

    Returns:
        ``{"days": <length>, "dates": "<Mon DD> - <Mon DD>"}`` for the first
        longest streak found, or ``{"days": 0, "dates": ""}`` when no
        timestamp could be parsed.
    """
    # Get unique dates
    dates = set()
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue
        dates.add(dt.date())

    if not dates:
        return {"days": 0, "dates": ""}

    sorted_dates = sorted(dates)
    one_day = timedelta(days=1)

    # Find longest consecutive streak
    max_streak = current_streak = 1
    streak_start = max_streak_start = max_streak_end = sorted_dates[0]
    for previous, current in zip(sorted_dates, sorted_dates[1:]):
        if current - previous == one_day:
            current_streak += 1
            # Strict '>' keeps the earliest streak when lengths tie.
            if current_streak > max_streak:
                max_streak = current_streak
                max_streak_start = streak_start
                max_streak_end = current
        else:
            current_streak = 1
            streak_start = current

    dates_str = f"{max_streak_start.strftime('%b %d')} - {max_streak_end.strftime('%b %d')}"
    return {"days": max_streak, "dates": dates_str}
| # ============================================ | |
| # PERSONALITY CARD (HARDCODED) | |
| # ============================================ | |
def generate_personality_card() -> Dict:
    """Return the personality card; the content is a fixed placeholder for now."""
    card = {}
    card["type"] = "Curious Mind"
    card["description"] = "You dive deep into diverse topics"
    return card
| # ============================================ | |
| # BINGE SESSIONS CARD | |
| # ============================================ | |
def generate_binge_sessions_card(watch_events: List[Dict],
                                 binge_threshold_hours: float = 2.0) -> Dict:
    """Build the binge card: how many sessions lasted at least
    ``binge_threshold_hours``, and the length and start date of the longest."""
    minimum_minutes = binge_threshold_hours * 60

    # Sessions at or above the threshold count as binges.
    binges = []
    for session in compute_sessions_with_details(watch_events):
        if session["duration_minutes"] >= minimum_minutes:
            binges.append(session)

    if not binges:
        return {"count": 0, "longest_duration": "0h 0m", "longest_date": ""}

    longest = max(binges, key=lambda session: session["duration_minutes"])
    whole_hours, leftover = divmod(longest["duration_minutes"], 60)
    started = longest.get("start")

    return {
        "count": len(binges),
        "longest_duration": f"{int(whole_hours)}h {int(leftover)}m",
        "longest_date": started.strftime("%B %d") if started else "",
    }
def compute_sessions_with_details(watch_events: List[Dict],
                                  break_threshold_minutes: int = 60,
                                  last_video_minutes: int = 5) -> List[Dict]:
    """Compute sessions with start time and duration.

    Watch timestamps are sorted and split into sessions wherever two
    consecutive events are more than ``break_threshold_minutes`` apart.

    Args:
        watch_events: Events carrying an ISO-8601 ``timestamp_local`` or
            ``timestamp_utc`` string; events without a parseable one are skipped.
        break_threshold_minutes: Gap (in minutes) that ends a session.
        last_video_minutes: Minutes credited for the last video of a session.
            Defaults to 5, the value this function previously hard-coded and
            the same default ``compute_time_spent`` uses.

    Returns:
        Chronological list of dictionaries with ``start`` and ``end``
        (datetimes of the first and last event), ``duration_minutes``
        (end - start plus ``last_video_minutes``) and ``event_count``.
        Empty when no timestamp could be parsed.
    """
    timestamps = []
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            timestamps.append(datetime.fromisoformat(ts.replace('Z', '+00:00')))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue

    if not timestamps:
        return []

    timestamps.sort()
    break_threshold = timedelta(minutes=break_threshold_minutes)

    def build_session(start: datetime, end: datetime, event_count: int) -> Dict:
        return {
            "start": start,
            "end": end,
            "duration_minutes": (end - start).total_seconds() / 60.0 + last_video_minutes,
            "event_count": event_count,
        }

    sessions = []
    session_start = session_end = timestamps[0]
    event_count = 1
    for current in timestamps[1:]:
        # session_end is always the previous event, so this is the gap to it.
        if current - session_end > break_threshold:
            sessions.append(build_session(session_start, session_end, event_count))
            session_start = current
            event_count = 1
        else:
            event_count += 1
        session_end = current

    # Last session
    sessions.append(build_session(session_start, session_end, event_count))
    return sessions
| # ============================================ | |
| # LATE NIGHT CARD | |
| # ============================================ | |
def generate_late_night_card(watch_events: List[Dict]) -> Dict:
    """Generate late night activity card data.

    An event counts as late night when its ``hour_local`` is in [0, 5)
    (12 AM - 5 AM) and it has a parseable timestamp.

    Args:
        watch_events: Events with ``hour_local`` and an ISO-8601
            ``timestamp_local`` or ``timestamp_utc`` string.

    Returns:
        ``videos`` (number of late-night watches) plus ``latest_time`` and
        ``latest_date`` of the watch closest to 5 AM; the two strings are
        empty when there are no late-night watches.
    """
    late_night_events = []
    for e in watch_events:
        hour = e.get("hour_local")
        if hour is None or not 0 <= hour < 5:  # 12 AM - 5 AM
            continue
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue
        late_night_events.append({"dt": dt, "hour": hour})

    if not late_night_events:
        return {"videos": 0, "latest_time": "", "latest_date": ""}

    # Latest = closest to 5 AM by time of day. Within the same hour, compare
    # minutes and seconds before falling back to the calendar date; otherwise
    # a 4:05 AM watch late in the year would beat a 4:55 AM watch earlier on.
    # NOTE(review): minute/second come from the parsed timestamp while the hour
    # comes from hour_local - assumes both describe the same local clock.
    latest = max(
        late_night_events,
        key=lambda x: (x["hour"], x["dt"].minute, x["dt"].second, x["dt"]),
    )

    return {
        "videos": len(late_night_events),
        "latest_time": latest["dt"].strftime("%I:%M %p").lstrip("0"),
        "latest_date": latest["dt"].strftime("%B %d"),
    }
| # ============================================ | |
| # HABITS CARD | |
| # ============================================ | |
def generate_habits_card(watch_events: List[Dict]) -> Dict:
    """
    Generate habits card data.

    Habit criteria:
    1. Channel must have >= 4 watches
    2. Watches must span at least 7 days (not all in one burst)
    3. Average gap between watches <= 14 days

    Ranking:
    - Primary: smaller avg gap = stronger habit
    - Secondary: more watches = stronger habit

    Args:
        watch_events: Events with ``channel_clean`` and an ISO-8601
            ``timestamp_local`` or ``timestamp_utc`` string; events missing
            either, or with an unparseable timestamp, are skipped.

    Returns:
        ``total_channels`` (number of habit channels), ``strongest``
        (channel and frequency text of the best-ranked habit) and
        ``top_habits`` (up to five habit records, strongest first).
    """
    # Group events by channel with timestamps
    channel_watches = defaultdict(list)
    for e in watch_events:
        channel = e.get("channel_clean")
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not (channel and ts):
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue
        channel_watches[channel].append(dt)

    # Calculate watch frequency for each channel
    habits = []
    for channel, timestamps in channel_watches.items():
        if len(timestamps) < 4:  # Need at least 4 watches to be a habit
            continue
        timestamps.sort()

        # Must span at least 7 days (not a one-day binge)
        timespan_days = (timestamps[-1] - timestamps[0]).days
        if timespan_days < 7:
            continue

        # Average gap in whole days between consecutive watches
        gaps = [(later - earlier).days for earlier, later in zip(timestamps, timestamps[1:])]
        avg_gap = sum(gaps) / len(gaps)

        # Must be watched at least every 2 weeks on average
        if avg_gap > 14:
            continue

        # Lower avg_gap + higher watch count = better (lower) score;
        # the watch count only gives a slight boost.
        habit_score = avg_gap - (len(timestamps) * 0.1)
        habits.append({
            "channel": channel,
            "frequency": f"{avg_gap:.1f} days",
            "watch_count": len(timestamps),
            "avg_gap_days": avg_gap,
            "timespan_days": timespan_days,
            "habit_score": habit_score,
        })

    if not habits:
        return {
            "total_channels": 0,
            "strongest": {"channel": "", "frequency": ""},
            "top_habits": [],
        }

    # Sort by habit score (lower = stronger habit)
    habits.sort(key=lambda x: x["habit_score"])
    strongest = habits[0]

    return {
        "total_channels": len(habits),
        "strongest": {
            "channel": strongest["channel"],
            "frequency": strongest["frequency"],
        },
        "top_habits": habits[:5],
    }
| # ============================================ | |
| # REWATCHED CARD | |
| # ============================================ | |
def generate_rewatched_card(watch_events: List[Dict]) -> Dict:
    """Generate rewatched videos card data.

    Videos are identified by title (``title_original``, falling back to
    ``text_clean``); a video counts as rewatched when its title occurs more
    than once.

    Args:
        watch_events: Watch events; events without a title are ignored.

    Returns:
        ``count`` (number of distinct rewatched titles), ``top_video`` (the
        most rewatched title, truncated to 100 characters) and ``top_times``
        (how often it was watched). Zero/empty values when nothing repeats.
    """
    video_counts = Counter()
    for e in watch_events:
        title = e.get("title_original") or e.get("text_clean")
        if title:
            video_counts[title] += 1

    # Filter to rewatched only (count > 1)
    rewatched = {title: count for title, count in video_counts.items() if count > 1}

    if not rewatched:
        return {"count": 0, "top_video": "", "top_times": 0}

    top_title, top_times = max(rewatched.items(), key=lambda item: item[1])

    return {
        "count": len(rewatched),
        "top_video": top_title[:100],  # Truncate long titles
        "top_times": top_times,
    }
| # ============================================ | |
| # SUBSCRIPTIONS CARD | |
| # ============================================ | |
def generate_subscriptions_card(watch_events: List[Dict],
                                subscribe_events: List[Dict]) -> Dict:
    """Build the subscriptions card: how many subscribed channels exist, how
    many of them were actually watched, and how many were never watched."""
    # Subscribed channel names (channel_clean, falling back to text_clean).
    subscription_names = (
        event.get("channel_clean") or event.get("text_clean")
        for event in subscribe_events
    )
    subscribed = {name for name in subscription_names if name}

    # Channel names that appear in the watch history.
    watched = set()
    for event in watch_events:
        name = event.get("channel_clean")
        if name:
            watched.add(name)

    total = len(subscribed)
    watched_count = len(subscribed.intersection(watched))

    if total > 0:
        percentage = round(watched_count / total * 100, 1)
    else:
        percentage = 0

    return {
        "total": total,
        "watched": watched_count,
        "ghost": total - watched_count,
        "overlap_percentage": percentage,
    }
| # ============================================ | |
| # SEARCHES CARD | |
| # ============================================ | |
def generate_searches_card(search_events: List[Dict]) -> Dict:
    """Build the searches card: the number of search events and the five most
    frequent search terms (compared case-insensitively)."""
    def nothing_found() -> Dict:
        return {"total": 0, "top_search": "", "top_searches": []}

    if not search_events:
        return nothing_found()

    tally = Counter()
    for event in search_events:
        raw_term = event.get("text_clean") or event.get("title_original")
        if not raw_term:
            continue
        tally[raw_term.lower()] += 1

    if not tally:
        return nothing_found()

    ranked = []
    for term, count in tally.most_common(5):
        ranked.append({"term": term, "count": count})

    return {
        "total": len(search_events),
        "top_search": ranked[0]["term"],
        "top_searches": ranked,
    }
| # ============================================ | |
| # FIRST & LAST VIDEO CARD | |
| # ============================================ | |
def generate_first_last_card(watch_events: List[Dict]) -> Dict:
    """Generate first and last video card data.

    Args:
        watch_events: Events carrying an ISO-8601 ``timestamp_local`` or
            ``timestamp_utc`` string; events without a parseable one are skipped.

    Returns:
        ``first_video`` and ``last_video``, each with a ``title``
        (``title_original``, else ``text_clean``, else "Unknown"; truncated
        to 80 characters) and a ``date`` formatted like "January 02, 2024".
        Both entries hold empty strings when no timestamp could be parsed.
    """
    timed_events = []
    for e in watch_events:
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: ignore values that are not ISO-8601 strings.
            continue
        timed_events.append({"dt": dt, "event": e})

    if not timed_events:
        return {
            "first_video": {"title": "", "date": ""},
            "last_video": {"title": "", "date": ""},
        }

    timed_events.sort(key=lambda x: x["dt"])

    def describe(entry: Dict) -> Dict:
        event = entry["event"]
        title = event.get("title_original") or event.get("text_clean") or "Unknown"
        return {"title": title[:80], "date": entry["dt"].strftime("%B %d, %Y")}

    return {
        "first_video": describe(timed_events[0]),
        "last_video": describe(timed_events[-1]),
    }
| # ============================================ | |
| # PATTERNS CARD (Association Rule Mining) | |
| # ============================================ | |
def generate_patterns_card(watch_events: List[Dict]) -> Dict:
    """
    Surface viewing patterns via lightweight association rule mining.

    Only the most-watched channels are considered, to avoid noise from
    rarely-watched ones.

    Patterns detected:
    1. Channel + Day of week (e.g., "You watch X every Sunday")
    2. Channel + Time of day (e.g., "You watch X in the mornings")
    3. Weekend preference (e.g., "X is your weekend channel")
    """
    def no_patterns() -> Dict:
        return {"total_patterns": 0, "top_patterns": [], "insights": []}

    day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    # Hour ranges are half-open: start <= hour < end.
    time_slots = {
        "morning": (5, 12),      # 5 AM - 12 PM
        "afternoon": (12, 17),   # 12 PM - 5 PM
        "evening": (17, 21),     # 5 PM - 9 PM
        "night": (21, 24),       # 9 PM - 12 AM
        "late night": (0, 5),    # 12 AM - 5 AM
    }

    tally = Counter()
    for event in watch_events:
        name = event.get("channel_clean")
        if name:
            tally[name] += 1
    if not tally:
        return no_patterns()

    # Candidates: the 20 most-watched channels that also have at least
    # max(10, 1% of all watch events) watches.
    min_watches = max(10, len(watch_events) // 100)
    top_channels = {
        name for name, views in tally.most_common(20) if views >= min_watches
    }
    if not top_channels:
        return no_patterns()

    patterns = [
        # 1. Channel + Day of Week associations
        *find_channel_day_patterns(watch_events, day_names, top_channels),
        # 2. Channel + Time Slot associations
        *find_channel_time_patterns(watch_events, time_slots, top_channels),
    ]

    # 3. Weekend vs Weekday preference (at most one pattern)
    weekend_pattern = find_weekend_pattern(watch_events, top_channels)
    if weekend_pattern:
        patterns.append(weekend_pattern)

    # Rank by confidence * count so high-volume patterns come first.
    def ranking_score(pattern: Dict) -> float:
        return pattern.get("confidence", 0) * pattern.get("count", 1)

    patterns.sort(key=ranking_score, reverse=True)
    best = patterns[:5]

    return {
        "total_patterns": len(patterns),
        "top_patterns": best,
        "insights": [pattern["insight"] for pattern in best],
    }
def find_channel_day_patterns(watch_events: List[Dict], day_names: List[str], top_channels: set) -> List[Dict]:
    """Return channel/weekday associations for the given top channels.

    A pattern is reported when a channel's share of watches on one day
    (confidence) is at least 30%, the lift over the expected count is at
    least 1.5, and the day has at least 5 watches of that channel.
    """
    per_channel = {}          # channel -> Counter of day -> watches
    watches_per_day = Counter()

    for event in watch_events:
        channel = event.get("channel_clean")
        day = event.get("day_of_week")
        if not channel or day is None or channel not in top_channels:
            continue
        per_channel.setdefault(channel, Counter())[day] += 1
        watches_per_day[day] += 1

    total_events = sum(watches_per_day.values())

    found = []
    for channel, day_counts in per_channel.items():
        channel_watches = sum(day_counts.values())
        for day, count in day_counts.items():
            # Confidence: P(day | channel)
            confidence = count / channel_watches
            # Lift: observed count relative to the count expected if channel
            # and day were independent.
            expected = (channel_watches * watches_per_day[day]) / total_events
            lift = count / expected

            if not (confidence >= 0.30 and lift >= 1.5 and count >= 5):
                continue

            day_name = day_names[day] if 0 <= day < 7 else "Unknown"
            found.append({
                "type": "channel_day",
                "channel": channel,
                "day": day_name,
                "confidence": confidence,
                "lift": lift,
                "count": count,
                "insight": f"You watch **{channel}** on **{day_name}s** ({int(confidence*100)}% of the time)"
            })

    return found
def find_channel_time_patterns(watch_events: List[Dict], time_slots: Dict, top_channels: set) -> List[Dict]:
    """Find channels associated with specific time slots (top channels only).

    Args:
        watch_events: Events with ``channel_clean`` and ``hour_local``.
        time_slots: Mapping of slot name to a ``(start, end)`` hour pair,
            matched as ``start <= hour < end``. The slot named "late night"
            is always matched as hours 0-4, regardless of its pair.
        top_channels: Channels to consider; all other events are ignored.

    Returns:
        One dictionary per (channel, slot) pair whose confidence is at least
        40%, whose lift is at least 1.5 and which has at least 5 watches.
    """
    def get_time_slot(hour: int) -> Optional[str]:
        for slot_name, (start, end) in time_slots.items():
            if slot_name == "late night":
                if 0 <= hour < 5:
                    return slot_name
            elif start <= hour < end:
                return slot_name
        return None

    # Count channel occurrences per time slot
    channel_slot_counts = defaultdict(lambda: defaultdict(int))
    channel_total = defaultdict(int)
    slot_total = defaultdict(int)

    for e in watch_events:
        channel = e.get("channel_clean")
        hour = e.get("hour_local")
        if not channel or hour is None or channel not in top_channels:
            continue
        slot = get_time_slot(hour)
        if not slot:
            # Hour falls outside every configured slot.
            continue
        channel_slot_counts[channel][slot] += 1
        channel_total[channel] += 1
        slot_total[slot] += 1

    total_events = sum(slot_total.values())

    patterns = []
    for channel, slot_counts in channel_slot_counts.items():
        for slot, count in slot_counts.items():
            # Confidence: P(slot | channel)
            confidence = count / channel_total[channel] if channel_total[channel] > 0 else 0
            # Lift: observed count relative to the count expected if channel
            # and slot were independent.
            expected = (channel_total[channel] * slot_total[slot]) / total_events if total_events > 0 else 0
            lift = count / expected if expected > 0 else 0

            # Strong pattern: confidence >= 40% AND lift >= 1.5 AND at least 5 occurrences
            if confidence >= 0.40 and lift >= 1.5 and count >= 5:
                patterns.append({
                    "type": "channel_time",
                    "channel": channel,
                    "time_slot": slot,
                    "confidence": confidence,
                    "lift": lift,
                    "count": count,
                    "insight": f"**{channel}** is your **{slot}** go-to ({int(confidence*100)}%)"
                })

    return patterns
def find_weekend_pattern(watch_events: List[Dict], top_channels: set) -> Optional[Dict]:
    """Return the most-watched top channel whose views fall mostly on weekends.

    A channel qualifies with at least 10 watches of which 60% or more have
    ``day_of_week`` 5 or 6 (Saturday, Sunday). Returns None when no channel
    qualifies.
    """
    weekend_views = Counter()
    weekday_views = Counter()

    for event in watch_events:
        channel = event.get("channel_clean")
        day = event.get("day_of_week")
        if not channel or day is None or channel not in top_channels:
            continue
        bucket = weekend_views if day in (5, 6) else weekday_views
        bucket[channel] += 1

    # Channels never watched on a weekend cannot qualify, so only channels
    # with weekend views are examined.
    candidates = []
    for channel, on_weekend in weekend_views.items():
        total = on_weekend + weekday_views.get(channel, 0)
        if total < 10:
            continue
        weekend_ratio = on_weekend / total
        if weekend_ratio >= 0.6:
            candidates.append((channel, weekend_ratio, total))

    if not candidates:
        return None

    # Most watched candidate wins; max() keeps the first one on ties.
    channel, weekend_ratio, total = max(candidates, key=lambda item: item[2])
    return {
        "type": "weekend_preference",
        "channel": channel,
        "confidence": weekend_ratio,
        "count": total,
        "insight": f"**{channel}** is your **weekend** channel ({int(weekend_ratio*100)}% weekend views)"
    }