Spaces:
Sleeping
Sleeping
| from public_api.public_api_auth import login | |
| import requests | |
def get_item(session_data, warehouse_id, item_number, timeout=30):
    """Fetch item records from the WMS Items API.

    Issues a GET request with ``itemNumber`` and ``warehouseId`` as query
    parameters, using the session object stored in
    ``session_data["wms_auth"]``. If the server answers 401, ``login()`` is
    called once; when it reports success, the new session and warehouse id
    are written back into ``session_data`` and the request is retried a
    single time with the new session.

    Args:
        session_data: Mutable mapping that holds the HTTP session under
            ``"wms_auth"``. After a successful re-login its ``"wms_auth"``
            and ``"user_warehouse_id"`` entries are overwritten in place.
            The session is presumably a ``requests.Session`` (the call
            signature matches) -- confirm against ``login()``.
        warehouse_id: Value sent as the ``warehouseId`` query parameter.
        item_number: Value sent as the ``itemNumber`` query parameter.
        timeout: Seconds to wait on each HTTP request before giving up.
            Without it, an unresponsive server would block the caller
            indefinitely.

    Returns:
        A dict with two keys:
            ``"data"``: the list found under ``"items"`` in the JSON
                payload, or ``[]`` on any failure or when the payload has
                no items.
            ``"error"``: ``None`` on success, otherwise a message string.

    Raises:
        KeyError: If ``session_data`` has no ``"wms_auth"`` entry. Every
            other failure is reported through the returned ``"error"``.
    """
    url = "http://euwdsw202em01:4660/api/item/v1/items"
    session = session_data["wms_auth"]
    params = {
        "itemNumber": item_number,
        "warehouseId": warehouse_id,
    }
    headers = {"accept": "application/json", "Content-Type": "application/json"}

    def _fetch(active_session):
        # verify=False is kept from the original call; it turns off TLS
        # certificate verification and is moot for this plain-http URL.
        return active_session.get(
            url,
            params=params,
            headers=headers,
            verify=False,
            timeout=timeout,
        )

    try:
        response = _fetch(session)
        print("response", response.text)

        # A 401 means the stored session is no longer accepted: log in again
        # and retry exactly once with the fresh session.
        if response.status_code == 401:
            print("Session expired. Re-logging in...")
            new_session, new_wh, success = login()
            if success:
                session_data["wms_auth"] = new_session
                session_data["user_warehouse_id"] = new_wh
                response = _fetch(new_session)

        if response.status_code == 200:
            try:
                res_json = response.json()
            except ValueError:
                return {"data": [], "error": "Invalid JSON format from server."}
            print("res_json", res_json)

            # A body that parses but is not a JSON object (list, null, ...)
            # has no "items" key to read; report it as a format problem
            # instead of letting .get() fail and be mislabelled below as a
            # connection failure.
            if not isinstance(res_json, dict):
                return {"data": [], "error": "Invalid JSON format from server."}

            # "items": null is normalised to an empty list so that "data"
            # is a list on every path.
            all_data = res_json.get("items") or []
            print("all_data", all_data)
            return {"data": all_data, "error": None}

        # Every other status (404, 500, a 401 that survived the re-login, ...)
        # is surfaced with the raw response body.
        print(f"Items API response code: {response.status_code}\n{response.text}")
        return {"data": [], "error": f"Items API Error: {response.text}"}
    except Exception as e:
        # Boundary handler: callers rely on getting an error dict rather than
        # an exception for transport or login failures.
        print(f"Items API exception: {e}")
        return {"data": [], "error": f"Items API Connection failed: {str(e)}"}