import json from backend.ai.client import llm_client, render_prompt from backend.ai.prompts import BUILD_WORKFLOW_PROMPT from backend.models.schemas import BuildWorkflowRequest, WorkflowDefinition, WorkflowStep def build_workflow_definition(request: BuildWorkflowRequest, owner: dict) -> dict: if llm_client.is_ready(): prompt = render_prompt( BUILD_WORKFLOW_PROMPT, selected_option=json.dumps(request.selected_option, indent=2), task_name=request.task_name, owner_email=owner.get("email", "owner@example.com"), spreadsheet_id=owner.get("spreadsheet_id", ""), inventory_sheet=owner.get("spreadsheet_config", {}).get("inventory_sheet", "Inventory"), orders_sheet=owner.get("spreadsheet_config", {}).get("orders_sheet", "Orders"), item_column="item", stock_column="stock", uploaded_data_summary=json.dumps(owner.get("uploaded_data_summary", []), indent=2), ) return llm_client.generate_json(prompt) trigger_type = "schedule" if "summary" in request.task_name.lower() else "email_received" steps: list[WorkflowStep] = [] if trigger_type == "email_received": steps = [ WorkflowStep( id="step_extract", action="ai_extract", params={ "input": "{{trigger.email.body}}", "extract_fields": ["customer_name", "items", "pickup_date", "needs_clarification"], "output_var": "order_data", }, on_error="notify_owner", ), WorkflowStep( id="step_read_inventory", action="sheets_read", params={ "spreadsheet_id": "{{config.spreadsheet_id}}", "sheet_name": "Inventory", "output_var": "inventory_rows", }, on_error="abort", ), WorkflowStep( id="step_check_stock", action="condition", params={ "check": "not {{step_extract.order_data.needs_clarification}}", "if_true": "step_write_order", "if_false": "step_notify_ambiguity", }, on_error="abort", ), WorkflowStep( id="step_write_order", action="sheets_write", params={ "spreadsheet_id": "{{config.spreadsheet_id}}", "sheet_name": "Orders", "data": { "customer": "{{step_extract.order_data.customer_name}}", "items": "{{step_extract.order_data.items}}", "priority": "high" if "restaurant" in request.task_name.lower() else "normal", }, }, on_error="notify_owner", ), WorkflowStep( id="step_compose_reply", action="ai_compose", params={ "context": "{{step_extract.order_data}}", "tone": owner.get("preferred_tone", "friendly"), "output_var": "reply_body", }, on_error="notify_owner", ), WorkflowStep( id="step_send_email", action="send_email", params={ "to": "{{trigger.email.from}}", "subject": "Order confirmation", "body": "{{step_compose_reply.reply_body}}", }, on_error="notify_owner", ), WorkflowStep( id="step_notify_ambiguity", action="notify_owner", params={ "message": "This order needs clarification before FlowPilot can continue.", "severity": "warning", "options": ["Reply myself", "Ask customer for details"], }, on_error="skip", ), ] else: steps = [ WorkflowStep( id="step_collect_orders", action="sheets_read", params={ "spreadsheet_id": "{{config.spreadsheet_id}}", "sheet_name": "Orders", "output_var": "order_rows", }, on_error="abort", ), WorkflowStep( id="step_compose_summary", action="ai_compose", params={ "context": "{{step_collect_orders.order_rows}}", "tone": "helpful", "output_var": "summary_body", }, on_error="notify_owner", ), WorkflowStep( id="step_send_summary", action="send_email", params={ "to": owner.get("email", "owner@example.com"), "subject": "Weekly order summary", "body": "{{step_compose_summary.summary_body}}", }, on_error="notify_owner", ), ] workflow = WorkflowDefinition( name=request.task_name, description=request.selected_option.get("name", request.task_description), trigger={"type": trigger_type, "config": {"cron": "0 8 * * FRI"} if trigger_type == "schedule" else {}}, steps=steps, ) return workflow.model_dump()