Spaces:
Sleeping
Sleeping
| """ | |
| data.py — ExecAssist Environment Data & Scoring | |
| Contains: | |
| - Scenario templates for easy/medium/hard tasks | |
| - Reward functions (email quality, scheduling correctness, conflict resolution) | |
| - Anti-reward hacking penalties | |
| - Helper functions for time/calendar logic | |
| """ | |
| import random | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
| from openai import OpenAI | |
| # ============================================================ | |
| # TASK DEFINITIONS | |
| # ============================================================ | |
# Per-difficulty task metadata: a human-readable description, the action the
# agent is expected to take, and the weights used to blend the three reward
# components (email quality, scheduling correctness, conflict resolution).
TASK_DEFINITIONS = {
    "easy": dict(
        description=(
            "Simple meeting request with clear calendar availability. Draft "
            "professional reply and book the meeting."
        ),
        action_required="Send email confirmation and book meeting in available slot",
        reward_weights=dict(email=0.5, scheduling=0.5, conflict=0.0),
    ),
    "medium": dict(
        description=(
            "Scheduling conflict — requested time is already booked. Identify "
            "conflict, propose 2-3 alternatives, explain professionally."
        ),
        action_required="Send email with alternative times and explain conflict",
        reward_weights=dict(email=0.3, scheduling=0.3, conflict=0.4),
    ),
    "hard": dict(
        description=(
            "Multi-party coordination with priority conflicts. 3 emails "
            "requesting meetings, prioritize and reschedule."
        ),
        action_required="Coordinate multiple meetings, prioritize, and reschedule",
        reward_weights=dict(email=0.34, scheduling=0.33, conflict=0.33),
    ),
}
| # ============================================================ | |
| # SCENARIO DATA POOLS | |
| # ============================================================ | |
# Pool of meeting subjects. The scenario generators draw from it with
# rng.choice / rng.sample, so the order of entries affects seeded output.
MEETING_TOPICS = [
    "Q2 roadmap review",
    "Budget planning session",
    "Project status update",
    "Team sync",
    "1-on-1 check-in",
    "Client presentation prep",
    "Sprint retrospective",
    "Product demo",
    "Strategy discussion",
    "Performance review",
]
# Pool of (display name, email address) pairs used as email senders.
# Every address is "first.last@company.com", so it is derived from the name.
# Entry order is significant: seeded rng.choice / rng.sample index into it.
SENDER_NAMES = [
    (full_name, full_name.lower().replace(" ", ".") + "@company.com")
    for full_name in (
        "John Smith",
        "Sarah Johnson",
        "Michael Chen",
        "Emily Rodriguez",
        "David Kim",
        "Lisa Wang",
        "James Anderson",
        "Maria Garcia",
    )
]
| # ============================================================ | |
| # SCENARIO GENERATION | |
| # ============================================================ | |
def generate_scenario(task: str, seed: int = None) -> dict:
    """
    Build one scenario for the requested difficulty level.

    Args:
        task: One of "easy", "medium" or "hard".
        seed: Optional seed for the private random generator handed to the
            per-difficulty builder; when None the generator is unseeded.

    Returns:
        The dict produced by the matching builder, containing:
        - id: scenario identifier
        - emails: list of email objects
        - calendar: calendar state with existing meetings
        - contacts: contact information
        - expected_behavior: what agent should do
        - has_conflict: True if scheduling conflict exists

    Raises:
        ValueError: if ``task`` is not a known difficulty.
    """
    rng = random.Random() if seed is None else random.Random(seed)

    if task == "easy":
        return _generate_easy_scenario(rng)
    if task == "medium":
        return _generate_medium_scenario(rng)
    if task == "hard":
        return _generate_hard_scenario(rng)
    raise ValueError(f"Unknown task: {task}")
