from __future__ import annotations

from http import HTTPStatus
from typing import Any

from dashscope import Application


def _safe_get_output_text(response: Any) -> str | None:
    """Return the ``text`` carried by ``response.output``, or ``None``.

    ``output`` may expose ``text`` as an attribute or, when it is a dict,
    as a ``"text"`` key. Any exception raised while probing the response
    is swallowed and reported as ``None``.
    """
    try:
        output = getattr(response, "output", None)
        if output is None:
            return None
        if hasattr(output, "text"):
            return output.text
        if isinstance(output, dict):
            return output.get("text")
    except Exception:
        # Best-effort accessor: an unusual response object must not break
        # the caller, so every failure collapses to "no text".
        return None
    return None


def _snapshot_value(value: Any) -> Any:
    """Return a plain, detached representation of *value* for diagnostics.

    * dict (including dict subclasses) -> shallow ``dict`` copy of its items
    * object with ``__dict__``         -> shallow copy of its attributes
    * anything else                    -> the value itself
    """
    # Mappings are checked first: a dict subclass can also own an instance
    # ``__dict__`` which is usually empty, so reading ``__dict__`` would
    # silently drop the real payload.
    # NOTE(review): dashscope response parts appear to be dict subclasses -
    # confirm against the installed SDK version.
    if isinstance(value, dict):
        return dict(value)
    if hasattr(value, "__dict__"):
        # Copy so that edits to the snapshot never write through to the
        # original object's attributes.
        return dict(vars(value))
    return value


def _safe_to_dict(response: Any) -> dict[str, Any]:
    """Flatten *response* into a dict of diagnostic fields.

    Returns ``{"raw_type": "None"}`` when *response* is ``None``. Otherwise
    the result always has the keys ``raw_type``, ``status_code``,
    ``request_id``, ``code``, ``message`` and ``output_text``. The keys
    ``output``, ``usage``, ``headers``, ``model_dump`` and ``dict`` are
    added only when the corresponding attribute or method exists and can
    be read without raising.
    """
    if response is None:
        return {"raw_type": "None"}

    data: dict[str, Any] = {
        "raw_type": type(response).__name__,
        "status_code": getattr(response, "status_code", None),
        "request_id": getattr(response, "request_id", None),
        "code": getattr(response, "code", None),
        "message": getattr(response, "message", None),
        "output_text": _safe_get_output_text(response),
    }

    for attr in ("output", "usage", "headers"):
        try:
            value = getattr(response, attr, None)
            if value is not None:
                data[attr] = _snapshot_value(value)
        except Exception:
            # Optional diagnostic field: skip it rather than fail the call.
            pass

    # Serialisation hooks are invoked when present; each result is stored
    # under the method's own name.
    for method_name in ("model_dump", "dict"):
        try:
            if hasattr(response, method_name):
                data[method_name] = getattr(response, method_name)()
        except Exception:
            # Optional diagnostic field: skip it rather than fail the call.
            pass

    return data


def call_bailian_rag_app(
    *,
    api_key: str,
    app_id: str,
    prompt: str,
) -> dict[str, Any]:
    """Invoke ``Application.call`` and return a normalised result dict.

    Args:
        api_key: Forwarded to ``Application.call`` as ``api_key``.
        app_id: Forwarded to ``Application.call`` as ``app_id``.
        prompt: Forwarded to ``Application.call`` as ``prompt``.

    Returns:
        A dict with the keys ``ok``, ``status_code``, ``request_id``,
        ``output_text``, ``message`` and ``raw``. ``ok`` is ``True`` only
        when the response's ``status_code`` equals ``HTTPStatus.OK``.
        If an ``Exception`` is raised, it is not propagated: ``message``
        becomes ``"<ExceptionType>: <text>"`` and ``raw`` holds
        ``exception_type`` / ``exception_message``; fields filled in
        before the failure keep their values.
    """
    result: dict[str, Any] = {
        "ok": False,
        "status_code": None,
        "request_id": None,
        "output_text": None,
        "message": None,
        "raw": None,
    }

    try:
        response = Application.call(
            api_key=api_key,
            app_id=app_id,
            prompt=prompt,
        )

        status_code = getattr(response, "status_code", None)

        result["raw"] = _safe_to_dict(response)
        result["status_code"] = status_code
        result["request_id"] = getattr(response, "request_id", None)
        result["output_text"] = _safe_get_output_text(response)
        result["message"] = getattr(response, "message", None)

        if status_code == HTTPStatus.OK:
            result["ok"] = True

        return result
    except Exception as exc:
        # Boundary handler: callers receive a structured failure instead
        # of an exception.
        result["message"] = f"{type(exc).__name__}: {exc}"
        result["raw"] = {
            "exception_type": type(exc).__name__,
            "exception_message": str(exc),
        }
        return result