flow-pilot / backend /ai /workflow_builder.py
DevelopedBy-Siva
deploy to HF
fb38df2
import json
from backend.ai.client import llm_client, render_prompt
from backend.ai.prompts import BUILD_WORKFLOW_PROMPT
from backend.models.schemas import BuildWorkflowRequest, WorkflowDefinition, WorkflowStep
def _sheet_names(owner: dict) -> tuple[str, str]:
    """Return the owner's ``(inventory_sheet, orders_sheet)`` names.

    Falls back to ``"Inventory"`` / ``"Orders"`` when the owner has no
    ``spreadsheet_config`` entry, when it is ``None``/empty, or when the
    individual keys are missing.
    """
    # `or {}` (rather than a .get default) also covers a key that is present
    # but set to None, which would otherwise raise AttributeError on `.get`.
    sheet_config = owner.get("spreadsheet_config") or {}
    return (
        sheet_config.get("inventory_sheet", "Inventory"),
        sheet_config.get("orders_sheet", "Orders"),
    )


def _order_intake_steps(
    task_name: str,
    owner: dict,
    inventory_sheet: str,
    orders_sheet: str,
) -> list[WorkflowStep]:
    """Build the step template used for the ``email_received`` trigger."""
    # Orders whose task name mentions "restaurant" are flagged high priority.
    priority = "high" if "restaurant" in task_name.lower() else "normal"

    # NOTE(review): `inventory_rows` produced by step_read_inventory is not
    # referenced by any later step, and step_check_stock only tests the
    # `needs_clarification` flag. How the engine sequences steps after a
    # condition branch is not visible from this file -- confirm against the
    # workflow executor.
    return [
        WorkflowStep(
            id="step_extract",
            action="ai_extract",
            params={
                "input": "{{trigger.email.body}}",
                "extract_fields": ["customer_name", "items", "pickup_date", "needs_clarification"],
                "output_var": "order_data",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_read_inventory",
            action="sheets_read",
            params={
                "spreadsheet_id": "{{config.spreadsheet_id}}",
                "sheet_name": inventory_sheet,
                "output_var": "inventory_rows",
            },
            on_error="abort",
        ),
        WorkflowStep(
            id="step_check_stock",
            action="condition",
            params={
                "check": "not {{step_extract.order_data.needs_clarification}}",
                "if_true": "step_write_order",
                "if_false": "step_notify_ambiguity",
            },
            on_error="abort",
        ),
        WorkflowStep(
            id="step_write_order",
            action="sheets_write",
            params={
                "spreadsheet_id": "{{config.spreadsheet_id}}",
                "sheet_name": orders_sheet,
                "data": {
                    "customer": "{{step_extract.order_data.customer_name}}",
                    "items": "{{step_extract.order_data.items}}",
                    "priority": priority,
                },
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_compose_reply",
            action="ai_compose",
            params={
                "context": "{{step_extract.order_data}}",
                "tone": owner.get("preferred_tone", "friendly"),
                "output_var": "reply_body",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_send_email",
            action="send_email",
            params={
                "to": "{{trigger.email.from}}",
                "subject": "Order confirmation",
                "body": "{{step_compose_reply.reply_body}}",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_notify_ambiguity",
            action="notify_owner",
            params={
                "message": "This order needs clarification before FlowPilot can continue.",
                "severity": "warning",
                "options": ["Reply myself", "Ask customer for details"],
            },
            on_error="skip",
        ),
    ]


def _summary_steps(owner_email: str, orders_sheet: str) -> list[WorkflowStep]:
    """Build the step template used for the ``schedule`` (order summary) trigger."""
    return [
        WorkflowStep(
            id="step_collect_orders",
            action="sheets_read",
            params={
                "spreadsheet_id": "{{config.spreadsheet_id}}",
                "sheet_name": orders_sheet,
                "output_var": "order_rows",
            },
            on_error="abort",
        ),
        WorkflowStep(
            id="step_compose_summary",
            action="ai_compose",
            params={
                "context": "{{step_collect_orders.order_rows}}",
                "tone": "helpful",
                "output_var": "summary_body",
            },
            on_error="notify_owner",
        ),
        WorkflowStep(
            id="step_send_summary",
            action="send_email",
            params={
                "to": owner_email,
                "subject": "Weekly order summary",
                "body": "{{step_compose_summary.summary_body}}",
            },
            on_error="notify_owner",
        ),
    ]


def build_workflow_definition(request: BuildWorkflowRequest, owner: dict) -> dict:
    """Build a workflow definition for the task described by *request*.

    When ``llm_client.is_ready()`` is true, ``BUILD_WORKFLOW_PROMPT`` is
    rendered with the request and owner details and the result of
    ``llm_client.generate_json`` is returned as-is.

    Otherwise a built-in template is used: a task whose name contains
    "summary" (case-insensitive) becomes a scheduled order-summary workflow,
    and any other task becomes an order-intake workflow triggered by an
    incoming email. Both templates use the sheet names from the owner's
    ``spreadsheet_config`` (defaulting to "Inventory" / "Orders"), matching
    the values given to the LLM prompt.

    Args:
        request: Build request; ``task_name``, ``task_description`` and
            ``selected_option`` are read.
        owner: Owner settings; the keys ``email``, ``spreadsheet_id``,
            ``spreadsheet_config``, ``uploaded_data_summary`` and
            ``preferred_tone`` are read, all optional.

    Returns:
        The workflow definition as a plain ``dict``.
    """
    owner_email = owner.get("email", "owner@example.com")
    inventory_sheet, orders_sheet = _sheet_names(owner)

    if llm_client.is_ready():
        prompt = render_prompt(
            BUILD_WORKFLOW_PROMPT,
            selected_option=json.dumps(request.selected_option, indent=2),
            task_name=request.task_name,
            owner_email=owner_email,
            spreadsheet_id=owner.get("spreadsheet_id", ""),
            inventory_sheet=inventory_sheet,
            orders_sheet=orders_sheet,
            item_column="item",
            stock_column="stock",
            uploaded_data_summary=json.dumps(owner.get("uploaded_data_summary", []), indent=2),
        )
        # NOTE(review): unlike the template path below, this result is not
        # passed through WorkflowDefinition here -- confirm that callers or
        # generate_json validate the shape.
        return llm_client.generate_json(prompt)

    # Template fallback: the trigger type is inferred from the task name alone.
    is_summary = "summary" in request.task_name.lower()
    if is_summary:
        trigger_type = "schedule"
        # Standard cron syntax: minute 0, hour 8, every Friday.
        trigger_config = {"cron": "0 8 * * FRI"}
        steps = _summary_steps(owner_email, orders_sheet)
    else:
        trigger_type = "email_received"
        trigger_config = {}
        steps = _order_intake_steps(request.task_name, owner, inventory_sheet, orders_sheet)

    workflow = WorkflowDefinition(
        name=request.task_name,
        # Prefer the selected option's name; fall back to the task description.
        description=request.selected_option.get("name", request.task_description),
        trigger={"type": trigger_type, "config": trigger_config},
        steps=steps,
    )
    return workflow.model_dump()