def _generate_easy_scenario(rng: random.Random) -> dict:
    """
    Generate a simple meeting request with clear calendar availability.

    Args:
        rng: Random generator used to pick the sender and the topic (in that
            order, one draw each).

    Returns:
        Scenario dict with keys id, task, emails, calendar, contacts,
        expected_behavior and has_conflict (always False here).
    """
    sender_name, sender_email = rng.choice(SENDER_NAMES)
    topic = rng.choice(MEETING_TOPICS)

    # Monday 9 AM. 2026-04-27 is the Monday of that week (the 28th is a
    # Tuesday), which keeps the "Monday" anchor true.
    base_date = datetime(2026, 4, 27, 9, 0)

    existing_meetings = [
        {
            "id": "mtg_001",
            "participants": ["alex.chen@company.com", "team@company.com"],
            "start_time": (base_date + timedelta(hours=1)).isoformat(),
            "end_time": (base_date + timedelta(hours=2)).isoformat(),
            "subject": "Team standup",
            "priority": "normal",
        },
        {
            "id": "mtg_002",
            "participants": ["alex.chen@company.com", "client@external.com"],
            "start_time": (base_date + timedelta(days=1, hours=5)).isoformat(),
            "end_time": (base_date + timedelta(days=1, hours=6, minutes=30)).isoformat(),
            "subject": "Client call",
            "priority": "high",
        },
    ]

    email_body = (
        f"Hi Alex,\n\nCan we meet sometime next week to discuss {topic.lower()}? "
        f"30 minutes should be enough. I'm flexible on timing.\n\nBest,\n{sender_name}"
    )

    return {
        "id": "easy_001",
        "task": "easy",
        "emails": [
            {
                "sender": sender_email,
                "subject": f"Meeting request: {topic}",
                "body": email_body,
                # Wall-clock time: this field is not reproducible under a seed.
                "timestamp": datetime.now().isoformat(),
                "priority": "normal",
            }
        ],
        "calendar": {
            "existing_meetings": existing_meetings,
            "working_hours": {
                "monday": "9-17",
                "tuesday": "9-17",
                "wednesday": "9-17",
                "thursday": "9-17",
                "friday": "9-16",
            },
            "executive_name": "Alex Chen",
        },
        "contacts": {
            sender_email: {
                "name": sender_name,
                "email": sender_email,
                "timezone": "America/Los_Angeles",
                "title": "Senior Manager",
            }
        },
        "expected_behavior": "Book meeting in open slot",
        "has_conflict": False,
    }
def _generate_medium_scenario(rng: random.Random) -> dict:
    """
    Generate a scenario whose requested slot collides with a booked meeting.

    The sender offers "Monday 2-4pm or Tuesday morning". Monday 2-4 PM is
    taken by a high-priority board meeting and Tuesday 9-10 AM by a 1-on-1,
    leaving Tuesday 10-11 AM as the expected alternative.

    Args:
        rng: Random generator used to pick the sender and the topic (in that
            order, one draw each).

    Returns:
        Scenario dict with keys id, task, emails, calendar, contacts,
        expected_behavior and has_conflict (always True here).
    """
    sender_name, sender_email = rng.choice(SENDER_NAMES)
    topic = rng.choice(MEETING_TOPICS)

    # Monday 9 AM. The anchor must be a real Monday (2026-04-27; the 28th is
    # a Tuesday) so the calendar below matches the weekdays named in the
    # email text and in expected_behavior.
    base_date = datetime(2026, 4, 27, 9, 0)

    # Conflict: Monday 2-4 PM is already booked
    conflict_start = base_date + timedelta(hours=5)
    conflict_end = base_date + timedelta(hours=7)

    existing_meetings = [
        {
            "id": "mtg_001",
            "participants": ["alex.chen@company.com", "board@company.com"],
            "start_time": conflict_start.isoformat(),
            "end_time": conflict_end.isoformat(),
            "subject": "Board meeting",
            "priority": "high",
        },
        {
            # Tuesday 9-10 AM
            "id": "mtg_002",
            "participants": ["alex.chen@company.com", "manager@company.com"],
            "start_time": (base_date + timedelta(days=1, hours=0)).isoformat(),
            "end_time": (base_date + timedelta(days=1, hours=1)).isoformat(),
            "subject": "1-on-1 with manager",
            "priority": "normal",
        },
    ]

    email_body = (
        f"Hi Alex,\n\nWe need to discuss {topic.lower()}. "
        f"I'm available Monday 2-4pm or Tuesday morning. Can we make this work? "
        f"It's fairly urgent.\n\nThanks,\n{sender_name}"
    )

    return {
        "id": "medium_001",
        "task": "medium",
        "emails": [
            {
                "sender": sender_email,
                "subject": f"Urgent: {topic}",
                "body": email_body,
                # Wall-clock time: this field is not reproducible under a seed.
                "timestamp": datetime.now().isoformat(),
                "priority": "high",
            }
        ],
        "calendar": {
            "existing_meetings": existing_meetings,
            "working_hours": {
                "monday": "9-17",
                "tuesday": "9-17",
                "wednesday": "9-17",
                "thursday": "9-17",
                "friday": "9-16",
            },
            "executive_name": "Alex Chen",
        },
        "contacts": {
            sender_email: {
                "name": sender_name,
                "email": sender_email,
                "timezone": "America/Los_Angeles",
                "title": "Director",
            }
        },
        "expected_behavior": "Identify conflict, propose Tuesday 10-11 AM as alternative",
        "has_conflict": True,
    }
