"""Workflow node handlers.

Trigger handlers take the trigger payload and return it unchanged. Step
handlers take ``(params, runtime)`` and return a dict result. Parameter
values written as ``"{{dotted.path}}"`` are looked up in ``runtime`` by
``_resolve_value``.
"""

from __future__ import annotations

from backend.ai.classifier import classify_email
from backend.ai.composer import compose_reply
from backend.ai.extractor import extract_email_data


def email_received(trigger: dict) -> dict:
    """Return the email trigger payload unchanged."""
    return trigger


def schedule(trigger: dict) -> dict:
    """Return the schedule trigger payload unchanged."""
    return trigger


def file_uploaded(trigger: dict) -> dict:
    """Return the file-upload trigger payload unchanged."""
    return trigger


def manual_trigger(trigger: dict) -> dict:
    """Return the manual trigger payload unchanged."""
    return trigger


def ai_extract(params: dict, runtime: dict) -> dict:
    """Run ``extract_email_data`` on the triggering email.

    ``params`` is not used. The email is read from
    ``runtime["trigger"]["email"]``; ``"from"`` is required, while
    ``"subject"`` and ``"body"`` default to the empty string.

    Raises:
        KeyError: if the trigger, the email, or its ``"from"`` key is missing.
    """
    email = runtime["trigger"]["email"]
    return extract_email_data(
        email["from"],
        email.get("subject", ""),
        email.get("body", ""),
    )


def ai_classify(params: dict, runtime: dict) -> dict:
    """Run ``classify_email`` on the triggering email.

    ``params`` is not used. In addition to the sender, subject and body, the
    owner's ``"business_description"`` (empty string when absent) is passed
    to the classifier.

    Raises:
        KeyError: if the trigger, the email, its ``"from"`` key, or
            ``runtime["owner"]`` is missing.
    """
    email = runtime["trigger"]["email"]
    return classify_email(
        email["from"],
        email.get("subject", ""),
        email.get("body", ""),
        runtime["owner"].get("business_description", ""),
    )


def ai_compose(params: dict, runtime: dict) -> dict:
    """Compose a reply from the resolved ``params["context"]``.

    When the resolved context is not a dict (including when it is absent),
    ``{"customer_name": "there"}`` is passed to ``compose_reply`` instead.

    Returns:
        ``{"reply_body": <result of compose_reply>}``.
    """
    context = _resolve_value(params.get("context"), runtime)
    if not isinstance(context, dict):
        context = {"customer_name": "there"}
    return {"reply_body": compose_reply(context)}


def sheets_read(params: dict, runtime: dict) -> dict:
    """Read ``params["sheet_name"]`` through ``runtime["db"].read_sheet``.

    Returns:
        ``{"rows": <value returned by read_sheet>}``.
    """
    sheet_name = params["sheet_name"]
    return {"rows": runtime["db"].read_sheet(runtime["owner_id"], sheet_name)}


def sheets_write(params: dict, runtime: dict) -> dict:
    """Resolve ``params["data"]`` and pass it to ``append_sheet_row``.

    Returns:
        ``{"written": <resolved data>}``.
    """
    data = _resolve_value(params["data"], runtime)
    sheet_name = params["sheet_name"]
    runtime["db"].append_sheet_row(runtime["owner_id"], sheet_name, data)
    return {"written": data}


def sheets_update(params: dict, runtime: dict) -> dict:
    """Call ``update_sheet_row`` with resolved lookup and update values.

    The column names (``lookup_column``, ``update_column``) are passed as
    given; only ``lookup_value`` and ``update_value`` are resolved.

    Returns:
        ``{"updated": True}``.
    """
    runtime["db"].update_sheet_row(
        runtime["owner_id"],
        params["sheet_name"],
        params["lookup_column"],
        _resolve_value(params["lookup_value"], runtime),
        params["update_column"],
        _resolve_value(params["update_value"], runtime),
    )
    return {"updated": True}


