email-Assistant-Using-Ai / inference.py
Gaurav3134's picture
Upload 43 files
0387a1c verified
#!/usr/bin/env python3
"""
Inference loop for Email Assistant OpenEnv.
Features:
- Connect to env locally via OpenEnv API.
- Run multiple tasks (easy, medium, hard).
- Query an LLM for each action (OpenAI), with safe fallback policy.
- Log step and episode level metrics.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
from typing import Any
from dotenv import load_dotenv
from openai import OpenAI
from app.openenv_env.models import Action, Observation
from client import EmailAssistantEnvClient
def configure_logging() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
def build_prompt(observation: Observation) -> list[dict[str, str]]:
current = observation.current_email
recent = [r.action.model_dump() for r in observation.previous_actions[-5:]]
system = (
"You are an email assistant agent. Return a single JSON object for the next action. "
"The action must have a 'type' and a 'payload' object.\n"
"Available actions:\n"
"- type='classify', payload={'intent': 'Support|Sales|Spam|General', 'confidence': 0..1, 'reasoning': '...'}\n"
"- type='prioritize', payload={'message_id': '...'}\n"
"- type='reply', payload={'tone': 'formal|neutral|casual'}\n"
"- type='send', payload={'to_email': '...', 'subject': '...', 'body': '...'}\n"
"- type='skip', payload={'escalate': bool, 'reason': '...'}"
)
user = {
"current_email": {
"message_id": current.message_id,
"from_email": current.from_email,
"subject": current.subject,
"body": current.body,
},
"inbox_summary": [
{
"message_id": i.message_id,
"subject": i.subject,
"deadline": i.deadline_minutes,
"urgency": i.urgency_score,
"intent": i.predicted_intent,
"handled": i.handled
} for i in observation.inbox_summary
],
"recent_actions": recent,
}
return [
{"role": "system", "content": system},
{"role": "user", "content": json.dumps(user)},
]
def fallback_policy(observation: Observation) -> Action:
# Very simple deterministic logic for demonstration
email = observation.current_email
# Check if we have an intent for the current email
curr_summary = next((i for i in observation.inbox_summary if i.message_id == email.message_id), None)
if curr_summary and not curr_summary.predicted_intent:
return Action(type="classify", payload={}) # Trigger tool-driven classification
# If classified but not handled, try to reply
return Action(type="reply", payload={"tone": "formal"})
def action_from_llm(observation: Observation, llm: OpenAI | None, model: str) -> Action:
if llm is None:
return fallback_policy(observation)
try:
resp = llm.chat.completions.create(
model=model,
messages=build_prompt(observation),
temperature=0.2,
response_format={"type": "json_object"},
)
content = (resp.choices[0].message.content or "").strip()
payload = json.loads(content) if content else {}
# Ensure payload is wrapped properly if LLM only returns the payload
if "type" not in payload:
return fallback_policy(observation)
return Action(**payload)
except Exception as exc:
logging.warning("LLM action generation failed, using fallback policy: %s", exc)
return fallback_policy(observation)
def run_episode(env: Any, task_id: str, llm: OpenAI | None, model: str, max_steps: int) -> dict[str, Any]:
logging.info("Starting task: %s", task_id)
obs = env.reset(task_id=task_id)
total_reward = 0.0
done = False
steps = 0
while not done and steps < max_steps:
action = action_from_llm(obs, llm, model)
obs, reward, done, info = env.step(action)
total_reward += reward.value
steps += 1
logging.info(
"step=%d action=%s reward=%.3f done=%s",
steps,
action.type,
reward.value,
done,
)
return {
"task_id": task_id,
"steps": steps,
"done": done,
"total_reward": total_reward,
"info": info
}
def main() -> None:
parser = argparse.ArgumentParser(description="Run Email Assistant OpenEnv inference.")
parser.add_argument("--base-url", default="http://127.0.0.1:7860/openenv")
parser.add_argument("--episodes", type=int, default=1, help="Number of times to run each task")
parser.add_argument("--max-steps", type=int, default=10)
parser.add_argument("--model", default="gpt-4o-mini")
parser.add_argument("--task", default=None, help="Specific task ID to run (easy_classification, medium_prioritization, hard_workflow)")
args = parser.parse_args()
configure_logging()
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
if api_key and "your_openai_api_key_here" in api_key:
api_key = None
llm = OpenAI(api_key=api_key) if api_key else None
if llm is None:
logging.warning("Valid OPENAI_API_KEY not found; running with fallback policy.")
client = EmailAssistantEnvClient(base_url=args.base_url)
tasks = [args.task] if args.task else ["easy_classification", "medium_prioritization", "hard_workflow"]
all_results = []
with client.sync() as env:
for task_id in tasks:
for ep in range(args.episodes):
logging.info("=== Task %s | Episode %d/%d ===", task_id, ep + 1, args.episodes)
result = run_episode(env, task_id, llm, args.model, args.max_steps)
all_results.append(result)
avg_reward = sum(r["total_reward"] for r in all_results) / max(1, len(all_results))
logging.info("FINAL SUMMARY: episodes=%d average_reward=%.3f", len(all_results), avg_reward)
for r in all_results:
logging.info("Result: task=%s reward=%.3f done=%s", r["task_id"], r["total_reward"], r["done"])
if __name__ == "__main__":
main()