"""HTTP API and Gradio dashboard for the EmailOps OpenEnv environment.

The module exposes three REST endpoints (``/reset``, ``/step``, ``/state``)
backed by a single module-level ``EmailOpsEnv`` instance, and mounts a Gradio
dashboard at ``/`` that drives the same instance for manual testing.
"""

import json
import logging
import multiprocessing  # noqa: F401 - unused here; retained from the original import list

import gradio as gr
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from core.environment import EmailOpsEnv
from core.models import Action

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="OpenEnv - EmailOps API")
# One environment instance is shared by the REST endpoints and the Gradio UI.
env = EmailOpsEnv()


async def _read_json_object(request: Request) -> dict:
    """Return the request body parsed as a JSON object.

    An empty (or whitespace-only) body is treated as ``{}`` so that callers
    may omit the body entirely and rely on endpoint defaults.

    Raises:
        json.JSONDecodeError: The body is non-empty but is not valid JSON.
        ValueError: The body is valid JSON but not an object (e.g. a list).
    """
    raw = await request.body()
    if not raw.strip():
        return {}
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        raise ValueError(
            f"Expected a JSON object, got {type(payload).__name__}"
        )
    return payload


def _error_response(message: str, exc: Exception) -> JSONResponse:
    """Log *exc* with its traceback and build the JSON 500 response.

    Must be called from inside an ``except`` block so that
    ``logger.exception`` can attach the active traceback.
    """
    logger.exception("%s: %s", message, exc)
    return JSONResponse(status_code=500, content={"detail": str(exc)})


# --- FastAPI Endpoints ---


@app.post("/reset")
async def reset(request: Request):
    """Reset the environment with a specific task.

    The optional JSON body may carry ``task_id``; it defaults to ``"easy"``
    when the key, or the whole body, is absent. Returns the dumped initial
    observation, or a JSON 500 response with a ``detail`` message on failure.
    """
    try:
        data = await _read_json_object(request)
        task_id = data.get("task_id", "easy")
        obs = env.reset(task_id)
        logger.info("Environment reset with task: %s", task_id)
        return obs.model_dump()
    except Exception as exc:  # top-level boundary: report instead of crashing
        return _error_response("Error resetting environment", exc)


@app.post("/step")
async def step(request: Request):
    """Take a step in the environment.

    The JSON body is expanded into ``Action(**body)``. Returns a dict with
    the keys ``obs``, ``reward``, ``done`` and ``metrics``, or a JSON 500
    response with a ``detail`` message on failure.
    """
    try:
        action_data = await _read_json_object(request)
        action = Action(**action_data)
        obs, reward, done, metrics = env.step(action)
        return {
            "obs": obs.model_dump(),
            "reward": reward,
            "done": done,
            "metrics": metrics,
        }
    except Exception as exc:  # top-level boundary: report instead of crashing
        return _error_response("Error stepping environment", exc)


@app.get("/state")
async def state():
    """Get the current state of the environment.

    Failures are reported in the same JSON 500 shape used by ``/reset`` and
    ``/step``.
    """
    try:
        return env.state().model_dump()
    except Exception as exc:  # top-level boundary: report instead of crashing
        return _error_response("Error reading environment state", exc)


# --- Gradio UI Logic ---


def initialize_ui(task_name):
    """Reset the environment for *task_name* and return the initial UI values.

    Returns a 4-tuple matching the ``init_btn`` outputs: task description
    text, the observation as pretty-printed JSON, the initial score string
    and the stringified environment metrics.
    """
    obs = env.reset(task_name)
    return (
        f"Task loaded: {task_name.upper()}\n{env.task.description}",
        json.dumps(obs.model_dump(), indent=2),
        "0.0",
        str(env.metrics),
    )


def step_env_ui(action_type, email_id, folder_name, reply_body):
    """Build an ``Action`` from the form fields and step the environment.

    Blank text fields are sent as ``None``. Returns a 4-tuple matching the
    ``step_btn`` outputs: observation JSON, score, metrics and a status
    label.

    Raises:
        gr.Error: The action could not be built or the step failed; the
            original message is surfaced in the dashboard.
    """
    try:
        action = Action(
            action_type=action_type,
            email_id=email_id or None,
            folder_name=folder_name or None,
            reply_body=reply_body or None,
        )
        obs, score, done, metrics = env.step(action)
    except Exception as exc:
        logger.exception("Error stepping environment from UI: %s", exc)
        # gr.Error makes Gradio show the message instead of a generic failure.
        raise gr.Error(str(exc)) from exc
    return (
        json.dumps(obs.model_dump(), indent=2),
        f"{score}",
        str(metrics),
        "Completed" if done else "In Progress",
    )


with gr.Blocks(title="OpenEnv - EmailOps Dashboard") as demo:
    gr.Markdown("# Email Triage & Operations (OpenEnv)")
    gr.Markdown(
        "Interactive UI for monitoring and testing the EmailOps environment."
    )

    with gr.Row():
        # Left column: task selection and manual action form.
        with gr.Column():
            task_dropdown = gr.Dropdown(
                choices=["easy", "medium", "hard"],
                value="easy",
                label="Select Task",
            )
            init_btn = gr.Button("Initialize / Reset Environment")
            task_desc = gr.Textbox(label="Task Description", lines=2)

            gr.Markdown("### Manual Action Overrides")
            act_type = gr.Dropdown(
                choices=[
                    "open_email",
                    "close_email",
                    "move_email",
                    "reply",
                    "delete_email",
                    "flag_email",
                    "submit",
                ],
                value="open_email",
                label="Action Type",
            )
            email_id = gr.Textbox(label="Email ID (optional)")
            folder_name = gr.Textbox(label="Folder Name (optional, for move)")
            reply_body = gr.Textbox(label="Reply Body (optional, for reply)")
            step_btn = gr.Button("Step Environment")

        # Right column: observation, reward, status and metrics read-outs.
        with gr.Column():
            gr.Markdown("### Observation & Reward")
            observation_display = gr.Code(
                label="Current Observation", language="json"
            )
            with gr.Row():
                score_display = gr.Textbox(label="Reward Score")
                status_display = gr.Textbox(label="Status")
            metrics_display = gr.Textbox(label="Metrics", lines=2)

    # Event wiring: output order must match each callback's return tuple.
    init_btn.click(
        fn=initialize_ui,
        inputs=[task_dropdown],
        outputs=[task_desc, observation_display, score_display, metrics_display],
    )
    step_btn.click(
        fn=step_env_ui,
        inputs=[act_type, email_id, folder_name, reply_body],
        outputs=[
            observation_display,
            score_display,
            metrics_display,
            status_display,
        ],
    )

# Mount Gradio into FastAPI
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)