def _generate_hard_scenario(rng: random.Random) -> dict:
    """
    Generate a multi-party scenario: 3 competing emails and priority conflicts.

    Two senders ask for overlapping Monday-afternoon slots that also collide
    with an existing team sync; a third sender is flexible.

    Args:
        rng: Random generator used to sample three distinct senders and then
            three distinct topics (in that order).

    Returns:
        Scenario dict with keys id, task, emails, calendar, contacts,
        expected_behavior and has_conflict (always True here).
    """
    senders = rng.sample(SENDER_NAMES, 3)
    topics = rng.sample(MEETING_TOPICS, 3)

    # Monday 9 AM. The anchor must be a real Monday (2026-04-27; the 28th is
    # a Tuesday) so the calendar matches the "Monday" slots in the emails.
    base_date = datetime(2026, 4, 27, 9, 0)

    # Existing calendar — Monday 2-3 PM blocked with team sync
    existing_meetings = [
        {
            "id": "mtg_001",
            "participants": ["alex.chen@company.com", "team@company.com"],
            "start_time": (base_date + timedelta(hours=5)).isoformat(),  # Monday 2 PM
            "end_time": (base_date + timedelta(hours=6)).isoformat(),  # Monday 3 PM
            "subject": "Team sync",
            "priority": "normal",
        },
        {
            "id": "mtg_002",
            "participants": ["alex.chen@company.com", "exec@company.com"],
            "start_time": (base_date + timedelta(days=2, hours=2)).isoformat(),  # Wed 11 AM
            "end_time": (base_date + timedelta(days=2, hours=3)).isoformat(),  # Wed 12 PM
            "subject": "Executive review",
            "priority": "high",
        },
    ]

    # Three competing email requests
    emails = [
        {
            "sender": senders[0][1],
            "subject": f"Meeting: {topics[0]}",
            "body": (
                f"Hi Alex,\n\nCan we meet Monday 2:30-3:30 PM to discuss {topics[0].lower()}? "
                f"I'd really appreciate your input.\n\nThanks,\n{senders[0][0]}"
            ),
            # Wall-clock time: this field is not reproducible under a seed.
            "timestamp": datetime.now().isoformat(),
            "priority": "normal",
        },
        {
            "sender": senders[1][1],
            "subject": f"URGENT: {topics[1]}",
            "body": (
                f"Alex,\n\nWe need to discuss {topics[1].lower()} ASAP. "
                f"Monday afternoon works for me — ideally 2-3 PM. "
                f"This is time-sensitive and high priority.\n\nBest,\n{senders[1][0]}"
            ),
            "timestamp": datetime.now().isoformat(),
            "priority": "high",
        },
        {
            "sender": senders[2][1],
            "subject": f"{topics[2]} discussion",
            "body": (
                f"Hi Alex,\n\nCan we sync on {topics[2].lower()} sometime this week? "
                f"I'm flexible — any 30-minute slot works for me.\n\nThanks,\n{senders[2][0]}"
            ),
            "timestamp": datetime.now().isoformat(),
            "priority": "normal",
        },
    ]

    # One contact card per sender, keyed by email address.
    contacts = {
        sender[1]: {
            "name": sender[0],
            "email": sender[1],
            "timezone": "America/Los_Angeles",
            "title": "Manager",
        }
        for sender in senders
    }

    return {
        "id": "hard_001",
        "task": "hard",
        "emails": emails,
        "calendar": {
            "existing_meetings": existing_meetings,
            "working_hours": {
                "monday": "9-17",
                "tuesday": "9-17",
                "wednesday": "9-17",
                "thursday": "9-17",
                "friday": "9-16",
            },
            "executive_name": "Alex Chen",
        },
        "contacts": contacts,
        "expected_behavior": (
            "Prioritize URGENT email (sender 2). Book that meeting. "
            "Propose alternatives to sender 1 (conflicts with urgent). "
            "Offer flexible times to sender 3."
        ),
        "has_conflict": True,
    }