def send_email(params: dict, runtime: dict) -> dict:
    """Build an outbound message and hand it to ``record_outbound_email``.

    ``to``, ``subject`` and ``body`` are all resolved, so each of them may be
    a ``"{{...}}"`` placeholder. Plain strings pass through unchanged.

    Returns:
        The message dict with keys ``"to"``, ``"subject"`` and ``"body"``.
    """
    message = {
        "to": _resolve_value(params["to"], runtime),
        # Resolved like "to" and "body" so a placeholder subject is not sent
        # out literally.
        "subject": _resolve_value(params["subject"], runtime),
        "body": _resolve_value(params["body"], runtime),
    }
    runtime["db"].record_outbound_email(runtime["owner_id"], message)
    return message


def notify_owner(params: dict, runtime: dict) -> dict:
    """Create an escalation for the owner via ``runtime["db"]``.

    ``severity`` defaults to ``"info"`` and ``options`` to an empty list. The
    execution id passed along is ``"pending"`` when the runtime has none.

    Returns:
        The payload dict with keys ``"message"``, ``"severity"`` and
        ``"options"``.
    """
    payload = {
        "message": params["message"],
        "severity": params.get("severity", "info"),
        "options": params.get("options", []),
    }
    runtime["db"].create_escalation(
        runtime["owner_id"],
        runtime["workflow_id"],
        runtime.get("execution_id", "pending"),
        payload,
    )
    return payload


def condition(params: dict, runtime: dict) -> dict:
    """Choose the next step for a branching node.

    Only checks containing the text ``"needs_clarification"`` are evaluated;
    every other check selects ``params["if_true"]``.

    Returns:
        ``{"next_step": <chosen step>}``.
    """
    check = params["check"]
    if "needs_clarification" in check:
        extracted = runtime["step_results"].get("step_extract", {})
        # NOTE(review): a truthy needs_clarification selects "if_false", which
        # reads as if the check is phrased "no clarification needed" --
        # confirm against the workflow definitions.
        if extracted.get("needs_clarification"):
            return {"next_step": params["if_false"]}
        return {"next_step": params["if_true"]}
    return {"next_step": params["if_true"]}


def loop(params: dict, runtime: dict) -> dict:
    """Resolve ``params["items"]`` and return it as ``{"items": ...}``."""
    items = _resolve_value(params["items"], runtime)
    return {"items": items}


def wait_for_input(params: dict, runtime: dict) -> dict:
    """Escalate ``params["prompt"]`` to the owner with ``"warning"`` severity.

    Delegates to ``notify_owner`` and returns its payload.
    """
    return notify_owner(
        {
            "message": params["prompt"],
            "severity": "warning",
            "options": params["options"],
        },
        runtime,
    )


def _resolve_value(value, runtime: dict):
    """Replace ``"{{dotted.path}}"`` placeholders with values from ``runtime``.

    Dicts and lists are resolved recursively. A string counts as a
    placeholder only when the whole string starts with ``{{`` and ends with
    ``}}``; any other value is returned as is.

    Each path segment is looked up, in order:

    1. as a key of the current value, when that value is a dict;
    2. as the alias ``trigger`` for ``runtime["trigger"]``;
    3. as a key of ``runtime["step_results"]``;
    4. as the alias ``config`` for ``runtime["owner"]``.

    If no rule matches, the original placeholder string is returned
    unchanged.
    """
    if isinstance(value, dict):
        return {key: _resolve_value(item, runtime) for key, item in value.items()}
    if isinstance(value, list):
        return [_resolve_value(item, runtime) for item in value]
    if not (isinstance(value, str) and value.startswith("{{") and value.endswith("}}")):
        return value

    step_results = runtime.get("step_results")
    if not isinstance(step_results, dict):
        step_results = {}

    current = runtime
    for part in value[2:-2].strip().split("."):
        # Only dicts are descended into: "in" on a str is a substring test and
        # on None/int raises TypeError, neither of which is a key lookup.
        if isinstance(current, dict) and part in current:
            current = current[part]
        elif part == "trigger" and "trigger" in runtime:
            current = runtime["trigger"]
        elif part in step_results:
            current = step_results[part]
        elif part == "config" and "owner" in runtime:
            current = runtime["owner"]
        else:
            # Unresolvable path: leave the placeholder for the caller to see.
            return value
    return current