""" Tool Executor for running LLM-requested tools against real .NET APIs. """ import asyncio import logging from typing import Any, Optional import httpx from app.core.config import settings logger = logging.getLogger(__name__) class ToolExecutor: """Executes tool calls with timeout, normalization, and safe errors.""" def __init__(self): self.base_url = settings.DOTNET_API_BASE_URL.rstrip("/") self.timeout = settings.DOTNET_API_TIMEOUT_SECONDS def _error(self, message: str, status: Optional[int] = None) -> dict: payload = {"error": message, "source": "dotnet_api"} if status is not None: payload["http_status"] = status return payload def _headers(self, args: dict) -> dict: headers = {} token = args.get("access_token") if token: headers["Authorization"] = f"Bearer {token}" internal_key = args.get("internal_key") or settings.DOTNET_INTERNAL_KEY if internal_key: headers["X-Internal-Key"] = internal_key family_id = args.get("family_id") if family_id is not None and str(family_id).strip(): headers["X-Family-Id"] = str(family_id).strip() return headers def _translate_status(self, status: Any) -> Any: if not status or not isinstance(status, str): return status status_map = { "pending": "قيد الانتظار", "inreview": "تحت المراجعة", "approved": "مقبول", "rejected": "مرفوض", "completed": "مكتمل", "cancelled": "ملغى", "canceled": "ملغى", } return status_map.get(status.lower(), status) @staticmethod def _unwrap_api_envelope(data: Any) -> Any: if isinstance(data, dict) and "data" in data: return data["data"] return data async def _call( self, method: str, path: str, args: dict, *, params: Optional[dict] = None, json_payload: Optional[dict] = None, ) -> dict: if not self.base_url: return self._error("لم يتم ضبط عنوان خدمة .NET في الإعدادات.") url = f"{self.base_url}{path}" headers = self._headers(args) try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.request( method=method, url=url, headers=headers, params=params, json=json_payload, ) except httpx.TimeoutException: logger.error("Tool call timeout: %s %s", method, path) return self._error("انتهى وقت الاتصال بالخدمة الخلفية.", 504) except Exception as exc: logger.error("Tool transport error: %s", exc, exc_info=True) return self._error("تعذر الاتصال بالخدمة الخلفية.", 503) raw_body: Any try: raw_body = response.json() except Exception: raw_body = {"message": response.text} if response.status_code >= 400: message = "فشل تنفيذ الطلب." if isinstance(raw_body, dict): message = raw_body.get("message") or raw_body.get("error") or message return self._error(message, response.status_code) unwrapped = self._unwrap_api_envelope(raw_body) if isinstance(unwrapped, dict): return unwrapped return {"result": unwrapped} async def get_request_status(self, args: dict) -> dict: request_id = str(args.get("request_id", "")).strip() if not request_id: return self._error("رقم الطلب مطلوب.", 400) payload = await self._call("GET", f"/api/Requests/{request_id}", args) if "error" in payload: return payload return { "request_id": request_id, "status": self._translate_status(payload.get("status")), "need_score": payload.get("needScore"), "priority_score": payload.get("priorityScore"), "created_at": payload.get("createdAt"), "ai_prediction_status": payload.get("aiPredictionStatus"), } async def get_required_documents(self, args: dict) -> dict: assistance_type = args.get("assistance_type", "General") normalized_type = assistance_type.strip().title() payload = await self._call( "GET", "/api/Requests/required-documents", args, params={"assistanceType": normalized_type}, ) if "error" in payload: return payload return { "assistance_type": payload.get("assistanceType", assistance_type), "documents": payload.get("documents", []), "notes": payload.get("notes", ""), } async def calculate_eligibility(self, args: dict) -> dict: body = { "incomeMonthly": args.get("income_monthly", 0), "familySize": args.get("family_size", 1), "debts": args.get("debts", 0), } payload = await self._call( "POST", "/api/Requests/eligibility-check", args, json_payload=body, ) if "error" in payload: return payload return { "eligibility_level": payload.get("eligibilityLevel"), "primary_reason": payload.get("primaryReason"), "score": payload.get("score"), } async def cancel_request(self, args: dict) -> dict: request_id = str(args.get("request_id", "")).strip() if not request_id: return self._error("رقم الطلب مطلوب.", 400) payload = await self._call("PATCH", f"/api/Requests/{request_id}/cancel", args) if "error" in payload: return payload return {"request_id": request_id, "cancelled": True, "details": payload} async def update_family_info(self, args: dict) -> dict: body = { "firstName": args.get("first_name", ""), "lastName": args.get("last_name", ""), "phone": args.get("phone", ""), "country": args.get("country", ""), "governorate": args.get("governorate", ""), "city": args.get("city", ""), "neighborhood": args.get("neighborhood"), } payload = await self._call("PUT", "/api/Families/profile", args, json_payload=body) if "error" in payload: return payload return {"updated": True, "profile": payload} async def get_family_requests_summary(self, args: dict) -> dict: payload = await self._call("GET", "/api/Families/statistics", args) if "error" in payload: return payload return { "total_requests": payload.get("totalRequests"), "pending": payload.get("pendingRequests"), "in_review": payload.get("inReviewRequests"), "approved": payload.get("approvedRequests"), "rejected": payload.get("rejectedRequests"), "completed": payload.get("completedRequests"), "cancelled": payload.get("cancelledRequests"), "last_request_date": payload.get("lastRequestDate"), } async def get_family_profile(self, args: dict) -> dict: payload = await self._call("GET", "/api/Families/profile", args) if "error" in payload: return payload return payload async def get_detailed_requests(self, args: dict) -> dict: status = args.get("status", "") params = {"pageSize": 5} # Limit to 5 requests to keep context manageable if status and status.strip(): params["status"] = status.strip() payload = await self._call("GET", "/api/Requests", args, params=params) if "error" in payload: return payload items = payload.get("items", payload.get("data", payload)) if isinstance(items, list): for item in items: if isinstance(item, dict) and "status" in item: item["status"] = self._translate_status(item["status"]) return { "requests": items, "note": "Showing up to 5 recent requests" } async def get_association_analytics(self, args: dict) -> dict: payload = await self._call("GET", "/api/Associations/analytics", args) if "error" in payload: return payload return payload async def execute(self, tool_name: str, args: dict) -> dict: try: func = getattr(self, tool_name, None) if not func or not callable(func): return self._error(f"Tool '{tool_name}' is not registered.") return await asyncio.wait_for(func(args), timeout=self.timeout + 1.0) except asyncio.TimeoutError: logger.error("Tool execution timeout: %s", tool_name) return self._error("انتهى وقت التنفيذ، يرجى المحاولة لاحقاً.", 504) except Exception as exc: logger.error("Tool execution error for %s: %s", tool_name, exc, exc_info=True) return self._error("حدث خطأ داخلي أثناء تنفيذ الأداة.", 500) tool_executor = ToolExecutor()