| # ============================================================ | |
| # REWARD FUNCTION 1: EMAIL QUALITY | |
| # ============================================================ | |
def compute_email_quality(reply: str, scenario: dict) -> float:
    """
    Score email quality using rule-based checks + LLM judge.

    Components:
    - Politeness (15%)
    - Greeting/closing (10%)
    - Sufficient detail (15%)
    - Not overly uncertain (10%)
    - Professional tone (10%)
    - LLM judge for nuance (40%)

    Args:
        reply: The drafted email text.
        scenario: Scenario dict; not consulted by the current rules.

    Returns:
        Score from 0.0 to 1.0.
    """
    score = 0.0
    reply_lower = reply.lower()
    # Whole words with surrounding punctuation removed. Greetings are matched
    # on these rather than by substring, because "hi " also occurs inside
    # "this " / "which " and would award greeting credit to almost any text.
    reply_words = {token.strip(".,!?;:\"'()") for token in reply_lower.split()}

    # Rule 1: Politeness markers (15%)
    if any(phrase in reply_lower for phrase in ("thank you", "thanks", "appreciate")):
        score += 0.15

    # Rule 2: Proper greeting (5%) and closing (5%)
    if reply_words & {"hi", "hello", "dear"}:
        score += 0.05
    if any(closing in reply_lower for closing in ("best", "regards", "sincerely", "thanks,")):
        score += 0.05

    # Rule 3: Sufficient detail (15%)
    word_count = len(reply.split())
    if word_count >= 20:
        score += 0.15
    elif word_count >= 10:
        score += 0.08

    # Rule 4: Not overly uncertain (10%)
    if reply.count("?") <= 2:
        score += 0.10

    # Rule 5: Professional tone — no negative phrases (10%)
    negative_phrases = ("can't", "won't", "impossible", "sorry but no", "unfortunately not", "no way")
    if not any(neg in reply_lower for neg in negative_phrases):
        score += 0.10

    # Rule 6: LLM-as-judge for nuance (40%)
    score += _llm_judge_professionalism(reply) * 0.40

    return min(1.0, score)


def _heuristic_professionalism(reply: str) -> float:
    """Offline stand-in for the LLM judge: 0.7 for 2+ sentences and 50+ chars, else 0.4."""
    sentences = [s.strip() for s in reply.split('.') if s.strip()]
    if len(sentences) >= 2 and len(reply) >= 50:
        return 0.7
    return 0.4


