Aoun-Ai / app /services /tools /tool_executor.py
MuhammadMahmoud's picture
enhance system prompt
06bb2e7
"""
Tool Executor for running LLM-requested tools against real .NET APIs.
"""
import asyncio
import logging
from typing import Any, Optional
import httpx
from app.core.config import settings
logger = logging.getLogger(__name__)
class ToolExecutor:
"""Executes tool calls with timeout, normalization, and safe errors."""
def __init__(self):
self.base_url = settings.DOTNET_API_BASE_URL.rstrip("/")
self.timeout = settings.DOTNET_API_TIMEOUT_SECONDS
def _error(self, message: str, status: Optional[int] = None) -> dict:
payload = {"error": message, "source": "dotnet_api"}
if status is not None:
payload["http_status"] = status
return payload
def _headers(self, args: dict) -> dict:
headers = {}
token = args.get("access_token")
if token:
headers["Authorization"] = f"Bearer {token}"
internal_key = args.get("internal_key") or settings.DOTNET_INTERNAL_KEY
if internal_key:
headers["X-Internal-Key"] = internal_key
family_id = args.get("family_id")
if family_id is not None and str(family_id).strip():
headers["X-Family-Id"] = str(family_id).strip()
return headers
def _translate_status(self, status: Any) -> Any:
if not status or not isinstance(status, str):
return status
status_map = {
"pending": "قيد الانتظار",
"inreview": "تحت المراجعة",
"approved": "مقبول",
"rejected": "مرفوض",
"completed": "مكتمل",
"cancelled": "ملغى",
"canceled": "ملغى",
}
return status_map.get(status.lower(), status)
@staticmethod
def _unwrap_api_envelope(data: Any) -> Any:
if isinstance(data, dict) and "data" in data:
return data["data"]
return data
async def _call(
self,
method: str,
path: str,
args: dict,
*,
params: Optional[dict] = None,
json_payload: Optional[dict] = None,
) -> dict:
if not self.base_url:
return self._error("لم يتم ضبط عنوان خدمة .NET في الإعدادات.")
url = f"{self.base_url}{path}"
headers = self._headers(args)
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.request(
method=method,
url=url,
headers=headers,
params=params,
json=json_payload,
)
except httpx.TimeoutException:
logger.error("Tool call timeout: %s %s", method, path)
return self._error("انتهى وقت الاتصال بالخدمة الخلفية.", 504)
except Exception as exc:
logger.error("Tool transport error: %s", exc, exc_info=True)
return self._error("تعذر الاتصال بالخدمة الخلفية.", 503)
raw_body: Any
try:
raw_body = response.json()
except Exception:
raw_body = {"message": response.text}
if response.status_code >= 400:
message = "فشل تنفيذ الطلب."
if isinstance(raw_body, dict):
message = raw_body.get("message") or raw_body.get("error") or message
return self._error(message, response.status_code)
unwrapped = self._unwrap_api_envelope(raw_body)
if isinstance(unwrapped, dict):
return unwrapped
return {"result": unwrapped}
async def get_request_status(self, args: dict) -> dict:
request_id = str(args.get("request_id", "")).strip()
if not request_id:
return self._error("رقم الطلب مطلوب.", 400)
payload = await self._call("GET", f"/api/Requests/{request_id}", args)
if "error" in payload:
return payload
return {
"request_id": request_id,
"status": self._translate_status(payload.get("status")),
"need_score": payload.get("needScore"),
"priority_score": payload.get("priorityScore"),
"created_at": payload.get("createdAt"),
"ai_prediction_status": payload.get("aiPredictionStatus"),
}
async def get_required_documents(self, args: dict) -> dict:
assistance_type = args.get("assistance_type", "General")
normalized_type = assistance_type.strip().title()
payload = await self._call(
"GET",
"/api/Requests/required-documents",
args,
params={"assistanceType": normalized_type},
)
if "error" in payload:
return payload
return {
"assistance_type": payload.get("assistanceType", assistance_type),
"documents": payload.get("documents", []),
"notes": payload.get("notes", ""),
}
async def calculate_eligibility(self, args: dict) -> dict:
body = {
"incomeMonthly": args.get("income_monthly", 0),
"familySize": args.get("family_size", 1),
"debts": args.get("debts", 0),
}
payload = await self._call(
"POST",
"/api/Requests/eligibility-check",
args,
json_payload=body,
)
if "error" in payload:
return payload
return {
"eligibility_level": payload.get("eligibilityLevel"),
"primary_reason": payload.get("primaryReason"),
"score": payload.get("score"),
}
async def cancel_request(self, args: dict) -> dict:
request_id = str(args.get("request_id", "")).strip()
if not request_id:
return self._error("رقم الطلب مطلوب.", 400)
payload = await self._call("PATCH", f"/api/Requests/{request_id}/cancel", args)
if "error" in payload:
return payload
return {"request_id": request_id, "cancelled": True, "details": payload}
async def update_family_info(self, args: dict) -> dict:
body = {
"firstName": args.get("first_name", ""),
"lastName": args.get("last_name", ""),
"phone": args.get("phone", ""),
"country": args.get("country", ""),
"governorate": args.get("governorate", ""),
"city": args.get("city", ""),
"neighborhood": args.get("neighborhood"),
}
payload = await self._call("PUT", "/api/Families/profile", args, json_payload=body)
if "error" in payload:
return payload
return {"updated": True, "profile": payload}
async def get_family_requests_summary(self, args: dict) -> dict:
payload = await self._call("GET", "/api/Families/statistics", args)
if "error" in payload:
return payload
return {
"total_requests": payload.get("totalRequests"),
"pending": payload.get("pendingRequests"),
"in_review": payload.get("inReviewRequests"),
"approved": payload.get("approvedRequests"),
"rejected": payload.get("rejectedRequests"),
"completed": payload.get("completedRequests"),
"cancelled": payload.get("cancelledRequests"),
"last_request_date": payload.get("lastRequestDate"),
}
async def get_family_profile(self, args: dict) -> dict:
payload = await self._call("GET", "/api/Families/profile", args)
if "error" in payload:
return payload
return payload
async def get_detailed_requests(self, args: dict) -> dict:
status = args.get("status", "")
params = {"pageSize": 5} # Limit to 5 requests to keep context manageable
if status and status.strip():
params["status"] = status.strip()
payload = await self._call("GET", "/api/Requests", args, params=params)
if "error" in payload:
return payload
items = payload.get("items", payload.get("data", payload))
if isinstance(items, list):
for item in items:
if isinstance(item, dict) and "status" in item:
item["status"] = self._translate_status(item["status"])
return {
"requests": items,
"note": "Showing up to 5 recent requests"
}
async def get_association_analytics(self, args: dict) -> dict:
payload = await self._call("GET", "/api/Associations/analytics", args)
if "error" in payload:
return payload
return payload
async def execute(self, tool_name: str, args: dict) -> dict:
try:
func = getattr(self, tool_name, None)
if not func or not callable(func):
return self._error(f"Tool '{tool_name}' is not registered.")
return await asyncio.wait_for(func(args), timeout=self.timeout + 1.0)
except asyncio.TimeoutError:
logger.error("Tool execution timeout: %s", tool_name)
return self._error("انتهى وقت التنفيذ، يرجى المحاولة لاحقاً.", 504)
except Exception as exc:
logger.error("Tool execution error for %s: %s", tool_name, exc, exc_info=True)
return self._error("حدث خطأ داخلي أثناء تنفيذ الأداة.", 500)
tool_executor = ToolExecutor()