Spaces:
Sleeping
Sleeping
# orchestrator.py
#
# Planner/executor glue for the email agent: loads configuration, builds the
# OpenAI client, and defines the prompts and helpers used to plan and run
# email actions.
import json
import os
import re
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

from dotenv import load_dotenv
from openai import OpenAI

from schemas import FetchEmailsParams, Plan, PlanStep
from tools import TOOL_MAPPING

# Pull OPENAI_API_KEY (and friends) from a local .env file, then fail fast
# if the key is absent rather than erroring on the first API call.
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Missing OPENAI_API_KEY in environment")
client = OpenAI(api_key=api_key)

# Persistent store for the user's name -> email address mapping.
NAME_MAPPING_FILE = "name_mapping.json"
# === Hard-coded list of available actions ===
# NOTE: bullets/arrow below were mojibake ("β’"/"β") in the checked-in file;
# restored to the intended "•" and "→".
SYSTEM_PLAN_PROMPT = """
You are an email assistant agent. You have access to the following actions:
• fetch_emails - fetch emails using text search with sender keywords and date extraction (e.g., "swiggy emails last week")
• show_email - display specific email content
• analyze_emails - analyze email patterns or content
• draft_reply - create a reply to an email
• send_reply - send a drafted reply
• done - complete the task

When the user gives you a query, output _only_ valid JSON of this form:

{
  "plan": [
    "fetch_emails",
    ...,
    "done"
  ]
}

Rules:
- Use "fetch_emails" for text-based email search (automatically extracts sender keywords and dates)
- The final entry _must_ be "done"
- If no tool is needed, return `{"plan":["done"]}`

Example: For "show me emails from swiggy today" → ["fetch_emails", "done"]
"""
# Per-step validator prompt. Filled in via str.format with the results
# gathered so far ({context}) and the next planned action ({action});
# the doubled braces survive .format() as literal JSON braces.
SYSTEM_VALIDATOR_TEMPLATE = """
You are a plan validator.

Context (results so far):

{context}

Next action:

{action}

Reply _only_ with JSON:

{{
  "should_execute": <true|false>,
  "parameters": <null or a JSON object with this action's parameters>
}}
"""
def _load_name_mapping() -> Dict[str, str]:
    """Load the name -> email mapping from NAME_MAPPING_FILE.

    Returns {} when the file is missing, unreadable, malformed, or does not
    contain a JSON object — callers index the result like a dict, so a
    hand-edited file holding a list/string must not leak through.
    """
    if not os.path.exists(NAME_MAPPING_FILE):
        return {}
    try:
        with open(NAME_MAPPING_FILE, "r") as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError):
        return {}
    # Guard against a hand-edited file containing valid-but-wrong JSON.
    return data if isinstance(data, dict) else {}
def _save_name_mapping(mapping: Dict[str, str]) -> None:
    """Persist the name -> email mapping to NAME_MAPPING_FILE as pretty JSON."""
    with open(NAME_MAPPING_FILE, "w") as handle:
        json.dump(mapping, handle, indent=2)
def store_name_email_mapping(name: str, email: str) -> None:
    """Add (or overwrite) one name -> email entry in the persistent mapping.

    Both sides are lower-cased and stripped so later lookups are
    case/whitespace-insensitive.
    """
    current = _load_name_mapping()
    current[name.lower().strip()] = email.lower().strip()
    _save_name_mapping(current)