def _parse_llm_score(score_text: str) -> float:
    """Return the first numeric token of the judge's answer clamped to [0, 1], or 0.5 if none."""
    for token in score_text.split():
        # Tolerate markdown emphasis / quotes around the number and a
        # trailing sentence mark, e.g. "**0.8**" or "0.8."
        cleaned = token.strip("*`\"'()[]").rstrip(".,;:!")
        try:
            value = float(cleaned)
        except ValueError:
            continue
        if value != value:
            # float("nan") parses successfully but is not a score; without
            # this guard min(1.0, nan) evaluates to 1.0.
            continue
        return max(0.0, min(1.0, value))
    return 0.5


def _llm_judge_professionalism(reply: str) -> float:
    """
    LLM-as-judge for email professionalism via an OpenAI-compatible API.

    The API key is read from HFTOKEN, HF_TOKEN or API_KEY; the base URL from
    APIBASEURL / API_BASE_URL (default: OpenRouter); the model from
    MODELNAME / MODEL_NAME. Falls back to a length/sentence heuristic when no
    key is configured or when the request fails for any reason.

    Returns:
        Score from 0.0 to 1.0.
    """
    api_key = os.getenv("HFTOKEN") or os.getenv("HF_TOKEN") or os.getenv("API_KEY")

    # Fallback if no API key
    if not api_key:
        return _heuristic_professionalism(reply)

    try:
        client = OpenAI(
            base_url=os.getenv("APIBASEURL") or os.getenv("API_BASE_URL", "https://openrouter.ai/api/v1"),
            api_key=api_key,
        )
        prompt = f"""Rate the professionalism of this email reply on a scale of 0.0 to 1.0.
Email reply:
\"\"\"{reply}\"\"\"
Criteria:
- Clear and concise
- Professional tone
- No typos or grammar errors
- Appropriate level of formality
- Addresses the request directly
Respond with ONLY a single decimal number between 0.0 and 1.0. No explanation, just the number."""
        response = client.chat.completions.create(
            model=os.getenv("MODELNAME") or os.getenv("MODEL_NAME", "nvidia/nemotron-3-super-120b-a12b:free"),
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=10,
        )
        score_text = response.choices[0].message.content.strip()
        return _parse_llm_score(score_text)
    except Exception as e:
        # Deliberately broad: scoring is best-effort and must never abort the
        # caller because the judge endpoint is unavailable or misbehaves.
        print(f"LLM judge error: {e}")
        return _heuristic_professionalism(reply)
| # ============================================================ | |
| # REWARD FUNCTION 2: SCHEDULING CORRECTNESS | |
| # ============================================================ | |
# Keys used by calendar["working_hours"], indexed by datetime.weekday().
_WEEKDAY_KEYS = (
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
)


def _parse_naive_iso(value: str) -> datetime:
    """Parse an ISO timestamp and drop any UTC offset so it compares with the naive calendar."""
    # NOTE(review): dropping the offset assumes the wall-clock reading is what
    # the agent meant; the calendar entries themselves carry no timezone.
    return datetime.fromisoformat(value).replace(tzinfo=None)


def _working_window(calendar: dict, day: datetime):
    """Return (open, close) datetimes for ``day``, or None if it is not a working day."""
    open_hour, close_hour = 9, 17  # default when the calendar gives no usable hours
    working_hours = calendar.get("working_hours")
    if isinstance(working_hours, dict) and working_hours:
        spec = working_hours.get(_WEEKDAY_KEYS[day.weekday()])
        if spec is None:
            # Day not listed (e.g. weekend) -> no working hours at all.
            return None
        try:
            open_text, close_text = str(spec).split("-")
            open_hour, close_hour = int(open_text), int(close_text)
        except ValueError:
            pass  # malformed "H-H" spec: keep the 9-17 default
    midnight = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(hours=open_hour), midnight + timedelta(hours=close_hour)


