# TaskWeaver / executor / executor.py
# PocketSkye's picture
# Initial deployment
# 0242ab2
# Raw
# History Blame Contribute Delete
# 3.32 kB
from executor.action_registry import (
ACTION_REGISTRY
)
# Importing action modules automatically registers them
# into ACTION_REGISTRY during module initialization.
import executor.actions.goto_action
import executor.actions.click_action
import executor.actions.fill_action
import executor.actions.wait_action
import executor.actions.extract_action
import executor.actions.save_results_action
import executor.actions.search_action
from models.execution_log import (
ExecutionLog
)
class WorkflowExecutor:
    """Run workflow steps sequentially by dispatching them via ACTION_REGISTRY.

    Each step's ``"action"`` value selects a handler from ``ACTION_REGISTRY``.
    The handler is awaited with ``(page, step)`` and its return value is
    collected. When both ``db`` and ``run_id`` are supplied, one
    ``ExecutionLog`` row is added and committed for every step outcome.
    """

    @staticmethod
    def _record_step(db, run_id, step_number, action, status, message):
        """Add and commit one ExecutionLog row; do nothing if logging is off."""
        # Compare against None explicitly so a legitimate run_id of 0 (or a
        # db object that happens to be falsy) does not silently disable logs.
        if db is None or run_id is None:
            return
        db.add(
            ExecutionLog(
                run_id=run_id,
                step_number=step_number,
                action=action,
                status=status,
                message=message,
            )
        )
        db.commit()

    async def execute(
        self,
        page,
        workflow,
        db=None,
        run_id=None
    ):
        """Execute every step of ``workflow`` in order and return the results.

        Args:
            page: Object passed unchanged as the first argument to each
                handler (the original comments refer to Playwright actions;
                the exact type is not visible here).
            workflow: Mapping with a ``"steps"`` sequence; each step must
                provide an ``"action"`` key naming a registered handler.
            db: Optional session-like object exposing ``add()`` and
                ``commit()``. Step logs are written only when both ``db``
                and ``run_id`` are given.
            run_id: Optional identifier stored on every ``ExecutionLog`` row.

        Returns:
            A list with the value returned by each handler, in step order.

        Raises:
            Exception: If a step names an action missing from
                ``ACTION_REGISTRY`` (message ``"Unknown action: <action>"``).
            Exception: Any exception raised by a handler is logged as a
                failed step (when logging is enabled) and re-raised as-is.
        """
        # Function-scope import keeps the module's import block untouched.
        import logging

        logger = logging.getLogger(__name__)
        logger.debug("ACTION_REGISTRY = %s", ACTION_REGISTRY)

        # Results returned by each workflow step, in execution order.
        results = []

        for index, step in enumerate(workflow["steps"], start=1):
            action = step["action"]
            logger.debug("Step %d: ACTION = %s", index, action)

            # Find the handler registered for this action.
            handler = ACTION_REGISTRY.get(action)
            if handler is None:
                message = f"Unknown action: {action}"
                self._record_step(
                    db, run_id, index, action, "failed", message
                )
                raise Exception(message)

            # Only the handler call is guarded: a failure while recording a
            # *successful* step must not be re-logged as a step failure.
            try:
                result = await handler(page, step)
            except Exception as exc:
                # Record the failure before stopping workflow execution.
                self._record_step(
                    db, run_id, index, action, "failed", str(exc)
                )
                raise

            results.append(result)

            # Handlers may return None or a non-dict value; only a dict can
            # override the default "success" status.
            if isinstance(result, dict):
                status = result.get("status", "success")
            else:
                status = "success"
            self._record_step(
                db,
                run_id,
                index,
                action,
                status,
                f"{action} executed successfully",
            )

        # Results collected from all workflow steps.
        return results