| import os |
| import time |
| from typing import Optional |
| from backend.database import SessionLocal, Tool, ToolExecution, Connection |
| from backend.composio_client import get_composio |
|
|
|
|
| class ToolExecutor: |
| async def execute( |
| self, |
| tool_id: str, |
| params: dict, |
| connection_id: Optional[str] = None, |
| user_id: str = "", |
| source: str = "api" |
| ) -> dict: |
| start = time.time() |
| status = "success" |
| error_message = None |
| result = None |
|
|
| composio = get_composio() |
| try: |
| if composio: |
| resp = composio.tools.execute( |
| slug=tool_id, |
| arguments=params, |
| connected_account_id=connection_id, |
| user_id=user_id, |
| dangerously_skip_version_check=True, |
| ) |
| result = resp.get("data", resp) |
| else: |
| result = {"tool_id": tool_id, "params": params, "message": "Mock execution - set COMPOSIO_API_KEY for real execution"} |
| except Exception as e: |
| status = "error" |
| error_message = str(e) |
| result = {"error": error_message} |
|
|
| latency = int((time.time() - start) * 1000) |
| self._log_execution(user_id, tool_id, connection_id, params, result, status, latency, error_message, source) |
| return {"execution_id": f"exec_{int(time.time()*1000)}", "status": status, "result": result, "latency_ms": latency, "error": error_message} |
|
|
| def _log_execution(self, user_id, tool_id, connection_id, params, result, status, latency, error, source): |
| if not user_id: |
| return |
| db = SessionLocal() |
| try: |
| db.add(ToolExecution( |
| user_id=user_id, tool_id=tool_id, connection_id=connection_id, |
| input_params=params, output_result=result, status=status, |
| latency_ms=latency, error_message=error, source=source |
| )) |
| db.commit() |
| finally: |
| db.close() |
|
|
|
|
| executor = ToolExecutor() |
|
|