def check_scheduling_correctness(meeting_details: Optional[dict], scenario: dict) -> dict:
    """
    Verify scheduling correctness with hard checks.

    Checks performed:
    - meeting_provided: ``meeting_details`` is non-empty
    - no_double_booking: no overlap with calendar["existing_meetings"]
    - within_working_hours: start and end fall inside that weekday's
      calendar["working_hours"] window ("H-H" strings keyed by lowercase day
      name); days missing from the mapping are non-working. When the calendar
      has no working hours at all, 9-17 is assumed for every day.
    - appropriate_duration: 15 minutes to 2 hours

    Args:
        meeting_details: Dict with ISO ``start_time`` / ``end_time``, or None.
        scenario: Scenario dict; must contain calendar.existing_meetings.

    Returns:
        {"checks": {name: bool}, "score": fraction of checks passed}. A
        missing meeting scores 0.0; unparseable times score 0.25.
    """
    if not meeting_details:
        return {
            "checks": {
                "meeting_provided": False,
                "no_double_booking": False,
                "within_working_hours": False,
                "appropriate_duration": False,
            },
            "score": 0.0,
        }

    calendar = scenario["calendar"]
    existing_meetings = calendar["existing_meetings"]

    # Parse meeting times
    try:
        meeting_start = _parse_naive_iso(meeting_details["start_time"])
        meeting_end = _parse_naive_iso(meeting_details["end_time"])
    except (KeyError, ValueError, TypeError):
        return {
            "checks": {
                "meeting_provided": True,
                "no_double_booking": False,
                "within_working_hours": False,
                "appropriate_duration": False,
            },
            "score": 0.25,  # Some credit for trying
        }

    results = {
        "meeting_provided": True,
        "no_double_booking": True,
        "within_working_hours": True,
        "appropriate_duration": True,
    }

    # Check 1: No double booking
    for existing in existing_meetings:
        try:
            existing_start = _parse_naive_iso(existing["start_time"])
            existing_end = _parse_naive_iso(existing["end_time"])
        except (KeyError, ValueError, TypeError):
            continue  # skip malformed calendar entries
        # Two intervals overlap unless one ends before the other starts.
        if meeting_start < existing_end and existing_start < meeting_end:
            results["no_double_booking"] = False
            break

    # Check 2: Within that day's working hours. The window is anchored to the
    # start date, so a meeting running past midnight fails as well.
    window = _working_window(calendar, meeting_start)
    if window is None:
        results["within_working_hours"] = False
    else:
        opens_at, closes_at = window
        if meeting_start < opens_at or meeting_end > closes_at:
            results["within_working_hours"] = False

    # Check 3: Appropriate duration (15 min to 2 hours)
    duration_minutes = (meeting_end - meeting_start).total_seconds() / 60
    if not (15 <= duration_minutes <= 120):
        results["appropriate_duration"] = False

    # Compute overall score
    score = sum(results.values()) / len(results)
    return {
        "checks": results,
        "score": score,
    }
| # ============================================================ | |
| # REWARD FUNCTION 3: CONFLICT RESOLUTION | |
| # ============================================================ | |
def compute_conflict_resolution(action: dict, scenario: dict) -> float:
    """
    Score how well the agent handled scheduling conflicts.

    When the scenario has a conflict, credit is given for:
    - choosing "propose_alternatives" / "reschedule" (0.4), or "book" (0.1)
    - each proposed alternative (0.2 apiece, capped at 0.4)
    - mentioning the conflict in the email reply (0.2)
    When it has none, "book" earns 1.0, "propose_alternatives" 0.5 and
    anything else 0.3.

    Args:
        action: Agent action with optional keys calendar_action, email_reply
            and meeting_details (which may hold proposed_alternatives).
            Missing, None or wrongly typed values are treated as empty.
        scenario: Scenario dict; only has_conflict is read.

    Returns:
        Score from 0.0 to 1.0.
    """
    calendar_action = action.get("calendar_action", "")

    # No conflict — agent should just book
    if not scenario.get("has_conflict", False):
        if calendar_action == "book":
            return 1.0
        if calendar_action == "propose_alternatives":
            return 0.5  # Partial credit
        return 0.3

    # The action comes from parsed model output, so a key may be present with
    # a null or unexpected value; normalise before use.
    email_reply = action.get("email_reply") or ""
    if not isinstance(email_reply, str):
        email_reply = str(email_reply)
    meeting_details = action.get("meeting_details")
    if not isinstance(meeting_details, dict):
        meeting_details = {}

    score = 0.0

    # Agent should recognize the conflict
    if calendar_action in ("propose_alternatives", "reschedule"):
        score += 0.4
    elif calendar_action == "book":
        score += 0.1

    # Check if alternatives were provided
    alternatives = meeting_details.get("proposed_alternatives") or []
    if isinstance(alternatives, str):
        # A single free-text alternative counts once, not once per character.
        num_alternatives = 1
    else:
        try:
            num_alternatives = len(alternatives)
        except TypeError:
            num_alternatives = 0
    score += min(0.4, num_alternatives * 0.2)  # 2 alts = 0.4, 3+ = capped at 0.4

    # Check if email mentions the conflict
    conflict_keywords = ("conflict", "already booked", "unavailable", "scheduled", "occupied", "another meeting")
    reply_lower = email_reply.lower()
    if any(word in reply_lower for word in conflict_keywords):
        score += 0.2

    return min(1.0, score)