def extract_sender_info(query: str) -> Dict:
    """Extract the sender intent (person, company, or email address) from a
    user query using the LLM.

    Returns:
        Dict with key "sender_intent". Falls back to {"sender_intent": ""}
        when the model reply cannot be parsed, so callers never see a
        JSONDecodeError from here.
    """
    system_prompt = """
You are an email query parser that extracts sender information.

Given a user query, extract the sender intent - the person/entity they want emails from.
This could be:
- A person's name (e.g., "dev", "john smith", "dev agarwal")
- A company/service (e.g., "amazon", "google", "linkedin")
- An email address (e.g., "john@company.com")

Examples:
- "emails from dev agarwal last week" → "dev agarwal"
- "show amazon emails from last month" → "amazon"
- "emails from john@company.com yesterday" → "john@company.com"
- "get messages from sarah" → "sarah"

Return ONLY valid JSON:
{
  "sender_intent": "extracted name, company, or email"
}
"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode guarantees the reply is parseable JSON (the prompt
        # mentions "JSON", as this mode requires).
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
    )
    try:
        return json.loads(response.choices[0].message.content)
    except json.JSONDecodeError:
        # Defensive fallback: treat an unparseable reply as "no sender found";
        # downstream resolution will then ask the user.
        return {"sender_intent": ""}
def resolve_sender_email(sender_intent: str) -> Tuple[Optional[str], bool]:
    """Map a sender intent (name, company, or address) to an email address.

    Returns:
        (email_address, needs_user_input) — email_address is None exactly
        when the intent could not be resolved and the user must be asked.
    """
    # Anything containing "@" is already an email address; normalize case.
    if "@" in sender_intent:
        return sender_intent.lower(), False

    wanted = sender_intent.lower().strip()
    known = _load_name_mapping()

    # Exact match wins.
    if wanted in known:
        return known[wanted], False

    # Otherwise try substring matching in either direction
    # ("dev" matches "dev agarwal" and vice versa).
    for stored_name, stored_email in known.items():
        lowered = stored_name.lower()
        if wanted in lowered or lowered in wanted:
            return stored_email, False

    # Unknown sender — caller must prompt the user for the address.
    return None, True
def get_plan_from_llm(user_query: str) -> Plan:
    """Ask the LLM which actions to run, in order (no parameters yet).

    Returns:
        Plan whose step list always ends with a "done" step — the invariant
        the prompt demands — even when the model omits it or returns no plan.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode guarantees the reply is parseable JSON (the prompt
        # mentions "JSON", as this mode requires).
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": SYSTEM_PLAN_PROMPT},
            {"role": "user", "content": user_query},
        ],
    )
    plan_json = json.loads(response.choices[0].message.content)
    actions = plan_json.get("plan", [])
    # Enforce the terminator so the execution loop always halts.
    if not actions or actions[-1] != "done":
        actions.append("done")
    return Plan(plan=[PlanStep(action=a) for a in actions])
def think(
    step: PlanStep,
    context: Dict[str, Any],
    user_query: str,
) -> Tuple[bool, Optional[PlanStep], Optional[str]]:
    """Decide whether to run a step and fill in its parameters.

    - fetch_emails: pass the raw user query straight through; the tool does
      its own keyword/date extraction, so no LLM round-trip is needed.
    - all other actions: ask the LLM validator whether to execute and with
      which parameters.

    Returns:
        (should_execute, updated_step_or_None, user_prompt_if_needed)
        — the third element is always None here; it exists for callers that
        expect a prompt-for-input channel.
    """
    # 1) fetch_emails needs no validation: forward the full query.
    if step.action == "fetch_emails":
        params = FetchEmailsParams(
            query=user_query  # full query for keyword and date extraction
        )
        return True, PlanStep(action="fetch_emails", parameters=params), None

    # 2) Everything else: validate & supply params via the LLM.
    prompt = SYSTEM_VALIDATOR_TEMPLATE.format(
        context=json.dumps(context, indent=2),
        action=step.action,
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode guarantees the verdict is parseable JSON (the validator
        # prompt mentions "JSON", as this mode requires).
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "Validate or supply parameters for this action."},
            {"role": "user", "content": prompt},
        ],
    )
    verdict = json.loads(response.choices[0].message.content)
    if not verdict.get("should_execute", False):
        return False, None, None
    return True, PlanStep(
        action=step.action,
        parameters=verdict.get("parameters"),
    ), None
def act(step: PlanStep) -> Any:
    """Execute a plan step by dispatching to its implementation in tools.py.

    Raises:
        ValueError: if the step's action has no registered tool.
    """
    tool = TOOL_MAPPING.get(step.action)
    if tool is None:
        raise ValueError(f"Unknown action '{step.action}'")
    kwargs: Dict[str, Any] = {}
    if step.parameters:
        kwargs = step.parameters.model_dump()
    return tool(**kwargs)