Spaces:
Sleeping
Sleeping
| import json | |
| from backend.ai.client import llm_client, render_prompt | |
| from backend.ai.prompts import BUILD_WORKFLOW_PROMPT | |
| from backend.models.schemas import BuildWorkflowRequest, WorkflowDefinition, WorkflowStep | |
def build_workflow_definition(request: BuildWorkflowRequest, owner: dict) -> dict:
    """Build the workflow definition for the task described by *request*.

    If ``llm_client.is_ready()`` is true, ``BUILD_WORKFLOW_PROMPT`` is rendered
    with the request and owner details and the JSON produced by
    ``llm_client.generate_json`` is returned as-is.

    Otherwise a deterministic template is used:

    * task names containing "summary" (case-insensitive) get a scheduled
      workflow that reads the orders sheet and emails a summary to the owner;
    * every other task gets an ``email_received`` workflow that extracts the
      order, records it and replies to the sender.

    Args:
        request: Supplies ``task_name``, ``task_description`` and
            ``selected_option`` (a mapping; its ``"name"`` is used as the
            workflow description when present).
        owner: Owner settings. Keys read here: ``email``, ``spreadsheet_id``,
            ``spreadsheet_config`` (``inventory_sheet`` / ``orders_sheet``),
            ``uploaded_data_summary`` and ``preferred_tone``. All are optional.

    Returns:
        The workflow definition as a plain ``dict``.
    """
    inventory_sheet, orders_sheet = _sheet_names(owner)

    if llm_client.is_ready():
        prompt = render_prompt(
            BUILD_WORKFLOW_PROMPT,
            selected_option=json.dumps(request.selected_option, indent=2),
            task_name=request.task_name,
            owner_email=owner.get("email", "owner@example.com"),
            spreadsheet_id=owner.get("spreadsheet_id", ""),
            inventory_sheet=inventory_sheet,
            orders_sheet=orders_sheet,
            item_column="item",
            stock_column="stock",
            uploaded_data_summary=json.dumps(owner.get("uploaded_data_summary", []), indent=2),
        )
        # NOTE(review): the generated JSON is returned without being validated
        # against WorkflowDefinition -- confirm that callers validate it.
        return llm_client.generate_json(prompt)

    # Deterministic fallback used when the LLM client is not ready.
    is_summary_task = "summary" in request.task_name.lower()
    if is_summary_task:
        trigger = {"type": "schedule", "config": {"cron": "0 8 * * FRI"}}
        steps = _summary_steps(owner, orders_sheet)
    else:
        trigger = {"type": "email_received", "config": {}}
        steps = _order_intake_steps(request, owner, inventory_sheet, orders_sheet)

    workflow = WorkflowDefinition(
        name=request.task_name,
        description=request.selected_option.get("name", request.task_description),
        trigger=trigger,
        steps=steps,
    )
    return workflow.model_dump()


def _sheet_names(owner: dict) -> tuple[str, str]:
    """Return ``(inventory_sheet, orders_sheet)`` from the owner's config.

    Falls back to "Inventory" / "Orders" when the config or a key is missing.
    ``or {}`` also covers a ``spreadsheet_config`` that is present but None.
    """
    sheet_config = owner.get("spreadsheet_config") or {}
    return (
        sheet_config.get("inventory_sheet", "Inventory"),
        sheet_config.get("orders_sheet", "Orders"),
    )


def _order_intake_steps(
    request: BuildWorkflowRequest,
    owner: dict,
    inventory_sheet: str,
    orders_sheet: str,
) -> list[WorkflowStep]:
    """Return the template steps for handling an incoming order email."""
    priority = "high" if "restaurant" in request.task_name.lower() else "normal"
    return [
        WorkflowStep(
            id="step_extract",
            action="ai_extract",
            params={
                "input": "{{trigger.email.body}}",
                "extract_fields": ["customer_name", "items", "pickup_date", "needs_clarification"],
                "output_var": "order_data",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_read_inventory",
            action="sheets_read",
            params={
                "spreadsheet_id": "{{config.spreadsheet_id}}",
                "sheet_name": inventory_sheet,
                "output_var": "inventory_rows",
            },
            on_error="abort",
        ),
        # NOTE(review): despite its id, this condition only looks at
        # needs_clarification; inventory_rows is not referenced by any step.
        WorkflowStep(
            id="step_check_stock",
            action="condition",
            params={
                "check": "not {{step_extract.order_data.needs_clarification}}",
                "if_true": "step_write_order",
                "if_false": "step_notify_ambiguity",
            },
            on_error="abort",
        ),
        WorkflowStep(
            id="step_write_order",
            action="sheets_write",
            params={
                "spreadsheet_id": "{{config.spreadsheet_id}}",
                "sheet_name": orders_sheet,
                "data": {
                    "customer": "{{step_extract.order_data.customer_name}}",
                    "items": "{{step_extract.order_data.items}}",
                    "priority": priority,
                },
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_compose_reply",
            action="ai_compose",
            params={
                "context": "{{step_extract.order_data}}",
                "tone": owner.get("preferred_tone", "friendly"),
                "output_var": "reply_body",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_send_email",
            action="send_email",
            params={
                "to": "{{trigger.email.from}}",
                "subject": "Order confirmation",
                "body": "{{step_compose_reply.reply_body}}",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_notify_ambiguity",
            action="notify_owner",
            params={
                "message": "This order needs clarification before FlowPilot can continue.",
                "severity": "warning",
                "options": ["Reply myself", "Ask customer for details"],
            },
            on_error="skip",
        ),
    ]


def _summary_steps(owner: dict, orders_sheet: str) -> list[WorkflowStep]:
    """Return the template steps for the scheduled order-summary email."""
    return [
        WorkflowStep(
            id="step_collect_orders",
            action="sheets_read",
            params={
                "spreadsheet_id": "{{config.spreadsheet_id}}",
                "sheet_name": orders_sheet,
                "output_var": "order_rows",
            },
            on_error="abort",
        ),
        WorkflowStep(
            id="step_compose_summary",
            action="ai_compose",
            params={
                "context": "{{step_collect_orders.order_rows}}",
                "tone": "helpful",
                "output_var": "summary_body",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_send_summary",
            action="send_email",
            params={
                "to": owner.get("email", "owner@example.com"),
                "subject": "Weekly order summary",
                "body": "{{step_compose_summary.summary_body}}",
            },
            on_error="notify_owner",
        ),
    ]