# File size: 2,487 Bytes
# 2f9831b
import os
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# Application object. The title, description and version given here are the
# metadata FastAPI publishes in the generated OpenAPI document.
app = FastAPI(
    version="1.0.0",
    title="SAP Purchase Order API Demo",
    description=(
        "Fetches purchase order data from SAP Sandbox API.\n\n"
        "Designed for integration with OpenAI Agentkit UI — "
        "Agentkit auto-discovers endpoints from `/openapi.json`."
    ),
)
# Allow all origins for Agentkit / external calls.
#
# Credentials stay disabled: the FastAPI CORS documentation states that
# allow_origins, allow_methods and allow_headers must each be listed
# explicitly (no "*") when allow_credentials=True, so the wildcard setup
# below is only valid without credentials.
# NOTE(review): none of the routes visible here read cookies or auth headers;
# if credentialed cross-origin calls are ever required, replace the wildcards
# with explicit values and re-enable allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# API key for the SAP sandbox, supplied as a secret through the (Hugging Face)
# process environment. It is None when the variable is not set.
SAP_API_KEY = os.environ.get("SAP_API_KEY")

# PurchaseOrder collection endpoint on the SAP sandbox; the `$top=50` query
# option is baked into the URL, so every call asks for at most 50 entries.
SAP_BASE_URL = (
    "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata4/"
    "sap/api_purchaseorder_2/srvd_a2x/sap/purchaseorder/0001/PurchaseOrder?$top=50"
)
# --- Models ---
# One purchase order entry as exposed by this API.
#
# Every field is optional and defaults to None, so an entry that lacks a key
# still validates. Documented with comments rather than a class docstring so
# the schema generated from this model stays exactly as before.
# NOTE(review): the PascalCase names presumably mirror the property names in
# the SAP payload so upstream entries validate without renaming — confirm
# against the service metadata.
class PurchaseOrderItem(BaseModel):
    PurchaseOrder: str | None = None
    Supplier: str | None = None
    CreatedByUser: str | None = None
    CreatedDateTime: str | None = None
# Response envelope declared as the response_model of GET /purchase-orders.
# Documented with comments rather than a class docstring so the schema
# generated from this model stays exactly as before.
class PurchaseOrderResponse(BaseModel):
    # Label for the data origin; the route sets it to "SAP Sandbox".
    source: str
    # Number of entries in `data`; the route computes it with len().
    count: int
    # The purchase order entries. A plain dict is also accepted by the type.
    data: list[PurchaseOrderItem] | dict
# --- Routes ---
# Liveness probe: returns a fixed payload and never calls SAP.
# The docstring is left unchanged because no `description=` is passed to the
# decorator, so FastAPI uses the docstring as the operation description.
@app.get("/health", summary="Health check", tags=["System"])
async def health_check():
    """Simple endpoint to verify the backend is running."""
    return {"status": "ok", "message": "SAP Purchase Order API backend is running"}
@app.get(
    "/purchase-orders",
    response_model=PurchaseOrderResponse,
    summary="Get Purchase Orders",
    tags=["Purchase Orders"],
    description="Fetch top 50 purchase orders from SAP Sandbox API.",
)
async def get_purchase_orders():
    """Fetch purchase orders from the SAP Sandbox API.

    Sends a single GET request to ``SAP_BASE_URL`` with the API key in the
    ``APIKey`` header and wraps the ``value`` entry of the JSON reply.

    Returns:
        A dict with ``source`` ("SAP Sandbox"), ``count`` (number of entries)
        and ``data`` (the entries themselves, or an empty list when the reply
        has no ``value`` key). FastAPI serializes it through
        ``PurchaseOrderResponse``.

    Raises:
        HTTPException: 500 when ``SAP_API_KEY`` is unset or an unexpected
            error occurs; the upstream status code when SAP replies with a
            non-success status; 504 when the request times out; 502 on any
            other transport-level failure.
    """
    if not SAP_API_KEY:
        raise HTTPException(status_code=500, detail="SAP_API_KEY not found in environment")

    headers = {"APIKey": SAP_API_KEY}
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(SAP_BASE_URL, headers=headers)
            resp.raise_for_status()
            data = resp.json()
            po_items = data.get("value", [])
            return {"source": "SAP Sandbox", "count": len(po_items), "data": po_items}
    except httpx.HTTPStatusError as e:
        # SAP answered, but with an error status: pass its status and body on.
        raise HTTPException(
            status_code=e.response.status_code,
            detail=f"SAP API error: {e.response.text}",
        ) from e
    except httpx.TimeoutException as e:
        # Must precede RequestError (its base class). A fixed message is used
        # because the exception's own text may be empty.
        raise HTTPException(status_code=504, detail="SAP API request timed out") from e
    except httpx.RequestError as e:
        # Connection, DNS, TLS or protocol failure: the upstream call failed,
        # not this service, so report it as a bad gateway.
        raise HTTPException(
            status_code=502,
            detail=f"SAP API request failed: {type(e).__name__}: {e}",
        ) from e
    except Exception as e:
        # Last-resort boundary (e.g. a non-JSON or unexpectedly shaped reply);
        # keeps the original 500 + message contract.
        raise HTTPException(status_code=500, detail=str(e)) from e
|