| """ |
| Substitute runtime intent parameters into recorded trajectory steps. |
| |
| Used by the Gradio preview and mirrored in Pocket Automator's ParameterBinder at replay time. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import copy |
| import json |
| from typing import Any |
|
|
| from src.paths import DATA_DIR |
|
|
| SKILL_SCHEMAS_PATH = DATA_DIR / "skill_schemas.json" |
|
|
| _SEARCH_INPUT_SUFFIXES = ( |
| "search_input", |
| "search_src_text", |
| "search_container", |
| "search_box", |
| "search_bar", |
| "search", |
| ) |
|
|
| _MESSAGE_BODY_SUFFIXES = ( |
| "entry", |
| "message", |
| "message_text", |
| ) |
|
|
| _COMPOSE_BODY_SUFFIXES = ( |
| "editor", |
| "body", |
| ) |
|
|
| _COMPOSE_EXCLUDED_SUFFIXES = ( |
| "subject", |
| "editor", |
| ) |
|
|
|
|
| def _is_compose_recipient_step(step: dict[str, Any]) -> bool: |
| action = step.get("action") or {} |
| if action.get("type") != "set_text": |
| return False |
| package_name = (step.get("packageName") or "").lower() |
| if "gm" not in package_name and "gmail" not in package_name: |
| return False |
| resource_id = action.get("resourceId") or "" |
| return not any(_suffix_matches(resource_id, suffix) for suffix in _COMPOSE_EXCLUDED_SUFFIXES) |
|
|
|
|
| def load_skill_schemas() -> dict[str, Any]: |
| return json.loads(SKILL_SCHEMAS_PATH.read_text(encoding="utf-8")) |
|
|
|
|
| def load_skill_schema(skill: str) -> dict[str, Any]: |
| schemas = load_skill_schemas() |
| if skill not in schemas: |
| raise KeyError(f"Unknown skill schema: {skill!r}") |
| return schemas[skill] |
|
|
|
|
| def _resource_id_suffix(resource_id: str | None) -> str: |
| if not resource_id: |
| return "" |
| return resource_id.rsplit("/", 1)[-1].lower() |
|
|
|
|
| def _suffix_matches(resource_id: str | None, suffix: str) -> bool: |
| if not suffix: |
| return False |
| needle = suffix.lower() |
| rid = _resource_id_suffix(resource_id) |
| return rid == needle or rid.endswith(needle) |
|
|
|
|
| def is_search_input_step(step: dict[str, Any]) -> bool: |
| action = step.get("action") or {} |
| if action.get("type") != "set_text": |
| return False |
| resource_id = action.get("resourceId") or "" |
| if any(_suffix_matches(resource_id, suffix) for suffix in _SEARCH_INPUT_SUFFIXES): |
| return True |
| package_name = (step.get("packageName") or "").lower() |
| class_name = (action.get("className") or "").lower() |
| return "spotify" in package_name and "edittext" in class_name |
|
|
|
|
| def find_preceding_search_step(steps: list[dict[str, Any]], before_index: int) -> dict[str, Any] | None: |
| for index in range(before_index - 1, -1, -1): |
| step = steps[index] |
| action = step.get("action") or {} |
| if action.get("type") != "set_text": |
| continue |
| value = action.get("value") |
| if not value or not str(value).strip(): |
| continue |
| if is_search_input_step(step): |
| return step |
| return None |
|
|
|
|
| def _binding_matches( |
| binding: dict[str, Any], |
| step: dict[str, Any], |
| steps: list[dict[str, Any]], |
| index: int, |
| ) -> bool: |
| action = step.get("action") or {} |
| if action.get("type") != binding.get("action"): |
| return False |
|
|
| resource_id_suffix = binding.get("resource_id_suffix") |
| if resource_id_suffix and not _suffix_matches(action.get("resourceId"), resource_id_suffix): |
| return False |
|
|
| if binding.get("after_search"): |
| return find_preceding_search_step(steps, index) is not None |
|
|
| package_contains = binding.get("package_contains") |
| if package_contains and package_contains.lower() not in (step.get("packageName") or "").lower(): |
| return False |
|
|
| if binding.get("compose_recipient"): |
| return _is_compose_recipient_step(step) |
|
|
| return True |
|
|
|
|
| def _apply_binding(action: dict[str, Any], binding: dict[str, Any], value: str) -> dict[str, Any]: |
| updated = dict(action) |
| if binding.get("action") == "set_text": |
| updated["value"] = value |
| elif binding.get("action") == "click": |
| updated["text"] = value |
| if binding.get("set_content_description"): |
| updated["contentDescription"] = value |
| return updated |
|
|
|
|
| def apply_parameters( |
| trajectory: dict[str, Any], |
| skill: str, |
| parameters: dict[str, str] | None, |
| ) -> dict[str, Any]: |
| """Return a copy of the trajectory with bound parameter values substituted.""" |
| if not parameters: |
| return trajectory |
|
|
| schema = load_skill_schema(skill) |
| bindings = schema.get("bindings") or [] |
| if not bindings: |
| return trajectory |
|
|
| result = copy.deepcopy(trajectory) |
| steps = result.get("steps") or [] |
|
|
| for index, step in enumerate(steps): |
| action = step.get("action") |
| if not isinstance(action, dict): |
| continue |
|
|
| for binding in bindings: |
| param_name = binding.get("parameter") |
| if not param_name or param_name not in parameters: |
| continue |
| if not _binding_matches(binding, step, steps, index): |
| continue |
| step["action"] = _apply_binding(action, binding, str(parameters[param_name])) |
| action = step["action"] |
|
|
| return result |
|
|
|
|
| def _self_test() -> None: |
| from src.skill_router import load_trajectory |
|
|
| whatsapp = load_trajectory("whatsapp_send_message") |
| bound = apply_parameters( |
| whatsapp, |
| "whatsapp_send_message", |
| {"contact": "mom", "message": "i'm on my way"}, |
| ) |
|
|
| search_values = [ |
| step["action"]["value"] |
| for step in bound["steps"] |
| if step["action"]["type"] == "set_text" |
| and _suffix_matches(step["action"].get("resourceId"), "search_input") |
| ] |
| message_values = [ |
| step["action"]["value"] |
| for step in bound["steps"] |
| if step["action"]["type"] == "set_text" |
| and _suffix_matches(step["action"].get("resourceId"), "entry") |
| ] |
|
|
| assert search_values and all(value == "mom" for value in search_values), search_values |
| assert message_values and all(value == "i'm on my way" for value in message_values), message_values |
|
|
| post_search_clicks = [ |
| step["action"] |
| for index, step in enumerate(bound["steps"]) |
| if step["action"]["type"] == "click" |
| and find_preceding_search_step(bound["steps"], index) is not None |
| ] |
| assert post_search_clicks and all(click.get("text") == "mom" for click in post_search_clicks) |
|
|
| gmail = load_trajectory("gmail_send_email") |
| bound_gmail = apply_parameters( |
| gmail, |
| "gmail_send_email", |
| {"recipient": "alice@example.com", "message": "running late"}, |
| ) |
| recipient_values = [ |
| step["action"]["value"] |
| for step in bound_gmail["steps"] |
| if _is_compose_recipient_step(step) |
| ] |
| body_values = [ |
| step["action"]["value"] |
| for step in bound_gmail["steps"] |
| if step["action"]["type"] == "set_text" |
| and _suffix_matches(step["action"].get("resourceId"), "editor") |
| ] |
| assert recipient_values and all(value == "alice@example.com" for value in recipient_values), recipient_values |
| assert body_values and all(value == "running late" for value in body_values), body_values |
|
|
| youtube = load_trajectory("youtube_search") |
| bound_youtube = apply_parameters( |
| youtube, |
| "youtube_search", |
| {"query": "pasta recipes"}, |
| ) |
| youtube_search_values = [ |
| step["action"]["value"] |
| for step in bound_youtube["steps"] |
| if step["action"]["type"] == "set_text" |
| and _suffix_matches(step["action"].get("resourceId"), "search_edit_text") |
| ] |
| assert youtube_search_values and all(value == "pasta recipes" for value in youtube_search_values), youtube_search_values |
|
|
|
|
| if __name__ == "__main__": |
| _self_test() |
| print("parameter_binder: ok") |
|
|