| # ============================================================ | |
| # ANTI-REWARD HACKING: PENALTIES | |
| # ============================================================ | |
def apply_penalties(action: dict, scenario: dict) -> float:
    """
    Detect and penalize reward hacking behaviors.

    Penalties (additive, total capped at 1.0):
    - 0.30 reply shorter than 30 characters
    - 0.40 calendar_action is "book" but no meeting_details were given
    - 0.10 reply contains a generic templated phrase
    - 0.15 reply longer than 200 words
    - 0.20 reply of more than 20 words with under 40% unique words

    Args:
        action: Agent action with optional keys email_reply, calendar_action
            and meeting_details. A missing or None email_reply is treated as
            an empty reply.
        scenario: Scenario dict; not consulted by the current rules.

    Returns:
        Penalty amount (0.0 = no penalty, higher = worse).
    """
    # The action comes from parsed model output, so email_reply may be present
    # but null; normalise it instead of crashing on .strip().
    email_reply = action.get("email_reply") or ""
    if not isinstance(email_reply, str):
        email_reply = str(email_reply)
    calendar_action = action.get("calendar_action", "")
    meeting_details = action.get("meeting_details")

    reply_lower = email_reply.lower()
    words = reply_lower.split()
    penalty = 0.0

    # Penalty 1: Email too short (lazy response)
    if len(email_reply.strip()) < 30:
        penalty += 0.3

    # Penalty 2: Claimed to book but no details provided
    if calendar_action == "book" and not meeting_details:
        penalty += 0.4

    # Penalty 3: Generic templated phrases
    generic_phrases = (
        "as per your request",
        "please find attached",
        "hope this helps",
        "let me know if you have any questions",
        "do not hesitate to contact",
    )
    if any(phrase in reply_lower for phrase in generic_phrases):
        penalty += 0.10

    # Penalty 4: Overly long email (rambling)
    if len(words) > 200:
        penalty += 0.15

    # Penalty 5: Repeating the same content multiple times
    if len(words) > 20:
        word_diversity = len(set(words)) / len(words)
        if word_diversity < 0.4:  # Less than 40% unique words = repetitive
            penalty += 0.20

    return min(1.0, penalty)
| # ============================================================ | |
| # HELPER FUNCTIONS | |
| # ============================================================ | |
def parse_time_slot(time_str: str) -> Optional[datetime]:
    """Convert an ISO-format timestamp to a datetime; return None if it cannot be parsed."""
    try:
        parsed = datetime.fromisoformat(time_str)
    except (ValueError, TypeError):
        # ValueError: not ISO format; TypeError: not a string at all.
        return None
    return parsed
def format_time_slot(dt: datetime) -> str:
    """Render a datetime as e.g. "Monday, April 27 at 02:30 PM"."""
    readable_pattern = "%A, %B %d at %I:%M %p"
    return format(dt, readable_pattern)